Documentation ¶
Overview ¶
Package flow provides streams.Flow implementations.
Index ¶
- func DoStream(outlet streams.Outlet, inlet streams.Inlet)
- func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
- func Flatten[T any](parallelism int) streams.Flow
- func Merge(outlets ...streams.Flow) streams.Flow
- func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow
- func Split[T any](outlet streams.Outlet, predicate func(T) bool) [2]streams.Flow
- type Batch
- type Filter
- type FilterPredicate
- type FlatMap
- type FlatMapFunction
- type Item
- type Map
- type MapFunction
- type PassThrough
- type PriorityQueue
- func (pq *PriorityQueue) Head() *Item
- func (pq PriorityQueue) Len() int
- func (pq PriorityQueue) Less(i, j int) bool
- func (pq *PriorityQueue) Pop() any
- func (pq *PriorityQueue) Push(x any)
- func (pq PriorityQueue) Slice(start, end int) PriorityQueue
- func (pq PriorityQueue) Swap(i, j int)
- func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
- type Reduce
- type ReduceFunction
- type SessionWindow
- type SlidingWindow
- type ThrottleMode
- type Throttler
- type TumblingWindow
Functions ¶
func DoStream ¶
func DoStream(outlet streams.Outlet, inlet streams.Inlet)
DoStream streams data from the outlet to inlet.
func FanOut ¶
func FanOut(outlet streams.Outlet, magnitude int) []streams.Flow
FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.
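As a rough illustration of the fan-out idea, the sketch below copies every element of one channel to several output channels using only the standard library. The names fanOut and collectFanOut are hypothetical and this is not the package's implementation; it only shows why each downstream consumer sees the full stream.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every element read from in to each of n output channels.
// Hypothetical stdlib-only helper, not the package's FanOut.
func fanOut(in <-chan any, n int) []chan any {
	outs := make([]chan any, n)
	for i := range outs {
		outs[i] = make(chan any)
	}
	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return outs
}

// collectFanOut feeds values through fanOut and drains all outputs
// concurrently, since a slow consumer would otherwise block the others.
func collectFanOut(values []any, n int) [][]any {
	in := make(chan any)
	outs := fanOut(in, n)
	results := make([][]any, n)
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(i int, out <-chan any) {
			defer wg.Done()
			for v := range out {
				results[i] = append(results[i], v)
			}
		}(i, out)
	}
	for _, v := range values {
		in <- v
	}
	close(in)
	wg.Wait()
	return results
}

func main() {
	fmt.Println(collectFanOut([]any{1, 2, 3}, 2)) // [[1 2 3] [1 2 3]]
}
```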
func Flatten ¶ added in v0.8.0
func Flatten[T any](parallelism int) streams.Flow
Flatten creates a Flow that flattens a stream of slices. T specifies the outgoing element type, and the incoming element type is []T.
func Merge ¶
func Merge(outlets ...streams.Flow) streams.Flow
Merge merges multiple flows into a single flow.
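The merging idea can be sketched with plain channels: one goroutine per input forwards elements to a shared output, which is closed once all inputs are drained. The helpers merge, emit, and mergedSorted are hypothetical names, and the sketch makes no claim about how the package implements Merge.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge forwards every element from all inputs to one output channel
// and closes the output once every input has been drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a closed, pre-filled channel holding the given values.
func emit(values ...int) <-chan int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	return ch
}

// mergedSorted merges two sample inputs and sorts the result, because the
// interleaving of merged elements is not deterministic.
func mergedSorted() []int {
	var got []int
	for v := range merge(emit(1, 3, 5), emit(2, 4)) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(mergedSorted()) // [1 2 3 4 5]
}
```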
func RoundRobin ¶ added in v0.8.0
func RoundRobin(outlet streams.Outlet, magnitude int) []streams.Flow
RoundRobin creates a balanced number of flows from the single outlet. This can be useful when work can be parallelized across multiple cores.
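In contrast to a fan-out, a balanced split hands each element to exactly one flow. The sketch below assumes a strict rotation (element k goes to bucket k mod n); the package may distribute elements differently, and distribute is a hypothetical helper used only to show the balancing effect.

```go
package main

import "fmt"

// distribute assigns element k to bucket k % n, so bucket sizes differ by
// at most one. Strict rotation is an assumption made for this sketch.
func distribute[T any](values []T, n int) [][]T {
	buckets := make([][]T, n)
	for k, v := range values {
		buckets[k%n] = append(buckets[k%n], v)
	}
	return buckets
}

func main() {
	fmt.Println(distribute([]int{1, 2, 3, 4, 5}, 2)) // [[1 3 5] [2 4]]
}
```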
Types ¶
type Batch ¶ added in v0.10.0
type Batch[T any] struct {
	// contains filtered or unexported fields
}
Batch processor breaks a stream of elements into batches based on size or timing. A new batch is emitted when the maximum batch size is reached or the batch interval has elapsed, provided the current buffer is not empty. Note: once a batch is sent downstream, the timer is reset. T indicates the incoming element type, and the outgoing element type is []T.
func NewBatch ¶ added in v0.10.0
NewBatch returns a new Batch operator using the specified maximum batch size and the time interval. T specifies the incoming element type, and the outgoing element type is []T. NewBatch will panic if the maxBatchSize argument is not positive.
type Filter ¶
type Filter[T any] struct {
	// contains filtered or unexported fields
}
Filter filters incoming elements using a filter predicate. If an element matches the predicate, the element is passed downstream. If not, the element is discarded.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ -------- FilterPredicate -------- ]
out -- 1 -- 2 ------------------ 5 --
func NewFilter ¶
func NewFilter[T any](filterPredicate FilterPredicate[T], parallelism int) *Filter[T]
NewFilter returns a new Filter operator. T specifies the incoming and the outgoing element type.
filterPredicate is the boolean-valued filter function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1. If the parallelism argument is not positive, NewFilter will panic.
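To make the predicate semantics concrete, here is a small channel-based sketch that reproduces the diagram above with a single worker. The helpers filter and filterSlice are hypothetical; the sketch does not use or describe the package's internals.

```go
package main

import "fmt"

// filter forwards only the elements for which keep returns true.
// A single goroutine does the work, so input order is preserved.
func filter[T any](in <-chan T, keep func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// filterSlice runs filter over a slice and collects the result.
func filterSlice[T any](values []T, keep func(T) bool) []T {
	in := make(chan T, len(values))
	for _, v := range values {
		in <- v
	}
	close(in)
	var got []T
	for v := range filter[T](in, keep) {
		got = append(got, v)
	}
	return got
}

func main() {
	// Mirrors the diagram: 3 and 4 are discarded.
	keep := func(n int) bool { return n != 3 && n != 4 }
	fmt.Println(filterSlice([]int{1, 2, 3, 4, 5}, keep)) // [1 2 5]
}
```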
type FilterPredicate ¶ added in v0.7.0
type FilterPredicate[T any] func(T) bool
FilterPredicate represents a filter predicate (boolean-valued function).
type FlatMap ¶
type FlatMap[T, R any] struct {
	// contains filtered or unexported fields
}
FlatMap takes one element and produces zero, one, or more elements.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ -------- FlatMapFunction -------- ]
out -- 1' - 2' -------- 4'- 4" - 5' -
func NewFlatMap ¶
func NewFlatMap[T, R any](flatMapFunction FlatMapFunction[T, R], parallelism int) *FlatMap[T, R]
NewFlatMap returns a new FlatMap operator. T specifies the incoming element type. flatMapFunction returns a []R, and each element of the returned slice is emitted downstream.
flatMapFunction is the FlatMap transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1. If the parallelism argument is not positive, NewFlatMap will panic.
type FlatMapFunction ¶ added in v0.7.0
type FlatMapFunction[T, R any] func(T) []R
FlatMapFunction represents a FlatMap transformation function.
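A function of this shape returns a slice per input, and the operator emits the slice's items one by one. The sketch below illustrates that with strings.Fields, which already has the form func(string) []string; flatMap and words are hypothetical helpers, not the package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// flatMap applies fn to each element and emits every item of the returned
// slice individually; an empty slice emits nothing for that element.
func flatMap[T, R any](in <-chan T, fn func(T) []R) <-chan R {
	out := make(chan R)
	go func() {
		defer close(out)
		for v := range in {
			for _, r := range fn(v) {
				out <- r
			}
		}
	}()
	return out
}

// words splits each line into whitespace-separated words.
func words(lines []string) []string {
	in := make(chan string, len(lines))
	for _, l := range lines {
		in <- l
	}
	close(in)
	var got []string
	for w := range flatMap[string, string](in, strings.Fields) {
		got = append(got, w)
	}
	return got
}

func main() {
	// The empty line produces zero elements.
	fmt.Println(words([]string{"a b", "", "c"})) // [a b c]
}
```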
type Item ¶
type Item struct {
	Msg any
	// contains filtered or unexported fields
}
Item represents a PriorityQueue item.
type Map ¶
type Map[T, R any] struct {
	// contains filtered or unexported fields
}
Map takes one element and produces one element.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ ---------- MapFunction ---------- ]
out -- 1' - 2' --- 3' - 4' ----- 5' -
func NewMap ¶
func NewMap[T, R any](mapFunction MapFunction[T, R], parallelism int) *Map[T, R]
NewMap returns a new Map operator. T specifies the incoming element type, and the outgoing element type is R.
mapFunction is the Map transformation function. parallelism is the flow parallelism factor. If the order of events matters, use parallelism = 1. If the parallelism argument is not positive, NewMap will panic.
type MapFunction ¶ added in v0.7.0
type MapFunction[T, R any] func(T) R
MapFunction represents a Map transformation function.
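The advice to use parallelism = 1 when order matters can be illustrated with a worker-pool sketch: several goroutines read from the same input, so results may be emitted out of order. The names parallelMap and squares are hypothetical, and the worker-pool design is an assumption for illustration rather than a description of the package.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// parallelMap applies fn to every element using the given number of workers.
// With more than one worker, results may arrive in a different order than
// the input.
func parallelMap[T, R any](in <-chan T, fn func(T) R, parallelism int) <-chan R {
	if parallelism < 1 {
		panic("parallelism must be positive")
	}
	out := make(chan R)
	var wg sync.WaitGroup
	for i := 0; i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squares maps 1..5 to their squares with the given parallelism.
func squares(parallelism int) []int {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)
	var got []int
	square := func(n int) int { return n * n }
	for v := range parallelMap[int, int](in, square, parallelism) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(squares(1)) // [1 4 9 16 25]: one worker keeps input order
	got := squares(4)
	sort.Ints(got) // order is not guaranteed with several workers
	fmt.Println(got) // [1 4 9 16 25]
}
```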
type PassThrough ¶
type PassThrough struct {
	// contains filtered or unexported fields
}
PassThrough retransmits incoming elements downstream as they are.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
out -- 1 -- 2 ---- 3 -- 4 ------ 5 --
func NewPassThrough ¶
func NewPassThrough() *PassThrough
NewPassThrough returns a new PassThrough operator.
func (*PassThrough) In ¶
func (pt *PassThrough) In() chan<- any
In returns the input channel of the PassThrough operator.
func (*PassThrough) Out ¶
func (pt *PassThrough) Out() <-chan any
Out returns the output channel of the PassThrough operator.
func (*PassThrough) To ¶
func (pt *PassThrough) To(sink streams.Sink)
To streams data to a specified Sink.
func (*PassThrough) Via ¶
func (pt *PassThrough) Via(flow streams.Flow) streams.Flow
Via streams data to a specified Flow and returns it.
type PriorityQueue ¶
type PriorityQueue []*Item
PriorityQueue implements heap.Interface.
func (*PriorityQueue) Head ¶
func (pq *PriorityQueue) Head() *Item
Head returns the first item of the PriorityQueue without removing it.
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
Less reports whether the item at index i must sort before the item at index j.
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() any
Pop implements heap.Interface.Pop. Removes and returns the Len() - 1 element.
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x any)
Push implements heap.Interface.Push. Appends an item to the PriorityQueue.
func (PriorityQueue) Slice ¶
func (pq PriorityQueue) Slice(start, end int) PriorityQueue
Slice returns a sliced PriorityQueue using the given bounds.
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
Swap swaps the items at indexes i and j.
func (*PriorityQueue) Update ¶
func (pq *PriorityQueue) Update(item *Item, newEpoch int64)
Update sets item priority and calls heap.Fix to re-establish the heap ordering.
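The general pattern behind a heap.Interface priority queue with an update operation can be sketched with the standard container/heap package. The entry and queue types below are hypothetical stand-ins, not the package's Item and PriorityQueue; ordering by the smallest epoch is an assumption of the sketch.

```go
package main

import (
	"container/heap"
	"fmt"
)

// entry is a hypothetical queue item ordered by its epoch.
type entry struct {
	msg   any
	epoch int64
	index int // position in the heap, maintained by Push and Swap
}

// queue is a min-heap of entries; the smallest epoch sits at index 0.
type queue []*entry

func (q queue) Len() int           { return len(q) }
func (q queue) Less(i, j int) bool { return q[i].epoch < q[j].epoch }
func (q queue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index = i
	q[j].index = j
}

// Push appends an entry; heap.Push then restores the heap ordering.
func (q *queue) Push(x any) {
	e := x.(*entry)
	e.index = len(*q)
	*q = append(*q, e)
}

// Pop removes and returns the last element, as heap.Interface requires.
func (q *queue) Pop() any {
	old := *q
	n := len(old)
	e := old[n-1]
	old[n-1] = nil
	*q = old[:n-1]
	return e
}

// update changes an entry's epoch and calls heap.Fix to restore ordering.
func (q *queue) update(e *entry, epoch int64) {
	e.epoch = epoch
	heap.Fix(q, e.index)
}

// drainOrder builds a small queue, updates one entry, and pops everything.
func drainOrder() []string {
	q := &queue{}
	a := &entry{msg: "a", epoch: 30}
	heap.Push(q, a)
	heap.Push(q, &entry{msg: "b", epoch: 10})
	heap.Push(q, &entry{msg: "c", epoch: 20})
	q.update(a, 5) // "a" now has the smallest epoch
	var order []string
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(*entry).msg.(string))
	}
	return order
}

func main() {
	fmt.Println(drainOrder()) // [a b c]
}
```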
type Reduce ¶ added in v0.8.0
type Reduce[T any] struct {
	// contains filtered or unexported fields
}
Reduce represents a “rolling” reduce on a data stream. Combines the current element with the last reduced value and emits the new value.
in  -- 1 -- 2 ---- 3 -- 4 ------ 5 --
[ --------- ReduceFunction --------- ]
out -- 1 -- 2' --- 3' - 4' ----- 5' -
func NewReduce ¶ added in v0.8.0
func NewReduce[T any](reduceFunction ReduceFunction[T]) *Reduce[T]
NewReduce returns a new Reduce operator. T specifies the incoming and the outgoing element type.
reduceFunction combines the current element with the last reduced value.
type ReduceFunction ¶ added in v0.8.0
type ReduceFunction[T any] func(T, T) T
ReduceFunction combines the current element with the last reduced value.
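A rolling reduce emits one value per input: the first element as is, then the running combination. The sketch below shows this with a running sum; reduce and runningSum are hypothetical helpers, and passing the last reduced value as the first argument is an assumption about argument order.

```go
package main

import "fmt"

// reduce emits the first element unchanged and, for every later element,
// the result of combining the last reduced value with the current one.
func reduce[T any](in <-chan T, fn func(acc, cur T) T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var acc T
		first := true
		for v := range in {
			if first {
				acc, first = v, false
			} else {
				acc = fn(acc, v)
			}
			out <- acc
		}
	}()
	return out
}

// runningSum returns the rolling sum of the given values.
func runningSum(values []int) []int {
	in := make(chan int, len(values))
	for _, v := range values {
		in <- v
	}
	close(in)
	var got []int
	add := func(acc, cur int) int { return acc + cur }
	for v := range reduce[int](in, add) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runningSum([]int{1, 2, 3, 4, 5})) // [1 3 6 10 15]
}
```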
type SessionWindow ¶ added in v0.7.0
SessionWindow generates groups of elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time. T indicates the incoming element type, and the outgoing element type is []T.
func NewSessionWindow ¶ added in v0.7.0
func NewSessionWindow[T any](inactivityGap time.Duration) *SessionWindow[T]
NewSessionWindow returns a new SessionWindow operator. T specifies the incoming element type, and the outgoing element type is []T.
inactivityGap is the period of inactivity after which the current session window closes.
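The effect of an inactivity gap can be shown offline on a sorted list of timestamps: a new session starts whenever the distance to the previous element reaches the gap. This is only an illustration of the grouping rule; sessions is a hypothetical helper, it works on explicit timestamps rather than arrival time, and treating a distance equal to the gap as a session boundary is an assumption.

```go
package main

import "fmt"

// sessions groups sorted timestamps into sessions. A new session starts
// when the distance to the previous timestamp is at least gap.
func sessions(timestamps []int64, gap int64) [][]int64 {
	var out [][]int64
	for i, t := range timestamps {
		if i == 0 || t-timestamps[i-1] >= gap {
			out = append(out, nil) // open a new session
		}
		last := len(out) - 1
		out[last] = append(out[last], t)
	}
	return out
}

func main() {
	fmt.Println(sessions([]int64{0, 1, 2, 10, 11, 30}, 5)) // [[0 1 2] [10 11] [30]]
}
```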
func (*SessionWindow[T]) In ¶ added in v0.7.0
func (sw *SessionWindow[T]) In() chan<- any
In returns the input channel of the SessionWindow operator.
func (*SessionWindow[T]) Out ¶ added in v0.7.0
func (sw *SessionWindow[T]) Out() <-chan any
Out returns the output channel of the SessionWindow operator.
func (*SessionWindow[T]) To ¶ added in v0.7.0
func (sw *SessionWindow[T]) To(sink streams.Sink)
To streams data to a specified Sink.
func (*SessionWindow[T]) Via ¶ added in v0.7.0
func (sw *SessionWindow[T]) Via(flow streams.Flow) streams.Flow
Via streams data to a specified Flow and returns it.
type SlidingWindow ¶
SlidingWindow assigns elements to windows of fixed length configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows. T indicates the incoming element type, and the outgoing element type is []T.
func NewSlidingWindow ¶
func NewSlidingWindow[T any](windowSize time.Duration, slidingInterval time.Duration) *SlidingWindow[T]
NewSlidingWindow returns a new SlidingWindow operator based on processing time. Processing time refers to the system time of the machine that is executing the respective operation. T specifies the incoming element type, and the outgoing element type is []T.
windowSize is the Duration of generated windows. slidingInterval is the sliding interval of generated windows.
NewSlidingWindow panics if slidingInterval is larger than windowSize.
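The relationship between window size and slide can be worked through numerically. The sketch below lists the start times of all windows that contain a given timestamp, assuming windows are aligned to multiples of the slide and timestamps are non-negative; the package may align windows differently, and windowStarts is a hypothetical helper.

```go
package main

import "fmt"

// windowStarts returns, in ascending order, the start of every window
// [s, s+size) that contains t, where s is a non-negative multiple of slide.
func windowStarts(t, size, slide int64) []int64 {
	if slide > size {
		panic("slide must not be larger than the window size")
	}
	var starts []int64
	last := t - t%slide // start of the most recent window containing t
	for s := last; s > t-size && s >= 0; s -= slide {
		starts = append([]int64{s}, starts...)
	}
	return starts
}

func main() {
	// size 10, slide 5: windows overlap, so t=12 falls into two of them.
	fmt.Println(windowStarts(12, 10, 5)) // [5 10]
	// slide equal to size: windows do not overlap.
	fmt.Println(windowStarts(12, 10, 10)) // [10]
}
```

With a slide smaller than the size, each element lands in roughly size/slide windows, which is the overlap described above.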
func NewSlidingWindowWithExtractor ¶ added in v0.10.0
func NewSlidingWindowWithExtractor[T any](windowSize time.Duration, slidingInterval time.Duration, timestampExtractor func(T) int64) *SlidingWindow[T]
NewSlidingWindowWithExtractor returns a new SlidingWindow operator based on event time. Event time is the time at which each individual event occurred on its producing device. It gives correct results on out-of-order events, late events, or on replays of data. T specifies the incoming element type, and the outgoing element type is []T.
windowSize is the Duration of generated windows. slidingInterval is the sliding interval of generated windows. timestampExtractor extracts the record timestamp in nanoseconds.
NewSlidingWindowWithExtractor panics if slidingInterval is larger than windowSize.
func (*SlidingWindow[T]) In ¶
func (sw *SlidingWindow[T]) In() chan<- any
In returns the input channel of the SlidingWindow operator.
func (*SlidingWindow[T]) Out ¶
func (sw *SlidingWindow[T]) Out() <-chan any
Out returns the output channel of the SlidingWindow operator.
func (*SlidingWindow[T]) To ¶
func (sw *SlidingWindow[T]) To(sink streams.Sink)
To streams data to a specified Sink.
func (*SlidingWindow[T]) Via ¶
func (sw *SlidingWindow[T]) Via(flow streams.Flow) streams.Flow
Via streams data to a specified Flow and returns it.
type ThrottleMode ¶
type ThrottleMode int8
ThrottleMode represents Throttler's processing behavior when its element buffer overflows.
const (
	// Backpressure slows down upstream ingestion when the element buffer overflows.
	Backpressure ThrottleMode = iota
	// Discard drops incoming elements when the element buffer overflows.
	Discard
)
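The difference between the two modes can be sketched with a buffered channel: a blocking send makes the producer wait for room, while a select with a default branch drops the element when the buffer is full. The names mode, offer, and countAccepted are hypothetical, and the sketch is not the Throttler's implementation.

```go
package main

import "fmt"

// mode is a hypothetical stand-in for the two overflow behaviors.
type mode int8

const (
	backpressure mode = iota // block the producer until there is room
	discard                  // drop the element when the buffer is full
)

// offer tries to place v into buf and reports whether it was accepted.
func offer(buf chan<- int, v int, m mode) bool {
	if m == discard {
		select {
		case buf <- v:
			return true
		default:
			return false // buffer full: the element is dropped
		}
	}
	buf <- v // blocks until a consumer frees a slot
	return true
}

// countAccepted offers n elements to a buffer of the given size.
func countAccepted(m mode, bufferSize, n int) int {
	buf := make(chan int, bufferSize)
	if m == backpressure {
		// A consumer must drain the buffer, otherwise the producer
		// would block forever once the buffer is full.
		go func() {
			for range buf {
			}
		}()
	}
	accepted := 0
	for i := 0; i < n; i++ {
		if offer(buf, i, m) {
			accepted++
		}
	}
	close(buf)
	return accepted
}

func main() {
	fmt.Println(countAccepted(discard, 2, 5))      // 2: nothing drains the buffer
	fmt.Println(countAccepted(backpressure, 2, 5)) // 5: the producer waits instead
}
```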
type Throttler ¶
type Throttler struct {
	// contains filtered or unexported fields
}
Throttler limits the throughput to a specific number of elements per time unit.
func NewThrottler ¶
NewThrottler returns a new Throttler operator.
elements is the maximum number of elements to be produced per given period of time. bufferSize specifies the buffer size for incoming elements. mode specifies the processing behavior when the element buffer overflows.
If elements or bufferSize are not positive, NewThrottler will panic.
type TumblingWindow ¶
TumblingWindow assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. T indicates the incoming element type, and the outgoing element type is []T.
func NewTumblingWindow ¶
func NewTumblingWindow[T any](size time.Duration) *TumblingWindow[T]
NewTumblingWindow returns a new TumblingWindow operator. T specifies the incoming element type, and the outgoing element type is []T.
size is the Duration of generated windows.
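Tumbling windows assign each element to exactly one window. The sketch below groups explicit timestamps by window start, assuming windows aligned to multiples of size and non-negative timestamps; tumble is a hypothetical helper, and the operator itself may derive window boundaries from arrival time instead.

```go
package main

import "fmt"

// tumble groups timestamps by the start of the fixed-size window
// [start, start+size) that contains them.
func tumble(timestamps []int64, size int64) map[int64][]int64 {
	windows := make(map[int64][]int64)
	for _, t := range timestamps {
		start := t - t%size
		windows[start] = append(windows[start], t)
	}
	return windows
}

func main() {
	fmt.Println(tumble([]int64{1, 4, 12, 19, 25}, 10)) // map[0:[1 4] 10:[12 19] 20:[25]]
}
```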
func (*TumblingWindow[T]) In ¶
func (tw *TumblingWindow[T]) In() chan<- any
In returns the input channel of the TumblingWindow operator.
func (*TumblingWindow[T]) Out ¶
func (tw *TumblingWindow[T]) Out() <-chan any
Out returns the output channel of the TumblingWindow operator.
func (*TumblingWindow[T]) To ¶
func (tw *TumblingWindow[T]) To(sink streams.Sink)
To streams data to a specified Sink.
func (*TumblingWindow[T]) Via ¶
func (tw *TumblingWindow[T]) Via(flow streams.Flow) streams.Flow
Via streams data to a specified Flow and returns it.