Versions in this module Expand all Collapse all v1 v1.1.0 Apr 6, 2022 Changes in this version + var ErrConsumerClosed = errors.New("consumer closed") + var ErrIllegalMqType = errors.New("illegal mq type") + var ErrKafkaConfigIllegal = errors.New("kafka config illegal") + var ErrKafkaLoggerNil = errors.New("kafka logger nil") v1.0.0 Mar 23, 2022 Changes in this version + const KAFKA_MQ_TYP + var ConsumerClosedErr = errors.New("consumer closed") + var IllegalMqTypeErr = errors.New("illegal mq type") + var KafkaConfigIllegalErr = errors.New("kafka config illegal") + var KafkaLoggerNilErr = errors.New("kafka logger nil") + func GetClusterConfig(config *KafkaConfig) *cluster.Config + func GetSaramConfig(config *KafkaConfig) *sarama.Config + func WriteAsyncMsg(ctx context.Context, key string, value []byte, properties map[string]string) error + type Client interface + NewConsumer func(ctx context.Context, consumerName string) (Consumer, error) + NewProducer func(ctx context.Context) (Producer, error) + func NewClient(ctx context.Context, m MqConfig) (Client, error) + type Consumer interface + Close func() error + FetchMsg func(ctx context.Context, value interface{}) (context.Context, Message, error) + FetchPayloadMsg func(ctx context.Context) (context.Context, []byte, Message, error) + ReadMsg func(ctx context.Context, value interface{}) (context.Context, error) + ReadPayloadMsg func(ctx context.Context) (context.Context, []byte, error) + type KafkaClient struct + func NewKafkaClient(ctx context.Context, config *KafkaConfig) (KafkaClient, error) + func (client *KafkaClient) NewConsumer(ctx context.Context, consumerName string) (Consumer, error) + func (client *KafkaClient) NewProducer(ctx context.Context) (Producer, error) + type KafkaConfig struct + BrokerAddr []string + L MqLog + Password string + Topic string + TopicType string + User string + func NewKafkaConfig() KafkaConfig + func (conf *KafkaConfig) GetBrokerAddr() []string + func (conf *KafkaConfig) GetLog() MqLog + func (conf *KafkaConfig) GetMqTopic() string + func (conf *KafkaConfig) GetMqType() string + func (conf *KafkaConfig) GetPassword() string + func (conf *KafkaConfig) GetUser() string + func (conf *KafkaConfig) SetBrokerAddr(addrs []string) + func (conf *KafkaConfig) SetLog(logger MqLog) + func (conf *KafkaConfig) SetMqTopic(topic string) + func (conf *KafkaConfig) SetMqType(mqTyp string) + func (conf *KafkaConfig) SetPassword(password string) + func (conf *KafkaConfig) SetUser(user string) + type KafkaConsumer struct + func (c *KafkaConsumer) Close() error + func (c *KafkaConsumer) FetchMsg(ctx context.Context, value interface{}) (context.Context, Message, error) + func (c *KafkaConsumer) FetchPayloadMsg(ctx context.Context) (context.Context, []byte, Message, error) + func (c *KafkaConsumer) InitConsumerReturn(ctx context.Context) + func (c *KafkaConsumer) ReadMsg(ctx context.Context, value interface{}) (context.Context, error) + func (c *KafkaConsumer) ReadPayloadMsg(ctx context.Context) (context.Context, []byte, error) + type KafkaMssage struct + func (m *KafkaMssage) Commit(metadata string) + type KafkaProducer struct + func (p *KafkaProducer) Close() error + func (p *KafkaProducer) InitProducerReturn(ctx context.Context) + func (p *KafkaProducer) WriteMsg(ctx context.Context, key string, value []byte, properties map[string]string) (partition int32, msgId string, err error) + type Message interface + Commit func(metadata string) + type MqConfig interface + GetBrokerAddr func() []string + GetMqTopic func() string + GetMqType func() string + GetPassword func() string + GetUser func() string + SetBrokerAddr func(addrs []string) + SetMqTopic func(topic string) + SetMqType func(mqTyp string) + SetPassword func(password string) + SetUser func(user string) + type MqLog interface + Debugf func(f string, args ...interface{}) + Errorf func(f string, args ...interface{}) + Infof func(f string, args ...interface{}) + Warnf func(f string, args ...interface{}) + type Producer interface + Close func() error + WriteMsg func(ctx context.Context, key string, value []byte, properties map[string]string) (partition int32, msgId string, err error)