utils

package
v1.1.0-beta.0...-1acbbec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2024 License: Apache-2.0, Apache-2.0 Imports: 60 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// region heartbeat are 10 seconds by default, if some region has 2 heartbeat missing (15 seconds), it appear to be a network issue between PD and TiKV.
	FlashbackRetryTime       = 3
	FlashbackWaitInterval    = 3 * time.Second
	FlashbackMaxWaitInterval = 15 * time.Second

	ChecksumRetryTime       = 8
	ChecksumWaitInterval    = 1 * time.Second
	ChecksumMaxWaitInterval = 30 * time.Second
)
View Source
const (
	// RegisterImportTaskPrefix is the prefix of the key for task register
	// todo: remove "/import" suffix, it's confusing to have a key like "/tidb/brie/import/restore/restore-xxx"
	RegisterImportTaskPrefix = "/tidb/brie/import"

	RegisterRetryInternal = 10 * time.Second
)

The key format should be {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}

View Source
const (

	// DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min.
	DefaultBRGCSafePointTTL = 5 * 60
	// DefaultCheckpointGCSafePointTTL means PD keep safePoint limit at least 72 minutes.
	DefaultCheckpointGCSafePointTTL = 72 * 60
	// DefaultStreamStartSafePointTTL specifies keeping the server safepoint 30 mins when start task.
	DefaultStreamStartSafePointTTL = 1800
	// DefaultStreamPauseSafePointTTL specifies Keeping the server safePoint at list 24h when pause task.
	DefaultStreamPauseSafePointTTL = 24 * 3600
)
View Source
const DefaultGcRatioVal = "1.1"
View Source
const (
	TidbNewCollationEnabled = "new_collation_enabled"
)

Variables

View Source
var (
	DumpGoroutineWhenExit atomic.Bool
)

Functions

func AllStackInfo

func AllStackInfo() []byte

func AsyncStreamBy

func AsyncStreamBy[T any](generator func() (T, error)) <-chan Result[T]

func BuildWorkerTokenChannel

func BuildWorkerTokenChannel(size uint) chan struct{}

func CatchAndLogPanic

func CatchAndLogPanic()

CatchAndLogPanic recovers when the execution get panicked, and log the panic. generally, this would be used with `defer`, like:

func foo() {
  defer utils.CatchAndLogPanic()
  maybePanic()
}

func CheckGCSafePoint

func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error

CheckGCSafePoint checks whether the ts is older than GC safepoint. Note: It ignores errors other than exceed GC safepoint.

func CheckLogBackupTaskExist

func CheckLogBackupTaskExist() bool

CheckLogBackupTaskExist checks that whether log-backup is existed.

func CheckStoreLiveness

func CheckStoreLiveness(s *metapb.Store) error

CheckStoreLiveness checks whether a store is still alive. Some versions of PD may not set the store state in the gRPC response. We need to check it manually.

func CloneSlice

func CloneSlice[T any](s []T) []T

CloneSlice sallowly clones a slice.

func CompareBytesExt

func CompareBytesExt(a []byte, aEmptyAsInf bool, b []byte, bEmptyAsInf bool) int

CompareBytesExt compare two byte sequences. different from `bytes.Compare`, we can provide whether to treat the key as inf when meet empty key to this.

func CompareEndKey

func CompareEndKey(a, b []byte) int

CompareEndKey compared two keys that BOTH represent the EXCLUSIVE ending of some range. An empty end key is the very end, so an empty key is greater than any other keys. Please note that this function is not applicable if any one argument is not an EXCLUSIVE ending of a range.

func EncloseDBAndTable

func EncloseDBAndTable(database, table string) string

EncloseDBAndTable formats the database and table name in sql.

func EncloseName

func EncloseName(name string) string

EncloseName formats name in sql.

func FallBack2CreateTable

func FallBack2CreateTable(err error) bool

func GRPCConn

func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func GetGcRatio

func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)

func GetOrZero

func GetOrZero[T any](p *T) T

GetOrZero returns the value pointed to by p, or a zero value of its type if p is nil.

func GetRegionSplitInfo

func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)

func GetSplitKeys

func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64

func GetSplitSize

func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64

func GetSysDBCIStrName

func GetSysDBCIStrName(tempDB pmodel.CIStr) (pmodel.CIStr, bool)

GetSysDBCIStrName get the CIStr name of system DB

func GetSysDBName

func GetSysDBName(tempDB pmodel.CIStr) (string, bool)

GetSysDBName get the original name of system DB

func GetTidbNewCollationEnabled

func GetTidbNewCollationEnabled() string

GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled.

func IntersectAll

func IntersectAll(s1 []kv.KeyRange, s2 []kv.KeyRange) []kv.KeyRange

IntersectAll returns the intersect of two set of segments. OWNERSHIP INFORMATION: For running faster, this function would MUTATE the input slice. (i.e. takes its ownership.) (But it is promised that this function won't change the `start key` and `end key` slice) If you want to use the input slice after, call `CloneSlice` over arguments before passing them.

You can treat "set of segments" as points maybe not adjacent. in this way, IntersectAll(s1, s2) = { point | point in both s1 and s2 } Example: ranges: |___________| |________________| toClampIn: |_____| |____| |________________| result: |_____| |_| |______________| we are assuming the arguments are sorted by the start key and no overlaps. you can call spans.Collapse to get key ranges fits this requirements. Note: this algorithm is pretty like the `checkIntervalIsSubset`, can we get them together?

func IsLogBackupInUse

func IsLogBackupInUse(ctx sessionctx.Context) bool

IsLogBackupInUse checks the log backup task existed.

func IsSysDB

func IsSysDB(dbLowerName string) bool

IsSysDB tests whether the database is system DB. Currently, both `mysql` and `sys` are system DB.

func IsTemplateSysDB

func IsTemplateSysDB(dbname pmodel.CIStr) bool

IsTemplateSysDB checks wheterh the dbname is temporary system database(__TiDB_BR_Temporary_mysql or __TiDB_BR_Temporary_sys).

func IsTypeCompatible

func IsTypeCompatible(src types.FieldType, target types.FieldType) bool

IsTypeCompatible checks whether type target is compatible with type src they're compatible if - same null/not null and unsigned flag(maybe we can allow src not null flag, target null flag later) - have same evaluation type - target's flen and decimal should be bigger or equals to src's - elements in target is superset of elements in src if they're enum or set type - same charset and collate if they're string types

func LogBackupTaskCountDec

func LogBackupTaskCountDec()

LogBackupTaskCountDec decreases the count of log backup task.

func LogBackupTaskCountInc

func LogBackupTaskCountInc()

LogBackupTaskCountInc increases the count of log backup task.

func MakeSafePointID

func MakeSafePointID() string

MakeSafePointID makes a unique safe point ID, for reduce name conflict.

func MarshalBackupMeta

func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)

MarshalBackupMeta converts the backupmeta strcture to JSON. Unlike json.Marshal, this function also format some []byte fields for human reading.

func MessageIsRetryableStorageError

func MessageIsRetryableStorageError(msg string) bool

MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.

func NeedAutoID

func NeedAutoID(tblInfo *model.TableInfo) bool

NeedAutoID checks whether the table needs backing up with an autoid.

func PanicToErr

func PanicToErr(err *error)

PanicToErr recovers when the execution get panicked, and set the error provided by the arg. generally, this would be used with named return value and `defer`, like:

func foo() (err error) {
  defer utils.PanicToErr(&err)
  return maybePanic()
}

Before using this, there are some hints for reducing resource leakage or bugs:

  • If any of clean work (by `defer`) relies on the error (say, when error happens, rollback some operations.), please place `defer this` AFTER that.
  • All resources allocated should be freed by the `defer` syntax, or when panicking, they may not be recycled.

func ParseKey

func ParseKey(format, key string) ([]byte, error)

ParseKey parse key by given format.

func SetGcRatio

func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error

func StartDynamicPProfListener

func StartDynamicPProfListener(tls *tidbutils.TLS)

StartDynamicPProfListener starts the listener that will enable pprof when received `startPProfSignal`.

func StartExitSingleListener

func StartExitSingleListener(ctx context.Context) (context.Context, context.CancelFunc)

func StartPProfListener

func StartPProfListener(statusAddr string, wrapper *tidbutils.TLS) error

StartPProfListener forks a new goroutine listening on specified port and provide pprof info.

func StartServiceSafePointKeeper

func StartServiceSafePointKeeper(
	ctx context.Context,
	pdClient pd.Client,
	sp BRServiceSafePoint,
) error

StartServiceSafePointKeeper will run UpdateServiceSafePoint periodicity hence keeping service safepoint won't lose.

func TemporaryDBName

func TemporaryDBName(db string) pmodel.CIStr

TemporaryDBName makes a 'private' database name.

func UnmarshalBackupMeta

func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)

UnmarshalBackupMeta converts the prettied JSON format of backupmeta (made by MarshalBackupMeta) back to the go structure.

func UpdateServiceSafePoint

func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error

UpdateServiceSafePoint register BackupTS to PD, to lock down BackupTS as safePoint with TTL seconds.

func WithCleanUp

func WithCleanUp(errOut *error, timeout time.Duration, fn func(context.Context) error)

WithCleanUp runs a function with a timeout, and register its error to its argument if there is one. This is useful while you want to run some must run but error-prone code in a defer context. Simple usage:

func foo() (err error) {
	defer WithCleanUp(&err, time.Second, func(ctx context.Context) error {
		// do something
		return nil
	})
}

func WithRetry

func WithRetry(
	ctx context.Context,
	retryableFunc RetryableFunc,
	backoffer Backoffer,
) error

WithRetry retries a given operation with a backoff policy.

Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a multierr containing all errors encountered.

func WithRetryReturnLastErr

func WithRetryReturnLastErr(
	ctx context.Context,
	retryableFunc RetryableFunc,
	backoffer Backoffer,
) error

WithRetryReturnLastErr is like WithRetry but the returned error is the last error during retry rather than a multierr.

func WithRetryV2

func WithRetryV2[T any](
	ctx context.Context,
	backoffer Backoffer,
	fn RetryableFuncV2[T],
) (T, error)

WithRetryV2 retries a given operation with a backoff policy.

Returns the returned value if `retryableFunc` succeeded at least once. Otherwise, returns a multierr that containing all errors encountered. Comparing with `WithRetry`, this function reordered the argument order and supports catching the return value.

Types

type BRServiceSafePoint

type BRServiceSafePoint struct {
	ID       string
	TTL      int64
	BackupTS uint64
}

BRServiceSafePoint is metadata of service safe point from a BR 'instance'.

func (BRServiceSafePoint) MarshalLogObject

func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler.

type Backoffer

type Backoffer interface {
	// NextBackoff returns a duration to wait before retrying again
	NextBackoff(err error) time.Duration
	// Attempt returns the remain attempt times
	Attempt() int
}

Backoffer implements a backoff policy for retrying operations.

func GiveUpRetryOn

func GiveUpRetryOn(bo Backoffer, errs ...error) Backoffer

func NewBackoffer

func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer

NewBackoffer creates a new controller regulating a truncated exponential backoff.

func NewBackupSSTBackoffer

func NewBackupSSTBackoffer() Backoffer

func NewDiskCheckBackoffer

func NewDiskCheckBackoffer() Backoffer

func NewDownloadSSTBackoffer

func NewDownloadSSTBackoffer() Backoffer

func NewImportSSTBackoffer

func NewImportSSTBackoffer() Backoffer

func NewPDReqBackoffer

func NewPDReqBackoffer() Backoffer

func NewPDReqBackofferExt

func NewPDReqBackofferExt() Backoffer

func VerboseRetry

func VerboseRetry(bo Backoffer, logger *zap.Logger) Backoffer

type ConstantBackoff

type ConstantBackoff time.Duration

ConstantBackoff is a backoffer that retry forever until success.

func (ConstantBackoff) Attempt

func (c ConstantBackoff) Attempt() int

Attempt returns the remain attempt times

func (ConstantBackoff) NextBackoff

func (c ConstantBackoff) NextBackoff(err error) time.Duration

NextBackoff returns a duration to wait before retrying again

type DiskCheckBackoffer

type DiskCheckBackoffer struct {
	// contains filtered or unexported fields
}

func (*DiskCheckBackoffer) Attempt

func (bo *DiskCheckBackoffer) Attempt() int

func (*DiskCheckBackoffer) NextBackoff

func (bo *DiskCheckBackoffer) NextBackoff(err error) time.Duration

type ErrorContext

type ErrorContext struct {
	// contains filtered or unexported fields
}

func NewDefaultContext

func NewDefaultContext() *ErrorContext

func NewErrorContext

func NewErrorContext(scenario string, limitation int) *ErrorContext

type ErrorHandlingResult

type ErrorHandlingResult struct {
	Strategy ErrorHandlingStrategy
	Reason   string
}

func HandleBackupError

func HandleBackupError(err *backuppb.Error, storeId uint64, ec *ErrorContext) ErrorHandlingResult

func HandleUnknownBackupError

func HandleUnknownBackupError(msg string, uuid uint64, ec *ErrorContext) ErrorHandlingResult

HandleUnknownBackupError UNSAFE! TODO: remove this method and map all the current unknown errors to error types

type ErrorHandlingStrategy

type ErrorHandlingStrategy int
const (
	// StrategyRetry error can be retried but will consume the backoff attempt quota.
	StrategyRetry ErrorHandlingStrategy = iota
	// StrategyGiveUp means unrecoverable error happened and the BR should exit
	// for example:
	// 1. permission not valid.
	// 2. data not found.
	// 3. retry too many times
	StrategyGiveUp
	// StrategyUnknown for StrategyUnknown error
	StrategyUnknown
)

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a lazy pool of gRPC channels. When `Get` called, it lazily allocates new connection if connection not full. If it's full, then it will return allocated channels round-robin.

func NewConnPool

func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) *Pool

NewConnPool creates a new Pool by the specified conn factory function and capacity.

func (*Pool) Close

func (p *Pool) Close()

Close closes the conn pool.

func (*Pool) Get

func (p *Pool) Get(ctx context.Context) (*grpc.ClientConn, error)

Get tries to get an existing connection from the pool, or make a new one if the pool not full.

type ProgressPrinter

type ProgressPrinter struct {
	// contains filtered or unexported fields
}

ProgressPrinter prints a progress bar.

func NewProgressPrinter

func NewProgressPrinter(
	name string,
	total int64,
	redirectLog bool,
) *ProgressPrinter

NewProgressPrinter returns a new progress printer.

func StartProgress

func StartProgress(
	ctx context.Context,
	name string,
	total int64,
	redirectLog bool,
	log logFunc,
) *ProgressPrinter

StartProgress starts progress bar.

func (*ProgressPrinter) Close

func (pp *ProgressPrinter) Close()

Close closes the current progress bar.

func (*ProgressPrinter) GetCurrent

func (pp *ProgressPrinter) GetCurrent() int64

GetCurrent get the current progress.

func (*ProgressPrinter) Inc

func (pp *ProgressPrinter) Inc()

Inc increases the current progress bar.

func (*ProgressPrinter) IncBy

func (pp *ProgressPrinter) IncBy(cnt int64)

IncBy implements glue.Progress

type RegisterTask

type RegisterTask struct {
	Key     string
	LeaseID int64
	TTL     int64
}

RegisterTask saves the task's information

func (RegisterTask) MessageToUser

func (task RegisterTask) MessageToUser() string

MessageToUser marshal the task to user message

type RegisterTaskType

type RegisterTaskType int

RegisterTaskType for the sub-prefix path for key

const (
	RegisterRestore RegisterTaskType = iota
	RegisterLightning
	RegisterImportInto
)

func (RegisterTaskType) String

func (tp RegisterTaskType) String() string

type RegisterTasksList

type RegisterTasksList struct {
	Tasks []RegisterTask
}

func GetImportTasksFrom

func GetImportTasksFrom(ctx context.Context, client *clientv3.Client) (RegisterTasksList, error)

GetImportTasksFrom try to get all the import tasks with prefix `RegisterTaskPrefix`

func (RegisterTasksList) Empty

func (list RegisterTasksList) Empty() bool

func (RegisterTasksList) MessageToUser

func (list RegisterTasksList) MessageToUser() string

type Result

type Result[T any] struct {
	Err  error
	Item T
}

type RetryState

type RetryState struct {
	// contains filtered or unexported fields
}

RetryState is the mutable state needed for retrying. It likes the `utils.Backoffer`, but more fundamental: this only control the backoff time and knows nothing about what error happens. NOTE: Maybe also implement the backoffer via this.

func InitialRetryState

func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState

InitialRetryState make the initial state for retrying.

func (*RetryState) Attempt

func (rs *RetryState) Attempt() int

Attempt implements the `Backoffer`. TODO: Maybe use this to replace the `exponentialBackoffer` (which is nearly homomorphic to this)?

func (*RetryState) ExponentialBackoff

func (rs *RetryState) ExponentialBackoff() time.Duration

Get the exponential backoff durion and transform the state.

func (*RetryState) GiveUp

func (rs *RetryState) GiveUp()

func (*RetryState) NextBackoff

func (rs *RetryState) NextBackoff(error) time.Duration

NextBackoff implements the `Backoffer`.

func (*RetryState) ReduceRetry

func (rs *RetryState) ReduceRetry()

ReduceRetry reduces retry times for 1.

func (*RetryState) ShouldRetry

func (rs *RetryState) ShouldRetry() bool

Whether in the current state we can retry.

type RetryWithBackoffer

type RetryWithBackoffer struct {
	// contains filtered or unexported fields
}

RetryWithBackoffer is a simple context for a "mixed" retry. Some of TiDB APIs, say, `ResolveLock` requires a `tikv.Backoffer` as argument. But the `tikv.Backoffer` isn't pretty customizable, it has some sorts of predefined configuration but we cannot create new one. So we are going to mix up the flavour of `tikv.Backoffer` and our homemade back off strategy. That is what the `RetryWithBackoffer` did.

func AdaptTiKVBackoffer

func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoffer

AdaptTiKVBackoffer creates an "ad-hoc" backoffer, which wraps a backoffer and provides some new functions: When backing off, we can manually provide it a specified sleep duration instead of directly provide a retry.Config Which is sealed in the "client-go/internal".

func (*RetryWithBackoffer) BackOff

func (r *RetryWithBackoffer) BackOff() error

BackOff executes the back off: sleep for a precalculated backoff time. See `RequestBackOff` for more details.

func (*RetryWithBackoffer) Inner

func (r *RetryWithBackoffer) Inner() *tikv.Backoffer

Inner returns the reference to the inner `backoffer`.

func (*RetryWithBackoffer) MaxSleepInMS

func (r *RetryWithBackoffer) MaxSleepInMS() int

MaxSleepInMS returns the max sleep time for the retry context in ms.

func (*RetryWithBackoffer) NextSleepInMS

func (r *RetryWithBackoffer) NextSleepInMS() int

NextSleepInMS returns the time `BackOff` will sleep in ms of the state.

func (*RetryWithBackoffer) RequestBackOff

func (r *RetryWithBackoffer) RequestBackOff(ms int)

RequestBackOff register the intent of backing off at least n milliseconds. That intent will be fulfilled when calling `BackOff`.

func (*RetryWithBackoffer) TotalSleepInMS

func (r *RetryWithBackoffer) TotalSleepInMS() int

TotalSleepInMS returns the total sleeped time in ms.

type RetryableFunc

type RetryableFunc func() error

RetryableFunc presents a retryable operation.

type RetryableFuncV2

type RetryableFuncV2[T any] func(context.Context) (T, error)

type StoreManager

type StoreManager struct {
	// contains filtered or unexported fields
}

func NewStoreManager

func NewStoreManager(pdCli pd.Client, kl keepalive.ClientParameters, tlsConf *tls.Config) *StoreManager

NewStoreManager create a new manager for gRPC connections to stores.

func (*StoreManager) Close

func (mgr *StoreManager) Close()

Close closes all client in Mgr.

func (*StoreManager) GetKeepalive

func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters

func (*StoreManager) PDClient

func (mgr *StoreManager) PDClient() pd.Client

func (*StoreManager) RemoveConn

func (mgr *StoreManager) RemoveConn(ctx context.Context, storeID uint64) error

func (*StoreManager) ResetBackupClient

func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)

ResetBackupClient reset the connection for backup client.

func (*StoreManager) TLSConfig

func (mgr *StoreManager) TLSConfig() *tls.Config

func (*StoreManager) TryWithConn

func (mgr *StoreManager) TryWithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn) error) error

func (*StoreManager) WithConn

func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error

type TaskRegister

type TaskRegister interface {
	// Close closes the background task if using RegisterTask
	// and revoke the lease.
	// NOTE: we don't close the etcd client here, call should do it.
	Close(ctx context.Context) (err error)
	// RegisterTask firstly put its key to PD with a lease,
	// and start to keepalive the lease in the background.
	// DO NOT mix calls to RegisterTask and RegisterTaskOnce.
	RegisterTask(c context.Context) error
	// RegisterTaskOnce put its key to PD with a lease if the key does not exist,
	// else we refresh the lease.
	// you have to call this method periodically to keep the lease alive.
	// DO NOT mix calls to RegisterTask and RegisterTaskOnce.
	RegisterTaskOnce(ctx context.Context) error
}

TaskRegister can register the task to PD with a lease.

func NewTaskRegister

func NewTaskRegister(client *clientv3.Client, tp RegisterTaskType, taskName string) TaskRegister

NewTaskRegister build a TaskRegister with key format {RegisterTaskPrefix}/{RegisterTaskType}/{taskName}

func NewTaskRegisterWithTTL

func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) TaskRegister

NewTaskRegisterWithTTL build a TaskRegister with key format {RegisterTaskPrefix}/{RegisterTaskType}/{taskName}

Directories

Path Synopsis
package storewatch provides a `Watcher` type which allows the user to listen the events of lifetime of stores.
package storewatch provides a `Watcher` type which allows the user to listen the events of lifetime of stores.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL