machinery

package
v0.0.0-...-768bde1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2016 License: MPL-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BackendFactory

func BackendFactory(cnf *config.Config) (backends.Backend, error)

BackendFactory creates a new object that implements the backends.Backend interface. Currently supported backends are AMQP and Memcache.

func BrokerFactory

func BrokerFactory(cnf *config.Config) (brokers.Broker, error)

BrokerFactory creates a new object that implements the brokers.Broker interface. Currently only the AMQP broker is supported.

func CreateTaskResult

func CreateTaskResult(value reflect.Value) *backends.TaskResult

CreateTaskResult builds a *backends.TaskResult from the given reflect.Value.

func ParseRedisSocketURL

func ParseRedisSocketURL(url string) (path, password string, db int, err error)

ParseRedisSocketURL extracts Redis connection options from a URL with the redis+socket:// scheme. This scheme is not standard (or even de facto) and is used as a transitional mechanism until the config package gains the proper facilities to support socket-based connections.

func ParseRedisURL

func ParseRedisURL(url string) (host, password string, db int, err error)

ParseRedisURL extracts Redis connection options (host, password, and database number) from a Redis URL.

func ReflectArgs

func ReflectArgs(args []signatures.TaskArg) ([]reflect.Value, error)

ReflectArgs converts []TaskArg to []reflect.Value

func TryCall

func TryCall(f reflect.Value, args []reflect.Value) (results []reflect.Value, err error)

TryCall attempts to call the task with the supplied arguments.

`err` is set in the return value in two cases:

  1. The reflected function invocation panics (e.g. due to a mismatched argument list).
  2. The task func itself returns a non-nil error.

Types

type Chain

type Chain struct {
	Tasks []*signatures.TaskSignature
}

Chain creates a chain of tasks to be executed one after another

func NewChain

func NewChain(tasks ...*signatures.TaskSignature) *Chain

NewChain creates Chain instance

type Chord

type Chord struct {
	Group    *Group
	Callback *signatures.TaskSignature
}

Chord adds an optional callback to the group, to be executed after all tasks in the group have finished.

func NewChord

func NewChord(group *Group, callback *signatures.TaskSignature) *Chord

NewChord creates Chord instance

type Group

type Group struct {
	GroupUUID string
	Tasks     []*signatures.TaskSignature
}

Group creates a set of tasks to be executed in parallel

func NewGroup

func NewGroup(tasks ...*signatures.TaskSignature) *Group

NewGroup creates Group instance

func (*Group) GetUUIDs

func (group *Group) GetUUIDs() []string

GetUUIDs returns a slice of task UUIDs.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main Machinery object and stores all configuration. All the tasks that workers process are registered against the server.

func NewServer

func NewServer(cnf *config.Config) (*Server, error)

NewServer creates Server instance

func (*Server) GetBackend

func (server *Server) GetBackend() backends.Backend

GetBackend returns backend

func (*Server) GetBroker

func (server *Server) GetBroker() brokers.Broker

GetBroker returns broker

func (*Server) GetConfig

func (server *Server) GetConfig() *config.Config

GetConfig returns the config object.

func (*Server) GetRegisteredTask

func (server *Server) GetRegisteredTask(name string) (interface{}, error)

GetRegisteredTask returns registered task by name

func (*Server) GetRegisteredTaskNames

func (server *Server) GetRegisteredTaskNames() []string

GetRegisteredTaskNames returns slice of registered task names

func (*Server) IsTaskRegistered

func (server *Server) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task name is registered with this broker

func (*Server) NewWorker

func (server *Server) NewWorker(consumerTag string) *Worker

NewWorker creates Worker instance

func (*Server) RegisterTask

func (server *Server) RegisterTask(name string, task interface{})

RegisterTask registers a single task

func (*Server) RegisterTasks

func (server *Server) RegisterTasks(tasks map[string]interface{})

RegisterTasks registers all tasks at once

func (*Server) SendChain

func (server *Server) SendChain(chain *Chain) (*backends.ChainAsyncResult, error)

SendChain triggers a chain of tasks

func (*Server) SendChord

func (server *Server) SendChord(chord *Chord) (*backends.ChordAsyncResult, error)

SendChord triggers a group of parallel tasks with a callback

func (*Server) SendGroup

func (server *Server) SendGroup(group *Group) ([]*backends.AsyncResult, error)

SendGroup triggers a group of parallel tasks

func (*Server) SendTask

func (server *Server) SendTask(signature *signatures.TaskSignature) (*backends.AsyncResult, error)

SendTask publishes a task to the default queue

func (*Server) SetBackend

func (server *Server) SetBackend(backend backends.Backend)

SetBackend sets backend

func (*Server) SetBroker

func (server *Server) SetBroker(broker brokers.Broker)

SetBroker sets broker

func (*Server) SetConfig

func (server *Server) SetConfig(cnf *config.Config)

SetConfig sets config

type Worker

type Worker struct {
	ConsumerTag string
	// contains filtered or unexported fields
}

Worker represents a single worker process

func (*Worker) Launch

func (worker *Worker) Launch() error

Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks

func (*Worker) Process

func (worker *Worker) Process(signature *signatures.TaskSignature) error

Process handles received tasks and triggers success/error callbacks

func (*Worker) Quit

func (worker *Worker) Quit()

Quit tears down the running worker process

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL