common

package
v1.1.0-beta.0...-d5b6b69 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2024 License: Apache-2.0 Imports: 52 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// IndexEngineID is the engine ID for index engine.
	IndexEngineID = -1
)

Variables

View Source
var (
	ErrUnknown         = errors.Normalize("unknown error", errors.RFCCodeText("Lightning:Common:ErrUnknown"))
	ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("Lightning:Common:ErrInvalidArgument"))
	ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("Lightning:Common:ErrVersionMismatch"))

	ErrReadConfigFile     = errors.Normalize("cannot read config file '%s'", errors.RFCCodeText("Lightning:Config:ErrReadConfigFile"))
	ErrParseConfigFile    = errors.Normalize("cannot parse config file '%s'", errors.RFCCodeText("Lightning:Config:ErrParseConfigFile"))
	ErrInvalidConfig      = errors.Normalize("invalid config", errors.RFCCodeText("Lightning:Config:ErrInvalidConfig"))
	ErrInvalidTLSConfig   = errors.Normalize("invalid tls config", errors.RFCCodeText("Lightning:Config:ErrInvalidTLSConfig"))
	ErrInvalidSortedKVDir = errors.Normalize("invalid sorted-kv-dir '%s' for local backend, please change the config or delete the path", errors.RFCCodeText("Lightning:Config:ErrInvalidSortedKVDir"))

	ErrStorageUnknown       = errors.Normalize("unknown storage error", errors.RFCCodeText("Lightning:Storage:ErrStorageUnknown"))
	ErrInvalidPermission    = errors.Normalize("invalid permission", errors.RFCCodeText("Lightning:Storage:ErrInvalidPermission"))
	ErrInvalidStorageConfig = errors.Normalize("invalid data-source-dir", errors.RFCCodeText("Lightning:Storage:ErrInvalidStorageConfig"))
	ErrEmptySourceDir       = errors.Normalize("data-source-dir '%s' doesn't exist or contains no files", errors.RFCCodeText("Lightning:Storage:ErrEmptySourceDir"))

	ErrTableRoute         = errors.Normalize("table route error", errors.RFCCodeText("Lightning:Loader:ErrTableRoute"))
	ErrInvalidSchemaFile  = errors.Normalize("invalid schema file", errors.RFCCodeText("Lightning:Loader:ErrInvalidSchemaFile"))
	ErrTooManySourceFiles = errors.Normalize("too many source files", errors.RFCCodeText("Lightning:Loader:ErrTooManySourceFiles"))

	ErrSystemRequirementNotMet    = errors.Normalize("system requirement not met", errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet"))
	ErrCheckpointSchemaConflict   = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict"))
	ErrPreCheckFailed             = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed"))
	ErrCheckClusterRegion         = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion"))
	ErrCheckLocalResource         = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource"))
	ErrCheckTableEmpty            = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty"))
	ErrCheckCSVHeader             = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader"))
	ErrCheckDataSource            = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource"))
	ErrCheckCDCPiTR               = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR"))
	ErrCheckPDTiDBFromSameCluster = errors.Normalize("check PD and TiDB in the same cluster error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckPDTiDBSameCluster"))

	ErrOpenCheckpoint          = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint"))
	ErrReadCheckpoint          = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint"))
	ErrUpdateCheckpoint        = errors.Normalize("update checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrUpdateCheckpoint"))
	ErrUnknownCheckpointDriver = errors.Normalize("unknown checkpoint driver '%s'", errors.RFCCodeText("Lightning:Checkpoint:ErrUnknownCheckpointDriver"))
	ErrInvalidCheckpoint       = errors.Normalize("invalid checkpoint", errors.RFCCodeText("Lightning:Checkpoint:ErrInvalidCheckpoint"))
	ErrCheckpointNotFound      = errors.Normalize("checkpoint not found", errors.RFCCodeText("Lightning:Checkpoint:ErrCheckpointNotFound"))
	ErrInitCheckpoint          = errors.Normalize("init checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrInitCheckpoint"))
	ErrCleanCheckpoint         = errors.Normalize("clean checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrCleanCheckpoint"))

	ErrMetaMgrUnknown = errors.Normalize("unknown error occur on meta manager", errors.RFCCodeText("Lightning:MetaMgr:ErrMetaMgrUnknown"))

	ErrDBConnect       = errors.Normalize("failed to connect database", errors.RFCCodeText("Lightning:DB:ErrDBConnect"))
	ErrInitErrManager  = errors.Normalize("init error manager error", errors.RFCCodeText("Lightning:DB:ErrInitErrManager"))
	ErrInitMetaManager = errors.Normalize("init meta manager error", errors.RFCCodeText("Lightning:DB:ErrInitMetaManager"))

	ErrUpdatePD       = errors.Normalize("update pd error", errors.RFCCodeText("Lightning:PD:ErrUpdatePD"))
	ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient"))
	ErrPauseGC        = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC"))

	ErrCheckKVVersion        = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion"))
	ErrCreateKVClient        = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient"))
	ErrCheckMultiIngest      = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest"))
	ErrKVEpochNotMatch       = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch"))
	ErrKVNotLeader           = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader"))
	ErrKVServerIsBusy        = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy"))
	ErrKVRegionNotFound      = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound"))
	ErrKVReadIndexNotReady   = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady"))
	ErrKVIngestFailed        = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed"))
	ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped"))
	ErrNoLeader              = errors.Normalize("write to tikv with no leader returned, region '%d', leader: %d", errors.RFCCodeText("Lightning:KV:ErrNoLeader"))

	ErrUnknownBackend       = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend"))
	ErrCheckLocalFile       = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile"))
	ErrOpenDuplicateDB      = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB"))
	ErrSchemaNotExists      = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists"))
	ErrInvalidSchemaStmt    = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt"))
	ErrCreateSchema         = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema"))
	ErrUnknownColumns       = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns"))
	ErrChecksumMismatch     = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch"))
	ErrRestoreTable         = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable"))
	ErrEncodeKV             = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV"))
	ErrAllocTableRowIDs     = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs"))
	ErrInvalidMetaStatus    = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus"))
	ErrTableIsChecksuming   = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming"))
	ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows"))
	// ErrFoundDuplicateKeys shoud be replaced with ErrFoundDataConflictRecords and ErrFoundIndexConflictRecords (TODO)
	ErrFoundDuplicateKeys        = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey"))
	ErrAddIndexFailed            = errors.Normalize("add index on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrAddIndexFailed"))
	ErrDropIndexFailed           = errors.Normalize("drop index %s on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrDropIndexFailed"))
	ErrFoundDataConflictRecords  = errors.Normalize("found data conflict records in table %s, primary key is '%s', row data is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDataConflictRecords"))
	ErrFoundIndexConflictRecords = errors.Normalize("found index conflict records in table %s, index name is '%s', unique key is '%s', primary key is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundIndexConflictRecords"))
)

error definitions

View Source
var DefaultImportVariablesTiDB = map[string]string{
	"tidb_row_format_version": "1",
}

DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system variables from downstream in local/importer backend. The values record the default values if missing.

View Source
var DefaultImportantVariables = map[string]string{
	"max_allowed_packet":      "67108864",
	"div_precision_increment": "4",
	"time_zone":               "SYSTEM",
	"lc_time_names":           "en_US",
	"default_week_format":     "0",
	"block_encryption_mode":   "aes-128-ecb",
	"group_concat_max_len":    "1024",
	"tidb_backoff_weight":     "6",
}

DefaultImportantVariables is used in ObtainImportantVariables to retrieve the system variables from downstream which may affect KV encode result. The values record the default values if missing.

View Source
var ErrWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever")

ErrWriteTooSlow is used to get rid of the gRPC blocking issue. there are some strange blocking issues of gRPC like https://github.com/pingcap/tidb/issues/48352 https://github.com/pingcap/tidb/issues/46321 and I don't know why 😭

View Source
var (
	// MinRowID is the minimum rowID value after DupDetectKeyAdapter.Encode().
	MinRowID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0}
)

Functions

func BuildAddIndexSQL

func BuildAddIndexSQL(
	tableName string,
	curTblInfo,
	desiredTblInfo *model.TableInfo,
) (singleSQL string, multiSQLs []string)

BuildAddIndexSQL builds the SQL statement to create missing indexes. It returns both a single SQL statement that creates all indexes at once, and a list of SQL statements that creates each index individually.

func BuildDropIndexSQL

func BuildDropIndexSQL(dbName, tableName string, idxInfo *model.IndexInfo) string

BuildDropIndexSQL builds the SQL statement to drop index.

func ConnectMySQL

func ConnectMySQL(cfg *mysql.Config) (*sql.DB, error)

ConnectMySQL connects MySQL with the dsn. If access is denied and the password is a valid base64 encoding, we will try to connect MySQL with the base64 decoding of the password.

func EncodeIntRowID

func EncodeIntRowID(rowID int64) []byte

EncodeIntRowID encodes an int64 row id.

func EscapeIdentifier

func EscapeIdentifier(identifier string) string

EscapeIdentifier quote and escape an sql identifier

func FprintfWithIdentifiers

func FprintfWithIdentifiers(w io.Writer, format string, identifiers ...string) (int, error)

FprintfWithIdentifiers escapes the identifiers and fprintf them. The input identifiers must not be escaped.

func GetAutoRandomColumn

func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo

GetAutoRandomColumn return the column with auto_random, return nil if the table doesn't have it. todo: better put in ddl package, but this will cause import cycle since ddl package import lightning

func GetBackoffWeightFromDB

func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error)

GetBackoffWeightFromDB gets the backoff weight from database.

func GetDropIndexInfos

func GetDropIndexInfos(
	tblInfo *model.TableInfo,
) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo)

GetDropIndexInfos returns the index infos that need to be dropped and the remain indexes.

func GetExplicitRequestSourceTypeFromDB

func GetExplicitRequestSourceTypeFromDB(ctx context.Context, db *sql.DB) (string, error)

GetExplicitRequestSourceTypeFromDB gets the explicit request source type from database.

func GetGlobalAutoIDAlloc

func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error)

GetGlobalAutoIDAlloc returns the autoID allocators for a table. export it for testing.

func GetJSON

func GetJSON(ctx context.Context, client *http.Client, url string, v any) error

GetJSON fetches a page and parses it as JSON. The parsed result will be stored into the `v`. The variable `v` must be a pointer to a type that can be unmarshalled from JSON.

Example:

client := &http.Client{}
var resp struct { IP string }
if err := util.GetJSON(client, "http://api.ipify.org/?format=json", &resp); err != nil {
	return errors.Trace(err)
}
fmt.Println(resp.IP)

func GetMaxAutoIDBase

func GetMaxAutoIDBase(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (int64, error)

GetMaxAutoIDBase returns the max auto ID base for a table.

func GetMockTLSUrl

func GetMockTLSUrl(tls *TLS) string

GetMockTLSUrl returns tls's host for mock test

func InterpolateMySQLString

func InterpolateMySQLString(s string) string

InterpolateMySQLString interpolates a string into a MySQL string literal.

func IsAccessDeniedNeedConfigPrivilegeError

func IsAccessDeniedNeedConfigPrivilegeError(err error) bool

IsAccessDeniedNeedConfigPrivilegeError checks if err is generated from a query to TiDB which failed due to missing CONFIG privilege.

func IsContextCanceledError

func IsContextCanceledError(err error) bool

IsContextCanceledError returns whether the error is caused by context cancellation. This function should only be used when the code logic is affected by whether the error is canceling or not.

This function returns `false` (not a context-canceled error) if `err == nil`.

func IsDirExists

func IsDirExists(name string) bool

IsDirExists checks if dir exists.

func IsDupKeyError

func IsDupKeyError(err error) bool

IsDupKeyError checks if err is a duplicate index error.

func IsEmptyDir

func IsEmptyDir(name string) bool

IsEmptyDir checks if dir is empty.

func IsFunctionNotExistErr

func IsFunctionNotExistErr(err error, functionName string) bool

IsFunctionNotExistErr checks if err is a function not exist error.

func IsRaftKV2

func IsRaftKV2(ctx context.Context, db *sql.DB) (bool, error)

IsRaftKV2 checks whether the raft-kv2 is enabled

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError returns whether the error is transient (e.g. network connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This function returns `false` (irrecoverable) if `err == nil`.

If the error is a multierr, returns true only if all suberrors are retryable.

func KillMySelf

func KillMySelf() error

KillMySelf sends sigint to current process, used in integration test only

Only works on Unix. Signaling on Windows is not supported.

func NormalizeError

func NormalizeError(err error) error

NormalizeError converts an arbitrary error to *errors.Error based above predefined errors. If the underlying err is already an *error.Error which is prefixed by "Lightning:", leave error ID unchanged. Otherwise, converts the error ID to Lightning's predefined error IDs.

func NormalizeOrWrapErr

func NormalizeOrWrapErr(rfcErr *errors.Error, err error, args ...any) error

NormalizeOrWrapErr tries to normalize err. If the returned error is ErrUnknown, wraps it with the given rfcErr.

func RebaseTableAllocators

func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, r autoid.Requirement, dbID int64,
	tblInfo *model.TableInfo) error

RebaseTableAllocators rebase the allocators of a table. This function only rebase a table allocator when its new base is given in `bases` param, else it will be skipped. base is the max id that have been used by the table, the next usable id will be base + 1, see Allocator.Alloc.

func Retry

func Retry(purpose string, parentLogger log.Logger, action func() error) error

Retry is shared by SQLWithRetry.perform, implementation of GlueCheckpointsDB and TiDB's glue implementation

func SameDisk

func SameDisk(dir1 string, dir2 string) (bool, error)

SameDisk is used to check dir1 and dir2 in the same disk.

func SchemaExists

func SchemaExists(ctx context.Context, db dbutil.QueryExecutor, schema string) (bool, error)

SchemaExists return whether schema with specified name exists.

func SprintfWithIdentifiers

func SprintfWithIdentifiers(format string, identifiers ...string) string

SprintfWithIdentifiers escapes the identifiers and sprintf them. The input identifiers must not be escaped.

func TableExists

func TableExists(ctx context.Context, db dbutil.QueryExecutor, schema, table string) (bool, error)

TableExists return whether table with specified name exists in target db

func TableHasAutoID

func TableHasAutoID(info *model.TableInfo) bool

TableHasAutoID return whether table has auto generated id.

func TableHasAutoRowID

func TableHasAutoRowID(info *model.TableInfo) bool

TableHasAutoRowID return whether table has auto generated row id

func ToGRPCDialOption

func ToGRPCDialOption(tls *tls.Config) grpc.DialOption

ToGRPCDialOption constructs a gRPC dial option from tls.Config.

func UniqueTable

func UniqueTable(schema string, table string) string

UniqueTable returns an unique table name.

func WriteMySQLIdentifier

func WriteMySQLIdentifier(builder *strings.Builder, identifier string)

WriteMySQLIdentifier writes a MySQL identifier into the string builder. Writes a MySQL identifier into the string builder. The identifier is always escaped into the form "`foo`".

Types

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

ConnPool is a lazy pool of gRPC channels. When `Get` called, it lazily allocates new connection if connection not full. If it's full, then it will return allocated channels round-robin.

func NewConnPool

func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error),
	logger log.Logger) *ConnPool

NewConnPool creates a new connPool by the specified conn factory function and capacity.

func (*ConnPool) Close

func (p *ConnPool) Close()

Close closes the conn pool.

func (*ConnPool) TakeConns

func (p *ConnPool) TakeConns() (conns []*grpc.ClientConn)

TakeConns takes all connections from the pool.

type DataAndRanges

type DataAndRanges struct {
	Data         IngestData
	SortedRanges []Range
}

DataAndRanges is a pair of IngestData and list of Range. Each Range will become a regionJob, and the regionJob will read data from Data field.

type DupDetectKeyAdapter

type DupDetectKeyAdapter struct{}

DupDetectKeyAdapter is a key adapter that appends rowID to the key to avoid overwritten.

func (DupDetectKeyAdapter) Decode

func (DupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error)

Decode implements KeyAdapter.

func (DupDetectKeyAdapter) Encode

func (DupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte

Encode implements KeyAdapter.

func (DupDetectKeyAdapter) EncodedLen

func (DupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int

EncodedLen implements KeyAdapter.

type DupDetectOpt

type DupDetectOpt struct {
	ReportErrOnDup bool
}

DupDetectOpt is the option for duplicate detection.

type DupDetector

type DupDetector struct {
	// contains filtered or unexported fields
}

DupDetector extract the decoded key and value from the iter which may contain duplicate keys and store the keys encoded by KeyAdapter. The duplicate keys and values will be saved in dupDB.

func NewDupDetector

func NewDupDetector(
	keyAdaptor KeyAdapter,
	dupDBWriteBatch *pebble.Batch,
	logger log.Logger,
	option DupDetectOpt,
) *DupDetector

NewDupDetector creates a new DupDetector. dupDBWriteBatch will be closed when DupDetector is closed.

func (*DupDetector) Close

func (d *DupDetector) Close() error

Close closes the DupDetector.

func (*DupDetector) Init

func (d *DupDetector) Init(iter KVIter) (key []byte, val []byte, err error)

Init initializes the status of DupDetector by reading the current Key and Value of given iter.

func (*DupDetector) Next

func (d *DupDetector) Next(iter KVIter) (key []byte, value []byte, ok bool, err error)

Next reads the next key and value from given iter. If it meets duplicate key, it will record the duplicate key and value in dupDB and skip it.

type Engine

type Engine interface {
	// ID is the identifier of an engine.
	ID() string
	// LoadIngestData sends DataAndRanges to outCh.
	LoadIngestData(ctx context.Context, outCh chan<- DataAndRanges) error
	// KVStatistics returns the total kv size and total kv count.
	KVStatistics() (totalKVSize int64, totalKVCount int64)
	// ImportedStatistics returns the imported kv size and imported kv count.
	ImportedStatistics() (importedKVSize int64, importedKVCount int64)
	// GetKeyRange returns the key range [startKey, endKey) of the engine. If the
	// duplicate detection is enabled, the keys in engine are encoded by duplicate
	// detection but the returned keys should not be encoded.
	GetKeyRange() (startKey []byte, endKey []byte, err error)
	// GetRegionSplitKeys checks the KV distribution of the Engine and returns the
	// keys that can be used as region split keys. If the duplicate detection is
	// enabled, the keys stored in engine are encoded by duplicate detection but the
	// returned keys should not be encoded.
	GetRegionSplitKeys() ([][]byte, error)
	Close() error
}

Engine describes the common interface of local and external engine that local backend uses.

type ForwardIter

type ForwardIter interface {
	// First moves this iter to the first key.
	First() bool
	// Valid check this iter reach the end.
	Valid() bool
	// Next moves this iter forward.
	Next() bool
	// Key returns current position pair's key. The key is accessible after more
	// Next() or Key() invocations but is invalidated by Close() or ReleaseBuf().
	Key() []byte
	// Value returns current position pair's Value. The value is accessible after
	// more Next() or Value() invocations but is invalidated by Close() or
	// ReleaseBuf().
	Value() []byte
	// Close close this iter.
	Close() error
	// Error return current error on this iter.
	Error() error
	// ReleaseBuf release the memory that saves the previously returned keys and
	// values. These previously returned keys and values should not be accessed
	// again.
	ReleaseBuf()
}

ForwardIter describes a iterator that can only move forward.

type GRPCConns

type GRPCConns struct {
	// contains filtered or unexported fields
}

GRPCConns is a pool of gRPC connections.

func NewGRPCConns

func NewGRPCConns() *GRPCConns

NewGRPCConns creates a new GRPCConns.

func (*GRPCConns) Close

func (conns *GRPCConns) Close()

Close closes all gRPC connections in the pool.

func (*GRPCConns) GetGrpcConn

func (conns *GRPCConns) GetGrpcConn(ctx context.Context, storeID uint64,
	tcpConcurrency int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) (*grpc.ClientConn, error)

GetGrpcConn gets a gRPC connection from the pool.

type IngestData

type IngestData interface {
	// GetFirstAndLastKey returns the first and last key of the data reader in the
	// range [lowerBound, upperBound). Empty or nil bounds means unbounded.
	// lowerBound must be less than upperBound.
	// when there is no data in the range, it should return nil, nil, nil
	GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)
	// NewIter creates an iterator. The only expected usage of the iterator is read
	// batches of key-value pairs by caller. Due to the implementation of IngestData,
	// the iterator may need to allocate memories to retain the key-value pair batch,
	// these memories will be allocated from given bufPool and be released when the
	// iterator is closed or ForwardIter.ReleaseBuf is called.
	NewIter(ctx context.Context, lowerBound, upperBound []byte, bufPool *membuf.Pool) ForwardIter
	// GetTS will be used as the start/commit TS of the data.
	GetTS() uint64
	// IncRef should be called every time when IngestData is referred by regionJob.
	// Multiple regionJob can share one IngestData. Same amount of DecRef should be
	// called to release the IngestData.
	IncRef()
	// DecRef is used to cooperate with IncRef to release IngestData.
	DecRef()
	// Finish will be called when the data is ingested successfully. Note that
	// one IngestData maybe partially ingested, so this function may be called
	// multiple times.
	Finish(totalBytes, totalCount int64)
}

IngestData describes a common interface that is needed by TiKV write + ingest RPC.

type KVIter

type KVIter interface {
	Next() bool
	Key() []byte
	Value() []byte
}

KVIter is a slim interface that DupDetector needs.

type KeyAdapter

type KeyAdapter interface {
	// Encode encodes the key with its corresponding rowID. It appends the encoded
	// key to dst and returns the resulting slice. The encoded key is guaranteed to
	// be in ascending order for comparison.
	// rowID must be a coded mem-comparable value, one way to get it is to use
	// tidb/util/codec package.
	Encode(dst []byte, key []byte, rowID []byte) []byte

	// Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice.
	Decode(dst []byte, data []byte) ([]byte, error)

	// EncodedLen returns the encoded key length.
	EncodedLen(key []byte, rowID []byte) int
}

KeyAdapter is used to encode and decode keys so that duplicate key can be identified by rowID and avoid overwritten.

type KvPair

type KvPair struct {
	// Key is the key of the KV pair
	Key []byte
	// Val is the value of the KV pair
	Val []byte
	// RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It has
	// two sources:
	//
	// When the KvPair is generated from ADD INDEX, the RowID is the encoded handle.
	//
	// Otherwise, the RowID is related to the row number in the source files, and
	// encode the number with `codec.EncodeComparableVarint`.
	RowID []byte
}

KvPair contains a key-value pair and other fields that can be used to ingest KV pairs into TiKV.

type MySQLConnectParam

type MySQLConnectParam struct {
	Host                     string
	Port                     int
	User                     string
	Password                 string
	SQLMode                  string
	MaxAllowedPacket         uint64
	TLSConfig                *tls.Config
	AllowFallbackToPlaintext bool
	Net                      string
	Vars                     map[string]string
}

MySQLConnectParam records the parameters needed to connect to a MySQL database.

func (*MySQLConnectParam) Connect

func (param *MySQLConnectParam) Connect() (*sql.DB, error)

Connect creates a new connection to the database.

func (*MySQLConnectParam) ToDriverConfig

func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config

ToDriverConfig converts the MySQLConnectParam to a mysql.Config.

type NoopKeyAdapter

type NoopKeyAdapter struct{}

NoopKeyAdapter is a key adapter that does nothing.

func (NoopKeyAdapter) Decode

func (NoopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error)

Decode implements KeyAdapter.

func (NoopKeyAdapter) Encode

func (NoopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte

Encode implements KeyAdapter.

func (NoopKeyAdapter) EncodedLen

func (NoopKeyAdapter) EncodedLen(key []byte, _ []byte) int

EncodedLen implements KeyAdapter.

type OnceError

type OnceError struct {
	// contains filtered or unexported fields
}

OnceError is an error value which will can be assigned once.

The zero value is ready for use.

func (*OnceError) Get

func (oe *OnceError) Get() error

Get returns the first error value stored in this instance.

func (*OnceError) Set

func (oe *OnceError) Set(e error)

Set assigns an error to this instance, if `e != nil`.

If this method is called multiple times, only the first call is effective.

type Pauser

type Pauser struct {
	// contains filtered or unexported fields
}

Pauser is a type which could allow multiple goroutines to wait on demand, similar to a gate or traffic light.

func NewPauser

func NewPauser() *Pauser

NewPauser returns an initialized pauser.

func (*Pauser) IsPaused

func (p *Pauser) IsPaused() bool

IsPaused gets whether the current state is paused or not.

func (*Pauser) Pause

func (p *Pauser) Pause()

Pause causes all calls to Wait() to block.

func (*Pauser) Resume

func (p *Pauser) Resume()

Resume causes all calls to Wait() to continue.

func (*Pauser) Wait

func (p *Pauser) Wait(ctx context.Context) error

Wait blocks the current goroutine if the current state is paused, until the pauser itself is resumed at least once.

If `ctx` is done, this method will also unblock immediately, and return the context error.

type Range

type Range struct {
	Start []byte
	End   []byte // end is always exclusive except import_sstpb.SSTMeta
}

Range contains a start key and an end key. The Range's key should not be encoded by duplicate detection.

type SQLWithRetry

type SQLWithRetry struct {
	// either *sql.DB or *sql.Conn
	DB           dbutil.DBExecutor
	Logger       log.Logger
	HideQueryLog bool
}

SQLWithRetry constructs a retryable transaction.

func (SQLWithRetry) Exec

func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, args ...any) error

Exec executes a single SQL with optional retry.

func (SQLWithRetry) QueryRow

func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...any) error

QueryRow executes a query that is expected to return at most one row.

func (SQLWithRetry) QueryStringRows

func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)

QueryStringRows executes a query that is expected to return multiple rows whose every column is string.

func (SQLWithRetry) Transact

func (t SQLWithRetry) Transact(ctx context.Context, purpose string, action func(context.Context, *sql.Tx) error) error

Transact executes an action in a transaction, and retry if the action failed with a retryable error.

type StorageSize

type StorageSize struct {
	Capacity  uint64
	Available uint64
}

StorageSize represents the storage's capacity and available size Learn from tidb-binlog source code.

func GetStorageSize

func GetStorageSize(dir string) (size StorageSize, err error)

GetStorageSize gets storage's capacity and available size

type TLS

type TLS struct {
	// contains filtered or unexported fields
}

TLS is a wrapper around a TLS configuration.

func NewTLS

func NewTLS(caPath, certPath, keyPath, host string, caBytes, certBytes, keyBytes []byte) (*TLS, error)

NewTLS constructs a new HTTP client with TLS configured with the CA, certificate and key paths.

func NewTLSFromMockServer

func NewTLSFromMockServer(server *httptest.Server) *TLS

NewTLSFromMockServer constructs a new TLS instance from the certificates of an *httptest.Server.

func (*TLS) GetJSON

func (tc *TLS) GetJSON(ctx context.Context, path string, v any) error

GetJSON performs a GET request to the given path and unmarshals the response

func (*TLS) TLSConfig

func (tc *TLS) TLSConfig() *tls.Config

TLSConfig returns the underlying TLS configuration.

func (*TLS) ToGRPCDialOption

func (tc *TLS) ToGRPCDialOption() grpc.DialOption

ToGRPCDialOption constructs a gRPC dial option.

func (*TLS) ToPDSecurityOption

func (tc *TLS) ToPDSecurityOption() pd.SecurityOption

ToPDSecurityOption converts the TLS configuration to a PD security option.

func (*TLS) ToTiKVSecurityConfig

func (tc *TLS) ToTiKVSecurityConfig() config.Security

ToTiKVSecurityConfig converts the TLS configuration to a TiKV security config. TODO: TiKV does not support pass in content.

func (*TLS) WithHost

func (tc *TLS) WithHost(host string) *TLS

WithHost creates a new TLS instance with the host replaced.

func (*TLS) WrapListener

func (tc *TLS) WrapListener(l net.Listener) net.Listener

WrapListener places a TLS layer on top of the existing listener.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL