amqp

package
v1.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2023 License: Apache-2.0, MIT Imports: 12 Imported by: 1

README

Watermill AMQP Pub/Sub

CI Status Go Report Card

This is Pub/Sub for the Watermill project.

All Pub/Sub implementations can be found at https://watermill.io/pubsubs/.

Watermill is a Go library for working efficiently with message streams. It is intended for building event driven applications, enabling event sourcing, RPC over messages, sagas and basically whatever else comes to your mind. You can use conventional pub/sub implementations like Kafka or RabbitMQ, but also HTTP or MySQL binlog if that fits your use case.

Documentation: https://watermill.io/

Getting started guide: https://watermill.io/docs/getting-started/

Issues: https://github.com/ThreeDotsLabs/watermill/issues

Contributing

All contributions are very much welcome. If you'd like to help with Watermill development, please see open issues and submit your pull request via GitHub.

Support

If you didn't find the answer to your question in the documentation, feel free to ask us directly!

Please join us on the #watermill channel on the Gophers slack: You can get an invite here.

License

MIT License

Documentation

Overview

Package amqp fork from github.com/ThreeDotsLabs/watermill-amqp@f72ea40 AMQP implementation of Watermill's Pub/Sub interface.

Supported features: - Reconnect support - Fully customizable configuration - Qos settings - TLS support - Publish Transactions support (optional, can be enabled in config)

Nomenclature

Unfortunately, Watermill's nomenclature is not fully compatible with AMQP's nomenclature. Depending of the configuration, topic can be mapped to exchange name, routing key and queue name.

IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.

In case of any problem to find to what exchange name, routing key and queue name are set, just enable logging with debug level and check it in logs.

Index

Constants

View Source
const DefaultMessageUUIDHeaderKey = "_watermill_message_uuid"
View Source
const MessageUUIDHeaderKey = DefaultMessageUUIDHeaderKey

deprecated, please use DefaultMessageUUIDHeaderKey instead

Variables

This section is empty.

Functions

func GenerateQueueNameTopicName

func GenerateQueueNameTopicName(topic string) string

GenerateQueueNameTopicName generates queueName equal to the topic.

Types

type Config

type Config struct {
	Connection ConnectionConfig

	Marshaler Marshaler

	Exchange  ExchangeConfig
	Queue     QueueConfig
	QueueBind QueueBindConfig

	Publish PublishConfig
	Consume ConsumeConfig

	TopologyBuilder TopologyBuilder
}

func NewDurablePubSubConfig

func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config

NewDurablePubSubConfig creates config for durable PubSub. generateQueueName is optional, when passing to the publisher. Exchange name is set to the topic name and routing key is empty.

IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.

This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html with durable added for exchange, queue and amqp.Persistent DeliveryMode. Thanks to this, we don't lose messages on broker restart.

func NewDurableQueueConfig

func NewDurableQueueConfig(amqpURI string) Config

NewDurableQueueConfig creates config for durable Queue. Queue name and routing key is set to the topic name by default. Default ("") exchange is used.

IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.

This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html with durable added for exchange, queue and amqp.Persistent DeliveryMode. Thanks to this, we don't lose messages on broker restart.

func NewNonDurablePubSubConfig

func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config

NewNonDurablePubSubConfig creates config for non durable PubSub. generateQueueName is optional, when passing to the publisher. Exchange name is set to the topic name and routing key is empty.

IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.

This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html. This config is not durable, so on the restart of the broker all messages will be lost.

func NewNonDurableQueueConfig

func NewNonDurableQueueConfig(amqpURI string) Config

NewNonDurableQueueConfig creates config for non durable Queue. Queue name and routing key is set to the topic name by default. Default ("") exchange is used.

IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.

This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html. This config is not durable, so on the restart of the broker all messages will be lost.

func (Config) ValidatePublisher

func (c Config) ValidatePublisher() error

func (Config) ValidatePublisherWithConnection

func (c Config) ValidatePublisherWithConnection() error

func (Config) ValidateSubscriber

func (c Config) ValidateSubscriber() error

func (Config) ValidateSubscriberWithConnection

func (c Config) ValidateSubscriberWithConnection() error

type ConnectionConfig

type ConnectionConfig struct {
	AmqpURI string

	TLSConfig  *tls.Config
	AmqpConfig *amqp.Config

	Reconnect *ReconnectConfig
}

type ConnectionWrapper

type ConnectionWrapper struct {
	// contains filtered or unexported fields
}

ConnectionWrapper manages an AMQP connection.

func NewConnection

func NewConnection(
	config ConnectionConfig,
	logger watermill.LoggerAdapter,
) (*ConnectionWrapper, error)

NewConnection returns a new connection wrapper.

func (*ConnectionWrapper) Close

func (c *ConnectionWrapper) Close() error

func (*ConnectionWrapper) Closed

func (c *ConnectionWrapper) Closed() bool

func (*ConnectionWrapper) Connected

func (c *ConnectionWrapper) Connected() chan struct{}

func (*ConnectionWrapper) Connection

func (c *ConnectionWrapper) Connection() *amqp.Connection

func (*ConnectionWrapper) IsConnected

func (c *ConnectionWrapper) IsConnected() bool

type ConsumeConfig

type ConsumeConfig struct {
	// When true, message will be not requeued when nacked.
	NoRequeueOnNack bool

	// The consumer is identified by a string that is unique and scoped for all
	// consumers on this channel.  If you wish to eventually cancel the consumer, use
	// the same non-empty identifier in Channel.Cancel.  An empty string will cause
	// the library to generate a unique identity.  The consumer identity will be
	// included in every Delivery in the ConsumerTag field
	Consumer string

	// When exclusive is true, the server will ensure that this is the sole consumer
	// from this queue. When exclusive is false, the server will fairly distribute
	// deliveries across multiple consumers.
	Exclusive bool

	// The noLocal flag is not supported by RabbitMQ.
	NoLocal bool

	// When noWait is true, do not wait for the server to confirm the request and
	// immediately begin deliveries.  If it is not possible to consume, a channel
	// exception will be raised and the channel will be closed.
	NoWait bool

	Qos QosConfig

	// Optional arguments can be provided that have specific semantics for the queue
	// or server.
	Arguments amqp.Table
}

type CorrelatingMarshaler

type CorrelatingMarshaler struct {
	// PostprocessPublishing can be used to make some extra processing with amqp.Publishing,
	// for example add CorrelationId and ContentType:
	//
	//  amqp.DefaultMarshaler{
	//		PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing {
	//			publishing.CorrelationId = "correlation"
	//			publishing.ContentType = "application/json"
	//
	//			return publishing
	//		},
	//	}
	PostprocessPublishing func(amqp.Publishing) amqp.Publishing

	// When true, DeliveryMode will be not set to Persistent.
	//
	// DeliveryMode Transient means higher throughput, but messages will not be
	// restored on broker restart. The delivery mode of publishings is unrelated
	// to the durability of the queues they reside on. Transient messages will
	// not be restored to durable queues, persistent messages will be restored to
	// durable queues and lost on non-durable queues during server restart.
	NotPersistentDeliveryMode bool
}

CorrelatingMarshaler will pass UUID through the AMQP native correlation ID rather than as a header

func (CorrelatingMarshaler) Marshal

func (CorrelatingMarshaler) Unmarshal

func (cm CorrelatingMarshaler) Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error)

type DefaultMarshaler

type DefaultMarshaler struct {
	// PostprocessPublishing can be used to make some extra processing with amqp.Publishing,
	// for example add CorrelationId and ContentType:
	//
	//  amqp.DefaultMarshaler{
	//		PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing {
	//			publishing.CorrelationId = "correlation"
	//			publishing.ContentType = "application/json"
	//
	//			return publishing
	//		},
	//	}
	PostprocessPublishing func(amqp.Publishing) amqp.Publishing

	// When true, DeliveryMode will be not set to Persistent.
	//
	// DeliveryMode Transient means higher throughput, but messages will not be
	// restored on broker restart. The delivery mode of publishings is unrelated
	// to the durability of the queues they reside on. Transient messages will
	// not be restored to durable queues, persistent messages will be restored to
	// durable queues and lost on non-durable queues during server restart.
	NotPersistentDeliveryMode bool

	// Header used to store and read message UUID.
	//
	// If value is empty, DefaultMessageUUIDHeaderKey value is used.
	// If header doesn't exist, empty value is passed as message UUID.
	MessageUUIDHeaderKey string

	AppID string
}

func (DefaultMarshaler) Marshal

func (d DefaultMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error)

func (DefaultMarshaler) Unmarshal

func (d DefaultMarshaler) Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error)

type DefaultTopologyBuilder

type DefaultTopologyBuilder struct{}

func (*DefaultTopologyBuilder) BuildTopology

func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string,
	exchangeName string, config Config, logger watermill.LoggerAdapter) error

func (DefaultTopologyBuilder) ExchangeDeclare

func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error

type ExchangeConfig

type ExchangeConfig struct {
	// GenerateName is generated based on the topic provided for Publish or Subscribe method.
	//
	// Exchange names starting with "amq." are reserved for pre-declared and
	// standardized exchanges. The client MAY declare an exchange starting with
	// "amq." if the passive option is set, or the exchange already exists.  Names can
	// consist of a non-empty sequence of letters, digits, hyphen, underscore,
	// period, or colon.
	GenerateName func(topic string) string

	// Each exchange belongs to one of a set of exchange kinds/types implemented by
	// the server. The exchange types define the functionality of the exchange - i.e.
	// how messages are routed through it. Once an exchange is declared, its type
	// cannot be changed.  The common types are "direct", "fanout", "topic" and
	// "headers".
	Type string

	// Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
	// declared when there are no remaining bindings.  This is the best lifetime for
	// long-lived exchange configurations like stable routes and default exchanges.
	Durable bool

	// Non-Durable and Auto-Deleted exchanges will be deleted when there are no
	// remaining bindings and not restored on server restart.  This lifetime is
	// useful for temporary topologies that should not pollute the virtual host on
	// failure or after the consumers have completed.
	//
	// Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
	// running including when there are no remaining bindings.  This is useful for
	// temporary topologies that may have long delays between bindings.
	//
	AutoDeleted bool

	// Exchanges declared as `internal` do not accept accept publishings. Internal
	// exchanges are useful when you wish to implement inter-exchange topologies
	// that should not be exposed to users of the broker.
	Internal bool

	// When noWait is true, declare without waiting for a confirmation from the server.
	// The channel may be closed as a result of an error.  Add a NotifyClose listener-
	// to respond to any exceptions.
	NoWait bool

	// Optional amqp.Table of arguments that are specific to the server's implementation of
	// the exchange can be sent for exchange types that require extra parameters.
	Arguments amqp.Table
}

type Marshaler

type Marshaler interface {
	Marshal(msg *message.Message) (amqp.Publishing, error)
	Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error)
}

Marshaler marshals Watermill's message to amqp.Publishing and unmarshals amqp.Delivery to Watermill's message.

type PublishConfig

type PublishConfig struct {
	// GenerateRoutingKey is generated based on the topic provided for Publish.
	GenerateRoutingKey func(topic string) string

	// Publishings can be undeliverable when the mandatory flag is true and no queue is
	// bound that matches the routing key, or when the immediate flag is true and no
	// consumer on the matched queue is ready to accept the delivery.
	Mandatory bool

	// Publishings can be undeliverable when the mandatory flag is true and no queue is
	// bound that matches the routing key, or when the immediate flag is true and no
	// consumer on the matched queue is ready to accept the delivery.
	Immediate bool

	// With transactional enabled, all messages wil be added in transaction.
	Transactional bool

	// ChannelPoolSize specifies the size of a channel pool. All channels in the pool are opened when the publisher is
	// created. When a Publish operation is performed then a channel is taken from the pool to perform the operation and
	// then returned to the pool once the operation has finished. If all channels are in use then the Publish operation
	// waits until a channel is returned to the pool.
	// If this value is set to 0 (default) then channels are not pooled and a new channel is opened/closed for every
	// Publish operation.
	ChannelPoolSize int

	// ConfirmDelivery indicates whether the Publish function should wait until a confirmation is received from
	// the AMQP server in order to guarantee that the message is delivered. Setting this value to true may
	// negatively impact performance but will increase reliability.
	ConfirmDelivery bool
}

type Publisher

type Publisher struct {
	*ConnectionWrapper
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(config Config, logger watermill.LoggerAdapter) (*Publisher, error)

func NewPublisherWithConnection

func NewPublisherWithConnection(config Config, logger watermill.LoggerAdapter,
	conn *ConnectionWrapper) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close() error

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic string, messages ...*message.Message) (err error)

Publish publishes messages to AMQP broker. Publish is blocking until the broker has received and saved the message. Publish is always thread safe.

Watermill's topic in Publish is not mapped to AMQP's topic, but depending on configuration it can be mapped to exchange, queue or routing key. For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.

type QosConfig

type QosConfig struct {
	// With a prefetch count greater than zero, the server will deliver that many
	// messages to consumers before acknowledgments are received.  The server ignores
	// this option when consumers are started with noAck because no acknowledgments
	// are expected or sent.
	//
	// In order to defeat that we can set the prefetch count with the value of 1.
	// This tells RabbitMQ not to give more than one message to a worker at a time.
	// Or, in other words, don't dispatch a new message to a worker until it has
	// processed and acknowledged the previous one.
	// Instead, it will dispatch it to the next worker that is not still busy.
	PrefetchCount int

	// With a prefetch size greater than zero, the server will try to keep at least
	// that many bytes of deliveries flushed to the network before receiving
	// acknowledgments from the consumers.  This option is ignored when consumers are
	// started with noAck.
	PrefetchSize int

	// When global is true, these Qos settings apply to all existing and future
	// consumers on all channels on the same connection.  When false, the Channel.Qos
	// settings will apply to all existing and future consumers on this channel.
	//
	// Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
	// how the global flag is implemented in RabbitMQ, as it differs from the
	// AMQP 0.9.1 specification in that global Qos settings are limited in scope to
	// channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
	Global bool
}

Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.

type QueueBindConfig

type QueueBindConfig struct {
	GenerateRoutingKey func(topic string) string

	// When noWait is false and the queue could not be bound, the channel will be
	// closed with an error.
	NoWait bool

	// Optional amqpe.Table of arguments that are specific to the server's implementation of
	// the queue bind can be sent for queue bind types that require extra parameters.
	Arguments amqp.Table
}

QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.

type QueueConfig

type QueueConfig struct {
	// GenerateRoutingKey is generated based on the topic provided for Subscribe.
	GenerateName QueueNameGenerator

	// Durable and Non-Auto-Deleted queues will survive server restarts and remain
	// when there are no remaining consumers or bindings.  Persistent publishings will
	// be restored in this queue on server restart.  These queues are only able to be
	// bound to durable exchanges.
	Durable bool

	// Non-Durable and Auto-Deleted exchanges will be deleted when there are no
	// remaining bindings and not restored on server restart.  This lifetime is
	// useful for temporary topologies that should not pollute the virtual host on
	// failure or after the consumers have completed.
	//
	// Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
	// running including when there are no remaining bindings.  This is useful for
	// temporary topologies that may have long delays between bindings.
	AutoDelete bool

	// Exclusive queues are only accessible by the connection that declares them and
	// will be deleted when the connection closes.  Channels on other connections
	// will receive an error when attempting  to declare, bind, consume, purge or
	// delete a queue with the same name.
	Exclusive bool

	// When noWait is true, the queue will assume to be declared on the server.  A
	// channel exception will arrive if the conditions are met for existing queues
	// or attempting to modify an existing queue from a different connection.
	NoWait bool

	// Optional amqpe.Table of arguments that are specific to the server's implementation of
	// the queue can be sent for queue types that require extra parameters.
	Arguments amqp.Table
}

type QueueNameGenerator

type QueueNameGenerator func(topic string) string

QueueNameGenerator generates QueueName based on the topic.

func GenerateQueueNameConstant

func GenerateQueueNameConstant(queueName string) QueueNameGenerator

GenerateQueueNameConstant generate queue name equal to queueName.

func GenerateQueueNameTopicNameWithSuffix

func GenerateQueueNameTopicNameWithSuffix(suffix string) QueueNameGenerator

GenerateQueueNameTopicNameWithSuffix generates queue name equal to:

topic + "_" + suffix

type ReconnectConfig

type ReconnectConfig struct {
	BackoffInitialInterval     time.Duration
	BackoffRandomizationFactor float64
	BackoffMultiplier          float64
	BackoffMaxInterval         time.Duration
}

func DefaultReconnectConfig

func DefaultReconnectConfig() *ReconnectConfig

type Subscriber

type Subscriber struct {
	*ConnectionWrapper
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(config Config, logger watermill.LoggerAdapter) (*Subscriber, error)

func NewSubscriberWithConnection

func NewSubscriberWithConnection(config Config,
	logger watermill.LoggerAdapter, conn *ConnectionWrapper) (*Subscriber, error)

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes all subscriptions with their output channels.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)

Subscribe consumes messages from AMQP broker.

Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration it can be mapped to exchange, queue or routing key. For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.

func (*Subscriber) SubscribeInitialize

func (s *Subscriber) SubscribeInitialize(topic string) (err error)

type TopologyBuilder

type TopologyBuilder interface {
	BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error
	ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error
}

TopologyBuilder is responsible for declaring exchange, queues and queues binding.

Default TopologyBuilder is DefaultTopologyBuilder. If you need custom built topology, you should implement your own TopologyBuilder and pass it to the amqp.Config:

config := NewDurablePubSubConfig()
config.TopologyBuilder = MyProCustomBuilder{}

nolint: revive // interface too long issue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL