Documentation ¶
Index ¶
- func BackendFactory(cnf *config.Config) (backendiface.Backend, error)
- func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error)
- func ParseGCPPubSubURL(url string) (string, string, error)
- func ParseRedisSocketURL(url string) (path, password string, db int, err error)
- func ParseRedisURL(url string) (host, password string, db int, err error)
- type Server
- func (server *Server) GetBackend() backendsiface.Backend
- func (server *Server) GetBroker() brokersiface.Broker
- func (server *Server) GetConfig() *config.Config
- func (server *Server) GetRegisteredTask(name string) (interface{}, error)
- func (server *Server) GetRegisteredTaskNames() []string
- func (server *Server) IsTaskRegistered(name string) bool
- func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker
- func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker
- func (server *Server) RegisterTask(name string, taskFunc interface{}) error
- func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error
- func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)
- func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)
- func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
- func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
- func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
- func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
- func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error)
- func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)
- func (server *Server) SetBackend(backend backendsiface.Backend)
- func (server *Server) SetBroker(broker brokersiface.Broker)
- func (server *Server) SetConfig(cnf *config.Config)
- func (server *Server) SetPreTaskHandler(handler func(*tasks.Signature))
- type Worker
- func (worker *Worker) CustomQueue() string
- func (worker *Worker) GetServer() *Server
- func (worker *Worker) Launch() error
- func (worker *Worker) LaunchAsync(errorsChan chan<- error)
- func (worker *Worker) Process(signature *tasks.Signature) error
- func (worker *Worker) Quit()
- func (worker *Worker) SetErrorHandler(handler func(err error))
- func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature))
- func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature))
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BackendFactory ¶
func BackendFactory(cnf *config.Config) (backendiface.Backend, error)
BackendFactory creates a new object of backends.Interface. Currently supported backends are AMQP/S and Memcache.
func BrokerFactory ¶
func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error)
BrokerFactory creates a new object of iface.Broker. Currently only the AMQP/S broker is supported.
func ParseGCPPubSubURL ¶
ParseGCPPubSubURL parses a GCP Pub/Sub URL of the form gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME.
func ParseRedisSocketURL ¶
ParseRedisSocketURL extracts Redis connection options from a URL with the redis+socket:// scheme. This scheme is not standard (or even de facto) and is used as a transitional mechanism until the config package gains the proper facilities to support socket-based connections.
Types ¶
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the main Machinery object and stores all configuration. All the tasks workers process are registered against the server.
func NewServerWithBrokerBackend ¶
func NewServerWithBrokerBackend(cnf *config.Config, brokerServer brokersiface.Broker, backendServer backendsiface.Backend) *Server
NewServerWithBrokerBackend returns a new Server built from the given config, broker and backend.
func (*Server) GetBackend ¶
func (server *Server) GetBackend() backendsiface.Backend
GetBackend returns backend
func (*Server) GetBroker ¶
func (server *Server) GetBroker() brokersiface.Broker
GetBroker returns broker
func (*Server) GetRegisteredTask ¶
GetRegisteredTask returns registered task by name
func (*Server) GetRegisteredTaskNames ¶
GetRegisteredTaskNames returns slice of registered task names
func (*Server) IsTaskRegistered ¶
IsTaskRegistered returns true if the task name is registered with this broker
func (*Server) NewCustomQueueWorker ¶
func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker
NewCustomQueueWorker creates Worker instance with Custom Queue
func (*Server) RegisterTask ¶
RegisterTask registers a single task
func (*Server) RegisterTasks ¶
RegisterTasks registers all tasks at once
func (*Server) SendChainWithContext ¶
func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)
SendChainWithContext will inject the trace context in all the signature headers before publishing it
func (*Server) SendChord ¶
func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
SendChord triggers a group of parallel tasks with a callback
func (*Server) SendChordWithContext ¶
func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)
SendChordWithContext will inject the trace context in all the signature headers before publishing it
func (*Server) SendGroup ¶
func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
SendGroup triggers a group of parallel tasks
func (*Server) SendGroupWithContext ¶
func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)
SendGroupWithContext will inject the trace context in all the signature headers before publishing it
func (*Server) SendTaskWithContext ¶
func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)
SendTaskWithContext will inject the trace context in the signature headers before publishing it
func (*Server) SetBackend ¶
func (server *Server) SetBackend(backend backendsiface.Backend)
SetBackend sets backend
func (*Server) SetBroker ¶
func (server *Server) SetBroker(broker brokersiface.Broker)
SetBroker sets broker
func (*Server) SetPreTaskHandler ¶
SetPreTaskHandler sets the pre-publish handler.
type Worker ¶
type Worker struct {
	ConsumerTag string
	Concurrency int
	Queue       string
	// contains filtered or unexported fields
}
Worker represents a single worker process
func (*Worker) CustomQueue ¶
CustomQueue returns Custom Queue of the running worker process
func (*Worker) Launch ¶
Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks
func (*Worker) LaunchAsync ¶
LaunchAsync is a non-blocking version of Launch.
func (*Worker) SetErrorHandler ¶
SetErrorHandler sets a custom error handler for task errors. The default behavior is just to log the error after all the retry attempts fail.
func (*Worker) SetPostTaskHandler ¶
SetPostTaskHandler sets a custom handler for the end of a job
func (*Worker) SetPreTaskHandler ¶
SetPreTaskHandler sets a custom handler func before a job is started