backend

package
v3.0.20+incompatible
Published: Dec 14, 2020 License: Apache-2.0 Imports: 69 Imported by: 0

Documentation

Constants

const (
	LocalMemoryTableSize = 512 << 20
)
const (
	SplitRetryTimes = 8
)

Variables

This section is empty.

Functions

func Compact

func Compact(ctx context.Context, tls *common.TLS, tikvAddr string, level int32) error

Compact performs a leveled compaction with the given minimum level.

func FetchMode

func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error)

FetchMode obtains the import mode status of the TiKV node.

func FetchModeFromMetrics

func FetchModeFromMetrics(metrics string) (import_sstpb.SwitchMode, error)

FetchModeFromMetrics obtains the import mode status from the Prometheus metrics of a TiKV node.

func ForAllStores

func ForAllStores(
	ctx context.Context,
	tls *common.TLS,
	minState StoreState,
	action func(c context.Context, store *Store) error,
) error

ForAllStores executes `action` in parallel for all TiKV stores connected to a PD server given by the HTTPS client `tls`.

Returns the first non-nil error returned by any `action` call. If every `action` call returns nil, this function returns nil as well.

The `minState` argument defines the minimum store state to be included in the result (Tombstone < Down < Disconnected < Offline < Up).
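The behaviour described above (filter by `minState`, run `action` concurrently, report the first non-nil error) can be sketched roughly as follows. This is a minimal illustration, not the package's implementation: the store list is passed in directly instead of being fetched from PD, `Store` and `StoreState` are local stand-ins for the documented types, and `forAllStores` and `visitedCount` are hypothetical helper names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// StoreState and Store are local stand-ins mirroring the documented types.
type StoreState int

const (
	StoreStateUp StoreState = -iota
	StoreStateOffline
	StoreStateDisconnected
	StoreStateDown
	StoreStateTombstone
)

type Store struct {
	Address string
	State   StoreState
}

// forAllStores (hypothetical) runs action concurrently for every store whose
// state is at least minState and returns the first error that was recorded.
func forAllStores(
	ctx context.Context,
	stores []Store,
	minState StoreState,
	action func(c context.Context, store *Store) error,
) error {
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := range stores {
		store := &stores[i]
		if store.State < minState {
			continue // less accessible than requested, skip it
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := action(ctx, store); err != nil {
				once.Do(func() { firstErr = err })
			}
		}()
	}
	wg.Wait()
	return firstErr
}

// visitedCount (hypothetical) counts how many stores the action was run for.
// The action fails for stores that have no address.
func visitedCount(stores []Store, minState StoreState) (int, error) {
	var n int64
	err := forAllStores(context.Background(), stores, minState,
		func(c context.Context, s *Store) error {
			atomic.AddInt64(&n, 1)
			if s.Address == "" {
				return errors.New("store has no address")
			}
			return nil
		})
	return int(n), err
}

func main() {
	stores := []Store{
		{Address: "tikv-1:20160", State: StoreStateUp},
		{Address: "tikv-2:20160", State: StoreStateOffline},
		{Address: "tikv-3:20160", State: StoreStateTombstone},
	}
	// With minState = Offline, the Tombstone store is skipped.
	n, err := visitedCount(stores, StoreStateOffline)
	fmt.Println(n, err)
}
```

Because the states are declared with `-iota`, a plain `<` comparison is enough to express "at least as accessible as `minState`".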

func GetSystemRLimit

func GetSystemRLimit() (uint64, error)

func MakeUUID

func MakeUUID(tableName string, engineID int32) (string, uuid.UUID)

func NewPanickingAllocators

func NewPanickingAllocators(base int64) autoid.Allocators

NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.

func NewTransaction

func NewTransaction() *transaction

func SwitchMode

func SwitchMode(ctx context.Context, tls *common.TLS, tikvAddr string, mode import_sstpb.SwitchMode) error

SwitchMode changes the TiKV node at the given address to a particular mode.

func VerifyRLimit

func VerifyRLimit(estimateMaxFiles uint64) error

VerifyRLimit checks whether the open-file limit is large enough. The local backend reads and writes a large number of L0 SST files, so the system's maximum open-file limit must be checked first.
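As an illustration of what such a check involves, the sketch below reads the soft open-file limit through the standard `syscall` package and compares it with an estimate. It is Unix-only, and the helper names (`openFileLimit`, `checkLimit`, `verifyOpenFiles`) are hypothetical. The real VerifyRLimit may behave differently, for example by trying to raise the limit; this sketch only compares.

```go
package main

import (
	"fmt"
	"syscall"
)

// openFileLimit returns the current (soft) limit on open files for this
// process. Unix-only: it relies on syscall.Getrlimit.
func openFileLimit() (uint64, error) {
	var rl syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
		return 0, err
	}
	return uint64(rl.Cur), nil
}

// checkLimit is the pure comparison: it fails when the limit is smaller than
// the estimated number of files that must be open at once.
func checkLimit(limit, estimateMaxFiles uint64) error {
	if limit < estimateMaxFiles {
		return fmt.Errorf("open-file limit %d is below the estimated need of %d",
			limit, estimateMaxFiles)
	}
	return nil
}

// verifyOpenFiles (hypothetical) combines the two steps above.
func verifyOpenFiles(estimateMaxFiles uint64) error {
	limit, err := openFileLimit()
	if err != nil {
		return err
	}
	return checkLimit(limit, estimateMaxFiles)
}

func main() {
	limit, err := openFileLimit()
	if err != nil {
		fmt.Println("cannot read limit:", err)
		return
	}
	fmt.Println("soft open-file limit:", limit)
	fmt.Println("enough for 512 files:", verifyOpenFiles(512) == nil)
}
```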

Types

type AbstractBackend

type AbstractBackend interface {
	// Close the connection to the backend.
	Close()

	// MakeEmptyRows creates an empty collection of encoded rows.
	MakeEmptyRows() Rows

	// RetryImportDelay returns the duration to sleep when retrying an import
	RetryImportDelay() time.Duration

	// MaxChunkSize returns the maximum size acceptable by the backend. The
	// value will be used in `Rows.SplitIntoChunks`.
	MaxChunkSize() int

	// ShouldPostProcess returns whether KV-specific post-processing should be
	// performed for this backend. Post-processing includes checksum and analyze.
	ShouldPostProcess() bool

	// NewEncoder creates an encoder of a TiDB table.
	NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error)

	OpenEngine(ctx context.Context, engineUUID uuid.UUID) error

	WriteRows(
		ctx context.Context,
		engineUUID uuid.UUID,
		tableName string,
		columnNames []string,
		commitTS uint64,
		rows Rows,
	) error

	CloseEngine(ctx context.Context, engineUUID uuid.UUID) error

	ImportEngine(ctx context.Context, engineUUID uuid.UUID) error

	CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

	// CheckRequirements performs the check whether the backend satisfies the
	// version requirements
	CheckRequirements(ctx context.Context) error

	// FetchRemoteTableModels obtains the models of all tables given the schema
	// name. The returned table info does not need to be precise if the encoder
	// does not require it, but must at least fill in the following fields for
	// TablesFromMeta to succeed:
	//  - Name
	//  - State (must be model.StatePublic)
	//  - ID
	//  - Columns
	//     * Name
	//     * State (must be model.StatePublic)
	//     * Offset (must be 0, 1, 2, ...)
	//  - PKIsHandle (true = do not generate _tidb_rowid)
	FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
}

AbstractBackend is the abstract interface behind Backend. Implementations of this interface must be goroutine safe: you can share an instance and execute any method anywhere.

type Backend

type Backend struct {
	// contains filtered or unexported fields
}

Backend is the delivery target for Lightning.

func MakeBackend

func MakeBackend(ab AbstractBackend) Backend

func NewImporter

func NewImporter(ctx context.Context, tls *common.TLS, importServerAddr string, pdAddr string) (Backend, error)

NewImporter creates a new connection to tikv-importer. A single connection per tidb-lightning instance is enough.

func NewLocalBackend

func NewLocalBackend(
	ctx context.Context,
	tls *common.TLS,
	pdAddr string,
	regionSplitSize int64,
	localFile string,
	rangeConcurrency int,
	sendKVPairs int,
	enableCheckpoint bool,
	g glue.Glue,
	maxOpenFiles int,
) (Backend, error)

NewLocalBackend creates new connections to TiKV.

func NewMockImporter

func NewMockImporter(cli kv.ImportKVClient, pdAddr string) Backend

NewMockImporter creates an *unconnected* importer based on a custom ImportKVClient. This is provided for testing only. Do not use this function outside of tests.

func NewTiDBBackend

func NewTiDBBackend(db *sql.DB, onDuplicate string) Backend

NewTiDBBackend creates a new TiDB backend using the given database.

The backend does not take ownership of `db`. The caller should close `db` manually once the backend is no longer in use.

func (Backend) CheckRequirements

func (be Backend) CheckRequirements(ctx context.Context) error

func (Backend) Close

func (be Backend) Close()

func (Backend) FetchRemoteTableModels

func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

func (Backend) MakeEmptyRows

func (be Backend) MakeEmptyRows() Rows

func (Backend) NewEncoder

func (be Backend) NewEncoder(tbl table.Table, options *SessionOptions) (Encoder, error)

func (Backend) OpenEngine

func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error)

OpenEngine opens an engine with the given table name and engine ID.

func (Backend) ShouldPostProcess

func (be Backend) ShouldPostProcess() bool

func (Backend) UnsafeCloseEngine

func (be Backend) UnsafeCloseEngine(ctx context.Context, tableName string, engineID int32) (*ClosedEngine, error)

UnsafeCloseEngine closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.

func (Backend) UnsafeCloseEngineWithUUID

func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, tag string, engineUUID uuid.UUID) (*ClosedEngine, error)

UnsafeCloseEngineWithUUID closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.
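The normal operation sequence (Open -> Write -> Close -> Import) and the "unsafe" shortcut can be pictured with a small stand-in model. Everything below is hypothetical: `backend`, `openedEngine`, and `closedEngine` are simplified local types that only record which steps ran. They take no context, return no errors, and are not goroutine safe, unlike the documented types.

```go
package main

import (
	"fmt"
	"strings"
)

// backend is a toy stand-in that records the order of operations.
type backend struct{ steps []string }

type openedEngine struct {
	be  *backend
	tag string
}

type closedEngine struct {
	be  *backend
	tag string
}

func (be *backend) record(step string) { be.steps = append(be.steps, step) }

// Trace renders the recorded steps, e.g. "open -> write".
func (be *backend) Trace() string { return strings.Join(be.steps, " -> ") }

// OpenEngine is the only way to obtain an openedEngine in this model.
// The tag format used here is an assumption made for illustration.
func (be *backend) OpenEngine(tableName string, engineID int32) *openedEngine {
	be.record("open")
	return &openedEngine{be: be, tag: fmt.Sprintf("%s:%d", tableName, engineID)}
}

func (e *openedEngine) WriteRows(rows []string) { e.be.record("write") }

// Close consumes the opened engine and yields a closed one, which is the
// only type that offers Import.
func (e *openedEngine) Close() *closedEngine {
	e.be.record("close")
	return &closedEngine{be: e.be, tag: e.tag}
}

func (e *closedEngine) Import() { e.be.record("import") }

// UnsafeCloseEngine skips Open and Write: the caller asserts that the engine
// was opened earlier, for instance before a restart.
func (be *backend) UnsafeCloseEngine(tableName string, engineID int32) *closedEngine {
	be.record("close")
	return &closedEngine{be: be, tag: fmt.Sprintf("%s:%d", tableName, engineID)}
}

func main() {
	normal := &backend{}
	opened := normal.OpenEngine("db.tbl", 0)
	opened.WriteRows([]string{"row"})
	opened.Close().Import()
	fmt.Println(normal.Trace())

	resumed := &backend{}
	resumed.UnsafeCloseEngine("db.tbl", 0).Import()
	fmt.Println(resumed.Trace())
}
```

Splitting the engine into an opened and a closed type means the compiler rejects an Import on an engine that is still open. The "unsafe" methods are the escape hatch for cases such as checkpoint recovery, where the Open step happened in an earlier run.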

type ClosedEngine

type ClosedEngine struct {
	// contains filtered or unexported fields
}

ClosedEngine represents a closed engine, allowing ingestion into the target. This type is goroutine safe: you can share an instance and execute any method anywhere.

func (*ClosedEngine) Cleanup

func (engine *ClosedEngine) Cleanup(ctx context.Context) error

Cleanup deletes the intermediate data from target.

func (*ClosedEngine) Import

func (engine *ClosedEngine) Import(ctx context.Context) error

Import the data written to the engine into the target.

func (*ClosedEngine) Logger

func (engine *ClosedEngine) Logger() log.Logger

type Encoder

type Encoder interface {
	// Close the encoder.
	Close()

	// Encode encodes a row of SQL values into a backend-friendly format.
	Encode(
		logger log.Logger,
		row []types.Datum,
		rowID int64,
		columnPermutation []int,
	) (Row, error)
}

Encoder encodes a row of SQL values into some opaque type which can be consumed by OpenedEngine.WriteRows.

func NewTableKVEncoder

func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error)

type LocalFile

type LocalFile struct {
	Uuid uuid.UUID
	// contains filtered or unexported fields
}

func (*LocalFile) Cleanup

func (e *LocalFile) Cleanup(dataDir string) error

Cleanup removes the meta and db files.

func (*LocalFile) Close

func (e *LocalFile) Close() error

type OpenedEngine

type OpenedEngine struct {
	// contains filtered or unexported fields
}

OpenedEngine is an opened engine, allowing data to be written via WriteRows. This type is goroutine safe: you can share an instance and execute any method anywhere.

func (*OpenedEngine) Close

func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error)

Close the opened engine to prepare it for importing.

func (*OpenedEngine) Flush

func (engine *OpenedEngine) Flush() error

Flush flushes the data written so far. It applies to the local backend.

func (*OpenedEngine) WriteRows

func (engine *OpenedEngine) WriteRows(ctx context.Context, columnNames []string, rows Rows) error

WriteRows writes a collection of encoded rows into the engine.

type Range

type Range struct {
	// contains filtered or unexported fields
}

Range records the start and end keys for localStoreDir.DB so that the data can be written to TiKV in a streaming fashion.

type RangePropertiesCollector

type RangePropertiesCollector struct {
	// contains filtered or unexported fields
}

func (*RangePropertiesCollector) Add

func (c *RangePropertiesCollector) Add(key pebble.InternalKey, value []byte) error

Add implements `pebble.TablePropertyCollector.Add`.

func (*RangePropertiesCollector) Finish

func (c *RangePropertiesCollector) Finish(userProps map[string]string) error

func (*RangePropertiesCollector) Name

func (c *RangePropertiesCollector) Name() string

Name returns the name of the property collector.

type Row

type Row interface {
	// ClassifyAndAppend separates the data-like and index-like parts of the
	// encoded row, and appends these parts into the existing buffers and
	// checksums.
	ClassifyAndAppend(
		data *Rows,
		dataChecksum *verification.KVChecksum,
		indices *Rows,
		indexChecksum *verification.KVChecksum,
	)
}

Row represents a single encoded row.

func MakeRowFromKvPairs

func MakeRowFromKvPairs(pairs []common.KvPair) Row

MakeRowFromKvPairs converts a KvPair slice into a Row instance. This is mainly used for testing. The resulting Row instance should only be used for the importer backend.

type Rows

type Rows interface {
	// SplitIntoChunks splits the rows into multiple consecutive parts, each
	// part having total byte size less than `splitSize`. The meaning of "byte
	// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
	SplitIntoChunks(splitSize int) []Rows

	// Clear returns a new collection with empty content. It may share the
	// capacity with the current instance. The typical usage is `x = x.Clear()`.
	Clear() Rows
}

Rows represents a collection of encoded rows.
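To make the contract of `SplitIntoChunks` and `Clear` concrete, here is a minimal sketch over a slice of key-value pairs. The types `kvPair` and `kvPairs` are hypothetical stand-ins, "byte size" is assumed to mean key length plus value length, and the handling of a single pair larger than `splitSize` (it gets a part of its own) is a choice made for this sketch rather than documented behaviour.

```go
package main

import "fmt"

// kvPair and kvPairs are hypothetical stand-ins for encoded rows.
type kvPair struct {
	Key, Val []byte
}

type kvPairs []kvPair

// SplitIntoChunks splits the pairs into consecutive parts. A part is closed
// as soon as adding the next pair would bring its total byte size to
// splitSize or beyond, so every multi-pair part stays below splitSize.
func (kvs kvPairs) SplitIntoChunks(splitSize int) []kvPairs {
	if len(kvs) == 0 {
		return nil
	}
	var res []kvPairs
	start, size := 0, 0
	for i, p := range kvs {
		n := len(p.Key) + len(p.Val)
		if i > start && size+n >= splitSize {
			res = append(res, kvs[start:i])
			start, size = i, 0
		}
		size += n
	}
	return append(res, kvs[start:])
}

// Clear returns an empty collection that reuses the existing capacity.
func (kvs kvPairs) Clear() kvPairs { return kvs[:0] }

func main() {
	var rows kvPairs
	for i := 0; i < 5; i++ {
		// Each pair counts as 4 bytes: a 2-byte key and a 2-byte value.
		rows = append(rows, kvPair{Key: []byte("ab"), Val: []byte("cd")})
	}
	for _, chunk := range rows.SplitIntoChunks(10) {
		fmt.Println(len(chunk), "pairs")
	}

	rows = rows.Clear() // typical usage: x = x.Clear()
	fmt.Println(len(rows), cap(rows) >= 5)
}
```

The parts are sub-slices of the original collection, so splitting does not copy any pair data.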

func MakeRowsFromKvPairs

func MakeRowsFromKvPairs(pairs []common.KvPair) Rows

MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is mainly used for testing. The resulting Rows instance should only be used for the importer backend.

type SessionOptions

type SessionOptions struct {
	SQLMode   mysql.SQLMode
	Timestamp int64
	SysVars   map[string]string
	// a seed used for tableKvEncoder's auto random bits value
	AutoRandomSeed int64
}

SessionOptions is the initial configuration of the session.

type Store

type Store struct {
	Address string
	Version string
	State   StoreState `json:"state_name"`
}

Store contains metadata about a TiKV store.

type StoreState

type StoreState int

StoreState is the state of a TiKV store. The numerical value is sorted by the store's accessibility (Tombstone < Down < Disconnected < Offline < Up).

The meaning of each state can be found from PingCAP's documentation at https://pingcap.com/docs/v3.0/how-to/scale/horizontally/#delete-a-node-dynamically-1

const (
	// StoreStateUp means the TiKV store is in service.
	StoreStateUp StoreState = -iota
	// StoreStateOffline means the TiKV store is in the process of being taken
	// offline (but is still accessible).
	StoreStateOffline
	// StoreStateDisconnected means the TiKV store does not respond to PD.
	StoreStateDisconnected
	// StoreStateDown means the TiKV store does not respond to PD for a long
	// time (> 30 minutes).
	StoreStateDown
	// StoreStateTombstone means the TiKV store is shut down and the data has
	// been evacuated. Lightning should never interact with stores in this
	// state.
	StoreStateTombstone
)

func (*StoreState) UnmarshalJSON

func (ss *StoreState) UnmarshalJSON(content []byte) error

UnmarshalJSON implements the json.Unmarshaler interface.
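The following sketch shows how a state declared with `-iota` can be decoded from the `state_name` field and then compared numerically. The types are local copies of the declarations above. The accepted strings ("Up", "Offline", "Disconnected", "Down", "Tombstone") and the helper `parseStore` are assumptions for illustration, and the real UnmarshalJSON may be written differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type StoreState int

const (
	StoreStateUp           StoreState = -iota // 0
	StoreStateOffline                         // -1
	StoreStateDisconnected                    // -2
	StoreStateDown                            // -3
	StoreStateTombstone                       // -4
)

// UnmarshalJSON decodes a JSON string into a StoreState.
// The set of accepted names is an assumption of this sketch.
func (ss *StoreState) UnmarshalJSON(content []byte) error {
	var name string
	if err := json.Unmarshal(content, &name); err != nil {
		return err
	}
	switch name {
	case "Up":
		*ss = StoreStateUp
	case "Offline":
		*ss = StoreStateOffline
	case "Disconnected":
		*ss = StoreStateDisconnected
	case "Down":
		*ss = StoreStateDown
	case "Tombstone":
		*ss = StoreStateTombstone
	default:
		return fmt.Errorf("unknown store state %q", name)
	}
	return nil
}

type Store struct {
	Address string
	Version string
	State   StoreState `json:"state_name"`
}

// parseStore (hypothetical helper) decodes one store object.
func parseStore(data string) (Store, error) {
	var s Store
	err := json.Unmarshal([]byte(data), &s)
	return s, err
}

func main() {
	s, err := parseStore(`{"address":"127.0.0.1:20160","version":"4.0.0","state_name":"Offline"}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(s.Address, int(s.State))
	// Accessibility checks reduce to integer comparisons.
	fmt.Println(s.State >= StoreStateDisconnected)
}
```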
