Documentation ¶
Index ¶
- Constants
- func Compact(ctx context.Context, tls *common.TLS, tikvAddr string, level int32) error
- func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error)
- func FetchModeFromMetrics(metrics string) (import_sstpb.SwitchMode, error)
- func ForAllStores(ctx context.Context, tls *common.TLS, minState StoreState, ...) error
- func GetSystemRLimit() (uint64, error)
- func MakeUUID(tableName string, engineID int32) (string, uuid.UUID)
- func NewPanickingAllocators(base int64) autoid.Allocators
- func SwitchMode(ctx context.Context, tls *common.TLS, tikvAddr string, ...) error
- func VerifyRLimit(estimateMaxFiles uint64) error
- type AbstractBackend
- type Backend
- func MakeBackend(ab AbstractBackend) Backend
- func NewImporter(ctx context.Context, tls *common.TLS, importServerAddr string, pdAddr string) (Backend, error)
- func NewLocalBackend(ctx context.Context, tls *common.TLS, pdAddr string, cfg *config.TikvImporter, ...) (Backend, error)
- func NewMockImporter(cli kv.ImportKVClient, pdAddr string) Backend
- func NewTiDBBackend(db *sql.DB, onDuplicate string) Backend
- func (be Backend) CheckDiskQuota(quota int64) (largeEngines []uuid.UUID, inProgressLargeEngines int, totalDiskSize int64, ...)
- func (be Backend) CheckRequirements(ctx context.Context) error
- func (be Backend) Close()
- func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
- func (be Backend) FlushAll(ctx context.Context) error
- func (be Backend) MakeEmptyRows() Rows
- func (be Backend) NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error)
- func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32, ts uint64) (*OpenedEngine, error)
- func (be Backend) ShouldPostProcess() bool
- func (be Backend) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int32) (*ClosedEngine, error)
- func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, tag string, engineUUID uuid.UUID) (*ClosedEngine, error)
- func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID) error
- type ClosedEngine
- type Encoder
- type EngineFileSize
- type EngineWriter
- type ImporterWriter
- type LocalEngineWriter
- type LocalFile
- type LocalWriter
- type OpenedEngine
- type Range
- type RangePropertiesCollector
- type Row
- type Rows
- type SessionOptions
- type Store
- type StoreState
- type TiDBWriter
Constants ¶
const (
SplitRetryTimes = 8
)
Variables ¶
This section is empty.
Functions ¶
func FetchMode ¶
func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error)
FetchMode obtains the import mode status of the TiKV node.
func FetchModeFromMetrics ¶
func FetchModeFromMetrics(metrics string) (import_sstpb.SwitchMode, error)
FetchMode obtains the import mode status from the Prometheus metrics of a TiKV node.
func ForAllStores ¶
func ForAllStores( ctx context.Context, tls *common.TLS, minState StoreState, action func(c context.Context, store *Store) error, ) error
ForAllStores executes `action` in parallel for all TiKV stores connected to a PD server given by the HTTPS client `tls`.
Returns the first non-nil error returned in all `action` calls. If all `action` returns nil, this method would return nil as well.
The `minState` argument defines the minimum store state to be included in the result (Tombstone < Offline < Down < Disconnected < Up).
func GetSystemRLimit ¶
func NewPanickingAllocators ¶
func NewPanickingAllocators(base int64) autoid.Allocators
NewPanickingAllocator creates a PanickingAllocator shared by all allocation types.
func SwitchMode ¶
func SwitchMode(ctx context.Context, tls *common.TLS, tikvAddr string, mode import_sstpb.SwitchMode) error
SwitchMode changes the TiKV node at the given address to a particular mode.
func VerifyRLimit ¶
VerifyRLimit checks whether the open-file limit is large enough. In Local-backend, we need to read and write a lot of L0 SST files, so we need to check system max open files limit.
Types ¶
type AbstractBackend ¶
type AbstractBackend interface { // Close the connection to the backend. Close() // MakeEmptyRows creates an empty collection of encoded rows. MakeEmptyRows() Rows // RetryImportDelay returns the duration to sleep when retrying an import RetryImportDelay() time.Duration // ShouldPostProcess returns whether KV-specific post-processing should be // performed for this backend. Post-processing includes checksum and analyze. ShouldPostProcess() bool // NewEncoder creates an encoder of a TiDB table. NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error) OpenEngine(ctx context.Context, engineUUID uuid.UUID) error CloseEngine(ctx context.Context, engineUUID uuid.UUID) error ImportEngine(ctx context.Context, engineUUID uuid.UUID) error CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error // CheckRequirements performs the check whether the backend satisfies the // version requirements CheckRequirements(ctx context.Context) error // FetchRemoteTableModels obtains the models of all tables given the schema // name. The returned table info does not need to be precise if the encoder, // is not requiring them, but must at least fill in the following fields for // TablesFromMeta to succeed: // - Name // - State (must be model.StatePublic) // - ID // - Columns // * Name // * State (must be model.StatePublic) // * Offset (must be 0, 1, 2, ...) // - PKIsHandle (true = do not generate _tidb_rowid) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) // FlushEngine ensures all KV pairs written to an open engine has been // synchronized, such that kill-9'ing Lightning afterwards and resuming from // checkpoint can recover the exact same content. // // This method is only relevant for local backend, and is no-op for all // other backends. FlushEngine(ctx context.Context, engineUUID uuid.UUID) error // FlushAllEngines performs FlushEngine on all opened engines. This is a // very expensive operation and should only be used in some rare situation // (e.g. preparing to resolve a disk quota violation). FlushAllEngines(ctx context.Context) error // EngineFileSizes obtains the size occupied locally of all engines managed // by this backend. This method is used to compute disk quota. // It can return nil if the content are all stored remotely. EngineFileSizes() []EngineFileSize // ResetEngine clears all written KV pairs in this opened engine. ResetEngine(ctx context.Context, engineUUID uuid.UUID) error // LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine. LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error) }
AbstractBackend is the abstract interface behind Backend. Implementations of this interface must be goroutine safe: you can share an instance and execute any method anywhere.
type Backend ¶
type Backend struct {
// contains filtered or unexported fields
}
Backend is the delivery target for Lightning
func MakeBackend ¶
func MakeBackend(ab AbstractBackend) Backend
func NewImporter ¶
func NewImporter(ctx context.Context, tls *common.TLS, importServerAddr string, pdAddr string) (Backend, error)
NewImporter creates a new connection to tikv-importer. A single connection per tidb-lightning instance is enough.
func NewLocalBackend ¶
func NewLocalBackend( ctx context.Context, tls *common.TLS, pdAddr string, cfg *config.TikvImporter, enableCheckpoint bool, g glue.Glue, maxOpenFiles int, ) (Backend, error)
NewLocalBackend creates new connections to tikv.
func NewMockImporter ¶
func NewMockImporter(cli kv.ImportKVClient, pdAddr string) Backend
NewMockImporter creates an *unconnected* importer based on a custom ImportKVClient. This is provided for testing only. Do not use this function outside of tests.
func NewTiDBBackend ¶
NewTiDBBackend creates a new TiDB backend using the given database.
The backend does not take ownership of `db`. Caller should close `db` manually after the backend expired.
func (Backend) CheckDiskQuota ¶
func (be Backend) CheckDiskQuota(quota int64) ( largeEngines []uuid.UUID, inProgressLargeEngines int, totalDiskSize int64, totalMemSize int64, )
CheckDiskQuota verifies if the total engine file size is below the given quota. If the quota is exceeded, this method returns an array of engines, which after importing can decrease the total size below quota.
func (Backend) FetchRemoteTableModels ¶
func (Backend) MakeEmptyRows ¶
func (Backend) NewEncoder ¶
func (Backend) OpenEngine ¶
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32, ts uint64) (*OpenedEngine, error)
OpenEngine opens an engine with the given table name and engine ID.
func (Backend) ShouldPostProcess ¶
func (Backend) UnsafeCloseEngine ¶
func (be Backend) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int32) (*ClosedEngine, error)
UnsafeCloseEngine closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.
func (Backend) UnsafeCloseEngineWithUUID ¶
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, tag string, engineUUID uuid.UUID) (*ClosedEngine, error)
UnsafeCloseEngineWithUUID closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.
func (Backend) UnsafeImportAndReset ¶
UnsafeImportAndReset forces the backend to import the content of an engine into the target and then reset the engine to empty. This method will not close the engine. Make sure the engine is flushed manually before calling this method.
type ClosedEngine ¶
type ClosedEngine struct {
// contains filtered or unexported fields
}
ClosedEngine represents a closed engine, allowing ingestion into the target. This type is goroutine safe: you can share an instance and execute any method anywhere.
func (*ClosedEngine) Cleanup ¶
func (engine *ClosedEngine) Cleanup(ctx context.Context) error
Cleanup deletes the intermediate data from target.
func (*ClosedEngine) Import ¶
func (engine *ClosedEngine) Import(ctx context.Context) error
Import the data written to the engine into the target.
func (*ClosedEngine) Logger ¶
func (engine *ClosedEngine) Logger() log.Logger
type Encoder ¶
type Encoder interface { // Close the encoder. Close() // Encode encodes a row of SQL values into a backend-friendly format. Encode( logger log.Logger, row []types.Datum, rowID int64, columnPermutation []int, ) (Row, error) }
Encoder encodes a row of SQL values into some opaque type which can be consumed by OpenEngine.WriteEncoded.
func NewTableKVEncoder ¶
func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error)
type EngineFileSize ¶
type EngineFileSize struct { // UUID is the engine's UUID. UUID uuid.UUID // DiskSize is the estimated total file size on disk right now. DiskSize int64 // MemSize is the total memory size used by the engine. This is the // estimated additional size saved onto disk after calling Flush(). MemSize int64 // IsImporting indicates whether the engine performing Import(). IsImporting bool }
type EngineWriter ¶
type ImporterWriter ¶
type ImporterWriter struct {
// contains filtered or unexported fields
}
func (*ImporterWriter) AppendRows ¶
func (*ImporterWriter) Close ¶
func (w *ImporterWriter) Close() error
type LocalEngineWriter ¶
type LocalEngineWriter struct {
// contains filtered or unexported fields
}
func (*LocalEngineWriter) Close ¶
func (w *LocalEngineWriter) Close() error
type LocalWriter ¶
type LocalWriter struct {
// contains filtered or unexported fields
}
func (*LocalWriter) AppendRows ¶
func (w *LocalWriter) AppendRows(ctx context.Context, tableName string, columnNames []string, ts uint64, rows Rows) error
TODO: replace the implementation back with `AppendRowsAsync` after addressing the performance issue.
func (*LocalWriter) AppendRowsAsync ¶
func (w *LocalWriter) AppendRowsAsync(ctx context.Context, tableName string, columnNames []string, ts uint64, rows Rows) error
TODO: temporarily replace this async append rows with the former write-batch approach before addressing the performance issue.
func (*LocalWriter) Close ¶
func (w *LocalWriter) Close() error
type OpenedEngine ¶
type OpenedEngine struct {
// contains filtered or unexported fields
}
OpenedEngine is an opened engine, allowing data to be written via WriteRows. This type is goroutine safe: you can share an instance and execute any method anywhere.
func (*OpenedEngine) Close ¶
func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error)
Close the opened engine to prepare it for importing.
func (*OpenedEngine) Flush ¶
func (engine *OpenedEngine) Flush(ctx context.Context) error
Flush current written data for local backend
func (*OpenedEngine) LocalWriter ¶
func (engine *OpenedEngine) LocalWriter(ctx context.Context) (*LocalEngineWriter, error)
type Range ¶
type Range struct {
// contains filtered or unexported fields
}
Range record start and end key for localStoreDir.DB so we can write it to tikv in streaming
type RangePropertiesCollector ¶
type RangePropertiesCollector struct {
// contains filtered or unexported fields
}
func (*RangePropertiesCollector) Add ¶
func (c *RangePropertiesCollector) Add(key pebble.InternalKey, value []byte) error
implement `pebble.TablePropertyCollector` implement `TablePropertyCollector.Add`
func (*RangePropertiesCollector) Finish ¶
func (c *RangePropertiesCollector) Finish(userProps map[string]string) error
func (*RangePropertiesCollector) Name ¶
func (c *RangePropertiesCollector) Name() string
The name of the property collector.
type Row ¶
type Row interface { // ClassifyAndAppend separates the data-like and index-like parts of the // encoded row, and appends these parts into the existing buffers and // checksums. ClassifyAndAppend( data *Rows, dataChecksum *verification.KVChecksum, indices *Rows, indexChecksum *verification.KVChecksum, ) }
Row represents a single encoded row.
func MakeRowFromKvPairs ¶
MakeRowFromKvPairs converts a KvPair slice into a Row instance. This is mainly used for testing only. The resulting Row instance should only be used for the importer backend.
type Rows ¶
type Rows interface { // SplitIntoChunks splits the rows into multiple consecutive parts, each // part having total byte size less than `splitSize`. The meaning of "byte // size" should be consistent with the value used in `Row.ClassifyAndAppend`. SplitIntoChunks(splitSize int) []Rows // Clear returns a new collection with empty content. It may share the // capacity with the current instance. The typical usage is `x = x.Clear()`. Clear() Rows }
Rows represents a collection of encoded rows.
func MakeRowsFromKvPairs ¶
MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is mainly used for testing only. The resulting Rows instance should only be used for the importer backend.
type SessionOptions ¶
type SessionOptions struct { SQLMode mysql.SQLMode Timestamp int64 SysVars map[string]string // a seed used for tableKvEncoder's auto random bits value AutoRandomSeed int64 }
SessionOptions is the initial configuration of the session.
type Store ¶
type Store struct { Address string Version string State StoreState `json:"state_name"` }
Store contains metadata about a TiKV store.
type StoreState ¶
type StoreState int
StoreState is the state of a TiKV store. The numerical value is sorted by the store's accessibility (Tombstone < Down < Disconnected < Offline < Up).
The meaning of each state can be found from PingCAP's documentation at https://pingcap.com/docs/v3.0/how-to/scale/horizontally/#delete-a-node-dynamically-1
const ( // StoreStateUp means the TiKV store is in service. StoreStateUp StoreState = -iota // StoreStateOffline means the TiKV store is in the process of being taken // offline (but is still accessible). StoreStateOffline // StoreStateDisconnected means the TiKV store does not respond to PD. StoreStateDisconnected // StoreStateDown means the TiKV store does not respond to PD for a long // time (> 30 minutes). StoreStateDown // StoreStateTombstone means the TiKV store is shut down and the data has // been evacuated. Lightning should never interact with stores in this // state. StoreStateTombstone )
func (*StoreState) UnmarshalJSON ¶
func (ss *StoreState) UnmarshalJSON(content []byte) error
UnmarshalJSON implements the json.Unmarshaler interface.
type TiDBWriter ¶
type TiDBWriter struct {
// contains filtered or unexported fields
}
func (*TiDBWriter) AppendRows ¶
func (*TiDBWriter) Close ¶
func (w *TiDBWriter) Close() error