Documentation ¶
Index ¶
- Constants
- func Adder[T Added](sum T) (func(add any), func() T)
- type AccumulateEvictor
- type Added
- type CalResultHandle
- type CounterTrigger
- type DU
- type Evictor
- type FixedWindow
- type GlobalWindow
- type Operator
- type Processor
- func (p *Processor) Build() (*Processor, chan<- DU, <-chan DU)
- func (p *Processor) Evictor(evictor Evictor) *Processor
- func (p *Processor) Operator(operator Operator) *Processor
- func (p *Processor) PopResult(fn CalResultHandle)
- func (p *Processor) PushData(Data DU)
- func (p *Processor) Start()
- func (p *Processor) Stop()
- func (p *Processor) Trigger(trigger Trigger) *Processor
- func (p *Processor) Window(windows Windows) *Processor
- type RecalculateEvictor
- type SessionWindow
- type SlideWindow
- func (sw *SlideWindow) AssignWindow(data DU) []*windowBase
- func (sw *SlideWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) (createdWindows []*windowBase)
- func (sw *SlideWindow) GetParams() (time.Duration, time.Duration)
- func (sw *SlideWindow) GetWindows() []*windowBase
- type SumOperator
- type TimeTrigger
- type Trigger
- type WindowType
- type Windows
Constants ¶
View Source
const ( EvictorTypeAccumulate = 1 EvictorTypeRecalculate = 2 )
View Source
const ( OperatorDataTypeInt = 0 OperatorDataTypeFloat = 1 OperatorDataTypeString = 2 )
View Source
const ( TriggerTypeCounterTrigger = 1 TriggerTypeTimerTrigger = 2 )
View Source
const (
OperatorTypeSum = 1
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AccumulateEvictor ¶
type AccumulateEvictor struct{}
AccumulateEvictor is a no-op evictor; it has nothing to do.
func (AccumulateEvictor) AfterOperator ¶
func (e AccumulateEvictor) AfterOperator(windows *windowBase, key string)
func (AccumulateEvictor) BeforeOperator ¶
func (e AccumulateEvictor) BeforeOperator(windows *windowBase, key string)
func (AccumulateEvictor) Clone ¶
func (e AccumulateEvictor) Clone() Evictor
type CalResultHandle ¶
type CalResultHandle func(du <-chan DU)
type CounterTrigger ¶
type CounterTrigger struct {
// contains filtered or unexported fields
}
func (CounterTrigger) Clone ¶
func (c CounterTrigger) Clone() Trigger
func (CounterTrigger) GetParams ¶
func (c CounterTrigger) GetParams() int
func (CounterTrigger) OnReady ¶
func (c CounterTrigger) OnReady() <-chan string
func (CounterTrigger) Reset ¶
func (c CounterTrigger) Reset(key string)
func (CounterTrigger) Run ¶
func (c CounterTrigger) Run(ctx context.Context, windowBase *windowBase)
type DU ¶
type DU struct {
	Key              string
	Value            any
	EventTime        time.Time
	NeedCancelBefore bool // true means the earlier values for this key should be dropped
}
DU is a data unit.
type FixedWindow ¶
type FixedWindow struct {
// contains filtered or unexported fields
}
func NewFixedWindows ¶
func NewFixedWindows(size time.Duration) *FixedWindow
func (*FixedWindow) AssignWindow ¶
func (fw *FixedWindow) AssignWindow(data DU) []*windowBase
func (*FixedWindow) CreateWindow ¶
func (fw *FixedWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase
func (*FixedWindow) GetParams ¶
func (fw *FixedWindow) GetParams() time.Duration
func (*FixedWindow) GetWindows ¶
func (fw *FixedWindow) GetWindows() []*windowBase
type GlobalWindow ¶
func (*GlobalWindow) AssignWindow ¶
func (gw *GlobalWindow) AssignWindow(data DU) []*windowBase
func (*GlobalWindow) CreateWindow ¶
func (gw *GlobalWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase
func (*GlobalWindow) GetWindows ¶
func (gw *GlobalWindow) GetWindows() []*windowBase
func (GlobalWindow) GroupByKey ¶
type Processor ¶
type Processor struct {
	ID string
	// contains filtered or unexported fields
}
func BuildProcessor ¶
func BuildProcessor() *Processor
func (*Processor) PopResult ¶
func (p *Processor) PopResult(fn CalResultHandle)
type RecalculateEvictor ¶
type RecalculateEvictor struct{}
RecalculateEvictor removes the old data after the operator runs, ready for the next calculation.
func (RecalculateEvictor) AfterOperator ¶
func (e RecalculateEvictor) AfterOperator(windows *windowBase, key string)
func (RecalculateEvictor) BeforeOperator ¶
func (e RecalculateEvictor) BeforeOperator(windows *windowBase, key string)
func (RecalculateEvictor) Clone ¶
func (e RecalculateEvictor) Clone() Evictor
type SessionWindow ¶
type SessionWindow struct {
// contains filtered or unexported fields
}
func (*SessionWindow) AssignWindow ¶
func (sw *SessionWindow) AssignWindow(data DU) []*windowBase
func (*SessionWindow) CreateWindow ¶
func (sw *SessionWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase
func (*SessionWindow) GetParams ¶
func (sw *SessionWindow) GetParams() time.Duration
func (*SessionWindow) GetWindows ¶
func (sw *SessionWindow) GetWindows() []*windowBase
type SlideWindow ¶
type SlideWindow struct {
// contains filtered or unexported fields
}
func NewSlideWindow ¶
func NewSlideWindow(size, period time.Duration) *SlideWindow
func (*SlideWindow) AssignWindow ¶
func (sw *SlideWindow) AssignWindow(data DU) []*windowBase
func (*SlideWindow) CreateWindow ¶
func (sw *SlideWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) (createdWindows []*windowBase)
func (*SlideWindow) GetWindows ¶
func (sw *SlideWindow) GetWindows() []*windowBase
type SumOperator ¶
type SumOperator struct {
DataType int32
}
func (SumOperator) Clone ¶
func (s SumOperator) Clone() Operator
func (SumOperator) GetDataType ¶
func (s SumOperator) GetDataType() int32
func (SumOperator) Operate ¶
func (s SumOperator) Operate(DUs []DU) DU
type TimeTrigger ¶
type TimeTrigger struct {
// contains filtered or unexported fields
}
func (TimeTrigger) Clone ¶
func (t TimeTrigger) Clone() Trigger
func (TimeTrigger) GetParams ¶
func (t TimeTrigger) GetParams() time.Duration
func (TimeTrigger) OnReady ¶
func (t TimeTrigger) OnReady() <-chan string
func (TimeTrigger) Reset ¶
func (t TimeTrigger) Reset(string)
func (TimeTrigger) Run ¶
func (t TimeTrigger) Run(ctx context.Context, windowBase *windowBase)
type Trigger ¶
type Trigger interface { OnReady() <-chan string Clone() Trigger Run(ctx context.Context, windowBase *windowBase) Reset(key string) }
func NewCounterTrigger ¶
func NewTimerTrigger ¶
type WindowType ¶
type WindowType int32
const ( WindowTypeGlobal WindowType = iota WindowTypeFixedWindow WindowTypeSlideWindow WindowTypeSessionWindow )
type Windows ¶
type Windows interface {
	// AssignWindow determines which windows the incoming data falls into and returns them.
	AssignWindow(data DU) []*windowBase
	// CreateWindow creates a list of empty windows for saving data.
	CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase
	// GetWindows returns the windows in the processor.
	GetWindows() []*windowBase
}
func NewDefaultGlobalWindow ¶
func NewDefaultGlobalWindow() Windows
func NewSessionWindow ¶
Click to show internal directories.
Click to hide internal directories.