tasks

package
v4.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 2, 2022 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Overview

Package tasks provides workers that effectively run the instances of the scheduled jobs.

Index

Constants

View Source
const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
View Source
const (
	// DefaultMaximumWorkers is set to 20.
	DefaultMaximumWorkers = 20
)

Variables

View Source
var (
	PubSub *pubsub.PubSub
)

Functions

This section is empty.

Types

type ContextJobParametersKey

type ContextJobParametersKey struct{}

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher orchestrates the jobs by dispatching work to available workers.

func NewDispatcher

func NewDispatcher(maxWorkers int, tags map[string]string) *Dispatcher

NewDispatcher creates and initialises a new Dispatcher with this amount of workers.

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run simply starts the N workers of this dispacher.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop sends a quit signal to all workers and the main dispatcher

type ReconnectingClient

type ReconnectingClient struct {
	// contains filtered or unexported fields
}

func NewTaskReconnectingClient

func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient

func (*ReconnectingClient) StartListening

func (s *ReconnectingClient) StartListening(tasksChan chan interface{})

func (*ReconnectingClient) Stop

func (s *ReconnectingClient) Stop()

type Runnable

type Runnable struct {
	*jobs.Action
	Task           *Task
	Message        *jobs.ActionMessage
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
}

Runnable represents the runnable instance of a given task

func NewRunnable

func NewRunnable(ctx context.Context, chainIndex int, task *Task, action *jobs.Action, message *jobs.ActionMessage) Runnable

NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.

func RootRunnable

func RootRunnable(ctx context.Context, task *Task) Runnable

func (*Runnable) Dispatch

func (r *Runnable) Dispatch(parentPath string, input *jobs.ActionMessage, aa []*jobs.Action, queue chan Runnable)

Dispatch gets next runnable from Action and enqueues it to the Queue Done channel should be working correctly with chained actions

func (*Runnable) RunAction

func (r *Runnable) RunAction(queue chan Runnable)

RunAction creates an action and calls Dispatch

type Subscriber

type Subscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriber handles incoming events, applies selectors if any and generates all ActionMessage to trigger actions

func NewSubscriber

func NewSubscriber(parentContext context.Context) *Subscriber

NewSubscriber creates a multiplexer for tasks managements and messages by maintaining a map of dispatcher, one for each job definition.

func (*Subscriber) Init

func (s *Subscriber) Init(ctx context.Context) error

Init subscriber with current list of jobs from Jobs service

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop closes internal EventsBatcher

type Task

type Task struct {
	*jobs.Job
	sync.RWMutex
	common.RuntimeHolder
	// contains filtered or unexported fields
}

func NewTaskFromEvent

func NewTaskFromEvent(runtime, ctx context.Context, job *jobs.Job, event interface{}) *Task

NewTaskFromEvent creates a task based on incoming job and event

func (*Task) Add

func (t *Task) Add(delta int)

Add increments task internal retain counter

func (*Task) AppendLog

func (t *Task) AppendLog(a *jobs.Action, in *jobs.ActionMessage, out *jobs.ActionMessage)

AppendLog appends logs from an action to the task OutputChain

func (*Task) CleanUp added in v4.0.1

func (t *Task) CleanUp()

CleanUp is triggered after a task has no more subroutines running.

func (*Task) Done

func (t *Task) Done(delta int)

Done decrements task internal retain counter - When reaching 0, it triggers the CleanUp operation

func (*Task) GetJobTaskClone

func (t *Task) GetJobTaskClone() *jobs.Task

GetJobTaskClone creates a protobuf clone of this task

func (*Task) GetRunUUID

func (t *Task) GetRunUUID() string

GetRunUUID returns the task internal run UUID

func (*Task) GetRunnableChannels

func (t *Task) GetRunnableChannels(controllable bool) (*actions.RunnableChannels, chan bool)

GetRunnableChannels prepares a set of data channels for action actual Run method.

func (*Task) Queue added in v4.0.1

func (t *Task) Queue(queue chan Runnable)

Queue send this new task to the global queue

func (*Task) Save

func (t *Task) Save()

Save publish task to PubSub topic

func (*Task) SetControllable

func (t *Task) SetControllable(canPause bool)

SetControllable flags task as being able to be stopped or paused

func (*Task) SetEndTime

func (t *Task) SetEndTime(ti time.Time)

SetEndTime updates end time

func (*Task) SetError added in v4.0.1

func (t *Task) SetError(e error, appendLog bool)

SetError set task in error globally

func (*Task) SetHasProgress

func (t *Task) SetHasProgress()

SetHasProgress flags task as providing progress information

func (*Task) SetProgress

func (t *Task) SetProgress(progress float32)

SetProgress updates task internal progress

func (*Task) SetStartTime

func (t *Task) SetStartTime(ti time.Time)

SetStartTime updates start time

func (*Task) SetStatus

func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)

SetStatus updates task internal status

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents the worker that executes the jobs.

func NewWorker

func NewWorker(workerPool chan chan Runnable, requeue chan Runnable, activeChan chan int, tags map[string]string) Worker

NewWorker creates and configures a new worker.

func (Worker) Start

func (w Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it.

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.

Directories

Path Synopsis
Package grpc provides a gRPC service to effectively run task instances on multiple workers.
Package grpc provides a gRPC service to effectively run task instances on multiple workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL