producer

package
v0.3.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxConnections      = 20
	DefaultInitConnections     = 5
	DefaultMaxIdleConnections  = 10
	DefaultMaxIdleTime         = 1800 // seconds
	DefaultMaxWaitTime         = 1    // seconds
	DefaultMaxRetryCount       = -1
	DefaultKeepAliveInterval   = 300 // seconds
	DefaultKeepAliveChunkSize  = 5
	DefaultSleepTime           = 1 // seconds
	DefaultFreeChanLengthTimes = 2

	DefaultUnlimitedWaitTime   = -1 // seconds
	DefaultUnlimitedRetryCount = -1
	DefaultDelayTime           = 5 // milliseconds
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Pool

type Pool struct {
	sync.Mutex
	*rabbitmq.PoolConfig
	// contains filtered or unexported fields
}

func NewPool

func NewPool(addr, user, host, vhost, tag, exchange, queue, key string,
	maxConnections, initConnections, maxIdleConnections, maxIdleTime, maxWaitTime, maxRetryCount, keepAliveInterval int) (*Pool, error)

NewPool returns a new *Pool

func NewPoolWithConfig

func NewPoolWithConfig(config *rabbitmq.Config, maxConnections, initConnections,
	maxIdleConnections, maxIdleTime, maxWaitTime, maxRetryCount, keepAliveInterval int) (*Pool, error)

NewPoolWithConfig returns a new *Pool with a Config object

func NewPoolWithDefault

func NewPoolWithDefault(addr, user, host, vhost, tag, exchange, queue, key string) (*Pool, error)

NewPoolWithDefault returns a new *Pool with default configuration

func NewPoolWithPoolConfig

func NewPoolWithPoolConfig(config *rabbitmq.PoolConfig) (*Pool, error)

NewPoolWithPoolConfig returns a new *Pool with a PoolConfig object

func (*Pool) Close

func (p *Pool) Close() error

Close releases each connection in the pool

func (*Pool) Get

func (p *Pool) Get() (*PoolProducer, error)

Get is an exported, goroutine-safe alias of the get() function

func (*Pool) GetFullTag added in v0.3.19

func (p *Pool) GetFullTag() string

GetFullTag gets the full tag

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed reports whether the pool has been closed

func (*Pool) Release

func (p *Pool) Release(num int) error

Release is an exported alias of the release() function

func (*Pool) Supply

func (p *Pool) Supply(num int) error

Supply is an exported, goroutine-safe alias of the supply() function

func (*Pool) UsedConnections

func (p *Pool) UsedConnections() int

UsedConnections returns the number of connections currently in use

type PoolProducer

type PoolProducer struct {
	*Producer
	Pool *Pool
}

func NewPoolProducer

func NewPoolProducer(pool *Pool) (*PoolProducer, error)

NewPoolProducer returns a new *PoolProducer

func (*PoolProducer) Close

func (pp *PoolProducer) Close() error

Close closes the channel and returns the producer back to the pool

func (*PoolProducer) Disconnect

func (pp *PoolProducer) Disconnect() error

Disconnect disconnects from RabbitMQ. When using a connection pool there is normally no need to disconnect manually; consider using Close() instead.

func (*PoolProducer) GetTag added in v0.3.19

func (pp *PoolProducer) GetTag() string

GetTag returns the tag of the pool producer. Use this method instead of reading the tag property of the pool config, because the pool config's tag is only the prefix of the pool producer's tag.

func (*PoolProducer) IsValid

func (pp *PoolProducer) IsValid() bool

IsValid validates if connection is valid

type Producer

type Producer struct {
	Conn  *client.Conn
	Chan  *amqp.Channel
	Queue amqp.Queue
}

func NewProducer

func NewProducer(addr, user, pass, vhost, tag, exchange, queue, key string) (*Producer, error)

NewProducer returns a new *Producer

func NewProducerWithConfig

func NewProducerWithConfig(config *rabbitmq.Config) (*Producer, error)

NewProducerWithConfig returns a new *Producer with given config

func (*Producer) BuildMessage

func (p *Producer) BuildMessage(contentType, message string) amqp.Publishing

BuildMessage builds an amqp.Publishing with given content type and message

func (*Producer) BuildMessageWithExpiration

func (p *Producer) BuildMessageWithExpiration(contentType, message string, expiration int) amqp.Publishing

BuildMessageWithExpiration builds an amqp.Publishing with the given content type, message, and expiration

func (*Producer) Channel

func (p *Producer) Channel() (*amqp.Channel, error)

Channel returns the channel; if the channel of the producer is nil or has been closed, a new channel will be opened

func (*Producer) Close

func (p *Producer) Close() error

Close closes the channel

func (*Producer) Disconnect

func (p *Producer) Disconnect() error

Disconnect disconnects from the RabbitMQ server

func (*Producer) ExchangeDeclare

func (p *Producer) ExchangeDeclare(kind string) error

ExchangeDeclare declares an exchange

func (*Producer) Publish

func (p *Producer) Publish(msg amqp.Publishing) error

Publish publishes a message to an exchange

func (*Producer) PublishJSON

func (p *Producer) PublishJSON(message string) error

PublishJSON publishes a JSON message to an exchange

func (*Producer) PublishWithContext

func (p *Producer) PublishWithContext(ctx context.Context, msg amqp.Publishing) error

PublishWithContext publishes a message to an exchange with context

func (*Producer) QueueBind

func (p *Producer) QueueBind() error

QueueBind binds a queue to an exchange

func (*Producer) QueueDeclare

func (p *Producer) QueueDeclare() error

QueueDeclare declares a queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL