tasks

package
v2.3.4+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2021 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Overview

Package tasks provides workers that effectively run the instances of the scheduled jobs.

Index

Constants

View Source
const (
	PubSubTopicTaskStatuses = "tasks"
	PubSubTopicControl      = "control"
)
View Source
const (
	// DefaultMaximumWorkers is set to 20.
	DefaultMaximumWorkers = 20
)

Variables

View Source
var (
	PubSub                  *pubsub.PubSub
	ContextJobParametersKey = struct{}{}
)
View Source
var (
	RC int
)

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	JobQueue   chan Runnable
	WorkerPool chan chan Runnable
	// contains filtered or unexported fields
}

Dispatcher orchestrates the jobs by dispatching work to available workers.

func NewDispatcher

func NewDispatcher(maxWorkers int, tags map[string]string) *Dispatcher

NewDispatcher creates and initialises a new Dispatcher with this amount of workers.

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run simply starts the N workers of this dispacher.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop sends a quit signal to all workers and the main dispatcher

type ReconnectingClient added in v1.4.0

type ReconnectingClient struct {
	// contains filtered or unexported fields
}

func NewTaskReconnectingClient added in v1.4.0

func NewTaskReconnectingClient(parentCtx context.Context) *ReconnectingClient

func (*ReconnectingClient) StartListening added in v1.4.0

func (s *ReconnectingClient) StartListening(tasksChan chan interface{})

func (*ReconnectingClient) Stop added in v1.4.0

func (s *ReconnectingClient) Stop()

type Runnable

type Runnable struct {
	jobs.Action
	Task           *Task
	Message        jobs.ActionMessage
	Client         client.Client
	Context        context.Context
	Implementation actions.ConcreteAction
	ActionPath     string
}

Runnable represents the runnable instance of a given task

func NewRunnable

func NewRunnable(ctx context.Context, parentPath string, chainIndex int, cl client.Client, task *Task, action *jobs.Action, message jobs.ActionMessage) Runnable

NewRunnable creates a new runnable and populates it with the concrete task implementation found with action.ID, if such an implementation is found.

func RootRunnable

func RootRunnable(ctx context.Context, cl client.Client, task *Task) Runnable

func (*Runnable) CreateChild

func (r *Runnable) CreateChild(parentPath string, chainIndex int, action *jobs.Action, message jobs.ActionMessage) Runnable

CreateChild replicates a runnable for child action

func (*Runnable) Dispatch

func (r *Runnable) Dispatch(parentPath string, input jobs.ActionMessage, aa []*jobs.Action, Queue chan Runnable)

Dispatch gets next runnable from Action and enqueues it to the Queue Todo - Check that done channel is working correctly with chained actions

func (*Runnable) RunAction

func (r *Runnable) RunAction(Queue chan Runnable) error

RunAction creates an action and calls Dispatch

type Subscriber

type Subscriber struct {
	Client          client.Client
	MainQueue       chan Runnable
	UpdateTasksChan chan *jobs.Task

	JobsDefinitions map[string]*jobs.Job
	Dispatchers     map[string]*Dispatcher

	RootContext context.Context
	// contains filtered or unexported fields
}

Subscriber handles incoming events, applies selectors if any and generates all ActionMessage to trigger actions

func NewSubscriber

func NewSubscriber(parentContext context.Context, client client.Client, srv server.Server) *Subscriber

NewSubscriber creates a multiplexer for tasks managements and messages by maintaining a map of dispacher, one for each job definition.

func (*Subscriber) GetDispatcherForJob

func (s *Subscriber) GetDispatcherForJob(job *jobs.Job) *Dispatcher

GetDispatcherForJob creates a new dispatcher for a job

func (*Subscriber) Init

func (s *Subscriber) Init() error

Init subscriber with current list of jobs from Jobs service

func (*Subscriber) ListenToMainQueue

func (s *Subscriber) ListenToMainQueue()

ListenToMainQueue starts a go routine that listens to the Event Bus

func (*Subscriber) Stop

func (s *Subscriber) Stop()

Stop closes internal EventsBatcher

func (*Subscriber) TaskChannelSubscription

func (s *Subscriber) TaskChannelSubscription()

TaskChannelSubscription uses PubSub library to receive update messages from tasks

type Task

type Task struct {
	*jobs.Job

	RC      int
	RunUUID string
	// contains filtered or unexported fields
}

func NewTaskFromEvent

func NewTaskFromEvent(ctx context.Context, job *jobs.Job, event interface{}) *Task

func (*Task) Add

func (t *Task) Add(delta int)

func (*Task) AppendLog

func (t *Task) AppendLog(a jobs.Action, in jobs.ActionMessage, out jobs.ActionMessage)

func (*Task) Done

func (t *Task) Done(delta int)

func (*Task) EnqueueRunnables

func (t *Task) EnqueueRunnables(c client.Client, output chan Runnable)

func (*Task) GetJobTaskClone

func (t *Task) GetJobTaskClone() *jobs.Task

func (*Task) GetRunnableChannels

func (t *Task) GetRunnableChannels() (*actions.RunnableChannels, chan bool)

func (*Task) GlobalError added in v1.2.5

func (t *Task) GlobalError(e error)

func (*Task) Save

func (t *Task) Save()

func (*Task) SetControllable

func (t *Task) SetControllable(canStop bool, canPause bool)

func (*Task) SetEndTime

func (t *Task) SetEndTime(ti time.Time)

func (*Task) SetHasProgress

func (t *Task) SetHasProgress()

func (*Task) SetProgress

func (t *Task) SetProgress(progress float32)

func (*Task) SetStartTime

func (t *Task) SetStartTime(ti time.Time)

func (*Task) SetStatus

func (t *Task) SetStatus(status jobs.TaskStatus, message ...string)

type Worker

type Worker struct {
	WorkerPool chan chan Runnable
	JobChannel chan Runnable

	JobReQueue chan Runnable
	// contains filtered or unexported fields
}

Worker represents the worker that executes the jobs.

func NewWorker

func NewWorker(workerPool chan chan Runnable, requeue chan Runnable, activeChan chan int, tags map[string]string) Worker

NewWorker creates and configures a new worker.

func (Worker) Start

func (w Worker) Start()

Start method starts the run loop for the worker, listening for a quit channel in case we need to stop it.

func (Worker) Stop

func (w Worker) Stop()

Stop signals the worker to stop listening for work requests.

Directories

Path Synopsis
Package grpc provides a gRPC service to effectively run task instances on multiple workers.
Package grpc provides a gRPC service to effectively run task instances on multiple workers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL