Documentation
¶
Index ¶
- Constants
- func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions)
- func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions)
- func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions)
- func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions)
- func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions)
- func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions)
- func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions)
- func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions)
- func WithConsumeOptionsBindingNoWait(options *ConsumeOptions)
- func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions)
- func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions)
- func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions)
- func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions)
- func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions)
- func WithConsumeOptionsQOSGlobal(options *ConsumeOptions)
- func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions)
- func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions)
- func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions)
- func WithConsumeOptionsQueueDurable(options *ConsumeOptions)
- func WithConsumeOptionsQueueExclusive(options *ConsumeOptions)
- func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions)
- func WithConsumeOptionsQueueNoWait(options *ConsumeOptions)
- func WithConsumeOptionsQuorum(options *ConsumeOptions)
- func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions)
- func WithConsumerOptionsLogging(options *ConsumerOptions)
- func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions)
- func WithPublishOptionsAppID(appID string) func(*PublishOptions)
- func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions)
- func WithPublishOptionsContentType(contentType string) func(*PublishOptions)
- func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions)
- func WithPublishOptionsExchange(exchange string) func(*PublishOptions)
- func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions)
- func WithPublishOptionsHeaders(headers Table) func(*PublishOptions)
- func WithPublishOptionsImmediate(options *PublishOptions)
- func WithPublishOptionsMandatory(options *PublishOptions)
- func WithPublishOptionsMessageID(messageID string) func(*PublishOptions)
- func WithPublishOptionsPersistentDelivery(options *PublishOptions)
- func WithPublishOptionsPriority(priority uint8) func(*PublishOptions)
- func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions)
- func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions)
- func WithPublishOptionsType(messageType string) func(*PublishOptions)
- func WithPublishOptionsUserID(userID string) func(*PublishOptions)
- func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions)
- func WithPublisherOptionsLogging(options *PublisherOptions)
- func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *PublisherOptions)
- type Action
- type BindingExchangeOptions
- type Config
- type Confirmation
- type ConsumeOptions
- type Consumer
- type ConsumerOptions
- type Delivery
- type Handler
- type Logger
- type PublishOptions
- type Publisher
- type PublisherOptions
- type Return
- type Table
Constants ¶
const ( Transient uint8 = amqp.Transient Persistent uint8 = amqp.Persistent )
DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.
Variables ¶
This section is empty.
Functions ¶
func WithConsumeOptionsBindingExchangeArgs ¶ added in v0.6.0
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions)
WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeAutoDelete ¶ added in v0.6.0
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions)
WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeDurable ¶ added in v0.6.0
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions)
WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeInternal ¶ added in v0.6.0
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions)
WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeKind ¶ added in v0.6.0
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions)
WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeName ¶ added in v0.6.0
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions)
WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeNoWait ¶ added in v0.6.0
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions)
WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeSkipDeclare ¶ added in v0.7.0
func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions)
WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the binding exchange. Use this setting if the exchange already exists and you don't need to declare it on consumer start.
func WithConsumeOptionsBindingNoWait ¶ added in v0.3.0
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions)
WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound the channel will not be closed with an error.
func WithConsumeOptionsConcurrency ¶ added in v0.3.0
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions)
WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConsumerAutoAck ¶ added in v0.6.2
func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions)
WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset the default will be used (false)
func WithConsumeOptionsConsumerExclusive ¶ added in v0.3.0
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions)
WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
func WithConsumeOptionsConsumerName ¶ added in v0.3.0
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions)
WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given
func WithConsumeOptionsConsumerNoWait ¶ added in v0.3.0
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions)
WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means it does not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.
func WithConsumeOptionsQOSGlobal ¶ added in v0.3.0
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions)
WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means these QOS settings apply to ALL existing and future consumers on all channels on the same connection
func WithConsumeOptionsQOSPrefetch ¶ added in v0.3.0
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions)
WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput. This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQueueArgs ¶ added in v0.7.0
func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions)
WithConsumeOptionsQueueArgs returns a function that sets the queue arguments
func WithConsumeOptionsQueueAutoDelete ¶ added in v0.3.0
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions)
WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will be deleted when there are no more conusmers on it
func WithConsumeOptionsQueueDurable ¶ added in v0.3.0
func WithConsumeOptionsQueueDurable(options *ConsumeOptions)
WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueExclusive ¶ added in v0.3.0
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions)
WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means it's are only accessible by the connection that declares it and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.
func WithConsumeOptionsQueueNoDeclare ¶ added in v0.7.0
func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions)
WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means the queue will be assumed to be declared on the server, and won't be declared at all.
func WithConsumeOptionsQueueNoWait ¶ added in v0.3.0
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions)
WithConsumeOptionsQueueNoWait sets the queue to nowait, which means the queue will assume to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or attempting to modify an existing queue from a different connection.
func WithConsumeOptionsQuorum ¶ added in v0.3.0
func WithConsumeOptionsQuorum(options *ConsumeOptions)
WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumerOptionsLogger ¶ added in v0.5.0
func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions)
WithConsumerOptionsLogger sets logging to a custom interface. Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogging ¶ added in v0.3.0
func WithConsumerOptionsLogging(options *ConsumerOptions)
WithConsumerOptionsLogging sets a logger to log to stdout
func WithConsumerOptionsReconnectInterval ¶ added in v0.7.2
func WithConsumerOptionsReconnectInterval(reconnectInterval time.Duration) func(options *ConsumerOptions)
WithConsumerOptionsReconnectInterval sets the interval at which the consumer will attempt to reconnect to the rabbit server
func WithPublishOptionsAppID ¶ added in v0.7.0
func WithPublishOptionsAppID(appID string) func(*PublishOptions)
WithPublishOptionsAppID returns a function that sets the application id
func WithPublishOptionsContentEncoding ¶ added in v0.7.0
func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions)
WithPublishOptionsContentEncoding returns a function that sets the content encoding, i.e. "utf-8"
func WithPublishOptionsContentType ¶ added in v0.3.0
func WithPublishOptionsContentType(contentType string) func(*PublishOptions)
WithPublishOptionsContentType returns a function that sets the content type, i.e. "application/json"
func WithPublishOptionsCorrelationID ¶ added in v0.7.0
func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions)
WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier
func WithPublishOptionsExchange ¶ added in v0.3.0
func WithPublishOptionsExchange(exchange string) func(*PublishOptions)
WithPublishOptionsExchange returns a function that sets the exchange to publish to
func WithPublishOptionsExpiration ¶ added in v0.6.0
func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions)
WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a string value in milliseconds.
func WithPublishOptionsHeaders ¶ added in v0.6.0
func WithPublishOptionsHeaders(headers Table) func(*PublishOptions)
WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id"
func WithPublishOptionsImmediate ¶ added in v0.3.0
func WithPublishOptionsImmediate(options *PublishOptions)
WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the returns channel for you to handle
func WithPublishOptionsMandatory ¶ added in v0.3.0
func WithPublishOptionsMandatory(options *PublishOptions)
WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle
func WithPublishOptionsMessageID ¶ added in v0.7.0
func WithPublishOptionsMessageID(messageID string) func(*PublishOptions)
WithPublishOptionsMessageID returns a function that sets the message identifier
func WithPublishOptionsPersistentDelivery ¶ added in v0.3.0
func WithPublishOptionsPersistentDelivery(options *PublishOptions)
WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart. By default publishings are transient
func WithPublishOptionsPriority ¶ added in v0.7.0
func WithPublishOptionsPriority(priority uint8) func(*PublishOptions)
WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9
func WithPublishOptionsReplyTo ¶ added in v0.7.0
func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions)
WithPublishOptionsReplyTo returns a function that sets the reply to field
func WithPublishOptionsTimestamp ¶ added in v0.7.0
func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions)
WithPublishOptionsTimestamp returns a function that sets the timestamp for the message
func WithPublishOptionsType ¶ added in v0.7.0
func WithPublishOptionsType(messageType string) func(*PublishOptions)
WithPublishOptionsType returns a function that sets the message type name
func WithPublishOptionsUserID ¶ added in v0.7.0
func WithPublishOptionsUserID(userID string) func(*PublishOptions)
WithPublishOptionsUserID returns a function that sets the user id i.e. "user"
func WithPublisherOptionsLogger ¶ added in v0.5.0
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions)
WithPublisherOptionsLogger sets logging to a custom interface. Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogging ¶ added in v0.3.0
func WithPublisherOptionsLogging(options *PublisherOptions)
WithPublisherOptionsLogging sets logging to true on the consumer options
func WithPublisherOptionsReconnectInterval ¶ added in v0.7.2
func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *PublisherOptions)
WithPublisherOptionsReconnectInterval sets the interval at which the publisher will attempt to reconnect to the rabbit server
Types ¶
type Action ¶ added in v0.7.0
type Action int
Action is an action that occurs after processed this delivery
type BindingExchangeOptions ¶ added in v0.6.0
type BindingExchangeOptions struct { Name string Kind string Durable bool AutoDelete bool Internal bool NoWait bool ExchangeArgs Table Declare bool }
BindingExchangeOptions are used when binding to an exchange. it will verify the exchange is created before binding to it.
type Config ¶ added in v0.8.0
Config wraps amqp.Config Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.
type Confirmation ¶ added in v0.7.0
type Confirmation struct {
amqp.Confirmation
}
Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. Use NotifyPublish to consume these events.
type ConsumeOptions ¶
type ConsumeOptions struct { QueueDurable bool QueueAutoDelete bool QueueExclusive bool QueueNoWait bool QueueDeclare bool QueueArgs Table BindingExchange *BindingExchangeOptions BindingNoWait bool BindingArgs Table Concurrency int QOSPrefetch int QOSGlobal bool ConsumerName string ConsumerAutoAck bool ConsumerExclusive bool ConsumerNoWait bool ConsumerNoLocal bool ConsumerArgs Table }
ConsumeOptions are used to describe how a new consumer will be created.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer allows you to create and connect to queues for data consumption.
func NewConsumer ¶ added in v0.2.0
func NewConsumer(url string, config Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error)
NewConsumer returns a new Consumer connected to the given rabbitmq server
func (Consumer) Close ¶ added in v0.8.0
Close cleans up resources and closes the consumer. The consumer is not safe for reuse
func (Consumer) StartConsuming ¶ added in v0.1.0
func (consumer Consumer) StartConsuming( handler Handler, queue string, routingKeys []string, optionFuncs ...func(*ConsumeOptions), ) error
StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). The provided handler is called once for each message. If the provided queue doesn't exist, it will be created on the cluster
type ConsumerOptions ¶
ConsumerOptions are used to describe a consumer's configuration. Logging set to true will enable the consumer to print to stdout Logger specifies a custom Logger interface implementation overruling Logging.
type Delivery ¶
Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.
type Logger ¶ added in v0.5.0
type Logger interface {
Printf(string, ...interface{})
}
Logger is the interface to send logs to. It can be set using WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type PublishOptions ¶
type PublishOptions struct { Exchange string // Mandatory fails to publish if there are no queues // bound to the routing key Mandatory bool // Immediate fails to publish if there are no consumers // that can ack bound to the queue on the routing key Immediate bool // MIME content type ContentType string // Transient (0 or 1) or Persistent (2) DeliveryMode uint8 // Expiration time in ms that a message will expire from a queue. // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers Expiration string // MIME content encoding ContentEncoding string // 0 to 9 Priority uint8 // correlation identifier CorrelationID string // address to to reply to (ex: RPC) ReplyTo string // message identifier MessageID string // message timestamp Timestamp time.Time // message type name Type string // creating user id - ex: "guest" UserID string // creating application id AppID string // Application or exchange specific fields, // the headers exchange will inspect this field. Headers Table }
PublishOptions are used to control how data is published
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher allows you to publish messages safely across an open connection
func NewPublisher ¶ added in v0.2.0
func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptions)) (*Publisher, error)
NewPublisher returns a new publisher with an open channel to the cluster. If you plan to enforce mandatory or immediate publishing, those failures will be reported on the channel of Returns that you should setup a listener on. Flow controls are automatically handled as they are sent from the server, and publishing will fail with an error when the server is requesting a slowdown
func (Publisher) Close ¶ added in v0.8.0
Close closes the publisher and releases resources The publisher should be discarded as it's not safe for re-use
func (*Publisher) NotifyPublish ¶ added in v0.7.0
func (publisher *Publisher) NotifyPublish() <-chan Confirmation
NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
func (*Publisher) NotifyReturn ¶ added in v0.7.0
NotifyReturn registers a listener for basic.return methods. These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
type PublisherOptions ¶ added in v0.1.0
PublisherOptions are used to describe a publisher's configuration. Logging set to true will enable the consumer to print to stdout
type Return ¶
Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.
type Table ¶
type Table map[string]interface{}
Table stores user supplied fields of the following types:
bool byte float32 float64 int int16 int32 int64 nil string time.Time amqp.Decimal amqp.Table []byte []interface{} - containing above types
Functions taking a table will immediately fail when the table contains a value of an unsupported type.
The caller must be specific in which precision of integer it wishes to encode.
Use a type assertion when reading values from a table for type conversion.
RabbitMQ expects int32 for integer values.