Overview ¶
Package execute contains the implementation of the execution phase in the query engine.
Index ¶
- Constants
- Variables
- func AddNewTableCols(t flux.Table, builder TableBuilder, colMap []int) ([]int, error)
- func AddTableCols(t flux.Table, builder TableBuilder) error
- func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
- func AggregateSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
- func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
- func AppendCols(cr flux.ColReader, builder TableBuilder) error
- func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
- func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordExplicit(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
- func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
- func AppendTable(t flux.Table, builder TableBuilder) error
- func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool
- func CacheOneTimeTable(t flux.Table, a *memory.Allocator) (flux.Table, error)
- func CheckColType(col flux.ColMeta, typ flux.ColType)
- func ColIdx(label string, cols []flux.ColMeta) int
- func ColMap(colMap []int, builder TableBuilder, cr flux.ColReader) []int
- func ContainsStr(strs []string, str string) bool
- func ConvertFromKind(k semantic.Nature) flux.ColType
- func ConvertToKind(t flux.ColType) semantic.Nature
- func CopyTable(t flux.Table, a *memory.Allocator) (flux.Table, error)
- func FormatResult(w io.Writer, res flux.Result) error
- func GroupKeyForRowOn(i int, cr flux.ColReader, on map[string]bool) flux.GroupKey
- func HasCol(label string, cols []flux.ColMeta) bool
- func NewAggregateTransformation(d Dataset, c TableBuilderCache, agg Aggregate, config AggregateConfig) *aggregateTransformation
- func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
- func NewGroupKey(cols []flux.ColMeta, values []values.Value) flux.GroupKey
- func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
- func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
- func NewTableBuilderCache(a *memory.Allocator) *tableBuilderCache
- func PanicUnknownType(typ flux.ColType)
- func RegisterSource(k plan.ProcedureKind, c CreateNewPlannerSource)
- func RegisterTransformation(k plan.ProcedureKind, c CreateNewPlannerTransformation)
- func ReplaceTransformation(k plan.ProcedureKind, c CreateNewPlannerTransformation)
- func SelectorSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
- func TablesEqual(left, right flux.Table, alloc *memory.Allocator) (bool, error)
- func ValueForRow(cr flux.ColReader, i, j int) values.Value
- type AccumulationMode
- type Administration
- type Aggregate
- type AggregateConfig
- type Allocator
- func (a *Allocator) AppendBools(slice []bool, vs ...bool) []bool
- func (a *Allocator) AppendFloats(slice []float64, vs ...float64) []float64
- func (a *Allocator) AppendInts(slice []int64, vs ...int64) []int64
- func (a *Allocator) AppendStrings(slice []string, vs ...string) []string
- func (a *Allocator) AppendTimes(slice []Time, vs ...Time) []Time
- func (a *Allocator) AppendUInts(slice []uint64, vs ...uint64) []uint64
- func (a *Allocator) Bools(l, c int) []bool
- func (a *Allocator) Floats(l, c int) []float64
- func (a *Allocator) Free(n, size int)
- func (a *Allocator) GrowBools(slice []bool, n int) []bool
- func (a *Allocator) GrowFloats(slice []float64, n int) []float64
- func (a *Allocator) GrowInts(slice []int64, n int) []int64
- func (a *Allocator) GrowStrings(slice []string, n int) []string
- func (a *Allocator) GrowTimes(slice []Time, n int) []Time
- func (a *Allocator) GrowUInts(slice []uint64, n int) []uint64
- func (a *Allocator) Ints(l, c int) []int64
- func (a *Allocator) Strings(l, c int) []string
- func (a *Allocator) Times(l, c int) []Time
- func (a *Allocator) UInts(l, c int) []uint64
- type BoolValueFunc
- type Bounds
- type ColListTable
- func (t *ColListTable) Bools(j int) *array.Boolean
- func (t *ColListTable) Cols() []flux.ColMeta
- func (t *ColListTable) Copy() *ColListTable
- func (t *ColListTable) Do(f func(flux.ColReader) error) error
- func (t *ColListTable) Empty() bool
- func (t *ColListTable) Floats(j int) *array.Float64
- func (t *ColListTable) GetRow(row int) values.Object
- func (t *ColListTable) Ints(j int) *array.Int64
- func (t *ColListTable) Key() flux.GroupKey
- func (t *ColListTable) Len() int
- func (t *ColListTable) NRows() int
- func (t *ColListTable) RefCount(n int)
- func (t *ColListTable) Strings(j int) *array.Binary
- func (t *ColListTable) Times(j int) *array.Int64
- func (t *ColListTable) UInts(j int) *array.Uint64
- type ColListTableBuilder
- func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
- func (b *ColListTableBuilder) AppendBool(j int, value bool) error
- func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error
- func (b *ColListTableBuilder) AppendFloat(j int, value float64) error
- func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float64) error
- func (b *ColListTableBuilder) AppendInt(j int, value int64) error
- func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int64) error
- func (b *ColListTableBuilder) AppendNil(j int) error
- func (b *ColListTableBuilder) AppendString(j int, value string) error
- func (b *ColListTableBuilder) AppendStrings(j int, vs *array.Binary) error
- func (b *ColListTableBuilder) AppendTime(j int, value Time) error
- func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int64) error
- func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error
- func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint64) error
- func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error
- func (b *ColListTableBuilder) Bools(j int) []bool
- func (b *ColListTableBuilder) ClearData()
- func (b *ColListTableBuilder) Cols() []flux.ColMeta
- func (b *ColListTableBuilder) Floats(j int) []float64
- func (b *ColListTableBuilder) GetRow(row int) values.Object
- func (b *ColListTableBuilder) GrowBools(j, n int) error
- func (b *ColListTableBuilder) GrowFloats(j, n int) error
- func (b *ColListTableBuilder) GrowInts(j, n int) error
- func (b *ColListTableBuilder) GrowStrings(j, n int) error
- func (b *ColListTableBuilder) GrowTimes(j, n int) error
- func (b *ColListTableBuilder) GrowUInts(j, n int) error
- func (b *ColListTableBuilder) Ints(j int) []int64
- func (b *ColListTableBuilder) Key() flux.GroupKey
- func (b *ColListTableBuilder) Len() int
- func (b *ColListTableBuilder) LevelColumns() error
- func (b *ColListTableBuilder) NCols() int
- func (b *ColListTableBuilder) NRows() int
- func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error
- func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error
- func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error
- func (b *ColListTableBuilder) SetNil(i, j int) error
- func (b *ColListTableBuilder) SetString(i int, j int, value string) error
- func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error
- func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error
- func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error
- func (b *ColListTableBuilder) SliceColumns(start, stop int) error
- func (b *ColListTableBuilder) Sort(cols []string, desc bool)
- func (b *ColListTableBuilder) Strings(j int) []string
- func (b *ColListTableBuilder) Table() (flux.Table, error)
- func (b *ColListTableBuilder) Times(j int) []values.Time
- func (b *ColListTableBuilder) UInts(j int) []uint64
- type CreateNewPlannerSource
- type CreateNewPlannerTransformation
- type CreateSource
- type CreateTransformation
- type DataCache
- type Dataset
- func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, agg Aggregate, config AggregateConfig, ...) (*aggregateTransformation, Dataset)
- func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, ...) (*indexSelectorTransformation, Dataset)
- func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, ...) (*rowSelectorTransformation, Dataset)
- type DatasetID
- type Dependencies
- type Dispatcher
- type DoBoolAgg
- type DoBoolIndexSelector
- type DoBoolRowSelector
- type DoFloatAgg
- type DoFloatIndexSelector
- type DoFloatRowSelector
- type DoIntAgg
- type DoIntIndexSelector
- type DoIntRowSelector
- type DoStringAgg
- type DoStringIndexSelector
- type DoStringRowSelector
- type DoUIntAgg
- type DoUIntIndexSelector
- type DoUIntRowSelector
- type Duration
- type Executor
- type FinishMsg
- type FloatValueFunc
- type FormatOptions
- type Formatter
- type GroupKeyBuilder
- func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder
- func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)
- func (gkb *GroupKeyBuilder) Grow(n int)
- func (gkb *GroupKeyBuilder) Len() int
- func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder
- type GroupLookup
- type IndexSelector
- type IntValueFunc
- type Message
- type MessageQueue
- type MessageType
- type MetadataNode
- type Node
- type OneTimeTable
- type ProcessMsg
- type Record
- func (r *Record) Array() values.Array
- func (r *Record) Bool() bool
- func (r *Record) Duration() values.Duration
- func (r *Record) Equal(rhs values.Value) bool
- func (r *Record) Float() float64
- func (r *Record) Function() values.Function
- func (r *Record) Get(name string) (values.Value, bool)
- func (r *Record) Int() int64
- func (r *Record) IsNull() bool
- func (r *Record) Len() int
- func (r *Record) Object() values.Object
- func (r *Record) PolyType() semantic.PolyType
- func (r *Record) Range(f func(name string, v values.Value))
- func (r *Record) Regexp() *regexp.Regexp
- func (r *Record) Set(name string, v values.Value)
- func (r *Record) Str() string
- func (r *Record) Time() values.Time
- func (r *Record) Type() semantic.Type
- func (r *Record) UInt() uint64
- type RetractTableMsg
- type Row
- type RowMapFn
- type RowPredicateFn
- type RowReduceFn
- type RowSelector
- type Rower
- type ScheduleFunc
- type SelectorConfig
- type Source
- type SourceDecoder
- type StreamContext
- type StringValueFunc
- type TableBuilder
- type TableBuilderCache
- type TableContext
- type Time
- type Transformation
- type Transport
- type Trigger
- type TriggerContext
- type UIntValueFunc
- type UpdateProcessingTimeMsg
- type UpdateWatermarkMsg
- type ValueFunc
- type Window
Constants ¶
const ( MaxTime = math.MaxInt64 MinTime = math.MinInt64 )
const ( DefaultStartColLabel = "_start" DefaultStopColLabel = "_stop" DefaultTimeColLabel = "_time" DefaultValueColLabel = "_value" )
Variables ¶
var AllTime = Bounds{ Start: MinTime, Stop: MaxTime, }
var DefaultAggregateConfig = AggregateConfig{ Columns: []string{DefaultValueColLabel}, }
var DefaultSelectorConfig = SelectorConfig{ Column: DefaultValueColLabel, }
Functions ¶
func AddNewTableCols ¶
AddNewTableCols adds the columns of t onto builder that did not already exist. Returns the mapping of builder cols to table cols.
func AddTableCols ¶
func AddTableCols(t flux.Table, builder TableBuilder) error
AddTableCols adds the columns of t onto builder.
func AddTableKeyCols ¶
func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
func AggregateSignature ¶
func AggregateSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
AggregateSignature returns a function signature common to all aggregate functions, with any additional arguments.
func AppendCol ¶
func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
AppendCol appends a column from cr onto builder. The indexes bj and cj are the builder and col reader indexes respectively.
func AppendCols ¶
func AppendCols(cr flux.ColReader, builder TableBuilder) error
AppendCols appends all columns from cr onto builder. This function assumes that builder and cr have the same column schema.
func AppendKeyValues ¶
func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
func AppendMappedCols ¶
func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedCols appends all columns from cr onto builder. The colMap is a map of builder column index to cr column index.
func AppendMappedRecordExplicit ¶
AppendMappedRecordExplicit appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, no value is appended.
func AppendMappedRecordWithNulls ¶ added in v0.14.0
func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedRecordWithNulls appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, the column is created with null values.
func AppendMappedTable ¶
func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
AppendMappedTable appends data from table t onto builder. The colMap is a map of builder column index to table column index.
func AppendRecord ¶
func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
AppendRecord appends the record from cr onto builder assuming matching columns.
func AppendTable ¶
func AppendTable(t flux.Table, builder TableBuilder) error
AppendTable appends data from table t onto builder. This function assumes builder and t have the same column schema.
func BuilderColsMatchReader ¶ added in v0.8.0
func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool
BuilderColsMatchReader returns true if builder and cr have identical column sets (order dependent)
func CacheOneTimeTable ¶
CacheOneTimeTable returns a table that can be read multiple times. If the table is not a OneTimeTable it is returned directly. Otherwise its contents are read into a new table.
func ColMap ¶
func ColMap(colMap []int, builder TableBuilder, cr flux.ColReader) []int
ColMap writes a mapping of builder index to column reader index into colMap. When colMap does not have enough capacity a new colMap is allocated. The colMap is always returned.
func ContainsStr ¶
func FormatResult ¶ added in v0.20.0
FormatResult prints the result to w.
func GroupKeyForRowOn ¶
func NewAggregateTransformation ¶
func NewAggregateTransformation(d Dataset, c TableBuilderCache, agg Aggregate, config AggregateConfig) *aggregateTransformation
func NewDataset ¶
func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
func NewIndexSelectorTransformation ¶
func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
func NewRowSelectorTransformation ¶
func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
func NewTableBuilderCache ¶
func PanicUnknownType ¶
func RegisterSource ¶
func RegisterSource(k plan.ProcedureKind, c CreateNewPlannerSource)
func RegisterTransformation ¶
func RegisterTransformation(k plan.ProcedureKind, c CreateNewPlannerTransformation)
RegisterTransformation adds a new registration mapping of procedure kind to transformation.
func ReplaceTransformation ¶ added in v0.14.0
func ReplaceTransformation(k plan.ProcedureKind, c CreateNewPlannerTransformation)
ReplaceTransformation changes an existing transformation registration.
func SelectorSignature ¶
func SelectorSignature(args map[string]semantic.PolyType, required []string) semantic.FunctionPolySignature
SelectorSignature returns a function signature common to all selector functions, with any additional arguments.
func TablesEqual ¶ added in v0.8.0
TablesEqual takes two flux tables and compares them. Returns false if the tables have different keys, different columns, or if the data in any column does not match. Returns true otherwise. This function will consume the ColumnReader, so if you are calling this from a Process method, you may need to copy the table if you need to iterate over the data for other calculations.
Types ¶
type AccumulationMode ¶
type AccumulationMode int
const ( DiscardingMode AccumulationMode = iota AccumulatingMode AccumulatingRetractingMode )
type Administration ¶
type Administration interface { Context() context.Context ResolveTime(qt flux.Time) Time StreamContext() StreamContext Allocator() *memory.Allocator Parents() []DatasetID Dependencies() Dependencies }
type Aggregate ¶
type Aggregate interface { NewBoolAgg() DoBoolAgg NewIntAgg() DoIntAgg NewUIntAgg() DoUIntAgg NewFloatAgg() DoFloatAgg NewStringAgg() DoStringAgg }
type AggregateConfig ¶
type AggregateConfig struct { plan.DefaultCost Columns []string `json:"columns"` }
func (AggregateConfig) Copy ¶
func (c AggregateConfig) Copy() AggregateConfig
type Allocator ¶
Allocator tracks the amount of memory being consumed by a query. The allocator provides methods similar to make and append, to allocate large slices of data. The allocator also provides a Free method to account for when memory will be freed.
func (*Allocator) AppendBools ¶
AppendBools appends bools to a slice
func (*Allocator) AppendFloats ¶
AppendFloats appends float64s to a slice
func (*Allocator) AppendInts ¶
AppendInts appends int64s to a slice
func (*Allocator) AppendStrings ¶
AppendStrings appends strings to a slice. Only the string headers are accounted for.
func (*Allocator) AppendTimes ¶
AppendTimes appends Times to a slice
func (*Allocator) AppendUInts ¶
AppendUInts appends uint64s to a slice
func (*Allocator) Strings ¶
Strings makes a slice of string values. Only the string headers are accounted for.
type BoolValueFunc ¶
type BoolValueFunc interface {
ValueBool() bool
type Bounds ¶
type ColListTable ¶
type ColListTable struct {
// contains filtered or unexported fields
ColListTable implements Table using a list of columns. All data for the table is stored in RAM. As a result, At* methods are provided directly on the table for easy access.
func (*ColListTable) Cols ¶
func (t *ColListTable) Cols() []flux.ColMeta
func (*ColListTable) Copy ¶
func (t *ColListTable) Copy() *ColListTable
func (*ColListTable) Empty ¶
func (t *ColListTable) Empty() bool
func (*ColListTable) GetRow ¶
func (t *ColListTable) GetRow(row int) values.Object
GetRow takes a row index and returns the record located at that index in the cache
func (*ColListTable) Key ¶
func (t *ColListTable) Key() flux.GroupKey
func (*ColListTable) Len ¶
func (t *ColListTable) Len() int
func (*ColListTable) NRows ¶
func (t *ColListTable) NRows() int
func (*ColListTable) RefCount ¶
func (t *ColListTable) RefCount(n int)
type ColListTableBuilder ¶
type ColListTableBuilder struct {
// contains filtered or unexported fields
func NewColListTableBuilder ¶
func NewColListTableBuilder(key flux.GroupKey, a *memory.Allocator) *ColListTableBuilder
func (*ColListTableBuilder) AddCol ¶
func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
func (*ColListTableBuilder) AppendBool ¶
func (b *ColListTableBuilder) AppendBool(j int, value bool) error
func (*ColListTableBuilder) AppendBools ¶
func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error
func (*ColListTableBuilder) AppendFloat ¶
func (b *ColListTableBuilder) AppendFloat(j int, value float64) error
func (*ColListTableBuilder) AppendFloats ¶
func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float64) error
func (*ColListTableBuilder) AppendInt ¶
func (b *ColListTableBuilder) AppendInt(j int, value int64) error
func (*ColListTableBuilder) AppendInts ¶
func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int64) error
func (*ColListTableBuilder) AppendNil ¶ added in v0.14.0
func (b *ColListTableBuilder) AppendNil(j int) error
func (*ColListTableBuilder) AppendString ¶
func (b *ColListTableBuilder) AppendString(j int, value string) error
func (*ColListTableBuilder) AppendStrings ¶
func (b *ColListTableBuilder) AppendStrings(j int, vs *array.Binary) error
func (*ColListTableBuilder) AppendTime ¶
func (b *ColListTableBuilder) AppendTime(j int, value Time) error
func (*ColListTableBuilder) AppendTimes ¶
func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int64) error
func (*ColListTableBuilder) AppendUInt ¶
func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error
func (*ColListTableBuilder) AppendUInts ¶
func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint64) error
func (*ColListTableBuilder) AppendValue ¶
func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error
func (*ColListTableBuilder) Bools ¶ added in v0.10.0
func (b *ColListTableBuilder) Bools(j int) []bool
func (*ColListTableBuilder) ClearData ¶
func (b *ColListTableBuilder) ClearData()
func (*ColListTableBuilder) Cols ¶
func (b *ColListTableBuilder) Cols() []flux.ColMeta
func (*ColListTableBuilder) Floats ¶ added in v0.10.0
func (b *ColListTableBuilder) Floats(j int) []float64
func (*ColListTableBuilder) GetRow ¶ added in v0.10.0
func (b *ColListTableBuilder) GetRow(row int) values.Object
GetRow takes a row index and returns the record located at that index in the cache
func (*ColListTableBuilder) GrowBools ¶
func (b *ColListTableBuilder) GrowBools(j, n int) error
func (*ColListTableBuilder) GrowFloats ¶
func (b *ColListTableBuilder) GrowFloats(j, n int) error
func (*ColListTableBuilder) GrowInts ¶
func (b *ColListTableBuilder) GrowInts(j, n int) error
func (*ColListTableBuilder) GrowStrings ¶
func (b *ColListTableBuilder) GrowStrings(j, n int) error
func (*ColListTableBuilder) GrowTimes ¶
func (b *ColListTableBuilder) GrowTimes(j, n int) error
func (*ColListTableBuilder) GrowUInts ¶
func (b *ColListTableBuilder) GrowUInts(j, n int) error
func (*ColListTableBuilder) Ints ¶ added in v0.10.0
func (b *ColListTableBuilder) Ints(j int) []int64
func (*ColListTableBuilder) Key ¶
func (b *ColListTableBuilder) Key() flux.GroupKey
func (*ColListTableBuilder) Len ¶ added in v0.10.0
func (b *ColListTableBuilder) Len() int
func (*ColListTableBuilder) LevelColumns ¶
func (b *ColListTableBuilder) LevelColumns() error
func (*ColListTableBuilder) NCols ¶
func (b *ColListTableBuilder) NCols() int
func (*ColListTableBuilder) NRows ¶
func (b *ColListTableBuilder) NRows() int
func (*ColListTableBuilder) SetBool ¶
func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error
func (*ColListTableBuilder) SetFloat ¶
func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error
func (*ColListTableBuilder) SetInt ¶
func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error
func (*ColListTableBuilder) SetNil ¶ added in v0.14.0
func (b *ColListTableBuilder) SetNil(i, j int) error
func (*ColListTableBuilder) SetString ¶
func (b *ColListTableBuilder) SetString(i int, j int, value string) error
func (*ColListTableBuilder) SetTime ¶
func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error
func (*ColListTableBuilder) SetUInt ¶
func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error
func (*ColListTableBuilder) SetValue ¶
func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error
func (*ColListTableBuilder) SliceColumns ¶ added in v0.8.0
func (b *ColListTableBuilder) SliceColumns(start, stop int) error
SliceColumns iterates over each column of b and re-slices them to the range [start:stop].
func (*ColListTableBuilder) Sort ¶
func (b *ColListTableBuilder) Sort(cols []string, desc bool)
func (*ColListTableBuilder) Strings ¶ added in v0.10.0
func (b *ColListTableBuilder) Strings(j int) []string
func (*ColListTableBuilder) Times ¶ added in v0.10.0
func (b *ColListTableBuilder) Times(j int) []values.Time
func (*ColListTableBuilder) UInts ¶ added in v0.10.0
func (b *ColListTableBuilder) UInts(j int) []uint64
type CreateNewPlannerSource ¶
type CreateNewPlannerSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)
type CreateNewPlannerTransformation ¶
type CreateNewPlannerTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)
type CreateSource ¶
type CreateSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)
type CreateTransformation ¶
type CreateTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)
type DataCache ¶
type DataCache interface { Table(flux.GroupKey) (flux.Table, error) ForEach(func(flux.GroupKey)) ForEachWithContext(func(flux.GroupKey, Trigger, TableContext)) DiscardTable(flux.GroupKey) ExpireTable(flux.GroupKey) SetTriggerSpec(t plan.TriggerSpec) }
DataCache holds all working data for a transformation.
type Dataset ¶
type Dataset interface { Node RetractTable(key flux.GroupKey) error UpdateProcessingTime(t Time) error UpdateWatermark(mark Time) error Finish(error) SetTriggerSpec(t plan.TriggerSpec) }
Dataset represents the set of data produced by a transformation.
func NewAggregateTransformationAndDataset ¶
func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, agg Aggregate, config AggregateConfig, a *memory.Allocator) (*aggregateTransformation, Dataset)
func NewIndexSelectorTransformationAndDataset ¶
func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, config SelectorConfig, a *memory.Allocator) (*indexSelectorTransformation, Dataset)
func NewRowSelectorTransformationAndDataset ¶
func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, config SelectorConfig, a *memory.Allocator) (*rowSelectorTransformation, Dataset)
type Dependencies ¶
type Dependencies map[string]interface{}
Dependencies represents the provided dependencies to the execution environment. The dependencies are opaque.
type Dispatcher ¶
type Dispatcher interface { // Schedule fn to be executed Schedule(fn ScheduleFunc) }
Dispatcher schedules work for a query. Each transformation submits work to be done to the dispatcher. Then the dispatcher schedules the work based on the available resources.
type DoBoolIndexSelector ¶
type DoBoolRowSelector ¶
type DoFloatAgg ¶
type DoFloatIndexSelector ¶
type DoFloatRowSelector ¶
type DoIntIndexSelector ¶
type DoIntRowSelector ¶
type DoStringAgg ¶
type DoStringIndexSelector ¶
type DoStringRowSelector ¶
type DoUIntIndexSelector ¶
type DoUIntRowSelector ¶
type Executor ¶
type Executor interface { // Execute will begin execution of the plan.Spec using the memory allocator. // This returns a mapping of names to the query results. // This will also return a channel for the Metadata from the query. The channel // may return zero or more values. The returned channel must not require itself to // be read so the executor must allocate enough space in the channel so if the channel // is unread that it will not block. Execute(ctx context.Context, p *plan.Spec, a *memory.Allocator) (map[string]flux.Result, <-chan flux.Metadata, error) }
func NewExecutor ¶
func NewExecutor(deps Dependencies, logger *zap.Logger) Executor
type FloatValueFunc ¶
type FloatValueFunc interface {
ValueFloat() float64
type FormatOptions ¶
type FormatOptions struct { // RepeatHeaderCount is the number of rows to print before printing the header again. // If zero then the headers are not repeated. RepeatHeaderCount int NullRepresentation string }
func DefaultFormatOptions ¶
func DefaultFormatOptions() *FormatOptions
type Formatter ¶
type Formatter struct {
// contains filtered or unexported fields
Formatter writes a table to a Writer.
func NewFormatter ¶
func NewFormatter(tbl flux.Table, opts *FormatOptions) *Formatter
NewFormatter creates a Formatter for a given table. If opts is nil, the DefaultFormatOptions are used.
type GroupKeyBuilder ¶
type GroupKeyBuilder struct {
// contains filtered or unexported fields
GroupKeyBuilder is used to construct a GroupKey by keeping a mutable copy in memory.
func NewGroupKeyBuilder ¶
func NewGroupKeyBuilder(key flux.GroupKey) *GroupKeyBuilder
NewGroupKeyBuilder creates a new GroupKeyBuilder from an existing GroupKey. If the GroupKey passed is nil, a blank GroupKeyBuilder is constructed.
func (*GroupKeyBuilder) AddKeyValue ¶
func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder
AddKeyValue will add a new group key to the existing builder.
func (*GroupKeyBuilder) Build ¶
func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)
Build will construct the GroupKey. If there is any problem with the GroupKey (such as one of the columns is not a valid type), the error will be returned here.
func (*GroupKeyBuilder) Grow ¶
func (gkb *GroupKeyBuilder) Grow(n int)
Grow will grow the internal capacity of the group key to the given number.
func (*GroupKeyBuilder) Len ¶
func (gkb *GroupKeyBuilder) Len() int
Len returns the current length of the group key.
func (*GroupKeyBuilder) SetKeyValue ¶ added in v0.14.0
func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder
SetKeyValue will set an existing key/value to the given pair, or if key is not found, add a new group key to the existing builder.
type GroupLookup ¶
type GroupLookup struct {
// contains filtered or unexported fields
GroupLookup is a container that maps group keys to a value.
The GroupLookup container is optimized for appending values in order and iterating over them in the same order. The GroupLookup will always have a deterministic order for the Range call, but that order may be influenced by the order that inserts happen.
At the current moment, the GroupLookup maintains the groups in sorted order although future implementations may change that.
To optimize inserts, the lookup is kept in an array of arrays. The first layer keeps a group of sorted key groups and each of these groups maintains their own sorted list of keys. Each time a new key is added, it is appended to the end of one of the key lists. If a key needs to be added in the middle of a list, the list is split into two so that the key can be appended. The index of the last list to be used is maintained so that future inserts can skip past the first search for the key list and an insert can be done in constant time. Similarly, a lookup for a key that was just inserted will also be in constant time with the worst case time being O(n log n).
func (*GroupLookup) Delete ¶
func (l *GroupLookup) Delete(key flux.GroupKey) (v interface{}, found bool)
Delete will remove the key from this GroupLookup. It will return the same thing as a call to Lookup.
func (*GroupLookup) Lookup ¶
func (l *GroupLookup) Lookup(key flux.GroupKey) (interface{}, bool)
Lookup will retrieve the value associated with the given key if it exists.
func (*GroupLookup) Range ¶
func (l *GroupLookup) Range(f func(key flux.GroupKey, value interface{}))
Range will iterate over all group keys in a stable ordering. Range must not be called within another call to Range. It is safe to call Set/Delete while ranging.
func (*GroupLookup) Set ¶
func (l *GroupLookup) Set(key flux.GroupKey, value interface{})
Set will set the value for the given key. It will overwrite an existing value.
type IndexSelector ¶
type IndexSelector interface { NewBoolSelector() DoBoolIndexSelector NewIntSelector() DoIntIndexSelector NewUIntSelector() DoUIntIndexSelector NewFloatSelector() DoFloatIndexSelector NewStringSelector() DoStringIndexSelector }
type IntValueFunc ¶
type IntValueFunc interface {
ValueInt() int64
type Message ¶
type Message interface { Type() MessageType SrcDatasetID() DatasetID }
type MessageQueue ¶
MessageQueue provides a concurrency safe queue for messages. The queue must have a single consumer calling Pop.
type MessageType ¶
type MessageType int
const ( RetractTableType MessageType = iota ProcessType UpdateWatermarkType UpdateProcessingTimeType FinishType )
type MetadataNode ¶ added in v0.21.0
MetadataNode is a node that has additional metadata that should be added to the result after it is processed.
type Node ¶
type Node interface {
AddTransformation(t Transformation)
type OneTimeTable ¶
OneTimeTable is a Table that permits reading data only once. Specifically the ValueIterator may only be consumed once from any of the columns.
type ProcessMsg ¶
type RetractTableMsg ¶
type RowMapFn ¶
type RowMapFn struct {
// contains filtered or unexported fields
func NewRowMapFn ¶
func NewRowMapFn(fn *semantic.FunctionExpression) (*RowMapFn, error)
type RowPredicateFn ¶
type RowPredicateFn struct {
// contains filtered or unexported fields
func NewRowPredicateFn ¶
func NewRowPredicateFn(fn *semantic.FunctionExpression) (*RowPredicateFn, error)
type RowReduceFn ¶ added in v0.23.0
type RowReduceFn struct {
// contains filtered or unexported fields
func NewRowReduceFn ¶ added in v0.23.0
func NewRowReduceFn(fn *semantic.FunctionExpression) (*RowReduceFn, error)
func (*RowReduceFn) Type ¶ added in v0.23.0
func (f *RowReduceFn) Type() semantic.Type
type RowSelector ¶
type RowSelector interface { NewBoolSelector() DoBoolRowSelector NewIntSelector() DoIntRowSelector NewUIntSelector() DoUIntRowSelector NewFloatSelector() DoFloatRowSelector NewStringSelector() DoStringRowSelector }
type ScheduleFunc ¶
type ScheduleFunc func(throughput int)
ScheduleFunc is a function that represents work to do. The throughput is the maximum number of messages to process for this scheduling.
type SelectorConfig ¶
type SelectorConfig struct { plan.DefaultCost Column string `json:"column"` }
type Source ¶
func CreateSourceFromDecoder ¶ added in v0.17.0
func CreateSourceFromDecoder(decoder SourceDecoder, dsid DatasetID, a Administration) (Source, error)
CreateSourceFromDecoder takes an implementation of a SourceDecoder, as well as a dataset ID and Administration type and creates an execute.Source.
type SourceDecoder ¶ added in v0.17.0
type SourceDecoder interface { Connect() error Fetch() (bool, error) Decode() (flux.Table, error) Close() error }
SourceDecoder is an interface that generalizes the process of retrieving data from an unspecified data source.
Connect implements the logic needed to connect directly to the data source.
Fetch implements a single fetch of data from the source (may be called multiple times). Should return false when there is no more data to retrieve.
Decode implements the process of marshaling the data returned by the source into a flux.Table type.
In executing the retrieval process, Connect is called once at the outset, and then Fetch() and Decode() are called iteratively until the data source is fully consumed.
type StreamContext ¶
type StreamContext interface {
Bounds() *Bounds
StreamContext represents necessary context for a single stream of query data.
type StringValueFunc ¶
type StringValueFunc interface {
ValueString() string
type TableBuilder ¶
type TableBuilder interface { Key() flux.GroupKey NRows() int NCols() int Cols() []flux.ColMeta // AddCol increases the size of the table by one column. // The index of the column is returned. AddCol(flux.ColMeta) (int, error) // Set sets the value at the specified coordinates // The rows and columns must exist before calling set, otherwise Set panics. SetValue(i, j int, value values.Value) error SetNil(i, j int) error // Append will add a single value to the end of a column. Will set the number of // rows in the table to the size of the new column. It's the caller's job to make sure // that the expected number of rows in each column is equal. AppendBool(j int, value bool) error AppendInt(j int, value int64) error AppendUInt(j int, value uint64) error AppendFloat(j int, value float64) error AppendString(j int, value string) error AppendTime(j int, value Time) error AppendValue(j int, value values.Value) error AppendNil(j int) error // AppendBools and similar functions will append multiple values to column j. As above, // it will set the number of rows in the table to the size of the new column. It's the // caller's job to make sure that the expected number of rows in each column is equal. AppendBools(j int, vs *array.Boolean) error AppendInts(j int, vs *array.Int64) error AppendUInts(j int, vs *array.Uint64) error AppendFloats(j int, vs *array.Float64) error AppendStrings(j int, vs *array.Binary) error AppendTimes(j int, vs *array.Int64) error // GrowBools and similar functions will extend column j by n zero-values for the respective type. // If the column has enough capacity, no reallocation is necessary. If the capacity is insufficient, // a new slice is allocated with 1.5*newCapacity. As with the Append functions, it is the // caller's job to make sure that the expected number of rows in each column is equal. 
GrowBools(j, n int) error GrowInts(j, n int) error GrowUInts(j, n int) error GrowFloats(j, n int) error GrowStrings(j, n int) error GrowTimes(j, n int) error // LevelColumns will check for columns that are too short and Grow them // so that each column is of uniform size. LevelColumns() error // Sort the rows of the table by the values of the columns in the order listed. Sort(cols []string, desc bool) // Clear removes all rows, while preserving the column meta data. ClearData() // Table returns the table that has been built. // Further modifications of the builder will not affect the returned table. Table() (flux.Table, error) }
TableBuilder builds tables that can be used multiple times
type TableBuilderCache ¶
type TableBuilderCache interface { // TableBuilder returns an existing or new TableBuilder for the given meta data. // The boolean return value indicates if TableBuilder is new. TableBuilder(key flux.GroupKey) (TableBuilder, bool) ForEachBuilder(f func(flux.GroupKey, TableBuilder)) }
type TableContext ¶
type Transformation ¶
type Transport ¶
type Transport interface { Transformation // Finished reports when the Transport has completed and there is no more work to do. Finished() <-chan struct{} }
type Trigger ¶
type Trigger interface { Triggered(TriggerContext) bool Finished() bool Reset() }
func NewTriggerFromSpec ¶
func NewTriggerFromSpec(spec plan.TriggerSpec) Trigger
type TriggerContext ¶
type TriggerContext struct { Table TableContext Watermark Time CurrentProcessingTime Time }
type UIntValueFunc ¶
type UIntValueFunc interface {
ValueUInt() uint64
type UpdateProcessingTimeMsg ¶
type UpdateWatermarkMsg ¶
type Window ¶
func NewWindow ¶ added in v0.19.0
NewWindow creates a window with the given parameters, and normalizes the offset to a small positive duration.
func (Window) GetEarliestBounds ¶ added in v0.19.0
GetEarliestBounds returns the bounds of the earliest window that contains the given time t. For underlapping windows that do not contain time t, the window directly after time t will be returned.
func (Window) GetOverlappingBounds ¶ added in v0.19.0
GetOverlappingBounds returns a slice of bounds for each window that overlaps the input bounds b.
Source Files
Path | Synopsis |
Package executetest contains utilities for testing the query execution phase.
Package executetest contains utilities for testing the query execution phase. |