Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var EveryPartitionLastMessage sync.Map
Functions ¶
func CommitOffsetForAllPartition ¶
func CommitOffsetForAllPartition(onCommit func(message kafka.Message)) error
CommitOffsetForAllPartition commits the offset for every partition, invoking onCommit for each committed message.
func ConsumeByCallback ¶
Types ¶
type PC ¶
type PC struct {
// contains filtered or unexported fields
}
func (*PC) Cancel ¶
Cancel cancels producing, blocking until the kafka cache buffer has been successfully flushed.
func (*PC) Init ¶
Init sets the capacity of the blocking queue, the batch size, and the batch timeout. capacity is the capacity of the blocking queue; setting it to four times batchSize is recommended. batchSize is the batch size. batchTimeOut is the batch timeout.
func (*PC) Subscribe ¶
func (pc *PC) Subscribe(mapTo func(interface{}) kafka.Message, consume func([]kafka.Message))
Subscribe consumes the data in the blocking queue. mapTo: data is written to the queue as an interface{} object, so in this callback you must implement your own serialization, converting it into a kafka.Message object. consume: called back whenever a batch is ready for consumption; implement your own consumption logic here, typically calling gokafka.Producer.ProduceMsgs to write the messages to kafka, blocking until they are written. An example:

	pc.Subscribe(func(x interface{}) kafka.Message {
		return x.(kafka.Message)
	}, func(messages []kafka.Message) {
		err := producer.ProduceMsgs(messages)
		if err != nil {
			log.Error(err)
		} else {
			fmt.Printf("Success write `%d` to kafka\n", len(messages))
		}
	})
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func LoadProducerByTopic ¶
func (*Producer) ProduceMsgs ¶