Documentation ¶
Index ¶
- Variables
- func BackendFactory(cnf *config.Config) (backendiface.Backend, error)
- func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error)
- func LockFactory(cnf *config.Config) (lockiface.Lock, error)
- func ParseGCPPubSubURL(url string) (string, string, error)
- func ParseRedisSocketURL(url string) (path, password string, db int, err error)
- func ParseRedisURL(url string) (host, password string, db int, err error)
- func RedactURL(urlString string) string
- type Server
- func (server *Server) GetBackend() backendsiface.Backend
- func (server *Server) GetBroker() brokersiface.Broker
- func (server *Server) GetConfig() *config.Config
- func (server *Server) GetRegisteredTask(name string) (interface{}, error)
- func (server *Server) GetRegisteredTaskNames() []string
- func (server *Server) IsTaskRegistered(name string) bool
- func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker
- func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker
- func (server *Server) RegisterPeriodicChain(spec, name string, signatures ...*tasks.Signature) error
- func (server *Server) RegisterPeriodicChord(spec, name string, sendConcurrency int, callback *tasks.Signature, ...) error
- func (server *Server) RegisterPeriodicGroup(spec, name string, sendConcurrency int, signatures ...*tasks.Signature) error
- func (server *Server) RegisterPeriodicTask(spec, name string, signature *tasks.Signature) error
- func (server *Server) RegisterTask(name string, taskFunc interface{}) error
- func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error
- func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)
- func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)
- func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
- func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
- func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
- func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
- func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error)
- func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)
- func (server *Server) SetBackend(backend backendsiface.Backend)
- func (server *Server) SetBroker(broker brokersiface.Broker)
- func (server *Server) SetConfig(cnf *config.Config)
- func (server *Server) SetPreTaskHandler(handler func(*tasks.Signature))
- type Worker
- func (worker *Worker) CustomQueue() string
- func (worker *Worker) GetServer() *Server
- func (worker *Worker) Launch() error
- func (worker *Worker) LaunchAsync(errorsChan chan<- error)
- func (worker *Worker) PreConsumeHandler() bool
- func (worker *Worker) Process(signature *tasks.Signature) error
- func (worker *Worker) Quit()
- func (worker *Worker) SetErrorHandler(handler func(err error))
- func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature))
- func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool)
- func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature))
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrWorkerQuitGracefully is returned when the worker quits gracefully
	ErrWorkerQuitGracefully = errors.New("Worker quit gracefully")
	// ErrWorkerQuitAbruptly is returned when the worker quits abruptly
	ErrWorkerQuitAbruptly = errors.New("Worker quit abruptly")
)
Functions ¶
func BackendFactory ¶
func BackendFactory(cnf *config.Config) (backendiface.Backend, error)
BackendFactory creates a new object of backendiface.Backend. Currently supported backends are AMQP/S and Memcache.
func BrokerFactory ¶
func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error)
BrokerFactory creates a new object of iface.Broker. Currently only the AMQP/S broker is supported.
func LockFactory ¶
LockFactory creates a new object of iface.Lock. The currently supported lock is Redis.
func ParseGCPPubSubURL ¶
ParseGCPPubSubURL parses a GCP Pub/Sub URL of the form gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME
func ParseRedisSocketURL ¶
ParseRedisSocketURL extracts Redis connection options from a URL with the redis+socket:// scheme. This scheme is not standard (or even de facto) and is used as a transitional mechanism until the config package gains the proper facilities to support socket-based connections.
func ParseRedisURL ¶
ParseRedisURL ...
Types ¶
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main Machinery object and stores all configuration. All the tasks workers process are registered against the server.
func NewServerWithBrokerBackendLock ¶
func NewServerWithBrokerBackendLock(cnf *config.Config, brokerServer brokersiface.Broker, backendServer backendsiface.Backend, lock lockiface.Lock) *Server
NewServerWithBrokerBackendLock ...
func (*Server) GetBackend ¶
func (server *Server) GetBackend() backendsiface.Backend
GetBackend returns backend
func (*Server) GetBroker ¶
func (server *Server) GetBroker() brokersiface.Broker
GetBroker returns broker
func (*Server) GetRegisteredTask ¶
GetRegisteredTask returns registered task by name
func (*Server) GetRegisteredTaskNames ¶
GetRegisteredTaskNames returns slice of registered task names
func (*Server) IsTaskRegistered ¶
IsTaskRegistered returns true if the task name is registered with this broker
func (*Server) NewCustomQueueWorker ¶
func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker
NewCustomQueueWorker creates Worker instance with Custom Queue
func (*Server) RegisterPeriodicChain ¶
func (server *Server) RegisterPeriodicChain(spec, name string, signatures ...*tasks.Signature) error
RegisterPeriodicChain registers a periodic chain which will be triggered periodically
func (*Server) RegisterPeriodicChord ¶
func (server *Server) RegisterPeriodicChord(spec, name string, sendConcurrency int, callback *tasks.Signature, signatures ...*tasks.Signature) error
RegisterPeriodicChord registers a periodic chord which will be triggered periodically
func (*Server) RegisterPeriodicGroup ¶
func (server *Server) RegisterPeriodicGroup(spec, name string, sendConcurrency int, signatures ...*tasks.Signature) error
RegisterPeriodicGroup registers a periodic group which will be triggered periodically
func (*Server) RegisterPeriodicTask ¶
RegisterPeriodicTask registers a periodic task which will be triggered periodically
func (*Server) RegisterTask ¶
RegisterTask registers a single task
func (*Server) RegisterTasks ¶
RegisterTasks registers all tasks at once
func (*Server) SendChainWithContext ¶
func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)
SendChainWithContext will inject the trace context in all the signature headers before publishing it
func (*Server) SendChord ¶
func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
SendChord triggers a group of parallel tasks with a callback
func (*Server) SendChordWithContext ¶
func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
SendChordWithContext will inject the trace context in all the signature headers before publishing it
func (*Server) SendGroup ¶
func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
SendGroup triggers a group of parallel tasks
func (*Server) SendGroupWithContext ¶
func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
SendGroupWithContext will inject the trace context in all the signature headers before publishing it
func (*Server) SendTaskWithContext ¶
func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)
SendTaskWithContext will inject the trace context in the signature headers before publishing it
func (*Server) SetBackend ¶
func (server *Server) SetBackend(backend backendsiface.Backend)
SetBackend sets backend
func (*Server) SetBroker ¶
func (server *Server) SetBroker(broker brokersiface.Broker)
SetBroker sets broker
func (*Server) SetPreTaskHandler ¶
SetPreTaskHandler sets the pre-publish handler
type Worker ¶
type Worker struct {
	ConsumerTag string
	Concurrency int
	Queue       string
	// contains filtered or unexported fields
}
Worker represents a single worker process
func (*Worker) CustomQueue ¶
CustomQueue returns Custom Queue of the running worker process
func (*Worker) Launch ¶
Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks
func (*Worker) LaunchAsync ¶
LaunchAsync is a non blocking version of Launch
func (*Worker) PreConsumeHandler ¶
func (*Worker) SetErrorHandler ¶
SetErrorHandler sets a custom error handler for task errors. The default behavior is just to log the error after all the retry attempts fail.
func (*Worker) SetPostTaskHandler ¶
SetPostTaskHandler sets a custom handler for the end of a job
func (*Worker) SetPreConsumeHandler ¶
SetPreConsumeHandler sets a custom handler that is called before a job is consumed
func (*Worker) SetPreTaskHandler ¶
SetPreTaskHandler sets a custom handler func before a job is started