Documentation ¶
Overview ¶
Package taskq implements task/job queue with Redis, SQS, IronMQ, and in-memory backends.
Example (CustomRateLimit) ¶
package main import ( "context" "fmt" "time" "github.com/tablera/taskq/v3" "github.com/tablera/taskq/v3/memqueue" ) type RateLimitError string func (e RateLimitError) Error() string { return string(e) } func (RateLimitError) Delay() time.Duration { return 3 * time.Second } func main() { start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_customRateLimit", Handler: func() error { fmt.Println("retried in", timeSince(start)) return RateLimitError("calm down") }, RetryLimit: 2, MinBackoff: time.Millisecond, }) ctx := context.Background() q.Add(task.WithArgs(ctx)) // Wait for all messages to be processed. _ = q.Close() }
Output: retried in 0s retried in 3s
Example (MessageDelay) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_messageDelay", Handler: func() { fmt.Println("processed with delay", timeSince(start)) }, }) ctx := context.Background() msg := task.WithArgs(ctx) msg.Delay = time.Second _ = q.Add(msg) // Wait for all messages to be processed. _ = q.Close()
Output: processed with delay 1s
Example (Once) ¶
q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", Redis: redisRing(), RateLimit: redis_rate.PerSecond(1), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_once", Handler: func(name string) { fmt.Println("hello", name) }, }) ctx := context.Background() for i := 0; i < 10; i++ { msg := task.WithArgs(ctx, "world") // Call once in a second. msg.OnceInPeriod(time.Second) _ = q.Add(msg) } // Wait for all messages to be processed. _ = q.Close()
Output: hello world
Example (RateLimit) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", Redis: redisRing(), RateLimit: redis_rate.PerSecond(1), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_rateLimit", Handler: func() {}, }) const n = 5 ctx := context.Background() for i := 0; i < n; i++ { _ = q.Add(task.WithArgs(ctx)) } // Wait for all messages to be processed. _ = q.Close() fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output: 1 msg/s
Example (RetryOnError) ¶
start := time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "Example_retryOnError", Handler: func() error { fmt.Println("retried in", timeSince(start)) return errors.New("fake error") }, RetryLimit: 3, MinBackoff: time.Second, }) ctx := context.Background() q.Add(task.WithArgs(ctx)) // Wait for all messages to be processed. _ = q.Close()
Output: retried in 0s retried in 1s retried in 3s
Index ¶
- Variables
- func SetLogger(logger *log.Logger)
- func SetUnknownTaskOptions(opt *TaskOptions)
- func Version() string
- type Consumer
- func (c *Consumer) Add(msg *Message) error
- func (c *Consumer) AddHook(hook ConsumerHook)
- func (c *Consumer) Len() int
- func (c *Consumer) Options() *QueueOptions
- func (c *Consumer) Process(msg *Message) error
- func (c *Consumer) ProcessAll(ctx context.Context) error
- func (c *Consumer) ProcessOne(ctx context.Context) error
- func (c *Consumer) Purge() error
- func (c *Consumer) Put(msg *Message)
- func (c *Consumer) Queue() Queue
- func (c *Consumer) Start(ctx context.Context) error
- func (c *Consumer) Stats() *ConsumerStats
- func (c *Consumer) Stop() error
- func (c *Consumer) StopTimeout(timeout time.Duration) error
- func (c *Consumer) String() string
- type ConsumerHook
- type ConsumerStats
- type Delayer
- type Factory
- type Handler
- type HandlerFunc
- type Message
- func (m *Message) MarshalArgs() ([]byte, error)
- func (m *Message) MarshalBinary() ([]byte, error)
- func (m *Message) OnceInPeriod(period time.Duration, args ...interface{})
- func (m *Message) OnceWithDelay(delay time.Duration)
- func (m *Message) OnceWithSchedule(tm time.Time)
- func (m *Message) SetDelay(delay time.Duration)
- func (m *Message) String() string
- func (m *Message) UnmarshalBinary(b []byte) error
- type ProcessMessageEvent
- type Queue
- type QueueConsumer
- type QueueOptions
- type Redis
- type Storage
- type Task
- type TaskMap
- type TaskOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAsyncTask = errors.New("taskq: async task")
var ErrDuplicate = errors.New("taskq: message with such name already exists")
ErrDuplicate is returned when adding duplicate message to the queue.
Functions ¶
func SetUnknownTaskOptions ¶
func SetUnknownTaskOptions(opt *TaskOptions)
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
func NewConsumer ¶
NewConsumer creates new Consumer for the queue using provided processing options.
func StartConsumer ¶
StartConsumer creates new QueueConsumer and starts it.
func (*Consumer) AddHook ¶
func (c *Consumer) AddHook(hook ConsumerHook)
AddHook adds a hook into message processing.
func (*Consumer) Options ¶
func (c *Consumer) Options() *QueueOptions
func (*Consumer) Process ¶
Process is low-level API to process message bypassing the internal queue.
func (*Consumer) ProcessAll ¶
ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.
func (*Consumer) ProcessOne ¶
ProcessOne processes at most one message in the queue.
func (*Consumer) StopTimeout ¶
StopTimeout waits workers for timeout duration to finish processing current messages and stops workers.
type ConsumerHook ¶
type ConsumerHook interface { BeforeProcessMessage(*ProcessMessageEvent) error AfterProcessMessage(*ProcessMessageEvent) error }
type ConsumerStats ¶
type Factory ¶
type Factory interface { RegisterQueue(*QueueOptions) Queue Range(func(Queue) bool) StartConsumers(context.Context) error StopConsumers() error Close() error }
Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.
type Handler ¶
Handler is an interface for processing messages.
func NewHandler ¶
func NewHandler(fn interface{}) Handler
type HandlerFunc ¶
func (HandlerFunc) HandleMessage ¶
func (fn HandlerFunc) HandleMessage(msg *Message) error
type Message ¶
type Message struct { Ctx context.Context `msgpack:"-"` // SQS/IronMQ message id. ID string `msgpack:"1,omitempty,alias:ID"` // Optional name for the message. Messages with the same name // are processed only once. Name string `msgpack:"-"` // Delay specifies the duration the queue must wait // before executing the message. Delay time.Duration `msgpack:"-"` // Args passed to the handler. Args []interface{} `msgpack:"-"` // Binary representation of the args. ArgsCompression string `msgpack:"2,omitempty,alias:ArgsCompression"` ArgsBin []byte `msgpack:"3,alias:ArgsBin"` // SQS/IronMQ reservation id that is used to release/delete the message. ReservationID string `msgpack:"-"` // The number of times the message has been reserved or released. ReservedCount int `msgpack:"4,omitempty,alias:ReservedCount"` TaskName string `msgpack:"5,alias:TaskName"` Err error `msgpack:"-"` // contains filtered or unexported fields }
Message is used to create and retrieve messages from a queue.
func NewMessage ¶
func (*Message) MarshalArgs ¶
func (*Message) MarshalBinary ¶
func (*Message) OnceInPeriod ¶
OnceInPeriod uses the period and the args to generate such a message name that message with such args is added to the queue once in a given period. If args are not provided then message args are used instead.
func (*Message) OnceWithDelay ¶
func (*Message) OnceWithSchedule ¶
func (*Message) UnmarshalBinary ¶
type ProcessMessageEvent ¶
type Queue ¶
type Queue interface { fmt.Stringer Name() string Options() *QueueOptions Consumer() QueueConsumer Len() (int, error) Add(msg *Message) error ReserveN(ctx context.Context, n int, waitTimeout time.Duration) ([]Message, error) Release(msg *Message) error Delete(msg *Message) error Purge() error Close() error CloseTimeout(timeout time.Duration) error }
type QueueConsumer ¶
type QueueConsumer interface { // AddHook adds a hook into message processing. AddHook(hook ConsumerHook) Queue() Queue Options() *QueueOptions Len() int // Stats returns processor stats. Stats() *ConsumerStats Add(msg *Message) error // Start starts consuming messages in the queue. Start(ctx context.Context) error // Stop is StopTimeout with 30 seconds timeout. Stop() error // StopTimeout waits workers for timeout duration to finish processing current // messages and stops workers. StopTimeout(timeout time.Duration) error // ProcessAll starts workers to process messages in the queue and then stops // them when all messages are processed. ProcessAll(ctx context.Context) error // ProcessOne processes at most one message in the queue. ProcessOne(ctx context.Context) error // Process is low-level API to process message bypassing the internal queue. Process(msg *Message) error Put(msg *Message) // Purge discards messages from the internal queue. Purge() error String() string }
QueueConsumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.
type QueueOptions ¶
type QueueOptions struct { // Queue name. Name string // Minimum number of goroutines processing messages. // Default is 1. MinNumWorker int32 // Maximum number of goroutines processing messages. // Default is 32 * number of CPUs. MaxNumWorker int32 // Global limit of concurrently running workers across all servers. // Overrides MaxNumWorker. WorkerLimit int32 // Maximum number of goroutines fetching messages. // Default is 8 * number of CPUs. MaxNumFetcher int32 // Number of messages reserved by a fetcher in the queue in one request. // Default is 10 messages. ReservationSize int // Time after which the reserved message is returned to the queue. // Default is 5 minutes. ReservationTimeout time.Duration // Time that a long polling receive call waits for a message to become // available before returning an empty response. // Default is 10 seconds. WaitTimeout time.Duration // Size of the buffer where reserved messages are stored. // Default is the same as ReservationSize. BufferSize int // Number of consecutive failures after which queue processing is paused. // Default is 100 failures. PauseErrorsThreshold int // Processing rate limit. RateLimit redis_rate.Limit // Optional rate limiter. The default is to use Redis. RateLimiter *redis_rate.Limiter // Redis client that is used for storing metadata. Redis Redis // Optional storage interface. The default is to use Redis. Storage Storage // Optional message handler. The default is the global Tasks registry. Handler Handler // ConsumerIdleTimeout Time after which the consumer need to be deleted. // Default is 6 hour ConsumerIdleTimeout time.Duration // SchedulerBackoffTime is the time of backoff for the scheduler( // Scheduler was designed to clean zombie Consumer and requeue pending msgs, and so on. // Default is randomly between 1~1.5s // We can change it to a bigger value so that it won't slowdown the redis when using redis queue. // It will be between SchedulerBackoffTime and SchedulerBackoffTime+250ms. SchedulerBackoffTime time.Duration // contains filtered or unexported fields }
func (*QueueOptions) Init ¶
func (opt *QueueOptions) Init()
type Redis ¶
type Redis interface { Del(ctx context.Context, keys ...string) *redis.IntCmd SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error) // Eval Required by redislock Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd ScriptLoad(ctx context.Context, script string) *redis.StringCmd EvalRO(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd EvalShaRO(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd }
type Storage ¶
func NewLocalStorage ¶
func NewLocalStorage() Storage
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
func RegisterTask ¶
func RegisterTask(opt *TaskOptions) *Task
func (*Task) HandleMessage ¶
func (*Task) Options ¶
func (t *Task) Options() *TaskOptions
type TaskMap ¶
type TaskMap struct {
// contains filtered or unexported fields
}
var Tasks TaskMap
func (*TaskMap) HandleMessage ¶
func (*TaskMap) Unregister ¶
type TaskOptions ¶
type TaskOptions struct { // Task name. Name string // Function called to process a message. // There are three permitted types of signature: // 1. A zero-argument function // 2. A function whose arguments are assignable in type from those which are passed in the message // 3. A function which takes a single `*Message` argument // The handler function may also optionally take a Context as a first argument and may optionally return an error. // If the handler takes a Context, when it is invoked it will be passed the same Context as that which was passed to // `StartConsumer`. If the handler returns a non-nil error the message processing will fail and will be retried/. Handler interface{} // Function called to process failed message after the specified number of retries have all failed. // The FallbackHandler accepts the same types of function as the Handler. FallbackHandler interface{} // Optional function used by Consumer with defer statement // to recover from panics. DeferFunc func() // Number of tries/releases after which the message fails permanently // and is deleted. // Default is 64 retries. RetryLimit int // Minimum backoff time between retries. // Default is 30 seconds. MinBackoff time.Duration // Maximum backoff time between retries. // Default is 30 minutes. MaxBackoff time.Duration // contains filtered or unexported fields }