Versions in this module Expand all Collapse all v1 v1.12.1 Dec 19, 2024 v1.12.0 Dec 19, 2024 Changes in this version + type AsyncProducer struct + Producer sarama.AsyncProducer + func InitAsyncProducer(addrs []string, opts ...AsyncProducerOption) (*AsyncProducer, error) + func (p *AsyncProducer) Close() error + func (p *AsyncProducer) SendData(topic string, multiData ...interface{}) error + func (p *AsyncProducer) SendMessage(messages ...*sarama.ProducerMessage) error + type AsyncProducerOption func(*asyncProducerOptions) + func AsyncProducerWithClientID(clientID string) AsyncProducerOption + func AsyncProducerWithConfig(config *sarama.Config) AsyncProducerOption + func AsyncProducerWithFlushBytes(flushBytes int) AsyncProducerOption + func AsyncProducerWithFlushFrequency(flushFrequency time.Duration) AsyncProducerOption + func AsyncProducerWithFlushMessages(flushMessages int) AsyncProducerOption + func AsyncProducerWithHandleFailed(handleFailedFn AsyncSendFailedHandlerFn) AsyncProducerOption + func AsyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) AsyncProducerOption + func AsyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) AsyncProducerOption + func AsyncProducerWithReturnSuccesses(returnSuccesses bool) AsyncProducerOption + func AsyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) AsyncProducerOption + func AsyncProducerWithVersion(version sarama.KafkaVersion) AsyncProducerOption + func AsyncProducerWithZapLogger(zapLogger *zap.Logger) AsyncProducerOption + type AsyncSendFailedHandlerFn func(msg *sarama.ProducerMessage) error + type Backlog struct + Backlog int64 + NextConsumeOffset int64 + Partition int32 + type ClientManager struct + func InitClientManager(addrs []string, groupID string) (*ClientManager, error) + func (m *ClientManager) Close() error + func (m *ClientManager) GetBacklog(topic string) (int64, []*Backlog, error) + type Consumer struct + C sarama.Consumer + func InitConsumer(addrs []string, opts ...ConsumerOption) (*Consumer, error) + func (c *Consumer) Close() error + func (c *Consumer) ConsumeAllPartition(ctx context.Context, topic string, offset int64, handleFn HandleMessageFn) + func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, ...) + type ConsumerGroup struct + Group sarama.ConsumerGroup + func InitConsumerGroup(addrs []string, groupID string, opts ...ConsumerOption) (*ConsumerGroup, error) + func (c *ConsumerGroup) Close() error + func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handleMessageFn HandleMessageFn) error + func (c *ConsumerGroup) ConsumeCustom(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error + type ConsumerOption func(*consumerOptions) + func ConsumerWithClientID(clientID string) ConsumerOption + func ConsumerWithConfig(config *sarama.Config) ConsumerOption + func ConsumerWithGroupStrategies(groupStrategies ...sarama.BalanceStrategy) ConsumerOption + func ConsumerWithOffsetsAutoCommitEnable(offsetsAutoCommitEnable bool) ConsumerOption + func ConsumerWithOffsetsAutoCommitInterval(offsetsAutoCommitInterval time.Duration) ConsumerOption + func ConsumerWithOffsetsInitial(offsetsInitial int64) ConsumerOption + func ConsumerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) ConsumerOption + func ConsumerWithVersion(version sarama.KafkaVersion) ConsumerOption + func ConsumerWithZapLogger(zapLogger *zap.Logger) ConsumerOption + type HandleMessageFn func(msg *sarama.ConsumerMessage) error + type Message struct + Data []byte + Key []byte + Topic string + type ProducerMessage = sarama.ProducerMessage + type SyncProducer struct + Producer sarama.SyncProducer + func InitSyncProducer(addrs []string, opts ...SyncProducerOption) (*SyncProducer, error) + func (p *SyncProducer) Close() error + func (p *SyncProducer) SendData(topic string, data interface{}) (int32, int64, error) + func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error) + type SyncProducerOption func(*syncProducerOptions) + func SyncProducerWithClientID(clientID string) SyncProducerOption + func SyncProducerWithConfig(config *sarama.Config) SyncProducerOption + func SyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) SyncProducerOption + func SyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) SyncProducerOption + func SyncProducerWithReturnSuccesses(returnSuccesses bool) SyncProducerOption + func SyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) SyncProducerOption + func SyncProducerWithVersion(version sarama.KafkaVersion) SyncProducerOption