chanqueue

v1.0.0
Published: Nov 24, 2024 License: MIT Imports: 2 Imported by: 3

README

chanqueue

Concurrently access a dynamic queue using channels.

ChanQueue implements a queue that uses channels for input and output to provide concurrent access to a dynamically-sized queue. This allows the queue to be used like a channel, in a thread-safe manner. Closing the input channel closes the output channel when all queued items are read, consistent with channel behavior. In other words, a ChanQueue is a dynamically buffered channel with a capacity that may be unlimited.
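
To make the idea concrete, here is a minimal sketch of a dynamically buffered channel that uses only the standard library. It is not ChanQueue's implementation: `bufferBetween` and `collect` are hypothetical names, and a plain slice stands in for the internal queue. It only illustrates how a goroutine can sit between an input and an output channel, grow its buffer as needed, and close the output once the input is closed and drained.

```go
package main

import "fmt"

// bufferBetween is a hypothetical helper, not part of chanqueue. It relays
// values from in to out through a slice that grows as needed, and closes out
// once in is closed and every buffered value has been delivered.
func bufferBetween[T any](in <-chan T, out chan<- T) {
	go func() {
		defer close(out)
		var buf []T
		for in != nil || len(buf) > 0 {
			// Offer a value on out only when something is buffered.
			// A nil channel in a select case is never ready.
			var send chan<- T
			var next T
			if len(buf) > 0 {
				send = out
				next = buf[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // input closed: stop receiving, keep draining
					continue
				}
				buf = append(buf, v)
			case send <- next:
				buf = buf[1:]
			}
		}
	}()
}

// collect writes n values before anything is read, closes the input, and
// then reads everything back in order.
func collect(n int) []int {
	in := make(chan int)
	out := make(chan int)
	bufferBetween[int](in, out)
	for i := 0; i < n; i++ {
		in <- i // the relay goroutine buffers each value
	}
	close(in)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(5)) // prints [0 1 2 3 4]
}
```

With plain unbuffered channels the writes in `collect` would block forever because nothing reads until all writes are done; the relay goroutine is what absorbs them.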

ChanQueue also supports circular buffer behavior when created using NewRing. When the buffer is full, writing an additional item discards the oldest buffered item.

Use caution when specifying an unlimited buffer capacity: the buffer is still limited by the resources available on the host system.

The ChanQueue buffer auto-resizes according to the number of items buffered. For more information on the internal queue, see: https://github.com/gammazero/deque

ChanQueue uses generics to contain items of the type specified. To create a ChanQueue that holds a specific type, provide a type argument to New, or to options passed to New. For example:

intQueue := chanqueue.New[int]()
stringQueue := chanqueue.New[string]()
limitedQueue := chanqueue.New(chanqueue.WithCapacity[string](256))

ChanQueue can be used to provide buffering between existing channels. An existing read channel, write channel, or both can be used:

in := make(chan int)
out := make(chan int)

// Create a buffer between in and out channels.
chanqueue.New(chanqueue.WithInput[int](in), chanqueue.WithOutput[int](out))
// ...
close(in) // this closes out once all buffered items have been read.

Documentation

Overview

Package chanqueue implements a queue that uses channels for input and output to provide concurrent access to a re-sizable queue. This allows the queue to be used like a channel. Closing the input channel closes the output channel when all queued items are read, consistent with channel behavior. In other words, chanqueue is a dynamically buffered channel with a capacity that may be unlimited.

ChanQueue also supports circular buffer behavior when created using `NewRing`. When the buffer is full, writing an additional item discards the oldest buffered item.

Use caution with an unlimited buffer capacity, which is the default: the buffer is still limited by the resources available on the host system.

Caution

The behavior of chanqueue differs from the behavior of a normal channel in one important way: After writing to the In() channel, the data may not be immediately available on the Out() channel (until the buffer goroutine is scheduled), and may be missed by a non-blocking select.

Credits

This implementation is based on ideas/examples from: https://github.com/eapache/channels

Functions

func WithCapacity

func WithCapacity[T any](n int) func(*ChanQueue[T])

WithCapacity sets the limit on the number of unread items that ChanQueue will hold. Unbuffered behavior is not supported (use a normal channel for that), and a value of zero or less configures the default of no limit.

Example:

cq := chanqueue.New(chanqueue.WithCapacity[int](64))

func WithInput

func WithInput[T any](in chan T) func(*ChanQueue[T])

WithInput uses an existing channel as the input channel, which is the channel used to write to the queue. This is used when buffering items that must be read from an existing channel. Be aware that calling Close or Shutdown will close this channel.

Example:

in := make(chan int)
cq := chanqueue.New(chanqueue.WithInput[int](in))

func WithOutput

func WithOutput[T any](out chan T) func(*ChanQueue[T])

WithOutput uses an existing channel as the output channel, which is the channel used to read from the queue. This is used when buffering items that must be written to an existing channel. Be aware that ChanQueue will close this channel when no more items are available.

Example:

out := make(chan int)
cq := chanqueue.New(chanqueue.WithOutput[int](out))

Types

type ChanQueue

type ChanQueue[T any] struct {
	// contains filtered or unexported fields
}

ChanQueue uses a queue to buffer data between input and output channels.

func New

func New[T any](options ...Option[T]) *ChanQueue[T]

New creates a new ChanQueue that, by default, holds an unbounded number of items of the specified type.

func NewRing

func NewRing[T any](options ...Option[T]) *ChanQueue[T]

NewRing creates a new ChanQueue with the specified buffer capacity, and circular buffer behavior. When the buffer is full, writing an additional item discards the oldest buffered item.
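
The discard-oldest rule can be sketched on a plain slice. This is only an illustration of the behavior described above, not the library's code: `pushRing` and `lastN` are hypothetical names, and the capacity is assumed to be greater than zero.

```go
package main

import "fmt"

// pushRing is a hypothetical helper, not part of chanqueue. It appends v to
// buf; if buf already holds capacity items, the oldest item is dropped first.
// The capacity is assumed to be greater than zero.
func pushRing[T any](buf []T, capacity int, v T) []T {
	if len(buf) == capacity {
		buf = buf[1:] // discard the oldest buffered item
	}
	return append(buf, v)
}

// lastN pushes every value through a ring of the given capacity and returns
// what is still buffered at the end.
func lastN(capacity int, values []int) []int {
	var buf []int
	for _, v := range values {
		buf = pushRing(buf, capacity, v)
	}
	return buf
}

func main() {
	fmt.Println(lastN(3, []int{1, 2, 3, 4, 5})) // prints [3 4 5]
}
```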

func (*ChanQueue[T]) Cap

func (cq *ChanQueue[T]) Cap() int

Cap returns the capacity of the ChanQueue. Returns -1 if unbounded.

func (*ChanQueue[T]) Close

func (cq *ChanQueue[T]) Close()

Close closes the input channel. This is the same as calling the builtin close on the input channel, except that Close can be called multiple times. Writing additional input will panic. Output continues to be readable until there is no more data, and then the output channel is closed.
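
One common way to get a close that is safe to call repeatedly is `sync.Once`. The sketch below assumes that approach purely for illustration; the source does not say how ChanQueue does it, and `safeInput` and `newSafeInput` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// safeInput is a hypothetical type, not part of chanqueue. It wraps a channel
// so that Close may be called any number of times.
type safeInput[T any] struct {
	ch   chan T
	once sync.Once
}

func newSafeInput[T any]() *safeInput[T] {
	return &safeInput[T]{ch: make(chan T, 1)}
}

// Close closes the channel on the first call and does nothing afterwards.
func (s *safeInput[T]) Close() {
	s.once.Do(func() { close(s.ch) })
}

func main() {
	s := newSafeInput[int]()
	s.ch <- 1
	s.Close()
	s.Close() // calling the builtin close twice would panic; this is a no-op

	v, ok := <-s.ch
	fmt.Println(v, ok) // prints 1 true
	v, ok = <-s.ch
	fmt.Println(v, ok) // prints 0 false
}
```

As with a plain channel, values written before the close remain readable, and reads report `false` only once the channel is both closed and empty.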

func (*ChanQueue[T]) In

func (cq *ChanQueue[T]) In() chan<- T

In returns the write side of the channel.

func (*ChanQueue[T]) Len

func (cq *ChanQueue[T]) Len() int

Len returns the number of items buffered in the channel.

func (*ChanQueue[T]) Out

func (cq *ChanQueue[T]) Out() <-chan T

Out returns the read side of the channel.

func (*ChanQueue[T]) Shutdown

func (cq *ChanQueue[T]) Shutdown()

Shutdown calls Close then drains the channel to ensure that the internal goroutine finishes.

type Option

type Option[T any] func(*ChanQueue[T])
