Versions in this module Expand all Collapse all v1 v1.24.0 Jan 8, 2024 Changes in this version + var ErrContextDone = errors.New("context is done") + var ErrFinished = errors.New("appendLog is finished") + var ErrHandlerReturnErr = errors.New("handler returned error") + var ErrInterpreterNotInNewState = fmt.Errorf("interpreter is not in new state") + var ErrNoPublisher = errors.New("no appendLog") + var ErrNotFound = errors.New("node not found") + var ErrPublishWithOffset = errors.New("cannot publish message with offset") + var NotFound = errors.New("not found") + func AccumulateDiscardRetractHandlerShape() shape.Shape + func AccumulateShape() shape.Shape + func AccumulateToJSON(x *Accumulate) ([]byte, error) + func AccumulatingAndRetractingShape() shape.Shape + func AccumulatingAndRetractingToJSON(x *AccumulatingAndRetracting) ([]byte, error) + func AllOfShape() shape.Shape + func AllOfToJSON(x *AllOf) ([]byte, error) + func AnyOfShape() shape.Shape + func AnyOfToJSON(x *AnyOf) ([]byte, error) + func AtPeriod1Shape() shape.Shape + func AtPeriod1ToJSON(x *AtPeriod1) ([]byte, error) + func AtPeriodShape() shape.Shape + func AtPeriodToJSON(x *AtPeriod) ([]byte, error) + func AtWatermark1Shape() shape.Shape + func AtWatermark1ToJSON(x *AtWatermark1) ([]byte, error) + func AtWatermarkShape() shape.Shape + func AtWatermarkToJSON(x *AtWatermark) ([]byte, error) + func AtWindowItemSize1Shape() shape.Shape + func AtWindowItemSize1ToJSON(x *AtWindowItemSize1) ([]byte, error) + func AtWindowItemSizeShape() shape.Shape + func AtWindowItemSizeToJSON(x *AtWindowItemSize) ([]byte, error) + func Bool(b bool) *bool + func DefaultContextShape() shape.Shape + func DiscardShape() shape.Shape + func DiscardToJSON(x *Discard) ([]byte, error) + func DoJoinShape() shape.Shape + func DoJoinToJSON(x *DoJoin) ([]byte, error) + func DoLoadShape() shape.Shape + func DoLoadToJSON(x *DoLoad) ([]byte, error) + func DoMapShape() shape.Shape + func DoMapToJSON(x *DoMap) ([]byte, error) + func DoWindowShape() shape.Shape + func DoWindowToJSON(x *DoWindow) ([]byte, error) + func Each(x schema.Schema, f func(value schema.Schema)) + func EvaluateTriggerR0[T0 TriggerDescription, T1 TriggerType](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod), ...) + func EvaluateTriggerR1[T0 TriggerDescription, T1 TriggerType, TOut1 any](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) TOut1, ...) TOut1 + func EvaluateTriggerR2[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2), ...) (TOut1, TOut2) + func EvaluateTriggerR3[T0 TriggerDescription, T1 TriggerType, TOut1 any, TOut2 any, TOut3 any](t0 T0, t1 T1, f0 func(x0 *AtPeriod, x1 *AtPeriod) (TOut1, TOut2, TOut3), ...) (TOut1, TOut2, TOut3) + func EventTimeShape() shape.Shape + func FixedWindowShape() shape.Shape + func FixedWindowToJSON(x *FixedWindow) ([]byte, error) + func GameShape() shape.Shape + func GenerateItemsEvery(start int64, size int, every time.Duration) chan Item + func HashNode(n Node) string + func InMemoryBagOfShape() shape.Shape + func ItemGroupedByKeyShape() shape.Shape + func ItemGroupedByWindowShape() shape.Shape + func ItemShape() shape.Shape + func ItemTypeShape() shape.Shape + func KeyWithNamespace(key string, namespace string) string + func KeyedWindowKey(x *KeyedWindow) string + func KeyedWindowShape() shape.Shape + func MatchNodeR0(x Node, f1 func(x *DoWindow), f2 func(x *DoMap), f3 func(x *DoLoad), ...) + func MatchNodeR1[T0 any](x Node, f1 func(x *DoWindow) T0, f2 func(x *DoMap) T0, f3 func(x *DoLoad) T0, ...) T0 + func MatchNodeR2[T0, T1 any](x Node, f1 func(x *DoWindow) (T0, T1), f2 func(x *DoMap) (T0, T1), ...) (T0, T1) + func MatchNodeR3[T0, T1, T2 any](x Node, f1 func(x *DoWindow) (T0, T1, T2), f2 func(x *DoMap) (T0, T1, T2), ...) (T0, T1, T2) + func MatchTriggerDescriptionR0(x TriggerDescription, f1 func(x *AtPeriod), f2 func(x *AtWindowItemSize), ...) + func MatchTriggerDescriptionR1[T0 any](x TriggerDescription, f1 func(x *AtPeriod) T0, f2 func(x *AtWindowItemSize) T0, ...) T0 + func MatchTriggerDescriptionR2[T0, T1 any](x TriggerDescription, f1 func(x *AtPeriod) (T0, T1), ...) (T0, T1) + func MatchTriggerDescriptionR3[T0, T1, T2 any](x TriggerDescription, f1 func(x *AtPeriod) (T0, T1, T2), ...) (T0, T1, T2) + func MatchTriggerTypeR0(x TriggerType, f1 func(x *AtPeriod1), f2 func(x *AtWindowItemSize1), ...) + func MatchTriggerTypeR1[T0 any](x TriggerType, f1 func(x *AtPeriod1) T0, f2 func(x *AtWindowItemSize1) T0, ...) T0 + func MatchTriggerTypeR2[T0, T1 any](x TriggerType, f1 func(x *AtPeriod1) (T0, T1), ...) (T0, T1) + func MatchTriggerTypeR3[T0, T1, T2 any](x TriggerType, f1 func(x *AtPeriod1) (T0, T1, T2), ...) (T0, T1, T2) + func MatchWindowDescriptionR0(x WindowDescription, f1 func(x *SessionWindow), f2 func(x *SlidingWindow), ...) + func MatchWindowDescriptionR1[T0 any](x WindowDescription, f1 func(x *SessionWindow) T0, ...) T0 + func MatchWindowDescriptionR2[T0, T1 any](x WindowDescription, f1 func(x *SessionWindow) (T0, T1), ...) (T0, T1) + func MatchWindowDescriptionR3[T0, T1, T2 any](x WindowDescription, f1 func(x *SessionWindow) (T0, T1, T2), ...) (T0, T1, T2) + func MatchWindowFlushModeR0(x WindowFlushMode, f1 func(x *Accumulate), f2 func(x *Discard), ...) + func MatchWindowFlushModeR1[T0 any](x WindowFlushMode, f1 func(x *Accumulate) T0, f2 func(x *Discard) T0, ...) T0 + func MatchWindowFlushModeR2[T0, T1 any](x WindowFlushMode, f1 func(x *Accumulate) (T0, T1), ...) (T0, T1) + func MatchWindowFlushModeR3[T0, T1, T2 any](x WindowFlushMode, f1 func(x *Accumulate) (T0, T1, T2), ...) (T0, T1, T2) + func MessageShape() shape.Shape + func NewStatsCollector() *statsCollector + func NodeShape() shape.Shape + func NodeToJSON(x Node) ([]byte, error) + func NodeToString(node Node) string + func PackRetractAndAggregate(x, y schema.Schema) *schema.Map + func SessionWindowShape() shape.Shape + func SessionWindowToJSON(x *SessionWindow) ([]byte, error) + func SessionsStatsShape() shape.Shape + func SlidingWindowShape() shape.Shape + func SlidingWindowToJSON(x *SlidingWindow) ([]byte, error) + func StatsShape() shape.Shape + func TickersShape() shape.Shape + func ToMermaidGraph(dag *DAGBuilder) string + func ToMermaidGraphWithOrder(dag *DAGBuilder, order []Node) string + func ToStr(x Node) string + func ToStrItem(item *Item) string + func TriggerDescriptionShape() shape.Shape + func TriggerDescriptionToJSON(x TriggerDescription) ([]byte, error) + func TriggerHandlerShape() shape.Shape + func TriggerManagerShape() shape.Shape + func TriggerStateShape() shape.Shape + func TriggerTypeShape() shape.Shape + func TriggerTypeToJSON(x TriggerType) ([]byte, error) + func WindowBufferShape() shape.Shape + func WindowDescriptionShape() shape.Shape + func WindowDescriptionToJSON(x WindowDescription) ([]byte, error) + func WindowFlushModeShape() shape.Shape + func WindowFlushModeToJSON(x WindowFlushMode) ([]byte, error) + func WindowKey(window *Window) string + func WindowShape() shape.Shape + func WindowTriggerShape() shape.Shape + type Accumulate struct + AllowLateArrival time.Duration + func AccumulateFromJSON(x []byte) (*Accumulate, error) + func (r *Accumulate) AcceptWindowFlushMode(v WindowFlushModeVisitor) any + func (r *Accumulate) MarshalJSON() ([]byte, error) + func (r *Accumulate) UnmarshalJSON(data []byte) error + type AccumulateDiscardRetractHandler struct + func (a *AccumulateDiscardRetractHandler) Process(x Item, returning func(Item)) error + func (a *AccumulateDiscardRetractHandler) Retract(x Item, returning func(Item)) error + type AccumulatingAndRetracting struct + AllowLateArrival time.Duration + func AccumulatingAndRetractingFromJSON(x []byte) (*AccumulatingAndRetracting, error) + func (r *AccumulatingAndRetracting) AcceptWindowFlushMode(v WindowFlushModeVisitor) any + func (r *AccumulatingAndRetracting) MarshalJSON() ([]byte, error) + func (r *AccumulatingAndRetracting) UnmarshalJSON(data []byte) error + type AllOf struct + Triggers []TriggerDescription + func AllOfFromJSON(x []byte) (*AllOf, error) + func (r *AllOf) AcceptTriggerDescription(v TriggerDescriptionVisitor) any + func (r *AllOf) MarshalJSON() ([]byte, error) + func (r *AllOf) UnmarshalJSON(data []byte) error + type AnyOf struct + Triggers []TriggerDescription + func AnyOfFromJSON(x []byte) (*AnyOf, error) + func (r *AnyOf) AcceptTriggerDescription(v TriggerDescriptionVisitor) any + func (r *AnyOf) MarshalJSON() ([]byte, error) + func (r *AnyOf) UnmarshalJSON(data []byte) error + type AtPeriod struct + Duration time.Duration + func AtPeriodFromJSON(x []byte) (*AtPeriod, error) + func (r *AtPeriod) AcceptTriggerDescription(v TriggerDescriptionVisitor) any + func (r *AtPeriod) MarshalJSON() ([]byte, error) + func (r *AtPeriod) UnmarshalJSON(data []byte) error + type AtPeriod1 = AtPeriod + func AtPeriod1FromJSON(x []byte) (*AtPeriod1, error) + func (r *AtPeriod1) AcceptTriggerType(v TriggerTypeVisitor) any + type AtWatermark struct + Timestamp int64 + func AtWatermarkFromJSON(x []byte) (*AtWatermark, error) + func (r *AtWatermark) AcceptTriggerDescription(v TriggerDescriptionVisitor) any + func (r *AtWatermark) MarshalJSON() ([]byte, error) + func (r *AtWatermark) UnmarshalJSON(data []byte) error + type AtWatermark1 = AtWatermark + func AtWatermark1FromJSON(x []byte) (*AtWatermark1, error) + func (r *AtWatermark1) AcceptTriggerType(v TriggerTypeVisitor) any + type AtWindowItemSize struct + Number int + func AtWindowItemSizeFromJSON(x []byte) (*AtWindowItemSize, error) + func (r *AtWindowItemSize) AcceptTriggerDescription(v TriggerDescriptionVisitor) any + func (r *AtWindowItemSize) MarshalJSON() ([]byte, error) + func (r *AtWindowItemSize) UnmarshalJSON(data []byte) error + type AtWindowItemSize1 = AtWindowItemSize + func AtWindowItemSize1FromJSON(x []byte) (*AtWindowItemSize1, error) + func (r *AtWindowItemSize1) AcceptTriggerType(v TriggerTypeVisitor) any + type AvgHandler struct + func (h *AvgHandler) Process(msg Item, returning func(Item)) error + type BagOf interface + Del func(key string) error + Get func(key string) (A, error) + Range func(f func(key string, item A)) + Set func(key string, value A) error + type Builder interface + Build func() []Node + Join func(a, b Builder, opts ...ContextOptionFunc) Builder + Load func(f Handler, opts ...ContextOptionFunc) Builder + Map func(f Handler, opts ...ContextOptionFunc) Builder + Window func(opts ...ContextOptionFunc) Builder + type ContextOptionFunc func(c *DefaultContext) + func WithAccumulate() ContextOptionFunc + func WithAccumulatingAndRetracting() ContextOptionFunc + func WithDiscard() ContextOptionFunc + func WithFixedWindow(width time.Duration) ContextOptionFunc + func WithName(name string) ContextOptionFunc + func WithSessionWindow(gap time.Duration) ContextOptionFunc + func WithSlidingWindow(width time.Duration, period time.Duration) ContextOptionFunc + func WithTriggers(and ...TriggerDescription) ContextOptionFunc + func WithWindowDescription(wd WindowDescription) ContextOptionFunc + func WithWindowFlushMode(fm WindowFlushMode) ContextOptionFunc + type CountHandler struct + func (h *CountHandler) Process(msg Item, returning func(Item)) error + type DAGBuilder struct + func NewDAGBuilder() *DAGBuilder + func (d *DAGBuilder) Build() []Node + func (d *DAGBuilder) GetByName(name string) (*DAGBuilder, error) + func (d *DAGBuilder) Join(a, b Builder, opts ...ContextOptionFunc) Builder + func (d *DAGBuilder) Load(f Handler, opts ...ContextOptionFunc) Builder + func (d *DAGBuilder) Map(f Handler, opts ...ContextOptionFunc) Builder + func (d *DAGBuilder) Window(opts ...ContextOptionFunc) Builder + type DebounceHandler struct + MaxSize int + MaxTime time.Duration + func (t *DebounceHandler) Process(x Item, returning func(Item)) error + func (t *DebounceHandler) Retract(x Item, returning func(Item)) error + type DefaultContext struct + func GetCtx(node Node) *DefaultContext + func NewContextBuilder(builders ...func(config *DefaultContext)) *DefaultContext + func (c *DefaultContext) Name() string + func (c *DefaultContext) Scope(name string) *DefaultContext + type Discard struct + func DiscardFromJSON(x []byte) (*Discard, error) + func (r *Discard) AcceptWindowFlushMode(v WindowFlushModeVisitor) any + func (r *Discard) MarshalJSON() ([]byte, error) + func (r *Discard) UnmarshalJSON(data []byte) error + type DoJoin struct + Ctx *DefaultContext + Input []Node + func DoJoinFromJSON(x []byte) (*DoJoin, error) + func (r *DoJoin) AcceptNode(v NodeVisitor) any + func (r *DoJoin) MarshalJSON() ([]byte, error) + func (r *DoJoin) UnmarshalJSON(data []byte) error + type DoLoad struct + Ctx *DefaultContext + OnLoad Handler + func DoLoadFromJSON(x []byte) (*DoLoad, error) + func (r *DoLoad) AcceptNode(v NodeVisitor) any + func (r *DoLoad) MarshalJSON() ([]byte, error) + func (r *DoLoad) UnmarshalJSON(data []byte) error + type DoMap struct + Ctx *DefaultContext + Input Node + OnMap Handler + func DoMapFromJSON(x []byte) (*DoMap, error) + func (r *DoMap) AcceptNode(v NodeVisitor) any + func (r *DoMap) MarshalJSON() ([]byte, error) + func (r *DoMap) UnmarshalJSON(data []byte) error + type DoWindow struct + Ctx *DefaultContext + Input Node + func DoWindowFromJSON(x []byte) (*DoWindow, error) + func (r *DoWindow) AcceptNode(v NodeVisitor) any + func (r *DoWindow) MarshalJSON() ([]byte, error) + func (r *DoWindow) UnmarshalJSON(data []byte) error + type Dual struct + func NewDual() *Dual + func (d *Dual) IsValid() bool + func (d *Dual) List() []*Message + func (d *Dual) ReturningAggregate(msg Item) + func (d *Dual) ReturningRetract(msg Item) + type EvaluateTrigger interface + MatchAllOfAny func(*AllOf, TriggerType) + MatchAnyOfAny func(*AnyOf, TriggerType) + MatchCount func(*AtWindowItemSize, *AtWindowItemSize) + MatchDefault func(T0, T1) + MatchPeriod func(*AtPeriod, *AtPeriod) + MatchWatermark func(*AtWatermark, *AtWatermark) + type EventTime = int64 + type ExecutionGroup struct + func (g *ExecutionGroup) Go(f func() error) + func (g *ExecutionGroup) Wait() error + type ExecutionStatus int + const ExecutionStatusError + const ExecutionStatusFinished + const ExecutionStatusNew + const ExecutionStatusRunning + type FilterHandler struct + Where *predicate.WherePredicates + func (f *FilterHandler) Process(x Item, returning func(Item)) error + func (f *FilterHandler) Retract(x Item, returning func(Item)) error + type FixedWindow struct + Width time.Duration + func FixedWindowFromJSON(x []byte) (*FixedWindow, error) + func (r *FixedWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any + func (r *FixedWindow) MarshalJSON() ([]byte, error) + func (r *FixedWindow) UnmarshalJSON(data []byte) error + type Game struct + IsDraw bool + Players []string + SessionID string + Winner string + func (r *Game) MarshalJSON() ([]byte, error) + func (r *Game) UnmarshalJSON(data []byte) error + type GenerateHandler struct + Load func(push func(message Item)) error + func (h *GenerateHandler) Process(x Item, returning func(Item)) error + func (h *GenerateHandler) Retract(x Item, returning func(Item)) error + type Handler interface + Process func(x Item, returning func(Item)) error + Retract func(x Item, returning func(Item)) error + func Log(prefix string) Handler + type InMemoryBagOf struct + func NewInMemoryBagOf[A any]() *InMemoryBagOf[A] + func (b *InMemoryBagOf[A]) Del(key string) error + func (b *InMemoryBagOf[A]) Get(key string) (A, error) + func (b *InMemoryBagOf[A]) Range(f func(key string, item A)) + func (b *InMemoryBagOf[A]) Set(key string, value A) error + type InMemoryTwoInterpreter struct + func NewInMemoryTwoInterpreter() *InMemoryTwoInterpreter + func (i *InMemoryTwoInterpreter) Run(ctx context.Context, nodes []Node) error + func (i *InMemoryTwoInterpreter) StatsSnapshotAndReset() Stats + type Item struct + Data schema.Schema + EventTime EventTime + Key string + Type ItemType + Window *Window + func AssignWindows(x []Item, wd WindowDescription) []Item + func DropTimestamps(x []Item) []Item + func ExpandToElements(x []ItemGroupedByWindow) []Item + func ToElement(group *ItemGroupedByWindow) Item + func (r *Item) MarshalJSON() ([]byte, error) + func (r *Item) UnmarshalJSON(data []byte) error + type ItemGroupedByKey struct + Data []Item + Key string + func GroupByKey(x []Item) []ItemGroupedByKey + func MergeWindows(x []ItemGroupedByKey, wd WindowDescription) []ItemGroupedByKey + func (r *ItemGroupedByKey) MarshalJSON() ([]byte, error) + func (r *ItemGroupedByKey) UnmarshalJSON(data []byte) error + type ItemGroupedByWindow struct + Data *schema.List + Key string + Window *Window + func GroupAlsoByWindow(x []ItemGroupedByKey) []ItemGroupedByWindow + func (r *ItemGroupedByWindow) MarshalJSON() ([]byte, error) + func (r *ItemGroupedByWindow) UnmarshalJSON(data []byte) error + type ItemType uint8 + const ItemAggregation + const ItemRetractAndAggregate + type JoinHandler struct + F func(a, b T, returning func(T)) error + func (j *JoinHandler[T]) Process(x Item, returning func(Item)) error + func (j *JoinHandler[T]) Retract(x Item, returning func(Item)) error + type KeyedWindow struct + Key string + Window *Window + func ToKeyedWindowFromGrouped(x *ItemGroupedByWindow) *KeyedWindow + func ToKeyedWindowFromItem(x *Item) *KeyedWindow + type ListAssert struct + Err error + Items []Item + func (l *ListAssert) AssertAt(index int, expected Item) bool + func (l *ListAssert) AssertLen(expected int) bool + func (l *ListAssert) Contains(expected Item) bool + func (l *ListAssert) Returning(msg Item) + type LogHandler struct + func (l *LogHandler) Process(x Item, returning func(Item)) error + func (l *LogHandler) Retract(x Item, returning func(Item)) error + type MapHandler struct + F func(x A, returning func(key string, value B)) error + func (h *MapHandler[A, B]) Process(x Item, returning func(Item)) error + func (h *MapHandler[A, B]) Retract(x Item, returning func(Item)) error + type MergeHandler struct + Combine func(a, b A) (A, error) + DoRetract func(a, b A) (A, error) + func (h *MergeHandler[A]) Process(x Item, returning func(Item)) error + func (h *MergeHandler[A]) Retract(x Item, returning func(Item)) error + type Message struct + Item *Item + Key string + Offset int + Watermark *int64 + type Node interface + AcceptNode func(g NodeVisitor) any + func NodeFromJSON(x []byte) (Node, error) + func ReverseSort(nodes []Node) []Node + func Sort(dag *DAGBuilder) []Node + type NodeUnionJSON struct + DoJoin json.RawMessage + DoLoad json.RawMessage + DoMap json.RawMessage + DoWindow json.RawMessage + Type string + type NodeVisitor interface + VisitDoJoin func(v *DoJoin) any + VisitDoLoad func(v *DoLoad) any + VisitDoMap func(v *DoMap) any + VisitDoWindow func(v *DoWindow) any + type PubSub struct + func NewPubSub[T comparable]() *PubSub[T] + func (p *PubSub[T]) Finish(ctx context.Context, key T) + func (p *PubSub[T]) Publish(ctx context.Context, key T, msg Message) error + func (p *PubSub[T]) Register(key T) error + func (p *PubSub[T]) Subscribe(ctx context.Context, node T, fromOffset int, f func(Message) error) error + type PubSubChan struct + func NewPubSubChan[T any]() *PubSubChan[T] + func (s *PubSubChan[T]) Close() + func (s *PubSubChan[T]) Process() + func (s *PubSubChan[T]) Publish(msg T) error + func (s *PubSubChan[T]) Subscribe(f func(T) error) error + type PubSubForInterpreter interface + Finish func(ctx context.Context, key T) + Publish func(ctx context.Context, key T, msg Message) error + Register func(key T) error + Subscribe func(ctx context.Context, node T, fromOffset int, f func(Message) error) error + type PubSubMulti struct + func NewPubSubMultiChan[T comparable]() *PubSubMulti[T] + func (p *PubSubMulti[T]) Finish(ctx context.Context, key T) + func (p *PubSubMulti[T]) Publish(ctx context.Context, key T, msg Message) error + func (p *PubSubMulti[T]) Register(key T) error + func (p *PubSubMulti[T]) Subscribe(ctx context.Context, key T, fromOffset int, f func(Message) error) error + type PubSubSingle struct + func NewPubSubSingle() *PubSubSingle + func (p *PubSubSingle) Finish() + func (p *PubSubSingle) Publish(ctx context.Context, msg Message) error + func (p *PubSubSingle) Subscribe(ctx context.Context, fromOffset int, f func(Message) error) error + type PubSubSingler interface + Close func() + Process func() + Publish func(msg T) error + Subscribe func(f func(T) error) error + type RepositorySink struct + func NewRepositorySink(recordType string, store schemaless.Repository[schema.Schema]) *RepositorySink + func (s *RepositorySink) FlushOnTime() + func (s *RepositorySink) Process(x Item, returning func(Item)) error + func (s *RepositorySink) Retract(x Item, returning func(Item)) error + type SessionWindow struct + GapDuration time.Duration + func SessionWindowFromJSON(x []byte) (*SessionWindow, error) + func (r *SessionWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any + func (r *SessionWindow) MarshalJSON() ([]byte, error) + func (r *SessionWindow) UnmarshalJSON(data []byte) error + type SessionsStats struct + Draws int + Loose int + Wins int + func (r *SessionsStats) MarshalJSON() ([]byte, error) + func (r *SessionsStats) UnmarshalJSON(data []byte) error + type SimpleProcessHandler struct + P func(x Item, returning func(Item)) error + R func(x Item, returning func(Item)) error + func (s *SimpleProcessHandler) Process(x Item, returning func(Item)) error + func (s *SimpleProcessHandler) Retract(x Item, returning func(Item)) error + type SlidingWindow struct + Period time.Duration + Width time.Duration + func SlidingWindowFromJSON(x []byte) (*SlidingWindow, error) + func (r *SlidingWindow) AcceptWindowDescription(v WindowDescriptionVisitor) any + func (r *SlidingWindow) MarshalJSON() ([]byte, error) + func (r *SlidingWindow) UnmarshalJSON(data []byte) error + type Stats = map[string]int + type StatsCollector interface + Incr func(key string, increment int) + Snapshot func() Stats + type Tickers struct + func NewTimeTicker() *Tickers + func (t *Tickers) Register(td TriggerDescription, ts TimeSignaler) + func (t *Tickers) Unregister(td TriggerDescription) + type TimeSignaler interface + SignalDuration func(duration time.Duration) + type TriggerDescription interface + AcceptTriggerDescription func(g TriggerDescriptionVisitor) any + func TriggerDescriptionFromJSON(x []byte) (TriggerDescription, error) + type TriggerDescriptionUnionJSON struct + AllOf json.RawMessage + AnyOf json.RawMessage + AtPeriod json.RawMessage + AtWatermark json.RawMessage + AtWindowItemSize json.RawMessage + Type string + type TriggerDescriptionVisitor interface + VisitAllOf func(v *AllOf) any + VisitAnyOf func(v *AnyOf) any + VisitAtPeriod func(v *AtPeriod) any + VisitAtWatermark func(v *AtWatermark) any + VisitAtWindowItemSize func(v *AtWindowItemSize) any + type TriggerHandler struct + func (tm *TriggerHandler) Process(x Item, returning func(Item)) error + func (tm *TriggerHandler) Retract(x Item, returning func(Item)) error + func (tm *TriggerHandler) Triggered(trigger TriggerType, returning func(Item)) error + type TriggerManager struct + func NewTriggerManager(td TriggerDescription) *TriggerManager + func (tm *TriggerManager) SignalDuration(duration time.Duration) + func (tm *TriggerManager) SignalWatermark(timestamp int64) + func (tm *TriggerManager) SignalWindowCreated(kw *KeyedWindow) + func (tm *TriggerManager) SignalWindowDeleted(kw *KeyedWindow) + func (tm *TriggerManager) SignalWindowSizeReached(kw *KeyedWindow, size int) + func (tm *TriggerManager) WhenTrigger(f func(w *KeyedWindow)) + type TriggerState struct + type TriggerType interface + AcceptTriggerType func(g TriggerTypeVisitor) any + func TriggerTypeFromJSON(x []byte) (TriggerType, error) + type TriggerTypeUnionJSON struct + AtPeriod1 json.RawMessage + AtWatermark1 json.RawMessage + AtWindowItemSize1 json.RawMessage + Type string + type TriggerTypeVisitor interface + VisitAtPeriod1 func(v *AtPeriod1) any + VisitAtWatermark1 func(v *AtWatermark1) any + VisitAtWindowItemSize1 func(v *AtWindowItemSize1) any + type WatermarkSignaler interface + SignalWatermark func(timestamp int64) + type Window struct + End int64 + Start int64 + type WindowBuffer struct + func NewWindowBuffer(wd WindowDescription, sig WindowBufferSignaler) *WindowBuffer + func (wb *WindowBuffer) Append(x Item) + func (wb *WindowBuffer) EachItemGroupedByWindow(f func(group *ItemGroupedByWindow)) + func (wb *WindowBuffer) EachKeyedWindow(kw *KeyedWindow, f func(group *ItemGroupedByWindow)) + func (wb *WindowBuffer) GroupAlsoByWindow(x []ItemGroupedByKey) + func (wb *WindowBuffer) RemoveItemGropedByWindow(item *ItemGroupedByWindow) + type WindowBufferSignaler interface + SignalWindowCreated func(kw *KeyedWindow) + SignalWindowDeleted func(kw *KeyedWindow) + SignalWindowSizeReached func(kw *KeyedWindow, size int) + type WindowDescription interface + AcceptWindowDescription func(g WindowDescriptionVisitor) any + func WindowDescriptionFromJSON(x []byte) (WindowDescription, error) + type WindowDescriptionUnionJSON struct + FixedWindow json.RawMessage + SessionWindow json.RawMessage + SlidingWindow json.RawMessage + Type string + type WindowDescriptionVisitor interface + VisitFixedWindow func(v *FixedWindow) any + VisitSessionWindow func(v *SessionWindow) any + VisitSlidingWindow func(v *SlidingWindow) any + type WindowFlushMode interface + AcceptWindowFlushMode func(g WindowFlushModeVisitor) any + func WindowFlushModeFromJSON(x []byte) (WindowFlushMode, error) + type WindowFlushModeUnionJSON struct + Accumulate json.RawMessage + AccumulatingAndRetracting json.RawMessage + Discard json.RawMessage + Type string + type WindowFlushModeVisitor interface + VisitAccumulate func(v *Accumulate) any + VisitAccumulatingAndRetracting func(v *AccumulatingAndRetracting) any + VisitDiscard func(v *Discard) any + type WindowTrigger struct + func NewWindowTrigger(w *Window, td TriggerDescription) *WindowTrigger + func (wt *WindowTrigger) ReceiveEvent(triggerType TriggerType) + func (wt *WindowTrigger) Reset() + func (wt *WindowTrigger) ShouldTrigger() bool