Documentation ¶
Overview ¶
Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends.
Example (CustomRateLimit) ¶
package main import ( "context" "fmt" "time" "github.com/vmihailenco/taskq/v2" "github.com/vmihailenco/taskq/v2/memqueue" ) type RateLimitError string func (e RateLimitError) Error() string { return string(e) } func (RateLimitError) Delay() time.Duration { return 3 * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_customRateLimit", Handler: func() error { fmt.Println("retried in", timeSince(start)) return RateLimitError("calm down") }, RetryLimit: 2, MinBackoff: time.Millisecond, }) ctx := context.Background() q.Add(task.WithArgs(ctx)) // Wait for all messages to be processed. _ = q.Close() }
Output: retried in 0s retried in 3s
Example (MessageDelay) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_messageDelay", Handler: func() { fmt.Println("processed with delay", timeSince(start)) }, }) ctx := context.Background() msg := task.WithArgs(ctx) msg.Delay = time.Second _ = q.Add(msg) // Wait for all messages to be processed. _ = q.Close()
Output: processed with delay 1s
Example (Once) ¶
q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", Redis: redisRing(), RateLimit: rate.Every(time.Second), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_once", Handler: func(name string) { fmt.Println("hello", name) }, }) ctx := context.Background() for i := 0; i < 10; i++ { // Call once in a second. _ = q.Add(task.WithArgs(ctx, "world").OnceInPeriod(time.Second)) } // Wait for all messages to be processed. _ = q.Close()
Output: hello world
Example (RateLimit) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", Redis: redisRing(), RateLimit: rate.Every(time.Second), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_rateLimit", Handler: func() {}, }) const n = 5 ctx := context.Background() for i := 0; i < n; i++ { _ = q.Add(task.WithArgs(ctx)) } // Wait for all messages to be processed. _ = q.Close() fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output: 1 msg/s
Example (RetryOnError) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_retryOnError", Handler: func() error { fmt.Println("retried in", timeSince(start)) return errors.New("fake error") }, RetryLimit: 3, MinBackoff: time.Second, }) ctx := context.Background() q.Add(task.WithArgs(ctx)) // Wait for all messages to be processed. _ = q.Close()
Output: retried in 0s retried in 1s retried in 3s
Index ¶
- Variables
- func SetLogger(logger *log.Logger)
- func SetUnknownTaskOptions(opt *TaskOptions)
- type Consumer
- func (c *Consumer) Add(msg *Message) error
- func (c *Consumer) AddHook(hook ConsumerHook)
- func (c *Consumer) Len() int
- func (c *Consumer) Options() *QueueOptions
- func (c *Consumer) Process(msg *Message) error
- func (c *Consumer) ProcessAll(ctx context.Context) error
- func (c *Consumer) ProcessOne(ctx context.Context) error
- func (c *Consumer) Purge() error
- func (c *Consumer) Put(msg *Message)
- func (c *Consumer) Queue() Queue
- func (c *Consumer) Start(ctx context.Context) error
- func (c *Consumer) Stats() *ConsumerStats
- func (c *Consumer) Stop() error
- func (c *Consumer) StopTimeout(timeout time.Duration) error
- func (c *Consumer) String() string
- type ConsumerHook
- type ConsumerStats
- type Delayer
- type Factory
- type Handler
- type HandlerFunc
- type Message
- func (m *Message) MarshalArgs() ([]byte, error)
- func (m *Message) MarshalBinary() ([]byte, error)
- func (m *Message) OnceInPeriod(period time.Duration, args ...interface{}) *Message
- func (m *Message) SetDelay(delay time.Duration) *Message
- func (m *Message) String() string
- func (m *Message) UnmarshalBinary(b []byte) error
- type ProcessMessageEvent
- type Queue
- type QueueOptions
- type RateLimiter
- type Redis
- type Storage
- type Task
- type TaskMap
- type TaskOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsyncTask = errors.New("taskq: async task")
var ErrDuplicate = errors.New("taskq: message with such name already exists")
ErrDuplicate is returned when adding a duplicate message to the queue.
Functions ¶
func SetUnknownTaskOptions ¶
func SetUnknownTaskOptions(opt *TaskOptions)
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewConsumer ¶
NewConsumer creates a new Consumer for the queue using the provided processing options.
func StartConsumer ¶
StartConsumer creates a new Consumer and starts it.
func (*Consumer) AddHook ¶
func (c *Consumer) AddHook(hook ConsumerHook)
AddHook adds a hook into message processing.
func (*Consumer) Options ¶
func (c *Consumer) Options() *QueueOptions
func (*Consumer) Process ¶
Process is a low-level API to process a message bypassing the internal queue.
func (*Consumer) ProcessAll ¶
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Consumer) ProcessOne ¶
ProcessOne processes at most one message in the queue.
func (*Consumer) StopTimeout ¶
StopTimeout waits up to the timeout duration for workers to finish processing their current messages and then stops the workers.
type ConsumerHook ¶
type ConsumerHook interface { BeforeProcessMessage(*ProcessMessageEvent) error AfterProcessMessage(*ProcessMessageEvent) error }
type ConsumerStats ¶
type Factory ¶
type Factory interface { RegisterQueue(*QueueOptions) Queue Range(func(Queue) bool) StartConsumers(context.Context) error StopConsumers() error Close() error }
Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct { Ctx context.Context `msgpack:"-"` // SQS/IronMQ message id. ID string `msgpack:",omitempty"` // Optional name for the message. Messages with the same name // are processed only once. Name string `msgpack:"-"` // Delay specifies the duration the queue must wait // before executing the message. Delay time.Duration `msgpack:"-"` // Function args passed to the handler. Args []interface{} `msgpack:"-"` // Binary representation of the args. ArgsCompression string `msgpack:",omitempty"` ArgsBin []byte // SQS/IronMQ reservation id that is used to release/delete the message. ReservationID string `msgpack:"-"` // The number of times the message has been reserved or released. ReservedCount int `msgpack:",omitempty"` TaskName string Err error `msgpack:"-"` // contains filtered or unexported fields }
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func (*Message) MarshalArgs ¶
func (*Message) MarshalBinary ¶
func (*Message) OnceInPeriod ¶ added in v2.1.0
OnceInPeriod uses the period and the args to generate a message name such that a message with those args is added to the queue only once in the given period. If args are not provided, the message args are used instead.
func (*Message) UnmarshalBinary ¶
type ProcessMessageEvent ¶
type Queue ¶
type Queue interface { fmt.Stringer Name() string Options() *QueueOptions Consumer() *Consumer Len() (int, error) Add(msg *Message) error ReserveN(n int, waitTimeout time.Duration) ([]Message, error) Release(msg *Message) error Delete(msg *Message) error Purge() error Close() error CloseTimeout(timeout time.Duration) error }
type QueueOptions ¶
type QueueOptions struct { // Queue name. Name string // Minimum number of goroutines processing messages. // Default is 1. MinWorkers int // Maximum number of goroutines processing messages. // Default is 32 * number of CPUs. MaxWorkers int // Global limit of concurrently running workers across all servers. // Overrides MaxWorkers. WorkerLimit int // Maximum number of goroutines fetching messages. // Default is 16 * number of CPUs. MaxFetchers int // Number of messages reserved by a fetcher in the queue in one request. // Default is 10 messages. ReservationSize int // Time after which the reserved message is returned to the queue. // Default is 5 minutes. ReservationTimeout time.Duration // Time that a long polling receive call waits for a message to become // available before returning an empty response. // Default is 10 seconds. WaitTimeout time.Duration // Size of the buffer where reserved messages are stored. // Default is the same as ReservationSize. BufferSize int // Number of consecutive failures after which queue processing is paused. // Default is 100 failures. PauseErrorsThreshold int // Processing rate limit. RateLimit rate.Limit // Optional rate limiter interface. The default is to use Redis. RateLimiter RateLimiter // Redis client that is used for storing metadata. Redis Redis // Optional storage interface. The default is to use Redis. Storage Storage // Optional message handler. The default is the global Tasks registry. Handler Handler // contains filtered or unexported fields }
func (*QueueOptions) Init ¶
func (opt *QueueOptions) Init()
type RateLimiter ¶
type Redis ¶
type Redis interface { Del(keys ...string) *redis.IntCmd SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error) // Required by redislock Eval(script string, keys []string, args ...interface{}) *redis.Cmd EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd ScriptExists(scripts ...string) *redis.BoolSliceCmd ScriptLoad(script string) *redis.StringCmd }
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func RegisterTask ¶
func RegisterTask(opt *TaskOptions) *Task
func (*Task) HandleMessage ¶
func (*Task) Options ¶
func (t *Task) Options() *TaskOptions
type TaskMap ¶ added in v2.1.0
type TaskMap struct {
// contains filtered or unexported fields
}
var Tasks TaskMap
func (*TaskMap) HandleMessage ¶ added in v2.1.0
func (*TaskMap) Register ¶ added in v2.1.0
func (r *TaskMap) Register(opt *TaskOptions) (*Task, error)
func (*TaskMap) Unregister ¶ added in v2.1.0
type TaskOptions ¶
type TaskOptions struct { // Task name. Name string // Function called to process a message. // There are three permitted types of signature: // 1. A zero-argument function // 2. A function whose arguments are assignable in type from those which are passed in the message // 3. A function which takes a single `*Message` argument // The handler function may also optionally take a Context as a first argument and may optionally return an error. // If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to // `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried. Handler interface{} // Function called to process failed message after the specified number of retries have all failed. // The FallbackHandler accepts the same types of function as the Handler. FallbackHandler interface{} // Optional function used by Consumer with defer statement // to recover from panics. DeferFunc func() // Number of tries/releases after which the message fails permanently // and is deleted. // Default is 64 retries. RetryLimit int // Minimum backoff time between retries. // Default is 30 seconds. MinBackoff time.Duration // Maximum backoff time between retries. // Default is 30 minutes. MaxBackoff time.Duration // contains filtered or unexported fields }
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
examples
|
|
api_worker
This package contains an example usage of redis-backed taskq.
|
This package contains an example usage of redis-backed taskq. |