amqpx

package
v0.0.0-...-619fe92 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

TODO: publish retry
TODO: 重连是否加锁

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

func (*Channel) Cancel

func (c *Channel) Cancel(consumerTag string, noWait bool) error

func (*Channel) Close

func (c *Channel) Close() error

func (*Channel) Consume

func (c *Channel) Consume(queue, consumerTag string, autoAck, exclusive, noLocal bool, args amqp.Table, consumer Consumer) error

func (*Channel) ConsumeNowait

func (c *Channel) ConsumeNowait(queue, consumerTag string, autoAck, exclusive, noLocal bool, args amqp.Table, consumer Consumer) error

func (*Channel) ExchangeDeclare

func (c *Channel) ExchangeDeclare(name string, kind string, durable bool, autoDelete bool, internal bool, args amqp.Table) error

func (*Channel) ExchangeDeclareNoWait

func (c *Channel) ExchangeDeclareNoWait(name string, kind string, durable bool, autoDelete bool, internal bool, args amqp.Table) error

func (*Channel) ExchangeDelete

func (c *Channel) ExchangeDelete(name string, ifUnused bool) error

func (*Channel) ExchangeDeleteNoWait

func (c *Channel) ExchangeDeleteNoWait(name string, ifUnused bool) error

func (*Channel) PublishWithContext

func (c *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error

func (*Channel) PublishWithDeferredConfirmWithContext

func (c *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

func (*Channel) Qos

func (c *Channel) Qos(prefetchCount, prefetchSize int, global bool) error

func (*Channel) QueueBind

func (c *Channel) QueueBind(name, key, exchange string, args amqp.Table) error

func (*Channel) QueueBindNoWait

func (c *Channel) QueueBindNoWait(name, key, exchange string, args amqp.Table) error

func (*Channel) QueueDeclare

func (c *Channel) QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, args amqp.Table) (amqp.Queue, error)

func (*Channel) QueueDeclareNoWait

func (c *Channel) QueueDeclareNoWait(name string, durable bool, autoDelete bool, exclusive bool, args amqp.Table) (amqp.Queue, error)

func (*Channel) QueueDelete

func (c *Channel) QueueDelete(name string, ifUnused, ifEmpty bool) (int, error)

func (*Channel) QueueDeleteNoWait

func (c *Channel) QueueDeleteNoWait(name string, ifUnused, ifEmpty bool) (int, error)

func (*Channel) QueueUnbind

func (c *Channel) QueueUnbind(name, key, exchange string, args amqp.Table) error

type Connection

type Connection struct {
	*amqp.Connection
	// contains filtered or unexported fields
}

func Dial

func Dial(url string, opts ...ConnectionOption) (*Connection, error)

func (*Connection) Channel

func (c *Connection) Channel() (*Channel, error)

Channel 创建一个新的 Channel。一个 Connection 可以创建多个 Channel。Channel 是一个轻量级的 Connection，可以用来发送和接收消息。不建议在多个 goroutine 中共享一个 Channel。参见 https://www.rabbitmq.com/docs/channels#basics

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) IsClosed

func (c *Connection) IsClosed() bool

type ConnectionOption

type ConnectionOption func(*Connection)

func WithLogger

func WithLogger(logger *slog.Logger) ConnectionOption

func WithRecoveryBackoff

func WithRecoveryBackoff(backoff RecoveryBackoff) ConnectionOption

func WithRecoveryMaxAttempts

func WithRecoveryMaxAttempts(maxAttempts int) ConnectionOption

type Consumer

type Consumer interface {
	HandleDelivery(delivery amqp.Delivery)
}

type CustomBackoff

type CustomBackoff struct {
	// contains filtered or unexported fields
}

func (*CustomBackoff) Delay

func (e *CustomBackoff) Delay(attempts int) time.Duration

type ExponentialBackoff

type ExponentialBackoff struct {
	// contains filtered or unexported fields
}

func (*ExponentialBackoff) Delay

func (e *ExponentialBackoff) Delay(attempts int) time.Duration

type ExponentialRandomBackoff

type ExponentialRandomBackoff struct {
	// contains filtered or unexported fields
}

func (*ExponentialRandomBackoff) Delay

func (e *ExponentialRandomBackoff) Delay(attempts int) time.Duration

type FixedBackoff

type FixedBackoff struct {
	// contains filtered or unexported fields
}

func (*FixedBackoff) Delay

func (f *FixedBackoff) Delay(attempts int) time.Duration

type RecordedBinding

type RecordedBinding struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	Args         amqp.Table
}

func (*RecordedBinding) Equal

func (rb *RecordedBinding) Equal(another *RecordedBinding) bool

type RecordedConsumer

type RecordedConsumer struct {
	QueueName   string
	ConsumerTag string
	AutoAck     bool
	Exclusive   bool
	NoLocal     bool
	NoWait      bool
	Args        amqp.Table
	Consumer    Consumer
}

type RecordedExchange

type RecordedExchange struct {
	Kind       string
	Durable    bool
	AutoDelete bool
	Args       amqp.Table
}

type RecordedQueue

type RecordedQueue struct {
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	Args       amqp.Table
}

type RecoveryBackoff

type RecoveryBackoff interface {
	// attempts 从0开始
	Delay(attempts int) time.Duration
}

func NewCustomBackoff

func NewCustomBackoff(intervals ...time.Duration) RecoveryBackoff

func NewExponentialBackoff

func NewExponentialBackoff(initialInterval time.Duration, multiplier float64) RecoveryBackoff

func NewExponentialRandomBackoff

func NewExponentialRandomBackoff(initialInterval time.Duration, multiplier float64) RecoveryBackoff

func NewFixedBackoff

func NewFixedBackoff(interval time.Duration) RecoveryBackoff

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL