taskq

Version: v2.2.3 (not the latest version of its module)
Published: Dec 17, 2019 License: BSD-2-Clause Imports: 20 Imported by: 4

README

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends

Installation

taskq requires a Go version with Modules support and uses import versioning, so make sure to initialize a Go module before installing taskq:

go get github.com/vmihailenco/taskq/v2

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
  • Global rate limiting.
  • Global limit of workers.
  • Call once - deduplicating messages with the same name.
  • Automatic retries with exponential backoffs.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
  • Automatic message compression using zstd.

Quickstart

I recommend that you split your app into two parts:

  • An API that accepts requests from customers and adds tasks to the queues.
  • A Worker that fetches tasks from the queues and processes them.

This way you can:

  • Isolate API and worker from each other;
  • Scale API and worker separately;
  • Have different configs for API and worker (like timeouts).

There is an api_worker example that demonstrates this approach using Redis as the backend:

cd examples/api_worker
go run worker/main.go
go run api/main.go

You start by choosing a backend to use, in our case Redis:

package api_worker

var QueueFactory = redisq.NewFactory()

Using that factory, you create a queue that contains the task(s):

var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
	Name:  "api-worker",
	Redis: Redis, // go-redis client
})

Then you create a task with a handler that does some useful work:

var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
	Name: "counter",
	Handler: func() error {
		IncrLocalCounter()
		return nil
	},
})

Then, in the API, you use the task to add messages/jobs to the queue:

ctx := context.Background()
for {
	// call task handler without any args
	err := api_worker.MainQueue.Add(api_worker.CountTask.WithArgs(ctx))
	if err != nil {
		log.Fatal(err)
	}
}

And in the worker you start processing the queue:

err := api_worker.MainQueue.Consumer().Start(context.Background())
if err != nil {
	log.Fatal(err)
}

API overview

t := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "greeting",
	Handler: func(name string) error {
		fmt.Println("Hello", name)
		return nil
	},
})

ctx := context.Background()

// Say "Hello World".
err := myQueue.Add(t.WithArgs(ctx, "World"))
if err != nil {
	panic(err)
}

// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world" // unique
    _ = myQueue.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    _ = myQueue.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
    _ = myQueue.Add(msg)
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
    _ = myQueue.Add(msg)
}

Message deduplication

If a Message has a Name, that name is used as a unique identifier, and messages with the same name are deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer if the name is not evicted from the local cache after that period). When Name is omitted, no deduplication occurs and every message is processed. Task's WithArgs produces messages with no Name, so they are not deduplicated. OnceInPeriod sets a name based on a consistent hash of the arguments and a quantised period of time (i.e. 'this hour', 'today') derived from the period passed to OnceInPeriod. This guarantees that the same function will not be called with the same arguments during that period.

Handlers

A Handler and FallbackHandler are supplied to RegisterTask in the TaskOptions.

There are three permitted types of signature:

  1. A zero-argument function
  2. A function whose arguments are assignable in type from those which are passed in the message
  3. A function which takes a single *Message argument

If a task is registered with a handler that takes a Go context.Context as its first argument, then when that handler is invoked it is passed the same Context that was passed to Consumer.Start(ctx). This can be used to signal all tasks being processed to abort:

var AbortableTask = taskq.RegisterTask(&taskq.TaskOptions{
	Name: "SomethingLongwinded",
	Handler: func(ctx context.Context) error {
		// Check for cancellation once per second.
		for range time.Tick(time.Second) {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				fmt.Println("Wee!")
			}
		}
		return nil
	},
})

Custom message delay

If the error returned by a handler implements the Delay() time.Duration interface, then that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}

Documentation

Overview

Package taskq implements task/job queue with Redis, SQS, IronMQ, and in-memory backends.

Example (CustomRateLimit)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/vmihailenco/taskq/v2"
	"github.com/vmihailenco/taskq/v2/memqueue"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&taskq.QueueOptions{
		Name: "test",
	})
	task := taskq.RegisterTask(&taskq.TaskOptions{
		Name: "Example_customRateLimit",
		Handler: func() error {
			// timeSince is a helper that is not shown in this listing.
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	ctx := context.Background()
	_ = q.Add(task.WithArgs(ctx))

	// Wait for all messages to be processed.
	_ = q.Close()
}
Output:

retried in 0s
retried in 3s
Example (MessageDelay)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_messageDelay",
	Handler: func() {
		fmt.Println("processed with delay", timeSince(start))
	},
})

ctx := context.Background()
msg := task.WithArgs(ctx)
msg.Delay = time.Second
_ = q.Add(msg)

// Wait for all messages to be processed.
_ = q.Close()
Output:

processed with delay 1s
Example (Once)
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_once",
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
})

ctx := context.Background()
for i := 0; i < 10; i++ {
	// Call once in a second.
	_ = q.Add(task.WithArgs(ctx, "world").OnceInPeriod(time.Second))
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

hello world
Example (RateLimit)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name:    "Example_rateLimit",
	Handler: func() {},
})

const n = 5

ctx := context.Background()
for i := 0; i < n; i++ {
	_ = q.Add(task.WithArgs(ctx))
}

// Wait for all messages to be processed.
_ = q.Close()

fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output:

1 msg/s
Example (RetryOnError)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_retryOnError",
	Handler: func() error {
		fmt.Println("retried in", timeSince(start))
		return errors.New("fake error")
	},
	RetryLimit: 3,
	MinBackoff: time.Second,
})

ctx := context.Background()
_ = q.Add(task.WithArgs(ctx))

// Wait for all messages to be processed.
_ = q.Close()
Output:

retried in 0s
retried in 1s
retried in 3s

Constants

This section is empty.

Variables

var ErrAsyncTask = errors.New("taskq: async task")

var ErrDuplicate = errors.New("taskq: message with such name already exists")

ErrDuplicate is returned when adding duplicate message to the queue.

Functions

func SetLogger

func SetLogger(logger *log.Logger)

func SetUnknownTaskOptions

func SetUnknownTaskOptions(opt *TaskOptions)

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

func NewConsumer

func NewConsumer(q Queue) *Consumer

NewConsumer creates a new Consumer for the queue, using the queue's options.

func StartConsumer

func StartConsumer(ctx context.Context, q Queue) *Consumer

StartConsumer creates a new Consumer and starts it.

func (*Consumer) Add

func (c *Consumer) Add(msg *Message) error

func (*Consumer) AddHook

func (c *Consumer) AddHook(hook ConsumerHook)

AddHook adds a hook into message processing.

func (*Consumer) Len

func (c *Consumer) Len() int

func (*Consumer) Options

func (c *Consumer) Options() *QueueOptions

func (*Consumer) Process

func (c *Consumer) Process(msg *Message) error

Process is a low-level API that processes a message, bypassing the internal queue.

func (*Consumer) ProcessAll

func (c *Consumer) ProcessAll(ctx context.Context) error

ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.

func (*Consumer) ProcessOne

func (c *Consumer) ProcessOne(ctx context.Context) error

ProcessOne processes at most one message in the queue.

func (*Consumer) Purge

func (c *Consumer) Purge() error

Purge discards messages from the internal queue.

func (*Consumer) Put

func (c *Consumer) Put(msg *Message)

func (*Consumer) Queue

func (c *Consumer) Queue() Queue

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start starts consuming messages in the queue.

func (*Consumer) Stats

func (c *Consumer) Stats() *ConsumerStats

Stats returns consumer stats.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop is StopTimeout with a 30 second timeout.

func (*Consumer) StopTimeout

func (c *Consumer) StopTimeout(timeout time.Duration) error

StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerHook

type ConsumerHook interface {
	BeforeProcessMessage(*ProcessMessageEvent) error
	AfterProcessMessage(*ProcessMessageEvent) error
}

type ConsumerStats

type ConsumerStats struct {
	WorkerNumber  uint32
	FetcherNumber uint32
	BufferSize    uint32
	Buffered      uint32
	InFlight      uint32
	Processed     uint32
	Retries       uint32
	Fails         uint32
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Factory

type Factory interface {
	RegisterQueue(*QueueOptions) Queue
	Range(func(Queue) bool)
	StartConsumers(context.Context) error
	StopConsumers() error
	Close() error
}

Factory is an interface that abstracts the creation of new queues. It is implemented in the subpackages memqueue, azsqs, and ironmq; the Quickstart above also obtains a factory from redisq.NewFactory.

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	Ctx context.Context `msgpack:"-"`

	// SQS/IronMQ message id.
	ID string `msgpack:",omitempty"`

	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string `msgpack:"-"`

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration `msgpack:"-"`

	// Function args passed to the handler.
	Args []interface{} `msgpack:"-"`

	// Binary representation of the args.
	ArgsCompression string `msgpack:",omitempty"`
	ArgsBin         []byte

	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationID string `msgpack:"-"`

	// The number of times the message has been reserved or released.
	ReservedCount int `msgpack:",omitempty"`

	TaskName string
	Err      error `msgpack:"-"`
	// contains filtered or unexported fields
}

Message is used to create and retrieve messages from a queue.

func NewMessage

func NewMessage(ctx context.Context, args ...interface{}) *Message

func (*Message) MarshalArgs

func (m *Message) MarshalArgs() ([]byte, error)

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) OnceInPeriod added in v2.1.0

func (m *Message) OnceInPeriod(period time.Duration, args ...interface{}) *Message

OnceInPeriod uses the period and the args to generate a message name such that a message with those args is added to the queue at most once in the given period. If args are not provided, the message args are used instead.

func (*Message) SetDelay added in v2.2.0

func (m *Message) SetDelay(delay time.Duration) *Message

SetDelay sets message delay.

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(b []byte) error

type ProcessMessageEvent

type ProcessMessageEvent struct {
	Message   *Message
	StartTime time.Time
	Err       error

	Stash map[interface{}]interface{}
}

type Queue

type Queue interface {
	fmt.Stringer
	Name() string
	Options() *QueueOptions
	Consumer() *Consumer

	Len() (int, error)
	Add(msg *Message) error
	ReserveN(n int, waitTimeout time.Duration) ([]Message, error)
	Release(msg *Message) error
	Delete(msg *Message) error
	Purge() error
	Close() error
	CloseTimeout(timeout time.Duration) error
}

type QueueOptions

type QueueOptions struct {
	// Queue name.
	Name string

	// Minimum number of goroutines processing messages.
	// Default is 1.
	MinWorkers int
	// Maximum number of goroutines processing messages.
	// Default is 32 * number of CPUs.
	MaxWorkers int
	// Global limit of concurrently running workers across all servers.
	// Overrides MaxWorkers.
	WorkerLimit int
	// Maximum number of goroutines fetching messages.
	// Default is 16 * number of CPUs.
	MaxFetchers int

	// Number of messages reserved by a fetcher in the queue in one request.
	// Default is 10 messages.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	// Default is 5 minutes.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 10 seconds.
	WaitTimeout time.Duration
	// Size of the buffer where reserved messages are stored.
	// Default is the same as ReservationSize.
	BufferSize int

	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int

	// Processing rate limit.
	RateLimit rate.Limit

	// Optional rate limiter interface. The default is to use Redis.
	RateLimiter RateLimiter

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage

	// Optional message handler. The default is the global Tasks registry.
	Handler Handler
	// contains filtered or unexported fields
}

func (*QueueOptions) Init

func (opt *QueueOptions) Init()

type RateLimiter

type RateLimiter interface {
	AllowRate(name string, limit rate.Limit) (delay time.Duration, allow bool)
}

type Redis

type Redis interface {
	Del(keys ...string) *redis.IntCmd
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	// Required by redislock
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(script string) *redis.StringCmd
}

type Storage

type Storage interface {
	Exists(key string) bool
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func RegisterTask

func RegisterTask(opt *TaskOptions) *Task

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) Options

func (t *Task) Options() *TaskOptions

func (*Task) String

func (t *Task) String() string

func (*Task) WithArgs

func (t *Task) WithArgs(ctx context.Context, args ...interface{}) *Message

type TaskMap added in v2.1.0

type TaskMap struct {
	// contains filtered or unexported fields
}

var Tasks TaskMap

func (*TaskMap) Get added in v2.1.0

func (r *TaskMap) Get(name string) *Task

func (*TaskMap) HandleMessage added in v2.1.0

func (r *TaskMap) HandleMessage(msg *Message) error

func (*TaskMap) Range added in v2.1.0

func (r *TaskMap) Range(fn func(name string, task *Task) bool)

func (*TaskMap) Register added in v2.1.0

func (r *TaskMap) Register(opt *TaskOptions) (*Task, error)

func (*TaskMap) Reset added in v2.1.0

func (r *TaskMap) Reset()

func (*TaskMap) Unregister added in v2.1.0

func (r *TaskMap) Unregister(task *Task)

type TaskOptions

type TaskOptions struct {
	// Task name.
	Name string

	// Function called to process a message.
	// There are three permitted types of signature:
	// 1. A zero-argument function
	// 2. A function whose arguments are assignable in type from those which are passed in the message
	// 3. A function which takes a single `*Message` argument
	// The handler function may also optionally take a Context as a first argument and may optionally return an error.
	// If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to
	// `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried.
	Handler interface{}
	// Function called to process failed message after the specified number of retries have all failed.
	// The FallbackHandler accepts the same types of function as the Handler.
	FallbackHandler interface{}

	// Optional function used by Consumer with defer statement
	// to recover from panics.
	DeferFunc func()

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	// Default is 64 retries.
	RetryLimit int
	// Minimum backoff time between retries.
	// Default is 30 seconds.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	// Default is 30 minutes.
	MaxBackoff time.Duration
	// contains filtered or unexported fields
}

Directories

Path                 Synopsis
examples/api_worker  This package contains an example usage of redis-backed taskq.
