Versions in this module
Expand all Collapse all

v2

v2.2.4 Nov 12, 2020

Changes in this version

+ var ErrAsyncTask = errors.New("taskq: async task")
+ var ErrDuplicate = errors.New("taskq: message with such name already exists")
+ func SetLogger(logger *log.Logger)
+ func SetUnknownTaskOptions(opt *TaskOptions)
+ type Consumer struct
    + func NewConsumer(q Queue) *Consumer
    + func StartConsumer(ctx context.Context, q Queue) *Consumer
    + func (c *Consumer) Add(msg *Message) error
    + func (c *Consumer) AddHook(hook ConsumerHook)
    + func (c *Consumer) Len() int
    + func (c *Consumer) Options() *QueueOptions
    + func (c *Consumer) Process(msg *Message) error
    + func (c *Consumer) ProcessAll(ctx context.Context) error
    + func (c *Consumer) ProcessOne(ctx context.Context) error
    + func (c *Consumer) Purge() error
    + func (c *Consumer) Put(msg *Message)
    + func (c *Consumer) Queue() Queue
    + func (c *Consumer) Start(ctx context.Context) error
    + func (c *Consumer) Stats() *ConsumerStats
    + func (c *Consumer) Stop() error
    + func (c *Consumer) StopTimeout(timeout time.Duration) error
    + func (c *Consumer) String() string
+ type ConsumerHook interface
    + AfterProcessMessage func(*ProcessMessageEvent) error
    + BeforeProcessMessage func(*ProcessMessageEvent) error
+ type ConsumerStats struct
    + BufferSize uint32
    + Buffered uint32
    + Fails uint32
    + FetcherNumber uint32
    + InFlight uint32
    + Processed uint32
    + Retries uint32
    + WorkerNumber uint32
+ type Delayer interface
    + Delay func() time.Duration
+ type Factory interface
    + Close func() error
    + Range func(func(Queue) bool)
    + RegisterQueue func(*QueueOptions) Queue
    + StartConsumers func(context.Context) error
    + StopConsumers func() error
+ type Handler interface
    + HandleMessage func(msg *Message) error
    + func NewHandler(fn interface{}) Handler
+ type HandlerFunc func(*Message) error
    + func (fn HandlerFunc) HandleMessage(msg *Message) error
+ type Message struct
    + Args []interface{}
    + ArgsBin []byte
    + ArgsCompression string
    + Ctx context.Context
    + Delay time.Duration
    + Err error
    + ID string
    + Name string
    + ReservationID string
    + ReservedCount int
    + TaskName string
    + func NewMessage(ctx context.Context, args ...interface{}) *Message
    + func (m *Message) MarshalArgs() ([]byte, error)
    + func (m *Message) MarshalBinary() ([]byte, error)
    + func (m *Message) OnceInPeriod(period time.Duration, args ...interface{}) *Message
    + func (m *Message) SetDelay(delay time.Duration) *Message
    + func (m *Message) String() string
    + func (m *Message) UnmarshalBinary(b []byte) error
+ type ProcessMessageEvent struct
    + Err error
    + Message *Message
    + StartTime time.Time
    + Stash map[interface{}]interface{}
+ type Queue interface
    + Add func(msg *Message) error
    + Close func() error
    + CloseTimeout func(timeout time.Duration) error
    + Consumer func() *Consumer
    + Delete func(msg *Message) error
    + Len func() (int, error)
    + Name func() string
    + Options func() *QueueOptions
    + Purge func() error
    + Release func(msg *Message) error
    + ReserveN func(n int, waitTimeout time.Duration) ([]Message, error)
+ type QueueOptions struct
    + BufferSize int
    + Handler Handler
    + MaxFetchers int
    + MaxWorkers int
    + MinWorkers int
    + Name string
    + PauseErrorsThreshold int
    + RateLimit rate.Limit
    + RateLimiter RateLimiter
    + Redis Redis
    + ReservationSize int
    + ReservationTimeout time.Duration
    + Storage Storage
    + WaitTimeout time.Duration
    + WorkerLimit int
    + func (opt *QueueOptions) Init()
+ type RateLimiter interface
    + AllowRate func(name string, limit rate.Limit) (delay time.Duration, allow bool)
+ type Redis interface
    + Del func(keys ...string) *redis.IntCmd
    + Eval func(script string, keys []string, args ...interface{}) *redis.Cmd
    + EvalSha func(sha1 string, keys []string, args ...interface{}) *redis.Cmd
    + Pipelined func(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)
    + ScriptExists func(scripts ...string) *redis.BoolSliceCmd
    + ScriptLoad func(script string) *redis.StringCmd
    + SetNX func(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
+ type Storage interface
    + Exists func(key string) bool
+ type Task struct
    + func RegisterTask(opt *TaskOptions) *Task
    + func (t *Task) HandleMessage(msg *Message) error
    + func (t *Task) Name() string
    + func (t *Task) Options() *TaskOptions
    + func (t *Task) String() string
    + func (t *Task) WithArgs(ctx context.Context, args ...interface{}) *Message
+ type TaskMap struct
    + var Tasks TaskMap
    + func (r *TaskMap) Get(name string) *Task
    + func (r *TaskMap) HandleMessage(msg *Message) error
    + func (r *TaskMap) Range(fn func(name string, task *Task) bool)
    + func (r *TaskMap) Register(opt *TaskOptions) (*Task, error)
    + func (r *TaskMap) Reset()
    + func (r *TaskMap) Unregister(task *Task)
+ type TaskOptions struct
    + DeferFunc func()
    + FallbackHandler interface{}
    + Handler interface{}
    + MaxBackoff time.Duration
    + MinBackoff time.Duration
    + Name string
    + RetryLimit int

Other modules containing this package
github.com/airbrake/taskq