Documentation ¶
Index ¶
- Constants
- Variables
- type BackoffFunc
- type BatchHandler
- func (h *BatchHandler) Config() BatchHandlerConfig
- func (h *BatchHandler) ConsumeOptions() ConsumeOptions
- func (h *BatchHandler) FlushTimeout() time.Duration
- func (h *BatchHandler) IsActive(ctx context.Context) (active bool, err error)
- func (h *BatchHandler) MaxBatchBytes() int
- func (h *BatchHandler) MaxBatchSize() int
- func (h *BatchHandler) Pause(ctx context.Context) error
- func (h *BatchHandler) Queue() string
- func (h *BatchHandler) QueueConfig() QueueConfig
- func (h *BatchHandler) Resume(ctx context.Context) error
- func (h *BatchHandler) SetConsumeOptions(consumeOpts ConsumeOptions)
- func (h *BatchHandler) SetFlushTimeout(flushTimeout time.Duration)
- func (h *BatchHandler) SetHandlerFunc(hf BatchHandlerFunc)
- func (h *BatchHandler) SetMaxBatchBytes(maxBatchBytes int)
- func (h *BatchHandler) SetMaxBatchSize(maxBatchSize int)
- func (h *BatchHandler) SetQueue(queue string)
- type BatchHandlerConfig
- type BatchHandlerFunc
- type BatchHandlerOption
- type Connection
- func (ch *Connection) BlockingFlowControl() <-chan amqp.Blocking
- func (ch *Connection) Close() (err error)
- func (ch *Connection) Connect(ctx context.Context) error
- func (ch *Connection) Error() error
- func (ch *Connection) Flag(err error)
- func (c *Connection) IsCached() bool
- func (ch *Connection) IsClosed() bool
- func (ch *Connection) IsFlagged() bool
- func (c *Connection) Name() string
- func (ch *Connection) Recover(ctx context.Context) error
- type ConnectionOption
- func ConnectionWithBackoffPolicy(policy BackoffFunc) ConnectionOption
- func ConnectionWithCached(cached bool) ConnectionOption
- func ConnectionWithHeartbeatInterval(interval time.Duration) ConnectionOption
- func ConnectionWithLogger(logger logging.Logger) ConnectionOption
- func ConnectionWithRecoverCallback(callback ConnectionRecoverCallback) ConnectionOption
- func ConnectionWithTLS(config *tls.Config) ConnectionOption
- func ConnectionWithTimeout(timeout time.Duration) ConnectionOption
- type ConnectionPool
- func (cp *ConnectionPool) Capacity() int
- func (cp *ConnectionPool) Close()
- func (cp *ConnectionPool) GetConnection(ctx context.Context) (conn *Connection, err error)
- func (cp *ConnectionPool) GetTransientConnection(ctx context.Context) (conn *Connection, err error)
- func (cp *ConnectionPool) Name() string
- func (cp *ConnectionPool) ReturnConnection(conn *Connection, err error)
- func (cp *ConnectionPool) Size() int
- func (cp *ConnectionPool) StatCachedActive() int
- func (cp *ConnectionPool) StatTransientActive() int
- type ConnectionPoolOption
- func ConnectionPoolWithConnectionTimeout(timeout time.Duration) ConnectionPoolOption
- func ConnectionPoolWithHeartbeatInterval(interval time.Duration) ConnectionPoolOption
- func ConnectionPoolWithLogger(logger logging.Logger) ConnectionPoolOption
- func ConnectionPoolWithName(name string) ConnectionPoolOption
- func ConnectionPoolWithNamePrefix(prefix string) ConnectionPoolOption
- func ConnectionPoolWithNameSuffix(suffix string) ConnectionPoolOption
- func ConnectionPoolWithRecoverCallback(callback ConnectionRecoverCallback) ConnectionPoolOption
- func ConnectionPoolWithTLS(config *tls.Config) ConnectionPoolOption
- type ConnectionRecoverCallback
- type ConsumeOptions
- type Delivery
- type ExchangeBindOptions
- type ExchangeDeclareOptions
- type ExchangeDeleteOptions
- type ExchangeKind
- type ExchangeUnbindOptions
- type Handler
- func (h *Handler) Config() HandlerConfig
- func (h *Handler) ConsumeOptions() ConsumeOptions
- func (h *Handler) IsActive(ctx context.Context) (active bool, err error)
- func (h *Handler) Pause(ctx context.Context) error
- func (h *Handler) Queue() string
- func (h *Handler) QueueConfig() QueueConfig
- func (h *Handler) Resume(ctx context.Context) error
- func (h *Handler) SetConsumeOptions(consumeOpts ConsumeOptions)
- func (h *Handler) SetHandlerFunc(hf HandlerFunc)
- func (h *Handler) SetQueue(queue string)
- type HandlerConfig
- type HandlerFunc
- type Option
- func WithBufferCapacity(size int) Option
- func WithConfirms(requirePublishConfirms bool) Option
- func WithConnectionRecoverCallback(callback ConnectionRecoverCallback) Option
- func WithConnectionTimeout(timeout time.Duration) Option
- func WithHeartbeatInterval(interval time.Duration) Option
- func WithLogger(logger logging.Logger) Option
- func WithName(name string) Option
- func WithNamePrefix(prefix string) Option
- func WithNameSuffix(suffix string) Option
- func WithSessionConsumeContextRetryCallback(callback SessionRetryCallback) Option
- func WithSessionConsumeRetryCallback(callback SessionRetryCallback) Option
- func WithSessionExchangeBindRetryCallback(callback SessionRetryCallback) Option
- func WithSessionExchangeDeclarePassiveRetryCallback(callback SessionRetryCallback) Option
- func WithSessionExchangeDeclareRetryCallback(callback SessionRetryCallback) Option
- func WithSessionExchangeDeleteRetryCallback(callback SessionRetryCallback) Option
- func WithSessionExchangeUnbindRetryCallback(callback SessionRetryCallback) Option
- func WithSessionFlowRetryCallback(callback SessionRetryCallback) Option
- func WithSessionGetRetryCallback(callback SessionRetryCallback) Option
- func WithSessionPublishRetryCallback(callback SessionRetryCallback) Option
- func WithSessionQoSRetryCallback(callback SessionRetryCallback) Option
- func WithSessionQueueBindRetryCallback(callback SessionRetryCallback) Option
- func WithSessionQueueDeclarePassiveRetryCallback(callback SessionRetryCallback) Option
- func WithSessionQueueDeclareRetryCallback(callback SessionRetryCallback) Option
- func WithSessionQueueDeleteRetryCallback(callback SessionRetryCallback) Option
- func WithSessionQueuePurgeRetryCallback(callback SessionRetryCallback) Option
- func WithSessionQueueUnbindRetryCallback(callback SessionRetryCallback) Option
- func WithSessionRecoverCallback(callback SessionRetryCallback) Option
- func WithSessionRetryCallback(callback SessionRetryCallback) Option
- func WithTLS(config *tls.Config) Option
- type Pool
- func (p *Pool) Close()
- func (p *Pool) ConnectionPoolCapacity() int
- func (p *Pool) ConnectionPoolSize() int
- func (p *Pool) Context() context.Context
- func (p *Pool) GetSession(ctx context.Context) (*Session, error)
- func (p *Pool) GetTransientSession(ctx context.Context) (*Session, error)
- func (p *Pool) Name() string
- func (p *Pool) ReturnSession(session *Session, err error)
- func (p *Pool) SessionPoolCapacity() int
- func (p *Pool) SessionPoolSize() int
- type Publisher
- type PublisherOption
- type Publishing
- type Queue
- type QueueBindOptions
- type QueueConfig
- type QueueDeclareOptions
- type QueueDeleteOptions
- type QueuePurgeOptions
- type Session
- func (s *Session) Ack(deliveryTag uint64, multiple bool) (err error)
- func (s *Session) AwaitConfirm(ctx context.Context, expectedTag uint64) (err error)
- func (s *Session) Close() (err error)
- func (s *Session) Connect() (err error)
- func (s *Session) ConsumeWithContext(ctx context.Context, queue string, option ...ConsumeOptions) (<-chan amqp091.Delivery, error)
- func (s *Session) Error() error
- func (s *Session) ExchangeBind(ctx context.Context, destination string, routingKey string, source string, ...) error
- func (s *Session) ExchangeDeclare(ctx context.Context, name string, kind ExchangeKind, ...) error
- func (s *Session) ExchangeDeclarePassive(ctx context.Context, name string, kind ExchangeKind, ...) error
- func (s *Session) ExchangeDelete(ctx context.Context, name string, option ...ExchangeDeleteOptions) error
- func (s *Session) ExchangeUnbind(ctx context.Context, destination string, routingKey string, source string, ...) error
- func (s *Session) Flag(err error)
- func (s *Session) Flow(ctx context.Context, active bool) error
- func (s *Session) FlushConfirms()
- func (s *Session) FlushReturned()
- func (s *Session) Get(ctx context.Context, queue string, autoAck bool) (msg Delivery, ok bool, err error)
- func (s *Session) IsCached() bool
- func (s *Session) IsConfirmable() bool
- func (s *Session) IsFlagged() bool
- func (s *Session) Nack(deliveryTag uint64, multiple bool, requeue bool) (err error)
- func (s *Session) Name() string
- func (s *Session) Publish(ctx context.Context, exchange string, routingKey string, msg Publishing) (deliveryTag uint64, err error)
- func (s *Session) Qos(ctx context.Context, prefetchCount int, prefetchSize int) error
- func (s *Session) QueueBind(ctx context.Context, queueName string, routingKey string, exchange string, ...) error
- func (s *Session) QueueDeclare(ctx context.Context, name string, option ...QueueDeclareOptions) (Queue, error)
- func (s *Session) QueueDeclarePassive(ctx context.Context, name string, option ...QueueDeclareOptions) (Queue, error)
- func (s *Session) QueueDelete(ctx context.Context, name string, option ...QueueDeleteOptions) (purgedMsgs int, err error)
- func (s *Session) QueuePurge(ctx context.Context, name string, options ...QueuePurgeOptions) (int, error)
- func (s *Session) QueueUnbind(ctx context.Context, name string, routingKey string, exchange string, ...) error
- func (s *Session) Recover(ctx context.Context) error
- func (s *Session) Tx() error
- func (s *Session) TxCommit() error
- func (s *Session) TxRollback() error
- type SessionOption
- func SessionWithAutoCloseConnection(autoClose bool) SessionOption
- func SessionWithBufferCapacity(capacity int) SessionOption
- func SessionWithCached(cached bool) SessionOption
- func SessionWithConfirms(requiresPublishConfirms bool) SessionOption
- func SessionWithConsumeContextRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithConsumeRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithContext(ctx context.Context) SessionOption
- func SessionWithExchangeBindRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithExchangeDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithExchangeDeclareRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithExchangeDeleteRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithExchangeUnbindRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithFlowRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithGetRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithLogger(logger logging.Logger) SessionOption
- func SessionWithPublishRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithQoSRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithQueueBindRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithQueueDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithQueueDeclareRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithQueueDeleteRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithQueuePurgeRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithQueueUnbindRetryCallback(callback SessionRetryCallback) SessionOption
- func SessionWithRecoverCallback(callback SessionRetryCallback) SessionOption
- func SessionWithRetryCallback(callback SessionRetryCallback) SessionOption
- type SessionPool
- func (sp *SessionPool) Capacity() int
- func (sp *SessionPool) Close()
- func (sp *SessionPool) GetSession(ctx context.Context) (s *Session, err error)
- func (sp *SessionPool) GetTransientSession(ctx context.Context) (s *Session, err error)
- func (sp *SessionPool) ReturnSession(session *Session, err error)
- func (sp *SessionPool) Size() int
- type SessionPoolOption
- func SessionPoolWithAutoCloseConnectionPool(autoClose bool) SessionPoolOption
- func SessionPoolWithBufferCapacity(capacity int) SessionPoolOption
- func SessionPoolWithConfirms(requirePublishConfirms bool) SessionPoolOption
- func SessionPoolWithConsumeContextRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithConsumeRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeBindRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeDeclareRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeDeleteRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithExchangeUnbindRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithFlowRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithGetRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithLogger(logger logging.Logger) SessionPoolOption
- func SessionPoolWithPublishRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQoSRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueBindRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueDeclareRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueDeleteRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueuePurgeRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithQueueUnbindRetryCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithRecoverCallback(callback SessionRetryCallback) SessionPoolOption
- func SessionPoolWithRetryCallback(callback SessionRetryCallback) SessionPoolOption
- type SessionRetryCallback
- type Subscriber
- func (s *Subscriber) Close()
- func (s *Subscriber) RegisterBatchHandler(handler *BatchHandler)
- func (s *Subscriber) RegisterBatchHandlerFunc(queue string, hf BatchHandlerFunc, options ...BatchHandlerOption) *BatchHandler
- func (s *Subscriber) RegisterHandler(handler *Handler)
- func (s *Subscriber) RegisterHandlerFunc(queue string, hf HandlerFunc, options ...ConsumeOptions) *Handler
- func (s *Subscriber) Start(ctx context.Context) (err error)
- func (s *Subscriber) Wait()
- type SubscriberOption
- type Table
- func (t Table) Clone() Table
- func (t Table) Death() (int64, bool)
- func (t Table) DeliveryCount() (int64, bool)
- func (t Table) WithDeadLetterExchange(exchange string) Table
- func (t Table) WithDeadLetterExchangeAndRoutingKey(exchange, routingKey string) Table
- func (t Table) WithDeliveryLimit(limit int) Table
- type Topologer
- func (t *Topologer) ExchangeBind(ctx context.Context, destination string, routingKey string, source string, ...) (err error)
- func (t *Topologer) ExchangeDeclare(ctx context.Context, name string, kind ExchangeKind, ...) (err error)
- func (t *Topologer) ExchangeDeclarePassive(ctx context.Context, name string, kind ExchangeKind, ...) (err error)
- func (t *Topologer) ExchangeDelete(ctx context.Context, name string, option ...ExchangeDeleteOptions) (err error)
- func (t *Topologer) ExchangeUnbind(ctx context.Context, destination string, routingKey string, source string, ...) (err error)
- func (t *Topologer) QueueBind(ctx context.Context, name string, routingKey string, exchange string, ...) (err error)
- func (t *Topologer) QueueDeclare(ctx context.Context, name string, option ...QueueDeclareOptions) (queue Queue, err error)
- func (t *Topologer) QueueDeclarePassive(ctx context.Context, name string, option ...QueueDeclareOptions) (queue Queue, err error)
- func (t *Topologer) QueueDelete(ctx context.Context, name string, option ...QueueDeleteOptions) (purged int, err error)
- func (t *Topologer) QueuePurge(ctx context.Context, name string, options ...QueuePurgeOptions) (int, error)
- func (t *Topologer) QueueUnbind(ctx context.Context, name string, routingKey string, exchange string, ...) (err error)
- type TopologerOption
Constants ¶
const ( // In order to prevent the broker from requeuing the message to th end of the queue, we need to set this limit in order for at least the // first N requeues to be requeued to the front of the queue. // https://www.rabbitmq.com/docs/quorum-queues#repeated-requeues DefaultQueueDeliveryLimit = 20 )
const (
/*
ExchangeKeyDeadLetter can be used in order to create a dead letter exchange (reference: https://www.rabbitmq.com/dlx.html)
Some queue consumers may be unable to process certain alerts, and the queue itself may reject messages as a result of certain events.
For instance, a message is dropped if there is no matching queue for it. In that instance, Dead Letter Exchanges must be implemented
so that those messages can be saved and reprocessed later. The “Dead Letter Exchange” is an AMQP enhancement provided by RabbitMQ.
This exchange has the capability of capturing messages that are not deliverable.
*/
ExchangeKeyDeadLetter = "x-dead-letter-exchange"
)
Variables ¶
var ( ErrInvalidConnectURL = errors.New("invalid connection url") // ErrConnectionFailed is just a generic error that is not checked // explicitly against in the code. ErrConnectionFailed = errors.New("connection failed") ErrPoolInitializationFailed = errors.New("pool initialization failed") ErrClosed = errors.New("closed") // ErrNotFound is returned by ExchangeDeclarePassive or QueueDeclarePassive in the case that // the queue was not found. ErrNotFound = errors.New("not found") // ErrBlockingFlowControl is returned when the server is under flow control // Your HTTP api may return 503 Service Unavailable or 429 Too Many Requests with a Retry-After header (https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After) ErrBlockingFlowControl = errors.New("blocking flow control") // ErrReturned is returned when a message is returned by the server when publishing ErrReturned = errors.New("returned") // ErrReject can be used to reject a specific message // This is a special error that negatively acknowledges messages and does not reuque them. ErrReject = errors.New("message rejected") )
var ( // ErrNack is returned in case the broker did not acknowledge a published message ErrNack = errors.New("message not acked") // returned when a user tries to await confirmations without configuring them for the session ErrNoConfirms = errors.New("confirmations are disabled for this session") // ErrDeliveryTagMismatch is returne din case we receive a publishing confirmation that // contains a delivery tag that doe snot match the one we expect. ErrDeliveryTagMismatch = errors.New("delivery tag mismatch") ErrDeliveryClosed = errors.New("delivery channel closed") )
var ( // ErrPauseFailed is returned by (Batch)Handler.Pause in case that the passed context is canceled ErrPauseFailed = errors.New("failed to pause handler") // ErrResumeFailed is returned by (Batch)Handler.Resume in case that the passed context is canceled ErrResumeFailed = errors.New("failed to resume handler") )
var (
QuorumQueue = Table{
"x-queue-type": "quorum",
}
)
Functions ¶
This section is empty.
Types ¶
type BackoffFunc ¶
type BatchHandler ¶ added in v0.6.0
type BatchHandler struct {
// contains filtered or unexported fields
}
BatchHandler is a struct that contains all parameters needed in order to register a batch handler function.
func NewBatchHandler ¶ added in v0.7.0
func NewBatchHandler(queue string, hf BatchHandlerFunc, options ...BatchHandlerOption) *BatchHandler
NewHandler creates a new handler which is primarily a combination of your passed handler function and the queue name from which the handler fetches messages and processes those. Additionally, the handler allows you to pause and resume processing from the provided queue.
func (*BatchHandler) Config ¶ added in v0.7.0
func (h *BatchHandler) Config() BatchHandlerConfig
func (*BatchHandler) ConsumeOptions ¶ added in v0.7.0
func (h *BatchHandler) ConsumeOptions() ConsumeOptions
func (*BatchHandler) FlushTimeout ¶ added in v0.6.1
func (h *BatchHandler) FlushTimeout() time.Duration
func (*BatchHandler) IsActive ¶ added in v0.7.0
func (h *BatchHandler) IsActive(ctx context.Context) (active bool, err error)
func (*BatchHandler) MaxBatchBytes ¶ added in v0.7.2
func (h *BatchHandler) MaxBatchBytes() int
func (*BatchHandler) MaxBatchSize ¶ added in v0.6.1
func (h *BatchHandler) MaxBatchSize() int
func (*BatchHandler) Pause ¶ added in v0.7.0
func (h *BatchHandler) Pause(ctx context.Context) error
Pause allows to halt the processing of a queue after the processing has been started by the subscriber.
func (*BatchHandler) Queue ¶ added in v0.6.0
func (h *BatchHandler) Queue() string
func (*BatchHandler) QueueConfig ¶ added in v0.7.1
func (h *BatchHandler) QueueConfig() QueueConfig
func (*BatchHandler) Resume ¶ added in v0.7.0
func (h *BatchHandler) Resume(ctx context.Context) error
Resume allows to continue the processing of a queue after it has been paused using Pause
func (*BatchHandler) SetConsumeOptions ¶ added in v0.7.0
func (h *BatchHandler) SetConsumeOptions(consumeOpts ConsumeOptions)
func (*BatchHandler) SetFlushTimeout ¶ added in v0.7.0
func (h *BatchHandler) SetFlushTimeout(flushTimeout time.Duration)
func (*BatchHandler) SetHandlerFunc ¶ added in v0.7.0
func (h *BatchHandler) SetHandlerFunc(hf BatchHandlerFunc)
SetHandlerFunc changes the current handler function to another handler function which processes messages.. The actual change is effective after pausing and resuming the handler.
func (*BatchHandler) SetMaxBatchBytes ¶ added in v0.7.2
func (h *BatchHandler) SetMaxBatchBytes(maxBatchBytes int)
func (*BatchHandler) SetMaxBatchSize ¶ added in v0.7.0
func (h *BatchHandler) SetMaxBatchSize(maxBatchSize int)
func (*BatchHandler) SetQueue ¶ added in v0.7.0
func (h *BatchHandler) SetQueue(queue string)
SetQueue changes the current queue to another queue from which the handler consumes messages. The actual change is effective after pausing and resuming the handler.
type BatchHandlerConfig ¶ added in v0.7.0
type BatchHandlerConfig struct { Queue string ConsumeOptions HandlerFunc BatchHandlerFunc // Maximum number of messages MaxBatchSize int // Maximum size of a batch in bytes (soft limit which triggers a batch to be processed) // does not guarantee that the batch size is not exceeded. MaxBatchBytes int FlushTimeout time.Duration }
BatchHandlerConfig is a read only snapshot of the current handler's configuration.
type BatchHandlerFunc ¶ added in v0.6.0
BatchHandlerFunc is a handler for incoming batches of messages/events
type BatchHandlerOption ¶ added in v0.7.0
type BatchHandlerOption func(*BatchHandler)
func WithBatchConsumeOptions ¶ added in v0.7.0
func WithBatchConsumeOptions(opts ConsumeOptions) BatchHandlerOption
func WithBatchFlushTimeout ¶ added in v0.7.0
func WithBatchFlushTimeout(d time.Duration) BatchHandlerOption
func WithMaxBatchBytes ¶ added in v0.7.2
func WithMaxBatchBytes(size int) BatchHandlerOption
WithMaxBatchBytes sets the maximum size of a batch in bytes. If the batch size exceeds this limit, the batch is passed to the handler function. If the value is set to 0, the batch size is not limited by bytes.
func WithMaxBatchSize ¶ added in v0.7.0
func WithMaxBatchSize(size int) BatchHandlerOption
WithMaxBatchSize sets the maximum size of a batch. If set to 0 the batch size is not limited. This means that the batch size is only limited by the maximum batch size in bytes.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is an internal representation of amqp.Connection.
func NewConnection ¶
func NewConnection(ctx context.Context, connectUrl, name string, options ...ConnectionOption) (*Connection, error)
NewConnection creates a connection wrapper. name: unique connection name
func (*Connection) BlockingFlowControl ¶ added in v0.8.0
func (ch *Connection) BlockingFlowControl() <-chan amqp.Blocking
func (*Connection) Close ¶
func (ch *Connection) Close() (err error)
func (*Connection) Connect ¶
func (ch *Connection) Connect(ctx context.Context) error
Connect tries to connect (or reconnect) Does not block indefinitely, but returns an error upon connection failure.
func (*Connection) Error ¶
func (ch *Connection) Error() error
Error returns the first error from the errors channel and flushes all other pending errors from the channel In case that there are no errors, nil is returned.
func (*Connection) Flag ¶
func (ch *Connection) Flag(err error)
Flag flags the connection as broken which must be recovered. A flagged connection implies a closed connection. Flagging of a connectioncan only be undone by Recover-ing the connection.
func (*Connection) IsCached ¶
func (c *Connection) IsCached() bool
IsCached returns true in case this session is supposed to be returned to a session pool.
func (*Connection) IsClosed ¶
func (ch *Connection) IsClosed() bool
func (*Connection) IsFlagged ¶ added in v0.8.0
func (ch *Connection) IsFlagged() bool
func (*Connection) Name ¶
func (c *Connection) Name() string
Name returns the name of the connection
type ConnectionOption ¶
type ConnectionOption func(*connectionOption)
func ConnectionWithBackoffPolicy ¶
func ConnectionWithBackoffPolicy(policy BackoffFunc) ConnectionOption
ConnectionWithBackoffPolicy influences the sleep interval between connection recovery retries.
func ConnectionWithCached ¶
func ConnectionWithCached(cached bool) ConnectionOption
ConnectionWithCached makes a connection a cached connection This is only necessary for the connection pool, as cached connections are part of a pool and can be returned back to the pool without being closed.
func ConnectionWithHeartbeatInterval ¶
func ConnectionWithHeartbeatInterval(interval time.Duration) ConnectionOption
ConnectionHeartbeatInterval allows to set a custom heartbeat interval, that MUST be >= 1 * time.Second
func ConnectionWithLogger ¶
func ConnectionWithLogger(logger logging.Logger) ConnectionOption
ConnectionWithLogger allows to set a logger. By default no logger is set.
func ConnectionWithRecoverCallback ¶ added in v0.8.0
func ConnectionWithRecoverCallback(callback ConnectionRecoverCallback) ConnectionOption
ConnectionWithRecoverCallback allows to set a custom recover callback.
func ConnectionWithTLS ¶
func ConnectionWithTLS(config *tls.Config) ConnectionOption
ConnectionWithTLS allows to configure tls connectivity.
func ConnectionWithTimeout ¶
func ConnectionWithTimeout(timeout time.Duration) ConnectionOption
ConnectionWithTimeout allows to set a custom connection timeout, that MUST be >= 1 * time.Second
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool houses the pool of RabbitMQ connections.
func NewConnectionPool ¶
func NewConnectionPool(ctx context.Context, connectUrl string, numConns int, options ...ConnectionPoolOption) (*ConnectionPool, error)
NewConnectionPool creates a new connection pool which has a maximum size it can become and an idle size of connections that are always open.
func (*ConnectionPool) Capacity ¶ added in v0.8.0
func (cp *ConnectionPool) Capacity() int
Capacity is the capacity of the cached connection pool without any transient connections. It is the initial number of connections that were created for this connection pool.
func (*ConnectionPool) Close ¶
func (cp *ConnectionPool) Close()
Close closes the connection pool. Closes all connections and sessions that are currently known to the pool. Any new connections or session requests will return an error. Any returned sessions or connections will be closed properly.
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection(ctx context.Context) (conn *Connection, err error)
GetConnection only returns an error upon shutdown
func (*ConnectionPool) GetTransientConnection ¶
func (cp *ConnectionPool) GetTransientConnection(ctx context.Context) (conn *Connection, err error)
GetTransientConnection may return an error when the context was cancelled before the connection could be obtained. Transient connections may be returned to the pool. The are closed properly upon returning.
func (*ConnectionPool) Name ¶ added in v0.2.0
func (cp *ConnectionPool) Name() string
func (*ConnectionPool) ReturnConnection ¶
func (cp *ConnectionPool) ReturnConnection(conn *Connection, err error)
ReturnConnection puts the connection back in the queue and flag it for error. This helps maintain a Round Robin on Connections and their resources. If the connection is flagged, it will be recovered and returned to the pool. If the context is canceled, the connection will be immediately returned to the pool without any recovery attempt.
func (*ConnectionPool) Size ¶ added in v0.5.0
func (cp *ConnectionPool) Size() int
Size returns the number of idle cached connections.
func (*ConnectionPool) StatCachedActive ¶ added in v0.8.0
func (cp *ConnectionPool) StatCachedActive() int
StatCachedActive returns the number of active cached connections.
func (*ConnectionPool) StatTransientActive ¶ added in v0.8.0
func (cp *ConnectionPool) StatTransientActive() int
StatTransientActive returns the number of active transient connections.
type ConnectionPoolOption ¶
type ConnectionPoolOption func(*connectionPoolOption)
func ConnectionPoolWithConnectionTimeout ¶
func ConnectionPoolWithConnectionTimeout(timeout time.Duration) ConnectionPoolOption
ConnectionPoolWithConnectionTimeout allows to set a custom connection timeout, that MUST be >= 1 * time.Second
func ConnectionPoolWithHeartbeatInterval ¶
func ConnectionPoolWithHeartbeatInterval(interval time.Duration) ConnectionPoolOption
WithHeartbeatInterval allows to set a custom heartbeat interval, that MUST be >= 1 * time.Second
func ConnectionPoolWithLogger ¶
func ConnectionPoolWithLogger(logger logging.Logger) ConnectionPoolOption
ConnectionPoolWithLogger allows to set a custom logger.
func ConnectionPoolWithName ¶
func ConnectionPoolWithName(name string) ConnectionPoolOption
ConnectionPoolWithName gives all of your pooled connections a prefix name
func ConnectionPoolWithNamePrefix ¶ added in v0.2.0
func ConnectionPoolWithNamePrefix(prefix string) ConnectionPoolOption
ConnectionPoolWithNamePrefix adds a prefix to the connection pool name
func ConnectionPoolWithNameSuffix ¶ added in v0.2.0
func ConnectionPoolWithNameSuffix(suffix string) ConnectionPoolOption
ConnectionPoolWithNameSuffix adds a suffix to the connection pool name
func ConnectionPoolWithRecoverCallback ¶ added in v0.8.0
func ConnectionPoolWithRecoverCallback(callback ConnectionRecoverCallback) ConnectionPoolOption
ConnectionPoolWithRecoverCallback allows to set a custom recover callback.
func ConnectionPoolWithTLS ¶
func ConnectionPoolWithTLS(config *tls.Config) ConnectionPoolOption
ConnectionPoolWithTLS allows to configure tls connectivity.
type ConnectionRecoverCallback ¶ added in v0.8.0
ConnectionRecoverCallback is a function that can be called after a connection failed to be established and is about to be recovered.
type ConsumeOptions ¶ added in v0.5.0
type ConsumeOptions struct { // The consumer is identified by a string that is unique and scoped for all consumers on this channel. If you wish to eventually cancel the consumer, use the same non-empty identifier in Channel.Cancel. // An empty string will cause the library to generate a unique identity. // The consumer identity will be included in every Delivery in the ConsumerTag field ConsumerTag string // When AutoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. // Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details. AutoAck bool // When Exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers. Exclusive bool // The NoLocal flag is not supported by RabbitMQ. // It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness. NoLocal bool // When NoWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed. // Optional arguments can be provided that have specific semantics for the queue or server. NoWait bool // Args are aditional implementation dependent parameters. Args Table }
type Delivery ¶
type Delivery struct { Headers Table // Application or header exchange table // Properties ContentType string // MIME content type ContentEncoding string // MIME content encoding DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) Priority uint8 // queue implementation use - 0 to 9 CorrelationId string // application use - correlation identifier ReplyTo string // application use - address to reply to (ex: RPC) Expiration string // implementation use - message expiration spec MessageId string // application use - message identifier Timestamp time.Time // application use - message timestamp Type string // application use - message type name UserId string // application use - creating user - should be authenticated user AppId string // application use - creating application id // Valid only with Channel.Consume ConsumerTag string // Valid only with Channel.Get MessageCount uint32 DeliveryTag uint64 Redelivered bool Exchange string // basic.publish exchange RoutingKey string // basic.publish routing key Body []byte }
func NewDeliveryFromAMQP091 ¶ added in v0.9.0
func NewDeliveryFromAMQP091(delivery amqp091.Delivery) Delivery
type ExchangeBindOptions ¶ added in v0.5.0
type ExchangeBindOptions struct { // When NoWait is true, do not wait for the server to confirm the binding. If any // error occurs the channel will be closed. Add a listener to NotifyClose to // handle these errors. NoWait bool // Optional arguments specific to the exchanges bound can also be specified. Args Table }
type ExchangeDeclareOptions ¶ added in v0.5.0
type ExchangeDeclareOptions struct { // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain // declared when there are no remaining bindings. This is the best lifetime for // long-lived exchange configurations like stable routes and default exchanges. // // Non-Durable and Auto-Deleted exchanges will be deleted when there are no // remaining bindings and not restored on server restart. This lifetime is // useful for temporary topologies that should not pollute the virtual host on // failure or after the consumers have completed. // // Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is // running including when there are no remaining bindings. This is useful for // temporary topologies that may have long delays between bindings. // // Durable and Auto-Deleted exchanges will survive server restarts and will be // removed before and after server restarts when there are no remaining bindings. // These exchanges are useful for robust temporary topologies or when you require // binding durable queues to auto-deleted exchanges. // // Note: RabbitMQ declares the default exchange types like 'amq.fanout' as // durable, so queues that bind to these pre-declared exchanges must also be // durable. Durable bool // Durable and Non-Auto-Deleted exchanges will survive server restarts and remain // declared when there are no remaining bindings. This is the best lifetime for // long-lived exchange configurations like stable routes and default exchanges. // // Non-Durable and Auto-Deleted exchanges will be deleted when there are no // remaining bindings and not restored on server restart. This lifetime is // useful for temporary topologies that should not pollute the virtual host on // failure or after the consumers have completed. // // Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is // running including when there are no remaining bindings. This is useful for // temporary topologies that may have long delays between bindings. // // Durable and Auto-Deleted exchanges will survive server restarts and will be // removed before and after server restarts when there are no remaining bindings. // These exchanges are useful for robust temporary topologies or when you require // binding durable queues to auto-deleted exchanges. AutoDelete bool // Exchanges declared as `internal` do not accept accept publishings. Internal // exchanges are useful when you wish to implement inter-exchange topologies // that should not be exposed to users of the broker. Internal bool // When NoWait is true, declare without waiting for a confirmation from the server. // The channel may be closed as a result of an error. Add a NotifyClose listener // to respond to any exceptions. NoWait bool // Optional Table of arguments that are specific to the server's implementation of // the exchange can be sent for exchange types that require extra parameters. Args Table }
type ExchangeDeleteOptions ¶ added in v0.5.0
type ExchangeDeleteOptions struct { // When IfUnused is true, the server will only delete the exchange if it has no queue // bindings. If the exchange has queue bindings the server does not delete it // but close the channel with an exception instead. Set this to true if you are // not the sole owner of the exchange. IfUnused bool // When NoWait is true, do not wait for a server confirmation that the exchange has // been deleted. NoWait bool }
type ExchangeKind ¶ added in v0.7.0
type ExchangeKind string
const ( /* The first RabbitMQ exchange type, the direct exchange, uses a message routing key to transport messages to queues. The routing key is a message attribute that the producer adds to the message header. You can consider the routing key to be an “address” that the exchange uses to determine how the message should be routed. A message is delivered to the queue with the binding key that exactly matches the message’s routing key. The direct exchange’s default exchange is “amq. direct“, which AMQP brokers must offer for communication. As is shown in the figure, queue A (create_pdf_queue) is tied to a direct exchange (pdf_events) with the binding key “pdf_create”. When a new message arrives at the direct exchange with the routing key “pdf_create”, the exchange sends it to the queue where the binding key = routing key; which is queue A in this example (create_pdf_queue). */ ExchangeKindDirect ExchangeKind = "direct" /* A fanout exchange, like direct and topic exchange, duplicates and routes a received message to any associated queues, regardless of routing keys or pattern matching. Here, your provided keys will be entirely ignored. Fanout exchanges are useful when the same message needs to be passed to one or perhaps more queues with consumers who may process the message differently. As shown in the image, a message received by the fanout exchange is copied and routed to all three queues associated with the exchange. When something happens, such as a sporting event or weather forecast, all connected mobile devices will be notified. For the fanout RabbitMQ exchange type, “amq.fanout” is the default exchange that must be provided by AMQP brokers. */ ExchangeKindFanOut ExchangeKind = "fanout" /* Topic RabbitMQ exchange type sends messages to queues depending on wildcard matches between the routing key and the queue binding’s routing pattern. Messages are routed to one or more queues based on a pattern that matches a message routing key. A list of words separated by a period must be used as the routing key (.). The routing patterns may include an asterisk (“*”) to match a word in a specified position of the routing key (for example, a routing pattern of “agreements.*.*.b.*” only matches routing keys with “agreements” as the first word and “b” as the fourth word). A pound symbol (“#”) denotes a match of zero or more words. In topic exchange, consumers indicate which topics are of interest to them. The consumer establishes a queue and binds it to the exchange using a certain routing pattern. All messages with a routing key that matches the routing pattern are routed to the queue, where they will remain until the consumer consumes them. For the topic RabbitMQ exchange type, “amq.topic” is the default topic exchange that AMQP brokers must provide for message exchange. */ ExchangeKindTopic ExchangeKind = "topic" /* A headers RabbitMQ exchange type is a message routing system that uses arguments with headers and optional values to route messages. Header exchanges are identical to topic exchanges, except that instead of using routing keys, messages are routed based on header values. If the value of the header equals the value of supply during binding, the message matches. In the binding between exchange and queue, a specific argument termed “x-match” indicates whether all headers must match or only one. For the message to match, any common header between the message and the binding should match, or all of the headers referenced in the binding must be present in the message. The “x-match” property has two possible values: “any” and “all,” with “all” being the default. A value of “all” indicates that all header pairs (key, value) must match, whereas “any” indicates that at least one pair must match. Instead of a string, headers can be built with a larger range of data types, such as integers or hashes. The headers exchange type (when used with the binding option “any”) is useful for steering messages containing a subset of known (unordered) criteria. For the header RabbitMQ exchange type, “amq.headers” is the default topic exchange that AMQP brokers must supply. */ ExchangeKindHeaders ExchangeKind = "headers" )
type ExchangeUnbindOptions ¶ added in v0.5.0
type ExchangeUnbindOptions struct { // When NoWait is true, do not wait for the server to confirm the deletion of the // binding. If any error occurs the channel will be closed. Add a listener to // NotifyClose to handle these errors. NoWait bool // Optional arguments that are specific to the type of exchanges bound can also be // provided. These must match the same arguments specified in ExchangeBind to // identify the binding. Args Table }
ExchangeUnbindOptions can be used to configure additional unbind options.
type Handler ¶ added in v0.2.0
type Handler struct {
// contains filtered or unexported fields
}
Handler is a struct that contains all parameters needed in order to register a handler function to the provided queue. Additionally, the handler allows you to pause and resume processing or messages.
func NewHandler ¶ added in v0.7.0
func NewHandler(queue string, hf HandlerFunc, option ...ConsumeOptions) *Handler
NewHandler creates a new handler which is primarily a combination of your passed handler function and the queue name from which the handler fetches messages and processes those. Additionally, the handler allows you to pause and resume processing from the provided queue.
func (*Handler) Config ¶ added in v0.7.0
func (h *Handler) Config() HandlerConfig
func (*Handler) ConsumeOptions ¶ added in v0.7.0
func (h *Handler) ConsumeOptions() ConsumeOptions
func (*Handler) Pause ¶ added in v0.7.0
Pause allows to halt the processing of a queue after the processing has been started by the subscriber.
func (*Handler) QueueConfig ¶ added in v0.7.1
func (h *Handler) QueueConfig() QueueConfig
func (*Handler) Resume ¶ added in v0.7.0
Resume allows to continue the processing of a queue after it has been paused using Pause
func (*Handler) SetConsumeOptions ¶ added in v0.7.0
func (h *Handler) SetConsumeOptions(consumeOpts ConsumeOptions)
SetConsumeOptions changes the current handler function to another handler function which processes messages.. The actual change is effective after pausing and resuming the handler.
func (*Handler) SetHandlerFunc ¶ added in v0.7.0
func (h *Handler) SetHandlerFunc(hf HandlerFunc)
SetHandlerFunc changes the current handler function to another handler function which processes messages.. The actual change is effective after pausing and resuming the handler.
type HandlerConfig ¶ added in v0.7.0
type HandlerConfig struct { Queue string ConsumeOptions HandlerFunc HandlerFunc }
HandlerConfig is a read only snapshot of the current handler's configuration. This internal data structure is used in the corresponsing consumer.
type HandlerFunc ¶
HandlerFunc is basically a handler for incoming messages/events.
type Option ¶ added in v0.2.0
type Option func(*poolOption)
func WithBufferCapacity ¶ added in v0.8.0
WithBufferCapacity allows to configurethe size of the confirmation, error & blocker buffers of all sessions
func WithConfirms ¶
WithConfirms requires all messages from sessions to be acked.
func WithConnectionRecoverCallback ¶ added in v0.8.0
func WithConnectionRecoverCallback(callback ConnectionRecoverCallback) Option
WithConnectionRecoverCallback allows to set a custom connection recovery callback
func WithConnectionTimeout ¶
WithConnectionTimeout allows to set a custom connection timeout, that MUST be >= 1 * time.Second
func WithHeartbeatInterval ¶
WithHeartbeatInterval allows to set a custom heartbeat interval, that MUST be >= 1 * time.Second
func WithLogger ¶
WithLogger allows to set a custom logger for the connection AND session pool
func WithNamePrefix ¶ added in v0.2.0
WithNamePrefix adds a prefix to the connection pool name
func WithNameSuffix ¶ added in v0.2.0
WithNameSuffix adds a suffix to the connection pool name
func WithSessionConsumeContextRetryCallback ¶ added in v0.8.0
func WithSessionConsumeContextRetryCallback(callback SessionRetryCallback) Option
WithSessionConsumeContextRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionConsumeRetryCallback ¶ added in v0.8.0
func WithSessionConsumeRetryCallback(callback SessionRetryCallback) Option
WithSessionConsumeRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeBindRetryCallback ¶ added in v0.8.0
func WithSessionExchangeBindRetryCallback(callback SessionRetryCallback) Option
WithSessionExchangeBindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeDeclarePassiveRetryCallback ¶ added in v0.8.0
func WithSessionExchangeDeclarePassiveRetryCallback(callback SessionRetryCallback) Option
WithSessionExchangeDeclarePassiveRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeDeclareRetryCallback ¶ added in v0.8.0
func WithSessionExchangeDeclareRetryCallback(callback SessionRetryCallback) Option
WithSessionExchangeDeclareRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeDeleteRetryCallback ¶ added in v0.8.0
func WithSessionExchangeDeleteRetryCallback(callback SessionRetryCallback) Option
WithSessionExchangeDeleteRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionExchangeUnbindRetryCallback ¶ added in v0.8.0
func WithSessionExchangeUnbindRetryCallback(callback SessionRetryCallback) Option
WithSessionExchangeUnbindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionFlowRetryCallback ¶ added in v0.8.0
func WithSessionFlowRetryCallback(callback SessionRetryCallback) Option
WithSessionFlowRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionGetRetryCallback ¶ added in v0.8.0
func WithSessionGetRetryCallback(callback SessionRetryCallback) Option
WithSessionGetRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionPublishRetryCallback ¶ added in v0.8.0
func WithSessionPublishRetryCallback(callback SessionRetryCallback) Option
WithSessionPublishRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQoSRetryCallback ¶ added in v0.8.0
func WithSessionQoSRetryCallback(callback SessionRetryCallback) Option
WithSessionQoSRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueBindRetryCallback ¶ added in v0.8.0
func WithSessionQueueBindRetryCallback(callback SessionRetryCallback) Option
WithSessionQueueBindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueDeclarePassiveRetryCallback ¶ added in v0.8.0
func WithSessionQueueDeclarePassiveRetryCallback(callback SessionRetryCallback) Option
WithSessionQueueDeclarePassiveRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueDeclareRetryCallback ¶ added in v0.8.0
func WithSessionQueueDeclareRetryCallback(callback SessionRetryCallback) Option
WithSessionQueueDeclareRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueDeleteRetryCallback ¶ added in v0.8.0
func WithSessionQueueDeleteRetryCallback(callback SessionRetryCallback) Option
WithSessionQueueDeleteRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueuePurgeRetryCallback ¶ added in v0.8.0
func WithSessionQueuePurgeRetryCallback(callback SessionRetryCallback) Option
WithSessionQueuePurgeRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionQueueUnbindRetryCallback ¶ added in v0.8.0
func WithSessionQueueUnbindRetryCallback(callback SessionRetryCallback) Option
WithSessionQueueUnbindRetryCallback allows to set a custom retry callback for the session pool.
func WithSessionRecoverCallback ¶ added in v0.8.0
func WithSessionRecoverCallback(callback SessionRetryCallback) Option
WithSessionRecoverCallback allows to set a custom session recovery callback
func WithSessionRetryCallback ¶ added in v0.8.0
func WithSessionRetryCallback(callback SessionRetryCallback) Option
WithSessionRetryCallback allows to set a custom retry callback for the session pool. This will set the same retry callback for all operations.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
func (*Pool) ConnectionPoolCapacity ¶ added in v0.8.0
ConnectionPoolCapacity returns the capacity of the connection pool.
func (*Pool) ConnectionPoolSize ¶ added in v0.5.0
ConnectionPoolSize returns the number of connections in the pool that are idling.
func (*Pool) GetSession ¶
GetSession returns a new session from the pool, only returns an error upon shutdown.
func (*Pool) GetTransientSession ¶
GetTransientSession returns a new session which is decoupled from anyshutdown mechanism, thus requiring a context for timeout handling. The session does also use a transient connection which is closed when the transient session is closed.
func (*Pool) ReturnSession ¶
ReturnSession returns a Session back to the pool. If the session was returned due to an error, erred should be set to true, otherwise erred should be set to false.
func (*Pool) SessionPoolCapacity ¶ added in v0.8.0
SessionPoolCapacity returns the capacity of the session pool.
func (*Pool) SessionPoolSize ¶ added in v0.5.0
SessionPoolSize returns the number of sessions in the pool that are idling.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(p *Pool, options ...PublisherOption) *Publisher
func (*Publisher) Get ¶ added in v0.2.0
func (p *Publisher) Get(ctx context.Context, queue string, autoAck bool) (msg Delivery, ok bool, err error)
Get is only supposed to be used for testing, do not use get for polling any broker queues.
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg Publishing) error
Publish a message to a specific exchange with a given routingKey. You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.
type PublisherOption ¶
type PublisherOption func(*publisherOption)
func PublisherWithAutoClosePool ¶
func PublisherWithAutoClosePool(autoClose bool) PublisherOption
func PublisherWithBackoffPolicy ¶ added in v0.8.1
func PublisherWithBackoffPolicy(backoffFunc BackoffFunc) PublisherOption
func PublisherWithContext ¶
func PublisherWithContext(ctx context.Context) PublisherOption
func PublisherWithLogger ¶ added in v0.2.0
func PublisherWithLogger(logger logging.Logger) PublisherOption
type Publishing ¶
type Publishing struct { // Application or exchange specific fields, // the headers exchange will inspect this field. Headers Table // Properties ContentType string // MIME content type ContentEncoding string // MIME content encoding DeliveryMode uint8 // Persistent (0 or 2) or Transient (1) (different from rabbitmq/amqp091-go library) Priority uint8 // 0 to 9 CorrelationId string // correlation identifier ReplyTo string // address to to reply to (ex: RPC) Expiration string // message expiration spec MessageId string // message identifier Timestamp time.Time // message timestamp Type string // message type name UserId string // creating user id - ex: "guest" AppId string // creating application id // The application specific payload of the message Body []byte // Since publishings are asynchronous, any undeliverable message will get returned by the server. // Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true. // Publishings can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing key, // or when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery. // This can return an error when the channel, connection or socket is closed. The error or lack of an error does not indicate whether the server has received this publishing. Mandatory bool Immediate bool }
Publishing captures the client message sent to the server. The fields outside of the Headers table included in this struct mirror the underlying fields in the content frame. They use native types for convenience and efficiency.
type Queue ¶ added in v0.7.0
type Queue amqp091.Queue
type Queue struct { Name string // server confirmed or generated name Messages int // count of messages not awaiting acknowledgment Consumers int // number of consumers receiving deliveries }
Queue captures the current server state of the queue on the server returned from Channel.QueueDeclare or Channel.QueueInspect.
type QueueBindOptions ¶ added in v0.5.0
type QueueConfig ¶ added in v0.7.1
type QueueConfig struct { Queue string ConsumeOptions }
QueueConfig is a read only snapshot of the current handler's queue configuration. It is the common configuration between the handler and the batch handler.
type QueueDeclareOptions ¶ added in v0.5.0
type QueueDeclareOptions struct { // Durable and Non-Auto-Deleted queues will survive server restarts and remain // when there are no remaining consumers or bindings. Persistent publishings will // be restored in this queue on server restart. These queues are only able to be // bound to durable exchanges. // // Non-Durable and Non-Auto-Deleted queues will remain declared as long as the // server is running regardless of how many consumers. This lifetime is useful // for temporary topologies that may have long delays between consumer activity. // These queues can only be bound to non-durable exchanges. // // Durable and Auto-Deleted queues will be restored on server restart, but without // active consumers will not survive and be removed. This Lifetime is unlikely // to be useful. Durable bool // Non-Durable and Auto-Deleted queues will not be redeclared on server restart // and will be deleted by the server after a short time when the last consumer is // canceled or the last consumer's channel is closed. Queues with this lifetime // can also be deleted normally with QueueDelete. These durable queues can only // be bound to non-durable exchanges. // // Non-Durable and Non-Auto-Deleted queues will remain declared as long as the // server is running regardless of how many consumers. This lifetime is useful // for temporary topologies that may have long delays between consumer activity. // These queues can only be bound to non-durable exchanges. // // Durable and Auto-Deleted queues will be restored on server restart, but without // active consumers will not survive and be removed. This Lifetime is unlikely // to be useful. AutoDelete bool // Exclusive queues are only accessible by the connection that declares them and // will be deleted when the connection closes. Channels on other connections // will receive an error when attempting to declare, bind, consume, purge or // delete a queue with the same name. Exclusive bool // When noWait is true, the queue will assume to be declared on the server. A // channel exception will arrive if the conditions are met for existing queues // or attempting to modify an existing queue from a different connection. // NoWait bool // Args are additional properties you can set, like the queue type. Args Table }
QueueDeclareOptions can be passed to the queue declaration If you want to change your default queue behavior.
type QueueDeleteOptions ¶ added in v0.5.0
type QueueDeleteOptions struct { // When IfUnused is true, the queue will not be deleted if there are any // consumers on the queue. If there are consumers, an error will be returned and // the channel will be closed. IfUnused bool // When IfEmpty is true, the queue will not be deleted if there are any messages // remaining on the queue. If there are messages, an error will be returned and // the channel will be closed. IfEmpty bool // When NoWait is true, the queue will be deleted without waiting for a response // from the server. The purged message count will not be meaningful. If the queue // could not be deleted, a channel exception will be raised and the channel will // be closed. NoWait bool }
QueueDeleteOptions are options for deleting a queue.
type QueuePurgeOptions ¶ added in v0.7.0
type QueuePurgeOptions struct { // If NoWait is true, do not wait for the server response and the number of messages purged will not be meaningful. NoWait bool }
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session is a wrapper for an amqp channel. It MUST not be used in a multithreaded context, but only in a single goroutine.
func NewSession ¶
func NewSession(conn *Connection, name string, options ...SessionOption) (*Session, error)
NewSession wraps a connection and a channel in order tointeract with the message broker. By default the context of the parent connection is used for cancellation.
func (*Session) Ack ¶
Ack confirms the processing of the message. In case the underlying channel dies, you cannot send a nack for the processed message. You might receive the message again from the broker, as it expects a n/ack
func (*Session) AwaitConfirm ¶
AwaitConfirm tries to await a confirmation from the broker for a published message You may check for ErrNack in order to see whether the broker rejected the message temporatily. WARNING: AwaitConfirm cannot be retried in case the channel dies or errors. You must resend your message and attempt to await it again.
func (*Session) Close ¶
Close closes the session completely. Do not use this method in case you have acquired the session from a connection pool. Use the ConnectionPool.ResurnSession method in order to return the session.
func (*Session) Connect ¶
Connect tries to create (or re-create) the channel from the Connection it is derived from.
func (*Session) ConsumeWithContext ¶ added in v0.7.0
func (s *Session) ConsumeWithContext(ctx context.Context, queue string, option ...ConsumeOptions) (<-chan amqp091.Delivery, error)
Consume immediately starts delivering queued messages.
Begin receiving on the returned chan Delivery before any other operation on the Connection or Channel. Continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must range over the chan to ensure all deliveries are received.
Unreceived deliveries will block all methods on the same connection. All deliveries in AMQP must be acknowledged. It is expected of the consumer to call Delivery.Ack after it has successfully processed the delivery.
If the consumer is cancelled or the channel or connection is closed any unacknowledged deliveries will be requeued at the end of the same queue.
Inflight messages, limited by Channel.Qos will be buffered until received from the returned chan. When the Channel or Connection is closed, all buffered and inflight messages will be dropped. When the consumer identifier tag is cancelled, all inflight messages will be delivered until the returned chan is closed.
func (*Session) Error ¶ added in v0.4.0
Error returns all errors from the errors channel and flushes all other pending errors from the channel In case that there are no errors, nil is returned.
func (*Session) ExchangeBind ¶
func (s *Session) ExchangeBind(ctx context.Context, destination string, routingKey string, source string, option ...ExchangeBindOptions) error
ExchangeBind binds an exchange to another exchange to create inter-exchange routing topologies on the server. This can decouple the private topology and routing exchanges from exchanges intended solely for publishing endpoints.
Binding two exchanges with identical arguments will not create duplicate bindings.
Binding one exchange to another with multiple bindings will only deliver a message once. For example if you bind your exchange to `amq.fanout` with two different binding keys, only a single message will be delivered to your exchange even though multiple bindings will match.
Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing key is matched.
ExchangeBind("sell", "MSFT", "trade", false, nil) ExchangeBind("buy", "AAPL", "trade", false, nil) Delivery Source Key Destination example exchange exchange ----------------------------------------------- key: AAPL --> trade ----> MSFT sell \---> AAPL --> buy
func (*Session) ExchangeDeclare ¶
func (s *Session) ExchangeDeclare(ctx context.Context, name string, kind ExchangeKind, option ...ExchangeDeclareOptions) error
ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
Errors returned from this method will close the channel.
Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consist of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon.
Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers".
func (*Session) ExchangeDeclarePassive ¶ added in v0.7.0
func (s *Session) ExchangeDeclarePassive(ctx context.Context, name string, kind ExchangeKind, option ...ExchangeDeclareOptions) error
ExchangeDeclarePassive is functionally and parametrically equivalent to ExchangeDeclare, except that it sets the "passive" attribute to true. A passive exchange is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent exchange will cause RabbitMQ to throw an exception. This function can be used to detect the existence of an exchange.
func (*Session) ExchangeDelete ¶
func (s *Session) ExchangeDelete(ctx context.Context, name string, option ...ExchangeDeleteOptions) error
ExchangeDelete removes the named exchange from the server. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.
func (*Session) ExchangeUnbind ¶
func (s *Session) ExchangeUnbind(ctx context.Context, destination string, routingKey string, source string, option ...ExchangeUnbindOptions) error
ExchangeUnbind unbinds the destination exchange from the source exchange on the server by removing the routing key between them. This is the inverse of ExchangeBind. If the binding does not currently exist, an error will be returned.
func (*Session) Flag ¶ added in v0.8.0
Flag marks the session as flagged. This is useful in case of a connection pool, where the session is returned to the pool and should be recovered by the next user.
func (*Session) Flow ¶
Flow allows to enable or disable flow from the message broker Flow pauses the delivery of messages to consumers on this channel. Channels are opened with flow control active, to open a channel with paused deliveries immediately call this method with `false` after calling Connection.Channel.
When active is `false`, this method asks the server to temporarily pause deliveries until called again with active as `true`.
Channel.Get methods will not be affected by flow control.
This method is not intended to act as window control. Use Channel.Qos to limit the number of unacknowledged messages or bytes in flight instead.
The server may also send us flow methods to throttle our publishings. A well behaving publishing client should add a listener with Channel.NotifyFlow and pause its publishings when `false` is sent on that channel.
Note: RabbitMQ prefers to use TCP push back to control flow for all channels on a connection, so under high volume scenarios, it's wise to open separate Connections for publishings and deliveries.
func (*Session) FlushConfirms ¶ added in v0.9.0
func (s *Session) FlushConfirms()
Flush confirms channel
func (*Session) FlushReturned ¶ added in v0.9.0
func (s *Session) FlushReturned()
FlushReturned publish channel
func (*Session) Get ¶ added in v0.2.0
func (s *Session) Get(ctx context.Context, queue string, autoAck bool) (msg Delivery, ok bool, err error)
Get is only supposed to be used for testing purposes, do not us eit to poll the queue periodically.
func (*Session) IsCached ¶
IsCached returns true in case this session is supposed to be returned to a session pool.
func (*Session) IsConfirmable ¶
IsConfirmable returns true in case this session requires that after Publishing a message you also MUST Await its confirmation
func (*Session) Nack ¶
Nack rejects the message. In case the underlying channel dies, you cannot send a nack for the processed message. You might receive the message again from the broker, as it expects a n/ack
func (*Session) Publish ¶
func (s *Session) Publish(ctx context.Context, exchange string, routingKey string, msg Publishing) (deliveryTag uint64, err error)
Publish sends a Publishing from the client to an exchange on the server. When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is because every declared queue gets an implicit route to the default exchange. It is possible for publishing to not reach the broker if the underlying socket is shut down without pending publishing packets being flushed from the kernel buffers. The easy way of making it probable that all publishings reach the server is to always call Connection.Close before terminating your publishing application. The way to ensure that all publishings reach the server is to add a listener to Channel.NotifyPublish and put the channel in confirm mode with Channel.Confirm. Publishing delivery tags and their corresponding confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1.
func (*Session) Qos ¶ added in v0.6.0
Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.
With a prefetch count greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with noAck because no acknowledgments are expected or sent.
With a prefetch size greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers. This option is ignored when consumers are started with noAck.
To get round-robin behavior between consumers consuming from the same queue on different connections, set the prefetch count to 1, and the next available message on the server will be delivered to the next available consumer.
If your consumer work time is reasonably consistent and not much greater than two times your network round trip time, you will see significant throughput improvements starting with a prefetch count of 2 or slightly greater as described by benchmarks on RabbitMQ.
http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
func (*Session) QueueBind ¶
func (s *Session) QueueBind(ctx context.Context, queueName string, routingKey string, exchange string, option ...QueueBindOptions) error
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
QueueBind("pagers", "alert", "log", false, nil) QueueBind("emails", "info", "log", false, nil) Delivery Exchange Key Queue ----------------------------------------------- key: alert --> log ----> alert --> pagers key: info ---> log ----> info ---> emails key: debug --> log (none) (dropped)
If a binding with the same key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained.
In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges.
QueueBind("pagers", "alert", "amq.topic", false, nil) QueueBind("emails", "info", "amq.topic", false, nil) QueueBind("emails", "#", "amq.topic", false, nil) // match everything Delivery Exchange Key Queue ----------------------------------------------- key: alert --> amq.topic ----> alert --> pagers key: info ---> amq.topic ----> # ------> emails \---> info ---/ key: debug --> amq.topic ----> # ------> emails
It is only possible to bind a durable queue to a durable exchange regardless of whether the queue or exchange is auto-deleted. Bindings between durable queues and exchanges will also be restored on server restart.
If the binding could not complete, an error will be returned and the channel will be closed.
func (*Session) QueueDeclare ¶
func (s *Session) QueueDeclare(ctx context.Context, name string, option ...QueueDeclareOptions) (Queue, error)
QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
Every queue declared gets a default binding to the empty exchange "" which has the type "direct" with the routing key matching the queue's name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil) Publish("", "alerts", false, false, Publishing{Body: []byte("...")}) Delivery Exchange Key Queue ----------------------------------------------- key: alerts -> "" -> alerts -> alerts
The queue name may be empty, in which case the server will generate a unique name which will be returned in the Name field of Queue struct.
When the error return value is not nil, you can assume the queue could not be declared with these parameters, and the channel will be closed.
func (*Session) QueueDeclarePassive ¶ added in v0.7.0
func (s *Session) QueueDeclarePassive(ctx context.Context, name string, option ...QueueDeclareOptions) (Queue, error)
QueueDeclarePassive is functionally and parametrically equivalent to QueueDeclare, except that it sets the "passive" attribute to true. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception. This function can be used to test for the existence of a queue.
func (*Session) QueueDelete ¶
func (s *Session) QueueDelete(ctx context.Context, name string, option ...QueueDeleteOptions) (purgedMsgs int, err error)
QueueDelete removes the queue from the server including all bindings then purges the messages based on server configuration, returning the number of messages purged.
func (*Session) QueuePurge ¶ added in v0.7.0
func (s *Session) QueuePurge(ctx context.Context, name string, options ...QueuePurgeOptions) (int, error)
QueuePurge removes all messages from the named queue which are not waiting to be acknowledged. Messages that have been delivered but have not yet been acknowledged will not be removed. When successful, returns the number of messages purged.
func (*Session) QueueUnbind ¶
func (s *Session) QueueUnbind(ctx context.Context, name string, routingKey string, exchange string, arg ...Table) error
QueueUnbind removes a binding between an exchange and queue matching the key and arguments. It is possible to send and empty string for the exchange name which means to unbind the queue from the default exchange.
func (*Session) Tx ¶ added in v0.8.0
Tx puts the channel into transaction mode on the server. All publishings and acknowledgments following this method will be atomically committed or rolled back for a single queue. Call either Channel.TxCommit or Channel.TxRollback to leave this transaction and immediately start a new transaction.
The atomicity across multiple queues is not defined as queue declarations and bindings are not included in the transaction.
The behavior of publishings that are delivered as mandatory or immediate while the channel is in a transaction is not defined.
Once a channel has been put into transaction mode, it cannot be taken out of transaction mode. Use a different channel for non-transactional semantics.
func (*Session) TxCommit ¶ added in v0.8.0
TxCommit atomically commits all publishings and acknowledgments for a single queue and immediately start a new transaction.
Calling this method without having called Channel.Tx is an error.
func (*Session) TxRollback ¶ added in v0.8.0
TxRollback atomically rolls back all publishings and acknowledgments for a single queue and immediately start a new transaction.
Calling this method without having called Channel.Tx is an error.
type SessionOption ¶
type SessionOption func(*sessionOption)
func SessionWithAutoCloseConnection ¶
func SessionWithAutoCloseConnection(autoClose bool) SessionOption
SessionWithAutoCloseConnection is important for transient sessions which, as they allow to create sessions that close their internal connections automatically upon closing themselves.
func SessionWithBufferCapacity ¶ added in v0.8.0
func SessionWithBufferCapacity(capacity int) SessionOption
SessionWithBufferSize allows to customize the size of th einternal channel buffers. all buffers/channels are initialized with this size. (e.g. error or confirm channels)
func SessionWithCached ¶
func SessionWithCached(cached bool) SessionOption
SessionWithCached makes a session a cached session This is only necessary for the session pool, as cached sessions are part of a pool and can be returned back to the pool without being closed.
func SessionWithConfirms ¶
func SessionWithConfirms(requiresPublishConfirms bool) SessionOption
SessionContext allows enable or explicitly disable message acknowledgements (acks)
func SessionWithConsumeContextRetryCallback ¶ added in v0.8.0
func SessionWithConsumeContextRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithConsumeContextRetryCallback allows to set a custom consume retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithConsumeRetryCallback ¶ added in v0.8.0
func SessionWithConsumeRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithConsumeRetryCallback allows to set a custom consume retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithContext ¶
func SessionWithContext(ctx context.Context) SessionOption
SessionWithContext allows to set a custom session context that might trigger a shutdown
func SessionWithExchangeBindRetryCallback ¶ added in v0.8.0
func SessionWithExchangeBindRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithExchangeBindRetryCallback allows to set a custom exchange bind retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithExchangeDeclarePassiveRetryCallback ¶ added in v0.8.0
func SessionWithExchangeDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithExchangeDeclarePassiveRetryCallback allows to set a custom exchange declare passive retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithExchangeDeclareRetryCallback ¶ added in v0.8.0
func SessionWithExchangeDeclareRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithExchangeDeclareRetryCallback allows to set a custom exchange declare retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithExchangeDeleteRetryCallback ¶ added in v0.8.0
func SessionWithExchangeDeleteRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithExchangeDeleteRetryCallback allows to set a custom exchange delete retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithExchangeUnbindRetryCallback ¶ added in v0.8.0
func SessionWithExchangeUnbindRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithExchangeUnbindRetryCallback allows to set a custom exchange unbind retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithFlowRetryCallback ¶ added in v0.8.0
func SessionWithFlowRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithFlowRetryCallback allows to set a custom flow retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithGetRetryCallback ¶ added in v0.8.0
func SessionWithGetRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithGetRetryCallback allows to set a custom get retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithLogger ¶
func SessionWithLogger(logger logging.Logger) SessionOption
SessionWithLogger allows to set a logger. By default no logger is set.
func SessionWithPublishRetryCallback ¶ added in v0.8.0
func SessionWithPublishRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithPublishRetryCallback allows to set a custom publish retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithQoSRetryCallback ¶ added in v0.8.0
func SessionWithQoSRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithQoSRetryCallback allows to set a custom qos retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithQueueBindRetryCallback ¶ added in v0.8.0
func SessionWithQueueBindRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithQueueBindRetryCallback allows to set a custom queue bind retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithQueueDeclarePassiveRetryCallback ¶ added in v0.8.0
func SessionWithQueueDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithQueueDeclarePassiveRetryCallback allows to set a custom queue declare passive retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithQueueDeclareRetryCallback ¶ added in v0.8.0
func SessionWithQueueDeclareRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithQueueDeclareRetryCallback allows to set a custom queue declare retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithQueueDeleteRetryCallback ¶ added in v0.8.0
func SessionWithQueueDeleteRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithQueueDeleteRetryCallback allows to set a custom queue delete retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithQueuePurgeRetryCallback ¶ added in v0.8.0
func SessionWithQueuePurgeRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithQueuePurgeRetryCallback allows to set a custom queue purge retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithQueueUnbindRetryCallback ¶ added in v0.8.0
func SessionWithQueueUnbindRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithQueueUnbindRetryCallback allows to set a custom queue unbind retry callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithRecoverCallback ¶ added in v0.8.0
func SessionWithRecoverCallback(callback SessionRetryCallback) SessionOption
SessionWithRecoverCallback allows to set a custom recover callback. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
func SessionWithRetryCallback ¶ added in v0.8.0
func SessionWithRetryCallback(callback SessionRetryCallback) SessionOption
SessionWithRetryCallback allows to set a custom retry callback for all operations. The callback should not interact with anything that may lead to any kind of errors. It should preferrably delegate its work to a separate goroutine.
type SessionPool ¶
type SessionPool struct { RecoverCallback SessionRetryCallback PublishRetryCallback SessionRetryCallback GetRetryCallback SessionRetryCallback ConsumeRetryCallback SessionRetryCallback ConsumeContextRetryCallback SessionRetryCallback ExchangeDeclareRetryCallback SessionRetryCallback ExchangeDeclarePassiveRetryCallback SessionRetryCallback ExchangeDeleteRetryCallback SessionRetryCallback QueueDeclareRetryCallback SessionRetryCallback QueueDeclarePassiveRetryCallback SessionRetryCallback QueueDeleteRetryCallback SessionRetryCallback QueueBindRetryCallback SessionRetryCallback QueueUnbindRetryCallback SessionRetryCallback QueuePurgeRetryCallback SessionRetryCallback ExchangeBindRetryCallback SessionRetryCallback ExchangeUnbindRetryCallback SessionRetryCallback QoSRetryCallback SessionRetryCallback FlowRetryCallback SessionRetryCallback // contains filtered or unexported fields }
func NewSessionPool ¶
func NewSessionPool(pool *ConnectionPool, numSessions int, options ...SessionPoolOption) (*SessionPool, error)
func (*SessionPool) Capacity ¶ added in v0.8.0
func (sp *SessionPool) Capacity() int
Capacity returns the size of the session pool which indicate t he number of available cached sessions.
func (*SessionPool) Close ¶
func (sp *SessionPool) Close()
Closes the session pool with all of its sessions
func (*SessionPool) GetSession ¶
func (sp *SessionPool) GetSession(ctx context.Context) (s *Session, err error)
GetSession gets a pooled session. blocks until a session is acquired from the pool.
func (*SessionPool) GetTransientSession ¶
func (sp *SessionPool) GetTransientSession(ctx context.Context) (s *Session, err error)
GetTransientSession returns a transient session. This method may return an error when the context has been closed before a session could be obtained. A transient session creates a transient connection under the hood.
func (*SessionPool) ReturnSession ¶
func (sp *SessionPool) ReturnSession(session *Session, err error)
ReturnSession returns a Session to the pool. If Session is not a cached channel, it is simply closed here.
func (*SessionPool) Size ¶
func (sp *SessionPool) Size() int
Size returns the number of available idle sessions in the pool.
type SessionPoolOption ¶
type SessionPoolOption func(*sessionPoolOption)
func SessionPoolWithAutoCloseConnectionPool ¶ added in v0.2.0
func SessionPoolWithAutoCloseConnectionPool(autoClose bool) SessionPoolOption
SessionPoolWithAutoCloseConnectionPool allows to close the internal connection pool automatically. This is helpful in case you have a session pool that is the onl yuser of the connection pool. You are basically passing ownership of the connection pool to the session pool with this.
func SessionPoolWithBufferCapacity ¶ added in v0.8.0
func SessionPoolWithBufferCapacity(capacity int) SessionPoolOption
SessionPoolWithBufferCapacity allows to configure the size of the confirmation, error & blocker buffers of all sessions
func SessionPoolWithConfirms ¶
func SessionPoolWithConfirms(requirePublishConfirms bool) SessionPoolOption
SessionPoolWithConfirms requires all messages from sessions to be acked.
func SessionPoolWithConsumeContextRetryCallback ¶ added in v0.8.0
func SessionPoolWithConsumeContextRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithConsumeContextRetryCallback allows to set a custom consume context retry callback for the session pool.
func SessionPoolWithConsumeRetryCallback ¶ added in v0.8.0
func SessionPoolWithConsumeRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithConsumeRetryCallback allows to set a custom consume retry callback for the session pool.
func SessionPoolWithExchangeBindRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeBindRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeBindRetryCallback allows to set a custom exchange bind retry callback for the session pool.
func SessionPoolWithExchangeDeclarePassiveRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeDeclarePassiveRetryCallback allows to set a custom exchange declare passive retry callback for the session pool.
func SessionPoolWithExchangeDeclareRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeDeclareRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeDeclareRetryCallback allows to set a custom exchange declare retry callback for the session pool.
func SessionPoolWithExchangeDeleteRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeDeleteRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeDeleteRetryCallback allows to set a custom exchange delete retry callback for the session pool.
func SessionPoolWithExchangeUnbindRetryCallback ¶ added in v0.8.0
func SessionPoolWithExchangeUnbindRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithExchangeUnbindRetryCallback allows to set a custom exchange unbind retry callback for the session pool.
func SessionPoolWithFlowRetryCallback ¶ added in v0.8.0
func SessionPoolWithFlowRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithFlowRetryCallback allows to set a custom flow retry callback for the session pool.
func SessionPoolWithGetRetryCallback ¶ added in v0.8.0
func SessionPoolWithGetRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithGetRetryCallback allows to set a custom get retry callback for the session pool.
func SessionPoolWithLogger ¶
func SessionPoolWithLogger(logger logging.Logger) SessionPoolOption
SessionPoolWithLogger allows to set a custom logger
func SessionPoolWithPublishRetryCallback ¶ added in v0.8.0
func SessionPoolWithPublishRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithPublishRetryCallback allows to set a custom publish retry callback for the session pool.
func SessionPoolWithQoSRetryCallback ¶ added in v0.8.0
func SessionPoolWithQoSRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithQoSRetryCallback allows to set a custom qos retry callback for the session pool.
func SessionPoolWithQueueBindRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueBindRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueBindRetryCallback allows to set a custom queue bind retry callback for the session pool.
func SessionPoolWithQueueDeclarePassiveRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueDeclarePassiveRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueDeclarePassiveRetryCallback allows to set a custom queue declare passive retry callback for the session pool.
func SessionPoolWithQueueDeclareRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueDeclareRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueDeclareRetryCallback allows to set a custom queue declare retry callback for the session pool.
func SessionPoolWithQueueDeleteRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueDeleteRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueDeleteRetryCallback allows to set a custom queue delete retry callback for the session pool.
func SessionPoolWithQueuePurgeRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueuePurgeRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithQueuePurgeRetryCallback allows to set a custom queue purge retry callback for the session pool.
func SessionPoolWithQueueUnbindRetryCallback ¶ added in v0.8.0
func SessionPoolWithQueueUnbindRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithQueueUnbindRetryCallback allows to set a custom queue unbind retry callback for the session pool.
func SessionPoolWithRecoverCallback ¶ added in v0.8.0
func SessionPoolWithRecoverCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithRecoverCallback allows to set a custom recover callback for the session pool.
func SessionPoolWithRetryCallback ¶ added in v0.8.0
func SessionPoolWithRetryCallback(callback SessionRetryCallback) SessionPoolOption
SessionPoolWithRetryCallback allows to set a custom retry callback for the session pool. This will set the same retry callback for all operations.
type SessionRetryCallback ¶ added in v0.8.0
RetryCallback is a function that is called when some operation fails.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(p *Pool, options ...SubscriberOption) *Subscriber
func (*Subscriber) Close ¶
func (s *Subscriber) Close()
func (*Subscriber) RegisterBatchHandler ¶ added in v0.6.0
func (s *Subscriber) RegisterBatchHandler(handler *BatchHandler)
RegisterBatchHandler registers a custom handler that MIGHT not be closed in case that the subscriber is closed. The passed batch handler may be derived from a different parent context.
func (*Subscriber) RegisterBatchHandlerFunc ¶ added in v0.6.0
func (s *Subscriber) RegisterBatchHandlerFunc(queue string, hf BatchHandlerFunc, options ...BatchHandlerOption) *BatchHandler
RegisterBatchHandlerFunc registers a function that is able to process up to `maxBatchSize` messages at the same time. The flushTimeout is the duration to wait before triggering the processing of the messages. In case your maxBatchSize is 50 and there are only 20 messages in a queue which you can fetch and then you'd have to wait indefinitly for those 20 messages to be processed, as it might take a long time for another message to arrive in the queue. This is where your flushTimeout comes into play. In order to wait at most for the period of flushTimeout until a new message arrives before processing the batch in your handler function.
func (*Subscriber) RegisterHandler ¶
func (s *Subscriber) RegisterHandler(handler *Handler)
func (*Subscriber) RegisterHandlerFunc ¶ added in v0.2.0
func (s *Subscriber) RegisterHandlerFunc(queue string, hf HandlerFunc, options ...ConsumeOptions) *Handler
RegisterHandlerFunc registers a consumer function that starts a consumer upon subscriber startup. The consumer is identified by a string that is unique and scoped for all consumers on this channel. An empty string will cause the library to generate a unique identity. The consumer identity will be included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details.
When exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ. It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.
When noWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed. Optional arguments can be provided that have specific semantics for the queue or server.
Inflight messages, limited by Channel.Qos will be buffered until received from the returned chan. When the Channel or Connection is closed, all buffered and inflight messages will be dropped. When the consumer identifier tag is cancelled, all inflight messages will be delivered until the returned chan is closed.
func (*Subscriber) Start ¶
func (s *Subscriber) Start(ctx context.Context) (err error)
Start starts the consumers for all registered handler functions This method is not blocking. Use Wait() to wait for all routines to shut down via context cancelation (e.g. via a signal)
func (*Subscriber) Wait ¶
func (s *Subscriber) Wait()
Wait waits until all consumers have been closed. The provided context must have been closed in order for Wait to unlock after all consumer goroutines of the subscriber have been closed.
type SubscriberOption ¶
type SubscriberOption func(*subscriberOption)
func SubscriberWithAutoClosePool ¶
func SubscriberWithAutoClosePool(autoClose bool) SubscriberOption
func SubscriberWithContext ¶
func SubscriberWithContext(ctx context.Context) SubscriberOption
func SubscriberWithLogger ¶ added in v0.2.0
func SubscriberWithLogger(logger logging.Logger) SubscriberOption
type Table ¶
type Table amqp091.Table
Table is a dynamic map of arguments that may be passed additionally to functions. type Table map[string]interface{}
Table stores user supplied fields of the following types:
bool byte int8 float32 float64 int int16 int32 int64 nil string time.Time amqp.Decimal amqp.Table []byte []interface{} - containing above types
Functions taking a table will immediately fail when the table contains a value of an unsupported type.
The caller must be specific in which precision of integer it wishes to encode.
Use a type assertion when reading values from a table for type conversion.
RabbitMQ expects int32 for integer values.
func (Table) DeliveryCount ¶ added in v0.9.0
Returns the number of deliveries of the message.
func (Table) WithDeadLetterExchange ¶ added in v0.9.0
Rejected messages will be routed to the dead-letter exchange. Which in turn routes them to some specified queue.
func (Table) WithDeadLetterExchangeAndRoutingKey ¶ added in v0.9.0
Rejected messages will be routed to the dead-letter exchange. Which in turn routes them to using the specified routing key.
func (Table) WithDeliveryLimit ¶ added in v0.9.0
This option is interesting for quorum queues. It specifies the number f redeliveries that still presereve the order of the messages. After that the order is not quaranteed anymore.
type Topologer ¶ added in v0.2.0
type Topologer struct {
// contains filtered or unexported fields
}
func NewTopologer ¶ added in v0.2.0
func NewTopologer(p *Pool, options ...TopologerOption) *Topologer
func (*Topologer) ExchangeBind ¶ added in v0.2.0
func (t *Topologer) ExchangeBind(ctx context.Context, destination string, routingKey string, source string, option ...ExchangeBindOptions) (err error)
ExchangeBind binds an exchange to another exchange to create inter-exchange routing topologies on the server. This can decouple the private topology and routing exchanges from exchanges intended solely for publishing endpoints.
Binding two exchanges with identical arguments will not create duplicate bindings.
Binding one exchange to another with multiple bindings will only deliver a message once. For example if you bind your exchange to `amq.fanout` with two different binding keys, only a single message will be delivered to your exchange even though multiple bindings will match.
Given a message delivered to the source exchange, the message will be forwarded to the destination exchange when the routing key is matched.
ExchangeBind("sell", "MSFT", "trade", false, nil) ExchangeBind("buy", "AAPL", "trade", false, nil) Delivery Source Key Destination example exchange exchange ----------------------------------------------- key: AAPL --> trade ----> MSFT sell \---> AAPL --> buy
func (*Topologer) ExchangeDeclare ¶ added in v0.2.0
func (t *Topologer) ExchangeDeclare(ctx context.Context, name string, kind ExchangeKind, option ...ExchangeDeclareOptions) (err error)
ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags.
Errors returned from this method will close the session. Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consist of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon.
Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers".
func (*Topologer) ExchangeDeclarePassive ¶ added in v0.7.0
func (t *Topologer) ExchangeDeclarePassive(ctx context.Context, name string, kind ExchangeKind, option ...ExchangeDeclareOptions) (err error)
ExchangeDeclarePassive is functionally and parametrically equivalent to ExchangeDeclare, except that it sets the "passive" attribute to true. A passive exchange is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent exchange will cause RabbitMQ to throw an exception. This function can be used to detect the existence of an exchange.
func (*Topologer) ExchangeDelete ¶ added in v0.2.0
func (t *Topologer) ExchangeDelete(ctx context.Context, name string, option ...ExchangeDeleteOptions) (err error)
ExchangeDelete removes the named exchange from the server. When an exchange is deleted all queue bindings on the exchange are also deleted. If this exchange does not exist, the channel will be closed with an error.
func (*Topologer) ExchangeUnbind ¶ added in v0.2.0
func (t *Topologer) ExchangeUnbind(ctx context.Context, destination string, routingKey string, source string, option ...ExchangeUnbindOptions) (err error)
ExchangeUnbind unbinds the destination exchange from the source exchange on the server by removing the routing key between them. This is the inverse of ExchangeBind. If the binding does not currently exist, an error will be returned.
func (*Topologer) QueueBind ¶ added in v0.2.0
func (t *Topologer) QueueBind(ctx context.Context, name string, routingKey string, exchange string, option ...QueueBindOptions) (err error)
QueueBind binds an exchange to a queue so that publishings to the exchange will be routed to the queue when the publishing routing key matches the binding routing key.
QueueBind("pagers", "alert", "log", false, nil) QueueBind("emails", "info", "log", false, nil) Delivery Exchange Key Queue ----------------------------------------------- key: alert --> log ----> alert --> pagers key: info ---> log ----> info ---> emails key: debug --> log (none) (dropped)
If a binding with the same key and arguments already exists between the exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained.
In the case that multiple bindings may cause the message to be routed to the same queue, the server will only route the publishing once. This is possible with topic exchanges.
QueueBind("pagers", "alert", "amq.topic", false, nil) QueueBind("emails", "info", "amq.topic", false, nil) QueueBind("emails", "#", "amq.topic", false, nil) // match everything Delivery Exchange Key Queue ----------------------------------------------- key: alert --> amq.topic ----> alert --> pagers key: info ---> amq.topic ----> # ------> emails \---> info ---/ key: debug --> amq.topic ----> # ------> emails
func (*Topologer) QueueDeclare ¶ added in v0.2.0
func (t *Topologer) QueueDeclare(ctx context.Context, name string, option ...QueueDeclareOptions) (queue Queue, err error)
QueueDeclare declares a queue to hold messages and deliver to consumers. Declaring creates a queue if it doesn't already exist, or ensures that an existing queue matches the same parameters.
Every queue declared gets a default binding to the empty exchange "" which has the type "direct" with the routing key matching the queue's name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil) Publish("", "alerts", false, false, Publishing{Body: []byte("...")}) Delivery Exchange Key Queue ----------------------------------------------- key: alerts -> "" -> alerts -> alerts
The queue name may be empty, in which case the server will generate a unique name which will be returned in the Name field of Queue struct.
func (*Topologer) QueueDeclarePassive ¶ added in v0.7.0
func (t *Topologer) QueueDeclarePassive(ctx context.Context, name string, option ...QueueDeclareOptions) (queue Queue, err error)
QueueDeclarePassive is functionally and parametrically equivalent to QueueDeclare, except that it sets the "passive" attribute to true. A passive queue is assumed by RabbitMQ to already exist, and attempting to connect to a non-existent queue will cause RabbitMQ to throw an exception. This function can be used to test for the existence of a queue.
func (*Topologer) QueueDelete ¶ added in v0.2.0
func (t *Topologer) QueueDelete(ctx context.Context, name string, option ...QueueDeleteOptions) (purged int, err error)
QueueDelete removes the queue from the server including all bindings then purges the messages based on server configuration, returning the number of messages purged.
func (*Topologer) QueuePurge ¶ added in v0.7.0
func (t *Topologer) QueuePurge(ctx context.Context, name string, options ...QueuePurgeOptions) (int, error)
QueuePurge removes all messages from the named queue which are not waiting to be acknowledged. Messages that have been delivered but have not yet been acknowledged will not be removed. When successful, returns the number of messages purged.
type TopologerOption ¶ added in v0.2.0
type TopologerOption func(*topologerOption)
func TopologerWithContext ¶ added in v0.7.0
func TopologerWithContext(ctx context.Context) TopologerOption
func TopologerWithLogger ¶ added in v0.2.0
func TopologerWithLogger(logger logging.Logger) TopologerOption
func TopologerWithTransientSessions ¶ added in v0.7.0
func TopologerWithTransientSessions(transientOnly bool) TopologerOption
Source Files ¶
- callback_types.go
- connection.go
- connection_options.go
- connection_pool.go
- connection_pool_options.go
- delivery.go
- dial.go
- errors.go
- exchange.go
- helpers_context.go
- pool.go
- pool_options.go
- publisher.go
- publisher_option.go
- queue.go
- session.go
- session_options.go
- session_pool.go
- session_pool_options.go
- subscriber.go
- subscriber_batch_handler.go
- subscriber_handler.go
- subscriber_handler_options.go
- subscriber_options.go
- table.go
- topologer.go
- topologer_options.go
- utils.go