machinery

package
v1.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 26, 2018 License: MPL-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BackendFactory

func BackendFactory(cnf *config.Config) (backends.Interface, error)

BackendFactory creates a new object of backends.Interface Currently supported backends are AMQP/S and Memcache

func BrokerFactory

func BrokerFactory(cnf *config.Config) (brokers.Interface, error)

BrokerFactory creates a new object of brokers.Interface Currently only AMQP/S broker is supported

func ParseRedisSocketURL

func ParseRedisSocketURL(url string) (path, password string, db int, err error)

ParseRedisSocketURL extracts Redis connection options from a URL with the redis+socket:// scheme. This scheme is not standard (or even de facto) and is used as a transitional mechanism until the the config package gains the proper facilities to support socket-based connections.

func ParseRedisURL

func ParseRedisURL(url string) (host, password string, db int, err error)

ParseRedisURL ...

Types

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the main Machinery object and stores all configuration All the tasks workers process are registered against the server

func NewServer

func NewServer(cnf *config.Config) (*Server, error)

NewServer creates Server instance

func (*Server) GetBackend

func (server *Server) GetBackend() backends.Interface

GetBackend returns backend

func (*Server) GetBroker

func (server *Server) GetBroker() brokers.Interface

GetBroker returns broker

func (*Server) GetConfig

func (server *Server) GetConfig() *config.Config

GetConfig returns connection object

func (*Server) GetRegisteredTask

func (server *Server) GetRegisteredTask(name string) (interface{}, error)

GetRegisteredTask returns registered task by name

func (*Server) GetRegisteredTaskNames

func (server *Server) GetRegisteredTaskNames() []string

GetRegisteredTaskNames returns slice of registered task names

func (*Server) IsTaskRegistered

func (server *Server) IsTaskRegistered(name string) bool

IsTaskRegistered returns true if the task name is registered with this broker

func (*Server) NewWorker

func (server *Server) NewWorker(consumerTag string, concurrency int) *Worker

NewWorker creates Worker instance

func (*Server) RegisterTask

func (server *Server) RegisterTask(name string, taskFunc interface{}) error

RegisterTask registers a single task

func (*Server) RegisterTasks

func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error

RegisterTasks registers all tasks at once

func (*Server) SendChain

func (server *Server) SendChain(chain *tasks.Chain) (*backends.ChainAsyncResult, error)

SendChain triggers a chain of tasks

func (*Server) SendChainWithContext

func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*backends.ChainAsyncResult, error)

SendChainWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendChord

func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*backends.ChordAsyncResult, error)

SendChord triggers a group of parallel tasks with a callback

func (*Server) SendChordWithContext

func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*backends.ChordAsyncResult, error)

SendChordWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendGroup

func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*backends.AsyncResult, error)

SendGroup triggers a group of parallel tasks

func (*Server) SendGroupWithContext

func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*backends.AsyncResult, error)

SendGroupWithContext will inject the trace context in all the signature headers before publishing it

func (*Server) SendTask

func (server *Server) SendTask(signature *tasks.Signature) (*backends.AsyncResult, error)

SendTask publishes a task to the default queue

func (*Server) SendTaskWithContext

func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*backends.AsyncResult, error)

SendTaskWithContext will inject the trace context in the signature headers before publishing it

func (*Server) SetBackend

func (server *Server) SetBackend(backend backends.Interface)

SetBackend sets backend

func (*Server) SetBroker

func (server *Server) SetBroker(broker brokers.Interface)

SetBroker sets broker

func (*Server) SetConfig

func (server *Server) SetConfig(cnf *config.Config)

SetConfig sets config

type Worker

type Worker struct {
	ConsumerTag string
	Concurrency int
	// contains filtered or unexported fields
}

Worker represents a single worker process

func (*Worker) Launch

func (worker *Worker) Launch() error

Launch starts a new worker process. The worker subscribes to the default queue and processes incoming registered tasks

func (*Worker) LaunchAsync

func (worker *Worker) LaunchAsync(errorsChan chan<- error)

LaunchAsync is a non blocking version of Launch

func (*Worker) Process

func (worker *Worker) Process(signature *tasks.Signature) error

Process handles received tasks and triggers success/error callbacks

func (*Worker) Quit

func (worker *Worker) Quit()

Quit tears down the running worker process

func (*Worker) SetErrorHandler

func (worker *Worker) SetErrorHandler(handler func(err error))

SetErrorHandler sets a custom error handler for task errors A default behavior is just to log the error after all the retry attempts fail

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL