Documentation ¶
Overview ¶
Package marionette is a Go-based task manager with scheduling, backoff, and future scheduling built in. This is a somewhat temporary solution until an external state management solution can be implemented using Redis or similar.
Index ¶
- Variables
- type Config
- type Error
- type Future
- type Futures
- type Option
- type Scheduler
- type Task
- type TaskFunc
- type TaskHandler
- type TaskManager
- func (tm *TaskManager) Delay(delay time.Duration, task Task, opts ...Option) error
- func (tm *TaskManager) IsRunning() bool
- func (tm *TaskManager) Queue(task Task, opts ...Option) error
- func (tm *TaskManager) Schedule(at time.Time, task Task, opts ...Option) error
- func (tm *TaskManager) Start()
- func (tm *TaskManager) Stop()
- func (tm *TaskManager) WrapTask(task Task, opts ...Option) *TaskHandler
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrTaskManagerStopped
	ErrTaskManagerStopped = errors.New("the task manager is not running")
	// ErrUnschedulable
	ErrUnschedulable = errors.New("cannot schedule a task with a zero valued timestamp")
	// ErrNoWorkers
	ErrNoWorkers = errors.New("invalid configuration: at least one worker must be specified")
	// ErrNoServerName
	ErrNoServerName = errors.New("invalid configuration: no server name specified")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Logger     *zap.SugaredLogger
	Workers    int    `default:"4" desc:"the number of workers to process tasks asynchronously"`
	QueueSize  int    `split_words:"true" default:"64" desc:"the number of async tasks to buffer in the queue before blocking"`
	ServerName string `split_words:"true" default:"marionette" desc:"used to describe the marionette service in the log"`
}
Config configures the marionette task manager so that different processes can use different asynchronous task processing resources depending on their compute constraints.
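The configuration errors listed under Variables suggest that a Config is checked for at least one worker and a non-empty server name. The following is a minimal sketch of such a check, not the package's own code: the `Config` here is a local stand-in without the Logger field, and the `Validate` method is a hypothetical helper that does not appear in the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors mirroring the messages listed in the Variables section.
var (
	ErrNoWorkers    = errors.New("invalid configuration: at least one worker must be specified")
	ErrNoServerName = errors.New("invalid configuration: no server name specified")
)

// Config is a local stand-in for the documented struct (Logger omitted).
type Config struct {
	Workers    int
	QueueSize  int
	ServerName string
}

// Validate is a hypothetical helper; the real package may validate differently.
func (c Config) Validate() error {
	if c.Workers < 1 {
		return ErrNoWorkers
	}
	if c.ServerName == "" {
		return ErrNoServerName
	}
	return nil
}

func main() {
	// Values matching the documented defaults.
	conf := Config{Workers: 4, QueueSize: 64, ServerName: "marionette"}
	fmt.Println(conf.Validate())

	// A configuration without workers is rejected with a sentinel error.
	conf.Workers = 0
	fmt.Println(errors.Is(conf.Validate(), ErrNoWorkers))
}
```

Returning sentinel errors lets callers distinguish the failure with errors.Is instead of comparing message strings.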
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error keeps track of task failures and reports the failure context
func Errorf ¶
Errorf returns a pointer to a new `Error` struct whose error message is built from the specified format string and arguments
func Errorw ¶
Errorw takes an error as input and returns a pointer to a new `Error` struct with the specified error set as the `err` field
func (*Error) Append ¶
Append adds a task failure (or nil) to the array of task errors and increments the number of attempts
func (*Error) Dict ¶
func (e *Error) Dict()
Dict returns a log event that can be used with the *zap.Dict method for logging details about the error including the number of attempts, each individual attempt error and the total duration of processing the task before failure.
func (*Error) Error ¶
Error implements the error interface and gives a high level message about failures
func (*Error) Is ¶
Is checks if the error is the user-specified target. If the wrapped user error is nil, it checks whether the target is one of the task errors; otherwise it returns false
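The Append and Is behavior described above can be sketched with a small local type. This is an illustration under assumptions, not the package's implementation: `taskError` and its field names are hypothetical stand-ins for the unexported fields of `Error`, and the message format is invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errTimeout    = errors.New("timeout")
	errSendFailed = errors.New("could not send email")
)

// taskError is a local stand-in for the documented Error type.
// The field names are assumptions; the real fields are unexported.
type taskError struct {
	err      error   // user-specified error, may be nil
	attempts int     // number of attempts recorded so far
	failures []error // one entry per attempt, nil for a success
}

// Append records the outcome of one attempt and counts it.
func (e *taskError) Append(err error) {
	e.failures = append(e.failures, err)
	e.attempts++
}

// Error gives a high-level summary of the failure.
func (e *taskError) Error() string {
	if e.err != nil {
		return fmt.Sprintf("after %d attempts: %s", e.attempts, e.err)
	}
	return fmt.Sprintf("task failed after %d attempts", e.attempts)
}

// Is matches the user error when one is set, otherwise any attempt error.
func (e *taskError) Is(target error) bool {
	if e.err != nil {
		return errors.Is(e.err, target)
	}
	for _, failure := range e.failures {
		if errors.Is(failure, target) {
			return true
		}
	}
	return false
}

func main() {
	// No user error: Is falls back to the per-attempt errors.
	e := &taskError{}
	e.Append(errTimeout)
	e.Append(nil)
	fmt.Println(errors.Is(e, errTimeout))

	// With a user error: only that error is matched.
	w := &taskError{err: errSendFailed}
	w.Append(errTimeout)
	fmt.Println(errors.Is(w, errSendFailed), errors.Is(w, errTimeout))
}
```

Because the type defines an Is method, the standard errors.Is function consults it when walking the error, so callers do not need to know about the wrapper type.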
type Future ¶
Future is a task/timestamp tuple that acts as a scheduler entry for running the task as close to the timestamp as possible without running it before the given time.
type Futures ¶
type Futures []*Future
Futures implements the sort.Sort interface and ensures that the list of future tasks is maintained in sorted order so that tasks are scheduled correctly. This slice is also memory managed to ensure that it is garbage collected routinely and does not leak memory (e.g. using the Resize function to create a new slice and free the old one).
func (Futures) Insert ¶
Insert a future into the slice of futures, growing the slice as necessary and returning it to replace the original slice (similar to append). Insert ensures that the slice is maintained in sorted order and should be used instead of append.
func (Futures) Resize ¶
Resize the futures by copying the current futures into a new futures array, allowing the garbage collector to clean up the previous slice and free up memory. See: https://forum.golangbridge.org/t/free-memory-of-slice/3713/2
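One way to keep a slice sorted on insertion and to release an oversized backing array is sketched below. It is only an illustration of the technique described for Insert and Resize: the lowercase `future` and `futures` types, their fields, and the binary search via sort.Search are assumptions, and the real implementation may differ.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// future is a hypothetical stand-in for the documented Future type.
type future struct {
	name string
	at   time.Time
}

type futures []*future

// insert places f after any entries with the same or an earlier timestamp
// and returns the grown slice, in the same way append does.
func (fs futures) insert(f *future) futures {
	i := sort.Search(len(fs), func(i int) bool { return fs[i].at.After(f.at) })
	fs = append(fs, nil)
	copy(fs[i+1:], fs[i:])
	fs[i] = f
	return fs
}

// resize copies the entries into a freshly allocated slice so that the old
// backing array can be garbage collected.
func (fs futures) resize() futures {
	out := make(futures, len(fs))
	copy(out, fs)
	return out
}

// demo inserts three futures out of order and returns their names in
// the order the slice holds them.
func demo() []string {
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	var fs futures
	fs = fs.insert(&future{name: "c", at: base.Add(3 * time.Hour)})
	fs = fs.insert(&future{name: "a", at: base.Add(1 * time.Hour)})
	fs = fs.insert(&future{name: "b", at: base.Add(2 * time.Hour)})

	names := make([]string, 0, len(fs))
	for _, f := range fs {
		names = append(names, f.name)
	}
	return names
}

func main() {
	fmt.Println(demo()) // [a b c]
}
```

The binary search keeps each insertion at O(log n) comparisons plus one copy, which avoids re-sorting the whole slice every time a task is scheduled.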
type Option ¶
type Option func(*TaskHandler)
Option configures the task beyond the input context allowing for retries or backoff delays in task processing when there are failures or other task-specific handling
func WithBackoff ¶
WithBackoff specifies the backoff strategy to use when retrying (default: exponential backoff)
func WithContext ¶
WithContext specifies a base context to be used as the parent context when the task is executed and on all subsequent retries
NOTE: it is recommended that this context does not contain a deadline, otherwise the deadline may expire before the specified number of retries has been attempted; use WithTimeout instead
func WithError ¶
WithError logs a specific error if all retries failed under the provided context. This error will be bundled with the errors that caused the retry failure and reported in a single error log message.
func WithErrorf ¶
WithErrorf logs a specific error as WithError but using fmt.Errorf semantics to create the err.
func WithRetries ¶
WithRetries specifies the number of times to retry a task when it returns an error (default 0)
func WithTimeout ¶
WithTimeout specifies a timeout to add to the context before passing it into the task function
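The option functions above follow the functional options pattern implied by `type Option func(*TaskHandler)`. A minimal sketch of that pattern is shown here with local stand-ins: the `handler` struct, its fields, and the lowercase `withRetries`, `withTimeout`, and `wrap` names are hypothetical and only mirror the documented behavior (zero retries by default).

```go
package main

import (
	"fmt"
	"time"
)

// handler is a hypothetical stand-in for TaskHandler with two settings.
type handler struct {
	retries int
	timeout time.Duration
}

// option mutates a handler, mirroring type Option func(*TaskHandler).
type option func(*handler)

// withRetries sets how many times a failed task is retried.
func withRetries(n int) option {
	return func(h *handler) { h.retries = n }
}

// withTimeout sets the timeout applied to the task context.
func withTimeout(d time.Duration) option {
	return func(h *handler) { h.timeout = d }
}

// wrap builds a handler from defaults and applies the options in order.
func wrap(opts ...option) *handler {
	h := &handler{} // defaults: zero retries, no timeout
	for _, opt := range opts {
		opt(h)
	}
	return h
}

func main() {
	h := wrap(withRetries(3), withTimeout(5*time.Second))
	fmt.Println(h.retries, h.timeout) // 3 5s
}
```

Options are applied in the order given, so a later option overrides an earlier one that sets the same field.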
type Scheduler ¶
Scheduler manages a list of future tasks. On or after the time at which a task is scheduled to run, the scheduler sends the task on the out channel. This allows marionette to schedule tasks for the future or to retry tasks with backoff delays so that the workers are not overwhelmed by long running tasks.
The scheduler is implemented with a sorted list of Futures (basically tasks arranged in time order) along with a single goroutine that sleeps until the timestamp of the next task unless interrupted by a newly scheduled task. The goal of the scheduler is to minimize the number of goroutines and CPU cycles in use so that higher priority work by the task manager or the main goroutine is favored by the CPU. To that end the scheduler does not use a "ticker" clock that checks every second whether it should execute, preferring longer sleeps and interrupts instead.
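The sleep-until-next-task loop described above can be sketched as follows. This is a simplified illustration, not the scheduler's actual code: the `future` type, the `run` function, and its three channels are assumptions, and task names stand in for real tasks.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// future pairs a task name with the earliest time it may run.
type future struct {
	name string
	at   time.Time
}

// run keeps pending futures sorted by time and sends each name on out once
// its timestamp has passed. It sleeps on a timer for the next entry only and
// is woken early by a new future on add or by closing stop.
func run(add <-chan future, out chan<- string, stop <-chan struct{}) {
	var pending []future
	for {
		// Dispatch everything that is due, never before its timestamp.
		now := time.Now()
		for len(pending) > 0 && !pending[0].at.After(now) {
			out <- pending[0].name
			pending = pending[1:]
		}

		// With nothing pending, wake stays nil and only add or stop can fire.
		var timer *time.Timer
		var wake <-chan time.Time
		if len(pending) > 0 {
			timer = time.NewTimer(time.Until(pending[0].at))
			wake = timer.C
		}

		stopped := false
		select {
		case f := <-add:
			pending = append(pending, f)
			sort.SliceStable(pending, func(i, j int) bool {
				return pending[i].at.Before(pending[j].at)
			})
		case <-wake:
		case <-stop:
			stopped = true
		}
		if timer != nil {
			timer.Stop()
		}
		if stopped {
			return
		}
	}
}

// demo schedules two futures out of order and returns the dispatch order.
func demo() []string {
	add := make(chan future)
	out := make(chan string, 2)
	stop := make(chan struct{})
	go run(add, out, stop)

	now := time.Now()
	add <- future{name: "late", at: now.Add(200 * time.Millisecond)}
	add <- future{name: "early", at: now.Add(30 * time.Millisecond)}

	first := <-out
	second := <-out
	close(stop)
	return []string{first, second}
}

func main() {
	fmt.Println(demo())
}
```

The loop wakes only when the next future is due or when something changes, so an idle scheduler consumes no CPU, which is the stated reason for avoiding a ticker.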
func NewScheduler ¶
func NewScheduler(out chan<- Task, logger *zap.SugaredLogger) *Scheduler
NewScheduler creates a new scheduler that can schedule task futures. The out channel is used to dispatch tasks at their scheduled time. If a task is sent on the out channel it means that the task should be executed as soon as possible. The scheduler makes no guarantees about exact timing of tasks scheduled except that the task will not be sent on the out channel before its scheduled time.
func (*Scheduler) Delay ¶
Delay schedules the task to be run on or after the specified delay duration from now.
func (*Scheduler) Schedule ¶
Schedule a task to run on or after the specified timestamp. If the scheduler is running, the task future is sent to the main channel loop; otherwise the task is simply inserted into the futures slice. Schedule blocks until the task is received by the main scheduler loop.
func (*Scheduler) Start ¶
Start the scheduler in its own goroutine or no-op if already started. If the specified wait group is not nil, it is marked as done when the scheduler is stopped.
func (*Scheduler) Stop ¶
func (s *Scheduler) Stop()
Stop the scheduler if it is running, otherwise a no-op. Note that stopping the scheduler does not close the out channel. When stopped, any futures that are still pending will not be executed, but if the scheduler is started again, they remain scheduled as before and are sent on the same out channel.
type Task ¶
Task workers in the task manager handle Tasks, which can hold state and other information needed by the task. You can also specify a simple function to execute by using TaskFunc to create a Task to provide to the task manager.
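The relationship between a stateful Task and a plain function wrapped in TaskFunc can be sketched with the adapter pattern below. The `Task` interface is inferred from the `Do(context.Context) error` signature shown for TaskHandler, and the `TaskFunc` signature is an assumption; `emailTask`, `runAll`, and `demo` are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Task is inferred from the documented Do(context.Context) error method.
type Task interface {
	Do(context.Context) error
}

// TaskFunc adapts a plain function to the Task interface (assumed signature).
type TaskFunc func(context.Context) error

// Do calls the wrapped function.
func (f TaskFunc) Do(ctx context.Context) error { return f(ctx) }

// emailTask is a hypothetical task that carries its own state.
type emailTask struct {
	to   string
	sent int
}

// Do pretends to send the email unless the context is already done.
func (t *emailTask) Do(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	t.sent++
	return nil
}

// String gives the task a readable name for logs.
func (t *emailTask) String() string { return "email to " + t.to }

// runAll executes each task once and returns the number of failures.
func runAll(tasks ...Task) int {
	failed := 0
	for _, task := range tasks {
		if err := task.Do(context.Background()); err != nil {
			failed++
		}
	}
	return failed
}

// demo runs one stateful task and one function task.
func demo() (sent int, called bool, failed int) {
	email := &emailTask{to: "user@example.com"}
	fn := TaskFunc(func(ctx context.Context) error {
		called = true
		return nil
	})
	failed = runAll(email, fn)
	return email.sent, called, failed
}

func main() {
	fmt.Println(demo()) // 1 true 0
}
```

This is the same idea as http.HandlerFunc in the standard library: a function type with a method lets simple functions satisfy an interface without a dedicated struct.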
type TaskHandler ¶
type TaskHandler struct {
// contains filtered or unexported fields
}
func (*TaskHandler) Do ¶
func (h *TaskHandler) Do(context.Context) error
Do exists so that TaskHandler implements Task and can be scheduled. A TaskHandler should never be run as a Task, only as a handler (to avoid re-wrapping), so this method simply panics if called; calling it is a user error
func (*TaskHandler) Exec ¶
func (h *TaskHandler) Exec()
Exec the wrapped task with the context. If the task fails, schedule the task to be retried using the backoff specified in the options
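Retrying with an exponential backoff can be sketched as below. This is a simplification under stated assumptions: the documented Exec reschedules a failed task through the scheduler, whereas this sketch waits in place through an injected `sleep` function, and the doubling delay without jitter, the 100ms base, and the 2s cap are illustrative choices, not the package's defaults.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffDelay returns base doubled once per prior attempt, capped at max.
// Plain doubling without jitter is an assumption made for this sketch.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

// execWithRetries runs fn once and then up to retries more times, waiting
// between attempts. It returns the number of attempts and the last error.
func execWithRetries(fn func() error, retries int, sleep func(time.Duration)) (int, error) {
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			sleep(backoffDelay(attempt-1, 100*time.Millisecond, 2*time.Second))
		}
		if err = fn(); err == nil {
			return attempt + 1, nil
		}
	}
	return retries + 1, err
}

// demo runs a task that fails twice before succeeding and records the waits
// instead of actually sleeping.
func demo() (attempts int, waits []time.Duration, err error) {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	}
	attempts, err = execWithRetries(flaky, 5, func(d time.Duration) {
		waits = append(waits, d)
	})
	return attempts, waits, err
}

func main() {
	fmt.Println(demo()) // 3 [100ms 200ms] <nil>
}
```

Injecting the sleep function keeps the example fast and deterministic; passing time.Sleep instead would produce real delays.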
func (*TaskHandler) String ¶
func (h *TaskHandler) String() string
String implements fmt.Stringer and checks if the underlying task does as well; if so the task name is fetched from the task stringer, otherwise a default name is returned
type TaskManager ¶
TaskManager executes Tasks using a fixed number of workers that operate in their own goroutines. The TaskManager also has a fixed task queue size, so that if more tasks are added to the task manager than the queue can hold, back pressure is applied
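A fixed worker pool fed by a bounded queue, as described above, can be sketched with a buffered channel. The `pool` type and its `newPool`, `submit`, and `stop` names are hypothetical; the sketch leaves out retries, scheduling, and logging and is not the TaskManager implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// pool runs jobs on a fixed number of workers reading from a bounded queue.
type pool struct {
	queue chan func()
	wg    sync.WaitGroup
}

// newPool starts the workers; queueSize bounds how many jobs can wait.
func newPool(workers, queueSize int) *pool {
	p := &pool{queue: make(chan func(), queueSize)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.queue {
				job()
			}
		}()
	}
	return p
}

// submit blocks while the queue is full, which applies back pressure
// to the caller.
func (p *pool) submit(job func()) { p.queue <- job }

// stop closes the queue and waits for the workers to drain it.
func (p *pool) stop() {
	close(p.queue)
	p.wg.Wait()
}

// sumSquares computes 1*1 + 2*2 + ... + n*n using the pool.
func sumSquares(n int) int {
	p := newPool(4, 2)
	var mu sync.Mutex
	total := 0
	for i := 1; i <= n; i++ {
		i := i // capture the current value for the closure
		p.submit(func() {
			mu.Lock()
			total += i * i
			mu.Unlock()
		})
	}
	p.stop()
	return total
}

func main() {
	fmt.Println(sumSquares(10)) // 385
}
```

The buffered channel is what provides the back pressure: once the buffer is full, submit blocks until a worker takes a job off the queue.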
func New ¶
func New(conf Config) *TaskManager
New creates a new task manager with the specified configuration
func (*TaskManager) IsRunning ¶
func (tm *TaskManager) IsRunning() bool
IsRunning checks if the task manager is running
func (*TaskManager) Queue ¶
func (tm *TaskManager) Queue(task Task, opts ...Option) error
Queue a task to be executed asynchronously as soon as a worker is available; blocks if queue is full
func (*TaskManager) Start ¶
func (tm *TaskManager) Start()
Start the task manager and scheduler in their own goroutines (no-op if already started)
func (*TaskManager) Stop ¶
func (tm *TaskManager) Stop()
Stop stops the task manager and scheduler if running (otherwise a no-op). This method blocks until all pending tasks have been completed; however, tasks scheduled for the future will likely be dropped and not executed.
func (*TaskManager) WrapTask ¶
func (tm *TaskManager) WrapTask(task Task, opts ...Option) *TaskHandler
WrapTask creates a new `TaskHandler` and sets its properties based on the provided options. If the `Task` passed to `WrapTask` is already a `TaskHandler`, it returns the same `TaskHandler` without re-wrapping it. Otherwise, it creates a new `TaskHandler` with a unique ID and sets the parent `TaskManager`, the task, the context, the number of attempts and retries, the backoff strategy, and the timeout