Documentation
¶
Index ¶
- Constants
- type Consumer
- type Queue
- type QueueOption
- func WithBufferSize(bufferSize int) QueueOption
- func WithConsumers(consumers ...Consumer) QueueOption
- func WithDefaultPriority(defaultPriority int) QueueOption
- func WithDefaultRetryMax(defaultRetryMax int) QueueOption
- func WithPollStorage(pollStorage bool) QueueOption
- func WithRunImmediatePriority(runImmediatePriority int) QueueOption
- func WithStorage(storage Storage) QueueOption
- func WithWorkerCount(workerCount int) QueueOption
- type Result
- type Storage
- type Task
- type TaskOption
- func WithDelayHours(delayHours int) TaskOption
- func WithDelayMinutes(delayMinutes int) TaskOption
- func WithDelaySeconds(delaySeconds int) TaskOption
- func WithPriority(priority int) TaskOption
- func WithRetryMax(retryMax int) TaskOption
- func WithSignature(signature string) TaskOption
- func WithStartTime(timestamp time.Time) TaskOption
Constants ¶
const ResultStatusError = "ERROR"
ResultStatusError represents a task that experienced an error, but CAN be retried
const ResultStatusFailure = "FAILURE"
ResultStatusFailure represents a task that experienced an error, and CANNOT be retried
const ResultStatusIgnored = "IGNORED"
ResultStatusIgnored represents a task that was not processed because the consumer does not recognize it. The task will be passed to another consumer, or error out permanently.
const ResultStatusSuccess = "SUCCESS"
ResultStatusSuccess represents a task that was completed successfully
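As a rough sketch of how a caller might act on these four statuses: the `nextAction` helper and its return strings below are hypothetical and not part of the package. Only the constant values come from the documentation above, and the rule that a task is retried while its retry count is below its retry maximum is an assumption.

```go
package main

import "fmt"

// Status strings copied from the documented constants.
const (
	ResultStatusError   = "ERROR"
	ResultStatusFailure = "FAILURE"
	ResultStatusIgnored = "IGNORED"
	ResultStatusSuccess = "SUCCESS"
)

// nextAction is a hypothetical helper that maps a result status to a
// follow-up action. The action names are illustrative only.
func nextAction(status string, retryCount int, retryMax int) string {
	switch status {
	case ResultStatusSuccess:
		return "delete"
	case ResultStatusError:
		// Assumption: retry only while retries remain.
		if retryCount < retryMax {
			return "retry"
		}
		return "log failure"
	case ResultStatusIgnored:
		return "try next consumer"
	default:
		// ResultStatusFailure and any unknown status cannot be retried.
		return "log failure"
	}
}

func main() {
	fmt.Println(nextAction(ResultStatusError, 1, 3))
	fmt.Println(nextAction(ResultStatusError, 3, 3))
	fmt.Println(nextAction(ResultStatusIgnored, 0, 3))
}
```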
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func New ¶
func New(options ...QueueOption) Queue
New returns a fully initialized Queue object, with all options applied
type QueueOption ¶
type QueueOption func(*Queue)
QueueOption is a function that modifies a Queue object
func WithBufferSize ¶
func WithBufferSize(bufferSize int) QueueOption
WithBufferSize sets the number of tasks to lock in a single transaction
func WithConsumers ¶
func WithConsumers(consumers ...Consumer) QueueOption
func WithDefaultPriority ¶ added in v0.0.2
func WithDefaultPriority(defaultPriority int) QueueOption
WithDefaultPriority sets the default priority for new tasks
func WithDefaultRetryMax ¶ added in v0.0.2
func WithDefaultRetryMax(defaultRetryMax int) QueueOption
WithDefaultRetryMax sets the default number of times to retry a task before giving up
func WithPollStorage ¶
func WithPollStorage(pollStorage bool) QueueOption
WithPollStorage sets whether the queue should poll the storage for new tasks
func WithRunImmediatePriority ¶ added in v0.2.0
func WithRunImmediatePriority(runImmediatePriority int) QueueOption
WithRunImmediatePriority sets the maximum priority level for tasks that will run immediately
func WithStorage ¶
func WithStorage(storage Storage) QueueOption
WithStorage sets the storage provider for the queue
func WithWorkerCount ¶
func WithWorkerCount(workerCount int) QueueOption
WithWorkerCount sets the number of concurrent processes to run
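The QueueOption functions above follow the functional-options pattern. The sketch below illustrates that pattern with lowercase stand-in types. The real Queue fields are unexported, so the field names and the default values used here are assumptions, not the package's implementation.

```go
package main

import "fmt"

// queue is a stand-in for Queue; these field names are hypothetical.
type queue struct {
	workerCount int
	bufferSize  int
	pollStorage bool
}

// queueOption mirrors the documented shape: func(*Queue).
type queueOption func(*queue)

func withWorkerCount(workerCount int) queueOption {
	return func(q *queue) { q.workerCount = workerCount }
}

func withBufferSize(bufferSize int) queueOption {
	return func(q *queue) { q.bufferSize = bufferSize }
}

func withPollStorage(pollStorage bool) queueOption {
	return func(q *queue) { q.pollStorage = pollStorage }
}

// newQueue starts from illustrative defaults (assumed values), then
// applies each option in the order given, so later options win.
func newQueue(options ...queueOption) queue {
	q := queue{workerCount: 1, bufferSize: 1, pollStorage: true}
	for _, option := range options {
		option(&q)
	}
	return q
}

func main() {
	q := newQueue(withWorkerCount(4), withBufferSize(32))
	fmt.Printf("%+v\n", q)
}
```

With the documented API, the equivalent call would have the form New(WithWorkerCount(4), WithBufferSize(32)).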
type Result ¶ added in v0.2.0
Result is the return value from a task function
func Ignored ¶ added in v0.2.0
func Ignored() Result
Ignored returns a Result object with a status of "IGNORED". This happens when a consumer does not recognize the task name.
func Success ¶ added in v0.2.0
func Success() Result
Success returns a Result object with a status of "SUCCESS"
func (Result) IsSuccessful ¶ added in v0.2.0
IsSuccessful returns TRUE if the Result is a "SUCCESS"
func (Result) NotSuccessful ¶ added in v0.2.0
NotSuccessful returns TRUE if the Result is NOT a "SUCCESS"
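The "IGNORED" status implies that a task is offered to each consumer in turn until one recognizes it. The sketch below shows one way that hand-off could work. The Consumer type's real signature is not shown in this documentation, so the `consumer` function type, the `dispatch` helper, and the task names are all assumptions made for illustration.

```go
package main

import "fmt"

// result is a stand-in for Result, holding only a status string.
type result struct{ status string }

func success() result { return result{status: "SUCCESS"} }
func ignored() result { return result{status: "IGNORED"} }
func failure() result { return result{status: "FAILURE"} }

func (r result) isSuccessful() bool  { return r.status == "SUCCESS" }
func (r result) notSuccessful() bool { return !r.isSuccessful() }

// consumer is a hypothetical signature; the real Consumer type may differ.
type consumer func(name string, args map[string]any) result

// dispatch offers the task to each consumer in order and returns the
// first result that is not "IGNORED". If every consumer ignores the
// task, it fails permanently.
func dispatch(consumers []consumer, name string, args map[string]any) result {
	for _, c := range consumers {
		if r := c(name, args); r.status != "IGNORED" {
			return r
		}
	}
	return failure()
}

func emailConsumer(name string, _ map[string]any) result {
	if name != "send-email" {
		return ignored() // not ours; let another consumer try
	}
	return success()
}

func resizeConsumer(name string, _ map[string]any) result {
	if name != "resize-image" {
		return ignored()
	}
	return success()
}

func main() {
	consumers := []consumer{emailConsumer, resizeConsumer}
	fmt.Println(dispatch(consumers, "resize-image", nil).isSuccessful())
	fmt.Println(dispatch(consumers, "unknown-task", nil).status)
}
```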
type Storage ¶
type Storage interface {
	// GetTasks retrieves a batch of Tasks from the Storage provider
	GetTasks() ([]Task, error)

	// SaveTask saves a Task to the Storage provider
	SaveTask(task Task) error

	// DeleteTask removes a Task from the Storage provider
	DeleteTask(taskID string) error

	// LogFailure writes a Task to the error log
	LogFailure(task Task) error
}
Storage is the interface for persisting Tasks outside of memory
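To make the interface concrete, here is a minimal in-memory implementation, intended as a sketch only. The `memoryStorage` type is hypothetical, the Task struct is trimmed to two fields, and the interface is copied locally so the example compiles on its own. It also ignores locking of tasks between workers, which a real provider would likely need to handle.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Task is a trimmed stand-in for the package's Task type.
type Task struct {
	TaskID string
	Name   string
}

// Storage is copied from the documented interface.
type Storage interface {
	GetTasks() ([]Task, error)
	SaveTask(task Task) error
	DeleteTask(taskID string) error
	LogFailure(task Task) error
}

// memoryStorage is a hypothetical provider that keeps tasks in a map.
type memoryStorage struct {
	mu       sync.Mutex
	tasks    map[string]Task
	failures []Task
}

// Compile-time check that memoryStorage satisfies Storage.
var _ Storage = (*memoryStorage)(nil)

func newMemoryStorage() *memoryStorage {
	return &memoryStorage{tasks: make(map[string]Task)}
}

// GetTasks returns every stored task, in no particular order.
func (s *memoryStorage) GetTasks() ([]Task, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	batch := make([]Task, 0, len(s.tasks))
	for _, task := range s.tasks {
		batch = append(batch, task)
	}
	return batch, nil
}

// SaveTask inserts or overwrites a task, keyed by TaskID.
func (s *memoryStorage) SaveTask(task Task) error {
	if task.TaskID == "" {
		return errors.New("task must have a TaskID")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.tasks[task.TaskID] = task
	return nil
}

// DeleteTask removes a task; deleting a missing task is not an error.
func (s *memoryStorage) DeleteTask(taskID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.tasks, taskID)
	return nil
}

// LogFailure appends the task to an in-memory error log.
func (s *memoryStorage) LogFailure(task Task) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.failures = append(s.failures, task)
	return nil
}

func main() {
	storage := newMemoryStorage()
	_ = storage.SaveTask(Task{TaskID: "a1", Name: "send-email"})
	tasks, _ := storage.GetTasks()
	fmt.Println(len(tasks))
}
```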
type Task ¶
type Task struct {
	TaskID      string    `bson:"taskId"`              // Unique identifier for this task
	LockID      string    `bson:"lockId"`              // Unique identifier for the worker that is currently processing this task
	Name        string    `bson:"name"`                // Name of the task (used to identify the handler function)
	Arguments   mapof.Any `bson:"arguments"`           // Data required to execute this task (marshalled as a map)
	CreateDate  int64     `bson:"createDate"`          // Unix epoch seconds when this task was created
	StartDate   int64     `bson:"startDate"`           // Unix epoch seconds when this task is scheduled to execute
	TimeoutDate int64     `bson:"timeoutDate"`         // Unix epoch seconds when this task will "time out" and can be reclaimed by another process
	Priority    int       `bson:"priority"`            // Priority of the handler, determines the order that tasks are executed in.
	Signature   string    `bson:"signature,omitempty"` // Signature of the task. If a signature is present, then no other tasks will be allowed with this signature.
	RetryCount  int       `bson:"retryCount"`          // Number of times that this task has already been retried
	RetryMax    int       `bson:"retryMax"`            // Maximum number of times that this task can be retried
	Error       string    `bson:"error"`               // Error (if any) from the last execution
}
Task wraps a unit of work with the metadata required to track its runs and retries.
type TaskOption ¶
type TaskOption func(*Task)
func WithDelayHours ¶
func WithDelayHours(delayHours int) TaskOption
WithDelayHours sets the number of hours before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.
func WithDelayMinutes ¶
func WithDelayMinutes(delayMinutes int) TaskOption
WithDelayMinutes sets the number of minutes before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.
func WithDelaySeconds ¶
func WithDelaySeconds(delaySeconds int) TaskOption
WithDelaySeconds sets the number of seconds before the task is executed relative to the current clock. This differs from WithStartTime, which sets an absolute start time.
func WithPriority ¶
func WithPriority(priority int) TaskOption
WithPriority sets the priority of the task
func WithRetryMax ¶
func WithRetryMax(retryMax int) TaskOption
WithRetryMax sets the maximum number of times that a task can be retried
func WithSignature ¶ added in v0.3.0
func WithSignature(signature string) TaskOption
WithSignature sets the signature of the task. Only one task with a given signature can be active at a time. Duplicates are dropped silently.
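The de-duplication rule can be pictured as a set of active signatures. The sketch below is an illustration under assumptions, not the package's implementation: the `pendingTasks` type and its methods are hypothetical, tasks without a signature are assumed to be always accepted, and a signature is assumed to become available again once its task completes.

```go
package main

import "fmt"

// task is a stand-in holding only the fields needed here.
type task struct {
	Name      string
	Signature string
}

// pendingTasks is a hypothetical holder that tracks active signatures.
type pendingTasks struct {
	tasks  []task
	active map[string]bool
}

func newPendingTasks() *pendingTasks {
	return &pendingTasks{active: make(map[string]bool)}
}

// add accepts the task unless another task with the same non-empty
// signature is already active. It reports whether the task was kept;
// a duplicate is dropped without returning an error.
func (p *pendingTasks) add(t task) bool {
	if t.Signature != "" {
		if p.active[t.Signature] {
			return false
		}
		p.active[t.Signature] = true
	}
	p.tasks = append(p.tasks, t)
	return true
}

// complete frees a signature so a new task may reuse it.
func (p *pendingTasks) complete(signature string) {
	delete(p.active, signature)
}

func main() {
	p := newPendingTasks()
	fmt.Println(p.add(task{Name: "reindex", Signature: "reindex:42"}))
	fmt.Println(p.add(task{Name: "reindex", Signature: "reindex:42"}))
	p.complete("reindex:42")
	fmt.Println(p.add(task{Name: "reindex", Signature: "reindex:42"}))
}
```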
func WithStartTime ¶ added in v0.3.0
func WithStartTime(timestamp time.Time) TaskOption
WithStartTime sets the absolute start time of the task. This differs from the WithDelayXXX options, which set a start time relative to the current clock.