Documentation ¶
Index ¶
- Constants
- type Config
- type ConsumerGroupConfig
- type DescribeConsumerGroupsResponse
- type InfoMetricConfig
- type LogDirResponseShard
- type LogDirsConfig
- type OffsetCommit
- type Service
- func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, error)
- func (s *Service) DescribeLogDirs(ctx context.Context) []LogDirResponseShard
- func (s *Service) GetAPIVersions(ctx context.Context) (*kmsg.ApiVersionsResponse, error)
- func (s *Service) GetClusterVersion(ctx context.Context) (string, error)
- func (s *Service) GetMetadata(ctx context.Context) (*kmsg.MetadataResponse, error)
- func (s *Service) GetMetadataCached(ctx context.Context) (*kmsg.MetadataResponse, error)
- func (s *Service) GetNumberOfOffsetRecordsConsumed() float64
- func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsResponse, error)
- func (s *Service) HandleIsReady() http.HandlerFunc
- func (s *Service) IsGroupAllowed(groupName string) bool
- func (s *Service) IsTopicAllowed(topicName string) bool
- func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[string]*kmsg.OffsetFetchResponse, error)
- func (s *Service) ListAllConsumerGroupOffsetsInternal() map[string]map[string]map[int32]OffsetCommit
- func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error)
- func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error)
- func (s *Service) Start(ctx context.Context) error
- type Storage
- type TopicConfig
Constants ¶
const ( ConsumerGroupScrapeModeOffsetsTopic string = "offsetsTopic" ConsumerGroupScrapeModeAdminAPI string = "adminApi" ConsumerGroupGranularityTopic string = "topic" ConsumerGroupGranularityPartition string = "partition" )
const ( TopicGranularityTopic string = "topic" TopicGranularityPartition string = "partition" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { ConsumerGroups ConsumerGroupConfig `koanf:"consumerGroups"` Topics TopicConfig `koanf:"topics"` LogDirs LogDirsConfig `koanf:"logDirs"` EndToEnd e2e.Config `koanf:"endToEnd"` }
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct { // Enabled specifies whether consumer groups shall be scraped and exported or not. Enabled bool `koanf:"enabled"` // Mode specifies whether we export consumer group offsets using the Admin API or by consuming the internal // __consumer_offsets topic. ScrapeMode string `koanf:"scrapeMode"` // Granularity can be per topic or per partition. If you want to reduce the number of exported metric series and // you aren't interested in per partition lags you could choose "topic" where all partition lags will be summed // and only topic lags will be exported. Granularity string `koanf:"granularity"` // AllowedGroups are regex strings of group ids that shall be exported AllowedGroupIDs []string `koanf:"allowedGroups"` // IgnoredGroups are regex strings of group ids that shall be ignored/skipped when exporting metrics. Ignored groups // take precedence over allowed groups. IgnoredGroupIDs []string `koanf:"ignoredGroups"` }
func (*ConsumerGroupConfig) SetDefaults ¶
func (c *ConsumerGroupConfig) SetDefaults()
func (*ConsumerGroupConfig) Validate ¶
func (c *ConsumerGroupConfig) Validate() error
type DescribeConsumerGroupsResponse ¶ added in v2.1.0
type DescribeConsumerGroupsResponse struct { BrokerMetadata kgo.BrokerMetadata Groups *kmsg.DescribeGroupsResponse }
type InfoMetricConfig ¶ added in v2.2.0
type InfoMetricConfig struct { // ConfigKeys configures optional topic configuration keys that should be exported // as prometheus metric labels. // By default only "cleanup.policy" is exported ConfigKeys []string `koanf:"configKeys"` }
type LogDirResponseShard ¶
type LogDirResponseShard struct { Err error Broker kgo.BrokerMetadata LogDirs *kmsg.DescribeLogDirsResponse }
type LogDirsConfig ¶
type LogDirsConfig struct { // Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior // to version 1.0.0 as describing log dirs was not supported back then. Enabled bool `koanf:"enabled"` }
func (*LogDirsConfig) SetDefaults ¶
func (c *LogDirsConfig) SetDefaults()
SetDefaults for topic config
func (*LogDirsConfig) Validate ¶
func (c *LogDirsConfig) Validate() error
Validate if provided LogDirsConfig is valid.
type OffsetCommit ¶
type OffsetCommit struct { Key kmsg.OffsetCommitKey Value kmsg.OffsetCommitValue // CommitCount is the number of offset commits for this group-topic-partition combination CommitCount int // ExpireTimestamp is a timestamp that indicates when this offset commit will expire on the Kafka cluster ExpireTimestamp time.Time }
OffsetCommit is used as value for the OffsetCommit map
type Service ¶
type Service struct { Cfg Config AllowedGroupIDsExpr []*regexp.Regexp IgnoredGroupIDsExpr []*regexp.Regexp AllowedTopicsExpr []*regexp.Regexp IgnoredTopicsExpr []*regexp.Regexp // contains filtered or unexported fields }
func NewService ¶
func (*Service) DescribeConsumerGroups ¶
func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, error)
func (*Service) DescribeLogDirs ¶
func (s *Service) DescribeLogDirs(ctx context.Context) []LogDirResponseShard
func (*Service) GetAPIVersions ¶
func (*Service) GetClusterVersion ¶
func (*Service) GetMetadata ¶
func (*Service) GetMetadataCached ¶
func (*Service) GetNumberOfOffsetRecordsConsumed ¶
func (*Service) GetTopicConfigs ¶
func (*Service) HandleIsReady ¶ added in v2.2.0
func (s *Service) HandleIsReady() http.HandlerFunc
func (*Service) IsGroupAllowed ¶
func (*Service) IsTopicAllowed ¶
func (*Service) ListAllConsumerGroupOffsetsAdminAPI ¶
func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[string]*kmsg.OffsetFetchResponse, error)
ListAllConsumerGroupOffsetsAdminAPI return all consumer group offsets using Kafka's Admin API.
func (*Service) ListAllConsumerGroupOffsetsInternal ¶
func (s *Service) ListAllConsumerGroupOffsetsInternal() map[string]map[string]map[int32]OffsetCommit
ListAllConsumerGroupOffsetsInternal returns a map from the in memory storage. The map value is the offset commit value and is grouped by group id, topic, partition id as keys of the nested maps.
func (*Service) ListOffsets ¶
func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error)
ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions
func (*Service) ListOffsetsCached ¶
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage stores the current state of all consumer group information that has been consumed using the offset consumer.
type TopicConfig ¶
type TopicConfig struct { // Enabled can be set to false in order to not collect any topic metrics at all. Enabled bool `koanf:"enabled"` // Granularity can be per topic or per partition. If you want to reduce the number of exported metric series and // you aren't interested in per partition metrics you could choose "topic". Granularity string `koanf:"granularity"` // AllowedTopics are regex strings of topic names whose topic metrics that shall be exported. AllowedTopics []string `koanf:"allowedTopics"` // IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics // take precedence over allowed topics. IgnoredTopics []string `koanf:"ignoredTopics"` // InfoMetric configures how the kafka_topic_info metric is populated InfoMetric InfoMetricConfig `koanf:"infoMetric"` }
func (*TopicConfig) Validate ¶
func (c *TopicConfig) Validate() error
Validate if provided TopicConfig is valid.