queue

package
v1.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2020 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrTaskCancelled is used to notify the worker from the Heartbeat that the task
	// was cancelled by the user and it needs to stop working on it.
	ErrTaskCancelled = errors.New("task has been cancelled")
	// ErrTaskFinished is used to notify the worker from the Heartbeat that the task has
	// already finished and can't be worked on
	ErrTaskFinished = errors.New("task has finished")
	// ErrTaskNotRunning is used to notify the worker from the Heartbeat that the task has
	// not started yet and can't be worked on
	ErrTaskNotRunning = errors.New("task has not started yet")
	// ErrTaskNotFound is used to notify the worker from the Heartbeat that the task does not
	// exist
	ErrTaskNotFound = errors.New("task not found")
	// ErrTaskFailed indicates that a task has failed and should not be restarted
	ErrTaskFailed = errors.New("task failed")
	// ErrTaskQueueNotSpecified indicates that the given task cannot be enqueued because
	// the queue name is empty
	ErrTaskQueueNotSpecified = errors.New("task queue name cannot be blank")
	// ErrTaskTypeNotSpecified indicates that the given task cannot be enqueued because
	// the task type is empty
	ErrTaskTypeNotSpecified = errors.New("task type cannot be blank")
)
View Source
var ErrNotScheduled = errors.New("Task not currently scheduled")

ErrNoteSchedule indicates the current scheduled task is has not been created yet. This should be returned from the EnsureSchedule method

Functions

func SetupSchedulerMetrics

func SetupSchedulerMetrics(namespace string)

SetupSchedulerMetrics must be called before any other call to the metric subsystem happens

func SetupTaskQueueMetrics

func SetupTaskQueueMetrics(namespace string)

SetupTaskQueueMetrics must be called before any other call to the metric subsystem happens

Types

type Dequeuer

type Dequeuer interface {
	// Dequeue removes a task for processing from the provided queue
	// It's expected to block until work is available
	//
	// For zero-length queue parameter, dequeue should return a task from a random queue;
	// otherwise dequeue should return a task from the supplied list of queues.
	Dequeue(ctx context.Context, queue ...string) (*Task, error)

	// Heartbeat updates the task to ensure that it's still being processed by a worker.
	//
	// Once a task has begun processing, 'Heartbeat' should be called at known
	// intervals to update the 'Progress' and 'LastHeartbeat' fields.
	// This provides a way to determine if the worker is still processing.
	Heartbeat(ctx context.Context, taskID string, progress Progress) error

	// Finish marks the task as completed
	// It's recommended to update the progress to a final state to ensure there is
	// no ambiguity in whether or not the task is complete
	Finish(ctx context.Context, taskID string, progress Progress) error

	// Finish marks the task as failed
	// It's recommended to update the progress to a final state.
	Fail(ctx context.Context, taskID string, progress Progress) error
}

Dequeuer is a read only interface for the queue

func DequeuerWithMetrics

func DequeuerWithMetrics(q Dequeuer) Dequeuer

DequeuerWithMetrics returns q wrapped with the standard metrics implementation

type Progress

type Progress []byte

Progress is a serialized status object that indicates the status of the task For example, it can be an object that contains the amount of processed bytes. The actual underlying type depends on the task type and consumers must serialize the value.

type Queuer

type Queuer interface {
	// Enqueue adds a task to the provided queue within the Task object
	Enqueue(ctx context.Context, task TaskEnqueueRequest) error
}

Queuer is a write-only interface for the queue

func QueuerWithMetrics

func QueuerWithMetrics(q Queuer) Queuer

QueuerWithMetrics returns q wrapped with the standard metrics implementation

type References

type References map[string]interface{}

References is a dictionary of additinal SQL columns to set. Normally this dictionary contains referencies to other entities e.g. datasource_id, table_id, schedule_id, etc. So, the SQL table can have the `DELETE CASCADE` setting.

func (References) GetNamesAndValues

func (a References) GetNamesAndValues() (keys []string, values []interface{})

GetNamesAndValues returns a slice of column names and a slice of values accordingly. It's to easily use it in the SQL builder.

type Scheduler

type Scheduler interface {
	// Schedule creates a cron schedule according to which the worker will enqueue
	// the given task.
	// `task.CronSchedule` cannot be blank.
	Schedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) error

	// EnsureSchedule checks if a task for with the given queue, type, and references
	// currently exists. An error ErrNotScheduled is returned if a current task cannot be found.
	// implementation and validation errors may also be returned and should be checked for.
	EnsureSchedule(ctx context.Context, builder cdb.SQLBuilder, task TaskScheduleRequest) (err error)
}

Scheduler defines how one schedules a task

func SchedulerWithMetrics

func SchedulerWithMetrics(s Scheduler) Scheduler

SchedulerWithMetrics returns q wrapped with the standard metrics implementation

type SchedulerMetricsType

type SchedulerMetricsType struct {
	Labels          []string
	ScheduleCounter *prometheus.CounterVec
	ErrorCounter    *prometheus.CounterVec
}

TaskQueueMetricsType provides access to the prometheus metric objects for the scheduler

var SchedulerMetrics SchedulerMetricsType

SchedulerMetrics is the global metrics instance for the scheduler of this instance

type Spec

type Spec []byte

Spec is is a serialized specification object used to set the task parameters. The actual underlying type depends on the task type and consumers must serialize the value.

type Task

type Task struct {
	TaskBase
	// ID is the id of the task
	ID string
	// Status is the current status of the task
	Status TaskStatus
	// Progress is used for workers to update the status of the task.
	// For example, bytes processed.
	// The actual underlying type is specific for each task type.
	Progress Progress
	// CreatedAt is when the task was initially put in the queue
	CreatedAt time.Time
	// CreatedAt is when the task was initially put in the queue
	UpdatedAt time.Time
	// StartedAt is when a worker picked up the task
	StartedAt *time.Time
	// FinishedAt is when the task was finished being processed
	FinishedAt *time.Time
	// LastHeartbeat provides a way to ensure the task is still being processed and hasn't failed.
	LastHeartbeatAt *time.Time
}

Task represents a task in the queue

type TaskBase

type TaskBase struct {
	// Queue is the queue to which the task belongs
	Queue string
	// Type holds a task type identifier
	Type TaskType
	// Spec contains the task specification based on type of the task.
	Spec Spec
}

TaskBase contains basic fields for a task

type TaskEnqueueRequest

type TaskEnqueueRequest struct {
	TaskBase
	// References contain names and values for additinal
	// SQL columns to set external references for a task for easy clean up
	References References
}

TaskEnqueueRequest contains fields required for adding a task to the queue

type TaskQueueMetricsType

type TaskQueueMetricsType struct {
	Labels          []string
	TaskCounter     *prometheus.CounterVec
	EnqueueDuration *prometheus.HistogramVec
}

TaskQueueMetricsType provides access to the prometheus metric objects for the task queue

var TaskQueueMetrics TaskQueueMetricsType

TaskQueueMetrics is the global metrics instance for the task queue of this instance

type TaskScheduleRequest

type TaskScheduleRequest struct {
	TaskBase
	// CronSchedule is the schedule impression in cron syntax that defines
	// when the task should be executed.
	CronSchedule string
	// References contain names and values for additinal
	// SQL columns to set external references for a schedule for easy clean up
	References References
}

TaskScheduleRequest contains fields required for scheduling a task

type TaskStatus

type TaskStatus string

TaskStatus : Current state of the task

const (
	// Waiting task is waiting for being picked up
	Waiting TaskStatus = "waiting"
	// Running task is currently running
	Running TaskStatus = "running"
	// Cancelled task is cancelled by the user
	Cancelled TaskStatus = "cancelled"
	// Finished task is successfully finished
	Finished TaskStatus = "finished"
	// Failed task is failed with an error
	Failed TaskStatus = "failed"
)

List of TaskStatus

type TaskType

type TaskType string

TaskType is a type which specifies what a task it is

func (TaskType) String

func (t TaskType) String() string

String returns a string value of the task type

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL