Documentation ¶
Index ¶
- Constants
- type Broker
- type Chain
- type ChainMessage
- type ChainMeta
- type Group
- type GroupMessage
- type GroupMeta
- type Job
- type JobCtx
- type JobMessage
- type JobOpts
- type Meta
- type Results
- type Server
- func (s *Server) DeleteJob(ctx context.Context, uuid string) error
- func (s *Server) Enqueue(ctx context.Context, t Job) (string, error)
- func (s *Server) EnqueueChain(ctx context.Context, c Chain) (string, error)
- func (s *Server) EnqueueGroup(ctx context.Context, t Group) (string, error)
- func (s *Server) GetChain(ctx context.Context, uuid string) (ChainMessage, error)
- func (s *Server) GetFailed(ctx context.Context) ([]string, error)
- func (s *Server) GetGroup(ctx context.Context, uuid string) (GroupMessage, error)
- func (s *Server) GetJob(ctx context.Context, uuid string) (JobMessage, error)
- func (s *Server) GetPending(ctx context.Context, queue string) ([]JobMessage, error)
- func (s *Server) GetResult(ctx context.Context, uuid string) ([]byte, error)
- func (s *Server) GetSuccess(ctx context.Context) ([]string, error)
- func (s *Server) GetTasks() []string
- func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts)
- func (s *Server) Start(ctx context.Context)
- type ServerOpts
- type Task
- type TaskOpts
Constants ¶
const (
	// This is the initial state when a job is pushed onto the broker.
	StatusStarted = "queued"
	// This is the state when a worker has received a job.
	StatusProcessing = "processing"
	// The state when a job completes, but returns an error (and all retries are over).
	StatusFailed = "failed"
	// The state when a job completes without any error.
	StatusDone = "successful"
	// The state when a job errors out and is queued again to be retried.
	// This state is analogous to StatusStarted.
	StatusRetrying = "retrying"
)
const (
DefaultQueue = "tasqueue:tasks"
)
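As a rough illustration of how a caller might interpret the status values, the sketch below treats "failed" and "successful" as the two states after which a job sees no further processing, which is how the constant comments describe them. The constants are redeclared locally so the snippet compiles on its own, and isTerminal is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local copies of the documented status values, so the sketch compiles on its own.
const (
	StatusStarted    = "queued"
	StatusProcessing = "processing"
	StatusFailed     = "failed"
	StatusDone       = "successful"
	StatusRetrying   = "retrying"
)

// isTerminal is a hypothetical helper: it reports whether a job in the given
// state is finished (either successfully or after exhausting its retries).
func isTerminal(status string) bool {
	switch status {
	case StatusFailed, StatusDone:
		return true
	default:
		return false
	}
}

func main() {
	states := []string{StatusStarted, StatusProcessing, StatusRetrying, StatusFailed, StatusDone}
	for _, s := range states {
		fmt.Printf("%-10s terminal=%v\n", s, isTerminal(s))
	}
}
```

A polling loop built on GetJob could use a check like this to decide when to stop waiting.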
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker interface {
	// Enqueue places a task in the queue
	Enqueue(ctx context.Context, msg []byte, queue string) error
	// Consume listens for tasks on the queue and calls processor
	Consume(ctx context.Context, work chan []byte, queue string)
	// GetPending returns a list of stored job messages on the particular queue
	GetPending(ctx context.Context, queue string) ([]string, error)
}
type Chain ¶
type Chain struct {
Jobs []Job
}
type ChainMessage ¶
type ChainMessage struct {
ChainMeta
}
ChainMessage is a wrapper over Chain, containing meta info such as status, uuid. A ChainMessage is stored in the results store.
type ChainMeta ¶
type ChainMeta struct {
	UUID string
	// Status of the overall chain
	Status string
	// UUID of the current job part of chain
	JobUUID string
	// List of UUIDs of completed jobs
	PrevJobs []string
}
ChainMeta contains fields related to a chain job.
type GroupMessage ¶
GroupMessage is a wrapper over Group, containing meta info such as status, uuid. A GroupMessage is stored in the results store.
type GroupMeta ¶
type GroupMeta struct {
	UUID   string
	Status string
	// JobStatus is a map of job uuid -> status
	JobStatus map[string]string
}
GroupMeta contains fields related to a group job. These are updated when a task is consumed.
type Job ¶
type Job struct {
	// If task is successful, the OnSuccess jobs are enqueued.
	OnSuccess *Job
	Task      string
	Payload   []byte
	Opts      JobOpts
}
Job represents a unit of work pushed by producers. It is the responsibility of the task handler to unmarshal (if required) the payload and process it in any manner.
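Since the payload is an opaque byte slice, producer and handler have to agree on an encoding. The sketch below assumes JSON and uses a trimmed local stand-in for Job (OnSuccess and Opts are left out). sumPayload, newSumJob and handleSum are hypothetical names, and handleSum only shows the decoding step, not the handler signature the package expects.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job is a trimmed local stand-in for the documented Job type.
type Job struct {
	Task    string
	Payload []byte
}

// sumPayload is a hypothetical payload shape shared by producer and handler.
type sumPayload struct {
	A int `json:"a"`
	B int `json:"b"`
}

// newSumJob encodes the arguments as JSON and wraps them in a Job.
func newSumJob(a, b int) (Job, error) {
	p, err := json.Marshal(sumPayload{A: a, B: b})
	if err != nil {
		return Job{}, err
	}
	return Job{Task: "add", Payload: p}, nil
}

// handleSum decodes the payload and performs the work, as a task handler would.
func handleSum(payload []byte) (int, error) {
	var p sumPayload
	if err := json.Unmarshal(payload, &p); err != nil {
		return 0, fmt.Errorf("decode payload: %w", err)
	}
	return p.A + p.B, nil
}

func main() {
	job, err := newSumJob(2, 3)
	if err != nil {
		fmt.Println("build job:", err)
		return
	}
	sum, err := handleSum(job.Payload)
	if err != nil {
		fmt.Println("handle job:", err)
		return
	}
	fmt.Println(job.Task, sum)
}
```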
type JobCtx ¶
JobCtx is passed onto handler functions. It allows access to a job's meta information to the handler.
type JobMessage ¶
JobMessage is a wrapper over Job, used to transport the job over a broker. It contains additional fields such as status and a UUID.
type Meta ¶
type Meta struct {
	UUID          string
	OnSuccessUUID string
	Status        string
	Queue         string
	Schedule      string
	MaxRetry      uint32
	Retried       uint32
	PrevErr       string
	ProcessedAt   time.Time
	// PrevJobResult contains any job result set by the previous job in a chain.
	// This will be nil if the previous job doesn't set the results on JobCtx.
	PrevJobResult []byte
}
Meta contains fields related to a job. These are updated when a task is consumed.
func DefaultMeta ¶ added in v0.2.0
DefaultMeta returns Meta with a UUID and other defaults filled in.
type Results ¶
type Results interface {
	Get(ctx context.Context, uuid string) ([]byte, error)
	Set(ctx context.Context, uuid string, b []byte) error
	// DeleteJob removes the job's saved metadata from the store
	DeleteJob(ctx context.Context, uuid string) error
	GetFailed(ctx context.Context) ([]string, error)
	GetSuccess(ctx context.Context) ([]string, error)
	SetFailed(ctx context.Context, uuid string) error
	SetSuccess(ctx context.Context, uuid string) error
}
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main store that holds the broker and the results communication interfaces. It also stores the registered tasks.
func NewServer ¶
func NewServer(o ServerOpts) (*Server, error)
NewServer() returns a new instance of server, with sane defaults.
func (*Server) DeleteJob ¶ added in v1.3.0
DeleteJob() removes the stored results of a particular job. It does not "dequeue" an unprocessed job. It is useful for removing the status of old finished jobs.
func (*Server) Enqueue ¶
Enqueue() accepts a job and returns the assigned UUID. The following steps take place:
1. Converts it into a job message, which assigns a UUID (among other meta info) to the job.
2. Sets the job status as "started" on the results store.
3. Enqueues the job (if the job is scheduled, pushes it onto the scheduler).
func (*Server) EnqueueChain ¶
func (*Server) EnqueueGroup ¶
EnqueueGroup() accepts a group and returns the assigned UUID. The following steps take place:
1. Converts it into a group message, which assigns a UUID (among other meta info) to the group.
2. Sets the group status as "started" on the results store.
3. Loops over all jobs that are part of the group and enqueues each job.
4. The job status map is updated with the UUIDs of each enqueued job.
func (*Server) GetFailed ¶ added in v0.3.0
GetFailed() returns the list of UUIDs of jobs that failed.
func (*Server) GetJob ¶
GetJob accepts a UUID and returns the job message in the results store. This is useful to check the status of a job message.
func (*Server) GetPending ¶ added in v1.3.0
GetPending() returns the pending job messages in the broker's queue.
func (*Server) GetResult ¶
GetResult() accepts a UUID and returns the result of the job in the results store.
func (*Server) GetSuccess ¶ added in v0.3.0
GetSuccess() returns the list of UUIDs of jobs that were successful.
func (*Server) GetTasks ¶ added in v1.3.0
GetTasks() returns a list of all tasks registered with the server.
func (*Server) RegisterTask ¶
RegisterTask maps a new task against the tasks map on the server. It accepts different options for the task (to set callbacks).