concurrentqueue

package
v0.0.0-...-5c79d48 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2024 License: AGPL-3.0 Imports: 1 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*config)

Option is a queue configuration option.

func Capacity

func Capacity(c int) Option

Capacity is the number of "in flight" items the queue can hold, not including any extra capacity in the input/output buffers. With unbuffered input/output, this is the number of items that can be pushed to the queue before it begins to exert backpressure. A value of 8-16x the number of workers is probably a reasonable choice for most applications. Note that a queue always has a capacity at least equal to the number of workers, so `New(fn, Workers(7), Capacity(3))` results in a queue with capacity `7`. Defaults to 64.
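The interaction between options, defaults, and the worker-count floor on capacity can be illustrated with a small standalone sketch. This is not the package's implementation: the `config` fields and the `newConfig` helper are hypothetical stand-ins, and only the `Option` signature, the option names, and the default values (4 workers, capacity 64) come from the documentation above.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the package's unexported config type.
type config struct {
	workers  int
	capacity int
}

// Option mirrors the documented signature: a function that mutates a config.
type Option func(*config)

// Workers sets the number of concurrent workers.
func Workers(w int) Option { return func(c *config) { c.workers = w } }

// Capacity sets the number of in-flight items.
func Capacity(n int) Option { return func(c *config) { c.capacity = n } }

// newConfig (hypothetical) starts from the documented defaults, applies the
// options in order, then raises capacity to the worker count if it is lower.
func newConfig(opts ...Option) config {
	cfg := config{workers: 4, capacity: 64}
	for _, opt := range opts {
		opt(&cfg)
	}
	if cfg.capacity < cfg.workers {
		cfg.capacity = cfg.workers
	}
	return cfg
}

func main() {
	cfg := newConfig(Workers(7), Capacity(3))
	fmt.Println(cfg.workers, cfg.capacity) // prints "7 7"
}
```

Under these assumptions, asking for a capacity of 3 with 7 workers yields an effective capacity of 7, matching the behavior described for `New(fn, Workers(7), Capacity(3))`.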

func InputBuf

func InputBuf(b int) Option

InputBuf is the size of the buffer used for the input/push channel. Defaults to 0 (unbuffered).

func OutputBuf

func OutputBuf(b int) Option

OutputBuf is the size of the buffer used for the output/pop channel. Allocating output buffer space may improve performance when items can be popped in quick succession. Defaults to 0 (unbuffered).

func Workers

func Workers(w int) Option

Workers is the number of concurrent workers to be used for the queue. Defaults to 4.

type Queue

type Queue[I any, O any] struct {
	// contains filtered or unexported fields
}

Queue is a data processing helper which uses a worker pool to apply a closure to a series of values concurrently, preserving the correct ordering of results. It is essentially the concurrent equivalent of this:

for msg := range inputChannel {
    outputChannel <- workFunction(msg)
}

To prevent unbounded memory growth within the queue caused by slow consumers and/or slow workers, the queue exerts backpressure on its input channel once a configurable capacity is reached.
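One way to picture order-preserving concurrent processing with backpressure is the standalone sketch below. It is an illustration using only the standard library, not the package's actual implementation: `orderedMap` and its parameters are hypothetical names, and the technique shown (a bounded channel of single-use result channels, read back in input order) is just one possible approach.

```go
package main

import (
	"fmt"
	"time"
)

// orderedMap (hypothetical) applies fn to each value received from in, running
// at most `workers` calls at once, and emits results in input order.
func orderedMap[I any, O any](in <-chan I, workers, capacity int, fn func(I) O) <-chan O {
	out := make(chan O)
	// pending holds one single-use result channel per in-flight item, in
	// input order. Its bounded size is what produces backpressure.
	pending := make(chan chan O, capacity)
	// sem limits the number of concurrently running work functions.
	sem := make(chan struct{}, workers)

	go func() {
		defer close(pending)
		for item := range in {
			res := make(chan O, 1)
			pending <- res    // blocks once roughly `capacity` items are in flight
			sem <- struct{}{} // blocks while all workers are busy
			go func(item I) {
				defer func() { <-sem }()
				res <- fn(item)
			}(item)
		}
	}()

	go func() {
		defer close(out)
		// Waiting on each result channel in turn restores input order.
		for res := range pending {
			out <- <-res
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 5; i++ {
			in <- i
		}
	}()
	square := func(n int) int {
		// Later items sleep less, so they tend to finish before earlier ones.
		time.Sleep(time.Duration(6-n) * time.Millisecond)
		return n * n
	}
	var results []int
	for v := range orderedMap(in, 4, 8, square) {
		results = append(results, v)
	}
	fmt.Println(results) // prints "[1 4 9 16 25]"
}
```

With the documented API, the equivalent flow would send values on the channel returned by `Push()` and receive results from the channel returned by `Pop()`.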

func New

func New[I any, O any](workfn func(I) O, opts ...Option) *Queue[I, O]

New builds a new queue instance around the supplied work function.

func (*Queue[I, O]) Close

func (q *Queue[I, O]) Close() error

Close permanently terminates all background operations. If the queue is not drained before closure, items may be lost.

func (*Queue[I, O]) Done

func (q *Queue[I, O]) Done() <-chan struct{}

Done returns a channel that signals closure of the queue.
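A sender that blocks on a full queue may want to stop waiting once the queue is closed. The sketch below shows one way to do that with a `select`. It assumes the channel returned by `Done()` is closed when the queue is closed, which the documentation does not state explicitly, and `sendOrDone` is a hypothetical helper written against plain channels rather than the package itself.

```go
package main

import "fmt"

// sendOrDone (hypothetical) tries to deliver v on push, giving up if done is
// closed first. It reports whether the value was sent. If both cases are ready
// at the same time, select picks one at random.
func sendOrDone[T any](push chan<- T, done <-chan struct{}, v T) bool {
	select {
	case push <- v:
		return true
	case <-done:
		return false
	}
}

func main() {
	done := make(chan struct{}) // stands in for q.Done()
	push := make(chan int, 1)   // stands in for q.Push()

	fmt.Println(sendOrDone(push, done, 1)) // prints "true": the buffer has room

	close(done)
	fmt.Println(sendOrDone(push, done, 2)) // prints "false": buffer full, done closed
}
```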

func (*Queue[I, O]) Pop

func (q *Queue[I, O]) Pop() <-chan O

Pop accesses the queue's output channel. The type of the received value will match the output of the work function.

func (*Queue[I, O]) Push

func (q *Queue[I, O]) Push() chan<- I

Push accesses the queue's input channel. The type of sent values must match the input type expected by the queue's work function. If the queue was configured with a buffered input/push channel, non-blocking sends can be used as a heuristic for detecting backpressure due to queue capacity. This is not a perfect test, but the rate of false positives will be extremely low for a queue with a reasonably large capacity and a non-trivial work function.
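The non-blocking send heuristic can be sketched with a `select` that has a `default` case. The helper name `tryPush` is hypothetical and the example runs against a plain buffered channel standing in for the channel returned by `Push()`.

```go
package main

import "fmt"

// tryPush (hypothetical) attempts a non-blocking send. A false result means
// the channel could not accept the value immediately, which can be read as a
// sign of backpressure.
func tryPush[T any](push chan<- T, v T) bool {
	select {
	case push <- v:
		return true
	default:
		return false
	}
}

func main() {
	push := make(chan int, 2) // stands in for a queue built with InputBuf(2)

	fmt.Println(tryPush(push, 1)) // prints "true"
	fmt.Println(tryPush(push, 2)) // prints "true"
	fmt.Println(tryPush(push, 3)) // prints "false": buffer is full, nothing is receiving
}
```

The input buffer matters here because a non-blocking send on an unbuffered channel succeeds only when a receiver is ready at that exact moment, so it would report failure far more often than actual backpressure warrants.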
