concurrent

package
Version: v0.3.10-0-alpha.1
Warning

This package is not in the latest version of its module.

Published: Oct 28, 2022 License: Apache-2.0 Imports: 5 Imported by: 1

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Balancer

type Balancer struct {
	// contains filtered or unexported fields
}

Balancer balances load among a set of workers.

func NewBalancer

func NewBalancer(workerCount int) *Balancer

NewBalancer creates a new Balancer that distributes load among workerCount workers.

func (*Balancer) Init

func (b *Balancer) Init()

Init initializes all channels and starts the workers.

func (*Balancer) Run

func (b *Balancer) Run(requestChan <-chan *Request) (<-chan error, <-chan bool)

Run reads requests from the channel requestChan and dispatches each one to the worker with the least load. It returns two channels: the first carries errors from any worker back to the consumer of the balancer, and the second is used by the balancer to signal the consumer that all workers have finished executing.
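The two return values are typically consumed together. The following is a minimal sketch of that pattern, not the package's own code: fakeRun is a hypothetical stand-in that only mimics the return shape of Balancer.Run, and collect is a hypothetical helper that gathers errors until the finished signal arrives.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeRun is a hypothetical stand-in with the same return shape as
// Balancer.Run: an error channel and an "all workers finished" channel.
func fakeRun(failures []string) (<-chan error, <-chan bool) {
	errChan := make(chan error)
	finishedChan := make(chan bool)
	go func() {
		for _, msg := range failures {
			errChan <- errors.New(msg)
		}
		finishedChan <- true
	}()
	return errChan, finishedChan
}

// collect gathers worker errors until the finished signal is received.
// It is an illustrative helper, not part of the package.
func collect(errChan <-chan error, finishedChan <-chan bool) []error {
	var errs []error
	for {
		select {
		case err := <-errChan:
			errs = append(errs, err)
		case <-finishedChan:
			return errs
		}
	}
}

func main() {
	errs := collect(fakeRun([]string{"request 3 failed after retries"}))
	for _, err := range errs {
		fmt.Println("worker error:", err)
	}
}
```

With the real package, the consumer would presumably call NewBalancer, then Init, then pass the channels returned by Run to a loop like collect.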

func (*Balancer) TearDownWorkers

func (b *Balancer) TearDownWorkers()

TearDownWorkers sends a force-quit signal to all workers, which causes each worker to quit as soon as possible. In this case the workers do not drain their request channels.

func (*Balancer) WorkersCurrentLoad

func (b *Balancer) WorkersCurrentLoad() string

WorkersCurrentLoad returns the load of the workers this balancer manages as a comma-separated string, where each value consists of the worker ID (Worker.ID) and the number of pending requests for that worker.

type Pool

type Pool struct {
	sync.RWMutex           // If consumer want to use workers in a concurrent environment
	Workers      []*Worker // The workers
}

Pool is a collection of Workers. It is a min-heap that implements heap.Interface. A worker's priority is determined by the number of pending requests assigned to it: the lower the pending count, the higher the priority. Pool embeds sync.RWMutex to support concurrent heap operations.

func (*Pool) Len

func (p *Pool) Len() int

Len returns number of workers in the pool.

func (*Pool) Less

func (p *Pool) Less(i, j int) bool

Less reports whether the Worker at index i has fewer pending requests than the Worker at index j. The lower the pending count, the higher the priority.

func (*Pool) Pop

func (p *Pool) Pop() interface{}

Pop is used by the heap.Pop implementation. To pop the worker w with the fewest pending requests from a Pool p, call w := heap.Pop(&p).(*Worker). heap.Pop swaps that worker, which sits at the beginning of the pool, with the item at the end, fixes the heap, and then invokes this method to remove the worker from the end.

func (*Pool) Push

func (p *Pool) Push(x interface{})

Push is used by the heap.Push implementation. To add a worker w to a Pool pool, call heap.Push(&pool, w), which invokes this method to append the worker to the end of the collection and then fixes the heap by moving the added item to its correct position.

func (*Pool) Swap

func (p *Pool) Swap(i, j int)

Swap swaps the Worker instances at the given indices i and j.
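The heap.Interface methods above (Len, Less, Pop, Push, Swap) can be illustrated with a minimal, self-contained sketch. The worker and pool types here are simplified local stand-ins, not the package's Worker and Pool, and newPool and dispatch are hypothetical helpers; the locking provided by the embedded sync.RWMutex is omitted.

```go
package main

import (
	"container/heap"
	"fmt"
)

// worker is a simplified stand-in for the package's Worker.
type worker struct {
	id      int
	pending int // number of pending requests (the load)
	index   int // position in the heap, kept current by Swap and Push
}

// pool is a min-heap of workers ordered by pending count.
type pool []*worker

func (p pool) Len() int           { return len(p) }
func (p pool) Less(i, j int) bool { return p[i].pending < p[j].pending }
func (p pool) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
	p[i].index = i
	p[j].index = j
}

// Push appends to the end; heap.Push then moves the item up as needed.
func (p *pool) Push(x interface{}) {
	w := x.(*worker)
	w.index = len(*p)
	*p = append(*p, w)
}

// Pop removes the last item; heap.Pop has already swapped the minimum there.
func (p *pool) Pop() interface{} {
	old := *p
	n := len(old)
	w := old[n-1]
	old[n-1] = nil
	*p = old[:n-1]
	w.index = -1
	return w
}

// newPool builds a heap with one worker per initial pending count.
func newPool(pendings ...int) *pool {
	p := make(pool, 0, len(pendings))
	for id, n := range pendings {
		p = append(p, &worker{id: id, pending: n, index: id})
	}
	heap.Init(&p)
	return &p
}

// dispatch assigns n requests, each to the currently least-loaded worker,
// and returns the ids of the chosen workers in order.
func dispatch(p *pool, n int) []int {
	ids := make([]int, 0, n)
	for i := 0; i < n; i++ {
		w := heap.Pop(p).(*worker)
		w.pending++
		heap.Push(p, w)
		ids = append(ids, w.id)
	}
	return ids
}

func main() {
	p := newPool(3, 0, 1)
	fmt.Println("dispatched to workers:", dispatch(p, 3))
	for _, w := range *p {
		fmt.Printf("worker %d pending %d\n", w.id, w.pending)
	}
}
```

Popping and re-pushing on every dispatch keeps the example short; a heap that tracks each item's index, as the Index field suggests, could instead update a worker in place with heap.Fix.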

func (*Pool) WorkersCurrentLoad

func (p *Pool) WorkersCurrentLoad() string

WorkersCurrentLoad returns the load of the workers as a comma-separated string, where each value consists of the worker ID (Worker.ID) and the number of pending requests for that worker.

type Request

type Request struct {
	ID          string               // The Id of the work (for debugging purposes)
	Work        func() error         // The work to be executed by a worker
	ShouldRetry func(err error) bool // The method used by worker to decide whether to retry if work execution fails
}

Request represents a unit of work that a Worker needs to execute.
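How Work and ShouldRetry might interact can be sketched as follows. This is an illustration under stated assumptions, not the worker's actual retry logic: request is a local stand-in mirroring the documented fields, while execute, flakyRequest, errTransient, and the maxAttempts limit are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// request mirrors the documented Request fields; it is a local stand-in.
type request struct {
	ID          string
	Work        func() error
	ShouldRetry func(err error) bool
}

// errTransient is a hypothetical error used to mark retryable failures.
var errTransient = errors.New("transient failure")

// execute runs req.Work until it succeeds, ShouldRetry declines, or
// maxAttempts is reached. The attempt limit is an assumption; the
// documentation does not state the package's retry count.
func execute(req *request, maxAttempts int) (attempts int, err error) {
	for attempts < maxAttempts {
		attempts++
		if err = req.Work(); err == nil {
			return attempts, nil
		}
		if req.ShouldRetry == nil || !req.ShouldRetry(err) {
			break
		}
	}
	return attempts, err
}

// flakyRequest builds a request whose Work fails with errTransient
// the given number of times before succeeding.
func flakyRequest(id string, failures int) *request {
	calls := 0
	return &request{
		ID: id,
		Work: func() error {
			calls++
			if calls <= failures {
				return errTransient
			}
			return nil
		},
		ShouldRetry: func(err error) bool { return errors.Is(err, errTransient) },
	}
}

func main() {
	attempts, err := execute(flakyRequest("block-7", 2), 5)
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```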

type Worker

type Worker struct {
	RequestsToHandleChan chan *Request // The buffered channel of works this worker needs to handle
	Pending              int           // The number of pending requests this worker needs to handle (i.e. worker load)

	ID    int // Unique Id for worker (Debugging purpose)
	Index int // The index of the item in the heap.
	// contains filtered or unexported fields
}

Worker represents a type that listens for work on a channel and runs it.

func NewWorker

func NewWorker(id int, workChannelSize int, pool *Pool, errorChan chan<- error, requestHandledChan chan<- *Worker, workerFinishedChan chan<- *Worker) *Worker

NewWorker creates a new Worker with the given work channel size. errorChan is the channel used to report a failure to complete a work request after all retries. Each time a piece of work is completed (whether it fails or succeeds), requestHandledChan is signalled.

func (*Worker) Run

func (w *Worker) Run(tearDownChan <-chan bool)

Run starts a goroutine that reads work from the work queue associated with the worker and executes one item at a time. The goroutine exits once one of the following conditions is met:

  1. The work queue is closed and drained, and there is no work to steal from peer workers' work queues
  2. A signal is received on the tearDownChan channel parameter

After executing each piece of work, this method sends a report to the Worker::requestHandledChan channel. If a piece of work fails after the maximum number of retries, this method sends a report to the Worker::errorChan channel.
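The exit conditions described above can be sketched with a select loop. This is a simplified illustration, not the package's implementation: runWorker and runAll are hypothetical names, and retries, work stealing, and the requestHandledChan report are left out.

```go
package main

import "fmt"

// runWorker starts a goroutine that executes jobs one at a time until the
// jobs channel is closed and drained, or a value arrives on tearDown.
// The returned channel is closed when the goroutine exits.
func runWorker(jobs <-chan func() error, tearDown <-chan bool, errs chan<- error) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-tearDown:
				return // forced quit: remaining jobs are not drained
			case job, ok := <-jobs:
				if !ok {
					return // queue closed and drained
				}
				if err := job(); err != nil {
					errs <- err
				}
			}
		}
	}()
	return done
}

// runAll queues n jobs, closes the queue, waits for the worker to exit,
// and returns how many jobs were executed.
func runAll(n int) int {
	jobs := make(chan func() error, n)
	errs := make(chan error, n)
	tearDown := make(chan bool)
	executed := 0
	for i := 0; i < n; i++ {
		jobs <- func() error { executed++; return nil }
	}
	close(jobs)
	<-runWorker(jobs, tearDown, errs)
	return executed
}

func main() {
	fmt.Println("jobs executed:", runAll(3))
}
```

When both channels are ready, select picks a case at random, so a teardown signal in this sketch stops the worker promptly but not necessarily before one more job runs.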
