asyncjobs

Package module v0.0.2 - Published: Feb 7, 2022 - License: Apache-2.0

README

Choria Asynchronous Jobs

Overview

This is an Asynchronous Job Queue system that relies on NATS JetStream for storage and general job life cycle management. It is compatible with any NATS JetStream based system like a private hosted JetStream, Choria Streams or a commercial SaaS.

Each Task is stored in JetStream under a unique ID, and a Work Queue item is created referencing that Task. JetStream handles scheduling, retries, acknowledgements and more for the Work Queue item, and the stored Task is updated throughout its lifecycle.

Multiple processes can process jobs concurrently, thus job processing is both horizontally and vertically scalable. Job handlers are implemented in Go with one process hosting one or many handlers. Per process concurrency and overall per-queue concurrency controls exist.

This package is heavily inspired by hibiken/asynq.


Status

This is a brand-new project under heavy development. Interfaces and structures might change, and features might be removed if they turn out to be a bad fit for the underlying storage.

Use with care.

Requirements

NATS 2.7.2 with JetStream enabled.

Features

This feature list is incomplete; at present the focus is on determining what will work well for the particular patterns JetStream enables, so there might be some churn in the feature set.

Tasks

  • Task definitions stored post-processing, with various retention and discard policies
  • Ability to retry a Task that has already been completed or failed (see the sketch below)
  • Task deduplication
  • Deadline per task - after this time the task will not be processed
  • Max tries per task, capped to the queue tries
  • Task state tracked throughout its lifecycle
  • K-Sortable Task GUIDs

See Task Lifecycle for full background and details
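
For example, a minimal sketch using the newEmail and panicIfErr helpers from the examples below: a task is created with a one-hour deadline, enqueued, and later retried by ID even after it reached a final state.

email := newEmail("user@example.net", "Test Subject", "Test Body")

task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
panicIfErr(err)

err = client.EnqueueTask(context.Background(), task)
panicIfErr(err)

// re-enqueue the task even if it already completed or failed
err = client.RetryTaskByID(context.Background(), task.ID)
panicIfErr(err)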

Queues

  • Queues can store different types of task
  • Queues with caps on queued items and different queue-full behaviors (see the sketch below)
  • Default or user supplied queue definitions
  • Queue per client, many clients per queue
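
A sketch of a capped queue definition with illustrative values; with DiscardOld set, reaching MaxEntries evicts the oldest entries instead of rejecting new ones (the fields are documented under the Queue type below):

queue := &Queue{
	Name:       "EMAIL",
	MaxAge:     24 * time.Hour, // entries expire after a day
	MaxEntries: 1000,           // cap the queue at 1000 items
	DiscardOld: true,           // evict oldest entries rather than reject new ones
	MaxTries:   50,
}

client, err := NewClient(NatsContext("WQ"), WorkQueue(queue))
panicIfErr(err)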

Processing

  • Retries of failed tasks with backoff schedules configurable using RetryBackoffPolicy(). Handler opt-in early termination (see the sketch below).
  • Parallel processing of tasks, horizontally or vertically scaled. Run time adjustable upper boundary on a per-queue basis
  • Worker crashes do not impact the work queue
  • Handler interface with task router to select appropriate handler by task type with wildcard matches
  • Statistics via Prometheus
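
A sketch of handler opt-in early termination: returning ErrTerminateTask from a handler marks the task terminated and prevents further retries.

router := NewTaskRouter()
err := router.HandleFunc("email:send", func(_ context.Context, t *Task) (interface{}, error) {
	if len(t.Payload) == 0 {
		// this task can never succeed, stop retrying it
		return nil, ErrTerminateTask
	}

	return "success", nil
})
panicIfErr(err)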

Storage

  • Replicated storage using RAFT protocol within JetStream Streams, disk based or memory based

Misc

  • Supports NATS Contexts for connection configuration
  • Supports custom loggers, defaulting to go internal log

Command Line

  • Various info and state requests
  • Configure aspects of Task and Queue storage
  • Watch task processing
  • Process tasks via shell commands
  • CRUD on Tasks store or individual Task

Planned Features

A number of features are planned in the near term; see our GitHub Issues.

Documentation

Constants

const (
	// DefaultJobRunTime when not configured for a queue this is the default run-time handlers will get
	DefaultJobRunTime = time.Hour
	// DefaultMaxTries when not configured for a queue this is the default tries it will get
	DefaultMaxTries = 10
	// DefaultQueueMaxConcurrent when not configured for a queue this is the default concurrency setting
	DefaultQueueMaxConcurrent = 100
)
const (
	// TasksStreamName is the name of the JetStream Stream storing tasks
	TasksStreamName = "CHORIA_AJ_TASKS"
	// TasksStreamSubjects is a NATS wildcard matching all tasks
	TasksStreamSubjects = "CHORIA_AJ.T.*"
	// TasksStreamSubjectPattern is the printf pattern that can be used to find an individual task by its task ID
	TasksStreamSubjectPattern = "CHORIA_AJ.T.%s"

	// EventsSubjectWildcard is the NATS wildcard for receiving all events
	EventsSubjectWildcard = "CHORIA_AJ.E.>"
	// TaskStateChangeEventSubjectPattern is a printf pattern for determining the event publish subject
	TaskStateChangeEventSubjectPattern = "CHORIA_AJ.E.task_state.%s"
	// TaskStateChangeEventSubjectWildcard is a NATS wildcard for receiving all TaskStateChangeEvent messages
	TaskStateChangeEventSubjectWildcard = "CHORIA_AJ.E.task_state.*"

	// WorkStreamNamePattern is the printf pattern for determining JetStream Stream names per queue
	WorkStreamNamePattern = "CHORIA_AJ_Q_%s"
	// WorkStreamSubjectPattern is the printf pattern individual items are placed in, placeholders for JobID and JobType
	WorkStreamSubjectPattern = "CHORIA_AJ.Q.%s.%s"
	// WorkStreamSubjectWildcard is a NATS filter matching all enqueued items for any task store
	WorkStreamSubjectWildcard = "CHORIA_AJ.Q.>"
	// WorkStreamNamePrefix is the prefix that, when removed, reveals the queue name
	WorkStreamNamePrefix = "CHORIA_AJ_Q_"
)
const (
	// TaskStateChangeEventType is the event type for TaskStateChangeEvent types
	TaskStateChangeEventType = "io.choria.asyncjobs.v1.task_state"
)

Variables

var (
	// ErrTaskNotFound is the error indicating a task does not exist rather than a failure to load
	ErrTaskNotFound = errors.New("task not found")
	// ErrQueueNotFound is the error indicating a queue does not exist rather than a failure to load
	ErrQueueNotFound = errors.New("queue not found")
	// ErrTerminateTask indicates that a task failed, and no further processing attempts should be made
	ErrTerminateTask = fmt.Errorf("terminate task")
)
var (
	// RetryLinearTenMinutes is a 20-step policy between 1 and 10 minutes
	RetryLinearTenMinutes = linearPolicy(20, 0.90, time.Minute, 10*time.Minute)

	// RetryLinearOneHour is a 50-step policy between 10 minutes and 1 hour
	RetryLinearOneHour = linearPolicy(50, 0.90, 10*time.Minute, 60*time.Minute)

	// RetryLinearOneMinute is a 20-step policy between 1 second and 1 minute
	RetryLinearOneMinute = linearPolicy(20, 0.5, time.Second, time.Minute)

	// RetryDefault is the default retry policy
	RetryDefault = RetryLinearTenMinutes
)

Functions

func ParseEventJSON added in v0.0.2

func ParseEventJSON(event []byte) (interface{}, string, error)
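
Judging by its signature, ParseEventJSON parses raw event bytes and returns the parsed event, its event type string and an error. A minimal sketch, assuming a plain NATS subscription on the task state wildcard subject; the value type assertion is an assumption, adjust if the package returns a pointer:

nc, err := nats.Connect(nats.DefaultURL)
panicIfErr(err)

_, err = nc.Subscribe(TaskStateChangeEventSubjectWildcard, func(m *nats.Msg) {
	event, kind, err := ParseEventJSON(m.Data)
	if err != nil {
		log.Printf("could not parse event: %v", err)
		return
	}

	// the value type assertion is an assumption, the package may return a pointer
	if e, ok := event.(TaskStateChangeEvent); ok {
		log.Printf("%s: task %s is now in state %s", kind, e.TaskID, e.State)
	}
})
panicIfErr(err)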

func RetrySleep added in v0.0.2

func RetrySleep(ctx context.Context, p RetryPolicyProvider, n int) error

RetrySleep sleeps for the duration for try n or until interrupted by ctx
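
A sketch of using RetrySleep in a hand-rolled retry loop; doSomething is a hypothetical fallible operation:

ctx := context.Background()

for try := 0; try < 5; try++ {
	err := doSomething()
	if err == nil {
		break
	}

	// sleep the policy duration for this try, aborting early if ctx is canceled
	err = RetrySleep(ctx, RetryLinearOneMinute, try)
	panicIfErr(err)
}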

Types

type BaseEvent added in v0.0.2

type BaseEvent struct {
	EventID   string `json:"event_id"`
	EventType string `json:"type"`
	TimeStamp int64  `json:"timestamp"`
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects Task producers and Task handlers to the backend

Example (Consumer)
queue := &Queue{
	Name:          "P100",
	MaxRunTime:    60 * time.Minute,
	MaxConcurrent: 20,
	MaxTries:      100,
}

// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue), RetryBackoffPolicy(RetryLinearOneHour))
panicIfErr(err)

router := NewTaskRouter()
err = router.HandleFunc("email:send", func(_ context.Context, t *Task) (interface{}, error) {
	log.Printf("Processing task: %s", t.ID)

	// handle task.Payload which is a JSON encoded email

	// task record will be updated with this payload result
	return "success", nil
})
panicIfErr(err)

// Starts handling registered tasks, blocks until canceled
err = client.Run(context.Background(), router)
panicIfErr(err)
Output:

Example (Producer)
queue := &Queue{
	Name:          "P100",
	MaxRunTime:    60 * time.Minute,
	MaxConcurrent: 20,
	MaxTries:      100,
}

email := newEmail("user@example.net", "Test Subject", "Test Body")

// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
panicIfErr(err)

// Uses the NATS CLI context WQ for connection details, will create the queue if
// it does not already exist
client, err := NewClient(NatsContext("WQ"), WorkQueue(queue))
panicIfErr(err)

// Adds the task to the queue called P100
err = client.EnqueueTask(context.Background(), task)
panicIfErr(err)
Output:

func NewClient

func NewClient(opts ...ClientOpt) (*Client, error)

NewClient creates a new client. One of NatsConn() or NatsContext() must be passed; other options are optional.

When no WorkQueue() is supplied a default queue called DEFAULT will be used

func (*Client) EnqueueTask

func (c *Client) EnqueueTask(ctx context.Context, task *Task) error

EnqueueTask adds a task to the named queue which must already exist

func (*Client) LoadTaskByID

func (c *Client) LoadTaskByID(id string) (*Task, error)

LoadTaskByID loads a task from the backend using its ID

Example
client, err := NewClient(NatsContext("WQ"))
panicIfErr(err)

task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB")
panicIfErr(err)

fmt.Printf("Loaded task %s in state %s", task.ID, task.State)
Output:

func (*Client) RetryTaskByID added in v0.0.2

func (c *Client) RetryTaskByID(ctx context.Context, id string) error

RetryTaskByID will retry a task, first removing an entry from the Work Queue if already there

func (*Client) Run

func (c *Client) Run(ctx context.Context, router *Mux) error

Run starts processing messages using the router until error or interruption

func (*Client) StorageAdmin

func (c *Client) StorageAdmin() StorageAdmin

StorageAdmin provides access to admin features of the storage backend
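
A sketch of listing queue state via the admin interface, assuming the NATS context from the earlier examples:

client, err := NewClient(NatsContext("WQ"))
panicIfErr(err)

queues, err := client.StorageAdmin().Queues()
panicIfErr(err)

for _, q := range queues {
	fmt.Printf("queue %s holds %d entries\n", q.Name, q.Stream.State.Msgs)
}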

type ClientOpt

type ClientOpt func(opts *ClientOpts) error

ClientOpt configures the client

func BindWorkQueue

func BindWorkQueue(queue string) ClientOpt

BindWorkQueue binds the client to a work queue that should already exist

func ClientConcurrency

func ClientConcurrency(c int) ClientOpt

ClientConcurrency sets the concurrency to use when executing tasks within this client for vertical scaling. This is capped by the per-queue maximum concurrency set using the queue setting MaxConcurrent. Generally a queue would have a larger concurrency like 100 (DefaultQueueMaxConcurrent) and an individual task processor would stay below that. This allows for horizontal and vertical scaling without unbounded growth: the queue MaxConcurrent is the absolute upper limit for in-flight jobs for one specific queue.
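
For example, a sketch where each process is limited to 10 in-flight tasks while the queue's MaxConcurrent remains the hard cap across all processes:

client, err := NewClient(NatsContext("WQ"), ClientConcurrency(10))
panicIfErr(err)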

func CustomLogger

func CustomLogger(log Logger) ClientOpt

CustomLogger sets a custom logger to use for all logging

func DiscardTaskStates added in v0.0.2

func DiscardTaskStates(states ...TaskState) ClientOpt

DiscardTaskStates configures the client to discard Tasks that reach a final state in the list of supplied TaskState
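
A sketch that keeps only failed tasks around by discarding those that complete or expire:

client, err := NewClient(
	NatsContext("WQ"),
	DiscardTaskStates(TaskStateCompleted, TaskStateExpired),
)
panicIfErr(err)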

func MemoryStorage

func MemoryStorage() ClientOpt

MemoryStorage enables storing tasks and work queue in memory in JetStream

func NatsConn

func NatsConn(nc *nats.Conn) ClientOpt

NatsConn sets an already connected NATS connection as the communications channel

func NatsContext

func NatsContext(c string, opts ...nats.Option) ClientOpt

NatsContext attempts to connect to the NATS client context c

func NoStorageInit added in v0.0.2

func NoStorageInit() ClientOpt

NoStorageInit skips setting up any queues or task stores when creating a client

func PrometheusListenPort

func PrometheusListenPort(port int) ClientOpt

PrometheusListenPort enables Prometheus statistics listening on a specific port

func RetryBackoffPolicy

func RetryBackoffPolicy(p RetryPolicyProvider) ClientOpt

RetryBackoffPolicy uses p to schedule job retries, defaults to a linear curve backoff with jitter between 1 and 10 minutes

func StoreReplicas

func StoreReplicas(r uint) ClientOpt

StoreReplicas sets the replica level to keep for the tasks store and work queue

Used only when initially creating the underlying streams.

func TaskRetention

func TaskRetention(r time.Duration) ClientOpt

TaskRetention is the time tasks will be kept for.

Used only when initially creating the underlying streams.

func WorkQueue

func WorkQueue(queue *Queue) ClientOpt

WorkQueue configures the client to consume messages from a specific queue

When not set the "DEFAULT" queue will be used.

type ClientOpts

type ClientOpts struct {
	// contains filtered or unexported fields
}

ClientOpts configures the client

type Handler

type Handler interface {
	// ProcessTask processes a single task, the response bytes will be stored in the original task
	ProcessTask(ctx context.Context, t *Task) (interface{}, error)
}

Handler handles tasks

type HandlerFunc

type HandlerFunc func(ctx context.Context, t *Task) (interface{}, error)

HandlerFunc handles a single task; the response bytes will be stored in the original task

type ItemKind

type ItemKind int

ItemKind indicates the kind of job a work queue entry represents

var (
	// TaskItem is a task as defined by Task
	TaskItem ItemKind = 0
)

type Logger

type Logger interface {
	Debugf(format string, v ...interface{})
	Infof(format string, v ...interface{})
	Warnf(format string, v ...interface{})
	Errorf(format string, v ...interface{})
}

Logger is a pluggable logger interface
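
Any type with these four methods can be supplied via CustomLogger(); a minimal sketch adapting the standard library log package (stdLogger is illustrative):

type stdLogger struct{}

func (stdLogger) Debugf(format string, v ...interface{}) { log.Printf("DEBUG: "+format, v...) }
func (stdLogger) Infof(format string, v ...interface{})  { log.Printf("INFO: "+format, v...) }
func (stdLogger) Warnf(format string, v ...interface{})  { log.Printf("WARN: "+format, v...) }
func (stdLogger) Errorf(format string, v ...interface{}) { log.Printf("ERROR: "+format, v...) }

client, err := NewClient(NatsContext("WQ"), CustomLogger(stdLogger{}))
panicIfErr(err)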

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux routes messages

Note: this will change to be nearer to a server mux and include support for middleware

func NewTaskRouter

func NewTaskRouter() *Mux

NewTaskRouter creates a new Mux

func (*Mux) HandleFunc

func (m *Mux) HandleFunc(taskType string, h HandlerFunc) error

HandleFunc registers a handler for a taskType. The taskType must exactly match the Type of the tasks it should handle

func (*Mux) Handler

func (m *Mux) Handler(t *Task) HandlerFunc

Handler looks up the handler function for a task

type ProcessItem

type ProcessItem struct {
	Kind  ItemKind `json:"kind"`
	JobID string   `json:"job"`
	// contains filtered or unexported fields
}

ProcessItem is an individual item stored in the work queue

type Queue

type Queue struct {
	// Name is a unique name for the work queue, should be in the character range a-zA-Z0-9
	Name string `json:"name"`
	// MaxAge is the absolute longest time an entry can stay in the queue. When not set items will not expire
	MaxAge time.Duration `json:"max_age"`
	// MaxEntries represents the maximum number of entries that can be in the queue. When it's full new entries will be rejected. When unset no limit is applied.
	MaxEntries int `json:"max_entries"`
	// DiscardOld indicates that when MaxEntries are reached old entries will be discarded rather than new ones rejected
	DiscardOld bool `json:"discard_old"`
	// MaxTries is the maximum number of times an entry can be tried; entries will be tried every MaxRunTime with some jitter applied. Defaults to DefaultMaxTries
	MaxTries int `json:"max_tries"`
	// MaxRunTime is the maximum time a task can be processed. Defaults to DefaultJobRunTime
	MaxRunTime time.Duration `json:"max_runtime"`
	// MaxConcurrent is the total number of in-flight tasks across all active task handlers combined. Defaults to DefaultQueueMaxConcurrent
	MaxConcurrent int `json:"max_concurrent"`
	// NoCreate will not try to create a queue; the client will bind to an existing one or fail
	NoCreate bool
	// contains filtered or unexported fields
}

Queue represents a work queue

type QueueInfo

type QueueInfo struct {
	// Name is the name of the queue
	Name string `json:"name"`
	// Time is when the information was gathered
	Time time.Time `json:"time"`
	// Stream is the active JetStream Stream Information
	Stream *api.StreamInfo `json:"stream_info"`
	// Consumer is the worker stream information
	Consumer *api.ConsumerInfo `json:"consumer_info"`
}

QueueInfo holds information about a queue state

type RetryPolicy

type RetryPolicy struct {
	// Intervals is a range of time periods backoff will be based on
	Intervals []time.Duration
	// Jitter is a factor applied to the specific interval to avoid repeating the same backoff periods
	Jitter float64
}

RetryPolicy defines a period that failed jobs will be retried against

func (RetryPolicy) Duration

func (p RetryPolicy) Duration(n int) time.Duration

Duration is the period to sleep for try n; it includes jitter

type RetryPolicyProvider added in v0.0.2

type RetryPolicyProvider interface {
	Duration(n int) time.Duration
}

RetryPolicyProvider is the interface that RetryPolicy implements; use this to implement your own exponential backoff system or similar for task retries.
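
A sketch of a custom provider: exponentialBackoff is illustrative, doubling the base interval per try and capping at a maximum, and can be passed to RetryBackoffPolicy():

type exponentialBackoff struct {
	base time.Duration
	max  time.Duration
}

// Duration doubles the base interval for every attempt, capped at max
func (p exponentialBackoff) Duration(n int) time.Duration {
	d := p.base * time.Duration(1<<uint(n))
	if d > p.max || d <= 0 { // d <= 0 guards against overflow for large n
		return p.max
	}
	return d
}

client, err := NewClient(
	NatsContext("WQ"),
	RetryBackoffPolicy(exponentialBackoff{base: time.Second, max: 5 * time.Minute}),
)
panicIfErr(err)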

type Storage

type Storage interface {
	SaveTaskState(ctx context.Context, task *Task, notify bool) error
	EnqueueTask(ctx context.Context, queue *Queue, task *Task) error
	RetryTaskByID(ctx context.Context, queue *Queue, id string) error
	LoadTaskByID(id string) (*Task, error)
	PublishTaskStateChangeEvent(ctx context.Context, task *Task) error
	AckItem(ctx context.Context, item *ProcessItem) error
	NakItem(ctx context.Context, item *ProcessItem) error
	TerminateItem(ctx context.Context, item *ProcessItem) error
	PollQueue(ctx context.Context, q *Queue) (*ProcessItem, error)
	PrepareQueue(q *Queue, replicas int, memory bool) error
	PrepareTasks(memory bool, replicas int, retention time.Duration) error
}

Storage implements the backend access

type StorageAdmin

type StorageAdmin interface {
	Queues() ([]*QueueInfo, error)
	QueueNames() ([]string, error)
	QueueInfo(name string) (*QueueInfo, error)
	PurgeQueue(name string) error
	DeleteQueue(name string) error
	PrepareQueue(q *Queue, replicas int, memory bool) error
	PrepareTasks(memory bool, replicas int, retention time.Duration) error
	TasksInfo() (*TasksInfo, error)
	LoadTaskByID(id string) (*Task, error)
	DeleteTaskByID(id string) error
	Tasks(ctx context.Context, limit int32) (chan *Task, error)
	TasksStore() (*jsm.Manager, *jsm.Stream, error)
}

StorageAdmin provides helpers mainly in support of the CLI. It leaks a number of details about JetStream, but that's OK; we are not really intending to change the storage backend or support others.

type Task

type Task struct {
	// ID is a k-sortable unique ID for the task
	ID string `json:"id"`
	// Type is a free form string that can later be used as a routing key to send tasks to handlers
	Type string `json:"type"`
	// Queue is the name of the queue the task was enqueued with; set only during the enqueue operation, otherwise empty
	Queue string `json:"queue"`
	// Payload is a JSON representation of the associated work
	Payload []byte `json:"payload"`
	// Deadline is a cut-off time for the job to complete, should a job be scheduled after this time it will fail.
	// In-Flight jobs are allowed to continue past this time. Only starting handlers are impacted by this deadline.
	Deadline *time.Time `json:"deadline,omitempty"`
	// Result is the outcome of the job, only set for successful jobs
	Result *TaskResult `json:"result,omitempty"`
	// State is the most recent recorded state the job is in
	State TaskState `json:"state"`
	// CreatedAt is the time the job was created in UTC timezone
	CreatedAt time.Time `json:"created"`
	// LastTriedAt is a time stamp for when the job was last handed to a handler
	LastTriedAt *time.Time `json:"tried,omitempty"`
	// Tries is how many times the job was handled
	Tries int `json:"tries"`
	// LastErr is the most recent handling error if any
	LastErr string `json:"last_err,omitempty"`
	// contains filtered or unexported fields
}

Task represents a job item that handlers will execute

func NewTask

func NewTask(taskType string, payload interface{}, opts ...TaskOpt) (*Task, error)

NewTask creates a new task of taskType that can later be used to route tasks to handlers. The task will carry a JSON encoded representation of payload.

Example (With_deadline)
email := newEmail("user@example.net", "Test Subject", "Test Body")

// Creates a new task that has a deadline for processing 1 hour from now
task, err := NewTask("email:send", email, TaskDeadline(time.Now().Add(time.Hour)))
if err != nil {
	panic(fmt.Sprintf("Could not create task: %v", err))
}

fmt.Printf("Task ID: %s\n", task.ID)
Output:

func (*Task) IsPastDeadline added in v0.0.2

func (t *Task) IsPastDeadline() bool

IsPastDeadline determines if the task is past its deadline

type TaskOpt

type TaskOpt func(*Task) error

TaskOpt configures Tasks made using NewTask()

func TaskDeadline

func TaskDeadline(deadline time.Time) TaskOpt

TaskDeadline sets an absolute time after which the task should not be handled

type TaskResult

type TaskResult struct {
	Payload     interface{} `json:"payload"`
	CompletedAt time.Time   `json:"completed"`
}

TaskResult is the result of task execution, this will only be set for successfully processed jobs

type TaskState

type TaskState string

TaskState indicates the current state a task is in

const (
	// TaskStateUnknown is for tasks that do not have a state set
	TaskStateUnknown TaskState = ""
	// TaskStateNew newly created tasks that have not been handled yet
	TaskStateNew TaskState = "new"
	// TaskStateActive tasks that are currently being handled
	TaskStateActive TaskState = "active"
	// TaskStateRetry tasks that previously failed and are waiting retry
	TaskStateRetry TaskState = "retry"
	// TaskStateExpired tasks that reached their deadline
	TaskStateExpired TaskState = "expired"
	// TaskStateTerminated indicates that the task was terminated via the ErrTerminateTask error
	TaskStateTerminated TaskState = "terminated"
	// TaskStateCompleted tasks that are completed
	TaskStateCompleted TaskState = "complete"
	// TaskStateQueueError tasks that could not have their associated Work Queue item created
	TaskStateQueueError TaskState = "queue_error"
)
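
A sketch of inspecting the state of a loaded task:

task, err := client.LoadTaskByID("24ErgVol4ZjpoQ8FAima9R2jEHB")
panicIfErr(err)

switch task.State {
case TaskStateCompleted:
	fmt.Printf("completed with result: %v\n", task.Result.Payload)
case TaskStateTerminated, TaskStateExpired:
	fmt.Printf("gave up after %d tries: %s\n", task.Tries, task.LastErr)
default:
	fmt.Printf("task is %s\n", task.State)
}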

type TaskStateChangeEvent added in v0.0.2

type TaskStateChangeEvent struct {
	TaskID   string        `json:"task_id"`
	State    TaskState     `json:"state"`
	Tries    int           `json:"tries"`
	Queue    string        `json:"queue"`
	TaskType string        `json:"task_type"`
	LastErr  string        `json:"last_error,omitempty"`
	Age      time.Duration `json:"task_age,omitempty"`

	BaseEvent
}

func NewTaskStateChangeEvent added in v0.0.2

func NewTaskStateChangeEvent(t *Task) (*TaskStateChangeEvent, error)

NewTaskStateChangeEvent creates a new event notifying of a change in task state

type TasksInfo

type TasksInfo struct {
	// Time is when the information was gathered
	Time time.Time `json:"time"`
	// Stream is the active JetStream Stream Information
	Stream *api.StreamInfo `json:"stream_info"`
}

TasksInfo is state about the tasks store
