Documentation ¶
Index ¶
- Constants
- Variables
- func UseHashTagKey() interface{}
- func WithMsgTTL(d time.Duration) interface{}
- func WithRetryCount(count int) interface{}
- type CallbackFunc
- type DelayQueue
- func (q *DelayQueue) DisableListener()
- func (q *DelayQueue) DisableReport()
- func (q *DelayQueue) EnableReport()
- func (q *DelayQueue) GetPendingCount() (int64, error)
- func (q *DelayQueue) GetProcessingCount() (int64, error)
- func (q *DelayQueue) GetReadyCount() (int64, error)
- func (q *DelayQueue) ListenEvent(listener EventListener)
- func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error
- func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error
- func (q *DelayQueue) StartConsume() (done <-chan struct{})
- func (q *DelayQueue) StopConsume()
- func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue
- func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue
- func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue
- func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue
- func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue
- func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue
- func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue
- type Event
- type EventListener
- type Logger
- type Monitor
- type Publisher
- type RedisCli
Constants ¶
const (
	// NewMessageEvent is emitted when a message is sent
	NewMessageEvent = iota + 1
	// ReadyEvent is emitted when messages have reached their delivery time
	ReadyEvent
	// DeliveredEvent is emitted when messages have been delivered to a consumer
	DeliveredEvent
	// AckEvent is emitted when the callback reports that a message was consumed successfully
	AckEvent
	// NackEvent is emitted when the callback reports that consuming a message failed
	NackEvent
	// RetryEvent is emitted when a message is re-delivered to a consumer
	RetryEvent
	// FinalFailedEvent is emitted when a message reaches its max retry attempts
	FinalFailedEvent
)
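Because the event codes are plain integers, logs are easier to read when each code is mapped back to its name. The sketch below redeclares the constants locally so it compiles without importing the package; `eventName` is a hypothetical helper, not part of the package API.

```go
package main

import "fmt"

// Local mirror of the event codes documented above, redeclared here so the
// sketch is self-contained.
const (
	NewMessageEvent = iota + 1
	ReadyEvent
	DeliveredEvent
	AckEvent
	NackEvent
	RetryEvent
	FinalFailedEvent
)

// eventName is a hypothetical helper that turns an event code into its name.
func eventName(code int) string {
	switch code {
	case NewMessageEvent:
		return "NewMessageEvent"
	case ReadyEvent:
		return "ReadyEvent"
	case DeliveredEvent:
		return "DeliveredEvent"
	case AckEvent:
		return "AckEvent"
	case NackEvent:
		return "NackEvent"
	case RetryEvent:
		return "RetryEvent"
	case FinalFailedEvent:
		return "FinalFailedEvent"
	default:
		return fmt.Sprintf("UnknownEvent(%d)", code)
	}
}

func main() {
	// Print every documented code next to its name.
	for code := NewMessageEvent; code <= FinalFailedEvent; code++ {
		fmt.Println(code, eventName(code))
	}
}
```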
Variables ¶
var NilErr = errors.New("nil")
NilErr represents redis nil
Functions ¶
func UseHashTagKey ¶
func UseHashTagKey() interface{}
UseHashTagKey adds hash tags to redis keys so that all keys of this queue are allocated to the same hash slot. If you are using Codis, AliyunRedisCluster or TencentCloudRedisCluster, pass this option to NewQueue.
WARNING! Adding or removing this option will cause DelayQueue to fail to read existing data in redis.
See more: https://redis.io/docs/reference/cluster-spec/#hash-tags
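To see why a hash tag keeps keys together, the following sketch follows the rule from the Redis Cluster specification: if a key contains a non-empty `{...}` section, only that section is hashed with CRC16 (XMODEM variant) and reduced modulo 16384. The key names used here (`{orders}:pending`, `{orders}:ready`) are made up for illustration; the keys DelayQueue actually creates are not shown in this document.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 implements CRC-16/XMODEM (polynomial 0x1021, initial value 0),
// the checksum the Redis Cluster specification uses for key slots.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashTag returns the part of key that is hashed: the text between the first
// '{' and the next '}' when that text is non-empty, otherwise the whole key.
func hashTag(key string) string {
	start := strings.IndexByte(key, '{')
	if start < 0 {
		return key
	}
	end := strings.IndexByte(key[start+1:], '}')
	if end <= 0 { // no closing brace, or nothing between the braces
		return key
	}
	return key[start+1 : start+1+end]
}

// slot returns the cluster hash slot (0..16383) for key.
func slot(key string) int {
	return int(crc16([]byte(hashTag(key)))) % 16384
}

func main() {
	// Hypothetical key names that share the tag "orders" land in the same slot.
	fmt.Println(slot("{orders}:pending") == slot("{orders}:ready"))
	// Without a tag, each full key is hashed on its own.
	fmt.Println(slot("orders:pending"), slot("orders:ready"))
}
```

Since the tag becomes part of the key name, toggling the option changes every key the queue reads and writes, which is consistent with the warning above.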
func WithMsgTTL ¶
func WithMsgTTL(d time.Duration) interface{}
WithMsgTTL sets the TTL for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))
func WithRetryCount ¶
func WithRetryCount(count int) interface{}
WithRetryCount sets the retry count for a message. Example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
Types ¶
type CallbackFunc ¶
CallbackFunc receives and consumes messages. It returns true to confirm successful consumption, or false to have the message re-delivered.
type DelayQueue ¶
type DelayQueue struct {
// contains filtered or unexported fields
}
DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
func NewQueue ¶
func NewQueue(name string, cli *redis.Client, opts ...interface{}) *DelayQueue
NewQueue creates a new queue. Use DelayQueue.StartConsume to consume messages or DelayQueue.SendScheduleMsg to publish them.
queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
	// callback returns true to confirm successful consumption.
	// If callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver this message
	return true
})
func NewQueue0 ¶
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue
NewQueue0 creates a new queue from a RedisCli instance. Use DelayQueue.StartConsume to consume messages or DelayQueue.SendScheduleMsg to publish them. The callback returns true to confirm successful consumption. If the callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver the message.
func NewQueueOnCluster ¶
func NewQueueOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *DelayQueue
func (*DelayQueue) DisableListener ¶
func (q *DelayQueue) DisableListener()
DisableListener stops reporting events to EventListener
func (*DelayQueue) DisableReport ¶
func (q *DelayQueue) DisableReport()
DisableReport stops reporting to monitor
func (*DelayQueue) EnableReport ¶
func (q *DelayQueue) EnableReport()
EnableReport enables reporting to monitor
func (*DelayQueue) GetPendingCount ¶
func (q *DelayQueue) GetPendingCount() (int64, error)
GetPendingCount returns the number of pending messages
func (*DelayQueue) GetProcessingCount ¶
func (q *DelayQueue) GetProcessingCount() (int64, error)
GetProcessingCount returns the number of messages which are being processed
func (*DelayQueue) GetReadyCount ¶
func (q *DelayQueue) GetReadyCount() (int64, error)
GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet
func (*DelayQueue) ListenEvent ¶
func (q *DelayQueue) ListenEvent(listener EventListener)
ListenEvent registers a listener which will be called when events occur, so it can be used to monitor running status.
It can ONLY receive events from the CURRENT INSTANCE. If you want to listen to all events in the queue, use Monitor.ListenEvent instead.
There can be AT MOST ONE EventListener in a DelayQueue instance. If you are using a customized listener, Monitor will stop working.
func (*DelayQueue) SendDelayMsg ¶
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error
SendDelayMsg submits a message delivered after given duration
func (*DelayQueue) SendScheduleMsg ¶
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error
SendScheduleMsg submits a message delivered at given time
func (*DelayQueue) StartConsume ¶
func (q *DelayQueue) StartConsume() (done <-chan struct{})
StartConsume creates a goroutine to consume messages from DelayQueue. Use `<-done` to wait for the consumer to stop. If no callback is set, StartConsume will panic.
func (*DelayQueue) StopConsume ¶
func (q *DelayQueue) StopConsume()
StopConsume stops consumer goroutine
func (*DelayQueue) WithCallback ¶
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue
WithCallback sets the callback the queue uses to receive and consume messages. The callback returns true to confirm successful consumption, or false to have the message re-delivered.
func (*DelayQueue) WithConcurrent ¶
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue
WithConcurrent sets the number of concurrent consumers
func (*DelayQueue) WithDefaultRetryCount ¶
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue
WithDefaultRetryCount customizes the max number of retries. It affects all messages in this queue. Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message.
func (*DelayQueue) WithFetchInterval ¶
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue
WithFetchInterval customizes the interval at which the consumer fetches messages from redis
func (*DelayQueue) WithFetchLimit ¶
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue
WithFetchLimit limits the max number of processing messages, 0 means no limit
func (*DelayQueue) WithLogger ¶
func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue
WithLogger customizes logger for queue
func (*DelayQueue) WithMaxConsumeDuration ¶
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue
WithMaxConsumeDuration customizes the max consume duration. If no acknowledgement is received within the max consume duration after a message is delivered, DelayQueue will try to deliver the message again.
type Event ¶
type Event struct {
	// Code represents event type, such as NewMessageEvent, ReadyEvent
	Code int
	// Timestamp is the event time
	Timestamp int64
	// MsgCount represents the number of messages related to the event
	MsgCount int
}
Event contains internal event information during the queue operation and can be used to monitor the queue status.
type EventListener ¶
type EventListener interface {
	// OnEvent will be called when events occur
	OnEvent(*Event)
}
EventListener is called when events occur. It can be used to monitor running status.
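A listener only needs an `OnEvent(*Event)` method. The sketch below shows one possible listener that sums `MsgCount` per event code. `Event` and `EventListener` are redeclared locally to mirror the declarations above so the example is self-contained; `countingListener` is a hypothetical type, and the mutex is a precaution under the assumption that events may arrive from more than one goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// Event and EventListener mirror the declarations documented above.
type Event struct {
	Code      int
	Timestamp int64
	MsgCount  int
}

type EventListener interface {
	OnEvent(*Event)
}

// countingListener is a hypothetical listener that sums MsgCount per code.
type countingListener struct {
	mu     sync.Mutex
	counts map[int]int
}

func newCountingListener() *countingListener {
	return &countingListener{counts: make(map[int]int)}
}

// OnEvent records the number of messages reported by the event.
func (l *countingListener) OnEvent(e *Event) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.counts[e.Code] += e.MsgCount
}

// Count returns the total number of messages seen for a code.
func (l *countingListener) Count(code int) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.counts[code]
}

// Compile-time check that countingListener satisfies the interface.
var _ EventListener = (*countingListener)(nil)

func main() {
	l := newCountingListener()
	l.OnEvent(&Event{Code: 1, MsgCount: 2})
	l.OnEvent(&Event{Code: 1, MsgCount: 3})
	l.OnEvent(&Event{Code: 4, MsgCount: 1})
	fmt.Println(l.Count(1), l.Count(4)) // prints "5 1"
}
```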
type Monitor ¶
type Monitor struct {
// contains filtered or unexported fields
}
Monitor can get running status and events of DelayQueue
func NewMonitor ¶
NewMonitor creates a new Monitor by a *redis.Client
func NewMonitor0 ¶
NewMonitor0 creates a new Monitor by a RedisCli instance
func (*Monitor) GetPendingCount ¶
GetPendingCount returns the number of messages whose delivery time has not arrived
func (*Monitor) GetProcessingCount ¶
GetProcessingCount returns the number of messages which are being processed
func (*Monitor) GetReadyCount ¶
GetReadyCount returns the number of messages which have reached their delivery time but have not been delivered yet
func (*Monitor) ListenEvent ¶
func (m *Monitor) ListenEvent(listener EventListener) (func(), error)
ListenEvent registers a listener which will be called when events occur in this queue, so it can be used to monitor running status. Returns: close function, error.
func (*Monitor) WithLogger ¶
WithLogger customizes logger for queue
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher only publishes messages to delayqueue. It is an encapsulation of delayqueue.
func NewPublisher ¶
NewPublisher creates a new Publisher by a *redis.Client
func NewPublisher0 ¶
NewPublisher0 creates a new Publisher by a RedisCli instance
func (*Publisher) SendDelayMsg ¶
SendDelayMsg submits a message delivered after given duration
func (*Publisher) SendScheduleMsg ¶
SendScheduleMsg submits a message delivered at given time
func (*Publisher) WithLogger ¶
WithLogger customizes logger for queue
type RedisCli ¶
type RedisCli interface {
	// Eval sends lua script to redis
	// args should be string, integer or float
	// returns string, int64, []interface{} (elements can be string or int64)
	Eval(script string, keys []string, args []interface{}) (interface{}, error)
	Set(key string, value string, expiration time.Duration) error
	// Get represents redis command GET
	// please return NilErr when there is no such key in redis
	Get(key string) (string, error)
	Del(keys []string) error
	HSet(key string, field string, value string) error
	HDel(key string, fields []string) error
	SMembers(key string) ([]string, error)
	SRem(key string, members []string) error
	ZAdd(key string, values map[string]float64) error
	ZRem(key string, fields []string) error
	ZCard(key string) (int64, error)
	LLen(key string) (int64, error)
	// Publish used for monitor only
	Publish(channel string, payload string) error
	// Subscribe used for monitor only
	// returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well
	Subscribe(channel string) (payloads <-chan string, close func(), err error)
}
RedisCli is an abstraction of a redis client. It contains only the commands DelayQueue requires, not all redis commands.