Documentation ¶
Index ¶
- Constants
- Variables
- func BuildBackupSchemas(storage kv.Storage, tableFilter filter.Filter, backupTS uint64, ...) error
- func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage) error
- func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetryPolicy, pdCli pd.Client)
- func SplitBackupReqRanges(req backuppb.BackupRequest, count uint) []backuppb.BackupRequest
- func StartTimeoutRecv(ctx context.Context, timeout time.Duration, storeID uint64) (context.Context, *timeoutRecv)
- func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, ...) error
- type BackupRetryPolicy
- type BackupSender
- type Checksum
- type Client
- func (bc *Client) BackupRanges(ctx context.Context, ranges []rtree.Range, request backuppb.BackupRequest, ...) error
- func (bc *Client) BuildBackupRangeAndSchema(storage kv.Storage, tableFilter filter.Filter, backupTS uint64, ...) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error)
- func (bc *Client) BuildProgressRangeTree(ranges []rtree.Range) (rtree.ProgressRangeTree, error)
- func (bc *Client) CheckCheckpoint(hash []byte) error
- func (bc *Client) GetApiVersion() kvrpcpb.APIVersion
- func (bc *Client) GetCheckpointRunner() ...
- func (bc *Client) GetClusterID() uint64
- func (bc *Client) GetCurrentTS(ctx context.Context) (uint64, error)
- func (bc *Client) GetGCTTL() int64
- func (bc *Client) GetSafePointID() string
- func (bc *Client) GetStorage() storage.ExternalStorage
- func (bc *Client) GetStorageBackend() *backuppb.StorageBackend
- func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64) (uint64, error)
- func (bc *Client) OnBackupResponse(ctx context.Context, r *ResponseAndStore, errContext *utils.ErrorContext, ...) (*txnlock.Lock, error)
- func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error
- func (bc *Client) SetApiVersion(v kvrpcpb.APIVersion)
- func (bc *Client) SetCipher(cipher *backuppb.CipherInfo)
- func (bc *Client) SetGCTTL(ttl int64)
- func (bc *Client) SetLockFile(ctx context.Context) error
- func (bc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, ...) error
- func (bc *Client) SetStorageAndCheckNotInUse(ctx context.Context, backend *backuppb.StorageBackend, ...) error
- func (bc *Client) StartCheckpointRunner(ctx context.Context, cfgHash []byte, backupTS uint64, ranges []rtree.Range, ...) (err error)
- func (bc *Client) WaitForFinishCheckpoint(ctx context.Context, flush bool)
- type ClientMgr
- type MainBackupLoop
- type MainBackupSender
- type ProgressUnit
- type ResponseAndStore
- type Schemas
Constants ¶
const ( // MaxResolveLocksbackupOffSleep is the maximum sleep time for resolving locks. // 10 minutes for every round. MaxResolveLocksbackupOffSleepMs = 600000 IncompleteRangesUpdateInterval = time.Second * 15 )
const ( // DefaultSchemaConcurrency is the default number of the concurrent // backup schema tasks. DefaultSchemaConcurrency = 64 )
Variables ¶
var TimeoutOneResponse = time.Hour
Functions ¶
func BuildBackupSchemas ¶
func CheckBackupStorageIsLocked ¶
func CheckBackupStorageIsLocked(ctx context.Context, s storage.ExternalStorage) error
CheckBackupStorageIsLocked checks whether the backup storage is locked, which means another backup process has already written some data files into the same backup directory or cloud prefix.
func ObserveStoreChangesAsync ¶
func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetryPolicy, pdCli pd.Client)
func SplitBackupReqRanges ¶
func SplitBackupReqRanges(req backuppb.BackupRequest, count uint) []backuppb.BackupRequest
func StartTimeoutRecv ¶
Types ¶
type BackupRetryPolicy ¶
type BackupSender ¶
type BackupSender interface { SendAsync( ctx context.Context, round uint64, storeID uint64, request backuppb.BackupRequest, concurrency uint, cli backuppb.BackupClient, respCh chan *ResponseAndStore, StateNotifier chan BackupRetryPolicy) }
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a client that instructs TiKV how to do a backup.
func NewBackupClient ¶
NewBackupClient returns a new backup client.
func (*Client) BackupRanges ¶
func (bc *Client) BackupRanges( ctx context.Context, ranges []rtree.Range, request backuppb.BackupRequest, concurrency uint, replicaReadLabel map[string]string, metaWriter *metautil.MetaWriter, progressCallBack func(ProgressUnit), ) error
BackupRanges makes a backup of the given key ranges.
func (*Client) BuildBackupRangeAndSchema ¶
func (bc *Client) BuildBackupRangeAndSchema( storage kv.Storage, tableFilter filter.Filter, backupTS uint64, isFullBackup bool, ) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error)
Client.BuildBackupRangeAndSchema calls BuildBackupRangeAndSchema; if the checkpoint mode is used, it returns the ranges from the checkpoint meta.
func (*Client) BuildProgressRangeTree ¶
func (*Client) CheckCheckpoint ¶
CheckCheckpoint checks whether the configs are the same.
func (*Client) GetApiVersion ¶
func (bc *Client) GetApiVersion() kvrpcpb.APIVersion
GetApiVersion gets the API version of the TiKV storage.
func (*Client) GetCheckpointRunner ¶
func (bc *Client) GetCheckpointRunner() *checkpoint.CheckpointRunner[checkpoint.BackupKeyType, checkpoint.BackupValueType]
func (*Client) GetClusterID ¶
GetClusterID returns the cluster ID of the tidb cluster to backup.
func (*Client) GetCurrentTS ¶
GetCurrentTS gets a new timestamp from PD.
func (*Client) GetSafePointID ¶
GetSafePointID gets the gc-safe-point's service-id from either the checkpoint or immediate generation.
func (*Client) GetStorage ¶
func (bc *Client) GetStorage() storage.ExternalStorage
GetStorage gets storage for this backup.
func (*Client) GetStorageBackend ¶
func (bc *Client) GetStorageBackend() *backuppb.StorageBackend
GetStorageBackend gets the storage backend field in the client.
func (*Client) OnBackupResponse ¶
func (bc *Client) OnBackupResponse( ctx context.Context, r *ResponseAndStore, errContext *utils.ErrorContext, globalProgressTree *rtree.ProgressRangeTree, ) (*txnlock.Lock, error)
func (*Client) RunLoop ¶
func (bc *Client) RunLoop(ctx context.Context, loop *MainBackupLoop) error
RunLoop runs an infinite loop to back up ranges on all TiKV stores. If one client's gRPC connection is disconnected, resend the backup request to this store. If a new TiKV store joined, send the backup request to the new store. If one TiKV store rebooted, consider leader changes and resend the backup request to all stores. If one TiKV store disconnected, consider leader changes and resend the backup request to all stores.
func (*Client) SetApiVersion ¶
func (bc *Client) SetApiVersion(v kvrpcpb.APIVersion)
SetApiVersion sets api version of the TiKV storage
func (*Client) SetCipher ¶
func (bc *Client) SetCipher(cipher *backuppb.CipherInfo)
SetCipher sets the cipher used by the checkpoint to encrypt the SST files' metadata.
func (*Client) SetLockFile ¶
SetLockFile sets the write lock file.
func (*Client) SetStorage ¶
func (bc *Client) SetStorage( ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions, ) error
SetStorage sets ExternalStorage for client.
func (*Client) SetStorageAndCheckNotInUse ¶
func (bc *Client) SetStorageAndCheckNotInUse( ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions, ) error
SetStorageAndCheckNotInUse sets ExternalStorage for the client and checks that the storage is not in use by others.
func (*Client) StartCheckpointRunner ¶
func (bc *Client) StartCheckpointRunner( ctx context.Context, cfgHash []byte, backupTS uint64, ranges []rtree.Range, safePointID string, progressCallBack func(ProgressUnit), ) (err error)
StartCheckpointRunner will 1. save the initial status into the external storage; 2. load the checkpoint data from the external storage; 3. start the checkpoint runner.
type ClientMgr ¶
type ClientMgr interface { GetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) GetPDClient() pd.Client GetLockResolver() *txnlock.LockResolver Close() }
ClientMgr manages connections needed by backup.
type MainBackupLoop ¶
type MainBackupLoop struct { BackupSender // backup requests for all stores. // the subRanges may changed every round. BackupReq backuppb.BackupRequest // the number of backup clients to send backup requests per store. Concurrency uint // record the whole backup progress in infinite loop. GlobalProgressTree *rtree.ProgressRangeTree ReplicaReadLabel map[string]string StateNotifier chan BackupRetryPolicy ProgressCallBack func(ProgressUnit) GetBackupClientCallBack func(ctx context.Context, storeID uint64, reset bool) (backuppb.BackupClient, error) }
func (*MainBackupLoop) CollectStoreBackupsAsync ¶
func (l *MainBackupLoop) CollectStoreBackupsAsync( ctx context.Context, round uint64, storeBackupChs map[uint64]chan *ResponseAndStore, globalCh chan *ResponseAndStore, )
CollectStoreBackupsAsync is the receiver function of all stores backup results.
type MainBackupSender ¶
type MainBackupSender struct{}
func (*MainBackupSender) SendAsync ¶
func (s *MainBackupSender) SendAsync( ctx context.Context, round uint64, storeID uint64, request backuppb.BackupRequest, concurrency uint, cli backuppb.BackupClient, respCh chan *ResponseAndStore, StateNotifier chan BackupRetryPolicy, )
type ProgressUnit ¶
type ProgressUnit string
ProgressUnit represents the unit of progress.
const ( // UnitRange represents the progress updated counter when a range finished. UnitRange ProgressUnit = "range" // UnitRegion represents the progress updated counter when a region finished. UnitRegion ProgressUnit = "region" )
type ResponseAndStore ¶
type ResponseAndStore struct { Resp *backuppb.BackupResponse StoreID uint64 }
func (ResponseAndStore) GetResponse ¶
func (r ResponseAndStore) GetResponse() *backuppb.BackupResponse
func (ResponseAndStore) GetStoreID ¶
func (r ResponseAndStore) GetStoreID() uint64
type Schemas ¶
type Schemas struct {
// contains filtered or unexported fields
}
Schemas is a task for backing up schemas.
func BuildBackupRangeAndInitSchema ¶
func BuildBackupRangeAndInitSchema( storage kv.Storage, tableFilter filter.Filter, backupTS uint64, isFullBackup bool, buildRange bool, ) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error)
BuildBackupRangeAndInitSchema gets the KV ranges and schemas of tables. KV ranges are separated by table IDs. Also, KV ranges are separated by index IDs in the same table.
func NewBackupSchemas ¶
func (*Schemas) BackupSchemas ¶
func (ss *Schemas) BackupSchemas( ctx context.Context, metaWriter *metautil.MetaWriter, checkpointRunner *checkpoint.CheckpointRunner[checkpoint.BackupKeyType, checkpoint.BackupValueType], store kv.Storage, statsHandle *handle.Handle, backupTS uint64, concurrency uint, copConcurrency uint, skipChecksum bool, updateCh glue.Progress, ) error
BackupSchemas backs up table info, including checksum and stats.
func (*Schemas) SetCheckpointChecksum ¶
func (ss *Schemas) SetCheckpointChecksum(checkpointChecksum map[int64]*checkpoint.ChecksumItem)