async

package
v0.0.0-...-bc49051 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LatchStopped  int32 = 0
	LatchStarting int32 = 1
	LatchResuming int32 = 2
	LatchStarted  int32 = 3
	LatchActive   int32 = 4
	LatchPausing  int32 = 5
	LatchPaused   int32 = 6
	LatchStopping int32 = 7
)

Latch states

They are int32 to support atomic Load/Store calls.

View Source
const (
	DefaultQueueMaxWork             = 1 << 10
	DefaultQueueShutdownGracePeriod = 10 * time.Second
)

Queue Constants

View Source
const (
	DefaultInterval = 500 * time.Millisecond
)

Interval defaults

Variables

View Source
var (
	ErrCannotStart               = errors.New("cannot start; already started")
	ErrCannotStop                = errors.New("cannot stop; already stopped")
	ErrCannotCancel              = errors.New("cannot cancel; already canceled")
	ErrCannotStartActionRequired = errors.New("cannot start; action is required")
)

Errors

Functions

This section is empty.

Types

type Actioner

type Actioner[T, V any] interface {
	Action(context.Context, T) (V, error)
}

Actioner is a type that can be used as a tracked action.

type ActionerFunc

type ActionerFunc[T, V any] func(context.Context, T) (V, error)

ActionerFunc is a function that implements action.

func (ActionerFunc[T, V]) Action

func (af ActionerFunc[T, V]) Action(ctx context.Context, args T) (V, error)

Action implements actioner for the function.

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch is a collection of goroutines working on subtasks that are part of the same overall task.

func BatchContext

func BatchContext(ctx context.Context) (*Batch, context.Context)

BatchContext returns a new Batch and an associated Context derived from ctx.

The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.

func (*Batch) Go

func (b *Batch) Go(f func() error)

Go calls the given function in a new goroutine. It blocks until the new goroutine can be added without the number of active goroutines in the group exceeding the configured limit.

The first call to return a non-nil error cancels the group; its error will be returned by Wait.

func (*Batch) SetLimit

func (b *Batch) SetLimit(n int)

SetLimit limits the number of active goroutines in this group to at most n. A negative value indicates no limit.

Any subsequent call to the Go method will block until it can add an active goroutine without exceeding the configured limit.

The limit must not be modified while any goroutines in the group are active.

func (*Batch) TryGo

func (b *Batch) TryGo(f func() error) bool

TryGo calls the given function in a new goroutine only if the number of active goroutines in the group is currently below the configured limit.

The return value reports whether the goroutine was started.

func (*Batch) Wait

func (b *Batch) Wait() error

Wait blocks until all function calls from the Go method have returned, then returns the first non-nil error (if any) from them.

type Errors

type Errors chan error

Errors is a channel for errors

func (Errors) All

func (e Errors) All() error

All returns all the non-nil errors in the channel as a multi-error.

func (Errors) First

func (e Errors) First() error

First returns the first (non-nil) error.

type Interceptor

type Interceptor[T, V any] interface {
	Intercept(action Actioner[T, V]) Actioner[T, V]
}

Interceptor returns an actioner for a given actioner.

func Interceptors

func Interceptors[T, V any](interceptors ...Interceptor[T, V]) Interceptor[T, V]

Interceptors chains calls to interceptors as a single interceptor.

type InterceptorFunc

type InterceptorFunc[T, V any] func(Actioner[T, V]) Actioner[T, V]

InterceptorFunc is a function that implements action.

func (InterceptorFunc[T, V]) Intercept

func (fn InterceptorFunc[T, V]) Intercept(action Actioner[T, V]) Actioner[T, V]

Intercept implements Interceptor for the function.

type Interval

type Interval struct {
	Context     context.Context
	Interval    time.Duration
	Action      func(context.Context) error
	Delay       time.Duration
	StopOnError bool
	Errors      chan error
	// contains filtered or unexported fields
}

Interval is a background worker that performs an action on an interval.

func (*Interval) Background

func (i *Interval) Background() context.Context

Background returns the provided context or context.Background()

func (*Interval) Dispatch

func (i *Interval) Dispatch() (err error)

Dispatch is the main dispatch loop.

func (*Interval) IntervalOrDefault

func (i *Interval) IntervalOrDefault() time.Duration

IntervalOrDefault returns the interval or a default.

func (*Interval) Start

func (i *Interval) Start() error

Start starts the worker.

This will start the internal ticker, with a default initial delay of the given interval, and will return an ErrCannotStart if the interval worker is already started.

This call will block.

func (*Interval) Started

func (i *Interval) Started() <-chan struct{}

Started returns the channel to notify when the worker starts.

func (*Interval) Stop

func (i *Interval) Stop() error

Stop stops the worker.

type Latch

type Latch struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Latch is a helper to coordinate goroutine lifecycles, specifically waiting for goroutines to start and end.

The lifecycle is generally as follows:

0 - stopped - goto 1
1 - starting - goto 2
2 - started - goto 3
3 - stopping - goto 0

Control flow is coordinated with chan struct{}, which acts as a semaphore but can only alert (1) listener as it is buffered.

In order to start a `stopped` latch, you must call `.Reset()` first to initialize channels.

func NewLatch

func NewLatch() *Latch

NewLatch creates a new latch.

It is _highly_ recommended to use this constructor.

func (*Latch) CanStart

func (l *Latch) CanStart() bool

CanStart returns if the latch can start.

func (*Latch) CanStop

func (l *Latch) CanStop() bool

CanStop returns if the latch can stop.

func (*Latch) IsStarted

func (l *Latch) IsStarted() bool

IsStarted returns if the latch state is LatchStarted.

func (*Latch) IsStarting

func (l *Latch) IsStarting() bool

IsStarting returns if the latch state is LatchStarting

func (*Latch) IsStopped

func (l *Latch) IsStopped() bool

IsStopped returns if the latch state is LatchStopped.

func (*Latch) IsStopping

func (l *Latch) IsStopping() bool

IsStopping returns if the latch state is LatchStopping.

func (*Latch) NotifyStarted

func (l *Latch) NotifyStarted() (notifyStarted <-chan struct{})

NotifyStarted returns the started signal. It is used to coordinate the transition from starting -> started. There can only be (1) effective listener at a time for these events.

func (*Latch) NotifyStarting

func (l *Latch) NotifyStarting() (notifyStarting <-chan struct{})

NotifyStarting returns the starting signal. It is used to coordinate the transition from stopped -> starting. There can only be (1) effective listener at a time for these events.

func (*Latch) NotifyStopped

func (l *Latch) NotifyStopped() (notifyStopped <-chan struct{})

NotifyStopped returns the stopped signal. It is used to coordinate the transition from stopping -> stopped. There can only be (1) effective listener at a time for these events.

func (*Latch) NotifyStopping

func (l *Latch) NotifyStopping() (notifyStopping <-chan struct{})

NotifyStopping returns the should stop signal. It is used to trigger the transition from running -> stopping -> stopped. There can only be (1) effective listener at a time for these events.

func (*Latch) Reset

func (l *Latch) Reset()

Reset resets the latch.

func (*Latch) Started

func (l *Latch) Started()

Started signals that the latch is started and has entered the `IsStarted` state.

func (*Latch) Starting

func (l *Latch) Starting()

Starting signals the latch is starting. This is typically done before you kick off a goroutine.

func (*Latch) Stopped

func (l *Latch) Stopped()

Stopped signals the latch has stopped.

func (*Latch) Stopping

func (l *Latch) Stopping()

Stopping signals the latch to stop. It could also be thought of as `SignalStopping`.

func (*Latch) WaitStarted

func (l *Latch) WaitStarted()

WaitStarted triggers `Starting` and waits for the `Started` signal.

func (*Latch) WaitStopped

func (l *Latch) WaitStopped()

WaitStopped triggers `Stopping` and waits for the `Stopped` signal.

type LatchThin

type LatchThin struct {
	// contains filtered or unexported fields
}

LatchThin is an implementation of a subset of the latch api that does not support notifying on channels.

As a result, it's much easier to embed and use as a zero value.

func (LatchThin) CanStart

func (lt LatchThin) CanStart() bool

CanStart returns if the latch can start.

func (LatchThin) CanStop

func (lt LatchThin) CanStop() bool

CanStop returns if the latch can stop.

func (LatchThin) IsStarted

func (lt LatchThin) IsStarted() bool

IsStarted returns if the latch state is LatchStarted.

func (LatchThin) IsStarting

func (lt LatchThin) IsStarting() bool

IsStarting returns if the latch state is LatchStarting

func (LatchThin) IsStopped

func (lt LatchThin) IsStopped() bool

IsStopped returns if the latch state is LatchStopped.

func (LatchThin) IsStopping

func (lt LatchThin) IsStopping() bool

IsStopping returns if the latch state is LatchStopping.

func (*LatchThin) Reset

func (lt *LatchThin) Reset()

Reset resets the latch.

func (*LatchThin) Started

func (lt *LatchThin) Started() bool

Started signals that the latch is started and has entered the `IsStarted` state.

func (*LatchThin) Starting

func (lt *LatchThin) Starting() bool

Starting signals the latch is starting.

func (*LatchThin) Stopped

func (lt *LatchThin) Stopped() bool

Stopped signals the latch has stopped.

func (*LatchThin) Stopping

func (lt *LatchThin) Stopping() bool

Stopping signals the latch to stop.

type NoopActioner

type NoopActioner[T, V any] struct{}

NoopActioner is an actioner type that does nothing.

func (NoopActioner[T, V]) Action

func (n NoopActioner[T, V]) Action(_ context.Context, _ T) (out V, err error)

Action implements actioner

type Queue

type Queue[T any] struct {
	Action              WorkAction[T]
	Context             context.Context
	Errors              chan error
	Parallelism         int
	MaxWork             int
	ShutdownGracePeriod time.Duration

	Work chan T
	// contains filtered or unexported fields
}

Queue is a queue with multiple workers.

func (*Queue[T]) Background

func (q *Queue[T]) Background() context.Context

Background returns a background context.

func (*Queue[T]) Close

func (q *Queue[T]) Close() error

Close stops the queue. Any work left in the queue will be discarded.

func (*Queue[T]) Dispatch

func (q *Queue[T]) Dispatch()

Dispatch processes work items in a loop.

func (*Queue[T]) MaxWorkOrDefault

func (q *Queue[T]) MaxWorkOrDefault() int

MaxWorkOrDefault returns the work queue capacity or a default value if it is unset.

func (*Queue[T]) ParallelismOrDefault

func (q *Queue[T]) ParallelismOrDefault() int

ParallelismOrDefault returns the queue worker parallelism or a default value, which is the number of CPUs.

func (*Queue[T]) ShutdownGracePeriodOrDefault

func (q *Queue[T]) ShutdownGracePeriodOrDefault() time.Duration

ShutdownGracePeriodOrDefault returns the work queue shutdown grace period or a default value if it is unset.

func (*Queue[T]) Start

func (q *Queue[T]) Start() error

Start starts the queue and its workers. This call blocks.

func (*Queue[T]) Stop

func (q *Queue[T]) Stop() error

Stop stops the queue and processes any remaining items.

type WorkAction

type WorkAction[T any] func(context.Context, T) error

WorkAction is an action handler for a queue.

type Worker

type Worker[T any] struct {
	*Latch

	Context   context.Context
	Action    WorkAction[T]
	Finalizer WorkerFinalizer[T]

	SkipRecover bool
	Errors      chan error
	Work        chan T
}

Worker is a worker that is pushed work over a channel. It is used by other work distribution types (i.e. queue and batch) but can also be used independently.

func NewWorker

func NewWorker[T any](action WorkAction[T]) *Worker[T]

NewWorker creates a new worker.

func (*Worker[T]) Background

func (w *Worker[T]) Background() context.Context

Background returns the queue worker background context.

func (*Worker[T]) Dispatch

func (w *Worker[T]) Dispatch()

Dispatch starts the listen loop for work.

func (*Worker[T]) Execute

func (w *Worker[T]) Execute(ctx context.Context, workItem T)

Execute invokes the action and recovers panics.

func (*Worker[T]) HandleError

func (w *Worker[T]) HandleError(err error)

HandleError sends a non-nil err to the error collector if one is provided.

func (*Worker[T]) HandlePanic

func (w *Worker[T]) HandlePanic(r interface{})

HandleError sends a non-nil err to the error collector if one is provided.

func (*Worker[T]) NotifyStarted

func (w *Worker[T]) NotifyStarted() <-chan struct{}

NotifyStarted returns the underlying latch signal.

func (*Worker[T]) NotifyStopped

func (w *Worker[T]) NotifyStopped() <-chan struct{}

NotifyStopped returns the underlying latch signal.

func (*Worker[T]) Start

func (w *Worker[T]) Start() error

Start starts the worker with a given context.

func (*Worker[T]) Stop

func (w *Worker[T]) Stop() error

Stop stops the worker.

If there is an item left in the work channel it will be processed synchronously.

func (*Worker[T]) StopContext

func (w *Worker[T]) StopContext(ctx context.Context)

StopContext stops the worker in a given cancellation context.

type WorkerFinalizer

type WorkerFinalizer[T any] func(context.Context, *Worker[T]) error

WorkerFinalizer is an action handler for a queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL