Documentation ¶
Index ¶
- Constants
- Variables
- func GetConcurrency(ctx context.Context, sourceIDs []string, dbs map[string]*conn.BaseDB, ...) (int, error)
- func IsTiDBFromVersion(version string) bool
- func LackedPrivilegesAsStr(lackPriv map[mysql.PrivilegeType]priv) string
- func VerifyPrivileges(grants []string, lackPrivs map[mysql.PrivilegeType]priv) (map[mysql.PrivilegeType]priv, error)
- type BinlogDBChecker
- type DumperConnNumberChecker
- type Error
- type LightningCDCPiTRChecker
- type LightningClusterVersionChecker
- type LightningEmptyRegionChecker
- type LightningFreeSpaceChecker
- type LightningRegionDistributionChecker
- type LightningTableEmptyChecker
- type LoaderConnNumberChecker
- type MetaPositionChecker
- type MySQLBinlogEnableChecker
- type MySQLBinlogFormatChecker
- type MySQLBinlogRowImageChecker
- type MySQLServerIDChecker
- type MySQLVersion
- type MySQLVersionChecker
- type OnlineDDLChecker
- type OptimisticShardingTablesChecker
- type RealChecker
- func NewBinlogDBChecker(db *conn.BaseDB, dbinfo *dbutil.DBConfig, schemas map[string]struct{}, ...) RealChecker
- func NewDumperConnNumberChecker(sourceDB *conn.BaseDB, dumperThreads int) RealChecker
- func NewLightningCDCPiTRChecker(lightningChecker precheck.Checker) RealChecker
- func NewLightningClusterVersionChecker(lightningChecker precheck.Checker) RealChecker
- func NewLightningEmptyRegionChecker(lightningChecker precheck.Checker) RealChecker
- func NewLightningEmptyTableChecker(lightningChecker precheck.Checker) RealChecker
- func NewLightningFreeSpaceChecker(sourceDataSize int64, getter importer.TargetInfoGetter) RealChecker
- func NewLightningRegionDistributionChecker(lightningChecker precheck.Checker) RealChecker
- func NewLoaderConnNumberChecker(targetDB *conn.BaseDB, stCfgs []*config.SubTaskConfig) RealChecker
- func NewMetaPositionChecker(db *conn.BaseDB, sourceCfg dbconfig.DBConfig, enableGTID bool, ...) RealChecker
- func NewMySQLBinlogEnableChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
- func NewMySQLBinlogFormatChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
- func NewMySQLBinlogRowImageChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
- func NewMySQLServerIDChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
- func NewMySQLVersionChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
- func NewOnlineDDLChecker(db *sql.DB, checkSchemas map[string]struct{}, onlineDDL onlineddl.OnlinePlugin, ...) RealChecker
- func NewOptimisticShardingTablesChecker(targetTableID string, dbs map[string]*conn.BaseDB, ...) RealChecker
- func NewShardingTablesChecker(targetTableID string, dbs map[string]*conn.BaseDB, ...) RealChecker
- func NewSourceDumpPrivilegeChecker(db *sql.DB, dbinfo *dbutil.DBConfig, checkTables []filter.Table, ...) RealChecker
- func NewSourceReplicationPrivilegeChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
- func NewTablesChecker(upstreamDBs map[string]*conn.BaseDB, downstreamDB *conn.BaseDB, ...) RealChecker
- func NewTargetPrivilegeChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
- type Result
- type ResultSummary
- type Results
- type ShardingTablesChecker
- type SourceDumpPrivilegeChecker
- type SourceReplicatePrivilegeChecker
- type State
- type TablesChecker
- type TargetPrivilegeChecker
- type WorkerPool
Constants ¶
const (
// AutoIncrementKeyChecking is an identification for auto increment key checking.
AutoIncrementKeyChecking = "auto-increment key checking"
)
Variables ¶
var MaxVersion = MySQLVersion{math.MaxUint8, math.MaxUint8, math.MaxUint8}
MaxVersion defines the maximum version.
var MinVersion = MySQLVersion{0, 0, 0}
MinVersion defines the minimum version.
var SupportedVersion = map[string]struct {
	Min MySQLVersion
	Max MySQLVersion
}{
	"mysql": {
		MySQLVersion{5, 6, 0},
		MySQLVersion{8, 1, 0},
	},
}
SupportedVersion defines the MySQL/MariaDB versions that DM/syncer supports: 5.6.0 <= MySQL version < 8.1.0.
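The half-open range 5.6.0 <= version < 8.1.0 can be checked with an element-by-element comparison of the three version components. The following is a minimal sketch under stated assumptions, not the package's own code: the `version` type, `compare`, and `inRange` are hypothetical names that only mirror the `[3]uint` layout of MySQLVersion.

```go
package main

import "fmt"

// version is a hypothetical stand-in that mirrors the [3]uint layout
// of MySQLVersion: major, minor, patch.
type version [3]uint

// compare returns -1, 0, or 1 when v is lower than, equal to,
// or higher than other, comparing major, then minor, then patch.
func (v version) compare(other version) int {
	for i := range v {
		if v[i] < other[i] {
			return -1
		}
		if v[i] > other[i] {
			return 1
		}
	}
	return 0
}

// inRange reports whether lo <= v < hi (hypothetical helper).
func inRange(v, lo, hi version) bool {
	return v.compare(lo) >= 0 && v.compare(hi) < 0
}

func main() {
	lo, hi := version{5, 6, 0}, version{8, 1, 0}
	for _, v := range []version{{5, 5, 62}, {5, 7, 44}, {8, 0, 36}, {8, 1, 0}} {
		fmt.Println(v, inRange(v, lo, hi))
	}
}
```

Note that the upper bound is exclusive, so 8.1.0 itself falls outside the range.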
Functions ¶
func GetConcurrency ¶
func GetConcurrency(ctx context.Context, sourceIDs []string, dbs map[string]*conn.BaseDB, dumpThreads int) (int, error)
GetConcurrency returns the number of workers to which tasks from any source can be dispatched at random, where each task needs one SQL connection.
func IsTiDBFromVersion ¶
func IsTiDBFromVersion(version string) bool
IsTiDBFromVersion reports whether the version string belongs to TiDB.
func LackedPrivilegesAsStr ¶
func LackedPrivilegesAsStr(lackPriv map[mysql.PrivilegeType]priv) string
LackedPrivilegesAsStr formats the lacked privileges as a string of the form: lack of privilege1: {tableID1, tableID2, ...}; lack of privilege2...
func VerifyPrivileges ¶
func VerifyPrivileges(
	grants []string,
	lackPrivs map[mysql.PrivilegeType]priv,
) (map[mysql.PrivilegeType]priv, error)
VerifyPrivileges verifies user privileges and returns the lacked privileges. This function modifies lackPrivs in place. It is exported so that other components can reuse it.
Types ¶
type BinlogDBChecker ¶
type BinlogDBChecker struct {
// contains filtered or unexported fields
}
BinlogDBChecker checks if migrated dbs are in binlog_do_db or binlog_ignore_db.
func (*BinlogDBChecker) Check ¶
func (c *BinlogDBChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*BinlogDBChecker) Name ¶
func (c *BinlogDBChecker) Name() string
Name implements the RealChecker interface.
type DumperConnNumberChecker ¶
type DumperConnNumberChecker struct {
// contains filtered or unexported fields
}
func (*DumperConnNumberChecker) Check ¶
func (d *DumperConnNumberChecker) Check(ctx context.Context) *Result
MariaDB (process priv): https://mariadb.com/kb/en/show-processlist/
MySQL (process priv): https://dev.mysql.com/doc/refman/5.7/en/privileges-provided.html
Aurora (process priv): https://aws.amazon.com/cn/premiumsupport/knowledge-center/rds-mysql-running-queries/
func (*DumperConnNumberChecker) Name ¶
func (d *DumperConnNumberChecker) Name() string
type Error ¶
type Error struct {
	Severity    State  `json:"severity"`
	ShortErr    string `json:"short_error"`
	Self        string `json:"self,omitempty"`
	Other       string `json:"other,omitempty"`
	Instruction string `json:"instruction,omitempty"`
}
type LightningCDCPiTRChecker ¶
type LightningCDCPiTRChecker struct {
// contains filtered or unexported fields
}
LightningCDCPiTRChecker checks whether the cluster has running CDC PiTR tasks.
func (*LightningCDCPiTRChecker) Check ¶
func (c *LightningCDCPiTRChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*LightningCDCPiTRChecker) Name ¶
func (c *LightningCDCPiTRChecker) Name() string
Name implements the RealChecker interface.
type LightningClusterVersionChecker ¶
type LightningClusterVersionChecker struct {
// contains filtered or unexported fields
}
LightningClusterVersionChecker checks whether the cluster version is compatible with Lightning.
func (*LightningClusterVersionChecker) Check ¶
func (c *LightningClusterVersionChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*LightningClusterVersionChecker) Name ¶
func (c *LightningClusterVersionChecker) Name() string
Name implements the RealChecker interface.
type LightningEmptyRegionChecker ¶
type LightningEmptyRegionChecker struct {
// contains filtered or unexported fields
}
LightningEmptyRegionChecker checks whether there are too many empty regions in the cluster.
func (*LightningEmptyRegionChecker) Check ¶
func (c *LightningEmptyRegionChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*LightningEmptyRegionChecker) Name ¶
func (c *LightningEmptyRegionChecker) Name() string
Name implements the RealChecker interface.
type LightningFreeSpaceChecker ¶
type LightningFreeSpaceChecker struct {
// contains filtered or unexported fields
}
LightningFreeSpaceChecker checks whether the cluster has enough free space.
func (*LightningFreeSpaceChecker) Check ¶
func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*LightningFreeSpaceChecker) Name ¶
func (c *LightningFreeSpaceChecker) Name() string
Name implements the RealChecker interface.
type LightningRegionDistributionChecker ¶
type LightningRegionDistributionChecker struct {
// contains filtered or unexported fields
}
LightningRegionDistributionChecker checks whether the region distribution is balanced.
func (*LightningRegionDistributionChecker) Check ¶
func (c *LightningRegionDistributionChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*LightningRegionDistributionChecker) Name ¶
func (c *LightningRegionDistributionChecker) Name() string
Name implements the RealChecker interface.
type LightningTableEmptyChecker ¶
type LightningTableEmptyChecker struct {
// contains filtered or unexported fields
}
LightningTableEmptyChecker checks whether the cluster's target table is empty.
func (*LightningTableEmptyChecker) Check ¶
func (c *LightningTableEmptyChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*LightningTableEmptyChecker) Name ¶
func (c *LightningTableEmptyChecker) Name() string
Name implements the RealChecker interface.
type LoaderConnNumberChecker ¶
type LoaderConnNumberChecker struct {
// contains filtered or unexported fields
}
func (*LoaderConnNumberChecker) Check ¶
func (l *LoaderConnNumberChecker) Check(ctx context.Context) *Result
func (*LoaderConnNumberChecker) Name ¶
func (l *LoaderConnNumberChecker) Name() string
type MetaPositionChecker ¶
type MetaPositionChecker struct {
// contains filtered or unexported fields
}
MetaPositionChecker checks if meta position for given source database is valid.
func (*MetaPositionChecker) Check ¶
func (c *MetaPositionChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*MetaPositionChecker) Name ¶
func (c *MetaPositionChecker) Name() string
Name implements the RealChecker interface.
type MySQLBinlogEnableChecker ¶
type MySQLBinlogEnableChecker struct {
// contains filtered or unexported fields
}
MySQLBinlogEnableChecker checks whether `log_bin` variable is enabled in MySQL.
func (*MySQLBinlogEnableChecker) Check ¶
func (pc *MySQLBinlogEnableChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*MySQLBinlogEnableChecker) Name ¶
func (pc *MySQLBinlogEnableChecker) Name() string
Name implements the RealChecker interface.
type MySQLBinlogFormatChecker ¶
type MySQLBinlogFormatChecker struct {
// contains filtered or unexported fields
}
MySQLBinlogFormatChecker checks mysql binlog_format.
func (*MySQLBinlogFormatChecker) Check ¶
func (pc *MySQLBinlogFormatChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*MySQLBinlogFormatChecker) Name ¶
func (pc *MySQLBinlogFormatChecker) Name() string
Name implements the RealChecker interface.
type MySQLBinlogRowImageChecker ¶
type MySQLBinlogRowImageChecker struct {
// contains filtered or unexported fields
}
MySQLBinlogRowImageChecker checks mysql binlog_row_image.
func (*MySQLBinlogRowImageChecker) Check ¶
func (pc *MySQLBinlogRowImageChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface. 'binlog_row_image' was introduced in MySQL 5.6.2 and MariaDB 10.1.6.
> In MySQL 5.5 and earlier, full row images are always used for both before images and after images.
So 'binlog_row_image' needs to be checked for MySQL 5.6.2 and later, and for MariaDB 10.1.6 and later.
ref:
- https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#sysvar_binlog_row_image
- https://mariadb.com/kb/en/library/replication-and-binary-log-server-system-variables/#binlog_row_image
func (*MySQLBinlogRowImageChecker) Name ¶
func (pc *MySQLBinlogRowImageChecker) Name() string
Name implements the RealChecker interface.
type MySQLServerIDChecker ¶
type MySQLServerIDChecker struct {
// contains filtered or unexported fields
}
MySQLServerIDChecker checks mysql/mariadb server ID.
func (*MySQLServerIDChecker) Check ¶
func (pc *MySQLServerIDChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*MySQLServerIDChecker) Name ¶
func (pc *MySQLServerIDChecker) Name() string
Name implements the RealChecker interface.
type MySQLVersion ¶
type MySQLVersion [3]uint
MySQLVersion represents MySQL version number.
func (MySQLVersion) String ¶
func (v MySQLVersion) String() string
String implements the Stringer interface.
type MySQLVersionChecker ¶
type MySQLVersionChecker struct {
// contains filtered or unexported fields
}
MySQLVersionChecker checks mysql/mariadb/rds,... version.
func (*MySQLVersionChecker) Check ¶
func (pc *MySQLVersionChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface. Only versions >= 5.6 are supported.
func (*MySQLVersionChecker) Name ¶
func (pc *MySQLVersionChecker) Name() string
Name implements the RealChecker interface.
type OnlineDDLChecker ¶
type OnlineDDLChecker struct {
// contains filtered or unexported fields
}
func (*OnlineDDLChecker) Name ¶
func (c *OnlineDDLChecker) Name() string
type OptimisticShardingTablesChecker ¶
type OptimisticShardingTablesChecker struct {
// contains filtered or unexported fields
}
OptimisticShardingTablesChecker checks the consistency of table structures within one sharding group in optimistic shard mode:
* check whether they have compatible column lists.
func (*OptimisticShardingTablesChecker) Check ¶
func (c *OptimisticShardingTablesChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*OptimisticShardingTablesChecker) Name ¶
func (c *OptimisticShardingTablesChecker) Name() string
Name implements the RealChecker interface.
type RealChecker ¶
RealChecker is the interface for checkers that verify system configuration. It is mainly used to check configuration for data synchronization between database systems.
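Every checker documented here exposes a Name method and a Check method that takes a context and returns a result. The sketch below illustrates that shape with local stand-ins so that it compiles on its own. The names `realChecker`, `result`, `alwaysPass`, and `runAll` are hypothetical and are not part of the package, and the `State` field is simplified to a plain string.

```go
package main

import (
	"context"
	"fmt"
)

// result is a trimmed, hypothetical stand-in for the package's Result type.
type result struct {
	Name  string
	State string // assumption: a plain string is used here for brevity
}

// realChecker mirrors the two methods that the documented checkers provide.
type realChecker interface {
	Name() string
	Check(ctx context.Context) *result
}

// alwaysPass is a hypothetical checker used only for illustration.
type alwaysPass struct{}

// Name returns a human-readable identifier for the check.
func (alwaysPass) Name() string { return "always pass" }

// Check ignores the context and reports success unconditionally.
func (c alwaysPass) Check(ctx context.Context) *result {
	return &result{Name: c.Name(), State: "success"}
}

// runAll runs every checker in order and collects one result per checker.
func runAll(ctx context.Context, checkers []realChecker) []*result {
	results := make([]*result, 0, len(checkers))
	for _, c := range checkers {
		results = append(results, c.Check(ctx))
	}
	return results
}

func main() {
	for _, r := range runAll(context.Background(), []realChecker{alwaysPass{}}) {
		fmt.Printf("%s: %s\n", r.Name, r.State)
	}
}
```

Keeping the interface this small lets a caller treat source, target, and Lightning checks uniformly, whatever each check needs internally.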
func NewBinlogDBChecker ¶
func NewBinlogDBChecker(db *conn.BaseDB, dbinfo *dbutil.DBConfig, schemas map[string]struct{}, caseSensitive bool) RealChecker
NewBinlogDBChecker returns a RealChecker.
func NewDumperConnNumberChecker ¶
func NewDumperConnNumberChecker(sourceDB *conn.BaseDB, dumperThreads int) RealChecker
func NewLightningCDCPiTRChecker ¶
func NewLightningCDCPiTRChecker(lightningChecker precheck.Checker) RealChecker
NewLightningCDCPiTRChecker creates a new LightningCDCPiTRChecker.
func NewLightningClusterVersionChecker ¶
func NewLightningClusterVersionChecker(lightningChecker precheck.Checker) RealChecker
NewLightningClusterVersionChecker creates a new LightningClusterVersionChecker.
func NewLightningEmptyRegionChecker ¶
func NewLightningEmptyRegionChecker(lightningChecker precheck.Checker) RealChecker
NewLightningEmptyRegionChecker creates a new LightningEmptyRegionChecker.
func NewLightningEmptyTableChecker ¶
func NewLightningEmptyTableChecker(lightningChecker precheck.Checker) RealChecker
NewLightningEmptyTableChecker creates a new LightningEmptyTableChecker.
func NewLightningFreeSpaceChecker ¶
func NewLightningFreeSpaceChecker(sourceDataSize int64, getter importer.TargetInfoGetter) RealChecker
NewLightningFreeSpaceChecker creates a new LightningFreeSpaceChecker.
func NewLightningRegionDistributionChecker ¶
func NewLightningRegionDistributionChecker(lightningChecker precheck.Checker) RealChecker
NewLightningRegionDistributionChecker creates a new LightningRegionDistributionChecker.
func NewLoaderConnNumberChecker ¶
func NewLoaderConnNumberChecker(targetDB *conn.BaseDB, stCfgs []*config.SubTaskConfig) RealChecker
func NewMetaPositionChecker ¶
func NewMetaPositionChecker(db *conn.BaseDB, sourceCfg dbconfig.DBConfig, enableGTID bool, meta *config.Meta) RealChecker
NewMetaPositionChecker returns a RealChecker.
func NewMySQLBinlogEnableChecker ¶
func NewMySQLBinlogEnableChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
NewMySQLBinlogEnableChecker returns a RealChecker.
func NewMySQLBinlogFormatChecker ¶
func NewMySQLBinlogFormatChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
NewMySQLBinlogFormatChecker returns a RealChecker.
func NewMySQLBinlogRowImageChecker ¶
func NewMySQLBinlogRowImageChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
NewMySQLBinlogRowImageChecker returns a RealChecker.
func NewMySQLServerIDChecker ¶
func NewMySQLServerIDChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
NewMySQLServerIDChecker returns a RealChecker.
func NewMySQLVersionChecker ¶
func NewMySQLVersionChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
NewMySQLVersionChecker returns a RealChecker.
func NewOnlineDDLChecker ¶
func NewOnlineDDLChecker(db *sql.DB, checkSchemas map[string]struct{}, onlineDDL onlineddl.OnlinePlugin, bwlist *filter.Filter) RealChecker
func NewOptimisticShardingTablesChecker ¶
func NewOptimisticShardingTablesChecker(
	targetTableID string,
	dbs map[string]*conn.BaseDB,
	tableMap map[string][]filter.Table,
	dumpThreads int,
) RealChecker
NewOptimisticShardingTablesChecker returns a RealChecker.
func NewShardingTablesChecker ¶
func NewShardingTablesChecker(
	targetTableID string,
	dbs map[string]*conn.BaseDB,
	tableMap map[string][]filter.Table,
	checkAutoIncrementPrimaryKey bool,
	dumpThreads int,
) RealChecker
NewShardingTablesChecker returns a RealChecker.
func NewSourceDumpPrivilegeChecker ¶
func NewSourceDumpPrivilegeChecker(
	db *sql.DB,
	dbinfo *dbutil.DBConfig,
	checkTables []filter.Table,
	consistency string,
	dumpWholeInstance bool,
) RealChecker
NewSourceDumpPrivilegeChecker returns a RealChecker.
func NewSourceReplicationPrivilegeChecker ¶
func NewSourceReplicationPrivilegeChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
NewSourceReplicationPrivilegeChecker returns a RealChecker.
func NewTablesChecker ¶
func NewTablesChecker(
	upstreamDBs map[string]*conn.BaseDB,
	downstreamDB *conn.BaseDB,
	tableMap map[string]map[filter.Table][]filter.Table,
	extendedColumnPerTable map[filter.Table][]string,
	dumpThreads int,
) RealChecker
NewTablesChecker returns a RealChecker.
func NewTargetPrivilegeChecker ¶
func NewTargetPrivilegeChecker(db *sql.DB, dbinfo *dbutil.DBConfig) RealChecker
type Result ¶
type Result struct {
	ID          uint64   `json:"id"`
	Name        string   `json:"name"`
	Desc        string   `json:"desc"`
	State       State    `json:"state"`
	Errors      []*Error `json:"errors,omitempty"`
	Instruction string   `json:"instruction,omitempty"`
	Extra       string   `json:"extra,omitempty"`
}
Result is the result of a check.
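The struct tags determine how a result is serialized: fields tagged `omitempty` disappear from the JSON when they are empty, while `id`, `name`, `desc`, and `state` are always present. The sketch below copies the documented field tags into local types so that it runs on its own. It assumes that State is a string-based type, which this page does not show, and the values "success" and "fail" are placeholders; `state`, `checkError`, `result`, and `encode` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state is an assumption: the underlying type of State is not shown
// on this page, so a string is used for illustration.
type state string

// checkError copies the JSON tags of the documented Error type.
type checkError struct {
	Severity    state  `json:"severity"`
	ShortErr    string `json:"short_error"`
	Self        string `json:"self,omitempty"`
	Other       string `json:"other,omitempty"`
	Instruction string `json:"instruction,omitempty"`
}

// result copies the JSON tags of the documented Result type.
type result struct {
	ID          uint64        `json:"id"`
	Name        string        `json:"name"`
	Desc        string        `json:"desc"`
	State       state         `json:"state"`
	Errors      []*checkError `json:"errors,omitempty"`
	Instruction string        `json:"instruction,omitempty"`
	Extra       string        `json:"extra,omitempty"`
}

// encode marshals a result to its JSON text (hypothetical helper).
func encode(r result) (string, error) {
	b, err := json.Marshal(r)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(result{ID: 1, Name: "demo", State: "success"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out) // errors, instruction, and extra are omitted when empty
}
```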
type ResultSummary ¶
type ResultSummary struct {
	Passed     bool  `json:"passed"`
	Total      int64 `json:"total"`
	Successful int64 `json:"successful"`
	Failed     int64 `json:"failed"`
	Warning    int64 `json:"warning"`
}
ResultSummary is the summary of all check results.
type Results ¶
type Results struct {
	Results []*Result      `json:"results"`
	Summary *ResultSummary `json:"summary"`
}
Results contains all check results and the summary.
type ShardingTablesChecker ¶
type ShardingTablesChecker struct {
// contains filtered or unexported fields
}
ShardingTablesChecker checks the consistency of table structures within one sharding group:
* check whether they have the same column list
* check whether they have an auto_increment key.
func (*ShardingTablesChecker) Check ¶
func (c *ShardingTablesChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*ShardingTablesChecker) Name ¶
func (c *ShardingTablesChecker) Name() string
Name implements the RealChecker interface.
type SourceDumpPrivilegeChecker ¶
type SourceDumpPrivilegeChecker struct {
// contains filtered or unexported fields
}
SourceDumpPrivilegeChecker checks dump privileges of source DB.
func (*SourceDumpPrivilegeChecker) Check ¶
func (pc *SourceDumpPrivilegeChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface. We check RELOAD, SELECT, LOCK TABLES privileges according to consistency.
func (*SourceDumpPrivilegeChecker) Name ¶
func (pc *SourceDumpPrivilegeChecker) Name() string
Name implements the RealChecker interface.
type SourceReplicatePrivilegeChecker ¶
type SourceReplicatePrivilegeChecker struct {
// contains filtered or unexported fields
}
SourceReplicatePrivilegeChecker checks replication privileges of source DB.
func (*SourceReplicatePrivilegeChecker) Check ¶
func (pc *SourceReplicatePrivilegeChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface. We only check REPLICATION SLAVE, REPLICATION CLIENT privileges.
func (*SourceReplicatePrivilegeChecker) Name ¶
func (pc *SourceReplicatePrivilegeChecker) Name() string
Name implements the RealChecker interface.
type TablesChecker ¶
type TablesChecker struct {
// contains filtered or unexported fields
}
TablesChecker checks the compatibility of table structures, because MySQL and TiDB differ. In general, we need to check the definitions of columns, constraints, and table options. Because of early TiDB engineering design, there is no complete list of check items; the current items are all based on experience.
func (*TablesChecker) Check ¶
func (c *TablesChecker) Check(ctx context.Context) *Result
Check implements the RealChecker interface.
func (*TablesChecker) Name ¶
func (c *TablesChecker) Name() string
Name implements the RealChecker interface.
type TargetPrivilegeChecker ¶
type TargetPrivilegeChecker struct {
// contains filtered or unexported fields
}
func (*TargetPrivilegeChecker) Check ¶
func (t *TargetPrivilegeChecker) Check(ctx context.Context) *Result
func (*TargetPrivilegeChecker) Name ¶
func (t *TargetPrivilegeChecker) Name() string
type WorkerPool ¶
type WorkerPool[J, R any] struct {
	// contains filtered or unexported fields
}
WorkerPool is an easy-to-use worker pool. Start workers with Go, then use PutJob to send jobs to them. After a worker finishes a job, the result is passed sequentially to the resultHandler function, which is the parameter of NewWorkerPool or NewWorkerPoolWithContext. After the caller has sent all jobs, it can call Wait to make sure all jobs are finished. The type parameter J means job, R means result. Type J MUST only share concurrent-safe members like *sql.DB.
func NewWorkerPool ¶
func NewWorkerPool[J, R any](resultHandler func(R)) *WorkerPool[J, R]
NewWorkerPool creates a new worker pool. The type parameter J means job, R means result. Type J MUST only share concurrent-safe members like *sql.DB.
func NewWorkerPoolWithContext ¶
func NewWorkerPoolWithContext[J, R any](
	ctx context.Context,
	resultHandler func(R),
) *WorkerPool[J, R]
NewWorkerPoolWithContext creates a new worker pool with a context that may be canceled by the caller. The type parameter J means job, R means result. Type J MUST only share concurrent-safe members like *sql.DB.
func (*WorkerPool[J, R]) Go ¶
func (p *WorkerPool[J, R]) Go(handler func(ctx context.Context, job J) (R, error))
Go is like the builtin go keyword. handler represents the worker's logic. If the worker has initialization logic, the caller can use a method of a struct or a closure to refer to the initialized part.
func (*WorkerPool[J, R]) PutJob ¶
func (p *WorkerPool[J, R]) PutJob(job J) bool
PutJob sends a job to worker pool. The return value means whether the workers are stopped so caller can stop early.
func (*WorkerPool[J, R]) Wait ¶
func (p *WorkerPool[J, R]) Wait() error
Wait waits for all workers to finish. It returns the first error that occurred in the workers, or nil if there was none. Other methods should not be called concurrently with Wait or after Wait.
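The lifecycle described above (start workers with Go, feed them with PutJob, handle results one at a time, then call Wait) can be illustrated with a small generic pool. This is a sketch under stated assumptions and not the package's implementation: `pool`, `newPool`, and `sumSquares` are hypothetical names, and the convention that PutJob returns false once the workers have stopped is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in with the same method shapes as WorkerPool.
type pool[J, R any] struct {
	ctx     context.Context
	cancel  context.CancelFunc
	jobs    chan J
	results chan R
	workers sync.WaitGroup
	done    chan struct{} // closed when the result handler goroutine exits
	errOnce sync.Once
	err     error
}

// newPool starts one goroutine that calls handle for each result in turn.
func newPool[J, R any](ctx context.Context, handle func(R)) *pool[J, R] {
	ctx, cancel := context.WithCancel(ctx)
	p := &pool[J, R]{ctx: ctx, cancel: cancel,
		jobs: make(chan J), results: make(chan R), done: make(chan struct{})}
	go func() {
		defer close(p.done)
		for r := range p.results {
			handle(r) // a single goroutine, so results are handled sequentially
		}
	}()
	return p
}

// Go starts one worker that runs work for every job it receives.
func (p *pool[J, R]) Go(work func(ctx context.Context, job J) (R, error)) {
	p.workers.Add(1)
	go func() {
		defer p.workers.Done()
		for job := range p.jobs {
			r, err := work(p.ctx, job)
			if err != nil {
				// Keep only the first error and signal PutJob to stop.
				p.errOnce.Do(func() { p.err = err; p.cancel() })
				return
			}
			p.results <- r
		}
	}()
}

// PutJob sends a job. Assumption: false means the workers have stopped.
func (p *pool[J, R]) PutJob(job J) bool {
	select {
	case p.jobs <- job:
		return true
	case <-p.ctx.Done():
		return false
	}
}

// Wait closes the job queue, waits for workers and the result handler,
// and returns the first worker error, if any.
func (p *pool[J, R]) Wait() error {
	close(p.jobs)
	p.workers.Wait()
	close(p.results)
	<-p.done
	p.cancel()
	return p.err
}

// sumSquares squares each number on the given count of workers and sums the results.
func sumSquares(nums []int, workers int) (int, error) {
	sum := 0
	p := newPool[int, int](context.Background(), func(r int) { sum += r })
	for i := 0; i < workers; i++ {
		p.Go(func(ctx context.Context, n int) (int, error) { return n * n, nil })
	}
	for _, n := range nums {
		if !p.PutJob(n) {
			break // workers stopped early
		}
	}
	err := p.Wait()
	return sum, err
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}, 3))
}
```

Because the result handler runs in a single goroutine, it can update `sum` without a mutex, which matches the sequential handling that the WorkerPool description promises.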