kafka

package
v3.3.3-rc.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetContextWithMQHeader

func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context

func GetMQHeaderWithContext

func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)

func SetupTLSConfig

func SetupTLSConfig(cfg *sarama.Config)

SetupTLSConfig set up the TLS config from config file.

Types

type Consumer

type Consumer struct {
	WG            sync.WaitGroup
	Topic         string
	PartitionList []int32
	Consumer      sarama.Consumer
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(addr []string, topic string) *Consumer

type MConsumerGroup

type MConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func NewMConsumerGroup

func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup

func (*MConsumerGroup) GetContextFromMsg

func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context

func (*MConsumerGroup) RegisterHandleAndConsumer

func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler)

type MConsumerGroupConfig

type MConsumerGroupConfig struct {
	KafkaVersion   sarama.KafkaVersion
	OffsetsInitial int64
	IsReturnErr    bool
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(addr []string, topic string) *Producer

NewKafkaProducer Initialize kafka producer.

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL