Documentation ¶
Overview ¶
Package async provides synchronization primitives and background workers. It is a core package that many other packages depend on.
Index ¶
- Constants
- Variables
- func Recover(action func() error, errors chan error)
- func RecoverGroup(action func() error, errors chan error, wg *sync.WaitGroup)
- type Actioner
- type ActionerFunc
- type Batch
- type BatchOption
- type Checker
- type CheckerFunc
- type ContextAction
- type Errors
- type Future
- type Interceptor
- type InterceptorFunc
- type Interval
- type IntervalOption
- type Latch
- func (l *Latch) CanStart() bool
- func (l *Latch) CanStop() bool
- func (l *Latch) IsStarted() bool
- func (l *Latch) IsStarting() bool
- func (l *Latch) IsStopped() (isStopped bool)
- func (l *Latch) IsStopping() bool
- func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})
- func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})
- func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})
- func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})
- func (l *Latch) Reset()
- func (l *Latch) Started()
- func (l *Latch) Starting()
- func (l *Latch) Stopped()
- func (l *Latch) Stopping()
- func (l *Latch) WaitStarted()
- func (l *Latch) WaitStopped()
- type NoopActioner
- type Queue
- type QueueOption
- type SignalGroup
- type WorkAction
- type Worker
- func (w *Worker) Background() context.Context
- func (w *Worker) Dispatch()
- func (w *Worker) Enqueue(obj interface{})
- func (w *Worker) Execute(ctx context.Context, workItem interface{})
- func (w *Worker) HandleError(err error)
- func (w *Worker) NotifyStarted() <-chan struct{}
- func (w *Worker) NotifyStopped() <-chan struct{}
- func (w *Worker) Start() error
- func (w *Worker) Stop() error
- func (w *Worker) StopContext(ctx context.Context)
- type WorkerFinalizer
Constants ¶
const (
	LatchStopped  int32 = 0
	LatchStarting int32 = 1
	LatchResuming int32 = 2
	LatchStarted  int32 = 3
	LatchActive   int32 = 4
	LatchPausing  int32 = 5
	LatchPaused   int32 = 6
	LatchStopping int32 = 7
)
Latch states
const (
	DefaultQueueMaxWork        = 1 << 10
	DefaultInterval            = 500 * time.Millisecond
	DefaultShutdownGracePeriod = 10 * time.Second
)
Constants
Variables ¶
var (
	ErrCannotStart  ex.Class = "cannot start; already started"
	ErrCannotStop   ex.Class = "cannot stop; already stopped"
	ErrCannotCancel ex.Class = "cannot cancel; already canceled"
)
Errors
Functions ¶
Types ¶
type ActionerFunc ¶ added in v1.20210615.7
ActionerFunc is a function that implements Actioner.
type Batch ¶ added in v1.20201204.1
type Batch struct {
	Action            WorkAction
	SkipRecoverPanics bool
	Parallelism       int
	Work              chan interface{}
	Errors            chan error
}
Batch is a batch of work executed by a fixed count of workers.
func NewBatch ¶ added in v1.20201204.1
func NewBatch(work chan interface{}, action WorkAction, options ...BatchOption) *Batch
NewBatch creates a new batch processor. Batch processes are a known quantity of work that needs to be processed in parallel.
type BatchOption ¶ added in v1.20201204.1
type BatchOption func(*Batch)
BatchOption is an option for the batch worker.
func OptBatchErrors ¶ added in v1.20201204.1
func OptBatchErrors(errors chan error) BatchOption
OptBatchErrors sets the batch worker error return channel.
func OptBatchParallelism ¶ added in v1.20201204.1
func OptBatchParallelism(parallelism int) BatchOption
OptBatchParallelism sets the batch worker parallelism, or the number of workers to create.
func OptBatchSkipRecoverPanics ¶ added in v1.20210908.5
func OptBatchSkipRecoverPanics(skipRecoverPanics bool) BatchOption
OptBatchSkipRecoverPanics sets whether the batch worker skips panic recovery. When true, panics propagate instead of being recovered.
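The option types in this package (`BatchOption`, `IntervalOption`, `QueueOption`) all have the shape `func(*T)`. A minimal sketch of how such options are usually applied by a constructor follows; `config`, `option`, and the default value are hypothetical stand-ins, not the package's own code.

```go
package main

import "fmt"

// config is a hypothetical stand-in for a struct such as Batch.
type config struct {
	Parallelism int
	Errors      chan error
}

// option mirrors the shape of BatchOption: a function that mutates the target.
type option func(*config)

// optParallelism returns an option that sets the worker count.
func optParallelism(n int) option {
	return func(c *config) { c.Parallelism = n }
}

// optErrors returns an option that sets the error channel.
func optErrors(errs chan error) option {
	return func(c *config) { c.Errors = errs }
}

// newConfig sets defaults first, then applies each option in order, so a
// later option overrides an earlier one.
func newConfig(opts ...option) *config {
	c := &config{Parallelism: 1} // the default of 1 is an assumption
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newConfig(optParallelism(4), optErrors(make(chan error, 8)))
	fmt.Println(c.Parallelism, cap(c.Errors)) // prints "4 8"
}
```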
type CheckerFunc ¶ added in v1.20210615.7
CheckerFunc implements Checker.
type ContextAction ¶ added in v1.20201204.1
ContextAction is an action that is given a context and returns an error.
type Errors ¶ added in v1.20210908.5
type Errors chan error
Errors is a channel for errors.
type Future ¶ added in v1.20210306.2
type Future struct {
// contains filtered or unexported fields
}
Future is a deferred action.
func Await ¶ added in v1.20210306.2
func Await(ctx context.Context, action ContextAction) *Future
Await returns a new future.
type Interceptor ¶ added in v1.20210615.7
Interceptor returns an actioner for a given actioner.
func Interceptors ¶ added in v1.20210615.7
func Interceptors(interceptors ...Interceptor) Interceptor
Interceptors chains calls to interceptors as a single interceptor.
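Chaining can be illustrated with plain function types. The Actioner interface is not defined in this document, so the sketch below substitutes a hypothetical `action` function type, and it assumes the first interceptor listed ends up outermost; the package may order them differently.

```go
package main

import "fmt"

// action and interceptor are hypothetical stand-ins for Actioner and Interceptor.
type action func(string) string
type interceptor func(action) action

// chain combines interceptors into one. Wrapping from last to first makes the
// first interceptor in the list the outermost (an assumed ordering).
func chain(interceptors ...interceptor) interceptor {
	return func(a action) action {
		for i := len(interceptors) - 1; i >= 0; i-- {
			a = interceptors[i](a)
		}
		return a
	}
}

// tag returns an interceptor that wraps the inner result in name(...).
func tag(name string) interceptor {
	return func(next action) action {
		return func(in string) string {
			return name + "(" + next(in) + ")"
		}
	}
}

func main() {
	identity := func(in string) string { return in }
	wrapped := chain(tag("a"), tag("b"))(identity)
	fmt.Println(wrapped("x")) // prints "a(b(x))"
}
```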
type InterceptorFunc ¶ added in v1.20210615.7
InterceptorFunc is a function that implements Interceptor.
func (InterceptorFunc) Intercept ¶ added in v1.20210615.7
func (fn InterceptorFunc) Intercept(action Actioner) Actioner
Intercept implements Interceptor for the function.
type Interval ¶
type Interval struct {
	*Latch
	Context     context.Context
	Interval    time.Duration
	Action      ContextAction
	Delay       time.Duration
	StopOnError bool
	Errors      chan error
}
Interval is a background worker that performs an action on an interval.
func NewInterval ¶
func NewInterval(action ContextAction, interval time.Duration, options ...IntervalOption) *Interval
NewInterval returns a new worker that runs an action on an interval.
Example:
iw := NewInterval(func(ctx context.Context) error { return nil }, 500*time.Millisecond)
go iw.Start()
<-iw.NotifyStarted()
type IntervalOption ¶ added in v1.20201204.1
type IntervalOption func(*Interval)
IntervalOption is an option for the interval worker.
func OptIntervalContext ¶ added in v1.20201204.1
func OptIntervalContext(ctx context.Context) IntervalOption
OptIntervalContext sets the interval worker context.
func OptIntervalDelay ¶ added in v1.20201204.1
func OptIntervalDelay(d time.Duration) IntervalOption
OptIntervalDelay sets the interval worker start delay.
func OptIntervalErrors ¶ added in v1.20201204.1
func OptIntervalErrors(errors chan error) IntervalOption
OptIntervalErrors sets the interval worker error channel.
func OptIntervalStopOnError ¶ added in v1.20210306.2
func OptIntervalStopOnError(stopOnError bool) IntervalOption
OptIntervalStopOnError sets whether the interval worker stops when the action returns an error.
type Latch ¶
type Latch struct {
// contains filtered or unexported fields
}
Latch is a helper to coordinate goroutine lifecycles, specifically waiting for goroutines to start and end.
The lifecycle is generally as follows:
0 - stopped - goto 1
1 - starting - goto 2
2 - started - goto 3
3 - stopping - goto 0
Control flow is coordinated with chan struct{}, which acts as a semaphore but, because it is buffered, can alert only one listener. It is incorrect to use Latch for anything other than tying one goroutine to another. Writers *must* perform state transitions in the recommended order, and readers *must* read state transitions in the expected order; failure to do so can result in deadlocks. Channels are the synchronization mechanism, and we assume a 1-1 tie between goroutines, with a single read/write pair on either end (in any order).
In order to start a `stopped` latch, you must call `.Reset()` first to initialize channels.
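The 1-1 tie between two goroutines described above can be sketched with three buffered `chan struct{}` values. `miniLatch` is a hypothetical, much-reduced stand-in: it has no state field, no `Reset`, and no starting signal, and it only shows the ordering of one writer and one reader.

```go
package main

import "fmt"

// miniLatch is a hypothetical stand-in for Latch holding one channel per signal.
type miniLatch struct {
	started  chan struct{}
	stopping chan struct{}
	stopped  chan struct{}
}

func newMiniLatch() *miniLatch {
	return &miniLatch{
		started:  make(chan struct{}, 1),
		stopping: make(chan struct{}, 1),
		stopped:  make(chan struct{}, 1),
	}
}

// lifecycle ties a controller to exactly one worker goroutine and returns the
// transitions the controller observed, in order.
func lifecycle() []string {
	l := newMiniLatch()
	var seen []string

	go func() {
		l.started <- struct{}{} // worker: announce it is running
		<-l.stopping            // worker: wait for the stop request
		l.stopped <- struct{}{} // worker: acknowledge it has stopped
	}()

	<-l.started // controller: wait for started
	seen = append(seen, "started")
	l.stopping <- struct{}{} // controller: request stop
	<-l.stopped              // controller: wait for stopped
	seen = append(seen, "stopped")
	return seen
}

func main() {
	fmt.Println(lifecycle()) // prints "[started stopped]"
}
```

Because each signal is a single send matched by a single receive, a second listener on the same channel would never be woken, which is why the one-listener restriction matters.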
func (*Latch) IsStarted ¶ added in v1.20201204.1
IsStarted returns if the latch state is LatchStarted.
func (*Latch) IsStarting ¶
IsStarting returns if the latch state is LatchStarting.
func (*Latch) IsStopping ¶
IsStopping returns if the latch state is LatchStopping.
func (*Latch) NotifyStarted ¶
func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})
NotifyStarted returns the started signal. It is used to coordinate the transition from starting -> started. There can only be (1) effective listener at a time for these events.
func (*Latch) NotifyStarting ¶
func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})
NotifyStarting returns the starting signal. It is used to coordinate the transition from stopped -> starting. There can only be (1) effective listener at a time for these events.
func (*Latch) NotifyStopped ¶
func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})
NotifyStopped returns the stopped signal. It is used to coordinate the transition from stopping -> stopped. There can only be (1) effective listener at a time for these events.
func (*Latch) NotifyStopping ¶
func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})
NotifyStopping returns the should stop signal. It is used to trigger the transition from running -> stopping -> stopped. There can only be (1) effective listener at a time for these events.
func (*Latch) Started ¶
func (l *Latch) Started()
Started signals that the latch is started and has entered the `IsStarted` state.
func (*Latch) Starting ¶
func (l *Latch) Starting()
Starting signals the latch is starting. This is typically done before you kick off a goroutine.
func (*Latch) Stopping ¶
func (l *Latch) Stopping()
Stopping signals the latch to stop. It could also be thought of as `SignalStopping`.
func (*Latch) WaitStarted ¶ added in v1.20201204.1
func (l *Latch) WaitStarted()
WaitStarted triggers `Starting` and waits for the `Started` signal.
func (*Latch) WaitStopped ¶ added in v1.20201204.1
func (l *Latch) WaitStopped()
WaitStopped triggers `Stopping` and waits for the `Stopped` signal.
type NoopActioner ¶ added in v1.20210615.7
type NoopActioner struct{}
NoopActioner is an actioner type that does nothing.
type Queue ¶ added in v1.20201204.1
type Queue struct {
	*Latch
	Action              WorkAction
	Context             context.Context
	Errors              chan error
	Parallelism         int
	MaxWork             int
	ShutdownGracePeriod time.Duration

	// these will typically be set by Start
	AvailableWorkers chan *Worker
	Workers          []*Worker
	Work             chan interface{}
}
Queue is a queue with multiple workers.
func NewQueue ¶
func NewQueue(action WorkAction, options ...QueueOption) *Queue
NewQueue returns a new parallel queue.
func (*Queue) Background ¶ added in v1.20201204.1
Background returns a background context.
func (*Queue) Close ¶ added in v1.20201204.1
Close stops the queue. Any work left in the queue will be discarded.
func (*Queue) Dispatch ¶ added in v1.20201204.1
func (q *Queue) Dispatch()
Dispatch processes work items in a loop.
func (*Queue) Enqueue ¶ added in v1.20201204.1
func (q *Queue) Enqueue(obj interface{})
Enqueue adds an item to the work queue.
func (*Queue) ReturnWorker ¶ added in v1.20201204.1
ReturnWorker returns a given worker to the worker queue.
type QueueOption ¶ added in v1.20201204.1
type QueueOption func(*Queue)
QueueOption is an option for the queue worker.
func OptQueueContext ¶ added in v1.20201204.1
func OptQueueContext(ctx context.Context) QueueOption
OptQueueContext sets the queue worker context.
func OptQueueErrors ¶ added in v1.20201204.1
func OptQueueErrors(errors chan error) QueueOption
OptQueueErrors sets the queue worker error channel.
func OptQueueMaxWork ¶ added in v1.20201204.1
func OptQueueMaxWork(maxWork int) QueueOption
OptQueueMaxWork sets the queue worker max work.
func OptQueueParallelism ¶ added in v1.20201204.1
func OptQueueParallelism(parallelism int) QueueOption
OptQueueParallelism sets the queue worker parallelism.
type SignalGroup ¶ added in v1.20210306.2
type SignalGroup struct {
// contains filtered or unexported fields
}
SignalGroup is a wait group whose Wait returns a signal channel to receive from instead of blocking.
func (*SignalGroup) Add ¶ added in v1.20210306.2
func (sg *SignalGroup) Add(delta int)
Add adds delta.
func (*SignalGroup) Done ¶ added in v1.20210306.2
func (sg *SignalGroup) Done()
Done decrements the counter by one.
func (*SignalGroup) Wait ¶ added in v1.20210306.2
func (sg *SignalGroup) Wait() <-chan struct{}
Wait returns a channel you can select from.
type WorkAction ¶ added in v1.20201204.1
WorkAction is an action handler for a queue.
type Worker ¶ added in v1.20201204.1
type Worker struct {
	*Latch
	Context           context.Context
	Action            WorkAction
	Finalizer         WorkerFinalizer
	SkipRecoverPanics bool
	Errors            chan error
	Work              chan interface{}
}
Worker is a worker that is pushed work over a channel. It is used by other work distribution types (i.e. queue and batch) but can also be used independently.
func NewWorker ¶ added in v1.20201204.1
func NewWorker(action WorkAction) *Worker
NewWorker creates a new worker.
func (*Worker) Background ¶ added in v1.20201204.1
Background returns the queue worker background context.
func (*Worker) Dispatch ¶ added in v1.20201204.1
func (w *Worker) Dispatch()
Dispatch starts the listen loop for work.
func (*Worker) Enqueue ¶ added in v1.20201204.1
func (w *Worker) Enqueue(obj interface{})
Enqueue adds an item to the work queue.
func (*Worker) HandleError ¶ added in v1.20201204.1
HandleError sends a non-nil err to the error collector if one is provided.
func (*Worker) NotifyStarted ¶ added in v1.20201204.1
func (w *Worker) NotifyStarted() <-chan struct{}
NotifyStarted returns the underlying latch signal.
func (*Worker) NotifyStopped ¶ added in v1.20201204.1
func (w *Worker) NotifyStopped() <-chan struct{}
NotifyStopped returns the underlying latch signal.
func (*Worker) Stop ¶ added in v1.20201204.1
Stop stops the worker.
If there is an item left in the work channel it will be processed synchronously.
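The stop behavior described here, where a leftover item is handled synchronously by the caller of Stop, might look roughly like the following. The `worker` type, its fields, and its methods are hypothetical and use only the standard library; the real Worker coordinates through its embedded Latch instead.

```go
package main

import "fmt"

// worker is a hypothetical stand-in for Worker.
type worker struct {
	work     chan interface{}
	stopping chan struct{}
	stopped  chan struct{}
	action   func(interface{})
}

func newWorker(action func(interface{})) *worker {
	return &worker{
		work:     make(chan interface{}, 1),
		stopping: make(chan struct{}),
		stopped:  make(chan struct{}),
		action:   action,
	}
}

// dispatch is the listen loop: handle items until a stop is requested.
func (w *worker) dispatch() {
	defer close(w.stopped)
	for {
		select {
		case item := <-w.work:
			w.action(item)
		case <-w.stopping:
			return
		}
	}
}

// stop ends the loop, waits for it to exit, then processes one leftover item
// (if any) on the calling goroutine so that it is not lost.
func (w *worker) stop() {
	close(w.stopping)
	<-w.stopped
	select {
	case item := <-w.work:
		w.action(item)
	default: // nothing left in the channel
	}
}

func main() {
	var handled []interface{}
	w := newWorker(func(item interface{}) { handled = append(handled, item) })
	go w.dispatch()
	w.work <- "job"
	w.stop()
	fmt.Println(len(handled)) // prints 1
}
```

Whether the loop or `stop` ends up handling the item depends on scheduling, but in this sketch the item is processed exactly once either way.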
func (*Worker) StopContext ¶ added in v1.20210215.2
StopContext stops the worker in a given cancellation context.