gocelery

package module
v0.0.0-...-c17fbcb
Warning

This package is not in the latest version of its module.
Published: Jan 11, 2021 License: MIT Imports: 11 Imported by: 6

README

gocelery

Go Client/Server for Celery Distributed Task Queue

Why?

Having been involved in a number of projects migrating servers from Python to Go, I have realized that Go can help improve the performance of existing Python web applications. Celery distributed tasks are used heavily in many Python web applications, and this library lets you implement Celery workers in Go as well as submit Celery tasks from Go.

You can also use this library as a pure Go distributed task queue.

Supported Brokers/Backends

Both Redis and AMQP are now supported.

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use JSON instead of the default pickle encoding, because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use JSON.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,

Celery Worker Example

Run Celery Worker implemented in Go

// example/worker/main.go

// Celery Task
func add(a int, b int) int {
	return a + b
}

func main() {
	// create broker and backend
	celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
	celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

	// use AMQP instead
	// celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
	// celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")

	// Configure with 2 celery workers. The fourth argument is workerWaitTimeMS
	// (see NewCeleryClient below); 1000 is an illustrative value.
	celeryClient, err := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 2, 1000)
	if err != nil {
		panic(err)
	}

	// worker.add name reflects "add" task method found in "worker.py"
	celeryClient.Register("worker.add", add)

	// Start Worker - blocking method, hence the goroutine
	go celeryClient.StartWorker()

	// Wait 30 seconds and stop all workers
	time.Sleep(30 * time.Second)
	celeryClient.StopWorker()
}

go run example/worker/main.go

You can also register a method of a custom struct so that tasks can share state.

type MyStruct struct {
	MyInt int
}

func (so *MyStruct) add(a int, b int) int {
	return a + b + so.MyInt
}

// code omitted ...

ms := &MyStruct{10}
celeryClient.Register("worker.add", ms.add)

// code omitted ...

Submit Task from Python Client

# example/test.py

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    # submit celery task to be executed in Go workers
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())

python example/test.py

Celery Client Example

Run Celery Worker implemented in Python

# example/worker.py

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

cd example
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

Submit Task from Go Client

func main() {
	// create broker and backend
	celeryBroker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
	celeryBackend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

	// use AMQP instead
	// celeryBroker := gocelery.NewAMQPCeleryBroker("amqp://")
	// celeryBackend := gocelery.NewAMQPCeleryBackend("amqp://")

	// create client with no workers. The fourth argument is workerWaitTimeMS
	// (see NewCeleryClient below); 1000 is an illustrative value.
	celeryClient, err := gocelery.NewCeleryClient(celeryBroker, celeryBackend, 0, 1000)
	if err != nil {
		panic(err)
	}

	// send task; Delay takes a gocelery.Task (see the Task type below)
	asyncResult, err := celeryClient.Delay(gocelery.Task{
		Name: "worker.add",
		Args: []interface{}{3, 5},
	})
	if err != nil {
		panic(err)
	}

	// check if result is ready
	isReady := asyncResult.Ready()
	fmt.Printf("ready status %v\n", isReady)

	// get result with 5s timeout
	res, err := asyncResult.Get(5 * time.Second)
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(res)
	}
}

go run example/client/main.go

Sample Celery Task Message

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}
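
As a rough illustration of why JSON encoding matters on the Go side, the sketch below decodes a trimmed copy of the sample message with encoding/json. The taskBody type and the decodeTaskBody helper are hypothetical names introduced for this example and are not part of the gocelery API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskBody mirrors a subset of the fields in the sample message.
// Hypothetical illustration type; not part of the gocelery API.
type taskBody struct {
	ID      string                 `json:"id"`
	Task    string                 `json:"task"`
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Retries int                    `json:"retries"`
	UTC     bool                   `json:"utc"`
}

// sample is a trimmed copy of the sample task message.
const sample = `{
    "utc": true,
    "args": [5456, 2878],
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "kwargs": {}
}`

// decodeTaskBody parses a JSON task body into a taskBody.
func decodeTaskBody(raw string) (*taskBody, error) {
	var tb taskBody
	if err := json.Unmarshal([]byte(raw), &tb); err != nil {
		return nil, err
	}
	return &tb, nil
}

func main() {
	tb, err := decodeTaskBody(sample)
	if err != nil {
		panic(err)
	}
	// JSON numbers decode to float64 when the target is interface{}.
	for i, arg := range tb.Args {
		fmt.Printf("arg %d: %v (%T)\n", i, arg, arg)
	}
	fmt.Println("task:", tb.Task)
}
```

Note that positional arguments arrive as float64, so a worker function declared with int parameters needs a conversion step before it can be called.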

Contributing

Contributions are welcome. Please create a pull request for any changes.

LICENSE

gocelery is offered under the MIT license.

Documentation

Overview

Package gocelery is a Go implementation of the Celery distributed task queue.

Celery distributed tasks are used heavily in many Python web applications, and this library lets you implement Celery workers in Go as well as submit Celery tasks from Go.

This package can also be used as a pure Go distributed task queue.

Supported brokers/backends

  • Redis (broker/backend)
  • AMQP (broker/backend)

Celery must be configured to use JSON instead of the default pickle encoding, because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use JSON.

CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']  # Ignore other content
CELERY_RESULT_SERIALIZER='json'
CELERY_ENABLE_UTC=True

Index

Constants

const ErrTaskRetryable = errString("task failed but retryable")

ErrTaskRetryable indicates that the task failed but needs to be retried.

const (

	// MaxValidTime signifies how long a task is valid by default.
	// Max set to 12 hrs
	MaxValidTime = 12 * time.Hour
)

Variables

This section is empty.

Functions

func GetRealValue

func GetRealValue(val *reflect.Value) interface{}

GetRealValue returns the real value of a reflect.Value. Required for JSON marshalling.

Types

type AsyncResult

type AsyncResult struct {
	// contains filtered or unexported fields
}

AsyncResult represents a pending result.

func (*AsyncResult) AsyncGet

func (ar *AsyncResult) AsyncGet() (interface{}, error)

AsyncGet gets the actual result from the backend and returns an error if the result is not available yet. Always call Ready first to check whether the result is ready to be consumed.

func (*AsyncResult) Get

func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)

Get gets the actual result from Redis. It blocks for up to the duration set by timeout and returns an error if the result is unavailable.

func (*AsyncResult) Ready

func (ar *AsyncResult) Ready() bool

Ready reports whether the actual result is ready.

type CeleryBackend

type CeleryBackend interface {
	GetResult(string) (*ResultMessage, error) // must be non-blocking
	SetResult(taskID string, result *ResultMessage) error
}

CeleryBackend is the interface for a Celery backend database.
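
Because CeleryBackend is a two-method interface, a custom backend is small to write. The following sketch re-declares ResultMessage and CeleryBackend locally (copied from this page) so that it compiles on its own, and implements them with a mutex-guarded map. The mapBackend type is hypothetical and is not the library's NewInMemoryBackend.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ResultMessage and CeleryBackend are re-declared locally so the sketch is
// self-contained; in real code they come from the gocelery package.
type ResultMessage struct {
	Result interface{} `json:"result"`
	Error  string      `json:"error"`
}

type CeleryBackend interface {
	GetResult(string) (*ResultMessage, error) // must be non-blocking
	SetResult(taskID string, result *ResultMessage) error
}

// mapBackend is a hypothetical backend that keeps results in a map.
type mapBackend struct {
	mu      sync.RWMutex
	results map[string]*ResultMessage
}

func newMapBackend() *mapBackend {
	return &mapBackend{results: make(map[string]*ResultMessage)}
}

// GetResult returns immediately, with an error when no result is stored yet.
func (b *mapBackend) GetResult(taskID string) (*ResultMessage, error) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	res, ok := b.results[taskID]
	if !ok {
		return nil, errors.New("result not available")
	}
	return res, nil
}

// SetResult stores the result for taskID, replacing any previous value.
func (b *mapBackend) SetResult(taskID string, result *ResultMessage) error {
	if result == nil {
		return errors.New("nil result")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.results[taskID] = result
	return nil
}

// Compile-time check that mapBackend satisfies the interface.
var _ CeleryBackend = (*mapBackend)(nil)

func main() {
	b := newMapBackend()
	_ = b.SetResult("task-1", &ResultMessage{Result: 8})
	res, err := b.GetResult("task-1")
	fmt.Println(res.Result, err)
}
```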

func NewInMemoryBackend

func NewInMemoryBackend() CeleryBackend

NewInMemoryBackend returns an in-memory implementation of CeleryBackend.

func NewLevelDBBackend

func NewLevelDBBackend(db *leveldb.DB) CeleryBackend

NewLevelDBBackend returns a LevelDB implementation of CeleryBackend.

func NewRedisCeleryBackend

func NewRedisCeleryBackend(uri string) CeleryBackend

NewRedisCeleryBackend creates a new redisCeleryBackend based on the given URI.

type CeleryBroker

type CeleryBroker interface {
	SendCeleryMessage(*CeleryMessage) error
	GetTaskMessage() (*TaskMessage, error) // must be non-blocking
}

CeleryBroker is the interface for a Celery broker database.

func NewInMemoryBroker

func NewInMemoryBroker() CeleryBroker

NewInMemoryBroker returns an in-memory implementation of CeleryBroker.

func NewLevelDBBroker

func NewLevelDBBroker(db *leveldb.DB, queue string) CeleryBroker

NewLevelDBBroker returns a LevelDB-backed implementation of CeleryBroker. It also initialises any previous state.

func NewRedisCeleryBroker

func NewRedisCeleryBroker(uri string) CeleryBroker

NewRedisCeleryBroker creates a new redisCeleryBroker based on the given URI.

type CeleryClient

type CeleryClient struct {
	// contains filtered or unexported fields
}

CeleryClient provides an API for sending Celery tasks.

func NewCeleryClient

func NewCeleryClient(broker CeleryBroker, backend CeleryBackend, numWorkers int, workerWaitTimeMS int) (*CeleryClient, error)

NewCeleryClient creates a new Celery client.

func (*CeleryClient) Delay

func (cc *CeleryClient) Delay(task Task) (*AsyncResult, error)

Delay submits the task and returns its asynchronous result.

func (*CeleryClient) Register

func (cc *CeleryClient) Register(name string, task interface{})

Register registers a task under the given name.

func (*CeleryClient) StartWorker

func (cc *CeleryClient) StartWorker()

StartWorker starts celery workers

func (*CeleryClient) StopWorker

func (cc *CeleryClient) StopWorker()

StopWorker stops celery workers

type CeleryDeliveryInfo

type CeleryDeliveryInfo struct {
	Priority   int    `json:"priority"`
	RoutingKey string `json:"routing_key"`
	Exchange   string `json:"exchange"`
}

CeleryDeliveryInfo represents the delivery_info JSON object.

type CeleryMessage

type CeleryMessage struct {
	Body            string                 `json:"body"`
	Headers         map[string]interface{} `json:"headers"`
	ContentType     string                 `json:"content_type"`
	Properties      CeleryProperties       `json:"properties"`
	ContentEncoding string                 `json:"content_encoding"`
}

CeleryMessage is the actual message sent to Redis.

func (*CeleryMessage) GetTaskMessage

func (cm *CeleryMessage) GetTaskMessage() *TaskMessage

GetTaskMessage retrieves and decodes task messages from the broker.

type CeleryProperties

type CeleryProperties struct {
	BodyEncoding  string             `json:"body_encoding"`
	CorrelationID string             `json:"correlation_id"`
	ReplyTo       string             `json:"replay_to"`
	DeliveryInfo  CeleryDeliveryInfo `json:"delivery_info"`
	DeliveryMode  int                `json:"delivery_mode"`
	DeliveryTag   string             `json:"delivery_tag"`
}

CeleryProperties represents the properties JSON object.

type CeleryTask

type CeleryTask interface {

	// Copy - is used to safely create and execute a copy of a task (stateful)
	// in a worker when there are multiple workers working on the same type of task but with different internal state.
	Copy() (CeleryTask, error)

	// ParseKwargs - define a method to parse kwargs
	ParseKwargs(map[string]interface{}) error

	// RunTask - define a method to run
	RunTask() (interface{}, error)
}

CeleryTask is an interface that represents an actual task. Passing a CeleryTask interface instead of a function pointer avoids reflection and may improve performance. ResultMessage must be obtained using GetResultMessage().
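
A minimal sketch of a type satisfying this interface follows. The CeleryTask interface is re-declared locally (copied from this page) so the example compiles on its own, and addTask is a hypothetical task that sums the keyword arguments "a" and "b". The kwarg names are assumptions chosen for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// CeleryTask is re-declared locally so the sketch is self-contained;
// in real code it comes from the gocelery package.
type CeleryTask interface {
	Copy() (CeleryTask, error)
	ParseKwargs(map[string]interface{}) error
	RunTask() (interface{}, error)
}

// addTask is a hypothetical task that adds kwargs "a" and "b".
type addTask struct {
	a, b int
}

// Copy returns an independent task so that each worker keeps its own state.
func (t *addTask) Copy() (CeleryTask, error) {
	return &addTask{a: t.a, b: t.b}, nil
}

// ParseKwargs extracts "a" and "b". JSON numbers arrive as float64.
func (t *addTask) ParseKwargs(kwargs map[string]interface{}) error {
	a, ok := kwargs["a"].(float64)
	if !ok {
		return errors.New(`kwarg "a" is missing or not a number`)
	}
	b, ok := kwargs["b"].(float64)
	if !ok {
		return errors.New(`kwarg "b" is missing or not a number`)
	}
	t.a, t.b = int(a), int(b)
	return nil
}

// RunTask performs the addition using the previously parsed kwargs.
func (t *addTask) RunTask() (interface{}, error) {
	return t.a + t.b, nil
}

func main() {
	var prototype CeleryTask = &addTask{}

	task, err := prototype.Copy()
	if err != nil {
		panic(err)
	}
	if err := task.ParseKwargs(map[string]interface{}{"a": 3.0, "b": 5.0}); err != nil {
		panic(err)
	}
	fmt.Println(task.RunTask())
}
```

A value such as &addTask{} would presumably be passed to Register in place of a plain function, since Register accepts interface{}.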

type CeleryWorker

type CeleryWorker struct {
	// contains filtered or unexported fields
}

CeleryWorker represents a distributed task worker. It is not thread safe and should not be used from multiple goroutines.

func NewCeleryWorker

func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int, waitTimeMS int) *CeleryWorker

NewCeleryWorker returns new celery worker

func (*CeleryWorker) GetNumWorkers

func (w *CeleryWorker) GetNumWorkers() int

GetNumWorkers returns number of currently running workers

func (*CeleryWorker) GetTask

func (w *CeleryWorker) GetTask(name string) interface{}

GetTask retrieves registered task

func (*CeleryWorker) Register

func (w *CeleryWorker) Register(name string, task interface{})

Register registers tasks (functions)

func (*CeleryWorker) RunTask

func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)

RunTask runs celery task
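
When a task is registered as a plain function, running it means calling that function with arguments decoded from JSON. The sketch below shows one way to do this with the reflect package, including the float64-to-int conversion that JSON decoding makes necessary. The call helper is hypothetical and is not the library's RunTask implementation.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// call invokes fn with args, converting each argument to the parameter type
// declared by fn. Hypothetical sketch; variadic functions are not supported.
func call(fn interface{}, args []interface{}) ([]interface{}, error) {
	fv := reflect.ValueOf(fn)
	if fv.Kind() != reflect.Func {
		return nil, errors.New("task is not a function")
	}
	ft := fv.Type()
	if ft.IsVariadic() {
		return nil, errors.New("variadic tasks are not supported in this sketch")
	}
	if ft.NumIn() != len(args) {
		return nil, fmt.Errorf("expected %d arguments, got %d", ft.NumIn(), len(args))
	}

	in := make([]reflect.Value, len(args))
	for i, arg := range args {
		av := reflect.ValueOf(arg)
		if !av.IsValid() {
			return nil, fmt.Errorf("argument %d is nil", i)
		}
		want := ft.In(i)
		if !av.Type().ConvertibleTo(want) {
			return nil, fmt.Errorf("argument %d: cannot convert %s to %s", i, av.Type(), want)
		}
		// For example, float64 (from JSON) is converted to int here.
		in[i] = av.Convert(want)
	}

	out := fv.Call(in)
	results := make([]interface{}, len(out))
	for i := range out {
		results[i] = out[i].Interface()
	}
	return results, nil
}

func add(a, b int) int { return a + b }

func main() {
	// Arguments as they would look after JSON decoding.
	results, err := call(add, []interface{}{5456.0, 2878.0})
	fmt.Println(results, err)
}
```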

func (*CeleryWorker) StartWorker

func (w *CeleryWorker) StartWorker()

StartWorker starts celery worker

func (*CeleryWorker) StopWorker

func (w *CeleryWorker) StopWorker()

StopWorker stops celery workers

type ResultMessage

type ResultMessage struct {
	Result interface{} `json:"result"`
	Error  string      `json:"error"`
}

ResultMessage is the return message received from the broker.

type Task

type Task struct {
	Name     string
	Args     []interface{}
	Kwargs   map[string]interface{}
	Settings *TaskSettings // if Settings is nil, we fallback to default values
}

Task represents a task gocelery receives from the client.

type TaskMessage

type TaskMessage struct {
	ID       string                 `json:"id"`
	Task     string                 `json:"task"`
	Args     []interface{}          `json:"args"`
	Kwargs   map[string]interface{} `json:"kwargs"`
	Tries    uint                   `json:"tries"`
	Settings *TaskSettings          `json:"settings"`
}

TaskMessage is celery-compatible message

func DecodeTaskMessage

func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)

DecodeTaskMessage decodes a base64-encoded body and returns a TaskMessage object.

func (*TaskMessage) Encode

func (tm *TaskMessage) Encode() (string, error)

Encode returns the task message as a base64-encoded JSON string.
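
The encoding described here is JSON wrapped in base64. The sketch below shows a round trip using only the standard library. The taskMessage type mirrors a subset of TaskMessage, the encode and decode helpers are hypothetical, and the choice of base64.StdEncoding is an assumption; the library may use a different base64 variant.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// taskMessage mirrors a subset of TaskMessage. Hypothetical illustration type.
type taskMessage struct {
	ID     string                 `json:"id"`
	Task   string                 `json:"task"`
	Args   []interface{}          `json:"args"`
	Kwargs map[string]interface{} `json:"kwargs"`
}

// encode marshals the message to JSON and wraps it in base64.
func encode(tm *taskMessage) (string, error) {
	raw, err := json.Marshal(tm)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}

// decode reverses encode: base64 first, then JSON.
func decode(encoded string) (*taskMessage, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, err
	}
	var tm taskMessage
	if err := json.Unmarshal(raw, &tm); err != nil {
		return nil, err
	}
	return &tm, nil
}

func main() {
	enc, err := encode(&taskMessage{
		ID:   "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
		Task: "worker.add",
		Args: []interface{}{5456, 2878},
	})
	if err != nil {
		panic(err)
	}
	tm, err := decode(enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(tm.Task, tm.Args)
}
```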

type TaskSettings

type TaskSettings struct {
	Delay      time.Time `json:"delay"`
	ValidUntil time.Time `json:"valid_until"`
}

TaskSettings can be passed to the task with specific overrides.

func DefaultSettings

func DefaultSettings() *TaskSettings

DefaultSettings returns a TaskSettings with all default values. These defaults are used when Task.Settings is nil.

Directories

Path Synopsis
example