kafka

package
v1.0.0
Published: Aug 20, 2024 License: GPL-3.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ASyncProducer

type ASyncProducer struct {
	// contains filtered or unexported fields
}

func (*ASyncProducer) Close

func (kafkaProducer *ASyncProducer) Close()

type ConsumerHandler

type ConsumerHandler interface {
	// SetupHook is run at the beginning of a new session, before ConsumeClaim.
	SetupHook()
	// CleanUpHook is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time
	CleanUpHook()
	// HandleMsg wraps the consume loop, so only the handling logic for a single
	// message needs to be written here. The loop and MarkMessage are already taken
	// care of; their internal errors do not need to be handled again.
	HandleMsg(msg *sarama.ConsumerMessage) error
}

ConsumerHandler is the interface a consumer handler must implement.
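
A minimal sketch of a handler implementation. The logHandler type is hypothetical, and the snippet assumes the standard log package and the sarama client library are imported:

type logHandler struct{}

// SetupHook runs at the start of a new session, before ConsumeClaim; nothing to prepare here.
func (h *logHandler) SetupHook() {}

// CleanUpHook runs at the end of a session, after the ConsumeClaim goroutines have exited.
func (h *logHandler) CleanUpHook() {}

// HandleMsg only needs the logic for a single message; the loop and MarkMessage
// are handled by the package.
func (h *logHandler) HandleMsg(msg *sarama.ConsumerMessage) error {
	log.Printf("topic=%s partition=%d offset=%d value=%s",
		msg.Topic, msg.Partition, msg.Offset, msg.Value)
	return nil
}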

type ConsumerSetting

type ConsumerSetting struct {
	Name    string   `mapstructure:"name"` // connection name
	Hosts   []string `mapstructure:"hosts"`
	Topic   string   `mapstructure:"topic"`
	GroupId string   `mapstructure:"group_id"`
	// Offset may only be -1 or -2: -1 means consume from the newest position when
	// there is no committed offset, -2 means consume from the oldest position.
	Offset int64 `mapstructure:"offset"`
	// AutoCommit enables automatic offset commits (the default behaviour).
	AutoCommit        bool `mapstructure:"auto_commit"`
	MaxRetry          int  `mapstructure:"max_retry"`
	MaxRetryHandleMsg int  `mapstructure:"max_retry_handle_msg"`
	ReturnError       bool `mapstructure:"return_error"`
	ErrorCallback     func(err error)
}

type GroupConsumer

type GroupConsumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(setting ConsumerSetting) (*GroupConsumer, error)

func (*GroupConsumer) Close

func (consumer *GroupConsumer) Close()

func (*GroupConsumer) StartGroupConsume

func (consumer *GroupConsumer) StartGroupConsume(handler ConsumerHandler) error

StartGroupConsume blocks while it runs.
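
A minimal usage sketch. The broker address, topic, and group id are placeholders, and logHandler is the hypothetical handler sketched under ConsumerHandler:

setting := kafka.ConsumerSetting{
	Name:    "example",                  // placeholder connection name
	Hosts:   []string{"localhost:9092"}, // placeholder broker address
	Topic:   "example-topic",
	GroupId: "example-group",
	Offset:  -1, // consume from the newest position when there is no committed offset
}

consumer, err := kafka.NewConsumer(setting)
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

// StartGroupConsume blocks, so run it last in main or in its own goroutine.
if err := consumer.StartGroupConsume(&logHandler{}); err != nil {
	log.Fatal(err)
}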

type ProducerInterface

type ProducerInterface interface {
	SendMsg(topic string, key string, value string) error
	Close()
}
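
Code that only needs to publish messages can depend on this interface rather than a concrete producer type. A hedged sketch, where publish is a hypothetical helper name:

// publish depends only on ProducerInterface, so any producer in this
// package that satisfies it can be passed in.
func publish(p kafka.ProducerInterface, topic, key, value string) error {
	return p.SendMsg(topic, key, value)
}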

type ProducerSetting

type ProducerSetting struct {
	Name  string   `mapstructure:"name"` // connection name
	Hosts []string `mapstructure:"hosts"`
	Topic string   `mapstructure:"topic"`
	// ReturnSuccess must be set to true for synchronous production. For asynchronous
	// production, if set to true the successes must be consumed, otherwise the producer blocks.
	ReturnSuccess bool `mapstructure:"return_success"`
	// ReturnError defaults to true for synchronous production. For asynchronous
	// production, if set to true the errors must be consumed, otherwise the producer blocks.
	ReturnError bool `mapstructure:"return_error"`
	// Maximum size of a message to send; defaults to 1 MB (1000000 bytes).
	MaxMessageBytes int `mapstructure:"max_message_bytes"`
	// Ack acknowledgement mode; defaults to 1, i.e. WaitForLocal.
	RequiredAcks int `mapstructure:"required_acks"`
	// Socket timeout, in milliseconds.
	Timeout time.Duration `mapstructure:"timeout"`
	// Maximum number of retries.
	MaxRetry int `mapstructure:"max_retry"`
	// Callback for failures during asynchronous production, e.g. metrics reporting or logging.
	ErrorCallback func(err error)
}

ProducerSetting leaves some parameters unexposed: read and write connection timeouts both default to 30s, messages are not compressed by default, and the partition is chosen by hashing the key by default.

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

func NewSyncProducer

func NewSyncProducer(producerSetting ProducerSetting) (*SyncProducer, error)

NewSyncProducer creates a synchronous producer.

func (*SyncProducer) Close

func (kafkaProducer *SyncProducer) Close()

Close must be called when the program finishes if producing asynchronously, otherwise unsent data still in the buffer may be discarded.

func (*SyncProducer) SendMsg

func (kafkaProducer *SyncProducer) SendMsg(topic string, key string, value string) (err error)
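
A minimal usage sketch of the synchronous producer. The broker address, topic, key, and value are placeholders, and the snippet assumes the standard log package is imported:

setting := kafka.ProducerSetting{
	Name:          "example",                  // placeholder connection name
	Hosts:         []string{"localhost:9092"}, // placeholder broker address
	Topic:         "example-topic",
	ReturnSuccess: true, // must be true for synchronous production
}

producer, err := kafka.NewSyncProducer(setting)
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

if err := producer.SendMsg("example-topic", "example-key", "example-value"); err != nil {
	log.Fatal(err)
}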
