external

package
v1.1.0-beta.0...-c0b86a3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: Apache-2.0, Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultBlockSize = 16 * units.MiB

DefaultBlockSize is the default block size for preAllocKVBuf.

View Source
const (
	// DefaultMemSizeLimit is the default memory size limit for writer.
	DefaultMemSizeLimit = 256 * size.MB
)

Variables

View Source
var (
	// ConcurrentReaderBufferSizePerConc is the buffer size for concurrent reader per
	// concurrency.
	ConcurrentReaderBufferSizePerConc = 4 * 1024 * 1024
	// ConcurrentReaderConcurrency is the concurrency for concurrent reader.
	ConcurrentReaderConcurrency = 8
)
View Source
var (

	// MergeSortOverlapThreshold is the threshold of overlap between sorted kv files.
	// if the overlap ratio is greater than this threshold, we will merge the files.
	MergeSortOverlapThreshold int64 = 1000
	// MergeSortFileCountStep is the step of file count when we split the sorted kv files.
	MergeSortFileCountStep = 1000
)
View Source
var LargeRegionSplitDataThreshold = int(config.SplitRegionSize)

LargeRegionSplitDataThreshold is exposed for test.

Functions

func BytesMax

func BytesMax(a, b []byte) []byte

BytesMax returns the largest of byte slice a and b.

func BytesMin

func BytesMin(a, b []byte) []byte

BytesMin returns the smallest of byte slice a and b.

func CleanUpFiles

func CleanUpFiles(ctx context.Context, store storage.ExternalStorage, subDir string) error

CleanUpFiles deletes all data and stat files under one subDir.

func GetAllFileNames

func GetAllFileNames(
	ctx context.Context,
	store storage.ExternalStorage,
	subDir string,
) ([]string, []string, error)

GetAllFileNames returns data file paths and stat file paths. Both paths are sorted.

func GetMaxOverlapping

func GetMaxOverlapping(points []Endpoint) int64

GetMaxOverlapping returns the maximum overlapping weight treating given `points` as endpoints of intervals. `points` are not required to be sorted, and will be sorted in-place in this function.

func GetMaxOverlappingTotal

func GetMaxOverlappingTotal(stats []MultipleFilesStat) int64

GetMaxOverlappingTotal assumes the most overlapping case from the given stats and returns the overlapping level.

func MergeOverlappingFiles

func MergeOverlappingFiles(ctx context.Context, paths []string, store storage.ExternalStorage, readBufferSize int,
	newFilePrefix string, blockSize int, writeBatchCount uint64, propSizeDist uint64, propKeysDist uint64,
	onClose OnCloseFunc, concurrency int, checkHotspot bool) error

MergeOverlappingFiles reads from given files whose key range may overlap and writes to new sorted, nonoverlapping files.

func MockExternalEngine

func MockExternalEngine(
	storage storage.ExternalStorage,
	keys [][]byte,
	values [][]byte,
) (dataFiles []string, statsFiles []string, err error)

MockExternalEngine generates an external engine with the given keys and values.

func MockExternalEngineWithWriter

func MockExternalEngineWithWriter(
	storage storage.ExternalStorage,
	writer *Writer,
	subDir string,
	keys [][]byte,
	values [][]byte,
) (dataFiles []string, statsFiles []string, err error)

MockExternalEngineWithWriter generates an external engine with the given writer, keys and values.

func NewExternalEngine

func NewExternalEngine(
	storage storage.ExternalStorage,
	dataFiles []string,
	statsFiles []string,
	startKey []byte,
	endKey []byte,
	splitKeys [][]byte,
	regionSplitSize int64,
	keyAdapter common.KeyAdapter,
	duplicateDetection bool,
	duplicateDB *pebble.DB,
	dupDetectOpt common.DupDetectOpt,
	workerConcurrency int,
	ts uint64,
	totalKVSize int64,
	totalKVCount int64,
	checkHotspot bool,
) common.Engine

NewExternalEngine creates an (external) engine.

Types

type Endpoint

type Endpoint struct {
	Key    []byte
	Tp     EndpointTp
	Weight int64 // all EndpointTp use positive weight
}

Endpoint represents an endpoint of an interval which can be used by GetMaxOverlapping.

type EndpointTp

type EndpointTp int

EndpointTp is the type of Endpoint.Key.

const (
	// ExclusiveEnd represents "..., Endpoint.Key)".
	ExclusiveEnd EndpointTp = iota
	// InclusiveStart represents "[Endpoint.Key, ...".
	InclusiveStart
	// InclusiveEnd represents "..., Endpoint.Key]".
	InclusiveEnd
)

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine stores sorted key/value pairs in an external storage.

func (*Engine) Close

func (e *Engine) Close() error

Close implements common.Engine.

func (*Engine) GetKeyRange

func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error)

GetKeyRange implements common.Engine.

func (*Engine) ID

func (e *Engine) ID() string

ID is the identifier of an engine.

func (*Engine) ImportedStatistics

func (e *Engine) ImportedStatistics() (importedSize int64, importedKVCount int64)

ImportedStatistics returns the imported kv size and imported kv count.

func (*Engine) KVStatistics

func (e *Engine) KVStatistics() (totalKVSize int64, totalKVCount int64)

KVStatistics returns the total kv size and total kv count.

func (*Engine) LoadIngestData

func (e *Engine) LoadIngestData(
	ctx context.Context,
	regionRanges []common.Range,
	outCh chan<- common.DataAndRange,
) error

LoadIngestData loads the data in the [start, end) range from the external storage into memory, so that the local backend can ingest it. The byte slices used by the ingest data are allocated from Engine.bufPool and must be released by MemoryIngestData.DecRef().

func (*Engine) SplitRanges

func (e *Engine) SplitRanges(
	startKey, endKey []byte,
	_, _ int64,
	_ log.Logger,
) ([]common.Range, error)

SplitRanges splits the ranges by the split keys provided by the external engine.

type EngineWriter

type EngineWriter struct {
	// contains filtered or unexported fields
}

EngineWriter implements backend.EngineWriter interface.

func NewEngineWriter

func NewEngineWriter(w *Writer) *EngineWriter

NewEngineWriter creates a new EngineWriter.

func (*EngineWriter) AppendRows

func (e *EngineWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error

AppendRows implements backend.EngineWriter interface.

func (*EngineWriter) Close

Close implements backend.EngineWriter interface.

func (*EngineWriter) IsSynced

func (e *EngineWriter) IsSynced() bool

IsSynced implements backend.EngineWriter interface.

type KeyValueStore

type KeyValueStore struct {
	// contains filtered or unexported fields
}

KeyValueStore stores key-value pairs and maintains the range properties.

func NewKeyValueStore

func NewKeyValueStore(
	ctx context.Context,
	dataWriter storage.ExternalFileWriter,
	rangePropertiesCollector *rangePropertiesCollector,
) (*KeyValueStore, error)

NewKeyValueStore creates a new KeyValueStore. The data will be written to the given dataWriter and range properties will be maintained in the given rangePropertiesCollector.

func (*KeyValueStore) Close

func (s *KeyValueStore) Close()

Close closes the KeyValueStore and appends the last range property.

type MemoryIngestData

type MemoryIngestData struct {
	// contains filtered or unexported fields
}

MemoryIngestData is the in-memory implementation of IngestData.

func (*MemoryIngestData) DecRef

func (m *MemoryIngestData) DecRef()

DecRef implements IngestData.DecRef.

func (*MemoryIngestData) Finish

func (m *MemoryIngestData) Finish(totalBytes, totalCount int64)

Finish implements IngestData.Finish.

func (*MemoryIngestData) GetFirstAndLastKey

func (m *MemoryIngestData) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)

GetFirstAndLastKey implements IngestData.GetFirstAndLastKey.

func (*MemoryIngestData) GetTS

func (m *MemoryIngestData) GetTS() uint64

GetTS implements IngestData.GetTS.

func (*MemoryIngestData) IncRef

func (m *MemoryIngestData) IncRef()

IncRef implements IngestData.IncRef.

func (*MemoryIngestData) NewIter

func (m *MemoryIngestData) NewIter(ctx context.Context, lowerBound, upperBound []byte) common.ForwardIter

NewIter implements IngestData.NewIter.

type MergeKVIter

type MergeKVIter struct {
	// contains filtered or unexported fields
}

MergeKVIter is an iterator that merges multiple sorted KV pairs from different files.

func NewMergeKVIter

func NewMergeKVIter(
	ctx context.Context,
	paths []string,
	pathsStartOffset []uint64,
	exStorage storage.ExternalStorage,
	readBufferSize int,
	checkHotspot bool,
) (*MergeKVIter, error)

NewMergeKVIter creates a new MergeKVIter. The KV can be accessed by calling Next() and then Key() or Value(). readBufferSize is the buffer size for each file reader, which means the total memory usage is readBufferSize * len(paths).

func (*MergeKVIter) Close

func (i *MergeKVIter) Close() error

Close closes the iterator.

func (*MergeKVIter) Error

func (i *MergeKVIter) Error() error

Error returns the error of the iterator.

func (*MergeKVIter) Key

func (i *MergeKVIter) Key() []byte

Key returns the current key.

func (*MergeKVIter) Next

func (i *MergeKVIter) Next() bool

Next moves the iterator to the next position. When it returns false, the iterator is not usable.

func (*MergeKVIter) Value

func (i *MergeKVIter) Value() []byte

Value returns the current value.

type MergePropIter

type MergePropIter struct {
	// contains filtered or unexported fields
}

MergePropIter is an iterator that merges multiple range properties from different files.

func NewMergePropIter

func NewMergePropIter(
	ctx context.Context,
	paths []string,
	exStorage storage.ExternalStorage,
	checkHotSpot bool,
) (*MergePropIter, error)

NewMergePropIter creates a new MergePropIter.

func (*MergePropIter) Close

func (i *MergePropIter) Close() error

Close closes the iterator.

func (*MergePropIter) Error

func (i *MergePropIter) Error() error

Error returns the error of the iterator.

func (*MergePropIter) Next

func (i *MergePropIter) Next() bool

Next moves the iterator to the next position.

type MultipleFilesStat

type MultipleFilesStat struct {
	MinKey            tidbkv.Key  `json:"min-key"`
	MaxKey            tidbkv.Key  `json:"max-key"`
	Filenames         [][2]string `json:"filenames"` // [dataFile, statFile]
	MaxOverlappingNum int64       `json:"max-overlapping-num"`
}

MultipleFilesStat is the statistic information of multiple files (currently every 500 files). It is used to estimate the data overlapping, because per-file statistic information may be too big to be loaded into memory.

type OnCloseFunc

type OnCloseFunc func(summary *WriterSummary)

OnCloseFunc is the callback function when a writer is closed.

type RangeSplitter

type RangeSplitter struct {
	// contains filtered or unexported fields
}

RangeSplitter is used to split key ranges of an external engine. It will return one group of ranges by invoking `SplitOneRangesGroup` once.

func NewRangeSplitter

func NewRangeSplitter(
	ctx context.Context,
	dataFiles, statFiles []string,
	externalStorage storage.ExternalStorage,
	rangesGroupSize, rangesGroupKeys int64,
	maxRangeSize, maxRangeKeys int64,
	checkHotSpot bool,
) (*RangeSplitter, error)

NewRangeSplitter creates a new RangeSplitter. `dataFiles` and `statFiles` must correspond to each other. `rangesGroupSize` and `rangesGroupKeys` control the total range group size of one `SplitOneRangesGroup` invocation, while `maxRangeSize` and `maxRangeKeys` control the size of one range.

func (*RangeSplitter) Close

func (r *RangeSplitter) Close() error

Close releases the resources of RangeSplitter.

func (*RangeSplitter) GetRangeSplitSize

func (r *RangeSplitter) GetRangeSplitSize() int64

GetRangeSplitSize returns the expected size of one range.

func (*RangeSplitter) SplitOneRangesGroup

func (r *RangeSplitter) SplitOneRangesGroup() (
	endKeyOfGroup []byte,
	dataFiles []string,
	statFiles []string,
	rangeSplitKeys [][]byte,
	err error,
)

SplitOneRangesGroup splits one group of ranges. `endKeyOfGroup` represents the end key of the group, but it will be nil when the group is the last one. `dataFiles` and `statFiles` are all the files that have overlapping key ranges in this group. `rangeSplitKeys` are the internal split keys of the ranges in this group.

type SortedKVMeta

type SortedKVMeta struct {
	StartKey           []byte              `json:"start-key"`
	EndKey             []byte              `json:"end-key"` // exclusive
	TotalKVSize        uint64              `json:"total-kv-size"`
	MultipleFilesStats []MultipleFilesStat `json:"multiple-files-stats"`
}

SortedKVMeta is the meta of sorted kv.

func NewSortedKVMeta

func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta

NewSortedKVMeta creates a SortedKVMeta from a WriterSummary. If the summary is empty, it will return a pointer to zero SortedKVMeta.

func (*SortedKVMeta) GetDataFiles

func (m *SortedKVMeta) GetDataFiles() []string

GetDataFiles returns all data files in the meta.

func (*SortedKVMeta) GetStatFiles

func (m *SortedKVMeta) GetStatFiles() []string

GetStatFiles returns all stat files in the meta.

func (*SortedKVMeta) Merge

func (m *SortedKVMeta) Merge(other *SortedKVMeta)

Merge merges the other SortedKVMeta into this one.

func (*SortedKVMeta) MergeSummary

func (m *SortedKVMeta) MergeSummary(summary *WriterSummary)

MergeSummary merges the WriterSummary into this SortedKVMeta.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is used to write data into external storage.

func (*Writer) Close

func (w *Writer) Close(ctx context.Context) error

Close closes the writer.

func (*Writer) LockForWrite

func (w *Writer) LockForWrite() func()

LockForWrite implements ingest.Writer. Since flushKVs is thread-safe in external storage writer, this is implemented as noop.

func (*Writer) WriteRow

func (w *Writer) WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error

WriteRow implements ingest.Writer.

type WriterBuilder

type WriterBuilder struct {
	// contains filtered or unexported fields
}

WriterBuilder builds a new Writer.

func NewWriterBuilder

func NewWriterBuilder() *WriterBuilder

NewWriterBuilder creates a WriterBuilder.

func (*WriterBuilder) Build

func (b *WriterBuilder) Build(
	store storage.ExternalStorage,
	prefix string,
	writerID string,
) *Writer

Build builds a new Writer. The files the writer creates are placed under the prefix "{prefix}/{writerID}".

func (*WriterBuilder) SetBlockSize

func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder

SetBlockSize sets the block size of pre-allocated buf in the writer.

func (*WriterBuilder) SetKeyDuplicationEncoding

func (b *WriterBuilder) SetKeyDuplicationEncoding(val bool) *WriterBuilder

SetKeyDuplicationEncoding sets whether the writer can distinguish duplicate keys.

func (*WriterBuilder) SetMemorySizeLimit

func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder

SetMemorySizeLimit sets the memory size limit of the writer. When accumulated data size exceeds this limit, the writer will flush data as a file to external storage.

func (*WriterBuilder) SetOnCloseFunc

func (b *WriterBuilder) SetOnCloseFunc(onClose OnCloseFunc) *WriterBuilder

SetOnCloseFunc sets the callback function when a writer is closed.

func (*WriterBuilder) SetPropKeysDistance

func (b *WriterBuilder) SetPropKeysDistance(dist uint64) *WriterBuilder

SetPropKeysDistance sets the distance of range keys for each property.

func (*WriterBuilder) SetPropSizeDistance

func (b *WriterBuilder) SetPropSizeDistance(dist uint64) *WriterBuilder

SetPropSizeDistance sets the distance of range size for each property.

func (*WriterBuilder) SetWriterBatchCount

func (b *WriterBuilder) SetWriterBatchCount(count uint64) *WriterBuilder

SetWriterBatchCount sets the batch count of the writer.

type WriterSummary

type WriterSummary struct {
	WriterID string
	Seq      int
	// Min and Max are the min and max key written by this writer, both are
	// inclusive, i.e. [Min, Max].
	// will be empty if no key is written.
	Min                tidbkv.Key
	Max                tidbkv.Key
	TotalSize          uint64
	MultipleFilesStats []MultipleFilesStat
}

WriterSummary is the summary of a writer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL