Documentation ¶
Overview ¶
Package channelqueue implements a queue that uses channels for input and output to provide concurrent access to a re-sizable queue. This allows the queue to be used like a channel. Closing the input channel closes the output channel when all queued items are read, consistent with channel behavior. In other words channelqueue is a dynamically buffered channel with up to infinite capacity.
ChannelQueue also supports circular buffer behavior when created using `NewRing`. When the buffer is full, writing an additional item discards the oldest buffered item.
When specifying an unlimited buffer capacity use caution as the buffer is still limited by the resources available on the host system.
Caution ¶
The behavior of channelqueue differs from the behavior of a normal channel in one important way: After writing to the In() channel, the data may not be immediately available on the Out() channel (until the buffer goroutine is scheduled), and may be missed by a non-blocking select.
Credits ¶
This implementation is based on ideas/examples from: https://github.com/eapache/channels
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithCapacity ¶ added in v1.0.0
func WithCapacity[T any](n int) func(*ChannelQueue[T])
WithCapacity sets the limit on the number of unread items that channelqueue will hold. Unbuffered behavior is not supported (use a normal channel for that), and a value of zero or less configures the default of no limit.
Example:
cq := channelqueue.New(channelqueue.WithCapacity[int](64))
func WithInput ¶ added in v1.0.0
func WithInput[T any](in chan T) func(*ChannelQueue[T])
WithInput uses an existing channel as the input channel, which is the channel used to write to the queue. This is used when buffering items that must be read from an existing channel. Be aware that calling Close or Shutdown will close this channel.
Example:
in := make(chan int) cq := channelqueue.New(channelqueue.WithInput[int](in))
func WithOutput ¶ added in v1.0.0
func WithOutput[T any](out chan T) func(*ChannelQueue[T])
WithOutput uses an existing channel as the output channel, which is the channel used to read from the queue. This is used when buffering items that must be written to an existing channel. Be aware that ChannelQueue will close this channel when no more items are available.
Example:
out := make(chan int) cq := channelqueue.New(channelqueue.WithOutput[int](out))
Types ¶
type ChannelQueue ¶
type ChannelQueue[T any] struct { // contains filtered or unexported fields }
ChannelQueue uses a queue to buffer data between input and output channels.
func New ¶
func New[T any](options ...Option[T]) *ChannelQueue[T]
New creates a new ChannelQueue that, by default, holds an unbounded number of items of the specified type.
func NewRing ¶ added in v0.2.1
func NewRing[T any](options ...Option[T]) *ChannelQueue[T]
NewRing creates a new ChannelQueue with the specified buffer capacity, and circular buffer behavior. When the buffer is full, writing an additional item discards the oldest buffered item.
func (*ChannelQueue[T]) Cap ¶
func (cq *ChannelQueue[T]) Cap() int
Cap returns the capacity of the channelqueue. Returns -1 if unbounded.
func (*ChannelQueue[T]) Close ¶
func (cq *ChannelQueue[T]) Close()
Close closes the input channel. This is the same as calling the builtin close on the input channel, except Close can be called multiple times.. Additional input will panic, output will continue to be readable until there is no more data, and then the output channel is closed.
func (*ChannelQueue[T]) In ¶
func (cq *ChannelQueue[T]) In() chan<- T
In returns the write side of the channel.
func (*ChannelQueue[T]) Len ¶
func (cq *ChannelQueue[T]) Len() int
Len returns the number of items buffered in the channel.
func (*ChannelQueue[T]) Out ¶
func (cq *ChannelQueue[T]) Out() <-chan T
Out returns the read side of the channel.
func (*ChannelQueue[T]) Shutdown ¶ added in v1.0.0
func (cq *ChannelQueue[T]) Shutdown()
Shutdown calls Close then drains the channel to ensure that the internal goroutine finishes.
type Option ¶ added in v1.0.0
type Option[T any] func(*ChannelQueue[T])