Documentation ¶
Overview ¶
Package workman provides constructs for parallel processing.
getHTTP := func(i int) { res, _ := http.Get(fmt.Sprintf("http://httpbin.org/get?n=%d", i)) data, _ := ioutil.ReadAll(res.Body) res.Body.Close() fmt.Printf("%s", data) } wm, _ := NewWorkManager(4) wm.StartWorkers(getHTTP) for i := 0; i < 10; i++ { wm.SendWork(i) } wm.WaitForCompletion()
Construct a new work manager with the desired number of parallel workers
wm, err := NewWorkManager(10)
When using the WorkManager, it is important to invoke SendWork as if you were invoking your work function directly. That is, if you start workers using a function with the following signature:
wm.StartWorkers(func(n int, line string, active bool) {})
Then you should send it work as follows:
wm.SendWork(10, "Townsend", true) wm.SendWork(12, "Folsom/Pacific", false)
Your function should return an error or nothing
wm.StartWorkers(func() error { return errors.New("bad run") })
The error returned from WaitForCompletion indicates if at least one error occurred during processing. Those errors can be enumerated as follows:
if err := wm.WaitForCompletion(); err != nil { errs := err.All() for _, e := range errs { fmt.Println(e) } }
Set a rate limit by passing an int, representing the number of allowed requests per second across all workers.
wm.SetRateLimit(5)
Happy processing!
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // The user provided an invalid number of workers ErrTooFewWorkers = errors.New("number of workers must exceed 0") // The user provided an invalid work function ErrInvalidWorkFuncType = errors.New("invalid work function type") // Workers have already been started and cannot start again ErrAlreadyStarted = errors.New("workers already started") // Work has already completed and no more work can be done ErrWorkCompleted = errors.New("work has completed") // Some workers encountered errors during processing ErrWorkerErrors = errors.New("some workers encountered errors") // A worker timed out before completing its work ErrWorkerTimeout = errors.New("worker timeout") // Args are of the wrong type or length to be passed to the work function ErrBadWorkArgs = errors.New("args can't be passed to work function") )
Functions ¶
This section is empty.
Types ¶
type WorkManager ¶
type WorkManager struct { // WorkerMaxTimeout is the maximum allowed time a worker can run WorkerMaxTimeout time.Duration // contains filtered or unexported fields }
WorkManager manages a pool of parallel workers and provides an API for sending work and collecting errors. It offers a layer of abstraction over the concept of workers and the asynchronous nature of its processing.
A WorkManager is required to call three methods:
- StartWorkers(func (args...)) Pass a function (or method with pointer receiver) to inform the WorkManager how to process the work you will send it. This is a normal function that takes some arbitrary number of arguments. Note: the function passed *must* only return an error. It is also unsafe for the function passed to read or write to any shared state.
- SendWork(args...) Pass arguments as you would normally pass to the function previously given to StartWorkers.
- WaitForCompletion() Wait for all workers to complete their tasks and receive a list of any errors that were encountered during processing.
The WorkManager is stateful. It is an error to run its methods out of order or to send it work after it has already completed all work.
func NewWorkManager ¶
func NewWorkManager(n int) (WorkManager, error)
NewWorkManager returns a WorkManager with n parallel workers.
func (*WorkManager) SendWork ¶
func (wm *WorkManager) SendWork(args ...interface{}) error
SendWork provides the args necessary for the workers to run their workFunc.
func (*WorkManager) SetRateLimit ¶
func (wm *WorkManager) SetRateLimit(r int)
SetRateLimit sets the workers to be collectively limited to r requests per second
func (*WorkManager) StartWorkers ¶
func (wm *WorkManager) StartWorkers(workFunc interface{}) error
StartWorkers starts a pool of workers that will run workFunc.
func (*WorkManager) WaitForCompletion ¶
func (wm *WorkManager) WaitForCompletion() *WorkManagerError
WaitForCompletion blocks until all workers have completed their work. The returned error is non-nil if any worker encountered an error. The exact error list is returned by calling err.All()
type WorkManagerError ¶
type WorkManagerError struct {
// contains filtered or unexported fields
}
WorkManagerError stores a list of errors encountered during worker processing.
func (*WorkManagerError) All ¶
func (e *WorkManagerError) All() []error
All returns a list of all errors encountered by workers.