Versions in this module Expand all Collapse all v1 v1.1.82 Jul 19, 2024 Changes in this version + const CA_FILE + const CA_TEXT + const SASL_PLAIN + const SASL_SCRAM + func Base642PEM(base64Key string) []byte + func Base64ToPEM(base64String string, certType string) []byte + func VerifyCertPemStr(certStr string) bool + type KqPusherConf struct + Brokers []string + Compression int8 + Topic string + type KqSaslCaConf struct + CA_WAY int8 + CaFile string + CaPEM string + CertFile string + CertPEM string + ForceCommit bool + KeyFile string + KeyPEM string + Password string + SASL_WAY int8 + Username string type Pusher + func NewConfPusher(c KqPusherConf, opts ...PushOption) *Pusher v1.1.81 Jul 18, 2024 Changes in this version + func MustNewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) queue.MessageQueue + func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.MessageQueue, error) + type ConsumeErrorHandler func(msg kafka.Message, err error) + type ConsumeHandle func(key, value string) error + type ConsumeHandler interface + Consume func(key, value string) error + func WithHandle(handle ConsumeHandle) ConsumeHandler + type KqConf struct + Brokers []string + CaFile string + Conns int + Consumers int + ForceCommit bool + Group string + MaxBytes int + MinBytes int + Offset string + Password string + Processors int + Topic string + Username string + type PushOption func(options *pushOptions) + func WithAllowAutoTopicCreation() PushOption + func WithChunkSize(chunkSize int) PushOption + func WithFlushInterval(interval time.Duration) PushOption + type Pusher struct + func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher + func (p *Pusher) Close() error + func (p *Pusher) Name() string + func (p *Pusher) Push(v string) error + type QueueOption func(*queueOptions) + func WithCommitInterval(interval time.Duration) QueueOption + func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption + func WithMaxWait(wait time.Duration) QueueOption + func WithMetrics(metrics *stat.Metrics) QueueOption + func WithQueueCapacity(queueCapacity int) QueueOption