rabbitmq

package
v1.1.46 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2025 License: MIT Imports: 14 Imported by: 0

README

rabbitmq

rabbitmq is a library that wraps github.com/rabbitmq/amqp091-go. It supports automatic reconnection and customizable settings, and covers six message types in total: direct, topic, fanout, headers, delayed message, and publisher/subscriber. Dead letter queues are also supported.

Example of use

Code Example

The following code example covers all six message types: direct, topic, fanout, headers, delayed message, and publisher/subscriber.

Tip: the wrapped Consume function uses manual acknowledgement mode by default, so you do not need to call the ack function yourself.

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/18721889353/sunshine/pkg/logger"
	"github.com/18721889353/sunshine/pkg/rabbitmq"
)

var (
	producerCount int32
	consumerCount int32
)

// main runs one example against a local RabbitMQ broker. Only the direct
// example is enabled; uncomment one of the other calls to try another
// message type.
func main() {
	const brokerURL = "amqp://guest:guest@127.0.0.1:5672/"

	directExample(brokerURL)

	//topicExample(brokerURL)

	//fanoutExample(brokerURL)

	//headersExample(brokerURL)

	//delayedMessageExample(brokerURL)

	//publisherSubscriberExample(brokerURL)
}

// directExample publishes 100 messages through a direct exchange, then starts
// a consumer on the same queue, waits five seconds, adds the consumer's count
// to consumerCount and prints the statistics.
func directExample(url string) {
	const (
		exchangeName = "direct-exchange-demo"
		queueName    = "direct-queue-1"
		routeKey     = "direct-key-1"
	)
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- direct --------------------\n")

	// Producer side. The deferred Close calls run when directExample
	// returns, not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		// The consumer below declares its queue with the producer's args.
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			body := []byte("[direct] message " + strconv.Itoa(i))
			checkErr(producer.PublishDirect(ctx, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		// Give the consumer five seconds to receive messages before counting.
		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// topicExample publishes 100 messages through a topic exchange bound with the
// pattern "key1.key2.*", each under its own "key1.key2.key<N>" topic key, then
// consumes from the queue for five seconds and prints the statistics.
func topicExample(url string) {
	const (
		exchangeName = "topic-exchange-demo"
		queueName    = "topic-queue-1"
		routingKey   = "key1.key2.*"
	)
	exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- topic --------------------\n")

	// Producer side. The deferred Close calls run when topicExample returns,
	// not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			seq := strconv.Itoa(i)
			key := "key1.key2.key" + seq
			body := []byte("[topic] " + key + " message " + seq)
			checkErr(producer.PublishTopic(ctx, key, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// fanoutExample publishes 100 messages through a fanout exchange using the
// queue "fanout-queue-1", then starts one consumer on "fanout-queue-1" and one
// on "fanout-queue-2". After five seconds the first consumer's count is added
// to consumerCount and the second consumer's count is printed separately.
func fanoutExample(url string) {
	const (
		exchangeName = "fanout-exchange-demo"
		firstQueue   = "fanout-queue-1"
		secondQueue  = "fanout-queue-2"
	)
	exchange := rabbitmq.NewFanoutExchange(exchangeName)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- fanout --------------------\n")

	// Producer side. The deferred Close calls run when fanoutExample returns,
	// not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, firstQueue, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			body := []byte("[fanout] message " + strconv.Itoa(i))
			checkErr(producer.PublishFanout(ctx, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: two queues on the same exchange.
	{
		first := runConsume(url, exchange, firstQueue, queueArgs)
		second := runConsume(url, exchange, secondQueue, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(first.Count()))
		fmt.Println("\n\nconsumer 2 count:", second.Count())
	}

	printStat()
}

// headersExample publishes through a headers exchange declared with
// HeadersTypeAll and the keys {"hello": "world", "foo": "bar"}. Each of the 100
// iterations sends one message carrying both headers (counted in
// producerCount) and one carrying only "foo" (not counted). It then consumes
// for five seconds and prints the statistics.
func headersExample(url string) {
	const (
		exchangeName = "headers-exchange-demo"
		queueName    = "headers-queue-1"
	)
	headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
	// HeadersTypeAll is used here; HeadersTypeAny is the alternative.
	exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- headers --------------------\n")

	// Producer side. The deferred Close calls run when headersExample
	// returns, not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			seq := strconv.Itoa(i)

			// Carries every header the exchange was declared with.
			checkErr(producer.PublishHeaders(ctx, headersKeys, []byte("[headers] key1 message "+seq)))
			atomic.AddInt32(&producerCount, 1)

			// because of x-match: all, a message with only part of the
			// headers will not match the same queue, so it is dropped
			partialKeys := map[string]interface{}{"foo": "bar"}
			checkErr(producer.PublishHeaders(ctx, partialKeys, []byte("[headers] key2 message "+seq)))
		}
	}

	// Consumer side.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// delayedMessageExample publishes 100 messages with a three second delay
// through a delayed message exchange built on a direct exchange, then consumes
// for five seconds and prints the statistics. Each body records the time at
// which it was published.
func delayedMessageExample(url string) {
	const (
		exchangeName = "delayed-message-exchange-demo"
		queueName    = "delayed-message-queue"
		routingKey   = "delayed-key"
	)
	inner := rabbitmq.NewDirectExchange("", routingKey)
	exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, inner)
	var queueArgs map[string]interface{}
	fmt.Print("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The deferred Close calls run when
	// delayedMessageExample returns, not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		const datetimeLayout = "2006-01-02 15:04:05.000"
		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			body := []byte("[delayed] message " + strconv.Itoa(i) + " at " + time.Now().Format(datetimeLayout))
			checkErr(producer.PublishDelayedMessage(ctx, 3*time.Second, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side.
	{
		consumer := runConsume(url, exchange, queueName, queueArgs)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(consumer.Count()))
	}

	printStat()
}

// publisherSubscriberExample publishes 100 messages on the "pub-sub" channel,
// then starts two subscribers with different identifiers. After five seconds
// the first subscriber's count is added to consumerCount and the second
// subscriber's count is printed separately.
func publisherSubscriberExample(url string) {
	const channelName = "pub-sub"
	fmt.Print("\n\n-------------------- publisher subscriber --------------------\n")

	// Publisher side. The deferred Close calls run when
	// publisherSubscriberExample returns, not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		publisher, err := rabbitmq.NewPublisher(channelName, conn)
		checkErr(err)
		defer publisher.Close()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			body := []byte("[pub-sub] message " + strconv.Itoa(i))
			checkErr(publisher.Publish(ctx, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Subscriber side: two subscribers on the same channel.
	{
		first := runSubscriber(url, channelName, "pub-sub-queue-1")
		second := runSubscriber(url, channelName, "pub-sub-queue-2")

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(first.Count()))
		fmt.Println("\n\nsubscriber 2 count:", second.Count())
	}

	printStat()
}

// runConsume opens a new connection to url, creates a consumer for queueName
// on exchange with auto-ack disabled and the given queue declare args, starts
// consuming with handler, and returns the consumer. Any error panics via
// checkErr. The connection is not closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	declareOpts := rabbitmq.WithConsumerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs))

	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck, declareOpts)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// runSubscriber opens a new connection to url, creates a subscriber for
// channelName identified by identifier with auto-ack disabled, starts it with
// handler, and returns it. Any error panics via checkErr. The connection is
// not closed here.
func runSubscriber(url string, channelName string, identifier string) *rabbitmq.Subscriber {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	subscriber, err := rabbitmq.NewSubscriber(channelName, identifier, conn, manualAck)
	checkErr(err)

	subscriber.Subscribe(context.Background(), handler)

	return subscriber
}

// handler is the message callback shared by the examples: it logs the
// delivery tag and the payload of each message and always returns nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	payload := string(data)
	logger.Info("received message", logger.String("tagID", tagID), logger.String("data", payload))
	return nil
}

// checkErr panics with err when err is non-nil and does nothing otherwise.
// The examples use it to abort on the first failure.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// printStat prints the producer and consumer counters and then resets both to
// zero so that the next example starts from a clean state.
func printStat() {
	fmt.Println("\n\n-------------------- stat --------------------")
	fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
	fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
	// fmt.Print with an explicit "\n\n" emits the same bytes as the former
	// fmt.Println("...\n") but does not trip go vet's
	// "Println arg list ends with redundant newline" check.
	fmt.Print("----------------------------------------------\n\n")
	atomic.StoreInt32(&producerCount, 0)
	atomic.StoreInt32(&consumerCount, 0)
}

Example of Dead Letter

The following example code adds a dead letter queue to each of the five message types direct, topic, fanout, headers, and delayed message. The dead letter exchange is always of the direct type.

Tip: the wrapped Consume function uses manual acknowledgement mode by default.

package main

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/18721889353/sunshine/pkg/logger"
	"github.com/18721889353/sunshine/pkg/rabbitmq"
)

var (
	producerCount int32
	consumerCount int32

	deadLetterConsumerCount int32
)

// main runs one dead letter example against a local RabbitMQ broker. Only the
// direct example is enabled; uncomment one of the other calls to try another
// message type.
func main() {
	const brokerURL = "amqp://guest:guest@127.0.0.1:5672/"

	directExample(brokerURL)

	//topicExample(brokerURL)

	//fanoutExample(brokerURL)

	//headersExample(brokerURL)

	//delayedMessageExample(brokerURL)
}

// directExample publishes 100 messages through a direct exchange whose queue
// is declared with x-max-length 60 and x-message-ttl 3000 ms and has a dead
// letter exchange attached. It then runs one consumer on the main queue and
// one on the dead letter queue for five seconds and prints the statistics.
func directExample(url string) {
	const (
		exchangeName = "direct-exchange-demo-2"
		queueName    = "direct-queue-2"
		routingKey   = "direct-key-2"

		// Dead letter names reuse the main names with a "dl-" prefix.
		deadLetterQueueName = "dl-" + queueName
	)
	exchange := rabbitmq.NewDirectExchange(exchangeName, routingKey)
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	fmt.Print("\n\n-------------------- direct --------------------\n")

	// Producer side. The deferred Close calls run when directExample
	// returns, not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn,
			// queue args
			rabbitmq.WithProducerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs)),
			// dead letter
			rabbitmq.WithDeadLetterOptions(
				rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
			),
		)
		checkErr(err)
		defer producer.Close()
		// Re-read the args from the producer so the consumer declares the
		// queue with exactly what the producer used.
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			body := []byte("[direct] say hello" + strconv.Itoa(i))
			checkErr(producer.PublishDirect(ctx, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: main queue plus dead letter queue.
	{
		mainConsumer := runConsume(url, exchange, queueName, queueArgs)
		deadConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(mainConsumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadConsumer.Count()))
	}

	printStat()
}

// topicExample publishes 100 messages through a topic exchange bound with the
// pattern "dl-key1.key2.*". The queue is declared with x-max-length 60 and
// x-message-ttl 3000 ms and has a dead letter exchange attached. It then runs
// one consumer on the main queue and one on the dead letter queue for five
// seconds and prints the statistics.
func topicExample(url string) {
	const (
		exchangeName = "topic-exchange-demo-2"
		queueName    = "topic-queue-2"
		routingKey   = "dl-key1.key2.*"

		// Dead letter names reuse the main names with a "dl-" prefix.
		deadLetterQueueName = "dl-" + queueName
	)
	exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}

	fmt.Print("\n\n-------------------- topic --------------------\n")

	// Producer side. The deferred Close calls run when topicExample returns,
	// not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn,
			// queue args
			rabbitmq.WithProducerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs)),
			// dead letter
			rabbitmq.WithDeadLetterOptions(
				rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
			),
		)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			seq := strconv.Itoa(i)
			key := "dl-key1.key2.key" + seq
			body := []byte("[topic] " + key + " message " + seq)
			checkErr(producer.PublishTopic(ctx, key, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: main queue plus dead letter queue.
	{
		mainConsumer := runConsume(url, exchange, queueName, queueArgs)
		deadConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(mainConsumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadConsumer.Count()))
	}

	printStat()
}

// fanoutExample publishes 100 messages through a fanout exchange using the
// queue "fanout-queue-3", declared with x-max-length 60 and x-message-ttl
// 3000 ms and with a dead letter exchange attached. It then runs consumers on
// "fanout-queue-3", "fanout-queue-4" and the dead letter queue for five
// seconds. Both fanout consumers' counts are added to consumerCount.
func fanoutExample(url string) {
	const (
		exchangeName = "fanout-exchange-demo-2"
		firstQueue   = "fanout-queue-3"
		secondQueue  = "fanout-queue-4"

		// The dead letter queue is derived from the first queue only.
		deadLetterQueueName = "dl-" + firstQueue
	)
	exchange := rabbitmq.NewFanoutExchange(exchangeName)
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-direct-key")
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}
	fmt.Print("\n\n-------------------- fanout --------------------\n")

	// Producer side. The deferred Close calls run when fanoutExample returns,
	// not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, firstQueue, conn,
			// queue args
			rabbitmq.WithProducerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs)),
			// dead letter
			rabbitmq.WithDeadLetterOptions(
				rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
			),
		)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			body := []byte("[fanout] message " + strconv.Itoa(i))
			checkErr(producer.PublishFanout(ctx, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: two fanout queues plus the dead letter queue.
	{
		first := runConsume(url, exchange, firstQueue, queueArgs)
		second := runConsume(url, exchange, secondQueue, queueArgs)
		deadConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(first.Count()))
		atomic.AddInt32(&consumerCount, int32(second.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadConsumer.Count()))
	}

	printStat()
}

// headersExample publishes through a headers exchange declared with
// HeadersTypeAll and the keys {"hello": "world", "foo": "bar"}. The queue is
// declared with x-max-length 60 and x-message-ttl 3000 ms and has a dead
// letter exchange attached. Each of the 100 iterations sends one message with
// both headers (counted in producerCount) and one with only "foo" (not
// counted). It then runs one consumer on the main queue and one on the dead
// letter queue for five seconds and prints the statistics.
func headersExample(url string) {
	const (
		exchangeName = "headers-exchange-demo-2"
		queueName    = "headers-queue-2"

		deadLetterQueueName = "dl-" + queueName
	)
	headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
	// HeadersTypeAll is used here; HeadersTypeAny is the alternative.
	exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-headers-key")
	fmt.Print("\n\n-------------------- headers --------------------\n")

	// Producer side. The deferred Close calls run when headersExample
	// returns, not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn,
			// queue args
			rabbitmq.WithProducerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs)),
			// dead letter
			rabbitmq.WithDeadLetterOptions(
				rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
			),
		)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		ctx := context.Background()
		for i := 1; i <= 100; i++ {
			// Carries every header the exchange was declared with.
			checkErr(producer.PublishHeaders(ctx, headersKeys, []byte("[headers] message "+strconv.Itoa(i))))
			atomic.AddInt32(&producerCount, 1)

			// because of x-match: all, a message with only part of the
			// headers will not match the same queue, so it is dropped
			partialKeys := map[string]interface{}{"foo": "bar"}
			checkErr(producer.PublishHeaders(ctx, partialKeys, []byte("[headers] key2 message")))
		}
	}

	// Consumer side: main queue plus dead letter queue.
	{
		mainConsumer := runConsume(url, exchange, queueName, queueArgs)
		deadConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(5 * time.Second)
		atomic.AddInt32(&consumerCount, int32(mainConsumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadConsumer.Count()))
	}

	printStat()
}

// delayedMessageExample publishes 200 delayed messages: the first 100 with a
// one second delay and the rest with a two second delay. The queue is
// declared with x-max-length 60 and x-message-ttl 3000 ms and has a dead
// letter exchange attached. After a three second pause it runs one consumer
// on the main queue and one on the dead letter queue for ten seconds and
// prints the statistics.
func delayedMessageExample(url string) {
	const (
		exchangeName = "delayed-message-exchange-demo-2"
		queueName    = "delayed-message-queue-2"
		routingKey   = "delayed-key-2"

		// Dead letter names reuse the main names with a "dl-" prefix.
		deadLetterQueueName = "dl-" + queueName
	)
	inner := rabbitmq.NewDirectExchange("", routingKey)
	exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, inner)
	queueArgs := map[string]interface{}{
		"x-max-length":  60,
		"x-message-ttl": 3000, // milliseconds
	}
	deadLetterExchange := rabbitmq.NewDirectExchange("dl-"+exchangeName, "dl-"+routingKey)

	fmt.Print("\n\n-------------------- delayed message --------------------\n")

	// Producer side. The deferred Close calls run when
	// delayedMessageExample returns, not when this brace block ends.
	{
		conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
		checkErr(err)
		defer conn.Close()

		producer, err := rabbitmq.NewProducer(exchange, queueName, conn,
			// queue args
			rabbitmq.WithProducerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs)),
			// dead letter
			rabbitmq.WithDeadLetterOptions(
				rabbitmq.WithDeadLetter(deadLetterExchange.Name(), deadLetterQueueName, deadLetterExchange.RoutingKey()),
			),
		)
		checkErr(err)
		defer producer.Close()
		queueArgs = producer.QueueArgs()

		const datetimeLayout = "2006-01-02 15:04:05.000"
		ctx := context.Background()
		for i := 1; i <= 200; i++ {
			// The second hundred messages are delayed twice as long.
			delay := time.Second
			if i > 100 {
				delay = 2 * time.Second
			}

			body := []byte("[delayed] message " + strconv.Itoa(i) + " at " + time.Now().Format(datetimeLayout))
			checkErr(producer.PublishDelayedMessage(ctx, delay, body))
			atomic.AddInt32(&producerCount, 1)
		}
	}

	// Consumer side: main queue plus dead letter queue.
	{
		time.Sleep(3 * time.Second) // wait for all messages to be sent
		mainConsumer := runConsume(url, exchange, queueName, queueArgs)
		deadConsumer := runConsumeForDeadLetter(url, deadLetterExchange, deadLetterQueueName)

		time.Sleep(10 * time.Second)
		atomic.AddInt32(&consumerCount, int32(mainConsumer.Count()))
		atomic.AddInt32(&deadLetterConsumerCount, int32(deadConsumer.Count()))
	}

	printStat()
}

// runConsume opens a new connection to url, creates a consumer for queueName
// on exchange with auto-ack disabled and the given queue declare args, starts
// consuming with handler, and returns the consumer. Any error panics via
// checkErr. The connection is not closed here.
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string, queueArgs map[string]interface{}) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	declareOpts := rabbitmq.WithConsumerQueueDeclareOptions(rabbitmq.WithQueueDeclareArgs(queueArgs))

	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck, declareOpts)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// runConsumeForDeadLetter opens a new connection to url, creates a consumer
// for the dead letter queue queueName on exchange with auto-ack disabled and
// no extra queue declare args, starts consuming with handler, and returns the
// consumer. Any error panics via checkErr. The connection is not closed here.
func runConsumeForDeadLetter(url string, exchange *rabbitmq.Exchange, queueName string) *rabbitmq.Consumer {
	conn, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	checkErr(err)

	manualAck := rabbitmq.WithConsumerAutoAck(false)
	consumer, err := rabbitmq.NewConsumer(exchange, queueName, conn, manualAck)
	checkErr(err)

	consumer.Consume(context.Background(), handler)
	return consumer
}

// handler is the message callback shared by the examples: it logs the
// delivery tag and the payload of each message and always returns nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	payload := string(data)
	logger.Info("received message", logger.String("tagID", tagID), logger.String("data", payload))
	return nil
}

// checkErr panics with err when err is non-nil and does nothing otherwise.
// The examples use it to abort on the first failure.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// printStat prints the producer, consumer and dead letter consumer counters
// and then resets all three to zero so that the next example starts from a
// clean state.
func printStat() {
	fmt.Println("\n\n-------------------- stat --------------------")
	// The counters are only ever written through sync/atomic, so they are
	// read through it as well instead of being accessed directly.
	fmt.Println("producer count:", atomic.LoadInt32(&producerCount))
	fmt.Println("consumer count:", atomic.LoadInt32(&consumerCount))
	fmt.Println("dead letter consumer count:", atomic.LoadInt32(&deadLetterConsumerCount))
	// fmt.Print with an explicit "\n\n" emits the same bytes as the former
	// fmt.Println("...\n") but does not trip go vet's
	// "Println arg list ends with redundant newline" check.
	fmt.Print("----------------------------------------------\n\n")
	atomic.StoreInt32(&producerCount, 0)
	atomic.StoreInt32(&consumerCount, 0)
	atomic.StoreInt32(&deadLetterConsumerCount, 0)
}

Example of Automatic Resumption of Publish

If a publish fails because of a network problem, you can wait until the connection has been re-established and then publish the message again.

package main

import (
	"context"
	"errors"
	"strconv"
	"time"

	"github.com/18721889353/sunshine/pkg/logger"
	"github.com/18721889353/sunshine/pkg/rabbitmq"
)

var url = "amqp://guest:guest@127.0.0.1:5672/"

// main starts a consumer on a direct exchange and then publishes to the same
// exchange until the one hour timeout expires or an error occurs.
func main() {
	// Keep the cancel function and call it on exit so the resources held by
	// the timeout context are released; discarding it is reported by go vet
	// (lostcancel).
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	exchangeName := "direct-exchange-demo"
	queueName := "direct-queue"
	routeKey := "info"
	exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)

	// Start consuming first so that published messages have a receiver.
	err := runConsume(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runConsume failed", logger.Err(err))
		return
	}

	// runProduce blocks until ctx is done or it fails.
	err = runProduce(ctx, exchange, queueName)
	if err != nil {
		logger.Error("runProduce failed", logger.Err(err))
		return
	}
}

// runProduce publishes a "direct say hello<N>" message every five seconds
// until ctx is done, and returns ctx's error at that point. It returns early
// with the underlying error if the connection or the producer cannot be
// created.
//
// When a publish fails with rabbitmq.ErrClosed, it waits for the connection
// to report that it is connected again, creates a new producer and publishes
// the same message again. Any other publish error is logged and the message
// is skipped.
func runProduce(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}
	defer connection.Close()

	p, err := rabbitmq.NewProducer(exchange, queueName, connection)
	if err != nil {
		return err
	}
	// Deferred through a closure so that the producer that is current at
	// return time is closed; p is replaced after a reconnection.
	// NOTE(review): the replaced producer is not closed explicitly; it
	// already reported ErrClosed — confirm nothing else needs releasing.
	defer func() { p.Close() }()

	for count := 1; ; count++ {
		if err = ctx.Err(); err != nil {
			return err
		}
		data := []byte("direct say hello" + strconv.Itoa(count))

		// Retry the same message until it is sent or fails for a reason
		// other than a closed connection.
		for {
			err = p.PublishDirect(ctx, data)
			if err == nil {
				logger.Info("publish message", logger.String("data", string(data)))
				break
			}
			if !errors.Is(err, rabbitmq.ErrClosed) {
				logger.Warn("publish failed", logger.Err(err))
				break
			}

			fresh, rerr := reconnectProducer(ctx, exchange, queueName, connection)
			if rerr != nil {
				return rerr
			}
			p = fresh
		}

		if err = waitOrDone(ctx, time.Second*5); err != nil {
			return err
		}
	}
}

// reconnectProducer polls every two seconds until connection reports that it
// is connected and a new producer could be created on it. It returns
// ctx.Err() if ctx is done first.
func reconnectProducer(ctx context.Context, exchange *rabbitmq.Exchange, queueName string, connection *rabbitmq.Connection) (*rabbitmq.Producer, error) {
	for {
		if connection.CheckConnected() { // check connection
			p, err := rabbitmq.NewProducer(exchange, queueName, connection)
			if err == nil {
				return p, nil
			}
			logger.Warn("reconnect failed", logger.Err(err))
		}
		if err := waitOrDone(ctx, time.Second*2); err != nil {
			return nil, err
		}
	}
}

// waitOrDone blocks for d and returns nil, or returns ctx.Err() as soon as
// ctx is done.
func waitOrDone(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

// runConsume opens a new connection to url, creates a consumer for queueName
// on exchange with auto-ack disabled and starts consuming with handler. It
// returns the error from creating the connection or the consumer, and nil
// once consuming has been started.
func runConsume(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
	connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
	if err != nil {
		return err
	}

	c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
	if err != nil {
		// Nothing will use the connection if the consumer could not be
		// created, so close it here instead of leaking it.
		connection.Close()
		return err
	}

	// The connection is deliberately left open on success: the consumer is
	// created on it and is started here without being stopped.
	c.Consume(ctx, handler)

	return nil
}

// handler is the message callback used by runConsume: it logs the delivery
// tag and the payload of each message and always returns nil.
var handler = func(ctx context.Context, data []byte, tagID string) error {
	payload := string(data)
	logger.Info("received message", logger.String("tagID", tagID), logger.String("data", payload))
	return nil
}

Documentation

Overview

Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go

The producer and consumer support five exchange types: direct, topic, fanout, headers, and x-delayed-message. The publisher and subscriber use the fanout type.

Index

Constants

View Source
const DefaultURL = "amqp://guest:guest@localhost:5672/"

DefaultURL default rabbitmq url

Variables

View Source
var ErrClosed = amqp.ErrClosed

ErrClosed closed

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection rabbitmq connection

func NewConnection

func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)

NewConnection rabbitmq connection

func (*Connection) CheckConnected

func (c *Connection) CheckConnected() bool

CheckConnected reports whether the rabbitmq connection is currently connected.

func (*Connection) Close

func (c *Connection) Close()

Close rabbitmq connection

func (*Connection) GetConn added in v1.0.50

func (c *Connection) GetConn() *amqp.Connection

type ConnectionOption

type ConnectionOption func(*connectionOptions)

ConnectionOption connection option.

func WithLogger

func WithLogger(zapLog *zap.Logger) ConnectionOption

WithLogger set logger option.

func WithReconnectTime

func WithReconnectTime(d time.Duration) ConnectionOption

WithReconnectTime set reconnect time interval option.

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption

WithTLSConfig set tls config option.

type ConsumeOption

type ConsumeOption func(*consumeOptions)

ConsumeOption consume option.

func WithConsumeArgs

func WithConsumeArgs(args map[string]interface{}) ConsumeOption

WithConsumeArgs set consume args option.

func WithConsumeConsumer

func WithConsumeConsumer(consumer string) ConsumeOption

WithConsumeConsumer set consume consumer option.

func WithConsumeExclusive

func WithConsumeExclusive(enable bool) ConsumeOption

WithConsumeExclusive set consume exclusive option.

func WithConsumeNoLocal

func WithConsumeNoLocal(enable bool) ConsumeOption

WithConsumeNoLocal set consume noLocal option.

func WithConsumeNoWait

func WithConsumeNoWait(enable bool) ConsumeOption

WithConsumeNoWait set consume no wait option.

type Consumer

type Consumer struct {
	Exchange  *Exchange
	QueueName string
	// contains filtered or unexported fields
}

Consumer session

func NewConsumer

func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error)

NewConsumer create a consumer

func (*Consumer) Close

func (c *Consumer) Close()

Close consumer

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, handler Handler)

Consume messages for loop in goroutine

func (*Consumer) Count

func (c *Consumer) Count() int64

Count consumer success message number

func (*Consumer) DeadConsume added in v1.0.50

func (c *Consumer) DeadConsume(ctx context.Context, handler Handler)

DeadConsume messages for loop in goroutine

type ConsumerOption

type ConsumerOption func(*consumerOptions)

ConsumerOption consumer option.

func WithConsumerAutoAck

func WithConsumerAutoAck(enable bool) ConsumerOption

WithConsumerAutoAck set consumer auto ack option.

func WithConsumerConsumeOptions

func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption

WithConsumerConsumeOptions set consumer consume option.

func WithConsumerExchangeDeclareOptions

func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption

WithConsumerExchangeDeclareOptions set exchange declare option.

func WithConsumerPersistent

func WithConsumerPersistent(enable bool) ConsumerOption

WithConsumerPersistent set consumer persistent option.

func WithConsumerQosOptions

func WithConsumerQosOptions(opts ...QosOption) ConsumerOption

WithConsumerQosOptions set consume qos option.

func WithConsumerQueueBindOptions

func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption

WithConsumerQueueBindOptions set queue bind option.

func WithConsumerQueueDeclareOptions

func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption

WithConsumerQueueDeclareOptions set queue declare option.

type DeadLetterOption

type DeadLetterOption func(*deadLetterOptions)

DeadLetterOption declare dead letter option.

func WithDeadLetter

func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption

WithDeadLetter set dead letter exchange, queue, routing key.

func WithDeadLetterExchangeDeclareOptions

func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption

WithDeadLetterExchangeDeclareOptions set dead letter exchange declare option.

func WithDeadLetterQueueBindOptions

func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption

WithDeadLetterQueueBindOptions set dead letter queue bind option.

func WithDeadLetterQueueDeclareOptions

func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption

WithDeadLetterQueueDeclareOptions set dead letter queue declare option.

type DelayedMessagePublishOption

type DelayedMessagePublishOption func(*delayedMessagePublishOptions)

DelayedMessagePublishOption delayed message publish option.

func WithDelayedMessagePublishHeadersKeys

func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption

WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.

func WithDelayedMessagePublishTopicKey

func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption

WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange rabbitmq minimum management unit

func NewDelayedMessageExchange

func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange

NewDelayedMessageExchange create a delayed message exchange

func NewDirectExchange

func NewDirectExchange(exchangeName string, routingKey string) *Exchange

NewDirectExchange create a direct exchange

func NewFanoutExchange

func NewFanoutExchange(exchangeName string) *Exchange

NewFanoutExchange create a fanout exchange

func NewHeadersExchange

func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange

NewHeadersExchange create a headers exchange, the headerType supports "all" and "any"

func NewTopicExchange

func NewTopicExchange(exchangeName string, routingKey string) *Exchange

NewTopicExchange create a topic exchange

func (*Exchange) DelayedMessageType

func (e *Exchange) DelayedMessageType() string

DelayedMessageType exchange delayed message type

func (*Exchange) HeadersKeys

func (e *Exchange) HeadersKeys() map[string]interface{}

HeadersKeys exchange headers keys

func (*Exchange) Name

func (e *Exchange) Name() string

Name exchange name

func (*Exchange) RoutingKey

func (e *Exchange) RoutingKey() string

RoutingKey exchange routing key

func (*Exchange) Type

func (e *Exchange) Type() string

Type exchange type

type ExchangeDeclareOption

type ExchangeDeclareOption func(*exchangeDeclareOptions)

ExchangeDeclareOption declare exchange option.

func WithExchangeDeclareArgs

func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption

WithExchangeDeclareArgs set exchange declare args option.

func WithExchangeDeclareAutoDelete

func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption

WithExchangeDeclareAutoDelete set exchange declare auto delete option.

func WithExchangeDeclareInternal

func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption

WithExchangeDeclareInternal set exchange declare internal option.

func WithExchangeDeclareNoWait

func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption

WithExchangeDeclareNoWait set exchange declare no wait option.

type Handler

type Handler func(ctx context.Context, data []byte, tagID string) error

Handler message

type HeadersType

type HeadersType = string

HeadersType headers type

const (

	// HeadersTypeAll all
	HeadersTypeAll HeadersType = "all"
	// HeadersTypeAny any
	HeadersTypeAny HeadersType = "any"
)

type Producer

type Producer struct {
	Exchange  *Exchange // exchange
	QueueName string    // queue name
	// contains filtered or unexported fields
}

Producer session

func NewProducer

func NewProducer(exchange *Exchange, queueName string, connection *Connection, opts ...ProducerOption) (*Producer, error)

NewProducer create a producer

func (*Producer) Close

func (p *Producer) Close()

Close the producer

func (*Producer) ExchangeArgs

func (p *Producer) ExchangeArgs() amqp.Table

ExchangeArgs returns the exchange declare args.

func (*Producer) PublishDelayedMessage

func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, opts ...DelayedMessagePublishOption) error

PublishDelayedMessage send delayed type message

func (*Producer) PublishDirect

func (p *Producer) PublishDirect(ctx context.Context, body []byte) error

PublishDirect send direct type message

func (*Producer) PublishFanout

func (p *Producer) PublishFanout(ctx context.Context, body []byte) error

PublishFanout send fanout type message

func (*Producer) PublishHeaders

func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error

PublishHeaders send headers type message

func (*Producer) PublishTopic

func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error

PublishTopic send topic type message

func (*Producer) QueueArgs

func (p *Producer) QueueArgs() amqp.Table

QueueArgs returns the queue declare args.

func (*Producer) QueueBindArgs

func (p *Producer) QueueBindArgs() amqp.Table

QueueBindArgs returns the queue bind args.

type ProducerOption

type ProducerOption func(*producerOptions)

ProducerOption producer option.

func WithDeadLetterOptions

func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption

WithDeadLetterOptions set dead letter options.

func WithProducerExchangeDeclareOptions

func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption

WithProducerExchangeDeclareOptions set exchange declare option.

func WithProducerMandatory

func WithProducerMandatory(enable bool) ProducerOption

WithProducerMandatory set producer mandatory option.

func WithProducerPersistent

func WithProducerPersistent(enable bool) ProducerOption

WithProducerPersistent set producer persistent option.

func WithProducerQueueBindOptions

func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption

WithProducerQueueBindOptions set queue bind option.

func WithProducerQueueDeclareOptions

func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption

WithProducerQueueDeclareOptions set queue declare option.

type Publisher

type Publisher struct {
	*Producer
}

Publisher session

func NewPublisher

func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error)

NewPublisher create a publisher, channelName is exchange name

func (*Publisher) Close

func (p *Publisher) Close()

Close publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, body []byte) error

Publish send message

type QosOption

type QosOption func(*qosOptions)

QosOption qos option.

func WithQosEnable

func WithQosEnable() QosOption

WithQosEnable set qos enable option.

func WithQosPrefetchCount

func WithQosPrefetchCount(count int) QosOption

WithQosPrefetchCount set qos prefetch count option.

func WithQosPrefetchGlobal

func WithQosPrefetchGlobal(enable bool) QosOption

WithQosPrefetchGlobal set qos global option.

func WithQosPrefetchSize

func WithQosPrefetchSize(size int) QosOption

WithQosPrefetchSize set qos prefetch size option.

type QueueBindOption

type QueueBindOption func(*queueBindOptions)

QueueBindOption declare queue bind option.

func WithQueueBindArgs

func WithQueueBindArgs(args map[string]interface{}) QueueBindOption

WithQueueBindArgs set queue bind args option.

func WithQueueBindNoWait

func WithQueueBindNoWait(enable bool) QueueBindOption

WithQueueBindNoWait set queue bind no wait option.

type QueueDeclareOption

type QueueDeclareOption func(*queueDeclareOptions)

QueueDeclareOption declare queue option.

func WithQueueDeclareArgs

func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption

WithQueueDeclareArgs set queue declare args option.

func WithQueueDeclareAutoDelete

func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption

WithQueueDeclareAutoDelete set queue declare auto delete option.

func WithQueueDeclareExclusive

func WithQueueDeclareExclusive(enable bool) QueueDeclareOption

WithQueueDeclareExclusive set queue declare exclusive option.

func WithQueueDeclareNoWait

func WithQueueDeclareNoWait(enable bool) QueueDeclareOption

WithQueueDeclareNoWait set queue declare no wait option.

type Subscriber

type Subscriber struct {
	*Consumer
}

Subscriber session

func NewSubscriber

func NewSubscriber(channelName string, identifier string, connection *Connection, opts ...ConsumerOption) (*Subscriber, error)

NewSubscriber create a subscriber, channelName is exchange name, identifier is queue name

func (*Subscriber) Close

func (s *Subscriber) Close()

Close subscriber

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, handler Handler)

Subscribe and handle message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL