Documentation ¶
Index ¶
- Constants
- func ArchiveSize(meta *backuppb.BackupMeta) uint64
- func CheckGCSafePoint(ctx context.Context, pdClient pd.Client, ts uint64) error
- func CheckLogBackupEnabled(ctx sessionctx.Context) bool
- func CheckLogBackupTaskExist() bool
- func CheckStoreLiveness(s *metapb.Store) error
- func CloneSlice[T any](s []T) []T
- func CompareBytesExt(a []byte, aEmptyAsInf bool, b []byte, bEmptyAsInf bool) int
- func CompareEndKey(a, b []byte) int
- func EncloseDBAndTable(database, table string) string
- func EncloseName(name string) string
- func FallBack2CreateTable(err error) bool
- func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, ...) (*grpc.ClientConn, error)
- func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)
- func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)
- func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64
- func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64
- func GetSysDBCIStrName(tempDB model.CIStr) (model.CIStr, bool)
- func GetSysDBName(tempDB model.CIStr) (string, bool)
- func GetTidbNewCollationEnabled() string
- func HideSensitive(input string) string
- func IntersectAll(s1 []kv.KeyRange, s2 []kv.KeyRange) []kv.KeyRange
- func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error)
- func IsLogBackupInUse(ctx sessionctx.Context) bool
- func IsSysDB(dbLowerName string) bool
- func IsTemplateSysDB(dbname model.CIStr) bool
- func IsTypeCompatible(src types.FieldType, target types.FieldType) bool
- func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error)
- func LogBackupTaskCountDec()
- func LogBackupTaskCountInc()
- func LogEnvVariables()
- func MakeSafePointID() string
- func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)
- func MessageIsNotFoundStorageError(msg string) bool
- func MessageIsPermissionDeniedStorageError(msg string) bool
- func MessageIsRetryableStorageError(msg string) bool
- func NeedAutoID(tblInfo *model.TableInfo) bool
- func NextPowerOfTwo(i int64) int64
- func PanicToErr(err *error)
- func ParseKey(format, key string) ([]byte, error)
- func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error
- func StartDynamicPProfListener(tls *tidbutils.TLS)
- func StartPProfListener(statusAddr string, wrapper *tidbutils.TLS) error
- func StartServiceSafePointKeeper(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error
- func TemporaryDBName(db string) model.CIStr
- func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)
- func UpdateServiceSafePoint(ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint) error
- func WithRetry(ctx context.Context, retryableFunc RetryableFunc, backoffer Backoffer) error
- type BRServiceSafePoint
- type Backoffer
- type CDCNameSet
- type DBExecutor
- type Database
- type FakePDClient
- type Pool
- type ProgressPrinter
- type QueryExecutor
- type RegisterTask
- type RegisterTaskType
- type RegisterTasksList
- type RetryState
- func (rs *RetryState) Attempt() int
- func (rs *RetryState) ExponentialBackoff() time.Duration
- func (rs *RetryState) NextBackoff(error) time.Duration
- func (rs *RetryState) RecordRetry()
- func (rs *RetryState) ReduceRetry()
- func (rs *RetryState) RetryTimes() int
- func (rs *RetryState) ShouldRetry() bool
- func (rs *RetryState) StopRetry()
- type RetryWithBackoffer
- type RetryableFunc
- type StmtExecutor
- type StoreManager
- func (mgr *StoreManager) Close()
- func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters
- func (mgr *StoreManager) PDClient() pd.Client
- func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
- func (mgr *StoreManager) TLSConfig() *tls.Config
- func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error
- type TaskRegister
- type Worker
- type WorkerPool
- func (pool *WorkerPool) Apply(fn taskFunc)
- func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error)
- func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc)
- func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error)
- func (pool *WorkerPool) ApplyWorker() *Worker
- func (pool *WorkerPool) HasWorker() bool
- func (pool *WorkerPool) IdleCount() int
- func (pool *WorkerPool) Limit() int
- func (pool *WorkerPool) RecycleWorker(worker *Worker)
Constants ¶
const ( // region heartbeat are 10 seconds by default, if some region has 2 heartbeat missing (15 seconds), it appear to be a network issue between PD and TiKV. FlashbackRetryTime = 3 FlashbackWaitInterval = 3 * time.Second FlashbackMaxWaitInterval = 15 * time.Second ChecksumRetryTime = 8 ChecksumWaitInterval = 1 * time.Second ChecksumMaxWaitInterval = 30 * time.Second )
const ( CDCPrefix = "/tidb/cdc/" ChangefeedPath = "/changefeed/info/" CDCPrefixV61 = "/tidb/cdc/changefeed/info/" )
const ( RegisterImportTaskPrefix = "/tidb/brie/import" RegisterRetryInternal = 10 * time.Second )
The key format should be {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
const ( // DefaultBRGCSafePointTTL means PD keep safePoint limit at least 5min. DefaultBRGCSafePointTTL = 5 * 60 // DefaultCheckpointGCSafePointTTL means PD keep safePoint limit at least 72 minutes. DefaultCheckpointGCSafePointTTL = 72 * 60 // DefaultStreamStartSafePointTTL specifies keeping the server safepoint 30 mins when start task. DefaultStreamStartSafePointTTL = 1800 // DefaultStreamPauseSafePointTTL specifies Keeping the server safePoint at least 24h when pause task. DefaultStreamPauseSafePointTTL = 24 * 3600 )
const DefaultGcRatioVal = "1.1"
Variables ¶
This section is empty.
Functions ¶
func ArchiveSize ¶
func ArchiveSize(meta *backuppb.BackupMeta) uint64
ArchiveSize returns the total size of the backup archive.
func CheckGCSafePoint ¶
CheckGCSafePoint checks whether the ts is older than GC safepoint. Note: It ignores errors other than exceed GC safepoint.
func CheckLogBackupEnabled ¶
func CheckLogBackupEnabled(ctx sessionctx.Context) bool
CheckLogBackupEnabled checks whether log backup is enabled in the cluster. It is mainly used in three places: 1. The GC worker resolving locks, to scan more locks after the safepoint (every minute). 2. Add index, to skip using lightning (every add-index DDL). 3. Telemetry of log backup feature usage (every 6 hours). NOTE: the result shouldn't be cached by the caller, because it may change at any time within one cluster.
func CheckLogBackupTaskExist ¶
func CheckLogBackupTaskExist() bool
CheckLogBackupTaskExist checks whether a log backup task exists.
func CheckStoreLiveness ¶
CheckStoreLiveness checks whether a store is still alive. Some versions of PD may not set the store state in the gRPC response. We need to check it manually.
func CompareBytesExt ¶
CompareBytesExt compares two byte sequences. Unlike `bytes.Compare`, the caller can specify, for each argument, whether an empty key should be treated as infinity.
func CompareEndKey ¶
CompareEndKey compares two keys that BOTH represent the EXCLUSIVE ending of some range. An empty end key is the very end, so an empty key is greater than any other key. Please note that this function is not applicable if either argument is not an EXCLUSIVE ending of a range.
func EncloseDBAndTable ¶
EncloseDBAndTable formats the database and table name in sql.
func FallBack2CreateTable ¶
func GRPCConn ¶
func GRPCConn(ctx context.Context, storeAddr string, tlsConf *tls.Config, opts ...grpc.DialOption) (*grpc.ClientConn, error)
func GetGcRatio ¶
func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error)
func GetRegionSplitInfo ¶
func GetRegionSplitInfo(ctx sqlexec.RestrictedSQLExecutor) (uint64, int64)
func GetSplitKeys ¶
func GetSplitKeys(ctx sqlexec.RestrictedSQLExecutor) int64
func GetSplitSize ¶
func GetSplitSize(ctx sqlexec.RestrictedSQLExecutor) uint64
func GetSysDBCIStrName ¶
GetSysDBCIStrName gets the CIStr name of the system DB.
func GetSysDBName ¶
GetSysDBName gets the original name of the system DB.
func GetTidbNewCollationEnabled ¶
func GetTidbNewCollationEnabled() string
GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled.
func HideSensitive ¶
HideSensitive replaces the password with ******.
func IntersectAll ¶
IntersectAll returns the intersection of two sets of segments. OWNERSHIP INFORMATION: To run faster, this function MUTATES the input slices (i.e. it takes ownership of them). (But it is promised that this function won't change the `start key` and `end key` slices.) If you want to use the input slices afterwards, call `CloneSlice` on the arguments before passing them.
You can treat "set of segments" as points maybe not adjacent. in this way, IntersectAll(s1, s2) = { point | point in both s1 and s2 } Example: ranges: |___________| |________________| toClampIn: |_____| |____| |________________| result: |_____| |_| |______________| we are assuming the arguments are sorted by the start key and no overlaps. you can call spans.Collapse to get key ranges fits this requirements. Note: this algorithm is pretty like the `checkIntervalIsSubset`, can we get them together?
func IsLogBackupEnabled ¶
func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error)
IsLogBackupEnabled is used for br to check whether tikv has enabled log backup. we use `sqlexec.RestrictedSQLExecutor` as parameter because it's easy to mock. it should return error.
func IsLogBackupInUse ¶
func IsLogBackupInUse(ctx sessionctx.Context) bool
IsLogBackupInUse checks whether a log backup task exists.
func IsSysDB ¶
IsSysDB tests whether the database is system DB. Currently, the only system DB is mysql.
func IsTemplateSysDB ¶
IsTemplateSysDB checks whether the dbname is the temporary system database (__TiDB_BR_Temporary_mysql).
func IsTypeCompatible ¶
IsTypeCompatible checks whether type target is compatible with type src they're compatible if - same null/not null and unsigned flag(maybe we can allow src not null flag, target null flag later) - have same evaluation type - target's flen and decimal should be bigger or equals to src's - elements in target is superset of elements in src if they're enum or set type - same charset and collate if they're string types
func LoadBackupTables ¶
func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error)
LoadBackupTables loads schemas from BackupMeta.
func LogBackupTaskCountDec ¶
func LogBackupTaskCountDec()
LogBackupTaskCountDec decreases the count of log backup task.
func LogBackupTaskCountInc ¶
func LogBackupTaskCountInc()
LogBackupTaskCountInc increases the count of log backup task.
func MakeSafePointID ¶
func MakeSafePointID() string
MakeSafePointID makes a unique safe point ID, to reduce name conflicts.
func MarshalBackupMeta ¶
func MarshalBackupMeta(meta *backuppb.BackupMeta) ([]byte, error)
MarshalBackupMeta converts the backupmeta structure to JSON. Unlike json.Marshal, this function also formats some []byte fields for human reading.
func MessageIsNotFoundStorageError ¶
MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func MessageIsPermissionDeniedStorageError ¶
MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error
func MessageIsRetryableStorageError ¶
MessageIsRetryableStorageError checks whether the message returning from TiKV is retryable ExternalStorageError.
func NeedAutoID ¶
NeedAutoID checks whether the table needs backing up with an autoid.
func NextPowerOfTwo ¶
NextPowerOfTwo returns the smallest power of two greater than or equal to `i` Caller should guarantee that i > 0 and the return value is not overflow.
func PanicToErr ¶
func PanicToErr(err *error)
PanicToErr recovers when the execution panics, and sets the error provided by the argument. Generally, this would be used with a named return value and `defer`, like:
func foo() (err error) { defer utils.PanicToErr(&err) return maybePanic() }
Before using this, there are some hints for reducing resource leakage or bugs:
- If any of clean work (by `defer`) relies on the error (say, when error happens, rollback some operations.), please place `defer this` AFTER that.
- All resources allocated should be freed by the `defer` syntax, or when panicking, they may not be recycled.
func SetGcRatio ¶
func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error
func StartDynamicPProfListener ¶
StartDynamicPProfListener starts the listener that will enable pprof when received `startPProfSignal`.
func StartPProfListener ¶
StartPProfListener forks a new goroutine listening on specified port and provide pprof info.
func StartServiceSafePointKeeper ¶
func StartServiceSafePointKeeper( ctx context.Context, pdClient pd.Client, sp BRServiceSafePoint, ) error
StartServiceSafePointKeeper runs UpdateServiceSafePoint periodically so that the service safepoint is not lost.
func TemporaryDBName ¶
TemporaryDBName makes a 'private' database name.
func UnmarshalBackupMeta ¶
func UnmarshalBackupMeta(data []byte) (*backuppb.BackupMeta, error)
UnmarshalBackupMeta converts the prettied JSON format of backupmeta (made by MarshalBackupMeta) back to the go structure.
func UpdateServiceSafePoint ¶
UpdateServiceSafePoint registers BackupTS to PD, to lock down BackupTS as the safePoint with a TTL in seconds.
Types ¶
type BRServiceSafePoint ¶
BRServiceSafePoint is metadata of service safe point from a BR 'instance'.
func (BRServiceSafePoint) MarshalLogObject ¶
func (sp BRServiceSafePoint) MarshalLogObject(encoder zapcore.ObjectEncoder) error
MarshalLogObject implements zapcore.ObjectMarshaler.
type Backoffer ¶
type Backoffer interface { // NextBackoff returns a duration to wait before retrying again NextBackoff(err error) time.Duration // Attempt returns the remain attempt times Attempt() int }
Backoffer implements a backoff policy for retrying operations.
func NewBackoffer ¶
NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewDownloadSSTBackoffer ¶
func NewDownloadSSTBackoffer() Backoffer
func NewImportSSTBackoffer ¶
func NewImportSSTBackoffer() Backoffer
func NewPDReqBackoffer ¶
func NewPDReqBackoffer() Backoffer
func NewPDReqBackofferExt ¶
func NewPDReqBackofferExt() Backoffer
type CDCNameSet ¶
type CDCNameSet struct {
// contains filtered or unexported fields
}
CDCNameSet saves CDC changefeed's information. nameSet maps `cluster/namespace` to `changefeed`s
func GetCDCChangefeedNameSet ¶
GetCDCChangefeedNameSet gets CDC changefeed information and wraps it into a map. For CDC >= v6.2, the etcd key format is /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>; for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/<changefeedID>.
func (*CDCNameSet) Empty ¶
func (s *CDCNameSet) Empty() bool
Empty reports whether the nameSet is empty, which means no changefeed exists.
func (*CDCNameSet) MessageToUser ¶
func (s *CDCNameSet) MessageToUser() string
MessageToUser converts the map `nameSet` into a readable message for the user.
type DBExecutor ¶
type DBExecutor interface { StmtExecutor BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) }
DBExecutor is an interface for statements and transactions.
type FakePDClient ¶
func (FakePDClient) GetAllStores ¶
func (c FakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error)
GetAllStores return fake stores.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a lazy pool of gRPC channels. When `Get` called, it lazily allocates new connection if connection not full. If it's full, then it will return allocated channels round-robin.
func NewConnPool ¶
NewConnPool creates a new Pool by the specified conn factory function and capacity.
type ProgressPrinter ¶
type ProgressPrinter struct {
// contains filtered or unexported fields
}
ProgressPrinter prints a progress bar.
func NewProgressPrinter ¶
func NewProgressPrinter( name string, total int64, redirectLog bool, ) *ProgressPrinter
NewProgressPrinter returns a new progress printer.
func StartProgress ¶
func StartProgress( ctx context.Context, name string, total int64, redirectLog bool, log logFunc, ) *ProgressPrinter
StartProgress starts progress bar.
func (*ProgressPrinter) Close ¶
func (pp *ProgressPrinter) Close()
Close closes the current progress bar.
func (*ProgressPrinter) GetCurrent ¶
func (pp *ProgressPrinter) GetCurrent() int64
GetCurrent get the current progress.
func (*ProgressPrinter) Inc ¶
func (pp *ProgressPrinter) Inc()
Inc increases the current progress bar.
func (*ProgressPrinter) IncBy ¶
func (pp *ProgressPrinter) IncBy(cnt int64)
IncBy implements glue.Progress
type QueryExecutor ¶
type QueryExecutor interface { QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row }
QueryExecutor is an interface for executing queries.
type RegisterTask ¶
RegisterTask saves the task's information
func (RegisterTask) MessageToUser ¶
func (task RegisterTask) MessageToUser() string
MessageToUser marshals the task into a message for the user.
type RegisterTaskType ¶
type RegisterTaskType int
RegisterTaskType for the sub-prefix path for key
const ( RegisterRestore RegisterTaskType = iota RegisterLightning )
func (RegisterTaskType) String ¶
func (tp RegisterTaskType) String() string
type RegisterTasksList ¶
type RegisterTasksList struct {
Tasks []RegisterTask
}
func GetImportTasksFrom ¶
GetImportTasksFrom tries to get all the import tasks with the prefix `RegisterImportTaskPrefix`.
func (RegisterTasksList) Empty ¶
func (list RegisterTasksList) Empty() bool
func (RegisterTasksList) MessageToUser ¶
func (list RegisterTasksList) MessageToUser() string
type RetryState ¶
type RetryState struct {
// contains filtered or unexported fields
}
RetryState is the mutable state needed for retrying. It is like the `utils.Backoffer`, but more fundamental: it only controls the backoff time and knows nothing about what error happened. NOTE: Maybe also implement the backoffer via this.
func InitialRetryState ¶
func InitialRetryState(maxRetryTimes int, initialBackoff, maxBackoff time.Duration) RetryState
InitialRetryState makes the initial state for retrying.
func (*RetryState) Attempt ¶
func (rs *RetryState) Attempt() int
Attempt implements the `Backoffer`. TODO: Maybe use this to replace the `exponentialBackoffer` (which is nearly homomorphic to this)?
func (*RetryState) ExponentialBackoff ¶
func (rs *RetryState) ExponentialBackoff() time.Duration
ExponentialBackoff gets the exponential backoff duration and transforms the state.
func (*RetryState) NextBackoff ¶
func (rs *RetryState) NextBackoff(error) time.Duration
NextBackoff implements the `Backoffer`.
func (*RetryState) RecordRetry ¶
func (rs *RetryState) RecordRetry()
RecordRetry simply records the retry times, without backing off.
func (*RetryState) ReduceRetry ¶
func (rs *RetryState) ReduceRetry()
ReduceRetry reduces the retry times by 1.
func (*RetryState) RetryTimes ¶
func (rs *RetryState) RetryTimes() int
RetryTimes returns the retry times. usage: unit test.
func (*RetryState) ShouldRetry ¶
func (rs *RetryState) ShouldRetry() bool
Whether in the current state we can retry.
func (*RetryState) StopRetry ¶
func (rs *RetryState) StopRetry()
type RetryWithBackoffer ¶
type RetryWithBackoffer struct {
// contains filtered or unexported fields
}
RetryWithBackoffer is a simple context for a "mixed" retry. Some TiDB APIs, say, `ResolveLock`, require a `tikv.Backoffer` as an argument. But the `tikv.Backoffer` isn't very customizable: it has some predefined configurations, but we cannot create new ones. So we mix the flavour of `tikv.Backoffer` with our homemade backoff strategy. That is what the `RetryWithBackoffer` does.
func AdaptTiKVBackoffer ¶
func AdaptTiKVBackoffer(ctx context.Context, maxSleepMs int, baseErr error) *RetryWithBackoffer
AdaptTiKVBackoffer creates an "ad-hoc" backoffer, which wraps a backoffer and provides some new functions: when backing off, we can manually provide it a specified sleep duration instead of directly providing a retry.Config, which is sealed in "client-go/internal".
func (*RetryWithBackoffer) BackOff ¶
func (r *RetryWithBackoffer) BackOff() error
BackOff executes the back off: sleep for a precalculated backoff time. See `RequestBackOff` for more details.
func (*RetryWithBackoffer) Inner ¶
func (r *RetryWithBackoffer) Inner() *tikv.Backoffer
Inner returns the reference to the inner `backoffer`.
func (*RetryWithBackoffer) MaxSleepInMS ¶
func (r *RetryWithBackoffer) MaxSleepInMS() int
MaxSleepInMS returns the max sleep time for the retry context in ms.
func (*RetryWithBackoffer) NextSleepInMS ¶
func (r *RetryWithBackoffer) NextSleepInMS() int
NextSleepInMS returns the time `BackOff` will sleep in ms of the state.
func (*RetryWithBackoffer) RequestBackOff ¶
func (r *RetryWithBackoffer) RequestBackOff(ms int)
RequestBackOff registers the intent of backing off for at least n milliseconds. That intent will be fulfilled when calling `BackOff`.
func (*RetryWithBackoffer) TotalSleepInMS ¶
func (r *RetryWithBackoffer) TotalSleepInMS() int
TotalSleepInMS returns the total time slept in ms.
type StmtExecutor ¶
type StmtExecutor interface { QueryExecutor ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) }
StmtExecutor defines both query and exec methods.
type StoreManager ¶
type StoreManager struct {
// contains filtered or unexported fields
}
func NewStoreManager ¶
func NewStoreManager(pdCli pd.Client, kl keepalive.ClientParameters, tlsConf *tls.Config) *StoreManager
NewStoreManager creates a new manager for gRPC connections to stores.
func (*StoreManager) GetKeepalive ¶
func (mgr *StoreManager) GetKeepalive() keepalive.ClientParameters
func (*StoreManager) PDClient ¶
func (mgr *StoreManager) PDClient() pd.Client
func (*StoreManager) ResetBackupClient ¶
func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error)
ResetBackupClient reset the connection for backup client.
func (*StoreManager) TLSConfig ¶
func (mgr *StoreManager) TLSConfig() *tls.Config
func (*StoreManager) WithConn ¶
func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error
type TaskRegister ¶
type TaskRegister struct {
// contains filtered or unexported fields
}
TaskRegister can register the task to PD with a lease, and keepalive it in the background
func NewTaskRegister ¶
func NewTaskRegister(client *clientv3.Client, tp RegisterTaskType, taskName string) *TaskRegister
NewTaskRegister builds a TaskRegister with key format {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
func NewTaskRegisterWithTTL ¶
func NewTaskRegisterWithTTL(client *clientv3.Client, ttl time.Duration, tp RegisterTaskType, taskName string) *TaskRegister
NewTaskRegisterWithTTL builds a TaskRegister with key format {RegisterImportTaskPrefix}/{RegisterTaskType}/{taskName}
func (*TaskRegister) Close ¶
func (tr *TaskRegister) Close(ctx context.Context) (err error)
Close closes the background task of taskRegister
func (*TaskRegister) RegisterTask ¶
func (tr *TaskRegister) RegisterTask(c context.Context) error
RegisterTask first puts its key to PD with a lease, and then starts keeping the lease alive in the background.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool contains a pool of workers.
func NewWorkerPool ¶
func NewWorkerPool(limit uint, name string) *WorkerPool
NewWorkerPool returns a WorkerPool.
func (*WorkerPool) ApplyOnErrorGroup ¶
func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error)
ApplyOnErrorGroup executes a task in an errorgroup.
func (*WorkerPool) ApplyWithID ¶
func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc)
ApplyWithID executes a task and provides it with the worker ID.
func (*WorkerPool) ApplyWithIDInErrorGroup ¶
func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error)
ApplyWithIDInErrorGroup executes a task in an errorgroup and provides it with the worker ID.
func (*WorkerPool) ApplyWorker ¶
func (pool *WorkerPool) ApplyWorker() *Worker
ApplyWorker applies for a worker.
func (*WorkerPool) HasWorker ¶
func (pool *WorkerPool) HasWorker() bool
HasWorker checks if the pool has unallocated workers.
func (*WorkerPool) IdleCount ¶
func (pool *WorkerPool) IdleCount() int
IdleCount counts how many idle workers in the pool.
func (*WorkerPool) RecycleWorker ¶
func (pool *WorkerPool) RecycleWorker(worker *Worker)
RecycleWorker recycles a worker.