Documentation ¶
Index ¶
- Constants
- Variables
- func ActiveSign(active bool) string
- func ExtractHeaderAndPayload(payload string) (http.Header, string, error)
- func PayloadBytesWithHeader(payload []byte, header http.Header) []byte
- func PayloadWithHeader(payload string, header http.Header) string
- func RandomString(n int) string
- type BatchConsumer
- type BatchConsumerFunc
- type Cleaner
- type Connection
- func OpenClusterConnection(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error)
- func OpenConnection(tag string, network string, address string, db int, errChan chan<- error) (Connection, error)
- func OpenConnectionWithRedisClient(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error)
- func OpenConnectionWithRedisOptions(tag string, redisOption *redis.Options, errChan chan<- error) (Connection, error)
- func OpenConnectionWithRmqRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (Connection, error)
- func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (Connection, error)
- type ConnectionStat
- type ConnectionStats
- type ConsumeError
- type Consumer
- type ConsumerFunc
- type Deliveries
- type Delivery
- type DeliveryError
- type HeartbeatError
- type Queue
- type QueueStat
- type QueueStats
- type RedisClient
- type RedisWrapper
- func (wrapper RedisWrapper) Del(key string) (affected int64, err error)
- func (wrapper RedisWrapper) FlushDb() error
- func (wrapper RedisWrapper) LLen(key string) (affected int64, err error)
- func (wrapper RedisWrapper) LPush(key string, value ...string) (total int64, err error)
- func (wrapper RedisWrapper) LRem(key string, count int64, value string) (affected int64, err error)
- func (wrapper RedisWrapper) LTrim(key string, start, stop int64) error
- func (wrapper RedisWrapper) RPop(key string) (value string, err error)
- func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, err error)
- func (wrapper RedisWrapper) SAdd(key, value string) (total int64, err error)
- func (wrapper RedisWrapper) SMembers(key string) (members []string, err error)
- func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error)
- func (wrapper RedisWrapper) Set(key string, value string, expiration time.Duration) error
- func (wrapper RedisWrapper) TTL(key string) (ttl time.Duration, err error)
- type State
- type Stats
- type TestBatchConsumer
- type TestConnection
- func (TestConnection) CollectStats([]string) (Stats, error)
- func (connection TestConnection) GetDeliveries(queueName string) []string
- func (connection TestConnection) GetDelivery(queueName string, index int) string
- func (TestConnection) GetOpenQueues() ([]string, error)
- func (connection TestConnection) OpenQueue(name string) (Queue, error)
- func (connection TestConnection) Reset()
- func (TestConnection) StopAllConsuming() <-chan struct{}
- type TestConsumer
- type TestDelivery
- type TestQueue
- func (*TestQueue) AddBatchConsumer(string, int64, time.Duration, BatchConsumer) (string, error)
- func (*TestQueue) AddBatchConsumerFunc(string, int64, time.Duration, BatchConsumerFunc) (string, error)
- func (*TestQueue) AddConsumer(string, Consumer) (string, error)
- func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error)
- func (*TestQueue) Destroy() (int64, int64, error)
- func (*TestQueue) Drain(count int64) ([]string, error)
- func (queue *TestQueue) Publish(payload ...string) error
- func (queue *TestQueue) PublishBytes(payload ...[]byte) error
- func (*TestQueue) PurgeReady() (int64, error)
- func (*TestQueue) PurgeRejected() (int64, error)
- func (queue *TestQueue) Reset()
- func (*TestQueue) ReturnRejected(int64) (int64, error)
- func (*TestQueue) ReturnUnacked(int64) (int64, error)
- func (*TestQueue) SetPushQueue(Queue)
- func (*TestQueue) StartConsuming(int64, time.Duration) error
- func (*TestQueue) StopConsuming() <-chan struct{}
- func (queue *TestQueue) String() string
- type TestRedisClient
- func (client *TestRedisClient) Del(key string) (affected int64, err error)
- func (client *TestRedisClient) FlushDb() error
- func (client *TestRedisClient) Get(key string) (string, error)
- func (client *TestRedisClient) LLen(key string) (affected int64, err error)
- func (client *TestRedisClient) LPush(key string, values ...string) (total int64, err error)
- func (client *TestRedisClient) LRem(key string, count int64, value string) (affected int64, err error)
- func (client *TestRedisClient) LTrim(key string, start, stop int64) error
- func (client *TestRedisClient) RPop(key string) (value string, err error)
- func (client *TestRedisClient) RPopLPush(source, destination string) (value string, err error)
- func (client *TestRedisClient) SAdd(key, value string) (total int64, err error)
- func (client *TestRedisClient) SMembers(key string) (members []string, err error)
- func (client *TestRedisClient) SRem(key, value string) (affected int64, err error)
- func (client *TestRedisClient) Set(key string, value string, expiration time.Duration) error
- func (client *TestRedisClient) TTL(key string) (ttl time.Duration, err error)
- type WithHeader
Examples ¶
Constants ¶
const (
HeartbeatErrorLimit = 45 // stop consuming after this many heartbeat errors
)
Variables ¶
var (
ErrorNotFound = errors.New("entity not found") // entity being connection/queue/delivery
ErrorAlreadyConsuming = errors.New("must not call StartConsuming() multiple times")
ErrorNotConsuming = errors.New("must call StartConsuming() before adding consumers")
ErrorConsumingStopped = errors.New("consuming stopped")
)
Functions ¶
func ActiveSign ¶
func ExtractHeaderAndPayload ¶ added in v5.1.0
ExtractHeaderAndPayload splits augmented payload into header and original payload if specific signature is present.
func PayloadBytesWithHeader ¶ added in v5.1.0
PayloadBytesWithHeader creates payload bytes slice with header.
func PayloadWithHeader ¶ added in v5.1.0
PayloadWithHeader creates a payload string with header.
Example ¶
var ( pub, con rmq.Queue ) // .... h := make(http.Header) h.Set("X-Baz", "quux") // You can add header to your payload during publish. _ = pub.Publish(rmq.PayloadWithHeader(`{"foo":"bar"}`, h)) // .... _, _ = con.AddConsumerFunc("tag", func(delivery rmq.Delivery) { // And receive header back in consumer. delivery.(rmq.WithHeader).Header().Get("X-Baz") // "quux" // .... })
Output:
Types ¶
type BatchConsumer ¶
type BatchConsumer interface {
Consume(batch Deliveries)
}
type BatchConsumerFunc ¶ added in v5.1.2
type BatchConsumerFunc func(Deliveries)
func (BatchConsumerFunc) Consume ¶ added in v5.1.2
func (batchConsumerFunc BatchConsumerFunc) Consume(batch Deliveries)
type Cleaner ¶
type Cleaner struct {
// contains filtered or unexported fields
}
func NewCleaner ¶
func NewCleaner(connection Connection) *Cleaner
func (*Cleaner) Clean ¶
Clean cleans the connection of the cleaner. This is useful to make sure no deliveries get lost. The main use case is when your consumers get restarted: there will be unacked deliveries still assigned to the connection. Once the heartbeat of that connection dies, the cleaner can recognize that and move those unacked deliveries back to the ready list. If there was no error, it returns the number of deliveries which have been returned from unacked lists to ready lists across all cleaned connections and queues.
type Connection ¶
type Connection interface { OpenQueue(name string) (Queue, error) CollectStats(queueList []string) (Stats, error) GetOpenQueues() ([]string, error) StopAllConsuming() <-chan struct{} // contains filtered or unexported methods }
Connection is an interface that can be used to test publishing
func OpenClusterConnection ¶ added in v5.2.0
func OpenClusterConnection(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error)
OpenClusterConnection: Same as OpenConnectionWithRedisClient, but using Redis hash tags {} instead of [].
func OpenConnection ¶
func OpenConnection(tag string, network string, address string, db int, errChan chan<- error) (Connection, error)
OpenConnection opens and returns a new connection
func OpenConnectionWithRedisClient ¶
func OpenConnectionWithRedisClient(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error)
OpenConnectionWithRedisClient opens and returns a new connection. This can be used to pass a redis.ClusterClient.
func OpenConnectionWithRedisOptions ¶ added in v5.2.0
func OpenConnectionWithRedisOptions(tag string, redisOption *redis.Options, errChan chan<- error) (Connection, error)
OpenConnectionWithRedisOptions allows you to pass more flexible options
func OpenConnectionWithRmqRedisClient ¶
func OpenConnectionWithRmqRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (Connection, error)
OpenConnectionWithRmqRedisClient: If you would like to use a redis client other than the ones supported in the constructors above, you can implement the RedisClient interface yourself
func OpenConnectionWithTestRedisClient ¶
func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (Connection, error)
OpenConnectionWithTestRedisClient opens and returns a new connection which uses a test redis client internally. This is useful in integration tests.
type ConnectionStat ¶
type ConnectionStat struct {
// contains filtered or unexported fields
}
func (ConnectionStat) String ¶
func (stat ConnectionStat) String() string
type ConnectionStats ¶
type ConnectionStats map[string]ConnectionStat
type ConsumeError ¶
func (*ConsumeError) Error ¶
func (e *ConsumeError) Error() string
func (*ConsumeError) Unwrap ¶
func (e *ConsumeError) Unwrap() error
type ConsumerFunc ¶
type ConsumerFunc func(Delivery)
func (ConsumerFunc) Consume ¶
func (consumerFunc ConsumerFunc) Consume(delivery Delivery)
type Deliveries ¶
type Deliveries []Delivery
func (Deliveries) Ack ¶
func (deliveries Deliveries) Ack() (errMap map[int]error)
func (Deliveries) Payloads ¶
func (deliveries Deliveries) Payloads() []string
func (Deliveries) Push ¶
func (deliveries Deliveries) Push() (errMap map[int]error)
func (Deliveries) Reject ¶
func (deliveries Deliveries) Reject() (errMap map[int]error)
type DeliveryError ¶
type DeliveryError struct { Delivery Delivery RedisErr error Count int // number of consecutive errors }
func (*DeliveryError) Error ¶
func (e *DeliveryError) Error() string
func (*DeliveryError) Unwrap ¶
func (e *DeliveryError) Unwrap() error
type HeartbeatError ¶
func (*HeartbeatError) Error ¶
func (e *HeartbeatError) Error() string
func (*HeartbeatError) Unwrap ¶
func (e *HeartbeatError) Unwrap() error
type Queue ¶
type Queue interface { Publish(payload ...string) error PublishBytes(payload ...[]byte) error SetPushQueue(pushQueue Queue) StartConsuming(prefetchLimit int64, pollDuration time.Duration) error StopConsuming() <-chan struct{} AddConsumer(tag string, consumer Consumer) (string, error) AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error) AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error) AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error) PurgeReady() (int64, error) PurgeRejected() (int64, error) ReturnUnacked(max int64) (int64, error) ReturnRejected(max int64) (int64, error) Destroy() (readyCount, rejectedCount int64, err error) Drain(count int64) ([]string, error) // contains filtered or unexported methods }
type QueueStat ¶
type QueueStat struct { ReadyCount int64 `json:"ready"` RejectedCount int64 `json:"rejected"` // contains filtered or unexported fields }
func NewQueueStat ¶
func (QueueStat) ConnectionCount ¶
func (QueueStat) ConsumerCount ¶
func (QueueStat) UnackedCount ¶
type QueueStats ¶
type RedisClient ¶
type RedisClient interface { // simple keys Set(key string, value string, expiration time.Duration) error Del(key string) (affected int64, err error) TTL(key string) (ttl time.Duration, err error) // lists LPush(key string, value ...string) (total int64, err error) LLen(key string) (affected int64, err error) LRem(key string, count int64, value string) (affected int64, err error) LTrim(key string, start, stop int64) error RPopLPush(source, destination string) (value string, err error) RPop(key string) (value string, err error) // sets SAdd(key, value string) (total int64, err error) SMembers(key string) (members []string, err error) SRem(key, value string) (affected int64, err error) // special FlushDb() error }
type RedisWrapper ¶
type RedisWrapper struct {
// contains filtered or unexported fields
}
func (RedisWrapper) FlushDb ¶
func (wrapper RedisWrapper) FlushDb() error
func (RedisWrapper) LPush ¶
func (wrapper RedisWrapper) LPush(key string, value ...string) (total int64, err error)
func (RedisWrapper) RPopLPush ¶
func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, err error)
func (RedisWrapper) SAdd ¶
func (wrapper RedisWrapper) SAdd(key, value string) (total int64, err error)
func (RedisWrapper) SMembers ¶
func (wrapper RedisWrapper) SMembers(key string) (members []string, err error)
func (RedisWrapper) SRem ¶
func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error)
type Stats ¶
type Stats struct { QueueStats QueueStats `json:"queues"` // contains filtered or unexported fields }
func CollectStats ¶
func CollectStats(queueList []string, mainConnection Connection) (Stats, error)
type TestBatchConsumer ¶
type TestBatchConsumer struct {
// Deprecated: use Last() to avoid data races.
LastBatch Deliveries
// Deprecated: use Consumed() to avoid data races.
ConsumedCount int64
AutoFinish bool
// contains filtered or unexported fields
}
func NewTestBatchConsumer ¶
func NewTestBatchConsumer() *TestBatchConsumer
func (*TestBatchConsumer) Consume ¶
func (consumer *TestBatchConsumer) Consume(batch Deliveries)
func (*TestBatchConsumer) Consumed ¶ added in v5.1.0
func (consumer *TestBatchConsumer) Consumed() int64
func (*TestBatchConsumer) Finish ¶
func (consumer *TestBatchConsumer) Finish()
func (*TestBatchConsumer) Last ¶ added in v5.1.0
func (consumer *TestBatchConsumer) Last() Deliveries
type TestConnection ¶
type TestConnection struct {
// contains filtered or unexported fields
}
func NewTestConnection ¶
func NewTestConnection() TestConnection
func (TestConnection) CollectStats ¶
func (TestConnection) CollectStats([]string) (Stats, error)
func (TestConnection) GetDeliveries ¶
func (connection TestConnection) GetDeliveries(queueName string) []string
func (TestConnection) GetDelivery ¶
func (connection TestConnection) GetDelivery(queueName string, index int) string
func (TestConnection) GetOpenQueues ¶
func (TestConnection) GetOpenQueues() ([]string, error)
func (TestConnection) OpenQueue ¶
func (connection TestConnection) OpenQueue(name string) (Queue, error)
func (TestConnection) Reset ¶
func (connection TestConnection) Reset()
func (TestConnection) StopAllConsuming ¶
func (TestConnection) StopAllConsuming() <-chan struct{}
type TestConsumer ¶
type TestConsumer struct { AutoAck bool AutoFinish bool SleepDuration time.Duration // Deprecated: use Last() to avoid data races. LastDelivery Delivery // Deprecated: use Deliveries() to avoid data races. LastDeliveries []Delivery // contains filtered or unexported fields }
func NewTestConsumer ¶
func NewTestConsumer(name string) *TestConsumer
func (*TestConsumer) Consume ¶
func (consumer *TestConsumer) Consume(delivery Delivery)
func (*TestConsumer) Deliveries ¶ added in v5.1.0
func (consumer *TestConsumer) Deliveries() []Delivery
func (*TestConsumer) Finish ¶
func (consumer *TestConsumer) Finish()
func (*TestConsumer) FinishAll ¶
func (consumer *TestConsumer) FinishAll()
func (*TestConsumer) Last ¶ added in v5.1.0
func (consumer *TestConsumer) Last() Delivery
func (*TestConsumer) String ¶
func (consumer *TestConsumer) String() string
type TestDelivery ¶
type TestDelivery struct { State State // contains filtered or unexported fields }
func NewTestDelivery ¶
func NewTestDelivery(content interface{}) *TestDelivery
func NewTestDeliveryString ¶
func NewTestDeliveryString(payload string) *TestDelivery
func (*TestDelivery) Ack ¶
func (delivery *TestDelivery) Ack() error
func (*TestDelivery) Payload ¶
func (delivery *TestDelivery) Payload() string
func (*TestDelivery) Push ¶
func (delivery *TestDelivery) Push() error
func (*TestDelivery) Reject ¶
func (delivery *TestDelivery) Reject() error
type TestQueue ¶
type TestQueue struct { LastDeliveries []string // contains filtered or unexported fields }
func NewTestQueue ¶
func (*TestQueue) AddBatchConsumer ¶
func (*TestQueue) AddBatchConsumerFunc ¶ added in v5.1.2
func (*TestQueue) AddConsumerFunc ¶
func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error)
func (*TestQueue) PublishBytes ¶
func (*TestQueue) PurgeReady ¶
func (*TestQueue) PurgeRejected ¶
func (*TestQueue) SetPushQueue ¶
func (*TestQueue) StopConsuming ¶
func (*TestQueue) StopConsuming() <-chan struct{}
type TestRedisClient ¶
type TestRedisClient struct {
// contains filtered or unexported fields
}
TestRedisClient is a mock for redis
func NewTestRedisClient ¶
func NewTestRedisClient() *TestRedisClient
func (*TestRedisClient) Del ¶
func (client *TestRedisClient) Del(key string) (affected int64, err error)
Del removes the specified key. A key is ignored if it does not exist.
func (*TestRedisClient) FlushDb ¶
func (client *TestRedisClient) FlushDb() error
FlushDb deletes all the keys of the currently selected DB. This command never fails.
func (*TestRedisClient) Get ¶
func (client *TestRedisClient) Get(key string) (string, error)
Get returns the value of key. If the key does not exist or isn't a string, the special value nil is returned.
func (*TestRedisClient) LLen ¶
func (client *TestRedisClient) LLen(key string) (affected int64, err error)
LLen returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned. An error is returned when the value stored at key is not a list.
func (*TestRedisClient) LPush ¶
func (client *TestRedisClient) LPush(key string, values ...string) (total int64, err error)
LPush inserts the specified value at the head of the list stored at key. If key does not exist, it is created as an empty list before performing the push operations. When key holds a value that is not a list, an error is returned. It is possible to push multiple elements using a single command call just specifying multiple arguments at the end of the command. Elements are inserted one after the other to the head of the list, from the leftmost element to the rightmost element.
func (*TestRedisClient) LRem ¶
func (client *TestRedisClient) LRem(key string, count int64, value string) (affected int64, err error)
LRem removes the first count occurrences of elements equal to value from the list stored at key. The count argument influences the operation in the following ways: count > 0: Remove elements equal to value moving from head to tail. count < 0: Remove elements equal to value moving from tail to head. count = 0: Remove all elements equal to value. For example, LREM list -2 "hello" will remove the last two occurrences of "hello" in the list stored at list. Note that non-existing keys are treated like empty lists, so when key does not exist, the command will always return 0.
func (*TestRedisClient) LTrim ¶
func (client *TestRedisClient) LTrim(key string, start, stop int64) error
LTrim trims an existing list so that it will contain only the specified range of elements. Both start and stop are zero-based indexes, where 0 is the first element of the list (the head), 1 the next element and so on. For example: LTRIM foobar 0 2 will modify the list stored at foobar so that only the first three elements of the list will remain. start and stop can also be negative numbers indicating offsets from the end of the list, where -1 is the last element of the list, -2 the penultimate element and so on. Out of range indexes will not produce an error: if start is larger than the end of the list, or start > stop, the result will be an empty list (which causes key to be removed). If stop is larger than the end of the list, Redis will treat it like the last element of the list.
func (*TestRedisClient) RPop ¶
func (client *TestRedisClient) RPop(key string) (value string, err error)
RPop removes and returns one value from the tail of the list stored at key. When key holds a value that is not a list, an error is returned.
func (*TestRedisClient) RPopLPush ¶
func (client *TestRedisClient) RPopLPush(source, destination string) (value string, err error)
RPopLPush atomically returns and removes the last element (tail) of the list stored at source, and pushes the element at the first element (head) of the list stored at destination. For example: consider source holding the list a,b,c, and destination holding the list x,y,z. Executing RPOPLPUSH results in source holding a,b and destination holding c,x,y,z. If source does not exist, the value nil is returned and no operation is performed. If source and destination are the same, the operation is equivalent to removing the last element from the list and pushing it as first element of the list, so it can be considered as a list rotation command.
func (*TestRedisClient) SAdd ¶
func (client *TestRedisClient) SAdd(key, value string) (total int64, err error)
SAdd adds the specified members to the set stored at key. Specified members that are already a member of this set are ignored. If key does not exist, a new set is created before adding the specified members. An error is returned when the value stored at key is not a set.
func (*TestRedisClient) SMembers ¶
func (client *TestRedisClient) SMembers(key string) (members []string, err error)
SMembers returns all the members of the set value stored at key. This has the same effect as running SINTER with one argument key.
func (*TestRedisClient) SRem ¶
func (client *TestRedisClient) SRem(key, value string) (affected int64, err error)
SRem removes the specified members from the set stored at key. Specified members that are not a member of this set are ignored. If key does not exist, it is treated as an empty set and this command returns 0. An error is returned when the value stored at key is not a set.
func (*TestRedisClient) Set ¶
Set sets key to hold the string value. If key already holds a value, it is overwritten, regardless of its type. Any previous time to live associated with the key is discarded on successful SET operation.
func (*TestRedisClient) TTL ¶
func (client *TestRedisClient) TTL(key string) (ttl time.Duration, err error)
TTL returns the remaining time to live of a key that has a timeout. This introspection capability allows a Redis client to check how many seconds a given key will continue to be part of the dataset. In Redis 2.6 or older the command returns -1 if the key does not exist or if the key exists but has no associated expire. Starting with Redis 2.8 the return value in case of error changed: The command returns -2 if the key does not exist. The command returns -1 if the key exists but has no associated expire.
type WithHeader ¶ added in v5.1.0
WithHeader is a Delivery with Header.
Source Files ¶
- batch_consumer.go
- cleaner.go
- connection.go
- consumer.go
- deliveries.go
- delivery.go
- errors.go
- header.go
- queue.go
- rand.go
- redis_client.go
- redis_keys.go
- redis_wrapper.go
- state.go
- state_string.go
- stats.go
- test_batch_consumer.go
- test_connection.go
- test_consumer.go
- test_delivery.go
- test_queue.go
- test_redis_client.go
- test_util.go