stream

package
v1.1.0-beta.0...-0ffac36 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: Apache-2.0, Apache-2.0 Imports: 54 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultCF = "default"
	WriteCF   = "write"
)

Default columnFamily and write columnFamily

View Source
const (
	RewriteStatusPreConstructMap = iota // represents construct map status.
	RewriteStatusRestoreKV              // represents restore meta kv status.
)
View Source
const DATE_FORMAT = "2006-01-02 15:04:05.999999999 -0700"
View Source
const (
	SupportedMigVersion = pb.MigrationVersion_M1
)
View Source
const (
	// TruncateSafePointFileName is the filename that the ts(the log have been truncated) is saved into.
	TruncateSafePointFileName = "v1_stream_trancate_safepoint.txt"
)
View Source
const WildCard = "*"

Variables

This section is empty.

Functions

func AddMigrationToTable

func AddMigrationToTable(m *pb.Migration, table *glue.Table)

func BuildObserveDataRanges

func BuildObserveDataRanges(
	storage kv.Storage,
	filterStr []string,
	tableFilter filter.Filter,
	backupTS uint64,
) ([]kv.KeyRange, error)

BuildObserveDataRanges builds key ranges to observe data KV.

func BuildObserveMetaRange

func BuildObserveMetaRange() *kv.KeyRange

BuildObserveMetaRange specifies build key ranges to observe meta KV(contains all of metas)

func DecodeKVEntry

func DecodeKVEntry(buff []byte) (k, v []byte, length uint32, err error)

DecodeKVEntry decodes kv-entry from buff. If error returned is nil, it can get key/value and the length occupied in buff.

func EncodeKVEntry

func EncodeKVEntry(k, v []byte) []byte

EncodeKVEntry gets a entry-event from input: k, v.

func FastUnmarshalMetaData

func FastUnmarshalMetaData(
	ctx context.Context,
	s storage.ExternalStorage,
	metaDataWorkerPoolSize uint,
	fn func(path string, rawMetaData []byte) error,
) error

FastUnmarshalMetaData used a 128 worker pool to speed up read metadata content from external_storage.

func FormatDate

func FormatDate(ts time.Time) string

func FromSchemaMaps

func FromSchemaMaps(dbMaps []*backuppb.PitrDBMap) map[UpstreamID]*DBReplace

func GetStreamBackupGlobalCheckpointPrefix

func GetStreamBackupGlobalCheckpointPrefix() string

func GetStreamBackupMetaPrefix

func GetStreamBackupMetaPrefix() string

func GetTSFromFile

func GetTSFromFile(
	ctx context.Context,
	s storage.ExternalStorage,
	filename string,
) (uint64, error)

GetTSFromFile gets the current truncate safepoint. truncate safepoint is the TS used for last truncating: which means logs before this TS would probably be deleted or incomplete.

func IsMetaDBKey

func IsMetaDBKey(key []byte) bool

func IsMetaDDLJobHistoryKey

func IsMetaDDLJobHistoryKey(key []byte) bool

func MTSkipTruncateLog

func MTSkipTruncateLog(o *migToOpt)

func MaybeDBOrDDLJobHistoryKey

func MaybeDBOrDDLJobHistoryKey(key []byte) bool

func MaybeQPS

func MaybeQPS(ctx context.Context, mgr PDInfoProvider, client *http.Client) (float64, error)

MaybeQPS get a number like the QPS of last seconds for each store via the prometheus interface. TODO: this is a temporary solution(aha, like in a Hackthon),

we MUST find a better way for providing this information.

func MergeMigrations

func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration

Merge merges two migrations. The merged migration contains all operations from the two arguments.

func NewMigration

func NewMigration() *pb.Migration

func ReplaceMetadata

func ReplaceMetadata(meta *pb.Metadata, filegroups []*pb.DataFileGroup)

replace the filegroups and update the ts of the replaced metadata

func SetTSToFile

func SetTSToFile(
	ctx context.Context,
	s storage.ExternalStorage,
	safepoint uint64,
	filename string,
) error

SetTSToFile overrides the current truncate safepoint. truncate safepoint is the TS used for last truncating: which means logs before this TS would probably be deleted or incomplete.

func UpdateShiftTS

func UpdateShiftTS(m *pb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool)

Types

type Comparator

type Comparator interface {
	Compare(src, dst []byte) bool
}

Comparator is used for comparing the relationship of src and dst

func NewStartWithComparator

func NewStartWithComparator() Comparator

NewStartWithComparator create a comparator to compare whether src starts with dst

type ContentRef

type ContentRef struct {
	// contains filtered or unexported fields
}

type DBReplace

type DBReplace struct {
	Name     string
	DbID     DownstreamID
	TableMap map[UpstreamID]*TableReplace
}

DBReplace specifies database information mapping from up-stream cluster to up-stream cluster.

func NewDBReplace

func NewDBReplace(name string, newID DownstreamID) *DBReplace

NewDBReplace creates a DBReplace struct.

type DelRangeParams

type DelRangeParams struct {
	JobID    int64
	ElemID   int64
	StartKey string
	EndKey   string
}

type DownstreamID

type DownstreamID = int64

type EventIterator

type EventIterator struct {
	// contains filtered or unexported fields
}

EventIterator is used for reading kv-event from buffer by iterator.

func (*EventIterator) GetError

func (ei *EventIterator) GetError() error

GetError gets the error in iterator. it returns nil if it has nothing mistaken happened

func (*EventIterator) Key

func (ei *EventIterator) Key() []byte

Key gets the key in kv-event if valid() == true

func (*EventIterator) Next

func (ei *EventIterator) Next()

Next specifies the next iterative element.

func (*EventIterator) Valid

func (ei *EventIterator) Valid() bool

Valid checks whether the iterator is valid.

func (*EventIterator) Value

func (ei *EventIterator) Value() []byte

Value gets the value in kv-event if valid() == true

type FileGroupInfo

type FileGroupInfo struct {
	MaxTS   uint64
	Length  uint64
	KVCount int64
}

keep these meta-information for statistics and filtering

type Hooks

type Hooks interface {
	StartLoadingMetaForTruncating()
	EndLoadingMetaForTruncating()

	StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)
	DeletedAFileForTruncating(count int)
	DeletedAllFilesForTruncating()

	StartHandlingMetaEdits([]*pb.MetaEdit)
	HandledAMetaEdit(*pb.MetaEdit)
	HandingMetaEditDone()
}

type Iterator

type Iterator interface {
	Next()
	Valid() bool
	Key() []byte
	Value() []byte
	GetError() error
}

Iterator specifies a read iterator Interface.

func NewEventIterator

func NewEventIterator(buff []byte) Iterator

NewEventIterator creates a Iterator to read kv-event.

type MergeAndMigrateToOpt

type MergeAndMigrateToOpt func(*mergeAndMigrateToConfig)

func MMOptAlwaysRunTruncate

func MMOptAlwaysRunTruncate() MergeAndMigrateToOpt

MMOptAlwaysRunTruncate forces the merge and migrate to always run the truncating. If not set, when the `truncated-to` wasn'd modified, truncating will be skipped. This is necessary because truncating, even a no-op, requires a full scan over metadatas for now. This will be useful for retrying failed truncations.

func MMOptAppendPhantomMigration

func MMOptAppendPhantomMigration(migs ...pb.Migration) MergeAndMigrateToOpt

MMOptAppendPhantomMigration appends a phantom migration to the merge and migrate to operation. The phantom migration will be persised to BASE during executing. We call it a "phantom" because it wasn't persisted. When the target version isn't the latest version, the order of migrating may be broken. Carefully use this in that case.

func MMOptInteractiveCheck

func MMOptInteractiveCheck(f func(context.Context, *pb.Migration) bool) MergeAndMigrateToOpt

type MergeAndMigratedTo

type MergeAndMigratedTo struct {
	MigratedTo
	// The BASE migration of this "migrate to" operation.
	Base *pb.Migration
	// The migrations have been merged to the BASE migration.
	Source []*OrderedMigration
}

MergeAndMigratedTo is the result of a call to `MergeAndMigrateTo`.

type MetadataHelper

type MetadataHelper struct {
	// contains filtered or unexported fields
}

MetadataHelper make restore/truncate compatible with metadataV1 and metadataV2.

func NewMetadataHelper

func NewMetadataHelper(opts ...MetadataHelperOption) *MetadataHelper

func (*MetadataHelper) InitCacheEntry

func (m *MetadataHelper) InitCacheEntry(path string, ref int)

func (*MetadataHelper) Marshal

func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error)

For truncate command. Marshal metadata to reupload to external storage. The metadata must be unmarshal by `ParseToMetadataHard`

func (*MetadataHelper) ParseToMetadata

func (*MetadataHelper) ParseToMetadata(rawMetaData []byte) (*backuppb.Metadata, error)

func (*MetadataHelper) ParseToMetadataHard

func (*MetadataHelper) ParseToMetadataHard(rawMetaData []byte) (*backuppb.Metadata, error)

Only for deleting, after MetadataV1 is deprecated, we can remove it. Hard means convert to MetaDataV2 deeply.

func (*MetadataHelper) ReadFile

func (m *MetadataHelper) ReadFile(
	ctx context.Context,
	path string,
	offset uint64,
	length uint64,
	compressionType backuppb.CompressionType,
	storage storage.ExternalStorage,
	encryptionInfo *encryptionpb.FileEncryptionInfo,
) ([]byte, error)

type MetadataHelperOption

type MetadataHelperOption func(*MetadataHelper)

func WithEncryptionManager

func WithEncryptionManager(manager *encryption.Manager) MetadataHelperOption

type MetadataInfo

type MetadataInfo struct {
	MinTS          uint64
	FileGroupInfos []*FileGroupInfo
}

keep these meta-information for statistics and filtering

type MigrateToOpt

type MigrateToOpt func(*migToOpt)

func MTMaybeSkipTruncateLog

func MTMaybeSkipTruncateLog(cond bool) MigrateToOpt

type MigratedTo

type MigratedTo struct {
	// Errors happen during executing the migration.
	Warnings []error
	// The new BASE migration after the operation.
	NewBase *pb.Migration
}

MigratedTo is the result of trying to "migrate to" a migration.

The term "migrate to" means, try to performance all possible operations from a migration to the storage.

type MigrationExt

type MigrationExt struct {

	// The hooks used for tracking the execution.
	// See the `Hooks` type for more details.
	Hooks Hooks
	// contains filtered or unexported fields
}

MigrationExt is an extension to the `ExternalStorage` type. This added some support methods for the "migration" system of log backup.

Migrations are idempontent batch modifications (adding a compaction, delete a file, etc..) to the backup files. You may check the protocol buffer message `Migration` for more details. Idempontence is important for migrations, as they may be executed multi times due to retry or racing.

The encoded migrations will be put in a folder in the external storage, they are ordered by a series number.

Not all migrations can be applied to the storage then removed from the migration. Small "additions" will be inlined into the migration, for example, a `Compaction`. Small "deletions" will also be delayed, for example, deleting a span in a file. Such operations will be save to a special migration, the first migration, named "BASE".

A simple list of migrations (loaded by `Load()`):

base = [ compaction, deleteSpan, ... ],

layers = {
  { sn = 1, content = [ compaction, ... ] },
  { sn = 2, content = [ compaction, deleteFiles, ... ] },

func MigerationExtension

func MigerationExtension(s storage.ExternalStorage) MigrationExt

MigrateionExtnsion installs the extension methods to an `ExternalStorage`.

func (MigrationExt) AppendMigration

func (m MigrationExt) AppendMigration(ctx context.Context, mig *pb.Migration) (int, error)

func (MigrationExt) DryRun

func (m MigrationExt) DryRun(f func(MigrationExt)) []storage.Effect

func (MigrationExt) Load

func (m MigrationExt) Load(ctx context.Context) (Migrations, error)

Load loads the current living migrations from the storage.

func (MigrationExt) MergeAndMigrateTo

func (m MigrationExt) MergeAndMigrateTo(
	ctx context.Context,
	targetSpec int,
	opts ...MergeAndMigrateToOpt,
) (result MergeAndMigratedTo)

MergeAndMigrateTo will merge the migrations from BASE until the specified SN, then migrate to it. Finally it writes the new BASE and remove stale migrations from the storage.

func (MigrationExt) MigrateTo

func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ...MigrateToOpt) MigratedTo

MigrateTo migrates to a migration. If encountered some error during executing some operation, the operation will be put to the new BASE, which can be retryed then.

type Migrations

type Migrations struct {
	// The BASE migration.
	Base *pb.Migration `json:"base"`
	// Appended migrations.
	// They are sorted by their sequence numbers.
	Layers []*OrderedMigration `json:"layers"`
}

Migrations represents living migrations from the storage.

func (Migrations) ListAll

func (migs Migrations) ListAll() []*pb.Migration

ListAll returns a slice of all migrations in protobuf format. This includes the base migration and any additional layers.

func (Migrations) MergeTo

func (migs Migrations) MergeTo(seq int) *pb.Migration

MergeTo merges migrations from the BASE in the live migrations until the specified sequence number.

func (Migrations) MergeToBy

func (migs Migrations) MergeToBy(seq int, merge func(m1, m2 *pb.Migration) *pb.Migration) *pb.Migration

type NoHooks

type NoHooks struct{}

NoHooks is used for non-interactive secnarios.

func (NoHooks) DeletedAFileForTruncating

func (NoHooks) DeletedAFileForTruncating(count int)

func (NoHooks) DeletedAllFilesForTruncating

func (NoHooks) DeletedAllFilesForTruncating()

func (NoHooks) EndLoadingMetaForTruncating

func (NoHooks) EndLoadingMetaForTruncating()

func (NoHooks) HandingMetaEditDone

func (NoHooks) HandingMetaEditDone()

func (NoHooks) HandledAMetaEdit

func (NoHooks) HandledAMetaEdit(*pb.MetaEdit)

func (NoHooks) StartDeletingForTruncating

func (NoHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)

func (NoHooks) StartHandlingMetaEdits

func (NoHooks) StartHandlingMetaEdits([]*pb.MetaEdit)

func (NoHooks) StartLoadingMetaForTruncating

func (NoHooks) StartLoadingMetaForTruncating()

type OrderedMigration

type OrderedMigration struct {
	SeqNum  int          `json:"seq_num"`
	Path    string       `json:"path"`
	Content pb.Migration `json:"content"`
}

OrderedMigration is a migration with its path and sequence number.

type PDInfoProvider

type PDInfoProvider interface {
	GetPDClient() pd.Client
	GetTLSConfig() *tls.Config
}

type PreDelRangeQuery

type PreDelRangeQuery struct {
	Sql        string
	ParamsList []DelRangeParams
}

type ProgressBarHooks

type ProgressBarHooks struct {
	// contains filtered or unexported fields
}

func NewProgressBarHooks

func NewProgressBarHooks(console glue.ConsoleOperations) *ProgressBarHooks

func (*ProgressBarHooks) DeletedAFileForTruncating

func (p *ProgressBarHooks) DeletedAFileForTruncating(count int)

func (*ProgressBarHooks) DeletedAllFilesForTruncating

func (p *ProgressBarHooks) DeletedAllFilesForTruncating()

func (*ProgressBarHooks) EndLoadingMetaForTruncating

func (p *ProgressBarHooks) EndLoadingMetaForTruncating()

func (*ProgressBarHooks) HandingMetaEditDone

func (p *ProgressBarHooks) HandingMetaEditDone()

func (*ProgressBarHooks) HandledAMetaEdit

func (p *ProgressBarHooks) HandledAMetaEdit(edit *pb.MetaEdit)

func (*ProgressBarHooks) StartDeletingForTruncating

func (p *ProgressBarHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64)

func (*ProgressBarHooks) StartHandlingMetaEdits

func (p *ProgressBarHooks) StartHandlingMetaEdits(edits []*pb.MetaEdit)

func (*ProgressBarHooks) StartLoadingMetaForTruncating

func (p *ProgressBarHooks) StartLoadingMetaForTruncating()

type RawMetaKey

type RawMetaKey struct {
	Key   []byte
	Field []byte
	Ts    uint64
}

RawMetaKey specified a transaction meta key.

func ParseTxnMetaKeyFrom

func ParseTxnMetaKeyFrom(txnKey kv.Key) (*RawMetaKey, error)

ParseTxnMetaKeyFrom gets a `RawMetaKey` struct by parsing transaction meta key.

func (*RawMetaKey) EncodeMetaKey

func (k *RawMetaKey) EncodeMetaKey() kv.Key

EncodeMetaKey Encodes RawMetaKey into a transaction key

func (*RawMetaKey) UpdateField

func (k *RawMetaKey) UpdateField(field []byte)

UpdateField updates `Field` field in `RawMetaKey` struct.

func (*RawMetaKey) UpdateKey

func (k *RawMetaKey) UpdateKey(key []byte)

UpdateKey updates `key` field in `RawMetaKey` struct.

func (*RawMetaKey) UpdateTS

func (k *RawMetaKey) UpdateTS(ts uint64)

UpdateTS updates `Ts` field in `RawMetaKey` struct.

type RawWriteCFValue

type RawWriteCFValue struct {
	// contains filtered or unexported fields
}

RawWriteCFValue represents the value in write columnFamily. Detail see line: https://github.com/tikv/tikv/blob/release-6.5/components/txn_types/src/write.rs#L70

func (*RawWriteCFValue) EncodeTo

func (v *RawWriteCFValue) EncodeTo() []byte

EncodeTo encodes the RawWriteCFValue to get encoded value.

func (*RawWriteCFValue) GetShortValue

func (v *RawWriteCFValue) GetShortValue() []byte

UpdateShortValue gets the shortValue field.

func (*RawWriteCFValue) GetStartTs

func (v *RawWriteCFValue) GetStartTs() uint64

func (*RawWriteCFValue) GetWriteType

func (v *RawWriteCFValue) GetWriteType() byte

func (*RawWriteCFValue) HasShortValue

func (v *RawWriteCFValue) HasShortValue() bool

HasShortValue checks whether short value is stored in write cf.

func (*RawWriteCFValue) IsDelete

func (v *RawWriteCFValue) IsDelete() bool

IsRollback checks whether the value in cf is a `delete` record.

func (*RawWriteCFValue) IsRollback

func (v *RawWriteCFValue) IsRollback() bool

IsRollback checks whether the value in cf is a `rollback` record.

func (*RawWriteCFValue) ParseFrom

func (v *RawWriteCFValue) ParseFrom(data []byte) error

ParseFrom decodes the value to get the struct `RawWriteCFValue`.

func (*RawWriteCFValue) UpdateShortValue

func (v *RawWriteCFValue) UpdateShortValue(value []byte)

UpdateShortValue updates the shortValue field.

type RewriteStatus

type RewriteStatus int

type SchemasReplace

type SchemasReplace struct {
	DbMap map[UpstreamID]*DBReplace

	TiflashRecorder *tiflashrec.TiFlashRecorder
	RewriteTS       uint64        // used to rewrite commit ts in meta kv.
	TableFilter     filter.Filter // used to filter schema/table

	AfterTableRewritten func(deleted bool, tableInfo *model.TableInfo)
	// contains filtered or unexported fields
}

SchemasReplace specifies schemas information mapping from up-stream cluster to up-stream cluster.

func NewSchemasReplace

func NewSchemasReplace(
	dbMap map[UpstreamID]*DBReplace,
	needConstructIdMap bool,
	tiflashRecorder *tiflashrec.TiFlashRecorder,
	restoreTS uint64,
	tableFilter filter.Filter,
	genID func(ctx context.Context) (int64, error),
	genIDs func(ctx context.Context, n int) ([]int64, error),
	recordDeleteRange func(*PreDelRangeQuery),
) *SchemasReplace

NewSchemasReplace creates a SchemasReplace struct.

func (*SchemasReplace) GetIngestRecorder

func (sr *SchemasReplace) GetIngestRecorder() *ingestrec.IngestRecorder

func (*SchemasReplace) IsPreConsturctMapStatus

func (sr *SchemasReplace) IsPreConsturctMapStatus() bool

IsPreConsturctMapStatus checks the status is PreConsturctMap.

func (*SchemasReplace) IsRestoreKVStatus

func (sr *SchemasReplace) IsRestoreKVStatus() bool

IsRestoreKVStatus checks the status is RestoreKV.

func (*SchemasReplace) NeedConstructIdMap

func (sr *SchemasReplace) NeedConstructIdMap() bool

func (*SchemasReplace) RewriteKvEntry

func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, error)

RewriteKvEntry uses to rewrite tableID/dbID in entry.key and entry.value

func (*SchemasReplace) SetPreConstructMapStatus

func (sr *SchemasReplace) SetPreConstructMapStatus()

SetPreConstructMapStatus sets the PreConstructMap status.

func (*SchemasReplace) SetRestoreKVStatus

func (sr *SchemasReplace) SetRestoreKVStatus()

SetRestoreKVStatus sets the RestoreKV status.

func (*SchemasReplace) TidySchemaMaps

func (sr *SchemasReplace) TidySchemaMaps() []*backuppb.PitrDBMap

TidySchemaMaps produces schemas id maps from up-stream to down-stream.

type StatusController

type StatusController struct {
	// contains filtered or unexported fields
}

StatusController is the controller type (or context type) for the command `stream status`.

func NewStatusController

func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrinter) *StatusController

NewStatusContorller make a status controller via some resource accessors.

func (*StatusController) Close

func (ctl *StatusController) Close() error

func (*StatusController) PrintStatusOfTask

func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string) error

PrintStatusOfTask prints the status of tasks with the name. When the name is *, print all tasks.

type StreamBackupSearch

type StreamBackupSearch struct {
	// contains filtered or unexported fields
}

StreamBackupSearch is used for searching key from log data files

func NewStreamBackupSearch

func NewStreamBackupSearch(
	storage storage.ExternalStorage,
	comparator Comparator,
	searchKey []byte,
) *StreamBackupSearch

NewStreamBackupSearch creates an instance of StreamBackupSearch

func (*StreamBackupSearch) Search

func (s *StreamBackupSearch) Search(ctx context.Context) ([]*StreamKVInfo, error)

Search kv entries from log data files

func (*StreamBackupSearch) SetEndTs

func (s *StreamBackupSearch) SetEndTs(endTs uint64)

SetEndTs set end timestamp searched to

func (*StreamBackupSearch) SetStartTS

func (s *StreamBackupSearch) SetStartTS(startTs uint64)

SetStartTS set start timestamp searched from

type StreamKVInfo

type StreamKVInfo struct {
	Key        string `json:"key"`
	EncodedKey string `json:"-"`
	WriteType  byte   `json:"write-type"`
	StartTs    uint64 `json:"start-ts"`
	CommitTs   uint64 `json:"commit-ts"`
	CFName     string `json:"cf-name"`
	Value      string `json:"value,omitempty"`
	ShortValue string `json:"short-value,omitempty"`
}

StreamKVInfo stores kv info searched from log data files

type StreamMetadataSet

type StreamMetadataSet struct {
	// if set true, the metadata and datafile won't be removed
	DryRun bool

	MetadataDownloadBatchSize uint

	// a parser of metadata
	Helper *MetadataHelper

	// for test
	BeforeDoWriteBack func(path string, replaced *pb.Metadata) (skip bool)
	// contains filtered or unexported fields
}

func (*StreamMetadataSet) IterateFilesFullyBefore

func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))

IterateFilesFullyBefore runs the function over all files contain data before the timestamp only.

0                                          before
|------------------------------------------|
 |-file1---------------| <- File contains records in this TS range would be found.
                               |-file2--------------| <- File contains any record out of this won't be found.

This function would call the `f` over file1 only.

func (*StreamMetadataSet) LoadFrom

LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)

func (*StreamMetadataSet) LoadUntilAndCalculateShiftTS

func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(
	ctx context.Context,
	s storage.ExternalStorage,
	until uint64,
) (uint64, error)

LoadUntilAndCalculateShiftTS loads the metadata until the specified timestamp and calculate the shift-until-ts by the way. This would record all metadata files that *may* contain data from transaction committed before that TS.

func (*StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch

func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(
	ctx context.Context,
	from uint64,
	st storage.ExternalStorage,

	updateFn func(num int64),
) ([]string, error)

RemoveDataFilesAndUpdateMetadataInBatch concurrently remove datafilegroups and update metadata. Only one metadata is processed in each thread, including deleting its datafilegroup and updating it. Returns the not deleted datafilegroups.

func (*StreamMetadataSet) TEST_GetMetadataInfos

func (ms *StreamMetadataSet) TEST_GetMetadataInfos() map[string]*MetadataInfo

type TableReplace

type TableReplace struct {
	Name         string
	TableID      DownstreamID
	PartitionMap map[UpstreamID]DownstreamID
	IndexMap     map[UpstreamID]DownstreamID
}

TableReplace specifies table information mapping from up-stream cluster to up-stream cluster.

func NewTableReplace

func NewTableReplace(name string, newID DownstreamID) *TableReplace

NewTableReplace creates a TableReplace struct.

type TaskPrinter

type TaskPrinter interface {
	AddTask(t TaskStatus)
	PrintTasks()
}

func PrintTaskByTable

func PrintTaskByTable(c glue.ConsoleOperations) TaskPrinter

PrintTaskByTable make a TaskPrinter, which prints the task by the `Table` implement of the console.

func PrintTaskWithJSON

func PrintTaskWithJSON(c glue.ConsoleOperations) TaskPrinter

PrintTaskWithJSON make a TaskPrinter, which prints tasks as json to the console directly.

type TaskStatus

type TaskStatus struct {
	// Info is the stream task information.
	Info backuppb.StreamBackupTaskInfo

	// Checkpoints collects the checkpoints.
	Checkpoints []Checkpoint
	// Total QPS of the task in recent seconds.
	QPS float64
	// Last error reported by the store.
	LastErrors map[uint64]backuppb.StreamBackupError
	// contains filtered or unexported fields
}

func (TaskStatus) GetMinStoreCheckpoint

func (t TaskStatus) GetMinStoreCheckpoint() Checkpoint

GetCheckpoint calculates the checkpoint of the task.

type UpstreamID

type UpstreamID = int64

type WriteType

type WriteType = byte

WriteType defines a write type.

const (
	WriteTypeLock     WriteType = 'L'
	WriteTypeRollback WriteType = 'R'
	WriteTypeDelete   WriteType = 'D'
	WriteTypePut      WriteType = 'P'
)

WriteType

func WriteTypeFrom

func WriteTypeFrom(t byte) (WriteType, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL