Documentation ¶
Overview ¶
Package execute contains the implementation of the execution phase in the query engine.
Index ¶
- Constants
- Variables
- func AddNewTableCols(t flux.Table, builder TableBuilder, colMap []int) ([]int, error)
- func AddTableCols(t flux.Table, builder TableBuilder) error
- func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
- func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
- func AppendCols(cr flux.ColReader, builder TableBuilder) error
- func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
- func AppendKeyValuesN(key flux.GroupKey, builder TableBuilder, n int) error
- func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordExplicit(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
- func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
- func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
- func AppendTable(t flux.Table, builder TableBuilder) error
- func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool
- func CheckColType(col flux.ColMeta, typ flux.ColType)
- func Close(err error, c Closer) error
- func ColIdx(label string, cols []flux.ColMeta) int
- func ColMap(colMap []int, builder TableBuilder, cols []flux.ColMeta) []int
- func ContainsStr(strs []string, str string) bool
- func ConvertFromKind(k semantic.Nature) flux.ColType
- func ConvertToKind(t flux.ColType) semantic.Nature
- func CopyTable(t flux.Table) (flux.BufferedTable, error)
- func FormatResult(w io.Writer, res flux.Result) error
- func GroupKeyForRowOn(i int, cr flux.ColReader, on map[string]bool) flux.GroupKey
- func HasCol(label string, cols []flux.ColMeta) bool
- func HaveExecutionDependencies(ctx context.Context) bool
- func NewAggregateParallelTransformation(id DatasetID, parents []DatasetID, t AggregateParallelTransformation, ...) (Transformation, Dataset, error)
- func NewAggregateTransformation(id DatasetID, t AggregateTransformation, mem memory.Allocator) (Transformation, Dataset, error)
- func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
- func NewEmptyTable(key flux.GroupKey, cols []flux.ColMeta) flux.Table
- func NewGroupKey(cols []flux.ColMeta, values []values.Value) flux.GroupKey
- func NewGroupTransformation(id DatasetID, t GroupTransformation, mem memory.Allocator) (Transformation, Dataset, error)
- func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
- func NewNarrowStateTransformation[T any](id DatasetID, t NarrowStateTransformation[T], mem memory.Allocator) (Transformation, Dataset, error)
- func NewNarrowTransformation(id DatasetID, t NarrowTransformation, mem memory.Allocator) (Transformation, Dataset, error)
- func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
- func NewSimpleAggregateTransformation(ctx context.Context, id DatasetID, agg SimpleAggregate, ...) (Transformation, Dataset, error)
- func NewTableBuilderCache(a memory.Allocator) *tableBuilderCache
- func OperationType(t interface{}) string
- func PanicUnknownType(typ flux.ColType)
- func RegisterProfilerFactories(cpfs ...CreateProfilerFunc)
- func RegisterSource(k plan.ProcedureKind, c CreateSource)
- func RegisterTransformation(k plan.ProcedureKind, c CreateTransformation)
- func ReplaceTransformation(k plan.ProcedureKind, c CreateTransformation)
- func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error)
- func ValueForRow(cr flux.ColReader, i, j int) values.Value
- type AccumulationMode
- type Administration
- type AggregateParallelTransformation
- type AggregateTransformation
- type Allocator
- func (a *Allocator) AppendBools(slice []bool, vs ...bool) []bool
- func (a *Allocator) AppendFloats(slice []float64, vs ...float64) []float64
- func (a *Allocator) AppendInts(slice []int64, vs ...int64) []int64
- func (a *Allocator) AppendStrings(slice []string, vs ...string) []string
- func (a *Allocator) AppendTimes(slice []Time, vs ...Time) []Time
- func (a *Allocator) AppendUInts(slice []uint64, vs ...uint64) []uint64
- func (a *Allocator) Bools(l, c int) []bool
- func (a *Allocator) Floats(l, c int) []float64
- func (a *Allocator) Free(n, size int)
- func (a *Allocator) GrowBools(slice []bool, n int) []bool
- func (a *Allocator) GrowFloats(slice []float64, n int) []float64
- func (a *Allocator) GrowInts(slice []int64, n int) []int64
- func (a *Allocator) GrowStrings(slice []string, n int) []string
- func (a *Allocator) GrowTimes(slice []Time, n int) []Time
- func (a *Allocator) GrowUInts(slice []uint64, n int) []uint64
- func (a *Allocator) Ints(l, c int) []int64
- func (a *Allocator) Strings(l, c int) []string
- func (a *Allocator) Times(l, c int) []Time
- func (a *Allocator) UInts(l, c int) []uint64
- type AsyncTransport
- type BoolValueFunc
- type Bounds
- type ChunkBuilder
- type Closer
- type ColListTable
- func (t *ColListTable) Bools(j int) *array.Boolean
- func (t *ColListTable) Cols() []flux.ColMeta
- func (t *ColListTable) Do(f func(flux.ColReader) error) error
- func (t *ColListTable) Done()
- func (t *ColListTable) Empty() bool
- func (t *ColListTable) Floats(j int) *array.Float
- func (t *ColListTable) Ints(j int) *array.Int
- func (t *ColListTable) Key() flux.GroupKey
- func (t *ColListTable) Len() int
- func (t *ColListTable) NRows() int
- func (t *ColListTable) RefCount(n int)
- func (t *ColListTable) Release()
- func (t *ColListTable) Retain()
- func (t *ColListTable) Strings(j int) *array.String
- func (t *ColListTable) Times(j int) *array.Int
- func (t *ColListTable) UInts(j int) *array.Uint
- type ColListTableBuilder
- func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
- func (b *ColListTableBuilder) AppendBool(j int, value bool) error
- func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error
- func (b *ColListTableBuilder) AppendFloat(j int, value float64) error
- func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float) error
- func (b *ColListTableBuilder) AppendInt(j int, value int64) error
- func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int) error
- func (b *ColListTableBuilder) AppendNil(j int) error
- func (b *ColListTableBuilder) AppendString(j int, value string) error
- func (b *ColListTableBuilder) AppendStrings(j int, vs *array.String) error
- func (b *ColListTableBuilder) AppendTime(j int, value Time) error
- func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int) error
- func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error
- func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint) error
- func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error
- func (b *ColListTableBuilder) Bools(j int) []bool
- func (b *ColListTableBuilder) ClearData()
- func (b *ColListTableBuilder) Cols() []flux.ColMeta
- func (b *ColListTableBuilder) Floats(j int) []float64
- func (b *ColListTableBuilder) GetRow(row int) values.Object
- func (b *ColListTableBuilder) GrowBools(j, n int) error
- func (b *ColListTableBuilder) GrowFloats(j, n int) error
- func (b *ColListTableBuilder) GrowInts(j, n int) error
- func (b *ColListTableBuilder) GrowStrings(j, n int) error
- func (b *ColListTableBuilder) GrowTimes(j, n int) error
- func (b *ColListTableBuilder) GrowUInts(j, n int) error
- func (b *ColListTableBuilder) Ints(j int) []int64
- func (b *ColListTableBuilder) Key() flux.GroupKey
- func (b *ColListTableBuilder) Len() int
- func (b *ColListTableBuilder) LevelColumns() error
- func (b *ColListTableBuilder) NCols() int
- func (b *ColListTableBuilder) NRows() int
- func (b *ColListTableBuilder) Release()
- func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error
- func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error
- func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error
- func (b *ColListTableBuilder) SetNil(i, j int) error
- func (b *ColListTableBuilder) SetString(i int, j int, value string) error
- func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error
- func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error
- func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error
- func (b *ColListTableBuilder) SliceColumns(start, stop int) error
- func (b *ColListTableBuilder) Sort(cols []string, desc bool)
- func (b *ColListTableBuilder) Strings(j int) []string
- func (b *ColListTableBuilder) Table() (flux.Table, error)
- func (b *ColListTableBuilder) Times(j int) []values.Time
- func (b *ColListTableBuilder) UInts(j int) []uint64
- type CreateProfilerFunc
- type CreateSource
- type CreateTransformation
- type DataCache
- type Dataset
- type DatasetContext
- type DatasetID
- type Dispatcher
- type DoBoolAgg
- type DoBoolIndexSelector
- type DoBoolRowSelector
- type DoFloatAgg
- type DoFloatIndexSelector
- type DoFloatRowSelector
- type DoIntAgg
- type DoIntIndexSelector
- type DoIntRowSelector
- type DoStringAgg
- type DoStringIndexSelector
- type DoStringRowSelector
- type DoTimeIndexSelector
- type DoTimeRowSelector
- type DoUIntAgg
- type DoUIntIndexSelector
- type DoUIntRowSelector
- type Duration
- type ExecutionDependencies
- type ExecutionNode
- type ExecutionOptions
- type Executor
- type FinishMsg
- type FloatValueFunc
- type FlushKeyMsg
- type FormatOptions
- type Formatter
- type GroupKeyBuilder
- func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder
- func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)
- func (gkb *GroupKeyBuilder) Grow(n int)
- func (gkb *GroupKeyBuilder) Len() int
- func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder
- type GroupLookup
- type GroupTransformation
- type IndexSelector
- type IntValueFunc
- type Message
- type MessageQueue
- type MessageType
- type MetadataNode
- type NarrowStateTransformation
- type NarrowTransformation
- type Node
- type OperatorProfiler
- type ParallelOpts
- type PassthroughDataset
- func (d *PassthroughDataset) AddTransformation(t Transformation)
- func (d *PassthroughDataset) Finish(err error)
- func (d *PassthroughDataset) Process(tbl flux.Table) error
- func (d *PassthroughDataset) RetractTable(key flux.GroupKey) error
- func (d *PassthroughDataset) SetTriggerSpec(t plan.TriggerSpec)
- func (d *PassthroughDataset) UpdateProcessingTime(t Time) error
- func (d *PassthroughDataset) UpdateWatermark(mark Time) error
- type ProcessChunkMsg
- type ProcessMsg
- type Profiler
- type QueryProfiler
- type RandomAccessGroupLookup
- type RetractTableMsg
- type Row
- type RowJoinFn
- type RowJoinPreparedFn
- type RowMapFn
- type RowMapPreparedFn
- type RowPredicateFn
- type RowPredicatePreparedFn
- func (f *RowPredicatePreparedFn) Eval(ctx context.Context, record values.Object) (bool, error)
- func (f *RowPredicatePreparedFn) EvalRow(ctx context.Context, row int, cr flux.ColReader) (bool, error)
- func (f *RowPredicatePreparedFn) InferredInputType() semantic.MonoType
- func (f *RowPredicatePreparedFn) InputType() semantic.MonoType
- type RowReader
- type RowReduceFn
- type RowReducePreparedFn
- type RowSelector
- type Rower
- type ScheduleFunc
- type SelectorConfig
- type SimpleAggregate
- type SimpleAggregateConfig
- type Source
- type SourceDecoder
- type SourceIterator
- type StreamContext
- type StringValueFunc
- type TableBuilder
- type TableBuilderCache
- type TableContext
- type TablePredicateFn
- type TablePredicatePreparedFn
- type Time
- type Transformation
- type TransformationSet
- func (ts TransformationSet) Finish(id DatasetID, err error)
- func (ts TransformationSet) Process(id DatasetID, tbl flux.Table) error
- func (ts TransformationSet) RetractTable(id DatasetID, key flux.GroupKey) error
- func (ts TransformationSet) UpdateProcessingTime(id DatasetID, time Time) error
- func (ts TransformationSet) UpdateWatermark(id DatasetID, time Time) error
- type Transport
- type TransportDataset
- func (d *TransportDataset) AddTransformation(t Transformation)
- func (d *TransportDataset) Delete(key flux.GroupKey) (v interface{}, found bool)
- func (d *TransportDataset) Finish(err error)
- func (d *TransportDataset) FlushKey(key flux.GroupKey) error
- func (d *TransportDataset) Lookup(key flux.GroupKey) (interface{}, bool)
- func (d *TransportDataset) LookupOrCreate(key flux.GroupKey, fn func() interface{}) interface{}
- func (d *TransportDataset) Process(chunk table.Chunk) error
- func (d *TransportDataset) Range(f func(key flux.GroupKey, value interface{}) error) error
- func (d *TransportDataset) RetractTable(key flux.GroupKey) error
- func (d *TransportDataset) Set(key flux.GroupKey, value interface{})
- func (d *TransportDataset) SetTriggerSpec(t plan.TriggerSpec)
- func (d *TransportDataset) UpdateProcessingTime(t Time) error
- func (d *TransportDataset) UpdateWatermark(mark Time) error
- type Trigger
- type TriggerContext
- type UIntValueFunc
- type UpdateProcessingTimeMsg
- type UpdateWatermarkMsg
- type ValueFunc
- type VectorMapFn
- type VectorMapPreparedFn
- type Window
Constants ¶
const ( MaxTime = math.MaxInt64 MinTime = math.MinInt64 )
const ( DefaultStartColLabel = "_start" DefaultStopColLabel = "_stop" DefaultTimeColLabel = "_time" DefaultValueColLabel = "_value" )
const MaxFeatureFlagQueryConcurrencyIncrease = 256
Variables ¶
var AllProfilers = make(map[string]CreateProfilerFunc)
var AllTime = Bounds{ Start: MinTime, Stop: MaxTime, }
var DefaultSelectorConfig = SelectorConfig{ Column: DefaultValueColLabel, }
var DefaultSimpleAggregateConfig = SimpleAggregateConfig{ Columns: []string{DefaultValueColLabel}, }
Functions ¶
func AddNewTableCols ¶
AddNewTableCols adds the columns of t onto builder that did not already exist. Returns the mapping of builder cols to table cols.
func AddTableCols ¶
func AddTableCols(t flux.Table, builder TableBuilder) error
AddTableCols adds the columns of t onto builder.
func AddTableKeyCols ¶
func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error
func AppendCol ¶
func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error
AppendCol appends a column from cr onto builder. The indexes bj and cj are the builder and col reader indexes respectively.
func AppendCols ¶
func AppendCols(cr flux.ColReader, builder TableBuilder) error
AppendCols appends all columns from cr onto builder. This function assumes that builder and cr have the same column schema.
func AppendKeyValues ¶
func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error
AppendKeyValues appends the key values to the right columns in the builder. The builder is expected to contain the key columns.
func AppendKeyValuesN ¶ added in v0.38.0
func AppendKeyValuesN(key flux.GroupKey, builder TableBuilder, n int) error
AppendKeyValuesN runs AppendKeyValues `n` times. This is different from
```
for i := 0; i < n; i++ { AppendKeyValues(key, builder) }
```
because it saves the overhead of calculating the column mapping `n` times.
func AppendMappedCols ¶
func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedCols appends all columns from cr onto builder. The colMap is a map of builder column index to cr column index.
func AppendMappedRecordExplicit ¶
AppendMappedRecordExplicit appends the record at index i from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, no value is appended.
func AppendMappedRecordWithNulls ¶ added in v0.14.0
func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error
AppendMappedRecordWithNulls appends the record at index i from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, the column is created with null values.
func AppendMappedTable ¶
func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error
AppendMappedTable appends data from table t onto builder. The colMap is a map of builder column index to table column index.
func AppendRecord ¶
func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error
AppendRecord appends the record from cr onto builder assuming matching columns.
func AppendTable ¶
func AppendTable(t flux.Table, builder TableBuilder) error
AppendTable appends data from table t onto builder. This function assumes builder and t have the same column schema.
func BuilderColsMatchReader ¶ added in v0.8.0
func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool
BuilderColsMatchReader returns true if builder and cr have identical column sets (order dependent)
func Close ¶ added in v0.148.0
Close is a convenience method that will take an error and a Closer. This will call the Close method on the Closer. If the error is nil, it will return any error from the Close method. If the error was not nil, it will return the error.
func ColMap ¶
func ColMap(colMap []int, builder TableBuilder, cols []flux.ColMeta) []int
ColMap writes a mapping of builder index to cols index into colMap. When colMap does not have enough capacity a new colMap is allocated. The colMap is always returned
func ContainsStr ¶
func CopyTable ¶
func CopyTable(t flux.Table) (flux.BufferedTable, error)
CopyTable returns a buffered copy of the table and consumes the input table. If the input table is already buffered, it "consumes" the input and returns the same table.
The buffered table can then be copied additional times using the BufferedTable.Copy method.
This method should be used sparingly if at all. It will retain each of the buffers of data coming out of a table so the entire table is materialized in memory. For large datasets, this could potentially cause a problem. The allocator is meant to catch when this happens and prevent it.
func FormatResult ¶ added in v0.20.0
FormatResult prints the result to w.
func GroupKeyForRowOn ¶
func HaveExecutionDependencies ¶ added in v0.91.0
func NewAggregateParallelTransformation ¶ added in v0.171.0
func NewAggregateParallelTransformation(id DatasetID, parents []DatasetID, t AggregateParallelTransformation, mem memory.Allocator) (Transformation, Dataset, error)
NewAggregateParallelTransformation constructs a Transformation and Dataset using the AggregateParallelTransformation implementation.
func NewAggregateTransformation ¶
func NewAggregateTransformation(id DatasetID, t AggregateTransformation, mem memory.Allocator) (Transformation, Dataset, error)
NewAggregateTransformation constructs a Transformation and Dataset using the AggregateTransformation implementation.
func NewDataset ¶
func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset
func NewEmptyTable ¶ added in v0.49.0
NewEmptyTable constructs a new empty table with the given group key and columns.
func NewGroupTransformation ¶ added in v0.128.0
func NewGroupTransformation(id DatasetID, t GroupTransformation, mem memory.Allocator) (Transformation, Dataset, error)
func NewIndexSelectorTransformation ¶
func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation
func NewNarrowStateTransformation ¶ added in v0.130.0
func NewNarrowStateTransformation[T any](id DatasetID, t NarrowStateTransformation[T], mem memory.Allocator) (Transformation, Dataset, error)
NewNarrowStateTransformation constructs a Transformation and Dataset using the NarrowStateTransformation implementation.
func NewNarrowTransformation ¶ added in v0.125.0
func NewNarrowTransformation(id DatasetID, t NarrowTransformation, mem memory.Allocator) (Transformation, Dataset, error)
NewNarrowTransformation constructs a Transformation and Dataset using the NarrowTransformation implementation.
func NewRowSelectorTransformation ¶
func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation
func NewSimpleAggregateTransformation ¶ added in v0.127.0
func NewSimpleAggregateTransformation(ctx context.Context, id DatasetID, agg SimpleAggregate, config SimpleAggregateConfig, mem memory.Allocator) (Transformation, Dataset, error)
func NewTableBuilderCache ¶
func OperationType ¶ added in v0.125.0
func OperationType(t interface{}) string
OperationType returns a string representation of the transformation operation represented by the Transport.
func PanicUnknownType ¶
func RegisterProfilerFactories ¶ added in v0.86.0
func RegisterProfilerFactories(cpfs ...CreateProfilerFunc)
func RegisterSource ¶
func RegisterSource(k plan.ProcedureKind, c CreateSource)
func RegisterTransformation ¶
func RegisterTransformation(k plan.ProcedureKind, c CreateTransformation)
RegisterTransformation adds a new registration mapping of procedure kind to transformation.
func ReplaceTransformation ¶ added in v0.14.0
func ReplaceTransformation(k plan.ProcedureKind, c CreateTransformation)
ReplaceTransformation changes an existing transformation registration.
func TablesEqual ¶ added in v0.8.0
TablesEqual takes two flux tables and compares them. Returns false if the tables have different keys, different columns, or if the data in any column does not match. Returns true otherwise. This function will consume the ColumnReader, so if you are calling this from a Process method, you may need to copy the table if you need to iterate over the data for other calculations.
Types ¶
type AccumulationMode ¶
type AccumulationMode int
const ( // DiscardingMode will discard the data associated with a group key // after it has been processed. DiscardingMode AccumulationMode = iota // AccumulatingMode will retain the data associated with a group key // after it has been processed. If it has already sent a table with // that group key to a downstream transformation, it will signal // to that transformation that the previous table should be retracted. // // This is not implemented at the moment. AccumulatingMode )
type Administration ¶
type Administration interface { Context() context.Context ResolveTime(qt flux.Time) Time StreamContext() StreamContext Allocator() memory.Allocator Parents() []DatasetID ParallelOpts() ParallelOpts }
type AggregateParallelTransformation ¶ added in v0.171.0
type AggregateParallelTransformation interface { AggregateTransformation // Merge will take two existing states produced by the Aggregate method and merge them // into a single state. Merge(into, from interface{}, mem memory.Allocator) (interface{}, error) }
AggregateParallelTransformation is an AggregateTransformation that is capable of processing chunks from within the same group key in parallel.
The thing that differentiates this from a normal AggregateTransformation is having multiple parents and the capability to merge two existing states into a single one that will be passed to Compute.
type AggregateTransformation ¶ added in v0.127.0
type AggregateTransformation interface { // Aggregate will process the table.Chunk with the state from the previous // time a table with this group key was invoked. // // If this group key has never been invoked before, the state will be nil. // // The transformation should return the new state and a boolean // value of true if the state was created or modified. If false is returned, // the new state will be discarded and any old state will be kept. // // It is ok for the transformation to modify the state if it is // a pointer. This is both allowed and recommended. Aggregate(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error) // Compute will signal the AggregateTransformation to compute // the aggregate for the given key from the provided state. // // The state will be the value that was returned from Aggregate. // If the Aggregate function never returned state, this function // will never be called. Compute(key flux.GroupKey, state interface{}, d *TransportDataset, mem memory.Allocator) error Closer }
AggregateTransformation implements a transformation that aggregates the results from multiple TableView values and then outputs a Table with the same group key.
This is similar to NarrowTransformation in that it does not modify the group key, but different because it will only output a table when the key is flushed.
type Allocator ¶
Allocator is used to track memory allocations for directly allocated structs. Normally, you should use arrow builders and the memory.Allocator by itself to create arrays, but the Allocator is used by older builders that were pre-arrow and directly allocated Go slices rather than relying on arrow's builders.
func (*Allocator) AppendBools ¶
AppendBools appends bools to a slice
func (*Allocator) AppendFloats ¶
AppendFloats appends float64s to a slice
func (*Allocator) AppendInts ¶
AppendInts appends int64s to a slice
func (*Allocator) AppendStrings ¶
AppendStrings appends strings to a slice. Only the string headers are accounted for.
func (*Allocator) AppendTimes ¶
AppendTimes appends Times to a slice
func (*Allocator) AppendUInts ¶
AppendUInts appends uint64s to a slice
func (*Allocator) Strings ¶
Strings makes a slice of string values. Only the string headers are accounted for.
type AsyncTransport ¶ added in v0.125.0
type AsyncTransport interface { Transport // Finished reports when the AsyncTransport has completed and there is no more work to do. Finished() <-chan struct{} // TransportProfile returns the profile for this transport. // This is only valid after the channel returned by Finished is closed. TransportProfile() flux.TransportProfile }
AsyncTransport is a Transport that performs its work in a separate goroutine.
type BoolValueFunc ¶
type BoolValueFunc interface {
ValueBool() bool
}
type Bounds ¶
func FromFluxBounds ¶ added in v0.153.0
type ChunkBuilder ¶ added in v0.172.0
type ChunkBuilder struct {
// contains filtered or unexported fields
}
func NewChunkBuilder ¶ added in v0.172.0
func (*ChunkBuilder) AppendRecord ¶ added in v0.172.0
func (b *ChunkBuilder) AppendRecord(record values.Object) error
type Closer ¶ added in v0.148.0
type Closer interface { // Close is invoked when the resource will no longer be used. Close() error }
Closer is an interface to be implemented for a resource that will be closed at a defined time.
type ColListTable ¶
type ColListTable struct {
// contains filtered or unexported fields
}
ColListTable implements Table using a list of columns. All data for the table is stored in RAM. As a result, At* methods are provided directly on the table for easy access.
func (*ColListTable) Cols ¶
func (t *ColListTable) Cols() []flux.ColMeta
func (*ColListTable) Done ¶ added in v0.31.0
func (t *ColListTable) Done()
func (*ColListTable) Empty ¶
func (t *ColListTable) Empty() bool
func (*ColListTable) Key ¶
func (t *ColListTable) Key() flux.GroupKey
func (*ColListTable) Len ¶
func (t *ColListTable) Len() int
func (*ColListTable) NRows ¶
func (t *ColListTable) NRows() int
func (*ColListTable) RefCount ¶
func (t *ColListTable) RefCount(n int)
func (*ColListTable) Release ¶ added in v0.31.0
func (t *ColListTable) Release()
func (*ColListTable) Retain ¶ added in v0.31.0
func (t *ColListTable) Retain()
type ColListTableBuilder ¶
type ColListTableBuilder struct {
// contains filtered or unexported fields
}
func NewColListTableBuilder ¶
func NewColListTableBuilder(key flux.GroupKey, a memory.Allocator) *ColListTableBuilder
func (*ColListTableBuilder) AddCol ¶
func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)
func (*ColListTableBuilder) AppendBool ¶
func (b *ColListTableBuilder) AppendBool(j int, value bool) error
func (*ColListTableBuilder) AppendBools ¶
func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error
func (*ColListTableBuilder) AppendFloat ¶
func (b *ColListTableBuilder) AppendFloat(j int, value float64) error
func (*ColListTableBuilder) AppendFloats ¶
func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float) error
func (*ColListTableBuilder) AppendInt ¶
func (b *ColListTableBuilder) AppendInt(j int, value int64) error
func (*ColListTableBuilder) AppendInts ¶
func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int) error
func (*ColListTableBuilder) AppendNil ¶ added in v0.14.0
func (b *ColListTableBuilder) AppendNil(j int) error
func (*ColListTableBuilder) AppendString ¶
func (b *ColListTableBuilder) AppendString(j int, value string) error
func (*ColListTableBuilder) AppendStrings ¶
func (b *ColListTableBuilder) AppendStrings(j int, vs *array.String) error
func (*ColListTableBuilder) AppendTime ¶
func (b *ColListTableBuilder) AppendTime(j int, value Time) error
func (*ColListTableBuilder) AppendTimes ¶
func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int) error
func (*ColListTableBuilder) AppendUInt ¶
func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error
func (*ColListTableBuilder) AppendUInts ¶
func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint) error
func (*ColListTableBuilder) AppendValue ¶
func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error
func (*ColListTableBuilder) Bools ¶ added in v0.10.0
func (b *ColListTableBuilder) Bools(j int) []bool
func (*ColListTableBuilder) ClearData ¶
func (b *ColListTableBuilder) ClearData()
func (*ColListTableBuilder) Cols ¶
func (b *ColListTableBuilder) Cols() []flux.ColMeta
func (*ColListTableBuilder) Floats ¶ added in v0.10.0
func (b *ColListTableBuilder) Floats(j int) []float64
func (*ColListTableBuilder) GetRow ¶ added in v0.10.0
func (b *ColListTableBuilder) GetRow(row int) values.Object
GetRow takes a row index and returns the record located at that index in the cache
func (*ColListTableBuilder) GrowBools ¶
func (b *ColListTableBuilder) GrowBools(j, n int) error
func (*ColListTableBuilder) GrowFloats ¶
func (b *ColListTableBuilder) GrowFloats(j, n int) error
func (*ColListTableBuilder) GrowInts ¶
func (b *ColListTableBuilder) GrowInts(j, n int) error
func (*ColListTableBuilder) GrowStrings ¶
func (b *ColListTableBuilder) GrowStrings(j, n int) error
func (*ColListTableBuilder) GrowTimes ¶
func (b *ColListTableBuilder) GrowTimes(j, n int) error
func (*ColListTableBuilder) GrowUInts ¶
func (b *ColListTableBuilder) GrowUInts(j, n int) error
func (*ColListTableBuilder) Ints ¶ added in v0.10.0
func (b *ColListTableBuilder) Ints(j int) []int64
func (*ColListTableBuilder) Key ¶
func (b *ColListTableBuilder) Key() flux.GroupKey
func (*ColListTableBuilder) Len ¶ added in v0.10.0
func (b *ColListTableBuilder) Len() int
func (*ColListTableBuilder) LevelColumns ¶
func (b *ColListTableBuilder) LevelColumns() error
func (*ColListTableBuilder) NCols ¶
func (b *ColListTableBuilder) NCols() int
func (*ColListTableBuilder) NRows ¶
func (b *ColListTableBuilder) NRows() int
func (*ColListTableBuilder) Release ¶ added in v0.48.0
func (b *ColListTableBuilder) Release()
func (*ColListTableBuilder) SetBool ¶
func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error
func (*ColListTableBuilder) SetFloat ¶
func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error
func (*ColListTableBuilder) SetInt ¶
func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error
func (*ColListTableBuilder) SetNil ¶ added in v0.14.0
func (b *ColListTableBuilder) SetNil(i, j int) error
func (*ColListTableBuilder) SetString ¶
func (b *ColListTableBuilder) SetString(i int, j int, value string) error
func (*ColListTableBuilder) SetTime ¶
func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error
func (*ColListTableBuilder) SetUInt ¶
func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error
func (*ColListTableBuilder) SetValue ¶
func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error
func (*ColListTableBuilder) SliceColumns ¶ added in v0.8.0
func (b *ColListTableBuilder) SliceColumns(start, stop int) error
SliceColumns iterates over each column of b and re-slices them to the range [start:stop].
func (*ColListTableBuilder) Sort ¶
func (b *ColListTableBuilder) Sort(cols []string, desc bool)
func (*ColListTableBuilder) Strings ¶ added in v0.10.0
func (b *ColListTableBuilder) Strings(j int) []string
func (*ColListTableBuilder) Times ¶ added in v0.10.0
func (b *ColListTableBuilder) Times(j int) []values.Time
func (*ColListTableBuilder) UInts ¶ added in v0.10.0
func (b *ColListTableBuilder) UInts(j int) []uint64
type CreateProfilerFunc ¶ added in v0.86.0
type CreateProfilerFunc func() Profiler
type CreateSource ¶
type CreateSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)
type CreateTransformation ¶
type CreateTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)
type DataCache ¶
type DataCache interface { Table(flux.GroupKey) (flux.Table, error) ForEach(func(flux.GroupKey) error) error ForEachWithContext(func(flux.GroupKey, Trigger, TableContext) error) error DiscardTable(flux.GroupKey) ExpireTable(flux.GroupKey) SetTriggerSpec(t plan.TriggerSpec) }
DataCache holds all working data for a transformation.
type Dataset ¶
type Dataset interface { Node RetractTable(key flux.GroupKey) error UpdateProcessingTime(t Time) error UpdateWatermark(mark Time) error Finish(error) SetTriggerSpec(t plan.TriggerSpec) }
Dataset represents the set of data produced by a transformation.
func NewIndexSelectorTransformationAndDataset ¶
func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, config SelectorConfig, a memory.Allocator) (*indexSelectorTransformation, Dataset)
func NewRowSelectorTransformationAndDataset ¶
func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, config SelectorConfig, a memory.Allocator) (*rowSelectorTransformation, Dataset)
type DatasetContext ¶ added in v0.114.0
DatasetContext represents a Dataset with a context.Context attached.
type Dispatcher ¶
type Dispatcher interface { // Schedule fn to be executed Schedule(fn ScheduleFunc) }
Dispatcher schedules work for a query. Each transformation submits work to be done to the dispatcher. Then the dispatcher schedules the work based on the available resources.
type DoBoolIndexSelector ¶
type DoBoolRowSelector ¶
type DoFloatAgg ¶
type DoFloatIndexSelector ¶
type DoFloatRowSelector ¶
type DoIntIndexSelector ¶
type DoIntRowSelector ¶
type DoStringAgg ¶
type DoStringIndexSelector ¶
type DoStringRowSelector ¶
type DoTimeIndexSelector ¶ added in v0.38.0
type DoTimeRowSelector ¶ added in v0.38.0
type DoUIntIndexSelector ¶
type DoUIntRowSelector ¶
type ExecutionDependencies ¶ added in v0.91.0
type ExecutionDependencies struct { // Must be set Allocator memory.Allocator Now *time.Time // Allowed to be nil Logger *zap.Logger // Metadata is passed up from any invocations of execution up to the parent // execution, and out through the statistics. Metadata metadata.Metadata ExecutionOptions *ExecutionOptions }
ExecutionDependencies represents the dependencies that a function call executed by the Interpreter needs in order to trigger the execution of a flux.Program
func DefaultExecutionDependencies ¶ added in v0.91.0
func DefaultExecutionDependencies() ExecutionDependencies
func GetExecutionDependencies ¶ added in v0.91.0
func GetExecutionDependencies(ctx context.Context) ExecutionDependencies
func NewExecutionDependencies ¶ added in v0.91.0
func NewExecutionDependencies(allocator memory.Allocator, now *time.Time, logger *zap.Logger) ExecutionDependencies
NewExecutionDependencies creates execution dependencies. Any argument may be nil, in which case a suitable default is chosen.
func (ExecutionDependencies) Inject ¶ added in v0.91.0
func (d ExecutionDependencies) Inject(ctx context.Context) context.Context
func (ExecutionDependencies) ResolveTimeable ¶ added in v0.148.0
ResolveTimeable returns the time represented by a value. The value's type must be Timeable, one of time or duration.
type ExecutionNode ¶ added in v0.86.0
type ExecutionNode struct {
// contains filtered or unexported fields
}
func (*ExecutionNode) Label ¶ added in v0.86.0
func (n *ExecutionNode) Label() string
func (*ExecutionNode) SetLabel ¶ added in v0.86.0
func (n *ExecutionNode) SetLabel(label string)
type ExecutionOptions ¶ added in v0.91.0
type ExecutionOptions struct { OperatorProfiler *OperatorProfiler Profilers []Profiler }
type Executor ¶
type Executor interface { // Execute will begin execution of the plan.Spec using the memory allocator. // This returns a mapping of names to the query results. // This will also return a channel for the Metadata from the query. The channel // may return zero or more values. The returned channel must not require itself to // be read so the executor must allocate enough space in the channel so if the channel // is unread that it will not block. Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error) }
func NewExecutor ¶
type FloatValueFunc ¶
type FloatValueFunc interface {
ValueFloat() float64
}
type FlushKeyMsg ¶ added in v0.125.0
type FormatOptions ¶
type FormatOptions struct { // RepeatHeaderCount is the number of rows to print before printing the header again. // If zero then the headers are not repeated. RepeatHeaderCount int NullRepresentation string }
func DefaultFormatOptions ¶
func DefaultFormatOptions() *FormatOptions
type Formatter ¶
type Formatter struct {
// contains filtered or unexported fields
}
Formatter writes a table to a Writer.
func NewFormatter ¶
func NewFormatter(tbl flux.Table, opts *FormatOptions) *Formatter
NewFormatter creates a Formatter for a given table. If opts is nil, the DefaultFormatOptions are used.
type GroupKeyBuilder ¶
type GroupKeyBuilder struct {
// contains filtered or unexported fields
}
GroupKeyBuilder is used to construct a GroupKey by keeping a mutable copy in memory.
func NewGroupKeyBuilder ¶
func NewGroupKeyBuilder(key flux.GroupKey) *GroupKeyBuilder
NewGroupKeyBuilder creates a new GroupKeyBuilder from an existing GroupKey. If the GroupKey passed is nil, a blank GroupKeyBuilder is constructed.
func (*GroupKeyBuilder) AddKeyValue ¶
func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder
AddKeyValue will add a new group key to the existing builder.
func (*GroupKeyBuilder) Build ¶
func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)
Build will construct the GroupKey. If there is any problem with the GroupKey (such as one of the columns not being a valid type), the error will be returned here.
func (*GroupKeyBuilder) Grow ¶
func (gkb *GroupKeyBuilder) Grow(n int)
Grow will grow the internal capacity of the group key to the given number.
func (*GroupKeyBuilder) Len ¶
func (gkb *GroupKeyBuilder) Len() int
Len returns the current length of the group key.
func (*GroupKeyBuilder) SetKeyValue ¶ added in v0.14.0
func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder
SetKeyValue will set an existing key/value to the given pair, or if key is not found, add a new group key to the existing builder.
type GroupLookup ¶
type GroupTransformation ¶ added in v0.128.0
type GroupTransformation interface { Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error Closer }
GroupTransformation is a transformation that can modify the group key. Other than modifying the group key, it is identical to a NarrowTransformation.
The main difference between this and NarrowTransformation is that NarrowTransformation will pass the FlushKeyMsg to the Dataset and GroupTransformation will swallow this Message.
type IndexSelector ¶
type IndexSelector interface { NewTimeSelector() DoTimeIndexSelector NewBoolSelector() DoBoolIndexSelector NewIntSelector() DoIntIndexSelector NewUIntSelector() DoUIntIndexSelector NewFloatSelector() DoFloatIndexSelector NewStringSelector() DoStringIndexSelector }
type IntValueFunc ¶
type IntValueFunc interface {
ValueInt() int64
}
type Message ¶
type Message interface { // Type returns the MessageType for this Message. Type() MessageType // SrcDatasetID is the DatasetID that produced this Message. SrcDatasetID() DatasetID // Ack is used to acknowledge that the Message was received // and terminated. A Message may be passed between various // Transport implementations. When the Ack is received, // this signals to the Message to release any memory it may // have retained. Ack() // Dup is used to duplicate the Message. // This is useful when the Message has to be sent to multiple // receivers from a single sender. Dup() Message }
Message is a message sent from one Dataset to another.
type MessageQueue ¶
MessageQueue provides a concurrency safe queue for messages. The queue must have a single consumer calling Pop.
type MessageType ¶
type MessageType int
const ( // RetractTableType is sent when the previous table for // a given group key should be retracted. RetractTableType MessageType = iota // ProcessType is sent when there is an entire flux.Table // ready to be processed from the upstream Dataset. ProcessType // UpdateWatermarkType is sent when there will be no more // points older than the watermark for any key. UpdateWatermarkType // UpdateProcessingTimeType is sent to update the current time. UpdateProcessingTimeType // FinishType is sent when there are no more messages from // the upstream Dataset or an upstream error occurred that // caused the execution to abort. FinishType // ProcessChunkType is sent when a new table.Chunk is ready to // be processed from the upstream Dataset. ProcessChunkType // FlushKeyType is sent when the upstream Dataset wishes // to flush the data associated with a key presently stored // in the Dataset. FlushKeyType )
type MetadataNode ¶ added in v0.21.0
MetadataNode is a node that has additional metadata that should be added to the result after it is processed.
type NarrowStateTransformation ¶ added in v0.130.0
type NarrowStateTransformation[T any] interface { // Process will process the TableView. Process(chunk table.Chunk, state T, d *TransportDataset, mem memory.Allocator) (T, bool, error) Closer }
NarrowStateTransformation is the same as a NarrowTransformation except that it retains state between processing buffers.
type NarrowTransformation ¶ added in v0.125.0
type NarrowTransformation interface { // Process will process the table.Chunk and send any output to the TransportDataset. Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error Closer }
NarrowTransformation implements a transformation that processes a table.Chunk and does not modify its group key.
type Node ¶
type Node interface {
AddTransformation(t Transformation)
}
type OperatorProfiler ¶ added in v0.86.0
type OperatorProfiler struct{}
func (*OperatorProfiler) GetSortedResult ¶ added in v0.91.0
func (o *OperatorProfiler) GetSortedResult(q flux.Query, alloc memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error)
GetSortedResult is identical to GetResult, except it calls Sort() on the ColListTableBuilder to make testing easier. sortKeys and desc are passed directly into the Sort() call.
func (*OperatorProfiler) Name ¶ added in v0.86.0
func (o *OperatorProfiler) Name() string
type ParallelOpts ¶ added in v0.157.0
type PassthroughDataset ¶ added in v0.49.0
type PassthroughDataset struct {
// contains filtered or unexported fields
}
PassthroughDataset is a Dataset that will passthrough the processed data to the next Transformation.
func NewPassthroughDataset ¶ added in v0.49.0
func NewPassthroughDataset(id DatasetID) *PassthroughDataset
NewPassthroughDataset constructs a new PassthroughDataset.
func (*PassthroughDataset) AddTransformation ¶ added in v0.49.0
func (d *PassthroughDataset) AddTransformation(t Transformation)
func (*PassthroughDataset) Finish ¶ added in v0.49.0
func (d *PassthroughDataset) Finish(err error)
func (*PassthroughDataset) Process ¶ added in v0.49.0
func (d *PassthroughDataset) Process(tbl flux.Table) error
func (*PassthroughDataset) RetractTable ¶ added in v0.49.0
func (d *PassthroughDataset) RetractTable(key flux.GroupKey) error
func (*PassthroughDataset) SetTriggerSpec ¶ added in v0.49.0
func (d *PassthroughDataset) SetTriggerSpec(t plan.TriggerSpec)
func (*PassthroughDataset) UpdateProcessingTime ¶ added in v0.49.0
func (d *PassthroughDataset) UpdateProcessingTime(t Time) error
func (*PassthroughDataset) UpdateWatermark ¶ added in v0.49.0
func (d *PassthroughDataset) UpdateWatermark(mark Time) error
type ProcessChunkMsg ¶ added in v0.125.0
type ProcessMsg ¶
type QueryProfiler ¶ added in v0.82.1
type QueryProfiler struct{}
func (*QueryProfiler) GetSortedResult ¶ added in v0.91.0
func (s *QueryProfiler) GetSortedResult(q flux.Query, alloc memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error)
GetSortedResult is identical to GetResult, except it calls Sort() on the ColListTableBuilder to make testing easier. sortKeys and desc are passed directly into the Sort() call.
func (*QueryProfiler) Name ¶ added in v0.82.1
func (s *QueryProfiler) Name() string
type RandomAccessGroupLookup ¶ added in v0.59.0
type RandomAccessGroupLookup = groupkey.RandomAccessLookup
func NewRandomAccessGroupLookup ¶ added in v0.59.0
func NewRandomAccessGroupLookup() *RandomAccessGroupLookup
NewRandomAccessGroupLookup constructs a RandomAccessGroupLookup.
type RetractTableMsg ¶
type RowJoinFn ¶ added in v0.175.0
type RowJoinFn struct {
// contains filtered or unexported fields
}
func NewRowJoinFn ¶ added in v0.175.0
func NewRowJoinFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowJoinFn
func (*RowJoinFn) ReturnType ¶ added in v0.175.0
type RowJoinPreparedFn ¶ added in v0.175.0
type RowJoinPreparedFn struct {
// contains filtered or unexported fields
}
type RowMapFn ¶
type RowMapFn struct {
// contains filtered or unexported fields
}
func NewRowMapFn ¶
func NewRowMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowMapFn
type RowMapPreparedFn ¶ added in v0.68.0
type RowMapPreparedFn struct {
// contains filtered or unexported fields
}
func (*RowMapPreparedFn) Type ¶ added in v0.68.0
func (f *RowMapPreparedFn) Type() semantic.MonoType
type RowPredicateFn ¶
type RowPredicateFn struct {
// contains filtered or unexported fields
}
func NewRowPredicateFn ¶
func NewRowPredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowPredicateFn
func (*RowPredicateFn) Prepare ¶
func (f *RowPredicateFn) Prepare(cols []flux.ColMeta) (*RowPredicatePreparedFn, error)
type RowPredicatePreparedFn ¶ added in v0.68.0
type RowPredicatePreparedFn struct {
// contains filtered or unexported fields
}
func (*RowPredicatePreparedFn) InferredInputType ¶ added in v0.68.0
func (f *RowPredicatePreparedFn) InferredInputType() semantic.MonoType
InferredInputType will return the inferred input type. This type may contain type variables and will only contain the properties that could be inferred from type inference.
func (*RowPredicatePreparedFn) InputType ¶ added in v0.68.0
func (f *RowPredicatePreparedFn) InputType() semantic.MonoType
InputType will return the prepared input type. This will be a fully realized type that was created after preparing the function with Prepare.
type RowReduceFn ¶ added in v0.23.0
type RowReduceFn struct {
// contains filtered or unexported fields
}
func NewRowReduceFn ¶ added in v0.23.0
func NewRowReduceFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowReduceFn
func (*RowReduceFn) Prepare ¶ added in v0.23.0
func (f *RowReduceFn) Prepare(cols []flux.ColMeta, reducerType map[string]semantic.MonoType) (*RowReducePreparedFn, error)
type RowReducePreparedFn ¶ added in v0.68.0
type RowReducePreparedFn struct {
// contains filtered or unexported fields
}
func (*RowReducePreparedFn) Type ¶ added in v0.68.0
func (f *RowReducePreparedFn) Type() semantic.MonoType
type RowSelector ¶
type RowSelector interface { NewTimeSelector() DoTimeRowSelector NewBoolSelector() DoBoolRowSelector NewIntSelector() DoIntRowSelector NewUIntSelector() DoUIntRowSelector NewFloatSelector() DoFloatRowSelector NewStringSelector() DoStringRowSelector }
type ScheduleFunc ¶
ScheduleFunc is a function that represents work to do. The throughput is the maximum number of messages to process for this scheduling.
type SelectorConfig ¶
type SelectorConfig struct { plan.DefaultCost Column string `json:"column"` }
func (SelectorConfig) PassThroughAttribute ¶ added in v0.177.0
func (c SelectorConfig) PassThroughAttribute(attrKey string) bool
PassThroughAttribute implements the PassThroughAttributer interface used by the planner. Selector functions preserve collation of their input rows.
type SimpleAggregate ¶ added in v0.127.0
type SimpleAggregate interface { NewBoolAgg() DoBoolAgg NewIntAgg() DoIntAgg NewUIntAgg() DoUIntAgg NewFloatAgg() DoFloatAgg NewStringAgg() DoStringAgg }
type SimpleAggregateConfig ¶ added in v0.127.0
type SimpleAggregateConfig struct { plan.DefaultCost Columns []string `json:"columns"` }
func (SimpleAggregateConfig) Copy ¶ added in v0.127.0
func (c SimpleAggregateConfig) Copy() SimpleAggregateConfig
func (SimpleAggregateConfig) PassThroughAttribute ¶ added in v0.177.0
func (c SimpleAggregateConfig) PassThroughAttribute(attrKey string) bool
PassThroughAttribute implements the PassThroughAttributer interface used by the planner. Aggregate functions preserve collation of their input rows, albeit trivially, since there can be only one row in each output table.
type Source ¶
func CreateSourceFromDecoder ¶ added in v0.17.0
func CreateSourceFromDecoder(decoder SourceDecoder, dsid DatasetID, a Administration) (Source, error)
CreateSourceFromDecoder takes an implementation of a SourceDecoder, as well as a dataset ID and Administration type and creates an execute.Source.
func CreateSourceFromIterator ¶ added in v0.49.0
func CreateSourceFromIterator(iterator SourceIterator, dsid DatasetID) (Source, error)
CreateSourceFromIterator takes an implementation of a SourceIterator as well as a dataset ID and creates an execute.Source.
type SourceDecoder ¶ added in v0.17.0
type SourceDecoder interface { Connect(ctx context.Context) error Fetch(ctx context.Context) (bool, error) Decode(ctx context.Context) (flux.Table, error) Close() error }
SourceDecoder is an interface that generalizes the process of retrieving data from an unspecified data source.
Connect implements the logic needed to connect directly to the data source.
Fetch implements a single fetch of data from the source (may be called multiple times). Should return false when there is no more data to retrieve.
Decode implements the process of marshaling the data returned by the source into a flux.Table type.
In executing the retrieval process, Connect is called once at the onset, and Fetch() and Decode() are then called iteratively until the data source is fully consumed.
type SourceIterator ¶ added in v0.49.0
type SourceIterator interface { // Do will invoke the Source and pass each materialized flux.Table // to the given function. Do(ctx context.Context, f func(flux.Table) error) error }
SourceIterator is an interface for iterating over flux.Table values in a source. It provides a common interface for creating an execute.Source in an iterative way.
type StreamContext ¶
type StreamContext interface {
Bounds() *Bounds
}
StreamContext represents necessary context for a single stream of query data.
type StringValueFunc ¶
type StringValueFunc interface {
ValueString() string
}
type TableBuilder ¶
type TableBuilder interface { Key() flux.GroupKey NRows() int NCols() int Cols() []flux.ColMeta // AddCol increases the size of the table by one column. // The index of the column is returned. AddCol(flux.ColMeta) (int, error) // Set sets the value at the specified coordinates // The rows and columns must exist before calling set, otherwise Set panics. SetValue(i, j int, value values.Value) error SetNil(i, j int) error // Append will add a single value to the end of a column. Will set the number of // rows in the table to the size of the new column. It's the caller's job to make sure // that the expected number of rows in each column is equal. AppendBool(j int, value bool) error AppendInt(j int, value int64) error AppendUInt(j int, value uint64) error AppendFloat(j int, value float64) error AppendString(j int, value string) error AppendTime(j int, value Time) error AppendValue(j int, value values.Value) error AppendNil(j int) error // AppendBools and similar functions will append multiple values to column j. As above, // it will set the number of rows in the table to the size of the new column. It's the // caller's job to make sure that the expected number of rows in each column is equal. AppendBools(j int, vs *array.Boolean) error AppendInts(j int, vs *array.Int) error AppendUInts(j int, vs *array.Uint) error AppendFloats(j int, vs *array.Float) error AppendStrings(j int, vs *array.String) error AppendTimes(j int, vs *array.Int) error // GrowBools and similar functions will extend column j by n zero-values for the respective type. // If the column has enough capacity, no reallocation is necessary. If the capacity is insufficient, // a new slice is allocated with 1.5*newCapacity. As with the Append functions, it is the // caller's job to make sure that the expected number of rows in each column is equal. 
GrowBools(j, n int) error GrowInts(j, n int) error GrowUInts(j, n int) error GrowFloats(j, n int) error GrowStrings(j, n int) error GrowTimes(j, n int) error // LevelColumns will check for columns that are too short and Grow them // so that each column is of uniform size. LevelColumns() error // Sort the rows of the table by the values of the columns in the order listed. Sort(cols []string, desc bool) // ClearData removes all rows, while preserving the column meta data. ClearData() // Release releases any extraneous memory that has been retained. Release() // Table returns the table that has been built. // Further modifications of the builder will not affect the returned table. Table() (flux.Table, error) }
TableBuilder builds tables that can be used multiple times.
type TableBuilderCache ¶
type TableBuilderCache interface { // TableBuilder returns an existing or new TableBuilder for the given meta data. // The boolean return value indicates if TableBuilder is new. TableBuilder(key flux.GroupKey) (TableBuilder, bool) ForEachBuilder(f func(flux.GroupKey, TableBuilder) error) error }
type TableContext ¶
type TablePredicateFn ¶ added in v0.34.1
type TablePredicateFn struct {
// contains filtered or unexported fields
}
func NewTablePredicateFn ¶ added in v0.34.1
func NewTablePredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) *TablePredicateFn
func (*TablePredicateFn) Prepare ¶ added in v0.34.1
func (f *TablePredicateFn) Prepare(tbl flux.Table) (*TablePredicatePreparedFn, error)
type TablePredicatePreparedFn ¶ added in v0.68.0
type TablePredicatePreparedFn struct {
// contains filtered or unexported fields
}
type Transformation ¶
type Transformation interface { RetractTable(id DatasetID, key flux.GroupKey) error // Process takes in one flux Table, performs data processing on it and // writes that table to a DataCache Process(id DatasetID, tbl flux.Table) error UpdateWatermark(id DatasetID, t Time) error UpdateProcessingTime(id DatasetID, t Time) error // Finish indicates that the Transformation is done processing. It is // the last method called on the Transformation Finish(id DatasetID, err error) }
Transformation represents functions that stream a set of tables, perform data processing on them, and produce an output stream of tables.
func NewTransformationFromTransport ¶ added in v0.148.0
func NewTransformationFromTransport(t Transport) Transformation
NewTransformationFromTransport will adapt a Transport to satisfy both the Transport and Transformation interfaces.
type TransformationSet ¶ added in v0.49.0
type TransformationSet []Transformation
TransformationSet is a group of transformations.
func (TransformationSet) Finish ¶ added in v0.49.0
func (ts TransformationSet) Finish(id DatasetID, err error)
func (TransformationSet) Process ¶ added in v0.49.0
func (ts TransformationSet) Process(id DatasetID, tbl flux.Table) error
func (TransformationSet) RetractTable ¶ added in v0.49.0
func (ts TransformationSet) RetractTable(id DatasetID, key flux.GroupKey) error
func (TransformationSet) UpdateProcessingTime ¶ added in v0.49.0
func (ts TransformationSet) UpdateProcessingTime(id DatasetID, time Time) error
func (TransformationSet) UpdateWatermark ¶ added in v0.49.0
func (ts TransformationSet) UpdateWatermark(id DatasetID, time Time) error
type Transport ¶
type Transport interface { // ProcessMessage will process a message in the Transport. // // Messages sent to the Transport may be one of many types. // Known messages should be handled as appropriate, but // unknown messages should be acked but otherwise ignored. // An error should not be returned for unknown messages. ProcessMessage(m Message) error }
Transport is an interface for handling raw messages.
func WrapTransformationInTransport ¶ added in v0.125.0
func WrapTransformationInTransport(t Transformation, mem memory.Allocator) Transport
WrapTransformationInTransport will wrap a Transformation into a Transport to be used for the execution engine.
type TransportDataset ¶ added in v0.125.0
type TransportDataset struct {
// contains filtered or unexported fields
}
TransportDataset holds data for a specific node and sends messages to downstream nodes using the Transport.
This Dataset also implements a shim for execute.Dataset so it can be integrated with the existing execution engine. These methods are stubs and do not do anything.
func NewTransportDataset ¶ added in v0.125.0
func NewTransportDataset(id DatasetID, mem memory.Allocator) *TransportDataset
NewTransportDataset constructs a TransportDataset.
func (*TransportDataset) AddTransformation ¶ added in v0.125.0
func (d *TransportDataset) AddTransformation(t Transformation)
AddTransformation is used to add downstream Transformation nodes to this Transport.
func (*TransportDataset) Delete ¶ added in v0.125.0
func (d *TransportDataset) Delete(key flux.GroupKey) (v interface{}, found bool)
func (*TransportDataset) Finish ¶ added in v0.125.0
func (d *TransportDataset) Finish(err error)
func (*TransportDataset) FlushKey ¶ added in v0.125.0
func (d *TransportDataset) FlushKey(key flux.GroupKey) error
FlushKey sends the flush key message to the downstream transports.
func (*TransportDataset) Lookup ¶ added in v0.125.0
func (d *TransportDataset) Lookup(key flux.GroupKey) (interface{}, bool)
func (*TransportDataset) LookupOrCreate ¶ added in v0.125.0
func (d *TransportDataset) LookupOrCreate(key flux.GroupKey, fn func() interface{}) interface{}
func (*TransportDataset) Process ¶ added in v0.125.0
func (d *TransportDataset) Process(chunk table.Chunk) error
Process sends the given Chunk to be processed by the downstream transports.
func (*TransportDataset) Range ¶ added in v0.127.0
func (d *TransportDataset) Range(f func(key flux.GroupKey, value interface{}) error) error
func (*TransportDataset) RetractTable ¶ added in v0.125.0
func (d *TransportDataset) RetractTable(key flux.GroupKey) error
func (*TransportDataset) Set ¶ added in v0.125.0
func (d *TransportDataset) Set(key flux.GroupKey, value interface{})
func (*TransportDataset) SetTriggerSpec ¶ added in v0.125.0
func (d *TransportDataset) SetTriggerSpec(t plan.TriggerSpec)
func (*TransportDataset) UpdateProcessingTime ¶ added in v0.125.0
func (d *TransportDataset) UpdateProcessingTime(t Time) error
func (*TransportDataset) UpdateWatermark ¶ added in v0.125.0
func (d *TransportDataset) UpdateWatermark(mark Time) error
type Trigger ¶
type Trigger interface { Triggered(TriggerContext) bool Finished() bool Reset() }
func NewTriggerFromSpec ¶
func NewTriggerFromSpec(spec plan.TriggerSpec) Trigger
type TriggerContext ¶
type TriggerContext struct { Table TableContext Watermark Time CurrentProcessingTime Time }
type UIntValueFunc ¶
type UIntValueFunc interface {
ValueUInt() uint64
}
type UpdateProcessingTimeMsg ¶
type UpdateWatermarkMsg ¶
type VectorMapFn ¶ added in v0.154.0
type VectorMapFn struct {
// contains filtered or unexported fields
}
func NewVectorMapFn ¶ added in v0.154.0
func NewVectorMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) *VectorMapFn
func (*VectorMapFn) Prepare ¶ added in v0.154.0
func (f *VectorMapFn) Prepare(cols []flux.ColMeta) (*VectorMapPreparedFn, error)
type VectorMapPreparedFn ¶ added in v0.154.0
type VectorMapPreparedFn struct {
// contains filtered or unexported fields
}
func (*VectorMapPreparedFn) Type ¶ added in v0.154.0
func (f *VectorMapPreparedFn) Type() semantic.MonoType
type Window ¶
func NewWindow ¶ added in v0.19.0
NewWindow creates a window with the given parameters, and normalizes the offset to a small positive duration. It also validates that the durations are valid when used within a window.
func (Window) GetEarliestBounds ¶ added in v0.19.0
GetEarliestBounds returns the bounds of the earliest window that contains the given time t. For underlapping windows that do not contain time t, the window directly after time t will be returned.
func (Window) GetOverlappingBounds ¶ added in v0.19.0
GetOverlappingBounds returns a slice of bounds for each window that overlaps the input bounds b.
Source Files ¶
- aggregate.go
- allocator.go
- bounds.go
- chunk_builder.go
- closer.go
- dataset.go
- dependencies.go
- dispatcher.go
- executor.go
- format.go
- group_key.go
- group_key_builder.go
- group_lookup.go
- group_transformation.go
- narrow_state_transformation.go
- narrow_transformation.go
- profiler.go
- queue.go
- recover.go
- result.go
- ring.go
- row_fn.go
- rowreader.go
- selector.go
- source.go
- source_iterator.go
- table.go
- transformation.go
- transport.go
- trigger.go
- vector_fn.go
- window.go
Directories ¶
| Path | Synopsis |
|---|---|
| executetest | Package executetest contains utilities for testing the query execution phase. |
| static | Package static provides utilities for easily constructing static tables that are meant for tests. |