Documentation ¶
Index ¶
- Variables
- func WhatChanged(s1 []int, s2 []int) string
- func WriteMap(pm *PartitionMap, path string) error
- type Broker
- type BrokerList
- type BrokerMap
- func (b BrokerMap) AboveMean(d float64, f func() float64) []int
- func (b BrokerMap) BelowMean(d float64, f func() float64) []int
- func (b BrokerMap) Copy() BrokerMap
- func (b BrokerMap) HMean() float64
- func (b BrokerMap) List() BrokerList
- func (b BrokerMap) MappedBrokers(pm *PartitionMap) BrokerMap
- func (b BrokerMap) Mean() float64
- func (b BrokerMap) NonReplacedBrokers() BrokerMap
- func (b BrokerMap) StorageDiff(b2 BrokerMap) map[int][2]float64
- func (b BrokerMap) StorageRange() float64
- func (b BrokerMap) StorageRangeSpread() float64
- func (b BrokerMap) StorageStdDev() float64
- func (b BrokerMap) SubStorageAll(pm *PartitionMap, pmm PartitionMetaMap) error
- func (b BrokerMap) SubStorageReplacements(pm *PartitionMap, pmm PartitionMetaMap) error
- func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)
- func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) *BrokerStatus
- type BrokerMeta
- type BrokerMetaMap
- type BrokerMetrics
- type BrokerMetricsMap
- type BrokerStatus
- type BrokerUseStats
- type BrokerUseStatsList
- type Config
- type Constraints
- type DegreeDistribution
- type DegreeDistributionStats
- type ErrNoNode
- type Handler
- type KafkaConfig
- type KafkaConfigData
- type Mappings
- type Mock
- func (zk *Mock) Children(a string) ([]string, error)
- func (zk *Mock) Close()
- func (zk *Mock) Create(a, b string) error
- func (zk *Mock) CreateSequential(a, b string) error
- func (zk *Mock) Exists(a string) (bool, error)
- func (zk *Mock) Get(a string) ([]byte, error)
- func (zk *Mock) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
- func (zk *Mock) GetAllPartitionMeta() (PartitionMetaMap, error)
- func (zk *Mock) GetBrokerMetrics() (BrokerMetricsMap, error)
- func (zk *Mock) GetPartitionMap(t string) (*PartitionMap, error)
- func (zk *Mock) GetReassignments() Reassignments
- func (zk *Mock) GetTopicConfig(t string) (*TopicConfig, error)
- func (zk *Mock) GetTopicState(t string) (*TopicState, error)
- func (zk *Mock) GetTopicStateISR(t string) (TopicStateISR, error)
- func (zk *Mock) GetTopics(ts []*regexp.Regexp) ([]string, error)
- func (zk *Mock) InitRawClient() error
- func (zk *Mock) Ready() bool
- func (zk *Mock) Set(a, b string) error
- func (zk *Mock) UpdateKafkaConfig(c KafkaConfig) (bool, error)
- type NoMappingForBroker
- type NoMappingForTopic
- type Partition
- type PartitionMap
- func (pm *PartitionMap) Copy() *PartitionMap
- func (pm *PartitionMap) DegreeDistribution() DegreeDistribution
- func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string
- func (pm *PartitionMap) Mappings() Mappings
- func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []string)
- func (pm *PartitionMap) SetReplication(r int)
- func (pm *PartitionMap) SimpleLeaderOptimization()
- func (pm *PartitionMap) Strip() *PartitionMap
- func (pm *PartitionMap) UseStats() []*BrokerUseStats
- type PartitionMeta
- type PartitionMetaMap
- type PartitionState
- type Reassignments
- type RebuildParams
- type SubstitutionAffinities
- type TopicConfig
- type TopicState
- type TopicStateISR
- type ZKHandler
- func (z *ZKHandler) Children(p string) ([]string, error)
- func (z *ZKHandler) Close()
- func (z *ZKHandler) Create(p string, d string) error
- func (z *ZKHandler) CreateSequential(p string, d string) error
- func (z *ZKHandler) Exists(p string) (bool, error)
- func (z *ZKHandler) Get(p string) ([]byte, error)
- func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
- func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error)
- func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error)
- func (z *ZKHandler) GetReassignments() Reassignments
- func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)
- func (z *ZKHandler) GetTopicState(t string) (*TopicState, error)
- func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)
- func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error)
- func (z *ZKHandler) Ready() bool
- func (z *ZKHandler) Set(p string, d string) error
- func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) (bool, error)
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoBrokers = errors.New("No additional brokers that meet Constraints") ErrInvalidSelectionMethod = errors.New("Invalid selection method") )
var ( // ErrInvalidKafkaConfigType indicates invalid Kafka config types. ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type") )
Functions ¶
func WhatChanged ¶
WhatChanged takes a before and after broker replica set and returns a string describing what changed.
func WriteMap ¶
func WriteMap(pm *PartitionMap, path string) error
WriteMap takes a *PartitionMap and writes a JSON text file to the provided path.
Types ¶
type Broker ¶
type Broker struct { ID int Locality string Used int StorageFree float64 Replace bool Missing bool New bool }
Broker associates metadata with a real broker by ID.
type BrokerList ¶
type BrokerList []*Broker
BrokerList is a slice of brokers for sorting by used count.
func (BrokerList) BestCandidate ¶
func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)
BestCandidate takes a *Constraints, selection method and pass / iteration number (for use as a seed value for pseudo-random number generation) and returns the most suitable broker.
func (BrokerList) SortByCount ¶
func (b BrokerList) SortByCount()
SortByCount sorts the BrokerList by Used values.
func (BrokerList) SortByID ¶
func (b BrokerList) SortByID()
SortByID sorts the BrokerList by ID values.
func (BrokerList) SortByStorage ¶
func (b BrokerList) SortByStorage()
SortByStorage sorts the BrokerList by StorageFree values.
func (BrokerList) SortPseudoShuffle ¶
func (b BrokerList) SortPseudoShuffle(seed int64)
SortPseudoShuffle takes a BrokerList and performs a sort by count. For each sequence of brokers with equal counts, the sub-slice is pseudo random shuffled using the provided seed value s.
type BrokerMap ¶
BrokerMap holds a mapping of broker IDs to *Broker.
func BrokerMapFromPartitionMap ¶
func BrokerMapFromPartitionMap(pm *PartitionMap, bm BrokerMetaMap, force bool) BrokerMap
BrokerMapFromPartitionMap creates a BrokerMap from a partitionMap. TODO can we remove marked for replacement here too?
func (BrokerMap) AboveMean ¶
AboveMean returns a sorted []int of broker IDs that are above the mean by d percent (0.00 < d). The mean type is provided as a function parameter f.
func (BrokerMap) BelowMean ¶
BelowMean returns a sorted []int of broker IDs that are below the mean by d percent (0.00 < d). The mean type is provided as a function parameter f.
func (BrokerMap) List ¶
func (b BrokerMap) List() BrokerList
List take a BrokerMap and returns a BrokerList.
func (BrokerMap) MappedBrokers ¶
func (b BrokerMap) MappedBrokers(pm *PartitionMap) BrokerMap
MappedBrokers takes a PartitionMap and returns a new BrokerMap that only includes brokers found in the partition map holding a partition.
func (BrokerMap) NonReplacedBrokers ¶
NonReplacedBrokers returns a copy of a BrokerMap that excludes all brokers marked for replacement.
func (BrokerMap) StorageDiff ¶
StorageDiff takes two BrokerMaps and returns a per broker ID diff in storage as a [2]float64: [absolute, percentage] diff.
func (BrokerMap) StorageRange ¶
StorageRange returns the range of free storage for all brokers in the BrokerMap.
func (BrokerMap) StorageRangeSpread ¶
StorageRangeSpread returns the range spread of free storage for all brokers in the BrokerMap.
func (BrokerMap) StorageStdDev ¶
StorageStdDev returns the standard deviation of free storage for all brokers in the BrokerMap.
func (BrokerMap) SubStorageAll ¶
func (b BrokerMap) SubStorageAll(pm *PartitionMap, pmm PartitionMetaMap) error
SubStorageAll takes a PartitionMap + PartitionMetaMap and adds the size of each partition back to the StorageFree value of any broker it was originally mapped to. This is used in a force rebuild where the assumption is that partitions will be lifted and repositioned.
func (BrokerMap) SubStorageReplacements ¶
func (b BrokerMap) SubStorageReplacements(pm *PartitionMap, pmm PartitionMetaMap) error
SubStorageReplacements works similarly to SubStorageAll except that storage usage is only subtraced from brokers marked for replacement.
func (BrokerMap) SubstitutionAffinities ¶
func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)
SubstitutionAffinities finds all brokers marked for replacement and for each broker, it creates an exclusive association with a newly provided broker. In the rebuild stage, each to-be-replaced broker will be only replaced with the affinity it's associated with. A given new broker can only be an affinity for a single outgoing broker. An error is returned if a complete mapping of affinities cannot be constructed (e.g. two brokers are marked for replacement but only one new replacement was provided and substitution affinities is enabled).
func (BrokerMap) Update ¶
func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) *BrokerStatus
Update takes a []int of broker IDs and BrokerMap then adds them to the BrokerMap, returning the count of marked for replacement, newly included, and brokers that weren't found in ZooKeeper.
type BrokerMeta ¶
type BrokerMeta struct { Rack string `json:"rack"` StorageFree float64 // In bytes. MetricsIncomplete bool }
BrokerMeta holds metadata that describes a broker, used in satisfying constraints.
type BrokerMetaMap ¶
type BrokerMetaMap map[int]*BrokerMeta
BrokerMetaMap is a map of broker IDs to BrokerMeta metadata fetched from ZooKeeper. Currently, just the rack field is retrieved.
type BrokerMetrics ¶
type BrokerMetrics struct {
StorageFree float64
}
BrokerMetrics holds broker metric data fetched from ZK.
type BrokerMetricsMap ¶
type BrokerMetricsMap map[int]*BrokerMetrics
BrokerMetricsMap holds a mapping of broker ID to BrokerMetrics.
type BrokerStatus ¶
BrokerStatus summarizes change counts from an input and output broker list.
func (BrokerStatus) Changes ¶
func (bs BrokerStatus) Changes() bool
Changes returns a bool that indicates whether a BrokerStatus values represent a change in brokers.
type BrokerUseStats ¶
BrokerUseStats holds counts of partition ownership.
type BrokerUseStatsList ¶
type BrokerUseStatsList []*BrokerUseStats
BrokerUseStatsList is a slice of *BrokerUseStats.
func (BrokerUseStatsList) Len ¶
func (b BrokerUseStatsList) Len() int
func (BrokerUseStatsList) Less ¶
func (b BrokerUseStatsList) Less(i, j int) bool
func (BrokerUseStatsList) Swap ¶
func (b BrokerUseStatsList) Swap(i, j int)
type Config ¶
Config holds initialization paramaters for a Handler. Connect is a ZooKeeper connect string. Prefix should reflect any prefix used for Kafka on the reference ZooKeeper cluster (excluding slashes). MetricsPrefix is the prefix used for broker metrics metadata persisted in ZooKeeper.
type Constraints ¶
type Constraints struct {
// contains filtered or unexported fields
}
Constraints holds a map of IDs and locality key-values.
func MergeConstraints ¶
func MergeConstraints(bl BrokerList) *Constraints
MergeConstraints takes a brokerlist and builds a *Constraints by merging the attributes of all brokers from the supplied list.
func NewConstraints ¶
func NewConstraints() *Constraints
NewConstraints returns an empty *Constraints.
func (*Constraints) Add ¶
func (c *Constraints) Add(b *Broker)
Add takes a *Broker and adds its attributes to the *Constraints. The requestSize is also subtracted from the *Broker.StorageFree.
type DegreeDistribution ¶
type DegreeDistribution struct { // Relationships is a an adjacency list // where an edge between brokers is defined as // a common occupancy in at least one replica set. // For instance, given the replica set [1001,1002,1003], // ID 1002 has a relationship with 1001 and 1003. Relationships map[int]map[int]struct{} }
DegreeDistribution holds broker to broker relationships.
func NewDegreeDistribution ¶
func NewDegreeDistribution() DegreeDistribution
NewDegreeDistribution returns a new DegreeDistribution.
func (DegreeDistribution) Add ¶
func (dd DegreeDistribution) Add(nodes []int)
Add takes a []int of broker IDs representing a replica set and updates the adjacency lists for each broker in the set.
func (DegreeDistribution) Count ¶
func (dd DegreeDistribution) Count(n int) int
Count takes a node ID and returns the degree distribution.
func (DegreeDistribution) Stats ¶
func (dd DegreeDistribution) Stats() DegreeDistributionStats
Stats returns a DegreeDistributionStats.
type DegreeDistributionStats ¶
DegreeDistributionStats holds general statistical information describing the DegreeDistribution counts.
type ErrNoNode ¶
type ErrNoNode struct {
// contains filtered or unexported fields
}
ErrNoNode error type is specifically for Get method calls where the underlying error type is a zkclient.ErrNoNode.
type Handler ¶
type Handler interface { Exists(string) (bool, error) Create(string, string) error CreateSequential(string, string) error Set(string, string) error Get(string) ([]byte, error) Children(string) ([]string, error) Close() GetTopicState(string) (*TopicState, error) GetTopicStateISR(string) (TopicStateISR, error) UpdateKafkaConfig(KafkaConfig) (bool, error) GetReassignments() Reassignments GetTopics([]*regexp.Regexp) ([]string, error) GetTopicConfig(string) (*TopicConfig, error) GetAllBrokerMeta(bool) (BrokerMetaMap, []error) GetAllPartitionMeta() (PartitionMetaMap, error) GetPartitionMap(string) (*PartitionMap, error) Ready() bool }
Handler exposes basic ZooKeeper operations along with additional methods that return kafkazk package specific types, populated with data fetched from ZooKeeper.
func NewHandler ¶
NewHandler takes a *Config, performs any initialization and returns a Handler.
type KafkaConfig ¶
type KafkaConfig struct { Type string // Topic or broker. Name string // Entity name. Configs [][2]string // Slice of [2]string{key,value} configs. }
KafkaConfig is used to issue configuration updates to either topics or brokers in ZooKeeper.
type KafkaConfigData ¶
type KafkaConfigData struct { Version int `json:"version"` Config map[string]string `json:"config"` }
KafkaConfigData is used for unmarshalling /config/<type>/<name> data from ZooKeeper.
func NewKafkaConfigData ¶
func NewKafkaConfigData() KafkaConfigData
NewKafkaConfigData creates a KafkaConfigData.
type Mappings ¶
Mappings is a mapping of broker IDs to currently held partition as a partitionList.
func (Mappings) LargestPartitions ¶
func (m Mappings) LargestPartitions(id int, k int, pm PartitionMetaMap) (partitionList, error)
LargestPartitions takes a broker ID and PartitionMetaMap and returns a partitionList with the top k partitions by size for the provided broker ID.
type Mock ¶
type Mock struct{}
Mock mocks the Handler interface.
func (*Mock) CreateSequential ¶
CreateSequential mocks CreateSequential.
func (*Mock) GetAllBrokerMeta ¶
func (zk *Mock) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
GetAllBrokerMeta mocks GetAllBrokerMeta.
func (*Mock) GetAllPartitionMeta ¶
func (zk *Mock) GetAllPartitionMeta() (PartitionMetaMap, error)
GetAllPartitionMeta mocks GetAllPartitionMeta.
func (*Mock) GetBrokerMetrics ¶
func (zk *Mock) GetBrokerMetrics() (BrokerMetricsMap, error)
GetBrokerMetrics mocks GetBrokerMetrics.
func (*Mock) GetPartitionMap ¶
func (zk *Mock) GetPartitionMap(t string) (*PartitionMap, error)
GetPartitionMap mocks GetPartitionMap.
func (*Mock) GetReassignments ¶
func (zk *Mock) GetReassignments() Reassignments
GetReassignments mocks GetReassignments.
func (*Mock) GetTopicConfig ¶
func (zk *Mock) GetTopicConfig(t string) (*TopicConfig, error)
GetTopicConfig mocks GetTopicConfig.
func (*Mock) GetTopicState ¶
func (zk *Mock) GetTopicState(t string) (*TopicState, error)
GetTopicState mocks GetTopicState.
func (*Mock) GetTopicStateISR ¶
func (zk *Mock) GetTopicStateISR(t string) (TopicStateISR, error)
GetTopicStateISR mocks GetTopicStateISR.
func (*Mock) InitRawClient ¶
InitRawClient mocks InitRawClient.
func (*Mock) UpdateKafkaConfig ¶
func (zk *Mock) UpdateKafkaConfig(c KafkaConfig) (bool, error)
UpdateKafkaConfig mocks UpdateKafkaConfig.
type NoMappingForBroker ¶
type NoMappingForBroker struct {
// contains filtered or unexported fields
}
func (NoMappingForBroker) Error ¶
func (e NoMappingForBroker) Error() string
type NoMappingForTopic ¶
type NoMappingForTopic struct {
// contains filtered or unexported fields
}
func (NoMappingForTopic) Error ¶
func (e NoMappingForTopic) Error() string
type Partition ¶
type Partition struct { Topic string `json:"topic"` Partition int `json:"partition"` Replicas []int `json:"replicas"` }
Partition represents the Kafka partition structure.
type PartitionMap ¶
type PartitionMap struct { Version int `json:"version"` Partitions partitionList `json:"partitions"` }
PartitionMap represents the Kafka partition map structure.
func NewPartitionMap ¶
func NewPartitionMap() *PartitionMap
NewPartitionMap returns an empty *PartitionMap.
func PartitionMapFromString ¶
func PartitionMapFromString(s string) (*PartitionMap, error)
PartitionMapFromString takes a json encoded string and returns a *PartitionMap.
func PartitionMapFromZK ¶
func PartitionMapFromZK(t []*regexp.Regexp, zk Handler) (*PartitionMap, error)
PartitionMapFromZK takes a slice of regexp and finds all matching topics for each. A merged *PartitionMap of all matching topic maps is returned.
func (*PartitionMap) Copy ¶
func (pm *PartitionMap) Copy() *PartitionMap
Copy returns a copy of a *PartitionMap.
func (*PartitionMap) DegreeDistribution ¶
func (pm *PartitionMap) DegreeDistribution() DegreeDistribution
DegreeDistribution returns the DegreeDistribution for the PartitionMap.
func (*PartitionMap) LocalitiesAvailable ¶
func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string
LocalitiesAvailable takes a broker map and broker and returns a []string of localities that are unused by any of the brokers in any replica sets that the reference broker was found in. This is done by building a set of all localities observed across all replica sets and a set of all localities observed in replica sets containing the reference broker, then returning the diff.
func (*PartitionMap) Mappings ¶
func (pm *PartitionMap) Mappings() Mappings
Mappings returns a Mappings from a *PartitionMap.
func (*PartitionMap) Rebuild ¶
func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []string)
Rebuild takes a BrokerMap and rebuild strategy. It then traverses the partition map, replacing brokers marked removal with the best available candidate based on the selected rebuild strategy. A rebuilt *PartitionMap and []string of errors is returned.
func (*PartitionMap) SetReplication ¶
func (pm *PartitionMap) SetReplication(r int)
SetReplication ensures that replica sets is reset to the replication factor r. Sets exceeding r are truncated, sets below r are extended with stub brokers.
func (*PartitionMap) SimpleLeaderOptimization ¶
func (pm *PartitionMap) SimpleLeaderOptimization()
SimpleLeaderOptimization is a naive leadership optimization algorithm. It gets leadership counts for all brokers in the partition map and shuffles partition replica sets for those holding brokers with below average leadership.
func (*PartitionMap) Strip ¶
func (pm *PartitionMap) Strip() *PartitionMap
Strip takes a PartitionMap and returns a copy where all broker ID references are replaced with the stub broker with ID 0 where the replace field is set to true. This ensures that the entire map is rebuilt, even if the provided broker list matches what's already in the map.
func (*PartitionMap) UseStats ¶
func (pm *PartitionMap) UseStats() []*BrokerUseStats
UseStats returns a map of broker IDs to BrokerUseStats; each contains a count of leader and follower partition assignments.
type PartitionMeta ¶
type PartitionMeta struct {
Size float64 // In bytes.
}
PartitionMeta holds partition metadata.
type PartitionMetaMap ¶
type PartitionMetaMap map[string]map[int]*PartitionMeta
PartitionMetaMap is a mapping of topic, partition number to PartitionMeta.
func NewPartitionMetaMap ¶
func NewPartitionMetaMap() PartitionMetaMap
NewPartitionMetaMap returns an empty PartitionMetaMap.
type PartitionState ¶
type PartitionState struct { Version int `json:"version"` ControllerEpoch int `json:"controller_epoch"` Leader int `json:"leader"` LeaderEpoch int `json:"leader_epoch"` ISR []int `json:"isr"` }
PartitionState is used for unmarshalling json data from a partition state: e.g. /brokers/topics/some-topic/partitions/0/state
type Reassignments ¶
Reassignments is a map of topic:partition:brokers.
type RebuildParams ¶
type RebuildParams struct { PMM PartitionMetaMap BM BrokerMap Strategy string Optimization string Affinities SubstitutionAffinities PartnSzFactor float64 // contains filtered or unexported fields }
RebuildParams holds required parameters to call the Rebuild method on a *PartitionMap.
func NewRebuildParams ¶
func NewRebuildParams() RebuildParams
NewRebuildParams initializes a RebuildParams.
type SubstitutionAffinities ¶
SubstitutionAffinities is a mapping of an ID belonging to a *Broker marked for replacement and a replacement *Broker that will fill all previously filled replica slots held by the *Broker being replaced.
func (SubstitutionAffinities) Get ¶
func (sa SubstitutionAffinities) Get(id int) *Broker
Get takes a broker ID and returns a *Broker if one was set as a substitution affinity.
type TopicConfig ¶
TopicConfig is used for unmarshalling /config/topics/<topic> from ZooKeeper.
type TopicState ¶
TopicState is used for unmarshing ZooKeeper json data from a topic: e.g. /brokers/topics/some-topic
type TopicStateISR ¶
type TopicStateISR map[string]PartitionState
TopicStateISR is a map of partition numbers to PartitionState.
type ZKHandler ¶
type ZKHandler struct { Connect string Prefix string MetricsPrefix string // contains filtered or unexported fields }
ZKHandler implements the Handler interface for real ZooKeeper clusters.
func (*ZKHandler) Children ¶
Children takes a path p and returns a list of child znodes and an error if encountered.
func (*ZKHandler) Close ¶
func (z *ZKHandler) Close()
Close calls close on the *ZKHandler. Any additional shutdown cleanup or other tasks should be performed here.
func (*ZKHandler) Create ¶
Create creates the provided path p with the data from the provided string d and returns an error if encountered.
func (*ZKHandler) CreateSequential ¶
CreateSequential takes a path p and data d and creates a sequential znode at p with data d. An error is returned if encountered.
func (*ZKHandler) Exists ¶
Exists takes a path p and returns a bool as to whether the path exists and an error if encountered.
func (*ZKHandler) Get ¶
Get gets the provided path p and returns the data from the path and an error if encountered.
func (*ZKHandler) GetAllBrokerMeta ¶
func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
GetAllBrokerMeta looks up all registered Kafka brokers and returns their metadata as a BrokerMetaMap. A withMetrics bool param determines whether we additionally want to fetch stored broker metrics.
func (*ZKHandler) GetAllPartitionMeta ¶
func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error)
GetAllPartitionMeta fetches partition metadata stored in Zookeeper.
func (*ZKHandler) GetPartitionMap ¶
func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error)
GetPartitionMap takes a topic name. If the topic exists, the state of the topic is fetched and translated into a *PartitionMap.
func (*ZKHandler) GetReassignments ¶
func (z *ZKHandler) GetReassignments() Reassignments
GetReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments type.
func (*ZKHandler) GetTopicConfig ¶
func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)
GetTopicConfig takes a topic name. If the topic exists, the topic config is returned as a *TopicConfig.
func (*ZKHandler) GetTopicState ¶
func (z *ZKHandler) GetTopicState(t string) (*TopicState, error)
GetTopicState takes a topic name. If the topic exists, the topic state is returned as a *TopicState.
func (*ZKHandler) GetTopicStateISR ¶
func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)
GetTopicStateCurrentISR takes a topic name. If the topic exists, the topic state is returned as a TopicStateISR. GetTopicStateCurrentISR differs from GetTopicState in that the actual, current broker IDs in the ISR are returned for each partition. This method is notably more expensive due to the need for a call per partition to ZK.
func (*ZKHandler) GetTopics ¶
GetTopics takes a []*regexp.Regexp and returns a []string of all topic names that match any of the provided regex.
func (*ZKHandler) Ready ¶
Ready returns true if the client is in either state StateConnected or StateHasSession. See https://godoc.org/github.com/samuel/go-zookeeper/zk#State.
func (*ZKHandler) Set ¶
Set sets the provided path p data to the provided string d and returns an error if encountered.
func (*ZKHandler) UpdateKafkaConfig ¶
func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) (bool, error)
UpdateKafkaConfig takes a KafkaConfig with key value pairs of entity config. If the config is changed, a persistent sequential znode is also written to propagate changes (via watches) to all Kafka brokers. This is a Kafka specific behavior; further references are available from the Kafka codebase. A bool is returned indicating whether the config was changed (if a config is updated to the existing value, 'false' is returned) along with any errors encountered. If a config value is set to an empty string (""), the entire config key itself is deleted. This was a convenient way to combine update/delete into a single func.