Documentation
¶
Index ¶
- Constants
- Variables
- func NewFactoryConsumer() *factoryConsumer
- func NewFactoryConsumerBackGround() *factoryConsumer
- func NewFactoryProduct() *factoryProduct
- type AckLevel
- type BalanceType
- type BuildConsumer
- func (this *BuildConsumer) GetAddr() string
- func (this *BuildConsumer) GetBalanceType() BalanceType
- func (this *BuildConsumer) GetConfig() *sarama.Config
- func (this *BuildConsumer) GetGroupId() string
- func (this *BuildConsumer) GetKafkaVersion() string
- func (this *BuildConsumer) GetResponseListener() ConsumerResponseListener
- func (this *BuildConsumer) GetTopic() string
- func (this *BuildConsumer) IsAutoCommit() bool
- func (this *BuildConsumer) IsDebug() bool
- func (this *BuildConsumer) IsMultiplePartition() bool
- func (this *BuildConsumer) IsOldOffset() bool
- func (this *BuildConsumer) SetBalanceType(balanceType BalanceType) BuildConsumerApi
- func (this *BuildConsumer) SetConfig(config *sarama.Config) BuildConsumerApi
- func (this *BuildConsumer) SetDebug(isDebug bool) BuildConsumerApi
- func (this *BuildConsumer) SetIsAutoCommit(autoCommit bool) BuildConsumerApi
- func (this *BuildConsumer) SetKafkaVersion(kafkaVersion string) BuildConsumerApi
- func (this *BuildConsumer) SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi
- func (this *BuildConsumer) SetResponseListener(responseListener ConsumerResponseListener) BuildConsumerApi
- func (this *BuildConsumer) ToString() string
- type BuildConsumerApi
- func NewBuildConsumer(addr string, groupId string, topic string) BuildConsumerApi
- func NewBuildConsumerStrategy(addr string, groupId string, topic string, balanceType BalanceType) BuildConsumerApi
- func NewNewBuildConsumerOffset(addr string, groupId string, topic string, balanceType BalanceType, ...) BuildConsumerApi
- type BuildProduct
- func (this *BuildProduct) GetAddr() string
- func (this *BuildProduct) GetAddrSlice() []string
- func (this *BuildProduct) GetConfig() *sarama.Config
- func (this *BuildProduct) GetConnStrategy() ProductBalanceType
- func (this *BuildProduct) GetHashCode() int64
- func (this *BuildProduct) GetHashCodeString() string
- func (this *BuildProduct) GetMaxConnection() int32
- func (this *BuildProduct) GetName() string
- func (this *BuildProduct) GetTransactional() bool
- func (this *BuildProduct) SetAckType(ackType ProductAckType) BuildProductApi
- func (this *BuildProduct) SetConfig(config *sarama.Config) BuildProductApi
- func (this *BuildProduct) SetDebug(isDebug bool) BuildProductApi
- func (this *BuildProduct) SetMaxConnection(maxConnection int32) BuildProductApi
- func (this *BuildProduct) SetTransactional(isTransactional bool) BuildProductApi
- func (this *BuildProduct) ToString() string
- type BuildProductApi
- type ConsumerMessageContext
- func (this *ConsumerMessageContext) GetBuilder() BuildConsumerApi
- func (this *ConsumerMessageContext) GetGroupId() string
- func (this *ConsumerMessageContext) GetMessage() []byte
- func (this *ConsumerMessageContext) GetMessageString() string
- func (this *ConsumerMessageContext) GetOffset() int64
- func (this *ConsumerMessageContext) GetPartition() int32
- func (this *ConsumerMessageContext) GetSession() *ConsumerSession
- func (this *ConsumerMessageContext) GetTimeStamp() time.Time
- func (this *ConsumerMessageContext) GetTopic() string
- func (this *ConsumerMessageContext) GetVal() *sarama.ConsumerMessage
- type ConsumerResponseListener
- type ConsumerSession
- type ProductAckType
- type ProductBalanceType
Constants ¶
View Source
const ( //默认最大连接数 DEFAULT_MAX_CONNECTION int32 = 5 //连接轮询 PRODUCT_CONN_STRATEGY_BALANCE_ROUNDROBIN ProductBalanceType = 0 //ack确认类型- 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还 没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据 PRODUCT_ACK_TYPE_NORNAL ProductAckType = 0 //等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么将会丢失数据 PRODUCT_ACK_TYPE_FOLLOWER ProductAckType = 1 //等待 broker 的 ack,partition 的 leader 和 follower (ISRL里的follower,不是全部的follower)全部落盘成功后才 返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复 //开启事务后可避免数据重复 PRODUCT_ACK_TYPE_ALL ProductAckType = -1 )
Variables ¶
View Source
var ( //--消费者错误--- //构建器不能为空 KAFKA_CONSUMER_ERROR_BUILD_EMPTY = errors.New("consumer build len can not be empty") //生产者错误 //构建器不能为空 KAFKA_PRODUCT_ERROR_BUILD_EMPTY = errors.New("product build len can not be empty") //构建器名称不能为空 KAFKA_PRODUCT_ERROR_BUILD_NAME_EMPTY = errors.New("product build name can not be empty") //连接不存在 KAFKA_PRODUCT_ERROR_BUILD_CLIENT_EMPTY = errors.New("product client empty") //客户端连接错误 KAFKA_PRODUCT_ERROR_BUILD_CLIENT_NIL = errors.New("product client nil") )
Functions ¶
func NewFactoryConsumer ¶
func NewFactoryConsumer() *factoryConsumer
func NewFactoryConsumerBackGround ¶
func NewFactoryConsumerBackGround() *factoryConsumer
Types ¶
type BalanceType ¶
type BalanceType int
消费策略
const ( //消费策略-轮询 默认 消费者连接池失败, 暂时只使用轮询 CONSUMER_BALANCE_STRATEGY_ROUNDROBIN BalanceType = 0 //消费策略-粘性 CONSUMER_BALANCE_STRATEGY_STICKY BalanceType = 1 //消费策略-随机 CONSUMER_BALANCE_STRATEGY_RANGE BalanceType = 2 )
type BuildConsumer ¶
type BuildConsumer struct {
// contains filtered or unexported fields
}
消费者构建
func (*BuildConsumer) GetAddr ¶
func (this *BuildConsumer) GetAddr() string
func (*BuildConsumer) GetBalanceType ¶
func (this *BuildConsumer) GetBalanceType() BalanceType
func (*BuildConsumer) GetConfig ¶
func (this *BuildConsumer) GetConfig() *sarama.Config
func (*BuildConsumer) GetGroupId ¶
func (this *BuildConsumer) GetGroupId() string
func (*BuildConsumer) GetKafkaVersion ¶
func (this *BuildConsumer) GetKafkaVersion() string
func (*BuildConsumer) GetResponseListener ¶
func (this *BuildConsumer) GetResponseListener() ConsumerResponseListener
消息返回事件监听
func (*BuildConsumer) GetTopic ¶
func (this *BuildConsumer) GetTopic() string
func (*BuildConsumer) IsAutoCommit ¶
func (this *BuildConsumer) IsAutoCommit() bool
func (*BuildConsumer) IsDebug ¶
func (this *BuildConsumer) IsDebug() bool
func (*BuildConsumer) IsMultiplePartition ¶ added in v1.0.21
func (this *BuildConsumer) IsMultiplePartition() bool
func (*BuildConsumer) IsOldOffset ¶
func (this *BuildConsumer) IsOldOffset() bool
func (*BuildConsumer) SetBalanceType ¶ added in v1.0.21
func (this *BuildConsumer) SetBalanceType(balanceType BalanceType) BuildConsumerApi
分区策略类型
func (*BuildConsumer) SetConfig ¶
func (this *BuildConsumer) SetConfig(config *sarama.Config) BuildConsumerApi
设置其它配置
func (*BuildConsumer) SetDebug ¶
func (this *BuildConsumer) SetDebug(isDebug bool) BuildConsumerApi
func (*BuildConsumer) SetIsAutoCommit ¶
func (this *BuildConsumer) SetIsAutoCommit(autoCommit bool) BuildConsumerApi
* 是否自动提交
func (*BuildConsumer) SetKafkaVersion ¶
func (this *BuildConsumer) SetKafkaVersion(kafkaVersion string) BuildConsumerApi
设置kafka版本号
func (*BuildConsumer) SetMultiplePartition ¶ added in v1.0.21
func (this *BuildConsumer) SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi
设置是否加载多个分区
func (*BuildConsumer) SetResponseListener ¶
func (this *BuildConsumer) SetResponseListener(responseListener ConsumerResponseListener) BuildConsumerApi
设置消息返回事件
func (*BuildConsumer) ToString ¶
func (this *BuildConsumer) ToString() string
type BuildConsumerApi ¶
type BuildConsumerApi interface { SetDebug(isDebug bool) BuildConsumerApi IsDebug() bool //设置kafka版本号 SetKafkaVersion(kafkaVersion string) BuildConsumerApi //设置其它配置 SetConfig(config *sarama.Config) BuildConsumerApi //是否分区策略类型 SetBalanceType(balanceType BalanceType) BuildConsumerApi //是否多个分区加载 IsMultiplePartition() bool /* 是否多个分区加载 消费者多个分区不保证消息稳定性, 但可以保证吞吐量 */ SetMultiplePartition(isMultiplePartition bool) BuildConsumerApi //是否自动提交 SetIsAutoCommit(autoCommit bool) BuildConsumerApi //设置消息返回事件 SetResponseListener(responseResult ConsumerResponseListener) BuildConsumerApi //返回消息事件监听 GetResponseListener() ConsumerResponseListener //获取broker地址 GetAddr() string //获取组id GetGroupId() string //获取主题 GetTopic() string //获取分区策略方式 GetBalanceType() BalanceType //是否启用加载历史消费 IsOldOffset() bool //是否自动提交 IsAutoCommit() bool //获取版本 GetKafkaVersion() string //获取原始配置 GetConfig() *sarama.Config //返回构建数据 ToString() string }
* 构建消费者接口
func NewBuildConsumer ¶
func NewBuildConsumer(addr string, groupId string, topic string) BuildConsumerApi
初始化消费者构建器
func NewBuildConsumerStrategy ¶
func NewBuildConsumerStrategy(addr string, groupId string, topic string, balanceType BalanceType) BuildConsumerApi
* 创建消费构建者[默认拉取历史消费]
func NewNewBuildConsumerOffset ¶
func NewNewBuildConsumerOffset(addr string, groupId string, topic string, balanceType BalanceType, oldOffset bool) BuildConsumerApi
* 创建消费构建者[判断是否拉取历史消费]
type BuildProduct ¶
type BuildProduct struct {
// contains filtered or unexported fields
}
* 生产者
func (*BuildProduct) GetAddr ¶
func (this *BuildProduct) GetAddr() string
func (*BuildProduct) GetAddrSlice ¶
func (this *BuildProduct) GetAddrSlice() []string
func (*BuildProduct) GetConfig ¶
func (this *BuildProduct) GetConfig() *sarama.Config
func (*BuildProduct) GetConnStrategy ¶
func (this *BuildProduct) GetConnStrategy() ProductBalanceType
func (*BuildProduct) GetHashCode ¶
func (this *BuildProduct) GetHashCode() int64
func (*BuildProduct) GetHashCodeString ¶
func (this *BuildProduct) GetHashCodeString() string
func (*BuildProduct) GetMaxConnection ¶
func (this *BuildProduct) GetMaxConnection() int32
func (*BuildProduct) GetName ¶
func (this *BuildProduct) GetName() string
func (*BuildProduct) GetTransactional ¶ added in v1.0.22
func (this *BuildProduct) GetTransactional() bool
func (*BuildProduct) SetAckType ¶
func (this *BuildProduct) SetAckType(ackType ProductAckType) BuildProductApi
func (*BuildProduct) SetConfig ¶
func (this *BuildProduct) SetConfig(config *sarama.Config) BuildProductApi
func (*BuildProduct) SetDebug ¶
func (this *BuildProduct) SetDebug(isDebug bool) BuildProductApi
func (*BuildProduct) SetMaxConnection ¶
func (this *BuildProduct) SetMaxConnection(maxConnection int32) BuildProductApi
func (*BuildProduct) SetTransactional ¶
func (this *BuildProduct) SetTransactional(isTransactional bool) BuildProductApi
func (*BuildProduct) ToString ¶
func (this *BuildProduct) ToString() string
type BuildProductApi ¶
type BuildProductApi interface { //是否开启调试 SetDebug(isDebug bool) BuildProductApi //是否开启事务提交 SetTransactional(isTransactional bool) BuildProductApi GetTransactional() bool //设置确认方式 SetAckType(ackType ProductAckType) BuildProductApi //当前buildName GetName() string //生产者连接地址 GetAddr() string //获取连接地址数组 GetAddrSlice() []string //设置配置 SetConfig(config *sarama.Config) BuildProductApi //设置连接数 SetMaxConnection(maxConnection int32) BuildProductApi //获取连接策略模式 GetConnStrategy() ProductBalanceType //获取连接数 GetMaxConnection() int32 //获取配置 GetConfig() *sarama.Config //获取build hashcode GetHashCode() int64 GetHashCodeString() string //返回构建数据 ToString() string }
* 生产者构建接口
type ConsumerMessageContext ¶
type ConsumerMessageContext struct {
// contains filtered or unexported fields
}
消费消息上下文 groupId string, topic string, partition int32, offset int64, message []byte, timeStamp time.Time, consumerVal *sarama.ConsumerMessage
func (*ConsumerMessageContext) GetBuilder ¶
func (this *ConsumerMessageContext) GetBuilder() BuildConsumerApi
func (*ConsumerMessageContext) GetGroupId ¶
func (this *ConsumerMessageContext) GetGroupId() string
func (*ConsumerMessageContext) GetMessage ¶
func (this *ConsumerMessageContext) GetMessage() []byte
func (*ConsumerMessageContext) GetMessageString ¶
func (this *ConsumerMessageContext) GetMessageString() string
func (*ConsumerMessageContext) GetOffset ¶
func (this *ConsumerMessageContext) GetOffset() int64
func (*ConsumerMessageContext) GetPartition ¶
func (this *ConsumerMessageContext) GetPartition() int32
func (*ConsumerMessageContext) GetSession ¶
func (this *ConsumerMessageContext) GetSession() *ConsumerSession
func (*ConsumerMessageContext) GetTimeStamp ¶
func (this *ConsumerMessageContext) GetTimeStamp() time.Time
func (*ConsumerMessageContext) GetTopic ¶
func (this *ConsumerMessageContext) GetTopic() string
func (*ConsumerMessageContext) GetVal ¶
func (this *ConsumerMessageContext) GetVal() *sarama.ConsumerMessage
type ConsumerResponseListener ¶
type ConsumerResponseListener func(context *ConsumerMessageContext)
消费者返回数据监听
type ConsumerSession ¶
type ConsumerSession struct {
// contains filtered or unexported fields
}
func NewConsumerSession ¶
func NewConsumerSession(session sarama.ConsumerGroupSession, message *sarama.ConsumerMessage, isAutoAck bool) *ConsumerSession
func (*ConsumerSession) Ack ¶
func (this *ConsumerSession) Ack()
func (*ConsumerSession) GetMessage ¶
func (this *ConsumerSession) GetMessage() *sarama.ConsumerMessage
func (*ConsumerSession) GetSession ¶
func (this *ConsumerSession) GetSession() sarama.ConsumerGroupSession
func (*ConsumerSession) IsAutoAck ¶
func (this *ConsumerSession) IsAutoAck() bool
type ProductAckType ¶
type ProductAckType int
type ProductBalanceType ¶
type ProductBalanceType int
Source Files
¶
Click to show internal directories.
Click to hide internal directories.