Documentation ¶
Index ¶
- Constants
- Variables
- func AddParitions(ctx context.Context, name string, increaseTo int) error
- func Broker(id string) (broker, bool)
- func BrokerToipcStats(brokerId int) (int, int, string)
- func Brokers() brokers
- func CreateTopic(ctx context.Context, name string, partitions, replicationFactor int, ...) error
- func DeleteTopic(name string) error
- func FetchConsumerGroups(ctx context.Context, out chan ConsumerGroups)
- func FetchMetrics(ctx context.Context, metrics chan Metric, reqs []MetricRequest)
- func FetchTopic(topicName string) (topic, error)
- func KafkaVersion(brokerId int) (string, error)
- func NewBroker() broker
- func Partitions() int
- func PluginVersion(brokerId int) (string, error)
- func Start()
- func Topic(name string) (topic, bool)
- func TotalMessageCount() int
- func TotalTopicSize() string
- func UpdateTopic(name string) bool
- func UpdateTopicConfig(ctx context.Context, name string, topicConfig map[string]interface{}) error
- func Uptime() string
- type ConsumedPartition
- type ConsumerGroup
- type ConsumerGroups
- func (g ConsumerGroups) Clients(group string) []client
- func (g ConsumerGroups) Lag(group string) map[string]int
- func (g ConsumerGroups) MarshalJSON() ([]byte, error)
- func (g ConsumerGroups) NumberConsumers(group string) int
- func (g ConsumerGroups) Online(group string) bool
- func (g ConsumerGroups) Topics(group string) []map[string]interface{}
- type ConsumerSlice
- type JMXBean
- type KafkaUser
- type Metric
- type MetricRequest
- type Partition
- type SimpleTimeSerie
- type SumTimeSerie
- type TimeSerie
- type TopicConfig
- type TopicSlice
Constants ¶
Variables ¶
View Source
var ( BeanBrokerBytesInPerSec = JMXBean{"kafka.server", map[string]string{ "type": "BrokerTopicMetrics", "name": "BytesInPerSec"}} BeanBrokerBytesOutPerSec = JMXBean{"kafka.server", map[string]string{ "type": "BrokerTopicMetrics", "name": "BytesOutPerSec"}} BeanBrokerMessagesInPerSec = JMXBean{"kafka.server", map[string]string{ "type": "BrokerTopicMetrics", "name": "MessagesInPerSec"}} BeanBrokerBytesRejectedPerSec = JMXBean{"kafka.server", map[string]string{ "type": "BrokerTopicMetrics", "name": "BytesRejectedPerSec"}} BeanBrokerPartitionCount = JMXBean{"kafka.server", map[string]string{ "type": "ReplicaManager", "name": "PartitionCount"}} BeanBrokerLeaderCount = JMXBean{"kafka.server", map[string]string{ "type": "ReplicaManager", "name": "LeaderCount"}} BeanBrokerUnderReplicatedPartitions = JMXBean{"kafka.server", map[string]string{ "type": "ReplicaManager", "name": "UnderReplicatedPartitions"}} BeanBrokerOfflinePartitionsCount = JMXBean{"kafka.controller", map[string]string{ "type": "ReplicaManager", "name": "OfflinePartitionsCount", }} BeanBrokerIsrShrinks = JMXBean{"kafka.server", map[string]string{ "type": "ReplicaManager", "name": "IsrShrinksPerSec", }} BeanBrokerIsrExpands = JMXBean{"kafka.server", map[string]string{ "type": "ReplicaManager", "name": "IsrExpandsPerSec", }} BeanBrokerConnections = JMXBean{"kafka.server", map[string]string{ "type": "socket-server-metrics", "listener": "*", "networkProcessor": "*"}} BeanAllTopicsBytesInPerSec = JMXBean{"kafka.server", map[string]string{ "type": "BrokerTopicMetrics", "name": "BytesInPerSec", "topic": "*"}} BeanAllTopicsBytesOutPerSec = JMXBean{"kafka.server", map[string]string{ "type": "BrokerTopicMetrics", "name": "BytesOutPerSec", "topic": "*"}} BeanTopicBytesInPerSec = func(topic string) JMXBean { return JMXBean{"kafka.server", map[string]string{ "type": "BrokerTopicMetrics", "name": "BytesInPerSec", "topic": topic}} } BeanTopicBytesOutPerSec = func(topic string) JMXBean { return JMXBean{"kafka.server", 
map[string]string{ "type": "BrokerTopicMetrics", "name": "BytesOutPerSec", "topic": topic}} } BeanAllTopicsLogStart = JMXBean{"kafka.log", map[string]string{ "type": "Log", "name": "LogStartOffset", "topic": "*", "partition": "*"}} BeanAllTopicsLogEnd = JMXBean{"kafka.log", map[string]string{ "type": "Log", "name": "LogEndOffset", "topic": "*", "partition": "*"}} BeanAllTopicsLogSize = JMXBean{"kafka.log", map[string]string{ "type": "Log", "name": "Size", "topic": "*", "partition": "*"}} BeanTopicLogStart = func(topic string) JMXBean { return JMXBean{"kafka.log", map[string]string{ "type": "Log", "name": "LogStartOffset", "topic": topic, "partition": "*"}} } BeanTopicLogEnd = func(topic string) JMXBean { return JMXBean{"kafka.log", map[string]string{ "type": "Log", "name": "LogEndOffset", "topic": topic, "partition": "*"}} } BeanTopicLogSize = func(topic string) JMXBean { return JMXBean{"kafka.log", map[string]string{ "type": "Log", "name": "Size", "topic": topic, "partition": "*"}} } )
Functions ¶
func CreateTopic ¶
func DeleteTopic ¶
func FetchConsumerGroups ¶
func FetchConsumerGroups(ctx context.Context, out chan ConsumerGroups)
func FetchMetrics ¶
func FetchMetrics(ctx context.Context, metrics chan Metric, reqs []MetricRequest)
func FetchTopic ¶
func KafkaVersion ¶
func Partitions ¶
func Partitions() int
func PluginVersion ¶
func TotalMessageCount ¶
func TotalMessageCount() int
func TotalTopicSize ¶
func TotalTopicSize() string
func UpdateTopic ¶
func UpdateTopicConfig ¶
Types ¶
type ConsumedPartition ¶
// ConsumedPartition is a single entry of a ConsumerGroups value: it names a
// topic and partition together with two offsets and the identity of a client.
// Field tags give the JSON keys used when the struct is serialized.
//
// NOTE(review): the unit of LastSeen is not visible here; presumably a Unix
// timestamp — confirm against the code that sets it.
type ConsumedPartition struct {
	Topic         string `json:"topic"`
	Partition     int    `json:"partition"`
	CurrentOffset int    `json:"current_offset"`
	LogEndOffset  int    `json:"log_end_offset"`
	ClientId      string `json:"clientid"`
	ConsumerId    string `json:"consumerid"`
	Host          string `json:"host"`
	LastSeen      int64  `json:"last_seen"`
}
func (ConsumedPartition) Lag ¶
func (g ConsumedPartition) Lag() int
type ConsumerGroup ¶
// ConsumerGroup is the summary of one named consumer group: its topics,
// clients, online flag and the ConsumedPartition entries belonging to it.
// Field tags give the JSON keys used when the struct is serialized.
//
// NOTE(review): the schema of the Topics maps and the unit of LastSeen are
// not visible here — confirm against the code that populates them.
type ConsumerGroup struct {
	Name               string                   `json:"name"`
	Topics             []map[string]interface{} `json:"topics"`
	Clients            []client                 `json:"clients"`
	Online             bool                     `json:"online"`
	ConsumedPartitions []ConsumedPartition      `json:"consumed_partitions"`
	LastSeen           int64                    `json:"last_seen"`
}
func Consumer ¶
func Consumer(name string) (ConsumerGroup, bool)
type ConsumerGroups ¶
// ConsumerGroups maps a string key to a list of ConsumedPartition entries.
// Its methods all take a `group string` argument, so the key is presumably
// the consumer group name — confirm against the code that fills the map.
// The type defines its own MarshalJSON, so its JSON form is not necessarily
// the default map encoding.
type ConsumerGroups map[string][]ConsumedPartition
func (ConsumerGroups) Clients ¶
func (g ConsumerGroups) Clients(group string) []client
func (ConsumerGroups) MarshalJSON ¶
func (g ConsumerGroups) MarshalJSON() ([]byte, error)
func (ConsumerGroups) NumberConsumers ¶
func (g ConsumerGroups) NumberConsumers(group string) int
func (ConsumerGroups) Online ¶
func (g ConsumerGroups) Online(group string) bool
func (ConsumerGroups) Topics ¶
func (g ConsumerGroups) Topics(group string) []map[string]interface{}
type ConsumerSlice ¶
// ConsumerSlice is a list of ConsumerGroup values. It is the return type of
// Consumers and provides indexed access through Get and Size.
type ConsumerSlice []ConsumerGroup
func Consumers ¶
func Consumers() ConsumerSlice
func (ConsumerSlice) Get ¶
func (me ConsumerSlice) Get(i int) interface{}
func (ConsumerSlice) Size ¶
func (me ConsumerSlice) Size() int
type JMXBean ¶
func BeanFromString ¶
type Metric ¶
// Metric is one metric sample, as delivered by FetchMetrics and GetMetrics.
// Field tags give the JSON keys used when the struct is serialized; Error is
// tagged `json:"-"` and is therefore never included in JSON output.
//
// NOTE(review): Type, Name, Topic, Partition, Listener and NetworkProcessor
// share their names with the property keys of the JMX bean selectors, so they
// presumably echo the matched bean's properties — confirm against the decoder.
type Metric struct {
	Broker           int     `json:"broker"`
	Topic            string  `json:"topic"`
	Name             string  `json:"name"`
	Partition        string  `json:"partition"`
	Type             string  `json:"type"`
	Value            float64 `json:"value"`
	Listener         string  `json:"listener"`
	NetworkProcessor string  `json:"networkProcessor"`
	Attribute        string  `json:"attribute"`
	Request          string  `json:"request"`
	Key              string  `json:"key"`
	Error            string  `json:"-"`
}
func GetMetrics ¶
func GetMetrics(ctx context.Context, query MetricRequest) ([]Metric, error)
type MetricRequest ¶
func (MetricRequest) String ¶
func (mr MetricRequest) String() string
type SimpleTimeSerie ¶
// SimpleTimeSerie holds a series of integer points, serialized under the JSON
// key "points". Values are created with NewSimpleTimeSerie(interval, maxPoints)
// and appended with Add.
//
// NOTE(review): how interval and maxPoints constrain Points is not visible
// here — confirm against the implementation of Add.
type SimpleTimeSerie struct {
	Points []int `json:"points"`
	// contains filtered or unexported fields
}
func NewSimpleTimeSerie ¶
func NewSimpleTimeSerie(interval, maxPoints int) *SimpleTimeSerie
func (*SimpleTimeSerie) Add ¶
func (me *SimpleTimeSerie) Add(y int)
func (*SimpleTimeSerie) All ¶
func (me *SimpleTimeSerie) All() []int
func (*SimpleTimeSerie) Interval ¶
func (me *SimpleTimeSerie) Interval() int
func (*SimpleTimeSerie) Last ¶
func (me *SimpleTimeSerie) Last() int
func (*SimpleTimeSerie) Len ¶
func (me *SimpleTimeSerie) Len() int
type SumTimeSerie ¶
// SumTimeSerie groups several TimeSerie values and exposes the accessors
// All, Interval, Last and Len over them.
//
// NOTE(review): the name suggests the accessors sum the member series
// point-wise — confirm against the method implementations.
type SumTimeSerie struct {
	Series []TimeSerie
}
func (*SumTimeSerie) All ¶
func (me *SumTimeSerie) All() []int
func (*SumTimeSerie) Interval ¶
func (me *SumTimeSerie) Interval() int
func (*SumTimeSerie) Last ¶
func (me *SumTimeSerie) Last() int
func (*SumTimeSerie) Len ¶
func (me *SumTimeSerie) Len() int
type TopicConfig ¶
// TopicConfig pairs a version number with a free-form configuration map.
// Note that Data is tagged with the JSON key "config", not "data".
// The type defines its own MarshalJSON, so the encoded output is not
// necessarily the default struct encoding implied by the tags.
type TopicConfig struct {
	Version int                    `json:"version"`
	Data    map[string]interface{} `json:"config"`
}
func (TopicConfig) MarshalJSON ¶
func (t TopicConfig) MarshalJSON() ([]byte, error)
type TopicSlice ¶
// TopicSlice is a list of topic values. It is the return type of Topics and
// provides indexed access through Get and Size.
type TopicSlice []topic
func Topics ¶
func Topics() TopicSlice
func (TopicSlice) Get ¶
func (me TopicSlice) Get(i int) interface{}
func (TopicSlice) Size ¶
func (me TopicSlice) Size() int
Click to show internal directories.
Click to hide internal directories.