taskq

package module
v3.0.0-...-95fe89c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2023 License: BSD-2-Clause Imports: 23 Imported by: 0

README

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends

build workflow PkgGoDev Documentation Chat

taskq is brought to you by ⭐ uptrace/uptrace. Uptrace is an open source and blazingly fast distributed tracing tool powered by OpenTelemetry and ClickHouse. Give it a star as well!

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
  • Global rate limiting.
  • Global limit of workers.
  • Call once - deduplicating messages with same name.
  • Automatic retries with exponential backoffs.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
  • Automatic message compression using snappy / s2.

Resources:

Getting started

To get started, see Golang Task Queue documentation.

Producer:

import (
    "github.com/tablera/taskq/v3"
    "github.com/tablera/taskq/v3/redisq"
)

// Create a queue factory.
var QueueFactory = redisq.NewFactory()

// Create a queue.
var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})

// Register a task.
var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "counter",
    Handler: func() error {
        IncrLocalCounter()
        return nil
    },
})

ctx := context.Background()

// And start producing.
for {
	// Call the task without any args.
	err := MainQueue.Add(CountTask.WithArgs(ctx))
	if err != nil {
		panic(err)
	}
	time.Sleep(time.Second)
}

Consumer:

// Start consuming the queue.
if err := MainQueue.Start(context.Background()); err != nil {
    log.Fatal(err)
}

See also

Contributors

Thanks to all the people who already contributed!

Documentation

Overview

Package taskq implements task/job queue with Redis, SQS, IronMQ, and in-memory backends.

Example (CustomRateLimit)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/tablera/taskq/v3"
	"github.com/tablera/taskq/v3/memqueue"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&taskq.QueueOptions{
		Name: "test",
	})
	task := taskq.RegisterTask(&taskq.TaskOptions{
		Name: "Example_customRateLimit",
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	ctx := context.Background()
	q.Add(task.WithArgs(ctx))

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

retried in 0s
retried in 3s
Example (MessageDelay)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_messageDelay",
	Handler: func() {
		fmt.Println("processed with delay", timeSince(start))
	},
})

ctx := context.Background()
msg := task.WithArgs(ctx)
msg.Delay = time.Second
_ = q.Add(msg)

// Wait for all messages to be processed.
_ = q.Close()
Output:

processed with delay 1s
Example (Once)
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_once",
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
})

ctx := context.Background()
for i := 0; i < 10; i++ {
	msg := task.WithArgs(ctx, "world")
	// Call once in a second.
	msg.OnceInPeriod(time.Second)

	_ = q.Add(msg)
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

hello world
Example (RateLimit)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name:    "Example_rateLimit",
	Handler: func() {},
})

const n = 5

ctx := context.Background()
for i := 0; i < n; i++ {
	_ = q.Add(task.WithArgs(ctx))
}

// Wait for all messages to be processed.
_ = q.Close()

fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output:

1 msg/s
Example (RetryOnError)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_retryOnError",
	Handler: func() error {
		fmt.Println("retried in", timeSince(start))
		return errors.New("fake error")
	},
	RetryLimit: 3,
	MinBackoff: time.Second,
})

ctx := context.Background()
q.Add(task.WithArgs(ctx))

// Wait for all messages to be processed.
_ = q.Close()
Output:

retried in 0s
retried in 1s
retried in 3s

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrAsyncTask = errors.New("taskq: async task")
View Source
var ErrDuplicate = errors.New("taskq: message with such name already exists")

ErrDuplicate is returned when adding duplicate message to the queue.

Functions

func SetLogger

func SetLogger(logger *log.Logger)

func SetUnknownTaskOptions

func SetUnknownTaskOptions(opt *TaskOptions)

func Version

func Version() string

Version is the current release version.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

func NewConsumer

func NewConsumer(q Queue) *Consumer

NewConsumer creates new Consumer for the queue using provided processing options.

func StartConsumer

func StartConsumer(ctx context.Context, q Queue) *Consumer

StartConsumer creates new QueueConsumer and starts it.

func (*Consumer) Add

func (c *Consumer) Add(msg *Message) error

func (*Consumer) AddHook

func (c *Consumer) AddHook(hook ConsumerHook)

AddHook adds a hook into message processing.

func (*Consumer) Len

func (c *Consumer) Len() int

func (*Consumer) Options

func (c *Consumer) Options() *QueueOptions

func (*Consumer) Process

func (c *Consumer) Process(msg *Message) error

Process is low-level API to process message bypassing the internal queue.

func (*Consumer) ProcessAll

func (c *Consumer) ProcessAll(ctx context.Context) error

ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.

func (*Consumer) ProcessOne

func (c *Consumer) ProcessOne(ctx context.Context) error

ProcessOne processes at most one message in the queue.

func (*Consumer) Purge

func (c *Consumer) Purge() error

Purge discards messages from the internal queue.

func (*Consumer) Put

func (c *Consumer) Put(msg *Message)

func (*Consumer) Queue

func (c *Consumer) Queue() Queue

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start starts consuming messages in the queue.

func (*Consumer) Stats

func (c *Consumer) Stats() *ConsumerStats

Stats returns processor stats.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop is StopTimeout with 30 seconds timeout.

func (*Consumer) StopTimeout

func (c *Consumer) StopTimeout(timeout time.Duration) error

StopTimeout waits workers for timeout duration to finish processing current messages and stops workers.

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerHook

type ConsumerHook interface {
	BeforeProcessMessage(*ProcessMessageEvent) error
	AfterProcessMessage(*ProcessMessageEvent) error
}

type ConsumerStats

type ConsumerStats struct {
	NumWorker  uint32
	NumFetcher uint32

	BufferSize uint32
	Buffered   uint32

	InFlight  uint32
	Processed uint32
	Retries   uint32
	Fails     uint32
	Timing    time.Duration
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Factory

type Factory interface {
	RegisterQueue(*QueueOptions) Queue
	Range(func(Queue) bool)
	StartConsumers(context.Context) error
	StopConsumers() error
	Close() error
}

Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	Ctx context.Context `msgpack:"-"`

	// SQS/IronMQ message id.
	ID string `msgpack:"1,omitempty,alias:ID"`

	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string `msgpack:"-"`

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration `msgpack:"-"`

	// Args passed to the handler.
	Args []interface{} `msgpack:"-"`

	// Binary representation of the args.
	ArgsCompression string `msgpack:"2,omitempty,alias:ArgsCompression"`
	ArgsBin         []byte `msgpack:"3,alias:ArgsBin"`

	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationID string `msgpack:"-"`

	// The number of times the message has been reserved or released.
	ReservedCount int `msgpack:"4,omitempty,alias:ReservedCount"`

	TaskName string `msgpack:"5,alias:TaskName"`
	Err      error  `msgpack:"-"`
	// contains filtered or unexported fields
}

Message is used to create and retrieve messages from a queue.

func NewMessage

func NewMessage(ctx context.Context, args ...interface{}) *Message

func (*Message) MarshalArgs

func (m *Message) MarshalArgs() ([]byte, error)

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) OnceInPeriod

func (m *Message) OnceInPeriod(period time.Duration, args ...interface{})

OnceInPeriod uses the period and the args to generate such a message name that message with such args is added to the queue once in a given period. If args are not provided then message args are used instead.

func (*Message) OnceWithDelay

func (m *Message) OnceWithDelay(delay time.Duration)

func (*Message) OnceWithSchedule

func (m *Message) OnceWithSchedule(tm time.Time)

func (*Message) SetDelay

func (m *Message) SetDelay(delay time.Duration)

SetDelay sets the message delay.

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(b []byte) error

type ProcessMessageEvent

type ProcessMessageEvent struct {
	Message   *Message
	StartTime time.Time

	Stash map[interface{}]interface{}
}

type Queue

type Queue interface {
	fmt.Stringer
	Name() string
	Options() *QueueOptions
	Consumer() QueueConsumer

	Len() (int, error)
	Add(msg *Message) error
	ReserveN(ctx context.Context, n int, waitTimeout time.Duration) ([]Message, error)
	Release(msg *Message) error
	Delete(msg *Message) error
	Purge() error
	Close() error
	CloseTimeout(timeout time.Duration) error
}

type QueueConsumer

type QueueConsumer interface {
	// AddHook adds a hook into message processing.
	AddHook(hook ConsumerHook)
	Queue() Queue
	Options() *QueueOptions
	Len() int
	// Stats returns processor stats.
	Stats() *ConsumerStats
	Add(msg *Message) error
	// Start starts consuming messages in the queue.
	Start(ctx context.Context) error
	// Stop is StopTimeout with 30 seconds timeout.
	Stop() error
	// StopTimeout waits workers for timeout duration to finish processing current
	// messages and stops workers.
	StopTimeout(timeout time.Duration) error
	// ProcessAll starts workers to process messages in the queue and then stops
	// them when all messages are processed.
	ProcessAll(ctx context.Context) error
	// ProcessOne processes at most one message in the queue.
	ProcessOne(ctx context.Context) error
	// Process is low-level API to process message bypassing the internal queue.
	Process(msg *Message) error
	Put(msg *Message)
	// Purge discards messages from the internal queue.
	Purge() error
	String() string
}

QueueConsumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

type QueueOptions

type QueueOptions struct {
	// Queue name.
	Name string

	// Minimum number of goroutines processing messages.
	// Default is 1.
	MinNumWorker int32
	// Maximum number of goroutines processing messages.
	// Default is 32 * number of CPUs.
	MaxNumWorker int32
	// Global limit of concurrently running workers across all servers.
	// Overrides MaxNumWorker.
	WorkerLimit int32
	// Maximum number of goroutines fetching messages.
	// Default is 8 * number of CPUs.
	MaxNumFetcher int32

	// Number of messages reserved by a fetcher in the queue in one request.
	// Default is 10 messages.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	// Default is 5 minutes.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 10 seconds.
	WaitTimeout time.Duration
	// Size of the buffer where reserved messages are stored.
	// Default is the same as ReservationSize.
	BufferSize int

	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int

	// Processing rate limit.
	RateLimit redis_rate.Limit

	// Optional rate limiter. The default is to use Redis.
	RateLimiter *redis_rate.Limiter

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage

	// Optional message handler. The default is the global Tasks registry.
	Handler Handler

	// ConsumerIdleTimeout Time after which the consumer need to be deleted.
	// Default is 6 hour
	ConsumerIdleTimeout time.Duration

	// SchedulerBackoffTime is the time of backoff for the scheduler(
	// Scheduler was designed to clean zombie Consumer and requeue pending msgs, and so on.
	// Default is randomly between 1~1.5s
	// We can change it to a bigger value so that it won't slowdown the redis when using redis queue.
	// It will be between SchedulerBackoffTime and SchedulerBackoffTime+250ms.
	SchedulerBackoffTime time.Duration
	// contains filtered or unexported fields
}

func (*QueueOptions) Init

func (opt *QueueOptions) Init()

type Redis

type Redis interface {
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	// Eval Required by redislock
	Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(ctx context.Context, script string) *redis.StringCmd
	EvalRO(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalShaRO(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
}

type Storage

type Storage interface {
	Exists(ctx context.Context, key string) bool
}

func NewLocalStorage

func NewLocalStorage() Storage

type Task

type Task struct {
	// contains filtered or unexported fields
}

func RegisterTask

func RegisterTask(opt *TaskOptions) *Task

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) Options

func (t *Task) Options() *TaskOptions

func (*Task) String

func (t *Task) String() string

func (*Task) WithArgs

func (t *Task) WithArgs(ctx context.Context, args ...interface{}) *Message

type TaskMap

type TaskMap struct {
	// contains filtered or unexported fields
}
var Tasks TaskMap

func (*TaskMap) Get

func (r *TaskMap) Get(name string) *Task

func (*TaskMap) HandleMessage

func (r *TaskMap) HandleMessage(msg *Message) error

func (*TaskMap) Range

func (r *TaskMap) Range(fn func(name string, task *Task) bool)

func (*TaskMap) Register

func (r *TaskMap) Register(opt *TaskOptions) (*Task, error)

func (*TaskMap) Reset

func (r *TaskMap) Reset()

func (*TaskMap) Unregister

func (r *TaskMap) Unregister(task *Task)

type TaskOptions

type TaskOptions struct {
	// Task name.
	Name string

	// Function called to process a message.
	// There are three permitted types of signature:
	// 1. A zero-argument function
	// 2. A function whose arguments are assignable in type from those which are passed in the message
	// 3. A function which takes a single `*Message` argument
	// The handler function may also optionally take a Context as a first argument and may optionally return an error.
	// If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to
	// `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried/.
	Handler interface{}
	// Function called to process failed message after the specified number of retries have all failed.
	// The FallbackHandler accepts the same types of function as the Handler.
	FallbackHandler interface{}

	// Optional function used by Consumer with defer statement
	// to recover from panics.
	DeferFunc func()

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	// Default is 64 retries.
	RetryLimit int
	// Minimum backoff time between retries.
	// Default is 30 seconds.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	// Default is 30 minutes.
	MaxBackoff time.Duration
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL