Documentation ¶
Overview ¶
Package stream contain implementation of stream executors and operators
Index ¶
- type Drain
- type Stream
- func (s *Stream) Batch() *Stream
- func (s *Stream) BatchBySize(size int64) *Stream
- func (s *Stream) Filter(f interface{}) *Stream
- func (s *Stream) FlatMap(f interface{}) *Stream
- func (s *Stream) GroupByKey(key interface{}) *Stream
- func (s *Stream) GroupByName(name string) *Stream
- func (s *Stream) GroupByPos(pos int) *Stream
- func (s *Stream) Into(snk interface{}) *Stream
- func (s *Stream) Map(f interface{}) *Stream
- func (s *Stream) Open() <-chan error
- func (s *Stream) Process(f interface{}) *Stream
- func (s *Stream) ReStream() *Stream
- func (s *Stream) Reduce(seed, f interface{}) *Stream
- func (s *Stream) Sort() *Stream
- func (s *Stream) SortByKey(key interface{}) *Stream
- func (s *Stream) SortByName(name string) *Stream
- func (s *Stream) SortByPos(pos int) *Stream
- func (s *Stream) SortWith(f func(batch interface{}, i, j int) bool) *Stream
- func (s *Stream) Sum() *Stream
- func (s *Stream) SumAllKeys() *Stream
- func (s *Stream) SumByKey(key interface{}) *Stream
- func (s *Stream) SumByName(name string) *Stream
- func (s *Stream) SumByPos(pos int) *Stream
- func (s *Stream) Transform(op api.UnOperation) *Stream
- func (s *Stream) WithContext(ctx context.Context) *Stream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Drain ¶
type Drain struct {
// contains filtered or unexported fields
}
Drain is a generic sink that terminates streamed data
func (*Drain) GetOutput ¶
func (s *Drain) GetOutput() <-chan interface{}
GetOuput returns output channel for stream node
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a stream unto which executor nodes can be attached to operate on the streamed data
func (*Stream) BatchBySize ¶
func (*Stream) Filter ¶
Filter takes a predicate user-defined func that filters the stream. The specified function must be of type:
func (T) bool
If the func returns true, current item continues downstream.
func (*Stream) FlatMap ¶
FlatMap similar to Map, however, the user-defined function is expected to return a slice of values (instead of just one mapped value) for downstream operators. The FlatMap function flatten the slice, returned by the user-defined function, into items that are individually streamed. The user-defined function must have the the following type:
func(T) []R - where T is the incoming item and []R is a slice to be flattened
func (*Stream) GroupByKey ¶
GroupByKey groups incoming items that are batched as type []map[K]V where parameter key is used to group the items when K=key. Items with same key values are grouped in a new map and returned as []map[G]V.
See Also ¶
See batch operator function GroupByKey in
"github.com/gofunky/automi/operators/batch/"#GroupByKeyFunc
func (*Stream) GroupByName ¶
GroupByName groups incoming items that are batched as type []T where T is a struct. Parameter name is used to select T.name as key to group items with the same value into a map map[key][]T that is sent downstream.
See Also ¶
See batch operator function GroupByName in
"github.com/gofunky/automi/operators/batch"
func (*Stream) GroupByPos ¶
GroupByPos groups incoming items that are batched as [][]T. For each i in dimension 1, [i][pos] is selected as key and grouped in a map, map[key][]T, that is returned downstream.
See Also ¶
See the batch operator function GroupByPosFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) Map ¶
Map uses the user-defined function to take the value of an incoming item and returns a new value that is said to be mapped to the intial item. The user-defined function must be of type:
func(T) R - where T is the type of the incoming item and R the type of the returned item.
func (*Stream) Open ¶
Open opens the Stream which executes all operators nodes. If there's an issue prior to execution, an error is returned in the error channel.
func (*Stream) Process ¶
Process applies the user-defined function for general processing of incoming streamed elements. The user-defined function must be of type:
func(T) R - where T is the incoming item from upstream, R is the type of the processed value
See Also
"github.com/gofunky/automi/operators/unary"#ProcessFunc
func (*Stream) ReStream ¶
ReStream takes upstream items of types []slice []array, map[T] and emmits their elements as individual channel items to downstream operations. Items of other types are ignored.
func (*Stream) Reduce ¶
Reduce accumulates and reduces items from upstream into a single value using the initial seed value and the reduction binary function. The provided function must be of type:
func(S, T) R where S is the type of the partial result T is the incoming item from the stream R is the type of the result, to be used in the next call
If reductive operations are called after open-ended emitters (i.e. network service), they may never end.
func (*Stream) Sort ¶
Sort sorts incoming items that are batched as []T where value T is comparable. The operator returns sorted slice []T.
See Also ¶
See also the operator function SortFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) SortByKey ¶
SortByKey sorts incoming items that are batched as type []map[K]V where K is a comparable type specified by param key and used to sort the slice. The opertor returns a sorted []map[K]V.
See Also ¶
See also the operator function SortByKeyFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) SortByName ¶
SortByName sorts incoming items that are batched as []T where T struct with fields identified by param name. Value struct.<name> is used to sort the slice. The operator returns stored slice []T.
See Also ¶
See also the operator function SortByNameFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) SortByPos ¶
SortByPos sorts incoming items that are batched as [][]T where value at [][[pos]T is used to sort the slice. The operator returns sorted slice [][]T.
See Also ¶
See also the operator function SortByPosFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) SortWith ¶
SortWith sorts incoming items that are batched as []T using the provided Less function for applicaiton with the sort package.
See Also ¶
See also the operator function SortWithFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) Sum ¶
Sum sums up numeric items that are batched as []T or [][]T where T is an integer or a floating point value. The operator returns a single value of type float64.
See Also ¶
See also the operator function SumFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) SumByKey ¶
SumByKey sums up numeric items that are batched as []map[K]V or []map[K][]V where key specifies a K value that returns a V or a []V that is a numeric (or a slice of) value of type integer or floating point. If key == nil, the grand total sum of all values for all keys will be calculated.
This operator returns map[interface{}]float64{key:sum} where sum is the calculated sum.
See Also ¶
See also the operator function SumByKeyFunc in
"github.com/gofunky/automi/operators/batch"
func (*Stream) SumByName ¶
SumByName sums up items that are batched as []T where T is a struct. The name parameter sums up fields with name identifier and are of integer of floating point types. The operator returns a float64 value.
See Also ¶
See also the operator function SumByNameFunc in
"github.com/gofunky/automi/operator/batch"
func (*Stream) SumByPos ¶
SumByPos sums up items that are batched as []T or [][]T where T is an integer or floating point. Values [pos]T or [][pos]T are added and returned as a float64 value.
See Also ¶
See also the operator function SumByPosFunc in
"github.com/gofunky/automi/operator/batch"