boost

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2024 License: MIT Imports: 15 Imported by: 4

Documentation

Index

Constants

View Source
const (
	// TODO: This is just temporary, channel size definition still needs to be
	// fine tuned
	//
	DefaultChSize = 100
)
View Source
const (
	MaxWorkers = 100
)

Variables

View Source
var (
	WithDisablePurge     = ants.WithDisablePurge
	WithExpiryDuration   = ants.WithExpiryDuration
	WithGenerator        = ants.WithGenerator
	WithInput            = ants.WithInput
	WithMaxBlockingTasks = ants.WithMaxBlockingTasks
	WithNonblocking      = ants.WithNonblocking
	WithOptions          = ants.WithOptions
	WithOutput           = ants.WithOutput
	WithPanicHandler     = ants.WithPanicHandler
	WithPreAlloc         = ants.WithPreAlloc
	WithSize             = ants.WithSize
)

Functions

func StartCancellationMonitor added in v0.6.0

func StartCancellationMonitor(ctx context.Context,
	cancel context.CancelFunc,
	wg WaitGroup,
	cancelCh CancelStreamR,
	on OnCancel,
)

StartCancellationMonitor

Types

type AnnotatedWaitGroup

type AnnotatedWaitGroup struct {
	// contains filtered or unexported fields
}

AnnotatedWaitGroup is a wrapper around the standard WaitGroup that provides annotations to wait group operations that can assist in diagnosing concurrency issues.

func (*AnnotatedWaitGroup) Add

func (d *AnnotatedWaitGroup) Add(delta int, name ...GoRoutineName)

Add wraps the standard WaitGroup Add operation with the addition of being able to associate a go routine (identified by a client provided name) with the Add request.

func (*AnnotatedWaitGroup) Count

func (d *AnnotatedWaitGroup) Count() int

func (*AnnotatedWaitGroup) Done

func (d *AnnotatedWaitGroup) Done(name ...GoRoutineName)

Done wraps the standard WaitGroup Done operation with the addition of being able to associate a go routine (identified by a client provided name) with the Done request.

func (*AnnotatedWaitGroup) Wait

func (d *AnnotatedWaitGroup) Wait(name ...GoRoutineName)

Wait wraps the standard WaitGroup Wait operation with the addition of being able to associate a go routine (identified by a client provided name) with the Wait request.

type AnnotatedWgAQ

type AnnotatedWgAQ interface {
	AnnotatedWgAdder
	AnnotatedWgQuitter
}

AnnotatedWgAQ is the interface that is a restricted view of a wait group that allows adding to the wait group and Done signalling with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.

type AnnotatedWgAdder

type AnnotatedWgAdder interface {
	Add(delta int, name ...GoRoutineName)
}

AnnotatedWgAdder is the interface that is a restricted view of a wait group that only allows adding to the wait group with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.

type AnnotatedWgCounter

type AnnotatedWgCounter interface {
	Count() int
}

AnnotatedWgCounter is the interface that is a restricted view of a wait group that only allows querying the wait group count. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.

type AnnotatedWgQuitter

type AnnotatedWgQuitter interface {
	Done(name ...GoRoutineName)
}

AnnotatedWgQuitter is the interface that is a restricted view of a wait group that only allows Done signalling on the wait group with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.

type AnnotatedWgWaiter

type AnnotatedWgWaiter interface {
	Wait(name ...GoRoutineName)
}

AnnotatedWgWaiter is the interface that is a restricted view of a wait group that only allows waiting on the wait group with the addition of being able to specify the name representing the calling go routine. This interface can be acquired from the wait group using a standard interface type query. Deprecated: use ants base worker-pool instead.

type CancelStream

type CancelStream = chan CancelWorkSignal

type CancelStreamR

type CancelStreamR = <-chan CancelWorkSignal

type CancelStreamW

type CancelStreamW = chan<- CancelWorkSignal

type CancelWorkSignal

type CancelWorkSignal struct{}

type Duplex added in v0.4.0

type Duplex[T any] struct {
	Channel  chan T
	ReaderCh <-chan T
	WriterCh chan<- T
}

Duplex represents a channel with multiple views, to be used by clients that need to hand out different ends of the same channel to different entities.

func NewDuplex added in v0.4.0

func NewDuplex[T any](channel chan T) *Duplex[T]

NewDuplex creates a new instance of a Duplex with all members populated

type DuplexJobOutput added in v0.4.0

type DuplexJobOutput[O any] Duplex[JobOutput[O]]

type ExecutiveFunc

type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)

func (ExecutiveFunc[I, O]) Invoke

func (f ExecutiveFunc[I, O]) Invoke(j Job[I]) (JobOutput[O], error)

type FuncPool added in v0.6.0

type FuncPool[I, O any] struct {
	// contains filtered or unexported fields
}

func NewFuncPool added in v0.6.0

func NewFuncPool[I, O any](ctx context.Context,
	pf ants.PoolFunc,
	wg WaitGroup,
	options ...Option,
) (*FuncPool[I, O], error)

NewFuncPool creates a new worker pool using the native ants interface; ie new jobs are submitted with Submit(task TaskFunc)

func (*FuncPool) CancelCh added in v0.6.0

func (p *FuncPool) CancelCh() CancelStreamR

CancelCh

func (*FuncPool) Observe added in v0.6.0

func (p *FuncPool) Observe() JobOutputStreamR[O]

Observe

func (*FuncPool) Post added in v0.6.0

func (p *FuncPool) Post(ctx context.Context, job InputParam) error

Post submits a task to the pool.

func (*FuncPool) Release added in v0.6.0

func (p *FuncPool) Release(ctx context.Context)

Release closes this pool and releases the worker queue.

func (*FuncPool) Running added in v0.6.0

func (p *FuncPool) Running() int

Running returns the number of workers currently running.

func (*FuncPool) Waiting added in v0.6.0

func (p *FuncPool) Waiting() int

Waiting returns the number of tasks waiting to be executed.

type GoRoutineID

type GoRoutineID string

type GoRoutineName

type GoRoutineName string

type IDGenerator added in v0.6.0

type IDGenerator = ants.IDGenerator

type InputParam added in v0.6.0

type InputParam = ants.InputParam

type Job

type Job[I any] struct {
	ID         string
	SequenceNo int
	Input      I
}

type JobOutput

type JobOutput[O any] struct {
	ID         string
	SequenceNo int
	Payload    O
	Error      error
}

type JobOutputStream added in v0.4.0

type JobOutputStream[O any] chan JobOutput[O]

type JobOutputStreamR added in v0.4.0

type JobOutputStreamR[O any] <-chan JobOutput[O]

type JobOutputStreamW added in v0.4.0

type JobOutputStreamW[O any] chan<- JobOutput[O]

type JobStream

type JobStream[I any] chan Job[I]

type JobStreamR

type JobStreamR[I any] <-chan Job[I]

type JobStreamW

type JobStreamW[I any] chan<- Job[I]

type ManifoldFunc added in v0.6.0

type ManifoldFunc[I, O any] func(input I) (O, error)

ManifoldFunc is the pre-defined function registered with the worker pool, executed for each incoming job.

type ManifoldFuncPool added in v0.6.0

type ManifoldFuncPool[I, O any] struct {
	// contains filtered or unexported fields
}

ManifoldFuncPool is a wrapper around the underlying ants function based worker pool. The client is expected to create an output channel to receive the outputs of executing jobs in the worker pool. If the output channel is not defined, then jobs will still be executed, but the output of which will not be sent, also losing job execution error status.

func NewManifoldFuncPool added in v0.6.0

func NewManifoldFuncPool[I, O any](ctx context.Context,
	mf ManifoldFunc[I, O],
	wg WaitGroup,
	options ...Option,
) (*ManifoldFuncPool[I, O], error)

NewManifoldFuncPool creates a new manifold function based worker pool.

func (*ManifoldFuncPool) CancelCh added in v0.6.0

func (p *ManifoldFuncPool) CancelCh() CancelStreamR

CancelCh

func (*ManifoldFuncPool[I, O]) Conclude added in v0.6.0

func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context)

Conclude signifies to the worker pool that no more work will be submitted. When submitting to the pool directly using the Post method, the client must call this method. Failure to do so will result in a pool that never ends. When the client elects to use an input channel, by invoking Source, then Conclude will be called automatically as long as the input channel has been closed. Failure to close the channel will again result in a never ending worker pool.

func (*ManifoldFuncPool) Observe added in v0.6.0

func (p *ManifoldFuncPool) Observe() JobOutputStreamR[O]

Observe

func (*ManifoldFuncPool[I, O]) Post added in v0.6.0

func (p *ManifoldFuncPool[I, O]) Post(ctx context.Context, input I) error

Post allows the client to submit to the work pool represented by input values of type I.

func (*ManifoldFuncPool) Release added in v0.6.0

func (p *ManifoldFuncPool) Release(ctx context.Context)

Release closes this pool and releases the worker queue.

func (*ManifoldFuncPool) Running added in v0.6.0

func (p *ManifoldFuncPool) Running() int

Running returns the number of workers currently running.

func (*ManifoldFuncPool[I, O]) Source added in v0.6.0

func (p *ManifoldFuncPool[I, O]) Source(ctx context.Context,
	wg WaitGroup,
) SourceStreamW[I]

Source returns an input stream through which the client can submit jobs to the pool. Using an input stream vs invoking Post is mutually exclusive; that is to say, if Source is called, then Post must not be called; any such invocations will be ignored.

func (*ManifoldFuncPool) Waiting added in v0.6.0

func (p *ManifoldFuncPool) Waiting() int

Waiting returns the number of tasks waiting to be executed.

type NewWorkerPoolParamsL added in v0.6.0

type NewWorkerPoolParamsL[I, O any] struct {
	NoWorkers       int
	OutputChTimeout time.Duration
	Exec            ExecutiveFunc[I, O]
	JobsCh          JobStream[I]
	CancelCh        CancelStream
	WaitAQ          AnnotatedWgAQ
	Logger          *slog.Logger
}

NewWorkerPoolParamsL Deprecated: use ants base worker-pool instead.

type OnCancel added in v0.6.0

type OnCancel func()

OnCancel is the callback required by StartCancellationMonitor

type Option added in v0.6.0

type Option = ants.Option

type Options added in v0.6.0

type Options = ants.Options

type PoolFunc added in v0.6.0

type PoolFunc = ants.PoolFunc

type PoolResult added in v0.4.0

type PoolResult struct {
	Error error
}

type PoolResultStream added in v0.4.0

type PoolResultStream = chan *PoolResult

type PoolResultStreamR added in v0.4.0

type PoolResultStreamR = <-chan *PoolResult

type PoolResultStreamW added in v0.4.0

type PoolResultStreamW = chan<- *PoolResult

type Sequential added in v0.6.0

type Sequential struct {
	Format string
	// contains filtered or unexported fields
}

func (*Sequential) Generate added in v0.6.0

func (g *Sequential) Generate() string

type SourceStream added in v0.6.0

type SourceStream[I any] chan I

type SourceStreamR added in v0.6.0

type SourceStreamR[I any] <-chan I

type SourceStreamW added in v0.6.0

type SourceStreamW[I any] chan<- I

type TaskFunc added in v0.6.0

type TaskFunc = ants.TaskFunc

type TaskPool added in v0.6.0

type TaskPool[I, O any] struct {
	// contains filtered or unexported fields
}

func NewTaskPool added in v0.6.0

func NewTaskPool[I, O any](ctx context.Context,
	wg WaitGroup,
	options ...Option,
) (*TaskPool[I, O], error)

NewTaskPool creates a new worker pool using the native ants interface; ie new jobs are submitted with Submit(task TaskFunc)

func (*TaskPool) CancelCh added in v0.6.0

func (p *TaskPool) CancelCh() CancelStreamR

CancelCh

func (*TaskPool) Observe added in v0.6.0

func (p *TaskPool) Observe() JobOutputStreamR[O]

Observe

func (*TaskPool) Post added in v0.6.0

func (p *TaskPool) Post(ctx context.Context, task TaskFunc) error

Post submits a task to the pool.

func (*TaskPool) Release added in v0.6.0

func (p *TaskPool) Release(ctx context.Context)

Release closes this pool and releases the worker queue.

func (*TaskPool) Running added in v0.6.0

func (p *TaskPool) Running() int

Running returns the number of workers currently running.

func (*TaskPool) Waiting added in v0.6.0

func (p *TaskPool) Waiting() int

Waiting returns the number of tasks waiting to be executed.

type TrackableWaitGroup added in v0.6.1

type TrackableWaitGroup struct {
	// contains filtered or unexported fields
}

TrackableWaitGroup

func (*TrackableWaitGroup) Add added in v0.6.1

func (t *TrackableWaitGroup) Add(delta int)

Add

func (*TrackableWaitGroup) Count added in v0.6.1

func (t *TrackableWaitGroup) Count() int32

func (*TrackableWaitGroup) Done added in v0.6.1

func (t *TrackableWaitGroup) Done()

Done

func (*TrackableWaitGroup) Wait added in v0.6.1

func (t *TrackableWaitGroup) Wait()

Wait

type Tracker added in v0.6.1

type Tracker func(count int32)

Tracker

type WaitGroup added in v0.6.1

type WaitGroup interface {
	Add(delta int)
	Done()
	Wait()
}

WaitGroup allows the core sync.WaitGroup to be decorated by the client for debugging purposes.

func TrackWaitGroup added in v0.6.1

func TrackWaitGroup(wg *sync.WaitGroup, add, done Tracker) WaitGroup

TrackWaitGroup returns a trackable wait group for the native sync wait group specified; useful for debugging purposes.

type WaitGroupAn

WaitGroupAn the extended WaitGroup Deprecated: use ants base worker-pool instead.

func NewAnnotatedWaitGroup

func NewAnnotatedWaitGroup(name string, log ...*slog.Logger) WaitGroupAn

NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing the core WaitGroup instance. Deprecated: use ants base worker-pool instead.

type WaitGroupName

type WaitGroupName string

type WorkerPoolL added in v0.6.0

type WorkerPoolL[I, O any] struct {
	RoutineName GoRoutineName
	WaitAQ      AnnotatedWgAQ
	ResultInCh  PoolResultStreamR
	Logger      *slog.Logger
	// contains filtered or unexported fields
}

WorkerPoolL owns the resultOut channel, because it is the only entity that knows when all workers have completed their work due to the finished channel, which it also owns. Deprecated: use ants base worker-pool instead.

func NewWorkerPoolL added in v0.6.0

func NewWorkerPoolL[I, O any](params *NewWorkerPoolParamsL[I, O]) *WorkerPoolL[I, O]

NewWorkerPoolL Deprecated: use ants base worker-pool instead.

func (*WorkerPoolL[I, O]) Start added in v0.6.0

func (p *WorkerPoolL[I, O]) Start(
	parentContext context.Context,
	parentCancel context.CancelFunc,
	outputsChOut chan<- JobOutput[O],
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL