Documentation ¶
Index ¶
- func DoStream(outlet streams.Outlet, inlet streams.Inlet)
- func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
- func Merge(outlets ...streams.Flow) streams.Flow
- func Split(outlet streams.Outlet, cond func(interface{}) bool) [2]streams.Flow
- type Filter
- type FilterFunc
- type FlatMap
- type FlatMapFunc
- type Item
- type Map
- type MapFunc
- type PassThrough
- type PriorityQueue
- func (pq *PriorityQueue) Head() *Item
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq PriorityQueue) Slice(start, end int) PriorityQueue
- func (pq PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
- type SlidingWindow
- type ThrottleMode
- type Throttler
- type TumblingWindow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DoStream ¶
func DoStream(outlet streams.Outlet, inlet streams.Inlet)
DoStream streams data from the outlet to the inlet
Types ¶
type Filter ¶
type Filter struct {
	FilterF FilterFunc
	// contains filtered or unexported fields
}
Filter stream flow
func NewFilter ¶
func NewFilter(f FilterFunc, parallelism uint) *Filter
NewFilter returns a new Filter instance.
FilterFunc - resolver function
parallelism - parallelism factor; if the order of events matters, use parallelism = 1
type FlatMap ¶
type FlatMap struct {
	FlatMapF FlatMapFunc
	// contains filtered or unexported fields
}
FlatMap function transformation flow
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [---------- FlatMapFunc ----------]
       |    |           |   |    |
out -- 1' - 2' -------- 4'- 4''- 5' -
func NewFlatMap ¶
func NewFlatMap(f FlatMapFunc, parallelism uint) *FlatMap
NewFlatMap returns a new FlatMap instance.
FlatMapFunc - transformation function
parallelism - parallelism factor; if the order of events matters, use parallelism = 1
func (*FlatMap) In ¶
func (fm *FlatMap) In() chan<- interface{}
In returns channel for receiving data
type Item ¶
type Item struct {
	Msg interface{}
	// contains filtered or unexported fields
}
Item of PriorityQueue
type Map ¶
type Map struct {
	MapF MapFunc
	// contains filtered or unexported fields
}
Map function transformation flow
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [----------- MapFunc -------------]
       |    |      |    |        |
out -- 1' - 2' --- 3' - 4' ----- 5' -
type PassThrough ¶
type PassThrough struct {
// contains filtered or unexported fields
}
PassThrough flow
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --
func NewPassThrough ¶
func NewPassThrough() *PassThrough
NewPassThrough returns new PassThrough instance
func (*PassThrough) In ¶
func (pt *PassThrough) In() chan<- interface{}
In returns channel for receiving data
func (*PassThrough) Out ¶
func (pt *PassThrough) Out() <-chan interface{}
Out returns channel for sending data
func (*PassThrough) Via ¶
func (pt *PassThrough) Via(flow streams.Flow) streams.Flow
Via streams data through given flow
type PriorityQueue ¶
type PriorityQueue []*Item
PriorityQueue implements heap.Interface
func (PriorityQueue) Slice ¶
func (pq PriorityQueue) Slice(start, end int) PriorityQueue
Slice returns the portion of the queue between the start and end indices as a PriorityQueue
func (*PriorityQueue) Update ¶
func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
Update sets the epoch of the given item to newEpoch
type SlidingWindow ¶
SlidingWindow flow.
Generates windows of a specified fixed size.
Slides by the slide interval, with records overlapping between windows.
func NewSlidingWindow ¶
func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow
NewSlidingWindow returns a new processing-time sliding window.
size - the size of the generated windows
slide - the slide interval of the generated windows
func NewSlidingWindowWithTsExtractor ¶
func NewSlidingWindowWithTsExtractor(size time.Duration, slide time.Duration, timestampExtractor func(interface{}) int64) *SlidingWindow
NewSlidingWindowWithTsExtractor returns a new event-time sliding window.
Gives correct results on out-of-order events, late events, or on replays of data.
size - the size of the generated windows
slide - the slide interval of the generated windows
timestampExtractor - the record timestamp (in nanoseconds) extractor
func (*SlidingWindow) In ¶
func (sw *SlidingWindow) In() chan<- interface{}
In returns channel for receiving data
func (*SlidingWindow) Out ¶
func (sw *SlidingWindow) Out() <-chan interface{}
Out returns channel for sending data
func (*SlidingWindow) To ¶
func (sw *SlidingWindow) To(sink streams.Sink)
To streams data to given sink
func (*SlidingWindow) Via ¶
func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow
Via streams data through given flow
type ThrottleMode ¶
type ThrottleMode int8
ThrottleMode defines Throttler behavior on buffer overflow
const (
	// Backpressure on overflow mode
	Backpressure ThrottleMode = iota
	// Discard on overflow mode
	Discard
)
type Throttler ¶
Throttler limits the throughput to a specific number of elements per time unit
func NewThrottler ¶
NewThrottler returns a new Throttler instance.
elements - number of elements
per - time unit
buffer - buffer channel size
mode - flow behavior on buffer overflow
func (*Throttler) In ¶
func (th *Throttler) In() chan<- interface{}
In returns channel for receiving data
type TumblingWindow ¶
TumblingWindow flow.
Generates windows of a specified window size.
Tumbling windows have a fixed size and do not overlap.
func NewTumblingWindow ¶
func NewTumblingWindow(size time.Duration) *TumblingWindow
NewTumblingWindow returns a new TumblingWindow instance.
size - the size of the generated windows
func (*TumblingWindow) In ¶
func (tw *TumblingWindow) In() chan<- interface{}
In returns channel for receiving data
func (*TumblingWindow) Out ¶
func (tw *TumblingWindow) Out() <-chan interface{}
Out returns channel for sending data
func (*TumblingWindow) To ¶
func (tw *TumblingWindow) To(sink streams.Sink)
To streams data to given sink
func (*TumblingWindow) Via ¶
func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow
Via streams data through given flow