Documentation ¶
Index ¶
- type Option
- type Options
- type Pool
- func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF])
- func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF])
- func (p *Pool[T, U, C, CT, TF]) Cap() int
- func (p *Pool[T, U, C, CT, TF]) DeleteTask(id uint64)
- func (p *Pool[T, U, C, CT, TF]) ExitSubTask(id uint64)
- func (p *Pool[T, U, C, CT, TF]) Free() int
- func (p *Pool[T, U, C, CT, TF]) IsClosed() bool
- func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait()
- func (p *Pool[T, U, C, CT, TF]) Running() int
- func (p *Pool[T, U, C, CT, TF]) SetConsumerFunc(consumerFunc func(T, C, CT) U)
- func (p *Pool[T, U, C, CT, TF]) StopTask(id uint64)
- func (p *Pool[T, U, C, CT, TF]) Tune(size int)
- func (p *Pool[T, U, C, CT, TF]) Waiting() int
- type TaskOption
- type TaskOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(opts *Options)
Option represents the optional function.
func WithExpiryDuration ¶
WithExpiryDuration sets up the interval time of cleaning up goroutines.
func WithMaxBlockingTasks ¶
WithMaxBlockingTasks sets up the maximum number of goroutines that are blocked when it reaches the capacity of pool.
func WithNonblocking ¶
WithNonblocking indicates that pool will return nil when there is no available workers.
func WithPanicHandler ¶
func WithPanicHandler(panicHandler func(interface{})) Option
WithPanicHandler sets up panic handler.
type Options ¶
type Options struct { // PanicHandler is used to handle panics from each worker goroutine. // if nil, panics will be thrown out again from worker goroutines. PanicHandler func(interface{}) // ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers, // the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been // used for more than `ExpiryDuration`. ExpiryDuration time.Duration // LimitDuration is a period in the limit mode. LimitDuration time.Duration // Max number of goroutine blocking on pool.Submit. // 0 (default value) means no such limit. MaxBlockingTasks int // When Nonblocking is true, Pool.AddProduce will never be blocked. // ErrPoolOverload will be returned when Pool.Submit cannot be done at once. // When Nonblocking is true, MaxBlockingTasks is inoperative. Nonblocking bool }
Options contains all options which will be applied when instantiating an pool.
type Pool ¶
type Pool[T any, U any, C any, CT any, TF pooltask.Context[CT]] struct { gpool.BasePool // contains filtered or unexported fields }
Pool is a single producer, multiple consumer goroutine pool. T is the type of the task. We can treat it as input. U is the type of the result. We can treat it as output. C is the type of the const parameter. if Our task look like y = ax + b, C acts like b as const parameter. CT is the type of the context. It needs to be read/written parallel. TF is the type of the context getter. It is used to get a context. if we don't need to use CT/TF, we can define CT as any and TF as NilContext.
func NewSPMCPool ¶
func NewSPMCPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](name string, size int32, component util.Component, options ...Option) (*Pool[T, U, C, CT, TF], error)
NewSPMCPool create a single producer, multiple consumer goroutine pool.
func (*Pool[T, U, C, CT, TF]) AddProduceBySlice ¶
func (p *Pool[T, U, C, CT, TF]) AddProduceBySlice(producer func() ([]T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF])
AddProduceBySlice is to add Produce by a slice. Producer need to return ErrProducerClosed when to exit.
func (*Pool[T, U, C, CT, TF]) AddProducer ¶
func (p *Pool[T, U, C, CT, TF]) AddProducer(producer func() (T, error), constArg C, contextFn TF, options ...TaskOption) (<-chan U, pooltask.TaskController[T, U, C, CT, TF])
AddProducer is to add producer. Producer need to return ErrProducerClosed when to exit.
func (*Pool[T, U, C, CT, TF]) DeleteTask ¶
DeleteTask is to delete task. Please don't use it manually.
func (*Pool[T, U, C, CT, TF]) ExitSubTask ¶
ExitSubTask is to reduce the number of subtasks.
func (*Pool[T, U, C, CT, TF]) Free ¶
Free returns the number of available goroutines to work, -1 indicates this pool is unlimited.
func (*Pool[T, U, C, CT, TF]) ReleaseAndWait ¶
func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait()
ReleaseAndWait is like Release, it waits all workers to exit.
func (*Pool[T, U, C, CT, TF]) SetConsumerFunc ¶
func (p *Pool[T, U, C, CT, TF]) SetConsumerFunc(consumerFunc func(T, C, CT) U)
SetConsumerFunc is to set ConsumerFunc which is to process the task.
func (*Pool[T, U, C, CT, TF]) StopTask ¶
StopTask is to stop task by id Please don't use it manually.
type TaskOption ¶
type TaskOption func(opts *TaskOptions)
TaskOption represents the optional function.
func WithConcurrency ¶
func WithConcurrency(c int) TaskOption
WithConcurrency is to set the concurrency of task.
func WithResultChanLen ¶
func WithResultChanLen(resultChanLen uint64) TaskOption
WithResultChanLen is to set the length of result channel.
func WithTaskChanLen ¶
func WithTaskChanLen(taskChanLen uint64) TaskOption
WithTaskChanLen is to set the length of task channel.
type TaskOptions ¶
TaskOptions contains all options