worker

package
v1.7.0
Warning

This package is not in the latest version of its module.

Published: Jan 16, 2024 | License: Apache-2.0 | Imports: 13 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type WorkerHost added in v1.5.0

type WorkerHost struct {
	// contains filtered or unexported fields
}

WorkerHost implements polling and execution logic for a Conductor worker. Every polling interval, each running worker attempts to retrieve a batch of tasks from Conductor. Multiple workers can be started in parallel. Goroutines started by a WorkerHost cannot be stopped, only paused and resumed.

Conductor tasks are tracked separately by name. Each WorkerHost keeps one poll interval and one batch size per task name, and both are shared by all workers running that task. For instance, if task "foo" is running with a batch size of n and k workers, the average number of tasks retrieved during each polling interval is n*k.

All methods on WorkerHost are thread-safe.
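The per-name bookkeeping described above can be pictured with a small model. This is a minimal sketch under stated assumptions, not the WorkerHost implementation: the registry and taskState types and all of their methods are hypothetical names, and tasksPerInterval simply reproduces the n*k arithmetic from the text.

```go
package main

import (
	"fmt"
	"sync"
)

// taskState holds the settings shared by every worker polling one task name.
// Hypothetical type, not part of the SDK.
type taskState struct {
	batchSize int // shared batch size (n in the text)
	workers   int // number of workers started for this task (k in the text)
}

// registry is a hypothetical, mutex-guarded table keyed by task name.
type registry struct {
	mu    sync.RWMutex
	tasks map[string]*taskState
}

func newRegistry() *registry {
	return &registry{tasks: make(map[string]*taskState)}
}

// stateLocked returns the entry for taskName, creating it if needed.
// The caller must hold the write lock.
func (r *registry) stateLocked(taskName string) *taskState {
	s, ok := r.tasks[taskName]
	if !ok {
		s = &taskState{}
		r.tasks[taskName] = s
	}
	return s
}

// setBatchSize sets the batch size shared by all workers of taskName.
func (r *registry) setBatchSize(taskName string, n int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stateLocked(taskName).batchSize = n
}

// addWorker records one more worker polling taskName.
func (r *registry) addWorker(taskName string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.stateLocked(taskName).workers++
}

// tasksPerInterval returns n*k for taskName, or 0 for an unknown task.
func (r *registry) tasksPerInterval(taskName string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	s, ok := r.tasks[taskName]
	if !ok {
		return 0
	}
	return s.batchSize * s.workers
}

func main() {
	r := newRegistry()
	r.setBatchSize("foo", 5)
	for i := 0; i < 3; i++ {
		r.addWorker("foo")
	}
	r.setBatchSize("bar", 2)
	r.addWorker("bar")
	fmt.Println(r.tasksPerInterval("foo"), r.tasksPerInterval("bar")) // prints "15 2"
}
```

Guarding the map with a single lock is one simple way to make every method safe to call from multiple goroutines. Each task name gets its own entry, so changing "foo" never affects "bar".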

func NewWorkerHost added in v1.5.0

func NewWorkerHost(httpSettings *settings.HttpSettings) *WorkerHost

NewWorkerHost returns a new WorkerHost using the provided settings.

func NewWorkerHostWithApiClient added in v1.5.0

func NewWorkerHostWithApiClient(apiClient *client.APIClient) *WorkerHost

NewWorkerHostWithApiClient creates a new WorkerHost which uses the provided client.APIClient to communicate with Conductor.

func (*WorkerHost) DecreaseBatchSize added in v1.5.0

func (c *WorkerHost) DecreaseBatchSize(taskName string, batchSize int) error

DecreaseBatchSize decreases the batch size used for all workers running the provided task.

func (*WorkerHost) GetBatchSizeForAll added in v1.5.0

func (c *WorkerHost) GetBatchSizeForAll() (batchSizeByTaskName map[string]int)

GetBatchSizeForAll returns a map from taskName to batch size for all batch sizes currently registered with this WorkerHost.

func (*WorkerHost) GetBatchSizeForTask added in v1.5.0

func (c *WorkerHost) GetBatchSizeForTask(taskName string) (batchSize int)

GetBatchSizeForTask retrieves the current batch size for the provided task.

func (*WorkerHost) GetPollIntervalForTask added in v1.5.0

func (c *WorkerHost) GetPollIntervalForTask(taskName string) (pollInterval time.Duration, err error)

GetPollIntervalForTask retrieves the poll interval shared by all workers running the provided task. An error is returned if no poll interval has been registered for that task.
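The lookup-or-error behaviour described above can be sketched as follows. The intervalTable type and its methods are hypothetical names chosen for illustration, and the error text is invented. The sketch only shows one plausible way to tell "not registered" apart from a zero interval.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// intervalTable is a hypothetical per-task poll interval store.
type intervalTable struct {
	mu     sync.RWMutex
	byTask map[string]time.Duration
}

func newIntervalTable() *intervalTable {
	return &intervalTable{byTask: make(map[string]time.Duration)}
}

// set registers (or replaces) the poll interval for taskName.
func (t *intervalTable) set(taskName string, d time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.byTask[taskName] = d
}

// get returns the registered interval, or an error if none was registered.
func (t *intervalTable) get(taskName string) (time.Duration, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	d, ok := t.byTask[taskName]
	if !ok {
		return 0, fmt.Errorf("no poll interval registered for task %q", taskName)
	}
	return d, nil
}

func main() {
	tbl := newIntervalTable()
	tbl.set("foo", 250*time.Millisecond)

	d, err := tbl.get("foo")
	fmt.Println(d, err) // prints "250ms <nil>"

	_, err = tbl.get("bar")
	fmt.Println(err != nil) // prints "true"
}
```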

func (*WorkerHost) IncreaseBatchSize added in v1.5.0

func (c *WorkerHost) IncreaseBatchSize(taskName string, batchSize int) error

IncreaseBatchSize increases the batch size used for all workers running the provided task.

func (*WorkerHost) Pause added in v1.5.0

func (c *WorkerHost) Pause(taskName string)

Pause pauses all workers running the provided task. While paused, workers do not poll for new tasks and no new goroutines are started. However, Pause does not stop goroutines that are already running. Workers must be resumed later using Resume. Failing to call Resume on a WorkerHost running one or more workers can result in a goroutine leak.

func (*WorkerHost) Resume added in v1.5.0

func (c *WorkerHost) Resume(taskName string)

Resume resumes all paused workers for the provided taskName. If workers for the provided task are not paused, calling this method has no effect.
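The interplay of Pause and Resume can be modelled with a flag that the polling loop checks on every interval. This is a sketch under assumptions, not the SDK's code: pollLoop and its methods are hypothetical, and tick stands in for a timer firing so that the example stays deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// pollLoop is a hypothetical model of one worker's polling loop.
type pollLoop struct {
	mu     sync.Mutex
	paused bool
	polls  int // number of polls actually sent
}

// pause makes subsequent ticks skip polling.
func (p *pollLoop) pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = true
}

// resume re-enables polling; it has no effect if the loop is not paused.
func (p *pollLoop) resume() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = false
}

// tick stands in for one polling interval elapsing. The loop keeps ticking
// while paused; it only skips the poll itself.
func (p *pollLoop) tick() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.paused {
		return
	}
	p.polls++
}

// pollCount reports how many polls were sent so far.
func (p *pollLoop) pollCount() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.polls
}

func main() {
	p := &pollLoop{}
	p.tick() // polls
	p.pause()
	p.tick() // skipped
	p.tick() // skipped
	p.resume()
	p.resume() // not paused, so this changes nothing
	p.tick()   // polls again
	fmt.Println(p.pollCount()) // prints "2"
}
```

In this model the loop keeps running while paused, which mirrors the warning above: a paused worker still occupies a goroutine until it is resumed.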

func (*WorkerHost) SetBatchSize added in v1.5.0

func (c *WorkerHost) SetBatchSize(taskName string, batchSize int) error

SetBatchSize sets the batch size for all workers running the provided task.

func (*WorkerHost) SetPollIntervalForTask added in v1.5.0

func (c *WorkerHost) SetPollIntervalForTask(taskName string, pollInterval time.Duration) error

SetPollIntervalForTask sets the pollInterval for all workers running the task with the provided taskName.

func (*WorkerHost) SetSleepOnGenericError added in v1.5.0

func (c *WorkerHost) SetSleepOnGenericError(duration time.Duration)

SetSleepOnGenericError sets how long to wait before continuing to poll or execute after an error. The default is 200 milliseconds, and this function can be used to increase or decrease the wait time. This is useful for avoiding excessive logs in the worker when there are intermittent issues.
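The effect of sleeping after an error can be illustrated with a small loop. This is a sketch under assumptions: poller, newRecordingPoller, and failEveryOther are hypothetical helpers, and the sleep function is injected so the example records the requested wait instead of blocking. Only the 200 millisecond default comes from the text above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// poller is a hypothetical polling loop with a configurable error sleep.
type poller struct {
	sleepOnError time.Duration
	sleep        func(time.Duration) // injected so the sketch runs instantly
}

// newRecordingPoller returns a poller whose sleep function only records the
// total time it was asked to wait, instead of blocking.
func newRecordingPoller(sleepOnError time.Duration) (*poller, *time.Duration) {
	total := new(time.Duration)
	p := &poller{
		sleepOnError: sleepOnError,
		sleep:        func(d time.Duration) { *total += d },
	}
	return p, total
}

// run calls poll the given number of times, sleeping after each failure.
func (p *poller) run(rounds int, poll func() error) (failures int) {
	for i := 0; i < rounds; i++ {
		if err := poll(); err != nil {
			failures++
			p.sleep(p.sleepOnError)
		}
	}
	return failures
}

// failEveryOther returns a poll function that fails on every second call.
func failEveryOther() func() error {
	calls := 0
	return func() error {
		calls++
		if calls%2 == 0 {
			return errors.New("intermittent failure")
		}
		return nil
	}
}

func main() {
	p, slept := newRecordingPoller(200 * time.Millisecond) // default from the text
	failures := p.run(4, failEveryOther())
	fmt.Println(failures, *slept) // prints "2 400ms"
}
```

A longer sleep means fewer retries, and therefore fewer log lines, during an outage. The cost is a slower recovery once the issue clears.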

func (*WorkerHost) StartWorker added in v1.5.0

func (c *WorkerHost) StartWorker(taskName string, executeFunction model.WorkerTaskFunction, batchSize int, pollInterval time.Duration) error

StartWorker starts a worker on a new goroutine, which polls Conductor periodically for tasks matching the provided taskName and, if any are available, uses executeFunction to run them on a separate goroutine. Each call to StartWorker starts a new goroutine which performs batch polling to retrieve as many tasks from Conductor as are available, up to the batchSize set for the task. StartWorker also sets the pollInterval and increases the batch size for the task; both settings apply to all workers on this WorkerHost with the same taskName.
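Batch polling with per-task goroutines can be sketched as follows. Everything here is a hypothetical stand-in: queue replaces the Conductor server, pollOnce models a single polling interval, and execute plays the role of executeFunction. Unlike a real worker, the sketch waits for the batch to finish so that its output is deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical in-memory stand-in for Conductor's task queue.
type queue struct {
	mu    sync.Mutex
	tasks []string
}

// poll removes and returns up to batchSize tasks, fewer if the queue is short.
func (q *queue) poll(batchSize int) []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := batchSize
	if n > len(q.tasks) {
		n = len(q.tasks)
	}
	batch := q.tasks[:n]
	q.tasks = q.tasks[n:]
	return batch
}

// pollOnce models one polling interval: fetch a batch, run every task on its
// own goroutine, wait for them, and report how many tasks were executed.
func pollOnce(q *queue, batchSize int, execute func(task string)) int {
	batch := q.poll(batchSize)
	var wg sync.WaitGroup
	for _, task := range batch {
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			execute(t)
		}(task)
	}
	wg.Wait() // only for determinism in this sketch
	return len(batch)
}

func main() {
	q := &queue{tasks: []string{"a", "b", "c", "d", "e"}}
	noop := func(string) {}

	first := pollOnce(q, 3, noop)
	second := pollOnce(q, 3, noop)
	third := pollOnce(q, 3, noop)
	fmt.Println(first, second, third) // prints "3 2 0"
}
```

With five queued tasks and a batch size of 3, the first interval takes three tasks, the second takes the remaining two, and the third finds nothing.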

func (*WorkerHost) StartWorkerWithDomain added in v1.5.0

func (c *WorkerHost) StartWorkerWithDomain(taskName string, executeFunction model.WorkerTaskFunction, batchSize int, pollInterval time.Duration, domain string) error

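StartWorkerWithDomain starts a polling worker on a new goroutine, which only polls for tasks using the provided domain. Passing an empty domain polls without a domain, so a call to StartWorker with the same arguments is equivalent to: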

StartWorkerWithDomain(taskName, executeFunction, batchSize, pollInterval, "")
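The relationship between the two entry points can be pictured as a simple delegation. This is a hypothetical model that only records arguments. It assumes, as the call above suggests, that the variant without a domain forwards an empty string. The host and started types and the "staging" domain are illustrative names.

```go
package main

import "fmt"

// started records the arguments one worker was started with.
type started struct {
	taskName string
	domain   string
}

// host is a hypothetical stand-in for WorkerHost that only records calls.
type host struct {
	workers []started
}

// startWorkerWithDomain records a worker bound to the given domain.
func (h *host) startWorkerWithDomain(taskName, domain string) {
	h.workers = append(h.workers, started{taskName: taskName, domain: domain})
}

// startWorker delegates with an empty domain (an assumption of this sketch).
func (h *host) startWorker(taskName string) {
	h.startWorkerWithDomain(taskName, "")
}

func main() {
	h := &host{}
	h.startWorker("foo")
	h.startWorkerWithDomain("foo", "staging")
	fmt.Printf("%q %q\n", h.workers[0].domain, h.workers[1].domain) // prints `"" "staging"`
}
```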

func (*WorkerHost) WaitWorkers added in v1.5.0

func (c *WorkerHost) WaitWorkers()

WaitWorkers uses an internal WaitGroup to block the calling goroutine until all workers started by this WorkerHost have stopped.
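The WaitGroup pattern behind this kind of blocking call can be sketched as below. The host type and its methods are hypothetical. The workers here finish on their own so that the example terminates, whereas polling workers would normally keep a call like WaitWorkers blocked for the life of the process.

```go
package main

import (
	"fmt"
	"sync"
)

// host is a hypothetical stand-in that tracks its workers with a WaitGroup.
type host struct {
	wg       sync.WaitGroup
	mu       sync.Mutex
	finished int // number of workers that have returned
}

// start runs work on a new goroutine and registers it with the WaitGroup.
func (h *host) start(work func()) {
	h.wg.Add(1)
	go func() {
		defer h.wg.Done()
		work()
		h.mu.Lock()
		h.finished++
		h.mu.Unlock()
	}()
}

// waitWorkers blocks the calling goroutine until every started worker returns.
func (h *host) waitWorkers() {
	h.wg.Wait()
}

// finishedCount reports how many workers have returned so far.
func (h *host) finishedCount() int {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.finished
}

func main() {
	h := &host{}
	for i := 0; i < 3; i++ {
		h.start(func() {}) // a worker that returns immediately
	}
	h.waitWorkers()
	fmt.Println(h.finishedCount()) // prints "3"
}
```

Calling Add before launching each goroutine, rather than inside it, avoids a race in which Wait could return before a worker has registered.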
