flow

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 1, 2019 License: MIT Imports: 4 Imported by: 30

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoStream

func DoStream(outlet streams.Outlet, inlet streams.Inlet)

DoStream from inlet to outlet

func FanOut

func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow

FanOut the stream to magntude number of Flows

func Merge

func Merge(outlets ...streams.Flow) streams.Flow

Merge multiple flows

func Split

func Split(outlet streams.Outlet, cond func(interface{}) bool) [2]streams.Flow

Split stream to two flows first - satisfies the condition second - doesn't satisfy the condition

Types

type Filter

type Filter struct {
	FilterF FilterFunc
	// contains filtered or unexported fields
}

Filter stream flow

func NewFilter

func NewFilter(f FilterFunc, parallelism uint) *Filter

NewFilter returns new Filter instance FilterFunc - resolver function parallelism - parallelism factor, in case events order matters use parallelism = 1

func (*Filter) In

func (f *Filter) In() chan<- interface{}

In returns channel for receiving data

func (*Filter) Out

func (f *Filter) Out() <-chan interface{}

Out returns channel for sending data

func (*Filter) To

func (f *Filter) To(sink streams.Sink)

To streams data to given sink

func (*Filter) Via

func (f *Filter) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type FilterFunc

type FilterFunc func(interface{}) bool

FilterFunc resolver

type FlatMap

type FlatMap struct {
	FlatMapF FlatMapFunc
	// contains filtered or unexported fields
}

FlatMap function transformation flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

    |    |      |    |        |
[---------- FlatMapFunc ----------]
    |    |           |   |    |

out -- 1' - 2' -------- 4'- 4”- 5' -

func NewFlatMap

func NewFlatMap(f FlatMapFunc, parallelism uint) *FlatMap

NewFlatMap returns new FlatMap instance FlatMapFunc - transformation function parallelism - parallelism factor, in case events order matters use parallelism = 1

func (*FlatMap) In

func (fm *FlatMap) In() chan<- interface{}

In returns channel for receiving data

func (*FlatMap) Out

func (fm *FlatMap) Out() <-chan interface{}

Out returns channel for sending data

func (*FlatMap) To

func (fm *FlatMap) To(sink streams.Sink)

To streams data to given sink

func (*FlatMap) Via

func (fm *FlatMap) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type FlatMapFunc

type FlatMapFunc func(interface{}) []interface{}

FlatMapFunc transformer

type Item

type Item struct {
	Msg interface{}
	// contains filtered or unexported fields
}

Item of PriorityQueue

func NewItem

func NewItem(msg interface{}, epoch int64, index int) *Item

NewItem constructor

type Map

type Map struct {
	MapF MapFunc
	// contains filtered or unexported fields
}

Map function transformation flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

    |    |      |    |        |
[----------- MapFunc -------------]
    |    |      |    |        |

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap(f MapFunc, parallelism uint) *Map

NewMap returns new Map instance MapFunc - transformation function parallelism - parallelism factor, in case events order matters use parallelism = 1

func (*Map) In

func (m *Map) In() chan<- interface{}

In returns channel for receiving data

func (*Map) Out

func (m *Map) Out() <-chan interface{}

Out returns channel for sending data

func (*Map) To

func (m *Map) To(sink streams.Sink)

To streams data to given sink

func (*Map) Via

func (m *Map) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type MapFunc

type MapFunc func(interface{}) interface{}

MapFunc transformer

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough flow in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

|    |      |    |        |

out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough() *PassThrough

NewPassThrough returns new PassThrough instance

func (*PassThrough) In

func (pt *PassThrough) In() chan<- interface{}

In returns channel for receiving data

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan interface{}

Out returns channel for sending data

func (*PassThrough) To

func (pt *PassThrough) To(sink streams.Sink)

To streams data to given sink

func (*PassThrough) Via

func (pt *PassThrough) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type PriorityQueue

type PriorityQueue []*Item

PriorityQueue implements heap.Interface

func (*PriorityQueue) Head

func (pq *PriorityQueue) Head() *Item

Head returns Queue head item

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

Len returns PriorityQueue length

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

Less comparator

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

Pop item from the Queue

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

Push item to the Queue

func (PriorityQueue) Slice

func (pq PriorityQueue) Slice(start, end int) PriorityQueue

Slice Queue

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

Swap items by indexes

func (*PriorityQueue) Update

func (pq *PriorityQueue) Update(item *Item, newEpoch int64)

Update item epoch

type SlidingWindow

type SlidingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SlidingWindow flow Generates windows of a specified fixed size Slides by slide interval with records overlap

func NewSlidingWindow

func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow

NewSlidingWindow returns new Processing time sliding window size - The size of the generated windows slide - The slide interval of the generated windows

func NewSlidingWindowWithTsExtractor

func NewSlidingWindowWithTsExtractor(size time.Duration, slide time.Duration,
	timestampExtractor func(interface{}) int64) *SlidingWindow

NewSlidingWindowWithTsExtractor returns new Event time sliding window Gives correct results on out-of-order events, late events, or on replays of data size - The size of the generated windows slide - The slide interval of the generated windows timestampExtractor - The record timestamp (in nanoseconds) extractor

func (*SlidingWindow) In

func (sw *SlidingWindow) In() chan<- interface{}

In returns channel for receiving data

func (*SlidingWindow) Out

func (sw *SlidingWindow) Out() <-chan interface{}

Out returns channel for sending data

func (*SlidingWindow) To

func (sw *SlidingWindow) To(sink streams.Sink)

To streams data to given sink

func (*SlidingWindow) Via

func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type ThrottleMode

type ThrottleMode int8

ThrottleMode defines Throttler behavior on buffer overflow

const (
	// Backpressure on overflow mode
	Backpressure ThrottleMode = iota
	// Discard on overflow mode
	Discard
)

type Throttler

type Throttler struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Throttler limits the throughput to a specific number of elements per time unit

func NewThrottler

func NewThrottler(elements uint, per time.Duration, buffer uint, mode ThrottleMode) *Throttler

NewThrottler returns new Throttler instance elements - number of elements per - time unit buffer - buffer channel size mode - flow behavior on buffer overflow

func (*Throttler) In

func (th *Throttler) In() chan<- interface{}

In returns channel for receiving data

func (*Throttler) Out

func (th *Throttler) Out() <-chan interface{}

Out returns channel for sending data

func (*Throttler) To

func (th *Throttler) To(sink streams.Sink)

To streams data to given sink

func (*Throttler) Via

func (th *Throttler) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

type TumblingWindow

type TumblingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TumblingWindow flow Generates windows of a specified window size Tumbling windows have a fixed size and do not overlap

func NewTumblingWindow

func NewTumblingWindow(size time.Duration) *TumblingWindow

NewTumblingWindow returns new TumblingWindow instance size - The size of the generated windows

func (*TumblingWindow) In

func (tw *TumblingWindow) In() chan<- interface{}

In returns channel for receiving data

func (*TumblingWindow) Out

func (tw *TumblingWindow) Out() <-chan interface{}

Out returns channel for sending data

func (*TumblingWindow) To

func (tw *TumblingWindow) To(sink streams.Sink)

To streams data to given sink

func (*TumblingWindow) Via

func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow

Via streams data through given flow

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL