rabbitmq

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2022 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package rabbitmq exports a RabbitMQ Client object that wraps the official go library. It automatically reconnects when the connection fails, and blocks all pushes until the connection succeeds. It also confirms every outgoing message, so none are lost. It doesn't automatically ack each message, but leaves that to the parent process, since it is usage-dependent.

Index

Examples

Constants

View Source
const (
	MQCONN_INIT = iota
	MQCONN_READY
	MQCONN_RECONNCTING
	MQCONN_CLOSED
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {

	// Client status
	Status int
	// Notify client is ready
	NotifyStatus chan int
	// Consumer delivery
	NotifyMessage chan amqp.Delivery
	// contains filtered or unexported fields
}

Rabbit MQ client

func New

func New(addr string, exchange Exchange, queue Queue, routingKey string, consumerTag ...string) *Client

New creates a new consumer state instance, and automatically, attempts to connect to the server. consumerTag unused when client as a publisher

func (*Client) CancelConsume

func (client *Client) CancelConsume() error

func (*Client) Close

func (client *Client) Close() error

Close will cleanly shutdown the channel and connection.

func (*Client) Consume

func (client *Client) Consume(autoAck bool) (<-chan amqp.Delivery, error)

Consume will continuously put queue items on the channel. It is required to call delivery.Ack when it has been successfully processed, or delivery.Nack when it fails. Ignoring this will cause data to build up on the server. amqp.Delivery invalid after any Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs.

Example (Cancel)
package main

import (
	"context"
	"sync"
	"time"

	"github.com/lovelacelee/clsgo/v1/config"
	"github.com/lovelacelee/clsgo/v1/log"

	mq "github.com/lovelacelee/clsgo/v1/rabbitmq"
	"github.com/lovelacelee/clsgo/v1/utils"
)

var workGroup sync.WaitGroup

const messageCount = 10

func clean() {
	utils.DeleteThingsInDir("logs")
	utils.DeletePath("logs")
	utils.DeleteFiles(utils.Cwd(), "/*.yaml$")
	utils.DeleteFiles(utils.Cwd(), "/*.xml$")
}

func main() {
	exchange := mq.Exchange{
		ExchangeName: "clsgo-exchange",
		ExchangeType: "direct",
		Durable:      true,
		Internal:     false,
		AutoDelete:   false,
		Nowait:       false,
	}
	queue := mq.Queue{
		QueueName:  "clsgo-queue",
		Exclusive:  false,
		Durable:    true,
		AutoDelete: false,
		Nowait:     false,
	}
	addr := config.Cfg.GetString("rabbitmq.server")
	queueClient := mq.New(addr, exchange, queue, "clsgo", "clsgo")
	defer queueClient.Close()

	count := 0
	lastMessage := ""
	timeoutTimes := 0
	for {
		select {
		// If MQ connection or channel closed, client will reconnect automatically,
		// Here we just wait it be ready for consume
		case status := <-queueClient.NotifyStatus:
			if status == mq.MQCONN_READY {
				log.Info("Start consume")
				msgChan, err := queueClient.Consume(false)
				utils.InfoIfError(err, log.Errorf)
			NEXT: //Continous consume
				if queueClient.Status != mq.MQCONN_READY {
					continue
				}
				// message := <-msgChan
				message, err := utils.ReadChanWithTimeout(context.Background(), msgChan, 2*time.Second)
				if utils.InfoIfError(err, log.Errorf) != nil {
					timeoutTimes++
					if timeoutTimes > 5 {
						goto Exit
					}
				}
				lastMessage = string(message.Body)
				message.Ack(false)
				count++
				if count > (messageCount - 1) {
					goto Exit
				}
				goto NEXT
			}

		default:
			continue
		}
	}

Exit:
	log.Infof("Consumer routine done: %d %s", count, lastMessage)
	workGroup.Done()

	clean()
}
Output:

func (*Client) Publish

func (client *Client) Publish(data PubStruct, retryTimes ...uint) error

Publish will push data onto the queue, and wait for a confirm. If no confirms are received until within the resendTimeout, it continuously re-sends messages until a confirm is received. This will block until the server sends a confirm. Errors are only returned if the push action itself fails, see UnsafePublish. If retryTimes set, wait loop breaks after retryTimes check.

Example
package main

import (
	"sync"

	"github.com/lovelacelee/clsgo/v1/config"
	"github.com/lovelacelee/clsgo/v1/log"

	mq "github.com/lovelacelee/clsgo/v1/rabbitmq"
	"github.com/lovelacelee/clsgo/v1/utils"
)

var workGroup sync.WaitGroup

const messageCount = 10
const retryTimes = 3

func clean() {
	utils.DeleteThingsInDir("logs")
	utils.DeletePath("logs")
	utils.DeleteFiles(utils.Cwd(), "/*.yaml$")
	utils.DeleteFiles(utils.Cwd(), "/*.xml$")
}

func main() {
	exchange := mq.Exchange{
		ExchangeName: "clsgo-exchange",
		ExchangeType: "direct",
		Durable:      true,
		Internal:     false,
		AutoDelete:   false,
		Nowait:       false,
	}
	queue := mq.Queue{
		QueueName:  "clsgo-queue",
		Exclusive:  false,
		Durable:    true,
		AutoDelete: false,
		Nowait:     false,
	}
	addr := config.Cfg.GetString("rabbitmq.server")
	queueClient := mq.New(addr, exchange, queue, "clsgo")
	defer queueClient.Close()
	message := []byte("message")
	log.Info("Start push")
	for i := 0; i < messageCount; i++ {
		// Publish blocks
		if err := queueClient.Publish(
			mq.PubStruct{
				ContentType:  "text/plain",
				Body:         message,
				DeliveryMode: 2,
			}, retryTimes); err != nil {
			log.Errorfi("Push failed: %s\n", err)
			break
		}
	}
	log.Info("Push routine done")
	workGroup.Done()

	clean()
}
Output:

func (*Client) UnsafePublish

func (client *Client) UnsafePublish(data PubStruct) error

UnsafePublish will push to the queue without checking for confirmation. It returns an error if it fails to connect. No guarantees are provided for whether the server will receive the message.

type Exchange

type Exchange struct {
	Durable      bool
	ExchangeName string
	ExchangeType string // The common types are "direct", "fanout", "topic" and "headers".
	Internal     bool
	AutoDelete   bool
	Nowait       bool
}

Params for ExchangeDeclare

type PubStruct

type PubStruct = amqp.Publishing

type Queue

type Queue struct {
	QueueName  string
	Exclusive  bool
	Durable    bool
	AutoDelete bool
	Nowait     bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL