rabbitmq

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2023 License: MIT Imports: 8 Imported by: 0

README

rabbitmq

rabbitmq library wrapped in github.com/rabbitmq/amqp091-go, supports automatic reconnection and customized setting of queue parameters.

Example of use

Consumer code

This is a consumer code example common to the four types direct, topic, fanout, and headers.

package main

import (
	"context"
	"strings"

	"github.com/liuzw3018/sponge/pkg/logger"
	"github.com/liuzw3018/sponge/pkg/rabbitmq"
	"github.com/liuzw3018/sponge/pkg/rabbitmq/consumer"
)

var handler = func(ctx context.Context, data []byte, tag ...string) error {
	tagID := strings.Join(tag, ",")
	logger.Infof("tagID=%s, receive message: %s", tagID, string(data))
	return nil
}

func main() {
	url := rabbitmq.DefaultURL
	c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
	if err != nil {
		logger.Error("NewConnection err",logger.Err(err))
		return
	}
	defer c.Close()

	queue, err := consumer.NewQueue(context.Background(), "yourQueueName", c, consumer.WithConsumeAutoAck(false)) // here you can set the consume parameter
	if err != nil {
		logger.Error("NewQueue err",logger.Err(err))
		return
	}

	queue.Consume(handler)

	exit := make(chan struct{})
	<-exit
}    

Direct Type Code
package main

import (
	"context"

	"github.com/liuzw3018/sponge/pkg/logger"
	"github.com/liuzw3018/sponge/pkg/rabbitmq"
	"github.com/liuzw3018/sponge/pkg/rabbitmq/producer"
)

func main() {
	url := rabbitmq.DefaultURL
	c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
	if err != nil {
		logger.Error("NewConnection err",logger.Err(err))
		return
	}
	defer c.Close()

	exchangeName := "direct-exchange-demo"
	queueName := "direct-queue-1"
	routeKey := "direct-key-1"
	exchange := producer.NewDirectExchange(exchangeName, routeKey)
	q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
	if err != nil {
		logger.Error("NewQueue err",logger.Err(err))
		return
	}
	defer q.Close()

	err = q.Publish(context.Background(), []byte(routeKey+" say hello"))
	if err != nil {
		logger.Error("Publish err",logger.Err(err))
		return
	}
}    

Topic Type Code
package main

import (
	"context"

	"github.com/liuzw3018/sponge/pkg/logger"
	"github.com/liuzw3018/sponge/pkg/rabbitmq"
	"github.com/liuzw3018/sponge/pkg/rabbitmq/producer"
)

func main() {
	url := rabbitmq.DefaultURL
	c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
	if err != nil {
		logger.Error("NewConnection err",logger.Err(err))
		return
	}
	defer c.Close()

	exchangeName := "topic-exchange-demo"
	queueName := "topic-queue-1"
	routingKey := "key1.key2.*"
	exchange := producer.NewTopicExchange(exchangeName, routingKey)
	q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
	if err != nil {
		logger.Error("NewQueue err",logger.Err(err))
		return
	}
	defer q.Close()

	key:="key1.key2.key3"
	err = q.PublishTopic(context.Background(), key, []byte(key+" say hello "))
	if err != nil {
		logger.Error("PublishTopic err",logger.Err(err))
		return
	}
}    

Fanout Type Code
package main

import (
	"context"

	"github.com/liuzw3018/sponge/pkg/logger"
	"github.com/liuzw3018/sponge/pkg/rabbitmq"
	"github.com/liuzw3018/sponge/pkg/rabbitmq/producer"
)

func main() {
	url := rabbitmq.DefaultURL
	c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
	if err != nil {
		logger.Error("NewConnection err",logger.Err(err))
		return
	}
	defer c.Close()
	
	exchangeName := "fanout-exchange-demo"
	queueName := "fanout-queue-1"
	exchange := producer.NewFanOutExchange(exchangeName)
	q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
	if err != nil {
		logger.Error("NewQueue err",logger.Err(err))
		return
	}
	defer q.Close()

	err = q.Publish(context.Background(), []byte("say hello"))
	if err != nil {
		logger.Error("Publish err",logger.Err(err))
		return
	}
}    

Headers Type Code
package main

import (
	"context"

	"github.com/liuzw3018/sponge/pkg/logger"
	"github.com/liuzw3018/sponge/pkg/rabbitmq"
	"github.com/liuzw3018/sponge/pkg/rabbitmq/producer"
)

func main() {
	url := rabbitmq.DefaultURL
	c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
	if err != nil {
		logger.Error("NewConnection err",logger.Err(err))
		return
	}
	defer c.Close()


	exchangeName := "headers-exchange-demo"
	// the message is only received if there is an exact match for headers
	queueName := "headers-queue-1"
	kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
	exchange := producer.NewHeaderExchange(exchangeName, producer.HeadersTypeAll, kv1)
	q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
	if err != nil {
		logger.Error("NewQueue err",logger.Err(err))
		return
	}
	defer q.Close()
	headersKey1 := kv1 // exact match, consumer queue can receive messages
	err = q.PublishHeaders(context.Background(), headersKey1, []byte("say hello"))
	if err != nil {
		logger.Error("PublishHeaders err",logger.Err(err))
		return
	}
}    

Publish Error Handling

If the error is caused by the network, you can check if the reconnection is successful and resend it again.

    err := q.Publish(context.Background(), []byte(routeKey+" say hello"))
    if err != nil {
        if errors.Is(err, producer.ErrClosed) && c.CheckConnected() { // check connection
            q, err = producer.NewQueue(queueName, c.Conn, exchange)
            if err != nil {
                logger.Info("queue reconnect failed", logger.Err(err))
            }else{
                logger.Info("queue reconnect success")
            }
        }
    }

Documentation

Overview

Package rabbitmq is a go wrapper for rabbitmq

Index

Constants

View Source
const DefaultURL = "amqp://guest:guest@localhost:5672/"

DefaultURL default rabbitmq url

Variables

This section is empty.

Functions

This section is empty.

Types

type Connection

type Connection struct {
	Mutex sync.Mutex

	Exit   chan struct{}
	ZapLog *zap.Logger

	Conn *amqp.Connection

	IsConnected bool
	// contains filtered or unexported fields
}

Connection rabbitmq connection

func NewConnection

func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)

NewConnection rabbitmq connection

func (*Connection) CheckConnected

func (c *Connection) CheckConnected() bool

CheckConnected rabbitmq connection

func (*Connection) Close

func (c *Connection) Close()

Close rabbitmq connection

type ConnectionOption

type ConnectionOption func(*connectionOptions)

ConnectionOption connection option.

func WithLogger

func WithLogger(zapLog *zap.Logger) ConnectionOption

WithLogger set logger option.

func WithReconnectTime

func WithReconnectTime(d time.Duration) ConnectionOption

WithReconnectTime set reconnect time interval option.

func WithTLSConfig

func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption

WithTLSConfig set tls config option.

Directories

Path Synopsis
Package consumer is the generic consumer-side processing logic for the four modes direct, topic, fanout, headers
Package consumer is the generic consumer-side processing logic for the four modes direct, topic, fanout, headers
Package producer is the generic producer-side processing logic for the four modes direct, topic, fanout, headers.
Package producer is the generic producer-side processing logic for the four modes direct, topic, fanout, headers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL