Documentation ¶
Index ¶
- Constants
- Variables
- func NewHotStoresStats(kind FlowKind) *hotPeerCache
- func NewStoreStatisticsMap(opt ScheduleOptions) *storeStatisticsMap
- type AvgOverTime
- type FlowKind
- type HotCache
- func (w *HotCache) CheckRead(region *core.RegionInfo, stats *StoresStats) []*HotPeerStat
- func (w *HotCache) CheckWrite(region *core.RegionInfo, stats *StoresStats) []*HotPeerStat
- func (w *HotCache) CollectMetrics(stats *StoresStats)
- func (w *HotCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool
- func (w *HotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotDegree int) *HotPeerStat
- func (w *HotCache) RegionStats(kind FlowKind) map[uint64][]*HotPeerStat
- func (w *HotCache) ResetMetrics()
- func (w *HotCache) Update(item *HotPeerStat)
- type HotPeerStat
- func (stat *HotPeerStat) Clone() *HotPeerStat
- func (stat *HotPeerStat) GetByteRate() float64
- func (stat *HotPeerStat) GetKeyRate() float64
- func (stat *HotPeerStat) ID() uint64
- func (stat *HotPeerStat) IsLeader() bool
- func (stat *HotPeerStat) IsNeedDelete() bool
- func (stat *HotPeerStat) IsNew() bool
- func (stat *HotPeerStat) Less(k int, than TopNItem) bool
- type HotPeersStat
- type LabelStatistics
- type MedianFilter
- type MovingAvg
- type RegionStatInformer
- type RegionStatisticType
- type RegionStatistics
- func (r *RegionStatistics) ClearDefunctRegion(regionID uint64)
- func (r *RegionStatistics) Collect()
- func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo
- func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo)
- func (r *RegionStatistics) Reset()
- type RegionStats
- type RollingStoreStats
- func (r *RollingStoreStats) GetBytesRate() (writeRate float64, readRate float64)
- func (r *RollingStoreStats) GetBytesReadRate() float64
- func (r *RollingStoreStats) GetBytesWriteRate() float64
- func (r *RollingStoreStats) GetCPUUsage() float64
- func (r *RollingStoreStats) GetDiskReadRate() float64
- func (r *RollingStoreStats) GetDiskWriteRate() float64
- func (r *RollingStoreStats) GetKeysReadRate() float64
- func (r *RollingStoreStats) GetKeysWriteRate() float64
- func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats)
- func (r *RollingStoreStats) Set(stats *pdpb.StoreStats)
- type ScheduleOptions
- type StoreHotPeersInfos
- type StoreHotPeersStat
- type StoreStatInformer
- type StoresStats
- func (s *StoresStats) CreateRollingStoreStats(storeID uint64)
- func (s *StoresStats) GetOrCreateRollingStoreStats(storeID uint64) *RollingStoreStats
- func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats
- func (s *StoresStats) GetStoreBytesRate(storeID uint64) (writeRate float64, readRate float64)
- func (s *StoresStats) GetStoreBytesReadRate(storeID uint64) float64
- func (s *StoresStats) GetStoreBytesWriteRate(storeID uint64) float64
- func (s *StoresStats) GetStoreCPUUsage(storeID uint64) float64
- func (s *StoresStats) GetStoreDiskReadRate(storeID uint64) float64
- func (s *StoresStats) GetStoreDiskWriteRate(storeID uint64) float64
- func (s *StoresStats) GetStoresBytesReadStat() map[uint64]float64
- func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]float64
- func (s *StoresStats) GetStoresCPUUsage() map[uint64]float64
- func (s *StoresStats) GetStoresDiskReadRate() map[uint64]float64
- func (s *StoresStats) GetStoresDiskWriteRate() map[uint64]float64
- func (s *StoresStats) GetStoresKeysReadStat() map[uint64]float64
- func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]float64
- func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats)
- func (s *StoresStats) RemoveRollingStoreStats(storeID uint64)
- func (s *StoresStats) Set(storeID uint64, stats *pdpb.StoreStats)
- func (s *StoresStats) TotalBytesReadRate() float64
- func (s *StoresStats) TotalBytesWriteRate() float64
- func (s *StoresStats) UpdateTotalBytesRate(f func() []*core.StoreInfo)
- type TimeMedian
- type TopN
- func (tn *TopN) Get(id uint64) TopNItem
- func (tn *TopN) GetAll() []TopNItem
- func (tn *TopN) GetAllTopN(k int) []TopNItem
- func (tn *TopN) GetTopNMin(k int) TopNItem
- func (tn *TopN) Len() int
- func (tn *TopN) Put(item TopNItem) (isUpdate bool)
- func (tn *TopN) Remove(id uint64) (item TopNItem)
- func (tn *TopN) RemoveExpired()
- type TopNItem
Constants ¶
const ( // DefaultAotSize is default size of average over time. DefaultAotSize = 2 // DefaultWriteMfSize is default size of write median filter DefaultWriteMfSize = 5 // DefaultReadMfSize is default size of read median filter DefaultReadMfSize = 3 )
const (
// RegionHeartBeatReportInterval is the heartbeat report interval of a region.
RegionHeartBeatReportInterval = 60
)
const (
// StoreHeartBeatReportInterval is the heartbeat report interval of a store.
StoreHeartBeatReportInterval = 10
)
Variables ¶
var Denoising = true
Denoising is an option to calculate flow based on the real heartbeats. It should only be turned off by the simulator and the tests.
Functions ¶
func NewHotStoresStats ¶
func NewHotStoresStats(kind FlowKind) *hotPeerCache
NewHotStoresStats creates a HotStoresStats
func NewStoreStatisticsMap ¶
func NewStoreStatisticsMap(opt ScheduleOptions) *storeStatisticsMap
NewStoreStatisticsMap creates a new storeStatisticsMap.
Types ¶
type AvgOverTime ¶
type AvgOverTime struct {
// contains filtered or unexported fields
}
AvgOverTime maintains change rate in the last avgInterval.
AvgOverTime takes changes with their own intervals, stores recent changes that happened in the last avgInterval, then calculates the change rate by (sum of changes) / (sum of intervals).
func NewAvgOverTime ¶
func NewAvgOverTime(interval time.Duration) *AvgOverTime
NewAvgOverTime returns an AvgOverTime with given interval.
func (*AvgOverTime) Add ¶
func (aot *AvgOverTime) Add(delta float64, interval time.Duration)
Add adds recent change to AvgOverTime.
func (*AvgOverTime) Get ¶
func (aot *AvgOverTime) Get() float64
Get returns change rate in the last interval.
func (*AvgOverTime) Set ¶
func (aot *AvgOverTime) Set(avg float64)
Set sets AvgOverTime to the given average.
type HotCache ¶
type HotCache struct {
// contains filtered or unexported fields
}
HotCache is a cache that holds hot regions.
func (*HotCache) CheckRead ¶
func (w *HotCache) CheckRead(region *core.RegionInfo, stats *StoresStats) []*HotPeerStat
CheckRead checks the read status, returns update items.
func (*HotCache) CheckWrite ¶
func (w *HotCache) CheckWrite(region *core.RegionInfo, stats *StoresStats) []*HotPeerStat
CheckWrite checks the write status, returns update items.
func (*HotCache) CollectMetrics ¶
func (w *HotCache) CollectMetrics(stats *StoresStats)
CollectMetrics collects the hot cache metrics.
func (*HotCache) IsRegionHot ¶
func (w *HotCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool
IsRegionHot checks if the region is hot.
func (*HotCache) RandHotRegionFromStore ¶
func (w *HotCache) RandHotRegionFromStore(storeID uint64, kind FlowKind, hotDegree int) *HotPeerStat
RandHotRegionFromStore randomly picks a hot region in the specified store.
func (*HotCache) RegionStats ¶
func (w *HotCache) RegionStats(kind FlowKind) map[uint64][]*HotPeerStat
RegionStats returns hot items according to kind
func (*HotCache) ResetMetrics ¶
func (w *HotCache) ResetMetrics()
ResetMetrics resets the hot cache metrics.
type HotPeerStat ¶
type HotPeerStat struct { StoreID uint64 `json:"store_id"` RegionID uint64 `json:"region_id"` // HotDegree records the hot region update times HotDegree int `json:"hot_degree"` // AntiCount used to eliminate some noise when remove region in cache AntiCount int `json:"anti_count"` Kind FlowKind `json:"kind"` ByteRate float64 `json:"flow_bytes"` KeyRate float64 `json:"flow_keys"` // LastUpdateTime used to calculate average write LastUpdateTime time.Time `json:"last_update_time"` // Version used to check the region split times Version uint64 `json:"version"` // contains filtered or unexported fields }
HotPeerStat records each hot peer's statistics
func (*HotPeerStat) Clone ¶
func (stat *HotPeerStat) Clone() *HotPeerStat
Clone clones the HotPeerStat
func (*HotPeerStat) GetByteRate ¶
func (stat *HotPeerStat) GetByteRate() float64
GetByteRate returns denoised BytesRate if possible.
func (*HotPeerStat) GetKeyRate ¶
func (stat *HotPeerStat) GetKeyRate() float64
GetKeyRate returns denoised KeysRate if possible.
func (*HotPeerStat) ID ¶
func (stat *HotPeerStat) ID() uint64
ID returns region ID. Implementing TopNItem.
func (*HotPeerStat) IsLeader ¶
func (stat *HotPeerStat) IsLeader() bool
IsLeader indicates the item belongs to the leader.
func (*HotPeerStat) IsNeedDelete ¶
func (stat *HotPeerStat) IsNeedDelete() bool
IsNeedDelete indicates whether the item needs to be deleted from the cache.
func (*HotPeerStat) IsNew ¶
func (stat *HotPeerStat) IsNew() bool
IsNew indicates the item is the first update in the cache of the region.
type HotPeersStat ¶
type HotPeersStat struct { TotalBytesRate float64 `json:"total_flow_bytes"` TotalKeysRate float64 `json:"total_flow_keys"` Count int `json:"regions_count"` Stats []HotPeerStat `json:"statistics"` }
HotPeersStat records all hot regions statistics
type LabelStatistics ¶
type LabelStatistics struct {
// contains filtered or unexported fields
}
LabelStatistics is the statistics of the level of labels.
func NewLabelStatistics ¶
func NewLabelStatistics() *LabelStatistics
NewLabelStatistics creates a new LabelStatistics.
func (*LabelStatistics) ClearDefunctRegion ¶
func (l *LabelStatistics) ClearDefunctRegion(regionID uint64, labels []string)
ClearDefunctRegion is used to handle the overlap region.
func (*LabelStatistics) Collect ¶
func (l *LabelStatistics) Collect()
Collect collects the metrics of the label status.
func (*LabelStatistics) Observe ¶
func (l *LabelStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo, labels []string)
Observe records the current label status.
func (*LabelStatistics) Reset ¶
func (l *LabelStatistics) Reset()
Reset resets the metrics of the label status.
type MedianFilter ¶
type MedianFilter struct {
// contains filtered or unexported fields
}
MedianFilter works as a median filter with specified window size. There are at most `size` data points for calculating. References: https://en.wikipedia.org/wiki/Median_filter.
func NewMedianFilter ¶
func NewMedianFilter(size int) *MedianFilter
NewMedianFilter returns a MedianFilter.
func (*MedianFilter) Get ¶
func (r *MedianFilter) Get() float64
Get returns the median of the data set.
type MovingAvg ¶
type MovingAvg interface { // Add adds a data point to the data set. Add(data float64) // Get returns the moving average. Get() float64 // Reset cleans the data set. Reset() // Set = Reset + Add Set(data float64) }
MovingAvg provides moving average. Ref: https://en.wikipedia.org/wiki/Moving_average
type RegionStatInformer ¶
type RegionStatInformer interface { IsRegionHot(region *core.RegionInfo) bool RegionWriteStats() map[uint64][]*HotPeerStat RegionReadStats() map[uint64][]*HotPeerStat RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo }
RegionStatInformer provides access to a shared informer of statistics.
type RegionStatisticType ¶
type RegionStatisticType uint32
RegionStatisticType represents the type of the region's status.
const ( MissPeer RegionStatisticType = 1 << iota ExtraPeer DownPeer PendingPeer OfflinePeer LearnerPeer EmptyRegion )
region status type
type RegionStatistics ¶
type RegionStatistics struct {
// contains filtered or unexported fields
}
RegionStatistics is used to record the status of regions.
func NewRegionStatistics ¶
func NewRegionStatistics(opt ScheduleOptions) *RegionStatistics
NewRegionStatistics creates a new RegionStatistics.
func (*RegionStatistics) ClearDefunctRegion ¶
func (r *RegionStatistics) ClearDefunctRegion(regionID uint64)
ClearDefunctRegion is used to handle the overlap region.
func (*RegionStatistics) Collect ¶
func (r *RegionStatistics) Collect()
Collect collects the metrics of the regions' status.
func (*RegionStatistics) GetRegionStatsByType ¶
func (r *RegionStatistics) GetRegionStatsByType(typ RegionStatisticType) []*core.RegionInfo
GetRegionStatsByType gets the status of the region by types.
func (*RegionStatistics) Observe ¶
func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo)
Observe records the current regions' status.
func (*RegionStatistics) Reset ¶
func (r *RegionStatistics) Reset()
Reset resets the metrics of the regions' status.
type RegionStats ¶
type RegionStats struct { Count int `json:"count"` EmptyCount int `json:"empty_count"` StorageSize int64 `json:"storage_size"` StorageKeys int64 `json:"storage_keys"` StoreLeaderCount map[uint64]int `json:"store_leader_count"` StorePeerCount map[uint64]int `json:"store_peer_count"` StoreLeaderSize map[uint64]int64 `json:"store_leader_size"` StoreLeaderKeys map[uint64]int64 `json:"store_leader_keys"` StorePeerSize map[uint64]int64 `json:"store_peer_size"` StorePeerKeys map[uint64]int64 `json:"store_peer_keys"` }
RegionStats records a list of regions' statistics and distribution status.
func GetRegionStats ¶
func GetRegionStats(regions []*core.RegionInfo) *RegionStats
GetRegionStats sums regions' statistics.
func (*RegionStats) Observe ¶
func (s *RegionStats) Observe(r *core.RegionInfo)
Observe adds a region's statistics into RegionStats.
type RollingStoreStats ¶
RollingStoreStats are multiple sets of recent historical records with specified windows size.
func (*RollingStoreStats) GetBytesRate ¶
func (r *RollingStoreStats) GetBytesRate() (writeRate float64, readRate float64)
GetBytesRate returns the bytes write rate and the bytes read rate.
func (*RollingStoreStats) GetBytesReadRate ¶
func (r *RollingStoreStats) GetBytesReadRate() float64
GetBytesReadRate returns the bytes read rate.
func (*RollingStoreStats) GetBytesWriteRate ¶
func (r *RollingStoreStats) GetBytesWriteRate() float64
GetBytesWriteRate returns the bytes write rate.
func (*RollingStoreStats) GetCPUUsage ¶
func (r *RollingStoreStats) GetCPUUsage() float64
GetCPUUsage returns the total cpu usages of threads in the store.
func (*RollingStoreStats) GetDiskReadRate ¶
func (r *RollingStoreStats) GetDiskReadRate() float64
GetDiskReadRate returns the total read disk io rate of threads in the store.
func (*RollingStoreStats) GetDiskWriteRate ¶
func (r *RollingStoreStats) GetDiskWriteRate() float64
GetDiskWriteRate returns the total write disk io rate of threads in the store.
func (*RollingStoreStats) GetKeysReadRate ¶
func (r *RollingStoreStats) GetKeysReadRate() float64
GetKeysReadRate returns the keys read rate.
func (*RollingStoreStats) GetKeysWriteRate ¶
func (r *RollingStoreStats) GetKeysWriteRate() float64
GetKeysWriteRate returns the keys write rate.
func (*RollingStoreStats) Observe ¶
func (r *RollingStoreStats) Observe(stats *pdpb.StoreStats)
Observe records current statistics.
func (*RollingStoreStats) Set ¶
func (r *RollingStoreStats) Set(stats *pdpb.StoreStats)
Set sets the statistics (for test).
type ScheduleOptions ¶
type ScheduleOptions interface { GetLocationLabels() []string GetLowSpaceRatio() float64 GetHighSpaceRatio() float64 GetTolerantSizeRatio() float64 GetStoreBalanceRate() float64 GetSchedulerMaxWaitingOperator() uint64 GetLeaderScheduleLimit() uint64 GetRegionScheduleLimit() uint64 GetReplicaScheduleLimit() uint64 GetMergeScheduleLimit() uint64 GetHotRegionScheduleLimit() uint64 GetMaxReplicas() int GetHotRegionCacheHitsThreshold() int GetMaxSnapshotCount() uint64 GetMaxPendingPeerCount() uint64 GetMaxMergeRegionSize() uint64 GetMaxMergeRegionKeys() uint64 GetLeaderSchedulePolicy() core.SchedulePolicy GetKeyType() core.KeyType IsMakeUpReplicaEnabled() bool IsRemoveExtraReplicaEnabled() bool IsRemoveDownReplicaEnabled() bool IsReplaceOfflineReplicaEnabled() bool GetMaxStoreDownTime() time.Duration }
ScheduleOptions is an interface to access configurations. TODO: merge the Options to schedule.Options
type StoreHotPeersInfos ¶
type StoreHotPeersInfos struct { AsPeer StoreHotPeersStat `json:"as_peer"` AsLeader StoreHotPeersStat `json:"as_leader"` }
StoreHotPeersInfos is used to get human-readable description for hot regions.
type StoreHotPeersStat ¶
type StoreHotPeersStat map[uint64]*HotPeersStat
StoreHotPeersStat is used to record the hot region statistics grouped by store.
type StoreStatInformer ¶
type StoreStatInformer interface {
GetStoresStats() *StoresStats
}
StoreStatInformer provides access to a shared informer of statistics.
type StoresStats ¶
StoresStats is a cache that holds hot regions.
func NewStoresStats ¶
func NewStoresStats() *StoresStats
NewStoresStats creates a new hot spot cache.
func (*StoresStats) CreateRollingStoreStats ¶
func (s *StoresStats) CreateRollingStoreStats(storeID uint64)
CreateRollingStoreStats creates RollingStoreStats with a given store ID.
func (*StoresStats) GetOrCreateRollingStoreStats ¶
func (s *StoresStats) GetOrCreateRollingStoreStats(storeID uint64) *RollingStoreStats
GetOrCreateRollingStoreStats gets or creates RollingStoreStats with a given store ID.
func (*StoresStats) GetRollingStoreStats ¶
func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats
GetRollingStoreStats gets RollingStoreStats with a given store ID.
func (*StoresStats) GetStoreBytesRate ¶
func (s *StoresStats) GetStoreBytesRate(storeID uint64) (writeRate float64, readRate float64)
GetStoreBytesRate returns the bytes write rate and the bytes read rate of the specified store.
func (*StoresStats) GetStoreBytesReadRate ¶
func (s *StoresStats) GetStoreBytesReadRate(storeID uint64) float64
GetStoreBytesReadRate returns the bytes read stat of the specified store.
func (*StoresStats) GetStoreBytesWriteRate ¶
func (s *StoresStats) GetStoreBytesWriteRate(storeID uint64) float64
GetStoreBytesWriteRate returns the bytes write stat of the specified store.
func (*StoresStats) GetStoreCPUUsage ¶
func (s *StoresStats) GetStoreCPUUsage(storeID uint64) float64
GetStoreCPUUsage returns the total cpu usages of threads of the specified store.
func (*StoresStats) GetStoreDiskReadRate ¶
func (s *StoresStats) GetStoreDiskReadRate(storeID uint64) float64
GetStoreDiskReadRate returns the total read disk io rate of threads of the specified store.
func (*StoresStats) GetStoreDiskWriteRate ¶
func (s *StoresStats) GetStoreDiskWriteRate(storeID uint64) float64
GetStoreDiskWriteRate returns the total write disk io rate of threads of the specified store.
func (*StoresStats) GetStoresBytesReadStat ¶
func (s *StoresStats) GetStoresBytesReadStat() map[uint64]float64
GetStoresBytesReadStat returns the bytes read stat of all StoreInfo.
func (*StoresStats) GetStoresBytesWriteStat ¶
func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]float64
GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo.
func (*StoresStats) GetStoresCPUUsage ¶
func (s *StoresStats) GetStoresCPUUsage() map[uint64]float64
GetStoresCPUUsage returns the cpu usage stat of all StoreInfo.
func (*StoresStats) GetStoresDiskReadRate ¶
func (s *StoresStats) GetStoresDiskReadRate() map[uint64]float64
GetStoresDiskReadRate returns the disk read rate stat of all StoreInfo.
func (*StoresStats) GetStoresDiskWriteRate ¶
func (s *StoresStats) GetStoresDiskWriteRate() map[uint64]float64
GetStoresDiskWriteRate returns the disk write rate stat of all StoreInfo.
func (*StoresStats) GetStoresKeysReadStat ¶
func (s *StoresStats) GetStoresKeysReadStat() map[uint64]float64
GetStoresKeysReadStat returns the keys read stat of all StoreInfo.
func (*StoresStats) GetStoresKeysWriteStat ¶
func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]float64
GetStoresKeysWriteStat returns the keys write stat of all StoreInfo.
func (*StoresStats) Observe ¶
func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats)
Observe records the current store status with a given store.
func (*StoresStats) RemoveRollingStoreStats ¶
func (s *StoresStats) RemoveRollingStoreStats(storeID uint64)
RemoveRollingStoreStats removes RollingStoreStats with a given store ID.
func (*StoresStats) Set ¶
func (s *StoresStats) Set(storeID uint64, stats *pdpb.StoreStats)
Set sets the store statistics (for test).
func (*StoresStats) TotalBytesReadRate ¶
func (s *StoresStats) TotalBytesReadRate() float64
TotalBytesReadRate returns the total read bytes rate of all StoreInfo.
func (*StoresStats) TotalBytesWriteRate ¶
func (s *StoresStats) TotalBytesWriteRate() float64
TotalBytesWriteRate returns the total written bytes rate of all StoreInfo.
func (*StoresStats) UpdateTotalBytesRate ¶
func (s *StoresStats) UpdateTotalBytesRate(f func() []*core.StoreInfo)
UpdateTotalBytesRate updates the total bytes write rate and read rate.
type TimeMedian ¶
type TimeMedian struct {
// contains filtered or unexported fields
}
TimeMedian is AvgOverTime + MedianFilter. The size of MedianFilter should be larger than double the size of AvgOverTime to denoise. Delay is aotSize * mfSize * StoreHeartBeatReportInterval / 4.
func NewTimeMedian ¶
func NewTimeMedian(aotSize, mfSize int) *TimeMedian
NewTimeMedian returns a TimeMedian with given size.
func (*TimeMedian) Add ¶
func (t *TimeMedian) Add(delta float64, interval time.Duration)
Add adds recent change to TimeMedian.
func (*TimeMedian) Get ¶
func (t *TimeMedian) Get() float64
Get returns change rate in the median of the several intervals.
type TopN ¶
type TopN struct {
// contains filtered or unexported fields
}
TopN maintains the N largest items of multiple dimensions.
func NewTopN ¶
NewTopN returns a k-dimensional TopN with given TTL. NOTE: panic if k <= 0 or n <= 0.
func (*TopN) GetAllTopN ¶
GetAllTopN returns the top N items of the `k`th dimension.
func (*TopN) GetTopNMin ¶
GetTopNMin returns the min item in top N of the `k`th dimension.
func (*TopN) RemoveExpired ¶
func (tn *TopN) RemoveExpired()
RemoveExpired deletes all expired items.