procedure

package
v1.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Names of the lifecycle states a procedure can be in.
// NOTE(review): these are untyped string constants; presumably they are the
// values carried by the State type — confirm against the callers.
const (
	StateInit      = "init"
	StateRunning   = "running"
	StateFinished  = "finished"
	StateFailed    = "failed"
	StateCancelled = "cancelled"
)
View Source
// Path segments used when building storage keys for procedures, e.g.
// /{rootPath}/v1/procedure/{procedureID}.
const (
	// Version is the version segment of the key path.
	Version              = "v1"
	// PathProcedure is the key path segment for procedures.
	PathProcedure        = "procedure"
	// PathDeletedProcedure is presumably the key path segment for procedures
	// that have been soft-deleted — TODO confirm against the storage code.
	PathDeletedProcedure = "deletedProcedure"
)

Variables

View Source
// Sentinel errors of the procedure package.
// Every one of them is created with the coderr.Internal code.
var (
	ErrShardLeaderNotFound     = coderr.NewCodeError(coderr.Internal, "shard leader not found")
	ErrProcedureNotFound       = coderr.NewCodeError(coderr.Internal, "procedure not found")
	ErrClusterConfigChanged    = coderr.NewCodeError(coderr.Internal, "cluster config changed")
	ErrTableNotExists          = coderr.NewCodeError(coderr.Internal, "table not exists")
	ErrTableAlreadyExists      = coderr.NewCodeError(coderr.Internal, "table already exists")
	ErrProcedureTypeNotMatch   = coderr.NewCodeError(coderr.Internal, "procedure type not match")
	ErrDecodeRawData           = coderr.NewCodeError(coderr.Internal, "decode raw data")
	ErrEncodeRawData           = coderr.NewCodeError(coderr.Internal, "encode raw data")
	ErrGetRequest              = coderr.NewCodeError(coderr.Internal, "get request from event")
	ErrNodeNumberNotEnough     = coderr.NewCodeError(coderr.Internal, "node number not enough")
	ErrEmptyPartitionNames     = coderr.NewCodeError(coderr.Internal, "partition names is empty")
	ErrDropTableResult         = coderr.NewCodeError(coderr.Internal, "length of shard not correct")
	ErrPickShard               = coderr.NewCodeError(coderr.Internal, "pick shard failed")
	ErrSubmitProcedure         = coderr.NewCodeError(coderr.Internal, "submit new procedure")
	ErrQueueFull               = coderr.NewCodeError(coderr.Internal, "queue is full, unable to offer more data")
	ErrPushDuplicatedProcedure = coderr.NewCodeError(coderr.Internal, "try to push duplicated procedure")
	ErrShardNumberNotEnough    = coderr.NewCodeError(coderr.Internal, "shard number not enough")
)

Functions

func CancelEventWithLog added in v1.0.0

func CancelEventWithLog(event *fsm.Event, err error, msg string, fields ...zap.Field)

CancelEventWithLog cancels the event when err is not nil. If err is nil, it does nothing.

func GetRequestFromEvent added in v1.0.0

func GetRequestFromEvent[T any](event *fsm.Event) (T, error)

nolint

Types

type DelayQueue added in v1.2.0

type DelayQueue struct {
	// contains filtered or unexported fields
}

func NewProcedureDelayQueue added in v1.2.0

func NewProcedureDelayQueue(maxLen int) *DelayQueue

func (*DelayQueue) Len added in v1.2.0

func (q *DelayQueue) Len() int

func (*DelayQueue) Pop added in v1.2.0

func (q *DelayQueue) Pop() Procedure

func (*DelayQueue) Push added in v1.2.0

func (q *DelayQueue) Push(p Procedure, delay time.Duration) error

type EtcdStorageImpl

type EtcdStorageImpl struct {
	// contains filtered or unexported fields
}

func (EtcdStorageImpl) CreateOrUpdate

func (e EtcdStorageImpl) CreateOrUpdate(ctx context.Context, meta Meta) error

CreateOrUpdate example: /{rootPath}/v1/procedure/{procedureID} -> {procedureType} + {procedureState} + {data}

func (EtcdStorageImpl) List

func (e EtcdStorageImpl) List(ctx context.Context, batchSize int) ([]*Meta, error)

func (EtcdStorageImpl) MarkDeleted

func (e EtcdStorageImpl) MarkDeleted(ctx context.Context, id uint64) error

MarkDeleted performs a soft deletion; the deleted key's format is: /{rootPath}/v1/deletedProcedure/{clusterID}/{procedureID}

type Info

type Info struct {
	// ID is the identifier of the procedure.
	ID    uint64
	// Typ is the type of the procedure.
	Typ   Typ
	// State is the state of the procedure.
	State State
}

Info provides an immutable description of a procedure.

type Manager

type Manager interface {
	// Start must be called before the manager is used.
	Start(ctx context.Context) error
	// Stop must be called before the manager is dropped.
	Stop(ctx context.Context) error

	// Submit submits a procedure to be executed asynchronously.
	// TODO: change result type, add channel to get whether the procedure executed successfully
	Submit(ctx context.Context, procedure Procedure) error
	// ListRunningProcedure returns immutable info describing the running procedures.
	ListRunningProcedure(ctx context.Context) ([]*Info, error)
}

func NewManagerImpl

func NewManagerImpl(logger *zap.Logger, metadata *metadata.ClusterMetadata) (Manager, error)

type ManagerImpl

type ManagerImpl struct {
	// contains filtered or unexported fields
}

func (*ManagerImpl) ListRunningProcedure

func (m *ManagerImpl) ListRunningProcedure(_ context.Context) ([]*Info, error)

func (*ManagerImpl) Start

func (m *ManagerImpl) Start(ctx context.Context) error

func (*ManagerImpl) Stop

func (m *ManagerImpl) Stop(ctx context.Context) error

func (*ManagerImpl) Submit

func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error

type Meta

// Meta is the record of a procedure that Storage writes and lists.
type Meta struct {
	// ID is the identifier of the procedure.
	ID      uint64
	// Typ is the type of the procedure.
	Typ     Typ
	// State is the state of the procedure.
	State   State
	// RawData holds the procedure's data as raw bytes.
	// NOTE(review): the encoding is not visible here — confirm with the producers.
	RawData []byte
}

type Priority added in v1.2.0

// Priority is the execution priority of a procedure.
type Priority uint32
// Predefined priority levels. A lower numeric value means a higher priority,
// so PriorityHigh (3) ranks above PriorityMed (5) and PriorityLow (10).
const (
	PriorityHigh Priority = 3
	PriorityMed  Priority = 5
	PriorityLow  Priority = 10
)

Lower value means higher priority.

type Procedure

type Procedure interface {
	// ID returns the ID of the procedure.
	ID() uint64

	// Typ returns the type of the procedure.
	Typ() Typ

	// Start starts the procedure.
	Start(ctx context.Context) error

	// Cancel cancels the procedure.
	Cancel(ctx context.Context) error

	// State returns the current state of the procedure.
	State() State

	// RelatedVersionInfo returns the shard and version information related to
	// this procedure; it is used to verify whether the procedure can be executed.
	RelatedVersionInfo() RelatedVersionInfo

	// Priority returns the priority of this procedure. Procedures with a
	// higher priority are executed first.
	Priority() Priority
}

Procedure is used to describe how to execute a set of operations from the scheduler, e.g. SwitchLeaderProcedure, MergeShardProcedure.

type RelatedVersionInfo added in v1.2.0

type RelatedVersionInfo struct {
	// ClusterID is the ID of the cluster the procedure relates to.
	ClusterID storage.ClusterID
	// ShardWithVersion maps the ID of each shard associated with this
	// procedure to a version number.
	// NOTE(review): presumably the shard's version at the time the procedure
	// was created — confirm.
	ShardWithVersion map[storage.ShardID]uint64
	// ClusterVersion is the cluster version at the time the procedure was created.
	// When performing a cluster operation, it is necessary to ensure cluster version consistency.
	ClusterVersion uint64
}

type State

// State is the state of a procedure, represented as a string.
type State string

type Storage

// Storage is the storage interface for procedure metadata: it embeds Write
// and adds listing and deletion.
type Storage interface {
	Write
	// List returns the stored procedure metadata.
	// NOTE(review): batchSize presumably bounds how many entries are fetched
	// per request — confirm against the implementation.
	List(ctx context.Context, batchSize int) ([]*Meta, error)
	// MarkDeleted marks the procedure with the given id as deleted.
	MarkDeleted(ctx context.Context, id uint64) error
}

func NewEtcdStorageImpl

func NewEtcdStorageImpl(client *clientv3.Client, rootPath string) Storage

type Typ

// Typ identifies the kind of a procedure.
type Typ uint
// Procedure types. Values are assigned sequentially by iota starting at 0
// (Create), so inserting or reordering entries changes the numeric value of
// every entry that follows.
const (
	// Cluster Operation
	Create Typ = iota
	Delete
	TransferLeader
	Migrate
	Split
	Merge
	Scatter

	// DDL
	CreateTable
	DropTable
	CreatePartitionTable
	DropPartitionTable
)

type Write

// Write is the write side of procedure metadata storage.
type Write interface {
	// CreateOrUpdate creates or updates the stored entry for meta.
	CreateOrUpdate(ctx context.Context, meta Meta) error
}

Directories

Path Synopsis
ddl
operation

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL