rabbitmq

package
v1.5.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2023 License: MIT Imports: 10 Imported by: 0

README

rabbitmq

A rabbitmq library that wraps github.com/rabbitmq/amqp091-go. It supports automatic reconnection and customizable parameters, and covers six message types in total: direct, topic, fanout, headers, delayed message, and publisher/subscriber.

Example of use

Code Example

The code example covers all six message types: direct, topic, fanout, headers, delayed message, and publisher/subscriber.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/zhufuyi/sponge/pkg/logger"
	"github.com/zhufuyi/sponge/pkg/rabbitmq"
)

// main runs every example in order against the broker at 127.0.0.1:5672:
// direct, topic, fanout, headers, delayed message, then publisher/subscriber.
func main() {
	url := "amqp://guest:guest@127.0.0.1:5672/"

	examples := []func(string){
		directExample,
		topicExample,
		fanoutExample,
		headersExample,
		delayedMessageExample,
		publisherSubscriberExample,
	}
	for _, run := range examples {
		run(url)
	}
}

// directExample publishes one message through a direct exchange and then
// starts a consumer on the same queue.
func directExample(url string) {
	const (
		exchangeName = "direct-exchange-demo"
		queueName    = "direct-queue-1"
		routingKey   = "direct-key-1"
	)
	exchange := rabbitmq.NewDirectExchange(exchangeName, routingKey)
	fmt.Printf("\n\n-------------------- direct --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so that the
	// producer and its connection are closed before the consumer is started.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		checkErr(producer.PublishDirect(context.Background(), []byte("[direct] say hello")))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Consuming happens in the background; wait a second before returning.
	time.Sleep(time.Second)
}

// topicExample publishes one message with a concrete topic key through a topic
// exchange bound with a wildcard routing key, then starts a consumer on the
// same queue.
func topicExample(url string) {
	const (
		exchangeName = "topic-exchange-demo"
		queueName    = "topic-queue-1"
		routingKey   = "key1.key2.*"
	)
	exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
	fmt.Printf("\n\n-------------------- topic --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so that the
	// producer and its connection are closed before the consumer is started.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		key := "key1.key2.key3"
		body := []byte("[topic] " + key + " say hello")
		checkErr(producer.PublishTopic(context.Background(), key, body))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Consuming happens in the background; wait a second before returning.
	time.Sleep(time.Second)
}

// fanoutExample publishes one message through a fanout exchange and then
// starts two consumers, one per queue, on that exchange.
func fanoutExample(url string) {
	const (
		exchangeName = "fanout-exchange-demo"
		queueName    = "fanout-queue-1"
	)
	exchange := rabbitmq.NewFanoutExchange(exchangeName)
	fmt.Printf("\n\n-------------------- fanout --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so that the
	// producer and its connection are closed before the consumers are started.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		checkErr(producer.PublishFanout(context.Background(), []byte("[fanout] say hello")))
	}()

	// Consumer side: one consumer per queue.
	// NOTE(review): "fanout-queue-2" is first referenced after the publish above,
	// so on a fresh broker it presumably receives nothing from this run — confirm.
	for _, q := range []string{queueName, "fanout-queue-2"} {
		runConsume(url, exchange, q)
	}

	// Consuming happens in the background; wait a second before returning.
	time.Sleep(time.Second)
}

// headersExample publishes two messages through a headers exchange, one
// carrying the full set of binding headers and one carrying only a subset,
// then starts a consumer on the queue.
func headersExample(url string) {
	const (
		exchangeName = "headers-exchange-demo"
		queueName    = "headers-queue-1"
	)
	headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
	// HeadersTypeAll is used here; HeadersTypeAny is the alternative.
	exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys)
	fmt.Printf("\n\n-------------------- headers --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so that the
	// producer and its connection are closed before the consumer is started.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		ctx := context.Background()

		// First message: same header map the exchange was created with.
		checkErr(producer.PublishHeaders(ctx, headersKeys, []byte("[headers] say hello 1")))

		// Second message: only one of the two headers.
		// NOTE(review): with HeadersTypeAll this presumably does not reach the queue — confirm.
		partial := map[string]interface{}{"foo": "bar"}
		checkErr(producer.PublishHeaders(ctx, partial, []byte("[headers] say hello 2")))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Consuming happens in the background; wait a second before returning.
	time.Sleep(time.Second)
}

// delayedMessageExample publishes one message with a three-second delay
// through a delayed-message exchange wrapping a direct exchange, then starts a
// consumer and waits four seconds.
func delayedMessageExample(url string) {
	const (
		exchangeName = "delayed-message-exchange-demo"
		queueName    = "delayed-message-queue"
		routingKey   = "delayed-key"
	)
	inner := rabbitmq.NewDirectExchange("", routingKey)
	exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, inner)
	fmt.Printf("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so that the
	// producer and its connection are closed before the consumer is started.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		// The send time is embedded in the body so the delay can be read off
		// the consumer's log output.
		const datetimeLayout = "2006-01-02 15:04:05.000"
		body := []byte("[delayed message] say hello " + time.Now().Format(datetimeLayout))
		checkErr(producer.PublishDelayedMessage(context.Background(), time.Second*3, body))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Wait longer than the three-second delay before returning.
	time.Sleep(time.Second * 4)
}

// publisherSubscriberExample publishes one message on a named channel and then
// starts two subscribers with different identifiers on that channel.
func publisherSubscriberExample(url string) {
	const channelName = "pub-sub"
	fmt.Printf("\n\n-------------------- publisher subscriber --------------------\n")

	// Publisher side. The closure scopes the deferred Close calls so that the
	// publisher and its connection are closed before the subscribers are started.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		publisher, err := rabbitmq.NewPublisher(channelName, conn)
		checkErr(err)
		defer publisher.Close()

		checkErr(publisher.Publish(context.Background(), []byte("[pub-sub] say hello")))
	}()

	// Subscriber side: one subscriber per identifier.
	for _, identifier := range []string{"pub-sub-queue-1", "pub-sub-queue-2"} {
		runSubscriber(url, channelName, identifier)
	}

	// Wait a second before returning so the subscribers have time to run.
	time.Sleep(time.Second)
}

// runConsume opens a new connection, creates a manual-ack consumer for
// queueName on exchange, and starts consuming with the shared handler.
// It panics (via checkErr) if the connection or consumer cannot be created.
// Neither the connection nor the consumer is closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string) {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
}

// runSubscriber opens a new connection, creates a manual-ack subscriber with
// the given identifier on channelName, and subscribes with the shared handler.
// It panics (via checkErr) if the connection or subscriber cannot be created.
// Neither the connection nor the subscriber is closed here.
func runSubscriber(url string, channelName string, identifier string) {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	subscriber, err := rabbitmq.NewSubscriber(channelName, identifier, conn, manualAck)
	checkErr(err)

	subscriber.Subscribe(context.Background(), handler)
}

// handler logs the tag ID and body of every received message and always
// returns nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	fields := []logger.Field{
		logger.String("tagID", tagID),
		logger.String("data", string(data)),
	}
	logger.Info("received message", fields...)
	return nil
}

// checkErr panics with err when err is non-nil and does nothing otherwise.
// It keeps the example code short; it is not meant for library code.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

Example of Automatic Resumption of Publish

If a publish fails because of a network error, you can wait until the connection has been re-established and then publish the message again.

package main

import (
	"context"
	"errors"
	"strconv"
	"time"

	"github.com/zhufuyi/sponge/pkg/logger"
	"github.com/zhufuyi/sponge/pkg/rabbitmq"
)

var url = "amqp://guest:guest@127.0.0.1:5672/"

// main starts a consumer and then a producer on the same direct exchange and
// queue. Both share a context that expires after one hour.
func main() {
	// Keep the cancel func and always call it: discarding it leaves the
	// context's timer and resources alive until the deadline (go vet: lostcancel).
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	exchangeName := "direct-exchange-demo"
	queueName := "direct-queue"
	routeKey := "info"
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)

	if err := runConsume(ctx, exchange, queueName); err != nil {
		logger.Error("runConsume failed", logger.Err(err))
		return
	}

	// runProduce returns only with an error, including ctx.Err() once the
	// context is done.
	if err := runProduce(ctx, exchange, queueName); err != nil {
		logger.Error("runProduce failed", logger.Err(err))
		return
	}
}

// runProduce publishes a numbered direct message every five seconds until ctx
// is done, then returns ctx.Err(). It also returns an error if the initial
// connection or producer cannot be created.
//
// When a publish fails with rabbitmq.ErrClosed, it waits for the connection to
// report connected again, builds a new producer and re-publishes the same
// message. Any other publish error is logged and the message is skipped.
func runProduce(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}
	defer connection.Close()

	p, err := rabbitmq.NewProducer(exchange, queueName, connection)
	if err != nil {
		return err
	}
	// p is replaced after a reconnect. The closure closes whichever producer is
	// current on return; a plain `defer p.Close()` would bind the first one.
	defer func() { p.Close() }()

	for count := 1; ; count++ {
		if err = ctx.Err(); err != nil {
			return err
		}
		data := []byte("direct say hello" + strconv.Itoa(count))

		// Retry the same message for as long as the failure is a closed
		// connection or channel.
		for {
			err = p.PublishDirect(ctx, data)
			if !errors.Is(err, rabbitmq.ErrClosed) {
				break
			}
			fresh, rerr := rebuildProducer(ctx, connection, exchange, queueName)
			if rerr != nil {
				return rerr
			}
			p.Close() // release the stale producer before switching to the new one
			p = fresh
		}

		// Log success only when the publish actually succeeded.
		if err != nil {
			logger.Warn("publish failed", logger.Err(err))
		} else {
			logger.Info("publish message", logger.String("data", string(data)))
		}

		if !sleepCtx(ctx, time.Second*5) {
			return ctx.Err()
		}
	}
}

// rebuildProducer polls every two seconds until the connection reports
// connected and a new producer can be created, or until ctx is done.
func rebuildProducer(ctx context.Context, connection *rabbitmq.Connection, exchange *rabbitmq.Exchange, queueName string) (*rabbitmq.Producer, error) {
	for {
		if connection.CheckConnected() {
			p, err := rabbitmq.NewProducer(exchange, queueName, connection)
			if err == nil {
				return p, nil
			}
			logger.Warn("reconnect failed", logger.Err(err))
		}
		if !sleepCtx(ctx, time.Second*2) {
			return nil, ctx.Err()
		}
	}
}

// sleepCtx waits for d and returns true, or returns false as soon as ctx is done.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// runConsume opens a new connection, creates a manual-ack consumer for
// queueName on exchange, and starts consuming with the shared handler.
// It returns an error if the connection or the consumer cannot be created.
//
// On success the connection and consumer are deliberately left open: Consume
// runs its receive loop in a goroutine, so this function returns while
// consumption continues.
func runConsume(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}

	c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
	if err != nil {
		// No consumer will use this connection, so close it instead of leaking it.
		connection.Close()
		return err
	}

	c.Consume(ctx, handler)

	return nil
}

// handler logs each received message together with its tag ID and reports
// success by returning nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	body := string(data)
	logger.Info(
		"received message",
		logger.String("tagID", tagID),
		logger.String("data", body),
	)
	return nil
}

Documentation

Overview

Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go

Producer and consumer support five exchange types: direct, topic, fanout, headers, and x-delayed-message. Publisher and subscriber use the fanout type.

Index

Constants

View Source
const DefaultURL = "amqp://guest:guest@localhost:5672/"

DefaultURL default rabbitmq url

Variables

View Source
var ErrClosed = amqp.ErrClosed

ErrClosed closed

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection rabbitmq connection

func NewConnection

func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)

NewConnection rabbitmq connection

func (*Connection) CheckConnected

func (c *Connection) CheckConnected() bool

CheckConnected reports whether the rabbitmq connection is currently connected

func (*Connection) Close

func (c *Connection) Close()

Close rabbitmq connection

type ConnectionOption

type ConnectionOption func(*connectionOptions)

ConnectionOption connection option.

func WithLogger

func WithLogger(zapLog *zap.Logger) ConnectionOption

WithLogger set logger option.

func WithReconnectTime

func WithReconnectTime(d time.Duration) ConnectionOption

WithReconnectTime set reconnect time interval option.

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption

WithTLSConfig set tls config option.

type ConsumeOption added in v1.5.6

type ConsumeOption func(*consumeOptions)

ConsumeOption consume option.

func WithConsumeArgs added in v1.5.6

func WithConsumeArgs(args map[string]interface{}) ConsumeOption

WithConsumeArgs set consume args option.

func WithConsumeConsumer added in v1.5.6

func WithConsumeConsumer(consumer string) ConsumeOption

WithConsumeConsumer set consume consumer option.

func WithConsumeExclusive added in v1.5.6

func WithConsumeExclusive(enable bool) ConsumeOption

WithConsumeExclusive set consume exclusive option.

func WithConsumeNoLocal added in v1.5.6

func WithConsumeNoLocal(enable bool) ConsumeOption

WithConsumeNoLocal set consume noLocal option.

func WithConsumeNoWait added in v1.5.6

func WithConsumeNoWait(enable bool) ConsumeOption

WithConsumeNoWait set consume no wait option.

type Consumer added in v1.5.6

type Consumer struct {
	Exchange  *Exchange
	QueueName string
	// contains filtered or unexported fields
}

Consumer session

func NewConsumer added in v1.5.6

func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error)

NewConsumer create a consumer

func (*Consumer) Close added in v1.5.6

func (c *Consumer) Close()

Close consumer

func (*Consumer) Consume added in v1.5.6

func (c *Consumer) Consume(ctx context.Context, handler Handler)

Consume starts a goroutine that receives messages in a loop and passes them to handler

type ConsumerOption added in v1.5.6

type ConsumerOption func(*consumerOptions)

ConsumerOption consumer option.

func WithConsumerAutoAck added in v1.5.6

func WithConsumerAutoAck(enable bool) ConsumerOption

WithConsumerAutoAck set consumer auto ack option.

func WithConsumerConsumeOptions added in v1.5.6

func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption

WithConsumerConsumeOptions set consumer consume option.

func WithConsumerExchangeDeclareOptions added in v1.5.6

func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption

WithConsumerExchangeDeclareOptions set exchange declare option.

func WithConsumerPersistent added in v1.5.6

func WithConsumerPersistent(enable bool) ConsumerOption

WithConsumerPersistent set consumer persistent option.

func WithConsumerQosOptions added in v1.5.6

func WithConsumerQosOptions(opts ...QosOption) ConsumerOption

WithConsumerQosOptions set consume qos option.

func WithConsumerQueueBindOptions added in v1.5.6

func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption

WithConsumerQueueBindOptions set queue bind option.

func WithConsumerQueueDeclareOptions added in v1.5.6

func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption

WithConsumerQueueDeclareOptions set queue declare option.

type DelayedMessagePublishOption added in v1.5.6

type DelayedMessagePublishOption func(*delayedMessagePublishOptions)

DelayedMessagePublishOption delayed message publish option.

func WithDelayedMessagePublishHeadersKeys added in v1.5.6

func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption

WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.

func WithDelayedMessagePublishTopicKey added in v1.5.6

func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption

WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.

type Exchange added in v1.5.6

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange rabbitmq minimum management unit

func NewDelayedMessageExchange added in v1.5.6

func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange

NewDelayedMessageExchange create a delayed message exchange

func NewDirectExchange added in v1.5.6

func NewDirectExchange(exchangeName string, routingKey string) *Exchange

NewDirectExchange create a direct exchange

func NewFanoutExchange added in v1.5.6

func NewFanoutExchange(exchangeName string) *Exchange

NewFanoutExchange create a fanout exchange

func NewHeadersExchange added in v1.5.6

func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange

NewHeadersExchange create a headers exchange, the headerType supports "all" and "any"

func NewTopicExchange added in v1.5.6

func NewTopicExchange(exchangeName string, routingKey string) *Exchange

NewTopicExchange create a topic exchange

func (*Exchange) DelayedMessageType added in v1.5.6

func (e *Exchange) DelayedMessageType() string

DelayedMessageType exchange delayed message type

func (*Exchange) HeadersKeys added in v1.5.6

func (e *Exchange) HeadersKeys() map[string]interface{}

HeadersKeys exchange headers keys

func (*Exchange) Name added in v1.5.6

func (e *Exchange) Name() string

Name exchange name

func (*Exchange) RoutingKey added in v1.5.6

func (e *Exchange) RoutingKey() string

RoutingKey exchange routing key

func (*Exchange) Type added in v1.5.6

func (e *Exchange) Type() string

Type exchange type

type ExchangeDeclareOption added in v1.5.6

type ExchangeDeclareOption func(*exchangeDeclareOptions)

ExchangeDeclareOption declare exchange option.

func WithExchangeDeclareArgs added in v1.5.6

func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption

WithExchangeDeclareArgs set exchange declare args option.

func WithExchangeDeclareAutoDelete added in v1.5.6

func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption

WithExchangeDeclareAutoDelete set exchange declare auto delete option.

func WithExchangeDeclareInternal added in v1.5.6

func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption

WithExchangeDeclareInternal set exchange declare internal option.

func WithExchangeDeclareNoWait added in v1.5.6

func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption

WithExchangeDeclareNoWait set exchange declare no wait option.

type Handler added in v1.5.6

type Handler func(ctx context.Context, data []byte, tagID string) error

Handler message

type HeadersType added in v1.5.6

type HeadersType = string

HeadersType headers type

const (

	// HeadersTypeAll all
	HeadersTypeAll HeadersType = "all"
	// HeadersTypeAny any
	HeadersTypeAny HeadersType = "any"
)

type Producer added in v1.5.6

type Producer struct {
	Exchange  *Exchange // exchange
	QueueName string    // queue name
	// contains filtered or unexported fields
}

Producer session

func NewProducer added in v1.5.6

func NewProducer(exchange *Exchange, queueName string, connection *Connection, opts ...ProducerOption) (*Producer, error)

NewProducer create a producer

func (*Producer) Close added in v1.5.6

func (p *Producer) Close()

Close the producer

func (*Producer) PublishDelayedMessage added in v1.5.6

func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, opts ...DelayedMessagePublishOption) error

PublishDelayedMessage send delayed type message

func (*Producer) PublishDirect added in v1.5.6

func (p *Producer) PublishDirect(ctx context.Context, body []byte) error

PublishDirect send direct type message

func (*Producer) PublishFanout added in v1.5.6

func (p *Producer) PublishFanout(ctx context.Context, body []byte) error

PublishFanout send fanout type message

func (*Producer) PublishHeaders added in v1.5.6

func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error

PublishHeaders send headers type message

func (*Producer) PublishTopic added in v1.5.6

func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error

PublishTopic send topic type message

type ProducerOption added in v1.5.6

type ProducerOption func(*producerOptions)

ProducerOption producer option.

func WithProducerExchangeDeclareOptions added in v1.5.6

func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption

WithProducerExchangeDeclareOptions set exchange declare option.

func WithProducerMandatory added in v1.5.6

func WithProducerMandatory(enable bool) ProducerOption

WithProducerMandatory set producer mandatory option.

func WithProducerPersistent added in v1.5.6

func WithProducerPersistent(enable bool) ProducerOption

WithProducerPersistent set producer persistent option.

func WithProducerQueueBindOptions added in v1.5.6

func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption

WithProducerQueueBindOptions set queue bind option.

func WithProducerQueueDeclareOptions added in v1.5.6

func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption

WithProducerQueueDeclareOptions set queue declare option.

type Publisher added in v1.5.6

type Publisher struct {
	*Producer
}

Publisher session

func NewPublisher added in v1.5.6

func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error)

NewPublisher create a publisher, channelName is exchange name

func (*Publisher) Close added in v1.5.6

func (p *Publisher) Close()

Close publisher

func (*Publisher) Publish added in v1.5.6

func (p *Publisher) Publish(ctx context.Context, body []byte) error

type QosOption added in v1.5.6

type QosOption func(*qosOptions)

QosOption qos option.

func WithQosEnable added in v1.5.6

func WithQosEnable() QosOption

WithQosEnable set qos enable option.

func WithQosPrefetchCount added in v1.5.6

func WithQosPrefetchCount(count int) QosOption

WithQosPrefetchCount set qos prefetch count option.

func WithQosPrefetchGlobal added in v1.5.6

func WithQosPrefetchGlobal(enable bool) QosOption

WithQosPrefetchGlobal set qos global option.

func WithQosPrefetchSize added in v1.5.6

func WithQosPrefetchSize(size int) QosOption

WithQosPrefetchSize set qos prefetch size option.

type QueueBindOption added in v1.5.6

type QueueBindOption func(*queueBindOptions)

QueueBindOption declare queue bind option.

func WithQueueBindArgs added in v1.5.6

func WithQueueBindArgs(args map[string]interface{}) QueueBindOption

WithQueueBindArgs set queue bind args option.

func WithQueueBindNoWait added in v1.5.6

func WithQueueBindNoWait(enable bool) QueueBindOption

WithQueueBindNoWait set queue bind no wait option.

type QueueDeclareOption added in v1.5.6

type QueueDeclareOption func(*queueDeclareOptions)

QueueDeclareOption declare queue option.

func WithQueueDeclareArgs added in v1.5.6

func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption

WithQueueDeclareArgs set queue declare args option.

func WithQueueDeclareAutoDelete added in v1.5.6

func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption

WithQueueDeclareAutoDelete set queue declare auto delete option.

func WithQueueDeclareExclusive added in v1.5.6

func WithQueueDeclareExclusive(enable bool) QueueDeclareOption

WithQueueDeclareExclusive set queue declare exclusive option.

func WithQueueDeclareNoWait added in v1.5.6

func WithQueueDeclareNoWait(enable bool) QueueDeclareOption

WithQueueDeclareNoWait set queue declare no wait option.

type Subscriber added in v1.5.6

type Subscriber struct {
	*Consumer
}

Subscriber session

func NewSubscriber added in v1.5.6

func NewSubscriber(channelName string, identifier string, connection *Connection, opts ...ConsumerOption) (*Subscriber, error)

NewSubscriber create a subscriber, channelName is exchange name, identifier is queue name

func (*Subscriber) Close added in v1.5.6

func (s *Subscriber) Close()

Close subscriber

func (*Subscriber) Subscribe added in v1.5.6

func (s *Subscriber) Subscribe(ctx context.Context, handler Handler)

Subscribe and handle message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL