stdlib

package
v0.166.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2022 License: MIT Imports: 90 Imported by: 6

README

Flux Standard Library

This directory contains the implementation of the Flux standard library.

Code Organization

A Flux package is represented as a directory containing at least one file with a .flux extension. The package name is declared within the file using the package clause. Package names should be the same name as the directory.

Test files may placed in the same directory as the packing using the _test.flux suffix on the file name. The test files must have a _test suffix for the package name and the prefix must match the name of the non test package.

Because the above mirrors the Go package structure it is common to also have .go file and _test.go files that mirror the .flux files.

A typical Flux package structure:

stdlib/strings/
├── flux_gen.go
├── flux_test_gen.go
├── replaceAll_test.flux
├── replace_test.flux
├── strings.flux
├── strings.go
├── strings_test.go
├── subset_test.flux
├── title_test.flux
├── toLower_test.flux
└── toUpper_test.flux

The files flux_gen.go and flux_test_gen.go are generated using the builtin command. They contain the AST of the various .flux files in the package as Go structs. All *_test.flux files are encoded into flux_test_gen.go the non test .flux files are encoded in flux_gen.go

NOTE: The flux_test_gen.go file is not a Go test file as we want to include the Flux test code into the normal build. This enables downstream projects that import Flux to run the test suite define in the standard library against their implementation.

Third Party Contributions

We collect third part contributions into the contrib package. See the README for details on how to contribute a third party package to Flux.

Documentation

Overview

Package stdlib represents the Flux standard library. The Flux standard library is a collection of built-in packages that may be imported by any Flux script. Each package in the standard library exports a collection of values most interesting of which are the exported funtion values. These valeus are callable in the flux query processor. While flux may be extended at runtime by writing function expressions, there are some limitations for which a built-in function is necessary, such as the need for custom data structures, stateful procedures, complex looping and branching, and connection to external services. Another reason for implementing a built-in function is to provide a function that is broadly applicable for many users (e.g., sum() or max()).

The stdlib package is rarely accessed as a direct API. Rather, the query processing engine accepts named registrations for various interfaces implemented within the standard library and in the case of function values, executes them generically using an API that is common to all functions. The registration process is executed by running the init() function in each package file, and is then finalized by importing package builtin, which itself imports the standard library and runs a final setup routine that finalizes installation of the builtin functions and values to the query processor.

Because of this design, a built-in function implementation consists of a bundle of different interface implementations that are required at various phases of query execution. These phases are query, plan and execute. The query phase is for identifying initializing the parameter sets for each function, and initializing the internal representation of a query. The plan phase is for creating a final execution plan, and the execution phase is for physically accessing the data and computing a final result.

The query phase takes each function call in a query and performs a match against the registry to see if there are type definitions for a built-in operation. If matched, it will instantiate the correct flux.OperationSpec type for that function, given the runtime arguments. If a builtin OperationSpec is not found, then it will check for functions defined at runtime, and otherwise return an error. The following registrations are typically executed in the function's init() for the query phase to execute properly:

flux.RegisterPackageValue(pkgpath, name string, value values.Value)
flux.RegisterOpSpec(k flux.OperationKind, c flux.NewOperationSpec)

Note that to register a function value with a package, the value passed into flux.RegisterPackageValue is computed using the followingfunction:

flux.FunctionValue(name string, c CreateOperationSpec, sig semantic.FunctionPolySignature)

In the plan phase, an operation spec must be converted to a plan.ProcedureSpec. A query plan must know what operations to carry out, including the function names and parameters. In the trivial case, the OperationSpec and ProcedureSpec have identical fields and the operation spec may be encapsulated as part of the procedure spec. The base interface for a plan.ProcedureSpec requires a Kind() function, as well as a Copy() function which should perform a deep copy of the object. Refer to the following interfaces for more information about designing a procedure spec:

plan.ProcedureSpec
plan.PushDownProcedureSpec
plan.BoundedProcedureSpec
plan.YieldProcedureSpec
plan.AggregateProcedureSpec
plan.ParentAwareProcedureSpec

Once you have determined the interface(s) that must be implemented for your function, you register them with

plan.RegisterProcedureSpec(k ProcedureKind, c CreateProcedureSpec, qks ...flux.OperationKind)

The registration in this phase creates two lookups. First, it creates a named lookup in a similar fashion as for OperationSpecs in the query phase. Second, it creates a mapping from OperationSpec types to ProcedureSpec types so that the collection of OperationSpecs for the query can be quickly converted to corresponding Procedure specs. One feature to note is that the registration takes a list of flux.OperationSpec values. This is because several user-facing query functions may map to the same internal procedure.

The primary function of the plan phase is to re-order, re-write and possibly combine the operations described in the incoming query in order to improve the performance of the query execution. The planner has two primary operations for doing this: Pushdowns and ReWrites.

A push down operation is a planning technique for pushing the logic from one operation into another so that only a single composite function needs to be called instead of two simpler function call. A pushdown is implemented by implementing the plan.PushDownProcedureSpec interface, which requires functions that define the rules and methods for executing a pushdown operation.

A Rewrite rule is used to modify one or more ProcedureSpecs in cases where redundant or complementary operations can be combined to get a simpler result. Similar to a pushdown operation, the rewrite is triggered whenever certain rules apply. Rewrite rules are implemented differently and require a separate registration:

plan.RegisterRewriteRule(r RewriteRule)

Which in turn requires an implementation of plan.RewriteRule.

Finally, the execute phase is tasked with executing the specific data processing algorithm for the function. A function implementation registers an implementation of the execute.Transformation interface that implements functions that control how the execution engine will take an input table, apply the function, and produce an output table. A transformation implementation is registered via:

execute.RegisterTransformation(k plan.ProcedureKind, c execute.CreateTransformation)

The registration will record a mapping of the procedure's kind to the given transformation type.

In addition to implementing the transformation type, a number of helper types and functions are provided that facilitate the transformation process:

execute.Administration
execute.Dataset
execute.TableBuilderCache
execute.TableBuilder
execute.NewAggregateTransformationAndDataset
execute.NewRowSelectorTransformationAndDataset
flux.Table
flux.GroupKey
flux.ColMeta
flux.ColReader

The most important part of a function implementation is for the interface method execute.Transformation.Process(id execute.DatasetID, tbl flux.Table). While the full details of how to implement this function are out of the scope of this document, a typical implementation will do the following: 1. Validate the incoming table schema if needed 2. Construct the column and group key schema for the output table via the table builder. 3. Process the incoming table via flux.Table.Do, and use the input data to determine the output rows for the table builder. 4. Add rows to the output table.

Finally, there is a special class of functions do not receive an input table from another function's output. In other words, these transformations do not have a parent process that supplies it with table data. These transformation functions are referred to as sources, and naturally implement a connection to a data source (e.g. influxdb, prometheus, csvFile, etc.). They are registered using:

execute.RegisterSource(k plan.ProcedureKind, c execute.CreateSource)

The substantial part of a source implementation is its Run method, which should connect to the data source, collect its data into flux.Table structures, and apply any transformations associated with the source.

Index

Constants

This section is empty.

Variables

View Source
var FluxTestPackages = func() []*ast.Package {
	var pkgs []*ast.Package
	pkgs = append(pkgs, array.FluxTestPackages...)
	pkgs = append(pkgs, naivebayesclassifier.FluxTestPackages...)
	pkgs = append(pkgs, anomalydetection.FluxTestPackages...)
	pkgs = append(pkgs, statsmodels.FluxTestPackages...)
	pkgs = append(pkgs, tickscript.FluxTestPackages...)
	pkgs = append(pkgs, aggregate.FluxTestPackages...)
	pkgs = append(pkgs, events.FluxTestPackages...)
	pkgs = append(pkgs, csv.FluxTestPackages...)
	pkgs = append(pkgs, date.FluxTestPackages...)
	pkgs = append(pkgs, dict.FluxTestPackages...)
	pkgs = append(pkgs, experimental.FluxTestPackages...)
	pkgs = append(pkgs, aggregate1.FluxTestPackages...)
	pkgs = append(pkgs, array1.FluxTestPackages...)
	pkgs = append(pkgs, bitwise.FluxTestPackages...)
	pkgs = append(pkgs, geo.FluxTestPackages...)
	pkgs = append(pkgs, json.FluxTestPackages...)
	pkgs = append(pkgs, mqtt.FluxTestPackages...)
	pkgs = append(pkgs, oee.FluxTestPackages...)
	pkgs = append(pkgs, prometheus.FluxTestPackages...)
	pkgs = append(pkgs, record.FluxTestPackages...)
	pkgs = append(pkgs, table.FluxTestPackages...)
	pkgs = append(pkgs, generate.FluxTestPackages...)
	pkgs = append(pkgs, http.FluxTestPackages...)
	pkgs = append(pkgs, influxdb.FluxTestPackages...)
	pkgs = append(pkgs, monitor.FluxTestPackages...)
	pkgs = append(pkgs, sample.FluxTestPackages...)
	pkgs = append(pkgs, schema.FluxTestPackages...)
	pkgs = append(pkgs, secrets.FluxTestPackages...)
	pkgs = append(pkgs, tasks.FluxTestPackages...)
	pkgs = append(pkgs, debug.FluxTestPackages...)
	pkgs = append(pkgs, promql.FluxTestPackages...)
	pkgs = append(pkgs, interpolate.FluxTestPackages...)
	pkgs = append(pkgs, math.FluxTestPackages...)
	pkgs = append(pkgs, pagerduty.FluxTestPackages...)
	pkgs = append(pkgs, planner.FluxTestPackages...)
	pkgs = append(pkgs, regexp.FluxTestPackages...)
	pkgs = append(pkgs, sampledata.FluxTestPackages...)
	pkgs = append(pkgs, sql.FluxTestPackages...)
	pkgs = append(pkgs, strings.FluxTestPackages...)
	pkgs = append(pkgs, testing.FluxTestPackages...)
	pkgs = append(pkgs, chronograf.FluxTestPackages...)
	pkgs = append(pkgs, influxql.FluxTestPackages...)
	pkgs = append(pkgs, kapacitor.FluxTestPackages...)
	pkgs = append(pkgs, pandas.FluxTestPackages...)
	pkgs = append(pkgs, prometheus1.FluxTestPackages...)
	pkgs = append(pkgs, promql1.FluxTestPackages...)
	pkgs = append(pkgs, usage.FluxTestPackages...)
	pkgs = append(pkgs, types.FluxTestPackages...)
	pkgs = append(pkgs, universe.FluxTestPackages...)
	return pkgs
}()

Functions

func TestingBenchmarkCalls added in v0.49.0

func TestingBenchmarkCalls(pkg *ast.Package) *ast.File

TestingBenchmarkCalls constructs an ast.File that calls testing.benchmark for each test case within the package.

func TestingInspectCalls

func TestingInspectCalls(pkg *ast.Package) *ast.File

TestingInspectCalls constructs an ast.File that calls testing.inspect for each test case within the package.

func TestingRunCalls

func TestingRunCalls(pkg *ast.Package) *ast.File

TestingRunCalls constructs an ast.File that calls testing.run for each test case within the package.

Types

This section is empty.

Directories

Path Synopsis
contrib
csv
geo
iox
oee
influxdata
influxdb
Package influxdb implements the standard library functions for interacting with influxdb.
Package influxdb implements the standard library functions for interacting with influxdb.
internal
gen
Package socket implements a source that gets input from a socket connection and produces tables given a decoder.
Package socket implements a source that gets input from a socket connection and produces tables given a decoder.
Package universe contains the implementations for the builtin transformation functions.
Package universe contains the implementations for the builtin transformation functions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL