local

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2023 License: Apache-2.0, Apache-2.0 Imports: 77 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// RunInTest indicates whether the current process is running in test.
	RunInTest bool
	// LastAlloc is the last ID allocator.
	LastAlloc manual.Allocator
)
View Source
var BuildDuplicateTaskForTest = func(m *DuplicateManager) ([]dupTask, error) {
	return m.buildDupTasks()
}

BuildDuplicateTaskForTest is only used for test.

View Source
var CheckTiFlashVersionForTest = checkTiFlashVersion

CheckTiFlashVersionForTest is only used for tests.

View Source
var TiFlashReplicaQueryForTest = tiFlashReplicaQuery

TiFlashReplicaQueryForTest is only used for tests.

Functions

func NewEncodingBuilder

func NewEncodingBuilder(ctx context.Context) backend.EncodingBuilder

NewEncodingBuilder creates an KVEncodingBuilder with local backend implementation.

func NewLocalBackend

func NewLocalBackend(
	ctx context.Context,
	tls *common.TLS,
	cfg *config.Config,
	g glue.Glue,
	maxOpenFiles int,
	errorMgr *errormanager.ErrorManager,
) (backend.Backend, error)

NewLocalBackend creates new connections to tikv.

func NewTargetInfoGetter

func NewTargetInfoGetter(tls *common.TLS, g glue.Glue, pdCli pd.Client) backend.TargetInfoGetter

NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation.

func VerifyRLimit

func VerifyRLimit(estimateMaxFiles Rlim_t) error

VerifyRLimit checks whether the open-file limit is large enough. In Local-backend, we need to read and write a lot of L0 SST files, so we need to check system max open files limit.

Types

type DupKVStream

type DupKVStream interface {
	// Next returns the next key-value pair or any error it encountered.
	// At the end of the stream, the error is io.EOF.
	Next() (key, val []byte, err error)
	// Close closes the stream.
	Close() error
}

DupKVStream is a streaming interface for collecting duplicate key-value pairs.

type DuplicateManager

type DuplicateManager struct {
	// contains filtered or unexported fields
}

DuplicateManager provides methods to collect and decode duplicated KV pairs into row data. The results are stored into the errorMgr.

func NewDuplicateManager

func NewDuplicateManager(
	tbl table.Table,
	tableName string,
	splitCli split.SplitClient,
	tikvCli *tikv.KVStore,
	errMgr *errormanager.ErrorManager,
	sessOpts *kv.SessionOptions,
	concurrency int,
	hasDupe *atomic.Bool,
	logger log.Logger,
) (*DuplicateManager, error)

NewDuplicateManager creates a new DuplicateManager.

func (*DuplicateManager) CollectDuplicateRowsFromDupDB

func (m *DuplicateManager) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter KeyAdapter) error

CollectDuplicateRowsFromDupDB collects duplicates from the duplicate DB and records all duplicate row info into errorMgr.

func (*DuplicateManager) CollectDuplicateRowsFromTiKV

func (m *DuplicateManager) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory) error

CollectDuplicateRowsFromTiKV collects duplicates from the remote TiKV and records all duplicate row info into errorMgr.

func (*DuplicateManager) RecordDataConflictError

func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream DupKVStream) error

RecordDataConflictError records data conflicts to errorMgr. The key received from stream must be a row key.

func (*DuplicateManager) RecordIndexConflictError

func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo) error

RecordIndexConflictError records index conflicts to errorMgr. The key received from stream must be an index key.

type Engine

type Engine struct {
	UUID uuid.UUID
	// contains filtered or unexported fields
}

func (*Engine) Cleanup

func (e *Engine) Cleanup(dataDir string) error

Cleanup remove meta and db files

func (*Engine) Close

func (e *Engine) Close() error

Close closes the engine and release all resources.

func (*Engine) Exist

func (e *Engine) Exist(dataDir string) error

Exist checks if db folder existing (meta sometimes won't flush before lightning exit)

func (*Engine) TotalMemorySize

func (e *Engine) TotalMemorySize() int64

type ImportClientFactory

type ImportClientFactory interface {
	Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error)
	Close()
}

ImportClientFactory is factory to create new import client for specific store.

type Iter

type Iter interface {
	// Seek seek to specify position.
	// if key not found, seeks next key position in iter.
	Seek(key []byte) bool
	// Error return current error on this iter.
	Error() error
	// First moves this iter to the first key.
	First() bool
	// Last moves this iter to the last key.
	Last() bool
	// Valid check this iter reach the end.
	Valid() bool
	// Next moves this iter forward.
	Next() bool
	// Key represents current position pair's key.
	Key() []byte
	// Value represents current position pair's Value.
	Value() []byte
	// Close close this iter.
	Close() error
	// OpType represents operations of pair. currently we have two types.
	// 1. Put
	// 2. Delete
	OpType() sst.Pair_OP
}

Iter abstract iterator method for Ingester.

type KeyAdapter

type KeyAdapter interface {
	// Encode encodes the key with its corresponding rowID. It appends the encoded key to dst and returns the
	// resulting slice. The encoded key is guaranteed to be in ascending order for comparison.
	Encode(dst []byte, key []byte, rowID int64) []byte

	// Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice.
	Decode(dst []byte, data []byte) ([]byte, error)

	// EncodedLen returns the encoded key length.
	EncodedLen(key []byte) int
}

KeyAdapter is used to encode and decode keys.

type LocalDupKVStream

type LocalDupKVStream struct {
	// contains filtered or unexported fields
}

LocalDupKVStream implements the interface of DupKVStream. It collects duplicate key-value pairs from a pebble.DB.

func NewLocalDupKVStream

func NewLocalDupKVStream(dupDB *pebble.DB, keyAdapter KeyAdapter, keyRange tidbkv.KeyRange) *LocalDupKVStream

NewLocalDupKVStream creates a new LocalDupKVStream with the given duplicate db and key range.

func (*LocalDupKVStream) Close

func (s *LocalDupKVStream) Close() error

func (*LocalDupKVStream) Next

func (s *LocalDupKVStream) Next() (key, val []byte, err error)

type Range

type Range struct {
	// contains filtered or unexported fields
}

Range record start and end key for localStoreDir.DB so we can write it to tikv in streaming

type RangePropertiesCollector

type RangePropertiesCollector struct {
	// contains filtered or unexported fields
}

func (*RangePropertiesCollector) Add

func (c *RangePropertiesCollector) Add(key pebble.InternalKey, value []byte) error

Add implements `pebble.TablePropertyCollector`. Add implements `TablePropertyCollector.Add`.

func (*RangePropertiesCollector) Finish

func (c *RangePropertiesCollector) Finish(userProps map[string]string) error

func (*RangePropertiesCollector) Name

func (c *RangePropertiesCollector) Name() string

type RemoteDupKVStream

type RemoteDupKVStream struct {
	// contains filtered or unexported fields
}

RemoteDupKVStream implements the interface of DupKVStream. It collects duplicate key-value pairs from a TiKV region.

func NewRemoteDupKVStream

func NewRemoteDupKVStream(
	ctx context.Context,
	region *split.RegionInfo,
	keyRange tidbkv.KeyRange,
	importClientFactory ImportClientFactory,
) (*RemoteDupKVStream, error)

NewRemoteDupKVStream creates a new RemoteDupKVStream.

func (*RemoteDupKVStream) Close

func (s *RemoteDupKVStream) Close() error

func (*RemoteDupKVStream) Next

func (s *RemoteDupKVStream) Next() (key, val []byte, err error)

type Rlim_t

type Rlim_t = uint64

func GetSystemRLimit

func GetSystemRLimit() (Rlim_t, error)

type StoreWriteLimiter

type StoreWriteLimiter interface {
	WaitN(ctx context.Context, storeID uint64, n int) error
	Limit() int
}

type Writer

type Writer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Writer) AppendRows

func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, rows kv.Rows) error

func (*Writer) Close

func (*Writer) IsSynced

func (w *Writer) IsSynced() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL