gokafka

package
v2.1.36
Published: Jan 17, 2024 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var EveryPartitionLastMessage sync.Map

Functions

func CloseConsumer

func CloseConsumer() error

CloseConsumer closes the consumer.

func CommitOffsetForAllPartition

func CommitOffsetForAllPartition(onCommit func(message kafka.Message)) error

CommitOffsetForAllPartition commits the offset for all partitions.
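
This page does not say how EveryPartitionLastMessage and CommitOffsetForAllPartition relate. One plausible pattern, offered purely as an assumption, is to remember the most recent message seen for each partition in a sync.Map and then walk that map when committing. The sketch below uses only the standard library. The message type, the tracker type, and the record and commitAll methods are hypothetical stand-ins, not part of gokafka.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for kafka.Message.
type message struct {
	Partition int
	Offset    int64
}

// tracker remembers the last message seen per partition.
// It only illustrates a pattern; it is not the package's implementation.
type tracker struct {
	last sync.Map // partition (int) -> message
}

// record overwrites the stored message for the message's partition.
func (t *tracker) record(m message) {
	t.last.Store(m.Partition, m)
}

// commitAll invokes onCommit once per partition with its last recorded message.
// A real implementation would commit the offset to the broker before the callback.
func (t *tracker) commitAll(onCommit func(message)) error {
	t.last.Range(func(_, v interface{}) bool {
		onCommit(v.(message))
		return true // keep iterating over the remaining partitions
	})
	return nil
}

func main() {
	var tr tracker
	tr.record(message{Partition: 0, Offset: 1})
	tr.record(message{Partition: 0, Offset: 5})
	tr.record(message{Partition: 1, Offset: 3})

	committed := map[int]int64{}
	_ = tr.commitAll(func(m message) { committed[m.Partition] = m.Offset })
	fmt.Println(committed[0], committed[1]) // prints 5 3
}
```

sync.Map suits this access pattern because each partition key is written repeatedly by the consuming goroutine while a committer occasionally iterates over all keys.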

func Consume

func Consume() (kafka.Message, error)

Consume consumes a message.

func ConsumeByCallback

func ConsumeByCallback(consume func(kafka.Message, error) bool)

func NewConsumer

func NewConsumer()

NewConsumer constructs the consumer.

Types

type PC

type PC struct {
	// contains filtered or unexported fields
}

func (*PC) BlockingQueueSize

func (pc *PC) BlockingQueueSize() int

BlockingQueueSize returns the number of items currently backlogged in the queue.

func (*PC) Cancel

func (pc *PC) Cancel() bool

Cancel stops producing, blocking until the Kafka cache buffer has been flushed successfully.

func (*PC) Init

func (pc *PC) Init(capacity int, batchSize int, batchTimeOut time.Duration)

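Init sets the capacity, batch size, and batch timeout of the blocking queue.
capacity: the capacity of the blocking queue; a value of 4 times batchSize is recommended.
batchSize: the batch size.
batchTimeOut: the batch timeout.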

func (*PC) Produce

func (pc *PC) Produce(object interface{})

Produce puts a message into the blocking queue; it blocks when the queue is full.

func (*PC) Subscribe

func (pc *PC) Subscribe(mapTo func(interface{}) kafka.Message, consume func([]kafka.Message))

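Subscribe consumes the data in the blocking queue.
mapTo: items are written to the queue as interface{} values, so implement your own serialization in this callback to convert each item into a kafka.Message.
consume: called whenever a batch is ready to be consumed; implement your own consumption logic here. Typically this calls gokafka.Producer.ProduceMsgs to write the messages to Kafka in a blocking manner.
Here is a usage example: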

pc.Subscribe(func(x interface{}) kafka.Message {
	return x.(kafka.Message)
}, func(messages []kafka.Message) {
	err := producer.ProduceMsgs(messages)
	if err != nil {
		log.Error(err)
	} else {
		fmt.Printf("Success write `%d` to kafka\n", len(messages))
	}
})
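
The queueing behavior described for Init, Produce, Subscribe, and Cancel can be sketched with a buffered channel. This is a minimal illustration using only the standard library, not the package's actual implementation. The batcher type and its methods are hypothetical names, strings stand in for kafka.Message, and a ticker approximates the batch timeout by flushing any pending items at a fixed interval.

```go
package main

import (
	"fmt"
	"time"
)

// batcher is a hypothetical stand-in for PC; it is not the package's implementation.
type batcher struct {
	queue        chan interface{} // bounded queue: sends block while it is full
	batchSize    int
	batchTimeOut time.Duration
	done         chan struct{} // closed once the final batch has been delivered
}

func newBatcher(capacity, batchSize int, batchTimeOut time.Duration) *batcher {
	return &batcher{
		queue:        make(chan interface{}, capacity),
		batchSize:    batchSize,
		batchTimeOut: batchTimeOut,
		done:         make(chan struct{}),
	}
}

// produce enqueues one item and blocks while the queue is full.
func (b *batcher) produce(v interface{}) { b.queue <- v }

// subscribe converts items with mapTo and hands them to consume in batches.
// A batch is delivered when it reaches batchSize or when the ticker fires.
func (b *batcher) subscribe(mapTo func(interface{}) string, consume func([]string)) {
	go func() {
		defer close(b.done)
		ticker := time.NewTicker(b.batchTimeOut)
		defer ticker.Stop()
		var batch []string
		flush := func() {
			if len(batch) > 0 {
				consume(batch)
				batch = nil
			}
		}
		for {
			select {
			case v, ok := <-b.queue:
				if !ok { // queue closed by cancel: deliver the remainder and stop
					flush()
					return
				}
				batch = append(batch, mapTo(v))
				if len(batch) >= b.batchSize {
					flush()
				}
			case <-ticker.C:
				flush()
			}
		}
	}()
}

// cancel stops intake and blocks until every queued item has been delivered.
func (b *batcher) cancel() {
	close(b.queue)
	<-b.done
}

// runDemo pushes five items through a batcher with batchSize 2
// and returns the size of each delivered batch.
func runDemo() []int {
	b := newBatcher(8, 2, time.Hour) // long timeout: only size and cancel trigger flushes
	var sizes []int
	b.subscribe(
		func(x interface{}) string { return fmt.Sprint(x) },
		func(batch []string) { sizes = append(sizes, len(batch)) },
	)
	for i := 0; i < 5; i++ {
		b.produce(i)
	}
	b.cancel()
	return sizes
}

func main() {
	fmt.Println(runDemo()) // prints [2 2 1]
}
```

In this sketch the capacity of 8 follows the documented recommendation of 4 times batchSize, which leaves producers room to keep enqueuing while a batch is being delivered.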

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func Init

func Init(topic string) *Producer

func LoadProducerByTopic

func LoadProducerByTopic(topic string) *Producer

func (*Producer) CloseProducer

func (p *Producer) CloseProducer() error

CloseProducer closes the producer.

func (*Producer) Produce

func (p *Producer) Produce(msg []byte) error

Produce produces a message.

func (*Producer) ProduceMsgs

func (p *Producer) ProduceMsgs(msgs []kafka.Message) error

func (*Producer) ProduceWithKey

func (p *Producer) ProduceWithKey(key []byte, value []byte) error

ProduceWithKey produces a message with the given key and value.
