execute

package
v0.149.0
Published: Jan 12, 2022 License: MIT Imports: 35 Imported by: 27

README

Flux Engine Developer Guide

This guide covers the Flux Engine design and related concepts. The engine includes:

  • Execution engine
  • Builtin sources and transformations
  • Memory layout and resource management

Concepts

The following concepts are important to understanding the Flux engine and the operations that can be executed within the engine. This guide will reference these concepts with the assumption that the reader has reviewed the following section.

Tables and Table Chunks

In Flux, a Table is a set of ordered rows grouped together by a common group key. The group key is a set of key/value pairs. The elements of the group key appear as columns in the table with the element key appearing as the column name and the element value appearing as the value, constant across all rows. In practical terms, this means that for any columns that are in the group key, the values in that table will all be the same.

The table will also contain columns that are not referenced by the group key. The values in these columns may differ from row to row, as long as all values in a column have the same type.

For a table, the following invariants should hold:

  • All values inside a column that is part of the group key should have the same value.
  • That value should match the group key's value.
  • The values in a column should all be of the same type.

The last of these invariants is loosely held. This means that while it should be true, individual transformations may not make an effort to enforce it. If enforcing the invariant would hurt a transformation's performance and the invariant does not affect the transformation's correctness, the transformation may skip enforcement when producing its output. For example, the group() transformation does not enforce the last invariant and relies on downstream transformations to enforce it.
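The three invariants can be made concrete with a small check. The following is only an illustrative sketch in plain Go: the column type and the checkInvariants function are hypothetical and are not part of the execute package, which operates on arrow-backed tables.

```go
package main

import (
	"fmt"
	"reflect"
)

// column is a toy stand-in for one table column; it is not a Flux type.
type column struct {
	label  string
	values []interface{}
}

// checkInvariants verifies the table invariants for a toy table described
// by a group key (column label -> value) and a list of columns.
func checkInvariants(key map[string]interface{}, cols []column) error {
	for _, c := range cols {
		for i, v := range c.values {
			// All values in a column must have the same type.
			if reflect.TypeOf(v) != reflect.TypeOf(c.values[0]) {
				return fmt.Errorf("column %q: row %d has type %T, want %T", c.label, i, v, c.values[0])
			}
			// A group key column must be constant and match the key's value.
			if want, ok := key[c.label]; ok && v != want {
				return fmt.Errorf("column %q: row %d is %v, group key says %v", c.label, i, v, want)
			}
		}
	}
	return nil
}

func main() {
	key := map[string]interface{}{"host": "a"}
	good := []column{
		{label: "host", values: []interface{}{"a", "a"}},
		{label: "_value", values: []interface{}{1.0, 2.5}},
	}
	fmt.Println(checkInvariants(key, good)) // no violation

	bad := []column{{label: "host", values: []interface{}{"a", "b"}}}
	fmt.Println(checkInvariants(key, bad)) // reports the mismatched row
}
```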

The set of rows contained within a table are processed in chunks. A table chunk is a subset of the rows within a table that are in-memory.

The rows within a table are ordered. When table chunks are sent, the rows within the first are all ordered before the rows in the second. Transformations may choose to change the order, or they may choose to have the order be meaningful. For example, the derivative() function gives meaning to the order of the input rows while sort() will rearrange the order.

A table chunk is composed of a list of arrow arrays, each array corresponding to a column of the table.

Arrow Arrays

Apache Arrow is a language-independent columnar format. The flux engine utilizes the Arrow library to represent the columnar data inside of table chunks.

Execution Pipeline

The execution pipeline is the set of nodes that pass data from a source to the result. A node with no inputs is a source, which produces data for the nodes that come after it (downstream of it). A node that takes one or more inputs and produces an output is a transformation. The final node in the pipeline is the result, which holds the results of the pipeline.

You can think of the execution pipeline in terms of Flux code.

A |> B |> C

This would translate into three nodes where A is a source and B and C are transformations. Flux code does not correspond directly with one function being one node. Some functions will be combined into a single node while other functions may get rearranged into other nodes or split into multiple nodes. It is the responsibility of the planner to convert flux code into an execution pipeline.
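As a rough mental model, the three-node pipeline can be sketched in plain Go. This is a simplification under stated assumptions: chunk, transformation, and run are hypothetical names, each node handles one chunk at a time, and nothing here reflects how the planner or the engine actually wires nodes together.

```go
package main

import "fmt"

// chunk is a toy stand-in for a table chunk: a single column of values.
type chunk []float64

// transformation takes one input chunk and produces one output chunk.
type transformation func(chunk) chunk

// run pushes every chunk from the source through the transformations in
// order and collects what reaches the end of the pipeline as the result.
func run(source []chunk, nodes ...transformation) []chunk {
	var result []chunk
	for _, c := range source {
		for _, n := range nodes {
			c = n(c)
		}
		result = append(result, c)
	}
	return result
}

// double plays the role of B: it multiplies every value by two.
func double(in chunk) chunk {
	out := make(chunk, len(in))
	for i, v := range in {
		out[i] = v * 2
	}
	return out
}

// addOne plays the role of C: it adds one to every value.
func addOne(in chunk) chunk {
	out := make(chunk, len(in))
	for i, v := range in {
		out[i] = v + 1
	}
	return out
}

func main() {
	a := []chunk{{1, 2}, {3}} // the source A produces two chunks
	fmt.Println(run(a, double, addOne))
	// prints [[3 5] [7]]
}
```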

Dataset

A dataset is used to hold node state and manage sending messages to downstream nodes. Each transformation is associated with a single dataset.

When thinking about the execution pipeline, the dataset can be thought of as a distinct part of the node.

A |> B |> B(d) |> C |> C(d)

Message

A message is a signal that contains associated data that is being sent from one dataset to the downstream nodes. Each message has a lifetime where it is created by the sender and then acknowledged by the receiver. A message may hold onto memory and will release its reference to that memory after it has been acknowledged. The data contained within a message may be retained and used in other ways.

The following messages presently exist:

Name          Data              Description
ProcessChunk  table.Chunk       A table chunk is ready to be processed
FlushKey      group key         Data associated with the given group key should be flushed from the dataset
Finish        error (optional)  The upstream dataset will produce no more messages
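
The message lifetime described above (created by the sender, acknowledged by the receiver, memory released on acknowledgement) can be illustrated with a toy type. This is a sketch only: the message struct, its Ack method, and the send helper are hypothetical and do not reproduce the engine's Message interface.

```go
package main

import "fmt"

// message is a toy stand-in for an engine message. It holds a reference to
// some data memory until the receiver acknowledges it.
type message struct {
	data    []int64
	release func() // called once, when the message is acknowledged
	acked   bool
}

// Ack marks the message as handled and releases its reference.
// Calling Ack more than once is a no-op.
func (m *message) Ack() {
	if m.acked {
		return
	}
	m.acked = true
	m.release()
}

// send creates a message, hands it to the receiver, and reports how many
// references are still outstanding after the receiver returns.
func send(data []int64, receive func(*message)) int {
	refs := 1 // the message itself holds one reference to the data
	m := &message{data: data, release: func() { refs-- }}
	receive(m)
	return refs
}

func main() {
	outstanding := send([]int64{1, 2, 3}, func(m *message) {
		fmt.Println("processing", len(m.data), "rows")
		m.Ack()
	})
	fmt.Println("outstanding references:", outstanding)
}
```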

The following messages exist, but are deprecated and should not be used by future transformations.

Name                  Data        Description
Process               flux.Table  A full table is ready to be processed
UpdateWatermark       time        Data older than the given time will not be sent
UpdateProcessingTime  time        Marks the present time
RetractTable          key         Data associated with the given group key should be retracted

Memory Types

Data memory references memory used for storing user data. This is in contrast to process or program memory. Since flux is integrated in co-tenant environments, it needs to handle arbitrary user data and avoid crashing the system or causing a noisy neighbor performance problem. Flux does this by separating the concept of process memory and data memory.

Process memory is any memory required to execute code. Process memory is anything in Go that would allocate stack or heap memory and is tracked by the garbage collector. User code, by necessity, will use process memory. The amount of process memory used by a transformation should be designed to be fairly consistent. If a user has 10 rows or 10,000 rows, the process memory should not scale directly with the number of rows. This is not a strict requirement. There's no expectation that a transformation will use the same memory regardless of the number of rows, but just the requirement that it doesn't linearly scale with the number of rows or worse.

Data memory has different conditions. Data memory is memory that is used to store user data and data memory is tracked by the memory.Allocator. The flux engine places limits on the amount of data memory that can be used and will abort execution if more is used. It is allowed and expected that some transformations will have bad memory footprints for certain inputs.
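
The idea of a tracked, limited pool of data memory can be sketched as follows. This is an assumption-laden illustration: limitedAllocator and its methods are hypothetical and do not mirror the memory.Allocator API; they only show accounting against a limit and failing once the limit is exceeded.

```go
package main

import (
	"errors"
	"fmt"
)

var errLimitExceeded = errors.New("data memory limit exceeded")

// limitedAllocator is a hypothetical accounting allocator. It tracks how
// many bytes of data memory are in use and refuses to go over the limit.
type limitedAllocator struct {
	limit int64
	used  int64
}

// Allocate returns n bytes, or an error if the limit would be exceeded.
func (a *limitedAllocator) Allocate(n int) ([]byte, error) {
	if a.used+int64(n) > a.limit {
		return nil, errLimitExceeded
	}
	a.used += int64(n)
	return make([]byte, n), nil
}

// Free returns the bytes in b to the budget.
func (a *limitedAllocator) Free(b []byte) {
	a.used -= int64(len(b))
}

func main() {
	mem := &limitedAllocator{limit: 1024}
	buf, err := mem.Allocate(1000)
	fmt.Println(len(buf), err)

	// A second large allocation exceeds the limit, so the caller would
	// abort execution instead of growing without bound.
	_, err = mem.Allocate(1000)
	fmt.Println(err)
}
```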

The primary method of storing data is through immutable arrow arrays. There are also circumstances where mutable data is needed to implement an algorithm, but process memory is not an appropriate place to store that data (such as distinct() or sort()). See the Mutable Data section for how to handle these circumstances.

Side Effects

Side effects are metadata attached to a builtin function call to signify that the function performs actions outside the query execution. At least one side effect is required for a query to be valid. Examples of side effect functions are yield() and to(). The main package will turn an expression statement into a side effect by adding an implicit yield() to the end of the expression.

Execution Engine

The execution engine is central to executing flux queries.

Pipeline Creation

Before the execution engine begins, it's important to understand the steps that happen before execution. This is only a brief outline of those steps.

  • Text is parsed into an AST.
  • The AST is converted into a semantic graph.
  • Type inference runs on the semantic graph and assigns types to all nodes.
  • The interpreter uses a recursive-descent execution to evaluate the semantic nodes.
  • Side effects are gathered from the interpreter execution and side effects that are linked to table objects are collected into a spec.
  • The spec is converted into a plan spec (one-to-one mapping).
  • The plan spec is passed into the planner which executes logical rules, converts logical nodes to physical nodes, and then runs physical rules.
  • The plan spec is passed to the execution engine.

That last step is where the execution engine starts. The execution engine starts with an already constructed plan and the execution engine's job is to execute that as faithfully as possible.

Nodes and Transports

When initializing the execution engine, the plan contains a directed graph which can be converted into a pipeline. Each node in the graph corresponds to a source or a transformation. A source is a node that has no upstream (input) nodes. A source will produce data and send that data to the downstream (output) nodes. A transformation is a node that has one or more upstream (input) nodes. A transformation will also have one or more downstream (output) nodes. At the end of the pipeline, a result is waiting to read the output data.

Nodes are connected by transports. A transport exists to send messages from one node to the downstream node. Transformations implement the Transport interface using one of the transformation primitives mentioned in the next section.

Dispatcher and Consecutive Transport

Each transformation node in the pipeline implements the Transport interface and execution is controlled by the dispatcher. These transports are automatically wrapped by the consecutive transport. The consecutive transport is a transport on every node that keeps a message queue and processes messages from that queue inside of dispatcher worker threads.

The practical effect of this is that invoking ProcessMessage on a consecutive transport will not immediately execute the action associated with that message. Instead, the dispatcher will make decisions about which transport to run depending on the concurrency resource limit. If the concurrency limit is only one, then only one transformation will execute at a time. If the concurrency limit is more, we can have more than one transformation running concurrently. In all situations, it is impossible for the same node to execute in multiple dispatcher workers at the same time.

The dispatcher is initialized with a throughput. The throughput is used to determine how many messages will be processed by a single Transport before another Transport is given the worker thread. The throughput is not the concurrency.
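
The difference between throughput and concurrency can be seen in a small simulation. The sketch below models a single worker (a concurrency limit of one) that lets each node process at most throughput queued messages before moving on. The node type and dispatch function are hypothetical, and the round-robin order is an assumption made for the example rather than a description of the real dispatcher's scheduling.

```go
package main

import "fmt"

// node is a toy transport with its own message queue.
type node struct {
	name  string
	queue []string
}

// dispatch simulates a single dispatcher worker. It visits the nodes in
// round-robin order and lets each one process at most throughput messages
// before the worker is given to the next node. It returns the processing
// order as "node:message" entries.
func dispatch(nodes []*node, throughput int) []string {
	var order []string
	for {
		progressed := false
		for _, n := range nodes {
			for i := 0; i < throughput && len(n.queue) > 0; i++ {
				order = append(order, n.name+":"+n.queue[0])
				n.queue = n.queue[1:]
				progressed = true
			}
		}
		if !progressed {
			return order
		}
	}
}

func main() {
	nodes := []*node{
		{name: "B", queue: []string{"m1", "m2", "m3"}},
		{name: "C", queue: []string{"m1"}},
	}
	fmt.Println(dispatch(nodes, 2))
	// prints [B:m1 B:m2 C:m1 B:m3]
}
```

With a throughput of 2, node B handles two messages, yields the worker to C, and only then finishes its third message. Raising the concurrency limit would add workers; it would not change how many messages one node handles per turn.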

Transformations

Most transformations fall into one of the following broad categories.

  • Narrow
  • Group
  • Aggregate

These three bases are the cornerstone of most transformations and understanding which one to choose will influence how you write your transformation.

Choosing a Transformation Type

A narrow transformation is one where the group key of the input is not changed and corresponds 1:1 with the output table's group key. A narrow transformation will also be able to produce its output as it receives its input without getting a final finish message to flush state. This is the simplest transformation type to implement and should be preferred over others when possible.

A group transformation is similar to a narrow transformation, except the output group key changes and group keys can be transformed one-to-one, one-to-many, many-to-one, or many-to-many. When a transformation will change the group key in some way but does not need a final finish message to send its output, this transformation should be preferred.

An aggregate transformation is a narrow transformation that is split into two phases: aggregation and computation. First, an aggregate transformation should meet the same requirement as a narrow transformation with regard to the group key: the group key of the input is not changed and corresponds 1:1 with the output table's group key. After that, processing is split into the aggregation phase, which reads the data, performs some processing, and outputs an intermediate state. When we receive the message that the group key can be flushed, we enter the computation phase, which turns the intermediate state into a materialized table.
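
The two phases of an aggregate can be sketched with a running mean. This is a plain Go illustration under stated assumptions: meanAggregate, aggregate, and flush are hypothetical names, group keys are reduced to strings, and the result is a single number rather than a materialized table. It does not reproduce the AggregateTransformation interface.

```go
package main

import "fmt"

// meanState is the intermediate state kept for one group key.
type meanState struct {
	sum   float64
	count int
}

// meanAggregate keeps one intermediate state per group key.
type meanAggregate struct {
	states map[string]*meanState
}

func newMeanAggregate() *meanAggregate {
	return &meanAggregate{states: make(map[string]*meanState)}
}

// aggregate is the aggregation phase: it folds one chunk of values into
// the intermediate state for the group key without producing output.
func (a *meanAggregate) aggregate(key string, values []float64) {
	s, ok := a.states[key]
	if !ok {
		s = &meanState{}
		a.states[key] = s
	}
	for _, v := range values {
		s.sum += v
		s.count++
	}
}

// flush is the computation phase: it turns the intermediate state for the
// group key into a final value and drops the state.
func (a *meanAggregate) flush(key string) (float64, bool) {
	s, ok := a.states[key]
	if !ok {
		return 0, false
	}
	delete(a.states, key)
	if s.count == 0 {
		return 0, false
	}
	return s.sum / float64(s.count), true
}

func main() {
	agg := newMeanAggregate()
	agg.aggregate("host=a", []float64{1, 2}) // first chunk
	agg.aggregate("host=a", []float64{3})    // second chunk, same key
	fmt.Println(agg.flush("host=a"))         // prints 2 true
}
```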

Narrow Transformation

A narrow transformation is one where the group key of the input is not changed and corresponds 1:1 with the output table's group key. A narrow transformation will also be able to produce its output as it receives its input without getting a final finish message to flush state. This is the simplest transformation type to implement and should be preferred over others when possible.

There are two subtypes of narrow transformations: stateless and stateful.

A narrow transformation that is stateless is implemented using the NarrowTransformation interface.

type NarrowTransformation interface {
    Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error
}

The Process method is implemented to take a table chunk, transform it using the memory allocator for new allocations, and then send it to be processed by the dataset. A skeleton implementation is shown below:

func (t *MyNarrowTransformation) Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error {
    out, err := t.process(chunk, mem)
    if err != nil {
        return err
    }
    return d.Process(out)
}

func (t *MyNarrowTransformation) process(chunk table.Chunk, mem memory.Allocator) (table.Chunk, error) {
    /* transformation-specific logic */
}

Some examples where this version of the narrow transformation is used: filter() and fill().

An alternative is used when we need to maintain state between chunks.

type NarrowStateTransformation interface {
    Process(chunk table.Chunk, state interface{}, d *TransportDataset, mem memory.Allocator) (interface{}, bool, error)
}

The first time the group key is encountered, the state will be nil. It should be initialized by the Process method. The new state will be stored if it is returned along with the second return argument being true. If the second return argument is false, the state will not be modified. It is both ok and expected that the interface{} will be a pointer to a struct and will be modified in a mutable way. The state is not expected to be immutable.

A skeleton implementation is shown below:

type myState struct { ... }

func (t *MyNarrowTransformation) Process(chunk table.Chunk, state interface{}, d *TransportDataset, mem memory.Allocator) (interface{}, bool, error) {
    // The state is nil the first time a group key is seen.
    s := t.loadState(state)
    out, err := t.process(chunk, s, mem)
    if err != nil {
        return nil, false, err
    }

    if err := d.Process(out); err != nil {
        return nil, false, err
    }
    // Returning true stores s as the state for this group key.
    return s, true, nil
}

func (t *MyNarrowTransformation) loadState(state interface{}) *myState {
    if state == nil {
        return &myState{}
    }
    return state.(*myState)
}

func (t *MyNarrowTransformation) process(chunk table.Chunk, state *myState, mem memory.Allocator) (table.Chunk, error) {
    /* transformation-specific logic */
}

Some examples where this version of the narrow transformation is used: derivative().
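
To show why state has to survive between chunks, here is a self-contained sketch of a difference computation that spans chunk boundaries. It uses plain slices instead of table.Chunk, and diffState and process are hypothetical names. The return values follow the convention described above: the new state, plus a boolean that says whether it should be stored.

```go
package main

import "fmt"

// diffState remembers the last value seen for a group key.
type diffState struct {
	last float64
}

// process outputs the difference between consecutive values. The state is
// nil the first time a group key is seen, so the first row of the first
// chunk only seeds the state and produces no output.
func process(chunk []float64, state interface{}) ([]float64, interface{}, bool) {
	if len(chunk) == 0 {
		return nil, nil, false // nothing seen; leave the stored state alone
	}
	s, _ := state.(*diffState) // s is nil when state is nil
	start := 0
	if s == nil {
		s = &diffState{last: chunk[0]}
		start = 1
	}
	out := make([]float64, 0, len(chunk)-start)
	for _, v := range chunk[start:] {
		out = append(out, v-s.last)
		s.last = v
	}
	return out, s, true
}

func main() {
	var state interface{} // what the engine would store per group key
	for _, c := range [][]float64{{1, 4}, {9, 10}} {
		out, next, ok := process(c, state)
		if ok {
			state = next
		}
		fmt.Println(out)
	}
	// prints [3] and then [5 1]
}
```

The second chunk's first output (9 - 4 = 5) depends on the last row of the first chunk, which is exactly the information a stateless narrow transformation would not have.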

Group Transformation

TODO: Write this section.

Aggregate Transformation

TODO: Need to refactor the existing aggregate transformation into a more generic interface.

Building Table Chunks

The above transformations involve taking input data, reading it, and producing a table. We need to know how to create a table chunk to create a table.

A table chunk is composed of a group key, a list of columns, and an array of values to represent each column. The following general steps are used to build every table:

  • Determine the columns and group key of the output table.
  • Determine the length of the table chunk.
  • Construct the array for each column.
  • Construct and send a table.Chunk using arrow.TableBuffer.

Determine the columns and group key of the output table

How to determine the columns and group key of the output table depends entirely on the transformation that is being implemented. Many transformations will not modify the group key. For transformations that do not modify the group key, the execute.NarrowTransformation transport implementation can greatly simplify the creation of those transformations.

Determine the length of the table chunk

After the columns and group key have been determined, the length of the next table chunk should be determined. Some transformations, like map(), will always output the same number of rows they receive in a chunk. These are the easiest. Others, like filter(), might reduce the length of the array and should determine the new length of the table chunk in advance. There are also cases where a transformation might need to rearrange data from different buffers or could produce more data. For these circumstances, chunk sizes should be limited to table.BufferSize.

It is not required that a transformation determine the length of a table chunk before producing one, but it is highly advised. Memory reallocation during table chunk creation is a top contributor to slowdown.

Construct the array for each column

We produce an array for each column in the table chunk using the github.com/influxdata/flux/array package. Each flux type corresponds to an array type according to the following table:

Flux Type  Arrow Type
Int        Int
UInt       Uint
Float      Float
String     String
Bool       Boolean
Time       Int

At its simplest, creating an array is done using the given skeleton.

b := array.NewIntBuilder(mem)
b.Resize(10)
for i := 0; i < 10; i++ {
    b.Append(int64(i))
}
return b.NewArray()

Other techniques for building arrays efficiently are contained in the arrow arrays section below.

Construct and send a table.Chunk using arrow.TableBuffer

We construct a table.Chunk using arrow.TableBuffer.

buffer := arrow.TableBuffer{
    GroupKey: execute.NewGroupKey(...),
    Columns: []flux.ColMeta{...},
    Values: []array.Interface{...},
}
chunk := table.ChunkFromBuffer(buffer)

if err := d.Process(chunk); err != nil {
    return err
}

Arrow Arrays

When constructing arrow arrays, there are some general guidelines that apply to every transformation.

Preallocate Memory

Preallocate memory by determining the size of a chunk in advance and using Resize() to set the capacity of the builder. For strings, it is also helpful to preallocate memory for the data using ReserveData when the total size is easy to determine. String appends are usually the biggest performance sink.

Limit Chunk Sizes

Limit chunk sizes to table.BufferSize. The array values in a column are contained in contiguous data. When the chunk size gets larger, the memory allocator has to find a spot in memory that fits that large size. Larger chunk sizes are generally better for performance, but buffer sizes that are too large put too much pressure on the memory allocator and garbage collector. The table.BufferSize constant is an agreed upon size to balance these concerns. At the moment, this value is the same as the buffer size that comes from the influxdb storage engine. If we find that another buffer size works better in the future, we can change this one constant.
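
A transformation that buffers or produces many rows can apply this limit by cutting its output into bounded chunks. The sketch below uses a plain []int64 in place of an arrow array. The bufferSize constant is a hypothetical small value chosen so the output is easy to read; real code would use table.BufferSize.

```go
package main

import "fmt"

// bufferSize is a hypothetical limit used for illustration only; real
// code would use table.BufferSize instead.
const bufferSize = 4

// splitChunks cuts values into consecutive chunks of at most size rows.
// The chunks are views into values; no data is copied.
func splitChunks(values []int64, size int) [][]int64 {
	if size <= 0 {
		return nil
	}
	var chunks [][]int64
	for len(values) > size {
		chunks = append(chunks, values[:size])
		values = values[size:]
	}
	if len(values) > 0 {
		chunks = append(chunks, values)
	}
	return chunks
}

func main() {
	rows := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fmt.Println(splitChunks(rows, bufferSize))
	// prints [[1 2 3 4] [5 6 7 8] [9 10]]
}
```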

Prefer Column-Based Algorithms

Column-based algorithms are generally faster than row-based algorithms. A column-based algorithm is one that lends itself to constructing each column individually instead of by row. Consider the following two examples:

// Column-based: check the builder type once, then fill the whole column.
b.Resize(10)
switch b := b.(type) {
case *array.FloatBuilder:
    for i := 0; i < 10; i++ {
        b.Append(rand.Float64())
    }
case *array.IntBuilder:
    for i := 0; i < 10; i++ {
        b.Append(rand.Int63())
    }
}

// Row-based: the builder type is checked again for every row.
b.Resize(10)
for i := 0; i < 10; i++ {
    switch b := b.(type) {
    case *array.FloatBuilder:
        b.Append(rand.Float64())
    case *array.IntBuilder:
        b.Append(rand.Int63())
    }
}

These are simple examples, but the first is column-based and the second is row-based. The row-based one spends a lot of time checking the type of the builder before appending the next value. This is a slow operation inside the for loop which itself is a hotspot for optimization. The first determines the type of the column first and then constructs the entire column using a specialized type. The first one only needs to pay the indirection cost of the interface once and can benefit from for loop optimizations.

One practical example is filter(). The filter() function is row-based in that it evaluates each row independently to determine if it is filtered or not. A naive implementation would construct the builder for each row, resize it to the maximum capacity, and then append each value to each column whenever the row passed the filter. It would then finish by reallocating the arrays to the proper size. A faster method is to use a bitset to keep track of which rows are filtered and which ones remain. We can allocate a bitset that holds the rows in the current table chunk, run the filter on each row, and record whether it passes the filter or not. We can then use that bitset to construct each column independently of each other.
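
The bitset approach can be sketched with plain Go slices standing in for arrow arrays. Everything here is hypothetical (the bitset type, filterColumn, and the predicate); the point is only the shape of the algorithm: evaluate the predicate once per row, record the result in a bitset, then build each column independently at its exact final size.

```go
package main

import (
	"fmt"
	"math/bits"
)

// bitset records one bit per row of the current chunk.
type bitset []uint64

func newBitset(n int) bitset { return make(bitset, (n+63)/64) }

func (b bitset) set(i int) { b[i/64] |= uint64(1) << uint(i%64) }

func (b bitset) isSet(i int) bool { return b[i/64]&(uint64(1)<<uint(i%64)) != 0 }

// count returns the number of rows that passed the filter.
func (b bitset) count() int {
	n := 0
	for _, w := range b {
		n += bits.OnesCount64(w)
	}
	return n
}

// filterColumn builds one output column. The output is allocated once at
// its final size, so no reallocation happens while appending.
func filterColumn[T any](col []T, keep bitset) []T {
	out := make([]T, 0, keep.count())
	for i, v := range col {
		if keep.isSet(i) {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	values := []float64{1, 5, 2, 8}
	hosts := []string{"a", "b", "c", "d"}

	// Row-based step: evaluate the predicate once per row.
	keep := newBitset(len(values))
	for i, v := range values {
		if v > 1.5 {
			keep.set(i)
		}
	}

	// Column-based step: construct each column independently.
	fmt.Println(filterColumn(values, keep), filterColumn(hosts, keep))
	// prints [5 2 8] [b c d]
}
```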

Utilize Slices

Arrow arrays are immutable and have built in support for slicing data. A slice keeps the same reference to the underlying data, but limits the view of that data to a smaller section. Slices should be used when the data naturally slices itself. An example is the limit() transformation where the data naturally gets sliced and the extra memory that is retained may not matter very much.

Slices can have disadvantages though. An algorithm that goes out of its way to use slices to conserve memory can end up using more memory and CPU cycles. An example is filter(). With filter(), we can have a situation where we filter out every even row and keep every odd row. If we attempted to use slices for this, we would have a bunch of table chunks of length 1. Chunks that small increase the number of table chunks and increase the likelihood that we will spend more time on overhead than on data processing. Since the slices reference the same underlying array, they also prevent us from releasing the memory used by the data in the even rows, which increases the memory used by the overall query. This last part still applies to transformations like limit() even though that transformation can benefit from slices in some circumstances.

In circumstances like the above, copies can be much more efficient.
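
Go slices behave similarly enough to make the trade-off visible. The sketch below is an analogy, not arrow code: limitSlice and limitCopy are hypothetical helpers, and cap() is used only to show how much of the original backing array remains reachable.

```go
package main

import "fmt"

// limitSlice keeps the first n rows as a view. No data is copied, but the
// whole backing array stays reachable through the returned slice.
func limitSlice(col []int64, n int) []int64 {
	if n > len(col) {
		n = len(col)
	}
	return col[:n]
}

// limitCopy keeps the first n rows in a fresh allocation, so the original
// array can be released once nothing else references it.
func limitCopy(col []int64, n int) []int64 {
	if n > len(col) {
		n = len(col)
	}
	out := make([]int64, n)
	copy(out, col)
	return out
}

func main() {
	col := make([]int64, 1000)
	fmt.Println(cap(limitSlice(col, 10)), cap(limitCopy(col, 10)))
	// prints 1000 10
}
```

The view is free to create but pins all 1000 values; the copy costs an allocation but retains only the 10 values that are needed.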

Copying Data

Data can be copied with the Copy utilities in the github.com/influxdata/flux/internal/arrowutil package. There are many copy utilities in there, but the most useful is likely CopyByIndexTo. This method takes a list of indices from the source array and copies them into the destination builder.
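
The idea behind an index-based copy can be shown with plain slices. This sketch does not use or reproduce the arrowutil API: copyByIndex is a hypothetical helper, and a Go slice stands in for both the source array and the destination builder.

```go
package main

import "fmt"

// copyByIndex appends src[i] to dst for every i in indices, in order, and
// returns the extended destination. Indices must be valid for src.
func copyByIndex[T any](dst, src []T, indices []int) []T {
	for _, i := range indices {
		dst = append(dst, src[i])
	}
	return dst
}

func main() {
	src := []string{"r0", "r1", "r2", "r3", "r4"}
	odd := []int{1, 3} // keep every odd row, as in the filter() example

	// Preallocate the destination so that appends do not reallocate.
	dst := make([]string, 0, len(odd))
	fmt.Println(copyByIndex(dst, src, odd))
	// prints [r1 r3]
}
```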

Dynamic Builders

Sometimes, we cannot avoid row-based algorithms, and row-based algorithms are likely going to require dynamically appending values. There are two useful functions in the github.com/influxdata/flux/arrow package for this.

The first is arrow.NewBuilder. This takes a column type and produces an appropriate builder for that column type.

The second is arrow.AppendValue. This one takes a builder, usually constructed with arrow.NewBuilder, and appends the value to the arrow builder.

The most common usage of these is like this:

b := arrow.NewBuilder(flux.ColumnType(v.Type()), mem)
if err := arrow.AppendValue(b, v); err != nil {
    return nil, err
}
return b.NewArray(), nil

Documentation

Overview

Package execute contains the implementation of the execution phase in the query engine.

Constants

const (
	MaxTime = math.MaxInt64
	MinTime = math.MinInt64
)
const (
	DefaultStartColLabel = "_start"
	DefaultStopColLabel  = "_stop"
	DefaultTimeColLabel  = "_time"
	DefaultValueColLabel = "_value"
)
const OperatorProfilerContextKey = "operator-profiler"

Variables

var AllProfilers = make(map[string]CreateProfilerFunc)
var AllTime = Bounds{
	Start: MinTime,
	Stop:  MaxTime,
}
var DefaultSelectorConfig = SelectorConfig{
	Column: DefaultValueColLabel,
}
var DefaultSimpleAggregateConfig = SimpleAggregateConfig{
	Columns: []string{DefaultValueColLabel},
}

Functions

func AddNewTableCols

func AddNewTableCols(t flux.Table, builder TableBuilder, colMap []int) ([]int, error)

AddNewTableCols adds the columns of t onto builder that did not already exist. Returns the mapping of builder cols to table cols.

func AddTableCols

func AddTableCols(t flux.Table, builder TableBuilder) error

AddTableCols adds the columns of t onto builder.

func AddTableKeyCols

func AddTableKeyCols(key flux.GroupKey, builder TableBuilder) error

func AppendCol

func AppendCol(bj, cj int, cr flux.ColReader, builder TableBuilder) error

AppendCol appends a column from cr onto builder. The indexes bj and cj are the builder and column reader indexes, respectively.

func AppendCols

func AppendCols(cr flux.ColReader, builder TableBuilder) error

AppendCols appends all columns from cr onto builder. This function assumes that builder and cr have the same column schema.

func AppendKeyValues

func AppendKeyValues(key flux.GroupKey, builder TableBuilder) error

AppendKeyValues appends the key values to the right columns in the builder. The builder is expected to contain the key columns.

func AppendKeyValuesN added in v0.38.0

func AppendKeyValuesN(key flux.GroupKey, builder TableBuilder, n int) error

AppendKeyValuesN runs AppendKeyValues n times. This is different from

for i := 0; i < n; i++ {
  AppendKeyValues(key, builder)
}

because it saves the overhead of calculating the column mapping n times.

func AppendMappedCols

func AppendMappedCols(cr flux.ColReader, builder TableBuilder, colMap []int) error

AppendMappedCols appends all columns from cr onto builder. The colMap is a map of builder column index to cr column index.

func AppendMappedRecordExplicit

func AppendMappedRecordExplicit(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error

AppendMappedRecordExplicit appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, no value is appended.

func AppendMappedRecordWithNulls added in v0.14.0

func AppendMappedRecordWithNulls(i int, cr flux.ColReader, builder TableBuilder, colMap []int) error

AppendMappedRecordWithNulls appends the records from cr onto builder, using colMap as a map of builder index to cr index. If an entry in the colMap indicates a mismatched column, the column is created with null values.

func AppendMappedTable

func AppendMappedTable(t flux.Table, builder TableBuilder, colMap []int) error

AppendMappedTable appends data from table t onto builder. The colMap is a map of builder column index to table column index.

func AppendRecord

func AppendRecord(i int, cr flux.ColReader, builder TableBuilder) error

AppendRecord appends the record from cr onto builder assuming matching columns.

func AppendTable

func AppendTable(t flux.Table, builder TableBuilder) error

AppendTable appends data from table t onto builder. This function assumes builder and t have the same column schema.

func BuilderColsMatchReader added in v0.8.0

func BuilderColsMatchReader(builder TableBuilder, cr flux.ColReader) bool

BuilderColsMatchReader returns true if builder and cr have identical column sets (order dependent).

func CheckColType

func CheckColType(col flux.ColMeta, typ flux.ColType)

func Close added in v0.148.0

func Close(err error, c Closer) error

Close is a convenience method that will take an error and a Closer. This will call the Close method on the Closer. If the error is nil, it will return any error from the Close method. If the error was not nil, it will return the error.

func ColIdx

func ColIdx(label string, cols []flux.ColMeta) int

func ColMap

func ColMap(colMap []int, builder TableBuilder, cols []flux.ColMeta) []int

ColMap writes a mapping of builder index to cols index into colMap. When colMap does not have enough capacity, a new colMap is allocated. The colMap is always returned.

func ContainsStr

func ContainsStr(strs []string, str string) bool

func ConvertFromKind

func ConvertFromKind(k semantic.Nature) flux.ColType

func ConvertToKind

func ConvertToKind(t flux.ColType) semantic.Nature

func CopyTable

func CopyTable(t flux.Table) (flux.BufferedTable, error)

CopyTable returns a buffered copy of the table and consumes the input table. If the input table is already buffered, it "consumes" the input and returns the same table.

The buffered table can then be copied additional times using the BufferedTable.Copy method.

This method should be used sparingly if at all. It will retain each of the buffers of data coming out of a table so the entire table is materialized in memory. For large datasets, this could potentially cause a problem. The allocator is meant to catch when this happens and prevent it.

func FormatResult added in v0.20.0

func FormatResult(w io.Writer, res flux.Result) error

FormatResult prints the result to w.

func GroupKeyForRowOn

func GroupKeyForRowOn(i int, cr flux.ColReader, on map[string]bool) flux.GroupKey

func HasCol

func HasCol(label string, cols []flux.ColMeta) bool

func HaveExecutionDependencies added in v0.91.0

func HaveExecutionDependencies(ctx context.Context) bool

func NewAggregateTransformation

func NewAggregateTransformation(id DatasetID, t AggregateTransformation, mem memory.Allocator) (Transformation, Dataset, error)

NewAggregateTransformation constructs a Transformation and Dataset using the AggregateTransformation implementation.

func NewDataset

func NewDataset(id DatasetID, accMode AccumulationMode, cache DataCache) *dataset

func NewEmptyTable added in v0.49.0

func NewEmptyTable(key flux.GroupKey, cols []flux.ColMeta) flux.Table

NewEmptyTable constructs a new empty table with the given group key and columns.

func NewGroupKey

func NewGroupKey(cols []flux.ColMeta, values []values.Value) flux.GroupKey

func NewGroupTransformation added in v0.128.0

func NewGroupTransformation(id DatasetID, t GroupTransformation, mem memory.Allocator) (Transformation, Dataset, error)

func NewIndexSelectorTransformation

func NewIndexSelectorTransformation(d Dataset, c TableBuilderCache, selector IndexSelector, config SelectorConfig) *indexSelectorTransformation

func NewNarrowStateTransformation added in v0.130.0

func NewNarrowStateTransformation(id DatasetID, t NarrowStateTransformation, mem memory.Allocator) (Transformation, Dataset, error)

NewNarrowStateTransformation constructs a Transformation and Dataset using the NarrowStateTransformation implementation.

func NewNarrowTransformation added in v0.125.0

func NewNarrowTransformation(id DatasetID, t NarrowTransformation, mem memory.Allocator) (Transformation, Dataset, error)

NewNarrowTransformation constructs a Transformation and Dataset using the NarrowTransformation implementation.

func NewRowSelectorTransformation

func NewRowSelectorTransformation(d Dataset, c TableBuilderCache, selector RowSelector, config SelectorConfig) *rowSelectorTransformation

func NewSimpleAggregateTransformation added in v0.127.0

func NewSimpleAggregateTransformation(ctx context.Context, id DatasetID, agg SimpleAggregate, config SimpleAggregateConfig, mem memory.Allocator) (Transformation, Dataset, error)

func NewTableBuilderCache

func NewTableBuilderCache(a *memory.Allocator) *tableBuilderCache

func OperationType added in v0.125.0

func OperationType(t interface{}) string

OperationType returns a string representation of the transformation operation represented by the Transport.

func PanicUnknownType

func PanicUnknownType(typ flux.ColType)

func RegisterProfilerFactories added in v0.86.0

func RegisterProfilerFactories(cpfs ...CreateProfilerFunc)

func RegisterSource

func RegisterSource(k plan.ProcedureKind, c CreateSource)

func RegisterTransformation

func RegisterTransformation(k plan.ProcedureKind, c CreateTransformation)

RegisterTransformation adds a new registration mapping of procedure kind to transformation.

func ReplaceTransformation added in v0.14.0

func ReplaceTransformation(k plan.ProcedureKind, c CreateTransformation)

ReplaceTransformation changes an existing transformation registration.

func StartSpanFromContext added in v0.86.0

func StartSpanFromContext(ctx context.Context, operationName string, label string, opts ...opentracing.StartSpanOption) (context.Context, opentracing.Span)

StartSpanFromContext creates a tracing span. Depending on whether Jaeger tracing and/or operator profiling are enabled, the Span produced by this function can differ considerably. It could be a no-op span, a Jaeger span, a no-op span wrapped by a profiling span, or a Jaeger span wrapped by a profiling span.

func TablesEqual added in v0.8.0

func TablesEqual(left, right flux.Table, alloc *memory.Allocator) (bool, error)

TablesEqual takes two flux tables and compares them. It returns false if the tables have different keys, different columns, or if the data in any column does not match, and true otherwise. This function consumes the ColumnReader, so if you call it from a Process method and also need to iterate over the data for other calculations, you may need to copy the table first.

func ValueForRow

func ValueForRow(cr flux.ColReader, i, j int) values.Value

ValueForRow retrieves a value from an arrow column reader at the given index.

Types

type AccumulationMode

type AccumulationMode int
const (
	// DiscardingMode will discard the data associated with a group key
	// after it has been processed.
	DiscardingMode AccumulationMode = iota

	// AccumulatingMode will retain the data associated with a group key
	// after it has been processed. If it has already sent a table with
	// that group key to a downstream transformation, it will signal
	// to that transformation that the previous table should be retracted.
	//
	// This is not implemented at the moment.
	AccumulatingMode
)

type Administration

type Administration interface {
	Context() context.Context

	ResolveTime(qt flux.Time) Time
	StreamContext() StreamContext
	Allocator() *memory.Allocator
	Parents() []DatasetID
}

type AggregateTransformation added in v0.127.0

type AggregateTransformation interface {
	// Aggregate will process the table.Chunk with the state from the previous
	// time a table with this group key was invoked.
	//
	// If this group key has never been invoked before, the state will be nil.
	//
	// The transformation should return the new state and a boolean
	// value of true if the state was created or modified. If false is returned,
	// the new state will be discarded and any old state will be kept.
	//
	// It is ok for the transformation to modify the state if it is
	// a pointer. This is both allowed and recommended.
	Aggregate(chunk table.Chunk, state interface{}, mem memory.Allocator) (interface{}, bool, error)

	// Compute will signal the AggregateTransformation to compute
	// the aggregate for the given key from the provided state.
	//
	// The state will be the value that was returned from Aggregate.
	// If the Aggregate function never returned state, this function
	// will never be called.
	Compute(key flux.GroupKey, state interface{}, d *TransportDataset, mem memory.Allocator) error

	Closer
}

AggregateTransformation implements a transformation that aggregates the results from multiple TableView values and then outputs a Table with the same group key.

This is similar to NarrowTransformation in that it does not modify the group key, but different because it will only output a table when the key is flushed.
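To make the Aggregate/Compute contract concrete, the following is a minimal sketch that does not import Flux. The types `chunk`, `sumState`, and `sumAgg` and the `run` driver are hypothetical stand-ins with simplified signatures. Only the protocol follows the interface comments above: the state is nil on the first call for a key, Aggregate returns the new state plus a boolean saying whether to keep it, and Compute runs once per key from the stored state.

```go
package main

import "fmt"

// chunk is a hypothetical stand-in for table.Chunk: a group key and one column.
type chunk struct {
	key    string
	values []float64
}

// sumState is the per-key state carried between Aggregate calls.
type sumState struct{ sum float64 }

// sumAgg mirrors the shape of AggregateTransformation with simplified types.
type sumAgg struct{}

// Aggregate folds one chunk into the state for its key.
// The state is nil the first time a key is seen.
func (sumAgg) Aggregate(c chunk, state interface{}) (interface{}, bool, error) {
	s, _ := state.(*sumState)
	if s == nil {
		s = &sumState{}
	}
	for _, v := range c.values {
		s.sum += v
	}
	// Returning true tells the driver to keep the returned state.
	return s, true, nil
}

// Compute turns the accumulated state for a key into the final value.
func (sumAgg) Compute(key string, state interface{}) (float64, error) {
	s, ok := state.(*sumState)
	if !ok {
		return 0, fmt.Errorf("unexpected state for key %q", key)
	}
	return s.sum, nil
}

// run is a toy driver: it stores state per key and computes once per key
// after all chunks have been seen, which stands in for a key flush.
func run(chunks []chunk) (map[string]float64, error) {
	var agg sumAgg
	states := make(map[string]interface{})
	for _, c := range chunks {
		next, keep, err := agg.Aggregate(c, states[c.key])
		if err != nil {
			return nil, err
		}
		if keep {
			states[c.key] = next
		}
	}
	out := make(map[string]float64, len(states))
	for key, state := range states {
		v, err := agg.Compute(key, state)
		if err != nil {
			return nil, err
		}
		out[key] = v
	}
	return out, nil
}

func main() {
	out, err := run([]chunk{
		{key: "a", values: []float64{1, 2}},
		{key: "b", values: []float64{10}},
		{key: "a", values: []float64{3}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out["a"], out["b"])
}
```

Because the state is a pointer, Aggregate mutates it in place, which is the pattern the interface comment recommends.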

type Allocator

type Allocator struct {
	*memory.Allocator
}

Allocator tracks the amount of memory being consumed by a query. The allocator provides methods similar to make and append, to allocate large slices of data. The allocator also provides a Free method to account for when memory will be freed.
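The accounting idea can be illustrated with a small sketch that is independent of Flux. The `tracker` type, its `limit` field, and the error return are assumptions made for this example and are not the package's API. The sketch only shows how a make-like method can charge for capacity and how Free(n, size) gives the bytes back.

```go
package main

import (
	"errors"
	"fmt"
)

// errLimit is returned when an allocation would exceed the limit.
var errLimit = errors.New("memory limit exceeded")

// tracker is a hypothetical stand-in for the allocator: it only counts bytes.
type tracker struct {
	limit int64 // maximum bytes allowed (an assumption of this sketch)
	used  int64 // bytes currently accounted for
}

// account charges n bytes against the limit.
func (t *tracker) account(n int64) error {
	if t.used+n > t.limit {
		return errLimit
	}
	t.used += n
	return nil
}

// Ints makes a slice with length l and capacity c and accounts for c*8 bytes.
func (t *tracker) Ints(l, c int) ([]int64, error) {
	if err := t.account(int64(c) * 8); err != nil {
		return nil, err
	}
	return make([]int64, l, c), nil
}

// Free records that n elements of the given size were released.
func (t *tracker) Free(n, size int) {
	t.used -= int64(n) * int64(size)
}

func main() {
	t := &tracker{limit: 64}

	vs, err := t.Ints(0, 4) // charges 32 bytes
	fmt.Println(len(vs), cap(vs), t.used, err)

	_, err = t.Ints(0, 8) // would need 64 more bytes
	fmt.Println(err)

	t.Free(cap(vs), 8)
	fmt.Println(t.used)
}
```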

func (*Allocator) AppendBools

func (a *Allocator) AppendBools(slice []bool, vs ...bool) []bool

AppendBools appends bools to a slice

func (*Allocator) AppendFloats

func (a *Allocator) AppendFloats(slice []float64, vs ...float64) []float64

AppendFloats appends float64s to a slice

func (*Allocator) AppendInts

func (a *Allocator) AppendInts(slice []int64, vs ...int64) []int64

AppendInts appends int64s to a slice

func (*Allocator) AppendStrings

func (a *Allocator) AppendStrings(slice []string, vs ...string) []string

AppendStrings appends strings to a slice. Only the string headers are accounted for.

func (*Allocator) AppendTimes

func (a *Allocator) AppendTimes(slice []Time, vs ...Time) []Time

AppendTimes appends Times to a slice

func (*Allocator) AppendUInts

func (a *Allocator) AppendUInts(slice []uint64, vs ...uint64) []uint64

AppendUInts appends uint64s to a slice

func (*Allocator) Bools

func (a *Allocator) Bools(l, c int) []bool

Bools makes a slice of bool values.

func (*Allocator) Floats

func (a *Allocator) Floats(l, c int) []float64

Floats makes a slice of float64 values.

func (*Allocator) Free

func (a *Allocator) Free(n, size int)

Free informs the allocator that memory has been freed.

func (*Allocator) GrowBools

func (a *Allocator) GrowBools(slice []bool, n int) []bool

func (*Allocator) GrowFloats

func (a *Allocator) GrowFloats(slice []float64, n int) []float64

func (*Allocator) GrowInts

func (a *Allocator) GrowInts(slice []int64, n int) []int64

func (*Allocator) GrowStrings

func (a *Allocator) GrowStrings(slice []string, n int) []string

func (*Allocator) GrowTimes

func (a *Allocator) GrowTimes(slice []Time, n int) []Time

func (*Allocator) GrowUInts

func (a *Allocator) GrowUInts(slice []uint64, n int) []uint64

func (*Allocator) Ints

func (a *Allocator) Ints(l, c int) []int64

Ints makes a slice of int64 values.

func (*Allocator) Strings

func (a *Allocator) Strings(l, c int) []string

Strings makes a slice of string values. Only the string headers are accounted for.

func (*Allocator) Times

func (a *Allocator) Times(l, c int) []Time

Times makes a slice of Time values.

func (*Allocator) UInts

func (a *Allocator) UInts(l, c int) []uint64

UInts makes a slice of uint64 values.

type AsyncTransport added in v0.125.0

type AsyncTransport interface {
	Transport
	// Finished reports when the AsyncTransport has completed and there is no more work to do.
	Finished() <-chan struct{}
}

AsyncTransport is a Transport that performs its work in a separate goroutine.

type BoolValueFunc

type BoolValueFunc interface {
	ValueBool() bool
}

type Bounds

type Bounds struct {
	Start Time
	Stop  Time
}

func (Bounds) Contains

func (b Bounds) Contains(t Time) bool

func (Bounds) Duration added in v0.19.0

func (b Bounds) Duration() Duration

func (Bounds) Equal

func (b Bounds) Equal(o Bounds) bool

func (*Bounds) Intersect added in v0.17.0

func (b *Bounds) Intersect(o Bounds) Bounds

Intersect returns the intersection of two bounds. It returns empty bounds if either of the input bounds is empty.

TODO: there are several places that implement bounds and related utilities. Consider a central place for them?
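As an illustration of the intersection rule, here is a minimal sketch using plain int64 timestamps. The `bounds` type is a hypothetical stand-in. The sketch assumes half-open intervals [Start, Stop) and returns the zero value for an empty result, neither of which is confirmed by the documentation above.

```go
package main

import "fmt"

// bounds is a hypothetical stand-in for Bounds using int64 timestamps.
// This sketch assumes half-open intervals [Start, Stop).
type bounds struct {
	Start, Stop int64
}

// IsEmpty reports whether the interval contains no points.
func (b bounds) IsEmpty() bool { return b.Start >= b.Stop }

// Intersect returns the overlap of b and o. It returns empty bounds when
// either input is empty or when the two do not overlap.
func (b bounds) Intersect(o bounds) bounds {
	if b.IsEmpty() || o.IsEmpty() {
		return bounds{}
	}
	r := bounds{Start: b.Start, Stop: b.Stop}
	if o.Start > r.Start {
		r.Start = o.Start
	}
	if o.Stop < r.Stop {
		r.Stop = o.Stop
	}
	if r.IsEmpty() {
		return bounds{}
	}
	return r
}

func main() {
	a := bounds{Start: 0, Stop: 10}
	fmt.Println(a.Intersect(bounds{Start: 5, Stop: 20}))
	fmt.Println(a.Intersect(bounds{Start: 10, Stop: 20}).IsEmpty())
}
```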

func (Bounds) IsEmpty

func (b Bounds) IsEmpty() bool

func (Bounds) Overlaps

func (b Bounds) Overlaps(o Bounds) bool

func (Bounds) Shift

func (b Bounds) Shift(d Duration) Bounds

func (Bounds) String

func (b Bounds) String() string

type Closer added in v0.148.0

type Closer interface {
	// Close is invoked when the resource will no longer be used.
	Close() error
}

Closer is an interface to be implemented for a resource that will be closed at a defined time.

type ColListTable

type ColListTable struct {
	// contains filtered or unexported fields
}

ColListTable implements Table using a list of columns. All data for the table is stored in RAM. As a result, At* methods are provided directly on the table for easy access.

func (*ColListTable) Bools

func (t *ColListTable) Bools(j int) *array.Boolean

func (*ColListTable) Cols

func (t *ColListTable) Cols() []flux.ColMeta

func (*ColListTable) Do

func (t *ColListTable) Do(f func(flux.ColReader) error) error

func (*ColListTable) Done added in v0.31.0

func (t *ColListTable) Done()

func (*ColListTable) Empty

func (t *ColListTable) Empty() bool

func (*ColListTable) Floats

func (t *ColListTable) Floats(j int) *array.Float

func (*ColListTable) GetRow

func (t *ColListTable) GetRow(row int) values.Object

GetRow takes a row index and returns the record located at that index in the cache

func (*ColListTable) Ints

func (t *ColListTable) Ints(j int) *array.Int

func (*ColListTable) Key

func (t *ColListTable) Key() flux.GroupKey

func (*ColListTable) Len

func (t *ColListTable) Len() int

func (*ColListTable) NRows

func (t *ColListTable) NRows() int

func (*ColListTable) RefCount

func (t *ColListTable) RefCount(n int)

func (*ColListTable) Release added in v0.31.0

func (t *ColListTable) Release()

func (*ColListTable) Retain added in v0.31.0

func (t *ColListTable) Retain()

func (*ColListTable) Strings

func (t *ColListTable) Strings(j int) *array.String

func (*ColListTable) Times

func (t *ColListTable) Times(j int) *array.Int

func (*ColListTable) UInts

func (t *ColListTable) UInts(j int) *array.Uint

type ColListTableBuilder

type ColListTableBuilder struct {
	// contains filtered or unexported fields
}

func NewColListTableBuilder

func NewColListTableBuilder(key flux.GroupKey, a *memory.Allocator) *ColListTableBuilder

func (*ColListTableBuilder) AddCol

func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error)

func (*ColListTableBuilder) AppendBool

func (b *ColListTableBuilder) AppendBool(j int, value bool) error

func (*ColListTableBuilder) AppendBools

func (b *ColListTableBuilder) AppendBools(j int, vs *array.Boolean) error

func (*ColListTableBuilder) AppendFloat

func (b *ColListTableBuilder) AppendFloat(j int, value float64) error

func (*ColListTableBuilder) AppendFloats

func (b *ColListTableBuilder) AppendFloats(j int, vs *array.Float) error

func (*ColListTableBuilder) AppendInt

func (b *ColListTableBuilder) AppendInt(j int, value int64) error

func (*ColListTableBuilder) AppendInts

func (b *ColListTableBuilder) AppendInts(j int, vs *array.Int) error

func (*ColListTableBuilder) AppendNil added in v0.14.0

func (b *ColListTableBuilder) AppendNil(j int) error

func (*ColListTableBuilder) AppendString

func (b *ColListTableBuilder) AppendString(j int, value string) error

func (*ColListTableBuilder) AppendStrings

func (b *ColListTableBuilder) AppendStrings(j int, vs *array.String) error

func (*ColListTableBuilder) AppendTime

func (b *ColListTableBuilder) AppendTime(j int, value Time) error

func (*ColListTableBuilder) AppendTimes

func (b *ColListTableBuilder) AppendTimes(j int, vs *array.Int) error

func (*ColListTableBuilder) AppendUInt

func (b *ColListTableBuilder) AppendUInt(j int, value uint64) error

func (*ColListTableBuilder) AppendUInts

func (b *ColListTableBuilder) AppendUInts(j int, vs *array.Uint) error

func (*ColListTableBuilder) AppendValue

func (b *ColListTableBuilder) AppendValue(j int, v values.Value) error

func (*ColListTableBuilder) Bools added in v0.10.0

func (b *ColListTableBuilder) Bools(j int) []bool

func (*ColListTableBuilder) ClearData

func (b *ColListTableBuilder) ClearData()

func (*ColListTableBuilder) Cols

func (b *ColListTableBuilder) Cols() []flux.ColMeta

func (*ColListTableBuilder) Floats added in v0.10.0

func (b *ColListTableBuilder) Floats(j int) []float64

func (*ColListTableBuilder) GetRow added in v0.10.0

func (b *ColListTableBuilder) GetRow(row int) values.Object

GetRow takes a row index and returns the record located at that index in the cache

func (*ColListTableBuilder) GrowBools

func (b *ColListTableBuilder) GrowBools(j, n int) error

func (*ColListTableBuilder) GrowFloats

func (b *ColListTableBuilder) GrowFloats(j, n int) error

func (*ColListTableBuilder) GrowInts

func (b *ColListTableBuilder) GrowInts(j, n int) error

func (*ColListTableBuilder) GrowStrings

func (b *ColListTableBuilder) GrowStrings(j, n int) error

func (*ColListTableBuilder) GrowTimes

func (b *ColListTableBuilder) GrowTimes(j, n int) error

func (*ColListTableBuilder) GrowUInts

func (b *ColListTableBuilder) GrowUInts(j, n int) error

func (*ColListTableBuilder) Ints added in v0.10.0

func (b *ColListTableBuilder) Ints(j int) []int64

func (*ColListTableBuilder) Key

func (*ColListTableBuilder) Len added in v0.10.0

func (b *ColListTableBuilder) Len() int

func (*ColListTableBuilder) LevelColumns

func (b *ColListTableBuilder) LevelColumns() error

func (*ColListTableBuilder) NCols

func (b *ColListTableBuilder) NCols() int

func (*ColListTableBuilder) NRows

func (b *ColListTableBuilder) NRows() int

func (*ColListTableBuilder) Release added in v0.48.0

func (b *ColListTableBuilder) Release()

func (*ColListTableBuilder) SetBool

func (b *ColListTableBuilder) SetBool(i int, j int, value bool) error

func (*ColListTableBuilder) SetFloat

func (b *ColListTableBuilder) SetFloat(i int, j int, value float64) error

func (*ColListTableBuilder) SetInt

func (b *ColListTableBuilder) SetInt(i int, j int, value int64) error

func (*ColListTableBuilder) SetNil added in v0.14.0

func (b *ColListTableBuilder) SetNil(i, j int) error

func (*ColListTableBuilder) SetString

func (b *ColListTableBuilder) SetString(i int, j int, value string) error

func (*ColListTableBuilder) SetTime

func (b *ColListTableBuilder) SetTime(i int, j int, value Time) error

func (*ColListTableBuilder) SetUInt

func (b *ColListTableBuilder) SetUInt(i int, j int, value uint64) error

func (*ColListTableBuilder) SetValue

func (b *ColListTableBuilder) SetValue(i, j int, v values.Value) error

func (*ColListTableBuilder) SliceColumns added in v0.8.0

func (b *ColListTableBuilder) SliceColumns(start, stop int) error

SliceColumns iterates over each column of b and re-slices them to the range [start:stop].

func (*ColListTableBuilder) Sort

func (b *ColListTableBuilder) Sort(cols []string, desc bool)

func (*ColListTableBuilder) Strings added in v0.10.0

func (b *ColListTableBuilder) Strings(j int) []string

func (*ColListTableBuilder) Table

func (b *ColListTableBuilder) Table() (flux.Table, error)

func (*ColListTableBuilder) Times added in v0.10.0

func (b *ColListTableBuilder) Times(j int) []values.Time

func (*ColListTableBuilder) UInts added in v0.10.0

func (b *ColListTableBuilder) UInts(j int) []uint64

type CreateProfilerFunc added in v0.86.0

type CreateProfilerFunc func() Profiler

type CreateSource

type CreateSource func(spec plan.ProcedureSpec, id DatasetID, ctx Administration) (Source, error)

type CreateTransformation

type CreateTransformation func(id DatasetID, mode AccumulationMode, spec plan.ProcedureSpec, a Administration) (Transformation, Dataset, error)

type DataCache

type DataCache interface {
	Table(flux.GroupKey) (flux.Table, error)

	ForEach(func(flux.GroupKey) error) error
	ForEachWithContext(func(flux.GroupKey, Trigger, TableContext) error) error

	DiscardTable(flux.GroupKey)
	ExpireTable(flux.GroupKey)

	SetTriggerSpec(t plan.TriggerSpec)
}

DataCache holds all working data for a transformation.

type Dataset

type Dataset interface {
	Node

	RetractTable(key flux.GroupKey) error
	UpdateProcessingTime(t Time) error
	UpdateWatermark(mark Time) error
	Finish(error)

	SetTriggerSpec(t plan.TriggerSpec)
}

Dataset represents the set of data produced by a transformation.

func NewIndexSelectorTransformationAndDataset

func NewIndexSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector IndexSelector, config SelectorConfig, a *memory.Allocator) (*indexSelectorTransformation, Dataset)

func NewRowSelectorTransformationAndDataset

func NewRowSelectorTransformationAndDataset(id DatasetID, mode AccumulationMode, selector RowSelector, config SelectorConfig, a *memory.Allocator) (*rowSelectorTransformation, Dataset)

type DatasetContext added in v0.114.0

type DatasetContext interface {
	Dataset
	WithContext(ctx context.Context)
}

DatasetContext represents a Dataset with a context.Context attached.

type DatasetID

type DatasetID uuid.UUID
var ZeroDatasetID DatasetID

func DatasetIDFromNodeID

func DatasetIDFromNodeID(id plan.NodeID) DatasetID

func (DatasetID) IsZero

func (id DatasetID) IsZero() bool

func (DatasetID) String

func (id DatasetID) String() string

type Dispatcher

type Dispatcher interface {
	// Schedule fn to be executed
	Schedule(fn ScheduleFunc)
}

Dispatcher schedules work for a query. Each transformation submits work to be done to the dispatcher. Then the dispatcher schedules the work based on the available resources.
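One way to picture this is a fixed pool of goroutines draining a queue of submitted functions. The sketch below is an assumption-laden illustration and not the package's implementation: `scheduleFunc` is a simplified hypothetical signature, and `poolDispatcher`, `Stop`, and `runJobs` are names invented for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// scheduleFunc is a hypothetical, simplified unit of work.
type scheduleFunc func()

// poolDispatcher runs scheduled work on a fixed number of goroutines,
// which stands in for "the available resources".
type poolDispatcher struct {
	work chan scheduleFunc
	wg   sync.WaitGroup
}

func newPoolDispatcher(workers int) *poolDispatcher {
	d := &poolDispatcher{work: make(chan scheduleFunc, workers)}
	d.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer d.wg.Done()
			for fn := range d.work {
				fn()
			}
		}()
	}
	return d
}

// Schedule submits fn to be executed by one of the workers.
func (d *poolDispatcher) Schedule(fn scheduleFunc) { d.work <- fn }

// Stop closes the queue and waits for all submitted work to finish.
func (d *poolDispatcher) Stop() {
	close(d.work)
	d.wg.Wait()
}

// runJobs schedules n trivial jobs and returns how many of them ran.
func runJobs(workers, n int) int64 {
	d := newPoolDispatcher(workers)
	var done int64
	for i := 0; i < n; i++ {
		d.Schedule(func() { atomic.AddInt64(&done, 1) })
	}
	d.Stop()
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(runJobs(2, 10))
}
```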

type DoBoolAgg

type DoBoolAgg interface {
	ValueFunc
	DoBool(*array.Boolean)
}

type DoBoolIndexSelector

type DoBoolIndexSelector interface {
	DoBool(*array.Boolean) []int
}

type DoBoolRowSelector

type DoBoolRowSelector interface {
	Rower
	DoBool(vs *array.Boolean, cr flux.ColReader)
}

type DoFloatAgg

type DoFloatAgg interface {
	ValueFunc
	DoFloat(*array.Float)
}

type DoFloatIndexSelector

type DoFloatIndexSelector interface {
	DoFloat(*array.Float) []int
}

type DoFloatRowSelector

type DoFloatRowSelector interface {
	Rower
	DoFloat(vs *array.Float, cr flux.ColReader)
}

type DoIntAgg

type DoIntAgg interface {
	ValueFunc
	DoInt(*array.Int)
}

type DoIntIndexSelector

type DoIntIndexSelector interface {
	DoInt(*array.Int) []int
}

type DoIntRowSelector

type DoIntRowSelector interface {
	Rower
	DoInt(vs *array.Int, cr flux.ColReader)
}

type DoStringAgg

type DoStringAgg interface {
	ValueFunc
	DoString(*array.String)
}

type DoStringIndexSelector

type DoStringIndexSelector interface {
	DoString(*array.String) []int
}

type DoStringRowSelector

type DoStringRowSelector interface {
	Rower
	DoString(vs *array.String, cr flux.ColReader)
}

type DoTimeIndexSelector added in v0.38.0

type DoTimeIndexSelector interface {
	DoTime(*array.Int) []int
}

type DoTimeRowSelector added in v0.38.0

type DoTimeRowSelector interface {
	Rower
	DoTime(vs *array.Int, cr flux.ColReader)
}

type DoUIntAgg

type DoUIntAgg interface {
	ValueFunc
	DoUInt(*array.Uint)
}

type DoUIntIndexSelector

type DoUIntIndexSelector interface {
	DoUInt(*array.Uint) []int
}

type DoUIntRowSelector

type DoUIntRowSelector interface {
	Rower
	DoUInt(vs *array.Uint, cr flux.ColReader)
}

type Duration

type Duration = values.Duration

type ExecutionDependencies added in v0.91.0

type ExecutionDependencies struct {
	// Must be set
	Allocator *memory.Allocator
	Now       *time.Time

	// Allowed to be nil
	Logger *zap.Logger

	// Metadata is passed up from any invocations of execution up to the parent
	// execution, and out through the statistics.
	Metadata metadata.Metadata

	ExecutionOptions *ExecutionOptions
}

ExecutionDependencies represents the dependencies that a function call executed by the Interpreter needs in order to trigger the execution of a flux.Program

func DefaultExecutionDependencies added in v0.91.0

func DefaultExecutionDependencies() ExecutionDependencies

func GetExecutionDependencies added in v0.91.0

func GetExecutionDependencies(ctx context.Context) ExecutionDependencies

func NewExecutionDependencies added in v0.91.0

func NewExecutionDependencies(allocator *memory.Allocator, now *time.Time, logger *zap.Logger) ExecutionDependencies

NewExecutionDependencies creates execution dependencies. Any argument may be nil, in which case a suitable default is chosen.

func (ExecutionDependencies) Inject added in v0.91.0

func (ExecutionDependencies) ResolveTimeable added in v0.148.0

func (d ExecutionDependencies) ResolveTimeable(t values.Value) (values.Time, error)

ResolveTimeable returns the time represented by a value. The value's type must be Timeable, one of time or duration.

type ExecutionNode added in v0.86.0

type ExecutionNode struct {
	// contains filtered or unexported fields
}

func (*ExecutionNode) Label added in v0.86.0

func (n *ExecutionNode) Label() string

func (*ExecutionNode) SetLabel added in v0.86.0

func (n *ExecutionNode) SetLabel(label string)

type ExecutionOptions added in v0.91.0

type ExecutionOptions struct {
	OperatorProfiler *OperatorProfiler
	Profilers        []Profiler
}

type Executor

type Executor interface {
	// Execute will begin execution of the plan.Spec using the memory allocator.
	// This returns a mapping of names to the query results.
	// This will also return a channel for the Metadata from the query. The channel
	// may return zero or more values. The returned channel must not require itself to
	// be read so the executor must allocate enough space in the channel so if the channel
	// is unread that it will not block.
	Execute(ctx context.Context, p *plan.Spec, a *memory.Allocator) (map[string]flux.Result, <-chan metadata.Metadata, error)
}
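The requirement that the metadata channel never has to be read can be met by sizing the channel for every value that may be sent. The following sketch shows that idea in isolation. The `metadata` type and the `execute` function are hypothetical stand-ins, and sending exactly one value per node is an assumption of the example.

```go
package main

import "fmt"

// metadata is a hypothetical stand-in for metadata.Metadata.
type metadata map[string][]interface{}

// execute returns a metadata channel whose capacity covers every value it
// will ever send, so the sender never blocks even if nobody reads.
func execute(nodes []string) <-chan metadata {
	ch := make(chan metadata, len(nodes))
	for _, n := range nodes {
		ch <- metadata{"node": {n}} // never blocks: capacity covers every send
	}
	close(ch)
	return ch
}

// count drains a metadata channel and returns the number of values read.
func count(ch <-chan metadata) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	// A caller may ignore the channel entirely without stalling execute.
	_ = execute([]string{"ignored"})

	// Or it may drain the channel once the query is done.
	fmt.Println(count(execute([]string{"from", "filter", "yield"})))
}
```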

func NewExecutor

func NewExecutor(logger *zap.Logger) Executor

type FinishMsg

type FinishMsg interface {
	Message
	Error() error
}

type FloatValueFunc

type FloatValueFunc interface {
	ValueFloat() float64
}

type FlushKeyMsg added in v0.125.0

type FlushKeyMsg interface {
	Message
	Key() flux.GroupKey
}

type FormatOptions

type FormatOptions struct {
	// RepeatHeaderCount is the number of rows to print before printing the header again.
	// If zero then the headers are not repeated.
	RepeatHeaderCount int

	NullRepresentation string
}
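The effect of RepeatHeaderCount can be sketched with plain strings. The `formatRows` helper below is hypothetical and does not reproduce the Formatter's real output layout. It only shows the rule stated in the field comment: print the header again after every RepeatHeaderCount rows, and never repeat it when the count is zero.

```go
package main

import (
	"fmt"
	"strings"
)

// formatRows writes the header, then the rows, repeating the header after
// every repeatHeaderCount rows. A count of zero disables repetition.
func formatRows(header string, rows []string, repeatHeaderCount int) string {
	var sb strings.Builder
	sb.WriteString(header + "\n")
	for i, row := range rows {
		if repeatHeaderCount > 0 && i > 0 && i%repeatHeaderCount == 0 {
			sb.WriteString(header + "\n")
		}
		sb.WriteString(row + "\n")
	}
	return sb.String()
}

func main() {
	fmt.Print(formatRows("_time _value", []string{"t1 1", "t2 2", "t3 3"}, 2))
}
```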

func DefaultFormatOptions

func DefaultFormatOptions() *FormatOptions

type Formatter

type Formatter struct {
	// contains filtered or unexported fields
}

Formatter writes a table to a Writer.

func NewFormatter

func NewFormatter(tbl flux.Table, opts *FormatOptions) *Formatter

NewFormatter creates a Formatter for a given table. If opts is nil, the DefaultFormatOptions are used.

func (*Formatter) WriteTo

func (f *Formatter) WriteTo(out io.Writer) (int64, error)

WriteTo writes the formatted table data to out.

type GroupKeyBuilder

type GroupKeyBuilder struct {
	// contains filtered or unexported fields
}

GroupKeyBuilder is used to construct a GroupKey by keeping a mutable copy in memory.

func NewGroupKeyBuilder

func NewGroupKeyBuilder(key flux.GroupKey) *GroupKeyBuilder

NewGroupKeyBuilder creates a new GroupKeyBuilder from an existing GroupKey. If the GroupKey passed is nil, a blank GroupKeyBuilder is constructed.

func (*GroupKeyBuilder) AddKeyValue

func (gkb *GroupKeyBuilder) AddKeyValue(key string, value values.Value) *GroupKeyBuilder

AddKeyValue will add a new group key to the existing builder.

func (*GroupKeyBuilder) Build

func (gkb *GroupKeyBuilder) Build() (flux.GroupKey, error)

Build will construct the GroupKey. If there is any problem with the GroupKey (such as one of the columns is not a valid type), the error will be returned here.

func (*GroupKeyBuilder) Grow

func (gkb *GroupKeyBuilder) Grow(n int)

Grow will grow the internal capacity of the group key to the given number.

func (*GroupKeyBuilder) Len

func (gkb *GroupKeyBuilder) Len() int

Len returns the current length of the group key.

func (*GroupKeyBuilder) SetKeyValue added in v0.14.0

func (gkb *GroupKeyBuilder) SetKeyValue(key string, value values.Value) *GroupKeyBuilder

SetKeyValue will set an existing key/value to the given pair, or if key is not found, add a new group key to the existing builder.
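The difference between AddKeyValue and SetKeyValue can be shown with a small stand-alone builder. Everything in this sketch is hypothetical (`keyBuilder`, `groupKey`, and the nil-value check in Build). It illustrates the add-versus-upsert behaviour and the chained style described above, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// groupKey is a hypothetical immutable result of the builder.
type groupKey struct {
	cols []string
	vals []interface{}
}

// keyBuilder keeps a mutable copy of the key while it is being assembled.
type keyBuilder struct {
	cols []string
	vals []interface{}
}

// AddKeyValue always appends a new key/value pair.
func (b *keyBuilder) AddKeyValue(key string, value interface{}) *keyBuilder {
	b.cols = append(b.cols, key)
	b.vals = append(b.vals, value)
	return b
}

// SetKeyValue overwrites the value for an existing key, or appends the pair
// when the key is not present.
func (b *keyBuilder) SetKeyValue(key string, value interface{}) *keyBuilder {
	for i, c := range b.cols {
		if c == key {
			b.vals[i] = value
			return b
		}
	}
	return b.AddKeyValue(key, value)
}

// Len returns the current number of key/value pairs.
func (b *keyBuilder) Len() int { return len(b.cols) }

// Build validates the pairs and returns a copy. Errors surface here rather
// than in the chained calls. Rejecting nil values is an assumption.
func (b *keyBuilder) Build() (groupKey, error) {
	for i, v := range b.vals {
		if v == nil {
			return groupKey{}, errors.New("invalid value for column " + b.cols[i])
		}
	}
	return groupKey{
		cols: append([]string(nil), b.cols...),
		vals: append([]interface{}(nil), b.vals...),
	}, nil
}

func main() {
	key, err := (&keyBuilder{}).
		AddKeyValue("host", "a").
		SetKeyValue("host", "b").
		SetKeyValue("region", "us").
		Build()
	fmt.Println(key.cols, key.vals, err)
}
```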

type GroupLookup

type GroupLookup = groupkey.Lookup

func NewGroupLookup

func NewGroupLookup() *GroupLookup

NewGroupLookup constructs a GroupLookup.

type GroupTransformation added in v0.128.0

type GroupTransformation interface {
	Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error

	Closer
}

GroupTransformation is a transformation that can modify the group key. Other than modifying the group key, it is identical to a NarrowTransformation.

The main difference between this and NarrowTransformation is that NarrowTransformation will pass the FlushKeyMsg to the Dataset and GroupTransformation will swallow this Message.

type IndexSelector

type IndexSelector interface {
	NewTimeSelector() DoTimeIndexSelector
	NewBoolSelector() DoBoolIndexSelector
	NewIntSelector() DoIntIndexSelector
	NewUIntSelector() DoUIntIndexSelector
	NewFloatSelector() DoFloatIndexSelector
	NewStringSelector() DoStringIndexSelector
}

type IntValueFunc

type IntValueFunc interface {
	ValueInt() int64
}

type Message

type Message interface {
	// Type returns the MessageType for this Message.
	Type() MessageType

	// SrcDatasetID is the DatasetID that produced this Message.
	SrcDatasetID() DatasetID

	// Ack is used to acknowledge that the Message was received
	// and terminated. A Message may be passed between various
	// Transport implementations. When the Ack is received,
	// this signals to the Message to release any memory it may
	// have retained.
	Ack()

	// Dup is used to duplicate the Message.
	// This is useful when the Message has to be sent to multiple
	// receivers from a single sender.
	Dup() Message
}

Message is a message sent from one Dataset to another.
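The Ack and Dup comments describe a reference-counting pattern: each copy of a message holds a reference to the shared data, and the data is released once every copy has been acknowledged. The sketch below shows that pattern under stated assumptions. The `buffer` and `chunkMsg` types and the `fanOut` helper are hypothetical, and the counter is not safe for concurrent use.

```go
package main

import "fmt"

// buffer is a hypothetical reference-counted payload.
type buffer struct {
	refs     int
	released bool
}

func (b *buffer) retain() { b.refs++ }

func (b *buffer) release() {
	b.refs--
	if b.refs == 0 {
		b.released = true // the memory would be returned here
	}
}

// chunkMsg is a hypothetical message that holds one reference to a buffer.
type chunkMsg struct{ buf *buffer }

func newChunkMsg(b *buffer) *chunkMsg {
	b.retain()
	return &chunkMsg{buf: b}
}

// Dup returns a second message that shares the buffer and holds its own reference.
func (m *chunkMsg) Dup() *chunkMsg {
	m.buf.retain()
	return &chunkMsg{buf: m.buf}
}

// Ack drops this message's reference to the buffer.
func (m *chunkMsg) Ack() { m.buf.release() }

// fanOut duplicates one message for n receivers. It reports whether the
// buffer was released after only the sender acked, and after everyone acked.
func fanOut(n int) (afterSender, afterAll bool) {
	b := &buffer{}
	m := newChunkMsg(b)
	copies := make([]*chunkMsg, 0, n)
	for i := 0; i < n; i++ {
		copies = append(copies, m.Dup())
	}
	m.Ack() // the sender is done with the original
	afterSender = b.released
	for _, c := range copies {
		c.Ack() // each receiver acknowledges its own copy
	}
	return afterSender, b.released
}

func main() {
	fmt.Println(fanOut(3))
}
```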

type MessageQueue

type MessageQueue interface {
	Push(Message)
	Pop() Message
}

MessageQueue provides a concurrency safe queue for messages. The queue must have a single consumer calling Pop.
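A minimal way to satisfy this contract is a mutex-protected slice. The sketch below is illustrative only: the `message` and `msg` types are hypothetical, and returning nil from Pop when the queue is empty is an assumption that the documentation above does not state.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical, reduced form of the Message interface.
type message interface{ Type() int }

// msg is a trivial message implementation used for the example.
type msg int

func (m msg) Type() int { return int(m) }

// queue is safe for many concurrent producers and one consumer.
type queue struct {
	mu   sync.Mutex
	msgs []message
}

// Push appends a message to the tail of the queue.
func (q *queue) Push(m message) {
	q.mu.Lock()
	q.msgs = append(q.msgs, m)
	q.mu.Unlock()
}

// Pop removes and returns the head of the queue, or nil if it is empty.
func (q *queue) Pop() message {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.msgs) == 0 {
		return nil
	}
	m := q.msgs[0]
	q.msgs[0] = nil // drop the reference so it can be collected
	q.msgs = q.msgs[1:]
	return m
}

// pushAndDrain pushes from several goroutines, then pops from a single
// consumer and returns how many messages were received.
func pushAndDrain(producers, perProducer int) int {
	q := &queue{}
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				q.Push(msg(id))
			}
		}(p)
	}
	wg.Wait()

	n := 0
	for q.Pop() != nil {
		n++
	}
	return n
}

func main() {
	fmt.Println(pushAndDrain(4, 25))
}
```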

type MessageType

type MessageType int
const (
	// RetractTableType is sent when the previous table for
	// a given group key should be retracted.
	RetractTableType MessageType = iota

	// ProcessType is sent when there is an entire flux.Table
	// ready to be processed from the upstream Dataset.
	ProcessType

	// UpdateWatermarkType is sent when there will be no more
	// points older than the watermark for any key.
	UpdateWatermarkType

	// UpdateProcessingTimeType is sent to update the current time.
	UpdateProcessingTimeType

	// FinishType is sent when there are no more messages from
	// the upstream Dataset or an upstream error occurred that
	// caused the execution to abort.
	FinishType

	// ProcessChunkType is sent when a new table.Chunk is ready to
	// be processed from the upstream Dataset.
	ProcessChunkType

	// FlushKeyType is sent when the upstream Dataset wishes
	// to flush the data associated with a key presently stored
	// in the Dataset.
	FlushKeyType
)

type MetadataNode added in v0.21.0

type MetadataNode interface {
	Node
	Metadata() metadata.Metadata
}

MetadataNode is a node that has additional metadata that should be added to the result after it is processed.

type NarrowStateTransformation added in v0.130.0

type NarrowStateTransformation interface {
	// Process will process the TableView.
	Process(chunk table.Chunk, state interface{}, d *TransportDataset, mem memory.Allocator) (interface{}, bool, error)

	Closer
}

NarrowStateTransformation is the same as a NarrowTransformation except that it retains state between processing buffers.
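A running total is a typical case where state must survive between chunks of the same group key. The sketch below uses hypothetical types (`chunk`, `dataset`, `cumSum`) and a toy driver instead of the Flux types. It also assumes that the returned boolean means "keep this state", by analogy with AggregateTransformation.

```go
package main

import "fmt"

// chunk is a hypothetical stand-in for table.Chunk.
type chunk struct {
	key    string
	values []int64
}

// dataset is a hypothetical sink standing in for *TransportDataset.
type dataset struct{ out []chunk }

func (d *dataset) Process(c chunk) { d.out = append(d.out, c) }

// cumSum emits a cumulative sum and carries the running total as state.
type cumSum struct{}

// Process writes the output for this chunk immediately and returns the
// total so the next chunk with the same key continues from it.
func (cumSum) Process(c chunk, state interface{}, d *dataset) (interface{}, bool, error) {
	total, _ := state.(int64) // zero when state is nil
	out := make([]int64, len(c.values))
	for i, v := range c.values {
		total += v
		out[i] = total
	}
	d.Process(chunk{key: c.key, values: out})
	return total, true, nil
}

// runCumSum is a toy driver that keeps one state value per group key.
func runCumSum(chunks []chunk) ([]chunk, error) {
	var tr cumSum
	states := make(map[string]interface{})
	d := &dataset{}
	for _, c := range chunks {
		next, keep, err := tr.Process(c, states[c.key], d)
		if err != nil {
			return nil, err
		}
		if keep {
			states[c.key] = next
		}
	}
	return d.out, nil
}

func main() {
	out, err := runCumSum([]chunk{
		{key: "a", values: []int64{1, 2}},
		{key: "a", values: []int64{3}},
	})
	fmt.Println(out, err)
}
```

Unlike the aggregate case, output is produced for every chunk as it arrives, so nothing has to wait for the key to be flushed.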

type NarrowTransformation added in v0.125.0

type NarrowTransformation interface {
	// Process will process the table.Chunk and send any output to the TransportDataset.
	Process(chunk table.Chunk, d *TransportDataset, mem memory.Allocator) error

	Closer
}

NarrowTransformation implements a transformation that processes a table.Chunk and does not modify its group key.

type Node

type Node interface {
	AddTransformation(t Transformation)
}

type OperatorProfiler added in v0.86.0

type OperatorProfiler struct {
	// contains filtered or unexported fields
}

func (*OperatorProfiler) GetResult added in v0.86.0

func (o *OperatorProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error)

func (*OperatorProfiler) GetSortedResult added in v0.91.0

func (o *OperatorProfiler) GetSortedResult(q flux.Query, alloc *memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error)

GetSortedResult is identical to GetResult, except that it calls Sort() on the ColListTableBuilder to make testing easier. sortKeys and desc are passed directly into the Sort() call.

func (*OperatorProfiler) Name added in v0.86.0

func (o *OperatorProfiler) Name() string

type OperatorProfilingResult added in v0.86.0

type OperatorProfilingResult struct {
	Type string
	// Those labels are actually their operation name. See flux/internal/spec.buildSpec.
	// Some examples are:
	// merged_fromRemote_range1_filter2_filter3_filter4, window5, window8, generated_yield, etc.
	Label string
	Start time.Time
	Stop  time.Time
}

type OperatorProfilingSpan added in v0.86.0

type OperatorProfilingSpan struct {
	opentracing.Span

	Result OperatorProfilingResult
	// contains filtered or unexported fields
}

func (*OperatorProfilingSpan) Finish added in v0.86.0

func (t *OperatorProfilingSpan) Finish()

func (*OperatorProfilingSpan) FinishWithOptions added in v0.86.0

func (t *OperatorProfilingSpan) FinishWithOptions(opts opentracing.FinishOptions)

type PassthroughDataset added in v0.49.0

type PassthroughDataset struct {
	// contains filtered or unexported fields
}

PassthroughDataset is a Dataset that will passthrough the processed data to the next Transformation.

func NewPassthroughDataset added in v0.49.0

func NewPassthroughDataset(id DatasetID) *PassthroughDataset

NewPassthroughDataset constructs a new PassthroughDataset.

func (*PassthroughDataset) AddTransformation added in v0.49.0

func (d *PassthroughDataset) AddTransformation(t Transformation)

func (*PassthroughDataset) Finish added in v0.49.0

func (d *PassthroughDataset) Finish(err error)

func (*PassthroughDataset) Process added in v0.49.0

func (d *PassthroughDataset) Process(tbl flux.Table) error

func (*PassthroughDataset) RetractTable added in v0.49.0

func (d *PassthroughDataset) RetractTable(key flux.GroupKey) error

func (*PassthroughDataset) SetTriggerSpec added in v0.49.0

func (d *PassthroughDataset) SetTriggerSpec(t plan.TriggerSpec)

func (*PassthroughDataset) UpdateProcessingTime added in v0.49.0

func (d *PassthroughDataset) UpdateProcessingTime(t Time) error

func (*PassthroughDataset) UpdateWatermark added in v0.49.0

func (d *PassthroughDataset) UpdateWatermark(mark Time) error

type ProcessChunkMsg added in v0.125.0

type ProcessChunkMsg interface {
	Message
	TableChunk() table.Chunk
}

type ProcessMsg

type ProcessMsg interface {
	Message
	Table() flux.Table
}

type Profiler added in v0.82.0

type Profiler interface {
	Name() string
	GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error)
	GetSortedResult(q flux.Query, alloc *memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error)
}

type QueryProfiler added in v0.82.1

type QueryProfiler struct{}

func (*QueryProfiler) GetResult added in v0.82.1

func (s *QueryProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error)

func (*QueryProfiler) GetSortedResult added in v0.91.0

func (s *QueryProfiler) GetSortedResult(q flux.Query, alloc *memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error)

GetSortedResult is identical to GetResult, except that it calls Sort() on the ColListTableBuilder to make testing easier. sortKeys and desc are passed directly into the Sort() call.

func (*QueryProfiler) Name added in v0.82.1

func (s *QueryProfiler) Name() string

type RandomAccessGroupLookup added in v0.59.0

type RandomAccessGroupLookup = groupkey.RandomAccessLookup

func NewRandomAccessGroupLookup added in v0.59.0

func NewRandomAccessGroupLookup() *RandomAccessGroupLookup

NewRandomAccessGroupLookup constructs a RandomAccessGroupLookup.

type RetractTableMsg

type RetractTableMsg interface {
	Message
	Key() flux.GroupKey
}

type Row

type Row struct {
	Values []interface{}
}

func ReadRow

func ReadRow(i int, cr flux.ColReader) (row Row)

type RowMapFn

type RowMapFn struct {
	// contains filtered or unexported fields
}

func NewRowMapFn

func NewRowMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowMapFn

func (*RowMapFn) Prepare

func (f *RowMapFn) Prepare(cols []flux.ColMeta) (*RowMapPreparedFn, error)

type RowMapPreparedFn added in v0.68.0

type RowMapPreparedFn struct {
	// contains filtered or unexported fields
}

func (*RowMapPreparedFn) Eval added in v0.68.0

func (f *RowMapPreparedFn) Eval(ctx context.Context, row int, cr flux.ColReader) (values.Object, error)

func (*RowMapPreparedFn) Type added in v0.68.0

type RowPredicateFn

type RowPredicateFn struct {
	// contains filtered or unexported fields
}

func NewRowPredicateFn

func NewRowPredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowPredicateFn

func (*RowPredicateFn) Prepare

func (f *RowPredicateFn) Prepare(cols []flux.ColMeta) (*RowPredicatePreparedFn, error)

type RowPredicatePreparedFn added in v0.68.0

type RowPredicatePreparedFn struct {
	// contains filtered or unexported fields
}

func (*RowPredicatePreparedFn) Eval added in v0.68.0

func (f *RowPredicatePreparedFn) Eval(ctx context.Context, record values.Object) (bool, error)

func (*RowPredicatePreparedFn) EvalRow added in v0.68.0

func (f *RowPredicatePreparedFn) EvalRow(ctx context.Context, row int, cr flux.ColReader) (bool, error)

func (*RowPredicatePreparedFn) InferredInputType added in v0.68.0

func (f *RowPredicatePreparedFn) InferredInputType() semantic.MonoType

InferredInputType will return the inferred input type. This type may contain type variables and will only contain the properties that could be inferred from type inference.

func (*RowPredicatePreparedFn) InputType added in v0.68.0

func (f *RowPredicatePreparedFn) InputType() semantic.MonoType

InputType will return the prepared input type. This will be a fully realized type that was created after preparing the function with Prepare.

type RowReader added in v0.34.1

type RowReader interface {
	Next() bool
	GetNextRow() ([]values.Value, error)
	ColumnNames() []string
	ColumnTypes() []flux.ColType
	SetColumns([]interface{})
	io.Closer
}

type RowReduceFn added in v0.23.0

type RowReduceFn struct {
	// contains filtered or unexported fields
}

func NewRowReduceFn added in v0.23.0

func NewRowReduceFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowReduceFn

func (*RowReduceFn) Prepare added in v0.23.0

func (f *RowReduceFn) Prepare(cols []flux.ColMeta, reducerType map[string]semantic.MonoType) (*RowReducePreparedFn, error)

type RowReducePreparedFn added in v0.68.0

type RowReducePreparedFn struct {
	// contains filtered or unexported fields
}

func (*RowReducePreparedFn) Eval added in v0.68.0

func (f *RowReducePreparedFn) Eval(ctx context.Context, row int, cr flux.ColReader, extraParams map[string]values.Value) (values.Object, error)

func (*RowReducePreparedFn) Type added in v0.68.0

type RowSelector

type RowSelector interface {
	NewTimeSelector() DoTimeRowSelector
	NewBoolSelector() DoBoolRowSelector
	NewIntSelector() DoIntRowSelector
	NewUIntSelector() DoUIntRowSelector
	NewFloatSelector() DoFloatRowSelector
	NewStringSelector() DoStringRowSelector
}

type Rower

type Rower interface {
	Rows() []Row
}

type ScheduleFunc

type ScheduleFunc func(ctx context.Context, throughput int)

ScheduleFunc is a function that represents work to do. The throughput is the maximum number of messages to process for this scheduling.
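
The throughput limit can be pictured with a small sketch. The scheduleFunc type below only mirrors the signature of ScheduleFunc; the worker type and its queue are hypothetical and are not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// scheduleFunc mirrors the signature of ScheduleFunc for this sketch.
type scheduleFunc func(ctx context.Context, throughput int)

// worker is a hypothetical node holding a queue of pending messages.
type worker struct {
	queue     []string
	processed []string
}

// run handles at most throughput messages and then returns, so that other
// work gets a turn before this worker is scheduled again.
func (w *worker) run(ctx context.Context, throughput int) {
	for i := 0; i < throughput && len(w.queue) > 0; i++ {
		if ctx.Err() != nil {
			return // stop early if the query was cancelled
		}
		w.processed = append(w.processed, w.queue[0])
		w.queue = w.queue[1:]
	}
}

// demo schedules the worker once with a throughput of 3 and reports how
// many messages were processed and how many remain queued.
func demo() (processed, remaining int) {
	w := &worker{queue: []string{"m1", "m2", "m3", "m4", "m5"}}
	var f scheduleFunc = w.run
	f(context.Background(), 3)
	return len(w.processed), len(w.queue)
}

func main() {
	processed, remaining := demo()
	fmt.Println(processed, remaining)
}
```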

type SelectorConfig

type SelectorConfig struct {
	plan.DefaultCost
	Column string `json:"column"`
}

func (*SelectorConfig) ReadArgs

func (c *SelectorConfig) ReadArgs(args flux.Arguments) error

type SimpleAggregate added in v0.127.0

type SimpleAggregate interface {
	NewBoolAgg() DoBoolAgg
	NewIntAgg() DoIntAgg
	NewUIntAgg() DoUIntAgg
	NewFloatAgg() DoFloatAgg
	NewStringAgg() DoStringAgg
}

type SimpleAggregateConfig added in v0.127.0

type SimpleAggregateConfig struct {
	plan.DefaultCost
	Columns []string `json:"columns"`
}

func (SimpleAggregateConfig) Copy added in v0.127.0

func (*SimpleAggregateConfig) ReadArgs added in v0.127.0

func (c *SimpleAggregateConfig) ReadArgs(args flux.Arguments) error

type Source

type Source interface {
	Node
	Run(ctx context.Context)
	SetLabel(label string)
	Label() string
}

func CreateSourceFromDecoder added in v0.17.0

func CreateSourceFromDecoder(decoder SourceDecoder, dsid DatasetID, a Administration) (Source, error)

CreateSourceFromDecoder takes an implementation of a SourceDecoder, as well as a dataset ID and Administration type and creates an execute.Source.

func CreateSourceFromIterator added in v0.49.0

func CreateSourceFromIterator(iterator SourceIterator, dsid DatasetID) (Source, error)

CreateSourceFromIterator takes an implementation of a SourceIterator as well as a dataset ID and creates an execute.Source.

type SourceDecoder added in v0.17.0

type SourceDecoder interface {
	Connect(ctx context.Context) error
	Fetch(ctx context.Context) (bool, error)
	Decode(ctx context.Context) (flux.Table, error)
	Close() error
}

SourceDecoder is an interface that generalizes the process of retrieving data from an unspecified data source.

Connect implements the logic needed to connect directly to the data source.

Fetch implements a single fetch of data from the source and may be called multiple times. It should return false when there is no more data to retrieve.

Decode implements the process of marshaling the data returned by the source into a flux.Table type.

During retrieval, Connect is called once at the outset, and Fetch() and Decode() are then called iteratively until the data source is fully consumed.
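
The call sequence can be sketched as a driver loop. This is one plausible ordering, not the engine's actual implementation: the decoder interface is a local stand-in whose Decode returns a string instead of a flux.Table, sliceDecoder and drain are hypothetical, and the sketch assumes Fetch returns true whenever a batch is ready to decode.

```go
package main

import (
	"context"
	"fmt"
)

// decoder is a local stand-in with the same method set as SourceDecoder,
// except that Decode returns a string rather than a flux.Table.
type decoder interface {
	Connect(ctx context.Context) error
	Fetch(ctx context.Context) (bool, error)
	Decode(ctx context.Context) (string, error)
	Close() error
}

// sliceDecoder is a hypothetical decoder that serves batches from memory.
type sliceDecoder struct {
	batches []string
	pos     int
	closed  bool
}

func (d *sliceDecoder) Connect(ctx context.Context) error { return nil }

// Fetch advances to the next batch and reports whether one is available.
func (d *sliceDecoder) Fetch(ctx context.Context) (bool, error) {
	d.pos++
	return d.pos <= len(d.batches), nil
}

// Decode returns the batch selected by the most recent Fetch.
func (d *sliceDecoder) Decode(ctx context.Context) (string, error) {
	return d.batches[d.pos-1], nil
}

func (d *sliceDecoder) Close() error {
	d.closed = true
	return nil
}

// drain connects once, then alternates Fetch and Decode until Fetch
// reports that the source is exhausted. Close always runs on the way out.
func drain(ctx context.Context, d decoder) ([]string, error) {
	if err := d.Connect(ctx); err != nil {
		return nil, err
	}
	defer d.Close()

	var tables []string
	for {
		more, err := d.Fetch(ctx)
		if err != nil {
			return tables, err
		}
		if !more {
			return tables, nil
		}
		tbl, err := d.Decode(ctx)
		if err != nil {
			return tables, err
		}
		tables = append(tables, tbl)
	}
}

// demo drains a three-batch decoder and reports whether it was closed.
func demo() ([]string, bool) {
	d := &sliceDecoder{batches: []string{"t0", "t1", "t2"}}
	tables, err := drain(context.Background(), d)
	if err != nil {
		panic(err)
	}
	return tables, d.closed
}

func main() {
	tables, closed := demo()
	fmt.Println(tables, closed)
}
```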

type SourceIterator added in v0.49.0

type SourceIterator interface {
	// Do will invoke the Source and pass each materialized flux.Table
	// to the given function.
	Do(ctx context.Context, f func(flux.Table) error) error
}

SourceIterator is an interface for iterating over flux.Table values in a source. It provides a common interface for creating an execute.Source in an iterative way.
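
A minimal sketch of the callback style follows. The iterator interface is a local stand-in that passes strings instead of flux.Table values; sliceIterator, collect, and errStop are hypothetical names, and stopping at the first callback error is an assumption about the expected behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// iterator is a local stand-in shaped like SourceIterator, with a string
// taking the place of flux.Table.
type iterator interface {
	Do(ctx context.Context, f func(tbl string) error) error
}

// sliceIterator is a hypothetical iterator over in-memory tables.
type sliceIterator []string

// Do hands each table to f in order and stops at the first error.
func (it sliceIterator) Do(ctx context.Context, f func(tbl string) error) error {
	for _, tbl := range it {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := f(tbl); err != nil {
			return err
		}
	}
	return nil
}

// errStop is returned by the callback in collect to abort iteration early.
var errStop = errors.New("stop")

// collect gathers tables from it and aborts with errStop if the source
// offers more than limit tables.
func collect(it iterator, limit int) ([]string, error) {
	var seen []string
	err := it.Do(context.Background(), func(tbl string) error {
		if len(seen) == limit {
			return errStop
		}
		seen = append(seen, tbl)
		return nil
	})
	return seen, err
}

func main() {
	seen, err := collect(sliceIterator{"t0", "t1", "t2"}, 2)
	fmt.Println(seen, err)
}
```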

type StreamContext

type StreamContext interface {
	Bounds() *Bounds
}

StreamContext represents necessary context for a single stream of query data.

type StringValueFunc

type StringValueFunc interface {
	ValueString() string
}

type TableBuilder

type TableBuilder interface {
	Key() flux.GroupKey

	NRows() int
	NCols() int
	Cols() []flux.ColMeta

	// AddCol increases the size of the table by one column.
	// The index of the column is returned.
	AddCol(flux.ColMeta) (int, error)

	// SetValue sets the value at the specified coordinates.
	// The rows and columns must exist before calling SetValue, otherwise SetValue panics.
	SetValue(i, j int, value values.Value) error
	SetNil(i, j int) error

	// Append will add a single value to the end of a column.  Will set the number of
	// rows in the table to the size of the new column. It's the caller's job to make sure
	// that the expected number of rows in each column is equal.
	AppendBool(j int, value bool) error
	AppendInt(j int, value int64) error
	AppendUInt(j int, value uint64) error
	AppendFloat(j int, value float64) error
	AppendString(j int, value string) error
	AppendTime(j int, value Time) error
	AppendValue(j int, value values.Value) error
	AppendNil(j int) error

	// AppendBools and similar functions will append multiple values to column j.  As above,
	// it will set the number of rows in the table to the size of the new column.  It's the
	// caller's job to make sure that the expected number of rows in each column is equal.
	AppendBools(j int, vs *array.Boolean) error
	AppendInts(j int, vs *array.Int) error
	AppendUInts(j int, vs *array.Uint) error
	AppendFloats(j int, vs *array.Float) error
	AppendStrings(j int, vs *array.String) error
	AppendTimes(j int, vs *array.Int) error

	// GrowBools and similar functions will extend column j by n zero-values for the respective type.
	// If the column has enough capacity, no reallocation is necessary.  If the capacity is insufficient,
	// a new slice is allocated with 1.5*newCapacity.  As with the Append functions, it is the
	// caller's job to make sure that the expected number of rows in each column is equal.
	GrowBools(j, n int) error
	GrowInts(j, n int) error
	GrowUInts(j, n int) error
	GrowFloats(j, n int) error
	GrowStrings(j, n int) error
	GrowTimes(j, n int) error

	// LevelColumns will check for columns that are too short and Grow them
	// so that each column is of uniform size.
	LevelColumns() error

	// Sort the rows of the table by the values of the columns in the order listed.
	Sort(cols []string, desc bool)

	// ClearData removes all rows, while preserving the column meta data.
	ClearData()

	// Release releases any extraneous memory that has been retained.
	Release()

	// Table returns the table that has been built.
	// Further modifications of the builder will not affect the returned table.
	Table() (flux.Table, error)
}

TableBuilder builds tables that can be used multiple times.
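
The relationship between the Append, Grow, and LevelColumns methods can be sketched with plain slices. The builder type below is a hypothetical stand-in restricted to float columns, not the package's implementation, and it assumes that short columns are padded with zero values as the Grow comments describe.

```go
package main

import "fmt"

// builder is a hypothetical stand-in that stores float columns as slices.
type builder struct {
	cols [][]float64
}

// addCol adds an empty column and returns its index.
func (b *builder) addCol() int {
	b.cols = append(b.cols, nil)
	return len(b.cols) - 1
}

// appendFloat adds one value to the end of column j. Keeping every column
// the same length is left to the caller.
func (b *builder) appendFloat(j int, v float64) {
	b.cols[j] = append(b.cols[j], v)
}

// grow extends column j by n zero values.
func (b *builder) grow(j, n int) {
	b.cols[j] = append(b.cols[j], make([]float64, n)...)
}

// levelColumns pads every short column up to the length of the longest one.
func (b *builder) levelColumns() {
	longest := 0
	for _, c := range b.cols {
		if len(c) > longest {
			longest = len(c)
		}
	}
	for j, c := range b.cols {
		if short := longest - len(c); short > 0 {
			b.grow(j, short)
		}
	}
}

func main() {
	b := &builder{}
	x, y := b.addCol(), b.addCol()
	b.appendFloat(x, 1.5)
	b.appendFloat(x, 2.5)
	b.appendFloat(x, 3.5)
	b.appendFloat(y, 9.0)
	b.levelColumns()
	fmt.Println(len(b.cols[x]), len(b.cols[y]), b.cols[y])
}
```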

type TableBuilderCache

type TableBuilderCache interface {
	// TableBuilder returns an existing or new TableBuilder for the given meta data.
	// The boolean return value indicates if TableBuilder is new.
	TableBuilder(key flux.GroupKey) (TableBuilder, bool)
	ForEachBuilder(f func(flux.GroupKey, TableBuilder) error) error
}
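
The lookup-or-create contract of TableBuilder can be sketched as follows. The cache and builder types are hypothetical stand-ins, a string replaces flux.GroupKey, and visiting builders in insertion order is a choice made for this sketch rather than documented behavior.

```go
package main

import "fmt"

// builder is a hypothetical stand-in for a TableBuilder.
type builder struct{ rows int }

// cache is a hypothetical stand-in for a TableBuilderCache, keyed by a
// string instead of a flux.GroupKey.
type cache struct {
	builders map[string]*builder
	order    []string // keys in insertion order
}

func newCache() *cache {
	return &cache{builders: make(map[string]*builder)}
}

// tableBuilder returns the builder for key, creating it on first use.
// The boolean is true only when the builder was just created.
func (c *cache) tableBuilder(key string) (*builder, bool) {
	if b, ok := c.builders[key]; ok {
		return b, false
	}
	b := &builder{}
	c.builders[key] = b
	c.order = append(c.order, key)
	return b, true
}

// forEachBuilder visits every builder and stops at the first error.
func (c *cache) forEachBuilder(f func(key string, b *builder) error) error {
	for _, key := range c.order {
		if err := f(key, c.builders[key]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c := newCache()
	for _, key := range []string{"host=a", "host=b", "host=a"} {
		b, created := c.tableBuilder(key)
		b.rows++
		fmt.Println(key, created)
	}
	_ = c.forEachBuilder(func(key string, b *builder) error {
		fmt.Println(key, b.rows)
		return nil
	})
}
```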

type TableContext

type TableContext struct {
	Key   flux.GroupKey
	Count int
}

type TablePredicateFn added in v0.34.1

type TablePredicateFn struct {
	// contains filtered or unexported fields
}

func NewTablePredicateFn added in v0.34.1

func NewTablePredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) *TablePredicateFn

func (*TablePredicateFn) Prepare added in v0.34.1

type TablePredicatePreparedFn added in v0.68.0

type TablePredicatePreparedFn struct {
	// contains filtered or unexported fields
}

func (*TablePredicatePreparedFn) Eval added in v0.68.0

type Time

type Time = values.Time

func Now

func Now() Time

type Transformation

type Transformation interface {
	RetractTable(id DatasetID, key flux.GroupKey) error
	// Process takes in one flux Table, performs data processing on it and
	// writes that table to a DataCache
	Process(id DatasetID, tbl flux.Table) error
	UpdateWatermark(id DatasetID, t Time) error
	UpdateProcessingTime(id DatasetID, t Time) error
	// Finish indicates that the Transformation is done processing. It is
	// the last method called on the Transformation
	Finish(id DatasetID, err error)
}

Transformation represents functions that stream a set of tables, perform data processing on them, and produce an output stream of tables.

func NewTransformationFromTransport added in v0.148.0

func NewTransformationFromTransport(t Transport) Transformation

NewTransformationFromTransport will adapt a Transport to satisfy both the Transport and Transformation interfaces.

type TransformationSet added in v0.49.0

type TransformationSet []Transformation

TransformationSet is a group of transformations.

func (TransformationSet) Finish added in v0.49.0

func (ts TransformationSet) Finish(id DatasetID, err error)

func (TransformationSet) Process added in v0.49.0

func (ts TransformationSet) Process(id DatasetID, tbl flux.Table) error

func (TransformationSet) RetractTable added in v0.49.0

func (ts TransformationSet) RetractTable(id DatasetID, key flux.GroupKey) error

func (TransformationSet) UpdateProcessingTime added in v0.49.0

func (ts TransformationSet) UpdateProcessingTime(id DatasetID, time Time) error

func (TransformationSet) UpdateWatermark added in v0.49.0

func (ts TransformationSet) UpdateWatermark(id DatasetID, time Time) error
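
One way to read TransformationSet is as a fan-out: each call is forwarded to every member. The sketch below shows that idea with a reduced, hypothetical interface (only Process and Finish, with a string in place of flux.Table). Returning the first error is an assumption, and the real type may take extra steps, such as handling tables that can only be read once.

```go
package main

import "fmt"

// transformation is a reduced, hypothetical version of the Transformation
// interface, with a string in place of flux.Table.
type transformation interface {
	Process(tbl string) error
	Finish(err error)
}

// set forwards each call to every member, in order.
type set []transformation

// Process sends tbl to each member and returns the first error.
func (ts set) Process(tbl string) error {
	for _, t := range ts {
		if err := t.Process(tbl); err != nil {
			return err
		}
	}
	return nil
}

// Finish notifies every member that processing is over.
func (ts set) Finish(err error) {
	for _, t := range ts {
		t.Finish(err)
	}
}

// recorder is a hypothetical member that remembers what it saw.
type recorder struct {
	seen []string
	done bool
}

func (r *recorder) Process(tbl string) error {
	r.seen = append(r.seen, tbl)
	return nil
}

func (r *recorder) Finish(err error) { r.done = true }

func main() {
	a, b := &recorder{}, &recorder{}
	ts := set{a, b}
	_ = ts.Process("t0")
	ts.Finish(nil)
	fmt.Println(a.seen, b.seen, a.done, b.done)
}
```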

type Transport

type Transport interface {
	// ProcessMessage will process a message in the Transport.
	ProcessMessage(m Message) error
}

Transport is an interface for handling raw messages.

func WrapTransformationInTransport added in v0.125.0

func WrapTransformationInTransport(t Transformation, mem memory.Allocator) Transport

WrapTransformationInTransport will wrap a Transformation into a Transport to be used for the execution engine.
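
Conceptually, such a wrapper can be pictured as a dispatcher that turns each message into the matching method call. The sketch below is an illustration under that assumption only: the message types, the reduced transformation interface, and the wrapped type are all hypothetical and do not reflect the package's internals.

```go
package main

import "fmt"

// Hypothetical message types standing in for the engine's Message values.
type processMsg struct{ table string }
type watermarkMsg struct{ mark int64 }
type finishMsg struct{ err error }

// transformation is a reduced, hypothetical Transformation interface.
type transformation interface {
	Process(tbl string) error
	UpdateWatermark(mark int64) error
	Finish(err error)
}

// wrapped adapts a transformation so it can be driven by raw messages.
type wrapped struct{ t transformation }

// ProcessMessage dispatches on the message type and calls the matching
// method on the wrapped transformation.
func (w wrapped) ProcessMessage(m interface{}) error {
	switch m := m.(type) {
	case processMsg:
		return w.t.Process(m.table)
	case watermarkMsg:
		return w.t.UpdateWatermark(m.mark)
	case finishMsg:
		w.t.Finish(m.err)
		return nil
	default:
		return fmt.Errorf("unexpected message type %T", m)
	}
}

// logger is a hypothetical transformation that records each call.
type logger struct{ events []string }

func (l *logger) Process(tbl string) error {
	l.events = append(l.events, "process "+tbl)
	return nil
}

func (l *logger) UpdateWatermark(mark int64) error {
	l.events = append(l.events, fmt.Sprintf("watermark %d", mark))
	return nil
}

func (l *logger) Finish(err error) {
	l.events = append(l.events, "finish")
}

func main() {
	l := &logger{}
	w := wrapped{t: l}
	for _, m := range []interface{}{processMsg{"t0"}, watermarkMsg{10}, finishMsg{}} {
		if err := w.ProcessMessage(m); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println(l.events)
}
```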

type TransportDataset added in v0.125.0

type TransportDataset struct {
	// contains filtered or unexported fields
}

TransportDataset holds data for a specific node and sends messages to downstream nodes using the Transport.

This Dataset also implements a shim for execute.Dataset so it can be integrated with the existing execution engine. These methods are stubs and do not do anything.

func NewTransportDataset added in v0.125.0

func NewTransportDataset(id DatasetID, mem memory.Allocator) *TransportDataset

NewTransportDataset constructs a TransportDataset.

func (*TransportDataset) AddTransformation added in v0.125.0

func (d *TransportDataset) AddTransformation(t Transformation)

AddTransformation is used to add downstream Transformation nodes to this Transport.

func (*TransportDataset) Delete added in v0.125.0

func (d *TransportDataset) Delete(key flux.GroupKey) (v interface{}, found bool)

func (*TransportDataset) Finish added in v0.125.0

func (d *TransportDataset) Finish(err error)

func (*TransportDataset) FlushKey added in v0.125.0

func (d *TransportDataset) FlushKey(key flux.GroupKey) error

FlushKey sends the flush key message to the downstream transports.

func (*TransportDataset) Lookup added in v0.125.0

func (d *TransportDataset) Lookup(key flux.GroupKey) (interface{}, bool)

func (*TransportDataset) LookupOrCreate added in v0.125.0

func (d *TransportDataset) LookupOrCreate(key flux.GroupKey, fn func() interface{}) interface{}

func (*TransportDataset) Process added in v0.125.0

func (d *TransportDataset) Process(chunk table.Chunk) error

Process sends the given Chunk to be processed by the downstream transports.

func (*TransportDataset) Range added in v0.127.0

func (d *TransportDataset) Range(f func(key flux.GroupKey, value interface{}) error) error

func (*TransportDataset) RetractTable added in v0.125.0

func (d *TransportDataset) RetractTable(key flux.GroupKey) error

func (*TransportDataset) Set added in v0.125.0

func (d *TransportDataset) Set(key flux.GroupKey, value interface{})

func (*TransportDataset) SetTriggerSpec added in v0.125.0

func (d *TransportDataset) SetTriggerSpec(t plan.TriggerSpec)

func (*TransportDataset) UpdateProcessingTime added in v0.125.0

func (d *TransportDataset) UpdateProcessingTime(t Time) error

func (*TransportDataset) UpdateWatermark added in v0.125.0

func (d *TransportDataset) UpdateWatermark(mark Time) error

type Trigger

type Trigger interface {
	Triggered(TriggerContext) bool
	Finished() bool
	Reset()
}

func NewTriggerFromSpec

func NewTriggerFromSpec(spec plan.TriggerSpec) Trigger

type TriggerContext

type TriggerContext struct {
	Table                 TableContext
	Watermark             Time
	CurrentProcessingTime Time
}

type UIntValueFunc

type UIntValueFunc interface {
	ValueUInt() uint64
}

type UpdateProcessingTimeMsg

type UpdateProcessingTimeMsg interface {
	Message
	ProcessingTime() Time
}

type UpdateWatermarkMsg

type UpdateWatermarkMsg interface {
	Message
	WatermarkTime() Time
}

type ValueFunc

type ValueFunc interface {
	Type() flux.ColType
	IsNull() bool
}

type Window

type Window struct {
	Every  Duration
	Period Duration
	Offset Duration
}

func NewWindow added in v0.19.0

func NewWindow(every, period, offset Duration) (Window, error)

NewWindow creates a window with the given parameters, and normalizes the offset to a small positive duration. It also validates that the durations are valid when used within a window.
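
Normalizing an offset into a small positive duration can be sketched with modular arithmetic. This is an illustration only: it uses time.Duration, which has a fixed length, whereas the package's Duration type may not, and normalizeOffset is a hypothetical helper rather than the function NewWindow calls.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// normalizeOffset maps offset into the range [0, every) so that windows
// shifted by the result line up with windows shifted by the original
// offset. It rejects a non-positive every, which cannot define a window.
func normalizeOffset(offset, every time.Duration) (time.Duration, error) {
	if every <= 0 {
		return 0, errors.New("every must be positive")
	}
	offset %= every
	if offset < 0 {
		offset += every // Go's % keeps the sign of the dividend
	}
	return offset, nil
}

func main() {
	for _, off := range []time.Duration{-10 * time.Second, 130 * time.Second} {
		n, err := normalizeOffset(off, time.Minute)
		fmt.Println(off, n, err)
	}
}
```

With a one-minute window, an offset of -10s and an offset of 50s describe the same window boundaries, which is why the two can be treated as equivalent.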

func (Window) GetEarliestBounds added in v0.19.0

func (w Window) GetEarliestBounds(t Time) Bounds

GetEarliestBounds returns the bounds of the earliest window that contains the given time t. For underlapping windows that do not contain time t, the window directly after time t will be returned.

func (Window) GetOverlappingBounds added in v0.19.0

func (w Window) GetOverlappingBounds(b Bounds) []Bounds

GetOverlappingBounds returns a slice of bounds for each window that overlaps the input bounds b.
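
The window arithmetic behind these two methods can be sketched with integers. The sketch makes several assumptions that are not taken from the package: times and durations are plain int64 values, every is positive, windows are half-open intervals [start, stop), window stops fall on offset + k*every, and each window starts period before its stop. The window and bounds types and their methods are hypothetical.

```go
package main

import "fmt"

// bounds is a half-open interval [start, stop).
type bounds struct{ start, stop int64 }

// window is a hypothetical stand-in for Window using int64 durations.
type window struct{ every, period, offset int64 }

// floorDiv divides a by a positive b, rounding toward negative infinity.
func floorDiv(a, b int64) int64 {
	q := a / b
	if a%b != 0 && a < 0 {
		q--
	}
	return q
}

// earliest returns the window with the smallest stop greater than t. If
// period < every (underlapping) that window may begin after t, in which
// case it is the first window after t.
func (w window) earliest(t int64) bounds {
	stop := (floorDiv(t-w.offset, w.every)+1)*w.every + w.offset
	return bounds{start: stop - w.period, stop: stop}
}

// overlapping returns every window that intersects b, in ascending order.
func (w window) overlapping(b bounds) []bounds {
	var out []bounds
	bs := w.earliest(b.start)
	for bs.start < b.stop {
		out = append(out, bs)
		bs.start += w.every
		bs.stop += w.every
	}
	return out
}

func main() {
	tumbling := window{every: 10, period: 10}
	fmt.Println(tumbling.earliest(25))
	fmt.Println(tumbling.overlapping(bounds{5, 25}))

	sliding := window{every: 10, period: 20}
	fmt.Println(sliding.earliest(25))

	sparse := window{every: 10, period: 5}
	fmt.Println(sparse.earliest(22))
}
```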

func (Window) IsValid added in v0.53.0

func (w Window) IsValid() error

IsValid will check if this Window is valid and it will return an error if it isn't.

Directories

Path Synopsis
Package executetest contains utilities for testing the query execution phase.