store

package
v0.0.0-...-6603b5a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxPoints caps how many samples a time serie retains
	// (presumably the maxPoints passed to NewSimpleTimeSerie — confirm).
	MaxPoints  int           = 500
	// Timeout bounds a single fetch; NOTE(review): which operation it
	// guards (JMX query vs. admin call) is not visible here — confirm.
	Timeout    time.Duration = 5 * time.Second
	// SampleTime is presumably the interval between metric samples — confirm.
	SampleTime time.Duration = 30 * time.Second
)

Variables

View Source
// JMX beans polled by this package. Each bean names a JMX domain
// (Service) and the key/value properties selecting the MBean; a "*"
// value is a wildcard matching every listener, topic, or partition.
// Beans declared as funcs are parameterized by topic name.
var (
	// Broker-wide throughput (kafka.server:type=BrokerTopicMetrics).
	BeanBrokerBytesInPerSec = JMXBean{"kafka.server", map[string]string{
		"type": "BrokerTopicMetrics",
		"name": "BytesInPerSec"}}
	BeanBrokerBytesOutPerSec = JMXBean{"kafka.server", map[string]string{
		"type": "BrokerTopicMetrics",
		"name": "BytesOutPerSec"}}
	BeanBrokerMessagesInPerSec = JMXBean{"kafka.server", map[string]string{
		"type": "BrokerTopicMetrics",
		"name": "MessagesInPerSec"}}
	BeanBrokerBytesRejectedPerSec = JMXBean{"kafka.server", map[string]string{
		"type": "BrokerTopicMetrics",
		"name": "BytesRejectedPerSec"}}

	// Per-broker partition and leadership counts
	// (kafka.server:type=ReplicaManager).
	BeanBrokerPartitionCount = JMXBean{"kafka.server", map[string]string{
		"type": "ReplicaManager",
		"name": "PartitionCount"}}
	BeanBrokerLeaderCount = JMXBean{"kafka.server", map[string]string{
		"type": "ReplicaManager",
		"name": "LeaderCount"}}

	// Replication health.
	BeanBrokerUnderReplicatedPartitions = JMXBean{"kafka.server", map[string]string{
		"type": "ReplicaManager",
		"name": "UnderReplicatedPartitions"}}
	// FIX: OfflinePartitionsCount is published by the controller as
	// kafka.controller:type=KafkaController,name=OfflinePartitionsCount.
	// The previous "ReplicaManager" type does not exist under the
	// kafka.controller domain, so this bean could never match an MBean.
	BeanBrokerOfflinePartitionsCount = JMXBean{"kafka.controller", map[string]string{
		"type": "KafkaController",
		"name": "OfflinePartitionsCount",
	}}

	// ISR churn rates.
	BeanBrokerIsrShrinks = JMXBean{"kafka.server", map[string]string{
		"type": "ReplicaManager",
		"name": "IsrShrinksPerSec",
	}}
	BeanBrokerIsrExpands = JMXBean{"kafka.server", map[string]string{
		"type": "ReplicaManager",
		"name": "IsrExpandsPerSec",
	}}
	// Connection metrics across all listeners and network processors.
	BeanBrokerConnections = JMXBean{"kafka.server", map[string]string{
		"type":             "socket-server-metrics",
		"listener":         "*",
		"networkProcessor": "*"}}

	// Per-topic throughput, wildcarded over every topic.
	BeanAllTopicsBytesInPerSec = JMXBean{"kafka.server", map[string]string{
		"type":  "BrokerTopicMetrics",
		"name":  "BytesInPerSec",
		"topic": "*"}}
	BeanAllTopicsBytesOutPerSec = JMXBean{"kafka.server", map[string]string{
		"type":  "BrokerTopicMetrics",
		"name":  "BytesOutPerSec",
		"topic": "*"}}

	// Throughput for one named topic.
	BeanTopicBytesInPerSec = func(topic string) JMXBean {
		return JMXBean{"kafka.server", map[string]string{
			"type":  "BrokerTopicMetrics",
			"name":  "BytesInPerSec",
			"topic": topic}}
	}
	BeanTopicBytesOutPerSec = func(topic string) JMXBean {
		return JMXBean{"kafka.server", map[string]string{
			"type":  "BrokerTopicMetrics",
			"name":  "BytesOutPerSec",
			"topic": topic}}
	}

	// Log start/end offsets and size for every topic and partition
	// (kafka.log:type=Log).
	BeanAllTopicsLogStart = JMXBean{"kafka.log", map[string]string{
		"type":      "Log",
		"name":      "LogStartOffset",
		"topic":     "*",
		"partition": "*"}}

	BeanAllTopicsLogEnd = JMXBean{"kafka.log", map[string]string{
		"type":      "Log",
		"name":      "LogEndOffset",
		"topic":     "*",
		"partition": "*"}}

	BeanAllTopicsLogSize = JMXBean{"kafka.log", map[string]string{
		"type":      "Log",
		"name":      "Size",
		"topic":     "*",
		"partition": "*"}}

	// Log start/end offsets and size for one named topic, all partitions.
	BeanTopicLogStart = func(topic string) JMXBean {
		return JMXBean{"kafka.log", map[string]string{
			"type":      "Log",
			"name":      "LogStartOffset",
			"topic":     topic,
			"partition": "*"}}
	}
	BeanTopicLogEnd = func(topic string) JMXBean {
		return JMXBean{"kafka.log", map[string]string{
			"type":      "Log",
			"name":      "LogEndOffset",
			"topic":     topic,
			"partition": "*"}}
	}
	BeanTopicLogSize = func(topic string) JMXBean {
		return JMXBean{"kafka.log", map[string]string{
			"type":      "Log",
			"name":      "Size",
			"topic":     topic,
			"partition": "*"}}
	}
)

Functions

func AddParitions

func AddParitions(ctx context.Context, name string, increaseTo int) error

func Broker

func Broker(id string) (broker, bool)

func BrokerToipcStats

func BrokerToipcStats(brokerId int) (int, int, string)

func Brokers

func Brokers() brokers

func CreateTopic

func CreateTopic(ctx context.Context, name string, partitions, replicationFactor int, topicConfig map[string]string) error

func DeleteTopic

func DeleteTopic(name string) error

func FetchConsumerGroups

func FetchConsumerGroups(ctx context.Context, out chan ConsumerGroups)

func FetchMetrics

func FetchMetrics(ctx context.Context, metrics chan Metric, reqs []MetricRequest)

func FetchTopic

func FetchTopic(topicName string) (topic, error)

func KafkaVersion

func KafkaVersion(brokerId int) (string, error)

func NewBroker

func NewBroker() broker

func Partitions

func Partitions() int

func PluginVersion

func PluginVersion(brokerId int) (string, error)

func Start

func Start()

func Topic

func Topic(name string) (topic, bool)

func TotalMessageCount

func TotalMessageCount() int

func TotalTopicSize

func TotalTopicSize() string

func UpdateTopic

func UpdateTopic(name string) bool

func UpdateTopicConfig

func UpdateTopicConfig(ctx context.Context, name string, topicConfig map[string]interface{}) error

func Uptime

func Uptime() string

Types

type ConsumedPartition

// ConsumedPartition records a consumer group's progress on a single
// topic partition. CurrentOffset and LogEndOffset feed the Lag method;
// NOTE(review): exact offset semantics (committed vs. fetched) are not
// visible from this declaration — confirm against the fetcher.
type ConsumedPartition struct {
	Topic         string `json:"topic"`
	Partition     int    `json:"partition"`
	CurrentOffset int    `json:"current_offset"`
	LogEndOffset  int    `json:"log_end_offset"`
	ClientId      string `json:"clientid"`
	ConsumerId    string `json:"consumerid"`
	Host          string `json:"host"`
	// LastSeen is a timestamp; units (seconds vs. millis since epoch)
	// are not established here — confirm before comparing.
	LastSeen      int64  `json:"last_seen"`
}

func (ConsumedPartition) Lag

func (g ConsumedPartition) Lag() int

type ConsumerGroup

// ConsumerGroup is the materialized view of one consumer group:
// its member clients, the partitions it consumes, and liveness info.
type ConsumerGroup struct {
	Name               string                   `json:"name"`
	Topics             []map[string]interface{} `json:"topics"`
	Clients            []client                 `json:"clients"`
	Online             bool                     `json:"online"`
	ConsumedPartitions []ConsumedPartition      `json:"consumed_partitions"`
	// LastSeen timestamp; units not established here — confirm.
	LastSeen           int64                    `json:"last_seen"`
}

func Consumer

func Consumer(name string) (ConsumerGroup, bool)

type ConsumerGroups

type ConsumerGroups map[string][]ConsumedPartition

func (ConsumerGroups) Clients

func (g ConsumerGroups) Clients(group string) []client

func (ConsumerGroups) Lag

func (g ConsumerGroups) Lag(group string) map[string]int

func (ConsumerGroups) MarshalJSON

func (g ConsumerGroups) MarshalJSON() ([]byte, error)

func (ConsumerGroups) NumberConsumers

func (g ConsumerGroups) NumberConsumers(group string) int

func (ConsumerGroups) Online

func (g ConsumerGroups) Online(group string) bool

func (ConsumerGroups) Topics

func (g ConsumerGroups) Topics(group string) []map[string]interface{}

type ConsumerSlice

type ConsumerSlice []ConsumerGroup

func Consumers

func Consumers() ConsumerSlice

func (ConsumerSlice) Get

func (me ConsumerSlice) Get(i int) interface{}

func (ConsumerSlice) Size

func (me ConsumerSlice) Size() int

type JMXBean

// JMXBean identifies a JMX MBean: Service is the JMX domain (e.g.
// "kafka.server") and Params are the object-name key properties that
// select the bean. It round-trips through String and BeanFromString.
type JMXBean struct {
	Service string
	Params  map[string]string
}

func BeanFromString

func BeanFromString(str string) JMXBean

func (JMXBean) String

func (b JMXBean) String() string

type KafkaUser

// KafkaUser is a named principal returned by Users.
// NOTE(review): the set of valid Type values is not visible here —
// confirm against the zookeeper lookup.
type KafkaUser struct {
	Name string
	Type string
}

func Users

func Users(p zookeeper.Permissions) ([]KafkaUser, error)

type Metric

// Metric is one sampled JMX metric value together with the labels
// identifying where it came from (broker, topic, partition, listener).
type Metric struct {
	Broker           int     `json:"broker"`
	Topic            string  `json:"topic"`
	Name             string  `json:"name"`
	Partition        string  `json:"partition"`
	Type             string  `json:"type"`
	Value            float64 `json:"value"`
	Listener         string  `json:"listener"`
	NetworkProcessor string  `json:"networkProcessor"`
	Attribute        string  `json:"attribute"`
	Request          string  `json:"request"`
	Key              string  `json:"key"`
	// Error is deliberately excluded from JSON output ("-").
	Error            string  `json:"-"`
}

func GetMetrics

func GetMetrics(ctx context.Context, query MetricRequest) ([]Metric, error)

type MetricRequest

// MetricRequest asks for one attribute (Attr) of one JMX bean on a
// specific broker; consumed by GetMetrics and FetchMetrics.
type MetricRequest struct {
	BrokerId int
	Bean     JMXBean
	Attr     string
}

func (MetricRequest) String

func (mr MetricRequest) String() string

type Partition

// Partition describes one topic partition's replication state plus a
// bag of per-partition metric values keyed by metric name.
type Partition struct {
	Number          int            `json:"number"`
	Leader          int            `json:"leader"`
	Replicas        []int          `json:"replicas"`
	ISR             []int          `json:"isr"`
	LeaderEpoch     int            `json:"leader_epoch"`
	Version         int            `json:"version"`
	ControllerEpoch int            `json:"controller_epoch"`
	Metrics         map[string]int `json:"metrics"`
}

type SimpleTimeSerie

// SimpleTimeSerie is a bounded in-memory series of int samples;
// construct with NewSimpleTimeSerie(interval, maxPoints).
// NOTE(review): eviction policy when Points reaches maxPoints is in
// the unexported fields/Add — confirm before relying on ordering.
type SimpleTimeSerie struct {
	Points []int `json:"points"`
	// contains filtered or unexported fields
}

func NewSimpleTimeSerie

func NewSimpleTimeSerie(interval, maxPoints int) *SimpleTimeSerie

func (*SimpleTimeSerie) Add

func (me *SimpleTimeSerie) Add(y int)

func (*SimpleTimeSerie) All

func (me *SimpleTimeSerie) All() []int

func (*SimpleTimeSerie) Interval

func (me *SimpleTimeSerie) Interval() int

func (*SimpleTimeSerie) Last

func (me *SimpleTimeSerie) Last() int

func (*SimpleTimeSerie) Len

func (me *SimpleTimeSerie) Len() int

type SumTimeSerie

// SumTimeSerie aggregates several TimeSerie values, presumably by
// element-wise summation (see SumBrokerSeries) — confirm in All/Last.
type SumTimeSerie struct {
	Series []TimeSerie
}

func (*SumTimeSerie) All

func (me *SumTimeSerie) All() []int

func (*SumTimeSerie) Interval

func (me *SumTimeSerie) Interval() int

func (*SumTimeSerie) Last

func (me *SumTimeSerie) Last() int

func (*SumTimeSerie) Len

func (me *SumTimeSerie) Len() int

type TimeSerie

// TimeSerie is the read interface shared by SimpleTimeSerie and
// SumTimeSerie: sampling interval, all points, latest point, and count.
type TimeSerie interface {
	Interval() int
	All() []int
	Last() int
	Len() int
}

func NewSumTimeSerie

func NewSumTimeSerie(series []TimeSerie) TimeSerie

func SumBrokerSeries

func SumBrokerSeries(m string) TimeSerie

type TopicConfig

// TopicConfig is a versioned set of topic configuration entries; Data
// serializes under the "config" key (custom MarshalJSON defined below).
type TopicConfig struct {
	Version int                    `json:"version"`
	Data    map[string]interface{} `json:"config"`
}

func (TopicConfig) MarshalJSON

func (t TopicConfig) MarshalJSON() ([]byte, error)

type TopicSlice

type TopicSlice []topic

func Topics

func Topics() TopicSlice

func (TopicSlice) Get

func (me TopicSlice) Get(i int) interface{}

func (TopicSlice) Size

func (me TopicSlice) Size() int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL