Documentation ¶
Overview ¶
Package gocelery is Celery Distributed Task Queue in Go
Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.
This package can also be used as pure go distributed task queue.
Supported brokers/backends
- Redis (broker/backend)
- AMQP (broker/backend)
Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.
CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] # Ignore other content CELERY_RESULT_SERIALIZER='json' CELERY_ENABLE_UTC=True
Example (Client) ¶
// create redis connection pool redisPool := &redis.Pool{ Dial: func() (redis.Conn, error) { c, err := redis.DialURL("redis://") if err != nil { return nil, err } return c, err }, } // initialize celery client cli := NewCeleryClient( NewRedisBroker(redisPool), &RedisCeleryBackend{Pool: redisPool}, ) // prepare arguments taskName := "worker.add" argA := rand.Intn(10) argB := rand.Intn(10) // run task asyncResult, err := cli.Delay(taskName, argA, argB) if err != nil { panic(err) } // get results from backend with timeout res, err := asyncResult.Get(10 * time.Second) if err != nil { panic(err) } log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
Output:
Example (ClientWithNamedArguments) ¶
// create redis connection pool // create redis connection pool redisPool := &redis.Pool{ Dial: func() (redis.Conn, error) { c, err := redis.DialURL("redis://") if err != nil { return nil, err } return c, err }, } // initialize celery client cli := NewCeleryClient( NewRedisBroker(redisPool), &RedisCeleryBackend{Pool: redisPool}, ) // prepare arguments taskName := "worker.add" argA := rand.Intn(10) argB := rand.Intn(10) // run task asyncResult, err := cli.DelayKwargs( taskName, map[string]interface{}{ "a": argA, "b": argB, }, ) if err != nil { panic(err) } // get results from backend with timeout res, err := asyncResult.Get(10 * time.Second) if err != nil { panic(err) } log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
Output:
Example (Worker) ¶
// create redis connection pool redisPool := &redis.Pool{ Dial: func() (redis.Conn, error) { c, err := redis.DialURL("redis://") if err != nil { return nil, err } return c, err }, } // initialize celery client cli := NewCeleryWorker( NewRedisBroker(redisPool), &RedisCeleryBackend{ Pool: redisPool, ExpireDuration: 24 * time.Hour, }, 5, ) // task add := func(a, b int) int { return a + b } // register task cli.Register("add", add) // start workers (non-blocking call) cli.StartWorker() // wait for client request time.Sleep(10 * time.Second) // stop workers gracefully (blocking call) cli.StopWorker()
Output:
Example (WorkerWithContext) ¶
// create redis connection pool redisPool := &redis.Pool{ Dial: func() (redis.Conn, error) { c, err := redis.DialURL("redis://") if err != nil { return nil, err } return c, err }, } // initialize celery client cli := NewCeleryWorker( NewRedisBroker(redisPool), &RedisCeleryBackend{ Pool: redisPool, ExpireDuration: 24 * time.Hour, }, 5, ) // task add := func(a, b int) int { return a + b } // register task cli.Register("add", add) // context with cancelFunc to handle exit gracefully ctx, cancel := context.WithCancel(context.Background()) // start workers (non-blocking call) cli.StartWorkerWithContext(ctx) // wait for client request time.Sleep(10 * time.Second) // stop workers by cancelling context cancel() // optional: wait for all workers to terminate cli.StopWait()
Output:
Example (WorkerWithNamedArguments) ¶
package main import ( "fmt" "time" "github.com/gomodule/redigo/redis" ) // exampleAddTask is integer addition task // with named arguments type exampleAddTask struct { a int b int } func (a *exampleAddTask) ParseKwargs(kwargs map[string]interface{}) error { kwargA, ok := kwargs["a"] if !ok { return fmt.Errorf("undefined kwarg a") } kwargAFloat, ok := kwargA.(float64) if !ok { return fmt.Errorf("malformed kwarg a") } a.a = int(kwargAFloat) kwargB, ok := kwargs["b"] if !ok { return fmt.Errorf("undefined kwarg b") } kwargBFloat, ok := kwargB.(float64) if !ok { return fmt.Errorf("malformed kwarg b") } a.b = int(kwargBFloat) return nil } func (a *exampleAddTask) RunTask() (interface{}, error) { result := a.a + a.b return result, nil } func main() { // create redis connection pool redisPool := &redis.Pool{ Dial: func() (redis.Conn, error) { c, err := redis.DialURL("redis://") if err != nil { return nil, err } return c, err }, } // initialize celery client cli := NewCeleryWorker( NewRedisBroker(redisPool), &RedisCeleryBackend{ Pool: redisPool, ExpireDuration: 24 * time.Hour, }, 5, ) // register task cli.Register("add", &exampleAddTask{}) // start workers (non-blocking call) cli.StartWorker() // wait for client request time.Sleep(10 * time.Second) // stop workers gracefully (blocking call) cli.StopWorker() }
Output:
Index ¶
- Constants
- Variables
- func GetRealValue(val *reflect.Value) interface{}
- func NewRedisPool(uri string) *redis.Pooldeprecated
- type AMQPCeleryBackend
- type AMQPCeleryBroker
- func (b *AMQPCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)
- func (b *AMQPCeleryBroker) GetConsumerChannel() <-chan amqp.Delivery
- func (b *AMQPCeleryBroker) Init(oid string) error
- func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
- func (b *AMQPCeleryBroker) StartConsumingChannel() error
- type AMQPExchange
- type AMQPQueue
- type AMQPSession
- type AsyncResult
- type CeleryBackend
- type CeleryBroker
- type CeleryClient
- func (cc *CeleryClient) Call(task, routingKey string, args ...interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) CallKwargs(task, routingKey string, args map[string]interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) Init() error
- type CeleryDeliveryInfo
- type CeleryMessage
- type CeleryProperties
- type CeleryTask
- type CeleryWorker
- func (w *CeleryWorker) GetNumWorkers() int
- func (w *CeleryWorker) GetTask(name string) interface{}
- func (w *CeleryWorker) Init() error
- func (w *CeleryWorker) Register(name string, task interface{})
- func (w *CeleryWorker) RunOnce()
- func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
- func (w *CeleryWorker) SetOID(oid string)
- func (w *CeleryWorker) StartWorker()
- func (w *CeleryWorker) StartWorkerWithContext(ctx context.Context)
- func (w *CeleryWorker) StopWait()
- func (w *CeleryWorker) StopWorker()
- type CeleryWorkerSpec
- type ReconnectFunc
- type RedisCeleryBackend
- type RedisCeleryBroker
- type ResultMessage
- type RpcCeleryBackend
- type TaskMessage
Examples ¶
Constants ¶
const DefaultRetryDelay = 5
Variables ¶
var NoBackendConfigured = fmt.Errorf("no backend configured, no result returned")
var ResultNotAvailableYet = fmt.Errorf("result not available yet")
Functions ¶
func GetRealValue ¶
GetRealValue returns real value of reflect.Value Required for JSON Marshalling
func NewRedisPool
deprecated
Types ¶
type AMQPCeleryBackend ¶
type AMQPCeleryBackend struct { *AMQPSession ExpireDuration time.Duration }
AMQPCeleryBackend CeleryBackend for AMQP
The difference between amqpbackend and rpcbackend: amqpbackend => reply to task_id queue rpcbackend => reply to celery_backend exchange and route the message to the client side who have waiting for it through the {oid}_result queue which is binding to celery_backend exchange.
func NewAMQPCeleryBackend ¶
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend
NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackendByAMQPSession ¶
func NewAMQPCeleryBackendByAMQPSession(session *AMQPSession) *AMQPCeleryBackend
NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel
func (*AMQPCeleryBackend) GetResult ¶
func (b *AMQPCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult retrieves result from AMQP queue
func (*AMQPCeleryBackend) Init ¶
func (b *AMQPCeleryBackend) Init(string) error
func (*AMQPCeleryBackend) SetResult ¶
func (b *AMQPCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult sets result back to AMQP queue
type AMQPCeleryBroker ¶
type AMQPCeleryBroker struct { *AMQPSession DirectExchange *AMQPExchange RpcQueue *AMQPQueue DispatchQueue *AMQPQueue Initialized bool Rate int // contains filtered or unexported fields }
AMQPCeleryBroker is RedisBroker for AMQP
func NewAMQPCeleryBroker ¶
func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker
NewAMQPCeleryBroker creates new AMQPCeleryBroker
func NewAMQPCeleryBrokerByAMQPSession ¶
func NewAMQPCeleryBrokerByAMQPSession(session *AMQPSession) *AMQPCeleryBroker
NewAMQPCeleryBrokerByConnAndChannel creates new AMQPCeleryBroker using AMQP conn and channel
func (*AMQPCeleryBroker) GetCeleryMessage ¶
func (b *AMQPCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)
func (*AMQPCeleryBroker) GetConsumerChannel ¶
func (b *AMQPCeleryBroker) GetConsumerChannel() <-chan amqp.Delivery
func (*AMQPCeleryBroker) Init ¶
func (b *AMQPCeleryBroker) Init(oid string) error
Init will declare all exchanges or queues we need.
func (*AMQPCeleryBroker) SendCeleryMessage ¶
func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
SendCeleryMessage sends CeleryMessage to broker
func (*AMQPCeleryBroker) StartConsumingChannel ¶
func (b *AMQPCeleryBroker) StartConsumingChannel() error
StartConsumingChannel spawns receiving channel on AMQP queue
type AMQPExchange ¶
AMQPExchange stores AMQP Exchange configuration
type AMQPSession ¶
type AMQPSession struct { ConsumerDeliveryChannel <-chan amqp.Delivery RWLocker sync.RWMutex ConnectionCloseNotifyChan chan *amqp.Error ChannelCloseNotifyChan chan *amqp.Error *amqp.Channel // contains filtered or unexported fields }
func NewAMQPSession ¶
func NewAMQPSession(url string) (*AMQPSession, error)
func (*AMQPSession) Publish ¶
func (p *AMQPSession) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
func (*AMQPSession) SetupReconnectHooks ¶
func (p *AMQPSession) SetupReconnectHooks(hook ReconnectFunc)
type AsyncResult ¶
type AsyncResult struct { TaskID string // contains filtered or unexported fields }
AsyncResult represents pending result
func (*AsyncResult) AsyncGet ¶
func (ar *AsyncResult) AsyncGet() (interface{}, error)
AsyncGet gets actual result from backend and returns nil if not available
func (*AsyncResult) Get ¶
func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)
Get gets actual result from backend It blocks for period of time set by timeout and returns error if unavailable
func (*AsyncResult) Ready ¶
func (ar *AsyncResult) Ready() (bool, error)
Ready checks if actual result is ready
type CeleryBackend ¶
type CeleryBackend interface { Init(string) error GetResult(string) (*ResultMessage, error) // must be non-blocking SetResult(taskID string, result *ResultMessage) error }
CeleryBackend is interface for celery backend database
type CeleryBroker ¶
type CeleryBroker interface { Init(string) error SendCeleryMessage(*CeleryMessage) error GetCeleryMessage() (*CeleryMessage, error) // must be non-blocking }
CeleryBroker is interface for celery broker database
type CeleryClient ¶
type CeleryClient struct {
// contains filtered or unexported fields
}
CeleryClient provides API for sending celery tasks
func NewCeleryClient ¶
func NewCeleryClient(broker CeleryBroker, backend CeleryBackend) *CeleryClient
NewCeleryClient creates new celery client
func (*CeleryClient) Call ¶
func (cc *CeleryClient) Call(task, routingKey string, args ...interface{}) (*AsyncResult, error)
Call route the task to a specified worker, and gets asynchronous result
func (*CeleryClient) CallKwargs ¶
func (cc *CeleryClient) CallKwargs(task, routingKey string, args map[string]interface{}) (*AsyncResult, error)
CallKwargs route the task to a specified worker, and gets asynchronous result with argument map
func (*CeleryClient) Delay ¶
func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
Delay gets asynchronous result
func (*CeleryClient) DelayKwargs ¶
func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
DelayKwargs gets asynchronous results with argument map
type CeleryDeliveryInfo ¶
type CeleryDeliveryInfo struct { Priority uint8 `json:"priority"` RoutingKey string `json:"routing_key"` Exchange string `json:"exchange"` }
CeleryDeliveryInfo represents deliveryinfo json
type CeleryMessage ¶
type CeleryMessage struct { Body string `json:"body"` Headers map[string]interface{} `json:"headers,omitempty"` ContentType string `json:"content-type"` Properties CeleryProperties `json:"properties"` ContentEncoding string `json:"content-encoding"` }
CeleryMessage is actual message to be sent to Redis
func (*CeleryMessage) GetTaskMessage ¶
func (cm *CeleryMessage) GetTaskMessage() *TaskMessage
GetTaskMessage retrieve and decode task messages from broker
type CeleryProperties ¶
type CeleryProperties struct { BodyEncoding string `json:"body_encoding"` CorrelationID string `json:"correlation_id"` ReplyTo string `json:"reply_to"` DeliveryInfo *CeleryDeliveryInfo `json:"delivery_info"` DeliveryMode uint8 `json:"delivery_mode"` DeliveryTag string `json:"delivery_tag"` }
CeleryProperties represents properties json
type CeleryTask ¶
type CeleryTask interface { // ParseKwargs - define a method to parse kwargs ParseKwargs(map[string]interface{}) error // RunTask - define a method for execution RunTask() (interface{}, error) }
CeleryTask is an interface that represents actual task Passing CeleryTask interface instead of function pointer avoids reflection and may have performance gain. ResultMessage must be obtained using GetResultMessage()
type CeleryWorker ¶
type CeleryWorker struct {
// contains filtered or unexported fields
}
CeleryWorker represents distributed task worker
func NewCeleryWorker ¶
func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker
NewCeleryWorker returns new celery worker
func (*CeleryWorker) GetNumWorkers ¶
func (w *CeleryWorker) GetNumWorkers() int
GetNumWorkers returns number of currently running workers
func (*CeleryWorker) GetTask ¶
func (w *CeleryWorker) GetTask(name string) interface{}
GetTask retrieves registered task
func (*CeleryWorker) Init ¶
func (w *CeleryWorker) Init() error
func (*CeleryWorker) Register ¶
func (w *CeleryWorker) Register(name string, task interface{})
Register registers tasks (functions)
func (*CeleryWorker) RunOnce ¶
func (w *CeleryWorker) RunOnce()
func (*CeleryWorker) RunTask ¶
func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
RunTask runs celery task
func (*CeleryWorker) SetOID ¶
func (w *CeleryWorker) SetOID(oid string)
func (*CeleryWorker) StartWorker ¶
func (w *CeleryWorker) StartWorker()
StartWorker starts celery workers
func (*CeleryWorker) StartWorkerWithContext ¶
func (w *CeleryWorker) StartWorkerWithContext(ctx context.Context)
StartWorkerWithContext starts celery worker(s) with given parent context
func (*CeleryWorker) StopWait ¶
func (w *CeleryWorker) StopWait()
StopWait waits for celery workers to terminate
func (*CeleryWorker) StopWorker ¶
func (w *CeleryWorker) StopWorker()
StopWorker stops celery workers
type CeleryWorkerSpec ¶
type CeleryWorkerSpec struct {
// contains filtered or unexported fields
}
type ReconnectFunc ¶
type ReconnectFunc func() error
type RedisCeleryBackend ¶
RedisCeleryBackend is celery backend for redis
func NewRedisBackend ¶
func NewRedisBackend(conn *redis.Pool) *RedisCeleryBackend
NewRedisBackend creates new RedisCeleryBackend with given redis pool. RedisCeleryBackend can be initialized manually as well.
func NewRedisCeleryBackend
deprecated
func NewRedisCeleryBackend(uri string) *RedisCeleryBackend
NewRedisCeleryBackend creates new RedisCeleryBackend
Deprecated: NewRedisCeleryBackend exists for historical compatibility and should not be used. Pool should be initialized outside of gocelery package.
func (*RedisCeleryBackend) GetResult ¶
func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult queries redis backend to get asynchronous result
func (*RedisCeleryBackend) Init ¶
func (cb *RedisCeleryBackend) Init(string) error
func (*RedisCeleryBackend) SetResult ¶
func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult pushes result back into redis backend
type RedisCeleryBroker ¶
type RedisCeleryBroker struct { *redis.Pool DispatchBaseQueueName string RpcBaseQueueName string // contains filtered or unexported fields }
RedisCeleryBroker is celery broker for redis
func NewRedisBroker ¶
func NewRedisBroker(conn *redis.Pool) *RedisCeleryBroker
NewRedisBroker creates new RedisCeleryBroker with given redis connection pool
func NewRedisCeleryBroker
deprecated
func NewRedisCeleryBroker(uri string) *RedisCeleryBroker
NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri
Deprecated: NewRedisCeleryBroker exists for historical compatibility and should not be used. Use NewRedisBroker instead to create new RedisCeleryBroker.
func (*RedisCeleryBroker) GetCeleryMessage ¶
func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)
GetCeleryMessage retrieves celery message from redis queue
func (*RedisCeleryBroker) GetTaskMessage ¶
func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error)
GetTaskMessage retrieves task message from redis queue
func (*RedisCeleryBroker) Init ¶
func (cb *RedisCeleryBroker) Init(oid string) error
func (*RedisCeleryBroker) SendCeleryMessage ¶
func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
SendCeleryMessage sends CeleryMessage to redis queue
type ResultMessage ¶
type ResultMessage struct { ID string `json:"task_id"` Status string `json:"status"` Traceback interface{} `json:"traceback"` Result interface{} `json:"result"` Children []interface{} `json:"children"` }
ResultMessage is return message received from broker
type RpcCeleryBackend ¶
type RpcCeleryBackend struct { *AMQPSession Queue *AMQPQueue Exchange *AMQPExchange Initialized bool ExpireDuration time.Duration // contains filtered or unexported fields }
RpcCeleryBackend CeleryBackend for AMQP rpc
The difference between amqpbackend and rpcbackend: amqpbackend => reply to task_id queue rpcbackend => reply to celery_backend exchange and route the message to the client side who have waiting for it through the {oid}_result queue which is binding to celery_backend exchange.
func NewRpcCeleryBackend ¶
func NewRpcCeleryBackend(host string) *RpcCeleryBackend
NewRpcCeleryBackend creates new RpcCeleryBackend
func NewRpcCeleryBackendByAMQPSession ¶
func NewRpcCeleryBackendByAMQPSession(session *AMQPSession) *RpcCeleryBackend
NewRpcCeleryBackendByConnAndChannel creates new RpcCeleryBackend by AMQP connection and channel
func (*RpcCeleryBackend) GetConsumerChannel ¶
func (b *RpcCeleryBackend) GetConsumerChannel() <-chan amqp.Delivery
func (*RpcCeleryBackend) GetResult ¶
func (b *RpcCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult retrieves result from queue named by oid
func (*RpcCeleryBackend) Init ¶
func (b *RpcCeleryBackend) Init(oid string) error
func (*RpcCeleryBackend) SetResult ¶
func (b *RpcCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult sets result back to result exchange
type TaskMessage ¶
type TaskMessage struct { ID string `json:"id"` Task string `json:"task"` Args []interface{} `json:"args"` Kwargs map[string]interface{} `json:"kwargs"` Retries int `json:"retries"` ETA *string `json:"eta"` Expires *time.Time `json:"expires"` }
TaskMessage is celery-compatible message
func DecodeTaskMessage ¶
func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)
DecodeTaskMessage decodes base64 encrypted body and return TaskMessage object
func (*TaskMessage) Encode ¶
func (tm *TaskMessage) Encode() (string, error)
Encode returns base64 json encoded string