machinery

package
v1.7.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2020 License: MPL-2.0 Imports: 34 Imported by: 190

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWorkerQuitGracefully is return when worker quit gracefully
	ErrWorkerQuitGracefully = errors.New("Worker quit gracefully")
	// ErrWorkerQuitGracefully is return when worker quit abruptly
	ErrWorkerQuitAbruptly = errors.New("Worker quit abruptly")
)

Functions

func BackendFactory

func BackendFactory(cnf *config.Config) (backendiface.Backend, error)

BackendFactory creates a new object of backends.Interface Currently supported backends are AMQP/S and Memcache

func BrokerFactory

func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error)

BrokerFactory creates a new object of iface.Broker Currently only AMQP/S broker is supported

func ParseGCPPubSubURL added in v1.5.0

func ParseGCPPubSubURL(url string) (string, string, error)

ParseGCPPubSubURL Parse GCP Pub/Sub URL url: gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME

func ParseRedisSocketURL

func ParseRedisSocketURL(url string) (path, password string, db int, err error)

ParseRedisSocketURL extracts Redis connection options from a URL with the redis+socket:// scheme. This scheme is not standard (or even de facto) and is used as a transitional mechanism until the the config package gains the proper facilities to support socket-based connections.

func ParseRedisURL

func ParseRedisURL(url string) (host, password string, db int, err error)

ParseRedisURL ...

func RedactURL added in v1.7.6

func RedactURL(urlString string) string

Types

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main Machinery object and stores all configuration All the tasks workers process are registered against the server

func NewServer

func NewServer(cnf *config.Config) (*Server, error)

NewServer creates Server instance

func NewServerWithBrokerBackend added in v1.4.5

func NewServerWithBrokerBackend(cnf *config.Config, brokerServer brokersiface.Broker, backendServer backendsiface.Backend) *Server

NewServerWithBrokerBackend ...

func (*Server) GetBackend

func (server *Server) GetBackend() backendsiface.Backend

GetBackend returns backend

func (*Server) GetBroker

func (server *Server) GetBroker() brokersiface.Broker

GetBroker returns broker

func (*Server) GetConfig

func (server *Server) GetConfig() *config.Config

GetConfig returns connection object

func (*Server) GetRegisteredTask

func (server *Server) GetRegisteredTask(name string) (interface{}, error)

GetRegisteredTask returns registered task by name

func (*Server) GetRegisteredTaskNames

func (server *Server) GetRegisteredTaskNames() []string

GetRegisteredTaskNames returns slice of registered task names

func (*Server) IsTaskRegistered

func (server *Server) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task name is registered with this broker

func (*Server) NewCustomQueueWorker added in v1.4.2

func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker

NewCustomQueueWorker creates Worker instance with Custom Queue

func (*Server) NewWorker

func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker

NewWorker creates Worker instance

func (*Server) RegisterTask

func (server *Server) RegisterTask(name string, taskFunc interface{}) error

RegisterTask registers a single task

func (*Server) RegisterTasks

func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error

RegisterTasks registers all tasks at once

func (*Server) SendChain

func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChain triggers a chain of tasks

func (*Server) SendChainWithContext added in v1.2.2

func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChainWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendChord

func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChord triggers a group of parallel tasks with a callback

func (*Server) SendChordWithContext added in v1.2.2

func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChordWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendGroup

func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroup triggers a group of parallel tasks

func (*Server) SendGroupWithContext added in v1.2.2

func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroupWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendTask

func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error)

SendTask publishes a task to the default queue

func (*Server) SendTaskWithContext added in v1.2.2

func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)

SendTaskWithContext will inject the trace context in the signature headers before publishing it

func (*Server) SetBackend

func (server *Server) SetBackend(backend backendsiface.Backend)

SetBackend sets backend

func (*Server) SetBroker

func (server *Server) SetBroker(broker brokersiface.Broker)

SetBroker sets broker

func (*Server) SetConfig

func (server *Server) SetConfig(cnf *config.Config)

SetConfig sets config

func (*Server) SetPreTaskHandler added in v1.5.3

func (server *Server) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler Sets pre publish handler

type Worker

type Worker struct {
	ConsumerTag string
	Concurrency int
	Queue       string
	// contains filtered or unexported fields
}

Worker represents a single worker process

func (*Worker) CustomQueue added in v1.4.2

func (worker *Worker) CustomQueue() string

CustomQueue returns Custom Queue of the running worker process

func (*Worker) GetServer added in v1.4.3

func (worker *Worker) GetServer() *Server

GetServer returns server

func (*Worker) Launch

func (worker *Worker) Launch() error

Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks

func (*Worker) LaunchAsync added in v1.0.10

func (worker *Worker) LaunchAsync(errorsChan chan<- error)

LaunchAsync is a non blocking version of Launch

func (*Worker) PreConsumeHandler added in v1.7.7

func (worker *Worker) PreConsumeHandler() bool

func (*Worker) Process

func (worker *Worker) Process(signature *tasks.Signature) error

Process handles received tasks and triggers success/error callbacks

func (*Worker) Quit

func (worker *Worker) Quit()

Quit tears down the running worker process

func (*Worker) SetErrorHandler added in v1.3.5

func (worker *Worker) SetErrorHandler(handler func(err error))

SetErrorHandler sets a custom error handler for task errors A default behavior is just to log the error after all the retry attempts fail

func (*Worker) SetPostTaskHandler added in v1.5.3

func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature))

SetPostTaskHandler sets a custom handler for the end of a job

func (*Worker) SetPreConsumeHandler added in v1.7.7

func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool)

SetPreConsumeHandler sets a custom handler for the end of a job

func (*Worker) SetPreTaskHandler added in v1.5.3

func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler sets a custom handler func before a job is started

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL