rabbitmq

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2021 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// State values, each a uint8. They are declared with var, so they are
// assignable package-level variables rather than constants.
// NOTE(review): presumably these are the values reported by the State()
// methods on Client, Consumer and Producer (all return uint8) — confirm
// against the implementation.
var (
	StateClosed    = uint8(0)
	StateOpened    = uint8(1)
	StateReopening = uint8(2)
)

Functions

func DestroyQueueName

func DestroyQueueName(name string) (string, string, string)

DestroyQueueName 使用当前客户端分解一个ESB消息的队列名称,{name}名称满足格式:sys_esb_{systemId}_{node} 其中{systemId}为目标系统的四位数数字ID,{node}为目标系统监听的ESB节点标识(参考{@link ESBNode})。

@param name ESB消息的队列名称 @return "sys_esb","{systemId}","{node}"

func NewProvider

func NewProvider(node esb.Node, logger log.Logger, client *Client) *rabbitProvider

Types

type Binding

// Binding associates a route key with the queues bound under it
// (routeKey ==> queues).
type Binding struct {
	RouteKey string
	Queues   []*Queue
	NoWait   bool       // defaults to false
	Args     amqp.Table // defaults to nil
}

Binding routeKey ==>队列

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(opts ...Option) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) CloseConsumer

func (c *Client) CloseConsumer(name string) error

func (*Client) CloseProducer

func (c *Client) CloseProducer(name string) error

func (*Client) Consumer

func (c *Client) Consumer(name string) (*Consumer, error)

func (*Client) Open

func (c *Client) Open() (mq *Client, err error)

func (*Client) Producer

func (c *Client) Producer(name string) (*Producer, error)

func (*Client) State

func (c *Client) State() uint8

type ConsumeOption

// ConsumeOption holds the options a consumer uses when consuming.
// NOTE(review): the field names match the parameters of amqp's
// Channel.Consume — presumably they are passed through unchanged; confirm
// against the implementation.
type ConsumeOption struct {
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqp.Table
}

ConsumeOption 消费者消费选项

func DefaultConsumeOption

func DefaultConsumeOption() *ConsumeOption

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer 基于RabbitMQ消息中间件的客户端实现。

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) CloseChan

func (c *Consumer) CloseChan()

CloseChan 该接口仅用于测试使用, 勿手动调用

func (*Consumer) Name

func (c *Consumer) Name() string

func (*Consumer) Open

func (c *Consumer) Open() error

func (*Consumer) SetExchangeBinds

func (c *Consumer) SetExchangeBinds(eb []*ExchangeBinds) *Consumer

func (*Consumer) SetMsgCallback

func (c *Consumer) SetMsgCallback(cb chan<- Delivery) *Consumer

func (*Consumer) SetQos

func (c *Consumer) SetQos(prefetch int) *Consumer

SetQos 设置channel粒度的Qos, prefetch取值范围[0,∞), 默认为0。如果想要RoundRobin地进行消费,设置prefetch为1即可。注意:在调用Open前设置。

func (*Consumer) State

func (c *Consumer) State() uint8

type Delivery

// Delivery embeds amqp.Delivery, so the embedded type's fields and methods
// are promoted onto Delivery. It is the element type of the channel passed
// to Consumer.SetMsgCallback.
type Delivery struct {
	amqp.Delivery
}

type Exchange

// Exchange is the amqp-based exchange configuration.
// NOTE(review): the field names match the parameters of amqp's
// Channel.ExchangeDeclare — presumably they are passed through unchanged;
// confirm against the implementation.
type Exchange struct {
	Name       string
	Kind       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Args       amqp.Table // defaults to nil
}

Exchange 基于amqp的Exchange配置

func DefaultExchange

func DefaultExchange(name string, kind string, exchangeArgs amqp.Table) *Exchange

type ExchangeBinds

// ExchangeBinds groups an exchange with its bindings, describing the chain
// exchange ==> routeKey ==> queues.
type ExchangeBinds struct {
	Exchange *Exchange
	Bindings []*Binding
}

ExchangeBinds exchange ==> routeKey ==> queues

type Option

// Option is an ESB option: a function that receives the unexported *options
// value. New accepts a variadic list of Option values.
type Option func(*options)

Option 是ESB选项.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer 基于RabbitMQ的生产者封装。

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Confirm

func (p *Producer) Confirm(enable bool) *Producer

Confirm 是否开启生产者confirm功能, 默认为false, 该选项在Open()前设置. 说明: 目前仅实现串行化的confirm, 每次的等待confirm额外需要约50ms,建议上层并发调用Publish

func (*Producer) ForDirect

func (p *Producer) ForDirect(exchange, route, queue, data string) error

func (*Producer) ForFanout

func (p *Producer) ForFanout(exchange, data string) error

func (*Producer) ForQueueArgs

func (p *Producer) ForQueueArgs(exchange, route, kind string, bindings []*Binding, data []byte) error

func (*Producer) ForTopic

func (p *Producer) ForTopic(exchange, route, queue, data string) error

func (*Producer) IsOpen

func (p *Producer) IsOpen() bool

func (*Producer) Name

func (p *Producer) Name() string

func (*Producer) Open

func (p *Producer) Open() error

func (*Producer) Publish

func (p *Producer) Publish(exchange, route string, msg *PublishMsg) error

Publish 在同步Publish Confirm模式下, 每次Publish将额外有约50ms的等待时间.如果采用这种模式,建议上层并发publish

func (*Producer) SetExchangeBinds

func (p *Producer) SetExchangeBinds(eb []*ExchangeBinds) *Producer

func (*Producer) State

func (p *Producer) State() uint8

type PublishMsg

// PublishMsg is the data format of a message produced by a producer.
type PublishMsg struct {
	ContentType     string // MIME content type
	ContentEncoding string // MIME content encoding
	DeliveryMode    uint8  // transient or persistent
	Priority        uint8  // 0 to 9
	Timestamp       time.Time
	Body            []byte
}

PublishMsg 生产者生产的数据格式

func NewPublishMsg

func NewPublishMsg(body []byte) *PublishMsg

type Queue

// Queue is the amqp-based queue configuration.
// NOTE(review): the field names match the parameters of amqp's
// Channel.QueueDeclare — presumably they are passed through unchanged;
// confirm against the implementation.
type Queue struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Args       amqp.Table
}

Queue 基于amqp的Queue配置

func DefaultQueue

func DefaultQueue(name string, queueArgs amqp.Table) *Queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL