pool

package
v1.23.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2024 License: Apache-2.0 Imports: 10 Imported by: 1

Documentation

Overview

Package pool provies an API to queue and concurrently execute a series of tasks or work.

Index

Examples

Constants

View Source
const (
	// StoppedStatus is the default status for a pool. The pool is stopped
	// and no work is being processed.
	StoppedStatus = iota
	// StartingStatus represents the pool status when it's being started.
	// This means that the workers start processing the work incrementally.
	StartingStatus
	// StartedStatus all the workers have been started.
	StartedStatus
	// IdleStatus when all the workers have been started but no work is
	// being processed.
	IdleStatus
	// FinishedStatus all the workers are started and the queued work has
	// completely been processed.
	FinishedStatus
	// StoppingStatus represents the pool status when it's being stopped.
	// Some items might still be in flight.
	StoppingStatus
	// StoppedTimeout represents the pool status when has been stopped and
	// some of the workers have been forcefully stopped. This means that the
	// work that was being done by N workers was not finished, so the user that
	// consumes the Pool object might want to perform some checks or clean ups
	// to check which work wasn't completed.
	StoppedTimeout
	// StoppedSuccess represents the pool status when has been stopped without
	// hitting the stop timeout. The pool can still contain queued events that
	// have been moved to the leftover list.
	StoppedSuccess
)

Variables

View Source
var (
	// ErrAddOperationTimedOut is returned when the add timeout is exceeded
	ErrAddOperationTimedOut = errors.New("pool: failed adding work, queue full")
	// ErrStopOperationTimedOut is returned when the stop timeout is exceeded
	ErrStopOperationTimedOut = errors.New("pool: stop timeout exceeded")
	// ErrAlreadyStarted is returned when Start called on a non stopped pool
	ErrAlreadyStarted = errors.New("pool: cannot start a non stopped pool")
	// ErrAlreadyStopped is returned when Stop called on a stopped pool
	ErrAlreadyStopped = errors.New("pool: cannot stop an already stopped pool")
	// ErrAlreadyStopping is returned when Stop called on a stopping pool
	ErrAlreadyStopping = errors.New("pool: cannot stop a stopping pool")
	// ErrCannotAddWorkToStoppingPool is returned when work is added to a stopped
	// pool
	ErrCannotAddWorkToStoppingPool = errors.New("pool: cannot add work to stopping pool")
	// ErrCannotWaitOnStoppedPool is thrown by Wait() when the pool is stopped.
	ErrCannotWaitOnStoppedPool = errors.New("pool: cannot wait for workers to finish on a stopped pool")
	// ErrCannotGetLeftovers is returned when the pool is not in a stopped state.
	ErrCannotGetLeftovers = errors.New("pool: cannot get the work leftovers on a non stopped pool")
)
View Source
var DefaultTimeout = Timeout{
	Add:  time.Second * 30,
	Stop: time.Second * 30,
}

DefaultTimeout is a set of default timeout settings that can be used to create a new Pool.

Functions

func StartWorker

func StartWorker(worker Worker) chan struct{}

StartWorker starts a worker in the background waiting for the goroutine to actually be schedules. It returns a channel that can be used to wait until the Goroutine has been run as in the code below:

wait := StartWorker(Worker{})
// This blocks execution
<-wait

func StatusText

func StatusText(status uint32) string

StatusText obtains the current pool status as a string, for all available states see the statusMap which contains the mappings from int to string.

func StopWorkers

func StopWorkers(params StopParams) error

StopWorkers stops all of the workers in parallel trying to honor their timeout settings. If the worker cannot be stopped before the params.timeout the function returns ErrStopOperationTimedOut.

func Work

func Work(worker Worker)

Work receives a Worker structure that contains the configuration to control the worker behaviour. It processes work using the worker.run function on worker.queue receive. When a stop signal is received it will wait the time.Duration defined by the stopTimeout and forcefully exit without waiting for the work to be completed.

Types

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter represents a safe uint32 that can be used as a shared counter.

func (*Counter) Add

func (c *Counter) Add(incr uint32)

Add increments the counter

func (*Counter) Get

func (c *Counter) Get() uint32

Get obtains the value of the counter

func (*Counter) Set

func (c *Counter) Set(n uint32)

Set overwrites the value of the counter in favour of the passed value.

type Errors

type Errors struct {
	// contains filtered or unexported fields
}

Errors wraps a multierror.Error with a Mutex so that it can be safely used when accesses

func (*Errors) Add

func (e *Errors) Add(errs ...error)

Add appends a new error to the error list

func (*Errors) Error

func (e *Errors) Error() error

Error returns an error or nil

type Params

type Params struct {
	// Size controls how many concurrent operations are running at the same
	// time.
	Size uint16
	// Run represents the actual function that will be run by each worker in
	// the worker pool.
	Run RunFunc
	// Timeout structure that controls the different timeout times.
	Timeout Timeout

	// Writer is the device where any (log, info) messages will be sent.
	Writer io.Writer

	// FailFast can be set to stop all the pool when any of the workers returns
	// with an error.
	FailFast bool
}

Params is used to configure a Pool

func (Params) Validate

func (params Params) Validate() error

Validate verifies that the parameters are valid and returns a multierror if any invalid parameters are found.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a generic worker pool implementation that can be used to complete a series of tasks concurrently and obtain any errors that have been returned by the workers. The usage of the pool is quite simple by itself and relies on the constructor function NewPool().

Example

This example shows how to create a new Pool and work with it.

package main

import (
	"fmt"
	"io"
	"os"
	"time"

	"github.com/elastic/cloud-sdk-go/pkg/sync/pool"
)

type nilValidator struct {
	err error
}

func (v nilValidator) Validate() error { return v.err }

var output io.Writer = os.Stdout

func main() {
	p, err := pool.NewPool(pool.Params{
		Size: 2,
		Run: func(params pool.Validator) error {
			<-time.After(time.Millisecond * 10)
			return params.Validate()
		},
		Timeout: pool.Timeout{
			Add:  time.Millisecond,
			Stop: time.Millisecond,
		},
	})
	if err != nil {
		panic(err)
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Start the pool so the workers start processing
	if err := p.Start(); err != nil {
		panic(err)
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	var work = []pool.Validator{
		new(nilValidator),
		new(nilValidator),
	}
	// Try to add work
	leftovers, err := p.Add(work...)
	if err != nil && err != pool.ErrAddOperationTimedOut {
		panic(err)
	}

	// Ensure there's no leftovers
	if len(leftovers) > 0 {
		for {
			leftovers, _ := p.Add(leftovers...)
			if len(leftovers) == 0 {
				break
			}
		}
	}

	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Wait until all of the work is consumed
	if err = p.Wait(); err != nil {
		fmt.Fprintln(output, "execution errors:", err.Error())
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	p.Stop()

	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))
	l, err := p.Leftovers()
	fmt.Fprintln(output, "leftovers:", len(l), "leftovers errors:", err)
}
Output:

Example (Failfast)

This example shows how to create a new Pool which stops processing work when an error is returned by a worker.

package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/elastic/cloud-sdk-go/pkg/sync/pool"
)

type nilValidator struct {
	err error
}

func (v nilValidator) Validate() error { return v.err }

var output io.Writer = os.Stdout

func main() {
	p, err := pool.NewPool(pool.Params{
		Size: 2,
		Run: func(params pool.Validator) error {
			<-time.After(time.Millisecond * 10)
			return params.Validate()
		},
		Timeout: pool.Timeout{
			Add:  time.Millisecond,
			Stop: time.Millisecond,
		},
		// Setting FailFast will cause the pool to stop processing the queued
		// work and return the worker error when a worker returns with error.
		FailFast: true,
	})
	if err != nil {
		panic(err)
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Start the pool so the workers start processing
	if err := p.Start(); err != nil {
		panic(err)
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	var work = []pool.Validator{
		new(nilValidator),
		&nilValidator{err: errors.New("first error")},
		new(nilValidator),
		new(nilValidator),
		&nilValidator{err: errors.New("last error")},
	}
	// Try to add work
	leftovers, err := p.Add(work...)
	if err != nil && err != pool.ErrAddOperationTimedOut {
		panic(err)
	}

	// Ensure there's no leftovers
	if len(leftovers) > 0 {
		for {
			leftovers, _ := p.Add(leftovers...)
			if len(leftovers) == 0 {
				break
			}
		}
	}

	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))

	// Wait until all of the work is consumed
	if err = p.Wait(); err != nil {
		fmt.Fprintln(output, "execution errors:", err.Error())
	}

	fmt.Fprintln(output, pool.StatusText(p.Status()))

	p.Stop()

	<-time.After(time.Millisecond)
	fmt.Fprintln(output, pool.StatusText(p.Status()))
	l, err := p.Leftovers()
	fmt.Fprintln(output, "leftovers:", len(l), "leftovers errors:", err)
}
Output:

func NewPool

func NewPool(params Params) (*Pool, error)

NewPool initializes a new Pool from a set of parameters.

func (*Pool) Add

func (p *Pool) Add(work ...Validator) ([]Validator, error)

Add adds N amount of work to the pool's queue, timing out if the queue is full for more than the defined timeout.Add. If an error is returned it will be ErrAddingOperationTimedOut, meaning that the first parameter is the list of work that didn't get added, leaving any possible retries to add work to the user.

func (*Pool) Leftovers

func (p *Pool) Leftovers() ([]Validator, error)

Leftovers obtains a list of unfinished work with the following order: First it returns any items that might have been in flight and did not complete before hitting the stop timeout. Following those, the items that did not get processed by a worker. This function can only be called after the pool has been stopped.

func (*Pool) Result

func (p *Pool) Result() error

Result returns the results from the work that was done by the workers, namely returns any error in the multierror format.

func (*Pool) Start

func (p *Pool) Start() error

Start starts the workers in the worker pool, and starts all of the internal goroutines that the pool relies in.

func (*Pool) Status

func (p *Pool) Status() uint32

Status returns the numeric status of the pool

func (*Pool) Stop

func (p *Pool) Stop() error

Stop attempts to gracefully shutdown the workers from the pool. If the stop timeout is reached, the work that was being processed by the worker is sent to the leftover queue as are any items that were not processed, returning ErrStopOperationTimedOut.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait will stop execution until the pool has finished processing all of the work that it had in the queue. It returns any errors that the workers might have returned.

type RunFunc

type RunFunc func(params Validator) error

RunFunc is the function that the pool will use as the worker.

type Signals

type Signals struct {
	// Stop a channel that is used to signal workers to to be Stop.
	Stop chan struct{}
	// Stopped is a channel that is used for backwards communication with the
	// stopper to verify that the worker has been Stopped.
	Stopped chan bool
	// Finish is a channel used by workers to signal that they've finished
	// processing a task.
	Finish chan struct{}
	// Added is a channel used by the pool to signal that a work item has been
	// pushed to the queue for processing.
	Added chan struct{}
	// StopMonitor is used to stop the monitoring goroutine that updates the
	// pool's internal state.
	StopMonitor chan struct{}
}

Signals contains all of the channels that are used to trigger different status changes in the Pool.

type State

type State struct {
	// Number of work items that have been added to the queue.
	Queued Counter
	// Number of work items that have been Processed by a worker.
	Processed Counter
	// Pool global Status
	Status Counter

	// Errors that have been returned by the worker.
	Errors *Errors
	// contains filtered or unexported fields
}

State contains the pool State

type StopParams

type StopParams struct {
	// number of workers to stop
	Size int
	// Stop is used to send a signal to a worker to make it stop.
	Stop chan<- struct{}
	// Stopped is a signal given back by the worker that is being stopped
	// when it has stopped, either timing out or successfully.
	StoppedWithTimeout <-chan bool
}

StopParams is consumed by StopWorkers so a set of workers can be stopped.

type Timeout

type Timeout struct {
	// Add timeout per Add operation, used to add items to the queue, the
	// timeout is evaluated per work item. Must be greater than 0s.
	Add time.Duration
	// Stop timeout that is used when stopping the workers, this timeout
	// is evaluated per worker, so the global timeout is * N workers. Must be
	// greater than 0s.
	Stop time.Duration
}

Timeout is an object that encloses different Pool operation timeouts.

type Validator

type Validator interface {
	Validate() error
}

Validator interface is consumed by the RunFunc.

type Worker

type Worker struct {
	// Work Queue where the worker obtains its work.
	Queue <-chan Validator
	// Stop channel is used to signal a worker to Stop processing items from
	// the queue.
	Stop <-chan struct{}
	// Stopped is the channel used to communicate back with the stopper to signal
	// that the worker has been successfully Stopped.
	Stopped chan<- bool
	// Finished channel is used to signal that the work has been completed. If
	// the worker happens to be stopping and the stopTimeout is hit before the
	// item has been processed by the run function the Finished signal is also
	// sent.
	Finished chan<- struct{}
	// error channel where the errors from the work will land.
	Errors chan<- error
	// Leftovers of any incompleted work items that the worker couldn't finish
	// before hitting the stop timeout.
	Leftovers chan<- Validator
	// Run is the function that the worker will Run on each work item received.
	Run RunFunc
	// controls the time.Duration to wait when a stop signal is received. If the
	// timeout is exceeded before the work is completed the current work will be
	// sent to the leftover queue.
	StopTimeout time.Duration
}

Worker is the structure that contains the configuration that a worker uses when it's spawned.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL