rabbitmq

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 10, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Direct  = "direct"
	Fanout  = "fanout"
	Topic   = "topic"
	Headers = "headers"
)
View Source
const (
	DefaultPullBatchSize = 10
)

Variables

View Source
var (
	EmptyConnection           = errors.New("empty connection get channel")
	EmptyExchange             = errors.New("empty exchange")
	EmptyQueue                = errors.New("empty queue")
	EmptyRoutingKeys          = errors.New("empty routing key")
	ExecuteStartBeforeSendMsg = errors.New("execute start function before send msg")
	ExecuteStartBeforeRecvMsg = errors.New("execute start function before receive msg")
)

Functions

func DefaultFailCallback

func DefaultFailCallback(body []byte) error

func DefaultSuccessCallback

func DefaultSuccessCallback(body []byte) error

Types

type ConsumeCallback

type ConsumeCallback func(msgId string, body []byte, err error) error

type Consumer

type Consumer interface {
	Consume([]byte) error
	ConsumeSuccessCallback(msgId string, body []byte)
	ConsumeFailConsumeCallback(msgId string, body []byte, err error)
}

type Exchange

type Exchange struct {
	Name        string                 `json:"name" yaml:"name"`               // 交换器名
	ExType      string                 `json:"exType" yaml:"exType"`           // 类型
	Persistence bool                   `json:"persistence" yaml:"persistence"` // 持久化
	AutoDeleted bool                   `json:"autoDeleted" yaml:"autoDeleted"` // 是否自动删除(自动删除的前提是至少有一个队列或者交换器与这和交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑)
	Internal    bool                   `json:"internal" yaml:"internal"`       // 设置是否内置的。true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
	NoWait      bool                   `json:"noWait" yaml:"noWait"`           // 是否为非阻塞
	Arguments   map[string]interface{} // 额外属性
}

Exchange 交换机

func (*Exchange) CheckAndSet

func (e *Exchange) CheckAndSet() error

CheckAndSet 处理默认值

type Queue

type Queue struct {
	Name        string                 // 队列名
	Persistence bool                   `json:"persistence" yaml:"persistence"` // 持久化
	AutoDeleted bool                   `json:"autoDeleted" yaml:"autoDeleted"` // 是否自动删除(前提是至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。注意:生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列)
	Exclusive   bool                   `json:"exclusive" yaml:"exclusive"`     // 是否为排他队列(排他的队列仅对“首次”声明的conn可见[一个conn中的其他channel也能访问该队列],conn结束后队列删除)
	NoWait      bool                   `json:"noWait" yaml:"noWait"`           // 是否为非阻塞
	Arguments   map[string]interface{} // 额外属性
}

func (*Queue) CheckAndSet

func (q *Queue) CheckAndSet() error

CheckAndSet 处理默认值

type RabbitMQ

type RabbitMQ struct {
	Url                    string       `json:"url,omitempty" yaml:"url"`
	RoutingKey             string       `json:"routingKey,omitempty" yaml:"routingKey"`
	Exchange               *Exchange    `json:"exchange,omitempty" yaml:"exchange"`
	Queue                  *Queue       `json:"queue,omitempty" yaml:"queue"`
	SendSuccess            SendCallback // 发送成功后回调函数
	SendFail               SendCallback // 发送失败后回调函数
	SendTeardown           SendCallback // 不论失败还是成功, 发送后定触发的函数
	ConsumeSuccessCallback func([]byte, error) error

	// 消费配置
	PullBatchSize int                    `json:"pullBatchSize" yaml:"pullBatchSize"` // 批量拉取数量
	AutoAck       bool                   `json:"autoAck" yaml:"autoAck"`             // 自动提交
	Exclusive     bool                   `json:"exclusive" yaml:"exclusive"`         // 是否排他
	NoLocal       bool                   `json:"noLocal" yaml:"noLocal"`             // 是否接收只同一个连接中的消息,若为true,则只能接收别的conn中发送的消息
	NoWait        bool                   `json:"noWait" yaml:"noWait"`               // 非阻塞
	Args          map[string]interface{} // 额外参数
	// contains filtered or unexported fields
}

func (*RabbitMQ) BindExchangeQueue

func (r *RabbitMQ) BindExchangeQueue(ch *amqp.Channel) error

BindExchangeQueue 交换机绑定队列

func (*RabbitMQ) Close

func (r *RabbitMQ) Close() error

Close 关闭mq链接

func (*RabbitMQ) CloseChannel

func (r *RabbitMQ) CloseChannel() error

CloseChannel 关闭通道

func (*RabbitMQ) Connect

func (r *RabbitMQ) Connect() error

Connect 连接

func (*RabbitMQ) ConsumeMsg

func (r *RabbitMQ) ConsumeMsg(consumer Consumer)

ConsumeMsg 消费消息

func (*RabbitMQ) ExchangeDeclare

func (r *RabbitMQ) ExchangeDeclare(ch *amqp.Channel) error

ExchangeDeclare 声明交换机

func (*RabbitMQ) OpenChannel

func (r *RabbitMQ) OpenChannel() error

OpenChannel 开启Channel

func (*RabbitMQ) QueueDeclare

func (r *RabbitMQ) QueueDeclare(ch *amqp.Channel) error

QueueDeclare 声明队列

func (*RabbitMQ) RecvMsg

func (r *RabbitMQ) RecvMsg() (<-chan amqp.Delivery, error)

RecvMsg 接收消息

func (*RabbitMQ) SendMsg

func (r *RabbitMQ) SendMsg(body []byte) (bool, error)

SendMsg 发送消息

func (*RabbitMQ) Start

func (r *RabbitMQ) Start() error

Start 开始

type SendCallback

type SendCallback func(body []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL