Documentation ¶
Overview ¶
1、在消费函数处理结束后再发送Ack; 2、支持设置消费协程数量,并发消费; 3、支持停止消费协程、并等待所有已经获取到的消息处理完毕,以便实现进程优雅退出
Index ¶
- Variables
- func AutoEncodeUrl(url string) string
- func OnNewQueue(f func(*Queue))
- func ParseUrl(url string, keepRaw ...bool) (scheme, username, password, host string, err error)
- type ChannelOpenError
- type ChannelPool
- func (this *ChannelPool) Capacity() int
- func (this *ChannelPool) Close(d time.Duration) (closed bool)
- func (this *ChannelPool) ConnectedUrl() string
- func (this *ChannelPool) Do(job string, f func(ch *amqp.Channel)) error
- func (this *ChannelPool) Idle() int
- func (this *ChannelPool) LastJob() []*JobInfo
- func (this *ChannelPool) LocalAddr() net.Addr
- func (this *ChannelPool) OnGet(f func(job string, err error))
- func (this *ChannelPool) OnPut(f func(job string, time time.Time, took time.Duration))
- func (this *ChannelPool) Ping() error
- func (this *ChannelPool) SetCapacity(n int)
- func (this *ChannelPool) SetTTL(d time.Duration)
- func (this *ChannelPool) Size() int
- func (this *ChannelPool) TTL() time.Duration
- type ChannelPoolBusyError
- type ChannelPoolOptions
- type ConsumeConfig
- type ConsumeEventArgs
- type ConsumeOptions
- type GroupProducer
- type IProducer
- type JobInfo
- type Options
- type ProduceEventArgs
- type Producer
- type Queue
- func (this *Queue) BindQueue(queue, exchange string) (err error)
- func (this *Queue) Consume(queue string, concurrent int, handler func(data string) (ack bool), ...) (stop func(wait time.Duration) (stopped bool), err error)
- func (this *Queue) DeclareAndBindExchangeQueue(exchange, queue string) error
- func (this *Queue) DeclareExchange(exchange string, kind ...string) (err error)
- func (this *Queue) DeclareQueue(queue string, opt ...QueueDeclareOptions) (err error)
- func (this *Queue) Do(f func(ch *amqp.Channel) error) error
- func (this *Queue) GetDefaultProducer() IProducer
- func (this *Queue) GetLogger() logs.Logger
- func (this *Queue) GetOptions() Options
- func (this *Queue) GetPool() *ChannelPool
- func (this *Queue) Name() string
- func (this *Queue) NewGroupProducer(size int) (gp IProducer, err error)
- func (this *Queue) NewProducer() (IProducer, error)
- func (this *Queue) NewSimpleConsumer(cfg *ConsumeConfig) *SimpleConsumer
- func (this *Queue) OnConsume(f func(e *ConsumeEventArgs))
- func (this *Queue) OnProduce(f func(e *ProduceEventArgs))
- func (this *Queue) Ping() error
- func (this *Queue) Produce(exchange, queue, data string) (err error)
- func (this *Queue) QueueDeclareOptions() *QueueDeclareOptions
- func (this *Queue) RemoveExchange(exchange string) (err error)
- func (this *Queue) RemoveQueue(queue string) (err error)
- func (this *Queue) SetLogger(logger logs.Logger)
- func (this *Queue) Stop(wait time.Duration) (stopped bool)
- func (this *Queue) UnbindQueue(queue, exchange string) (err error)
- type QueueDeclareOptions
- type SimpleConsumer
Constants ¶
This section is empty.
Variables ¶
View Source
var ( DefaultOptions = Options{ QueueDeclareOptions: &DefaultQueueDeclareOptions, ConsumeConcurrent: 1000, ProducerSize: 10, } DefaultQueueDeclareOptions = QueueDeclareOptions{ Capacity: 100000, ExpireSec: 86400, Overflow: "drop-head", } )
View Source
var DefaultPoolOptions = ChannelPoolOptions{ Capacity: 2000, TTL: 5 * time.Second, }
Functions ¶
func AutoEncodeUrl ¶
func OnNewQueue ¶
func OnNewQueue(f func(*Queue))
Types ¶
type ChannelOpenError ¶
type ChannelOpenError struct {
// contains filtered or unexported fields
}
func (*ChannelOpenError) Error ¶
func (this *ChannelOpenError) Error() string
type ChannelPool ¶
type ChannelPool struct {
// contains filtered or unexported fields
}
func AllChannelPool ¶
func AllChannelPool() []*ChannelPool
func NewChannelPool ¶
func NewChannelPool(url string, opt ...ChannelPoolOptions) (*ChannelPool, error)
func (*ChannelPool) Capacity ¶
func (this *ChannelPool) Capacity() int
func (*ChannelPool) OnGet ¶
func (this *ChannelPool) OnGet(f func(job string, err error))
func (*ChannelPool) Ping ¶
func (this *ChannelPool) Ping() error
func (*ChannelPool) SetCapacity ¶
func (this *ChannelPool) SetCapacity(n int)
func (*ChannelPool) SetTTL ¶
func (this *ChannelPool) SetTTL(d time.Duration)
func (*ChannelPool) TTL ¶
func (this *ChannelPool) TTL() time.Duration
type ChannelPoolBusyError ¶
type ChannelPoolBusyError struct {
// contains filtered or unexported fields
}
func (*ChannelPoolBusyError) Error ¶
func (this *ChannelPoolBusyError) Error() string
type ChannelPoolOptions ¶
type ChannelPoolOptions struct { TLSConfig *tls.Config // 当使用 ExternalAuth 时的 TLS 配置。为空表示不使用 ExternalAuth Capacity int // amqp.Channel 缓存池容量上限。 TTL time.Duration // amqp.Channel 在归还给缓存池之后多久被回收。默认 5 秒。 Properties map[string]interface{} // LastJobSize int // OnConnect func(host string, err error) OnDisconnect func(host string, err error) }
type ConsumeConfig ¶
type ConsumeEventArgs ¶
type ConsumeOptions ¶
type ConsumeOptions struct { // 交换机,队列绑定的交换机,如果队列不存在,重建队列+绑定交换机 Exchange string // handler 超时时间 Timeout time.Duration // With a prefetch count greater than zero, the server will deliver that many // messages to consumers before acknowledgments are received. Qos int // 当创建新的 Channel 时触发。 OnChannel func(ch *amqp.Channel) }
type GroupProducer ¶
type GroupProducer struct {
// contains filtered or unexported fields
}
func (*GroupProducer) Close ¶
func (p *GroupProducer) Close()
func (*GroupProducer) Send ¶
func (p *GroupProducer) Send(exchange, queue string, data string) error
func (*GroupProducer) SendToExchange ¶
func (p *GroupProducer) SendToExchange(exchange string, data string) error
func (*GroupProducer) SendToQueue ¶
func (p *GroupProducer) SendToQueue(queue string, data string) error
type Options ¶
type Options struct { NoAutoDeclare bool `json:"no_auto_declare,omitempty"` // 指示不自动声明队列。 QueueDeclareOptions *QueueDeclareOptions `json:"queue_declare_options,omitempty"` // 声明队列时的默认参数。 ConsumeConcurrent int `json:"consume_concurrent,omitempty"` // 消费者默认并发数 NoJobCounter bool `json:"no_job_counter,omitempty"` // 是否不使用 JobCounter。 NoProduceLog bool `json:"no_produce_log,omitempty"` // 当发送消息时是否记录日志 NoConsumeLog bool `json:"no_consume_log,omitempty"` // 当接收到消息时是否记录日志 NoWaitOnStop bool `json:"no_wait_on_stop,omitempty"` // 调用 Stop 方法时,是否等待当前正则执行的任务处理结束。 Logger logs.Logger `json:"-"` // ProducerSize int `json:"producer_size,omitempty"` // 生产者数量 }
type ProduceEventArgs ¶
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func (*Producer) SendToExchange ¶
func (*Producer) SendToQueue ¶
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
func (*Queue) Consume ¶
func (this *Queue) Consume(queue string, concurrent int, handler func(data string) (ack bool), opt ...ConsumeOptions) (stop func(wait time.Duration) (stopped bool), err error)
启动指定队列的消费者。
queue: 队列名 concurrent: 并发数(消费者协程数) handler: 消息处理函数
当从队列中接收到消息并处理结束后会触发 Consume 事件。
func (*Queue) DeclareAndBindExchangeQueue ¶
func (*Queue) DeclareQueue ¶
func (this *Queue) DeclareQueue(queue string, opt ...QueueDeclareOptions) (err error)
声明一个队列
queue: 队列名称 opt: 队列声明参数。默认使用 GetOptions().QueueDeclareOptions
func (*Queue) GetDefaultProducer ¶
func (*Queue) GetPool ¶
func (this *Queue) GetPool() *ChannelPool
func (*Queue) NewGroupProducer ¶
func (*Queue) NewProducer ¶
func (*Queue) NewSimpleConsumer ¶
func (this *Queue) NewSimpleConsumer(cfg *ConsumeConfig) *SimpleConsumer
func (*Queue) OnConsume ¶
func (this *Queue) OnConsume(f func(e *ConsumeEventArgs))
设置 Produce 事件回调函数
func (*Queue) OnProduce ¶
func (this *Queue) OnProduce(f func(e *ProduceEventArgs))
设置 Produce 事件回调函数
func (*Queue) QueueDeclareOptions ¶
func (this *Queue) QueueDeclareOptions() *QueueDeclareOptions
func (*Queue) RemoveExchange ¶
从服务端移除一个交换机
func (*Queue) UnbindQueue ¶
取消队列绑定
type QueueDeclareOptions ¶
type SimpleConsumer ¶
type SimpleConsumer struct {
// contains filtered or unexported fields
}
func (*SimpleConsumer) Handle ¶
func (c *SimpleConsumer) Handle(handle func(data string) (ack bool, err error)) error
func (*SimpleConsumer) IsStop ¶
func (c *SimpleConsumer) IsStop() bool
Click to show internal directories.
Click to hide internal directories.