Documentation ¶
Overview ¶
Package tasks provides the workers that actually run instances of scheduled jobs.
Index ¶
- Constants
- Variables
- type Dispatcher
- type ReconnectingClient
- type Runnable
- type Subscriber
- type Task
- func (t *Task) Add(delta int)
- func (t *Task) AppendLog(a jobs.Action, in jobs.ActionMessage, out jobs.ActionMessage)
- func (t *Task) Done(delta int)
- func (t *Task) EnqueueRunnables(output chan Runnable)
- func (t *Task) GetJobTaskClone() *jobs.Task
- func (t *Task) GetRunUUID() string
- func (t *Task) GetRunnableChannels() (*actions.RunnableChannels, chan bool)
- func (t *Task) GlobalError(e error)
- func (t *Task) Save()
- func (t *Task) SetControllable(canStop bool, canPause bool)
- func (t *Task) SetEndTime(ti time.Time)
- func (t *Task) SetHasProgress()
- func (t *Task) SetProgress(progress float32)
- func (t *Task) SetStartTime(ti time.Time)
- func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)
- type Worker
Constants ¶
const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
const (
	// DefaultMaximumWorkers is set to 20.
	DefaultMaximumWorkers = 20
)
Variables ¶
var (
	PubSub                  *pubsub.PubSub
	ContextJobParametersKey = struct{}{}
)
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher orchestrates the jobs by dispatching work to available workers.
func NewDispatcher ¶
func NewDispatcher(maxWorkers int, tags map[string]string) *Dispatcher
NewDispatcher creates and initialises a new Dispatcher with at most maxWorkers workers.
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run()
Run starts the N workers of this dispatcher.
func (*Dispatcher) Stop ¶
func (d *Dispatcher) Stop()
Stop sends a quit signal to all workers and to the main dispatcher.
type ReconnectingClient ¶
type ReconnectingClient struct {
// contains filtered or unexported fields
}
func NewTaskReconnectingClient ¶
func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient
func (*ReconnectingClient) StartListening ¶
func (s *ReconnectingClient) StartListening(tasksChan chan interface{})
func (*ReconnectingClient) Stop ¶
func (s *ReconnectingClient) Stop()
type Runnable ¶
type Runnable struct {
	jobs.Action
	Task           *Task
	Message        jobs.ActionMessage
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
}
Runnable represents the runnable instance of a given task.
func NewRunnable ¶
func NewRunnable(ctx context.Context, parentPath string, chainIndex int, task *Task, action *jobs.Action, message jobs.ActionMessage) Runnable
NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.
func (*Runnable) CreateChild ¶
func (r *Runnable) CreateChild(parentPath string, chainIndex int, action *jobs.Action, message jobs.ActionMessage) Runnable
CreateChild replicates a runnable for a child action.
type Subscriber ¶
Subscriber handles incoming events, applies selectors if any, and generates all the ActionMessage values needed to trigger actions.
func NewSubscriber ¶
func NewSubscriber(parentContext context.Context) *Subscriber
NewSubscriber creates a multiplexer for task management and messages by maintaining a map of dispatchers, one for each job definition. TODO v4: make sure everything is properly closed on the ctx.Done() signal.
func (*Subscriber) Init ¶
func (s *Subscriber) Init()
Init initialises the subscriber with the current list of jobs from the Jobs service.
type Task ¶
type Task struct {
	*jobs.Job
	sync.RWMutex
	common.RuntimeHolder
	// contains filtered or unexported fields
}
func NewTaskFromEvent ¶
NewTaskFromEvent creates a task based on an incoming job and event.
func (*Task) AppendLog ¶
func (t *Task) AppendLog(a jobs.Action, in jobs.ActionMessage, out jobs.ActionMessage)
AppendLog appends logs from an action to the task OutputChain.
func (*Task) EnqueueRunnables ¶
EnqueueRunnables appends chained actions to a running Runnable
func (*Task) GetJobTaskClone ¶
GetJobTaskClone creates a protobuf clone of this task
func (*Task) GetRunUUID ¶
GetRunUUID returns the task internal run UUID
func (*Task) GetRunnableChannels ¶
func (t *Task) GetRunnableChannels() (*actions.RunnableChannels, chan bool)
GetRunnableChannels prepares a set of data channels for the action's actual Run method.
func (*Task) GlobalError ¶
GlobalError sets the task in error globally.
func (*Task) SetControllable ¶
SetControllable flags the task as being able to be stopped or paused.
func (*Task) SetHasProgress ¶
func (t *Task) SetHasProgress()
SetHasProgress flags the task as providing progress information.
func (*Task) SetProgress ¶
SetProgress updates task internal progress
func (*Task) SetStartTime ¶
SetStartTime updates start time
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents the worker that executes the jobs.
func NewWorker ¶
func NewWorker(workerPool chan chan Runnable, requeue chan Runnable, activeChan chan int, tags map[string]string) Worker
NewWorker creates and configures a new worker.