
v0.1.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2021 License: Apache-2.0 Imports: 58 Imported by: 2



Copyright 2020 MatrixOrigin.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at


Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, See the License for the specific language governing permissions and limitations under the License.



This section is empty.


View Source
var (
	// SingleTestCluster single test raft cluster
	SingleTestCluster = WithTestClusterNodeCount(1)
	// DiskTestCluster using pebble storage test raft store
	DiskTestCluster = WithTestClusterUseDisk()
	// DisableScheduleTestCluster disable prophet schedulers  test raft store
	DisableScheduleTestCluster = WithTestClusterDisableSchedule()
	// NewTestCluster clean data before test cluster start
	NewTestCluster = WithTestClusterRecreate(true)
	// NoCleanTestCluster using exists data before test cluster start
	OldTestCluster = WithTestClusterRecreate(false)
	// SetCMDTestClusterHandler set cmd handler
	SetCMDTestClusterHandler = WithTestClusterWriteHandler(1, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (uint64, int64, *raftcmdpb.Response) {
		resp := pb.AcquireResponse()
		c.WriteBatch().Set(r.Key, r.Cmd)
		resp.Value = []byte("OK")
		changed := uint64(len(r.Key)) + uint64(len(r.Cmd))
		return changed, int64(changed), resp
	// GetCMDTestClusterHandler get cmd handler
	GetCMDTestClusterHandler = WithTestClusterReadHandler(2, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (*raftcmdpb.Response, uint64) {
		resp := pb.AcquireResponse()
		value, err := c.DataStorage().(storage.KVStorage).Get(r.Key)
		if err != nil {
			panic("BUG: can not error")
		resp.Value = value
		return resp, uint64(len(value))
View Source
var (

	// DataPrefixSize data prefix size
	DataPrefixSize = dataPrefixKeySize + 8

data is in (z, z+1)

View Source
var (
	// ErrTimeout timeout error
	ErrTimeout = errors.New("exec timeout")
View Source
var (
	// RetryInterval retry interval
	RetryInterval = time.Second


func DecodeDataKey

func DecodeDataKey(key []byte) []byte

DecodeDataKey decode data key

func EncodeDataKey

func EncodeDataKey(group uint64, key []byte) []byte

EncodeDataKey encode data key

func GetMaxKey

func GetMaxKey() []byte

GetMaxKey return max key

func GetMinKey

func GetMinKey() []byte

GetMinKey return min key

func GetStoreIdentKey

func GetStoreIdentKey() []byte

GetStoreIdentKey return key of StoreIdent

func NewResourceAdapterWithShard

func NewResourceAdapterWithShard(meta bhmetapb.Shard) metadata.Resource

NewResourceAdapterWithShard create a prophet resource use shard

func WriteGroupPrefix

func WriteGroupPrefix(group uint64, key []byte)

WriteGroupPrefix write group prefix


type Router

type Router interface {
	// Start the router
	Start() error
	// SelectShard returns a shard and leader store that the key is in the range [shard.Start, shard.End).
	// If returns leader address is "", means the current shard has no leader
	SelectShard(group uint64, key []byte) (uint64, string)
	// Every do with all shards
	Every(group uint64, mustLeader bool, fn func(shard *bhmetapb.Shard, store bhmetapb.Store))
	// ForeachShards foreach shards
	ForeachShards(group uint64, fn func(shard *bhmetapb.Shard) bool)

	// LeaderStore return leader peer store
	LeaderPeerStore(shardID uint64) bhmetapb.Store
	// RandomPeerStore return random peer store
	RandomPeerStore(shardID uint64) bhmetapb.Store

	// GetShardStats returns the runtime stats info of the shard
	GetShardStats(id uint64) *metapb.ResourceStats
	// GetStoreStats returns the runtime stats info of the store
	GetStoreStats(id uint64) *metapb.ContainerStats

	// GetWatcher returns the prophet event watcher
	GetWatcher() prophet.Watcher

Router route the request to the corresponding shard

type ShardsPool

type ShardsPool interface {
	// Alloc alloc a shard from shards pool, returns error if no idle shards left. The `purpose` is used to avoid
	// duplicate allocation.
	Alloc(group uint64, purpose []byte) (bhmetapb.AllocatedShard, error)

ShardsPool is a shards pool, it will always create shards until the number of available shards reaches the value specified by `capacity`, we called these `Idle Shards`.

The pool will create a Job in the prophet. Once a node became the prophet leader, shards pool job will start, and stop if the node became the follower, So the job can be executed on any node. It will use prophet client to create shard after the job starts.

type ShardsProxy

type ShardsProxy interface {
	Dispatch(req *raftcmdpb.Request) error
	DispatchTo(req *raftcmdpb.Request, shard uint64, store string) error
	Router() Router

ShardsProxy Shards proxy, distribute the appropriate request to the corresponding backend, retry the request for the error

func NewShardsProxy

func NewShardsProxy(router Router,
	doneCB doneFunc,
	errorDoneCB errorDoneFunc) ShardsProxy

NewShardsProxy returns a shard proxy

func NewShardsProxyWithStore

func NewShardsProxyWithStore(store Store,
	doneCB doneFunc,
	errorDoneCB errorDoneFunc,
) (ShardsProxy, error)

NewShardsProxyWithStore returns a shard proxy with a raftstore

type Store

type Store interface {
	// Start the raft store
	// Stop the raft store
	// GetConfig returns the config of the store
	GetConfig() *config.Config
	// Meta returns store meta
	Meta() bhmetapb.Store
	// GetRouter returns a router
	GetRouter() Router
	// RegisterReadFunc register read command handler
	RegisterReadFunc(uint64, command.ReadCommandFunc)
	// RegisterWriteFunc register write command handler
	RegisterWriteFunc(uint64, command.WriteCommandFunc)
	// RegisterLocalFunc register local command handler
	RegisterLocalFunc(uint64, command.LocalCommandFunc)
	// RegisterLocalRequestCB register local request cb to process response
	RegisterLocalRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
	// RegisterRPCRequestCB register rpc request cb to process response
	RegisterRPCRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
	// OnRequest receive a request, and call cb while the request is completed
	OnRequest(*raftcmdpb.Request) error
	// MetadataStorage returns a MetadataStorage of the shard group
	MetadataStorage() storage.MetadataStorage
	// DataStorage returns a DataStorage of the shard group
	DataStorageByGroup(uint64, uint64) storage.DataStorage
	// MaybeLeader returns the shard replica maybe leader
	MaybeLeader(uint64) bool
	// AllocID returns a uint64 id, panic if has a error
	MustAllocID() uint64
	// Prophet return current prophet instance
	Prophet() prophet.Prophet
	// CreateRPCCliendSideCodec returns the rpc codec at client side
	CreateRPCCliendSideCodec() (codec.Encoder, codec.Decoder)

	// CreateResourcePool create resource pools, the resource pool will create shards,
	// and try to maintain the number of shards in the pool not less than the `capacity`
	// parameter. This is an idempotent operation.
	CreateResourcePool(...metapb.ResourcePool) (ShardsPool, error)
	// GetResourcePool returns `ShardsPool`, nil if `CreateResourcePool` not completed
	GetResourcePool() ShardsPool

Store manage a set of raft group

func NewStore

func NewStore(cfg *config.Config) Store

NewStore returns a raft store

type TestClusterOption

type TestClusterOption func(*testClusterOptions)

TestClusterOption is the option for create TestCluster

func WithAppendTestClusterAdjustConfigFunc

func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption

WithAppendTestClusterAdjustConfigFunc adjust config

func WithDataStorageOption

func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption

WithDataStorageOption set options to create data storage

func WithMetadataStorageOption

func WithMetadataStorageOption(metaOpts *cpebble.Options) TestClusterOption

WithMetadataStorageOption set options to create metadata storage

func WithTestClusterDataPath

func WithTestClusterDataPath(path string) TestClusterOption

WithTestClusterDataPath set data data storage directory

func WithTestClusterDisableSchedule

func WithTestClusterDisableSchedule() TestClusterOption

WithTestClusterDisableSchedule disable pd schedule

func WithTestClusterLogLevel

func WithTestClusterLogLevel(level string) TestClusterOption

WithTestClusterLogLevel set raftstore log level

func WithTestClusterNodeCount

func WithTestClusterNodeCount(n int) TestClusterOption

WithTestClusterNodeCount set node count of test cluster

func WithTestClusterNodeStartFunc

func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption

WithTestClusterNodeStartFunc custom node start func

func WithTestClusterReadHandler

func WithTestClusterReadHandler(cmd uint64, value command.ReadCommandFunc) TestClusterOption

WithTestClusterReadHandler read handlers

func WithTestClusterRecreate

func WithTestClusterRecreate(value bool) TestClusterOption

WithTestClusterRecreate if true, the test cluster will clean and recreate the data dir

func WithTestClusterStoreFactory

func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption

WithTestClusterStoreFactory custom create raftstore factory

func WithTestClusterUseDisk

func WithTestClusterUseDisk() TestClusterOption

WithTestClusterUseDisk use disk storage for testing

func WithTestClusterWriteHandler

func WithTestClusterWriteHandler(cmd uint64, value command.WriteCommandFunc) TestClusterOption

WithTestClusterWriteHandler write handlers

type TestKVClient

type TestKVClient interface {
	// Set set key-value to the backend kv storage
	Set(key, value string, timeout time.Duration) error
	// Get returns the value of the specific key from backend kv storage
	Get(key string, timeout time.Duration) (string, error)
	// Close close the test client

TestKVClient is a kv client that uses `TestRaftCluster` as Backend's KV storage engine

type TestRaftCluster

type TestRaftCluster interface {
	// EveryStore do fn at every store, it can be used to init some store register
	EveryStore(fn func(i int, store Store))
	// GetStore returns the node store
	GetStore(node int) Store
	// GetWatcher returns event watcher of the node
	GetWatcher(node int) prophet.Watcher
	// Start start each node sequentially
	// Stop stop each node sequentially
	// StartWithConcurrent after starting the first node, other nodes start concurrently
	// Restart restart the cluster
	// RestartWithFunc restart the cluster, `beforeStartFunc` is called before starting
	RestartWithFunc(beforeStartFunc func())
	// GetPRCount returns the number of replicas on the node
	GetPRCount(node int) int
	// GetShardByIndex returns the shard by `shardIndex`, `shardIndex` is the order in which
	// the shard is created on the node
	GetShardByIndex(node int, shardIndex int) bhmetapb.Shard
	// GetShardByID returns the shard from the node by shard id
	GetShardByID(node int, shardID uint64) bhmetapb.Shard
	// CheckShardCount check whether the number of shards on each node is correct
	CheckShardCount(countPerNode int)
	// CheckShardRange check whether the range field of the shard on each node is correct,
	// `shardIndex` is the order in which the shard is created on the node
	CheckShardRange(shardIndex int, start, end []byte)
	// WaitRemovedByShardID check whether the specific shard removed from every node until timeout
	WaitRemovedByShardID(shardID uint64, timeout time.Duration)
	// WaitLeadersByCount check that the number of leaders of the cluster reaches at least the specified value
	// until the timeout
	WaitLeadersByCount(count int, timeout time.Duration)
	// WaitShardByCount check that the number of shard of the cluster reaches at least the specified value
	// until the timeout
	WaitShardByCount(count int, timeout time.Duration)
	// WaitShardByCountPerNode check that the number of shard of each node reaches at least the specified value
	// until the timeout
	WaitShardByCountPerNode(count int, timeout time.Duration)
	// WaitShardSplitByCount check whether the count of shard split reaches a specific value until timeout
	WaitShardSplitByCount(id uint64, count int, timeout time.Duration)
	// WaitShardByCounts check whether the number of shards reaches a specific value until timeout
	WaitShardByCounts(counts []int, timeout time.Duration)
	// WaitShardStateChangedTo check whether the state of shard changes to the specific value until timeout
	WaitShardStateChangedTo(shardID uint64, to metapb.ResourceState, timeout time.Duration)
	// GetShardLeaderStore return the leader node of the shard
	GetShardLeaderStore(shardID uint64) Store
	// GetProphet returns the prophet instance
	GetProphet() prophet.Prophet
	// Set write key-value pairs to the `DataStorage` of the node
	Set(node int, key, value []byte)

	// CreateTestKVClient create and returns a kv client
	CreateTestKVClient(node int) TestKVClient

TestRaftCluster is the test cluster is used to test starting N nodes in a process, and to provide the start and stop capabilities of a single node, which is used to test `raftstore` more easily.

func NewSingleTestClusterStore

func NewSingleTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster

NewSingleTestClusterStore create test cluster with 1 node

func NewTestClusterStore

func NewTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster

NewTestClusterStore create test cluster using options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL