queue

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2025 License: Apache-2.0 Imports: 6 Imported by: 1

README

queue

A simple distributed message queue with swappable back ends.

Documentation

Index

Constants

View Source
const ResultStatusError = "ERROR"

ResultStatusError represents a task that was experienced an error, but CAN be retried

View Source
const ResultStatusFailure = "FAILURE"

ResultStatusFailure represents a task that was experienced an error, and CANNOT be retried

View Source
const ResultStatusIgnored = "IGNORED"

ResultStatusSkip represents a task that was not processed because the consumer does not recognize it. The task will be passed to another consumer, or error out permanently.

View Source
const ResultStatusSuccess = "SUCCESS"

ResultStatusSuccess represents a task that was completed successfully

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer func(name string, args map[string]any) Result

Consumer is a function that processes a task from the queue.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(options ...QueueOption) Queue

New returns a fully initialized Queue object, with all options applied

func (*Queue) Publish

func (q *Queue) Publish(task Task) error

PublishTask adds a Task to the Queue

func (*Queue) Schedule

func (q *Queue) Schedule(task Task, delay time.Duration) error

func (*Queue) Stop

func (queue *Queue) Stop()

Stop closes the queue and stops all workers (after they complete their current task)

type QueueOption

type QueueOption func(*Queue)

QueueOption is a function that modifies a Queue object

func WithBufferSize

func WithBufferSize(bufferSize int) QueueOption

WithBufferSize sets the number of tasks to lock in a single transaction

func WithConsumers

func WithConsumers(consumers ...Consumer) QueueOption

func WithDefaultPriority added in v0.0.2

func WithDefaultPriority(defaultPriority int) QueueOption

WithDefaultPriority sets the default priority for new tasks

func WithDefaultRetryMax added in v0.0.2

func WithDefaultRetryMax(defaultRetryMax int) QueueOption

WithDefaultRetryMax sets the default number of times to retry a task before giving up

func WithPollStorage

func WithPollStorage(pollStorage bool) QueueOption

WithPollStorage sets whether the queue should poll the storage for new tasks

func WithRunImmediatePriority added in v0.2.0

func WithRunImmediatePriority(runImmediatePriority int) QueueOption

WithRunImmediatePriority sets the maximum priority level for tasks that will run immediately

func WithStorage

func WithStorage(storage Storage) QueueOption

WithStorage sets the storage and unmarshaller for the queue

func WithWorkerCount

func WithWorkerCount(workerCount int) QueueOption

WithWorkerCount sets the number of concurrent processes to run

type Result added in v0.2.0

type Result struct {
	Status string
	Error  error
}

Result is the return value from a task function

func Error added in v0.2.0

func Error(err error) Result

Error returns a Result object with a status of "ERROR"

func Failure added in v0.2.0

func Failure(err error) Result

Failure returns a Result object with a status of "HALT"

func Ignored added in v0.2.0

func Ignored() Result

Ignored returns a Result object that has been "IGNORED" This happens when a consumer does not recognize the task name

func Success added in v0.2.0

func Success() Result

Success returns a Result object with a status of "SUCCESS"

func (Result) IsSuccessful added in v0.2.0

func (result Result) IsSuccessful() bool

IsSuccessful returns TRUE if the Result is a "SUCCESS"

func (Result) NotSuccessful added in v0.2.0

func (result Result) NotSuccessful() bool

NotSuccessful returns TRUE if the Result is NOT a "SUCCESS"

type Storage

type Storage interface {

	// GetTasks retrieves a batch of Tasks from the Storage provider
	GetTasks() ([]Task, error)

	// SaveTask saves a Task to the Storage provider
	SaveTask(task Task) error

	// DeleteTask removes a Task from the Storage provider
	DeleteTask(taskID string) error

	// LogFailure writes a Task to the error log
	LogFailure(task Task) error
}

Storage is the interface for persisting Tasks outside of memory

type Task

type Task struct {
	TaskID      string    `bson:"taskId"`      // Unique identfier for this task
	LockID      string    `bson:"lockId"`      // Unique identifier for the worker that is currently processing this task
	Name        string    `bson:"name"`        // Name of the task (used to identify the handler function)
	Arguments   mapof.Any `bson:"arguments"`   // Data required to execute this task (marshalled as a map)
	CreateDate  int64     `bson:"createDate"`  // Unix epoch seconds when this task was created
	StartDate   int64     `bson:"startDate"`   // Unix epoch seconds when this task is scheduled to execute
	TimeoutDate int64     `bson:"timeoutDate"` // Unix epoch seconds when this task will "time out" and can be reclaimed by another process
	Priority    int       `bson:"priority"`    // Priority of the handler, determines the order that tasks are executed in.
	RetryCount  int       `bson:"retryCount"`  // Number of times that this task has already been retried
	RetryMax    int       `bson:"retryMax"`    // Maximum number of times that this task can be retried
	Error       string    `bson:"error"`       // Error (if any) from the last execution
}

Task wraps a Task with the metadata required to track its runs and retries.

func NewTask

func NewTask(name string, arguments map[string]any, options ...TaskOption) Task

NewTask uses a Task object to create a new Task record that can be saved to a Storage provider.

func (*Task) Delay

func (task *Task) Delay(delay time.Duration)

Delay sets the time.Duration before the task is executed

type TaskOption

type TaskOption func(*Task)

func WithDelayHours

func WithDelayHours(delayHours int) TaskOption

WithDelayHours sets the number of hours before the task is executed

func WithDelayMinutes

func WithDelayMinutes(delayMinutes int) TaskOption

WithDelayMinutes sets the number of minutes before the task is executed

func WithDelaySeconds

func WithDelaySeconds(delaySeconds int) TaskOption

WithDelaySeconds sets the number of seconds before the task is executed

func WithPriority

func WithPriority(priority int) TaskOption

WithPriority sets the priority of the task

func WithRetryMax

func WithRetryMax(retryMax int) TaskOption

WithRetryMax sets the maximum number of times that a task can be retried

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL