executor

package
v2.0.0-beta.9
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2020 License: MIT Imports: 16 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func NewExecutor

func NewExecutor(log *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, opts ...executorOption) (*Executor, *ExecutorMetrics)

NewExecutor creates a new task executor.

func NewRunCollector

func NewRunCollector(ex *Executor) prometheus.Collector

NewRunCollector returns a collector which exports influxdb process metrics.

func WithMaxWorkers

func WithMaxWorkers(n int) executorOption

WithMaxWorkers specifies the number of workers used by the Executor.
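The signature of WithMaxWorkers suggests the functional options pattern: NewExecutor accepts a variadic list of options, and each option adjusts the executor before it is returned. A minimal sketch of that pattern follows. The executor and option types, the maxWorkers field, and the default of 100 are hypothetical stand-ins for illustration, not the package's actual internals.

```go
package main

import "fmt"

// executor is a hypothetical stand-in for the package's Executor type.
type executor struct {
	maxWorkers int
}

// option mirrors the shape of an unexported option type such as executorOption.
type option func(*executor)

// withMaxWorkers returns an option that sets the worker count.
func withMaxWorkers(n int) option {
	return func(e *executor) { e.maxWorkers = n }
}

// newExecutor sets defaults first, then applies each option in order,
// so later options override earlier ones.
func newExecutor(opts ...option) *executor {
	e := &executor{maxWorkers: 100} // default chosen only for this sketch
	for _, opt := range opts {
		opt(e)
	}
	return e
}

func main() {
	fmt.Println(newExecutor().maxWorkers)                  // 100
	fmt.Println(newExecutor(withMaxWorkers(8)).maxWorkers) // 8
}
```

Keeping the option type unexported means callers can only obtain options through constructors such as WithMaxWorkers.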

Types

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor is a task-specific executor that works with the new scheduler system.

func (*Executor) Cancel

func (e *Executor) Cancel(ctx context.Context, runID influxdb.ID) error

Cancel cancels a run of a specific task.

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) error

Execute is an executor to satisfy the needs of tasks.

func (*Executor) ManualRun

func (e *Executor) ManualRun(ctx context.Context, id influxdb.ID, runID influxdb.ID) (Promise, error)

func (*Executor) PromiseQueueUsage

func (e *Executor) PromiseQueueUsage() float64

PromiseQueueUsage returns the percent of the Promise Queue that is currently filled.
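One common way to measure how full a queue is, assuming the queue is a buffered channel, is to divide its current length by its capacity. The sketch below shows that calculation. The queueUsage helper is hypothetical, and whether the real method returns a fraction in [0, 1] or a value scaled to 100 is not stated on this page.

```go
package main

import "fmt"

// queueUsage reports how full a buffered channel is as a fraction in [0, 1].
// An unbuffered channel has capacity 0 and is reported as empty.
func queueUsage(queue chan int) float64 {
	if cap(queue) == 0 {
		return 0
	}
	return float64(len(queue)) / float64(cap(queue))
}

func main() {
	queue := make(chan int, 4)
	queue <- 1
	fmt.Println(queueUsage(queue)) // 0.25
}
```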

func (*Executor) PromisedExecute

func (e *Executor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) (Promise, error)

PromisedExecute begins execution for the task's id with a specific scheduledFor time. When we execute, we first build a run for the scheduledFor time. We then add to the queue anything that was manually queued to run. If the queue is full, the call to execute should hang and apply back pressure to the caller. We then start a worker to work the newly queued jobs.
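The back pressure described above can be illustrated with a bounded channel: a send blocks while the buffer is full, so the caller is slowed to the pace of the workers. This is a simplified sketch, not the executor's implementation. The pool type and its methods are hypothetical, and it starts a fixed set of workers up front rather than starting a worker per call.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical worker pool with a bounded job queue.
type pool struct {
	queue chan int
	wg    sync.WaitGroup
	mu    sync.Mutex
	done  []int // jobs that have been processed
}

// newPool starts the given number of workers reading from a queue
// that holds at most queueSize waiting jobs.
func newPool(queueSize, workers int) *pool {
	p := &pool{queue: make(chan int, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.queue {
				p.mu.Lock()
				p.done = append(p.done, job)
				p.mu.Unlock()
			}
		}()
	}
	return p
}

// submit blocks while the queue is full, which pushes back on the caller.
func (p *pool) submit(job int) { p.queue <- job }

// close stops accepting jobs and waits for the workers to drain the queue.
func (p *pool) close() {
	close(p.queue)
	p.wg.Wait()
}

func main() {
	p := newPool(2, 1)
	for i := 0; i < 5; i++ {
		p.submit(i) // blocks whenever two jobs are already waiting
	}
	p.close()
	fmt.Println(len(p.done)) // 5
}
```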

func (*Executor) ResumeCurrentRun

func (e *Executor) ResumeCurrentRun(ctx context.Context, id influxdb.ID, runID influxdb.ID) (Promise, error)

func (*Executor) RunsActive

func (e *Executor) RunsActive() int

RunsActive returns the current number of workers, which is equivalent to the number of runs actively running.

func (*Executor) SetLimitFunc

func (e *Executor) SetLimitFunc(l LimitFunc)

SetLimitFunc sets the limit func for this task executor.

func (*Executor) WorkersBusy

func (e *Executor) WorkersBusy() float64

WorkersBusy returns the percent of total workers that are busy.

type ExecutorMetrics

type ExecutorMetrics struct {
	// contains filtered or unexported fields
}

func NewExecutorMetrics

func NewExecutorMetrics(ex *Executor) *ExecutorMetrics

func (*ExecutorMetrics) FinishRun

func (em *ExecutorMetrics) FinishRun(task *influxdb.Task, status influxdb.RunStatus, runDuration time.Duration)

FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID.

func (*ExecutorMetrics) LogError

func (em *ExecutorMetrics) LogError(taskType string, err error)

LogError increments the count of errors by error code.

func (*ExecutorMetrics) LogUnrecoverableError

func (em *ExecutorMetrics) LogUnrecoverableError(taskID influxdb.ID, err error)

LogUnrecoverableError increments the count of unrecoverable errors, which require admin intervention to resolve or deactivate. This count is separate from the errors count so that the errors metric can be used to identify only internal errors, rather than user errors, and so that unrecoverable errors can be quickly identified for deactivation.

func (*ExecutorMetrics) PrometheusCollectors

func (em *ExecutorMetrics) PrometheusCollectors() []prometheus.Collector

PrometheusCollectors satisfies the prom.PrometheusCollector interface.

func (*ExecutorMetrics) StartRun

func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration, runLatency time.Duration)

StartRun stores the delta time between when a run is due to start and when it actually starts.

type LimitFunc

type LimitFunc func(*influxdb.Task, *influxdb.Run) error

LimitFunc is a function the executor will use to

func ConcurrencyLimit

func ConcurrencyLimit(exec *Executor) LimitFunc

ConcurrencyLimit creates a concurrency limit func that uses the executor to determine if the task has exceeded the concurrency limit.

func MultiLimit

func MultiLimit(limits ...LimitFunc) LimitFunc

MultiLimit allows us to create a single limit func that applies more than one limit.
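Combining several limit funcs into one can be sketched as a closure that calls each limit in turn and returns the first error. The task and run types below are hypothetical stand-ins for influxdb.Task and influxdb.Run, and maxAttempts and allowAll are invented for the example. The order of evaluation and the skipping of nil limits are assumptions of this sketch, not documented behavior of MultiLimit.

```go
package main

import (
	"errors"
	"fmt"
)

// task and run are hypothetical stand-ins for influxdb.Task and influxdb.Run.
type task struct{ name string }
type run struct{ attempt int }

// limitFunc has the same shape as LimitFunc, using the stand-in types.
type limitFunc func(*task, *run) error

// multiLimit returns a limitFunc that applies each limit in order and
// returns the first error it sees; nil limits are skipped.
func multiLimit(limits ...limitFunc) limitFunc {
	return func(t *task, r *run) error {
		for _, limit := range limits {
			if limit == nil {
				continue
			}
			if err := limit(t, r); err != nil {
				return err
			}
		}
		return nil
	}
}

var errTooManyAttempts = errors.New("too many attempts")

// maxAttempts is an example limit that rejects runs past the nth attempt.
func maxAttempts(n int) limitFunc {
	return func(_ *task, r *run) error {
		if r.attempt > n {
			return errTooManyAttempts
		}
		return nil
	}
}

// allowAll is an example limit that never rejects a run.
func allowAll(*task, *run) error { return nil }

func main() {
	limit := multiLimit(allowAll, maxAttempts(3))
	fmt.Println(limit(&task{name: "demo"}, &run{attempt: 2})) // <nil>
	fmt.Println(limit(&task{name: "demo"}, &run{attempt: 4})) // too many attempts
}
```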

type Promise

type Promise interface {
	ID() influxdb.ID
	Cancel(ctx context.Context)
	Done() <-chan struct{}
	Error() error
}
