bokchoy

v1.4.6
Published: Sep 22, 2021 License: MIT Imports: 21 Imported by: 3

README

Introduction

Bokchoy is a simple Go library for queueing tasks and processing them in the background with workers. It integrates easily into your web stack and is designed to have a low barrier to entry for newcomers.

It currently only supports Redis (client, sentinel and cluster) with some Lua magic, but internally it relies on a generic broker implementation so that it can be extended.

Motivation

It's relatively easy to build a producer/receiver system in Go, since the language has built-in features for building one from scratch, but we keep rewriting the same system everywhere instead of making it reusable.

Bokchoy is a plug-and-play component: it does its job, and does it well, so that you can focus on your business logic.

Features

  • Lightweight
  • A simple API close to net/http - if you already use net/http, you can learn it pretty quickly
  • Designed with modular/composable APIs - middlewares, queue middlewares
  • Context control - built on the context package, providing value chaining, cancellations and timeouts
  • Highly configurable - tons of options to swap internal parts (broker, logger, timeouts, etc.); if you cannot customize something, then an option is missing
  • Extensions - RPC server powered by gRPC, Sentry, etc.

Getting started

First, run a Redis server, of course:

redis-server

Define your producer which will send tasks:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/thoas/bokchoy"
)

func main() {
	ctx := context.Background()

	// define the main engine which will manage queues
	engine, err := bokchoy.New(ctx, bokchoy.Config{
		Broker: bokchoy.BrokerConfig{
			Type: "redis",
			Redis: bokchoy.RedisConfig{
				Type: "client",
				Client: bokchoy.RedisClientConfig{
					Addr: "localhost:6379",
				},
			},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	payload := map[string]string{
		"data": "hello world",
	}

	task, err := engine.Queue("tasks.message").Publish(ctx, payload)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(task, "has been published")
}

See producer directory for more information and to run it.

Now that we have a producer which can send tasks to our engine, we need a worker to process them in the background:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/thoas/bokchoy"
)

func main() {
	ctx := context.Background()

	engine, err := bokchoy.New(ctx, bokchoy.Config{
		Broker: bokchoy.BrokerConfig{
			Type: "redis",
			Redis: bokchoy.RedisConfig{
				Type: "client",
				Client: bokchoy.RedisClientConfig{
					Addr: "localhost:6379",
				},
			},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	engine.Queue("tasks.message").HandleFunc(func(r *bokchoy.Request) error {
		fmt.Println("Receive request", r)
		fmt.Println("Payload:", r.Task.Payload)

		return nil
	})

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	go func() {
		for range c {
			log.Print("Received signal, gracefully stopping")
			engine.Stop(ctx)
		}
	}()

	engine.Run(ctx)
}

A worker is defined by handlers. To define a Handler, you have to implement this interface:

type Handler interface {
	Handle(*Request) error
}

You can create your own struct which implements this interface or use the HandlerFunc to generate a Handler from your function.
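
The two ways of providing a handler can be sketched with the standard library alone. The block below is a minimal illustration, not the package's code: Request is a local stand-in for bokchoy.Request, countingHandler is a hypothetical struct, and the HandlerFunc adapter is assumed to work like http.HandlerFunc.

```go
package main

import (
	"errors"
	"fmt"
)

// Request is a local stand-in for bokchoy.Request (assumption: only a payload is modelled).
type Request struct {
	Payload string
}

// Handler mirrors the interface shown above.
type Handler interface {
	Handle(*Request) error
}

// HandlerFunc adapts an ordinary function to Handler, like http.HandlerFunc.
type HandlerFunc func(*Request) error

// Handle calls f(r), which is what makes HandlerFunc satisfy Handler.
func (f HandlerFunc) Handle(r *Request) error { return f(r) }

// countingHandler is a hypothetical struct-based handler that keeps state.
type countingHandler struct{ handled int }

// Handle rejects empty payloads and counts the others.
func (h *countingHandler) Handle(r *Request) error {
	if r.Payload == "" {
		return errors.New("empty payload")
	}
	h.handled++
	return nil
}

func main() {
	counter := &countingHandler{}
	handlers := []Handler{
		counter,
		HandlerFunc(func(r *Request) error {
			fmt.Println("payload:", r.Payload)
			return nil
		}),
	}
	for _, h := range handlers {
		if err := h.Handle(&Request{Payload: "hello world"}); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println("handled by struct:", counter.handled)
}
```

A struct is the natural choice when the handler needs state or dependencies; the function adapter is enough for stateless handlers.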

See worker directory for more information and to run it.

If you want a complete application example, you can read A Tour of Bokchoy, which explains how to use its main features.

Installation

Using Go Modules

go get github.com/thoas/bokchoy

Advanced topics

Delayed tasks

A published task is processed immediately if a worker is available. To delay it, use the bokchoy.WithCountdown option:

payload := map[string]string{
    "data": "hello world",
}

queue.Publish(ctx, payload, bokchoy.WithCountdown(5*time.Second))

This task will be executed in 5 seconds.

Priority tasks

A task can be published ahead of the others by providing a negative countdown.

payload := map[string]string{
    "data": "hello world",
}

queue.Publish(ctx, payload, bokchoy.WithCountdown(-1))

This task will be published and processed immediately.

Custom serializer

By default the task serializer is JSON. You can customize it when initializing the Bokchoy engine; it must implement the Serializer interface.

bokchoy.New(ctx, bokchoy.Config{
    Broker: bokchoy.BrokerConfig{
        Type: "redis",
        Redis: bokchoy.RedisConfig{
            Type: "client",
            Client: bokchoy.RedisClientConfig{
                Addr: "localhost:6379",
            },
        },
    },
}, bokchoy.WithSerializer(MySerializer{}))

This lets you define, for example, msgpack or YAML serializers.

Custom logger

By default the internal logger is disabled. You can provide a more verbose logger with options:

package main

import (
	"context"
	"log"

	"github.com/thoas/bokchoy"
	"github.com/thoas/bokchoy/logging"
)

func main() {
	ctx := context.Background()

	logger, err := logging.NewDevelopmentLogger()
	if err != nil {
		log.Fatal(err)
	}

	defer logger.Sync()

	// pass the logger as an option when creating the engine
	_, err = bokchoy.New(ctx, bokchoy.Config{
		Broker: bokchoy.BrokerConfig{
			Type: "redis",
			Redis: bokchoy.RedisConfig{
				Type: "client",
				Client: bokchoy.RedisClientConfig{
					Addr: "localhost:6379",
				},
			},
		},
	}, bokchoy.WithLogger(logger))
	if err != nil {
		log.Print(err)
	}
}

The builtin logger is based on zap but you can provide your own implementation easily if you have a central component.

If you don't need that much information, you can enable the Logger middleware.

Worker Concurrency

By default the worker concurrency is set to 1. You can override it based on your server's capacity; Bokchoy will spawn multiple goroutines to handle your tasks.

engine.Queue("tasks.message").HandleFunc(func(r *bokchoy.Request) error {
    fmt.Println("Receive request", r)
    fmt.Println("Payload:", r.Task.Payload)

    return nil
}, bokchoy.WithConcurrency(5))

You can still set it globally with bokchoy.WithConcurrency option when initializing the engine.

Retries

If your task handler returns an error, the task is marked as failed and retried 3 times, at intervals of 60, 120 and 180 seconds.

You can customize this globally on the engine or when publishing a new task by using bokchoy.WithMaxRetries and bokchoy.WithRetryIntervals options.

bokchoy.WithMaxRetries(1)
bokchoy.WithRetryIntervals([]time.Duration{
	180 * time.Second,
})

Timeout

By default a task is forced to time out and marked as canceled if its running time exceeds 180 seconds.

You can customize this globally or when publishing a new task by using bokchoy.WithTimeout option:

bokchoy.WithTimeout(5*time.Second)

The worker will regain control and process the next task, but be careful: each task runs in a goroutine, so you have to cancel your task at some point or the goroutine will leak.

Catch events

You can catch events by registering handlers on your queue when your tasks are starting, succeeding, completing or failing.

queue := engine.Queue("tasks.message")
queue.OnStartFunc(func(r *bokchoy.Request) error {
    // we update the context by adding a value
    *r = *r.WithContext(context.WithValue(r.Context(), "foo", "bar"))

    return nil
})

queue.OnCompleteFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

queue.OnSuccessFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

queue.OnFailureFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

Store results

By default, if you don't mutate the task in the handler, its result will always be nil.

You can store a result in your task to keep it for later. For example, you might need statistics from a Twitter profile and want to save them later.

queue.HandleFunc(func(r *bokchoy.Request) error {
	r.Task.Result = map[string]string{"result": "wow!"}

	return nil
})

You can store anything as long as your serializer can serialize it.

Keep in mind that the default task TTL is 180 seconds; you can override it with the bokchoy.WithTTL option.

Helpers

Let's define our previous queue:

queue := engine.Queue("tasks.message")

Empty the queue

queue.Empty()

It will remove all waiting tasks from your queue.

Cancel a waiting task

We produce a task without running the worker:

payload := map[string]string{
    "data": "hello world",
}

task, err := queue.Publish(ctx, payload)
if err != nil {
    log.Fatal(err)
}

Then we can cancel it by using its ID:

queue.Cancel(ctx, task.ID)

Retrieve a published task from the queue

queue.Get(ctx, task.ID)

Retrieve statistics from a queue

stats, err := queue.Count(ctx)
if err != nil {
    log.Fatal(err)
}

fmt.Println("Number of waiting tasks:", stats.Direct)
fmt.Println("Number of delayed tasks:", stats.Delayed)
fmt.Println("Number of total tasks:", stats.Total)

Middleware handlers

Bokchoy comes equipped with an optional middleware package, providing a suite of standard middlewares. Middlewares have the same API as handlers, so they are easy to implement. Think of them like net/http middlewares: they share the same purpose of following the lifecycle of a Bokchoy request.

Core middlewares

bokchoy/middleware   Description
Logger               Logs the start and end of each request with the elapsed processing time
Recoverer            Gracefully absorbs panics and prints the stack trace
RequestID            Injects a request ID into the context of each request
Timeout              Signals to the request context when the timeout deadline is reached

See middleware directory for more information.
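
The net/http comparison can be illustrated with a small wrapping chain. This is a sketch of the general pattern, not the middleware package itself: Handler and Middleware are local stand-ins, and chain, recoverer and logger are hypothetical functions that only imitate what Recoverer and Logger are described as doing.

```go
package main

import (
	"fmt"
)

// Handler processes one task payload; a local stand-in, not the package's type.
type Handler func(payload string) error

// Middleware wraps a Handler with extra behaviour, as in net/http.
type Middleware func(next Handler) Handler

// chain applies middlewares so that the first one listed runs outermost.
func chain(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// recoverer turns a panic in the wrapped handler into an ordinary error.
func recoverer(next Handler) Handler {
	return func(payload string) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered from panic: %v", r)
			}
		}()
		return next(payload)
	}
}

// logger prints a line before and after the wrapped handler runs.
func logger(next Handler) Handler {
	return func(payload string) error {
		fmt.Println("start:", payload)
		err := next(payload)
		fmt.Println("end:", payload, "err:", err)
		return err
	}
}

func main() {
	h := chain(func(payload string) error {
		panic("handler blew up")
	}, logger, recoverer)

	fmt.Println(h("hello world"))
}
```

Placing the logger outside the recoverer means the logged error already includes the recovered panic; reversing the order would let the panic skip the logger's closing line.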

FAQs

Are Task IDs unique?

Yes! They are based on ulid.

Is exactly-once execution of tasks guaranteed?

It's guaranteed by the underlying broker, which uses BRPOP/BLPOP from Redis.

If multiple clients are blocked for the same key, the first client to be served is the one that was waiting for more time (the first that blocked for the key).

Contributing

Don't hesitate ;)

Project history

Bokchoy is highly influenced by the great rq and celery.

Both are great, well-maintained projects, but they are only used in the Python ecosystem.

Some parts of Bokchoy (mostly the middlewares) are heavily inspired by or taken from go-chi.

Documentation

Index

Constants

const (
	VERSION = "v1.4.6, 22 September 2021, 15:11 GMT+3"
)

Variables

This section is empty.

Functions

func ClearAll added in v1.0.0

func ClearAll() *ekaerr.Error

func Empty added in v1.0.0

func Empty() *ekaerr.Error

func Init added in v1.0.0

func Init(options ...Option) *ekaerr.Error

func Run added in v1.0.0

func Run() *ekaerr.Error

func Stop added in v1.0.0

func Stop()

Types

type Bokchoy

type Bokchoy struct {
	// contains filtered or unexported fields
}

Bokchoy is the main object which stores all configuration, queues and broker.

func New

func New(options ...Option) (*Bokchoy, *ekaerr.Error)

New initializes a new Bokchoy instance.

func Use added in v1.0.0

func Use(queueName string, handlers ...HandlerFunc) *Bokchoy

func (*Bokchoy) ClearAll added in v1.0.0

func (b *Bokchoy) ClearAll() *ekaerr.Error

ClearAll clears all queues in the broker and also removes all metadata. Does nothing (but returns an error) if Bokchoy is already running (Run() has been called).

func (*Bokchoy) Empty

func (b *Bokchoy) Empty() *ekaerr.Error

Empty empties initialized queues. Returns the error of the first queue that cannot be emptied. Does nothing (but returns an error) if Bokchoy is already running (Run() has been called).

func (*Bokchoy) Publish

func (b *Bokchoy) Publish(queueName string, payload interface{}, options ...Option) (*Task, *ekaerr.Error)

Publish publishes a new payload to a queue.

func (*Bokchoy) Queue

func (b *Bokchoy) Queue(name string, options ...Option) *Queue

Queue gets or creates a new queue.

If Run() has already been called, the new queue's consumers will be started immediately (if it's a new queue, and if Bokchoy has not been stopped yet).

If a queue with the given 'name' has already been declared, the Queue method just returns it, but if at least one Option is provided (even nil), the queue will be recreated with the provided options.

func (*Bokchoy) Run

func (b *Bokchoy) Run() *ekaerr.Error

Run runs the system and blocks the current goroutine.

func (*Bokchoy) Stop

func (b *Bokchoy) Stop()

Stop stops all queues and their consumers.

Stopping queues and consumers cannot fail (they are just goroutines), so no error is returned.

Does nothing if Bokchoy is not running.

func (*Bokchoy) Use

func (b *Bokchoy) Use(queueName string, handlers ...HandlerFunc) *Bokchoy

Use appends a new middleware to the system. Does nothing if Bokchoy is already running (Run() has been called).

type Broker

type Broker interface {
	fmt.Stringer

	// Get returns serialized Task from the broker.
	// You can call Task.Deserialize then to decode received data.
	Get(queueName, taskID string) ([]byte, *ekaerr.Error)

	// Delete deletes raw data in broker based on key.
	Delete(queueName, taskID string) *ekaerr.Error

	// List returns raw data stored in broker.
	List(queueName string) ([][]byte, *ekaerr.Error)

	// Empty empties a queue.
	Empty(queueName string) *ekaerr.Error

	// ClearAll clears all queues in the broker and also removes all metadata.
	ClearAll() *ekaerr.Error

	// Count returns number of items from a queue name.
	Count(queueName string) (BrokerStats, *ekaerr.Error)

	// Set synchronizes the stored item.
	Set(queueName, taskID string, data []byte, ttl time.Duration) *ekaerr.Error

	// Publish publishes raw data.
	Publish(queueName, taskID string, taskPayload []byte, taskEtaUnixNano int64) *ekaerr.Error

	// Consume returns an array of raw data.
	Consume(queueName string, maxETA int64) ([][]byte, *ekaerr.Error)
}

Broker is the common interface to define a Broker.

type BrokerStats added in v1.0.0

type BrokerStats struct {
	Total   int
	Direct  int
	Delayed int
}

BrokerStats is the statistics returned by a Queue.

type Color added in v1.0.0

type Color []byte

Color is a terminal color representation.

type HandlerFunc

type HandlerFunc func(task *Task) *ekaerr.Error

HandlerFunc is a handler to handle incoming tasks.

type Option

type Option func(opts *options)

Option is an option unit.

func WithBroker added in v1.0.0

func WithBroker(broker Broker) Option

WithBroker registers new broker.

func WithConcurrency

func WithConcurrency(concurrency int8) Option

WithConcurrency defines the number of concurrent consumers.

func WithCountdown

func WithCountdown(countdown time.Duration) Option

WithCountdown defines the countdown to launch a delayed task.

func WithCustomSerializerJSON added in v1.3.1

func WithCustomSerializerJSON(example interface{}) Option

WithCustomSerializerJSON is an alias for WithSerializer(CustomSerializerJSON(example)).

func WithDisableOutput

func WithDisableOutput(disableOutput bool) Option

WithDisableOutput defines if the output (logo, queues information) should be disabled.

func WithLogger

func WithLogger(logger *ekalog.Logger) Option

WithLogger defines the Logger.

func WithMaxRetries

func WithMaxRetries(maxRetries int8) Option

WithMaxRetries defines the number of maximum retries for a failed task.

func WithQueues

func WithQueues(queues ...string) Option

WithQueues allows overriding the queues to run.

func WithQueuess added in v1.0.0

func WithQueuess(queues []string) Option

WithQueuess allows overriding the queues to run.

func WithRetryIntervals

func WithRetryIntervals(retryIntervals []time.Duration) Option

WithRetryIntervals defines the retry intervals for a failed task.

func WithSerializer

func WithSerializer(serializer Serializer) Option

WithSerializer defines the Serializer.

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL defines the duration to keep the task in the broker.

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout defines the timeout used to execute a task.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue contains consumers to enqueue.

func GetQueue added in v1.0.0

func GetQueue(name string, options ...Option) *Queue

func (*Queue) Cancel

func (q *Queue) Cancel(taskID string) (*Task, *ekaerr.Error)

Cancel cancels a task using its ID.

func (*Queue) Consume

func (q *Queue) Consume() ([]Task, *ekaerr.Error)

Consume returns an array of tasks.

func (*Queue) Count

func (q *Queue) Count() (BrokerStats, *ekaerr.Error)

Count returns statistics from queue:

  • direct: number of waiting tasks
  • delayed: number of waiting delayed tasks
  • total: number of total tasks

func (*Queue) Empty

func (q *Queue) Empty() *ekaerr.Error

Empty empties queue.

func (*Queue) Get

func (q *Queue) Get(taskID string) (*Task, *ekaerr.Error)

Get returns a Task instance from the current Broker's Queue with its id. Returns nil as Task if requested task is not found.

func (*Queue) List added in v1.0.0

func (q *Queue) List() ([]Task, *ekaerr.Error)

List returns tasks from the broker.

func (*Queue) Name

func (q *Queue) Name() string

Name returns the queue name.

func (*Queue) NewTask

func (q *Queue) NewTask(payload interface{}, options ...Option) *Task

NewTask returns a new Task instance from payload and options.

Requirements:

  • Current Queue is valid. Otherwise nil Task is returned.

func (*Queue) OnComplete

func (q *Queue) OnComplete(callback HandlerFunc) *Queue

OnComplete registers a new handler to be executed when a task is completed.

func (*Queue) OnFailure

func (q *Queue) OnFailure(callback HandlerFunc) *Queue

OnFailure registers a new handler to be executed when a task is failed.

func (*Queue) OnStart

func (q *Queue) OnStart(callback HandlerFunc) *Queue

OnStart registers a new handler to be executed when a task is started.

func (*Queue) OnSuccess

func (q *Queue) OnSuccess(callback HandlerFunc) *Queue

OnSuccess registers a new handler to be executed when a task is succeeded.

func (*Queue) Publish

func (q *Queue) Publish(payload interface{}, options ...Option) (*Task, *ekaerr.Error)

Publish is an alias for:

task := q.NewTask(payload, options...)
q.PublishTask(task)

func (*Queue) PublishTask

func (q *Queue) PublishTask(task *Task) *ekaerr.Error

PublishTask publishes a new task to the current Queue.

func (*Queue) Use

func (q *Queue) Use(callback ...HandlerFunc) *Queue

Use appends a new handler middleware to the queue.

type Serializer

type Serializer interface {
	Dumps(interface{}) ([]byte, *ekaerr.Error)
	Loads([]byte, *interface{}) *ekaerr.Error
	IsHumanReadable() bool
	Name() string
}

Serializer defines the interface a serializer must implement. It encodes the user's Task payload, which becomes part of the raw encoded task data used by the Broker.
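
As an illustration of the method set, here is a minimal JSON serializer built on encoding/json. It is a sketch only: jsonSerializer is a hypothetical type, and it returns the built-in error instead of *ekaerr.Error to stay free of external dependencies, so it does not satisfy the Serializer interface as declared.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonSerializer is a hypothetical JSON serializer with the same method names as
// Serializer, except that it returns the built-in error type.
type jsonSerializer struct{}

// Dumps encodes v as JSON.
func (jsonSerializer) Dumps(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// Loads decodes data into dest. With an empty interface{} destination,
// JSON objects decode to map[string]interface{}.
func (jsonSerializer) Loads(data []byte, dest *interface{}) error {
	return json.Unmarshal(data, dest)
}

// IsHumanReadable reports that JSON output is plain text.
func (jsonSerializer) IsHumanReadable() bool { return true }

// Name returns a label for this serializer (the value is an assumption).
func (jsonSerializer) Name() string { return "json" }

func main() {
	s := jsonSerializer{}

	raw, err := s.Dumps(map[string]string{"data": "hello world"})
	if err != nil {
		fmt.Println("dump error:", err)
		return
	}

	var payload interface{}
	if err := s.Loads(raw, &payload); err != nil {
		fmt.Println("load error:", err)
		return
	}
	fmt.Println(string(raw), payload)
}
```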

func CustomSerializerJSON added in v1.0.3

func CustomSerializerJSON(example interface{}) Serializer

CustomSerializerJSON returns a new JSON Serializer that expects the values passed to Serializer.Dumps() and Serializer.Loads() to have the same type as the value you pass to this constructor.

This constructor builds a JSON Serializer tailored to that type: even if the destination for Serializer.Loads() is declared as interface{}, its underlying type will always be T, the type you passed to the constructor. For example:

var (
        ser = CustomSerializerJSON(T{})
        dest interface{}
)
if err := ser.Loads(<...>, &dest); err.IsNotNil() {
        _, ok := dest.(T); // ok == true, if no err.
}

Passing nil interface{} returns the same Serializer as you may get using DefaultSerializerJSON().

It's OK to use either T or *T as the type. What you pass is what you get. The value is not important, so you can just use T{} or (*T)(nil).
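
One way such type-preserving decoding could be achieved is with the reflect package. The sketch below is an assumption about the technique, not the package's implementation: typedLoads and the payload type T are hypothetical, and the built-in error is used instead of *ekaerr.Error.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// typedLoads decodes data into a fresh value of example's type and stores it in dest.
// A nil example falls back to plain decoding into interface{}.
func typedLoads(example interface{}, data []byte, dest *interface{}) error {
	t := reflect.TypeOf(example)
	if t == nil {
		return json.Unmarshal(data, dest)
	}
	ptr := reflect.New(t) // pointer to a zero value of example's type
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return err
	}
	*dest = ptr.Elem().Interface()
	return nil
}

// T is a hypothetical payload type.
type T struct {
	Data string `json:"data"`
}

func main() {
	var dest interface{}
	if err := typedLoads(T{}, []byte(`{"data":"hello world"}`), &dest); err != nil {
		fmt.Println("error:", err)
		return
	}
	payload, ok := dest.(T)
	fmt.Println(payload.Data, ok)
}
```

Passing (*T)(nil) as the example makes the same function produce a *T, which mirrors the "what you pass is what you get" rule described above.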

func DefaultSerializerDummy added in v1.0.0

func DefaultSerializerDummy() Serializer

func DefaultSerializerJSON added in v1.0.0

func DefaultSerializerJSON() Serializer

DefaultSerializerJSON is the same as CustomSerializerJSON(nil).

type Task

type Task struct {
	Error *ekaerr.Error
	Panic interface{}

	PublishedAt ekatime.Timestamp

	TTL time.Duration
	ETA int64

	RetryIntervals []time.Duration
	MaxRetries     int8

	ExecTime time.Duration
	Timeout  time.Duration

	Payload interface{}
	// contains filtered or unexported fields
}

Task is the model stored in a Queue.

func Publish added in v1.0.0

func Publish(queueName string, payload interface{}, options ...Option) (*Task, *ekaerr.Error)

func (*Task) Deserialize added in v1.0.0

func (t *Task) Deserialize(data []byte, userPayloadSerializer Serializer) *ekaerr.Error

Deserialize returns a Task instance from raw data.

func (*Task) ID

func (t *Task) ID() string

ID returns a unique ID (ULID) of the current Task. Read more: https://github.com/oklog/ulid .

WARNING! This value is not guaranteed to be unique across queues. Although the chances of a collision across queues are slim, keep that in mind. If you need to distinguish two Tasks from different queues, use ID() along with QueueName().

Nil safe. Returns an empty string if Task is not initialized properly.
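
Following the advice above, a composite key is a simple way to index tasks from several queues. The sketch below is standalone: taskKey and newTaskKey are hypothetical, and in real code the two strings would presumably come from Task.QueueName() and Task.ID().

```go
package main

import "fmt"

// taskKey identifies a task across queues by pairing the queue name with the task ID.
type taskKey struct {
	queue string
	id    string
}

// newTaskKey builds a key from a queue name and a task ID.
func newTaskKey(queueName, id string) taskKey {
	return taskKey{queue: queueName, id: id}
}

func main() {
	// The ID below is a made-up placeholder, reused on purpose in two queues.
	const id = "EXAMPLE-ID"

	seen := map[taskKey]string{}
	seen[newTaskKey("tasks.message", id)] = "first"
	seen[newTaskKey("tasks.email", id)] = "second"

	// Both entries survive because the queue name is part of the key.
	fmt.Println(len(seen))
}
```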

func (*Task) IsFinished added in v1.0.0

func (t *Task) IsFinished() bool

IsFinished reports whether the current Task is considered finished, in which case the onCompleted callbacks may be called for it.

func (*Task) MarkAsCanceled

func (t *Task) MarkAsCanceled()

func (*Task) MarkAsSucceeded

func (t *Task) MarkAsSucceeded()

func (*Task) QueueName added in v1.0.0

func (t *Task) QueueName() string

QueueName returns a queue name to which this task is published, or retrieved from.

func (*Task) Serialize

func (t *Task) Serialize(userPayloadSerializer Serializer) ([]byte, *ekaerr.Error)

Serialize serializes a Task to raw data.

func (*Task) Status

func (t *Task) Status() TaskStatus

Status returns the Task's status, which is either:

  • the status that has been sent by you, or
  • the status the Task had at the moment you retrieved it from a Bokchoy backend.

Requirements:

  • Current Task is valid. Otherwise TASK_STATUS_INVALID is returned.

WARNING! TAKE A LOOK ALSO AT THE IsFinished() METHOD.

type TaskStatus added in v1.0.0

type TaskStatus int8
const (
	TASK_STATUS_INVALID    TaskStatus = 0
	TASK_STATUS_WAITING    TaskStatus = 1
	TASK_STATUS_PROCESSING TaskStatus = 2
	TASK_STATUS_RETRYING   TaskStatus = 5
	TASK_STATUS_SUCCEEDED  TaskStatus = 10
	TASK_STATUS_FAILED     TaskStatus = -1
	TASK_STATUS_CANCELLED  TaskStatus = -2
	TASK_STATUS_TIMED_OUT  TaskStatus = -3
)

func (TaskStatus) String added in v1.0.0

func (ts TaskStatus) String() string
