dispatcher

package
v1.3.0
Published: Nov 15, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

const DefaultTaskTimeout = 10 * time.Second

DefaultTaskTimeout is the timeout for tasks that do not define their own timeout.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher interface {
	// Run this dispatcher by starting up all its workers and executing any
	// tasks in its queue. The dispatcher will run asynchronously until Quit is
	// called. Run is reentrant.
	Run()
	// Dispatch is non-blocking and will accept tasks up to the maximum number of
	// buffered tasks even if the dispatcher is not currently running. Returns true
	// if the queue is not currently full and false otherwise. If the queue is full,
	// increase the number of workers or run the dispatcher.
	Dispatch(task Task) bool
	// Quit running and stop all workers.
	Quit(drain bool)
}

Dispatcher is responsible for dispatching work to be processed asynchronously.
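
The non-blocking contract described for Dispatch (accept tasks until the buffer is full, then return false) can be illustrated with a buffered channel and a `select` with a `default` case. This is only a minimal sketch: `boundedQueue`, `newBoundedQueue`, and `noopTask` are hypothetical names invented for this example, the `Task` interface is redeclared locally so the snippet stands alone, and nothing here is taken from the package's actual implementation.

```go
package main

import "fmt"

// Task mirrors the package's Task interface so this sketch is standalone.
type Task interface {
	Execute() error
}

// boundedQueue is a hypothetical stand-in for a dispatcher's task buffer.
type boundedQueue struct {
	tasks chan Task
}

// newBoundedQueue allocates a buffer that holds up to maxBuffered tasks.
func newBoundedQueue(maxBuffered int) *boundedQueue {
	return &boundedQueue{tasks: make(chan Task, maxBuffered)}
}

// Dispatch never blocks: it returns true if the task was buffered and
// false if the buffer was already full.
func (q *boundedQueue) Dispatch(task Task) bool {
	select {
	case q.tasks <- task:
		return true
	default:
		return false
	}
}

// noopTask is a trivial Task used only to exercise the queue.
type noopTask struct{}

func (noopTask) Execute() error { return nil }

func main() {
	q := newBoundedQueue(2)
	for i := 0; i < 3; i++ {
		// Nothing consumes from the queue, so the third call is rejected.
		fmt.Println(q.Dispatch(noopTask{}))
	}
}
```

Because nothing drains the buffer in this sketch, the rejected call corresponds to the case where the documentation suggests running the dispatcher or increasing the number of workers.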

func NewDispatcher

func NewDispatcher(maxBufferedMessage int, minWorkers int, maxWorkers int) Dispatcher

NewDispatcher creates a Dispatcher that processes Tasks asynchronously, with the specified maximum number of buffered tasks and the specified minimum and maximum number of workers. To use the dispatcher to process work, callers must implement the `Task` interface. Execution is asynchronous, but `Dispatcher.Run` must be called before any tasks are worked. Example usage:

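     type MyTask struct{}

     func (task *MyTask) Execute() error {
         // do work here
         return nil
     }

     func main() {
         maxBufferedMessages := 10
         minWorkers := 1
         maxWorkers := 10
         myDispatcher := dispatcher.NewDispatcher(maxBufferedMessages, minWorkers, maxWorkers)
         // Run starts the workers; tasks are not worked until it is called.
         myDispatcher.Run()
         myTask := &MyTask{}
         // Dispatch returns false if the queue is currently full.
         myDispatcher.Dispatch(myTask)
         // Quit takes a drain flag; true is assumed here to let pending tasks finish.
         myDispatcher.Quit(true)
     }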

type Status

type Status int32

Status describes how the dispatcher is currently functioning.

const (
	// Draining indicates that the dispatcher is stopping and waiting for all
	// pending tasks to exit
	Draining Status = 3
	// Running indicates that the dispatcher is receiving and executing tasks
	Running Status = 2
	// Stopping indicates that the dispatcher is stopping workers after their
	// current task finishes.
	Stopping Status = 1
	// Stopped indicates that the Dispatcher is not currently executing tasks.
	Stopped Status = 0
	// DefaultWaitBetweenScaleDowns is the time duration to wait between
	// attempts to scale down the worker pool. You do not want this to be too small
	// or a lot of time will be wasted scaling up and down.
	DefaultWaitBetweenScaleDowns = 5 * time.Minute
	// DefaultDispatchMissesBeforeDraining is the number of times the system
	// must fail to dispatch a message from the task channel prior to considering the
	// system fully drained
	DefaultDispatchMissesBeforeDraining = 5
)

type Task

type Task interface {
	// Execute runs the task when a worker receives it. Any returned error is logged.
	Execute() error
}

Task is the unit of work to be executed by a worker in the pool.
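
Since Task has a single method, a plain function can satisfy it through a small adapter type. The sketch below assumes a hypothetical `TaskFunc` type, which is not part of this package, and redeclares `Task` locally so the snippet compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Task mirrors the package's Task interface so this sketch is standalone.
type Task interface {
	Execute() error
}

// TaskFunc is a hypothetical adapter (not provided by the package) that
// lets an ordinary function be used wherever a Task is expected.
type TaskFunc func() error

// Execute calls the wrapped function and returns its error unchanged.
func (f TaskFunc) Execute() error { return f() }

// succeeding and failing are sample functions used to build tasks.
func succeeding() error { return nil }

func failing() error { return errors.New("boom") }

func main() {
	tasks := []Task{TaskFunc(succeeding), TaskFunc(failing)}
	for _, task := range tasks {
		if err := task.Execute(); err != nil {
			fmt.Println("task failed:", err)
			continue
		}
		fmt.Println("task succeeded")
	}
}
```

A value such as `TaskFunc(succeeding)` could then be passed to `Dispatch` in the same way as the `MyTask` pointer in the usage example.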

func NewRetryTask

func NewRetryTask(task Task, retry func() bool, retries int, period time.Duration) Task

NewRetryTask wraps the passed task so that its execution is retried, up to the specified number of retries, whenever the passed retry function returns true.
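
The retry behavior can be pictured with a small wrapper that itself implements Task. The sketch below is not the package's implementation: `retryTask` and `flaky` are hypothetical types, and it assumes that a retry is attempted only after Execute returns an error and that `period` is the wait between attempts, neither of which is stated in the documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Task mirrors the package's Task interface so this sketch is standalone.
type Task interface {
	Execute() error
}

// retryTask is a hypothetical wrapper illustrating retry semantics.
type retryTask struct {
	task    Task
	retry   func() bool
	retries int
	period  time.Duration
}

// Execute runs the wrapped task once, then retries while it keeps failing,
// the retry budget is not exhausted, and retry() returns true.
func (r *retryTask) Execute() error {
	err := r.task.Execute()
	for attempt := 0; err != nil && attempt < r.retries && r.retry(); attempt++ {
		time.Sleep(r.period) // assumption: period is the pause between attempts
		err = r.task.Execute()
	}
	return err
}

// flaky fails for its first `failures` executions and succeeds afterwards.
type flaky struct {
	failures int
	calls    int
}

func (f *flaky) Execute() error {
	f.calls++
	if f.calls <= f.failures {
		return errors.New("transient failure")
	}
	return nil
}

func main() {
	inner := &flaky{failures: 2}
	task := &retryTask{
		task:    inner,
		retry:   func() bool { return true },
		retries: 3,
		period:  time.Millisecond,
	}
	err := task.Execute()
	fmt.Println("error:", err, "executions:", inner.calls)
}
```

Because the wrapper is itself a Task, it can be dispatched like any other task, which matches the fact that NewRetryTask returns a Task.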

type TaskStack

type TaskStack interface {
	// Size of the stack represented as a count of the elements in the stack.
	Size() int
	// Push a new data element onto the stack.
	Push(data *internalTask)
	// Pop the most recently pushed data element off the stack.
	Pop() (*internalTask, error)
	// Peek returns the most recently pushed element without modifying the stack
	Peek() (*internalTask, error)
}

TaskStack is a stack for storing tasks.

func NewTaskStack

func NewTaskStack() TaskStack

NewTaskStack returns a TaskStack that is empty and ready to use.
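
The four TaskStack operations map naturally onto a slice-backed last-in, first-out stack. The following is a sketch under stated assumptions: `item` is a hypothetical stand-in for the unexported `internalTask` type, `sliceStack` and `errEmptyStack` are invented names, returning an error on an empty stack is a guess at what the `error` results are for, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical stand-in for the package's unexported internalTask.
type item struct {
	name string
}

// errEmptyStack is an assumed error value for Pop and Peek on an empty stack.
var errEmptyStack = errors.New("stack is empty")

// sliceStack is a hypothetical LIFO stack backed by a slice.
type sliceStack struct {
	items []*item
}

// Size returns the number of elements currently on the stack.
func (s *sliceStack) Size() int { return len(s.items) }

// Push places a new element on top of the stack.
func (s *sliceStack) Push(data *item) { s.items = append(s.items, data) }

// Peek returns the most recently pushed element without removing it.
func (s *sliceStack) Peek() (*item, error) {
	if len(s.items) == 0 {
		return nil, errEmptyStack
	}
	return s.items[len(s.items)-1], nil
}

// Pop removes and returns the most recently pushed element.
func (s *sliceStack) Pop() (*item, error) {
	top, err := s.Peek()
	if err != nil {
		return nil, err
	}
	s.items = s.items[:len(s.items)-1]
	return top, nil
}

func main() {
	s := &sliceStack{}
	s.Push(&item{name: "first"})
	s.Push(&item{name: "second"})
	top, _ := s.Pop()
	fmt.Println(top.name, s.Size())
}
```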

type Worker

type Worker interface {
	// Exec the passed message asynchronously
	Exec(*internalTask) bool
	// Run this worker so that it will perform work
	Run() <-chan bool
	// Quit this worker's main event loop. This function blocks until
	// the current task has completed executing.
	Quit()
}

Worker is used internally by the dispatcher to execute tasks in ready goroutines.

func NewWorker

func NewWorker(pool chan Worker, complete chan<- *internalTask) Worker

NewWorker for the passed worker pool.
