rabbitmq

package
v1.8.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2024 License: MIT Imports: 11 Imported by: 0

README

rabbitmq

rabbitmq is a library that wraps github.com/rabbitmq/amqp091-go. It supports automatic reconnection and customizable settings, covers six message types in total (direct, topic, fanout, headers, delayed message, and publisher/subscriber), and also supports dead letters.

Example of use

Code Example

The following code example covers all six message types: direct, topic, fanout, headers, delayed message, and publisher/subscriber.

Tip: the wrapped Consume function uses manual acknowledgement mode by default, so your handler does not need to call the ack function itself.

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/zhufuyi/sponge/pkg/logger"
	"github.com/zhufuyi/sponge/pkg/rabbitmq"
)

var (
	producerCount int32
	consumerCount int32
)

// main runs the direct example against a local broker. Uncomment one of the
// other calls to try a different message type.
func main() {
	const url = "amqp://guest:guest@127.0.0.1:5672/"

	directExample(url)

	//topicExample(url)

	//fanoutExample(url)

	//headersExample(url)

	//delayedMessageExample(url)

	//publisherSubscriberExample(url)
}

// directExample publishes 100 messages through a direct exchange, then starts
// a consumer on the same queue, waits five seconds and prints the counters.
func directExample(url string) {
	const (
		exName = "direct-exchange-demo"
		queue  = "direct-queue-1"
		key    = "direct-key-1"
	)
	ex := rabbitmq.NewDirectExchange(exName, key)
	fmt.Print("\n\n-------------------- direct --------------------\n")

	// Producer side. The deferred Close calls fire when directExample returns,
	// so the producer connection stays open while the consumer below runs.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	producer, err := rabbitmq.NewProducer(ex, queue, conn)
	checkErr(err)
	defer producer.Close()

	// The consumer declares the queue with the same args the producer used.
	var args map[string]interface{} = producer.QueueArgs()

	for n := 1; n <= 100; n++ {
		body := []byte("[direct] message " + strconv.Itoa(n))
		checkErr(producer.PublishDirect(context.Background(), body))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side.
	consumer := runConsume(url, ex, queue, args)
	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(consumer.Count()))

	printStat()
}

// topicExample publishes 100 messages through a topic exchange, each with its
// own "key1.key2.keyN" topic key, then consumes them and prints the counters.
func topicExample(url string) {
	const (
		exName  = "topic-exchange-demo"
		queue   = "topic-queue-1"
		pattern = "key1.key2.*"
	)
	ex := rabbitmq.NewTopicExchange(exName, pattern)
	fmt.Print("\n\n-------------------- topic --------------------\n")

	// Producer side. The deferred Close calls fire when topicExample returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	producer, err := rabbitmq.NewProducer(ex, queue, conn)
	checkErr(err)
	defer producer.Close()

	// The consumer declares the queue with the same args the producer used.
	var args map[string]interface{} = producer.QueueArgs()

	for n := 1; n <= 100; n++ {
		seq := strconv.Itoa(n)
		topicKey := "key1.key2.key" + seq
		body := []byte("[topic] " + topicKey + " message " + seq)
		checkErr(producer.PublishTopic(context.Background(), topicKey, body))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side.
	consumer := runConsume(url, ex, queue, args)
	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(consumer.Count()))

	printStat()
}

// fanoutExample publishes 100 messages through a fanout exchange, then starts
// one consumer on "fanout-queue-1" and one on "fanout-queue-2". Only the first
// consumer's count goes into the shared counter; the second is printed on its
// own line.
func fanoutExample(url string) {
	const (
		exName      = "fanout-exchange-demo"
		firstQueue  = "fanout-queue-1"
		secondQueue = "fanout-queue-2"
	)
	ex := rabbitmq.NewFanoutExchange(exName)
	fmt.Print("\n\n-------------------- fanout --------------------\n")

	// Producer side. The deferred Close calls fire when fanoutExample returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	producer, err := rabbitmq.NewProducer(ex, firstQueue, conn)
	checkErr(err)
	defer producer.Close()

	// Both consumers declare their queue with the args the producer used.
	var args map[string]interface{} = producer.QueueArgs()

	for n := 1; n <= 100; n++ {
		body := []byte("[fanout] message " + strconv.Itoa(n))
		checkErr(producer.PublishFanout(context.Background(), body))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side.
	first := runConsume(url, ex, firstQueue, args)
	second := runConsume(url, ex, secondQueue, args)

	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(first.Count()))
	fmt.Println("\n\nconsumer 2 count:", second.Count())

	printStat()
}

// headersExample publishes through a headers exchange declared with
// HeadersTypeAll (HeadersTypeAny is the alternative). Each round sends one
// message carrying the full header set and one carrying only part of it; only
// the first kind is counted as produced.
func headersExample(url string) {
	const (
		exName = "headers-exchange-demo"
		queue  = "headers-queue-1"
	)
	matchHeaders := map[string]interface{}{"hello": "world", "foo": "bar"}
	ex := rabbitmq.NewHeadersExchange(exName, rabbitmq.HeadersTypeAll, matchHeaders)
	fmt.Print("\n\n-------------------- headers --------------------\n")

	// Producer side. The deferred Close calls fire when headersExample returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	producer, err := rabbitmq.NewProducer(ex, queue, conn)
	checkErr(err)
	defer producer.Close()

	// The consumer declares the queue with the same args the producer used.
	var args map[string]interface{} = producer.QueueArgs()

	background := context.Background()
	for n := 1; n <= 100; n++ {
		seq := strconv.Itoa(n)

		checkErr(producer.PublishHeaders(background, matchHeaders, []byte("[headers] key1 message "+seq)))
		atomic.AddInt32(&producerCount, 1)

		// because of x-match: all, a message with only part of the headers
		// will not match the same queue, so it is dropped
		partialHeaders := map[string]interface{}{"foo": "bar"}
		checkErr(producer.PublishHeaders(background, partialHeaders, []byte("[headers] key2 message "+seq)))
	}

	// Consumer side.
	consumer := runConsume(url, ex, queue, args)
	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(consumer.Count()))

	printStat()
}

// delayedMessageExample publishes 100 messages with a three-second delay
// through a delayed-message exchange wrapping a direct exchange, then consumes
// for five seconds and prints the counters.
func delayedMessageExample(url string) {
	const (
		exName     = "delayed-message-exchange-demo"
		queue      = "delayed-message-queue"
		key        = "delayed-key"
		timeLayout = "2006-01-02 15:04:05.000"
	)
	inner := rabbitmq.NewDirectExchange("", key)
	ex := rabbitmq.NewDelayedMessageExchange(exName, inner)
	fmt.Print("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The deferred Close calls fire when the function returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	producer, err := rabbitmq.NewProducer(ex, queue, conn)
	checkErr(err)
	defer producer.Close()

	// The consumer declares the queue with the same args the producer used.
	var args map[string]interface{} = producer.QueueArgs()

	background := context.Background()
	for n := 1; n <= 100; n++ {
		// The body records the send time.
		text := "[delayed] message " + strconv.Itoa(n) + " at " + time.Now().Format(timeLayout)
		checkErr(producer.PublishDelayedMessage(background, 3*time.Second, []byte(text)))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side.
	consumer := runConsume(url, ex, queue, args)
	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(consumer.Count()))

	printStat()
}

// publisherSubscriberExample publishes 100 messages on the "pub-sub" channel,
// then starts two subscribers with different identifiers. Only the first
// subscriber's count goes into the shared counter; the second is printed on
// its own line.
func publisherSubscriberExample(url string) {
	const channel = "pub-sub"
	fmt.Print("\n\n-------------------- publisher subscriber --------------------\n")

	// Publisher side. The deferred Close calls fire when the function returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	publisher, err := rabbitmq.NewPublisher(channel, conn)
	checkErr(err)
	defer publisher.Close()

	for n := 1; n <= 100; n++ {
		body := []byte("[pub-sub] message " + strconv.Itoa(n))
		checkErr(publisher.Publish(context.Background(), body))
		atomic.AddInt32(&producerCount, 1)
	}

	// Subscriber side.
	first := runSubscriber(url, channel, "pub-sub-queue-1")
	second := runSubscriber(url, channel, "pub-sub-queue-2")

	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(first.Count()))
	fmt.Println("\n\nsubscriber 2 count:", second.Count())

	printStat()
}

// runConsume opens its own connection, creates a consumer for queueName on
// exchange (auto-ack off, queue declared with queueArgs), starts consuming
// with the shared handler and returns the consumer. It panics via checkErr on
// any setup error. Neither the connection nor the consumer is closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	declareWithArgs := rabbitmq.WithConsumerQueueDeclareOptions(
		rabbitmq.WithQueueDeclareArgs(queueArgs),
	)

	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck, declareWithArgs)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// runSubscriber opens its own connection, creates a subscriber on channelName
// with the given identifier (auto-ack off), starts it with the shared handler
// and returns it. It panics via checkErr on any setup error.
func runSubscriber(url string, channelName string, identifier string) *rabbitmq.Subscriber {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	subscriber, err := rabbitmq.NewSubscriber(channelName, identifier, conn, manualAck)
	checkErr(err)

	subscriber.Subscribe(context.Background(), handler)
	return subscriber
}

// handler logs every received message with its tag ID and always reports
// success.
var handler = func(_ context.Context, payload []byte, tag string) error {
	tagField := logger.String("tagID", tag)
	dataField := logger.String("data", string(payload))
	logger.Info("received message", tagField, dataField)
	return nil
}

// checkErr panics with err when it is non-nil and does nothing otherwise.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// printStat prints the producer and consumer counters followed by a blank
// line, then resets both counters to zero for the next example.
func printStat() {
	fmt.Println("\n\n-------------------- stat --------------------")
	fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
	fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
	// Println already appends a newline; the trailing blank line is printed
	// separately because go vet rejects a Println argument ending in "\n".
	fmt.Println("----------------------------------------------")
	fmt.Println()
	atomic.StoreInt32(&producerCount, 0)
	atomic.StoreInt32(&consumerCount, 0)
}

Example of Dead Letter

The following example code adds a dead letter queue to each of the five message types (direct, topic, fanout, headers, delayed message). The dead letter queue always uses a direct exchange.

Tip: the wrapped Consume function uses manual acknowledgement mode by default.

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/zhufuyi/sponge/pkg/logger"
	"github.com/zhufuyi/sponge/pkg/rabbitmq"
)

var (
	producerCount int32
	consumerCount int32

	deadLetterConsumerCount int32
)

// main runs the direct dead-letter example against a local broker. Uncomment
// one of the other calls to try a different message type.
func main() {
	const url = "amqp://guest:guest@127.0.0.1:5672/"

	directExample(url)

	//topicExample(url)

	//fanoutExample(url)

	//headersExample(url)

	//delayedMessageExample(url)
}

// directExample publishes 100 messages to a direct exchange whose queue is
// declared with a max length and a message TTL and has a dead letter exchange
// attached. It then consumes from both the normal queue and the dead letter
// queue for five seconds and prints the counters.
func directExample(url string) {
	const (
		exName = "direct-exchange-demo-2"
		queue  = "direct-queue-2"
		key    = "direct-key-2"
	)
	ex := rabbitmq.NewDirectExchange(exName, key)
	args := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	dlQueue := "dl-" + queue
	dlEx := rabbitmq.NewDirectExchange("dl-"+exName, "dl-"+key)

	fmt.Print("\n\n-------------------- direct --------------------\n")

	// Producer side. The deferred Close calls fire when directExample returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	// set queue args
	declareOpt := rabbitmq.WithProducerQueueDeclareOptions(
		rabbitmq.WithQueueDeclareArgs(args),
	)
	// add dead letter
	deadLetterOpt := rabbitmq.WithDeadLetterOptions(
		rabbitmq.WithDeadLetter(dlEx.Name(), dlQueue, dlEx.RoutingKey()),
	)
	producer, err := rabbitmq.NewProducer(ex, queue, conn, declareOpt, deadLetterOpt)
	checkErr(err)
	defer producer.Close()
	args = producer.QueueArgs() // get producer queue args

	for n := 1; n <= 100; n++ {
		body := []byte("[direct] say hello" + strconv.Itoa(n))
		checkErr(producer.PublishDirect(context.Background(), body))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side: one consumer per queue.
	normal := runConsume(url, ex, queue, args)
	deadLetter := runConsumeForDeadLetter(url, dlEx, dlQueue)

	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(normal.Count()))
	atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetter.Count()))

	printStat()
}

// topicExample is the topic-exchange variant of the dead letter demo: 100
// messages with "dl-key1.key2.keyN" topic keys are published to a queue with a
// max length and a TTL, then both the normal queue and the dead letter queue
// are consumed for five seconds.
func topicExample(url string) {
	const (
		exName  = "topic-exchange-demo-2"
		queue   = "topic-queue-2"
		pattern = "dl-key1.key2.*"
	)
	ex := rabbitmq.NewTopicExchange(exName, pattern)
	args := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	dlQueue := "dl-" + queue
	dlEx := rabbitmq.NewDirectExchange("dl-"+exName, "dl-"+pattern)

	fmt.Print("\n\n-------------------- topic --------------------\n")

	// Producer side. The deferred Close calls fire when topicExample returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	// set queue args
	declareOpt := rabbitmq.WithProducerQueueDeclareOptions(
		rabbitmq.WithQueueDeclareArgs(args),
	)
	// add dead letter
	deadLetterOpt := rabbitmq.WithDeadLetterOptions(
		rabbitmq.WithDeadLetter(dlEx.Name(), dlQueue, dlEx.RoutingKey()),
	)
	producer, err := rabbitmq.NewProducer(ex, queue, conn, declareOpt, deadLetterOpt)
	checkErr(err)
	defer producer.Close()
	args = producer.QueueArgs()

	for n := 1; n <= 100; n++ {
		seq := strconv.Itoa(n)
		topicKey := "dl-key1.key2.key" + seq
		body := []byte("[topic] " + topicKey + " message " + seq)
		checkErr(producer.PublishTopic(context.Background(), topicKey, body))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side: one consumer per queue.
	normal := runConsume(url, ex, queue, args)
	deadLetter := runConsumeForDeadLetter(url, dlEx, dlQueue)

	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(normal.Count()))
	atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetter.Count()))

	printStat()
}

// fanoutExample is the fanout variant of the dead letter demo. The producer
// declares "fanout-queue-3" with a dead letter exchange. Consumers then run on
// "fanout-queue-3", "fanout-queue-4" and the dead letter queue; the two normal
// consumers' counts are summed into the consumer counter.
func fanoutExample(url string) {
	const (
		exName      = "fanout-exchange-demo-2"
		firstQueue  = "fanout-queue-3"
		secondQueue = "fanout-queue-4"
	)
	ex := rabbitmq.NewFanoutExchange(exName)
	args := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	dlQueue := "dl-" + firstQueue
	dlEx := rabbitmq.NewDirectExchange("dl-"+exName, "dl-direct-key")
	fmt.Print("\n\n-------------------- fanout --------------------\n")

	// Producer side. The deferred Close calls fire when fanoutExample returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	// set queue args
	declareOpt := rabbitmq.WithProducerQueueDeclareOptions(
		rabbitmq.WithQueueDeclareArgs(args),
	)
	// add dead letter
	deadLetterOpt := rabbitmq.WithDeadLetterOptions(
		rabbitmq.WithDeadLetter(dlEx.Name(), dlQueue, dlEx.RoutingKey()),
	)
	producer, err := rabbitmq.NewProducer(ex, firstQueue, conn, declareOpt, deadLetterOpt)
	checkErr(err)
	defer producer.Close()
	args = producer.QueueArgs()

	for n := 1; n <= 100; n++ {
		body := []byte("[fanout] message " + strconv.Itoa(n))
		checkErr(producer.PublishFanout(context.Background(), body))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side: two normal queues plus the dead letter queue.
	first := runConsume(url, ex, firstQueue, args)
	second := runConsume(url, ex, secondQueue, args)
	deadLetter := runConsumeForDeadLetter(url, dlEx, dlQueue)

	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(first.Count()))
	atomic.AddInt32(&consumerCount, int32(second.Count()))
	atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetter.Count()))

	printStat()
}

// headersExample is the headers variant of the dead letter demo. The exchange
// is declared with HeadersTypeAll (HeadersTypeAny is the alternative). Each
// round sends one message with the full header set, which is counted as
// produced, and one with only part of it.
func headersExample(url string) {
	const (
		exName = "headers-exchange-demo-2"
		queue  = "headers-queue-2"
	)
	matchHeaders := map[string]interface{}{"hello": "world", "foo": "bar"}
	ex := rabbitmq.NewHeadersExchange(exName, rabbitmq.HeadersTypeAll, matchHeaders)
	args := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	dlQueue := "dl-" + queue
	dlEx := rabbitmq.NewDirectExchange("dl-"+exName, "dl-headers-key")
	fmt.Print("\n\n-------------------- headers --------------------\n")

	// Producer side. The deferred Close calls fire when headersExample returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	// set queue args
	declareOpt := rabbitmq.WithProducerQueueDeclareOptions(
		rabbitmq.WithQueueDeclareArgs(args),
	)
	// add dead letter
	deadLetterOpt := rabbitmq.WithDeadLetterOptions(
		rabbitmq.WithDeadLetter(dlEx.Name(), dlQueue, dlEx.RoutingKey()),
	)
	producer, err := rabbitmq.NewProducer(ex, queue, conn, declareOpt, deadLetterOpt)
	checkErr(err)
	defer producer.Close()
	args = producer.QueueArgs()

	background := context.Background()
	for n := 1; n <= 100; n++ {
		checkErr(producer.PublishHeaders(background, matchHeaders, []byte("[headers] message "+strconv.Itoa(n))))
		atomic.AddInt32(&producerCount, 1)

		// because of x-match: all, a message with only part of the headers
		// will not match the same queue, so it is dropped
		partialHeaders := map[string]interface{}{"foo": "bar"}
		checkErr(producer.PublishHeaders(background, partialHeaders, []byte("[headers] key2 message")))
	}

	// Consumer side: one consumer per queue.
	normal := runConsume(url, ex, queue, args)
	deadLetter := runConsumeForDeadLetter(url, dlEx, dlQueue)

	time.Sleep(5 * time.Second)
	atomic.AddInt32(&consumerCount, int32(normal.Count()))
	atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetter.Count()))

	printStat()
}

// delayedMessageExample is the delayed-message variant of the dead letter
// demo. It publishes 200 messages, the first 100 delayed by one second and the
// rest by two. It then waits three seconds, starts consumers on the normal and
// dead letter queues, and prints the counters ten seconds later.
func delayedMessageExample(url string) {
	const (
		exName     = "delayed-message-exchange-demo-2"
		queue      = "delayed-message-queue-2"
		key        = "delayed-key-2"
		timeLayout = "2006-01-02 15:04:05.000"
	)
	inner := rabbitmq.NewDirectExchange("", key)
	ex := rabbitmq.NewDelayedMessageExchange(exName, inner)
	args := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	dlQueue := "dl-" + queue
	dlEx := rabbitmq.NewDirectExchange("dl-"+exName, "dl-"+key)

	fmt.Print("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The deferred Close calls fire when the function returns.
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)
	defer conn.Close()

	// set queue args
	declareOpt := rabbitmq.WithProducerQueueDeclareOptions(
		rabbitmq.WithQueueDeclareArgs(args),
	)
	// add dead letter
	deadLetterOpt := rabbitmq.WithDeadLetterOptions(
		rabbitmq.WithDeadLetter(dlEx.Name(), dlQueue, dlEx.RoutingKey()),
	)
	producer, err := rabbitmq.NewProducer(ex, queue, conn, declareOpt, deadLetterOpt)
	checkErr(err)
	defer producer.Close()
	args = producer.QueueArgs()

	background := context.Background()
	for n := 1; n <= 200; n++ {
		// First hundred messages wait one second, the second hundred two.
		delay := 2 * time.Second
		if n <= 100 {
			delay = time.Second
		}

		text := "[delayed] message " + strconv.Itoa(n) + " at " + time.Now().Format(timeLayout)
		checkErr(producer.PublishDelayedMessage(background, delay, []byte(text)))
		atomic.AddInt32(&producerCount, 1)
	}

	// Consumer side.
	time.Sleep(3 * time.Second) // wait for all messages to be sent
	normal := runConsume(url, ex, queue, args)
	deadLetter := runConsumeForDeadLetter(url, dlEx, dlQueue)

	time.Sleep(10 * time.Second)
	atomic.AddInt32(&consumerCount, int32(normal.Count()))
	atomic.AddInt32(&deadLetterConsumerCount, int32(deadLetter.Count()))

	printStat()
}

// runConsume opens its own connection, creates a consumer for queueName on
// exchange (auto-ack off, queue declared with queueArgs), starts consuming
// with the shared handler and returns the consumer. It panics via checkErr on
// any setup error. Neither the connection nor the consumer is closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	declareWithArgs := rabbitmq.WithConsumerQueueDeclareOptions(
		rabbitmq.WithQueueDeclareArgs(queueArgs),
	)

	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck, declareWithArgs)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// runConsumeForDeadLetter opens its own connection, creates a consumer for the
// dead letter queue with auto-ack off and no extra queue args, starts
// consuming with the shared handler and returns the consumer. It panics via
// checkErr on any setup error.
func runConsumeForDeadLetter(url string, exchange *rabbitmq.Exchange, queueName string) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// handler logs every received message with its tag ID and always reports
// success.
var handler = func(_ context.Context, payload []byte, tag string) error {
	tagField := logger.String("tagID", tag)
	dataField := logger.String("data", string(payload))
	logger.Info("received message", tagField, dataField)
	return nil
}

// checkErr panics with err when it is non-nil and does nothing otherwise.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// printStat prints the producer, consumer and dead letter consumer counters
// followed by a blank line, then resets all three to zero for the next
// example.
func printStat() {
	fmt.Println("\n\n-------------------- stat --------------------")
	// Read with atomic loads, matching the atomic adds and stores used on
	// these counters everywhere else.
	fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
	fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
	fmt.Println("dead letter consumer count:", atomic.LoadInt32(&deadLetterConsumerCount))
	// Println already appends a newline; the trailing blank line is printed
	// separately because go vet rejects a Println argument ending in "\n".
	fmt.Println("----------------------------------------------")
	fmt.Println()
	atomic.StoreInt32(&producerCount, 0)
	atomic.StoreInt32(&consumerCount, 0)
	atomic.StoreInt32(&deadLetterConsumerCount, 0)
}

Example of Automatic Resumption of Publish

If a publish error is caused by the network, you can check whether the reconnection has succeeded and then publish the message again.

package main

import (
	"context"
	"errors"
	"strconv"
	"time"

	"github.com/zhufuyi/sponge/pkg/logger"
	"github.com/zhufuyi/sponge/pkg/rabbitmq"
)

var url = "amqp://guest:guest@127.0.0.1:5672/"

// main starts a consumer and then a producer on the same direct exchange and
// queue, both bound to a context that expires after one hour.
func main() {
	// Keep the cancel function and call it on exit so the resources tied to
	// the timeout context are released; discarding it is what go vet's
	// lostcancel check reports.
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	exchangeName := "direct-exchange-demo"
	queueName := "direct-queue"
	routeKey := "info"
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)

	err := runConsume(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runConsume failed", logger.Err(err))
		return
	}

	// runProduce blocks until ctx is done or setup fails.
	err = runProduce(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runProduce failed", logger.Err(err))
		return
	}
}

// runProduce publishes a numbered message to exchange every five seconds until
// ctx is done, then returns ctx.Err(). It returns early with the setup error
// if the connection or the first producer cannot be created.
//
// When a publish fails with rabbitmq.ErrClosed, it waits for the connection to
// report connected again, creates a new producer and publishes the same
// message again. Any other publish error is logged and the message is skipped.
func runProduce(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}
	defer connection.Close()

	p, err := rabbitmq.NewProducer(exchange, queueName, connection)
	if err != nil {
		return err
	}
	// Close through a closure: p is replaced after a reconnect, and a plain
	// `defer p.Close()` would only ever close the first producer.
	// NOTE(review): the replaced producer is not closed explicitly — confirm
	// whether Producer.Close is safe to call on one whose connection dropped.
	defer func() { p.Close() }()

	for count := 1; ; count++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		data := []byte("direct say hello" + strconv.Itoa(count))
		err = p.PublishDirect(ctx, data)

		// Retry the same message on a fresh producer for as long as the
		// failure is ErrClosed, so it is not silently lost.
		for errors.Is(err, rabbitmq.ErrClosed) {
			fresh, waitErr := newProducerAfterReconnect(ctx, connection, exchange, queueName)
			if waitErr != nil {
				return waitErr
			}
			p = fresh
			err = p.PublishDirect(ctx, data)
		}

		// Only report the message as published when it actually was.
		if err != nil {
			logger.Warn("publish failed", logger.Err(err))
		} else {
			logger.Info("publish message", logger.String("data", string(data)))
		}

		if err = sleepContext(ctx, 5*time.Second); err != nil {
			return err
		}
	}
}

// newProducerAfterReconnect polls every two seconds until connection reports
// connected and a producer can be created on it, or returns ctx.Err() once ctx
// is done.
func newProducerAfterReconnect(ctx context.Context, connection *rabbitmq.Connection, exchange *rabbitmq.Exchange, queueName string) (*rabbitmq.Producer, error) {
	for {
		if connection.CheckConnected() {
			p, err := rabbitmq.NewProducer(exchange, queueName, connection)
			if err == nil {
				return p, nil
			}
			logger.Warn("reconnect failed", logger.Err(err))
		}
		if err := sleepContext(ctx, 2*time.Second); err != nil {
			return nil, err
		}
	}
}

// sleepContext waits for d, or returns ctx.Err() early if ctx is done first.
func sleepContext(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

// runConsume opens a connection, creates a consumer for queueName on exchange
// with auto-ack off, and starts consuming with the shared handler. It returns
// the first setup error, or nil once consumption has started. On success the
// connection is deliberately left open for the consumer.
func runConsume(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}

	c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
	if err != nil {
		// Nothing will use this connection if the consumer could not be
		// created, so close it instead of leaking it.
		connection.Close()
		return err
	}

	c.Consume(ctx, handler)

	return nil
}

// handler logs every received message with its tag ID and always reports
// success.
var handler = func(_ context.Context, payload []byte, tag string) error {
	tagField := logger.String("tagID", tag)
	dataField := logger.String("data", string(payload))
	logger.Info("received message", tagField, dataField)
	return nil
}

Documentation

Overview

Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go

producer and consumer using the five types direct, topic, fanout, headers, x-delayed-message. publisher and subscriber using the fanout message type.

Index

Constants

View Source
const DefaultURL = "amqp://guest:guest@localhost:5672/"

DefaultURL default rabbitmq url

Variables

View Source
var ErrClosed = amqp.ErrClosed

ErrClosed closed

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection rabbitmq connection

func NewConnection

func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)

NewConnection rabbitmq connection

func (*Connection) CheckConnected

func (c *Connection) CheckConnected() bool

CheckConnected reports whether the rabbitmq connection is currently connected

func (*Connection) Close

func (c *Connection) Close()

Close rabbitmq connection

type ConnectionOption

type ConnectionOption func(*connectionOptions)

ConnectionOption connection option.

func WithLogger

func WithLogger(zapLog *zap.Logger) ConnectionOption

WithLogger set logger option.

func WithReconnectTime

func WithReconnectTime(d time.Duration) ConnectionOption

WithReconnectTime set reconnect time interval option.

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption

WithTLSConfig set tls config option.

type ConsumeOption added in v1.5.6

type ConsumeOption func(*consumeOptions)

ConsumeOption consume option.

func WithConsumeArgs added in v1.5.6

func WithConsumeArgs(args map[string]interface{}) ConsumeOption

WithConsumeArgs set consume args option.

func WithConsumeConsumer added in v1.5.6

func WithConsumeConsumer(consumer string) ConsumeOption

WithConsumeConsumer set consume consumer option.

func WithConsumeExclusive added in v1.5.6

func WithConsumeExclusive(enable bool) ConsumeOption

WithConsumeExclusive set consume exclusive option.

func WithConsumeNoLocal added in v1.5.6

func WithConsumeNoLocal(enable bool) ConsumeOption

WithConsumeNoLocal set consume noLocal option.

func WithConsumeNoWait added in v1.5.6

func WithConsumeNoWait(enable bool) ConsumeOption

WithConsumeNoWait set consume no wait option.

type Consumer added in v1.5.6

type Consumer struct {
	Exchange  *Exchange
	QueueName string
	// contains filtered or unexported fields
}

Consumer session

func NewConsumer added in v1.5.6

func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error)

NewConsumer create a consumer

func (*Consumer) Close added in v1.5.6

func (c *Consumer) Close()

Close consumer

func (*Consumer) Consume added in v1.5.6

func (c *Consumer) Consume(ctx context.Context, handler Handler)

Consume consumes messages in a loop that runs in a goroutine

func (*Consumer) Count added in v1.8.2

func (c *Consumer) Count() int64

Count consumer success message number

type ConsumerOption added in v1.5.6

type ConsumerOption func(*consumerOptions)

ConsumerOption consumer option.

func WithConsumerAutoAck added in v1.5.6

func WithConsumerAutoAck(enable bool) ConsumerOption

WithConsumerAutoAck set consumer auto ack option, if false, manual ACK required.

func WithConsumerConsumeOptions added in v1.5.6

func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption

WithConsumerConsumeOptions set consumer consume option.

func WithConsumerExchangeDeclareOptions added in v1.5.6

func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption

WithConsumerExchangeDeclareOptions set exchange declare option.

func WithConsumerPersistent added in v1.5.6

func WithConsumerPersistent(enable bool) ConsumerOption

WithConsumerPersistent set consumer persistent option.

func WithConsumerQosOptions added in v1.5.6

func WithConsumerQosOptions(opts ...QosOption) ConsumerOption

WithConsumerQosOptions set consume qos option.

func WithConsumerQueueBindOptions added in v1.5.6

func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption

WithConsumerQueueBindOptions set queue bind option.

func WithConsumerQueueDeclareOptions added in v1.5.6

func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption

WithConsumerQueueDeclareOptions set queue declare option.

type DeadLetterOption added in v1.8.2

type DeadLetterOption func(*deadLetterOptions)

DeadLetterOption declare dead letter option.

func WithDeadLetter added in v1.8.2

func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption

WithDeadLetter set dead letter exchange, queue, routing key.

func WithDeadLetterExchangeDeclareOptions added in v1.8.2

func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption

WithDeadLetterExchangeDeclareOptions set dead letter exchange declare option.

func WithDeadLetterQueueBindOptions added in v1.8.2

func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption

WithDeadLetterQueueBindOptions set dead letter queue bind option.

func WithDeadLetterQueueDeclareOptions added in v1.8.2

func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption

WithDeadLetterQueueDeclareOptions set dead letter queue declare option.

type DelayedMessagePublishOption added in v1.5.6

type DelayedMessagePublishOption func(*delayedMessagePublishOptions)

DelayedMessagePublishOption delayed message publish option.

func WithDelayedMessagePublishHeadersKeys added in v1.5.6

func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption

WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.

func WithDelayedMessagePublishTopicKey added in v1.5.6

func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption

WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.

type Exchange added in v1.5.6

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange rabbitmq minimum management unit

func NewDelayedMessageExchange added in v1.5.6

func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange

NewDelayedMessageExchange create a delayed message exchange

func NewDirectExchange added in v1.5.6

func NewDirectExchange(exchangeName string, routingKey string) *Exchange

NewDirectExchange create a direct exchange

func NewFanoutExchange added in v1.5.6

func NewFanoutExchange(exchangeName string) *Exchange

NewFanoutExchange create a fanout exchange

func NewHeadersExchange added in v1.5.6

func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange

NewHeadersExchange create a headers exchange, the headerType supports "all" and "any"

func NewTopicExchange added in v1.5.6

func NewTopicExchange(exchangeName string, routingKey string) *Exchange

NewTopicExchange create a topic exchange

func (*Exchange) DelayedMessageType added in v1.5.6

func (e *Exchange) DelayedMessageType() string

DelayedMessageType exchange delayed message type

func (*Exchange) HeadersKeys added in v1.5.6

func (e *Exchange) HeadersKeys() map[string]interface{}

HeadersKeys exchange headers keys

func (*Exchange) Name added in v1.5.6

func (e *Exchange) Name() string

Name exchange name

func (*Exchange) RoutingKey added in v1.5.6

func (e *Exchange) RoutingKey() string

RoutingKey exchange routing key

func (*Exchange) Type added in v1.5.6

func (e *Exchange) Type() string

Type exchange type

type ExchangeDeclareOption added in v1.5.6

type ExchangeDeclareOption func(*exchangeDeclareOptions)

ExchangeDeclareOption declare exchange option.

func WithExchangeDeclareArgs added in v1.5.6

func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption

WithExchangeDeclareArgs set exchange declare args option.

func WithExchangeDeclareAutoDelete added in v1.5.6

func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption

WithExchangeDeclareAutoDelete set exchange declare auto delete option.

func WithExchangeDeclareInternal added in v1.5.6

func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption

WithExchangeDeclareInternal set exchange declare internal option.

func WithExchangeDeclareNoWait added in v1.5.6

func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption

WithExchangeDeclareNoWait set exchange declare no wait option.

type Handler added in v1.5.6

type Handler func(ctx context.Context, data []byte, tagID string) error

Handler message

type HeadersType added in v1.5.6

type HeadersType = string

HeadersType headers type

const (

	// HeadersTypeAll all
	HeadersTypeAll HeadersType = "all"
	// HeadersTypeAny any
	HeadersTypeAny HeadersType = "any"
)

type Producer added in v1.5.6

type Producer struct {
	Exchange  *Exchange // exchange
	QueueName string    // queue name
	// contains filtered or unexported fields
}

Producer session

func NewProducer added in v1.5.6

func NewProducer(exchange *Exchange, queueName string, connection *Connection, opts ...ProducerOption) (*Producer, error)

NewProducer creates a producer.

func (*Producer) Close added in v1.5.6

func (p *Producer) Close()

Close the producer

func (*Producer) ExchangeArgs added in v1.8.2

func (p *Producer) ExchangeArgs() amqp.Table

ExchangeArgs returns the exchange declare args.

func (*Producer) PublishDelayedMessage added in v1.5.6

func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, opts ...DelayedMessagePublishOption) error

PublishDelayedMessage sends a delayed message; delayTime is the delay duration.

func (*Producer) PublishDirect added in v1.5.6

func (p *Producer) PublishDirect(ctx context.Context, body []byte) error

PublishDirect sends a direct-type message.

func (*Producer) PublishFanout added in v1.5.6

func (p *Producer) PublishFanout(ctx context.Context, body []byte) error

PublishFanout sends a fanout-type message.

func (*Producer) PublishHeaders added in v1.5.6

func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error

PublishHeaders sends a headers-type message.

func (*Producer) PublishTopic added in v1.5.6

func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error

PublishTopic sends a topic-type message.

func (*Producer) QueueArgs added in v1.8.2

func (p *Producer) QueueArgs() amqp.Table

QueueArgs returns the queue declare args.

func (*Producer) QueueBindArgs added in v1.8.2

func (p *Producer) QueueBindArgs() amqp.Table

QueueBindArgs returns the queue bind args.

type ProducerOption added in v1.5.6

type ProducerOption func(*producerOptions)

ProducerOption producer option.

func WithDeadLetterOptions added in v1.8.2

func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption

WithDeadLetterOptions set dead letter options.

func WithProducerExchangeDeclareOptions added in v1.5.6

func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption

WithProducerExchangeDeclareOptions set exchange declare option.

func WithProducerMandatory added in v1.5.6

func WithProducerMandatory(enable bool) ProducerOption

WithProducerMandatory set producer mandatory option.

func WithProducerPersistent added in v1.5.6

func WithProducerPersistent(enable bool) ProducerOption

WithProducerPersistent set producer persistent option.

func WithProducerQueueBindOptions added in v1.5.6

func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption

WithProducerQueueBindOptions set queue bind option.

func WithProducerQueueDeclareOptions added in v1.5.6

func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption

WithProducerQueueDeclareOptions set queue declare option.

type Publisher added in v1.5.6

type Publisher struct {
	*Producer
}

Publisher session

func NewPublisher added in v1.5.6

func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error)

NewPublisher creates a publisher; channelName is the exchange name.

func (*Publisher) Close added in v1.5.6

func (p *Publisher) Close()

Close publisher

func (*Publisher) Publish added in v1.5.6

func (p *Publisher) Publish(ctx context.Context, body []byte) error

Publish sends a message.

type QosOption added in v1.5.6

type QosOption func(*qosOptions)

QosOption qos option.

func WithQosEnable added in v1.5.6

func WithQosEnable() QosOption

WithQosEnable set qos enable option.

func WithQosPrefetchCount added in v1.5.6

func WithQosPrefetchCount(count int) QosOption

WithQosPrefetchCount set qos prefetch count option.

func WithQosPrefetchGlobal added in v1.5.6

func WithQosPrefetchGlobal(enable bool) QosOption

WithQosPrefetchGlobal set qos global option.

func WithQosPrefetchSize added in v1.5.6

func WithQosPrefetchSize(size int) QosOption

WithQosPrefetchSize set qos prefetch size option.

type QueueBindOption added in v1.5.6

type QueueBindOption func(*queueBindOptions)

QueueBindOption declare queue bind option.

func WithQueueBindArgs added in v1.5.6

func WithQueueBindArgs(args map[string]interface{}) QueueBindOption

WithQueueBindArgs set queue bind args option.

func WithQueueBindNoWait added in v1.5.6

func WithQueueBindNoWait(enable bool) QueueBindOption

WithQueueBindNoWait set queue bind no wait option.

type QueueDeclareOption added in v1.5.6

type QueueDeclareOption func(*queueDeclareOptions)

QueueDeclareOption declare queue option.

func WithQueueDeclareArgs added in v1.5.6

func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption

WithQueueDeclareArgs set queue declare args option.

func WithQueueDeclareAutoDelete added in v1.5.6

func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption

WithQueueDeclareAutoDelete set queue declare auto delete option.

func WithQueueDeclareExclusive added in v1.5.6

func WithQueueDeclareExclusive(enable bool) QueueDeclareOption

WithQueueDeclareExclusive set queue declare exclusive option.

func WithQueueDeclareNoWait added in v1.5.6

func WithQueueDeclareNoWait(enable bool) QueueDeclareOption

WithQueueDeclareNoWait set queue declare no wait option.

type Subscriber added in v1.5.6

type Subscriber struct {
	*Consumer
}

Subscriber session

func NewSubscriber added in v1.5.6

func NewSubscriber(channelName string, identifier string, connection *Connection, opts ...ConsumerOption) (*Subscriber, error)

NewSubscriber creates a subscriber; channelName is the exchange name and identifier is the queue name.

func (*Subscriber) Close added in v1.5.6

func (s *Subscriber) Close()

Close subscriber

func (*Subscriber) Subscribe added in v1.5.6

func (s *Subscriber) Subscribe(ctx context.Context, handler Handler)

Subscribe and handle message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL