gocelery

package module
v0.0.0-...-52011c8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2024 License: MIT Imports: 16 Imported by: 0

README

gocelery

Go Client/Server for Celery Distributed Task Queue

Build Status Coverage Status Go Report Card GoDoc License FOSSA Status

Why?

Having been involved in several projects migrating servers from Python to Go, I have realized Go can improve performance of existing python web applications. As Celery distributed tasks are often used in such web applications, this library allows you to both implement celery workers and submit celery tasks in Go.

You can also use this library as pure go distributed task queue.

Go Celery Worker in Action

demo

Supported Brokers/Backends

Now supporting both Redis and AMQP!!

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

Starting from version 4.0, Celery uses message protocol version 2 as default value. GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,

Example

GoCelery GoDoc has good examples.
Also take a look at example directory for sample python code.

GoCelery Worker Example

Run Celery Worker implemented in Go

// create redis connection pool
redisPool := &redis.Pool{
  Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	5, // number of workers
)

// task
add := func(a, b int) int {
	return a + b
}

// register task
cli.Register("worker.add", add)

// start workers (non-blocking call)
cli.StartWorker()

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)
cli.StopWorker()
Python Client Example

Submit Task from Python Client

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())
Python Worker Example

Run Celery Worker implemented in Python

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle
GoCelery Client Example

Submit Task from Go Client

// create redis connection pool
redisPool := &redis.Pool{
  Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	1,
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
	panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
	panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))

Sample Celery Task Message

Celery Message Protocol Version 1

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}

Projects

Please let us know if you use gocelery in your project!

Contributing

You are more than welcome to make any contributions. Please create Pull Request for any changes.

LICENSE

The gocelery is offered under MIT license.

Documentation

Overview

Package gocelery is Celery Distributed Task Queue in Go

Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.

This package can also be used as pure go distributed task queue.

Supported brokers/backends

  • Redis (broker/backend)
  • AMQP (broker/backend)

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']  # Ignore other content
CELERY_RESULT_SERIALIZER='json'
CELERY_ENABLE_UTC=True
Example (Client)
// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli := NewCeleryClient(
	NewRedisBroker(redisPool),
	&RedisCeleryBackend{Pool: redisPool},
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
	panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
	panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
Output:

Example (ClientWithNamedArguments)
// create redis connection pool
// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli := NewCeleryClient(
	NewRedisBroker(redisPool),
	&RedisCeleryBackend{Pool: redisPool},
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.DelayKwargs(
	taskName,
	map[string]interface{}{
		"a": argA,
		"b": argB,
	},
)
if err != nil {
	panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
	panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
Output:

Example (Worker)
// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli := NewCeleryWorker(
	NewRedisBroker(redisPool),
	&RedisCeleryBackend{
		Pool:           redisPool,
		ExpireDuration: 24 * time.Hour,
	},
	5,
)

// task
add := func(a, b int) int {
	return a + b
}

// register task
cli.Register("add", add)

// start workers (non-blocking call)
cli.StartWorker()

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)
cli.StopWorker()
Output:

Example (WorkerWithContext)
// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli := NewCeleryWorker(
	NewRedisBroker(redisPool),
	&RedisCeleryBackend{
		Pool:           redisPool,
		ExpireDuration: 24 * time.Hour,
	},
	5,
)

// task
add := func(a, b int) int {
	return a + b
}

// register task
cli.Register("add", add)

// context with cancelFunc to handle exit gracefully
ctx, cancel := context.WithCancel(context.Background())

// start workers (non-blocking call)
cli.StartWorkerWithContext(ctx)

// wait for client request
time.Sleep(10 * time.Second)

// stop workers by cancelling context
cancel()

// optional: wait for all workers to terminate
cli.StopWait()
Output:

Example (WorkerWithNamedArguments)
package main

import (
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
)

// exampleAddTask is integer addition task
// with named arguments
type exampleAddTask struct {
	a int
	b int
}

func (a *exampleAddTask) ParseKwargs(kwargs map[string]interface{}) error {
	kwargA, ok := kwargs["a"]
	if !ok {
		return fmt.Errorf("undefined kwarg a")
	}
	kwargAFloat, ok := kwargA.(float64)
	if !ok {
		return fmt.Errorf("malformed kwarg a")
	}
	a.a = int(kwargAFloat)
	kwargB, ok := kwargs["b"]
	if !ok {
		return fmt.Errorf("undefined kwarg b")
	}
	kwargBFloat, ok := kwargB.(float64)
	if !ok {
		return fmt.Errorf("malformed kwarg b")
	}
	a.b = int(kwargBFloat)
	return nil
}

func (a *exampleAddTask) RunTask() (interface{}, error) {
	result := a.a + a.b
	return result, nil
}

func main() {

	// create redis connection pool
	redisPool := &redis.Pool{
		Dial: func() (redis.Conn, error) {
			c, err := redis.DialURL("redis://")
			if err != nil {
				return nil, err
			}
			return c, err
		},
	}

	// initialize celery client
	cli := NewCeleryWorker(
		NewRedisBroker(redisPool),
		&RedisCeleryBackend{
			Pool:           redisPool,
			ExpireDuration: 24 * time.Hour,
		},
		5,
	)

	// register task
	cli.Register("add", &exampleAddTask{})

	// start workers (non-blocking call)
	cli.StartWorker()

	// wait for client request
	time.Sleep(10 * time.Second)

	// stop workers gracefully (blocking call)
	cli.StopWorker()
}
Output:

Index

Examples

Constants

View Source
const DefaultRetryDelay = 5

Variables

View Source
var NoBackendConfigured = fmt.Errorf("no backend configured, no result returned")
View Source
var ResultNotAvailableYet = fmt.Errorf("result not available yet")

Functions

func GetRealValue

func GetRealValue(val *reflect.Value) interface{}

GetRealValue returns real value of reflect.Value Required for JSON Marshalling

func NewRedisPool deprecated

func NewRedisPool(uri string) *redis.Pool

NewRedisPool creates pool of redis connections from given connection string

Deprecated: newRedisPool exists for historical compatibility and should not be used. Pool should be initialized outside of gocelery package.

Types

type AMQPCeleryBackend

type AMQPCeleryBackend struct {
	*AMQPSession
	ExpireDuration time.Duration
}
AMQPCeleryBackend CeleryBackend for AMQP

The difference between amqpbackend and rpcbackend: amqpbackend => reply to task_id queue rpcbackend => reply to celery_backend exchange and route the message to the client side who have waiting for it through the {oid}_result queue which is binding to celery_backend exchange.

func NewAMQPCeleryBackend

func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend

NewAMQPCeleryBackend creates new AMQPCeleryBackend

func NewAMQPCeleryBackendByAMQPSession

func NewAMQPCeleryBackendByAMQPSession(session *AMQPSession) *AMQPCeleryBackend

NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel

func (*AMQPCeleryBackend) GetResult

func (b *AMQPCeleryBackend) GetResult(taskID string) (*ResultMessage, error)

GetResult retrieves result from AMQP queue

func (*AMQPCeleryBackend) Init

func (b *AMQPCeleryBackend) Init(string) error

func (*AMQPCeleryBackend) SetResult

func (b *AMQPCeleryBackend) SetResult(taskID string, result *ResultMessage) error

SetResult sets result back to AMQP queue

type AMQPCeleryBroker

type AMQPCeleryBroker struct {
	*AMQPSession
	DirectExchange *AMQPExchange
	RpcQueue       *AMQPQueue
	DispatchQueue  *AMQPQueue

	Initialized bool
	Rate        int
	// contains filtered or unexported fields
}

AMQPCeleryBroker is RedisBroker for AMQP

func NewAMQPCeleryBroker

func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker

NewAMQPCeleryBroker creates new AMQPCeleryBroker

func NewAMQPCeleryBrokerByAMQPSession

func NewAMQPCeleryBrokerByAMQPSession(session *AMQPSession) *AMQPCeleryBroker

NewAMQPCeleryBrokerByConnAndChannel creates new AMQPCeleryBroker using AMQP conn and channel

func (*AMQPCeleryBroker) GetCeleryMessage

func (b *AMQPCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)

func (*AMQPCeleryBroker) GetConsumerChannel

func (b *AMQPCeleryBroker) GetConsumerChannel() <-chan amqp.Delivery

func (*AMQPCeleryBroker) Init

func (b *AMQPCeleryBroker) Init(oid string) error

Init will declare all exchanges or queues we need.

func (*AMQPCeleryBroker) SendCeleryMessage

func (b *AMQPCeleryBroker) SendCeleryMessage(message *CeleryMessage) error

SendCeleryMessage sends CeleryMessage to broker

func (*AMQPCeleryBroker) StartConsumingChannel

func (b *AMQPCeleryBroker) StartConsumingChannel() error

StartConsumingChannel spawns receiving channel on AMQP queue

type AMQPExchange

type AMQPExchange struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
}

AMQPExchange stores AMQP Exchange configuration

type AMQPQueue

type AMQPQueue struct {
	Name       string
	Durable    bool
	AutoDelete bool
}

AMQPQueue stores AMQP RpcQueue configuration

type AMQPSession

type AMQPSession struct {
	ConsumerDeliveryChannel <-chan amqp.Delivery

	RWLocker                  sync.RWMutex
	ConnectionCloseNotifyChan chan *amqp.Error
	ChannelCloseNotifyChan    chan *amqp.Error
	*amqp.Channel
	// contains filtered or unexported fields
}

func NewAMQPSession

func NewAMQPSession(url string) (*AMQPSession, error)

func (*AMQPSession) Publish

func (p *AMQPSession) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

func (*AMQPSession) SetupReconnectHooks

func (p *AMQPSession) SetupReconnectHooks(hook ReconnectFunc)

type AsyncResult

type AsyncResult struct {
	TaskID string
	// contains filtered or unexported fields
}

AsyncResult represents pending result

func (*AsyncResult) AsyncGet

func (ar *AsyncResult) AsyncGet() (interface{}, error)

AsyncGet gets actual result from backend and returns nil if not available

func (*AsyncResult) Get

func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)

Get gets actual result from backend It blocks for period of time set by timeout and returns error if unavailable

func (*AsyncResult) Ready

func (ar *AsyncResult) Ready() (bool, error)

Ready checks if actual result is ready

type CeleryBackend

type CeleryBackend interface {
	Init(string) error
	GetResult(string) (*ResultMessage, error) // must be non-blocking
	SetResult(taskID string, result *ResultMessage) error
}

CeleryBackend is interface for celery backend database

type CeleryBroker

type CeleryBroker interface {
	Init(string) error
	SendCeleryMessage(*CeleryMessage) error
	GetCeleryMessage() (*CeleryMessage, error) // must be non-blocking
}

CeleryBroker is interface for celery broker database

type CeleryClient

type CeleryClient struct {
	// contains filtered or unexported fields
}

CeleryClient provides API for sending celery tasks

func NewCeleryClient

func NewCeleryClient(broker CeleryBroker, backend CeleryBackend) *CeleryClient

NewCeleryClient creates new celery client

func (*CeleryClient) Call

func (cc *CeleryClient) Call(task, routingKey string, args ...interface{}) (*AsyncResult, error)

Call route the task to a specified worker, and gets asynchronous result

func (*CeleryClient) CallKwargs

func (cc *CeleryClient) CallKwargs(task, routingKey string, args map[string]interface{}) (*AsyncResult, error)

CallKwargs route the task to a specified worker, and gets asynchronous result with argument map

func (*CeleryClient) Delay

func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)

Delay gets asynchronous result

func (*CeleryClient) DelayKwargs

func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)

DelayKwargs gets asynchronous results with argument map

func (*CeleryClient) Init

func (cc *CeleryClient) Init() error

Delay gets asynchronous result

type CeleryDeliveryInfo

type CeleryDeliveryInfo struct {
	Priority   uint8  `json:"priority"`
	RoutingKey string `json:"routing_key"`
	Exchange   string `json:"exchange"`
}

CeleryDeliveryInfo represents deliveryinfo json

type CeleryMessage

type CeleryMessage struct {
	Body            string                 `json:"body"`
	Headers         map[string]interface{} `json:"headers,omitempty"`
	ContentType     string                 `json:"content-type"`
	Properties      CeleryProperties       `json:"properties"`
	ContentEncoding string                 `json:"content-encoding"`
}

CeleryMessage is actual message to be sent to Redis

func (*CeleryMessage) GetTaskMessage

func (cm *CeleryMessage) GetTaskMessage() *TaskMessage

GetTaskMessage retrieve and decode task messages from broker

type CeleryProperties

type CeleryProperties struct {
	BodyEncoding  string              `json:"body_encoding"`
	CorrelationID string              `json:"correlation_id"`
	ReplyTo       string              `json:"reply_to"`
	DeliveryInfo  *CeleryDeliveryInfo `json:"delivery_info"`
	DeliveryMode  uint8               `json:"delivery_mode"`
	DeliveryTag   string              `json:"delivery_tag"`
}

CeleryProperties represents properties json

type CeleryTask

type CeleryTask interface {

	// ParseKwargs - define a method to parse kwargs
	ParseKwargs(map[string]interface{}) error

	// RunTask - define a method for execution
	RunTask() (interface{}, error)
}

CeleryTask is an interface that represents actual task Passing CeleryTask interface instead of function pointer avoids reflection and may have performance gain. ResultMessage must be obtained using GetResultMessage()

type CeleryWorker

type CeleryWorker struct {
	// contains filtered or unexported fields
}

CeleryWorker represents distributed task worker

func NewCeleryWorker

func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker

NewCeleryWorker returns new celery worker

func (*CeleryWorker) GetNumWorkers

func (w *CeleryWorker) GetNumWorkers() int

GetNumWorkers returns number of currently running workers

func (*CeleryWorker) GetTask

func (w *CeleryWorker) GetTask(name string) interface{}

GetTask retrieves registered task

func (*CeleryWorker) Init

func (w *CeleryWorker) Init() error

func (*CeleryWorker) Register

func (w *CeleryWorker) Register(name string, task interface{})

Register registers tasks (functions)

func (*CeleryWorker) RunOnce

func (w *CeleryWorker) RunOnce()

func (*CeleryWorker) RunTask

func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)

RunTask runs celery task

func (*CeleryWorker) SetOID

func (w *CeleryWorker) SetOID(oid string)

func (*CeleryWorker) StartWorker

func (w *CeleryWorker) StartWorker()

StartWorker starts celery workers

func (*CeleryWorker) StartWorkerWithContext

func (w *CeleryWorker) StartWorkerWithContext(ctx context.Context)

StartWorkerWithContext starts celery worker(s) with given parent context

func (*CeleryWorker) StopWait

func (w *CeleryWorker) StopWait()

StopWait waits for celery workers to terminate

func (*CeleryWorker) StopWorker

func (w *CeleryWorker) StopWorker()

StopWorker stops celery workers

type CeleryWorkerSpec

type CeleryWorkerSpec struct {
	// contains filtered or unexported fields
}

type ReconnectFunc

type ReconnectFunc func() error

type RedisCeleryBackend

type RedisCeleryBackend struct {
	*redis.Pool
	ExpireDuration time.Duration
}

RedisCeleryBackend is celery backend for redis

func NewRedisBackend

func NewRedisBackend(conn *redis.Pool) *RedisCeleryBackend

NewRedisBackend creates new RedisCeleryBackend with given redis pool. RedisCeleryBackend can be initialized manually as well.

func NewRedisCeleryBackend deprecated

func NewRedisCeleryBackend(uri string) *RedisCeleryBackend

NewRedisCeleryBackend creates new RedisCeleryBackend

Deprecated: NewRedisCeleryBackend exists for historical compatibility and should not be used. Pool should be initialized outside of gocelery package.

func (*RedisCeleryBackend) GetResult

func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)

GetResult queries redis backend to get asynchronous result

func (*RedisCeleryBackend) Init

func (cb *RedisCeleryBackend) Init(string) error

func (*RedisCeleryBackend) SetResult

func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error

SetResult pushes result back into redis backend

type RedisCeleryBroker

type RedisCeleryBroker struct {
	*redis.Pool
	DispatchBaseQueueName string
	RpcBaseQueueName      string
	// contains filtered or unexported fields
}

RedisCeleryBroker is celery broker for redis

func NewRedisBroker

func NewRedisBroker(conn *redis.Pool) *RedisCeleryBroker

NewRedisBroker creates new RedisCeleryBroker with given redis connection pool

func NewRedisCeleryBroker deprecated

func NewRedisCeleryBroker(uri string) *RedisCeleryBroker

NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri

Deprecated: NewRedisCeleryBroker exists for historical compatibility and should not be used. Use NewRedisBroker instead to create new RedisCeleryBroker.

func (*RedisCeleryBroker) GetCeleryMessage

func (cb *RedisCeleryBroker) GetCeleryMessage() (*CeleryMessage, error)

GetCeleryMessage retrieves celery message from redis queue

func (*RedisCeleryBroker) GetTaskMessage

func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error)

GetTaskMessage retrieves task message from redis queue

func (*RedisCeleryBroker) Init

func (cb *RedisCeleryBroker) Init(oid string) error

func (*RedisCeleryBroker) SendCeleryMessage

func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error

SendCeleryMessage sends CeleryMessage to redis queue

type ResultMessage

type ResultMessage struct {
	ID        string        `json:"task_id"`
	Status    string        `json:"status"`
	Traceback interface{}   `json:"traceback"`
	Result    interface{}   `json:"result"`
	Children  []interface{} `json:"children"`
}

ResultMessage is return message received from broker

type RpcCeleryBackend

type RpcCeleryBackend struct {
	*AMQPSession
	Queue    *AMQPQueue
	Exchange *AMQPExchange

	Initialized    bool
	ExpireDuration time.Duration
	// contains filtered or unexported fields
}
RpcCeleryBackend CeleryBackend for AMQP rpc

The difference between amqpbackend and rpcbackend: amqpbackend => reply to task_id queue rpcbackend => reply to celery_backend exchange and route the message to the client side who have waiting for it through the {oid}_result queue which is binding to celery_backend exchange.

func NewRpcCeleryBackend

func NewRpcCeleryBackend(host string) *RpcCeleryBackend

NewRpcCeleryBackend creates new RpcCeleryBackend

func NewRpcCeleryBackendByAMQPSession

func NewRpcCeleryBackendByAMQPSession(session *AMQPSession) *RpcCeleryBackend

NewRpcCeleryBackendByConnAndChannel creates new RpcCeleryBackend by AMQP connection and channel

func (*RpcCeleryBackend) GetConsumerChannel

func (b *RpcCeleryBackend) GetConsumerChannel() <-chan amqp.Delivery

func (*RpcCeleryBackend) GetResult

func (b *RpcCeleryBackend) GetResult(taskID string) (*ResultMessage, error)

GetResult retrieves result from queue named by oid

func (*RpcCeleryBackend) Init

func (b *RpcCeleryBackend) Init(oid string) error

func (*RpcCeleryBackend) SetResult

func (b *RpcCeleryBackend) SetResult(taskID string, result *ResultMessage) error

SetResult sets result back to result exchange

type TaskMessage

type TaskMessage struct {
	ID      string                 `json:"id"`
	Task    string                 `json:"task"`
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Retries int                    `json:"retries"`
	ETA     *string                `json:"eta"`
	Expires *time.Time             `json:"expires"`
}

TaskMessage is celery-compatible message

func DecodeTaskMessage

func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)

DecodeTaskMessage decodes base64 encrypted body and return TaskMessage object

func (*TaskMessage) Encode

func (tm *TaskMessage) Encode() (string, error)

Encode returns base64 json encoded string

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL