Documentation ¶
Index ¶
- Constants
- Variables
- func ContextWithQueueName(ctx context.Context, queueName string) context.Context
- func ContextWithShutdownChan(ctx context.Context, ch chan struct{}) context.Context
- func QueueNameFromContext(ctx context.Context) (string, bool)
- func ShutdownChanFromContext(ctx context.Context) (chan struct{}, bool)
- func WithClientOptionsConsumerAutoAck(autoAck bool) func(*ClientOptions)
- func WithClientOptionsConsumerExclusive(options *ClientOptions)
- func WithClientOptionsConsumerName(consumerName string) func(*ClientOptions)
- func WithClientOptionsLogger(log Logger) func(options *ClientOptions)
- func WithClientOptionsLogging(options *ClientOptions)
- func WithClientOptionsQueueArgs(args Table) func(*ClientOptions)
- func WithClientOptionsQueueAutoDelete(options *ClientOptions)
- func WithClientOptionsQueueDurable(options *ClientOptions)
- func WithClientOptionsQueueExclusive(options *ClientOptions)
- func WithClientPublishOptionsImmediate(options *ClientOptions)
- func WithClientPublishOptionsMandatory(options *ClientOptions)
- func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions)
- func WithConnectionOptionsLogger(log Logger) func(options *ConnectionOptions)
- func WithConnectionOptionsLogging(options *ConnectionOptions)
- func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions)
- func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions)
- func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions)
- func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions)
- func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions)
- func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions)
- func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions)
- func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions)
- func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions)
- func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions)
- func WithConsumerOptionsExchangeDurable(options *ConsumerOptions)
- func WithConsumerOptionsExchangeInternal(options *ConsumerOptions)
- func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions)
- func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions)
- func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions)
- func WithConsumerOptionsExchangeOptions(exchangeOptions ExchangeOptions) func(*ConsumerOptions)
- func WithConsumerOptionsExchangePassive(options *ConsumerOptions)
- func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions)
- func WithConsumerOptionsLogging(options *ConsumerOptions)
- func WithConsumerOptionsQOSGlobal(options *ConsumerOptions)
- func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions)
- func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions)
- func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions)
- func WithConsumerOptionsQueueDurable(options *ConsumerOptions)
- func WithConsumerOptionsQueueExclusive(options *ConsumerOptions)
- func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions)
- func WithConsumerOptionsQueueNoWait(options *ConsumerOptions)
- func WithConsumerOptionsQueuePassive(options *ConsumerOptions)
- func WithConsumerOptionsQueueQuorum(options *ConsumerOptions)
- func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions)
- func WithConsumerOptionsRoutingKeys(routingKeys []string) func(*ConsumerOptions)
- func WithPublishOptionsAppID(appID string) func(*PublishOptions)
- func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions)
- func WithPublishOptionsContentType(contentType string) func(*PublishOptions)
- func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions)
- func WithPublishOptionsExchange(exchange string) func(*PublishOptions)
- func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions)
- func WithPublishOptionsHeaders(headers Table) func(*PublishOptions)
- func WithPublishOptionsImmediate(options *PublishOptions)
- func WithPublishOptionsMandatory(options *PublishOptions)
- func WithPublishOptionsMessageID(messageID string) func(*PublishOptions)
- func WithPublishOptionsPersistentDelivery(options *PublishOptions)
- func WithPublishOptionsPriority(priority uint8) func(*PublishOptions)
- func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions)
- func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions)
- func WithPublishOptionsType(messageType string) func(*PublishOptions)
- func WithPublishOptionsUserID(userID string) func(*PublishOptions)
- func WithPublisherOptionsConfirm(options *PublisherOptions)
- func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions)
- func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions)
- func WithPublisherOptionsExchangeDeclare(options *PublisherOptions)
- func WithPublisherOptionsExchangeDurable(options *PublisherOptions)
- func WithPublisherOptionsExchangeInternal(options *PublisherOptions)
- func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions)
- func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions)
- func WithPublisherOptionsExchangeNoWait(options *PublisherOptions)
- func WithPublisherOptionsExchangePassive(options *PublisherOptions)
- func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions)
- func WithPublisherOptionsLogging(options *PublisherOptions)
- type Action
- type Binding
- type BindingOptions
- type ClientMiddlewareFunc
- type ClientOptions
- type Config
- type Confirmation
- type Conn
- type ConnectionOptions
- type ConsumeMiddlewareFunc
- type ConsumeOptions
- type Consumer
- type ConsumerOptions
- type Delivery
- type ExchangeOptions
- type Handler
- type HandlerFunc
- type Logger
- type OnStartedFunc
- type PublishOptions
- type Publisher
- func (publisher *Publisher) Close(ctx context.Context)
- func (publisher *Publisher) NotifyPublish(handler func(p Confirmation))
- func (publisher *Publisher) NotifyReturn(handler func(r Return))
- func (publisher *Publisher) Publish(data []byte, routingKeys []string, optionFuncs ...func(*PublishOptions)) error
- func (publisher *Publisher) PublishWithContext(ctx context.Context, data []byte, routingKeys []string, ...) error
- func (publisher *Publisher) PublishWithDeferredConfirmWithContext(ctx context.Context, data []byte, routingKeys []string, ...) (PublisherConfirmation, error)
- type PublisherConfirmation
- type PublisherOptions
- type QueueOptions
- type Request
- func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
- func (r *Request) AfterTimeout() <-chan time.Time
- func (r *Request) WithBody(b []byte) *Request
- func (r *Request) WithContentType(ct string) *Request
- func (r *Request) WithContext(ctx context.Context) *Request
- func (r *Request) WithCorrelationID(id string) *Request
- func (r *Request) WithExchange(e string) *Request
- func (r *Request) WithHeaders(h amqp.Table) *Request
- func (r *Request) WithResponse(wr bool) *Request
- func (r *Request) WithRoutingKey(rk string) *Request
- func (r *Request) WithTimeout(t time.Duration) *Request
- func (r *Request) Write(p []byte) (int, error)
- func (r *Request) WriteHeader(header string, value interface{})
- type RequestMap
- type ResponseWriter
- type Return
- type RpcClient
- func (c *RpcClient) AddMiddleware(m ClientMiddlewareFunc) *RpcClient
- func (c *RpcClient) Close(ctx context.Context)
- func (c *RpcClient) OnStarted(f OnStartedFunc)
- func (c *RpcClient) PublishWithContext(ctx context.Context, data []byte, routingKey string) ([]byte, error)
- func (c *RpcClient) Send(r *Request) (*amqp.Delivery, error)
- func (c *RpcClient) Stop()
- func (c *RpcClient) WithConfirmMode(confirmMode bool) *RpcClient
- func (c *RpcClient) WithConsumeSettings(s ConsumeOptions) *RpcClient
- func (c *RpcClient) WithLogger(f Logger) *RpcClient
- func (c *RpcClient) WithMaxRetries(n int) *RpcClient
- func (c *RpcClient) WithPublishSettings(s PublishOptions) *RpcClient
- func (c *RpcClient) WithQueueDeclareSettings(s QueueOptions) *RpcClient
- func (c *RpcClient) WithTimeout(t time.Duration) *RpcClient
- type SendFunc
- type Table
Constants ¶
const ( Transient uint8 = amqp.Transient Persistent uint8 = amqp.Persistent )
DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
This remains typed as uint8 to match Publishing.DeliveryMode. Other delivery modes specific to custom queue implementations are not enumerated here.
Variables ¶
var ( // ErrRequestReturned can be returned by Client#Send() when the server // returns the message. For example, when mandatory is set but the message // cannot be routed. ErrRequestReturned = errors.New("publishing returned") // ErrRequestRejected can be returned by Client#Send() when the server Backs // the message. This can happen if there is some problem inside the amqp // server. To check if the error returned is an ErrRequestReturned error, use // errors.Is(err, ErrRequestRejected). ErrRequestRejected = errors.New("publishing Backed") // ErrRequestTimeout is an error returned when a client request does not // receive a response within the client timeout duration. To check if the // error returned is an ErrRequestTimeout error, use errors.Is(err, // ErrRequestTimeout). ErrRequestTimeout = errors.New("request timed out") // ErrUnexpectedConnClosed is returned by ListenAndServe() if the server // shuts down without calling Stop() and if AMQP does not give an error // when said shutdown happens. ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error") )
Functions ¶
func ContextWithQueueName ¶ added in v1.0.10
ContextWithQueueName adds the given queueName to the provided context.
func ContextWithShutdownChan ¶ added in v1.0.10
ContextWithShutdownChan adds a shutdown chan to the given context.
func QueueNameFromContext ¶ added in v1.0.10
QueueNameFromContext returns the queue name for the current request.
func ShutdownChanFromContext ¶ added in v1.0.10
ShutdownChanFromContext returns the shutdown chan.
func WithClientOptionsConsumerAutoAck ¶ added in v1.0.10
func WithClientOptionsConsumerAutoAck(autoAck bool) func(*ClientOptions)
WithClientOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset, the default will be used (false)
func WithClientOptionsConsumerExclusive ¶ added in v1.0.10
func WithClientOptionsConsumerExclusive(options *ClientOptions)
WithClientOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
func WithClientOptionsConsumerName ¶ added in v1.0.10
func WithClientOptionsConsumerName(consumerName string) func(*ClientOptions)
WithClientOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given
func WithClientOptionsLogger ¶ added in v1.0.10
func WithClientOptionsLogger(log Logger) func(options *ClientOptions)
WithClientOptionsLogger sets logging to a custom interface. Use WithClientOptionsLogging to just log to stdout.
func WithClientOptionsLogging ¶ added in v1.0.10
func WithClientOptionsLogging(options *ClientOptions)
WithClientOptionsLogging sets logging to true on the client options and sets the
func WithClientOptionsQueueArgs ¶ added in v1.0.10
func WithClientOptionsQueueArgs(args Table) func(*ClientOptions)
WithClientOptionsQueueArgs adds optional args to the queue
func WithClientOptionsQueueAutoDelete ¶ added in v1.0.10
func WithClientOptionsQueueAutoDelete(options *ClientOptions)
WithClientOptionsQueueAutoDelete ensures the queue is an auto-delete queue
func WithClientOptionsQueueDurable ¶ added in v1.0.10
func WithClientOptionsQueueDurable(options *ClientOptions)
WithClientOptionsQueueDurable ensures the queue is a durable queue
func WithClientOptionsQueueExclusive ¶ added in v1.0.10
func WithClientOptionsQueueExclusive(options *ClientOptions)
WithClientOptionsQueueExclusive ensures the queue is an exclusive queue
func WithClientPublishOptionsImmediate ¶ added in v1.0.10
func WithClientPublishOptionsImmediate(options *ClientOptions)
WithClientPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the return channel for you to handle
func WithClientPublishOptionsMandatory ¶ added in v1.0.10
func WithClientPublishOptionsMandatory(options *ClientOptions)
WithClientPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key, a message will be sent back on the return channel for you to handle
func WithConnectionOptionsConfig ¶ added in v1.0.4
func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions)
WithConnectionOptionsConfig sets the Config used in the connection
func WithConnectionOptionsLogger ¶ added in v1.0.4
func WithConnectionOptionsLogger(log Logger) func(options *ConnectionOptions)
WithConnectionOptionsLogger sets logging to true on the consumer options and sets the
func WithConnectionOptionsLogging ¶ added in v1.0.4
func WithConnectionOptionsLogging(options *ConnectionOptions)
WithConnectionOptionsLogging sets logging to true on the consumer options and sets the
func WithConnectionOptionsReconnectInterval ¶ added in v1.0.4
func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions)
WithConnectionOptionsReconnectInterval sets the reconnection interval
func WithConsumerOptionsBinding ¶ added in v1.0.4
func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions)
WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to the zero value. If you want to declare your bindings for example, be sure to set Declare=true
func WithConsumerOptionsConcurrency ¶ added in v1.0.4
func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions)
WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages
func WithConsumerOptionsConsumerAutoAck ¶ added in v1.0.4
func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions)
WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset the default will be used (false)
func WithConsumerOptionsConsumerExclusive ¶ added in v1.0.4
func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions)
WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
func WithConsumerOptionsConsumerName ¶ added in v1.0.4
func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions)
WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given
func WithConsumerOptionsConsumerNoWait ¶ added in v1.0.4
func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions)
WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means it does not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.
func WithConsumerOptionsExchangeArgs ¶ added in v1.0.4
func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions)
WithConsumerOptionsExchangeArgs adds optional args to the exchange
func WithConsumerOptionsExchangeAutoDelete ¶ added in v1.0.4
func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions)
WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithConsumerOptionsExchangeDeclare ¶ added in v1.0.4
func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions)
WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance
func WithConsumerOptionsExchangeDurable ¶ added in v1.0.4
func WithConsumerOptionsExchangeDurable(options *ConsumerOptions)
WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange
func WithConsumerOptionsExchangeInternal ¶ added in v1.0.4
func WithConsumerOptionsExchangeInternal(options *ConsumerOptions)
WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange
func WithConsumerOptionsExchangeKind ¶ added in v1.0.4
func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions)
WithConsumerOptionsExchangeKind ensures the queue is a durable queue
func WithConsumerOptionsExchangeName ¶ added in v1.0.4
func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions)
WithConsumerOptionsExchangeName sets the exchange name
func WithConsumerOptionsExchangeNoWait ¶ added in v1.0.4
func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions)
WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithConsumerOptionsExchangeOptions ¶ added in v1.0.14
func WithConsumerOptionsExchangeOptions(exchangeOptions ExchangeOptions) func(*ConsumerOptions)
WithConsumerOptionsExchangeOptions adds a new exchange to the consumer, this should probably only be used if you want to to consume from multiple exchanges on the same consumer
func WithConsumerOptionsExchangePassive ¶ added in v1.0.4
func WithConsumerOptionsExchangePassive(options *ConsumerOptions)
WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange
func WithConsumerOptionsLogger ¶
func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions)
WithConsumerOptionsLogger sets logging to a custom interface. Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogging ¶
func WithConsumerOptionsLogging(options *ConsumerOptions)
WithConsumerOptionsLogging uses a default logger that writes to std out
func WithConsumerOptionsQOSGlobal ¶ added in v1.0.4
func WithConsumerOptionsQOSGlobal(options *ConsumerOptions)
WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which means these QOS settings apply to ALL existing and future consumers on all channels on the same connection
func WithConsumerOptionsQOSPrefetch ¶ added in v1.0.4
func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions)
WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput. This doesn't affect the handler, messages are still processed one at a time.
func WithConsumerOptionsQueueArgs ¶ added in v1.0.4
func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions)
WithConsumerOptionsQueueArgs adds optional args to the queue
func WithConsumerOptionsQueueAutoDelete ¶ added in v1.0.4
func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions)
WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue
func WithConsumerOptionsQueueDurable ¶ added in v1.0.4
func WithConsumerOptionsQueueDurable(options *ConsumerOptions)
WithConsumerOptionsQueueDurable ensures the queue is a durable queue
func WithConsumerOptionsQueueExclusive ¶ added in v1.0.4
func WithConsumerOptionsQueueExclusive(options *ConsumerOptions)
WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue
func WithConsumerOptionsQueueNoDeclare ¶ added in v1.0.4
func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions)
WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's existance upon startup
func WithConsumerOptionsQueueNoWait ¶ added in v1.0.4
func WithConsumerOptionsQueueNoWait(options *ConsumerOptions)
WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue
func WithConsumerOptionsQueuePassive ¶ added in v1.0.4
func WithConsumerOptionsQueuePassive(options *ConsumerOptions)
WithConsumerOptionsQueuePassive ensures the queue is a passive queue
func WithConsumerOptionsQueueQuorum ¶ added in v1.0.4
func WithConsumerOptionsQueueQuorum(options *ConsumerOptions)
WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means multiple nodes in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumerOptionsRoutingKey ¶ added in v1.0.4
func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions)
WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options
func WithConsumerOptionsRoutingKeys ¶ added in v1.0.4
func WithConsumerOptionsRoutingKeys(routingKeys []string) func(*ConsumerOptions)
WithConsumerOptionsRoutingKeys binds the queue to routingKeys with the default binding options
func WithPublishOptionsAppID ¶
func WithPublishOptionsAppID(appID string) func(*PublishOptions)
WithPublishOptionsAppID returns a function that sets the application id
func WithPublishOptionsContentEncoding ¶
func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions)
WithPublishOptionsContentEncoding returns a function that sets the content encoding, i.e. "utf-8"
func WithPublishOptionsContentType ¶
func WithPublishOptionsContentType(contentType string) func(*PublishOptions)
WithPublishOptionsContentType returns a function that sets the content type, i.e. "application/json"
func WithPublishOptionsCorrelationID ¶
func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions)
WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier
func WithPublishOptionsExchange ¶
func WithPublishOptionsExchange(exchange string) func(*PublishOptions)
WithPublishOptionsExchange returns a function that sets the exchange to publish to
func WithPublishOptionsExpiration ¶
func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions)
WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a string value in milliseconds.
func WithPublishOptionsHeaders ¶
func WithPublishOptionsHeaders(headers Table) func(*PublishOptions)
WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id"
func WithPublishOptionsImmediate ¶
func WithPublishOptionsImmediate(options *PublishOptions)
WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available to immediately handle the new message, a message will be sent back on the returns channel for you to handle
func WithPublishOptionsMandatory ¶
func WithPublishOptionsMandatory(options *PublishOptions)
WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not bound to the routing key a message will be sent back on the returns channel for you to handle
func WithPublishOptionsMessageID ¶
func WithPublishOptionsMessageID(messageID string) func(*PublishOptions)
WithPublishOptionsMessageID returns a function that sets the message identifier
func WithPublishOptionsPersistentDelivery ¶
func WithPublishOptionsPersistentDelivery(options *PublishOptions)
WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart. By default publishings are transient
func WithPublishOptionsPriority ¶
func WithPublishOptionsPriority(priority uint8) func(*PublishOptions)
WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9
func WithPublishOptionsReplyTo ¶
func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions)
WithPublishOptionsReplyTo returns a function that sets the reply to field
func WithPublishOptionsTimestamp ¶
func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions)
WithPublishOptionsTimestamp returns a function that sets the timestamp for the message
func WithPublishOptionsType ¶
func WithPublishOptionsType(messageType string) func(*PublishOptions)
WithPublishOptionsType returns a function that sets the message type name
func WithPublishOptionsUserID ¶
func WithPublishOptionsUserID(userID string) func(*PublishOptions)
WithPublishOptionsUserID returns a function that sets the user id i.e. "user"
func WithPublisherOptionsConfirm ¶ added in v1.0.4
func WithPublisherOptionsConfirm(options *PublisherOptions)
WithPublisherOptionsConfirm enables confirm mode on the connection this is required if publisher confirmations should be used
func WithPublisherOptionsExchangeArgs ¶ added in v1.0.4
func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions)
WithPublisherOptionsExchangeArgs adds optional args to the exchange
func WithPublisherOptionsExchangeAutoDelete ¶ added in v1.0.4
func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions)
WithPublisherOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange
func WithPublisherOptionsExchangeDeclare ¶ added in v1.0.4
func WithPublisherOptionsExchangeDeclare(options *PublisherOptions)
WithPublisherOptionsExchangeDeclare will create the exchange if it doesn't exist
func WithPublisherOptionsExchangeDurable ¶ added in v1.0.4
func WithPublisherOptionsExchangeDurable(options *PublisherOptions)
WithPublisherOptionsExchangeDurable ensures the exchange is a durable exchange
func WithPublisherOptionsExchangeInternal ¶ added in v1.0.4
func WithPublisherOptionsExchangeInternal(options *PublisherOptions)
WithPublisherOptionsExchangeInternal ensures the exchange is an internal exchange
func WithPublisherOptionsExchangeKind ¶ added in v1.0.4
func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions)
WithPublisherOptionsExchangeKind ensures the queue is a durable queue
func WithPublisherOptionsExchangeName ¶ added in v1.0.4
func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions)
WithPublisherOptionsExchangeName sets the exchange name
func WithPublisherOptionsExchangeNoWait ¶ added in v1.0.4
func WithPublisherOptionsExchangeNoWait(options *PublisherOptions)
WithPublisherOptionsExchangeNoWait ensures the exchange is a no-wait exchange
func WithPublisherOptionsExchangePassive ¶ added in v1.0.4
func WithPublisherOptionsExchangePassive(options *PublisherOptions)
WithPublisherOptionsExchangePassive ensures the exchange is a passive exchange
func WithPublisherOptionsLogger ¶
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions)
WithPublisherOptionsLogger sets logging to a custom interface. Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogging ¶
func WithPublisherOptionsLogging(options *PublisherOptions)
WithPublisherOptionsLogging sets logging to true on the publisher options and sets the
Types ¶
type Action ¶
type Action int
Action is an action that occurs after processed this delivery
const ( // Ack default ack this msg after you have successfully processed this delivery. Ack Action = iota // NackDiscard the message will be dropped or delivered to a server configured dead-letter queue. NackDiscard // NackRequeue deliver this message to a different consumer. NackRequeue // Manual Message acknowledgement is left to the user using the msg.Ack() method Manual )
type Binding ¶ added in v1.0.4
type Binding struct { RoutingKey string BindingOptions }
Binding describes the binding of a queue to a routing key on an exchange
type BindingOptions ¶ added in v1.0.4
BindingOptions describes the options a binding can have
type ClientMiddlewareFunc ¶ added in v1.0.9
ClientMiddlewareFunc represents a function that can be used as middleware.
type ClientOptions ¶ added in v1.0.10
type ClientOptions struct { ConsumeOptions ConsumeOptions QueueOptions QueueOptions ExchangeOptions ExchangeOptions PublishOptions PublishOptions Logger logger.Logger // ConfirmMode puts the channel that messages are published over in // confirmation mode. // This makes sending requests more reliable at the cost // of some performance. // The server must confirm each publishing. // See https://www.rabbitmq.com/confirms.html#publisher-confirms ConfirmMode bool }
type Config ¶
Config wraps amqp.Config Config is used in DialConfig and Open to specify the desired tuning parameters used during a connection open handshake. The negotiated tuning will be stored in the returned connection's Config field.
type Confirmation ¶
type Confirmation struct { amqp.Confirmation ReconnectionCount int }
Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. Use NotifyPublish to consume these events. ReconnectionCount is useful in that each time it increments, the DeliveryTag is reset to 0, meaning you can use ReconnectionCount+DeliveryTag to ensure uniqueness
type Conn ¶ added in v1.0.4
type Conn struct {
// contains filtered or unexported fields
}
Conn manages the connection to a rabbit cluster it is intended to be shared across publishers and consumers
func NewConn ¶ added in v1.0.4
func NewConn(ctx context.Context, url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)
NewConn creates a new connection manager
type ConnectionOptions ¶ added in v1.0.4
ConnectionOptions are used to describe how a new consumer will be created.
type ConsumeMiddlewareFunc ¶ added in v1.0.10
type ConsumeMiddlewareFunc func(next HandlerFunc) HandlerFunc
ConsumeMiddlewareFunc represent a function that can be used as middleware.
For example:
func myMiddle(next HandlerFunc) HandlerFunc { // Preinitialization of middleware here. return func(ctx context.Context, rw *ResponseWriter d amqp.Delivery) { // Before handler execution here. // Execute the handler. next(ctx, rw, d) // After execution here. } } s := New("url") // Add middleware to specific handler. s.Bind(DirectBinding("foobar", myMiddle(HandlerFunc))) // Add middleware to all handlers on the server. s.AddMiddleware(myMiddle)
type ConsumeOptions ¶
type ConsumeOptions struct { Name string AutoAck bool Exclusive bool NoWait bool NoLocal bool Args Table }
ConsumeOptions are used to configure the consumer on the rabbit server
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer allows you to create and connect to queues for data consumption.
func NewConsumer ¶
func NewConsumer( ctx context.Context, conn *Conn, handler Handler, queue string, optionFuncs ...func(*ConsumerOptions), ) (*Consumer, error)
NewConsumer returns a new Consumer connected to the given rabbitmq server it also starts consuming on the given connection with automatic reconnection handling Do not reuse the returned consumer for anything other than to close it
type ConsumerOptions ¶
type ConsumerOptions struct { ConsumerOptions ConsumeOptions QueueOptions QueueOptions ExchangeOptions []ExchangeOptions Concurrency int Logger logger.Logger QOSPrefetch int QOSGlobal bool }
ConsumerOptions are used to describe how a new consumer will be created. If QueueOptions is not nil, the options will be used to declare a queue If ExchangeOptions is not nil, it will be used to declare an exchange If there are Bindings, the queue will be bound to them
type Delivery ¶
Delivery captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer from Channel.Consume or Channel.Get.
type ExchangeOptions ¶ added in v1.0.4
type ExchangeOptions struct { Name string Kind string // possible values: empty string for default exchange or direct, topic, fanout Durable bool AutoDelete bool Internal bool NoWait bool Passive bool // if false, a missing exchange will be created on the server Args Table Declare bool Bindings []Binding }
ExchangeOptions are used to configure an exchange. If the Passive flag is set the client will only check if the exchange exists on the server and that the settings match, no creation attempt will be made.
type Handler ¶
type Handler func(ctx context.Context, rw *ResponseWriter, d Delivery) (action Action)
Handler defines the handler of each Delivery and return Action
type HandlerFunc ¶ added in v1.0.10
type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)
HandlerFunc is the function that handles all request based on the routing key.
func ConsumeMiddlewareChain ¶ added in v1.0.10
func ConsumeMiddlewareChain(next HandlerFunc, m ...ConsumeMiddlewareFunc) HandlerFunc
ConsumeMiddlewareChain will attach all given middlewares to your HandlerFunc. The middlewares will be executed in the same order as your input.
For example:
s := New("url") s.Bind(DirectBinding( "foobar", ServerMiddlewareChain( myHandler, middlewareOne, middlewareTwo, middlewareThree, ), ))
type Logger ¶
Logger is describing a logging structure. It can be set using WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type OnStartedFunc ¶ added in v1.0.9
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
type PublishOptions ¶
type PublishOptions struct { Exchange string // Mandatory fails to publish if there are no queues // bound to the routing key Mandatory bool // Immediate fails to publish if there are no consumers // that can ack bound to the queue on the routing key Immediate bool // MIME content type ContentType string // Transient (0 or 1) or Persistent (2) DeliveryMode uint8 // Expiration time in ms that a message will expire from a queue. // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers Expiration string // MIME content encoding ContentEncoding string // 0 to 9 Priority uint8 // correlation identifier CorrelationID string // address to reply to (ex: RPC) ReplyTo string // message identifier MessageID string // message timestamp Timestamp time.Time // message type name Type string // creating user id - ex: "guest" UserID string // creating application id AppID string // Application or exchange specific fields, // the headers exchange will inspect this field. Headers Table }
PublishOptions are used to control how data is published
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher allows you to publish messages safely across an open connection
func NewPublisher ¶
func NewPublisher(ctx context.Context, conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error)
NewPublisher returns a new publisher with an open channel to the cluster. If you plan to enforce mandatory or immediate publishing, those failures will be reported on the channel of Returns that you should setup a listener on. Flow controls are automatically handled as they are sent from the server, and publishing will fail with an error when the server is requesting a slowdown
func (*Publisher) Close ¶
Close closes the publisher and releases resources The publisher should be discarded as it's not safe for re-use Only call Close() once
func (*Publisher) NotifyPublish ¶
func (publisher *Publisher) NotifyPublish(handler func(p Confirmation))
NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option These notifications are shared across an entire connection, so if you're creating multiple publishers on the same connection keep that in mind
func (*Publisher) NotifyReturn ¶
NotifyReturn registers a listener for basic.return methods. These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. These notifications are shared across an entire connection, so if you're creating multiple publishers on the same connection keep that in mind
func (*Publisher) Publish ¶
func (publisher *Publisher) Publish( data []byte, routingKeys []string, optionFuncs ...func(*PublishOptions), ) error
Publish publishes the provided data to the given routing keys over the connection.
func (*Publisher) PublishWithContext ¶ added in v1.0.4
func (publisher *Publisher) PublishWithContext( ctx context.Context, data []byte, routingKeys []string, optionFuncs ...func(*PublishOptions), ) error
PublishWithContext publishes the provided data to the given routing keys over the connection.
func (*Publisher) PublishWithDeferredConfirmWithContext ¶ added in v1.0.4
func (publisher *Publisher) PublishWithDeferredConfirmWithContext( ctx context.Context, data []byte, routingKeys []string, optionFuncs ...func(*PublishOptions), ) (PublisherConfirmation, error)
PublishWithContext publishes the provided data to the given routing keys over the connection. if the publisher is in confirm mode (which can be either done by calling `NotifyPublish` with a custom handler or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned. This confirmation can be used to check if the message was actually published or wait for this to happen. If the publisher is not in confirm mode, the returned confirmation will always be nil.
type PublisherConfirmation ¶ added in v1.0.4
type PublisherConfirmation []*amqp.DeferredConfirmation
type PublisherOptions ¶
type PublisherOptions struct { ExchangeOptions ExchangeOptions Logger Logger ConfirmMode bool }
PublisherOptions are used to describe a publisher's configuration. Logger is a custom logging interface.
type QueueOptions ¶ added in v1.0.4
type QueueOptions struct { Name string Durable bool AutoDelete bool Exclusive bool NoWait bool Passive bool // if false, a missing queue will be created on the server Args Table Declare bool }
QueueOptions are used to configure a queue. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception.
type Request ¶ added in v1.0.9
type Request struct { // Exchange is the exchange to which the rquest will be published when // passing it to the clients send function. Exchange string // Routing key is the routing key that will be used in the amqp.Publishing // request. RoutingKey string // Reply is a boolean value telling if the request should wait for a reply // or just send the request without waiting. Reply bool // Timeout is the time we should wait after a request is published before // we assume the request got lost. Timeout time.Duration // Publishing is the publising that are going to be published. Publishing amqp.Publishing // Context is a context which you can use to pass data from where the // request is created to middlewares. By default this will be a // context.Background() Context context.Context //nolint:containedctx // Needed in the struct. // contains filtered or unexported fields }
Request is a requet to perform with the client.
func NewRequest ¶ added in v1.0.9
NewRequest will generate a new request to be published. The default request will use the content type "text/plain" and always wait for reply.
func (*Request) AddMiddleware ¶ added in v1.0.9
func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
AddMiddleware will add a middleware which will be executed when the request is published.
func (*Request) AfterTimeout ¶ added in v1.0.9
AfterTimeout waits for the duration of the timeout.
func (*Request) WithBody ¶ added in v1.0.9
WithBody will convert a string to a byte slice and add as the body passed for the request.
func (*Request) WithContentType ¶ added in v1.0.9
WithContentType will update the content type passed in the header of the request. This value will bee set as the ContentType in the amqp.Publishing type but also preserved as a header value.
func (*Request) WithContext ¶ added in v1.0.9
WithContext will set the context on the request.
func (*Request) WithCorrelationID ¶ added in v1.0.9
WithCorrelationID will add/overwrite the correlation ID used for the request and set it on the Publishing.
func (*Request) WithExchange ¶ added in v1.0.9
WithExchange will set the exchange on to which the request will be published.
func (*Request) WithHeaders ¶ added in v1.0.9
WithHeaders will set the full amqp.Table as the headers for the request. Note that this will overwrite anything previously set on the headers.
func (*Request) WithResponse ¶ added in v1.0.9
WithResponse sets the value determining wether the request should wait for a response or not. A request that does not require a response will only catch errors occurring before the reuqest has been published.
func (*Request) WithRoutingKey ¶ added in v1.0.9
WithRoutingKey will set the routing key for the request.
func (*Request) WithTimeout ¶ added in v1.0.9
WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.
func (*Request) Write ¶ added in v1.0.9
Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.
func (*Request) WriteHeader ¶ added in v1.0.9
WriteHeader will write a header for the specified key.
type RequestMap ¶ added in v1.0.9
type RequestMap struct {
// contains filtered or unexported fields
}
RequestMap keeps track of requests based on their DeliveryTag and/or CorrelationID.
func (*RequestMap) Delete ¶ added in v1.0.9
func (m *RequestMap) Delete(r *Request)
Delete will remove r from m.
func (*RequestMap) GetByCorrelationID ¶ added in v1.0.9
func (m *RequestMap) GetByCorrelationID(key string) (*Request, bool)
GetByCorrelationID returns the request with the provided correlation id.
func (*RequestMap) GetByDeliveryTag ¶ added in v1.0.9
func (m *RequestMap) GetByDeliveryTag(key uint64) (*Request, bool)
GetByDeliveryTag returns the request with the provided delivery tag.
func (*RequestMap) Set ¶ added in v1.0.9
func (m *RequestMap) Set(r *Request)
Set will add r to m, so it can be fetched later using its correlation id or delivery tag.
type ResponseWriter ¶ added in v1.0.10
type ResponseWriter struct { Publishing *amqp.Publishing Mandatory bool Immediate bool }
ResponseWriter is used by a handler to construct an RPC response. The ResponseWriter may NOT be used after the handler has returned.
Because the ResponseWriter implements io.Writer you can for example use it to write json:
encoder := json.NewEncoder(responseWriter) encoder.Encode(dataObject)
func NewResponseWriter ¶ added in v1.0.10
func NewResponseWriter(p *amqp.Publishing) *ResponseWriter
NewResponseWriter will create a new response writer with given amqp.Publishing.
func (*ResponseWriter) Write ¶ added in v1.0.10
func (rw *ResponseWriter) Write(p []byte) (int, error)
Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.
func (*ResponseWriter) WriteHeader ¶ added in v1.0.10
func (rw *ResponseWriter) WriteHeader(header string, value interface{})
WriteHeader will write a header for the specified key.
type Return ¶
Return captures a flattened struct of fields returned by the server when a Publishing is unable to be delivered either due to the `mandatory` flag set and no route found, or `immediate` flag set and no free consumer.
type RpcClient ¶ added in v1.0.6
type RpcClient struct { // Sender is the main send function called after all middlewares has been // chained and called. // This field can be overridden to simplify testing. Sender SendFunc // contains filtered or unexported fields }
func NewRpcClient ¶ added in v1.0.6
func (*RpcClient) AddMiddleware ¶ added in v1.0.9
func (c *RpcClient) AddMiddleware(m ClientMiddlewareFunc) *RpcClient
AddMiddleware will add a middleware which will be executed on request.
func (*RpcClient) Close ¶ added in v1.0.6
Close closes the publisher and releases resources The publisher should be discarded as it's not safe for re-use Only call Close() once
func (*RpcClient) OnStarted ¶ added in v1.0.9
func (c *RpcClient) OnStarted(f OnStartedFunc)
OnStarted can be used to hook into the connections/channels that the client is using. This can be useful if you want more control over amqp directly. Note that since the client is lazy and won't connect until the first .Send() the provided OnStartedFunc won't be called until then. Also note that this is blocking and the client won't continue its startup until this function has finished executing.
client := NewClient(url) client.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) { // Do something with amqp connections/channels. })
func (*RpcClient) PublishWithContext ¶ added in v1.0.9
func (c *RpcClient) PublishWithContext( ctx context.Context, data []byte, routingKey string, ) ([]byte, error)
PublishWithContext rpc request the provided data to the given routing key over the connection.
func (*RpcClient) Stop ¶ added in v1.0.9
func (c *RpcClient) Stop()
Stop will gracefully disconnect from AMQP. It is not guaranteed that all in flight requests or responses are handled before the disconnect. Instead, the user should ensure that all calls to c.Send() has returned before calling c.Stop().
func (*RpcClient) WithConfirmMode ¶ added in v1.0.10
WithConfirmMode sets the confirm-mode on the client. This causes the client to wait for confirmations, and if none arrives or the confirmation is marked as Nack, Client#Send() returns a corresponding error.
func (*RpcClient) WithConsumeSettings ¶ added in v1.0.10
func (c *RpcClient) WithConsumeSettings(s ConsumeOptions) *RpcClient
WithConsumeSettings will set the settings used when consuming in the client globally.
func (*RpcClient) WithLogger ¶ added in v1.0.10
WithLogger sets the logger to use for error logging.
func (*RpcClient) WithMaxRetries ¶ added in v1.0.10
WithMaxRetries sets the maximum number of times, the client will retry sending the request before giving up and returning the error to the caller of c.Send(). This retry will persist during reconnecting.
func (*RpcClient) WithPublishSettings ¶ added in v1.0.10
func (c *RpcClient) WithPublishSettings(s PublishOptions) *RpcClient
WithPublishSettings will set the client publishing settings when publishing messages.
func (*RpcClient) WithQueueDeclareSettings ¶ added in v1.0.10
func (c *RpcClient) WithQueueDeclareSettings(s QueueOptions) *RpcClient
WithQueueDeclareSettings will set the settings used when declaring queues for the client globally.
func (*RpcClient) WithTimeout ¶ added in v1.0.10
WithTimeout will set the client timeout used when publishing messages. T will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.
type SendFunc ¶ added in v1.0.9
SendFunc represents the function that Send does. It takes a Request as input and returns a delivery and an error.
func ClientMiddlewareChain ¶ added in v1.0.9
func ClientMiddlewareChain(next SendFunc, m ...ClientMiddlewareFunc) SendFunc
ClientMiddlewareChain will attach all given middlewares to your SendFunc. The middlewares will be executed in the same order as your input.
type Table ¶
type Table map[string]interface{}
Table stores user supplied fields of the following types:
bool byte float32 float64 int int16 int32 int64 nil string time.Time amqp.Decimal amqp.Table []byte []interface{} - containing above types
Functions taking a table will immediately fail when the table contains a value of an unsupported type.
The caller must be specific in which precision of integer it wishes to encode.
Use a type assertion when reading values from a table for type conversion.
RabbitMQ expects int32 for integer values.
Source Files ¶
- connection.go
- connection_options.go
- consume.go
- consume_middleware.go
- consume_options.go
- context.go
- declare.go
- errors.go
- logger.go
- options.go
- publish.go
- publish_flow_block.go
- publish_options.go
- publisher_options.go
- response_writer.go
- rpc_client.go
- rpc_client_middleware.go
- rpc_client_options.go
- rpc_flow_block.go
- rpc_request.go
- table.go