task

package
v3.33.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2023 License: Apache-2.0, Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package task provides an interface for indicating when an operation has been blocked, so that a worker pool which wants to be doing N things at a time can start trying new things when some things are blocked.

To understand this, you have to start with the original context: We have a worker pool, which can handle up to N tasks at once. Tasks come in in batches, asynchronously. At most one write task can be active on a given database at a time, but many read tasks can be active on the database, with or without a write task. Each read task completes only when its entire containing operation completes. Write tasks can *partially* complete immediately, but in some cases, must wait for read tasks to finish before they can do crucial bookkeeping work.

Regardless of the workload, we always have tasks which can progress available, and if we do them, eventually everything will complete. However, for some workloads, it is possible to pick N tasks *all of which are blocked*. In this case, the worker pool becomes useless. Furthermore, even if we don't hit that state, we can hit a state where nearly all worker pool tasks are blocked.

To address this, we need a way for a worker pool to recognize that a worker has become blocked, and *start another worker*. This can result in running more than N workers at once. However, it rarely results in running *many* more. The typical case would be that we have a worker pool of N, and M of them are blocked waiting for write access to a given database. If one of them becomes unblocked, we may end up with N+1 active workers, but the other M-1 waiting on that database are still blocked.

It might seem like the simplest thing to do is use a buffered channel as a semaphore, this being a standard Go idiom for pools. It's a great idiom, but in our case, it runs into a problem. When each worker starts, it writes into a buffered channel. When it becomes blocked, it reads from the channel to free up a slot. When it becomes unblocked, then, it has to write to the channel to indicate that it's taking up a slot again. But writes to the channel are contested, and usually only become possible when something else either blocks or exits... Meaning that, precisely at the moment that we have gained a highly contested lock and are able to proceed, we block for an indeterminate period of time *while holding that lock*. This is the opposite of what we want.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a worker-pool type thing, which will call a given function in parallel aiming for a given level of concurrency. To use a pool, you create it, passing in a worker function; it then spawns goroutines to run that function in a loop. If the Pool's Block method is called, this marks one instance of the worker goroutine as blocked; the Unblock method marks it as unblocked. When there are insufficient unblocked goroutines, more are spawned. When there are excess goroutines, they exit.

The pool can be shut down by calling Close(), setting its target number of workers to 0.

func NewPool

func NewPool(targetN int, step func(), stats PoolStats) *Pool

NewPool creates a pool that attempts to keep targetN goroutines active, executing step() repeatedly. It updates poolSize with the current size of the pool when that changes.

func (*Pool) Block

func (p *Pool) Block()

Block marks a worker as blocked, indicating that we may need a new worker spawned because the caller is about to be blocked for an indeterminate period of time. If a new worker is needed, it's spawned immediately before Block returns.

func (*Pool) Close

func (p *Pool) Close()

Close is a Shutdown followed by waiting for all jobs to exit.

func (*Pool) Shutdown

func (p *Pool) Shutdown()

Shutdown tells a pool to terminate by setting its desired pool size to zero, but does not wait for the jobs in it to stop. It is safe to call this before calling Close.

func (*Pool) Stats

func (p *Pool) Stats() (live, unblocked, target int)

Stats reports on the pool's current state -- total live workers it has, how many it thinks are unblocked, and what its target is. These numbers are sampled individually, and there's no locking, so they are not guaranteed to be consistent. This is useful for approximate monitoring.

func (*Pool) Unblock

func (p *Pool) Unblock()

Unblock marks a worker as unblocked, potentially allowing the pool to retire a worker thread at some point in the future.

type PoolStats

type PoolStats interface {
	PoolSize(int) // reports current pool size
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL