flow

package
v0.7.0
Published: Nov 27, 2021 License: MIT Imports: 6 Imported by: 34

Documentation

Overview

Package flow provides streams.Flow implementations.

Functions

func DoStream

func DoStream(outlet streams.Outlet, inlet streams.Inlet)

DoStream streams data from the outlet to inlet.
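The idea of connecting an outlet to an inlet can be sketched with plain channels. This is only an illustration and not the package's code: the helpers pipe and collect are hypothetical, and running the copy in a background goroutine and closing the inlet at the end of the stream are assumptions.

```go
package main

import "fmt"

// pipe forwards every element from out to in, then closes in so that
// downstream readers observe the end of the stream.
// Hypothetical helper; not part of the flow package.
func pipe(out <-chan interface{}, in chan<- interface{}) {
	go func() {
		for elem := range out {
			in <- elem
		}
		close(in)
	}()
}

// collect drains ch into a slice.
func collect(ch <-chan interface{}) []interface{} {
	var result []interface{}
	for elem := range ch {
		result = append(result, elem)
	}
	return result
}

func main() {
	upstream := make(chan interface{})
	downstream := make(chan interface{})
	pipe(upstream, downstream)

	go func() {
		for i := 1; i <= 3; i++ {
			upstream <- i
		}
		close(upstream)
	}()

	fmt.Println(collect(downstream)) // [1 2 3]
}
```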

func FanOut

func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow

FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.
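A rough sketch of the fan-out idea over plain channels follows. The helpers fanOut and drainAll are hypothetical, and the blocking, unbuffered delivery shown here is an assumption rather than the package's actual behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every element of src to n output channels.
// Hypothetical helper over plain channels; not the package's FanOut.
func fanOut(src <-chan interface{}, n int) []chan interface{} {
	outs := make([]chan interface{}, n)
	for i := range outs {
		outs[i] = make(chan interface{})
	}
	go func() {
		for elem := range src {
			for _, out := range outs {
				out <- elem // blocks until this branch is read
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return outs
}

// drainAll reads every branch concurrently and returns the per-branch sums.
func drainAll(outs []chan interface{}) []int {
	sums := make([]int, len(outs))
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(i int, out <-chan interface{}) {
			defer wg.Done()
			for elem := range out {
				sums[i] += elem.(int)
			}
		}(i, out)
	}
	wg.Wait()
	return sums
}

func main() {
	src := make(chan interface{})
	go func() {
		for i := 1; i <= 3; i++ {
			src <- i
		}
		close(src)
	}()
	fmt.Println(drainAll(fanOut(src, 2))) // [6 6]
}
```

Because the branches are unbuffered in this sketch, a slow reader on one branch stalls all the others, which is why the branches are drained concurrently.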

func Merge

func Merge(outlets ...streams.Flow) streams.Flow

Merge merges multiple flows into a single flow.
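Merging can be pictured as several producers writing to one shared channel. The sketch below uses plain channels; merge, emit and collectSorted are hypothetical helpers, and the interleaving of elements from different sources is not deterministic, so the demo sorts the result before printing.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge forwards the elements of all sources to a single channel and
// closes it once every source is exhausted.
// Hypothetical helper; not the package's Merge.
func merge(sources ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan interface{}) {
			defer wg.Done()
			for elem := range src {
				out <- elem
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a channel that yields the given values and then closes.
func emit(values ...int) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		for _, v := range values {
			ch <- v
		}
		close(ch)
	}()
	return ch
}

// collectSorted drains ch and sorts the values, since the merge order
// across sources is not deterministic.
func collectSorted(ch <-chan interface{}) []int {
	var values []int
	for elem := range ch {
		values = append(values, elem.(int))
	}
	sort.Ints(values)
	return values
}

func main() {
	fmt.Println(collectSorted(merge(emit(1, 3), emit(2, 4)))) // [1 2 3 4]
}
```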

func Split

func Split(outlet streams.Outlet, predicate func(interface{}) bool) [2]streams.Flow

Split splits the stream into two flows according to the given boolean predicate.
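A minimal sketch of predicate-based splitting over plain channels is shown here. The helper split and its buf parameter are hypothetical, and placing the matching elements at index 0 is an assumption about ordering, not something this page states.

```go
package main

import "fmt"

// split routes each element of src to one of two channels depending on pred.
// Index 0 receives matching elements, index 1 the rest (an assumption).
// The outputs are buffered so this demo can read them one after the other.
// Hypothetical helper; not the package's Split.
func split(src <-chan interface{}, pred func(interface{}) bool, buf int) [2]<-chan interface{} {
	matched := make(chan interface{}, buf)
	rest := make(chan interface{}, buf)
	go func() {
		for elem := range src {
			if pred(elem) {
				matched <- elem
			} else {
				rest <- elem
			}
		}
		close(matched)
		close(rest)
	}()
	return [2]<-chan interface{}{matched, rest}
}

// collect drains ch into a slice.
func collect(ch <-chan interface{}) []interface{} {
	var result []interface{}
	for elem := range ch {
		result = append(result, elem)
	}
	return result
}

func main() {
	src := make(chan interface{}, 5)
	for i := 1; i <= 5; i++ {
		src <- i
	}
	close(src)

	isEven := func(v interface{}) bool { return v.(int)%2 == 0 }
	flows := split(src, isEven, 5)
	fmt.Println(collect(flows[0]), collect(flows[1])) // [2 4] [1 3 5]
}
```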

Types

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [-------- FilterPredicate --------]
       |    |                    |
out -- 1 -- 2 ------------------ 5 --

func NewFilter

func NewFilter(filterPredicate FilterPredicate, parallelism uint) *Filter

NewFilter returns a new Filter instance.

filterPredicate is the boolean-valued filter function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.

func (*Filter) In

func (f *Filter) In() chan<- interface{}

In returns an input channel for receiving data.

func (*Filter) Out

func (f *Filter) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*Filter) To

func (f *Filter) To(sink streams.Sink)

To streams data to the given sink.

func (*Filter) Via

func (f *Filter) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type FilterPredicate added in v0.7.0

type FilterPredicate func(interface{}) bool

FilterPredicate represents a filter predicate (boolean-valued function).
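To make the predicate contract concrete, here is a stdlib-only sketch that applies a function of the same shape to a channel. The local FilterPredicate type mirrors the documented signature; filter, fromInts and collect are hypothetical helpers, and the sketch processes elements sequentially, which corresponds to the order-preserving case.

```go
package main

import "fmt"

// FilterPredicate mirrors the documented type for this standalone sketch.
type FilterPredicate func(interface{}) bool

// filter passes downstream only the elements for which keep returns true.
// Hypothetical helper; not the package's Filter.
func filter(src <-chan interface{}, keep FilterPredicate) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for elem := range src {
			if keep(elem) {
				out <- elem
			}
		}
	}()
	return out
}

// fromInts returns a channel that yields the given values and then closes.
func fromInts(values ...int) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

// collect drains ch into a slice.
func collect(ch <-chan interface{}) []interface{} {
	var result []interface{}
	for elem := range ch {
		result = append(result, elem)
	}
	return result
}

func main() {
	// Discards 3 and 4, like the diagram for Filter.
	keep := func(v interface{}) bool {
		n := v.(int)
		return n < 3 || n > 4
	}
	fmt.Println(collect(filter(fromInts(1, 2, 3, 4, 5), keep))) // [1 2 5]
}
```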

type FlatMap

type FlatMap struct {
	// contains filtered or unexported fields
}

FlatMap takes one element and produces zero, one, or more elements.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [-------- FlatMapFunction --------]
       |    |           |   |    |
out -- 1' - 2' -------- 4'- 4''- 5' -

func NewFlatMap

func NewFlatMap(flatMapFunction FlatMapFunction, parallelism uint) *FlatMap

NewFlatMap returns a new FlatMap instance.

flatMapFunction is the FlatMap transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.

func (*FlatMap) In

func (fm *FlatMap) In() chan<- interface{}

In returns an input channel for receiving data.

func (*FlatMap) Out

func (fm *FlatMap) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*FlatMap) To

func (fm *FlatMap) To(sink streams.Sink)

To streams data to the given sink.

func (*FlatMap) Via

func (fm *FlatMap) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type FlatMapFunction added in v0.7.0

type FlatMapFunction func(interface{}) []interface{}

FlatMapFunction represents a FlatMap transformation function.
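A function of this shape returns a slice per input element, and an empty slice means the element produces nothing. The stdlib-only sketch below shows that contract; the local FlatMapFunction type mirrors the documented signature, while flatMap, splitWords, fromStrings and collect are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// FlatMapFunction mirrors the documented type for this standalone sketch.
type FlatMapFunction func(interface{}) []interface{}

// flatMap emits every element returned by fn, in order, for each input.
// Hypothetical helper; not the package's FlatMap.
func flatMap(src <-chan interface{}, fn FlatMapFunction) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for elem := range src {
			for _, produced := range fn(elem) {
				out <- produced
			}
		}
	}()
	return out
}

// splitWords turns one string into zero, one, or more words.
func splitWords(v interface{}) []interface{} {
	var words []interface{}
	for _, w := range strings.Fields(v.(string)) {
		words = append(words, w)
	}
	return words
}

// fromStrings returns a channel that yields the given values and then closes.
func fromStrings(values ...string) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

// collect drains ch into a slice.
func collect(ch <-chan interface{}) []interface{} {
	var result []interface{}
	for elem := range ch {
		result = append(result, elem)
	}
	return result
}

func main() {
	// "a" yields one word, "" yields none, "b c" yields two.
	fmt.Println(collect(flatMap(fromStrings("a", "", "b c"), splitWords))) // [a b c]
}
```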

type Item

type Item struct {
	Msg interface{}
	// contains filtered or unexported fields
}

Item represents a PriorityQueue item.

func NewItem

func NewItem(msg interface{}, epoch int64, index int) *Item

NewItem returns a new Item.

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map takes one element and produces one element.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
   [--------- MapFunction -----------]
       |    |      |    |        |
out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap(mapFunction MapFunction, parallelism uint) *Map

NewMap returns a new Map instance.

mapFunction is the Map transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1.

func (*Map) In

func (m *Map) In() chan<- interface{}

In returns an input channel for receiving data.

func (*Map) Out

func (m *Map) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*Map) To

func (m *Map) To(sink streams.Sink)

To streams data to the given sink.

func (*Map) Via

func (m *Map) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type MapFunction added in v0.7.0

type MapFunction func(interface{}) interface{}

MapFunction represents a Map transformation function.
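The advice to use parallelism = 1 when order matters can be illustrated with a small worker pool. This is a stdlib-only sketch under stated assumptions: the local MapFunction type mirrors the documented signature, parallelMap, double, fromInts and collect are hypothetical, and the package may schedule its workers differently.

```go
package main

import (
	"fmt"
	"sync"
)

// MapFunction mirrors the documented type for this standalone sketch.
type MapFunction func(interface{}) interface{}

// parallelMap applies fn using the given number of workers. With one worker
// the output order matches the input order; with more, it may not.
// Hypothetical helper; not the package's Map.
func parallelMap(src <-chan interface{}, fn MapFunction, parallelism uint) <-chan interface{} {
	out := make(chan interface{})
	var wg sync.WaitGroup
	for i := uint(0); i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range src {
				out <- fn(elem)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// double is a sample one-to-one transformation.
func double(v interface{}) interface{} { return v.(int) * 2 }

// fromInts returns a channel that yields the given values and then closes.
func fromInts(values ...int) <-chan interface{} {
	ch := make(chan interface{})
	go func() {
		defer close(ch)
		for _, v := range values {
			ch <- v
		}
	}()
	return ch
}

// collect drains ch into a slice.
func collect(ch <-chan interface{}) []interface{} {
	var result []interface{}
	for elem := range ch {
		result = append(result, elem)
	}
	return result
}

func main() {
	// A single worker keeps the elements in order.
	fmt.Println(collect(parallelMap(fromInts(1, 2, 3), double, 1))) // [2 4 6]
}
```

With parallelism greater than 1 every element is still transformed exactly once in this sketch, but two workers can finish in either order, so only the set of results is predictable.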

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough retransmits incoming elements as is.

in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
       |    |      |    |        |
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough() *PassThrough

NewPassThrough returns a new PassThrough instance.

func (*PassThrough) In

func (pt *PassThrough) In() chan<- interface{}

In returns an input channel for receiving data.

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*PassThrough) To

func (pt *PassThrough) To(sink streams.Sink)

To streams data to the given sink.

func (*PassThrough) Via

func (pt *PassThrough) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type PriorityQueue

type PriorityQueue []*Item

PriorityQueue implements heap.Interface.

func (*PriorityQueue) Head

func (pq *PriorityQueue) Head() *Item

Head returns the first item of the PriorityQueue without removing it.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

Len returns the PriorityQueue length.

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

Less reports whether the item at index i must sort before the item at index j.

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

Pop implements heap.Interface.Pop. It removes and returns the element at index Len() - 1.

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

Push implements heap.Interface.Push. Appends an item to the PriorityQueue.

func (PriorityQueue) Slice

func (pq PriorityQueue) Slice(start, end int) PriorityQueue

Slice returns a sliced PriorityQueue using the given bounds.

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

Swap exchanges the items at indexes i and j.

func (*PriorityQueue) Update

func (pq *PriorityQueue) Update(item *Item, newEpoch int64)

Update sets item priority and calls heap.Fix to re-establish the heap ordering.

type SessionWindow added in v0.7.0

type SessionWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SessionWindow generates groups of elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time.

func NewSessionWindow added in v0.7.0

func NewSessionWindow(inactivityGap time.Duration) *SessionWindow

NewSessionWindow returns a new SessionWindow instance.

inactivityGap is the period of inactivity after which a session window closes.

func (*SessionWindow) In added in v0.7.0

func (sw *SessionWindow) In() chan<- interface{}

In returns an input channel for receiving data.

func (*SessionWindow) Out added in v0.7.0

func (sw *SessionWindow) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*SessionWindow) To added in v0.7.0

func (sw *SessionWindow) To(sink streams.Sink)

To streams data to the given sink.

func (*SessionWindow) Via added in v0.7.0

func (sw *SessionWindow) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type SlidingWindow

type SlidingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.

func NewSlidingWindow

func NewSlidingWindow(size time.Duration, slide time.Duration) *SlidingWindow

NewSlidingWindow returns a new processing time based SlidingWindow. Processing time refers to the system time of the machine that is executing the respective operation.

size is the Duration of generated windows. slide is the sliding interval of generated windows.

func NewSlidingWindowWithTSExtractor added in v0.6.0

func NewSlidingWindowWithTSExtractor(size time.Duration, slide time.Duration,
	timestampExtractor func(interface{}) int64) *SlidingWindow

NewSlidingWindowWithTSExtractor returns a new event time based SlidingWindow. Event time is the time at which each individual event occurred on its producing device. Event time windowing gives correct results on out-of-order events, late events, or replays of data.

size is the Duration of generated windows. slide is the sliding interval of generated windows. timestampExtractor extracts the record timestamp, in nanoseconds.

func (*SlidingWindow) In

func (sw *SlidingWindow) In() chan<- interface{}

In returns an input channel for receiving data.

func (*SlidingWindow) Out

func (sw *SlidingWindow) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*SlidingWindow) To

func (sw *SlidingWindow) To(sink streams.Sink)

To streams data to the given sink.

func (*SlidingWindow) Via

func (sw *SlidingWindow) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type ThrottleMode

type ThrottleMode int8

ThrottleMode defines the Throttler behavior on buffer overflow.

const (
	// Backpressure on overflow mode.
	Backpressure ThrottleMode = iota

	// Discard elements on overflow mode.
	Discard
)

type Throttler

type Throttler struct {
	// contains filtered or unexported fields
}

Throttler limits the throughput to a specific number of elements per time unit.

func NewThrottler

func NewThrottler(elements uint, period time.Duration, bufferSize uint, mode ThrottleMode) *Throttler

NewThrottler returns a new Throttler instance.

elements is the maximum number of elements to be produced per the given period of time. bufferSize defines the incoming elements buffer size. mode defines the Throttler flow behavior on elements buffer overflow.
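The difference between the two overflow modes can be sketched with a buffered channel. This covers only the overflow handling, not the rate limiting itself; the mode type, its constants and the offer helper are hypothetical stand-ins for illustration.

```go
package main

import "fmt"

// mode is a local stand-in for the overflow behavior.
type mode int8

const (
	backpressure mode = iota // block the producer until there is room
	discard                  // drop the element when the buffer is full
)

// offer tries to enqueue elem and reports whether it was accepted.
// Hypothetical helper; not part of the flow package.
func offer(buf chan interface{}, elem interface{}, m mode) bool {
	if m == discard {
		select {
		case buf <- elem:
			return true
		default:
			return false // buffer full: the element is dropped
		}
	}
	buf <- elem // backpressure: wait for the consumer to make room
	return true
}

func main() {
	buf := make(chan interface{}, 2) // plays the role of bufferSize
	accepted := 0
	for i := 0; i < 5; i++ {
		if offer(buf, i, discard) {
			accepted++
		}
	}
	// Nothing reads from buf, so only the first two elements fit.
	fmt.Println(accepted) // 2
}
```

In backpressure mode the same loop would block on the third element until a consumer reads from the buffer, which slows the producer down instead of losing data.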

func (*Throttler) In

func (th *Throttler) In() chan<- interface{}

In returns an input channel for receiving data.

func (*Throttler) Out

func (th *Throttler) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*Throttler) To

func (th *Throttler) To(sink streams.Sink)

To streams data to the given sink.

func (*Throttler) Via

func (th *Throttler) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.

type TumblingWindow

type TumblingWindow struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap.

func NewTumblingWindow

func NewTumblingWindow(size time.Duration) *TumblingWindow

NewTumblingWindow returns a new TumblingWindow instance.

size is the Duration of generated windows.

func (*TumblingWindow) In

func (tw *TumblingWindow) In() chan<- interface{}

In returns an input channel for receiving data.

func (*TumblingWindow) Out

func (tw *TumblingWindow) Out() <-chan interface{}

Out returns an output channel for sending data.

func (*TumblingWindow) To

func (tw *TumblingWindow) To(sink streams.Sink)

To streams data to the given sink.

func (*TumblingWindow) Via

func (tw *TumblingWindow) Via(flow streams.Flow) streams.Flow

Via streams data through the given flow.
