Documentation
¶
Index ¶
- Constants
- Variables
- func ParseEventJSON(event []byte) (interface{}, string, error)
- func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error
- type BaseEvent
- type Client
- type ClientOpt
- func BindWorkQueue(queue string) ClientOpt
- func ClientConcurrency(c int) ClientOpt
- func CustomLogger(log Logger) ClientOpt
- func DiscardTaskStates(states ...TaskState) ClientOpt
- func MemoryStorage() ClientOpt
- func NatsConn(nc *nats.Conn) ClientOpt
- func NatsContext(c string, opts ...nats.Option) ClientOpt
- func NoStorageInit() ClientOpt
- func PrometheusListenPort(port int) ClientOpt
- func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt
- func StoreReplicas(r uint) ClientOpt
- func TaskRetention(r time.Duration) ClientOpt
- func WorkQueue(queue *Queue) ClientOpt
- type ClientOpts
- type HandlerFunc
- type ItemKind
- type Logger
- type Mux
- type ProcessItem
- type Queue
- type QueueInfo
- type RetryPolicy
- type RetryPolicyProvider
- type Storage
- type StorageAdmin
- type Task
- type TaskOpt
- type TaskResult
- type TaskState
- type TaskStateChangeEvent
- type TasksInfo
Examples ¶
Constants ¶
const ( // DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get DefaultJobRunTime = time.Hour // DefaultMaxTries when not configured for a queue this is the default tries it will get DefaultMaxTries = 10 // DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting DefaultQueueMaxConcurrent = 100 )
const ( // RequestReplyContentTypeHeader is the header text sent to indicate the body encoding and type RequestReplyContentTypeHeader = "AJ-Content-Type" // RequestReplyDeadlineHeader is the header indicating the deadline for processing the item RequestReplyDeadlineHeader = "AJ-Handler-Deadline" // RequestReplyTerminateError is the header to send in a reply that the task should be terminated via ErrTerminateTask RequestReplyTerminateError = "AJ-Terminate" // RequestReplyError is the header indicating a generic failure in handling an item RequestReplyError = "AJ-Error" // RequestReplyTaskType is the content type indicating the payload is a Task in JSON format RequestReplyTaskType = "application/x-asyncjobs-task+json" )
const ( // TasksStreamName is the name of the JetStream Stream storing tasks TasksStreamName = "CHORIA_AJ_TASKS" // TasksStreamSubjects is a NATS wildcard matching all tasks TasksStreamSubjects = "CHORIA_AJ.T.*" // TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID TasksStreamSubjectPattern = "CHORIA_AJ.T.%s" // EventsSubjectWildcard is the NATS wildcard for receiving all events EventsSubjectWildcard = "CHORIA_AJ.E.>" // TaskStateChangeEventSubjectPattern is a printf pattern for determining the event publish subject TaskStateChangeEventSubjectPattern = "CHORIA_AJ.E.task_state.%s" // TaskStateChangeEventSubjectWildcard is a NATS wildcard for receiving all TaskStateChangeEvent messages TaskStateChangeEventSubjectWildcard = "CHORIA_AJ.E.task_state.*" // WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue WorkStreamNamePattern = "CHORIA_AJ_Q_%s" // WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType WorkStreamSubjectPattern = "CHORIA_AJ.Q.%s.%s" // WorkStreamSubjectWildcard is a NATS filter matching all enqueued items for any task store WorkStreamSubjectWildcard = "CHORIA_AJ.Q.>" // WorkStreamNamePrefix is the prefix that, when removed, reveals the queue name WorkStreamNamePrefix = "CHORIA_AJ_Q_" // RequestReplyTaskHandlerPattern is the subject request reply task handlers should listen on by default RequestReplyTaskHandlerPattern = "CHORIA_AJ.H.T.%s" )
const (
// TaskStateChangeEventType is the event type for TaskStateChangeEvent types
TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state"
)
Variables ¶
var ( // ErrTaskNotFound is the error indicating a task does not exist rather than a failure to load ErrTaskNotFound = errors.New("task not found") // ErrTerminateTask indicates that a task failed, and no further processing attempts should be made ErrTerminateTask = fmt.Errorf("terminate task") // ErrNoTasks indicates the task store is empty ErrNoTasks = fmt.Errorf("no tasks found") // ErrTaskPastDeadline indicates a task that was scheduled for handling is past its deadline ErrTaskPastDeadline = fmt.Errorf("past deadline") // ErrTaskAlreadyActive indicates that a task is already in the active state ErrTaskAlreadyActive = fmt.Errorf("task already active") // ErrTaskTypeCannotEnqueue indicates that a task is in a state where it cannot be enqueued as new ErrTaskTypeCannotEnqueue = fmt.Errorf("cannot enqueue a task in state") // ErrTaskUpdateFailed indicates a task update failed ErrTaskUpdateFailed = fmt.Errorf("failed updating task state") // ErrTaskAlreadyInState indicates an update failed because a task was already in the desired state ErrTaskAlreadyInState = fmt.Errorf("%w, already in desired state", ErrTaskUpdateFailed) // ErrTaskLoadFailed indicates a task failed for an unknown reason ErrTaskLoadFailed = fmt.Errorf("loading task failed") // ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type") // ErrDuplicateHandlerForTaskType indicates a task handler for a specific type is already registered ErrDuplicateHandlerForTaskType = fmt.Errorf("duplicate handler for task type") // ErrInvalidHeaders indicates that message headers from JetStream were not valid ErrInvalidHeaders = fmt.Errorf("coult not decode headers") // ErrContextWithoutDeadline indicates a context.Context was passed without deadline when it was expected ErrContextWithoutDeadline = fmt.Errorf("non deadline context given") // ErrInvalidStorageItem indicates a Work Queue item had no JetStream state associated 
with it ErrInvalidStorageItem = fmt.Errorf("invalid storage item") // ErrNoNatsConn indicates that a nil connection was supplied ErrNoNatsConn = fmt.Errorf("no NATS connection supplied") // ErrNoMux indicates that a processor was started with no routing mux configured ErrNoMux = fmt.Errorf("mux is required") // ErrQueueNotFound is the error indicating a queue does not exist rather than a failure to load ErrQueueNotFound = errors.New("queue not found") // ErrQueueConsumerNotFound indicates that the Work Queue store has no consumers defined ErrQueueConsumerNotFound = errors.New("queue consumer not found") // ErrQueueNameRequired indicates a queue has no name ErrQueueNameRequired = fmt.Errorf("queue name is required") // ErrQueueItemCorrupt indicates that an item received from the work queue was invalid - perhaps invalid JSON ErrQueueItemCorrupt = fmt.Errorf("corrupt queue item received") // ErrQueueItemInvalid is an item read from the queue with no data or obviously bad data ErrQueueItemInvalid = fmt.Errorf("invalid queue item received") // ErrInvalidQueueState indicates a queue was attempted to be used but no internal state is known of that queue ErrInvalidQueueState = fmt.Errorf("invalid queue storage state") // ErrDuplicateItem indicates that the Work Queue deduplication protection refused a message ErrDuplicateItem = fmt.Errorf("duplicate work queue item") // ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered ErrUnknownEventType = fmt.Errorf("unknown event type") // ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed") // ErrRequestReplyNoDeadline indicates a request-reply handler was called without a deadline ErrRequestReplyNoDeadline = fmt.Errorf("request-reply requires deadline context") // ErrRequestReplyShortDeadline indicates a deadline context has a too short timeout 
ErrRequestReplyShortDeadline = fmt.Errorf("request-reply deadline too short") )
var ( // RetryLinearTenMinutes is a 20-step policy between 1 and 10 minutes RetryLinearTenMinutes = linearPolicy(20, 0.90, time.Minute, 10*time.Minute) // RetryLinearOneHour is a 20-step policy between 10 minutes and 1 hour RetryLinearOneHour = linearPolicy(20, 0.90, 10*time.Minute, 60*time.Minute) // RetryLinearOneMinute is a 20-step policy between 1 second and 1 minute RetryLinearOneMinute = linearPolicy(20, 0.5, time.Second, time.Minute) // RetryDefault is the default retry policy RetryDefault = RetryLinearTenMinutes )
Functions ¶
func ParseEventJSON ¶ added in v0.0.2
ParseEventJSON parses event bytes returning the parsed Event and its event type
func RetrySleep ¶ added in v0.0.2
func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error
RetrySleep sleeps for the duration for try n or until interrupted by ctx
Types ¶
type BaseEvent ¶ added in v0.0.2
type BaseEvent struct { EventID string `json:"event_id"` EventType string `json:"type"` TimeStamp time.Time `json:"timestamp"` }
BaseEvent is present in all event types and can be used to detect the type
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client connects Task producers and Task handlers to the backend
Example (Consumer) ¶
queue := &Queue{ Name: "P100", MaxRunTime: 60 * time.Minute, MaxConcurrent: 20, MaxTries: 100, } // Uses the NATS CLI context WQ for connection details, will create the queue if // it does not already exist client, err := NewClient(NatsContext("WQ"), WorkQueue(queue), RetryBackoffPolicy(RetryLinearOneHour)) panicIfErr(err) router := NewTaskRouter() err = router.HandleFunc("email:send", func(_ context.Context, _ Logger, t *Task) (interface{}, error) { log.Printf("Processing task: %s", t.ID) // handle task.Payload which is a JSON encoded email // task record will be updated with this payload result return "success", nil }) panicIfErr(err) // Starts handling registered tasks, blocks until canceled err = client.Run(context.Background(), router) panicIfErr(err)
Output:
Example (Producer) ¶
queue := &Queue{ Name: "P100", MaxRunTime: 60 * time.Minute, MaxConcurrent: 20, MaxTries: 100, } email := newEmail("user@example.net", "Test Subject", "Test Body") // Creates a new task that has a deadline for processing 1 hour from now task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour))) panicIfErr(err) // Uses the NATS CLI context WQ for connection details, will create the queue if // it does not already exist client, err := NewClient(NatsContext("WQ"), WorkQueue(queue)) panicIfErr(err) // Adds the task to the queue called P100 err = client.EnqueueTask(context.Background(), task) panicIfErr(err)
Output:
func NewClient ¶
NewClient creates a new client, one of NatsConn() or NatsContext() must be passed, other options are optional.
When no WorkQueue() is supplied a default queue called DEFAULT will be used
func (*Client) EnqueueTask ¶
EnqueueTask adds a task to the named queue which must already exist
func (*Client) LoadTaskByID ¶
LoadTaskByID loads a task from the backend using its ID
Example ¶
client, err := NewClient(NatsContext("WQ")) panicIfErr(err) task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB") panicIfErr(err) fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
Output:
func (*Client) RetryTaskByID ¶ added in v0.0.2
RetryTaskByID will retry a task, first removing an entry from the Work Queue if already there
func (*Client) StorageAdmin ¶
func (c *Client) StorageAdmin() StorageAdmin
StorageAdmin provides access to the admin features of the storage backend
type ClientOpt ¶
type ClientOpt func(opts *ClientOpts) error
ClientOpt configures the client
func BindWorkQueue ¶
BindWorkQueue binds the client to a work queue that should already exist
func ClientConcurrency ¶
ClientConcurrency sets the concurrency to use when executing tasks within this client for horizontal scaling. This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a queue would have a larger concurrency like 100 (DefaultQueueMaxConcurrent) and an individual task processor would be below that. This allows for horizontal and vertical scaling but without unbounded growth - the queue MaxConcurrent is the absolute upper limit for in-flight jobs for 1 specific queue.
func CustomLogger ¶
CustomLogger sets a custom logger to use for all logging
func DiscardTaskStates ¶ added in v0.0.2
DiscardTaskStates configures the client to discard Tasks that reach a final state in the list of supplied TaskState
func MemoryStorage ¶
func MemoryStorage() ClientOpt
MemoryStorage enables storing tasks and work queue in memory in JetStream
func NatsConn ¶
func NatsConn(nc *nats.Conn) ClientOpt
NatsConn sets an already connected NATS connection as communications channel
func NatsContext ¶
NatsContext attempts to connect to the NATS client context c
func NoStorageInit ¶ added in v0.0.2
func NoStorageInit() ClientOpt
NoStorageInit skips setting up any queues or task stores when creating a client
func PrometheusListenPort ¶
PrometheusListenPort enables prometheus listening on a specific port
func RetryBackoffPolicy ¶
func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt
RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes
func StoreReplicas ¶
StoreReplicas sets the replica level to keep for the tasks store and work queue
Used only when initially creating the underlying streams.
func TaskRetention ¶
TaskRetention is the length of time tasks will be kept for.
Used only when initially creating the underlying streams.
type ClientOpts ¶
type ClientOpts struct {
// contains filtered or unexported fields
}
ClientOpts configures the client
type HandlerFunc ¶
HandlerFunc handles a single task, the response bytes will be stored in the original task
type ItemKind ¶
type ItemKind int
ItemKind indicates the kind of job a work queue entry represents
var ( // TaskItem is a task as defined by Task TaskItem ItemKind = 0 )
type Logger ¶
type Logger interface { Debugf(format string, v ...interface{}) Infof(format string, v ...interface{}) Warnf(format string, v ...interface{}) Errorf(format string, v ...interface{}) }
Logger is a pluggable logger interface
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux routes messages
Note: this will change to be nearer to a server mux and include support for middleware
func (*Mux) HandleFunc ¶
func (m *Mux) HandleFunc(taskType string, h HandlerFunc) error
HandleFunc registers a handler for a taskType. The taskType must exactly match the type of the tasks it should handle
func (*Mux) Handler ¶
func (m *Mux) Handler(t *Task) HandlerFunc
Handler looks up the handler function for a task
type ProcessItem ¶
type ProcessItem struct { Kind ItemKind `json:"kind"` JobID string `json:"job"` // contains filtered or unexported fields }
ProcessItem is an individual item stored in the work queue
type Queue ¶
type Queue struct { // Name is a unique name for the work queue, should be in the character range a-zA-Z0-9 Name string `json:"name"` // MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire MaxAge time.Duration `json:"max_age"` // MaxEntries represents the maximum amount of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied. MaxEntries int `json:"max_entries"` // DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected DiscardOld bool `json:"discard_old"` // MaxTries is the maximum number of times an entry can be tried, entries will be tried every MaxRunTime with some jitter applied. Defaults to DefaultMaxTries MaxTries int `json:"max_tries"` // MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime MaxRunTime time.Duration `json:"max_runtime"` // MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent MaxConcurrent int `json:"max_concurrent"` // NoCreate will not try to create a queue, will bind to an existing one or fail NoCreate bool // contains filtered or unexported fields }
Queue represents a work queue
type QueueInfo ¶
type QueueInfo struct { // Name is the name of the queue Name string `json:"name"` // Time is when the information was gathered Time time.Time `json:"time"` // Stream is the active JetStream Stream Information Stream *api.StreamInfo `json:"stream_info"` // Consumer is the worker stream information Consumer *api.ConsumerInfo `json:"consumer_info"` }
QueueInfo holds information about a queue state
type RetryPolicy ¶
type RetryPolicy struct { // Intervals is a range of time periods backoff will be based off Intervals []time.Duration // Jitter is a factor applied to the specific interval to avoid repeating the same backoff periods Jitter float64 }
RetryPolicy defines a period that failed jobs will be retried against
type RetryPolicyProvider ¶ added in v0.0.2
RetryPolicyProvider is the interface that the RetryPolicy implements, use this to implement your own exponential backoff system or similar for task retries.
type Storage ¶
type Storage interface { SaveTaskState(ctx context.Context, task *Task, notify bool) error EnqueueTask(ctx context.Context, queue *Queue, task *Task) error RetryTaskByID(ctx context.Context, queue *Queue, id string) error LoadTaskByID(id string) (*Task, error) PublishTaskStateChangeEvent(ctx context.Context, task *Task) error AckItem(ctx context.Context, item *ProcessItem) error NakItem(ctx context.Context, item *ProcessItem) error TerminateItem(ctx context.Context, item *ProcessItem) error PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error) PrepareQueue(q *Queue, replicas int, memory bool) error PrepareTasks(memory bool, replicas int, retention time.Duration) error }
Storage implements the backend access
type StorageAdmin ¶
type StorageAdmin interface { Queues() ([]*QueueInfo, error) QueueNames() ([]string, error) QueueInfo(name string) (*QueueInfo, error) PurgeQueue(name string) error DeleteQueue(name string) error PrepareQueue(q *Queue, replicas int, memory bool) error PrepareTasks(memory bool, replicas int, retention time.Duration) error TasksInfo() (*TasksInfo, error) LoadTaskByID(id string) (*Task, error) DeleteTaskByID(id string) error Tasks(ctx context.Context, limit int32) (chan *Task, error) TasksStore() (*jsm.Manager, *jsm.Stream, error) }
StorageAdmin provides helpers that mainly support the CLI. It leaks a bunch of details about JetStream, but that's ok: we're not really intending to change the storage or support additional storage backends
type Task ¶
type Task struct { // ID is a k-sortable unique ID for the task ID string `json:"id"` // Type is a free form string that can later be used as a routing key to send tasks to handlers Type string `json:"type"` // Queue is the name of the queue the task was enqueued with, set only during the enqueue operation else empty Queue string `json:"queue"` // Payload is a JSON representation of the associated work Payload []byte `json:"payload"` // Deadline is a cut-off time for the job to complete, should a job be scheduled after this time it will fail. // In-Flight jobs are allowed to continue past this time. Only starting handlers are impacted by this deadline. Deadline *time.Time `json:"deadline,omitempty"` // Result is the outcome of the job, only set for successful jobs Result *TaskResult `json:"result,omitempty"` // State is the most recent recorded state the job is in State TaskState `json:"state"` // CreatedAt is the time the job was created in UTC timezone CreatedAt time.Time `json:"created"` // LastTriedAt is a time stamp for when the job was last handed to a handler LastTriedAt *time.Time `json:"tried,omitempty"` // Tries is how many times the job was handled Tries int `json:"tries"` // LastErr is the most recent handling error if any LastErr string `json:"last_err,omitempty"` // contains filtered or unexported fields }
Task represents a job item that handlers will execute
func NewTask ¶
NewTask creates a new task of taskType that can later be used to route tasks to handlers. The task will carry a JSON encoded representation of payload.
Example (With_deadline) ¶
email := newEmail("user@example.net", "Test Subject", "Test Body") // Creates a new task that has a deadline for processing 1 hour from now task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour))) if err != nil { panic(fmt.Sprintf("Could not create task: %v", err)) } fmt.Printf("Task ID: %s\n", task.ID)
Output:
func (*Task) IsPastDeadline ¶ added in v0.0.2
IsPastDeadline determines if the task is past its deadline
type TaskOpt ¶
TaskOpt configures Tasks made using NewTask()
func TaskDeadline ¶
TaskDeadline sets an absolute time after which the task should not be handled
type TaskResult ¶
type TaskResult struct { Payload interface{} `json:"payload"` CompletedAt time.Time `json:"completed"` }
TaskResult is the result of task execution, this will only be set for successfully processed jobs
type TaskState ¶
type TaskState string
TaskState indicates the current state a task is in
const ( // TaskStateUnknown is for tasks that do not have a state set TaskStateUnknown TaskState = "" // TaskStateNew newly created tasks that have not been handled yet TaskStateNew TaskState = "new" // TaskStateActive tasks that are currently being handled TaskStateActive TaskState = "active" // TaskStateRetry tasks that previously failed and are waiting retry TaskStateRetry TaskState = "retry" // TaskStateExpired tasks that reached their deadline TaskStateExpired TaskState = "expired" // TaskStateTerminated indicates that the task was terminated via the ErrTerminateTask error TaskStateTerminated TaskState = "terminated" // TaskStateCompleted tasks that are completed TaskStateCompleted TaskState = "complete" // TaskStateQueueError tasks that could not have their associated Work Queue item created TaskStateQueueError TaskState = "queue_error" )
type TaskStateChangeEvent ¶ added in v0.0.2
type TaskStateChangeEvent struct { BaseEvent // TaskID is the ID of the task, use with LoadTaskByID() to access the task TaskID string `json:"task_id"` // State is the new state of the Task State TaskState `json:"state"` // Tries is how many times the Task has been processed Tries int `json:"tries"` // Queue is the queue the task is in, can be empty Queue string `json:"queue,omitempty"` // TaskType is the task routing type TaskType string `json:"task_type"` // LastErr is the error that caused a task to change state for error state changes LastErr string `json:"last_error,omitempty"` // Age is the time since the task was created in milliseconds Age time.Duration `json:"task_age,omitempty"` }
TaskStateChangeEvent notifies that a significant change occurred in a Task
func NewTaskStateChangeEvent ¶ added in v0.0.2
func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error)
NewTaskStateChangeEvent creates a new event notifying of a change in task state