Versions in this module: v1 — v1.0.2 (May 19, 2020). Changes in this version: + func ActiveSign(active bool) string + func OpenConnection(tag, network, address string, db int) *redisConnection + func OpenConnectionWithRedisClient(tag string, redisClient *redis.Client) *redisConnection + type BatchConsumer interface + Consume func(batch Deliveries) + type Cleaner struct + func NewCleaner(connection *redisConnection) *Cleaner + func (cleaner *Cleaner) Clean() error + func (cleaner *Cleaner) CleanConnection(connection *redisConnection) error + func (cleaner *Cleaner) CleanQueue(queue *redisQueue) + type Connection interface + CollectStats func(queueList []string) Stats + GetOpenQueues func() []string + OpenQueue func(name string) Queue + type ConnectionStat struct + func (stat ConnectionStat) String() string + type ConnectionStats map[string]ConnectionStat + type Consumer interface + Consume func(delivery Delivery) + type Deliveries []Delivery + func (deliveries Deliveries) Ack() int + func (deliveries Deliveries) Reject() int + type Delivery interface + Ack func() bool + Payload func() string + Push func() bool + Reject func() bool + type Queue interface + AddBatchConsumer func(tag string, batchSize int, consumer BatchConsumer) string + AddBatchConsumerWithTimeout func(tag string, batchSize int, timeout time.Duration, consumer BatchConsumer) string + AddConsumer func(tag string, consumer Consumer) string + Close func() bool + Publish func(payload string) bool + PublishBytes func(payload []byte) bool + PublishBytesOnDelay func(payload []byte, delayedAt time.Time) bool + PublishOnDelay func(payload string, delayedAt time.Time) bool + PublishRejected func(payload string) bool + PurgeReady func() int + PurgeRejected func() int + ReturnAllRejected func() int + ReturnRejected func(count int) int + SetPushQueue func(pushQueue Queue) + StartConsuming func(prefetchLimit int, pollDuration time.Duration) bool + StopConsuming func() bool + type QueueStat struct 
+ ReadyCount int + RejectedCount int + func NewQueueStat(readyCount, rejectedCount int) QueueStat + func (stat QueueStat) ConnectionCount() int + func (stat QueueStat) ConsumerCount() int + func (stat QueueStat) String() string + func (stat QueueStat) UnackedCount() int + type QueueStats map[string]QueueStat + type State int + const Acked + const Delayed + const Pushed + const Rejected + const Unacked + func (i State) String() string + type Stats struct + QueueStats QueueStats + func CollectStats(queueList []string, mainConnection *redisConnection) Stats + func NewStats() Stats + func (stats Stats) GetHtml(layout, refresh string) string + func (stats Stats) String() string + type TestBatchConsumer struct + LastBatch Deliveries + func NewTestBatchConsumer() *TestBatchConsumer + func (consumer *TestBatchConsumer) Consume(batch Deliveries) + func (consumer *TestBatchConsumer) Finish() + type TestConnection struct + func NewTestConnection() TestConnection + func (connection TestConnection) CollectStats(queueList []string) Stats + func (connection TestConnection) GetDeliveries(queueName string) []string + func (connection TestConnection) GetDelivery(queueName string, index int) string + func (connection TestConnection) GetOpenQueues() []string + func (connection TestConnection) OpenQueue(name string) Queue + func (connection TestConnection) Reset() + type TestConsumer struct + AutoAck bool + AutoFinish bool + LastDeliveries []Delivery + LastDelivery Delivery + SleepDuration time.Duration + func NewTestConsumer(name string) *TestConsumer + func (consumer *TestConsumer) Consume(delivery Delivery) + func (consumer *TestConsumer) Finish() + func (consumer *TestConsumer) String() string + type TestDelivery struct + State State + func NewTestDelivery(content interface{}) *TestDelivery + func NewTestDeliveryString(payload string) *TestDelivery + func (delivery *TestDelivery) Ack() bool + func (delivery *TestDelivery) Delay(delayedAt time.Time) bool + func (delivery 
*TestDelivery) Payload() string + func (delivery *TestDelivery) Push() bool + func (delivery *TestDelivery) Reject() bool + type TestQueue struct + LastDeliveries []string + func NewTestQueue(name string) *TestQueue + func (queue *TestQueue) AddBatchConsumer(tag string, batchSize int, consumer BatchConsumer) string + func (queue *TestQueue) AddBatchConsumerWithTimeout(tag string, batchSize int, timeout time.Duration, consumer BatchConsumer) string + func (queue *TestQueue) AddConsumer(tag string, consumer Consumer) string + func (queue *TestQueue) Close() bool + func (queue *TestQueue) Publish(payload string) bool + func (queue *TestQueue) PublishBytes(payload []byte) bool + func (queue *TestQueue) PublishBytesOnDelay(payload []byte, delayedAt time.Time) bool + func (queue *TestQueue) PublishOnDelay(payload string, delayedAt time.Time) bool + func (queue *TestQueue) PublishRejected(payload string) bool + func (queue *TestQueue) PurgeReady() int + func (queue *TestQueue) PurgeRejected() int + func (queue *TestQueue) Reset() + func (queue *TestQueue) ReturnAllRejected() int + func (queue *TestQueue) ReturnRejected(count int) int + func (queue *TestQueue) SetPushQueue(pushQueue Queue) + func (queue *TestQueue) StartConsuming(prefetchLimit int, pollDuration time.Duration) bool + func (queue *TestQueue) StopConsuming() bool + func (queue *TestQueue) String() string