async

package
v1.40.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2022 License: Apache-2.0, Apache-2.0 Imports: 10 Imported by: 7

Documentation

Overview

Package async has helper utilities for running async code with proper tracing.

When starting goroutines, use async.Run:

// this starts a goroutine
async.Run(ctx, async.Func(func(ctx context.Context) error {
   ctx = trace.StartCall(ctx, "...")
   defer trace.EndCall(ctx)
   .... do whatever needs to be done ...
   .... return an error if the task fails ....
}))

If the long-running activity involves fetching from a reader or some other iterative pattern, use async.Loop:

// this starts a goroutine
async.Loop(ctx, async.Func(func(ctx context.Context) error {
    ctx = trace.StartCall(ctx, "...")
    defer trace.EndCall(ctx)
   .... do whatever... return errors to abort loop
   ... maybe async.Sleep(ctx, time.Minute)
}))

Note: async.Loop terminates the loop when the inner function returns an error or the context is canceled.

To wait for all goroutines to terminate, use Tasks.Wait. See the examples for using Tasks.

Index

Examples

Constants

This section is empty.

Variables

View Source
var Default = NewTasks("async.run")

Default is the default runner

Functions

func Loop

func Loop(ctx context.Context, r Runner)

Loop repeatedly executes the provided task until it returns an error or the context is canceled.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/getoutreach/gobox/pkg/async"
	"github.com/getoutreach/gobox/pkg/trace"
	"github.com/getoutreach/gobox/pkg/trace/tracetest"
)

// main demonstrates the package-level async.Loop: the task prints three
// counts and then blocks until the context is canceled.
func main() {
	// NOTE(review): presumably captures trace output so the example stays
	// quiet — confirm against the tracetest package.
	trlogs := tracetest.NewTraceLog()
	defer trlogs.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// count is only touched inside the looped task.
	count := 0
	async.Loop(ctx, async.Func(func(ctx context.Context) error {
		ctx = trace.StartCall(ctx, "example")
		defer trace.EndCall(ctx)

		if count < 3 {
			count++
			fmt.Println("count", count)
		} else {
			// After three iterations, block until cancellation instead of
			// returning immediately and being invoked again.
			<-ctx.Done()
		}
		// Returning nil keeps the loop going; a non-nil error would end it.
		return nil
	}))

	// Give the looped task time to print before canceling.
	time.Sleep(time.Millisecond * 5)
	cancel()
	// The example waits on async.Default, which implies the package-level
	// Loop runs its goroutine on the Default tasks.
	async.Default.Wait()

}
Output:

count 1
count 2
count 3

func Run

func Run(ctx context.Context, r Runner)

Run executes a single asynchronous task.

It creates a new trace for the task and passes through deadlines.

func RunBackground

func RunBackground(ctx context.Context, r Runner)

RunBackground executes a single asynchronous task with a background context.

It creates a new trace for the task and passes through deadlines.

func RunClose added in v1.36.0

func RunClose(ctx context.Context, r Runner) error

RunClose closes any references a runner might be using

func Sleep

func Sleep(ctx context.Context, duration time.Duration)

Sleep sleeps for the specified duration but can be canceled if the context is canceled or the context has an earlier deadline/timeout.

func SleepUntil

func SleepUntil(ctx context.Context, deadline time.Time)

SleepUntil sleeps until the specified deadline but can be canceled if the context is canceled or has an earlier deadline.

Types

type Closer added in v1.36.0

// Closer is the cleanup hook for a runner: implement it when the runner
// holds something that needs to be cleaned up.
type Closer interface {
	// Close performs the cleanup and reports any failure as an error.
	Close(ctx context.Context) error
}

Closer is the interface for closing a runner function. Implement this for cleaning up things.

type Func

type Func func(ctx context.Context) error

Func is a helper that implements the Runner interface

func (Func) Run

func (f Func) Run(ctx context.Context) error

Run implements the Runner interface

type MutexWithContext

// MutexWithContext is a mutex whose Lock method takes a context and returns
// an error, so callers must check that the lock was actually acquired.
type MutexWithContext struct {
	// contains filtered or unexported fields
}

MutexWithContext is a lock that supports context cancellation.

Note that unlike the `sync.Locker` style of mutex, this one's `Lock` method can fail and you must check its return value.

func NewMutexWithContext

func NewMutexWithContext() *MutexWithContext

NewMutexWithContext creates a new MutexWithContext instance.

func (*MutexWithContext) Lock

func (m *MutexWithContext) Lock(ctx context.Context) error

Lock acquires the mutex, blocking if it is unavailable.

Unlike `sync.Mutex.Lock()`, this function can fail. It is the responsibility of the caller to check the returned error and not proceed if it is non-nil.

func (*MutexWithContext) Unlock

func (m *MutexWithContext) Unlock()

Unlock releases the mutex, allowing the next waiter to proceed.

type Runner

// Runner is the unit of work accepted by Run, RunBackground, RunClose and
// Loop. Func adapts a plain function to this interface.
type Runner interface {
	// Run performs the work with the given context. When the runner is
	// driven by Loop, a non-nil error ends the loop.
	Run(ctx context.Context) error
}

Runner is the default interface for a runner function

func RunGroup added in v1.36.0

func RunGroup(rg []Runner) Runner

RunGroup runs a group of runner tasks and exits when the first of them errors out.

type Tasks

// Tasks groups asynchronous work started through its Run and Loop methods.
type Tasks struct {
	// Name identifies this group of tasks (the examples use "example").
	Name string
	// WaitGroup is embedded, so Wait can be called directly on a Tasks value
	// to block until its goroutines have terminated.
	sync.WaitGroup
}

Tasks runs tasks

Example (Loop)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/getoutreach/gobox/pkg/async"
	"github.com/getoutreach/gobox/pkg/trace"
	"github.com/getoutreach/gobox/pkg/trace/tracetest"
)

// main demonstrates Tasks.Loop on a dedicated Tasks value: the task prints
// three counts and then blocks until the context is canceled.
func main() {
	// NOTE(review): presumably captures trace output so the example stays
	// quiet — confirm against the tracetest package.
	trlogs := tracetest.NewTraceLog()
	defer trlogs.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// count is only touched inside the looped task.
	count := 0
	// A Tasks literal is used directly here rather than NewTasks.
	tasks := async.Tasks{Name: "example"}
	tasks.Loop(ctx, async.Func(func(ctx context.Context) error {
		ctx = trace.StartCall(ctx, "example")
		defer trace.EndCall(ctx)

		if count < 3 {
			count++
			fmt.Println("count", count)
		} else {
			// After three iterations, block until cancellation instead of
			// returning immediately and being invoked again.
			<-ctx.Done()
		}
		// Returning nil keeps the loop going; a non-nil error would end it.
		return nil
	}))

	// Give the looped task time to print; async.Sleep returns early if ctx
	// is canceled.
	async.Sleep(ctx, time.Millisecond*5)
	cancel()
	// Block until the loop goroutine has terminated.
	tasks.Wait()

}
Output:

count 1
count 2
count 3
Example (Run)
package main

import (
	"context"
	"fmt"

	"github.com/getoutreach/gobox/pkg/async"
	"github.com/getoutreach/gobox/pkg/trace"
	"github.com/getoutreach/gobox/pkg/trace/tracetest"
)

// main demonstrates Tasks.Run: a single asynchronous task is started and the
// program waits for it to finish.
func main() {
	// NOTE(review): presumably captures trace output so the example stays
	// quiet — confirm against the tracetest package.
	trlogs := tracetest.NewTraceLog()
	defer trlogs.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tasks := async.Tasks{Name: "example"}
	tasks.Run(ctx, async.Func(func(ctx context.Context) error {
		ctx = trace.StartCall(ctx, "example")
		defer trace.EndCall(ctx)

		fmt.Println("Run example")
		return nil
	}))

	// cancel is called right after starting the task; the documented output
	// still shows the task's print.
	cancel()
	// Block until the task goroutine has terminated.
	tasks.Wait()

}
Output:

Run example
Example (RunBackground)
package main

import (
	"context"
	"fmt"

	"github.com/getoutreach/gobox/pkg/async"
	"github.com/getoutreach/gobox/pkg/trace"
	"github.com/getoutreach/gobox/pkg/trace/tracetest"
)

// main demonstrates async.RunBackground: the task's context is not canceled
// when the context passed to RunBackground is canceled.
func main() {
	// NOTE(review): presumably captures trace output so the example stays
	// quiet — confirm against the tracetest package.
	trlogs := tracetest.NewTraceLog()
	defer trlogs.Close()

	ctxMain, cancel := context.WithCancel(context.Background())

	async.RunBackground(ctxMain, async.Func(func(ctx context.Context) error {
		// The task is expected to run with a background context: canceling
		// the context passed into RunBackground() should not propagate to
		// the context received by this async.Func.
		cancel()
		ctx = trace.StartCall(ctx, "example")
		defer trace.EndCall(ctx)

		// Prints <nil> because the task's context was not canceled by the
		// cancel() call above.
		fmt.Println(ctx.Err())
		fmt.Println("Run example")
		return nil
	}))

	// The example waits on async.Default, which implies RunBackground runs
	// its goroutine on the Default tasks.
	async.Default.Wait()

}
Output:

<nil>
Run example

func NewTasks

func NewTasks(name string) *Tasks

NewTasks creates new instance of Tasks

func (*Tasks) Loop

func (t *Tasks) Loop(ctx context.Context, r Runner)

Loop repeatedly executes the provided task until it returns an error or the context is canceled.

func (*Tasks) Run

func (t *Tasks) Run(ctx context.Context, r Runner)

Run executes a single asynchronous task.

It creates a new trace for the task and passes through deadlines.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL