Documentation ¶
Index ¶
- Constants
- Variables
- func StartCancellationMonitor(ctx context.Context, cancel context.CancelFunc, wg WaitGroup, ...)
- type AnnotatedWaitGroup
- type AnnotatedWgAQ
- type AnnotatedWgAdder
- type AnnotatedWgCounter
- type AnnotatedWgQuitter
- type AnnotatedWgWaiter
- type CancelStream
- type CancelStreamR
- type CancelStreamW
- type CancelWorkSignal
- type Duplex
- type DuplexJobOutput
- type ExecutiveFunc
- type FuncPool
- type GoRoutineID
- type GoRoutineName
- type IDGenerator
- type InputParam
- type Job
- type JobOutput
- type JobOutputStream
- type JobOutputStreamR
- type JobOutputStreamW
- type JobStream
- type JobStreamR
- type JobStreamW
- type ManifoldFunc
- type ManifoldFuncPool
- func (p *ManifoldFuncPool) CancelCh() CancelStreamR
- func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context)
- func (p *ManifoldFuncPool) Observe() JobOutputStreamR[O]
- func (p *ManifoldFuncPool[I, O]) Post(ctx context.Context, input I) error
- func (p *ManifoldFuncPool) Release(ctx context.Context)
- func (p *ManifoldFuncPool) Running() int
- func (p *ManifoldFuncPool[I, O]) Source(ctx context.Context, wg WaitGroup) SourceStreamW[I]
- func (p *ManifoldFuncPool) Waiting() int
- type NewWorkerPoolParamsL
- type OnCancel
- type Option
- type Options
- type PoolFunc
- type PoolResult
- type PoolResultStream
- type PoolResultStreamR
- type PoolResultStreamW
- type Sequential
- type SourceStream
- type SourceStreamR
- type SourceStreamW
- type TaskFunc
- type TaskPool
- type TrackableWaitGroup
- type Tracker
- type WaitGroup
- type WaitGroupAn
- type WaitGroupName
- type WorkerPoolL
Constants ¶
const ( // TODO: This is just temporary, channel size definition still needs to be // fine tuned // DefaultChSize = 100 )
const (
MaxWorkers = 100
)
Variables ¶
var ( WithDisablePurge = ants.WithDisablePurge WithExpiryDuration = ants.WithExpiryDuration WithGenerator = ants.WithGenerator WithInput = ants.WithInput WithMaxBlockingTasks = ants.WithMaxBlockingTasks WithNonblocking = ants.WithNonblocking WithOptions = ants.WithOptions WithOutput = ants.WithOutput WithPanicHandler = ants.WithPanicHandler WithPreAlloc = ants.WithPreAlloc WithSize = ants.WithSize )
Functions ¶
func StartCancellationMonitor ¶ added in v0.6.0
func StartCancellationMonitor(ctx context.Context, cancel context.CancelFunc, wg WaitGroup, cancelCh CancelStreamR, on OnCancel, )
StartCancellationMonitor
Types ¶
type AnnotatedWaitGroup ¶
type AnnotatedWaitGroup struct {
// contains filtered or unexported fields
}
AnnotatedWaitGroup is a wrapper around the standard WaitGroup that provides annotations to wait group operations that can assist in diagnosing concurrency issues.
func (*AnnotatedWaitGroup) Add ¶
func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName)
Add wraps the standard WaitGroup Add operation with the addition of being able to associate a go routine (identified by a client provided name) with the Add request.
func (*AnnotatedWaitGroup) Count ¶
func (d *AnnotatedWaitGroup) Count() int
func (*AnnotatedWaitGroup) Done ¶
func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName)
Done wraps the standard WaitGroup Done operation with the addition of being able to associate a go routine (identified by a client provided name) with the Done request.
func (*AnnotatedWaitGroup) Wait ¶
func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName)
Wait wraps the standard WaitGroup Wait operation with the addition of being able to associate a go routine (identified by a client provided name) with the Wait request.
type AnnotatedWgAQ ¶
type AnnotatedWgAQ interface { AnnotatedWgAdder AnnotatedWgQuitter }
AnnotatedWgAQ is the interface that is a restricted view of a wait group that allows adding to the wait group and Done signalling with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.
type AnnotatedWgAdder ¶
type AnnotatedWgAdder interface {
Add(delta int, name ...GoRoutineName)
}
AnnotatedWgAdder is the interface that is a restricted view of a wait group that only allows adding to the wait group with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.
type AnnotatedWgCounter ¶
type AnnotatedWgCounter interface {
Count() int
}
AnnotatedWgCounter is the interface that is a restricted view of a wait group that only allows querying the wait group count. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.
type AnnotatedWgQuitter ¶
type AnnotatedWgQuitter interface {
Done(name ...GoRoutineName)
}
AnnotatedWgQuitter is the interface that is a restricted view of a wait group that only allows Done signalling on the wait group with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.
type AnnotatedWgWaiter ¶
type AnnotatedWgWaiter interface {
Wait(name ...GoRoutineName)
}
AnnotatedWgWaiter is the interface that is a restricted view of a wait group that only allows waiting on the wait group with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.
type CancelStream ¶
type CancelStream = chan CancelWorkSignal
type CancelStreamR ¶
type CancelStreamR = <-chan CancelWorkSignal
type CancelStreamW ¶
type CancelStreamW = chan<- CancelWorkSignal
type CancelWorkSignal ¶
type CancelWorkSignal struct{}
type Duplex ¶ added in v0.4.0
type Duplex[T any] struct { Channel chan T ReaderCh <-chan T WriterCh chan<- T }
Duplex represents a channel with multiple views, to be used by clients that need to hand out different ends of the same channel to different entities.
type DuplexJobOutput ¶ added in v0.4.0
type ExecutiveFunc ¶
func (ExecutiveFunc[I, O]) Invoke ¶
func (f ExecutiveFunc[I, O]) Invoke(j Job[I]) (JobOutput[O], error)
type FuncPool ¶ added in v0.6.0
type FuncPool[I, O any] struct { // contains filtered or unexported fields }
func NewFuncPool ¶ added in v0.6.0
func NewFuncPool[I, O any](ctx context.Context, pf ants.PoolFunc, wg WaitGroup, options ...Option, ) (*FuncPool[I, O], error)
NewFuncPool creates a new worker pool using the native ants interface; ie new jobs are submitted with Submit(task TaskFunc)
func (*FuncPool) Post ¶ added in v0.6.0
func (p *FuncPool) Post(ctx context.Context, job InputParam) error
Post submits a task to the pool.
type GoRoutineID ¶
type GoRoutineID string
type GoRoutineName ¶
type GoRoutineName string
type IDGenerator ¶ added in v0.6.0
type IDGenerator = ants.IDGenerator
type InputParam ¶ added in v0.6.0
type InputParam = ants.InputParam
type JobOutputStream ¶ added in v0.4.0
type JobOutputStreamR ¶ added in v0.4.0
type JobOutputStreamW ¶ added in v0.4.0
type JobStreamR ¶
type JobStreamW ¶
type ManifoldFunc ¶ added in v0.6.0
ManifoldFunc is the pre-defined function registered with the worker pool, executed for each incoming job.
type ManifoldFuncPool ¶ added in v0.6.0
type ManifoldFuncPool[I, O any] struct { // contains filtered or unexported fields }
ManifoldFuncPool is a wrapper around the underlying ants function based worker pool. The client is expected to create an output channel to receive the outputs of executing jobs in the worker pool. If the output channel is not defined, then jobs will still be executed, but the output of which will not be sent, also losing job execution error status.
func NewManifoldFuncPool ¶ added in v0.6.0
func NewManifoldFuncPool[I, O any](ctx context.Context, mf ManifoldFunc[I, O], wg WaitGroup, options ...Option, ) (*ManifoldFuncPool[I, O], error)
NewManifoldFuncPool creates a new manifold function based worker pool.
func (*ManifoldFuncPool) CancelCh ¶ added in v0.6.0
func (p *ManifoldFuncPool) CancelCh() CancelStreamR
CancelCh
func (*ManifoldFuncPool[I, O]) Conclude ¶ added in v0.6.0
func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context)
Conclude signifies to the worker pool that no more work will be submitted. When submitting to the pool directly using the Post method, the client must call this method. Failure to do so will result in a pool that never ends. When the client elects to use an input channel, by invoking Source, then Conclude will be called automatically as long as the input channel has been closed. Failure to close the channel will again result in a never ending worker pool.
func (*ManifoldFuncPool) Observe ¶ added in v0.6.0
func (p *ManifoldFuncPool) Observe() JobOutputStreamR[O]
Observe
func (*ManifoldFuncPool[I, O]) Post ¶ added in v0.6.0
func (p *ManifoldFuncPool[I, O]) Post(ctx context.Context, input I) error
Post allows the client to submit to the work pool represented by input values of type I.
func (*ManifoldFuncPool) Release ¶ added in v0.6.0
Release closes this pool and releases the worker queue.
func (*ManifoldFuncPool) Running ¶ added in v0.6.0
func (p *ManifoldFuncPool) Running() int
Running returns the number of workers currently running.
func (*ManifoldFuncPool[I, O]) Source ¶ added in v0.6.0
func (p *ManifoldFuncPool[I, O]) Source(ctx context.Context, wg WaitGroup, ) SourceStreamW[I]
Source returns an input stream through which the client can submit jobs to the pool. Using an input stream vs invoking Post is mutually exclusive; that is to say, if Source is called, then Post must not be called; any such invocations will be ignored.
type NewWorkerPoolParamsL ¶ added in v0.6.0
type NewWorkerPoolParamsL[I, O any] struct { NoWorkers int OutputChTimeout time.Duration Exec ExecutiveFunc[I, O] JobsCh JobStream[I] CancelCh CancelStream WaitAQ AnnotatedWgAQ Logger *slog.Logger }
NewWorkerPoolParamsL Deprecated: use ants base worker-pool instead.
type OnCancel ¶ added in v0.6.0
type OnCancel func()
OnCancel is the callback required by StartCancellationMonitor
type PoolResult ¶ added in v0.4.0
type PoolResult struct {
Error error
}
type PoolResultStream ¶ added in v0.4.0
type PoolResultStream = chan *PoolResult
type PoolResultStreamR ¶ added in v0.4.0
type PoolResultStreamR = <-chan *PoolResult
type PoolResultStreamW ¶ added in v0.4.0
type PoolResultStreamW = chan<- *PoolResult
type Sequential ¶ added in v0.6.0
type Sequential struct { Format string // contains filtered or unexported fields }
func (*Sequential) Generate ¶ added in v0.6.0
func (g *Sequential) Generate() string
type SourceStream ¶ added in v0.6.0
type SourceStream[I any] chan I
type SourceStreamR ¶ added in v0.6.0
type SourceStreamR[I any] <-chan I
type SourceStreamW ¶ added in v0.6.0
type SourceStreamW[I any] chan<- I
type TaskPool ¶ added in v0.6.0
type TaskPool[I, O any] struct { // contains filtered or unexported fields }
func NewTaskPool ¶ added in v0.6.0
func NewTaskPool[I, O any](ctx context.Context, wg WaitGroup, options ...Option, ) (*TaskPool[I, O], error)
NewTaskPool creates a new worker pool using the native ants interface; ie new jobs are submitted with Submit(task TaskFunc)
type TrackableWaitGroup ¶ added in v0.6.1
type TrackableWaitGroup struct {
// contains filtered or unexported fields
}
TrackableWaitGroup
func (*TrackableWaitGroup) Count ¶ added in v0.6.1
func (t *TrackableWaitGroup) Count() int32
type WaitGroup ¶ added in v0.6.1
type WaitGroup interface { Add(delta int) Done() Wait() }
WaitGroup allows the core sync.WaitGroup to be decorated by the client for debugging purposes.
func TrackWaitGroup ¶ added in v0.6.1
TrackWaitGroup returns a trackable wait group for the native sync wait group specified; useful for debugging purposes.
type WaitGroupAn ¶
type WaitGroupAn interface { AnnotatedWgAdder AnnotatedWgQuitter AnnotatedWgWaiter AnnotatedWgCounter }
WaitGroupAn the extended WaitGroup Deprecated: use ants base worker-pool instead.
func NewAnnotatedWaitGroup ¶
func NewAnnotatedWaitGroup(name string, log ...*slog.Logger) WaitGroupAn
NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing the core WaitGroup instance. Deprecated: use ants base worker-pool instead.
type WaitGroupName ¶
type WaitGroupName string
type WorkerPoolL ¶ added in v0.6.0
type WorkerPoolL[I, O any] struct { RoutineName GoRoutineName WaitAQ AnnotatedWgAQ ResultInCh PoolResultStreamR Logger *slog.Logger // contains filtered or unexported fields }
WorkerPoolL owns the resultOut channel, because it is the only entity that knows when all workers have completed their work due to the finished channel, which it also owns. Deprecated: use ants base worker-pool instead.
func NewWorkerPoolL ¶ added in v0.6.0
func NewWorkerPoolL[I, O any](params *NewWorkerPoolParamsL[I, O]) *WorkerPoolL[I, O]
NewWorkerPoolL Deprecated: use ants base worker-pool instead.
func (*WorkerPoolL[I, O]) Start ¶ added in v0.6.0
func (p *WorkerPoolL[I, O]) Start( parentContext context.Context, parentCancel context.CancelFunc, outputsChOut chan<- JobOutput[O], )