machinery

package module
v2.0.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2024 License: MPL-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrWorkerQuitGracefully is return when worker quit gracefully
	ErrWorkerQuitGracefully = errors.New("Worker quit gracefully")
	// ErrWorkerQuitGracefully is return when worker quit abruptly
	ErrWorkerQuitAbruptly = errors.New("Worker quit abruptly")
)

Functions

func RedactURL

func RedactURL(urlString string) string

Types

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main Machinery object and stores all configuration All the tasks workers process are registered against the server

func NewServer

func NewServer(cnf *config.Config, brokerServer brokersiface.Broker, backendServer backendsiface.Backend, lock lockiface.Lock) *Server

NewServer creates Server instance

func (*Server) GetBackend

func (server *Server) GetBackend() backendsiface.Backend

GetBackend returns backend

func (*Server) GetBroker

func (server *Server) GetBroker() brokersiface.Broker

GetBroker returns broker

func (*Server) GetConfig

func (server *Server) GetConfig() *config.Config

GetConfig returns connection object

func (*Server) GetRegisteredTask

func (server *Server) GetRegisteredTask(name string) (interface{}, error)

GetRegisteredTask returns registered task by name

func (*Server) GetRegisteredTaskNames

func (server *Server) GetRegisteredTaskNames() []string

GetRegisteredTaskNames returns slice of registered task names

func (*Server) IsTaskRegistered

func (server *Server) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task name is registered with this broker

func (*Server) NewCustomQueueWorker

func (server *Server) NewCustomQueueWorker(consumerTag string, concurrency int, queue string) *Worker

NewCustomQueueWorker creates Worker instance with Custom Queue

func (*Server) NewWorker

func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker

NewWorker creates Worker instance

func (*Server) RegisterPeriodicChain

func (server *Server) RegisterPeriodicChain(spec, name string, signatures ...*tasks.Signature) error

RegisterPeriodicChain register a periodic chain which will be triggered periodically

func (*Server) RegisterPeriodicChord

func (server *Server) RegisterPeriodicChord(spec, name string, sendConcurrency int, callback *tasks.Signature, signatures ...*tasks.Signature) error

RegisterPeriodicChord register a periodic chord which will be triggered periodically

func (*Server) RegisterPeriodicGroup

func (server *Server) RegisterPeriodicGroup(spec, name string, sendConcurrency int, signatures ...*tasks.Signature) error

RegisterPeriodicGroup register a periodic group which will be triggered periodically

func (*Server) RegisterPeriodicTask

func (server *Server) RegisterPeriodicTask(spec, name string, signature *tasks.Signature) error

RegisterPeriodicTask register a periodic task which will be triggered periodically

func (*Server) RegisterTask

func (server *Server) RegisterTask(name string, taskFunc interface{}) error

RegisterTask registers a single task

func (*Server) RegisterTasks

func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error

RegisterTasks registers all tasks at once

func (*Server) SendChain

func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChain triggers a chain of tasks

func (*Server) SendChainWithContext

func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error)

SendChainWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendChord

func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChord triggers a group of parallel tasks with a callback

func (*Server) SendChordWithContext

func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error)

SendChordWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendGroup

func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroup triggers a group of parallel tasks

func (*Server) SendGroupWithContext

func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error)

SendGroupWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendTask

func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, error)

SendTask publishes a task to the default queue

func (*Server) SendTaskWithContext

func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error)

SendTaskWithContext will inject the trace context in the signature headers before publishing it

func (*Server) SetBackend

func (server *Server) SetBackend(backend backendsiface.Backend)

SetBackend sets backend

func (*Server) SetBroker

func (server *Server) SetBroker(broker brokersiface.Broker)

SetBroker sets broker

func (*Server) SetConfig

func (server *Server) SetConfig(cnf *config.Config)

SetConfig sets config

func (*Server) SetPreTaskHandler

func (server *Server) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler Sets pre publish handler

type Worker

type Worker struct {
	ConsumerTag string
	Concurrency int
	Queue       string
	// contains filtered or unexported fields
}

Worker represents a single worker process

func (*Worker) CustomQueue

func (worker *Worker) CustomQueue() string

CustomQueue returns Custom Queue of the running worker process

func (*Worker) GetServer

func (worker *Worker) GetServer() *Server

GetServer returns server

func (*Worker) Launch

func (worker *Worker) Launch() error

Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks

func (*Worker) LaunchAsync

func (worker *Worker) LaunchAsync(errorsChan chan<- error)

LaunchAsync is a non blocking version of Launch

func (*Worker) PreConsumeHandler

func (worker *Worker) PreConsumeHandler() bool

func (*Worker) Process

func (worker *Worker) Process(signature *tasks.Signature) error

Process handles received tasks and triggers success/error callbacks

func (*Worker) Quit

func (worker *Worker) Quit()

Quit tears down the running worker process

func (*Worker) SetErrorHandler

func (worker *Worker) SetErrorHandler(handler func(err error))

SetErrorHandler sets a custom error handler for task errors A default behavior is just to log the error after all the retry attempts fail

func (*Worker) SetPostTaskHandler

func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature))

SetPostTaskHandler sets a custom handler for the end of a job

func (*Worker) SetPreConsumeHandler

func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool)

SetPreConsumeHandler sets a custom handler for the end of a job

func (*Worker) SetPreTaskHandler

func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature))

SetPreTaskHandler sets a custom handler func before a job is started

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL