Documentation ¶
Index ¶
- Constants
- Variables
- func NewReplicaLeaderIndex(task *LeaderTask) replicaSegmentIndex
- func NewReplicaSegmentIndex(task *SegmentTask) replicaSegmentIndex
- func NewScheduler(ctx context.Context, meta *meta.Meta, distMgr *meta.DistributionManager, ...) *taskScheduler
- func SetPriority(priority Priority, tasks ...Task)
- func SetReason(reason string, tasks ...Task)
- func Wait(ctx context.Context, timeout time.Duration, tasks ...Task) error
- type Action
- type ActionType
- type BaseAction
- type ChannelAction
- type ChannelTask
- func (task ChannelTask) Actions() []Action
- func (task ChannelTask) Cancel(err error)
- func (task *ChannelTask) Channel() string
- func (task ChannelTask) CollectionID() typeutil.UniqueID
- func (task ChannelTask) Context() context.Context
- func (task ChannelTask) Err() error
- func (task ChannelTask) Fail(err error)
- func (task ChannelTask) GetReason() string
- func (task ChannelTask) GetTaskLatency() int64
- func (task ChannelTask) ID() typeutil.UniqueID
- func (task *ChannelTask) Index() string
- func (task ChannelTask) IsFinished(distMgr *meta.DistributionManager) bool
- func (task ChannelTask) LoadType() querypb.LoadType
- func (task *ChannelTask) MarshalJSON() ([]byte, error)
- func (task *ChannelTask) Name() string
- func (task ChannelTask) Priority() Priority
- func (task ChannelTask) RecordStartTs()
- func (task ChannelTask) ReplicaID() typeutil.UniqueID
- func (task ChannelTask) ResourceGroup() string
- func (task ChannelTask) SetID(id typeutil.UniqueID)
- func (task ChannelTask) SetPriority(priority Priority)
- func (task ChannelTask) SetReason(reason string)
- func (task ChannelTask) SetStatus(status Status)
- func (task ChannelTask) Shard() string
- func (task ChannelTask) Source() Source
- func (task ChannelTask) Status() Status
- func (task ChannelTask) Step() int
- func (task ChannelTask) StepUp() int
- func (task *ChannelTask) String() string
- func (task ChannelTask) Wait() error
- type Executor
- type LeaderAction
- func (action *LeaderAction) Desc() string
- func (action *LeaderAction) GetLeaderID() typeutil.UniqueID
- func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool
- func (action *LeaderAction) PartStats() map[int64]int64
- func (action *LeaderAction) SegmentID() typeutil.UniqueID
- func (action *LeaderAction) String() string
- func (action *LeaderAction) Version() typeutil.UniqueID
- type LeaderTask
- func (task LeaderTask) Actions() []Action
- func (task LeaderTask) Cancel(err error)
- func (task LeaderTask) CollectionID() typeutil.UniqueID
- func (task LeaderTask) Context() context.Context
- func (task LeaderTask) Err() error
- func (task LeaderTask) Fail(err error)
- func (task LeaderTask) GetReason() string
- func (task LeaderTask) GetTaskLatency() int64
- func (task LeaderTask) ID() typeutil.UniqueID
- func (task *LeaderTask) Index() string
- func (task LeaderTask) IsFinished(distMgr *meta.DistributionManager) bool
- func (task LeaderTask) LoadType() querypb.LoadType
- func (task *LeaderTask) MarshalJSON() ([]byte, error)
- func (task *LeaderTask) Name() string
- func (task LeaderTask) Priority() Priority
- func (task LeaderTask) RecordStartTs()
- func (task LeaderTask) ReplicaID() typeutil.UniqueID
- func (task LeaderTask) ResourceGroup() string
- func (task *LeaderTask) SegmentID() typeutil.UniqueID
- func (task LeaderTask) SetID(id typeutil.UniqueID)
- func (task LeaderTask) SetPriority(priority Priority)
- func (task LeaderTask) SetReason(reason string)
- func (task LeaderTask) SetStatus(status Status)
- func (task LeaderTask) Shard() string
- func (task LeaderTask) Source() Source
- func (task LeaderTask) Status() Status
- func (task LeaderTask) Step() int
- func (task LeaderTask) StepUp() int
- func (task *LeaderTask) String() string
- func (task LeaderTask) Wait() error
- type MockScheduler
- func (_m *MockScheduler) Add(task Task) error
- func (_m *MockScheduler) AddExecutor(nodeID int64)
- func (_m *MockScheduler) Dispatch(node int64)
- func (_m *MockScheduler) EXPECT() *MockScheduler_Expecter
- func (_m *MockScheduler) GetChannelTaskDelta(nodeID int64, collectionID int64) int
- func (_m *MockScheduler) GetChannelTaskNum(filters ...TaskFilter) int
- func (_m *MockScheduler) GetExecutedFlag(nodeID int64) <-chan struct{}
- func (_m *MockScheduler) GetSegmentTaskDelta(nodeID int64, collectionID int64) int
- func (_m *MockScheduler) GetSegmentTaskNum(filters ...TaskFilter) int
- func (_m *MockScheduler) GetTasksJSON() string
- func (_m *MockScheduler) RemoveByNode(node int64)
- func (_m *MockScheduler) RemoveExecutor(nodeID int64)
- func (_m *MockScheduler) Start()
- func (_m *MockScheduler) Stop()
- type MockScheduler_AddExecutor_Call
- type MockScheduler_Add_Call
- type MockScheduler_Dispatch_Call
- type MockScheduler_Expecter
- func (_e *MockScheduler_Expecter) Add(task interface{}) *MockScheduler_Add_Call
- func (_e *MockScheduler_Expecter) AddExecutor(nodeID interface{}) *MockScheduler_AddExecutor_Call
- func (_e *MockScheduler_Expecter) Dispatch(node interface{}) *MockScheduler_Dispatch_Call
- func (_e *MockScheduler_Expecter) GetChannelTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetChannelTaskDelta_Call
- func (_e *MockScheduler_Expecter) GetChannelTaskNum(filters ...interface{}) *MockScheduler_GetChannelTaskNum_Call
- func (_e *MockScheduler_Expecter) GetExecutedFlag(nodeID interface{}) *MockScheduler_GetExecutedFlag_Call
- func (_e *MockScheduler_Expecter) GetSegmentTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetSegmentTaskDelta_Call
- func (_e *MockScheduler_Expecter) GetSegmentTaskNum(filters ...interface{}) *MockScheduler_GetSegmentTaskNum_Call
- func (_e *MockScheduler_Expecter) GetTasksJSON() *MockScheduler_GetTasksJSON_Call
- func (_e *MockScheduler_Expecter) RemoveByNode(node interface{}) *MockScheduler_RemoveByNode_Call
- func (_e *MockScheduler_Expecter) RemoveExecutor(nodeID interface{}) *MockScheduler_RemoveExecutor_Call
- func (_e *MockScheduler_Expecter) Start() *MockScheduler_Start_Call
- func (_e *MockScheduler_Expecter) Stop() *MockScheduler_Stop_Call
- type MockScheduler_GetChannelTaskDelta_Call
- func (_c *MockScheduler_GetChannelTaskDelta_Call) Return(_a0 int) *MockScheduler_GetChannelTaskDelta_Call
- func (_c *MockScheduler_GetChannelTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetChannelTaskDelta_Call
- func (_c *MockScheduler_GetChannelTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetChannelTaskDelta_Call
- type MockScheduler_GetChannelTaskNum_Call
- func (_c *MockScheduler_GetChannelTaskNum_Call) Return(_a0 int) *MockScheduler_GetChannelTaskNum_Call
- func (_c *MockScheduler_GetChannelTaskNum_Call) Run(run func(filters ...TaskFilter)) *MockScheduler_GetChannelTaskNum_Call
- func (_c *MockScheduler_GetChannelTaskNum_Call) RunAndReturn(run func(...TaskFilter) int) *MockScheduler_GetChannelTaskNum_Call
- type MockScheduler_GetExecutedFlag_Call
- func (_c *MockScheduler_GetExecutedFlag_Call) Return(_a0 <-chan struct{}) *MockScheduler_GetExecutedFlag_Call
- func (_c *MockScheduler_GetExecutedFlag_Call) Run(run func(nodeID int64)) *MockScheduler_GetExecutedFlag_Call
- func (_c *MockScheduler_GetExecutedFlag_Call) RunAndReturn(run func(int64) <-chan struct{}) *MockScheduler_GetExecutedFlag_Call
- type MockScheduler_GetSegmentTaskDelta_Call
- func (_c *MockScheduler_GetSegmentTaskDelta_Call) Return(_a0 int) *MockScheduler_GetSegmentTaskDelta_Call
- func (_c *MockScheduler_GetSegmentTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetSegmentTaskDelta_Call
- func (_c *MockScheduler_GetSegmentTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetSegmentTaskDelta_Call
- type MockScheduler_GetSegmentTaskNum_Call
- func (_c *MockScheduler_GetSegmentTaskNum_Call) Return(_a0 int) *MockScheduler_GetSegmentTaskNum_Call
- func (_c *MockScheduler_GetSegmentTaskNum_Call) Run(run func(filters ...TaskFilter)) *MockScheduler_GetSegmentTaskNum_Call
- func (_c *MockScheduler_GetSegmentTaskNum_Call) RunAndReturn(run func(...TaskFilter) int) *MockScheduler_GetSegmentTaskNum_Call
- type MockScheduler_GetTasksJSON_Call
- type MockScheduler_RemoveByNode_Call
- type MockScheduler_RemoveExecutor_Call
- func (_c *MockScheduler_RemoveExecutor_Call) Return() *MockScheduler_RemoveExecutor_Call
- func (_c *MockScheduler_RemoveExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_RemoveExecutor_Call
- func (_c *MockScheduler_RemoveExecutor_Call) RunAndReturn(run func(int64)) *MockScheduler_RemoveExecutor_Call
- type MockScheduler_Start_Call
- type MockScheduler_Stop_Call
- type Priority
- type Scheduler
- type SegmentAction
- type SegmentTask
- func (task SegmentTask) Actions() []Action
- func (task SegmentTask) Cancel(err error)
- func (task SegmentTask) CollectionID() typeutil.UniqueID
- func (task SegmentTask) Context() context.Context
- func (task SegmentTask) Err() error
- func (task SegmentTask) Fail(err error)
- func (task SegmentTask) GetReason() string
- func (task SegmentTask) GetTaskLatency() int64
- func (task SegmentTask) ID() typeutil.UniqueID
- func (task *SegmentTask) Index() string
- func (task SegmentTask) IsFinished(distMgr *meta.DistributionManager) bool
- func (task SegmentTask) LoadType() querypb.LoadType
- func (task *SegmentTask) MarshalJSON() ([]byte, error)
- func (task *SegmentTask) Name() string
- func (task SegmentTask) Priority() Priority
- func (task SegmentTask) RecordStartTs()
- func (task SegmentTask) ReplicaID() typeutil.UniqueID
- func (task SegmentTask) ResourceGroup() string
- func (task *SegmentTask) SegmentID() typeutil.UniqueID
- func (task SegmentTask) SetID(id typeutil.UniqueID)
- func (task SegmentTask) SetPriority(priority Priority)
- func (task SegmentTask) SetReason(reason string)
- func (task SegmentTask) SetStatus(status Status)
- func (task SegmentTask) Shard() string
- func (task SegmentTask) Source() Source
- func (task SegmentTask) Status() Status
- func (task SegmentTask) Step() int
- func (task SegmentTask) StepUp() int
- func (task *SegmentTask) String() string
- func (task SegmentTask) Wait() error
- type Source
- type Status
- type Task
- type TaskFilter
- type Type
Constants ¶
const ( TaskStatusCreated = "created" TaskStatusStarted = "started" TaskStatusSucceeded = "succeeded" TaskStatusCanceled = "canceled" TaskStatusFailed = "failed" )
Variables ¶
var ActionTypeName = map[ActionType]string{ ActionTypeGrow: "Grow", ActionTypeReduce: "Reduce", ActionTypeUpdate: "Update", }
var TaskPriorities = []Priority{TaskPriorityLow, TaskPriorityNormal, TaskPriorityHigh}
All task priorities from low to high
var TaskPriorityName = map[Priority]string{ TaskPriorityLow: "Low", TaskPriorityNormal: "Normal", TaskPriorityHigh: "High", }
var TaskTypeName = map[Type]string{ TaskTypeGrow: "Grow", TaskTypeReduce: "Reduce", TaskTypeMove: "Move", TaskTypeUpdate: "Update", }
Functions ¶
func NewReplicaLeaderIndex ¶
func NewReplicaLeaderIndex(task *LeaderTask) replicaSegmentIndex
func NewReplicaSegmentIndex ¶
func NewReplicaSegmentIndex(task *SegmentTask) replicaSegmentIndex
func NewScheduler ¶
func NewScheduler(ctx context.Context, meta *meta.Meta, distMgr *meta.DistributionManager, targetMgr meta.TargetManagerInterface, broker meta.Broker, cluster session.Cluster, nodeMgr *session.NodeManager, ) *taskScheduler
func SetPriority ¶
Types ¶
type Action ¶
type Action interface { Node() int64 Type() ActionType IsFinished(distMgr *meta.DistributionManager) bool Desc() string String() string }
type ActionType ¶
type ActionType int32
const ( ActionTypeGrow ActionType = iota + 1 ActionTypeReduce ActionTypeUpdate )
func (ActionType) String ¶
func (t ActionType) String() string
type BaseAction ¶
type BaseAction struct { NodeID typeutil.UniqueID Typ ActionType Shard string }
func NewBaseAction ¶
func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string) *BaseAction
func (*BaseAction) GetShard ¶
func (action *BaseAction) GetShard() string
func (*BaseAction) Node ¶
func (action *BaseAction) Node() int64
func (*BaseAction) String ¶
func (action *BaseAction) String() string
func (*BaseAction) Type ¶
func (action *BaseAction) Type() ActionType
type ChannelAction ¶
type ChannelAction struct {
*BaseAction
}
func NewChannelAction ¶
func NewChannelAction(nodeID typeutil.UniqueID, typ ActionType, channelName string) *ChannelAction
func (*ChannelAction) ChannelName ¶
func (action *ChannelAction) ChannelName() string
func (*ChannelAction) Desc ¶
func (action *ChannelAction) Desc() string
func (*ChannelAction) IsFinished ¶
func (action *ChannelAction) IsFinished(distMgr *meta.DistributionManager) bool
type ChannelTask ¶
type ChannelTask struct {
// contains filtered or unexported fields
}
func NewChannelTask ¶
func NewChannelTask(ctx context.Context, timeout time.Duration, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, actions ...Action, ) (*ChannelTask, error)
NewChannelTask creates a ChannelTask with actions, all actions must process the same channel, and the same type of channel empty actions is not allowed
func (*ChannelTask) Channel ¶
func (task *ChannelTask) Channel() string
func (ChannelTask) CollectionID ¶
func (ChannelTask) GetTaskLatency ¶
func (task ChannelTask) GetTaskLatency() int64
func (*ChannelTask) Index ¶
func (task *ChannelTask) Index() string
func (ChannelTask) IsFinished ¶
func (task ChannelTask) IsFinished(distMgr *meta.DistributionManager) bool
func (*ChannelTask) MarshalJSON ¶
func (task *ChannelTask) MarshalJSON() ([]byte, error)
func (*ChannelTask) Name ¶
func (task *ChannelTask) Name() string
func (ChannelTask) RecordStartTs ¶
func (task ChannelTask) RecordStartTs()
func (ChannelTask) ResourceGroup ¶
func (task ChannelTask) ResourceGroup() string
func (ChannelTask) SetPriority ¶
func (task ChannelTask) SetPriority(priority Priority)
func (*ChannelTask) String ¶
func (task *ChannelTask) String() string
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(meta *meta.Meta, dist *meta.DistributionManager, broker meta.Broker, targetMgr meta.TargetManagerInterface, cluster session.Cluster, nodeMgr *session.NodeManager, ) *Executor
func (*Executor) Execute ¶
Execute executes the given action, does nothing and returns false if the action is already committed, returns true otherwise.
func (*Executor) GetExecutedFlag ¶
func (ex *Executor) GetExecutedFlag() <-chan struct{}
type LeaderAction ¶
type LeaderAction struct { *BaseAction // contains filtered or unexported fields }
func NewLeaderAction ¶
func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, version typeutil.UniqueID) *LeaderAction
func NewLeaderUpdatePartStatsAction ¶
func NewLeaderUpdatePartStatsAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, partStatsVersions map[int64]int64) *LeaderAction
func (*LeaderAction) Desc ¶
func (action *LeaderAction) Desc() string
func (*LeaderAction) GetLeaderID ¶
func (action *LeaderAction) GetLeaderID() typeutil.UniqueID
func (*LeaderAction) IsFinished ¶
func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool
func (*LeaderAction) PartStats ¶
func (action *LeaderAction) PartStats() map[int64]int64
func (*LeaderAction) SegmentID ¶
func (action *LeaderAction) SegmentID() typeutil.UniqueID
func (*LeaderAction) String ¶
func (action *LeaderAction) String() string
func (*LeaderAction) Version ¶
func (action *LeaderAction) Version() typeutil.UniqueID
type LeaderTask ¶
type LeaderTask struct {
// contains filtered or unexported fields
}
func NewLeaderPartStatsTask ¶
func NewLeaderPartStatsTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, leaderID int64, action *LeaderAction, ) *LeaderTask
func NewLeaderSegmentTask ¶
func NewLeaderSegmentTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, leaderID int64, action *LeaderAction, ) *LeaderTask
func (LeaderTask) CollectionID ¶
func (LeaderTask) GetTaskLatency ¶
func (task LeaderTask) GetTaskLatency() int64
func (*LeaderTask) Index ¶
func (task *LeaderTask) Index() string
func (LeaderTask) IsFinished ¶
func (task LeaderTask) IsFinished(distMgr *meta.DistributionManager) bool
func (*LeaderTask) MarshalJSON ¶
func (task *LeaderTask) MarshalJSON() ([]byte, error)
func (*LeaderTask) Name ¶
func (task *LeaderTask) Name() string
func (LeaderTask) RecordStartTs ¶
func (task LeaderTask) RecordStartTs()
func (LeaderTask) ResourceGroup ¶
func (task LeaderTask) ResourceGroup() string
func (*LeaderTask) SegmentID ¶
func (task *LeaderTask) SegmentID() typeutil.UniqueID
func (LeaderTask) SetPriority ¶
func (task LeaderTask) SetPriority(priority Priority)
func (*LeaderTask) String ¶
func (task *LeaderTask) String() string
type MockScheduler ¶
MockScheduler is an autogenerated mock type for the Scheduler type
func NewMockScheduler ¶
func NewMockScheduler(t interface { mock.TestingT Cleanup(func()) }) *MockScheduler
NewMockScheduler creates a new instance of MockScheduler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.
func (*MockScheduler) Add ¶
func (_m *MockScheduler) Add(task Task) error
Add provides a mock function with given fields: task
func (*MockScheduler) AddExecutor ¶
func (_m *MockScheduler) AddExecutor(nodeID int64)
AddExecutor provides a mock function with given fields: nodeID
func (*MockScheduler) Dispatch ¶
func (_m *MockScheduler) Dispatch(node int64)
Dispatch provides a mock function with given fields: node
func (*MockScheduler) EXPECT ¶
func (_m *MockScheduler) EXPECT() *MockScheduler_Expecter
func (*MockScheduler) GetChannelTaskDelta ¶
func (_m *MockScheduler) GetChannelTaskDelta(nodeID int64, collectionID int64) int
GetChannelTaskDelta provides a mock function with given fields: nodeID, collectionID
func (*MockScheduler) GetChannelTaskNum ¶
func (_m *MockScheduler) GetChannelTaskNum(filters ...TaskFilter) int
GetChannelTaskNum provides a mock function with given fields: filters
func (*MockScheduler) GetExecutedFlag ¶
func (_m *MockScheduler) GetExecutedFlag(nodeID int64) <-chan struct{}
GetExecutedFlag provides a mock function with given fields: nodeID
func (*MockScheduler) GetSegmentTaskDelta ¶
func (_m *MockScheduler) GetSegmentTaskDelta(nodeID int64, collectionID int64) int
GetSegmentTaskDelta provides a mock function with given fields: nodeID, collectionID
func (*MockScheduler) GetSegmentTaskNum ¶
func (_m *MockScheduler) GetSegmentTaskNum(filters ...TaskFilter) int
GetSegmentTaskNum provides a mock function with given fields: filters
func (*MockScheduler) GetTasksJSON ¶
func (_m *MockScheduler) GetTasksJSON() string
GetTasksJSON provides a mock function with given fields:
func (*MockScheduler) RemoveByNode ¶
func (_m *MockScheduler) RemoveByNode(node int64)
RemoveByNode provides a mock function with given fields: node
func (*MockScheduler) RemoveExecutor ¶
func (_m *MockScheduler) RemoveExecutor(nodeID int64)
RemoveExecutor provides a mock function with given fields: nodeID
func (*MockScheduler) Start ¶
func (_m *MockScheduler) Start()
Start provides a mock function with given fields:
func (*MockScheduler) Stop ¶
func (_m *MockScheduler) Stop()
Stop provides a mock function with given fields:
type MockScheduler_AddExecutor_Call ¶
MockScheduler_AddExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddExecutor'
func (*MockScheduler_AddExecutor_Call) Return ¶
func (_c *MockScheduler_AddExecutor_Call) Return() *MockScheduler_AddExecutor_Call
func (*MockScheduler_AddExecutor_Call) Run ¶
func (_c *MockScheduler_AddExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_AddExecutor_Call
func (*MockScheduler_AddExecutor_Call) RunAndReturn ¶
func (_c *MockScheduler_AddExecutor_Call) RunAndReturn(run func(int64)) *MockScheduler_AddExecutor_Call
type MockScheduler_Add_Call ¶
MockScheduler_Add_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Add'
func (*MockScheduler_Add_Call) Return ¶
func (_c *MockScheduler_Add_Call) Return(_a0 error) *MockScheduler_Add_Call
func (*MockScheduler_Add_Call) Run ¶
func (_c *MockScheduler_Add_Call) Run(run func(task Task)) *MockScheduler_Add_Call
func (*MockScheduler_Add_Call) RunAndReturn ¶
func (_c *MockScheduler_Add_Call) RunAndReturn(run func(Task) error) *MockScheduler_Add_Call
type MockScheduler_Dispatch_Call ¶
MockScheduler_Dispatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Dispatch'
func (*MockScheduler_Dispatch_Call) Return ¶
func (_c *MockScheduler_Dispatch_Call) Return() *MockScheduler_Dispatch_Call
func (*MockScheduler_Dispatch_Call) Run ¶
func (_c *MockScheduler_Dispatch_Call) Run(run func(node int64)) *MockScheduler_Dispatch_Call
func (*MockScheduler_Dispatch_Call) RunAndReturn ¶
func (_c *MockScheduler_Dispatch_Call) RunAndReturn(run func(int64)) *MockScheduler_Dispatch_Call
type MockScheduler_Expecter ¶
type MockScheduler_Expecter struct {
// contains filtered or unexported fields
}
func (*MockScheduler_Expecter) Add ¶
func (_e *MockScheduler_Expecter) Add(task interface{}) *MockScheduler_Add_Call
Add is a helper method to define mock.On call
- task Task
func (*MockScheduler_Expecter) AddExecutor ¶
func (_e *MockScheduler_Expecter) AddExecutor(nodeID interface{}) *MockScheduler_AddExecutor_Call
AddExecutor is a helper method to define mock.On call
- nodeID int64
func (*MockScheduler_Expecter) Dispatch ¶
func (_e *MockScheduler_Expecter) Dispatch(node interface{}) *MockScheduler_Dispatch_Call
Dispatch is a helper method to define mock.On call
- node int64
func (*MockScheduler_Expecter) GetChannelTaskDelta ¶
func (_e *MockScheduler_Expecter) GetChannelTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetChannelTaskDelta_Call
GetChannelTaskDelta is a helper method to define mock.On call
- nodeID int64
- collectionID int64
func (*MockScheduler_Expecter) GetChannelTaskNum ¶
func (_e *MockScheduler_Expecter) GetChannelTaskNum(filters ...interface{}) *MockScheduler_GetChannelTaskNum_Call
GetChannelTaskNum is a helper method to define mock.On call
- filters ...TaskFilter
func (*MockScheduler_Expecter) GetExecutedFlag ¶
func (_e *MockScheduler_Expecter) GetExecutedFlag(nodeID interface{}) *MockScheduler_GetExecutedFlag_Call
GetExecutedFlag is a helper method to define mock.On call
- nodeID int64
func (*MockScheduler_Expecter) GetSegmentTaskDelta ¶
func (_e *MockScheduler_Expecter) GetSegmentTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetSegmentTaskDelta_Call
GetSegmentTaskDelta is a helper method to define mock.On call
- nodeID int64
- collectionID int64
func (*MockScheduler_Expecter) GetSegmentTaskNum ¶
func (_e *MockScheduler_Expecter) GetSegmentTaskNum(filters ...interface{}) *MockScheduler_GetSegmentTaskNum_Call
GetSegmentTaskNum is a helper method to define mock.On call
- filters ...TaskFilter
func (*MockScheduler_Expecter) GetTasksJSON ¶
func (_e *MockScheduler_Expecter) GetTasksJSON() *MockScheduler_GetTasksJSON_Call
GetTasksJSON is a helper method to define mock.On call
func (*MockScheduler_Expecter) RemoveByNode ¶
func (_e *MockScheduler_Expecter) RemoveByNode(node interface{}) *MockScheduler_RemoveByNode_Call
RemoveByNode is a helper method to define mock.On call
- node int64
func (*MockScheduler_Expecter) RemoveExecutor ¶
func (_e *MockScheduler_Expecter) RemoveExecutor(nodeID interface{}) *MockScheduler_RemoveExecutor_Call
RemoveExecutor is a helper method to define mock.On call
- nodeID int64
func (*MockScheduler_Expecter) Start ¶
func (_e *MockScheduler_Expecter) Start() *MockScheduler_Start_Call
Start is a helper method to define mock.On call
func (*MockScheduler_Expecter) Stop ¶
func (_e *MockScheduler_Expecter) Stop() *MockScheduler_Stop_Call
Stop is a helper method to define mock.On call
type MockScheduler_GetChannelTaskDelta_Call ¶
MockScheduler_GetChannelTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelTaskDelta'
func (*MockScheduler_GetChannelTaskDelta_Call) Return ¶
func (_c *MockScheduler_GetChannelTaskDelta_Call) Return(_a0 int) *MockScheduler_GetChannelTaskDelta_Call
func (*MockScheduler_GetChannelTaskDelta_Call) Run ¶
func (_c *MockScheduler_GetChannelTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetChannelTaskDelta_Call
func (*MockScheduler_GetChannelTaskDelta_Call) RunAndReturn ¶
func (_c *MockScheduler_GetChannelTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetChannelTaskDelta_Call
type MockScheduler_GetChannelTaskNum_Call ¶
MockScheduler_GetChannelTaskNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelTaskNum'
func (*MockScheduler_GetChannelTaskNum_Call) Return ¶
func (_c *MockScheduler_GetChannelTaskNum_Call) Return(_a0 int) *MockScheduler_GetChannelTaskNum_Call
func (*MockScheduler_GetChannelTaskNum_Call) Run ¶
func (_c *MockScheduler_GetChannelTaskNum_Call) Run(run func(filters ...TaskFilter)) *MockScheduler_GetChannelTaskNum_Call
func (*MockScheduler_GetChannelTaskNum_Call) RunAndReturn ¶
func (_c *MockScheduler_GetChannelTaskNum_Call) RunAndReturn(run func(...TaskFilter) int) *MockScheduler_GetChannelTaskNum_Call
type MockScheduler_GetExecutedFlag_Call ¶
MockScheduler_GetExecutedFlag_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetExecutedFlag'
func (*MockScheduler_GetExecutedFlag_Call) Return ¶
func (_c *MockScheduler_GetExecutedFlag_Call) Return(_a0 <-chan struct{}) *MockScheduler_GetExecutedFlag_Call
func (*MockScheduler_GetExecutedFlag_Call) Run ¶
func (_c *MockScheduler_GetExecutedFlag_Call) Run(run func(nodeID int64)) *MockScheduler_GetExecutedFlag_Call
func (*MockScheduler_GetExecutedFlag_Call) RunAndReturn ¶
func (_c *MockScheduler_GetExecutedFlag_Call) RunAndReturn(run func(int64) <-chan struct{}) *MockScheduler_GetExecutedFlag_Call
type MockScheduler_GetSegmentTaskDelta_Call ¶
MockScheduler_GetSegmentTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentTaskDelta'
func (*MockScheduler_GetSegmentTaskDelta_Call) Return ¶
func (_c *MockScheduler_GetSegmentTaskDelta_Call) Return(_a0 int) *MockScheduler_GetSegmentTaskDelta_Call
func (*MockScheduler_GetSegmentTaskDelta_Call) Run ¶
func (_c *MockScheduler_GetSegmentTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetSegmentTaskDelta_Call
func (*MockScheduler_GetSegmentTaskDelta_Call) RunAndReturn ¶
func (_c *MockScheduler_GetSegmentTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetSegmentTaskDelta_Call
type MockScheduler_GetSegmentTaskNum_Call ¶
MockScheduler_GetSegmentTaskNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentTaskNum'
func (*MockScheduler_GetSegmentTaskNum_Call) Return ¶
func (_c *MockScheduler_GetSegmentTaskNum_Call) Return(_a0 int) *MockScheduler_GetSegmentTaskNum_Call
func (*MockScheduler_GetSegmentTaskNum_Call) Run ¶
func (_c *MockScheduler_GetSegmentTaskNum_Call) Run(run func(filters ...TaskFilter)) *MockScheduler_GetSegmentTaskNum_Call
func (*MockScheduler_GetSegmentTaskNum_Call) RunAndReturn ¶
func (_c *MockScheduler_GetSegmentTaskNum_Call) RunAndReturn(run func(...TaskFilter) int) *MockScheduler_GetSegmentTaskNum_Call
type MockScheduler_GetTasksJSON_Call ¶
MockScheduler_GetTasksJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTasksJSON'
func (*MockScheduler_GetTasksJSON_Call) Return ¶
func (_c *MockScheduler_GetTasksJSON_Call) Return(_a0 string) *MockScheduler_GetTasksJSON_Call
func (*MockScheduler_GetTasksJSON_Call) Run ¶
func (_c *MockScheduler_GetTasksJSON_Call) Run(run func()) *MockScheduler_GetTasksJSON_Call
func (*MockScheduler_GetTasksJSON_Call) RunAndReturn ¶
func (_c *MockScheduler_GetTasksJSON_Call) RunAndReturn(run func() string) *MockScheduler_GetTasksJSON_Call
type MockScheduler_RemoveByNode_Call ¶
MockScheduler_RemoveByNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveByNode'
func (*MockScheduler_RemoveByNode_Call) Return ¶
func (_c *MockScheduler_RemoveByNode_Call) Return() *MockScheduler_RemoveByNode_Call
func (*MockScheduler_RemoveByNode_Call) Run ¶
func (_c *MockScheduler_RemoveByNode_Call) Run(run func(node int64)) *MockScheduler_RemoveByNode_Call
func (*MockScheduler_RemoveByNode_Call) RunAndReturn ¶
func (_c *MockScheduler_RemoveByNode_Call) RunAndReturn(run func(int64)) *MockScheduler_RemoveByNode_Call
type MockScheduler_RemoveExecutor_Call ¶
MockScheduler_RemoveExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveExecutor'
func (*MockScheduler_RemoveExecutor_Call) Return ¶
func (_c *MockScheduler_RemoveExecutor_Call) Return() *MockScheduler_RemoveExecutor_Call
func (*MockScheduler_RemoveExecutor_Call) Run ¶
func (_c *MockScheduler_RemoveExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_RemoveExecutor_Call
func (*MockScheduler_RemoveExecutor_Call) RunAndReturn ¶
func (_c *MockScheduler_RemoveExecutor_Call) RunAndReturn(run func(int64)) *MockScheduler_RemoveExecutor_Call
type MockScheduler_Start_Call ¶
MockScheduler_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
func (*MockScheduler_Start_Call) Return ¶
func (_c *MockScheduler_Start_Call) Return() *MockScheduler_Start_Call
func (*MockScheduler_Start_Call) Run ¶
func (_c *MockScheduler_Start_Call) Run(run func()) *MockScheduler_Start_Call
func (*MockScheduler_Start_Call) RunAndReturn ¶
func (_c *MockScheduler_Start_Call) RunAndReturn(run func()) *MockScheduler_Start_Call
type MockScheduler_Stop_Call ¶
MockScheduler_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
func (*MockScheduler_Stop_Call) Return ¶
func (_c *MockScheduler_Stop_Call) Return() *MockScheduler_Stop_Call
func (*MockScheduler_Stop_Call) Run ¶
func (_c *MockScheduler_Stop_Call) Run(run func()) *MockScheduler_Stop_Call
func (*MockScheduler_Stop_Call) RunAndReturn ¶
func (_c *MockScheduler_Stop_Call) RunAndReturn(run func()) *MockScheduler_Stop_Call
type Scheduler ¶
type Scheduler interface { Start() Stop() AddExecutor(nodeID int64) RemoveExecutor(nodeID int64) Add(task Task) error Dispatch(node int64) RemoveByNode(node int64) GetExecutedFlag(nodeID int64) <-chan struct{} GetChannelTaskNum(filters ...TaskFilter) int GetSegmentTaskNum(filters ...TaskFilter) int GetTasksJSON() string GetSegmentTaskDelta(nodeID int64, collectionID int64) int GetChannelTaskDelta(nodeID int64, collectionID int64) int }
type SegmentAction ¶
type SegmentAction struct { *BaseAction SegmentID typeutil.UniqueID Scope querypb.DataScope // contains filtered or unexported fields }
func NewSegmentAction ¶
func NewSegmentAction(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID) *SegmentAction
func NewSegmentActionWithScope ¶
func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope) *SegmentAction
func (*SegmentAction) Desc ¶
func (action *SegmentAction) Desc() string
func (*SegmentAction) GetScope ¶
func (action *SegmentAction) GetScope() querypb.DataScope
func (*SegmentAction) GetSegmentID ¶
func (action *SegmentAction) GetSegmentID() typeutil.UniqueID
func (*SegmentAction) IsFinished ¶
func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool
func (*SegmentAction) String ¶
func (action *SegmentAction) String() string
type SegmentTask ¶
type SegmentTask struct {
// contains filtered or unexported fields
}
func NewSegmentTask ¶
func NewSegmentTask(ctx context.Context, timeout time.Duration, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, actions ...Action, ) (*SegmentTask, error)
NewSegmentTask creates a SegmentTask with actions, all actions must process the same segment, empty actions is not allowed
func (SegmentTask) CollectionID ¶
func (SegmentTask) GetTaskLatency ¶
func (task SegmentTask) GetTaskLatency() int64
func (*SegmentTask) Index ¶
func (task *SegmentTask) Index() string
func (SegmentTask) IsFinished ¶
func (task SegmentTask) IsFinished(distMgr *meta.DistributionManager) bool
func (*SegmentTask) MarshalJSON ¶
func (task *SegmentTask) MarshalJSON() ([]byte, error)
func (*SegmentTask) Name ¶
func (task *SegmentTask) Name() string
func (SegmentTask) RecordStartTs ¶
func (task SegmentTask) RecordStartTs()
func (SegmentTask) ResourceGroup ¶
func (task SegmentTask) ResourceGroup() string
func (*SegmentTask) SegmentID ¶
func (task *SegmentTask) SegmentID() typeutil.UniqueID
func (SegmentTask) SetPriority ¶
func (task SegmentTask) SetPriority(priority Priority)
func (*SegmentTask) String ¶
func (task *SegmentTask) String() string
type Task ¶
type Task interface { Context() context.Context Source() Source ID() typeutil.UniqueID CollectionID() typeutil.UniqueID // Return 0 if the task is a reduce task without given replica. ReplicaID() typeutil.UniqueID // Return "" if the task is a reduce task without given replica. ResourceGroup() string Shard() string SetID(id typeutil.UniqueID) Status() Status SetStatus(status Status) Err() error Priority() Priority SetPriority(priority Priority) Index() string // dedup indexing string // cancel the task as we don't need to continue it Cancel(err error) // fail the task as we encounter some error so be unable to continue, // this error will be recorded for response to user requests Fail(err error) Wait() error Actions() []Action Step() int StepUp() int IsFinished(dist *meta.DistributionManager) bool SetReason(reason string) String() string // MarshalJSON marshal task info to json MarshalJSON() ([]byte, error) Name() string GetReason() string RecordStartTs() GetTaskLatency() int64 }
type TaskFilter ¶
func WithCollectionID2TaskFilter ¶
func WithCollectionID2TaskFilter(collectionID int64) TaskFilter
func WithTaskTypeFilter ¶
func WithTaskTypeFilter(taskType Type) TaskFilter
type Type ¶
type Type int32
func GetTaskType ¶
GetTaskType returns the task's type, for now, only 3 types; - only 1 grow action -> Grow - only 1 reduce action -> Reduce - 1 grow action, and ends with 1 reduce action -> Move