Documentation ¶
Index ¶
- Constants
- Variables
- func IsRetryPolicyKnown(name string) bool
- func IsValidName(name string) bool
- func ParseEventJSON(event []byte) (any, string, error)
- func RequestReplySubjectForTaskType(taskType string) string
- func RetryPolicyNames() []string
- func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error
- type BaseEvent
- type Client
- func (c *Client) EnqueueTask(ctx context.Context, task *Task) error
- func (c *Client) LoadScheduledTaskByName(name string) (*ScheduledTask, error)
- func (c *Client) LoadTaskByID(id string) (*Task, error)
- func (c *Client) NewScheduledTask(name string, schedule string, queue string, task *Task) error
- func (c *Client) RemoveScheduledTask(name string) error
- func (c *Client) RetryTaskByID(ctx context.Context, id string) error
- func (c *Client) Run(ctx context.Context, router *Mux) error
- func (c *Client) ScheduledTasksStorage() ScheduledTaskStorage
- func (c *Client) StorageAdmin() StorageAdmin
- type ClientOpt
- func BindWorkQueue(queue string) ClientOpt
- func ClientConcurrency(c int) ClientOpt
- func CustomLogger(log Logger) ClientOpt
- func DiscardTaskStates(states ...TaskState) ClientOpt
- func DiscardTaskStatesByName(states ...string) ClientOpt
- func MemoryStorage() ClientOpt
- func NatsConn(nc *nats.Conn) ClientOpt
- func NatsContext(c string, opts ...nats.Option) ClientOpt
- func NoStorageInit() ClientOpt
- func PrometheusListenPort(port int) ClientOpt
- func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt
- func RetryBackoffPolicyName(name string) ClientOpt
- func StoreReplicas(r uint) ClientOpt
- func TaskRetention(r time.Duration) ClientOpt
- func WorkQueue(queue *Queue) ClientOpt
- type ClientOpts
- type HandlerFunc
- type ItemKind
- type LeaderElectedEvent
- type Logger
- type Mux
- type ProcessItem
- type Queue
- type QueueInfo
- type RetryPolicy
- type RetryPolicyProvider
- type ScheduleWatchEntry
- type ScheduledTask
- type ScheduledTaskStorage
- type Storage
- type StorageAdmin
- type Task
- type TaskOpt
- type TaskResult
- type TaskScheduler
- type TaskState
- type TaskStateChangeEvent
- type TasksInfo
Examples ¶
Constants ¶
const ( // ShortedScheduledDeadline is the shortest deadline a scheduled task may have ShortedScheduledDeadline = 30 * time.Second // DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get DefaultJobRunTime = time.Hour // DefaultMaxTries when not configured for a task this is the default tries it will get DefaultMaxTries = 10 // DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting DefaultQueueMaxConcurrent = 100 )
const ( // TaskStateChangeEventType is the event type for TaskStateChangeEvent events TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state" // LeaderElectedEventType is the event type for LeaderElectedEvent events LeaderElectedEventType = "io.choria.asyncjobs.v1.leader_elected" )
const ( // RequestReplyContentTypeHeader is the header text sent to indicate the body encoding and type RequestReplyContentTypeHeader = "AJ-Content-Type" // RequestReplyDeadlineHeader is the header indicating the deadline for processing the item RequestReplyDeadlineHeader = "AJ-Handler-Deadline" // RequestReplyTerminateError is the header to send in a reply that the task should be terminated via ErrTerminateTask RequestReplyTerminateError = "AJ-Terminate" // RequestReplyError is the header indicating a generic failure in handling an item RequestReplyError = "AJ-Error" // RequestReplyTaskType is the content type indicating the payload is a Task in JSON format RequestReplyTaskType = "application/x-asyncjobs-task+json" )
const ( // TasksStreamName is the name of the JetStream Stream storing tasks TasksStreamName = "CHORIA_AJ_TASKS" // TasksStreamSubjects is a NATS wildcard matching all tasks TasksStreamSubjects = "CHORIA_AJ.T.*" // TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID TasksStreamSubjectPattern = "CHORIA_AJ.T.%s" // EventsSubjectWildcard is the NATS wildcard for receiving all events EventsSubjectWildcard = "CHORIA_AJ.E.>" // TaskStateChangeEventSubjectPattern is a printf pattern for determining the event publish subject TaskStateChangeEventSubjectPattern = "CHORIA_AJ.E.task_state.%s" // TaskStateChangeEventSubjectWildcard is a NATS wildcard for receiving all TaskStateChangeEvent messages TaskStateChangeEventSubjectWildcard = "CHORIA_AJ.E.task_state.*" // LeaderElectedEventSubjectPattern is the pattern for determining the event publish subject LeaderElectedEventSubjectPattern = "CHORIA_AJ.E.leader_election.%s" // LeaderElectedEventSubjectWildcard is the NATS wildcard for receiving all LeaderElectedEvent messages LeaderElectedEventSubjectWildcard = "CHORIA_AJ.E.leader_election.>" // WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue WorkStreamNamePattern = "CHORIA_AJ_Q_%s" // WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType WorkStreamSubjectPattern = "CHORIA_AJ.Q.%s.%s" // WorkStreamSubjectWildcard is a NATS filter matching all enqueued items for any task store WorkStreamSubjectWildcard = "CHORIA_AJ.Q.>" // WorkStreamNamePrefix is the prefix that, when removed, reveals the queue name WorkStreamNamePrefix = "CHORIA_AJ_Q_" // RequestReplyTaskHandlerPattern is the subject request reply task handlers should listen on by default RequestReplyTaskHandlerPattern = "CHORIA_AJ.H.T.%s" // ConfigBucketName is the KV bucket for configuration like scheduled tasks ConfigBucketName = "CHORIA_AJ_CONFIGURATION" // 
LeaderElectionBucketName is the KV bucket that will manage leader elections LeaderElectionBucketName = "CHORIA_AJ_ELECTIONS" )
Variables ¶
var ( // ErrTaskNotFound is the error indicating a task does not exist rather than a failure to load ErrTaskNotFound = errors.New("task not found") // ErrTerminateTask indicates that a task failed, and no further processing attempts should be made ErrTerminateTask = fmt.Errorf("terminate task") // ErrNoTasks indicates the task store is empty ErrNoTasks = fmt.Errorf("no tasks found") // ErrTaskPastDeadline indicates a task that was scheduled for handling is past its deadline ErrTaskPastDeadline = fmt.Errorf("past deadline") // ErrTaskExceedsMaxTries indicates a task exceeded its maximum attempts ErrTaskExceedsMaxTries = fmt.Errorf("exceeded maximum tries") // ErrTaskAlreadyActive indicates that a task is already in the active state ErrTaskAlreadyActive = fmt.Errorf("task already active") // ErrTaskTypeCannotEnqueue indicates that a task is in a state where it cannot be enqueued as new ErrTaskTypeCannotEnqueue = fmt.Errorf("cannot enqueue a task in state") // ErrTaskUpdateFailed indicates a task update failed ErrTaskUpdateFailed = fmt.Errorf("failed updating task state") // ErrTaskAlreadyInState indicates an update failed because a task was already in the desired state ErrTaskAlreadyInState = fmt.Errorf("%w, already in desired state", ErrTaskUpdateFailed) // ErrTaskLoadFailed indicates a task failed for an unknown reason ErrTaskLoadFailed = fmt.Errorf("loading task failed") // ErrTaskTypeRequired indicates an empty task type was given ErrTaskTypeRequired = fmt.Errorf("task type is required") // ErrTaskTypeInvalid indicates an invalid task type was given ErrTaskTypeInvalid = fmt.Errorf("task type is invalid") // ErrTaskDependenciesFailed indicates that the task cannot be run as its dependencies failed ErrTaskDependenciesFailed = fmt.Errorf("task dependencies failed") // ErrNoHandlerForTaskType indicates that a task could not be handled by any known handlers ErrNoHandlerForTaskType = fmt.Errorf("no handler for task type") // ErrDuplicateHandlerForTaskType indicates a 
task handler for a specific type is already registered ErrDuplicateHandlerForTaskType = fmt.Errorf("duplicate handler for task type") // ErrInvalidHeaders indicates that message headers from JetStream were not valid ErrInvalidHeaders = fmt.Errorf("coult not decode headers") // ErrContextWithoutDeadline indicates a context.Context was passed without deadline when it was expected ErrContextWithoutDeadline = fmt.Errorf("non deadline context given") // ErrInvalidStorageItem indicates a Work Queue item had no JetStream state associated with it ErrInvalidStorageItem = fmt.Errorf("invalid storage item") // ErrNoNatsConn indicates that a nil connection was supplied ErrNoNatsConn = fmt.Errorf("no NATS connection supplied") // ErrNoMux indicates that a processor was started with no routing mux configured ErrNoMux = fmt.Errorf("mux is required") // ErrStorageNotReady indicates the underlying storage is not ready ErrStorageNotReady = fmt.Errorf("storage not ready") // ErrQueueNotFound is the error indicating a queue does not exist rather than a failure to load ErrQueueNotFound = errors.New("queue not found") // ErrQueueConsumerNotFound indicates that the Work Queue store has no consumers defined ErrQueueConsumerNotFound = errors.New("queue consumer not found") // ErrQueueNameRequired indicates a queue has no name ErrQueueNameRequired = fmt.Errorf("queue name is required") // ErrQueueItemCorrupt indicates that an item received from the work queue was invalid - perhaps invalid JSON ErrQueueItemCorrupt = fmt.Errorf("corrupt queue item received") // ErrQueueItemInvalid is an item read from the queue with no data or obviously bad data ErrQueueItemInvalid = fmt.Errorf("invalid queue item received") // ErrInvalidQueueState indicates a queue was attempted to be used but no internal state is known of that queue ErrInvalidQueueState = fmt.Errorf("invalid queue storage state") // ErrDuplicateItem indicates that the Work Queue deduplication protection refused a message ErrDuplicateItem = 
fmt.Errorf("duplicate work queue item") // ErrExternalCommandNotFound indicates a command for an ExternalProcess handler was not found ErrExternalCommandNotFound = fmt.Errorf("command not found") // ErrExternalCommandFailed indicates a command for an ExternalProcess handler failed ErrExternalCommandFailed = fmt.Errorf("execution failed") // ErrUnknownEventType indicates that while parsing an event an unknown type of event was encountered ErrUnknownEventType = fmt.Errorf("unknown event type") // ErrUnknownRetryPolicy indicates the requested retry policy does not exist ErrUnknownRetryPolicy = fmt.Errorf("unknown retry policy") // ErrUnknownDiscardPolicy indicates a discard policy could not be found matching a name ErrUnknownDiscardPolicy = fmt.Errorf("unknown discard policy") // ErrRequestReplyFailed indicates a callout to a remote handler failed due to a timeout, lack of listeners or network error ErrRequestReplyFailed = fmt.Errorf("request-reply callout failed") // ErrRequestReplyNoDeadline indicates a request-reply handler was called without a deadline ErrRequestReplyNoDeadline = fmt.Errorf("request-reply requires deadline context") // ErrRequestReplyShortDeadline indicates a deadline context has a too short timeout ErrRequestReplyShortDeadline = fmt.Errorf("deadline too short") // ErrScheduleNameIsRequired indicates a schedule name is needed when creating new schedules ErrScheduleNameIsRequired = errors.New("name is required") // ErrScheduleNameInvalid indicates the name given to a task is invalid ErrScheduleNameInvalid = errors.New("name is invalid") // ErrScheduleIsRequired indicates a cron schedule must be supplied when creating new schedules ErrScheduleIsRequired = errors.New("schedule is required") // ErrScheduleInvalid indicates an invalid cron schedule was supplied ErrScheduleInvalid = errors.New("invalid cron schedule") // ErrScheduledTaskAlreadyExist indicates a scheduled task that was being created already existed ErrScheduledTaskAlreadyExist = 
errors.New("scheduled task already exist") // ErrScheduledTaskNotFound indicates the requested task does not exist ErrScheduledTaskNotFound = errors.New("scheduled task not found") // ErrScheduledTaskInvalid indicates a loaded task was invalid ErrScheduledTaskInvalid = errors.New("invalid scheduled task") // ErrScheduledTaskShortDeadline indicates the time allowed for task execution is too short ErrScheduledTaskShortDeadline = errors.New("deadline too short") )
var ( // RetryLinearTenMinutes is a 50-step policy between 1 and 10 minutes RetryLinearTenMinutes = linearPolicy(50, 0.90, time.Minute, 10*time.Minute) // RetryLinearOneHour is a 20-step policy between 10 minutes and 1 hour RetryLinearOneHour = linearPolicy(20, 0.90, 10*time.Minute, 60*time.Minute) // RetryLinearOneMinute is a 20-step policy between 1 second and 1 minute RetryLinearOneMinute = linearPolicy(20, 0.5, time.Second, time.Minute) // RetryDefault is the default retry policy RetryDefault = RetryLinearTenMinutes )
Functions ¶
func IsRetryPolicyKnown ¶ added in v0.0.5
IsRetryPolicyKnown determines if the named policy exists
func IsValidName ¶ added in v0.0.5
IsValidName is a generic strict validator for user-supplied names - task names etc, things that turn into subjects
func ParseEventJSON ¶ added in v0.0.2
ParseEventJSON parses event bytes returning the parsed Event and its event type
func RequestReplySubjectForTaskType ¶ added in v0.1.0
RequestReplySubjectForTaskType returns the subject a request-reply handler should listen on for a specified task type
func RetryPolicyNames ¶ added in v0.0.5
func RetryPolicyNames() []string
RetryPolicyNames returns the names of the pre-generated retry policies
func RetrySleep ¶ added in v0.0.2
func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error
RetrySleep sleeps for the duration for try n or until interrupted by ctx
Types ¶
type BaseEvent ¶ added in v0.0.2
type BaseEvent struct { EventID string `json:"event_id"` EventType string `json:"type"` TimeStamp time.Time `json:"timestamp"` }
BaseEvent is present in all event types and can be used to detect the type
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client connects Task producers and Task handlers to the backend
Example (Consumer) ¶
queue := &Queue{ Name: "P100", MaxRunTime: 60 * time.Minute, MaxConcurrent: 20, MaxTries: 100, } // Uses the NATS CLI context WQ for connection details, will create the queue if // it does not already exist client, err := NewClient(NatsContext("WQ"), WorkQueue(queue), RetryBackoffPolicy(RetryLinearOneHour)) panicIfErr(err) router := NewTaskRouter() err = router.HandleFunc("email:send", func(_ context.Context, _ Logger, t *Task) (any, error) { log.Printf("Processing task: %s", t.ID) // handle task.Payload which is a JSON encoded email // task record will be updated with this payload result return "success", nil }) panicIfErr(err) // Starts handling registered tasks, blocks until canceled err = client.Run(context.Background(), router) panicIfErr(err)
Output:
Example (Producer) ¶
queue := &Queue{ Name: "P100", MaxRunTime: 60 * time.Minute, MaxConcurrent: 20, MaxTries: 100, } email := newEmail("user@example.net", "Test Subject", "Test Body") // Creates a new task that has a deadline for processing 1 hour from now task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour))) panicIfErr(err) // Uses the NATS CLI context WQ for connection details, will create the queue if // it does not already exist client, err := NewClient(NatsContext("WQ"), WorkQueue(queue)) panicIfErr(err) // Adds the task to the queue called P100 err = client.EnqueueTask(context.Background(), task) panicIfErr(err)
Output:
func NewClient ¶
NewClient creates a new client, one of NatsConn() or NatsContext() must be passed, other options are optional.
When no WorkQueue() is supplied a default queue called DEFAULT will be used
func (*Client) EnqueueTask ¶
EnqueueTask adds a task to the named queue which must already exist
func (*Client) LoadScheduledTaskByName ¶ added in v0.0.5
func (c *Client) LoadScheduledTaskByName(name string) (*ScheduledTask, error)
LoadScheduledTaskByName loads a scheduled task by name
func (*Client) LoadTaskByID ¶
LoadTaskByID loads a task from the backend using its ID
Example ¶
client, err := NewClient(NatsContext("WQ")) panicIfErr(err) task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB") panicIfErr(err) fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
Output:
func (*Client) NewScheduledTask ¶ added in v0.0.5
NewScheduledTask creates a new scheduled task, an existing schedule will result in failure
func (*Client) RemoveScheduledTask ¶ added in v0.0.5
RemoveScheduledTask removes a scheduled task
func (*Client) RetryTaskByID ¶ added in v0.0.2
RetryTaskByID will retry a task, first removing an entry from the Work Queue if already there
func (*Client) ScheduledTasksStorage ¶ added in v0.0.5
func (c *Client) ScheduledTasksStorage() ScheduledTaskStorage
ScheduledTasksStorage gives access to administrative functions for task maintenance
func (*Client) StorageAdmin ¶
func (c *Client) StorageAdmin() StorageAdmin
StorageAdmin gives access to admin features of the storage backend
type ClientOpt ¶
type ClientOpt func(opts *ClientOpts) error
ClientOpt configures the client
func BindWorkQueue ¶
BindWorkQueue binds the client to a work queue that should already exist
func ClientConcurrency ¶
ClientConcurrency sets the concurrency to use when executing tasks within this client for horizontal scaling. This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a queue would have a larger concurrency like 100 (DefaultQueueMaxConcurrent) and an individual task processor would be below that. This allows for horizontal and vertical scaling but without unbounded growth - the queue MaxConcurrent is the absolute upper limit for in-flight jobs for 1 specific queue.
func CustomLogger ¶
CustomLogger sets a custom logger to use for all logging
func DiscardTaskStates ¶ added in v0.0.2
DiscardTaskStates configures the client to discard Tasks that reach a final state in the list of supplied TaskState
func DiscardTaskStatesByName ¶ added in v0.1.0
DiscardTaskStatesByName configures the client to discard Tasks that reach a final state in the list of supplied TaskState names
func MemoryStorage ¶
func MemoryStorage() ClientOpt
MemoryStorage enables storing tasks and work queue in memory in JetStream
func NatsConn ¶
func NatsConn(nc *nats.Conn) ClientOpt
NatsConn sets an already connected NATS connection as communications channel
func NatsContext ¶
NatsContext attempts to connect to the NATS client context c
func NoStorageInit ¶ added in v0.0.2
func NoStorageInit() ClientOpt
NoStorageInit skips setting up any queues or task stores when creating a client
func PrometheusListenPort ¶
PrometheusListenPort enables prometheus listening on a specific port
func RetryBackoffPolicy ¶
func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt
RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes
func RetryBackoffPolicyName ¶ added in v0.0.5
RetryBackoffPolicyName uses the policy named to schedule job retries by using RetryPolicyLookup(name)
func StoreReplicas ¶
StoreReplicas sets the replica level to keep for the tasks store and work queue
Used only when initially creating the underlying streams.
func TaskRetention ¶
TaskRetention is the time tasks will be kept for in the task storage
Used only when initially creating the underlying streams.
type ClientOpts ¶
type ClientOpts struct {
// contains filtered or unexported fields
}
ClientOpts configures the client
type HandlerFunc ¶
HandlerFunc handles a single task, the response bytes will be stored in the original task
type ItemKind ¶
type ItemKind int
ItemKind indicates the kind of job a work queue entry represents
var ( // TaskItem is a task as defined by Task TaskItem ItemKind = 0 )
type LeaderElectedEvent ¶ added in v0.0.5
type LeaderElectedEvent struct { BaseEvent // Name of the process that gained leadership Name string `json:"name"` // Component is the component that is reporting Component string `json:"component"` }
LeaderElectedEvent notifies that a leader election was won
func NewLeaderElectedEvent ¶ added in v0.0.5
func NewLeaderElectedEvent(name string, component string) (*LeaderElectedEvent, error)
NewLeaderElectedEvent creates a new event notifying of a leader election win
type Logger ¶
type Logger interface { Debugf(format string, v ...any) Infof(format string, v ...any) Warnf(format string, v ...any) Errorf(format string, v ...any) }
Logger is a pluggable logger interface
type Mux ¶
type Mux struct {
// contains filtered or unexported fields
}
Mux routes messages
Note: this will change to be nearer to a server mux and include support for middleware
func (*Mux) ExternalProcess ¶ added in v0.0.7
ExternalProcess sets up a delegated handler that calls an external command to handle the task.
The task will be passed in JSON format on STDIN, any STDOUT/STDERR output will become the task result. Any non 0 exit code will be treated as a task failure.
func (*Mux) HandleFunc ¶
func (m *Mux) HandleFunc(taskType string, h HandlerFunc) error
HandleFunc registers a handler for a taskType. The taskType must exactly match the type of the tasks to be handled
func (*Mux) Handler ¶
func (m *Mux) Handler(t *Task) HandlerFunc
Handler looks up the handler function for a task
type ProcessItem ¶
type ProcessItem struct { Kind ItemKind `json:"kind"` JobID string `json:"job"` // contains filtered or unexported fields }
ProcessItem is an individual item stored in the work queue
type Queue ¶
type Queue struct { // Name is a unique name for the work queue, should be in the character range a-zA-Z0-9 Name string `json:"name"` // MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire MaxAge time.Duration `json:"max_age"` // MaxEntries represents the maximum amount of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied. MaxEntries int `json:"max_entries"` // DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected DiscardOld bool `json:"discard_old"` // MaxTries is the maximum amount of times a entry can be tried, entries will be tried every MaxRunTime with some jitter applied. Default to DefaultMaxTries MaxTries int `json:"max_tries"` // MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime MaxRunTime time.Duration `json:"max_runtime"` // MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent MaxConcurrent int `json:"max_concurrent"` // NoCreate will not try to create a queue, will bind to an existing one or fail NoCreate bool // contains filtered or unexported fields }
Queue represents a work queue
type QueueInfo ¶
type QueueInfo struct { // Name is the name of the queue Name string `json:"name"` // Time is when the information was gathered Time time.Time `json:"time"` // Stream is the active JetStream Stream Information Stream *api.StreamInfo `json:"stream_info"` // Consumer is the worker stream information Consumer *api.ConsumerInfo `json:"consumer_info"` }
QueueInfo holds information about a queue state
type RetryPolicy ¶
type RetryPolicy struct { // Intervals is a range of time periods backoff will be based off Intervals []time.Duration // Jitter is a factor applied to the specific interval avoid repeating same backoff periods Jitter float64 }
RetryPolicy defines a period that failed jobs will be retried against
type RetryPolicyProvider ¶ added in v0.0.2
RetryPolicyProvider is the interface that the RetryPolicy implements, use this to implement your own exponential backoff system or similar for task retries.
func RetryPolicyLookup ¶ added in v0.0.5
func RetryPolicyLookup(name string) (RetryPolicyProvider, error)
RetryPolicyLookup loads a policy by name
type ScheduleWatchEntry ¶ added in v0.0.5
type ScheduleWatchEntry struct { Name string Task *ScheduledTask Delete bool }
type ScheduledTask ¶ added in v0.0.5
type ScheduledTask struct { // Name is a unique name for the scheduled task Name string `json:"name"` // Schedule is a cron specification for the schedule Schedule string `json:"schedule"` // Queue is the name of a queue to enqueue the task into Queue string `json:"queue"` // TaskType is the type of task to create TaskType string `json:"task_type"` // Payload is the task payload for the enqueued tasks Payload []byte `json:"payload"` // Deadline is the time after scheduling that the deadline would be Deadline time.Duration `json:"deadline,omitempty"` // MaxTries is how many times the created task could be tried MaxTries int `json:"max_tries"` // CreatedAt is when the schedule was created CreatedAt time.Time `json:"created_at"` }
ScheduledTask represents a cron-like schedule and the task properties that will result in new tasks being created regularly according to that schedule
type ScheduledTaskStorage ¶ added in v0.0.5
type ScheduledTaskStorage interface { SaveScheduledTask(st *ScheduledTask, update bool) error LoadScheduledTaskByName(name string) (*ScheduledTask, error) DeleteScheduledTaskByName(name string) error ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error) ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error) EnqueueTask(ctx context.Context, queue *Queue, task *Task) error ElectionStorage() (nats.KeyValue, error) PublishLeaderElectedEvent(ctx context.Context, name string, component string) error }
type Storage ¶
type Storage interface { SaveTaskState(ctx context.Context, task *Task, notify bool) error EnqueueTask(ctx context.Context, queue *Queue, task *Task) error RetryTaskByID(ctx context.Context, queue *Queue, id string) error LoadTaskByID(id string) (*Task, error) DeleteTaskByID(id string) error PublishTaskStateChangeEvent(ctx context.Context, task *Task) error AckItem(ctx context.Context, item *ProcessItem) error NakBlockedItem(ctx context.Context, item *ProcessItem) error NakItem(ctx context.Context, item *ProcessItem) error TerminateItem(ctx context.Context, item *ProcessItem) error PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error) PrepareQueue(q *Queue, replicas int, memory bool) error PrepareTasks(memory bool, replicas int, retention time.Duration) error PrepareConfigurationStore(memory bool, replicas int) error SaveScheduledTask(st *ScheduledTask, update bool) error LoadScheduledTaskByName(name string) (*ScheduledTask, error) DeleteScheduledTaskByName(name string) error ScheduledTasks(ctx context.Context) ([]*ScheduledTask, error) ScheduledTasksWatch(ctx context.Context) (chan *ScheduleWatchEntry, error) }
Storage implements the backend access
type StorageAdmin ¶
type StorageAdmin interface { Queues() ([]*QueueInfo, error) QueueNames() ([]string, error) QueueInfo(name string) (*QueueInfo, error) PurgeQueue(name string) error DeleteQueue(name string) error PrepareQueue(q *Queue, replicas int, memory bool) error ConfigurationInfo() (*nats.KeyValueBucketStatus, error) PrepareConfigurationStore(memory bool, replicas int) error PrepareTasks(memory bool, replicas int, retention time.Duration) error DeleteTaskByID(id string) error TasksInfo() (*TasksInfo, error) Tasks(ctx context.Context, limit int32) (chan *Task, error) TasksStore() (*jsm.Manager, *jsm.Stream, error) ElectionStorage() (nats.KeyValue, error) }
StorageAdmin provides helpers mainly to support the CLI, this leaks a bunch of details about JetStream but that's ok, we're not really intending to change the storage or support more
type Task ¶
type Task struct { // ID is a k-sortable unique ID for the task ID string `json:"id"` // Type is a free form string that can later be used as a routing key to send tasks to handlers Type string `json:"type"` // Queue is the name of the queue the task was enqueued with, set only during the enqueue operation else empty Queue string `json:"queue"` // Dependencies are IDs of tasks that should complete before this one becomes unblocked Dependencies []string `json:"dependencies,omitempty"` // DependencyResults are results for dependent tasks DependencyResults map[string]*TaskResult `json:"dependency_results,omitempty"` // LoadDependencies indicates if this task should load dependency results before executing LoadDependencies bool `json:"load_dependencies,omitempty"` // Payload is a JSON representation of the associated work Payload []byte `json:"payload"` // Deadline is a cut-off time for the job to complete, should a job be scheduled after this time it will fail. // In-Flight jobs are allowed to continue past this time. Only starting handlers are impacted by this deadline. Deadline *time.Time `json:"deadline,omitempty"` // MaxTries sets a per task maximum try limit. If this task is in a queue that allows fewer tries the queue max tries // will override this setting. A task may not exceed the work queue max tries MaxTries int `json:"max_tries"` // Result is the outcome of the job, only set for successful jobs Result *TaskResult `json:"result,omitempty"` // State is the most recent recorded state the job is in State TaskState `json:"state"` // CreatedAt is the time the job was created in UTC timezone CreatedAt time.Time `json:"created"` // LastTriedAt is a time stamp for when the job was last handed to a handler LastTriedAt *time.Time `json:"tried,omitempty"` // Tries is how many times the job was handled Tries int `json:"tries"` // LastErr is the most recent handling error if any LastErr string `json:"last_err,omitempty"` // contains filtered or unexported fields }
Task represents a job item that handlers will execute
func NewTask ¶
NewTask creates a new task of taskType that can later be used to route tasks to handlers. The task will carry a JSON encoded representation of payload.
Example (With_deadline) ¶
email := newEmail("user@example.net", "Test Subject", "Test Body") // Creates a new task that has a deadline for processing 1 hour from now task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour))) if err != nil { panic(fmt.Sprintf("Could not create task: %v", err)) } fmt.Printf("Task ID: %s\n", task.ID)
Output:
func (*Task) HasDependencies ¶ added in v0.1.0
HasDependencies determines if the task has any dependencies
func (*Task) IsPastDeadline ¶ added in v0.0.2
IsPastDeadline determines if the task is past its deadline
type TaskOpt ¶
TaskOpt configures Tasks made using NewTask()
func TaskDeadline ¶
TaskDeadline sets an absolute time after which the task should not be handled
func TaskDependsOn ¶ added in v0.1.0
TaskDependsOn are Tasks that this task is dependent on, can be called multiple times
func TaskDependsOnIDs ¶ added in v0.1.0
TaskDependsOnIDs are IDs that this task is dependent on, can be called multiple times
func TaskMaxTries ¶ added in v0.0.5
TaskMaxTries sets a maximum to the amount of processing attempts a task will have, the queue max tries will override this
func TaskRequiresDependencyResults ¶ added in v0.1.0
func TaskRequiresDependencyResults() TaskOpt
TaskRequiresDependencyResults indicates that if a task has any dependencies their results should be loaded before execution
type TaskResult ¶
TaskResult is the result of task execution, this will only be set for successfully processed jobs
type TaskScheduler ¶ added in v0.0.5
type TaskScheduler struct {
// contains filtered or unexported fields
}
func NewTaskScheduler ¶ added in v0.0.5
func NewTaskScheduler(name string, c *Client) (*TaskScheduler, error)
NewTaskScheduler creates a new Task Scheduler service
func (*TaskScheduler) Count ¶ added in v0.0.5
func (s *TaskScheduler) Count() int
Count reports how many schedules are managed by this Scheduler
func (*TaskScheduler) Stop ¶ added in v0.0.5
func (s *TaskScheduler) Stop()
Stop stops the scheduler service
type TaskState ¶
type TaskState string
TaskState indicates the current state a task is in
const ( // TaskStateUnknown is for tasks that do not have a state set TaskStateUnknown TaskState = "" // TaskStateNew newly created tasks that have not been handled yet TaskStateNew TaskState = "new" // TaskStateActive tasks that are currently being handled TaskStateActive TaskState = "active" // TaskStateRetry tasks that previously failed and are waiting retry TaskStateRetry TaskState = "retry" // TaskStateExpired tasks that reached their deadline or maximum tries TaskStateExpired TaskState = "expired" // TaskStateTerminated indicates that the task was terminated via the ErrTerminateTask error TaskStateTerminated TaskState = "terminated" // TaskStateCompleted tasks that are completed TaskStateCompleted TaskState = "complete" // TaskStateQueueError tasks that could not have their associated Work Queue item created TaskStateQueueError TaskState = "queue_error" // TaskStateBlocked tasks that are waiting on dependencies TaskStateBlocked TaskState = "blocked" // TaskStateUnreachable tasks that could not be run due to dependency problems TaskStateUnreachable TaskState = "unreachable" )
type TaskStateChangeEvent ¶ added in v0.0.2
type TaskStateChangeEvent struct { BaseEvent // TaskID is the ID of the task, use with LoadTaskByID() to access the task TaskID string `json:"task_id"` // State is the new state of the Task State TaskState `json:"state"` // Tries is how many times the Task has been processed Tries int `json:"tries"` // Queue is the queue the task is in, can be empty Queue string `json:"queue,omitempty"` // TaskType is the task routing type TaskType string `json:"task_type"` // LastErr is the error that caused a task to change state for error state changes LastErr string `json:"last_error,omitempty"` // Age is the time since the task was created in milliseconds Age time.Duration `json:"task_age,omitempty"` }
TaskStateChangeEvent notifies that a significant change occurred in a Task
func NewTaskStateChangeEvent ¶ added in v0.0.2
func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error)
NewTaskStateChangeEvent creates a new event notifying of a change in task state