Versions in this module
Expand all Collapse all

v1
v1.1.9 Apr 24, 2023

Changes in this version
+ func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue
+ func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error)
+ type ConsumeErrorHandler func(msg kafka.Message, err error)
+ type ConsumeHandle func(key, value string) error
+ type ConsumeHandler interface
    + Consume func(key, value string) error
    + func WithHandle(handle ConsumeHandle) ConsumeHandler
+ type KqConf struct
    + Brokers []string
    + Conns int
    + Consumers int
    + ForceCommit bool
    + Group string
    + MaxBytes int
    + MinBytes int
    + Offset string
    + Password string
    + Processors int
    + Topic string
    + Username string
+ type PushOption func(options *chunkOptions)
    + func WithChunkSize(chunkSize int) PushOption
    + func WithFlushInterval(interval time.Duration) PushOption
+ type Pusher struct
    + func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher
    + func (p *Pusher) Close() error
    + func (p *Pusher) Name() string
    + func (p *Pusher) Push(v string) error
+ type QueueOption func(*queueOptions)
    + func WithCommitInterval(interval time.Duration) QueueOption
    + func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption
    + func WithMaxWait(wait time.Duration) QueueOption
    + func WithMetrics(metrics *stat.Metrics) QueueOption
    + func WithQueueCapacity(queueCapacity int) QueueOption