Documentation ¶
Overview ¶
Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends.
Example (CustomRateLimit) ¶
package main import ( "fmt" "time" "github.com/vmihailenco/taskq/v2" "github.com/vmihailenco/taskq/v2/memqueue" ) type RateLimitError string func (e RateLimitError) Error() string { return string(e) } func (RateLimitError) Delay() time.Duration { return 3 * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{}) task := q.NewTask(&taskq.TaskOptions{ Name: "test", Handler: func() error { fmt.Println("retried in", timeSince(start)) return RateLimitError("calm down") }, RetryLimit: 2, MinBackoff: time.Millisecond, }) task.Call() // Wait for all messages to be processed. _ = q.Close() }
Output: retried in 0s retried in 3s
Example (MessageDelay) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{}) task := q.NewTask(&taskq.TaskOptions{ Name: "test", Handler: func() { fmt.Println("processed with delay", timeSince(start)) }, }) msg := taskq.NewMessage() msg.Delay = time.Second _ = task.AddMessage(msg) // Wait for all messages to be processed. _ = q.Close()
Output: processed with delay 1s
Example (Once) ¶
q := memqueue.NewQueue(&taskq.QueueOptions{ Redis: redisRing(), RateLimit: rate.Every(time.Second), }) task := q.NewTask(&taskq.TaskOptions{ Name: "test", Handler: func(name string) { fmt.Println("hello", name) }, }) for i := 0; i < 10; i++ { // Call once in a second. _ = task.CallOnce(time.Second, "world") } // Wait for all messages to be processed. _ = q.Close()
Output: hello world
Example (RateLimit) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Redis: redisRing(), RateLimit: rate.Every(time.Second), }) task := q.NewTask(&taskq.TaskOptions{ Name: "test", Handler: func() {}, }) const n = 5 for i := 0; i < n; i++ { _ = task.Call() } // Wait for all messages to be processed. _ = q.Close() fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output: 1 msg/s
Example (RetryOnError) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{}) task := q.NewTask(&taskq.TaskOptions{ Name: "test", Handler: func() error { fmt.Println("retried in", timeSince(start)) return errors.New("fake error") }, RetryLimit: 3, MinBackoff: time.Second, }) task.Call() // Wait for all messages to be processed. _ = q.Close()
Output: retried in 0s retried in 1s retried in 3s
Index ¶
- Variables
- func SetLogger(logger *log.Logger)
- func SetUnknownTaskOptions(opt *TaskOptions)
- type Consumer
- func (c *Consumer) Add(msg *Message) error
- func (c *Consumer) AddHook(hook ConsumerHook)
- func (c *Consumer) Len() int
- func (c *Consumer) Options() *QueueOptions
- func (c *Consumer) Process(msg *Message) error
- func (c *Consumer) ProcessAll() error
- func (c *Consumer) ProcessOne() error
- func (c *Consumer) Purge() error
- func (c *Consumer) Put(msg *Message)
- func (c *Consumer) Queue() Queue
- func (c *Consumer) Start() error
- func (c *Consumer) Stats() *ConsumerStats
- func (c *Consumer) Stop() error
- func (c *Consumer) StopTimeout(timeout time.Duration) error
- func (c *Consumer) String() string
- type ConsumerHook
- type ConsumerStats
- type Delayer
- type Factory
- type Handler
- type HandlerFunc
- type Message
- type ProcessMessageEvent
- type Queue
- type QueueOptions
- type RateLimiter
- type Redis
- type Storage
- type Task
- type TaskOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsyncTask = errors.New("taskq: async task")
var ErrDuplicate = errors.New("taskq: message with such name already exists")
ErrDuplicate is returned when adding a duplicate message to the queue.
Functions ¶
func SetUnknownTaskOptions ¶
func SetUnknownTaskOptions(opt *TaskOptions)
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewConsumer ¶
NewConsumer creates a new Consumer for the queue using the provided processing options.
func StartConsumer ¶
StartConsumer creates a new Consumer and starts it.
func (*Consumer) AddHook ¶
func (c *Consumer) AddHook(hook ConsumerHook)
AddHook adds a hook into message processing.
func (*Consumer) Options ¶
func (c *Consumer) Options() *QueueOptions
func (*Consumer) Process ¶
Process is a low-level API to process a message, bypassing the internal queue.
func (*Consumer) ProcessAll ¶
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Consumer) ProcessOne ¶
ProcessOne processes at most one message in the queue.
func (*Consumer) StopTimeout ¶
StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.
type ConsumerHook ¶
type ConsumerHook interface { BeforeProcessMessage(*ProcessMessageEvent) error AfterProcessMessage(*ProcessMessageEvent) error }
type ConsumerStats ¶
type Factory ¶
type Factory interface { NewQueue(*QueueOptions) Queue Queues() []Queue StartConsumers() error StopConsumers() error Close() error }
Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct { Ctx context.Context `msgpack:"-"` // SQS/IronMQ message id. ID string `msgpack:",omitempty"` // Optional name for the message. Messages with the same name // are processed only once. Name string `msgpack:"-"` // Delay specifies the duration the queue must wait // before executing the message. Delay time.Duration `msgpack:"-"` // Function args passed to the handler. Args []interface{} `msgpack:"-"` // Binary representation of the args. ArgsCompressed bool ArgsCompression string `msgpack:",omitempty"` ArgsBin []byte // SQS/IronMQ reservation id that is used to release/delete the message. ReservationID string `msgpack:"-"` // The number of times the message has been reserved or released. ReservedCount int TaskName string Task *Task `msgpack:"-"` StickyErr error `msgpack:"-"` // contains filtered or unexported fields }
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func NewMessage(args ...interface{}) *Message
func (*Message) MarshalArgs ¶
func (*Message) MarshalBinary ¶
func (*Message) OnceWithArgs ¶
func (*Message) UnmarshalBinary ¶
type ProcessMessageEvent ¶
type Queue ¶
type Queue interface { Name() string Options() *QueueOptions Consumer() *Consumer Handler NewTask(opt *TaskOptions) *Task GetTask(name string) *Task RemoveTask(name string) Len() (int, error) Add(msg *Message) error ReserveN(n int, waitTimeout time.Duration) ([]Message, error) Release(*Message) error Delete(msg *Message) error Purge() error Close() error CloseTimeout(timeout time.Duration) error }
type QueueOptions ¶
type QueueOptions struct { // Queue name. Name string // Minimum number of goroutines processing messages. // Default is 1. MinWorkers int // Maximum number of goroutines processing messages. // Default is 32 * number of CPUs. MaxWorkers int // Global limit of concurrently running workers across all servers. // Overrides MaxWorkers. WorkerLimit int // Maximum number of goroutines fetching messages. // Default is 16 * number of CPUs. MaxFetchers int // Number of messages reserved by a fetcher in the queue in one request. // Default is 10 messages. ReservationSize int // Time after which the reserved message is returned to the queue. // Default is 5 minutes. ReservationTimeout time.Duration // Time that a long polling receive call waits for a message to become // available before returning an empty response. // Default is 10 seconds. WaitTimeout time.Duration // Size of the buffer where reserved messages are stored. // Default is the same as ReservationSize. BufferSize int // Number of consecutive failures after which queue processing is paused. // Default is 100 failures. PauseErrorsThreshold int // Processing rate limit. RateLimit rate.Limit // Optional rate limiter interface. The default is to use Redis. RateLimiter RateLimiter // Redis client that is used for storing metadata. Redis Redis // Optional storage interface. The default is to use Redis. Storage Storage // contains filtered or unexported fields }
func (*QueueOptions) Init ¶
func (opt *QueueOptions) Init()
type RateLimiter ¶
type Redis ¶
type Redis interface { Del(keys ...string) *redis.IntCmd SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error) // Required by redislock Eval(script string, keys []string, args ...interface{}) *redis.Cmd EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd ScriptExists(scripts ...string) *redis.BoolSliceCmd ScriptLoad(script string) *redis.StringCmd }
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func NewTask ¶
func NewTask(queue Queue, opt *TaskOptions) *Task
func (*Task) AddMessage ¶
AddMessage adds message to the queue.
func (*Task) CallOnce ¶
CallOnce works like Call, but it returns ErrDuplicate if a message with the same args was already added within the given period.
func (*Task) HandleMessage ¶
func (*Task) Options ¶
func (t *Task) Options() *TaskOptions
type TaskOptions ¶
type TaskOptions struct { Name string // Function called to process a message. Handler interface{} // Function called to process failed message. FallbackHandler interface{} // Optional function used by Consumer with defer statement // to recover from panics. DeferFunc func() // Number of tries/releases after which the message fails permanently // and is deleted. // Default is 64 retries. RetryLimit int // Minimum backoff time between retries. // Default is 30 seconds. MinBackoff time.Duration // Maximum backoff time between retries. // Default is 30 minutes. MaxBackoff time.Duration // contains filtered or unexported fields }