minion

package
v2.2.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2024 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ConsumerGroupScrapeModeOffsetsTopic string = "offsetsTopic"
	ConsumerGroupScrapeModeAdminAPI     string = "adminApi"

	ConsumerGroupGranularityTopic     string = "topic"
	ConsumerGroupGranularityPartition string = "partition"
)
View Source
const (
	TopicGranularityTopic     string = "topic"
	TopicGranularityPartition string = "partition"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	ConsumerGroups ConsumerGroupConfig `koanf:"consumerGroups"`
	Topics         TopicConfig         `koanf:"topics"`
	LogDirs        LogDirsConfig       `koanf:"logDirs"`
	EndToEnd       e2e.Config          `koanf:"endToEnd"`
}

func (*Config) SetDefaults

func (c *Config) SetDefaults()

func (*Config) Validate

func (c *Config) Validate() error

type ConsumerGroupConfig

type ConsumerGroupConfig struct {
	// Enabled specifies whether consumer groups shall be scraped and exported or not.
	Enabled bool `koanf:"enabled"`

	// ScrapeMode specifies whether we export consumer group offsets using the Admin API or by consuming the internal
	// __consumer_offsets topic.
	ScrapeMode string `koanf:"scrapeMode"`

	// Granularity can be per topic or per partition. If you want to reduce the number of exported metric series and
	// you aren't interested in per partition lags you could choose "topic" where all partition lags will be summed
	// and only topic lags will be exported.
	Granularity string `koanf:"granularity"`

	// AllowedGroupIDs are regex strings of group IDs that shall be exported.
	AllowedGroupIDs []string `koanf:"allowedGroups"`

	// IgnoredGroupIDs are regex strings of group IDs that shall be ignored/skipped when exporting metrics. Ignored groups
	// take precedence over allowed groups.
	IgnoredGroupIDs []string `koanf:"ignoredGroups"`
}

func (*ConsumerGroupConfig) SetDefaults

func (c *ConsumerGroupConfig) SetDefaults()

func (*ConsumerGroupConfig) Validate

func (c *ConsumerGroupConfig) Validate() error

type DescribeConsumerGroupsResponse added in v2.1.0

type DescribeConsumerGroupsResponse struct {
	BrokerMetadata kgo.BrokerMetadata
	Groups         *kmsg.DescribeGroupsResponse
}

type InfoMetricConfig added in v2.2.0

type InfoMetricConfig struct {
	// ConfigKeys configures optional topic configuration keys that should be exported
	// as prometheus metric labels.
	// By default only "cleanup.policy" is exported
	ConfigKeys []string `koanf:"configKeys"`
}

type LogDirResponseShard

type LogDirResponseShard struct {
	Err     error
	Broker  kgo.BrokerMetadata
	LogDirs *kmsg.DescribeLogDirsResponse
}

type LogDirsConfig

type LogDirsConfig struct {
	// Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
	// to version 1.0.0 as describing log dirs was not supported back then.
	Enabled bool `koanf:"enabled"`
}

func (*LogDirsConfig) SetDefaults

func (c *LogDirsConfig) SetDefaults()

SetDefaults for log dirs config

func (*LogDirsConfig) Validate

func (c *LogDirsConfig) Validate() error

Validate if provided LogDirsConfig is valid.

type OffsetCommit

type OffsetCommit struct {
	Key   kmsg.OffsetCommitKey
	Value kmsg.OffsetCommitValue

	// CommitCount is the number of offset commits for this group-topic-partition combination
	CommitCount int

	// ExpireTimestamp is a timestamp that indicates when this offset commit will expire on the Kafka cluster
	ExpireTimestamp time.Time
}

OffsetCommit is used as value for the OffsetCommit map

type Service

type Service struct {
	Cfg Config

	AllowedGroupIDsExpr []*regexp.Regexp
	IgnoredGroupIDsExpr []*regexp.Regexp
	AllowedTopicsExpr   []*regexp.Regexp
	IgnoredTopicsExpr   []*regexp.Regexp
	// contains filtered or unexported fields
}

func NewService

func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricsNamespace string, ctx context.Context) (*Service, error)

func (*Service) DescribeConsumerGroups

func (s *Service) DescribeConsumerGroups(ctx context.Context) ([]DescribeConsumerGroupsResponse, error)

func (*Service) DescribeLogDirs

func (s *Service) DescribeLogDirs(ctx context.Context) []LogDirResponseShard

func (*Service) GetAPIVersions

func (s *Service) GetAPIVersions(ctx context.Context) (*kmsg.ApiVersionsResponse, error)

func (*Service) GetClusterVersion

func (s *Service) GetClusterVersion(ctx context.Context) (string, error)

func (*Service) GetMetadata

func (s *Service) GetMetadata(ctx context.Context) (*kmsg.MetadataResponse, error)

func (*Service) GetMetadataCached

func (s *Service) GetMetadataCached(ctx context.Context) (*kmsg.MetadataResponse, error)

func (*Service) GetNumberOfOffsetRecordsConsumed

func (s *Service) GetNumberOfOffsetRecordsConsumed() float64

func (*Service) GetTopicConfigs

func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsResponse, error)

func (*Service) HandleIsReady added in v2.2.0

func (s *Service) HandleIsReady() http.HandlerFunc

func (*Service) IsGroupAllowed

func (s *Service) IsGroupAllowed(groupName string) bool

func (*Service) IsTopicAllowed

func (s *Service) IsTopicAllowed(topicName string) bool

func (*Service) ListAllConsumerGroupOffsetsAdminAPI

func (s *Service) ListAllConsumerGroupOffsetsAdminAPI(ctx context.Context) (map[string]*kmsg.OffsetFetchResponse, error)

ListAllConsumerGroupOffsetsAdminAPI returns all consumer group offsets using Kafka's Admin API.

func (*Service) ListAllConsumerGroupOffsetsInternal

func (s *Service) ListAllConsumerGroupOffsetsInternal() map[string]map[string]map[int32]OffsetCommit

ListAllConsumerGroupOffsetsInternal returns a map from the in-memory storage. The nested maps are keyed by group ID, then topic, then partition ID, and the innermost value is the offset commit.

func (*Service) ListOffsets

func (s *Service) ListOffsets(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error)

ListOffsets fetches the low (timestamp: -2) or high water mark (timestamp: -1) for all topic partitions

func (*Service) ListOffsetsCached

func (s *Service) ListOffsetsCached(ctx context.Context, timestamp int64) (*kmsg.ListOffsetsResponse, error)

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage stores the current state of all consumer group information that has been consumed using the offset consumer.

type TopicConfig

type TopicConfig struct {
	// Enabled can be set to false in order to not collect any topic metrics at all.
	Enabled bool `koanf:"enabled"`

	// Granularity can be per topic or per partition. If you want to reduce the number of exported metric series and
	// you aren't interested in per partition metrics you could choose "topic".
	Granularity string `koanf:"granularity"`

	// AllowedTopics are regex strings of topic names whose topic metrics shall be exported.
	AllowedTopics []string `koanf:"allowedTopics"`

	// IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
	// take precedence over allowed topics.
	IgnoredTopics []string `koanf:"ignoredTopics"`

	// InfoMetric configures how the kafka_topic_info metric is populated
	InfoMetric InfoMetricConfig `koanf:"infoMetric"`
}

func (*TopicConfig) SetDefaults

func (c *TopicConfig) SetDefaults()

SetDefaults for topic config

func (*TopicConfig) Validate

func (c *TopicConfig) Validate() error

Validate if provided TopicConfig is valid.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL