Documentation
¶
Index ¶
- Constants
- Variables
- type Actioner
- type ActionerFunc
- type Batch
- type Errors
- type Interceptor
- type InterceptorFunc
- type Interval
- type Latch
- func (l *Latch) CanStart() bool
- func (l *Latch) CanStop() bool
- func (l *Latch) IsStarted() bool
- func (l *Latch) IsStarting() bool
- func (l *Latch) IsStopped() bool
- func (l *Latch) IsStopping() bool
- func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})
- func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})
- func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})
- func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})
- func (l *Latch) Reset()
- func (l *Latch) Started()
- func (l *Latch) Starting()
- func (l *Latch) Stopped()
- func (l *Latch) Stopping()
- func (l *Latch) WaitStarted()
- func (l *Latch) WaitStopped()
- type LatchThin
- func (lt LatchThin) CanStart() bool
- func (lt LatchThin) CanStop() bool
- func (lt LatchThin) IsStarted() bool
- func (lt LatchThin) IsStarting() bool
- func (lt LatchThin) IsStopped() bool
- func (lt LatchThin) IsStopping() bool
- func (lt *LatchThin) Reset()
- func (lt *LatchThin) Started() bool
- func (lt *LatchThin) Starting() bool
- func (lt *LatchThin) Stopped() bool
- func (lt *LatchThin) Stopping() bool
- type NoopActioner
- type Queue
- func (q *Queue[T]) Background() context.Context
- func (q *Queue[T]) Close() error
- func (q *Queue[T]) Dispatch()
- func (q *Queue[T]) MaxWorkOrDefault() int
- func (q *Queue[T]) ParallelismOrDefault() int
- func (q *Queue[T]) ShutdownGracePeriodOrDefault() time.Duration
- func (q *Queue[T]) Start() error
- func (q *Queue[T]) Stop() error
- type WorkAction
- type Worker
- func (w *Worker[T]) Background() context.Context
- func (w *Worker[T]) Dispatch()
- func (w *Worker[T]) Execute(ctx context.Context, workItem T)
- func (w *Worker[T]) HandleError(err error)
- func (w *Worker[T]) HandlePanic(r interface{})
- func (w *Worker[T]) NotifyStarted() <-chan struct{}
- func (w *Worker[T]) NotifyStopped() <-chan struct{}
- func (w *Worker[T]) Start() error
- func (w *Worker[T]) Stop() error
- func (w *Worker[T]) StopContext(ctx context.Context)
- type WorkerFinalizer
Constants ¶
const ( LatchStopped int32 = 0 LatchStarting int32 = 1 LatchResuming int32 = 2 LatchStarted int32 = 3 LatchActive int32 = 4 LatchPausing int32 = 5 LatchPaused int32 = 6 LatchStopping int32 = 7 )
Latch states
They are int32 to support atomic Load/Store calls.
const ( DefaultQueueMaxWork = 1 << 10 DefaultQueueShutdownGracePeriod = 10 * time.Second )
Queue Constants
const (
DefaultInterval = 500 * time.Millisecond
)
Interval defaults
Variables ¶
var ( ErrCannotStart = errors.New("cannot start; already started") ErrCannotStop = errors.New("cannot stop; already stopped") ErrCannotCancel = errors.New("cannot cancel; already canceled") ErrCannotStartActionRequired = errors.New("cannot start; action is required") )
Errors
Functions ¶
This section is empty.
Types ¶
type ActionerFunc ¶
ActionerFunc is a function that implements action.
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch is a collection of goroutines working on subtasks that are part of the same overall task.
func BatchContext ¶
BatchContext returns a new Batch and an associated Context derived from ctx.
The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.
func (*Batch) Go ¶
Go calls the given function in a new goroutine. It blocks until the new goroutine can be added without the number of active goroutines in the group exceeding the configured limit.
The first call to return a non-nil error cancels the group; its error will be returned by Wait.
func (*Batch) SetLimit ¶
SetLimit limits the number of active goroutines in this group to at most n. A negative value indicates no limit.
Any subsequent call to the Go method will block until it can add an active goroutine without exceeding the configured limit.
The limit must not be modified while any goroutines in the group are active.
type Errors ¶
type Errors chan error
Errors is a channel for errors
type Interceptor ¶
Interceptor returns an actioner for a given actioner.
func Interceptors ¶
func Interceptors[T, V any](interceptors ...Interceptor[T, V]) Interceptor[T, V]
Interceptors chains calls to interceptors as a single interceptor.
type InterceptorFunc ¶
InterceptorFunc is a function that implements action.
func (InterceptorFunc[T, V]) Intercept ¶
func (fn InterceptorFunc[T, V]) Intercept(action Actioner[T, V]) Actioner[T, V]
Intercept implements Interceptor for the function.
type Interval ¶
type Interval struct { Context context.Context Interval time.Duration Action func(context.Context) error Delay time.Duration StopOnError bool Errors chan error // contains filtered or unexported fields }
Interval is a background worker that performs an action on an interval.
func (*Interval) Background ¶
Background returns the provided context or context.Background()
func (*Interval) IntervalOrDefault ¶
IntervalOrDefault returns the interval or a default.
func (*Interval) Start ¶
Start starts the worker.
This will start the internal ticker, with a default initial delay of the given interval, and will return an ErrCannotStart if the interval worker is already started.
This call will block.
type Latch ¶
Latch is a helper to coordinate goroutine lifecycles, specifically waiting for goroutines to start and end.
The lifecycle is generally as follows:
0 - stopped - goto 1 1 - starting - goto 2 2 - started - goto 3 3 - stopping - goto 0
Control flow is coordinated with chan struct{}, which acts as a semaphore but can only alert (1) listener as it is buffered.
In order to start a `stopped` latch, you must call `.Reset()` first to initialize channels.
func NewLatch ¶
func NewLatch() *Latch
NewLatch creates a new latch.
It is _highly_ recommended to use this constructor.
func (*Latch) IsStarting ¶
IsStarting returns if the latch state is LatchStarting
func (*Latch) IsStopping ¶
IsStopping returns if the latch state is LatchStopping.
func (*Latch) NotifyStarted ¶
func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})
NotifyStarted returns the started signal. It is used to coordinate the transition from starting -> started. There can only be (1) effective listener at a time for these events.
func (*Latch) NotifyStarting ¶
func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})
NotifyStarting returns the starting signal. It is used to coordinate the transition from stopped -> starting. There can only be (1) effective listener at a time for these events.
func (*Latch) NotifyStopped ¶
func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})
NotifyStopped returns the stopped signal. It is used to coordinate the transition from stopping -> stopped. There can only be (1) effective listener at a time for these events.
func (*Latch) NotifyStopping ¶
func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})
NotifyStopping returns the should stop signal. It is used to trigger the transition from running -> stopping -> stopped. There can only be (1) effective listener at a time for these events.
func (*Latch) Started ¶
func (l *Latch) Started()
Started signals that the latch is started and has entered the `IsStarted` state.
func (*Latch) Starting ¶
func (l *Latch) Starting()
Starting signals the latch is starting. This is typically done before you kick off a goroutine.
func (*Latch) Stopping ¶
func (l *Latch) Stopping()
Stopping signals the latch to stop. It could also be thought of as `SignalStopping`.
func (*Latch) WaitStarted ¶
func (l *Latch) WaitStarted()
WaitStarted triggers `Starting` and waits for the `Started` signal.
func (*Latch) WaitStopped ¶
func (l *Latch) WaitStopped()
WaitStopped triggers `Stopping` and waits for the `Stopped` signal.
type LatchThin ¶
type LatchThin struct {
// contains filtered or unexported fields
}
LatchThin is an implementation of a subset of the latch api that does not support notifying on channels.
As a result, it's much easier to embed and use as a zero value.
func (LatchThin) IsStarting ¶
IsStarting returns if the latch state is LatchStarting
func (LatchThin) IsStopping ¶
IsStopping returns if the latch state is LatchStopping.
func (*LatchThin) Started ¶
Started signals that the latch is started and has entered the `IsStarted` state.
type NoopActioner ¶
type NoopActioner[T, V any] struct{}
NoopActioner is an actioner type that does nothing.
type Queue ¶
type Queue[T any] struct { Action WorkAction[T] Context context.Context Errors chan error Parallelism int MaxWork int ShutdownGracePeriod time.Duration Work chan T // contains filtered or unexported fields }
Queue is a queue with multiple workers.
func (*Queue[T]) Background ¶
Background returns a background context.
func (*Queue[T]) MaxWorkOrDefault ¶
MaxWorkOrDefault returns the work queue capacity or a default value if it is unset.
func (*Queue[T]) ParallelismOrDefault ¶
ParallelismOrDefault returns the queue worker parallelism or a default value, which is the number of CPUs.
func (*Queue[T]) ShutdownGracePeriodOrDefault ¶
ShutdownGracePeriodOrDefault returns the work queue shutdown grace period or a default value if it is unset.
type WorkAction ¶
WorkAction is an action handler for a queue.
type Worker ¶
type Worker[T any] struct { *Latch Context context.Context Action WorkAction[T] Finalizer WorkerFinalizer[T] SkipRecover bool Errors chan error Work chan T }
Worker is a worker that is pushed work over a channel. It is used by other work distribution types (i.e. queue and batch) but can also be used independently.
func NewWorker ¶
func NewWorker[T any](action WorkAction[T]) *Worker[T]
NewWorker creates a new worker.
func (*Worker[T]) Background ¶
Background returns the queue worker background context.
func (*Worker[T]) Dispatch ¶
func (w *Worker[T]) Dispatch()
Dispatch starts the listen loop for work.
func (*Worker[T]) HandleError ¶
HandleError sends a non-nil err to the error collector if one is provided.
func (*Worker[T]) HandlePanic ¶
func (w *Worker[T]) HandlePanic(r interface{})
HandleError sends a non-nil err to the error collector if one is provided.
func (*Worker[T]) NotifyStarted ¶
func (w *Worker[T]) NotifyStarted() <-chan struct{}
NotifyStarted returns the underlying latch signal.
func (*Worker[T]) NotifyStopped ¶
func (w *Worker[T]) NotifyStopped() <-chan struct{}
NotifyStopped returns the underlying latch signal.
func (*Worker[T]) Stop ¶
Stop stops the worker.
If there is an item left in the work channel it will be processed synchronously.
func (*Worker[T]) StopContext ¶
StopContext stops the worker in a given cancellation context.