Documentation ¶
Index ¶
- Variables
- func SwitchMetricsServiceName(serviceName string)
- type Dequeuer
- type Progress
- type QueueMock
- func (q *QueueMock) Dequeue(ctx context.Context, queue ...string) (*Task, error)
- func (q *QueueMock) Enqueue(ctx context.Context, task TaskEnqueueRequest) error
- func (q *QueueMock) Fail(ctx context.Context, taskID string, progress Progress) error
- func (q *QueueMock) Finish(ctx context.Context, taskID string, progress Progress) error
- func (q *QueueMock) Heartbeat(ctx context.Context, taskID string, progress Progress) error
- type Queuer
- type References
- type ScheduleWorkerMetricsType
- type Scheduler
- type SchedulerMetricsType
- type SchedulerMock
- func (s *SchedulerMock) AssertSchedule(ctx context.Context, task TaskScheduleRequest) error
- func (s *SchedulerMock) EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error
- func (s *SchedulerMock) Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
- type Spec
- type Task
- type TaskBase
- type TaskEnqueueRequest
- type TaskHandler
- type TaskHandlerFunc
- type TaskQueueMetricsType
- type TaskScheduleRequest
- type TaskStatus
- type TaskType
- type Worker
- type WorkerMetricsType
Constants ¶
This section is empty.
Variables ¶
var ( // TaskQueueMetrics is the global metrics instance for the task queue of this instance TaskQueueMetrics = TaskQueueMetricsType{ Labels: queueMetricLabels, TaskCounter: promauto.NewCounterVec( defTaskCounterOpts, taskMetricLabels, ), EnqueueDuration: promauto.NewHistogramVec( defEnqueueDurationOpts, queueMetricLabels, ), } // TaskWorkerMetrics is the global metrics instance for the task worker of this instance TaskWorkerMetrics = WorkerMetricsType{ Labels: taskMetricLabels, ActiveGauge: promauto.NewGauge(defTaskWorkerActiveGaugeOpts), WorkingGauge: promauto.NewGauge(defTaskWorkerWorkingGaugeOpts), DequeueingGauge: promauto.NewGauge(defTaskWorkerDequeueingGaugeOpts), ProcessingDuration: promauto.NewHistogram(defTaskWorkerProcessingDurationOpts), DequeueingDuration: promauto.NewHistogram(defTaskWorkerDequeueingDurationOpts), ProcessedCounter: promauto.NewCounterVec(defTaskWorkerProcessedCounterOpts, taskMetricLabels), DequeueErrorCounter: promauto.NewCounter(defTaskWorkerDequeueErrorsCounterOpts), ProcessingErrorsCounter: promauto.NewCounterVec(defTaskWorkerProcessingErrorsCounterOpts, taskMetricLabels), ErrorsCounter: promauto.NewCounter(defTaskWorkerErrorsCounterOpts), } // SchedulerMetrics is the global metrics instance for the scheduler of this instance SchedulerMetrics = SchedulerMetricsType{ Labels: queueMetricLabels, ScheduleCounter: promauto.NewCounterVec( defScheduleCounterOpts, taskMetricLabels, ), ErrorCounter: promauto.NewCounterVec( defSchedulerErrorCounterOpts, taskMetricLabels, ), } // ScheduleWorkerMetrics is the global metrics instance for the schedule worker of this instance ScheduleWorkerMetrics = ScheduleWorkerMetricsType{ WorkerMetricsType: WorkerMetricsType{ Labels: taskMetricLabels, ActiveGauge: promauto.NewGauge(defScheduleWorkerActiveGaugeOpts), WorkingGauge: promauto.NewGauge(defScheduleWorkerWorkingGaugeOpts), DequeueingGauge: promauto.NewGauge(defScheduleWorkerDequeueingGaugeOpts), ProcessingDuration: promauto.NewHistogram(defScheduleWorkerProcessingDurationOpts), DequeueingDuration: promauto.NewHistogram(defScheduleWorkerDequeueingDurationOpts), ProcessedCounter: promauto.NewCounterVec(defScheduleWorkerProcessedCounterOpts, taskMetricLabels), DequeueErrorCounter: promauto.NewCounter(defScheduleWorkerDequeueErrorsCounterOpts), ProcessingErrorsCounter: promauto.NewCounterVec(defScheduleWorkerProcessingErrorsCounterOpts, taskMetricLabels), ErrorsCounter: promauto.NewCounter(defScheduleWorkerErrorsCounterOpts), }, WaitingGauge: promauto.NewGauge(defScheduleWorkerWaitingGaugeOpts), } )
var ( // ErrTaskCancelled is used to notify the worker from the Heartbeat that the task // was canceled by the user and it needs to stop working on it. ErrTaskCancelled = errors.New("task has been canceled") // ErrTaskFinished is used to notify the worker from the Heartbeat that the task has // already finished and can't be worked on ErrTaskFinished = errors.New("task has finished") // ErrTaskNotRunning is used to notify the worker from the Heartbeat that the task has // not started yet and can't be worked on ErrTaskNotRunning = errors.New("task has not started yet") // ErrTaskNotFound is used to notify the worker from the Heartbeat that the task does not // exist ErrTaskNotFound = errors.New("task not found") // ErrTaskFailed indicates that a task has failed and should not be restarted ErrTaskFailed = errors.New("task failed") // ErrTaskQueueNotSpecified indicates that the given task cannot be enqueued because // the queue name is empty ErrTaskQueueNotSpecified = errors.New("task queue name cannot be blank") // ErrTaskTypeNotSpecified indicates that the given task cannot be enqueued because // the task type is empty ErrTaskTypeNotSpecified = errors.New("task type cannot be blank") )
var ErrNotScheduled = errors.New("Task not currently scheduled")
ErrNotScheduled indicates the current scheduled task is has not been created yet. This should be returned from the EnsureSchedule method
Functions ¶
func SwitchMetricsServiceName ¶
func SwitchMetricsServiceName(serviceName string)
SwitchMetricsServiceName changes the service label used in the metrics, so it can be customized with a different name
Types ¶
type Dequeuer ¶
type Dequeuer interface { // Dequeue removes a task for processing from the provided queue // It's expected to block until work is available // // For zero-length queue parameter, dequeue should return a task from a random queue; // otherwise dequeue should return a task from the supplied list of queues. Dequeue(ctx context.Context, queue ...string) (*Task, error) // Heartbeat updates the task to ensure that it's still being processed by a worker. // // Once a task has begun processing, 'Heartbeat' should be called at known // intervals to update the 'Progress' and 'LastHeartbeat' fields. // This provides a way to determine if the worker is still processing. Heartbeat(ctx context.Context, taskID string, progress Progress) error // Finish marks the task as completed // It's recommended to update the progress to a final state to ensure there is // no ambiguity in whether or not the task is complete Finish(ctx context.Context, taskID string, progress Progress) error // Finish marks the task as failed // It's recommended to update the progress to a final state. Fail(ctx context.Context, taskID string, progress Progress) error }
Dequeuer is a read only interface for the queue
func DequeuerWithMetrics ¶
DequeuerWithMetrics returns q wrapped with the standard metrics implementation
type Progress ¶
type Progress []byte
Progress is a serialized status object that indicates the status of the task For example, it can be an object that contains the amount of processed bytes. The actual underlying type depends on the task type and consumers must serialize the value.
type QueueMock ¶
type QueueMock struct { Queue chan Task EnqueueErr error DequeueErr error HeartbeatErr error FinishErr error FailErr error EnqueueCount int DequeueCount int HeartbeatCount int FinishCount int FailCount int }
QueueMock implements queue with very simple counters for testing
func (*QueueMock) Enqueue ¶
func (q *QueueMock) Enqueue(ctx context.Context, task TaskEnqueueRequest) error
Enqueue implements queue Manager for testing
type Queuer ¶
type Queuer interface { // Enqueue adds a task to the provided queue within the Task object Enqueue(ctx context.Context, task TaskEnqueueRequest) error }
Queuer is a write-only interface for the queue
func QueuerWithMetrics ¶
QueuerWithMetrics returns q wrapped with the standard metrics implementation
type References ¶
type References map[string]interface{}
References is a dictionary of additinal SQL columns to set. Normally this dictionary contains referencies to other entities e.g. datasource_id, table_id, schedule_id, etc. So, the SQL table can have the `DELETE CASCADE` setting.
func (References) GetNamesAndValues ¶
func (a References) GetNamesAndValues() (keys []string, values []interface{})
GetNamesAndValues returns a slice of column names and a slice of values accordingly. It's to easily use it in the SQL builder.
type ScheduleWorkerMetricsType ¶
type ScheduleWorkerMetricsType struct { WorkerMetricsType // WaitingGauge is a gauge of waiting instances. // Unlike the task worker the schedule worker also waits between its iterations. WaitingGauge prometheus.Gauge }
ScheduleWorkerMetricsType provides access to the prometheus metric objects for a schedule worker
type Scheduler ¶
type Scheduler interface { // Schedule creates a cron schedule according to which the worker will enqueue // the given task. // `task.CronSchedule` cannot be blank. Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error // EnsureSchedule checks if a task for with the given queue, type, and references // currently exists. An error ErrNotScheduled is returned if a current task cannot be found. // implementation and validation errors may also be returned and should be checked for. EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error) // AssertSchedule makes sure a schedule with the given parameters exists, if it does not // this function will create one. AssertSchedule(ctx context.Context, task TaskScheduleRequest) (err error) }
Scheduler defines how one schedules a task
func SchedulerWithMetrics ¶
SchedulerWithMetrics returns q wrapped with the standard metrics implementation
type SchedulerMetricsType ¶
type SchedulerMetricsType struct { Labels []string ScheduleCounter *prometheus.CounterVec ErrorCounter *prometheus.CounterVec }
TaskQueueMetricsType provides access to the prometheus metric objects for the scheduler
type SchedulerMock ¶
type SchedulerMock struct { ScheduleErr error ScheduleCount int EnsureError error EnsureCount int AssertError error AssertCount int }
SchedulerMock implement scheduler with very simple counters for testing
func (*SchedulerMock) AssertSchedule ¶
func (s *SchedulerMock) AssertSchedule(ctx context.Context, task TaskScheduleRequest) error
func (*SchedulerMock) EnsureSchedule ¶
func (s *SchedulerMock) EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error
func (*SchedulerMock) Schedule ¶
func (s *SchedulerMock) Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
type Spec ¶
type Spec []byte
Spec is is a serialized specification object used to set the task parameters. The actual underlying type depends on the task type and consumers must serialize the value.
type Task ¶
type Task struct { TaskBase // ID is the id of the task ID string // Status is the current status of the task Status TaskStatus // Progress is used for workers to update the status of the task. // For example, bytes processed. // The actual underlying type is specific for each task type. Progress Progress // CreatedAt is when the task was initially put in the queue CreatedAt time.Time // CreatedAt is when the task was initially put in the queue UpdatedAt time.Time // StartedAt is when a worker picked up the task StartedAt *time.Time // FinishedAt is when the task was finished being processed FinishedAt *time.Time // LastHeartbeat provides a way to ensure the task is still being processed and hasn't failed. LastHeartbeatAt *time.Time }
Task represents a task in the queue
type TaskBase ¶
type TaskBase struct { // Queue is the queue to which the task belongs Queue string // Type holds a task type identifier Type TaskType // Spec contains the task specification based on type of the task. Spec Spec }
TaskBase contains basic fields for a task
type TaskEnqueueRequest ¶
type TaskEnqueueRequest struct { TaskBase // References contain names and values for additinal // SQL columns to set external references for a task for easy clean up References References }
TaskEnqueueRequest contains fields required for adding a task to the queue
type TaskHandler ¶
type TaskHandler interface { // Process implements the specific Task parsing logic Process(ctx context.Context, task Task, heartbeats chan<- Progress) (err error) }
TaskHandler is a type alias for a method that parses a task and returns any processing errors
type TaskHandlerFunc ¶
TaskHandlerFunc is an adapter that allows the use of a normal function as a TaskHandler
type TaskQueueMetricsType ¶
type TaskQueueMetricsType struct { Labels []string TaskCounter *prometheus.CounterVec EnqueueDuration *prometheus.HistogramVec }
TaskQueueMetricsType provides access to the prometheus metric objects for the task queue
type TaskScheduleRequest ¶
type TaskScheduleRequest struct { TaskBase // CronSchedule is the schedule impression in cron syntax that defines // when the task should be executed. CronSchedule string // References contain names and values for additinal // SQL columns to set external references for a schedule for easy clean up References References }
TaskScheduleRequest contains fields required for scheduling a task
type TaskStatus ¶
type TaskStatus string
TaskStatus : Current state of the task
const ( // Waiting task is waiting for being picked up Waiting TaskStatus = "waiting" // Running task is currently running Running TaskStatus = "running" // Cancelled task is canceled by the user // nolint: misspell // the initial spelling was with double 'l' now we have to stick with it Cancelled TaskStatus = "cancelled" // Finished task is successfully finished Finished TaskStatus = "finished" // Failed task is failed with an error Failed TaskStatus = "failed" )
List of TaskStatus
type Worker ¶
type Worker interface { // Work is responsible for getting and processing tasks // It should run continuously or until the context is canceled Work(context.Context) error }
Worker provides methods to do some kind of work
type WorkerMetricsType ¶
type WorkerMetricsType struct { // Labels available in its vector metrics Labels []string // ActiveGauge is a gauge of active instances ActiveGauge prometheus.Gauge // WorkingGauge is a gauge of working instances WorkingGauge prometheus.Gauge // DequeueingGauge is a gauge of instances that are trying to dequeue a task DequeueingGauge prometheus.Gauge // ProcessingDuration is a total duration of tasks being processed in seconds ProcessingDuration prometheus.Histogram // DequeueingDuration is a total duration spent waiting for a new task in seconds DequeueingDuration prometheus.Histogram // ProcessedCounter is a total count of processed tasks ProcessedCounter *prometheus.CounterVec // DequeueErrorCounter is a total count of dequeueing errors DequeueErrorCounter prometheus.Counter // ProcessingErrorsCounter is a total count of errors while handling the task ProcessingErrorsCounter *prometheus.CounterVec // ErrorsCounter is a total count of errors ErrorsCounter prometheus.Counter }
WorkerMetricsType provides access to the prometheus metric objects for a worker