README
Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends
Installation
taskq requires a Go version with module support and uses import versioning, so make sure to initialize a Go module before installing taskq:
go get github.com/vmihailenco/taskq/v2
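If your project is not a Go module yet, initialize one first (the module path below is only an illustrative placeholder):
go mod init github.com/my/app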
Features
- Redis, SQS, IronMQ, and in-memory backends.
- Automatic scaling of the number of goroutines used to fetch (fetcher) and process (worker) messages.
- Global rate limiting.
- Global limit of workers.
- Call once: messages with the same name are deduplicated.
- Automatic retries with exponential backoff.
- Automatic pausing when all messages in the queue fail.
- Fallback handler for processing failed messages.
- Message batching: the SQS and IronMQ backends add/delete messages in batches.
- Automatic message compression using zstd.
Quickstart
I recommend that you split your app into two parts:
- An API that accepts requests from customers and adds tasks to the queues.
- A Worker that fetches tasks from the queues and processes them.
This way you can:
- Isolate API and worker from each other;
- Scale API and worker separately;
- Have different configs for API and worker (like timeouts).
There is an api_worker example that demonstrates this approach using Redis as the backend:
cd examples/api_worker
go run worker/main.go
go run api/main.go
You start by choosing the backend to use, in our case Redis:
package api_worker
var QueueFactory = redisq.NewFactory()
Using that factory, you create a queue that contains the task(s):
var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})
Using the queue, you create a task with a handler that does some useful work:
var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "counter",
    Handler: func() error {
        IncrLocalCounter()
        return nil
    },
})
Then, in the API, you use the task to add messages/jobs to the queue:
ctx := context.Background()
for {
    // call task handler without any args
    err := api_worker.MainQueue.Add(api_worker.CountTask.WithArgs(ctx))
    if err != nil {
        log.Fatal(err)
    }
}
And in the worker you start processing the queue:
err := api_worker.MainQueue.Consumer().Start(context.Background())
if err != nil {
    log.Fatal(err)
}
API overview
ctx := context.Background()

t := taskq.RegisterTask(&taskq.TaskOptions{
    Name: "greeting",
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Say "Hello World".
myQueue.Add(t.WithArgs(ctx, "World"))

// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
myQueue.Add(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "hello")
    msg.Name = "hello-world" // unique name enables deduplication
    myQueue.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "hello")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    myQueue.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "hello").OnceInPeriod(time.Hour)
    myQueue.Add(msg)
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "hello").OnceInPeriod(time.Hour, "world", "europe")
    myQueue.Add(msg)
}
Message deduplication
If a Message has a Name, that name is used as a unique identifier, and messages with the same name are deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer, if they have not yet been evicted from the local cache). Where Name is omitted, no deduplication occurs and every message is processed. Task's WithMessage and WithArgs both produce messages with no Name, so these are not deduplicated. OnceInPeriod sets a name based on a consistent hash of the arguments and a quantised form (i.e. 'this hour', 'today') of the period passed to it. This guarantees that the same function will not be called with the same arguments during that period.
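To illustrate, here is a minimal sketch of both approaches, reusing the task t and queue myQueue from the API overview above. Depending on the backend, adding a duplicate may be reported by Add as taskq.ErrDuplicate, or the message may simply be dropped:
ctx := context.Background()

// Explicit name: messages with the same Name are deduplicated.
first := t.WithArgs(ctx, "hello")
first.Name = "hello-world"
_ = myQueue.Add(first)

second := t.WithArgs(ctx, "hello")
second.Name = "hello-world"
if err := myQueue.Add(second); err == taskq.ErrDuplicate {
    // already added within the deduplication window
}

// Derived name: deduplicate by a hash of the args within a one hour period.
msg := t.WithArgs(ctx, "hello").OnceInPeriod(time.Hour)
_ = myQueue.Add(msg)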
Handlers
A Handler and FallbackHandler are supplied to RegisterTask in the TaskOptions.
There are three permitted types of signature (a short sketch follows the list):
- A zero-argument function
- A function whose arguments are assignable in type from those which are passed in the message
- A function which takes a single *Message argument
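For illustration only (the task names below are made up), the three forms look like this:
// 1. A zero-argument function.
var Ping = taskq.RegisterTask(&taskq.TaskOptions{
    Name:    "ping",
    Handler: func() error { return nil },
})

// 2. A function whose arguments are decoded from the message args,
//    e.g. added with Greet.WithArgs(ctx, "Alice", 30).
var Greet = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "greet",
    Handler: func(name string, age int) error {
        fmt.Println("Hello", name, age)
        return nil
    },
})

// 3. A function that takes a single *taskq.Message argument.
var Raw = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "raw",
    Handler: func(msg *taskq.Message) error {
        fmt.Println("args:", msg.Args)
        return nil
    },
})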
If a task is registered with a handler that takes a Go context.Context as its first argument, then when that handler is invoked it will be passed the same Context that was passed to Consumer.Start(ctx). This can be used to transmit an abort signal to all tasks being processed:
var AbortableTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "SomethingLongwinded",
    Handler: func(ctx context.Context) error {
        for range time.Tick(time.Second) {
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                fmt.Println("Wee!")
            }
        }
        return nil
    },
})
Custom message delay
If the error returned by a handler implements the Delayer interface (Delay() time.Duration), that delay is used to postpone message processing:
type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}
Documentation
Overview ¶
Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends.
Example (MessageDelay) ¶
Output: processed with delay 1s
Example (Once) ¶
Output: hello world
Example (RateLimit) ¶
Output: 1 msg/s
Example (RetryOnError) ¶
Output:
retried in 0s
retried in 1s
retried in 3s
Index ¶
- Variables
- func SetLogger(logger *log.Logger)
- func SetUnknownTaskOptions(opt *TaskOptions)
- type Consumer
- func (c *Consumer) Add(msg *Message) error
- func (c *Consumer) AddHook(hook ConsumerHook)
- func (c *Consumer) Len() int
- func (c *Consumer) Options() *QueueOptions
- func (c *Consumer) Process(msg *Message) error
- func (c *Consumer) ProcessAll(ctx context.Context) error
- func (c *Consumer) ProcessOne(ctx context.Context) error
- func (c *Consumer) Purge() error
- func (c *Consumer) Put(msg *Message)
- func (c *Consumer) Queue() Queue
- func (c *Consumer) Start(ctx context.Context) error
- func (c *Consumer) Stats() *ConsumerStats
- func (c *Consumer) Stop() error
- func (c *Consumer) StopTimeout(timeout time.Duration) error
- func (c *Consumer) String() string
- type ConsumerHook
- type ConsumerStats
- type Delayer
- type Factory
- type Handler
- type HandlerFunc
- type Message
- type ProcessMessageEvent
- type Queue
- type QueueOptions
- type RateLimiter
- type Redis
- type Storage
- type Task
- type TaskMap
- type TaskOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsyncTask = errors.New("taskq: async task")
var ErrDuplicate = errors.New("taskq: message with such name already exists")
ErrDuplicate is returned when adding a duplicate message to the queue.
Functions ¶
func SetUnknownTaskOptions ¶
func SetUnknownTaskOptions(opt *TaskOptions)
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewConsumer ¶
NewConsumer creates a new Consumer for the queue using the provided processing options.
func StartConsumer ¶
StartConsumer creates a new Consumer and starts it.
func (*Consumer) AddHook ¶
func (c *Consumer) AddHook(hook ConsumerHook)
AddHook adds a hook into message processing.
func (*Consumer) Options ¶
func (c *Consumer) Options() *QueueOptions
func (*Consumer) Process ¶
Process is a low-level API to process a message, bypassing the internal queue.
func (*Consumer) ProcessAll ¶
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Consumer) ProcessOne ¶
ProcessOne processes at most one message in the queue.
func (*Consumer) StopTimeout ¶
StopTimeout waits up to the given timeout for workers to finish processing their current messages and then stops the workers.
type ConsumerHook ¶
type ConsumerHook interface {
    BeforeProcessMessage(*ProcessMessageEvent) error
    AfterProcessMessage(*ProcessMessageEvent) error
}
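As an illustrative sketch (the hook type below is not part of the package), a hook can be implemented and attached via Consumer.AddHook:
type loggingHook struct{}

func (loggingHook) BeforeProcessMessage(evt *taskq.ProcessMessageEvent) error {
    // invoked before the message is passed to the handler
    return nil
}

func (loggingHook) AfterProcessMessage(evt *taskq.ProcessMessageEvent) error {
    // invoked after the handler has returned
    return nil
}

// myQueue.Consumer().AddHook(loggingHook{})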
type ConsumerStats ¶
type Factory ¶
type Factory interface {
    RegisterQueue(*QueueOptions) Queue
    Range(func(Queue) bool)
    StartConsumers(context.Context) error
    StopConsumers() error
    Close() error
}
Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
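A minimal lifecycle sketch, reusing the Redis factory, queue options, and CountTask from the quickstart:
factory := redisq.NewFactory()

mainQueue := factory.RegisterQueue(&taskq.QueueOptions{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})

// Start consumers for every queue registered with this factory.
if err := factory.StartConsumers(context.Background()); err != nil {
    log.Fatal(err)
}

_ = mainQueue.Add(CountTask.WithArgs(context.Background()))

// Stop consumers and close the queues on shutdown.
_ = factory.StopConsumers()
_ = factory.Close()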
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct {
    Ctx context.Context `msgpack:"-"`

    // SQS/IronMQ message id.
    ID string `msgpack:",omitempty"`

    // Optional name for the message. Messages with the same name
    // are processed only once.
    Name string `msgpack:"-"`

    // Delay specifies the duration the queue must wait
    // before executing the message.
    Delay time.Duration `msgpack:"-"`

    // Function args passed to the handler.
    Args []interface{} `msgpack:"-"`

    // Binary representation of the args.
    ArgsCompression string `msgpack:",omitempty"`
    ArgsBin         []byte

    // SQS/IronMQ reservation id that is used to release/delete the message.
    ReservationID string `msgpack:"-"`

    // The number of times the message has been reserved or released.
    ReservedCount int `msgpack:",omitempty"`

    TaskName string
    Err      error `msgpack:"-"`

    // contains filtered or unexported fields
}
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func (*Message) MarshalArgs ¶
func (*Message) MarshalBinary ¶
func (*Message) OnceInPeriod ¶ added in v2.1.0
OnceInPeriod uses the period and the args to generate a message name such that a message with the same args is added to the queue at most once in the given period. If args are not provided, the message args are used instead.
func (*Message) UnmarshalBinary ¶
type ProcessMessageEvent ¶
type Queue ¶
type Queue interface {
    fmt.Stringer

    Name() string
    Options() *QueueOptions
    Consumer() *Consumer

    Len() (int, error)
    Add(msg *Message) error
    ReserveN(n int, waitTimeout time.Duration) ([]Message, error)
    Release(msg *Message) error
    Delete(msg *Message) error
    Purge() error
    Close() error
    CloseTimeout(timeout time.Duration) error
}
type QueueOptions ¶
type QueueOptions struct {
    // Queue name.
    Name string

    // Minimum number of goroutines processing messages.
    // Default is 1.
    MinWorkers int
    // Maximum number of goroutines processing messages.
    // Default is 32 * number of CPUs.
    MaxWorkers int
    // Global limit of concurrently running workers across all servers.
    // Overrides MaxWorkers.
    WorkerLimit int
    // Maximum number of goroutines fetching messages.
    // Default is 16 * number of CPUs.
    MaxFetchers int

    // Number of messages reserved by a fetcher in the queue in one request.
    // Default is 10 messages.
    ReservationSize int
    // Time after which the reserved message is returned to the queue.
    // Default is 5 minutes.
    ReservationTimeout time.Duration
    // Time that a long polling receive call waits for a message to become
    // available before returning an empty response.
    // Default is 10 seconds.
    WaitTimeout time.Duration
    // Size of the buffer where reserved messages are stored.
    // Default is the same as ReservationSize.
    BufferSize int

    // Number of consecutive failures after which queue processing is paused.
    // Default is 100 failures.
    PauseErrorsThreshold int

    // Processing rate limit.
    RateLimit rate.Limit

    // Optional rate limiter interface. The default is to use Redis.
    RateLimiter RateLimiter

    // Redis client that is used for storing metadata.
    Redis Redis

    // Optional storage interface. The default is to use Redis.
    Storage Storage

    // Optional message handler. The default is the global Tasks registry.
    Handler Handler

    // contains filtered or unexported fields
}
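For example, a queue registered with a few of these options set, reusing the quickstart's QueueFactory and Redis client (the queue name and values are illustrative):
var BillingQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
    Name:       "billing",
    MaxWorkers: 64,              // cap concurrent workers on this instance
    RateLimit:  rate.Limit(100), // process at most 100 messages per second
    Redis:      Redis,           // stores metadata; also the default rate limiter and storage
})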
func (*QueueOptions) Init ¶
func (opt *QueueOptions) Init()
type RateLimiter ¶
type Redis ¶
type Redis interface {
    Del(keys ...string) *redis.IntCmd
    SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
    Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

    // Required by redislock
    Eval(script string, keys []string, args ...interface{}) *redis.Cmd
    EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
    ScriptExists(scripts ...string) *redis.BoolSliceCmd
    ScriptLoad(script string) *redis.StringCmd
}
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func RegisterTask ¶
func RegisterTask(opt *TaskOptions) *Task
func (*Task) HandleMessage ¶
func (*Task) Options ¶
func (t *Task) Options() *TaskOptions
type TaskMap ¶ added in v2.1.0
type TaskMap struct {
// contains filtered or unexported fields
}
var Tasks TaskMap
func (*TaskMap) HandleMessage ¶ added in v2.1.0
func (*TaskMap) Register ¶ added in v2.1.0
func (r *TaskMap) Register(opt *TaskOptions) (*Task, error)
func (*TaskMap) Unregister ¶ added in v2.1.0
type TaskOptions ¶
type TaskOptions struct {
    // Task name.
    Name string

    // Function called to process a message.
    // There are three permitted types of signature:
    // 1. A zero-argument function
    // 2. A function whose arguments are assignable in type from those which are passed in the message
    // 3. A function which takes a single `*Message` argument
    // The handler function may also optionally take a Context as a first argument and
    // may optionally return an error. If the handler takes a Context, when it is invoked
    // it will be passed the same Context as that which was passed to `StartConsumer`.
    // If the handler returns a non-nil error the message processing will fail and will be retried.
    Handler interface{}
    // Function called to process a failed message after the specified number of retries
    // have all failed.
    // The FallbackHandler accepts the same types of function as the Handler.
    FallbackHandler interface{}

    // Optional function used by Consumer with defer statement
    // to recover from panics.
    DeferFunc func()

    // Number of tries/releases after which the message fails permanently
    // and is deleted.
    // Default is 64 retries.
    RetryLimit int
    // Minimum backoff time between retries.
    // Default is 30 seconds.
    MinBackoff time.Duration
    // Maximum backoff time between retries.
    // Default is 30 minutes.
    MaxBackoff time.Duration

    // contains filtered or unexported fields
}
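For example, a task with an explicit retry policy and a fallback handler (sendEmail is a hypothetical helper; values are illustrative):
var EmailTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "send-email",
    Handler: func(addr string) error {
        return sendEmail(addr) // hypothetical helper
    },
    // Called once the retries below have all failed.
    FallbackHandler: func(addr string) error {
        log.Printf("giving up on email to %s", addr)
        return nil
    },
    RetryLimit: 10,               // instead of the default 64
    MinBackoff: 10 * time.Second, // first retry delay
    MaxBackoff: 10 * time.Minute, // cap for the exponential backoff
})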
Source Files
Directories
Path | Synopsis
---|---
examples |
examples/api_worker | This package contains an example usage of redis-backed taskq.