utils

package
v7.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 30, 2023 License: Apache-2.0, Apache-2.0 Imports: 54 Imported by: 0

Documentation

Constants

const (

	// DefaultBRGCSafePointTTL means PD keeps the safePoint limit for at least 5 minutes.
	DefaultBRGCSafePointTTL = 5 * 60
	// DefaultStreamStartSafePointTTL specifies keeping the server safePoint for 30 minutes when starting a task.
	DefaultStreamStartSafePointTTL = 1800
	// DefaultStreamPauseSafePointTTL specifies keeping the server safePoint for at least 24 hours when pausing a task.
	DefaultStreamPauseSafePointTTL = 24 * 3600
)

Variables

var (
	LogBackupTaskMutex sync.Mutex
)

Functions

func ArchiveSize

func ArchiveSize(meta *backuppb.BackupMeta) uint64

ArchiveSize returns the total size of the backup archive.

func CheckGCSafePoint

func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error

CheckGCSafePoint checks whether the ts is older than the GC safepoint. Note: it ignores all errors other than exceeding the GC safepoint.

func CheckLogBackupEnabled

func CheckLogBackupEnabled(ctx sessionctx.Context) bool

CheckLogBackupEnabled checks whether log backup is enabled in the cluster. It is mainly used in three places:

  1. The GC worker resolving locks, to scan more locks after the safepoint (every minute).
  2. Add index skipping the use of lightning (every add index DDL).
  3. Telemetry of log backup feature usage (every 6 hours).

NOTE: callers shouldn't cache this result, because it may change at any time within one cluster.

func CheckLogBackupTaskExist

func CheckLogBackupTaskExist() bool

CheckLogBackupTaskExist checks whether a log backup task exists.

func CloneSlice

func CloneSlice[T any](s []T) []T

CloneSlice shallowly clones a slice.
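To make "shallowly" concrete, here is a minimal sketch. The `cloneSlice` helper is a hypothetical stand-in written for illustration, not the package's source: it copies the elements into a new backing array but does not copy whatever the elements themselves point to.

```go
package main

import "fmt"

// cloneSlice is a hypothetical stand-in for a shallow clone: the new slice
// has its own backing array, but the elements are copied as-is.
func cloneSlice[T any](s []T) []T {
	r := make([]T, len(s))
	copy(r, s)
	return r
}

func main() {
	keys := [][]byte{[]byte("a"), []byte("b")}
	cloned := cloneSlice(keys)

	// Replacing an element of the clone leaves the original untouched.
	cloned[0] = []byte("z")
	fmt.Println(string(keys[0])) // prints "a"

	// Element storage is still shared, so this write is visible in both.
	cloned[1][0] = 'y'
	fmt.Println(string(keys[1])) // prints "y"
}
```

This is why a shallow clone is enough for functions that only reorder or overwrite slice elements, but not for ones that mutate the bytes inside each element.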

func CompareBytesExt

func CompareBytesExt(a []byte, aEmptyAsInf bool, b []byte, bEmptyAsInf bool) int

CompareBytesExt compares two byte sequences. Unlike `bytes.Compare`, the caller can specify, for each argument, whether an empty key should be treated as infinity.

func CompareEndKey

func CompareEndKey(a, b []byte) int

CompareEndKey compares two keys that BOTH represent the EXCLUSIVE ending of some range. An empty end key is the very end, so an empty key is greater than any other key. Please note that this function is not applicable if either argument is not the EXCLUSIVE ending of a range.
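The "empty key as infinity" rule described for CompareBytesExt and CompareEndKey can be sketched as follows. The lowercase functions are illustrative reimplementations based only on the descriptions above; the package's actual code may differ in detail.

```go
package main

import (
	"bytes"
	"fmt"
)

// compareBytesExt is an illustrative stand-in: an empty key counts as
// +infinity only when its corresponding flag is set.
func compareBytesExt(a []byte, aEmptyAsInf bool, b []byte, bEmptyAsInf bool) int {
	aInf := len(a) == 0 && aEmptyAsInf
	bInf := len(b) == 0 && bEmptyAsInf
	switch {
	case aInf && bInf:
		return 0 // infinity equals infinity
	case aInf:
		return 1 // infinity is greater than any finite key
	case bInf:
		return -1 // any finite key is less than infinity
	}
	return bytes.Compare(a, b)
}

// compareEndKey treats both arguments as exclusive range ends, so an empty
// key on either side means "the very end".
func compareEndKey(a, b []byte) int {
	return compareBytesExt(a, true, b, true)
}

func main() {
	fmt.Println(compareEndKey([]byte("z"), nil))                // -1
	fmt.Println(compareEndKey(nil, nil))                        // 0
	fmt.Println(compareBytesExt(nil, true, []byte("a"), false)) // 1
	fmt.Println(compareBytesExt(nil, false, []byte("a"), true)) // -1
}
```

The last call shows why the flags are per-argument: an empty start key (not infinity) still sorts before everything, exactly as `bytes.Compare` would order it.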

func EncloseDBAndTable

func EncloseDBAndTable(database, table string) string

EncloseDBAndTable formats the database and table names for use in SQL.

func EncloseName

func EncloseName(name string) string

EncloseName formats a name for use in SQL.
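The page does not say how names are formatted. As a sketch only, assuming MySQL-style identifier quoting (wrap in backticks, double any embedded backtick), the two helpers could look like the hypothetical `encloseName` and `encloseDBAndTable` below. Treat the quoting rule as an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// encloseName quotes an identifier MySQL-style. ASSUMPTION: backtick quoting
// with embedded backticks doubled; the real function may differ.
func encloseName(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// encloseDBAndTable joins the two quoted identifiers with a dot.
func encloseDBAndTable(database, table string) string {
	return encloseName(database) + "." + encloseName(table)
}

func main() {
	fmt.Println(encloseDBAndTable("test", "order`s")) // prints `test`.`order``s`
}
```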

func FallBack2CreateTable

func FallBack2CreateTable(err error) bool

func GRPCConn

func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, opts ...grpc.DialOption) (*grpc.ClientConn, error)

func GetGcRatio

func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)

func GetRegionSplitInfo

func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)

func GetSplitKeys

func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64

func GetSplitSize

func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64

func GetSysDBCIStrName

func GetSysDBCIStrName(tempDB model.CIStr) (model.CIStr, bool)

GetSysDBCIStrName gets the CIStr name of the system DB.

func GetSysDBName

func GetSysDBName(tempDB model.CIStr) (string, bool)

GetSysDBName gets the original name of the system DB.

func GetTidbNewCollationEnabled

func GetTidbNewCollationEnabled() string

GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled.

func HideSensitive

func HideSensitive(input string) string

HideSensitive replaces passwords with ******.

func IntersectAll

func IntersectAll(s1 []kv.KeyRange, s2 []kv.KeyRange) []kv.KeyRange

IntersectAll returns the intersection of two sets of segments.

OWNERSHIP INFORMATION: To run faster, this function MUTATES the input slices (i.e. it takes ownership of them). It is promised, however, that it won't change the `start key` and `end key` slices. If you want to use the input slices afterwards, call `CloneSlice` on the arguments before passing them.

You can treat a "set of segments" as a set of points that may not be adjacent. In this way, IntersectAll(s1, s2) = { point | point in both s1 and s2 }.

Example:

ranges:
|___________| |________________|
toClampIn:
|_____| |____| |________________|
result:
|_____| |_| |______________|

We assume the arguments are sorted by start key and have no overlaps. You can call spans.Collapse to get key ranges that fit these requirements. Note: this algorithm is much like `checkIntervalIsSubset`; can we merge them?
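The set-intersection semantics above can be sketched with a two-pointer walk over both sorted lists. This is a non-mutating illustration, not the in-place algorithm the package describes. The `keyRange` type and helper names are hypothetical, and an empty end key is assumed to mean "to the very end".

```go
package main

import (
	"bytes"
	"fmt"
)

// keyRange is a hypothetical stand-in for kv.KeyRange. End is exclusive and
// an empty End is assumed to mean "unbounded".
type keyRange struct {
	Start, End []byte
}

// compareEnd orders exclusive end keys, treating empty as +infinity.
func compareEnd(a, b []byte) int {
	switch {
	case len(a) == 0 && len(b) == 0:
		return 0
	case len(a) == 0:
		return 1
	case len(b) == 0:
		return -1
	}
	return bytes.Compare(a, b)
}

// intersectAll assumes both inputs are sorted by start key with no overlaps.
func intersectAll(s1, s2 []keyRange) []keyRange {
	var out []keyRange
	i, j := 0, 0
	for i < len(s1) && j < len(s2) {
		a, b := s1[i], s2[j]
		// The overlap starts at the larger start key...
		start := a.Start
		if bytes.Compare(b.Start, start) > 0 {
			start = b.Start
		}
		// ...and ends at the smaller end key.
		end := a.End
		if compareEnd(b.End, end) < 0 {
			end = b.End
		}
		// Keep it only if it is non-empty (an empty end is unbounded).
		if len(end) == 0 || bytes.Compare(start, end) < 0 {
			out = append(out, keyRange{Start: start, End: end})
		}
		// Advance whichever range finishes first.
		if compareEnd(a.End, b.End) <= 0 {
			i++
		} else {
			j++
		}
	}
	return out
}

func main() {
	ranges := []keyRange{{[]byte("a"), []byte("c")}, {[]byte("e"), nil}}
	clamp := []keyRange{{[]byte("b"), []byte("f")}, {[]byte("g"), []byte("h")}}
	for _, r := range intersectAll(ranges, clamp) {
		fmt.Printf("[%q, %q)\n", r.Start, r.End)
	}
	// prints:
	// ["b", "c")
	// ["e", "f")
	// ["g", "h")
}
```

Because each step advances one of the two cursors, the walk is linear in the total number of ranges, which is also why the sorted, non-overlapping precondition matters.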

func IsLogBackupEnabled

func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error)

IsLogBackupEnabled is used by BR to check whether TiKV has enabled log backup. It takes a `sqlexec.RestrictedSQLExecutor` as its parameter because that is easy to mock. It should return an error.

func IsLogBackupInUse

func IsLogBackupInUse(ctx sessionctx.Context) bool

IsLogBackupInUse checks whether a log backup task exists.

func IsSysDB

func IsSysDB(dbLowerName string) bool

IsSysDB tests whether the database is system DB. Currently, the only system DB is mysql.

func IsTypeCompatible

func IsTypeCompatible(src types.FieldType, target types.FieldType) bool

IsTypeCompatible checks whether type target is compatible with type src. They are compatible if:

  • they have the same null/not null and unsigned flags (maybe we can allow a not null src with a null target later)
  • they have the same evaluation type
  • target's flen and decimal are greater than or equal to src's
  • the elements in target are a superset of the elements in src, if they are enum or set types
  • they have the same charset and collation, if they are string types

func LoadBackupTables

func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error)

LoadBackupTables loads schemas from BackupMeta.

func LogBackupTaskCountDec

func LogBackupTaskCountDec()

LogBackupTaskCountDec decreases the count of log backup task.

func LogBackupTaskCountInc

func LogBackupTaskCountInc()

LogBackupTaskCountInc increases the count of log backup task.

func LogEnvVariables

func LogEnvVariables()

LogEnvVariables logs related environment variables.

func MakeSafePointID

func MakeSafePointID() string

MakeSafePointID makes a unique safe point ID, to reduce name conflicts.

func MarshalBackupMeta

func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)

MarshalBackupMeta converts the backupmeta structure to JSON. Unlike json.Marshal, this function also formats some []byte fields for human reading.

func MessageIsNotFoundStorageError

func MessageIsNotFoundStorageError(msg string) bool

MessageIsNotFoundStorageError checks whether the message returned from TiKV is a "NotFound" storage I/O error.

func MessageIsPermissionDeniedStorageError

func MessageIsPermissionDeniedStorageError(msg string) bool

MessageIsPermissionDeniedStorageError checks whether the message returned from TiKV is a "PermissionDenied" storage I/O error.

func MessageIsRetryableStorageError

func MessageIsRetryableStorageError(msg string) bool

MessageIsRetryableStorageError checks whether the message returned from TiKV is a retryable ExternalStorageError.

func NeedAutoID

func NeedAutoID(tblInfo *model.TableInfo) bool

NeedAutoID checks whether the table needs backing up with an autoid.

func NextPowerOfTwo

func NextPowerOfTwo(i int64) int64

NextPowerOfTwo returns the smallest power of two greater than or equal to `i`. The caller should guarantee that i > 0 and that the return value does not overflow.
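One possible way to compute this, shown only as a sketch (the package's own implementation is not reproduced here), uses the bit length of `i-1`. The `nextPowerOfTwo` helper is hypothetical and relies on the same precondition: i > 0 and no overflow.

```go
package main

import (
	"fmt"
	"math/bits"
)

// nextPowerOfTwo returns the smallest power of two >= i.
// ASSUMPTION: i > 0 and the result fits in int64, as the caller must ensure.
func nextPowerOfTwo(i int64) int64 {
	// bits.Len64(i-1) is the number of bits needed to represent i-1, so
	// shifting 1 by that amount yields the first power of two that is >= i.
	return int64(1) << bits.Len64(uint64(i-1))
}

func main() {
	for _, n := range []int64{1, 5, 8, 1000} {
		fmt.Println(n, "->", nextPowerOfTwo(n))
	}
	// prints:
	// 1 -> 1
	// 5 -> 8
	// 8 -> 8
	// 1000 -> 1024
}
```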

func PanicToErr

func PanicToErr(err *error)

PanicToErr recovers when the execution panics, and sets the error provided by the arg. Generally, this would be used with a named return value and `defer`, like:

func foo() (err error) {
  defer utils.PanicToErr(&err)
  return maybePanic()
}

Before using this, here are some hints for reducing resource leaks or bugs:

  • If any cleanup work (by `defer`) relies on the error (say, rolling back some operations when an error happens), please place `defer this` AFTER it.
  • All resources allocated should be freed by the `defer` syntax, or when panicking, they may not be recycled.
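The ordering hint follows from deferred calls running in last-in, first-out order. The sketch below uses a hypothetical local `panicToErr` (the error message format is an assumption) to show that registering it after the rollback `defer` lets the rollback observe the converted error.

```go
package main

import "fmt"

// panicToErr is a hypothetical stand-in: it must be deferred directly so that
// recover() can intercept the panic of the surrounding function.
func panicToErr(err *error) {
	if r := recover(); r != nil {
		*err = fmt.Errorf("panicked: %v", r)
	}
}

// transfer registers the rollback first and panicToErr second. Deferred calls
// run in reverse order, so panicToErr sets err before the rollback checks it.
func transfer(fail bool) (rolledBack bool, err error) {
	defer func() {
		if err != nil {
			rolledBack = true // cleanup that depends on the error
		}
	}()
	defer panicToErr(&err)

	if fail {
		panic("boom")
	}
	return false, nil
}

func main() {
	fmt.Println(transfer(true))  // true panicked: boom
	fmt.Println(transfer(false)) // false <nil>
}
```

If the two `defer` lines were swapped, the rollback would run while the panic was still in flight and `err` was still nil, so it would be skipped.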

func ParseKey

func ParseKey(format, key string) ([]byte, error)

ParseKey parses the key by the given format.

func SetGcRatio

func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error

func StartDynamicPProfListener

func StartDynamicPProfListener(tls *tidbutils.TLS)

StartDynamicPProfListener starts the listener that will enable pprof when it receives `startPProfSignal`.

func StartPProfListener

func StartPProfListener(statusAddr string, wrapper *tidbutils.TLS) error

StartPProfListener forks a new goroutine that listens on the specified port and provides pprof info.

func StartServiceSafePointKeeper

func StartServiceSafePointKeeper(
	ctx context.Context,
	pdClient pd.Client,
	sp BRServiceSafePoint,
) error

StartServiceSafePointKeeper runs UpdateServiceSafePoint periodically so that the service safepoint is not lost.

func TemporaryDBName

func TemporaryDBName(db string) model.CIStr

TemporaryDBName makes a 'private' database name.

func UnmarshalBackupMeta

func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)

UnmarshalBackupMeta converts the prettied JSON format of backupmeta (made by MarshalBackupMeta) back to the go structure.

func UpdateServiceSafePoint

func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error

UpdateServiceSafePoint registers BackupTS with PD, to lock down BackupTS as the safePoint for TTL seconds.

func WithRetry

func WithRetry(
	ctx context.Context,
	retryableFunc RetryableFunc,
	backoffer Backoffer,
) error

WithRetry retries a given operation with a backoff policy.

Returns nil if `retryableFunc` succeeded at least once. Otherwise, returns a multierr containing all errors encountered.
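The contract above (stop on the first success, otherwise return every error seen) can be sketched with local stand-ins. Everything below is illustrative: `backoffer`, `fixedBackoffer`, `withRetry` and `runDemo` are hypothetical names, and the standard library's `errors.Join` (Go 1.20+) is swapped in for the multierr package mentioned above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoffer mirrors the shape of the Backoffer interface on this page.
type backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

// fixedBackoffer is a hypothetical policy: constant delay, limited attempts.
type fixedBackoffer struct {
	remaining int
	delay     time.Duration
}

func (b *fixedBackoffer) NextBackoff(error) time.Duration {
	b.remaining--
	return b.delay
}

func (b *fixedBackoffer) Attempt() int { return b.remaining }

// withRetry is a sketch of the retry loop, not the package's source.
func withRetry(ctx context.Context, fn func() error, bo backoffer) error {
	var all []error
	for bo.Attempt() > 0 {
		err := fn()
		if err == nil {
			return nil // one success is enough
		}
		all = append(all, err)
		select {
		case <-ctx.Done():
			return errors.Join(all...)
		case <-time.After(bo.NextBackoff(err)):
		}
	}
	return errors.Join(all...)
}

// runDemo fails the first `failures` calls and then succeeds.
func runDemo(failures, attempts int) (calls int, err error) {
	bo := &fixedBackoffer{remaining: attempts, delay: time.Millisecond}
	err = withRetry(context.Background(), func() error {
		calls++
		if calls <= failures {
			return fmt.Errorf("transient failure %d", calls)
		}
		return nil
	}, bo)
	return calls, err
}

func main() {
	fmt.Println(runDemo(2, 5)) // 3 <nil>
	calls, err := runDemo(10, 3)
	fmt.Println(calls, err != nil) // 3 true
}
```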

Types

type BRServiceSafePoint

type BRServiceSafePoint struct {
	ID       string
	TTL      int64
	BackupTS uint64
}

BRServiceSafePoint is metadata of service safe point from a BR 'instance'.

func (BRServiceSafePoint) MarshalLogObject

func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error

MarshalLogObject implements zapcore.ObjectMarshaler.

type Backoffer

type Backoffer interface {
	// NextBackoff returns a duration to wait before retrying again
	NextBackoff(err error) time.Duration
	// Attempt returns the number of remaining attempts
	Attempt() int
}

Backoffer implements a backoff policy for retrying operations.

func NewBackoffer

func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer

NewBackoffer creates a new controller regulating a truncated exponential backoff.
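"Truncated exponential backoff" means the delay doubles after each attempt until it reaches a ceiling. The sketch below illustrates that policy with a hypothetical `expBackoffer`; it ignores the error argument and is not the package's implementation, which may treat specific errors differently.

```go
package main

import (
	"fmt"
	"time"
)

// expBackoffer is a hypothetical truncated exponential backoff policy.
type expBackoffer struct {
	attempt  int
	delay    time.Duration
	maxDelay time.Duration
}

func newExpBackoffer(attempt int, delay, maxDelay time.Duration) *expBackoffer {
	return &expBackoffer{attempt: attempt, delay: delay, maxDelay: maxDelay}
}

// NextBackoff returns the current delay (capped at maxDelay), consumes one
// attempt, and doubles the delay for the next call.
func (b *expBackoffer) NextBackoff(error) time.Duration {
	cur := b.delay
	if cur > b.maxDelay {
		cur = b.maxDelay
	}
	b.attempt--
	b.delay = 2 * cur
	if b.delay > b.maxDelay {
		b.delay = b.maxDelay
	}
	return cur
}

// Attempt returns the number of remaining attempts.
func (b *expBackoffer) Attempt() int { return b.attempt }

func main() {
	b := newExpBackoffer(5, 100*time.Millisecond, 500*time.Millisecond)
	for b.Attempt() > 0 {
		fmt.Println(b.NextBackoff(nil))
	}
	// prints 100ms, 200ms, 400ms, 500ms, 500ms (one per line)
}
```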

func NewDownloadSSTBackoffer

func NewDownloadSSTBackoffer() Backoffer

func NewImportSSTBackoffer

func NewImportSSTBackoffer() Backoffer

func NewPDReqBackoffer

func NewPDReqBackoffer() Backoffer

func NewPDReqBackofferExt

func NewPDReqBackofferExt() Backoffer

type DBExecutor

type DBExecutor interface {
	StmtExecutor
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
}

DBExecutor is an interface for statements and transactions.

type Database

type Database struct {
	Info   *model.DBInfo
	Tables []*metautil.Table
}

Database wraps the schema and tables of a database.

func (*Database) GetTable

func (db *Database) GetTable(name string) *metautil.Table

GetTable returns a table of the database by name.

type FakePDClient

type FakePDClient struct {
	pd.Client
	Stores []*metapb.Store
}

func (FakePDClient) GetAllStores

func (c FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error)

GetAllStores returns fake stores.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a lazy pool of gRPC channels. When `Get` is called, it lazily allocates a new connection if the pool is not full. If it is full, it returns the allocated channels round-robin.

func NewConnPool

func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) *Pool

NewConnPool creates a new Pool by the specified conn factory function and capacity.

func (*Pool) Close

func (p *Pool) Close()

Close closes the conn pool.

func (*Pool) Get

func (p *Pool) Get(ctx context.Context) (*grpc.ClientConn, error)

Get tries to get an existing connection from the pool, or makes a new one if the pool is not full.
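The "allocate lazily until full, then hand out round-robin" behavior can be sketched without gRPC. The generic `lazyPool` below is a hypothetical illustration in which any value type stands in for `*grpc.ClientConn`; closing connections is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyPool is a hypothetical stand-in for the connection pool. T plays the
// role of a connection.
type lazyPool[T any] struct {
	mu       sync.Mutex
	conns    []T
	next     int
	capacity int
	newConn  func() (T, error)
}

func newLazyPool[T any](capacity int, newConn func() (T, error)) *lazyPool[T] {
	if capacity < 1 {
		capacity = 1 // ASSUMPTION: a pool always holds at least one connection
	}
	return &lazyPool[T]{capacity: capacity, newConn: newConn}
}

// Get creates a new connection while the pool is not full; afterwards it
// cycles through the existing ones.
func (p *lazyPool[T]) Get() (T, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.conns) < p.capacity {
		c, err := p.newConn()
		if err != nil {
			var zero T
			return zero, err
		}
		p.conns = append(p.conns, c)
		return c, nil
	}
	c := p.conns[p.next]
	p.next = (p.next + 1) % len(p.conns)
	return c, nil
}

func main() {
	created := 0
	pool := newLazyPool(2, func() (int, error) {
		created++
		return created, nil
	})
	for i := 0; i < 5; i++ {
		c, _ := pool.Get()
		fmt.Print(c, " ")
	}
	fmt.Println("created:", created) // 1 2 1 2 1 created: 2
}
```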

type ProgressPrinter

type ProgressPrinter struct {
	// contains filtered or unexported fields
}

ProgressPrinter prints a progress bar.

func NewProgressPrinter

func NewProgressPrinter(
	name string,
	total int64,
	redirectLog bool,
) *ProgressPrinter

NewProgressPrinter returns a new progress printer.

func StartProgress

func StartProgress(
	ctx context.Context,
	name string,
	total int64,
	redirectLog bool,
	log logFunc,
) *ProgressPrinter

StartProgress starts progress bar.

func (*ProgressPrinter) Close

func (pp *ProgressPrinter) Close()

Close closes the current progress bar.

func (*ProgressPrinter) GetCurrent

func (pp *ProgressPrinter) GetCurrent() int64

GetCurrent gets the current progress.

func (*ProgressPrinter) Inc

func (pp *ProgressPrinter) Inc()

Inc increases the current progress bar.

func (*ProgressPrinter) IncBy

func (pp *ProgressPrinter) IncBy(cnt int64)

IncBy implements glue.Progress

type QueryExecutor

type QueryExecutor interface {
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

QueryExecutor is an interface for executing queries.

type RetryState

type RetryState struct {
	// contains filtered or unexported fields
}

RetryState is the mutable state needed for retrying. It is like the `utils.Backoffer`, but more fundamental: it only controls the backoff time and knows nothing about what error happened. NOTE: Maybe also implement the backoffer via this.

func InitialRetryState

func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState

InitialRetryState makes the initial state for retrying.

func (*RetryState) Attempt

func (rs *RetryState) Attempt() int

Attempt implements the `Backoffer`. TODO: Maybe use this to replace the `exponentialBackoffer` (which is nearly homomorphic to this)?

func (*RetryState) ExponentialBackoff

func (rs *RetryState) ExponentialBackoff() time.Duration

Get the exponential backoff duration and transform the state.

func (*RetryState) NextBackoff

func (rs *RetryState) NextBackoff(error) time.Duration

NextBackoff implements the `Backoffer`.

func (*RetryState) RecordRetry

func (rs *RetryState) RecordRetry()

RecordRetry simply records the retry count, without backing off.

func (*RetryState) ShouldRetry

func (rs *RetryState) ShouldRetry() bool

Whether in the current state we can retry.

type RetryableFunc

type RetryableFunc func() error

RetryableFunc represents a retryable operation.

type StmtExecutor

type StmtExecutor interface {
	QueryExecutor
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}

StmtExecutor defines both query and exec methods.

type StoreManager

type StoreManager struct {
	// contains filtered or unexported fields
}

func NewStoreManager

func NewStoreManager(pdCli pd.Client, kl keepalive.ClientParameters, tlsConf *tls.Config) *StoreManager

NewStoreManager creates a new manager for gRPC connections to stores.

func (*StoreManager) Close

func (mgr *StoreManager) Close()

Close closes all clients in Mgr.

func (*StoreManager) GetKeepalive

func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters

func (*StoreManager) PDClient

func (mgr *StoreManager) PDClient() pd.Client

func (*StoreManager) ResetBackupClient

func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)

ResetBackupClient resets the connection for the backup client.

func (*StoreManager) TLSConfig

func (mgr *StoreManager) TLSConfig() *tls.Config

func (*StoreManager) WithConn

func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error

type Worker

type Worker struct {
	ID uint64
}

Worker is identified by ID.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool contains a pool of workers.

func NewWorkerPool

func NewWorkerPool(limit uint, name string) *WorkerPool

NewWorkerPool returns a WorkerPool.

func (*WorkerPool) Apply

func (pool *WorkerPool) Apply(fn taskFunc)

Apply executes a task.

func (*WorkerPool) ApplyOnErrorGroup

func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error)

ApplyOnErrorGroup executes a task in an errorgroup.

func (*WorkerPool) ApplyWithID

func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc)

ApplyWithID executes a task and provides it with the worker ID.

func (*WorkerPool) ApplyWithIDInErrorGroup

func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error)

ApplyWithIDInErrorGroup executes a task in an errorgroup and provides it with the worker ID.

func (*WorkerPool) ApplyWorker

func (pool *WorkerPool) ApplyWorker() *Worker

ApplyWorker applies for a worker.

func (*WorkerPool) HasWorker

func (pool *WorkerPool) HasWorker() bool

HasWorker checks if the pool has unallocated workers.

func (*WorkerPool) IdleCount

func (pool *WorkerPool) IdleCount() int

IdleCount counts how many idle workers are in the pool.

func (*WorkerPool) Limit

func (pool *WorkerPool) Limit() int

Limit is the limit of the pool

func (*WorkerPool) RecycleWorker

func (pool *WorkerPool) RecycleWorker(worker *Worker)

RecycleWorker recycles a worker.
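The apply/recycle cycle can be pictured as a buffered channel of worker tokens: taking a token blocks when none is idle, and returning it frees a slot. The sketch below is a hypothetical illustration of that idea, not the package's source; all lowercase names are assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type worker struct{ ID uint64 }

// workerPool is a hypothetical pool backed by a buffered channel of workers.
type workerPool struct {
	workers chan *worker
}

func newWorkerPool(limit uint) *workerPool {
	ch := make(chan *worker, limit)
	for i := uint(0); i < limit; i++ {
		ch <- &worker{ID: uint64(i + 1)}
	}
	return &workerPool{workers: ch}
}

// applyWorker blocks until a worker is idle.
func (p *workerPool) applyWorker() *worker { return <-p.workers }

// recycleWorker returns a worker to the pool.
func (p *workerPool) recycleWorker(w *worker) { p.workers <- w }

// idleCount reports how many workers are currently unallocated.
func (p *workerPool) idleCount() int { return len(p.workers) }

// apply waits for a worker, then runs fn in its own goroutine.
func (p *workerPool) apply(fn func()) {
	w := p.applyWorker()
	go func() {
		defer p.recycleWorker(w)
		fn()
	}()
}

// runTasks runs the given number of tasks and returns the peak concurrency.
func runTasks(limit uint, tasks int) int {
	pool := newWorkerPool(limit)
	var (
		mu            sync.Mutex
		running, peak int
		wg            sync.WaitGroup
	)
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		pool.apply(func() {
			defer wg.Done()
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			time.Sleep(time.Millisecond) // simulate work
			mu.Lock()
			running--
			mu.Unlock()
		})
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runTasks(3, 10)
	fmt.Println("peak concurrency within limit:", peak <= 3) // true
}
```

The channel capacity is what enforces the limit: `apply` cannot start an eleventh task while ten tokens are out, no matter how many callers are waiting.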
