Documentation ¶
Index ¶
- Constants
- Variables
- func AllStackInfo() []byte
- func AsyncStreamBy[T any](generator func() (T, error)) <-chan Result[T]
- func BuildWorkerTokenChannel(size uint) chan struct{}
- func CatchAndLogPanic()
- func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error
- func CheckLogBackupTaskExist() bool
- func CheckStoreLiveness(s *metapb.Store) error
- func CompareBytesExt(a []byte, aEmptyAsInf bool, b []byte, bEmptyAsInf bool) int
- func CompareEndKey(a, b []byte) int
- func Decrypt(content []byte, cipher *backuppb.CipherInfo, iv []byte) ([]byte, error)
- func EncloseDBAndTable(database, table string) string
- func EncloseName(name string) string
- func FallBack2CreateTable(err error) bool
- func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, ...) (*grpc.ClientConn, error)
- func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)
- func GetOrZero[T any](p *T) T
- func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)
- func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64
- func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64
- func GetSysDBCIStrName(tempDB ast.CIStr) (ast.CIStr, bool)
- func GetSysDBName(tempDB ast.CIStr) (string, bool)
- func GetTidbNewCollationEnabled() string
- func IntersectAll(s1 []kv.KeyRange, s2 []kv.KeyRange) []kv.KeyRange
- func IsEffectiveEncryptionMethod(method encryptionpb.EncryptionMethod) bool
- func IsLogBackupInUse(ctx sessionctx.Context) bool
- func IsSysDB(dbLowerName string) bool
- func IsTemplateSysDB(dbname ast.CIStr) bool
- func IsTypeCompatible(src types.FieldType, target types.FieldType) bool
- func LogBackupTaskCountDec()
- func LogBackupTaskCountInc()
- func MakeSafePointID() string
- func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)
- func MarshalMetaFile(meta *backuppb.MetaFile) ([]byte, error)
- func MarshalStatsFile(meta *backuppb.StatsFile) ([]byte, error)
- func MessageIsRetryableStorageError(msg string) bool
- func NeedAutoID(tblInfo *model.TableInfo) bool
- func PanicToErr(err *error)
- func ParseKey(format, key string) ([]byte, error)
- func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error
- func StartDynamicPProfListener(tls *tidbutils.TLS)
- func StartExitSingleListener(ctx context.Context) (context.Context, context.CancelFunc)
- func StartPProfListener(statusAddr string, wrapper *tidbutils.TLS) error
- func StartServiceSafePointKeeper(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error
- func TemporaryDBName(db string) ast.CIStr
- func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)
- func UnmarshalMetaFile(data []byte) (*backuppb.MetaFile, error)
- func UnmarshalStatsFile(data []byte) (*backuppb.StatsFile, error)
- func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error
- func WaitUntil(ctx context.Context, condition func() bool, ...) error
- func WithCleanUp(errOut *error, timeout time.Duration, fn func(context.Context) error)
- func WithRetry(ctx context.Context, retryableFunc RetryableFunc, ...) error
- func WithRetryReturnLastErr(ctx context.Context, retryableFunc RetryableFunc, ...) error
- func WithRetryV2[T any](ctx context.Context, backoffStrategy BackoffStrategy, fn RetryableFuncV2[T]) (T, error)
- type BRServiceSafePoint
- type BackoffOption
- func WithDelayTime(delay time.Duration) BackoffOption
- func WithErrorContext(errContext *ErrorContext) BackoffOption
- func WithMaxDelayTime(maxDelay time.Duration) BackoffOption
- func WithNonRetryErrorFunc(isNonRetryErr func(error) bool) BackoffOption
- func WithRemainingAttempts(attempts int) BackoffOption
- func WithRetryErrorFunc(isRetryErr func(error) bool) BackoffOption
- type BackoffStrategy
- func GiveUpRetryOn(bo BackoffStrategy, errs ...error) BackoffStrategy
- func NewAggressivePDBackoffStrategy() BackoffStrategy
- func NewBackoffRetryAllErrorStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration) BackoffStrategy
- func NewBackoffRetryAllExceptStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration, ...) BackoffStrategy
- func NewBackoffStrategy(opts ...BackoffOption) BackoffStrategy
- func NewBackupSSTBackoffStrategy() BackoffStrategy
- func NewChecksumBackoffStrategy() BackoffStrategy
- func NewConservativePDBackoffStrategy() BackoffStrategy
- func NewDiskCheckBackoffStrategy() BackoffStrategy
- func NewDownloadSSTBackoffStrategy() BackoffStrategy
- func NewFlashBackBackoffStrategy() BackoffStrategy
- func NewImportSSTBackoffStrategy() BackoffStrategy
- func NewPDBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration) BackoffStrategy
- func NewRecoveryBackoffStrategy(isRetryErrFunc func(error) bool) BackoffStrategy
- func NewTiKVStoreBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) BackoffStrategy
- func VerboseRetry(bo BackoffStrategy, logger *zap.Logger) BackoffStrategy
- type ConstantBackoff
- type ErrorContext
- type ErrorHandlingResult
- type ErrorHandlingStrategy
- type Pool
- type ProgressPrinter
- type RegisterTask
- type RegisterTaskType
- type RegisterTasksList
- type Result
- type RetryState
- type RetryWithBackoff
- type RetryableFunc
- type RetryableFuncV2
- type StoreManager
- func (mgr *StoreManager) Close()
- func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters
- func (mgr *StoreManager) PDClient() pd.Client
- func (mgr *StoreManager) RemoveConn(ctx context.Context, storeID uint64) error
- func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
- func (mgr *StoreManager) TLSConfig() *tls.Config
- func (mgr *StoreManager) TryWithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn) error) error
- func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error
- type TaskRegister
Constants ¶
const ( // The region heartbeat interval is 10 seconds by default; if a region has 2 heartbeats missing (15 seconds), it appears to be a network issue between PD and TiKV. FlashbackRetryTime = 3 FlashbackWaitInterval = 3 * time.Second FlashbackMaxWaitInterval = 15 * time.Second ChecksumRetryTime = 8 ChecksumWaitInterval = 1 * time.Second ChecksumMaxWaitInterval = 30 * time.Second )
const ( DefaultGcRatioVal = "1.1" DisabledGcRatioVal = "-1.0" )
const ( // RegisterImportTaskPrefix is the prefix of the key for task register // todo: remove "/import" suffix, it's confusing to have a key like "/tidb/brie/import/restore/restore-xxx" RegisterImportTaskPrefix = "/tidb/brie/import" RegisterRetryInternal = 10 * time.Second )
The key format should be {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
const ( // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min. DefaultBRGCSafePointTTL = 5 * 60 // DefaultCheckpointGCSafePointTTL means PD keep safePoint limit at least 72 minutes. DefaultCheckpointGCSafePointTTL = 72 * 60 // DefaultStreamStartSafePointTTL specifies keeping the server safepoint 30 mins when start task. DefaultStreamStartSafePointTTL = 1800 // DefaultStreamPauseSafePointTTL specifies keeping the server safePoint at least 24h when pause task. DefaultStreamPauseSafePointTTL = 24 * 3600 )
const (
TidbNewCollationEnabled = "new_collation_enabled"
)
Variables ¶
var (
DumpGoroutineWhenExit atomic.Bool
)
Functions ¶
func AllStackInfo ¶
func AllStackInfo() []byte
func AsyncStreamBy ¶
func BuildWorkerTokenChannel ¶
func BuildWorkerTokenChannel(size uint) chan struct{}
func CatchAndLogPanic ¶
func CatchAndLogPanic()
CatchAndLogPanic recovers when the execution panics, and logs the panic. Generally, this would be used with `defer`, like:
func foo() { defer utils.CatchAndLogPanic() maybePanic() }
func CheckGCSafePoint ¶
CheckGCSafePoint checks whether the ts is older than the GC safepoint. Note: It ignores errors other than exceeding the GC safepoint.
func CheckLogBackupTaskExist ¶
func CheckLogBackupTaskExist() bool
CheckLogBackupTaskExist checks whether a log-backup task exists.
func CheckStoreLiveness ¶
CheckStoreLiveness checks whether a store is still alive. Some versions of PD may not set the store state in the gRPC response. We need to check it manually.
func CompareBytesExt ¶
CompareBytesExt compares two byte sequences. Unlike `bytes.Compare`, the caller can specify, for each argument, whether an empty key should be treated as infinity.
func CompareEndKey ¶
CompareEndKey compares two keys that BOTH represent the EXCLUSIVE ending of some range. An empty end key is the very end, so an empty key is greater than any other key. Please note that this function is not applicable if either argument is not an EXCLUSIVE ending of a range.
func EncloseDBAndTable ¶
EncloseDBAndTable formats the database and table name in sql.
func FallBack2CreateTable ¶
func GRPCConn ¶
func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, opts ...grpc.DialOption) (*grpc.ClientConn, error)
func GetGcRatio ¶
func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)
func GetOrZero ¶
func GetOrZero[T any](p *T) T
GetOrZero returns the value pointed to by p, or a zero value of its type if p is nil.
func GetRegionSplitInfo ¶
func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)
func GetSplitKeys ¶
func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64
func GetSplitSize ¶
func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64
func GetSysDBCIStrName ¶
GetSysDBCIStrName gets the CIStr name of the system DB.
func GetSysDBName ¶
GetSysDBName gets the original name of the system DB.
func GetTidbNewCollationEnabled ¶
func GetTidbNewCollationEnabled() string
GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled.
func IntersectAll ¶
IntersectAll returns the intersection of two sets of segments. OWNERSHIP INFORMATION: To run faster, this function MUTATES the input slices (i.e. takes ownership of them). (However, it is promised that this function won't change the `start key` and `end key` slices.) If you want to use the input slices afterwards, call `CloneSlice` on the arguments before passing them.
You can treat "set of segments" as points maybe not adjacent. in this way, IntersectAll(s1, s2) = { point | point in both s1 and s2 } Example: ranges: |___________| |________________| toClampIn: |_____| |____| |________________| result: |_____| |_| |______________| we are assuming the arguments are sorted by the start key and no overlaps. you can call spans.Collapse to get key ranges fits this requirements. Note: this algorithm is pretty like the `checkIntervalIsSubset`, can we get them together?
func IsEffectiveEncryptionMethod ¶
func IsEffectiveEncryptionMethod(method encryptionpb.EncryptionMethod) bool
func IsLogBackupInUse ¶
func IsLogBackupInUse(ctx sessionctx.Context) bool
IsLogBackupInUse checks whether a log backup task exists.
func IsSysDB ¶
IsSysDB tests whether the database is system DB. Currently, both `mysql` and `sys` are system DB.
func IsTemplateSysDB ¶
IsTemplateSysDB checks whether the dbname is a temporary system database (__TiDB_BR_Temporary_mysql or __TiDB_BR_Temporary_sys).
func IsTypeCompatible ¶
IsTypeCompatible checks whether type target is compatible with type src. They're compatible if: - they have the same null/not null and unsigned flags (maybe we can allow src not null flag, target null flag later) - they have the same evaluation type - target's flen and decimal are bigger than or equal to src's - the elements in target are a superset of the elements in src if they're enum or set types - they have the same charset and collation if they're string types
func LogBackupTaskCountDec ¶
func LogBackupTaskCountDec()
LogBackupTaskCountDec decreases the count of log backup task.
func LogBackupTaskCountInc ¶
func LogBackupTaskCountInc()
LogBackupTaskCountInc increases the count of log backup task.
func MakeSafePointID ¶
func MakeSafePointID() string
MakeSafePointID makes a unique safe point ID, to reduce name conflicts.
func MarshalBackupMeta ¶
func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)
MarshalBackupMeta converts the backupmeta structure to JSON. Unlike json.Marshal, this function also formats some []byte fields for human reading.
func MessageIsRetryableStorageError ¶
MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
func NeedAutoID ¶
NeedAutoID checks whether the table needs backing up with an autoid.
func PanicToErr ¶
func PanicToErr(err *error)
PanicToErr recovers when the execution panics, and sets the error provided by the arg. Generally, this would be used with a named return value and `defer`, like:
func foo() (err error) { defer utils.PanicToErr(&err) return maybePanic() }
Before using this, there are some hints for reducing resource leakage or bugs:
- If any of clean work (by `defer`) relies on the error (say, when error happens, rollback some operations.), please place `defer this` AFTER that.
- All resources allocated should be freed by the `defer` syntax, or when panicking, they may not be recycled.
func SetGcRatio ¶
func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error
func StartDynamicPProfListener ¶
StartDynamicPProfListener starts the listener that will enable pprof when it receives `startPProfSignal`.
func StartExitSingleListener ¶
func StartPProfListener ¶
StartPProfListener forks a new goroutine listening on the specified port and provides pprof info.
func StartServiceSafePointKeeper ¶
func StartServiceSafePointKeeper( ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint, ) error
StartServiceSafePointKeeper will run UpdateServiceSafePoint periodically so that the service safepoint won't be lost.
func TemporaryDBName ¶
TemporaryDBName makes a 'private' database name.
func UnmarshalBackupMeta ¶
func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)
UnmarshalBackupMeta converts the prettied JSON format of backupmeta (made by MarshalBackupMeta) back to the go structure.
func UpdateServiceSafePoint ¶
UpdateServiceSafePoint registers BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds.
func WithCleanUp ¶
WithCleanUp runs a function with a timeout, and registers its error to its argument if there is one. This is useful when you want to run some must-run but error-prone code in a defer context. Simple usage:
func foo() (err error) { defer WithCleanUp(&err, time.Second, func(ctx context.Context) error { // do something return nil }) }
func WithRetry ¶
func WithRetry( ctx context.Context, retryableFunc RetryableFunc, backoffStrategy BackoffStrategy, ) error
WithRetry retries a given operation with a backoff policy.
Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a multierr containing all errors encountered.
func WithRetryReturnLastErr ¶
func WithRetryReturnLastErr( ctx context.Context, retryableFunc RetryableFunc, backoffStrategy BackoffStrategy, ) error
WithRetryReturnLastErr is like WithRetry but the returned error is the last error during retry rather than a multierr.
func WithRetryV2 ¶
func WithRetryV2[T any]( ctx context.Context, backoffStrategy BackoffStrategy, fn RetryableFuncV2[T], ) (T, error)
WithRetryV2 retries a given operation with a backoff policy.
Returns the returned value if `retryableFunc` succeeded at least once. Otherwise, returns a multierr containing all errors encountered. Compared with `WithRetry`, this function reorders the arguments and supports capturing the return value.
Types ¶
type BRServiceSafePoint ¶
BRServiceSafePoint is metadata of service safe point from a BR 'instance'.
func (BRServiceSafePoint) MarshalLogObject ¶
func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler.
type BackoffOption ¶
type BackoffOption func(*backoffStrategyImpl)
BackoffOption defines a function type for configuring backoffStrategyImpl
func WithDelayTime ¶
func WithDelayTime(delay time.Duration) BackoffOption
WithDelayTime sets the initial delay time
func WithErrorContext ¶
func WithErrorContext(errContext *ErrorContext) BackoffOption
WithErrorContext sets the error context
func WithMaxDelayTime ¶
func WithMaxDelayTime(maxDelay time.Duration) BackoffOption
WithMaxDelayTime sets the maximum delay time
func WithNonRetryErrorFunc ¶
func WithNonRetryErrorFunc(isNonRetryErr func(error) bool) BackoffOption
WithNonRetryErrorFunc sets the non-retry error checking function
func WithRemainingAttempts ¶
func WithRemainingAttempts(attempts int) BackoffOption
WithRemainingAttempts sets the remaining attempts
func WithRetryErrorFunc ¶
func WithRetryErrorFunc(isRetryErr func(error) bool) BackoffOption
WithRetryErrorFunc sets the retry error checking function
type BackoffStrategy ¶
type BackoffStrategy interface { // NextBackoff returns a duration to wait before retrying again NextBackoff(err error) time.Duration // RemainingAttempts returns the remaining number of attempts RemainingAttempts() int }
BackoffStrategy implements a backoff strategy for retry operations.
func GiveUpRetryOn ¶
func GiveUpRetryOn(bo BackoffStrategy, errs ...error) BackoffStrategy
func NewAggressivePDBackoffStrategy ¶
func NewAggressivePDBackoffStrategy() BackoffStrategy
func NewBackoffRetryAllErrorStrategy ¶
func NewBackoffRetryAllErrorStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration) BackoffStrategy
func NewBackoffStrategy ¶
func NewBackoffStrategy(opts ...BackoffOption) BackoffStrategy
NewBackoffStrategy creates a new backoff strategy with custom retry logic
func NewBackupSSTBackoffStrategy ¶
func NewBackupSSTBackoffStrategy() BackoffStrategy
func NewChecksumBackoffStrategy ¶
func NewChecksumBackoffStrategy() BackoffStrategy
func NewConservativePDBackoffStrategy ¶
func NewConservativePDBackoffStrategy() BackoffStrategy
func NewDiskCheckBackoffStrategy ¶
func NewDiskCheckBackoffStrategy() BackoffStrategy
func NewDownloadSSTBackoffStrategy ¶
func NewDownloadSSTBackoffStrategy() BackoffStrategy
func NewFlashBackBackoffStrategy ¶
func NewFlashBackBackoffStrategy() BackoffStrategy
func NewImportSSTBackoffStrategy ¶
func NewImportSSTBackoffStrategy() BackoffStrategy
func NewPDBackoffStrategy ¶
func NewPDBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration) BackoffStrategy
func NewRecoveryBackoffStrategy ¶
func NewRecoveryBackoffStrategy(isRetryErrFunc func(error) bool) BackoffStrategy
func NewTiKVStoreBackoffStrategy ¶
func NewTiKVStoreBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) BackoffStrategy
func VerboseRetry ¶
func VerboseRetry(bo BackoffStrategy, logger *zap.Logger) BackoffStrategy
type ConstantBackoff ¶
ConstantBackoff is a backoff strategy that retry forever until success.
func (ConstantBackoff) NextBackoff ¶
func (c ConstantBackoff) NextBackoff(err error) time.Duration
NextBackoff returns a duration to wait before retrying again
func (ConstantBackoff) RemainingAttempts ¶
func (c ConstantBackoff) RemainingAttempts() int
RemainingAttempts returns the remaining number of attempts
type ErrorContext ¶
type ErrorContext struct {
// contains filtered or unexported fields
}
func NewDefaultContext ¶
func NewDefaultContext() *ErrorContext
func NewErrorContext ¶
func NewErrorContext(scenario string, limitation int) *ErrorContext
func NewZeroRetryContext ¶
func NewZeroRetryContext(scenario string) *ErrorContext
type ErrorHandlingResult ¶
type ErrorHandlingResult struct { Strategy ErrorHandlingStrategy Reason string }
func HandleBackupError ¶
func HandleBackupError(err *backuppb.Error, storeId uint64, ec *ErrorContext) ErrorHandlingResult
func HandleUnknownBackupError ¶
func HandleUnknownBackupError(msg string, uuid uint64, ec *ErrorContext) ErrorHandlingResult
HandleUnknownBackupError UNSAFE! TODO: remove this method and map all the current unknown errors to error types
type ErrorHandlingStrategy ¶
type ErrorHandlingStrategy int
const ( // StrategyRetry error can be retried but will consume the backoff attempt quota. StrategyRetry ErrorHandlingStrategy = iota // StrategyGiveUp means unrecoverable error happened and the BR should exit // for example: // 1. permission not valid. // 2. data not found. // 3. retry too many times StrategyGiveUp // StrategyUnknown for StrategyUnknown error StrategyUnknown )
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a lazy pool of gRPC channels. When `Get` called, it lazily allocates new connection if connection not full. If it's full, then it will return allocated channels round-robin.
func NewConnPool ¶
NewConnPool creates a new Pool by the specified conn factory function and capacity.
type ProgressPrinter ¶
type ProgressPrinter struct {
// contains filtered or unexported fields
}
ProgressPrinter prints a progress bar.
func NewProgressPrinter ¶
func NewProgressPrinter( name string, total int64, redirectLog bool, ) *ProgressPrinter
NewProgressPrinter returns a new progress printer.
func StartProgress ¶
func StartProgress( ctx context.Context, name string, total int64, redirectLog bool, log logFunc, ) *ProgressPrinter
StartProgress starts progress bar.
func (*ProgressPrinter) Close ¶
func (pp *ProgressPrinter) Close()
Close closes the current progress bar.
func (*ProgressPrinter) GetCurrent ¶
func (pp *ProgressPrinter) GetCurrent() int64
GetCurrent gets the current progress.
func (*ProgressPrinter) Inc ¶
func (pp *ProgressPrinter) Inc()
Inc increases the current progress bar.
func (*ProgressPrinter) IncBy ¶
func (pp *ProgressPrinter) IncBy(cnt int64)
IncBy implements glue.Progress
type RegisterTask ¶
RegisterTask saves the task's information
func (RegisterTask) MessageToUser ¶
func (task RegisterTask) MessageToUser() string
MessageToUser marshals the task to a user message
type RegisterTaskType ¶
type RegisterTaskType int
RegisterTaskType for the sub-prefix path for key
const ( RegisterRestore RegisterTaskType = iota RegisterLightning RegisterImportInto )
func (RegisterTaskType) String ¶
func (tp RegisterTaskType) String() string
type RegisterTasksList ¶
type RegisterTasksList struct {
Tasks []RegisterTask
}
func GetImportTasksFrom ¶
GetImportTasksFrom tries to get all the import tasks with prefix `RegisterImportTaskPrefix`
func (RegisterTasksList) Empty ¶
func (list RegisterTasksList) Empty() bool
func (RegisterTasksList) MessageToUser ¶
func (list RegisterTasksList) MessageToUser() string
type RetryState ¶
type RetryState struct {
// contains filtered or unexported fields
}
RetryState is the mutable state needed for retrying. It is like the `utils.BackoffStrategy`, but more fundamental: this only controls the backoff time and knows nothing about what error happens. NOTE: Maybe also implement the backoffer via this. TODO: merge with BackoffStrategy
func InitialRetryState ¶
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState
InitialRetryState make the initial state for retrying.
func (*RetryState) ExponentialBackoff ¶
func (rs *RetryState) ExponentialBackoff() time.Duration
Get the exponential backoff duration and transform the state.
func (*RetryState) GiveUp ¶
func (rs *RetryState) GiveUp()
func (*RetryState) NextBackoff ¶
func (rs *RetryState) NextBackoff(error) time.Duration
NextBackoff implements the `BackoffStrategy`.
func (*RetryState) ReduceRetry ¶
func (rs *RetryState) ReduceRetry()
ReduceRetry reduces retry times for 1.
func (*RetryState) RemainingAttempts ¶
func (rs *RetryState) RemainingAttempts() int
RemainingAttempts implements the `BackoffStrategy`. TODO: Maybe use this to replace the `exponentialBackoffer` (which is nearly homomorphic to this)?
func (*RetryState) ShouldRetry ¶
func (rs *RetryState) ShouldRetry() bool
Whether in the current state we can retry.
type RetryWithBackoff ¶
type RetryWithBackoff struct {
// contains filtered or unexported fields
}
RetryWithBackoff is a simple context for a "mixed" retry. Some of TiDB APIs, say, `ResolveLock` requires a `tikv.Backoffer` as argument. But the `tikv.Backoffer` isn't pretty customizable, it has some sorts of predefined configuration but we cannot create new one. So we are going to mix up the flavour of `tikv.Backoffer` and our homemade back off strategy. That is what the `RetryWithBackoff` did.
func AdaptTiKVBackoffer ¶
func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoff
AdaptTiKVBackoffer creates an "ad-hoc" backoffStrategy, which wraps a backoffer and provides some new functions: when backing off, we can manually provide it a specified sleep duration instead of directly providing a retry.Config, which is sealed in the "client-go/internal".
func (*RetryWithBackoff) BackOff ¶
func (r *RetryWithBackoff) BackOff() error
BackOff executes the back off: sleep for a precalculated backoff time. See `RequestBackOff` for more details.
func (*RetryWithBackoff) Inner ¶
func (r *RetryWithBackoff) Inner() *tikv.Backoffer
Inner returns the reference to the inner `backoffer`.
func (*RetryWithBackoff) MaxSleepInMS ¶
func (r *RetryWithBackoff) MaxSleepInMS() int
MaxSleepInMS returns the max sleep time for the retry context in ms.
func (*RetryWithBackoff) NextSleepInMS ¶
func (r *RetryWithBackoff) NextSleepInMS() int
NextSleepInMS returns the time `BackOff` will sleep in ms of the state.
func (*RetryWithBackoff) RequestBackOff ¶
func (r *RetryWithBackoff) RequestBackOff(ms int)
RequestBackOff registers the intent of backing off at least n milliseconds. That intent will be fulfilled when calling `BackOff`.
func (*RetryWithBackoff) TotalSleepInMS ¶
func (r *RetryWithBackoff) TotalSleepInMS() int
TotalSleepInMS returns the total slept time in ms.
type StoreManager ¶
type StoreManager struct {
// contains filtered or unexported fields
}
func NewStoreManager ¶
func NewStoreManager(pdCli pd.Client, kl keepalive.ClientParameters, tlsConf *tls.Config) *StoreManager
NewStoreManager creates a new manager for gRPC connections to stores.
func (*StoreManager) GetKeepalive ¶
func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters
func (*StoreManager) PDClient ¶
func (mgr *StoreManager) PDClient() pd.Client
func (*StoreManager) RemoveConn ¶
func (mgr *StoreManager) RemoveConn(ctx context.Context, storeID uint64) error
func (*StoreManager) ResetBackupClient ¶
func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
ResetBackupClient resets the connection for backup client.
func (*StoreManager) TLSConfig ¶
func (mgr *StoreManager) TLSConfig() *tls.Config
func (*StoreManager) TryWithConn ¶
func (mgr *StoreManager) TryWithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn) error) error
func (*StoreManager) WithConn ¶
func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error
type TaskRegister ¶
type TaskRegister interface { // Close closes the background task if using RegisterTask // and revokes the lease. // NOTE: we don't close the etcd client here, the caller should do it. Close(ctx context.Context) (err error) // RegisterTask first puts its key to PD with a lease, // and starts to keep the lease alive in the background. // DO NOT mix calls to RegisterTask and RegisterTaskOnce. RegisterTask(c context.Context) error // RegisterTaskOnce puts its key to PD with a lease if the key does not exist, // else we refresh the lease. // you have to call this method periodically to keep the lease alive. // DO NOT mix calls to RegisterTask and RegisterTaskOnce. RegisterTaskOnce(ctx context.Context) error }
TaskRegister can register the task to PD with a lease.
func NewTaskRegister ¶
func NewTaskRegister(client *clientv3.Client, tp RegisterTaskType, taskName string) TaskRegister
NewTaskRegister builds a TaskRegister with key format {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
func NewTaskRegisterWithTTL ¶
func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) TaskRegister
NewTaskRegisterWithTTL builds a TaskRegister with key format {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
package storewatch provides a `Watcher` type which allows the user to listen the events of lifetime of stores.
|
package storewatch provides a `Watcher` type which allows the user to listen the events of lifetime of stores. |