Documentation
Constants
const DefaultTaskTimeout = 10 * time.Second
DefaultTaskTimeout is the timeout for tasks that do not define their own timeout.
Types
type Dispatcher
type Dispatcher interface {
	// Run this dispatcher by starting up all its workers and executing any
	// tasks in its queue. The dispatcher will run asynchronously until Quit is
	// called. Run is reentrant.
	Run()

	// Dispatch is non-blocking and will accept tasks up to the maximum number of
	// buffered tasks even if the dispatcher is not currently running. Returns true
	// if the queue is not currently full and false otherwise. If the queue is full,
	// increase the number of workers or run the dispatcher.
	Dispatch(task Task) bool

	// Quit running and stop all workers.
	Quit(drain bool)
}
Dispatcher is responsible for dispatching work to be processed asynchronously.
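The non-blocking contract of Dispatch can be illustrated with a buffered channel and a `select` with a `default` case. This is a minimal sketch of the general technique, not the package's actual implementation; `boundedQueue` and `newBoundedQueue` are hypothetical names.

```go
package main

import "fmt"

// Task mirrors the documented interface.
type Task interface {
	Execute() error
}

// boundedQueue is a hypothetical stand-in for the dispatcher's internal queue.
type boundedQueue struct {
	tasks chan Task
}

// newBoundedQueue creates a queue that buffers up to maxBuffered tasks.
func newBoundedQueue(maxBuffered int) *boundedQueue {
	return &boundedQueue{tasks: make(chan Task, maxBuffered)}
}

// Dispatch never blocks: it returns true when the task fits in the buffer
// and false when the buffer is already full.
func (q *boundedQueue) Dispatch(task Task) bool {
	select {
	case q.tasks <- task:
		return true
	default:
		return false
	}
}

// noopTask is a trivial Task used for the demonstration.
type noopTask struct{}

func (noopTask) Execute() error { return nil }

func main() {
	q := newBoundedQueue(2)
	for i := 0; i < 3; i++ {
		// Nothing consumes the queue, so the third call reports false.
		fmt.Println(q.Dispatch(noopTask{}))
	}
}
```

Because tasks are accepted even when nothing is consuming them, callers should check the returned bool rather than assume the task was queued.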
func NewDispatcher
func NewDispatcher(maxBufferedMessage int, minWorkers int, maxWorkers int) Dispatcher
NewDispatcher creates a Dispatcher that processes Tasks asynchronously with the specified queue size and minimum and maximum number of workers. To use the dispatcher to process work, callers must implement the `Task` interface. Execution is asynchronous, but `Dispatcher.Run` must be called before any tasks are worked. Example Usage:
// The import of the dispatcher package is omitted here; use the path of the
// module that provides it.

type MyTask struct{}

func (task *MyTask) Execute() error {
	// do work here
	return nil
}

func main() {
	maxBufferedMessages := 10
	minWorkers := 1
	maxWorkers := 10
	myDispatcher := dispatcher.NewDispatcher(maxBufferedMessages, minWorkers, maxWorkers)
	myDispatcher.Run()

	myTask := &MyTask{}
	myDispatcher.Dispatch(myTask)

	// Quit takes the drain flag declared in the Dispatcher interface. Passing
	// true is assumed here to let pending tasks finish before stopping.
	myDispatcher.Quit(true)
}
type Status
type Status int32
Status describes how the dispatcher is currently functioning.
const (
	// Draining indicates that the dispatcher is stopping and waiting for all
	// pending tasks to exit
	Draining Status = 3

	// Running indicates that the dispatcher is receiving and executing tasks
	Running Status = 2

	// Stopping indicates that the dispatcher is stopping workers after their
	// current task finishes.
	Stopping Status = 1

	// Stopped indicates that the Dispatcher is not currently executing tasks.
	Stopped Status = 0

	// DefaultWaitBetweenScaleDowns is the time duration to wait between
	// attempts to scale down the worker pool. You do not want this to be too small
	// or a lot of time will be wasted scaling up and down.
	DefaultWaitBetweenScaleDowns = 5 * time.Minute

	// DefaultDispatchMissesBeforeDraining is the number of times the system
	// must fail to dispatch a message from the task channel prior to considering the
	// system fully drained
	DefaultDispatchMissesBeforeDraining = 5
)
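One way to read DefaultDispatchMissesBeforeDraining is as a threshold on failed non-blocking receives. The sketch below assumes the misses must be consecutive and models tasks as plain functions. Both are assumptions made for illustration, and `drain` is a hypothetical helper, not the package's code.

```go
package main

import "fmt"

// DefaultDispatchMissesBeforeDraining mirrors the documented constant.
const DefaultDispatchMissesBeforeDraining = 5

// drain is a hypothetical helper. It receives from tasks without blocking,
// runs whatever it finds, and returns the number of tasks it ran once it has
// seen missesBeforeDrained consecutive empty receives (or the channel closes).
// A real implementation would likely pause between polls.
func drain(tasks <-chan func(), missesBeforeDrained int) int {
	executed, misses := 0, 0
	for misses < missesBeforeDrained {
		select {
		case task, ok := <-tasks:
			if !ok {
				return executed // channel closed: nothing more can arrive
			}
			task()
			executed++
			misses = 0 // a successful receive resets the miss counter
		default:
			misses++
		}
	}
	return executed
}

func main() {
	tasks := make(chan func(), 3)
	for i := 0; i < 3; i++ {
		tasks <- func() {}
	}
	fmt.Println(drain(tasks, DefaultDispatchMissesBeforeDraining))
}
```

Requiring several misses rather than one guards against declaring the queue drained while a producer is briefly between sends.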
type Task
type Task interface {
	// Execute runs the task's work when a worker receives it. A returned
	// error is logged.
	Execute() error
}
Task is the unit of work to be executed by a worker in the pool.
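Because Task has a single method, a plain function can satisfy it through a small adapter type, in the same spirit as `http.HandlerFunc` in the standard library. `TaskFunc` below is a hypothetical helper written for this sketch and is not part of the documented package.

```go
package main

import (
	"errors"
	"fmt"
)

// Task mirrors the documented interface.
type Task interface {
	Execute() error
}

// TaskFunc is a hypothetical adapter that lets an ordinary function be used
// wherever a Task is expected.
type TaskFunc func() error

// Execute calls the underlying function.
func (f TaskFunc) Execute() error { return f() }

func main() {
	var task Task = TaskFunc(func() error {
		return errors.New("boom")
	})
	if err := task.Execute(); err != nil {
		fmt.Println("task failed:", err)
	}
}
```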
type TaskStack
type TaskStack interface {
	// Size of the stack represented as a count of the elements in the stack.
	Size() int

	// Push a new data element onto the stack.
	Push(data *internalTask)

	// Pop the most recently pushed data element off the stack.
	Pop() (*internalTask, error)

	// Peek returns the most recently pushed element without modifying the stack
	Peek() (*internalTask, error)
}
TaskStack is a stack for storing tasks.
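A slice-backed stack is one straightforward way to provide these four operations. The sketch below is illustrative only: `internalTask` is unexported and its fields are not documented, so a hypothetical `item` type stands in for it, and `sliceStack` and `errEmptyStack` are likewise invented names.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical placeholder for the package's unexported internalTask.
type item struct {
	name string
}

// errEmptyStack is a hypothetical error returned by Pop and Peek on an empty stack.
var errEmptyStack = errors.New("stack is empty")

// sliceStack stores elements in a slice, with the top of the stack at the end.
// It is not safe for concurrent use without external locking.
type sliceStack struct {
	items []*item
}

// Size returns the number of elements in the stack.
func (s *sliceStack) Size() int { return len(s.items) }

// Push adds an element to the top of the stack.
func (s *sliceStack) Push(data *item) { s.items = append(s.items, data) }

// Peek returns the top element without removing it.
func (s *sliceStack) Peek() (*item, error) {
	if len(s.items) == 0 {
		return nil, errEmptyStack
	}
	return s.items[len(s.items)-1], nil
}

// Pop removes and returns the top element.
func (s *sliceStack) Pop() (*item, error) {
	top, err := s.Peek()
	if err != nil {
		return nil, err
	}
	last := len(s.items) - 1
	s.items[last] = nil // drop the reference so it can be garbage collected
	s.items = s.items[:last]
	return top, nil
}

func main() {
	s := &sliceStack{}
	s.Push(&item{name: "first"})
	s.Push(&item{name: "second"})
	top, _ := s.Pop()
	fmt.Println(top.name, s.Size())
}
```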
type Worker
type Worker interface {
	// Exec the passed message asynchronously
	Exec(*internalTask) bool

	// Run this worker so that it will perform work
	Run() <-chan bool

	// Quit this worker's main event loop. This function blocks until
	// the current task has completed executing.
	Quit()
}
Worker is used internally by the dispatcher to execute tasks in ready goroutines.