Documentation ¶
Index ¶
- type Config
- type ConsumerGroup
- func NewConsumerGroup(config *Config) (*ConsumerGroup, error)
- func (cg *ConsumerGroup) CommitOffset(topic string, partition int32, offset int64) error
- func (cg *ConsumerGroup) GetErrors(topic string) (<-chan *sarama.ConsumerError, bool)
- func (cg *ConsumerGroup) GetMessages(topic string) (<-chan *sarama.ConsumerMessage, bool)
- func (cg *ConsumerGroup) GetOffsets() map[string]interface{}
- func (cg *ConsumerGroup) IsStopped() bool
- func (cg *ConsumerGroup) OnClose(cb func())
- func (cg *ConsumerGroup) OnLoad(cb func())
- func (cg *ConsumerGroup) Owners() map[string]map[int32]string
- func (cg *ConsumerGroup) SetLogger(l *logrus.Logger)
- func (cg *ConsumerGroup) Start() error
- func (cg *ConsumerGroup) Stop()
Types ¶
type Config ¶
type Config struct {
	Chroot string

	// ZkList is required: the list of ZooKeeper addresses.
	ZkList []string

	// ZkSessionTimeout is the ZooKeeper session timeout. Default is 6s.
	ZkSessionTimeout time.Duration

	// GroupID is required: the identifier that determines which
	// ConsumerGroup is joined.
	GroupID string

	// ConsumerID is optional: the identifier recorded as a partition's owner.
	ConsumerID string

	// TopicList is required: the topics that the ConsumerGroup consumes.
	TopicList []string

	// SaramaConfig exposes the underlying Sarama configuration.
	SaramaConfig *sarama.Config

	// ErrorChannelBufferSize is the size of the error channel. Default is 1024.
	ErrorChannelBufferSize int

	// OffsetAutoCommitEnable controls whether offsets are committed
	// automatically. Default is true.
	OffsetAutoCommitEnable bool

	// OffsetAutoCommitInterval is the interval between automatic offset
	// commits. Default is 10s.
	OffsetAutoCommitInterval time.Duration

	// OffsetAutoReset is where to start fetching messages when no offset
	// is found. Default is newest.
	OffsetAutoReset int64

	// When ClaimPartitionRetryTimes > 0, claiming a partition gives up
	// after that many retries. When it is <= 0, claiming retries until it
	// succeeds or a stop signal is received.
	ClaimPartitionRetryTimes int

	// ClaimPartitionRetryInterval is the retry interval after a failed
	// attempt to claim a partition.
	ClaimPartitionRetryInterval time.Duration
}
Config passes configuration options to the ConsumerGroup constructor.
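The required fields and documented defaults can be summarized in a small sketch. It uses a local stand-in struct (`groupConfig`) and a hypothetical helper (`prepare`) so that it compiles without the package; neither name exists in the package, and how the package itself validates a Config or fills in defaults is an assumption here. The ZooKeeper address, group name, and topic name are placeholders.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// groupConfig is a local stand-in for a subset of the Config fields
// documented above. It is not the package's own type.
type groupConfig struct {
	ZkList                   []string
	ZkSessionTimeout         time.Duration
	GroupID                  string
	TopicList                []string
	ErrorChannelBufferSize   int
	OffsetAutoCommitInterval time.Duration
}

// prepare is a hypothetical helper. It rejects a config that lacks a
// required field, then fills zero-valued optional fields with the
// defaults stated in the field comments.
func prepare(c groupConfig) (groupConfig, error) {
	if len(c.ZkList) == 0 {
		return c, errors.New("ZkList is required")
	}
	if c.GroupID == "" {
		return c, errors.New("GroupID is required")
	}
	if len(c.TopicList) == 0 {
		return c, errors.New("TopicList is required")
	}
	if c.ZkSessionTimeout == 0 {
		c.ZkSessionTimeout = 6 * time.Second
	}
	if c.ErrorChannelBufferSize == 0 {
		c.ErrorChannelBufferSize = 1024
	}
	if c.OffsetAutoCommitInterval == 0 {
		c.OffsetAutoCommitInterval = 10 * time.Second
	}
	return c, nil
}

func main() {
	cfg, err := prepare(groupConfig{
		ZkList:    []string{"127.0.0.1:2181"}, // placeholder address
		GroupID:   "example-group",
		TopicList: []string{"example-topic"},
	})
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(cfg.ZkSessionTimeout, cfg.ErrorChannelBufferSize, cfg.OffsetAutoCommitInterval)
}
```

OffsetAutoCommitEnable is left out of the sketch on purpose: a Go bool has a zero value of false, so a helper like this cannot tell "unset" from "explicitly disabled".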
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup consumes messages from Kafka with rebalancing support.
func NewConsumerGroup ¶
func NewConsumerGroup(config *Config) (*ConsumerGroup, error)
NewConsumerGroup creates a ConsumerGroup instance from config.
func (*ConsumerGroup) CommitOffset ¶
func (cg *ConsumerGroup) CommitOffset(topic string, partition int32, offset int64) error
CommitOffset commits an offset manually when auto commit is disabled.
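With auto commit disabled, a common pattern is to commit only after a message has been handled successfully. The sketch below illustrates that pattern against an interface that mirrors the CommitOffset signature. The `message` struct is a local stand-in for the Topic, Partition, and Offset fields of sarama.ConsumerMessage, and `drain`, `committer`, `recorder`, and `errHandlerFailed` are hypothetical names. Whether the package expects the offset of the last handled message or the next offset to read is not stated in this documentation, so the value passed here is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a local stand-in for the fields of sarama.ConsumerMessage
// that CommitOffset needs.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// committer mirrors the CommitOffset signature documented above.
type committer interface {
	CommitOffset(topic string, partition int32, offset int64) error
}

// errHandlerFailed is a hypothetical error used to simulate a failed handler.
var errHandlerFailed = errors.New("handler failed")

// drain handles each message and commits its offset only after the
// handler succeeds. It stops at the first error and returns the number
// of messages that were both handled and committed.
func drain(c committer, msgs <-chan message, handle func(message) error) (int, error) {
	n := 0
	for m := range msgs {
		if err := handle(m); err != nil {
			return n, err
		}
		if err := c.CommitOffset(m.Topic, m.Partition, m.Offset); err != nil {
			return n, err
		}
		n++
	}
	return n, nil
}

// recorder is a fake committer that remembers the last offset committed
// for each partition.
type recorder struct{ last map[int32]int64 }

func (r *recorder) CommitOffset(topic string, partition int32, offset int64) error {
	r.last[partition] = offset
	return nil
}

func main() {
	msgs := make(chan message, 2)
	msgs <- message{Topic: "example-topic", Partition: 0, Offset: 41}
	msgs <- message{Topic: "example-topic", Partition: 0, Offset: 42}
	close(msgs)

	rec := &recorder{last: map[int32]int64{}}
	n, err := drain(rec, msgs, func(m message) error { return nil })
	fmt.Println(n, err, rec.last)
}
```

Committing after handling means a crash between the two steps leads to the message being delivered again rather than lost, so handlers written this way should tolerate duplicates.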
func (*ConsumerGroup) GetErrors ¶ added in v0.2.0
func (cg *ConsumerGroup) GetErrors(topic string) (<-chan *sarama.ConsumerError, bool)
GetErrors returns the unbuffered error channel for the specified topic.
func (*ConsumerGroup) GetMessages ¶ added in v0.2.0
func (cg *ConsumerGroup) GetMessages(topic string) (<-chan *sarama.ConsumerMessage, bool)
GetMessages returns the unbuffered message channel for the specified topic.
func (*ConsumerGroup) GetOffsets ¶ added in v0.2.3
func (cg *ConsumerGroup) GetOffsets() map[string]interface{}
GetOffsets returns the offsets held in memory, for debugging.
func (*ConsumerGroup) IsStopped ¶
func (cg *ConsumerGroup) IsStopped() bool
IsStopped returns whether the ConsumerGroup has been stopped.
func (*ConsumerGroup) OnClose ¶ added in v0.2.3
func (cg *ConsumerGroup) OnClose(cb func())
OnClose registers a callback function that runs before the ConsumerGroup shuts down.
func (*ConsumerGroup) OnLoad ¶ added in v0.2.3
func (cg *ConsumerGroup) OnLoad(cb func())
OnLoad registers a callback function that runs after startup.
func (*ConsumerGroup) Owners ¶ added in v0.2.3
func (cg *ConsumerGroup) Owners() map[string]map[int32]string
Owners returns the owners of all partitions.
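The return type suggests a nested map keyed first by topic and then by partition, with the owner's identifier as the value. That reading is an assumption based on the signature alone. Under it, a hypothetical helper such as `partitionsPerOwner` below could summarize how evenly partitions are spread across consumers; the topic and consumer names are placeholders.

```go
package main

import "fmt"

// partitionsPerOwner counts how many partitions each owner holds across
// all topics. It assumes the map is keyed topic -> partition -> owner ID.
func partitionsPerOwner(owners map[string]map[int32]string) map[string]int {
	counts := make(map[string]int)
	for _, partitions := range owners {
		for _, owner := range partitions {
			counts[owner]++
		}
	}
	return counts
}

func main() {
	// Hand-written sample in the same shape as the Owners return value.
	owners := map[string]map[int32]string{
		"example-topic": {0: "consumer-a", 1: "consumer-a", 2: "consumer-b"},
	}
	fmt.Println(partitionsPerOwner(owners))
}
```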
func (*ConsumerGroup) SetLogger ¶
func (cg *ConsumerGroup) SetLogger(l *logrus.Logger)
SetLogger sets the user-supplied logger for the consumer group.
func (*ConsumerGroup) Start ¶ added in v0.2.2
func (cg *ConsumerGroup) Start() error
Start registers the ConsumerGroup, which triggers a rebalance. The ConsumerGroup computes which partitions it should consume based on the number of consumers, then starts fetching messages.
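The documentation does not say how partitions are divided among consumers. As an illustration only, the sketch below shows one common scheme, a range-style assignment: consumers are sorted by ID and each takes a contiguous block of partitions, with the first few consumers taking one extra when the division is uneven. The function `assign` and its parameters are hypothetical and may not match what the package does.

```go
package main

import (
	"fmt"
	"sort"
)

// min32 returns the smaller of two int32 values.
func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

// assign returns the partitions that the consumer named self would own
// under a range-style assignment. It returns nil if self is not in
// consumerIDs.
func assign(consumerIDs []string, numPartitions int32, self string) []int32 {
	// Sort a copy so every consumer computes the same ordering.
	ids := append([]string(nil), consumerIDs...)
	sort.Strings(ids)

	idx := sort.SearchStrings(ids, self)
	if idx == len(ids) || ids[idx] != self {
		return nil
	}

	n := int32(len(ids))
	base, extra := numPartitions/n, numPartitions%n
	i := int32(idx)

	// The first `extra` consumers each take one additional partition.
	start := i*base + min32(i, extra)
	count := base
	if i < extra {
		count++
	}

	out := make([]int32, 0, count)
	for p := start; p < start+count; p++ {
		out = append(out, p)
	}
	return out
}

func main() {
	ids := []string{"consumer-b", "consumer-a"}
	fmt.Println(assign(ids, 5, "consumer-a"))
	fmt.Println(assign(ids, 5, "consumer-b"))
}
```

Because every consumer sorts the same membership list and applies the same arithmetic, each one can compute its own share independently, which is why a change in membership leads to a fresh rebalance.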
func (*ConsumerGroup) Stop ¶ added in v0.2.2
func (cg *ConsumerGroup) Stop()
Stop unregisters the ConsumerGroup, which triggers a rebalance. The partitions consumed by this ConsumerGroup are reassigned to other consumers.