Documentation ¶
Index ¶
- func FetchTopicOffsets(client sarama.Client, offset int64, topic string) (topicOffsets map[int32]TopicPartitionOffset)
- func GenerateOffsetRequests(client sarama.Client, time int64, topics ...string) (requests map[*sarama.Broker]*sarama.OffsetRequest)
- func GetBrokerGroupOffsets(broker *sarama.Broker, groupOffsetChannel chan GroupOffset)
- func GetBrokerTopicOffsets(broker *sarama.Broker, request *sarama.OffsetRequest, ...)
- func GetOffsetFetchRequest(desc *sarama.GroupDescription) *sarama.OffsetFetchRequest
- func GetSaramaClient(brokers ...string) sarama.Client
- func GetSaramaConsumer(brokers string, consumerGroup string, topics []string) *cluster.Consumer
- type GroupOffset
- type GroupOffsetSlice
- type GroupTopicOffset
- type GroupTopicOffsetSlice
- type MemberAssignment
- type TopicAssignment
- type TopicPartitionOffset
- type TopicPartitionOffsetSlice
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FetchTopicOffsets ¶
func FetchTopicOffsets(client sarama.Client, offset int64, topic string) (topicOffsets map[int32]TopicPartitionOffset)
FetchTopicOffsets fetches topic offsets
func GenerateOffsetRequests ¶
func GenerateOffsetRequests(client sarama.Client, time int64, topics ...string) (requests map[*sarama.Broker]*sarama.OffsetRequest)
GenerateOffsetRequests generates the offset requests which can be used in the GetBrokerTopicOffsets function
func GetBrokerGroupOffsets ¶
func GetBrokerGroupOffsets(broker *sarama.Broker, groupOffsetChannel chan GroupOffset)
GetBrokerGroupOffsets fetches all group offsets for a specific broker
func GetBrokerTopicOffsets ¶
func GetBrokerTopicOffsets(broker *sarama.Broker, request *sarama.OffsetRequest, offsets chan TopicPartitionOffset)
GetBrokerTopicOffsets fetches the offsets for all topics from a specific broker and sends them to the offsets channel
func GetOffsetFetchRequest ¶
func GetOffsetFetchRequest(desc *sarama.GroupDescription) *sarama.OffsetFetchRequest
GetOffsetFetchRequest generates a request for the offsets of a specific group
func GetSaramaClient ¶
GetSaramaClient sets up a kafka client
func GetSaramaConsumer ¶
GetSaramaConsumer returns a high-level kafka consumer
Types ¶
type GroupOffset ¶
type GroupOffset struct { Group string GroupTopicOffsets GroupTopicOffsetSlice }
GroupOffset contains the topic offsets for a specific Group
type GroupOffsetSlice ¶
type GroupOffsetSlice []GroupOffset
GroupOffsetSlice for sorting
func FetchOffsets ¶
func FetchOffsets(client sarama.Client, offset int64) (groupOffsets GroupOffsetSlice, topicOffsets map[string]map[int32]TopicPartitionOffset)
FetchOffsets fetches group and topic offsets (where the topic offset can be sarama.OffsetNewest/OffsetOldest or the time in milliseconds)
func (GroupOffsetSlice) Len ¶
func (a GroupOffsetSlice) Len() int
func (GroupOffsetSlice) Less ¶
func (a GroupOffsetSlice) Less(i, j int) bool
func (GroupOffsetSlice) Swap ¶
func (a GroupOffsetSlice) Swap(i, j int)
type GroupTopicOffset ¶
type GroupTopicOffset struct { Topic string TopicPartitionOffsets TopicPartitionOffsetSlice }
GroupTopicOffset contains the partition offsets of a topic
type GroupTopicOffsetSlice ¶
type GroupTopicOffsetSlice []GroupTopicOffset
GroupTopicOffsetSlice for sorting
func (GroupTopicOffsetSlice) Len ¶
func (a GroupTopicOffsetSlice) Len() int
func (GroupTopicOffsetSlice) Less ¶
func (a GroupTopicOffsetSlice) Less(i, j int) bool
func (GroupTopicOffsetSlice) Swap ¶
func (a GroupTopicOffsetSlice) Swap(i, j int)
type MemberAssignment ¶
type MemberAssignment struct { Version int Assignments []TopicAssignment }
MemberAssignment contains the assignments of a consumer group member
func ParseMemberAssignment ¶
func ParseMemberAssignment(byteArr []byte) (assignments MemberAssignment)
ParseMemberAssignment parses the binary-encoded byteArr into a MemberAssignment
type TopicAssignment ¶
TopicAssignment contains the assigned partitions of a topic
type TopicPartitionOffset ¶
TopicPartitionOffset information
type TopicPartitionOffsetSlice ¶
type TopicPartitionOffsetSlice []TopicPartitionOffset
TopicPartitionOffsetSlice for sorting
func (TopicPartitionOffsetSlice) Len ¶
func (a TopicPartitionOffsetSlice) Len() int
func (TopicPartitionOffsetSlice) Less ¶
func (a TopicPartitionOffsetSlice) Less(i, j int) bool
func (TopicPartitionOffsetSlice) Swap ¶
func (a TopicPartitionOffsetSlice) Swap(i, j int)