queue

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2024 License: Apache-2.0 Imports: 6 Imported by: 1

README

queue

A simple distributed message queue with swappable back ends.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer func(name string, args map[string]any) (bool, error)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func New

func New(options ...QueueOption) Queue

New returns a fully initialized Queue object, with all options applied

func (*Queue) Publish

func (q *Queue) Publish(task Task) error

Publish adds a Task to the Queue

func (*Queue) Schedule

func (q *Queue) Schedule(task Task, delay time.Duration) error

func (*Queue) Stop

func (queue *Queue) Stop()

Stop closes the queue and stops all workers (after they complete their current task)

type QueueOption

type QueueOption func(*Queue)

QueueOption is a function that modifies a Queue object

func WithBufferSize

func WithBufferSize(bufferSize int) QueueOption

WithBufferSize sets the number of tasks to lock in a single transaction

func WithConsumers

func WithConsumers(consumers ...Consumer) QueueOption

func WithDefaultPriority added in v0.0.2

func WithDefaultPriority(defaultPriority int) QueueOption

WithDefaultPriority sets the default priority for new tasks

func WithDefaultRetryMax added in v0.0.2

func WithDefaultRetryMax(defaultRetryMax int) QueueOption

WithDefaultRetryMax sets the default number of times to retry a task before giving up

func WithPollStorage

func WithPollStorage(pollStorage bool) QueueOption

WithPollStorage sets whether the queue should poll the storage for new tasks

func WithStorage

func WithStorage(storage Storage) QueueOption

WithStorage sets the Storage provider for the queue

func WithWorkerCount

func WithWorkerCount(workerCount int) QueueOption

WithWorkerCount sets the number of concurrent processes to run

type Storage

type Storage interface {

	// GetTasks retrieves a batch of Tasks from the Storage provider
	GetTasks() ([]Task, error)

	// SaveTask saves a Task to the Storage provider
	SaveTask(task Task) error

	// DeleteTask removes a Task from the Storage provider
	DeleteTask(taskID string) error

	// LogFailure writes a Task to the error log
	LogFailure(task Task) error
}

Storage is the interface for persisting Tasks outside of memory

type Task

type Task struct {
	TaskID      string    `bson:"taskId"`      // Unique identifier for this task
	LockID      string    `bson:"lockId"`      // Unique identifier for the worker that is currently processing this task
	Name        string    `bson:"name"`        // Name of the task (used to identify the handler function)
	Arguments   mapof.Any `bson:"arguments"`   // Data required to execute this task (marshalled as a map)
	CreateDate  int64     `bson:"createDate"`  // Unix epoch seconds when this task was created
	StartDate   int64     `bson:"startDate"`   // Unix epoch seconds when this task is scheduled to execute
	TimeoutDate int64     `bson:"timeoutDate"` // Unix epoch seconds when this task will "time out" and can be reclaimed by another process
	Priority    int       `bson:"priority"`    // Priority of the handler, determines the order that tasks are executed in.
	RetryCount  int       `bson:"retryCount"`  // Number of times that this task has already been retried
	RetryMax    int       `bson:"retryMax"`    // Maximum number of times that this task can be retried
	Error       string    `bson:"error"`       // Error (if any) from the last execution
}

Task represents a unit of work, together with the metadata required to track its runs and retries.

func NewTask

func NewTask(name string, arguments map[string]any, options ...TaskOption) Task

NewTask creates a new Task record from a name, arguments, and optional TaskOptions; the result can be saved to a Storage provider.

func (*Task) Delay

func (task *Task) Delay(delay time.Duration)

Delay sets the time.Duration before the task is executed

type TaskOption

type TaskOption func(*Task)

func WithDelayHours

func WithDelayHours(delayHours int) TaskOption

WithDelayHours sets the number of hours before the task is executed

func WithDelayMinutes

func WithDelayMinutes(delayMinutes int) TaskOption

WithDelayMinutes sets the number of minutes before the task is executed

func WithDelaySeconds

func WithDelaySeconds(delaySeconds int) TaskOption

WithDelaySeconds sets the number of seconds before the task is executed

func WithPriority

func WithPriority(priority int) TaskOption

WithPriority sets the priority of the task

func WithRetryMax

func WithRetryMax(retryMax int) TaskOption

WithRetryMax sets the maximum number of times that a task can be retried

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL