flow

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2024 License: MIT Imports: 6 Imported by: 34

Documentation

Overview

Package flow provides streams.Flow implementations.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoStream

func DoStream(outlet streams.Outlet, inlet streams.Inlet)

DoStream streams data from the outlet to inlet.

func FanOut

func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow

FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.

func Flatten added in v0.8.0

func Flatten[T any](parallelism int) streams.Flow

Flatten creates a Flow to flatten the stream of slices. T specifies the outgoing element type, and the incoming element type is []T.

func Merge

func Merge(outlets ...streams.Flow) streams.Flow

Merge merges multiple flows into a single flow.

func RoundRobin added in v0.8.0

func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow

RoundRobin creates a balanced number of flows from the single outlet. This can be useful when work can be parallelized across multiple cores.

func Split

func Split[T any](outlet streams.Outlet, predicate func(T) bool) [2]streams.Flow

Split splits the stream into two flows according to the given boolean predicate. T specifies the incoming and outgoing element type.

Types

type Batch added in v0.10.0

type Batch[T any] struct {
	// contains filtered or unexported fields
}

Batch processor breaks a stream of elements into batches based on size or timing. When the maximum batch size is reached or the batch time is elapsed, and the current buffer is not empty, a new batch will be emitted. Note: once a batch is sent downstream, the timer will be reset. T indicates the incoming element type, and the outgoing element type is []T.

func NewBatch added in v0.10.0

func NewBatch[T any](maxBatchSize int, timeInterval time.Duration) *Batch[T]

NewBatch returns a new Batch operator using the specified maximum batch size and the time interval. T specifies the incoming element type, and the outgoing element type is []T. NewBatch will panic if the maxBatchSize argument is not positive.

func (*Batch[T]) In added in v0.10.0

func (b *Batch[T]) In() chan<- any

In returns the input channel of the Batch operator.

func (*Batch[T]) Out added in v0.10.0

func (b *Batch[T]) Out() <-chan any

Out returns the output channel of the Batch operator.

func (*Batch[T]) To added in v0.10.0

func (b *Batch[T]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*Batch[T]) Via added in v0.10.0

func (b *Batch[T]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type Filter

type Filter[T any] struct {
	// contains filtered or unexported fields
}

Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FilterPredicate -------- ]

out -- 1 -- 2 ------------------ 5 --

func NewFilter

func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism int) *Filter[T]

NewFilter returns a new Filter operator. T specifies the incoming and the outgoing element type.

filterPredicate is the boolean-valued filter function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1. If the parallelism argument is not positive, NewFilter will panic.

func (*Filter[T]) In

func (f *Filter[T]) In() chan<- any

In returns the input channel of the Filter operator.

func (*Filter[T]) Out

func (f *Filter[T]) Out() <-chan any

Out returns the output channel of the Filter operator.

func (*Filter[T]) To

func (f *Filter[T]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*Filter[T]) Via

func (f *Filter[T]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type FilterPredicate added in v0.7.0

type FilterPredicate[T any] func(T) bool

FilterPredicate represents a filter predicate (boolean-valued function).

type FlatMap

type FlatMap[T, R any] struct {
	// contains filtered or unexported fields
}

FlatMap takes one element and produces zero, one, or more elements.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ -------- FlatMapFunction -------- ]

out -- 1' - 2' -------- 4'- 4" - 5' -

func NewFlatMap

func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism int) *FlatMap[T, R]

NewFlatMap returns a new FlatMap operator. T specifies the incoming element type, and the outgoing element type is []R.

flatMapFunction is the FlatMap transformation function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1. If the parallelism argument is not positive, NewFlatMap will panic.

func (*FlatMap[T, R]) In

func (fm *FlatMap[T, R]) In() chan<- any

In returns the input channel of the FlatMap operator.

func (*FlatMap[T, R]) Out

func (fm *FlatMap[T, R]) Out() <-chan any

Out returns the output channel of the FlatMap operator.

func (*FlatMap[T, R]) To

func (fm *FlatMap[T, R]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*FlatMap[T, R]) Via

func (fm *FlatMap[T, R]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type FlatMapFunction added in v0.7.0

type FlatMapFunction[T, R any] func(T) []R

FlatMapFunction represents a FlatMap transformation function.

type Item

type Item struct {
	Msg any
	// contains filtered or unexported fields
}

Item represents a PriorityQueue item.

func NewItem

func NewItem(msg any, epoch int64, index int) *Item

NewItem returns a new Item.

type Map

type Map[T, R any] struct {
	// contains filtered or unexported fields
}

Map takes one element and produces one element.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ ---------- MapFunction ---------- ]

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap[T, R any](mapFunction MapFunction[T, R], parallelism int) *Map[T, R]

NewMap returns a new Map operator. T specifies the incoming element type, and the outgoing element type is R.

mapFunction is the Map transformation function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1. If the parallelism argument is not positive, NewMap will panic.

func (*Map[T, R]) In

func (m *Map[T, R]) In() chan<- any

In returns the input channel of the Map operator.

func (*Map[T, R]) Out

func (m *Map[T, R]) Out() <-chan any

Out returns the output channel of the Map operator.

func (*Map[T, R]) To

func (m *Map[T, R]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*Map[T, R]) Via

func (m *Map[T, R]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type MapFunction added in v0.7.0

type MapFunction[T, R any] func(T) R

MapFunction represents a Map transformation function.

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough retransmits incoming elements downstream as they are.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough() *PassThrough

NewPassThrough returns a new PassThrough operator.

func (*PassThrough) In

func (pt *PassThrough) In() chan<- any

In returns the input channel of the PassThrough operator.

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan any

Out returns the output channel of the PassThrough operator.

func (*PassThrough) To

func (pt *PassThrough) To(sink streams.Sink)

To streams data to a specified Sink.

func (*PassThrough) Via

func (pt *PassThrough) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type PriorityQueue

type PriorityQueue []*Item

PriorityQueue implements heap.Interface.

func (*PriorityQueue) Head

func (pq *PriorityQueue) Head() *Item

Head returns the first item of the PriorityQueue without removing it.

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

Len returns the PriorityQueue length.

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

Less is the items less comparator.

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() any

Pop implements heap.Interface.Pop. Removes and returns the Len() - 1 element.

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x any)

Push implements heap.Interface.Push. Appends an item to the PriorityQueue.

func (PriorityQueue) Slice

func (pq PriorityQueue) Slice(start, end int) PriorityQueue

Slice returns a sliced PriorityQueue using the given bounds.

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

Swap exchanges indexes of the items.

func (*PriorityQueue) Update

func (pq *PriorityQueue) Update(item *Item, newEpoch int64)

Update sets item priority and calls heap.Fix to re-establish the heap ordering.

type Reduce added in v0.8.0

type Reduce[T any] struct {
	// contains filtered or unexported fields
}

Reduce represents a “rolling” reduce on a data stream. Combines the current element with the last reduced value and emits the new value.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

[ --------- ReduceFunction --------- ]

out -- 1 -- 2' --- 3' - 4' ----- 5' -

func NewReduce added in v0.8.0

func NewReduce[T any](reduceFunction ReduceFunction[T]) *Reduce[T]

NewReduce returns a new Reduce operator. T specifies the incoming and the outgoing element type.

reduceFunction combines the current element with the last reduced value.

func (*Reduce[T]) In added in v0.8.0

func (r *Reduce[T]) In() chan<- any

In returns the input channel of the Reduce operator.

func (*Reduce[T]) Out added in v0.8.0

func (r *Reduce[T]) Out() <-chan any

Out returns the output channel of the Reduce operator.

func (*Reduce[T]) To added in v0.8.0

func (r *Reduce[T]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*Reduce[T]) Via added in v0.8.0

func (r *Reduce[T]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type ReduceFunction added in v0.8.0

type ReduceFunction[T any] func(T, T) T

ReduceFunction combines the current element with the last reduced value.

type SessionWindow added in v0.7.0

type SessionWindow[T any] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SessionWindow generates groups of elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time. T indicates the incoming element type, and the outgoing element type is []T.

func NewSessionWindow added in v0.7.0

func NewSessionWindow[T any](inactivityGap time.Duration) *SessionWindow[T]

NewSessionWindow returns a new SessionWindow operator. T specifies the incoming element type, and the outgoing element type is []T.

inactivityGap is the gap of inactivity that closes a session window when occurred.

func (*SessionWindow[T]) In added in v0.7.0

func (sw *SessionWindow[T]) In() chan<- any

In returns the input channel of the SessionWindow operator.

func (*SessionWindow[T]) Out added in v0.7.0

func (sw *SessionWindow[T]) Out() <-chan any

Out returns the output channel of the SessionWindow operator.

func (*SessionWindow[T]) To added in v0.7.0

func (sw *SessionWindow[T]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*SessionWindow[T]) Via added in v0.7.0

func (sw *SessionWindow[T]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type SlidingWindow

type SlidingWindow[T any] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows. T indicates the incoming element type, and the outgoing element type is []T.

func NewSlidingWindow

func NewSlidingWindow[T any](
	windowSize time.Duration,
	slidingInterval time.Duration) *SlidingWindow[T]

NewSlidingWindow returns a new SlidingWindow operator based on processing time. Processing time refers to the system time of the machine that is executing the respective operation. T specifies the incoming element type, and the outgoing element type is []T.

windowSize is the Duration of generated windows. slidingInterval is the sliding interval of generated windows.

NewSlidingWindow panics if slidingInterval is larger than windowSize.

func NewSlidingWindowWithExtractor added in v0.10.0

func NewSlidingWindowWithExtractor[T any](
	windowSize time.Duration,
	slidingInterval time.Duration,
	timestampExtractor func(T) int64) *SlidingWindow[T]

NewSlidingWindowWithExtractor returns a new SlidingWindow operator based on event time. Event time is the time that each individual event occurred on its producing device. Gives correct results on out-of-order events, late events, or on replays of data. T specifies the incoming element type, and the outgoing element type is []T.

windowSize is the Duration of generated windows. slidingInterval is the sliding interval of generated windows. timestampExtractor is the record timestamp (in nanoseconds) extractor.

NewSlidingWindowWithExtractor panics if slidingInterval is larger than windowSize.

func (*SlidingWindow[T]) In

func (sw *SlidingWindow[T]) In() chan<- any

In returns the input channel of the SlidingWindow operator.

func (*SlidingWindow[T]) Out

func (sw *SlidingWindow[T]) Out() <-chan any

Out returns the output channel of the SlidingWindow operator.

func (*SlidingWindow[T]) To

func (sw *SlidingWindow[T]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*SlidingWindow[T]) Via

func (sw *SlidingWindow[T]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type ThrottleMode

type ThrottleMode int8

ThrottleMode represents Throttler's processing behavior when its element buffer overflows.

const (
	// Backpressure slows down upstream ingestion when the element buffer overflows.
	Backpressure ThrottleMode = iota

	// Discard drops incoming elements when the element buffer overflows.
	Discard
)

type Throttler

type Throttler struct {
	// contains filtered or unexported fields
}

Throttler limits the throughput to a specific number of elements per time unit.

func NewThrottler

func NewThrottler(elements int, period time.Duration, bufferSize int, mode ThrottleMode) *Throttler

NewThrottler returns a new Throttler operator.

elements is the maximum number of elements to be produced per the given period of time. bufferSize specifies the buffer size for incoming elements. mode specifies the processing behavior when the elements buffer overflows.

If elements or bufferSize are not positive, NewThrottler will panic.

func (*Throttler) In

func (th *Throttler) In() chan<- any

In returns the input channel of the Throttler operator.

func (*Throttler) Out

func (th *Throttler) Out() <-chan any

Out returns the output channel of the Throttler operator.

func (*Throttler) To

func (th *Throttler) To(sink streams.Sink)

To streams data to a specified Sink.

func (*Throttler) Via

func (th *Throttler) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

type TumblingWindow

type TumblingWindow[T any] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. T indicates the incoming element type, and the outgoing element type is []T.

func NewTumblingWindow

func NewTumblingWindow[T any](size time.Duration) *TumblingWindow[T]

NewTumblingWindow returns a new TumblingWindow operator. T specifies the incoming element type, and the outgoing element type is []T.

size is the Duration of generated windows.

func (*TumblingWindow[T]) In

func (tw *TumblingWindow[T]) In() chan<- any

In returns the input channel of the TumblingWindow operator.

func (*TumblingWindow[T]) Out

func (tw *TumblingWindow[T]) Out() <-chan any

Out returns the output channel of the TumblingWindow operator.

func (*TumblingWindow[T]) To

func (tw *TumblingWindow[T]) To(sink streams.Sink)

To streams data to a specified Sink.

func (*TumblingWindow[T]) Via

func (tw *TumblingWindow[T]) Via(flow streams.Flow) streams.Flow

Via streams data to a specified Flow and returns it.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL