Versions in this module

v1
v1.12.1 Dec 19, 2024
v1.12.0 Dec 19, 2024

Changes in this version
+ const DefaultURL
+ var ErrClosed = amqp.ErrClosed
+ type Connection struct
+ func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)
+ func (c *Connection) CheckConnected() bool
+ func (c *Connection) Close()
+ type ConnectionOption func(*connectionOptions)
+ func WithLogger(zapLog *zap.Logger) ConnectionOption
+ func WithReconnectTime(d time.Duration) ConnectionOption
+ func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption
+ type ConsumeOption func(*consumeOptions)
+ func WithConsumeArgs(args map[string]interface{}) ConsumeOption
+ func WithConsumeConsumer(consumer string) ConsumeOption
+ func WithConsumeExclusive(enable bool) ConsumeOption
+ func WithConsumeNoLocal(enable bool) ConsumeOption
+ func WithConsumeNoWait(enable bool) ConsumeOption
+ type Consumer struct
+ Exchange *Exchange
+ QueueName string
+ func NewConsumer(exchange *Exchange, queueName string, connection *Connection, ...) (*Consumer, error)
+ func (c *Consumer) Close()
+ func (c *Consumer) Consume(ctx context.Context, handler Handler)
+ func (c *Consumer) Count() int64
+ type ConsumerOption func(*consumerOptions)
+ func WithConsumerAutoAck(enable bool) ConsumerOption
+ func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption
+ func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption
+ func WithConsumerPersistent(enable bool) ConsumerOption
+ func WithConsumerQosOptions(opts ...QosOption) ConsumerOption
+ func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption
+ func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption
+ type DeadLetterOption func(*deadLetterOptions)
+ func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption
+ func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption
+ func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption
+ func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption
+ type DelayedMessagePublishOption func(*delayedMessagePublishOptions)
+ func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption
+ func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption
+ type Exchange struct
+ func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange
+ func NewDirectExchange(exchangeName string, routingKey string) *Exchange
+ func NewFanoutExchange(exchangeName string) *Exchange
+ func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange
+ func NewTopicExchange(exchangeName string, routingKey string) *Exchange
+ func (e *Exchange) DelayedMessageType() string
+ func (e *Exchange) HeadersKeys() map[string]interface{}
+ func (e *Exchange) Name() string
+ func (e *Exchange) RoutingKey() string
+ func (e *Exchange) Type() string
+ type ExchangeDeclareOption func(*exchangeDeclareOptions)
+ func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption
+ func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption
+ func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption
+ func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption
+ type Handler func(ctx context.Context, data []byte, tagID string) error
+ type HeadersType = string
+ const HeadersTypeAll
+ const HeadersTypeAny
+ type Producer struct
+ Exchange *Exchange
+ QueueName string
+ func NewProducer(exchange *Exchange, queueName string, connection *Connection, ...) (*Producer, error)
+ func (p *Producer) Close()
+ func (p *Producer) ExchangeArgs() amqp.Table
+ func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, ...) error
+ func (p *Producer) PublishDirect(ctx context.Context, body []byte) error
+ func (p *Producer) PublishFanout(ctx context.Context, body []byte) error
+ func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error
+ func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error
+ func (p *Producer) QueueArgs() amqp.Table
+ func (p *Producer) QueueBindArgs() amqp.Table
+ type ProducerOption func(*producerOptions)
+ func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption
+ func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption
+ func WithProducerMandatory(enable bool) ProducerOption
+ func WithProducerPersistent(enable bool) ProducerOption
+ func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption
+ func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption
+ func WithPublisherConfirm() ProducerOption
+ type Publisher struct
+ func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error)
+ func (p *Publisher) Close()
+ func (p *Publisher) Publish(ctx context.Context, body []byte) error
+ type QosOption func(*qosOptions)
+ func WithQosEnable() QosOption
+ func WithQosPrefetchCount(count int) QosOption
+ func WithQosPrefetchGlobal(enable bool) QosOption
+ func WithQosPrefetchSize(size int) QosOption
+ type QueueBindOption func(*queueBindOptions)
+ func WithQueueBindArgs(args map[string]interface{}) QueueBindOption
+ func WithQueueBindNoWait(enable bool) QueueBindOption
+ type QueueDeclareOption func(*queueDeclareOptions)
+ func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption
+ func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption
+ func WithQueueDeclareExclusive(enable bool) QueueDeclareOption
+ func WithQueueDeclareNoWait(enable bool) QueueDeclareOption
+ type Subscriber struct
+ func NewSubscriber(channelName string, identifier string, connection *Connection, ...) (*Subscriber, error)
+ func (s *Subscriber) Close()
+ func (s *Subscriber) Subscribe(ctx context.Context, handler Handler)