executetest

package
v0.195.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2024 License: MIT Imports: 33 Imported by: 2

Documentation

Overview

Package executetest contains utilities for testing the query execution phase.

Index

Constants

View Source
const AllocatingFromTestKind = "allocating-from-test"
View Source
const FromTestKind = "from-test"
View Source
const ParallelFromTestKind = "parallel-from-test"
View Source
const ParallelGroupColName = "_parallel_group"
View Source
const ToTestKind = "to-test"

ToTestKind represents a side-effect-producing kind for testing.

View Source
const YieldKind = "yield-test"

Variables

View Source
var UnlimitedAllocator = &memory.ResourceAllocator{
	Allocator: arrowmem.DefaultAllocator,
}

Functions

func AggFuncBenchmarkHelper

func AggFuncBenchmarkHelper(b *testing.B, agg execute.SimpleAggregate, data *array.Float, want interface{})

AggFuncBenchmarkHelper benchmarks the aggregate function over data and compares the result to want.

func AggFuncTestHelper

func AggFuncTestHelper(t *testing.T, agg execute.SimpleAggregate, data *array.Float, want interface{})

AggFuncTestHelper splits the data in half, runs Do over each split and compares the Value to want.

func CreateAllocatingFromSource added in v0.28.3

func CreateAllocatingFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)

func CreateParallelFromSource added in v0.157.0

func CreateParallelFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error)

func EqualResult added in v0.36.2

func EqualResult(w, g flux.Result) error

EqualResult compares two results for equality.

func EqualResultIterators added in v0.36.2

func EqualResultIterators(want, got flux.ResultIterator) error

EqualResultIterators compares two ResultIterators for equality

func EqualResults added in v0.7.1

func EqualResults(want, got []flux.Result) error

EqualResults compares two lists of Flux Results for equality

func FunctionExpression added in v0.68.0

func FunctionExpression(t testing.TB, source string, args ...interface{}) *semantic.FunctionExpression

FunctionExpression will take a function expression as a string and return the *semantic.FunctionExpression.

This will cause a fatal error in the test on failure.

func IndexSelectorFuncBenchmarkHelper

func IndexSelectorFuncBenchmarkHelper(b *testing.B, selector execute.IndexSelector, data flux.BufferedTable)

func IndexSelectorFuncTestHelper

func IndexSelectorFuncTestHelper(t *testing.T, selector execute.IndexSelector, data flux.Table, want [][]int)

func MustCopyTable

func MustCopyTable(tbl flux.Table) flux.Table

func NewDefaultTestFlagger added in v0.185.0

func NewDefaultTestFlagger() feature.Dependency

NewDefaultTestFlagger gives a flagger dependency for test harnesses to use as a baseline.

Likely this will be made redundant by <https://github.com/influxdata/flux/issues/4777> since testcases will then be able to manage their own feature selection.

func NewDevNullStore added in v0.55.0

func NewDevNullStore() execute.Transformation

func NewTestExecuteDependencies added in v0.41.0

func NewTestExecuteDependencies() flux.Dependency

func NewToProcedure added in v0.8.0

func NewYieldProcedureSpec

func NewYieldProcedureSpec(name string) plan.PhysicalProcedureSpec

func NewYieldTransformation added in v0.21.0

func NewYieldTransformation(d execute.Dataset, cache execute.TableBuilderCache) *yieldTransformation

func NormalizeTables

func NormalizeTables(bs []*Table)

NormalizeTables ensures that each table is normalized and that tables and columns are sorted in alphabetical order for consistent testing

func ProcessBenchmarkHelper added in v0.55.0

func ProcessBenchmarkHelper(
	b *testing.B,
	genInput func(alloc memory.Allocator) (flux.TableIterator, error),
	create func(id execute.DatasetID, alloc memory.Allocator) (execute.Transformation, execute.Dataset),
)

func ProcessTestHelper

func ProcessTestHelper(
	t *testing.T,
	data []flux.Table,
	want []*Table,
	wantErr error,
	create func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation,
	floatOptions ...cmp.Option,
)

floatOptions is a list of comparison options for floating point values. If not passed by the caller, defaultFloatOptions will be used.

func ProcessTestHelper2 added in v0.49.0

func ProcessTestHelper2(
	t *testing.T,
	data []flux.Table,
	want []*Table,
	wantErr error,
	create func(id execute.DatasetID, alloc memory.Allocator) (execute.Transformation, execute.Dataset),
	floatOptions ...cmp.Option,
)

floatOptions is a list of comparison options for floating point values. If not passed by the caller, defaultFloatOptions will be used.

func RandomDatasetID

func RandomDatasetID() execute.DatasetID

func RowSelectorFuncBenchmarkHelper

func RowSelectorFuncBenchmarkHelper(b *testing.B, selector execute.RowSelector, data flux.BufferedTable)

func RowSelectorFuncTestHelper

func RowSelectorFuncTestHelper(t *testing.T, selector execute.RowSelector, data flux.Table, want []execute.Row)

func RunBenchmark added in v0.59.0

func RunBenchmark(
	b *testing.B,
	tables []flux.BufferedTable,
	create func(id execute.DatasetID, alloc memory.Allocator) (execute.Transformation, execute.Dataset),
	alloc memory.Allocator,
)

func RunSourceHelper added in v0.66.0

func RunSourceHelper(
	t *testing.T,
	ctx context.Context,
	want []*Table,
	wantErr error,
	create func(id execute.DatasetID) execute.Source,
)

RunSourceHelper is a helper for testing an execute.Source. This can be called with a list of wanted tables from the source. The create function should create the source. If there is an error creating the source, the create closure can call `t.Fatal` to abort the unit test.

func RunTableTests added in v0.36.0

func RunTableTests(t *testing.T, tt TableTest)

RunTableTests will run the common table tests over each table in the returned TableIterator. The NewFn function of tt will be called once for each test.

func TransformationPassThroughTestHelper

func TransformationPassThroughTestHelper(t *testing.T, newTr NewTransformation)

Types

type AllocatingFromProcedureSpec added in v0.28.3

type AllocatingFromProcedureSpec struct {
	execute.ExecutionNode
	ByteCount int
	// contains filtered or unexported fields
}

AllocatingFromProcedureSpec is a procedure spec AND an execution node that allocates ByteCount bytes during execution.

func (*AllocatingFromProcedureSpec) AddTransformation added in v0.28.3

func (s *AllocatingFromProcedureSpec) AddTransformation(t execute.Transformation)

func (*AllocatingFromProcedureSpec) Copy added in v0.28.3

func (AllocatingFromProcedureSpec) Cost added in v0.28.3

func (AllocatingFromProcedureSpec) Cost(inStats []plan.Statistics) (cost plan.Cost, outStats plan.Statistics)

func (AllocatingFromProcedureSpec) Kind added in v0.28.3

func (*AllocatingFromProcedureSpec) Run added in v0.28.3

type ColReader

type ColReader struct {
	// contains filtered or unexported fields
}

func (*ColReader) Bools

func (cr *ColReader) Bools(j int) *array.Boolean

func (*ColReader) Cols

func (cr *ColReader) Cols() []flux.ColMeta

func (*ColReader) Floats

func (cr *ColReader) Floats(j int) *array.Float

func (*ColReader) Ints

func (cr *ColReader) Ints(j int) *array.Int

func (*ColReader) Key

func (cr *ColReader) Key() flux.GroupKey

func (*ColReader) Len

func (cr *ColReader) Len() int

func (*ColReader) Release added in v0.31.0

func (cr *ColReader) Release()

func (*ColReader) Retain added in v0.31.0

func (cr *ColReader) Retain()

func (*ColReader) Strings

func (cr *ColReader) Strings(j int) *array.String

func (*ColReader) Times

func (cr *ColReader) Times(j int) *array.Int

func (*ColReader) UInts

func (cr *ColReader) UInts(j int) *array.Uint

type CreateNewTransformationWithDeps added in v0.51.0

type CreateNewTransformationWithDeps func(d execute.Dataset, deps flux.Dependencies,
	cache execute.TableBuilderCache, spec plan.ProcedureSpec) (execute.Transformation, error)

sql.createToSQLTransformation() and kafka.createToKafkaTransformation() convert plan.ProcedureSpec to their struct implementations, ToSQLProcedureSpec and ToKafkaProcedureSpec respectively. This complicates the test harness, requiring us to provide CreateNewTransformationWithDeps functions that do the plan.ProcedureSpec conversion and call the subsequent factory method, namely kafka.NewToKafkaTransformation() and sql.NewToSQLTransformation(). See also: sql/to_test.go/TestToSql_NewTransformation and kafka/to_test.go/TestToKafka_NewTransformation.

type DataStore added in v0.55.0

type DataStore struct {
	execute.ExecutionNode
	// contains filtered or unexported fields
}

DataStore will store the incoming tables from an upstream transformation or source.

func NewDataStore added in v0.55.0

func NewDataStore() *DataStore

func (*DataStore) DiscardTable added in v0.55.0

func (d *DataStore) DiscardTable(key flux.GroupKey)

func (*DataStore) Err added in v0.55.0

func (d *DataStore) Err() error

func (*DataStore) ExpireTable added in v0.55.0

func (d *DataStore) ExpireTable(key flux.GroupKey)

func (*DataStore) Finish added in v0.55.0

func (d *DataStore) Finish(id execute.DatasetID, err error)

func (*DataStore) ForEach added in v0.55.0

func (d *DataStore) ForEach(f func(key flux.GroupKey) error) error

func (*DataStore) ForEachWithContext added in v0.55.0

func (d *DataStore) ForEachWithContext(f func(flux.GroupKey, execute.Trigger, execute.TableContext) error) error

func (*DataStore) Process added in v0.55.0

func (d *DataStore) Process(id execute.DatasetID, tbl flux.Table) error

func (*DataStore) RetractTable added in v0.55.0

func (d *DataStore) RetractTable(id execute.DatasetID, key flux.GroupKey) error

func (*DataStore) SetTriggerSpec added in v0.55.0

func (d *DataStore) SetTriggerSpec(t plan.TriggerSpec)

func (*DataStore) Table added in v0.55.0

func (d *DataStore) Table(key flux.GroupKey) (flux.Table, error)

func (*DataStore) UpdateProcessingTime added in v0.55.0

func (d *DataStore) UpdateProcessingTime(id execute.DatasetID, t execute.Time) error

func (*DataStore) UpdateWatermark added in v0.55.0

func (d *DataStore) UpdateWatermark(id execute.DatasetID, t execute.Time) error

type Dataset

type Dataset struct {
	ID                    execute.DatasetID
	Retractions           []flux.GroupKey
	ProcessingTimeUpdates []execute.Time
	WatermarkUpdates      []execute.Time
	Finished              bool
	FinishedErr           error
}

func NewDataset

func NewDataset(id execute.DatasetID) *Dataset

func (*Dataset) AddTransformation

func (d *Dataset) AddTransformation(t execute.Transformation)

func (*Dataset) Finish

func (d *Dataset) Finish(err error)

func (*Dataset) RetractTable

func (d *Dataset) RetractTable(key flux.GroupKey) error

func (*Dataset) SetTriggerSpec

func (d *Dataset) SetTriggerSpec(t plan.TriggerSpec)

func (*Dataset) UpdateProcessingTime

func (d *Dataset) UpdateProcessingTime(t execute.Time) error

func (*Dataset) UpdateWatermark

func (d *Dataset) UpdateWatermark(mark execute.Time) error

type FromProcedureSpec

type FromProcedureSpec struct {
	execute.ExecutionNode
	// contains filtered or unexported fields
}

FromProcedureSpec is a procedure spec AND an execution Node. It simulates the execution of a basic physical scan operation.

func NewFromProcedureSpec

func NewFromProcedureSpec(data []*Table) *FromProcedureSpec

NewFromProcedureSpec specifies a from-test procedure with source data

func (*FromProcedureSpec) AddTransformation

func (src *FromProcedureSpec) AddTransformation(t execute.Transformation)

func (*FromProcedureSpec) Copy

func (src *FromProcedureSpec) Copy() plan.ProcedureSpec

func (*FromProcedureSpec) Cost

func (src *FromProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics)

func (*FromProcedureSpec) Kind

func (src *FromProcedureSpec) Kind() plan.ProcedureKind

func (*FromProcedureSpec) Run

func (src *FromProcedureSpec) Run(ctx context.Context)

type ParallelFromProcedureSpec added in v0.157.0

type ParallelFromProcedureSpec struct {
	execute.ExecutionNode
	// contains filtered or unexported fields
}

ParallelFromProcedureSpec is a procedure spec AND an execution Node similar to FromProcedureSpec. It differs in that it is aware of the possibility for parallel execution.

func NewParallelFromProcedureSpec added in v0.157.0

func NewParallelFromProcedureSpec(factor int, data []*ParallelTable) *ParallelFromProcedureSpec

NewParallelFromProcedureSpec specifies a from-test procedure with source data

func (*ParallelFromProcedureSpec) AddTransformation added in v0.157.0

func (src *ParallelFromProcedureSpec) AddTransformation(t execute.Transformation)

func (*ParallelFromProcedureSpec) Copy added in v0.157.0

func (*ParallelFromProcedureSpec) Cost added in v0.157.0

func (*ParallelFromProcedureSpec) Kind added in v0.157.0

func (*ParallelFromProcedureSpec) OutputAttributes added in v0.175.0

func (src *ParallelFromProcedureSpec) OutputAttributes() plan.PhysicalAttributes

func (*ParallelFromProcedureSpec) Run added in v0.157.0

type ParallelTable added in v0.157.0

type ParallelTable struct {
	*Table
	// ResidesOnPartition indicates which partition the table is on.
	ResidesOnPartition int
	// ParallelGroup is assigned during execution so we can see in the
	// execution output which group the data originated from.
	ParallelGroup int
}

func (*ParallelTable) Do added in v0.157.0

func (t *ParallelTable) Do(f func(flux.ColReader) error) error

type Result

type Result struct {
	Nm   string
	Tbls []*Table
	Err  error
}

func ConvertResult added in v0.36.2

func ConvertResult(result flux.Result) *Result

ConvertResult produces a result object from any flux.Result type.

func NewResult

func NewResult(tables []*Table) *Result

func (*Result) Name

func (r *Result) Name() string

func (*Result) Normalize

func (r *Result) Normalize()

func (*Result) Tables

func (r *Result) Tables() flux.TableIterator

type RowWiseTable added in v0.14.0

type RowWiseTable struct {
	*Table
}

RowWiseTable is a flux Table implementation that calls f once for each row in its Do method.

func (*RowWiseTable) Do added in v0.14.0

func (t *RowWiseTable) Do(f func(flux.ColReader) error) error

Do calls f once for each row in the table

type SortedTables

type SortedTables []*Table

func (SortedTables) Len

func (b SortedTables) Len() int

func (SortedTables) Less

func (b SortedTables) Less(i int, j int) bool

func (SortedTables) Swap

func (b SortedTables) Swap(i int, j int)

type SourceUrlValidationTestCases added in v0.51.0

type SourceUrlValidationTestCases []struct {
	Name   string
	Spec   plan.ProcedureSpec
	V      url.Validator
	ErrMsg string
}

Some sources are located by a URL, e.g. sql.from and socket.from. The URL/DSN supplied by the user needs to be validated by a URLValidator{} before we can establish the connection. This struct (as well as the Run() method) acts as a test harness for that.

func (*SourceUrlValidationTestCases) Run added in v0.51.0

type Table

type Table struct {
	// GroupKey of the table. Does not need to be set explicitly.
	GroupKey flux.GroupKey
	// KeyCols is a list of columns that are part of the group key.
	// The column type is deduced from the ColMeta slice.
	KeyCols []string
	// KeyValues is a list of values for the group key columns.
	// Only needs to be set when no data is present on the table.
	KeyValues []interface{}
	// ColMeta is a list of columns of the table.
	ColMeta []flux.ColMeta
	// Data is a list of rows, i.e. Data[row][col]
	// Each row must be a list with length equal to len(ColMeta)
	Data [][]interface{}
	// Err contains the error that should be returned
	// by this table when calling Do.
	Err error
	// Alloc is the allocator used to create the column readers.
	// Memory is not tracked unless this is set.
	Alloc memory.Allocator
	// IsDone indicates if this table has been used.
	IsDone bool
}

Table is an implementation of execute.Table. It is designed to make it easy to statically declare the data within the table. Not all fields need to be set. See comments on each field. Use Normalize to ensure that all fields are set before equality comparisons.

func ConvertTable

func ConvertTable(tbl flux.Table) (*Table, error)

func TablesFromCache

func TablesFromCache(c execute.DataCache) ([]*Table, error)

func (*Table) Cols

func (t *Table) Cols() []flux.ColMeta

func (*Table) Do

func (t *Table) Do(f func(flux.ColReader) error) error

func (*Table) Done added in v0.31.0

func (t *Table) Done()

func (*Table) Empty

func (t *Table) Empty() bool

func (*Table) Key

func (t *Table) Key() flux.GroupKey

func (*Table) Normalize

func (t *Table) Normalize()

Normalize ensures all fields of the table are set correctly.

type TableIterator

type TableIterator struct {
	Tables []*Table
	Err    error
}

func (*TableIterator) Do

func (ti *TableIterator) Do(f func(flux.Table) error) error

type TableTest added in v0.36.0

type TableTest struct {
	// NewFn returns a new TableIterator that can be processed.
	// The table iterator that is produced should have multiple
	// tables of different shapes and sizes to get coverage of
	// as much of the code as possible. The TableIterator will
	// be created once for each subtest.
	NewFn func(ctx context.Context, alloc memory.Allocator) flux.TableIterator

	// IsDone will report if the table is considered done for reading.
	// The call to Done should force this to be true, but it is possible
	// for this to return true before the table has been processed.
	IsDone func(flux.Table) bool
}

type TestFlagger added in v0.152.0

type TestFlagger map[string]interface{}

func (TestFlagger) FlagValue added in v0.152.0

func (t TestFlagger) FlagValue(ctx context.Context, flag feature.Flag) interface{}

type TfUrlValidationTest added in v0.51.0

type TfUrlValidationTest struct {
	CreateFn CreateNewTransformationWithDeps
	Cases    []TfUrlValidationTestCase
}

func (*TfUrlValidationTest) Run added in v0.51.0

func (test *TfUrlValidationTest) Run(t *testing.T)

type TfUrlValidationTestCase added in v0.51.0

type TfUrlValidationTestCase struct {
	Name      string
	Spec      plan.ProcedureSpec
	Validator url.Validator
	WantErr   string
}

type ToProcedureSpec added in v0.8.0

type ToProcedureSpec struct{}

ToProcedureSpec defines an output operation. That is, an operation that does not transform its input data but performs a side effect while passing its input data through to the next op.

func (*ToProcedureSpec) Copy added in v0.8.0

func (*ToProcedureSpec) Cost added in v0.8.0

func (s *ToProcedureSpec) Cost(inStats []plan.Statistics) (plan.Cost, plan.Statistics)

func (*ToProcedureSpec) Kind added in v0.8.0

type ToTransformation added in v0.8.0

type ToTransformation struct {
	execute.ExecutionNode
	// contains filtered or unexported fields
}

ToTransformation simulates an output or an identity transformation

func (*ToTransformation) Finish added in v0.8.0

func (t *ToTransformation) Finish(id execute.DatasetID, err error)

func (*ToTransformation) Process added in v0.8.0

func (t *ToTransformation) Process(id execute.DatasetID, tbl flux.Table) error

func (*ToTransformation) RetractTable added in v0.8.0

func (t *ToTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error

func (*ToTransformation) UpdateProcessingTime added in v0.8.0

func (t *ToTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error

func (*ToTransformation) UpdateWatermark added in v0.8.0

func (t *ToTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error

type YieldProcedureSpec

type YieldProcedureSpec struct {
	plan.DefaultCost
	// contains filtered or unexported fields
}

func (YieldProcedureSpec) Copy

func (YieldProcedureSpec) Kind

func (YieldProcedureSpec) YieldName

func (y YieldProcedureSpec) YieldName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL