Documentation ¶
Overview ¶
Package mock contains mock implementations of the query package interfaces for testing.
Index ¶
- Variables
- func CreateMockFromSource(spec plan.ProcedureSpec, id execute.DatasetID, ctx execute.Administration) (execute.Source, error)
- type Administration
- func (a *Administration) Allocator() memory.Allocator
- func (a *Administration) Context() context.Context
- func (a *Administration) ParallelOpts() execute.ParallelOpts
- func (a *Administration) Parents() []execute.DatasetID
- func (a *Administration) ResolveTime(qt flux.Time) execute.Time
- func (a *Administration) StreamContext() execute.StreamContext
- type AggregateParallelTransformation
- func (a *AggregateParallelTransformation) Aggregate(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error)
- func (a *AggregateParallelTransformation) Close() error
- func (a *AggregateParallelTransformation) Compute(key flux.GroupKey, state interface{}, d *execute.TransportDataset, ...) error
- func (a *AggregateParallelTransformation) Merge(into, from interface{}, mem memory.Allocator) (interface{}, error)
- type AggregateTransformation
- type AscendingTimeProvider
- type Compiler
- type Dependency
- type Executor
- type GroupTransformation
- type InfluxDBProvider
- type MqttClient
- type MqttDialer
- type NarrowStateTransformation
- type NarrowTransformation
- type Program
- type Query
- func (q *Query) Cancel()
- func (q *Query) Done()
- func (q *Query) Err() error
- func (q *Query) ProduceResults(resultProvider func(results chan<- flux.Result, canceled <-chan struct{}))
- func (q *Query) ProfilerResults() (flux.ResultIterator, error)
- func (q *Query) Results() <-chan flux.Result
- func (q *Query) SetErr(err error)
- func (q *Query) SetStatistics(stats flux.Statistics)
- func (q *Query) Statistics() flux.Statistics
- type SecretService
- type Source
- type Transformation
- func (t *Transformation) Finish(id execute.DatasetID, err error)
- func (t *Transformation) Process(id execute.DatasetID, tbl flux.Table) error
- func (t *Transformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (t *Transformation) UpdateProcessingTime(id execute.DatasetID, ts execute.Time) error
- func (t *Transformation) UpdateWatermark(id execute.DatasetID, ts execute.Time) error
- type Transport
- func (t *Transport) Finish(id execute.DatasetID, err error)
- func (t *Transport) Process(id execute.DatasetID, tbl flux.Table) error
- func (t *Transport) ProcessMessage(m execute.Message) error
- func (t *Transport) RetractTable(id execute.DatasetID, key flux.GroupKey) error
- func (t *Transport) UpdateProcessingTime(id execute.DatasetID, ts execute.Time) error
- func (t *Transport) UpdateWatermark(id execute.DatasetID, ts execute.Time) error
Constants ¶
This section is empty.
Variables ¶
var EmptyStatistics <-chan flux.Statistics
Functions ¶
func CreateMockFromSource ¶
func CreateMockFromSource(spec plan.ProcedureSpec, id execute.DatasetID, ctx execute.Administration) (execute.Source, error)
CreateMockFromSource creates a mock "from" source. To use it, register it in the init() of your test like this:
execute.RegisterSource(influxdb.FromKind, mock.CreateMockFromSource)
Types ¶
type Administration ¶
type Administration struct {
// contains filtered or unexported fields
}
Administration is a mock implementation of the execute.Administration interface. It may be used in tests that require an implementation of this interface.
func AdministrationWithContext ¶
func AdministrationWithContext(ctx context.Context) *Administration
func (*Administration) Allocator ¶
func (a *Administration) Allocator() memory.Allocator
func (*Administration) Context ¶
func (a *Administration) Context() context.Context
func (*Administration) ParallelOpts ¶
func (a *Administration) ParallelOpts() execute.ParallelOpts
func (*Administration) Parents ¶
func (a *Administration) Parents() []execute.DatasetID
func (*Administration) ResolveTime ¶
func (a *Administration) ResolveTime(qt flux.Time) execute.Time
func (*Administration) StreamContext ¶
func (a *Administration) StreamContext() execute.StreamContext
type AggregateParallelTransformation ¶
type AggregateParallelTransformation struct { AggregateFn func(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error) MergeFn func(into, from interface{}, mem memory.Allocator) (interface{}, error) ComputeFn func(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error CloseFn func() error }
func (*AggregateParallelTransformation) Close ¶
func (a *AggregateParallelTransformation) Close() error
func (*AggregateParallelTransformation) Compute ¶
func (a *AggregateParallelTransformation) Compute(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error
type AggregateTransformation ¶
type AggregateTransformation struct { AggregateFn func(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error) ComputeFn func(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error CloseFn func() error }
func (*AggregateTransformation) Close ¶
func (a *AggregateTransformation) Close() error
func (*AggregateTransformation) Compute ¶
func (a *AggregateTransformation) Compute(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error
type AscendingTimeProvider ¶
type AscendingTimeProvider struct {
Start int64
}
AscendingTimeProvider provides ascending timestamps every nanosecond starting from Start.
func (*AscendingTimeProvider) CurrentTime ¶
func (atp *AscendingTimeProvider) CurrentTime() values.Time
type Compiler ¶
type Compiler struct { CompileFn func(ctx context.Context) (flux.Program, error) Type flux.CompilerType }
func (Compiler) CompilerType ¶
func (c Compiler) CompilerType() flux.CompilerType
type Dependency ¶
type Executor ¶
type Executor struct {
ExecuteFn func(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error)
}
Executor is a mock implementation of an execute.Executor.
func NewExecutor ¶
func NewExecutor() *Executor
NewExecutor returns a mock Executor whose methods return zero values.
type GroupTransformation ¶
type GroupTransformation struct { ProcessFn func(chunk table.Chunk, d *execute.TransportDataset, mem memory.Allocator) error CloseFn func() error }
func (*GroupTransformation) Close ¶
func (a *GroupTransformation) Close() error
func (*GroupTransformation) Process ¶
func (n *GroupTransformation) Process(chunk table.Chunk, d *execute.TransportDataset, mem memory.Allocator) error
type InfluxDBProvider ¶
type MqttClient ¶
type MqttClient struct { PublishFn func(ctx context.Context, topic string, qos byte, retain bool, payload interface{}) error CloseFn func() error }
func (MqttClient) Close ¶
func (m MqttClient) Close() error
type MqttDialer ¶
type NarrowStateTransformation ¶
type NarrowStateTransformation[T any] struct { ProcessFn func(chunk table.Chunk, state T, d *execute.TransportDataset, mem memory.Allocator) (T, bool, error) CloseFn func() error }
func (*NarrowStateTransformation[T]) Close ¶
func (a *NarrowStateTransformation[T]) Close() error
type NarrowTransformation ¶
type NarrowTransformation struct { ProcessFn func(chunk table.Chunk, d *execute.TransportDataset, mem memory.Allocator) error CloseFn func() error }
func (*NarrowTransformation) Close ¶
func (a *NarrowTransformation) Close() error
func (*NarrowTransformation) Process ¶
func (n *NarrowTransformation) Process(chunk table.Chunk, d *execute.TransportDataset, mem memory.Allocator) error
type Program ¶
type Program struct { StartFn func(ctx context.Context, alloc memory.Allocator) (*Query, error) ExecuteFn func(ctx context.Context, q *Query, alloc memory.Allocator) }
Program is a mock program that can be returned by the mock compiler. It will construct a mock query that will then be passed to ExecuteFn.
type Query ¶
type Query struct { ResultsCh chan flux.Result CancelFn func() Canceled chan struct{} // contains filtered or unexported fields }
Query provides a customizable query that implements flux.Query. Results, as well as errors, statistics, and the cancel function can be set.
func (*Query) ProduceResults ¶
func (q *Query) ProduceResults(resultProvider func(results chan<- flux.Result, canceled <-chan struct{}))
ProduceResults lets the user provide a function to produce results on the channel returned by `Results`. `resultProvider` should check if `canceled` has been closed before sending results. E.g.:
```
func (results chan<- flux.Result, canceled <-chan struct{}) {
	for _, r := range resultsSlice {
		select {
		case <-canceled:
			return
		default:
			results <- r
		}
	}
}
```
`resultProvider` is run in a separate goroutine, and the channel returned by Results() is closed after the function completes. ProduceResults can be called only once per Query.
func (*Query) ProfilerResults ¶
func (q *Query) ProfilerResults() (flux.ResultIterator, error)
func (*Query) SetStatistics ¶
func (q *Query) SetStatistics(stats flux.Statistics)
SetStatistics sets stats for this query. Stats will be available after `Done` is called.
func (*Query) Statistics ¶
func (q *Query) Statistics() flux.Statistics
type SecretService ¶
func (SecretService) LoadSecret ¶
type Source ¶
type Source struct { execute.ExecutionNode AddTransformationFn func(transformation execute.Transformation) RunFn func(ctx context.Context) }
Source is a mock source that performs the given functions. By default it does nothing.
func (*Source) AddTransformation ¶
func (s *Source) AddTransformation(t execute.Transformation)
type Transformation ¶
type Transformation struct { ProcessFn func(id execute.DatasetID, tbl flux.Table) error FinishFn func(id execute.DatasetID, err error) }