Documentation
¶
Overview ¶
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Code generated by mkunion. DO NOT EDIT.
Index ¶
- Variables
- func AccumulateDiscardRetractHandlerShape() shape.Shape
- func AccumulateShape() shape.Shape
- func AccumulateToJSON(x *Accumulate) ([]byte, error)
- func AccumulatingAndRetractingShape() shape.Shape
- func AccumulatingAndRetractingToJSON(x *AccumulatingAndRetracting) ([]byte, error)
- func AllOfShape() shape.Shape
- func AllOfToJSON(x *AllOf) ([]byte, error)
- func AnyOfShape() shape.Shape
- func AnyOfToJSON(x *AnyOf) ([]byte, error)
- func AtPeriod1Shape() shape.Shape
- func AtPeriod1ToJSON(x *AtPeriod1) ([]byte, error)
- func AtPeriodShape() shape.Shape
- func AtPeriodToJSON(x *AtPeriod) ([]byte, error)
- func AtWatermark1Shape() shape.Shape
- func AtWatermark1ToJSON(x *AtWatermark1) ([]byte, error)
- func AtWatermarkShape() shape.Shape
- func AtWatermarkToJSON(x *AtWatermark) ([]byte, error)
- func AtWindowItemSize1Shape() shape.Shape
- func AtWindowItemSize1ToJSON(x *AtWindowItemSize1) ([]byte, error)
- func AtWindowItemSizeShape() shape.Shape
- func AtWindowItemSizeToJSON(x *AtWindowItemSize) ([]byte, error)
- func Bool(b bool) *bool
- func DefaultContextShape() shape.Shape
- func DiscardShape() shape.Shape
- func DiscardToJSON(x *Discard) ([]byte, error)
- func DoJoinShape() shape.Shape
- func DoJoinToJSON(x *DoJoin) ([]byte, error)
- func DoLoadShape() shape.Shape
- func DoLoadToJSON(x *DoLoad) ([]byte, error)
- func DoMapShape() shape.Shape
- func DoMapToJSON(x *DoMap) ([]byte, error)
- func DoWindowShape() shape.Shape
- func DoWindowToJSON(x *DoWindow) ([]byte, error)
- func Each(x schema.Schema, f func(value schema.Schema))
- func EvaluateTriggerR0[T0 TriggerDescription, T1 TriggerType](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod), ...)
- func EvaluateTriggerR1[T0 TriggerDescription, T1 TriggerType, TOut1 any](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) TOut1, ...) TOut1
- func EvaluateTriggerR2[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2), ...) (TOut1, TOut2)
- func EvaluateTriggerR3[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any, TOut3 any](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2, TOut3), ...) (TOut1, TOut2, TOut3)
- func EventTimeShape() shape.Shape
- func FixedWindowShape() shape.Shape
- func FixedWindowToJSON(x *FixedWindow) ([]byte, error)
- func GameShape() shape.Shape
- func GenerateItemsEvery(start int64, size int, every time.Duration) chan Item
- func HashNode(n Node) string
- func InMemoryBagOfShape() shape.Shape
- func ItemGroupedByKeyShape() shape.Shape
- func ItemGroupedByWindowShape() shape.Shape
- func ItemShape() shape.Shape
- func ItemTypeShape() shape.Shape
- func KeyWithNamespace(key string, namespace string) string
- func KeyedWindowKey(x *KeyedWindow) string
- func KeyedWindowShape() shape.Shape
- func MatchNodeR0(x Node, f1 func(x *DoWindow), f2 func(x *DoMap), f3 func(x *DoLoad), ...)
- func MatchNodeR1[T0 any](x Node, f1 func(x *DoWindow) T0, f2 func(x *DoMap) T0, f3 func(x *DoLoad) T0, ...) T0
- func MatchNodeR2[T0, T1 any](x Node, f1 func(x *DoWindow) (T0, T1), f2 func(x *DoMap) (T0, T1), ...) (T0, T1)
- func MatchNodeR3[T0, T1, T2 any](x Node, f1 func(x *DoWindow) (T0, T1, T2), f2 func(x *DoMap) (T0, T1, T2), ...) (T0, T1, T2)
- func MatchTriggerDescriptionR0(x TriggerDescription, f1 func(x *AtPeriod), f2 func(x *AtWindowItemSize), ...)
- func MatchTriggerDescriptionR1[T0 any](x TriggerDescription, f1 func(x *AtPeriod) T0, f2 func(x *AtWindowItemSize) T0, ...) T0
- func MatchTriggerDescriptionR2[T0, T1 any](x TriggerDescription, f1 func(x *AtPeriod) (T0, T1), ...) (T0, T1)
- func MatchTriggerDescriptionR3[T0, T1, T2 any](x TriggerDescription, f1 func(x *AtPeriod) (T0, T1, T2), ...) (T0, T1, T2)
- func MatchTriggerTypeR0(x TriggerType, f1 func(x *AtPeriod1), f2 func(x *AtWindowItemSize1), ...)
- func MatchTriggerTypeR1[T0 any](x TriggerType, f1 func(x *AtPeriod1) T0, f2 func(x *AtWindowItemSize1) T0, ...) T0
- func MatchTriggerTypeR2[T0, T1 any](x TriggerType, f1 func(x *AtPeriod1) (T0, T1), ...) (T0, T1)
- func MatchTriggerTypeR3[T0, T1, T2 any](x TriggerType, f1 func(x *AtPeriod1) (T0, T1, T2), ...) (T0, T1, T2)
- func MatchWindowDescriptionR0(x WindowDescription, f1 func(x *SessionWindow), f2 func(x *SlidingWindow), ...)
- func MatchWindowDescriptionR1[T0 any](x WindowDescription, f1 func(x *SessionWindow) T0, ...) T0
- func MatchWindowDescriptionR2[T0, T1 any](x WindowDescription, f1 func(x *SessionWindow) (T0, T1), ...) (T0, T1)
- func MatchWindowDescriptionR3[T0, T1, T2 any](x WindowDescription, f1 func(x *SessionWindow) (T0, T1, T2), ...) (T0, T1, T2)
- func MatchWindowFlushModeR0(x WindowFlushMode, f1 func(x *Accumulate), f2 func(x *Discard), ...)
- func MatchWindowFlushModeR1[T0 any](x WindowFlushMode, f1 func(x *Accumulate) T0, f2 func(x *Discard) T0, ...) T0
- func MatchWindowFlushModeR2[T0, T1 any](x WindowFlushMode, f1 func(x *Accumulate) (T0, T1), ...) (T0, T1)
- func MatchWindowFlushModeR3[T0, T1, T2 any](x WindowFlushMode, f1 func(x *Accumulate) (T0, T1, T2), ...) (T0, T1, T2)
- func MessageShape() shape.Shape
- func NewStatsCollector() *statsCollector
- func NodeShape() shape.Shape
- func NodeToJSON(x Node) ([]byte, error)
- func NodeToString(node Node) string
- func PackRetractAndAggregate(x, y schema.Schema) *schema.Map
- func SessionWindowShape() shape.Shape
- func SessionWindowToJSON(x *SessionWindow) ([]byte, error)
- func SessionsStatsShape() shape.Shape
- func SlidingWindowShape() shape.Shape
- func SlidingWindowToJSON(x *SlidingWindow) ([]byte, error)
- func StatsShape() shape.Shape
- func TickersShape() shape.Shape
- func ToMermaidGraph(dag *DAGBuilder) string
- func ToMermaidGraphWithOrder(dag *DAGBuilder, order []Node) string
- func ToStr(x Node) string
- func ToStrItem(item *Item) string
- func TriggerDescriptionShape() shape.Shape
- func TriggerDescriptionToJSON(x TriggerDescription) ([]byte, error)
- func TriggerHandlerShape() shape.Shape
- func TriggerManagerShape() shape.Shape
- func TriggerStateShape() shape.Shape
- func TriggerTypeShape() shape.Shape
- func TriggerTypeToJSON(x TriggerType) ([]byte, error)
- func WindowBufferShape() shape.Shape
- func WindowDescriptionShape() shape.Shape
- func WindowDescriptionToJSON(x WindowDescription) ([]byte, error)
- func WindowFlushModeShape() shape.Shape
- func WindowFlushModeToJSON(x WindowFlushMode) ([]byte, error)
- func WindowKey(window *Window) string
- func WindowShape() shape.Shape
- func WindowTriggerShape() shape.Shape
- type Accumulate
- type AccumulateDiscardRetractHandler
- type AccumulatingAndRetracting
- type AllOf
- type AnyOf
- type AtPeriod
- type AtPeriod1
- type AtWatermark
- type AtWatermark1
- type AtWindowItemSize
- type AtWindowItemSize1
- type AvgHandler
- type BagOf
- type Builder
- type ContextOptionFunc
- func WithAccumulate() ContextOptionFunc
- func WithAccumulatingAndRetracting() ContextOptionFunc
- func WithDiscard() ContextOptionFunc
- func WithFixedWindow(width time.Duration) ContextOptionFunc
- func WithName(name string) ContextOptionFunc
- func WithSessionWindow(gap time.Duration) ContextOptionFunc
- func WithSlidingWindow(width time.Duration, period time.Duration) ContextOptionFunc
- func WithTriggers(and ...TriggerDescription) ContextOptionFunc
- func WithWindowDescription(wd WindowDescription) ContextOptionFunc
- func WithWindowFlushMode(fm WindowFlushMode) ContextOptionFunc
- type CountHandler
- type DAGBuilder
- func (d *DAGBuilder) Build() []Node
- func (d *DAGBuilder) GetByName(name string) (*DAGBuilder, error)
- func (d *DAGBuilder) Join(a, b Builder, opts ...ContextOptionFunc) Builder
- func (d *DAGBuilder) Load(f Handler, opts ...ContextOptionFunc) Builder
- func (d *DAGBuilder) Map(f Handler, opts ...ContextOptionFunc) Builder
- func (d *DAGBuilder) Window(opts ...ContextOptionFunc) Builder
- type DebounceHandler
- type DefaultContext
- type Discard
- type DoJoin
- type DoLoad
- type DoMap
- type DoWindow
- type Dual
- type EvaluateTrigger
- type EventTime
- type ExecutionGroup
- type ExecutionStatus
- type FilterHandler
- type FixedWindow
- type Game
- type GenerateHandler
- type Handler
- type InMemoryBagOf
- type InMemoryTwoInterpreter
- type Item
- type ItemGroupedByKey
- type ItemGroupedByWindow
- type ItemType
- type JoinHandler
- type KeyedWindow
- type ListAssert
- type LogHandler
- type MapHandler
- type MergeHandler
- type Message
- type Node
- type NodeUnionJSON
- type NodeVisitor
- type PubSub
- type PubSubChan
- type PubSubForInterpreter
- type PubSubMulti
- type PubSubSingle
- type PubSubSingler
- type RepositorySink
- type SessionWindow
- type SessionsStats
- type SimpleProcessHandler
- type SlidingWindow
- type Stats
- type StatsCollector
- type Tickers
- type TimeSignaler
- type TriggerDescription
- type TriggerDescriptionUnionJSON
- type TriggerDescriptionVisitor
- type TriggerHandler
- type TriggerManager
- func (tm *TriggerManager) SignalDuration(duration time.Duration)
- func (tm *TriggerManager) SignalWatermark(timestamp int64)
- func (tm *TriggerManager) SignalWindowCreated(kw *KeyedWindow)
- func (tm *TriggerManager) SignalWindowDeleted(kw *KeyedWindow)
- func (tm *TriggerManager) SignalWindowSizeReached(kw *KeyedWindow, size int)
- func (tm *TriggerManager) WhenTrigger(f func(w *KeyedWindow))
- type TriggerState
- type TriggerType
- type TriggerTypeUnionJSON
- type TriggerTypeVisitor
- type WatermarkSignaler
- type Window
- type WindowBuffer
- func (wb *WindowBuffer) Append(x Item)
- func (wb *WindowBuffer) EachItemGroupedByWindow(f func(group *ItemGroupedByWindow))
- func (wb *WindowBuffer) EachKeyedWindow(kw *KeyedWindow, f func(group *ItemGroupedByWindow))
- func (wb *WindowBuffer) GroupAlsoByWindow(x []ItemGroupedByKey)
- func (wb *WindowBuffer) RemoveItemGropedByWindow(item *ItemGroupedByWindow)
- type WindowBufferSignaler
- type WindowDescription
- type WindowDescriptionUnionJSON
- type WindowDescriptionVisitor
- type WindowFlushMode
- type WindowFlushModeUnionJSON
- type WindowFlushModeVisitor
- type WindowTrigger
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoPublisher = errors.New("no appendLog") ErrFinished = errors.New("appendLog is finished") ErrContextDone = errors.New("context is done") ErrHandlerReturnErr = errors.New("handler returned error") ErrPublishWithOffset = errors.New("cannot publish message with offset") )
var (
ErrInterpreterNotInNewState = fmt.Errorf("interpreter is not in new state")
)
var ErrNotFound = errors.New("node not found")
var NotFound = errors.New("not found")
Functions ¶
func AccumulateShape ¶
func AccumulateToJSON ¶
func AccumulateToJSON(x *Accumulate) ([]byte, error)
func AccumulatingAndRetractingToJSON ¶
func AccumulatingAndRetractingToJSON(x *AccumulatingAndRetracting) ([]byte, error)
func AllOfShape ¶
func AllOfToJSON ¶
func AnyOfShape ¶
func AnyOfToJSON ¶
func AtPeriod1Shape ¶
func AtPeriod1ToJSON ¶
func AtPeriodShape ¶
func AtPeriodToJSON ¶
func AtWatermark1Shape ¶
func AtWatermark1ToJSON ¶
func AtWatermark1ToJSON(x *AtWatermark1) ([]byte, error)
func AtWatermarkShape ¶
func AtWatermarkToJSON ¶
func AtWatermarkToJSON(x *AtWatermark) ([]byte, error)
func AtWindowItemSize1Shape ¶
func AtWindowItemSize1ToJSON ¶
func AtWindowItemSize1ToJSON(x *AtWindowItemSize1) ([]byte, error)
func AtWindowItemSizeShape ¶
func AtWindowItemSizeToJSON ¶
func AtWindowItemSizeToJSON(x *AtWindowItemSize) ([]byte, error)
func DefaultContextShape ¶
func DiscardShape ¶
func DiscardToJSON ¶
func DoJoinShape ¶
func DoJoinToJSON ¶
func DoLoadShape ¶
func DoLoadToJSON ¶
func DoMapShape ¶
func DoMapToJSON ¶
func DoWindowShape ¶
func DoWindowToJSON ¶
func EvaluateTriggerR0 ¶
func EvaluateTriggerR0[T0 TriggerDescription, T1 TriggerType]( t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod), f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize), f2 func(x0 *AtWatermark, x1 *AtWatermark), f3 func(x0 *AnyOf, x1 TriggerType), f4 func(x0 *AllOf, x1 TriggerType), f5 func(x0 T0, x1 T1), )
func EvaluateTriggerR1 ¶
func EvaluateTriggerR1[T0 TriggerDescription, T1 TriggerType, TOut1 any]( t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) TOut1, f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize) TOut1, f2 func(x0 *AtWatermark, x1 *AtWatermark) TOut1, f3 func(x0 *AnyOf, x1 TriggerType) TOut1, f4 func(x0 *AllOf, x1 TriggerType) TOut1, f5 func(x0 T0, x1 T1) TOut1, ) TOut1
func EvaluateTriggerR2 ¶
func EvaluateTriggerR2[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any]( t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2), f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize) (TOut1, TOut2), f2 func(x0 *AtWatermark, x1 *AtWatermark) (TOut1, TOut2), f3 func(x0 *AnyOf, x1 TriggerType) (TOut1, TOut2), f4 func(x0 *AllOf, x1 TriggerType) (TOut1, TOut2), f5 func(x0 T0, x1 T1) (TOut1, TOut2), ) (TOut1, TOut2)
func EvaluateTriggerR3 ¶
func EvaluateTriggerR3[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any, TOut3 any]( t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2, TOut3), f1 func(x0 *AtWindowItemSize, x1 *AtWindowItemSize) (TOut1, TOut2, TOut3), f2 func(x0 *AtWatermark, x1 *AtWatermark) (TOut1, TOut2, TOut3), f3 func(x0 *AnyOf, x1 TriggerType) (TOut1, TOut2, TOut3), f4 func(x0 *AllOf, x1 TriggerType) (TOut1, TOut2, TOut3), f5 func(x0 T0, x1 T1) (TOut1, TOut2, TOut3), ) (TOut1, TOut2, TOut3)
func EventTimeShape ¶
func FixedWindowShape ¶
func FixedWindowToJSON ¶
func FixedWindowToJSON(x *FixedWindow) ([]byte, error)
func GenerateItemsEvery ¶
func InMemoryBagOfShape ¶
func ItemGroupedByKeyShape ¶
func ItemTypeShape ¶
func KeyWithNamespace ¶
func KeyedWindowKey ¶
func KeyedWindowKey(x *KeyedWindow) string
func KeyedWindowShape ¶
func MatchNodeR0 ¶
func MatchNodeR1 ¶
func MatchNodeR2 ¶
func MatchNodeR3 ¶
func MatchTriggerDescriptionR0 ¶
func MatchTriggerDescriptionR0( x TriggerDescription, f1 func(x *AtPeriod), f2 func(x *AtWindowItemSize), f3 func(x *AtWatermark), f4 func(x *AnyOf), f5 func(x *AllOf), )
func MatchTriggerDescriptionR1 ¶
func MatchTriggerDescriptionR1[T0 any]( x TriggerDescription, f1 func(x *AtPeriod) T0, f2 func(x *AtWindowItemSize) T0, f3 func(x *AtWatermark) T0, f4 func(x *AnyOf) T0, f5 func(x *AllOf) T0, ) T0
func MatchTriggerDescriptionR2 ¶
func MatchTriggerDescriptionR2[T0, T1 any]( x TriggerDescription, f1 func(x *AtPeriod) (T0, T1), f2 func(x *AtWindowItemSize) (T0, T1), f3 func(x *AtWatermark) (T0, T1), f4 func(x *AnyOf) (T0, T1), f5 func(x *AllOf) (T0, T1), ) (T0, T1)
func MatchTriggerDescriptionR3 ¶
func MatchTriggerDescriptionR3[T0, T1, T2 any]( x TriggerDescription, f1 func(x *AtPeriod) (T0, T1, T2), f2 func(x *AtWindowItemSize) (T0, T1, T2), f3 func(x *AtWatermark) (T0, T1, T2), f4 func(x *AnyOf) (T0, T1, T2), f5 func(x *AllOf) (T0, T1, T2), ) (T0, T1, T2)
func MatchTriggerTypeR0 ¶
func MatchTriggerTypeR0( x TriggerType, f1 func(x *AtPeriod1), f2 func(x *AtWindowItemSize1), f3 func(x *AtWatermark1), )
func MatchTriggerTypeR1 ¶
func MatchTriggerTypeR1[T0 any]( x TriggerType, f1 func(x *AtPeriod1) T0, f2 func(x *AtWindowItemSize1) T0, f3 func(x *AtWatermark1) T0, ) T0
func MatchTriggerTypeR2 ¶
func MatchTriggerTypeR2[T0, T1 any]( x TriggerType, f1 func(x *AtPeriod1) (T0, T1), f2 func(x *AtWindowItemSize1) (T0, T1), f3 func(x *AtWatermark1) (T0, T1), ) (T0, T1)
func MatchTriggerTypeR3 ¶
func MatchTriggerTypeR3[T0, T1, T2 any]( x TriggerType, f1 func(x *AtPeriod1) (T0, T1, T2), f2 func(x *AtWindowItemSize1) (T0, T1, T2), f3 func(x *AtWatermark1) (T0, T1, T2), ) (T0, T1, T2)
func MatchWindowDescriptionR0 ¶
func MatchWindowDescriptionR0( x WindowDescription, f1 func(x *SessionWindow), f2 func(x *SlidingWindow), f3 func(x *FixedWindow), )
func MatchWindowDescriptionR1 ¶
func MatchWindowDescriptionR1[T0 any]( x WindowDescription, f1 func(x *SessionWindow) T0, f2 func(x *SlidingWindow) T0, f3 func(x *FixedWindow) T0, ) T0
func MatchWindowDescriptionR2 ¶
func MatchWindowDescriptionR2[T0, T1 any]( x WindowDescription, f1 func(x *SessionWindow) (T0, T1), f2 func(x *SlidingWindow) (T0, T1), f3 func(x *FixedWindow) (T0, T1), ) (T0, T1)
func MatchWindowDescriptionR3 ¶
func MatchWindowDescriptionR3[T0, T1, T2 any]( x WindowDescription, f1 func(x *SessionWindow) (T0, T1, T2), f2 func(x *SlidingWindow) (T0, T1, T2), f3 func(x *FixedWindow) (T0, T1, T2), ) (T0, T1, T2)
func MatchWindowFlushModeR0 ¶
func MatchWindowFlushModeR0( x WindowFlushMode, f1 func(x *Accumulate), f2 func(x *Discard), f3 func(x *AccumulatingAndRetracting), )
func MatchWindowFlushModeR1 ¶
func MatchWindowFlushModeR1[T0 any]( x WindowFlushMode, f1 func(x *Accumulate) T0, f2 func(x *Discard) T0, f3 func(x *AccumulatingAndRetracting) T0, ) T0
func MatchWindowFlushModeR2 ¶
func MatchWindowFlushModeR2[T0, T1 any]( x WindowFlushMode, f1 func(x *Accumulate) (T0, T1), f2 func(x *Discard) (T0, T1), f3 func(x *AccumulatingAndRetracting) (T0, T1), ) (T0, T1)
func MatchWindowFlushModeR3 ¶
func MatchWindowFlushModeR3[T0, T1, T2 any]( x WindowFlushMode, f1 func(x *Accumulate) (T0, T1, T2), f2 func(x *Discard) (T0, T1, T2), f3 func(x *AccumulatingAndRetracting) (T0, T1, T2), ) (T0, T1, T2)
func MessageShape ¶
func NewStatsCollector ¶
func NewStatsCollector() *statsCollector
func NodeToJSON ¶
func NodeToString ¶
func SessionWindowShape ¶
func SessionWindowToJSON ¶
func SessionWindowToJSON(x *SessionWindow) ([]byte, error)
func SessionsStatsShape ¶
func SlidingWindowShape ¶
func SlidingWindowToJSON ¶
func SlidingWindowToJSON(x *SlidingWindow) ([]byte, error)
func StatsShape ¶
func TickersShape ¶
func ToMermaidGraph ¶
func ToMermaidGraph(dag *DAGBuilder) string
func ToMermaidGraphWithOrder ¶
func ToMermaidGraphWithOrder(dag *DAGBuilder, order []Node) string
func TriggerDescriptionShape ¶
func TriggerDescriptionToJSON ¶
func TriggerDescriptionToJSON(x TriggerDescription) ([]byte, error)
func TriggerHandlerShape ¶
func TriggerManagerShape ¶
func TriggerStateShape ¶
func TriggerTypeShape ¶
func TriggerTypeToJSON ¶
func TriggerTypeToJSON(x TriggerType) ([]byte, error)
func WindowBufferShape ¶
func WindowDescriptionShape ¶
func WindowDescriptionToJSON ¶
func WindowDescriptionToJSON(x WindowDescription) ([]byte, error)
func WindowFlushModeShape ¶
func WindowFlushModeToJSON ¶
func WindowFlushModeToJSON(x WindowFlushMode) ([]byte, error)
func WindowShape ¶
func WindowTriggerShape ¶
Types ¶
type Accumulate ¶
func AccumulateFromJSON ¶
func AccumulateFromJSON(x []byte) (*Accumulate, error)
func (*Accumulate) AcceptWindowFlushMode ¶
func (r *Accumulate) AcceptWindowFlushMode(v WindowFlushModeVisitor) any
func (*Accumulate) MarshalJSON ¶
func (r *Accumulate) MarshalJSON() ([]byte, error)
func (*Accumulate) UnmarshalJSON ¶
func (r *Accumulate) UnmarshalJSON(data []byte) error
type AccumulateDiscardRetractHandler ¶
type AccumulateDiscardRetractHandler struct {
// contains filtered or unexported fields
}
type AccumulatingAndRetracting ¶
func AccumulatingAndRetractingFromJSON ¶
func AccumulatingAndRetractingFromJSON(x []byte) (*AccumulatingAndRetracting, error)
func (*AccumulatingAndRetracting) AcceptWindowFlushMode ¶
func (r *AccumulatingAndRetracting) AcceptWindowFlushMode(v WindowFlushModeVisitor) any
func (*AccumulatingAndRetracting) MarshalJSON ¶
func (r *AccumulatingAndRetracting) MarshalJSON() ([]byte, error)
func (*AccumulatingAndRetracting) UnmarshalJSON ¶
func (r *AccumulatingAndRetracting) UnmarshalJSON(data []byte) error
type AllOf ¶
type AllOf struct {
Triggers []TriggerDescription
}
func AllOfFromJSON ¶
func (*AllOf) AcceptTriggerDescription ¶
func (r *AllOf) AcceptTriggerDescription(v TriggerDescriptionVisitor) any
func (*AllOf) MarshalJSON ¶
func (*AllOf) UnmarshalJSON ¶
type AnyOf ¶
type AnyOf struct {
Triggers []TriggerDescription
}
func AnyOfFromJSON ¶
func (*AnyOf) AcceptTriggerDescription ¶
func (r *AnyOf) AcceptTriggerDescription(v TriggerDescriptionVisitor) any
func (*AnyOf) MarshalJSON ¶
func (*AnyOf) UnmarshalJSON ¶
type AtPeriod ¶
func AtPeriodFromJSON ¶
func (*AtPeriod) AcceptTriggerDescription ¶
func (r *AtPeriod) AcceptTriggerDescription(v TriggerDescriptionVisitor) any
func (*AtPeriod) MarshalJSON ¶
func (*AtPeriod) UnmarshalJSON ¶
type AtPeriod1 ¶
type AtPeriod1 = AtPeriod
go:generate mkunion -name=TriggerType -variants=AtPeriod,AtWindowItemSize,AtWatermark
func AtPeriod1FromJSON ¶
func (*AtPeriod1) AcceptTriggerType ¶
func (r *AtPeriod1) AcceptTriggerType(v TriggerTypeVisitor) any
type AtWatermark ¶
type AtWatermark struct {
Timestamp int64
}
func AtWatermarkFromJSON ¶
func AtWatermarkFromJSON(x []byte) (*AtWatermark, error)
func (*AtWatermark) AcceptTriggerDescription ¶
func (r *AtWatermark) AcceptTriggerDescription(v TriggerDescriptionVisitor) any
func (*AtWatermark) MarshalJSON ¶
func (r *AtWatermark) MarshalJSON() ([]byte, error)
func (*AtWatermark) UnmarshalJSON ¶
func (r *AtWatermark) UnmarshalJSON(data []byte) error
type AtWatermark1 ¶
type AtWatermark1 = AtWatermark
go:generate mkunion -name=TriggerType -variants=AtPeriod,AtWindowItemSize,AtWatermark
func AtWatermark1FromJSON ¶
func AtWatermark1FromJSON(x []byte) (*AtWatermark1, error)
func (*AtWatermark1) AcceptTriggerType ¶
func (r *AtWatermark1) AcceptTriggerType(v TriggerTypeVisitor) any
type AtWindowItemSize ¶
type AtWindowItemSize struct {
Number int
}
func AtWindowItemSizeFromJSON ¶
func AtWindowItemSizeFromJSON(x []byte) (*AtWindowItemSize, error)
func (*AtWindowItemSize) AcceptTriggerDescription ¶
func (r *AtWindowItemSize) AcceptTriggerDescription(v TriggerDescriptionVisitor) any
func (*AtWindowItemSize) MarshalJSON ¶
func (r *AtWindowItemSize) MarshalJSON() ([]byte, error)
func (*AtWindowItemSize) UnmarshalJSON ¶
func (r *AtWindowItemSize) UnmarshalJSON(data []byte) error
type AtWindowItemSize1 ¶
type AtWindowItemSize1 = AtWindowItemSize
go:generate mkunion -name=TriggerType -variants=AtPeriod,AtWindowItemSize,AtWatermark
func AtWindowItemSize1FromJSON ¶
func AtWindowItemSize1FromJSON(x []byte) (*AtWindowItemSize1, error)
func (*AtWindowItemSize1) AcceptTriggerType ¶
func (r *AtWindowItemSize1) AcceptTriggerType(v TriggerTypeVisitor) any
type AvgHandler ¶
type AvgHandler struct {
// contains filtered or unexported fields
}
type Builder ¶
type Builder interface { Load(f Handler, opts ...ContextOptionFunc) Builder Window(opts ...ContextOptionFunc) Builder Map(f Handler, opts ...ContextOptionFunc) Builder Join(a, b Builder, opts ...ContextOptionFunc) Builder Build() []Node }
type ContextOptionFunc ¶
type ContextOptionFunc func(c *DefaultContext)
func WithAccumulate ¶
func WithAccumulate() ContextOptionFunc
func WithAccumulatingAndRetracting ¶
func WithAccumulatingAndRetracting() ContextOptionFunc
func WithDiscard ¶
func WithDiscard() ContextOptionFunc
func WithFixedWindow ¶
func WithFixedWindow(width time.Duration) ContextOptionFunc
func WithName ¶
func WithName(name string) ContextOptionFunc
func WithSessionWindow ¶
func WithSessionWindow(gap time.Duration) ContextOptionFunc
func WithSlidingWindow ¶
func WithSlidingWindow(width time.Duration, period time.Duration) ContextOptionFunc
func WithTriggers ¶
func WithTriggers(and ...TriggerDescription) ContextOptionFunc
func WithWindowDescription ¶
func WithWindowDescription(wd WindowDescription) ContextOptionFunc
func WithWindowFlushMode ¶
func WithWindowFlushMode(fm WindowFlushMode) ContextOptionFunc
type CountHandler ¶
type CountHandler struct {
// contains filtered or unexported fields
}
type DAGBuilder ¶
type DAGBuilder struct {
// contains filtered or unexported fields
}
func NewDAGBuilder ¶
func NewDAGBuilder() *DAGBuilder
func (*DAGBuilder) Build ¶
func (d *DAGBuilder) Build() []Node
func (*DAGBuilder) GetByName ¶
func (d *DAGBuilder) GetByName(name string) (*DAGBuilder, error)
func (*DAGBuilder) Join ¶
func (d *DAGBuilder) Join(a, b Builder, opts ...ContextOptionFunc) Builder
func (*DAGBuilder) Load ¶
func (d *DAGBuilder) Load(f Handler, opts ...ContextOptionFunc) Builder
DoLoad loads data from a source. This node is a root of the DAG. DAG can have many DoLoad nodesFromTo.
func (*DAGBuilder) Map ¶
func (d *DAGBuilder) Map(f Handler, opts ...ContextOptionFunc) Builder
func (*DAGBuilder) Window ¶
func (d *DAGBuilder) Window(opts ...ContextOptionFunc) Builder
type DebounceHandler ¶
type DebounceHandler struct { MaxSize int MaxTime time.Duration // contains filtered or unexported fields }
type DefaultContext ¶
type DefaultContext struct {
// contains filtered or unexported fields
}
func GetCtx ¶
func GetCtx(node Node) *DefaultContext
func NewContextBuilder ¶
func NewContextBuilder(builders ...func(config *DefaultContext)) *DefaultContext
func (*DefaultContext) Name ¶
func (c *DefaultContext) Name() string
func (*DefaultContext) Scope ¶
func (c *DefaultContext) Scope(name string) *DefaultContext
type Discard ¶
type Discard struct{}
func DiscardFromJSON ¶
func (*Discard) AcceptWindowFlushMode ¶
func (r *Discard) AcceptWindowFlushMode(v WindowFlushModeVisitor) any
func (*Discard) MarshalJSON ¶
func (*Discard) UnmarshalJSON ¶
type DoJoin ¶
type DoJoin struct { Ctx *DefaultContext Input []Node }
func DoJoinFromJSON ¶
func (*DoJoin) AcceptNode ¶
func (r *DoJoin) AcceptNode(v NodeVisitor) any
func (*DoJoin) MarshalJSON ¶
func (*DoJoin) UnmarshalJSON ¶
type DoLoad ¶
type DoLoad struct { Ctx *DefaultContext OnLoad Handler }
func DoLoadFromJSON ¶
func (*DoLoad) AcceptNode ¶
func (r *DoLoad) AcceptNode(v NodeVisitor) any
func (*DoLoad) MarshalJSON ¶
func (*DoLoad) UnmarshalJSON ¶
type DoMap ¶
type DoMap struct { Ctx *DefaultContext OnMap Handler Input Node }
DoMap implicitly means, merge by key
func DoMapFromJSON ¶
func (*DoMap) AcceptNode ¶
func (r *DoMap) AcceptNode(v NodeVisitor) any
func (*DoMap) MarshalJSON ¶
func (*DoMap) UnmarshalJSON ¶
type DoWindow ¶
type DoWindow struct { Ctx *DefaultContext Input Node }
func DoWindowFromJSON ¶
func (*DoWindow) AcceptNode ¶
func (r *DoWindow) AcceptNode(v NodeVisitor) any
func (*DoWindow) MarshalJSON ¶
func (*DoWindow) UnmarshalJSON ¶
type Dual ¶
type Dual struct {
// contains filtered or unexported fields
}
func (*Dual) ReturningAggregate ¶
func (*Dual) ReturningRetract ¶
type EvaluateTrigger ¶
type EvaluateTrigger[T0 TriggerDescription, T1 TriggerType] interface { MatchPeriod(*AtPeriod, *AtPeriod) MatchCount(*AtWindowItemSize, *AtWindowItemSize) MatchWatermark(*AtWatermark, *AtWatermark) MatchAnyOfAny(*AnyOf, TriggerType) MatchAllOfAny(*AllOf, TriggerType) MatchDefault(T0, T1) }
type ExecutionGroup ¶
type ExecutionGroup struct {
// contains filtered or unexported fields
}
func (*ExecutionGroup) Go ¶
func (g *ExecutionGroup) Go(f func() error)
func (*ExecutionGroup) Wait ¶
func (g *ExecutionGroup) Wait() error
type ExecutionStatus ¶
type ExecutionStatus int
import (
"context" "fmt" log "github.com/sirupsen/logrus" "github.com/widmogrod/mkunion/x/schema" "sync"
)
func DefaultInMemoryInterpreter() *InMemoryInterpreter { return &InMemoryInterpreter{ pubsub: NewPubSubMultiChan[Node](), //pubsub: NewPubSub[Node](), byKeys: make(map[Node]map[string]Item), running: make(map[Node]struct{}), stats: NewStatsCollector(), } }
const ( ExecutionStatusNew ExecutionStatus = iota ExecutionStatusRunning ExecutionStatusError ExecutionStatusFinished )
type FilterHandler ¶
type FilterHandler struct {
Where *predicate.WherePredicates
}
type FixedWindow ¶
func FixedWindowFromJSON ¶
func FixedWindowFromJSON(x []byte) (*FixedWindow, error)
func (*FixedWindow) AcceptWindowDescription ¶
func (r *FixedWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any
func (*FixedWindow) MarshalJSON ¶
func (r *FixedWindow) MarshalJSON() ([]byte, error)
func (*FixedWindow) UnmarshalJSON ¶
func (r *FixedWindow) UnmarshalJSON(data []byte) error
type GenerateHandler ¶
type Handler ¶
type InMemoryBagOf ¶
type InMemoryBagOf[A any] struct { // contains filtered or unexported fields }
func NewInMemoryBagOf ¶
func NewInMemoryBagOf[A any]() *InMemoryBagOf[A]
func (*InMemoryBagOf[A]) Del ¶
func (b *InMemoryBagOf[A]) Del(key string) error
func (*InMemoryBagOf[A]) Get ¶
func (b *InMemoryBagOf[A]) Get(key string) (A, error)
func (*InMemoryBagOf[A]) Range ¶
func (b *InMemoryBagOf[A]) Range(f func(key string, item A))
func (*InMemoryBagOf[A]) Set ¶
func (b *InMemoryBagOf[A]) Set(key string, value A) error
type InMemoryTwoInterpreter ¶
type InMemoryTwoInterpreter struct {
// contains filtered or unexported fields
}
func NewInMemoryTwoInterpreter ¶
func NewInMemoryTwoInterpreter() *InMemoryTwoInterpreter
func (*InMemoryTwoInterpreter) Run ¶
func (i *InMemoryTwoInterpreter) Run(ctx context.Context, nodes []Node) error
func (*InMemoryTwoInterpreter) StatsSnapshotAndReset ¶
func (i *InMemoryTwoInterpreter) StatsSnapshotAndReset() Stats
type Item ¶
func AssignWindows ¶
func AssignWindows(x []Item, wd WindowDescription) []Item
func DropTimestamps ¶
func ExpandToElements ¶
func ExpandToElements(x []ItemGroupedByWindow) []Item
func ToElement ¶
func ToElement(group *ItemGroupedByWindow) Item
func (*Item) MarshalJSON ¶
func (*Item) UnmarshalJSON ¶
type ItemGroupedByKey ¶
func GroupByKey ¶
func GroupByKey(x []Item) []ItemGroupedByKey
func MergeWindows ¶
func MergeWindows(x []ItemGroupedByKey, wd WindowDescription) []ItemGroupedByKey
func (*ItemGroupedByKey) MarshalJSON ¶
func (r *ItemGroupedByKey) MarshalJSON() ([]byte, error)
func (*ItemGroupedByKey) UnmarshalJSON ¶
func (r *ItemGroupedByKey) UnmarshalJSON(data []byte) error
type ItemGroupedByWindow ¶
func GroupAlsoByWindow ¶
func GroupAlsoByWindow(x []ItemGroupedByKey) []ItemGroupedByWindow
func (*ItemGroupedByWindow) MarshalJSON ¶
func (r *ItemGroupedByWindow) MarshalJSON() ([]byte, error)
func (*ItemGroupedByWindow) UnmarshalJSON ¶
func (r *ItemGroupedByWindow) UnmarshalJSON(data []byte) error
type JoinHandler ¶
type KeyedWindow ¶
func ToKeyedWindowFromGrouped ¶
func ToKeyedWindowFromGrouped(x *ItemGroupedByWindow) *KeyedWindow
func ToKeyedWindowFromItem ¶
func ToKeyedWindowFromItem(x *Item) *KeyedWindow
type ListAssert ¶
func (*ListAssert) AssertLen ¶
func (l *ListAssert) AssertLen(expected int) bool
func (*ListAssert) Contains ¶
func (l *ListAssert) Contains(expected Item) bool
func (*ListAssert) Returning ¶
func (l *ListAssert) Returning(msg Item)
type LogHandler ¶
type LogHandler struct {
// contains filtered or unexported fields
}
type MapHandler ¶
type MergeHandler ¶
type MergeHandler[A any] struct { Combine func(a, b A) (A, error) DoRetract func(a, b A) (A, error) }
type Node ¶
type Node interface {
AcceptNode(g NodeVisitor) any
}
func NodeFromJSON ¶
func ReverseSort ¶
func Sort ¶
func Sort(dag *DAGBuilder) []Node
Sort sorts nodesFromTo in topological order https://en.wikipedia.org/wiki/Topological_sorting using Kahn's algorithm
type NodeUnionJSON ¶
type NodeUnionJSON struct { Type string `json:"$type,omitempty"` DoWindow json.RawMessage `json:"projection.DoWindow,omitempty"` DoMap json.RawMessage `json:"projection.DoMap,omitempty"` DoLoad json.RawMessage `json:"projection.DoLoad,omitempty"` DoJoin json.RawMessage `json:"projection.DoJoin,omitempty"` }
type NodeVisitor ¶
type PubSub ¶
type PubSub[T comparable] struct { // contains filtered or unexported fields }
func NewPubSub ¶
func NewPubSub[T comparable]() *PubSub[T]
type PubSubChan ¶
type PubSubChan[T any] struct { // contains filtered or unexported fields }
func NewPubSubChan ¶
func NewPubSubChan[T any]() *PubSubChan[T]
func (*PubSubChan[T]) Close ¶
func (s *PubSubChan[T]) Close()
func (*PubSubChan[T]) Process ¶
func (s *PubSubChan[T]) Process()
func (*PubSubChan[T]) Publish ¶
func (s *PubSubChan[T]) Publish(msg T) error
func (*PubSubChan[T]) Subscribe ¶
func (s *PubSubChan[T]) Subscribe(f func(T) error) error
type PubSubForInterpreter ¶
type PubSubMulti ¶
type PubSubMulti[T comparable] struct { // contains filtered or unexported fields }
func NewPubSubMultiChan ¶
func NewPubSubMultiChan[T comparable]() *PubSubMulti[T]
func (*PubSubMulti[T]) Finish ¶
func (p *PubSubMulti[T]) Finish(ctx context.Context, key T)
func (*PubSubMulti[T]) Publish ¶
func (p *PubSubMulti[T]) Publish(ctx context.Context, key T, msg Message) error
func (*PubSubMulti[T]) Register ¶
func (p *PubSubMulti[T]) Register(key T) error
type PubSubSingle ¶
type PubSubSingle struct {
// contains filtered or unexported fields
}
func NewPubSubSingle ¶
func NewPubSubSingle() *PubSubSingle
func (*PubSubSingle) Finish ¶
func (p *PubSubSingle) Finish()
Finish is called when a node won't publish any more messages
type PubSubSingler ¶
type PubSubSingler[T comparable] interface { Publish(msg T) error Process() Subscribe(f func(T) error) error Close() }
type RepositorySink ¶
type RepositorySink struct {
// contains filtered or unexported fields
}
func NewRepositorySink ¶
func NewRepositorySink(recordType string, store schemaless.Repository[schema.Schema]) *RepositorySink
func (*RepositorySink) FlushOnTime ¶
func (s *RepositorySink) FlushOnTime()
type SessionWindow ¶
func SessionWindowFromJSON ¶
func SessionWindowFromJSON(x []byte) (*SessionWindow, error)
func (*SessionWindow) AcceptWindowDescription ¶
func (r *SessionWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any
func (*SessionWindow) MarshalJSON ¶
func (r *SessionWindow) MarshalJSON() ([]byte, error)
func (*SessionWindow) UnmarshalJSON ¶
func (r *SessionWindow) UnmarshalJSON(data []byte) error
type SessionsStats ¶
func (*SessionsStats) MarshalJSON ¶
func (r *SessionsStats) MarshalJSON() ([]byte, error)
func (*SessionsStats) UnmarshalJSON ¶
func (r *SessionsStats) UnmarshalJSON(data []byte) error
type SimpleProcessHandler ¶
type SimpleProcessHandler struct { P func(x Item, returning func(Item)) error R func(x Item, returning func(Item)) error }
type SlidingWindow ¶
func SlidingWindowFromJSON ¶
func SlidingWindowFromJSON(x []byte) (*SlidingWindow, error)
func (*SlidingWindow) AcceptWindowDescription ¶
func (r *SlidingWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any
func (*SlidingWindow) MarshalJSON ¶
func (r *SlidingWindow) MarshalJSON() ([]byte, error)
func (*SlidingWindow) UnmarshalJSON ¶
func (r *SlidingWindow) UnmarshalJSON(data []byte) error
type StatsCollector ¶
type Tickers ¶
type Tickers struct {
// contains filtered or unexported fields
}
func NewTimeTicker ¶
func NewTimeTicker() *Tickers
func (*Tickers) Register ¶
func (t *Tickers) Register(td TriggerDescription, ts TimeSignaler)
func (*Tickers) Unregister ¶
func (t *Tickers) Unregister(td TriggerDescription)
type TimeSignaler ¶
type TriggerDescription ¶
type TriggerDescription interface {
AcceptTriggerDescription(g TriggerDescriptionVisitor) any
}
func TriggerDescriptionFromJSON ¶
func TriggerDescriptionFromJSON(x []byte) (TriggerDescription, error)
type TriggerDescriptionUnionJSON ¶
type TriggerDescriptionUnionJSON struct { Type string `json:"$type,omitempty"` AtPeriod json.RawMessage `json:"projection.AtPeriod,omitempty"` AtWindowItemSize json.RawMessage `json:"projection.AtWindowItemSize,omitempty"` AtWatermark json.RawMessage `json:"projection.AtWatermark,omitempty"` AnyOf json.RawMessage `json:"projection.AnyOf,omitempty"` AllOf json.RawMessage `json:"projection.AllOf,omitempty"` }
type TriggerDescriptionVisitor ¶
type TriggerDescriptionVisitor interface { VisitAtPeriod(v *AtPeriod) any VisitAtWindowItemSize(v *AtWindowItemSize) any VisitAtWatermark(v *AtWatermark) any VisitAnyOf(v *AnyOf) any VisitAllOf(v *AllOf) any }
type TriggerHandler ¶
type TriggerHandler struct {
// contains filtered or unexported fields
}
func (*TriggerHandler) Process ¶
func (tm *TriggerHandler) Process(x Item, returning func(Item)) error
func (*TriggerHandler) Retract ¶
func (tm *TriggerHandler) Retract(x Item, returning func(Item)) error
func (*TriggerHandler) Triggered ¶
func (tm *TriggerHandler) Triggered(trigger TriggerType, returning func(Item)) error
type TriggerManager ¶
type TriggerManager struct {
// contains filtered or unexported fields
}
func NewTriggerManager ¶
func NewTriggerManager(td TriggerDescription) *TriggerManager
func (*TriggerManager) SignalDuration ¶
func (tm *TriggerManager) SignalDuration(duration time.Duration)
func (*TriggerManager) SignalWatermark ¶
func (tm *TriggerManager) SignalWatermark(timestamp int64)
func (*TriggerManager) SignalWindowCreated ¶
func (tm *TriggerManager) SignalWindowCreated(kw *KeyedWindow)
func (*TriggerManager) SignalWindowDeleted ¶
func (tm *TriggerManager) SignalWindowDeleted(kw *KeyedWindow)
func (*TriggerManager) SignalWindowSizeReached ¶
func (tm *TriggerManager) SignalWindowSizeReached(kw *KeyedWindow, size int)
func (*TriggerManager) WhenTrigger ¶
func (tm *TriggerManager) WhenTrigger(f func(w *KeyedWindow))
type TriggerState ¶
type TriggerState struct {
// contains filtered or unexported fields
}
type TriggerType ¶
type TriggerType interface {
AcceptTriggerType(g TriggerTypeVisitor) any
}
func TriggerTypeFromJSON ¶
func TriggerTypeFromJSON(x []byte) (TriggerType, error)
type TriggerTypeUnionJSON ¶
type TriggerTypeUnionJSON struct { Type string `json:"$type,omitempty"` AtPeriod1 json.RawMessage `json:"projection.AtPeriod1,omitempty"` AtWindowItemSize1 json.RawMessage `json:"projection.AtWindowItemSize1,omitempty"` AtWatermark1 json.RawMessage `json:"projection.AtWatermark1,omitempty"` }
type TriggerTypeVisitor ¶
type TriggerTypeVisitor interface { VisitAtPeriod1(v *AtPeriod1) any VisitAtWindowItemSize1(v *AtWindowItemSize1) any VisitAtWatermark1(v *AtWatermark1) any }
type WatermarkSignaler ¶
type WatermarkSignaler interface {
SignalWatermark(timestamp int64)
}
type WindowBuffer ¶
type WindowBuffer struct {
// contains filtered or unexported fields
}
func NewWindowBuffer ¶
func NewWindowBuffer(wd WindowDescription, sig WindowBufferSignaler) *WindowBuffer
func (*WindowBuffer) Append ¶
func (wb *WindowBuffer) Append(x Item)
func (*WindowBuffer) EachItemGroupedByWindow ¶
func (wb *WindowBuffer) EachItemGroupedByWindow(f func(group *ItemGroupedByWindow))
func (*WindowBuffer) EachKeyedWindow ¶
func (wb *WindowBuffer) EachKeyedWindow(kw *KeyedWindow, f func(group *ItemGroupedByWindow))
func (*WindowBuffer) GroupAlsoByWindow ¶
func (wb *WindowBuffer) GroupAlsoByWindow(x []ItemGroupedByKey)
func (*WindowBuffer) RemoveItemGropedByWindow ¶
func (wb *WindowBuffer) RemoveItemGropedByWindow(item *ItemGroupedByWindow)
type WindowBufferSignaler ¶
type WindowBufferSignaler interface { SignalWindowCreated(kw *KeyedWindow) SignalWindowDeleted(kw *KeyedWindow) SignalWindowSizeReached(kw *KeyedWindow, size int) }
type WindowDescription ¶
type WindowDescription interface {
AcceptWindowDescription(g WindowDescriptionVisitor) any
}
func WindowDescriptionFromJSON ¶
func WindowDescriptionFromJSON(x []byte) (WindowDescription, error)
type WindowDescriptionUnionJSON ¶
type WindowDescriptionUnionJSON struct { Type string `json:"$type,omitempty"` SessionWindow json.RawMessage `json:"projection.SessionWindow,omitempty"` SlidingWindow json.RawMessage `json:"projection.SlidingWindow,omitempty"` FixedWindow json.RawMessage `json:"projection.FixedWindow,omitempty"` }
type WindowDescriptionVisitor ¶
type WindowDescriptionVisitor interface { VisitSessionWindow(v *SessionWindow) any VisitSlidingWindow(v *SlidingWindow) any VisitFixedWindow(v *FixedWindow) any }
type WindowFlushMode ¶
type WindowFlushMode interface {
AcceptWindowFlushMode(g WindowFlushModeVisitor) any
}
func WindowFlushModeFromJSON ¶
func WindowFlushModeFromJSON(x []byte) (WindowFlushMode, error)
type WindowFlushModeUnionJSON ¶
type WindowFlushModeUnionJSON struct { Type string `json:"$type,omitempty"` Accumulate json.RawMessage `json:"projection.Accumulate,omitempty"` Discard json.RawMessage `json:"projection.Discard,omitempty"` AccumulatingAndRetracting json.RawMessage `json:"projection.AccumulatingAndRetracting,omitempty"` }
type WindowFlushModeVisitor ¶
type WindowFlushModeVisitor interface { VisitAccumulate(v *Accumulate) any VisitDiscard(v *Discard) any VisitAccumulatingAndRetracting(v *AccumulatingAndRetracting) any }
type WindowTrigger ¶
type WindowTrigger struct {
// contains filtered or unexported fields
}
func NewWindowTrigger ¶
func NewWindowTrigger(w *Window, td TriggerDescription) *WindowTrigger
func (*WindowTrigger) ReceiveEvent ¶
func (wt *WindowTrigger) ReceiveEvent(triggerType TriggerType)
func (*WindowTrigger) Reset ¶
func (wt *WindowTrigger) Reset()
func (*WindowTrigger) ShouldTrigger ¶
func (wt *WindowTrigger) ShouldTrigger() bool
Source Files
¶
- dag_builder.go
- example_data.go
- example_data_serde_gen.go
- example_data_shape_gen.go
- handler_avg.go
- handler_count.go
- handler_filter.go
- handler_generate.go
- handler_join.go
- handler_log.go
- handler_map.go
- handler_merge.go
- handler_simple.go
- handler_time_or_size_buffer.go
- interprete_in_memory_two.go
- interpreter_in_memory.go
- projection.go
- projection_serde_gen.go
- projection_shape_gen.go
- projection_stats_collector.go
- projection_union_gen.go
- pub_sub.go
- pub_sub_chan.go
- pub_sub_key_partitioned.go
- pub_sub_multi.go
- pub_sub_single.go
- registry_funcion.go
- sink_repository.go
- triggering.go
- triggering_shape_gen.go
- triggering_union_gen.go
- utils.go
- windowing.go
- windowing_match_evaluatetrigger.go
- windowing_shape_gen.go
- windowing_union_gen.go