backup

package
v1.1.0-beta.0...-1acbbec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2024 License: Apache-2.0, Apache-2.0 Imports: 53 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultSchemaConcurrency is the default number of the concurrent
	// backup schema tasks.
	DefaultSchemaConcurrency = 64
)
View Source
const (
	// MaxResolveLocksbackupOffSleepMs is the maximum sleep time, in milliseconds, for resolving locks.
	// 10 minutes for every round.
	MaxResolveLocksbackupOffSleepMs = 600000
)

Variables

View Source
var TimeoutOneResponse = time.Hour

Functions

func BuildBackupSchemas

func BuildBackupSchemas(
	storage kv.Storage,
	tableFilter filter.Filter,
	backupTS uint64,
	isFullBackup bool,
	fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo),
) error

func BuildFullSchema

func BuildFullSchema(storage kv.Storage, backupTS uint64, fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo)) error

BuildFullSchema builds the full backup schemas for databases and tables.

func CheckBackupStorageIsLocked

func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage) error

CheckBackupStorageIsLocked checks whether the backup storage is locked, which means that another backup process has already written some data files into the same backup directory or cloud prefix.

func ObserveStoreChangesAsync

func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetryPolicy, pdCli pd.Client)

func SplitBackupReqRanges

func SplitBackupReqRanges(req backuppb.BackupRequest, count uint) []backuppb.BackupRequest

func StartTimeoutRecv

func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Context, *timeoutRecv)

func WriteBackupDDLJobs

func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64, needDomain bool) error

WriteBackupDDLJobs sends the DDL jobs that were done in (lastBackupTS, backupTS] to metaWriter.

Types

type BackupRetryPolicy

type BackupRetryPolicy struct {
	One uint64
	All bool
}

type BackupSender

type BackupSender interface {
	SendAsync(
		ctx context.Context,
		round uint64,
		storeID uint64,
		request backuppb.BackupRequest,
		concurrency uint,
		cli backuppb.BackupClient,
		respCh chan *ResponseAndStore,
		StateNotifier chan BackupRetryPolicy)
}

type Checksum

type Checksum struct {
	Crc64Xor   uint64
	TotalKvs   uint64
	TotalBytes uint64
}

Checksum is the checksum of some backup files calculated by CollectChecksums.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a client that instructs TiKV how to do a backup.

func NewBackupClient

func NewBackupClient(ctx context.Context, mgr ClientMgr) *Client

NewBackupClient returns a new backup client.

func (*Client) BackupRanges

func (bc *Client) BackupRanges(
	ctx context.Context,
	ranges []rtree.Range,
	request backuppb.BackupRequest,
	concurrency uint,
	replicaReadLabel map[string]string,
	metaWriter *metautil.MetaWriter,
	progressCallBack func(),
) error

BackupRanges makes a backup of the given key ranges.

func (*Client) BuildBackupRangeAndSchema

func (bc *Client) BuildBackupRangeAndSchema(
	storage kv.Storage,
	tableFilter filter.Filter,
	backupTS uint64,
	isFullBackup bool,
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error)

BuildBackupRangeAndSchema calls BuildBackupRangeAndInitSchema; if the checkpoint mode is used, it returns the ranges from the checkpoint meta.

func (*Client) BuildProgressRangeTree

func (bc *Client) BuildProgressRangeTree(ranges []rtree.Range) (rtree.ProgressRangeTree, error)

func (*Client) CheckCheckpoint

func (bc *Client) CheckCheckpoint(hash []byte) error

CheckCheckpoint checks whether the configs are the same.

func (*Client) GetApiVersion

func (bc *Client) GetApiVersion() kvrpcpb.APIVersion

GetApiVersion returns the API version of the TiKV storage.

func (*Client) GetClusterID

func (bc *Client) GetClusterID() uint64

GetClusterID returns the cluster ID of the tidb cluster to backup.

func (*Client) GetCurrentTS

func (bc *Client) GetCurrentTS(ctx context.Context) (uint64, error)

GetCurrentTS gets a new timestamp from PD.

func (*Client) GetGCTTL

func (bc *Client) GetGCTTL() int64

GetGCTTL gets the gcTTL for this backup.

func (*Client) GetSafePointID

func (bc *Client) GetSafePointID() string

GetSafePointID gets the gc-safe-point's service-id from either the checkpoint or immediate generation.

func (*Client) GetStorage

func (bc *Client) GetStorage() storage.ExternalStorage

GetStorage gets storage for this backup.

func (*Client) GetStorageBackend

func (bc *Client) GetStorageBackend() *backuppb.StorageBackend

GetStorageBackend gets the storage backend field in the client.

func (*Client) GetTS

func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64) (uint64, error)

GetTS returns the latest timestamp.

func (*Client) OnBackupResponse

func (bc *Client) OnBackupResponse(
	ctx context.Context,
	r *ResponseAndStore,
	errContext *utils.ErrorContext,
	globalProgressTree *rtree.ProgressRangeTree,
) (*txnlock.Lock, error)

func (*Client) RunLoop

func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error

RunLoop runs an infinite loop to back up ranges on all TiKV stores. If one client's gRPC connection is disconnected, it resends the backup request to that store. If a new TiKV store joins, it sends the backup request to the new store. If a TiKV store reboots, leaders may have changed, so it resends the backup request to all stores. If a TiKV store disconnects, leaders may have changed, so it resends the backup request to all stores.

func (*Client) SetApiVersion

func (bc *Client) SetApiVersion(v kvrpcpb.APIVersion)

SetApiVersion sets api version of the TiKV storage

func (*Client) SetCipher

func (bc *Client) SetCipher(cipher *backuppb.CipherInfo)

SetCipher for checkpoint to encrypt sst file's metadata

func (*Client) SetGCTTL

func (bc *Client) SetGCTTL(ttl int64)

SetGCTTL sets the gcTTL for the client.

func (*Client) SetLockFile

func (bc *Client) SetLockFile(ctx context.Context) error

SetLockFile sets the write lock file.

func (*Client) SetStorage

func (bc *Client) SetStorage(
	ctx context.Context,
	backend *backuppb.StorageBackend,
	opts *storage.ExternalStorageOptions,
) error

SetStorage sets ExternalStorage for client.

func (*Client) SetStorageAndCheckNotInUse

func (bc *Client) SetStorageAndCheckNotInUse(
	ctx context.Context,
	backend *backuppb.StorageBackend,
	opts *storage.ExternalStorageOptions,
) error

SetStorageAndCheckNotInUse sets the ExternalStorage for the client and checks that the storage is not in use by others.

func (*Client) StartCheckpointRunner

func (bc *Client) StartCheckpointRunner(
	ctx context.Context,
	cfgHash []byte,
	backupTS uint64,
	ranges []rtree.Range,
	safePointID string,
	progressCallBack func(),
) (err error)

StartCheckpointRunner will: 1. save the initial status into the external storage; 2. load the checkpoint data from the external storage; 3. start the checkpoint runner.

func (*Client) WaitForFinishCheckpoint

func (bc *Client) WaitForFinishCheckpoint(ctx context.Context, flush bool)

type ClientMgr

type ClientMgr interface {
	GetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
	ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
	GetPDClient() pd.Client
	GetLockResolver() *txnlock.LockResolver
	Close()
}

ClientMgr manages connections needed by backup.

type MainBackupLoop

type MainBackupLoop struct {
	BackupSender

	// backup requests for all stores.
	// the subRanges may change every round.
	BackupReq backuppb.BackupRequest
	// the number of backup clients to send backup requests per store.
	Concurrency uint
	// record the whole backup progress in infinite loop.
	GlobalProgressTree *rtree.ProgressRangeTree
	ReplicaReadLabel   map[string]string
	StateNotifier      chan BackupRetryPolicy

	ProgressCallBack        func()
	GetBackupClientCallBack func(ctx context.Context, storeID uint64, reset bool) (backuppb.BackupClient, error)
}

func (*MainBackupLoop) CollectStoreBackupsAsync

func (l *MainBackupLoop) CollectStoreBackupsAsync(
	ctx context.Context,
	round uint64,
	storeBackupChs map[uint64]chan *ResponseAndStore,
	globalCh chan *ResponseAndStore,
)

CollectStoreBackupsAsync is the receiver function of all stores backup results.

type MainBackupSender

type MainBackupSender struct{}

func (*MainBackupSender) SendAsync

func (s *MainBackupSender) SendAsync(
	ctx context.Context,
	round uint64,
	storeID uint64,
	request backuppb.BackupRequest,
	concurrency uint,
	cli backuppb.BackupClient,
	respCh chan *ResponseAndStore,
	StateNotifier chan BackupRetryPolicy,
)

type ProgressUnit

type ProgressUnit string

ProgressUnit represents the unit of progress.

type ResponseAndStore

type ResponseAndStore struct {
	Resp    *backuppb.BackupResponse
	StoreID uint64
}

func (ResponseAndStore) GetResponse

func (r ResponseAndStore) GetResponse() *backuppb.BackupResponse

func (ResponseAndStore) GetStoreID

func (r ResponseAndStore) GetStoreID() uint64

type Schemas

type Schemas struct {
	// contains filtered or unexported fields
}

Schemas is the task for backing up schemas.

func BuildBackupRangeAndInitSchema

func BuildBackupRangeAndInitSchema(
	storage kv.Storage,
	tableFilter filter.Filter,
	backupTS uint64,
	isFullBackup bool,
	buildRange bool,
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error)

BuildBackupRangeAndInitSchema gets the KV ranges and schemas of tables. KV ranges are separated by Table IDs. Also, KV ranges are separated by Index IDs in the same table.

func NewBackupSchemas

func NewBackupSchemas(iterFunc iterFuncTp, size int) *Schemas

func (*Schemas) BackupSchemas

func (ss *Schemas) BackupSchemas(
	ctx context.Context,
	metaWriter *metautil.MetaWriter,
	checkpointRunner *checkpoint.CheckpointRunner[checkpoint.BackupKeyType, checkpoint.BackupValueType],
	store kv.Storage,
	statsHandle *handle.Handle,
	backupTS uint64,
	concurrency uint,
	copConcurrency uint,
	skipChecksum bool,
	updateCh glue.Progress,
) error

BackupSchemas backs up table info, including checksum and stats.

func (*Schemas) Len

func (ss *Schemas) Len() int

Len returns the number of schemas.

func (*Schemas) SetCheckpointChecksum

func (ss *Schemas) SetCheckpointChecksum(checkpointChecksum map[int64]*checkpoint.ChecksumItem)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL