Documentation ¶
Overview ¶
Package execute contains the implementation of the execution phase in the query engine.
Index ¶
- Constants
- Variables
- func AddNewTableCols(t flux.Table, builder TableBuilder, colMap []int) ([]int, error)
- func AddTableCols(t flux.Table, builder TableBuilder) error
- func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
- func AggregateSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
- func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
- func AppendCols(cr flux.ColReader, builder TableBuilder) error
- func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
- func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordExplicit(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordWithDefaults(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
- func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
- func AppendRecordForCols(i int, cr flux.ColReader, builder TableBuilder, cols []flux.ColMeta) error
- func AppendTable(t flux.Table, builder TableBuilder) error
- func CacheOneTimeTable(t flux.Table, a *Allocator) (flux.Table, error)
- func CheckColType(col flux.ColMeta, typ flux.ColType)
- func ColIdx(label string, cols []flux.ColMeta) int
- func ColMap(colMap []int, builder TableBuilder, cr flux.ColReader) []int
- func ColsMatch(builder TableBuilder, cr flux.ColReader) bool
- func ContainsStr(strs []string, str string) bool
- func ConvertFromKind(k semantic.Nature) flux.ColType
- func ConvertToKind(t flux.ColType) semantic.Nature
- func CopyTable(t flux.Table, a *Allocator) (flux.Table, error)
- func GroupKeyForRowOn(i int, cr flux.ColReader, on map[string]bool) flux.GroupKey
- func HasCol(label string, cols []flux.ColMeta) bool
- func NewAggregateTransformation(d Dataset, c TableBuilderCache, agg Aggregate, config AggregateConfig) *aggregateTransformation
- func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
- func NewGroupKey(cols []flux.ColMeta, values []values.Value) flux.GroupKey
- func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
- func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
- func NewTableBuilderCache(a *Allocator) *tableBuilderCache
- func PanicUnknownType(typ flux.ColType)
- func RegisterSource(k plan.ProcedureKind, c CreateNewPlannerSource)
- func RegisterTransformation(k plan.ProcedureKind, c CreateNewPlannerTransformation)
- func SelectorSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
- func ValueForRow(cr flux.ColReader, i, j int) values.Value
- type AccumulationMode
- type Administration
- type Aggregate
- type AggregateConfig
- type AllocError
- type Allocator
- func (a *Allocator) AppendBools(slice []bool, vs ...bool) []bool
- func (a *Allocator) AppendFloats(slice []float64, vs ...float64) []float64
- func (a *Allocator) AppendInts(slice []int64, vs ...int64) []int64
- func (a *Allocator) AppendStrings(slice []string, vs ...string) []string
- func (a *Allocator) AppendTimes(slice []Time, vs ...Time) []Time
- func (a *Allocator) AppendUInts(slice []uint64, vs ...uint64) []uint64
- func (a *Allocator) Bools(l, c int) []bool
- func (a *Allocator) Floats(l, c int) []float64
- func (a *Allocator) Free(n, size int)
- func (a *Allocator) GrowBools(slice []bool, n int) []bool
- func (a *Allocator) GrowFloats(slice []float64, n int) []float64
- func (a *Allocator) GrowInts(slice []int64, n int) []int64
- func (a *Allocator) GrowStrings(slice []string, n int) []string
- func (a *Allocator) GrowTimes(slice []Time, n int) []Time
- func (a *Allocator) GrowUInts(slice []uint64, n int) []uint64
- func (a *Allocator) Ints(l, c int) []int64
- func (a *Allocator) Max() int64
- func (a *Allocator) Strings(l, c int) []string
- func (a *Allocator) Times(l, c int) []Time
- func (a *Allocator) UInts(l, c int) []uint64
- type BoolValueFunc
- type Bounds
- type ColListTable
- func (t *ColListTable) Bools(j int) []bool
- func (t *ColListTable) Cols() []flux.ColMeta
- func (t *ColListTable) Copy() *ColListTable
- func (t *ColListTable) Do(f func(flux.ColReader) error) error
- func (t *ColListTable) Empty() bool
- func (t *ColListTable) Floats(j int) []float64
- func (t *ColListTable) GetRow(row int) values.Object
- func (t *ColListTable) Ints(j int) []int64
- func (t *ColListTable) Key() flux.GroupKey
- func (t *ColListTable) Len() int
- func (t *ColListTable) NRows() int
- func (t *ColListTable) RefCount(n int)
- func (t *ColListTable) Strings(j int) []string
- func (t *ColListTable) Times(j int) []Time
- func (t *ColListTable) UInts(j int) []uint64
- type ColListTableBuilder
- func (b ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
- func (b ColListTableBuilder) AppendBool(j int, value bool) error
- func (b ColListTableBuilder) AppendBools(j int, values []bool) error
- func (b ColListTableBuilder) AppendFloat(j int, value float64) error
- func (b ColListTableBuilder) AppendFloats(j int, values []float64) error
- func (b ColListTableBuilder) AppendInt(j int, value int64) error
- func (b ColListTableBuilder) AppendInts(j int, values []int64) error
- func (b ColListTableBuilder) AppendString(j int, value string) error
- func (b ColListTableBuilder) AppendStrings(j int, values []string) error
- func (b ColListTableBuilder) AppendTime(j int, value Time) error
- func (b ColListTableBuilder) AppendTimes(j int, values []Time) error
- func (b ColListTableBuilder) AppendUInt(j int, value uint64) error
- func (b ColListTableBuilder) AppendUInts(j int, values []uint64) error
- func (b ColListTableBuilder) AppendValue(j int, v values.Value) error
- func (b ColListTableBuilder) ClearData()
- func (b ColListTableBuilder) Cols() []flux.ColMeta
- func (b ColListTableBuilder) GrowBools(j, n int) error
- func (b ColListTableBuilder) GrowFloats(j, n int) error
- func (b ColListTableBuilder) GrowInts(j, n int) error
- func (b ColListTableBuilder) GrowStrings(j, n int) error
- func (b ColListTableBuilder) GrowTimes(j, n int) error
- func (b ColListTableBuilder) GrowUInts(j, n int) error
- func (b ColListTableBuilder) Key() flux.GroupKey
- func (b ColListTableBuilder) LevelColumns() error
- func (b ColListTableBuilder) NCols() int
- func (b ColListTableBuilder) NRows() int
- func (b ColListTableBuilder) RawTable() *ColListTable
- func (b ColListTableBuilder) SetBool(i int, j int, value bool) error
- func (b ColListTableBuilder) SetFloat(i int, j int, value float64) error
- func (b ColListTableBuilder) SetInt(i int, j int, value int64) error
- func (b ColListTableBuilder) SetString(i int, j int, value string) error
- func (b ColListTableBuilder) SetTime(i int, j int, value Time) error
- func (b ColListTableBuilder) SetUInt(i int, j int, value uint64) error
- func (b ColListTableBuilder) SetValue(i, j int, v values.Value) error
- func (b ColListTableBuilder) Sort(cols []string, desc bool)
- func (b ColListTableBuilder) Table() (flux.Table, error)
- type CreateNewPlannerSource
- type CreateNewPlannerTransformation
- type CreateSource
- type CreateTransformation
- type DataCache
- type Dataset
- func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, agg Aggregate, config AggregateConfig, ...) (*aggregateTransformation, Dataset)
- func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, ...) (*indexSelectorTransformation, Dataset)
- func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, ...) (*rowSelectorTransformation, Dataset)
- type DatasetID
- type Dependencies
- type Dispatcher
- type DoBoolAgg
- type DoBoolIndexSelector
- type DoBoolRowSelector
- type DoFloatAgg
- type DoFloatIndexSelector
- type DoFloatRowSelector
- type DoIntAgg
- type DoIntIndexSelector
- type DoIntRowSelector
- type DoStringAgg
- type DoStringIndexSelector
- type DoStringRowSelector
- type DoUIntAgg
- type DoUIntIndexSelector
- type DoUIntRowSelector
- type Duration
- type Executor
- type FinishMsg
- type FloatValueFunc
- type FormatOptions
- type Formatter
- type GroupKeyBuilder
- type GroupLookup
- type IndexSelector
- type IntValueFunc
- type Message
- type MessageQueue
- type MessageType
- type Node
- type OneTimeTable
- type ProcessMsg
- type Record
- func (r *Record) Array() values.Array
- func (r *Record) Bool() bool
- func (r *Record) Duration() values.Duration
- func (r *Record) Equal(rhs values.Value) bool
- func (r *Record) Float() float64
- func (r *Record) Function() values.Function
- func (r *Record) Get(name string) (values.Value, bool)
- func (r *Record) Int() int64
- func (r *Record) Len() int
- func (r *Record) Object() values.Object
- func (r *Record) PolyType() semantic.PolyType
- func (r *Record) Range(f func(name string, v values.Value))
- func (r *Record) Regexp() *regexp.Regexp
- func (r *Record) Set(name string, v values.Value)
- func (r *Record) Str() string
- func (r *Record) Time() values.Time
- func (r *Record) Type() semantic.Type
- func (r *Record) UInt() uint64
- type RetractTableMsg
- type Row
- type RowMapFn
- type RowPredicateFn
- type RowSelector
- type Rower
- type ScheduleFunc
- type SelectorConfig
- type Source
- type StreamContext
- type StringValueFunc
- type TableBuilder
- type TableBuilderCache
- type TableContext
- type Time
- type Transformation
- type Transport
- type Trigger
- type TriggerContext
- type UIntValueFunc
- type UpdateProcessingTimeMsg
- type UpdateWatermarkMsg
- type ValueFunc
- type Window
Constants ¶
const ( MaxTime = math.MaxInt64 MinTime = math.MinInt64 )
const ( DefaultStartColLabel = "_start" DefaultStopColLabel = "_stop" DefaultTimeColLabel = "_time" DefaultValueColLabel = "_value" )
Variables ¶
var AllTime = Bounds{ Start: MinTime, Stop: MaxTime, }
var DefaultAggregateConfig = AggregateConfig{ Columns: []string{DefaultValueColLabel}, }
var DefaultTriggerSpec = flux.AfterWatermarkTriggerSpec{}
DefaultTriggerSpec defines the triggering that should be used for datasets whose parent transformation is not a windowing transformation.
Functions ¶
func AddNewTableCols ¶
AddNewTableCols adds the columns of t onto builder that did not already exist. Returns the mapping of builder cols to table cols.
func AddTableCols ¶
func AddTableCols(t flux.Table, builder TableBuilder) error
AddTableCols adds the columns of t onto builder.
func AddTableKeyCols ¶
func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
func AggregateSignature ¶
func AggregateSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
AggregateSignature returns a function signature common to all aggregate functions, with any additional arguments.
func AppendCol ¶
func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
AppendCol appends a column from cr onto builder. The indexes bj and cj are builder and col reader indexes respectively.
func AppendCols ¶
func AppendCols(cr flux.ColReader, builder TableBuilder) error
AppendCols appends all columns from cr onto builder. This function assumes that builder and cr have the same column schema.
func AppendKeyValues ¶
func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
func AppendMappedCols ¶
func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedCols appends all columns from cr onto builder. The colMap is a map of builder column index to cr column index.
func AppendMappedRecordExplicit ¶
AppendMappedRecordExplicit appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, no value is appended.
func AppendMappedRecordWithDefaults ¶
func AppendMappedRecordWithDefaults(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedRecordWithDefaults appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, a default value is assigned to the builder's column.
func AppendMappedTable ¶
func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
AppendMappedTable appends data from table t onto builder. The colMap is a map of builder column index to table column index.
func AppendRecord ¶
func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
AppendRecord appends the record from cr onto builder assuming matching columns.
func AppendRecordForCols ¶
AppendRecordForCols appends only the columns provided from cr onto builder.
func AppendTable ¶
func AppendTable(t flux.Table, builder TableBuilder) error
AppendTable appends data from table t onto builder. This function assumes builder and t have the same column schema.
func CacheOneTimeTable ¶
CacheOneTimeTable returns a table that can be read multiple times. If the table is not a OneTimeTable it is returned directly. Otherwise its contents are read into a new table.
func ColMap ¶
func ColMap(colMap []int, builder TableBuilder, cr flux.ColReader) []int
ColMap writes a mapping of builder index to column reader index into colMap. When colMap does not have enough capacity a new colMap is allocated. The colMap is always returned.
func ColsMatch ¶
func ColsMatch(builder TableBuilder, cr flux.ColReader) bool
ColsMatch returns true if builder and cr have identical column sets (order dependent)
func ContainsStr ¶
func GroupKeyForRowOn ¶
func NewAggregateTransformation ¶
func NewAggregateTransformation(d Dataset, c TableBuilderCache, agg Aggregate, config AggregateConfig) *aggregateTransformation
func NewDataset ¶
func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
func NewIndexSelectorTransformation ¶
func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
func NewRowSelectorTransformation ¶
func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
func NewTableBuilderCache ¶
func NewTableBuilderCache(a *Allocator) *tableBuilderCache
func PanicUnknownType ¶
func RegisterSource ¶
func RegisterSource(k plan.ProcedureKind, c CreateNewPlannerSource)
func RegisterTransformation ¶
func RegisterTransformation(k plan.ProcedureKind, c CreateNewPlannerTransformation)
func SelectorSignature ¶
func SelectorSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
SelectorSignature returns a function signature common to all selector functions, with any additional arguments.
Types ¶
type AccumulationMode ¶
type AccumulationMode int
const ( DiscardingMode AccumulationMode = iota AccumulatingMode AccumulatingRetractingMode )
type Administration ¶
type Administration interface { Context() context.Context ResolveTime(qt flux.Time) Time StreamContext() StreamContext Allocator() *Allocator Parents() []DatasetID Dependencies() Dependencies }
type Aggregate ¶
type Aggregate interface { NewBoolAgg() DoBoolAgg NewIntAgg() DoIntAgg NewUIntAgg() DoUIntAgg NewFloatAgg() DoFloatAgg NewStringAgg() DoStringAgg }
type AggregateConfig ¶
type AggregateConfig struct { plan.DefaultCost Columns []string `json:"columns"` }
func (AggregateConfig) Copy ¶
func (c AggregateConfig) Copy() AggregateConfig
type AllocError ¶
func (AllocError) Error ¶
func (a AllocError) Error() string
type Allocator ¶
type Allocator struct { Limit int64 // contains filtered or unexported fields }
Allocator tracks the amount of memory being consumed by a query. The allocator provides methods similar to make and append, to allocate large slices of data. The allocator also provides a Free method to account for when memory will be freed.
func (*Allocator) AppendBools ¶
AppendBools appends bools to a slice
func (*Allocator) AppendFloats ¶
AppendFloats appends float64s to a slice
func (*Allocator) AppendInts ¶
AppendInts appends int64s to a slice
func (*Allocator) AppendStrings ¶
AppendStrings appends strings to a slice. Only the string headers are accounted for.
func (*Allocator) AppendTimes ¶
AppendTimes appends Times to a slice
func (*Allocator) AppendUInts ¶
AppendUInts appends uint64s to a slice
func (*Allocator) Max ¶
Max reports the maximum amount of allocated memory at any point in the query.
func (*Allocator) Strings ¶
Strings makes a slice of string values. Only the string headers are accounted for.
type BoolValueFunc ¶
type BoolValueFunc interface {
ValueBool() bool
}
type ColListTable ¶
type ColListTable struct {
// contains filtered or unexported fields
}
ColListTable implements Table using a list of columns. All data for the table is stored in RAM. As a result, At* methods are provided directly on the table for easy access.
func (*ColListTable) Bools ¶
func (t *ColListTable) Bools(j int) []bool
func (*ColListTable) Cols ¶
func (t *ColListTable) Cols() []flux.ColMeta
func (*ColListTable) Copy ¶
func (t *ColListTable) Copy() *ColListTable
func (*ColListTable) Empty ¶
func (t *ColListTable) Empty() bool
func (*ColListTable) Floats ¶
func (t *ColListTable) Floats(j int) []float64
func (*ColListTable) GetRow ¶
func (t *ColListTable) GetRow(row int) values.Object
GetRow takes a row index and returns the record located at that index in the cache
func (*ColListTable) Ints ¶
func (t *ColListTable) Ints(j int) []int64
func (*ColListTable) Key ¶
func (t *ColListTable) Key() flux.GroupKey
func (*ColListTable) Len ¶
func (t *ColListTable) Len() int
func (*ColListTable) NRows ¶
func (t *ColListTable) NRows() int
func (*ColListTable) RefCount ¶
func (t *ColListTable) RefCount(n int)
func (*ColListTable) Strings ¶
func (t *ColListTable) Strings(j int) []string
func (*ColListTable) Times ¶
func (t *ColListTable) Times(j int) []Time
func (*ColListTable) UInts ¶
func (t *ColListTable) UInts(j int) []uint64
type ColListTableBuilder ¶
type ColListTableBuilder struct {
// contains filtered or unexported fields
}
func NewColListTableBuilder ¶
func NewColListTableBuilder(key flux.GroupKey, a *Allocator) *ColListTableBuilder
func (ColListTableBuilder) AddCol ¶
func (b ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
func (ColListTableBuilder) AppendBool ¶
func (b ColListTableBuilder) AppendBool(j int, value bool) error
func (ColListTableBuilder) AppendBools ¶
func (b ColListTableBuilder) AppendBools(j int, values []bool) error
func (ColListTableBuilder) AppendFloat ¶
func (b ColListTableBuilder) AppendFloat(j int, value float64) error
func (ColListTableBuilder) AppendFloats ¶
func (b ColListTableBuilder) AppendFloats(j int, values []float64) error
func (ColListTableBuilder) AppendInt ¶
func (b ColListTableBuilder) AppendInt(j int, value int64) error
func (ColListTableBuilder) AppendInts ¶
func (b ColListTableBuilder) AppendInts(j int, values []int64) error
func (ColListTableBuilder) AppendString ¶
func (b ColListTableBuilder) AppendString(j int, value string) error
func (ColListTableBuilder) AppendStrings ¶
func (b ColListTableBuilder) AppendStrings(j int, values []string) error
func (ColListTableBuilder) AppendTime ¶
func (b ColListTableBuilder) AppendTime(j int, value Time) error
func (ColListTableBuilder) AppendTimes ¶
func (b ColListTableBuilder) AppendTimes(j int, values []Time) error
func (ColListTableBuilder) AppendUInt ¶
func (b ColListTableBuilder) AppendUInt(j int, value uint64) error
func (ColListTableBuilder) AppendUInts ¶
func (b ColListTableBuilder) AppendUInts(j int, values []uint64) error
func (ColListTableBuilder) AppendValue ¶
func (b ColListTableBuilder) AppendValue(j int, v values.Value) error
func (ColListTableBuilder) ClearData ¶
func (b ColListTableBuilder) ClearData()
func (ColListTableBuilder) Cols ¶
func (b ColListTableBuilder) Cols() []flux.ColMeta
func (ColListTableBuilder) GrowBools ¶
func (b ColListTableBuilder) GrowBools(j, n int) error
func (ColListTableBuilder) GrowFloats ¶
func (b ColListTableBuilder) GrowFloats(j, n int) error
func (ColListTableBuilder) GrowInts ¶
func (b ColListTableBuilder) GrowInts(j, n int) error
func (ColListTableBuilder) GrowStrings ¶
func (b ColListTableBuilder) GrowStrings(j, n int) error
func (ColListTableBuilder) GrowTimes ¶
func (b ColListTableBuilder) GrowTimes(j, n int) error
func (ColListTableBuilder) GrowUInts ¶
func (b ColListTableBuilder) GrowUInts(j, n int) error
func (ColListTableBuilder) Key ¶
func (b ColListTableBuilder) Key() flux.GroupKey
func (ColListTableBuilder) LevelColumns ¶
func (b ColListTableBuilder) LevelColumns() error
func (ColListTableBuilder) NCols ¶
func (b ColListTableBuilder) NCols() int
func (ColListTableBuilder) NRows ¶
func (b ColListTableBuilder) NRows() int
func (ColListTableBuilder) RawTable ¶
func (b ColListTableBuilder) RawTable() *ColListTable
RawTable returns the underlying table being constructed. The table returned will be modified by future calls to any TableBuilder methods.
func (ColListTableBuilder) SetBool ¶
func (b ColListTableBuilder) SetBool(i int, j int, value bool) error
func (ColListTableBuilder) SetFloat ¶
func (b ColListTableBuilder) SetFloat(i int, j int, value float64) error
func (ColListTableBuilder) SetInt ¶
func (b ColListTableBuilder) SetInt(i int, j int, value int64) error
func (ColListTableBuilder) SetString ¶
func (b ColListTableBuilder) SetString(i int, j int, value string) error
func (ColListTableBuilder) SetTime ¶
func (b ColListTableBuilder) SetTime(i int, j int, value Time) error
func (ColListTableBuilder) SetUInt ¶
func (b ColListTableBuilder) SetUInt(i int, j int, value uint64) error
func (ColListTableBuilder) SetValue ¶
func (b ColListTableBuilder) SetValue(i, j int, v values.Value) error
func (ColListTableBuilder) Sort ¶
func (b ColListTableBuilder) Sort(cols []string, desc bool)
type CreateNewPlannerSource ¶
type CreateNewPlannerSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)
type CreateNewPlannerTransformation ¶
type CreateNewPlannerTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)
type CreateSource ¶
type CreateSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)
type CreateTransformation ¶
type CreateTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)
type DataCache ¶
type DataCache interface { Table(flux.GroupKey) (flux.Table, error) ForEach(func(flux.GroupKey)) ForEachWithContext(func(flux.GroupKey, Trigger, TableContext)) DiscardTable(flux.GroupKey) ExpireTable(flux.GroupKey) SetTriggerSpec(t flux.TriggerSpec) }
DataCache holds all working data for a transformation.
type Dataset ¶
type Dataset interface { Node RetractTable(key flux.GroupKey) error UpdateProcessingTime(t Time) error UpdateWatermark(mark Time) error Finish(error) SetTriggerSpec(t flux.TriggerSpec) }
Dataset represents the set of data produced by a transformation.
func NewAggregateTransformationAndDataset ¶
func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, agg Aggregate, config AggregateConfig, a *Allocator) (*aggregateTransformation, Dataset)
func NewIndexSelectorTransformationAndDataset ¶
func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, config SelectorConfig, a *Allocator) (*indexSelectorTransformation, Dataset)
func NewRowSelectorTransformationAndDataset ¶
func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, config SelectorConfig, a *Allocator) (*rowSelectorTransformation, Dataset)
type Dependencies ¶
type Dependencies map[string]interface{}
Dependencies represents the provided dependencies to the execution environment. The dependencies are opaque.
type Dispatcher ¶
type Dispatcher interface { // Schedule fn to be executed Schedule(fn ScheduleFunc) }
Dispatcher schedules work for a query. Each transformation submits work to be done to the dispatcher. Then the dispatcher schedules the work based on the available resources.
type DoBoolIndexSelector ¶
type DoBoolRowSelector ¶
type DoFloatAgg ¶
type DoFloatIndexSelector ¶
type DoFloatRowSelector ¶
type DoIntIndexSelector ¶
type DoIntRowSelector ¶
type DoStringAgg ¶
type DoStringIndexSelector ¶
type DoStringRowSelector ¶
type DoUIntIndexSelector ¶
type DoUIntRowSelector ¶
type Executor ¶
type Executor interface {
Execute(ctx context.Context, p *plan.PlanSpec, a *Allocator) (map[string]flux.Result, error)
}
func NewExecutor ¶
func NewExecutor(deps Dependencies, logger *zap.Logger) Executor
type FloatValueFunc ¶
type FloatValueFunc interface {
ValueFloat() float64
}
type FormatOptions ¶
type FormatOptions struct { // RepeatHeaderCount is the number of rows to print before printing the header again. // If zero then the headers are not repeated. RepeatHeaderCount int }
func DefaultFormatOptions ¶
func DefaultFormatOptions() *FormatOptions
type Formatter ¶
type Formatter struct {
// contains filtered or unexported fields
}
Formatter writes a table to a Writer.
func NewFormatter ¶
func NewFormatter(tbl flux.Table, opts *FormatOptions) *Formatter
NewFormatter creates a Formatter for a given table. If opts is nil, the DefaultFormatOptions are used.
type GroupKeyBuilder ¶
type GroupKeyBuilder struct {
// contains filtered or unexported fields
}
GroupKeyBuilder is used to construct a GroupKey by keeping a mutable copy in memory.
func NewGroupKeyBuilder ¶
func NewGroupKeyBuilder(key flux.GroupKey) *GroupKeyBuilder
NewGroupKeyBuilder creates a new GroupKeyBuilder from an existing GroupKey. If the GroupKey passed is nil, a blank GroupKeyBuilder is constructed.
func (*GroupKeyBuilder) AddKeyValue ¶
func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder
AddKeyValue will add a new group key to the existing builder.
func (*GroupKeyBuilder) Build ¶
func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)
Build will construct the GroupKey. If there is any problem with the GroupKey (such as one of the columns is not a valid type), the error will be returned here.
func (*GroupKeyBuilder) Grow ¶
func (gkb *GroupKeyBuilder) Grow(n int)
Grow will grow the internal capacity of the group key to the given number.
func (*GroupKeyBuilder) Len ¶
func (gkb *GroupKeyBuilder) Len() int
Len returns the current length of the group key.
type GroupLookup ¶
type GroupLookup struct {
// contains filtered or unexported fields
}
func NewGroupLookup ¶
func NewGroupLookup() *GroupLookup
func (*GroupLookup) Delete ¶
func (l *GroupLookup) Delete(key flux.GroupKey) (v interface{}, found bool)
func (*GroupLookup) Range ¶
func (l *GroupLookup) Range(f func(key flux.GroupKey, value interface{}))
Range will iterate over all group keys in sorted order. Range must not be called within another call to Range. It is safe to call Set/Delete while ranging.
func (*GroupLookup) Set ¶
func (l *GroupLookup) Set(key flux.GroupKey, value interface{})
type IndexSelector ¶
type IndexSelector interface { NewBoolSelector() DoBoolIndexSelector NewIntSelector() DoIntIndexSelector NewUIntSelector() DoUIntIndexSelector NewFloatSelector() DoFloatIndexSelector NewStringSelector() DoStringIndexSelector }
type IntValueFunc ¶
type IntValueFunc interface {
ValueInt() int64
}
type Message ¶
type Message interface { Type() MessageType SrcDatasetID() DatasetID }
type MessageQueue ¶
MessageQueue provides a concurrency safe queue for messages. The queue must have a single consumer calling Pop.
type MessageType ¶
type MessageType int
const ( RetractTableType MessageType = iota ProcessType UpdateWatermarkType UpdateProcessingTimeType FinishType )
type Node ¶
type Node interface {
AddTransformation(t Transformation)
}
type OneTimeTable ¶
OneTimeTable is a Table that permits reading data only once. Specifically the ValueIterator may only be consumed once from any of the columns.
type ProcessMsg ¶
type RetractTableMsg ¶
type RowMapFn ¶
type RowMapFn struct {
// contains filtered or unexported fields
}
func NewRowMapFn ¶
func NewRowMapFn(fn *semantic.FunctionExpression) (*RowMapFn, error)
type RowPredicateFn ¶
type RowPredicateFn struct {
// contains filtered or unexported fields
}
func NewRowPredicateFn ¶
func NewRowPredicateFn(fn *semantic.FunctionExpression) (*RowPredicateFn, error)
type RowSelector ¶
type RowSelector interface { NewBoolSelector() DoBoolRowSelector NewIntSelector() DoIntRowSelector NewUIntSelector() DoUIntRowSelector NewFloatSelector() DoFloatRowSelector NewStringSelector() DoStringRowSelector }
type ScheduleFunc ¶
type ScheduleFunc func(throughput int)
ScheduleFunc is a function that represents work to do. The throughput is the maximum number of messages to process for this scheduling.
type SelectorConfig ¶
type SelectorConfig struct { plan.DefaultCost Column string `json:"column"` }
type StreamContext ¶
type StreamContext interface {
Bounds() *Bounds
}
StreamContext represents necessary context for a single stream of query data.
type StringValueFunc ¶
type StringValueFunc interface {
ValueString() string
}
type TableBuilder ¶
type TableBuilder interface { Key() flux.GroupKey NRows() int NCols() int Cols() []flux.ColMeta // AddCol increases the size of the table by one column. // The index of the column is returned. AddCol(flux.ColMeta) (int, error) // Set sets the value at the specified coordinates // The rows and columns must exist before calling set, otherwise Set panics. SetBool(i, j int, value bool) error SetInt(i, j int, value int64) error SetUInt(i, j int, value uint64) error SetFloat(i, j int, value float64) error SetString(i, j int, value string) error SetTime(i, j int, value Time) error SetValue(i, j int, value values.Value) error // Append will add a single value to the end of a column. Will set the number of // rows in the table to the size of the new column. It's the caller's job to make sure // that the expected number of rows in each column is equal. AppendBool(j int, value bool) error AppendInt(j int, value int64) error AppendUInt(j int, value uint64) error AppendFloat(j int, value float64) error AppendString(j int, value string) error AppendTime(j int, value Time) error AppendValue(j int, value values.Value) error // AppendBools and similar functions will append multiple values to column j. As above, // it will set the number of rows in the table to the size of the new column. It's the // caller's job to make sure that the expected number of rows in each column is equal. AppendBools(j int, values []bool) error AppendInts(j int, values []int64) error AppendUInts(j int, values []uint64) error AppendFloats(j int, values []float64) error AppendStrings(j int, values []string) error AppendTimes(j int, values []Time) error // GrowBools and similar functions will extend column j by n zero-values for the respective type. // If the column has enough capacity, no reallocation is necessary. If the capacity is insufficient, // a new slice is allocated with 1.5*newCapacity. 
As with the Append functions, it is the // caller's job to make sure that the expected number of rows in each column is equal. GrowBools(j, n int) error GrowInts(j, n int) error GrowUInts(j, n int) error GrowFloats(j, n int) error GrowStrings(j, n int) error GrowTimes(j, n int) error // LevelColumns will check for columns that are too short and Grow them // so that each column is of uniform size. LevelColumns() error // Sort the rows of the table by the values of the columns in the order listed. Sort(cols []string, desc bool) // Clear removes all rows, while preserving the column meta data. ClearData() // Table returns the table that has been built. // Further modifications of the builder will not affect the returned table. Table() (flux.Table, error) }
TableBuilder builds tables that can be used multiple times
type TableBuilderCache ¶
type TableBuilderCache interface { // TableBuilder returns an existing or new TableBuilder for the given meta data. // The boolean return value indicates if TableBuilder is new. TableBuilder(key flux.GroupKey) (TableBuilder, bool) ForEachBuilder(f func(flux.GroupKey, TableBuilder)) }
type TableContext ¶
type Transformation ¶
type Transport ¶
type Transport interface { Transformation // Finished reports when the Transport has completed and there is no more work to do. Finished() <-chan struct{} }
type Trigger ¶
type Trigger interface { Triggered(TriggerContext) bool Finished() bool Reset() }
func NewTriggerFromSpec ¶
func NewTriggerFromSpec(spec flux.TriggerSpec) Trigger
type TriggerContext ¶
type TriggerContext struct { Table TableContext Watermark Time CurrentProcessingTime Time }
type UIntValueFunc ¶
type UIntValueFunc interface {
ValueUInt() uint64
}
type UpdateProcessingTimeMsg ¶
type UpdateWatermarkMsg ¶
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
executetest | Package executetest contains utilities for testing the query execution phase. |