Documentation ¶
Overview ¶
Package pool provides an API to queue and concurrently execute a series of tasks or work.
Index ¶
- Constants
- Variables
- func StartWorker(worker Worker) chan struct{}
- func StatusText(status uint32) string
- func StopWorkers(params StopParams) error
- func Work(worker Worker)
- type Counter
- type Errors
- type Params
- type Pool
- type RunFunc
- type Signals
- type State
- type StopParams
- type Timeout
- type Validator
- type Worker
Examples ¶
Constants ¶
const (
	// StoppedStatus is the default status for a pool. The pool is stopped
	// and no work is being processed.
	StoppedStatus = iota
	// StartingStatus represents the pool status when it's being started.
	// This means that the workers start processing the work incrementally.
	StartingStatus
	// StartedStatus means that all the workers have been started.
	StartedStatus
	// IdleStatus means that all the workers have been started but no work is
	// being processed.
	IdleStatus
	// FinishedStatus means that all the workers are started and the queued
	// work has been completely processed.
	FinishedStatus
	// StoppingStatus represents the pool status when it's being stopped.
	// Some items might still be in flight.
	StoppingStatus
	// StoppedTimeout represents the pool status when it has been stopped and
	// some of the workers have been forcefully stopped. This means that the
	// work that was being done by N workers was not finished, so the user that
	// consumes the Pool object might want to perform some checks or clean-ups
	// to determine which work wasn't completed.
	StoppedTimeout
	// StoppedSuccess represents the pool status when it has been stopped
	// without hitting the stop timeout. The pool can still contain queued
	// events that have been moved to the leftover list.
	StoppedSuccess
)
Variables ¶
var (
	// ErrAddOperationTimedOut is returned when the add timeout is exceeded.
	ErrAddOperationTimedOut = errors.New("pool: failed adding work, queue full")
	// ErrStopOperationTimedOut is returned when the stop timeout is exceeded.
	ErrStopOperationTimedOut = errors.New("pool: stop timeout exceeded")
	// ErrAlreadyStarted is returned when Start is called on a non-stopped pool.
	ErrAlreadyStarted = errors.New("pool: cannot start a non stopped pool")
	// ErrAlreadyStopped is returned when Stop is called on a stopped pool.
	ErrAlreadyStopped = errors.New("pool: cannot stop an already stopped pool")
	// ErrAlreadyStopping is returned when Stop is called on a stopping pool.
	ErrAlreadyStopping = errors.New("pool: cannot stop a stopping pool")
	// ErrCannotAddWorkToStoppingPool is returned when work is added to a
	// stopping pool.
	ErrCannotAddWorkToStoppingPool = errors.New("pool: cannot add work to stopping pool")
	// ErrCannotWaitOnStoppedPool is returned by Wait() when the pool is stopped.
	ErrCannotWaitOnStoppedPool = errors.New("pool: cannot wait for workers to finish on a stopped pool")
	// ErrCannotGetLeftovers is returned when the pool is not in a stopped state.
	ErrCannotGetLeftovers = errors.New("pool: cannot get the work leftovers on a non stopped pool")
)
DefaultTimeout is a set of default timeout settings that can be used to create a new Pool.
Functions ¶
func StartWorker ¶
func StartWorker(worker Worker) chan struct{}
StartWorker starts a worker in the background, waiting for the goroutine to actually be scheduled. It returns a channel that can be used to wait until the goroutine has been run, as in the code below:
wait := StartWorker(Worker{})
// This blocks execution
<-wait
func StatusText ¶
func StatusText(status uint32) string

StatusText obtains the pool status as a string; for all of the available states see the statusMap, which contains the mappings from int to string.
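A minimal sketch of turning a status constant into its textual form; the exact strings printed are determined by the package's internal statusMap, so no particular output is asserted here:

package main

import (
	"fmt"

	"github.com/elastic/cloud-sdk-go/pkg/sync/pool"
)

func main() {
	// The status constants are untyped, so they convert implicitly to the
	// uint32 parameter that StatusText expects.
	fmt.Println(pool.StatusText(pool.StoppedStatus))
	fmt.Println(pool.StatusText(pool.StartedStatus))
}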
func StopWorkers ¶
func StopWorkers(params StopParams) error
StopWorkers stops all of the workers in parallel, trying to honor their timeout settings. If the workers cannot be stopped before the stop timeout is exceeded, the function returns ErrStopOperationTimedOut.
func Work ¶
func Work(worker Worker)
Work receives a Worker structure containing the configuration that controls the worker's behaviour. It processes each work item received on worker.Queue using the worker.Run function. When a stop signal is received, the worker waits up to the time.Duration defined by StopTimeout for the in-flight work to complete and then exits forcefully, sending any unfinished item to the leftovers channel.
Types ¶
type Counter ¶
type Counter struct {
// contains filtered or unexported fields
}
Counter represents a safe uint32 that can be used as a shared counter.
type Errors ¶
type Errors struct {
// contains filtered or unexported fields
}
Errors wraps a multierror.Error with a Mutex so that it can be safely accessed concurrently.
type Params ¶
type Params struct {
	// Size controls how many concurrent operations are running at the same
	// time.
	Size uint16
	// Run represents the actual function that will be run by each worker in
	// the worker pool.
	Run RunFunc
	// Timeout structure that controls the different timeout times.
	Timeout Timeout
	// Writer is the device where any (log, info) messages will be sent.
	Writer io.Writer
	// FailFast can be set to stop the whole pool when any of the workers
	// returns with an error.
	FailFast bool
}
Params is used to configure a Pool
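As a sketch, a Params value could be assembled as follows. The size, run function, durations, and writer are illustrative choices rather than package defaults, and the fragment assumes the "os" and "time" imports alongside the pool package:

params := pool.Params{
	// Run up to 4 workers concurrently.
	Size: 4,
	// Run receives each queued Validator and returns its error, if any.
	Run: func(v pool.Validator) error {
		return v.Validate()
	},
	Timeout: pool.Timeout{
		Add:  time.Second,
		Stop: time.Second,
	},
	// Send any pool messages to standard output.
	Writer: os.Stdout,
	// Stop the whole pool as soon as a worker returns an error.
	FailFast: true,
}
p, err := pool.NewPool(params)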
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a generic worker pool implementation that can be used to complete a series of tasks concurrently and obtain any errors that have been returned by the workers. Usage of the pool is quite simple and relies on the constructor function NewPool().
Example ¶
This example shows how to create a new Pool and work with it.
package main

import (
	"fmt"
	"io"
	"os"
	"time"

	"github.com/elastic/cloud-sdk-go/pkg/sync/pool"
)

type nilValidator struct {
	err error
}

func (v nilValidator) Validate() error {
	return v.err
}

var output io.Writer = os.Stdout

func main() {
	p, err := pool.NewPool(pool.Params{
		Size: 2,
		Run: func(params pool.Validator) error {
			<-time.After(time.Millisecond * 10)
			return params.Validate()
		},
		Timeout: pool.Timeout{
			Add:  time.Millisecond,
			Stop: time.Millisecond,
		},
	})
	if err != nil {
		panic(err)
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Start the pool so the workers start processing
	if err := p.Start(); err != nil {
		panic(err)
	}
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	var work = []pool.Validator{
		new(nilValidator),
		new(nilValidator),
	}

	// Try to add work
	leftovers, err := p.Add(work...)
	if err != nil && err != pool.ErrAddOperationTimedOut {
		panic(err)
	}

	// Ensure there's no leftovers
	if len(leftovers) > 0 {
		for {
			leftovers, _ = p.Add(leftovers...)
			if len(leftovers) == 0 {
				break
			}
		}
	}

	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Wait until all of the work is consumed
	if err = p.Wait(); err != nil {
		fmt.Fprintln(output, "execution errors:", err.Error())
	}
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	p.Stop()
	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	l, err := p.Leftovers()
	fmt.Fprintln(output, "leftovers:", len(l), "leftovers errors:", err)
}
Output:
Example (Failfast) ¶
This example shows how to create a new Pool which stops processing work when an error is returned by a worker.
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/elastic/cloud-sdk-go/pkg/sync/pool"
)

type nilValidator struct {
	err error
}

func (v nilValidator) Validate() error {
	return v.err
}

var output io.Writer = os.Stdout

func main() {
	p, err := pool.NewPool(pool.Params{
		Size: 2,
		Run: func(params pool.Validator) error {
			<-time.After(time.Millisecond * 10)
			return params.Validate()
		},
		Timeout: pool.Timeout{
			Add:  time.Millisecond,
			Stop: time.Millisecond,
		},
		// Setting FailFast will cause the pool to stop processing the queued
		// work and return the worker error when a worker returns with error.
		FailFast: true,
	})
	if err != nil {
		panic(err)
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Start the pool so the workers start processing
	if err := p.Start(); err != nil {
		panic(err)
	}
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	var work = []pool.Validator{
		new(nilValidator),
		&nilValidator{err: errors.New("first error")},
		new(nilValidator),
		new(nilValidator),
		&nilValidator{err: errors.New("last error")},
	}

	// Try to add work
	leftovers, err := p.Add(work...)
	if err != nil && err != pool.ErrAddOperationTimedOut {
		panic(err)
	}

	// Ensure there's no leftovers
	if len(leftovers) > 0 {
		for {
			leftovers, _ = p.Add(leftovers...)
			if len(leftovers) == 0 {
				break
			}
		}
	}

	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Wait until all of the work is consumed
	if err = p.Wait(); err != nil {
		fmt.Fprintln(output, "execution errors:", err.Error())
	}
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	p.Stop()
	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	l, err := p.Leftovers()
	fmt.Fprintln(output, "leftovers:", len(l), "leftovers errors:", err)
}
Output:
func (*Pool) Add ¶
Add adds N work items to the pool's queue, timing out if the queue is full for longer than the defined Timeout.Add. If an error is returned it will be ErrAddOperationTimedOut, meaning that the first return value contains the work items that were not added, leaving any retries up to the user.
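For instance, following the pattern used in the Pool example above, a queue-full timeout can be handled by retrying the returned leftovers. This fragment assumes it runs inside a function that returns an error, with p a started *pool.Pool and work a []pool.Validator:

leftovers, err := p.Add(work...)
if err != nil && err != pool.ErrAddOperationTimedOut {
	return err
}
// Retry any items that did not fit into the queue before the Add timeout.
for len(leftovers) > 0 {
	leftovers, _ = p.Add(leftovers...)
}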
func (*Pool) Leftovers ¶
Leftovers obtains a list of unfinished work in the following order: first, any items that were in flight and did not complete before the stop timeout was hit; then, the items that were never picked up by a worker. This function can only be called after the pool has been stopped.
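A small sketch, assuming p is a *pool.Pool that was previously started and given work, and that "fmt" is imported:

p.Stop()
// Leftovers returns ErrCannotGetLeftovers while the pool has not yet reached
// a stopped state.
l, err := p.Leftovers()
fmt.Println("unprocessed items:", len(l), "error:", err)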
func (*Pool) Result ¶
Result returns the results from the work that was done by the workers, namely any errors in the multierror format.
func (*Pool) Start ¶
Start starts the workers in the worker pool, and starts all of the internal goroutines that the pool relies on.
type Signals ¶
type Signals struct {
	// Stop is a channel that is used to signal the workers to stop.
	Stop chan struct{}
	// Stopped is a channel that is used for backwards communication with the
	// stopper to verify that the worker has been stopped.
	Stopped chan bool
	// Finish is a channel used by workers to signal that they've finished
	// processing a task.
	Finish chan struct{}
	// Added is a channel used by the pool to signal that a work item has been
	// pushed to the queue for processing.
	Added chan struct{}
	// StopMonitor is used to stop the monitoring goroutine that updates the
	// pool's internal state.
	StopMonitor chan struct{}
}
Signals contains all of the channels that are used to trigger different status changes in the Pool.
type State ¶
type State struct {
	// Number of work items that have been added to the queue.
	Queued Counter
	// Number of work items that have been processed by a worker.
	Processed Counter
	// Pool global Status.
	Status Counter
	// Errors that have been returned by the workers.
	Errors *Errors
	// contains filtered or unexported fields
}
State contains the pool's state.
type StopParams ¶
type StopParams struct {
	// Size is the number of workers to stop.
	Size int
	// Stop is used to send a signal to a worker to make it stop.
	Stop chan<- struct{}
	// StoppedWithTimeout is the signal given back by the worker that is being
	// stopped when it has stopped, either timing out or successfully.
	StoppedWithTimeout <-chan bool
}
StopParams is consumed by StopWorkers so a set of workers can be stopped.
type Timeout ¶
type Timeout struct {
	// Add is the timeout per Add operation, used to add items to the queue;
	// the timeout is evaluated per work item. Must be greater than 0s.
	Add time.Duration
	// Stop is the timeout that is used when stopping the workers; this timeout
	// is evaluated per worker, so the global timeout is Stop * N workers. Must
	// be greater than 0s.
	Stop time.Duration
}
Timeout is an object that encloses different Pool operation timeouts.
type Validator ¶
type Validator interface {
Validate() error
}
Validator interface is consumed by the RunFunc.
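Any type with a Validate() error method satisfies the interface. As an illustration, a hypothetical work item type (assuming the standard library "errors" import) could look like this:

// resourceCheck is a hypothetical work item that implements pool.Validator.
type resourceCheck struct {
	name string
}

// Validate returns an error when the work item is invalid; whatever the
// pool's RunFunc returns for the item ends up in the pool's error results.
func (r resourceCheck) Validate() error {
	if r.name == "" {
		return errors.New("resource check: name cannot be empty")
	}
	return nil
}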
type Worker ¶
type Worker struct {
	// Queue is the work queue where the worker obtains its work.
	Queue <-chan Validator
	// Stop channel is used to signal the worker to stop processing items from
	// the queue.
	Stop <-chan struct{}
	// Stopped is the channel used to communicate back with the stopper to
	// signal that the worker has been successfully stopped.
	Stopped chan<- bool
	// Finished channel is used to signal that a work item has been completed.
	// If the worker happens to be stopping and the stop timeout is hit before
	// the item has been processed by the run function, the Finished signal is
	// also sent.
	Finished chan<- struct{}
	// Errors is the channel where the errors from the work will land.
	Errors chan<- error
	// Leftovers receives any incomplete work items that the worker couldn't
	// finish before hitting the stop timeout.
	Leftovers chan<- Validator
	// Run is the function that the worker will run on each work item received.
	Run RunFunc
	// StopTimeout controls the time.Duration to wait when a stop signal is
	// received. If the timeout is exceeded before the work is completed, the
	// current work item will be sent to the leftover queue.
	StopTimeout time.Duration
}
Worker is the structure that contains the configuration that a worker uses when it's spawned.
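The Worker channels are normally wired up by the Pool itself, but for the lower-level StartWorker/Work/StopWorkers API a manual setup might look roughly like the sketch below. This is an illustration based solely on the field descriptions above: the noopValidator type, the channel capacities, and the timing are assumptions, not behaviour guaranteed by the package.

package main

import (
	"fmt"
	"time"

	"github.com/elastic/cloud-sdk-go/pkg/sync/pool"
)

// noopValidator is a hypothetical work item whose Validate never fails.
type noopValidator struct{}

func (noopValidator) Validate() error { return nil }

func main() {
	// Buffered channels are used defensively so this sketch does not depend
	// on which of the worker's signals is read first.
	var (
		queue     = make(chan pool.Validator, 8)
		stop      = make(chan struct{}, 1)
		stopped   = make(chan bool, 1)
		finished  = make(chan struct{}, 8)
		errs      = make(chan error, 8)
		leftovers = make(chan pool.Validator, 8)
	)

	worker := pool.Worker{
		Queue:       queue,
		Stop:        stop,
		Stopped:     stopped,
		Finished:    finished,
		Errors:      errs,
		Leftovers:   leftovers,
		Run:         func(v pool.Validator) error { return v.Validate() },
		StopTimeout: time.Second,
	}

	// StartWorker runs Work(worker) in a goroutine; receiving from the
	// returned channel waits until that goroutine has been scheduled.
	<-pool.StartWorker(worker)

	// Hand a work item to the worker, give it a moment to process it, then
	// ask it to stop.
	queue <- noopValidator{}
	<-time.After(time.Millisecond * 50)

	err := pool.StopWorkers(pool.StopParams{
		Size:               1,
		Stop:               stop,
		StoppedWithTimeout: stopped,
	})
	fmt.Println("stop error:", err, "leftovers queued:", len(leftovers))
}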