kafka

package
v3.6.1-beta.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetContextWithMQHeader

func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context

GetContextWithMQHeader creates a context from message queue headers.

func GetMQHeaderWithContext

func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)

GetMQHeaderWithContext extracts message queue headers from the context.

func SetupTLSConfig

func SetupTLSConfig(cfg *sarama.Config, tlsConfig *TLSConfig) error

SetupTLSConfig set up the TLS config from config file.

Types

type Consumer

type Consumer struct {
	WG            sync.WaitGroup
	Topic         string
	PartitionList []int32
	Consumer      sarama.Consumer
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(addr []string, topic string, config *config.GlobalConfig) (*Consumer, error)

type MConsumerGroup

type MConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func NewMConsumerGroup

func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []string, groupID string, tlsConfig *TLSConfig) (*MConsumerGroup, error)

func (*MConsumerGroup) Close added in v3.6.0

func (mc *MConsumerGroup) Close() error

func (*MConsumerGroup) GetContextFromMsg

func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context

func (*MConsumerGroup) RegisterHandleAndConsumer

func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler)

type MConsumerGroupConfig

type MConsumerGroupConfig struct {
	KafkaVersion   sarama.KafkaVersion
	OffsetsInitial int64
	IsReturnErr    bool
	UserName       string
	Password       string
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a Kafka producer.

func NewKafkaProducer

func NewKafkaProducer(addr []string, topic string, producerConfig *ProducerConfig, tlsConfig *TLSConfig) (*Producer, error)

NewKafkaProducer initializes a new Kafka producer.

func (*Producer) SendMessage

func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error)

SendMessage sends a message to the Kafka topic configured in the Producer.

type ProducerConfig added in v3.6.0

type ProducerConfig struct {
	ProducerAck  string
	CompressType string
	Username     string
	Password     string
}

type TLSConfig added in v3.6.0

type TLSConfig struct {
	CACrt              string
	ClientCrt          string
	ClientKey          string
	ClientKeyPwd       string
	InsecureSkipVerify bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL