Documentation ¶
Index ¶
- Constants
- func EntriesEqual(e1, e2 Entry) bool
- func False(t Entry) bool
- func True(t Entry) bool
- type BiConsumer
- type BiFunction
- type CStream
- func (cs CStream) AddStreamFromChannels(channels []chan Entry) CStream
- func (cs CStream) AddStreamFromSlices(slices []EntrySlice, bufsize int) CStream
- func (cs CStream) AddStreams(streams []Stream) CStream
- func (cs CStream) Filter(predicate Predicate) CStream
- func (cs CStream) ForEach(consumer Consumer)
- type Collector
- func Filtering(predicate Predicate, collector Collector) Collector
- func FlatMapping(mapper StreamFunction, collector Collector) Collector
- func GroupingBy(classifier Function, downstream Collector) Collector
- func Mapping(mapper Function, collector Collector) Collector
- func NewCollector(supplier Supplier, accumulator BiFunction, finisher Function) Collector
- func Reducing(f2 BiFunction) Collector
- func ToEntrySlice() Collector
- type Consumer
- type Entry
- type EntryBool
- type EntryByte
- type EntryFloat
- type EntryInt
- type EntryMap
- type EntrySlice
- type EntryString
- type FloatStream
- type Function
- type IntStream
- type Maybe
- type Predicate
- type Stream
- func (s Stream) AllMatch(p Predicate) bool
- func (s Stream) AnyMatch(p Predicate) bool
- func (s Stream) Collect(c Collector) interface{}
- func (s Stream) Concurrent(n int) Stream
- func (s Stream) Count() int
- func (s Stream) Distinct() Stream
- func (s Stream) Drop(n uint64) Stream
- func (s Stream) DropUntil(p Predicate) Stream
- func (s Stream) DropWhile(p Predicate) Stream
- func (s Stream) EndsWith(slice EntrySlice) bool
- func (s Stream) Filter(predicate Predicate) Stream
- func (s Stream) FlatMap(mapper StreamFunction) Stream
- func (s Stream) ForEach(consumer Consumer)
- func (s Stream) ForEachC(consumer Consumer)
- func (s Stream) GroupBy(classifier Function) EntryMap
- func (s Stream) Head() Entry
- func (s Stream) HeadN(n uint64) EntrySlice
- func (s Stream) Intersperse(e Entry) Stream
- func (s Stream) Last() Entry
- func (s Stream) LastN(n uint64) EntrySlice
- func (s Stream) LeftReduce(f2 BiFunction) Entry
- func (s Stream) Limit(n uint64) Stream
- func (s Stream) Map(mapper Function) Stream
- func (s Stream) MapToFloat(toFloat ToFloatFunction) FloatStream
- func (s Stream) MapToInt(toInt ToIntFunction) IntStream
- func (s Stream) NoneMatch(p Predicate) bool
- func (s Stream) Peek(consumer Consumer) Stream
- func (s Stream) Reduce(f2 BiFunction) Entry
- func (s Stream) StartsWith(slice EntrySlice) bool
- func (s Stream) Take(n uint64) Stream
- func (s Stream) TakeUntil(p Predicate) Stream
- func (s Stream) TakeWhile(p Predicate) Stream
- func (s Stream) ToSlice() EntrySlice
- type StreamFunction
- type Supplier
- type ToFloatFunction
- type ToIntFunction
- type Tuple0
- type Tuple1
- type Tuple2
- type Tuple3
Examples ¶
Constants ¶
const PanicInvalidConcurrencyLevel = "stream concurrency must be at least 0"
PanicInvalidConcurrencyLevel signifies that the concurrency level supplied to the Stream is invalid (it must be at least 0).
const PanicMissingChannel = "stream creation requires a channel"
PanicMissingChannel signifies that the Stream is missing a channel.
const PanicNoSuchElement = "no such element"
PanicNoSuchElement signifies that the requested element is not present.
Variables ¶
This section is empty.
Functions ¶
func EntriesEqual ¶
EntriesEqual checks the equality of two Entry objects. Note: passing pointers, as in EntriesEqual(&entry1, &entry2), will not produce the desired outcome.
Types ¶
type BiConsumer ¶
type BiConsumer func(i, j Entry)
BiConsumer that accepts two arguments and does not return any value.
type BiFunction ¶
BiFunction that accepts two arguments and produces a result.
Example ¶
ExampleBiFunction shows how to use BiFunctions. There are more interesting examples throughout the code. Search for `BiFunction` or the BiFunction signature.
data := []Entry{
    EntryString("four"),
    EntryString("twelve"),
    EntryString("one"),
    EntryString("six"),
    EntryString("three")}
res := NewStreamFromSlice(data, 0).
    Reduce(concatenateStringsBiFunc)
fmt.Printf("res = %+v\n", res)
Output:
res = four-twelve-one-six-three
type CStream ¶
type CStream struct {
// contains filtered or unexported fields
}
CStream is a concurrent stream.
func NewCStream ¶
NewCStream creates a new concurrent stream.
func (CStream) AddStreamFromChannels ¶
AddStreamFromChannels adds Streams derived from the supplied channels to this CStream.
func (CStream) AddStreamFromSlices ¶
func (cs CStream) AddStreamFromSlices(slices []EntrySlice, bufsize int) CStream
AddStreamFromSlices adds Streams derived from the supplied slices to this CStream.
func (CStream) AddStreams ¶
AddStreams adds Streams to this CStream.
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
A Collector is a mutable reduction operation, optionally transforming the accumulated result.
Collectors can be combined to express complex operations in a concise manner.
In other words, a collector allows creating custom actions on a Stream. **fuego** ships with a number of methods such as `MapToInt`, `Head`, `LastN`, `Filter`, etc., and Collectors also provide a few additional methods. But what if you need something else? And what if it is not straightforward or readable to combine the existing methods fuego offers? Enter `Collector`: implement your own requirement functionally! Focus on *what* needs to be done in your streams (and delegate the details of the *how* to the implementation of your `Collector`).
It should be noted that the `finisher` function is optional (i.e. it may acceptably be `nil`).
Example
strs := EntrySlice{
    EntryString("a"),
    EntryString("bb"),
    EntryString("cc"),
    EntryString("ddd"),
}
NewStreamFromSlice(strs, 1e3).
    Collect(
        GroupingBy(
            stringLength,
            Mapping(
                stringToUpper,
                ToEntryMap())))
// Result: map[1:[A] 2:[BB CC] 3:[DDD]]
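The three roles described above (supplier, accumulator, optional finisher) can be sketched in plain Go, without fuego. The `collector` struct and `collect` function below are hypothetical stand-ins invented for this illustration; they are not fuego's types and make no claim about how fuego implements `Collector`. The sketch only shows how a `nil` finisher can be treated as "return the accumulated value unchanged".

```go
package main

import "fmt"

// collector is a hypothetical, simplified stand-in for a Collector.
// It is NOT fuego's type, only an illustration of the three roles.
type collector struct {
	supplier    func() interface{}                   // creates the initial container
	accumulator func(acc, e interface{}) interface{} // folds one element into it
	finisher    func(acc interface{}) interface{}    // optional, may be nil
}

// collect runs the reduction over items and applies the finisher when present.
func collect(items []interface{}, c collector) interface{} {
	acc := c.supplier()
	for _, e := range items {
		acc = c.accumulator(acc, e)
	}
	if c.finisher == nil {
		return acc
	}
	return c.finisher(acc)
}

func main() {
	// totalLength sums the lengths of the strings it receives.
	totalLength := collector{
		supplier: func() interface{} { return 0 },
		accumulator: func(acc, e interface{}) interface{} {
			return acc.(int) + len(e.(string))
		},
		// finisher deliberately left nil
	}
	fmt.Println(collect([]interface{}{"a", "bb", "cc", "ddd"}, totalLength))
}
```

Keeping the finisher separate from the accumulator lets the same accumulation logic be reused with different final transformations.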
func Filtering ¶
Filtering adapts the Entries a Collector accepts to a subset that satisfies the given predicate.
func FlatMapping ¶
func FlatMapping(mapper StreamFunction, collector Collector) Collector
FlatMapping adapts the Entries a Collector accepts to another type by applying a flat mapping function which maps input elements to a `Stream`.
func GroupingBy ¶
GroupingBy groups the elements of the downstream Collector by classifying them with the provided classifier function.
func NewCollector ¶
func NewCollector(supplier Supplier, accumulator BiFunction, finisher Function) Collector
NewCollector creates a new Collector.
func Reducing ¶
func Reducing(f2 BiFunction) Collector
Reducing returns a collector that performs a reduction of its input elements using the provided BiFunction.
func ToEntrySlice ¶
func ToEntrySlice() Collector
ToEntrySlice returns a collector that accumulates the input entries into an EntrySlice.
type Consumer ¶
type Consumer func(i Entry)
Consumer that accepts one argument and does not return any value.
type Entry ¶
type Entry interface {
    Hash() uint32 // TODO: remove Hash() since the project no longer includes collections? Hashes suffer from collision.
    Equal(Entry) bool
}
Entry is the simplest behaviour that functional types must adhere to.
func IdentityFinisher ¶
IdentityFinisher is a basic finisher that returns the original value passed to it, unmodified.
type EntryBool ¶
type EntryBool bool
EntryBool is an Entry for 'bool'.
type EntryByte ¶
type EntryByte byte
EntryByte is an Entry for 'byte'.
type EntryFloat ¶
type EntryFloat float32
EntryFloat is an Entry for 'float32'.
func (EntryFloat) Equal ¶
func (f EntryFloat) Equal(e Entry) bool
Equal returns true if 'e' and 'f' are equal.
type EntryInt ¶
type EntryInt int
EntryInt is an Entry for 'int'.
type EntryMap ¶
EntryMap is an Entry for 'map[Entry]Entry'.
type EntrySlice ¶
type EntrySlice []Entry
EntrySlice is an Entry for '[]Entry'.
func (EntrySlice) Append ¶
func (es EntrySlice) Append(e Entry) EntrySlice
Append appends an Entry to this EntrySlice and returns the resulting EntrySlice.
func (EntrySlice) Equal ¶
func (es EntrySlice) Equal(e Entry) bool
Equal returns true if this type is equal to 'e'.
func (EntrySlice) Len ¶
func (es EntrySlice) Len() int
Len returns the number of Entries in this EntrySlice.
type EntryString ¶
type EntryString string
EntryString is an Entry for 'string'.
func (EntryString) Equal ¶
func (es EntryString) Equal(e Entry) bool
Equal returns true if 'e' and 'es' are equal.
func (EntryString) Len ¶
func (es EntryString) Len() EntryInt
Len returns the length of this EntryString.
func (EntryString) MapToEntryBytes ¶
func (es EntryString) MapToEntryBytes(bufsize int) Stream
MapToEntryBytes transforms the bytes of this EntryString to a Stream of EntryBytes.
func (EntryString) ToLower ¶
func (es EntryString) ToLower() EntryString
ToLower transforms this EntryString to lower case.
func (EntryString) ToUpper ¶
func (es EntryString) ToUpper() EntryString
ToUpper transforms this EntryString to upper case.
type FloatStream ¶
type FloatStream struct {
Stream
}
FloatStream is a sequence of EntryFloat elements supporting sequential and (in the future?) parallel operations.
The current implementation is based on `Stream` and an intermediary channel that converts incoming `EntryFloat` elements to `Entry`. This approach offers programming conciseness but the use of an intermediary channel likely decreases performance. This also means that type checking is weak on methods "borrowed" from `Stream` that expect `Entry` (instead of `EntryFloat`).
func NewConcurrentFloatStream ¶
func NewConcurrentFloatStream(c chan EntryFloat, n int) FloatStream
NewConcurrentFloatStream creates a new FloatStream with a degree of concurrency of n. This function leaves the provided channel in the same state of openness.
func NewFloatStream ¶
func NewFloatStream(c chan EntryFloat) FloatStream
NewFloatStream creates a new FloatStream. This function leaves the provided channel in the same state of openness.
func NewFloatStreamFromSlice ¶
func NewFloatStreamFromSlice(is []EntryFloat, bufsize int) FloatStream
NewFloatStreamFromSlice creates a new FloatStream from a Go slice of EntryFloat. The stream will be closed once all the slice data has been published.
func (FloatStream) Average ¶
func (is FloatStream) Average() EntryFloat
Average returns the average of the numbers in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction. This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
func (FloatStream) Max ¶
func (is FloatStream) Max() EntryFloat
Max returns the largest number in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction and is equivalent to:
is.Reduce(max) // where max is a BiFunction that returns
               // the largest of two floats.
This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
func (FloatStream) Min ¶
func (is FloatStream) Min() EntryFloat
Min returns the smallest number in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction and is equivalent to:
is.Reduce(min) // where min is a BiFunction that returns
               // the smallest of two floats.
This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
func (FloatStream) Sum ¶
func (is FloatStream) Sum() EntryFloat
Sum adds the numbers in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction and is equivalent to:
is.Reduce(sum) // where sum is a BiFunction that adds
               // two floats.
This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
type Function ¶
Function that accepts one argument and produces a result.
Example ¶
ExampleFunction shows how to use Functions. There are more interesting examples throughout the code. Search for `Function` or the Function signature.
timesTwoFunction := timesTwo()
res := timesTwoFunction(EntryInt(7))
fmt.Printf("res = %+v\n", res)
Output:
res = 14
type IntStream ¶
type IntStream struct {
Stream
}
IntStream is a sequence of EntryInt elements supporting sequential and (in the future?) parallel operations.
The current implementation is based on `Stream` and an intermediary channel that converts incoming `EntryInt` elements to `Entry`. This approach offers programming conciseness but the use of an intermediary channel likely decreases performance. This also means that type checking is weak on methods "borrowed" from `Stream` that expect `Entry` (instead of `EntryInt`).
func NewConcurrentIntStream ¶
NewConcurrentIntStream creates a new IntStream with a degree of concurrency of n. This function leaves the provided channel in the same state of openness.
func NewIntStream ¶
NewIntStream creates a new IntStream. This function leaves the provided channel in the same state of openness.
func NewIntStreamFromSlice ¶
NewIntStreamFromSlice creates a new IntStream from a Go slice of EntryInt. The stream will be closed once all the slice data has been published.
func (IntStream) Average ¶
Average returns the average of the numbers in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction. This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
func (IntStream) Max ¶
Max returns the largest number in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction and is equivalent to:
is.Reduce(max) // where max is a BiFunction that returns
               // the largest of two integers.
This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
func (IntStream) Min ¶
Min returns the smallest number in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction and is equivalent to:
is.Reduce(min) // where min is a BiFunction that returns
               // the smallest of two integers.
This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
func (IntStream) Sum ¶
Sum adds the numbers in the stream. Panics if the channel is nil or the stream is empty. This is a special case of a reduction and is equivalent to:
is.Reduce(sum) // where sum is a BiFunction that adds
               // two integers.
This is a terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
type Maybe ¶
type Maybe struct {
// contains filtered or unexported fields
}
A Maybe is a maybe monad.
Example ¶
ExampleMaybe shows ways to use a Maybe.
m1 := MaybeOf(EntryString("Hello World"))
fmt.Printf("m1.Get=%v\n", m1.Get())
fmt.Printf("m1.GetOrElse=%v\n", m1.GetOrElse(EntryString("Bonjour le monde")))
m2 := MaybeOf(nil)
if assert.PanicsWithValue(nil, PanicNoSuchElement, func() {
    fmt.Printf("m2.Get=%v\n", m2.Get())
}) {
    fmt.Println("m2.Get() panics with PanicNoSuchElement")
}
fmt.Printf("m2.GetOrElse=%v\n", m2.GetOrElse(EntryString("Bonjour le monde")))
Output:
m1.Get=Hello World
m1.GetOrElse=Hello World
m2.Get() panics with PanicNoSuchElement
m2.GetOrElse=Bonjour le monde
func MaybeOf ¶
MaybeOf creates a new Maybe with the given value. If the value is nil, it returns None; otherwise it returns Some(value). Note: MaybeOf(nil) == None() whereas MaybeSome(nil) == MaybeSome(nil).
func MaybeSome ¶
MaybeSome creates a new Maybe with the given value. Note: MaybeOf(nil) == None() whereas MaybeSome(nil) == MaybeSome(nil).
func (Maybe) Filter ¶
Filter returns MaybeSome(value) if this is a MaybeSome and the value satisfies the predicate; otherwise it returns MaybeNone.
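The difference between MaybeOf(nil) and MaybeSome(nil), and the behaviour of Filter, can be illustrated with a small option type written from scratch. The `maybe` type and its lower-case functions below are hypothetical; fuego's Maybe has unexported fields and may well be implemented differently.

```go
package main

import "fmt"

// maybe is a hypothetical, minimal option type used for illustration only.
type maybe struct {
	value   interface{}
	present bool
}

// maybeOf treats nil as "no value".
func maybeOf(v interface{}) maybe {
	if v == nil {
		return maybe{}
	}
	return maybe{value: v, present: true}
}

// maybeSome always wraps the value, even when it is nil.
func maybeSome(v interface{}) maybe {
	return maybe{value: v, present: true}
}

// getOrElse returns the wrapped value, or def when there is none.
func (m maybe) getOrElse(def interface{}) interface{} {
	if !m.present {
		return def
	}
	return m.value
}

// filter keeps the value only when it is present and satisfies p.
func (m maybe) filter(p func(interface{}) bool) maybe {
	if m.present && p(m.value) {
		return m
	}
	return maybe{}
}

func main() {
	fmt.Println(maybeOf(nil).getOrElse("fallback"))   // no value: the default is used
	fmt.Println(maybeSome(nil).getOrElse("fallback")) // a value is present, and it is nil
	isLong := func(v interface{}) bool { return len(v.(string)) > 3 }
	fmt.Println(maybeOf("hi").filter(isLong).getOrElse("too short"))
}
```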
type Predicate ¶
Predicate represents a predicate (boolean-valued function) of one argument.
Example ¶
ExamplePredicate shows how to use and combine Predicates.
package main
import (
    "fmt"
    ƒ "github.com/seborama/fuego"
)
func main() {
    res := ƒ.Predicate(ƒ.False).Negate()(ƒ.EntryInt(1))
    fmt.Printf("Not False == %+v\n", res)
    res = ƒ.Predicate(ƒ.True).And(ƒ.False)(ƒ.EntryInt(1))
    fmt.Printf("True and False == %+v\n", res)
    res = ƒ.Predicate(ƒ.True).Or(ƒ.False)(ƒ.EntryInt(1))
    fmt.Printf("True or False == %+v\n", res)
    // You can use associativity too - part 1 of 2:
    // False And False Or True == true
    res = ƒ.Predicate(ƒ.False).And(ƒ.False).Or(ƒ.True)(ƒ.EntryInt(1))
    fmt.Printf("False And False Or True == %+v\n", res)
    // You can use associativity too - part 2 of 2:
    // False And (False Or True) == false
    res = ƒ.Predicate(ƒ.False).And(ƒ.Predicate(ƒ.False).Or(ƒ.True))(ƒ.EntryInt(1))
    fmt.Printf("False And (False Or True) == %+v\n", res)
}
Output:
Not False == true
True and False == false
True or False == true
False And False Or True == true
False And (False Or True) == false
Example (FunctionPredicate) ¶
ExamplePredicate_custom1 shows how to create a custom Predicate using the utility function ƒ.FunctionPredicate().
isEvenNumberPredicate := ƒ.FunctionPredicate(isEvenNumberFunction)
res := isEvenNumberPredicate.And(ƒ.True)(ƒ.EntryInt(23))
fmt.Printf("res = %v", res)
Output:
res = false
Example (Predicate) ¶
ExamplePredicate_custom2 shows how to create a custom Predicate from scratch. Notice how we get all Predicate helpers (And, Or, Not, etc) for "free".
res := intGreaterThanPredicate(50).And(ƒ.True).Negate()(ƒ.EntryInt(23))
fmt.Printf("res = %v", res)
Output:
res = true
func FunctionPredicate ¶
FunctionPredicate creates a Predicate from a Function.
func (Predicate) And ¶
And is a composed predicate that represents a short-circuiting logical AND of this predicate and another.
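As a rough sketch of what "short-circuiting" composition means, the following stand-alone program defines a hypothetical `predicate` type over `int` (not fuego's `Predicate`, which operates on `Entry`) with `and`, `or` and `negate` combinators. It relies on Go's `&&` and `||` operators, which do not evaluate their right-hand side when the left-hand side already decides the result.

```go
package main

import "fmt"

// predicate is a hypothetical stand-in operating on int rather than Entry.
type predicate func(v int) bool

// and short-circuits: other is not evaluated when p is false.
func (p predicate) and(other predicate) predicate {
	return func(v int) bool { return p(v) && other(v) }
}

// or short-circuits: other is not evaluated when p is true.
func (p predicate) or(other predicate) predicate {
	return func(v int) bool { return p(v) || other(v) }
}

// negate inverts the result of p.
func (p predicate) negate() predicate {
	return func(v int) bool { return !p(v) }
}

func main() {
	isEven := predicate(func(v int) bool { return v%2 == 0 })
	isPositive := predicate(func(v int) bool { return v > 0 })

	fmt.Println(isEven.and(isPositive)(4))  // even and positive
	fmt.Println(isEven.and(isPositive)(-4)) // even but not positive
	fmt.Println(isEven.or(isPositive)(3))   // odd but positive
	fmt.Println(isEven.negate()(3))         // not even
}
```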
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is a sequence of elements supporting sequential and (in the future?) parallel operations.
A Stream is a wrapper over a Go channel ('nil' channels are prohibited).
NOTE ¶
Concurrent streams are challenging to implement owing to ordering issues in parallel processing. At the moment, the view is that the most sensible approach is to delegate control to users. Multiple fuego streams can be created and data distributed across as desired. This empowers users of fuego to implement the desired behaviour of their pipelines.
As of v8.0.0, fuego offers ordered concurrency for some linear operations such as Map().
Creation ¶
When providing a Go channel to create a Stream, beware that until you close the channel, the Stream's internal goroutine that processes the data on the channel will remain active. It will block until either new data is produced or the channel is closed by the producer. If a producer forgets to close the channel, that goroutine will leak.
Streams created from a slice do not suffer from this issue because they are closed when the slice content is fully pushed to the Stream.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryInt(1),
    ƒ.EntryInt(2),
    ƒ.EntryInt(3),
}, 1e3)
// or if you already have a channel of Entry:
c := make(chan ƒ.Entry) // you could add a buffer size as a second arg, if desired
go func() {
    defer close(c)
    c <- ƒ.EntryString("one")
    c <- ƒ.EntryString("two")
    c <- ƒ.EntryString("three")
    // c <- ...
}()
ƒ.NewStream(c)
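The importance of closing the channel can be seen with plain Go channels, independently of fuego. In the sketch below, `produce` and `drain` are hypothetical helper names: the consumer's `range` loop only ends because the producer calls `close`; without it, `drain` would block forever.

```go
package main

import "fmt"

// produce publishes the values on a new channel from a goroutine and
// closes the channel once everything has been sent.
func produce(values ...string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c) // without this, consumers ranging over c never return
		for _, v := range values {
			c <- v
		}
	}()
	return c
}

// drain consumes the channel until it is closed and returns what it received.
func drain(c <-chan string) []string {
	var out []string
	for v := range c {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(drain(produce("one", "two", "three")))
}
```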
func NewConcurrentStream ¶
NewConcurrentStream creates a new Stream with a degree of concurrency of n.
func NewStream ¶
NewStream creates a new Stream.
This function leaves the provided channel in the same state of openness.
func NewStreamFromSlice ¶
func NewStreamFromSlice(slice EntrySlice, bufsize int) Stream
NewStreamFromSlice creates a new Stream from a Go slice.
The slice data is published to the stream after which the stream is closed.
func (Stream) AllMatch ¶
AllMatch returns whether all of the elements in the stream satisfy the predicate.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    AllMatch(func(e ƒ.Entry) bool {
        return strings.Contains(string(e.(ƒ.EntryString)), "t")
    })
// Result: true
func (Stream) AnyMatch ¶
AnyMatch returns whether any of the elements in the stream satisfies the predicate.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("four"),
}, 1e3).
    AnyMatch(func(e ƒ.Entry) bool {
        return e.Equal(ƒ.EntryString("three"))
    })
// Result: true
func (Stream) Collect ¶
Collect reduces and optionally mutates the stream with the supplied Collector.
It should be noted that this method returns an `interface{}` which enables it to return `Entry` as well as any other Go types.
Also, the `finisher` function is optional (i.e. it may acceptably be `nil`).
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
strs := EntrySlice{
    EntryString("a"),
    EntryString("bb"),
    EntryString("cc"),
    EntryString("ddd"),
}
NewStreamFromSlice(strs, 1e3).
    Collect(
        GroupingBy(
            stringLength,
            Mapping(
                stringToUpper,
                Filtering(
                    stringLengthGreaterThan(1),
                    ToEntrySlice()))))
// Result: map[1:[] 2:[BB CC] 3:[DDD]]
func (Stream) Concurrent ¶
Concurrent sets the level of concurrency for this Stream.
This is used for concurrent methods such as Stream.Map.
Consumption is ordered by the stream's channel but output may be unordered (a slow consumer will be "out-raced" by faster consumers). Ordering is dependent on the implementation of concurrency. For instance Stream.Map() is orderly but Stream.ForEachC is not.
Note that to switch off concurrency, you should provide n = 0. With n = 1, concurrency is internal whereby the Stream writer will not block on writing a single element (i.e. buffered channel of 1). This already provides significant processing gains.
Performance:
Channels are inherently expensive to use owing to their internal mutex lock.
Benefits will ONLY be observed when the execution has a degree of latency (at the very least, several dozen nanoseconds). The higher the latency, the better the gains from concurrency (even on a single CPU core).
If latency is too low or next to none, using concurrency will likely be slower than without, particularly when no CPU core is available.
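To make the trade-off concrete, here is a minimal sketch of an order-preserving concurrent map over a slice, written with the standard library only. `orderedMap` is a hypothetical helper and is not how fuego implements Stream.Map; it simply bounds the number of in-flight calls with a semaphore channel and writes each result at the index of its input, so the output order matches the input order. The `time.Sleep` stands in for real latency.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// orderedMap applies f to every element using at most n goroutines at a time
// and keeps results in input order by writing each one at its own index.
func orderedMap(in []int, n int, f func(int) int) []int {
	if n < 1 {
		n = 1 // in this sketch, "no concurrency" means one call at a time
	}
	out := make([]int, len(in))
	sem := make(chan struct{}, n) // limits the number of in-flight calls
	var wg sync.WaitGroup
	for i, v := range in {
		sem <- struct{}{} // wait for a free slot
		wg.Add(1)
		go func(i, v int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			out[i] = f(v)
		}(i, v)
	}
	wg.Wait()
	return out
}

func main() {
	slowDouble := func(v int) int {
		time.Sleep(10 * time.Millisecond) // simulated latency
		return v * 2
	}
	in := []int{1, 2, 3, 4, 5, 6, 7, 8}

	for _, n := range []int{1, 4} {
		start := time.Now()
		res := orderedMap(in, n, slowDouble)
		fmt.Printf("n=%d res=%v elapsed=%v\n", n, res, time.Since(start).Round(time.Millisecond))
	}
}
```

With a noticeable per-element latency, the run with the higher `n` should finish sooner; if the sleep is removed, the goroutine and channel overhead may well dominate, which is the effect described above.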
func (Stream) Count ¶
Count returns the number of elements in the stream.
This is a special case of a reduction and is equivalent to:
s.MapToInt(func(Entry) EntryInt { return EntryInt(1) }).Sum()
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
func (Stream) Distinct ¶
Distinct returns a stream of the distinct elements of this stream.
This operation is costly both in time and in memory. It is strongly recommended to use buffered channels for this operation.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
func (Stream) Drop ¶
Drop drops the first 'n' elements of this stream and returns a new stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    Drop(2)
// Result: Stream of ƒ.EntryString("fourth")
func (Stream) DropUntil ¶
DropUntil drops the first elements of this stream until the predicate is satisfied and returns a new stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    DropUntil(func(e ƒ.Entry) bool {
        return e.Equal(ƒ.EntryString("fourth"))
    })
// Result: Stream of ƒ.EntryString("fourth")
func (Stream) DropWhile ¶
DropWhile drops the first elements of this stream while the predicate is satisfied and returns a new stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    DropWhile(func(e ƒ.Entry) bool {
        return e.Equal(ƒ.EntryString("three"))
    })
// Result: Stream of ƒ.EntryString("two") and ƒ.EntryString("fourth")
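The "streams continuously until the in-stream is closed" behaviour can be sketched with plain channels. `dropWhile`, `fromSlice` and `toSlice` below are hypothetical helpers working on `string` rather than `Entry`; this is an illustration of the idea, not fuego's implementation. Note that once the predicate has failed for one element, every later element is forwarded, even if it would satisfy the predicate again.

```go
package main

import "fmt"

// fromSlice publishes values on a channel and closes it when done.
func fromSlice(values []string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for _, v := range values {
			c <- v
		}
	}()
	return c
}

// dropWhile skips leading elements while p holds, then forwards all the
// remaining elements. The out-channel is closed when the in-channel is.
func dropWhile(in <-chan string, p func(string) bool) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		dropping := true
		for v := range in {
			if dropping && p(v) {
				continue
			}
			dropping = false
			out <- v
		}
	}()
	return out
}

// toSlice drains a channel into a slice.
func toSlice(in <-chan string) []string {
	var out []string
	for v := range in {
		out = append(out, v)
	}
	return out
}

func main() {
	data := []string{"three", "two", "fourth", "three"}
	isThree := func(s string) bool { return s == "three" }
	fmt.Println(toSlice(dropWhile(fromSlice(data), isThree)))
}
```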
func (Stream) EndsWith ¶
func (s Stream) EndsWith(slice EntrySlice) bool
EndsWith returns true when this stream ends with the supplied elements.
This is a potentially expensive method since it has to consume all the elements in the Stream.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    EndsWith([]ƒ.Entry{ƒ.EntryString("two"), ƒ.EntryString("fourth")})
// Result: true
func (Stream) Filter ¶
Filter returns a stream consisting of the elements of this stream that match the given predicate.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
Example
s := ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryInt(1),
    ƒ.EntryInt(2),
    ƒ.EntryInt(3),
}, 0)
s.Filter(
    ƒ.FunctionPredicate(entryIntEqualsTo(ƒ.EntryInt(1))).
        Or(
            ƒ.FunctionPredicate(entryIntEqualsTo(ƒ.EntryInt(3)))),
)
// Result: []ƒ.EntryInt{1,3}
func (Stream) FlatMap ¶
func (s Stream) FlatMap(mapper StreamFunction) Stream
FlatMap takes a StreamFunction to flatten the entries in this stream and produce a new stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
See: example_stream_test.go
Example ¶
a := EntrySlice{EntryInt(1), EntryInt(2), EntryInt(3)}
b := EntrySlice{EntryInt(4), EntryInt(5)}
c := EntrySlice{EntryInt(6), EntryInt(7), EntryInt(8)}
sliceOfEntrySlicesOfEntryInts := EntrySlice{a, b, c}
fmt.Printf("Before flattening: %+v\n", sliceOfEntrySlicesOfEntryInts)
sliceOfEntryInts := NewStreamFromSlice(sliceOfEntrySlicesOfEntryInts, 0).
    FlatMap(FlattenEntrySliceToEntry(0)).
    Collect(ToEntrySlice())
fmt.Printf("After flattening: %+v\n", sliceOfEntryInts)
Output:
Before flattening: [[1 2 3] [4 5] [6 7 8]]
After flattening: [1 2 3 4 5 6 7 8]
func (Stream) ForEach ¶
ForEach executes the given function for each entry in this stream.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
total := 0
computeSumTotal := func(value ƒ.Entry) {
    total += int(value.(ƒ.EntryInt))
}
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryInt(1),
    ƒ.EntryInt(2),
    ƒ.EntryInt(3),
}, 0).
    ForEach(computeSumTotal)
// Result: total == 6
func (Stream) ForEachC ¶
ForEachC is a concurrent wrapper of ForEach.
The level of concurrency is set by the last call made to method Concurrent.
See 'ForEach' for full details.
Note that this method consumes the stream orderly but does NOT preserve order of output.
func (Stream) GroupBy ¶
GroupBy groups the elements of this Stream by classifying them.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
See: example_stream_test.go
Example ¶
ExampleStream_GroupBy shows a use of Stream's with GroupBy.
data := EntrySlice{
    Tuple2{E1: EntryInt(1), E2: EntryString("one")},
    Tuple2{E1: EntryInt(2), E2: EntryString("two")},
    Tuple2{E1: EntryInt(3), E2: EntryString("three")},
    Tuple2{E1: EntryInt(4), E2: EntryString("four")},
    Tuple2{E1: EntryInt(5), E2: EntryString("five")},
    Tuple2{E1: EntryInt(6), E2: EntryString("six")},
    Tuple2{E1: EntryInt(7), E2: EntryString("seven")},
    Tuple2{E1: EntryInt(8), E2: EntryString("eight")},
    Tuple2{E1: EntryInt(9), E2: EntryString("nine")}}
resMap := map[Entry]interface{}{}
NewStreamFromSlice(data, 0).
    GroupBy(func(i Entry) Entry {
        return i.(Tuple2).E1.(EntryInt) & 1
    }).
    Stream(0).
    ForEach(func(e Entry) {
        resMap[e.(Tuple2).E1] = e.(Tuple2).E2
    })
for i := 0; i < len(resMap); i++ {
    fmt.Printf("%d => %v\n", i, resMap[EntryInt(i)])
}
Output:
0 => [{2 two} {4 four} {6 six} {8 eight}]
1 => [{1 one} {3 three} {5 five} {7 seven} {9 nine}]
func (Stream) Head ¶
Head returns the first Entry in this stream.
This function only consumes at most one element from the stream.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    Head()
// Result: ƒ.EntryString("three")
func (Stream) HeadN ¶
func (s Stream) HeadN(n uint64) EntrySlice
HeadN returns a slice of the first n elements in this stream.
This function only consumes at most 'n' elements from the stream.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    HeadN(2)
// Result: []ƒ.Entry{ƒ.EntryString("three"), ƒ.EntryString("two")}
func (Stream) Intersperse ¶
Intersperse inserts an element between all elements of this Stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("four"),
}, 1e3).
    Intersperse(ƒ.EntryString(" - "))
// Result: "three - two - four"
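A minimal channel-based sketch of the interspersing logic follows. `intersperse`, `fromSlice` and `toSlice` are hypothetical helpers over `string`, written for illustration only and not taken from fuego. The separator is emitted before every element except the first, so no trailing separator is produced.

```go
package main

import "fmt"

// fromSlice publishes values on a channel and closes it when done.
func fromSlice(values []string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for _, v := range values {
			c <- v
		}
	}()
	return c
}

// intersperse forwards every element of in, inserting sep between
// consecutive elements. The out-channel is closed when the in-channel is.
func intersperse(in <-chan string, sep string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		first := true
		for v := range in {
			if !first {
				out <- sep
			}
			first = false
			out <- v
		}
	}()
	return out
}

// toSlice drains a channel into a slice.
func toSlice(in <-chan string) []string {
	var out []string
	for v := range in {
		out = append(out, v)
	}
	return out
}

func main() {
	words := fromSlice([]string{"three", "two", "four"})
	fmt.Printf("%q\n", toSlice(intersperse(words, " - ")))
}
```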
func (Stream) Last ¶
Last returns the last Entry in this stream.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    Last()
// Result: ƒ.EntryString("fourth")
func (Stream) LastN ¶
func (s Stream) LastN(n uint64) EntrySlice
LastN returns a slice of the last n elements in this stream.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("fourth"),
}, 1e3).
    LastN(2)
// Result: []ƒ.Entry{ƒ.EntryString("two"), ƒ.EntryString("fourth")}
func (Stream) LeftReduce ¶
func (s Stream) LeftReduce(f2 BiFunction) Entry
LeftReduce accumulates the elements of this Stream by applying the given function.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("four"),
    ƒ.EntryString("twelve"),
    ƒ.EntryString("one"),
    ƒ.EntryString("six"),
    ƒ.EntryString("three"),
}, 1e3).
    LeftReduce(concatenateStringsBiFunc)
// Result: ƒ.EntryString("four-twelve-one-six-three")
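A left reduction over a channel can be sketched as follows. `leftReduce` and `fromSlice` are hypothetical helpers over `string`, and `concat` is an assumed stand-in for `concatenateStringsBiFunc` (whose definition is not shown here); the documented result suggests it joins its arguments with "-". The sketch seeds the accumulator with the first element and reports an empty stream through a boolean, which is a choice made for this illustration rather than a statement about fuego's behaviour.

```go
package main

import "fmt"

// fromSlice publishes values on a channel and closes it when done.
func fromSlice(values []string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for _, v := range values {
			c <- v
		}
	}()
	return c
}

// leftReduce folds the elements of in from left to right:
// f(f(f(e1, e2), e3), e4)... It returns false when in yields no element.
// It only returns once in has been closed.
func leftReduce(in <-chan string, f func(acc, e string) string) (string, bool) {
	acc, ok := <-in
	if !ok {
		return "", false
	}
	for e := range in {
		acc = f(acc, e)
	}
	return acc, true
}

func main() {
	// concat is an assumed equivalent of concatenateStringsBiFunc.
	concat := func(acc, e string) string { return acc + "-" + e }
	words := fromSlice([]string{"four", "twelve", "one", "six", "three"})
	res, ok := leftReduce(words, concat)
	fmt.Println(res, ok)
}
```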
func (Stream) Map ¶
Map returns a Stream consisting of the result of applying the given function to the elements of this stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
func (Stream) MapToFloat ¶
func (s Stream) MapToFloat(toFloat ToFloatFunction) FloatStream
MapToFloat produces an EntryFloat stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
func (Stream) MapToInt ¶
func (s Stream) MapToInt(toInt ToIntFunction) IntStream
MapToInt produces an EntryInt stream.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
func (Stream) NoneMatch ¶
NoneMatch returns whether none of the elements in the stream satisfies the predicate. It is the opposite of AnyMatch.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
    ƒ.EntryString("three"),
    ƒ.EntryString("two"),
    ƒ.EntryString("four"),
}, 1e3).
    NoneMatch(func(e ƒ.Entry) bool {
        return e.Equal(ƒ.EntryString("nothing like this"))
    })
// Result: true
func (Stream) Peek ¶
Peek is akin to ForEach but returns the Stream.
This is useful e.g. for debugging.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
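As a rough illustration of the "observe and pass through" idea behind Peek, the sketch below wraps a plain channel. The `peek` helper and its `observe` callback are hypothetical names, not part of this package.

```go
package main

import "fmt"

// peek calls observe on every value received from in and forwards the value
// unchanged. The returned channel is closed once in has been closed.
func peek(in <-chan string, observe func(string)) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			observe(v) // side effect only, e.g. logging while debugging
			out <- v
		}
	}()
	return out
}

func main() {
	in := make(chan string, 2)
	in <- "three"
	in <- "two"
	close(in)

	logged := peek(in, func(s string) { fmt.Println("saw:", s) })
	for v := range logged {
		fmt.Println("got:", v)
	}
}
```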
func (Stream) Reduce ¶
func (s Stream) Reduce(f2 BiFunction) Entry
Reduce is an alias for LeftReduce.
See LeftReduce for more info.
func (Stream) StartsWith ¶
func (s Stream) StartsWith(slice EntrySlice) bool
StartsWith returns true when this stream starts with the elements in the supplied slice.
This function only consumes as much data from the stream as is necessary to prove (or disprove) it starts with the supplied slice data.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
	ƒ.EntryString("three"),
	ƒ.EntryString("two"),
	ƒ.EntryString("fourth"),
}, 1e3).
	StartsWith([]ƒ.Entry{ƒ.EntryString("three"), ƒ.EntryString("two")})
// Result: true
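The "consume only what is needed" property can be sketched over a plain channel. The `startsWith` helper below is hypothetical and works on strings rather than Entry values. It shows the idea and is not the package's implementation.

```go
package main

import "fmt"

// startsWith reports whether the values received from in begin with prefix.
// It stops receiving as soon as the answer is known, so unread values may
// remain in the channel.
func startsWith(in <-chan string, prefix []string) bool {
	for _, want := range prefix {
		got, ok := <-in
		if !ok || got != want {
			return false // the stream ended early or diverged from the prefix
		}
	}
	return true
}

func main() {
	in := make(chan string, 3)
	in <- "three"
	in <- "two"
	in <- "fourth"
	close(in)

	fmt.Println(startsWith(in, []string{"three", "two"})) // true
	fmt.Println(len(in))                                  // 1: "fourth" was never consumed
}
```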
func (Stream) Take ¶
Take returns a stream of the first 'n' elements of this stream.
This function streams continuously until the 'n' elements are picked or the in-stream is closed at which point the out-stream will be closed too.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
	ƒ.EntryString("three"),
	ƒ.EntryString("two"),
	ƒ.EntryString("fourth"),
}, 1e3).
	Take(2)
// Result: Stream of []ƒ.Entry{ƒ.EntryString("three"), ƒ.EntryString("two")}
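The two stop conditions described for Take ('n' elements picked, or the in-stream closed) can be sketched with plain channels. `take` is a hypothetical helper on string channels, written for illustration only.

```go
package main

import "fmt"

// take forwards at most n values from in, then closes the returned channel.
// It also closes the returned channel if in is closed before n values arrive.
func take(in <-chan string, n int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			v, ok := <-in
			if !ok {
				return // in-stream closed early
			}
			out <- v
		}
	}()
	return out
}

func main() {
	// The input is deliberately left open: take still completes after 2 values.
	in := make(chan string, 3)
	in <- "three"
	in <- "two"
	in <- "fourth"

	for v := range take(in, 2) {
		fmt.Println(v)
	}
}
```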
func (Stream) TakeUntil ¶
TakeUntil returns a stream of the first elements of this stream until the predicate is satisfied.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
	ƒ.EntryString("three"),
	ƒ.EntryString("two"),
	ƒ.EntryString("fourth"),
}, 1e3).
	TakeUntil(func(e ƒ.Entry) bool {
		return e.Equal(ƒ.EntryString("fourth"))
	})
// Result: Stream of []ƒ.Entry{ƒ.EntryString("three"), ƒ.EntryString("two")}
func (Stream) TakeWhile ¶
TakeWhile returns a stream of the first elements of this stream while the predicate is satisfied.
This function streams continuously until the in-stream is closed at which point the out-stream will be closed too.
Example
ƒ.NewStreamFromSlice([]ƒ.Entry{
	ƒ.EntryString("three"),
	ƒ.EntryString("two"),
	ƒ.EntryString("fourth"),
}, 1e3).
	TakeWhile(func(e ƒ.Entry) bool {
		return strings.HasPrefix(string(e.(ƒ.EntryString)), "t")
	})
// Result: Stream of []ƒ.Entry{ƒ.EntryString("three"), ƒ.EntryString("two")}
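TakeWhile and TakeUntil differ only in the sense of the predicate. The sketch below shows this with plain string channels. `takeWhile`, `takeUntil` and `source` are hypothetical helpers. The sketch stops reading at the first element that ends the sequence, which is an assumption and may differ from how the package treats the rest of the in-stream.

```go
package main

import (
	"fmt"
	"strings"
)

// takeWhile forwards values from in for as long as pred holds, then closes
// the returned channel. It also closes it when in is closed.
func takeWhile(in <-chan string, pred func(string) bool) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			if !pred(v) {
				return // the first failing element ends the output
			}
			out <- v
		}
	}()
	return out
}

// takeUntil is takeWhile with the predicate negated.
func takeUntil(in <-chan string, pred func(string) bool) <-chan string {
	return takeWhile(in, func(s string) bool { return !pred(s) })
}

// source is a hypothetical helper: a closed, pre-filled channel.
func source(items ...string) <-chan string {
	out := make(chan string, len(items))
	for _, s := range items {
		out <- s
	}
	close(out)
	return out
}

func main() {
	startsWithT := func(s string) bool { return strings.HasPrefix(s, "t") }
	for v := range takeWhile(source("three", "two", "fourth"), startsWithT) {
		fmt.Println("while:", v)
	}

	isFourth := func(s string) bool { return s == "fourth" }
	for v := range takeUntil(source("three", "two", "fourth"), isFourth) {
		fmt.Println("until:", v)
	}
}
```

Both loops yield "three" then "two", matching the results of the TakeUntil and TakeWhile examples.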
func (Stream) ToSlice ¶
func (s Stream) ToSlice() EntrySlice
ToSlice extracts the elements of the stream into an EntrySlice.
This is a special case of a reduction.
This is a continuous terminal operation and hence expects the producer to close the stream in order to complete (or it will block).
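Collecting into a slice is a reduction whose accumulator is the slice itself. The sketch below shows this on a plain channel with a hypothetical `toSlice` helper. It is an illustration, not the package's code.

```go
package main

import "fmt"

// toSlice appends every value received from in to a slice and returns it
// once in has been closed.
func toSlice(in <-chan string) []string {
	var out []string
	for v := range in {
		out = append(out, v)
	}
	return out
}

func main() {
	in := make(chan string)
	go func() {
		defer close(in) // closing is what lets toSlice return
		for _, s := range []string{"three", "two", "four"} {
			in <- s
		}
	}()

	fmt.Println(toSlice(in)) // [three two four]
}
```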
type StreamFunction ¶
StreamFunction is a function that accepts one argument and produces a stream.
It is used when "flat mapping" a `Stream`. This is effectively a one-to-many operation, such as exploding the individual values of an EntrySlice into a Stream.
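The one-to-many shape of a flat-mapping function can be sketched with plain channels: each incoming slice is turned into a small stream, and those streams are concatenated in order. The names `streamFunc`, `flatten` and `flatMap` are hypothetical stand-ins chosen for this illustration. They are not the package's StreamFunction, FlattenEntrySliceToEntry or FlatMap.

```go
package main

import "fmt"

// streamFunc mirrors the idea of a stream-producing function:
// one value in, a stream of values out.
type streamFunc func(words []string) <-chan string

// flatten returns a streamFunc that emits each element of a slice on a
// channel with the given buffer size.
func flatten(bufsize int) streamFunc {
	return func(words []string) <-chan string {
		out := make(chan string, bufsize)
		go func() {
			defer close(out)
			for _, w := range words {
				out <- w
			}
		}()
		return out
	}
}

// flatMap applies f to every value from in and concatenates the resulting
// streams, preserving order.
func flatMap(in <-chan []string, f streamFunc) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for group := range in {
			for v := range f(group) {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	in := make(chan []string, 2)
	in <- []string{"a", "b"}
	in <- []string{"c"}
	close(in)

	for v := range flatMap(in, flatten(4)) {
		fmt.Println(v)
	}
}
```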
func FlattenEntrySliceToEntry ¶
func FlattenEntrySliceToEntry(bufsize int) StreamFunction
FlattenEntrySliceToEntry is a StreamFunction that flattens an EntrySlice to a Stream of its elements.
type ToFloatFunction ¶
type ToFloatFunction func(e Entry) EntryFloat
ToFloatFunction is a function that accepts one argument and produces an EntryFloat result.
type ToIntFunction ¶
ToIntFunction is a function that accepts one argument and produces an EntryInt result.
type Tuple0 ¶
type Tuple0 struct{}
Tuple0 is a tuple with 0 elements.
func (Tuple0) ToSlice ¶
func (t Tuple0) ToSlice() EntrySlice
ToSlice returns the elements of this tuple as a Go slice.
type Tuple1 ¶
type Tuple1 struct {
E1 Entry
}
Tuple1 is a tuple with 1 element.
func (Tuple1) Map ¶
Map applies the supplied mapper to the element of this Tuple and returns a new Tuple.
func (Tuple1) MapMulti ¶
MapMulti applies the supplied mappers one for each element of this Tuple and returns a new Tuple.
func (Tuple1) ToSlice ¶
func (t Tuple1) ToSlice() EntrySlice
ToSlice returns the elements of this tuple as a Go slice.
type Tuple2 ¶
Tuple2 is a tuple with 2 elements.
func (Tuple2) Map ¶
Map applies the supplied mapper to all elements of this Tuple and returns a new Tuple.
func (Tuple2) MapMulti ¶
MapMulti applies the supplied mappers one for each element of this Tuple and returns a new Tuple.
func (Tuple2) ToSlice ¶
func (t Tuple2) ToSlice() EntrySlice
ToSlice returns the elements of this tuple as a Go slice.
type Tuple3 ¶
Tuple3 is a tuple with 3 elements.
func (Tuple3) Map ¶
Map applies the supplied mapper to all elements of this Tuple and returns a new Tuple.
func (Tuple3) MapMulti ¶
MapMulti applies the supplied mappers one for each element of this Tuple and returns a new Tuple.
func (Tuple3) ToSlice ¶
func (t Tuple3) ToSlice() EntrySlice
ToSlice returns the elements of this tuple as a Go slice.