Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyOperator(mc *mockcluster.Cluster, op *operator.Operator)
- func ApplyOperatorStep(resource *core.CachedResource, op *operator.Operator) *core.CachedResource
- func DecodeConfig(data []byte, v interface{}) error
- func EncodeConfig(v interface{}) ([]byte, error)
- func FindSchedulerTypeByName(name string) string
- func NewTotalOpInfluence(operators []*operator.Operator, cluster opt.Cluster) operator.OpInfluence
- func RegisterScheduler(typ string, createFn CreateSchedulerFunc)
- func RegisterSliceDecoderBuilder(typ string, builder ConfigSliceDecoderBuilder)
- type Bucket
- type CheckerController
- func (c *CheckerController) AddWaitingResource(res *core.CachedResource)
- func (c *CheckerController) CheckResource(res *core.CachedResource) []*operator.Operator
- func (c *CheckerController) FillReplicas(res *core.CachedResource, leastPeers int) error
- func (c *CheckerController) GetMergeChecker() *checker.MergeChecker
- func (c *CheckerController) GetWaitingResources() []*cache.Item
- func (c *CheckerController) RemoveWaitingResource(id uint64)
- type ConfigDecoder
- type ConfigSliceDecoderBuilder
- type CreateSchedulerFunc
- type OperatorController
- func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool
- func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int
- func (oc *OperatorController) CollectContainerLimitMetrics()
- func (oc *OperatorController) Ctx() context.Context
- func (oc *OperatorController) Dispatch(res *core.CachedResource, source string)
- func (oc *OperatorController) DispatchDestroyDirectly(res *core.CachedResource, source string)
- func (oc *OperatorController) ExceedContainerLimit(ops ...*operator.Operator) bool
- func (oc *OperatorController) GetCluster() opt.Cluster
- func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory
- func (oc *OperatorController) GetLeaderSchedulePolicy() core.SchedulePolicy
- func (oc *OperatorController) GetOpInfluence(cluster opt.Cluster) operator.OpInfluence
- func (oc *OperatorController) GetOperator(resID uint64) *operator.Operator
- func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus
- func (oc *OperatorController) GetOperators() []*operator.Operator
- func (oc *OperatorController) GetWaitingOperators() []*operator.Operator
- func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64
- func (oc *OperatorController) PromoteWaitingOperator()
- func (oc *OperatorController) PruneHistory()
- func (oc *OperatorController) PushOperators()
- func (oc *OperatorController) RemoveOperator(op *operator.Operator, extra string) bool
- func (oc *OperatorController) SendScheduleCommand(res *core.CachedResource, step operator.OpStep, source string)
- func (oc *OperatorController) SetOperator(op *operator.Operator)
- type OperatorRecords
- type OperatorWithStatus
- type PluginInterface
- type RandBuckets
- type RangeCluster
- func (r *RangeCluster) GetAverageResourceSize() int64
- func (r *RangeCluster) GetContainer(id uint64) *core.CachedContainer
- func (r *RangeCluster) GetContainers() []*core.CachedContainer
- func (r *RangeCluster) GetFollowerContainers(res *core.CachedResource) []*core.CachedContainer
- func (r *RangeCluster) GetLeaderContainer(res *core.CachedResource) *core.CachedContainer
- func (r *RangeCluster) GetResourceContainers(res *core.CachedResource) []*core.CachedContainer
- func (r *RangeCluster) GetTolerantSizeRatio() float64
- func (r *RangeCluster) RandFollowerResource(groupKey string, containerID uint64, ranges []core.KeyRange, ...) *core.CachedResource
- func (r *RangeCluster) RandLeaderResource(groupKey string, containerID uint64, ranges []core.KeyRange, ...) *core.CachedResource
- func (r *RangeCluster) SetTolerantSizeRatio(ratio float64)
- type ResourceScatterer
- func (r *ResourceScatterer) Put(peers map[uint64]metapb.Replica, leaderContainerID uint64, group string)
- func (r *ResourceScatterer) Scatter(res *core.CachedResource, group string) (*operator.Operator, error)
- func (r *ResourceScatterer) ScatterResources(resources map[uint64]*core.CachedResource, failures map[uint64]error, ...) ([]*operator.Operator, error)
- func (r *ResourceScatterer) ScatterResourcesByID(resourceIDs []uint64, group string, retryLimit int) ([]*operator.Operator, map[uint64]error, error)
- func (r *ResourceScatterer) ScatterResourcesByRange(resGroup uint64, startKey, endKey []byte, group string, retryLimit int) ([]*operator.Operator, map[uint64]error, error)
- type ResourceSplitter
- type Scheduler
- type SplitResourcesHandler
- type WaitingOperator
- type WaitingOperatorStatus
Constants ¶
const ( DispatchFromHeartBeat = "heartbeat" DispatchFromNotifierQueue = "active push" DispatchFromCreate = "create" )
The source of dispatched resource.
const DefaultCacheSize = 1000
DefaultCacheSize is the default length of waiting list.
Variables ¶
var ( // PushOperatorTickInterval is the interval try to push the operator. PushOperatorTickInterval = 500 * time.Millisecond // ContainerBalanceBaseTime represents the base time of balance rate. ContainerBalanceBaseTime float64 = 60 )
var PriorityWeight = []float64{1.0, 4.0, 9.0}
PriorityWeight is used to represent the weight of different priorities of operators.
Functions ¶
func ApplyOperator ¶
func ApplyOperator(mc *mockcluster.Cluster, op *operator.Operator)
ApplyOperator applies operator. Only for test purpose.
func ApplyOperatorStep ¶
func ApplyOperatorStep(resource *core.CachedResource, op *operator.Operator) *core.CachedResource
ApplyOperatorStep applies operator step. Only for test purpose.
func DecodeConfig ¶
DecodeConfig decodes the custom config for each scheduler.
func EncodeConfig ¶
EncodeConfig encodes the custom config for each scheduler.
func FindSchedulerTypeByName ¶
FindSchedulerTypeByName finds the type of the specified name.
func NewTotalOpInfluence ¶
NewTotalOpInfluence creates an OpInfluence.
func RegisterScheduler ¶
func RegisterScheduler(typ string, createFn CreateSchedulerFunc)
RegisterScheduler binds a scheduler creator. It should be called in init() func of a package.
func RegisterSliceDecoderBuilder ¶
func RegisterSliceDecoderBuilder(typ string, builder ConfigSliceDecoderBuilder)
RegisterSliceDecoderBuilder registers a builder that converts arguments to a config. It should be called in the init() func of a package.
Types ¶
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket is used to maintain the operators created by a specific scheduler.
type CheckerController ¶
type CheckerController struct {
// contains filtered or unexported fields
}
CheckerController is used to manage all checkers.
func NewCheckerController ¶
func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, opController *OperatorController) *CheckerController
NewCheckerController creates a new CheckerController. TODO: isSupportMerge should be removed.
func (*CheckerController) AddWaitingResource ¶
func (c *CheckerController) AddWaitingResource(res *core.CachedResource)
AddWaitingResource adds the resource to the waiting list.
func (*CheckerController) CheckResource ¶
func (c *CheckerController) CheckResource(res *core.CachedResource) []*operator.Operator
CheckResource will check the resource and add a new operator if needed.
func (*CheckerController) FillReplicas ¶
func (c *CheckerController) FillReplicas(res *core.CachedResource, leastPeers int) error
FillReplicas fills replicas for an empty resource.
func (*CheckerController) GetMergeChecker ¶
func (c *CheckerController) GetMergeChecker() *checker.MergeChecker
GetMergeChecker returns the merge checker.
func (*CheckerController) GetWaitingResources ¶
func (c *CheckerController) GetWaitingResources() []*cache.Item
GetWaitingResources returns the resources in the waiting list.
func (*CheckerController) RemoveWaitingResource ¶
func (c *CheckerController) RemoveWaitingResource(id uint64)
RemoveWaitingResource removes the resource from the waiting list.
type ConfigDecoder ¶
type ConfigDecoder func(v interface{}) error
ConfigDecoder is used to decode the config.
func ConfigJSONDecoder ¶
func ConfigJSONDecoder(data []byte) ConfigDecoder
ConfigJSONDecoder is used to build a JSON decoder of the config.
func ConfigSliceDecoder ¶
func ConfigSliceDecoder(name string, args []string) ConfigDecoder
ConfigSliceDecoder is the default decoder for the config.
type ConfigSliceDecoderBuilder ¶
type ConfigSliceDecoderBuilder func([]string) ConfigDecoder
ConfigSliceDecoderBuilder is used to build a slice decoder of the config.
type CreateSchedulerFunc ¶
type CreateSchedulerFunc func(opController *OperatorController, storage storage.Storage, dec ConfigDecoder) (Scheduler, error)
CreateSchedulerFunc is for creating scheduler.
type OperatorController ¶
OperatorController is used to limit the speed of scheduling.
func NewOperatorController ¶
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams *hbstream.HeartbeatStreams) *OperatorController
NewOperatorController creates an OperatorController.
func (*OperatorController) AddOperator ¶
func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool
AddOperator adds operators to the running operators.
func (*OperatorController) AddWaitingOperator ¶
func (oc *OperatorController) AddWaitingOperator(ops ...*operator.Operator) int
AddWaitingOperator adds operators to waiting operators.
func (*OperatorController) CollectContainerLimitMetrics ¶
func (oc *OperatorController) CollectContainerLimitMetrics()
CollectContainerLimitMetrics collects the metrics about container limit
func (*OperatorController) Ctx ¶
func (oc *OperatorController) Ctx() context.Context
Ctx returns a context which will be canceled once RaftCluster is stopped. For now, it is only used to control the lifetime of TTL cache in schedulers.
func (*OperatorController) Dispatch ¶
func (oc *OperatorController) Dispatch(res *core.CachedResource, source string)
Dispatch is used to dispatch the operator of a resource.
func (*OperatorController) DispatchDestroyDirectly ¶ added in v0.2.0
func (oc *OperatorController) DispatchDestroyDirectly(res *core.CachedResource, source string)
DispatchDestroyDirectly sends a DestroyDirect cmd to the current container, because the resource has been removed.
func (*OperatorController) ExceedContainerLimit ¶
func (oc *OperatorController) ExceedContainerLimit(ops ...*operator.Operator) bool
ExceedContainerLimit returns true if the container exceeds the cost limit after adding the operator. Otherwise, returns false.
func (*OperatorController) GetCluster ¶
func (oc *OperatorController) GetCluster() opt.Cluster
GetCluster exports the cluster to evict-scheduler for checking container status.
func (*OperatorController) GetHistory ¶
func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory
GetHistory gets operators' history.
func (*OperatorController) GetLeaderSchedulePolicy ¶
func (oc *OperatorController) GetLeaderSchedulePolicy() core.SchedulePolicy
GetLeaderSchedulePolicy is to get leader schedule policy.
func (*OperatorController) GetOpInfluence ¶
func (oc *OperatorController) GetOpInfluence(cluster opt.Cluster) operator.OpInfluence
GetOpInfluence gets OpInfluence.
func (*OperatorController) GetOperator ¶
func (oc *OperatorController) GetOperator(resID uint64) *operator.Operator
GetOperator gets an operator from the given resource.
func (*OperatorController) GetOperatorStatus ¶
func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus
GetOperatorStatus gets the operator and its status with the specified id.
func (*OperatorController) GetOperators ¶
func (oc *OperatorController) GetOperators() []*operator.Operator
GetOperators gets operators from the running operators.
func (*OperatorController) GetWaitingOperators ¶
func (oc *OperatorController) GetWaitingOperators() []*operator.Operator
GetWaitingOperators gets operators from the waiting operators.
func (*OperatorController) OperatorCount ¶
func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64
OperatorCount gets the count of operators filtered by mask.
func (*OperatorController) PromoteWaitingOperator ¶
func (oc *OperatorController) PromoteWaitingOperator()
PromoteWaitingOperator promotes operators from waiting operators.
func (*OperatorController) PruneHistory ¶
func (oc *OperatorController) PruneHistory()
PruneHistory prunes a part of operators' history.
func (*OperatorController) PushOperators ¶
func (oc *OperatorController) PushOperators()
PushOperators periodically pushes the unfinished operator to the executor(your storage application).
func (*OperatorController) RemoveOperator ¶
func (oc *OperatorController) RemoveOperator(op *operator.Operator, extra string) bool
RemoveOperator removes an operator from the running operators.
func (*OperatorController) SendScheduleCommand ¶
func (oc *OperatorController) SendScheduleCommand(res *core.CachedResource, step operator.OpStep, source string)
SendScheduleCommand sends a command to the resource.
func (*OperatorController) SetOperator ¶
func (oc *OperatorController) SetOperator(op *operator.Operator)
SetOperator is only used for test.
type OperatorRecords ¶
type OperatorRecords struct {
// contains filtered or unexported fields
}
OperatorRecords retains the operator and its status for a while.
func NewOperatorRecords ¶
func NewOperatorRecords(ctx context.Context) *OperatorRecords
NewOperatorRecords returns an OperatorRecords.
func (*OperatorRecords) Get ¶
func (o *OperatorRecords) Get(id uint64) *OperatorWithStatus
Get gets the operator and its status.
func (*OperatorRecords) Put ¶
func (o *OperatorRecords) Put(op *operator.Operator)
Put puts the operator and its status.
type OperatorWithStatus ¶
type OperatorWithStatus struct { Op *operator.Operator Status metapb.OperatorStatus }
OperatorWithStatus records the operator and its status.
func NewOperatorWithStatus ¶
func NewOperatorWithStatus(op *operator.Operator) *OperatorWithStatus
NewOperatorWithStatus creates an OperatorWithStatus from an operator.
func (*OperatorWithStatus) MarshalJSON ¶
func (o *OperatorWithStatus) MarshalJSON() ([]byte, error)
MarshalJSON returns the status of operator as a JSON string
type PluginInterface ¶
type PluginInterface struct {
// contains filtered or unexported fields
}
PluginInterface is used to manage all plugins.
func NewPluginInterface ¶
func NewPluginInterface(logger *zap.Logger) *PluginInterface
NewPluginInterface creates a plugin interface.
func (*PluginInterface) GetFunction ¶
GetFunction gets func by funcName from plugin(.so)
type RandBuckets ¶
type RandBuckets struct {
// contains filtered or unexported fields
}
RandBuckets is an implementation of waiting operators
func (*RandBuckets) GetOperator ¶
func (b *RandBuckets) GetOperator() []*operator.Operator
GetOperator gets an operator from the random buckets.
func (*RandBuckets) ListOperator ¶
func (b *RandBuckets) ListOperator() []*operator.Operator
ListOperator lists all operators in the random buckets.
func (*RandBuckets) PutOperator ¶
func (b *RandBuckets) PutOperator(op *operator.Operator)
PutOperator puts an operator into the random buckets.
type RangeCluster ¶
RangeCluster isolates the cluster by range.
func GenRangeCluster ¶
func GenRangeCluster(group uint64, cluster opt.Cluster, startKey, endKey []byte) *RangeCluster
GenRangeCluster gets a range cluster by specifying start key and end key. The cluster can only know the resources within [startKey, endKey].
func (*RangeCluster) GetAverageResourceSize ¶
func (r *RangeCluster) GetAverageResourceSize() int64
GetAverageResourceSize returns the average resource approximate size.
func (*RangeCluster) GetContainer ¶
func (r *RangeCluster) GetContainer(id uint64) *core.CachedContainer
GetContainer searches for a container by ID.
func (*RangeCluster) GetContainers ¶
func (r *RangeCluster) GetContainers() []*core.CachedContainer
GetContainers returns all Containers in the cluster.
func (*RangeCluster) GetFollowerContainers ¶
func (r *RangeCluster) GetFollowerContainers(res *core.CachedResource) []*core.CachedContainer
GetFollowerContainers returns all containers that contain the resource's follower peer.
func (*RangeCluster) GetLeaderContainer ¶
func (r *RangeCluster) GetLeaderContainer(res *core.CachedResource) *core.CachedContainer
GetLeaderContainer returns the container that contains the resource's leader peer.
func (*RangeCluster) GetResourceContainers ¶
func (r *RangeCluster) GetResourceContainers(res *core.CachedResource) []*core.CachedContainer
GetResourceContainers returns all containers that contain the resource's peer.
func (*RangeCluster) GetTolerantSizeRatio ¶
func (r *RangeCluster) GetTolerantSizeRatio() float64
GetTolerantSizeRatio gets the tolerant size ratio.
func (*RangeCluster) RandFollowerResource ¶
func (r *RangeCluster) RandFollowerResource(groupKey string, containerID uint64, ranges []core.KeyRange, opts ...core.ResourceOption) *core.CachedResource
RandFollowerResource returns a random resource that has a follower on the Container.
func (*RangeCluster) RandLeaderResource ¶
func (r *RangeCluster) RandLeaderResource(groupKey string, containerID uint64, ranges []core.KeyRange, opts ...core.ResourceOption) *core.CachedResource
RandLeaderResource returns a random resource that has a leader on the container.
func (*RangeCluster) SetTolerantSizeRatio ¶
func (r *RangeCluster) SetTolerantSizeRatio(ratio float64)
SetTolerantSizeRatio sets the tolerant size ratio.
type ResourceScatterer ¶
type ResourceScatterer struct {
// contains filtered or unexported fields
}
ResourceScatterer scatters resources.
func NewResourceScatterer ¶
func NewResourceScatterer(ctx context.Context, cluster opt.Cluster) *ResourceScatterer
NewResourceScatterer creates a resource scatterer. ResourceScatterer is used for `Lightning`; it scatters the specified resources before importing data.
func (*ResourceScatterer) Put ¶
func (r *ResourceScatterer) Put(peers map[uint64]metapb.Replica, leaderContainerID uint64, group string)
Put puts the final distribution in the context regardless of whether the operator was created.
func (*ResourceScatterer) Scatter ¶
func (r *ResourceScatterer) Scatter(res *core.CachedResource, group string) (*operator.Operator, error)
Scatter relocates the resource. If the group is defined, the leaders of resources in the same group are scattered at the group level instead of the cluster level.
func (*ResourceScatterer) ScatterResources ¶
func (r *ResourceScatterer) ScatterResources(resources map[uint64]*core.CachedResource, failures map[uint64]error, group string, retryLimit int) ([]*operator.Operator, error)
ScatterResources relocates the resources. If the group is defined, the leaders of resources in the same group are scattered at the group level instead of the cluster level. retryLimit indicates the number of retries if any of the resources fail to relocate during scattering. There will be a time.Sleep between each retry. failures records the resources that failed to be relocated: each key is the resID and each value is the corresponding failure error.
type ResourceSplitter ¶
type ResourceSplitter struct {
// contains filtered or unexported fields
}
ResourceSplitter handles splitting resources.
func NewResourceSplitter ¶
func NewResourceSplitter(cluster opt.Cluster, handler SplitResourcesHandler) *ResourceSplitter
NewResourceSplitter returns a resource splitter.
func (*ResourceSplitter) SplitResources ¶
func (r *ResourceSplitter) SplitResources(ctx context.Context, group uint64, splitKeys [][]byte, retryLimit int) (int, []uint64)
SplitResources supports splitting resources by the given split keys.
type Scheduler ¶
type Scheduler interface { http.Handler GetName() string // GetType should in accordance with the name passing to schedule.RegisterScheduler() GetType() string EncodeConfig() ([]byte, error) GetMinInterval() time.Duration GetNextInterval(interval time.Duration) time.Duration Prepare(cluster opt.Cluster) error Cleanup(cluster opt.Cluster) Schedule(cluster opt.Cluster) []*operator.Operator IsScheduleAllowed(cluster opt.Cluster) bool }
Scheduler is an interface to schedule resources.
func CreateScheduler ¶
func CreateScheduler(typ string, opController *OperatorController, storage storage.Storage, dec ConfigDecoder) (Scheduler, error)
CreateScheduler creates a scheduler with registered creator func.
type SplitResourcesHandler ¶
type SplitResourcesHandler interface { SplitResourceByKeys(res *core.CachedResource, splitKeys [][]byte) error ScanResourcesByKeyRange(group uint64, groupKeys *resourceGroupKeys, results *splitKeyResults) }
SplitResourcesHandler is used to handle resource splitting.
func NewSplitResourcesHandler ¶
func NewSplitResourcesHandler(cluster opt.Cluster, oc *OperatorController) SplitResourcesHandler
NewSplitResourcesHandler returns a SplitResourcesHandler.
type WaitingOperator ¶
type WaitingOperator interface { PutOperator(op *operator.Operator) GetOperator() []*operator.Operator ListOperator() []*operator.Operator }
WaitingOperator is an interface of waiting operators.
type WaitingOperatorStatus ¶
type WaitingOperatorStatus struct {
// contains filtered or unexported fields
}
WaitingOperatorStatus is used to limit the count of each kind of operators.
func NewWaitingOperatorStatus ¶
func NewWaitingOperatorStatus() *WaitingOperatorStatus
NewWaitingOperatorStatus creates a new WaitingOperatorStatus.