tasks

package
v4.2.1
Published: May 22, 2023 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Overview

Package tasks provides the workers that actually run instances of scheduled jobs.

Constants

const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
const (
	// DefaultMaximumWorkers is set to 20.
	DefaultMaximumWorkers = 20
)

Variables

var (
	PubSub *pubsub.PubSub
)

Functions

This section is empty.

Types

type ContextJobParametersKey

type ContextJobParametersKey struct{}

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher orchestrates the jobs by dispatching work to available workers.

func NewDispatcher

func NewDispatcher(maxWorkers int, tags map[string]string) *Dispatcher

NewDispatcher creates and initialises a new Dispatcher with the given number of workers.

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run starts the N workers of this dispatcher.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop sends a quit signal to all workers and to the main dispatcher.

type ReconnectingClient

type ReconnectingClient struct {
	// contains filtered or unexported fields
}

func NewTaskReconnectingClient

func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient

func (*ReconnectingClient) StartListening

func (s *ReconnectingClient) StartListening(tasksChan chan interface{})

func (*ReconnectingClient) Stop

func (s *ReconnectingClient) Stop()

type Runnable

type Runnable struct {
	*jobs.Action
	Task           *Task
	Message        *jobs.ActionMessage
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
	// contains filtered or unexported fields
}

Runnable represents the runnable instance of a given task

func NewRunnable

func NewRunnable(ctx context.Context, parent *Runnable, queue chan RunnerFunc, action *jobs.Action, message *jobs.ActionMessage, indexTag ...int) Runnable

NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.

func RootRunnable

func RootRunnable(ctx context.Context, task *Task) Runnable

func (*Runnable) Add added in v4.0.5

func (r *Runnable) Add(i int)

func (*Runnable) AsRunnerFuncRun added in v4.0.5

func (r *Runnable) AsRunnerFuncRun() RunnerFunc

func (*Runnable) Dispatch

func (r *Runnable) Dispatch(input *jobs.ActionMessage, aa []*jobs.Action, queue chan RunnerFunc)

Dispatch gets the next list of Runnables from the action and enqueues them to the queue. The Done channel should work correctly with chained actions.

func (*Runnable) Done added in v4.0.5

func (r *Runnable) Done()

func (*Runnable) RunAction

func (r *Runnable) RunAction(queue chan RunnerFunc)

RunAction creates an action and calls Dispatch

func (*Runnable) SetupCollector added in v4.0.5

func (r *Runnable) SetupCollector(parentCtx context.Context, mergeAction *jobs.Action, queue chan RunnerFunc)

type RunnerFunc added in v4.0.5

type RunnerFunc func(queue chan RunnerFunc)
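
Because a RunnerFunc receives the queue it was taken from, a unit of work can schedule follow-up work itself. The following is a minimal, single-goroutine sketch of that idea; `drain` is a hypothetical helper, not part of this package, and the buffered channel size is chosen arbitrarily for the example.

```go
package main

import "fmt"

// RunnerFunc has the same shape as the documented type: a unit of work that
// receives the queue so it can schedule follow-up work.
type RunnerFunc func(queue chan RunnerFunc)

// drain runs queued functions until the queue is empty and returns how many
// ran. Hypothetical helper: the queue must be buffered with enough capacity
// for any follow-up work, because everything runs on one goroutine here.
func drain(queue chan RunnerFunc) int {
	ran := 0
	for {
		select {
		case fn := <-queue:
			fn(queue)
			ran++
		default:
			return ran
		}
	}
}

func main() {
	queue := make(chan RunnerFunc, 8)
	queue <- func(q chan RunnerFunc) {
		fmt.Println("first step")
		// Chain a second step by pushing it back onto the same queue.
		q <- func(chan RunnerFunc) { fmt.Println("chained step") }
	}
	fmt.Println("ran:", drain(queue))
}
```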

type Subscriber

type Subscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriber handles incoming events, applies selectors if any, and generates all the ActionMessages needed to trigger actions.

func NewSubscriber

func NewSubscriber(parentContext context.Context) *Subscriber

NewSubscriber creates a multiplexer for task management and messages by maintaining a map of dispatchers, one for each job definition.

func (*Subscriber) Init

func (s *Subscriber) Init(ctx context.Context) error

Init initialises the subscriber with the current list of jobs from the Jobs service.

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop closes the internal EventsBatcher.

type Task

type Task struct {
	*jobs.Job
	sync.RWMutex
	common.RuntimeHolder
	// contains filtered or unexported fields
}

func NewTaskFromEvent

func NewTaskFromEvent(runtime, ctx context.Context, job *jobs.Job, event interface{}) *Task

NewTaskFromEvent creates a task based on incoming job and event

func (*Task) Add

func (t *Task) Add(delta int)

Add increments the task's internal retain counter.

func (*Task) CleanUp added in v4.0.1

func (t *Task) CleanUp()

CleanUp is triggered after a task has no more subroutines running.

func (*Task) Done

func (t *Task) Done(delta int)

Done decrements the task's internal retain counter. When the counter reaches 0, it triggers the CleanUp operation.
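
The Add/Done/CleanUp relationship described above can be sketched as a small retain counter. This is an illustration under assumptions, not the package's implementation: `retainCounter` and its `cleanUp` field are hypothetical names, and the real Task keeps its counter in unexported fields.

```go
package main

import (
	"fmt"
	"sync"
)

// retainCounter is a hypothetical stand-in for the task's internal counter.
type retainCounter struct {
	mu      sync.Mutex
	count   int
	cleanUp func()
}

// Add increments the counter by delta.
func (c *retainCounter) Add(delta int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.count += delta
}

// Done decrements the counter by delta and runs cleanUp when it reaches zero.
func (c *retainCounter) Done(delta int) {
	c.mu.Lock()
	c.count -= delta
	reachedZero := c.count == 0
	c.mu.Unlock()
	if reachedZero && c.cleanUp != nil {
		c.cleanUp() // called outside the lock so cleanUp may use the counter
	}
}

func main() {
	c := &retainCounter{cleanUp: func() { fmt.Println("cleaned up") }}
	c.Add(2)
	c.Done(1) // counter is 1, nothing happens
	c.Done(1) // counter is 0, cleanUp runs
}
```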

func (*Task) GetJobTaskClone

func (t *Task) GetJobTaskClone() *jobs.Task

GetJobTaskClone creates a protobuf clone of this task

func (*Task) GetRunUUID

func (t *Task) GetRunUUID() string

GetRunUUID returns the task internal run UUID

func (*Task) GetRunnableChannels

func (t *Task) GetRunnableChannels(controllable bool) (*actions.RunnableChannels, chan bool)

GetRunnableChannels prepares a set of data channels for the action's actual Run method.

func (*Task) Queue added in v4.0.1

func (t *Task) Queue(queue chan RunnerFunc)

Queue sends this new task to the global queue.

func (*Task) Save

func (t *Task) Save()

Save publishes the task to the PubSub topic.

func (*Task) SetControllable

func (t *Task) SetControllable(canPause bool)

SetControllable flags the task as being able to be stopped or paused.

func (*Task) SetEndTime

func (t *Task) SetEndTime(ti time.Time)

SetEndTime updates end time

func (*Task) SetError added in v4.0.1

func (t *Task) SetError(e error, appendLog bool)

SetError sets the task in error globally.

func (*Task) SetHasProgress

func (t *Task) SetHasProgress()

SetHasProgress flags task as providing progress information

func (*Task) SetProgress

func (t *Task) SetProgress(progress float32)

SetProgress updates task internal progress

func (*Task) SetStartTime

func (t *Task) SetStartTime(ti time.Time)

SetStartTime updates start time

func (*Task) SetStatus

func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)

SetStatus updates task internal status

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents the worker that executes the jobs.

func NewWorker

func NewWorker(workerPool chan chan RunnerFunc, requeue chan RunnerFunc, activeChan chan int, tags map[string]string) Worker

NewWorker creates and configures a new worker.

func (Worker) Start

func (w Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it.

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.

Directories

Path Synopsis
Package grpc provides a gRPC service to effectively run task instances on multiple workers.
