flow

package
v0.2.0
Published: Nov 5, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Functions

func Transmit

func Transmit(state *ComponentState, downstream Inlet, current Outlet)

Transmit is a helper that connects the current component to the downstream component. It should be run in its own goroutine.
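The following is a minimal sketch of the forwarding pattern that Transmit describes, not the package's own code. The names `record`, `inlet`, `outlet`, `stage`, `transmit`, and `forward` are hypothetical stand-ins so the example compiles by itself. It assumes that the helper forwards every record, closes the downstream input when the upstream output is drained, and signals completion through a WaitGroup (ComponentState embeds sync.WaitGroup).

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for flow.StreamRecord.
type record struct{ data any }

// inlet and outlet mirror the shapes of flow.Inlet and flow.Outlet.
type inlet interface{ In() chan<- record }
type outlet interface{ Out() <-chan record }

// stage is a hypothetical component backed by a single channel.
type stage struct{ ch chan record }

func (s *stage) In() chan<- record  { return s.ch }
func (s *stage) Out() <-chan record { return s.ch }

// transmit forwards every record from current to downstream and then closes
// the downstream input. This behavior is an assumption for illustration.
func transmit(wg *sync.WaitGroup, downstream inlet, current outlet) {
	defer wg.Done()
	for r := range current.Out() {
		downstream.In() <- r
	}
	close(downstream.In())
}

// forward pushes items through two stages and returns what arrives downstream.
func forward(items []any) []any {
	up := &stage{ch: make(chan record)}
	down := &stage{ch: make(chan record)}

	var wg sync.WaitGroup
	wg.Add(1)
	go transmit(&wg, down, up) // runs in its own goroutine

	// Feed the upstream stage, then close it to end the forwarding loop.
	go func() {
		for _, it := range items {
			up.In() <- record{data: it}
		}
		close(up.ch)
	}()

	var got []any
	for r := range down.Out() {
		got = append(got, r.data)
	}
	wg.Wait()
	return got
}

func main() {
	fmt.Println(forward([]any{1, 2, 3})) // [1 2 3]
}
```

Running the helper in its own goroutine matters here because the channels are unbuffered: the forwarding loop blocks until the downstream side reads.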

Types

type AggregationOp

type AggregationOp interface {
	// Add takes a slice of elements as input.
	Add([]StreamRecord)
	// Snapshot takes a snapshot of the current state of the AggregationOp.
	// Taking a snapshot resets the dirty flag.
	Snapshot() interface{}
	// Dirty reports whether any item has been added since the last snapshot.
	Dirty() bool
}

AggregationOp defines the stateful operation for aggregation.
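As an illustration of the Add/Snapshot/Dirty contract, here is a hedged sketch of a counting aggregation. `record`, `aggregationOp`, `countOp`, and `newCountOp` are hypothetical names that mirror the documented shapes; they are not part of the package.

```go
package main

import "fmt"

// record is a hypothetical stand-in for flow.StreamRecord.
type record struct{ data any }

// aggregationOp mirrors the method set of flow.AggregationOp.
type aggregationOp interface {
	Add([]record)
	Snapshot() interface{}
	Dirty() bool
}

// countOp is a hypothetical aggregation that counts the records it has seen.
type countOp struct {
	count int
	dirty bool
}

// Add accumulates the batch and marks the state as dirty.
func (c *countOp) Add(rs []record) {
	if len(rs) == 0 {
		return
	}
	c.count += len(rs)
	c.dirty = true
}

// Snapshot returns the current count and clears the dirty flag.
func (c *countOp) Snapshot() interface{} {
	c.dirty = false
	return c.count
}

// Dirty reports whether Add has been called since the last Snapshot.
func (c *countOp) Dirty() bool { return c.dirty }

// newCountOp has the shape of an AggregationOpFactory.
func newCountOp() aggregationOp { return &countOp{} }

func main() {
	op := newCountOp()
	op.Add([]record{{data: "a"}, {data: "b"}})
	fmt.Println(op.Dirty())    // true
	fmt.Println(op.Snapshot()) // 2
	fmt.Println(op.Dirty())    // false
}
```

The dirty flag lets a caller skip emitting a snapshot when nothing has changed since the previous one.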

type AggregationOpFactory

type AggregationOpFactory func() AggregationOp

type Component

type Component interface {
	// Setup is the lifecycle hook for resource preparation, e.g. starting a background job that listens on the input channel.
	// It must be called before the flow starts to process elements.
	Setup(context.Context) error
	// Teardown is the lifecycle hook for shutting down the Component.
	// Implementations should ensure that all resources have been released before this method returns.
	Teardown(context.Context) error
}
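
The lifecycle contract above can be sketched as follows. `worker` and `run` are hypothetical names; the example only assumes the two documented method signatures and shows one way to make Teardown wait for the background goroutine started in Setup.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker is a hypothetical component following the Setup/Teardown contract.
type worker struct {
	in   chan int
	wg   sync.WaitGroup
	seen int
}

// Setup starts the background goroutine that drains the input channel.
func (w *worker) Setup(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	w.in = make(chan int)
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for range w.in {
			w.seen++
		}
	}()
	return nil
}

// Teardown closes the input and returns only after the goroutine has exited.
func (w *worker) Teardown(context.Context) error {
	close(w.in)
	w.wg.Wait()
	return nil
}

// run drives a worker through its lifecycle and reports how many items it saw.
func run(n int) (int, error) {
	w := &worker{}
	ctx := context.Background()
	if err := w.Setup(ctx); err != nil {
		return 0, err
	}
	for i := 0; i < n; i++ {
		w.in <- i
	}
	if err := w.Teardown(ctx); err != nil {
		return 0, err
	}
	return w.seen, nil
}

func main() {
	n, err := run(3)
	fmt.Println(n, err) // 3 <nil>
}
```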

type ComponentState

type ComponentState struct {
	sync.WaitGroup
}

type Data

type Data []any

type DedupPriorityQueue

type DedupPriorityQueue struct {
	Items []Element
	// contains filtered or unexported fields
}

DedupPriorityQueue implements heap.Interface. It is not thread-safe.

func NewPriorityQueue

func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupPriorityQueue

func (*DedupPriorityQueue) Len

func (pq *DedupPriorityQueue) Len() int

Len returns the DedupPriorityQueue length.

func (*DedupPriorityQueue) Less

func (pq *DedupPriorityQueue) Less(i, j int) bool

Less reports whether the item at index i sorts before the item at index j.

func (*DedupPriorityQueue) Peek

func (pq *DedupPriorityQueue) Peek() Element

Peek returns the first item of the DedupPriorityQueue without removing it.

func (*DedupPriorityQueue) Pop

func (pq *DedupPriorityQueue) Pop() interface{}

Pop implements heap.Interface.Pop. It removes and returns the element at index Len() - 1.

func (*DedupPriorityQueue) Push

func (pq *DedupPriorityQueue) Push(x interface{})

Push implements heap.Interface.Push. Appends an item to the DedupPriorityQueue.

func (*DedupPriorityQueue) ReplaceLowest

func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element)

func (*DedupPriorityQueue) Swap

func (pq *DedupPriorityQueue) Swap(i, j int)

Swap exchanges the items at indexes i and j.

func (*DedupPriorityQueue) Values

func (pq *DedupPriorityQueue) Values() []Element

type Element

type Element interface {
	GetIndex() int
	SetIndex(int)
}

Element represents an item in the DedupPriorityQueue.
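To show how an index-tracking element works with container/heap, here is a simplified sketch. `element`, `intItem`, `indexedQueue`, and `drain` are hypothetical; the queue omits deduplication and the comparator that NewPriorityQueue accepts, so it illustrates only the heap.Interface plumbing rather than DedupPriorityQueue itself.

```go
package main

import (
	"container/heap"
	"fmt"
)

// element mirrors flow.Element: each item tracks its own heap index.
type element interface {
	GetIndex() int
	SetIndex(int)
}

// intItem is a hypothetical element ordered by its value.
type intItem struct {
	val   int
	index int
}

func (it *intItem) GetIndex() int  { return it.index }
func (it *intItem) SetIndex(i int) { it.index = i }

// indexedQueue is a simplified stand-in without deduplication.
type indexedQueue struct{ items []element }

func (q *indexedQueue) Len() int { return len(q.items) }

func (q *indexedQueue) Less(i, j int) bool {
	return q.items[i].(*intItem).val < q.items[j].(*intItem).val
}

// Swap exchanges two items and keeps their stored indexes in sync.
func (q *indexedQueue) Swap(i, j int) {
	q.items[i], q.items[j] = q.items[j], q.items[i]
	q.items[i].SetIndex(i)
	q.items[j].SetIndex(j)
}

// Push appends an item; callers should use heap.Push instead.
func (q *indexedQueue) Push(x interface{}) {
	it := x.(element)
	it.SetIndex(len(q.items))
	q.items = append(q.items, it)
}

// Pop removes the last item; callers should use heap.Pop instead.
func (q *indexedQueue) Pop() interface{} {
	n := len(q.items)
	it := q.items[n-1]
	q.items[n-1] = nil
	q.items = q.items[:n-1]
	return it
}

// drain pushes the values and pops them back in ascending order.
func drain(vals ...int) []int {
	q := &indexedQueue{}
	for _, v := range vals {
		heap.Push(q, &intItem{val: v})
	}
	out := make([]int, 0, len(vals))
	for q.Len() > 0 {
		out = append(out, heap.Pop(q).(*intItem).val)
	}
	return out
}

func main() {
	fmt.Println(drain(5, 1, 3)) // [1 3 5]
}
```

With any heap.Interface implementation, the Push and Pop methods operate on the end of the slice; ordering is maintained only when they are called through heap.Push and heap.Pop.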

type Flow

type Flow interface {
	io.Closer
	// Filter is used to filter data.
	// The argument can be either a predicate function for streaming
	// or conditions for a batch query.
	Filter(UnaryOperation[bool]) Flow
	// Map is used to transform data
	Map(UnaryOperation[any]) Flow
	// Window is used to split infinite data into "buckets" of finite size.
	// Currently, it is only applicable to streaming context.
	Window(WindowAssigner) WindowedFlow
	// To pipes data to the given sink
	To(sink Sink) Flow
	// Open opens the flow in async mode for streaming scenarios.
	// It returns a channel on which errors are delivered asynchronously.
	Open() <-chan error
}

Flow is an abstraction of data flow for both streaming and batch processing.

type Inlet

type Inlet interface {
	In() chan<- StreamRecord
}

Inlet represents a type that exposes one open input.

type Operator

type Operator interface {
	Inlet
	Outlet
	Component
	Exec(downstream Inlet)
}

Operator represents a set of stream processing steps that has one open input and one open output.

type Outlet

type Outlet interface {
	Out() <-chan StreamRecord
}

Outlet represents a type that exposes one open output.

type Sink

type Sink interface {
	Inlet
	Component
}

Sink represents a set of stream processing steps that has one open input.

type Source

type Source interface {
	Outlet
	Component
	Exec(downstream Inlet)
}

Source represents a set of stream processing steps that has one open output.

type StreamRecord

type StreamRecord struct {
	// contains filtered or unexported fields
}

StreamRecord is a container that wraps user data and a timestamp. It is the underlying transmission medium for stream processing.
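The sketch below illustrates the idea of a record that pairs data with a millisecond timestamp. The real fields are unexported and not shown here, so `record`, `newRecord`, and `withNewData` are hypothetical stand-ins; in particular, keeping the timestamp when the data is replaced is an assumption about how a method like WithNewData might behave.

```go
package main

import "fmt"

// record is a hypothetical stand-in for flow.StreamRecord.
type record struct {
	data any
	ts   int64 // milliseconds
}

// newRecord pairs user data with a timestamp in milliseconds.
func newRecord(data any, ts int64) record { return record{data: data, ts: ts} }

// Data returns the wrapped user data.
func (r record) Data() any { return r.data }

// TimestampMillis returns the timestamp in milliseconds.
func (r record) TimestampMillis() int64 { return r.ts }

// withNewData returns a copy carrying new data.
// Keeping the original timestamp is an assumption made for this sketch.
func (r record) withNewData(data any) record {
	return record{data: data, ts: r.ts}
}

func main() {
	r := newRecord("raw", 1667606400000)
	m := r.withNewData(len("raw"))
	fmt.Println(m.Data(), m.TimestampMillis())
}
```

Because the record is passed by value, a transformation step can derive a new record without mutating the one it received.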

func NewStreamRecord

func NewStreamRecord(data interface{}, ts int64) StreamRecord

func NewStreamRecordWithTimestampPb

func NewStreamRecordWithTimestampPb(data interface{}, timestamp *timestamppb.Timestamp) StreamRecord

func NewStreamRecordWithoutTS

func NewStreamRecordWithoutTS(data interface{}) StreamRecord

func TryExactTimestamp

func TryExactTimestamp(item any) StreamRecord

func (StreamRecord) Data

func (sr StreamRecord) Data() interface{}

func (StreamRecord) TimestampMillis

func (sr StreamRecord) TimestampMillis() int64

func (StreamRecord) WithNewData

func (sr StreamRecord) WithNewData(data interface{}) StreamRecord

type UnaryFunc

type UnaryFunc[R any] func(context.Context, interface{}) R

UnaryFunc is a function type, func(context.Context, interface{}) R, that implements UnaryOperation.

func (UnaryFunc[R]) Apply

func (f UnaryFunc[R]) Apply(ctx context.Context, data interface{}) R

Apply implements the UnaryOperation.Apply method.

type UnaryOperation

type UnaryOperation[R any] interface {
	Apply(ctx context.Context, data interface{}) R
}

UnaryOperation represents a user-defined unary function (e.g. for Map or Filter).

func FilterFunc

func FilterFunc(filter UnaryOperation[bool]) (UnaryOperation[any], error)

FilterFunc transforms a UnaryOperation[bool] into a UnaryOperation[any].

type Window

type Window interface {
	// MaxTimestamp returns the upper bound of the Window.
	// Unit: Millisecond
	MaxTimestamp() int64
}

Window is a bucket of elements with a finite size. Currently, timedWindow is the only implementation.

type WindowAssigner

type WindowAssigner interface {
	// AssignWindows assigns a slice of Windows according to the given timestamp, e.g. the event time.
	// The unit of the timestamp is milliseconds.
	AssignWindows(timestamp int64) ([]Window, error)
}

WindowAssigner is used to assign Window(s) for a given timestamp, and thus it can create a WindowedFlow.
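As a hedged example of what an assigner might look like, the sketch below implements fixed-size, non-overlapping (tumbling) windows. `window`, `timeWindow`, and `tumblingAssigner` are hypothetical names, and treating the upper bound as the last millisecond inside a half-open window is an assumption; the package's timedWindow may define it differently.

```go
package main

import (
	"errors"
	"fmt"
)

// window mirrors flow.Window.
type window interface{ MaxTimestamp() int64 }

// timeWindow is a hypothetical half-open window [start, end) in milliseconds.
type timeWindow struct{ start, end int64 }

// MaxTimestamp returns the last millisecond that still belongs to the window.
// This convention is an assumption made for the sketch.
func (w timeWindow) MaxTimestamp() int64 { return w.end - 1 }

// tumblingAssigner is a hypothetical assigner with fixed-size windows.
type tumblingAssigner struct{ sizeMillis int64 }

// AssignWindows returns the single window that contains the timestamp.
func (a tumblingAssigner) AssignWindows(timestamp int64) ([]window, error) {
	if a.sizeMillis <= 0 {
		return nil, errors.New("window size must be positive")
	}
	if timestamp < 0 {
		return nil, errors.New("timestamp must not be negative")
	}
	start := timestamp - timestamp%a.sizeMillis
	return []window{timeWindow{start: start, end: start + a.sizeMillis}}, nil
}

func main() {
	a := tumblingAssigner{sizeMillis: 60_000} // one-minute windows
	ws, err := a.AssignWindows(125_000)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ws[0].MaxTimestamp()) // 179999
}
```

The method returns a slice because other strategies, such as sliding windows, can place one timestamp in several windows at once.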

type WindowedFlow

type WindowedFlow interface {
	AllowedMaxWindows(windowCnt int) WindowedFlow
	// TopN applies a TopNAggregation to each Window.
	TopN(topNum int, opts ...any) Flow
}

WindowedFlow is a flow that processes incoming elements in windows. A WindowedFlow can be created with a WindowAssigner.
