radish

package
v0.12.1
Warning

This package is not in the latest version of its module.

Published: Nov 9, 2023 License: BSD-3-Clause Imports: 11 Imported by: 0

Documentation

Overview

Package radish provides functionality for services to run a fixed number of workers to conduct generic asynchronous tasks. This is an intentionally simple package to make sure that routine, non-critical work happens in a non-blocking fashion.

Constants

This section is empty.

Variables

var (
	ErrTaskManagerStopped = errors.New("the task manager is not running")
	ErrUnschedulable      = errors.New("cannot schedule a task with a zero valued timestamp")
	ErrNoWorkers          = errors.New("invalid configuration: at least one worker must be specified")
	ErrNoServerName       = errors.New("invalid configuration: no server name specified")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	Workers    int    `default:"4" desc:"the number of workers to process tasks asynchronously"`
	QueueSize  int    `split_words:"true" default:"64" desc:"the number of async tasks to buffer in the queue before blocking"`
	ServerName string `split_words:"true" default:"radish" desc:"used to describe the radish service in the log"`
}

Configures the radish task manager so that different processes can utilize different asynchronous task processing resources depending on process compute constraints.

func (Config) IsZero

func (c Config) IsZero() bool

func (Config) Validate

func (c Config) Validate() error

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error keeps track of task failures and reports the failure context to Sentry.

func Errorf

func Errorf(format string, a ...any) *Error

func Errorw

func Errorw(err error) *Error

func (*Error) Append

func (e *Error) Append(err error)

Add a task failure (or nil) to the array of task errors and increment attempts.

func (*Error) Capture

func (e *Error) Capture(hub *sentry.Hub)

Capture the task processing event in Sentry with details about the error including the number of attempts, each individual attempt error, and the total duration of processing before failure. This method sets the "radish" context for review in Sentry.

func (*Error) Dict

func (e *Error) Dict() *zerolog.Event

Returns a zerolog log event that can be used with the *Event.Dict method for logging details about the error including the number of attempts, each individual attempt error, and the total duration of processing the task before failure.

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface and gives a high level message about failure.

func (*Error) Is

func (e *Error) Is(target error) bool

Is checks if the error is the user specified target. If the wrapped user error is nil then it checks if the error is one of the task errors, otherwise returns false.

func (*Error) Since

func (e *Error) Since(started time.Time)

Since sets the duration of processing the task to the time since the input timestamp.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying user specified error, even if it is nil.

type Future

type Future struct {
	Time time.Time
	Task Task
}

Future is a task/timestamp tuple that acts as a scheduler entry for running the task as close to the timestamp as possible without running it before the given time.

func (*Future) Validate

func (f *Future) Validate() error

type Futures

type Futures []*Future

Futures implements sort.Interface and ensures that the list of future tasks is maintained in sorted order so that tasks are scheduled correctly. This slice is also memory managed to ensure that it is garbage collected routinely and does not leak memory (e.g. using the Resize function to create a new slice and free the old).

func (Futures) Insert

func (f Futures) Insert(t *Future) Futures

Insert a future into the slice of futures, growing the slice as necessary and returning it to replace the original slice (similar to append). Insert ensures that the slice is maintained in sorted order and should be used instead of append.

func (Futures) Len

func (f Futures) Len() int

Len, Less, and Swap implement sort.Interface.

func (Futures) Less

func (f Futures) Less(i, j int) bool

func (Futures) Resize

func (f Futures) Resize() Futures

Resizes the futures by copying the current futures into a new slice, allowing the garbage collector to clean up the previous slice and free up memory. See: https://forum.golangbridge.org/t/free-memory-of-slice/3713/2
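
The reason a copy helps is that a reslice keeps the whole original backing array reachable. The following sketch illustrates the idea with a hypothetical resize helper and item type; it is not the package's Resize method.

```go
package main

import "fmt"

// item is a hypothetical element type used only for this illustration.
type item struct{ name string }

// resize copies the live elements into a freshly allocated slice so that
// the old, possibly much larger, backing array can be garbage collected.
func resize(s []*item) []*item {
	out := make([]*item, len(s))
	copy(out, s)
	return out
}

func main() {
	backing := make([]*item, 1024)
	live := backing[:3] // short slice that still pins all 1024 slots
	fmt.Println(len(live), cap(live))

	live = resize(live) // new backing array sized to the live elements
	fmt.Println(len(live), cap(live))
}
```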

func (Futures) Swap

func (f Futures) Swap(i, j int)

type Option

type Option func(*TaskHandler)

Options configure the task beyond the input context allowing for retries or backoff delays in task processing when there are failures or other task-specific handling.

func WithBackoff

func WithBackoff(backoff backoff.BackOff) Option

Backoff strategy to use when retrying (default exponential backoff).

func WithContext

func WithContext(ctx context.Context) Option

Specify a base context to be used as the parent context when the task is executed and on all subsequent retries.

NOTE: it is recommended that this context does not contain a deadline, otherwise the deadline may expire before the specified number of retries. Use WithTimeout instead.

func WithError

func WithError(err error) Option

Log a specific error if all retries failed under the provided context. This error will be bundled with the errors that caused the retry failure and reported in a single error log message.

func WithErrorf

func WithErrorf(format string, a ...any) Option

Log a specific error as WithError but using fmt.Errorf semantics to create the err.

func WithRetries

func WithRetries(retries int) Option

Specify the number of times to retry a task when it returns an error (default 0).

func WithTimeout

func WithTimeout(timeout time.Duration) Option

Specify a timeout to add to the context before passing it into the task function.

type Scheduler

type Scheduler struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Scheduler manages a list of future tasks; on or after the time a task is scheduled for, the scheduler sends it on the out channel. This allows radish to schedule tasks for the future or to retry tasks with backoff delays so that the workers are not overwhelmed by long running tasks.

The scheduler is implemented with a sorted list of Futures (basically tasks arranged in time order) along with a single go routine that sleeps until the timestamp of the next task unless interrupted by a newly scheduled task. The goal of the scheduler is to minimize the number of go routines and CPU cycles in use so that higher priority work by the task manager or the main go routine is favored by the CPU. To that end the scheduler does not use a "ticker" clock checking if it should execute every second, preferring longer sleeps and interrupts instead.

func NewScheduler

func NewScheduler(out chan<- Task, logger zerolog.Logger) *Scheduler

Create a new scheduler that can schedule task futures. The out channel is used to dispatch tasks at their scheduled time. If a task is sent on the out channel it means that the task should be executed as soon as possible. The scheduler makes no guarantees about exact timing of tasks scheduled except that the task will not be sent on the out channel before its scheduled time.

func (*Scheduler) Delay

func (s *Scheduler) Delay(delay time.Duration, task Task) error

Delay schedules the task to be run on or after the specified delay duration from now.

func (*Scheduler) IsRunning

func (s *Scheduler) IsRunning() bool

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(at time.Time, task Task) error

Schedule a task to run on or after the specified timestamp. If the scheduler is running, the task future is sent to the main channel loop; otherwise the task is simply inserted into the futures slice. Schedule blocks until the task is received by the main scheduler loop.

func (*Scheduler) Start

func (s *Scheduler) Start(wg *sync.WaitGroup)

Start the scheduler in its own go routine or no-op if already started. If the specified wait group is not nil, it is marked as done when the scheduler is stopped.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop the scheduler if it is running, otherwise a no-op. Note that stopping the scheduler does not close the out channel. When stopped, any futures that are still pending will not be executed, but if the scheduler is started again, they will remain as previously scheduled and sent on the same out channel.

type Task

type Task interface {
	Do(context.Context) error
}

Workers in the task manager handle Tasks, which can hold state and other information needed by the task. To run a simple function instead, wrap it in a TaskFunc to create a Task that can be provided to the task manager.

type TaskFunc

type TaskFunc func(context.Context) error

TaskFunc converts a function into a Task that can be queued or scheduled.

func (TaskFunc) Do

func (f TaskFunc) Do(ctx context.Context) error

Ensures a TaskFunc implements the Task interface.

type TaskHandler

type TaskHandler struct {
	// contains filtered or unexported fields
}

func (*TaskHandler) Do

TaskHandler implements Task so that it can be scheduled, but it should never be called as a Task rather than a Handler (to avoid re-wrapping) so this method simply panics if called -- it is a developer error.

func (*TaskHandler) Exec

func (h *TaskHandler) Exec()

Execute the wrapped task with the context. If the task fails, schedule the task to be retried using the backoff specified in the options.

func (*TaskHandler) String

func (h *TaskHandler) String() string

String implements fmt.Stringer and checks if the underlying task does as well; if so the task name is fetched from the task stringer, otherwise a default name is returned.

type TaskManager

type TaskManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TaskManagers execute Tasks using a fixed number of workers that operate in their own go routines. The TaskManager also has a fixed task queue size, so that if there are more tasks added to the task manager than the queue size, back pressure is applied.

func New

func New(conf Config) *TaskManager

Create a new task manager with the specified configuration.

func (*TaskManager) Delay

func (tm *TaskManager) Delay(delay time.Duration, task Task, opts ...Option) error

Delay a task to be scheduled the specified duration from now.

func (*TaskManager) IsRunning

func (tm *TaskManager) IsRunning() bool

func (*TaskManager) Queue

func (tm *TaskManager) Queue(task Task, opts ...Option) error

Queue a task to be executed asynchronously as soon as a worker is available. Options can be specified to influence the handling of the task. Blocks if queue is full.

func (*TaskManager) QueueContext deprecated

func (tm *TaskManager) QueueContext(ctx context.Context, task Task, opts ...Option) error

Queue a task with the specified context. Note that the context should not contain a deadline that might be sooner than backoff retries or the task will always fail. To specify a timeout for each retry, use WithTimeout. Blocks if the queue is full.

Deprecated: use tm.Queue(task, WithContext(ctx)) instead.

func (*TaskManager) Schedule

func (tm *TaskManager) Schedule(at time.Time, task Task, opts ...Option) error

Schedule a task to be executed at the specified timestamp.

func (*TaskManager) Start

func (tm *TaskManager) Start()

Start the task manager and scheduler in their own go routines (no-op if already started).

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop the task manager and scheduler if running (otherwise a no-op). This method blocks until all pending tasks have been completed; however, tasks scheduled for the future will likely be dropped and not executed.

func (*TaskManager) WrapTask

func (tm *TaskManager) WrapTask(task Task, opts ...Option) *TaskHandler
