Documentation ¶
Index ¶
- Constants
- func AddMigrationToTable(m *pb.Migration, table *glue.Table)
- func BuildObserveDataRanges(storage kv.Storage, filterStr []string, tableFilter filter.Filter, ...) ([]kv.KeyRange, error)
- func BuildObserveMetaRange() *kv.KeyRange
- func DecodeKVEntry(buff []byte) (k, v []byte, length uint32, err error)
- func EncodeKVEntry(k, v []byte) []byte
- func FastUnmarshalMetaData(ctx context.Context, s storage.ExternalStorage, metaDataWorkerPoolSize uint, ...) error
- func FormatDate(ts time.Time) string
- func FromDBMapProto(dbMaps []*backuppb.PitrDBMap) map[UpstreamID]*DBReplace
- func GetStreamBackupGlobalCheckpointPrefix() string
- func GetStreamBackupMetaPrefix() string
- func GetTSFromFile(ctx context.Context, s storage.ExternalStorage, filename string) (uint64, error)
- func IsMetaDBKey(key []byte) bool
- func IsMetaDDLJobHistoryKey(key []byte) bool
- func MTSkipTruncateLog(o *migToOpt)
- func MaybeDBOrDDLJobHistoryKey(key []byte) bool
- func MaybeQPS(ctx context.Context, mgr PDInfoProvider, client *http.Client) (float64, error)
- func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration
- func NewMigration() *pb.Migration
- func ParseDBIDFromTableKey(key []byte) (int64, error)
- func ReplaceMetadata(meta *pb.Metadata, filegroups []*pb.DataFileGroup)
- func SetTSToFile(ctx context.Context, s storage.ExternalStorage, safepoint uint64, ...) error
- func UpdateShiftTS(m *pb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool)
- type Comparator
- type ContentRef
- type DBReplace
- type DelRangeParams
- type DownstreamID
- type EventIterator
- type FileGroupInfo
- type Hooks
- type Iterator
- type MergeAndMigrateToOpt
- type MergeAndMigratedTo
- type MetadataHelper
- func (m *MetadataHelper) InitCacheEntry(path string, ref int)
- func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error)
- func (*MetadataHelper) ParseToMetadata(rawMetaData []byte) (*backuppb.Metadata, error)
- func (*MetadataHelper) ParseToMetadataHard(rawMetaData []byte) (*backuppb.Metadata, error)
- func (m *MetadataHelper) ReadFile(ctx context.Context, path string, offset uint64, length uint64, ...) ([]byte, error)
- type MetadataHelperOption
- type MetadataInfo
- type MigrateToOpt
- type MigratedTo
- type MigrationExt
- func (m MigrationExt) AppendMigration(ctx context.Context, mig *pb.Migration) (int, error)
- func (m MigrationExt) DryRun(f func(MigrationExt)) []storage.Effect
- func (m MigrationExt) Load(ctx context.Context) (Migrations, error)
- func (m MigrationExt) MergeAndMigrateTo(ctx context.Context, targetSpec int, opts ...MergeAndMigrateToOpt) (result MergeAndMigratedTo)
- func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ...MigrateToOpt) MigratedTo
- type Migrations
- type NoHooks
- func (NoHooks) DeletedAFileForTruncating(count int)
- func (NoHooks) DeletedAllFilesForTruncating()
- func (NoHooks) EndLoadingMetaForTruncating()
- func (NoHooks) HandingMetaEditDone()
- func (NoHooks) HandledAMetaEdit(*pb.MetaEdit)
- func (NoHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
- func (NoHooks) StartHandlingMetaEdits([]*pb.MetaEdit)
- func (NoHooks) StartLoadingMetaForTruncating()
- type OrderedMigration
- type PDInfoProvider
- type PreDelRangeQuery
- type ProgressBarHooks
- func (p *ProgressBarHooks) DeletedAFileForTruncating(count int)
- func (p *ProgressBarHooks) DeletedAllFilesForTruncating()
- func (p *ProgressBarHooks) EndLoadingMetaForTruncating()
- func (p *ProgressBarHooks) HandingMetaEditDone()
- func (p *ProgressBarHooks) HandledAMetaEdit(edit *pb.MetaEdit)
- func (p *ProgressBarHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
- func (p *ProgressBarHooks) StartHandlingMetaEdits(edits []*pb.MetaEdit)
- func (p *ProgressBarHooks) StartLoadingMetaForTruncating()
- type RawMetaKey
- type RawWriteCFValue
- func (v *RawWriteCFValue) EncodeTo() []byte
- func (v *RawWriteCFValue) GetShortValue() []byte
- func (v *RawWriteCFValue) GetStartTs() uint64
- func (v *RawWriteCFValue) GetWriteType() byte
- func (v *RawWriteCFValue) HasShortValue() bool
- func (v *RawWriteCFValue) IsDelete() bool
- func (v *RawWriteCFValue) IsRollback() bool
- func (v *RawWriteCFValue) ParseFrom(data []byte) error
- func (v *RawWriteCFValue) UpdateShortValue(value []byte)
- type SchemasReplace
- type StatusController
- type StreamBackupSearch
- type StreamKVInfo
- type StreamMetadataSet
- func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))
- func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storage.ExternalStorage) error
- func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s storage.ExternalStorage, until uint64) (uint64, error)
- func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context.Context, from uint64, st storage.ExternalStorage, ...) ([]string, error)
- func (ms *StreamMetadataSet) TEST_GetMetadataInfos() map[string]*MetadataInfo
- type TableMappingManager
- type TableReplace
- type TaskPrinter
- type TaskStatus
- type UpstreamID
- type WriteType
Constants ¶
const ( DefaultCF = "default" WriteCF = "write" )
Default columnFamily and write columnFamily
const DATE_FORMAT = "2006-01-02 15:04:05.999999999 -0700"
const (
SupportedMigVersion = pb.MigrationVersion_M1
)
const (
// TruncateSafePointFileName is the filename that the ts(the log have been truncated) is saved into.
TruncateSafePointFileName = "v1_stream_trancate_safepoint.txt"
)
const WildCard = "*"
Variables ¶
This section is empty.
Functions ¶
func BuildObserveDataRanges ¶
func BuildObserveDataRanges( storage kv.Storage, filterStr []string, tableFilter filter.Filter, backupTS uint64, ) ([]kv.KeyRange, error)
BuildObserveDataRanges builds key ranges to observe data KV.
func BuildObserveMetaRange ¶
BuildObserveMetaRange specifies build key ranges to observe meta KV(contains all of metas)
func DecodeKVEntry ¶
DecodeKVEntry decodes kv-entry from buff. If error returned is nil, it can get key/value and the length occupied in buff.
func EncodeKVEntry ¶
EncodeKVEntry gets a entry-event from input: k, v.
func FastUnmarshalMetaData ¶
func FastUnmarshalMetaData( ctx context.Context, s storage.ExternalStorage, metaDataWorkerPoolSize uint, fn func(path string, rawMetaData []byte) error, ) error
FastUnmarshalMetaData used a 128 worker pool to speed up read metadata content from external_storage.
func FormatDate ¶
func FromDBMapProto ¶
func FromDBMapProto(dbMaps []*backuppb.PitrDBMap) map[UpstreamID]*DBReplace
func GetStreamBackupGlobalCheckpointPrefix ¶
func GetStreamBackupGlobalCheckpointPrefix() string
func GetStreamBackupMetaPrefix ¶
func GetStreamBackupMetaPrefix() string
func GetTSFromFile ¶
func GetTSFromFile( ctx context.Context, s storage.ExternalStorage, filename string, ) (uint64, error)
GetTSFromFile gets the current truncate safepoint. truncate safepoint is the TS used for last truncating: which means logs before this TS would probably be deleted or incomplete.
func IsMetaDBKey ¶
func IsMetaDDLJobHistoryKey ¶
func MTSkipTruncateLog ¶
func MTSkipTruncateLog(o *migToOpt)
func MaybeQPS ¶
MaybeQPS get a number like the QPS of last seconds for each store via the prometheus interface. TODO: this is a temporary solution(aha, like in a Hackthon),
we MUST find a better way for providing this information.
func MergeMigrations ¶
Merge merges two migrations. The merged migration contains all operations from the two arguments.
func NewMigration ¶
func ParseDBIDFromTableKey ¶
func ReplaceMetadata ¶
func ReplaceMetadata(meta *pb.Metadata, filegroups []*pb.DataFileGroup)
replace the filegroups and update the ts of the replaced metadata
func SetTSToFile ¶
func SetTSToFile( ctx context.Context, s storage.ExternalStorage, safepoint uint64, filename string, ) error
SetTSToFile overrides the current truncate safepoint. truncate safepoint is the TS used for last truncating: which means logs before this TS would probably be deleted or incomplete.
Types ¶
type Comparator ¶
Comparator is used for comparing the relationship of src and dst
func NewStartWithComparator ¶
func NewStartWithComparator() Comparator
NewStartWithComparator create a comparator to compare whether src starts with dst
type ContentRef ¶
type ContentRef struct {
// contains filtered or unexported fields
}
type DBReplace ¶
type DBReplace struct { Name string DbID DownstreamID TableMap map[UpstreamID]*TableReplace }
DBReplace specifies database information mapping from up-stream cluster to up-stream cluster.
func NewDBReplace ¶
func NewDBReplace(name string, newID DownstreamID) *DBReplace
NewDBReplace creates a DBReplace struct.
type DelRangeParams ¶
type DownstreamID ¶
type DownstreamID = int64
type EventIterator ¶
type EventIterator struct {
// contains filtered or unexported fields
}
EventIterator is used for reading kv-event from buffer by iterator.
func (*EventIterator) GetError ¶
func (ei *EventIterator) GetError() error
GetError gets the error in iterator. it returns nil if it has nothing mistaken happened
func (*EventIterator) Key ¶
func (ei *EventIterator) Key() []byte
Key gets the key in kv-event if valid() == true
func (*EventIterator) Next ¶
func (ei *EventIterator) Next()
Next specifies the next iterative element.
func (*EventIterator) Valid ¶
func (ei *EventIterator) Valid() bool
Valid checks whether the iterator is valid.
func (*EventIterator) Value ¶
func (ei *EventIterator) Value() []byte
Value gets the value in kv-event if valid() == true
type FileGroupInfo ¶
keep these meta-information for statistics and filtering
type Hooks ¶
type Hooks interface { StartLoadingMetaForTruncating() EndLoadingMetaForTruncating() StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64) DeletedAFileForTruncating(count int) DeletedAllFilesForTruncating() StartHandlingMetaEdits([]*pb.MetaEdit) HandledAMetaEdit(*pb.MetaEdit) HandingMetaEditDone() }
type Iterator ¶
Iterator specifies a read iterator Interface.
func NewEventIterator ¶
NewEventIterator creates a Iterator to read kv-event.
type MergeAndMigrateToOpt ¶
type MergeAndMigrateToOpt func(*mergeAndMigrateToConfig)
func MMOptAlwaysRunTruncate ¶
func MMOptAlwaysRunTruncate() MergeAndMigrateToOpt
MMOptAlwaysRunTruncate forces the merge and migrate to always run the truncating. If not set, when the `truncated-to` wasn'd modified, truncating will be skipped. This is necessary because truncating, even a no-op, requires a full scan over metadatas for now. This will be useful for retrying failed truncations.
func MMOptAppendPhantomMigration ¶
func MMOptAppendPhantomMigration(migs ...pb.Migration) MergeAndMigrateToOpt
MMOptAppendPhantomMigration appends a phantom migration to the merge and migrate to operation. The phantom migration will be persised to BASE during executing. We call it a "phantom" because it wasn't persisted. When the target version isn't the latest version, the order of migrating may be broken. Carefully use this in that case.
func MMOptInteractiveCheck ¶
type MergeAndMigratedTo ¶
type MergeAndMigratedTo struct { MigratedTo // The BASE migration of this "migrate to" operation. Base *pb.Migration // The migrations have been merged to the BASE migration. Source []*OrderedMigration }
MergeAndMigratedTo is the result of a call to `MergeAndMigrateTo`.
type MetadataHelper ¶
type MetadataHelper struct {
// contains filtered or unexported fields
}
MetadataHelper make restore/truncate compatible with metadataV1 and metadataV2.
func NewMetadataHelper ¶
func NewMetadataHelper(opts ...MetadataHelperOption) *MetadataHelper
func (*MetadataHelper) InitCacheEntry ¶
func (m *MetadataHelper) InitCacheEntry(path string, ref int)
func (*MetadataHelper) Marshal ¶
func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error)
For truncate command. Marshal metadata to reupload to external storage. The metadata must be unmarshal by `ParseToMetadataHard`
func (*MetadataHelper) ParseToMetadata ¶
func (*MetadataHelper) ParseToMetadata(rawMetaData []byte) (*backuppb.Metadata, error)
func (*MetadataHelper) ParseToMetadataHard ¶
func (*MetadataHelper) ParseToMetadataHard(rawMetaData []byte) (*backuppb.Metadata, error)
Only for deleting, after MetadataV1 is deprecated, we can remove it. Hard means convert to MetaDataV2 deeply.
func (*MetadataHelper) ReadFile ¶
func (m *MetadataHelper) ReadFile( ctx context.Context, path string, offset uint64, length uint64, compressionType backuppb.CompressionType, storage storage.ExternalStorage, encryptionInfo *encryptionpb.FileEncryptionInfo, ) ([]byte, error)
type MetadataHelperOption ¶
type MetadataHelperOption func(*MetadataHelper)
func WithEncryptionManager ¶
func WithEncryptionManager(manager *encryption.Manager) MetadataHelperOption
type MetadataInfo ¶
type MetadataInfo struct { MinTS uint64 FileGroupInfos []*FileGroupInfo }
keep these meta-information for statistics and filtering
type MigrateToOpt ¶
type MigrateToOpt func(*migToOpt)
func MTMaybeSkipTruncateLog ¶
func MTMaybeSkipTruncateLog(cond bool) MigrateToOpt
type MigratedTo ¶
type MigratedTo struct { // Errors happen during executing the migration. Warnings []error // The new BASE migration after the operation. NewBase *pb.Migration }
MigratedTo is the result of trying to "migrate to" a migration.
The term "migrate to" means, try to performance all possible operations from a migration to the storage.
type MigrationExt ¶
type MigrationExt struct { // The hooks used for tracking the execution. // See the `Hooks` type for more details. Hooks Hooks // contains filtered or unexported fields }
MigrationExt is an extension to the `ExternalStorage` type. This added some support methods for the "migration" system of log backup.
Migrations are idempontent batch modifications (adding a compaction, delete a file, etc..) to the backup files. You may check the protocol buffer message `Migration` for more details. Idempontence is important for migrations, as they may be executed multi times due to retry or racing.
The encoded migrations will be put in a folder in the external storage, they are ordered by a series number.
Not all migrations can be applied to the storage then removed from the migration. Small "additions" will be inlined into the migration, for example, a `Compaction`. Small "deletions" will also be delayed, for example, deleting a span in a file. Such operations will be save to a special migration, the first migration, named "BASE".
A simple list of migrations (loaded by `Load()`):
base = [ compaction, deleteSpan, ... ],
layers = { { sn = 1, content = [ compaction, ... ] }, { sn = 2, content = [ compaction, deleteFiles, ... ] },
func MigerationExtension ¶
func MigerationExtension(s storage.ExternalStorage) MigrationExt
MigrateionExtnsion installs the extension methods to an `ExternalStorage`.
func (MigrationExt) AppendMigration ¶
func (MigrationExt) DryRun ¶
func (m MigrationExt) DryRun(f func(MigrationExt)) []storage.Effect
func (MigrationExt) Load ¶
func (m MigrationExt) Load(ctx context.Context) (Migrations, error)
Load loads the current living migrations from the storage.
func (MigrationExt) MergeAndMigrateTo ¶
func (m MigrationExt) MergeAndMigrateTo( ctx context.Context, targetSpec int, opts ...MergeAndMigrateToOpt, ) (result MergeAndMigratedTo)
MergeAndMigrateTo will merge the migrations from BASE until the specified SN, then migrate to it. Finally it writes the new BASE and remove stale migrations from the storage.
func (MigrationExt) MigrateTo ¶
func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ...MigrateToOpt) MigratedTo
MigrateTo migrates to a migration. If encountered some error during executing some operation, the operation will be put to the new BASE, which can be retryed then.
type Migrations ¶
type Migrations struct { // The BASE migration. Base *pb.Migration `json:"base"` // Appended migrations. // They are sorted by their sequence numbers. Layers []*OrderedMigration `json:"layers"` }
Migrations represents living migrations from the storage.
func (Migrations) ListAll ¶
func (migs Migrations) ListAll() []*pb.Migration
ListAll returns a slice of all migrations in protobuf format. This includes the base migration and any additional layers.
type NoHooks ¶
type NoHooks struct{}
NoHooks is used for non-interactive secnarios.
func (NoHooks) DeletedAFileForTruncating ¶
func (NoHooks) DeletedAllFilesForTruncating ¶
func (NoHooks) DeletedAllFilesForTruncating()
func (NoHooks) EndLoadingMetaForTruncating ¶
func (NoHooks) EndLoadingMetaForTruncating()
func (NoHooks) HandingMetaEditDone ¶
func (NoHooks) HandingMetaEditDone()
func (NoHooks) HandledAMetaEdit ¶
func (NoHooks) StartDeletingForTruncating ¶
func (NoHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
func (NoHooks) StartHandlingMetaEdits ¶
func (NoHooks) StartLoadingMetaForTruncating ¶
func (NoHooks) StartLoadingMetaForTruncating()
type OrderedMigration ¶
type OrderedMigration struct { SeqNum int `json:"seq_num"` Path string `json:"path"` Content pb.Migration `json:"content"` }
OrderedMigration is a migration with its path and sequence number.
type PDInfoProvider ¶
type PreDelRangeQuery ¶
type PreDelRangeQuery struct { Sql string ParamsList []DelRangeParams }
type ProgressBarHooks ¶
type ProgressBarHooks struct {
// contains filtered or unexported fields
}
func NewProgressBarHooks ¶
func NewProgressBarHooks(console glue.ConsoleOperations) *ProgressBarHooks
func (*ProgressBarHooks) DeletedAFileForTruncating ¶
func (p *ProgressBarHooks) DeletedAFileForTruncating(count int)
func (*ProgressBarHooks) DeletedAllFilesForTruncating ¶
func (p *ProgressBarHooks) DeletedAllFilesForTruncating()
func (*ProgressBarHooks) EndLoadingMetaForTruncating ¶
func (p *ProgressBarHooks) EndLoadingMetaForTruncating()
func (*ProgressBarHooks) HandingMetaEditDone ¶
func (p *ProgressBarHooks) HandingMetaEditDone()
func (*ProgressBarHooks) HandledAMetaEdit ¶
func (p *ProgressBarHooks) HandledAMetaEdit(edit *pb.MetaEdit)
func (*ProgressBarHooks) StartDeletingForTruncating ¶
func (p *ProgressBarHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
func (*ProgressBarHooks) StartHandlingMetaEdits ¶
func (p *ProgressBarHooks) StartHandlingMetaEdits(edits []*pb.MetaEdit)
func (*ProgressBarHooks) StartLoadingMetaForTruncating ¶
func (p *ProgressBarHooks) StartLoadingMetaForTruncating()
type RawMetaKey ¶
RawMetaKey specified a transaction meta key.
func ParseTxnMetaKeyFrom ¶
func ParseTxnMetaKeyFrom(txnKey kv.Key) (*RawMetaKey, error)
ParseTxnMetaKeyFrom gets a `RawMetaKey` struct by parsing transaction meta key.
func (*RawMetaKey) EncodeMetaKey ¶
func (k *RawMetaKey) EncodeMetaKey() kv.Key
EncodeMetaKey Encodes RawMetaKey into a transaction key
func (*RawMetaKey) UpdateField ¶
func (k *RawMetaKey) UpdateField(field []byte)
UpdateField updates `Field` field in `RawMetaKey` struct.
func (*RawMetaKey) UpdateKey ¶
func (k *RawMetaKey) UpdateKey(key []byte)
UpdateKey updates `key` field in `RawMetaKey` struct.
func (*RawMetaKey) UpdateTS ¶
func (k *RawMetaKey) UpdateTS(ts uint64)
UpdateTS updates `Ts` field in `RawMetaKey` struct.
type RawWriteCFValue ¶
type RawWriteCFValue struct {
// contains filtered or unexported fields
}
RawWriteCFValue represents the value in write columnFamily. Detail see line: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70
func (*RawWriteCFValue) EncodeTo ¶
func (v *RawWriteCFValue) EncodeTo() []byte
EncodeTo encodes the RawWriteCFValue to get encoded value.
func (*RawWriteCFValue) GetShortValue ¶
func (v *RawWriteCFValue) GetShortValue() []byte
UpdateShortValue gets the shortValue field.
func (*RawWriteCFValue) GetStartTs ¶
func (v *RawWriteCFValue) GetStartTs() uint64
func (*RawWriteCFValue) GetWriteType ¶
func (v *RawWriteCFValue) GetWriteType() byte
func (*RawWriteCFValue) HasShortValue ¶
func (v *RawWriteCFValue) HasShortValue() bool
HasShortValue checks whether short value is stored in write cf.
func (*RawWriteCFValue) IsDelete ¶
func (v *RawWriteCFValue) IsDelete() bool
IsRollback checks whether the value in cf is a `delete` record.
func (*RawWriteCFValue) IsRollback ¶
func (v *RawWriteCFValue) IsRollback() bool
IsRollback checks whether the value in cf is a `rollback` record.
func (*RawWriteCFValue) ParseFrom ¶
func (v *RawWriteCFValue) ParseFrom(data []byte) error
ParseFrom decodes the value to get the struct `RawWriteCFValue`.
func (*RawWriteCFValue) UpdateShortValue ¶
func (v *RawWriteCFValue) UpdateShortValue(value []byte)
UpdateShortValue updates the shortValue field.
type SchemasReplace ¶
type SchemasReplace struct { DbMap map[UpstreamID]*DBReplace TiflashRecorder *tiflashrec.TiFlashRecorder RewriteTS uint64 // used to rewrite commit ts in meta kv. TableFilter filter.Filter // used to filter schema/table AfterTableRewritten func(deleted bool, tableInfo *model.TableInfo) // contains filtered or unexported fields }
SchemasReplace specifies schemas information mapping from up-stream cluster to up-stream cluster.
func NewSchemasReplace ¶
func NewSchemasReplace( dbMap map[UpstreamID]*DBReplace, tiflashRecorder *tiflashrec.TiFlashRecorder, restoreTS uint64, tableFilter filter.Filter, recordDeleteRange func(*PreDelRangeQuery), ) *SchemasReplace
NewSchemasReplace creates a SchemasReplace struct.
func (*SchemasReplace) GetIngestRecorder ¶
func (sr *SchemasReplace) GetIngestRecorder() *ingestrec.IngestRecorder
func (*SchemasReplace) RewriteKvEntry ¶
RewriteKvEntry uses to rewrite tableID/dbID in entry.key and entry.value
type StatusController ¶
type StatusController struct {
// contains filtered or unexported fields
}
StatusController is the controller type (or context type) for the command `stream status`.
func NewStatusController ¶
func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrinter) *StatusController
NewStatusContorller make a status controller via some resource accessors.
func (*StatusController) Close ¶
func (ctl *StatusController) Close() error
func (*StatusController) PrintStatusOfTask ¶
func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string) error
PrintStatusOfTask prints the status of tasks with the name. When the name is *, print all tasks.
type StreamBackupSearch ¶
type StreamBackupSearch struct {
// contains filtered or unexported fields
}
StreamBackupSearch is used for searching key from log data files
func NewStreamBackupSearch ¶
func NewStreamBackupSearch( storage storage.ExternalStorage, comparator Comparator, searchKey []byte, ) *StreamBackupSearch
NewStreamBackupSearch creates an instance of StreamBackupSearch
func (*StreamBackupSearch) Search ¶
func (s *StreamBackupSearch) Search(ctx context.Context) ([]*StreamKVInfo, error)
Search kv entries from log data files
func (*StreamBackupSearch) SetEndTs ¶
func (s *StreamBackupSearch) SetEndTs(endTs uint64)
SetEndTs set end timestamp searched to
func (*StreamBackupSearch) SetStartTS ¶
func (s *StreamBackupSearch) SetStartTS(startTs uint64)
SetStartTS set start timestamp searched from
type StreamKVInfo ¶
type StreamKVInfo struct { Key string `json:"key"` EncodedKey string `json:"-"` WriteType byte `json:"write-type"` StartTs uint64 `json:"start-ts"` CommitTs uint64 `json:"commit-ts"` CFName string `json:"cf-name"` Value string `json:"value,omitempty"` ShortValue string `json:"short-value,omitempty"` }
StreamKVInfo stores kv info searched from log data files
type StreamMetadataSet ¶
type StreamMetadataSet struct { // if set true, the metadata and datafile won't be removed DryRun bool MetadataDownloadBatchSize uint // a parser of metadata Helper *MetadataHelper // for test BeforeDoWriteBack func(path string, replaced *pb.Metadata) (skip bool) // contains filtered or unexported fields }
func (*StreamMetadataSet) IterateFilesFullyBefore ¶
func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))
IterateFilesFullyBefore runs the function over all files contain data before the timestamp only.
0 before |------------------------------------------| |-file1---------------| <- File contains records in this TS range would be found. |-file2--------------| <- File contains any record out of this won't be found.
This function would call the `f` over file1 only.
func (*StreamMetadataSet) LoadFrom ¶
func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storage.ExternalStorage) error
LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)
func (*StreamMetadataSet) LoadUntilAndCalculateShiftTS ¶
func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS( ctx context.Context, s storage.ExternalStorage, until uint64, ) (uint64, error)
LoadUntilAndCalculateShiftTS loads the metadata until the specified timestamp and calculate the shift-until-ts by the way. This would record all metadata files that *may* contain data from transaction committed before that TS.
func (*StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch ¶
func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch( ctx context.Context, from uint64, st storage.ExternalStorage, updateFn func(num int64), ) ([]string, error)
RemoveDataFilesAndUpdateMetadataInBatch concurrently remove datafilegroups and update metadata. Only one metadata is processed in each thread, including deleting its datafilegroup and updating it. Returns the not deleted datafilegroups.
func (*StreamMetadataSet) TEST_GetMetadataInfos ¶
func (ms *StreamMetadataSet) TEST_GetMetadataInfos() map[string]*MetadataInfo
type TableMappingManager ¶
type TableMappingManager struct { DbReplaceMap map[UpstreamID]*DBReplace // contains filtered or unexported fields }
TableMappingManager iterates on log backup meta kvs and generate new id for DB, table and partition for downstream cluster. It maintains the id mapping and passes down later to the rewrite logic.
func NewTableMappingManager ¶
func NewTableMappingManager( dbReplaceMap map[UpstreamID]*DBReplace, genGlobalIdFn func(ctx context.Context) (int64, error)) *TableMappingManager
func (*TableMappingManager) ParseMetaKvAndUpdateIdMapping ¶
func (tc *TableMappingManager) ParseMetaKvAndUpdateIdMapping(e *kv.Entry, cf string) error
ParseMetaKvAndUpdateIdMapping collect table information
func (*TableMappingManager) ToProto ¶
func (tc *TableMappingManager) ToProto() []*backuppb.PitrDBMap
ToProto produces schemas id maps from up-stream to down-stream.
type TableReplace ¶
type TableReplace struct { Name string TableID DownstreamID PartitionMap map[UpstreamID]DownstreamID IndexMap map[UpstreamID]DownstreamID }
TableReplace specifies table information mapping from up-stream cluster to up-stream cluster.
func NewTableReplace ¶
func NewTableReplace(name string, newID DownstreamID) *TableReplace
NewTableReplace creates a TableReplace struct.
type TaskPrinter ¶
type TaskPrinter interface { AddTask(t TaskStatus) PrintTasks() }
func PrintTaskByTable ¶
func PrintTaskByTable(c glue.ConsoleOperations) TaskPrinter
PrintTaskByTable make a TaskPrinter, which prints the task by the `Table` implement of the console.
func PrintTaskWithJSON ¶
func PrintTaskWithJSON(c glue.ConsoleOperations) TaskPrinter
PrintTaskWithJSON make a TaskPrinter, which prints tasks as json to the console directly.
type TaskStatus ¶
type TaskStatus struct { // Info is the stream task information. Info backuppb.StreamBackupTaskInfo // Checkpoints collects the checkpoints. Checkpoints []Checkpoint // Total QPS of the task in recent seconds. QPS float64 // Last error reported by the store. LastErrors map[uint64]backuppb.StreamBackupError // contains filtered or unexported fields }
func (TaskStatus) GetMinStoreCheckpoint ¶
func (t TaskStatus) GetMinStoreCheckpoint() Checkpoint
GetCheckpoint calculates the checkpoint of the task.
type UpstreamID ¶
type UpstreamID = int64