Documentation ¶
Index ¶
- Constants
- Variables
- type Client
- type ClientOpt
- func BindWorkQueue(queue string) ClientOpt
- func ClientConcurrency(c int) ClientOpt
- func CustomLogger(log Logger) ClientOpt
- func MemoryStorage() ClientOpt
- func NatsConn(nc *nats.Conn) ClientOpt
- func NatsContext(c string, opts ...nats.Option) ClientOpt
- func PrometheusListenPort(port int) ClientOpt
- func RetryBackoffPolicy(p RetryPolicy) ClientOpt
- func StoreReplicas(r uint) ClientOpt
- func TaskRetention(r time.Duration) ClientOpt
- func WorkQueue(queue *Queue) ClientOpt
- type ClientOpts
- type Handler
- type HandlerFunc
- type ItemKind
- type Logger
- type Mux
- type ProcessItem
- type Queue
- type QueueInfo
- type RetryPolicy
- type Storage
- type StorageAdmin
- type Task
- type TaskOpt
- type TaskResult
- type TaskState
- type TasksInfo
Examples ¶
Constants ¶
const ( // DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get DefaultJobRunTime = time.Hour // DefaultMaxTries when not configured for a queue this is the default tries it will get DefaultMaxTries = 10 // DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting DefaultQueueMaxConcurrent = 100 )
const ( // TasksStreamName is the name of the JetStream Stream storing tasks TasksStreamName = "CHORIA_AJ_TASKS" // TasksStreamSubjects is a NATS wildcard matching all tasks TasksStreamSubjects = "CHORIA_AJ.T.*" // TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID TasksStreamSubjectPattern = "CHORIA_AJ.T.%s" // WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue WorkStreamNamePattern = "CHORIA_AJ_Q_%s" // WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType WorkStreamSubjectPattern = "CHORIA_AJ.Q.%s.%s" // WorkStreamSubjectWildcard is a NATS filter matching all enqueued items for any task store WorkStreamSubjectWildcard = "CHORIA_AJ.Q.>" // WorkStreamNamePrefix is the prefix that, when removed, reveals the queue name WorkStreamNamePrefix = "CHORIA_AJ_Q_" )
Variables ¶
var ( // ErrTaskNotFound is the error indicating a task does not exist rather than a failure to load ErrTaskNotFound = errors.New("task not found") // ErrQueueNotFound is the error indicating a queue does not exist rather than a failure to load ErrQueueNotFound = errors.New("queue not found") // ErrTerminateTask indicates that a task failed, and no further processing attempts should be made ErrTerminateTask = fmt.Errorf("terminate task") )
var ( // RetryLinearTenMinutes is a 20-step policy between 1 and 10 minutes RetryLinearTenMinutes = linearPolicy(20, 0.90, time.Minute, 10*time.Minute) // RetryLinearOneHour is a 20-step policy between 10 minutes and 1 hour RetryLinearOneHour = linearPolicy(20, 0.90, 10*time.Minute, 60*time.Minute) // RetryLinearOneMinute is a 20-step policy between 1 second and 1 minute RetryLinearOneMinute = linearPolicy(20, 0.5, time.Second, time.Minute) // RetryDefault is the default retry policy RetryDefault = RetryLinearTenMinutes )
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client connects Task producers and Task handlers to the backend
Example (Consumer) ¶
queue := &Queue{ Name: "P100", MaxRunTime: 60 * time.Minute, MaxConcurrent: 20, MaxTries: 100, } // Uses the NATS CLI context WQ for connection details, will create the queue if // it does not already exist client, err := NewClient(NatsContext("WQ"), WorkQueue(queue)) panicIfErr(err) router := NewTaskRouter() err = router.HandleFunc("email:send", func(_ context.Context, t *Task) (interface{}, error) { log.Printf("Processing task: %s", t.ID) // handle task.Payload which is a JSON encoded email // task record will be updated with this payload result return "success", nil }) panicIfErr(err) // Starts handling registered tasks, blocks until canceled err = client.Run(context.Background(), router) panicIfErr(err)
Output:
Example (Producer) ¶
queue := &Queue{ Name: "P100", MaxRunTime: 60 * time.Minute, MaxConcurrent: 20, MaxTries: 100, } email := newEmail("user@example.net", "Test Subject", "Test Body") // Creates a new task that has a deadline for processing 1 hour from now task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour))) panicIfErr(err) // Uses the NATS CLI context WQ for connection details, will create the queue if // it does not already exist client, err := NewClient(NatsContext("WQ"), WorkQueue(queue)) panicIfErr(err) // Adds the task to the queue called P100 err = client.EnqueueTask(context.Background(), task) panicIfErr(err)
Output:
func NewClient ¶
NewClient creates a new client, one of NatsConn() or NatsContext() must be passed, other options are optional.
When no Queue() is supplied a default queue called DEFAULT will be used
func (*Client) EnqueueTask ¶
EnqueueTask adds a task to the named queue which must already exist
func (*Client) LoadTaskByID ¶
LoadTaskByID loads a task from the backend using its ID
Example ¶
client, err := NewClient(NatsContext("WQ")) panicIfErr(err) task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB") panicIfErr(err) fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
Output:
func (*Client) StorageAdmin ¶
func (c *Client) StorageAdmin() StorageAdmin
StorageAdmin access admin features of the storage backend
type ClientOpt ¶
type ClientOpt func(opts *ClientOpts) error
ClientOpt configures the client
func BindWorkQueue ¶
BindWorkQueue binds the client to a work queue that should already exist
func ClientConcurrency ¶
ClientConcurrency sets the concurrency to use when executing tasks within this client for horizontal scaling. This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a queue would have a larger concurrency like 100 (DefaultQueueMaxConcurrent) and an individual task processor would be below that. This allows for horizontal and vertical scaling but without unbounded growth - the queue MaxConcurrent is the absolute upper limit for in-flight jobs for 1 specific queue.
func CustomLogger ¶
CustomLogger sets a custom logger to use for all logging
func MemoryStorage ¶
func MemoryStorage() ClientOpt
MemoryStorage enables storing tasks and work queue in memory in JetStream
func NatsConn ¶
func NatsConn(nc *nats.Conn) ClientOpt
NatsConn sets an already connected NATS connection as communications channel
func NatsContext ¶
NatsContext attempts to connect to the NATS client context c
func PrometheusListenPort ¶
PrometheusListenPort enables prometheus listening on a specific port
func RetryBackoffPolicy ¶
func RetryBackoffPolicy(p RetryPolicy) ClientOpt
RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes
func StoreReplicas ¶
StoreReplicas sets the replica level to keep for the tasks store and work queue
Used only when initially creating the underlying streams.
func TaskRetention ¶
TaskRetention is the duration for which tasks will be kept.
Used only when initially creating the underlying streams.
type ClientOpts ¶
type ClientOpts struct {
// contains filtered or unexported fields
}
ClientOpts configures the client
type Handler ¶
type Handler interface { // ProcessTask processes a single task, the response bytes will be stored in the original task ProcessTask(ctx context.Context, t *Task) (interface{}, error) }
Handler handles tasks
type HandlerFunc ¶
HandlerFunc handles a single task, the response bytes will be stored in the original task
type ItemKind ¶
type ItemKind int
ItemKind indicates the kind of job a work queue entry represents
var ( // TaskItem is a task as defined by Task TaskItem ItemKind = 0 )
type Logger ¶
type Logger interface { Debugf(format string, v ...interface{}) Infof(format string, v ...interface{}) Warnf(format string, v ...interface{}) Errorf(format string, v ...interface{}) }
Logger is a pluggable logger interface
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux routes messages
Note: this will change to be nearer to a server mux and include support for middleware
func (*Mux) HandleFunc ¶
func (m *Mux) HandleFunc(taskType string, h HandlerFunc) error
HandleFunc registers a handler for a taskType. The taskType must exactly match the Type of the tasks it should handle
func (*Mux) Handler ¶
func (m *Mux) Handler(t *Task) HandlerFunc
Handler looks up the handler function for a task
type ProcessItem ¶
type ProcessItem struct { Kind ItemKind `json:"kind"` JobID string `json:"job"` // contains filtered or unexported fields }
ProcessItem is an individual item stored in the work queue
type Queue ¶
type Queue struct { // Name is a unique name for the work queue, should be in the character range a-zA-Z0-9 Name string `json:"name"` // MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire MaxAge time.Duration `json:"max_age"` // MaxEntries represents the maximum amount of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied. MaxEntries int `json:"max_entries"` // DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected DiscardOld bool `json:"discard_old"` // MaxTries is the maximum amount of times an entry can be tried, entries will be tried every MaxRunTime with some jitter applied. Defaults to DefaultMaxTries MaxTries int `json:"max_tries"` // MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime MaxRunTime time.Duration `json:"max_runtime"` // MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent MaxConcurrent int `json:"max_concurrent"` // NoCreate will not try to create a queue, will bind to an existing one or fail NoCreate bool // contains filtered or unexported fields }
Queue represents a work queue
type QueueInfo ¶
type QueueInfo struct { // Name is the name of the queue Name string `json:"name"` // Time is when the information was gathered Time time.Time `json:"time"` // Stream is the active JetStream Stream Information Stream *api.StreamInfo `json:"stream_info"` // Consumer is the worker stream information Consumer *api.ConsumerInfo `json:"consumer_info"` }
QueueInfo holds information about a queue state
type RetryPolicy ¶
type RetryPolicy struct { // Intervals is a range of time periods backoff will be based off Intervals []time.Duration // Jitter is a factor applied to the specific interval to avoid repeating the same backoff periods Jitter float64 }
RetryPolicy defines a period that failed jobs will be retried against
type Storage ¶
type Storage interface { SaveTaskState(ctx context.Context, task *Task) error EnqueueTask(ctx context.Context, queue *Queue, task *Task) error AckItem(ctx context.Context, item *ProcessItem) error NakItem(ctx context.Context, item *ProcessItem) error TerminateItem(ctx context.Context, item *ProcessItem) error PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error) PrepareQueue(q *Queue, replicas int, memory bool) error PrepareTasks(memory bool, replicas int, retention time.Duration) error LoadTaskByID(id string) (*Task, error) }
Storage implements the backend access
type StorageAdmin ¶
type StorageAdmin interface { Queues() ([]*QueueInfo, error) QueueNames() ([]string, error) QueueInfo(name string) (*QueueInfo, error) PurgeQueue(name string) error DeleteQueue(name string) error PrepareQueue(q *Queue, replicas int, memory bool) error PrepareTasks(memory bool, replicas int, retention time.Duration) error TasksInfo() (*TasksInfo, error) LoadTaskByID(id string) (*Task, error) DeleteTaskByID(id string) error Tasks(ctx context.Context, limit int32) (chan *Task, error) TasksStore() (*jsm.Manager, *jsm.Stream, error) }
StorageAdmin provides helpers mainly to support the CLI; this leaks a bunch of details about JetStream but that's ok, we're not really intending to change the storage or support more
type Task ¶
type Task struct { // ID is a k-sortable unique ID for the task ID string `json:"id"` // Type is a free form string that can later be used as a routing key to send tasks to handlers Type string `json:"type"` // Queue is the name of the queue the task was enqueued with, set only during the enqueue operation else empty Queue string `json:"queue"` // Payload is a JSON representation of the associated work Payload []byte `json:"payload"` // Deadline is a cut-off time for the job to complete, should a job be scheduled after this time it will fail. // In-Flight jobs are allowed to continue past this time. Only starting handlers are impacted by this deadline. Deadline *time.Time `json:"deadline,omitempty"` // Result is the outcome of the job, only set for successful jobs Result *TaskResult `json:"result,omitempty"` // State is the most recent recorded state the job is in State TaskState `json:"state"` // CreatedAt is the time the job was created in UTC timezone CreatedAt time.Time `json:"created"` // LastTriedAt is a time stamp for when the job was last handed to a handler LastTriedAt *time.Time `json:"tried,omitempty"` // Tries is how many times the job was handled Tries int `json:"tries"` // LastErr is the most recent handling error if any LastErr string `json:"last_err,omitempty"` // contains filtered or unexported fields }
Task represents a job item that handlers will execute
func NewTask ¶
NewTask creates a new task of taskType that can later be used to route tasks to handlers. The task will carry a JSON encoded representation of payload.
Example (With_deadline) ¶
email := newEmail("user@example.net", "Test Subject", "Test Body") // Creates a new task that has a deadline for processing 1 hour from now task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour))) if err != nil { panic(fmt.Sprintf("Could not create task: %v", err)) } fmt.Printf("Task ID: %s\n", task.ID)
Output:
type TaskOpt ¶
TaskOpt configures Tasks made using NewTask()
func TaskDeadline ¶
TaskDeadline sets an absolute time after which the task should not be handled
type TaskResult ¶
type TaskResult struct { Payload interface{} `json:"payload"` CompletedAt time.Time `json:"completed"` }
TaskResult is the result of task execution, this will only be set for successfully processed jobs
type TaskState ¶
type TaskState string
TaskState indicates the current state a task is in
const ( // TaskStateUnknown is for tasks that do not have a state set TaskStateUnknown TaskState = "" // TaskStateNew newly created tasks that have not been handled yet TaskStateNew TaskState = "new" // TaskStateActive tasks that are currently being handled TaskStateActive TaskState = "active" // TaskStateRetry tasks that previously failed and are waiting retry TaskStateRetry TaskState = "retry" // TaskStateExpired tasks that reached their deadline TaskStateExpired TaskState = "expired" // TaskStateTerminated indicates that the task was terminated via the ErrTerminateTask error TaskStateTerminated TaskState = "terminated" // TaskStateCompleted tasks that are completed TaskStateCompleted TaskState = "complete" // TaskStateQueueError tasks that could not have their associated Work Queue item created TaskStateQueueError TaskState = "queue_error" )