backend

package
v0.0.0-...-41b4272 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2022 License: Apache-2.0, Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MakeUUID

func MakeUUID(tableName string, engineID int32) (string, uuid.UUID)

Types

type AbstractBackend

// AbstractBackend is the abstract interface behind Backend. Implementations
// of this interface must be goroutine safe.
type AbstractBackend interface {
	// Close closes the connection to the backend.
	Close()

	// MakeEmptyRows creates an empty collection of encoded rows.
	MakeEmptyRows() kv.Rows

	// RetryImportDelay returns the duration to sleep when retrying an import.
	RetryImportDelay() time.Duration

	// ShouldPostProcess returns whether KV-specific post-processing should be
	// performed for this backend. Post-processing includes checksum and analyze.
	ShouldPostProcess() bool

	// NewEncoder creates an encoder of a TiDB table.
	NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)

	// OpenEngine opens the engine identified by engineUUID with the given
	// config.
	// NOTE(review): presumably the first step of the normal sequence
	// (Open -> Write -> Close -> Import) — confirm against implementations.
	OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error

	// CloseEngine closes the engine identified by engineUUID with the given
	// config.
	// NOTE(review): presumably the Close step of the normal sequence
	// (Open -> Write -> Close -> Import) — confirm against implementations.
	CloseEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error

	// ImportEngine imports engine data to the backend. If it returns
	// ErrDuplicateDetected, a duplicate was detected. In this situation, all
	// data in the engine must be imported. It's safe to reset or clean up this
	// engine.
	// NOTE(review): the unit and exact meaning of regionSplitSize are not
	// visible here — confirm against implementations.
	ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error

	// CleanupEngine cleans up the engine identified by engineUUID.
	// NOTE(review): presumably removes the engine's intermediate data, as
	// ClosedEngine.Cleanup is documented to do — confirm.
	CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

	// CheckRequirements checks whether the backend satisfies the version
	// requirements.
	CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error

	// FetchRemoteTableModels obtains the models of all tables given the schema
	// name. The returned table info does not need to be precise if the encoder
	// does not require it, but must at least fill in the following fields for
	// TablesFromMeta to succeed:
	//  - Name
	//  - State (must be model.StatePublic)
	//  - ID
	//  - Columns
	//     * Name
	//     * State (must be model.StatePublic)
	//     * Offset (must be 0, 1, 2, ...)
	//  - PKIsHandle (true = do not generate _tidb_rowid)
	FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

	// FlushEngine ensures all KV pairs written to an open engine have been
	// synchronized, such that kill-9'ing Lightning afterwards and resuming from
	// checkpoint can recover the exact same content.
	//
	// This method is only relevant for the local backend, and is a no-op for
	// all other backends.
	FlushEngine(ctx context.Context, engineUUID uuid.UUID) error

	// FlushAllEngines performs FlushEngine on all opened engines. This is a
	// very expensive operation and should only be used in rare situations
	// (e.g. preparing to resolve a disk quota violation).
	FlushAllEngines(ctx context.Context) error

	// EngineFileSizes obtains the size occupied locally by all engines managed
	// by this backend. This method is used to compute disk quota.
	// It can return nil if the contents are all stored remotely.
	EngineFileSizes() []EngineFileSize

	// ResetEngine clears all written KV pairs in this opened engine.
	ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

	// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
	LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)

	// CollectLocalDuplicateRows collects duplicate keys from the local db. It
	// stores the duplicate keys which may be repeated with other keys in the
	// local data source.
	CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)

	// CollectRemoteDuplicateRows collects duplicate keys from remote TiKV
	// storage. These keys may duplicate data imported by other Lightning
	// instances.
	CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)

	// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
	// according to the required algorithm.
	ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error
}

AbstractBackend is the abstract interface behind Backend. Implementations of this interface must be goroutine safe: you can share an instance and execute any method anywhere.

type Backend

// Backend is the delivery target for Lightning. A Backend is obtained from an
// AbstractBackend via MakeBackend.
type Backend struct {
	// contains filtered or unexported fields
}

Backend is the delivery target for Lightning.

func MakeBackend

func MakeBackend(ab AbstractBackend) Backend

func (Backend) CheckDiskQuota

func (be Backend) CheckDiskQuota(quota int64) (
	largeEngines []uuid.UUID,
	inProgressLargeEngines int,
	totalDiskSize int64,
	totalMemSize int64,
)

CheckDiskQuota verifies whether the total engine file size is below the given quota. If the quota is exceeded, this method returns an array of engines that, once imported, can bring the total size back below the quota.

func (Backend) CheckRequirements

func (be Backend) CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error

func (Backend) Close

func (be Backend) Close()

func (Backend) CollectLocalDuplicateRows

func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error)

func (Backend) CollectRemoteDuplicateRows

func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error)

func (Backend) FetchRemoteTableModels

func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

func (Backend) FlushAll

func (be Backend) FlushAll(ctx context.Context) error

func (Backend) MakeEmptyRows

func (be Backend) MakeEmptyRows() kv.Rows

func (Backend) NewEncoder

func (be Backend) NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)

func (Backend) OpenEngine

func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error)

OpenEngine opens an engine with the given table name and engine ID.

func (Backend) ResolveDuplicateRows

func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error

func (Backend) ShouldPostProcess

func (be Backend) ShouldPostProcess() bool

func (Backend) UnsafeCloseEngine

func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error)

UnsafeCloseEngine closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.

func (Backend) UnsafeCloseEngineWithUUID

func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID) (*ClosedEngine, error)

UnsafeCloseEngineWithUUID closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.

func (Backend) UnsafeImportAndReset

func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error

UnsafeImportAndReset forces the backend to import the content of an engine into the target and then reset the engine to empty. This method will not close the engine. Make sure the engine is flushed manually before calling this method.

type CheckCtx

// CheckCtx contains all parameters used in CheckRequirements.
type CheckCtx struct {
	// DBMetas holds the mydump database metadata entries passed to
	// CheckRequirements.
	// NOTE(review): presumably one entry per source database — confirm.
	DBMetas []*mydump.MDDatabaseMeta
}

CheckCtx contains all parameters used in CheckRequirements

type ChunkFlushStatus

// ChunkFlushStatus is the status value returned by EngineWriter.Close.
type ChunkFlushStatus interface {
	// Flushed reports the flush state.
	// NOTE(review): presumably true once the data written through the closed
	// writer has been flushed — confirm against implementations.
	Flushed() bool
}

type ClosedEngine

// ClosedEngine represents a closed engine, allowing ingestion into the
// target. This type is goroutine safe.
type ClosedEngine struct {
	// contains filtered or unexported fields
}

ClosedEngine represents a closed engine, allowing ingestion into the target. This type is goroutine safe: you can share an instance and execute any method anywhere.

func (*ClosedEngine) Cleanup

func (engine *ClosedEngine) Cleanup(ctx context.Context) error

Cleanup deletes the intermediate data from target.

func (*ClosedEngine) Import

func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error

Import the data written to the engine into the target.

func (*ClosedEngine) Logger

func (engine *ClosedEngine) Logger() log.Logger

type EngineConfig

// EngineConfig defines the configuration used when opening an engine.
type EngineConfig struct {
	// TableInfo is the corresponding TiDB table info.
	TableInfo *checkpoints.TidbTableInfo
	// Local is the configuration specific to the local backend.
	Local *LocalEngineConfig
}

EngineConfig defines the configuration used when opening an engine.

type EngineFileSize

// EngineFileSize describes the size occupied by a single engine. A slice of
// these is returned by AbstractBackend.EngineFileSizes, which is used to
// compute disk quota.
type EngineFileSize struct {
	// UUID is the engine's UUID.
	UUID uuid.UUID
	// DiskSize is the estimated total file size on disk right now.
	DiskSize int64
	// MemSize is the total memory size used by the engine. This is the
	// estimated additional size saved onto disk after calling Flush().
	MemSize int64
	// IsImporting indicates whether the engine is performing Import().
	IsImporting bool
}

type EngineWriter

// EngineWriter is the writer type returned by AbstractBackend.LocalWriter for
// writing rows into an engine.
type EngineWriter interface {
	// AppendRows appends rows for the given table name and column names.
	// NOTE(review): presumably the rows are written to the engine this writer
	// was obtained for — confirm against implementations.
	AppendRows(
		ctx context.Context,
		tableName string,
		columnNames []string,
		rows kv.Rows,
	) error
	// IsSynced reports the writer's synced state.
	// NOTE(review): the exact meaning of "synced" is not visible here —
	// confirm against implementations.
	IsSynced() bool
	// Close closes the writer and returns a ChunkFlushStatus.
	Close(ctx context.Context) (ChunkFlushStatus, error)
}

type LocalEngineConfig

// LocalEngineConfig is the configuration used for local backend in OpenEngine.
type LocalEngineConfig struct {
	// Compact controls whether small SSTs are compacted before being ingested
	// into pebble.
	Compact bool
	// CompactThreshold is the raw KV size threshold that triggers compaction.
	CompactThreshold int64
	// CompactConcurrency is the concurrency of the compaction routine.
	CompactConcurrency int
}

LocalEngineConfig is the configuration used for local backend in OpenEngine.

type LocalEngineWriter

// LocalEngineWriter is the writer returned by OpenedEngine.LocalWriter. Its
// listed methods are WriteRows, IsSynced and Close.
type LocalEngineWriter struct {
	// contains filtered or unexported fields
}

func (*LocalEngineWriter) Close

func (*LocalEngineWriter) IsSynced

func (w *LocalEngineWriter) IsSynced() bool

func (*LocalEngineWriter) WriteRows

func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error

WriteRows writes a collection of encoded rows into the engine.

type LocalWriterConfig

// LocalWriterConfig defines the configuration to open a LocalWriter.
type LocalWriterConfig struct {
	// IsKVSorted indicates whether the chunk KVs written to this LocalWriter
	// are sent in order.
	IsKVSorted bool
}

LocalWriterConfig defines the configuration to open a LocalWriter

type OpenedEngine

// OpenedEngine is an opened engine, allowing data to be written via
// WriteRows. This type is goroutine safe.
type OpenedEngine struct {
	// contains filtered or unexported fields
}

OpenedEngine is an opened engine, allowing data to be written via WriteRows. This type is goroutine safe: you can share an instance and execute any method anywhere.

func (*OpenedEngine) Close

func (engine *OpenedEngine) Close(ctx context.Context, cfg *EngineConfig) (*ClosedEngine, error)

Close the opened engine to prepare it for importing.

func (*OpenedEngine) Flush

func (engine *OpenedEngine) Flush(ctx context.Context) error

Flush flushes the currently written data for the local backend.

func (*OpenedEngine) LocalWriter

func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL