core

package
v0.0.0-...-a4eaa21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
// Evictor type identifiers, declared as untyped integer constants
// numbered from 1.
// NOTE(review): presumably these correspond to AccumulateEvictor and
// RecalculateEvictor respectively — confirm against the callers.
const (
	EvictorTypeAccumulate  = iota + 1 // 1
	EvictorTypeRecalculate            // 2
)
View Source
// Operator data type identifiers, declared as untyped integer constants
// numbered from 0.
// NOTE(review): presumably these are the values reported by
// Operator.GetDataType and stored in SumOperator.DataType — confirm.
const (
	OperatorDataTypeInt    = iota // 0
	OperatorDataTypeFloat         // 1
	OperatorDataTypeString        // 2
)
View Source
// Trigger type identifiers, declared as untyped integer constants
// numbered from 1.
// NOTE(review): presumably these correspond to CounterTrigger and
// TimeTrigger respectively — confirm against the callers.
const (
	TriggerTypeCounterTrigger = iota + 1 // 1
	TriggerTypeTimerTrigger              // 2
)
View Source
// OperatorTypeSum is the only operator type identifier declared here
// (an untyped integer constant).
// NOTE(review): presumably it identifies SumOperator — confirm.
const OperatorTypeSum = 1

Variables

This section is empty.

Functions

func Adder

func Adder[T Added](sum T) (func(add any), func() T)

Types

type AccumulateEvictor

type AccumulateEvictor struct{}

AccumulateEvictor is an Evictor that has nothing to do.

func (AccumulateEvictor) AfterOperator

func (e AccumulateEvictor) AfterOperator(windows *windowBase, key string)

func (AccumulateEvictor) BeforeOperator

func (e AccumulateEvictor) BeforeOperator(windows *windowBase, key string)

func (AccumulateEvictor) Clone

func (e AccumulateEvictor) Clone() Evictor

type Added

// Added is a type constraint satisfied by any type whose underlying type is
// a signed integer, an unsigned integer (including uintptr), a float, or a
// string. It is the constraint on the type parameter of Adder.
type Added interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr |
		~float32 | ~float64 |
		~string
}

type CalResultHandle

// CalResultHandle is a callback that is handed a receive-only channel of DU
// values. It is the parameter type of Processor.PopResult.
// NOTE(review): presumably the channel carries calculation results — confirm.
type CalResultHandle func(du <-chan DU)

type CounterTrigger

type CounterTrigger struct {
	// contains filtered or unexported fields
}

func (CounterTrigger) Clone

func (c CounterTrigger) Clone() Trigger

func (CounterTrigger) GetParams

func (c CounterTrigger) GetParams() int

func (CounterTrigger) OnReady

func (c CounterTrigger) OnReady() <-chan string

func (CounterTrigger) Reset

func (c CounterTrigger) Reset(key string)

func (CounterTrigger) Run

func (c CounterTrigger) Run(ctx context.Context, windowBase *windowBase)

type DU

// DU is a data unit: a keyed value together with its event time.
type DU struct {
	// NOTE(review): presumably Key is what data is grouped by (see
	// GlobalWindow.GroupByKey) — confirm.
	Key              string
	Value            any
	EventTime        time.Time
	NeedCancelBefore bool // indicates that the earlier value for this key should be dropped
}

DU is a data unit.

type Evictor

// Evictor declares hooks that are called around an operator run for a given
// window and key. AccumulateEvictor and RecalculateEvictor both declare
// these three methods.
type Evictor interface {
	// BeforeOperator is called before the operator runs.
	BeforeOperator(window *windowBase, key string)
	// AfterOperator is called after the operator runs.
	AfterOperator(window *windowBase, key string)
	// Clone returns an Evictor.
	// NOTE(review): presumably an independent copy of the receiver — confirm.
	Clone() Evictor
}

type FixedWindow

type FixedWindow struct {
	// contains filtered or unexported fields
}

func NewFixedWindows

func NewFixedWindows(size time.Duration) *FixedWindow

func (*FixedWindow) AssignWindow

func (fw *FixedWindow) AssignWindow(data DU) []*windowBase

func (*FixedWindow) CreateWindow

func (fw *FixedWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase

func (*FixedWindow) GetParams

func (fw *FixedWindow) GetParams() time.Duration

func (*FixedWindow) GetWindows

func (fw *FixedWindow) GetWindows() []*windowBase

type GlobalWindow

type GlobalWindow struct {
	*sync.Once
	// contains filtered or unexported fields
}

func (*GlobalWindow) AssignWindow

func (gw *GlobalWindow) AssignWindow(data DU) []*windowBase

func (*GlobalWindow) CreateWindow

func (gw *GlobalWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase

func (*GlobalWindow) GetWindows

func (gw *GlobalWindow) GetWindows() []*windowBase

func (GlobalWindow) GroupByKey

func (wb GlobalWindow) GroupByKey(dataList []DU) map[string][]DU

type Operator

// Operator describes how the data of a window is calculated. SumOperator
// declares these three methods.
type Operator interface {
	// Operate defines how window data is calculated: it takes a slice of
	// DUs and returns a single DU.
	Operate(DUs []DU) DU
	// Clone returns an Operator.
	// NOTE(review): presumably an independent copy of the receiver — confirm.
	Clone() Operator
	// GetDataType returns an int32.
	// NOTE(review): presumably one of the OperatorDataType* constants — confirm.
	GetDataType() int32
}

type Processor

type Processor struct {
	ID string
	// contains filtered or unexported fields
}

func BuildProcessor

func BuildProcessor() *Processor

func (*Processor) Build

func (p *Processor) Build() (*Processor, chan<- DU, <-chan DU)

func (*Processor) Evictor

func (p *Processor) Evictor(evictor Evictor) *Processor

func (*Processor) Operator

func (p *Processor) Operator(operator Operator) *Processor

func (*Processor) PopResult

func (p *Processor) PopResult(fn CalResultHandle)

func (*Processor) PushData

func (p *Processor) PushData(Data DU)

func (*Processor) Start

func (p *Processor) Start()

func (*Processor) Stop

func (p *Processor) Stop()

func (*Processor) Trigger

func (p *Processor) Trigger(trigger Trigger) *Processor

func (*Processor) Window

func (p *Processor) Window(windows Windows) *Processor

type RecalculateEvictor

type RecalculateEvictor struct{}

RecalculateEvictor removes the old data after the operator runs, ready for the next calculation.

func (RecalculateEvictor) AfterOperator

func (e RecalculateEvictor) AfterOperator(windows *windowBase, key string)

func (RecalculateEvictor) BeforeOperator

func (e RecalculateEvictor) BeforeOperator(windows *windowBase, key string)

func (RecalculateEvictor) Clone

func (e RecalculateEvictor) Clone() Evictor

type SessionWindow

type SessionWindow struct {
	// contains filtered or unexported fields
}

func (*SessionWindow) AssignWindow

func (sw *SessionWindow) AssignWindow(data DU) []*windowBase

func (*SessionWindow) CreateWindow

func (sw *SessionWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase

func (*SessionWindow) GetParams

func (sw *SessionWindow) GetParams() time.Duration

func (*SessionWindow) GetWindows

func (sw *SessionWindow) GetWindows() []*windowBase

type SlideWindow

type SlideWindow struct {
	// contains filtered or unexported fields
}

func NewSlideWindow

func NewSlideWindow(size, period time.Duration) *SlideWindow

func (*SlideWindow) AssignWindow

func (sw *SlideWindow) AssignWindow(data DU) []*windowBase

func (*SlideWindow) CreateWindow

func (sw *SlideWindow) CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) (createdWindows []*windowBase)

func (*SlideWindow) GetParams

func (sw *SlideWindow) GetParams() (time.Duration, time.Duration)

func (*SlideWindow) GetWindows

func (sw *SlideWindow) GetWindows() []*windowBase

type SumOperator

// SumOperator is a struct with a single exported field, DataType. It declares
// the Clone, GetDataType and Operate methods of the Operator interface.
type SumOperator struct {
	// NOTE(review): presumably one of the OperatorDataType* constants,
	// selecting how values are summed — confirm.
	DataType int32
}

func (SumOperator) Clone

func (s SumOperator) Clone() Operator

func (SumOperator) GetDataType

func (s SumOperator) GetDataType() int32

func (SumOperator) Operate

func (s SumOperator) Operate(DUs []DU) DU

type TimeTrigger

type TimeTrigger struct {
	// contains filtered or unexported fields
}

func (TimeTrigger) Clone

func (t TimeTrigger) Clone() Trigger

func (TimeTrigger) GetParams

func (t TimeTrigger) GetParams() time.Duration

func (TimeTrigger) OnReady

func (t TimeTrigger) OnReady() <-chan string

func (TimeTrigger) Reset

func (t TimeTrigger) Reset(string)

func (TimeTrigger) Run

func (t TimeTrigger) Run(ctx context.Context, windowBase *windowBase)

type Trigger

// Trigger is the interface returned by NewCounterTrigger and NewTimerTrigger.
// CounterTrigger and TimeTrigger both declare these four methods.
type Trigger interface {
	// OnReady returns a receive-only channel of strings.
	// NOTE(review): presumably the strings are keys whose data is ready to
	// be calculated — confirm.
	OnReady() <-chan string
	// Clone returns a Trigger.
	// NOTE(review): presumably an independent copy of the receiver — confirm.
	Clone() Trigger
	// Run takes a context and a window.
	// NOTE(review): presumably runs until ctx is cancelled — confirm.
	Run(ctx context.Context, windowBase *windowBase)
	// Reset takes a key.
	// NOTE(review): presumably resets the trigger state for that key — confirm.
	Reset(key string)
}

func NewCounterTrigger

func NewCounterTrigger(count int) Trigger

func NewTimerTrigger

func NewTimerTrigger(period time.Duration) Trigger

type WindowType

// WindowType is an int32-based identifier for a kind of window.
// NOTE(review): presumably the values correspond to GlobalWindow,
// FixedWindow, SlideWindow and SessionWindow — confirm.
type WindowType int32

// Window type identifiers, numbered consecutively from 0.
const (
	WindowTypeGlobal        WindowType = 0
	WindowTypeFixedWindow   WindowType = 1
	WindowTypeSlideWindow   WindowType = 2
	WindowTypeSessionWindow WindowType = 3
)

type Windows

// Windows is the interface returned by NewDefaultGlobalWindow and
// NewSessionWindow.
type Windows interface {
	// AssignWindow determines which windows the incoming data falls into and
	// returns them.
	AssignWindow(data DU) []*windowBase
	// CreateWindow creates a list of empty windows for saving data.
	CreateWindow(data DU, trigger Trigger, operator Operator, evictor Evictor) []*windowBase
	// GetWindows returns the windows in the processor.
	GetWindows() []*windowBase
}

func NewDefaultGlobalWindow

func NewDefaultGlobalWindow() Windows

func NewSessionWindow

func NewSessionWindow(gap time.Duration) Windows

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL