Documentation ¶
Overview ¶
Package amqp fork from github.com/ThreeDotsLabs/watermill-amqp@f72ea40 AMQP implementation of Watermill's Pub/Sub interface.
Supported features: - Reconnect support - Fully customizable configuration - Qos settings - TLS support - Publish Transactions support (optional, can be enabled in config)
Nomenclature ¶
Unfortunately, Watermill's nomenclature is not fully compatible with AMQP's nomenclature. Depending of the configuration, topic can be mapped to exchange name, routing key and queue name.
IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
In case of any problem to find to what exchange name, routing key and queue name are set, just enable logging with debug level and check it in logs.
Index ¶
- Constants
- func GenerateQueueNameTopicName(topic string) string
- type Config
- type ConnectionConfig
- type ConnectionWrapper
- type ConsumeConfig
- type CorrelatingMarshaler
- type DefaultMarshaler
- type DefaultTopologyBuilder
- type ExchangeConfig
- type Marshaler
- type PublishConfig
- type Publisher
- type QosConfig
- type QueueBindConfig
- type QueueConfig
- type QueueNameGenerator
- type ReconnectConfig
- type Subscriber
- type TopologyBuilder
Constants ¶
const DefaultMessageUUIDHeaderKey = "_watermill_message_uuid"
const MessageUUIDHeaderKey = DefaultMessageUUIDHeaderKey
deprecated, please use DefaultMessageUUIDHeaderKey instead
Variables ¶
This section is empty.
Functions ¶
func GenerateQueueNameTopicName ¶
GenerateQueueNameTopicName generates queueName equal to the topic.
Types ¶
type Config ¶
type Config struct { Connection ConnectionConfig Marshaler Marshaler Exchange ExchangeConfig Queue QueueConfig QueueBind QueueBindConfig Publish PublishConfig Consume ConsumeConfig TopologyBuilder TopologyBuilder }
func NewDurablePubSubConfig ¶
func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config
NewDurablePubSubConfig creates config for durable PubSub. generateQueueName is optional, when passing to the publisher. Exchange name is set to the topic name and routing key is empty.
IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html with durable added for exchange, queue and amqp.Persistent DeliveryMode. Thanks to this, we don't lose messages on broker restart.
func NewDurableQueueConfig ¶
NewDurableQueueConfig creates config for durable Queue. Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html with durable added for exchange, queue and amqp.Persistent DeliveryMode. Thanks to this, we don't lose messages on broker restart.
func NewNonDurablePubSubConfig ¶
func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config
NewNonDurablePubSubConfig creates config for non durable PubSub. generateQueueName is optional, when passing to the publisher. Exchange name is set to the topic name and routing key is empty.
IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html. This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurableQueueConfig ¶
NewNonDurableQueueConfig creates config for non durable Queue. Queue name and routing key is set to the topic name by default. Default ("") exchange is used.
IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type. It is used to generate exchange name, routing key and queue name, depending on the context. To check how topic is mapped, please check Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html. This config is not durable, so on the restart of the broker all messages will be lost.
func (Config) ValidatePublisher ¶
func (Config) ValidatePublisherWithConnection ¶
func (Config) ValidateSubscriber ¶
func (Config) ValidateSubscriberWithConnection ¶
type ConnectionConfig ¶
type ConnectionWrapper ¶
type ConnectionWrapper struct {
// contains filtered or unexported fields
}
ConnectionWrapper manages an AMQP connection.
func NewConnection ¶
func NewConnection( config ConnectionConfig, logger watermill.LoggerAdapter, ) (*ConnectionWrapper, error)
NewConnection returns a new connection wrapper.
func (*ConnectionWrapper) Close ¶
func (c *ConnectionWrapper) Close() error
func (*ConnectionWrapper) Closed ¶
func (c *ConnectionWrapper) Closed() bool
func (*ConnectionWrapper) Connected ¶
func (c *ConnectionWrapper) Connected() chan struct{}
func (*ConnectionWrapper) Connection ¶
func (c *ConnectionWrapper) Connection() *amqp.Connection
func (*ConnectionWrapper) IsConnected ¶
func (c *ConnectionWrapper) IsConnected() bool
type ConsumeConfig ¶
type ConsumeConfig struct { // When true, message will be not requeued when nacked. NoRequeueOnNack bool // The consumer is identified by a string that is unique and scoped for all // consumers on this channel. If you wish to eventually cancel the consumer, use // the same non-empty identifier in Channel.Cancel. An empty string will cause // the library to generate a unique identity. The consumer identity will be // included in every Delivery in the ConsumerTag field Consumer string // When exclusive is true, the server will ensure that this is the sole consumer // from this queue. When exclusive is false, the server will fairly distribute // deliveries across multiple consumers. Exclusive bool // The noLocal flag is not supported by RabbitMQ. NoLocal bool // When noWait is true, do not wait for the server to confirm the request and // immediately begin deliveries. If it is not possible to consume, a channel // exception will be raised and the channel will be closed. NoWait bool Qos QosConfig // Optional arguments can be provided that have specific semantics for the queue // or server. Arguments amqp.Table }
type CorrelatingMarshaler ¶
type CorrelatingMarshaler struct { // PostprocessPublishing can be used to make some extra processing with amqp.Publishing, // for example add CorrelationId and ContentType: // // amqp.DefaultMarshaler{ // PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing { // publishing.CorrelationId = "correlation" // publishing.ContentType = "application/json" // // return publishing // }, // } PostprocessPublishing func(amqp.Publishing) amqp.Publishing // When true, DeliveryMode will be not set to Persistent. // // DeliveryMode Transient means higher throughput, but messages will not be // restored on broker restart. The delivery mode of publishings is unrelated // to the durability of the queues they reside on. Transient messages will // not be restored to durable queues, persistent messages will be restored to // durable queues and lost on non-durable queues during server restart. NotPersistentDeliveryMode bool }
CorrelatingMarshaler will pass UUID through the AMQP native correlation ID rather than as a header
func (CorrelatingMarshaler) Marshal ¶
func (cm CorrelatingMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error)
type DefaultMarshaler ¶
type DefaultMarshaler struct { // PostprocessPublishing can be used to make some extra processing with amqp.Publishing, // for example add CorrelationId and ContentType: // // amqp.DefaultMarshaler{ // PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing { // publishing.CorrelationId = "correlation" // publishing.ContentType = "application/json" // // return publishing // }, // } PostprocessPublishing func(amqp.Publishing) amqp.Publishing // When true, DeliveryMode will be not set to Persistent. // // DeliveryMode Transient means higher throughput, but messages will not be // restored on broker restart. The delivery mode of publishings is unrelated // to the durability of the queues they reside on. Transient messages will // not be restored to durable queues, persistent messages will be restored to // durable queues and lost on non-durable queues during server restart. NotPersistentDeliveryMode bool // Header used to store and read message UUID. // // If value is empty, DefaultMessageUUIDHeaderKey value is used. // If header doesn't exist, empty value is passed as message UUID. MessageUUIDHeaderKey string AppID string }
func (DefaultMarshaler) Marshal ¶
func (d DefaultMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error)
type DefaultTopologyBuilder ¶
type DefaultTopologyBuilder struct{}
func (*DefaultTopologyBuilder) BuildTopology ¶
func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error
func (DefaultTopologyBuilder) ExchangeDeclare ¶
type ExchangeConfig ¶
type ExchangeConfig struct { // GenerateName is generated based on the topic provided for Publish or Subscribe method. // // Exchange names starting with "amq." are reserved for pre-declared and // standardized exchanges. The client MAY declare an exchange starting with // "amq." if the passive option is set, or the exchange already exists. Names can // consist of a non-empty sequence of letters, digits, hyphen, underscore, // period, or colon. GenerateName func(topic string) string // Each exchange belongs to one of a set of exchange kinds/types implemented by // the server. The exchange types define the functionality of the exchange - i.e. // how messages are routed through it. Once an exchange is declared, its type // cannot be changed. The common types are "direct", "fanout", "topic" and // "headers". Type string // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain // declared when there are no remaining bindings. This is the best lifetime for // long-lived exchange configurations like stable routes and default exchanges. Durable bool // Non-Durable and Auto-Deleted exchanges will be deleted when there are no // remaining bindings and not restored on server restart. This lifetime is // useful for temporary topologies that should not pollute the virtual host on // failure or after the consumers have completed. // // Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is // running including when there are no remaining bindings. This is useful for // temporary topologies that may have long delays between bindings. // AutoDeleted bool // Exchanges declared as `internal` do not accept accept publishings. Internal // exchanges are useful when you wish to implement inter-exchange topologies // that should not be exposed to users of the broker. Internal bool // When noWait is true, declare without waiting for a confirmation from the server. // The channel may be closed as a result of an error. Add a NotifyClose listener- // to respond to any exceptions. NoWait bool // Optional amqp.Table of arguments that are specific to the server's implementation of // the exchange can be sent for exchange types that require extra parameters. Arguments amqp.Table }
type Marshaler ¶
type Marshaler interface { Marshal(msg *message.Message) (amqp.Publishing, error) Unmarshal(amqpMsg amqp.Delivery) (*message.Message, error) }
Marshaler marshals Watermill's message to amqp.Publishing and unmarshals amqp.Delivery to Watermill's message.
type PublishConfig ¶
type PublishConfig struct { // GenerateRoutingKey is generated based on the topic provided for Publish. GenerateRoutingKey func(topic string) string // Publishings can be undeliverable when the mandatory flag is true and no queue is // bound that matches the routing key, or when the immediate flag is true and no // consumer on the matched queue is ready to accept the delivery. Mandatory bool // Publishings can be undeliverable when the mandatory flag is true and no queue is // bound that matches the routing key, or when the immediate flag is true and no // consumer on the matched queue is ready to accept the delivery. Immediate bool // With transactional enabled, all messages wil be added in transaction. Transactional bool // ChannelPoolSize specifies the size of a channel pool. All channels in the pool are opened when the publisher is // created. When a Publish operation is performed then a channel is taken from the pool to perform the operation and // then returned to the pool once the operation has finished. If all channels are in use then the Publish operation // waits until a channel is returned to the pool. // If this value is set to 0 (default) then channels are not pooled and a new channel is opened/closed for every // Publish operation. ChannelPoolSize int // ConfirmDelivery indicates whether the Publish function should wait until a confirmation is received from // the AMQP server in order to guarantee that the message is delivered. Setting this value to true may // negatively impact performance but will increase reliability. ConfirmDelivery bool }
type Publisher ¶
type Publisher struct { *ConnectionWrapper // contains filtered or unexported fields }
func NewPublisher ¶
func NewPublisher(config Config, logger watermill.LoggerAdapter) (*Publisher, error)
func NewPublisherWithConnection ¶
func NewPublisherWithConnection(config Config, logger watermill.LoggerAdapter, conn *ConnectionWrapper) (*Publisher, error)
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, topic string, messages ...*message.Message) (err error)
Publish publishes messages to AMQP broker. Publish is blocking until the broker has received and saved the message. Publish is always thread safe.
Watermill's topic in Publish is not mapped to AMQP's topic, but depending on configuration it can be mapped to exchange, queue or routing key. For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
type QosConfig ¶
type QosConfig struct { // With a prefetch count greater than zero, the server will deliver that many // messages to consumers before acknowledgments are received. The server ignores // this option when consumers are started with noAck because no acknowledgments // are expected or sent. // // In order to defeat that we can set the prefetch count with the value of 1. // This tells RabbitMQ not to give more than one message to a worker at a time. // Or, in other words, don't dispatch a new message to a worker until it has // processed and acknowledged the previous one. // Instead, it will dispatch it to the next worker that is not still busy. PrefetchCount int // With a prefetch size greater than zero, the server will try to keep at least // that many bytes of deliveries flushed to the network before receiving // acknowledgments from the consumers. This option is ignored when consumers are // started with noAck. PrefetchSize int // When global is true, these Qos settings apply to all existing and future // consumers on all channels on the same connection. When false, the Channel.Qos // settings will apply to all existing and future consumers on this channel. // // Please see the RabbitMQ Consumer Prefetch documentation for an explanation of // how the global flag is implemented in RabbitMQ, as it differs from the // AMQP 0.9.1 specification in that global Qos settings are limited in scope to // channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html). Global bool }
Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.
type QueueBindConfig ¶
type QueueBindConfig struct { GenerateRoutingKey func(topic string) string // When noWait is false and the queue could not be bound, the channel will be // closed with an error. NoWait bool // Optional amqpe.Table of arguments that are specific to the server's implementation of // the queue bind can be sent for queue bind types that require extra parameters. Arguments amqp.Table }
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
type QueueConfig ¶
type QueueConfig struct { // GenerateRoutingKey is generated based on the topic provided for Subscribe. GenerateName QueueNameGenerator // Durable and Non-Auto-Deleted queues will survive server restarts and remain // when there are no remaining consumers or bindings. Persistent publishings will // be restored in this queue on server restart. These queues are only able to be // bound to durable exchanges. Durable bool // Non-Durable and Auto-Deleted exchanges will be deleted when there are no // remaining bindings and not restored on server restart. This lifetime is // useful for temporary topologies that should not pollute the virtual host on // failure or after the consumers have completed. // // Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is // running including when there are no remaining bindings. This is useful for // temporary topologies that may have long delays between bindings. AutoDelete bool // Exclusive queues are only accessible by the connection that declares them and // will be deleted when the connection closes. Channels on other connections // will receive an error when attempting to declare, bind, consume, purge or // delete a queue with the same name. Exclusive bool // When noWait is true, the queue will assume to be declared on the server. A // channel exception will arrive if the conditions are met for existing queues // or attempting to modify an existing queue from a different connection. NoWait bool // Optional amqpe.Table of arguments that are specific to the server's implementation of // the queue can be sent for queue types that require extra parameters. Arguments amqp.Table }
type QueueNameGenerator ¶
QueueNameGenerator generates QueueName based on the topic.
func GenerateQueueNameConstant ¶
func GenerateQueueNameConstant(queueName string) QueueNameGenerator
GenerateQueueNameConstant generate queue name equal to queueName.
func GenerateQueueNameTopicNameWithSuffix ¶
func GenerateQueueNameTopicNameWithSuffix(suffix string) QueueNameGenerator
GenerateQueueNameTopicNameWithSuffix generates queue name equal to:
topic + "_" + suffix
type ReconnectConfig ¶
type ReconnectConfig struct { BackoffInitialInterval time.Duration BackoffRandomizationFactor float64 BackoffMultiplier float64 BackoffMaxInterval time.Duration }
func DefaultReconnectConfig ¶
func DefaultReconnectConfig() *ReconnectConfig
type Subscriber ¶
type Subscriber struct { *ConnectionWrapper // contains filtered or unexported fields }
func NewSubscriber ¶
func NewSubscriber(config Config, logger watermill.LoggerAdapter) (*Subscriber, error)
func NewSubscriberWithConnection ¶
func NewSubscriberWithConnection(config Config, logger watermill.LoggerAdapter, conn *ConnectionWrapper) (*Subscriber, error)
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes all subscriptions with their output channels.
func (*Subscriber) Subscribe ¶
Subscribe consumes messages from AMQP broker.
Watermill's topic in Subscribe is not mapped to AMQP's topic, but depending on configuration it can be mapped to exchange, queue or routing key. For detailed description of nomenclature mapping, please check "Nomenclature" paragraph in doc.go file.
func (*Subscriber) SubscribeInitialize ¶
func (s *Subscriber) SubscribeInitialize(topic string) (err error)
type TopologyBuilder ¶
type TopologyBuilder interface { BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error }
TopologyBuilder is responsible for declaring exchange, queues and queues binding.
Default TopologyBuilder is DefaultTopologyBuilder. If you need custom built topology, you should implement your own TopologyBuilder and pass it to the amqp.Config:
config := NewDurablePubSubConfig() config.TopologyBuilder = MyProCustomBuilder{}
nolint: revive // interface too long issue