kafka

package
v0.0.50-alpha.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2024 License: Apache-2.0 Imports: 15 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildConsumerGroupConfig

func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error)

func BuildProducerConfig

func BuildProducerConfig(conf Config) (*sarama.Config, error)

func Check added in v0.0.48

func Check(ctx context.Context, conf *Config, topics []string) error

func GetContextWithMQHeader added in v0.0.48

func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context

GetContextWithMQHeader creates a context from message queue headers.

func GetMQHeaderWithContext added in v0.0.48

func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)

GetMQHeaderWithContext extracts message queue headers from the context.

func NewConsumerGroup

func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error)

func NewProducer

func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error)

Types

type Config

type Config struct {
	Username     string    `yaml:"username"`
	Password     string    `yaml:"password"`
	ProducerAck  string    `yaml:"producerAck"`
	CompressType string    `yaml:"compressType"`
	Addr         []string  `yaml:"addr"`
	TLS          TLSConfig `yaml:"tls"`
}

type MConsumerGroup added in v0.0.48

type MConsumerGroup struct {
	sarama.ConsumerGroup
	// contains filtered or unexported fields
}

func NewMConsumerGroup added in v0.0.48

func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error)

func (*MConsumerGroup) Close added in v0.0.48

func (mc *MConsumerGroup) Close() error

func (*MConsumerGroup) GetContextFromMsg added in v0.0.48

func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context

func (*MConsumerGroup) RegisterHandleAndConsumer added in v0.0.48

func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler)

type Producer added in v0.0.48

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents a Kafka producer.

func NewKafkaProducer added in v0.0.48

func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error)

func (*Producer) SendMessage added in v0.0.48

func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error)

SendMessage sends a message to the Kafka topic configured in the Producer.

type TLSConfig

type TLSConfig struct {
	EnableTLS          bool   `yaml:"enableTLS"`
	CACrt              string `yaml:"caCrt"`
	ClientCrt          string `yaml:"clientCrt"`
	ClientKey          string `yaml:"clientKey"`
	ClientKeyPwd       string `yaml:"clientKeyPwd"`
	InsecureSkipVerify bool   `yaml:"insecureSkipVerify"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL