Documentation ¶
Index ¶
- Constants
- Variables
- func BuildAddIndexSQL(tableName string, curTblInfo, desiredTblInfo *model.TableInfo) (singleSQL string, multiSQLs []string)
- func BuildDropIndexSQL(dbName, tableName string, idxInfo *model.IndexInfo) string
- func ConnectMySQL(cfg *mysql.Config) (*sql.DB, error)
- func EncodeIntRowID(rowID int64) []byte
- func EscapeIdentifier(identifier string) string
- func FprintfWithIdentifiers(w io.Writer, format string, identifiers ...string) (int, error)
- func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo
- func GetBackoffWeightFromDB(ctx context.Context, db *sql.DB) (int, error)
- func GetDropIndexInfos(tblInfo *model.TableInfo) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo)
- func GetExplicitRequestSourceTypeFromDB(ctx context.Context, db *sql.DB) (string, error)
- func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error)
- func GetJSON(ctx context.Context, client *http.Client, url string, v any) error
- func GetMaxAutoIDBase(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) (int64, error)
- func GetMockTLSUrl(tls *TLS) string
- func InterpolateMySQLString(s string) string
- func IsAccessDeniedNeedConfigPrivilegeError(err error) bool
- func IsContextCanceledError(err error) bool
- func IsDirExists(name string) bool
- func IsDupKeyError(err error) bool
- func IsEmptyDir(name string) bool
- func IsFunctionNotExistErr(err error, functionName string) bool
- func IsRaftKV2(ctx context.Context, db *sql.DB) (bool, error)
- func IsRetryableError(err error) bool
- func KillMySelf() error
- func NormalizeError(err error) error
- func NormalizeOrWrapErr(rfcErr *errors.Error, err error, args ...any) error
- func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, ...) error
- func Retry(purpose string, parentLogger log.Logger, action func() error) error
- func SameDisk(dir1 string, dir2 string) (bool, error)
- func SchemaExists(ctx context.Context, db dbutil.QueryExecutor, schema string) (bool, error)
- func SprintfWithIdentifiers(format string, identifiers ...string) string
- func TableExists(ctx context.Context, db dbutil.QueryExecutor, schema, table string) (bool, error)
- func TableHasAutoID(info *model.TableInfo) bool
- func TableHasAutoRowID(info *model.TableInfo) bool
- func ToGRPCDialOption(tls *tls.Config) grpc.DialOption
- func UniqueTable(schema string, table string) string
- func WriteMySQLIdentifier(builder *strings.Builder, identifier string)
- type ConnPool
- type DataAndRanges
- type DupDetectKeyAdapter
- type DupDetectOpt
- type DupDetector
- type Engine
- type ForwardIter
- type GRPCConns
- type IngestData
- type KVIter
- type KeyAdapter
- type KvPair
- type MySQLConnectParam
- type NoopKeyAdapter
- type OnceError
- type Pauser
- type Range
- type SQLWithRetry
- func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, args ...any) error
- func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...any) error
- func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)
- func (t SQLWithRetry) Transact(ctx context.Context, purpose string, ...) error
- type StorageSize
- type TLS
- func (tc *TLS) GetJSON(ctx context.Context, path string, v any) error
- func (tc *TLS) TLSConfig() *tls.Config
- func (tc *TLS) ToGRPCDialOption() grpc.DialOption
- func (tc *TLS) ToPDSecurityOption() pd.SecurityOption
- func (tc *TLS) ToTiKVSecurityConfig() config.Security
- func (tc *TLS) WithHost(host string) *TLS
- func (tc *TLS) WrapListener(l net.Listener) net.Listener
Constants ¶
const (
// IndexEngineID is the engine ID for index engine.
IndexEngineID = -1
)
Variables ¶
var ( ErrUnknown = errors.Normalize("unknown error", errors.RFCCodeText("Lightning:Common:ErrUnknown")) ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("Lightning:Common:ErrInvalidArgument")) ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("Lightning:Common:ErrVersionMismatch")) ErrReadConfigFile = errors.Normalize("cannot read config file '%s'", errors.RFCCodeText("Lightning:Config:ErrReadConfigFile")) ErrParseConfigFile = errors.Normalize("cannot parse config file '%s'", errors.RFCCodeText("Lightning:Config:ErrParseConfigFile")) ErrInvalidConfig = errors.Normalize("invalid config", errors.RFCCodeText("Lightning:Config:ErrInvalidConfig")) ErrInvalidTLSConfig = errors.Normalize("invalid tls config", errors.RFCCodeText("Lightning:Config:ErrInvalidTLSConfig")) ErrInvalidSortedKVDir = errors.Normalize("invalid sorted-kv-dir '%s' for local backend, please change the config or delete the path", errors.RFCCodeText("Lightning:Config:ErrInvalidSortedKVDir")) ErrStorageUnknown = errors.Normalize("unknown storage error", errors.RFCCodeText("Lightning:Storage:ErrStorageUnknown")) ErrInvalidPermission = errors.Normalize("invalid permission", errors.RFCCodeText("Lightning:Storage:ErrInvalidPermission")) ErrInvalidStorageConfig = errors.Normalize("invalid data-source-dir", errors.RFCCodeText("Lightning:Storage:ErrInvalidStorageConfig")) ErrEmptySourceDir = errors.Normalize("data-source-dir '%s' doesn't exist or contains no files", errors.RFCCodeText("Lightning:Storage:ErrEmptySourceDir")) ErrTableRoute = errors.Normalize("table route error", errors.RFCCodeText("Lightning:Loader:ErrTableRoute")) ErrInvalidSchemaFile = errors.Normalize("invalid schema file", errors.RFCCodeText("Lightning:Loader:ErrInvalidSchemaFile")) ErrTooManySourceFiles = errors.Normalize("too many source files", errors.RFCCodeText("Lightning:Loader:ErrTooManySourceFiles")) ErrSystemRequirementNotMet = errors.Normalize("system requirement not met", 
errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet")) ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict")) ErrPreCheckFailed = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed")) ErrCheckClusterRegion = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion")) ErrCheckLocalResource = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource")) ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty")) ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader")) ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource")) ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR")) ErrOpenCheckpoint = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint")) ErrReadCheckpoint = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint")) ErrUpdateCheckpoint = errors.Normalize("update checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrUpdateCheckpoint")) ErrUnknownCheckpointDriver = errors.Normalize("unknown checkpoint driver '%s'", errors.RFCCodeText("Lightning:Checkpoint:ErrUnknownCheckpointDriver")) ErrInvalidCheckpoint = errors.Normalize("invalid checkpoint", errors.RFCCodeText("Lightning:Checkpoint:ErrInvalidCheckpoint")) ErrCheckpointNotFound = errors.Normalize("checkpoint not found", errors.RFCCodeText("Lightning:Checkpoint:ErrCheckpointNotFound")) ErrInitCheckpoint = errors.Normalize("init checkpoint error", 
errors.RFCCodeText("Lightning:Checkpoint:ErrInitCheckpoint")) ErrCleanCheckpoint = errors.Normalize("clean checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrCleanCheckpoint")) ErrMetaMgrUnknown = errors.Normalize("unknown error occur on meta manager", errors.RFCCodeText("Lightning:MetaMgr:ErrMetaMgrUnknown")) ErrDBConnect = errors.Normalize("failed to connect database", errors.RFCCodeText("Lightning:DB:ErrDBConnect")) ErrInitErrManager = errors.Normalize("init error manager error", errors.RFCCodeText("Lightning:DB:ErrInitErrManager")) ErrInitMetaManager = errors.Normalize("init meta manager error", errors.RFCCodeText("Lightning:DB:ErrInitMetaManager")) ErrUpdatePD = errors.Normalize("update pd error", errors.RFCCodeText("Lightning:PD:ErrUpdatePD")) ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient")) ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC")) ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion")) ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient")) ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest")) ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch")) ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader")) ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy")) ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound")) ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady")) ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed")) 
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped")) ErrNoLeader = errors.Normalize("write to tikv with no leader returned, region '%d', leader: %d", errors.RFCCodeText("Lightning:KV:ErrNoLeader")) ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend")) ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile")) ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB")) ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists")) ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt")) ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema")) ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns")) ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch")) ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable")) ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV")) ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs")) ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus")) ErrTableIsChecksuming = errors.Normalize("table '%s' is 
checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming")) ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows")) // ErrFoundDuplicateKeys shoud be replaced with ErrFoundDataConflictRecords and ErrFoundIndexConflictRecords (TODO) ErrFoundDuplicateKeys = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey")) ErrAddIndexFailed = errors.Normalize("add index on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrAddIndexFailed")) ErrDropIndexFailed = errors.Normalize("drop index %s on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrDropIndexFailed")) ErrFoundDataConflictRecords = errors.Normalize("found data conflict records in table %s, primary key is '%s', row data is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDataConflictRecords")) ErrFoundIndexConflictRecords = errors.Normalize("found index conflict records in table %s, index name is '%s', unique key is '%s', primary key is '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundIndexConflictRecords")) )
error definitions
var DefaultImportVariablesTiDB = map[string]string{
"tidb_row_format_version": "1",
}
DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system variables from downstream in local/importer backend. The values record the default values if missing.
var DefaultImportantVariables = map[string]string{
"max_allowed_packet": "67108864",
"div_precision_increment": "4",
"time_zone": "SYSTEM",
"lc_time_names": "en_US",
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
"tidb_backoff_weight": "6",
}
DefaultImportantVariables is used in ObtainImportantVariables to retrieve the system variables from downstream which may affect KV encode result. The values record the default values if missing.
var ErrWriteTooSlow = errors.New("write too slow, maybe gRPC is blocked forever")
ErrWriteTooSlow is used to get rid of the gRPC blocking issue. There are some strange blocking issues of gRPC, like https://github.com/pingcap/tidb/issues/48352 and https://github.com/pingcap/tidb/issues/46321, and I don't know why 😭
var ( // MinRowID is the minimum rowID value after DupDetectKeyAdapter.Encode(). MinRowID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0} )
Functions ¶
func BuildAddIndexSQL ¶
func BuildAddIndexSQL( tableName string, curTblInfo, desiredTblInfo *model.TableInfo, ) (singleSQL string, multiSQLs []string)
BuildAddIndexSQL builds the SQL statement to create missing indexes. It returns both a single SQL statement that creates all indexes at once, and a list of SQL statements that creates each index individually.
func BuildDropIndexSQL ¶
BuildDropIndexSQL builds the SQL statement to drop index.
func ConnectMySQL ¶
ConnectMySQL connects MySQL with the dsn. If access is denied and the password is a valid base64 encoding, we will try to connect MySQL with the base64 decoding of the password.
func EncodeIntRowID ¶
EncodeIntRowID encodes an int64 row id.
func EscapeIdentifier ¶
EscapeIdentifier quotes and escapes an SQL identifier.
func FprintfWithIdentifiers ¶
FprintfWithIdentifiers escapes the identifiers and fprintf them. The input identifiers must not be escaped.
func GetAutoRandomColumn ¶
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo
GetAutoRandomColumn returns the column with auto_random, or nil if the table doesn't have one. TODO: this would be better placed in the ddl package, but that would cause an import cycle since the ddl package imports lightning.
func GetBackoffWeightFromDB ¶
GetBackoffWeightFromDB gets the backoff weight from database.
func GetDropIndexInfos ¶
func GetDropIndexInfos( tblInfo *model.TableInfo, ) (remainIndexes []*model.IndexInfo, dropIndexes []*model.IndexInfo)
GetDropIndexInfos returns the remaining indexes and the index infos that need to be dropped.
func GetExplicitRequestSourceTypeFromDB ¶
GetExplicitRequestSourceTypeFromDB gets the explicit request source type from database.
func GetGlobalAutoIDAlloc ¶
func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error)
GetGlobalAutoIDAlloc returns the autoID allocators for a table. It is exported for testing.
func GetJSON ¶
GetJSON fetches a page and parses it as JSON. The parsed result will be stored into the `v`. The variable `v` must be a pointer to a type that can be unmarshalled from JSON.
Example:
client := &http.Client{} var resp struct { IP string } if err := util.GetJSON(ctx, client, "http://api.ipify.org/?format=json", &resp); err != nil { return errors.Trace(err) } fmt.Println(resp.IP)
func GetMaxAutoIDBase ¶
GetMaxAutoIDBase returns the max auto ID base for a table.
func GetMockTLSUrl ¶
GetMockTLSUrl returns tls's host for mock test
func InterpolateMySQLString ¶
InterpolateMySQLString interpolates a string into a MySQL string literal.
func IsAccessDeniedNeedConfigPrivilegeError ¶
IsAccessDeniedNeedConfigPrivilegeError checks if err is generated from a query to TiDB which failed due to missing CONFIG privilege.
func IsContextCanceledError ¶
IsContextCanceledError returns whether the error is caused by context cancellation. This function should only be used when the code logic is affected by whether the error is canceling or not.
This function returns `false` (not a context-canceled error) if `err == nil`.
func IsDupKeyError ¶
IsDupKeyError checks if err is a duplicate index error.
func IsFunctionNotExistErr ¶
IsFunctionNotExistErr checks if err is a function not exist error.
func IsRetryableError ¶
IsRetryableError returns whether the error is transient (e.g. network connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This function returns `false` (irrecoverable) if `err == nil`.
If the error is a multierr, returns true only if all suberrors are retryable.
func KillMySelf ¶
func KillMySelf() error
KillMySelf sends SIGINT to the current process; it is used in integration tests only.
Only works on Unix. Signaling on Windows is not supported.
func NormalizeError ¶
NormalizeError converts an arbitrary error to *errors.Error based on the predefined errors above. If the underlying err is already an *errors.Error which is prefixed by "Lightning:", the error ID is left unchanged. Otherwise, the error ID is converted to one of Lightning's predefined error IDs.
func NormalizeOrWrapErr ¶
NormalizeOrWrapErr tries to normalize err. If the returned error is ErrUnknown, wraps it with the given rfcErr.
func RebaseTableAllocators ¶
func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) error
RebaseTableAllocators rebases the allocators of a table. This function only rebases a table allocator when its new base is given in the `bases` param; otherwise it is skipped. The base is the max ID that has been used by the table, so the next usable ID will be base + 1; see Allocator.Alloc.
func Retry ¶
Retry is shared by SQLWithRetry.perform, implementation of GlueCheckpointsDB and TiDB's glue implementation
func SchemaExists ¶
SchemaExists returns whether a schema with the specified name exists.
func SprintfWithIdentifiers ¶
SprintfWithIdentifiers escapes the identifiers and sprintf them. The input identifiers must not be escaped.
func TableExists ¶
TableExists returns whether a table with the specified name exists in the target db.
func TableHasAutoID ¶
TableHasAutoID returns whether the table has an auto-generated ID.
func TableHasAutoRowID ¶
TableHasAutoRowID returns whether the table has an auto-generated row ID.
func ToGRPCDialOption ¶
func ToGRPCDialOption(tls *tls.Config) grpc.DialOption
ToGRPCDialOption constructs a gRPC dial option from tls.Config.
func UniqueTable ¶
UniqueTable returns a unique table name.
func WriteMySQLIdentifier ¶
WriteMySQLIdentifier writes a MySQL identifier into the string builder. The identifier is always escaped into the form "`foo`".
Types ¶
type ConnPool ¶
type ConnPool struct {
// contains filtered or unexported fields
}
ConnPool is a lazy pool of gRPC channels. When `Get` is called, it lazily allocates a new connection if the pool is not full. If the pool is full, it returns the already-allocated channels in round-robin order.
func NewConnPool ¶
func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error), logger log.Logger) *ConnPool
NewConnPool creates a new connPool by the specified conn factory function and capacity.
func (*ConnPool) TakeConns ¶
func (p *ConnPool) TakeConns() (conns []*grpc.ClientConn)
TakeConns takes all connections from the pool.
type DataAndRanges ¶
type DataAndRanges struct { Data IngestData SortedRanges []Range }
DataAndRanges is a pair of IngestData and list of Range. Each Range will become a regionJob, and the regionJob will read data from Data field.
type DupDetectKeyAdapter ¶
type DupDetectKeyAdapter struct{}
DupDetectKeyAdapter is a key adapter that appends rowID to the key to avoid the key being overwritten.
func (DupDetectKeyAdapter) Decode ¶
func (DupDetectKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error)
Decode implements KeyAdapter.
func (DupDetectKeyAdapter) Encode ¶
func (DupDetectKeyAdapter) Encode(dst []byte, key []byte, rowID []byte) []byte
Encode implements KeyAdapter.
func (DupDetectKeyAdapter) EncodedLen ¶
func (DupDetectKeyAdapter) EncodedLen(key []byte, rowID []byte) int
EncodedLen implements KeyAdapter.
type DupDetectOpt ¶
type DupDetectOpt struct {
ReportErrOnDup bool
}
DupDetectOpt is the option for duplicate detection.
type DupDetector ¶
type DupDetector struct {
// contains filtered or unexported fields
}
DupDetector extracts the decoded key and value from the iter, which may contain duplicate keys, and stores the keys encoded by KeyAdapter. The duplicate keys and values will be saved in dupDB.
func NewDupDetector ¶
func NewDupDetector( keyAdaptor KeyAdapter, dupDBWriteBatch *pebble.Batch, logger log.Logger, option DupDetectOpt, ) *DupDetector
NewDupDetector creates a new DupDetector. dupDBWriteBatch will be closed when DupDetector is closed.
type Engine ¶
type Engine interface { // ID is the identifier of an engine. ID() string // LoadIngestData sends DataAndRanges to outCh. LoadIngestData(ctx context.Context, outCh chan<- DataAndRanges) error // KVStatistics returns the total kv size and total kv count. KVStatistics() (totalKVSize int64, totalKVCount int64) // ImportedStatistics returns the imported kv size and imported kv count. ImportedStatistics() (importedKVSize int64, importedKVCount int64) // GetKeyRange returns the key range [startKey, endKey) of the engine. If the // duplicate detection is enabled, the keys in engine are encoded by duplicate // detection but the returned keys should not be encoded. GetKeyRange() (startKey []byte, endKey []byte, err error) // GetRegionSplitKeys checks the KV distribution of the Engine and returns the // keys that can be used as region split keys. If the duplicate detection is // enabled, the keys stored in engine are encoded by duplicate detection but the // returned keys should not be encoded. // // Currently, the start/end key of this import should also be included in the // returned split keys. GetRegionSplitKeys() ([][]byte, error) Close() error }
Engine describes the common interface of local and external engine that local backend uses.
type ForwardIter ¶
type ForwardIter interface { // First moves this iter to the first key. First() bool // Valid check this iter reach the end. Valid() bool // Next moves this iter forward. Next() bool // Key returns current position pair's key. The key is accessible after more // Next() or Key() invocations but is invalidated by Close() or ReleaseBuf(). Key() []byte // Value returns current position pair's Value. The value is accessible after // more Next() or Value() invocations but is invalidated by Close() or // ReleaseBuf(). Value() []byte // Close close this iter. Close() error // Error return current error on this iter. Error() error // ReleaseBuf release the memory that saves the previously returned keys and // values. These previously returned keys and values should not be accessed // again. ReleaseBuf() }
ForwardIter describes an iterator that can only move forward.
type GRPCConns ¶
type GRPCConns struct {
// contains filtered or unexported fields
}
GRPCConns is a pool of gRPC connections.
func (*GRPCConns) Close ¶
func (conns *GRPCConns) Close()
Close closes all gRPC connections in the pool.
func (*GRPCConns) GetGrpcConn ¶
func (conns *GRPCConns) GetGrpcConn(ctx context.Context, storeID uint64, tcpConcurrency int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) (*grpc.ClientConn, error)
GetGrpcConn gets a gRPC connection from the pool.
type IngestData ¶
type IngestData interface { // GetFirstAndLastKey returns the first and last key of the data reader in the // range [lowerBound, upperBound). Empty or nil bounds means unbounded. // lowerBound must be less than upperBound. // when there is no data in the range, it should return nil, nil, nil GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) // NewIter creates an iterator. The only expected usage of the iterator is read // batches of key-value pairs by caller. Due to the implementation of IngestData, // the iterator may need to allocate memories to retain the key-value pair batch, // these memories will be allocated from given bufPool and be released when the // iterator is closed or ForwardIter.ReleaseBuf is called. NewIter(ctx context.Context, lowerBound, upperBound []byte, bufPool *membuf.Pool) ForwardIter // GetTS will be used as the start/commit TS of the data. GetTS() uint64 // IncRef should be called every time when IngestData is referred by regionJob. // Multiple regionJob can share one IngestData. Same amount of DecRef should be // called to release the IngestData. IncRef() // DecRef is used to cooperate with IncRef to release IngestData. DecRef() // Finish will be called when the data is ingested successfully. Note that // one IngestData maybe partially ingested, so this function may be called // multiple times. Finish(totalBytes, totalCount int64) }
IngestData describes a common interface that is needed by TiKV write + ingest RPC.
type KeyAdapter ¶
type KeyAdapter interface { // Encode encodes the key with its corresponding rowID. It appends the encoded // key to dst and returns the resulting slice. The encoded key is guaranteed to // be in ascending order for comparison. // rowID must be a coded mem-comparable value, one way to get it is to use // tidb/util/codec package. Encode(dst []byte, key []byte, rowID []byte) []byte // Decode decodes the original key to dst. It appends the encoded key to dst and returns the resulting slice. Decode(dst []byte, data []byte) ([]byte, error) // EncodedLen returns the encoded key length. EncodedLen(key []byte, rowID []byte) int }
KeyAdapter is used to encode and decode keys so that duplicate keys can be identified by rowID and are not overwritten.
type KvPair ¶
type KvPair struct { // Key is the key of the KV pair Key []byte // Val is the value of the KV pair Val []byte // RowID identifies a KvPair in case two KvPairs are equal in Key and Val. It has // two sources: // // When the KvPair is generated from ADD INDEX, the RowID is the encoded handle. // // Otherwise, the RowID is related to the row number in the source files, and // encode the number with `codec.EncodeComparableVarint`. RowID []byte }
KvPair contains a key-value pair and other fields that can be used to ingest KV pairs into TiKV.
type MySQLConnectParam ¶
type MySQLConnectParam struct { Host string Port int User string Password string SQLMode string MaxAllowedPacket uint64 TLSConfig *tls.Config AllowFallbackToPlaintext bool Net string Vars map[string]string }
MySQLConnectParam records the parameters needed to connect to a MySQL database.
func (*MySQLConnectParam) Connect ¶
func (param *MySQLConnectParam) Connect() (*sql.DB, error)
Connect creates a new connection to the database.
func (*MySQLConnectParam) ToDriverConfig ¶
func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config
ToDriverConfig converts the MySQLConnectParam to a mysql.Config.
type NoopKeyAdapter ¶
type NoopKeyAdapter struct{}
NoopKeyAdapter is a key adapter that does nothing.
func (NoopKeyAdapter) Decode ¶
func (NoopKeyAdapter) Decode(dst []byte, data []byte) ([]byte, error)
Decode implements KeyAdapter.
func (NoopKeyAdapter) Encode ¶
func (NoopKeyAdapter) Encode(dst []byte, key []byte, _ []byte) []byte
Encode implements KeyAdapter.
func (NoopKeyAdapter) EncodedLen ¶
func (NoopKeyAdapter) EncodedLen(key []byte, _ []byte) int
EncodedLen implements KeyAdapter.
type OnceError ¶
type OnceError struct {
// contains filtered or unexported fields
}
OnceError is an error value which can be assigned only once.
The zero value is ready for use.
type Pauser ¶
type Pauser struct {
// contains filtered or unexported fields
}
Pauser is a type which could allow multiple goroutines to wait on demand, similar to a gate or traffic light.
type Range ¶
type Range struct { Start []byte End []byte // end is always exclusive except import_sstpb.SSTMeta }
Range contains a start key and an end key. The Range's key should not be encoded by duplicate detection.
type SQLWithRetry ¶
type SQLWithRetry struct { // either *sql.DB or *sql.Conn DB dbutil.DBExecutor Logger log.Logger HideQueryLog bool }
SQLWithRetry constructs a retryable transaction.
func (SQLWithRetry) QueryRow ¶
func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...any) error
QueryRow executes a query that is expected to return at most one row.
func (SQLWithRetry) QueryStringRows ¶
func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)
QueryStringRows executes a query that is expected to return multiple rows whose every column is string.
type StorageSize ¶
StorageSize represents the storage's capacity and available size. Learned from the tidb-binlog source code.
func GetStorageSize ¶
func GetStorageSize(dir string) (size StorageSize, err error)
GetStorageSize gets storage's capacity and available size
type TLS ¶
type TLS struct {
// contains filtered or unexported fields
}
TLS is a wrapper around a TLS configuration.
func NewTLS ¶
func NewTLS(caPath, certPath, keyPath, host string, caBytes, certBytes, keyBytes []byte) (*TLS, error)
NewTLS constructs a new HTTP client with TLS configured with the CA, certificate and key paths.
func NewTLSFromMockServer ¶
NewTLSFromMockServer constructs a new TLS instance from the certificates of an *httptest.Server.
func (*TLS) ToGRPCDialOption ¶
func (tc *TLS) ToGRPCDialOption() grpc.DialOption
ToGRPCDialOption constructs a gRPC dial option.
func (*TLS) ToPDSecurityOption ¶
func (tc *TLS) ToPDSecurityOption() pd.SecurityOption
ToPDSecurityOption converts the TLS configuration to a PD security option.
func (*TLS) ToTiKVSecurityConfig ¶
ToTiKVSecurityConfig converts the TLS configuration to a TiKV security config. TODO: TiKV does not support passing in the certificate content directly.