Documentation
¶
Index ¶
- Constants
- Variables
- func WriteMap(pm *PartitionMap, path string) error
- type Broker
- type BrokerFilterFn
- type BrokerList
- func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)
- func (b BrokerList) Filter(f BrokerFilterFn) BrokerList
- func (b BrokerList) SortByCount()
- func (b BrokerList) SortByID()
- func (b BrokerList) SortByIDDesc()
- func (b BrokerList) SortByStorage()
- func (b BrokerList) SortPseudoShuffle(seed int64)
- type BrokerMap
- func (b BrokerMap) AboveMean(d float64, f func() float64) []int
- func (b BrokerMap) BelowMean(d float64, f func() float64) []int
- func (b BrokerMap) Copy() BrokerMap
- func (b BrokerMap) Filter(f BrokerFilterFn) BrokerMap
- func (b BrokerMap) HMean() float64
- func (b BrokerMap) List() BrokerList
- func (b BrokerMap) Mean() float64
- func (b BrokerMap) MinMax() (float64, float64)
- func (b BrokerMap) StorageDiff(b2 BrokerMap) map[int][2]float64
- func (b BrokerMap) StorageRange() float64
- func (b BrokerMap) StorageRangeSpread() float64
- func (b BrokerMap) StorageStdDev() float64
- func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f BrokerFilterFn) error
- func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)
- func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string)
- type BrokerMeta
- type BrokerMetaMap
- type BrokerMetrics
- type BrokerMetricsMap
- type BrokerStatus
- type BrokerUseStats
- type BrokerUseStatsList
- type BrokerUseStatsMap
- type Constraints
- type ConstraintsParams
- type DegreeDistribution
- type DegreeDistributionStats
- type Mappings
- type NoMappingForBroker
- type NoMappingForTopic
- type Partition
- type PartitionList
- type PartitionMap
- func (pm PartitionMap) BrokersIn() BrokerFilterFn
- func (pm *PartitionMap) Copy() *PartitionMap
- func (pm *PartitionMap) DegreeDistribution() DegreeDistribution
- func (pm *PartitionMap) Equal(pm2 *PartitionMap) (bool, error)
- func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string
- func (pm *PartitionMap) Mappings() Mappings
- func (pm *PartitionMap) OptimizeLeaderFollower()
- func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error)
- func (pm *PartitionMap) ReplicaSets(t string) ReplicaSets
- func (pm *PartitionMap) SetReplication(r int)
- func (pm *PartitionMap) Strip() *PartitionMap
- func (pm *PartitionMap) Topics() []string
- func (pm *PartitionMap) UseStats() BrokerUseStatsMap
- type PartitionMapOpt
- type PartitionMeta
- type PartitionMetaMap
- type RebuildParams
- type ReplicaSets
- type Stub
- func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
- func (zk *Stub) GetAllPartitionMeta() (PartitionMetaMap, error)
- func (zk *Stub) GetBrokerMetrics() (BrokerMetricsMap, error)
- func (zk *Stub) GetPartitionMap(t string) (*PartitionMap, error)
- func (zk *Stub) GetTopicState(t string) (*TopicState, error)
- type StubZnode
- type SubstitutionAffinities
- type TopicState
Constants ¶
const ( // StubBrokerID is the platform int max. StubBrokerID int = int(^uint(0) >> 1) )
Variables ¶
var ( // ErrNoBrokers error. ErrNoBrokers = errors.New("No additional brokers that meet Constraints") // ErrInvalidSelectionMethod error. ErrInvalidSelectionMethod = errors.New("Invalid selection method") )
var NotReplacedBrokersFn = func(b *Broker) bool { return !b.Replace }
var ReplacedBrokersFn = func(b *Broker) bool { return b.Replace }
Functions ¶
func WriteMap ¶
func WriteMap(pm *PartitionMap, path string) error
WriteMap takes a *PartitionMap and writes a JSON text file to the provided path.
Types ¶
type Broker ¶
type Broker struct { ID int Locality string Used int StorageFree float64 Replace bool Missing bool New bool }
Broker associates metadata with a real broker by ID.
type BrokerFilterFn ¶
BrokerFilterFn is a filter function for BrokerList and BrokerMap types.
var AllBrokersFn BrokerFilterFn = func(b *Broker) bool { return true }
AllBrokersFn returns all brokers.
func AboveMeanFn ¶
func AboveMeanFn(d float64, f func() float64) BrokerFilterFn
AboveMeanFn returns a BrokerFilterFn that filters brokers that are above the mean by d percent (0.00 < d). The mean type is provided as a function f.
func BelowMeanFn ¶
func BelowMeanFn(d float64, f func() float64) BrokerFilterFn
BelowMeanFn returns a BrokerFilterFn that filters brokers that are below the mean by d percent (0.00 < d). The mean type is provided as a function f.
type BrokerList ¶
type BrokerList []*Broker
BrokerList is a slice of brokers for sorting by used count.
func (BrokerList) BestCandidate ¶
func (b BrokerList) BestCandidate(c *Constraints, by string, p int64) (*Broker, error)
TODO deprecate. BestCandidate takes a *Constraints, selection method and pass / iteration number (for use as a seed value for pseudo-random number generation) and returns the most suitable broker.
func (BrokerList) Filter ¶
func (b BrokerList) Filter(f BrokerFilterFn) BrokerList
Filter returns a BrokerList of brokers that return true as an input to function f.
func (BrokerList) SortByCount ¶
func (b BrokerList) SortByCount()
SortByCount sorts the BrokerList by Used values.
func (BrokerList) SortByID ¶
func (b BrokerList) SortByID()
SortByID sorts the BrokerList by ID values.
func (BrokerList) SortByIDDesc ¶
func (b BrokerList) SortByIDDesc()
SortByIDDesc sorts the BrokerList by ID values in descending order.
func (BrokerList) SortByStorage ¶
func (b BrokerList) SortByStorage()
SortByStorage sorts the BrokerList by StorageFree values.
func (BrokerList) SortPseudoShuffle ¶
func (b BrokerList) SortPseudoShuffle(seed int64)
SortPseudoShuffle takes a BrokerList and performs a sort by count. For each sequence of brokers with equal counts, the sub-slice is pseudo-randomly shuffled using the provided seed value.
type BrokerMap ¶
BrokerMap holds a mapping of broker IDs to *Broker.
func BrokerMapFromPartitionMap ¶
func BrokerMapFromPartitionMap(pm *PartitionMap, bm BrokerMetaMap, force bool) BrokerMap
BrokerMapFromPartitionMap creates a BrokerMap from a PartitionMap.
func (BrokerMap) AboveMean ¶
AboveMean returns a sorted []int of broker IDs that are above the mean by d percent (0.00 < d). The mean type is provided as a function f.
func (BrokerMap) BelowMean ¶
BelowMean returns a sorted []int of broker IDs that are below the mean by d percent (0.00 < d). The mean type is provided as a function f.
func (BrokerMap) Filter ¶
func (b BrokerMap) Filter(f BrokerFilterFn) BrokerMap
Filter returns a BrokerMap of brokers that return true as an input to function f.
func (BrokerMap) List ¶
func (b BrokerMap) List() BrokerList
List takes a BrokerMap and returns a BrokerList.
func (BrokerMap) StorageDiff ¶
StorageDiff takes two BrokerMaps and returns a per broker ID diff in storage as a [2]float64: [absolute, percentage] diff.
func (BrokerMap) StorageRange ¶
StorageRange returns the range of free storage for all brokers in the BrokerMap.
func (BrokerMap) StorageRangeSpread ¶
StorageRangeSpread returns the range spread of free storage for all brokers in the BrokerMap.
func (BrokerMap) StorageStdDev ¶
StorageStdDev returns the standard deviation of free storage for all brokers in the BrokerMap.
func (BrokerMap) SubStorage ¶
func (b BrokerMap) SubStorage(pm *PartitionMap, pmm PartitionMetaMap, f BrokerFilterFn) error
SubStorage takes a PartitionMap, PartitionMetaMap, and a function. For all brokers that return true as an input to function f, the size of all partitions held is added back to the broker StorageFree value.
func (BrokerMap) SubstitutionAffinities ¶
func (b BrokerMap) SubstitutionAffinities(pm *PartitionMap) (SubstitutionAffinities, error)
SubstitutionAffinities finds all brokers marked for replacement and, for each broker, creates an exclusive association with a newly provided broker. In the rebuild stage, each to-be-replaced broker will only be replaced with the affinity it's associated with. A given new broker can only be an affinity for a single outgoing broker. An error is returned if a complete mapping of affinities cannot be constructed (e.g. two brokers are marked for replacement but only one new replacement was provided and substitution affinities is enabled).
func (BrokerMap) Update ¶
func (b BrokerMap) Update(bl []int, bm BrokerMetaMap) (*BrokerStatus, <-chan string)
Update takes a []int of broker IDs and BrokerMetaMap then adds them to the BrokerMap, returning the count of marked for replacement, newly included, and brokers that weren't found in ZooKeeper. Additionally, a channel of msgs describing changes is returned.
type BrokerMeta ¶
type BrokerMeta struct { StorageFree float64 // In bytes. MetricsIncomplete bool // Metadata from the Kafka cluster state. Host string Port int Rack string LogMessageFormat string InterBrokerProtocolVersion string }
BrokerMeta holds metadata that describes a broker.
func (BrokerMeta) Copy ¶
func (bm BrokerMeta) Copy() BrokerMeta
Copy returns a copy of a BrokerMeta.
type BrokerMetaMap ¶
type BrokerMetaMap map[int]*BrokerMeta
BrokerMetaMap is a map of broker IDs to BrokerMeta.
func BrokerMetaMapFromStates ¶ added in v4.1.0
func BrokerMetaMapFromStates(states kafkaadmin.BrokerStates) (BrokerMetaMap, error)
BrokerMetaMapFromStates takes a kafkaadmin.BrokerStates and translates it to a BrokerMetaMap.
func (BrokerMetaMap) Copy ¶
func (bmm BrokerMetaMap) Copy() BrokerMetaMap
Copy returns a copy of a BrokerMetaMap.
type BrokerMetrics ¶
type BrokerMetrics struct {
StorageFree float64
}
BrokerMetrics holds broker metric data fetched from ZK.
type BrokerMetricsMap ¶
type BrokerMetricsMap map[int]*BrokerMetrics
BrokerMetricsMap holds a mapping of broker ID to BrokerMetrics.
type BrokerStatus ¶
BrokerStatus summarizes change counts from an input and output broker list.
func (BrokerStatus) Changes ¶
func (bs BrokerStatus) Changes() bool
Changes returns a bool that indicates whether the BrokerStatus values represent a change in brokers.
type BrokerUseStats ¶
BrokerUseStats holds counts of partition ownership.
type BrokerUseStatsList ¶
type BrokerUseStatsList []*BrokerUseStats
BrokerUseStatsList is a slice of *BrokerUseStats.
func (BrokerUseStatsList) Len ¶
func (b BrokerUseStatsList) Len() int
func (BrokerUseStatsList) Less ¶
func (b BrokerUseStatsList) Less(i, j int) bool
func (BrokerUseStatsList) Swap ¶
func (b BrokerUseStatsList) Swap(i, j int)
type BrokerUseStatsMap ¶
type BrokerUseStatsMap map[int]*BrokerUseStats
BrokerUseStatsMap is a map of IDs to *BrokerUseStats.
func (BrokerUseStatsMap) List ¶
func (b BrokerUseStatsMap) List() BrokerUseStatsList
List returns a BrokerUseStatsList from a BrokerUseStatsMap.
type Constraints ¶
type Constraints struct {
// contains filtered or unexported fields
}
Constraints holds a map of IDs and locality key-values.
func MergeConstraints ¶
func MergeConstraints(bl BrokerList) *Constraints
TODO deprecate. MergeConstraints takes a brokerlist and builds a *Constraints by merging the attributes of all brokers from the supplied list.
func NewConstraints ¶
func NewConstraints() *Constraints
NewConstraints returns an empty *Constraints.
func (*Constraints) Add ¶
func (c *Constraints) Add(b *Broker)
Add takes a *Broker and adds its attributes to the *Constraints. The requestSize is also subtracted from the *Broker.StorageFree.
func (*Constraints) MergeConstraints ¶
func (c *Constraints) MergeConstraints(bl BrokerList)
MergeConstraints takes a brokerlist and updates the *Constraints by merging the attributes of all brokers from the supplied list.
func (*Constraints) SelectBroker ¶
func (c *Constraints) SelectBroker(b BrokerList, p ConstraintsParams) (*Broker, error)
SelectBroker takes a BrokerList and a ConstraintsParams and selects the most suitable broker that passes all specified constraints.
type ConstraintsParams ¶
type ConstraintsParams struct { SelectorMethod string MinUniqueRackIDs int RequestSize float64 SeedVal int64 }
ConstraintsParams holds parameters for the SelectBroker method.
type DegreeDistribution ¶
type DegreeDistribution struct { // Relationships is an adjacency list where an edge between brokers is // defined as a common occupancy in at least one replica set. For instance, // given the replica set [1001,1002,1003], ID 1002 has a relationship with // 1001 and 1003. Relationships map[int]map[int]struct{} }
DegreeDistribution counts broker to broker relationships.
func NewDegreeDistribution ¶
func NewDegreeDistribution() DegreeDistribution
NewDegreeDistribution returns a new DegreeDistribution.
func (DegreeDistribution) Add ¶
func (dd DegreeDistribution) Add(nodes []int)
Add takes a []int of broker IDs representing a replica set and updates the adjacency lists for each broker in the set.
func (DegreeDistribution) Count ¶
func (dd DegreeDistribution) Count(n int) int
Count takes a node ID and returns the degree distribution.
func (DegreeDistribution) Stats ¶
func (dd DegreeDistribution) Stats() DegreeDistributionStats
Stats returns a DegreeDistributionStats.
type DegreeDistributionStats ¶
DegreeDistributionStats holds general statistical information describing the DegreeDistribution counts.
type Mappings ¶
type Mappings map[int]map[string]PartitionList
Mappings is a mapping of broker IDs to currently held partitions as a PartitionList.
func (Mappings) LargestPartitions ¶
func (m Mappings) LargestPartitions(id int, k int, pm PartitionMetaMap) (PartitionList, error)
LargestPartitions takes a broker ID and PartitionMetaMap and returns a PartitionList with the top k partitions by size for the provided broker ID.
type NoMappingForBroker ¶
type NoMappingForBroker struct {
// contains filtered or unexported fields
}
NoMappingForBroker error.
func (NoMappingForBroker) Error ¶
func (e NoMappingForBroker) Error() string
type NoMappingForTopic ¶
type NoMappingForTopic struct {
// contains filtered or unexported fields
}
NoMappingForTopic error.
func (NoMappingForTopic) Error ¶
func (e NoMappingForTopic) Error() string
type Partition ¶
type Partition struct { Topic string `json:"topic"` Partition int `json:"partition"` Replicas []int `json:"replicas"` }
Partition represents the Kafka partition structure.
type PartitionList ¶
type PartitionList []Partition
PartitionList is a []Partition.
func (PartitionList) Len ¶
func (p PartitionList) Len() int
func (PartitionList) Less ¶
func (p PartitionList) Less(i, j int) bool
func (PartitionList) SortBySize ¶
func (p PartitionList) SortBySize(m PartitionMetaMap)
SortBySize takes a PartitionMetaMap and sorts the PartitionList by partition size.
func (PartitionList) Swap ¶
func (p PartitionList) Swap(i, j int)
type PartitionMap ¶
type PartitionMap struct { Version int `json:"version"` Partitions PartitionList `json:"partitions"` }
PartitionMap represents the Kafka partition map structure.
func NewPartitionMap ¶
func NewPartitionMap(opts ...PartitionMapOpt) *PartitionMap
NewPartitionMap returns an empty *PartitionMap with optionally provided PartitionMapOpts.
func PartitionMapFromString ¶
func PartitionMapFromString(s string) (*PartitionMap, error)
PartitionMapFromString takes a json encoded string and returns a *PartitionMap.
func PartitionMapFromTopicStates ¶ added in v4.1.0
func PartitionMapFromTopicStates(ts kafkaadmin.TopicStates) (*PartitionMap, error)
PartitionMapFromTopicStates translates a kafkaadmin.TopicStates to a *PartitionMap.
func (PartitionMap) BrokersIn ¶
func (pm PartitionMap) BrokersIn() BrokerFilterFn
BrokersIn returns a BrokerFilterFn that filters for brokers in the PartitionMap.
func (*PartitionMap) Copy ¶
func (pm *PartitionMap) Copy() *PartitionMap
Copy returns a copy of a *PartitionMap.
func (*PartitionMap) DegreeDistribution ¶
func (pm *PartitionMap) DegreeDistribution() DegreeDistribution
DegreeDistribution returns the DegreeDistribution for the PartitionMap.
func (*PartitionMap) Equal ¶
func (pm *PartitionMap) Equal(pm2 *PartitionMap) (bool, error)
Equal checks the equality between two partition maps. Equality requires that the total order is exactly the same.
func (*PartitionMap) LocalitiesAvailable ¶
func (pm *PartitionMap) LocalitiesAvailable(bm BrokerMap, b *Broker) []string
LocalitiesAvailable takes a broker map and broker and returns a []string of localities that are unused by any of the brokers in any replica sets that the reference broker was found in. This is done by building a set of all localities observed across all replica sets and a set of all localities observed in replica sets containing the reference broker, then returning the diff.
func (*PartitionMap) Mappings ¶
func (pm *PartitionMap) Mappings() Mappings
Mappings returns a Mappings from a *PartitionMap.
func (*PartitionMap) OptimizeLeaderFollower ¶
func (pm *PartitionMap) OptimizeLeaderFollower()
OptimizeLeaderFollower is a simple leadership optimization algorithm that iterates over each partition's replica set and sorts brokers according to their leader/follower position ratio, ascending. The idea is that if a broker has a high leader/follower ratio, it should go further down the replica list. This ratio is recalculated at each replica set visited to avoid extreme skew.
func (*PartitionMap) Rebuild ¶
func (pm *PartitionMap) Rebuild(params RebuildParams) (*PartitionMap, []error)
Rebuild takes a BrokerMap and rebuild strategy. It then traverses the partition map, replacing brokers marked for removal with the best available candidate based on the selected rebuild strategy. A rebuilt *PartitionMap and []error of errors is returned.
func (*PartitionMap) ReplicaSets ¶
func (pm *PartitionMap) ReplicaSets(t string) ReplicaSets
ReplicaSets takes a topic name and returns a ReplicaSets.
func (*PartitionMap) SetReplication ¶
func (pm *PartitionMap) SetReplication(r int)
SetReplication ensures that all replica sets are reset to the replication factor r. Sets exceeding r are truncated, sets below r are extended with stub brokers.
func (*PartitionMap) Strip ¶
func (pm *PartitionMap) Strip() *PartitionMap
Strip takes a PartitionMap and returns a copy where all broker ID references are replaced with the stub broker (ID == StubBrokerID) whose replace field is set to true. This ensures that the entire map is rebuilt, even if the provided broker list matches what's already in the map.
func (*PartitionMap) Topics ¶
func (pm *PartitionMap) Topics() []string
Topics returns a []string of topic names held in the PartitionMap.
func (*PartitionMap) UseStats ¶
func (pm *PartitionMap) UseStats() BrokerUseStatsMap
UseStats returns a map of broker IDs to BrokerUseStats; each contains a count of leader and follower partition assignments.
type PartitionMapOpt ¶
type PartitionMapOpt func(*PartitionMap)
PartitionMapOpt is a function that configures a *PartitionMap at instantiation time.
func Populate ¶
func Populate(s string, n, r int) PartitionMapOpt
Populate takes a name, partition count and replication factor and returns a PartitionMapOpt.
type PartitionMeta ¶
type PartitionMeta struct {
Size float64 // In bytes.
}
PartitionMeta holds partition metadata.
type PartitionMetaMap ¶
type PartitionMetaMap map[string]map[int]*PartitionMeta
PartitionMetaMap is a mapping of topic, partition number to PartitionMeta.
func NewPartitionMetaMap ¶
func NewPartitionMetaMap() PartitionMetaMap
NewPartitionMetaMap returns an empty PartitionMetaMap.
type RebuildParams ¶
type RebuildParams struct { PMM PartitionMetaMap BM BrokerMap Strategy string Optimization string Affinities SubstitutionAffinities PartnSzFactor float64 MinUniqueRackIDs int // contains filtered or unexported fields }
RebuildParams holds required parameters to call the Rebuild method on a *PartitionMap.
func NewRebuildParams ¶
func NewRebuildParams() RebuildParams
NewRebuildParams initializes a RebuildParams.
type ReplicaSets ¶
ReplicaSets is a mapping of partition number to Partition.Replicas. Take note that there is no topic identifier and that partition numbers from two different topics can overwrite one another.
type Stub ¶
type Stub struct {
// contains filtered or unexported fields
}
Stub stubs the Handler interface.
func (*Stub) GetAllBrokerMeta ¶
func (zk *Stub) GetAllBrokerMeta(withMetrics bool) (BrokerMetaMap, []error)
GetAllBrokerMeta stubs GetAllBrokerMeta.
func (*Stub) GetAllPartitionMeta ¶
func (zk *Stub) GetAllPartitionMeta() (PartitionMetaMap, error)
GetAllPartitionMeta stubs GetAllPartitionMeta.
func (*Stub) GetBrokerMetrics ¶
func (zk *Stub) GetBrokerMetrics() (BrokerMetricsMap, error)
GetBrokerMetrics stubs GetBrokerMetrics.
func (*Stub) GetPartitionMap ¶
func (zk *Stub) GetPartitionMap(t string) (*PartitionMap, error)
GetPartitionMap stubs GetPartitionMap.
func (*Stub) GetTopicState ¶
func (zk *Stub) GetTopicState(t string) (*TopicState, error)
GetTopicState stubs GetTopicState.
type StubZnode ¶
type StubZnode struct {
// contains filtered or unexported fields
}
StubZnode stubs a ZooKeeper znode.
type SubstitutionAffinities ¶
SubstitutionAffinities is a mapping of an ID belonging to a *Broker marked for replacement and a replacement *Broker that will fill all previously filled replica slots held by the *Broker being replaced.
func (SubstitutionAffinities) Get ¶
func (sa SubstitutionAffinities) Get(id int) *Broker
Get takes a broker ID and returns a *Broker if one was set as a substitution affinity.
type TopicState ¶
TopicState is used for unmarshalling ZooKeeper json data from a topic: e.g. /brokers/topics/some-topic
func (*TopicState) Brokers ¶
func (ts *TopicState) Brokers() []int
Brokers returns a []int of broker IDs from all partitions in the TopicState.