Versions in this module

v0

v0.0.1 - Feb 6, 2023

Changes in this version:

+ type BatchConsumer struct
    + func NewBatchConsumer(cfg BatchConsumerConf, handler ConsumeHandler, consumer IConsumer, ...) *BatchConsumer
    + func (bc *BatchConsumer) GracefulStop(ctx context.Context)
    + func (bc *BatchConsumer) Start()
    + func (bc *BatchConsumer) Stop()
+ type BatchConsumerConf struct
    + CacheCapacity int
    + Consumers int
    + Processors int
+ type BatchConsumerOption func(*batchConsumerOptions)
    + func WithLogger(logger log15.Logger) BatchConsumerOption
+ type ConsumeHandle func(key string, data []byte) error
+ type ConsumeHandler interface
    + Consume func(key string, data []byte) error
    + func WithHandle(handle ConsumeHandle) ConsumeHandler
+ type Consumer struct
    + func NewConsumer(cfg ConsumerConfig, logger log15.Logger) *Consumer
    + func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error
    + func (c *Consumer) Close() error
    + func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
    + func (c *Consumer) FetchMessage(ctx context.Context) (message *sarama.ConsumerMessage, err error)
    + func (c *Consumer) Setup(sarama.ConsumerGroupSession) error
+ type ConsumerConfig struct
    + Brokers []string
    + CacheCapacity int
    + ConnectTimeout time.Duration
    + Group string
    + Topic string
    + Version string
+ type IConsumer interface
    + Close func() error
    + FetchMessage func(ctx context.Context) (message *sarama.ConsumerMessage, err error)
+ type Producer struct
    + func NewProducer(cfg ProducerConfig) *Producer
    + func (p *Producer) Close() error
    + func (p *Producer) Publish(topic, k string, v []byte) (int32, int64, error)
+ type ProducerConfig struct
    + Brokers []string
    + Version string