task

package
v0.0.0-...-7a622f3
Warning

This package is not in the latest version of its module.

Published: Dec 22, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package task contains types that can be used to create cancelable tasks.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Async

func Async(ctx context.Context, t Task) (stop func() error)

Async runs the task t on a new goroutine, returning a function that cancels the task's context and blocks until the task completes.
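The behaviour described for Async can be sketched as follows. This is a minimal illustration, not the package's source: asyncSketch and runAsyncDemo are hypothetical names, and the Task type is redeclared locally so the example is self-contained.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task mirrors the Task type documented on this page.
type Task func(context.Context) error

// asyncSketch is a hypothetical re-implementation of the documented
// behaviour: run t on a new goroutine and return a stop function.
func asyncSketch(ctx context.Context, t Task) (stop func() error) {
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan error, 1)
	go func() { done <- t(ctx) }()
	return func() error {
		cancel()      // ask the task to stop
		return <-done // block until the task has returned
	}
}

// runAsyncDemo starts a task that waits for cancellation and reports
// whether stop returned the cancellation error.
func runAsyncDemo() bool {
	stop := asyncSketch(context.Background(), func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
	return errors.Is(stop(), context.Canceled)
}

func main() {
	fmt.Println("stopped with context.Canceled:", runAsyncDemo())
}
```

In this sketch the stop function should be called once; a second call would block because the result has already been consumed.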

func IgnoreCancellation

func IgnoreCancellation(ctx context.Context) context.Context

IgnoreCancellation returns a context that will pretend to never be canceled.
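One way such a context could be built is by wrapping the parent and overriding the cancellation methods. The sketch below assumes that approach; noCancel, key, and runIgnoreDemo are hypothetical names and the package may implement this differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// noCancel is a hypothetical wrapper: values still come from the parent,
// but Done, Err and Deadline report "never canceled".
type noCancel struct{ context.Context }

func (noCancel) Done() <-chan struct{}       { return nil }
func (noCancel) Err() error                  { return nil }
func (noCancel) Deadline() (time.Time, bool) { return time.Time{}, false }

// key is an illustrative context key type.
type key string

// runIgnoreDemo cancels a parent context and returns the parent's error,
// the wrapped child's error, and a value read through the child.
func runIgnoreDemo() (parentErr, childErr error, value interface{}) {
	parent, cancel := context.WithCancel(
		context.WithValue(context.Background(), key("user"), "alice"))
	child := noCancel{parent}
	cancel()
	return parent.Err(), child.Err(), child.Value(key("user"))
}

func main() {
	p, c, v := runIgnoreDemo()
	fmt.Println(p, c, v)
}
```

The standard library's context.WithoutCancel (Go 1.21 and later) provides comparable behaviour.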

func NewSignal

func NewSignal() (Signal, Task)

NewSignal builds a new signal, and then returns the signal and a Task that is used to fire the signal. The returned fire Task must only be called once.
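A minimal sketch of the signal and fire pair, assuming the signal is a channel that is closed when fired (as the Signal type on this page suggests). The names newSignalSketch, fired, and runSignalDemo are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Signal and Task mirror the types documented on this page.
type Signal <-chan struct{}
type Task func(context.Context) error

// newSignalSketch returns a signal and a task that fires it.
func newSignalSketch() (Signal, Task) {
	c := make(chan struct{})
	fire := func(context.Context) error {
		close(c) // closing a channel twice panics, hence "only once"
		return nil
	}
	return c, fire
}

// fired reports whether the signal has been closed, without blocking.
func fired(s Signal) bool {
	select {
	case <-s:
		return true
	default:
		return false
	}
}

// runSignalDemo checks the signal state before and after firing.
func runSignalDemo() (before, after bool) {
	sig, fire := newSignalSketch()
	before = fired(sig)
	_ = fire(context.Background())
	after = fired(sig)
	return before, after
}

func main() {
	before, after := runSignalDemo()
	fmt.Println("before:", before, "after:", after)
}
```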

func Poll

func Poll(ctx context.Context, i time.Duration, f func(context.Context) error) error

Poll blocks, calling f at regular intervals of i until the context is cancelled or f returns an error.
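The polling loop can be sketched with a time.Ticker. Two details are assumptions not stated above: that the first call to f happens after one interval, and that ctx.Err() is returned on cancellation. pollSketch, errDone, and runPollDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollSketch calls f every i until ctx is cancelled or f returns an error.
func pollSketch(ctx context.Context, i time.Duration, f func(context.Context) error) error {
	ticker := time.NewTicker(i)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // assumption: cancellation is reported this way
		case <-ticker.C:
			if err := f(ctx); err != nil {
				return err
			}
		}
	}
}

var errDone = errors.New("done polling")

// runPollDemo polls until the callback fails on its third call.
func runPollDemo() (calls int, err error) {
	err = pollSketch(context.Background(), time.Millisecond, func(context.Context) error {
		calls++
		if calls == 3 {
			return errDone
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := runPollDemo()
	fmt.Println(calls, err)
}
```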

func Pool

func Pool(queue int, parallel int) (Executor, Task)

Pool returns a new Executor that uses a pool of goroutines to run the tasks, and a Task that shuts down the pool. The number of goroutines in the pool is controlled by parallel, and it must be greater than 0. The length of the submission queue is controlled by queue. It may be 0, in which case the executor will block until a goroutine is ready to accept the task. It will also block if the queue fills up. The shutdown task may only be called once, and it is an error to call the executor again after the shutdown task has run.
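The queue and parallel parameters map naturally onto a buffered channel and a fixed set of worker goroutines. The sketch below assumes that design and simplifies the return types: it hands back an error channel instead of a Handle. poolSketch, submit, and runPoolDemo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Task mirrors the Task type documented on this page.
type Task func(context.Context) error

// submit is a simplified stand-in for Executor.
type submit func(ctx context.Context, t Task) <-chan error

// poolSketch starts parallel workers fed by a queue of the given length.
func poolSketch(queue, parallel int) (submit, func()) {
	type job struct {
		ctx  context.Context
		task Task
		done chan error
	}
	jobs := make(chan job, queue)
	var wg sync.WaitGroup
	for n := 0; n < parallel; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				j.done <- j.task(j.ctx)
			}
		}()
	}
	run := func(ctx context.Context, t Task) <-chan error {
		done := make(chan error, 1)
		jobs <- job{ctx, t, done} // blocks while the queue is full
		return done
	}
	shutdown := func() {
		close(jobs) // submitting after this point panics
		wg.Wait()
	}
	return run, shutdown
}

// runPoolDemo submits five tasks and returns how many ran.
func runPoolDemo() int64 {
	run, shutdown := poolSketch(2, 3)
	var count int64
	var results []<-chan error
	for n := 0; n < 5; n++ {
		results = append(results, run(context.Background(), func(context.Context) error {
			atomic.AddInt64(&count, 1)
			return nil
		}))
	}
	for _, r := range results {
		<-r // wait for each task to finish
	}
	shutdown()
	return atomic.LoadInt64(&count)
}

func main() {
	fmt.Println("tasks run:", runPoolDemo())
}
```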

func Prepare

func Prepare(ctx context.Context, task Task) (Handle, Runner)

Prepare is used to build a new Handle, Runner pair for a Task. The Handle's Signal will be closed when the task completes. The same task can be passed to Prepare multiple times, and a new Handle, Runner pair will be built each time, but each returned runner should be executed exactly once, which will run the Task. In general this function is only used by Executor implementations when scheduling new tasks.

func Retry

func Retry(ctx context.Context, maxAttempts int, retryDelay time.Duration, f func(context.Context) (done bool, err error)) error

Retry repeatedly calls f until f returns true, the number of attempts reaches maxAttempts, or the context is cancelled. Retry will sleep for retryDelay between retry attempts. If maxAttempts <= 0, then there is no maximum limit to the number of times f will be called.
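The retry loop can be sketched as below. The return values are assumptions: this version returns the last error from f when attempts run out and ctx.Err() on cancellation, which the text above does not specify. retrySketch and runRetryDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retrySketch calls f until it reports done, attempts run out, or ctx ends.
func retrySketch(ctx context.Context, maxAttempts int, retryDelay time.Duration,
	f func(context.Context) (done bool, err error)) error {
	for attempt := 1; ; attempt++ {
		done, err := f(ctx)
		if done || (maxAttempts > 0 && attempt >= maxAttempts) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryDelay): // sleep between attempts
		}
	}
}

// runRetryDemo uses a callback that succeeds on its third call.
func runRetryDemo() (calls int, err error) {
	err = retrySketch(context.Background(), 5, time.Millisecond,
		func(context.Context) (bool, error) {
			calls++
			if calls < 3 {
				return false, errors.New("not ready")
			}
			return true, nil
		})
	return calls, err
}

func main() {
	calls, err := runRetryDemo()
	fmt.Println(calls, err)
}
```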

func ShouldStop

func ShouldStop(ctx context.Context) <-chan struct{}

ShouldStop returns a chan that's closed when work done on behalf of this context should be stopped. See context.Context.Done for more details.

func StopReason

func StopReason(ctx context.Context) error

StopReason returns a non-nil error value after Done is closed. See context.Context.Err for more details.

func Stopped

func Stopped(ctx context.Context) bool

Stopped is shorthand for StopReason(ctx) != nil, provided because it makes common use cases more readable.
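Given the references to context.Context.Done and context.Context.Err, these three helpers can be read as thin wrappers. The sketch below assumes exactly that and shows a typical worker loop; shouldStop, stopReason, stopped, and runStopDemo are hypothetical lowercase stand-ins.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for ShouldStop, StopReason and Stopped.
func shouldStop(ctx context.Context) <-chan struct{} { return ctx.Done() }
func stopReason(ctx context.Context) error           { return ctx.Err() }
func stopped(ctx context.Context) bool               { return stopReason(ctx) != nil }

// runStopDemo loops until the context is cancelled on the third iteration.
func runStopDemo() (iterations int, reason error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	for !stopped(ctx) {
		iterations++
		if iterations == 3 {
			cancel()
		}
	}
	<-shouldStop(ctx) // already closed, so this does not block
	return iterations, stopReason(ctx)
}

func main() {
	n, reason := runStopDemo()
	fmt.Println(n, reason)
}
```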

Types

type Baton

type Baton chan interface{}

Baton implements a task interlock. A baton must be owned by exactly 1 task at any given time, so a release blocks until a corresponding acquire occurs. You can pass a value through the baton from the release to the acquire.

func NewBaton

func NewBaton() Baton

NewBaton returns a new Baton, with the expectation that the calling goroutine owns the baton.

func (Baton) Acquire

func (b Baton) Acquire() interface{}

Acquire is a request to pick up the baton; it will block until another goroutine releases it. It returns the value passed to the release that triggers it.

func (Baton) Relay

func (b Baton) Relay()

Relay is a helper that does an Acquire followed by a Release with the value that came from the Acquire. It waits for the baton to be available and then immediately passes it back, so it can be used as a signalling gate.

func (Baton) Release

func (b Baton) Release(value interface{})

Release is a request to relinquish the baton; it will block until another goroutine acquires it. The supplied value is returned from the Acquire this release triggers.

func (Baton) TryAcquire

func (b Baton) TryAcquire(timeout time.Duration) (interface{}, bool)

TryAcquire is a request to pick up the baton; it will block until another goroutine releases it or the timeout passes. It will return the released value and true if the baton was successfully acquired.

func (Baton) TryRelease

func (b Baton) TryRelease(value interface{}, timeout time.Duration) bool

TryRelease is a request to relinquish the baton; it will block until another goroutine acquires it or the timeout passes. It will return true if the baton was successfully released.

func (Baton) Yield

func (b Baton) Yield(value interface{}) interface{}

Yield is a helper that does a Release followed by an Acquire. It waits for another goroutine to acquire the baton, and then waits for the baton to be released back to this goroutine.
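The Baton methods read like operations on an unbuffered channel, where a send blocks until a matching receive. The sketch below assumes that implementation and redeclares a local Baton with three of the methods; runBatonDemo is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// Baton is a local sketch of the documented type.
type Baton chan interface{}

// Release hands the baton over and blocks until someone acquires it.
func (b Baton) Release(v interface{}) { b <- v }

// Acquire blocks until another goroutine releases the baton.
func (b Baton) Acquire() interface{} { return <-b }

// TryAcquire is like Acquire but gives up after timeout.
func (b Baton) TryAcquire(timeout time.Duration) (interface{}, bool) {
	select {
	case v := <-b:
		return v, true
	case <-time.After(timeout):
		return nil, false
	}
}

// runBatonDemo passes a value to a second goroutine, then shows a
// TryAcquire timing out because nobody releases.
func runBatonDemo() (got interface{}, timedOut bool) {
	b := make(Baton) // unbuffered: release and acquire rendezvous
	result := make(chan interface{})
	go func() { result <- b.Acquire() }()
	b.Release("hello") // blocks until the goroutine acquires
	got = <-result
	_, ok := b.TryAcquire(5 * time.Millisecond)
	return got, !ok
}

func main() {
	got, timedOut := runBatonDemo()
	fmt.Println(got, timedOut)
}
```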

type CancelFunc

type CancelFunc context.CancelFunc

CancelFunc is a function type that can be used to stop a context.

func WithCancel

func WithCancel(ctx context.Context) (context.Context, CancelFunc)

WithCancel returns a copy of ctx with a new Done channel. See context.WithCancel for more details.

func WithDeadline

func WithDeadline(ctx context.Context, deadline time.Time) (context.Context, CancelFunc)

WithDeadline returns a copy of ctx with the deadline adjusted to be no later than deadline. See context.WithDeadline for more details.

func WithTimeout

func WithTimeout(ctx context.Context, duration time.Duration) (context.Context, CancelFunc)

WithTimeout is shorthand for WithDeadline(ctx, time.Now().Add(duration)). See context.WithTimeout for more details.

type Event

type Event interface {
	// Fired returns true if Wait would not block.
	Fired() bool
	// Wait blocks until the signal has been fired or the context has been
	// cancelled.
	// Returns true if the signal was fired, false if the context was cancelled.
	Wait(ctx context.Context) bool
	// TryWait waits for the signal to fire, the context to be cancelled or the
	// timeout, whichever comes first.
	TryWait(ctx context.Context, timeout time.Duration) bool
}

Event is the interface to things that can be used to wait for conditions to occur.

type Events

type Events struct {
	// contains filtered or unexported fields
}

Events is a thread-safe list of Event entries. It can be used to collect a list of events so you can Wait for them all to complete. The list of events may be purged of already completed entries at any time; this is an optimization that does not change the semantics of the API.

func (*Events) Add

func (e *Events) Add(events ...Event)

Add appends new events to the set of events to wait on.

func (*Events) Join

func (e *Events) Join(ctx context.Context) Signal

Join the current list of events into a signal you can wait on. Subsequent calls to Add will not affect the returned signal.

func (*Events) Pending

func (e *Events) Pending() int

Pending returns the count of still pending events.

func (*Events) TryWait

func (e *Events) TryWait(ctx context.Context, timeout time.Duration) bool

TryWait waits for either the timeout or all the events to fire, whichever comes first. This is a helper for e.Join(ctx).TryWait(ctx, timeout).

func (*Events) Wait

func (e *Events) Wait(ctx context.Context) bool

Wait blocks until all the events in the list have been fired. This is a helper for e.Join(ctx).Wait(ctx).
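Joining several events into one signal can be sketched as a goroutine that waits on each in turn and then closes a combined channel. This is an illustration only: it works on plain Signal values rather than the Events type, and it assumes the joined signal never fires if the context is cancelled first. joinSketch and runJoinDemo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Signal mirrors the type documented on this page.
type Signal <-chan struct{}

// joinSketch returns a signal that fires once every input has fired.
func joinSketch(ctx context.Context, signals ...Signal) Signal {
	joined := make(chan struct{})
	go func() {
		for _, s := range signals {
			select {
			case <-s:
			case <-ctx.Done():
				return // assumption: give up without firing
			}
		}
		close(joined)
	}()
	return joined
}

// runJoinDemo fires two signals one after the other and records whether
// the joined signal fired after the first and after the second.
func runJoinDemo() (early, late bool) {
	a, b := make(chan struct{}), make(chan struct{})
	joined := joinSketch(context.Background(), a, b)
	close(a)
	select {
	case <-joined:
		early = true
	case <-time.After(10 * time.Millisecond):
	}
	close(b)
	select {
	case <-joined:
		late = true
	case <-time.After(time.Second):
	}
	return early, late
}

func main() {
	early, late := runJoinDemo()
	fmt.Println("after first:", early, "after both:", late)
}
```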

type Executor

type Executor func(ctx context.Context, task Task) Handle

Executor is the signature for a function that executes a Task. When the task is invoked depends on the specific Executor. The returned handle can be used to wait for the task to complete and collect its error return value.

func Batch

func Batch(executor Executor, signals *Events) Executor

Batch returns an executor that uses the supplied executor to run tasks, and automatically adds the completion signals for those tasks to the supplied Events list.

type ExecutorFactory

type ExecutorFactory func(ctx context.Context, id interface{}) (Executor, Task)

ExecutorFactory returns an executor for the specified channel id, and a Task to shut the executor down again. The Task may be nil for executors that don't need to be shutdown.

func CachedExecutorFactory

func CachedExecutorFactory(ctx context.Context, factory ExecutorFactory) ExecutorFactory

CachedExecutorFactory builds an ExecutorFactory that makes new executors on demand using the underlying factory. It guarantees that all concurrent tasks delivered to the same id go to the same Executor, but it will drop executors that have no active tasks any more.

func PoolExecutorFactory

func PoolExecutorFactory(ctx context.Context, queue int, parallel int) ExecutorFactory

PoolExecutorFactory builds an ExecutorFactory that makes a new Pool executor each time it is invoked. The returned ExecutorFactory ignores the channel id.

type Handle

type Handle struct {
	Signal
	// contains filtered or unexported fields
}

Handle is a reference to a running task submitted to an executor. It can be used to check if the task has completed and get its error result if it has one.

func Direct

func Direct(ctx context.Context, task Task) Handle

Direct is a synchronous implementation of an Executor that runs the task before returning. In general it is easier to just invoke the task directly, so this is only used in cases where you want to hand the Executor to something that is agnostic about how tasks are scheduled.

func Go

func Go(ctx context.Context, task Task) Handle

Go is an asynchronous implementation of an Executor that starts a new goroutine to run the task.

func (Handle) Result

func (h Handle) Result(ctx context.Context) error

Result returns the error result of the task. It will block until the task has completed or the context is cancelled.
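The relationship between an asynchronous executor and its handle can be sketched as follows. The field layout of the real Handle is unexported, so handleSketch is a hypothetical stand-in, as are goSketch and runResultDemo.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task mirrors the Task type documented on this page.
type Task func(context.Context) error

// handleSketch pairs a completion channel with the task's error.
type handleSketch struct {
	done <-chan struct{}
	err  *error
}

// goSketch runs the task on a new goroutine and returns a handle to it.
func goSketch(ctx context.Context, t Task) handleSketch {
	done := make(chan struct{})
	err := new(error)
	go func() {
		defer close(done) // runs after the error has been stored
		*err = t(ctx)
	}()
	return handleSketch{done, err}
}

// Result blocks until the task completes or ctx is cancelled.
func (h handleSketch) Result(ctx context.Context) error {
	select {
	case <-h.done:
		return *h.err // safe: written before done was closed
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runResultDemo runs a failing task and returns the collected error.
func runResultDemo() error {
	ctx := context.Background()
	h := goSketch(ctx, func(context.Context) error {
		return errors.New("boom")
	})
	return h.Result(ctx)
}

func main() {
	fmt.Println("result:", runResultDemo())
}
```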

type Runner

type Runner func()

Runner is the type for a task that has been prepared to run by an executor. Invoking the runner will execute the underlying task, and trigger the signal when it completes.

type Signal

type Signal <-chan struct{}

Signal is used to notify that a task has completed. Nothing is ever sent through a signal; it is closed to indicate that it has fired.

var FiredSignal Signal

FiredSignal is a signal that is always in the fired state.

func (Signal) Fired

func (s Signal) Fired() bool

Fired returns true if the signal has been fired.

func (Signal) TryWait

func (s Signal) TryWait(ctx context.Context, timeout time.Duration) bool

TryWait waits for the signal to fire, the context to be cancelled or the timeout, whichever comes first. Returns true if the signal was fired, false if the context was cancelled or the timeout was reached.

func (Signal) Wait

func (s Signal) Wait(ctx context.Context) bool

Wait blocks until the signal has been fired or the context has been cancelled. Returns true if the signal was fired, false if the context was cancelled.
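Because a Signal is a receive-only channel, Wait and TryWait can be sketched as select statements. The version below redeclares Signal locally and is an illustration rather than the package's code; runWaitDemo is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Signal is a local sketch of the documented type.
type Signal <-chan struct{}

// Wait returns true if the signal fired, false if ctx was cancelled.
func (s Signal) Wait(ctx context.Context) bool {
	select {
	case <-s:
		return true
	case <-ctx.Done():
		return false
	}
}

// TryWait is like Wait but also gives up after timeout.
func (s Signal) TryWait(ctx context.Context, timeout time.Duration) bool {
	select {
	case <-s:
		return true
	case <-ctx.Done():
		return false
	case <-time.After(timeout):
		return false
	}
}

// runWaitDemo exercises the three outcomes: fired, timed out, cancelled.
func runWaitDemo() (fired, timedOut, cancelled bool) {
	closed := make(chan struct{})
	close(closed)
	open := make(chan struct{})
	bg := context.Background()

	fired = Signal(closed).Wait(bg)
	timedOut = !Signal(open).TryWait(bg, 5*time.Millisecond)

	ctx, cancel := context.WithCancel(bg)
	cancel()
	cancelled = !Signal(open).Wait(ctx)
	return fired, timedOut, cancelled
}

func main() {
	fmt.Println(runWaitDemo())
}
```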

type Task

type Task func(context.Context) error

Task is the unit of work used in the task system. Tasks should generally be reentrant: they may be run more than once, in more than one executor, and should generally be agnostic as to whether they are run in parallel.

func Delay

func Delay(task Task, duration time.Duration) Task

Delay wraps a task in a coroutine to asynchronously execute after a specified duration.

func Noop

func Noop() Task

func Once

func Once(task Task) Task

Once wraps a task so that only the first invocation of the outer task invokes the inner task.
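A wrapper with this behaviour can be sketched with sync.Once. The text above does not say what later invocations return, so this version assumes they return nil. onceSketch and runOnceDemo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Task mirrors the Task type documented on this page.
type Task func(context.Context) error

// onceSketch wraps task so only the first call of the wrapper runs it.
func onceSketch(task Task) Task {
	var once sync.Once
	return func(ctx context.Context) error {
		var err error
		once.Do(func() { err = task(ctx) })
		return err // assumption: nil on every call after the first
	}
}

// runOnceDemo calls the wrapped task three times and counts inner runs.
func runOnceDemo() int {
	runs := 0
	wrapped := onceSketch(func(context.Context) error {
		runs++
		return nil
	})
	for n := 0; n < 3; n++ {
		_ = wrapped(context.Background())
	}
	return runs
}

func main() {
	fmt.Println("inner task ran", runOnceDemo(), "time(s)")
}
```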
