rabbitmq

package
v1.5.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Direct          = "direct"
	Fanout          = "fanout"
	Topic           = "topic"
	Headers         = "headers"
	XDelayedMessage = "x-delayed-message"
)
View Source
const DefaultBatchPullCount = 30

Variables

View Source
var (
	EmptyRoutingKey  = errors.New("empty routing key")
	CancelChannelErr = errors.New("consumer cancel failed")
	CloseConnErr     = errors.New("amqp connection close error")
)
View Source
var DelayTable = amqp.Table{"x-delayed-type": "direct"}

Functions

func BindExchangeQueue added in v1.4.2

func BindExchangeQueue(
	ch *amqp.Channel,
	queueName,
	exchangeName string,
	routingKeys []string,
	exchangeType string) error

BindExchangeQueue 交换机绑定队列.

func Connect added in v1.4.2

func Connect(url string) (*amqp.Connection, error)

Connect 连接.

func ExchangeDeclare added in v1.4.2

func ExchangeDeclare(ch *amqp.Channel, exchange Exchange) error

ExchangeDeclare 声明交换机.

func QueueDeclare added in v1.4.2

func QueueDeclare(ch *amqp.Channel, queue Queue) error

QueueDeclare 声明队列.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Close added in v1.4.2

func (c *Consumer) Close() error

Close 关闭.

func (*Consumer) Consume

func (c *Consumer) Consume(handler MsgHandler)

Consume 消费消息.

func (*Consumer) Init added in v1.4.6

func (c *Consumer) Init() error

func (*Consumer) Watch added in v1.4.4

func (c *Consumer) Watch()

Watch 监听连接断开, 然后重连.

type ConsumerConf added in v1.4.2

type ConsumerConf struct {
	Url            string
	RoutingKeys    []string
	Exchange       Exchange
	Queue          Queue
	BatchPullCount int
	ConsumerTag    string
	AutoAck        bool
	Exclusive      bool
	NoLocal        bool
	NoWait         bool
	Args           amqp.Table
	Logger         log.Logger
}

type Exchange

type Exchange struct {
	Name        string                 `json:"name" yaml:"name"`               // 交换器名
	ExType      string                 `json:"exType" yaml:"exType"`           // 类型
	Persistence bool                   `json:"persistence" yaml:"persistence"` // 持久化
	AutoDeleted bool                   `json:"autoDeleted" yaml:"autoDeleted"` // 是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
	Internal    bool                   `json:"internal" yaml:"internal"`       // 设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
	NoWait      bool                   `json:"noWait" yaml:"noWait"`           // 是否为非阻塞
	Arguments   map[string]interface{} // 额外属性
}

Exchange 交换机.

type Msg added in v1.4.8

type Msg struct {
	Key   string
	Body  []byte
	Delay time.Duration
}

type MsgHandler added in v1.4.2

type MsgHandler func(delivery amqp.Delivery)

type Producer added in v1.4.2

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close added in v1.4.2

func (p *Producer) Close() error

Close 关闭.

func (*Producer) Init added in v1.4.6

func (p *Producer) Init() error

func (*Producer) SendMsg added in v1.4.2

func (p *Producer) SendMsg(ctx context.Context, msg *Msg) error

SendMsg 发送消息.

func (*Producer) Setup added in v1.4.4

func (p *Producer) Setup() error

func (*Producer) Watch added in v1.4.4

func (p *Producer) Watch()

Watch 监听连接断开, 然后重连.

type ProducerConf added in v1.4.2

type ProducerConf struct {
	Url      string
	Exchange Exchange
	Logger   log.Logger
}

type Queue

type Queue struct {
	Name        string                 // 队列名
	Persistence bool                   `json:"persistence" yaml:"persistence"` // 持久化
	AutoDeleted bool                   `json:"autoDeleted" yaml:"autoDeleted"` // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
	Exclusive   bool                   `json:"exclusive" yaml:"exclusive"`     // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
	NoWait      bool                   `json:"noWait" yaml:"noWait"`           // 是否为非阻塞
	Arguments   map[string]interface{} // 额外属性
}

type RabbitConsumer added in v1.4.5

type RabbitConsumer interface {
	Init() error
	Consume(handler MsgHandler)
	Close() error
}

func NewConsumer added in v1.4.2

func NewConsumer(conf ConsumerConf) RabbitConsumer

type RabbitProducer added in v1.4.5

type RabbitProducer interface {
	Init() error
	SendMsg(ctx context.Context, msg *Msg) error
	Close() error
}

func NewProducer added in v1.4.2

func NewProducer(conf ProducerConf) RabbitProducer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL