Documentation ¶
Index ¶
- Constants
- Variables
- func CancelEventWithLog(event *fsm.Event, err error, msg string, fields ...zap.Field)
- func GetRequestFromEvent[T any](event *fsm.Event) (T, error)
- type DelayQueue
- type EtcdStorageImpl
- type Info
- type Manager
- type ManagerImpl
- type Meta
- type Priority
- type Procedure
- type RelatedVersionInfo
- type State
- type Storage
- type Typ
- type Write
Constants ¶
View Source
const ( StateInit = "init" StateRunning = "running" StateFinished = "finished" StateFailed = "failed" StateCancelled = "cancelled" )
View Source
const ( Version = "v1" PathProcedure = "procedure" PathDeletedProcedure = "deletedProcedure" )
Variables ¶
View Source
var ( ErrShardLeaderNotFound = coderr.NewCodeError(coderr.Internal, "shard leader not found") ErrProcedureNotFound = coderr.NewCodeError(coderr.Internal, "procedure not found") ErrClusterConfigChanged = coderr.NewCodeError(coderr.Internal, "cluster config changed") ErrTableNotExists = coderr.NewCodeError(coderr.Internal, "table not exists") ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists") ErrListRunningProcedure = coderr.NewCodeError(coderr.Internal, "procedure type not match") ErrListProcedure = coderr.NewCodeError(coderr.Internal, "list running procedure") ErrDecodeRawData = coderr.NewCodeError(coderr.Internal, "decode raw data") ErrEncodeRawData = coderr.NewCodeError(coderr.Internal, "encode raw data") ErrGetRequest = coderr.NewCodeError(coderr.Internal, "get request from event") ErrNodeNumberNotEnough = coderr.NewCodeError(coderr.Internal, "node number not enough") ErrEmptyPartitionNames = coderr.NewCodeError(coderr.Internal, "partition names is empty") ErrDropTableResult = coderr.NewCodeError(coderr.Internal, "length of shard not correct") ErrPickShard = coderr.NewCodeError(coderr.Internal, "pick shard failed") ErrSubmitProcedure = coderr.NewCodeError(coderr.Internal, "submit new procedure") ErrQueueFull = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data") ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure") ErrShardNumberNotEnough = coderr.NewCodeError(coderr.Internal, "shard number not enough") ErrEmptyBatchProcedure = coderr.NewCodeError(coderr.Internal, "procedure batch is empty") ErrMergeBatchProcedure = coderr.NewCodeError(coderr.Internal, "failed to merge procedures batch") )
Functions ¶
func CancelEventWithLog ¶ added in v1.0.0
CancelEventWithLog Cancel event when error is not nil. If error is nil, do nothing.
Types ¶
type DelayQueue ¶ added in v1.2.0
type DelayQueue struct {
// contains filtered or unexported fields
}
func NewProcedureDelayQueue ¶ added in v1.2.0
func NewProcedureDelayQueue(maxLen int) *DelayQueue
func (*DelayQueue) Len ¶ added in v1.2.0
func (q *DelayQueue) Len() int
func (*DelayQueue) Pop ¶ added in v1.2.0
func (q *DelayQueue) Pop() Procedure
type EtcdStorageImpl ¶
type EtcdStorageImpl struct {
// contains filtered or unexported fields
}
func (EtcdStorageImpl) CreateOrUpdate ¶
func (e EtcdStorageImpl) CreateOrUpdate(ctx context.Context, meta Meta) error
CreateOrUpdate example: /{rootPath}/v1/procedure/{procedureID} -> {procedureType} + {procedureState} + {data}
func (EtcdStorageImpl) MarkDeleted ¶
func (e EtcdStorageImpl) MarkDeleted(ctx context.Context, id uint64) error
MarkDeleted Do a soft deletion, and the deleted key's format is: /{rootPath}/v1/historyProcedure/{clusterID}/{procedureID}
type Manager ¶
type Manager interface { // Start must be called before manager is used. Start(ctx context.Context) error // Stop must be called before manager is dropped. Stop(ctx context.Context) error // Submit procedure to be executed asynchronously. // TODO: change result type, add channel to get whether the procedure executed successfully Submit(ctx context.Context, procedure Procedure) error // ListRunningProcedure return immutable procedures info. ListRunningProcedure(ctx context.Context) ([]*Info, error) }
func NewManagerImpl ¶
type ManagerImpl ¶
type ManagerImpl struct {
// contains filtered or unexported fields
}
func (*ManagerImpl) ListRunningProcedure ¶
func (m *ManagerImpl) ListRunningProcedure(_ context.Context) ([]*Info, error)
type Procedure ¶
type Procedure interface { // ID of the procedure. ID() uint64 // Typ of the procedure. Typ() Typ // Start the procedure. Start(ctx context.Context) error // Cancel the procedure. Cancel(ctx context.Context) error // State of the procedure. Retrieve the state of this procedure. State() State // RelatedVersionInfo return the related shard and version information corresponding to this procedure for verifying whether the procedure can be executed. RelatedVersionInfo() RelatedVersionInfo // Priority present the priority of this procedure, the procedure with high level priority will be executed first. Priority() Priority }
Procedure is used to describe how to execute a set of operations from the scheduler, e.g. SwitchLeaderProcedure, MergeShardProcedure.
type RelatedVersionInfo ¶ added in v1.2.0
type RelatedVersionInfo struct { ClusterID storage.ClusterID // shardWithVersion return the shardID associated with this procedure. ShardWithVersion map[storage.ShardID]uint64 // clusterVersion return the cluster version when the procedure is created. // When performing cluster operation, it is necessary to ensure cluster version consistency. ClusterVersion uint64 }
type Storage ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.