amqp

package
v0.17.0

This package is not in the latest version of its module.

Published: Mar 3, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Types

type Config

type Config struct {
	Logger                          Logger
	ConnectionString                string
	VirtualHost                     string
	Prefix                          string
	ReconnectionTimeout             time.Duration
	ConnectTimeout                  time.Duration
	InitTimeout                     time.Duration
	MaxUnconfirmedInFlightPublishes uint
	OnDial                          func(attempt int, err error)
}

Config contains configuration parameters for a Worker.

type ConsumerConfig

type ConsumerConfig struct {
	// the exchange
	Exchange string
	// the queue to bind to the exchange
	Queue string
	// whether to create the queue as a durable queue. Non-durable queues are deleted, and their bindings removed, when the last consumer unsubscribes.
	DurableQueue bool
	// the routing patterns to bind to
	RoutingPatterns []string
	// the prefetch count, i.e. the limit on the number of unacknowledged messages that can be received at once. Set to 0 to disable.
	Prefetch int
	// the handler func. The handler func must itself take care of ack/nack/rejecting messages
	Handle func(*amqp.Delivery) error
	// The number of concurrent workers. If it is not set then it will default to 1.
	WorkerCount int
}

ConsumerConfig is the configuration struct for a RabbitMQ consumer.
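
Because Handle must settle each message itself, a handler typically acks on success and nacks or rejects on failure. The following is a minimal sketch of that pattern, not this package's own code. It assumes the delivery type exposes Ack(multiple bool) and Nack(multiple, requeue bool), as common Go AMQP clients do. The acknowledger interface, settle, and fakeDelivery are hypothetical names declared locally so the example compiles without an AMQP client. How the package treats the error returned by Handle is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger is a hypothetical stand-in for the two delivery methods used
// here; it is declared locally so the sketch compiles on its own.
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// errBoom is a sample processing failure used by the demo below.
var errBoom = errors.New("boom")

// settle runs process, then acks the delivery on success or nacks it
// (without requeueing) on failure. The processing error is returned as-is.
func settle(d acknowledger, process func() error) error {
	if err := process(); err != nil {
		if nackErr := d.Nack(false, false); nackErr != nil {
			return fmt.Errorf("nack failed: %v (processing error: %v)", nackErr, err)
		}
		return err
	}
	return d.Ack(false)
}

// fakeDelivery records which settlement call was made last.
type fakeDelivery struct{ last string }

func (f *fakeDelivery) Ack(multiple bool) error           { f.last = "ack"; return nil }
func (f *fakeDelivery) Nack(multiple, requeue bool) error { f.last = "nack"; return nil }

func main() {
	ok := &fakeDelivery{}
	_ = settle(ok, func() error { return nil })

	bad := &fakeDelivery{}
	err := settle(bad, func() error { return errBoom })

	fmt.Println(ok.last, bad.last, err) // prints: ack nack boom
}
```

With a real delivery, the Handle field could wrap settle in a closure that passes the delivery and the processing step. Whether to requeue on failure depends on whether the failure is likely to be transient.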

type Logger

type Logger interface {
	Debugf(template string, args ...interface{})
	Errorf(template string, args ...interface{})
	Infof(template string, args ...interface{})
}

Logger is the logging interface required by a Worker.
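
Any type with these three methods satisfies Logger. As an illustration, the sketch below adapts the standard library's log.Logger. The interface is copied locally so the example compiles on its own; stdLogger, render, and the level prefixes are hypothetical choices, not part of this package.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface documented above.
type Logger interface {
	Debugf(template string, args ...interface{})
	Errorf(template string, args ...interface{})
	Infof(template string, args ...interface{})
}

// stdLogger is a hypothetical adapter that forwards every call to a
// *log.Logger and prefixes the line with its level.
type stdLogger struct {
	out *log.Logger
}

func (l stdLogger) Debugf(template string, args ...interface{}) {
	l.out.Printf("DEBUG "+template, args...)
}

func (l stdLogger) Errorf(template string, args ...interface{}) {
	l.out.Printf("ERROR "+template, args...)
}

func (l stdLogger) Infof(template string, args ...interface{}) {
	l.out.Printf("INFO "+template, args...)
}

// render logs two sample lines into a buffer and returns the captured text.
// It exists only to make the adapter's output easy to inspect.
func render() string {
	var buf bytes.Buffer
	var l Logger = stdLogger{out: log.New(&buf, "", 0)} // flags 0: no timestamp
	l.Infof("connected to %s", "broker")
	l.Errorf("dial attempt %d failed", 3)
	return buf.String()
}

func main() {
	fmt.Print(render())
}
```

A value like stdLogger{out: log.Default()} could then be assigned to the Logger field of Config.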

type PublishDto

type PublishDto struct {
	Exchange      string
	RoutingKey    string
	MessageType   string
	CorrelationID string
	Message       interface{}
}

type Worker

type Worker struct {
	PublishMode string
	// contains filtered or unexported fields
}

Worker is an AMQP consumer and publisher implementation. It configures an AMQP channel to consume messages from a queue along with a publisher to publish messages to an exchange.

It transparently recovers from connection failures; publishes block during recovery.

func New

func New(c Config) (*Worker, error)

New allocates, initializes and returns a new Worker instance.

func (*Worker) Close

func (w *Worker) Close() error

Close closes the worker, stopping any active consumers and publishes.

func (*Worker) Publish

func (w *Worker) Publish(ctx context.Context, dto PublishDto) error

Publish publishes a message over AMQP. It blocks until the server confirms the message. If the connection fails, it blocks until the connection is restored and the message has been confirmed.

If ctx is cancelled before the publish completes, the operation is cancelled.
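
Since Publish can block for as long as the connection is down, callers may want to bound it with a context deadline. The sketch below shows one way to do that. The publisher interface, stalledPublisher, publishWithTimeout, timedOut, and the sample exchange and routing key are hypothetical. The fake stands in for a worker whose connection never recovers, and it assumes the context's error is returned on cancellation, which the documentation above does not confirm for the real Worker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// PublishDto mirrors the struct documented above.
type PublishDto struct {
	Exchange      string
	RoutingKey    string
	MessageType   string
	CorrelationID string
	Message       interface{}
}

// publisher is the subset of *Worker this sketch relies on.
type publisher interface {
	Publish(ctx context.Context, dto PublishDto) error
}

// stalledPublisher is a hypothetical fake: it blocks until ctx is done,
// then returns the context's error.
type stalledPublisher struct{}

func (stalledPublisher) Publish(ctx context.Context, dto PublishDto) error {
	<-ctx.Done()
	return ctx.Err()
}

// demoTimeout is an arbitrary short deadline for the demo.
const demoTimeout = 20 * time.Millisecond

// publishWithTimeout bounds a potentially long-blocking publish with a deadline.
func publishWithTimeout(p publisher, dto PublishDto, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return p.Publish(ctx, dto)
}

// timedOut reports whether err was caused by the deadline expiring.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	dto := PublishDto{Exchange: "events", RoutingKey: "user.created", Message: "hello"}
	err := publishWithTimeout(stalledPublisher{}, dto, demoTimeout)
	fmt.Println(timedOut(err)) // prints: true
}
```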

func (*Worker) StartConsumer

func (w *Worker) StartConsumer(consumerConfigs []ConsumerConfig, consumersStarted chan struct{}) error

StartConsumer consumes messages from an AMQP queue. The method blocks and always returns nil after a call to Close.

The consumers' queues are declared on startup, along with bindings to the exchange for the specified routing patterns.

Channel consumersStarted is closed when all consumers have been initialized.
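
Because StartConsumer blocks, it is usually run in its own goroutine while the caller waits for consumersStarted to be closed. The sketch below illustrates that pattern under stated assumptions. startFunc, runAndWait, fakeStart, and demo are hypothetical names, and fakeStart only imitates the documented behaviour (close the channel once ready, block, return nil after shutdown).

```go
package main

import (
	"fmt"
	"time"
)

// startFunc has the shape of StartConsumer with the consumer configs
// already bound, e.g. via a closure over a worker.
type startFunc func(consumersStarted chan struct{}) error

// runAndWait runs start in a goroutine and waits until the started channel
// is closed, start returns early, or the timeout elapses. On success it
// returns a channel that yields start's final result.
// On timeout the goroutine is left running; a real caller would close the worker.
func runAndWait(start startFunc, timeout time.Duration) (<-chan error, error) {
	started := make(chan struct{})
	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- start(started) }()

	select {
	case <-started:
		return done, nil
	case err := <-done:
		return nil, fmt.Errorf("consumer returned before starting: %v", err)
	case <-time.After(timeout):
		return nil, fmt.Errorf("consumers not started within %s", timeout)
	}
}

// fakeStart imitates a consumer: it signals readiness, blocks until stop is
// closed, and then returns nil.
func fakeStart(stop <-chan struct{}) startFunc {
	return func(consumersStarted chan struct{}) error {
		close(consumersStarted)
		<-stop
		return nil
	}
}

// demo starts the fake consumer, waits for readiness, then shuts it down.
func demo() (bool, error) {
	stop := make(chan struct{})
	done, err := runAndWait(fakeStart(stop), time.Second)
	if err != nil {
		return false, err
	}
	close(stop) // stands in for calling Close on the worker
	return true, <-done
}

func main() {
	ready, err := demo()
	fmt.Println(ready, err) // prints: true <nil>
}
```

With a real worker, the startFunc would be a closure that calls StartConsumer with the desired consumer configs and the channel it receives.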
