rabbitmq

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2021 License: MIT Imports: 6 Imported by: 147

README

go-rabbitmq

Wrapper of streadway/amqp that provides reconnection logic and sane defaults. Hit the project with a star if you find it useful ⭐

Supported by Qvault

Deploy

Motivation

Streadway's AMQP library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided.

The goal with go-rabbitmq is to still provide most all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. Particularly:

  • Automatic reconnection
  • Multithreaded consumers via a handler function
  • Reasonable defaults
  • Flow control handling

⚙️ Installation

Outside of a Go module:

go get github.com/wagslane/go-rabbitmq

🚀 Quick Start Consumer

Default options
consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost")
if err != nil {
    log.Fatal(err)
}
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) bool {
        log.Printf("consumed: %v", string(d.Body))
        // true to ACK, false to NACK
        return true
    },
    "my_queue",
    []string{"routing_key1", "routing_key2"}
)
if err != nil {
    log.Fatal(err)
}
With options
consumer, err := rabbitmq.NewConsumer(
    "amqp://user:pass@localhost",
    // can pass nothing for no logging
    func(opts *rabbitmq.ConsumerOptions) {
        opts.Logging = true
    },
)
if err != nil {
    log.Fatal(err)
}
err = consumer.StartConsuming(
    func(d rabbitmq.Delivery) bool {
        log.Printf("consumed: %v", string(d.Body))
        // true to ACK, false to NACK
        return true
    },
    "my_queue",
    []string{"routing_key1", "routing_key2"},
    // can pass nothing here for defaults
    func(opts *rabbitmq.ConsumeOptions) {
        opts.QueueDurable = true
        opts.Concurrency = 10
        opts.QOSPrefetch = 100
    },
)
if err != nil {
    log.Fatal(err)
}

🚀 Quick Start Publisher

Default options
publisher, returns, err := rabbitmq.NewPublisher("amqp://user:pass@localhost")
if err != nil {
    log.Fatal(err)
}
err = publisher.Publish([]byte("hello, world"), []string{"routing_key"})
if err != nil {
    log.Fatal(err)
}
With options
publisher, returns, err := rabbitmq.NewPublisher(
    "amqp://user:pass@localhost",
    // can pass nothing for no logging
    func(opts *rabbitmq.PublisherOptions) {
        opts.Logging = true
    },
)
if err != nil {
    log.Fatal(err)
}
err = publisher.Publish(
    []byte("hello, world"),
    []string{"routing_key"},
    // leave blank for defaults
    func(opts *rabbitmq.PublishOptions) {
        opts.DeliveryMode = rabbitmq.Persistent
        opts.Mandatory = true
        opts.ContentType = "application/json"
    },
)
if err != nil {
    log.Fatal(err)
}

go func() {
    for r := range returns {
        log.Printf("message returned from server: %s", string(r.Body))
    }
}()

💬 Contact

Twitter Follow

Submit an issue (above in the issues tab)

Transient Dependencies

My goal is to keep dependencies limited to 1, github.com/streadway/amqp.

👏 Contributing

I love help! Contribute by forking the repo and opening pull requests. Please ensure that your code passes the existing tests and linting, and write tests to test your changes if applicable.

All pull requests should be submitted to the main branch.

Documentation

Index

Constants

View Source
const (
	Transient  uint8 = amqp.Transient
	Persistent uint8 = amqp.Persistent
)

DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumeOptions

type ConsumeOptions struct {
	QueueDurable      bool
	QueueAutoDelete   bool
	QueueExclusive    bool
	QueueNoWait       bool
	QueueArgs         Table
	BindingExchange   string
	BindingNoWait     bool
	BindingArgs       Table
	Concurrency       int
	QOSPrefetch       int
	QOSGlobal         bool
	ConsumerName      string
	ConsumerAutoAck   bool
	ConsumerExclusive bool
	ConsumerNoWait    bool
	ConsumerNoLocal   bool
	ConsumerArgs      Table
}

ConsumeOptions are used to describe how a new consumer will be created.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer allows you to create and connect to queues for data consumption.

func NewConsumer added in v0.2.0

func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error)

NewConsumer returns a new Consumer connected to the given rabbitmq server

func (Consumer) StartConsuming added in v0.1.0

func (consumer Consumer) StartConsuming(
	handler func(d Delivery) bool,
	queue string,
	routingKeys []string,
	optionFuncs ...func(*ConsumeOptions),
) error

StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). The provided handler is called once for each message. If the provided queue doesn't exist, it will be created on the cluster

type ConsumerOptions

type ConsumerOptions struct {
	Logging bool
}

ConsumerOptions are used to describe a consumer's configuration. Logging set to true will enable the consumer to print to stdout

type Delivery

type Delivery struct {
	amqp.Delivery
}

Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.

type PublishOptions

type PublishOptions struct {
	Exchange string
	// Mandatory fails to publish if there are no queues
	// bound to the routing key
	Mandatory bool
	// Immediate fails to publish if there are no consumers
	// that can ack bound to the queue on the routing key
	Immediate   bool
	ContentType string
	// Transient or Persistent
	DeliveryMode uint8
}

PublishOptions are used to control how data is published

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher allows you to publish messages safely across an open connection

func NewPublisher added in v0.2.0

func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error)

NewPublisher returns a new publisher with an open channel to the cluster. If you plan to enforce mandatory or immediate publishing, those failures will be reported on the channel of Returns that you should setup a listener on. Flow controls are automatically handled as they are sent from the server, and publishing will fail with an error when the server is requesting a slowdown

func (*Publisher) Publish

func (publisher *Publisher) Publish(
	data []byte,
	routingKeys []string,
	optionFuncs ...func(*PublishOptions),
) error

Publish publishes the provided data to the given routing keys over the connection

type PublisherOptions added in v0.1.0

type PublisherOptions struct {
	Logging bool
}

PublisherOptions are used to describe a publisher's configuration. Logging set to true will enable the consumer to print to stdout

type Return

type Return struct {
	amqp.Return
}

Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.

type Table

type Table map[string]interface{}

Table stores user supplied fields of the following types:

bool
byte
float32
float64
int
int16
int32
int64
nil
string
time.Time
amqp.Decimal
amqp.Table
[]byte
[]interface{} - containing above types

Functions taking a table will immediately fail when the table contains a value of an unsupported type.

The caller must be specific in which precision of integer it wishes to encode.

Use a type assertion when reading values from a table for type conversion.

RabbitMQ expects int32 for integer values.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL