Documentation ¶
Overview ¶
package btrchannels provides a collection of helper functions, interfaces and implementations for working with and extending the capabilities of golang's existing channels. The main interface of interest is Channel, though sub-interfaces are also provided for cases where the full Channel interface cannot be met (for example, InChannel for write-only channels).
For integration with native typed golang channels, functions Wrap and Unwrap are provided which do the appropriate type conversions. The NativeChannel, NativeInChannel and NativeOutChannel type definitions are also provided for use with native channels which already carry values of type interface{}.
The heart of the package consists of several distinct implementations of the Channel interface, including channels backed by special buffers (resizable, infinite, ring buffers, etc) and other useful types. A "black hole" channel for discarding unwanted values (similar in purpose to ioutil.Discard or /dev/null) rounds out the set.
Helper functions for operating on Channels include Pipe and Tee (which behave much like their Unix namesakes), as well as Multiplex and Distribute. "Weak" versions of these functions also exist, which do not close their output channel(s) on completion.
Due to limitations of Go's type system, importing this library directly is often not practical for production code. It serves equally well, however, as a reference guide and template for implementing many common idioms; if you use it in this way I would appreciate the inclusion of some sort of credit in the resulting code.
Warning: several types in this package provide so-called "infinite" buffers. Be *very* careful using these, as no buffer is truly infinite - if such a buffer grows too large your program will run out of memory and crash. Caveat emptor.
Index ¶
- func Distribute[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
- func Multiplex[T any](output SimpleInChannel[T], inputs ...SimpleOutChannel[T])
- func Pipe[T any](input SimpleOutChannel[T], output SimpleInChannel[T])
- func Tee[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
- func Unwrap[T any](input SimpleOutChannel[T], output interface{})
- func WeakDistribute[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
- func WeakMultiplex[T any](output SimpleInChannel[T], inputs ...SimpleOutChannel[T])
- func WeakPipe[T any](input SimpleOutChannel[T], output SimpleInChannel[T])
- func WeakTee[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
- type BatchingChannel
- type BlackHole
- type Buffer
- type BufferCap
- type Channel
- type DeadChannel
- type InChannel
- type InfiniteChannel
- type NativeChannel
- type NativeInChannel
- type NativeOutChannel
- type OutChannel
- type OverflowingChannel
- type ResizableChannel
- type RingChannel
- type SharedBuffer
- type SimpleChannel
- type SimpleInChannel
- type SimpleOutChannel
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Distribute ¶
func Distribute[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
Distribute takes a single input channel and an arbitrary number of output channels and duplicates each input into *one* available output. If multiple outputs are waiting for a value, one is chosen at random. When the input channel is closed, all outputs channels are closed. Distribute with a single output channel is equivalent to Pipe (though slightly less efficient).
func Multiplex ¶
func Multiplex[T any](output SimpleInChannel[T], inputs ...SimpleOutChannel[T])
Multiplex takes an arbitrary number of input channels and multiplexes their output into a single output channel. When all input channels have been closed, the output channel is closed. Multiplex with a single input channel is equivalent to Pipe (though slightly less efficient).
func Pipe ¶
func Pipe[T any](input SimpleOutChannel[T], output SimpleInChannel[T])
Pipe connects the input channel to the output channel so that they behave as if a single channel.
func Tee ¶
func Tee[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
Tee (like its Unix namesake) takes a single input channel and an arbitrary number of output channels and duplicates each input into every output. When the input channel is closed, all outputs channels are closed. Tee with a single output channel is equivalent to Pipe (though slightly less efficient).
func Unwrap ¶
func Unwrap[T any](input SimpleOutChannel[T], output interface{})
Unwrap takes a SimpleOutChannel and uses reflection to pipe it to a typed native channel for easy integration with existing channel sources. Output can be any writable channel type (chan or chan<-). It panics if the output is not a writable channel, or if a value is received that cannot be sent on the output channel.
func WeakDistribute ¶
func WeakDistribute[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
WeakDistribute behaves like Distribute (distributing a single input amongst multiple outputs) except that it does not close the output channels when the input channel is closed.
func WeakMultiplex ¶
func WeakMultiplex[T any](output SimpleInChannel[T], inputs ...SimpleOutChannel[T])
WeakMultiplex behaves like Multiplex (multiplexing multiple inputs into a single output) except that it does not close the output channel when the input channels are closed.
func WeakPipe ¶
func WeakPipe[T any](input SimpleOutChannel[T], output SimpleInChannel[T])
WeakPipe behaves like Pipe (connecting the two channels) except that it does not close the output channel when the input channel is closed.
func WeakTee ¶
func WeakTee[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])
WeakTee behaves like Tee (duplicating a single input into multiple outputs) except that it does not close the output channels when the input channel is closed.
Types ¶
type BatchingChannel ¶
type BatchingChannel[T any] struct { // contains filtered or unexported fields }
BatchingChannel implements the Channel interface, with the change that instead of producing individual elements on Out(), it batches together the entire internal buffer each time. Trying to construct an unbuffered batching channel will panic, that configuration is not supported (and provides no benefit over an unbuffered NativeChannel).
func NewBatchingChannel ¶
func NewBatchingChannel[T any](size BufferCap) *BatchingChannel[T]
func (*BatchingChannel[T]) Cap ¶
func (ch *BatchingChannel[T]) Cap() BufferCap
func (*BatchingChannel[T]) Close ¶
func (ch *BatchingChannel[T]) Close()
func (*BatchingChannel[T]) In ¶
func (ch *BatchingChannel[T]) In() chan<- T
func (*BatchingChannel[T]) Len ¶
func (ch *BatchingChannel[T]) Len() int
func (*BatchingChannel[T]) Out ¶
func (ch *BatchingChannel[T]) Out() <-chan []T
Out returns a <-chan interface{} in order that BatchingChannel conforms to the standard Channel interface provided by this package, however each output value is guaranteed to be of type []interface{} - a slice collecting the most recent batch of values sent on the In channel. The slice is guaranteed to not be empty or nil. In practice the net result is that you need an additional type assertion to access the underlying values.
type BlackHole ¶
type BlackHole[T any] struct { // contains filtered or unexported fields }
BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in the ioutil package - it never blocks, and simply discards every value it reads. The number of items discarded in this way is counted and returned from Len.
func NewBlackHole ¶
type Buffer ¶
type Buffer interface { Len() int // The number of elements currently buffered. Cap() BufferCap // The maximum number of elements that can be buffered. }
Buffer is an interface for any channel that provides access to query the state of its buffer. Even unbuffered channels can implement this interface by simply returning 0 from Len() and None from Cap().
type BufferCap ¶
type BufferCap int
BufferCap represents the capacity of the buffer backing a channel. Valid values consist of all positive integers, as well as the special values below.
type Channel ¶
type Channel[T any] interface { SimpleChannel[T] Buffer }
Channel is an interface representing a channel that is readable, writeable and implements the Buffer interface
Example ¶
var ch Channel[any] ch = NewInfiniteChannel[any]() for i := 0; i < 10; i++ { ch.In() <- nil } for i := 0; i < 10; i++ { <-ch.Out() }
Output:
type DeadChannel ¶
type DeadChannel[T any] struct{}
DeadChannel is a placeholder implementation of the Channel interface with no buffer that is never ready for reading or writing. Closing a dead channel is a no-op. Behaves almost like NativeChannel(nil) except that closing a nil NativeChannel will panic.
func NewDeadChannel ¶
func NewDeadChannel[T any]() DeadChannel[T]
func (DeadChannel[T]) Cap ¶
func (ch DeadChannel[T]) Cap() BufferCap
func (DeadChannel[T]) Close ¶
func (ch DeadChannel[T]) Close()
func (DeadChannel[T]) In ¶
func (ch DeadChannel[T]) In() chan<- T
func (DeadChannel[T]) Len ¶
func (ch DeadChannel[T]) Len() int
func (DeadChannel[T]) Out ¶
func (ch DeadChannel[T]) Out() <-chan T
type InChannel ¶
type InChannel[T any] interface { SimpleInChannel[T] Buffer }
InChannel is an interface representing a writeable channel with a buffer.
type InfiniteChannel ¶
type InfiniteChannel[T any] struct { // contains filtered or unexported fields }
InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
func NewInfiniteChannel ¶
func NewInfiniteChannel[T any]() *InfiniteChannel[T]
func (*InfiniteChannel[T]) Cap ¶
func (ch *InfiniteChannel[T]) Cap() BufferCap
func (*InfiniteChannel[T]) Close ¶
func (ch *InfiniteChannel[T]) Close()
func (*InfiniteChannel[T]) In ¶
func (ch *InfiniteChannel[T]) In() chan<- T
func (*InfiniteChannel[T]) Len ¶
func (ch *InfiniteChannel[T]) Len() int
func (*InfiniteChannel[T]) Out ¶
func (ch *InfiniteChannel[T]) Out() <-chan T
type NativeChannel ¶
type NativeChannel[T any] chan T
NativeChannel implements the Channel interface by wrapping a native go channel.
func NewNativeChannel ¶
func NewNativeChannel[T any](size BufferCap) NativeChannel[T]
NewNativeChannel makes a new NativeChannel with the given buffer size. Just a convenience wrapper to avoid having to cast the result of make().
func (NativeChannel[T]) Cap ¶
func (ch NativeChannel[T]) Cap() BufferCap
func (NativeChannel[T]) Close ¶
func (ch NativeChannel[T]) Close()
func (NativeChannel[T]) In ¶
func (ch NativeChannel[T]) In() chan<- T
func (NativeChannel[T]) Len ¶
func (ch NativeChannel[T]) Len() int
func (NativeChannel[T]) Out ¶
func (ch NativeChannel[T]) Out() <-chan T
type NativeInChannel ¶
type NativeInChannel[T any] chan<- T
NativeInChannel implements the InChannel interface by wrapping a native go write-only channel.
func (NativeInChannel[T]) Cap ¶
func (ch NativeInChannel[T]) Cap() BufferCap
func (NativeInChannel[T]) Close ¶
func (ch NativeInChannel[T]) Close()
func (NativeInChannel[T]) In ¶
func (ch NativeInChannel[T]) In() chan<- T
func (NativeInChannel[T]) Len ¶
func (ch NativeInChannel[T]) Len() int
type NativeOutChannel ¶
type NativeOutChannel[T any] <-chan T
NativeOutChannel implements the OutChannel interface by wrapping a native go read-only channel.
func (NativeOutChannel[T]) Cap ¶
func (ch NativeOutChannel[T]) Cap() BufferCap
func (NativeOutChannel[T]) Len ¶
func (ch NativeOutChannel[T]) Len() int
func (NativeOutChannel[T]) Out ¶
func (ch NativeOutChannel[T]) Out() <-chan T
type OutChannel ¶
type OutChannel[T any] interface { SimpleOutChannel[T] Buffer }
OutChannel is an interface representing a readable channel implementing the Buffer interface.
type OverflowingChannel ¶
type OverflowingChannel[T any] struct { // contains filtered or unexported fields }
OverflowingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to an OverflowingChannel when its buffer is full (or, in an unbuffered case, when the recipient is not ready) then that value is simply discarded. Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the oldest element, not the newest) see RingChannel.
func NewOverflowingChannel ¶
func NewOverflowingChannel[T any](size BufferCap) *OverflowingChannel[T]
func (*OverflowingChannel[T]) Cap ¶
func (ch *OverflowingChannel[T]) Cap() BufferCap
func (*OverflowingChannel[T]) Close ¶
func (ch *OverflowingChannel[T]) Close()
func (*OverflowingChannel[T]) In ¶
func (ch *OverflowingChannel[T]) In() chan<- T
func (*OverflowingChannel[T]) Len ¶
func (ch *OverflowingChannel[T]) Len() int
func (*OverflowingChannel[T]) Out ¶
func (ch *OverflowingChannel[T]) Out() <-chan T
type ResizableChannel ¶
type ResizableChannel[T any] struct { // contains filtered or unexported fields }
ResizableChannel implements the Channel interface with a resizable buffer between the input and the output. The channel initially has a buffer size of 1, but can be resized by calling Resize().
Resizing to a buffer capacity of None is, unfortunately, not supported and will panic (see https://github.com/eapache/channels/issues/1). Resizing back and forth between a finite and infinite buffer is fully supported.
func NewResizableChannel ¶
func NewResizableChannel[T any]() *ResizableChannel[T]
func (*ResizableChannel[T]) Cap ¶
func (ch *ResizableChannel[T]) Cap() BufferCap
func (*ResizableChannel[T]) Close ¶
func (ch *ResizableChannel[T]) Close()
func (*ResizableChannel[T]) In ¶
func (ch *ResizableChannel[T]) In() chan<- T
func (*ResizableChannel[T]) Len ¶
func (ch *ResizableChannel[T]) Len() int
func (*ResizableChannel[T]) Out ¶
func (ch *ResizableChannel[T]) Out() <-chan T
func (*ResizableChannel[T]) Resize ¶
func (ch *ResizableChannel[T]) Resize(newSize BufferCap)
type RingChannel ¶
type RingChannel[T any] struct { // contains filtered or unexported fields }
RingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to a RingChannel when its buffer is full then the oldest value in the buffer is discarded to make room (just like a standard ring-buffer). Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the newest element, not the oldest) see OverflowingChannel.
func NewRingChannel ¶
func NewRingChannel[T any](size BufferCap) *RingChannel[T]
func (*RingChannel[T]) Cap ¶
func (ch *RingChannel[T]) Cap() BufferCap
func (*RingChannel[T]) Close ¶
func (ch *RingChannel[T]) Close()
func (*RingChannel[T]) In ¶
func (ch *RingChannel[T]) In() chan<- T
func (*RingChannel[T]) Len ¶
func (ch *RingChannel[T]) Len() int
func (*RingChannel[T]) Out ¶
func (ch *RingChannel[T]) Out() <-chan T
type SharedBuffer ¶
type SharedBuffer[T any] struct { // contains filtered or unexported fields }
SharedBuffer implements the Buffer interface, and permits multiple SimpleChannel instances to "share" a single buffer. Each channel spawned by NewChannel has its own internal queue (so values flowing through do not get mixed up with other channels) but the total number of elements buffered by all spawned channels is limited to a single capacity. This means *all* such channels block and unblock for writing together. The primary use case is for implementing pipeline-style parallelism with goroutines, limiting the total number of elements in the pipeline without limiting the number of elements at any particular step.
func NewSharedBuffer ¶
func NewSharedBuffer[T any](size BufferCap) *SharedBuffer[T]
func (*SharedBuffer[T]) Cap ¶
func (buf *SharedBuffer[T]) Cap() BufferCap
func (*SharedBuffer[T]) Close ¶
func (buf *SharedBuffer[T]) Close()
Close shuts down the SharedBuffer. It is an error to call Close while channels are still using the buffer (I'm not really sure what would happen if you do so).
func (*SharedBuffer[T]) Len ¶
func (buf *SharedBuffer[T]) Len() int
func (*SharedBuffer[T]) NewChannel ¶
func (buf *SharedBuffer[T]) NewChannel() SimpleChannel[T]
NewChannel spawns and returns a new channel sharing the underlying buffer.
type SimpleChannel ¶
type SimpleChannel[T any] interface { SimpleInChannel[T] SimpleOutChannel[T] }
SimpleChannel is an interface representing a channel that is both readable and writeable, but does not necessarily implement the Buffer interface.
type SimpleInChannel ¶
type SimpleInChannel[T any] interface { In() chan<- T // The writeable end of the channel. Close() // Closes the channel. It is an error to write to In() after calling Close(). }
SimpleInChannel is an interface representing a writeable channel that does not necessarily implement the Buffer interface.
type SimpleOutChannel ¶
type SimpleOutChannel[T any] interface { Out() <-chan T // The readable end of the channel. }
SimpleOutChannel is an interface representing a readable channel that does not necessarily implement the Buffer interface.
func Wrap ¶
func Wrap[T any](ch <-chan T) SimpleOutChannel[T]
Wrap takes any readable channel type (chan or <-chan but not chan<-) and exposes it as a SimpleOutChannel for easy integration with existing channel sources. It panics if the input is not a readable channel.