Documentation ¶
Index ¶
- Constants
- Variables
- func WriteMap(pm *PartitionMap, path string) error
- type Broker
- type BrokerFilterFn
- type BrokerList
- func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)
- func (b BrokerList) Filter(f BrokerFilterFn) BrokerList
- func (b BrokerList) SortByCount()
- func (b BrokerList) SortByID()
- func (b BrokerList) SortByIDDesc()
- func (b BrokerList) SortByStorage()
- func (b BrokerList) SortPseudoShuffle(seed int64)
- type BrokerMap
- func (b BrokerMap) AboveMean(d float64, f func() float64) []int
- func (b BrokerMap) BelowMean(d float64, f func() float64) []int
- func (b BrokerMap) Copy() BrokerMap
- func (b BrokerMap) Filter(f BrokerFilterFn) BrokerMap
- func (b BrokerMap) HMean() float64
- func (b BrokerMap) List() BrokerList
- func (b BrokerMap) Mean() float64
- func (b BrokerMap) MinMax() (float64, float64)
- func (b BrokerMap) StorageDiff(b2 BrokerMap) map[int][2]float64
- func (b BrokerMap) StorageRange() float64
- func (b BrokerMap) StorageRangeSpread() float64
- func (b BrokerMap) StorageStdDev() float64
- func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f BrokerFilterFn) error
- func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)
- func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string)
- type BrokerMeta
- type BrokerMetaMap
- type BrokerMetrics
- type BrokerMetricsMap
- type BrokerStatus
- type BrokerUseStats
- type BrokerUseStatsList
- type BrokerUseStatsMap
- type Config
- type Constraints
- type ConstraintsParams
- type DegreeDistribution
- type DegreeDistributionStats
- type ErrNoNode
- type Handler
- type KafkaConfig
- type KafkaConfigData
- type KafkaConfigKV
- type Mappings
- type NoMappingForBroker
- type NoMappingForTopic
- type Partition
- type PartitionList
- type PartitionMap
- func (pm PartitionMap) BrokersIn() BrokerFilterFn
- func (pm *PartitionMap) Copy() *PartitionMap
- func (pm *PartitionMap) DegreeDistribution() DegreeDistribution
- func (pm *PartitionMap) Equal(pm2 *PartitionMap) (bool, error)
- func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string
- func (pm *PartitionMap) Mappings() Mappings
- func (pm *PartitionMap) OptimizeLeaderFollower()
- func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error)
- func (pm *PartitionMap) ReplicaSets(t string) ReplicaSets
- func (pm *PartitionMap) SetReplication(r int)
- func (pm *PartitionMap) Strip() *PartitionMap
- func (pm *PartitionMap) Topics() []string
- func (pm *PartitionMap) UseStats() BrokerUseStatsMap
- type PartitionMapOpt
- type PartitionMeta
- type PartitionMetaMap
- type PartitionState
- type Reassignments
- type RebuildParams
- type ReplicaSets
- type SimpleZooKeeperClient
- type Stub
- func (zk *Stub) AddBrokers(b map[int]BrokerMeta)
- func (zk *Stub) Children(p string) ([]string, error)
- func (zk *Stub) Close()
- func (zk *Stub) Create(p, d string) error
- func (zk *Stub) CreateSequential(a, b string) error
- func (zk *Stub) Delete(p string) error
- func (zk *Stub) Exists(p string) (bool, error)
- func (zk *Stub) Get(p string) ([]byte, error)
- func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
- func (zk *Stub) GetAllPartitionMeta() (PartitionMetaMap, error)
- func (zk *Stub) GetBrokerMetrics() (BrokerMetricsMap, error)
- func (zk *Stub) GetPartitionMap(t string) (*PartitionMap, error)
- func (zk *Stub) GetPendingDeletion() ([]string, error)
- func (zk *Stub) GetReassignments() Reassignments
- func (zk *Stub) GetTopicConfig(t string) (*TopicConfig, error)
- func (zk *Stub) GetTopicMetadata(t string) (TopicMetadata, error)
- func (zk *Stub) GetTopicState(t string) (*TopicState, error)
- func (zk *Stub) GetTopicStateISR(t string) (TopicStateISR, error)
- func (zk *Stub) GetTopics(ts []*regexp.Regexp) ([]string, error)
- func (zk *Stub) GetUnderReplicated() ([]string, error)
- func (zk *Stub) InitRawClient() error
- func (zk *Stub) ListReassignments() (Reassignments, error)
- func (zk *Stub) MaxMetaAge() (time.Duration, error)
- func (zk *Stub) NextInt(p string) (int32, error)
- func (zk *Stub) Ready() bool
- func (zk *Stub) RemoveBrokers(ids []int)
- func (zk *Stub) Set(p, d string) error
- func (zk *Stub) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
- type StubZnode
- type SubstitutionAffinities
- type TopicConfig
- type TopicMetadata
- type TopicState
- type TopicStateISR
- type ZKHandler
- func (z *ZKHandler) Children(p string) ([]string, error)
- func (z *ZKHandler) Close()
- func (z *ZKHandler) Create(p string, d string) error
- func (z *ZKHandler) CreateSequential(p string, d string) error
- func (z *ZKHandler) Delete(p string) error
- func (z *ZKHandler) Exists(p string) (bool, error)
- func (z *ZKHandler) Get(p string) ([]byte, error)
- func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
- func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error)
- func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error)
- func (z *ZKHandler) GetPendingDeletion() ([]string, error)
- func (z *ZKHandler) GetReassignments() Reassignments
- func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)
- func (z *ZKHandler) GetTopicMetadata(t string) (TopicMetadata, error)
- func (z *ZKHandler) GetTopicState(t string) (*TopicState, error)
- func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)
- func (z *ZKHandler) GetTopics(ts []*regexp.Regexp) ([]string, error)
- func (z *ZKHandler) GetUnderReplicated() ([]string, error)
- func (z *ZKHandler) ListReassignments() (Reassignments, error)
- func (z *ZKHandler) MaxMetaAge() (time.Duration, error)
- func (z *ZKHandler) NextInt(p string) (int32, error)
- func (z *ZKHandler) Ready() bool
- func (z *ZKHandler) Set(p string, d string) error
- func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
Constants ¶
const ( // StubBrokerID is the platform int max. StubBrokerID int = int(^uint(0) >> 1) )
Variables ¶
var ( // ErrNoBrokers error. ErrNoBrokers = errors.New("No additional brokers that meet Constraints") // ErrInvalidSelectionMethod error. ErrInvalidSelectionMethod = errors.New("Invalid selection method") )
var ( // ErrInvalidKafkaConfigType error. ErrInvalidKafkaConfigType = errors.New("Invalid Kafka config type") )
var NotReplacedBrokersFn = func(b *Broker) bool { return !b.Replace }
var ReplacedBrokersFn = func(b *Broker) bool { return b.Replace }
Functions ¶
func WriteMap ¶
func WriteMap(pm *PartitionMap, path string) error
WriteMap takes a *PartitionMap and writes a JSON text file to the provided path.
Types ¶
type Broker ¶
type Broker struct { ID int Locality string Used int StorageFree float64 Replace bool Missing bool New bool }
Broker associates metadata with a real broker by ID.
type BrokerFilterFn ¶
BrokerFilterFn is a filter function for BrokerList and BrokerMap types.
var AllBrokersFn BrokerFilterFn = func(b *Broker) bool { return true }
AllBrokersFn returns all brokers.
func AboveMeanFn ¶ added in v3.14.0
func AboveMeanFn(d float64, f func() float64) BrokerFilterFn
AboveMeanFn returns a BrokerFilterFn that filters brokers that are above the mean by d percent (0.00 < d). The mean type is provided as a function f.
func BelowMeanFn ¶ added in v3.14.0
func BelowMeanFn(d float64, f func() float64) BrokerFilterFn
BelowMeanFn returns a BrokerFilterFn that filters brokers that are below the mean by d percent (0.00 < d). The mean type is provided as a function f.
type BrokerList ¶
type BrokerList []*Broker
BrokerList is a slice of brokers for sorting by used count.
func (BrokerList) BestCandidate ¶
func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)
TODO deprecate. BestCandidate takes a *Constraints, selection method and pass / iteration number (for use as a seed value for pseudo-random number generation) and returns the most suitable broker.
func (BrokerList) Filter ¶
func (b BrokerList) Filter(f BrokerFilterFn) BrokerList
Filter returns a BrokerList of brokers that return true as an input to function f.
func (BrokerList) SortByCount ¶
func (b BrokerList) SortByCount()
SortByCount sorts the BrokerList by Used values.
func (BrokerList) SortByID ¶
func (b BrokerList) SortByID()
SortByID sorts the BrokerList by ID values.
func (BrokerList) SortByIDDesc ¶ added in v3.17.0
func (b BrokerList) SortByIDDesc()
SortByIDDesc sorts the BrokerList by ID values in descending order.
func (BrokerList) SortByStorage ¶
func (b BrokerList) SortByStorage()
SortByStorage sorts the BrokerList by StorageFree values.
func (BrokerList) SortPseudoShuffle ¶
func (b BrokerList) SortPseudoShuffle(seed int64)
SortPseudoShuffle takes a BrokerList and performs a sort by count. For each sequence of brokers with equal counts, the sub-slice is pseudo-randomly shuffled using the provided seed value.
type BrokerMap ¶
BrokerMap holds a mapping of broker IDs to *Broker.
func BrokerMapFromPartitionMap ¶
func BrokerMapFromPartitionMap(pm *PartitionMap, bm BrokerMetaMap, force bool) BrokerMap
BrokerMapFromPartitionMap creates a BrokerMap from a partitionMap.
func (BrokerMap) AboveMean ¶
AboveMean returns a sorted []int of broker IDs that are above the mean by d percent (0.00 < d). The mean type is provided as a function f.
func (BrokerMap) BelowMean ¶
BelowMean returns a sorted []int of broker IDs that are below the mean by d percent (0.00 < d). The mean type is provided as a function f.
func (BrokerMap) Filter ¶
func (b BrokerMap) Filter(f BrokerFilterFn) BrokerMap
Filter returns a BrokerMap of brokers that return true as an input to function f.
func (BrokerMap) List ¶
func (b BrokerMap) List() BrokerList
List takes a BrokerMap and returns a BrokerList.
func (BrokerMap) StorageDiff ¶
StorageDiff takes two BrokerMaps and returns a per broker ID diff in storage as a [2]float64: [absolute, percentage] diff.
func (BrokerMap) StorageRange ¶
StorageRange returns the range of free storage for all brokers in the BrokerMap.
func (BrokerMap) StorageRangeSpread ¶
StorageRangeSpread returns the range spread of free storage for all brokers in the BrokerMap.
func (BrokerMap) StorageStdDev ¶
StorageStdDev returns the standard deviation of free storage for all brokers in the BrokerMap.
func (BrokerMap) SubStorage ¶
func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f BrokerFilterFn) error
SubStorage takes a PartitionMap, PartitionMetaMap, and a function. For all brokers that return true as an input to function f, the size of all partitions held is added back to the broker StorageFree value.
func (BrokerMap) SubstitutionAffinities ¶
func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)
SubstitutionAffinities finds all brokers marked for replacement and for each broker, it creates an exclusive association with a newly provided broker. In the rebuild stage, each to-be-replaced broker will be only replaced with the affinity it's associated with. A given new broker can only be an affinity for a single outgoing broker. An error is returned if a complete mapping of affinities cannot be constructed (e.g. two brokers are marked for replacement but only one new replacement was provided and substitution affinities is enabled).
func (BrokerMap) Update ¶
func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string)
Update takes a []int of broker IDs and BrokerMetaMap then adds them to the BrokerMap, returning the count of marked for replacement, newly included, and brokers that weren't found in ZooKeeper. Additionally, a channel of msgs describing changes is returned.
type BrokerMeta ¶
type BrokerMeta struct { StorageFree float64 // In bytes. MetricsIncomplete bool // Metadata from ZooKeeper. ListenerSecurityProtocolMap map[string]string `json:"listener_security_protocol_map"` Endpoints []string `json:"endpoints"` Rack string `json:"rack"` JMXPort int `json:"jmx_port"` Host string `json:"host"` Timestamp string `json:"timestamp"` Port int `json:"port"` Version int `json:"version"` }
BrokerMeta holds metadata that describes a broker, used in satisfying constraints.
func (BrokerMeta) Copy ¶ added in v3.11.0
func (bm BrokerMeta) Copy() BrokerMeta
Copy returns a copy of a BrokerMeta.
type BrokerMetaMap ¶
type BrokerMetaMap map[int]*BrokerMeta
BrokerMetaMap is a map of broker IDs to BrokerMeta metadata fetched from ZooKeeper. Currently, just the rack field is retrieved.
func (BrokerMetaMap) Copy ¶ added in v3.11.0
func (bmm BrokerMetaMap) Copy() BrokerMetaMap
Copy returns a copy of a BrokerMetaMap.
type BrokerMetrics ¶
type BrokerMetrics struct {
StorageFree float64
}
BrokerMetrics holds broker metric data fetched from ZK.
type BrokerMetricsMap ¶
type BrokerMetricsMap map[int]*BrokerMetrics
BrokerMetricsMap holds a mapping of broker ID to BrokerMetrics.
type BrokerStatus ¶
BrokerStatus summarizes change counts from an input and output broker list.
func (BrokerStatus) Changes ¶
func (bs BrokerStatus) Changes() bool
Changes returns a bool that indicates whether a BrokerStatus values represent a change in brokers.
type BrokerUseStats ¶
BrokerUseStats holds counts of partition ownership.
type BrokerUseStatsList ¶
type BrokerUseStatsList []*BrokerUseStats
BrokerUseStatsList is a slice of *BrokerUseStats.
func (BrokerUseStatsList) Len ¶
func (b BrokerUseStatsList) Len() int
func (BrokerUseStatsList) Less ¶
func (b BrokerUseStatsList) Less(i, j int) bool
func (BrokerUseStatsList) Swap ¶
func (b BrokerUseStatsList) Swap(i, j int)
type BrokerUseStatsMap ¶
type BrokerUseStatsMap map[int]*BrokerUseStats
BrokerUseStatsMap is a map of IDs to *BrokerUseStats.
func (BrokerUseStatsMap) List ¶
func (b BrokerUseStatsMap) List() BrokerUseStatsList
List returns a BrokerUseStatsList from a BrokerUseStatsMap.
type Config ¶
Config holds initialization parameters for a Handler. Connect is a ZooKeeper connect string. Prefix should reflect any prefix used for Kafka on the reference ZooKeeper cluster (excluding slashes). MetricsPrefix is the prefix used for broker metrics metadata persisted in ZooKeeper.
type Constraints ¶
type Constraints struct {
// contains filtered or unexported fields
}
Constraints holds a map of IDs and locality key-values.
func MergeConstraints ¶
func MergeConstraints(bl BrokerList) *Constraints
TODO deprecate. MergeConstraints takes a brokerlist and builds a *Constraints by merging the attributes of all brokers from the supplied list.
func NewConstraints ¶
func NewConstraints() *Constraints
NewConstraints returns an empty *Constraints.
func (*Constraints) Add ¶
func (c *Constraints) Add(b *Broker)
Add takes a *Broker and adds its attributes to the *Constraints. The requestSize is also subtracted from the *Broker.StorageFree.
func (*Constraints) MergeConstraints ¶
func (c *Constraints) MergeConstraints(bl BrokerList)
MergeConstraints takes a brokerlist and updates the *Constraints by merging the attributes of all brokers from the supplied list.
func (*Constraints) SelectBroker ¶
func (c *Constraints) SelectBroker(b BrokerList, p ConstraintsParams) (*Broker, error)
SelectBroker takes a BrokerList and a ConstraintsParams and selects the most suitable broker that passes all specified constraints.
type ConstraintsParams ¶
type ConstraintsParams struct { SelectorMethod string MinUniqueRackIDs int RequestSize float64 SeedVal int64 }
ConstraintsParams holds parameters for the SelectBroker method.
type DegreeDistribution ¶
type DegreeDistribution struct { // Relationships is an adjacency list // where an edge between brokers is defined as // a common occupancy in at least one replica set. // For instance, given the replica set [1001,1002,1003], // ID 1002 has a relationship with 1001 and 1003. Relationships map[int]map[int]struct{} }
DegreeDistribution counts broker to broker relationships.
func NewDegreeDistribution ¶
func NewDegreeDistribution() DegreeDistribution
NewDegreeDistribution returns a new DegreeDistribution.
func (DegreeDistribution) Add ¶
func (dd DegreeDistribution) Add(nodes []int)
Add takes a []int of broker IDs representing a replica set and updates the adjacency lists for each broker in the set.
func (DegreeDistribution) Count ¶
func (dd DegreeDistribution) Count(n int) int
Count takes a node ID and returns the degree distribution.
func (DegreeDistribution) Stats ¶
func (dd DegreeDistribution) Stats() DegreeDistributionStats
Stats returns a DegreeDistributionStats.
type DegreeDistributionStats ¶
DegreeDistributionStats holds general statistical information describing the DegreeDistribution counts.
type ErrNoNode ¶
type ErrNoNode struct {
// contains filtered or unexported fields
}
ErrNoNode error type is specifically for Get method calls where the underlying error type is a zkclient.ErrNoNode.
type Handler ¶
type Handler interface { SimpleZooKeeperClient GetTopicState(string) (*TopicState, error) GetTopicStateISR(string) (TopicStateISR, error) UpdateKafkaConfig(KafkaConfig) ([]bool, error) GetReassignments() Reassignments ListReassignments() (Reassignments, error) GetUnderReplicated() ([]string, error) GetPendingDeletion() ([]string, error) GetTopics([]*regexp.Regexp) ([]string, error) GetTopicConfig(string) (*TopicConfig, error) GetTopicMetadata(string) (TopicMetadata, error) GetAllBrokerMeta(bool) (BrokerMetaMap, []error) GetAllPartitionMeta() (PartitionMetaMap, error) MaxMetaAge() (time.Duration, error) GetPartitionMap(string) (*PartitionMap, error) }
Handler specifies an interface for common Kafka metadata retrieval and configuration methods.
func NewHandler ¶
NewHandler takes a *Config, performs any initialization and returns a Handler.
type KafkaConfig ¶
type KafkaConfig struct { Type string // Topic or broker. Name string // Entity name. Configs []KafkaConfigKV // Config KVs. }
KafkaConfig is used to issue configuration updates to either topics or brokers in ZooKeeper.
type KafkaConfigData ¶
type KafkaConfigData struct { Version int `json:"version"` Config map[string]string `json:"config"` }
KafkaConfigData is used for unmarshalling /config/<type>/<name> data from ZooKeeper.
func NewKafkaConfigData ¶
func NewKafkaConfigData() KafkaConfigData
NewKafkaConfigData creates a KafkaConfigData.
type KafkaConfigKV ¶
type KafkaConfigKV [2]string
KafkaConfigKV is a [2]string{key, value} representing a Kafka configuration.
type Mappings ¶
type Mappings map[int]map[string]PartitionList
Mappings is a mapping of broker IDs to currently held partition as a PartitionList.
func (Mappings) LargestPartitions ¶
func (m Mappings) LargestPartitions(id int, k int, pm PartitionMetaMap) (PartitionList, error)
LargestPartitions takes a broker ID and PartitionMetaMap and returns a PartitionList with the top k partitions by size for the provided broker ID.
type NoMappingForBroker ¶
type NoMappingForBroker struct {
// contains filtered or unexported fields
}
NoMappingForBroker error.
func (NoMappingForBroker) Error ¶
func (e NoMappingForBroker) Error() string
type NoMappingForTopic ¶
type NoMappingForTopic struct {
// contains filtered or unexported fields
}
NoMappingForTopic error.
func (NoMappingForTopic) Error ¶
func (e NoMappingForTopic) Error() string
type Partition ¶
type Partition struct { Topic string `json:"topic"` Partition int `json:"partition"` Replicas []int `json:"replicas"` }
Partition represents the Kafka partition structure.
type PartitionList ¶
type PartitionList []Partition
PartitionList is a []Partition.
func (PartitionList) Len ¶
func (p PartitionList) Len() int
func (PartitionList) Less ¶
func (p PartitionList) Less(i, j int) bool
func (PartitionList) SortBySize ¶
func (p PartitionList) SortBySize(m PartitionMetaMap)
SortBySize takes a PartitionMetaMap and sorts the PartitionList by partition size.
func (PartitionList) Swap ¶
func (p PartitionList) Swap(i, j int)
type PartitionMap ¶
type PartitionMap struct { Version int `json:"version"` Partitions PartitionList `json:"partitions"` }
PartitionMap represents the Kafka partition map structure.
func NewPartitionMap ¶
func NewPartitionMap(opts ...PartitionMapOpt) *PartitionMap
NewPartitionMap returns an empty *PartitionMap with optionally provided PartitionMapOpts.
func PartitionMapFromString ¶
func PartitionMapFromString(s string) (*PartitionMap, error)
PartitionMapFromString takes a json encoded string and returns a *PartitionMap.
func PartitionMapFromZK ¶
func PartitionMapFromZK(t []*regexp.Regexp, zk Handler) (*PartitionMap, error)
PartitionMapFromZK takes a slice of regexp and finds all matching topics for each. A merged *PartitionMap of all matching topic maps is returned.
func (PartitionMap) BrokersIn ¶ added in v3.14.0
func (pm PartitionMap) BrokersIn() BrokerFilterFn
BrokersIn returns a BrokerFilterFn that filters for brokers in the PartitionMap
func (*PartitionMap) Copy ¶
func (pm *PartitionMap) Copy() *PartitionMap
Copy returns a copy of a *PartitionMap.
func (*PartitionMap) DegreeDistribution ¶
func (pm *PartitionMap) DegreeDistribution() DegreeDistribution
DegreeDistribution returns the DegreeDistribution for the PartitionMap.
func (*PartitionMap) Equal ¶
func (pm *PartitionMap) Equal(pm2 *PartitionMap) (bool, error)
Equal checks the equality between two partition maps. Equality requires that the total order is exactly the same.
func (*PartitionMap) LocalitiesAvailable ¶
func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string
LocalitiesAvailable takes a broker map and broker and returns a []string of localities that are unused by any of the brokers in any replica sets that the reference broker was found in. This is done by building a set of all localities observed across all replica sets and a set of all localities observed in replica sets containing the reference broker, then returning the diff.
func (*PartitionMap) Mappings ¶
func (pm *PartitionMap) Mappings() Mappings
Mappings returns a Mappings from a *PartitionMap.
func (*PartitionMap) OptimizeLeaderFollower ¶
func (pm *PartitionMap) OptimizeLeaderFollower()
OptimizeLeaderFollower is a simple leadership optimization algorithm that iterates over each partition's replica set and sorts brokers according to their leader/follower position ratio, ascending. The idea is that if a broker has a high leader/follower ratio, it should go further down the replica list. This ratio is recalculated at each replica set visited to avoid extreme skew.
func (*PartitionMap) Rebuild ¶
func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error)
Rebuild takes a RebuildParams, which carries the BrokerMap and rebuild strategy. It then traverses the partition map, replacing brokers marked for removal with the best available candidate based on the selected rebuild strategy. A rebuilt *PartitionMap and a []error are returned.
func (*PartitionMap) ReplicaSets ¶
func (pm *PartitionMap) ReplicaSets(t string) ReplicaSets
ReplicaSets takes a topic name and returns a ReplicaSets.
func (*PartitionMap) SetReplication ¶
func (pm *PartitionMap) SetReplication(r int)
SetReplication ensures that every replica set is reset to the replication factor r. Sets exceeding r are truncated, sets below r are extended with stub brokers.
func (*PartitionMap) Strip ¶
func (pm *PartitionMap) Strip() *PartitionMap
Strip takes a PartitionMap and returns a copy where all broker ID references are replaced with the stub broker (ID == StubBrokerID) with the replace field set to true. This ensures that the entire map is rebuilt, even if the provided broker list matches what's already in the map.
func (*PartitionMap) Topics ¶
func (pm *PartitionMap) Topics() []string
Topics returns a []string of topic names held in the PartitionMap.
func (*PartitionMap) UseStats ¶
func (pm *PartitionMap) UseStats() BrokerUseStatsMap
UseStats returns a map of broker IDs to BrokerUseStats; each contains a count of leader and follower partition assignments.
type PartitionMapOpt ¶
type PartitionMapOpt func(*PartitionMap)
PartitionMapOpt is a function that configures a *PartitionMap at instantiation time.
func Populate ¶
func Populate(s string, n, r int) PartitionMapOpt
Populate takes a name, partition count and replication factor and returns a PartitionMapOpt.
type PartitionMeta ¶
type PartitionMeta struct {
Size float64 // In bytes.
}
PartitionMeta holds partition metadata.
type PartitionMetaMap ¶
type PartitionMetaMap map[string]map[int]*PartitionMeta
PartitionMetaMap is a mapping of topic, partition number to PartitionMeta.
func NewPartitionMetaMap ¶
func NewPartitionMetaMap() PartitionMetaMap
NewPartitionMetaMap returns an empty PartitionMetaMap.
type PartitionState ¶
type PartitionState struct { Version int `json:"version"` ControllerEpoch int `json:"controller_epoch"` Leader int `json:"leader"` LeaderEpoch int `json:"leader_epoch"` ISR []int `json:"isr"` }
PartitionState is used for unmarshalling json data from a partition state: e.g. /brokers/topics/some-topic/partitions/0/state
type Reassignments ¶
Reassignments is a map of topic:partition:brokers.
type RebuildParams ¶
type RebuildParams struct { PMM PartitionMetaMap BM BrokerMap Strategy string Optimization string Affinities SubstitutionAffinities PartnSzFactor float64 MinUniqueRackIDs int // contains filtered or unexported fields }
RebuildParams holds required parameters to call the Rebuild method on a *PartitionMap.
func NewRebuildParams ¶
func NewRebuildParams() RebuildParams
NewRebuildParams initializes a RebuildParams.
type ReplicaSets ¶
ReplicaSets is a mapping of partition number to Partition.Replicas. Take note that there is no topic identifier and that partition numbers from two different topics can overwrite one another.
type SimpleZooKeeperClient ¶ added in v3.14.0
type SimpleZooKeeperClient interface { Exists(string) (bool, error) Create(string, string) error CreateSequential(string, string) error Set(string, string) error Get(string) ([]byte, error) Delete(string) error Children(string) ([]string, error) NextInt(string) (int32, error) Close() Ready() bool }
SimpleZooKeeperClient is an interface that wraps a real ZooKeeper client, obscuring much of the API semantics that are unneeded for a ZooKeeper based Handler implementation.
type Stub ¶ added in v3.6.0
type Stub struct {
// contains filtered or unexported fields
}
Stub stubs the Handler interface.
func NewZooKeeperStub ¶ added in v3.6.0
func NewZooKeeperStub() *Stub
NewZooKeeperStub returns a stub ZooKeeper.
func (*Stub) AddBrokers ¶ added in v3.11.0
func (zk *Stub) AddBrokers(b map[int]BrokerMeta)
AddBrokers takes a map of broker ID to BrokerMeta and adds it to the Stub BrokerMetaMap.
func (*Stub) CreateSequential ¶ added in v3.6.0
CreateSequential stubs CreateSequential.
func (*Stub) GetAllBrokerMeta ¶ added in v3.6.0
func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
GetAllBrokerMeta stubs GetAllBrokerMeta.
func (*Stub) GetAllPartitionMeta ¶ added in v3.6.0
func (zk *Stub) GetAllPartitionMeta() (PartitionMetaMap, error)
GetAllPartitionMeta stubs GetAllPartitionMeta.
func (*Stub) GetBrokerMetrics ¶ added in v3.6.0
func (zk *Stub) GetBrokerMetrics() (BrokerMetricsMap, error)
GetBrokerMetrics stubs GetBrokerMetrics.
func (*Stub) GetPartitionMap ¶ added in v3.6.0
func (zk *Stub) GetPartitionMap(t string) (*PartitionMap, error)
GetPartitionMap stubs GetPartitionMap.
func (*Stub) GetPendingDeletion ¶ added in v3.6.0
func (*Stub) GetReassignments ¶ added in v3.6.0
func (zk *Stub) GetReassignments() Reassignments
GetReassignments stubs GetReassignments.
func (*Stub) GetTopicConfig ¶ added in v3.6.0
func (zk *Stub) GetTopicConfig(t string) (*TopicConfig, error)
GetTopicConfig stubs GetTopicConfig.
func (*Stub) GetTopicMetadata ¶ added in v3.14.0
func (zk *Stub) GetTopicMetadata(t string) (TopicMetadata, error)
GetTopicMetadata stubs GetTopicMetadata.
func (*Stub) GetTopicState ¶ added in v3.6.0
func (zk *Stub) GetTopicState(t string) (*TopicState, error)
GetTopicState stubs GetTopicState.
func (*Stub) GetTopicStateISR ¶ added in v3.6.0
func (zk *Stub) GetTopicStateISR(t string) (TopicStateISR, error)
GetTopicStateISR stubs GetTopicStateISR.
func (*Stub) GetUnderReplicated ¶ added in v3.6.0
func (*Stub) InitRawClient ¶ added in v3.6.0
InitRawClient stubs InitRawClient.
func (*Stub) ListReassignments ¶ added in v3.14.0
func (zk *Stub) ListReassignments() (Reassignments, error)
ListReassignments stubs ListReassignments.
func (*Stub) MaxMetaAge ¶ added in v3.6.0
MaxMetaAge stubs MaxMetaAge.
func (*Stub) RemoveBrokers ¶ added in v3.11.0
RemoveBrokers removes the specified IDs from the BrokerMetaMap. This can be used in testing to simulate brokers leaving the cluster.
func (*Stub) UpdateKafkaConfig ¶ added in v3.6.0
func (zk *Stub) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
UpdateKafkaConfig stubs UpdateKafkaConfig.
type StubZnode ¶ added in v3.6.0
type StubZnode struct {
// contains filtered or unexported fields
}
StubZnode stubs a ZooKeeper znode.
type SubstitutionAffinities ¶
SubstitutionAffinities is a mapping of an ID belonging to a *Broker marked for replacement and a replacement *Broker that will fill all previously filled replica slots held by the *Broker being replaced.
func (SubstitutionAffinities) Get ¶
func (sa SubstitutionAffinities) Get(id int) *Broker
Get takes a broker ID and returns a *Broker if one was set as a substitution affinity.
type TopicConfig ¶
TopicConfig is used for unmarshalling /config/topics/<topic> from ZooKeeper.
type TopicMetadata ¶ added in v3.14.0
type TopicMetadata struct { Version int Name string TopicID string `json:"topic_id"` Partitions map[int][]int AddingReplicas map[int][]int `json:"adding_replicas"` RemovingReplicas map[int][]int `json:"removing_replicas"` }
TopicMetadata holds the topic data found in the /brokers/topics/<topic> znode. This is designed for the version 3 fields present in Kafka version ~2.4+.
func (TopicMetadata) Reassignments ¶ added in v3.14.0
func (tm TopicMetadata) Reassignments() Reassignments
Reassignments returns a Reassignments from a given topics TopicMetadata.
type TopicState ¶
TopicState is used for unmarshalling ZooKeeper json data from a topic: e.g. /brokers/topics/some-topic
func (*TopicState) Brokers ¶ added in v3.11.0
func (ts *TopicState) Brokers() []int
Brokers returns a []int of broker IDs from all partitions in the TopicState.
type TopicStateISR ¶
type TopicStateISR map[string]PartitionState
TopicStateISR is a map of partition numbers to PartitionState.
type ZKHandler ¶
type ZKHandler struct { Connect string Prefix string MetricsPrefix string // contains filtered or unexported fields }
ZKHandler implements the Handler interface for real ZooKeeper clusters.
func (*ZKHandler) Children ¶
Children takes a path p and returns a list of child znodes and an error if encountered.
func (*ZKHandler) Close ¶
func (z *ZKHandler) Close()
Close calls close on the *ZKHandler. Any additional shutdown cleanup or other tasks should be performed here.
func (*ZKHandler) Create ¶
Create creates the provided path p with the data from the provided string d and returns an error if encountered.
func (*ZKHandler) CreateSequential ¶
CreateSequential takes a path p and data d and creates a sequential znode at p with data d. An error is returned if encountered.
func (*ZKHandler) Exists ¶
Exists takes a path p and returns a bool as to whether the path exists and an error if encountered.
func (*ZKHandler) GetAllBrokerMeta ¶
func (z *ZKHandler) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
GetAllBrokerMeta looks up all registered Kafka brokers and returns their metadata as a BrokerMetaMap. A withMetrics bool param determines whether we additionally want to fetch stored broker metrics.
func (*ZKHandler) GetAllPartitionMeta ¶
func (z *ZKHandler) GetAllPartitionMeta() (PartitionMetaMap, error)
GetAllPartitionMeta fetches partition metadata stored in Zookeeper.
func (*ZKHandler) GetPartitionMap ¶
func (z *ZKHandler) GetPartitionMap(t string) (*PartitionMap, error)
GetPartitionMap takes a topic name. If the topic exists, the state of the topic is fetched and returned as a *PartitionMap.
func (*ZKHandler) GetPendingDeletion ¶
GetPendingDeletion returns any topics pending deletion.
func (*ZKHandler) GetReassignments ¶
func (z *ZKHandler) GetReassignments() Reassignments
GetReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments.
func (*ZKHandler) GetTopicConfig ¶
func (z *ZKHandler) GetTopicConfig(t string) (*TopicConfig, error)
GetTopicConfig takes a topic name. If the topic exists, the topic config is returned as a *TopicConfig.
func (*ZKHandler) GetTopicMetadata ¶ added in v3.14.0
func (z *ZKHandler) GetTopicMetadata(t string) (TopicMetadata, error)
GetTopicMetadata takes a topic name. If the topic exists, the topic metadata is returned as a TopicMetadata.
func (*ZKHandler) GetTopicState ¶
func (z *ZKHandler) GetTopicState(t string) (*TopicState, error)
GetTopicState takes a topic name. If the topic exists, the topic state is returned as a *TopicState.
func (*ZKHandler) GetTopicStateISR ¶
func (z *ZKHandler) GetTopicStateISR(t string) (TopicStateISR, error)
GetTopicStateISR takes a topic name. If the topic exists, the topic state is returned as a TopicStateISR. GetTopicStateISR differs from GetTopicState in that the actual, current broker IDs in the ISR are returned for each partition. This method is more expensive due to the need for a call per partition to ZK.
func (*ZKHandler) GetTopics ¶
GetTopics takes a []*regexp.Regexp and returns a []string of all topic names that match any of the provided regex.
func (*ZKHandler) GetUnderReplicated ¶ added in v3.5.0
GetUnderReplicated returns a []string of all under-replicated topics.
func (*ZKHandler) ListReassignments ¶ added in v3.14.0
func (z *ZKHandler) ListReassignments() (Reassignments, error)
ListReassignments looks up any ongoing topic reassignments and returns the data as a Reassignments. ListReassignments is a KIP-455 compatible call for Kafka 2.4 and Kafka cli tools 2.6.
func (*ZKHandler) MaxMetaAge ¶
MaxMetaAge returns the greatest age between the partitionmeta and brokermetrics structures.
func (*ZKHandler) NextInt ¶ added in v3.3.1
NextInt works as an atomic int generator. It does this by setting nil value to path p and returns the znode version.
func (*ZKHandler) Ready ¶
Ready returns true if the client is in either state StateConnected or StateHasSession. See https://godoc.org/github.com/go-zookeeper/zk#State.
func (*ZKHandler) UpdateKafkaConfig ¶
func (z *ZKHandler) UpdateKafkaConfig(c KafkaConfig) ([]bool, error)
UpdateKafkaConfig takes a KafkaConfig with key value pairs of entity config. If the config is changed, a persistent sequential znode is also written to propagate changes (via watches) to all Kafka brokers. This is a Kafka specific behavior; further references are available from the Kafka codebase. A []bool is returned indicating whether the config of the respective index was changed (if a config is updated to the existing value, 'false' is returned) along with any errors encountered. If a config value is set to an empty string (""), the entire config key itself is deleted. This was a convenient method to combine update/delete into a single func.