worker

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2024 License: Apache-2.0 Imports: 13 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

TaskRunner implements polling and execution logic for a Conductor worker. Every polling interval, each running task attempts to retrieve a from Conductor. Multiple tasks can be started in parallel. All Goroutines started by this worker cannot be stopped, only paused and resumed.

Conductor tasks are tracked by name separately. Each TaskRunner tracks a separate poll interval and batch size for each task, which is shared by all workers running that task. For instance, if task "foo" is running with a batch size of n, and k workers, the average number of tasks retrieved during each polling interval is n*k.

All methods on TaskRunner are thread-safe.

func NewTaskRunner

func NewTaskRunner(authenticationSettings *settings.AuthenticationSettings, httpSettings *settings.HttpSettings) *TaskRunner

NewTaskRunner returns a new TaskRunner which authenticates via HTTP using the provided settings.

func NewTaskRunnerWithApiClient

func NewTaskRunnerWithApiClient(
	apiClient *client.APIClient,
) *TaskRunner

NewTaskRunnerWithApiClient creates a new TaskRunner which uses the provided client.APIClient to communicate with Conductor.

func (*TaskRunner) DecreaseBatchSize added in v1.2.3

func (c *TaskRunner) DecreaseBatchSize(taskName string, batchSize int) error

DecreaseBatchSize decreases the batch size used for all workers running the provided task.

func (*TaskRunner) GetBatchSizeForAll added in v1.2.3

func (c *TaskRunner) GetBatchSizeForAll() (batchSizeByTaskName map[string]int)

GetBatchSizeForAll returns a map from taskName to batch size for all batch sizes currently registered with this TaskRunner.

func (*TaskRunner) GetBatchSizeForTask added in v1.2.3

func (c *TaskRunner) GetBatchSizeForTask(taskName string) (batchSize int)

GetBatchSizeForTask retrieves the current batch size for the provided task.

func (*TaskRunner) GetPollIntervalForTask added in v1.2.3

func (c *TaskRunner) GetPollIntervalForTask(taskName string) (pollInterval time.Duration, err error)

GetPollIntervalForTask retrieves the poll interval for all tasks running the provided taskName. An error is returned if no pollInterval has been registered for the provided task.

func (*TaskRunner) IncreaseBatchSize added in v1.2.3

func (c *TaskRunner) IncreaseBatchSize(taskName string, batchSize int) error

IncreaseBatchSize increases the batch size used for all workers running the provided task.

func (*TaskRunner) Pause added in v1.2.9

func (c *TaskRunner) Pause(taskName string)

Pause pauses all workers running the provided task. When paused, workers will not poll for new tasks and no new goroutines are started. However it does not stop any goroutines running. Workers must be resumed at a later time using Resume. Failing to call `Resume()` on a TaskRunner running one or more workers can result in a goroutine leak.

func (*TaskRunner) Resume added in v1.2.9

func (c *TaskRunner) Resume(taskName string)

Resume all running workers for the provided taskName. If workers for the provided task are not paused, calling this method has no impact.

func (*TaskRunner) SetBatchSize added in v1.2.3

func (c *TaskRunner) SetBatchSize(taskName string, batchSize int) error

SetBatchSize can be used to set the batch size for all workers running the provided task.

func (*TaskRunner) SetPollIntervalForTask added in v1.2.3

func (c *TaskRunner) SetPollIntervalForTask(taskName string, pollInterval time.Duration) error

SetPollIntervalForTask sets the pollInterval for all workers running the task with the provided taskName.

func (*TaskRunner) SetSleepOnGenericError added in v1.3.4

func (c *TaskRunner) SetSleepOnGenericError(duration time.Duration)

SetSleepOnGenericError Sets the time for which to wait before continuing to poll/execute when there is an error Default is 200 millis, and this function can be used to increase/decrease the duration of the wait time Useful to avoid excessive logs in the worker when there are intermittent issues

func (*TaskRunner) StartWorker

func (c *TaskRunner) StartWorker(taskName string, executeFunction model.ExecuteTaskFunction, batchSize int, pollInterval time.Duration) error

StartWorker starts a worker on a new goroutine, which polls conductor periodically for tasks matching the provided taskName and, if any are available, uses executeFunction to run them on a separate goroutine. Each call to StartWorker starts a new goroutine which performs batch polling to retrieve as many tasks from Conductor as are available, up to the batchSize set for the task. This func additionally sets the pollInterval and increases the batch size for the task, which applies to all tasks shared by this TaskRunner with the same taskName.

func (*TaskRunner) StartWorkerWithDomain

func (c *TaskRunner) StartWorkerWithDomain(taskName string, executeFunction model.ExecuteTaskFunction, batchSize int, pollInterval time.Duration, domain string) error

StartWorkerWithDomain starts a polling worker on a new goroutine, which only polls for tasks using the provided domain. Equivalent to:

StartWorkerWithDomain(taskName, executeFunction, batchSize, pollInterval, "")

func (*TaskRunner) WaitWorkers

func (c *TaskRunner) WaitWorkers()

WaitWorkers uses an internal waitgroup to block the calling thread until all workers started by this TaskRunner have been stopped.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL