Documentation ¶
Overview ¶
Package stream implements lazy functional streams using Go channels.
Streams consist of a single generator, zero or more operators, and a single terminator. Every stream shares a stop channel, which the owner may use to cleanly shut down the entire stream.
Streams exploit Go's lightweight channels and goroutines in order to implement concurrency without locking shared data structures.
This is intentionally reminiscent of the Java 8 Streams API.
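The generator, operator, and terminator roles described above can be illustrated with plain channels. The following is a minimal sketch, not this package's implementation: the names generate, square, and take are hypothetical, and the stop channel stands in for the shared stop channel mentioned in the overview.

```go
package main

import "fmt"

// generate is a hypothetical generator: it emits 0, 1, 2, ... until stop is closed.
func generate(stop <-chan struct{}) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-stop:
				return
			}
		}
	}()
	return out
}

// square is a hypothetical operator: it forwards the square of every element.
func square(stop <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- v * v:
			case <-stop:
				return
			}
		}
	}()
	return out
}

// take is a hypothetical terminator: it collects up to n elements and then
// closes stop, which shuts down every upstream stage.
func take(stop chan struct{}, in <-chan int, n int) []int {
	defer close(stop)
	res := make([]int, 0, n)
	for len(res) < n {
		v, ok := <-in
		if !ok {
			break
		}
		res = append(res, v)
	}
	return res
}

func main() {
	stop := make(chan struct{})
	fmt.Println(take(stop, square(stop, generate(stop)), 5)) // [0 1 4 9 16]
}
```

No stage shares mutable state with another. Values move only through channels, and closing the one stop channel is enough to unwind the whole pipeline.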
Index ¶
- func ForEach(in *Stream, f DoFunc) chan interface{}
- func NewJoiner() (*Stream, *Joiner)
- func Reduce(in *Stream, initVal interface{}, f ReduceFunc) chan interface{}
- func Split(in *Stream, f FilterFunc) (*Stream, *Stream)
- func Tee(in *Stream) (*Stream, *Stream)
- func Wait(in *Stream) chan interface{}
- type DoFunc
- type FilterFunc
- type Joiner
- type MapFunc
- type ReduceFunc
- type Repeater
- type Stream
- func Buffer(in *Stream, size int) *Stream
- func Chargen() *Stream
- func Copy(in *Stream, n int) []*Stream
- func Do(in *Stream, f DoFunc) *Stream
- func Filter(in *Stream, f FilterFunc) *Stream
- func Iota(args ...uint64) *Stream
- func Join(in ...*Stream) *Stream
- func Limit(in *Stream, mod api.LimitModifier) *Stream
- func Map(in *Stream, f MapFunc) *Stream
- func Null() *Stream
- func OnOffValve(in *Stream) (*Stream, chan<- bool)
- func Overflow(in *Stream) *Stream
- func Throttle(in *Stream, mod api.ThrottleModifier) *Stream
- func Ticker(d time.Duration) *Stream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ForEach ¶
func ForEach(in *Stream, f DoFunc) chan interface{}
ForEach adds a terminator onto the stream that consumes each element and runs the given function on it.
func NewJoiner ¶
func NewJoiner() (*Stream, *Joiner)
NewJoiner creates a controllable joiner for the given stream. New streams can be created from the Joiner and later removed.
func Reduce ¶
func Reduce(in *Stream, initVal interface{}, f ReduceFunc) chan interface{}
Reduce adds a terminator onto the stream that accumulates a value using the given function and then returns it once the input stream terminates.
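A reducing terminator of this shape can be sketched with plain channels. This is an illustration under assumptions, not the package's code: reduce, reduceFunc, and fromInts are hypothetical names, and the argument order (accumulator first, element second) is assumed because the documentation does not state it.

```go
package main

import "fmt"

// reduceFunc mirrors the documented ReduceFunc signature. Treating the first
// argument as the accumulator is an assumption.
type reduceFunc func(interface{}, interface{}) interface{}

// fromInts is a hypothetical helper that emits vals and then closes its channel.
func fromInts(vals ...int) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// reduce folds every element of in into an accumulator and delivers the final
// value on the returned channel once in is closed.
func reduce(in <-chan interface{}, initVal interface{}, f reduceFunc) <-chan interface{} {
	out := make(chan interface{}, 1) // buffered so the result never blocks the goroutine
	go func() {
		defer close(out)
		acc := initVal
		for v := range in {
			acc = f(acc, v)
		}
		out <- acc
	}()
	return out
}

func main() {
	add := func(acc, v interface{}) interface{} { return acc.(int) + v.(int) }
	fmt.Println(<-reduce(fromInts(1, 2, 3, 4), 0, add)) // 10
}
```

Receiving from the returned channel blocks until the input terminates, so the caller does not need a separate wait step.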
func Split ¶
func Split(in *Stream, f FilterFunc) (*Stream, *Stream)
Split splits the input stream into two output streams according to the given filter function.
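The sketch below shows one way a splitting operator could route elements, using plain channels. It assumes that the first output receives the elements for which the filter returns true and the second receives the rest; the documentation does not say which is which. The names split, collect, and fromInts are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// filterFunc mirrors the documented FilterFunc signature.
type filterFunc func(interface{}) bool

// fromInts is a hypothetical helper that emits vals and then closes its channel.
func fromInts(vals ...int) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// split routes each element to the first output when f returns true and to the
// second output otherwise (an assumed convention).
func split(in <-chan interface{}, f filterFunc) (<-chan interface{}, <-chan interface{}) {
	yes := make(chan interface{})
	no := make(chan interface{})
	go func() {
		defer close(yes)
		defer close(no)
		for v := range in {
			if f(v) {
				yes <- v
			} else {
				no <- v
			}
		}
	}()
	return yes, no
}

// collect drains both outputs concurrently. Draining them one after the other
// would deadlock, because the outputs are unbuffered.
func collect(a, b <-chan interface{}) (as, bs []interface{}) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range a {
			as = append(as, v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range b {
			bs = append(bs, v)
		}
	}()
	wg.Wait()
	return as, bs
}

func main() {
	isEven := func(v interface{}) bool { return v.(int)%2 == 0 }
	even, odd := collect(split(fromInts(1, 2, 3, 4, 5, 6), isEven))
	fmt.Println(even, odd) // [2 4 6] [1 3 5]
}
```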
Types ¶
type DoFunc ¶
type DoFunc func(interface{})
DoFunc is the signature of a function that is called by Do.
type FilterFunc ¶
type FilterFunc func(interface{}) bool
FilterFunc is the signature of a function that is called by Filter.
type Joiner ¶
type Joiner struct {
// contains filtered or unexported fields
}
Joiner is a construct that combines multiple input streams into a single output stream.
type MapFunc ¶
type MapFunc func(interface{}) interface{}
MapFunc is the signature of a function that is called by Map.
type ReduceFunc ¶
type ReduceFunc func(interface{}, interface{}) interface{}
ReduceFunc is the signature of a function called by Reduce.
type Repeater ¶
type Repeater struct {
// contains filtered or unexported fields
}
Repeater is a stream that repeats elements from one input to multiple output streams.
func NewRepeater ¶
NewRepeater creates a controllable repeater for the given stream. It acts as a Stream terminator for the parent stream, but allows new Streams to be dynamically created and removed from the parent stream. It consumes all events from the parent stream regardless of whether there are child streams or not. If this is not the desired behavior, attach a Valve between the parent and the repeater.
type Stream ¶
type Stream struct {
	// Ctrl is the control channel to the stream. Consumers may close
	// it to shut down the stream.
	Ctrl chan<- interface{}
	// Data is the data channel of the stream. Consumers may receive
	// from it to consume stream elements.
	Data <-chan interface{}
}
Stream represents a stream consisting of a generator, zero or more operators, and zero or one terminators. Consumers receive stream elements from the Data channel and may terminate the stream by closing the Ctrl channel.
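The consumer side of this contract might look like the sketch below. The Stream type here is a local copy of the documented struct so that the example is self-contained; counter and firstN are hypothetical helpers and are not part of the package.

```go
package main

import "fmt"

// Stream is a local copy of the documented struct, declared here only so the
// sketch compiles on its own.
type Stream struct {
	Ctrl chan<- interface{}
	Data <-chan interface{}
}

// counter is a hypothetical generator that emits 0, 1, 2, ... until its
// control channel is closed.
func counter() *Stream {
	ctrl := make(chan interface{})
	data := make(chan interface{})
	go func() {
		defer close(data)
		for i := 0; ; i++ {
			select {
			case data <- i:
			case <-ctrl:
				return
			}
		}
	}()
	return &Stream{Ctrl: ctrl, Data: data}
}

// firstN receives up to n elements from Data and then closes Ctrl to shut the
// stream down. Closing a send-only channel is permitted in Go.
func firstN(s *Stream, n int) []interface{} {
	out := make([]interface{}, 0, n)
	for len(out) < n {
		v, ok := <-s.Data
		if !ok {
			break
		}
		out = append(out, v)
	}
	close(s.Ctrl)
	return out
}

func main() {
	s := counter()
	fmt.Println(firstN(s, 3)) // [0 1 2]
	// Data is closed once the generator observes the closed Ctrl channel,
	// so this loop terminates.
	for range s.Data {
	}
}
```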
func Buffer ¶
func Buffer(in *Stream, size int) *Stream
Buffer stores up to the given number of elements from the input Stream before blocking.
func Chargen ¶
func Chargen() *Stream
Chargen creates a generator that produces a stream of single-character strings from a pattern reminiscent of RFC864 chargen TCP/UDP services.
func Filter ¶
func Filter(in *Stream, f FilterFunc) *Stream
Filter adds a filter to a stream. For each element in the stream, a false return from the filter function causes the element to be discarded.
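The discard-on-false behavior can be sketched with plain channels as follows. This is an illustration only: filter, filterFunc, and fromValues are hypothetical names and do not reproduce the package's internals.

```go
package main

import "fmt"

// filterFunc mirrors the documented FilterFunc signature.
type filterFunc func(interface{}) bool

// fromValues is a hypothetical helper that emits vals and then closes its channel.
func fromValues(vals ...interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// filter forwards only the elements for which f returns true and drops the rest.
func filter(in <-chan interface{}, f filterFunc) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			if f(v) {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	long := func(v interface{}) bool { return len(v.(string)) > 3 }
	for v := range filter(fromValues("go", "channel", "map", "stream"), long) {
		fmt.Println(v) // prints "channel", then "stream"
	}
}
```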
func Iota ¶
func Iota(args ...uint64) *Stream
Iota creates a generator that produces the given count of int elements (or infinite, if not specified). Additional optional arguments specify the start and step value.
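A counting generator with optional arguments could be sketched as shown below. The argument order (count, start, step), the defaults of 0 and 1, and the uint64 element type are assumptions made for illustration; iotaGen is a hypothetical name, and the stop channel is added only so that the infinite case can be shut down.

```go
package main

import "fmt"

// iotaGen emits count values beginning at start and increasing by step.
// With no arguments it counts forever (until stop is closed).
// Assumed argument order: count, start, step.
func iotaGen(stop <-chan struct{}, args ...uint64) <-chan uint64 {
	var count, start, step uint64 = 0, 0, 1
	infinite := len(args) == 0
	if len(args) > 0 {
		count = args[0]
	}
	if len(args) > 1 {
		start = args[1]
	}
	if len(args) > 2 {
		step = args[2]
	}
	out := make(chan uint64)
	go func() {
		defer close(out)
		v := start
		for i := uint64(0); infinite || i < count; i++ {
			select {
			case out <- v:
			case <-stop:
				return
			}
			v += step
		}
	}()
	return out
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	for v := range iotaGen(stop, 4, 10, 5) {
		fmt.Println(v) // prints 10, 15, 20, 25 on separate lines
	}
}
```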
func Join ¶
func Join(in ...*Stream) *Stream
Join combines multiple input Streams into a single output Stream. Closing the Join closes the input streams as well.
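Combining several inputs into one output is the classic fan-in pattern. The sketch below shows only that half of the behavior with plain channels; it does not model how closing the joined stream propagates back to the inputs. The names join and emit are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// emit is a hypothetical helper that sends vals and then closes its channel.
func emit(vals ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range vals {
			out <- v
		}
	}()
	return out
}

// join forwards every element from every input onto one output channel and
// closes the output after all inputs have been closed. The relative order of
// elements from different inputs is not deterministic.
func join(ins ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	sum := 0
	for v := range join(emit(1, 2, 3), emit(10, 20)) {
		sum += v
	}
	fmt.Println(sum) // 36
}
```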
func Limit ¶
func Limit(in *Stream, mod api.LimitModifier) *Stream
Limit limits the number of results returned.
func Map ¶
func Map(in *Stream, f MapFunc) *Stream
Map adds an operator in the stream that calls the given function for every element and forwards along the returned value.
func OnOffValve ¶
func OnOffValve(in *Stream) (*Stream, chan<- bool)
OnOffValve adds a simple on/off valve operator onto the stream. It defaults to off, which prevents a race in which the valve accepts input before the caller has had a chance to switch it off.
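The default-off behavior can be sketched with a nil-channel select, as below. The sketch assumes that an off valve stops reading from its input (applying back-pressure) rather than discarding elements; the documentation does not say which. The name valve is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// valve forwards elements from in only while switched on. It starts off, and
// the returned control channel switches it: true for on, false for off.
func valve(in <-chan int) (<-chan int, chan<- bool) {
	out := make(chan int)
	ctl := make(chan bool)
	go func() {
		defer close(out)
		on := false // off by default, so nothing is read before the caller decides
		for {
			src := in
			if !on {
				src = nil // receiving from a nil channel blocks, which disables that case
			}
			select {
			case state, ok := <-ctl:
				if !ok {
					return
				}
				on = state
			case v, ok := <-src:
				if !ok {
					return
				}
				out <- v
			}
		}
	}()
	return out, ctl
}

func main() {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	out, ctl := valve(in)
	select {
	case v := <-out:
		fmt.Println("unexpected element:", v)
	case <-time.After(50 * time.Millisecond):
		fmt.Println("valve is off, nothing delivered")
	}

	ctl <- true // switch the valve on
	for v := range out {
		fmt.Println(v) // prints 1, 2, 3 on separate lines
	}
}
```

In this simplified version the valve cannot react to the control channel while it is blocked delivering an element, so the caller should keep receiving from the output while toggling it.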
func Throttle ¶
func Throttle(in *Stream, mod api.ThrottleModifier) *Stream
Throttle limits the number of events emitted by the stream.