Versions in this module Expand all Collapse all v4 v4.0.3 Feb 25, 2022 Changes in this version + const HeartbeatErrorLimit + var ErrorAlreadyConsuming = errors.New("must not call StartConsuming() multiple times") + var ErrorConsumingStopped = errors.New("consuming stopped") + var ErrorNotConsuming = errors.New("must call StartConsuming() before adding consumers") + var ErrorNotFound = errors.New("entity not found") + func ActiveSign(active bool) string + func RandomString(n int) string + type BatchConsumer interface + Consume func(batch Deliveries) + type Cleaner struct + func NewCleaner(connection Connection) *Cleaner + func (cleaner *Cleaner) Clean() (returned int64, err error) + type Connection interface + CollectStats func(queueList []string) (Stats, error) + GetOpenQueues func() ([]string, error) + OpenQueue func(name string) (Queue, error) + StopAllConsuming func() <-chan struct{} + func OpenConnection(tag string, network string, address string, db int, errChan chan<- error) (Connection, error) + func OpenConnectionWithRedisClient(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error) + func OpenConnectionWithRmqRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (Connection, error) + func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (Connection, error) + type ConnectionStat struct + func (stat ConnectionStat) String() string + type ConnectionStats map[string]ConnectionStat + type ConsumeError struct + Count int + RedisErr error + func (e *ConsumeError) Error() string + func (e *ConsumeError) Unwrap() error + type Consumer interface + Consume func(delivery Delivery) + type ConsumerFunc func(Delivery) + func (consumerFunc ConsumerFunc) Consume(delivery Delivery) + type Deliveries []Delivery + func (deliveries Deliveries) Ack() (errMap map[int]error) + func (deliveries Deliveries) Payloads() []string + func (deliveries Deliveries) Push() (errMap map[int]error) + func (deliveries Deliveries) Reject() (errMap map[int]error) + type Delivery interface + Ack func() error + Payload func() string + Push func() error + Reject func() error + type DeliveryError struct + Count int + Delivery Delivery + RedisErr error + func (e *DeliveryError) Error() string + func (e *DeliveryError) Unwrap() error + type HeartbeatError struct + Count int + RedisErr error + func (e *HeartbeatError) Error() string + func (e *HeartbeatError) Unwrap() error + type Queue interface + AddBatchConsumer func(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error) + AddConsumer func(tag string, consumer Consumer) (string, error) + AddConsumerFunc func(tag string, consumerFunc ConsumerFunc) (string, error) + Destroy func() (readyCount, rejectedCount int64, err error) + Publish func(payload ...string) error + PublishBytes func(payload ...[]byte) error + PurgeReady func() (int64, error) + PurgeRejected func() (int64, error) + ReturnRejected func(max int64) (int64, error) + ReturnUnacked func(max int64) (int64, error) + SetPushQueue func(pushQueue Queue) + StartConsuming func(prefetchLimit int64, pollDuration time.Duration) error + StopConsuming func() <-chan struct{} + type QueueStat struct + ReadyCount int64 + RejectedCount int64 + func NewQueueStat(readyCount, rejectedCount int64) QueueStat + func (stat QueueStat) ConnectionCount() int64 + func (stat QueueStat) ConsumerCount() int64 + func (stat QueueStat) String() string + func (stat QueueStat) UnackedCount() int64 + type QueueStats map[string]QueueStat + type RedisClient interface + Del func(key string) (affected int64, err error) + FlushDb func() error + LLen func(key string) (affected int64, err error) + LPush func(key string, value ...string) (total int64, err error) + LRem func(key string, count int64, value string) (affected int64, err error) + LTrim func(key string, start, stop int64) error + RPopLPush func(source, destination string) (value string, err error) + SAdd func(key, value string) (total int64, err error) + SMembers func(key string) (members []string, err error) + SRem func(key, value string) (affected int64, err error) + Set func(key string, value string, expiration time.Duration) error + TTL func(key string) (ttl time.Duration, err error) + type RedisWrapper struct + func (wrapper RedisWrapper) Del(key string) (affected int64, err error) + func (wrapper RedisWrapper) FlushDb() error + func (wrapper RedisWrapper) LLen(key string) (affected int64, err error) + func (wrapper RedisWrapper) LPush(key string, value ...string) (total int64, err error) + func (wrapper RedisWrapper) LRem(key string, count int64, value string) (affected int64, err error) + func (wrapper RedisWrapper) LTrim(key string, start, stop int64) error + func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, err error) + func (wrapper RedisWrapper) SAdd(key, value string) (total int64, err error) + func (wrapper RedisWrapper) SMembers(key string) (members []string, err error) + func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error) + func (wrapper RedisWrapper) Set(key string, value string, expiration time.Duration) error + func (wrapper RedisWrapper) TTL(key string) (ttl time.Duration, err error) + type State int + const Acked + const Pushed + const Rejected + const Unacked + func (i State) String() string + type Stats struct + QueueStats QueueStats + func CollectStats(queueList []string, mainConnection Connection) (Stats, error) + func NewStats() Stats + func (stats Stats) GetHtml(layout, refresh string) string + func (stats Stats) String() string + type TestBatchConsumer struct + AutoFinish bool + ConsumedCount int64 + LastBatch Deliveries + func NewTestBatchConsumer() *TestBatchConsumer + func (consumer *TestBatchConsumer) Consume(batch Deliveries) + func (consumer *TestBatchConsumer) Finish() + type TestConnection struct + func NewTestConnection() TestConnection + func (TestConnection) CollectStats([]string) (Stats, error) + func (TestConnection) GetOpenQueues() ([]string, error) + func (TestConnection) StopAllConsuming() <-chan struct{} + func (connection TestConnection) GetDeliveries(queueName string) []string + func (connection TestConnection) GetDelivery(queueName string, index int) string + func (connection TestConnection) OpenQueue(name string) (Queue, error) + func (connection TestConnection) Reset() + type TestConsumer struct + AutoAck bool + AutoFinish bool + LastDeliveries []Delivery + LastDelivery Delivery + SleepDuration time.Duration + func NewTestConsumer(name string) *TestConsumer + func (consumer *TestConsumer) Consume(delivery Delivery) + func (consumer *TestConsumer) Finish() + func (consumer *TestConsumer) FinishAll() + func (consumer *TestConsumer) String() string + type TestDelivery struct + State State + func NewTestDelivery(content interface{}) *TestDelivery + func NewTestDeliveryString(payload string) *TestDelivery + func (delivery *TestDelivery) Ack() error + func (delivery *TestDelivery) Payload() string + func (delivery *TestDelivery) Push() error + func (delivery *TestDelivery) Reject() error + type TestQueue struct + LastDeliveries []string + func NewTestQueue(name string) *TestQueue + func (*TestQueue) AddBatchConsumer(string, int64, time.Duration, BatchConsumer) (string, error) + func (*TestQueue) AddConsumer(string, Consumer) (string, error) + func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error) + func (*TestQueue) Destroy() (int64, int64, error) + func (*TestQueue) PurgeReady() (int64, error) + func (*TestQueue) PurgeRejected() (int64, error) + func (*TestQueue) ReturnRejected(int64) (int64, error) + func (*TestQueue) ReturnUnacked(int64) (int64, error) + func (*TestQueue) SetPushQueue(Queue) + func (*TestQueue) StartConsuming(int64, time.Duration) error + func (*TestQueue) StopConsuming() <-chan struct{} + func (queue *TestQueue) Publish(payload ...string) error + func (queue *TestQueue) PublishBytes(payload ...[]byte) error + func (queue *TestQueue) Reset() + func (queue *TestQueue) String() string + type TestRedisClient struct + func NewTestRedisClient() *TestRedisClient + func (client *TestRedisClient) Del(key string) (affected int64, err error) + func (client *TestRedisClient) FlushDb() error + func (client *TestRedisClient) Get(key string) (string, error) + func (client *TestRedisClient) LLen(key string) (affected int64, err error) + func (client *TestRedisClient) LPush(key string, value ...string) (total int64, err error) + func (client *TestRedisClient) LRem(key string, count int64, value string) (affected int64, err error) + func (client *TestRedisClient) LTrim(key string, start, stop int64) error + func (client *TestRedisClient) RPopLPush(source, destination string) (value string, err error) + func (client *TestRedisClient) SAdd(key, value string) (total int64, err error) + func (client *TestRedisClient) SMembers(key string) (members []string, err error) + func (client *TestRedisClient) SRem(key, value string) (affected int64, err error) + func (client *TestRedisClient) Set(key string, value string, expiration time.Duration) error + func (client *TestRedisClient) TTL(key string) (ttl time.Duration, err error) Other modules containing this package github.com/jesse0michael/rmq github.com/jesse0michael/rmq/v3