xkafka

package
v0.0.7 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 4, 2023 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultConsumer

type DefaultConsumer struct{}

func (*DefaultConsumer) Cleanup

func (consumer *DefaultConsumer) Cleanup(sarama.ConsumerGroupSession) error

func (*DefaultConsumer) ConsumeClaim

func (consumer *DefaultConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

func (*DefaultConsumer) Setup

type Kafka

type Kafka struct {
	Client        sarama.Client
	Admin         sarama.ClusterAdmin
	SyncProducer  sarama.SyncProducer
	AsyncProducer sarama.AsyncProducer
	OffsetManager sarama.OffsetManager
	GroupMap      sync.Map
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, confStr string) (*Kafka, error)

func (*Kafka) Close

func (k *Kafka) Close()

Close closes the connection.

func (*Kafka) ConsumerGroup

func (k *Kafka) ConsumerGroup(groupID string, topics string, handler sarama.ConsumerGroupHandler) error

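ConsumerGroup consumes as a consumer group. In a distributed environment, pay attention to the relationship between the number of partitions and the number of service instances.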
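The note about partitions and service instances follows from how Kafka consumer groups work: within one group, each partition is read by at most one member, so members beyond the partition count receive nothing. The sketch below models this with a simple round-robin split. The helper names assignPartitions and idleMembers are hypothetical and are not part of xkafka or sarama, and the real assignment depends on the configured rebalance strategy.

```go
package main

import "fmt"

// assignPartitions spreads partition IDs 0..numPartitions-1 across
// numConsumers group members in round-robin order.
// Hypothetical helper: a simplified model, not sarama's rebalance logic.
func assignPartitions(numPartitions, numConsumers int) [][]int32 {
	if numConsumers <= 0 {
		return nil
	}
	assignment := make([][]int32, numConsumers)
	for p := 0; p < numPartitions; p++ {
		member := p % numConsumers
		assignment[member] = append(assignment[member], int32(p))
	}
	return assignment
}

// idleMembers counts the members that were assigned no partition.
func idleMembers(assignment [][]int32) int {
	idle := 0
	for _, parts := range assignment {
		if len(parts) == 0 {
			idle++
		}
	}
	return idle
}

func main() {
	// 3 partitions shared by 5 service instances in the same group:
	// two instances end up with nothing to consume.
	a := assignPartitions(3, 5)
	fmt.Println("assignment:", a, "idle members:", idleMembers(a))

	// 6 partitions shared by 3 instances: every instance gets two partitions.
	b := assignPartitions(6, 3)
	fmt.Println("assignment:", b, "idle members:", idleMembers(b))
}
```

In practice this suggests keeping the number of service instances in a group at or below the topic's partition count, or adding partitions before scaling out.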

func (*Kafka) Consumes

func (k *Kafka) Consumes(topic string, handle func(*sarama.ConsumerMessage)) error

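Consumes is a consumer that starts one goroutine per partition of the topic to handle messages. When used in a distributed environment, be aware that messages may be consumed more than once.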
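One way to tolerate repeated delivery is to make the handler idempotent. The following is a minimal sketch under stated assumptions: the message type is a hypothetical stand-in for the Topic, Partition and Offset fields of sarama.ConsumerMessage (so the example needs no external module), and dedupe is an illustrative helper, not part of xkafka. Because the handler may be called from one goroutine per partition, the seen-set is guarded by a mutex.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// message is a hypothetical stand-in for sarama.ConsumerMessage.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
	Value     []byte
}

// msgKey identifies a message by its position in the log.
type msgKey struct {
	topic     string
	partition int32
	offset    int64
}

// dedupe wraps handle so that each (topic, partition, offset) is processed
// at most once by this process. The set is in memory and grows without
// bound; it does not deduplicate across service instances.
func dedupe(handle func(*message)) func(*message) {
	var mu sync.Mutex
	seen := make(map[msgKey]struct{})
	return func(m *message) {
		k := msgKey{m.Topic, m.Partition, m.Offset}
		mu.Lock()
		_, dup := seen[k]
		if !dup {
			seen[k] = struct{}{}
		}
		mu.Unlock()
		if dup {
			return // already handled, skip the repeat
		}
		handle(m)
	}
}

func main() {
	var processed int64
	handler := dedupe(func(m *message) {
		atomic.AddInt64(&processed, 1)
	})

	// Simulate one goroutine per partition, with offset 1 delivered twice.
	var wg sync.WaitGroup
	for p := int32(0); p < 3; p++ {
		wg.Add(1)
		go func(partition int32) {
			defer wg.Done()
			for _, off := range []int64{0, 1, 1, 2} {
				handler(&message{Topic: "orders", Partition: partition, Offset: off})
			}
		}(p)
	}
	wg.Wait()
	fmt.Println("processed:", atomic.LoadInt64(&processed))
}
```

Across several service instances an in-process set is not enough; the same idea would need a shared store, for example a unique key in a database, which is outside the scope of this sketch.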

func (*Kafka) SendSyncMessage

func (k *Kafka) SendSyncMessage(message *sarama.ProducerMessage) error

SendSyncMessage sends a message synchronously.
