Documentation ¶
Index ¶
- Variables
- func SwitchMetricsServiceName(serviceName string)
- type Dequeuer
- type Progress
- type QueueMock
- func (q *QueueMock) Dequeue(ctx context.Context, queue ...string) (*Task, error)
- func (q *QueueMock) Enqueue(ctx context.Context, task TaskEnqueueRequest) error
- func (q *QueueMock) Fail(ctx context.Context, taskID string, progress Progress) error
- func (q *QueueMock) Finish(ctx context.Context, taskID string, progress Progress) error
- func (q *QueueMock) Heartbeat(ctx context.Context, taskID string, progress Progress) error
- type Queuer
- type References
- type Scheduler
- type SchedulerMetricsType
- type SchedulerMock
- func (s *SchedulerMock) AssertSchedule(ctx context.Context, task TaskScheduleRequest) error
- func (s *SchedulerMock) EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error
- func (s *SchedulerMock) Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
- type Spec
- type Task
- type TaskBase
- type TaskEnqueueRequest
- type TaskQueueMetricsType
- type TaskScheduleRequest
- type TaskStatus
- type TaskType
Constants ¶
This section is empty.
Variables ¶
var ( // TaskQueueMetrics is the global metrics instance for the task queue of this instance TaskQueueMetrics = TaskQueueMetricsType{ Labels: queueMetricLabels, TaskCounter: promauto.NewCounterVec( defTaskCounterOpts, taskMetricLabels, ), EnqueueDuration: promauto.NewHistogramVec( defEnqueueDurationOpts, queueMetricLabels, ), } // SchedulerMetrics is the global metrics instance for the scheduler of this instance SchedulerMetrics = SchedulerMetricsType{ Labels: queueMetricLabels, ScheduleCounter: promauto.NewCounterVec( defScheduleCounterOpts, taskMetricLabels, ), ErrorCounter: promauto.NewCounterVec( defSchedulerErrorCounterOpts, taskMetricLabels, ), } )
var ( // ErrTaskCancelled is used to notify the worker from the Heartbeat that the task // was cancelled by the user and it needs to stop working on it. ErrTaskCancelled = errors.New("task has been cancelled") // ErrTaskFinished is used to notify the worker from the Heartbeat that the task has // already finished and can't be worked on ErrTaskFinished = errors.New("task has finished") // ErrTaskNotRunning is used to notify the worker from the Heartbeat that the task has // not started yet and can't be worked on ErrTaskNotRunning = errors.New("task has not started yet") // ErrTaskNotFound is used to notify the worker from the Heartbeat that the task does not // exist ErrTaskNotFound = errors.New("task not found") // ErrTaskFailed indicates that a task has failed and should not be restarted ErrTaskFailed = errors.New("task failed") // ErrTaskQueueNotSpecified indicates that the given task cannot be enqueued because // the queue name is empty ErrTaskQueueNotSpecified = errors.New("task queue name cannot be blank") // ErrTaskTypeNotSpecified indicates that the given task cannot be enqueued because // the task type is empty ErrTaskTypeNotSpecified = errors.New("task type cannot be blank") )
var ErrNotScheduled = errors.New("Task not currently scheduled")
ErrNoteSchedule indicates the current scheduled task is has not been created yet. This should be returned from the EnsureSchedule method
Functions ¶
func SwitchMetricsServiceName ¶ added in v1.7.0
func SwitchMetricsServiceName(serviceName string)
SwitchMetricsServiceName changes the service label used in the metrics, so it can be customized
Types ¶
type Dequeuer ¶
type Dequeuer interface { // Dequeue removes a task for processing from the provided queue // It's expected to block until work is available // // For zero-length queue parameter, dequeue should return a task from a random queue; // otherwise dequeue should return a task from the supplied list of queues. Dequeue(ctx context.Context, queue ...string) (*Task, error) // Heartbeat updates the task to ensure that it's still being processed by a worker. // // Once a task has begun processing, 'Heartbeat' should be called at known // intervals to update the 'Progress' and 'LastHeartbeat' fields. // This provides a way to determine if the worker is still processing. Heartbeat(ctx context.Context, taskID string, progress Progress) error // Finish marks the task as completed // It's recommended to update the progress to a final state to ensure there is // no ambiguity in whether or not the task is complete Finish(ctx context.Context, taskID string, progress Progress) error // Finish marks the task as failed // It's recommended to update the progress to a final state. Fail(ctx context.Context, taskID string, progress Progress) error }
Dequeuer is a read only interface for the queue
func DequeuerWithMetrics ¶
DequeuerWithMetrics returns q wrapped with the standard metrics implementation
type Progress ¶
type Progress []byte
Progress is a serialized status object that indicates the status of the task For example, it can be an object that contains the amount of processed bytes. The actual underlying type depends on the task type and consumers must serialize the value.
type QueueMock ¶ added in v1.8.1
type QueueMock struct { Queue chan Task EnqueueErr error DequeueErr error HeartbeatErr error FinishErr error FailErr error EnqueueCount int DequeueCount int HeartbeatCount int FinishCount int FailCount int }
QueueMock implements queue with very simple counters for testing
func (*QueueMock) Enqueue ¶ added in v1.8.1
func (q *QueueMock) Enqueue(ctx context.Context, task TaskEnqueueRequest) error
Enqueue implements queue Manager for testing
type Queuer ¶
type Queuer interface { // Enqueue adds a task to the provided queue within the Task object Enqueue(ctx context.Context, task TaskEnqueueRequest) error }
Queuer is a write-only interface for the queue
func QueuerWithMetrics ¶
QueuerWithMetrics returns q wrapped with the standard metrics implementation
type References ¶
type References map[string]interface{}
References is a dictionary of additinal SQL columns to set. Normally this dictionary contains referencies to other entities e.g. datasource_id, table_id, schedule_id, etc. So, the SQL table can have the `DELETE CASCADE` setting.
func (References) GetNamesAndValues ¶
func (a References) GetNamesAndValues() (keys []string, values []interface{})
GetNamesAndValues returns a slice of column names and a slice of values accordingly. It's to easily use it in the SQL builder.
type Scheduler ¶
type Scheduler interface { // Schedule creates a cron schedule according to which the worker will enqueue // the given task. // `task.CronSchedule` cannot be blank. Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error // EnsureSchedule checks if a task for with the given queue, type, and references // currently exists. An error ErrNotScheduled is returned if a current task cannot be found. // implementation and validation errors may also be returned and should be checked for. EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error) // AssertSchedule makes sure a schedule with the given parameters exists, if it does not // this function will create one. AssertSchedule(ctx context.Context, task TaskScheduleRequest) (err error) }
Scheduler defines how one schedules a task
func SchedulerWithMetrics ¶
SchedulerWithMetrics returns q wrapped with the standard metrics implementation
type SchedulerMetricsType ¶
type SchedulerMetricsType struct { Labels []string ScheduleCounter *prometheus.CounterVec ErrorCounter *prometheus.CounterVec }
TaskQueueMetricsType provides access to the prometheus metric objects for the scheduler
type SchedulerMock ¶ added in v1.8.1
type SchedulerMock struct { ScheduleErr error ScheduleCount int EnsureError error EnsureCount int AssertError error AssertCount int }
SchedulerMock implement scheduler with very simple counters for testing
func (*SchedulerMock) AssertSchedule ¶ added in v1.8.1
func (s *SchedulerMock) AssertSchedule(ctx context.Context, task TaskScheduleRequest) error
func (*SchedulerMock) EnsureSchedule ¶ added in v1.8.1
func (s *SchedulerMock) EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error
func (*SchedulerMock) Schedule ¶ added in v1.8.1
func (s *SchedulerMock) Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
type Spec ¶
type Spec []byte
Spec is is a serialized specification object used to set the task parameters. The actual underlying type depends on the task type and consumers must serialize the value.
type Task ¶
type Task struct { TaskBase // ID is the id of the task ID string // Status is the current status of the task Status TaskStatus // Progress is used for workers to update the status of the task. // For example, bytes processed. // The actual underlying type is specific for each task type. Progress Progress // CreatedAt is when the task was initially put in the queue CreatedAt time.Time // CreatedAt is when the task was initially put in the queue UpdatedAt time.Time // StartedAt is when a worker picked up the task StartedAt *time.Time // FinishedAt is when the task was finished being processed FinishedAt *time.Time // LastHeartbeat provides a way to ensure the task is still being processed and hasn't failed. LastHeartbeatAt *time.Time }
Task represents a task in the queue
type TaskBase ¶
type TaskBase struct { // Queue is the queue to which the task belongs Queue string // Type holds a task type identifier Type TaskType // Spec contains the task specification based on type of the task. Spec Spec }
TaskBase contains basic fields for a task
type TaskEnqueueRequest ¶
type TaskEnqueueRequest struct { TaskBase // References contain names and values for additinal // SQL columns to set external references for a task for easy clean up References References }
TaskEnqueueRequest contains fields required for adding a task to the queue
type TaskQueueMetricsType ¶
type TaskQueueMetricsType struct { Labels []string TaskCounter *prometheus.CounterVec EnqueueDuration *prometheus.HistogramVec }
TaskQueueMetricsType provides access to the prometheus metric objects for the task queue
type TaskScheduleRequest ¶
type TaskScheduleRequest struct { TaskBase // CronSchedule is the schedule impression in cron syntax that defines // when the task should be executed. CronSchedule string // References contain names and values for additinal // SQL columns to set external references for a schedule for easy clean up References References }
TaskScheduleRequest contains fields required for scheduling a task
type TaskStatus ¶
type TaskStatus string
TaskStatus : Current state of the task
const ( // Waiting task is waiting for being picked up Waiting TaskStatus = "waiting" // Running task is currently running Running TaskStatus = "running" // Cancelled task is cancelled by the user Cancelled TaskStatus = "cancelled" // Finished task is successfully finished Finished TaskStatus = "finished" // Failed task is failed with an error Failed TaskStatus = "failed" )
List of TaskStatus