xsql

package
v2.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DEFAULT_FIELD_NAME_PREFIX string = "kuiper_field_"
	PRIVATE_PREFIX            string = "$$"
)

Variables

View Source
var (

	// ImplicitStateFuncs is a set of functions that read/update global state implicitly.
	ImplicitStateFuncs = map[string]bool{
		"last_hit_time":      true,
		"last_hit_count":     true,
		"last_agg_hit_time":  true,
		"last_agg_hit_count": true,
	}
)
View Source
var (
	Language = &ParseTree{}
)
View Source
var WindowFuncs = map[string]struct{}{
	"tumblingwindow": {},
	"hoppingwindow":  {},
	"sessionwindow":  {},
	"slidingwindow":  {},
	"countwindow":    {},
	"dedup_trigger":  {},
}

Functions

func Eval

func Eval(expr ast.Expr, m Valuer) interface{}

Eval evaluates expr against a map.

func ExecFunc

func ExecFunc(funcName string, f api.Function, args []interface{}, fctx api.FunctionContext) (interface{}, bool)

func GetDataSource

func GetDataSource(m kv.KeyValue, name string) (stmt *ast.StreamStmt, err error)

func GetStatementFromSql

func GetStatementFromSql(sql string) (stmt *ast.SelectStatement, err error)

func GetStreams

func GetStreams(stmt *ast.SelectStatement) (result []string)

func HasAggFuncs

func HasAggFuncs(node ast.Node) bool

func IsAggregate

func IsAggregate(expr ast.Expr) (r bool)

IsAggregate checks whether an expression is an aggregate expression, taking the binding alias info into account.

func IsTextFormat

func IsTextFormat(format string) bool

func NewAggregateFunctionValuers

func NewAggregateFunctionValuers(p *funcRuntime) (*FunctionValuer, *AggregateFunctionValuer)

NewAggregateFunctionValuers should only be called by the stream, to make sure there is a single instance for an operation.

func NewFuncRuntime

func NewFuncRuntime(ctx api.StreamContext) *funcRuntime

func Validate

func Validate(stmt *ast.SelectStatement) error

Validate validates a select statement without context. This is the pre-validation; in the planner, there will be a more comprehensive validation after binding.

func WithAggFields

func WithAggFields(stmt *ast.SelectStatement) bool

Types

type AffiliateRow

type AffiliateRow struct {
	CalCols  map[string]interface{} // mutable and must be cloned when broadcast
	AliasMap map[string]interface{}
	// contains filtered or unexported fields
}

AffiliateRow is part of other row types, to help with the calculation of newly added cols.

func (*AffiliateRow) AliasValue

func (d *AffiliateRow) AliasValue(key string) (interface{}, bool)

func (*AffiliateRow) AppendAlias

func (d *AffiliateRow) AppendAlias(key string, value interface{}) bool

func (*AffiliateRow) Clone

func (d *AffiliateRow) Clone() AffiliateRow

func (*AffiliateRow) Del

func (d *AffiliateRow) Del(col string)

func (*AffiliateRow) IsEmpty

func (d *AffiliateRow) IsEmpty() bool

func (*AffiliateRow) MergeMap

func (d *AffiliateRow) MergeMap(cachedMap map[string]interface{})

func (*AffiliateRow) Pick

func (d *AffiliateRow) Pick(cols [][]string) [][]string

func (*AffiliateRow) Set

func (d *AffiliateRow) Set(col string, value interface{})

func (*AffiliateRow) Value

func (d *AffiliateRow) Value(key, table string) (interface{}, bool)

type AggregateCallValuer

type AggregateCallValuer interface {
	CallValuer
	GetAllTuples() AggregateData
	GetSingleCallValuer() CallValuer
}

type AggregateData

type AggregateData interface {
	AggregateEval(expr ast.Expr, v CallValuer) []interface{}
}

AggregateData could be a tuple or a collection.

type AggregateFunctionValuer

type AggregateFunctionValuer struct {
	// contains filtered or unexported fields
}

func (*AggregateFunctionValuer) AliasValue

func (v *AggregateFunctionValuer) AliasValue(_ string) (interface{}, bool)

func (*AggregateFunctionValuer) AppendAlias

func (*AggregateFunctionValuer) AppendAlias(string, interface{}) bool

func (*AggregateFunctionValuer) Call

func (v *AggregateFunctionValuer) Call(name string, funcId int, args []interface{}) (interface{}, bool)

func (*AggregateFunctionValuer) FuncValue

func (v *AggregateFunctionValuer) FuncValue(key string) (interface{}, bool)

func (*AggregateFunctionValuer) GetAllTuples

func (v *AggregateFunctionValuer) GetAllTuples() AggregateData

func (*AggregateFunctionValuer) GetSingleCallValuer

func (v *AggregateFunctionValuer) GetSingleCallValuer() CallValuer

func (*AggregateFunctionValuer) Meta

func (v *AggregateFunctionValuer) Meta(_, _ string) (interface{}, bool)

func (*AggregateFunctionValuer) SetData

func (v *AggregateFunctionValuer) SetData(data AggregateData)

func (*AggregateFunctionValuer) Value

func (v *AggregateFunctionValuer) Value(_, _ string) (interface{}, bool)

type Alias

type Alias struct {
	AliasMap map[string]interface{}
}

Alias will not need to convert cases

type AliasValuer

type AliasValuer interface {
	// AliasValue Get the value of alias
	AliasValue(name string) (interface{}, bool)
	// AppendAlias set the alias result
	AppendAlias(key string, value interface{}) bool
}

AliasValuer is used to calculate and cache the alias value

type BracketEvalResult

type BracketEvalResult struct {
	Start, End int
}

type CallValuer

type CallValuer interface {
	Valuer

	// Call is invoked to evaluate a function call (if possible).
	Call(name string, funcId int, args []interface{}) (interface{}, bool)
}

CallValuer implements the Call method for evaluating function calls.

type Collection

type Collection interface {
	HasTracerCtx
	api.MessageTupleList
	SortingData
	// GroupRange through each group. For non-grouped collection, the whole data is a single group
	GroupRange(func(i int, aggRow CollectionRow) (bool, error)) error
	// Range through each row. For grouped collection, each row is an aggregation of groups
	Range(func(i int, r ReadonlyRow) (bool, error)) error
	// RangeSet range through each row by cloning the row
	RangeSet(func(i int, r Row) (bool, error)) error
	Filter(indexes []int) Collection
	GetWindowRange() *WindowRange
	// ToMaps returns the data as a map
	ToMaps() []map[string]interface{}
	// SetIsAgg Set by project, indicate if the collection is used in an aggregate context which will affect ToMaps output
	SetIsAgg(isAgg bool)
	// ToAggMaps returns the aggregated data as a map
	ToAggMaps() []map[string]interface{}
	// ToRowMaps returns all the data in the collection
	ToRowMaps() []map[string]interface{}
	// GetBySrc returns the rows by the given emitter
	GetBySrc(emitter string) []Row
	// Clone the collection
	Clone() Collection
}

Collection is a collection of rows as a table. It is used for window, join, group by, etc.

type CollectionRow

type CollectionRow interface {
	RawRow
	AggregateData
}

CollectionRow is the aggregation row of a non-grouped collection. Think of it as a single group. The row data is immutable.

type ControlTuple

type ControlTuple interface {
	ControlType() string
}

type EOFTuple

type EOFTuple int

type EmittedData

type EmittedData interface {
	// GetEmitter returns the emitter of the row
	GetEmitter() string
}

EmittedData is data that is produced by a specific source

type ErrorSourceTuple

type ErrorSourceTuple struct {
	Error error `json:"error"`
}

func (*ErrorSourceTuple) Message

func (t *ErrorSourceTuple) Message() map[string]interface{}

func (*ErrorSourceTuple) Meta

func (t *ErrorSourceTuple) Meta() map[string]interface{}

func (*ErrorSourceTuple) Timestamp

func (t *ErrorSourceTuple) Timestamp() time.Time

type Event

type Event interface {
	GetTimestamp() time.Time
	IsWatermark() bool
}

type FuncValuer

type FuncValuer interface {
	FuncValue(key string) (interface{}, bool)
}

FuncValuer can calculate function-type values such as window_start and window_end.

type FunctionValuer

type FunctionValuer struct {
	// contains filtered or unexported fields
}

FunctionValuer must ONLY be initialized with the NewFunctionValuer function.

func NewFunctionValuer

func NewFunctionValuer(p *funcRuntime) *FunctionValuer

NewFunctionValuer should only be called by the stream, to make sure there is a single instance for an operation.

func (*FunctionValuer) AliasValue

func (*FunctionValuer) AliasValue(string) (interface{}, bool)

func (*FunctionValuer) AppendAlias

func (*FunctionValuer) AppendAlias(string, interface{}) bool

func (*FunctionValuer) Call

func (fv *FunctionValuer) Call(name string, funcId int, args []interface{}) (interface{}, bool)

func (*FunctionValuer) Meta

func (*FunctionValuer) Meta(_, _ string) (interface{}, bool)

func (*FunctionValuer) Value

func (*FunctionValuer) Value(_, _ string) (interface{}, bool)

type GroupedTuples

type GroupedTuples struct {
	Ctx     api.StreamContext
	Content []Row
	*WindowRange
	AffiliateRow
	// contains filtered or unexported fields
}

GroupedTuples is a collection of tuples grouped by a key

func (*GroupedTuples) AggregateEval

func (s *GroupedTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}

func (*GroupedTuples) All

func (s *GroupedTuples) All(_ string) (map[string]interface{}, bool)

func (*GroupedTuples) Clone

func (s *GroupedTuples) Clone() Row

func (*GroupedTuples) GetTracerCtx

func (s *GroupedTuples) GetTracerCtx() api.StreamContext

func (*GroupedTuples) Meta

func (s *GroupedTuples) Meta(key, table string) (interface{}, bool)

func (*GroupedTuples) Pick

func (s *GroupedTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)

func (*GroupedTuples) SetTracerCtx

func (s *GroupedTuples) SetTracerCtx(ctx api.StreamContext)

func (*GroupedTuples) ToMap

func (s *GroupedTuples) ToMap() map[string]interface{}

func (*GroupedTuples) Value

func (s *GroupedTuples) Value(key, table string) (interface{}, bool)

type GroupedTuplesSet

type GroupedTuplesSet struct {
	Ctx    api.StreamContext
	Groups []*GroupedTuples
	*WindowRange
}

func (*GroupedTuplesSet) Clone

func (s *GroupedTuplesSet) Clone() Collection

func (*GroupedTuplesSet) Filter

func (s *GroupedTuplesSet) Filter(groups []int) Collection

Filter clones and returns the filtered set.

func (*GroupedTuplesSet) GetBySrc

func (s *GroupedTuplesSet) GetBySrc(_ string) []Row

GetBySrc is still to be implemented, to support join after join.

func (*GroupedTuplesSet) GetTracerCtx

func (s *GroupedTuplesSet) GetTracerCtx() api.StreamContext

func (*GroupedTuplesSet) GetWindowRange

func (s *GroupedTuplesSet) GetWindowRange() *WindowRange

func (*GroupedTuplesSet) GroupRange

func (s *GroupedTuplesSet) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error

func (*GroupedTuplesSet) Index

func (s *GroupedTuplesSet) Index(i int) Row

func (*GroupedTuplesSet) Len

func (s *GroupedTuplesSet) Len() int

func (*GroupedTuplesSet) Range

func (s *GroupedTuplesSet) Range(f func(i int, r ReadonlyRow) (bool, error)) error

func (*GroupedTuplesSet) RangeOfTuples

func (s *GroupedTuplesSet) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)

func (*GroupedTuplesSet) RangeSet

func (s *GroupedTuplesSet) RangeSet(f func(i int, r Row) (bool, error)) error

func (*GroupedTuplesSet) SetIsAgg

func (s *GroupedTuplesSet) SetIsAgg(_ bool)

func (*GroupedTuplesSet) SetTracerCtx

func (s *GroupedTuplesSet) SetTracerCtx(ctx api.StreamContext)

func (*GroupedTuplesSet) Swap

func (s *GroupedTuplesSet) Swap(i, j int)

func (*GroupedTuplesSet) ToAggMaps

func (s *GroupedTuplesSet) ToAggMaps() []map[string]any

func (*GroupedTuplesSet) ToMaps

func (s *GroupedTuplesSet) ToMaps() []map[string]any

func (*GroupedTuplesSet) ToRowMaps

func (s *GroupedTuplesSet) ToRowMaps() []map[string]any

type HasTracerCtx

type HasTracerCtx interface {
	GetTracerCtx() api.StreamContext
	SetTracerCtx(ctx api.StreamContext)
}

type JoinTuple

type JoinTuple struct {
	Ctx    api.StreamContext
	Tuples []Row // The content is immutable, but the slice may be added or removed
	AffiliateRow
	// contains filtered or unexported fields
}

JoinTuple is a row produced by a join operation

func (*JoinTuple) AddTuple

func (jt *JoinTuple) AddTuple(tuple Row)

func (*JoinTuple) AddTuples

func (jt *JoinTuple) AddTuples(tuples []Row)

func (*JoinTuple) AggregateEval

func (jt *JoinTuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{}

func (*JoinTuple) All

func (jt *JoinTuple) All(stream string) (map[string]interface{}, bool)

func (*JoinTuple) Clone

func (jt *JoinTuple) Clone() Row

func (*JoinTuple) GetEmitter

func (jt *JoinTuple) GetEmitter() string

func (*JoinTuple) GetTracerCtx

func (jt *JoinTuple) GetTracerCtx() api.StreamContext

func (*JoinTuple) Meta

func (jt *JoinTuple) Meta(key, table string) (interface{}, bool)

func (*JoinTuple) Pick

func (jt *JoinTuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)

func (*JoinTuple) SetTracerCtx

func (jt *JoinTuple) SetTracerCtx(ctx api.StreamContext)

func (*JoinTuple) ToMap

func (jt *JoinTuple) ToMap() map[string]interface{}

func (*JoinTuple) Value

func (jt *JoinTuple) Value(key, table string) (interface{}, bool)

type JoinTuples

type JoinTuples struct {
	Ctx     api.StreamContext
	Content []*JoinTuple
	*WindowRange

	AffiliateRow
	// contains filtered or unexported fields
}

func (*JoinTuples) AggregateEval

func (s *JoinTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}

func (*JoinTuples) All

func (s *JoinTuples) All(_ string) (map[string]interface{}, bool)

func (*JoinTuples) Clone

func (s *JoinTuples) Clone() Collection

func (*JoinTuples) Filter

func (s *JoinTuples) Filter(indexes []int) Collection

Filter filters the tuples by the given indexes.

func (*JoinTuples) GetBySrc

func (s *JoinTuples) GetBySrc(_ string) []Row

GetBySrc is still to be implemented, to support join after join.

func (*JoinTuples) GetTracerCtx

func (s *JoinTuples) GetTracerCtx() api.StreamContext

func (*JoinTuples) GetWindowRange

func (s *JoinTuples) GetWindowRange() *WindowRange

func (*JoinTuples) GroupRange

func (s *JoinTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error

func (*JoinTuples) Index

func (s *JoinTuples) Index(i int) Row

func (*JoinTuples) Len

func (s *JoinTuples) Len() int

func (*JoinTuples) Meta

func (s *JoinTuples) Meta(key, table string) (interface{}, bool)

func (*JoinTuples) Pick

func (s *JoinTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)

func (*JoinTuples) Range

func (s *JoinTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error

func (*JoinTuples) RangeOfTuples

func (s *JoinTuples) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)

func (*JoinTuples) RangeSet

func (s *JoinTuples) RangeSet(f func(i int, r Row) (bool, error)) error

func (*JoinTuples) SetIsAgg

func (s *JoinTuples) SetIsAgg(_ bool)

func (*JoinTuples) SetTracerCtx

func (s *JoinTuples) SetTracerCtx(ctx api.StreamContext)

func (*JoinTuples) Swap

func (s *JoinTuples) Swap(i, j int)

func (*JoinTuples) ToAggMaps

func (s *JoinTuples) ToAggMaps() []map[string]interface{}

func (*JoinTuples) ToMap

func (s *JoinTuples) ToMap() map[string]interface{}

func (*JoinTuples) ToMaps

func (s *JoinTuples) ToMaps() []map[string]interface{}

func (*JoinTuples) ToRowMaps

func (s *JoinTuples) ToRowMaps() []map[string]interface{}

func (*JoinTuples) Value

func (s *JoinTuples) Value(key, table string) (interface{}, bool)

type Message

type Message map[string]interface{}

Message is a valuer that substitutes values for the mapped interface. It is the basic type for data events.

func ToMessage

func ToMessage(input interface{}) (Message, bool)

func (Message) Get

func (m Message) Get(key string) (value any, ok bool)

func (Message) Meta

func (m Message) Meta(key, table string) (interface{}, bool)

func (Message) Range

func (m Message) Range(f func(key string, value any) bool)

func (Message) ToMap

func (m Message) ToMap() map[string]any

func (Message) Value

func (m Message) Value(key, _ string) (interface{}, bool)

type MetaData

type MetaData interface {
	MetaData() Metadata
}

type Metadata

type Metadata Message

func (Metadata) Meta

func (m Metadata) Meta(key, table string) (interface{}, bool)

func (Metadata) Value

func (m Metadata) Value(key, table string) (interface{}, bool)

type MultiFunc

type MultiFunc interface {
	ValidateWithName(args []ast.Expr, name string) error
	ExecWithName(args []interface{}, ctx api.FunctionContext, name string) (interface{}, bool)
}

MultiFunc is a hack for builtin functions that work for multiple functions.

type MultiSorter

type MultiSorter struct {
	Ctx api.StreamContext
	SortingData
	// contains filtered or unexported fields
}

MultiSorter implements the Sort interface, sorting the changes within.

func OrderedBy

func OrderedBy(fields ast.SortFields, fv *FunctionValuer, afv *AggregateFunctionValuer) *MultiSorter

OrderedBy returns a Sorter that sorts using the less functions, in order. Call its Sort method to sort the data.

func (*MultiSorter) GetTracerCtx

func (ms *MultiSorter) GetTracerCtx() api.StreamContext

func (*MultiSorter) Less

func (ms *MultiSorter) Less(i, j int) bool

Less is part of sort.Interface. It is implemented by looping along the less functions until it finds a comparison that discriminates between the two items (one is less than the other). Note that it can call the less functions twice per call. We could change the functions to return -1, 0, 1 and reduce the number of calls for greater efficiency: an exercise for the reader.

func (*MultiSorter) SetTracerCtx

func (ms *MultiSorter) SetTracerCtx(ctx api.StreamContext)

func (*MultiSorter) Sort

func (ms *MultiSorter) Sort(data SortingData) error

Sort sorts the argument slice according to the less functions passed to OrderedBy.

func (*MultiSorter) Swap

func (ms *MultiSorter) Swap(i, j int)

type ParseTree

type ParseTree struct {
	Handlers map[string]func(*Parser) (ast.Statement, error)
	Tokens   map[string]*ParseTree
	Keys     []string
}

func (*ParseTree) Handle

func (pt *ParseTree) Handle(lit string, fn func(*Parser) (ast.Statement, error))

func (*ParseTree) Parse

func (pt *ParseTree) Parse(p *Parser) (ast.Statement, error)

type Parser

type Parser struct {
	// contains filtered or unexported fields
}

func NewParser

func NewParser(r io.Reader) *Parser

func NewParserWithSources

func NewParserWithSources(r io.Reader, sources []string) *Parser

func (*Parser) ConvertToWindows

func (p *Parser) ConvertToWindows(wtype ast.WindowType, args []ast.Expr) (*ast.Window, error)

func (*Parser) Parse

func (p *Parser) Parse() (*ast.SelectStatement, error)

func (*Parser) ParseCondition

func (p *Parser) ParseCondition() (ast.Expr, error)

func (*Parser) ParseCreateStmt

func (p *Parser) ParseCreateStmt() (ast.Statement, error)

func (*Parser) ParseExpr

func (p *Parser) ParseExpr() (ast.Expr, error)

func (*Parser) ParseJoin

func (p *Parser) ParseJoin(joinType ast.JoinType) (*ast.Join, error)

func (*Parser) ParseLimit

func (p *Parser) ParseLimit() (ast.Expr, error)

func (*Parser) ParseOver4Window

func (p *Parser) ParseOver4Window() (ast.Expr, error)

func (*Parser) ParseQueries

func (p *Parser) ParseQueries() ([]ast.SelectStatement, error)

type RawRow

type RawRow interface {
	ReadonlyRow
	// Del Only for some ops like functionOp * and Alias
	Del(col string)
	// Set Only for some ops like functionOp *
	Set(col string, value interface{})
	// ToMap converts the row to a map to export to other systems *
	ToMap() map[string]interface{}
	// Pick the columns and discard others. It replaces the underlying message with a new value. There are 3 types to pick: column, alias and anonymous expressions.
	// cols is a list [columnname, tablename]
	Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)
}

RawRow is the basic data type for a logical row. It could be a row or a collection row.

type RawTuple

type RawTuple struct {
	Ctx       api.StreamContext
	Emitter   string
	Timestamp time.Time
	Rawdata   []byte
	Metadata  Metadata // immutable
	Props     map[string]string
}

func (*RawTuple) AllProps

func (r *RawTuple) AllProps() map[string]string

func (*RawTuple) DynamicProps

func (r *RawTuple) DynamicProps(template string) (string, bool)

func (*RawTuple) GetTracerCtx

func (r *RawTuple) GetTracerCtx() api.StreamContext

func (*RawTuple) Meta

func (r *RawTuple) Meta(key, table string) (any, bool)

func (*RawTuple) Raw

func (r *RawTuple) Raw() []byte

func (*RawTuple) Replace

func (r *RawTuple) Replace(new []byte)

func (*RawTuple) SetTracerCtx

func (r *RawTuple) SetTracerCtx(ctx api.StreamContext)

type ReadonlyRow

type ReadonlyRow interface {
	HasTracerCtx
	Valuer
	AliasValuer
	Wildcarder
}

type Row

type Row interface {
	RawRow
	Clone() Row
}

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

func NewScanner

func NewScanner(r io.Reader) *Scanner

func (*Scanner) Scan

func (s *Scanner) Scan() (tok ast.Token, lit string)

func (*Scanner) ScanBackquoteIdent

func (s *Scanner) ScanBackquoteIdent() (tok ast.Token, lit string)

func (*Scanner) ScanDigit

func (s *Scanner) ScanDigit() (tok ast.Token, lit string)

func (*Scanner) ScanIdent

func (s *Scanner) ScanIdent() (tok ast.Token, lit string)

func (*Scanner) ScanNumber

func (s *Scanner) ScanNumber(startWithDot bool, isNeg bool) (tok ast.Token, lit string)

func (*Scanner) ScanString

func (s *Scanner) ScanString(isSingle bool) (tok ast.Token, lit string)

func (*Scanner) ScanWhiteSpace

func (s *Scanner) ScanWhiteSpace() (tok ast.Token, lit string)

type SortingData

type SortingData interface {
	HasTracerCtx
	Len() int
	Swap(i, j int)
	Index(i int) Row
}

type StreamInfo

type StreamInfo struct {
	StreamType ast.StreamType `json:"streamType"`
	StreamKind string         `json:"streamKind"`
	Statement  string         `json:"statement"`
}

func GetDataSourceStatement

func GetDataSourceStatement(m kv.KeyValue, name string) (*StreamInfo, error)

type TransformedTupleList

type TransformedTupleList struct {
	Ctx     api.StreamContext
	Content []api.MessageTuple
	Maps    []map[string]any
	Props   map[string]string
}

func (*TransformedTupleList) AllProps

func (l *TransformedTupleList) AllProps() map[string]string

func (*TransformedTupleList) Clone

func (*TransformedTupleList) DynamicProps

func (l *TransformedTupleList) DynamicProps(template string) (string, bool)

func (*TransformedTupleList) GetTracerCtx

func (l *TransformedTupleList) GetTracerCtx() api.StreamContext

func (*TransformedTupleList) Len

func (l *TransformedTupleList) Len() int

func (*TransformedTupleList) RangeOfTuples

func (l *TransformedTupleList) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)

func (*TransformedTupleList) SetTracerCtx

func (l *TransformedTupleList) SetTracerCtx(ctx api.StreamContext)

func (*TransformedTupleList) ToMaps

func (l *TransformedTupleList) ToMaps() []map[string]any

type Tuple

type Tuple struct {
	Ctx       api.StreamContext
	Emitter   string
	Message   Message // the original pointer is immutable & big; may be cloned.
	Timestamp time.Time
	Metadata  Metadata // immutable
	Props     map[string]string

	AffiliateRow
	// contains filtered or unexported fields
}

Tuple is the input row, produced by the source.

func (*Tuple) AggregateEval

func (t *Tuple) AggregateEval(expr ast.Expr, v CallValuer) []interface{}

func (*Tuple) All

func (t *Tuple) All(string) (map[string]any, bool)

func (*Tuple) AllMeta

func (t *Tuple) AllMeta() map[string]any

func (*Tuple) AllProps

func (t *Tuple) AllProps() map[string]string

func (*Tuple) Clone

func (t *Tuple) Clone() Row

func (*Tuple) Created

func (t *Tuple) Created() time.Time

func (*Tuple) DynamicProps

func (t *Tuple) DynamicProps(template string) (string, bool)

func (*Tuple) FuncValue

func (t *Tuple) FuncValue(key string) (interface{}, bool)

func (*Tuple) GetEmitter

func (t *Tuple) GetEmitter() string

func (*Tuple) GetTimestamp

func (t *Tuple) GetTimestamp() time.Time

func (*Tuple) GetTracerCtx

func (t *Tuple) GetTracerCtx() api.StreamContext

func (*Tuple) IsWatermark

func (t *Tuple) IsWatermark() bool

func (*Tuple) Meta

func (t *Tuple) Meta(key, table string) (interface{}, bool)

func (*Tuple) MetaData

func (t *Tuple) MetaData() Metadata

func (*Tuple) Pick

func (t *Tuple) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)

func (*Tuple) SetTracerCtx

func (t *Tuple) SetTracerCtx(ctx api.StreamContext)

func (*Tuple) ToMap

func (t *Tuple) ToMap() map[string]interface{}

ToMap should only be used in sink.

func (*Tuple) Value

func (t *Tuple) Value(key, table string) (interface{}, bool)

type Valuer

type Valuer interface {
	// Value returns the value and existence flag for a given key.
	Value(key, table string) (interface{}, bool)
	Meta(key, table string) (interface{}, bool)
}

Valuer is the interface that wraps the Value() method.

func MultiAggregateValuer

func MultiAggregateValuer(data AggregateData, singleCallValuer CallValuer, valuers ...Valuer) Valuer

func MultiValuer

func MultiValuer(valuers ...Valuer) Valuer

MultiValuer returns a Valuer that iterates over multiple Valuer instances to find a match.

type ValuerEval

type ValuerEval struct {
	Valuer Valuer

	// IntegerFloatDivision will set the eval system to treat
	// a division between two integers as a floating point division.
	IntegerFloatDivision bool
}

ValuerEval will evaluate an expression using the Valuer.

func (*ValuerEval) Eval

func (v *ValuerEval) Eval(expr ast.Expr) interface{}

Eval evaluates an expression and returns a value. It maps the expression to the correct valuer.

type WatermarkTuple

type WatermarkTuple struct {
	Timestamp time.Time
}

func (*WatermarkTuple) GetTimestamp

func (t *WatermarkTuple) GetTimestamp() time.Time

func (*WatermarkTuple) IsWatermark

func (t *WatermarkTuple) IsWatermark() bool

type WildcardValuer

type WildcardValuer struct {
	Data Wildcarder
}

func (*WildcardValuer) Meta

func (wv *WildcardValuer) Meta(_, _ string) (interface{}, bool)

func (*WildcardValuer) Value

func (wv *WildcardValuer) Value(key, table string) (interface{}, bool)

type Wildcarder

type Wildcarder interface {
	// All returns the value map and existence flag for a given table.
	All(table string) (map[string]any, bool)
}

type WindowRange

type WindowRange struct {
	// contains filtered or unexported fields
}

func NewWindowRange

func NewWindowRange(windowStart int64, windowEnd int64) *WindowRange

func (*WindowRange) FuncValue

func (r *WindowRange) FuncValue(key string) (interface{}, bool)

type WindowRangeValuer

type WindowRangeValuer struct {
	*WindowRange
}

func (WindowRangeValuer) Meta

func (w WindowRangeValuer) Meta(_, _ string) (interface{}, bool)

func (WindowRangeValuer) Value

func (w WindowRangeValuer) Value(_, _ string) (interface{}, bool)

type WindowTuples

type WindowTuples struct {
	Ctx     api.StreamContext
	Content []Row // immutable
	*WindowRange

	AffiliateRow
	// contains filtered or unexported fields
}

func (*WindowTuples) AddTuple

func (w *WindowTuples) AddTuple(tuple Row) *WindowTuples

func (*WindowTuples) AggregateEval

func (w *WindowTuples) AggregateEval(expr ast.Expr, v CallValuer) []interface{}

func (*WindowTuples) All

func (w *WindowTuples) All(_ string) (map[string]interface{}, bool)

func (*WindowTuples) Clone

func (w *WindowTuples) Clone() Collection

func (*WindowTuples) Filter

func (w *WindowTuples) Filter(indexes []int) Collection

Filter filters the tuples by the given indexes.

func (*WindowTuples) GetBySrc

func (w *WindowTuples) GetBySrc(emitter string) []Row

func (*WindowTuples) GetTracerCtx

func (w *WindowTuples) GetTracerCtx() api.StreamContext

func (*WindowTuples) GetWindowRange

func (w *WindowTuples) GetWindowRange() *WindowRange

func (*WindowTuples) GroupRange

func (w *WindowTuples) GroupRange(f func(i int, aggRow CollectionRow) (bool, error)) error

func (*WindowTuples) Index

func (w *WindowTuples) Index(index int) Row

func (*WindowTuples) Len

func (w *WindowTuples) Len() int

func (*WindowTuples) Meta

func (w *WindowTuples) Meta(key, table string) (interface{}, bool)

func (*WindowTuples) Pick

func (w *WindowTuples) Pick(allWildcard bool, cols [][]string, wildcardEmitters map[string]bool, except []string)

func (*WindowTuples) Range

func (w *WindowTuples) Range(f func(i int, r ReadonlyRow) (bool, error)) error

func (*WindowTuples) RangeOfTuples

func (w *WindowTuples) RangeOfTuples(f func(index int, tuple api.MessageTuple) bool)

func (*WindowTuples) RangeSet

func (w *WindowTuples) RangeSet(f func(i int, r Row) (bool, error)) error

func (*WindowTuples) SetIsAgg

func (w *WindowTuples) SetIsAgg(_ bool)

func (*WindowTuples) SetTracerCtx

func (w *WindowTuples) SetTracerCtx(ctx api.StreamContext)

func (*WindowTuples) Swap

func (w *WindowTuples) Swap(i, j int)

func (*WindowTuples) ToAggMaps

func (w *WindowTuples) ToAggMaps() []map[string]interface{}

func (*WindowTuples) ToMap

func (w *WindowTuples) ToMap() map[string]interface{}

func (*WindowTuples) ToMaps

func (w *WindowTuples) ToMaps() []map[string]interface{}

func (*WindowTuples) ToRowMaps

func (w *WindowTuples) ToRowMaps() []map[string]interface{}

func (*WindowTuples) Value

func (w *WindowTuples) Value(key, table string) (interface{}, bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL