util

package
v1.1.0-beta.0...-3ac2b49
Published: Dec 30, 2024 License: Apache-2.0 Imports: 68 Imported by: 0

Documentation

Constants

const (

	// NewSessionDefaultRetryCnt is the default number of retries when creating a new session.
	NewSessionDefaultRetryCnt = 3
	// NewSessionRetryUnlimited means unlimited retries when creating a new session.
	NewSessionRetryUnlimited = math.MaxInt64
)
const (
	// DefaultMaxRetries indicates the max retry count.
	DefaultMaxRetries = 30
	// RetryInterval indicates retry interval.
	RetryInterval uint64 = 500
)
const (
	// Country is type name for country.
	Country = "C"
	// Organization is type name for organization.
	Organization = "O"
	// OrganizationalUnit is type name for organizational unit.
	OrganizationalUnit = "OU"
	// Locality is type name for locality.
	Locality = "L"
	// Email is type name for email.
	Email = "emailAddress"
	// CommonName is type name for common name.
	CommonName = "CN"
	// Province is type name for province or state.
	Province = "ST"
)
const (
	// URI indicates uri info in SAN.
	URI = SANType("URI")
	// DNS indicates dns info in SAN.
	DNS = SANType("DNS")
	// IP indicates ip info in SAN.
	IP = SANType("IP")
)
const (
	// SyntaxErrorPrefix is the common prefix for SQL syntax error in TiDB.
	SyntaxErrorPrefix = "You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use"
)

Variables

var (
	// InformationSchemaName is the `INFORMATION_SCHEMA` database name.
	InformationSchemaName = pmodel.NewCIStr("INFORMATION_SCHEMA")
	// PerformanceSchemaName is the `PERFORMANCE_SCHEMA` database name.
	PerformanceSchemaName = pmodel.NewCIStr("PERFORMANCE_SCHEMA")
	// MetricSchemaName is the `METRICS_SCHEMA` database name.
	MetricSchemaName = pmodel.NewCIStr("METRICS_SCHEMA")
	// ClusterTableInstanceColumnName is the `INSTANCE` column name of the cluster table.
	ClusterTableInstanceColumnName = "INSTANCE"
)
var (
	Version   = "None"
	BuildTS   = "None"
	GitHash   = "None"
	GitBranch = "None"
)

Version information.

var GetSequenceByName func(is infoschema.MetaOnlyInfoSchema, schema, sequence pmodel.CIStr) (SequenceTable, error)

GetSequenceByName can be used in the expression package without causing an import cycle.

var SupportCipher = make(map[string]struct{}, len(tlsCipherString))

SupportCipher maintains the ciphers supported by TiDB.

Functions

func CheckIfSameCluster

func CheckIfSameCluster(
	ctx context.Context,
	pdAddrsGetter, pdAddrsGetter2 func(context.Context) ([]string, error),
) (bool, []string, []string, error)

CheckIfSameCluster reads PD addresses registered in etcd from two sources, to check if there are common addresses in both sources. If there are common addresses, the first return value is true which means we have confidence that the two sources are in the same cluster. If there are no common addresses, the first return value is false, which means 1) the two sources are in different clusters, or 2) the two sources may be in the same cluster but the getter function does not return the common addresses.

The getters should return addresses in the same format: for example, either both include the URL scheme or neither does.

The second and third return values are the PD addresses from the first and second getters respectively. The fourth return value is the error, if one occurred.
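The comparison described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `hasCommonAddr`, `checkSame`, `fixed`, and the sample addresses are hypothetical names introduced here. Addresses are compared as plain strings, which shows why both getters need to use the same format.

```go
package main

import (
	"context"
	"fmt"
)

// addrsGetter mirrors the getter parameter type in the signature above.
type addrsGetter func(context.Context) ([]string, error)

// hasCommonAddr reports whether the two lists share at least one identical
// string. Addresses are compared verbatim, so "http://a:2379" and "a:2379"
// do not match.
func hasCommonAddr(a, b []string) bool {
	seen := make(map[string]struct{}, len(a))
	for _, addr := range a {
		seen[addr] = struct{}{}
	}
	for _, addr := range b {
		if _, ok := seen[addr]; ok {
			return true
		}
	}
	return false
}

// checkSame is a hypothetical stand-in for the check described above.
func checkSame(ctx context.Context, g1, g2 addrsGetter) (bool, []string, []string, error) {
	addrs1, err := g1(ctx)
	if err != nil {
		return false, nil, nil, err
	}
	addrs2, err := g2(ctx)
	if err != nil {
		return false, addrs1, nil, err
	}
	return hasCommonAddr(addrs1, addrs2), addrs1, addrs2, nil
}

// fixed returns a getter that always yields the given addresses.
func fixed(addrs ...string) addrsGetter {
	return func(context.Context) ([]string, error) { return addrs, nil }
}

func main() {
	same, a, b, err := checkSame(context.Background(),
		fixed("10.0.0.1:2379", "10.0.0.2:2379"),
		fixed("10.0.0.2:2379", "10.0.0.3:2379"))
	fmt.Println(same, a, b, err)
}
```

A false result from such a check is inconclusive, as the description notes: the getters may simply have returned disjoint subsets of the same cluster's addresses.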

func CheckSupportX509NameOneline

func CheckSupportX509NameOneline(oneline string) (err error)

CheckSupportX509NameOneline parses the input string, validates that it is in X509_NAME_oneline format, and prechecks that each check item is supported by TiDB. See https://www.openssl.org/docs/manmaster/man3/X509_NAME_oneline.html

func ClientWithTLS

func ClientWithTLS(tlsCfg *tls.Config) *http.Client

ClientWithTLS creates an HTTP client with TLS.

func ColumnToProto

func ColumnToProto(c *model.ColumnInfo, forIndex bool, isTiFlashStore bool) *tipb.ColumnInfo

ColumnToProto converts model.ColumnInfo to tipb.ColumnInfo.

func ColumnsToProto

func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool, forIndex bool, isTiFlashStore bool) []*tipb.ColumnInfo

ColumnsToProto converts a slice of model.ColumnInfo to a slice of tipb.ColumnInfo.

func ComposeURL

func ComposeURL(address, path string) string

ComposeURL adds the HTTP scheme if it is missing and concatenates address with path.
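A minimal sketch of this behaviour, under stated assumptions: `composeURL` is a hypothetical stand-in, and it always falls back to "http://", whereas the real function may pick the scheme differently (for example via InternalHTTPSchema).

```go
package main

import (
	"fmt"
	"strings"
)

// composeURL is a hypothetical stand-in. It assumes the default scheme is
// "http://"; the real function may choose between http and https.
func composeURL(address, path string) string {
	if strings.HasPrefix(address, "http://") || strings.HasPrefix(address, "https://") {
		// The scheme is already present, so only append the path.
		return address + path
	}
	return "http://" + address + path
}

func main() {
	fmt.Println(composeURL("127.0.0.1:10080", "/status"))
	fmt.Println(composeURL("https://pd0:2379", "/pd/api/v1/members"))
}
```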

func CreateCertificates

func CreateCertificates(certpath string, keypath string, rsaKeySize int, pubKeyAlgo x509.PublicKeyAlgorithm,
	signAlgo x509.SignatureAlgorithm) error

CreateCertificates creates and writes a cert based on the params.

func DelKeyWithPrefix

func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error

DelKeyWithPrefix deletes keys with prefix.

func FmtNonASCIIPrintableCharToHex

func FmtNonASCIIPrintableCharToHex(str string) string

FmtNonASCIIPrintableCharToHex turns non-printable-ASCII characters into hex.
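The idea can be illustrated with a short sketch. Two details here are assumptions rather than documented behaviour: the printable range is taken to be 0x20 through 0x7E, and each other byte is written as `\x` followed by two uppercase hex digits. `isPrintableASCII` and `escapeNonPrintable` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// isPrintableASCII assumes the printable range is 0x20 (space) to 0x7E (~).
func isPrintableASCII(b byte) bool {
	return b >= 0x20 && b <= 0x7E
}

// escapeNonPrintable is a hypothetical stand-in: it copies printable bytes
// and writes every other byte as \xNN. The exact output format of the real
// function is an assumption here.
func escapeNonPrintable(s string) string {
	var sb strings.Builder
	for i := 0; i < len(s); i++ {
		if isPrintableASCII(s[i]) {
			sb.WriteByte(s[i])
			continue
		}
		fmt.Fprintf(&sb, `\x%02X`, s[i])
	}
	return sb.String()
}

func main() {
	fmt.Println(escapeNonPrintable("tab\there"))
}
```

The loop works on bytes rather than runes, so a multi-byte UTF-8 character is escaped one byte at a time.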

func FormatLeaseID

func FormatLeaseID(id clientv3.LeaseID) string

FormatLeaseID formats the lease ID as a hex string, as etcdctl does. See https://github.com/etcd-io/etcd/blob/995027f5c1363404e86f7a858ea2833df01f0954/etcdctl/ctlv3/command/printer_simple.go#L118

func GenLogFields

func GenLogFields(costTime time.Duration, info *ProcessInfo, needTruncateSQL bool) []zap.Field

GenLogFields generates log fields.

func GenRLimit

func GenRLimit(source string) uint64

GenRLimit gets the RLIMIT_NOFILE limit.

func GetCPUPercentage

func GetCPUPercentage() float64

GetCPUPercentage calculates CPU usage and returns the percentage as a float64 (e.g. 2.5 means 2.5%). See http://man7.org/linux/man-pages/man2/getrusage.2.html

func GetGOGC

func GetGOGC() int

GetGOGC returns the current value of GOGC.

func GetJSON

func GetJSON(client *http.Client, url string, v any) error

GetJSON fetches a page and parses it as JSON. The parsed result will be stored into the `v`. The variable `v` must be a pointer to a type that can be unmarshalled from JSON.

Example:

client := &http.Client{}
var resp struct { IP string }
if err := util.GetJSON(client, "http://api.ipify.org/?format=json", &resp); err != nil {
	return errors.Trace(err)
}
fmt.Println(resp.IP)

func GetLocalIP

func GetLocalIP() string

GetLocalIP returns a local IP (non-loopback, non-0.0.0.0) if there is one.

func GetPDsAddrWithoutScheme

func GetPDsAddrWithoutScheme(db *sql.DB) func(context.Context) ([]string, error)

GetPDsAddrWithoutScheme returns a function that reads the first etcd client URL of every PD node through a SQL query. This is done by querying the INFORMATION_SCHEMA.CLUSTER_INFO table and its executor memtableRetriever.dataForTiDBClusterInfo.

func GetRawInfo

func GetRawInfo(app string) string

GetRawInfo does what its name says.

func GetRecoverError

func GetRecoverError(r any) error

GetRecoverError gets the error from recover.
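A minimal sketch of turning a recovered value into an error. `recoverError` and `safeCall` are hypothetical names, and the conversion rule (return the value if it already is an error, otherwise format it) is an assumption about how such a helper typically works, not a statement about this package's code.

```go
package main

import "fmt"

// recoverError converts the value returned by recover() into an error.
// Assumption: an error value is passed through, anything else is formatted.
func recoverError(r any) error {
	if err, ok := r.(error); ok {
		return err
	}
	return fmt.Errorf("%v", r)
}

// safeCall runs fn and reports a panic, if any, as an ordinary error.
func safeCall(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = recoverError(r)
		}
	}()
	fn()
	return nil
}

func main() {
	fmt.Println(safeCall(func() { panic("boom") }))
	fmt.Println(safeCall(func() {}))
}
```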

func GetValuesList

func GetValuesList(lower, upper []byte, num int, valuesList [][]byte) [][]byte

GetValuesList is used to get `num` values between the lower and upper values. To simplify the explanation, suppose the lower and upper values are of type int64, with lower=0, upper=100, and num=10. The step is then (upper-lower)/num=10, and the function returns 0+10, 10+10, 20+10, and so on, for a total of 9 (num-1) values: [10,20,30,40,50,60,70,80,90]. The difference is that lower and upper are actually of type []byte, so getUint64FromBytes is used to convert []byte to uint64.
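The arithmetic in this explanation can be reproduced with plain integers. The sketch below is a simplified illustration only: `splitRange` is a hypothetical helper that works on uint64 values directly and skips the []byte conversion that the real function performs.

```go
package main

import "fmt"

// splitRange returns num-1 evenly spaced values strictly between lower and
// upper, using step = (upper-lower)/num as described above. It returns nil
// when no split is possible.
func splitRange(lower, upper uint64, num int) []uint64 {
	if num <= 1 || upper <= lower {
		return nil
	}
	step := (upper - lower) / uint64(num)
	if step == 0 {
		return nil
	}
	values := make([]uint64, 0, num-1)
	cur := lower
	for i := 0; i < num-1; i++ {
		cur += step
		values = append(values, cur)
	}
	return values
}

func main() {
	// lower=0, upper=100, num=10 gives step=10 and nine split points.
	fmt.Println(splitRange(0, 100, 10))
}
```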

func HasCancelled

func HasCancelled(ctx context.Context) (cancel bool)

HasCancelled checks whether the context has been cancelled.
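A non-blocking cancellation check is commonly written with a `select` that has a `default` branch. The sketch below shows that pattern; `hasCancelled` and `cancelDemo` are hypothetical names, and the code is not taken from this package.

```go
package main

import (
	"context"
	"fmt"
)

// hasCancelled reports whether ctx is done, without blocking.
func hasCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// cancelDemo reports the check result before and after cancelling a context.
func cancelDemo() (before, after bool) {
	ctx, cancel := context.WithCancel(context.Background())
	before = hasCancelled(ctx)
	cancel()
	after = hasCancelled(ctx)
	return before, after
}

func main() {
	fmt.Println(cancelDemo())
}
```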

func InternalHTTPClient

func InternalHTTPClient() *http.Client

InternalHTTPClient is used by TiDB-Server to request other components.

func InternalHTTPSchema

func InternalHTTPSchema() string

InternalHTTPSchema specifies whether to use http or https to request other components.

func IsInCorrectIdentifierName

func IsInCorrectIdentifierName(name string) bool

IsInCorrectIdentifierName checks if the identifier is incorrect. See https://dev.mysql.com/doc/refman/5.7/en/identifiers.html

func IsMemDB

func IsMemDB(dbLowerName string) bool

IsMemDB checks whether dbLowerName is a memory database.

func IsMemOrSysDB

func IsMemOrSysDB(dbLowerName string) bool

IsMemOrSysDB checks whether dbLowerName is a memory database or a system database.

func IsSysDB

func IsSysDB(dbLowerName string) bool

IsSysDB checks whether dbLowerName is a system database.

func IsSystemView

func IsSystemView(dbLowerName string) bool

IsSystemView is similar to IsMemOrSysDB, but does not include the mysql schema.

func LoadTLSCertificates

func LoadTLSCertificates(ca, key, cert string, autoTLS bool, rsaKeySize int) (tlsConfig *tls.Config, autoReload bool, err error)

LoadTLSCertificates loads the CA, key, and certificate from the specified paths.

func MockPkixAttribute

func MockPkixAttribute(name, value string) pkix.AttributeTypeAndValue

MockPkixAttribute generates a mock AttributeTypeAndValue. It is only used for tests.

func NewSession

func NewSession(ctx context.Context, logPrefix string, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error)

NewSession creates a new etcd session.

func NewTCPConnWithIOCounter

func NewTCPConnWithIOCounter(conn *net.TCPConn, c *atomic.Uint64) net.Conn

NewTCPConnWithIOCounter creates a new TCPConnWithIOCounter.

func NewTLSConfig

func NewTLSConfig(opts ...TLSConfigOption) (*tls.Config, error)

NewTLSConfig creates a tls.Config from the given options. If no certificate is provided, it will return (nil, nil).

func OriginError

func OriginError(err error) error

OriginError returns the original error.

func ParseAndCheckSAN

func ParseAndCheckSAN(san string) (map[SANType][]string, error)

ParseAndCheckSAN parses and checks the SAN string.

func ParseHostPortAddr

func ParseHostPortAddr(s string) ([]string, error)

ParseHostPortAddr returns a scheme://host:port or host:port list

func PrintInfo

func PrintInfo(app string)

PrintInfo prints the app's basic information in the log.

func PrintableASCII

func PrintableASCII(b byte) bool

PrintableASCII detects whether b is a printable ASCII character. See http://facweb.cs.depaul.edu/sjost/it212/documents/ascii-pr.htm

func ReadLine

func ReadLine(reader *bufio.Reader, maxLineSize int) ([]byte, error)

ReadLine tries to read a complete line from bufio.Reader. maxLineSize specifies the maximum size of a single line.

func ReadLines

func ReadLines(reader *bufio.Reader, count int, maxLineSize int) ([][]byte, error)

ReadLines tries to read lines from bufio.Reader. count specifies the number of lines. maxLineSize specifies the maximum size of a single line.

func Recover

func Recover(metricsLabel, funcInfo string, recoverFn func(), quit bool)

Recover includes operations such as recovering, clearing, and printing information. It dumps the current goroutine stack into the log if it catches any recover result.

metricsLabel: The label of PanicCounter metrics.
funcInfo:     Some information for the panic function.
recoverFn:    Handler will be called after recover and before dump stack, passing `nil` means noop.
quit:         If this value is true, the current program exits after recovery.

func RowKeyPrefixFilter

func RowKeyPrefixFilter(rowKeyPrefix kv.Key) kv.FnKeyCmp

RowKeyPrefixFilter returns a function which checks whether currentKey has decoded rowKeyPrefix as prefix.

func RunWithRetry

func RunWithRetry(retryCnt int, backoff uint64, f func() (bool, error)) (err error)

RunWithRetry runs f with backoff and retry.

retryCnt: Max retry count.
backoff:  When f fails, it sleeps for backoff * triedCount milliseconds.

Function f should have two return values. The first is a bool that indicates whether the error is retryable. The second is the error, if any, that f encountered.
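The retry contract described for RunWithRetry can be sketched as below. This is an illustration built from the description, not the package's source: `runWithRetry`, `flaky`, and `errTransient` are hypothetical names, and the linear sleep of backoff * attempt milliseconds follows the wording of the doc comment.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical retryable error used by the demo.
var errTransient = errors.New("transient failure")

// runWithRetry calls f up to retryCnt times. It stops early when f succeeds
// or reports a non-retryable error, and sleeps backoff*attempt milliseconds
// between attempts.
func runWithRetry(retryCnt int, backoff uint64, f func() (bool, error)) (err error) {
	for i := 1; i <= retryCnt; i++ {
		var retryable bool
		retryable, err = f()
		if err == nil || !retryable {
			return err
		}
		time.Sleep(time.Duration(backoff*uint64(i)) * time.Millisecond)
	}
	return err
}

// flaky returns a function that fails with a retryable error for its first
// `failures` calls and then succeeds; calls counts the invocations.
func flaky(failures int, calls *int) func() (bool, error) {
	return func() (bool, error) {
		*calls++
		if *calls <= failures {
			return true, errTransient
		}
		return false, nil
	}
}

func main() {
	calls := 0
	err := runWithRetry(5, 1, flaky(2, &calls))
	fmt.Println("calls:", calls, "err:", err)
}
```

Returning the retryable flag alongside the error lets the caller's function decide per failure whether another attempt is worthwhile, so permanent errors do not consume the whole retry budget.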

func ScanMetaWithPrefix

func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error

ScanMetaWithPrefix scans metadata with the prefix.

func SetGOGC

func SetGOGC(val int) int

SetGOGC updates GOGC and related metrics.

func SliceToMap

func SliceToMap(slice []string) map[string]any

SliceToMap converts a slice to a map.

func Str2Int64Map

func Str2Int64Map(str string) map[int64]struct{}

Str2Int64Map converts a string to a map[int64]struct{}.

func StringsToInterfaces

func StringsToInterfaces(strs []string) []any

StringsToInterfaces converts a string slice to an interface slice.

func SyntaxError

func SyntaxError(err error) error

SyntaxError converts parser error to TiDB's syntax error.

func SyntaxWarn

func SyntaxWarn(err error) error

SyntaxWarn converts parser warn to TiDB's syntax warn.

func TLSCipher2String

func TLSCipher2String(n uint16) string

TLSCipher2String converts a TLS cipher number to its string name. Taken from https://testssl.sh/openssl-rfc.mapping.html.

func ToTLSConfig

func ToTLSConfig(caPath, certPath, keyPath string) (*tls.Config, error)

ToTLSConfig generates a TLS config.

func ToTLSConfigWithVerify

func ToTLSConfigWithVerify(caPath, certPath, keyPath string, verifyCN []string) (*tls.Config, error)

ToTLSConfigWithVerify constructs a `*tls.Config` from the CA, certificate, and key paths, and adds verification for CN.

If the CA path is empty, returns nil.

func WithRecovery

func WithRecovery(exec func(), recoverFn func(r any))

WithRecovery wraps a goroutine startup call with forced recovery. It dumps the current goroutine stack into the log if it catches any recover result.

exec:      execute logic function.
recoverFn: handler will be called after recover and before dump stack, passing `nil` means noop.
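The order of operations described above (recover, call the handler, then dump the stack) can be sketched like this. It is an illustration under assumptions: `withRecovery` is a hypothetical stand-in that logs with the standard `log` package, and it calls recoverFn whether or not a panic occurred, passing nil when there was none.

```go
package main

import (
	"fmt"
	"log"
	"runtime/debug"
)

// withRecovery runs exec and recovers from any panic it raises.
// Assumption: recoverFn is always invoked with the recover result (nil when
// there was no panic) before the stack is written to the log.
func withRecovery(exec func(), recoverFn func(r any)) {
	defer func() {
		r := recover()
		if recoverFn != nil {
			recoverFn(r)
		}
		if r != nil {
			log.Printf("panic recovered: %v\n%s", r, debug.Stack())
		}
	}()
	exec()
}

func main() {
	withRecovery(
		func() { panic("task failed") },
		func(r any) { fmt.Println("recovered value:", r) },
	)
	fmt.Println("still running")
}
```

Because the handler runs inside the deferred function, it is a natural place to release resources or record the failure before the goroutine exits.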

func X509NameOnline

func X509NameOnline(n pkix.Name) string

X509NameOnline prints pkix.Name into old X509_NAME_oneline format. https://www.openssl.org/docs/manmaster/man3/X509_NAME_oneline.html

Types

type ErrorGroupWithRecover

type ErrorGroupWithRecover struct {
	*errgroup.Group
}

ErrorGroupWithRecover recovers panics from an error group. Please note that a panic breaks the control flow unexpectedly: even if it is recovered, some key logic such as Mutex.Unlock() may be skipped, and continuing to run may cause unexpected behaviour. Use it with caution.

func NewErrorGroupWithRecover

func NewErrorGroupWithRecover() *ErrorGroupWithRecover

NewErrorGroupWithRecover creates an ErrorGroupWithRecover.

func NewErrorGroupWithRecoverWithCtx

func NewErrorGroupWithRecoverWithCtx(ctx context.Context) (*ErrorGroupWithRecover, context.Context)

NewErrorGroupWithRecoverWithCtx is like errgroup.WithContext, but returns an ErrorGroupWithRecover.

func (*ErrorGroupWithRecover) Go

func (g *ErrorGroupWithRecover) Go(fn func() error)

Go is like errgroup.Group.Go, but converts a panic and its stack into an error.

type IDGenerator

type IDGenerator struct {
	// contains filtered or unexported fields
}

IDGenerator is a utility type used to generate auto-increasing IDs.

func (*IDGenerator) GetNextID

func (g *IDGenerator) GetNextID() int

GetNextID returns the next auto-increasing ID.

type OOMAlarmVariablesInfo

type OOMAlarmVariablesInfo struct {
	SessionAnalyzeVersion         int
	SessionEnabledRateLimitAction bool
	SessionMemQuotaQuery          int64
}

OOMAlarmVariablesInfo is a struct for OOM alarm variables.

type ProcessInfo

type ProcessInfo struct {
	Time                time.Time
	ExpensiveLogTime    time.Time
	ExpensiveTxnLogTime time.Time
	CurTxnCreateTime    time.Time
	Plan                any
	CursorTracker       cursor.Tracker
	StmtCtx             *stmtctx.StatementContext
	// SQLCPUUsage should be set nil for sleep command
	SQLCPUUsage           *ppcpuusage.SQLCPUUsages
	RefCountOfStmtCtx     *stmtctx.ReferenceCount
	MemTracker            *memory.Tracker
	DiskTracker           *disk.Tracker
	RunawayChecker        resourcegroup.RunawayChecker
	StatsInfo             func(any) map[string]uint64
	RuntimeStatsColl      *execdetails.RuntimeStatsColl
	User                  string
	Digest                string
	Host                  string
	DB                    string
	Info                  string
	Port                  string
	ResourceGroupName     string
	SessionAlias          string
	RedactSQL             string
	IndexNames            []string
	TableIDs              []int64
	PlanExplainRows       [][]string
	OOMAlarmVariablesInfo OOMAlarmVariablesInfo
	ID                    uint64
	CurTxnStartTS         uint64
	// MaxExecutionTime is the timeout for select statement, in milliseconds.
	// If the query takes too long, kill it.
	MaxExecutionTime uint64
	State            uint16
	Command          byte
}

ProcessInfo is a struct used for show processlist statement.

func (*ProcessInfo) Clone

func (pi *ProcessInfo) Clone() *ProcessInfo

Clone returns a shallow copy of this ProcessInfo.

func (*ProcessInfo) String

func (pi *ProcessInfo) String() string

func (*ProcessInfo) ToRow

func (pi *ProcessInfo) ToRow(tz *time.Location) []any

ToRow returns []interface{} for the row data of "SELECT * FROM INFORMATION_SCHEMA.PROCESSLIST".

func (*ProcessInfo) ToRowForShow

func (pi *ProcessInfo) ToRowForShow(full bool) []any

ToRowForShow returns []interface{} for the row data of "SHOW [FULL] PROCESSLIST".

type SANType

type SANType string

SANType is enum value for GlobalPrivValue.SANs keys.

type SequenceTable

type SequenceTable interface {
	GetSequenceID() int64
	GetSequenceNextVal(ctx any, dbName, seqName string) (int64, error)
	SetSequenceVal(ctx any, newVal int64, dbName, seqName string) (int64, bool, error)
}

SequenceTable is implemented by tableCommon and specialises in handling sequence operations. Calling the table package directly would cause an import cycle.

type SessionManager

type SessionManager interface {
	ShowProcessList() map[uint64]*ProcessInfo
	ShowTxnList() []*txninfo.TxnInfo
	GetProcessInfo(id uint64) (*ProcessInfo, bool)
	Kill(connectionID uint64, query bool, maxExecutionTime bool, runaway bool)
	KillAllConnections()
	UpdateTLSConfig(cfg *tls.Config)
	ServerID() uint64
	// StoreInternalSession puts the internal session pointer to the map in the SessionManager.
	StoreInternalSession(se any)
	// DeleteInternalSession deletes the internal session pointer from the map in the SessionManager.
	DeleteInternalSession(se any)
	// GetInternalSessionStartTSList gets all startTS of every transactions running in the current internal sessions.
	GetInternalSessionStartTSList() []uint64
	// CheckOldRunningTxn checks if there is an old transaction running in the current sessions
	CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string)
	// KillNonFlashbackClusterConn kill all non flashback cluster connections.
	KillNonFlashbackClusterConn()
	// GetConAttrs gets the connection attributes
	GetConAttrs(user *auth.UserIdentity) map[uint64]map[string]string
}

SessionManager is an interface for session management. The SHOW PROCESSLIST and KILL statements rely on this interface.

type SessionPool

type SessionPool interface {
	Get() (pools.Resource, error)
	Put(pools.Resource)
	Close()
}

SessionPool is a recyclable resource pool for the session.

func NewSessionPool

func NewSessionPool(capacity int, factory pools.Factory, getCallback, putCallback resourceCallback) SessionPool

NewSessionPool creates a new session pool with the given capacity and factory function.

type TCPConnWithIOCounter

type TCPConnWithIOCounter struct {
	*net.TCPConn
	// contains filtered or unexported fields
}

TCPConnWithIOCounter is a wrapper of net.TCPConn with counter that accumulates the bytes this connection reads/writes.

func (*TCPConnWithIOCounter) Read

func (t *TCPConnWithIOCounter) Read(b []byte) (n int, err error)

func (*TCPConnWithIOCounter) Write

func (t *TCPConnWithIOCounter) Write(b []byte) (n int, err error)

type TLS

type TLS struct {
	// contains filtered or unexported fields
}

TLS saves some information about TLS.

func NewTLS

func NewTLS(caPath, certPath, keyPath, host string, verifyCN []string) (*TLS, error)

NewTLS constructs a new HTTP client with TLS configured with the CA, certificate and key paths.

If the CA path is empty, returns an instance where TLS is disabled.

func (*TLS) WrapListener

func (tc *TLS) WrapListener(l net.Listener) net.Listener

WrapListener places a TLS layer on top of the existing listener.

type TLSConfigOption

type TLSConfigOption func(*tlsConfigBuilder)

TLSConfigOption is used to build a tls.Config in NewTLSConfig.

func WithCAContent

func WithCAContent(caContent []byte) TLSConfigOption

WithCAContent sets the CA content to build a tls.Config, and the peer should use a certificate that can be verified by this CA. It has lower priority than WithCAPath. An empty `caContent` is a no-op.

func WithCAPath

func WithCAPath(caPath string) TLSConfigOption

WithCAPath sets the CA path to build a tls.Config, and the peer should use a certificate that can be verified by this CA. It has higher priority than WithCAContent. An empty `caPath` is a no-op.

func WithCertAndKeyContent

func WithCertAndKeyContent(certContent, keyContent []byte) TLSConfigOption

WithCertAndKeyContent sets the client certificate and private key content to build a tls.Config. It has lower priority than WithCertAndKeyPath. An empty `certContent`/`keyContent` is a no-op.

func WithCertAndKeyPath

func WithCertAndKeyPath(certPath, keyPath string) TLSConfigOption

WithCertAndKeyPath sets the client certificate and private key paths to build a tls.Config. It has higher priority than WithCertAndKeyContent. An empty `certPath`/`keyPath` is a no-op. WithCertAndKeyPath also supports rotation: if the client certificate or private key file changes, the new content will be used.

func WithMinTLSVersion

func WithMinTLSVersion(minTLSVersion uint16) TLSConfigOption

WithMinTLSVersion sets the min tls version to build a tls.Config.

func WithVerifyCommonName

func WithVerifyCommonName(verifyCN []string) TLSConfigOption

WithVerifyCommonName sets the Common Name the peer must provide before starting a TLS connection. An empty `verifyCN` is a no-op.
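The options above follow the functional options pattern, with two documented rules: empty arguments are no-ops, and a path takes priority over inline content. The sketch below models only those two rules for the CA source. `caBuilder`, `withCAPath`, `withCAContent`, and `caSource` are hypothetical simplifications; the real builder is unexported and produces a `*tls.Config`.

```go
package main

import "fmt"

// caBuilder is a hypothetical, simplified stand-in for the unexported builder.
type caBuilder struct {
	caPath    string
	caContent []byte
}

// option mirrors the shape of TLSConfigOption: a function that mutates the builder.
type option func(*caBuilder)

// withCAPath records the CA path; an empty path is a no-op.
func withCAPath(p string) option {
	return func(b *caBuilder) {
		if p != "" {
			b.caPath = p
		}
	}
}

// withCAContent records inline CA content; empty content is a no-op.
func withCAContent(c []byte) option {
	return func(b *caBuilder) {
		if len(c) > 0 {
			b.caContent = c
		}
	}
}

// caSource applies the options and reports which CA source would be used.
// The path wins over the content regardless of option order.
func caSource(opts ...option) string {
	b := &caBuilder{}
	for _, opt := range opts {
		opt(b)
	}
	switch {
	case b.caPath != "":
		return "path:" + b.caPath
	case len(b.caContent) > 0:
		return "content"
	default:
		return "none"
	}
}

func main() {
	fmt.Println(caSource(withCAContent([]byte("pem")), withCAPath("ca.pem")))
	fmt.Println(caSource(withCAContent([]byte("pem")), withCAPath("")))
	fmt.Println(caSource())
}
```

Resolving priority after all options are applied, rather than inside each option, keeps the result independent of the order in which the caller lists them.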

type Token

type Token struct {
}

Token is used as a permission to keep on running.

type TokenLimiter

type TokenLimiter struct {
	// contains filtered or unexported fields
}

TokenLimiter is used to limit the number of concurrent tasks.

func NewTokenLimiter

func NewTokenLimiter(count uint) *TokenLimiter

NewTokenLimiter creates a TokenLimiter with count tokens.

func (*TokenLimiter) Get

func (tl *TokenLimiter) Get() *Token

Get obtains a token.

func (*TokenLimiter) Put

func (tl *TokenLimiter) Put(tk *Token)

Put releases the token.
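A token limiter of this kind is often built on a buffered channel that is pre-filled with tokens. The sketch below shows that approach; it is an assumption about one reasonable design, not this package's implementation, and `tokenLimiter`, `newTokenLimiter`, and `maxConcurrent` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type token struct{}

// tokenLimiter hands out at most cap(ch) tokens at a time.
type tokenLimiter struct {
	ch chan *token
}

// newTokenLimiter creates a limiter pre-filled with count tokens.
func newTokenLimiter(count uint) *tokenLimiter {
	tl := &tokenLimiter{ch: make(chan *token, count)}
	for i := uint(0); i < count; i++ {
		tl.ch <- &token{}
	}
	return tl
}

// get blocks until a token is available.
func (tl *tokenLimiter) get() *token { return <-tl.ch }

// put returns a token to the limiter.
func (tl *tokenLimiter) put(tk *token) { tl.ch <- tk }

// maxConcurrent runs `tasks` goroutines guarded by a limiter with `limit`
// tokens and returns the highest concurrency it observed.
func maxConcurrent(limit uint, tasks int) int {
	tl := newTokenLimiter(limit)
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		running int
		peak    int
	)
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tk := tl.get()
			defer tl.put(tk)

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency within limit:", maxConcurrent(2, 20) <= 2)
}
```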

type WaitGroupEnhancedWrapper

type WaitGroupEnhancedWrapper struct {
	sync.WaitGroup
	// contains filtered or unexported fields
}

WaitGroupEnhancedWrapper wraps a WaitGroup. It provides the basic ability of WaitGroupWrapper and, if the `exited` signal is true, checks for unexited processes and prints them in the log.

func NewWaitGroupEnhancedWrapper

func NewWaitGroupEnhancedWrapper(source string, exit chan struct{}, exitedCheck bool) *WaitGroupEnhancedWrapper

NewWaitGroupEnhancedWrapper returns a WaitGroupEnhancedWrapper. An empty source indicates a unit test, in which case `checkUnExitedProcess` won't be executed.

func (*WaitGroupEnhancedWrapper) Run

func (w *WaitGroupEnhancedWrapper) Run(exec func(), label string)

Run runs a function in a goroutine, adds 1 to WaitGroup and calls done when function returns. Please DO NOT use panic in the cb function. Note that the registered label shouldn't be duplicated.

func (*WaitGroupEnhancedWrapper) RunWithRecover

func (w *WaitGroupEnhancedWrapper) RunWithRecover(exec func(), recoverFn func(r any), label string)

RunWithRecover wraps a goroutine startup call with forced recovery, adds 1 to the WaitGroup, and calls done when the function returns. exec is the function that executes the logic. recoverFn is the handler called after recover and before the stack dump; passing `nil` means noop. Note that the registered label shouldn't be duplicated.

type WaitGroupPool

type WaitGroupPool struct {
	sync.WaitGroup
	// contains filtered or unexported fields
}

WaitGroupPool is a wrapper for sync.WaitGroup that supports a goroutine pool.

func NewWaitGroupPool

func NewWaitGroupPool(gp *gp.Pool) *WaitGroupPool

NewWaitGroupPool returns a WaitGroupPool.

func (*WaitGroupPool) Run

func (w *WaitGroupPool) Run(exec func())

Run runs a function in a goroutine, adds 1 to WaitGroup and calls done when function returns. Please DO NOT use panic in the cb function.

type WaitGroupWrapper

type WaitGroupWrapper struct {
	sync.WaitGroup
}

WaitGroupWrapper is a wrapper for sync.WaitGroup.

func (*WaitGroupWrapper) Run

func (w *WaitGroupWrapper) Run(exec func())

Run runs a function in a goroutine, adds 1 to WaitGroup and calls done when function returns. Please DO NOT use panic in the cb function.
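The Run pattern described above (add 1, start a goroutine, call done on return) can be sketched with an embedded sync.WaitGroup. `waitGroupWrapper`, `run`, and `sumConcurrently` are hypothetical names used for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// waitGroupWrapper embeds sync.WaitGroup, so Wait is available directly.
type waitGroupWrapper struct {
	sync.WaitGroup
}

// run starts exec in a new goroutine and tracks it in the WaitGroup.
// There is no recover here, so a panic in exec would crash the program.
func (w *waitGroupWrapper) run(exec func()) {
	w.Add(1)
	go func() {
		defer w.Done()
		exec()
	}()
}

// sumConcurrently adds 1..n using one goroutine per number.
func sumConcurrently(n int) int64 {
	var (
		wg    waitGroupWrapper
		total int64
	)
	for i := 1; i <= n; i++ {
		v := int64(i) // copy the loop variable for the closure
		wg.run(func() { atomic.AddInt64(&total, v) })
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(sumConcurrently(10))
}
```

Calling Add before the goroutine starts, rather than inside it, avoids a race in which Wait could return before the goroutine has been counted.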

func (*WaitGroupWrapper) RunWithLog

func (w *WaitGroupWrapper) RunWithLog(exec func())

RunWithLog works like Run, but it also logs on panic.

func (*WaitGroupWrapper) RunWithRecover

func (w *WaitGroupWrapper) RunWithRecover(exec func(), recoverFn func(r any))

RunWithRecover wraps a goroutine startup call with forced recovery, adds 1 to the WaitGroup, and calls done when the function returns. It dumps the current goroutine stack into the log if it catches any recover result. exec is the function that executes the logic. recoverFn is the handler called after recover and before the stack dump; passing `nil` means noop.

type Worker

type Worker struct {
	ID uint64
}

Worker identified by ID.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool contains a pool of workers.

func NewWorkerPool

func NewWorkerPool(limit uint, name string) *WorkerPool

NewWorkerPool returns a WorkerPool.

func (*WorkerPool) Apply

func (pool *WorkerPool) Apply(fn func())

Apply executes a task.

func (*WorkerPool) ApplyOnErrorGroup

func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error)

ApplyOnErrorGroup executes a task in an errorgroup.

func (*WorkerPool) ApplyWithID

func (pool *WorkerPool) ApplyWithID(fn func(uint64))

ApplyWithID executes a task and provides it with the worker ID.

func (*WorkerPool) ApplyWithIDInErrorGroup

func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error)

ApplyWithIDInErrorGroup executes a task in an errorgroup and provides it with the worker ID.

func (*WorkerPool) ApplyWorker

func (pool *WorkerPool) ApplyWorker() *Worker

ApplyWorker applies for a worker.

func (*WorkerPool) HasWorker

func (pool *WorkerPool) HasWorker() bool

HasWorker checks if the pool has unallocated workers.

func (*WorkerPool) IdleCount

func (pool *WorkerPool) IdleCount() int

IdleCount returns the number of idle workers in the pool.

func (*WorkerPool) Limit

func (pool *WorkerPool) Limit() int

Limit is the limit of the pool

func (*WorkerPool) RecycleWorker

func (pool *WorkerPool) RecycleWorker(worker *Worker)

RecycleWorker recycles a worker.
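The WorkerPool methods above fit a design in which idle workers sit in a buffered channel: applying for a worker receives from the channel, and recycling sends the worker back. The sketch below illustrates that design as an assumption; `workerPool` and its lower-case methods are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	ID uint64
}

// workerPool keeps its idle workers in a buffered channel.
type workerPool struct {
	workers chan *worker
}

// newWorkerPool creates a pool holding `limit` workers with IDs 1..limit.
func newWorkerPool(limit uint) *workerPool {
	workers := make(chan *worker, limit)
	for i := uint(0); i < limit; i++ {
		workers <- &worker{ID: uint64(i + 1)}
	}
	return &workerPool{workers: workers}
}

// applyWorker blocks until a worker is idle and takes it from the pool.
func (p *workerPool) applyWorker() *worker { return <-p.workers }

// recycleWorker returns a worker to the pool.
func (p *workerPool) recycleWorker(w *worker) { p.workers <- w }

// idleCount reports how many workers are currently idle.
func (p *workerPool) idleCount() int { return len(p.workers) }

// hasWorker reports whether at least one worker is idle.
func (p *workerPool) hasWorker() bool { return p.idleCount() > 0 }

// applyWithID runs fn on a pooled worker and recycles the worker afterwards.
func (p *workerPool) applyWithID(fn func(uint64)) {
	w := p.applyWorker()
	go func() {
		defer p.recycleWorker(w)
		fn(w.ID)
	}()
}

func main() {
	pool := newWorkerPool(2)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		pool.applyWithID(func(id uint64) {
			defer wg.Done()
			fmt.Println("task ran on worker", id)
		})
	}
	wg.Wait()
}
```

In this design the caller of applyWithID blocks while all workers are busy, which bounds the number of in-flight tasks at the pool limit.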

Directories

Path Synopsis
dbutiltest
Package dbutiltest is a package for some common used methods for db related testing.
linter
Package mock is just for test only.
mock
Package mock is a generated GoMock package.
v2
sys