kafka

package
v3.5.1-bate.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetContextWithMQHeader

func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context

GetContextWithMQHeader creates a context from message queue headers.

func GetMQHeaderWithContext

func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)

GetMQHeaderWithContext extracts message queue headers from the context.

func SetupTLSConfig

func SetupTLSConfig(cfg *sarama.Config)

SetupTLSConfig set up the TLS config from config file.

Types

type Consumer

type Consumer struct {
	WG            sync.WaitGroup
	Topic         string
	PartitionList []int32
	Consumer      sarama.Consumer
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(addr []string, topic string) *Consumer

type MConsumerGroup

type MConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func NewMConsumerGroup

func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string) *MConsumerGroup

func (*MConsumerGroup) GetContextFromMsg

func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context

func (*MConsumerGroup) RegisterHandleAndConsumer

func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler)

type MConsumerGroupConfig

type MConsumerGroupConfig struct {
	KafkaVersion   sarama.KafkaVersion
	OffsetsInitial int64
	IsReturnErr    bool
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a Kafka producer.

func NewKafkaProducer

func NewKafkaProducer(addr []string, topic string) *Producer

NewKafkaProducer initializes a new Kafka producer.

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error)

SendMessage sends a message to the Kafka topic configured in the Producer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL