rabbitMQ

package
v0.0.0-...-d1ade8f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2025 License: BSD-3-Clause Imports: 30 Imported by: 0

Documentation

Overview

1、在消费函数处理结束后再发送 Ack;2、支持设置消费协程数量,并发消费;3、支持停止消费协程,并等待所有已经获取到的消息处理完毕,以便实现进程优雅退出。

Index

Constants

This section is empty.

Variables

View Source
var (
	DefaultOptions = Options{
		QueueDeclareOptions: &DefaultQueueDeclareOptions,
		ConsumeConcurrent:   1000,
		ProducerSize:        10,
	}

	DefaultQueueDeclareOptions = QueueDeclareOptions{
		Capacity:  100000,
		ExpireSec: 86400,
		Overflow:  "drop-head",
	}
)
View Source
var DefaultPoolOptions = ChannelPoolOptions{
	Capacity: 2000,
	TTL:      5 * time.Second,
}

Functions

func AutoEncodeUrl

func AutoEncodeUrl(url string) string

func OnNewQueue

func OnNewQueue(f func(*Queue))

func ParseUrl

func ParseUrl(url string, keepRaw ...bool) (scheme, username, password, host string, err error)

解析形如 ‘amqp://user:password@host’ 的 rabbitMQ URL

keepRaw: 是否保持原始字符串、不进行 QueryUnescape 操作。默认为 false。

Types

type ChannelOpenError

type ChannelOpenError struct {
	// contains filtered or unexported fields
}

func (*ChannelOpenError) Error

func (this *ChannelOpenError) Error() string

type ChannelPool

type ChannelPool struct {
	// contains filtered or unexported fields
}

func AllChannelPool

func AllChannelPool() []*ChannelPool

func NewChannelPool

func NewChannelPool(url string, opt ...ChannelPoolOptions) (*ChannelPool, error)

func (*ChannelPool) Capacity

func (this *ChannelPool) Capacity() int

func (*ChannelPool) Close

func (this *ChannelPool) Close(d time.Duration) (closed bool)

关闭连接

func (*ChannelPool) ConnectedUrl

func (this *ChannelPool) ConnectedUrl() string

获取当前正在使用的连接字符串

func (*ChannelPool) Do

func (this *ChannelPool) Do(job string, f func(ch *amqp.Channel)) error

func (*ChannelPool) Idle

func (this *ChannelPool) Idle() int

可复用的空闲 amqp.Channel 个数

func (*ChannelPool) LastJob

func (this *ChannelPool) LastJob() []*JobInfo

遍历当前活动对象

func (*ChannelPool) LocalAddr

func (this *ChannelPool) LocalAddr() net.Addr

LocalAddr 获取本地连接信息

func (*ChannelPool) OnGet

func (this *ChannelPool) OnGet(f func(job string, err error))

func (*ChannelPool) OnPut

func (this *ChannelPool) OnPut(f func(job string, time time.Time, took time.Duration))

func (*ChannelPool) Ping

func (this *ChannelPool) Ping() error

func (*ChannelPool) SetCapacity

func (this *ChannelPool) SetCapacity(n int)

func (*ChannelPool) SetTTL

func (this *ChannelPool) SetTTL(d time.Duration)

func (*ChannelPool) Size

func (this *ChannelPool) Size() int

已经从缓存池中申请出去的 amqp.Channel 个数

func (*ChannelPool) TTL

func (this *ChannelPool) TTL() time.Duration

type ChannelPoolBusyError

type ChannelPoolBusyError struct {
	// contains filtered or unexported fields
}

func (*ChannelPoolBusyError) Error

func (this *ChannelPoolBusyError) Error() string

type ChannelPoolOptions

type ChannelPoolOptions struct {
	TLSConfig   *tls.Config            // 当使用 ExternalAuth 时的 TLS 配置。为空表示不使用 ExternalAuth
	Capacity    int                    // amqp.Channel 缓存池容量上限。
	TTL         time.Duration          // amqp.Channel 在归还给缓存池之后多久被回收。默认 5 秒。
	Properties  map[string]interface{} //
	LastJobSize int                    //

	OnConnect    func(host string, err error)
	OnDisconnect func(host string, err error)
}

type ConsumeConfig

type ConsumeConfig struct {
	// 交换机,队列绑定的交换机,如果队列不存在,重建队列+绑定交换机
	Exchange string
	Queue    string
	// With a prefetch count greater than zero, the server will deliver that many
	// messages to consumers before acknowledgments are received.
	Qos        int
	Concurrent int
}

type ConsumeEventArgs

type ConsumeEventArgs struct {
	Time  time.Time
	Queue string
	Data  string
	Error error
	Took  time.Duration
}

type ConsumeOptions

type ConsumeOptions struct {
	// 交换机,队列绑定的交换机,如果队列不存在,重建队列+绑定交换机
	Exchange string
	// handler 超时时间
	Timeout time.Duration
	// With a prefetch count greater than zero, the server will deliver that many
	// messages to consumers before acknowledgments are received.
	Qos int
	// 当创建新的 Channel 时触发。
	OnChannel func(ch *amqp.Channel)
}

type GroupProducer

type GroupProducer struct {
	// contains filtered or unexported fields
}

func (*GroupProducer) Close

func (p *GroupProducer) Close()

func (*GroupProducer) Send

func (p *GroupProducer) Send(exchange, queue string, data string) error

func (*GroupProducer) SendToExchange

func (p *GroupProducer) SendToExchange(exchange string, data string) error

func (*GroupProducer) SendToQueue

func (p *GroupProducer) SendToQueue(queue string, data string) error

type IProducer

type IProducer interface {
	Close()
	Send(exchange string, queue string, data string) error
	SendToExchange(exchange string, data string) error
	SendToQueue(queue string, data string) error
}

type JobInfo

type JobInfo struct {
	Job  string        `json:"job,omitempty"`
	Time time.Time     `json:"time,omitempty"`
	Took time.Duration `json:"took,omitempty"`
}

type Options

type Options struct {
	NoAutoDeclare       bool                 `json:"no_auto_declare,omitempty"`       // 指示不自动声明队列。
	QueueDeclareOptions *QueueDeclareOptions `json:"queue_declare_options,omitempty"` // 声明队列时的默认参数。
	ConsumeConcurrent   int                  `json:"consume_concurrent,omitempty"`    // 消费者默认并发数
	NoJobCounter        bool                 `json:"no_job_counter,omitempty"`        // 是否不使用 JobCounter。
	NoProduceLog        bool                 `json:"no_produce_log,omitempty"`        // 当发送消息时是否记录日志
	NoConsumeLog        bool                 `json:"no_consume_log,omitempty"`        // 当接收到消息时是否记录日志
	NoWaitOnStop        bool                 `json:"no_wait_on_stop,omitempty"`       // 调用 Stop 方法时,是否等待当前正在执行的任务处理结束。
	Logger              logs.Logger          `json:"-"`                               //

	ProducerSize int `json:"producer_size,omitempty"` // 生产者数量
}

type ProduceEventArgs

type ProduceEventArgs struct {
	Time     time.Time
	Exchange string
	Queue    string
	Data     string
	Error    error
	Took     time.Duration
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Send

func (p *Producer) Send(exchange, queue string, data string) error

func (*Producer) SendToExchange

func (p *Producer) SendToExchange(exchange, data string) error

func (*Producer) SendToQueue

func (p *Producer) SendToQueue(queue, data string) error

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func AllQueue

func AllQueue() []*Queue

func New

func New(name string, pool *ChannelPool, opt ...Options) *Queue

func (*Queue) BindQueue

func (this *Queue) BindQueue(queue, exchange string) (err error)

将队列绑定到交换机上

func (*Queue) Consume

func (this *Queue) Consume(queue string, concurrent int, handler func(data string) (ack bool), opt ...ConsumeOptions) (stop func(wait time.Duration) (stopped bool), err error)

启动指定队列的消费者。

queue: 队列名
concurrent: 并发数(消费者协程数)
handler: 消息处理函数

当从队列中接收到消息并处理结束后会触发 Consume 事件。

func (*Queue) DeclareAndBindExchangeQueue

func (this *Queue) DeclareAndBindExchangeQueue(exchange, queue string) error

func (*Queue) DeclareExchange

func (this *Queue) DeclareExchange(exchange string, kind ...string) (err error)

声明一个交换机

exchange: 交换机
kind: 交换机类型,默认 fanout

func (*Queue) DeclareQueue

func (this *Queue) DeclareQueue(queue string, opt ...QueueDeclareOptions) (err error)

声明一个队列

queue: 队列名称
opt: 队列声明参数。默认使用 GetOptions().QueueDeclareOptions

func (*Queue) Do

func (this *Queue) Do(f func(ch *amqp.Channel) error) error

func (*Queue) GetDefaultProducer

func (this *Queue) GetDefaultProducer() IProducer

func (*Queue) GetLogger

func (this *Queue) GetLogger() logs.Logger

func (*Queue) GetOptions

func (this *Queue) GetOptions() Options

获取当前正在使用的参数

func (*Queue) GetPool

func (this *Queue) GetPool() *ChannelPool

func (*Queue) Name

func (this *Queue) Name() string

func (*Queue) NewGroupProducer

func (this *Queue) NewGroupProducer(size int) (gp IProducer, err error)

func (*Queue) NewProducer

func (this *Queue) NewProducer() (IProducer, error)

func (*Queue) NewSimpleConsumer

func (this *Queue) NewSimpleConsumer(cfg *ConsumeConfig) *SimpleConsumer

func (*Queue) OnConsume

func (this *Queue) OnConsume(f func(e *ConsumeEventArgs))

设置 Consume 事件回调函数

func (*Queue) OnProduce

func (this *Queue) OnProduce(f func(e *ProduceEventArgs))

设置 Produce 事件回调函数

func (*Queue) Ping

func (this *Queue) Ping() error

检测 MQ 是否可以访问。如果不可用则返回对应的错误信息。

func (*Queue) Produce

func (this *Queue) Produce(exchange, queue, data string) (err error)

发送一条消息。 发送结束后将触发 OnProduce 事件。

func (*Queue) QueueDeclareOptions

func (this *Queue) QueueDeclareOptions() *QueueDeclareOptions

func (*Queue) RemoveExchange

func (this *Queue) RemoveExchange(exchange string) (err error)

从服务端移除一个交换机

func (*Queue) RemoveQueue

func (this *Queue) RemoveQueue(queue string) (err error)

从服务端移除一个队列

func (*Queue) SetLogger

func (this *Queue) SetLogger(logger logs.Logger)

func (*Queue) Stop

func (this *Queue) Stop(wait time.Duration) (stopped bool)

停止所有的 Consumer(如果有启动的话)、并关闭连接。

func (*Queue) UnbindQueue

func (this *Queue) UnbindQueue(queue, exchange string) (err error)

取消队列绑定

type QueueDeclareOptions

type QueueDeclareOptions struct {
	Capacity  int    `json:"capacity,omitempty"`   // 100000
	ExpireSec int    `json:"expire_sec,omitempty"` // x-expires,秒
	Overflow  string `json:"overflow,omitempty"`   // x-overflow
}

type SimpleConsumer

type SimpleConsumer struct {
	// contains filtered or unexported fields
}

func (*SimpleConsumer) Handle

func (c *SimpleConsumer) Handle(handle func(data string) (ack bool, err error)) error

func (*SimpleConsumer) IsStop

func (c *SimpleConsumer) IsStop() bool

func (*SimpleConsumer) Stop

func (c *SimpleConsumer) Stop(wait time.Duration) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL