Documentation ¶
Overview ¶
Package tasks provides workers that effectively run the instances of the scheduled jobs.
Index ¶
- Constants
- Variables
- func UnSubWithFlush(ch chan interface{}, topics ...string)
- type ContextJobParametersKey
- type Dispatcher
- type ReconnectingClient
- type Runnable
- func (r *Runnable) Add(i int)
- func (r *Runnable) AsRunnerFuncRun() RunnerFunc
- func (r *Runnable) Dispatch(input *jobs.ActionMessage, aa []*jobs.Action, queue chan RunnerFunc)
- func (r *Runnable) Done()
- func (r *Runnable) RunAction(queue chan RunnerFunc)
- func (r *Runnable) SetupCollector(parentCtx context.Context, mergeAction *jobs.Action, queue chan RunnerFunc)
- type RunnerFunc
- type Subscriber
- type Task
- func (t *Task) Add(delta int)
- func (t *Task) CleanUp()
- func (t *Task) Clone() *jobs.Task
- func (t *Task) Done(delta int)
- func (t *Task) GetRunUUID() string
- func (t *Task) GetRunnableChannels(runnableCtx context.Context, controllable bool) (*actions.RunnableChannels, chan bool)
- func (t *Task) Queue(queue ...chan RunnerFunc)
- func (t *Task) Save()
- func (t *Task) SaveStatus(runnableContext context.Context, runnableStatus jobs.TaskStatus)
- func (t *Task) SetControllable(canPause bool)
- func (t *Task) SetEndTime(ti time.Time)
- func (t *Task) SetError(e error, appendLog bool)
- func (t *Task) SetHasProgress()
- func (t *Task) SetProgress(progress float32)
- func (t *Task) SetStartTime(ti time.Time)
- func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)
- type TaskStatusUpdate
- type Worker
Constants ¶
const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
const (
// DefaultMaximumWorkers is set to 20.
DefaultMaximumWorkers = 20
)
Variables ¶
var (
PubSub *pubsub.PubSub
)
Functions ¶
func UnSubWithFlush ¶ added in v4.2.2
func UnSubWithFlush(ch chan interface{}, topics ...string)
UnSubWithFlush wraps PubSub.Unsub with a select to make sure all messages are consumed before unsubscribing.
Types ¶
type ContextJobParametersKey ¶
type ContextJobParametersKey struct{}
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher orchestrates the jobs by dispatching work to available workers.
func NewDispatcher ¶
func NewDispatcher(rootCtx context.Context, maxWorkers int, job *jobs.Job, tags map[string]string) *Dispatcher
NewDispatcher creates and initialises a new Dispatcher with the given number of workers.
func (*Dispatcher) Queue ¶ added in v4.2.4
func (d *Dispatcher) Queue() chan RunnerFunc
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run()
Run simply starts the N workers of this dispatcher.
func (*Dispatcher) Stop ¶
func (d *Dispatcher) Stop()
Stop sends a quit signal to all workers and the main dispatcher.
type ReconnectingClient ¶
type ReconnectingClient struct {
// contains filtered or unexported fields
}
func NewTaskReconnectingClient ¶
func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient
func (*ReconnectingClient) StartListening ¶
func (s *ReconnectingClient) StartListening(tasksChan chan interface{})
func (*ReconnectingClient) Stop ¶
func (s *ReconnectingClient) Stop()
type Runnable ¶
type Runnable struct {
	*jobs.Action
	Task           *Task
	Message        *jobs.ActionMessage
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
	// contains filtered or unexported fields
}
Runnable represents the runnable instance of a given task
func NewRunnable ¶
func NewRunnable(ctx context.Context, parent *Runnable, queue chan RunnerFunc, action *jobs.Action, message *jobs.ActionMessage, indexTag ...int) Runnable
NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.
func (*Runnable) AsRunnerFuncRun ¶ added in v4.0.5
func (r *Runnable) AsRunnerFuncRun() RunnerFunc
func (*Runnable) Dispatch ¶
func (r *Runnable) Dispatch(input *jobs.ActionMessage, aa []*jobs.Action, queue chan RunnerFunc)
Dispatch gets the next Runnable list from action and enqueues them to the Queue. The Done channel should work correctly with chained actions.
func (*Runnable) RunAction ¶
func (r *Runnable) RunAction(queue chan RunnerFunc)
RunAction creates an action and calls Dispatch
func (*Runnable) SetupCollector ¶ added in v4.0.5
type RunnerFunc ¶ added in v4.0.5
type RunnerFunc func(queue chan RunnerFunc)
type Subscriber ¶
Subscriber handles incoming events, applies selectors if any and generates all ActionMessage to trigger actions
func NewSubscriber ¶
func NewSubscriber(parentContext context.Context) *Subscriber
NewSubscriber creates a multiplexer for task management and messages by maintaining a map of dispatchers, one for each job definition.
type Task ¶
type Task struct {
	*jobs.Job
	common.RuntimeHolder
	// contains filtered or unexported fields
}
func NewTaskFromEvent ¶
NewTaskFromEvent creates a task based on incoming job and event
func (*Task) CleanUp ¶ added in v4.0.1
func (t *Task) CleanUp()
CleanUp is triggered after a task has no more subroutines running.
func (*Task) Done ¶
Done decrements the task's internal retain counter. When the counter reaches 0, it triggers the CleanUp operation.
func (*Task) GetRunUUID ¶
GetRunUUID returns the task internal run UUID
func (*Task) GetRunnableChannels ¶
func (t *Task) GetRunnableChannels(runnableCtx context.Context, controllable bool) (*actions.RunnableChannels, chan bool)
GetRunnableChannels prepares a set of data channels for the action's actual Run method.
func (*Task) Queue ¶ added in v4.0.1
func (t *Task) Queue(queue ...chan RunnerFunc)
Queue sends this new task to the dispatcher queue. If a second queue is passed, it may differ from the main input queue, so it is used for children queuing.
func (*Task) SaveStatus ¶ added in v4.4.0
func (t *Task) SaveStatus(runnableContext context.Context, runnableStatus jobs.TaskStatus)
SaveStatus publishes the task to the PubSub topic, including the Runnable context if passed.
func (*Task) SetControllable ¶
SetControllable flags task as being able to be stopped or paused
func (*Task) SetHasProgress ¶
func (t *Task) SetHasProgress()
SetHasProgress flags task as providing progress information
func (*Task) SetProgress ¶
SetProgress updates task internal progress
func (*Task) SetStartTime ¶
SetStartTime updates start time
type TaskStatusUpdate ¶ added in v4.4.0
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents the worker that executes the jobs.
func NewWorker ¶
func NewWorker(workerPool chan chan RunnerFunc, requeue chan RunnerFunc, activeChan chan int, tags map[string]string) Worker
NewWorker creates and configures a new worker.