queue

package
v4.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2021 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	// Global Prometheus metric instances for the queue, the scheduler and their workers.
	// Every metric below is built with a promauto constructor; per the promauto
	// documentation these also register the metric with the default Prometheus
	// registerer, so registration happens when this package is initialized.

	// TaskQueueMetrics is the global metrics instance for the task queue of this instance.
	// NOTE(review): Labels is queueMetricLabels, but TaskCounter is created with
	// taskMetricLabels (only EnqueueDuration uses queueMetricLabels) — confirm intended.
	TaskQueueMetrics = TaskQueueMetricsType{
		Labels: queueMetricLabels,
		TaskCounter: promauto.NewCounterVec(
			defTaskCounterOpts,
			taskMetricLabels,
		),
		EnqueueDuration: promauto.NewHistogramVec(
			defEnqueueDurationOpts,
			queueMetricLabels,
		),
	}

	// TaskWorkerMetrics is the global metrics instance for the task worker of this instance.
	// Its two vector metrics (ProcessedCounter, ProcessingErrorsCounter) are labeled with
	// taskMetricLabels, the same slice exposed through Labels.
	TaskWorkerMetrics = WorkerMetricsType{
		Labels: taskMetricLabels,

		ActiveGauge:     promauto.NewGauge(defTaskWorkerActiveGaugeOpts),
		WorkingGauge:    promauto.NewGauge(defTaskWorkerWorkingGaugeOpts),
		DequeueingGauge: promauto.NewGauge(defTaskWorkerDequeueingGaugeOpts),

		ProcessingDuration: promauto.NewHistogram(defTaskWorkerProcessingDurationOpts),
		DequeueingDuration: promauto.NewHistogram(defTaskWorkerDequeueingDurationOpts),

		ProcessedCounter:        promauto.NewCounterVec(defTaskWorkerProcessedCounterOpts, taskMetricLabels),
		DequeueErrorCounter:     promauto.NewCounter(defTaskWorkerDequeueErrorsCounterOpts),
		ProcessingErrorsCounter: promauto.NewCounterVec(defTaskWorkerProcessingErrorsCounterOpts, taskMetricLabels),
		ErrorsCounter:           promauto.NewCounter(defTaskWorkerErrorsCounterOpts),
	}

	// SchedulerMetrics is the global metrics instance for the scheduler of this instance.
	// NOTE(review): Labels is queueMetricLabels, but both counters are created with
	// taskMetricLabels — confirm intended.
	SchedulerMetrics = SchedulerMetricsType{
		Labels: queueMetricLabels,
		ScheduleCounter: promauto.NewCounterVec(
			defScheduleCounterOpts,
			taskMetricLabels,
		),
		ErrorCounter: promauto.NewCounterVec(
			defSchedulerErrorCounterOpts,
			taskMetricLabels,
		),
	}

	// ScheduleWorkerMetrics is the global metrics instance for the schedule worker of this instance.
	// It carries the same set of worker metrics as TaskWorkerMetrics (built from the
	// defScheduleWorker* options) plus an additional WaitingGauge.
	ScheduleWorkerMetrics = ScheduleWorkerMetricsType{
		WorkerMetricsType: WorkerMetricsType{
			Labels: taskMetricLabels,

			ActiveGauge:  promauto.NewGauge(defScheduleWorkerActiveGaugeOpts),
			WorkingGauge: promauto.NewGauge(defScheduleWorkerWorkingGaugeOpts),

			DequeueingGauge: promauto.NewGauge(defScheduleWorkerDequeueingGaugeOpts),

			ProcessingDuration: promauto.NewHistogram(defScheduleWorkerProcessingDurationOpts),
			DequeueingDuration: promauto.NewHistogram(defScheduleWorkerDequeueingDurationOpts),

			ProcessedCounter:        promauto.NewCounterVec(defScheduleWorkerProcessedCounterOpts, taskMetricLabels),
			DequeueErrorCounter:     promauto.NewCounter(defScheduleWorkerDequeueErrorsCounterOpts),
			ProcessingErrorsCounter: promauto.NewCounterVec(defScheduleWorkerProcessingErrorsCounterOpts, taskMetricLabels),
			ErrorsCounter:           promauto.NewCounter(defScheduleWorkerErrorsCounterOpts),
		},

		WaitingGauge: promauto.NewGauge(defScheduleWorkerWaitingGaugeOpts),
	}
)
View Source
// Sentinel errors of the queue package. Each value is created once with errors.New,
// so callers can match them with errors.Is.
var (
	// ErrTaskCancelled is used to notify the worker from the Heartbeat that the task
	// was canceled by the user and it needs to stop working on it.
	ErrTaskCancelled = errors.New("task has been canceled")
	// ErrTaskFinished is used to notify the worker from the Heartbeat that the task has
	// already finished and can't be worked on.
	ErrTaskFinished = errors.New("task has finished")
	// ErrTaskNotRunning is used to notify the worker from the Heartbeat that the task has
	// not started yet and can't be worked on.
	ErrTaskNotRunning = errors.New("task has not started yet")
	// ErrTaskNotFound is used to notify the worker from the Heartbeat that the task does not
	// exist.
	ErrTaskNotFound = errors.New("task not found")
	// ErrTaskFailed indicates that a task has failed and should not be restarted.
	ErrTaskFailed = errors.New("task failed")
	// ErrTaskQueueNotSpecified indicates that the given task cannot be enqueued because
	// the queue name is empty.
	ErrTaskQueueNotSpecified = errors.New("task queue name cannot be blank")
	// ErrTaskTypeNotSpecified indicates that the given task cannot be enqueued because
	// the task type is empty.
	ErrTaskTypeNotSpecified = errors.New("task type cannot be blank")
)
View Source
var ErrNotScheduled = errors.New("Task not currently scheduled")

ErrNotScheduled indicates that the scheduled task has not been created yet. This should be returned from the EnsureSchedule method.

Functions

func SwitchMetricsServiceName

func SwitchMetricsServiceName(serviceName string)

SwitchMetricsServiceName changes the service label used in the metrics, so it can be customized with a different name

Types

type Dequeuer

type Dequeuer interface {
	// Dequeue removes a task for processing from the provided queue.
	// It's expected to block until work is available.
	//
	// For zero-length queue parameter, dequeue should return a task from a random queue;
	// otherwise dequeue should return a task from the supplied list of queues.
	Dequeue(ctx context.Context, queue ...string) (*Task, error)

	// Heartbeat updates the task to ensure that it's still being processed by a worker.
	//
	// Once a task has begun processing, 'Heartbeat' should be called at known
	// intervals to update the 'Progress' and 'LastHeartbeatAt' fields.
	// This provides a way to determine if the worker is still processing.
	Heartbeat(ctx context.Context, taskID string, progress Progress) error

	// Finish marks the task as completed.
	// It's recommended to update the progress to a final state to ensure there is
	// no ambiguity in whether or not the task is complete.
	Finish(ctx context.Context, taskID string, progress Progress) error

	// Fail marks the task as failed.
	// It's recommended to update the progress to a final state.
	Fail(ctx context.Context, taskID string, progress Progress) error
}

Dequeuer is a read only interface for the queue

func DequeuerWithMetrics

func DequeuerWithMetrics(q Dequeuer) Dequeuer

DequeuerWithMetrics returns q wrapped with the standard metrics implementation

type Progress

type Progress []byte

Progress is a serialized status object that indicates the status of the task. For example, it can be an object that contains the amount of processed bytes. The actual underlying type depends on the task type and consumers must serialize the value.

type QueueMock

type QueueMock struct {
	// Queue is the channel of tasks backing the mock.
	// NOTE(review): presumably Enqueue sends to it and Dequeue receives from it —
	// the method bodies are not visible here, confirm.
	Queue chan Task

	// One error field per mocked method.
	// NOTE(review): presumably each is returned by the method of the same name — confirm.
	EnqueueErr   error
	DequeueErr   error
	HeartbeatErr error
	FinishErr    error
	FailErr      error

	// One counter per mocked method.
	// NOTE(review): presumably incremented on each call of the matching method — confirm.
	EnqueueCount   int
	DequeueCount   int
	HeartbeatCount int
	FinishCount    int
	FailCount      int
}

QueueMock implements queue with very simple counters for testing

func (*QueueMock) Dequeue

func (q *QueueMock) Dequeue(ctx context.Context, queue ...string) (*Task, error)

Dequeue implements queue Manager for testing

func (*QueueMock) Enqueue

func (q *QueueMock) Enqueue(ctx context.Context, task TaskEnqueueRequest) error

Enqueue implements queue Manager for testing

func (*QueueMock) Fail

func (q *QueueMock) Fail(ctx context.Context, taskID string, progress Progress) error

Fail implements queue Manager for testing

func (*QueueMock) Finish

func (q *QueueMock) Finish(ctx context.Context, taskID string, progress Progress) error

Finish implements queue Manager for testing

func (*QueueMock) Heartbeat

func (q *QueueMock) Heartbeat(ctx context.Context, taskID string, progress Progress) error

Heartbeat implements queue Manager for testing

type Queuer

type Queuer interface {
	// Enqueue adds a task to the queue named inside the request itself
	// (the Queue field of the embedded TaskBase); there is no separate queue argument.
	Enqueue(ctx context.Context, task TaskEnqueueRequest) error
}

Queuer is a write-only interface for the queue

func QueuerWithMetrics

func QueuerWithMetrics(q Queuer) Queuer

QueuerWithMetrics returns q wrapped with the standard metrics implementation

type References

type References map[string]interface{}

References is a dictionary of additional SQL columns to set. Normally this dictionary contains references to other entities, e.g. datasource_id, table_id, schedule_id, etc., so that the SQL table can use the `DELETE CASCADE` setting.

func (References) GetNamesAndValues

func (a References) GetNamesAndValues() (keys []string, values []interface{})

GetNamesAndValues returns a slice of column names and a slice of the corresponding values, so that they can be passed easily to the SQL builder.

type ScheduleWorkerMetricsType

type ScheduleWorkerMetricsType struct {
	// WorkerMetricsType is embedded, so all common worker metrics
	// (gauges, histograms and counters) are available directly on this type.
	WorkerMetricsType
	// WaitingGauge is a gauge of waiting instances.
	// Unlike the task worker the schedule worker also waits between its iterations.
	WaitingGauge prometheus.Gauge
}

ScheduleWorkerMetricsType provides access to the prometheus metric objects for a schedule worker

type Scheduler

type Scheduler interface {
	// Schedule creates a cron schedule according to which the worker will enqueue
	// the given task.
	// `task.CronSchedule` cannot be blank.
	Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error

	// EnsureSchedule checks if a task with the given queue, type, and references
	// currently exists. An error ErrNotScheduled is returned if a current task cannot be found.
	// Implementation and validation errors may also be returned and should be checked for.
	EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
	// AssertSchedule makes sure a schedule with the given parameters exists; if it does not,
	// this function will create one.
	// Unlike Schedule and EnsureSchedule it takes no SQL builder argument.
	AssertSchedule(ctx context.Context, task TaskScheduleRequest) (err error)
}

Scheduler defines how one schedules a task

func SchedulerWithMetrics

func SchedulerWithMetrics(s Scheduler) Scheduler

SchedulerWithMetrics returns s wrapped with the standard metrics implementation

type SchedulerMetricsType

type SchedulerMetricsType struct {
	// Labels holds label names associated with these metrics.
	// NOTE(review): the global SchedulerMetrics sets this to queueMetricLabels while its
	// counters are built with taskMetricLabels — confirm which label set callers must use.
	Labels []string
	// ScheduleCounter is a labeled counter vector.
	// NOTE(review): presumably counts scheduled tasks — confirm against its options.
	ScheduleCounter *prometheus.CounterVec
	// ErrorCounter is a labeled counter vector.
	// NOTE(review): presumably counts scheduling errors — confirm against its options.
	ErrorCounter *prometheus.CounterVec
}

SchedulerMetricsType provides access to the prometheus metric objects for the scheduler

type SchedulerMock

type SchedulerMock struct {
	// ScheduleErr / ScheduleCount belong to the Schedule method.
	// NOTE(review): presumably the error it returns and the number of calls — confirm.
	ScheduleErr   error
	ScheduleCount int

	// EnsureError / EnsureCount belong to the EnsureSchedule method.
	// NOTE(review): presumably the error it returns and the number of calls — confirm.
	EnsureError error
	EnsureCount int

	// AssertError / AssertCount belong to the AssertSchedule method.
	// NOTE(review): presumably the error it returns and the number of calls — confirm.
	AssertError error
	AssertCount int
}

SchedulerMock implements scheduler with very simple counters for testing

func (*SchedulerMock) AssertSchedule

func (s *SchedulerMock) AssertSchedule(ctx context.Context, task TaskScheduleRequest) error

func (*SchedulerMock) EnsureSchedule

func (s *SchedulerMock) EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error

func (*SchedulerMock) Schedule

func (s *SchedulerMock) Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)

type Spec

type Spec []byte

Spec is a serialized specification object used to set the task parameters. The actual underlying type depends on the task type and consumers must serialize the value.

type Task

type Task struct {
	// TaskBase is embedded and provides the Queue, Type and Spec fields.
	TaskBase
	// ID is the id of the task
	ID string
	// Status is the current status of the task
	Status TaskStatus
	// Progress is used for workers to update the status of the task.
	// For example, bytes processed.
	// The actual underlying type is specific for each task type.
	Progress Progress
	// CreatedAt is when the task was initially put in the queue
	CreatedAt time.Time
	// UpdatedAt is presumably when the task was last modified.
	// NOTE(review): the previous comment was a copy of CreatedAt's — confirm exact semantics.
	UpdatedAt time.Time
	// StartedAt is when a worker picked up the task.
	// It is a pointer, so it can be nil.
	StartedAt *time.Time
	// FinishedAt is when the task was finished being processed.
	// It is a pointer, so it can be nil.
	FinishedAt *time.Time
	// LastHeartbeatAt provides a way to ensure the task is still being processed and hasn't failed.
	// It is a pointer, so it can be nil.
	LastHeartbeatAt *time.Time
}

Task represents a task in the queue

type TaskBase

type TaskBase struct {
	// Queue is the queue to which the task belongs.
	// A blank queue name is what ErrTaskQueueNotSpecified reports on enqueue.
	Queue string
	// Type holds a task type identifier.
	// A blank task type is what ErrTaskTypeNotSpecified reports on enqueue.
	Type TaskType
	// Spec contains the task specification based on type of the task.
	Spec Spec
}

TaskBase contains basic fields for a task

type TaskEnqueueRequest

type TaskEnqueueRequest struct {
	// TaskBase is embedded and provides the Queue, Type and Spec of the task to enqueue.
	TaskBase
	// References contain names and values for additional
	// SQL columns to set external references for a task for easy clean up
	References References
}

TaskEnqueueRequest contains fields required for adding a task to the queue

type TaskHandler

type TaskHandler interface {
	// Process implements the specific Task parsing logic.
	//
	// task is passed by value. heartbeats is a send-only channel of Progress values
	// handed to the handler.
	// NOTE(review): presumably each value sent is forwarded as a Heartbeat for the task —
	// confirm against the worker implementation.
	// A non-nil err reports a processing failure.
	Process(ctx context.Context, task Task, heartbeats chan<- Progress) (err error)
}

TaskHandler is an interface for a type that processes a task and returns any processing errors

type TaskHandlerFunc

type TaskHandlerFunc func(context.Context, Task, chan<- Progress) error

TaskHandlerFunc is an adapter that allows the use of a normal function as a TaskHandler

func (TaskHandlerFunc) Process

func (f TaskHandlerFunc) Process(ctx context.Context, task Task, heartbeats chan<- Progress) error

Process implements the specific Task parsing logic

type TaskQueueMetricsType

type TaskQueueMetricsType struct {
	// Labels holds label names associated with these metrics.
	// NOTE(review): the global TaskQueueMetrics sets this to queueMetricLabels, which matches
	// EnqueueDuration but not TaskCounter (built with taskMetricLabels) — confirm.
	Labels []string
	// TaskCounter is a labeled counter vector.
	// NOTE(review): presumably counts enqueued tasks — confirm against its options.
	TaskCounter *prometheus.CounterVec
	// EnqueueDuration is a labeled histogram vector.
	// NOTE(review): presumably observes how long enqueueing takes — confirm unit and options.
	EnqueueDuration *prometheus.HistogramVec
}

TaskQueueMetricsType provides access to the prometheus metric objects for the task queue

type TaskScheduleRequest

type TaskScheduleRequest struct {
	// TaskBase is embedded and provides the Queue, Type and Spec of the task to schedule.
	TaskBase
	// CronSchedule is the schedule expression in cron syntax that defines
	// when the task should be executed.
	CronSchedule string
	// References contain names and values for additional
	// SQL columns to set external references for a schedule for easy clean up
	References References
}

TaskScheduleRequest contains fields required for scheduling a task

type TaskStatus

type TaskStatus string

TaskStatus : Current state of the task

// Known TaskStatus values; each constant's string is the lowercase form of its name.
const (
	// Waiting task is waiting for being picked up
	Waiting TaskStatus = "waiting"
	// Running task is currently running
	Running TaskStatus = "running"
	// Cancelled task is canceled by the user
	// nolint: misspell // the initial spelling was with double 'l' now we have to stick with it
	Cancelled TaskStatus = "cancelled"
	// Finished task is successfully finished
	Finished TaskStatus = "finished"
	// Failed task is failed with an error
	Failed TaskStatus = "failed"
)

List of TaskStatus

type TaskType

type TaskType string

TaskType is a type which specifies what kind of task it is

func (TaskType) String

func (t TaskType) String() string

String returns a string value of the task type

type Worker

type Worker interface {
	// Work is responsible for getting and processing tasks.
	// It should run continuously or until the context is canceled.
	// NOTE(review): which error is returned on cancellation (e.g. the context's error or nil)
	// is not specified here — confirm against the implementations.
	Work(context.Context) error
}

Worker provides methods to do some kind of work

type WorkerMetricsType

type WorkerMetricsType struct {
	// Labels available in its vector metrics
	// (ProcessedCounter and ProcessingErrorsCounter are the only vector metrics here).
	Labels []string

	// ActiveGauge is a gauge of active instances
	ActiveGauge prometheus.Gauge
	// WorkingGauge is a gauge of working instances
	WorkingGauge prometheus.Gauge
	// DequeueingGauge is a gauge of instances that are trying to dequeue a task
	DequeueingGauge prometheus.Gauge

	// ProcessingDuration is a histogram of the duration of tasks being processed, in seconds
	ProcessingDuration prometheus.Histogram
	// DequeueingDuration is a histogram of the duration spent waiting for a new task, in seconds
	DequeueingDuration prometheus.Histogram

	// ProcessedCounter is a total count of processed tasks, partitioned by Labels
	ProcessedCounter *prometheus.CounterVec
	// DequeueErrorCounter is a total count of dequeueing errors
	DequeueErrorCounter prometheus.Counter
	// ProcessingErrorsCounter is a total count of errors while handling the task, partitioned by Labels
	ProcessingErrorsCounter *prometheus.CounterVec
	// ErrorsCounter is a total count of errors
	ErrorsCounter prometheus.Counter
}

WorkerMetricsType provides access to the prometheus metric objects for a worker

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL