rabbitmq

package
v1.13.0-beta1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 24, 2024 License: MIT Imports: 6 Imported by: 0

README

Helper packages for connecting to rabbitmq

Package channelprovider

NewChannelProvider gives you a new channel provider. It takes the list of servers from "rabbitmq" in config.

NewChannelProviderWithServers gives you a new channel provider. You have to pass a list of rabbitmq servers.

GetChannel initialises a connection pool once and tries to get a channel, with exponential back-off up to 30 minutes.

Package connectionpool

NewConnectionPool allows to create a new connection pool (type Pool), manages adding/removing connection from pool. Also provides method to get connection from pool, which has a timeout of 1 minute.

Package connection

Provides method to get a new connection to the given server, will retry with exponential back-off upto 30 min. Implements the IConnectionProvider interface defined in connectionpool

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type IProcessor

type IProcessor interface {
	ProcessMessage(map[string]interface{}) bool
}

IProcessor : interface for consuming messages from queue

type OperationManager

type OperationManager struct {
	// contains filtered or unexported fields
}

OperationManager manages rabbitmq connections and operations like publish & consume

func NewRabbitMQManager

func NewRabbitMQManager(logger *gologger.CustomLogger, rabbitMqServers []string, queueName string, username string, password string) *OperationManager

NewRabbitMQManager : returns RabbitMQ OperationManager. panics if empty server list given.

func (*OperationManager) NewRabbitmqChannel

func (om *OperationManager) NewRabbitmqChannel(notifyError bool) (*amqp.Channel, chan *amqp.Error)

NewRabbitmqChannel : initializes the rabbitmq channel. Input parameter is a flag to notify error on channel NOTE: Add a listener to returned error channel to handle connection errors.

func (*OperationManager) Publish

func (om *OperationManager) Publish(ch *amqp.Channel, msg []byte)

Publish : publishes the message bytes to given queue

func (*OperationManager) PublishDL

func (om *OperationManager) PublishDL(ch *amqp.Channel, msg []byte)

PublishDL : publishes the message bytes to dead letter queue

func (*OperationManager) SetBindings

func (om *OperationManager) SetBindings(ch *amqp.Channel, isDL bool) error

SetBindings : declare the queue, exchange and sets bindings between queue and exhange. pass `isDL` true to set dead letter bindings for given queuename

func (*OperationManager) StartConsumer

func (om *OperationManager) StartConsumer(processor IProcessor)

StartConsumer : starts the consumer from given queue Also it declares a dead letter queue and publishes the failed messages to DL

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL