Documentation ¶
Overview ¶
Package async has helper utilities for running async code with proper tracing.
When starting goroutines, use async.Run:

    // this starts a goroutine
    async.Run(ctx, async.Func(func(ctx context.Context) error {
    	ctx = trace.StartCall(ctx, "...")
    	defer trace.EndCall(ctx)
    	// ... do whatever needs to be done ...
    	// ... return an error to abort ...
    	return nil
    }))
If the long-running activity involves fetching from a reader or some other iterative pattern, use async.Loop:

    // this starts a goroutine
    async.Loop(ctx, async.Func(func(ctx context.Context) error {
    	ctx = trace.StartCall(ctx, "...")
    	defer trace.EndCall(ctx)
    	// ... do whatever; return an error to abort the loop ...
    	// ... maybe async.Sleep(ctx, time.Minute) ...
    	return nil
    }))
Note: async.Loop terminates the loop when the inner function returns an error or the context is canceled.
To wait for all goroutines to terminate, use Tasks.Wait. See the examples for using Tasks.
Index ¶
- Variables
- func Loop(ctx context.Context, r Runner)
- func Run(ctx context.Context, r Runner)
- func RunBackground(ctx context.Context, r Runner)
- func RunClose(ctx context.Context, r Runner) error
- func Sleep(ctx context.Context, duration time.Duration)
- func SleepUntil(ctx context.Context, deadline time.Time)
- type Closer
- type Func
- type MutexWithContext
- type Runner
- type Tasks
Variables ¶
var Default = NewTasks("async.run")
Default is the default task runner. The package-level examples wait on it with async.Default.Wait().
Functions ¶
func Loop ¶
Loop repeatedly executes the provided task until it returns an error or the context is canceled.
Example ¶
    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"github.com/getoutreach/gobox/pkg/async"
    	"github.com/getoutreach/gobox/pkg/trace"
    	"github.com/getoutreach/gobox/pkg/trace/tracetest"
    )

    func main() {
    	recorder := tracetest.NewSpanRecorder()
    	defer recorder.Close()

    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	count := 0
    	async.Loop(ctx, async.Func(func(ctx context.Context) error {
    		ctx = trace.StartCall(ctx, "example")
    		defer trace.EndCall(ctx)

    		if count < 3 {
    			count++
    			fmt.Println("count", count)
    		} else {
    			<-ctx.Done()
    		}
    		return nil
    	}))

    	time.Sleep(time.Millisecond * 5)
    	cancel()
    	async.Default.Wait()
    }

Output:

    count 1
    count 2
    count 3
func Run ¶
Run executes a single asynchronous task.
It creates a new trace for the task and passes through deadlines.
func RunBackground ¶
RunBackground executes a single asynchronous task with a background context, so canceling the context passed to RunBackground does not cancel the task.
It creates a new trace for the task and passes through deadlines.
Types ¶
type Closer ¶ added in v1.36.0
Closer is the interface for closing a runner function. Implement it to clean up resources.
type MutexWithContext ¶
type MutexWithContext struct {
// contains filtered or unexported fields
}
MutexWithContext is a lock that supports context cancellation.
Note that unlike the `sync.Locker` style of mutex, this one's `Lock` method can fail and you must check its return value.
func NewMutexWithContext ¶
func NewMutexWithContext() *MutexWithContext
NewMutexWithContext creates a new MutexWithContext instance.
func (*MutexWithContext) Lock ¶
func (m *MutexWithContext) Lock(ctx context.Context) error
Lock acquires the mutex, blocking if it is unavailable.
Unlike `sync.Mutex.Lock()`, this function can fail. It is the responsibility of the caller to check the returned error and not proceed if it is non-nil.
func (*MutexWithContext) Unlock ¶
func (m *MutexWithContext) Unlock()
Unlock releases the mutex, allowing the next waiter to proceed.
type Tasks ¶
Tasks runs tasks and tracks them, so that callers can wait for all of them to terminate with Wait.
Example (Loop) ¶
    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"github.com/getoutreach/gobox/pkg/async"
    	"github.com/getoutreach/gobox/pkg/trace"
    	"github.com/getoutreach/gobox/pkg/trace/tracetest"
    )

    func main() {
    	recorder := tracetest.NewSpanRecorder()
    	defer recorder.Close()

    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	count := 0
    	tasks := async.Tasks{Name: "example"}
    	tasks.Loop(ctx, async.Func(func(ctx context.Context) error {
    		ctx = trace.StartCall(ctx, "example")
    		defer trace.EndCall(ctx)

    		if count < 3 {
    			count++
    			fmt.Println("count", count)
    		} else {
    			<-ctx.Done()
    		}
    		return nil
    	}))

    	async.Sleep(ctx, time.Millisecond*5)
    	cancel()
    	tasks.Wait()
    }

Output:

    count 1
    count 2
    count 3
Example (Run) ¶
    package main

    import (
    	"context"
    	"fmt"

    	"github.com/getoutreach/gobox/pkg/async"
    	"github.com/getoutreach/gobox/pkg/trace"
    	"github.com/getoutreach/gobox/pkg/trace/tracetest"
    )

    func main() {
    	recorder := tracetest.NewSpanRecorder()
    	defer recorder.Close()

    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	tasks := async.Tasks{Name: "example"}
    	tasks.Run(ctx, async.Func(func(ctx context.Context) error {
    		ctx = trace.StartCall(ctx, "example")
    		defer trace.EndCall(ctx)

    		fmt.Println("Run example")
    		return nil
    	}))

    	cancel()
    	tasks.Wait()
    }

Output:

    Run example
Example (RunBackground) ¶
    package main

    import (
    	"context"
    	"fmt"

    	"github.com/getoutreach/gobox/pkg/async"
    	"github.com/getoutreach/gobox/pkg/trace"
    	"github.com/getoutreach/gobox/pkg/trace/tracetest"
    )

    func main() {
    	recorder := tracetest.NewSpanRecorder()
    	defer recorder.Close()

    	ctxMain, cancel := context.WithCancel(context.Background())

    	async.RunBackground(ctxMain, async.Func(func(ctx context.Context) error {
    		// The task is expected to run with a background context:
    		// canceling the context passed into RunBackground() should not
    		// propagate to the context used by async.Func().
    		cancel()

    		ctx = trace.StartCall(ctx, "example")
    		defer trace.EndCall(ctx)

    		fmt.Println(ctx.Err())
    		fmt.Println("Run example")
    		return nil
    	}))

    	async.Default.Wait()
    }

Output:

    <nil>
    Run example