async

package module
v0.17.19 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2024 License: BSD-3-Clause Imports: 6 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrUnmarshalTaskEnvelop = errors.New("failed to unmarshal task envelope")
	ErrQueueIsFull          = errors.New("queue is full")
	ErrTaskNotRegistered    = errors.New("task not registered")
	ErrTaskDeadlineExceeded = errors.New("task deadline exceeded")
)
View Source
var ErrQueueAlreadyExists = errors.New("task queue already exists")

Functions

This section is empty.

Types

type Backend

type Backend interface {
	CreateQueue(ctx context.Context, name string) error
	ListQueues(ctx context.Context) ([]string, error)
	SubscribeQueue(ctx context.Context, queueID string) (<-chan TaskEnvelope, error)

	EnqueueTask(ctx context.Context, e TaskEnvelope) error
	CheckTask(ctx context.Context, queueID string, refID string) (Status, error)
	CancelTask(ctx context.Context, queueID string, refID string) error
	PendingTasks(ctx context.Context, queueID string) ([]TaskRef, error)
	CompletedTasks(ctx context.Context, queueID string) ([]TaskRef, error)
	ArchivedTasks(ctx context.Context, queueID string) ([]TaskRef, error)
}

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context represents the execution context of a task handler. It contains the underlying context.Context, a TaskEnvelope, and an enqueueFn function for task enqueueing.

func (*Context) LastTry

func (ctx *Context) LastTry() bool

LastTry shows if this is the last time this handler will be called.

func (*Context) Requeue

func (ctx *Context) Requeue(p RequeueParams) error

Requeue puts the task back into queue. It resets the internal counters.

func (*Context) Retries

func (ctx *Context) Retries() int

Retries returns the number of times this task has been retried.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(backend Backend, opt ...Option) (*Engine, error)

func (*Engine) Shutdown

func (srv *Engine) Shutdown(ctx context.Context) error

type EnqueueParams

type EnqueueParams struct {
	// contains filtered or unexported fields
}

func (EnqueueParams) Delay

Delay set how long to wait before picking up by a worker. **NOTE**: this is just the minimum delay required, the actual delay until tasks are picked up can be different based on other factors such as server load. **NOTE**: the delay will be truncated to seconds.

func (EnqueueParams) GroupKey

func (x EnqueueParams) GroupKey(key string) EnqueueParams

func (EnqueueParams) ID

func (EnqueueParams) MaxRetry

func (x EnqueueParams) MaxRetry(max int) EnqueueParams

func (EnqueueParams) NotAfter

func (x EnqueueParams) NotAfter(notAfter time.Time) EnqueueParams

NotAfter sets a time that if a task has not been started to be processed, then it will be dropped.

func (EnqueueParams) UniqueKey

func (x EnqueueParams) UniqueKey(key string) EnqueueParams

type Handler

type Handler[TD TaskDataType, TDP TaskDataTypePtr[TD]] func(ctx *Context, p TD) error

type Option

type Option func(*Engine) error

func ErrFunc

func ErrFunc(errFn func(err error)) Option

ErrFunc set the global error handler for all unhandled exceptions raised by components. Usually there is little we can do about these errors except to capture for logging purposes.

func Register

func Register(components ...component) Option

Register is a function that takes one or more tasks/queues and registers them with a server. Task and Queue implement component interface. The function returns an Option type that can be used in NewEngine builder.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func SetupQueue

func SetupQueue(
	name string,
	opt ...QueueSetupOption,
) *Queue

func (*Queue) Archived

func (q *Queue) Archived(ctx context.Context) ([]TaskRef, error)

func (*Queue) Completed

func (q *Queue) Completed(ctx context.Context) ([]TaskRef, error)

func (*Queue) ID

func (q *Queue) ID() string

func (*Queue) Pending

func (q *Queue) Pending(ctx context.Context) ([]TaskRef, error)

func (*Queue) Priority

func (q *Queue) Priority() int

func (*Queue) Workers

func (q *Queue) Workers() int

type QueueSetupOption

type QueueSetupOption func(*queueSetupParams)

func WithMaxLen

func WithMaxLen(maxLen int) QueueSetupOption

WithMaxLen indicates the maximum number of waiting tasks should not exceed `maxLen` and the Enqueue method of the task should return ErrQueueIsFull error.

func WithPriority

func WithPriority(priority int) QueueSetupOption

func WithWorkers

func WithWorkers(workers int) QueueSetupOption

type RequeueParams

type RequeueParams struct {
	// contains filtered or unexported fields
}

func (RequeueParams) Delay

Delay set how long to wait before picking up by a worker. **NOTE**: this is just the minimum delay required, the actual delay until tasks are picked up can be different based on other factors such as server load. **NOTE**: the delay will be truncated to seconds.

func (RequeueParams) GroupKey

func (x RequeueParams) GroupKey(key string) RequeueParams

func (RequeueParams) MaxRetry

func (x RequeueParams) MaxRetry(maxRetry int) RequeueParams

func (RequeueParams) NotAfter

func (x RequeueParams) NotAfter(notAfter time.Time) RequeueParams

NotAfter sets a time that if a task has not been started to be processed, then it will be dropped.

func (RequeueParams) NotRetry

func (x RequeueParams) NotRetry() RequeueParams

NotRetry makes sure that the Retries does not increase for this. It could be useful in some cases like rate-limit backoff handling, or recurring tasks.

func (RequeueParams) UniqueKey

func (x RequeueParams) UniqueKey(key string) RequeueParams

type Status

type Status int
const (
	StatusPending Status = iota
	StatusRunning
	StatusSuccess
	StatusFailed
)

func (Status) String

func (s Status) String() string

type Task

type Task[TD TaskDataType, TDP TaskDataTypePtr[TD]] struct {
	// contains filtered or unexported fields
}

func SetupTask

func SetupTask[TD TaskDataType, TDP TaskDataTypePtr[TD]](
	name string,
	h Handler[TD, TDP],
	opts ...TaskSetupOption,
) *Task[TD, TDP]

func (*Task[TD, TDP]) Enqueue

func (t *Task[TD, TDP]) Enqueue(
	ctx context.Context,
	td TD, queueID string,
	p EnqueueParams,
) (*TaskRef, error)

Enqueue put the task into the queue, and it will be picked up by one of the workers soon.

func (*Task[TD, TDP]) Name

func (t *Task[TD, TDP]) Name() string

type TaskDataType

type TaskDataType interface {
	encoding.BinaryMarshaler
}

type TaskDataTypePtr

type TaskDataTypePtr[DT TaskDataType] interface {
	*DT

	encoding.BinaryUnmarshaler
}

type TaskEnvelope

type TaskEnvelope struct {
	ID        string `json:"id"`
	TaskName  string `json:"taskName"`
	QueueID   string `json:"queueID"`
	Payload   []byte `json:"payload"`
	Submitter string `json:"submitter"`
	// MaxRetry is the max number of retries for this task.
	MaxRetry int `json:"maxRetry"`
	// Retried is the number of times we've retried this task so far.
	Retried int `json:"retried"`
	// ErrorMsg holds the error message from the last failure.
	ErrorMsg string `json:"errorMsg"`
	// UniqueKey holds the redis key used for uniqueness lock for this task.
	//
	// Empty string indicates that no uniqueness lock was used.
	UniqueKey string `json:"uniqueKey"`
	// GroupKey holds the group key used for task aggregation.
	//
	// Empty string indicates no aggregation is used for this task.
	GroupKey string `json:"groupKey"`
	// Retention specifies the number of seconds the task should be retained after completion.
	Retention int64 `json:"retention"`
	// SubmitAt the time of the task has been submitted to backend
	// the number of seconds elapsed since January 1, 1970 UTC.
	SubmitAt int64 `json:"submitAt"`
	// NotBeforeAt is the minimum Unix timestamp at which this task could begin being processed.
	NotBeforeAt int64 `json:"notBeforeAt"`
	// NotAfterAt is the maximum Unix timestamp at which this task could begin being processed.
	NotAfterAt int64 `json:"notAfterAt"`
	// LastFailedAt is the time of last failure in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	LastFailedAt int64 `json:"lastFailedAt"`
	// CompletedAt the time the task was processed successfully in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	CompletedAt int64 `json:"completedAt"`
}

TaskEnvelope represents a task envelope containing task's data which is serialized and will be sent and received from Backend.

type TaskRef

type TaskRef struct {
	// contains filtered or unexported fields
}

func (*TaskRef) Cancel

func (tr *TaskRef) Cancel(ctx context.Context) error

func (*TaskRef) ID

func (tr *TaskRef) ID() string

func (*TaskRef) Status

func (tr *TaskRef) Status(ctx context.Context) (Status, error)

func (*TaskRef) TaskName

func (tr *TaskRef) TaskName() string

type TaskSetupOption

type TaskSetupOption func(*taskSetupParams)

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL