task

package
v0.10.3-0...-74752f2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2024 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskStatusCreated   = "created"
	TaskStatusStarted   = "started"
	TaskStatusSucceeded = "succeeded"
	TaskStatusCanceled  = "canceled"
	TaskStatusFailed    = "failed"
)

Variables

View Source
var ActionTypeName = map[ActionType]string{
	ActionTypeGrow:   "Grow",
	ActionTypeReduce: "Reduce",
	ActionTypeUpdate: "Update",
}

All task priorities from low to high

View Source
var TaskPriorityName = map[Priority]string{
	TaskPriorityLow:    "Low",
	TaskPriorityNormal: "Normal",
	TaskPriorityHigh:   "High",
}
View Source
var TaskTypeName = map[Type]string{
	TaskTypeGrow:   "Grow",
	TaskTypeReduce: "Reduce",
	TaskTypeMove:   "Move",
	TaskTypeUpdate: "Update",
}

Functions

func NewReplicaLeaderIndex

func NewReplicaLeaderIndex(task *LeaderTask) replicaSegmentIndex

func NewReplicaSegmentIndex

func NewReplicaSegmentIndex(task *SegmentTask) replicaSegmentIndex

func NewScheduler

func NewScheduler(ctx context.Context,
	meta *meta.Meta,
	distMgr *meta.DistributionManager,
	targetMgr meta.TargetManagerInterface,
	broker meta.Broker,
	cluster session.Cluster,
	nodeMgr *session.NodeManager,
) *taskScheduler

func SetPriority

func SetPriority(priority Priority, tasks ...Task)

func SetReason

func SetReason(reason string, tasks ...Task)

func Wait

func Wait(ctx context.Context, timeout time.Duration, tasks ...Task) error

Types

type Action

type Action interface {
	Node() int64
	Type() ActionType
	IsFinished(distMgr *meta.DistributionManager) bool
	Desc() string
	String() string
}

type ActionType

type ActionType int32
const (
	ActionTypeGrow ActionType = iota + 1
	ActionTypeReduce
	ActionTypeUpdate
)

func (ActionType) String

func (t ActionType) String() string

type BaseAction

type BaseAction struct {
	NodeID typeutil.UniqueID
	Typ    ActionType
	Shard  string
}

func NewBaseAction

func NewBaseAction(nodeID typeutil.UniqueID, typ ActionType, shard string) *BaseAction

func (*BaseAction) GetShard

func (action *BaseAction) GetShard() string

func (*BaseAction) Node

func (action *BaseAction) Node() int64

func (*BaseAction) String

func (action *BaseAction) String() string

func (*BaseAction) Type

func (action *BaseAction) Type() ActionType

type ChannelAction

type ChannelAction struct {
	*BaseAction
}

func NewChannelAction

func NewChannelAction(nodeID typeutil.UniqueID, typ ActionType, channelName string) *ChannelAction

func (*ChannelAction) ChannelName

func (action *ChannelAction) ChannelName() string

func (*ChannelAction) Desc

func (action *ChannelAction) Desc() string

func (*ChannelAction) IsFinished

func (action *ChannelAction) IsFinished(distMgr *meta.DistributionManager) bool

type ChannelTask

type ChannelTask struct {
	// contains filtered or unexported fields
}

func NewChannelTask

func NewChannelTask(ctx context.Context,
	timeout time.Duration,
	source Source,
	collectionID typeutil.UniqueID,
	replica *meta.Replica,
	actions ...Action,
) (*ChannelTask, error)

NewChannelTask creates a ChannelTask with actions, all actions must process the same channel, and the same type of channel empty actions is not allowed

func (ChannelTask) Actions

func (task ChannelTask) Actions() []Action

func (ChannelTask) Cancel

func (task ChannelTask) Cancel(err error)

func (*ChannelTask) Channel

func (task *ChannelTask) Channel() string

func (ChannelTask) CollectionID

func (task ChannelTask) CollectionID() typeutil.UniqueID

func (ChannelTask) Context

func (task ChannelTask) Context() context.Context

func (ChannelTask) Err

func (task ChannelTask) Err() error

func (ChannelTask) Fail

func (task ChannelTask) Fail(err error)

func (ChannelTask) GetReason

func (task ChannelTask) GetReason() string

func (ChannelTask) GetTaskLatency

func (task ChannelTask) GetTaskLatency() int64

func (ChannelTask) ID

func (task ChannelTask) ID() typeutil.UniqueID

func (*ChannelTask) Index

func (task *ChannelTask) Index() string

func (ChannelTask) IsFinished

func (task ChannelTask) IsFinished(distMgr *meta.DistributionManager) bool

func (ChannelTask) LoadType

func (task ChannelTask) LoadType() querypb.LoadType

func (*ChannelTask) MarshalJSON

func (task *ChannelTask) MarshalJSON() ([]byte, error)

func (*ChannelTask) Name

func (task *ChannelTask) Name() string

func (ChannelTask) Priority

func (task ChannelTask) Priority() Priority

func (ChannelTask) RecordStartTs

func (task ChannelTask) RecordStartTs()

func (ChannelTask) ReplicaID

func (task ChannelTask) ReplicaID() typeutil.UniqueID

func (ChannelTask) ResourceGroup

func (task ChannelTask) ResourceGroup() string

func (ChannelTask) SetID

func (task ChannelTask) SetID(id typeutil.UniqueID)

func (ChannelTask) SetPriority

func (task ChannelTask) SetPriority(priority Priority)

func (ChannelTask) SetReason

func (task ChannelTask) SetReason(reason string)

func (ChannelTask) SetStatus

func (task ChannelTask) SetStatus(status Status)

func (ChannelTask) Shard

func (task ChannelTask) Shard() string

func (ChannelTask) Source

func (task ChannelTask) Source() Source

func (ChannelTask) Status

func (task ChannelTask) Status() Status

func (ChannelTask) Step

func (task ChannelTask) Step() int

func (ChannelTask) StepUp

func (task ChannelTask) StepUp() int

func (*ChannelTask) String

func (task *ChannelTask) String() string

func (ChannelTask) Wait

func (task ChannelTask) Wait() error

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(meta *meta.Meta,
	dist *meta.DistributionManager,
	broker meta.Broker,
	targetMgr meta.TargetManagerInterface,
	cluster session.Cluster,
	nodeMgr *session.NodeManager,
) *Executor

func (*Executor) Execute

func (ex *Executor) Execute(task Task, step int) bool

Execute executes the given action, does nothing and returns false if the action is already committed, returns true otherwise.

func (*Executor) GetExecutedFlag

func (ex *Executor) GetExecutedFlag() <-chan struct{}

func (*Executor) Start

func (ex *Executor) Start(ctx context.Context)

func (*Executor) Stop

func (ex *Executor) Stop()

type LeaderAction

type LeaderAction struct {
	*BaseAction
	// contains filtered or unexported fields
}

func NewLeaderAction

func NewLeaderAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, version typeutil.UniqueID) *LeaderAction

func NewLeaderUpdatePartStatsAction

func NewLeaderUpdatePartStatsAction(leaderID, workerID typeutil.UniqueID, typ ActionType, shard string, partStatsVersions map[int64]int64) *LeaderAction

func (*LeaderAction) Desc

func (action *LeaderAction) Desc() string

func (*LeaderAction) GetLeaderID

func (action *LeaderAction) GetLeaderID() typeutil.UniqueID

func (*LeaderAction) IsFinished

func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool

func (*LeaderAction) PartStats

func (action *LeaderAction) PartStats() map[int64]int64

func (*LeaderAction) SegmentID

func (action *LeaderAction) SegmentID() typeutil.UniqueID

func (*LeaderAction) String

func (action *LeaderAction) String() string

func (*LeaderAction) Version

func (action *LeaderAction) Version() typeutil.UniqueID

type LeaderTask

type LeaderTask struct {
	// contains filtered or unexported fields
}

func NewLeaderPartStatsTask

func NewLeaderPartStatsTask(ctx context.Context,
	source Source,
	collectionID typeutil.UniqueID,
	replica *meta.Replica,
	leaderID int64,
	action *LeaderAction,
) *LeaderTask

func NewLeaderSegmentTask

func NewLeaderSegmentTask(ctx context.Context,
	source Source,
	collectionID typeutil.UniqueID,
	replica *meta.Replica,
	leaderID int64,
	action *LeaderAction,
) *LeaderTask

func (LeaderTask) Actions

func (task LeaderTask) Actions() []Action

func (LeaderTask) Cancel

func (task LeaderTask) Cancel(err error)

func (LeaderTask) CollectionID

func (task LeaderTask) CollectionID() typeutil.UniqueID

func (LeaderTask) Context

func (task LeaderTask) Context() context.Context

func (LeaderTask) Err

func (task LeaderTask) Err() error

func (LeaderTask) Fail

func (task LeaderTask) Fail(err error)

func (LeaderTask) GetReason

func (task LeaderTask) GetReason() string

func (LeaderTask) GetTaskLatency

func (task LeaderTask) GetTaskLatency() int64

func (LeaderTask) ID

func (task LeaderTask) ID() typeutil.UniqueID

func (*LeaderTask) Index

func (task *LeaderTask) Index() string

func (LeaderTask) IsFinished

func (task LeaderTask) IsFinished(distMgr *meta.DistributionManager) bool

func (LeaderTask) LoadType

func (task LeaderTask) LoadType() querypb.LoadType

func (*LeaderTask) MarshalJSON

func (task *LeaderTask) MarshalJSON() ([]byte, error)

func (*LeaderTask) Name

func (task *LeaderTask) Name() string

func (LeaderTask) Priority

func (task LeaderTask) Priority() Priority

func (LeaderTask) RecordStartTs

func (task LeaderTask) RecordStartTs()

func (LeaderTask) ReplicaID

func (task LeaderTask) ReplicaID() typeutil.UniqueID

func (LeaderTask) ResourceGroup

func (task LeaderTask) ResourceGroup() string

func (*LeaderTask) SegmentID

func (task *LeaderTask) SegmentID() typeutil.UniqueID

func (LeaderTask) SetID

func (task LeaderTask) SetID(id typeutil.UniqueID)

func (LeaderTask) SetPriority

func (task LeaderTask) SetPriority(priority Priority)

func (LeaderTask) SetReason

func (task LeaderTask) SetReason(reason string)

func (LeaderTask) SetStatus

func (task LeaderTask) SetStatus(status Status)

func (LeaderTask) Shard

func (task LeaderTask) Shard() string

func (LeaderTask) Source

func (task LeaderTask) Source() Source

func (LeaderTask) Status

func (task LeaderTask) Status() Status

func (LeaderTask) Step

func (task LeaderTask) Step() int

func (LeaderTask) StepUp

func (task LeaderTask) StepUp() int

func (*LeaderTask) String

func (task *LeaderTask) String() string

func (LeaderTask) Wait

func (task LeaderTask) Wait() error

type MockScheduler

type MockScheduler struct {
	mock.Mock
}

MockScheduler is an autogenerated mock type for the Scheduler type

func NewMockScheduler

func NewMockScheduler(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockScheduler

NewMockScheduler creates a new instance of MockScheduler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockScheduler) Add

func (_m *MockScheduler) Add(task Task) error

Add provides a mock function with given fields: task

func (*MockScheduler) AddExecutor

func (_m *MockScheduler) AddExecutor(nodeID int64)

AddExecutor provides a mock function with given fields: nodeID

func (*MockScheduler) Dispatch

func (_m *MockScheduler) Dispatch(node int64)

Dispatch provides a mock function with given fields: node

func (*MockScheduler) EXPECT

func (_m *MockScheduler) EXPECT() *MockScheduler_Expecter

func (*MockScheduler) GetChannelTaskDelta

func (_m *MockScheduler) GetChannelTaskDelta(nodeID int64, collectionID int64) int

GetChannelTaskDelta provides a mock function with given fields: nodeID, collectionID

func (*MockScheduler) GetChannelTaskNum

func (_m *MockScheduler) GetChannelTaskNum(filters ...TaskFilter) int

GetChannelTaskNum provides a mock function with given fields: filters

func (*MockScheduler) GetExecutedFlag

func (_m *MockScheduler) GetExecutedFlag(nodeID int64) <-chan struct{}

GetExecutedFlag provides a mock function with given fields: nodeID

func (*MockScheduler) GetSegmentTaskDelta

func (_m *MockScheduler) GetSegmentTaskDelta(nodeID int64, collectionID int64) int

GetSegmentTaskDelta provides a mock function with given fields: nodeID, collectionID

func (*MockScheduler) GetSegmentTaskNum

func (_m *MockScheduler) GetSegmentTaskNum(filters ...TaskFilter) int

GetSegmentTaskNum provides a mock function with given fields: filters

func (*MockScheduler) GetTasksJSON

func (_m *MockScheduler) GetTasksJSON() string

GetTasksJSON provides a mock function with given fields:

func (*MockScheduler) RemoveByNode

func (_m *MockScheduler) RemoveByNode(node int64)

RemoveByNode provides a mock function with given fields: node

func (*MockScheduler) RemoveExecutor

func (_m *MockScheduler) RemoveExecutor(nodeID int64)

RemoveExecutor provides a mock function with given fields: nodeID

func (*MockScheduler) Start

func (_m *MockScheduler) Start()

Start provides a mock function with given fields:

func (*MockScheduler) Stop

func (_m *MockScheduler) Stop()

Stop provides a mock function with given fields:

type MockScheduler_AddExecutor_Call

type MockScheduler_AddExecutor_Call struct {
	*mock.Call
}

MockScheduler_AddExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddExecutor'

func (*MockScheduler_AddExecutor_Call) Return

func (*MockScheduler_AddExecutor_Call) Run

func (*MockScheduler_AddExecutor_Call) RunAndReturn

type MockScheduler_Add_Call

type MockScheduler_Add_Call struct {
	*mock.Call
}

MockScheduler_Add_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Add'

func (*MockScheduler_Add_Call) Return

func (*MockScheduler_Add_Call) Run

func (_c *MockScheduler_Add_Call) Run(run func(task Task)) *MockScheduler_Add_Call

func (*MockScheduler_Add_Call) RunAndReturn

func (_c *MockScheduler_Add_Call) RunAndReturn(run func(Task) error) *MockScheduler_Add_Call

type MockScheduler_Dispatch_Call

type MockScheduler_Dispatch_Call struct {
	*mock.Call
}

MockScheduler_Dispatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Dispatch'

func (*MockScheduler_Dispatch_Call) Return

func (*MockScheduler_Dispatch_Call) Run

func (*MockScheduler_Dispatch_Call) RunAndReturn

func (_c *MockScheduler_Dispatch_Call) RunAndReturn(run func(int64)) *MockScheduler_Dispatch_Call

type MockScheduler_Expecter

type MockScheduler_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockScheduler_Expecter) Add

func (_e *MockScheduler_Expecter) Add(task interface{}) *MockScheduler_Add_Call

Add is a helper method to define mock.On call

  • task Task

func (*MockScheduler_Expecter) AddExecutor

func (_e *MockScheduler_Expecter) AddExecutor(nodeID interface{}) *MockScheduler_AddExecutor_Call

AddExecutor is a helper method to define mock.On call

  • nodeID int64

func (*MockScheduler_Expecter) Dispatch

func (_e *MockScheduler_Expecter) Dispatch(node interface{}) *MockScheduler_Dispatch_Call

Dispatch is a helper method to define mock.On call

  • node int64

func (*MockScheduler_Expecter) GetChannelTaskDelta

func (_e *MockScheduler_Expecter) GetChannelTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetChannelTaskDelta_Call

GetChannelTaskDelta is a helper method to define mock.On call

  • nodeID int64
  • collectionID int64

func (*MockScheduler_Expecter) GetChannelTaskNum

func (_e *MockScheduler_Expecter) GetChannelTaskNum(filters ...interface{}) *MockScheduler_GetChannelTaskNum_Call

GetChannelTaskNum is a helper method to define mock.On call

  • filters ...TaskFilter

func (*MockScheduler_Expecter) GetExecutedFlag

func (_e *MockScheduler_Expecter) GetExecutedFlag(nodeID interface{}) *MockScheduler_GetExecutedFlag_Call

GetExecutedFlag is a helper method to define mock.On call

  • nodeID int64

func (*MockScheduler_Expecter) GetSegmentTaskDelta

func (_e *MockScheduler_Expecter) GetSegmentTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetSegmentTaskDelta_Call

GetSegmentTaskDelta is a helper method to define mock.On call

  • nodeID int64
  • collectionID int64

func (*MockScheduler_Expecter) GetSegmentTaskNum

func (_e *MockScheduler_Expecter) GetSegmentTaskNum(filters ...interface{}) *MockScheduler_GetSegmentTaskNum_Call

GetSegmentTaskNum is a helper method to define mock.On call

  • filters ...TaskFilter

func (*MockScheduler_Expecter) GetTasksJSON

GetTasksJSON is a helper method to define mock.On call

func (*MockScheduler_Expecter) RemoveByNode

func (_e *MockScheduler_Expecter) RemoveByNode(node interface{}) *MockScheduler_RemoveByNode_Call

RemoveByNode is a helper method to define mock.On call

  • node int64

func (*MockScheduler_Expecter) RemoveExecutor

func (_e *MockScheduler_Expecter) RemoveExecutor(nodeID interface{}) *MockScheduler_RemoveExecutor_Call

RemoveExecutor is a helper method to define mock.On call

  • nodeID int64

func (*MockScheduler_Expecter) Start

Start is a helper method to define mock.On call

func (*MockScheduler_Expecter) Stop

Stop is a helper method to define mock.On call

type MockScheduler_GetChannelTaskDelta_Call

type MockScheduler_GetChannelTaskDelta_Call struct {
	*mock.Call
}

MockScheduler_GetChannelTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelTaskDelta'

func (*MockScheduler_GetChannelTaskDelta_Call) Return

func (*MockScheduler_GetChannelTaskDelta_Call) Run

func (*MockScheduler_GetChannelTaskDelta_Call) RunAndReturn

type MockScheduler_GetChannelTaskNum_Call

type MockScheduler_GetChannelTaskNum_Call struct {
	*mock.Call
}

MockScheduler_GetChannelTaskNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelTaskNum'

func (*MockScheduler_GetChannelTaskNum_Call) Return

func (*MockScheduler_GetChannelTaskNum_Call) Run

func (*MockScheduler_GetChannelTaskNum_Call) RunAndReturn

type MockScheduler_GetExecutedFlag_Call

type MockScheduler_GetExecutedFlag_Call struct {
	*mock.Call
}

MockScheduler_GetExecutedFlag_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetExecutedFlag'

func (*MockScheduler_GetExecutedFlag_Call) Return

func (*MockScheduler_GetExecutedFlag_Call) Run

func (*MockScheduler_GetExecutedFlag_Call) RunAndReturn

func (_c *MockScheduler_GetExecutedFlag_Call) RunAndReturn(run func(int64) <-chan struct{}) *MockScheduler_GetExecutedFlag_Call

type MockScheduler_GetSegmentTaskDelta_Call

type MockScheduler_GetSegmentTaskDelta_Call struct {
	*mock.Call
}

MockScheduler_GetSegmentTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentTaskDelta'

func (*MockScheduler_GetSegmentTaskDelta_Call) Return

func (*MockScheduler_GetSegmentTaskDelta_Call) Run

func (*MockScheduler_GetSegmentTaskDelta_Call) RunAndReturn

type MockScheduler_GetSegmentTaskNum_Call

type MockScheduler_GetSegmentTaskNum_Call struct {
	*mock.Call
}

MockScheduler_GetSegmentTaskNum_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentTaskNum'

func (*MockScheduler_GetSegmentTaskNum_Call) Return

func (*MockScheduler_GetSegmentTaskNum_Call) Run

func (*MockScheduler_GetSegmentTaskNum_Call) RunAndReturn

type MockScheduler_GetTasksJSON_Call

type MockScheduler_GetTasksJSON_Call struct {
	*mock.Call
}

MockScheduler_GetTasksJSON_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTasksJSON'

func (*MockScheduler_GetTasksJSON_Call) Return

func (*MockScheduler_GetTasksJSON_Call) Run

func (*MockScheduler_GetTasksJSON_Call) RunAndReturn

type MockScheduler_RemoveByNode_Call

type MockScheduler_RemoveByNode_Call struct {
	*mock.Call
}

MockScheduler_RemoveByNode_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveByNode'

func (*MockScheduler_RemoveByNode_Call) Return

func (*MockScheduler_RemoveByNode_Call) Run

func (*MockScheduler_RemoveByNode_Call) RunAndReturn

type MockScheduler_RemoveExecutor_Call

type MockScheduler_RemoveExecutor_Call struct {
	*mock.Call
}

MockScheduler_RemoveExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveExecutor'

func (*MockScheduler_RemoveExecutor_Call) Return

func (*MockScheduler_RemoveExecutor_Call) Run

func (*MockScheduler_RemoveExecutor_Call) RunAndReturn

type MockScheduler_Start_Call

type MockScheduler_Start_Call struct {
	*mock.Call
}

MockScheduler_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'

func (*MockScheduler_Start_Call) Return

func (*MockScheduler_Start_Call) Run

func (*MockScheduler_Start_Call) RunAndReturn

func (_c *MockScheduler_Start_Call) RunAndReturn(run func()) *MockScheduler_Start_Call

type MockScheduler_Stop_Call

type MockScheduler_Stop_Call struct {
	*mock.Call
}

MockScheduler_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'

func (*MockScheduler_Stop_Call) Return

func (*MockScheduler_Stop_Call) Run

func (_c *MockScheduler_Stop_Call) Run(run func()) *MockScheduler_Stop_Call

func (*MockScheduler_Stop_Call) RunAndReturn

func (_c *MockScheduler_Stop_Call) RunAndReturn(run func()) *MockScheduler_Stop_Call

type Priority

type Priority int32
const (
	TaskPriorityLow    Priority = iota // for balance checker
	TaskPriorityNormal                 // for segment checker
	TaskPriorityHigh                   // for channel checker
)

func (Priority) String

func (p Priority) String() string

type Scheduler

type Scheduler interface {
	Start()
	Stop()
	AddExecutor(nodeID int64)
	RemoveExecutor(nodeID int64)
	Add(task Task) error
	Dispatch(node int64)
	RemoveByNode(node int64)
	GetExecutedFlag(nodeID int64) <-chan struct{}
	GetChannelTaskNum(filters ...TaskFilter) int
	GetSegmentTaskNum(filters ...TaskFilter) int
	GetTasksJSON() string

	GetSegmentTaskDelta(nodeID int64, collectionID int64) int
	GetChannelTaskDelta(nodeID int64, collectionID int64) int
}

type SegmentAction

type SegmentAction struct {
	*BaseAction

	SegmentID typeutil.UniqueID
	Scope     querypb.DataScope
	// contains filtered or unexported fields
}

func NewSegmentAction

func NewSegmentAction(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID) *SegmentAction

func NewSegmentActionWithScope

func NewSegmentActionWithScope(nodeID typeutil.UniqueID, typ ActionType, shard string, segmentID typeutil.UniqueID, scope querypb.DataScope) *SegmentAction

func (*SegmentAction) Desc

func (action *SegmentAction) Desc() string

func (*SegmentAction) GetScope

func (action *SegmentAction) GetScope() querypb.DataScope

func (*SegmentAction) GetSegmentID

func (action *SegmentAction) GetSegmentID() typeutil.UniqueID

func (*SegmentAction) IsFinished

func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool

func (*SegmentAction) String

func (action *SegmentAction) String() string

type SegmentTask

type SegmentTask struct {
	// contains filtered or unexported fields
}

func NewSegmentTask

func NewSegmentTask(ctx context.Context,
	timeout time.Duration,
	source Source,
	collectionID typeutil.UniqueID,
	replica *meta.Replica,
	actions ...Action,
) (*SegmentTask, error)

NewSegmentTask creates a SegmentTask with actions, all actions must process the same segment, empty actions is not allowed

func (SegmentTask) Actions

func (task SegmentTask) Actions() []Action

func (SegmentTask) Cancel

func (task SegmentTask) Cancel(err error)

func (SegmentTask) CollectionID

func (task SegmentTask) CollectionID() typeutil.UniqueID

func (SegmentTask) Context

func (task SegmentTask) Context() context.Context

func (SegmentTask) Err

func (task SegmentTask) Err() error

func (SegmentTask) Fail

func (task SegmentTask) Fail(err error)

func (SegmentTask) GetReason

func (task SegmentTask) GetReason() string

func (SegmentTask) GetTaskLatency

func (task SegmentTask) GetTaskLatency() int64

func (SegmentTask) ID

func (task SegmentTask) ID() typeutil.UniqueID

func (*SegmentTask) Index

func (task *SegmentTask) Index() string

func (SegmentTask) IsFinished

func (task SegmentTask) IsFinished(distMgr *meta.DistributionManager) bool

func (SegmentTask) LoadType

func (task SegmentTask) LoadType() querypb.LoadType

func (*SegmentTask) MarshalJSON

func (task *SegmentTask) MarshalJSON() ([]byte, error)

func (*SegmentTask) Name

func (task *SegmentTask) Name() string

func (SegmentTask) Priority

func (task SegmentTask) Priority() Priority

func (SegmentTask) RecordStartTs

func (task SegmentTask) RecordStartTs()

func (SegmentTask) ReplicaID

func (task SegmentTask) ReplicaID() typeutil.UniqueID

func (SegmentTask) ResourceGroup

func (task SegmentTask) ResourceGroup() string

func (*SegmentTask) SegmentID

func (task *SegmentTask) SegmentID() typeutil.UniqueID

func (SegmentTask) SetID

func (task SegmentTask) SetID(id typeutil.UniqueID)

func (SegmentTask) SetPriority

func (task SegmentTask) SetPriority(priority Priority)

func (SegmentTask) SetReason

func (task SegmentTask) SetReason(reason string)

func (SegmentTask) SetStatus

func (task SegmentTask) SetStatus(status Status)

func (SegmentTask) Shard

func (task SegmentTask) Shard() string

func (SegmentTask) Source

func (task SegmentTask) Source() Source

func (SegmentTask) Status

func (task SegmentTask) Status() Status

func (SegmentTask) Step

func (task SegmentTask) Step() int

func (SegmentTask) StepUp

func (task SegmentTask) StepUp() int

func (*SegmentTask) String

func (task *SegmentTask) String() string

func (SegmentTask) Wait

func (task SegmentTask) Wait() error

type Source

type Source fmt.Stringer

func WrapIDSource

func WrapIDSource(id int64) Source

type Status

type Status = string

type Task

type Task interface {
	Context() context.Context
	Source() Source
	ID() typeutil.UniqueID
	CollectionID() typeutil.UniqueID
	// Return 0 if the task is a reduce task without given replica.
	ReplicaID() typeutil.UniqueID
	// Return "" if the task is a reduce task without given replica.
	ResourceGroup() string
	Shard() string
	SetID(id typeutil.UniqueID)
	Status() Status
	SetStatus(status Status)
	Err() error
	Priority() Priority
	SetPriority(priority Priority)
	Index() string // dedup indexing string

	// cancel the task as we don't need to continue it
	Cancel(err error)
	// fail the task as we encounter some error so be unable to continue,
	// this error will be recorded for response to user requests
	Fail(err error)
	Wait() error
	Actions() []Action
	Step() int
	StepUp() int
	IsFinished(dist *meta.DistributionManager) bool
	SetReason(reason string)
	String() string

	// MarshalJSON marshal task info to json
	MarshalJSON() ([]byte, error)
	Name() string
	GetReason() string

	RecordStartTs()
	GetTaskLatency() int64
}

type TaskFilter

type TaskFilter func(task Task) bool

func WithCollectionID2TaskFilter

func WithCollectionID2TaskFilter(collectionID int64) TaskFilter

func WithTaskTypeFilter

func WithTaskTypeFilter(taskType Type) TaskFilter

type Type

type Type int32
const (
	TaskTypeGrow Type = iota + 1
	TaskTypeReduce
	TaskTypeMove
	TaskTypeUpdate
)

func GetTaskType

func GetTaskType(task Task) Type

GetTaskType returns the task's type, for now, only 3 types; - only 1 grow action -> Grow - only 1 reduce action -> Reduce - 1 grow action, and ends with 1 reduce action -> Move

func (Type) String

func (t Type) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL