Documentation
¶
Overview ¶
Copyright 2020 MatrixOrigin.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func IsShardUnavailableErr(err error) bool
- func NewNewShardUnavailableErr(id uint64) error
- func NewResourceAdapterWithShard(meta Shard) metadata.Resource
- type Epoch
- type ErrTryAgain
- type FailureCallback
- type LogReader
- func (lr *LogReader) Append(entries []pb.Entry) error
- func (lr *LogReader) ApplySnapshot(snapshot pb.Snapshot) error
- func (lr *LogReader) Compact(index uint64) error
- func (lr *LogReader) CreateSnapshot(snapshot pb.Snapshot) error
- func (lr *LogReader) Entries(low uint64, high uint64, maxSize uint64) ([]pb.Entry, error)
- func (lr *LogReader) FirstIndex() (uint64, error)
- func (lr *LogReader) GetSnapshotRequested() bool
- func (lr *LogReader) InitialState() (pb.HardState, pb.ConfState, error)
- func (lr *LogReader) LastIndex() (uint64, error)
- func (lr *LogReader) SetConfState(cs pb.ConfState)
- func (lr *LogReader) SetRange(firstIndex uint64, length uint64)
- func (lr *LogReader) SetState(s pb.HardState)
- func (lr *LogReader) Snapshot() (pb.Snapshot, error)
- func (lr *LogReader) Term(index uint64) (uint64, error)
- type Replica
- type RetryController
- type Router
- type Shard
- type ShardUnavailableErr
- type ShardsPool
- type ShardsProxy
- type Store
- type SuccessCallback
- type TestClusterOption
- func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
- func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
- func WithEnableTestParallel() TestClusterOption
- func WithTestClusterDataPath(path string) TestClusterOption
- func WithTestClusterDisableSchedule() TestClusterOption
- func WithTestClusterEnableAdvertiseAddr() TestClusterOption
- func WithTestClusterLogLevel(level zapcore.Level) TestClusterOption
- func WithTestClusterNodeCount(n int) TestClusterOption
- func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
- func WithTestClusterRecreate(value bool) TestClusterOption
- func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
- func WithTestClusterUseDisk() TestClusterOption
- func WithTestClusterUseInitProphetCluster() TestClusterOption
- type TestDataBuilder
- type TestKVClient
- type TestRaftCluster
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidConfigChangeRequest = errors.New("invalid config change request") ErrRemoveVoter = errors.New("removing voter") ErrRemoveLeader = errors.New("removing leader") ErrPendingConfigChange = errors.New("pending config change") ErrDuplicatedRequest = errors.New("duplicated config change request") ErrLearnerOnlyChange = errors.New("learner only change") )
var ( ErrNotLearnerReplica = errors.New("not learner") ErrReplicaNotFound = errors.New("replica not found") ErrReplicaDuplicated = errors.New("replica duplicated") )
var ( // SingleTestCluster single test raft cluster SingleTestCluster = WithTestClusterNodeCount(1) // DiskTestCluster using pebble storage test raft store DiskTestCluster = WithTestClusterUseDisk() // DisableScheduleTestCluster disable prophet schedulers test raft store DisableScheduleTestCluster = WithTestClusterDisableSchedule() // NewTestCluster clean data before test cluster start NewTestCluster = WithTestClusterRecreate(true) // NoCleanTestCluster using exists data before test cluster start OldTestCluster = WithTestClusterRecreate(false) )
var (
ErrRemoveShardKeyRange = errors.New("failed to delete shard key range")
)
var ( // ErrTimeout timeout error ErrTimeout = errors.New("exec timeout") )
var ( // ErrUnknownReplica indicates that the replica is unknown. ErrUnknownReplica = errors.New("unknown replica") )
Functions ¶
func IsShardUnavailableErr ¶ added in v0.3.0
IsShardUnavailableErr is ShardUnavailableErr error
func NewNewShardUnavailableErr ¶ added in v0.3.0
NewShardUnavailableErr returns a wrapped error that the shard is unavailable
func NewResourceAdapterWithShard ¶
NewResourceAdapterWithShard create a prophet resource use shard
Types ¶
type Epoch ¶ added in v0.2.0
type Epoch = metapb.ResourceEpoch
type ErrTryAgain ¶ added in v0.3.0
ErrTryAgain indicates that an operation should retry later
func (*ErrTryAgain) Error ¶ added in v0.3.0
func (e *ErrTryAgain) Error() string
type FailureCallback ¶ added in v0.2.0
FailureCallback request failure callback
type LogReader ¶ added in v0.2.0
LogReader is the struct used to manage logs that have already been persisted into LogDB. LogReader implements the raft.Storage interface.
func NewLogReader ¶ added in v0.2.0
NewLogReader creates and returns a new LogReader instance.
func (*LogReader) Append ¶ added in v0.2.0
Append marks the specified entries as persisted and make them available from logreader.
func (*LogReader) ApplySnapshot ¶ added in v0.2.0
ApplySnapshot applies the specified snapshot.
func (*LogReader) CreateSnapshot ¶ added in v0.2.0
CreateSnapshot keeps the metadata of the specified snapshot.
func (*LogReader) Entries ¶ added in v0.2.0
Entries returns persisted entries between [low, high) with a total limit of up to maxSize bytes.
func (*LogReader) FirstIndex ¶ added in v0.2.0
func (*LogReader) GetSnapshotRequested ¶ added in v0.2.0
GetSnapshotRequested returns a boolean value indicating whether creating a new snapshot has been requested.
func (*LogReader) InitialState ¶ added in v0.2.0
InitialState returns the saved HardState and ConfState information.
func (*LogReader) SetConfState ¶ added in v0.2.0
func (*LogReader) SetRange ¶ added in v0.2.0
SetRange updates the LogReader to reflect what is available in it.
type RetryController ¶ added in v0.2.0
type RetryController interface { // Retry used to control retry if retryable error encountered. returns false means stop retry. Retry(requestID []byte) (rpc.Request, bool) }
RetryController retry controller
type Router ¶
type Router interface { // Start the router Start() error // Stop stops the router Stop() // SelectShard returns a shard and leader store that the key is in the range [shard.Start, shard.End). // If returns leader address is "", means the current shard has no leader SelectShard(group uint64, key []byte) (Shard, string) // Every do with all shards Every(group uint64, mustLeader bool, fn func(shard Shard, store meta.Store) bool) // ForeachShards foreach shards ForeachShards(group uint64, fn func(shard Shard) bool) // GetShard returns the shard by shard id GetShard(id uint64) Shard // UpdateLeader update shard leader UpdateLeader(shardID uint64, leaderReplciaID uint64) // LeaderStore return leader replica store LeaderReplicaStore(shardID uint64) meta.Store // RandomReplicaStore return random replica store RandomReplicaStore(shardID uint64) meta.Store // GetShardStats returns the runtime stats info of the shard GetShardStats(id uint64) metapb.ResourceStats // GetStoreStats returns the runtime stats info of the store GetStoreStats(id uint64) metapb.ContainerStats }
Router route the request to the corresponding shard
type ShardUnavailableErr ¶ added in v0.3.0
type ShardUnavailableErr struct {
// contains filtered or unexported fields
}
ShardUnavailableErr is a error that the shard is unavailable
func (ShardUnavailableErr) Error ¶ added in v0.3.0
func (err ShardUnavailableErr) Error() string
String implement error interface
type ShardsPool ¶
type ShardsPool interface { // Alloc alloc a shard from shards pool, returns error if no idle shards left. The `purpose` is used to avoid // duplicate allocation. Alloc(group uint64, purpose []byte) (meta.AllocatedShard, error) }
ShardsPool is a shards pool, it will always create shards until the number of available shards reaches the value specified by `capacity`, we called these `Idle Shards`.
The pool will create a Job in the prophet. Once a node became the prophet leader, shards pool job will start, and stop if the node became the follower, So the job can be executed on any node. It will use prophet client to create shard after the job starts.
type ShardsProxy ¶
type ShardsProxy interface { Start() error Stop() error Dispatch(req rpc.Request) error DispatchTo(req rpc.Request, shard Shard, store string) error SetCallback(SuccessCallback, FailureCallback) SetRetryController(retryController RetryController) OnResponse(rpc.ResponseBatch) Router() Router }
ShardsProxy Shards proxy, distribute the appropriate request to the corresponding backend, retry the request for the error
type Store ¶
type Store interface { // Start the raft store Start() // Stop the raft store Stop() // GetConfig returns the config of the store GetConfig() *config.Config // Meta returns store meta Meta() meta.Store // GetRouter returns a router GetRouter() Router // GetShardsProxy get shards proxy to dispatch requests GetShardsProxy() ShardsProxy // OnRequest receive a request, and call cb while the request is completed OnRequest(rpc.Request) error // OnRequestWithCB receive a request, and call cb while the request is completed OnRequestWithCB(req rpc.Request, cb func(resp rpc.ResponseBatch)) error // DataStorage returns a DataStorage of the shard group DataStorageByGroup(uint64) storage.DataStorage // MaybeLeader returns the shard replica maybe leader MaybeLeader(uint64) bool // AllocID returns a uint64 id, panic if has a error MustAllocID() uint64 // Prophet return current prophet instance Prophet() prophet.Prophet // CreateResourcePool create resource pools, the resource pool will create shards, // and try to maintain the number of shards in the pool not less than the `capacity` // parameter. This is an idempotent operation. CreateResourcePool(...metapb.ResourcePool) (ShardsPool, error) // GetResourcePool returns `ShardsPool`, nil if `CreateResourcePool` not completed GetResourcePool() ShardsPool }
Store manage a set of raft group
type SuccessCallback ¶ added in v0.2.0
SuccessCallback request success callback
type TestClusterOption ¶
type TestClusterOption func(*testClusterOptions)
TestClusterOption is the option for create TestCluster
func WithAppendTestClusterAdjustConfigFunc ¶
func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
WithAppendTestClusterAdjustConfigFunc adjust config
func WithDataStorageOption ¶
func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
WithDataStorageOption set options to create data storage
func WithEnableTestParallel ¶ added in v0.2.0
func WithEnableTestParallel() TestClusterOption
WithEnableTestParallel enable parallel testing
func WithTestClusterDataPath ¶
func WithTestClusterDataPath(path string) TestClusterOption
WithTestClusterDataPath set data data storage directory
func WithTestClusterDisableSchedule ¶
func WithTestClusterDisableSchedule() TestClusterOption
WithTestClusterDisableSchedule disable pd schedule
func WithTestClusterEnableAdvertiseAddr ¶ added in v0.2.0
func WithTestClusterEnableAdvertiseAddr() TestClusterOption
WithTestClusterEnableAdvertiseAddr set data data storage directory
func WithTestClusterLogLevel ¶
func WithTestClusterLogLevel(level zapcore.Level) TestClusterOption
WithTestClusterLogLevel set raftstore log level
func WithTestClusterNodeCount ¶
func WithTestClusterNodeCount(n int) TestClusterOption
WithTestClusterNodeCount set node count of test cluster
func WithTestClusterNodeStartFunc ¶
func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
WithTestClusterNodeStartFunc custom node start func
func WithTestClusterRecreate ¶
func WithTestClusterRecreate(value bool) TestClusterOption
WithTestClusterRecreate if true, the test cluster will clean and recreate the data dir
func WithTestClusterStoreFactory ¶
func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
WithTestClusterStoreFactory custom create raftstore factory
func WithTestClusterUseDisk ¶
func WithTestClusterUseDisk() TestClusterOption
WithTestClusterUseDisk use disk storage for testing
func WithTestClusterUseInitProphetCluster ¶ added in v0.3.0
func WithTestClusterUseInitProphetCluster() TestClusterOption
WithTestClusterUseInitProphetCluster set using init prophet cluster config
type TestDataBuilder ¶ added in v0.2.0
type TestDataBuilder struct { }
TestDataBuilder build test data
func NewTestDataBuilder ¶ added in v0.2.0
func NewTestDataBuilder() *TestDataBuilder
NewTestDataBuilder create and return TestDataBuilder
func (*TestDataBuilder) CreateShard ¶ added in v0.2.0
func (b *TestDataBuilder) CreateShard(id uint64, replicasFormater string) Shard
CreateShard create shard for testing. format: id: id range: [id, id+1) replicasFormat: Voter Format: pid/cid
Learner Format: pid/cid/[l|v], default v Initial Member: pid/cid/l/[t|f], default f Use ',' to split multi-replica. First is current replica
type TestKVClient ¶
type TestKVClient interface { // Set set key-value to the backend kv storage Set(key, value string, timeout time.Duration) error // Get returns the value of the specific key from backend kv storage Get(key string, timeout time.Duration) (string, error) // Set set key-value to the backend kv storage SetWithShard(key, value string, id uint64, timeout time.Duration) error // Get returns the value of the specific key from backend kv storage GetWithShard(key string, id uint64, timeout time.Duration) (string, error) // UpdateLabel update the shard label UpdateLabel(shard, group uint64, key, value string, timeout time.Duration) error // Close close the test client Close() }
TestKVClient is a kv client that uses `TestRaftCluster` as Backend's KV storage engine
type TestRaftCluster ¶
type TestRaftCluster interface { // EveryStore do fn at every store, it can be used to init some store register EveryStore(fn func(i int, store Store)) // GetStore returns the node store GetStore(node int) Store // GetStoreByID returns the store GetStoreByID(id uint64) Store // Start start each node sequentially Start() // Stop stop each node sequentially Stop() // StartWithConcurrent after starting the first node, other nodes start concurrently StartWithConcurrent(bool) // Restart restart the cluster Restart() // RestartWithFunc restart the cluster, `beforeStartFunc` is called before starting RestartWithFunc(beforeStartFunc func()) // StartNode start the node StartNode(node int) // StopNode stop the node StopNode(node int) // RestartNode restart the node RestartNode(node int) // StartNetworkPartition node will in network partition, must call after node started StartNetworkPartition(partitions [][]int) // StopNetworkPartition stop network partition StopNetworkPartition() // GetPRCount returns the number of replicas on the node GetPRCount(node int) int // GetShardByIndex returns the shard by `shardIndex`, `shardIndex` is the order in which // the shard is created on the node GetShardByIndex(node int, shardIndex int) Shard // GetShardByID returns the shard from the node by shard id GetShardByID(node int, shardID uint64) Shard // CheckShardCount check whether the number of shards on each node is correct CheckShardCount(countPerNode int) // CheckShardRange check whether the range field of the shard on each node is correct, // `shardIndex` is the order in which the shard is created on the node CheckShardRange(shardIndex int, start, end []byte) // WaitRemovedByShardID check whether the specific shard removed from every node until timeout WaitRemovedByShardID(shardID uint64, timeout time.Duration) // WaitRemovedByShardIDAt check whether the specific shard removed from specific node until timeout WaitRemovedByShardIDAt(shardID uint64, nodes []int, timeout time.Duration) // WaitLeadersByCount check that the number of leaders of the cluster reaches at least the specified value // until the timeout WaitLeadersByCount(count int, timeout time.Duration) // WaitLeadersByCountsAndShardGroupAndLabel check that the number of leaders of the cluster reaches at least the specified value // until the timeout WaitLeadersByCountsAndShardGroupAndLabel(counts []int, group uint64, key, value string, timeout time.Duration) // WaitShardByCount check that the number of shard of the cluster reaches at least the specified value // until the timeout WaitShardByCount(count int, timeout time.Duration) // WaitShardByLabel check that the shard has the specified label until the timeout WaitShardByLabel(id uint64, label, value string, timeout time.Duration) // WaitVoterReplicaByCount check that the number of voter shard of the cluster reaches at least the specified value // until the timeout WaitVoterReplicaByCountPerNode(count int, timeout time.Duration) // WaitVoterReplicaByCounts check that the number of voter shard of the cluster reaches at least the specified value // until the timeout WaitVoterReplicaByCounts(counts []int, timeout time.Duration) // WaitVoterReplicaByCountsAndShardGroup check that the number of voter shard of the cluster reaches at least the specified value // until the timeout WaitVoterReplicaByCountsAndShardGroup(counts []int, shardGroup uint64, timeout time.Duration) // WaitVoterReplicaByCountsAndShardGroupAndLabel check that the number of voter shard of the cluster reaches at least the specified value // until the timeout WaitVoterReplicaByCountsAndShardGroupAndLabel(counts []int, shardGroup uint64, label, value string, timeout time.Duration) // WaitShardByCountPerNode check that the number of shard of each node reaches at least the specified value // until the timeout WaitShardByCountPerNode(count int, timeout time.Duration) // WaitAllReplicasChangeToVoter check that the role of shard of each node change to voter until the timeout WaitAllReplicasChangeToVoter(shard uint64, timeout time.Duration) // WaitShardByCountOnNode check that the number of shard of the specified node reaches at least the specified value // until the timeout WaitShardByCountOnNode(node, count int, timeout time.Duration) // WaitShardSplitByCount check whether the count of shard split reaches a specific value until timeout WaitShardSplitByCount(id uint64, count int, timeout time.Duration) // WaitShardByCounts check whether the number of shards reaches a specific value until timeout WaitShardByCounts(counts []int, timeout time.Duration) // WaitShardStateChangedTo check whether the state of shard changes to the specific value until timeout WaitShardStateChangedTo(shardID uint64, to metapb.ResourceState, timeout time.Duration) // GetShardLeaderStore return the leader node of the shard GetShardLeaderStore(shardID uint64) Store // GetProphet returns the prophet instance GetProphet() prophet.Prophet // CreateTestKVClient create and returns a kv client CreateTestKVClient(node int) TestKVClient // CreateTestKVClientWithAdjust create and returns a kv client with adjust func to modify request CreateTestKVClientWithAdjust(node int, adjust func(req *rpc.Request)) TestKVClient }
TestRaftCluster is the test cluster is used to test starting N nodes in a process, and to provide the start and stop capabilities of a single node, which is used to test `raftstore` more easily.
func NewSingleTestClusterStore ¶
func NewSingleTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewSingleTestClusterStore create test cluster with 1 node
func NewTestClusterStore ¶
func NewTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewTestClusterStore create test cluster using options
Source Files
¶
- batch.go
- codec_rpc.go
- errors.go
- execute_context.go
- group_controller.go
- log_reader.go
- metric.go
- pending_proposal.go
- prophet_adapter.go
- proposal_batch.go
- proxy.go
- proxy_backend.go
- proxy_rpc.go
- read_index_queue.go
- replica.go
- replica_apply_result.go
- replica_create.go
- replica_destroy.go
- replica_destroy_task.go
- replica_event_loop.go
- replica_event_proposal.go
- replica_event_raft_ready.go
- replica_snapshot.go
- replica_split.go
- replica_state_machine.go
- replica_state_machine_exec.go
- replica_stats.go
- resp.go
- router.go
- snapshotter.go
- split_checker.go
- store.go
- store_bootstrap.go
- store_debug.go
- store_handler.go
- store_route_handler.go
- store_shards_pool.go
- store_timer_task.go
- testutil.go
- type.go
- util.go
- vacuum_cleaner.go
- worker_pool.go