package module
v1.0.1 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2021 License: MIT Imports: 10 Imported by: 1


Hard copy of github.com/adjust/rmq, with delay functionality added

IMPORTANT: This package is not for import to your package. You should import github.com/best-expendables/redis-queue

Basic Usage

Lets take a look at how to use rmq.


Of course you need to import rmq wherever you want to use it.

import "github.com/best-expendables/rmq"


Before we get to queues, we first need to establish a connection. Each rmq connection has a name (used in statistics) and Redis connection details including which Redis database to use. The most basic Redis connection uses a TCP connection to a given host and a port.

connection := rmq.OpenConnection("my service", "tcp", "localhost:6379", 1)

But it's also possible to access a Redis listening on a Unix socket.

connection := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1)

Note: rmq panics on Redis connection errors. Your producers and consumers will crash if Redis goes down. Please let us know if you would see this handled differently.


Once we have a connection we can use it to finally access queues. Each queue must have a unique name by which we address it. Queues are created once they are accessed. There is no need to declare them in advance. Here we open a queue named "tasks":

taskQueue := connection.OpenQueue("tasks")


An empty queue is boring, lets add some deliveries! Internally all deliveries are saved to Redis as strings. This is how you can publish a string payload to a queue:

delivery := "task payload"

In practice, however, it's more common to have instances of some struct that we want to publish to a queue. Assuming task is of some type like Kind, this is how to publish the JSON representation of that task:

// create task
taskBytes, err := json.Marshal(task)
if err != nil {
    // handle error


For a full example see example/producer.go


Now that our queue starts filling, lets add a consumer. After opening the queue as before, we need it to start consuming before we can add consumers.

taskQueue.StartConsuming(10, time.Second)

This sets the prefetch limit to 10 and the poll duration to one second. This means the queue will fetch up to 10 deliveries at a time before giving them to the consumers. To avoid idling producers in times of full queues, the prefetch limit should always be greater than the number of consumers you are going to add. If the queue gets empty, the poll duration sets how long to wait before checking for new deliveries in Redis.

Once this is set up, we can actually add consumers to the consuming queue.

taskConsumer := &TaskConsumer{}
taskQueue.AddConsumer("task consumer", taskConsumer)

For our example this assumes that you have a struct TaskConsumer that implements the rmq.Consumer interface like this:

func (consumer *TaskConsumer) Consume(delivery rmq.Delivery) {
    var task Task
    if err = json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
        // handle error

    // perform task
    log.Printf("performing task %s", task)

First we unmarshal the JSON package found in the delivery payload. If this fails we reject the delivery, otherwise we perform the task and ack the delivery.

For a full example see example/consumer.go

Testing Included

To simplify testing of queue producers and consumers we include test mocks.

Test Connection

As before, we first need a queue connection, but this time we use a rmq.TestConnection that doesn't need any connection settings.

testConn := rmq.NewTestConnection()

If you are using a testing framework that uses test suites, you can reuse that test connection by setting it up once for the suite and resetting it with testConn.Reset before each test.

Producer Test

Now lets say we want to test the function publishTask that creates a task and publishes it to a queue from that connection.

// call the function that should publish a task

// check that the task is published
c.Check(suite.testConn.GetDelivery("tasks", 0), Equals, "task payload")

The c.Check part is from gocheck, but it will look similar for other testing frameworks. Given a rmq.TestConnection, we can check the deliveries that were published to it's queues (since the last Reset() call) with GetDelivery(queue, index). In this case we want to extract the first (and possibly only) delivery that was published to queue tasks and just check the payload string.

If the payload is JSON again, the unmarshalling and check might look like this:

var task Task
err := json.Unmarshal([]byte(suite.testConn.GetDelivery("tasks", 0)), &task)
c.Assert(err, IsNil)
c.Assert(task, NotNil)
c.Check(task.Property, Equals, "value")

If you expect a producer to create multiple deliveries you can use different indexes to access them all.

c.Check(suite.testConn.GetDelivery("tasks", 0), Equals, "task1")
c.Check(suite.testConn.GetDelivery("tasks", 1), Equals, "task2")

For convenience there's also a function GetDeliveries that returns all published deliveries to a queue as string array.

c.Check(suite.testConn.GetDeliveries("tasks"), DeepEquals, []string{"task1", "task2"})

If your producer doesn't have guarantees about the order of its deliveries, you could implement a selector function like findByPrefix and then check each delivery regardless of their index.

tasks := suite.testConn.GetDeliveries("tasks")
c.Assert(tasks, HasLen, 2)
xTask := findByPrefix(tasks, "x")
yTask := findByPrefix(tasks, "y")
c.Check(xTask.Id, Equals, "3")
c.Check(yTask.Id, Equals, "4")

These examples assumed that you inject the rmq.Connection into your testable functions. If you inject instances of rmq.Queue instead, you can use rmq.TestQueue instances in tests and access their LastDeliveries (since Reset()) directly.

Consumer Test

Testing consumers is a bit easier because consumers must implement the rmq.Consumer interface. In the tests just create rmq.TestDelivery and pass it to your Consume implementation. This example creates a test delivery from a string and then checks that the delivery was acked.

consumer := &TaskConsumer{}
delivery := rmq.NewTestDeliveryString("task payload")


c.Check(delivery.State, Equals, rmq.Acked)

The State field will always be one of these values:

  • rmq.Acked: The delivery was acked
  • rmq.Rejected: The delivery was rejected
  • rmq.Pushed: The delivery was pushed (see below)
  • rmq.Unacked: Nothing of the above

If your packages are JSON marshalled objects, then you can create test deliveries out of those like this:

task := Task{Property: "bad value"}
delivery := rmq.NewTestDelivery(task)


Given a connection, you can call connection.CollectStats to receive rmq.Stats about all open queues, connections and consumers. If you run example/handler.go you can see what's available:

In this example you see three connections consuming things, each wich 10 consumers each. Two of them have 8 packages unacked. Below the marker you see connections which are not consuming. One of the handler connections died because I stopped the handler. Running the cleaner would clean that up (see below).




This section is empty.


This section is empty.


func ActiveSign

func ActiveSign(active bool) string

func OpenConnection

func OpenConnection(tag, network, address string, db int) *redisConnection

OpenConnection opens and returns a new connection

func OpenConnectionWithRedisClient

func OpenConnectionWithRedisClient(tag string, redisClient *redis.Client) *redisConnection

OpenConnectionWithRedisClient opens and returns a new connection


type BatchConsumer

type BatchConsumer interface {
	Consume(batch Deliveries)

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields

func NewCleaner

func NewCleaner(connection *redisConnection) *Cleaner

func (*Cleaner) Clean

func (cleaner *Cleaner) Clean() error

func (*Cleaner) CleanConnection

func (cleaner *Cleaner) CleanConnection(connection *redisConnection) error

func (*Cleaner) CleanQueue

func (cleaner *Cleaner) CleanQueue(queue *redisQueue)

type Connection

type Connection interface {
	OpenQueue(name string) Queue
	CollectStats(queueList []string) Stats
	GetOpenQueues() []string

Connection is an interface that can be used to test publishing

type ConnectionStat

type ConnectionStat struct {
	// contains filtered or unexported fields

func (ConnectionStat) String

func (stat ConnectionStat) String() string

type ConnectionStats

type ConnectionStats map[string]ConnectionStat

type Consumer

type Consumer interface {
	Consume(delivery Delivery)

type Deliveries

type Deliveries []Delivery

func (Deliveries) Ack

func (deliveries Deliveries) Ack() int

func (Deliveries) Reject

func (deliveries Deliveries) Reject() int

type Delivery

type Delivery interface {
	Payload() string
	Ack() bool
	Reject() bool
	Push() bool

type Queue

type Queue interface {
	Publish(payload string) bool
	PublishOnDelay(payload string, delayedAt time.Time) bool
	PublishBytes(payload []byte) bool
	PublishBytesOnDelay(payload []byte, delayedAt time.Time) bool
	PublishRejected(payload string) bool
	SetPushQueue(pushQueue Queue)
	StartConsuming(prefetchLimit int, pollDuration time.Duration) bool
	StopConsuming() bool
	AddConsumer(tag string, consumer Consumer) string
	AddBatchConsumer(tag string, batchSize int, consumer BatchConsumer) string
	AddBatchConsumerWithTimeout(tag string, batchSize int, timeout time.Duration, consumer BatchConsumer) string
	PurgeReady() int
	PurgeRejected() int
	ReturnRejected(count int) int
	ReturnAllRejected() int
	Close() bool

type QueueStat

type QueueStat struct {
	ReadyCount    int `json:"ready"`
	RejectedCount int `json:"rejected"`
	// contains filtered or unexported fields

func NewQueueStat

func NewQueueStat(readyCount, rejectedCount int) QueueStat

func (QueueStat) ConnectionCount

func (stat QueueStat) ConnectionCount() int

func (QueueStat) ConsumerCount

func (stat QueueStat) ConsumerCount() int

func (QueueStat) String

func (stat QueueStat) String() string

func (QueueStat) UnackedCount

func (stat QueueStat) UnackedCount() int

type QueueStats

type QueueStats map[string]QueueStat

type State

type State int
const (
	Unacked State = iota

func (State) String

func (i State) String() string

type Stats

type Stats struct {
	QueueStats QueueStats `json:"queues"`
	// contains filtered or unexported fields

func CollectStats

func CollectStats(queueList []string, mainConnection *redisConnection) Stats

func NewStats

func NewStats() Stats

func (Stats) GetHtml

func (stats Stats) GetHtml(layout, refresh string) string

func (Stats) String

func (stats Stats) String() string

type TestBatchConsumer

type TestBatchConsumer struct {
	LastBatch Deliveries
	// contains filtered or unexported fields

func NewTestBatchConsumer

func NewTestBatchConsumer() *TestBatchConsumer

func (*TestBatchConsumer) Consume

func (consumer *TestBatchConsumer) Consume(batch Deliveries)

func (*TestBatchConsumer) Finish

func (consumer *TestBatchConsumer) Finish()

type TestConnection

type TestConnection struct {
	// contains filtered or unexported fields

func NewTestConnection

func NewTestConnection() TestConnection

func (TestConnection) CollectStats

func (connection TestConnection) CollectStats(queueList []string) Stats

func (TestConnection) GetDeliveries

func (connection TestConnection) GetDeliveries(queueName string) []string

func (TestConnection) GetDelivery

func (connection TestConnection) GetDelivery(queueName string, index int) string

func (TestConnection) GetOpenQueues

func (connection TestConnection) GetOpenQueues() []string

func (TestConnection) OpenQueue

func (connection TestConnection) OpenQueue(name string) Queue

func (TestConnection) Reset

func (connection TestConnection) Reset()

type TestConsumer

type TestConsumer struct {
	AutoAck       bool
	AutoFinish    bool
	SleepDuration time.Duration

	LastDelivery   Delivery
	LastDeliveries []Delivery
	// contains filtered or unexported fields

func NewTestConsumer

func NewTestConsumer(name string) *TestConsumer

func (*TestConsumer) Consume

func (consumer *TestConsumer) Consume(delivery Delivery)

func (*TestConsumer) Finish

func (consumer *TestConsumer) Finish()

func (*TestConsumer) String

func (consumer *TestConsumer) String() string

type TestDelivery

type TestDelivery struct {
	State State
	// contains filtered or unexported fields

func NewTestDelivery

func NewTestDelivery(content interface{}) *TestDelivery

func NewTestDeliveryString

func NewTestDeliveryString(payload string) *TestDelivery

func (*TestDelivery) Ack

func (delivery *TestDelivery) Ack() bool

func (*TestDelivery) Delay

func (delivery *TestDelivery) Delay(delayedAt time.Time) bool

func (*TestDelivery) Payload

func (delivery *TestDelivery) Payload() string

func (*TestDelivery) Push

func (delivery *TestDelivery) Push() bool

func (*TestDelivery) Reject

func (delivery *TestDelivery) Reject() bool

type TestQueue

type TestQueue struct {
	LastDeliveries []string
	// contains filtered or unexported fields

func NewTestQueue

func NewTestQueue(name string) *TestQueue

func (*TestQueue) AddBatchConsumer

func (queue *TestQueue) AddBatchConsumer(tag string, batchSize int, consumer BatchConsumer) string

func (*TestQueue) AddBatchConsumerWithTimeout

func (queue *TestQueue) AddBatchConsumerWithTimeout(tag string, batchSize int, timeout time.Duration, consumer BatchConsumer) string

func (*TestQueue) AddConsumer

func (queue *TestQueue) AddConsumer(tag string, consumer Consumer) string

func (*TestQueue) Close

func (queue *TestQueue) Close() bool

func (*TestQueue) Publish

func (queue *TestQueue) Publish(payload string) bool

func (*TestQueue) PublishBytes

func (queue *TestQueue) PublishBytes(payload []byte) bool

func (*TestQueue) PublishBytesOnDelay

func (queue *TestQueue) PublishBytesOnDelay(payload []byte, delayedAt time.Time) bool

func (*TestQueue) PublishOnDelay

func (queue *TestQueue) PublishOnDelay(payload string, delayedAt time.Time) bool

func (*TestQueue) PublishRejected

func (queue *TestQueue) PublishRejected(payload string) bool

func (*TestQueue) PurgeReady

func (queue *TestQueue) PurgeReady() int

func (*TestQueue) PurgeRejected

func (queue *TestQueue) PurgeRejected() int

func (*TestQueue) Reset

func (queue *TestQueue) Reset()

func (*TestQueue) ReturnAllRejected

func (queue *TestQueue) ReturnAllRejected() int

func (*TestQueue) ReturnRejected

func (queue *TestQueue) ReturnRejected(count int) int

func (*TestQueue) SetPushQueue

func (queue *TestQueue) SetPushQueue(pushQueue Queue)

func (*TestQueue) StartConsuming

func (queue *TestQueue) StartConsuming(prefetchLimit int, pollDuration time.Duration) bool

func (*TestQueue) StopConsuming

func (queue *TestQueue) StopConsuming() bool

func (*TestQueue) String

func (queue *TestQueue) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL