machinery

package
v1.8.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2020 License: MPL-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWorkerQuitGracefully is return when worker quit gracefully
	ErrWorkerQuitGracefully = errors.New("Worker quit gracefully")
	// ErrWorkerQuitGracefully is return when worker quit abruptly
	ErrWorkerQuitAbruptly = errors.New("Worker quit abruptly")
)

Functions

func BackendFactory

func BackendFactory(cnf *config.Config) (backendiface.Backend, error)

BackendFactory creates a new object of backends.Interface Currently supported backends are AMQP/S and Memcache

func BrokerFactory

func BrokerFactory(cnf *config.Config) (brokeriface.Broker, error)

BrokerFactory creates a new object of iface.Broker Currently only AMQP/S broker is supported

func ParseGCPPubSubURL added in v1.7.7

func ParseGCPPubSubURL(url string) (string, string, error)

ParseGCPPubSubURL Parse GCP Pub/Sub URL url: gcppubsub://YOUR_GCP_PROJECT_ID/YOUR_PUBSUB_SUBSCRIPTION_NAME

func ParseRedisSocketURL

func ParseRedisSocketURL(url string) (path, password string, db int, err error)

ParseRedisSocketURL extracts Redis connection options from a URL with the redis+socket:// scheme. This scheme is not standard (or even de facto) and is used as a transitional mechanism until the the config package gains the proper facilities to support socket-based connections.

func ParseRedisURL

func ParseRedisURL(url string) (host, password string, db int, err error)

ParseRedisURL ...

Types

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main Machinery object and stores all configuration All the tasks workers process are registered against the server

func NewServer

func NewServer(cnf *config.Config) (*Server, error)

NewServer creates Server instance

func NewServerWithBrokerBackend added in v1.4.5

func NewServerWithBrokerBackend(cnf *config.Config, brokerServer brokersiface.Broker, backendServer backendsiface.Backend) *Server

NewServerWithBrokerBackend ...

func (*Server) GetBackend

func (server *Server) GetBackend() backendsiface.Backend

GetBackend returns backend

func (*Server) GetBroker

func (server *Server) GetBroker() brokersiface.Broker

GetBroker returns broker

func (*Server) GetConfig

func (server *Server) GetConfig() *config.Config

GetConfig returns connection object

func (*Server) GetRegisteredTask

func (server *Server) GetRegisteredTask(name string) (interface{}, error)

GetRegisteredTask returns registered task by name

func (*Server) GetRegisteredTaskNames

func (server *Server) GetRegisteredTaskNames() []string

GetRegisteredTaskNames returns slice of registered task names

func (*Server) IsTaskRegistered

func (server *Server) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task name is registered with this broker

func (*Server) NewCustomQueueWorker added in v1.4.2

func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker

NewCustomQueueWorker creates Worker instance with Custom Queue

func (*Server) NewWorker

func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker

NewWorker creates Worker instance

func (*Server) RegisterTask

func (server *Server) RegisterTask(name string, taskFunc interface{}) error

RegisterTask registers a single task

func (*Server) RegisterTasks

func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error

RegisterTasks registers all tasks at once

func (*Server) SendChain

func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChain triggers a chain of tasks

func (*Server) SendChainWithContext added in v1.2.2

func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChainWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendChord

func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChord triggers a group of parallel tasks with a callback

func (*Server) SendChordWithContext added in v1.2.2

func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChordWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendGroup

func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroup triggers a group of parallel tasks

func (*Server) SendGroupWithContext added in v1.2.2

func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroupWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendTask

func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error)

SendTask publishes a task to the default queue

func (*Server) SendTaskWithContext added in v1.2.2

func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)

SendTaskWithContext will inject the trace context in the signature headers before publishing it

func (*Server) SetBackend

func (server *Server) SetBackend(backend backendsiface.Backend)

SetBackend sets backend

func (*Server) SetBroker

func (server *Server) SetBroker(broker brokersiface.Broker)

SetBroker sets broker

func (*Server) SetConfig

func (server *Server) SetConfig(cnf *config.Config)

SetConfig sets config

func (*Server) SetPreTaskHandler added in v1.7.7

func (server *Server) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler Sets pre publish handler

type Worker

type Worker struct {
	ConsumerTag string
	Concurrency int
	Queue       string
	// contains filtered or unexported fields
}

Worker represents a single worker process

func (*Worker) CustomQueue added in v1.4.2

func (worker *Worker) CustomQueue() string

CustomQueue returns Custom Queue of the running worker process

func (*Worker) GetServer added in v1.4.3

func (worker *Worker) GetServer() *Server

GetServer returns server

func (*Worker) Launch

func (worker *Worker) Launch() error

Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks

func (*Worker) LaunchAsync added in v1.0.10

func (worker *Worker) LaunchAsync(errorsChan chan<- error)

LaunchAsync is a non blocking version of Launch

func (*Worker) Process

func (worker *Worker) Process(signature *tasks.Signature) error

Process handles received tasks and triggers success/error callbacks

func (*Worker) Quit

func (worker *Worker) Quit()

Quit tears down the running worker process

func (*Worker) SetErrorHandler added in v1.3.5

func (worker *Worker) SetErrorHandler(handler func(err error))

SetErrorHandler sets a custom error handler for task errors A default behavior is just to log the error after all the retry attempts fail

func (*Worker) SetPostTaskHandler added in v1.7.7

func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature))

SetPostTaskHandler sets a custom handler for the end of a job

func (*Worker) SetPreTaskHandler added in v1.7.7

func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler sets a custom handler func before a job is started

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL