Documentation ¶
Index ¶
- type None
- type Option
- type TaskMayPanic
- type Worker
- type WorkerPool
- func NewWorkerPool[T TaskMayPanic, R any](name string, _ util.Component, numWorkers int, createWorker func() Worker[T, R], opts ...Option[T, R]) *WorkerPool[T, R]
- func (p *WorkerPool[T, R]) AddTask(task T)
- func (p *WorkerPool[T, R]) Cap() int32
- func (p *WorkerPool[T, R]) GetOriginConcurrency() int32
- func (p *WorkerPool[T, R]) GetResultChan() <-chan R
- func (p *WorkerPool[T, R]) LastTunerTs() time.Time
- func (p *WorkerPool[T, R]) Name() string
- func (p *WorkerPool[T, R]) Release()
- func (p *WorkerPool[T, R]) ReleaseAndWait()
- func (p *WorkerPool[T, R]) Running() int32
- func (p *WorkerPool[T, R]) SetResultSender(sender chan R)
- func (p *WorkerPool[T, R]) SetTaskReceiver(recv chan T)
- func (p *WorkerPool[T, R]) Start(ctx context.Context)
- func (p *WorkerPool[T, R]) Tune(numWorkers int32)
- func (p *WorkerPool[T, R]) Wait()
Types ¶
type None ¶
type None struct{}
None is a placeholder result type for a worker pool that has no result receiver.
type Option ¶
type Option[T TaskMayPanic, R any] interface {
	Apply(pool *WorkerPool[T, R])
}
Option is the config option for WorkerPool.
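Because Option exposes a single Apply method, a constructor can presumably walk its opts and let each one mutate the pool. The sketch below illustrates that pattern in isolation. The `pool` struct, its `resultBuf` field, `resultBufOption`, and `newPool` are all hypothetical stand-ins, since the real WorkerPool fields are unexported and the package's concrete options are not listed here.

```go
package main

import "fmt"

// pool is a hypothetical stand-in for WorkerPool; its field is an assumption.
type pool[T any, R any] struct {
	resultBuf int
}

// option mirrors the documented Option shape: one Apply method on the pool.
type option[T any, R any] interface {
	Apply(p *pool[T, R])
}

// resultBufOption is a hypothetical option that sets a buffer size.
type resultBufOption[T any, R any] struct {
	size int
}

// Apply writes the configured size into the pool.
func (o resultBufOption[T, R]) Apply(p *pool[T, R]) {
	p.resultBuf = o.size
}

// newPool applies every option in order, as a constructor that accepts
// opts ...Option might do.
func newPool[T any, R any](opts ...option[T, R]) *pool[T, R] {
	p := &pool[T, R]{}
	for _, o := range opts {
		o.Apply(p)
	}
	return p
}

func main() {
	p := newPool[int, string](resultBufOption[int, string]{size: 8})
	fmt.Println(p.resultBuf)
}
```

Keeping options behind an interface rather than a plain function type lets each option carry its own state, at the cost of one small type per option.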
type TaskMayPanic ¶
type TaskMayPanic interface {
	// RecoverArgs returns the argument for pkg/util.Recover function of this task.
	RecoverArgs() (metricsLabel string, funcInfo string, recoverFn func(), quit bool)
}
TaskMayPanic is a type that reminds developers to handle panics in the task.
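As a minimal sketch of what satisfying this constraint could look like, the example below copies the interface locally so it compiles on its own. The `scanTask` type, the "example" metrics label, and the no-op recover function are hypothetical; appropriate values depend on how the pool passes them to pkg/util.Recover.

```go
package main

import "fmt"

// TaskMayPanic is copied from the documentation so the sketch is self-contained.
type TaskMayPanic interface {
	RecoverArgs() (metricsLabel string, funcInfo string, recoverFn func(), quit bool)
}

// scanTask is a hypothetical task type.
type scanTask struct {
	id int
}

// RecoverArgs returns illustrative values: a label, a description that
// identifies the task, a no-op recover callback, and quit=false.
func (t scanTask) RecoverArgs() (string, string, func(), bool) {
	return "example", fmt.Sprintf("scanTask-%d", t.id), func() {}, false
}

// Compile-time check that scanTask satisfies the constraint.
var _ TaskMayPanic = scanTask{}

func main() {
	label, info, _, quit := scanTask{id: 7}.RecoverArgs()
	fmt.Println(label, info, quit)
}
```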
type Worker ¶
type Worker[T TaskMayPanic, R any] interface {
	// HandleTask consumes a task(T) and produces a result(R).
	// The result is sent to the result channel by calling `send` function.
	HandleTask(task T, send func(R))
	Close()
}
Worker is the interface implemented by each worker in the pool.
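The following sketch shows one way a Worker implementation might look, with the two interfaces copied locally so it runs without the package. `lineTask`, `lenWorker`, and `runOne` are hypothetical names. Here `send` is a plain closure that collects results, whereas the real pool is described as forwarding them to the result channel.

```go
package main

import "fmt"

// Local copies of the documented interfaces, so the sketch is self-contained.
type TaskMayPanic interface {
	RecoverArgs() (metricsLabel string, funcInfo string, recoverFn func(), quit bool)
}

type Worker[T TaskMayPanic, R any] interface {
	HandleTask(task T, send func(R))
	Close()
}

// lineTask is a hypothetical task carrying one line of text.
type lineTask struct {
	text string
}

func (lineTask) RecoverArgs() (string, string, func(), bool) {
	return "example", "lineTask", func() {}, false
}

// lenWorker is a hypothetical worker that reports the length of each line.
type lenWorker struct {
	closed bool
}

// HandleTask produces exactly one result per task by calling send once.
func (w *lenWorker) HandleTask(task lineTask, send func(int)) {
	send(len(task.text))
}

// Close records that the worker was shut down.
func (w *lenWorker) Close() {
	w.closed = true
}

// Compile-time check that lenWorker satisfies Worker.
var _ Worker[lineTask, int] = (*lenWorker)(nil)

// runOne drives a worker by hand and collects whatever it sends.
func runOne(text string) []int {
	w := &lenWorker{}
	defer w.Close()
	var got []int
	w.HandleTask(lineTask{text: text}, func(r int) { got = append(got, r) })
	return got
}

func main() {
	fmt.Println(runOne("hello"))
}
```

Nothing in the signature limits HandleTask to a single send per task, so a worker could presumably call it zero or several times.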
type WorkerPool ¶
type WorkerPool[T TaskMayPanic, R any] struct {
	// contains filtered or unexported fields
}
WorkerPool is a pool of workers.
func NewWorkerPool ¶
func NewWorkerPool[T TaskMayPanic, R any](
	name string,
	_ util.Component,
	numWorkers int,
	createWorker func() Worker[T, R],
	opts ...Option[T, R],
) *WorkerPool[T, R]
NewWorkerPool creates a new worker pool.
func (*WorkerPool[T, R]) AddTask ¶
func (p *WorkerPool[T, R]) AddTask(task T)
AddTask adds a task to the pool.
func (*WorkerPool[T, R]) Cap ¶
func (p *WorkerPool[T, R]) Cap() int32
Cap returns the capacity of the pool.
func (*WorkerPool[T, R]) GetOriginConcurrency ¶
func (p *WorkerPool[T, R]) GetOriginConcurrency() int32
GetOriginConcurrency returns the concurrency the pool was initialized with.
func (*WorkerPool[T, R]) GetResultChan ¶
func (p *WorkerPool[T, R]) GetResultChan() <-chan R
GetResultChan gets the result channel from the pool.
func (*WorkerPool[T, R]) LastTunerTs ¶
func (p *WorkerPool[T, R]) LastTunerTs() time.Time
LastTunerTs returns the last time when the pool was tuned.
func (*WorkerPool[T, R]) Name ¶
func (p *WorkerPool[T, R]) Name() string
Name returns the name of the pool.
func (*WorkerPool[T, R]) ReleaseAndWait ¶
func (p *WorkerPool[T, R]) ReleaseAndWait()
ReleaseAndWait releases the pool and waits for all workers to complete.
func (*WorkerPool[T, R]) Running ¶
func (p *WorkerPool[T, R]) Running() int32
Running returns the number of running workers.
func (*WorkerPool[T, R]) SetResultSender ¶
func (p *WorkerPool[T, R]) SetResultSender(sender chan R)
SetResultSender sets the result sender for the pool.
func (*WorkerPool[T, R]) SetTaskReceiver ¶
func (p *WorkerPool[T, R]) SetTaskReceiver(recv chan T)
SetTaskReceiver sets the task receiver for the pool.
func (*WorkerPool[T, R]) Start ¶
func (p *WorkerPool[T, R]) Start(ctx context.Context)
Start starts the default number of workers.
func (*WorkerPool[T, R]) Tune ¶
func (p *WorkerPool[T, R]) Tune(numWorkers int32)
Tune tunes the pool to the specified number of workers.
func (*WorkerPool[T, R]) Wait ¶
func (p *WorkerPool[T, R]) Wait()
Wait waits for all workers to complete.
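The method descriptions above suggest a lifecycle of Start, AddTask, reading from GetResultChan, and finally ReleaseAndWait. The sketch below walks through that call order with `toyPool`, a simplified stand-in written only for illustration. It is not the package's implementation, and details such as unbuffered channels and closing the result channel in ReleaseAndWait are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// toyPool is a hypothetical stand-in for WorkerPool that squares integers.
type toyPool struct {
	tasks   chan int
	results chan int
	n       int
	wg      sync.WaitGroup
}

func newToyPool(numWorkers int) *toyPool {
	return &toyPool{tasks: make(chan int), results: make(chan int), n: numWorkers}
}

// Start launches n goroutines that process tasks until the task channel is
// closed or ctx is cancelled.
func (p *toyPool) Start(ctx context.Context) {
	for i := 0; i < p.n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case t, ok := <-p.tasks:
					if !ok {
						return
					}
					select {
					case p.results <- t * t:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}
}

// AddTask blocks until a worker picks up the task.
func (p *toyPool) AddTask(t int) { p.tasks <- t }

// GetResultChan exposes the results as a receive-only channel.
func (p *toyPool) GetResultChan() <-chan int { return p.results }

// ReleaseAndWait stops accepting tasks, waits for the workers to exit, and
// then closes the result channel (an assumption of this sketch).
func (p *toyPool) ReleaseAndWait() {
	close(p.tasks)
	p.wg.Wait()
	close(p.results)
}

// squares runs the whole lifecycle and returns the results in sorted order.
func squares(inputs []int, workers int) []int {
	p := newToyPool(workers)
	p.Start(context.Background())

	var out []int
	done := make(chan struct{})
	go func() { // drain results concurrently so workers never block forever
		defer close(done)
		for r := range p.GetResultChan() {
			out = append(out, r)
		}
	}()

	for _, in := range inputs {
		p.AddTask(in)
	}
	p.ReleaseAndWait()
	<-done
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(squares([]int{1, 2, 3, 4}, 2))
}
```

Results arrive in whatever order the workers finish, which is why the sketch sorts them before returning. The consumer goroutine has to be running before tasks are added, otherwise the workers would block on the unbuffered result channel.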