Documentation
¶
Index ¶
- Constants
- func BuildObserveDataRanges(storage kv.Storage, filterStr []string, tableFilter filter.Filter, ...) ([]kv.KeyRange, error)
- func BuildObserveMetaRange() *kv.KeyRange
- func DecodeKVEntry(buff []byte) (k, v []byte, length uint32, err error)
- func EncodeKVEntry(k, v []byte) []byte
- func ExtractValue(e *kv.Entry, cf string) ([]byte, error)
- func FastUnmarshalMetaData(ctx context.Context, s storage.ExternalStorage, metaDataWorkerPoolSize uint, ...) error
- func FromDBMapProto(dbMaps []*backuppb.PitrDBMap) map[UpstreamID]*DBReplace
- func GetStreamBackupGlobalCheckpointPrefix() string
- func GetStreamBackupMetaPrefix() string
- func GetTSFromFile(ctx context.Context, s storage.ExternalStorage, filename string) (uint64, error)
- func LoadIngestedSSTs(ctx context.Context, s storage.ExternalStorage, paths []string) iter.TryNextor[IngestedSSTsGroup]
- func LogDBReplaceMap(title string, dbReplaces map[UpstreamID]*DBReplace)
- func MTMaybeSkipTruncateLog(cond bool) migrateToOpt
- func MTSkipTruncateLog(o *migToOpt)
- func MaybeQPS(ctx context.Context, mgr PDInfoProvider, client *http.Client) (float64, error)
- func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration
- func NewMigration() *pb.Migration
- func ParseDBIDFromTableKey(key []byte) (int64, error)
- func ReplaceMetadata(meta *pb.Metadata, filegroups []*pb.DataFileGroup)
- func SetTSToFile(ctx context.Context, s storage.ExternalStorage, safepoint uint64, ...) error
- func UpdateShiftTS(m *pb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool)
- type Comparator
- type ContentRef
- type DBReplace
- type DelRangeParams
- type DownstreamID
- type EventIterator
- type FileGroupInfo
- type Hooks
- type IngestedSSTsGroup
- type Iterator
- type LoadOptions
- type LogBackupTableHistoryManager
- func (info *LogBackupTableHistoryManager) AddPartitionHistory(partitionID int64, tableName string, dbID int64, parentTableID int64)
- func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64)
- func (info *LogBackupTableHistoryManager) GetDBNameByID(dbId int64) (string, bool)
- func (info *LogBackupTableHistoryManager) GetNewlyCreatedDBHistory() map[int64]string
- func (info *LogBackupTableHistoryManager) GetTableHistory() map[int64][2]TableLocationInfo
- func (info *LogBackupTableHistoryManager) RecordDBIdToName(dbId int64, dbName string)
- type MergeAndMigrateToOpt
- type MergeAndMigratedTo
- type MetadataHelper
- func (m *MetadataHelper) Close()
- func (m *MetadataHelper) InitCacheEntry(path string, ref int)
- func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error)
- func (*MetadataHelper) ParseToMetadata(rawMetaData []byte) (*backuppb.Metadata, error)
- func (*MetadataHelper) ParseToMetadataHard(rawMetaData []byte) (*backuppb.Metadata, error)
- func (m *MetadataHelper) ReadFile(ctx context.Context, path string, offset uint64, length uint64, ...) ([]byte, error)
- type MetadataHelperOption
- type MetadataInfo
- type MigratedTo
- type MigrationExt
- func (m MigrationExt) AddMigrationToTable(ctx context.Context, mig *pb.Migration, table *glue.Table)
- func (m MigrationExt) AppendMigration(ctx context.Context, mig *pb.Migration) (int, error)
- func (m MigrationExt) DryRun(f func(MigrationExt)) []storage.Effect
- func (m *MigrationExt) GetReadLock(ctx context.Context, hint string) (storage.RemoteLock, error)
- func (m MigrationExt) Load(ctx context.Context, opts ...LoadOptions) (Migrations, error)
- func (m MigrationExt) MergeAndMigrateTo(ctx context.Context, targetSpec int, opts ...MergeAndMigrateToOpt) (result MergeAndMigratedTo)
- type Migrations
- type NoHooks
- func (NoHooks) DeletedAFileForTruncating(count int)
- func (NoHooks) DeletedAllFilesForTruncating()
- func (NoHooks) EndLoadingMetaForTruncating()
- func (NoHooks) HandingMetaEditDone()
- func (NoHooks) HandledAMetaEdit(*pb.MetaEdit)
- func (NoHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
- func (NoHooks) StartHandlingMetaEdits([]*pb.MetaEdit)
- func (NoHooks) StartLoadingMetaForTruncating()
- type OrderedMigration
- type PDInfoProvider
- type PathedIngestedSSTs
- type PreDelRangeQuery
- type ProgressBarHooks
- func (p *ProgressBarHooks) DeletedAFileForTruncating(count int)
- func (p *ProgressBarHooks) DeletedAllFilesForTruncating()
- func (p *ProgressBarHooks) EndLoadingMetaForTruncating()
- func (p *ProgressBarHooks) HandingMetaEditDone()
- func (p *ProgressBarHooks) HandledAMetaEdit(edit *pb.MetaEdit)
- func (p *ProgressBarHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
- func (p *ProgressBarHooks) StartHandlingMetaEdits(edits []*pb.MetaEdit)
- func (p *ProgressBarHooks) StartLoadingMetaForTruncating()
- type RawMetaKey
- type RawWriteCFValue
- func (v *RawWriteCFValue) EncodeTo() []byte
- func (v *RawWriteCFValue) GetShortValue() []byte
- func (v *RawWriteCFValue) GetStartTs() uint64
- func (v *RawWriteCFValue) GetWriteType() byte
- func (v *RawWriteCFValue) HasShortValue() bool
- func (v *RawWriteCFValue) IsDelete() bool
- func (v *RawWriteCFValue) IsRollback() bool
- func (v *RawWriteCFValue) ParseFrom(data []byte) error
- func (v *RawWriteCFValue) UpdateShortValue(value []byte)
- type SchemasReplace
- type StatusController
- type StreamBackupSearch
- type StreamKVInfo
- type StreamMetadataSet
- func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))
- func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storage.ExternalStorage) error
- func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s storage.ExternalStorage, until uint64) (uint64, error)
- func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context.Context, from uint64, st storage.ExternalStorage, ...) ([]string, error)
- func (ms *StreamMetadataSet) TEST_GetMetadataInfos() map[string]*MetadataInfo
- type TableLocationInfo
- type TableMappingManager
- func (tm *TableMappingManager) ApplyFilterToDBReplaceMap(tracker *utils.PiTRIdTracker)
- func (tm *TableMappingManager) FromDBReplaceMap(dbReplaceMap map[UpstreamID]*DBReplace) error
- func (tm *TableMappingManager) IsEmpty() bool
- func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBReplace)
- func (tm *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf string) error
- func (tm *TableMappingManager) ReplaceTemporaryIDs(ctx context.Context, ...) error
- func (tm *TableMappingManager) ToProto() []*backuppb.PitrDBMap
- type TableReplace
- type TaskPrinter
- type TaskStatus
- type UpstreamID
- type WriteType
Constants ¶
const InitialTempId int64 = 0
const (
SupportedMigVersion = pb.MigrationVersion_M2
)
const (
// TruncateSafePointFileName is the filename that the ts(the log have been truncated) is saved into.
TruncateSafePointFileName = "v1_stream_trancate_safepoint.txt"
)
const WildCard = "*"
Variables ¶
This section is empty.
Functions ¶
func BuildObserveDataRanges ¶
func BuildObserveDataRanges( storage kv.Storage, filterStr []string, tableFilter filter.Filter, backupTS uint64, ) ([]kv.KeyRange, error)
BuildObserveDataRanges builds key ranges to observe data KV.
func BuildObserveMetaRange ¶
BuildObserveMetaRange builds the key range to observe meta KV (contains all of metas).
func DecodeKVEntry ¶
DecodeKVEntry decodes kv-entry from buff. If error returned is nil, it can get key/value and the length occupied in buff.
func EncodeKVEntry ¶
EncodeKVEntry gets an entry-event from input: k, v.
func FastUnmarshalMetaData ¶
func FastUnmarshalMetaData( ctx context.Context, s storage.ExternalStorage, metaDataWorkerPoolSize uint, fn func(path string, rawMetaData []byte) error, ) error
FastUnmarshalMetaData uses a worker pool (sized by metaDataWorkerPoolSize) to speed up reading metadata content from external storage.
func FromDBMapProto ¶
func FromDBMapProto(dbMaps []*backuppb.PitrDBMap) map[UpstreamID]*DBReplace
func GetStreamBackupGlobalCheckpointPrefix ¶
func GetStreamBackupGlobalCheckpointPrefix() string
func GetStreamBackupMetaPrefix ¶
func GetStreamBackupMetaPrefix() string
func GetTSFromFile ¶
func GetTSFromFile( ctx context.Context, s storage.ExternalStorage, filename string, ) (uint64, error)
GetTSFromFile gets the current truncate safepoint. truncate safepoint is the TS used for last truncating: which means logs before this TS would probably be deleted or incomplete.
func LoadIngestedSSTs ¶
func LoadIngestedSSTs( ctx context.Context, s storage.ExternalStorage, paths []string, ) iter.TryNextor[IngestedSSTsGroup]
func LogDBReplaceMap ¶
func LogDBReplaceMap(title string, dbReplaces map[UpstreamID]*DBReplace)
func MTMaybeSkipTruncateLog ¶
func MTMaybeSkipTruncateLog(cond bool) migrateToOpt
func MTSkipTruncateLog ¶
func MTSkipTruncateLog(o *migToOpt)
func MaybeQPS ¶
MaybeQPS gets a number like the QPS of the last seconds for each store via the prometheus interface. TODO: this is a temporary solution (aha, like in a Hackathon),
we MUST find a better way of providing this information.
func MergeMigrations ¶
MergeMigrations merges two migrations. The merged migration contains all operations from the two arguments.
func NewMigration ¶
func ParseDBIDFromTableKey ¶
func ReplaceMetadata ¶
func ReplaceMetadata(meta *pb.Metadata, filegroups []*pb.DataFileGroup)
ReplaceMetadata replaces the filegroups and updates the ts of the replaced metadata.
func SetTSToFile ¶
func SetTSToFile( ctx context.Context, s storage.ExternalStorage, safepoint uint64, filename string, ) error
SetTSToFile overrides the current truncate safepoint. truncate safepoint is the TS used for last truncating: which means logs before this TS would probably be deleted or incomplete.
Types ¶
type Comparator ¶
Comparator is used for comparing the relationship of src and dst
func NewStartWithComparator ¶
func NewStartWithComparator() Comparator
NewStartWithComparator creates a comparator to compare whether src starts with dst.
type ContentRef ¶
type ContentRef struct {
// contains filtered or unexported fields
}
type DBReplace ¶
type DBReplace struct { Name string DbID DownstreamID TableMap map[UpstreamID]*TableReplace FilteredOut bool }
DBReplace specifies database information mapping from up-stream cluster to down-stream cluster.
func NewDBReplace ¶
func NewDBReplace(name string, newID DownstreamID) *DBReplace
NewDBReplace creates a DBReplace struct.
type DelRangeParams ¶
type DownstreamID ¶
type DownstreamID = int64
type EventIterator ¶
type EventIterator struct {
// contains filtered or unexported fields
}
EventIterator is used for reading kv-event from buffer by iterator.
func (*EventIterator) GetError ¶
func (ei *EventIterator) GetError() error
GetError gets the error in the iterator. It returns nil if nothing went wrong.
func (*EventIterator) Key ¶
func (ei *EventIterator) Key() []byte
Key gets the key in kv-event if valid() == true
func (*EventIterator) Next ¶
func (ei *EventIterator) Next()
Next specifies the next iterative element.
func (*EventIterator) Valid ¶
func (ei *EventIterator) Valid() bool
Valid checks whether the iterator is valid.
func (*EventIterator) Value ¶
func (ei *EventIterator) Value() []byte
Value gets the value in kv-event if valid() == true
type FileGroupInfo ¶
FileGroupInfo keeps this meta-information for statistics and filtering.
type Hooks ¶
type Hooks interface { StartLoadingMetaForTruncating() EndLoadingMetaForTruncating() StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64) DeletedAFileForTruncating(count int) DeletedAllFilesForTruncating() StartHandlingMetaEdits([]*pb.MetaEdit) HandledAMetaEdit(*pb.MetaEdit) HandingMetaEditDone() }
type IngestedSSTsGroup ¶
type IngestedSSTsGroup []PathedIngestedSSTs
func (IngestedSSTsGroup) GroupFinished ¶
func (ebs IngestedSSTsGroup) GroupFinished() bool
func (IngestedSSTsGroup) GroupTS ¶
func (ebs IngestedSSTsGroup) GroupTS() uint64
type Iterator ¶
Iterator specifies a read iterator Interface.
func NewEventIterator ¶
NewEventIterator creates an Iterator to read kv-event.
type LoadOptions ¶
type LoadOptions func(*loadConfig)
func MLNotFoundIsErr ¶
func MLNotFoundIsErr() LoadOptions
type LogBackupTableHistoryManager ¶
type LogBackupTableHistoryManager struct {
// contains filtered or unexported fields
}
func NewTableHistoryManager ¶
func NewTableHistoryManager() *LogBackupTableHistoryManager
func (*LogBackupTableHistoryManager) AddPartitionHistory ¶
func (info *LogBackupTableHistoryManager) AddPartitionHistory(partitionID int64, tableName string, dbID int64, parentTableID int64)
AddPartitionHistory adds or updates history for a partition
func (*LogBackupTableHistoryManager) AddTableHistory ¶
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64)
AddTableHistory adds or updates history for a regular table
func (*LogBackupTableHistoryManager) GetDBNameByID ¶
func (info *LogBackupTableHistoryManager) GetDBNameByID(dbId int64) (string, bool)
func (*LogBackupTableHistoryManager) GetNewlyCreatedDBHistory ¶
func (info *LogBackupTableHistoryManager) GetNewlyCreatedDBHistory() map[int64]string
func (*LogBackupTableHistoryManager) GetTableHistory ¶
func (info *LogBackupTableHistoryManager) GetTableHistory() map[int64][2]TableLocationInfo
GetTableHistory returns information about all tables that have been renamed. Returns a map of table IDs to their original and current locations
func (*LogBackupTableHistoryManager) RecordDBIdToName ¶
func (info *LogBackupTableHistoryManager) RecordDBIdToName(dbId int64, dbName string)
type MergeAndMigrateToOpt ¶
type MergeAndMigrateToOpt func(*mergeAndMigrateToConfig)
func MMOptAlwaysRunTruncate ¶
func MMOptAlwaysRunTruncate() MergeAndMigrateToOpt
MMOptAlwaysRunTruncate forces the merge and migrate to always run the truncating. If not set, when the `truncated-to` wasn't modified, truncating will be skipped. This is necessary because truncating, even a no-op, requires a full scan over metadatas for now. This will be useful for retrying failed truncations.
func MMOptAppendPhantomMigration ¶
func MMOptAppendPhantomMigration(migs ...pb.Migration) MergeAndMigrateToOpt
MMOptAppendPhantomMigration appends a phantom migration to the merge and migrate to operation. The phantom migration will be persisted to BASE during executing. We call it a "phantom" because it wasn't persisted. When the target version isn't the latest version, the order of migrating may be broken. Carefully use this in that case.
func MMOptInteractiveCheck ¶
func MMOptSkipLockingInTest ¶
func MMOptSkipLockingInTest() MergeAndMigrateToOpt
type MergeAndMigratedTo ¶
type MergeAndMigratedTo struct { MigratedTo // The BASE migration of this "migrate to" operation. Base *pb.Migration // The migrations have been merged to the BASE migration. Source []*OrderedMigration }
MergeAndMigratedTo is the result of a call to `MergeAndMigrateTo`.
type MetadataHelper ¶
type MetadataHelper struct {
// contains filtered or unexported fields
}
MetadataHelper makes restore/truncate compatible with metadataV1 and metadataV2.
func NewMetadataHelper ¶
func NewMetadataHelper(opts ...MetadataHelperOption) *MetadataHelper
func (*MetadataHelper) Close ¶
func (m *MetadataHelper) Close()
func (*MetadataHelper) InitCacheEntry ¶
func (m *MetadataHelper) InitCacheEntry(path string, ref int)
func (*MetadataHelper) Marshal ¶
func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error)
For truncate command. Marshal metadata to reupload to external storage. The metadata must be unmarshaled by `ParseToMetadataHard`.
func (*MetadataHelper) ParseToMetadata ¶
func (*MetadataHelper) ParseToMetadata(rawMetaData []byte) (*backuppb.Metadata, error)
func (*MetadataHelper) ParseToMetadataHard ¶
func (*MetadataHelper) ParseToMetadataHard(rawMetaData []byte) (*backuppb.Metadata, error)
Only for deleting, after MetadataV1 is deprecated, we can remove it. Hard means convert to MetaDataV2 deeply.
func (*MetadataHelper) ReadFile ¶
func (m *MetadataHelper) ReadFile( ctx context.Context, path string, offset uint64, length uint64, compressionType backuppb.CompressionType, storage storage.ExternalStorage, encryptionInfo *encryptionpb.FileEncryptionInfo, ) ([]byte, error)
type MetadataHelperOption ¶
type MetadataHelperOption func(*MetadataHelper)
func WithEncryptionManager ¶
func WithEncryptionManager(manager *encryption.Manager) MetadataHelperOption
type MetadataInfo ¶
type MetadataInfo struct { MinTS uint64 FileGroupInfos []*FileGroupInfo }
MetadataInfo keeps this meta-information for statistics and filtering.
type MigratedTo ¶
type MigratedTo struct { // Non-fatal errors happen during executing the migration. Warnings []error // The new BASE migration after the operation. NewBase *pb.Migration }
MigratedTo is the result of trying to "migrate to" a migration.
The term "migrate to" means: try to perform all possible operations from a migration to the storage.
func (*MigratedTo) Warn ¶
func (m *MigratedTo) Warn(err error)
type MigrationExt ¶
type MigrationExt struct { // The hooks used for tracking the execution. // See the `Hooks` type for more details. Hooks Hooks // contains filtered or unexported fields }
MigrationExt is an extension to the `ExternalStorage` type. This adds some support methods for the "migration" system of log backup.
Migrations are idempotent batch modifications (adding a compaction, deleting a file, etc.) to the backup files. You may check the protocol buffer message `Migration` for more details. Idempotence is important for migrations, as they may be executed multiple times due to retrying or racing.
The encoded migrations will be put in a folder in the external storage, they are ordered by a series number.
Not all migrations can be applied to the storage then removed from the migration. Small "additions" will be inlined into the migration, for example, a `Compaction`. Small "deletions" will also be delayed, for example, deleting a span in a file. Such operations will be saved to a special migration, the first migration, named "BASE".
A simple list of migrations (loaded by `Load()`):
base = [ compaction, deleteSpan, ... ],
layers = { { sn = 1, content = [ compaction, ... ] }, { sn = 2, content = [ compaction, deleteFiles, ... ] },
func MigrationExtension ¶
func MigrationExtension(s storage.ExternalStorage) MigrationExt
MigrationExtension installs the extension methods to an `ExternalStorage`.
func (MigrationExt) AddMigrationToTable ¶
func (MigrationExt) AppendMigration ¶
func (MigrationExt) DryRun ¶
func (m MigrationExt) DryRun(f func(MigrationExt)) []storage.Effect
func (*MigrationExt) GetReadLock ¶
func (m *MigrationExt) GetReadLock(ctx context.Context, hint string) (storage.RemoteLock, error)
GetReadLock locks the storage and makes sure that no one else modifies this backup.
func (MigrationExt) Load ¶
func (m MigrationExt) Load(ctx context.Context, opts ...LoadOptions) (Migrations, error)
Load loads the current living migrations from the storage.
func (MigrationExt) MergeAndMigrateTo ¶
func (m MigrationExt) MergeAndMigrateTo( ctx context.Context, targetSpec int, opts ...MergeAndMigrateToOpt, ) (result MergeAndMigratedTo)
MergeAndMigrateTo will merge the migrations from BASE until the specified SN, then migrate to it. Finally it writes the new BASE and removes stale migrations from the storage.
type Migrations ¶
type Migrations struct { // The BASE migration. Base *pb.Migration `json:"base"` // Appended migrations. // They are sorted by their sequence numbers. Layers []*OrderedMigration `json:"layers"` }
Migrations represents living migrations from the storage.
func (Migrations) ListAll ¶
func (migs Migrations) ListAll() []*pb.Migration
ListAll returns a slice of all migrations in protobuf format. This includes the base migration and any additional layers.
type NoHooks ¶
type NoHooks struct{}
NoHooks is used for non-interactive scenarios.
func (NoHooks) DeletedAFileForTruncating ¶
func (NoHooks) DeletedAllFilesForTruncating ¶
func (NoHooks) DeletedAllFilesForTruncating()
func (NoHooks) EndLoadingMetaForTruncating ¶
func (NoHooks) EndLoadingMetaForTruncating()
func (NoHooks) HandingMetaEditDone ¶
func (NoHooks) HandingMetaEditDone()
func (NoHooks) HandledAMetaEdit ¶
func (NoHooks) StartDeletingForTruncating ¶
func (NoHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
func (NoHooks) StartHandlingMetaEdits ¶
func (NoHooks) StartLoadingMetaForTruncating ¶
func (NoHooks) StartLoadingMetaForTruncating()
type OrderedMigration ¶
type OrderedMigration struct { SeqNum int `json:"seq_num"` Path string `json:"path"` Content pb.Migration `json:"content"` }
OrderedMigration is a migration with its path and sequence number.
type PDInfoProvider ¶
type PathedIngestedSSTs ¶
type PathedIngestedSSTs struct { *pb.IngestedSSTs // contains filtered or unexported fields }
type PreDelRangeQuery ¶
type PreDelRangeQuery struct { Sql string ParamsList []DelRangeParams }
type ProgressBarHooks ¶
type ProgressBarHooks struct {
// contains filtered or unexported fields
}
func NewProgressBarHooks ¶
func NewProgressBarHooks(console glue.ConsoleOperations) *ProgressBarHooks
func (*ProgressBarHooks) DeletedAFileForTruncating ¶
func (p *ProgressBarHooks) DeletedAFileForTruncating(count int)
func (*ProgressBarHooks) DeletedAllFilesForTruncating ¶
func (p *ProgressBarHooks) DeletedAllFilesForTruncating()
func (*ProgressBarHooks) EndLoadingMetaForTruncating ¶
func (p *ProgressBarHooks) EndLoadingMetaForTruncating()
func (*ProgressBarHooks) HandingMetaEditDone ¶
func (p *ProgressBarHooks) HandingMetaEditDone()
func (*ProgressBarHooks) HandledAMetaEdit ¶
func (p *ProgressBarHooks) HandledAMetaEdit(edit *pb.MetaEdit)
func (*ProgressBarHooks) StartDeletingForTruncating ¶
func (p *ProgressBarHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
func (*ProgressBarHooks) StartHandlingMetaEdits ¶
func (p *ProgressBarHooks) StartHandlingMetaEdits(edits []*pb.MetaEdit)
func (*ProgressBarHooks) StartLoadingMetaForTruncating ¶
func (p *ProgressBarHooks) StartLoadingMetaForTruncating()
type RawMetaKey ¶
RawMetaKey specifies a transaction meta key.
func ParseTxnMetaKeyFrom ¶
func ParseTxnMetaKeyFrom(txnKey kv.Key) (*RawMetaKey, error)
ParseTxnMetaKeyFrom gets a `RawMetaKey` struct by parsing transaction meta key.
func (*RawMetaKey) EncodeMetaKey ¶
func (k *RawMetaKey) EncodeMetaKey() kv.Key
EncodeMetaKey encodes RawMetaKey into a transaction key.
func (*RawMetaKey) UpdateField ¶
func (k *RawMetaKey) UpdateField(field []byte)
UpdateField updates `Field` field in `RawMetaKey` struct.
func (*RawMetaKey) UpdateKey ¶
func (k *RawMetaKey) UpdateKey(key []byte)
UpdateKey updates `key` field in `RawMetaKey` struct.
func (*RawMetaKey) UpdateTS ¶
func (k *RawMetaKey) UpdateTS(ts uint64)
UpdateTS updates `Ts` field in `RawMetaKey` struct.
type RawWriteCFValue ¶
type RawWriteCFValue struct {
// contains filtered or unexported fields
}
RawWriteCFValue represents the value in write columnFamily. For details, see: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70
func (*RawWriteCFValue) EncodeTo ¶
func (v *RawWriteCFValue) EncodeTo() []byte
EncodeTo encodes the RawWriteCFValue to get encoded value.
func (*RawWriteCFValue) GetShortValue ¶
func (v *RawWriteCFValue) GetShortValue() []byte
GetShortValue gets the shortValue field.
func (*RawWriteCFValue) GetStartTs ¶
func (v *RawWriteCFValue) GetStartTs() uint64
func (*RawWriteCFValue) GetWriteType ¶
func (v *RawWriteCFValue) GetWriteType() byte
func (*RawWriteCFValue) HasShortValue ¶
func (v *RawWriteCFValue) HasShortValue() bool
HasShortValue checks whether short value is stored in write cf.
func (*RawWriteCFValue) IsDelete ¶
func (v *RawWriteCFValue) IsDelete() bool
IsDelete checks whether the value in cf is a `delete` record.
func (*RawWriteCFValue) IsRollback ¶
func (v *RawWriteCFValue) IsRollback() bool
IsRollback checks whether the value in cf is a `rollback` record.
func (*RawWriteCFValue) ParseFrom ¶
func (v *RawWriteCFValue) ParseFrom(data []byte) error
ParseFrom decodes the value to get the struct `RawWriteCFValue`.
func (*RawWriteCFValue) UpdateShortValue ¶
func (v *RawWriteCFValue) UpdateShortValue(value []byte)
UpdateShortValue updates the shortValue field.
type SchemasReplace ¶
type SchemasReplace struct { DbReplaceMap map[UpstreamID]*DBReplace TiflashRecorder *tiflashrec.TiFlashRecorder RewriteTS uint64 // used to rewrite commit ts in meta kv. AfterTableRewrittenFn func(deleted bool, tableInfo *model.TableInfo) // contains filtered or unexported fields }
SchemasReplace specifies schemas information mapping from up-stream cluster to down-stream cluster.
func NewSchemasReplace ¶
func NewSchemasReplace( dbReplaceMap map[UpstreamID]*DBReplace, tiflashRecorder *tiflashrec.TiFlashRecorder, restoreTS uint64, recordDeleteRange func(*PreDelRangeQuery), ) *SchemasReplace
NewSchemasReplace creates a SchemasReplace struct.
func (*SchemasReplace) GetIngestRecorder ¶
func (sr *SchemasReplace) GetIngestRecorder() *ingestrec.IngestRecorder
func (*SchemasReplace) RewriteMetaKvEntry ¶
RewriteMetaKvEntry is used to rewrite tableID/dbID in entry.key and entry.value.
type StatusController ¶
type StatusController struct {
// contains filtered or unexported fields
}
StatusController is the controller type (or context type) for the command `stream status`.
func NewStatusController ¶
func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrinter) *StatusController
NewStatusController makes a status controller via some resource accessors.
func (*StatusController) Close ¶
func (ctl *StatusController) Close() error
func (*StatusController) PrintStatusOfTask ¶
func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string) error
PrintStatusOfTask prints the status of tasks with the name. When the name is *, print all tasks.
type StreamBackupSearch ¶
type StreamBackupSearch struct {
// contains filtered or unexported fields
}
StreamBackupSearch is used for searching key from log data files
func NewStreamBackupSearch ¶
func NewStreamBackupSearch( storage storage.ExternalStorage, comparator Comparator, searchKey []byte, ) *StreamBackupSearch
NewStreamBackupSearch creates an instance of StreamBackupSearch
func (*StreamBackupSearch) Search ¶
func (s *StreamBackupSearch) Search(ctx context.Context) ([]*StreamKVInfo, error)
Search kv entries from log data files
func (*StreamBackupSearch) SetEndTs ¶
func (s *StreamBackupSearch) SetEndTs(endTs uint64)
SetEndTs sets the end timestamp to search to.
func (*StreamBackupSearch) SetStartTS ¶
func (s *StreamBackupSearch) SetStartTS(startTs uint64)
SetStartTS sets the start timestamp to search from.
type StreamKVInfo ¶
type StreamKVInfo struct { Key string `json:"key"` EncodedKey string `json:"-"` WriteType byte `json:"write-type"` StartTs uint64 `json:"start-ts"` CommitTs uint64 `json:"commit-ts"` CFName string `json:"cf-name"` Value string `json:"value,omitempty"` ShortValue string `json:"short-value,omitempty"` }
StreamKVInfo stores kv info searched from log data files
type StreamMetadataSet ¶
type StreamMetadataSet struct { // if set true, the metadata and datafile won't be removed DryRun bool MetadataDownloadBatchSize uint // a parser of metadata Helper *MetadataHelper // for test BeforeDoWriteBack func(path string, replaced *pb.Metadata) (skip bool) // contains filtered or unexported fields }
func (*StreamMetadataSet) IterateFilesFullyBefore ¶
func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))
IterateFilesFullyBefore runs the function over all files that contain data before the timestamp only.

	0                                          before
	|------------------------------------------|
	 |-file1---------------| <- File contains records in this TS range would be found.
	                               |-file2--------------| <- File contains any record out of this won't be found.

This function would call the `f` over file1 only.
func (*StreamMetadataSet) LoadFrom ¶
func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storage.ExternalStorage) error
LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)
func (*StreamMetadataSet) LoadUntilAndCalculateShiftTS ¶
func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS( ctx context.Context, s storage.ExternalStorage, until uint64, ) (uint64, error)
LoadUntilAndCalculateShiftTS loads the metadata until the specified timestamp and calculates the shift-until-ts along the way. This would record all metadata files that *may* contain data from transactions committed before that TS.
func (*StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch ¶
func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch( ctx context.Context, from uint64, st storage.ExternalStorage, updateFn func(num int64), ) ([]string, error)
RemoveDataFilesAndUpdateMetadataInBatch concurrently removes datafilegroups and updates metadata. Only one metadata is processed in each thread, including deleting its datafilegroup and updating it. Returns the datafilegroups that were not deleted.
func (*StreamMetadataSet) TEST_GetMetadataInfos ¶
func (ms *StreamMetadataSet) TEST_GetMetadataInfos() map[string]*MetadataInfo
type TableLocationInfo ¶
type TableLocationInfo struct { DbID int64 TableName string IsPartition bool ParentTableID int64 // only meaningful when IsPartition is true }
TableLocationInfo stores the table name, db id, and parent table id if is a partition
type TableMappingManager ¶
type TableMappingManager struct { DBReplaceMap map[UpstreamID]*DBReplace // contains filtered or unexported fields }
TableMappingManager processes each log backup meta kv and generate new id for DB, table and partition for downstream cluster. It maintains the id mapping and passes down later to the rewrite logic.
The usage in the code base is listed below:
1. During PiTR, it runs before snapshot restore to collect table id mapping information. For each id to map it generates a dummy downstream id first; this is because we can only generate global ids after running snapshot restore.
2. At the log restore phase, it merges the db replace map generated from the full backup or a previous PiTR task; it will replace some dummy ids at this step.
3. It runs a filter to filter out tables that we don't need.
4. After all the above steps, it uses the genGenGlobalIDs method to generate a batch of ids in one call and replace the dummy ids; it builds the final state of the db replace map.
func NewTableMappingManager ¶
func NewTableMappingManager() *TableMappingManager
func (*TableMappingManager) ApplyFilterToDBReplaceMap ¶
func (tm *TableMappingManager) ApplyFilterToDBReplaceMap(tracker *utils.PiTRIdTracker)
func (*TableMappingManager) FromDBReplaceMap ¶
func (tm *TableMappingManager) FromDBReplaceMap(dbReplaceMap map[UpstreamID]*DBReplace) error
func (*TableMappingManager) IsEmpty ¶
func (tm *TableMappingManager) IsEmpty() bool
func (*TableMappingManager) MergeBaseDBReplace ¶
func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBReplace)
func (*TableMappingManager) ParseMetaKvAndUpdateIdMapping ¶
func (tm *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf string) error
ParseMetaKvAndUpdateIdMapping collects table information. The keys and values that are selected to parse here follow the implementation in rewrite_meta_rawkv. Maybe parsing a subset of these keys/values would suffice, but to make it safe we decide to parse exactly the same as in rewrite_meta_rawkv.
func (*TableMappingManager) ReplaceTemporaryIDs ¶
func (*TableMappingManager) ToProto ¶
func (tm *TableMappingManager) ToProto() []*backuppb.PitrDBMap
ToProto produces schemas id maps from up-stream to down-stream.
type TableReplace ¶
type TableReplace struct { Name string TableID DownstreamID PartitionMap map[UpstreamID]DownstreamID IndexMap map[UpstreamID]DownstreamID FilteredOut bool }
TableReplace specifies table information mapping from up-stream cluster to down-stream cluster.
func NewTableReplace ¶
func NewTableReplace(name string, newID DownstreamID) *TableReplace
NewTableReplace creates a TableReplace struct.
type TaskPrinter ¶
type TaskPrinter interface { AddTask(t TaskStatus) PrintTasks() }
func PrintTaskByTable ¶
func PrintTaskByTable(c glue.ConsoleOperations) TaskPrinter
PrintTaskByTable makes a TaskPrinter, which prints the task by the `Table` implementation of the console.
func PrintTaskWithJSON ¶
func PrintTaskWithJSON(c glue.ConsoleOperations) TaskPrinter
PrintTaskWithJSON makes a TaskPrinter, which prints tasks as json to the console directly.
type TaskStatus ¶
type TaskStatus struct { // Info is the stream task information. Info backuppb.StreamBackupTaskInfo // Checkpoints collects the checkpoints. Checkpoints []Checkpoint // Total QPS of the task in recent seconds. QPS float64 // Last error reported by the store. LastErrors map[uint64]backuppb.StreamBackupError // contains filtered or unexported fields }
func (TaskStatus) GetMinStoreCheckpoint ¶
func (t TaskStatus) GetMinStoreCheckpoint() Checkpoint
GetMinStoreCheckpoint calculates the checkpoint of the task.
type UpstreamID ¶
type UpstreamID = int64