marionette

package
v0.3.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package marionette is a golang based task manager with scheduling, backoff, future scheduling built in. This is a somewhat temporary solution until an external state management solution can be implemented using Redis or similar

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTaskManagerStopped
	ErrTaskManagerStopped = errors.New("the task manager is not running")

	// ErrUnschedulable
	ErrUnschedulable = errors.New("cannot schedule a task with a zero valued timestamp")

	// ErrNoWorkers
	ErrNoWorkers = errors.New("invalid configuration: at least one worker must be specified")

	// ErrNoServerName
	ErrNoServerName = errors.New("invalid configuration: no server name specified")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	Logger     *zap.SugaredLogger
	Workers    int    `default:"4" desc:"the number of workers to process tasks asynchronously"`
	QueueSize  int    `split_words:"true" default:"64" desc:"the number of async tasks to buffer in the queue before blocking"`
	ServerName string `split_words:"true" default:"marionette" desc:"used to describe the marionette service in the log"`
}

Config configures the marionette task manager so that different processes can utilize different asynchronous task processing resources depending on process compute constraints

func (Config) IsZero

func (c Config) IsZero() bool

IsZero checks if all the fields of the `Config` instance are set to their zero values. If all the fields are zero, it returns `true`, indicating that the `Config` instance is considered empty or uninitialized

func (Config) Validate

func (c Config) Validate() error

Validate validates the Config instance

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error keeps track of task failures and reports the failure context

func Errorf

func Errorf(format string, a ...any) *Error

Errorf returns a pointer to a new `Error` struct with the specified error message formatted as a string and arguments

func Errorw

func Errorw(err error) *Error

Errorw takes an error as input and returns a pointer to a new `Error` struct with the specified error set as the `err` field

func (*Error) Append

func (e *Error) Append(err error)

Append adds a task failure (or nil) to the array of task errors and increment attempts

func (*Error) Dict

func (e *Error) Dict()

Dict returns a log event that can be used with the *zap.Dict method for logging details about the error including the number of attempts, each individual attempt error and the total duration of processing the task before failure.

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface and gives a high level message about failures

func (*Error) Is

func (e *Error) Is(target error) bool

Is checks if the error is the user specified target. If the wrapped user error is nil then it checks if the error is one of the task errors, otherwise returns false

func (*Error) Since

func (e *Error) Since(started time.Time)

Since sets the duration of processing the task to the time since the input timestamp

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the underlying user specified error, even if it is nil

type Future

type Future struct {
	Time time.Time
	Task Task
}

Future is a task/timestamp tuple that acts as a scheduler entry for running the task as close to the timestamp as possible without running it before the given time.

func (*Future) Validate

func (f *Future) Validate() error

type Futures

type Futures []*Future

Futures implements the sort.Sort interface and ensures that the list of future tasks is maintained in sorted order so that tasks are scheduled correctly. This slice is also memory managed to ensure that it is garbage collected routinely and does not memory leak (e.g. using the Resize function to create a new slice and free the old).

func (Futures) Insert

func (f Futures) Insert(t *Future) Futures

Insert a future into the slice of futures, growing the slice as necessary and returning it to replace the original slice (similar to append). Insert insures that the slice is maintained in sorted order and should be used instead of append.

func (Futures) Len

func (f Futures) Len() int

Len implementation of the sort.Sort interface

func (Futures) Less

func (f Futures) Less(i, j int) bool

func (Futures) Resize

func (f Futures) Resize() Futures

Resize the futures by copying the current futures into a new futures array, allowing the garbage collector to cleanup the previous slice and free up memory. See: https://forum.golangbridge.org/t/free-memory-of-slice/3713/2

func (Futures) Swap

func (f Futures) Swap(i, j int)

type Option

type Option func(*TaskHandler)

Option configures the task beyond the input context allowing for retries or backoff delays in task processing when there are failures or other task-specific handling

func WithBackoff

func WithBackoff(backoff backoff.BackOff) Option

WithBackoff strategy to use when retrying (default exponential backoff)

func WithContext

func WithContext(ctx context.Context) Option

WithContext specifies a base context to be used as the parent context when the task is executed and on all subsequent retries

NOTE: it is recommended that this context does not contain a deadline, otherwise the deadline may expire before the specified number of retries - use WithTimeout instead

func WithError

func WithError(err error) Option

WithError logs a specific error if all retries failed under the provided context. This error will be bundled with the errors that caused the retry failure and reported in a single error log message.

func WithErrorf

func WithErrorf(format string, a ...any) Option

WithErrorf logs a specific error as WithError but using fmt.Errorf semantics to create the err.

func WithRetries

func WithRetries(retries int) Option

WithRetries specifies the number of times to retry a task when it returns an error (default 0)

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout specifies a timeout to add to the context before passing it into the task function

type Scheduler

type Scheduler struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Scheduler manages a list of future tasks and on or after the time that they are supposed to be scheduled, the scheduler sends the task on the out channel. This allows marionette to schedule tasks for the future or to retry tasks with backoff delays so that the workers are not overwhelmed by long running tasks.

The scheduler is implemented with a sorted list of Futures (basically tasks arranged in time order) along with a single go routine that sleeps until the timestamp of the next task unless interrupted by a newly scheduled task. The goal of the scheduler is to minimize the number of go routines and CPU cycles in use so that higher priority work by the task manager or the main go routine is favored by the CPU. To that end the scheduler does not use a "ticker" clock checking if it should execute every second, preferring longer sleeps and interrupts instead.

func NewScheduler

func NewScheduler(out chan<- Task, logger *zap.SugaredLogger) *Scheduler

NewScheduler creates a new scheduler that can schedule task futures. The out channel is used to dispatch tasks at their scheduled time. If a task is sent on the out channel it means that the task should be executed as soon as possible. The scheduler makes no guarantees about exact timing of tasks scheduled except that the task will not be sent on the out channel before its scheduled time.

func (*Scheduler) Delay

func (s *Scheduler) Delay(delay time.Duration, task Task) error

Delay schedules the task to be run on or after the specified delay duration from now.

func (*Scheduler) IsRunning

func (s *Scheduler) IsRunning() bool

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(at time.Time, task Task) error

Schedule a task to run on or after the specified timestamp. If the scheduler is running the task future is sent to the main channel loop, otherwise the tasks is simply inserted into the futures slice. Schedule blocks until the task is received by the main scheduler loop.

func (*Scheduler) Start

func (s *Scheduler) Start(wg *sync.WaitGroup)

Start the scheduler in its own go routine or no-op if already started. If the specified wait group is not nil, it is marked as done when the scheduler is stopped.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop the scheduler if it is running, otherwise a no-op. Note that stopping the scheduler does not close the out channel. When stopped, any futures that are still pending will not be executed, but if the scheduler is started again, they will remain as previously scheduled and sent on the same out channel.

type Task

type Task interface {
	Do(context.Context) error
}

Task workers in the task manager handle Tasks which can hold state and other information needed by the task. You can also specify a simple function to execute by using the TaskFunc to create a Task to provide to the task manager.

type TaskFunc

type TaskFunc func(context.Context) error

TaskFunc converts a function into a Task that can be queued or scheduled

func (TaskFunc) Do

func (f TaskFunc) Do(ctx context.Context) error

Do ensures a TaskFunc implements the Task interface

type TaskHandler

type TaskHandler struct {
	// contains filtered or unexported fields
}

func (*TaskHandler) Do

Do taskhandler implements Task so that it can be scheduled, but it should never be called as a Task rather than a Handler (to avoid re-wrapping) so this method simply panics if called -- it is a user error

func (*TaskHandler) Exec

func (h *TaskHandler) Exec()

Exec the wrapped task with the context. If the task fails, schedule the task to be retried using the backoff specified in the options

func (*TaskHandler) String

func (h *TaskHandler) String() string

String implements fmt.Stringer and checks if the underlying task does as well; if so the task name is fetched from the task stringer, otherwise a default name is returned

type TaskManager

type TaskManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TaskManager execute Tasks using a fixed number of workers that operate in their own go routines. The TaskManager also has a fixed task queue size, so that if there are more tasks added to the task manager than the queue size, back pressure is applied

func New

func New(conf Config) *TaskManager

New creates a new task manager with the specified configuration

func (*TaskManager) Delay

func (tm *TaskManager) Delay(delay time.Duration, task Task, opts ...Option) error

Delay a task to be scheduled the specified duration from now

func (*TaskManager) GetQueueLength added in v0.2.4

func (tm *TaskManager) GetQueueLength() int

func (*TaskManager) IsRunning

func (tm *TaskManager) IsRunning() bool

IsRunning checks if the taskmanager is running

func (*TaskManager) Queue

func (tm *TaskManager) Queue(task Task, opts ...Option) error

Queue a task to be executed asynchronously as soon as a worker is available; blocks if queue is full

func (*TaskManager) Schedule

func (tm *TaskManager) Schedule(at time.Time, task Task, opts ...Option) error

Schedule a task to be executed at the specific timestamp

func (*TaskManager) Start

func (tm *TaskManager) Start()

Start the task manager and scheduler in their own go routines (no-op if already started)

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop stops the task manager and scheduler if running (otherwise a no-op). This method blocks until all pending tasks have been completed, however future scheduled tasks will likely be dropped and not scheduled for execution.

func (*TaskManager) WrapTask

func (tm *TaskManager) WrapTask(task Task, opts ...Option) *TaskHandler

WrapTask creates a new `TaskHandler` and sets its properties based on the provided options. If the `Task` passed to `WrapTask` is already a `TaskHandler`, it returns the same `TaskHandler` without re-wrapping it. Otherwise, it creates a new TaskHandler with a unique ID, sets the parent `TaskManager`, sets the task, sets the context, sets the number of attempts and retries, sets the backoff strategy, and sets the timeout

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL