Documentation ¶
Overview ¶
Package rabbitmq exports a RabbitMQ Client object that wraps the official go library. It automatically reconnects when the connection fails, and blocks all pushes until the connection succeeds. It also confirms every outgoing message, so none are lost. It doesn't automatically ack each message, but leaves that to the parent process, since it is usage-dependent.
Index ¶
Examples ¶
Constants ¶
const ( MQCONN_INIT = iota MQCONN_READY MQCONN_RECONNCTING MQCONN_CLOSED )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct { // Client status Status int // Notify client is ready NotifyStatus chan int // Consumer delivery NotifyMessage chan amqp.Delivery // contains filtered or unexported fields }
Rabbit MQ client
func New ¶
func New(addr string, exchange Exchange, queue Queue, routingKey string, consumerTag ...string) *Client
New creates a new consumer state instance, and automatically, attempts to connect to the server. consumerTag unused when client as a publisher
func (*Client) CancelConsume ¶
func (*Client) Consume ¶
Consume will continuously put queue items on the channel. It is required to call delivery.Ack when it has been successfully processed, or delivery.Nack when it fails. Ignoring this will cause data to build up on the server. amqp.Delivery invalid after any Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs.
Example (Cancel) ¶
package main import ( "context" "sync" "time" "github.com/lovelacelee/clsgo/v1/config" "github.com/lovelacelee/clsgo/v1/log" mq "github.com/lovelacelee/clsgo/v1/rabbitmq" "github.com/lovelacelee/clsgo/v1/utils" ) var workGroup sync.WaitGroup const messageCount = 10 func clean() { utils.DeleteThingsInDir("logs") utils.DeletePath("logs") utils.DeleteFiles(utils.Cwd(), "/*.yaml$") utils.DeleteFiles(utils.Cwd(), "/*.xml$") } func main() { exchange := mq.Exchange{ ExchangeName: "clsgo-exchange", ExchangeType: "direct", Durable: true, Internal: false, AutoDelete: false, Nowait: false, } queue := mq.Queue{ QueueName: "clsgo-queue", Exclusive: false, Durable: true, AutoDelete: false, Nowait: false, } addr := config.Cfg.GetString("rabbitmq.server") queueClient := mq.New(addr, exchange, queue, "clsgo", "clsgo") defer queueClient.Close() count := 0 lastMessage := "" timeoutTimes := 0 for { select { // If MQ connection or channel closed, client will reconnect automatically, // Here we just wait it be ready for consume case status := <-queueClient.NotifyStatus: if status == mq.MQCONN_READY { log.Info("Start consume") msgChan, err := queueClient.Consume(false) utils.InfoIfError(err, log.Errorf) NEXT: //Continous consume if queueClient.Status != mq.MQCONN_READY { continue } // message := <-msgChan message, err := utils.ReadChanWithTimeout(context.Background(), msgChan, 2*time.Second) if utils.InfoIfError(err, log.Errorf) != nil { timeoutTimes++ if timeoutTimes > 5 { goto Exit } } lastMessage = string(message.Body) message.Ack(false) count++ if count > (messageCount - 1) { goto Exit } goto NEXT } default: continue } } Exit: log.Infof("Consumer routine done: %d %s", count, lastMessage) workGroup.Done() clean() }
Output:
func (*Client) Publish ¶
Publish will push data onto the queue, and wait for a confirm. If no confirms are received until within the resendTimeout, it continuously re-sends messages until a confirm is received. This will block until the server sends a confirm. Errors are only returned if the push action itself fails, see UnsafePublish. If retryTimes set, wait loop breaks after retryTimes check.
Example ¶
package main import ( "sync" "github.com/lovelacelee/clsgo/v1/config" "github.com/lovelacelee/clsgo/v1/log" mq "github.com/lovelacelee/clsgo/v1/rabbitmq" "github.com/lovelacelee/clsgo/v1/utils" ) var workGroup sync.WaitGroup const messageCount = 10 const retryTimes = 3 func clean() { utils.DeleteThingsInDir("logs") utils.DeletePath("logs") utils.DeleteFiles(utils.Cwd(), "/*.yaml$") utils.DeleteFiles(utils.Cwd(), "/*.xml$") } func main() { exchange := mq.Exchange{ ExchangeName: "clsgo-exchange", ExchangeType: "direct", Durable: true, Internal: false, AutoDelete: false, Nowait: false, } queue := mq.Queue{ QueueName: "clsgo-queue", Exclusive: false, Durable: true, AutoDelete: false, Nowait: false, } addr := config.Cfg.GetString("rabbitmq.server") queueClient := mq.New(addr, exchange, queue, "clsgo") defer queueClient.Close() message := []byte("message") log.Info("Start push") for i := 0; i < messageCount; i++ { // Publish blocks if err := queueClient.Publish( mq.PubStruct{ ContentType: "text/plain", Body: message, DeliveryMode: 2, }, retryTimes); err != nil { log.Errorfi("Push failed: %s\n", err) break } } log.Info("Push routine done") workGroup.Done() clean() }
Output:
func (*Client) UnsafePublish ¶
UnsafePublish will push to the queue without checking for confirmation. It returns an error if it fails to connect. No guarantees are provided for whether the server will receive the message.
type Exchange ¶
type Exchange struct { Durable bool ExchangeName string ExchangeType string // The common types are "direct", "fanout", "topic" and "headers". Internal bool AutoDelete bool Nowait bool }
Params for ExchangeDeclare
type PubStruct ¶
type PubStruct = amqp.Publishing