Documentation ¶
Overview ¶
Package rabbit is a simple streadway/amqp wrapper library that comes with:
* Auto-reconnect support
* Context support
* Helpers for consuming once or forever and publishing
The library is used internally at https://batch.sh where it powers most of the platform's backend services.
For an example, refer to the README.md.
Index ¶
- Variables
- func ValidateOptions(opts *Options) error
- type Binding
- type ConsumeError
- type IRabbit
- type Logger
- type Mode
- type NoOpLogger
- func (l *NoOpLogger) Debug(args ...interface{})
- func (l *NoOpLogger) Debugf(format string, args ...interface{})
- func (l *NoOpLogger) Error(args ...interface{})
- func (l *NoOpLogger) Errorf(format string, args ...interface{})
- func (l *NoOpLogger) Info(args ...interface{})
- func (l *NoOpLogger) Infof(format string, args ...interface{})
- func (l *NoOpLogger) Warn(args ...interface{})
- func (l *NoOpLogger) Warnf(format string, args ...interface{})
- type Options
- type Rabbit
- func (r *Rabbit) Close() error
- func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, ...)
- func (r *Rabbit) ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
- func (r *Rabbit) PublishWithContext(ctx context.Context, routingKey string, msg amqp.Publishing) error
- func (r *Rabbit) Stop() error
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrShutdown will be returned if the underlying connection has already
	// been closed (ie. if you Close()'d and then tried to Publish())
	ErrShutdown = errors.New("connection has been shutdown")

	// DefaultConsumerTag is used for identifying consumer
	DefaultConsumerTag = "c-rabbit-" + uuid.NewV4().String()[0:8]

	// DefaultAppID is used for identifying the producer
	DefaultAppID = "p-rabbit-" + uuid.NewV4().String()[0:8]
)
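Because ErrShutdown is a sentinel error, callers can test for it with `errors.Is` and decide to build a fresh client. The following is a minimal sketch under stated assumptions: `fakeClient`, its `Publish` method, and `shouldReinstantiate` are hypothetical stand-ins written for this example, and the sentinel is redeclared locally so the program compiles without the library.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrShutdown mirrors the sentinel documented above so that this sketch
// compiles on its own; real code would use the package's variable.
var ErrShutdown = errors.New("connection has been shutdown")

// fakeClient is a hypothetical stand-in for a client; it is not part of
// the library.
type fakeClient struct{ closed bool }

// Close marks the fake client as shut down.
func (c *fakeClient) Close() { c.closed = true }

// Publish returns ErrShutdown (wrapped with some context) once the fake
// client has been closed, and nil otherwise.
func (c *fakeClient) Publish(body string) error {
	if c.closed {
		return fmt.Errorf("publish %q: %w", body, ErrShutdown)
	}
	return nil
}

// shouldReinstantiate reports whether err means the client was shut down,
// in which case a new client has to be created before retrying.
func shouldReinstantiate(err error) bool {
	return errors.Is(err, ErrShutdown)
}

func main() {
	c := &fakeClient{}
	fmt.Println(shouldReinstantiate(c.Publish("first"))) // client still open

	c.Close()
	fmt.Println(shouldReinstantiate(c.Publish("second"))) // client closed
}
```

`errors.Is` matches whether the sentinel is returned directly or wrapped, so the check does not depend on how the error is propagated.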
Functions ¶
func ValidateOptions ¶
ValidateOptions validates various combinations of options.
Types ¶
type Binding ¶
type Binding struct {
	// Required
	ExchangeName string

	// Bind a queue to one or more routing keys
	BindingKeys []string

	// Whether to declare/create exchange on connect
	ExchangeDeclare bool

	// Required if declaring queue (valid: direct, fanout, topic, headers)
	ExchangeType string

	// Whether exchange should survive/persist server restarts
	ExchangeDurable bool

	// Whether to delete exchange when it's no longer used; used only if ExchangeDeclare set to true
	ExchangeAutoDelete bool
}
Binding represents the information needed to bind a queue to an Exchange.
type ConsumeError ¶
ConsumeError will be passed down the error channel if/when `f()` func runs into an error during `Consume()`.
type IRabbit ¶
type IRabbit interface {
	Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
	ConsumeOnce(ctx context.Context, runFunc func(msg amqp.Delivery) error) error
	PublishWithContext(ctx context.Context, routingKey string, msg amqp.Publishing) error
	Stop() error
	Close() error
}
IRabbit is the interface that the `rabbit` library implements. It is provided as a convenience.
type Logger ¶
type Logger interface {
	// Debug sends out a debug message with the given arguments to the logger.
	Debug(args ...interface{})

	// Debugf formats a debug message using the given arguments and sends it to the logger.
	Debugf(format string, args ...interface{})

	// Info sends out an informational message with the given arguments to the logger.
	Info(args ...interface{})

	// Infof formats an informational message using the given arguments and sends it to the logger.
	Infof(format string, args ...interface{})

	// Warn sends out a warning message with the given arguments to the logger.
	Warn(args ...interface{})

	// Warnf formats a warning message using the given arguments and sends it to the logger.
	Warnf(format string, args ...interface{})

	// Error sends out an error message with the given arguments to the logger.
	Error(args ...interface{})

	// Errorf formats an error message using the given arguments and sends it to the logger.
	Errorf(format string, args ...interface{})
}
Logger is the common interface for user-provided loggers.
type Mode ¶
type Mode int
Mode is the type used to represent whether the RabbitMQ client is acting as a consumer, a producer, or both.
const (
	// DefaultRetryReconnectSec determines how long to wait before attempting
	// to reconnect to a rabbit server
	DefaultRetryReconnectSec = 60

	// Both means that the client is acting as both a consumer and a producer.
	Both Mode = 0

	// Consumer means that the client is acting as a consumer.
	Consumer Mode = 1

	// Producer means that the client is acting as a producer.
	Producer Mode = 2
)
type NoOpLogger ¶
type NoOpLogger struct { }
NoOpLogger is a do-nothing logger; it is used internally as the default Logger when none is provided in the Options.
func (*NoOpLogger) Debug ¶
func (l *NoOpLogger) Debug(args ...interface{})
Debug is a no-op implementation of Logger's Debug.
func (*NoOpLogger) Debugf ¶
func (l *NoOpLogger) Debugf(format string, args ...interface{})
Debugf is a no-op implementation of Logger's Debugf.
func (*NoOpLogger) Error ¶
func (l *NoOpLogger) Error(args ...interface{})
Error is a no-op implementation of Logger's Error.
func (*NoOpLogger) Errorf ¶
func (l *NoOpLogger) Errorf(format string, args ...interface{})
Errorf is a no-op implementation of Logger's Errorf.
func (*NoOpLogger) Info ¶
func (l *NoOpLogger) Info(args ...interface{})
Info is a no-op implementation of Logger's Info.
func (*NoOpLogger) Infof ¶
func (l *NoOpLogger) Infof(format string, args ...interface{})
Infof is a no-op implementation of Logger's Infof.
func (*NoOpLogger) Warn ¶
func (l *NoOpLogger) Warn(args ...interface{})
Warn is a no-op implementation of Logger's Warn.
func (*NoOpLogger) Warnf ¶
func (l *NoOpLogger) Warnf(format string, args ...interface{})
Warnf is a no-op implementation of Logger's Warnf.
type Options ¶
type Options struct {
	// Required; format "amqp://user:pass@host:port"
	URLs []string

	// In what mode does the library operate (Both, Consumer, Producer)
	Mode Mode

	// If left empty, server will auto generate queue name
	QueueName string

	// Bindings is the set of information needed to bind a queue to one or
	// more exchanges, specifying one or more binding (routing) keys.
	Bindings []Binding

	// https://godoc.org/github.com/streadway/amqp#Channel.Qos
	// Leave unset if no QoS preferences
	QosPrefetchCount int
	QosPrefetchSize  int

	// How long to wait before we retry connecting to a server (after disconnect)
	RetryReconnectSec int

	// Whether queue should survive/persist server restarts (and there are no remaining bindings)
	QueueDurable bool

	// Whether consumer should be the sole consumer of the queue; used only if
	// QueueDeclare set to true
	QueueExclusive bool

	// Whether to delete queue on consumer disconnect; used only if QueueDeclare set to true
	QueueAutoDelete bool

	// Whether to declare/create queue on connect
	QueueDeclare bool

	// Additional arguments to pass to the queue declaration or binding
	// https://github.com/batchcorp/plumber/issues/210
	QueueArgs map[string]interface{}

	// Whether to automatically acknowledge consumed message(s)
	AutoAck bool

	// Used for identifying consumer
	ConsumerTag string

	// Used as a property to identify producer
	AppID string

	// Use TLS
	UseTLS bool

	// Skip cert verification (only applies if UseTLS is true)
	SkipVerifyTLS bool

	// Log is the (optional) logger to use for writing out log messages.
	Log Logger
}
Options determines how the `rabbit` library will behave and should be passed to rabbit via `New()`. Many of the options are optional and fall back to sane defaults.
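The field comments mark a few settings as required: at least one URL, an exchange name on every binding, and a valid exchange type when an exchange is declared. The sketch below shows one way such constraints could be checked before constructing a client. It is not the library's `ValidateOptions`: `checkRequired` and `validTypes` are hypothetical, and `Binding` and `Options` are trimmed local mirrors holding only the fields the example inspects.

```go
package main

import (
	"errors"
	"fmt"
)

// Binding mirrors only the fields of the documented struct that this
// sketch inspects.
type Binding struct {
	ExchangeName    string
	ExchangeDeclare bool
	ExchangeType    string
}

// Options mirrors only the fields of the documented struct that this
// sketch inspects.
type Options struct {
	URLs     []string
	Bindings []Binding
}

// validTypes lists the exchange types named in the Binding field comments.
var validTypes = map[string]bool{
	"direct":  true,
	"fanout":  true,
	"topic":   true,
	"headers": true,
}

// checkRequired is a hypothetical helper (not the library's ValidateOptions)
// that enforces the constraints marked "Required" in the field comments.
func checkRequired(opts *Options) error {
	if opts == nil {
		return errors.New("options cannot be nil")
	}
	if len(opts.URLs) == 0 {
		return errors.New("at least one URL is required")
	}
	for i, b := range opts.Bindings {
		if b.ExchangeName == "" {
			return fmt.Errorf("binding %d: ExchangeName is required", i)
		}
		if b.ExchangeDeclare && !validTypes[b.ExchangeType] {
			return fmt.Errorf("binding %d: ExchangeType %q is not valid", i, b.ExchangeType)
		}
	}
	return nil
}

func main() {
	opts := &Options{
		URLs: []string{"amqp://user:pass@localhost:5672"},
		Bindings: []Binding{
			{ExchangeName: "events", ExchangeDeclare: true, ExchangeType: "topic"},
		},
	}
	fmt.Println(checkRequired(opts)) // complete configuration

	opts.Bindings[0].ExchangeType = ""
	fmt.Println(checkRequired(opts)) // declared exchange without a type
}
```

Which combinations the real `ValidateOptions` rejects is not spelled out in this reference, so treat the rules above as an illustration of the documented "Required" comments only.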
type Rabbit ¶
type Rabbit struct {
	Conn                    *amqp.Connection
	ConsumerDeliveryChannel <-chan amqp.Delivery
	ConsumerRWMutex         *sync.RWMutex
	NotifyCloseChan         chan *amqp.Error
	ProducerServerChannel   *amqp.Channel
	ProducerRWMutex         *sync.RWMutex
	ConsumeLooper           director.Looper
	Options                 *Options
	// contains filtered or unexported fields
}
Rabbit is the struct that is instantiated via `New()`. You should not instantiate this struct by hand (unless you have a really good reason to do so).
func (*Rabbit) Close ¶
Close stops any active Consume and closes the AMQP connection (and any channels using that connection).
You should re-instantiate the rabbit lib once this is called.
func (*Rabbit) Consume ¶
func (r *Rabbit) Consume(ctx context.Context, errChan chan *ConsumeError, f func(msg amqp.Delivery) error)
Consume consumes messages from the configured queue (`Options.QueueName`) and executes `f` for every received message.
`Consume()` will block until it is stopped, either via the passed-in `ctx` or by calling `Stop()`.
It is also possible to see the errors that `f()` runs into by passing in an error channel (`chan *ConsumeError`).
Both `ctx` and `errChan` can be `nil`.
If the server goes away, `Consume` will automatically attempt to reconnect. Subsequent reconnect attempts will sleep/wait for `DefaultRetryReconnectSec` between attempts.
func (*Rabbit) ConsumeOnce ¶
ConsumeOnce will consume exactly one message from the configured queue, execute `runFunc()` on the message and return.
Same as with `Consume()`, you can pass in a context to cancel `ConsumeOnce()` or run `Stop()`.
func (*Rabbit) PublishWithContext ¶
func (r *Rabbit) PublishWithContext(ctx context.Context, routingKey string, msg amqp.Publishing) error
PublishWithContext publishes one message to the configured exchange, using the specified routing key.