# README

## Flux - Influx data language

`fluxd` is an HTTP server for running Flux queries against one or more InfluxDB servers. `fluxd` runs on port 8093 by default.
### Specification

A complete specification can be found in SPEC.md.
### Installation

#### Generic Installation

- Use InfluxDB nightly builds, which can be found here: https://portal.influxdata.com/downloads. Write data to the instance by using telegraf or some other data source.

  Note: InfluxDB uses port 8082 for handling Flux queries. Ensure this port is accessible. This port has no authentication.

- Download `fluxd` from nightly builds: https://portal.influxdata.com/downloads.

- Start `fluxd`. It will connect to the default host and port, which is `localhost:8082`. To run in federated mode, add the `--storage-hosts` option with each host separated by a comma.

  ```sh
  fluxd --storage-hosts localhost:8082
  ```
- To run a query, POST a Flux query string to `/query` as the `query` parameter (a Go equivalent is sketched after this list):

  ```sh
  curl -XPOST --data-urlencode \
    'query=from(bucket:"telegraf/autogen")
      |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
      |> range(start:-170h)
      |> sum()' \
    http://localhost:8093/query?organization=my-org
  ```

  Any value can be used for the `organization` parameter. It is not applied when running Flux queries against the InfluxDB 1.x nightlies, but it is still required.
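The same request can be issued from any HTTP client. As a minimal sketch in Go, using only the standard library and assuming `fluxd` is listening on `localhost:8093` as configured above:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// The Flux query to run, identical to the curl example above.
	q := `from(bucket:"telegraf/autogen")
	|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
	|> range(start:-170h)
	|> sum()`

	// fluxd expects the query string as the form-encoded "query" parameter.
	form := url.Values{"query": {q}}
	resp, err := http.Post(
		"http://localhost:8093/query?organization=my-org",
		"application/x-www-form-urlencoded",
		strings.NewReader(form.Encode()),
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```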
#### Docker Installation

There are now images for the Flux and InfluxDB nightlies. If you have Docker installed on your machine, this can be an easier way to try out Flux on your local computer.

- Create a docker network.

  ```sh
  docker network create influxdb
  ```

- Start the InfluxDB nightly. You can use either the `nightly` or `nightly-alpine` tag.

  ```sh
  docker volume create influxdb
  docker run -d --name=influxdb --net=influxdb -p 8086:8086 -v influxdb:/var/lib/influxdb quay.io/influxdb/influxdb:nightly
  ```

  Note: If you run `influxd` in a container and `fluxd` outside of a container, you must add `-p 8082:8082` to expose the Flux port.

- Start `fluxd` using the nightly image. There is no `alpine` image for this yet.

  ```sh
  docker run -d --name=flux --net=influxdb -p 8093:8093 quay.io/influxdb/flux:nightly
  ```

- Follow the instructions from the Generic Installation section for how to query the server.

- When updating, ensure that you pull both nightlies and restart them at the same time. We are making changes often, and using nightlies from the same night will result in fewer problems than updating only one of them.
### Prometheus metrics

Metrics are exposed on `/metrics`. `fluxd` records the number of queries and the number of different functions within Flux queries.
### Federated Mode

By passing multiple hosts to the `--storage-hosts` option, `fluxd` will query multiple InfluxDB servers. For example:

```sh
fluxd --storage-hosts influxdb1:8082,influxdb2:8082
```

The results from the multiple InfluxDB servers are merged together as if they came from a single server.
### Getting Started
Flux runs a query by reading a data source into a collection of tables and then passing each table through transformation steps to describe the desired query operations. Each table is composed of zero or more rows. Transformations are represented as functions which take a table of data as an input argument and return a new table that has been transformed. There are also special functions that combine and separate existing tables into new tables with a different grouping.
#### Basic Syntax

All queries begin with the `from` function. It is a source function that does not accept any input, but produces a stream of tables as output. Every query must also include a `range` function, which limits the time range that data is selected from. Even if you want all the data, you must specify a time range; this makes it explicit that a user wants to query the entire database, rather than the far more common case of a bounded range of time.

```
from(bucket: "telegraf/autogen") |> range(start: -5m)
```
Function parameters are all keyword arguments; there are no positional arguments. The `from` function takes a single parameter, `bucket`, which specifies the InfluxDB bucket it should read from. At the moment, it is not possible to read from anything other than the default retention policy of a database. The `from` function organizes the data so that each series is its own table. That means all transformations happen per series unless this is changed by using `group` or `window` (explained below). If you are familiar with InfluxDB 1.x, this is the opposite of InfluxQL's default behavior: InfluxQL automatically groups all series into the same table.
Functions are separated by the `|>` operator. This operator takes the stream of tables from one function and sends it as input to another function that takes a stream of tables as input. In this case, the `from` function outputs a stream of tables and sends that output to the `range` function, which filters out any rows from each table that are not within the specified time range. The `range` function takes two parameters: `start` and `stop`. The default `stop` time is the current time, and a duration, like `-5m`, can be used to specify a time relative to the current time. That means the above query asks for all data within the last 5 minutes.
The `from` function creates a table where each row has the following attributes:

- `_measurement` - the measurement of the series
- `_field` - the field of the series
- `_value` - the output value
- `_start` - the start time of the table (equal to the range start)
- `_stop` - the stop time of the table (equal to the range stop)
- `_time` - the time for each row
The `|>` operator is used extensively in Flux, so you will see it in all of the query examples. Each transformation can be chained to another transformation, so while the examples below are simple, they can be combined to yield the desired query and table structure.
#### Filter rows with an anonymous function

Rows can be filtered by using the `filter` function. When communicating with `influxd`, the measurement name is put into the `_measurement` tag and the field name is put into the `_field` tag. If you wanted to filter by a specific measurement or field, you could do that by using `filter` like this:

```
from(bucket: "telegraf/autogen") |> range(start: -5m)
	|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
```
The `filter` function takes a function returning a boolean as its only parameter; the parameter name is `fn`. An anonymous function is defined by using parentheses to specify the function arguments, followed by the `=>` operator and a single expression. See the User Defined Functions section for more information about defining functions and how to create your own.
When accessing data in a table, either dot syntax or indexing syntax can be used. In the above, we used dot syntax. You can also use the indexing syntax like this: `r["_measurement"]`. This document will use the dot syntax, but either one can be used anywhere.

It is also common in Flux to break up longer lines by including a newline between the different function calls. The convention is a newline, followed by a tab and the pipe operator, before writing the next function.
#### Limit the number of rows

The `limit` function can be used to limit the number of points in each table. It takes a single parameter, `n`.

```
from(bucket: "telegraf/autogen") |> range(start: -5m) |> limit(n: 1)
```
#### Aggregates and Selectors

Aggregates and selectors execute the aggregation or selection function on each table. The output is defined for each function, but most aggregates will output a single row for each table and most selectors will select a single row from each table.

To find the mean of each table, you could use the `mean()` function.

```
from(bucket: "telegraf/autogen") |> range(start: -5m) |> mean()
```

If you wanted to find the maximum value in each table, you could use `max()`.

```
from(bucket: "telegraf/autogen") |> range(start: -5m) |> max()
```

The full list of aggregation and selection functions is located in the spec.
Grouping and Windowing
Since flux will group each series into its own table, we sometimes need to modify this grouping if we wanted to combine different series. As an example, what if we wanted to know the average user cpu usage for each AWS region? A common schema would be to have two tags: region
and host
. We would write that query like this:
from(bucket: "telegraf/autogen") |> range(start: -5m)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
|> group(columns: ["region"])
|> mean()
The group
function would take every row that had the same region
value and put it into a single table. If we had servers in two different regions, it would result in us having two different tables.
Similarly, if we wanted to group points into buckets of time, the `window` function can do that. We can modify the above query to give us the mean for every minute pretty easily.

```
from(bucket: "telegraf/autogen") |> range(start: -5m)
	|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
	|> group(columns: ["region"])
	|> window(every: 1m)
	|> mean()
```
#### Map

It is also possible to perform math and rename columns by using the `map` function. The `map` function takes a function and executes it on each row to output a new row for the output table.

```
from(bucket: "telegraf/autogen") |> range(start: -5m)
	|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
	|> map(fn: (r) => {_value: r._value / 100})
```

The function passed into `map` must return an object. An object is a key/value structure. By default, the `map` function will merge any columns within the grouping key into the new row, so you do not have to specify all of the existing columns that you do not want to modify. If you do not want those columns to be merged automatically, you can pass `mergeKey: false` as a parameter to `map`.

Note: Math support is limited right now, and the filter is required because the query engine will throw an error if the value has different types in different series. So, at the moment, you must filter the results so that only fields with a single type are selected.
#### User Defined Functions

A user can define their own function which can then be reused. To do this, we use the function syntax and assign it to a variable.

```
add = (table=<-, n) => map(table: table, fn: (r) => {_value: r._value + n})

from(bucket: "telegraf/autogen") |> range(start: -5m)
	|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
	|> add(n: 5)
```

When defining a function, default arguments can be specified by using an equals sign. In addition, a table-processing function can be specified by including one parameter that takes `<-` as an input. The typical parameter name for these is `table`, but it can be any name, since the pipe operator does not use a specific name. In the above example, we build a new function around the existing `map` function by passing the table to the `map` function as a parameter instead of with the pipe. If you wanted to use the pipe operator instead, the following is also valid:

```
add = (table=<-, n) => table |> map(fn: (r) => {_value: r._value + n})

from(bucket: "telegraf/autogen") |> range(start: -5m)
	|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
	|> add(n: 5)
```
# Documentation

## Index
- Constants
- Variables
- func AddDialectMappings(mappings flux.DialectMappings) error
- func BucketsAccessed(ast *ast.Package, orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter, err error)
- func ContextWithRequest(ctx context.Context, req *Request) context.Context
- func SetReturnNoContent(header http.Header, withError bool)
- type AsyncQueryService
- type Authorizer
- type BooleanIterator
- type BooleanPoint
- type BucketAwareOperationSpec
- type BucketLookup
  - func (b *BucketLookup) FindAllBuckets(ctx context.Context, orgID platform.ID) ([]*platform.Bucket, int)
  - func (b *BucketLookup) Lookup(ctx context.Context, orgID platform.ID, name string) (platform.ID, bool)
  - func (b *BucketLookup) LookupName(ctx context.Context, orgID platform.ID, id platform.ID) string
- type FloatIterator
- type FloatPoint
- type IntegerIterator
- type IntegerPoint
- type Interval
- type Iterator
- type IteratorCost
- type IteratorOptions
- type IteratorStats
- type Log
- type Logger
- type LoggingProxyQueryService
- type MathValuer
- type NoContentDialect
- type NoContentEncoder
- type NoContentWithErrorDialect
- type NoContentWithErrorEncoder
- type OrganizationLookup
- type ProxyQueryService
- type ProxyQueryServiceAsyncBridge
- type ProxyRequest
- type QueryService
- type QueryServiceBridge
- type QueryServiceProxyBridge
- type REPLQuerier
- type Request
  - func (r *Request) ApplyOptions(header http.Header) error
  - func (r Request) MarshalJSON() ([]byte, error)
  - func (r *Request) UnmarshalJSON(data []byte) error
  - func (r *Request) WithCompilerMappings(mappings flux.CompilerMappings)
  - func (r *Request) WithOption(option RequestHeaderOption)
  - func (r *Request) WithReturnNoContent(withError bool)
- type RequestHeaderOption
- type SecretLookup
- type StringIterator
- type StringPoint
- type TagSet
- type Tags
- type UnsignedIterator
- type UnsignedPoint
## Constants

```go
const (
	NoContentDialectType     = "no-content"
	NoContentWErrDialectType = "no-content-with-error"
)

const (
	PreferHeaderKey                = "Prefer"
	PreferNoContentHeaderValue     = "return-no-content"
	PreferNoContentWErrHeaderValue = "return-no-content-with-error"
)

const ZeroTime = int64(math.MinInt64)
```
ZeroTime is the Unix nanosecond timestamp for no time. This time is not used by the query engine or the storage engine as a valid time.
## Variables

```go
var (
	// ErrQueryInterrupted is an error returned when the query is interrupted.
	ErrQueryInterrupted = errors.New("query interrupted")
)

var OpenAuthorizer = openAuthorizer{}
```
OpenAuthorizer can be shared by all goroutines.
## Functions

### func AddDialectMappings

```go
func AddDialectMappings(mappings flux.DialectMappings) error
```
AddDialectMappings adds the mappings for the no-content dialects.
### func BucketsAccessed

```go
func BucketsAccessed(ast *ast.Package, orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter, err error)
```

BucketsAccessed returns the set of buckets read and written by a query spec.
### func ContextWithRequest

```go
func ContextWithRequest(ctx context.Context, req *Request) context.Context
```

ContextWithRequest returns a new context with a reference to the request.

### func SetReturnNoContent

```go
func SetReturnNoContent(header http.Header, withError bool)
```

SetReturnNoContent sets the header for a Request to return no content.
## Types

### type AsyncQueryService

```go
type AsyncQueryService interface {
	// Query submits a query for execution returning immediately.
	// Done must be called on any returned Query objects.
	Query(ctx context.Context, req *Request) (flux.Query, error)
}
```
AsyncQueryService represents a service for performing queries where the results are delivered asynchronously.
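As a hedged sketch of how this contract is meant to be used (the function and variable names are illustrative, and the package is assumed to be imported as `query` alongside `context` and `flux`):

```go
// submit issues a query without blocking the caller on execution.
// svc is any AsyncQueryService implementation; req is a prepared *Request.
func submit(ctx context.Context, svc query.AsyncQueryService, req *query.Request) error {
	q, err := svc.Query(ctx, req)
	if err != nil {
		return err
	}
	// Per the interface contract above, Done must be called on every
	// returned Query object.
	defer q.Done()
	// ... consume the query's results here before Done runs ...
	return nil
}
```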
### type Authorizer

```go
type Authorizer interface {
	// AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name.
	AuthorizeDatabase(p influxql.Privilege, name string) bool

	// AuthorizeQuery returns an error if the query cannot be executed
	AuthorizeQuery(database string, query *influxql.Query) error

	// AuthorizeSeriesRead determines if a series is authorized for reading
	AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool

	// AuthorizeSeriesWrite determines if a series is authorized for writing
	AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool
}
```
Authorizer determines if certain operations are authorized.
### type BooleanIterator

```go
type BooleanIterator interface {
	Iterator
	Next() (*BooleanPoint, error)
}
```
BooleanIterator represents a stream of boolean points.
### type BooleanPoint

```go
type BooleanPoint struct {
	Name string
	Tags Tags

	Time  int64
	Value bool
	Aux   []interface{}

	// Total number of points that were combined into this point from an aggregate.
	// If this is zero, the point is not the result of an aggregate function.
	Aggregated uint32
	Nil        bool
}
```
BooleanPoint represents a point with a bool value. DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. See TestPoint_Fields in influxql/point_test.go for more details.
### type BucketAwareOperationSpec

```go
type BucketAwareOperationSpec interface {
	BucketsAccessed(orgID *platform.ID) (readBuckets, writeBuckets []platform.BucketFilter)
}
```

BucketAwareOperationSpec specifies an operation that reads or writes buckets.
### type BucketLookup

```go
type BucketLookup struct {
	BucketService platform.BucketService
}
```
BucketLookup converts Flux bucket lookups into platform.BucketService calls.
#### func FromBucketService

```go
func FromBucketService(srv platform.BucketService) *BucketLookup
```

FromBucketService wraps a platform.BucketService in the BucketLookup interface.
#### func (*BucketLookup) FindAllBuckets

```go
func (b *BucketLookup) FindAllBuckets(ctx context.Context, orgID platform.ID) ([]*platform.Bucket, int)
```
#### func (*BucketLookup) Lookup

```go
func (b *BucketLookup) Lookup(ctx context.Context, orgID platform.ID, name string) (platform.ID, bool)
```
Lookup returns the bucket id and its existence given an org id and bucket name.
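For illustration, a small hedged sketch of resolving a bucket name to its ID using the documented constructor and method (the function name is ours, not the package's):

```go
// resolveBucket resolves a bucket name within an organization to its ID,
// using a BucketLookup built from any platform.BucketService.
func resolveBucket(ctx context.Context, svc platform.BucketService, orgID platform.ID, name string) (platform.ID, bool) {
	lookup := query.FromBucketService(svc)
	// The boolean reports whether the bucket exists in this organization.
	return lookup.Lookup(ctx, orgID, name)
}
```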
#### func (*BucketLookup) LookupName

```go
func (b *BucketLookup) LookupName(ctx context.Context, orgID platform.ID, id platform.ID) string
```

LookupName returns a bucket name given its organization ID and bucket ID.
### type FloatIterator

```go
type FloatIterator interface {
	Iterator
	Next() (*FloatPoint, error)
}
```
FloatIterator represents a stream of float points.
### type FloatPoint

```go
type FloatPoint struct {
	Name string
	Tags Tags

	Time  int64
	Value float64
	Aux   []interface{}

	// Total number of points that were combined into this point from an aggregate.
	// If this is zero, the point is not the result of an aggregate function.
	Aggregated uint32
	Nil        bool
}
```
FloatPoint represents a point with a float64 value. DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. See TestPoint_Fields in influxql/point_test.go for more details.
### type IntegerIterator

```go
type IntegerIterator interface {
	Iterator
	Next() (*IntegerPoint, error)
}
```
IntegerIterator represents a stream of integer points.
### type IntegerPoint

```go
type IntegerPoint struct {
	Name string
	Tags Tags

	Time  int64
	Value int64
	Aux   []interface{}

	// Total number of points that were combined into this point from an aggregate.
	// If this is zero, the point is not the result of an aggregate function.
	Aggregated uint32
	Nil        bool
}
```

IntegerPoint represents a point with an int64 value. DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. See TestPoint_Fields in influxql/point_test.go for more details.
### type Iterator

```go
type Iterator interface {
	Stats() IteratorStats
	Close() error
}
```
Iterator represents a generic interface for all Iterators. Most iterator operations are done on the typed sub-interfaces.
### type IteratorCost

```go
type IteratorCost struct {
	// The total number of shards that are touched by this query.
	NumShards int64

	// The total number of non-unique series that are accessed by this query.
	// This number matches the number of cursors created by the query since
	// one cursor is created for every series.
	NumSeries int64

	// CachedValues returns the number of cached values that may be read by this
	// query.
	CachedValues int64

	// The total number of non-unique files that may be accessed by this query.
	// This will count the number of files accessed by each series so files
	// will likely be double counted.
	NumFiles int64

	// The number of blocks that had the potential to be accessed.
	BlocksRead int64

	// The amount of data that can be potentially read.
	BlockSize int64
}
```
IteratorCost contains statistics retrieved for explaining what potential cost may be incurred by instantiating an iterator.
#### func (IteratorCost) Combine

```go
func (c IteratorCost) Combine(other IteratorCost) IteratorCost
```
Combine combines the results of two IteratorCost structures into one.
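A small hedged sketch of folding several per-iterator estimates into a single total with Combine (the helper name is ours):

```go
// totalCost folds a slice of per-iterator cost estimates into one.
// The zero value of IteratorCost is a valid starting point.
func totalCost(costs []query.IteratorCost) query.IteratorCost {
	var total query.IteratorCost
	for _, c := range costs {
		total = total.Combine(c)
	}
	return total
}
```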
### type IteratorOptions

```go
type IteratorOptions struct {
	// Expression to iterate for.
	// This can be VarRef or a Call.
	Expr influxql.Expr

	// Auxiliary tags or values to also retrieve for the point.
	Aux []influxql.VarRef

	// Data sources from which to receive data. This is only used for encoding
	// measurements over RPC and is no longer used in the open source version.
	Sources []influxql.Source

	// Group by interval and tags.
	Interval   Interval
	Dimensions []string            // The final dimensions of the query (stays the same even in subqueries).
	GroupBy    map[string]struct{} // Dimensions to group points by in intermediate iterators.
	Location   *time.Location

	// Fill options.
	Fill      influxql.FillOption
	FillValue interface{}

	// Condition to filter by.
	Condition influxql.Expr

	// Time range for the iterator.
	StartTime int64
	EndTime   int64

	// Sorted in time ascending order if true.
	Ascending bool

	// Limits the number of points per series.
	Limit, Offset int

	// Limits the number of series.
	SLimit, SOffset int

	// Removes the measurement name. Useful for meta queries.
	StripName bool

	// Removes duplicate rows from raw queries.
	Dedupe bool

	// Determines if this is a query for raw data or an aggregate/selector.
	Ordered bool

	// Limits on the creation of iterators.
	MaxSeriesN int

	// If this channel is set and is closed, the iterator should try to exit
	// and close as soon as possible.
	InterruptCh <-chan struct{}

	// Authorizer can limit access to data
	Authorizer Authorizer
}
```
IteratorOptions is an object passed to CreateIterator to specify creation options.
#### func (IteratorOptions) SeekTime

```go
func (opt IteratorOptions) SeekTime() int64
```
SeekTime returns the time the iterator should start from. For ascending iterators this is the start time, for descending iterators it's the end time.
#### func (IteratorOptions) StopTime

```go
func (opt IteratorOptions) StopTime() int64
```
StopTime returns the time the iterator should end at. For ascending iterators this is the end time, for descending iterators it's the start time.
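A hedged snippet illustrating the documented ascending/descending behavior (it assumes the usual `fmt` import and this package imported as `query`):

```go
func seekAndStop() {
	opt := query.IteratorOptions{
		StartTime: 0,          // earliest time, Unix nanoseconds
		EndTime:   3600 * 1e9, // latest time, Unix nanoseconds
		Ascending: true,
	}
	// Ascending iterators seek from StartTime and stop at EndTime.
	fmt.Println(opt.SeekTime(), opt.StopTime()) // StartTime, EndTime

	// Descending iterators are the mirror image.
	opt.Ascending = false
	fmt.Println(opt.SeekTime(), opt.StopTime()) // EndTime, StartTime
}
```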
### type IteratorStats
IteratorStats represents statistics about an iterator. Some statistics are available immediately upon iterator creation while some are derived as the iterator processes data.
### type Log

```go
type Log struct {
	// Time is the time the query was completed
	Time time.Time
	// OrganizationID is the ID of the organization that requested the query
	OrganizationID platform.ID
	// TraceID is the ID of the trace related to this query
	TraceID string
	// Sampled specifies whether the trace for TraceID was chosen for permanent storage
	// by the sampling mechanism of the tracer
	Sampled bool
	// Error is any error encountered by the query
	Error error
	// ProxyRequest is the query request
	ProxyRequest *ProxyRequest
	// ResponseSize is the size in bytes of the query response
	ResponseSize int64
	// Statistics is a set of statistics about the query execution
	Statistics flux.Statistics
}
```
Log captures a query and any relevant metadata for the query execution.
### type LoggingProxyQueryService

```go
type LoggingProxyQueryService struct {
	// contains filtered or unexported fields
}
```
LoggingProxyQueryService wraps a ProxyQueryService and logs the queries.
#### func NewLoggingProxyQueryService

```go
func NewLoggingProxyQueryService(log *zap.Logger, queryLogger Logger, proxyQueryService ProxyQueryService) *LoggingProxyQueryService
```
#### func (*LoggingProxyQueryService) Check

```go
func (s *LoggingProxyQueryService) Check(ctx context.Context) check.Response
```

#### func (*LoggingProxyQueryService) Query

```go
func (s *LoggingProxyQueryService) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (stats flux.Statistics, err error)
```
Query executes and logs the query.
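Pieced together from the constructor and method signatures above, a hedged sketch of decorating an existing service so each query is logged (the Logger implementation is assumed to be supplied elsewhere, and the helper name is ours):

```go
// wrapWithLogging decorates an existing ProxyQueryService so each query is logged.
func wrapWithLogging(inner query.ProxyQueryService, queryLogger query.Logger) *query.LoggingProxyQueryService {
	// zap.NewNop() discards the service's own diagnostic output;
	// supply a real *zap.Logger in production.
	return query.NewLoggingProxyQueryService(zap.NewNop(), queryLogger, inner)
}
```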
#### func (*LoggingProxyQueryService) SetNowFunctionForTesting

```go
func (s *LoggingProxyQueryService) SetNowFunctionForTesting(nowFunction func() time.Time)
```
### type MathValuer

```go
type MathValuer struct{}
```

#### func (MathValuer) Call

```go
func (v MathValuer) Call(name string, args []interface{}) (interface{}, bool)
```

#### func (MathValuer) Value

```go
func (MathValuer) Value(key string) (interface{}, bool)
```
### type NoContentDialect

```go
type NoContentDialect struct{}
```
NoContentDialect is a dialect that provides an Encoder that discards query results. When invoking `dialect.Encoder().Encode(writer, results)`, `results` get consumed, while the `writer` is left intact. It is an HTTPDialect that sets the response status code to 204 NoContent.
#### func NewNoContentDialect

```go
func NewNoContentDialect() *NoContentDialect
```

#### func (*NoContentDialect) DialectType

```go
func (d *NoContentDialect) DialectType() flux.DialectType
```

#### func (*NoContentDialect) Encoder

```go
func (d *NoContentDialect) Encoder() flux.MultiResultEncoder
```

#### func (*NoContentDialect) SetHeaders

```go
func (d *NoContentDialect) SetHeaders(w http.ResponseWriter)
```
### type NoContentEncoder

```go
type NoContentEncoder struct{}
```

#### func (*NoContentEncoder) Encode

```go
func (e *NoContentEncoder) Encode(w io.Writer, results flux.ResultIterator) (int64, error)
```
### type NoContentWithErrorDialect

```go
type NoContentWithErrorDialect struct {
	csv.ResultEncoderConfig
}
```

NoContentWithErrorDialect is a dialect that provides an Encoder that discards query results, but encodes runtime errors from the Flux query in CSV format. To discover whether the query hit any runtime error, check the response size: if it is equal to zero, then no error was present; otherwise, decode the response body to get the error. For example:

```go
_, err = csv.NewResultDecoder(csv.ResultDecoderConfig{}).Decode(bytes.NewReader(res))
if err != nil {
	// we got some runtime error
}
```
#### func NewNoContentWithErrorDialect

```go
func NewNoContentWithErrorDialect() *NoContentWithErrorDialect
```

#### func (*NoContentWithErrorDialect) DialectType

```go
func (d *NoContentWithErrorDialect) DialectType() flux.DialectType
```

#### func (*NoContentWithErrorDialect) Encoder

```go
func (d *NoContentWithErrorDialect) Encoder() flux.MultiResultEncoder
```

#### func (*NoContentWithErrorDialect) SetHeaders

```go
func (d *NoContentWithErrorDialect) SetHeaders(w http.ResponseWriter)
```
### type NoContentWithErrorEncoder

```go
type NoContentWithErrorEncoder struct {
	// contains filtered or unexported fields
}
```

#### func (*NoContentWithErrorEncoder) Encode

```go
func (e *NoContentWithErrorEncoder) Encode(w io.Writer, results flux.ResultIterator) (int64, error)
```
### type OrganizationLookup

```go
type OrganizationLookup struct {
	OrganizationService platform.OrganizationService
}
```
OrganizationLookup converts organization name lookups into platform.OrganizationService calls.
#### func FromOrganizationService

```go
func FromOrganizationService(srv platform.OrganizationService) *OrganizationLookup
```
FromOrganizationService wraps a platform.OrganizationService in the OrganizationLookup interface.
#### func (*OrganizationLookup) Lookup
Lookup returns the organization ID and its existence given an organization name.
#### func (*OrganizationLookup) LookupName
LookupName returns an organization name given its ID.
### type ProxyQueryService

```go
type ProxyQueryService interface {
	check.Checker

	// Query performs the requested query and encodes the results into w.
	// The number of bytes written to w is returned __independent__ of any error.
	Query(ctx context.Context, w io.Writer, req *ProxyRequest) (flux.Statistics, error)
}
```
ProxyQueryService performs queries and encodes the result into a writer. The results are opaque to a ProxyQueryService.
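Since results are encoded directly into the writer, a caller might capture them in a buffer. A minimal hedged sketch (the helper name is ours; imports of `bytes`, `context`, and `flux` are assumed):

```go
// runToBuffer executes req against svc and returns the encoded response bytes.
func runToBuffer(ctx context.Context, svc query.ProxyQueryService, req *query.ProxyRequest) ([]byte, flux.Statistics, error) {
	var buf bytes.Buffer
	stats, err := svc.Query(ctx, &buf, req)
	// Per the interface docs, the byte count (and whatever was already
	// written) is meaningful independent of err.
	return buf.Bytes(), stats, err
}
```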
### type ProxyQueryServiceAsyncBridge

```go
type ProxyQueryServiceAsyncBridge struct {
	AsyncQueryService AsyncQueryService
}
```

ProxyQueryServiceAsyncBridge implements ProxyQueryService while consuming an AsyncQueryService.
#### func (ProxyQueryServiceAsyncBridge) Check

```go
func (ProxyQueryServiceAsyncBridge) Check(context.Context) check.Response
```
Check returns the status of this query service. Since this bridge consumes an AsyncQueryService, which is not available over the network, this check always passes.
#### func (ProxyQueryServiceAsyncBridge) Query

```go
func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (flux.Statistics, error)
```
### type ProxyRequest

```go
type ProxyRequest struct {
	// Request is the basic query request
	Request Request `json:"request"`

	// Dialect is the result encoder
	Dialect flux.Dialect `json:"dialect"`
	// contains filtered or unexported fields
}
```
ProxyRequest specifies a query request and the dialect for the results.
#### func (ProxyRequest) MarshalJSON

```go
func (r ProxyRequest) MarshalJSON() ([]byte, error)
```

#### func (*ProxyRequest) UnmarshalJSON

```go
func (r *ProxyRequest) UnmarshalJSON(data []byte) error
```
UnmarshalJSON populates the request from the JSON data. WithCompilerMappings and WithDialectMappings must have been called or an error will occur.
#### func (*ProxyRequest) WithCompilerMappings

```go
func (r *ProxyRequest) WithCompilerMappings(mappings flux.CompilerMappings)
```
WithCompilerMappings sets the compiler type mappings on the request.
#### func (*ProxyRequest) WithDialectMappings

```go
func (r *ProxyRequest) WithDialectMappings(mappings flux.DialectMappings)
```
WithDialectMappings sets the dialect type mappings on the request.
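Per the UnmarshalJSON note above, the mappings must be registered before decoding. A hedged sketch of the required order (the function name is ours):

```go
// decodeProxyRequest shows the decode order required by UnmarshalJSON:
// register compiler and dialect mappings first, then unmarshal.
func decodeProxyRequest(data []byte, cm flux.CompilerMappings, dm flux.DialectMappings) (*query.ProxyRequest, error) {
	var req query.ProxyRequest
	req.WithCompilerMappings(cm)
	req.WithDialectMappings(dm)
	if err := req.UnmarshalJSON(data); err != nil {
		return nil, err
	}
	return &req, nil
}
```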
### type QueryService

```go
type QueryService interface {
	check.Checker

	// Query submits a query for execution returning a results iterator.
	// Cancel must be called on any returned results to free resources.
	Query(ctx context.Context, req *Request) (flux.ResultIterator, error)
}
```
QueryService represents a type capable of performing queries.
### type QueryServiceBridge

```go
type QueryServiceBridge struct {
	AsyncQueryService AsyncQueryService
}
```
QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface.
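A hedged sketch of the bridge in use; the async service is assumed to come from elsewhere (for example, a query controller), and the helper name is ours:

```go
// asQueryService adapts an AsyncQueryService to the synchronous
// QueryService interface via the bridge.
func asQueryService(async query.AsyncQueryService) query.QueryService {
	return query.QueryServiceBridge{AsyncQueryService: async}
}
```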
#### func (QueryServiceBridge) Check

```go
func (QueryServiceBridge) Check(context.Context) check.Response
```
Check returns the status of this query service. Since this bridge consumes an AsyncQueryService, which is not available over the network, this check always passes.
#### func (QueryServiceBridge) Query

```go
func (b QueryServiceBridge) Query(ctx context.Context, req *Request) (flux.ResultIterator, error)
```
### type QueryServiceProxyBridge

```go
type QueryServiceProxyBridge struct {
	ProxyQueryService ProxyQueryService
}
```
QueryServiceProxyBridge implements QueryService while consuming a ProxyQueryService interface.
#### func (QueryServiceProxyBridge) Check

```go
func (b QueryServiceProxyBridge) Check(ctx context.Context) check.Response
```

#### func (QueryServiceProxyBridge) Query

```go
func (b QueryServiceProxyBridge) Query(ctx context.Context, req *Request) (flux.ResultIterator, error)
```
### type REPLQuerier

```go
type REPLQuerier struct {
	// Authorization is the authorization to provide for all requests
	Authorization *platform.Authorization
	// OrganizationID is the ID to provide for all requests
	OrganizationID platform.ID
	QueryService   QueryService
}
```

REPLQuerier implements the repl.Querier interface while consuming a QueryService.
#### func (*REPLQuerier) Query

```go
func (q *REPLQuerier) Query(ctx context.Context, deps flux.Dependencies, compiler flux.Compiler) (flux.ResultIterator, error)
```
Query will pack a query to be sent to a remote server for execution. deps may be safely ignored since they will be correctly initialized on the server side.
### type Request

```go
type Request struct {
	// Scope
	Authorization  *platform.Authorization `json:"authorization,omitempty"`
	OrganizationID platform.ID             `json:"organization_id"`

	// Compiler converts the query to a specification to run against the data.
	Compiler flux.Compiler `json:"compiler"`

	// Source represents the ultimate source of the request.
	Source string `json:"source"`
	// contains filtered or unexported fields
}
```
Request represents the query to run. Options to mutate the header associated to this Request can be specified via `WithOption` or associated methods. One should always `Request.ApplyOptions()` before encoding and sending the request.
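Following that note, a hedged sketch of preparing headers before sending a request over HTTP (the helper name is ours; `net/http` is assumed):

```go
// applyRequestHeaders applies all options accumulated on req to an outgoing
// HTTP request's headers, as the docs require before encoding and sending.
func applyRequestHeaders(req *query.Request, httpReq *http.Request) error {
	// WithReturnNoContent(true) opts into the no-content-with-error
	// behavior described by the dialects above.
	req.WithReturnNoContent(true)
	return req.ApplyOptions(httpReq.Header)
}
```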
#### func RequestFromContext

RequestFromContext retrieves a *Request from a context. If no request exists on the context, nil is returned.
#### func (*Request) ApplyOptions

```go
func (r *Request) ApplyOptions(header http.Header) error
```

ApplyOptions applies every option added to this Request to the given header.
#### func (Request) MarshalJSON

```go
func (r Request) MarshalJSON() ([]byte, error)
```
#### func (*Request) UnmarshalJSON

```go
func (r *Request) UnmarshalJSON(data []byte) error
```

UnmarshalJSON populates the request from the JSON data. WithCompilerMappings must have been called or an error will occur.
#### func (*Request) WithCompilerMappings

```go
func (r *Request) WithCompilerMappings(mappings flux.CompilerMappings)
```
WithCompilerMappings sets the query type mappings on the request.
#### func (*Request) WithOption

```go
func (r *Request) WithOption(option RequestHeaderOption)
```
WithOption adds a RequestHeaderOption to this Request.
#### func (*Request) WithReturnNoContent

```go
func (r *Request) WithReturnNoContent(withError bool)
```

WithReturnNoContent makes this Request return no content.
### type RequestHeaderOption
RequestHeaderOption is a function that mutates the header associated to a Request.
### type SecretLookup

```go
type SecretLookup struct {
	SecretService platform.SecretService
}
```
SecretLookup wraps the platform.SecretService to perform lookups based on the organization in the context.
#### func FromSecretService

```go
func FromSecretService(srv platform.SecretService) *SecretLookup
```

FromSecretService wraps a platform.SecretService in the SecretLookup interface.
#### func (*SecretLookup) LoadSecret
LoadSecret loads the secret associated with the key in the current organization context.
### type StringIterator

```go
type StringIterator interface {
	Iterator
	Next() (*StringPoint, error)
}
```
StringIterator represents a stream of string points.
### type StringPoint

```go
type StringPoint struct {
	Name string
	Tags Tags

	Time  int64
	Value string
	Aux   []interface{}

	// Total number of points that were combined into this point from an aggregate.
	// If this is zero, the point is not the result of an aggregate function.
	Aggregated uint32
	Nil        bool
}
```
StringPoint represents a point with a string value. DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. See TestPoint_Fields in influxql/point_test.go for more details.
### type TagSet

```go
type TagSet struct {
	Tags       map[string]string
	Filters    []influxql.Expr
	SeriesKeys []string
	Key        []byte
}
```
TagSet is a fundamental concept within the query system. It represents a composite series, composed of multiple individual series that share a set of tag attributes.
### type Tags

```go
type Tags struct{}
```
Tags represent a map of keys and values. It memoizes its key so it can be used efficiently during query execution.
### type UnsignedIterator

```go
type UnsignedIterator interface {
	Iterator
	Next() (*UnsignedPoint, error)
}
```
UnsignedIterator represents a stream of unsigned points.
### type UnsignedPoint

```go
type UnsignedPoint struct {
	Name string
	Tags Tags

	Time  int64
	Value uint64
	Aux   []interface{}

	// Total number of points that were combined into this point from an aggregate.
	// If this is zero, the point is not the result of an aggregate function.
	Aggregated uint32
	Nil        bool
}
```
UnsignedPoint represents a point with a uint64 value. DO NOT ADD ADDITIONAL FIELDS TO THIS STRUCT. See TestPoint_Fields in influxql/point_test.go for more details.
## Directories

| Path | Synopsis |
|---|---|
| builtin | Package builtin ensures all packages related to Flux built-ins are imported and initialized. |
| control | Package control keeps track of resources and manages queries. |
| influxql | Package influxql implements the transpiler for executing influxql queries in the 2.0 query engine. |
| influxql/spectests | Package spectests contains the influxql transpiler specification tests. |
| promql | Package promql implements a promql parser to build flux query specifications from promql. |