Documentation
¶
Index ¶
- func Transmit(state *ComponentState, downstream Inlet, current Outlet)
- type AggregationOp
- type AggregationOpFactory
- type Component
- type ComponentState
- type Data
- type DedupPriorityQueue
- func (pq *DedupPriorityQueue) Len() int
- func (pq *DedupPriorityQueue) Less(i, j int) bool
- func (pq *DedupPriorityQueue) Peek() Element
- func (pq *DedupPriorityQueue) Pop() interface{}
- func (pq *DedupPriorityQueue) Push(x interface{})
- func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element)
- func (pq *DedupPriorityQueue) Swap(i, j int)
- func (pq *DedupPriorityQueue) Values() []Element
- type Element
- type Flow
- type Inlet
- type Operator
- type Outlet
- type Sink
- type Source
- type StreamRecord
- type UnaryFunc
- type UnaryOperation
- type Window
- type WindowAssigner
- type WindowedFlow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Transmit ¶
func Transmit(state *ComponentState, downstream Inlet, current Outlet)
Transmit provides a helper function to connect the current component with the downstream component. It should be run in another goroutine.
Types ¶
type AggregationOp ¶
type AggregationOp interface { // Add puts a slice of elements as the input Add([]StreamRecord) // Snapshot takes a snapshot of the current state of the AggregationOp // Taking a snapshot will restore the dirty flag Snapshot() interface{} // Dirty flag means if any new item is added after the last snapshot Dirty() bool }
AggregationOp defines the stateful operation for aggregation.
type AggregationOpFactory ¶
type AggregationOpFactory func() AggregationOp
type Component ¶
type Component interface { // Setup is the lifecycle hook for resource preparation, e.g. start background job for listening input channel. // It must be called before the flow starts to process elements. Setup(context.Context) error // Teardown is the lifecycle hook for shutting down the Component // Implementation should ENSURE that all resource has been correctly recycled before this method returns. Teardown(context.Context) error }
type ComponentState ¶
type DedupPriorityQueue ¶
type DedupPriorityQueue struct { Items []Element // contains filtered or unexported fields }
DedupPriorityQueue implements heap.Interface. DedupPriorityQueue is not thread-safe
func NewPriorityQueue ¶
func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupPriorityQueue
func (*DedupPriorityQueue) Len ¶
func (pq *DedupPriorityQueue) Len() int
Len returns the DedupPriorityQueue length.
func (*DedupPriorityQueue) Less ¶
func (pq *DedupPriorityQueue) Less(i, j int) bool
Less is the items less comparator.
func (*DedupPriorityQueue) Peek ¶
func (pq *DedupPriorityQueue) Peek() Element
Peek returns the first item of the DedupPriorityQueue without removing it.
func (*DedupPriorityQueue) Pop ¶
func (pq *DedupPriorityQueue) Pop() interface{}
Pop implements heap.Interface.Pop. Removes and returns the Len() - 1 element.
func (*DedupPriorityQueue) Push ¶
func (pq *DedupPriorityQueue) Push(x interface{})
Push implements heap.Interface.Push. Appends an item to the DedupPriorityQueue.
func (*DedupPriorityQueue) ReplaceLowest ¶
func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element)
func (*DedupPriorityQueue) Swap ¶
func (pq *DedupPriorityQueue) Swap(i, j int)
Swap exchanges indexes of the items.
func (*DedupPriorityQueue) Values ¶
func (pq *DedupPriorityQueue) Values() []Element
type Flow ¶
type Flow interface { io.Closer // Filter is used to filter data. // The parameter f can be either predicate function for streaming, // or conditions for batch query. Filter(UnaryOperation[bool]) Flow // Map is used to transform data Map(UnaryOperation[any]) Flow // Window is used to split infinite data into "buckets" of finite size. // Currently, it is only applicable to streaming context. Window(WindowAssigner) WindowedFlow // To pipes data to the given sink To(sink Sink) Flow // Open opens the flow in the async mode for streaming scenario. // The first error is the error combination while opening all components, // while the second is a channel for receiving async errors. Open() <-chan error }
Flow is an abstraction of data flow for both Streaming and Batch
type Inlet ¶
type Inlet interface {
In() chan<- StreamRecord
}
Inlet represents a type that exposes one open input.
type Operator ¶
Operator represents a set of stream processing steps that has one open input and one open output.
type Outlet ¶
type Outlet interface {
Out() <-chan StreamRecord
}
Outlet represents a type that exposes one open output.
type StreamRecord ¶
type StreamRecord struct {
// contains filtered or unexported fields
}
StreamRecord is a container wraps user data and timestamp. It is the underlying transmission medium for the streaming processing.
func NewStreamRecord ¶
func NewStreamRecord(data interface{}, ts int64) StreamRecord
func NewStreamRecordWithTimestampPb ¶
func NewStreamRecordWithTimestampPb(data interface{}, timestamp *timestamppb.Timestamp) StreamRecord
func NewStreamRecordWithoutTS ¶
func NewStreamRecordWithoutTS(data interface{}) StreamRecord
func TryExactTimestamp ¶
func TryExactTimestamp(item any) StreamRecord
func (StreamRecord) Data ¶
func (sr StreamRecord) Data() interface{}
func (StreamRecord) TimestampMillis ¶
func (sr StreamRecord) TimestampMillis() int64
func (StreamRecord) WithNewData ¶
func (sr StreamRecord) WithNewData(data interface{}) StreamRecord
type UnaryOperation ¶
UnaryOperation represents user-defined unary function (i.e. Map, Filter, etc)
func FilterFunc ¶
func FilterFunc(filter UnaryOperation[bool]) (UnaryOperation[any], error)
FilterFunc transform a function to an UnaryOperation
type Window ¶
type Window interface { // MaxTimestamp returns the upper bound of the Window. // Unit: Millisecond MaxTimestamp() int64 }
Window is a bucket of elements with a finite size. timedWindow is the only implementation now.
type WindowAssigner ¶
type WindowAssigner interface { // AssignWindows assigns a slice of Window according to the given timestamp, e.g. eventTime. // The unit of the timestamp here is MilliSecond. AssignWindows(timestamp int64) ([]Window, error) }
WindowAssigner is used to assign Window(s) for a given timestamp, and thus it can create a WindowedFlow.
type WindowedFlow ¶
type WindowedFlow interface { AllowedMaxWindows(windowCnt int) WindowedFlow // TopN applies a TopNAggregation to each Window. TopN(topNum int, opts ...any) Flow }
WindowedFlow is a flow which processes incoming elements based on window. The WindowedFlow can be created with a WindowAssigner.