mock

package
v0.195.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2024 License: MIT Imports: 14 Imported by: 2

Documentation

Overview

Package mock contains mock implementations of the query package interfaces for testing.

Index

Constants

This section is empty.

Variables

View Source
var EmptyStatistics <-chan flux.Statistics

Functions

func CreateMockFromSource added in v0.26.0

func CreateMockFromSource(spec plan.ProcedureSpec, id execute.DatasetID, ctx execute.Administration) (execute.Source, error)

CreateMockFromSource will register a mock "from" source. Use it like this in the init() of your test:

execute.RegisterSource(influxdb.FromKind, mock.CreateMockFromSource)

Types

type Administration added in v0.50.0

type Administration struct {
	// contains filtered or unexported fields
}

Administration is a mock implementation of the execute.Administration interface. It may be used in tests that require an implementation of this interface.

func AdministrationWithContext added in v0.51.0

func AdministrationWithContext(ctx context.Context) *Administration

func (*Administration) Allocator added in v0.50.0

func (a *Administration) Allocator() memory.Allocator

func (*Administration) Context added in v0.50.0

func (a *Administration) Context() context.Context

func (*Administration) ParallelOpts added in v0.157.0

func (a *Administration) ParallelOpts() execute.ParallelOpts

func (*Administration) Parents added in v0.50.0

func (a *Administration) Parents() []execute.DatasetID

func (*Administration) ResolveTime added in v0.50.0

func (a *Administration) ResolveTime(qt flux.Time) execute.Time

func (*Administration) StreamContext added in v0.50.0

func (a *Administration) StreamContext() execute.StreamContext

type AggregateParallelTransformation added in v0.171.0

type AggregateParallelTransformation struct {
	AggregateFn func(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error)
	MergeFn     func(into, from interface{}, mem memory.Allocator) (interface{}, error)
	ComputeFn   func(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error
	CloseFn     func() error
}

func (*AggregateParallelTransformation) Aggregate added in v0.171.0

func (a *AggregateParallelTransformation) Aggregate(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error)

func (*AggregateParallelTransformation) Close added in v0.171.0

func (*AggregateParallelTransformation) Compute added in v0.171.0

func (a *AggregateParallelTransformation) Compute(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error

func (*AggregateParallelTransformation) Merge added in v0.171.0

func (a *AggregateParallelTransformation) Merge(into, from interface{}, mem memory.Allocator) (interface{}, error)

type AggregateTransformation added in v0.127.0

type AggregateTransformation struct {
	AggregateFn func(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error)
	ComputeFn   func(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error
	CloseFn     func() error
}

func (*AggregateTransformation) Aggregate added in v0.127.0

func (a *AggregateTransformation) Aggregate(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error)

func (*AggregateTransformation) Close added in v0.148.0

func (a *AggregateTransformation) Close() error

func (*AggregateTransformation) Compute added in v0.127.0

func (a *AggregateTransformation) Compute(key flux.GroupKey, state interface{}, d *execute.TransportDataset, mem memory.Allocator) error

type AscendingTimeProvider added in v0.21.0

type AscendingTimeProvider struct {
	Start int64
}

AscendingTimeProvider provides ascending timestamps every nanosecond starting from Start.

func (*AscendingTimeProvider) CurrentTime added in v0.21.0

func (atp *AscendingTimeProvider) CurrentTime() values.Time

type Compiler

type Compiler struct {
	CompileFn func(ctx context.Context) (flux.Program, error)
	Type      flux.CompilerType
}

func (Compiler) Compile

func (c Compiler) Compile(ctx context.Context, runtime flux.Runtime) (flux.Program, error)

func (Compiler) CompilerType

func (c Compiler) CompilerType() flux.CompilerType

type Dependency added in v0.161.0

type Dependency struct {
	InjectFn func(ctx context.Context) context.Context
}

func (Dependency) Inject added in v0.161.0

func (d Dependency) Inject(ctx context.Context) context.Context

type Executor

type Executor struct {
	ExecuteFn func(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error)
}

Executor is a mock implementation of an execute.Executor.

func NewExecutor

func NewExecutor() *Executor

NewExecutor returns a mock Executor whose methods return zero values.

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, p *plan.Spec, a memory.Allocator) (map[string]flux.Result, <-chan flux.Statistics, error)

type GroupTransformation added in v0.128.0

type GroupTransformation struct {
	ProcessFn func(chunk table.Chunk, d *execute.TransportDataset, mem memory.Allocator) error
	CloseFn   func() error
}

func (*GroupTransformation) Close added in v0.148.0

func (a *GroupTransformation) Close() error

func (*GroupTransformation) Process added in v0.128.0

type InfluxDBProvider added in v0.117.0

type InfluxDBProvider struct {
	influxdb.UnimplementedProvider
	WriterForFn func(ctx context.Context, conf influxdb.Config) (influxdb.Writer, error)
}

func (InfluxDBProvider) WriterFor added in v0.117.0

func (m InfluxDBProvider) WriterFor(ctx context.Context, conf influxdb.Config) (influxdb.Writer, error)

type MqttClient added in v0.136.0

type MqttClient struct {
	PublishFn func(ctx context.Context, topic string, qos byte, retain bool, payload interface{}) error
	CloseFn   func() error
}

func (MqttClient) Close added in v0.136.0

func (m MqttClient) Close() error

func (MqttClient) Publish added in v0.136.0

func (m MqttClient) Publish(ctx context.Context, topic string, qos byte, retain bool, payload interface{}) error

type MqttDialer added in v0.136.0

type MqttDialer struct {
	DialFn func(ctx context.Context, brokers []string, options mqtt.Options) (mqtt.Client, error)
}

func (MqttDialer) Dial added in v0.136.0

func (m MqttDialer) Dial(ctx context.Context, brokers []string, options mqtt.Options) (mqtt.Client, error)

type NarrowStateTransformation added in v0.130.0

type NarrowStateTransformation[T any] struct {
	ProcessFn func(chunk table.Chunk, state T, d *execute.TransportDataset, mem memory.Allocator) (T, bool, error)
	CloseFn   func() error
}

func (*NarrowStateTransformation[T]) Close added in v0.148.0

func (a *NarrowStateTransformation[T]) Close() error

func (*NarrowStateTransformation[T]) Process added in v0.130.0

func (n *NarrowStateTransformation[T]) Process(chunk table.Chunk, state T, d *execute.TransportDataset, mem memory.Allocator) (T, bool, error)

type NarrowTransformation added in v0.125.0

type NarrowTransformation struct {
	ProcessFn func(chunk table.Chunk, d *execute.TransportDataset, mem memory.Allocator) error
	CloseFn   func() error
}

func (*NarrowTransformation) Close added in v0.148.0

func (a *NarrowTransformation) Close() error

func (*NarrowTransformation) Process added in v0.125.0

type Program added in v0.26.0

type Program struct {
	StartFn   func(ctx context.Context, alloc memory.Allocator) (*Query, error)
	ExecuteFn func(ctx context.Context, q *Query, alloc memory.Allocator)
}

Program is a mock program that can be returned by the mock compiler. It will construct a mock query that will then be passed to ExecuteFn.

func (*Program) Start added in v0.26.0

func (p *Program) Start(ctx context.Context, alloc memory.Allocator) (flux.Query, error)

type Query added in v0.26.0

type Query struct {
	ResultsCh chan flux.Result
	CancelFn  func()

	Canceled chan struct{}
	// contains filtered or unexported fields
}

Query provides a customizable query that implements flux.Query. Results, as well as errors, statistics, and the cancel function can be set.

func (*Query) Cancel added in v0.26.0

func (q *Query) Cancel()

func (*Query) Done added in v0.26.0

func (q *Query) Done()

func (*Query) Err added in v0.26.0

func (q *Query) Err() error

func (*Query) ProduceResults added in v0.27.0

func (q *Query) ProduceResults(resultProvider func(results chan<- flux.Result, canceled <-chan struct{}))

ProduceResults lets the user provide a function to produce results on the channel returned by `Results`. `resultProvider` should check if `canceled` has been closed before sending results. E.g.:

	func(results chan<- flux.Result, canceled <-chan struct{}) {
		for _, r := range resultsSlice {
			select {
			case <-canceled:
				return
			default:
				results <- r
			}
		}
	}

`resultProvider` is run in a separate goroutine, and the channel returned by Results() is closed after the function completes. ProduceResults can be called only once per Query.

func (*Query) ProfilerResults added in v0.82.0

func (q *Query) ProfilerResults() (flux.ResultIterator, error)

func (*Query) Results added in v0.26.0

func (q *Query) Results() <-chan flux.Result

func (*Query) SetErr added in v0.27.0

func (q *Query) SetErr(err error)

SetErr sets the error for this query and `Cancel`s it.

func (*Query) SetStatistics added in v0.27.0

func (q *Query) SetStatistics(stats flux.Statistics)

SetStatistics sets stats for this query. Stats will be available after `Done` is called.

func (*Query) Statistics added in v0.26.0

func (q *Query) Statistics() flux.Statistics

type SecretService added in v0.41.0

type SecretService map[string]string

func (SecretService) LoadSecret added in v0.41.0

func (s SecretService) LoadSecret(ctx context.Context, k string) (string, error)

type Source added in v0.26.0

type Source struct {
	execute.ExecutionNode
	AddTransformationFn func(transformation execute.Transformation)
	RunFn               func(ctx context.Context)
}

Source is a mock source that performs the given functions. By default it does nothing.

func (*Source) AddTransformation added in v0.26.0

func (s *Source) AddTransformation(t execute.Transformation)

func (*Source) Run added in v0.26.0

func (s *Source) Run(ctx context.Context)

type Transformation added in v0.125.0

type Transformation struct {
	ProcessFn func(id execute.DatasetID, tbl flux.Table) error
	FinishFn  func(id execute.DatasetID, err error)
}

func (*Transformation) Finish added in v0.125.0

func (t *Transformation) Finish(id execute.DatasetID, err error)

func (*Transformation) Process added in v0.125.0

func (t *Transformation) Process(id execute.DatasetID, tbl flux.Table) error

func (*Transformation) RetractTable added in v0.125.0

func (t *Transformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error

func (*Transformation) UpdateProcessingTime added in v0.125.0

func (t *Transformation) UpdateProcessingTime(id execute.DatasetID, ts execute.Time) error

func (*Transformation) UpdateWatermark added in v0.125.0

func (t *Transformation) UpdateWatermark(id execute.DatasetID, ts execute.Time) error

type Transport added in v0.125.0

type Transport struct {
	ProcessMessageFn func(m execute.Message) error
}

func (*Transport) Finish added in v0.125.0

func (t *Transport) Finish(id execute.DatasetID, err error)

func (*Transport) Process added in v0.125.0

func (t *Transport) Process(id execute.DatasetID, tbl flux.Table) error

func (*Transport) ProcessMessage added in v0.125.0

func (t *Transport) ProcessMessage(m execute.Message) error

func (*Transport) RetractTable added in v0.125.0

func (t *Transport) RetractTable(id execute.DatasetID, key flux.GroupKey) error

func (*Transport) UpdateProcessingTime added in v0.125.0

func (t *Transport) UpdateProcessingTime(id execute.DatasetID, ts execute.Time) error

func (*Transport) UpdateWatermark added in v0.125.0

func (t *Transport) UpdateWatermark(id execute.DatasetID, ts execute.Time) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL