tasks

package
v0.11.0
Warning

This package is not in the latest version of its module.

Published: Oct 10, 2023 License: BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Overview

Package tasks provides functionality for services to run a fixed number of workers to conduct generic asynchronous tasks. This is an intentionally simple package to make sure that routine, non-critical work happens in a non-blocking fashion.

Constants

This section is empty.

Variables

var (
	ErrTaskManagerStopped = errors.New("the task manager is not running")
)
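
A sentinel error like this is normally matched with errors.Is rather than by comparing strings. The sketch below is self-contained: it redeclares the variable locally with the documented message, and the queue function is a hypothetical stand-in. The documentation does not say which calls return ErrTaskManagerStopped, so treating it as the result of queueing on a stopped manager is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented sentinel so the example compiles on its own.
var ErrTaskManagerStopped = errors.New("the task manager is not running")

// queue is a hypothetical stand-in for a call that fails once the manager
// has been stopped. It wraps the sentinel to show that errors.Is still matches.
func queue(stopped bool) error {
	if stopped {
		return fmt.Errorf("could not queue task: %w", ErrTaskManagerStopped)
	}
	return nil
}

// shouldDrop reports whether err means the manager is no longer running,
// in which case retrying the same call is pointless.
func shouldDrop(err error) bool {
	return errors.Is(err, ErrTaskManagerStopped)
}

func main() {
	fmt.Println(shouldDrop(queue(true)), shouldDrop(queue(false)))
}
```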

Functions

func TaskScheduler added in v0.5.1

func TaskScheduler(wg *sync.WaitGroup, queue <-chan *TaskHandler, tasks chan<- *TaskHandler, stop <-chan struct{}, interval time.Duration)

TaskScheduler runs as a separate goroutine, listening for tasks on the retry channel and queueing them for a worker once their backoff period has expired.

func TaskWorker

func TaskWorker(wg *sync.WaitGroup, tasks <-chan *TaskHandler)

Types

type Error added in v0.5.1

type Error struct {
	// contains filtered or unexported fields
}

func NewError added in v0.5.1

func NewError(err error) *Error

func (*Error) Append added in v0.5.1

func (e *Error) Append(err error)

func (*Error) Capture added in v0.5.1

func (e *Error) Capture(hub *sentry.Hub)

func (*Error) Error added in v0.5.1

func (e *Error) Error() string

func (*Error) Is added in v0.5.1

func (e *Error) Is(target error) bool

func (*Error) Log added in v0.5.1

func (e *Error) Log(log zerolog.Logger) *zerolog.Event

func (*Error) Since added in v0.5.1

func (e *Error) Since(started time.Time)

func (*Error) Unwrap added in v0.5.1

func (e *Error) Unwrap() error

type Option added in v0.5.1

type Option func(*options)

Option allows retries and backoff to be configured for individual tasks.

func WithBackoff added in v0.5.1

func WithBackoff(backoff backoff.BackOff) Option

WithBackoff sets the backoff strategy to use when retrying; the default is an exponential backoff.

func WithError added in v0.5.1

func WithError(err error) Option

WithError logs a specific error if all retries fail under the provided context. This error is bundled with the errors that caused the retries to fail and reported in a single error log message.

func WithRetries added in v0.5.1

func WithRetries(retries int) Option

WithRetries sets the number of retries to attempt before giving up; the default is 0.

type Task

type Task interface {
	Do(context.Context) error
}

Workers in the task manager handle Tasks, which can hold state and any other information the task needs. To run a plain function instead, wrap it in a TaskFunc and pass the result to the task manager.

type TaskFunc

type TaskFunc func(context.Context) error

TaskFunc is an adapter to allow ordinary functions to be used as tasks.

func (TaskFunc) Do

func (t TaskFunc) Do(ctx context.Context) error

type TaskHandler

type TaskHandler struct {
	// contains filtered or unexported fields
}

func (*TaskHandler) Exec added in v0.5.1

func (h *TaskHandler) Exec()

Exec executes the wrapped task with its context. If the task fails, it is scheduled to be retried using the backoff specified in the options.

type TaskManager

type TaskManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TaskManagers execute Tasks using a fixed number of workers that operate in their own goroutines. The TaskManager also has a fixed task queue size, so if more tasks are added than the queue can hold, back pressure is applied: callers block until a worker frees up space.
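
A fixed worker count with a bounded queue is commonly built from a buffered channel and a sync.WaitGroup. The sketch below shows that general technique, not the TaskManager internals. The pool type and its methods are hypothetical, and it omits retries, contexts, and locking.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical, stripped-down worker pool.
type pool struct {
	wg    sync.WaitGroup
	queue chan func()
}

// newPool starts the given number of workers, each draining the same
// buffered channel until it is closed.
func newPool(workers, queueSize int) *pool {
	p := &pool{queue: make(chan func(), queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.queue {
				task()
			}
		}()
	}
	return p
}

// submit blocks while the buffer is full, which is the back pressure.
func (p *pool) submit(task func()) { p.queue <- task }

// stop closes the queue and waits for workers to finish queued tasks.
func (p *pool) stop() {
	close(p.queue)
	p.wg.Wait()
}

// runCount submits n counting tasks and returns how many ran.
func runCount(workers, queueSize, n int) int64 {
	var done int64
	p := newPool(workers, queueSize)
	for i := 0; i < n; i++ {
		p.submit(func() { atomic.AddInt64(&done, 1) })
	}
	p.stop()
	return atomic.LoadInt64(&done)
}

func main() {
	// 100 tasks flow through a queue of size 2 without being dropped.
	fmt.Println(runCount(4, 2, 100))
}
```

Blocking the producer is the simplest form of back pressure; it trades caller latency for a hard bound on memory held by queued tasks.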

func New

func New(workers, queueSize int, retryInterval time.Duration) *TaskManager

New returns a TaskManager that runs the specified number of workers in their own goroutines and creates a queue of the specified size. The task manager is ready to accept tasks as soon as New returns.

func (*TaskManager) IsStopped

func (tm *TaskManager) IsStopped() bool

Check if the task manager has been stopped (blocks until fully stopped).

func (*TaskManager) Queue

func (tm *TaskManager) Queue(task Task, opts ...Option) error

Queue a task with a background context. Blocks if the queue is full.

func (*TaskManager) QueueContext

func (tm *TaskManager) QueueContext(ctx context.Context, task Task, opts ...Option) error

Queue a task with the specified context. Blocks if the queue is full.

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop the task manager, waiting for all workers to finish their tasks before returning.
