rmq

package module
v5.2.0
Published: Jul 20, 2023 License: MIT Imports: 18 Imported by: 61

README

Overview

rmq is short for Redis message queue. It's a message queue system written in Go and backed by Redis.

Basic Usage

Let's take a look at how to use rmq.

Import

Of course you need to import rmq wherever you want to use it.

import "github.com/adjust/rmq/v5"

Connection

Before we get to queues, we first need to establish a connection. Each rmq connection has a name (used in statistics) and Redis connection details including which Redis database to use. The most basic Redis connection uses a TCP connection to a given host and a port:

connection, err := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1, errChan)

It's also possible to access a Redis listening on a Unix socket:

connection, err := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1, errChan)

For a more flexible setup you can pass Redis options or create your own Redis client:

connection, err := rmq.OpenConnectionWithRedisOptions("my service", redisOptions, errChan)
// or
connection, err := rmq.OpenConnectionWithRedisClient("my service", redisClient, errChan)

If the Redis instance can't be reached you will receive an error indicating this.

Please also note the errChan parameter. There is some rmq logic running in the background which can run into Redis errors. If you pass an error channel to the OpenConnection() functions rmq will send those background errors to this channel so you can handle them asynchronously. For more details about this and handling suggestions see the section about handling background errors below.

Connecting to a Redis cluster

In order to connect to a Redis cluster please use OpenClusterConnection():

redisClusterOptions := &redis.ClusterOptions{ /* ... */ }
redisClusterClient := redis.NewClusterClient(redisClusterOptions)
connection, err := rmq.OpenClusterConnection("my service", redisClusterClient, errChan)

Note that such an rmq cluster connection uses different Redis keys than rmq connections opened by OpenConnection() or similar. If you have used a Redis instance with OpenConnection() then it is NOT SAFE to reuse that rmq system by connecting to it via OpenClusterConnection(). The cluster state won't be compatible and this will likely lead to data loss.

If you've previously used OpenConnection() or similar you should only consider using OpenClusterConnection() with a fresh Redis cluster.

Queues

Once we have a connection we can use it to finally access queues. Each queue must have a unique name by which we address it. Queues are created once they are accessed. There is no need to declare them in advance. Here we open a queue named "tasks":

taskQueue, err := connection.OpenQueue("tasks")

Again, Redis errors might be returned.

Producers

An empty queue is boring, let's add some deliveries! Internally all deliveries are saved to Redis lists as strings. This is how you can publish a string payload to a queue:

delivery := "task payload"
err := taskQueue.Publish(delivery)

In practice, however, it's more common to have instances of some struct that we want to publish to a queue. Assuming task is of some type like Task, this is how to publish the JSON representation of that task:

// create task
taskBytes, err := json.Marshal(task)
if err != nil {
    // handle error
}

err = taskQueue.PublishBytes(taskBytes)

For a full example see example/producer.
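As a slightly fuller sketch of the JSON case, the program below marshals a task and publishes the resulting bytes. The Task fields, the bytesPublisher interface and the memoryQueue type are assumptions made for this illustration; memoryQueue only stands in for a real queue so the code runs without Redis. PublishBytes has the same signature as the method on rmq.Queue.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Task is a hypothetical payload type used only for this illustration.
type Task struct {
	ID   int    `json:"id"`
	Kind string `json:"kind"`
}

// bytesPublisher is the one method of rmq.Queue this sketch relies on.
type bytesPublisher interface {
	PublishBytes(payload ...[]byte) error
}

// publishTask marshals the task to JSON and publishes the bytes.
func publishTask(queue bytesPublisher, task Task) error {
	taskBytes, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task %d: %w", task.ID, err)
	}
	if err := queue.PublishBytes(taskBytes); err != nil {
		return fmt.Errorf("publish task %d: %w", task.ID, err)
	}
	return nil
}

// memoryQueue is an in-memory stand-in for a queue (not part of rmq).
type memoryQueue struct {
	payloads []string
}

func (q *memoryQueue) PublishBytes(payload ...[]byte) error {
	for _, p := range payload {
		q.payloads = append(q.payloads, string(p))
	}
	return nil
}

func main() {
	queue := &memoryQueue{}
	if err := publishTask(queue, Task{ID: 1, Kind: "email"}); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(queue.payloads[0]) // prints {"id":1,"kind":"email"}
}
```

Because publishTask depends only on the small interface, a queue returned by connection.OpenQueue() could be passed in unchanged.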

Consumers

Now that our queue starts filling, let's add a consumer. After opening the queue as before, we need it to start consuming before we can add consumers.

err := taskQueue.StartConsuming(10, time.Second)

This sets the prefetch limit to 10 and the poll duration to one second. This means the queue will fetch up to 10 deliveries at a time before giving them to the consumers. To avoid idling consumers while the queues are full, the prefetch limit should always be greater than the number of consumers you are going to add. If the queue gets empty, the poll duration sets how long rmq will wait before checking for new deliveries in Redis.

Once this is set up, we can actually add consumers to the consuming queue.

taskConsumer := &TaskConsumer{}
name, err := taskQueue.AddConsumer("task-consumer", taskConsumer)

To uniquely identify each consumer internally rmq creates a random name with the given prefix. For example in this case name might be task-consumer-WB1zaq. This name is only used in statistics.

In our example above the injected taskConsumer (of type *TaskConsumer) must implement the rmq.Consumer interface. For example:

func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {
    var task Task
    if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
        // handle json error
        if err := delivery.Reject(); err != nil {
            // handle reject error
        }
        return
    }

    // perform task
    log.Printf("performing task %v", task)
    if err := delivery.Ack(); err != nil {
        // handle ack error
    }
}

First we unmarshal the JSON package found in the delivery payload. If this fails we reject the delivery. Otherwise we perform the task and ack the delivery.

If you don't actually need a consumer struct you can use AddConsumerFunc instead and pass a consumer function which handles an rmq.Delivery:

name, err := taskQueue.AddConsumerFunc("task-consumer", func(delivery rmq.Delivery) {
    // handle delivery and call Ack() or Reject() on it
})

Please note that delivery.Ack() and similar functions have a built-in retry mechanism which will block your consumers in some cases. This is because failing to acknowledge a delivery is potentially dangerous. For details see the section about background errors below.

For a full example see example/consumer.

Consumer Lifecycle

As described above you can add consumers to a queue. For each consumer rmq takes one of the prefetched unacked deliveries from the delivery channel and passes it to the consumer's Consume() function. The next delivery will only be passed to the same consumer once the prior Consume() call returns. So each consumer will only be consuming a single delivery at any given time.

Furthermore each Consume() call is expected to call either delivery.Ack(), delivery.Reject() or delivery.Push() (see below). If that's not the case these deliveries will remain unacked and the prefetch goroutine won't make progress after a while. So make sure you always call exactly one of those functions in your Consume() implementations.
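One way to make sure every delivery is settled exactly once is to route all outcomes through a single helper. The following is a minimal sketch under assumptions: the delivery interface lists only the rmq.Delivery methods used here, and consumeOnce, fakeDelivery and errProcess are hypothetical names that exist only in this example.

```go
package main

import (
	"errors"
	"fmt"
)

// delivery lists the rmq.Delivery methods this sketch uses.
type delivery interface {
	Payload() string
	Ack() error
	Reject() error
}

// errProcess is a made-up processing error for the demo.
var errProcess = errors.New("cannot process payload")

// consumeOnce runs process and then settles the delivery exactly once:
// Reject() if processing failed, Ack() otherwise. It returns the error
// of the settling call, if any.
func consumeOnce(d delivery, process func(payload string) error) error {
	if err := process(d.Payload()); err != nil {
		if rejectErr := d.Reject(); rejectErr != nil {
			return fmt.Errorf("reject after %v: %w", err, rejectErr)
		}
		return nil
	}
	return d.Ack()
}

// fakeDelivery counts calls so the demo can show what happened.
type fakeDelivery struct {
	payload         string
	acked, rejected int
}

func (d *fakeDelivery) Payload() string { return d.payload }
func (d *fakeDelivery) Ack() error      { d.acked++; return nil }
func (d *fakeDelivery) Reject() error   { d.rejected++; return nil }

func main() {
	process := func(payload string) error {
		if payload == "bad" {
			return errProcess
		}
		return nil
	}
	good := &fakeDelivery{payload: "good"}
	bad := &fakeDelivery{payload: "bad"}
	_ = consumeOnce(good, process)
	_ = consumeOnce(bad, process)
	fmt.Println(good.acked, good.rejected, bad.acked, bad.rejected) // prints 1 0 0 1
}
```

A Consume() implementation could then consist of a single call to such a helper, which leaves no code path that returns without settling the delivery.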

Background Errors

It's recommended to inject an error channel into the OpenConnection() functions. This section describes its purpose and how you might use it to monitor rmq background Redis errors.

There are three sources of background errors which rmq detects (and handles internally):

  1. The OpenConnection() functions spawn a goroutine which keeps a heartbeat Redis key alive. This is important so that the cleaner (see below) can tell which connections are still alive and must not be cleaned yet. If the heartbeat goroutine repeatedly fails to update the heartbeat Redis key for too long, the cleaner might clean up the connection prematurely. To avoid this the connection will automatically stop all consumers after 45 consecutive heartbeat errors. This magic number is based on the details of the heartbeat key: The heartbeat tries to update the key every second with a TTL of one minute. So only after 60 failed attempts would the heartbeat key be dead.

    Every time this goroutine runs into a Redis error it gets sent to the error channel as HeartbeatError.

  2. The StartConsuming() function spawns a goroutine which is responsible for prefetching deliveries from the Redis ready list and moving them into a delivery channel. This delivery channel feeds into your consumers' Consume() functions. If the prefetch goroutine runs into Redis errors this basically means that there won't be new deliveries being sent to your consumers until it can fetch new ones. So these Redis errors are not dangerous; they just mean that your consumers will start idling until the Redis connection recovers.

    Every time this goroutine runs into a Redis error it gets sent to the error channel as ConsumeError.

  3. The delivery functions Ack(), Reject() and Push() have a built-in retry mechanism. This is because failing to acknowledge a delivery is potentially dangerous. The consumer has already handled the delivery, so if it can't ack it the cleaner might end up moving it back to the ready list so another consumer might end up consuming it again in the future, leading to double delivery.

    So if a delivery failed to be acked because of a Redis error the Ack() call will block and retry once a second until it either succeeds or until consuming gets stopped (see below). In the latter case the Ack() call will return rmq.ErrorConsumingStopped which you should handle in your consume function. For example you might want to log about the delivery so you can manually remove it from the unacked or ready list before you start new consumers. Or at least you can know which deliveries might end up being consumed twice.

    Every time these functions run into a Redis error it gets sent to the error channel as DeliveryError.
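The arithmetic behind the heartbeat limit in item 1 can be spelled out in a few lines. This sketch uses only the numbers stated above (one update attempt per second, a one minute TTL, a limit of 45 errors); the constant and function names are made up for the illustration, and it assumes the last successful update reset the TTL to the full minute.

```go
package main

import (
	"fmt"
	"time"
)

const (
	heartbeatErrorLimit = 45          // same value as rmq.HeartbeatErrorLimit
	heartbeatInterval   = time.Second // update interval described above
	heartbeatTTL        = time.Minute // key TTL described above
)

// safetyMargin returns how much of the key's TTL is left at the moment
// consuming stops, i.e. after `limit` consecutive failed updates.
func safetyMargin(limit int, interval, ttl time.Duration) time.Duration {
	return ttl - time.Duration(limit)*interval
}

func main() {
	fmt.Println(safetyMargin(heartbeatErrorLimit, heartbeatInterval, heartbeatTTL)) // prints 15s
}
```

Under these assumptions the consumers stop roughly 15 seconds before the heartbeat key would expire and the cleaner could consider the connection dead.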

Each of those error types has a field Count which tells you how often the operation failed consecutively. This indicates for how long the affected Redis instance has been unavailable. One general way of using this information might be to have metrics about the error types including the error count so you can keep track of how stable your Redis instances and connections are. By monitoring this you might learn about instabilities before they affect your services in significant ways.

Below is some more specific advice on handling the different error cases outlined above. Keep in mind though that all of those errors are likely to happen at the same time, as Redis tends to be up or down completely. But if you're using a multi-instance Redis setup like nutcracker you might see some of them in isolation from the others.

  1. HeartbeatError: Once err.Count equals HeartbeatErrorLimit you should know that the consumers of this connection will stop consuming. And they won't restart consuming on their own. This is a condition you should closely monitor because this means you will have to restart your service in order to resume consuming. Before restarting you should check your Redis instance.

  2. ConsumeError: These are mostly informational. As long as those errors keep happening the consumers will effectively be paused. But once these operations start succeeding again the consumers will resume consuming on their own.

  3. DeliveryError: When you see deliveries failing to ack repeatedly this also means your consumers won't make progress as they will keep retrying to ack pending deliveries before starting to consume new ones. As long as this keeps happening you should avoid stopping the service if you can. That is because the already consumed but not yet acked deliveries will be returned to the ready list by the cleaner afterwards, which leads to double delivery. So ideally you try to get the Redis connection up again while the deliveries are still trying to ack. Once acking works again it's safe to restart.

    More realistically, if you still need to stop the service when Redis is down, keep in mind that calling StopConsuming() will make the blocking Ack() calls return with ErrorConsumingStopped, so you can handle that case to make an attempt to either avoid the double delivery or at least track it for future investigation.
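The advice above could be turned into a small error-channel reader along the following lines. To keep the sketch runnable without rmq, it declares local stand-in types with the same RedisErr and Count fields as rmq.HeartbeatError, rmq.ConsumeError and rmq.DeliveryError (the Delivery field is left out); real code would match on the rmq types instead. The classify and logErrors names are assumptions of this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented rmq error types.
type HeartbeatError struct {
	RedisErr error
	Count    int
}

type ConsumeError struct {
	RedisErr error
	Count    int
}

type DeliveryError struct {
	RedisErr error
	Count    int
}

func (e *HeartbeatError) Error() string { return fmt.Sprintf("heartbeat error #%d: %v", e.Count, e.RedisErr) }
func (e *ConsumeError) Error() string   { return fmt.Sprintf("consume error #%d: %v", e.Count, e.RedisErr) }
func (e *DeliveryError) Error() string  { return fmt.Sprintf("delivery error #%d: %v", e.Count, e.RedisErr) }

const heartbeatErrorLimit = 45 // same value as rmq.HeartbeatErrorLimit

// classify reports the error kind, its consecutive error count, and whether
// the connection's consumers have stopped (heartbeat limit reached).
func classify(err error) (kind string, count int, stopped bool) {
	var heartbeatErr *HeartbeatError
	var consumeErr *ConsumeError
	var deliveryErr *DeliveryError
	switch {
	case errors.As(err, &heartbeatErr):
		return "heartbeat", heartbeatErr.Count, heartbeatErr.Count >= heartbeatErrorLimit
	case errors.As(err, &consumeErr):
		return "consume", consumeErr.Count, false
	case errors.As(err, &deliveryErr):
		return "delivery", deliveryErr.Count, false
	default:
		return "other", 0, false
	}
}

// logErrors drains the channel; a real service might update metrics here.
func logErrors(errChan <-chan error) {
	for err := range errChan {
		kind, count, stopped := classify(err)
		fmt.Printf("kind=%s count=%d consumersStopped=%t err=%v\n", kind, count, stopped, err)
	}
}

func main() {
	errChan := make(chan error, 10)
	redisErr := errors.New("connection refused")
	errChan <- &ConsumeError{RedisErr: redisErr, Count: 3}
	errChan <- &HeartbeatError{RedisErr: redisErr, Count: heartbeatErrorLimit}
	close(errChan) // closed here only so the demo terminates
	logErrors(errChan)
}
```

In a service, logErrors would typically run in its own goroutine on the same channel that was passed to the OpenConnection() function.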

Advanced Usage

Batch Consumers

Sometimes it's useful to have consumers work on batches of deliveries instead of individual ones. For example for bulk database inserts. In those cases you can use AddBatchConsumer():

batchConsumer := &MyBatchConsumer{}
name, err := taskQueue.AddBatchConsumer("my-consumer", 100, time.Second, batchConsumer)

In this example we create a batch consumer which will receive batches of up to 100 deliveries. We set the batchTimeout to one second, so if there are fewer than 100 deliveries per second we will still consume at least one batch per second (which would contain fewer than 100 deliveries).
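To illustrate the general idea of "up to N items or until a timeout", here is a small channel-based sketch. It is not rmq's implementation; nextBatch, demoBatches and demoTimeout are hypothetical names, and the exact point at which rmq starts its batch timer may differ.

```go
package main

import (
	"fmt"
	"time"
)

const demoTimeout = time.Second // long enough that the demo never times out

// nextBatch collects up to size items from in. It returns early when the
// timeout elapses or when in is closed, so the batch may be shorter.
func nextBatch(in <-chan string, size int, timeout time.Duration) []string {
	batch := make([]string, 0, size)
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for len(batch) < size {
		select {
		case item, ok := <-in:
			if !ok {
				return batch
			}
			batch = append(batch, item)
		case <-timer.C:
			return batch
		}
	}
	return batch
}

// demoBatches splits five buffered items into batches of at most two.
func demoBatches() [][]string {
	in := make(chan string, 5)
	for _, payload := range []string{"a", "b", "c", "d", "e"} {
		in <- payload
	}
	close(in)

	var batches [][]string
	for {
		batch := nextBatch(in, 2, demoTimeout)
		if len(batch) == 0 {
			break
		}
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	fmt.Println(demoBatches()) // prints [[a b] [c d] [e]]
}
```

The last batch holds a single item, which corresponds to the "fewer than 100 deliveries" case described above.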

The rmq.BatchConsumer interface is very similar to rmq.Consumer.

func (consumer *MyBatchConsumer) Consume(batch rmq.Deliveries) {
    payloads := batch.Payloads()
    // handle payloads
    if errors := batch.Ack(); len(errors) > 0 {
        // handle ack errors
    }
}

Note that batch.Ack() acknowledges all deliveries in the batch. It's also possible to ack some of the deliveries and reject the rest. batch.Ack() uses the same retry mechanism per delivery as discussed above. If some of the deliveries continue to fail to ack when consuming gets stopped (see below), then batch.Ack() will return an error map of type map[int]error. For each entry in this map the key will be the index of the delivery which failed to ack and the value will be the error it ran into. That way you can map the errors back to the deliveries to know which deliveries are at risk of being consumed again in the future as discussed above.
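Mapping the error indexes back to payloads might look like the following sketch. It works on plain values, a []string as returned by batch.Payloads() and a map[int]error as returned by batch.Ack(), so it runs without rmq; failedPayloads and errStopped are names invented for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errStopped stands in for rmq.ErrorConsumingStopped in this demo.
var errStopped = errors.New("consuming stopped")

// failedPayloads returns the payloads whose index appears in errMap,
// in ascending index order. Indexes outside the slice are ignored.
func failedPayloads(payloads []string, errMap map[int]error) []string {
	indexes := make([]int, 0, len(errMap))
	for index := range errMap {
		if index >= 0 && index < len(payloads) {
			indexes = append(indexes, index)
		}
	}
	sort.Ints(indexes)

	failed := make([]string, 0, len(indexes))
	for _, index := range indexes {
		failed = append(failed, payloads[index])
	}
	return failed
}

func main() {
	payloads := []string{"a", "b", "c", "d"}
	errMap := map[int]error{3: errStopped, 1: errStopped}
	fmt.Println(failedPayloads(payloads, errMap)) // prints [b d]
}
```

Logging the returned payloads gives a record of which deliveries might be consumed a second time.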

For a full example see example/batch_consumer.

Push Queues

Another thing which can be useful is a mechanism for retries. Let's say you have tasks which can fail for external reasons but you'd like to retry them a few times after a while before you give up. In that case you can set up a chain of push queues like this:

incomingQ -> pushQ1 -> pushQ2

In the queue setup code it would look like this (error handling omitted for brevity):

incomingQ, err := connection.OpenQueue("incomingQ")
pushQ1, err := connection.OpenQueue("pushQ1")
pushQ2, err := connection.OpenQueue("pushQ2")
incomingQ.SetPushQueue(pushQ1)
pushQ1.SetPushQueue(pushQ2)
_, err = incomingQ.AddConsumer("incomingQ", NewConsumer())
_, err = pushQ1.AddConsumer("pushQ1", NewConsumer())
_, err = pushQ2.AddConsumer("pushQ2", NewConsumer())

If you have set up your queues like this, you can now call delivery.Push() in your Consume() function to push the delivery from the consuming queue to the associated push queue. So if consumption fails on incomingQ, then the delivery would be moved to pushQ1 and so on. If you have the consumers wait until the deliveries have a certain age you can use this pattern to retry after certain durations.

Note that delivery.Push() has the same effect as delivery.Reject() if the queue has no push queue set up. So in our example above, if the delivery fails in the consumer on pushQ2, then the Push() call will reject the delivery.
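The path of a failing delivery through such a chain can be traced with a toy in-memory model. This is only an illustration of the documented Push() behaviour, not how rmq stores anything; the queue type and its methods are invented for the example.

```go
package main

import "fmt"

// queue is a toy in-memory model, not rmq.Queue.
type queue struct {
	name      string
	ready     []string
	rejected  []string
	pushQueue *queue
}

// push models delivery.Push() for a delivery consumed from q: move it to
// the push queue if one is set, otherwise treat it like a reject.
func (q *queue) push(payload string) {
	if q.pushQueue == nil {
		q.rejected = append(q.rejected, payload)
		return
	}
	q.pushQueue.ready = append(q.pushQueue.ready, payload)
}

// drainFailing models a consumer that fails on every ready delivery
// and therefore pushes each of them.
func (q *queue) drainFailing() {
	for _, payload := range q.ready {
		q.push(payload)
	}
	q.ready = nil
}

// newChain builds incomingQ -> pushQ1 -> pushQ2.
func newChain() (incomingQ, pushQ1, pushQ2 *queue) {
	incomingQ = &queue{name: "incomingQ"}
	pushQ1 = &queue{name: "pushQ1"}
	pushQ2 = &queue{name: "pushQ2"}
	incomingQ.pushQueue = pushQ1
	pushQ1.pushQueue = pushQ2
	return incomingQ, pushQ1, pushQ2
}

func main() {
	incomingQ, pushQ1, pushQ2 := newChain()
	incomingQ.ready = append(incomingQ.ready, "task")
	for _, q := range []*queue{incomingQ, pushQ1, pushQ2} {
		q.drainFailing()
		fmt.Printf("after %s: pushQ1 ready=%d, pushQ2 ready=%d, pushQ2 rejected=%d\n",
			q.name, len(pushQ1.ready), len(pushQ2.ready), len(pushQ2.rejected))
	}
}
```

After the third failure the task ends up in the rejected list of pushQ2, matching the note above about queues without a push queue.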

Stop Consuming

If you want to stop consuming from the queue, you can call StopConsuming():

finishedChan := taskQueue.StopConsuming()

When StopConsuming() is called, it will immediately stop fetching more deliveries from Redis and won't send any more of the already prefetched deliveries to consumers.

In the background it will make pending Ack() calls return rmq.ErrorConsumingStopped if they still run into Redis errors (see above) and wait for all consumers to finish consuming their current delivery before closing the returned finishedChan. So while StopConsuming() returns immediately, you can wait on the returned channel until all consumers are done:

<-finishedChan

You can also stop consuming on all queues in your connection:

finishedChan := connection.StopAllConsuming()

Wait on the finishedChan to wait for all consumers on all queues to finish.

This is useful to implement a graceful shutdown of a consumer service. Please note that after calling StopConsuming() the queue might not be in a state where you can add consumers and call StartConsuming() again. If you have a use case where you actually need that sort of flexibility, please let us know. Currently for each queue you are only supposed to call StartConsuming() and StopConsuming() at most once.
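For a graceful shutdown it can be handy to bound how long the service waits on the finished channel. The helper below is a sketch with assumed names (waitFinished, shortTimeout); it only relies on the fact that StopConsuming() and StopAllConsuming() return a <-chan struct{} that gets closed once all consumers are done.

```go
package main

import (
	"fmt"
	"time"
)

const shortTimeout = 50 * time.Millisecond // demo value, not a recommendation

// waitFinished blocks until finishedChan is closed or the timeout elapses.
// It reports whether the consumers finished in time.
func waitFinished(finishedChan <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-finishedChan:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	// Stand-in for the channel returned by connection.StopAllConsuming().
	finished := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // pretend the consumers wrap up
		close(finished)
	}()
	fmt.Println(waitFinished(finished, time.Second)) // prints true
}
```

If the helper returns false, some consumers are still busy, for example blocked in a retrying Ack() call, and the service has to decide whether to keep waiting or exit anyway.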

Return Rejected Deliveries

Even if you don't have a push queue set up there are cases where you need to consume previously failed deliveries again. For example an external dependency might have an issue or you might have deployed a broken consumer service which rejects all deliveries for some reason.

In those cases you would wait for the external party to recover, or fix your mistake, to get ready to reprocess the deliveries. Now you can return the deliveries by opening the affected queue and calling ReturnRejected():

returned, err := queue.ReturnRejected(10000)

In this case we ask rmq to return up to 10k deliveries from the rejected list to the ready list. To return all of them you can pass math.MaxInt64.

If there was no error it returns the number of deliveries that were moved.

If you find yourself doing this regularly on some queues consider setting up a push queue to automatically retry failed deliveries regularly.

See example/returner.

Purge Rejected Deliveries

You might run into the case where you have rejected deliveries which you don't intend to retry again for one reason or another. In those cases you can clear the full rejected list by calling PurgeRejected():

count, err := queue.PurgeRejected()

It returns the number of purged deliveries.

Similarly, there's a function to clear the ready list of deliveries:

count, err := queue.PurgeReady()

See example/purger.

Cleaner

You should regularly run a queue cleaner to make sure no unacked deliveries are stuck in the queue system. The background is that a consumer service prefetches deliveries by moving them from the ready list to an unacked list associated with the queue connection. If the consumer dies by crashing or even by being gracefully shut down by calling StopConsuming(), the unacked deliveries will remain in that Redis list.

If you run a queue cleaner regularly it will detect queue connections whose heartbeat expired and will clean up all their consumer queues by moving their unacked deliveries back to the ready list.

Although it should be safe to run multiple cleaners, it's recommended to run exactly one instance per queue system and have it trigger the cleaning process regularly, like once a minute.

See example/cleaner.
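A periodic cleaning loop could be structured like the sketch below. The cleaner interface has the same Clean() signature as *rmq.Cleaner, so a value from rmq.NewCleaner(connection) could be passed in; runCleaner, fakeCleaner and demoTotal are names made up for this example, and the one millisecond interval exists only to keep the demo fast (the text above suggests something like once a minute).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cleaner is the one method of *rmq.Cleaner this sketch needs.
type cleaner interface {
	Clean() (returned int64, err error)
}

// runCleaner calls Clean once per interval until ctx is cancelled and
// returns the total number of returned deliveries. Errors are handed to
// onError and do not stop the loop.
func runCleaner(ctx context.Context, c cleaner, interval time.Duration, onError func(error)) int64 {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var total int64
	for {
		if ctx.Err() != nil {
			return total
		}
		select {
		case <-ctx.Done():
			return total
		case <-ticker.C:
			returned, err := c.Clean()
			if err != nil {
				onError(err)
				continue
			}
			total += returned
		}
	}
}

// fakeCleaner pretends to return two deliveries per run and cancels the
// context once it has run limit times.
type fakeCleaner struct {
	runs, limit int
	cancel      context.CancelFunc
}

func (f *fakeCleaner) Clean() (int64, error) {
	f.runs++
	if f.runs >= f.limit {
		f.cancel()
	}
	return 2, nil
}

// demoTotal runs the loop against the fake cleaner.
func demoTotal(limit int) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := &fakeCleaner{limit: limit, cancel: cancel}
	return runCleaner(ctx, c, time.Millisecond, func(err error) {
		fmt.Println("clean failed:", err)
	})
}

func main() {
	fmt.Println(demoTotal(3)) // prints 6
}
```

In a real service the context would usually be cancelled on shutdown, and the interval would be time.Minute or similar.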

Header

The Redis protocol does not define a specific way to pass additional data such as headers. However, there is often a need to pass them (for example for trace propagation).

This implementation injects optional header values, marked with a signature, into the payload body during publishing. When a message is consumed and the signature is present, the header and the original payload are extracted from the augmented payload.

The header is defined as http.Header for better interoperability with existing libraries, for example with propagation.HeaderCarrier.

 // ....
 
 h := make(http.Header)
 h.Set("X-Baz", "quux")

 // You can add header to your payload during publish.
 _ = pub.Publish(rmq.PayloadWithHeader(`{"foo":"bar"}`, h))

 // ....

 _, _ = con.AddConsumerFunc("tag", func(delivery rmq.Delivery) {
     // And receive header back in consumer.
     delivery.(rmq.WithHeader).Header().Get("X-Baz") // "quux"
     
     // ....
 })

Adding a header is an explicit opt-in operation, so it does not affect the library's backwards compatibility by default (when not used).

Please note that adding a header may lead to compatibility issues if:

  • the consumer is built with an older version of rmq while the publisher has already started using headers; this can be avoided by upgrading consumers before publishers;
  • the consumer is not using rmq (other libraries, low-level tools like redis-cli) and is not aware of the payload format extension.

Testing Included

To simplify testing of queue producers and consumers we include test mocks.

Test Connection

As before, we first need a queue connection, but this time we use an rmq.TestConnection that doesn't need any connection settings.

testConn := rmq.NewTestConnection()

If you are using a testing framework that uses test suites, you can reuse that test connection by setting it up once for the suite and resetting it with testConn.Reset() before each test.

Producer Tests

Now let's say we want to test the function publishTask() that creates a task and publishes it to a queue from that connection.

// call the function that should publish a task
publishTask(testConn)

// check that the task is published
assert.Equal(t, "task payload", suite.testConn.GetDelivery("tasks", 0))

The assert.Equal part is from testify, but it will look similar for other testing frameworks. Given an rmq.TestConnection, we can check the deliveries that were published to its queues (since the last Reset() call) with GetDelivery(queueName, index). In this case we want to extract the first (and possibly only) delivery that was published to queue tasks and just check the payload string.

If the payload is JSON again, the unmarshalling and check might look like this:

var task Task
err := json.Unmarshal([]byte(suite.testConn.GetDelivery("tasks", 0)), &task)
assert.NoError(t, err)
assert.NotNil(t, task)
assert.Equal(t, "value", task.Property)

If you expect a producer to create multiple deliveries you can use different indexes to access them all.

assert.Equal(t, "task1", suite.testConn.GetDelivery("tasks", 0))
assert.Equal(t, "task2", suite.testConn.GetDelivery("tasks", 1))

For convenience there's also a function GetDeliveries that returns all deliveries published to a queue as a string slice.

assert.Equal(t, []string{"task1", "task2"}, suite.testConn.GetDeliveries("tasks"))

These examples assume that you inject the rmq.Connection into your testable functions. If you inject instances of rmq.Queue instead, you can use rmq.TestQueue instances in tests and access their LastDeliveries (since Reset()) directly.

Consumer Tests

Testing consumers is a bit easier because consumers must implement the rmq.Consumer interface. In the tests just create an rmq.TestDelivery and pass it to your Consume() function. This example creates a test delivery from a string and then checks that the delivery was acked.

consumer := &TaskConsumer{}
delivery := rmq.NewTestDeliveryString("task payload")

consumer.Consume(delivery)

assert.Equal(t, rmq.Acked, delivery.State)

The State field will always be one of these values:

  • rmq.Acked: The delivery was acked
  • rmq.Rejected: The delivery was rejected
  • rmq.Pushed: The delivery was pushed (see above)
  • rmq.Unacked: Nothing of the above

If your packages are JSON marshalled objects, then you can create test deliveries out of those like this:

task := Task{Property: "bad value"}
delivery := rmq.NewTestDelivery(task)

Integration Tests

If you want to write integration tests which exercise both producers and consumers at the same time, you can use the rmq.OpenConnectionWithTestRedisClient constructor. It returns a real rmq.Connection instance which is backed by an in-memory Redis client implementation. That way it behaves exactly as in production, just without the durability of a real Redis client. Don't use this in production!

Statistics

Given a connection, you can call connection.CollectStats() to receive rmq.Stats about all open queues, connections and consumers. If you run example/handler you can see what's available:

In this example you see 5 connections consuming task_kind1, each with 5 consumers. They have a total of 1007 packages unacked. Below the marker you see connections which are not consuming. One of the handler connections died because I stopped the handler. Running the cleaner would clean that up (see above).

Prometheus

If you are using Prometheus, rmqprom collects statistics about all open queues and exposes them as Prometheus metrics.

Documentation

Constants

const (
	HeartbeatErrorLimit = 45 // stop consuming after this many heartbeat errors
)

Variables

var (
	ErrorNotFound         = errors.New("entity not found") // entity being connection/queue/delivery
	ErrorAlreadyConsuming = errors.New("must not call StartConsuming() multiple times")
	ErrorNotConsuming     = errors.New("must call StartConsuming() before adding consumers")
	ErrorConsumingStopped = errors.New("consuming stopped")
)

Functions

func ActiveSign

func ActiveSign(active bool) string

func ExtractHeaderAndPayload added in v5.1.0

func ExtractHeaderAndPayload(payload string) (http.Header, string, error)

ExtractHeaderAndPayload splits augmented payload into header and original payload if specific signature is present.

func PayloadBytesWithHeader added in v5.1.0

func PayloadBytesWithHeader(payload []byte, header http.Header) []byte

PayloadBytesWithHeader creates payload bytes slice with header.

func PayloadWithHeader added in v5.1.0

func PayloadWithHeader(payload string, header http.Header) string

PayloadWithHeader creates a payload string with header.

Example
var (
	pub, con rmq.Queue
)

// ....

h := make(http.Header)
h.Set("X-Baz", "quux")

// You can add header to your payload during publish.
_ = pub.Publish(rmq.PayloadWithHeader(`{"foo":"bar"}`, h))

// ....

_, _ = con.AddConsumerFunc("tag", func(delivery rmq.Delivery) {
	// And receive header back in consumer.
	delivery.(rmq.WithHeader).Header().Get("X-Baz") // "quux"

	// ....
})

Types

type BatchConsumer

type BatchConsumer interface {
	Consume(batch Deliveries)
}

type BatchConsumerFunc added in v5.1.2

type BatchConsumerFunc func(Deliveries)

func (BatchConsumerFunc) Consume added in v5.1.2

func (batchConsumerFunc BatchConsumerFunc) Consume(batch Deliveries)

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

func NewCleaner

func NewCleaner(connection Connection) *Cleaner

func (*Cleaner) Clean

func (cleaner *Cleaner) Clean() (returned int64, err error)

Clean cleans the connection of the cleaner. This is useful to make sure no deliveries get lost. The main use case is that when your consumers get restarted there will be unacked deliveries assigned to the connection. Once the heartbeat of that connection dies the cleaner can recognize that and move those unacked deliveries back to the ready list. If there was no error it returns the number of deliveries which have been returned from unacked lists to ready lists across all cleaned connections and queues.

type Connection

type Connection interface {
	OpenQueue(name string) (Queue, error)
	CollectStats(queueList []string) (Stats, error)
	GetOpenQueues() ([]string, error)
	StopAllConsuming() <-chan struct{}
	// contains filtered or unexported methods
}

Connection is an interface that can be used to test publishing

func OpenClusterConnection added in v5.2.0

func OpenClusterConnection(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error)

OpenClusterConnection: Same as OpenConnectionWithRedisClient, but using Redis hash tags {} instead of [].

func OpenConnection

func OpenConnection(tag string, network string, address string, db int, errChan chan<- error) (Connection, error)

OpenConnection opens and returns a new connection

func OpenConnectionWithRedisClient

func OpenConnectionWithRedisClient(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error)

OpenConnectionWithRedisClient opens and returns a new connection. This can be used to pass a redis.ClusterClient.

func OpenConnectionWithRedisOptions added in v5.2.0

func OpenConnectionWithRedisOptions(tag string, redisOption *redis.Options, errChan chan<- error) (Connection, error)

OpenConnectionWithRedisOptions allows you to pass more flexible options

func OpenConnectionWithRmqRedisClient

func OpenConnectionWithRmqRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (Connection, error)

OpenConnectionWithRmqRedisClient: If you would like to use a Redis client other than the ones supported in the constructors above, you can implement the RedisClient interface yourself.

func OpenConnectionWithTestRedisClient

func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (Connection, error)

OpenConnectionWithTestRedisClient opens and returns a new connection which uses a test redis client internally. This is useful in integration tests.

type ConnectionStat

type ConnectionStat struct {
	// contains filtered or unexported fields
}

func (ConnectionStat) String

func (stat ConnectionStat) String() string

type ConnectionStats

type ConnectionStats map[string]ConnectionStat

type ConsumeError

type ConsumeError struct {
	RedisErr error
	Count    int // number of consecutive errors
}

func (*ConsumeError) Error

func (e *ConsumeError) Error() string

func (*ConsumeError) Unwrap

func (e *ConsumeError) Unwrap() error

type Consumer

type Consumer interface {
	Consume(delivery Delivery)
}

type ConsumerFunc

type ConsumerFunc func(Delivery)

func (ConsumerFunc) Consume

func (consumerFunc ConsumerFunc) Consume(delivery Delivery)

type Deliveries

type Deliveries []Delivery

func (Deliveries) Ack

func (deliveries Deliveries) Ack() (errMap map[int]error)

func (Deliveries) Payloads

func (deliveries Deliveries) Payloads() []string

func (Deliveries) Push

func (deliveries Deliveries) Push() (errMap map[int]error)

func (Deliveries) Reject

func (deliveries Deliveries) Reject() (errMap map[int]error)

type Delivery

type Delivery interface {
	Payload() string

	Ack() error
	Reject() error
	Push() error
}

type DeliveryError

type DeliveryError struct {
	Delivery Delivery
	RedisErr error
	Count    int // number of consecutive errors
}

func (*DeliveryError) Error

func (e *DeliveryError) Error() string

func (*DeliveryError) Unwrap

func (e *DeliveryError) Unwrap() error

type HeartbeatError

type HeartbeatError struct {
	RedisErr error
	Count    int // number of consecutive errors
}

func (*HeartbeatError) Error

func (e *HeartbeatError) Error() string

func (*HeartbeatError) Unwrap

func (e *HeartbeatError) Unwrap() error

type Queue

type Queue interface {
	Publish(payload ...string) error
	PublishBytes(payload ...[]byte) error
	SetPushQueue(pushQueue Queue)
	StartConsuming(prefetchLimit int64, pollDuration time.Duration) error
	StopConsuming() <-chan struct{}
	AddConsumer(tag string, consumer Consumer) (string, error)
	AddConsumerFunc(tag string, consumerFunc ConsumerFunc) (string, error)
	AddBatchConsumer(tag string, batchSize int64, timeout time.Duration, consumer BatchConsumer) (string, error)
	AddBatchConsumerFunc(tag string, batchSize int64, timeout time.Duration, batchConsumerFunc BatchConsumerFunc) (string, error)
	PurgeReady() (int64, error)
	PurgeRejected() (int64, error)
	ReturnUnacked(max int64) (int64, error)
	ReturnRejected(max int64) (int64, error)
	Destroy() (readyCount, rejectedCount int64, err error)
	Drain(count int64) ([]string, error)
	// contains filtered or unexported methods
}

type QueueStat

type QueueStat struct {
	ReadyCount    int64 `json:"ready"`
	RejectedCount int64 `json:"rejected"`
	// contains filtered or unexported fields
}

func NewQueueStat

func NewQueueStat(readyCount, rejectedCount int64) QueueStat

func (QueueStat) ConnectionCount

func (stat QueueStat) ConnectionCount() int64

func (QueueStat) ConsumerCount

func (stat QueueStat) ConsumerCount() int64

func (QueueStat) String

func (stat QueueStat) String() string

func (QueueStat) UnackedCount

func (stat QueueStat) UnackedCount() int64

type QueueStats

type QueueStats map[string]QueueStat

type RedisClient

type RedisClient interface {
	// simple keys
	Set(key string, value string, expiration time.Duration) error
	Del(key string) (affected int64, err error)
	TTL(key string) (ttl time.Duration, err error)

	// lists
	LPush(key string, value ...string) (total int64, err error)
	LLen(key string) (affected int64, err error)
	LRem(key string, count int64, value string) (affected int64, err error)
	LTrim(key string, start, stop int64) error
	RPopLPush(source, destination string) (value string, err error)
	RPop(key string) (value string, err error)

	// sets
	SAdd(key, value string) (total int64, err error)
	SMembers(key string) (members []string, err error)
	SRem(key, value string) (affected int64, err error)

	// special
	FlushDb() error
}

type RedisWrapper

type RedisWrapper struct {
	// contains filtered or unexported fields
}

func (RedisWrapper) Del

func (wrapper RedisWrapper) Del(key string) (affected int64, err error)

func (RedisWrapper) FlushDb

func (wrapper RedisWrapper) FlushDb() error

func (RedisWrapper) LLen

func (wrapper RedisWrapper) LLen(key string) (affected int64, err error)

func (RedisWrapper) LPush

func (wrapper RedisWrapper) LPush(key string, value ...string) (total int64, err error)

func (RedisWrapper) LRem

func (wrapper RedisWrapper) LRem(key string, count int64, value string) (affected int64, err error)

func (RedisWrapper) LTrim

func (wrapper RedisWrapper) LTrim(key string, start, stop int64) error

func (RedisWrapper) RPop

func (wrapper RedisWrapper) RPop(key string) (value string, err error)

func (RedisWrapper) RPopLPush

func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, err error)

func (RedisWrapper) SAdd

func (wrapper RedisWrapper) SAdd(key, value string) (total int64, err error)

func (RedisWrapper) SMembers

func (wrapper RedisWrapper) SMembers(key string) (members []string, err error)

func (RedisWrapper) SRem

func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error)

func (RedisWrapper) Set

func (wrapper RedisWrapper) Set(key string, value string, expiration time.Duration) error

func (RedisWrapper) TTL

func (wrapper RedisWrapper) TTL(key string) (ttl time.Duration, err error)

type State

type State int

const (
	Unacked State = iota
	Acked
	Rejected
	Pushed
)

func (State) String

func (i State) String() string

type Stats

type Stats struct {
	QueueStats QueueStats `json:"queues"`
	// contains filtered or unexported fields
}

func CollectStats

func CollectStats(queueList []string, mainConnection Connection) (Stats, error)

func NewStats

func NewStats() Stats

func (Stats) GetHtml

func (stats Stats) GetHtml(layout, refresh string) string

func (Stats) String

func (stats Stats) String() string

type TestBatchConsumer

type TestBatchConsumer struct {

	// Deprecated: use Last() to avoid data races.
	LastBatch Deliveries
	// Deprecated: use Consumed() to avoid data races.
	ConsumedCount int64
	AutoFinish    bool
	// contains filtered or unexported fields
}

func NewTestBatchConsumer

func NewTestBatchConsumer() *TestBatchConsumer

func (*TestBatchConsumer) Consume

func (consumer *TestBatchConsumer) Consume(batch Deliveries)

func (*TestBatchConsumer) Consumed added in v5.1.0

func (consumer *TestBatchConsumer) Consumed() int64

func (*TestBatchConsumer) Finish

func (consumer *TestBatchConsumer) Finish()

func (*TestBatchConsumer) Last added in v5.1.0

func (consumer *TestBatchConsumer) Last() Deliveries

type TestConnection

type TestConnection struct {
	// contains filtered or unexported fields
}

func NewTestConnection

func NewTestConnection() TestConnection

func (TestConnection) CollectStats

func (TestConnection) CollectStats([]string) (Stats, error)

func (TestConnection) GetDeliveries

func (connection TestConnection) GetDeliveries(queueName string) []string

func (TestConnection) GetDelivery

func (connection TestConnection) GetDelivery(queueName string, index int) string

func (TestConnection) GetOpenQueues

func (TestConnection) GetOpenQueues() ([]string, error)

func (TestConnection) OpenQueue

func (connection TestConnection) OpenQueue(name string) (Queue, error)

func (TestConnection) Reset

func (connection TestConnection) Reset()

func (TestConnection) StopAllConsuming

func (TestConnection) StopAllConsuming() <-chan struct{}

type TestConsumer

type TestConsumer struct {
	AutoAck       bool
	AutoFinish    bool
	SleepDuration time.Duration

	// Deprecated: use Last() to avoid data races.
	LastDelivery Delivery
	// Deprecated: use Deliveries() to avoid data races.
	LastDeliveries []Delivery
	// contains filtered or unexported fields
}

func NewTestConsumer

func NewTestConsumer(name string) *TestConsumer

func (*TestConsumer) Consume

func (consumer *TestConsumer) Consume(delivery Delivery)

func (*TestConsumer) Deliveries added in v5.1.0

func (consumer *TestConsumer) Deliveries() []Delivery

func (*TestConsumer) Finish

func (consumer *TestConsumer) Finish()

func (*TestConsumer) FinishAll

func (consumer *TestConsumer) FinishAll()

func (*TestConsumer) Last added in v5.1.0

func (consumer *TestConsumer) Last() Delivery

func (*TestConsumer) String

func (consumer *TestConsumer) String() string

type TestDelivery

type TestDelivery struct {
	State State
	// contains filtered or unexported fields
}

func NewTestDelivery

func NewTestDelivery(content interface{}) *TestDelivery

func NewTestDeliveryString

func NewTestDeliveryString(payload string) *TestDelivery

func (*TestDelivery) Ack

func (delivery *TestDelivery) Ack() error

func (*TestDelivery) Payload

func (delivery *TestDelivery) Payload() string

func (*TestDelivery) Push

func (delivery *TestDelivery) Push() error

func (*TestDelivery) Reject

func (delivery *TestDelivery) Reject() error

type TestQueue

type TestQueue struct {
	LastDeliveries []string
	// contains filtered or unexported fields
}

func NewTestQueue

func NewTestQueue(name string) *TestQueue

func (*TestQueue) AddBatchConsumer

func (*TestQueue) AddBatchConsumer(string, int64, time.Duration, BatchConsumer) (string, error)

func (*TestQueue) AddBatchConsumerFunc added in v5.1.2

func (*TestQueue) AddBatchConsumerFunc(string, int64, time.Duration, BatchConsumerFunc) (string, error)

func (*TestQueue) AddConsumer

func (*TestQueue) AddConsumer(string, Consumer) (string, error)

func (*TestQueue) AddConsumerFunc

func (*TestQueue) AddConsumerFunc(string, ConsumerFunc) (string, error)

func (*TestQueue) Destroy

func (*TestQueue) Destroy() (int64, int64, error)

func (*TestQueue) Drain

func (*TestQueue) Drain(count int64) ([]string, error)

func (*TestQueue) Publish

func (queue *TestQueue) Publish(payload ...string) error

func (*TestQueue) PublishBytes

func (queue *TestQueue) PublishBytes(payload ...[]byte) error

func (*TestQueue) PurgeReady

func (*TestQueue) PurgeReady() (int64, error)

func (*TestQueue) PurgeRejected

func (*TestQueue) PurgeRejected() (int64, error)

func (*TestQueue) Reset

func (queue *TestQueue) Reset()

func (*TestQueue) ReturnRejected

func (*TestQueue) ReturnRejected(int64) (int64, error)

func (*TestQueue) ReturnUnacked

func (*TestQueue) ReturnUnacked(int64) (int64, error)

func (*TestQueue) SetPushQueue

func (*TestQueue) SetPushQueue(Queue)

func (*TestQueue) StartConsuming

func (*TestQueue) StartConsuming(int64, time.Duration) error

func (*TestQueue) StopConsuming

func (*TestQueue) StopConsuming() <-chan struct{}

func (*TestQueue) String

func (queue *TestQueue) String() string

type TestRedisClient

type TestRedisClient struct {
	// contains filtered or unexported fields
}

TestRedisClient is a mock for Redis.

func NewTestRedisClient

func NewTestRedisClient() *TestRedisClient

func (*TestRedisClient) Del

func (client *TestRedisClient) Del(key string) (affected int64, err error)

Del removes the specified key. A key is ignored if it does not exist.

func (*TestRedisClient) FlushDb

func (client *TestRedisClient) FlushDb() error

FlushDb deletes all the keys of the currently selected DB. This command never fails.

func (*TestRedisClient) Get

func (client *TestRedisClient) Get(key string) (string, error)

Get returns the value of key. If the key does not exist or isn't a string, the special value nil is returned.

func (*TestRedisClient) LLen

func (client *TestRedisClient) LLen(key string) (affected int64, err error)

LLen returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned. An error is returned when the value stored at key is not a list.

func (*TestRedisClient) LPush

func (client *TestRedisClient) LPush(key string, values ...string) (total int64, err error)

LPush inserts the specified values at the head of the list stored at key. If key does not exist, it is created as an empty list before performing the push operations. When key holds a value that is not a list, an error is returned. It is possible to push multiple elements in a single call by specifying multiple arguments at the end of the command. Elements are inserted one after the other at the head of the list, from the leftmost element to the rightmost element.

func (*TestRedisClient) LRem

func (client *TestRedisClient) LRem(key string, count int64, value string) (affected int64, err error)

LRem removes the first count occurrences of elements equal to value from the list stored at key. The count argument influences the operation in the following ways:

count > 0: Remove elements equal to value moving from head to tail.
count < 0: Remove elements equal to value moving from tail to head.
count = 0: Remove all elements equal to value.

For example, LREM list -2 "hello" will remove the last two occurrences of "hello" in the list stored at list. Note that non-existing keys are treated like empty lists, so when key does not exist, the command will always return 0.

func (*TestRedisClient) LTrim

func (client *TestRedisClient) LTrim(key string, start, stop int64) error

LTrim trims an existing list so that it contains only the specified range of elements. Both start and stop are zero-based indexes, where 0 is the first element of the list (the head), 1 the next element and so on. For example: LTRIM foobar 0 2 will modify the list stored at foobar so that only the first three elements of the list will remain. start and stop can also be negative numbers indicating offsets from the end of the list, where -1 is the last element of the list, -2 the penultimate element and so on. Out of range indexes will not produce an error: if start is larger than the end of the list, or start > stop, the result will be an empty list (which causes key to be removed). If stop is larger than the end of the list, Redis will treat it like the last element of the list.

func (*TestRedisClient) RPop

func (client *TestRedisClient) RPop(key string) (value string, err error)

RPop removes and returns one value from the tail of the list stored at key. When key holds a value that is not a list, an error is returned.

func (*TestRedisClient) RPopLPush

func (client *TestRedisClient) RPopLPush(source, destination string) (value string, err error)

RPopLPush atomically returns and removes the last element (tail) of the list stored at source, and pushes it as the first element (head) of the list stored at destination. For example: consider source holding the list a,b,c, and destination holding the list x,y,z. Executing RPOPLPUSH results in source holding a,b and destination holding c,x,y,z. If source does not exist, the value nil is returned and no operation is performed. If source and destination are the same, the operation is equivalent to removing the last element from the list and pushing it as the first element of the list, so it can be considered a list rotation command.

func (*TestRedisClient) SAdd

func (client *TestRedisClient) SAdd(key, value string) (total int64, err error)

SAdd adds the specified members to the set stored at key. Specified members that are already a member of this set are ignored. If key does not exist, a new set is created before adding the specified members. An error is returned when the value stored at key is not a set.

func (*TestRedisClient) SMembers

func (client *TestRedisClient) SMembers(key string) (members []string, err error)

SMembers returns all the members of the set value stored at key. This has the same effect as running SINTER with one argument key.

func (*TestRedisClient) SRem

func (client *TestRedisClient) SRem(key, value string) (affected int64, err error)

SRem removes the specified members from the set stored at key. Specified members that are not a member of this set are ignored. If key does not exist, it is treated as an empty set and this command returns 0. An error is returned when the value stored at key is not a set.

func (*TestRedisClient) Set

func (client *TestRedisClient) Set(key string, value string, expiration time.Duration) error

Set sets key to hold the string value. If key already holds a value, it is overwritten, regardless of its type. Any previous time to live associated with the key is discarded on successful SET operation.

func (*TestRedisClient) TTL

func (client *TestRedisClient) TTL(key string) (ttl time.Duration, err error)

TTL returns the remaining time to live of a key that has a timeout. This introspection capability allows a Redis client to check how many seconds a given key will continue to be part of the dataset. In Redis 2.6 or older the command returns -1 if the key does not exist or if the key exists but has no associated expire. Starting with Redis 2.8 the return value in case of error changed: The command returns -2 if the key does not exist. The command returns -1 if the key exists but has no associated expire.

type WithHeader added in v5.1.0

type WithHeader interface {
	Header() http.Header
}

WithHeader is a Delivery with Header.

Directories

Path Synopsis
example
