channelqueue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2024 License: MIT Imports: 2 Imported by: 1

README

channelqueue

GoDoc Build Status Go Report Card codecov License

Concurrently access a dynamic queue using channels.

ChannelQueue implements a queue that uses channels for input and output to provide concurrent access to a dynamically-sized queue. This allows the queue to be used like a channel, in a thread-safe manner. Closing the input channel closes the output channel when all queued items are read, consistent with channel behavior. In other words a ChannelQueue is a dynamically buffered channel with up to infinite capacity.

ChannelQueue also supports circular buffer behavior when created using NewRing. When the buffer is full, writing an additional item discards the oldest buffered item.

When specifying an unlimited buffer capacity use caution as the buffer is still limited by the resources available on the host system.

The ChannelQueue buffer auto-resizes according to the number of items buffered. For more information on the internal queue, see: https://github.com/gammazero/deque

ChannelQueue uses generics to contain items of the type specified. To create a ChannelQueue that holds a specific type, provide a type argument to New. For example:

intChanQueue := channelqueue.New[int]()
stringChanQueue := channelqueue.New[string]()

ChannelQueue can be used to provide buffering between existing channels. Using an existing read chan, write chan, or both are supported:

in := make(chan int)
out := make(chan int)

// Create a buffer between in and out channels.
channelqueue.New(channelqueue.WithInput[int](in), channelqueue.WithOutput[int](out))
// ...
close(in) // this will close cq when all output is read.

Documentation

Overview

Package channelqueue implements a queue that uses channels for input and output to provide concurrent access to a re-sizable queue. This allows the queue to be used like a channel. Closing the input channel closes the output channel when all queued items are read, consistent with channel behavior. In other words channelqueue is a dynamically buffered channel with up to infinite capacity.

ChannelQueue also supports circular buffer behavior when created using `NewRing`. When the buffer is full, writing an additional item discards the oldest buffered item.

When specifying an unlimited buffer capacity use caution as the buffer is still limited by the resources available on the host system.

Caution

The behavior of channelqueue differs from the behavior of a normal channel in one important way: After writing to the In() channel, the data may not be immediately available on the Out() channel (until the buffer goroutine is scheduled), and may be missed by a non-blocking select.

Credits

This implementation is based on ideas/examples from: https://github.com/eapache/channels

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithCapacity added in v1.0.0

func WithCapacity[T any](n int) func(*ChannelQueue[T])

WithCapacity sets the limit on the number of unread items that channelqueue will hold. Unbuffered behavior is not supported (use a normal channel for that), and a value of zero or less configures the default of no limit.

Example:

cq := channelqueue.New(channelqueue.WithCapacity[int](64))

func WithInput added in v1.0.0

func WithInput[T any](in chan T) func(*ChannelQueue[T])

WithInput uses an existing channel as the input channel, which is the channel used to write to the queue. This is used when buffering items that must be read from an existing channel. Be aware that calling Close or Shutdown will close this channel.

Example:

in := make(chan int)
cq := channelqueue.New(channelqueue.WithInput[int](in))

func WithOutput added in v1.0.0

func WithOutput[T any](out chan T) func(*ChannelQueue[T])

WithOutput uses an existing channel as the output channel, which is the channel used to read from the queue. This is used when buffering items that must be written to an existing channel. Be aware that ChannelQueue will close this channel when no more items are available.

Example:

out := make(chan int)
cq := channelqueue.New(channelqueue.WithOutput[int](out))

Types

type ChannelQueue

type ChannelQueue[T any] struct {
	// contains filtered or unexported fields
}

ChannelQueue uses a queue to buffer data between input and output channels.

func New

func New[T any](options ...Option[T]) *ChannelQueue[T]

New creates a new ChannelQueue that, by default, holds an unbounded number of items of the specified type.

func NewRing added in v0.2.1

func NewRing[T any](options ...Option[T]) *ChannelQueue[T]

NewRing creates a new ChannelQueue with the specified buffer capacity, and circular buffer behavior. When the buffer is full, writing an additional item discards the oldest buffered item.

func (*ChannelQueue[T]) Cap

func (cq *ChannelQueue[T]) Cap() int

Cap returns the capacity of the channelqueue. Returns -1 if unbounded.

func (*ChannelQueue[T]) Close

func (cq *ChannelQueue[T]) Close()

Close closes the input channel. This is the same as calling the builtin close on the input channel, except Close can be called multiple times.. Additional input will panic, output will continue to be readable until there is no more data, and then the output channel is closed.

func (*ChannelQueue[T]) In

func (cq *ChannelQueue[T]) In() chan<- T

In returns the write side of the channel.

func (*ChannelQueue[T]) Len

func (cq *ChannelQueue[T]) Len() int

Len returns the number of items buffered in the channel.

func (*ChannelQueue[T]) Out

func (cq *ChannelQueue[T]) Out() <-chan T

Out returns the read side of the channel.

func (*ChannelQueue[T]) Shutdown added in v1.0.0

func (cq *ChannelQueue[T]) Shutdown()

Shutdown calls Close then drains the channel to ensure that the internal goroutine finishes.

type Option added in v1.0.0

type Option[T any] func(*ChannelQueue[T])

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL