Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ASyncProducer ¶
type ASyncProducer struct {
// contains filtered or unexported fields
}
func (*ASyncProducer) Close ¶
func (kafkaProducer *ASyncProducer) Close()
type ConsumerHandler ¶
type ConsumerHandler interface { // SetupHook is run at the beginning of a new session, before ConsumeClaim. SetupHook() // CleanUpHook is run at the end of a session, once all ConsumeClaim goroutines have exited // but before the offsets are committed for the very last time CleanUpHook() // HandleMsg 这里做了转换,仅需要写单条消息的处理逻辑即可。循环和MarkMessage已经做了处理,不需要再考虑 // 内部处理错误 HandleMsg(msg *sarama.ConsumerMessage) error }
ConsumerHandler 需要实现这个接口
type ConsumerSetting ¶
type ConsumerSetting struct { Name string `mapstructure:"name"` //连接名字 Hosts []string `mapstructure:"hosts"` Topic string `mapstructure:"topic"` GroupId string `mapstructure:"group_id"` // Offset 值只能为-1或-2,-1代表无偏移量时从最新位置开始消费,-2代表无偏移量时从最老的位置开始消费 Offset int64 `mapstructure:"offset"` // AutoCommit 默认自动提交 AutoCommit bool `mapstructure:"auto_commit"` MaxRetry int `mapstructure:"max_retry"` MaxRetryHandleMsg int `mapstructure:"max_retry_handle_msg"` ReturnError bool `mapstructure:"return_error"` ErrorCallback func(err error) }
type GroupConsumer ¶
type GroupConsumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(setting ConsumerSetting) (*GroupConsumer, error)
func (*GroupConsumer) Close ¶
func (consumer *GroupConsumer) Close()
func (*GroupConsumer) StartGroupConsume ¶
func (consumer *GroupConsumer) StartGroupConsume(handler ConsumerHandler) error
StartGroupConsume 阻塞执行
type ProducerInterface ¶
type ProducerSetting ¶
type ProducerSetting struct { Name string `mapstructure:"name"` //连接名字 Hosts []string `mapstructure:"hosts"` Topic string `mapstructure:"topic"` // ReturnSuccess 同步生产时,ReturnSuccess必须指定为true。异步生产时,指定为true后,必须处理success,否则阻塞 ReturnSuccess bool `mapstructure:"return_success"` // ReturnError 同步生产时,ReturnError默认为true。异步生产时,指定为true后,必须处理error,否则阻塞 ReturnError bool `mapstructure:"return_error"` // 默认的最大发送消息大小是1MB=defaults to 1000000字节 MaxMessageBytes int `mapstructure:"max_message_bytes"` // ack确认机制,默认是1,即WaitForLocal RequiredAcks int `mapstructure:"required_acks"` // socket的超时时间,ms毫秒 Timeout time.Duration `mapstructure:"timeout"` // 最大重试次数 MaxRetry int `mapstructure:"max_retry"` // 异步生产时的,失败的callback。可以是mon打点,可以是日志等, ErrorCallback func(err error) }
ProducerSetting 一些未设置的参数: 读写连接时间默认都是30s,默认消息不会被压缩,默认使用hash key选择分区,
type SyncProducer ¶
type SyncProducer struct {
// contains filtered or unexported fields
}
func NewSyncProducer ¶
func NewSyncProducer(producerSetting ProducerSetting) (*SyncProducer, error)
NewSyncProducer 同步生产者
func (*SyncProducer) Close ¶
func (kafkaProducer *SyncProducer) Close()
Close 异步生产时,程序运行结束后必须调用Close方法,否则缓冲区的未发送的数据有可能被清除
Click to show internal directories.
Click to hide internal directories.