split

package
v1.1.0-beta.0...-bd07e09 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0, Apache-2.0 Imports: 50 Imported by: 0

Documentation

Overview

Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

Index

Constants

View Source
// Tuning constants for the split / scatter retry machinery and for region scans.
const (
	// SplitRetryInterval and SplitMaxRetryInterval are the durations used when
	// retrying a split. NOTE(review): judging by the names they are presumably
	// the initial and the maximum backoff interval — confirm against the
	// backoffer that consumes them.
	SplitRetryInterval    = 50 * time.Millisecond
	SplitMaxRetryInterval = 4 * time.Second

	// ScatterWaitUpperInterval is sized from the observation that it takes
	// 30 minutes to scatter regions when each TiKV has 400k regions.
	ScatterWaitUpperInterval = 30 * time.Minute

	// ScanRegionPaginationLimit is presumably the page size used when scanning
	// regions with pagination (see PaginateScanRegion) — TODO confirm.
	ScanRegionPaginationLimit = 128
)

Constants for split retry machinery.

Variables

View Source
// PD configuration fixtures, keyed by PD config item name.
//
// NOTE(review): the value types are mixed (untyped int, float64, string and
// uint64) and differ between the two maps for the same key, e.g.
// "max-pending-peer-count" is uint64 in one map and int in the other. This is
// presumably deliberate to mirror what the code under test produces/consumes —
// confirm before normalizing, since comparisons on `any` are type-sensitive.
var (
	// ExpectPDCfgGeneratorsResult holds one value per PD config key; by its name
	// it is presumably the expected output of the PD config generators — confirm.
	ExpectPDCfgGeneratorsResult = map[string]any{
		"merge-schedule-limit":        0,
		"leader-schedule-limit":       float64(40),
		"region-schedule-limit":       float64(40),
		"max-snapshot-count":          float64(40),
		"enable-location-replacement": "false",
		"max-pending-peer-count":      uint64(math.MaxInt32),
	}

	// ExistPDCfgGeneratorBefore holds values for the same keys; by its name it is
	// presumably the PD config that exists before the generators run — confirm.
	ExistPDCfgGeneratorBefore = map[string]any{
		"merge-schedule-limit":        100,
		"leader-schedule-limit":       float64(100),
		"region-schedule-limit":       float64(100),
		"max-snapshot-count":          float64(100),
		"enable-location-replacement": "true",
		"max-pending-peer-count":      100,
	}
)
View Source
// Retry limits. They are declared as variables rather than constants,
// presumably so that callers or tests can override them — confirm.
var (
	// WaitRegionOnlineAttemptTimes defaults to config.DefaultRegionCheckBackoffLimit.
	WaitRegionOnlineAttemptTimes = config.DefaultRegionCheckBackoffLimit
	// SplitRetryTimes defaults to 150.
	SplitRetryTimes              = 150
)
View Source
// Sentinel errors intended to be used with BackoffMayNotCountBackoffer.
var (
	// ErrBackoff signals that a backoff is needed.
	ErrBackoff             = errors.New("found backoff error")
	// ErrBackoffAndDontCount signals that a backoff is needed but, per its
	// message, the attempt should not be counted.
	ErrBackoffAndDontCount = errors.New("found backoff error but don't count")
)
View Source
// DefaultTestKeepaliveCfg is a keepalive.ClientParameters value with Time set
// to 3 seconds and Timeout set to 10 seconds. The name suggests it is meant
// for tests — confirm before using it elsewhere.
var DefaultTestKeepaliveCfg = keepalive.ClientParameters{
	Time:    3 * time.Second,
	Timeout: 10 * time.Second,
}

Functions

func CheckRegionEpoch

func CheckRegionEpoch(_new, _old *RegionInfo) bool

CheckRegionEpoch checks the region epoch.

func PdErrorCanRetry

func PdErrorCanRetry(err error) bool

PdErrorCanRetry reports whether the given PD error can be retried.

func SplitPoint

func SplitPoint(
	ctx context.Context,
	iter *SplitHelperIterator,
	client SplitClient,
	splitF splitFunc,
) (err error)

SplitPoint selects the ranges that overlap with each region, and calls `splitF` to split the region.

Types

type BackoffMayNotCountBackoffer

// BackoffMayNotCountBackoffer is a backoffer whose state is entirely
// unexported; create one with NewBackoffMayNotCountBackoffer.
type BackoffMayNotCountBackoffer struct {
	// contains filtered or unexported fields
}

BackoffMayNotCountBackoffer is a backoffer but it may not increase the retry counter. It should be used with ErrBackoff or ErrBackoffAndDontCount. TODO: merge with backoff.go

func NewBackoffMayNotCountBackoffer

func NewBackoffMayNotCountBackoffer() *BackoffMayNotCountBackoffer

NewBackoffMayNotCountBackoffer creates a new backoffer that may backoff or retry.

TODO: currently it has the same usage as NewWaitRegionOnlineBackoffer so we don't expose its inner settings.

func (*BackoffMayNotCountBackoffer) NextBackoff

func (b *BackoffMayNotCountBackoffer) NextBackoff(err error) time.Duration

NextBackoff implements utils.BackoffStrategy. For BackoffMayNotCountBackoffer, only ErrBackoff and ErrBackoffAndDontCount are meaningful.

func (*BackoffMayNotCountBackoffer) RemainingAttempts

func (b *BackoffMayNotCountBackoffer) RemainingAttempts() int

RemainingAttempts implements utils.BackoffStrategy.

type BaseSplitStrategy

// BaseSplitStrategy bundles the state shared by split strategies: a counter,
// a set of SplitHelpers and a set of rewrite rules.
type BaseSplitStrategy struct {
	// AccumulateCount is an integer counter; presumably the number of values
	// accumulated since the last reset — confirm.
	AccumulateCount int
	// TableSplitter maps an int64 key (presumably a table ID — confirm) to
	// its SplitHelper.
	TableSplitter   map[int64]*SplitHelper
	// Rules maps an int64 key (presumably a table ID — confirm) to its
	// rewrite rules.
	Rules           map[int64]*restoreutils.RewriteRules
}

func NewBaseSplitStrategy

func NewBaseSplitStrategy(rules map[int64]*restoreutils.RewriteRules) *BaseSplitStrategy

func (*BaseSplitStrategy) GetAccumulations

func (b *BaseSplitStrategy) GetAccumulations() *SplitHelperIterator

func (*BaseSplitStrategy) ResetAccumulations

func (b *BaseSplitStrategy) ResetAccumulations()

type ClientOptionalParameter

// ClientOptionalParameter is a function applied to a *pdClient. NewClient
// accepts a variadic list of them; WithOnSplit and WithRawKV return values of
// this type.
type ClientOptionalParameter func(*pdClient)

func WithOnSplit

func WithOnSplit(onSplit func(key [][]byte)) ClientOptionalParameter

WithOnSplit sets a callback function to be called after each split.

func WithRawKV

func WithRawKV() ClientOptionalParameter

WithRawKV sets the client to use raw kv mode.

type ExponentialBackoffer

// ExponentialBackoffer is a backoffer configured by an attempt count and a
// base backoff duration.
type ExponentialBackoffer struct {
	// Attempts is an attempt counter; presumably the number of attempts that
	// remain (Attempt is documented as returning the remaining attempts) —
	// confirm.
	Attempts    int
	// BaseBackoff is the base duration for backing off. NOTE(review): how it
	// grows between attempts is not visible here.
	BaseBackoff time.Duration
}

ExponentialBackoffer trivially retries any error it meets. It's useful when the caller has already handled the errors and only wants a more semantic backoff implementation.

func (*ExponentialBackoffer) Attempt

func (b *ExponentialBackoffer) Attempt() int

Attempt returns the number of remaining attempts.

func (*ExponentialBackoffer) NextBackoff

func (b *ExponentialBackoffer) NextBackoff(error) time.Duration

NextBackoff returns a duration to wait before retrying again.

type FakePDClient

// FakePDClient embeds pd.Client, so it satisfies that interface; its other
// fields are unexported. Create one with NewFakePDClient.
type FakePDClient struct {
	pd.Client
	// contains filtered or unexported fields
}

func NewFakePDClient

func NewFakePDClient(stores []*metapb.Store, notLeader bool, retryTime *int) *FakePDClient

func (*FakePDClient) BatchScanRegions

func (fpdc *FakePDClient) BatchScanRegions(
	ctx context.Context,
	ranges []pd.KeyRange,
	limit int,
	opts ...pd.GetRegionOption,
) ([]*pd.Region, error)

func (*FakePDClient) GetAllStores

func (fpdc *FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error)

func (*FakePDClient) GetTS

func (fpdc *FakePDClient) GetTS(ctx context.Context) (int64, int64, error)

func (*FakePDClient) ScanRegions

func (fpdc *FakePDClient) ScanRegions(
	ctx context.Context,
	key, endKey []byte,
	limit int,
	opts ...pd.GetRegionOption,
) ([]*pd.Region, error)

func (*FakePDClient) SetRegions

func (fpdc *FakePDClient) SetRegions(regions []*pd.Region)

type FakePDHTTPClient

// FakePDHTTPClient embeds pdhttp.Client, so it satisfies that interface; its
// other fields are unexported. Create one with NewFakePDHTTPClient.
type FakePDHTTPClient struct {
	pdhttp.Client
	// contains filtered or unexported fields
}

func NewFakePDHTTPClient

func NewFakePDHTTPClient() *FakePDHTTPClient

func (*FakePDHTTPClient) DeletePlacementRule

func (fpdh *FakePDHTTPClient) DeletePlacementRule(_ context.Context, groupID string, ruleID string) error

func (*FakePDHTTPClient) GetConfig

func (fpdh *FakePDHTTPClient) GetConfig(_ context.Context) (map[string]any, error)

func (*FakePDHTTPClient) GetDelaySchedulers

func (fpdh *FakePDHTTPClient) GetDelaySchedulers() map[string]struct{}

func (*FakePDHTTPClient) GetPlacementRule

func (fpdh *FakePDHTTPClient) GetPlacementRule(_ context.Context, groupID string, ruleID string) (*pdhttp.Rule, error)

func (*FakePDHTTPClient) GetScheduleConfig

func (fpdh *FakePDHTTPClient) GetScheduleConfig(_ context.Context) (map[string]any, error)

func (*FakePDHTTPClient) GetSchedulers

func (fpdh *FakePDHTTPClient) GetSchedulers(_ context.Context) ([]string, error)

func (*FakePDHTTPClient) SetConfig

func (fpdh *FakePDHTTPClient) SetConfig(_ context.Context, config map[string]any, ttl ...float64) error

func (*FakePDHTTPClient) SetPlacementRule

func (fpdh *FakePDHTTPClient) SetPlacementRule(_ context.Context, rule *pdhttp.Rule) error

func (*FakePDHTTPClient) SetSchedulerDelay

func (fpdh *FakePDHTTPClient) SetSchedulerDelay(_ context.Context, key string, delay int64) error

type FakeSplitClient

// FakeSplitClient embeds SplitClient, so it satisfies that interface; its
// other fields are unexported. Create one with NewFakeSplitClient.
type FakeSplitClient struct {
	SplitClient
	// contains filtered or unexported fields
}

func NewFakeSplitClient

func NewFakeSplitClient() *FakeSplitClient

func (*FakeSplitClient) AppendPdRegion

func (f *FakeSplitClient) AppendPdRegion(region *pd.Region)

func (*FakeSplitClient) AppendRegion

func (f *FakeSplitClient) AppendRegion(startKey, endKey []byte)

func (*FakeSplitClient) ScanRegions

func (f *FakeSplitClient) ScanRegions(
	ctx context.Context,
	startKey, endKey []byte,
	limit int,
) ([]*RegionInfo, error)

func (*FakeSplitClient) WaitRegionsScattered

func (f *FakeSplitClient) WaitRegionsScattered(context.Context, []*RegionInfo) (int, error)

type MockPDClientForSplit

// MockPDClientForSplit embeds pd.Client and exposes the region tree it holds.
type MockPDClientForSplit struct {
	pd.Client

	// Regions is the region tree held by the mock.
	Regions *pdtypes.RegionTree
	// contains filtered or unexported fields
}

MockPDClientForSplit is a mock PD client for testing split and scatter.

func NewMockPDClientForSplit

func NewMockPDClientForSplit() *MockPDClientForSplit

NewMockPDClientForSplit creates a new MockPDClientForSplit.

func (*MockPDClientForSplit) BatchScanRegions

func (c *MockPDClientForSplit) BatchScanRegions(
	_ context.Context,
	keyRanges []pd.KeyRange,
	limit int,
	_ ...pd.GetRegionOption,
) ([]*pd.Region, error)

func (*MockPDClientForSplit) GetOperator

func (c *MockPDClientForSplit) GetOperator(_ context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)

func (*MockPDClientForSplit) GetRegionByID

func (c *MockPDClientForSplit) GetRegionByID(_ context.Context, regionID uint64, _ ...pd.GetRegionOption) (*pd.Region, error)

func (*MockPDClientForSplit) GetStore

func (c *MockPDClientForSplit) GetStore(_ context.Context, storeID uint64) (*metapb.Store, error)

func (*MockPDClientForSplit) ScanRegions

func (c *MockPDClientForSplit) ScanRegions(
	_ context.Context,
	key, endKey []byte,
	limit int,
	_ ...pd.GetRegionOption,
) ([]*pd.Region, error)

func (*MockPDClientForSplit) ScatterRegion

func (c *MockPDClientForSplit) ScatterRegion(_ context.Context, regionID uint64) error

func (*MockPDClientForSplit) ScatterRegions

func (c *MockPDClientForSplit) ScatterRegions(_ context.Context, regionIDs []uint64, _ ...pd.RegionsOption) (*pdpb.ScatterRegionResponse, error)

func (*MockPDClientForSplit) SetRegions

func (c *MockPDClientForSplit) SetRegions(boundaries [][]byte) []*metapb.Region

func (*MockPDClientForSplit) SetStores

func (c *MockPDClientForSplit) SetStores(stores map[uint64]*metapb.Store)

func (*MockPDClientForSplit) SplitRegion

func (c *MockPDClientForSplit) SplitRegion(
	region *RegionInfo,
	keys [][]byte,
	isRawKV bool,
) (bool, *kvrpcpb.SplitRegionResponse, error)

type PipelineRegionsSplitter

// PipelineRegionsSplitter extends Splitter with ExecuteRegions, which takes a
// SplitHelperIterator instead of a list of keys.
type PipelineRegionsSplitter interface {
	Splitter
	ExecuteRegions(ctx context.Context, splitHelper *SplitHelperIterator) error // Method for executing pipeline-based splitting
}

PipelineRegionsSplitter defines the interface for advanced (pipeline) splitting strategies. Log and compacted SST file restores need to use this to split regions after a full restore, and the splitting must be performed in a controlled manner, so we choose to split and restore in a continuous flow.

func NewPipelineRegionsSplitter

func NewPipelineRegionsSplitter(
	client SplitClient,
	splitSize uint64,
	splitKeys int64,
) PipelineRegionsSplitter

type PipelineRegionsSplitterImpl

// PipelineRegionsSplitterImpl embeds *RegionSplitter and adds an
// ExecuteRegions method; its other fields are unexported.
type PipelineRegionsSplitterImpl struct {
	*RegionSplitter
	// contains filtered or unexported fields
}

func (*PipelineRegionsSplitterImpl) ExecuteRegions

func (r *PipelineRegionsSplitterImpl) ExecuteRegions(ctx context.Context, splitHelper *SplitHelperIterator) error

type RegionInfo

// RegionInfo groups a region with its leader peer and its lists of pending
// and down peers.
type RegionInfo struct {
	// Region is the region metadata.
	Region       *metapb.Region
	// Leader is the leader peer of the region.
	Leader       *metapb.Peer
	// PendingPeers lists the pending peers of the region.
	PendingPeers []*metapb.Peer
	// DownPeers lists the down peers of the region.
	DownPeers    []*metapb.Peer
}

RegionInfo includes a region and the leader of the region.

func PaginateScanRegion

func PaginateScanRegion(
	ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
) ([]*RegionInfo, error)

PaginateScanRegion scans regions with limit-based pagination and returns all regions at once. The returned regions are continuous and cover the key range. If they are not, or if it meets errors, it will retry internally.

func ScanRegionsWithRetry

func ScanRegionsWithRetry(
	ctx context.Context, client SplitClient, startKey, endKey []byte, limit int,
) ([]*RegionInfo, error)

func (*RegionInfo) ContainsInterior

func (region *RegionInfo) ContainsInterior(key []byte) bool

ContainsInterior returns whether the region contains the given key, and also that the key does not fall on the boundary (start key) of the region.

func (*RegionInfo) ToZapFields

func (region *RegionInfo) ToZapFields() zap.Field

ToZapFields returns zap fields for the RegionInfo. It can handle nil RegionInfo.

type RegionSplitter

// RegionSplitter has only unexported state; create one with NewRegionSplitter,
// which takes the SplitClient to use.
type RegionSplitter struct {
	// contains filtered or unexported fields
}

RegionSplitter is an executor of region splits by rules.

func NewRegionSplitter

func NewRegionSplitter(client SplitClient) *RegionSplitter

NewRegionSplitter returns a new RegionSplitter.

func (*RegionSplitter) ExecuteSortedKeys

func (rs *RegionSplitter) ExecuteSortedKeys(
	ctx context.Context,
	sortedSplitKeys [][]byte,
) error

ExecuteSortedKeys executes the region split and makes sure the newly split regions are balanced. It will split regions by the rewrite rules, then it will split regions by the end key of each range. tableRules includes the prefix of a table, since some ranges may have a prefix with record sequence or index sequence. Note: all ranges and rewrite rules must have raw keys.

func (*RegionSplitter) ExecuteSortedKeysOnRegion

func (rs *RegionSplitter) ExecuteSortedKeysOnRegion(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error)

ExecuteSortedKeysOnRegion exposes the function `SplitWaitAndScatter` of the split client.

func (*RegionSplitter) WaitForScatterRegionsTimeout

func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regionInfos []*RegionInfo, timeout time.Duration) int

type RewriteSplitter

// RewriteSplitter exposes a rewrite key; its other fields are unexported.
// Create one with NewRewriteSpliter.
type RewriteSplitter struct {
	// RewriteKey is a key in raw bytes. NOTE(review): presumably the rewritten
	// (post-rule) key for the table — confirm.
	RewriteKey []byte
	// contains filtered or unexported fields
}

func NewRewriteSpliter

func NewRewriteSpliter(
	rewriteKey []byte,
	tableID int64,
	rule *restoreutils.RewriteRules,
	splitter *SplitHelper,
) *RewriteSplitter

type Span

// Span is an alias of kv.KeyRange, so the two types are interchangeable.
type Span = kv.KeyRange

Span is the type of an adjacent sub key space.

type SplitClient

// SplitClient is the set of operations needed to look up, split and scatter
// regions, and to manage placement rules and store labels.
type SplitClient interface {
	// GetStore gets a store by a store id.
	GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error)
	// GetRegion gets a region which includes a specified key.
	GetRegion(ctx context.Context, key []byte) (*RegionInfo, error)
	// GetRegionByID gets a region by a region id.
	GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error)
	// SplitKeysAndScatter splits the related regions of the keys and scatters the
	// new regions. It returns the new regions that need to be called with
	// WaitRegionsScattered.
	SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][]byte) ([]*RegionInfo, error)

	// SplitWaitAndScatter splits a region from a batch of keys, waits for the split
	// to finish, and scatters the new regions. It will return the original region,
	// new regions and error. The input keys should not be encoded.
	//
	// The split step has a few retry times. If it meets an error, the error is
	// returned directly.
	//
	// The split waiting step has a backoff retry logic: if the split has made
	// progress, it will not increase the retry counter. Otherwise, it will retry
	// for about 1h. If the retry times out, it will log a warning and continue.
	//
	// The scatter step has a few retry times. If it meets an error, it will log a
	// warning and continue.
	// TODO(lance6716): remove this function in interface after BR uses SplitKeysAndScatter.
	SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error)
	// GetOperator gets the status of operator of the specified region.
	GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
	// ScanRegions gets a list of regions, starting from the region that contains key.
	// Limit limits the maximum number of regions returned.
	ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error)
	// GetPlacementRule loads a placement rule from PD.
	GetPlacementRule(ctx context.Context, groupID, ruleID string) (*pdhttp.Rule, error)
	// SetPlacementRule inserts or updates a placement rule in PD.
	SetPlacementRule(ctx context.Context, rule *pdhttp.Rule) error
	// DeletePlacementRule removes a placement rule from PD.
	DeletePlacementRule(ctx context.Context, groupID, ruleID string) error
	// SetStoresLabel adds or updates the specified label of stores. If labelValue
	// is empty, it clears the label.
	SetStoresLabel(ctx context.Context, stores []uint64, labelKey, labelValue string) error
	// WaitRegionsScattered waits for an already started scatter region action to
	// finish. Internally it will backoff and retry at the maximum interval of 2
	// seconds. If the scatter makes progress during the retry, it will not decrease
	// the retry counter. If there's always no progress, it will retry for about 1h.
	// Caller can set the context timeout to control the max waiting time.
	//
	// The first return value is always the number of regions that are not finished
	// scattering no matter what the error is.
	WaitRegionsScattered(ctx context.Context, regionInfos []*RegionInfo) (notFinished int, err error)
}

SplitClient is an external client used by RegionSplitter.

func NewClient

func NewClient(
	client pd.Client,
	httpCli pdhttp.Client,
	tlsConf *tls.Config,
	splitBatchKeyCnt int,
	splitConcurrency int,
	opts ...ClientOptionalParameter,
) SplitClient

NewClient creates a SplitClient.

splitBatchKeyCnt controls how many keys are sent to TiKV in a batch in split region API. splitConcurrency controls how many regions are split concurrently.

type SplitHelper

// SplitHelper has only unexported state; create one with NewSplitHelper and
// use Merge and Traverse to update and read it.
type SplitHelper struct {
	// contains filtered or unexported fields
}

SplitHelper represents a set of valued ranges, which don't overlap and whose union is the full key space.

func NewSplitHelper

func NewSplitHelper() *SplitHelper

NewSplitHelper creates a set of spans, with the full key space as its initial state.

func (*SplitHelper) Merge

func (f *SplitHelper) Merge(val Valued)

func (*SplitHelper) Traverse

func (f *SplitHelper) Traverse(m func(Valued) bool)

Traverse visits the items in ascending order.

type SplitHelperIterator

// SplitHelperIterator has only unexported state; create one from a list of
// RewriteSplitters with NewSplitHelperIterator and walk it with Traverse.
type SplitHelperIterator struct {
	// contains filtered or unexported fields
}

func NewSplitHelperIterator

func NewSplitHelperIterator(tableSplitters []*RewriteSplitter) *SplitHelperIterator

func (*SplitHelperIterator) Traverse

func (iter *SplitHelperIterator) Traverse(fn func(v Valued, endKey []byte, rule *restoreutils.RewriteRules) bool)

type SplitStrategy

// SplitStrategy is a generic interface over the accumulated value type T.
type SplitStrategy[T any] interface {
	// Accumulate adds a new value into the split strategy's internal state.
	// This method accumulates data points or values, preparing them for potential splitting.
	Accumulate(T)
	// ShouldSplit checks if the accumulated values meet the criteria for triggering a split.
	ShouldSplit() bool
	// ShouldSkip reports whether the given value should be skipped, e.g. files
	// already covered by checkpoints or invalid files.
	ShouldSkip(T) bool
	// GetAccumulations returns an iterator for the accumulated values.
	GetAccumulations() *SplitHelperIterator
	// ResetAccumulations resets the buffer for the next round.
	ResetAccumulations()
}

SplitStrategy defines how values should be accumulated and when to trigger a split.

type Splitter

// Splitter groups the basic region-splitting operations: splitting keys
// within one region, splitting a sorted list of keys, and waiting for
// scattering to finish.
type Splitter interface {
	// ExecuteSortedKeysOnRegion splits the keys within a single region and initiates scattering
	// after the region has been successfully split.
	ExecuteSortedKeysOnRegion(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error)

	// ExecuteSortedKeys splits all provided keys while ensuring that the newly created
	// regions are balanced.
	ExecuteSortedKeys(ctx context.Context, keys [][]byte) error

	// WaitForScatterRegionsTimeout blocks until all regions have finished scattering,
	// or until the specified timeout duration has elapsed.
	// NOTE(review): the meaning of the int result is not stated here; presumably
	// the number of regions that have not finished scattering — confirm.
	WaitForScatterRegionsTimeout(ctx context.Context, regionInfos []*RegionInfo, timeout time.Duration) int
}

Splitter defines the interface for basic splitting strategies.

type TestClient

// TestClient embeds both SplitClient and pd.Client and keeps regions in
// exported fields.
//
// NOTE(review): both embedded interfaces may declare methods with the same
// name; such selectors would be ambiguous unless TestClient defines the method
// itself — confirm when adding new calls.
type TestClient struct {
	SplitClient
	pd.Client

	// Regions maps a uint64 key (presumably the region ID — confirm) to its
	// RegionInfo.
	Regions     map[uint64]*RegionInfo
	RegionsInfo *pdtypes.RegionTree // For now it's only used in ScanRegions

	// InjectErr and InjectTimes presumably control error injection (whether to
	// inject and how many times) — TODO confirm against the methods that read them.
	InjectErr   bool
	InjectTimes int32
	// contains filtered or unexported fields
}

TODO: consolidate TestClient, MockPDClientForSplit and FakePDClient into one test client.

func NewTestClient

func NewTestClient(
	stores map[uint64]*metapb.Store,
	regions map[uint64]*RegionInfo,
	nextRegionID uint64,
) *TestClient

func (*TestClient) GetAllRegions

func (c *TestClient) GetAllRegions() map[uint64]*RegionInfo

func (*TestClient) GetOperator

func (*TestClient) GetPDClient

func (c *TestClient) GetPDClient() *FakePDClient

func (*TestClient) GetRegion

func (c *TestClient) GetRegion(ctx context.Context, key []byte) (*RegionInfo, error)

func (*TestClient) GetRegionByID

func (c *TestClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error)

func (*TestClient) GetStore

func (c *TestClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error)

func (*TestClient) ScanRegions

func (c *TestClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error)

func (*TestClient) SplitWaitAndScatter

func (c *TestClient) SplitWaitAndScatter(_ context.Context, _ *RegionInfo, keys [][]byte) ([]*RegionInfo, error)

func (*TestClient) WaitRegionsScattered

func (c *TestClient) WaitRegionsScattered(context.Context, []*RegionInfo) (int, error)

type Value

// Value carries a size and a number.
// NOTE(review): units are not visible here; presumably Size is in bytes and
// Number is a count of keys or entries — confirm.
type Value struct {
	Size   uint64
	Number int64
}

Value is the type of the values stored in the span tree.

type Valued

// Valued pairs a Span with the Value attached to it.
type Valued struct {
	// Key is the span (key range) of this entry.
	Key   Span
	// Value is the value bound to Key.
	Value Value
}

Valued is a span bound to a value, which is the entry type of the span tree.

func NewValued

func NewValued(startKey, endKey []byte, value Value) Valued

func (Valued) GetEndKey

func (v Valued) GetEndKey() []byte

GetEndKey is implemented for `AppliedFile`.

func (Valued) GetStartKey

func (v Valued) GetStartKey() []byte

GetStartKey is implemented for `AppliedFile`.

func (Valued) Less

func (v Valued) Less(other btree.Item) bool

func (Valued) String

func (v Valued) String() string

type WaitRegionOnlineBackoffer

// WaitRegionOnlineBackoffer is a backoffer that keeps its retry state in the
// exported Stat field; create one with NewWaitRegionOnlineBackoffer.
type WaitRegionOnlineBackoffer struct {
	// Stat is the retry state of this backoffer.
	Stat utils.RetryState
}

TODO: merge with backoff.go

func NewWaitRegionOnlineBackoffer

func NewWaitRegionOnlineBackoffer() *WaitRegionOnlineBackoffer

NewWaitRegionOnlineBackoffer creates a backoffer to wait for regions to come online.

func (*WaitRegionOnlineBackoffer) NextBackoff

func (b *WaitRegionOnlineBackoffer) NextBackoff(err error) time.Duration

NextBackoff returns a duration to wait before retrying again.

func (*WaitRegionOnlineBackoffer) RemainingAttempts

func (b *WaitRegionOnlineBackoffer) RemainingAttempts() int

RemainingAttempts returns the number of remaining attempts.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL