rabbitmq

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2024 License: MIT Imports: 10 Imported by: 0

README

rabbitmq

rabbitmq is a library that wraps github.com/rabbitmq/amqp091-go. It supports automatic reconnection and customizable settings, and covers six message types in total: direct, topic, fanout, headers, delayed message, and publisher/subscriber.

Example of use

Code Example

The code example covers all six message types: direct, topic, fanout, headers, delayed message, and publisher/subscriber.

package main

import (
	"context"
	"fmt"
	"time"

	"gitee.com/yzsunjianguo/common_pkg/logger"
	"gitee.com/yzsunjianguo/common_pkg/rabbitmq"
)

// main runs every example in order against a local RabbitMQ broker:
// direct, topic, fanout, headers, delayed message, publisher/subscriber.
func main() {
	const url = "amqp://guest:guest@127.0.0.1:5672/"

	examples := []func(string){
		directExample,
		topicExample,
		fanoutExample,
		headersExample,
		delayedMessageExample,
		publisherSubscriberExample,
	}
	for _, run := range examples {
		run(url)
	}
}

// directExample publishes one message through a direct exchange and then
// starts a consumer on the same queue. The message is published before the
// consumer is started.
func directExample(url string) {
	const (
		exchangeName = "direct-exchange-demo"
		queueName    = "direct-queue-1"
		routeKey     = "direct-key-1"
	)
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
	fmt.Print("\n\n-------------------- direct --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so the
	// producer and its connection are released before the consumer starts.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		checkErr(producer.PublishDirect(context.Background(), []byte("[direct] say hello")))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Wait one second before returning so the consumer has time to run.
	time.Sleep(time.Second)
}

// topicExample publishes one message through a topic exchange bound with the
// pattern "key1.key2.*", using the concrete key "key1.key2.key3", and then
// starts a consumer on the same queue.
func topicExample(url string) {
	const (
		exchangeName = "topic-exchange-demo"
		queueName    = "topic-queue-1"
		routingKey   = "key1.key2.*"
	)
	exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
	fmt.Print("\n\n-------------------- topic --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so the
	// producer and its connection are released before the consumer starts.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		const key = "key1.key2.key3"
		body := []byte("[topic] " + key + " say hello")
		checkErr(producer.PublishTopic(context.Background(), key, body))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Wait one second before returning so the consumer has time to run.
	time.Sleep(time.Second)
}

// fanoutExample publishes one message through a fanout exchange and then
// starts two consumers, one on "fanout-queue-1" and one on "fanout-queue-2".
// The message is published before either consumer is started.
func fanoutExample(url string) {
	const (
		exchangeName = "fanout-exchange-demo"
		queueName    = "fanout-queue-1"
	)
	exchange := rabbitmq.NewFanoutExchange(exchangeName)
	fmt.Print("\n\n-------------------- fanout --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so the
	// producer and its connection are released before the consumers start.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		checkErr(producer.PublishFanout(context.Background(), []byte("[fanout] say hello")))
	}()

	// Consumer side: one consumer per queue, both on the same exchange.
	for _, name := range []string{queueName, "fanout-queue-2"} {
		runConsume(url, exchange, name)
	}

	// Wait one second before returning so the consumers have time to run.
	time.Sleep(time.Second)
}

// headersExample publishes two messages through a headers exchange and then
// starts a consumer on the same queue. The first message carries the same
// headers the exchange was created with; the second carries only "foo".
func headersExample(url string) {
	const (
		exchangeName = "headers-exchange-demo"
		queueName    = "headers-queue-1"
	)
	headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
	// HeadersTypeAll is used here; HeadersTypeAny is the other available type.
	exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys)
	fmt.Print("\n\n-------------------- headers --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so the
	// producer and its connection are released before the consumer starts.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		ctx := context.Background()

		// First message: the full header set used to create the exchange.
		checkErr(producer.PublishHeaders(ctx, headersKeys, []byte("[headers] say hello 1")))

		// Second message: only a subset of those headers.
		subset := map[string]interface{}{"foo": "bar"}
		checkErr(producer.PublishHeaders(ctx, subset, []byte("[headers] say hello 2")))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Wait one second before returning so the consumer has time to run.
	time.Sleep(time.Second)
}

// delayedMessageExample publishes one message with a three-second delay
// through a delayed-message exchange wrapping a direct exchange, then starts
// a consumer on the same queue. The message body includes the publish time.
func delayedMessageExample(url string) {
	const (
		exchangeName = "delayed-message-exchange-demo"
		queueName    = "delayed-message-queue"
		routingKey   = "delayed-key"
	)
	exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
	fmt.Print("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The closure scopes the deferred Close calls so the
	// producer and its connection are released before the consumer starts.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()

		const datetimeLayout = "2006-01-02 15:04:05.000"
		body := []byte("[delayed message] say hello " + time.Now().Format(datetimeLayout))
		checkErr(producer.PublishDelayedMessage(context.Background(), 3*time.Second, body))
	}()

	// Consumer side.
	runConsume(url, exchange, queueName)

	// Wait four seconds, one second longer than the publish delay.
	time.Sleep(4 * time.Second)
}

// publisherSubscriberExample publishes one message on the "pub-sub" channel
// and then starts two subscribers with different identifiers. The message is
// published before either subscriber is started.
func publisherSubscriberExample(url string) {
	const channelName = "pub-sub"
	fmt.Print("\n\n-------------------- publisher subscriber --------------------\n")

	// Publisher side. The closure scopes the deferred Close calls so the
	// publisher and its connection are released before the subscribers start.
	func() {
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		publisher, err := rabbitmq.NewPublisher(channelName, conn)
		checkErr(err)
		defer publisher.Close()

		checkErr(publisher.Publish(context.Background(), []byte("[pub-sub] say hello")))
	}()

	// Subscriber side: one subscriber per identifier.
	for _, identifier := range []string{"pub-sub-queue-1", "pub-sub-queue-2"} {
		runSubscriber(url, channelName, identifier)
	}

	// Wait one second before returning so the subscribers have time to run.
	time.Sleep(time.Second)
}

// runConsume opens a new connection to url, creates a consumer for queueName
// on exchange with auto-ack disabled, and starts consuming with handler.
// It panics (via checkErr) if the connection or the consumer cannot be created.
// Neither the connection nor the consumer is closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string) {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
}

// runSubscriber opens a new connection to url, creates a subscriber for
// channelName under the given identifier with auto-ack disabled, and starts
// handling messages with handler.
// It panics (via checkErr) if the connection or the subscriber cannot be
// created. Neither the connection nor the subscriber is closed here.
func runSubscriber(url string, channelName string, identifier string) {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	subscriber, err := rabbitmq.NewSubscriber(channelName, identifier, conn, manualAck)
	checkErr(err)

	subscriber.Subscribe(context.Background(), handler)
}

// handler is the message callback shared by all examples: it logs the tag ID
// and the payload of each received message and always returns nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	payload := string(data)
	logger.Info(
		"received message",
		logger.String("tagID", tagID),
		logger.String("data", payload),
	)
	return nil
}

// checkErr panics with err when err is non-nil and does nothing otherwise.
// It keeps the examples short; it is not meant for library code.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

Example of Automatic Resumption of Publish

If a publish fails because of a network error, you can wait until the connection has been re-established and then publish the message again.

package main

import (
	"context"
	"errors"
	"strconv"
	"time"

	"gitee.com/yzsunjianguo/common_pkg/logger"
	"gitee.com/yzsunjianguo/common_pkg/rabbitmq"
)

var url = "amqp://guest:guest@127.0.0.1:5672/"

// main starts a consumer on a direct exchange and then runs a producer that
// keeps publishing to the same queue until the one-hour context expires or
// an unrecoverable error occurs.
func main() {
	// Keep the cancel function and call it on exit: discarding it leaks the
	// context's timer until the deadline fires (go vet reports "lostcancel").
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	exchangeName := "direct-exchange-demo"
	queueName := "direct-queue"
	routeKey := "info"
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)

	err := runConsume(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runConsume failed", logger.Err(err))
		return
	}

	// runProduce blocks; it returns only on error or when ctx is done.
	err = runProduce(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runProduce failed", logger.Err(err))
		return
	}
}

// runProduce publishes a numbered direct message every five seconds until ctx
// is done, and returns ctx.Err() at that point. It returns early with the
// underlying error if the connection or the first producer cannot be created.
//
// When a publish fails with rabbitmq.ErrClosed, it waits for the connection
// to come back, creates a new producer, and publishes the same message again.
// Any other publish error is logged and the message is skipped.
func runProduce(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}
	defer connection.Close()

	p, err := rabbitmq.NewProducer(exchange, queueName, connection)
	if err != nil {
		return err
	}
	// Close through a closure so the producer that is current at return time
	// is closed; p is replaced after a reconnect.
	// NOTE(review): the replaced producer is not closed explicitly; presumably
	// its channel is already gone with the old connection — confirm.
	defer func() { p.Close() }()

	for count := 1; ; count++ {
		if err := ctx.Err(); err != nil {
			return err
		}

		data := []byte("direct say hello" + strconv.Itoa(count))
		err := p.PublishDirect(ctx, data)
		if errors.Is(err, rabbitmq.ErrClosed) {
			// Rebuild the producer, then re-send the message that failed.
			np, rerr := waitForProducer(ctx, exchange, queueName, connection)
			if rerr != nil {
				return rerr
			}
			p = np
			err = p.PublishDirect(ctx, data)
		}

		// Log success only when the message was actually published.
		if err != nil {
			logger.Warn("publish failed", logger.Err(err))
		} else {
			logger.Info("publish message", logger.String("data", string(data)))
		}

		if err := sleepContext(ctx, 5*time.Second); err != nil {
			return err
		}
	}
}

// waitForProducer polls every two seconds until connection reports connected
// and a new producer can be created, or until ctx is done.
func waitForProducer(ctx context.Context, exchange *rabbitmq.Exchange, queueName string, connection *rabbitmq.Connection) (*rabbitmq.Producer, error) {
	for {
		if connection.CheckConnected() {
			p, err := rabbitmq.NewProducer(exchange, queueName, connection)
			if err == nil {
				return p, nil
			}
			logger.Warn("reconnect failed", logger.Err(err))
		}
		if err := sleepContext(ctx, 2*time.Second); err != nil {
			return nil, err
		}
	}
}

// sleepContext waits for d, returning ctx.Err() early if ctx is done first.
func sleepContext(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

// runConsume opens a new connection, creates a consumer for queueName on
// exchange with auto-ack disabled, and starts consuming with handler.
// It returns the error from creating the connection or the consumer, and nil
// once consuming has been started. On success the connection is left open.
func runConsume(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}

	c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
	if err != nil {
		// Nothing will use this connection; close it instead of leaking it.
		connection.Close()
		return err
	}

	c.Consume(ctx, handler)

	return nil
}

// handler is the message callback used by the consumer: it logs the tag ID
// and the payload of each received message and always returns nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	payload := string(data)
	logger.Info(
		"received message",
		logger.String("tagID", tagID),
		logger.String("data", payload),
	)
	return nil
}

Documentation

Overview

Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go

Producers and consumers support five exchange types: direct, topic, fanout, headers, and x-delayed-message. Publishers and subscribers use the fanout type.

Index

Constants

View Source
const DefaultURL = "amqp://guest:guest@localhost:5672/"

DefaultURL default rabbitmq url

Variables

View Source
var ErrClosed = amqp.ErrClosed

ErrClosed closed

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection rabbitmq connection

func NewConnection

func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)

NewConnection rabbitmq connection

func (*Connection) CheckConnected

func (c *Connection) CheckConnected() bool

CheckConnected reports whether the rabbitmq connection is currently connected.

func (*Connection) Close

func (c *Connection) Close()

Close rabbitmq connection

type ConnectionOption

type ConnectionOption func(*connectionOptions)

ConnectionOption connection option.

func WithLogger

func WithLogger(zapLog *zap.Logger) ConnectionOption

WithLogger set logger option.

func WithReconnectTime

func WithReconnectTime(d time.Duration) ConnectionOption

WithReconnectTime set reconnect time interval option.

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption

WithTLSConfig set tls config option.

type ConsumeOption

type ConsumeOption func(*consumeOptions)

ConsumeOption consume option.

func WithConsumeArgs

func WithConsumeArgs(args map[string]interface{}) ConsumeOption

WithConsumeArgs set consume args option.

func WithConsumeConsumer

func WithConsumeConsumer(consumer string) ConsumeOption

WithConsumeConsumer set consume consumer option.

func WithConsumeExclusive

func WithConsumeExclusive(enable bool) ConsumeOption

WithConsumeExclusive set consume exclusive option.

func WithConsumeNoLocal

func WithConsumeNoLocal(enable bool) ConsumeOption

WithConsumeNoLocal set consume noLocal option.

func WithConsumeNoWait

func WithConsumeNoWait(enable bool) ConsumeOption

WithConsumeNoWait set consume no wait option.

type Consumer

type Consumer struct {
	Exchange  *Exchange
	QueueName string
	// contains filtered or unexported fields
}

Consumer session

func NewConsumer

func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error)

NewConsumer create a consumer

func (*Consumer) Close

func (c *Consumer) Close()

Close consumer

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, handler Handler)

Consume consumes messages in a loop running in a goroutine.

type ConsumerOption

type ConsumerOption func(*consumerOptions)

ConsumerOption consumer option.

func WithConsumerAutoAck

func WithConsumerAutoAck(enable bool) ConsumerOption

WithConsumerAutoAck set consumer auto ack option.

func WithConsumerConsumeOptions

func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption

WithConsumerConsumeOptions set consumer consume option.

func WithConsumerExchangeDeclareOptions

func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption

WithConsumerExchangeDeclareOptions set exchange declare option.

func WithConsumerPersistent

func WithConsumerPersistent(enable bool) ConsumerOption

WithConsumerPersistent set consumer persistent option.

func WithConsumerQosOptions

func WithConsumerQosOptions(opts ...QosOption) ConsumerOption

WithConsumerQosOptions set consume qos option.

func WithConsumerQueueBindOptions

func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption

WithConsumerQueueBindOptions set queue bind option.

func WithConsumerQueueDeclareOptions

func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption

WithConsumerQueueDeclareOptions set queue declare option.

type DelayedMessagePublishOption

type DelayedMessagePublishOption func(*delayedMessagePublishOptions)

DelayedMessagePublishOption delayed message publish option.

func WithDelayedMessagePublishHeadersKeys

func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption

WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.

func WithDelayedMessagePublishTopicKey

func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption

WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange rabbitmq minimum management unit

func NewDelayedMessageExchange

func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange

NewDelayedMessageExchange create a delayed message exchange

func NewDirectExchange

func NewDirectExchange(exchangeName string, routingKey string) *Exchange

NewDirectExchange create a direct exchange

func NewFanoutExchange

func NewFanoutExchange(exchangeName string) *Exchange

NewFanoutExchange create a fanout exchange

func NewHeadersExchange

func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange

NewHeadersExchange create a headers exchange, the headerType supports "all" and "any"

func NewTopicExchange

func NewTopicExchange(exchangeName string, routingKey string) *Exchange

NewTopicExchange create a topic exchange

func (*Exchange) DelayedMessageType

func (e *Exchange) DelayedMessageType() string

DelayedMessageType exchange delayed message type

func (*Exchange) HeadersKeys

func (e *Exchange) HeadersKeys() map[string]interface{}

HeadersKeys exchange headers keys

func (*Exchange) Name

func (e *Exchange) Name() string

Name exchange name

func (*Exchange) RoutingKey

func (e *Exchange) RoutingKey() string

RoutingKey exchange routing key

func (*Exchange) Type

func (e *Exchange) Type() string

Type exchange type

type ExchangeDeclareOption

type ExchangeDeclareOption func(*exchangeDeclareOptions)

ExchangeDeclareOption declare exchange option.

func WithExchangeDeclareArgs

func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption

WithExchangeDeclareArgs set exchange declare args option.

func WithExchangeDeclareAutoDelete

func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption

WithExchangeDeclareAutoDelete set exchange declare auto delete option.

func WithExchangeDeclareInternal

func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption

WithExchangeDeclareInternal set exchange declare internal option.

func WithExchangeDeclareNoWait

func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption

WithExchangeDeclareNoWait set exchange declare no wait option.

type Handler

type Handler func(ctx context.Context, data []byte, tagID string) error

Handler message

type HeadersType

type HeadersType = string

HeadersType headers type

const (

	// HeadersTypeAll all
	HeadersTypeAll HeadersType = "all"
	// HeadersTypeAny any
	HeadersTypeAny HeadersType = "any"
)

type Producer

type Producer struct {
	Exchange  *Exchange // exchange
	QueueName string    // queue name
	// contains filtered or unexported fields
}

Producer session

func NewProducer

func NewProducer(exchange *Exchange, queueName string, connection *Connection, opts ...ProducerOption) (*Producer, error)

NewProducer create a producer

func (*Producer) Close

func (p *Producer) Close()

Close the producer

func (*Producer) PublishDelayedMessage

func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, opts ...DelayedMessagePublishOption) error

PublishDelayedMessage send delayed type message

func (*Producer) PublishDirect

func (p *Producer) PublishDirect(ctx context.Context, body []byte) error

PublishDirect send direct type message

func (*Producer) PublishFanout

func (p *Producer) PublishFanout(ctx context.Context, body []byte) error

PublishFanout send fanout type message

func (*Producer) PublishHeaders

func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error

PublishHeaders send headers type message

func (*Producer) PublishTopic

func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error

PublishTopic send topic type message

type ProducerOption

type ProducerOption func(*producerOptions)

ProducerOption producer option.

func WithProducerExchangeDeclareOptions

func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption

WithProducerExchangeDeclareOptions set exchange declare option.

func WithProducerMandatory

func WithProducerMandatory(enable bool) ProducerOption

WithProducerMandatory set producer mandatory option.

func WithProducerPersistent

func WithProducerPersistent(enable bool) ProducerOption

WithProducerPersistent set producer persistent option.

func WithProducerQueueBindOptions

func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption

WithProducerQueueBindOptions set queue bind option.

func WithProducerQueueDeclareOptions

func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption

WithProducerQueueDeclareOptions set queue declare option.

type Publisher

type Publisher struct {
	*Producer
}

Publisher session

func NewPublisher

func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error)

NewPublisher create a publisher, channelName is exchange name

func (*Publisher) Close

func (p *Publisher) Close()

Close publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, body []byte) error

type QosOption

type QosOption func(*qosOptions)

QosOption qos option.

func WithQosEnable

func WithQosEnable() QosOption

WithQosEnable set qos enable option.

func WithQosPrefetchCount

func WithQosPrefetchCount(count int) QosOption

WithQosPrefetchCount set qos prefetch count option.

func WithQosPrefetchGlobal

func WithQosPrefetchGlobal(enable bool) QosOption

WithQosPrefetchGlobal set qos global option.

func WithQosPrefetchSize

func WithQosPrefetchSize(size int) QosOption

WithQosPrefetchSize set qos prefetch size option.

type QueueBindOption

type QueueBindOption func(*queueBindOptions)

QueueBindOption declare queue bind option.

func WithQueueBindArgs

func WithQueueBindArgs(args map[string]interface{}) QueueBindOption

WithQueueBindArgs set queue bind args option.

func WithQueueBindNoWait

func WithQueueBindNoWait(enable bool) QueueBindOption

WithQueueBindNoWait set queue bind no wait option.

type QueueDeclareOption

type QueueDeclareOption func(*queueDeclareOptions)

QueueDeclareOption declare queue option.

func WithQueueDeclareArgs

func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption

WithQueueDeclareArgs set queue declare args option.

func WithQueueDeclareAutoDelete

func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption

WithQueueDeclareAutoDelete set queue declare auto delete option.

func WithQueueDeclareExclusive

func WithQueueDeclareExclusive(enable bool) QueueDeclareOption

WithQueueDeclareExclusive set queue declare exclusive option.

func WithQueueDeclareNoWait

func WithQueueDeclareNoWait(enable bool) QueueDeclareOption

WithQueueDeclareNoWait set queue declare no wait option.

type Subscriber

type Subscriber struct {
	*Consumer
}

Subscriber session

func NewSubscriber

func NewSubscriber(channelName string, identifier string, connection *Connection, opts ...ConsumerOption) (*Subscriber, error)

NewSubscriber create a subscriber, channelName is exchange name, identifier is queue name

func (*Subscriber) Close

func (s *Subscriber) Close()

Close subscriber

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, handler Handler)

Subscribe and handle message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL