Documentation
¶
Index ¶
- func DoStream(outlet streams.Outlet, inlet streams.Inlet)
- func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
- func Merge(outlets ...streams.Flow) streams.Flow
- func Split(outlet streams.Outlet, cond func(interface{}) bool) [2]streams.Flow
- type Filter
- type FilterFunc
- type FlatMap
- type FlatMapFunc
- type Item
- type Map
- type MapFunc
- type PassThrough
- type PriorityQueue
- func (pq *PriorityQueue) Head() *Item
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() interface{}
- func (pq *PriorityQueue) Push(x interface{})
- func (pq PriorityQueue) Slice(start, end int) PriorityQueue
- func (pq PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
- type SlidingWindow
- type ThrottleMode
- type Throttler
- type TumblingWindow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Filter ¶
type Filter struct { FilterF FilterFunc // contains filtered or unexported fields }
Filter stream flow
func NewFilter ¶
func NewFilter(f FilterFunc, parallelism uint) *Filter
NewFilter returns a new Filter instance. FilterFunc - resolver function; parallelism - parallelism factor (if the order of events matters, use parallelism = 1).
type FlatMap ¶
type FlatMap struct { FlatMapF FlatMapFunc // contains filtered or unexported fields }
FlatMap function transformation flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
| | | | | [---------- FlatMapFunc ----------] | | | | |
out -- 1' - 2' -------- 4'- 4''- 5' -
func NewFlatMap ¶
func NewFlatMap(f FlatMapFunc, parallelism uint) *FlatMap
NewFlatMap returns a new FlatMap instance. FlatMapFunc - transformation function; parallelism - parallelism factor (if the order of events matters, use parallelism = 1).
func (*FlatMap) In ¶
func (fm *FlatMap) In() chan<- interface{}
In returns channel for receiving data
type Item ¶
type Item struct { Msg interface{} // contains filtered or unexported fields }
Item of PriorityQueue
type Map ¶
type Map struct { MapF MapFunc // contains filtered or unexported fields }
Map function transformation flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
| | | | | [----------- MapFunc -------------] | | | | |
out -- 1' - 2' --- 3' - 4' ----- 5' -
type PassThrough ¶
type PassThrough struct {
// contains filtered or unexported fields
}
PassThrough flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
| | | | |
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --
func NewPassThrough ¶
func NewPassThrough() *PassThrough
NewPassThrough returns new PassThrough instance
func (*PassThrough) In ¶
func (pt *PassThrough) In() chan<- interface{}
In returns channel for receiving data
func (*PassThrough) Out ¶
func (pt *PassThrough) Out() <-chan interface{}
Out returns channel for sending data
type PriorityQueue ¶
type PriorityQueue []*Item
PriorityQueue implements heap.Interface
func (PriorityQueue) Slice ¶
func (pq PriorityQueue) Slice(start, end int) PriorityQueue
Slice Queue
func (*PriorityQueue) Update ¶
func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
Update item epoch
type SlidingWindow ¶
定义每隔‘slide’时间,统计当前时间过去‘size’时间内的数据的窗口
func NewSlidingWindow ¶
func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow
NewSlidingWindow 返回一个新的滑动处理窗口。size - 生成窗口的时间跨度;slide - 每次滑动的时间
func NewSlidingWindowWithTsExtractor ¶
func NewSlidingWindowWithTsExtractor(size time.Duration, slide time.Duration, timestampExtractor func(interface{}) int64) *SlidingWindow
生成一个新的时间滑动窗口,可以包容无序事件、延迟事件或者重复出现的事件,并给出正确的过滤结果。size - 生成窗口的时间跨度;slide - 每次滑动的时间;timestampExtractor - 时间戳提取函数(以纳秒为单位,对应flink时间水印)
type ThrottleMode ¶
type ThrottleMode int8
ThrottleMode defines Throttler behavior on buffer overflow
const ( // Backpressure on overflow mode Backpressure ThrottleMode = iota // Discard on overflow mode Discard )
type Throttler ¶
Throttler limits the throughput to a specific number of elements per time unit
func NewThrottler ¶
NewThrottler returns a new Throttler instance. elements - number of elements; per - time unit; buffer - buffer channel size; mode - flow behavior on buffer overflow.
func (*Throttler) In ¶
func (th *Throttler) In() chan<- interface{}
In returns channel for receiving data
type TumblingWindow ¶
每隔‘size’时间统计一次过去‘size’时间内的数据,窗口数据不重叠
func NewTumblingWindow ¶
func NewTumblingWindow(size time.Duration) *TumblingWindow
返回一个‘TumblingWindow’窗口实例