Documentation
Overview
Copyright 2020 MatrixOrigin.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index
- Variables
- func DecodeDataKey(key []byte) []byte
- func EncodeDataKey(group uint64, key []byte) []byte
- func GetMaxKey() []byte
- func GetMinKey() []byte
- func GetStoreIdentKey() []byte
- func NewResourceAdapterWithShard(meta bhmetapb.Shard) metadata.Resource
- func WriteGroupPrefix(group uint64, key []byte)
- type Router
- type ShardsPool
- type ShardsProxy
- type Store
- type TestClusterOption
- func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
- func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
- func WithMetadataStorageOption(metaOpts *cpebble.Options) TestClusterOption
- func WithTestClusterDataPath(path string) TestClusterOption
- func WithTestClusterDisableSchedule() TestClusterOption
- func WithTestClusterLogLevel(level string) TestClusterOption
- func WithTestClusterNodeCount(n int) TestClusterOption
- func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
- func WithTestClusterReadHandler(cmd uint64, value command.ReadCommandFunc) TestClusterOption
- func WithTestClusterRecreate(value bool) TestClusterOption
- func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
- func WithTestClusterUseDisk() TestClusterOption
- func WithTestClusterWriteHandler(cmd uint64, value command.WriteCommandFunc) TestClusterOption
- type TestKVClient
- type TestRaftCluster
Constants
This section is empty.
Variables
var (
    // SingleTestCluster is a single-node test raft cluster
    SingleTestCluster = WithTestClusterNodeCount(1)
    // DiskTestCluster is a test raft store using pebble storage
    DiskTestCluster = WithTestClusterUseDisk()
    // DisableScheduleTestCluster is a test raft store with the prophet schedulers disabled
    DisableScheduleTestCluster = WithTestClusterDisableSchedule()
    // NewTestCluster cleans the data before the test cluster starts
    NewTestCluster = WithTestClusterRecreate(true)
    // OldTestCluster reuses existing data when the test cluster starts
    OldTestCluster = WithTestClusterRecreate(false)

    // SetCMDTestClusterHandler registers the set-cmd write handler
    SetCMDTestClusterHandler = WithTestClusterWriteHandler(1, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (uint64, int64, *raftcmdpb.Response) {
        resp := pb.AcquireResponse()
        c.WriteBatch().Set(r.Key, r.Cmd)
        resp.Value = []byte("OK")
        changed := uint64(len(r.Key)) + uint64(len(r.Cmd))
        return changed, int64(changed), resp
    })
    // GetCMDTestClusterHandler registers the get-cmd read handler
    GetCMDTestClusterHandler = WithTestClusterReadHandler(2, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (*raftcmdpb.Response, uint64) {
        resp := pb.AcquireResponse()
        value, err := c.DataStorage().(storage.KVStorage).Get(r.Key)
        if err != nil {
            panic("BUG: can not error")
        }
        resp.Value = value
        return resp, uint64(len(value))
    })
)
var (
// DataPrefixSize data prefix size
DataPrefixSize = dataPrefixKeySize + 8
)
Data keys are stored in the key range (z, z+1).
var (
    // ErrTimeout timeout error
    ErrTimeout = errors.New("exec timeout")
)
var (
    // RetryInterval retry interval
    RetryInterval = time.Second
)
Functions
func EncodeDataKey
func EncodeDataKey(group uint64, key []byte) []byte
EncodeDataKey encodes a data key by prepending the group prefix to the key.
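As an illustration of the layout implied above, here is a minimal, self-contained sketch. The one-byte 'z' prefix and big-endian group id are assumptions inferred from DataPrefixSize = dataPrefixKeySize + 8 and the documented (z, z+1) key range, not a confirmed wire format:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // dataPrefix is assumed to be the single byte 'z'; the package only
    // documents that data keys live in (z, z+1).
    const dataPrefix = 'z'

    // encodeDataKey mirrors what EncodeDataKey presumably produces:
    // prefix byte + 8-byte group id + user key.
    func encodeDataKey(group uint64, key []byte) []byte {
        buf := make([]byte, 1+8+len(key))
        buf[0] = dataPrefix
        binary.BigEndian.PutUint64(buf[1:9], group)
        copy(buf[9:], key)
        return buf
    }

    // decodeDataKey strips the prefix and group id, as DecodeDataKey does.
    func decodeDataKey(key []byte) []byte {
        return key[9:]
    }

    func main() {
        k := encodeDataKey(1, []byte("hello"))
        fmt.Printf("encoded=%q user-key=%q\n", k, decodeDataKey(k))
    }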
func NewResourceAdapterWithShard
func NewResourceAdapterWithShard(meta bhmetapb.Shard) metadata.Resource
NewResourceAdapterWithShard creates a prophet resource from a shard.
func WriteGroupPrefix
func WriteGroupPrefix(group uint64, key []byte)
WriteGroupPrefix writes the group prefix into key.
Types
type Router
type Router interface {
    // Start starts the router
    Start() error
    // SelectShard returns the shard whose range [shard.Start, shard.End) contains the key,
    // and the address of its leader store. If the returned leader address is "", the
    // current shard has no leader.
    SelectShard(group uint64, key []byte) (uint64, string)
    // Every calls fn for every shard in the group
    Every(group uint64, mustLeader bool, fn func(shard *bhmetapb.Shard, store bhmetapb.Store))
    // ForeachShards iterates over the group's shards until fn returns false
    ForeachShards(group uint64, fn func(shard *bhmetapb.Shard) bool)
    // LeaderPeerStore returns the store of the leader peer
    LeaderPeerStore(shardID uint64) bhmetapb.Store
    // RandomPeerStore returns the store of a random peer
    RandomPeerStore(shardID uint64) bhmetapb.Store
    // GetShardStats returns the runtime stats info of the shard
    GetShardStats(id uint64) *metapb.ResourceStats
    // GetStoreStats returns the runtime stats info of the store
    GetStoreStats(id uint64) *metapb.ContainerStats
    // GetWatcher returns the prophet event watcher
    GetWatcher() prophet.Watcher
}
Router routes requests to the corresponding shard.
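A short usage sketch (hedged: group 0 and the key are illustrative, and `store` is assumed to be a started raftstore):

    // findLeader starts the router, routes a key, and walks the group's shards.
    func findLeader(store raftstore.Store) {
        router := store.GetRouter()
        if err := router.Start(); err != nil {
            panic(err)
        }

        shardID, leaderAddr := router.SelectShard(0, []byte("some-key"))
        if leaderAddr == "" {
            // the shard currently has no leader; callers typically retry
        }

        // Iterate all shards in group 0; return false to stop early.
        router.ForeachShards(0, func(shard *bhmetapb.Shard) bool {
            return shard.ID != shardID
        })
    }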
type ShardsPool
type ShardsPool interface {
    // Alloc allocates a shard from the shards pool, returning an error if no idle
    // shards are left. The `purpose` is used to avoid duplicate allocation.
    Alloc(group uint64, purpose []byte) (bhmetapb.AllocatedShard, error)
}
ShardsPool is a pool of shards. It keeps creating shards until the number of available shards reaches the value specified by `capacity`; these are called `Idle Shards`.
The pool creates a Job in the prophet. Once a node becomes the prophet leader, the shards pool job starts; it stops when the node becomes a follower, so the job can run on any node. After the job starts, it uses the prophet client to create shards.
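A minimal allocation sketch, assuming a pool was created earlier via Store.CreateResourcePool; the group id, the purpose bytes, and the use of the standard errors package are illustrative:

    // allocIdleShard is a hedged sketch. GetResourcePool returns nil until
    // CreateResourcePool has completed, so the caller must check for that.
    func allocIdleShard(store raftstore.Store) (bhmetapb.AllocatedShard, error) {
        pool := store.GetResourcePool()
        if pool == nil {
            return bhmetapb.AllocatedShard{}, errors.New("shards pool not ready")
        }
        // Re-running Alloc with the same purpose returns the previously
        // allocated shard instead of consuming another idle shard.
        return pool.Alloc(0, []byte("purpose-1"))
    }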
type ShardsProxy
type ShardsProxy interface {
    Dispatch(req *raftcmdpb.Request) error
    DispatchTo(req *raftcmdpb.Request, shard uint64, store string) error
    Router() Router
}
ShardsProxy is a shards proxy: it dispatches each request to the corresponding backend and retries requests on error.
func NewShardsProxy
func NewShardsProxy(router Router, doneCB doneFunc, errorDoneCB errorDoneFunc) ShardsProxy
NewShardsProxy returns a shard proxy
func NewShardsProxyWithStore
func NewShardsProxyWithStore(store Store, doneCB doneFunc, errorDoneCB errorDoneFunc) (ShardsProxy, error)
NewShardsProxyWithStore returns a shard proxy with a raftstore
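A hedged dispatch sketch: pb.AcquireRequest is assumed to exist as the counterpart of the pb.AcquireResponse used in the Variables section, and only the Key and Cmd request fields shown on this page are relied on:

    // dispatchWrite sends a write through the proxy. Responses arrive
    // asynchronously via the done callbacks passed to NewShardsProxy or
    // NewShardsProxyWithStore.
    func dispatchWrite(proxy raftstore.ShardsProxy, key, value []byte) error {
        req := pb.AcquireRequest() // assumed counterpart of pb.AcquireResponse
        req.Key = key
        req.Cmd = value
        // Dispatch routes by key using the proxy's Router and retries on
        // routing errors; DispatchTo targets an explicit shard and store.
        return proxy.Dispatch(req)
    }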
type Store
type Store interface {
    // Start starts the raft store
    Start()
    // Stop stops the raft store
    Stop()
    // GetConfig returns the config of the store
    GetConfig() *config.Config
    // Meta returns the store meta
    Meta() bhmetapb.Store
    // GetRouter returns a router
    GetRouter() Router
    // RegisterReadFunc registers a read command handler
    RegisterReadFunc(uint64, command.ReadCommandFunc)
    // RegisterWriteFunc registers a write command handler
    RegisterWriteFunc(uint64, command.WriteCommandFunc)
    // RegisterLocalFunc registers a local command handler
    RegisterLocalFunc(uint64, command.LocalCommandFunc)
    // RegisterLocalRequestCB registers a local request callback to process responses
    RegisterLocalRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
    // RegisterRPCRequestCB registers an rpc request callback to process responses
    RegisterRPCRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
    // OnRequest receives a request, and calls the callback when the request completes
    OnRequest(*raftcmdpb.Request) error
    // MetadataStorage returns the MetadataStorage of the shard group
    MetadataStorage() storage.MetadataStorage
    // DataStorageByGroup returns the DataStorage of the shard group
    DataStorageByGroup(uint64, uint64) storage.DataStorage
    // MaybeLeader returns whether the shard replica may be the leader
    MaybeLeader(uint64) bool
    // MustAllocID returns a uint64 id, panicking on error
    MustAllocID() uint64
    // Prophet returns the current prophet instance
    Prophet() prophet.Prophet
    // CreateRPCCliendSideCodec returns the client-side rpc codec
    CreateRPCCliendSideCodec() (codec.Encoder, codec.Decoder)
    // CreateResourcePool creates resource pools; each resource pool creates shards
    // and tries to keep the number of shards in the pool at no less than the
    // `capacity` parameter. This is an idempotent operation.
    CreateResourcePool(...metapb.ResourcePool) (ShardsPool, error)
    // GetResourcePool returns the `ShardsPool`, or nil if `CreateResourcePool` has not completed
    GetResourcePool() ShardsPool
}
Store manages a set of raft groups.
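A sketch of wiring command handlers before Start, reusing the handler signatures shown in the Variables section above; the command ids 1 and 2 are illustrative:

    // setupStore registers one write and one read handler, then starts the store.
    func setupStore(store raftstore.Store) {
        store.RegisterWriteFunc(1, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (uint64, int64, *raftcmdpb.Response) {
            resp := pb.AcquireResponse()
            c.WriteBatch().Set(r.Key, r.Cmd) // staged write, applied by the raft state machine
            resp.Value = []byte("OK")
            written := uint64(len(r.Key)) + uint64(len(r.Cmd))
            return written, int64(written), resp
        })
        store.RegisterReadFunc(2, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (*raftcmdpb.Response, uint64) {
            resp := pb.AcquireResponse()
            value, err := c.DataStorage().(storage.KVStorage).Get(r.Key)
            if err != nil {
                panic(err) // the test handlers above treat storage errors as bugs
            }
            resp.Value = value
            return resp, uint64(len(value))
        })
        store.Start()
    }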
type TestClusterOption
type TestClusterOption func(*testClusterOptions)
TestClusterOption is an option for creating a test cluster.
func WithAppendTestClusterAdjustConfigFunc
func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
WithAppendTestClusterAdjustConfigFunc appends a function used to adjust the node config.
func WithDataStorageOption
func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
WithDataStorageOption sets the options used to create the data storage.
func WithMetadataStorageOption
func WithMetadataStorageOption(metaOpts *cpebble.Options) TestClusterOption
WithMetadataStorageOption sets the options used to create the metadata storage.
func WithTestClusterDataPath
func WithTestClusterDataPath(path string) TestClusterOption
WithTestClusterDataPath sets the data storage directory.
func WithTestClusterDisableSchedule
func WithTestClusterDisableSchedule() TestClusterOption
WithTestClusterDisableSchedule disables the prophet schedulers.
func WithTestClusterLogLevel
func WithTestClusterLogLevel(level string) TestClusterOption
WithTestClusterLogLevel sets the raftstore log level.
func WithTestClusterNodeCount
func WithTestClusterNodeCount(n int) TestClusterOption
WithTestClusterNodeCount sets the number of nodes in the test cluster.
func WithTestClusterNodeStartFunc
func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
WithTestClusterNodeStartFunc sets a custom node start function.
func WithTestClusterReadHandler
func WithTestClusterReadHandler(cmd uint64, value command.ReadCommandFunc) TestClusterOption
WithTestClusterReadHandler registers a read command handler.
func WithTestClusterRecreate
func WithTestClusterRecreate(value bool) TestClusterOption
WithTestClusterRecreate if true, the test cluster will clean and recreate the data dir
func WithTestClusterStoreFactory
func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
WithTestClusterStoreFactory sets a custom factory for creating the raftstore.
func WithTestClusterUseDisk
func WithTestClusterUseDisk() TestClusterOption
WithTestClusterUseDisk uses disk storage for testing.
func WithTestClusterWriteHandler
func WithTestClusterWriteHandler(cmd uint64, value command.WriteCommandFunc) TestClusterOption
WithTestClusterWriteHandler registers a write command handler.
type TestKVClient
type TestKVClient interface {
    // Set stores a key-value pair in the backend kv storage
    Set(key, value string, timeout time.Duration) error
    // Get returns the value of the given key from the backend kv storage
    Get(key string, timeout time.Duration) (string, error)
    // Close closes the test client
    Close()
}
TestKVClient is a KV client that uses a `TestRaftCluster` as the backend KV storage engine.
type TestRaftCluster
type TestRaftCluster interface {
    // EveryStore runs fn at every store; it can be used to initialize store registrations
    EveryStore(fn func(i int, store Store))
    // GetStore returns the store of the node
    GetStore(node int) Store
    // GetWatcher returns the event watcher of the node
    GetWatcher(node int) prophet.Watcher
    // Start starts each node sequentially
    Start()
    // Stop stops each node sequentially
    Stop()
    // StartWithConcurrent starts the first node, then starts the other nodes concurrently
    StartWithConcurrent(bool)
    // Restart restarts the cluster
    Restart()
    // RestartWithFunc restarts the cluster; `beforeStartFunc` is called before starting
    RestartWithFunc(beforeStartFunc func())
    // GetPRCount returns the number of replicas on the node
    GetPRCount(node int) int
    // GetShardByIndex returns the shard by `shardIndex`, the order in which
    // the shard was created on the node
    GetShardByIndex(node int, shardIndex int) bhmetapb.Shard
    // GetShardByID returns the shard with the given shard id from the node
    GetShardByID(node int, shardID uint64) bhmetapb.Shard
    // CheckShardCount checks whether the number of shards on each node is correct
    CheckShardCount(countPerNode int)
    // CheckShardRange checks whether the range fields of the shard on each node are correct;
    // `shardIndex` is the order in which the shard was created on the node
    CheckShardRange(shardIndex int, start, end []byte)
    // WaitRemovedByShardID checks whether the given shard is removed from every node, until timeout
    WaitRemovedByShardID(shardID uint64, timeout time.Duration)
    // WaitLeadersByCount checks that the number of leaders in the cluster reaches at least
    // the given value, until timeout
    WaitLeadersByCount(count int, timeout time.Duration)
    // WaitShardByCount checks that the number of shards in the cluster reaches at least
    // the given value, until timeout
    WaitShardByCount(count int, timeout time.Duration)
    // WaitShardByCountPerNode checks that the number of shards on each node reaches at least
    // the given value, until timeout
    WaitShardByCountPerNode(count int, timeout time.Duration)
    // WaitShardSplitByCount checks whether the number of splits of the shard reaches the
    // given value, until timeout
    WaitShardSplitByCount(id uint64, count int, timeout time.Duration)
    // WaitShardByCounts checks whether the numbers of shards reach the given values, until timeout
    WaitShardByCounts(counts []int, timeout time.Duration)
    // WaitShardStateChangedTo checks whether the state of the shard changes to the given value, until timeout
    WaitShardStateChangedTo(shardID uint64, to metapb.ResourceState, timeout time.Duration)
    // GetShardLeaderStore returns the leader store of the shard
    GetShardLeaderStore(shardID uint64) Store
    // GetProphet returns the prophet instance
    GetProphet() prophet.Prophet
    // Set writes a key-value pair to the `DataStorage` of the node
    Set(node int, key, value []byte)
    // CreateTestKVClient creates and returns a kv client
    CreateTestKVClient(node int) TestKVClient
}
TestRaftCluster is a test cluster used to start N nodes in a single process, and it provides the ability to start and stop individual nodes, which makes testing `raftstore` easier.
func NewSingleTestClusterStore
func NewSingleTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewSingleTestClusterStore creates a test cluster with 1 node.
func NewTestClusterStore
func NewTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewTestClusterStore creates a test cluster with the given options.
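Putting the pieces together, a hedged sketch of a test that reuses the ready-made options and handlers from the Variables section; it assumes TestKVClient routes Set and Get through those handlers, and the timeouts are illustrative:

    func TestClusterSetGet(t *testing.T) {
        // NewTestCluster and the two CMD handlers are the package
        // variables documented above.
        c := NewSingleTestClusterStore(t,
            NewTestCluster,           // clean data before start
            SetCMDTestClusterHandler, // write handler on cmd 1
            GetCMDTestClusterHandler, // read handler on cmd 2
        )
        defer c.Stop()

        c.Start()
        c.WaitLeadersByCount(1, 10*time.Second)

        kv := c.CreateTestKVClient(0)
        defer kv.Close()

        if err := kv.Set("key", "value", 10*time.Second); err != nil {
            t.Fatalf("set: %v", err)
        }
        v, err := kv.Get("key", 10*time.Second)
        if err != nil || v != "value" {
            t.Fatalf("get: value=%q err=%v", v, err)
        }
    }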
Source Files
- batch.go
- cmd.go
- codec_rpc.go
- errors.go
- keys.go
- metric.go
- peer_apply.go
- peer_apply_exec.go
- peer_event_loop.go
- peer_event_post_apply.go
- peer_event_proposal.go
- peer_event_raft_ready.go
- peer_job.go
- peer_replica.go
- peer_storage.go
- pool.go
- prophet_adapter.go
- proxy.go
- proxy_backend.go
- router.go
- rpc.go
- snap.go
- store.go
- store_bootstrap.go
- store_handler.go
- store_ready.go
- store_route_handler.go
- store_shards_pool.go
- testutil.go
- util.go