Documentation ¶
Index ¶
- Variables
- type ChannelHost
- type Config
- type ConnectionHost
- type ConnectionPool
- func (cp *ConnectionPool) GetChannel() (*ChannelHost, error)
- func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
- func (cp *ConnectionPool) GetTransientChannel(ackable bool) (*amqp.Channel, error)
- func (cp *ConnectionPool) Push(exchange, routingKey string, data amqp.Publishing) error
- func (cp *ConnectionPool) PushWithRetry(exchange, routingKey string, data amqp.Publishing) error
- func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)
- func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost)
- type Session
- func (session *Session) Ack(tag uint64) error
- func (session *Session) Close() error
- func (session *Session) Nack(tag uint64) error
- func (session *Session) Push(routingKey string, data amqp.Publishing) error
- func (session *Session) Stream(consumergroup string, autoAck bool, exclusive bool) (chan amqp.Delivery, error)
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotConnected is returned when not connected to a server
	ErrNotConnected = errors.New("not connected to a server")
	// ErrAlreadyClosed is returned when the connection is already closed
	ErrAlreadyClosed = errors.New("already closed: not connected to the server")
	// ErrShutdown is returned when already shutting down
	ErrShutdown = errors.New("session is shutting down")
	// ErrNack is returned when a message publish fails with a NACK
	ErrNack = errors.New("message was not sent")
	// ErrServerBusy is returned when there is too much tcp backpressure on a channel
	ErrServerBusy = errors.New("server busy; message was not sent")
	// ErrPublishOnly is returned when a channel is publish only and you try and use a queue function
	ErrPublishOnly = errors.New("channel is publish only")
	// ErrTimedOut is returned when a message times out waiting for confirmation
	ErrTimedOut = errors.New("confirmation timed out")
)
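Because these are sentinel errors, callers can compare against them with errors.Is, even when the error has been wrapped. The following is a minimal sketch, not the package's own logic: the three variables are redefined locally so the example is self-contained (real code would reference the exported variables from this package), and shouldRetry is a hypothetical policy chosen only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors. In real code, use the
// exported variables from the package instead of redefining them.
var (
	ErrServerBusy = errors.New("server busy; message was not sent")
	ErrTimedOut   = errors.New("confirmation timed out")
	ErrShutdown   = errors.New("session is shutting down")
)

// shouldRetry is a hypothetical policy: retry on backpressure or a
// confirmation timeout, and give up on anything else.
func shouldRetry(err error) bool {
	return errors.Is(err, ErrServerBusy) || errors.Is(err, ErrTimedOut)
}

func main() {
	// errors.Is sees through wrapping added with %w.
	wrapped := fmt.Errorf("publish failed: %w", ErrServerBusy)

	fmt.Println(shouldRetry(wrapped))
	fmt.Println(shouldRetry(ErrShutdown))
}
```

Comparing with errors.Is rather than == keeps the check working if a caller higher up the stack adds context to the error.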
Functions ¶
This section is empty.
Types ¶
type ChannelHost ¶ added in v10.0.28
type ChannelHost struct {
	Channel       *amqp.Channel
	Confirmations chan amqp.Confirmation
	Errors        chan *amqp.Error
	Backpressure  chan bool
	// contains filtered or unexported fields
}
ChannelHost is an internal representation of amqp.Channel.
func NewChannelHost ¶ added in v10.0.28
func NewChannelHost(logger log.Logger, connHost *ConnectionHost, ackable bool) (*ChannelHost, error)
NewChannelHost creates a simple ChannelHost wrapper for management by the end-user developer.
func (*ChannelHost) Close ¶ added in v10.0.28
func (ch *ChannelHost) Close()
Close manually closes the amqp.Channel held internally.
func (*ChannelHost) MakeChannel ¶ added in v10.0.28
func (ch *ChannelHost) MakeChannel() (err error)
MakeChannel tries to create (or re-create) the channel from the ConnectionHost it's attached to.
type Config ¶
type Config struct {
	Name                    string
	ID                      string
	Exchange                string
	ConsumerConnectionPool  *ConnectionPool
	PublisherConnectionPool *ConnectionPool
	AutoAck                 bool
	DurableQueue            bool
	DeleteUnused            bool
	Exclusive               bool
	Args                    amqp.Table
	Qos                     int
	PublishOnly             bool
	RoutingKeys             []string
	Context                 context.Context
}
Config holds the configuration for a Session.
type ConnectionHost ¶ added in v10.0.28
type ConnectionHost struct {
	Connection *amqp.Connection
	Errors     chan *amqp.Error
	// contains filtered or unexported fields
}
ConnectionHost is an internal representation of amqp.Connection.
func NewConnectionHost ¶ added in v10.0.28
func NewConnectionHost(logger log.Logger, uri string, connectionName string) (*ConnectionHost, error)
NewConnectionHost creates a simple ConnectionHost wrapper for management by the end-user developer.
func (*ConnectionHost) CanTakeMoreChannels ¶ added in v10.0.28
func (ch *ConnectionHost) CanTakeMoreChannels(maxchannels int) bool
CanTakeMoreChannels is a helper method that reports whether a connection can take another channel.
func (*ConnectionHost) Connect ¶ added in v10.0.28
func (ch *ConnectionHost) Connect() error
Connect makes a single attempt to connect (or reconnect) using the host's configured properties.
func (*ConnectionHost) IncrChannels ¶ added in v10.0.28
func (ch *ConnectionHost) IncrChannels(i int)
IncrChannels increments the channel count by i (use -1 to decrement).
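The interplay between CanTakeMoreChannels and IncrChannels can be pictured with a small counter. This is only a sketch under assumptions: hostCounter is a hypothetical stand-in for the unexported bookkeeping inside ConnectionHost, and both the atomic counter and the strict less-than comparison are guesses rather than the package's confirmed behaviour.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// hostCounter is a hypothetical stand-in for the unexported channel
// bookkeeping inside ConnectionHost.
type hostCounter struct {
	channels int64
}

// IncrChannels adjusts the channel count by i; pass -1 to decrement.
func (h *hostCounter) IncrChannels(i int) {
	atomic.AddInt64(&h.channels, int64(i))
}

// CanTakeMoreChannels reports whether the count is still below maxchannels.
// The strict comparison is an assumption made for this sketch.
func (h *hostCounter) CanTakeMoreChannels(maxchannels int) bool {
	return atomic.LoadInt64(&h.channels) < int64(maxchannels)
}

func main() {
	h := &hostCounter{}
	h.IncrChannels(1)
	h.IncrChannels(1)
	fmt.Println(h.CanTakeMoreChannels(2)) // at the limit of 2

	h.IncrChannels(-1) // one channel was closed
	fmt.Println(h.CanTakeMoreChannels(2))
}
```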
func (*ConnectionHost) VerifyConnection ¶ added in v10.0.28
func (ch *ConnectionHost) VerifyConnection()
VerifyConnection makes sure a connection is OK, or tries to reconnect it.
type ConnectionPool ¶ added in v10.0.28
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool houses the pool of RabbitMQ connections.
func NewConnectionPool ¶ added in v10.0.28
func NewConnectionPool(logger log.Logger, baseConnectionName, uri string, connections, channelsPerConnection int) (*ConnectionPool, error)
NewConnectionPool creates the hosting structure for the ConnectionPool.
func (*ConnectionPool) GetChannel ¶ added in v10.0.28
func (cp *ConnectionPool) GetChannel() (*ChannelHost, error)
GetChannel gets an ackable channel from the pool if one exists, or creates a channel.
func (*ConnectionPool) GetConnection ¶ added in v10.0.28
func (cp *ConnectionPool) GetConnection() (*ConnectionHost, error)
GetConnection gets a connection based on what's in the ConnectionPool, or creates one to avoid blocking.
func (*ConnectionPool) GetTransientChannel ¶ added in v10.0.28
func (cp *ConnectionPool) GetTransientChannel(ackable bool) (*amqp.Channel, error)
GetTransientChannel allows you to create an unmanaged amqp.Channel with the help of the ConnectionPool.
func (*ConnectionPool) Push ¶ added in v10.0.28
func (cp *ConnectionPool) Push(exchange, routingKey string, data amqp.Publishing) error
Push sends data using the publisher channel pool
func (*ConnectionPool) PushWithRetry ¶ added in v10.0.28
func (cp *ConnectionPool) PushWithRetry(exchange, routingKey string, data amqp.Publishing) error
PushWithRetry sends data using the publisher channel pool, retrying when a publish fails.
func (*ConnectionPool) ReturnChannel ¶ added in v10.0.28
func (cp *ConnectionPool) ReturnChannel(chanHost *ChannelHost, erred bool)
ReturnChannel returns a channel to the pool. The erred flag indicates whether an error occurred on the channel while it was in use.
func (*ConnectionPool) ReturnConnection ¶ added in v10.0.28
func (cp *ConnectionPool) ReturnConnection(connHost *ConnectionHost)
ReturnConnection puts the connection back in the queue
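The get/return cycle described for GetConnection and ReturnConnection resembles a queue-backed pool. The sketch below shows one common way to build such a pool with a buffered Go channel; it is not the package's implementation. The types conn and pool and the methods get and put are hypothetical names, and creating a fresh connection when the queue is empty is an assumption based on the "creates one to avoid blocking" wording.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a hypothetical placeholder for a pooled connection.
type conn struct{ name string }

// pool keeps idle connections in a buffered channel used as a queue.
type pool struct {
	idle chan *conn
	next int64 // number used to name the next connection created
}

func newPool(size int) *pool {
	p := &pool{idle: make(chan *conn, size), next: int64(size)}
	for i := 0; i < size; i++ {
		p.idle <- &conn{name: fmt.Sprintf("conn-%d", i)}
	}
	return p
}

// get takes an idle connection, or creates a new one instead of blocking.
func (p *pool) get() *conn {
	select {
	case c := <-p.idle:
		return c
	default:
		n := atomic.AddInt64(&p.next, 1) - 1
		return &conn{name: fmt.Sprintf("conn-%d", n)}
	}
}

// put places the connection back in the queue, dropping it if the queue is full.
func (p *pool) put(c *conn) {
	select {
	case p.idle <- c:
	default:
	}
}

func main() {
	p := newPool(1)
	a := p.get() // taken from the queue
	b := p.get() // queue is empty, so a new connection is created
	fmt.Println(a.name, b.name)

	p.put(a)
	p.put(b) // queue already full, so b is dropped
	fmt.Println(p.get().name)
}
```

A real pool would also close dropped connections and check connection health before handing one out; both are omitted here for brevity.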
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session is the RabbitMQ session.
func New ¶
New creates a new consumer state instance, and automatically attempts to connect to the server.
func (*Session) Push ¶
func (session *Session) Push(routingKey string, data amqp.Publishing) error
Push publishes data to the channel.
func (*Session) Stream ¶
func (session *Session) Stream(consumergroup string, autoAck bool, exclusive bool) (chan amqp.Delivery, error)
Stream continuously puts queue items on the channel. You must call delivery.Ack once an item has been processed successfully, or delivery.Nack if processing fails. Otherwise, unacknowledged messages build up on the server.
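The acknowledgement rule can be sketched as a consumer loop. To stay self-contained, this example does not import the package or an AMQP client: fakeDelivery is a hypothetical stand-in whose Ack and Nack methods mirror the signatures on amqp.Delivery, and the stream is a plain channel rather than the one returned by Stream. Requeueing on failure is a choice made for the sketch, not a requirement stated above.

```go
package main

import (
	"errors"
	"fmt"
)

// acker mirrors the Ack and Nack method signatures of amqp.Delivery.
type acker interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// fakeDelivery is a hypothetical stand-in for amqp.Delivery that records
// how it was settled.
type fakeDelivery struct {
	body          string
	acked, nacked bool
}

func (d *fakeDelivery) Ack(multiple bool) error           { d.acked = true; return nil }
func (d *fakeDelivery) Nack(multiple, requeue bool) error { d.nacked = true; return nil }

var errCannotProcess = errors.New("cannot process")

// settle acks the delivery when handling succeeded and nacks it (with
// requeue) when handling failed, so nothing is left unacknowledged.
func settle(d acker, handleErr error) error {
	if handleErr != nil {
		return d.Nack(false, true)
	}
	return d.Ack(false)
}

// consume drains the stream and settles every delivery exactly once.
func consume(stream <-chan *fakeDelivery, handle func(body string) error) {
	for d := range stream {
		if err := settle(d, handle(d.body)); err != nil {
			fmt.Println("settle failed:", err)
		}
	}
}

func main() {
	ok, bad := &fakeDelivery{body: "ok"}, &fakeDelivery{body: "bad"}
	stream := make(chan *fakeDelivery, 2)
	stream <- ok
	stream <- bad
	close(stream)

	consume(stream, func(body string) error {
		if body == "bad" {
			return errCannotProcess
		}
		return nil
	})
	fmt.Println("ok acked:", ok.acked, "bad nacked:", bad.nacked)
}
```

Routing every delivery through a single settle step makes it harder to forget an acknowledgement on an early return.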