Documentation ¶
Overview ¶
Package flow provides streams.Flow implementations.
Index ¶
- func DoStream(outlet streams.Outlet, inlet streams.Inlet)
- func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
- func Merge(outlets ...streams.Flow) streams.Flow
- func Split(outlet streams.Outlet, predicate func(interface{}) bool) [2]streams.Flow
- type Filter
- type FilterPredicate
- type FlatMap
- type FlatMapFunction
- type Item
- type Map
- type MapFunction
- type PassThrough
- type PriorityQueue
- func (pq *PriorityQueue) Head() *Item
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq PriorityQueue) Slice(start, end int) PriorityQueue
- func (pq PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
- type SessionWindow
- type SlidingWindow
- type ThrottleMode
- type Throttler
- type TumblingWindow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DoStream ¶
func DoStream(outlet streams.Outlet, inlet streams.Inlet)
DoStream streams data from the outlet to the inlet.
func FanOut ¶
func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.
Types ¶
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [-------- FilterPredicate --------]
       |    |                    |
out -- 1 -- 2 ------------------ 5 --
func NewFilter ¶
func NewFilter(filterPredicate FilterPredicate, parallelism uint) *Filter
NewFilter returns a new Filter instance.
filterPredicate is the boolean-valued filter function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.
func (*Filter) In ¶
func (f *Filter) In() chan<- interface{}
In returns an input channel for receiving data
type FilterPredicate ¶ added in v0.7.0
type FilterPredicate func(interface{}) bool
FilterPredicate represents a filter predicate (boolean-valued function).
type FlatMap ¶
type FlatMap struct {
// contains filtered or unexported fields
}
FlatMap takes one element and produces zero, one, or more elements.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [-------- FlatMapFunction --------]
       |    |           |   |    |
out -- 1' - 2' -------- 4'- 4''- 5' -
func NewFlatMap ¶
func NewFlatMap(flatMapFunction FlatMapFunction, parallelism uint) *FlatMap
NewFlatMap returns a new FlatMap instance.
flatMapFunction is the FlatMap transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.
func (*FlatMap) In ¶
func (fm *FlatMap) In() chan<- interface{}
In returns an input channel for receiving data
type FlatMapFunction ¶ added in v0.7.0
type FlatMapFunction func(interface{}) []interface{}
FlatMapFunction represents a FlatMap transformation function.
type Item ¶
type Item struct {
	Msg interface{}
	// contains filtered or unexported fields
}
Item represents a PriorityQueue item.
type Map ¶
type Map struct {
// contains filtered or unexported fields
}
Map takes one element and produces one element.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [--------- MapFunction -----------]
       |    |      |    |        |
out -- 1' - 2' --- 3' - 4' ----- 5' -
func NewMap ¶
func NewMap(mapFunction MapFunction, parallelism uint) *Map
NewMap returns a new Map instance.
mapFunction is the Map transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.
func (*Map) In ¶
func (m *Map) In() chan<- interface{}
In returns an input channel for receiving data
type MapFunction ¶ added in v0.7.0
type MapFunction func(interface{}) interface{}
MapFunction represents a Map transformation function.
type PassThrough ¶
type PassThrough struct {
// contains filtered or unexported fields
}
PassThrough retransmits incoming elements as is.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --
func NewPassThrough ¶
func NewPassThrough() *PassThrough
NewPassThrough returns a new PassThrough instance.
func (*PassThrough) In ¶
func (pt *PassThrough) In() chan<- interface{}
In returns an input channel for receiving data
func (*PassThrough) Out ¶
func (pt *PassThrough) Out() <-chan interface{}
Out returns an output channel for sending data
func (*PassThrough) To ¶
func (pt *PassThrough) To(sink streams.Sink)
To streams data to the given sink
func (*PassThrough) Via ¶
func (pt *PassThrough) Via(flow streams.Flow) streams.Flow
Via streams data through the given flow
type PriorityQueue ¶
type PriorityQueue []*Item
PriorityQueue implements heap.Interface.
func (*PriorityQueue) Head ¶
func (pq *PriorityQueue) Head() *Item
Head returns the first item of the PriorityQueue without removing it.
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
Less reports whether the item at index i must sort before the item at index j.
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
Pop implements heap.Interface.Pop. Removes and returns the Len() - 1 element.
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
Push implements heap.Interface.Push. Appends an item to the PriorityQueue.
func (PriorityQueue) Slice ¶
func (pq PriorityQueue) Slice(start, end int) PriorityQueue
Slice returns a sliced PriorityQueue using the given bounds.
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
Swap exchanges indexes of the items.
func (*PriorityQueue) Update ¶
func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
Update sets item priority and calls heap.Fix to re-establish the heap ordering.
type SessionWindow ¶ added in v0.7.0
SessionWindow generates groups of elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time.
func NewSessionWindow ¶ added in v0.7.0
func NewSessionWindow(inactivityGap time.Duration) *SessionWindow
NewSessionWindow returns a new SessionWindow instance.
inactivityGap is the period of inactivity after which a session window is closed.
func (*SessionWindow) In ¶ added in v0.7.0
func (sw *SessionWindow) In() chan<- interface{}
In returns an input channel for receiving data
func (*SessionWindow) Out ¶ added in v0.7.0
func (sw *SessionWindow) Out() <-chan interface{}
Out returns an output channel for sending data
func (*SessionWindow) To ¶ added in v0.7.0
func (sw *SessionWindow) To(sink streams.Sink)
To streams data to the given sink
func (*SessionWindow) Via ¶ added in v0.7.0
func (sw *SessionWindow) Via(flow streams.Flow) streams.Flow
Via streams data through the given flow
type SlidingWindow ¶
SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.
func NewSlidingWindow ¶
func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow
NewSlidingWindow returns a new processing time based SlidingWindow. Processing time refers to the system time of the machine that is executing the respective operation.
size is the Duration of generated windows. slide is the sliding interval of generated windows.
func NewSlidingWindowWithTSExtractor ¶ added in v0.6.0
func NewSlidingWindowWithTSExtractor(size time.Duration, slide time.Duration, timestampExtractor func(interface{}) int64) *SlidingWindow
NewSlidingWindowWithTSExtractor returns a new event time based SlidingWindow. Event time is the time that each individual event occurred on its producing device. Event time processing gives correct results on out-of-order events, late events, or on replays of data.
size is the Duration of generated windows. slide is the sliding interval of generated windows. timestampExtractor is the record timestamp (in nanoseconds) extractor.
func (*SlidingWindow) In ¶
func (sw *SlidingWindow) In() chan<- interface{}
In returns an input channel for receiving data
func (*SlidingWindow) Out ¶
func (sw *SlidingWindow) Out() <-chan interface{}
Out returns an output channel for sending data
func (*SlidingWindow) To ¶
func (sw *SlidingWindow) To(sink streams.Sink)
To streams data to the given sink
func (*SlidingWindow) Via ¶
func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow
Via streams data through the given flow
type ThrottleMode ¶
type ThrottleMode int8
ThrottleMode defines the Throttler behavior on buffer overflow.
const (
	// Backpressure on overflow mode.
	Backpressure ThrottleMode = iota
	// Discard elements on overflow mode.
	Discard
)
type Throttler ¶
type Throttler struct {
// contains filtered or unexported fields
}
Throttler limits the throughput to a specific number of elements per time unit.
func NewThrottler ¶
func NewThrottler(elements uint, period time.Duration, bufferSize uint, mode ThrottleMode) *Throttler
NewThrottler returns a new Throttler instance.
elements is the maximum number of elements to be produced per the given period of time. bufferSize defines the incoming elements buffer size. mode defines the Throttler flow behavior on elements buffer overflow.
func (*Throttler) In ¶
func (th *Throttler) In() chan<- interface{}
In returns an input channel for receiving data
type TumblingWindow ¶
TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap.
func NewTumblingWindow ¶
func NewTumblingWindow(size time.Duration) *TumblingWindow
NewTumblingWindow returns a new TumblingWindow instance.
size is the Duration of generated windows.
func (*TumblingWindow) In ¶
func (tw *TumblingWindow) In() chan<- interface{}
In returns an input channel for receiving data
func (*TumblingWindow) Out ¶
func (tw *TumblingWindow) Out() <-chan interface{}
Out returns an output channel for sending data
func (*TumblingWindow) To ¶
func (tw *TumblingWindow) To(sink streams.Sink)
To streams data to the given sink
func (*TumblingWindow) Via ¶
func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow
Via streams data through the given flow