kafka

package
v1.10.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2024 License: MIT Imports: 9 Imported by: 0

README

kafka

kafka is a Kafka client library that wraps sarama. The producer supports both synchronous and asynchronous message production, the consumer supports both group and partition consumption, and the package remains fully compatible with direct sarama usage.


Example of use

Producer

Synchronous Produce
package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"github.com/zhufuyi/pkg/kafka"
)

func main() {
	testTopic := "my-topic"
	addrs := []string{"localhost:9092"}
	// default config are requiredAcks=WaitForAll, partitionerConstructor=NewHashPartitioner, returnSuccesses=true
	p, err := kafka.InitSyncProducer(addrs, kafka.SyncProducerWithVersion(sarama.V3_6_0_0))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer p.Close()

	// Case 1: send sarama.ProducerMessage type message
	msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/producer_test.go#L18
	partition, offset, err := p.SendMessage(msg)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("partition:", partition, "offset:", offset)

	// Case 2: send multiple types  message
	for _, data := range testData {
		partition, offset, err := p.SendData(testTopic, data)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println("partition:", partition, "offset:", offset)
	}
}

Asynchronous Produce

package main

import (
	"fmt"
	"time"
	"github.com/IBM/sarama"
	"github.com/zhufuyi/pkg/kafka"
)

func main() {
	testTopic := "my-topic"
	addrs := []string{"localhost:9092"}

	p, err := kafka.InitAsyncProducer(addrs,
		kafka.AsyncProducerWithVersion(sarama.V3_6_0_0),
		kafka.AsyncProducerWithRequiredAcks(sarama.WaitForLocal),
		kafka.AsyncProducerWithFlushMessages(50),
		kafka.AsyncProducerWithFlushFrequency(time.milliseconds*500),
	)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer p.Close()

	// Case 1: send sarama.ProducerMessage type message, supports multiple messages
	msg := testData[0].(*sarama.ProducerMessage) // testData is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/producer_test.go#L18
	err = p.SendMessage(msg, msg)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Case 2: send multiple types  message, supports multiple messages
	err = p.SendData(testTopic, testData...)
	if err != nil {
		fmt.Println(err)
		return
	}

	<-time.After(time.Second) // wait for all messages to be sent
}

Consumer

Consume Group
package main

import (
	"fmt"
	"time"
	"github.com/IBM/sarama"
	"github.com/zhufuyi/pkg/kafka"
)

func main() {
	testTopic := "my-topic"
	groupID := "my-group"
	addrs := []string{"localhost:9092"}

	// default config are offsetsInitial=OffsetOldest, autoCommitEnable=true, autoCommitInterval=time.Second
	cg, err := kafka.InitConsumerGroup(addrs, groupID, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cg.Close()

	// Case 1: consume default handle message
	go cg.Consume(context.Background(), []string{testTopic}, handleMsgFn) // handleMsgFn is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/consumer_test.go#L19

	// Case 2: consume custom handle message
	go cg.ConsumeCustom(context.Background(), []string{testTopic}, &myConsumerGroupHandler{ // myConsumerGroupHandler is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/consumer_test.go#L26
		autoCommitEnable: cg.autoCommitEnable,
	})

	<-time.After(time.Minute) // wait exit
}

Consume Partition
package main

import (
	"fmt"
	"github.com/IBM/sarama"
	"github.com/zhufuyi/pkg/kafka"
	"time"
)

func main() {
	testTopic := "my-topic"
	addrs := []string{"localhost:9092"}

	c, err := kafka.InitConsumer(addrs, kafka.ConsumerWithVersion(sarama.V3_6_0_0))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer c.Close()

	// Case 1: consume one partition
	go c.ConsumePartition(context.Background(), testTopic, 0, sarama.OffsetNewest, handleMsgFn) // // handleMsgFn is https://github.com/zhufuyi/sponge/blob/main/pkg/kafka/consumer_test.go#L19

	// Case 2: consume all partition
	c.ConsumeAllPartition(context.Background(), testTopic, sarama.OffsetNewest, handleMsgFn)

	<-time.After(time.Minute) // wait exit
}

Topic Backlog

Obtain the total backlog of the topic and the backlog of each partition.

package main

import (    
	"fmt"
	"github.com/zhufuyi/pkg/kafka"    
)

func main() {
	m, err := kafka.InitClientManager(brokerList, groupID)
	if err != nil {
		panic(err)
	}
	defer m.Close()

	total, backlogs, err := m.GetBacklog(topic)
	if err != nil {
		panic(err)
	}

	fmt.Println("total backlog:", total)
	for _, backlog := range backlogs {
		fmt.Printf("partation=%d, backlog=%d, next_consume_offset=%d\n", backlog.Partition, backlog.Backlog, backlog.NextConsumeOffset)
	}
}

Documentation

Overview

Package kafka is a kafka client package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncProducer

type AsyncProducer struct {
	Producer sarama.AsyncProducer
	// contains filtered or unexported fields
}

AsyncProducer is async producer.

func InitAsyncProducer

func InitAsyncProducer(addrs []string, opts ...AsyncProducerOption) (*AsyncProducer, error)

InitAsyncProducer init async producer.

func (*AsyncProducer) Close

func (p *AsyncProducer) Close() error

Close closes the producer.

func (*AsyncProducer) SendData

func (p *AsyncProducer) SendData(topic string, multiData ...interface{}) error

SendData sends messages to a topic with multiple types of data.

func (*AsyncProducer) SendMessage

func (p *AsyncProducer) SendMessage(messages ...*sarama.ProducerMessage) error

SendMessage sends messages to a topic.

type AsyncProducerOption

type AsyncProducerOption func(*asyncProducerOptions)

AsyncProducerOption set options.

func AsyncProducerWithClientID

func AsyncProducerWithClientID(clientID string) AsyncProducerOption

AsyncProducerWithClientID set clientID.

func AsyncProducerWithConfig

func AsyncProducerWithConfig(config *sarama.Config) AsyncProducerOption

AsyncProducerWithConfig set custom config.

func AsyncProducerWithFlushBytes

func AsyncProducerWithFlushBytes(flushBytes int) AsyncProducerOption

AsyncProducerWithFlushBytes set flushBytes.

func AsyncProducerWithFlushFrequency

func AsyncProducerWithFlushFrequency(flushFrequency time.Duration) AsyncProducerOption

AsyncProducerWithFlushFrequency set flushFrequency.

func AsyncProducerWithFlushMessages

func AsyncProducerWithFlushMessages(flushMessages int) AsyncProducerOption

AsyncProducerWithFlushMessages set flushMessages.

func AsyncProducerWithHandleFailed

func AsyncProducerWithHandleFailed(handleFailedFn AsyncSendFailedHandlerFn) AsyncProducerOption

AsyncProducerWithHandleFailed set handleFailedFn.

func AsyncProducerWithPartitioner

func AsyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) AsyncProducerOption

AsyncProducerWithPartitioner set partitioner.

func AsyncProducerWithRequiredAcks

func AsyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) AsyncProducerOption

AsyncProducerWithRequiredAcks set requiredAcks.

func AsyncProducerWithReturnSuccesses

func AsyncProducerWithReturnSuccesses(returnSuccesses bool) AsyncProducerOption

AsyncProducerWithReturnSuccesses set returnSuccesses.

func AsyncProducerWithTLS

func AsyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) AsyncProducerOption

AsyncProducerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.

func AsyncProducerWithVersion

func AsyncProducerWithVersion(version sarama.KafkaVersion) AsyncProducerOption

AsyncProducerWithVersion set kafka version.

func AsyncProducerWithZapLogger

func AsyncProducerWithZapLogger(zapLogger *zap.Logger) AsyncProducerOption

AsyncProducerWithZapLogger set zapLogger.

type AsyncSendFailedHandlerFn

type AsyncSendFailedHandlerFn func(msg *sarama.ProducerMessage) error

AsyncSendFailedHandlerFn is a function that handles failed messages.

type Backlog

type Backlog struct {
	Partition         int32 `json:"partition"`  // partition id
	Backlog           int64 `json:"backlog"`    // data backlog
	NextConsumeOffset int64 `json:"nextOffset"` // offset for next consumption
}

Backlog info

type ClientManager

type ClientManager struct {
	// contains filtered or unexported fields
}

ClientManager client manager

func InitClientManager

func InitClientManager(addrs []string, groupID string) (*ClientManager, error)

InitClientManager init client manager

func (*ClientManager) Close

func (m *ClientManager) Close() error

Close closes the client manager

func (*ClientManager) GetBacklog

func (m *ClientManager) GetBacklog(topic string) (int64, []*Backlog, error)

GetBacklog get topic backlog

type Consumer

type Consumer struct {
	C sarama.Consumer
	// contains filtered or unexported fields
}

Consumer consumes partitions

func InitConsumer

func InitConsumer(addrs []string, opts ...ConsumerOption) (*Consumer, error)

InitConsumer init consumer

func (*Consumer) Close

func (c *Consumer) Close() error

Close the consumer

func (*Consumer) ConsumeAllPartition

func (c *Consumer) ConsumeAllPartition(ctx context.Context, topic string, offset int64, handleFn HandleMessageFn)

ConsumeAllPartition consumes all partitions, non-blocking

func (*Consumer) ConsumePartition

func (c *Consumer) ConsumePartition(ctx context.Context, topic string, partition int32, offset int64, handleFn HandleMessageFn)

ConsumePartition consumes one partition, blocking

type ConsumerGroup

type ConsumerGroup struct {
	Group sarama.ConsumerGroup
	// contains filtered or unexported fields
}

ConsumerGroup consumes messages as a consumer group

func InitConsumerGroup

func InitConsumerGroup(addrs []string, groupID string, opts ...ConsumerOption) (*ConsumerGroup, error)

InitConsumerGroup init consumer group

func (*ConsumerGroup) Close

func (c *ConsumerGroup) Close() error

func (*ConsumerGroup) Consume

func (c *ConsumerGroup) Consume(ctx context.Context, topics []string, handleMessageFn HandleMessageFn) error

Consume consume messages

func (*ConsumerGroup) ConsumeCustom

func (c *ConsumerGroup) ConsumeCustom(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error

ConsumeCustom consume messages for custom handler, you need to implement the sarama.ConsumerGroupHandler interface

type ConsumerOption

type ConsumerOption func(*consumerOptions)

ConsumerOption set options.

func ConsumerWithClientID

func ConsumerWithClientID(clientID string) ConsumerOption

ConsumerWithClientID set clientID.

func ConsumerWithConfig

func ConsumerWithConfig(config *sarama.Config) ConsumerOption

ConsumerWithConfig set custom config.

func ConsumerWithGroupStrategies

func ConsumerWithGroupStrategies(groupStrategies ...sarama.BalanceStrategy) ConsumerOption

ConsumerWithGroupStrategies set groupStrategies.

func ConsumerWithOffsetsAutoCommitEnable

func ConsumerWithOffsetsAutoCommitEnable(offsetsAutoCommitEnable bool) ConsumerOption

ConsumerWithOffsetsAutoCommitEnable set offsetsAutoCommitEnable.

func ConsumerWithOffsetsAutoCommitInterval

func ConsumerWithOffsetsAutoCommitInterval(offsetsAutoCommitInterval time.Duration) ConsumerOption

ConsumerWithOffsetsAutoCommitInterval set offsetsAutoCommitInterval.

func ConsumerWithOffsetsInitial

func ConsumerWithOffsetsInitial(offsetsInitial int64) ConsumerOption

ConsumerWithOffsetsInitial set offsetsInitial.

func ConsumerWithTLS

func ConsumerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) ConsumerOption

ConsumerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.

func ConsumerWithVersion

func ConsumerWithVersion(version sarama.KafkaVersion) ConsumerOption

ConsumerWithVersion set kafka version.

func ConsumerWithZapLogger

func ConsumerWithZapLogger(zapLogger *zap.Logger) ConsumerOption

ConsumerWithZapLogger set zapLogger.

type HandleMessageFn

type HandleMessageFn func(msg *sarama.ConsumerMessage) error

HandleMessageFn is a function that handles a message from a partition consumer

type Message

type Message struct {
	Topic string `json:"topic"`
	Data  []byte `json:"data"`
	Key   []byte `json:"key"`
}

Message is a message to be sent to a topic.

type ProducerMessage

type ProducerMessage = sarama.ProducerMessage

ProducerMessage is sarama ProducerMessage

type SyncProducer

type SyncProducer struct {
	Producer sarama.SyncProducer
}

SyncProducer is a sync producer.

func InitSyncProducer

func InitSyncProducer(addrs []string, opts ...SyncProducerOption) (*SyncProducer, error)

InitSyncProducer init sync producer.

func (*SyncProducer) Close

func (p *SyncProducer) Close() error

Close closes the producer.

func (*SyncProducer) SendData

func (p *SyncProducer) SendData(topic string, data interface{}) (int32, int64, error)

SendData sends a single message to a topic; the data argument may be any of the supported data types.

func (*SyncProducer) SendMessage

func (p *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (int32, int64, error)

SendMessage sends a message to a topic.

type SyncProducerOption

type SyncProducerOption func(*syncProducerOptions)

SyncProducerOption set options.

func SyncProducerWithClientID

func SyncProducerWithClientID(clientID string) SyncProducerOption

SyncProducerWithClientID set clientID.

func SyncProducerWithConfig

func SyncProducerWithConfig(config *sarama.Config) SyncProducerOption

SyncProducerWithConfig set custom config.

func SyncProducerWithPartitioner

func SyncProducerWithPartitioner(partitioner sarama.PartitionerConstructor) SyncProducerOption

SyncProducerWithPartitioner set partitioner.

func SyncProducerWithRequiredAcks

func SyncProducerWithRequiredAcks(requiredAcks sarama.RequiredAcks) SyncProducerOption

SyncProducerWithRequiredAcks set requiredAcks.

func SyncProducerWithReturnSuccesses

func SyncProducerWithReturnSuccesses(returnSuccesses bool) SyncProducerOption

SyncProducerWithReturnSuccesses set returnSuccesses.

func SyncProducerWithTLS

func SyncProducerWithTLS(certFile, keyFile, caFile string, isSkipVerify bool) SyncProducerOption

SyncProducerWithTLS set tlsConfig, if isSkipVerify is true, crypto/tls accepts any certificate presented by the server and any host name in that certificate.

func SyncProducerWithVersion

func SyncProducerWithVersion(version sarama.KafkaVersion) SyncProducerOption

SyncProducerWithVersion set kafka version.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL