Documentation
¶
Index ¶
- Variables
- func SetupSchedulerMetrics(namespace string)
- func SetupTaskQueueMetrics(namespace string)
- type Dequeuer
- type Progress
- type Queuer
- type References
- type Scheduler
- type SchedulerMetricsType
- type Spec
- type Task
- type TaskBase
- type TaskEnqueueRequest
- type TaskQueueMetricsType
- type TaskScheduleRequest
- type TaskStatus
- type TaskType
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrTaskCancelled is used to notify the worker from the Heartbeat that the task
	// was cancelled by the user and it needs to stop working on it.
	ErrTaskCancelled = errors.New("task has been cancelled")

	// ErrTaskFinished is used to notify the worker from the Heartbeat that the task has
	// already finished and can't be worked on
	ErrTaskFinished = errors.New("task has finished")

	// ErrTaskNotRunning is used to notify the worker from the Heartbeat that the task has
	// not started yet and can't be worked on
	ErrTaskNotRunning = errors.New("task has not started yet")

	// ErrTaskNotFound is used to notify the worker from the Heartbeat that the task does not
	// exist
	ErrTaskNotFound = errors.New("task not found")

	// ErrTaskFailed indicates that a task has failed and should not be restarted
	ErrTaskFailed = errors.New("task failed")

	// ErrTaskQueueNotSpecified indicates that the given task cannot be enqueued because
	// the queue name is empty
	ErrTaskQueueNotSpecified = errors.New("task queue name cannot be blank")

	// ErrTaskTypeNotSpecified indicates that the given task cannot be enqueued because
	// the task type is empty
	ErrTaskTypeNotSpecified = errors.New("task type cannot be blank")
)
var ErrNotScheduled = errors.New("Task not currently scheduled")
ErrNotScheduled indicates that the scheduled task has not been created yet. It should be returned from the EnsureSchedule method.
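The Heartbeat-related errors above tell a worker to stop working on a task. A minimal sketch of how a worker might test for them with errors.Is follows. The sentinel errors are redeclared locally so the example compiles on its own, and shouldStop is a hypothetical helper, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors (same messages as documented).
var (
	ErrTaskCancelled  = errors.New("task has been cancelled")
	ErrTaskFinished   = errors.New("task has finished")
	ErrTaskNotRunning = errors.New("task has not started yet")
	ErrTaskNotFound   = errors.New("task not found")
)

// shouldStop is a hypothetical helper: it reports whether an error returned by
// Heartbeat means the worker should abandon the task rather than keep working.
func shouldStop(err error) bool {
	return errors.Is(err, ErrTaskCancelled) ||
		errors.Is(err, ErrTaskFinished) ||
		errors.Is(err, ErrTaskNotRunning) ||
		errors.Is(err, ErrTaskNotFound)
}

func main() {
	// errors.Is also matches sentinel errors that were wrapped with %w.
	wrapped := fmt.Errorf("heartbeat: %w", ErrTaskCancelled)
	fmt.Println(shouldStop(wrapped))
	fmt.Println(shouldStop(errors.New("connection reset")))
}
```

Any other error (for example a transient storage failure) is left to the caller; whether to retry the heartbeat in that case is a design decision this sketch does not make.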
Functions ¶
func SetupSchedulerMetrics ¶
func SetupSchedulerMetrics(namespace string)
SetupSchedulerMetrics must be called before any other call to the metric subsystem happens
func SetupTaskQueueMetrics ¶
func SetupTaskQueueMetrics(namespace string)
SetupTaskQueueMetrics must be called before any other call to the metric subsystem happens
Types ¶
type Dequeuer ¶
type Dequeuer interface {
	// Dequeue removes a task for processing from the provided queue
	// It's expected to block until work is available
	//
	// For zero-length queue parameter, dequeue should return a task from a random queue;
	// otherwise dequeue should return a task from the supplied list of queues.
	Dequeue(ctx context.Context, queue ...string) (*Task, error)

	// Heartbeat updates the task to ensure that it's still being processed by a worker.
	//
	// Once a task has begun processing, 'Heartbeat' should be called at known
	// intervals to update the 'Progress' and 'LastHeartbeat' fields.
	// This provides a way to determine if the worker is still processing.
	Heartbeat(ctx context.Context, taskID string, progress Progress) error

	// Finish marks the task as completed
	// It's recommended to update the progress to a final state to ensure there is
	// no ambiguity in whether or not the task is complete
	Finish(ctx context.Context, taskID string, progress Progress) error

	// Fail marks the task as failed
	// It's recommended to update the progress to a final state.
	Fail(ctx context.Context, taskID string, progress Progress) error
}
Dequeuer is a read-only interface for the queue
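The Dequeue, Heartbeat, Finish and Fail calls described above form a simple worker lifecycle. The sketch below walks through it once per task. Progress, Task and Dequeuer are simplified local copies of the documented types, while fakeDequeuer, processOne and runDemo are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Progress and Task are simplified local stand-ins for the package types.
type Progress []byte

type Task struct{ ID string }

// Dequeuer repeats the method set documented above.
type Dequeuer interface {
	Dequeue(ctx context.Context, queue ...string) (*Task, error)
	Heartbeat(ctx context.Context, taskID string, progress Progress) error
	Finish(ctx context.Context, taskID string, progress Progress) error
	Fail(ctx context.Context, taskID string, progress Progress) error
}

// fakeDequeuer is a hypothetical in-memory implementation that records calls.
// Unlike a real queue, Dequeue returns an error instead of blocking when empty.
type fakeDequeuer struct {
	pending []*Task
	calls   []string
}

func (f *fakeDequeuer) Dequeue(ctx context.Context, queue ...string) (*Task, error) {
	if len(f.pending) == 0 {
		return nil, errors.New("no tasks pending")
	}
	t := f.pending[0]
	f.pending = f.pending[1:]
	f.calls = append(f.calls, "dequeue:"+t.ID)
	return t, nil
}

func (f *fakeDequeuer) Heartbeat(ctx context.Context, taskID string, p Progress) error {
	f.calls = append(f.calls, "heartbeat:"+taskID)
	return nil
}

func (f *fakeDequeuer) Finish(ctx context.Context, taskID string, p Progress) error {
	f.calls = append(f.calls, "finish:"+taskID)
	return nil
}

func (f *fakeDequeuer) Fail(ctx context.Context, taskID string, p Progress) error {
	f.calls = append(f.calls, "fail:"+taskID)
	return nil
}

// processOne dequeues one task, sends a single heartbeat, runs the work
// function, and then marks the task as finished or failed.
func processOne(ctx context.Context, q Dequeuer, work func(*Task) error) error {
	task, err := q.Dequeue(ctx)
	if err != nil {
		return err
	}
	if err := q.Heartbeat(ctx, task.ID, Progress(`{"step":"started"}`)); err != nil {
		return err
	}
	if err := work(task); err != nil {
		return q.Fail(ctx, task.ID, Progress(`{"step":"failed"}`))
	}
	return q.Finish(ctx, task.ID, Progress(`{"step":"done"}`))
}

// runDemo processes one succeeding and one failing task and returns the calls made.
func runDemo() []string {
	q := &fakeDequeuer{pending: []*Task{{ID: "a"}, {ID: "b"}}}
	ctx := context.Background()
	_ = processOne(ctx, q, func(*Task) error { return nil })
	_ = processOne(ctx, q, func(*Task) error { return errors.New("boom") })
	return q.calls
}

func main() {
	fmt.Println(runDemo())
}
```

A real worker would call Heartbeat repeatedly at a fixed interval while the work runs; the single call here only shows where it sits in the sequence.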
func DequeuerWithMetrics ¶
DequeuerWithMetrics returns q wrapped with the standard metrics implementation
type Progress ¶
type Progress []byte
Progress is a serialized status object that indicates the status of the task. For example, it can be an object that contains the number of processed bytes. The actual underlying type depends on the task type, and consumers must serialize the value.
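Because Progress is just bytes, each task type chooses its own encoding. The sketch below assumes JSON and a hypothetical copyProgress shape that tracks processed bytes. Neither the encoding nor the struct is prescribed by this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Progress is a local stand-in for the package type.
type Progress []byte

// copyProgress is a hypothetical progress shape for a byte-copying task.
type copyProgress struct {
	BytesProcessed int64 `json:"bytes_processed"`
	BytesTotal     int64 `json:"bytes_total"`
}

// encodeProgress serializes the typed progress value into Progress bytes.
func encodeProgress(p copyProgress) (Progress, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}
	return Progress(b), nil
}

// decodeProgress parses Progress bytes back into the typed value.
func decodeProgress(raw Progress) (copyProgress, error) {
	var p copyProgress
	err := json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	raw, err := encodeProgress(copyProgress{BytesProcessed: 512, BytesTotal: 2048})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // {"bytes_processed":512,"bytes_total":2048}

	back, err := decodeProgress(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.BytesProcessed, back.BytesTotal) // 512 2048
}
```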
type Queuer ¶
type Queuer interface {
	// Enqueue adds a task to the provided queue within the Task object
	Enqueue(ctx context.Context, task TaskEnqueueRequest) error
}
Queuer is a write-only interface for the queue
func QueuerWithMetrics ¶
QueuerWithMetrics returns q wrapped with the standard metrics implementation
type References ¶
type References map[string]interface{}
References is a dictionary of additional SQL columns to set. Normally this dictionary contains references to other entities, e.g. datasource_id, table_id, schedule_id, etc., so that the SQL table can use the `DELETE CASCADE` setting.
func (References) GetNamesAndValues ¶
func (a References) GetNamesAndValues() (keys []string, values []interface{})
GetNamesAndValues returns a slice of column names and a slice of the corresponding values, which makes them easy to pass to the SQL builder.
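The idea of splitting a References map into parallel slices can be sketched as follows. namesAndValues is a hypothetical stand-in, not the package's implementation; in particular, sorting the keys is an assumption added here so that the output is deterministic, since Go map iteration order is not.

```go
package main

import (
	"fmt"
	"sort"
)

// References is a local copy of the documented type.
type References map[string]interface{}

// namesAndValues is a hypothetical equivalent of GetNamesAndValues. It returns
// column names and their values as parallel slices, ordered by column name.
func namesAndValues(r References) (keys []string, values []interface{}) {
	keys = make([]string, 0, len(r))
	for k := range r {
		keys = append(keys, k)
	}
	sort.Strings(keys) // assumption: fixed order, so keys[i] always pairs with values[i]

	values = make([]interface{}, 0, len(r))
	for _, k := range keys {
		values = append(values, r[k])
	}
	return keys, values
}

func main() {
	keys, values := namesAndValues(References{"table_id": 7, "datasource_id": "ds-1"})
	fmt.Println(keys, values) // [datasource_id table_id] [ds-1 7]
}
```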
type Scheduler ¶
type Scheduler interface {
	// Schedule creates a cron schedule according to which the worker will enqueue
	// the given task.
	// `task.CronSchedule` cannot be blank.
	Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error

	// EnsureSchedule checks if a task with the given queue, type, and references
	// currently exists. An error ErrNotScheduled is returned if a current task cannot be found.
	// Implementation and validation errors may also be returned and should be checked for.
	EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
}
Scheduler defines how one schedules a task
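One way to combine the two methods is to call EnsureSchedule first and fall back to Schedule only when ErrNotScheduled comes back. The sketch below assumes that reading of the interface comments. SQLBuilder is an empty stand-in for cdb.SQLBuilder (whose definition is not shown here), TaskScheduleRequest is reduced to plain string fields, and fakeScheduler, ensureOrSchedule and runDemo are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrNotScheduled mirrors the documented sentinel error.
var ErrNotScheduled = errors.New("Task not currently scheduled")

// SQLBuilder is an empty stand-in for cdb.SQLBuilder (assumption).
type SQLBuilder interface{}

// TaskScheduleRequest is a simplified local version of the documented type.
type TaskScheduleRequest struct {
	Queue        string
	Type         string
	CronSchedule string
}

// Scheduler repeats the documented method set against the stand-in types.
type Scheduler interface {
	Schedule(ctx context.Context, builder SQLBuilder, task TaskScheduleRequest) error
	EnsureSchedule(ctx context.Context, builder SQLBuilder, task TaskScheduleRequest) error
}

// fakeScheduler is a hypothetical in-memory scheduler keyed by queue and type.
type fakeScheduler struct {
	scheduled     map[string]bool
	scheduleCalls int
}

func (f *fakeScheduler) Schedule(ctx context.Context, b SQLBuilder, t TaskScheduleRequest) error {
	if t.CronSchedule == "" {
		return errors.New("cron schedule cannot be blank")
	}
	f.scheduled[t.Queue+"/"+t.Type] = true
	f.scheduleCalls++
	return nil
}

func (f *fakeScheduler) EnsureSchedule(ctx context.Context, b SQLBuilder, t TaskScheduleRequest) error {
	if !f.scheduled[t.Queue+"/"+t.Type] {
		return ErrNotScheduled
	}
	return nil
}

// ensureOrSchedule creates the schedule only if it does not exist yet.
// Any error other than ErrNotScheduled is passed back to the caller.
func ensureOrSchedule(ctx context.Context, s Scheduler, b SQLBuilder, t TaskScheduleRequest) error {
	err := s.EnsureSchedule(ctx, b, t)
	if errors.Is(err, ErrNotScheduled) {
		return s.Schedule(ctx, b, t)
	}
	return err
}

// runDemo calls ensureOrSchedule twice and returns how often Schedule ran.
func runDemo() int {
	s := &fakeScheduler{scheduled: map[string]bool{}}
	req := TaskScheduleRequest{Queue: "reports", Type: "daily", CronSchedule: "0 3 * * *"}
	ctx := context.Background()
	_ = ensureOrSchedule(ctx, s, nil, req)
	_ = ensureOrSchedule(ctx, s, nil, req)
	return s.scheduleCalls
}

func main() {
	fmt.Println(runDemo()) // 1
}
```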
func SchedulerWithMetrics ¶
SchedulerWithMetrics returns q wrapped with the standard metrics implementation
type SchedulerMetricsType ¶
type SchedulerMetricsType struct {
	Labels          []string
	ScheduleCounter *prometheus.CounterVec
	ErrorCounter    *prometheus.CounterVec
}
SchedulerMetricsType provides access to the prometheus metric objects for the scheduler
var SchedulerMetrics SchedulerMetricsType
SchedulerMetrics is the global metrics instance for the scheduler of this instance
type Spec ¶
type Spec []byte
Spec is a serialized specification object used to set the task parameters. The actual underlying type depends on the task type, and consumers must serialize the value.
type Task ¶
type Task struct {
	TaskBase

	// ID is the id of the task
	ID string

	// Status is the current status of the task
	Status TaskStatus

	// Progress is used for workers to update the status of the task.
	// For example, bytes processed.
	// The actual underlying type is specific for each task type.
	Progress Progress

	// CreatedAt is when the task was initially put in the queue
	CreatedAt time.Time

	// UpdatedAt is when the task was last updated
	UpdatedAt time.Time

	// StartedAt is when a worker picked up the task
	StartedAt *time.Time

	// FinishedAt is when the task was finished being processed
	FinishedAt *time.Time

	// LastHeartbeatAt provides a way to ensure the task is still being processed and hasn't failed.
	LastHeartbeatAt *time.Time
}
Task represents a task in the queue
type TaskBase ¶
type TaskBase struct {
	// Queue is the queue to which the task belongs
	Queue string

	// Type holds a task type identifier
	Type TaskType

	// Spec contains the task specification based on type of the task.
	Spec Spec
}
TaskBase contains basic fields for a task
type TaskEnqueueRequest ¶
type TaskEnqueueRequest struct {
	TaskBase

	// References contain names and values for additional
	// SQL columns to set external references for a task for easy clean up
	References References
}
TaskEnqueueRequest contains fields required for adding a task to the queue
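The ErrTaskQueueNotSpecified and ErrTaskTypeNotSpecified errors suggest that an enqueue request is rejected when its queue name or task type is blank. The sketch below shows such a check on locally redeclared types. validate is a hypothetical helper, and TaskType is assumed to be a string type because its definition is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel errors with the documented messages, redeclared locally.
var (
	ErrTaskQueueNotSpecified = errors.New("task queue name cannot be blank")
	ErrTaskTypeNotSpecified  = errors.New("task type cannot be blank")
)

// TaskType is assumed to be a string type (assumption, not shown in the docs).
type TaskType string

type Spec []byte

type References map[string]interface{}

type TaskBase struct {
	Queue string
	Type  TaskType
	Spec  Spec
}

type TaskEnqueueRequest struct {
	TaskBase
	References References
}

// validate is a hypothetical pre-enqueue check for the two blank-field errors.
func validate(req TaskEnqueueRequest) error {
	if req.Queue == "" {
		return ErrTaskQueueNotSpecified
	}
	if req.Type == "" {
		return ErrTaskTypeNotSpecified
	}
	return nil
}

func main() {
	req := TaskEnqueueRequest{
		TaskBase: TaskBase{
			Queue: "imports",
			Type:  "csv_import",
			Spec:  Spec(`{"path":"/tmp/in.csv"}`), // hypothetical spec payload
		},
		References: References{"datasource_id": "ds-1"},
	}
	fmt.Println(validate(req))                  // <nil>
	fmt.Println(validate(TaskEnqueueRequest{})) // task queue name cannot be blank
}
```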
type TaskQueueMetricsType ¶
type TaskQueueMetricsType struct {
	Labels          []string
	TaskCounter     *prometheus.CounterVec
	EnqueueDuration *prometheus.HistogramVec
}
TaskQueueMetricsType provides access to the prometheus metric objects for the task queue
var TaskQueueMetrics TaskQueueMetricsType
TaskQueueMetrics is the global metrics instance for the task queue of this instance
type TaskScheduleRequest ¶
type TaskScheduleRequest struct {
	TaskBase

	// CronSchedule is the schedule expression in cron syntax that defines
	// when the task should be executed.
	CronSchedule string

	// References contain names and values for additional
	// SQL columns to set external references for a schedule for easy clean up
	References References
}
TaskScheduleRequest contains fields required for scheduling a task
type TaskStatus ¶
type TaskStatus string
TaskStatus is the current state of the task
const (
	// Waiting means the task is waiting to be picked up
	Waiting TaskStatus = "waiting"

	// Running means the task is currently running
	Running TaskStatus = "running"

	// Cancelled means the task was cancelled by the user
	Cancelled TaskStatus = "cancelled"

	// Finished means the task finished successfully
	Finished TaskStatus = "finished"

	// Failed means the task failed with an error
	Failed TaskStatus = "failed"
)
List of TaskStatus
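Callers often need to know whether a task can still change state. The sketch below groups the statuses listed above into terminal and non-terminal ones. isTerminal is a hypothetical helper, and treating Cancelled, Finished and Failed as terminal is an interpretation of the status and error descriptions on this page, not a documented rule.

```go
package main

import "fmt"

// TaskStatus and its values are redeclared locally to keep the sketch self-contained.
type TaskStatus string

const (
	Waiting   TaskStatus = "waiting"
	Running   TaskStatus = "running"
	Cancelled TaskStatus = "cancelled"
	Finished  TaskStatus = "finished"
	Failed    TaskStatus = "failed"
)

// isTerminal is a hypothetical helper that reports whether no further work is
// expected on a task in the given status.
func isTerminal(s TaskStatus) bool {
	switch s {
	case Cancelled, Finished, Failed:
		return true
	}
	return false
}

func main() {
	for _, s := range []TaskStatus{Waiting, Running, Cancelled, Finished, Failed} {
		fmt.Printf("%s terminal=%t\n", s, isTerminal(s))
	}
}
```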