Documentation ¶
Overview ¶
Package executetest contains utilities for testing the query execution phase.
Index ¶
- Constants
- Variables
- func AggFuncBenchmarkHelper(b *testing.B, agg execute.SimpleAggregate, data *array.Float, want interface{})
- func AggFuncTestHelper(t *testing.T, agg execute.SimpleAggregate, data *array.Float, want interface{})
- func CreateAllocatingFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)
- func CreateFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)
- func CreateParallelFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)
- func CreateToTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, ...) (execute.Transformation, execute.Dataset, error)
- func EqualResult(w, g flux.Result) error
- func EqualResultIterators(want, got flux.ResultIterator) error
- func EqualResults(want, got []flux.Result) error
- func FunctionExpression(t testing.TB, source string, args ...interface{}) *semantic.FunctionExpression
- func IndexSelectorFuncBenchmarkHelper(b *testing.B, selector execute.IndexSelector, data flux.BufferedTable)
- func IndexSelectorFuncTestHelper(t *testing.T, selector execute.IndexSelector, data flux.Table, want [][]int)
- func MustCopyTable(tbl flux.Table) flux.Table
- func NewDefaultTestFlagger() feature.Dependency
- func NewDevNullStore() execute.Transformation
- func NewTestExecuteDependencies() flux.Dependency
- func NewToProcedure(flux.OperationSpec, plan.Administration) (plan.ProcedureSpec, error)
- func NewYieldProcedureSpec(name string) plan.PhysicalProcedureSpec
- func NewYieldTransformation(d execute.Dataset, cache execute.TableBuilderCache) *yieldTransformation
- func NormalizeTables(bs []*Table)
- func ProcessBenchmarkHelper(b *testing.B, ...)
- func ProcessTestHelper(t *testing.T, data []flux.Table, want []*Table, wantErr error, ...)
- func ProcessTestHelper2(t *testing.T, data []flux.Table, want []*Table, wantErr error, ...)
- func RandomDatasetID() execute.DatasetID
- func RowSelectorFuncBenchmarkHelper(b *testing.B, selector execute.RowSelector, data flux.BufferedTable)
- func RowSelectorFuncTestHelper(t *testing.T, selector execute.RowSelector, data flux.Table, ...)
- func RunBenchmark(b *testing.B, tables []flux.BufferedTable, ...)
- func RunSourceHelper(t *testing.T, ctx context.Context, want []*Table, wantErr error, ...)
- func RunTableTests(t *testing.T, tt TableTest)
- func TransformationPassThroughTestHelper(t *testing.T, newTr NewTransformation)
- type AllocatingFromProcedureSpec
- func (s *AllocatingFromProcedureSpec) AddTransformation(t execute.Transformation)
- func (s *AllocatingFromProcedureSpec) Copy() plan.ProcedureSpec
- func (AllocatingFromProcedureSpec) Cost(inStats []plan.Statistics) (cost plan.Cost, outStats plan.Statistics)
- func (AllocatingFromProcedureSpec) Kind() plan.ProcedureKind
- func (s *AllocatingFromProcedureSpec) Run(ctx context.Context)
- type ColReader
- func (cr *ColReader) Bools(j int) *array.Boolean
- func (cr *ColReader) Cols() []flux.ColMeta
- func (cr *ColReader) Floats(j int) *array.Float
- func (cr *ColReader) Ints(j int) *array.Int
- func (cr *ColReader) Key() flux.GroupKey
- func (cr *ColReader) Len() int
- func (cr *ColReader) Release()
- func (cr *ColReader) Retain()
- func (cr *ColReader) Strings(j int) *array.String
- func (cr *ColReader) Times(j int) *array.Int
- func (cr *ColReader) UInts(j int) *array.Uint
- type CreateNewTransformationWithDeps
- type DataStore
- func (d *DataStore) DiscardTable(key flux.GroupKey)
- func (d *DataStore) Err() error
- func (d *DataStore) ExpireTable(key flux.GroupKey)
- func (d *DataStore) Finish(id execute.DatasetID, err error)
- func (d *DataStore) ForEach(f func(key flux.GroupKey) error) error
- func (d *DataStore) ForEachWithContext(f func(flux.GroupKey, execute.Trigger, execute.TableContext) error) error
- func (d *DataStore) Process(id execute.DatasetID, tbl flux.Table) error
- func (d *DataStore) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (d *DataStore) SetTriggerSpec(t plan.TriggerSpec)
- func (d *DataStore) Table(key flux.GroupKey) (flux.Table, error)
- func (d *DataStore) UpdateProcessingTime(id execute.DatasetID, t execute.Time) error
- func (d *DataStore) UpdateWatermark(id execute.DatasetID, t execute.Time) error
- type Dataset
- func (d *Dataset) AddTransformation(t execute.Transformation)
- func (d *Dataset) Finish(err error)
- func (d *Dataset) RetractTable(key flux.GroupKey) error
- func (d *Dataset) SetTriggerSpec(t plan.TriggerSpec)
- func (d *Dataset) UpdateProcessingTime(t execute.Time) error
- func (d *Dataset) UpdateWatermark(mark execute.Time) error
- type FromProcedureSpec
- func (src *FromProcedureSpec) AddTransformation(t execute.Transformation)
- func (src *FromProcedureSpec) Copy() plan.ProcedureSpec
- func (src *FromProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics)
- func (src *FromProcedureSpec) Kind() plan.ProcedureKind
- func (src *FromProcedureSpec) Run(ctx context.Context)
- type NewTransformation
- type ParallelFromProcedureSpec
- func (src *ParallelFromProcedureSpec) AddTransformation(t execute.Transformation)
- func (src *ParallelFromProcedureSpec) Copy() plan.ProcedureSpec
- func (src *ParallelFromProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics)
- func (src *ParallelFromProcedureSpec) Kind() plan.ProcedureKind
- func (src *ParallelFromProcedureSpec) OutputAttributes() plan.PhysicalAttributes
- func (src *ParallelFromProcedureSpec) Run(ctx context.Context)
- type ParallelTable
- type Result
- type RowWiseTable
- type SortedTables
- type SourceUrlValidationTestCases
- type Table
- type TableIterator
- type TableTest
- type TestFlagger
- type TfUrlValidationTest
- type TfUrlValidationTestCase
- type ToProcedureSpec
- type ToTransformation
- func (t *ToTransformation) Finish(id execute.DatasetID, err error)
- func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error
- func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error
- func (t *ToTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error
- type YieldProcedureSpec
Constants ¶
const AllocatingFromTestKind = "allocating-from-test"
const FromTestKind = "from-test"
const ParallelFromTestKind = "parallel-from-test"
const ParallelGroupColName = "_parallel_group"
const ToTestKind = "to-test"
ToTestKind represents an side-effect producing kind for testing
const YieldKind = "yield-test"
Variables ¶
var UnlimitedAllocator = &memory.ResourceAllocator{}
Functions ¶
func AggFuncBenchmarkHelper ¶
func AggFuncBenchmarkHelper(b *testing.B, agg execute.SimpleAggregate, data *array.Float, want interface{})
AggFuncBenchmarkHelper benchmarks the aggregate function over data and compares to wantValue
func AggFuncTestHelper ¶
func AggFuncTestHelper(t *testing.T, agg execute.SimpleAggregate, data *array.Float, want interface{})
AggFuncTestHelper splits the data in half, runs Do over each split and compares the Value to want.
func CreateAllocatingFromSource ¶ added in v0.28.3
func CreateAllocatingFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)
func CreateFromSource ¶
func CreateFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)
func CreateParallelFromSource ¶ added in v0.157.0
func CreateParallelFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)
func CreateToTransformation ¶ added in v0.8.0
func CreateToTransformation(id execute.DatasetID, mode execute.AccumulationMode, spec plan.ProcedureSpec, a execute.Administration) (execute.Transformation, execute.Dataset, error)
func EqualResult ¶ added in v0.36.2
EqualResult compares to results for equality
func EqualResultIterators ¶ added in v0.36.2
func EqualResultIterators(want, got flux.ResultIterator) error
EqualResultIterators compares two ResultIterators for equality
func EqualResults ¶ added in v0.7.1
EqualResults compares two lists of Flux Results for equality
func FunctionExpression ¶ added in v0.68.0
func FunctionExpression(t testing.TB, source string, args ...interface{}) *semantic.FunctionExpression
FunctionExpression will take a function expression as a string and return the *semantic.FunctionExpression.
This will cause a fatal error in the test on failure.
func IndexSelectorFuncBenchmarkHelper ¶
func IndexSelectorFuncBenchmarkHelper(b *testing.B, selector execute.IndexSelector, data flux.BufferedTable)
func NewDefaultTestFlagger ¶ added in v0.185.0
func NewDefaultTestFlagger() feature.Dependency
NewDefaultTestFlagger gives a flagger dependency for a test harnesses to use as a baseline.
Likely this will be made redundant by <https://github.com/influxdata/flux/issues/4777> since testcases will then be able to manage their own feature selection.
func NewDevNullStore ¶ added in v0.55.0
func NewDevNullStore() execute.Transformation
func NewTestExecuteDependencies ¶ added in v0.41.0
func NewTestExecuteDependencies() flux.Dependency
func NewToProcedure ¶ added in v0.8.0
func NewToProcedure(flux.OperationSpec, plan.Administration) (plan.ProcedureSpec, error)
func NewYieldProcedureSpec ¶
func NewYieldProcedureSpec(name string) plan.PhysicalProcedureSpec
func NewYieldTransformation ¶ added in v0.21.0
func NewYieldTransformation(d execute.Dataset, cache execute.TableBuilderCache) *yieldTransformation
func NormalizeTables ¶
func NormalizeTables(bs []*Table)
NormalizeTables ensures that each table is normalized and that tables and columns are sorted in alphabetical order for consistent testing
func ProcessBenchmarkHelper ¶ added in v0.55.0
func ProcessTestHelper ¶
func ProcessTestHelper( t *testing.T, data []flux.Table, want []*Table, wantErr error, create func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation, floatOptions ...cmp.Option, )
floatOptions is a list of comparison options for floating point values. if not passed by the caller, defaultFloatOptions will be used
func ProcessTestHelper2 ¶ added in v0.49.0
func ProcessTestHelper2( t *testing.T, data []flux.Table, want []*Table, wantErr error, create func(id execute.DatasetID, alloc memory.Allocator) (execute.Transformation, execute.Dataset), floatOptions ...cmp.Option, )
floatOptions is a list of comparison options for floating point values. if not passed by the caller, defaultFloatOptions will be used
func RandomDatasetID ¶
func RowSelectorFuncBenchmarkHelper ¶
func RowSelectorFuncBenchmarkHelper(b *testing.B, selector execute.RowSelector, data flux.BufferedTable)
func RunBenchmark ¶ added in v0.59.0
func RunSourceHelper ¶ added in v0.66.0
func RunSourceHelper( t *testing.T, ctx context.Context, want []*Table, wantErr error, create func(id execute.DatasetID) execute.Source, )
RunSourceHelper is a helper for testing an execute.Source. This can be called with a list of wanted tables from the source. The create function should create the source. If there is an error creating the source, `t.Fatal` can be called to abort the unit test by calling it from inside of a closure.
func RunTableTests ¶ added in v0.36.0
RunTableTests will run the common table tests over each table in the returned TableIterator. The function will be called for each test.
func TransformationPassThroughTestHelper ¶
func TransformationPassThroughTestHelper(t *testing.T, newTr NewTransformation)
Types ¶
type AllocatingFromProcedureSpec ¶ added in v0.28.3
type AllocatingFromProcedureSpec struct { execute.ExecutionNode ByteCount int // contains filtered or unexported fields }
AllocatingFromProcedureSpec is a procedure spec AND an execution node that allocates ByteCount bytes during execution.
func (*AllocatingFromProcedureSpec) AddTransformation ¶ added in v0.28.3
func (s *AllocatingFromProcedureSpec) AddTransformation(t execute.Transformation)
func (*AllocatingFromProcedureSpec) Copy ¶ added in v0.28.3
func (s *AllocatingFromProcedureSpec) Copy() plan.ProcedureSpec
func (AllocatingFromProcedureSpec) Cost ¶ added in v0.28.3
func (AllocatingFromProcedureSpec) Cost(inStats []plan.Statistics) (cost plan.Cost, outStats plan.Statistics)
func (AllocatingFromProcedureSpec) Kind ¶ added in v0.28.3
func (AllocatingFromProcedureSpec) Kind() plan.ProcedureKind
func (*AllocatingFromProcedureSpec) Run ¶ added in v0.28.3
func (s *AllocatingFromProcedureSpec) Run(ctx context.Context)
type CreateNewTransformationWithDeps ¶ added in v0.51.0
type CreateNewTransformationWithDeps func(d execute.Dataset, deps flux.Dependencies, cache execute.TableBuilderCache, spec plan.ProcedureSpec) (execute.Transformation, error)
sql.createToSQLTransformation() and kafka.createToKafkaTransformation() converts plan.ProcedureSpec to their struct implementations ToSQLProcedureSpec and ToKafkaProcedureSpec respectively. This complicated the test harness requiring us to provide CreateNewTransformationWithDeps functions to do the plan.ProcedureSpec conversion and call the subsequent factory method namely: kafka.NewToKafkaTransformation() and sql.NewToSQLTransformation() See also: sql/to_test.go/TestToSql_NewTransformation and kafka/to_test.go/TestToKafka_NewTransformation
type DataStore ¶ added in v0.55.0
type DataStore struct { execute.ExecutionNode // contains filtered or unexported fields }
DataStore will store the incoming tables from an upstream transformation or source.
func NewDataStore ¶ added in v0.55.0
func NewDataStore() *DataStore
func (*DataStore) DiscardTable ¶ added in v0.55.0
func (*DataStore) ExpireTable ¶ added in v0.55.0
func (*DataStore) ForEachWithContext ¶ added in v0.55.0
func (*DataStore) RetractTable ¶ added in v0.55.0
func (*DataStore) SetTriggerSpec ¶ added in v0.55.0
func (d *DataStore) SetTriggerSpec(t plan.TriggerSpec)
func (*DataStore) UpdateProcessingTime ¶ added in v0.55.0
type Dataset ¶
type Dataset struct { ID execute.DatasetID Retractions []flux.GroupKey ProcessingTimeUpdates []execute.Time WatermarkUpdates []execute.Time Finished bool FinishedErr error }
func NewDataset ¶
func (*Dataset) AddTransformation ¶
func (d *Dataset) AddTransformation(t execute.Transformation)
func (*Dataset) SetTriggerSpec ¶
func (d *Dataset) SetTriggerSpec(t plan.TriggerSpec)
type FromProcedureSpec ¶
type FromProcedureSpec struct { execute.ExecutionNode // contains filtered or unexported fields }
FromProcedureSpec is a procedure spec AND an execution Node. It simulates the execution of a basic physical scan operation.
func NewFromProcedureSpec ¶
func NewFromProcedureSpec(data []*Table) *FromProcedureSpec
NewFromProcedureSpec specifies a from-test procedure with source data
func (*FromProcedureSpec) AddTransformation ¶
func (src *FromProcedureSpec) AddTransformation(t execute.Transformation)
func (*FromProcedureSpec) Copy ¶
func (src *FromProcedureSpec) Copy() plan.ProcedureSpec
func (*FromProcedureSpec) Cost ¶
func (src *FromProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics)
func (*FromProcedureSpec) Kind ¶
func (src *FromProcedureSpec) Kind() plan.ProcedureKind
func (*FromProcedureSpec) Run ¶
func (src *FromProcedureSpec) Run(ctx context.Context)
type NewTransformation ¶
type NewTransformation func(execute.Dataset, execute.TableBuilderCache) execute.Transformation
type ParallelFromProcedureSpec ¶ added in v0.157.0
type ParallelFromProcedureSpec struct { execute.ExecutionNode // contains filtered or unexported fields }
ParallelFromProcedureSpec is a procedure spec AND an execution Node similar to FromProcedureSpec. It differs in that it is aware of the possibility for parallel execution.
func NewParallelFromProcedureSpec ¶ added in v0.157.0
func NewParallelFromProcedureSpec(factor int, data []*ParallelTable) *ParallelFromProcedureSpec
NewParallelFromProcedureSpec specifies a from-test procedure with source data
func (*ParallelFromProcedureSpec) AddTransformation ¶ added in v0.157.0
func (src *ParallelFromProcedureSpec) AddTransformation(t execute.Transformation)
func (*ParallelFromProcedureSpec) Copy ¶ added in v0.157.0
func (src *ParallelFromProcedureSpec) Copy() plan.ProcedureSpec
func (*ParallelFromProcedureSpec) Cost ¶ added in v0.157.0
func (src *ParallelFromProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics)
func (*ParallelFromProcedureSpec) Kind ¶ added in v0.157.0
func (src *ParallelFromProcedureSpec) Kind() plan.ProcedureKind
func (*ParallelFromProcedureSpec) OutputAttributes ¶ added in v0.175.0
func (src *ParallelFromProcedureSpec) OutputAttributes() plan.PhysicalAttributes
func (*ParallelFromProcedureSpec) Run ¶ added in v0.157.0
func (src *ParallelFromProcedureSpec) Run(ctx context.Context)
type ParallelTable ¶ added in v0.157.0
type Result ¶
func ConvertResult ¶ added in v0.36.2
ConvertResult produces a result object from any flux.Result type.
func (*Result) Tables ¶
func (r *Result) Tables() flux.TableIterator
type RowWiseTable ¶ added in v0.14.0
type RowWiseTable struct {
*Table
}
RowWiseTable is a flux Table implementation that calls f once for each row in its Do method.
type SortedTables ¶
type SortedTables []*Table
func (SortedTables) Len ¶
func (b SortedTables) Len() int
func (SortedTables) Swap ¶
func (b SortedTables) Swap(i int, j int)
type SourceUrlValidationTestCases ¶ added in v0.51.0
type SourceUrlValidationTestCases []struct { Name string Spec plan.ProcedureSpec V url.Validator ErrMsg string }
Some sources are located by a URL. e.g. sql.from, socket.from the URL/DSN supplied by the user need to be validated by a URLValidator{} before we can establish the connection. This struct (as well as the Run() method) acts as a test harness for that.
func (*SourceUrlValidationTestCases) Run ¶ added in v0.51.0
func (testCases *SourceUrlValidationTestCases) Run(t *testing.T, fn execute.CreateSource)
type Table ¶
type Table struct { // GroupKey of the table. Does not need to be set explicitly. GroupKey flux.GroupKey // KeyCols is a list of column that are part of the group key. // The column type is deduced from the ColMeta slice. KeyCols []string // KeyValues is a list of values for the group key columns. // Only needs to be set when no data is present on the table. KeyValues []interface{} // ColMeta is a list of columns of the table. ColMeta []flux.ColMeta // Data is a list of rows, i.e. Data[row][col] // Each row must be a list with length equal to len(ColMeta) Data [][]interface{} // Err contains the error that should be returned // by this table when calling Do. Err error // Alloc is the allocator used to create the column readers. // Memory is not tracked unless this is set. Alloc memory.Allocator // IsDone indicates if this table has been used. IsDone bool }
Table is an implementation of execute.Table It is designed to make it easy to statically declare the data within the table. Not all fields need to be set. See comments on each field. Use Normalize to ensure that all fields are set before equality comparisons.
type TableIterator ¶
type TableTest ¶ added in v0.36.0
type TableTest struct { // NewFn returns a new TableIterator that can be processed. // The table iterator that is produced should have multiple // tables of different shapes and sizes to get coverage of // as much of the code as possible. The TableIterator will // be created once for each subtest. NewFn func(ctx context.Context, alloc memory.Allocator) flux.TableIterator // IsDone will report if the table is considered done for reading. // The call to Done should force this to be true, but it is possible // for this to return true before the table has been processed. IsDone func(flux.Table) bool }
type TestFlagger ¶ added in v0.152.0
type TestFlagger map[string]interface{}
type TfUrlValidationTest ¶ added in v0.51.0
type TfUrlValidationTest struct { CreateFn CreateNewTransformationWithDeps Cases []TfUrlValidationTestCase }
func (*TfUrlValidationTest) Run ¶ added in v0.51.0
func (test *TfUrlValidationTest) Run(t *testing.T)
type TfUrlValidationTestCase ¶ added in v0.51.0
type ToProcedureSpec ¶ added in v0.8.0
type ToProcedureSpec struct{}
ToProcedureSpec defines an output operation. That is, an operation that does not transform its input data but performs a side effect while passing its input data through to the next op.
func (*ToProcedureSpec) Copy ¶ added in v0.8.0
func (s *ToProcedureSpec) Copy() plan.ProcedureSpec
func (*ToProcedureSpec) Cost ¶ added in v0.8.0
func (s *ToProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics)
func (*ToProcedureSpec) Kind ¶ added in v0.8.0
func (s *ToProcedureSpec) Kind() plan.ProcedureKind
type ToTransformation ¶ added in v0.8.0
type ToTransformation struct { execute.ExecutionNode // contains filtered or unexported fields }
ToTransformation simulates an output or an identity transformation
func (*ToTransformation) Finish ¶ added in v0.8.0
func (t *ToTransformation) Finish(id execute.DatasetID, err error)
func (*ToTransformation) RetractTable ¶ added in v0.8.0
func (*ToTransformation) UpdateProcessingTime ¶ added in v0.8.0
func (*ToTransformation) UpdateWatermark ¶ added in v0.8.0
type YieldProcedureSpec ¶
type YieldProcedureSpec struct { plan.DefaultCost // contains filtered or unexported fields }
func (YieldProcedureSpec) Copy ¶
func (y YieldProcedureSpec) Copy() plan.ProcedureSpec
func (YieldProcedureSpec) Kind ¶
func (YieldProcedureSpec) Kind() plan.ProcedureKind
func (YieldProcedureSpec) YieldName ¶
func (y YieldProcedureSpec) YieldName() string