Documentation
¶
Overview ¶
Package gocelery is Celery Distributed Task Queue in Go
Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.
This package can also be used as pure go distributed task queue.
Supported brokers/backends
- Redis (broker/backend)
- AMQP (broker/backend)
Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.
CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] # Ignore other content CELERY_RESULT_SERIALIZER='json' CELERY_ENABLE_UTC=True
Index ¶
- Constants
- func GetRealValue(val *reflect.Value) interface{}
- type AsyncResult
- type CeleryBackend
- type CeleryBroker
- type CeleryClient
- type CeleryDeliveryInfo
- type CeleryMessage
- type CeleryProperties
- type CeleryTask
- type CeleryWorker
- func (w *CeleryWorker) GetNumWorkers() int
- func (w *CeleryWorker) GetTask(name string) interface{}
- func (w *CeleryWorker) Register(name string, task interface{})
- func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
- func (w *CeleryWorker) StartWorker()
- func (w *CeleryWorker) StopWorker()
- type ResultMessage
- type Task
- type TaskMessage
- type TaskSettings
Constants ¶
const ErrTaskRetryable = errString("task failed but retryable")
ErrTaskRetryable indicates that the task failed but need to be retried again.
const ( // MaxValidTime signifies how long is valid by default // Max set to 12 hrs MaxValidTime = 12 * time.Hour )
Variables ¶
This section is empty.
Functions ¶
func GetRealValue ¶
GetRealValue returns real value of reflect.Value Required for JSON Marshalling
Types ¶
type AsyncResult ¶
type AsyncResult struct {
// contains filtered or unexported fields
}
AsyncResult is pending result
func (*AsyncResult) AsyncGet ¶
func (ar *AsyncResult) AsyncGet() (interface{}, error)
AsyncGet gets actual result from backend returns the err if the result is not available yet. Always check Ready if the result is ready to be consumed
func (*AsyncResult) Get ¶
func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)
Get gets actual result from redis It blocks for period of delay set by timeout and return error if unavailable
func (*AsyncResult) Ready ¶
func (ar *AsyncResult) Ready() bool
Ready checks if actual result is ready
type CeleryBackend ¶
type CeleryBackend interface { GetResult(string) (*ResultMessage, error) // must be non-blocking SetResult(taskID string, result *ResultMessage) error }
CeleryBackend is interface for celery backend database
func NewInMemoryBackend ¶
func NewInMemoryBackend() CeleryBackend
NewInMemoryBackend returns a CeleryBackend implemented InMemory.
func NewLevelDBBackend ¶
func NewLevelDBBackend(db *leveldb.DB) CeleryBackend
NewLevelDBBackend returns an levelDB implementation of CeleryBackend
func NewRedisCeleryBackend ¶
func NewRedisCeleryBackend(uri string) CeleryBackend
NewRedisCeleryBackend creates new redisCeleryBackend
type CeleryBroker ¶
type CeleryBroker interface { SendCeleryMessage(*CeleryMessage) error GetTaskMessage() (*TaskMessage, error) // must be non-blocking }
CeleryBroker is interface for celery broker database
func NewInMemoryBroker ¶
func NewInMemoryBroker() CeleryBroker
NewInMemoryBroker returns immeory backed CeleryBroker
func NewLevelDBBroker ¶
func NewLevelDBBroker(db *leveldb.DB, queue string) CeleryBroker
NewLevelDBBroker returns levelDB backed implementation of CeleryBroker We will also initialise any previous state
func NewRedisCeleryBroker ¶
func NewRedisCeleryBroker(uri string) CeleryBroker
NewRedisCeleryBroker creates new redisCeleryBroker based on given uri
type CeleryClient ¶
type CeleryClient struct {
// contains filtered or unexported fields
}
CeleryClient provides API for sending celery tasks
func NewCeleryClient ¶
func NewCeleryClient(broker CeleryBroker, backend CeleryBackend, numWorkers int, workerWaitTimeMS int) (*CeleryClient, error)
NewCeleryClient creates new celery client
func (*CeleryClient) Delay ¶
func (cc *CeleryClient) Delay(task Task) (*AsyncResult, error)
Delay gets asynchronous result
func (*CeleryClient) Register ¶
func (cc *CeleryClient) Register(name string, task interface{})
Register task
func (*CeleryClient) StartWorker ¶
func (cc *CeleryClient) StartWorker()
StartWorker starts celery workers
func (*CeleryClient) StopWorker ¶
func (cc *CeleryClient) StopWorker()
StopWorker stops celery workers
type CeleryDeliveryInfo ¶
type CeleryDeliveryInfo struct { Priority int `json:"priority"` RoutingKey string `json:"routing_key"` Exchange string `json:"exchange"` }
CeleryDeliveryInfo represents deliveryinfo json
type CeleryMessage ¶
type CeleryMessage struct { Body string `json:"body"` Headers map[string]interface{} `json:"headers"` ContentType string `json:"content_type"` Properties CeleryProperties `json:"properties"` ContentEncoding string `json:"content_encoding"` }
CeleryMessage is actual message to be sent to Redis
func (*CeleryMessage) GetTaskMessage ¶
func (cm *CeleryMessage) GetTaskMessage() *TaskMessage
GetTaskMessage retrieve and decode task messages from broker
type CeleryProperties ¶
type CeleryProperties struct { BodyEncoding string `json:"body_encoding"` CorrelationID string `json:"correlation_id"` ReplyTo string `json:"replay_to"` DeliveryInfo CeleryDeliveryInfo `json:"delivery_info"` DeliveryMode int `json:"delivery_mode"` DeliveryTag string `json:"delivery_tag"` }
CeleryProperties represents properties json
type CeleryTask ¶
type CeleryTask interface { // Copy - is used to safely create and execute a copy of a task (stateful) // in a worker when there are multiple workers working on the same type of task but with different internal state. Copy() (CeleryTask, error) // ParseKwargs - define a method to parse kwargs ParseKwargs(map[string]interface{}) error // RunTask - define a method to run RunTask() (interface{}, error) }
CeleryTask is an interface that represents actual task Passing CeleryTask interface instead of function pointer avoids reflection and may have performance gain. ResultMessage must be obtained using GetResultMessage()
type CeleryWorker ¶
type CeleryWorker struct {
// contains filtered or unexported fields
}
CeleryWorker represents distributed task worker. Not thread safe. Shouldn't be used from within multiple go routines.
func NewCeleryWorker ¶
func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int, waitTimeMS int) *CeleryWorker
NewCeleryWorker returns new celery worker
func (*CeleryWorker) GetNumWorkers ¶
func (w *CeleryWorker) GetNumWorkers() int
GetNumWorkers returns number of currently running workers
func (*CeleryWorker) GetTask ¶
func (w *CeleryWorker) GetTask(name string) interface{}
GetTask retrieves registered task
func (*CeleryWorker) Register ¶
func (w *CeleryWorker) Register(name string, task interface{})
Register registers tasks (functions)
func (*CeleryWorker) RunTask ¶
func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
RunTask runs celery task
func (*CeleryWorker) StartWorker ¶
func (w *CeleryWorker) StartWorker()
StartWorker starts celery worker
func (*CeleryWorker) StopWorker ¶
func (w *CeleryWorker) StopWorker()
StopWorker stops celery workers
type ResultMessage ¶
type ResultMessage struct { Result interface{} `json:"result"` Error string `json:"error"` }
ResultMessage is return message received from broker
type Task ¶
type Task struct { Name string Args []interface{} Kwargs map[string]interface{} Settings *TaskSettings // if Settings is nil, we fallback to default values }
Task represents a task gocelery receives from the client.
type TaskMessage ¶
type TaskMessage struct { ID string `json:"id"` Task string `json:"task"` Args []interface{} `json:"args"` Kwargs map[string]interface{} `json:"kwargs"` Tries uint `json:"tries"` Settings *TaskSettings `json:"settings"` }
TaskMessage is celery-compatible message
func DecodeTaskMessage ¶
func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)
DecodeTaskMessage decodes base64 encrypted body and return TaskMessage object
func (*TaskMessage) Encode ¶
func (tm *TaskMessage) Encode() (string, error)
Encode returns base64 json encoded string
type TaskSettings ¶
type TaskSettings struct { Delay time.Time `json:"delay"` ValidUntil time.Time `json:"valid_until"` }
TaskSettings can be passed to the task with specific overrides.
func DefaultSettings ¶
func DefaultSettings() *TaskSettings
DefaultSettings returns the TaskSettings with all the default values and will be used if the Task.Settings is nil.