Documentation ¶
Overview ¶
Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go
producer and consumer using the five types direct, topic, fanout, headers, x-delayed-message. publisher and subscriber using the fanout message type.
Index ¶
- Constants
- Variables
- type Connection
- type ConnectionOption
- type ConsumeOption
- type Consumer
- type ConsumerOption
- func WithConsumerAutoAck(enable bool) ConsumerOption
- func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption
- func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption
- func WithConsumerPersistent(enable bool) ConsumerOption
- func WithConsumerQosOptions(opts ...QosOption) ConsumerOption
- func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption
- func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption
- type DeadLetterOption
- func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption
- func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption
- func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption
- func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption
- type DelayedMessagePublishOption
- type Exchange
- func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange
- func NewDirectExchange(exchangeName string, routingKey string) *Exchange
- func NewFanoutExchange(exchangeName string) *Exchange
- func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange
- func NewTopicExchange(exchangeName string, routingKey string) *Exchange
- type ExchangeDeclareOption
- type Handler
- type HeadersType
- type Producer
- func (p *Producer) Close()
- func (p *Producer) ExchangeArgs() amqp.Table
- func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, ...) error
- func (p *Producer) PublishDirect(ctx context.Context, body []byte) error
- func (p *Producer) PublishFanout(ctx context.Context, body []byte) error
- func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error
- func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error
- func (p *Producer) QueueArgs() amqp.Table
- func (p *Producer) QueueBindArgs() amqp.Table
- type ProducerOption
- func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption
- func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption
- func WithProducerMandatory(enable bool) ProducerOption
- func WithProducerPersistent(enable bool) ProducerOption
- func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption
- func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption
- type Publisher
- type QosOption
- type QueueBindOption
- type QueueDeclareOption
- type Subscriber
Constants ¶
const DefaultURL = "amqp://guest:guest@localhost:5672/"
DefaultURL default rabbitmq url
Variables ¶
var ErrClosed = amqp.ErrClosed
ErrClosed closed
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection rabbitmq connection
func NewConnection ¶
func NewConnection(url string, opts ...ConnectionOption) (*Connection, error)
NewConnection rabbitmq connection
func (*Connection) CheckConnected ¶
func (c *Connection) CheckConnected() bool
CheckConnected rabbitmq connection
type ConnectionOption ¶
type ConnectionOption func(*connectionOptions)
ConnectionOption connection option.
func WithLogger ¶
func WithLogger(zapLog *zap.Logger) ConnectionOption
WithLogger set logger option.
func WithReconnectTime ¶
func WithReconnectTime(d time.Duration) ConnectionOption
WithReconnectTime set reconnect time interval option.
func WithTLSConfig ¶
func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption
WithTLSConfig set tls config option.
type ConsumeOption ¶
type ConsumeOption func(*consumeOptions)
ConsumeOption consume option.
func WithConsumeArgs ¶
func WithConsumeArgs(args map[string]interface{}) ConsumeOption
WithConsumeArgs set consume args option.
func WithConsumeConsumer ¶
func WithConsumeConsumer(consumer string) ConsumeOption
WithConsumeConsumer set consume consumer option.
func WithConsumeExclusive ¶
func WithConsumeExclusive(enable bool) ConsumeOption
WithConsumeExclusive set consume exclusive option.
func WithConsumeNoLocal ¶
func WithConsumeNoLocal(enable bool) ConsumeOption
WithConsumeNoLocal set consume noLocal option.
func WithConsumeNoWait ¶
func WithConsumeNoWait(enable bool) ConsumeOption
WithConsumeNoWait set consume no wait option.
type Consumer ¶
type Consumer struct { Exchange *Exchange QueueName string // contains filtered or unexported fields }
Consumer session
func NewConsumer ¶
func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error)
NewConsumer create a consumer
type ConsumerOption ¶
type ConsumerOption func(*consumerOptions)
ConsumerOption consumer option.
func WithConsumerAutoAck ¶
func WithConsumerAutoAck(enable bool) ConsumerOption
WithConsumerAutoAck set consumer auto ack option, if false, manual ACK required.
func WithConsumerConsumeOptions ¶
func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption
WithConsumerConsumeOptions set consumer consume option.
func WithConsumerExchangeDeclareOptions ¶
func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption
WithConsumerExchangeDeclareOptions set exchange declare option.
func WithConsumerPersistent ¶
func WithConsumerPersistent(enable bool) ConsumerOption
WithConsumerPersistent set consumer persistent option.
func WithConsumerQosOptions ¶
func WithConsumerQosOptions(opts ...QosOption) ConsumerOption
WithConsumerQosOptions set consume qos option.
func WithConsumerQueueBindOptions ¶
func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption
WithConsumerQueueBindOptions set queue bind option.
func WithConsumerQueueDeclareOptions ¶
func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption
WithConsumerQueueDeclareOptions set queue declare option.
type DeadLetterOption ¶
type DeadLetterOption func(*deadLetterOptions)
DeadLetterOption declare dead letter option.
func WithDeadLetter ¶
func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption
WithDeadLetter set dead letter exchange, queue, routing key.
func WithDeadLetterExchangeDeclareOptions ¶
func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption
WithDeadLetterExchangeDeclareOptions set dead letter exchange declare option.
func WithDeadLetterQueueBindOptions ¶
func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption
WithDeadLetterQueueBindOptions set dead letter queue bind option.
func WithDeadLetterQueueDeclareOptions ¶
func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption
WithDeadLetterQueueDeclareOptions set dead letter queue declare option.
type DelayedMessagePublishOption ¶
type DelayedMessagePublishOption func(*delayedMessagePublishOptions)
DelayedMessagePublishOption declare queue bind option.
func WithDelayedMessagePublishHeadersKeys ¶
func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption
WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.
func WithDelayedMessagePublishTopicKey ¶
func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption
WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
Exchange rabbitmq minimum management unit
func NewDelayedMessageExchange ¶
NewDelayedMessageExchange create a delayed message exchange
func NewDirectExchange ¶
NewDirectExchange create a direct exchange
func NewFanoutExchange ¶
NewFanoutExchange create a fanout exchange
func NewHeadersExchange ¶
func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange
NewHeadersExchange create a headers exchange, the headerType supports "all" and "any"
func NewTopicExchange ¶
NewTopicExchange create a topic exchange
func (*Exchange) DelayedMessageType ¶
DelayedMessageType exchange delayed message type
func (*Exchange) HeadersKeys ¶
HeadersKeys exchange headers keys
func (*Exchange) RoutingKey ¶
RoutingKey exchange routing key
type ExchangeDeclareOption ¶
type ExchangeDeclareOption func(*exchangeDeclareOptions)
ExchangeDeclareOption declare exchange option.
func WithExchangeDeclareArgs ¶
func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption
WithExchangeDeclareArgs set exchange declare args option.
func WithExchangeDeclareAutoDelete ¶
func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption
WithExchangeDeclareAutoDelete set exchange declare auto delete option.
func WithExchangeDeclareInternal ¶
func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption
WithExchangeDeclareInternal set exchange declare internal option.
func WithExchangeDeclareNoWait ¶
func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption
WithExchangeDeclareNoWait set exchange declare no wait option.
type HeadersType ¶
type HeadersType = string
HeadersType headers type
const ( // HeadersTypeAll all HeadersTypeAll HeadersType = "all" // HeadersTypeAny any HeadersTypeAny HeadersType = "any" )
type Producer ¶
type Producer struct { Exchange *Exchange // exchange QueueName string // queue name // contains filtered or unexported fields }
Producer session
func NewProducer ¶
func NewProducer(exchange *Exchange, queueName string, connection *Connection, opts ...ProducerOption) (*Producer, error)
NewProducer create a producer
func (*Producer) ExchangeArgs ¶
ExchangeArgs returns the exchange declare args.
func (*Producer) PublishDelayedMessage ¶
func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, opts ...DelayedMessagePublishOption) error
PublishDelayedMessage send delayed type message
func (*Producer) PublishDirect ¶
PublishDirect send direct type message
func (*Producer) PublishFanout ¶
PublishFanout send fanout type message
func (*Producer) PublishHeaders ¶
func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error
PublishHeaders send headers type message
func (*Producer) PublishTopic ¶
PublishTopic send topic type message
func (*Producer) QueueBindArgs ¶
QueueBindArgs returns the queue bind args.
type ProducerOption ¶
type ProducerOption func(*producerOptions)
ProducerOption producer option.
func WithDeadLetterOptions ¶
func WithDeadLetterOptions(opts ...DeadLetterOption) ProducerOption
WithDeadLetterOptions set dead letter options.
func WithProducerExchangeDeclareOptions ¶
func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption
WithProducerExchangeDeclareOptions set exchange declare option.
func WithProducerMandatory ¶
func WithProducerMandatory(enable bool) ProducerOption
WithProducerMandatory set producer mandatory option.
func WithProducerPersistent ¶
func WithProducerPersistent(enable bool) ProducerOption
WithProducerPersistent set producer persistent option.
func WithProducerQueueBindOptions ¶
func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption
WithProducerQueueBindOptions set queue bind option.
func WithProducerQueueDeclareOptions ¶
func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption
WithProducerQueueDeclareOptions set queue declare option.
type Publisher ¶
type Publisher struct {
*Producer
}
Publisher session
func NewPublisher ¶
func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error)
NewPublisher create a publisher, channelName is exchange name
type QosOption ¶
type QosOption func(*qosOptions)
QosOption qos option.
func WithQosPrefetchCount ¶
WithQosPrefetchCount set qos prefetch count option.
func WithQosPrefetchGlobal ¶
WithQosPrefetchGlobal set qos global option.
func WithQosPrefetchSize ¶
WithQosPrefetchSize set qos prefetch size option.
type QueueBindOption ¶
type QueueBindOption func(*queueBindOptions)
QueueBindOption declare queue bind option.
func WithQueueBindArgs ¶
func WithQueueBindArgs(args map[string]interface{}) QueueBindOption
WithQueueBindArgs set queue bind args option.
func WithQueueBindNoWait ¶
func WithQueueBindNoWait(enable bool) QueueBindOption
WithQueueBindNoWait set queue bind no wait option.
type QueueDeclareOption ¶
type QueueDeclareOption func(*queueDeclareOptions)
QueueDeclareOption declare queue option.
func WithQueueDeclareArgs ¶
func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption
WithQueueDeclareArgs set queue declare args option.
func WithQueueDeclareAutoDelete ¶
func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption
WithQueueDeclareAutoDelete set queue declare auto delete option.
func WithQueueDeclareExclusive ¶
func WithQueueDeclareExclusive(enable bool) QueueDeclareOption
WithQueueDeclareExclusive set queue declare exclusive option.
func WithQueueDeclareNoWait ¶
func WithQueueDeclareNoWait(enable bool) QueueDeclareOption
WithQueueDeclareNoWait set queue declare no wait option.
type Subscriber ¶
type Subscriber struct {
*Consumer
}
Subscriber session
func NewSubscriber ¶
func NewSubscriber(channelName string, identifier string, connection *Connection, opts ...ConsumerOption) (*Subscriber, error)
NewSubscriber create a subscriber, channelName is exchange name, identifier is queue name