Documentation ¶
Index ¶
- Constants
- type Consumer
- func (c *Consumer) Ack(tag uint64, multiple bool) error
- func (c *Consumer) Cancel() error
- func (c *Consumer) Channel() (*amqp.Channel, error)
- func (c *Consumer) Close() error
- func (c *Consumer) Consume(queue string, exclusive bool) (<-chan amqp.Delivery, error)
- func (c *Consumer) Disconnect() error
- func (c *Consumer) ExchangeDeclare(name, kind string) error
- func (c *Consumer) IsChannelOrConnectionClosedError(err error) bool
- func (c *Consumer) IsExclusiveUseError(queue string, err error) bool
- func (c *Consumer) Nack(tag uint64, multiple bool, requeue bool) error
- func (c *Consumer) Qos(prefetchCount int, global bool) error
- func (c *Consumer) QueueBind(queue, exchange, key string) error
- func (c *Consumer) QueueDeclare(name string) error
- type Pool
- func NewPool(addr, user, host, vhost, tagPrefix, exchange, queue, key string, ...) (*Pool, error)
- func NewPoolWithConfig(config *rabbitmq.Config, ...) (*Pool, error)
- func NewPoolWithDefault(addr, user, host, vhost, tagPrefix, exchange, queue, key string) (*Pool, error)
- func NewPoolWithPoolConfig(config *rabbitmq.PoolConfig) (*Pool, error)
- type PoolConsumer
Constants ¶
const ( ErrQueueExclusiveUseCode = amqp.AccessRefused ErrQueueExclusiveUseReasonTemplate = `ACCESS_REFUSED - queue '%s' in vhost '%s' in exclusive use` ErrChannelOrConnectionClosedCode = amqp.ChannelError ErrChannelOrConnectionClosedReason = `channel/connection is not open` )
const ( DefaultMaxConnections = 20 DefaultInitConnections = 5 DefaultMaxIdleConnections = 10 DefaultMaxIdleTime = 1800 // seconds DefaultMaxWaitTime = 1 // seconds DefaultMaxRetryCount = -1 DefaultKeepAliveInterval = 300 // seconds DefaultKeepAliveChunkSize = 5 DefaultSleepTime = 1 // seconds DefaultFreeChanLengthTimes = 2 DefaultUnlimitedWaitTime = -1 // seconds DefaultUnlimitedRetryCount = -1 DefaultDelayTime = 5 // milliseconds )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
func NewConsumer ¶
NewConsumer returns a new *Consumer
func NewConsumerWithConfig ¶
NewConsumerWithConfig returns a new *Consumer with given config
func (*Consumer) Channel ¶
Channel returns the amqp channel, if the channel of the consumer is nil or had been closed, a new channel will be opened, otherwise the existing channel will be returned
func (*Consumer) Disconnect ¶
Disconnect disconnects the rabbitmq server
func (*Consumer) ExchangeDeclare ¶
ExchangeDeclare declares an exchange
func (*Consumer) IsChannelOrConnectionClosedError ¶
IsNotFoundQueueError returns true if the error is channel or connection closed error
func (*Consumer) IsExclusiveUseError ¶
IsExclusiveUseError returns true if the error is exclusive use error
func (*Consumer) Qos ¶
Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks.
func (*Consumer) QueueDeclare ¶
QueueDeclare declares a queue
type Pool ¶
type Pool struct { sync.Mutex *rabbitmq.PoolConfig // contains filtered or unexported fields }
func NewPool ¶
func NewPool(addr, user, host, vhost, tagPrefix, exchange, queue, key string, maxConnections, initConnections, maxIdleConnections, maxIdleTime, maxWaitTime, maxRetryCount, keepAliveInterval int) (*Pool, error)
NewPool returns a new *Pool
func NewPoolWithConfig ¶
func NewPoolWithConfig(config *rabbitmq.Config, maxConnections, initConnections, maxIdleConnections, maxIdleTime, maxWaitTime, maxRetryCount, keepAliveInterval int) (*Pool, error)
NewPoolWithConfig returns a new *Pool with a Config object
func NewPoolWithDefault ¶
func NewPoolWithDefault(addr, user, host, vhost, tagPrefix, exchange, queue, key string) (*Pool, error)
NewPoolWithDefault returns a new *Pool with default configuration
func NewPoolWithPoolConfig ¶
func NewPoolWithPoolConfig(config *rabbitmq.PoolConfig) (*Pool, error)
NewPoolWithPoolConfig returns a new *Pool with a PoolConfig object
func (*Pool) Get ¶
func (p *Pool) Get() (*PoolConsumer, error)
Get is an exported alias of get() function with routine safe
func (*Pool) GetFullTag ¶ added in v0.3.19
GetFullTag gets the full tag
func (*Pool) UsedConnections ¶
UsedConnections returns used connection number
type PoolConsumer ¶
func NewPoolConsumer ¶
func NewPoolConsumer(pool *Pool) (*PoolConsumer, error)
NewPoolConsumer returns a new *PoolConsumer
func (*PoolConsumer) Close ¶
func (pp *PoolConsumer) Close() error
Close closes the channel and returns the Consumer back to the pool
func (*PoolConsumer) Disconnect ¶
func (pp *PoolConsumer) Disconnect() error
Disconnect disconnects from rabbitmq, normally when using connection pool, there is no need to disconnect manually, consider to use Close() instead.
func (*PoolConsumer) GetTag ¶ added in v0.3.19
func (pp *PoolConsumer) GetTag() string
GetTag returns the tag of the pool consumer, should use this method to get the tag instead of using the tag property of the pool config, because the tag of the pool config is the prefix of the tag of the pool consumer
func (*PoolConsumer) IsValid ¶
func (pp *PoolConsumer) IsValid() bool
IsValid validates if connection is valid