Documentation
¶
Overview ¶
Package channels provides a collection of helper functions, interfaces and implementations for working with and extending the capabilities of golang's existing channels. The main interface of interest is Channel, though sub-interfaces are also provided for cases where the full Channel interface cannot be met (for example, InChannel for write-only channels).
For integration with native typed golang channels, functions Wrap and Unwrap are provided which do the appropriate type conversions. The NativeChannel, NativeInChannel and NativeOutChannel type definitions are also provided for use with native channels which already carry values of type interface{}.
The heart of the package consists of several distinct implementations of the Channel interface, including channels backed by special buffers (resizable, infinite, ring buffers, etc) and other useful types. A "black hole" channel for discarding unwanted values (similar in purpose to ioutil.Discard or /dev/null) rounds out the set.
Helper functions for operating on Channels include Pipe and Tee (which behave much like their Unix namesakes), as well as Multiplex and Distribute. "Weak" versions of these functions also exist, which do not close their output channel(s) on completion.
Due to limitations of Go's type system, importing this library directly is often not practical for production code. It serves equally well, however, as a reference guide and template for implementing many common idioms; if you use it in this way I would appreciate the inclusion of some sort of credit in the resulting code.
Warning: several types in this package provide so-called "infinite" buffers. Be *very* careful using these, as no buffer is truly infinite - if such a buffer grows too large your program will run out of memory and crash. Caveat emptor.
Index ¶
- func Distribute(input SimpleOutChannel, outputs ...SimpleInChannel)
- func Multiplex(output SimpleInChannel, inputs ...SimpleOutChannel)
- func Pipe(input SimpleOutChannel, output SimpleInChannel)
- func Tee(input SimpleOutChannel, outputs ...SimpleInChannel)
- func Unwrap(input SimpleOutChannel, output interface{})
- func WeakDistribute(input SimpleOutChannel, outputs ...SimpleInChannel)
- func WeakMultiplex(output SimpleInChannel, inputs ...SimpleOutChannel)
- func WeakPipe(input SimpleOutChannel, output SimpleInChannel)
- func WeakTee(input SimpleOutChannel, outputs ...SimpleInChannel)
- type BatchingChannel
- type BlackHole
- type Buffer
- type BufferCap
- type Channel
- type DeadChannel
- type InChannel
- type InfiniteChannel
- type NativeChannel
- type NativeInChannel
- type NativeOutChannel
- type OutChannel
- type OverflowingChannel
- type ResizableChannel
- type RingChannel
- type SharedBuffer
- type SimpleChannel
- type SimpleInChannel
- type SimpleOutChannel
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Distribute ¶
func Distribute(input SimpleOutChannel, outputs ...SimpleInChannel)
Distribute takes a single input channel and an arbitrary number of output channels and duplicates each input into *one* available output. If multiple outputs are waiting for a value, one is chosen at random. When the input channel is closed, all outputs channels are closed. Distribute with a single output channel is equivalent to Pipe (though slightly less efficient).
func Multiplex ¶
func Multiplex(output SimpleInChannel, inputs ...SimpleOutChannel)
Multiplex takes an arbitrary number of input channels and multiplexes their output into a single output channel. When all input channels have been closed, the output channel is closed. Multiplex with a single input channel is equivalent to Pipe (though slightly less efficient).
func Pipe ¶
func Pipe(input SimpleOutChannel, output SimpleInChannel)
Pipe connects the input channel to the output channel so that they behave as if a single channel.
func Tee ¶
func Tee(input SimpleOutChannel, outputs ...SimpleInChannel)
Tee (like its Unix namesake) takes a single input channel and an arbitrary number of output channels and duplicates each input into every output. When the input channel is closed, all outputs channels are closed. Tee with a single output channel is equivalent to Pipe (though slightly less efficient).
func Unwrap ¶
func Unwrap(input SimpleOutChannel, output interface{})
Unwrap takes a SimpleOutChannel and uses reflection to pipe it to a typed native channel for easy integration with existing channel sources. Output can be any writable channel type (chan or chan<-). It panics if the output is not a writable channel, or if a value is received that cannot be sent on the output channel.
func WeakDistribute ¶
func WeakDistribute(input SimpleOutChannel, outputs ...SimpleInChannel)
WeakDistribute behaves like Distribute (distributing a single input amongst multiple outputs) except that it does not close the output channels when the input channel is closed.
func WeakMultiplex ¶
func WeakMultiplex(output SimpleInChannel, inputs ...SimpleOutChannel)
WeakMultiplex behaves like Multiplex (multiplexing multiple inputs into a single output) except that it does not close the output channel when the input channels are closed.
func WeakPipe ¶
func WeakPipe(input SimpleOutChannel, output SimpleInChannel)
WeakPipe behaves like Pipe (connecting the two channels) except that it does not close the output channel when the input channel is closed.
func WeakTee ¶
func WeakTee(input SimpleOutChannel, outputs ...SimpleInChannel)
WeakTee behaves like Tee (duplicating a single input into multiple outputs) except that it does not close the output channels when the input channel is closed.
Types ¶
type BatchingChannel ¶
type BatchingChannel struct {
// contains filtered or unexported fields
}
BatchingChannel implements the Channel interface, with the change that instead of producing individual elements on Out(), it batches together the entire internal buffer each time. Trying to construct an unbuffered batching channel will panic, that configuration is not supported (and provides no benefit over an unbuffered NativeChannel).
func NewBatchingChannel ¶
func NewBatchingChannel(size BufferCap) *BatchingChannel
func (*BatchingChannel) Cap ¶
func (ch *BatchingChannel) Cap() BufferCap
func (*BatchingChannel) Close ¶
func (ch *BatchingChannel) Close()
func (*BatchingChannel) In ¶
func (ch *BatchingChannel) In() chan<- interface{}
func (*BatchingChannel) Len ¶
func (ch *BatchingChannel) Len() int
func (*BatchingChannel) Out ¶
func (ch *BatchingChannel) Out() <-chan interface{}
Out returns a <-chan interface{} in order that BatchingChannel conforms to the standard Channel interface provided by this package, however each output value is guaranteed to be of type []interface{} - a slice collecting the most recent batch of values sent on the In channel. The slice is guaranteed to not be empty or nil. In practice the net result is that you need an additional type assertion to access the underlying values.
type BlackHole ¶
type BlackHole struct {
// contains filtered or unexported fields
}
BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in the ioutil package - it never blocks, and simply discards every value it reads. The number of items discarded in this way is counted and returned from Len.
func NewBlackHole ¶
func NewBlackHole() *BlackHole
type Buffer ¶
type Buffer interface { Len() int // The number of elements currently buffered. Cap() BufferCap // The maximum number of elements that can be buffered. }
Buffer is an interface for any channel that provides access to query the state of its buffer. Even unbuffered channels can implement this interface by simply returning 0 from Len() and None from Cap().
type BufferCap ¶
type BufferCap int
BufferCap represents the capacity of the buffer backing a channel. Valid values consist of all positive integers, as well as the special values below.
type Channel ¶
type Channel interface { SimpleChannel Buffer }
Channel is an interface representing a channel that is readable, writeable and implements the Buffer interface
Example ¶
var ch Channel ch = NewInfiniteChannel() for i := 0; i < 10; i++ { ch.In() <- nil } for i := 0; i < 10; i++ { <-ch.Out() }
Output:
type DeadChannel ¶
type DeadChannel struct{}
DeadChannel is a placeholder implementation of the Channel interface with no buffer that is never ready for reading or writing. Closing a dead channel is a no-op. Behaves almost like NativeChannel(nil) except that closing a nil NativeChannel will panic.
func NewDeadChannel ¶
func NewDeadChannel() DeadChannel
func (DeadChannel) Cap ¶
func (ch DeadChannel) Cap() BufferCap
func (DeadChannel) Close ¶
func (ch DeadChannel) Close()
func (DeadChannel) In ¶
func (ch DeadChannel) In() chan<- interface{}
func (DeadChannel) Len ¶
func (ch DeadChannel) Len() int
func (DeadChannel) Out ¶
func (ch DeadChannel) Out() <-chan interface{}
type InChannel ¶
type InChannel interface { SimpleInChannel Buffer }
InChannel is an interface representing a writeable channel with a buffer.
type InfiniteChannel ¶
type InfiniteChannel struct {
// contains filtered or unexported fields
}
InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
func NewInfiniteChannel ¶
func NewInfiniteChannel() *InfiniteChannel
func (*InfiniteChannel) Cap ¶
func (ch *InfiniteChannel) Cap() BufferCap
func (*InfiniteChannel) Close ¶
func (ch *InfiniteChannel) Close()
func (*InfiniteChannel) In ¶
func (ch *InfiniteChannel) In() chan<- interface{}
func (*InfiniteChannel) Len ¶
func (ch *InfiniteChannel) Len() int
func (*InfiniteChannel) Out ¶
func (ch *InfiniteChannel) Out() <-chan interface{}
type NativeChannel ¶
type NativeChannel chan interface{}
NativeChannel implements the Channel interface by wrapping a native go channel.
func NewNativeChannel ¶
func NewNativeChannel(size BufferCap) NativeChannel
NewNativeChannel makes a new NativeChannel with the given buffer size. Just a convenience wrapper to avoid having to cast the result of make().
func (NativeChannel) Cap ¶
func (ch NativeChannel) Cap() BufferCap
func (NativeChannel) Close ¶
func (ch NativeChannel) Close()
func (NativeChannel) In ¶
func (ch NativeChannel) In() chan<- interface{}
func (NativeChannel) Len ¶
func (ch NativeChannel) Len() int
func (NativeChannel) Out ¶
func (ch NativeChannel) Out() <-chan interface{}
type NativeInChannel ¶
type NativeInChannel chan<- interface{}
NativeInChannel implements the InChannel interface by wrapping a native go write-only channel.
func (NativeInChannel) Cap ¶
func (ch NativeInChannel) Cap() BufferCap
func (NativeInChannel) Close ¶
func (ch NativeInChannel) Close()
func (NativeInChannel) In ¶
func (ch NativeInChannel) In() chan<- interface{}
func (NativeInChannel) Len ¶
func (ch NativeInChannel) Len() int
type NativeOutChannel ¶
type NativeOutChannel <-chan interface{}
NativeOutChannel implements the OutChannel interface by wrapping a native go read-only channel.
func (NativeOutChannel) Cap ¶
func (ch NativeOutChannel) Cap() BufferCap
func (NativeOutChannel) Len ¶
func (ch NativeOutChannel) Len() int
func (NativeOutChannel) Out ¶
func (ch NativeOutChannel) Out() <-chan interface{}
type OutChannel ¶
type OutChannel interface { SimpleOutChannel Buffer }
OutChannel is an interface representing a readable channel implementing the Buffer interface.
type OverflowingChannel ¶
type OverflowingChannel struct {
// contains filtered or unexported fields
}
OverflowingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to an OverflowingChannel when its buffer is full (or, in an unbuffered case, when the recipient is not ready) then that value is simply discarded. Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the oldest element, not the newest) see RingChannel.
func NewOverflowingChannel ¶
func NewOverflowingChannel(size BufferCap) *OverflowingChannel
func (*OverflowingChannel) Cap ¶
func (ch *OverflowingChannel) Cap() BufferCap
func (*OverflowingChannel) Close ¶
func (ch *OverflowingChannel) Close()
func (*OverflowingChannel) In ¶
func (ch *OverflowingChannel) In() chan<- interface{}
func (*OverflowingChannel) Len ¶
func (ch *OverflowingChannel) Len() int
func (*OverflowingChannel) Out ¶
func (ch *OverflowingChannel) Out() <-chan interface{}
type ResizableChannel ¶
type ResizableChannel struct {
// contains filtered or unexported fields
}
ResizableChannel implements the Channel interface with a resizable buffer between the input and the output. The channel initially has a buffer size of 1, but can be resized by calling Resize().
Resizing to a buffer capacity of None is, unfortunately, not supported and will panic (see https://github.com/eapache/channels/issues/1). Resizing back and forth between a finite and infinite buffer is fully supported.
func NewResizableChannel ¶
func NewResizableChannel() *ResizableChannel
func (*ResizableChannel) Cap ¶
func (ch *ResizableChannel) Cap() BufferCap
func (*ResizableChannel) Close ¶
func (ch *ResizableChannel) Close()
func (*ResizableChannel) In ¶
func (ch *ResizableChannel) In() chan<- interface{}
func (*ResizableChannel) Len ¶
func (ch *ResizableChannel) Len() int
func (*ResizableChannel) Out ¶
func (ch *ResizableChannel) Out() <-chan interface{}
func (*ResizableChannel) Resize ¶
func (ch *ResizableChannel) Resize(newSize BufferCap)
type RingChannel ¶
type RingChannel struct {
// contains filtered or unexported fields
}
RingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to a RingChannel when its buffer is full then the oldest value in the buffer is discarded to make room (just like a standard ring-buffer). Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the newest element, not the oldest) see OverflowingChannel.
func NewRingChannel ¶
func NewRingChannel(size BufferCap) *RingChannel
func (*RingChannel) Cap ¶
func (ch *RingChannel) Cap() BufferCap
func (*RingChannel) Close ¶
func (ch *RingChannel) Close()
func (*RingChannel) In ¶
func (ch *RingChannel) In() chan<- interface{}
func (*RingChannel) Len ¶
func (ch *RingChannel) Len() int
func (*RingChannel) Out ¶
func (ch *RingChannel) Out() <-chan interface{}
type SharedBuffer ¶
type SharedBuffer struct {
// contains filtered or unexported fields
}
SharedBuffer implements the Buffer interface, and permits multiple SimpleChannel instances to "share" a single buffer. Each channel spawned by NewChannel has its own internal queue (so values flowing through do not get mixed up with other channels) but the total number of elements buffered by all spawned channels is limited to a single capacity. This means *all* such channels block and unblock for writing together. The primary use case is for implementing pipeline-style parallelism with goroutines, limiting the total number of elements in the pipeline without limiting the number of elements at any particular step.
func NewSharedBuffer ¶
func NewSharedBuffer(size BufferCap) *SharedBuffer
func (*SharedBuffer) Cap ¶
func (buf *SharedBuffer) Cap() BufferCap
func (*SharedBuffer) Close ¶
func (buf *SharedBuffer) Close()
Close shuts down the SharedBuffer. It is an error to call Close while channels are still using the buffer (I'm not really sure what would happen if you do so).
func (*SharedBuffer) Len ¶
func (buf *SharedBuffer) Len() int
func (*SharedBuffer) NewChannel ¶
func (buf *SharedBuffer) NewChannel() SimpleChannel
NewChannel spawns and returns a new channel sharing the underlying buffer.
type SimpleChannel ¶
type SimpleChannel interface { SimpleInChannel SimpleOutChannel }
SimpleChannel is an interface representing a channel that is both readable and writeable, but does not necessarily implement the Buffer interface.
type SimpleInChannel ¶
type SimpleInChannel interface { In() chan<- interface{} // The writeable end of the channel. Close() // Closes the channel. It is an error to write to In() after calling Close(). }
SimpleInChannel is an interface representing a writeable channel that does not necessarily implement the Buffer interface.
type SimpleOutChannel ¶
type SimpleOutChannel interface {
Out() <-chan interface{} // The readable end of the channel.
}
SimpleOutChannel is an interface representing a readable channel that does not necessarily implement the Buffer interface.
func Wrap ¶
func Wrap(ch interface{}) SimpleOutChannel
Wrap takes any readable channel type (chan or <-chan but not chan<-) and exposes it as a SimpleOutChannel for easy integration with existing channel sources. It panics if the input is not a readable channel.