Documentation ¶
Overview ¶
Package execute contains the implementation of the execution phase in the query engine.
Index ¶
- Constants
- Variables
- func AddNewTableCols(t flux.Table, builder TableBuilder, colMap []int) ([]int, error)
- func AddTableCols(t flux.Table, builder TableBuilder) error
- func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
- func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
- func AppendCols(cr flux.ColReader, builder TableBuilder) error
- func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
- func AppendKeyValuesN(key flux.GroupKey, builder TableBuilder, n int) error
- func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordExplicit(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
- func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
- func AppendTable(t flux.Table, builder TableBuilder) error
- func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool
- func CheckColType(col flux.ColMeta, typ flux.ColType)
- func ColIdx(label string, cols []flux.ColMeta) int
- func ColMap(colMap []int, builder TableBuilder, cols []flux.ColMeta) []int
- func ContainsStr(strs []string, str string) bool
- func ConvertFromKind(k semantic.Nature) flux.ColType
- func ConvertToKind(t flux.ColType) semantic.Nature
- func CopyTable(t flux.Table) (flux.BufferedTable, error)
- func FormatResult(w io.Writer, res flux.Result) error
- func GroupKeyForRowOn(i int, cr flux.ColReader, on map[string]bool) flux.GroupKey
- func HasCol(label string, cols []flux.ColMeta) bool
- func NewAggregateTransformation(d Dataset, c TableBuilderCache, agg Aggregate, config AggregateConfig) *aggregateTransformation
- func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
- func NewEmptyTable(key flux.GroupKey, cols []flux.ColMeta) flux.Table
- func NewGroupKey(cols []flux.ColMeta, values []values.Value) flux.GroupKey
- func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
- func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
- func NewTableBuilderCache(a *memory.Allocator) *tableBuilderCache
- func PanicUnknownType(typ flux.ColType)
- func RegisterSource(k plan.ProcedureKind, c CreateSource)
- func RegisterTransformation(k plan.ProcedureKind, c CreateTransformation)
- func ReplaceTransformation(k plan.ProcedureKind, c CreateTransformation)
- func TablesEqual(left, right flux.Table, alloc *memory.Allocator) (bool, error)
- func ValueForRow(cr flux.ColReader, i, j int) values.Value
- type AccumulationMode
- type Administration
- type Aggregate
- type AggregateConfig
- type Allocator
- func (a *Allocator) AppendBools(slice []bool, vs ...bool) []bool
- func (a *Allocator) AppendFloats(slice []float64, vs ...float64) []float64
- func (a *Allocator) AppendInts(slice []int64, vs ...int64) []int64
- func (a *Allocator) AppendStrings(slice []string, vs ...string) []string
- func (a *Allocator) AppendTimes(slice []Time, vs ...Time) []Time
- func (a *Allocator) AppendUInts(slice []uint64, vs ...uint64) []uint64
- func (a *Allocator) Bools(l, c int) []bool
- func (a *Allocator) Floats(l, c int) []float64
- func (a *Allocator) Free(n, size int)
- func (a *Allocator) GrowBools(slice []bool, n int) []bool
- func (a *Allocator) GrowFloats(slice []float64, n int) []float64
- func (a *Allocator) GrowInts(slice []int64, n int) []int64
- func (a *Allocator) GrowStrings(slice []string, n int) []string
- func (a *Allocator) GrowTimes(slice []Time, n int) []Time
- func (a *Allocator) GrowUInts(slice []uint64, n int) []uint64
- func (a *Allocator) Ints(l, c int) []int64
- func (a *Allocator) Strings(l, c int) []string
- func (a *Allocator) Times(l, c int) []Time
- func (a *Allocator) UInts(l, c int) []uint64
- type BoolValueFunc
- type Bounds
- type ColListTable
- func (t *ColListTable) Bools(j int) *array.Boolean
- func (t *ColListTable) Cols() []flux.ColMeta
- func (t *ColListTable) Do(f func(flux.ColReader) error) error
- func (t *ColListTable) Done()
- func (t *ColListTable) Empty() bool
- func (t *ColListTable) Floats(j int) *array.Float64
- func (t *ColListTable) GetRow(row int) values.Object
- func (t *ColListTable) Ints(j int) *array.Int64
- func (t *ColListTable) Key() flux.GroupKey
- func (t *ColListTable) Len() int
- func (t *ColListTable) NRows() int
- func (t *ColListTable) RefCount(n int)
- func (t *ColListTable) Release()
- func (t *ColListTable) Retain()
- func (t *ColListTable) Strings(j int) *array.Binary
- func (t *ColListTable) Times(j int) *array.Int64
- func (t *ColListTable) UInts(j int) *array.Uint64
- type ColListTableBuilder
- func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
- func (b *ColListTableBuilder) AppendBool(j int, value bool) error
- func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error
- func (b *ColListTableBuilder) AppendFloat(j int, value float64) error
- func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float64) error
- func (b *ColListTableBuilder) AppendInt(j int, value int64) error
- func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int64) error
- func (b *ColListTableBuilder) AppendNil(j int) error
- func (b *ColListTableBuilder) AppendString(j int, value string) error
- func (b *ColListTableBuilder) AppendStrings(j int, vs *array.Binary) error
- func (b *ColListTableBuilder) AppendTime(j int, value Time) error
- func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int64) error
- func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error
- func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint64) error
- func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error
- func (b *ColListTableBuilder) Bools(j int) []bool
- func (b *ColListTableBuilder) ClearData()
- func (b *ColListTableBuilder) Cols() []flux.ColMeta
- func (b *ColListTableBuilder) Floats(j int) []float64
- func (b *ColListTableBuilder) GetRow(row int) values.Object
- func (b *ColListTableBuilder) GrowBools(j, n int) error
- func (b *ColListTableBuilder) GrowFloats(j, n int) error
- func (b *ColListTableBuilder) GrowInts(j, n int) error
- func (b *ColListTableBuilder) GrowStrings(j, n int) error
- func (b *ColListTableBuilder) GrowTimes(j, n int) error
- func (b *ColListTableBuilder) GrowUInts(j, n int) error
- func (b *ColListTableBuilder) Ints(j int) []int64
- func (b *ColListTableBuilder) Key() flux.GroupKey
- func (b *ColListTableBuilder) Len() int
- func (b *ColListTableBuilder) LevelColumns() error
- func (b *ColListTableBuilder) NCols() int
- func (b *ColListTableBuilder) NRows() int
- func (b *ColListTableBuilder) Release()
- func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error
- func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error
- func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error
- func (b *ColListTableBuilder) SetNil(i, j int) error
- func (b *ColListTableBuilder) SetString(i int, j int, value string) error
- func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error
- func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error
- func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error
- func (b *ColListTableBuilder) SliceColumns(start, stop int) error
- func (b *ColListTableBuilder) Sort(cols []string, desc bool)
- func (b *ColListTableBuilder) Strings(j int) []string
- func (b *ColListTableBuilder) Table() (flux.Table, error)
- func (b *ColListTableBuilder) Times(j int) []values.Time
- func (b *ColListTableBuilder) UInts(j int) []uint64
- type CreateSource
- type CreateTransformation
- type DataCache
- type Dataset
- func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, agg Aggregate, config AggregateConfig, ...) (*aggregateTransformation, Dataset)
- func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, ...) (*indexSelectorTransformation, Dataset)
- func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, ...) (*rowSelectorTransformation, Dataset)
- type DatasetID
- type Dispatcher
- type DoBoolAgg
- type DoBoolIndexSelector
- type DoBoolRowSelector
- type DoFloatAgg
- type DoFloatIndexSelector
- type DoFloatRowSelector
- type DoIntAgg
- type DoIntIndexSelector
- type DoIntRowSelector
- type DoStringAgg
- type DoStringIndexSelector
- type DoStringRowSelector
- type DoTimeIndexSelector
- type DoTimeRowSelector
- type DoUIntAgg
- type DoUIntIndexSelector
- type DoUIntRowSelector
- type Duration
- type Executor
- type FinishMsg
- type FloatValueFunc
- type FormatOptions
- type Formatter
- type GroupKeyBuilder
- func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder
- func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)
- func (gkb *GroupKeyBuilder) Grow(n int)
- func (gkb *GroupKeyBuilder) Len() int
- func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder
- type GroupLookup
- func (l *GroupLookup) Clear()
- func (l *GroupLookup) Delete(key flux.GroupKey) (v interface{}, found bool)
- func (l *GroupLookup) Lookup(key flux.GroupKey) (interface{}, bool)
- func (l *GroupLookup) LookupOrCreate(key flux.GroupKey, fn func() interface{}) interface{}
- func (l *GroupLookup) Range(f func(key flux.GroupKey, value interface{}))
- func (l *GroupLookup) Set(key flux.GroupKey, value interface{})
- type IndexSelector
- type IntValueFunc
- type Message
- type MessageQueue
- type MessageType
- type MetadataNode
- type Node
- type PassthroughDataset
- func (d *PassthroughDataset) AddTransformation(t Transformation)
- func (d *PassthroughDataset) Finish(err error)
- func (d *PassthroughDataset) Process(tbl flux.Table) error
- func (d *PassthroughDataset) RetractTable(key flux.GroupKey) error
- func (d *PassthroughDataset) SetTriggerSpec(t plan.TriggerSpec)
- func (d *PassthroughDataset) UpdateProcessingTime(t Time) error
- func (d *PassthroughDataset) UpdateWatermark(mark Time) error
- type ProcessMsg
- type RetractTableMsg
- type Row
- type RowMapFn
- type RowPredicateFn
- func (f *RowPredicateFn) Eval(ctx context.Context, record values.Object) (bool, error)
- func (f *RowPredicateFn) EvalRow(ctx context.Context, row int, cr flux.ColReader) (bool, error)
- func (f *RowPredicateFn) InferredInputType() semantic.MonoType
- func (f *RowPredicateFn) InputType() semantic.MonoType
- func (f *RowPredicateFn) Prepare(cols []flux.ColMeta) error
- type RowReader
- type RowReduceFn
- type RowSelector
- type Rower
- type ScheduleFunc
- type SelectorConfig
- type Source
- type SourceDecoder
- type SourceIterator
- type StreamContext
- type StringValueFunc
- type TableBuilder
- type TableBuilderCache
- type TableContext
- type TablePredicateFn
- type Time
- type Transformation
- type TransformationSet
- func (ts TransformationSet) Finish(id DatasetID, err error)
- func (ts TransformationSet) Process(id DatasetID, tbl flux.Table) error
- func (ts TransformationSet) RetractTable(id DatasetID, key flux.GroupKey) error
- func (ts TransformationSet) UpdateProcessingTime(id DatasetID, time Time) error
- func (ts TransformationSet) UpdateWatermark(id DatasetID, time Time) error
- type Transport
- type Trigger
- type TriggerContext
- type UIntValueFunc
- type UpdateProcessingTimeMsg
- type UpdateWatermarkMsg
- type ValueFunc
- type Window
Constants ¶
const ( MaxTime = math.MaxInt64 MinTime = math.MinInt64 )
const ( DefaultStartColLabel = "_start" DefaultStopColLabel = "_stop" DefaultTimeColLabel = "_time" DefaultValueColLabel = "_value" )
Variables ¶
var AllTime = Bounds{ Start: MinTime, Stop: MaxTime, }
var DefaultAggregateConfig = AggregateConfig{ Columns: []string{DefaultValueColLabel}, }
var DefaultSelectorConfig = SelectorConfig{ Column: DefaultValueColLabel, }
Functions ¶
func AddNewTableCols ¶
AddNewTableCols adds the columns of t onto builder that did not already exist. It returns the mapping of builder cols to table cols.
func AddTableCols ¶
func AddTableCols(t flux.Table, builder TableBuilder) error
AddTableCols adds the columns of t onto builder.
func AddTableKeyCols ¶
func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
func AppendCol ¶
func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
AppendCol appends a column from cr onto builder. The indexes bj and cj are builder and col reader indexes respectively.
func AppendCols ¶
func AppendCols(cr flux.ColReader, builder TableBuilder) error
AppendCols appends all columns from cr onto builder. This function assumes that builder and cr have the same column schema.
func AppendKeyValues ¶
func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
AppendKeyValues appends the key values to the right columns in the builder. The builder is expected to contain the key columns.
func AppendKeyValuesN ¶
func AppendKeyValuesN(key flux.GroupKey, builder TableBuilder, n int) error
AppendKeyValuesN runs AppendKeyValues `n` times. This is different from
for i := 0; i < n; i++ { AppendKeyValues(key, builder) }
because it saves the overhead of calculating the column mapping `n` times.
func AppendMappedCols ¶
func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedCols appends all columns from cr onto builder. The colMap is a map of builder column index to cr column index.
func AppendMappedRecordExplicit ¶
AppendMappedRecordExplicit appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, no value is appended.
func AppendMappedRecordWithNulls ¶
func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedRecordWithNulls appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, the column is created with null values.
func AppendMappedTable ¶
func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
AppendMappedTable appends data from table t onto builder. The colMap is a map of builder column index to table column index.
func AppendRecord ¶
func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
AppendRecord appends the record from cr onto builder assuming matching columns.
func AppendTable ¶
func AppendTable(t flux.Table, builder TableBuilder) error
AppendTable appends data from table t onto builder. This function assumes builder and t have the same column schema.
func BuilderColsMatchReader ¶
func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool
BuilderColsMatchReader returns true if builder and cr have identical column sets (order dependent)
func ColMap ¶
func ColMap(colMap []int, builder TableBuilder, cols []flux.ColMeta) []int
ColMap writes a mapping of builder index to cols index into colMap. When colMap does not have enough capacity a new colMap is allocated. The colMap is always returned
func ContainsStr ¶
func CopyTable ¶
func CopyTable(t flux.Table) (flux.BufferedTable, error)
CopyTable returns a buffered copy of the table and consumes the input table. If the input table is already buffered, it "consumes" the input and returns the same table.
The buffered table can then be copied additional times using the BufferedTable.Copy method.
This method should be used sparingly if at all. It will retain each of the buffers of data coming out of a table so the entire table is materialized in memory. For large datasets, this could potentially cause a problem. The allocator is meant to catch when this happens and prevent it.
func FormatResult ¶
FormatResult prints the result to w.
func GroupKeyForRowOn ¶
func NewAggregateTransformation ¶
func NewAggregateTransformation(d Dataset, c TableBuilderCache, agg Aggregate, config AggregateConfig) *aggregateTransformation
func NewDataset ¶
func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
func NewEmptyTable ¶
NewEmptyTable constructs a new empty table with the given group key and columns.
func NewIndexSelectorTransformation ¶
func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
func NewRowSelectorTransformation ¶
func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
func NewTableBuilderCache ¶
func PanicUnknownType ¶
func RegisterSource ¶
func RegisterSource(k plan.ProcedureKind, c CreateSource)
func RegisterTransformation ¶
func RegisterTransformation(k plan.ProcedureKind, c CreateTransformation)
RegisterTransformation adds a new registration mapping of procedure kind to transformation.
func ReplaceTransformation ¶
func ReplaceTransformation(k plan.ProcedureKind, c CreateTransformation)
ReplaceTransformation changes an existing transformation registration.
func TablesEqual ¶
TablesEqual takes two flux tables and compares them. Returns false if the tables have different keys, different columns, or if the data in any column does not match. Returns true otherwise. This function will consume the ColumnReader, so if you are calling this from a Process method, you may need to copy the table if you need to iterate over the data for other calculations.
Types ¶
type AccumulationMode ¶
type AccumulationMode int
const ( // DiscardingMode will discard the data associated with a group key // after it has been processed. DiscardingMode AccumulationMode = iota // AccumulatingMode will retain the data associated with a group key // after it has been processed. If it has already sent a table with // that group key to a downstream transformation, it will signal // to that transformation that the previous table should be retracted. // // This is not implemented at the moment. AccumulatingMode )
type Administration ¶
type Aggregate ¶
type Aggregate interface { NewBoolAgg() DoBoolAgg NewIntAgg() DoIntAgg NewUIntAgg() DoUIntAgg NewFloatAgg() DoFloatAgg NewStringAgg() DoStringAgg }
type AggregateConfig ¶
type AggregateConfig struct { plan.DefaultCost Columns []string `json:"columns"` }
func (AggregateConfig) Copy ¶
func (c AggregateConfig) Copy() AggregateConfig
type Allocator ¶
Allocator tracks the amount of memory being consumed by a query. The allocator provides methods similar to make and append, to allocate large slices of data. The allocator also provides a Free method to account for when memory will be freed.
func (*Allocator) AppendBools ¶
AppendBools appends bools to a slice
func (*Allocator) AppendFloats ¶
AppendFloats appends float64s to a slice
func (*Allocator) AppendInts ¶
AppendInts appends int64s to a slice
func (*Allocator) AppendStrings ¶
AppendStrings appends strings to a slice. Only the string headers are accounted for.
func (*Allocator) AppendTimes ¶
AppendTimes appends Times to a slice
func (*Allocator) AppendUInts ¶
AppendUInts appends uint64s to a slice
func (*Allocator) Strings ¶
Strings makes a slice of string values. Only the string headers are accounted for.
type BoolValueFunc ¶
type BoolValueFunc interface {
ValueBool() bool
}
type Bounds ¶
type ColListTable ¶
type ColListTable struct {
// contains filtered or unexported fields
}
ColListTable implements Table using a list of columns. All data for the table is stored in RAM. As a result, At* methods are provided directly on the table for easy access.
func (*ColListTable) Cols ¶
func (t *ColListTable) Cols() []flux.ColMeta
func (*ColListTable) Done ¶
func (t *ColListTable) Done()
func (*ColListTable) Empty ¶
func (t *ColListTable) Empty() bool
func (*ColListTable) GetRow ¶
func (t *ColListTable) GetRow(row int) values.Object
GetRow takes a row index and returns the record located at that index in the cache
func (*ColListTable) Key ¶
func (t *ColListTable) Key() flux.GroupKey
func (*ColListTable) Len ¶
func (t *ColListTable) Len() int
func (*ColListTable) NRows ¶
func (t *ColListTable) NRows() int
func (*ColListTable) RefCount ¶
func (t *ColListTable) RefCount(n int)
func (*ColListTable) Release ¶
func (t *ColListTable) Release()
func (*ColListTable) Retain ¶
func (t *ColListTable) Retain()
type ColListTableBuilder ¶
type ColListTableBuilder struct {
// contains filtered or unexported fields
}
func NewColListTableBuilder ¶
func NewColListTableBuilder(key flux.GroupKey, a *memory.Allocator) *ColListTableBuilder
func (*ColListTableBuilder) AddCol ¶
func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
func (*ColListTableBuilder) AppendBool ¶
func (b *ColListTableBuilder) AppendBool(j int, value bool) error
func (*ColListTableBuilder) AppendBools ¶
func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error
func (*ColListTableBuilder) AppendFloat ¶
func (b *ColListTableBuilder) AppendFloat(j int, value float64) error
func (*ColListTableBuilder) AppendFloats ¶
func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float64) error
func (*ColListTableBuilder) AppendInt ¶
func (b *ColListTableBuilder) AppendInt(j int, value int64) error
func (*ColListTableBuilder) AppendInts ¶
func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int64) error
func (*ColListTableBuilder) AppendNil ¶
func (b *ColListTableBuilder) AppendNil(j int) error
func (*ColListTableBuilder) AppendString ¶
func (b *ColListTableBuilder) AppendString(j int, value string) error
func (*ColListTableBuilder) AppendStrings ¶
func (b *ColListTableBuilder) AppendStrings(j int, vs *array.Binary) error
func (*ColListTableBuilder) AppendTime ¶
func (b *ColListTableBuilder) AppendTime(j int, value Time) error
func (*ColListTableBuilder) AppendTimes ¶
func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int64) error
func (*ColListTableBuilder) AppendUInt ¶
func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error
func (*ColListTableBuilder) AppendUInts ¶
func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint64) error
func (*ColListTableBuilder) AppendValue ¶
func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error
func (*ColListTableBuilder) Bools ¶
func (b *ColListTableBuilder) Bools(j int) []bool
func (*ColListTableBuilder) ClearData ¶
func (b *ColListTableBuilder) ClearData()
func (*ColListTableBuilder) Cols ¶
func (b *ColListTableBuilder) Cols() []flux.ColMeta
func (*ColListTableBuilder) Floats ¶
func (b *ColListTableBuilder) Floats(j int) []float64
func (*ColListTableBuilder) GetRow ¶
func (b *ColListTableBuilder) GetRow(row int) values.Object
GetRow takes a row index and returns the record located at that index in the cache
func (*ColListTableBuilder) GrowBools ¶
func (b *ColListTableBuilder) GrowBools(j, n int) error
func (*ColListTableBuilder) GrowFloats ¶
func (b *ColListTableBuilder) GrowFloats(j, n int) error
func (*ColListTableBuilder) GrowInts ¶
func (b *ColListTableBuilder) GrowInts(j, n int) error
func (*ColListTableBuilder) GrowStrings ¶
func (b *ColListTableBuilder) GrowStrings(j, n int) error
func (*ColListTableBuilder) GrowTimes ¶
func (b *ColListTableBuilder) GrowTimes(j, n int) error
func (*ColListTableBuilder) GrowUInts ¶
func (b *ColListTableBuilder) GrowUInts(j, n int) error
func (*ColListTableBuilder) Ints ¶
func (b *ColListTableBuilder) Ints(j int) []int64
func (*ColListTableBuilder) Key ¶
func (b *ColListTableBuilder) Key() flux.GroupKey
func (*ColListTableBuilder) Len ¶
func (b *ColListTableBuilder) Len() int
func (*ColListTableBuilder) LevelColumns ¶
func (b *ColListTableBuilder) LevelColumns() error
func (*ColListTableBuilder) NCols ¶
func (b *ColListTableBuilder) NCols() int
func (*ColListTableBuilder) NRows ¶
func (b *ColListTableBuilder) NRows() int
func (*ColListTableBuilder) Release ¶
func (b *ColListTableBuilder) Release()
func (*ColListTableBuilder) SetBool ¶
func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error
func (*ColListTableBuilder) SetFloat ¶
func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error
func (*ColListTableBuilder) SetInt ¶
func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error
func (*ColListTableBuilder) SetNil ¶
func (b *ColListTableBuilder) SetNil(i, j int) error
func (*ColListTableBuilder) SetString ¶
func (b *ColListTableBuilder) SetString(i int, j int, value string) error
func (*ColListTableBuilder) SetTime ¶
func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error
func (*ColListTableBuilder) SetUInt ¶
func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error
func (*ColListTableBuilder) SetValue ¶
func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error
func (*ColListTableBuilder) SliceColumns ¶
func (b *ColListTableBuilder) SliceColumns(start, stop int) error
SliceColumns iterates over each column of b and re-slices them to the range [start:stop].
func (*ColListTableBuilder) Sort ¶
func (b *ColListTableBuilder) Sort(cols []string, desc bool)
func (*ColListTableBuilder) Strings ¶
func (b *ColListTableBuilder) Strings(j int) []string
func (*ColListTableBuilder) UInts ¶
func (b *ColListTableBuilder) UInts(j int) []uint64
type CreateSource ¶
type CreateSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)
type CreateTransformation ¶
type CreateTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)
type DataCache ¶
type DataCache interface { Table(flux.GroupKey) (flux.Table, error) ForEach(func(flux.GroupKey)) ForEachWithContext(func(flux.GroupKey, Trigger, TableContext)) DiscardTable(flux.GroupKey) ExpireTable(flux.GroupKey) SetTriggerSpec(t plan.TriggerSpec) }
DataCache holds all working data for a transformation.
type Dataset ¶
type Dataset interface { Node RetractTable(key flux.GroupKey) error UpdateProcessingTime(t Time) error UpdateWatermark(mark Time) error Finish(error) SetTriggerSpec(t plan.TriggerSpec) }
Dataset represents the set of data produced by a transformation.
func NewAggregateTransformationAndDataset ¶
func NewAggregateTransformationAndDataset(id DatasetID, mode AccumulationMode, agg Aggregate, config AggregateConfig, a *memory.Allocator) (*aggregateTransformation, Dataset)
func NewIndexSelectorTransformationAndDataset ¶
func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, config SelectorConfig, a *memory.Allocator) (*indexSelectorTransformation, Dataset)
func NewRowSelectorTransformationAndDataset ¶
func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, config SelectorConfig, a *memory.Allocator) (*rowSelectorTransformation, Dataset)
type Dispatcher ¶
type Dispatcher interface { // Schedule fn to be executed Schedule(fn ScheduleFunc) }
Dispatcher schedules work for a query. Each transformation submits work to be done to the dispatcher. Then the dispatcher schedules the work based on the available resources.
type DoBoolIndexSelector ¶
type DoBoolRowSelector ¶
type DoFloatAgg ¶
type DoFloatIndexSelector ¶
type DoFloatRowSelector ¶
type DoIntIndexSelector ¶
type DoIntRowSelector ¶
type DoStringAgg ¶
type DoStringIndexSelector ¶
type DoStringRowSelector ¶
type DoTimeIndexSelector ¶
type DoTimeRowSelector ¶
type DoUIntIndexSelector ¶
type DoUIntRowSelector ¶
type Executor ¶
type Executor interface { // Execute will begin execution of the plan.Spec using the memory allocator. // This returns a mapping of names to the query results. // This will also return a channel for the Metadata from the query. The channel // may return zero or more values. The returned channel must not require itself to // be read so the executor must allocate enough space in the channel so if the channel // is unread that it will not block. Execute(ctx context.Context, p *plan.Spec, a *memory.Allocator) (map[string]flux.Result, <-chan flux.Metadata, error) }
func NewExecutor ¶
type FloatValueFunc ¶
type FloatValueFunc interface {
ValueFloat() float64
}
type FormatOptions ¶
type FormatOptions struct { // RepeatHeaderCount is the number of rows to print before printing the header again. // If zero then the headers are not repeated. RepeatHeaderCount int NullRepresentation string }
func DefaultFormatOptions ¶
func DefaultFormatOptions() *FormatOptions
type Formatter ¶
type Formatter struct {
// contains filtered or unexported fields
}
Formatter writes a table to a Writer.
func NewFormatter ¶
func NewFormatter(tbl flux.Table, opts *FormatOptions) *Formatter
NewFormatter creates a Formatter for a given table. If opts is nil, the DefaultFormatOptions are used.
type GroupKeyBuilder ¶
type GroupKeyBuilder struct {
// contains filtered or unexported fields
}
GroupKeyBuilder is used to construct a GroupKey by keeping a mutable copy in memory.
func NewGroupKeyBuilder ¶
func NewGroupKeyBuilder(key flux.GroupKey) *GroupKeyBuilder
NewGroupKeyBuilder creates a new GroupKeyBuilder from an existing GroupKey. If the GroupKey passed is nil, a blank GroupKeyBuilder is constructed.
func (*GroupKeyBuilder) AddKeyValue ¶
func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder
AddKeyValue will add a new group key to the existing builder.
func (*GroupKeyBuilder) Build ¶
func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)
Build will construct the GroupKey. If there is any problem with the GroupKey (such as one of the columns is not a valid type), the error will be returned here.
func (*GroupKeyBuilder) Grow ¶
func (gkb *GroupKeyBuilder) Grow(n int)
Grow will grow the internal capacity of the group key to the given number.
func (*GroupKeyBuilder) Len ¶
func (gkb *GroupKeyBuilder) Len() int
Len returns the current length of the group key.
func (*GroupKeyBuilder) SetKeyValue ¶
func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder
SetKeyValue will set an existing key/value to the given pair, or if key is not found, add a new group key to the existing builder.
type GroupLookup ¶
type GroupLookup struct {
// contains filtered or unexported fields
}
GroupLookup is a container that maps group keys to a value.
The GroupLookup container is optimized for appending values in order and iterating over them in the same order. The GroupLookup will always have a deterministic order for the Range call, but that order may be influenced by the order that inserts happen.
At the current moment, the GroupLookup maintains the groups in sorted order although future implementations may change that.
To optimize inserts, the lookup is kept in an array of arrays. The first layer keeps a group of sorted key groups and each of these groups maintains their own sorted list of keys. Each time a new key is added, it is appended to the end of one of the key lists. If a key needs to be added in the middle of a list, the list is split into two so that the key can be appended. The index of the last list to be used is maintained so that future inserts can skip past the first search for the key list and an insert can be done in constant time. Similarly, a lookup for a key that was just inserted will also be in constant time with the worst case time being O(n log n).
func (*GroupLookup) Clear ¶
func (l *GroupLookup) Clear()
Clear will clear the group lookup and reset it to contain nothing.
func (*GroupLookup) Delete ¶
func (l *GroupLookup) Delete(key flux.GroupKey) (v interface{}, found bool)
Delete will remove the key from this GroupLookup. It will return the same thing as a call to Lookup.
func (*GroupLookup) Lookup ¶
func (l *GroupLookup) Lookup(key flux.GroupKey) (interface{}, bool)
Lookup will retrieve the value associated with the given key if it exists.
func (*GroupLookup) LookupOrCreate ¶
func (l *GroupLookup) LookupOrCreate(key flux.GroupKey, fn func() interface{}) interface{}
LookupOrCreate will retrieve the value associated with the given key or, if it does not exist, will invoke the function to create one and set it in the group lookup.
func (*GroupLookup) Range ¶
func (l *GroupLookup) Range(f func(key flux.GroupKey, value interface{}))
Range will iterate over all group keys in a stable ordering. Range must not be called within another call to Range. It is safe to call Set/Delete while ranging.
func (*GroupLookup) Set ¶
func (l *GroupLookup) Set(key flux.GroupKey, value interface{})
Set will set the value for the given key. It will overwrite an existing value.
type IndexSelector ¶
type IndexSelector interface { NewTimeSelector() DoTimeIndexSelector NewBoolSelector() DoBoolIndexSelector NewIntSelector() DoIntIndexSelector NewUIntSelector() DoUIntIndexSelector NewFloatSelector() DoFloatIndexSelector NewStringSelector() DoStringIndexSelector }
type IntValueFunc ¶
type IntValueFunc interface {
ValueInt() int64
}
type Message ¶
type Message interface { Type() MessageType SrcDatasetID() DatasetID }
type MessageQueue ¶
MessageQueue provides a concurrency safe queue for messages. The queue must have a single consumer calling Pop.
type MessageType ¶
type MessageType int
const ( RetractTableType MessageType = iota ProcessType UpdateWatermarkType UpdateProcessingTimeType FinishType )
type MetadataNode ¶
MetadataNode is a node that has additional metadata that should be added to the result after it is processed.
type Node ¶
type Node interface {
AddTransformation(t Transformation)
}
type PassthroughDataset ¶
type PassthroughDataset struct {
// contains filtered or unexported fields
}
PassthroughDataset is a Dataset that will passthrough the processed data to the next Transformation.
func NewPassthroughDataset ¶
func NewPassthroughDataset(id DatasetID) *PassthroughDataset
NewPassthroughDataset constructs a new PassthroughDataset.
func (*PassthroughDataset) AddTransformation ¶
func (d *PassthroughDataset) AddTransformation(t Transformation)
func (*PassthroughDataset) Finish ¶
func (d *PassthroughDataset) Finish(err error)
func (*PassthroughDataset) RetractTable ¶
func (d *PassthroughDataset) RetractTable(key flux.GroupKey) error
func (*PassthroughDataset) SetTriggerSpec ¶
func (d *PassthroughDataset) SetTriggerSpec(t plan.TriggerSpec)
func (*PassthroughDataset) UpdateProcessingTime ¶
func (d *PassthroughDataset) UpdateProcessingTime(t Time) error
func (*PassthroughDataset) UpdateWatermark ¶
func (d *PassthroughDataset) UpdateWatermark(mark Time) error
type ProcessMsg ¶
type RetractTableMsg ¶
type RowMapFn ¶
type RowMapFn struct {
// contains filtered or unexported fields
}
func NewRowMapFn ¶
type RowPredicateFn ¶
type RowPredicateFn struct {
// contains filtered or unexported fields
}
func NewRowPredicateFn ¶
func NewRowPredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) (*RowPredicateFn, error)
func (*RowPredicateFn) InferredInputType ¶ added in v0.99.11
func (f *RowPredicateFn) InferredInputType() semantic.MonoType
InferredInputType will return the inferred input type. This type may contain type variables and will only contain the properties that could be inferred from type inference.
func (*RowPredicateFn) InputType ¶
func (f *RowPredicateFn) InputType() semantic.MonoType
InputType will return the prepared input type. This will be a fully realized type that was created after preparing the function with Prepare.
type RowReduceFn ¶
type RowReduceFn struct {
// contains filtered or unexported fields
}
func NewRowReduceFn ¶
func NewRowReduceFn(fn *semantic.FunctionExpression, scope compiler.Scope) (*RowReduceFn, error)
func (*RowReduceFn) Type ¶
func (f *RowReduceFn) Type() semantic.MonoType
type RowSelector ¶
type RowSelector interface { NewTimeSelector() DoTimeRowSelector NewBoolSelector() DoBoolRowSelector NewIntSelector() DoIntRowSelector NewUIntSelector() DoUIntRowSelector NewFloatSelector() DoFloatRowSelector NewStringSelector() DoStringRowSelector }
type ScheduleFunc ¶
ScheduleFunc is a function that represents work to do. The throughput is the maximum number of messages to process for this scheduling.
type SelectorConfig ¶
type SelectorConfig struct { plan.DefaultCost Column string `json:"column"` }
type Source ¶
func CreateSourceFromDecoder ¶
func CreateSourceFromDecoder(decoder SourceDecoder, dsid DatasetID, a Administration) (Source, error)
CreateSourceFromDecoder takes an implementation of a SourceDecoder, as well as a dataset ID and Administration type and creates an execute.Source.
func CreateSourceFromIterator ¶
func CreateSourceFromIterator(iterator SourceIterator, dsid DatasetID) (Source, error)
CreateSourceFromIterator takes an implementation of a SourceIterator as well as a dataset ID and creates an execute.Source.
type SourceDecoder ¶
type SourceDecoder interface { Connect(ctx context.Context) error Fetch(ctx context.Context) (bool, error) Decode(ctx context.Context) (flux.Table, error) Close() error }
SourceDecoder is an interface that generalizes the process of retrieving data from an unspecified data source.
Connect implements the logic needed to connect directly to the data source.
Fetch implements a single fetch of data from the source (may be called multiple times). Should return false when there is no more data to retrieve.
Decode implements the process of marshaling the data returned by the source into a flux.Table type.
In executing the retrieval process, Connect is called once at the outset, and Fetch() and Decode() are then called iteratively until the data source is fully consumed.
type SourceIterator ¶
type SourceIterator interface { // Do will invoke the Source and cause each materialized flux.Table // to be passed to the given function. Do(ctx context.Context, f func(flux.Table) error) error }
SourceIterator is an interface for iterating over flux.Table values in a source. It provides a common interface for creating an execute.Source in an iterative way.
type StreamContext ¶
type StreamContext interface {
Bounds() *Bounds
}
StreamContext represents necessary context for a single stream of query data.
type StringValueFunc ¶
type StringValueFunc interface {
ValueString() string
}
type TableBuilder ¶
type TableBuilder interface { Key() flux.GroupKey NRows() int NCols() int Cols() []flux.ColMeta // AddCol increases the size of the table by one column. // The index of the column is returned. AddCol(flux.ColMeta) (int, error) // Set sets the value at the specified coordinates // The rows and columns must exist before calling set, otherwise Set panics. SetValue(i, j int, value values.Value) error SetNil(i, j int) error // Append will add a single value to the end of a column. Will set the number of // rows in the table to the size of the new column. It's the caller's job to make sure // that the expected number of rows in each column is equal. AppendBool(j int, value bool) error AppendInt(j int, value int64) error AppendUInt(j int, value uint64) error AppendFloat(j int, value float64) error AppendString(j int, value string) error AppendTime(j int, value Time) error AppendValue(j int, value values.Value) error AppendNil(j int) error // AppendBools and similar functions will append multiple values to column j. As above, // it will set the number of rows in the table to the size of the new column. It's the // caller's job to make sure that the expected number of rows in each column is equal. AppendBools(j int, vs *array.Boolean) error AppendInts(j int, vs *array.Int64) error AppendUInts(j int, vs *array.Uint64) error AppendFloats(j int, vs *array.Float64) error AppendStrings(j int, vs *array.Binary) error AppendTimes(j int, vs *array.Int64) error // GrowBools and similar functions will extend column j by n zero-values for the respective type. // If the column has enough capacity, no reallocation is necessary. If the capacity is insufficient, // a new slice is allocated with 1.5*newCapacity. As with the Append functions, it is the // caller's job to make sure that the expected number of rows in each column is equal.
GrowBools(j, n int) error GrowInts(j, n int) error GrowUInts(j, n int) error GrowFloats(j, n int) error GrowStrings(j, n int) error GrowTimes(j, n int) error // LevelColumns will check for columns that are too short and Grow them // so that each column is of uniform size. LevelColumns() error // Sort the rows of the table by the values of the columns in the order listed. Sort(cols []string, desc bool) // ClearData removes all rows, while preserving the column meta data. ClearData() // Release releases any extraneous memory that has been retained. Release() // Table returns the table that has been built. // Further modifications of the builder will not affect the returned table. Table() (flux.Table, error) }
TableBuilder builds tables that can be used multiple times.
type TableBuilderCache ¶
type TableBuilderCache interface { // TableBuilder returns an existing or new TableBuilder for the given meta data. // The boolean return value indicates if TableBuilder is new. TableBuilder(key flux.GroupKey) (TableBuilder, bool) ForEachBuilder(f func(flux.GroupKey, TableBuilder)) }
type TableContext ¶
type TablePredicateFn ¶
type TablePredicateFn struct {
// contains filtered or unexported fields
}
func NewTablePredicateFn ¶
func NewTablePredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) (*TablePredicateFn, error)
type Transformation ¶
type Transformation interface { RetractTable(id DatasetID, key flux.GroupKey) error // Process takes in one flux Table, performs data processing on it and // writes that table to a DataCache Process(id DatasetID, tbl flux.Table) error UpdateWatermark(id DatasetID, t Time) error UpdateProcessingTime(id DatasetID, t Time) error // Finish indicates that the Transformation is done processing. It is // the last method called on the Transformation Finish(id DatasetID, err error) }
Transformation represents functions that stream a set of tables, perform data processing on them, and produce an output stream of tables.
type TransformationSet ¶
type TransformationSet []Transformation
TransformationSet is a group of transformations.
func (TransformationSet) Finish ¶
func (ts TransformationSet) Finish(id DatasetID, err error)
func (TransformationSet) Process ¶
func (ts TransformationSet) Process(id DatasetID, tbl flux.Table) error
func (TransformationSet) RetractTable ¶
func (ts TransformationSet) RetractTable(id DatasetID, key flux.GroupKey) error
func (TransformationSet) UpdateProcessingTime ¶
func (ts TransformationSet) UpdateProcessingTime(id DatasetID, time Time) error
func (TransformationSet) UpdateWatermark ¶
func (ts TransformationSet) UpdateWatermark(id DatasetID, time Time) error
type Transport ¶
type Transport interface { Transformation // Finished reports when the Transport has completed and there is no more work to do. Finished() <-chan struct{} }
type Trigger ¶
type Trigger interface { Triggered(TriggerContext) bool Finished() bool Reset() }
func NewTriggerFromSpec ¶
func NewTriggerFromSpec(spec plan.TriggerSpec) Trigger
type TriggerContext ¶
type TriggerContext struct { Table TableContext Watermark Time CurrentProcessingTime Time }
type UIntValueFunc ¶
type UIntValueFunc interface {
ValueUInt() uint64
}
type UpdateProcessingTimeMsg ¶
type UpdateWatermarkMsg ¶
type Window ¶
func NewWindow ¶
NewWindow creates a window with the given parameters, and normalizes the offset to a small positive duration. It also validates that the durations are valid when used within a window.
func (Window) GetEarliestBounds ¶
GetEarliestBounds returns the bounds of the earliest window that contains the given time t. For underlapping windows that do not contain time t, the window directly after time t will be returned.
func (Window) GetOverlappingBounds ¶
GetOverlappingBounds returns a slice of bounds for each window that overlaps the input bounds b.
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| executetest | Package executetest contains utilities for testing the query execution phase. |