workshare

package
v0.11.1
Warning

This package is not in the latest version of its module.

Published: Jul 5, 2022 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package workshare implements a work-sharing worker pool.

It is commonly used to traverse tree-like structures:

type processWorkRequest struct {
  node *someNode
  err  error
}

// processWork is a named function, so passing it to RunAsync captures nothing.
func processWork(p *workshare.Pool, req interface{}) {
  r := req.(*processWorkRequest)
  r.err = visitNode(p, r.node)
}

func visitNode(p *workshare.Pool, n *someNode) error {
  var wg workshare.AsyncGroup
  defer wg.Close()

  for _, child := range n.children {
    if wg.CanShareWork(p) {
      // run asynchronously, collect result using wg.Wait() below.
      wg.RunAsync(p, processWork, &processWorkRequest{node: child})
    } else {
      // the pool is full: visit the child on the current goroutine.
      if err := visitNode(p, child); err != nil {
        return err
      }
    }
  }

  // wait for results from all shared work and handle them.
  for _, req := range wg.Wait() {
    if err := req.(*processWorkRequest).err; err != nil {
      return err
    }
  }

  return nil
}

wp := workshare.NewPool(10)
defer wp.Close()
visitNode(wp, root)
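The traversal pattern above can be tried without the package itself. The following is a minimal sketch, assuming simplified stand-ins for Pool, AsyncGroup and ProcessFunc that only mimic the documented signatures; they are not the package's implementation (for instance, the stand-in starts one goroutine per shared request instead of keeping a fixed set of workers). The names node, countRequest, processCount, countNodes and buildTree are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// The stand-in types below only mimic the documented API. They are assumptions
// made so the example runs on its own, not the package's implementation.

type ProcessFunc func(p *Pool, request interface{})

// Pool limits how many requests may run asynchronously at once.
type Pool struct{ slots chan struct{} }

func NewPool(numWorkers int) *Pool {
	return &Pool{slots: make(chan struct{}, numWorkers)}
}

// AsyncGroup tracks the requests started by one goroutine.
type AsyncGroup struct {
	wg       sync.WaitGroup
	requests []interface{}
}

// CanShareWork reserves a slot without blocking; on true, call RunAsync once.
func (g *AsyncGroup) CanShareWork(p *Pool) bool {
	select {
	case p.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// RunAsync runs process(p, request) on another goroutine, then frees the slot.
func (g *AsyncGroup) RunAsync(p *Pool, process ProcessFunc, request interface{}) {
	g.requests = append(g.requests, request)
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		process(p, request)
		<-p.slots
	}()
}

// Wait blocks until all started requests finish and returns them.
func (g *AsyncGroup) Wait() []interface{} {
	g.wg.Wait()
	done := g.requests
	g.requests = nil
	return done
}

type node struct{ children []*node }

// countRequest carries the input node and receives the result.
type countRequest struct {
	n     *node
	count int
}

func processCount(p *Pool, req interface{}) {
	r := req.(*countRequest)
	r.count = countNodes(p, r.n)
}

// countNodes counts n and its descendants, sharing subtrees when the pool has room.
func countNodes(p *Pool, n *node) int {
	var g AsyncGroup
	total := 1
	for _, c := range n.children {
		if g.CanShareWork(p) {
			g.RunAsync(p, processCount, &countRequest{n: c})
		} else {
			total += countNodes(p, c) // pool is full: recurse inline
		}
	}
	for _, r := range g.Wait() {
		total += r.(*countRequest).count
	}
	return total
}

// buildTree returns a tree with the given depth below the root and fanout per node.
func buildTree(depth, fanout int) *node {
	n := &node{}
	if depth > 0 {
		for i := 0; i < fanout; i++ {
			n.children = append(n.children, buildTree(depth-1, fanout))
		}
	}
	return n
}

func main() {
	p := NewPool(4)
	fmt.Println(countNodes(p, buildTree(3, 3))) // 1 + 3 + 9 + 27 = 40
}
```

Because the capacity check never blocks, a goroutine that finds the pool full simply keeps working on its own subtree, so the traversal makes progress even with a pool of size zero.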

Types

type AsyncGroup

type AsyncGroup struct {
	// contains filtered or unexported fields
}

AsyncGroup launches and awaits asynchronous work through a worker Pool. It provides an API designed to minimize allocations while remaining reasonably easy to use. AsyncGroup is very lightweight and NOT safe for concurrent use; all interactions must come from the same goroutine.

func (*AsyncGroup) CanShareWork

func (g *AsyncGroup) CanShareWork(w *Pool) bool

CanShareWork determines whether the provided worker pool has capacity to share work. If the function returns true, the caller MUST call RunAsync() exactly once. This pattern avoids the allocations required to create the asynchronous input when the worker pool is full.
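One way to read the "exactly once" rule is that a successful check reserves capacity, and the reservation is only consumed and later released by the matching RunAsync() call. The following sketch illustrates that idea with a buffered-channel semaphore. The slots type and its methods are hypothetical, and whether the package is built this way is an assumption.

```go
package main

import "fmt"

// slots is a hypothetical counting semaphore built on a buffered channel.
type slots chan struct{}

// tryReserve takes a slot without blocking and reports whether it succeeded.
func (s slots) tryReserve() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns a previously reserved slot.
func (s slots) release() { <-s }

func main() {
	s := make(slots, 2)
	fmt.Println(s.tryReserve(), s.tryReserve(), s.tryReserve()) // true true false
	s.release()
	fmt.Println(s.tryReserve()) // true
}
```

Under this reading, a reservation that is never followed by the work it was made for would never be released, and the pool would permanently lose that capacity.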

func (*AsyncGroup) Close added in v0.11.0

func (g *AsyncGroup) Close()

Close ensures that all asynchronous work has been awaited.

func (*AsyncGroup) RunAsync

func (g *AsyncGroup) RunAsync(w *Pool, process ProcessFunc, request interface{})

RunAsync starts asynchronous work to process the provided request. The caller must call Wait() after all RunAsync() calls have been scheduled.

func (*AsyncGroup) Wait

func (g *AsyncGroup) Wait() []interface{}

Wait waits for scheduled asynchronous work to complete and returns all asynchronously processed inputs.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a pool of generic workers that process shared work items.

func NewPool

func NewPool(numWorkers int) *Pool

NewPool creates a worker pool that launches a given number of goroutines that can invoke shared work.

func (*Pool) ActiveWorkers

func (w *Pool) ActiveWorkers() int

ActiveWorkers returns the number of active workers.

func (*Pool) Close

func (w *Pool) Close()

Close closes the worker pool.

type ProcessFunc

type ProcessFunc func(c *Pool, request interface{})

ProcessFunc processes the provided request, which is typically a pointer to a structure. The result will typically be written within the request structure. To avoid allocations, the function should not be a local closure but a named function (either global or a method).
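The request-as-result convention can be sketched as follows. This is a minimal illustration, assuming an empty placeholder Pool type so that it compiles on its own; lengthRequest, processLength and errEmptyInput are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Pool is an empty placeholder so the sketch compiles on its own.
type Pool struct{}

// ProcessFunc mirrors the signature documented above.
type ProcessFunc func(c *Pool, request interface{})

// lengthRequest is a hypothetical request: the input goes in and the results
// come back in the same struct, so no separate result value is needed.
type lengthRequest struct {
	input  string
	length int
	err    error
}

var errEmptyInput = errors.New("empty input")

// processLength is a package-level function, so using it as a ProcessFunc
// does not capture any variables.
func processLength(_ *Pool, request interface{}) {
	r := request.(*lengthRequest)
	if r.input == "" {
		r.err = errEmptyInput
		return
	}
	r.length = len(r.input)
}

func main() {
	var fn ProcessFunc = processLength

	ok := &lengthRequest{input: "hello"}
	bad := &lengthRequest{}
	fn(nil, ok)
	fn(nil, bad)

	fmt.Println(ok.length, ok.err, bad.err)
}
```

Passing a pointer matters here: the caller keeps the same pointer it handed over, so it can read length and err once the work has been awaited.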
