Documentation ¶
Overview ¶
Package tasks provides the workers that actually run instances of scheduled jobs.
Index ¶
- Constants
- Variables
- type Dispatcher
- type Runnable
- type Subscriber
- type Task
- func (t *Task) Add(delta int)
- func (t *Task) AppendLog(a jobs.Action, in jobs.ActionMessage, out jobs.ActionMessage)
- func (t *Task) Done(delta int)
- func (t *Task) EnqueueRunnables(c client.Client, output chan Runnable)
- func (t *Task) GetJobTaskClone() *jobs.Task
- func (t *Task) GetRunnableChannels() (*actions.RunnableChannels, chan bool)
- func (t *Task) GlobalError(e error)
- func (t *Task) Save()
- func (t *Task) SetControllable(canStop bool, canPause bool)
- func (t *Task) SetEndTime(ti time.Time)
- func (t *Task) SetHasProgress()
- func (t *Task) SetProgress(progress float32)
- func (t *Task) SetStartTime(ti time.Time)
- func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)
- type Worker
Constants ¶
const (
PubSubTopicTaskStatuses = "tasks"
PubSubTopicControl = "control"
)
const (
// DefaultMaximumWorkers is set to 20.
DefaultMaximumWorkers = 20
)
Variables ¶
var (
PubSub *pubsub.PubSub
)
var (
RC int
)
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
JobQueue chan Runnable
WorkerPool chan chan Runnable
// contains filtered or unexported fields
}
Dispatcher orchestrates the jobs by dispatching work to available workers.
func NewDispatcher ¶
func NewDispatcher(maxWorkers int) *Dispatcher
NewDispatcher creates and initialises a new Dispatcher with at most maxWorkers workers.
func (*Dispatcher) Run ¶
func (d *Dispatcher) Run()
Run starts the N workers of this dispatcher.
func (*Dispatcher) Stop ¶
func (d *Dispatcher) Stop()
Stop sends a quit signal to all workers. NOT YET IMPLEMENTED
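The WorkerPool field has type chan chan Runnable, which suggests the common Go pattern where each idle worker hands its own job channel to a shared pool and the dispatcher forwards the next queued job to whichever channel it receives. The following is a minimal sketch of that pattern under stated assumptions, not this package's implementation: job, worker, dispatcher, and processAll are hypothetical stand-ins, and the stop method is included only so the sketch terminates cleanly (Stop above is documented as not yet implemented).

```go
package main

import (
	"fmt"
	"sync"
)

// job is a hypothetical stand-in for tasks.Runnable.
type job struct{ id int }

// worker owns a private job channel and advertises it on the shared
// pool whenever it is idle (assumed to mirror the shape of tasks.Worker).
type worker struct {
	pool chan chan job
	jobs chan job
	quit chan struct{}
}

func (w worker) start(handle func(job)) {
	go func() {
		for {
			// Register as idle by handing our channel to the pool.
			select {
			case w.pool <- w.jobs:
			case <-w.quit:
				return
			}
			// Wait for the dispatcher to send us a job.
			select {
			case j := <-w.jobs:
				handle(j)
			case <-w.quit:
				return
			}
		}
	}()
}

// dispatcher is a hypothetical stand-in for tasks.Dispatcher.
type dispatcher struct {
	queue chan job
	pool  chan chan job
	quit  chan struct{}
	max   int
}

func newDispatcher(max int) *dispatcher {
	return &dispatcher{
		queue: make(chan job),
		pool:  make(chan chan job, max),
		quit:  make(chan struct{}),
		max:   max,
	}
}

// run starts the workers, then forwards each queued job to an idle worker.
func (d *dispatcher) run(handle func(job)) {
	for i := 0; i < d.max; i++ {
		worker{pool: d.pool, jobs: make(chan job), quit: d.quit}.start(handle)
	}
	go func() {
		for {
			select {
			case j := <-d.queue:
				idle := <-d.pool // blocks until some worker is free
				idle <- j
			case <-d.quit:
				return
			}
		}
	}()
}

func (d *dispatcher) stop() { close(d.quit) }

// processAll pushes jobs 1..n through the pool and returns the sum of their ids.
func processAll(maxWorkers, n int) int {
	d := newDispatcher(maxWorkers)
	var mu sync.Mutex
	var wg sync.WaitGroup
	sum := 0
	wg.Add(n)
	d.run(func(j job) {
		mu.Lock()
		sum += j.id
		mu.Unlock()
		wg.Done()
	})
	for i := 1; i <= n; i++ {
		d.queue <- job{id: i}
	}
	wg.Wait() // every job has been handled before we stop
	d.stop()
	return sum
}

func main() {
	fmt.Println(processAll(3, 10)) // prints 55
}
```

Because the pool channel is buffered to the number of workers, an idle worker never blocks while registering, and the dispatcher naturally applies back-pressure: it only pulls from the queue as fast as workers become free.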
type Runnable ¶
type Runnable struct {
jobs.Action
Task *Task
Message jobs.ActionMessage
Client client.Client
Context context.Context
Implementation actions.ConcreteAction
}
Runnable represents the runnable instance of a given task.
func NewRunnable ¶
func NewRunnable(ctx context.Context, cl client.Client, task *Task, action *jobs.Action, message jobs.ActionMessage) Runnable
NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.
func (*Runnable) CreateChild ¶
CreateChild replicates a runnable for a child action.
type Subscriber ¶
type Subscriber struct {
Client client.Client
MainQueue chan Runnable
UpdateTasksChan chan *jobs.Task
JobsDefinitions map[string]*jobs.Job
Dispatchers map[string]*Dispatcher
RootContext context.Context
// contains filtered or unexported fields
}
Subscriber handles incoming events, applies selectors if any, and generates the ActionMessages that trigger actions.
func NewSubscriber ¶
func NewSubscriber(parentContext context.Context, client client.Client, server server.Server) *Subscriber
NewSubscriber creates a multiplexer for task management and messages by maintaining a map of dispatchers, one for each job definition.
func (*Subscriber) GetDispatcherForJob ¶
func (s *Subscriber) GetDispatcherForJob(job *jobs.Job) *Dispatcher
GetDispatcherForJob creates a new dispatcher for a job
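Since Subscriber keeps a Dispatchers map keyed by string, one plausible reading is that a dispatcher is created the first time a job is seen and reused afterwards. The sketch below illustrates that get-or-create pattern only; jobDef, dispatcher, registry, and dispatcherFor are hypothetical names, keying by job ID is an assumption, and the real method may behave differently (for example, it may size the dispatcher from the job definition).

```go
package main

import (
	"fmt"
	"sync"
)

// defaultMaximumWorkers uses the same value as DefaultMaximumWorkers.
const defaultMaximumWorkers = 20

// jobDef is a hypothetical stand-in for jobs.Job; only the ID matters here.
type jobDef struct{ ID string }

// dispatcher is a hypothetical stand-in for tasks.Dispatcher.
type dispatcher struct{ maxWorkers int }

// registry holds one dispatcher per job ID (assumed map key).
type registry struct {
	mu          sync.Mutex
	dispatchers map[string]*dispatcher
}

func newRegistry() *registry {
	return &registry{dispatchers: make(map[string]*dispatcher)}
}

// dispatcherFor returns the dispatcher registered for the job,
// creating and storing one on first use.
func (r *registry) dispatcherFor(j *jobDef) *dispatcher {
	r.mu.Lock()
	defer r.mu.Unlock()
	if d, ok := r.dispatchers[j.ID]; ok {
		return d
	}
	d := &dispatcher{maxWorkers: defaultMaximumWorkers}
	r.dispatchers[j.ID] = d
	return d
}

func main() {
	r := newRegistry()
	a := r.dispatcherFor(&jobDef{ID: "backup"})
	b := r.dispatcherFor(&jobDef{ID: "backup"})
	fmt.Println(a == b, len(r.dispatchers)) // prints true 1
}
```

Guarding the map with a mutex keeps the lookup safe if events for several jobs arrive on different goroutines.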
func (*Subscriber) Init ¶
func (s *Subscriber) Init() error
Init initialises the subscriber with the current list of jobs from the Jobs service.
func (*Subscriber) ListenToMainQueue ¶
func (s *Subscriber) ListenToMainQueue()
ListenToMainQueue starts a goroutine that listens to the Event Bus.
func (*Subscriber) TaskChannelSubscription ¶
func (s *Subscriber) TaskChannelSubscription()
TaskChannelSubscription uses the PubSub library to receive update messages from tasks.
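To illustrate how topic-based delivery keeps task status updates separate from control messages, here is a minimal sketch built on the standard library only. The broker type and its sub, pub, and close methods are hypothetical and are not the API of the PubSub variable's library; only the two topic values are taken from the constants above, and the message strings are invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	topicTaskStatuses = "tasks"   // same value as PubSubTopicTaskStatuses
	topicControl      = "control" // same value as PubSubTopicControl
)

// broker is a deliberately tiny in-memory publish/subscribe hub
// (hypothetical; it stands in for the real PubSub library).
type broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan string)}
}

// sub registers a buffered channel that receives messages for one topic.
func (b *broker) sub(topic string, buffer int) <-chan string {
	ch := make(chan string, buffer)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// pub delivers msg to every subscriber of topic, and to nobody else.
func (b *broker) pub(topic, msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
}

// close shuts every subscriber channel so that range loops terminate.
func (b *broker) close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, chans := range b.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	b.subs = make(map[string][]chan string)
}

// collectStatuses publishes on both topics and returns only what the
// task-status subscriber received.
func collectStatuses() []string {
	b := newBroker()
	statuses := b.sub(topicTaskStatuses, 4)
	b.pub(topicTaskStatuses, "running")
	b.pub(topicControl, "pause") // no subscriber on this topic: dropped
	b.pub(topicTaskStatuses, "finished")
	b.close()
	var got []string
	for s := range statuses {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(collectStatuses()) // prints [running finished]
}
```

A subscriber to the status topic never sees the control message, which is the property that lets status reporting and control signalling share one hub.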
type Task ¶
func NewTaskFromEvent ¶
func (*Task) AppendLog ¶
func (t *Task) AppendLog(a jobs.Action, in jobs.ActionMessage, out jobs.ActionMessage)
func (*Task) EnqueueRunnables ¶
func (*Task) GetJobTaskClone ¶
func (*Task) GetRunnableChannels ¶
func (t *Task) GetRunnableChannels() (*actions.RunnableChannels, chan bool)
func (*Task) GlobalError ¶ added in v1.2.5
func (*Task) SetControllable ¶
func (*Task) SetEndTime ¶
func (*Task) SetHasProgress ¶
func (t *Task) SetHasProgress()
func (*Task) SetProgress ¶
func (*Task) SetStartTime ¶
type Worker ¶
type Worker struct { WorkerPool chan chan Runnable JobChannel chan Runnable JobReQueue chan Runnable // contains filtered or unexported fields }
Worker represents the worker that executes the jobs.