Documentation ¶
Index ¶
- Constants
- Variables
- func BuildCreateTableRequest(createTableResult cluster.CreateTableResult, ...) eventdispatch.CreateTableOnShardRequest
- func CancelEventWithLog(event *fsm.Event, err error, msg string, fields ...zap.Field)
- func CreateTableMetadata(ctx context.Context, c *cluster.Cluster, schemaName string, tableName string, ...) (cluster.CreateTableResult, error)
- func CreateTableOnShard(ctx context.Context, c *cluster.Cluster, dispatch eventdispatch.Dispatch, ...) error
- func GetRequestFromEvent[T any](event *fsm.Event) (T, error)
- func IsContains(slice []string, target string) bool
- func IsSubSlice(subSlice []string, slice []string) bool
- func Max(x, y int) int
- type EtcdStorageImpl
- type Info
- type Manager
- type ManagerImpl
- func (m *ManagerImpl) Cancel(ctx context.Context, procedureID uint64) error
- func (m *ManagerImpl) ListRunningProcedure(_ context.Context) ([]*Info, error)
- func (m *ManagerImpl) Start(ctx context.Context) error
- func (m *ManagerImpl) Stop(ctx context.Context) error
- func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error
- type Meta
- type Procedure
- type State
- type Storage
- type Typ
- type Write
Constants ¶
View Source
const ( StateInit = "init" StateRunning = "running" StateFinished = "finished" StateFailed = "failed" StateCancelled = "cancelled" )
View Source
const ( Version = "v1" PathProcedure = "procedure" PathDeletedProcedure = "deletedProcedure" )
Variables ¶
View Source
var ( ErrShardLeaderNotFound = coderr.NewCodeError(coderr.Internal, "shard leader not found") ErrProcedureNotFound = coderr.NewCodeError(coderr.Internal, "procedure not found") ErrClusterConfigChanged = coderr.NewCodeError(coderr.Internal, "cluster config changed") ErrTableNotExists = coderr.NewCodeError(coderr.Internal, "table not exists") ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists") ErrProcedureTypeNotMatch = coderr.NewCodeError(coderr.Internal, "procedure type not match") ErrDecodeRawData = coderr.NewCodeError(coderr.Internal, "decode raw data") ErrEncodeRawData = coderr.NewCodeError(coderr.Internal, "encode raw data") ErrGetRequest = coderr.NewCodeError(coderr.Internal, "get request from event") ErrNodeNumberNotEnough = coderr.NewCodeError(coderr.Internal, "node number not enough") ErrEmptyPartitionNames = coderr.NewCodeError(coderr.Internal, "partition names is empty") ErrDropTableResult = coderr.NewCodeError(coderr.Internal, "length of shard not correct") ErrPickShard = coderr.NewCodeError(coderr.Internal, "pick shard failed") )
Functions ¶
func BuildCreateTableRequest ¶ added in v1.0.0
func BuildCreateTableRequest(createTableResult cluster.CreateTableResult, req *metaservicepb.CreateTableRequest, partitionInfo *clusterpb.PartitionInfo) eventdispatch.CreateTableOnShardRequest
func CancelEventWithLog ¶ added in v1.0.0
CancelEventWithLog cancels the event when err is not nil; if err is nil, it does nothing.
func CreateTableMetadata ¶ added in v1.0.0
func CreateTableOnShard ¶ added in v1.0.0
func CreateTableOnShard(ctx context.Context, c *cluster.Cluster, dispatch eventdispatch.Dispatch, shardID storage.ShardID, request eventdispatch.CreateTableOnShardRequest) error
func GetRequestFromEvent ¶ added in v1.0.0
nolint
func IsContains ¶ added in v1.0.0
func IsSubSlice ¶ added in v1.0.0
Types ¶
type EtcdStorageImpl ¶
type EtcdStorageImpl struct {
// contains filtered or unexported fields
}
func (EtcdStorageImpl) CreateOrUpdate ¶
func (e EtcdStorageImpl) CreateOrUpdate(ctx context.Context, meta Meta) error
CreateOrUpdate example (key -> value): /{rootPath}/v1/procedure/{procedureID} -> {procedureType} + {procedureState} + {data}
func (EtcdStorageImpl) MarkDeleted ¶
func (e EtcdStorageImpl) MarkDeleted(ctx context.Context, id uint64) error
MarkDeleted performs a soft deletion; the deleted key's format is: /{rootPath}/v1/historyProcedure/{clusterID}/{procedureID}
type Manager ¶
type Manager interface { // Start must be called before manager is used. Start(ctx context.Context) error // Stop must be called before manager is dropped. Stop(ctx context.Context) error // Submit procedure to be executed asynchronously. // TODO: change result type, add channel to get whether the procedure executed successfully Submit(ctx context.Context, procedure Procedure) error // Cancel procedure that has been submitted. Cancel(ctx context.Context, procedureID uint64) error ListRunningProcedure(ctx context.Context) ([]*Info, error) }
func NewManagerImpl ¶
type ManagerImpl ¶
type ManagerImpl struct {
// contains filtered or unexported fields
}
func (*ManagerImpl) Cancel ¶
func (m *ManagerImpl) Cancel(ctx context.Context, procedureID uint64) error
func (*ManagerImpl) ListRunningProcedure ¶
func (m *ManagerImpl) ListRunningProcedure(_ context.Context) ([]*Info, error)
type Procedure ¶
type Procedure interface { // ID of the procedure. ID() uint64 // Typ of the procedure. Typ() Typ // Start the procedure. Start(ctx context.Context) error // Cancel the procedure. Cancel(ctx context.Context) error // State of the procedure. Retrieve the state of this procedure. State() State }
Procedure is used to describe how to execute a set of operations from the scheduler, e.g. SwitchLeaderProcedure, MergeShardProcedure.
type Storage ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.