Documentation ¶
Overview ¶
Package concurrency provides common concurrency patterns and utilities.
Variables ¶
var Stopped = errors.New("stopped")
Stopped is a special error value that signals that the runnable is stopped.
Functions ¶
func NewRetryableError ¶
NewRetryableError is a convenience function to wrap another error in a RetryableError.
Types ¶
type AsyncRunnable ¶
type AsyncRunnable interface {
Runnable
StopWait()
}
AsyncRunnable is a Runnable which can run asynchronously.
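To illustrate the difference between Stop and StopWait, here is a minimal sketch of a type that satisfies AsyncRunnable. The ticker type is hypothetical, and the two interfaces are copied locally so the example compiles on its own. It assumes Start returns immediately after launching background work; the package's own implementations may block in Start instead.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Runnable and AsyncRunnable are copied from the package declarations so the
// sketch is self-contained.
type Runnable interface {
	Start() error
	Stop()
}

type AsyncRunnable interface {
	Runnable
	StopWait()
}

// ticker is a hypothetical AsyncRunnable that does periodic background work.
type ticker struct {
	quit chan struct{}
	once sync.Once
	wg   sync.WaitGroup
}

func newTicker() *ticker { return &ticker{quit: make(chan struct{})} }

// Start launches the background goroutine and returns immediately
// (an assumption of this sketch).
func (t *ticker) Start() error {
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		for {
			select {
			case <-t.quit:
				return
			case <-time.After(time.Millisecond):
				// one unit of background work would go here
			}
		}
	}()
	return nil
}

// Stop requests shutdown without waiting; it is safe to call more than once.
func (t *ticker) Stop() { t.once.Do(func() { close(t.quit) }) }

// StopWait requests shutdown and blocks until the goroutine has exited.
func (t *ticker) StopWait() {
	t.Stop()
	t.wg.Wait()
}

func main() {
	var r AsyncRunnable = newTicker()
	if err := r.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	time.Sleep(10 * time.Millisecond)
	r.StopWait()
	fmt.Println("stopped")
}
```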
type Retryable ¶
type Retryable interface {
Retryable() bool
}
Retryable is an interface which describes whether something is retryable
type RetryableError ¶
type RetryableError struct {
Err error
}
RetryableError is an error which is retryable
func (RetryableError) Error ¶
func (e RetryableError) Error() string
type Runnable ¶
type Runnable interface {
Start() error
Stop()
}
Runnable describes something which can start and stop.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is an implementation of a start/stoppable worker pool.
In this implementation, jobs are essentially tokens to perform some work. Jobs are not delivered to the pool, but instead 'claimed' by a worker and 'returned' when finished (technically, a new job is posted to the queue). The worker itself therefore determines the work it should do. This gives a lot of flexibility when combined with implementations of Worker, and maintains a generic but type-safe implementation.
It uses a number of channels to control the concurrency:
- jobs is a buffered channel that signals that a worker should process a job
- results signals that a result was computed by work()
- errors collects any errors from work(). An error on the channel will stop the pool
- done is used to signal when the pool has totally stopped (i.e. all workers have drained)
func NewWorkerPool ¶
func NewWorkerPool(logger *zap.Logger, numWorkers int, w Worker) *WorkerPool
NewWorkerPool creates a new worker-pool
func (*WorkerPool) Start ¶
func (s *WorkerPool) Start() error
Start makes the worker pool start to process messages.
It continually loops and looks for either a result, in which case it adds another job to the pool to be processed, or an error, in which case it stops the worker pool, waits for the workers to drain, then signals that it is done.
func (*WorkerPool) Stop ¶
func (s *WorkerPool) Stop()
Stop signals the worker pool to stop processing new messages. Use StopWait to wait until all messages are processed.
func (*WorkerPool) StopWait ¶
func (s *WorkerPool) StopWait()
StopWait starts the process of stopping, and waits for all workers to stop before returning.