Documentation ¶
Index ¶
- Constants
- type Broker
- type Chain
- type ChainMessage
- type ChainMeta
- type ChainOpts
- type Correlation
- type Field
- type Filter
- type Group
- type GroupMessage
- type GroupMeta
- type GroupOpts
- type Job
- type JobCtx
- type JobMessage
- type JobOpts
- type Kind
- type Meta
- type Op
- type Opts
- type QueryResult
- type Results
- type Server
- func (s *Server) Enqueue(ctx context.Context, t Job) (string, error)
- func (s *Server) EnqueueChain(ctx context.Context, c Chain) (string, error)
- func (s *Server) EnqueueGroup(ctx context.Context, t Group) (string, error)
- func (s *Server) GetChain(ctx context.Context, uuid string) (ChainMessage, error)
- func (s *Server) GetGroup(ctx context.Context, uuid string) (GroupMessage, error)
- func (s *Server) GetJob(ctx context.Context, uuid string) (JobMessage, error)
- func (s *Server) QueryChains(ctx context.Context, token string, filters ...*Filter) ([]*ChainMessage, string, error)
- func (s *Server) QueryGroups(ctx context.Context, token string, filters ...*Filter) ([]*GroupMessage, string, error)
- func (s *Server) QueryJobs(ctx context.Context, token string, filters ...*Filter) ([]*JobMessage, string, error)
- func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts)
- func (s *Server) Start(ctx context.Context)
- type ServerOpts
- type Task
- type TaskOpts
Constants ¶
const ( // This is the initial state when a job is pushed onto the broker. StatusStarted = "queued" // This is the state when a worker has recieved a job. StatusProcessing = "processing" // The state when a job completes, but returns an error (and all retries are over). StatusFailed = "failed" // The state when a job completes without any error. StatusDone = "successful" // The state when a job errors out and is queued again to be retried. // This state is analogous to statusStarted. StatusRetrying = "retrying" )
const (
DefaultQueue = "tasqueue:tasks"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface { // Enqueue places a task in the queue Enqueue(ctx context.Context, msg []byte, queue string) error // Consume listens for tasks on the queue and calls processor Consume(ctx context.Context, work chan []byte, queue string) }
Broker is something to manage the job queue(s)
type Chain ¶
func NewChain ¶
NewChain() accepts a list of Tasks and creates a chain by setting the onSuccess task of i'th task to (i+1)'th task, hence forming a "chain". It returns the first task (essentially the first node of the linked list), which can be queued normally.
func (*Chain) SetCorrelationID ¶
type ChainMessage ¶
ChainMessage is a wrapper over Chain, containing meta info such as status, uuid. A ChainMessage is stored in the results store.
type ChainMeta ¶
type ChainMeta struct { Correlation UUID string // Status of the overall chain Status string // UUID of the current job part of chain JobUUID string // List of UUIDs of completed jobs PrevJobs []string }
ChainMeta contains fields related to a chain job.
type ChainOpts ¶
type ChainOpts struct {
Correlation
}
ChainOpts holds the various options available to configure a chain.
type Correlation ¶
type Correlation struct { // ID constrained to 255 chars ID string // Arbitrary name constrained to 255 chars Name string // Kind, set internally to either job, chain or group Kind Kind }
Correlation is user data that can be used to identify group(s) of jobs. These are always optional.
type Field ¶
type Field string
const ( // FieldID is the systems randomly chosen ID for an object FieldID Field = "id" // FieldStatus is the status of the object (e.g. "pending", "successful", "failed") FieldStatus Field = "status" // FieldCorrelationID is the user given ID for an object (ie. SetCorrelationID) FieldCorrelationID Field = "correlation_id" // FieldTimeCreated is the unix time (seconds) the object was created FieldTimeCreated Field = "created_at" // FieldTimeModified is the unix time (seconds) the object was last modified FieldTimeModified Field = "modified_at" )
type Filter ¶
Filter is a (Field, Op, Values) tuple to filter objects by. For example, to filter jobs by status:
Filter{Field: FieldStatus, Op: OpIn, Value: []string{"pending", "successful"}}
Multiple Filters are ANDed together.
type Group ¶
func (*Group) SetCorrelationID ¶
type GroupMessage ¶
GroupMessage is a wrapper over Group, containing meta info such as status, uuid. A GroupMessage is stored in the results store.
type GroupMeta ¶
type GroupMeta struct { Correlation UUID string Status string // JobStatus is a map of job uuid -> status JobStatus map[string]string }
GroupMeta contains fields related to a group job. These are updated when a task is consumed.
type GroupOpts ¶
type GroupOpts struct {
Correlation
}
GroupOpts holds the various options available to configure a group.
type Job ¶
type Job struct { // If task is successful, the OnSuccess jobs are enqueued. OnSuccess *Job Task string Payload []byte Opts JobOpts }
Job represents a unit of work pushed by producers. It is the responsibility of the task handler to unmarshal (if required) the payload and process it in any manner.
func NewJob ¶
NewJob returns a job with arbitrary payload. It accepts the name of the task, the payload and a list of options.
func (*Job) SetCorrelationID ¶
type JobCtx ¶
type JobCtx struct { Meta Meta // contains filtered or unexported fields }
JobCtx is passed onto handler functions. It allows access to a job's meta information to the handler.
type JobMessage ¶
JobMessage is a wrapper over Task, used to transport the task over a broker. It contains additional fields such as status and a UUID.
type JobOpts ¶
type JobOpts struct { Correlation Queue string MaxRetries uint32 Schedule string }
JobOpts holds the various options available to configure a job.
type Meta ¶
type Meta struct { Correlation UUID string OnSuccessUUID string Status string Queue string Schedule string MaxRetry uint32 Retried uint32 PrevErr string ProcessedAt time.Time JobResult []byte PrevJobResults [][]byte }
Meta contains fields related to a job. These are updated when a task is consumed.
func DefaultMeta ¶
DefaultMeta returns Meta with a UUID and other defaults filled in.
type Op ¶
type Op string
Op is some operation to compare field(s)
const ( // OpIn is a set membership test // Implies Value is []string OpIn Op = "in" // OpNotIn is a set non-membership test // Implies Value is []string OpNotIn Op = "not_in" // OpLessThan is a less than test // Implies Value is an int OpLessThan Op = "lt" // OpGreaterThan is a greater than test // Implies Value is an int OpGreaterThan Op = "gt" )
type Opts ¶
type Opts interface { Name() string Value() interface{} }
Opts is an interface to define arbitratry options.
type QueryResult ¶
type QueryResult struct { Correlation UUID string Status string Data []byte }
type Results ¶
type Results interface { Query(ctx context.Context, token string, filters ...*Filter) ([]*QueryResult, string, error) Get(ctx context.Context, uuid string) ([]byte, error) Set(ctx context.Context, uuid, status string, meta *Correlation, b []byte) error }
Results handles storing and retrieving results
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main store that holds the broker and the results communication interfaces. It also stores the registered tasks.
func NewServer ¶
func NewServer(o ServerOpts) (*Server, error)
NewServer() returns a new instance of server, with sane defaults.
func (*Server) Enqueue ¶
Enqueue() accepts a job and returns the assigned UUID. The following steps take place: 1. Converts it into a job message, which assigns a UUID (among other meta info) to the job. 2. Sets the job status as "started" on the results store. 3. Enqueues the job (if the job is scheduled, pushes it onto the scheduler)
func (*Server) EnqueueChain ¶
func (*Server) EnqueueGroup ¶
EnqueueGroup() accepts a group and returns the assigned UUID. The following steps take place: 1. Converts it into a group message, which assigns a UUID (among other meta info) to the group. 2. Sets the group status as "started" on the results store. 3. Loops over all jobs part of the group and enqueues the job each job. 4. The job status map is updated with the uuids of each enqueued job.
func (*Server) GetJob ¶
GetJob accepts a UUID and returns the job message in the results store. This is useful to check the status of a job message.
func (*Server) QueryChains ¶
func (s *Server) QueryChains(ctx context.Context, token string, filters ...*Filter) ([]*ChainMessage, string, error)
QueryChains accepts a list of filters and returns a list of chains that match the filters. It also returns a token, which can be used to paginate through the results.
func (*Server) QueryGroups ¶
func (s *Server) QueryGroups(ctx context.Context, token string, filters ...*Filter) ([]*GroupMessage, string, error)
QueryGroups accepts a list of filters and returns a list of groups that match the filters. It also returns a token, which can be used to paginate through the results.
func (*Server) QueryJobs ¶
func (s *Server) QueryJobs(ctx context.Context, token string, filters ...*Filter) ([]*JobMessage, string, error)
QueryJobs accepts a list of filters and returns a list of jobs that match the filters. It also returns a token, which can be used to paginate through the results.
func (*Server) RegisterTask ¶
RegisterTask maps a new task against the tasks map on the server. It accepts different options for the task (to set callbacks).