Documentation ¶
Index ¶
- Constants
- Variables
- func Eval(expr ast.Expr, m Valuer) interface{}
- func ExecFunc(funcName string, f api.Function, args []interface{}, fctx api.FunctionContext) (interface{}, bool)
- func GetDataSource(m kv.KeyValue, name string) (stmt *ast.StreamStmt, err error)
- func GetStatementFromSql(sql string) (stmt *ast.SelectStatement, err error)
- func GetStreams(stmt *ast.SelectStatement) (result []string)
- func HasAggFuncs(node ast.Node) bool
- func IsAggregate(expr ast.Expr) (r bool)
- func IsTextFormat(format string) bool
- func NewAggregateFunctionValuers(p *funcRuntime) (*FunctionValuer, *AggregateFunctionValuer)
- func NewFuncRuntime(ctx api.StreamContext) *funcRuntime
- func NewFunctionValuersForOp(ctx api.StreamContext) (*FunctionValuer, *AggregateFunctionValuer)
- func Validate(stmt *ast.SelectStatement) error
- func WithAggFields(stmt *ast.SelectStatement) bool
- type AffiliateRow
- func (d *AffiliateRow) AliasValue(key string) (interface{}, bool)
- func (d *AffiliateRow) AppendAlias(key string, value interface{}) bool
- func (d *AffiliateRow) Clone() AffiliateRow
- func (d *AffiliateRow) Del(col string)
- func (d *AffiliateRow) IsEmpty() bool
- func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{})
- func (d *AffiliateRow) Pick(cols [][]string) [][]string
- func (d *AffiliateRow) Set(col string, value interface{})
- func (d *AffiliateRow) Value(key, table string) (interface{}, bool)
- type AggregateCallValuer
- type AggregateData
- type AggregateFunctionValuer
- func (v *AggregateFunctionValuer) AliasValue(_ string) (interface{}, bool)
- func (*AggregateFunctionValuer) AppendAlias(string, interface{}) bool
- func (v *AggregateFunctionValuer) Call(name string, funcId int, args []interface{}) (interface{}, bool)
- func (v *AggregateFunctionValuer) FuncValue(key string) (interface{}, bool)
- func (v *AggregateFunctionValuer) GetAllTuples() AggregateData
- func (v *AggregateFunctionValuer) GetSingleCallValuer() CallValuer
- func (v *AggregateFunctionValuer) Meta(_, _ string) (interface{}, bool)
- func (v *AggregateFunctionValuer) SetData(data AggregateData)
- func (v *AggregateFunctionValuer) Value(_, _ string) (interface{}, bool)
- type Alias
- type AliasValuer
- type BracketEvalResult
- type CallValuer
- type Collection
- type CollectionRow
- type ControlTuple
- type EOFTuple
- type EmittedData
- type ErrorSourceTuple
- type Event
- type FuncValuer
- type FunctionValuer
- func (*FunctionValuer) AliasValue(string) (interface{}, bool)
- func (*FunctionValuer) AppendAlias(string, interface{}) bool
- func (fv *FunctionValuer) Call(name string, funcId int, args []interface{}) (interface{}, bool)
- func (*FunctionValuer) Meta(_, _ string) (interface{}, bool)
- func (*FunctionValuer) Value(_, _ string) (interface{}, bool)
- type GroupedTuples
- func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
- func (s *GroupedTuples) All(_ string) (map[string]interface{}, bool)
- func (s *GroupedTuples) Clone() Row
- func (s *GroupedTuples) GetTracerCtx() api.StreamContext
- func (s *GroupedTuples) Meta(key, table string) (interface{}, bool)
- func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, ...)
- func (s *GroupedTuples) SetTracerCtx(ctx api.StreamContext)
- func (s *GroupedTuples) ToMap() map[string]interface{}
- func (s *GroupedTuples) Value(key, table string) (interface{}, bool)
- type GroupedTuplesSet
- func (s *GroupedTuplesSet) Clone() Collection
- func (s *GroupedTuplesSet) Filter(groups []int) Collection
- func (s *GroupedTuplesSet) GetBySrc(_ string) []Row
- func (s *GroupedTuplesSet) GetTracerCtx() api.StreamContext
- func (s *GroupedTuplesSet) GetWindowRange() *WindowRange
- func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error
- func (s *GroupedTuplesSet) Index(i int) Row
- func (s *GroupedTuplesSet) Len() int
- func (s *GroupedTuplesSet) Range(f func(i int, r ReadonlyRow) (bool, error)) error
- func (s *GroupedTuplesSet) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
- func (s *GroupedTuplesSet) RangeSet(f func(i int, r Row) (bool, error)) error
- func (s *GroupedTuplesSet) SetIsAgg(_ bool)
- func (s *GroupedTuplesSet) SetTracerCtx(ctx api.StreamContext)
- func (s *GroupedTuplesSet) Swap(i, j int)
- func (s *GroupedTuplesSet) ToAggMaps() []map[string]any
- func (s *GroupedTuplesSet) ToMaps() []map[string]any
- func (s *GroupedTuplesSet) ToRowMaps() []map[string]any
- type HasTracerCtx
- type JoinTuple
- func (jt *JoinTuple) AddTuple(tuple Row)
- func (jt *JoinTuple) AddTuples(tuples []Row)
- func (jt *JoinTuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
- func (jt *JoinTuple) All(stream string) (map[string]interface{}, bool)
- func (jt *JoinTuple) Clone() Row
- func (jt *JoinTuple) GetEmitter() string
- func (jt *JoinTuple) GetTracerCtx() api.StreamContext
- func (jt *JoinTuple) Meta(key, table string) (interface{}, bool)
- func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, ...)
- func (jt *JoinTuple) SetTracerCtx(ctx api.StreamContext)
- func (jt *JoinTuple) ToMap() map[string]interface{}
- func (jt *JoinTuple) Value(key, table string) (interface{}, bool)
- type JoinTuples
- func (s *JoinTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
- func (s *JoinTuples) All(_ string) (map[string]interface{}, bool)
- func (s *JoinTuples) Clone() Collection
- func (s *JoinTuples) Filter(indexes []int) Collection
- func (s *JoinTuples) GetBySrc(_ string) []Row
- func (s *JoinTuples) GetTracerCtx() api.StreamContext
- func (s *JoinTuples) GetWindowRange() *WindowRange
- func (s *JoinTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error
- func (s *JoinTuples) Index(i int) Row
- func (s *JoinTuples) Len() int
- func (s *JoinTuples) Meta(key, table string) (interface{}, bool)
- func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, ...)
- func (s *JoinTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error
- func (s *JoinTuples) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
- func (s *JoinTuples) RangeSet(f func(i int, r Row) (bool, error)) error
- func (s *JoinTuples) SetIsAgg(_ bool)
- func (s *JoinTuples) SetTracerCtx(ctx api.StreamContext)
- func (s *JoinTuples) Swap(i, j int)
- func (s *JoinTuples) ToAggMaps() []map[string]interface{}
- func (s *JoinTuples) ToMap() map[string]interface{}
- func (s *JoinTuples) ToMaps() []map[string]interface{}
- func (s *JoinTuples) ToRowMaps() []map[string]interface{}
- func (s *JoinTuples) Value(key, table string) (interface{}, bool)
- type Message
- type MetaData
- type Metadata
- type MultiFunc
- type MultiSorter
- type ParseTree
- type Parser
- func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.Window, error)
- func (p *Parser) Parse() (*ast.SelectStatement, error)
- func (p *Parser) ParseCondition() (ast.Expr, error)
- func (p *Parser) ParseCreateStmt() (ast.Statement, error)
- func (p *Parser) ParseExpr() (ast.Expr, error)
- func (p *Parser) ParseJoin(joinType ast.JoinType) (*ast.Join, error)
- func (p *Parser) ParseLimit() (ast.Expr, error)
- func (p *Parser) ParseOver4Window() (ast.Expr, error)
- func (p *Parser) ParseQueries() ([]ast.SelectStatement, error)
- type RawRow
- type RawTuple
- func (r *RawTuple) AllProps() map[string]string
- func (r *RawTuple) DynamicProps(template string) (string, bool)
- func (r *RawTuple) GetTracerCtx() api.StreamContext
- func (r *RawTuple) Meta(key, table string) (any, bool)
- func (r *RawTuple) Raw() []byte
- func (r *RawTuple) Replace(new []byte)
- func (r *RawTuple) SetTracerCtx(ctx api.StreamContext)
- type ReadonlyRow
- type Row
- type Scanner
- func (s *Scanner) Scan() (tok ast.Token, lit string)
- func (s *Scanner) ScanBackquoteIdent() (tok ast.Token, lit string)
- func (s *Scanner) ScanDigit() (tok ast.Token, lit string)
- func (s *Scanner) ScanIdent() (tok ast.Token, lit string)
- func (s *Scanner) ScanNumber(startWithDot bool, isNeg bool) (tok ast.Token, lit string)
- func (s *Scanner) ScanString(isSingle bool) (tok ast.Token, lit string)
- func (s *Scanner) ScanWhiteSpace() (tok ast.Token, lit string)
- type SortingData
- type StreamInfo
- type TransformedTupleList
- func (l *TransformedTupleList) AllProps() map[string]string
- func (l *TransformedTupleList) Clone() *TransformedTupleList
- func (l *TransformedTupleList) DynamicProps(template string) (string, bool)
- func (l *TransformedTupleList) GetTracerCtx() api.StreamContext
- func (l *TransformedTupleList) Len() int
- func (l *TransformedTupleList) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
- func (l *TransformedTupleList) SetTracerCtx(ctx api.StreamContext)
- func (l *TransformedTupleList) ToMaps() []map[string]any
- type Tuple
- func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
- func (t *Tuple) All(string) (map[string]any, bool)
- func (t *Tuple) AllMeta() map[string]any
- func (t *Tuple) AllProps() map[string]string
- func (t *Tuple) Clone() Row
- func (t *Tuple) Created() time.Time
- func (t *Tuple) DynamicProps(template string) (string, bool)
- func (t *Tuple) FuncValue(key string) (interface{}, bool)
- func (t *Tuple) GetEmitter() string
- func (t *Tuple) GetTimestamp() time.Time
- func (t *Tuple) GetTracerCtx() api.StreamContext
- func (t *Tuple) IsWatermark() bool
- func (t *Tuple) Meta(key, table string) (interface{}, bool)
- func (t *Tuple) MetaData() Metadata
- func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, ...)
- func (t *Tuple) SetTracerCtx(ctx api.StreamContext)
- func (t *Tuple) ToMap() map[string]interface{}
- func (t *Tuple) Value(key, table string) (interface{}, bool)
- type Valuer
- type ValuerEval
- type WatermarkTuple
- type WildcardValuer
- type Wildcarder
- type WindowRange
- type WindowRangeValuer
- type WindowTuples
- func (w *WindowTuples) AddTuple(tuple Row) *WindowTuples
- func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
- func (w *WindowTuples) All(_ string) (map[string]interface{}, bool)
- func (w *WindowTuples) Clone() Collection
- func (w *WindowTuples) Filter(indexes []int) Collection
- func (w *WindowTuples) GetBySrc(emitter string) []Row
- func (w *WindowTuples) GetTracerCtx() api.StreamContext
- func (w *WindowTuples) GetWindowRange() *WindowRange
- func (w *WindowTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error
- func (w *WindowTuples) Index(index int) Row
- func (w *WindowTuples) Len() int
- func (w *WindowTuples) Meta(key, table string) (interface{}, bool)
- func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, ...)
- func (w *WindowTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error
- func (w *WindowTuples) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
- func (w *WindowTuples) RangeSet(f func(i int, r Row) (bool, error)) error
- func (w *WindowTuples) SetIsAgg(_ bool)
- func (w *WindowTuples) SetTracerCtx(ctx api.StreamContext)
- func (w *WindowTuples) Swap(i, j int)
- func (w *WindowTuples) ToAggMaps() []map[string]interface{}
- func (w *WindowTuples) ToMap() map[string]interface{}
- func (w *WindowTuples) ToMaps() []map[string]interface{}
- func (w *WindowTuples) ToRowMaps() []map[string]interface{}
- func (w *WindowTuples) Value(key, table string) (interface{}, bool)
Constants ¶
const ( DEFAULT_FIELD_NAME_PREFIX string = "kuiper_field_" PRIVATE_PREFIX string = "$$" )
Variables ¶
var ( // ImplicitStateFuncs is a set of functions that read/update global state implicitly. ImplicitStateFuncs = map[string]bool{ "last_hit_time": true, "last_hit_count": true, "last_agg_hit_time": true, "last_agg_hit_count": true, } )
var (
Language = &ParseTree{}
)
var WindowFuncs = map[string]struct{}{
"tumblingwindow": {},
"hoppingwindow": {},
"sessionwindow": {},
"slidingwindow": {},
"countwindow": {},
"dedup_trigger": {},
}
Functions ¶
func GetDataSource ¶
func GetStatementFromSql ¶
func GetStatementFromSql(sql string) (stmt *ast.SelectStatement, err error)
func GetStreams ¶
func GetStreams(stmt *ast.SelectStatement) (result []string)
func HasAggFuncs ¶
func IsAggregate ¶
IsAggregate checks whether an expression is an aggregate, using the bound alias info
func IsTextFormat ¶
func NewAggregateFunctionValuers ¶
func NewAggregateFunctionValuers(p *funcRuntime) (*FunctionValuer, *AggregateFunctionValuer)
Should only be called by the stream, to make sure there is a single instance for an operation
func NewFuncRuntime ¶
func NewFuncRuntime(ctx api.StreamContext) *funcRuntime
func NewFunctionValuersForOp ¶
func NewFunctionValuersForOp(ctx api.StreamContext) (*FunctionValuer, *AggregateFunctionValuer)
func Validate ¶
func Validate(stmt *ast.SelectStatement) error
Validate validates a select statement without context. This is the pre-validation; in the planner, there will be a more comprehensive validation after binding.
func WithAggFields ¶
func WithAggFields(stmt *ast.SelectStatement) bool
Types ¶
type AffiliateRow ¶
type AffiliateRow struct { CalCols map[string]interface{} // mutable and must be cloned when broadcast AliasMap map[string]interface{} // contains filtered or unexported fields }
AffiliateRow is a part of other row types that helps with the calculation of newly added columns
func (*AffiliateRow) AliasValue ¶
func (d *AffiliateRow) AliasValue(key string) (interface{}, bool)
func (*AffiliateRow) AppendAlias ¶
func (d *AffiliateRow) AppendAlias(key string, value interface{}) bool
func (*AffiliateRow) Clone ¶
func (d *AffiliateRow) Clone() AffiliateRow
func (*AffiliateRow) Del ¶
func (d *AffiliateRow) Del(col string)
func (*AffiliateRow) IsEmpty ¶
func (d *AffiliateRow) IsEmpty() bool
func (*AffiliateRow) MergeMap ¶
func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{})
func (*AffiliateRow) Pick ¶
func (d *AffiliateRow) Pick(cols [][]string) [][]string
func (*AffiliateRow) Set ¶
func (d *AffiliateRow) Set(col string, value interface{})
func (*AffiliateRow) Value ¶
func (d *AffiliateRow) Value(key, table string) (interface{}, bool)
type AggregateCallValuer ¶
type AggregateCallValuer interface { CallValuer GetAllTuples() AggregateData GetSingleCallValuer() CallValuer }
type AggregateData ¶
type AggregateData interface {
AggregateEval(expr ast.Expr, v CallValuer) []interface{}
}
AggregateData could be a tuple or a collection
type AggregateFunctionValuer ¶
type AggregateFunctionValuer struct {
// contains filtered or unexported fields
}
func (*AggregateFunctionValuer) AliasValue ¶
func (v *AggregateFunctionValuer) AliasValue(_ string) (interface{}, bool)
func (*AggregateFunctionValuer) AppendAlias ¶
func (*AggregateFunctionValuer) AppendAlias(string, interface{}) bool
func (*AggregateFunctionValuer) Call ¶
func (v *AggregateFunctionValuer) Call(name string, funcId int, args []interface{}) (interface{}, bool)
func (*AggregateFunctionValuer) FuncValue ¶
func (v *AggregateFunctionValuer) FuncValue(key string) (interface{}, bool)
func (*AggregateFunctionValuer) GetAllTuples ¶
func (v *AggregateFunctionValuer) GetAllTuples() AggregateData
func (*AggregateFunctionValuer) GetSingleCallValuer ¶
func (v *AggregateFunctionValuer) GetSingleCallValuer() CallValuer
func (*AggregateFunctionValuer) Meta ¶
func (v *AggregateFunctionValuer) Meta(_, _ string) (interface{}, bool)
func (*AggregateFunctionValuer) SetData ¶
func (v *AggregateFunctionValuer) SetData(data AggregateData)
func (*AggregateFunctionValuer) Value ¶
func (v *AggregateFunctionValuer) Value(_, _ string) (interface{}, bool)
type Alias ¶
type Alias struct {
AliasMap map[string]interface{}
}
Alias will not need to convert cases
type AliasValuer ¶
type AliasValuer interface { // AliasValue Get the value of alias AliasValue(name string) (interface{}, bool) // AppendAlias set the alias result AppendAlias(key string, value interface{}) bool }
AliasValuer is used to calculate and cache the alias value
type BracketEvalResult ¶
type BracketEvalResult struct {
Start, End int
}
type CallValuer ¶
type CallValuer interface { Valuer // Call is invoked to evaluate a function call (if possible). Call(name string, funcId int, args []interface{}) (interface{}, bool) }
CallValuer implements the Call method for evaluating function calls.
type Collection ¶
type Collection interface { HasTracerCtx api.MessageTupleList SortingData // GroupRange through each group. For non-grouped collection, the whole data is a single group GroupRange(func(i int, aggRow CollectionRow) (bool, error)) error // Range through each row. For grouped collection, each row is an aggregation of groups Range(func(i int, r ReadonlyRow) (bool, error)) error // RangeSet range through each row by cloning the row RangeSet(func(i int, r Row) (bool, error)) error Filter(indexes []int) Collection GetWindowRange() *WindowRange // ToMaps returns the data as a map ToMaps() []map[string]interface{} // SetIsAgg Set by project, indicate if the collection is used in an aggregate context which will affect ToMaps output SetIsAgg(isAgg bool) // ToAggMaps returns the aggregated data as a map ToAggMaps() []map[string]interface{} // ToRowMaps returns all the data in the collection ToRowMaps() []map[string]interface{} // GetBySrc returns the rows by the given emitter GetBySrc(emitter string) []Row // Clone the collection Clone() Collection }
Collection is a collection of rows as a table. It is used for window, join, group by, etc.
type CollectionRow ¶
type CollectionRow interface { RawRow AggregateData }
CollectionRow is the aggregation row of a non-grouped collection. Think of it as a single group. The row data is immutable.
type ControlTuple ¶
type ControlTuple interface {
ControlType() string
}
type EmittedData ¶
type EmittedData interface { // GetEmitter returns the emitter of the row GetEmitter() string }
EmittedData is data that is produced by a specific source
type ErrorSourceTuple ¶
type ErrorSourceTuple struct {
Error error `json:"error"`
}
func (*ErrorSourceTuple) Message ¶
func (t *ErrorSourceTuple) Message() map[string]interface{}
func (*ErrorSourceTuple) Meta ¶
func (t *ErrorSourceTuple) Meta() map[string]interface{}
func (*ErrorSourceTuple) Timestamp ¶
func (t *ErrorSourceTuple) Timestamp() time.Time
type FuncValuer ¶
FuncValuer can calculate function type value like window_start and window_end
type FunctionValuer ¶
type FunctionValuer struct {
// contains filtered or unexported fields
}
FunctionValuer must ONLY be initialized with the NewFunctionValuer function
func NewFunctionValuer ¶
func NewFunctionValuer(p *funcRuntime) *FunctionValuer
Should only be called by the stream, to make sure there is a single instance for an operation
func (*FunctionValuer) AliasValue ¶
func (*FunctionValuer) AliasValue(string) (interface{}, bool)
func (*FunctionValuer) AppendAlias ¶
func (*FunctionValuer) AppendAlias(string, interface{}) bool
func (*FunctionValuer) Call ¶
func (fv *FunctionValuer) Call(name string, funcId int, args []interface{}) (interface{}, bool)
func (*FunctionValuer) Meta ¶
func (*FunctionValuer) Meta(_, _ string) (interface{}, bool)
func (*FunctionValuer) Value ¶
func (*FunctionValuer) Value(_, _ string) (interface{}, bool)
type GroupedTuples ¶
type GroupedTuples struct { Ctx api.StreamContext Content []Row *WindowRange AffiliateRow // contains filtered or unexported fields }
GroupedTuples is a collection of tuples grouped by a key
func (*GroupedTuples) AggregateEval ¶
func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
func (*GroupedTuples) Clone ¶
func (s *GroupedTuples) Clone() Row
func (*GroupedTuples) GetTracerCtx ¶
func (s *GroupedTuples) GetTracerCtx() api.StreamContext
func (*GroupedTuples) Meta ¶
func (s *GroupedTuples) Meta(key, table string) (interface{}, bool)
func (*GroupedTuples) SetTracerCtx ¶
func (s *GroupedTuples) SetTracerCtx(ctx api.StreamContext)
func (*GroupedTuples) ToMap ¶
func (s *GroupedTuples) ToMap() map[string]interface{}
func (*GroupedTuples) Value ¶
func (s *GroupedTuples) Value(key, table string) (interface{}, bool)
type GroupedTuplesSet ¶
type GroupedTuplesSet struct { Ctx api.StreamContext Groups []*GroupedTuples *WindowRange }
func (*GroupedTuplesSet) Clone ¶
func (s *GroupedTuplesSet) Clone() Collection
func (*GroupedTuplesSet) Filter ¶
func (s *GroupedTuplesSet) Filter(groups []int) Collection
Filter clones the set and returns the filtered clone
func (*GroupedTuplesSet) GetBySrc ¶
func (s *GroupedTuplesSet) GetBySrc(_ string) []Row
GetBySrc is yet to be implemented; it is needed to support join after join
func (*GroupedTuplesSet) GetTracerCtx ¶
func (s *GroupedTuplesSet) GetTracerCtx() api.StreamContext
func (*GroupedTuplesSet) GetWindowRange ¶
func (s *GroupedTuplesSet) GetWindowRange() *WindowRange
func (*GroupedTuplesSet) GroupRange ¶
func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error
func (*GroupedTuplesSet) Index ¶
func (s *GroupedTuplesSet) Index(i int) Row
func (*GroupedTuplesSet) Len ¶
func (s *GroupedTuplesSet) Len() int
func (*GroupedTuplesSet) Range ¶
func (s *GroupedTuplesSet) Range(f func(i int, r ReadonlyRow) (bool, error)) error
func (*GroupedTuplesSet) RangeOfTuples ¶
func (s *GroupedTuplesSet) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
func (*GroupedTuplesSet) SetIsAgg ¶
func (s *GroupedTuplesSet) SetIsAgg(_ bool)
func (*GroupedTuplesSet) SetTracerCtx ¶
func (s *GroupedTuplesSet) SetTracerCtx(ctx api.StreamContext)
func (*GroupedTuplesSet) Swap ¶
func (s *GroupedTuplesSet) Swap(i, j int)
func (*GroupedTuplesSet) ToAggMaps ¶
func (s *GroupedTuplesSet) ToAggMaps() []map[string]any
func (*GroupedTuplesSet) ToMaps ¶
func (s *GroupedTuplesSet) ToMaps() []map[string]any
func (*GroupedTuplesSet) ToRowMaps ¶
func (s *GroupedTuplesSet) ToRowMaps() []map[string]any
type HasTracerCtx ¶
type HasTracerCtx interface { GetTracerCtx() api.StreamContext SetTracerCtx(ctx api.StreamContext) }
type JoinTuple ¶
type JoinTuple struct { Ctx api.StreamContext Tuples []Row // The content is immutable, but the slice may be added or removed AffiliateRow // contains filtered or unexported fields }
JoinTuple is a row produced by a join operation
func (*JoinTuple) AggregateEval ¶
func (jt *JoinTuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
func (*JoinTuple) GetEmitter ¶
func (*JoinTuple) GetTracerCtx ¶
func (jt *JoinTuple) GetTracerCtx() api.StreamContext
func (*JoinTuple) SetTracerCtx ¶
func (jt *JoinTuple) SetTracerCtx(ctx api.StreamContext)
type JoinTuples ¶
type JoinTuples struct { Ctx api.StreamContext Content []*JoinTuple *WindowRange AffiliateRow // contains filtered or unexported fields }
func (*JoinTuples) AggregateEval ¶
func (s *JoinTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
func (*JoinTuples) Clone ¶
func (s *JoinTuples) Clone() Collection
func (*JoinTuples) Filter ¶
func (s *JoinTuples) Filter(indexes []int) Collection
Filter filters the tuples by the given indexes
func (*JoinTuples) GetBySrc ¶
func (s *JoinTuples) GetBySrc(_ string) []Row
GetBySrc is yet to be implemented; it is needed to support join after join
func (*JoinTuples) GetTracerCtx ¶
func (s *JoinTuples) GetTracerCtx() api.StreamContext
func (*JoinTuples) GetWindowRange ¶
func (s *JoinTuples) GetWindowRange() *WindowRange
func (*JoinTuples) GroupRange ¶
func (s *JoinTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error
func (*JoinTuples) Index ¶
func (s *JoinTuples) Index(i int) Row
func (*JoinTuples) Len ¶
func (s *JoinTuples) Len() int
func (*JoinTuples) Meta ¶
func (s *JoinTuples) Meta(key, table string) (interface{}, bool)
func (*JoinTuples) Range ¶
func (s *JoinTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error
func (*JoinTuples) RangeOfTuples ¶
func (s *JoinTuples) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
func (*JoinTuples) SetIsAgg ¶
func (s *JoinTuples) SetIsAgg(_ bool)
func (*JoinTuples) SetTracerCtx ¶
func (s *JoinTuples) SetTracerCtx(ctx api.StreamContext)
func (*JoinTuples) Swap ¶
func (s *JoinTuples) Swap(i, j int)
func (*JoinTuples) ToAggMaps ¶
func (s *JoinTuples) ToAggMaps() []map[string]interface{}
func (*JoinTuples) ToMap ¶
func (s *JoinTuples) ToMap() map[string]interface{}
func (*JoinTuples) ToMaps ¶
func (s *JoinTuples) ToMaps() []map[string]interface{}
func (*JoinTuples) ToRowMaps ¶
func (s *JoinTuples) ToRowMaps() []map[string]interface{}
func (*JoinTuples) Value ¶
func (s *JoinTuples) Value(key, table string) (interface{}, bool)
type Message ¶
type Message map[string]interface{}
Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.
type MultiFunc ¶
type MultiFunc interface { ValidateWithName(args []ast.Expr, name string) error ExecWithName(args []interface{}, ctx api.FunctionContext, name string) (interface{}, bool) }
MultiFunc is a hack for builtin functions that work for multiple functions
type MultiSorter ¶
type MultiSorter struct { Ctx api.StreamContext SortingData // contains filtered or unexported fields }
MultiSorter implements the Sort interface, sorting the changes within.
func OrderedBy ¶
func OrderedBy(fields ast.SortFields, fv *FunctionValuer, afv *AggregateFunctionValuer) *MultiSorter
OrderedBy returns a Sorter that sorts using the less functions, in order. Call its Sort method to sort the data.
func (*MultiSorter) GetTracerCtx ¶
func (ms *MultiSorter) GetTracerCtx() api.StreamContext
func (*MultiSorter) Less ¶
func (ms *MultiSorter) Less(i, j int) bool
Less is part of sort.Interface. It is implemented by looping along the less functions until it finds a comparison that discriminates between the two items (one is less than the other). Note that it can call the less functions twice per call. We could change the functions to return -1, 0, 1 and reduce the number of calls for greater efficiency: an exercise for the reader.
func (*MultiSorter) SetTracerCtx ¶
func (ms *MultiSorter) SetTracerCtx(ctx api.StreamContext)
func (*MultiSorter) Sort ¶
func (ms *MultiSorter) Sort(data SortingData) error
Sort sorts the argument slice according to the less functions passed to OrderedBy.
func (*MultiSorter) Swap ¶
func (ms *MultiSorter) Swap(i, j int)
type ParseTree ¶
type ParseTree struct { Handlers map[string]func(*Parser) (ast.Statement, error) Tokens map[string]*ParseTree Keys []string }
type Parser ¶
type Parser struct {
// contains filtered or unexported fields
}
func (*Parser) ConvertToWindows ¶
func (*Parser) ParseQueries ¶
func (p *Parser) ParseQueries() ([]ast.SelectStatement, error)
type RawRow ¶
type RawRow interface { ReadonlyRow // Del Only for some ops like functionOp * and Alias Del(col string) // Set Only for some ops like functionOp * Set(col string, value interface{}) // ToMap converts the row to a map to export to other systems * ToMap() map[string]interface{} // Pick the columns and discard others. It replaces the underlying message with a new value. There are 3 types to pick: column, alias and anonymous expressions. // cols is a list [columnname, tablename] Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string) }
RawRow is the basic data type for a logical row. It could be a row or a collection row.
type RawTuple ¶
type RawTuple struct { Ctx api.StreamContext Emitter string Timestamp time.Time Rawdata []byte Metadata Metadata // immutable Props map[string]string }
func (*RawTuple) GetTracerCtx ¶
func (r *RawTuple) GetTracerCtx() api.StreamContext
func (*RawTuple) SetTracerCtx ¶
func (r *RawTuple) SetTracerCtx(ctx api.StreamContext)
type ReadonlyRow ¶
type ReadonlyRow interface { HasTracerCtx Valuer AliasValuer Wildcarder }
type Scanner ¶
type Scanner struct {
// contains filtered or unexported fields
}
func NewScanner ¶
func (*Scanner) ScanBackquoteIdent ¶
func (*Scanner) ScanNumber ¶
func (*Scanner) ScanString ¶
type SortingData ¶
type SortingData interface { HasTracerCtx Len() int Swap(i, j int) Index(i int) Row }
type StreamInfo ¶
type StreamInfo struct { StreamType ast.StreamType `json:"streamType"` StreamKind string `json:"streamKind"` Statement string `json:"statement"` }
func GetDataSourceStatement ¶
func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error)
type TransformedTupleList ¶
type TransformedTupleList struct { Ctx api.StreamContext Content []api.MessageTuple Maps []map[string]any Props map[string]string }
func (*TransformedTupleList) AllProps ¶
func (l *TransformedTupleList) AllProps() map[string]string
func (*TransformedTupleList) Clone ¶
func (l *TransformedTupleList) Clone() *TransformedTupleList
func (*TransformedTupleList) DynamicProps ¶
func (l *TransformedTupleList) DynamicProps(template string) (string, bool)
func (*TransformedTupleList) GetTracerCtx ¶
func (l *TransformedTupleList) GetTracerCtx() api.StreamContext
func (*TransformedTupleList) Len ¶
func (l *TransformedTupleList) Len() int
func (*TransformedTupleList) RangeOfTuples ¶
func (l *TransformedTupleList) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
func (*TransformedTupleList) SetTracerCtx ¶
func (l *TransformedTupleList) SetTracerCtx(ctx api.StreamContext)
func (*TransformedTupleList) ToMaps ¶
func (l *TransformedTupleList) ToMaps() []map[string]any
type Tuple ¶
type Tuple struct { Ctx api.StreamContext Emitter string Message Message // the original pointer is immutable & big; may be cloned. Timestamp time.Time Metadata Metadata // immutable Props map[string]string AffiliateRow // contains filtered or unexported fields }
Tuple is the input row, produced by the source
func (*Tuple) AggregateEval ¶
func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
func (*Tuple) GetEmitter ¶
func (*Tuple) GetTimestamp ¶
func (*Tuple) GetTracerCtx ¶
func (t *Tuple) GetTracerCtx() api.StreamContext
func (*Tuple) IsWatermark ¶
func (*Tuple) SetTracerCtx ¶
func (t *Tuple) SetTracerCtx(ctx api.StreamContext)
type Valuer ¶
type Valuer interface { // Value returns the value and existence flag for a given key. Value(key, table string) (interface{}, bool) Meta(key, table string) (interface{}, bool) }
Valuer is the interface that wraps the Value() method.
func MultiAggregateValuer ¶
func MultiAggregateValuer(data AggregateData, singleCallValuer CallValuer, valuers ...Valuer) Valuer
func MultiValuer ¶
MultiValuer returns a Valuer that iterates over multiple Valuer instances to find a match.
type ValuerEval ¶
type ValuerEval struct { Valuer Valuer // IntegerFloatDivision will set the eval system to treat // a division between two integers as a floating point division. IntegerFloatDivision bool }
ValuerEval will evaluate an expression using the Valuer.
func (*ValuerEval) Eval ¶
func (v *ValuerEval) Eval(expr ast.Expr) interface{}
Eval evaluates an expression and returns a value. It maps the expression to the correct valuer.
type WatermarkTuple ¶
func (*WatermarkTuple) GetTimestamp ¶
func (t *WatermarkTuple) GetTimestamp() time.Time
func (*WatermarkTuple) IsWatermark ¶
func (t *WatermarkTuple) IsWatermark() bool
type WildcardValuer ¶
type WildcardValuer struct {
Data Wildcarder
}
func (*WildcardValuer) Meta ¶
func (wv *WildcardValuer) Meta(_, _ string) (interface{}, bool)
func (*WildcardValuer) Value ¶
func (wv *WildcardValuer) Value(key, table string) (interface{}, bool)
type Wildcarder ¶
type WindowRange ¶
type WindowRange struct {
// contains filtered or unexported fields
}
func NewWindowRange ¶
func NewWindowRange(windowStart int64, windowEnd int64) *WindowRange
func (*WindowRange) FuncValue ¶
func (r *WindowRange) FuncValue(key string) (interface{}, bool)
type WindowRangeValuer ¶
type WindowRangeValuer struct {
*WindowRange
}
func (WindowRangeValuer) Meta ¶
func (w WindowRangeValuer) Meta(_, _ string) (interface{}, bool)
func (WindowRangeValuer) Value ¶
func (w WindowRangeValuer) Value(_, _ string) (interface{}, bool)
type WindowTuples ¶
type WindowTuples struct { Ctx api.StreamContext Content []Row // immutable *WindowRange AffiliateRow // contains filtered or unexported fields }
func (*WindowTuples) AddTuple ¶
func (w *WindowTuples) AddTuple(tuple Row) *WindowTuples
func (*WindowTuples) AggregateEval ¶
func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}
func (*WindowTuples) Clone ¶
func (w *WindowTuples) Clone() Collection
func (*WindowTuples) Filter ¶
func (w *WindowTuples) Filter(indexes []int) Collection
Filter filters the tuples by the given indexes
func (*WindowTuples) GetBySrc ¶
func (w *WindowTuples) GetBySrc(emitter string) []Row
func (*WindowTuples) GetTracerCtx ¶
func (w *WindowTuples) GetTracerCtx() api.StreamContext
func (*WindowTuples) GetWindowRange ¶
func (w *WindowTuples) GetWindowRange() *WindowRange
func (*WindowTuples) GroupRange ¶
func (w *WindowTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error
func (*WindowTuples) Index ¶
func (w *WindowTuples) Index(index int) Row
func (*WindowTuples) Len ¶
func (w *WindowTuples) Len() int
func (*WindowTuples) Meta ¶
func (w *WindowTuples) Meta(key, table string) (interface{}, bool)
func (*WindowTuples) Range ¶
func (w *WindowTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error
func (*WindowTuples) RangeOfTuples ¶
func (w *WindowTuples) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)
func (*WindowTuples) SetIsAgg ¶
func (w *WindowTuples) SetIsAgg(_ bool)
func (*WindowTuples) SetTracerCtx ¶
func (w *WindowTuples) SetTracerCtx(ctx api.StreamContext)
func (*WindowTuples) Swap ¶
func (w *WindowTuples) Swap(i, j int)
func (*WindowTuples) ToAggMaps ¶
func (w *WindowTuples) ToAggMaps() []map[string]interface{}
func (*WindowTuples) ToMap ¶
func (w *WindowTuples) ToMap() map[string]interface{}
func (*WindowTuples) ToMaps ¶
func (w *WindowTuples) ToMaps() []map[string]interface{}
func (*WindowTuples) ToRowMaps ¶
func (w *WindowTuples) ToRowMaps() []map[string]interface{}
func (*WindowTuples) Value ¶
func (w *WindowTuples) Value(key, table string) (interface{}, bool)