Documentation ¶
Index ¶
- func MakeUUID(tableName string, engineID int64) (string, uuid.UUID)
- type Backend
- type CheckCtx
- type ChunkFlushStatus
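- type ClosedEngine
- func (engine *ClosedEngine) Cleanup(ctx context.Context) error
- func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error
- func (engine *ClosedEngine) Logger() log.Logger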
- type EngineConfig
- type EngineFileSize
- type EngineManager
- func MakeEngineManager(ab Backend) EngineManager
- func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error)
- func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error)
- func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, ...) (*ClosedEngine, error)
- type EngineWriter
- type ExternalEngineConfig
- type LocalEngineConfig
- type LocalWriterConfig
- type OpenedEngine
- func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error)
- func (engine *OpenedEngine) Flush(ctx context.Context) error
- func (engine *OpenedEngine) GetEngineUUID() uuid.UUID
- func (en OpenedEngine) GetID() int32
- func (en OpenedEngine) GetUUID() uuid.UUID
- func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error)
- type TargetInfoGetter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Backend ¶
type Backend interface {
	// Close the connection to the backend.
	Close()
	// RetryImportDelay returns the duration to sleep when retrying an import.
	RetryImportDelay() time.Duration
	// ShouldPostProcess returns whether KV-specific post-processing should be
	// performed for this backend. Post-processing includes checksum and analyze.
	ShouldPostProcess() bool
	OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
	CloseEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
	// ImportEngine imports engine data to the backend. If it returns
	// ErrDuplicateDetected, duplicates were detected; in that case all data in
	// the engine must have been imported, so it is safe to reset or clean up
	// this engine.
	ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
	CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
	// FlushEngine ensures all KV pairs written to an open engine have been
	// synchronized, such that kill-9'ing Lightning afterwards and resuming from
	// checkpoint can recover the exact same content.
	//
	// This method is only relevant for the local backend and is a no-op for all
	// other backends.
	FlushEngine(ctx context.Context, engineUUID uuid.UUID) error
	// FlushAllEngines performs FlushEngine on all opened engines. This is a
	// very expensive operation and should only be used in rare situations
	// (e.g. preparing to resolve a disk quota violation).
	FlushAllEngines(ctx context.Context) error
	// ResetEngine clears all written KV pairs in this opened engine.
	ResetEngine(ctx context.Context, engineUUID uuid.UUID) error
	// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
	LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)
}
Backend defines the interface for a backend. Implementations of this interface must be goroutine safe: you can share an instance and execute any method anywhere. Usual workflow:
- Create a `Backend` for the whole process.
- For each table,
  i. Split into multiple "batches" consisting of data files with roughly equal total size.
  ii. For each batch,
    a. Create an `OpenedEngine` via `EngineManager.OpenEngine()`.
    b. For each chunk, deliver data into the engine through an `EngineWriter` obtained from `engine.LocalWriter()` (`AppendRows()`, then `Close()`).
    c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`.
    d. Import data via `engine.Import()`.
    e. Clean up via `engine.Cleanup()`.
- Close the connection via `backend.Close()`.
type CheckCtx ¶
type CheckCtx struct {
DBMetas []*mydump.MDDatabaseMeta
}
CheckCtx contains all parameters used in CheckRequirements.
type ChunkFlushStatus ¶
type ChunkFlushStatus interface {
Flushed() bool
}
ChunkFlushStatus is the status of a chunk flush.
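A minimal implementation of `ChunkFlushStatus`, assuming the status is set once by whichever goroutine finishes the flush and may be read concurrently by others. The `flushStatus` type and its `markFlushed` method are hypothetical; `sync/atomic.Bool` (Go 1.19+) keeps the reads and writes race-free.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ChunkFlushStatus mirrors the interface documented above.
type ChunkFlushStatus interface {
	Flushed() bool
}

// flushStatus is a hypothetical implementation backed by an atomic flag.
type flushStatus struct {
	done atomic.Bool
}

// markFlushed is called once the chunk's data has been flushed.
func (s *flushStatus) markFlushed() { s.done.Store(true) }

// Flushed reports whether the flush has completed.
func (s *flushStatus) Flushed() bool { return s.done.Load() }

func main() {
	s := &flushStatus{}
	var status ChunkFlushStatus = s
	fmt.Println(status.Flushed()) // false

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.markFlushed() // e.g. a background flush finishing
	}()
	wg.Wait()
	fmt.Println(status.Flushed()) // true
}
```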
type ClosedEngine ¶
type ClosedEngine struct {
// contains filtered or unexported fields
}
ClosedEngine represents a closed engine, allowing ingestion into the target. This type is goroutine safe: you can share an instance and execute any method anywhere.
func NewClosedEngine ¶
NewClosedEngine creates a new ClosedEngine.
func (*ClosedEngine) Cleanup ¶
func (engine *ClosedEngine) Cleanup(ctx context.Context) error
Cleanup deletes the intermediate data from the target.
func (*ClosedEngine) Import ¶
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error
Import imports the data written to the engine into the target.
func (*ClosedEngine) Logger ¶
func (engine *ClosedEngine) Logger() log.Logger
Logger returns the logger for the engine.
type EngineConfig ¶
type EngineConfig struct {
	// TableInfo is the corresponding TiDB table info.
	TableInfo *checkpoints.TidbTableInfo
	// Local is the local-backend-specific configuration.
	Local LocalEngineConfig
	// External is the configuration specific to the local backend's external engine.
	External *ExternalEngineConfig
	// KeepSortDir indicates whether to keep the temporary sort directory
	// when opening the engine, instead of removing it.
	KeepSortDir bool
	// TS is the preset timestamp of data in the engine. When it's 0, the used TS
	// will be set lazily. This is used by the local backend. This field will be
	// written to engineMeta.TS and takes effect in the following cases:
	//  - engineManager.openEngine
	//  - engineManager.closeEngine, only for an external engine
	TS uint64
}
EngineConfig defines the configuration used to open an engine.
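The "set lazily when 0" rule for `TS` can be illustrated with a small resolver. This is a hypothetical sketch: `tsResolver` and its `allocate` callback (standing in for obtaining a timestamp from the cluster) are not part of the package. The only behavior taken from the comment above is that a preset non-zero `TS` is used as-is, while a zero `TS` is filled in on first use.

```go
package main

import (
	"fmt"
	"sync"
)

// tsResolver is hypothetical: it returns a preset timestamp,
// or allocates one on first use and reuses it afterwards.
type tsResolver struct {
	preset   uint64
	allocate func() uint64 // hypothetical timestamp source
	once     sync.Once
	ts       uint64
}

func (r *tsResolver) TS() uint64 {
	r.once.Do(func() {
		if r.preset != 0 {
			r.ts = r.preset
		} else {
			r.ts = r.allocate()
		}
	})
	return r.ts
}

func main() {
	calls := 0
	alloc := func() uint64 { calls++; return 42 }

	preset := &tsResolver{preset: 7, allocate: alloc}
	lazy := &tsResolver{allocate: alloc}
	a, b, c := preset.TS(), lazy.TS(), lazy.TS()
	fmt.Println(a, b, c, calls) // 7 42 42 1
}
```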
type EngineFileSize ¶
type EngineFileSize struct {
	// UUID is the engine's UUID.
	UUID uuid.UUID
	// DiskSize is the estimated total file size on disk right now.
	DiskSize int64
	// MemSize is the total memory size used by the engine. This is the
	// estimated additional size saved onto disk after calling Flush().
	MemSize int64
	// IsImporting indicates whether the engine is performing Import().
	IsImporting bool
}
EngineFileSize represents the size of an engine on disk and in memory.
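Because `MemSize` is the extra size expected on disk after a flush, `DiskSize + MemSize` estimates an engine's on-disk footprint once flushed. The sketch below aggregates that across engines, for example to check usage against a disk quota (the rare situation in which the `Backend` comments suggest `FlushAllEngines`). The quota check and the `projectedDiskUsage` and `quotaExceeded` helpers are assumptions; `[16]byte` replaces `uuid.UUID` so the example needs only the standard library.

```go
package main

import "fmt"

// EngineFileSize mirrors the struct above; [16]byte replaces uuid.UUID.
type EngineFileSize struct {
	UUID        [16]byte
	DiskSize    int64
	MemSize     int64
	IsImporting bool
}

// projectedDiskUsage returns the current disk usage and the usage expected
// after flushing, since MemSize is the additional size saved onto disk.
func projectedDiskUsage(sizes []EngineFileSize) (disk, projected int64) {
	for _, s := range sizes {
		disk += s.DiskSize
		projected += s.DiskSize + s.MemSize
	}
	return disk, projected
}

// quotaExceeded is a hypothetical check a caller might run before flushing.
func quotaExceeded(sizes []EngineFileSize, quota int64) bool {
	_, projected := projectedDiskUsage(sizes)
	return projected > quota
}

func main() {
	sizes := []EngineFileSize{
		{UUID: [16]byte{1}, DiskSize: 300, MemSize: 50},
		{UUID: [16]byte{2}, DiskSize: 500, MemSize: 200, IsImporting: true},
	}
	disk, projected := projectedDiskUsage(sizes)
	fmt.Println(disk, projected, quotaExceeded(sizes, 1000)) // 800 1050 true
}
```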
type EngineManager ¶
type EngineManager struct {
// contains filtered or unexported fields
}
EngineManager is the manager of engines. It is a wrapper around Backend that provides common methods for managing engines. It holds no state, so it can be created on demand.
func MakeEngineManager ¶
func MakeEngineManager(ab Backend) EngineManager
MakeEngineManager creates a new EngineManager from a Backend.
func (EngineManager) OpenEngine ¶
func (be EngineManager) OpenEngine(
	ctx context.Context,
	config *EngineConfig,
	tableName string,
	engineID int32,
) (*OpenedEngine, error)
OpenEngine opens an engine with the given table name and engine ID.
func (EngineManager) UnsafeCloseEngine ¶
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error)
UnsafeCloseEngine closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). It should only be used when it is known by other means that the engine has already been opened, e.g. when resuming from a checkpoint.
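Because `UnsafeCloseEngine` receives only a table name and an engine ID, the engine's UUID has to be reproducible from those two values alone; the index lists a `MakeUUID(tableName, engineID)` function that presumably serves this purpose. The sketch below shows one way to get such a stable identifier: a name-based (version 5, SHA-1) UUID in the style of RFC 4122. The namespace bytes, the `tableName:engineID` tag format, and the `engineUUID` and `format` helpers are illustrative assumptions, not the package's actual constants or code.

```go
package main

import (
	"crypto/sha1"
	"fmt"
	"strconv"
)

// namespace is a hypothetical fixed namespace UUID (bytes chosen for illustration).
var namespace = [16]byte{0xd6, 0x8d, 0x6a, 0xbe, 0xc5, 0x9e, 0x45, 0xd6,
	0xad, 0xe0, 0x86, 0xfe, 0x55, 0x0e, 0x0b, 0x4f}

// engineUUID derives a version-5 UUID from the tag "tableName:engineID",
// so the same inputs always yield the same UUID, even after a restart.
func engineUUID(tableName string, engineID int64) (string, [16]byte) {
	tag := tableName + ":" + strconv.FormatInt(engineID, 10)
	h := sha1.New()
	h.Write(namespace[:])
	h.Write([]byte(tag))
	sum := h.Sum(nil)

	var u [16]byte
	copy(u[:], sum[:16])
	u[6] = (u[6] & 0x0f) | 0x50 // version 5
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant
	return tag, u
}

// format renders the UUID in the canonical 8-4-4-4-12 form.
func format(u [16]byte) string {
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

func main() {
	tag, a := engineUUID("`db`.`t`", 0)
	_, b := engineUUID("`db`.`t`", 0)
	_, c := engineUUID("`db`.`t`", 1)
	fmt.Println(tag, a == b, a == c) // `db`.`t`:0 true false
	fmt.Println(format(a))
}
```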
func (EngineManager) UnsafeCloseEngineWithUUID ¶
func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error)
UnsafeCloseEngineWithUUID closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). It should only be used when it is known by other means that the engine has already been opened, e.g. when resuming from a checkpoint.
type EngineWriter ¶
type EngineWriter interface {
	AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error
	IsSynced() bool
	Close(ctx context.Context) (ChunkFlushStatus, error)
}
EngineWriter is the interface for writing data to an engine.
type ExternalEngineConfig ¶
type ExternalEngineConfig struct {
	StorageURI string
	DataFiles  []string
	StatFiles  []string
	StartKey   []byte
	EndKey     []byte
	JobKeys    [][]byte
	SplitKeys  [][]byte
	// TotalFileSize can be an estimated value.
	TotalFileSize int64
	// TotalKVCount can be an estimated value.
	TotalKVCount int64
	CheckHotspot bool
}
ExternalEngineConfig is the configuration used for the local backend's external engine.
type LocalEngineConfig ¶
type LocalEngineConfig struct {
	// Compact indicates whether to compact small SSTs before ingesting them into Pebble.
	Compact bool
	// CompactThreshold is the raw KV size threshold that triggers compaction.
	CompactThreshold int64
	// CompactConcurrency is the concurrency of compaction routines.
	CompactConcurrency int
	// BlockSize is the block size.
	BlockSize int
}
LocalEngineConfig is the configuration used for local backend in OpenEngine.
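A hypothetical reading of the `Compact` and `CompactThreshold` fields: compaction is considered only when `Compact` is enabled and the raw KV bytes accumulated since the last compaction reach the threshold. `shouldCompact` and its `pending` argument are illustrative; how the local backend actually schedules compaction across `CompactConcurrency` routines is not described here.

```go
package main

import "fmt"

// LocalEngineConfig mirrors the struct above.
type LocalEngineConfig struct {
	Compact            bool
	CompactThreshold   int64
	CompactConcurrency int
	BlockSize          int
}

// shouldCompact (hypothetical) reports whether pending raw KV bytes warrant a compaction.
func shouldCompact(cfg LocalEngineConfig, pending int64) bool {
	return cfg.Compact && cfg.CompactThreshold > 0 && pending >= cfg.CompactThreshold
}

func main() {
	cfg := LocalEngineConfig{
		Compact:            true,
		CompactThreshold:   1 << 30, // 1 GiB
		CompactConcurrency: 4,
		BlockSize:          16 << 10,
	}
	fmt.Println(shouldCompact(cfg, 512<<20), shouldCompact(cfg, 2<<30)) // false true
}
```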
type LocalWriterConfig ¶
type LocalWriterConfig struct {
	// Local holds local-backend-specific configuration.
	Local struct {
		// IsKVSorted indicates whether the chunk KVs written to this LocalWriter are sent in order.
		IsKVSorted bool
		// MemCacheSize specifies the estimated memory cache limit used by this local
		// writer. It has higher priority than BackendConfig.LocalWriterMemCacheSize if
		// set.
		MemCacheSize int64
	}
	// TiDB holds TiDB-backend-specific configuration.
	TiDB struct {
		TableName string
	}
}
LocalWriterConfig defines the configuration used to open a LocalWriter.
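Because `Local` and `TiDB` are anonymous struct fields, the usual way to populate `LocalWriterConfig` in Go is to start from a zero value and assign the nested fields, rather than writing a composite literal that would have to repeat the anonymous struct types. The struct below is copied from the documentation above; `newSortedWriterConfig` and the values passed to it are hypothetical.

```go
package main

import "fmt"

// LocalWriterConfig mirrors the documented type.
type LocalWriterConfig struct {
	Local struct {
		IsKVSorted   bool
		MemCacheSize int64
	}
	TiDB struct {
		TableName string
	}
}

// newSortedWriterConfig (hypothetical) builds a config for a writer that receives KVs in order.
func newSortedWriterConfig(memCache int64, table string) *LocalWriterConfig {
	cfg := &LocalWriterConfig{}
	cfg.Local.IsKVSorted = true
	cfg.Local.MemCacheSize = memCache // takes priority over the backend-wide setting when set
	cfg.TiDB.TableName = table
	return cfg
}

func main() {
	cfg := newSortedWriterConfig(128<<20, "`db`.`t`")
	fmt.Printf("%+v\n", *cfg)
}
```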
type OpenedEngine ¶
type OpenedEngine struct {
// contains filtered or unexported fields
}
OpenedEngine is an opened engine, allowing data to be written through writers obtained from LocalWriter. This type is goroutine safe: you can share an instance and execute any method anywhere.
func (*OpenedEngine) Close ¶
func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error)
Close closes the opened engine to prepare it for importing.
func (*OpenedEngine) Flush ¶
func (engine *OpenedEngine) Flush(ctx context.Context) error
Flush flushes the data written so far to the engine. It is only relevant for the local backend.
func (*OpenedEngine) GetEngineUUID ¶
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID
GetEngineUUID returns the engine UUID.
func (*OpenedEngine) LocalWriter ¶
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error)
LocalWriter returns a thread-local EngineWriter for writing rows into this engine.
type TargetInfoGetter ¶
type TargetInfoGetter interface {
	// FetchRemoteDBModels obtains the models of all databases. Currently, only
	// the database name is filled.
	FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)
	// FetchRemoteTableModels obtains the TableInfo of given tables under the schema
	// name. It returns a map whose key is the table name in lower case and value is
	// the TableInfo. If the table does not exist, it will not be included in the
	// map.
	//
	// The returned table info does not need to be precise if the encoder does not
	// require it, but must at least fill in the following fields for
	// TablesFromMeta to succeed:
	//  - Name
	//  - State (must be model.StatePublic)
	//  - ID
	//  - Columns
	//     * Name
	//     * State (must be model.StatePublic)
	//     * Offset (must be 0, 1, 2, ...)
	//  - PKIsHandle (true = do not generate _tidb_rowid)
	FetchRemoteTableModels(ctx context.Context, schemaName string, tableNames []string) (map[string]*model.TableInfo, error)
	// CheckRequirements checks whether the backend satisfies the version requirements.
	CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}
TargetInfoGetter defines the interface for getting target information.
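The `FetchRemoteTableModels` contract (keys are lower-cased table names; tables that do not exist are simply absent) means callers should look results up by lower-cased name and treat a missing key as "table does not exist". A small sketch of that caller-side check; `tableInfo`, `fakeFetch`, and `missingTables` are hypothetical, and a plain struct replaces `model.TableInfo`.

```go
package main

import (
	"fmt"
	"strings"
)

// tableInfo is a hypothetical stand-in for model.TableInfo.
type tableInfo struct {
	Name string
	ID   int64
}

// fakeFetch mimics the documented contract over an in-memory schema.
func fakeFetch(existing map[string]int64, tableNames []string) map[string]*tableInfo {
	out := make(map[string]*tableInfo)
	for _, name := range tableNames {
		key := strings.ToLower(name)
		if id, ok := existing[key]; ok {
			out[key] = &tableInfo{Name: name, ID: id}
		}
	}
	return out
}

// missingTables returns the requested names that have no entry in the result map.
func missingTables(result map[string]*tableInfo, tableNames []string) []string {
	var missing []string
	for _, name := range tableNames {
		if _, ok := result[strings.ToLower(name)]; !ok {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	existing := map[string]int64{"orders": 101, "users": 102}
	req := []string{"Orders", "USERS", "Items"}
	got := fakeFetch(existing, req)
	fmt.Println(len(got), missingTables(got, req)) // 2 [Items]
}
```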