Documentation ¶
Index ¶
- Variables
- type Backend
- type Context
- type Engine
- type EnqueueParams
- func (x EnqueueParams) Delay(d time.Duration) EnqueueParams
- func (x EnqueueParams) GroupKey(key string) EnqueueParams
- func (x EnqueueParams) ID(id string) EnqueueParams
- func (x EnqueueParams) MaxRetry(max int) EnqueueParams
- func (x EnqueueParams) NotAfter(notAfter time.Time) EnqueueParams
- func (x EnqueueParams) UniqueKey(key string) EnqueueParams
- type Handler
- type Option
- type Queue
- type QueueSetupOption
- type RequeueParams
- func (x RequeueParams) Delay(d time.Duration) RequeueParams
- func (x RequeueParams) GroupKey(key string) RequeueParams
- func (x RequeueParams) MaxRetry(maxRetry int) RequeueParams
- func (x RequeueParams) NotAfter(notAfter time.Time) RequeueParams
- func (x RequeueParams) NotRetry() RequeueParams
- func (x RequeueParams) UniqueKey(key string) RequeueParams
- type Status
- type Task
- type TaskDataType
- type TaskDataTypePtr
- type TaskEnvelope
- type TaskRef
- type TaskSetupOption
Constants ¶
This section is empty.
Variables ¶
var ( ErrUnmarshalTaskEnvelop = errors.New("failed to unmarshal task envelope") ErrQueueIsFull = errors.New("queue is full") ErrTaskNotRegistered = errors.New("task not registered") ErrTaskDeadlineExceeded = errors.New("task deadline exceeded") )
var ErrQueueAlreadyExists = errors.New("task queue already exists")
Functions ¶
This section is empty.
Types ¶
type Backend ¶
type Backend interface { CreateQueue(ctx context.Context, name string) error ListQueues(ctx context.Context) ([]string, error) SubscribeQueue(ctx context.Context, queueID string) (<-chan TaskEnvelope, error) EnqueueTask(ctx context.Context, e TaskEnvelope) error CheckTask(ctx context.Context, queueID string, refID string) (Status, error) CancelTask(ctx context.Context, queueID string, refID string) error PendingTasks(ctx context.Context, queueID string) ([]TaskRef, error) CompletedTasks(ctx context.Context, queueID string) ([]TaskRef, error) ArchivedTasks(ctx context.Context, queueID string) ([]TaskRef, error) }
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context represents the execution context of a task handler. It contains the underlying context.Context, a TaskEnvelope, and an enqueueFn function for task enqueueing.
func (*Context) Requeue ¶
func (ctx *Context) Requeue(p RequeueParams) error
Requeue puts the task back into the queue. It resets the internal counters.
type EnqueueParams ¶
type EnqueueParams struct {
// contains filtered or unexported fields
}
func (EnqueueParams) Delay ¶
func (x EnqueueParams) Delay(d time.Duration) EnqueueParams
Delay sets how long to wait before the task can be picked up by a worker. **NOTE**: this is only the minimum delay required; the actual delay until the task is picked up can differ based on other factors such as server load. **NOTE**: the delay will be truncated to seconds.
func (EnqueueParams) GroupKey ¶
func (x EnqueueParams) GroupKey(key string) EnqueueParams
func (EnqueueParams) ID ¶
func (x EnqueueParams) ID(id string) EnqueueParams
func (EnqueueParams) MaxRetry ¶
func (x EnqueueParams) MaxRetry(max int) EnqueueParams
func (EnqueueParams) NotAfter ¶
func (x EnqueueParams) NotAfter(notAfter time.Time) EnqueueParams
NotAfter sets a deadline: if processing of the task has not started by that time, the task will be dropped.
func (EnqueueParams) UniqueKey ¶
func (x EnqueueParams) UniqueKey(key string) EnqueueParams
type Handler ¶
type Handler[TD TaskDataType, TDP TaskDataTypePtr[TD]] func(ctx *Context, p TD) error
type Option ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func SetupQueue ¶
func SetupQueue( name string, opt ...QueueSetupOption, ) *Queue
type QueueSetupOption ¶
type QueueSetupOption func(*queueSetupParams)
func WithMaxLen ¶
func WithMaxLen(maxLen int) QueueSetupOption
WithMaxLen limits the number of waiting tasks to at most `maxLen`; when the queue is full, the Enqueue method of the task should return the ErrQueueIsFull error.
func WithPriority ¶
func WithPriority(priority int) QueueSetupOption
func WithWorkers ¶
func WithWorkers(workers int) QueueSetupOption
type RequeueParams ¶
type RequeueParams struct {
// contains filtered or unexported fields
}
func (RequeueParams) Delay ¶
func (x RequeueParams) Delay(d time.Duration) RequeueParams
Delay sets how long to wait before the task can be picked up by a worker. **NOTE**: this is only the minimum delay required; the actual delay until the task is picked up can differ based on other factors such as server load. **NOTE**: the delay will be truncated to seconds.
func (RequeueParams) GroupKey ¶
func (x RequeueParams) GroupKey(key string) RequeueParams
func (RequeueParams) MaxRetry ¶
func (x RequeueParams) MaxRetry(maxRetry int) RequeueParams
func (RequeueParams) NotAfter ¶
func (x RequeueParams) NotAfter(notAfter time.Time) RequeueParams
NotAfter sets a deadline: if processing of the task has not started by that time, the task will be dropped.
func (RequeueParams) NotRetry ¶
func (x RequeueParams) NotRetry() RequeueParams
NotRetry makes sure that the retry count does not increase for this requeue. It can be useful in cases such as rate-limit backoff handling or recurring tasks.
func (RequeueParams) UniqueKey ¶
func (x RequeueParams) UniqueKey(key string) RequeueParams
type Task ¶
type Task[TD TaskDataType, TDP TaskDataTypePtr[TD]] struct { // contains filtered or unexported fields }
func SetupTask ¶
func SetupTask[TD TaskDataType, TDP TaskDataTypePtr[TD]]( name string, h Handler[TD, TDP], opts ...TaskSetupOption, ) *Task[TD, TDP]
type TaskDataType ¶
type TaskDataType interface { encoding.BinaryMarshaler }
type TaskDataTypePtr ¶
type TaskDataTypePtr[DT TaskDataType] interface { *DT encoding.BinaryUnmarshaler }
type TaskEnvelope ¶
type TaskEnvelope struct { ID string `json:"id"` TaskName string `json:"taskName"` QueueID string `json:"queueID"` Payload []byte `json:"payload"` Submitter string `json:"submitter"` // MaxRetry is the max number of retries for this task. MaxRetry int `json:"maxRetry"` // Retried is the number of times we've retried this task so far. Retried int `json:"retried"` // ErrorMsg holds the error message from the last failure. ErrorMsg string `json:"errorMsg"` // UniqueKey holds the redis key used for uniqueness lock for this task. // // Empty string indicates that no uniqueness lock was used. UniqueKey string `json:"uniqueKey"` // GroupKey holds the group key used for task aggregation. // // Empty string indicates no aggregation is used for this task. GroupKey string `json:"groupKey"` // Retention specifies the number of seconds the task should be retained after completion. Retention int64 `json:"retention"` // SubmitAt the time of the task has been submitted to backend // the number of seconds elapsed since January 1, 1970 UTC. SubmitAt int64 `json:"submitAt"` // NotBeforeAt is the minimum Unix timestamp at which this task could begin being processed. NotBeforeAt int64 `json:"notBeforeAt"` // NotAfterAt is the maximum Unix timestamp at which this task could begin being processed. NotAfterAt int64 `json:"notAfterAt"` // LastFailedAt is the time of last failure in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. LastFailedAt int64 `json:"lastFailedAt"` // CompletedAt the time the task was processed successfully in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. CompletedAt int64 `json:"completedAt"` }
TaskEnvelope represents a task envelope containing the task's data, which is serialized and sent to and received from the Backend.
type TaskSetupOption ¶
type TaskSetupOption func(*taskSetupParams)