Documentation ¶
Index ¶
- func MustNewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue
- func NewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)
- type ConsumeHandle
- type ConsumeHandler
- type MessageHandler
- type NqConf
- type PushOption
- type Pusher
- type QueueOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MustNewQueue ¶
func MustNewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue
func NewQueue ¶
func NewQueue(c NqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)
Types ¶
type ConsumeHandle ¶
type ConsumeHandler ¶
func WithHandle ¶
func WithHandle(handle ConsumeHandle) ConsumeHandler
type MessageHandler ¶
type MessageHandler struct { }
func (*MessageHandler) Consume ¶
func (mh *MessageHandler) Consume(key, val string) error
type NqConf ¶
type NqConf struct {
	// service.ServiceConf
	Brokers    []string //连接的nsqlookupd
	Group      string   //可默认为分区ID,标准版nsq无效,有赞版有用(类型为int,需要转换)
	Topic      string
	Offset     string `json:",options=first|last,default=last"` //标准版nsq无效,有赞版有用(从哪个位置读取)
	Conns      int    `json:",default=1"`                       //建立多少连接
	Consumers  int    `json:",default=1"`                       //1个连接,多个协程读取,nsq固定为1.
	Processors int    `json:",default=8"`                       //1个连接,多少协程去消费
	Channel    string `json:",default=default"`                 //消费的channel,默认为default
}
type PushOption ¶
type PushOption func(options *chunkOptions)
func WithChunkSize ¶
func WithChunkSize(chunkSize int) PushOption
func WithFlushInterval ¶
func WithFlushInterval(interval time.Duration) PushOption
type QueueOption ¶
type QueueOption func(*queueOptions)
func WithMetrics ¶
func WithMetrics(metrics *stat.Metrics) QueueOption
Click to show internal directories.
Click to hide internal directories.