btrchannels

package
v0.0.0-...-19bd271 Latest Latest
Warning

This package is not in the latest version of its module.

Published: Mar 18, 2024 | License: MIT | Imports: 2 | Imported by: 1

README

channels

A collection of helper functions and special types for working with and extending Go's existing channels. Due to limitations of Go's type system, importing this library directly is often not practical for production code. It serves equally well, however, as a reference guide and template for implementing many common idioms; if you use it in this way I would appreciate the inclusion of some sort of credit in the resulting code.

See https://godoc.org/github.com/eapache/channels for full documentation or https://gopkg.in/eapache/channels.v1 for a versioned import path.

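The original library requires Go version 1.1 or later, as certain necessary elements of the reflect package were not present in 1.0. The generic signatures documented below (for example Pipe[T any]) use type parameters, which require Go 1.18 or later.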

Most of the buffered channel types in this package are backed by a very fast queue implementation that used to be built into this package but has now been extracted into its own package at https://github.com/eapache/queue.

Note: Several types in this package provide so-called "infinite" buffers. Be very careful using these, as no buffer is truly infinite. If such a buffer grows too large your program will run out of memory and crash. Caveat emptor.

Documentation

Overview

Package btrchannels provides a collection of helper functions, interfaces and implementations for working with and extending the capabilities of Go's existing channels. The main interface of interest is Channel, though sub-interfaces are also provided for cases where the full Channel interface cannot be met (for example, InChannel for write-only channels).

For integration with native typed Go channels, the functions Wrap and Unwrap are provided, which do the appropriate type conversions. The NativeChannel, NativeInChannel and NativeOutChannel type definitions are also provided so that native channels can be used directly through the interfaces of this package.

The heart of the package consists of several distinct implementations of the Channel interface, including channels backed by special buffers (resizable, infinite, ring buffers, etc) and other useful types. A "black hole" channel for discarding unwanted values (similar in purpose to ioutil.Discard or /dev/null) rounds out the set.

Helper functions for operating on Channels include Pipe and Tee (which behave much like their Unix namesakes), as well as Multiplex and Distribute. "Weak" versions of these functions also exist, which do not close their output channel(s) on completion.

Warning: several types in this package provide so-called "infinite" buffers. Be *very* careful using these, as no buffer is truly infinite - if such a buffer grows too large your program will run out of memory and crash. Caveat emptor.

Functions

func Distribute

func Distribute[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])

Distribute takes a single input channel and an arbitrary number of output channels and sends each input value to exactly *one* available output. If multiple outputs are waiting for a value, one is chosen at random. When the input channel is closed, all output channels are closed. Distribute with a single output channel is equivalent to Pipe (though slightly less efficient).
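
As a rough illustration of the idiom on native channels, here is a minimal sketch. The helper names distribute and fanOutCount are hypothetical and this is not the package's implementation; it only assumes that "one available output" can be modelled with reflect.Select over send cases.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// distribute is a hypothetical helper, not the package's Distribute.
// It forwards each value from in to exactly one output that is ready
// to receive, and closes every output once in has been closed.
func distribute[T any](in <-chan T, outs ...chan<- T) {
	cases := make([]reflect.SelectCase, len(outs))
	for i, out := range outs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(out)}
	}
	for v := range in {
		// Taking the value through a pointer keeps its static type T,
		// so nil interface values can be sent as well.
		val := reflect.ValueOf(&v).Elem()
		for i := range cases {
			cases[i].Send = val
		}
		// reflect.Select chooses pseudo-randomly among the ready cases.
		reflect.Select(cases)
	}
	for _, out := range outs {
		close(out)
	}
}

// fanOutCount pushes 1..n through distribute to two workers and
// reports how many values arrived in total and their sum.
func fanOutCount(n int) (count, sum int) {
	in := make(chan int)
	a, b := make(chan int), make(chan int)
	go distribute[int](in, a, b)
	go func() {
		for i := 1; i <= n; i++ {
			in <- i
		}
		close(in)
	}()

	var mu sync.Mutex
	var wg sync.WaitGroup
	for _, ch := range []chan int{a, b} {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for v := range ch {
				mu.Lock()
				count++
				sum += v
				mu.Unlock()
			}
		}(ch)
	}
	wg.Wait()
	return count, sum
}

func main() {
	fmt.Println(fanOutCount(6)) // prints "6 21"
}
```

Which worker receives which value varies from run to run; only the totals are stable.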

func Multiplex

func Multiplex[T any](output SimpleInChannel[T], inputs ...SimpleOutChannel[T])

Multiplex takes an arbitrary number of input channels and multiplexes their output into a single output channel. When all input channels have been closed, the output channel is closed. Multiplex with a single input channel is equivalent to Pipe (though slightly less efficient).
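
The same fan-in behaviour can be sketched on native channels with one goroutine per input and a sync.WaitGroup. The names multiplex and mergedSum are hypothetical, and the sketch is an assumption about the idiom rather than the package's own code.

```go
package main

import (
	"fmt"
	"sync"
)

// multiplex is a hypothetical helper, not the package's Multiplex.
// It copies every value from every input to out and closes out only
// after all inputs have been closed.
func multiplex[T any](out chan<- T, ins ...<-chan T) {
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	wg.Wait()
	close(out)
}

// mergedSum merges two small producers and adds up what comes out.
func mergedSum() int {
	a, b := make(chan int), make(chan int)
	out := make(chan int)
	go multiplex[int](out, a, b)
	go func() { a <- 1; a <- 2; close(a) }()
	go func() { b <- 10; b <- 20; close(b) }()

	sum := 0
	for v := range out {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(mergedSum()) // prints "33"
}
```

The interleaving of values from different inputs is not deterministic; only the order within a single input is preserved.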

func Pipe

func Pipe[T any](input SimpleOutChannel[T], output SimpleInChannel[T])

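Pipe connects the input channel to the output channel so that they behave as if they were a single channel. When the input channel is closed, the output channel is closed as well (see WeakPipe for a variant that leaves it open).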
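
On native channels the idiom is only a few lines long. The sketch below uses hypothetical names (pipe, collect) and assumes the closing behaviour implied by the WeakPipe description; it is not the package's implementation.

```go
package main

import "fmt"

// pipe is a hypothetical helper, not the package's Pipe.
// It copies values from in to out until in is closed, then closes out.
func pipe[T any](in <-chan T, out chan<- T) {
	for v := range in {
		out <- v
	}
	close(out)
}

// collect sends 0, 1, 2 through a pipe and gathers the results.
func collect() []int {
	in, out := make(chan int), make(chan int)
	go pipe[int](in, out)
	go func() {
		for i := 0; i < 3; i++ {
			in <- i
		}
		close(in)
	}()

	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect()) // prints "[0 1 2]"
}
```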

func Tee

func Tee[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])

Tee (like its Unix namesake) takes a single input channel and an arbitrary number of output channels and duplicates each input into every output. When the input channel is closed, all output channels are closed. Tee with a single output channel is equivalent to Pipe (though slightly less efficient).
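
A minimal sketch of the duplication idiom on native channels follows. The names tee and teeDemo are hypothetical, and the sketch sends to the outputs one after another, which is an assumption and may differ from how the package schedules its sends.

```go
package main

import "fmt"

// tee is a hypothetical helper, not the package's Tee.
// It copies every value from in to each output in turn and closes all
// outputs once in is closed. In this sketch a slow output delays the
// others, because the sends happen sequentially.
func tee[T any](in <-chan T, outs ...chan<- T) {
	for v := range in {
		for _, out := range outs {
			out <- v
		}
	}
	for _, out := range outs {
		close(out)
	}
}

// teeDemo duplicates 1, 2, 3 into two buffered outputs.
func teeDemo() (left, right []int) {
	in := make(chan int)
	a, b := make(chan int, 3), make(chan int, 3)
	go tee[int](in, a, b)
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()

	for v := range a {
		left = append(left, v)
	}
	for v := range b {
		right = append(right, v)
	}
	return left, right
}

func main() {
	fmt.Println(teeDemo()) // prints "[1 2 3] [1 2 3]"
}
```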

func Unwrap

func Unwrap[T any](input SimpleOutChannel[T], output interface{})

Unwrap takes a SimpleOutChannel and uses reflection to pipe it to a typed native channel for easy integration with existing channel sources. Output can be any writable channel type (chan or chan<-). It panics if the output is not a writable channel, or if a value is received that cannot be sent on the output channel.
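
To make the reflection step concrete, here is a hedged sketch on native channels. The names unwrapInto and unwrapDemo are hypothetical; closing the destination when the input is closed is an assumption of this sketch, not something the description above states.

```go
package main

import (
	"fmt"
	"reflect"
)

// unwrapInto is a hypothetical helper, not the package's Unwrap.
// It forwards values from in to out, where out may be any writable
// native channel (chan X or chan<- X) passed as an empty interface.
func unwrapInto[T any](in <-chan T, out any) {
	dst := reflect.ValueOf(out)
	if dst.Kind() != reflect.Chan || dst.Type().ChanDir()&reflect.SendDir == 0 {
		panic("unwrapInto: out is not a writable channel")
	}
	for v := range in {
		// Send panics if v is not assignable to out's element type.
		dst.Send(reflect.ValueOf(v))
	}
	dst.Close() // assumption: mirror Pipe and close the destination
}

// unwrapDemo moves values from an untyped source into a chan int.
func unwrapDemo() []int {
	in := make(chan any)
	out := make(chan int)
	go unwrapInto[any](in, out)
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()

	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(unwrapDemo()) // prints "[1 2 3]"
}
```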

func WeakDistribute

func WeakDistribute[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])

WeakDistribute behaves like Distribute (distributing a single input amongst multiple outputs) except that it does not close the output channels when the input channel is closed.

func WeakMultiplex

func WeakMultiplex[T any](output SimpleInChannel[T], inputs ...SimpleOutChannel[T])

WeakMultiplex behaves like Multiplex (multiplexing multiple inputs into a single output) except that it does not close the output channel when the input channels are closed.

func WeakPipe

func WeakPipe[T any](input SimpleOutChannel[T], output SimpleInChannel[T])

WeakPipe behaves like Pipe (connecting the two channels) except that it does not close the output channel when the input channel is closed.
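
One reason to leave the output open is to feed it from several sources in sequence. The following sketch on native channels uses hypothetical names (weakPipe, concat) and is an illustration of the idea, not the package's code.

```go
package main

import "fmt"

// weakPipe is a hypothetical helper, not the package's WeakPipe.
// It copies values from in to out but leaves out open when in closes.
func weakPipe[T any](in <-chan T, out chan<- T) {
	for v := range in {
		out <- v
	}
}

// concat drains two closed sources into one output, one after the
// other. The caller stays responsible for closing the output.
func concat() []int {
	first, second := make(chan int, 2), make(chan int, 2)
	first <- 1
	first <- 2
	close(first)
	second <- 3
	second <- 4
	close(second)

	out := make(chan int, 4)
	weakPipe[int](first, out)
	weakPipe[int](second, out) // out is still open at this point
	close(out)

	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(concat()) // prints "[1 2 3 4]"
}
```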

func WeakTee

func WeakTee[T any](input SimpleOutChannel[T], outputs ...SimpleInChannel[T])

WeakTee behaves like Tee (duplicating a single input into multiple outputs) except that it does not close the output channels when the input channel is closed.

Types

type BatchingChannel

type BatchingChannel[T any] struct {
	// contains filtered or unexported fields
}

BatchingChannel implements the Channel interface, with the change that instead of producing individual elements on Out(), it batches together the entire internal buffer each time. Trying to construct an unbuffered batching channel will panic: that configuration is not supported (and provides no benefit over an unbuffered NativeChannel).

func NewBatchingChannel

func NewBatchingChannel[T any](size BufferCap) *BatchingChannel[T]

func (*BatchingChannel[T]) Cap

func (ch *BatchingChannel[T]) Cap() BufferCap

func (*BatchingChannel[T]) Close

func (ch *BatchingChannel[T]) Close()

func (*BatchingChannel[T]) In

func (ch *BatchingChannel[T]) In() chan<- T

func (*BatchingChannel[T]) Len

func (ch *BatchingChannel[T]) Len() int

func (*BatchingChannel[T]) Out

func (ch *BatchingChannel[T]) Out() <-chan []T

Out returns a <-chan []T rather than a <-chan T: each output value is a slice collecting the most recent batch of values sent on the In channel. The slice is guaranteed to not be empty or nil.
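
The batching behaviour can be sketched on native channels with a single goroutine that enables each select case only when it can make progress. The names batch and flatten are hypothetical and the details (such as the slice-based buffer) are assumptions, not the package's implementation.

```go
package main

import "fmt"

// batch is a hypothetical helper, not the package's BatchingChannel.
// It buffers up to size values from in and hands the whole buffer to
// the reader as one non-empty slice whenever the reader is ready.
func batch[T any](in <-chan T, size int) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		var buf []T
		for in != nil || len(buf) > 0 {
			recv := in
			if len(buf) >= size {
				recv = nil // buffer full: stop accepting input
			}
			var send chan []T
			if len(buf) > 0 {
				send = out // something to deliver
			}
			select {
			case v, ok := <-recv:
				if !ok {
					in = nil // input closed: flush what is left
					continue
				}
				buf = append(buf, v)
			case send <- buf:
				buf = nil // start a fresh slice for the next batch
			}
		}
	}()
	return out
}

// flatten feeds 0..n-1 through batch and concatenates the batches.
func flatten(n, size int) (values []int, largest int) {
	in := make(chan int)
	go func() {
		for i := 0; i < n; i++ {
			in <- i
		}
		close(in)
	}()
	for b := range batch[int](in, size) {
		if len(b) > largest {
			largest = len(b)
		}
		values = append(values, b...)
	}
	return values, largest
}

func main() {
	values, _ := flatten(10, 3)
	fmt.Println(len(values)) // prints "10"
}
```

How the values are split into batches depends on scheduling; the sketch only guarantees that no batch is empty or larger than size and that order is preserved.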

type BlackHole

type BlackHole[T any] struct {
	// contains filtered or unexported fields
}

BlackHole implements the InChannel interface and provides an analogue for the "Discard" variable in the ioutil package (io.Discard since Go 1.16) - it never blocks, and simply discards every value it reads. The number of items discarded in this way is counted and returned from Len.
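
A minimal sketch of such a sink on a native channel is shown below. The type blackHole and the helper discard are hypothetical; waiting for the drain goroutine inside Close is a choice made for this sketch so that the count is final, not a documented property of the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// blackHole is a hypothetical type, not the package's BlackHole.
// A background goroutine receives and counts every value sent to it.
type blackHole[T any] struct {
	n    int64 // number of discarded values, accessed atomically
	in   chan T
	done chan struct{}
}

func newBlackHole[T any]() *blackHole[T] {
	b := &blackHole[T]{in: make(chan T), done: make(chan struct{})}
	go func() {
		defer close(b.done)
		for range b.in {
			atomic.AddInt64(&b.n, 1)
		}
	}()
	return b
}

// In returns the writable end of the sink.
func (b *blackHole[T]) In() chan<- T { return b.in }

// Len reports how many values have been discarded so far.
func (b *blackHole[T]) Len() int { return int(atomic.LoadInt64(&b.n)) }

// Close stops the sink and waits for the drain goroutine to finish.
func (b *blackHole[T]) Close() {
	close(b.in)
	<-b.done
}

// discard throws away n values and returns the final count.
func discard(n int) int {
	hole := newBlackHole[string]()
	for i := 0; i < n; i++ {
		hole.In() <- "unwanted"
	}
	hole.Close()
	return hole.Len()
}

func main() {
	fmt.Println(discard(5)) // prints "5"
}
```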

func NewBlackHole

func NewBlackHole[T any]() *BlackHole[T]

func (*BlackHole[T]) Cap

func (ch *BlackHole[T]) Cap() BufferCap

func (*BlackHole[T]) Close

func (ch *BlackHole[T]) Close()

func (*BlackHole[T]) In

func (ch *BlackHole[T]) In() chan<- T

func (*BlackHole[T]) Len

func (ch *BlackHole[T]) Len() int

type Buffer

type Buffer interface {
	Len() int       // The number of elements currently buffered.
	Cap() BufferCap // The maximum number of elements that can be buffered.
}

Buffer is an interface for any channel that provides access to query the state of its buffer. Even unbuffered channels can implement this interface by simply returning 0 from Len() and None from Cap().

type BufferCap

type BufferCap int

BufferCap represents the capacity of the buffer backing a channel. Valid values consist of all positive integers, as well as the special values below.

const (
	// None is the capacity for channels that have no buffer at all.
	None BufferCap = 0
	// Infinity is the capacity for channels with no limit on their buffer size.
	Infinity BufferCap = -1
)
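
To show how the Buffer interface and the special capacities fit together, the sketch below redeclares them locally (copied from the declarations above) and implements them for a native channel. The type nativeChan and the function describe are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Local copies of the declarations documented above.
type BufferCap int

const (
	None     BufferCap = 0
	Infinity BufferCap = -1
)

type Buffer interface {
	Len() int
	Cap() BufferCap
}

// nativeChan is a hypothetical wrapper that answers Buffer queries
// with the built-in len and cap functions.
type nativeChan[T any] chan T

func (ch nativeChan[T]) Len() int       { return len(ch) }
func (ch nativeChan[T]) Cap() BufferCap { return BufferCap(cap(ch)) }

// describe summarises the state of any Buffer, treating the two
// special capacities separately from ordinary positive sizes.
func describe(b Buffer) string {
	switch c := b.Cap(); c {
	case None:
		return "unbuffered"
	case Infinity:
		return "unbounded"
	default:
		return fmt.Sprintf("%d/%d", b.Len(), c)
	}
}

func main() {
	ch := make(nativeChan[int], 3)
	ch <- 1
	ch <- 2
	fmt.Println(describe(ch))                    // prints "2/3"
	fmt.Println(describe(make(nativeChan[int]))) // prints "unbuffered"
}
```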

type Channel

type Channel[T any] interface {
	SimpleChannel[T]
	Buffer
}

Channel is an interface representing a channel that is readable, writeable and implements the Buffer interface.

Example
var ch Channel[any]

ch = NewInfiniteChannel[any]()

for i := 0; i < 10; i++ {
	ch.In() <- nil
}

for i := 0; i < 10; i++ {
	<-ch.Out()
}
Output:

type DeadChannel

type DeadChannel[T any] struct{}

DeadChannel is a placeholder implementation of the Channel interface with no buffer that is never ready for reading or writing. Closing a dead channel is a no-op. Behaves almost like NativeChannel(nil) except that closing a nil NativeChannel will panic.

func NewDeadChannel

func NewDeadChannel[T any]() DeadChannel[T]

func (DeadChannel[T]) Cap

func (ch DeadChannel[T]) Cap() BufferCap

func (DeadChannel[T]) Close

func (ch DeadChannel[T]) Close()

func (DeadChannel[T]) In

func (ch DeadChannel[T]) In() chan<- T

func (DeadChannel[T]) Len

func (ch DeadChannel[T]) Len() int

func (DeadChannel[T]) Out

func (ch DeadChannel[T]) Out() <-chan T

type InChannel

type InChannel[T any] interface {
	SimpleInChannel[T]
	Buffer
}

InChannel is an interface representing a writeable channel with a buffer.

type InfiniteChannel

type InfiniteChannel[T any] struct {
	// contains filtered or unexported fields
}

InfiniteChannel implements the Channel interface with an infinite buffer between the input and the output.
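
The idea of an unbounded buffer between two native channels can be sketched with a slice-backed queue. The names unbounded and drainAfterBurst are hypothetical, and a plain slice stands in for the dedicated queue package mentioned in the README, so this is an illustration rather than the package's implementation. The memory warning above applies to it in full.

```go
package main

import "fmt"

// unbounded is a hypothetical helper, not the package's
// InfiniteChannel. Writers to in are never blocked for long, because
// every value is moved into a growing queue until a reader takes it.
func unbounded[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var queue []T
		for in != nil || len(queue) > 0 {
			var send chan T
			var head T
			if len(queue) > 0 {
				send = out // only offer a value when one is queued
				head = queue[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // input closed: drain the queue, then stop
					continue
				}
				queue = append(queue, v)
			case send <- head:
				queue = queue[1:]
			}
		}
	}()
	return out
}

// drainAfterBurst writes n values before reading any of them back.
func drainAfterBurst(n int) []int {
	in := make(chan int)
	out := unbounded[int](in)
	for i := 0; i < n; i++ {
		in <- i // no reader yet, but the queue absorbs the burst
	}
	close(in)

	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	got := drainAfterBurst(1000)
	fmt.Println(len(got), got[0], got[999]) // prints "1000 0 999"
}
```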

func NewInfiniteChannel

func NewInfiniteChannel[T any]() *InfiniteChannel[T]

func (*InfiniteChannel[T]) Cap

func (ch *InfiniteChannel[T]) Cap() BufferCap

func (*InfiniteChannel[T]) Close

func (ch *InfiniteChannel[T]) Close()

func (*InfiniteChannel[T]) In

func (ch *InfiniteChannel[T]) In() chan<- T

func (*InfiniteChannel[T]) Len

func (ch *InfiniteChannel[T]) Len() int

func (*InfiniteChannel[T]) Out

func (ch *InfiniteChannel[T]) Out() <-chan T

type NativeChannel

type NativeChannel[T any] chan T

NativeChannel implements the Channel interface by wrapping a native Go channel.

func NewNativeChannel

func NewNativeChannel[T any](size BufferCap) NativeChannel[T]

NewNativeChannel makes a new NativeChannel with the given buffer size. Just a convenience wrapper to avoid having to cast the result of make().

func (NativeChannel[T]) Cap

func (ch NativeChannel[T]) Cap() BufferCap

func (NativeChannel[T]) Close

func (ch NativeChannel[T]) Close()

func (NativeChannel[T]) In

func (ch NativeChannel[T]) In() chan<- T

func (NativeChannel[T]) Len

func (ch NativeChannel[T]) Len() int

func (NativeChannel[T]) Out

func (ch NativeChannel[T]) Out() <-chan T

type NativeInChannel

type NativeInChannel[T any] chan<- T

NativeInChannel implements the InChannel interface by wrapping a native Go write-only channel.

func (NativeInChannel[T]) Cap

func (ch NativeInChannel[T]) Cap() BufferCap

func (NativeInChannel[T]) Close

func (ch NativeInChannel[T]) Close()

func (NativeInChannel[T]) In

func (ch NativeInChannel[T]) In() chan<- T

func (NativeInChannel[T]) Len

func (ch NativeInChannel[T]) Len() int

type NativeOutChannel

type NativeOutChannel[T any] <-chan T

NativeOutChannel implements the OutChannel interface by wrapping a native Go read-only channel.

func (NativeOutChannel[T]) Cap

func (ch NativeOutChannel[T]) Cap() BufferCap

func (NativeOutChannel[T]) Len

func (ch NativeOutChannel[T]) Len() int

func (NativeOutChannel[T]) Out

func (ch NativeOutChannel[T]) Out() <-chan T

type OutChannel

type OutChannel[T any] interface {
	SimpleOutChannel[T]
	Buffer
}

OutChannel is an interface representing a readable channel implementing the Buffer interface.

type OverflowingChannel

type OverflowingChannel[T any] struct {
	// contains filtered or unexported fields
}

OverflowingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to an OverflowingChannel when its buffer is full (or, in an unbuffered case, when the recipient is not ready) then that value is simply discarded. Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the oldest element, not the newest) see RingChannel.
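
On a native channel the "discard the newest value" policy corresponds to a non-blocking send. The sketch below uses hypothetical names (trySend, accepted) and only illustrates the policy; it is not the package's implementation.

```go
package main

import "fmt"

// trySend is a hypothetical helper. It attempts a send without
// blocking and reports whether the value was accepted; when the
// buffer is full the new value is simply dropped.
func trySend[T any](ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// accepted writes 0..n-1 into a channel of the given capacity with no
// reader present and reports what survived and how many were dropped.
func accepted(capacity, n int) (kept []int, dropped int) {
	ch := make(chan int, capacity)
	for i := 0; i < n; i++ {
		if !trySend[int](ch, i) {
			dropped++
		}
	}
	close(ch)
	for v := range ch {
		kept = append(kept, v)
	}
	return kept, dropped
}

func main() {
	fmt.Println(accepted(2, 5)) // prints "[0 1] 3"
}
```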

func NewOverflowingChannel

func NewOverflowingChannel[T any](size BufferCap) *OverflowingChannel[T]

func (*OverflowingChannel[T]) Cap

func (ch *OverflowingChannel[T]) Cap() BufferCap

func (*OverflowingChannel[T]) Close

func (ch *OverflowingChannel[T]) Close()

func (*OverflowingChannel[T]) In

func (ch *OverflowingChannel[T]) In() chan<- T

func (*OverflowingChannel[T]) Len

func (ch *OverflowingChannel[T]) Len() int

func (*OverflowingChannel[T]) Out

func (ch *OverflowingChannel[T]) Out() <-chan T

type ResizableChannel

type ResizableChannel[T any] struct {
	// contains filtered or unexported fields
}

ResizableChannel implements the Channel interface with a resizable buffer between the input and the output. The channel initially has a buffer size of 1, but can be resized by calling Resize().

Resizing to a buffer capacity of None is, unfortunately, not supported and will panic (see https://github.com/eapache/channels/issues/1). Resizing back and forth between a finite and infinite buffer is fully supported.

func NewResizableChannel

func NewResizableChannel[T any]() *ResizableChannel[T]

func (*ResizableChannel[T]) Cap

func (ch *ResizableChannel[T]) Cap() BufferCap

func (*ResizableChannel[T]) Close

func (ch *ResizableChannel[T]) Close()

func (*ResizableChannel[T]) In

func (ch *ResizableChannel[T]) In() chan<- T

func (*ResizableChannel[T]) Len

func (ch *ResizableChannel[T]) Len() int

func (*ResizableChannel[T]) Out

func (ch *ResizableChannel[T]) Out() <-chan T

func (*ResizableChannel[T]) Resize

func (ch *ResizableChannel[T]) Resize(newSize BufferCap)

type RingChannel

type RingChannel[T any] struct {
	// contains filtered or unexported fields
}

RingChannel implements the Channel interface in a way that never blocks the writer. Specifically, if a value is written to a RingChannel when its buffer is full then the oldest value in the buffer is discarded to make room (just like a standard ring-buffer). Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling the writer before the reader, so caveat emptor. For the opposite behaviour (discarding the newest element, not the oldest) see OverflowingChannel.
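
The "discard the oldest value" policy can be sketched on a buffered native channel as follows. The names ringSend and latest are hypothetical. The sketch assumes a single writer and a capacity of at least 1 (with an unbuffered channel and no reader it would spin), so it is an illustration of the policy rather than the package's implementation.

```go
package main

import "fmt"

// ringSend is a hypothetical helper. It never blocks the writer: when
// the buffer is full it removes the oldest buffered value and retries.
// Assumes a single writer and cap(ch) >= 1.
func ringSend[T any](ch chan T, v T) {
	for {
		select {
		case ch <- v:
			return
		default:
			// Buffer full: drop the oldest value, unless a reader
			// has emptied the buffer in the meantime.
			select {
			case <-ch:
			default:
			}
		}
	}
}

// latest writes 1..n into a ring of the given capacity with no reader
// present and returns what is left in the buffer afterwards.
func latest(capacity, n int) []int {
	ch := make(chan int, capacity)
	for i := 1; i <= n; i++ {
		ringSend[int](ch, i)
	}
	close(ch)

	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(latest(3, 5)) // prints "[3 4 5]"
}
```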

func NewRingChannel

func NewRingChannel[T any](size BufferCap) *RingChannel[T]

func (*RingChannel[T]) Cap

func (ch *RingChannel[T]) Cap() BufferCap

func (*RingChannel[T]) Close

func (ch *RingChannel[T]) Close()

func (*RingChannel[T]) In

func (ch *RingChannel[T]) In() chan<- T

func (*RingChannel[T]) Len

func (ch *RingChannel[T]) Len() int

func (*RingChannel[T]) Out

func (ch *RingChannel[T]) Out() <-chan T

type SharedBuffer

type SharedBuffer[T any] struct {
	// contains filtered or unexported fields
}

SharedBuffer implements the Buffer interface, and permits multiple SimpleChannel instances to "share" a single buffer. Each channel spawned by NewChannel has its own internal queue (so values flowing through do not get mixed up with other channels) but the total number of elements buffered by all spawned channels is limited to a single capacity. This means *all* such channels block and unblock for writing together. The primary use case is for implementing pipeline-style parallelism with goroutines, limiting the total number of elements in the pipeline without limiting the number of elements at any particular step.
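
The core idea, a single limit on the total number of elements in a pipeline, can be approximated on native channels with a counting semaphore. The sketch below is only an analogy under that assumption: runPipeline and its variables are hypothetical names, and the package's SharedBuffer is not claimed to work this way internally.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// runPipeline pushes n elements through a two-step pipeline while a
// token channel caps the number of elements in flight at limit. The
// per-step buffers are deliberately large, so only the shared limit
// can block the producer.
func runPipeline(n, limit int) (processed int, peak int64) {
	tokens := make(chan struct{}, limit) // one token per element in flight
	step1 := make(chan int, n)
	step2 := make(chan int, n)
	var inFlight int64

	go func() { // producer: takes a token before inserting an element
		for i := 0; i < n; i++ {
			tokens <- struct{}{}
			if cur := atomic.AddInt64(&inFlight, 1); cur > peak {
				peak = cur
			}
			step1 <- i
		}
		close(step1)
	}()

	go func() { // middle step: stands in for real work
		for v := range step1 {
			step2 <- v * 2
		}
		close(step2)
	}()

	for range step2 { // consumer: returns the token on the way out
		atomic.AddInt64(&inFlight, -1)
		<-tokens
		processed++
	}
	return processed, peak
}

func main() {
	processed, peak := runPipeline(20, 3)
	fmt.Println(processed, peak <= 3) // prints "20 true"
}
```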

Example
// never more than 3 elements in the pipeline at once
buf := NewSharedBuffer[int](3)

ch1 := buf.NewChannel()
ch2 := buf.NewChannel()

// or, instead of a straight pipe, implement your pipeline step
Pipe[int](ch1, ch2)

// inputs
go func() {
	for i := 0; i < 20; i++ {
		ch1.In() <- i
	}
	ch1.Close()
}()

for range ch2.Out() {
	// outputs
}

buf.Close()
Output:

func NewSharedBuffer

func NewSharedBuffer[T any](size BufferCap) *SharedBuffer[T]

func (*SharedBuffer[T]) Cap

func (buf *SharedBuffer[T]) Cap() BufferCap

func (*SharedBuffer[T]) Close

func (buf *SharedBuffer[T]) Close()

Close shuts down the SharedBuffer. It is an error to call Close while channels are still using the buffer; the behaviour in that case is not defined.

func (*SharedBuffer[T]) Len

func (buf *SharedBuffer[T]) Len() int

func (*SharedBuffer[T]) NewChannel

func (buf *SharedBuffer[T]) NewChannel() SimpleChannel[T]

NewChannel spawns and returns a new channel sharing the underlying buffer.

type SimpleChannel

type SimpleChannel[T any] interface {
	SimpleInChannel[T]
	SimpleOutChannel[T]
}

SimpleChannel is an interface representing a channel that is both readable and writeable, but does not necessarily implement the Buffer interface.

type SimpleInChannel

type SimpleInChannel[T any] interface {
	In() chan<- T // The writeable end of the channel.
	Close()       // Closes the channel. It is an error to write to In() after calling Close().
}

SimpleInChannel is an interface representing a writeable channel that does not necessarily implement the Buffer interface.

type SimpleOutChannel

type SimpleOutChannel[T any] interface {
	Out() <-chan T // The readable end of the channel.
}

SimpleOutChannel is an interface representing a readable channel that does not necessarily implement the Buffer interface.

func Wrap

func Wrap[T any](ch <-chan T) SimpleOutChannel[T]

Wrap takes any readable channel type (chan or <-chan but not chan<-) and exposes it as a SimpleOutChannel for easy integration with existing channel sources. It panics if the input is not a readable channel.
