Documentation ¶
Index ¶
- Constants
- Variables
- func ArchiveSize(meta *backuppb.BackupMeta) uint64
- func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error
- func CheckLogBackupEnabled(ctx sessionctx.Context) bool
- func CheckLogBackupTaskExist() bool
- func CloneSlice[T any](s []T) []T
- func CompareBytesExt(a []byte, aEmptyAsInf bool, b []byte, bEmptyAsInf bool) int
- func CompareEndKey(a, b []byte) int
- func EncloseDBAndTable(database, table string) string
- func EncloseName(name string) string
- func FallBack2CreateTable(err error) bool
- func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, ...) (*grpc.ClientConn, error)
- func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)
- func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)
- func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64
- func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64
- func GetSysDBCIStrName(tempDB model.CIStr) (model.CIStr, bool)
- func GetSysDBName(tempDB model.CIStr) (string, bool)
- func GetTidbNewCollationEnabled() string
- func HideSensitive(input string) string
- func IntersectAll(s1 []kv.KeyRange, s2 []kv.KeyRange) []kv.KeyRange
- func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error)
- func IsLogBackupInUse(ctx sessionctx.Context) bool
- func IsSysDB(dbLowerName string) bool
- func IsTypeCompatible(src types.FieldType, target types.FieldType) bool
- func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error)
- func LogBackupTaskCountDec()
- func LogBackupTaskCountInc()
- func LogEnvVariables()
- func MakeSafePointID() string
- func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)
- func MessageIsNotFoundStorageError(msg string) bool
- func MessageIsPermissionDeniedStorageError(msg string) bool
- func MessageIsRetryableStorageError(msg string) bool
- func NeedAutoID(tblInfo *model.TableInfo) bool
- func NextPowerOfTwo(i int64) int64
- func PanicToErr(err *error)
- func ParseKey(format, key string) ([]byte, error)
- func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error
- func StartDynamicPProfListener(tls *tidbutils.TLS)
- func StartPProfListener(statusAddr string, wrapper *tidbutils.TLS) error
- func StartServiceSafePointKeeper(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error
- func TemporaryDBName(db string) model.CIStr
- func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)
- func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error
- func WithRetry(ctx context.Context, retryableFunc RetryableFunc, backoffer Backoffer) error
- type BRServiceSafePoint
- type Backoffer
- type DBExecutor
- type Database
- type FakePDClient
- type Pool
- type ProgressPrinter
- type QueryExecutor
- type RetryState
- type RetryableFunc
- type StmtExecutor
- type StoreManager
- func (mgr *StoreManager) Close()
- func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters
- func (mgr *StoreManager) PDClient() pd.Client
- func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
- func (mgr *StoreManager) TLSConfig() *tls.Config
- func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error
- type Worker
- type WorkerPool
- func (pool *WorkerPool) Apply(fn taskFunc)
- func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error)
- func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc)
- func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error)
- func (pool *WorkerPool) ApplyWorker() *Worker
- func (pool *WorkerPool) HasWorker() bool
- func (pool *WorkerPool) IdleCount() int
- func (pool *WorkerPool) Limit() int
- func (pool *WorkerPool) RecycleWorker(worker *Worker)
Constants ¶
const ( // DefaultBRGCSafePointTTL means PD keeps the safePoint limit for at least 5min. DefaultBRGCSafePointTTL = 5 * 60 // DefaultStreamStartSafePointTTL specifies keeping the server safePoint for 30 mins when starting a task. DefaultStreamStartSafePointTTL = 1800 // DefaultStreamPauseSafePointTTL specifies keeping the server safePoint for at least 24h when pausing a task. DefaultStreamPauseSafePointTTL = 24 * 3600 )
Variables ¶
var (
LogBackupTaskMutex sync.Mutex
)
Functions ¶
func ArchiveSize ¶
func ArchiveSize(meta *backuppb.BackupMeta) uint64
ArchiveSize returns the total size of the backup archive.
func CheckGCSafePoint ¶
CheckGCSafePoint checks whether the ts is older than GC safepoint. Note: It ignores errors other than exceed GC safepoint.
func CheckLogBackupEnabled ¶
func CheckLogBackupEnabled(ctx sessionctx.Context) bool
CheckLogBackupEnabled checks if LogBackup is enabled in the cluster. This is mainly used in three places: 1. The GC worker resolving locks, to scan more locks after the safepoint (every minute). 2. Add index skipping the use of lightning (every add index DDL). 3. Telemetry of log backup feature usage (every 6 hours). NOTE: this result shouldn't be cached by the caller, because it may change at any time within a cluster.
func CheckLogBackupTaskExist ¶
func CheckLogBackupTaskExist() bool
CheckLogBackupTaskExist checks whether a log-backup task exists.
func CompareBytesExt ¶
CompareBytesExt compares two byte sequences. Unlike `bytes.Compare`, the caller can specify, for each argument, whether an empty key should be treated as infinity.
func CompareEndKey ¶
CompareEndKey compares two keys that BOTH represent the EXCLUSIVE ending of some range. An empty end key is the very end, so an empty key is greater than any other key. Please note that this function is not applicable if either argument is not an EXCLUSIVE ending of a range.
func EncloseDBAndTable ¶
EncloseDBAndTable formats the database and table name in sql.
func FallBack2CreateTable ¶
func GRPCConn ¶
func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, opts ...grpc.DialOption) (*grpc.ClientConn, error)
func GetGcRatio ¶
func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)
func GetRegionSplitInfo ¶
func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)
func GetSplitKeys ¶
func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64
func GetSplitSize ¶
func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64
func GetSysDBCIStrName ¶
GetSysDBCIStrName gets the CIStr name of the system DB.
func GetSysDBName ¶
GetSysDBName gets the original name of the system DB.
func GetTidbNewCollationEnabled ¶
func GetTidbNewCollationEnabled() string
GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled.
func HideSensitive ¶
HideSensitive replaces the password with ******.
func IntersectAll ¶
IntersectAll returns the intersection of two sets of segments. OWNERSHIP INFORMATION: To run faster, this function MUTATES the input slices (i.e. it takes ownership of them). (But it is promised that this function won't change the `start key` and `end key` slices.) If you want to use the input slices afterwards, call `CloneSlice` on the arguments before passing them.
You can treat a "set of segments" as a set of points that may not be adjacent. In this way, IntersectAll(s1, s2) = { point | point in both s1 and s2 }. Example: ranges: |___________| |________________| toClampIn: |_____| |____| |________________| result: |_____| |_| |______________| We assume the arguments are sorted by start key and have no overlaps. You can call spans.Collapse to get key ranges that fit these requirements. Note: this algorithm is much like `checkIntervalIsSubset`; can we merge them?
func IsLogBackupEnabled ¶
func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error)
IsLogBackupEnabled is used by BR to check whether TiKV has enabled log backup. We use `sqlexec.RestrictedSQLExecutor` as the parameter because it's easy to mock. It should return an error.
func IsLogBackupInUse ¶
func IsLogBackupInUse(ctx sessionctx.Context) bool
IsLogBackupInUse checks whether a log backup task exists.
func IsSysDB ¶
IsSysDB tests whether the database is system DB. Currently, the only system DB is mysql.
func IsTypeCompatible ¶
IsTypeCompatible checks whether type target is compatible with type src. They're compatible if: - they have the same null/not null and unsigned flags (maybe we can allow a src not null flag with a target null flag later); - they have the same evaluation type; - target's flen and decimal are greater than or equal to src's; - the elements in target are a superset of the elements in src if they're enum or set types; - they have the same charset and collation if they're string types.
func LoadBackupTables ¶
func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error)
LoadBackupTables loads schemas from BackupMeta.
func LogBackupTaskCountDec ¶
func LogBackupTaskCountDec()
LogBackupTaskCountDec decreases the count of log backup task.
func LogBackupTaskCountInc ¶
func LogBackupTaskCountInc()
LogBackupTaskCountInc increases the count of log backup task.
func MakeSafePointID ¶
func MakeSafePointID() string
MakeSafePointID makes a unique safe point ID, to reduce name conflicts.
func MarshalBackupMeta ¶
func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)
MarshalBackupMeta converts the backupmeta structure to JSON. Unlike json.Marshal, this function also formats some []byte fields for human reading.
func MessageIsNotFoundStorageError ¶
MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func MessageIsPermissionDeniedStorageError ¶
MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error
func MessageIsRetryableStorageError ¶
MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
func NeedAutoID ¶
NeedAutoID checks whether the table needs backing up with an autoid.
func NextPowerOfTwo ¶
NextPowerOfTwo returns the smallest power of two greater than or equal to `i`. The caller should guarantee that i > 0 and that the return value does not overflow.
func PanicToErr ¶
func PanicToErr(err *error)
PanicToErr recovers when the execution panics, and sets the error provided by the argument. Generally, this is used with a named return value and `defer`, like:
func foo() (err error) { defer utils.PanicToErr(&err) return maybePanic() }
Before using this, there are some hints for reducing resource leakage or bugs:
- If any cleanup work (by `defer`) relies on the error (say, rolling back some operations when an error happens), please place `defer this` AFTER that.
- All resources allocated should be freed by the `defer` syntax, or when panicking, they may not be recycled.
func SetGcRatio ¶
func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error
func StartDynamicPProfListener ¶
StartDynamicPProfListener starts the listener that will enable pprof when it receives `startPProfSignal`.
func StartPProfListener ¶
StartPProfListener forks a new goroutine listening on the specified port and providing pprof info.
func StartServiceSafePointKeeper ¶
func StartServiceSafePointKeeper( ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint, ) error
StartServiceSafePointKeeper will run UpdateServiceSafePoint periodically so that the service safepoint is not lost.
func TemporaryDBName ¶
TemporaryDBName makes a 'private' database name.
func UnmarshalBackupMeta ¶
func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)
UnmarshalBackupMeta converts the prettied JSON format of backupmeta (made by MarshalBackupMeta) back to the go structure.
func UpdateServiceSafePoint ¶
UpdateServiceSafePoint registers BackupTS to PD, to lock down BackupTS as the safePoint with a TTL in seconds.
Types ¶
type BRServiceSafePoint ¶
BRServiceSafePoint is metadata of service safe point from a BR 'instance'.
func (BRServiceSafePoint) MarshalLogObject ¶
func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler.
type Backoffer ¶
type Backoffer interface { // NextBackoff returns a duration to wait before retrying again NextBackoff(err error) time.Duration // Attempt returns the remain attempt times Attempt() int }
Backoffer implements a backoff policy for retrying operations.
func NewBackoffer ¶
NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewDownloadSSTBackoffer ¶
func NewDownloadSSTBackoffer() Backoffer
func NewImportSSTBackoffer ¶
func NewImportSSTBackoffer() Backoffer
func NewPDReqBackoffer ¶
func NewPDReqBackoffer() Backoffer
func NewPDReqBackofferExt ¶
func NewPDReqBackofferExt() Backoffer
type DBExecutor ¶
type DBExecutor interface { StmtExecutor BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) }
DBExecutor is an interface for statements and transactions.
type FakePDClient ¶
func (FakePDClient) GetAllStores ¶
func (c FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error)
GetAllStores returns fake stores.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a lazy pool of gRPC channels. When `Get` is called, it lazily allocates a new connection if the pool is not full. If it's full, it returns the allocated channels round-robin.
func NewConnPool ¶
NewConnPool creates a new Pool by the specified conn factory function and capacity.
type ProgressPrinter ¶
type ProgressPrinter struct {
// contains filtered or unexported fields
}
ProgressPrinter prints a progress bar.
func NewProgressPrinter ¶
func NewProgressPrinter( name string, total int64, redirectLog bool, ) *ProgressPrinter
NewProgressPrinter returns a new progress printer.
func StartProgress ¶
func StartProgress( ctx context.Context, name string, total int64, redirectLog bool, log logFunc, ) *ProgressPrinter
StartProgress starts progress bar.
func (*ProgressPrinter) Close ¶
func (pp *ProgressPrinter) Close()
Close closes the current progress bar.
func (*ProgressPrinter) GetCurrent ¶
func (pp *ProgressPrinter) GetCurrent() int64
GetCurrent gets the current progress.
func (*ProgressPrinter) Inc ¶
func (pp *ProgressPrinter) Inc()
Inc increases the current progress bar.
func (*ProgressPrinter) IncBy ¶
func (pp *ProgressPrinter) IncBy(cnt int64)
IncBy implements glue.Progress
type QueryExecutor ¶
type QueryExecutor interface { QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row }
QueryExecutor is an interface for executing queries.
type RetryState ¶
type RetryState struct {
// contains filtered or unexported fields
}
RetryState is the mutable state needed for retrying. It is like the `utils.Backoffer`, but more fundamental: it only controls the backoff time and knows nothing about what error happened. NOTE: Maybe also implement the backoffer via this.
func InitialRetryState ¶
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState
InitialRetryState make the initial state for retrying.
func (*RetryState) Attempt ¶
func (rs *RetryState) Attempt() int
Attempt implements the `Backoffer`. TODO: Maybe use this to replace the `exponentialBackoffer` (which is nearly homomorphic to this)?
func (*RetryState) ExponentialBackoff ¶
func (rs *RetryState) ExponentialBackoff() time.Duration
ExponentialBackoff gets the exponential backoff duration and transforms the state.
func (*RetryState) NextBackoff ¶
func (rs *RetryState) NextBackoff(error) time.Duration
NextBackoff implements the `Backoffer`.
func (*RetryState) RecordRetry ¶
func (rs *RetryState) RecordRetry()
RecordRetry simply records the retry times, without any backoff.
func (*RetryState) ShouldRetry ¶
func (rs *RetryState) ShouldRetry() bool
Whether in the current state we can retry.
type StmtExecutor ¶
type StmtExecutor interface { QueryExecutor ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) }
StmtExecutor defines both query and exec methods.
type StoreManager ¶
type StoreManager struct {
// contains filtered or unexported fields
}
func NewStoreManager ¶
func NewStoreManager(pdCli pd.Client, kl keepalive.ClientParameters, tlsConf *tls.Config) *StoreManager
NewStoreManager creates a new manager for gRPC connections to stores.
func (*StoreManager) GetKeepalive ¶
func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters
func (*StoreManager) PDClient ¶
func (mgr *StoreManager) PDClient() pd.Client
func (*StoreManager) ResetBackupClient ¶
func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
ResetBackupClient resets the connection for the backup client.
func (*StoreManager) TLSConfig ¶
func (mgr *StoreManager) TLSConfig() *tls.Config
func (*StoreManager) WithConn ¶
func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool contains a pool of workers.
func NewWorkerPool ¶
func NewWorkerPool(limit uint, name string) *WorkerPool
NewWorkerPool returns a WorkerPool.
func (*WorkerPool) ApplyOnErrorGroup ¶
func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error)
ApplyOnErrorGroup executes a task in an errorgroup.
func (*WorkerPool) ApplyWithID ¶
func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc)
ApplyWithID executes a task and provides it with the worker ID.
func (*WorkerPool) ApplyWithIDInErrorGroup ¶
func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error)
ApplyWithIDInErrorGroup executes a task in an errorgroup and provides it with the worker ID.
func (*WorkerPool) ApplyWorker ¶
func (pool *WorkerPool) ApplyWorker() *Worker
ApplyWorker applies for a worker.
func (*WorkerPool) HasWorker ¶
func (pool *WorkerPool) HasWorker() bool
HasWorker checks if the pool has unallocated workers.
func (*WorkerPool) IdleCount ¶
func (pool *WorkerPool) IdleCount() int
IdleCount counts how many idle workers are in the pool.
func (*WorkerPool) RecycleWorker ¶
func (pool *WorkerPool) RecycleWorker(worker *Worker)
RecycleWorker recycles a worker.