workerpool

package
v1.1.0-beta.0...-6463db6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2025 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type None

type None struct{}

None is a type placeholder for the worker pool that does not have a result receiver.

type Option

type Option[T TaskMayPanic, R any] interface {
	Apply(pool *WorkerPool[T, R])
}

Option is the config option for WorkerPool.

type TaskMayPanic

type TaskMayPanic interface {
	// RecoverArgs returns the argument for pkg/util.Recover function of this task.
	RecoverArgs() (metricsLabel string, funcInfo string, recoverFn func(), quit bool)
}

TaskMayPanic is a type to remind the developer that need to handle panic in the task.

type Worker

type Worker[T TaskMayPanic, R any] interface {
	// HandleTask consumes a task(T) and produces a result(R).
	// The result is sent to the result channel by calling `send` function.
	HandleTask(task T, send func(R))
	Close()
}

Worker is worker interface.

type WorkerPool

type WorkerPool[T TaskMayPanic, R any] struct {
	// contains filtered or unexported fields
}

WorkerPool is a pool of workers.

func NewWorkerPool

func NewWorkerPool[T TaskMayPanic, R any](
	name string,
	_ util.Component,
	numWorkers int,
	createWorker func() Worker[T, R],
	opts ...Option[T, R],
) *WorkerPool[T, R]

NewWorkerPool creates a new worker pool.

func (*WorkerPool[T, R]) AddTask

func (p *WorkerPool[T, R]) AddTask(task T)

AddTask adds a task to the pool.

func (*WorkerPool[T, R]) Cap

func (p *WorkerPool[T, R]) Cap() int32

Cap returns the capacity of the pool.

func (*WorkerPool[T, R]) GetOriginConcurrency

func (p *WorkerPool[T, R]) GetOriginConcurrency() int32

GetOriginConcurrency return the concurrency of the pool at the init.

func (*WorkerPool[T, R]) GetResultChan

func (p *WorkerPool[T, R]) GetResultChan() <-chan R

GetResultChan gets the result channel from the pool.

func (*WorkerPool[T, R]) LastTunerTs

func (p *WorkerPool[T, R]) LastTunerTs() time.Time

LastTunerTs returns the last time when the pool was tuned.

func (*WorkerPool[T, R]) Name

func (p *WorkerPool[T, R]) Name() string

Name returns the name of the pool.

func (*WorkerPool[T, R]) Release

func (p *WorkerPool[T, R]) Release()

Release releases the pool.

func (*WorkerPool[T, R]) ReleaseAndWait

func (p *WorkerPool[T, R]) ReleaseAndWait()

ReleaseAndWait releases the pool and wait for complete.

func (*WorkerPool[T, R]) Running

func (p *WorkerPool[T, R]) Running() int32

Running returns the number of running workers.

func (*WorkerPool[T, R]) SetResultSender

func (p *WorkerPool[T, R]) SetResultSender(sender chan R)

SetResultSender sets the result sender for the pool.

func (*WorkerPool[T, R]) SetTaskReceiver

func (p *WorkerPool[T, R]) SetTaskReceiver(recv chan T)

SetTaskReceiver sets the task receiver for the pool.

func (*WorkerPool[T, R]) Start

func (p *WorkerPool[T, R]) Start(ctx context.Context)

Start starts default count of workers.

func (*WorkerPool[T, R]) Tune

func (p *WorkerPool[T, R]) Tune(numWorkers int32)

Tune tunes the pool to the specified number of workers.

func (*WorkerPool[T, R]) Wait

func (p *WorkerPool[T, R]) Wait()

Wait waits for all workers to complete.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL