Documentation ¶
Overview ¶
Package gocelery is Celery Distributed Task Queue in Go
Celery distributed tasks are used heavily in many Python web applications. This library allows you to implement Celery workers in Go, as well as submit Celery tasks from Go.
This package can also be used as a pure Go distributed task queue.
Supported brokers/backends
- Redis (broker/backend)
- AMQP (broker/backend)
Celery must be configured to use JSON instead of the default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use JSON.
CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']  # Ignore other content
CELERY_RESULT_SERIALIZER='json'
CELERY_ENABLE_UTC=True
WIP NOTICE: the NSQ broker (NSQCeleryBroker) is NOT tested yet.
Example (Client) ¶
// initialize celery client cli, _ := NewCeleryClient( NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 1, ) // prepare arguments taskName := "worker.add" argA := rand.Intn(10) argB := rand.Intn(10) // run task asyncResult, err := cli.Delay(taskName, argA, argB) if err != nil { panic(err) } // get results from backend with timeout res, err := asyncResult.Get(10 * time.Second) if err != nil { panic(err) } log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
Output:
Example (ClientWithNamedArguments) ¶
// initialize celery client cli, _ := NewCeleryClient( NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 1, ) // prepare arguments taskName := "worker.add" argA := rand.Intn(10) argB := rand.Intn(10) // run task asyncResult, err := cli.DelayKwargs( taskName, map[string]interface{}{ "a": argA, "b": argB, }, ) if err != nil { panic(err) } // get results from backend with timeout res, err := asyncResult.Get(10 * time.Second) if err != nil { panic(err) } log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
Output:
Example (Worker) ¶
// initialize celery client cli, _ := NewCeleryClient( NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 5, // number of workers ) // task add := func(a, b int) int { return a + b } // register task cli.Register("add", add) // start workers (non-blocking call) cli.StartWorker() // wait for client request time.Sleep(10 * time.Second) // stop workers gracefully (blocking call) cli.StopWorker()
Output:
Example (WorkerWithContext) ¶
// initialize celery client cli, _ := NewCeleryClient( NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 1, ) // task add := func(a, b int) int { return a + b } // register task cli.Register("add", add) // context with cancelFunc to handle exit gracefully ctx, cancel := context.WithCancel(context.Background()) // start workers (non-blocking call) cli.StartWorkerWithContext(ctx) // wait for client request time.Sleep(10 * time.Second) // stop workers by cancelling context cancel() // optional: wait for all workers to terminate cli.WaitForStopWorker()
Output:
Example (WorkerWithNamedArguments) ¶
package main import ( "fmt" "time" ) // exampleAddTask is integer addition task // with named arguments type exampleAddTask struct { a int b int } func (a *exampleAddTask) ParseKwargs(kwargs map[string]interface{}) error { kwargA, ok := kwargs["a"] if !ok { return fmt.Errorf("undefined kwarg a") } kwargAFloat, ok := kwargA.(float64) if !ok { return fmt.Errorf("malformed kwarg a") } a.a = int(kwargAFloat) kwargB, ok := kwargs["b"] if !ok { return fmt.Errorf("undefined kwarg b") } kwargBFloat, ok := kwargB.(float64) if !ok { return fmt.Errorf("malformed kwarg b") } a.b = int(kwargBFloat) return nil } func (a *exampleAddTask) RunTask() (interface{}, error) { result := a.a + a.b return result, nil } func main() { // initialize celery client cli, _ := NewCeleryClient( NewRedisCeleryBroker("redis://", ""), NewRedisCeleryBackend("redis://"), 5, // number of workers ) // register task cli.Register("add", &exampleAddTask{}) // start workers (non-blocking call) cli.StartWorker() // wait for client request time.Sleep(10 * time.Second) // stop workers gracefully (blocking call) cli.StopWorker() }
Output:
Index ¶
- Constants
- func GetRealValue(val *reflect.Value) interface{}
- func NewRedisPool(uri string) *redis.Pool
- type AsyncResult
- type CeleryBackend
- type CeleryBroker
- type CeleryClient
- func (cc *CeleryClient) ClearResult(taskID string) error
- func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayJSON(task string, input interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayJSONTo(queue, task string, input interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayKwargsTo(queue, task string, args map[string]interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) DelayTo(queue, task string, args ...interface{}) (*AsyncResult, error)
- func (cc *CeleryClient) FindResult(taskID string) *AsyncResult
- func (cc *CeleryClient) PollResults(handler func(string, interface{}), taskIDs ...string)
- func (cc *CeleryClient) Register(name string, task interface{})
- func (cc *CeleryClient) StartWorker(queues ...string) error
- func (cc *CeleryClient) StartWorkerWithContext(ctx context.Context, queues ...string) (err error)
- func (cc *CeleryClient) StopWorker() (err error)
- func (cc *CeleryClient) WaitForStopWorker()
- type CeleryDeliveryInfo
- type CeleryMessage
- type CeleryProperties
- type CeleryTask
- type CeleryWorker
- func (w *CeleryWorker) GetNumWorkers() int
- func (w *CeleryWorker) GetTask(name string) interface{}
- func (w *CeleryWorker) Register(name string, task interface{})
- func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
- func (w *CeleryWorker) SetRateLimitPeriod(rate time.Duration) *CeleryWorker
- func (w *CeleryWorker) StartWorker(queues ...string) error
- func (w *CeleryWorker) StartWorkerWithContext(ctx context.Context, queues ...string) (err error)
- func (w *CeleryWorker) StopWait()
- func (w *CeleryWorker) StopWorker() (err error)
- func (w *CeleryWorker) StopWorkerWithContext(ctx context.Context) (err error)
- type NSQCeleryBroker
- func (nb *NSQCeleryBroker) GetTaskMessage() (message *TaskMessage, error error)
- func (nb *NSQCeleryBroker) GetTaskMessageFrom(queue string) (message *TaskMessage, err error)
- func (nb *NSQCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
- func (nb *NSQCeleryBroker) SendCeleryMessageTo(queue string, message *CeleryMessage) error
- type NSQConfig
- type NSQHandler
- type QueueIterator
- type RedisCeleryBackend
- type RedisCeleryBroker
- func (cb *RedisCeleryBroker) GetCeleryMessage() (msg *CeleryMessage, err error)
- func (cb *RedisCeleryBroker) GetCeleryMessageFrom(queue string) (*CeleryMessage, error)
- func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error)
- func (cb *RedisCeleryBroker) GetTaskMessageFrom(queue string) (*TaskMessage, error)
- func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
- func (cb *RedisCeleryBroker) SendCeleryMessageTo(queue string, message *CeleryMessage) error
- type ResultMessage
- type TaskMessage
Examples ¶
Constants ¶
const (
DefaultQueueName = "celery"
)
Variables ¶
This section is empty.
Functions ¶
func GetRealValue ¶
GetRealValue returns the real value of a reflect.Value. It is required for JSON marshalling.
Types ¶
type AsyncResult ¶
type AsyncResult struct { TaskID string // contains filtered or unexported fields }
AsyncResult represents pending result
func (*AsyncResult) AsyncGet ¶
func (ar *AsyncResult) AsyncGet() (interface{}, error)
AsyncGet gets actual result from backend and returns nil if not available
func (*AsyncResult) Clear ¶
func (ar *AsyncResult) Clear() (err error)
func (*AsyncResult) Get ¶
func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)
Get gets the actual result from the backend. It blocks for the period of time set by timeout and returns an error if the result is unavailable.
func (*AsyncResult) Ready ¶
func (ar *AsyncResult) Ready() (bool, error)
Ready checks if actual result is ready
type CeleryBackend ¶
type CeleryBackend interface { GetResult(string) (*ResultMessage, error) // must be non-blocking SetResult(taskID string, result *ResultMessage) error ClearResult(taskID string) error // one additional api }
CeleryBackend is interface for celery backend database
type CeleryBroker ¶
type CeleryBroker interface { SendCeleryMessage(message *CeleryMessage) error SendCeleryMessageTo(queue string, message *CeleryMessage) error GetTaskMessage() (message *TaskMessage, error error) // must be non-blocking GetTaskMessageFrom(queue string) (message *TaskMessage, error error) ListQueues() []string }
CeleryBroker is interface for celery broker database
type CeleryClient ¶
type CeleryClient struct {
// contains filtered or unexported fields
}
CeleryClient provides API for sending celery tasks
func NewCeleryClient ¶
func NewCeleryClient(broker CeleryBroker, backend CeleryBackend, numWorkers int) (*CeleryClient, error)
NewCeleryClient creates new celery client
func (*CeleryClient) ClearResult ¶
func (cc *CeleryClient) ClearResult(taskID string) error
func (*CeleryClient) Delay ¶
func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
Delay gets asynchronous result
func (*CeleryClient) DelayJSON ¶
func (cc *CeleryClient) DelayJSON(task string, input interface{}) (*AsyncResult, error)
DelayJSON marshals args as JSON.
func (*CeleryClient) DelayJSONTo ¶
func (cc *CeleryClient) DelayJSONTo(queue, task string, input interface{}) (*AsyncResult, error)
DelayJSONTo marshals args as JSON.
func (*CeleryClient) DelayKwargs ¶
func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
DelayKwargs gets asynchronous results with argument map
func (*CeleryClient) DelayKwargsTo ¶
func (cc *CeleryClient) DelayKwargsTo(queue, task string, args map[string]interface{}) (*AsyncResult, error)
DelayKwargsTo gets asynchronous results with argument map
func (*CeleryClient) DelayTo ¶
func (cc *CeleryClient) DelayTo(queue, task string, args ...interface{}) (*AsyncResult, error)
DelayTo gets asynchronous result
func (*CeleryClient) FindResult ¶
func (cc *CeleryClient) FindResult(taskID string) *AsyncResult
func (*CeleryClient) PollResults ¶
func (cc *CeleryClient) PollResults(handler func(string, interface{}), taskIDs ...string)
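PollResults takes a handler function and a list of task IDs (NOTE: the original description is truncated after the word "given").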
func (*CeleryClient) Register ¶
func (cc *CeleryClient) Register(name string, task interface{})
Register task
func (*CeleryClient) StartWorker ¶
func (cc *CeleryClient) StartWorker(queues ...string) error
StartWorker starts celery workers
func (*CeleryClient) StartWorkerWithContext ¶
func (cc *CeleryClient) StartWorkerWithContext(ctx context.Context, queues ...string) (err error)
StartWorkerWithContext starts celery workers with given parent context
func (*CeleryClient) StopWorker ¶
func (cc *CeleryClient) StopWorker() (err error)
StopWorker stops celery workers
func (*CeleryClient) WaitForStopWorker ¶
func (cc *CeleryClient) WaitForStopWorker()
WaitForStopWorker waits for celery workers to terminate
type CeleryDeliveryInfo ¶
type CeleryDeliveryInfo struct { Priority int `json:"priority"` RoutingKey string `json:"routing_key"` Exchange string `json:"exchange"` }
CeleryDeliveryInfo represents deliveryinfo json
type CeleryMessage ¶
type CeleryMessage struct { Body string `json:"body"` Headers map[string]interface{} `json:"headers,omitempty"` ContentType string `json:"content-type"` Properties CeleryProperties `json:"properties"` ContentEncoding string `json:"content-encoding"` }
CeleryMessage is actual message to be sent to Redis
func (*CeleryMessage) GetTaskMessage ¶
func (cm *CeleryMessage) GetTaskMessage() *TaskMessage
GetTaskMessage retrieves and decodes task messages from broker
type CeleryProperties ¶
type CeleryProperties struct { BodyEncoding string `json:"body_encoding"` CorrelationID string `json:"correlation_id"` ReplyTo string `json:"reply_to"` DeliveryInfo CeleryDeliveryInfo `json:"delivery_info"` DeliveryMode int `json:"delivery_mode"` DeliveryTag string `json:"delivery_tag"` }
CeleryProperties represents properties json
type CeleryTask ¶
type CeleryTask interface { // ParseKwargs - define a method to parse kwargs ParseKwargs(map[string]interface{}) error RunTask() (interface{}, error) }
CeleryTask is an interface that represents the actual task. Passing a CeleryTask interface instead of a function pointer avoids reflection and may give a performance gain. ResultMessage must be obtained using GetResultMessage().
type CeleryWorker ¶
type CeleryWorker struct {
// contains filtered or unexported fields
}
CeleryWorker represents distributed task worker
func NewCeleryWorker ¶
func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker
NewCeleryWorker returns new celery worker
func (*CeleryWorker) GetNumWorkers ¶
func (w *CeleryWorker) GetNumWorkers() int
GetNumWorkers returns number of currently running workers
func (*CeleryWorker) GetTask ¶
func (w *CeleryWorker) GetTask(name string) interface{}
GetTask retrieves registered task
func (*CeleryWorker) Register ¶
func (w *CeleryWorker) Register(name string, task interface{})
Register registers tasks (functions)
func (*CeleryWorker) RunTask ¶
func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
RunTask runs celery task
func (*CeleryWorker) SetRateLimitPeriod ¶
func (w *CeleryWorker) SetRateLimitPeriod(rate time.Duration) *CeleryWorker
func (*CeleryWorker) StartWorker ¶
func (w *CeleryWorker) StartWorker(queues ...string) error
StartWorker starts celery workers
func (*CeleryWorker) StartWorkerWithContext ¶
func (w *CeleryWorker) StartWorkerWithContext(ctx context.Context, queues ...string) (err error)
StartWorkerWithContext starts celery worker(s) with given parent context
func (*CeleryWorker) StopWait ¶
func (w *CeleryWorker) StopWait()
StopWait waits for celery workers to terminate
func (*CeleryWorker) StopWorker ¶
func (w *CeleryWorker) StopWorker() (err error)
StopWorker stops celery workers
func (*CeleryWorker) StopWorkerWithContext ¶
func (w *CeleryWorker) StopWorkerWithContext(ctx context.Context) (err error)
type NSQCeleryBroker ¶
type NSQCeleryBroker struct { *QueueIterator // contains filtered or unexported fields }
func NewNSQCeleryBroker ¶
func NewNSQCeleryBroker(cfg *NSQConfig, channel string, queues ...string) *NSQCeleryBroker
TODO: test. NewNSQCeleryBroker creates a new NSQCeleryBroker based on the given config. NOTE: DON'T USE IT NOW, IT IS A WORK IN PROGRESS!!!
func (*NSQCeleryBroker) GetTaskMessage ¶
func (nb *NSQCeleryBroker) GetTaskMessage() (message *TaskMessage, error error)
func (*NSQCeleryBroker) GetTaskMessageFrom ¶
func (nb *NSQCeleryBroker) GetTaskMessageFrom(queue string) (message *TaskMessage, err error)
TODO: NOT TESTED YET. TODO: optimize by caching the consumer.
func (*NSQCeleryBroker) SendCeleryMessage ¶
func (nb *NSQCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
func (*NSQCeleryBroker) SendCeleryMessageTo ¶
func (nb *NSQCeleryBroker) SendCeleryMessageTo(queue string, message *CeleryMessage) error
type NSQConfig ¶
type NSQConfig struct { AddrLookupds []string `mapstructure:"addr_lookupds"` Addrs []string `mapstructure:"addrs"` DialTimeoutSec int `mapstructure:"dial_timeout_sec"` // The server-side message timeout for messages delivered to this client MsgTimeoutSec int `mapstructure:"msg_timeout_sec"` // Not Configured AuthSecret string `mapstructure:"auth_secret"` }
type NSQHandler ¶
type NSQHandler struct {
// contains filtered or unexported fields
}
func NewNSQHandler ¶
func NewNSQHandler(h func(message *nsq.Message) error) *NSQHandler
func (*NSQHandler) HandleMessage ¶
func (h *NSQHandler) HandleMessage(message *nsq.Message) error
type QueueIterator ¶
type QueueIterator struct {
// contains filtered or unexported fields
}
func NewQueueIterator ¶
func NewQueueIterator(queues ...string) *QueueIterator
func (*QueueIterator) DefaultQueueName ¶
func (qi *QueueIterator) DefaultQueueName() string
func (*QueueIterator) Length ¶
func (cb *QueueIterator) Length() int
func (*QueueIterator) ListQueues ¶
func (qi *QueueIterator) ListQueues() []string
func (*QueueIterator) NextQueueName ¶
func (qi *QueueIterator) NextQueueName() string
type RedisCeleryBackend ¶
RedisCeleryBackend is celery backend for redis
func NewRedisCeleryBackend ¶
func NewRedisCeleryBackend(uri string) *RedisCeleryBackend
NewRedisCeleryBackend creates new RedisCeleryBackend
func (*RedisCeleryBackend) ClearResult ¶
func (cb *RedisCeleryBackend) ClearResult(taskID string) error
func (*RedisCeleryBackend) GetResult ¶
func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
GetResult queries redis backend to get asynchronous result
func (*RedisCeleryBackend) SetResult ¶
func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error
SetResult pushes result back into redis backend
type RedisCeleryBroker ¶
type RedisCeleryBroker struct { *redis.Pool *QueueIterator }
RedisCeleryBroker is celery broker for redis
func NewRedisCeleryBroker ¶
func NewRedisCeleryBroker(uri string, queues ...string) *RedisCeleryBroker
NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri
func (*RedisCeleryBroker) GetCeleryMessage ¶
func (cb *RedisCeleryBroker) GetCeleryMessage() (msg *CeleryMessage, err error)
GetCeleryMessage retrieves celery message from redis queue
func (*RedisCeleryBroker) GetCeleryMessageFrom ¶
func (cb *RedisCeleryBroker) GetCeleryMessageFrom(queue string) (*CeleryMessage, error)
func (*RedisCeleryBroker) GetTaskMessage ¶
func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error)
GetTaskMessage retrieves task message from redis queue
func (*RedisCeleryBroker) GetTaskMessageFrom ¶
func (cb *RedisCeleryBroker) GetTaskMessageFrom(queue string) (*TaskMessage, error)
func (*RedisCeleryBroker) SendCeleryMessage ¶
func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
SendCeleryMessage sends CeleryMessage to redis queue
func (*RedisCeleryBroker) SendCeleryMessageTo ¶
func (cb *RedisCeleryBroker) SendCeleryMessageTo(queue string, message *CeleryMessage) error
type ResultMessage ¶
type ResultMessage struct { ID string `json:"task_id"` Status string `json:"status"` Traceback interface{} `json:"traceback"` Result interface{} `json:"result"` Children []interface{} `json:"children"` }
ResultMessage is the return message received from the backend
type TaskMessage ¶
type TaskMessage struct { ID string `json:"id"` Task string `json:"task"` Args []interface{} `json:"args"` Kwargs map[string]interface{} `json:"kwargs"` Retries int `json:"retries"` ETA *string `json:"eta"` }
TaskMessage is celery-compatible message
func DecodeTaskMessage ¶
func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)
DecodeTaskMessage decodes a base64 encoded body and returns a TaskMessage object
func (*TaskMessage) Encode ¶
func (tm *TaskMessage) Encode() (string, error)
Encode returns base64 json encoded string