Documentation ¶
Overview ¶
Package rabbitmq is a Go wrapper for github.com/rabbitmq/amqp091-go.
It provides a producer and consumer that support five exchange types (direct, topic, fanout, headers, x-delayed-message), and a publisher and subscriber that use the fanout exchange type.
Index ¶
- Constants
- Variables
- type Connection
- type ConnectionOption
- type ConsumeOption
- type Consumer
- type ConsumerOption
- func WithConsumerAutoAck(enable bool) ConsumerOption
- func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption
- func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption
- func WithConsumerPersistent(enable bool) ConsumerOption
- func WithConsumerQosOptions(opts ...QosOption) ConsumerOption
- func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption
- func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption
- type DeadLetterOption
- func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption
- func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption
- func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption
- func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption
- type DelayedMessagePublishOption
- type Exchange
- func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange
- func NewDirectExchange(exchangeName string, routingKey string) *Exchange
- func NewFanoutExchange(exchangeName string) *Exchange
- func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange
- func NewTopicExchange(exchangeName string, routingKey string) *Exchange
- type ExchangeDeclareOption
- type Handler
- type HeadersType
- type Producer
- func (p *Producer) Close()
- func (p *Producer) ExchangeArgs() amqp.Table
- func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, ...) error
- func (p *Producer) PublishDirect(ctx context.Context, body []byte) error
- func (p *Producer) PublishFanout(ctx context.Context, body []byte) error
- func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error
- func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error
- func (p *Producer) QueueArgs() amqp.Table
- func (p *Producer) QueueBindArgs() amqp.Table
- type ProducerOption
- func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption
- func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption
- func WithProducerMandatory(enable bool) ProducerOption
- func WithProducerPersistent(enable bool) ProducerOption
- func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption
- func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption
- type Publisher
- type QosOption
- type QueueBindOption
- type QueueDeclareOption
- type Subscriber
Constants ¶
const DefaultURL = "amqp://guest:guest@localhost:5672/"
DefaultURL default rabbitmq url
Variables ¶
var ErrClosed = amqp.ErrClosed
ErrClosed is an alias of amqp.ErrClosed, the error reported when the connection or channel is closed.
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection rabbitmq connection
func NewConnection ¶
func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)
NewConnection creates a rabbitmq connection.
func (*Connection) CheckConnected ¶
func (c *Connection) CheckConnected() bool
CheckConnected reports whether the rabbitmq connection is connected.
type ConnectionOption ¶
type ConnectionOption func(*connectionOptions)
ConnectionOption connection option.
func WithLogger ¶
func WithLogger(zapLog *zap.Logger) ConnectionOption
WithLogger set logger option.
func WithReconnectTime ¶
func WithReconnectTime(d time.Duration) ConnectionOption
WithReconnectTime set reconnect time interval option.
func WithTLSConfig ¶
func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption
WithTLSConfig set tls config option.
type ConsumeOption ¶ added in v1.5.6
type ConsumeOption func(*consumeOptions)
ConsumeOption consume option.
func WithConsumeArgs ¶ added in v1.5.6
func WithConsumeArgs(args map[string]interface{}) ConsumeOption
WithConsumeArgs set consume args option.
func WithConsumeConsumer ¶ added in v1.5.6
func WithConsumeConsumer(consumer string) ConsumeOption
WithConsumeConsumer set consume consumer option.
func WithConsumeExclusive ¶ added in v1.5.6
func WithConsumeExclusive(enable bool) ConsumeOption
WithConsumeExclusive set consume exclusive option.
func WithConsumeNoLocal ¶ added in v1.5.6
func WithConsumeNoLocal(enable bool) ConsumeOption
WithConsumeNoLocal set consume noLocal option.
func WithConsumeNoWait ¶ added in v1.5.6
func WithConsumeNoWait(enable bool) ConsumeOption
WithConsumeNoWait set consume no wait option.
type Consumer ¶ added in v1.5.6
type Consumer struct { Exchange *Exchange QueueName string // contains filtered or unexported fields }
Consumer session
func NewConsumer ¶ added in v1.5.6
func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error)
NewConsumer create a consumer
type ConsumerOption ¶ added in v1.5.6
type ConsumerOption func(*consumerOptions)
ConsumerOption consumer option.
func WithConsumerAutoAck ¶ added in v1.5.6
func WithConsumerAutoAck(enable bool) ConsumerOption
WithConsumerAutoAck sets the consumer auto ack option; if false, manual ACK is required.
func WithConsumerConsumeOptions ¶ added in v1.5.6
func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption
WithConsumerConsumeOptions set consumer consume option.
func WithConsumerExchangeDeclareOptions ¶ added in v1.5.6
func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption
WithConsumerExchangeDeclareOptions set exchange declare option.
func WithConsumerPersistent ¶ added in v1.5.6
func WithConsumerPersistent(enable bool) ConsumerOption
WithConsumerPersistent set consumer persistent option.
func WithConsumerQosOptions ¶ added in v1.5.6
func WithConsumerQosOptions(opts ...QosOption) ConsumerOption
WithConsumerQosOptions set consume qos option.
func WithConsumerQueueBindOptions ¶ added in v1.5.6
func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption
WithConsumerQueueBindOptions set queue bind option.
func WithConsumerQueueDeclareOptions ¶ added in v1.5.6
func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption
WithConsumerQueueDeclareOptions set queue declare option.
type DeadLetterOption ¶ added in v1.8.2
type DeadLetterOption func(*deadLetterOptions)
DeadLetterOption declare dead letter option.
func WithDeadLetter ¶ added in v1.8.2
func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption
WithDeadLetter set dead letter exchange, queue, routing key.
func WithDeadLetterExchangeDeclareOptions ¶ added in v1.8.2
func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption
WithDeadLetterExchangeDeclareOptions set dead letter exchange declare option.
func WithDeadLetterQueueBindOptions ¶ added in v1.8.2
func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption
WithDeadLetterQueueBindOptions set dead letter queue bind option.
func WithDeadLetterQueueDeclareOptions ¶ added in v1.8.2
func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption
WithDeadLetterQueueDeclareOptions set dead letter queue declare option.
type DelayedMessagePublishOption ¶ added in v1.5.6
type DelayedMessagePublishOption func(*delayedMessagePublishOptions)
DelayedMessagePublishOption delayed message publish option.
func WithDelayedMessagePublishHeadersKeys ¶ added in v1.5.6
func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption
WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.
func WithDelayedMessagePublishTopicKey ¶ added in v1.5.6
func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption
WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.
type Exchange ¶ added in v1.5.6
type Exchange struct {
// contains filtered or unexported fields
}
Exchange is the minimum management unit of rabbitmq.
func NewDelayedMessageExchange ¶ added in v1.5.6
NewDelayedMessageExchange create a delayed message exchange
func NewDirectExchange ¶ added in v1.5.6
NewDirectExchange create a direct exchange
func NewFanoutExchange ¶ added in v1.5.6
NewFanoutExchange create a fanout exchange
func NewHeadersExchange ¶ added in v1.5.6
func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange
NewHeadersExchange creates a headers exchange; headersType supports "all" and "any".
func NewTopicExchange ¶ added in v1.5.6
NewTopicExchange create a topic exchange
func (*Exchange) DelayedMessageType ¶ added in v1.5.6
DelayedMessageType exchange delayed message type
func (*Exchange) HeadersKeys ¶ added in v1.5.6
HeadersKeys exchange headers keys
func (*Exchange) RoutingKey ¶ added in v1.5.6
RoutingKey exchange routing key
type ExchangeDeclareOption ¶ added in v1.5.6
type ExchangeDeclareOption func(*exchangeDeclareOptions)
ExchangeDeclareOption declare exchange option.
func WithExchangeDeclareArgs ¶ added in v1.5.6
func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption
WithExchangeDeclareArgs set exchange declare args option.
func WithExchangeDeclareAutoDelete ¶ added in v1.5.6
func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption
WithExchangeDeclareAutoDelete set exchange declare auto delete option.
func WithExchangeDeclareInternal ¶ added in v1.5.6
func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption
WithExchangeDeclareInternal set exchange declare internal option.
func WithExchangeDeclareNoWait ¶ added in v1.5.6
func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption
WithExchangeDeclareNoWait set exchange declare no wait option.
type HeadersType ¶ added in v1.5.6
type HeadersType = string
HeadersType headers type
const ( // HeadersTypeAll all HeadersTypeAll HeadersType = "all" // HeadersTypeAny any HeadersTypeAny HeadersType = "any" )
type Producer ¶ added in v1.5.6
type Producer struct { Exchange *Exchange // exchange QueueName string // queue name // contains filtered or unexported fields }
Producer session
func NewProducer ¶ added in v1.5.6
func NewProducer(exchange *Exchange, queueName string, connection *Connection, opts ...ProducerOption) (*Producer, error)
NewProducer create a producer
func (*Producer) ExchangeArgs ¶ added in v1.8.2
ExchangeArgs returns the exchange declare args.
func (*Producer) PublishDelayedMessage ¶ added in v1.5.6
func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, opts ...DelayedMessagePublishOption) error
PublishDelayedMessage send delayed type message
func (*Producer) PublishDirect ¶ added in v1.5.6
PublishDirect send direct type message
func (*Producer) PublishFanout ¶ added in v1.5.6
PublishFanout send fanout type message
func (*Producer) PublishHeaders ¶ added in v1.5.6
func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error
PublishHeaders send headers type message
func (*Producer) PublishTopic ¶ added in v1.5.6
PublishTopic send topic type message
func (*Producer) QueueBindArgs ¶ added in v1.8.2
QueueBindArgs returns the queue bind args.
type ProducerOption ¶ added in v1.5.6
type ProducerOption func(*producerOptions)
ProducerOption producer option.
func WithDeadLetterOptions ¶ added in v1.8.2
func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption
WithDeadLetterOptions set dead letter options.
func WithProducerExchangeDeclareOptions ¶ added in v1.5.6
func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption
WithProducerExchangeDeclareOptions set exchange declare option.
func WithProducerMandatory ¶ added in v1.5.6
func WithProducerMandatory(enable bool) ProducerOption
WithProducerMandatory set producer mandatory option.
func WithProducerPersistent ¶ added in v1.5.6
func WithProducerPersistent(enable bool) ProducerOption
WithProducerPersistent set producer persistent option.
func WithProducerQueueBindOptions ¶ added in v1.5.6
func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption
WithProducerQueueBindOptions set queue bind option.
func WithProducerQueueDeclareOptions ¶ added in v1.5.6
func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption
WithProducerQueueDeclareOptions set queue declare option.
type Publisher ¶ added in v1.5.6
type Publisher struct {
*Producer
}
Publisher session
func NewPublisher ¶ added in v1.5.6
func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error)
NewPublisher creates a publisher; channelName is the exchange name.
type QosOption ¶ added in v1.5.6
type QosOption func(*qosOptions)
QosOption qos option.
func WithQosEnable ¶ added in v1.5.6
func WithQosEnable() QosOption
WithQosEnable set qos enable option.
func WithQosPrefetchCount ¶ added in v1.5.6
WithQosPrefetchCount set qos prefetch count option.
func WithQosPrefetchGlobal ¶ added in v1.5.6
WithQosPrefetchGlobal set qos global option.
func WithQosPrefetchSize ¶ added in v1.5.6
WithQosPrefetchSize set qos prefetch size option.
type QueueBindOption ¶ added in v1.5.6
type QueueBindOption func(*queueBindOptions)
QueueBindOption declare queue bind option.
func WithQueueBindArgs ¶ added in v1.5.6
func WithQueueBindArgs(args map[string]interface{}) QueueBindOption
WithQueueBindArgs set queue bind args option.
func WithQueueBindNoWait ¶ added in v1.5.6
func WithQueueBindNoWait(enable bool) QueueBindOption
WithQueueBindNoWait set queue bind no wait option.
type QueueDeclareOption ¶ added in v1.5.6
type QueueDeclareOption func(*queueDeclareOptions)
QueueDeclareOption declare queue option.
func WithQueueDeclareArgs ¶ added in v1.5.6
func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption
WithQueueDeclareArgs set queue declare args option.
func WithQueueDeclareAutoDelete ¶ added in v1.5.6
func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption
WithQueueDeclareAutoDelete set queue declare auto delete option.
func WithQueueDeclareExclusive ¶ added in v1.5.6
func WithQueueDeclareExclusive(enable bool) QueueDeclareOption
WithQueueDeclareExclusive set queue declare exclusive option.
func WithQueueDeclareNoWait ¶ added in v1.5.6
func WithQueueDeclareNoWait(enable bool) QueueDeclareOption
WithQueueDeclareNoWait set queue declare no wait option.
type Subscriber ¶ added in v1.5.6
type Subscriber struct {
*Consumer
}
Subscriber session
func NewSubscriber ¶ added in v1.5.6
func NewSubscriber(channelName string, identifier string, connection *Connection, opts ...ConsumerOption) (*Subscriber, error)
NewSubscriber creates a subscriber; channelName is the exchange name and identifier is the queue name.