workpool

package
v0.0.0-...-66343a0 Latest
Warning

This package is not in the latest version of its module.

Published: May 14, 2024 License: BSD-3-Clause, BSD-3-Clause Imports: 7 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var ErrorIgnore = ErrorCallback(func(error) {})

ErrorIgnore is an ErrorHandler that will ignore all errors.

Functions

func DefaultOptions

func DefaultOptions() *options

Types

type ErrorCallback

type ErrorCallback func(error)

func ErrorLog

func ErrorLog(log logger.Printer) ErrorCallback

ErrorLog returns an ErrorHandler that logs the error message and otherwise ignores the error.

func ErrorStore

func ErrorStore(dest *error) ErrorCallback

ErrorStore returns an ErrorHandler that will store the returned error into the specified pointer.

func (ErrorCallback) Handle

func (c ErrorCallback) Handle(err error)

type ErrorHandler

type ErrorHandler interface {
	Handle(error)
}
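
The relationship between ErrorHandler and ErrorCallback resembles the function-adapter idiom used elsewhere in Go (for example http.HandlerFunc). The following is a minimal sketch under that assumption: the types are redeclared locally so the example runs on its own, and errorIgnore and errorStore are lowercase stand-ins written from the descriptions above, not the package's actual source.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorHandler mirrors the interface documented above.
type ErrorHandler interface {
	Handle(error)
}

// ErrorCallback adapts a plain function to the ErrorHandler interface.
type ErrorCallback func(error)

// Handle calls the underlying function with the error.
func (c ErrorCallback) Handle(err error) { c(err) }

// errorIgnore is a stand-in for ErrorIgnore: it drops every error.
var errorIgnore = ErrorCallback(func(error) {})

// errorStore is a stand-in for ErrorStore: it writes the error into dest.
func errorStore(dest *error) ErrorCallback {
	return func(err error) { *dest = err }
}

// errDiskFull is a hypothetical error used only for this demonstration.
var errDiskFull = errors.New("disk full")

func main() {
	var got error
	var h ErrorHandler = errorStore(&got)
	h.Handle(errDiskFull)
	errorIgnore.Handle(errDiskFull) // no effect
	fmt.Println(got)                // prints "disk full"
}
```

Because ErrorCallback is a function type with a Handle method, any closure can be converted into an ErrorHandler without declaring a new struct.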

type ErrorResult

type ErrorResult struct {
	*Result
}

func ErrorRetriever

func ErrorRetriever() *ErrorResult

ErrorRetriever returns an ErrorHandler that lets you retrieve the error returned by the work function.

To use it:

delayedError := ErrorRetriever()
wp.Add(WithError(work, delayedError)) // work is an ErrorWork
...
err := delayedError.Get()

Note that the Get function will block until the error has been returned.

func (*ErrorResult) Get

func (ep *ErrorResult) Get() error

func (*ErrorResult) Handle

func (ep *ErrorResult) Handle(result error)

type ErrorWork

type ErrorWork func() error

ErrorWork represents Work that can return an error.

Functions returning an error cannot be passed directly to a workpool or a scheduler. Instead, you can wrap those functions in WithRetry() or WithError(), defined below.
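
As an illustration of the wrapping described above, here is a minimal sketch of how an ErrorWork could be turned into a plain Work. It assumes the handler receives whatever the function returns, including nil; the source does not state this. withError is a local stand-in and errUnreachable is a hypothetical error.

```go
package main

import (
	"errors"
	"fmt"
)

type Work func()
type ErrorWork func() error

type ErrorHandler interface {
	Handle(error)
}

type ErrorCallback func(error)

func (c ErrorCallback) Handle(err error) { c(err) }

// errUnreachable is a hypothetical error used only for this demonstration.
var errUnreachable = errors.New("backend unreachable")

// withError is a stand-in for WithError. Assumption: the handler is
// invoked with the returned error even when that error is nil.
func withError(w ErrorWork, h ErrorHandler) Work {
	return func() { h.Handle(w()) }
}

func main() {
	var seen error
	job := withError(
		func() error { return errUnreachable },
		ErrorCallback(func(err error) { seen = err }),
	)
	job() // in real use, a pool worker makes this call
	fmt.Println(seen) // prints "backend unreachable"
}
```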

type Flags

type Flags struct {
	QueueSize          int
	ImmediateQueueSize int
	Workers            int
}

func DefaultFlags

func DefaultFlags() *Flags

func (*Flags) Register

func (cf *Flags) Register(set kflags.FlagSet, prefix string) *Flags

type Modifier

type Modifier func(*options) error

func FromFlags

func FromFlags(flags *Flags) Modifier

func WithImmediateQueueSize

func WithImmediateQueueSize(size int) Modifier

func WithQueueSize

func WithQueueSize(size int) Modifier

func WithWaitGroup

func WithWaitGroup(wg *sync.WaitGroup) Modifier

func WithWorkers

func WithWorkers(size int) Modifier

type Modifiers

type Modifiers []Modifier
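
Modifier follows the functional-options pattern: each modifier mutates an options value and may reject invalid input. The sketch below shows the general shape only. The fields of options, the validation rules, and the Apply helper are assumptions made for illustration, since the real struct is unexported and undocumented here.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a local stand-in; the field names are hypothetical.
type options struct {
	workers   int
	queueSize int
}

type Modifier func(*options) error

type Modifiers []Modifier

// Apply runs each modifier in order and stops at the first error.
// This helper is hypothetical and not part of the documented API.
func (mods Modifiers) Apply(o *options) error {
	for _, m := range mods {
		if err := m(o); err != nil {
			return err
		}
	}
	return nil
}

// withWorkers is a stand-in for WithWorkers; the validation is assumed.
func withWorkers(n int) Modifier {
	return func(o *options) error {
		if n <= 0 {
			return errors.New("workers must be positive")
		}
		o.workers = n
		return nil
	}
}

// withQueueSize is a stand-in for WithQueueSize; the validation is assumed.
func withQueueSize(n int) Modifier {
	return func(o *options) error {
		if n < 0 {
			return errors.New("queue size must not be negative")
		}
		o.queueSize = n
		return nil
	}
}

func main() {
	o := &options{workers: 1}
	err := Modifiers{withWorkers(4), withQueueSize(16)}.Apply(o)
	fmt.Println(o.workers, o.queueSize, err) // prints "4 16 <nil>"
}
```

Returning an error from each modifier is what lets a constructor such as New report a bad configuration instead of silently accepting it.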

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result maintains state about the result of work passed to WithResult.

Create Result objects with ResultRetriever.

func ResultRetriever

func ResultRetriever() *Result

ResultRetriever creates a result object.

func (*Result) Get

func (p *Result) Get() interface{}

Get returns the value returned by the executed work. It will block until the Work has run.

func (*Result) Handle

func (p *Result) Handle(result interface{})

Handle implements the ResultHandler interface. It is invoked internally by WithResult to feed a value.
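
The blocking behavior of Get can be pictured with a channel that is closed once a value arrives. This is only a sketch of one possible implementation: the actual fields of Result are unexported, and result and newResult are local stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a local stand-in for Result.
type result struct {
	once  sync.Once
	done  chan struct{}
	value interface{}
}

// newResult is a stand-in for ResultRetriever.
func newResult() *result {
	return &result{done: make(chan struct{})}
}

// Handle stores the value and unblocks every pending Get.
// In this sketch only the first call has an effect.
func (r *result) Handle(v interface{}) {
	r.once.Do(func() {
		r.value = v
		close(r.done)
	})
}

// Get blocks until Handle has been called, then returns the value.
func (r *result) Get() interface{} {
	<-r.done
	return r.value
}

func main() {
	r := newResult()
	go r.Handle("hello") // a worker would do this after running the work
	fmt.Println(r.Get().(string)) // prints "hello"
}
```

Closing the channel, rather than sending on it, lets any number of callers invoke Get, before or after the value is ready.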

type ResultCallback

type ResultCallback func(interface{})

ResultCallback invokes the specified callback as soon as the result is ready.

Note that the callback will block the worker until completion. If this is not desirable, make sure your callback only starts a goroutine and returns.

Use it like:

..., WithResult(work, ResultCallback(handler))
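
To keep a slow callback from holding up a worker, the callback can hand the value to a goroutine and return at once. The sketch below assumes nothing about the package beyond the ResultCallback type shown above; async and demo are hypothetical helpers, and the WaitGroup exists only so the example can wait for the background handler.

```go
package main

import (
	"fmt"
	"sync"
)

type ResultCallback func(interface{})

func (c ResultCallback) Handle(result interface{}) { c(result) }

// async wraps a slow handler so that Handle returns immediately.
// wg lets the caller wait for the background goroutine to finish.
func async(wg *sync.WaitGroup, slow func(interface{})) ResultCallback {
	return func(v interface{}) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			slow(v)
		}()
	}
}

// demo feeds one value through an async callback and returns what
// the background handler produced.
func demo() string {
	var wg sync.WaitGroup
	out := make(chan string, 1)
	cb := async(&wg, func(v interface{}) {
		out <- fmt.Sprintf("got %v", v)
	})
	cb.Handle(7) // returns without waiting for the handler
	wg.Wait()
	return <-out
}

func main() {
	fmt.Println(demo()) // prints "got 7"
}
```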

func (ResultCallback) Handle

func (c ResultCallback) Handle(result interface{})

type ResultHandler

type ResultHandler interface {
	Handle(interface{})
}

ResultHandler is the interface used by WithResult to handle a result.

WithResult will just call the Handle() function as soon as the result of the Work is available for consumption.

type ResultWork

type ResultWork func() interface{}

ResultWork represents a function that returns some value (e.g., an error or a string).

type Work

type Work func()

func InGoRoutine

func InGoRoutine(work Work) Work

InGoRoutine will spawn a separate goroutine to run the Work.

This is an anti-pattern for WorkPool, as the whole point of WorkPool is to have a fixed pool of goroutines to complete the work. However, this function is convenient when dealing with events scheduled via the scheduler package, or when there are large chunks of work that benefit from being queued within a WorkPool.
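
A wrapper of this kind can be very small. The following sketch assumes the wrapper simply starts the work with the go statement and returns; inGoRoutine is a local stand-in, not the package's source.

```go
package main

import "fmt"

type Work func()

// inGoRoutine is a stand-in for InGoRoutine: the returned Work starts
// the original work in a new goroutine and returns right away, so the
// pool worker that ran it is free again.
func inGoRoutine(work Work) Work {
	return func() { go work() }
}

func main() {
	finished := make(chan string)
	job := inGoRoutine(func() { finished <- "long task finished" })
	job() // returns immediately; the task runs in its own goroutine
	fmt.Println(<-finished) // prints "long task finished"
}
```

The channel in main is unbuffered, so the example would deadlock if job ran the task synchronously; it completes only because the task runs in its own goroutine.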

func WithError

func WithError(w ErrorWork, h ErrorHandler) Work

func WithResult

func WithResult(w ResultWork, h ResultHandler) Work

WithResult collects the return value of a function and makes it available through a ResultHandler.

For example, queue the work with:

result := ResultRetriever()
workpool.Add(WithResult(func() interface{} { return "hello" }, result))

To retrieve the returned value, you can then run:

value := result.Get().(string)

Get() will block until the work is completed.

As an argument for WithResult, you can use anything implementing the ResultHandler interface, ResultRetriever() and ResultCallback() being the main implementations.

To use ResultRetriever with functions returning multiple values, just wrap the returned values in an object.
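
One way to wrap multiple return values is a small struct, as sketched below. withResult and capture are local stand-ins (capture takes the place of a Result), and lookup and its example address are hypothetical. The work is run synchronously here; in real use a pool worker would run it.

```go
package main

import "fmt"

type Work func()
type ResultWork func() interface{}

type ResultHandler interface {
	Handle(interface{})
}

// withResult is a stand-in for WithResult: it passes the returned
// value to the handler.
func withResult(w ResultWork, h ResultHandler) Work {
	return func() { h.Handle(w()) }
}

// lookup bundles the two return values of a hypothetical function.
type lookup struct {
	Value string
	Err   error
}

// capture is a trivial ResultHandler that keeps the last value.
type capture struct{ v interface{} }

func (c *capture) Handle(v interface{}) { c.v = v }

func main() {
	c := &capture{}
	job := withResult(func() interface{} {
		return lookup{Value: "10.0.0.1", Err: nil}
	}, c)
	job()
	res := c.v.(lookup) // one type assertion recovers both values
	fmt.Println(res.Value, res.Err) // prints "10.0.0.1 <nil>"
}
```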

func WithRetry

func WithRetry(retries *retry.Options, sched *scheduler.Scheduler, wp *WorkPool, work ErrorWork, eh ErrorHandler) Work

WithRetry will retry running the specified ErrorWork until it either succeeds or the maximum number of attempts has been reached.

In case of failure, the ErrorHandler specified is invoked.

This function is fully asynchronous: rather than block the worker thread until the attempts have been exhausted and the timers expired, it uses a scheduler to retry later, while freeing the worker thread.

sched indicates a scheduler to use. wp indicates a WorkPool to use to re-schedule subsequent attempts. eh indicates what to do if, even after all attempts, the function is still failing.
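
The asynchronous retry idea can be sketched without the real dependencies. In the example below, time.AfterFunc stands in for the scheduler, a submit function stands in for the WorkPool, and a fixed wait replaces retry.Options. All of these are assumptions; withRetry and runDemo are hypothetical names and the package's actual logic may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type Work func()
type ErrorWork func() error

// withRetry returns Work that tries work up to attempts times.
// After a failure it returns at once, freeing the worker, and a
// timer re-queues the next attempt through submit. The returned
// Work must not run concurrently with itself.
func withRetry(attempts int, wait time.Duration, submit func(Work),
	work ErrorWork, onFail func(error)) Work {
	left := attempts
	var attempt Work
	attempt = func() {
		err := work()
		if err == nil {
			return
		}
		left--
		if left <= 0 {
			onFail(err) // attempts exhausted
			return
		}
		time.AfterFunc(wait, func() { submit(attempt) })
	}
	return attempt
}

// runDemo drives withRetry with one worker and a job that fails
// the given number of times before succeeding.
func runDemo(failures, attempts int) (int, error) {
	queue := make(chan Work, 1)
	done := make(chan error, 1)
	calls := 0
	job := func() error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		done <- nil
		return nil
	}
	submit := func(w Work) { queue <- w }
	submit(withRetry(attempts, time.Millisecond, submit, job,
		func(err error) { done <- err }))
	go func() { // the single worker
		for w := range queue {
			w()
		}
	}()
	err := <-done
	close(queue) // nothing is re-queued after success or exhaustion
	return calls, err
}

func main() {
	calls, err := runDemo(2, 5)
	fmt.Println(calls, err) // prints "3 <nil>"
}
```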

type WorkPool

type WorkPool struct {
	// contains filtered or unexported fields
}

func New

func New(mods ...Modifier) (*WorkPool, error)

New creates a new WorkPool, applying the given modifiers.

func (*WorkPool) Add

func (wp *WorkPool) Add(work Work)

Add adds work to be completed by one of the goroutines managed by the WorkPool.

func (*WorkPool) AddImmediate

func (wp *WorkPool) AddImmediate(work Work)

AddImmediate is just like Add: it adds work to be completed by one of the goroutines managed by the WorkPool.

The difference between Add and AddImmediate is that they use two different queues. Assuming that you normally use Add to queue your work, calling AddImmediate would bypass any work queued by Add, and have some work run as soon as a worker becomes available, rather than at the end of the queue.
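
The two-queue behavior can be illustrated with a pair of channels and a worker that checks the immediate queue first. This is a sketch of the general technique only; how the package actually selects between its queues is not documented here, and next and demoOrder are hypothetical helpers.

```go
package main

import "fmt"

type Work func()

// next returns the next job, preferring the immediate queue.
// It first polls the immediate queue without blocking; only if that
// queue is empty does it wait on both queues.
func next(immediate, normal <-chan Work) Work {
	select {
	case w := <-immediate:
		return w
	default:
	}
	select {
	case w := <-immediate:
		return w
	case w := <-normal:
		return w
	}
}

// demoOrder queues two normal jobs, then one immediate job, and
// reports the order in which a single worker would run them.
func demoOrder() []string {
	normal := make(chan Work, 4)
	immediate := make(chan Work, 4)
	var order []string
	normal <- func() { order = append(order, "a") }
	normal <- func() { order = append(order, "b") }
	immediate <- func() { order = append(order, "urgent") }
	for i := 0; i < 3; i++ {
		next(immediate, normal)()
	}
	return order
}

func main() {
	fmt.Println(demoOrder()) // prints "[urgent a b]"
}
```

The non-blocking first select is what gives the immediate queue priority; a single select over both channels would pick randomly when both have work.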

func (*WorkPool) Cancel

func (wp *WorkPool) Cancel()

Cancel causes the WorkPool to stop processing any work immediately, and terminate all the workers. The WorkPool can no longer be used after Cancel() is called.

func (*WorkPool) Do

func (wp *WorkPool) Do()

Do runs an infinite loop processing all the work requested.

Normally, Do() is invoked with 'go wp.Do()' from New, but you can call 'go wp.Do()' manually to spawn more workers.

func (*WorkPool) Done

func (wp *WorkPool) Done()

Done waits for all queued work to complete, then terminates all the workers. The WorkPool can no longer be used after Done() is called.
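
The lifecycle formed by New, Add, Do, and Done can be sketched with a channel and a sync.WaitGroup. The pool type below is a simplified local stand-in with one queue and no cancellation; it is an assumption about the general design, not the package's implementation, and sumTo is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Work func()

// pool is a simplified stand-in for WorkPool.
type pool struct {
	queue chan Work
	wg    sync.WaitGroup
}

// newPool starts the requested number of workers, each running do.
func newPool(workers, queueSize int) *pool {
	p := &pool{queue: make(chan Work, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go p.do()
	}
	return p
}

// do processes work until the queue is closed and drained.
func (p *pool) do() {
	defer p.wg.Done()
	for w := range p.queue {
		w()
	}
}

// add queues work; it blocks while the queue is full.
func (p *pool) add(w Work) { p.queue <- w }

// done closes the queue and waits for the workers to finish what is
// already queued. The pool cannot be used afterwards.
func (p *pool) done() {
	close(p.queue)
	p.wg.Wait()
}

// sumTo adds the numbers 1..n using the pool.
func sumTo(n int) int64 {
	var total int64
	p := newPool(4, 8)
	for i := 1; i <= n; i++ {
		i := i // capture the loop variable for the closure
		p.add(func() { atomic.AddInt64(&total, int64(i)) })
	}
	p.done()
	return total
}

func main() {
	fmt.Println(sumTo(100)) // prints "5050"
}
```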

func (*WorkPool) Wait

func (wp *WorkPool) Wait()

Wait blocks until all the work queued in the configured WaitGroup is completed.
