Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultConsumer ¶
type DefaultConsumer struct{}
func (*DefaultConsumer) Cleanup ¶
func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error
func (*DefaultConsumer) ConsumeClaim ¶
func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*DefaultConsumer) Setup ¶
func (consumer *DefaultConsumer) Setup(sarama.ConsumerGroupSession) error
type Kafka ¶
type Kafka struct {
	Client        sarama.Client
	Admin         sarama.ClusterAdmin
	SyncProducer  sarama.SyncProducer
	AsyncProducer sarama.AsyncProducer
	OffsetManager sarama.OffsetManager
	GroupMap      sync.Map
	// contains filtered or unexported fields
}
func (*Kafka) ConsumerGroup ¶
func (k *Kafka) ConsumerGroup(groupID string, topics string, handler sarama.ConsumerGroupHandler) error
ConsumerGroup 消费者组。分布式环境下,需注意分区数量与服务数量的关系。
func (*Kafka) Consumes ¶
func (k *Kafka) Consumes(topic string, handle func(*sarama.ConsumerMessage)) error
Consumes 消费者:为 topic 的每个 partition 启动一个协程处理。用于分布式环境时,需注意消息重复消费。
func (*Kafka) SendSyncMessage ¶
func (k *Kafka) SendSyncMessage(message *sarama.ProducerMessage) error
SendSyncMessage 发送同步消息
Click to show internal directories.
Click to hide internal directories.