rabbit

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2023 License: MIT Imports: 9 Imported by: 0

README

rabbit

Master build status Go Report Card

A RabbitMQ wrapper lib around streadway/amqp rabbitmq/amqp091-go with some bells and whistles.

NOTE: streadway/amqp is no longer maintained and RabbitMQ team have forked streadway/amqp and created rabbitmq/amqp091-go. You can read about this change here. This library uses rabbitmq/amqp091-go.

  • Support for auto-reconnect
  • Support for context (ie. cancel/timeout)
  • Support for using multiple binding keys
  • Support Producer, Consumer or both modes

Motivation

We (Batch), make heavy use of RabbitMQ - we use it as the primary method for facilitating inter-service communication. Due to this, all services make use of RabbitMQ and are both publishers and consumers.

We wrote this lib to ensure that all of our services make use of Rabbit in a consistent, predictable way AND are able to survive network blips.

NOTE: This library works only with non-default exchanges. If you need support for default exchange - open a PR!

Usage

package main

import (
    "fmt"
    "log"  

    "github.com/batchcorp/rabbit"
)

func main() { 
    r, err := rabbit.New(&rabbit.Options{
        URL:          "amqp://localhost",
        QueueName:    "my-queue",
        ExchangeName: "messages",
        BindingKeys:   []string{"messages"},
    })
    if err != nil {
        log.Fatalf("unable to instantiate rabbit: %s", err)
    }
    
    routingKey := "messages"
    data := []byte("pumpkins")

    // Publish something
    if err := r.Publish(context.Background(), routingKey, data); err != nil {
        log.Fatalf("unable to publish message: ")
    }

    // Consume once
    if err := r.ConsumeOnce(nil, func(amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)
    }); err != nil {
        log.Fatalf("unable to consume once: %s", err),
    }

    var numReceived int

    // Consume forever (blocks)
    ctx, cancel := context.WithCancel(context.Background())

    r.Consume(ctx, nil, func(msg amqp.Delivery) error {
        fmt.Printf("Received new message: %+v\n", msg)
        
        numReceived++
        
        if numReceived > 1 {
            r.Stop()
        }
    })

    // Or stop via ctx 
    r.Consume(..)
    cancel()
}

Documentation

Overview

Package rabbit is a simple streadway/amqp wrapper library that comes with:

* Auto-reconnect support

* Context support

* Helpers for consuming once or forever and publishing

The library is used internally at https://batch.sh where it powers most of the platform's backend services.

For an example, refer to the README.md.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrShutdown will be returned if the underlying connection has already
	// been closed (ie. if you Close()'d and then tried to Publish())
	ErrShutdown = errors.New("connection has been shutdown")

	// DefaultConsumerTag is used for identifying consumer
	DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8]

	// DefaultAppID is used for identifying the producer
	DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8]
)

Functions

func ValidateOptions

func ValidateOptions(opts *Options) error

ValidateOptions validates various combinations of options.

Types

type Binding

type Binding struct {
	// Required
	ExchangeName string

	// Bind a queue to one or more routing keys
	BindingKeys []string

	// Whether to declare/create exchange on connect
	ExchangeDeclare bool

	// Required if declaring queue (valid: direct, fanout, topic, headers)
	ExchangeType string

	// Whether exchange should survive/persist server restarts
	ExchangeDurable bool

	// Whether to delete exchange when its no longer used; used only if ExchangeDeclare set to true
	ExchangeAutoDelete bool
}

Binding represents the information needed to bind a queue to an Exchange.

type ConsumeError

type ConsumeError struct {
	Message *amqp.Delivery
	Error   error
}

ConsumeError will be passed down the error channel if/when `f()` func runs into an error during `Consume()`.

type IRabbit

type IRabbit interface {
	Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
	ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
	PublishWithContext(ctx context.Context, routingKey string, msg amqp.Publishing) error
	Stop() error
	Close() error
}

IRabbit is the interface that the `rabbit` library implements. It's here as convenience.

type Logger

type Logger interface {
	// Debug sends out a debug message with the given arguments to the logger.
	Debug(args ...interface{})
	// Debugf formats a debug message using the given arguments and sends it to the logger.
	Debugf(format string, args ...interface{})
	// Info sends out an informational message with the given arguments to the logger.
	Info(args ...interface{})
	// Infof formats an informational message using the given arguments and sends it to the logger.
	Infof(format string, args ...interface{})
	// Warn sends out a warning message with the given arguments to the logger.
	Warn(args ...interface{})
	// Warnf formats a warning message using the given arguments and sends it to the logger.
	Warnf(format string, args ...interface{})
	// Error sends out an error message with the given arguments to the logger.
	Error(args ...interface{})
	// Errorf formats an error message using the given arguments and sends it to the logger.
	Errorf(format string, args ...interface{})
}

Logger is the common interface for user-provided loggers.

type Mode

type Mode int

Mode is the type used to represent whether the RabbitMQ clients is acting as a consumer, a producer, or both.

const (
	// DefaultRetryReconnectSec determines how long to wait before attempting
	// to reconnect to a rabbit server
	DefaultRetryReconnectSec = 60

	// Both means that the client is acting as both a consumer and a producer.
	Both Mode = 0
	// Consumer means that the client is acting as a consumer.
	Consumer Mode = 1
	// Producer means that the client is acting as a producer.
	Producer Mode = 2
)

type NoOpLogger

type NoOpLogger struct {
}

NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.

func (*NoOpLogger) Debug

func (l *NoOpLogger) Debug(args ...interface{})

Debug is no-op implementation of Logger's Debug.

func (*NoOpLogger) Debugf

func (l *NoOpLogger) Debugf(format string, args ...interface{})

Debugf is no-op implementation of Logger's Debugf.

func (*NoOpLogger) Error

func (l *NoOpLogger) Error(args ...interface{})

Error is no-op implementation of Logger's Error.

func (*NoOpLogger) Errorf

func (l *NoOpLogger) Errorf(format string, args ...interface{})

Errorf is no-op implementation of Logger's Errorf.

func (*NoOpLogger) Info

func (l *NoOpLogger) Info(args ...interface{})

Info is no-op implementation of Logger's Info.

func (*NoOpLogger) Infof

func (l *NoOpLogger) Infof(format string, args ...interface{})

Infof is no-op implementation of Logger's Infof.

func (*NoOpLogger) Warn

func (l *NoOpLogger) Warn(args ...interface{})

Warn is no-op implementation of Logger's Warn.

func (*NoOpLogger) Warnf

func (l *NoOpLogger) Warnf(format string, args ...interface{})

Warnf is no-op implementation of Logger's Warnf.

type Options

type Options struct {
	// Required; format "amqp://user:pass@host:port"
	URLs []string

	// In what mode does the library operate (Both, Consumer, Producer)
	Mode Mode

	// If left empty, server will auto generate queue name
	QueueName string

	// Bindings is the set of information need to bind a queue to one or
	// more exchanges, specifying one or more binding (routing) keys.
	Bindings []Binding

	// https://godoc.org/github.com/streadway/amqp#Channel.Qos
	// Leave unset if no QoS preferences
	QosPrefetchCount int
	QosPrefetchSize  int

	// How long to wait before we retry connecting to a server (after disconnect)
	RetryReconnectSec int

	// Whether queue should survive/persist server restarts (and there are no remaining bindings)
	QueueDurable bool

	// Whether consumer should be the sole consumer of the queue; used only if
	// QueueDeclare set to true
	QueueExclusive bool

	// Whether to delete queue on consumer disconnect; used only if QueueDeclare set to true
	QueueAutoDelete bool

	// Whether to declare/create queue on connect; used only if QueueDeclare set to true
	QueueDeclare bool

	// Additional arguements to pass to the queue declaration or binding
	// https://github.com/batchcorp/plumber/issues/210
	QueueArgs map[string]interface{}

	// Whether to automatically acknowledge consumed message(s)
	AutoAck bool

	// Used for identifying consumer
	ConsumerTag string

	// Used as a property to identify producer
	AppID string

	// Use TLS
	UseTLS bool

	// Skip cert verification (only applies if UseTLS is true)
	SkipVerifyTLS bool

	// Log is the (optional) logger to use for writing out log messages.
	Log Logger
}

Options determines how the `rabbit` library will behave and should be passed in to rabbit via `New()`. Many of the options are optional (and will fall back to sane defaults).

type Rabbit

type Rabbit struct {
	Conn                    *amqp.Connection
	ConsumerDeliveryChannel <-chan amqp.Delivery
	ConsumerRWMutex         *sync.RWMutex
	NotifyCloseChan         chan *amqp.Error
	ProducerServerChannel   *amqp.Channel
	ProducerRWMutex         *sync.RWMutex
	ConsumeLooper           director.Looper
	Options                 *Options
	// contains filtered or unexported fields
}

Rabbit struct that is instantiated via `New()`. You should not instantiate this struct by hand (unless you have a really good reason to do so).

func New

func New(opts *Options) (*Rabbit, error)

New is used for instantiating the library.

func (*Rabbit) Close

func (r *Rabbit) Close() error

Close stops any active Consume and closes the amqp connection (and channels using the conn)

You should re-instantiate the rabbit lib once this is called.

func (*Rabbit) Consume

func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)

Consume consumes messages from the configured queue (`Options.QueueName`) and executes `f` for every received message.

`Consume()` will block until it is stopped either via the passed in `ctx` OR by calling `Stop()`

It is also possible to see the errors that `f()` runs into by passing in an error channel (`chan *ConsumeError`).

Both `ctx` and `errChan` can be `nil`.

If the server goes away, `Consume` will automatically attempt to reconnect. Subsequent reconnect attempts will sleep/wait for `DefaultRetryReconnectSec` between attempts.

func (*Rabbit) ConsumeOnce

func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error

ConsumeOnce will consume exactly one message from the configured queue, execute `runFunc()` on the message and return.

Same as with `Consume()`, you can pass in a context to cancel `ConsumeOnce()` or run `Stop()`.

func (*Rabbit) PublishWithContext

func (r *Rabbit) PublishWithContext(ctx context.Context, routingKey string, msg amqp.Publishing) error

PublishWithContext publishes one message to the configured exchange, using the specified routing key.

func (*Rabbit) Stop

func (r *Rabbit) Stop() error

Stop stops an in-progress `Consume()` or `ConsumeOnce()`.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL