Documentation ¶
Overview ¶
Provides an API for constructing data processing pipelines.
The nodes in this package define only how nodes can be linked together, not the actual implementation of the transformation functions.
Index ¶
- type AlertNode
- func (a *AlertNode) Channel(channel string) *AlertNode
- func (n *AlertNode) Children() []Node
- func (n *AlertNode) Desc() string
- func (a *AlertNode) Email(to ...string) *AlertNode
- func (a *AlertNode) Exec(executable string, args ...string) *AlertNode
- func (a *AlertNode) Flapping(low, high float64) Node
- func (n *AlertNode) ID() ID
- func (n *AlertNode) Name() string
- func (a *AlertNode) PagerDuty() *AlertNode
- func (n *AlertNode) Parents() []Node
- func (n *AlertNode) Provides() EdgeType
- func (a *AlertNode) RoutingKey(routingKey string) *AlertNode
- func (n *AlertNode) SetName(name string)
- func (a *AlertNode) Slack() *AlertNode
- func (a *AlertNode) StateChangesOnly() *AlertNode
- func (a *AlertNode) VictorOps() *AlertNode
- func (n *AlertNode) Wants() EdgeType
- type BatchNode
- func (n *BatchNode) Alert() *AlertNode
- func (n *BatchNode) Derivative(field string) *DerivativeNode
- func (n *BatchNode) Eval(expressions ...tick.Node) *EvalNode
- func (b *BatchNode) GroupBy(d ...interface{}) *BatchNode
- func (n *BatchNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *BatchNode) InfluxDBOut() *InfluxDBOutNode
- func (n *BatchNode) Join(others ...Node) *JoinNode
- func (n *BatchNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *BatchNode) Sample(rate interface{}) *SampleNode
- func (n *BatchNode) Union(node ...Node) *UnionNode
- func (n *BatchNode) Where(expression tick.Node) *WhereNode
- func (n *BatchNode) Window() *WindowNode
- type DerivativeNode
- func (n *DerivativeNode) Alert() *AlertNode
- func (n *DerivativeNode) Derivative(field string) *DerivativeNode
- func (n *DerivativeNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *DerivativeNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *DerivativeNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *DerivativeNode) InfluxDBOut() *InfluxDBOutNode
- func (n *DerivativeNode) Join(others ...Node) *JoinNode
- func (n *DerivativeNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (d *DerivativeNode) NonNegative() *DerivativeNode
- func (n *DerivativeNode) Sample(rate interface{}) *SampleNode
- func (n *DerivativeNode) Union(node ...Node) *UnionNode
- func (n *DerivativeNode) Where(expression tick.Node) *WhereNode
- func (n *DerivativeNode) Window() *WindowNode
- type EdgeType
- type EvalNode
- func (n *EvalNode) Alert() *AlertNode
- func (e *EvalNode) As(names ...string) *EvalNode
- func (n *EvalNode) Derivative(field string) *DerivativeNode
- func (n *EvalNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *EvalNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *EvalNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *EvalNode) InfluxDBOut() *InfluxDBOutNode
- func (n *EvalNode) Join(others ...Node) *JoinNode
- func (e *EvalNode) Keep(fields ...string) *EvalNode
- func (n *EvalNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *EvalNode) Sample(rate interface{}) *SampleNode
- func (n *EvalNode) Union(node ...Node) *UnionNode
- func (n *EvalNode) Where(expression tick.Node) *WhereNode
- func (n *EvalNode) Window() *WindowNode
- type GroupByNode
- func (n *GroupByNode) Alert() *AlertNode
- func (n *GroupByNode) Derivative(field string) *DerivativeNode
- func (n *GroupByNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *GroupByNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *GroupByNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *GroupByNode) InfluxDBOut() *InfluxDBOutNode
- func (n *GroupByNode) Join(others ...Node) *JoinNode
- func (n *GroupByNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *GroupByNode) Sample(rate interface{}) *SampleNode
- func (n *GroupByNode) Union(node ...Node) *UnionNode
- func (n *GroupByNode) Where(expression tick.Node) *WhereNode
- func (n *GroupByNode) Window() *WindowNode
- type HTTPOutNode
- func (n *HTTPOutNode) Children() []Node
- func (n *HTTPOutNode) Desc() string
- func (n *HTTPOutNode) ID() ID
- func (n *HTTPOutNode) Name() string
- func (n *HTTPOutNode) Parents() []Node
- func (n *HTTPOutNode) Provides() EdgeType
- func (n *HTTPOutNode) SetName(name string)
- func (n *HTTPOutNode) Wants() EdgeType
- type ID
- type InfluxDBOutNode
- func (n *InfluxDBOutNode) Children() []Node
- func (n *InfluxDBOutNode) Desc() string
- func (n *InfluxDBOutNode) ID() ID
- func (n *InfluxDBOutNode) Name() string
- func (n *InfluxDBOutNode) Parents() []Node
- func (n *InfluxDBOutNode) Provides() EdgeType
- func (n *InfluxDBOutNode) SetName(name string)
- func (i *InfluxDBOutNode) Tag(key, value string) *InfluxDBOutNode
- func (n *InfluxDBOutNode) Wants() EdgeType
- type JoinNode
- func (n *JoinNode) Alert() *AlertNode
- func (j *JoinNode) As(names ...string) *JoinNode
- func (n *JoinNode) Derivative(field string) *DerivativeNode
- func (n *JoinNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *JoinNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *JoinNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *JoinNode) InfluxDBOut() *InfluxDBOutNode
- func (n *JoinNode) Join(others ...Node) *JoinNode
- func (n *JoinNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *JoinNode) Sample(rate interface{}) *SampleNode
- func (n *JoinNode) Union(node ...Node) *UnionNode
- func (n *JoinNode) Where(expression tick.Node) *WhereNode
- func (n *JoinNode) Window() *WindowNode
- type MapNode
- func (n *MapNode) Alert() *AlertNode
- func (n *MapNode) Derivative(field string) *DerivativeNode
- func (n *MapNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *MapNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *MapNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *MapNode) InfluxDBOut() *InfluxDBOutNode
- func (n *MapNode) Join(others ...Node) *JoinNode
- func (n *MapNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *MapNode) Sample(rate interface{}) *SampleNode
- func (n *MapNode) Union(node ...Node) *UnionNode
- func (n *MapNode) Where(expression tick.Node) *WhereNode
- func (n *MapNode) Window() *WindowNode
- type MapReduceInfo
- type Node
- type Pipeline
- type ReduceNode
- func (n *ReduceNode) Alert() *AlertNode
- func (n *ReduceNode) Derivative(field string) *DerivativeNode
- func (n *ReduceNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *ReduceNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *ReduceNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *ReduceNode) InfluxDBOut() *InfluxDBOutNode
- func (n *ReduceNode) Join(others ...Node) *JoinNode
- func (n *ReduceNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *ReduceNode) Sample(rate interface{}) *SampleNode
- func (n *ReduceNode) Union(node ...Node) *UnionNode
- func (r *ReduceNode) UsePointTimes() *ReduceNode
- func (n *ReduceNode) Where(expression tick.Node) *WhereNode
- func (n *ReduceNode) Window() *WindowNode
- type SampleNode
- func (n *SampleNode) Alert() *AlertNode
- func (n *SampleNode) Derivative(field string) *DerivativeNode
- func (n *SampleNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *SampleNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *SampleNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *SampleNode) InfluxDBOut() *InfluxDBOutNode
- func (n *SampleNode) Join(others ...Node) *JoinNode
- func (n *SampleNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *SampleNode) Sample(rate interface{}) *SampleNode
- func (n *SampleNode) Union(node ...Node) *UnionNode
- func (n *SampleNode) Where(expression tick.Node) *WhereNode
- func (n *SampleNode) Window() *WindowNode
- type SourceBatchNode
- func (n *SourceBatchNode) Children() []Node
- func (n *SourceBatchNode) Desc() string
- func (n *SourceBatchNode) ID() ID
- func (n *SourceBatchNode) Name() string
- func (n *SourceBatchNode) Parents() []Node
- func (n *SourceBatchNode) Provides() EdgeType
- func (b *SourceBatchNode) Query(q string) *BatchNode
- func (n *SourceBatchNode) SetName(name string)
- func (n *SourceBatchNode) Wants() EdgeType
- type StreamNode
- func (n *StreamNode) Alert() *AlertNode
- func (n *StreamNode) Derivative(field string) *DerivativeNode
- func (n *StreamNode) Eval(expressions ...tick.Node) *EvalNode
- func (s *StreamNode) From() *StreamNode
- func (n *StreamNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *StreamNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *StreamNode) InfluxDBOut() *InfluxDBOutNode
- func (n *StreamNode) Join(others ...Node) *JoinNode
- func (n *StreamNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *StreamNode) Sample(rate interface{}) *SampleNode
- func (n *StreamNode) Union(node ...Node) *UnionNode
- func (s *StreamNode) Where(expression tick.Node) *StreamNode
- func (n *StreamNode) Window() *WindowNode
- type UnionNode
- func (n *UnionNode) Alert() *AlertNode
- func (n *UnionNode) Derivative(field string) *DerivativeNode
- func (n *UnionNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *UnionNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *UnionNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *UnionNode) InfluxDBOut() *InfluxDBOutNode
- func (n *UnionNode) Join(others ...Node) *JoinNode
- func (n *UnionNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *UnionNode) Sample(rate interface{}) *SampleNode
- func (n *UnionNode) Union(node ...Node) *UnionNode
- func (n *UnionNode) Where(expression tick.Node) *WhereNode
- func (n *UnionNode) Window() *WindowNode
- type WhereNode
- func (n *WhereNode) Alert() *AlertNode
- func (n *WhereNode) Derivative(field string) *DerivativeNode
- func (n *WhereNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *WhereNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *WhereNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *WhereNode) InfluxDBOut() *InfluxDBOutNode
- func (n *WhereNode) Join(others ...Node) *JoinNode
- func (n *WhereNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *WhereNode) Sample(rate interface{}) *SampleNode
- func (n *WhereNode) Union(node ...Node) *UnionNode
- func (n *WhereNode) Where(expression tick.Node) *WhereNode
- func (n *WhereNode) Window() *WindowNode
- type WindowNode
- func (n *WindowNode) Alert() *AlertNode
- func (w *WindowNode) Align() *WindowNode
- func (n *WindowNode) Derivative(field string) *DerivativeNode
- func (n *WindowNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *WindowNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *WindowNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *WindowNode) InfluxDBOut() *InfluxDBOutNode
- func (n *WindowNode) Join(others ...Node) *JoinNode
- func (n *WindowNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *WindowNode) Sample(rate interface{}) *SampleNode
- func (n *WindowNode) Union(node ...Node) *UnionNode
- func (n *WindowNode) Where(expression tick.Node) *WhereNode
- func (n *WindowNode) Window() *WindowNode
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AlertNode ¶
type AlertNode struct {
	// Template for constructing a unique ID for a given alert.
	//
	// Available template data:
	//
	//    * Name -- Measurement name.
	//    * Group -- Concatenation of all group-by tags of the form [key=value,]+.
	//        If no groupBy is performed, equal to the literal 'nil'.
	//    * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value.
	//
	// Example:
	//   stream.from().measurement('cpu')
	//   .groupBy('cpu')
	//   .alert()
	//       .id('kapacitor/{{ .Name }}/{{ .Group }}')
	//
	// ID: kapacitor/cpu/cpu=cpu0,
	//
	// Example:
	//   stream...
	//   .groupBy('service')
	//   .alert()
	//       .id('kapacitor/{{ index .Tags "service" }}')
	//
	// ID: kapacitor/authentication
	//
	// Example:
	//   stream...
	//   .groupBy('service', 'host')
	//   .alert()
	//       .id('kapacitor/{{ index .Tags "service" }}/{{ index .Tags "host" }}')
	//
	// ID: kapacitor/authentication/auth001.example.com
	//
	// Default: {{ .Name }}:{{ .Group }}
	Id string

	// Template for constructing a meaningful message for the alert.
	//
	// Available template data:
	//
	//    * ID -- The ID of the alert.
	//    * Name -- Measurement name.
	//    * Group -- Concatenation of all group-by tags of the form [key=value,]+.
	//        If no groupBy is performed, equal to the literal 'nil'.
	//    * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value.
	//    * Level -- Alert Level, one of: INFO, WARNING, CRITICAL.
	//    * Fields -- Map of fields. Use '{{ index .Fields "key" }}' to get a specific field value.
	//
	// Example:
	//   stream...
	//   .groupBy('service', 'host')
	//   .alert()
	//       .id('{{ index .Tags "service" }}/{{ index .Tags "host" }}')
	//       .message('{{ .ID }} is {{ .Level }} value: {{ index .Fields "value" }}')
	//
	// Message: authentication/auth001.example.com is CRITICAL value: 42
	//
	// Default: {{ .ID }} is {{ .Level }}
	Message string

	// Filter expression for the INFO alert level.
	// An empty value indicates the level is invalid and is skipped.
	Info tick.Node

	// Filter expression for the WARNING alert level.
	// An empty value indicates the level is invalid and is skipped.
	Warn tick.Node

	// Filter expression for the CRITICAL alert level.
	// An empty value indicates the level is invalid and is skipped.
	Crit tick.Node

	//tick:ignore
	UseFlapping bool
	//tick:ignore
	FlapLow float64
	//tick:ignore
	FlapHigh float64

	// Number of previous states to remember when computing flapping levels and
	// checking for state changes.
	// Minimum value is 2 in order to keep track of current and previous states.
	//
	// Default: 21
	History int

	// Post the JSON alert data to the specified URL.
	Post string

	// Email settings
	//tick:ignore
	UseEmail bool
	//tick:ignore
	ToList []string

	// A command to run when an alert triggers
	//tick:ignore
	Command []string

	// Log JSON alert data to file. One event per line.
	Log string

	// Send alert to VictorOps.
	// tick:ignore
	UseVictorOps bool

	// VictorOps RoutingKey.
	// tick:ignore
	VictorOpsRoutingKey string

	// Send alert to PagerDuty.
	// tick:ignore
	UsePagerDuty bool

	// Send alert to Slack.
	// tick:ignore
	UseSlack bool

	// Slack channel
	// tick:ignore
	SlackChannel string

	// Send alerts only on state changes.
	// tick:ignore
	IsStateChangesOnly bool
	// contains filtered or unexported fields
}
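The `{{ index .Tags "key" }}` form used in the Id and Message comments matches the syntax of Go's standard `text/template` package. The sketch below renders the documented examples with `text/template` to show how the template data maps to the final strings. It assumes the templates behave like plain `text/template` templates; the `alertInfo` struct and `render` helper are hypothetical names for illustration and are not part of this package.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// alertInfo mirrors the template data listed in the AlertNode comments.
// The struct is hypothetical; only the field names come from the docs.
type alertInfo struct {
	ID     string
	Name   string
	Group  string
	Tags   map[string]string
	Level  string
	Fields map[string]interface{}
}

// render executes a template string against the alert data.
func render(tmpl string, data alertInfo) (string, error) {
	t, err := template.New("alert").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	data := alertInfo{
		Name:   "cpu",
		Tags:   map[string]string{"service": "authentication", "host": "auth001.example.com"},
		Level:  "CRITICAL",
		Fields: map[string]interface{}{"value": 42},
	}

	// The ID is rendered first so that the message template can refer to it.
	id, err := render(`{{ index .Tags "service" }}/{{ index .Tags "host" }}`, data)
	if err != nil {
		panic(err)
	}
	data.ID = id

	msg, err := render(`{{ .ID }} is {{ .Level }} value: {{ index .Fields "value" }}`, data)
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
	fmt.Println(msg)
}
```

Rendering the ID before the message reflects the fact that the message template data includes the ID.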
An AlertNode can trigger an event of varying severity levels, and pass the event to alert handlers. The criteria for triggering an alert are specified via a [lambda expression](https://influxdb.com/docs/kapacitor/v0.1/tick/expr.html). See AlertNode.Info, AlertNode.Warn, and AlertNode.Crit below.
Different event handlers can be configured for each AlertNode. Some handlers like Email, Slack, VictorOps and PagerDuty have a configuration option 'global' that indicates that all alerts implicitly use the handler.
Available event handlers:
- log -- log alert data to file.
- post -- HTTP POST data to a specified URL.
- email -- Send an email with alert data.
- exec -- Execute a command passing alert data over STDIN.
- Slack -- Post alert message to Slack channel.
- VictorOps -- Send alert to VictorOps.
- PagerDuty -- Send alert to PagerDuty.
See below for more details on configuring each handler.
Each event that gets sent to a handler contains the following alert data:
- ID -- the ID of the alert, user defined.
- Message -- the alert message, user defined.
- Time -- the time the alert occurred.
- Level -- one of OK, INFO, WARNING or CRITICAL.
- Data -- influxql.Result containing the data that triggered the alert.
Events are sent to handlers if the alert is in a state other than 'OK' or the alert just changed to the 'OK' state from a non 'OK' state (a.k.a. the alert recovered). When the AlertNode.StateChangesOnly property is used, events are sent to handlers only if the alert changed state.
Example:
stream
    .groupBy('service')
    .alert()
        .id('kapacitor/{{ index .Tags "service" }}')
        .message('{{ .ID }} is {{ .Level }} value:{{ index .Fields "value" }}')
        .info(lambda: "value" > 10)
        .warn(lambda: "value" > 20)
        .crit(lambda: "value" > 30)
        .post('http://example.com/api/alert')
It is assumed that each successive level filters a subset of the previous level. As a result, the filter will only be applied if a data point passed the previous level. In the above example, if value = 15 then the INFO and WARNING expressions would be evaluated, but not the CRITICAL expression. Each expression maintains its own state.
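The short-circuit behavior described above can be sketched in Go. This is an illustration only, not the package's implementation: `predicate` and `evaluate` are hypothetical names, plain Go closures stand in for lambda expressions, and the per-expression state mentioned above is not modeled.

```go
package main

import "fmt"

// predicate is a stand-in for a lambda expression such as lambda: "value" > 10.
type predicate func(value float64) bool

// evaluate applies the INFO, WARNING and CRITICAL filters in order and stops
// at the first one that fails. It returns the resulting level and the names
// of the expressions that were actually evaluated.
func evaluate(value float64, info, warn, crit predicate) (string, []string) {
	level := "OK"
	var evaluated []string
	steps := []struct {
		name string
		pred predicate
	}{
		{"INFO", info},
		{"WARNING", warn},
		{"CRITICAL", crit},
	}
	for _, s := range steps {
		evaluated = append(evaluated, s.name)
		if !s.pred(value) {
			break // later levels are assumed to be subsets, so skip them
		}
		level = s.name
	}
	return level, evaluated
}

func main() {
	gt := func(threshold float64) predicate {
		return func(v float64) bool { return v > threshold }
	}
	level, evaluated := evaluate(15, gt(10), gt(20), gt(30))
	fmt.Println(level, evaluated) // INFO [INFO WARNING]
}
```

With value = 15 the CRITICAL predicate is never called, which matches the example in the text.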
func (*AlertNode) Channel ¶
Slack channel in which to post messages. If empty, the channel from the configuration is used. tick:property
func (*AlertNode) Email ¶
Email the alert data.
If the To list is empty, the To addresses from the configuration are used. The email subject is the AlertNode.Message property. The email body is the JSON alert data.
If the 'smtp' section in the configuration has the option: global = true then all alerts are sent via email without the need to explicitly state it in the TICKscript.
Example:
[smtp]
  enabled = true
  host = "localhost"
  port = 25
  username = ""
  password = ""
  from = "kapacitor@example.com"
  to = ["oncall@example.com"]
  # Set global to true so all alerts trigger emails.
  global = true
Example:
stream...
    .alert()
Send email to 'oncall@example.com' from 'kapacitor@example.com'
**NOTE**: The global option for email also implies stateChangesOnly is set on all alerts. tick:property
func (*AlertNode) Exec ¶
Execute a command whenever an alert is triggered and pass the alert data over STDIN in JSON format. tick:property
func (*AlertNode) Flapping ¶
Perform flap detection on the alerts. The method used is similar to the one Nagios uses: https://assets.nagios.com/downloads/nagioscore/docs/nagioscore/3/en/flapping.html
Each different alerting level is considered a different state. The low and high thresholds are inverted thresholds of a percentage of state changes. Meaning that if the percentage of state changes goes above the `high` threshold, the alert enters a flapping state. The alert remains in the flapping state until the percentage of state changes goes below the `low` threshold. Typical values are low: 0.25 and high: 0.5. The percentage values represent the number of state changes over the total possible number of state changes. A percentage change of 0.5 means that the alert changed state in half of the recorded history, and remained the same in the other half of the history. tick:property
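The hysteresis between the `low` and `high` thresholds can be illustrated with a small Go sketch. It is a simplified model under stated assumptions: every state change is weighted equally (Nagios itself weights recent changes more heavily), and `changeRatio` and `nextFlapping` are hypothetical helper names rather than functions from this package.

```go
package main

import "fmt"

// changeRatio returns the number of state changes in history divided by the
// total possible number of changes, which is len(history) - 1.
func changeRatio(history []string) float64 {
	if len(history) < 2 {
		return 0
	}
	changes := 0
	for i := 1; i < len(history); i++ {
		if history[i] != history[i-1] {
			changes++
		}
	}
	return float64(changes) / float64(len(history)-1)
}

// nextFlapping applies the low/high hysteresis: the alert starts flapping
// above high, stops flapping below low, and otherwise keeps its current state.
func nextFlapping(flapping bool, ratio, low, high float64) bool {
	switch {
	case ratio > high:
		return true
	case ratio < low:
		return false
	default:
		return flapping
	}
}

func main() {
	history := []string{"OK", "CRITICAL", "OK", "CRITICAL", "OK", "OK", "OK"}
	ratio := changeRatio(history) // 4 changes out of 6 possible
	fmt.Printf("ratio=%.2f flapping=%v\n", ratio, nextFlapping(false, ratio, 0.25, 0.5))
}
```

Because the two thresholds differ, a ratio between `low` and `high` leaves the flapping state unchanged, which avoids toggling on small fluctuations.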
func (*AlertNode) PagerDuty ¶
Send the alert to PagerDuty. To use PagerDuty alerting you must first follow the steps to enable a new 'Generic API' service.
From https://developer.pagerduty.com/documentation/integration/events
- In your account, under the Services tab, click "Add New Service".
- Enter a name for the service and select an escalation policy. Then, select "Generic API" for the Service Type.
- Click the "Add Service" button.
- Once the service is created, you'll be taken to the service page. On this page, you'll see the "Service key", which is needed to access the API
Place the 'service key' into the 'pagerduty' section of the Kapacitor configuration as the option 'service-key'.
Example:
[pagerduty]
  enabled = true
  service-key = "xxxxxxxxx"
With the correct configuration you can now use PagerDuty in TICKscripts.
Example:
stream...
    .alert()
        .pagerDuty()
If the 'pagerduty' section in the configuration has the option: global = true then all alerts are sent to PagerDuty without the need to explicitly state it in the TICKscript.
Example:
[pagerduty]
  enabled = true
  service-key = "xxxxxxxxx"
  global = true
Example:
stream...
    .alert()
Send alert to PagerDuty. tick:property
func (*AlertNode) RoutingKey ¶
The VictorOps routing key. If not set, the key specified in the configuration is used. tick:property
func (*AlertNode) Slack ¶
Send the alert to Slack. To allow Kapacitor to post to Slack, go to https://slack.com/services/new/incoming-webhook, create a new incoming webhook, and place the generated URL in the 'slack' configuration section.
Example:
[slack]
  enabled = true
  url = "https://hooks.slack.com/services/xxxxxxxxx/xxxxxxxxx/xxxxxxxxxxxxxxxxxxxxxxxx"
  channel = "#general"
To avoid posting a message every alert interval, use AlertNode.StateChangesOnly so that only events where the alert changed state are posted to the channel.
Example:
stream...
    .alert()
        .slack()
Send alerts to Slack channel in the configuration file.
Example:
stream...
    .alert()
        .slack()
        .channel('#alerts')
Send alerts to Slack channel '#alerts'
Example:
stream...
    .alert()
        .slack()
        .channel('@jsmith')
Send alert to user '@jsmith'
If the 'slack' section in the configuration has the option: global = true then all alerts are sent to Slack without the need to explicitly state it in the TICKscript.
Example:
[slack]
  enabled = true
  url = "https://hooks.slack.com/services/xxxxxxxxx/xxxxxxxxx/xxxxxxxxxxxxxxxxxxxxxxxx"
  channel = "#general"
  global = true
Example:
stream...
    .alert()
Send alert to Slack using default channel '#general'. **NOTE**: The global option for Slack also implies stateChangesOnly is set on all alerts. tick:property
func (*AlertNode) StateChangesOnly ¶
Only sends events where the state changed. Each alert level (OK, INFO, WARNING, and CRITICAL) is considered a different state.
Example:
stream...
    .window()
        .period(10s)
        .every(10s)
    .alert()
        .crit(lambda: "value" > 10)
        .stateChangesOnly()
        .slack()
If the "value" is greater than 10 for a total of 60s, then only two events will be sent. First, when the value crosses the threshold, and second, when it falls back into an OK state. Without stateChangesOnly, the alert would have triggered 7 times: once for each of the six 10s periods where the condition was met and once more for the recovery.
tick:property
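The event counts in the StateChangesOnly example can be checked with a short Go simulation. This is a sketch of the sending rule as described in this documentation, not the package's code: `countEvents` is a hypothetical helper, each slice entry stands for the alert level of one 10s window, and the alert is assumed to start in the OK state.

```go
package main

import "fmt"

// countEvents returns how many events reach the handlers for a sequence of
// alert levels, one level per evaluated window. The alert starts in OK.
func countEvents(levels []string, stateChangesOnly bool) int {
	prev := "OK"
	sent := 0
	for _, cur := range levels {
		changed := cur != prev
		if stateChangesOnly {
			// Only transitions between levels are sent.
			if changed {
				sent++
			}
		} else if cur != "OK" || changed {
			// Non-OK states are always sent; OK is sent only on recovery.
			sent++
		}
		prev = cur
	}
	return sent
}

func main() {
	// Six 10s windows above the threshold, then two windows back in OK.
	levels := []string{
		"CRITICAL", "CRITICAL", "CRITICAL", "CRITICAL", "CRITICAL", "CRITICAL",
		"OK", "OK",
	}
	fmt.Println(countEvents(levels, true))  // 2
	fmt.Println(countEvents(levels, false)) // 7
}
```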
func (*AlertNode) VictorOps ¶
Send alert to VictorOps. To use VictorOps alerting you must first enable the 'Alert Ingestion API' in the 'Integrations' section of VictorOps. Then place the API key from the URL into the 'victorops' section of the Kapacitor configuration.
Example:
[victorops]
  enabled = true
  api-key = "xxxxx"
  routing-key = "everyone"
With the correct configuration you can now use VictorOps in TICKscripts.
Example:
stream...
    .alert()
        .victorOps()
Send alerts to VictorOps using the routing key in the configuration file.
Example:
stream...
    .alert()
        .victorOps()
        .routingKey('team_rocket')
Send alerts to VictorOps with routing key 'team_rocket'
If the 'victorops' section in the configuration has the option: global = true then all alerts are sent to VictorOps without the need to explicitly state it in the TICKscript.
Example:
[victorops]
  enabled = true
  api-key = "xxxxx"
  routing-key = "everyone"
  global = true
Example:
stream...
    .alert()
Send alert to VictorOps using the default routing key, found in the configuration. tick:property
type BatchNode ¶
type BatchNode struct {
	// The query text
	//tick:ignore
	QueryStr string

	// The period or length of time that will be queried from InfluxDB
	Period time.Duration

	// How often to query InfluxDB.
	//
	// The Every property is mutually exclusive with the Cron property.
	Every time.Duration

	// Define a schedule using a cron syntax.
	//
	// The specific cron implementation is documented here:
	// https://github.com/gorhill/cronexpr#implementation
	//
	// The Cron property is mutually exclusive with the Every property.
	Cron string

	// How far back in time to query from the current time
	//
	// For example, with an Offset of 2 hours and an Every of 5m,
	// Kapacitor will query InfluxDB every 5 minutes for the window of data 2 hours ago.
	//
	// This applies to Cron schedules as well. If the cron schedule runs every Sunday at
	// 1 AM and the Offset is 1 hour, then at 1 AM on Sunday the data from 12 AM will be queried.
	Offset time.Duration

	// The list of dimensions for the group-by clause.
	//tick:ignore
	Dimensions []interface{}

	// Fill the data.
	// Options are:
	//
	//   - Any numerical value
	//   - null - exhibits the same behavior as the default
	//   - previous - reports the value of the previous window
	//   - none - suppresses timestamps and values where the value is null
	Fill interface{}
	// contains filtered or unexported fields
}
A BatchNode defines a source and a schedule for processing batch data. The data is queried from an InfluxDB database and then passed into the data pipeline.
Example:
batch
    .query('''
        SELECT mean("value")
        FROM "telegraf"."default".cpu_usage_idle
        WHERE "host" = 'serverA'
    ''')
    .period(1m)
    .every(20s)
    .groupBy(time(10s), 'cpu')
    ...
In the above example InfluxDB is queried every 20 seconds; the window of time returned spans 1 minute and is grouped into 10 second buckets.
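To make the relationship between Period, Every and Offset concrete, the following Go sketch computes the time range covered by each query. It assumes the range ends at the trigger time minus Offset and spans Period; the exact boundary handling in Kapacitor may differ. `queryWindow` and `exampleNow` are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// exampleNow is an arbitrary trigger time used by the demo below.
var exampleNow = time.Date(2015, time.November, 1, 12, 0, 0, 0, time.UTC)

// queryWindow returns the time range queried when a batch is triggered at now.
// Assumption: the range ends at now-offset and spans period.
func queryWindow(now time.Time, period, offset time.Duration) (start, stop time.Time) {
	stop = now.Add(-offset)
	start = stop.Add(-period)
	return start, stop
}

func main() {
	period, every := time.Minute, 20*time.Second
	// Three consecutive triggers, 20s apart, each covering the last minute.
	for i := 0; i < 3; i++ {
		trigger := exampleNow.Add(time.Duration(i) * every)
		start, stop := queryWindow(trigger, period, 0)
		fmt.Printf("query at %s covers %s to %s\n",
			trigger.Format("15:04:05"), start.Format("15:04:05"), stop.Format("15:04:05"))
	}
}
```

Since Every (20s) is shorter than Period (1m), consecutive windows overlap by 40 seconds in this configuration.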
func (*BatchNode) Alert ¶
func (n *BatchNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*BatchNode) Derivative ¶
func (n *BatchNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*BatchNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided; they are evaluated in the order given, and the results of earlier expressions are available to later expressions.
func (*BatchNode) GroupBy ¶
Group the data by a set of dimensions. Can specify one time dimension.
This property adds a `GROUP BY` clause to the query so all the normal behaviors when querying InfluxDB with a `GROUP BY` apply. More details: https://influxdb.com/docs/v0.9/query_language/data_exploration.html#the-group-by-clause
Example:
batch
    .groupBy(time(10s), 'tag1', 'tag2')
tick:property
func (*BatchNode) HttpOut ¶
func (n *BatchNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
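The caching behavior and the endpoint path can be sketched with Go's standard `net/http` package. This is an illustration under assumptions, not the node's implementation: `latestCache`, `newTaskMux` and `get` are hypothetical names, and the task name `cpu_alert` is made up for the example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"path"
	"sync"
)

// latestCache keeps only the most recent payload it has received.
type latestCache struct {
	mu   sync.RWMutex
	data []byte
}

// Update replaces the cached payload.
func (c *latestCache) Update(data []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data = data
}

// ServeHTTP returns the cached payload to the client.
func (c *latestCache) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	w.Write(c.data)
}

// newTaskMux registers the cache under taskEndpoint/endpoint and returns the
// mux together with the full route.
func newTaskMux(taskEndpoint, endpoint string, c *latestCache) (*http.ServeMux, string) {
	route := path.Join(taskEndpoint, endpoint)
	mux := http.NewServeMux()
	mux.Handle(route, c)
	return mux, route
}

// get performs an in-process GET against mux and returns the response body.
func get(mux *http.ServeMux, url string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, url, nil))
	return rec.Body.String()
}

func main() {
	cache := &latestCache{}
	mux, route := newTaskMux("/api/v1/task/cpu_alert", "top10", cache)

	cache.Update([]byte(`{"value":1}`))
	cache.Update([]byte(`{"value":2}`)) // replaces the previous payload
	fmt.Println(route, get(mux, route))
}
```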
func (*BatchNode) InfluxDBOut ¶
func (n *BatchNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*BatchNode) MapReduce ¶
func (n *BatchNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
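For stream edges, the grouping of points by identical timestamp can be sketched as follows. This is a simplified model, assuming points arrive in time order; `point`, `batchByTime` and `sum` are hypothetical names, and `sum` merely stands in for a real map-reduce function.

```go
package main

import (
	"fmt"
	"time"
)

// point is a minimal stand-in for a data point on a stream edge.
type point struct {
	Time  time.Time
	Value float64
}

// batchByTime groups consecutive points that share the exact same timestamp.
func batchByTime(points []point) [][]point {
	var batches [][]point
	for _, p := range points {
		n := len(batches)
		if n > 0 && batches[n-1][0].Time.Equal(p.Time) {
			batches[n-1] = append(batches[n-1], p)
			continue
		}
		batches = append(batches, []point{p})
	}
	return batches
}

// sum is a stand-in for a map-reduce function applied to one batch.
func sum(batch []point) float64 {
	total := 0.0
	for _, p := range batch {
		total += p.Value
	}
	return total
}

func main() {
	t0 := time.Unix(0, 0)
	t1 := t0.Add(time.Second)
	points := []point{{t0, 1}, {t0, 2}, {t1, 5}}
	for _, b := range batchByTime(points) {
		fmt.Println(len(b), sum(b))
	}
}
```

The two points at `t0` form one batch and the point at `t1` forms another, so the stand-in reducer runs twice.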
func (*BatchNode) Sample ¶
func (n *BatchNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*BatchNode) Window ¶
func (n *BatchNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type DerivativeNode ¶
type DerivativeNode struct {
	// The field to use when calculating the derivative
	// tick:ignore
	Field string

	// The new name of the derivative field.
	// Default is the name of the field used
	// when calculating the derivative.
	As string

	// The time unit of the resulting derivative value.
	// Default: 1s
	Unit time.Duration

	// Whether negative values are acceptable.
	// tick:ignore
	NonNegativeFlag bool
	// contains filtered or unexported fields
}
Compute the derivative of a stream or batch. The derivative is computed on a single field and behaves similarly to the InfluxQL derivative function. Derivative is not a MapReduce function and as a result is not part of the normal influxql functions.
Example:
stream
    .from().measurement('net_rx_packets')
    .derivative('value')
        .unit(1s) // default
        .nonNegative()
    ...
Computes the derivative via:
(current - previous) / (time_difference / unit)
For batch edges the derivative is computed for each point in the batch and because of boundary conditions the number of points is reduced by one.
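The formula and the batch boundary condition can be shown with a small Go sketch. It follows the formula above directly; the `point` type and `derivative` function are hypothetical names, and skipping pairs without a forward time difference is an assumption made to avoid division by zero.

```go
package main

import (
	"fmt"
	"time"
)

// point is a minimal stand-in for a data point with a single field.
type point struct {
	Time  time.Time
	Value float64
}

// derivative computes (current - previous) / (time_difference / unit) for
// each pair of adjacent points, so n input points yield at most n-1 results.
// When nonNegative is true, negative results are skipped.
func derivative(points []point, unit time.Duration, nonNegative bool) []point {
	var out []point
	for i := 1; i < len(points); i++ {
		prev, cur := points[i-1], points[i]
		elapsed := cur.Time.Sub(prev.Time)
		if elapsed <= 0 {
			continue // assumption: skip pairs without a forward time difference
		}
		d := (cur.Value - prev.Value) / (float64(elapsed) / float64(unit))
		if nonNegative && d < 0 {
			continue
		}
		out = append(out, point{Time: cur.Time, Value: d})
	}
	return out
}

func main() {
	t0 := time.Unix(0, 0)
	batch := []point{
		{t0, 100},
		{t0.Add(10 * time.Second), 150},
		{t0.Add(20 * time.Second), 120},
	}
	// Three points in, two derivative values out.
	for _, p := range derivative(batch, time.Second, false) {
		fmt.Println(p.Value)
	}
}
```

With a unit of 1s, the two results are 5 and -3 per second; with `nonNegative` set, only the first would be emitted.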
func (*DerivativeNode) Alert ¶
func (n *DerivativeNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*DerivativeNode) Derivative ¶
func (n *DerivativeNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*DerivativeNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided; they are evaluated in the order given, and the results of earlier expressions are available to later expressions.
func (*DerivativeNode) GroupBy ¶
func (n *DerivativeNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*DerivativeNode) HttpOut ¶
func (n *DerivativeNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*DerivativeNode) InfluxDBOut ¶
func (n *DerivativeNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*DerivativeNode) MapReduce ¶
func (n *DerivativeNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*DerivativeNode) NonNegative ¶
func (d *DerivativeNode) NonNegative() *DerivativeNode
If called the derivative will skip negative results. tick:property
func (*DerivativeNode) Sample ¶
func (n *DerivativeNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*DerivativeNode) Where ¶
Create a new node that filters the data stream by a given expression.
func (*DerivativeNode) Window ¶
func (n *DerivativeNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type EdgeType ¶
type EdgeType int
The type of data that travels along an edge connecting two nodes in a Pipeline.
type EvalNode ¶
type EvalNode struct {
	// The name of the field that results from applying the expression.
	// tick:ignore
	AsList []string

	// tick:ignore
	Expressions []tick.Node

	// tick:ignore
	KeepFlag bool

	// List of fields to keep
	// if empty and KeepFlag is true
	// keep all fields.
	// tick:ignore
	KeepList []string
	// contains filtered or unexported fields
}
Evaluates expressions on each data point it receives. A list of expressions may be provided and will be evaluated in the order they are given and results of previous expressions are made available to later expressions. See the property EvalNode.As for details on how to reference the results.
Example:
stream
    .eval(lambda: "error_count" / "total_count")
        .as('error_percent')
The above example will add a new field `error_percent` to each data point with the result of `error_count / total_count` where `error_count` and `total_count` are existing fields on the data point.
func (*EvalNode) Alert ¶
func (n *EvalNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*EvalNode) As ¶
List of names for each expression. The expressions are evaluated in order and the result of a previous expression will be available in later expressions via the name provided.
Example:
    stream
        .eval(lambda: "value" * "value", lambda: 1.0 / "value2")
            .as('value2', 'inv_value2')
The above example calculates two fields from the value and names them `value2` and `inv_value2` respectively.
tick:property
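The ordering rule above can be sketched in plain Go. This is a minimal model, assuming expressions are ordinary functions over a map of fields; the `expr` type and `eval` function are hypothetical and are not this package's API. It also assumes `exprs` and `names` have the same length.

```go
package main

import "fmt"

// expr is a hypothetical expression: it reads fields and returns a value.
type expr func(fields map[string]float64) float64

// eval applies exprs in order and stores each result under the matching
// name, so that later expressions can read the results of earlier ones.
func eval(fields map[string]float64, exprs []expr, names []string) map[string]float64 {
	out := make(map[string]float64, len(fields)+len(exprs))
	for k, v := range fields {
		out[k] = v
	}
	for i, e := range exprs {
		out[names[i]] = e(out)
	}
	return out
}

func main() {
	exprs := []expr{
		func(f map[string]float64) float64 { return f["value"] * f["value"] },
		func(f map[string]float64) float64 { return 1.0 / f["value2"] },
	}
	out := eval(map[string]float64{"value": 4}, exprs, []string{"value2", "inv_value2"})
	fmt.Println(out["value2"], out["inv_value2"]) // 16 0.0625
}
```

The second expression only works because `value2` has already been written to the field map by the time it runs.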
func (*EvalNode) Derivative ¶
func (n *EvalNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*EvalNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*EvalNode) GroupBy ¶
func (n *EvalNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*EvalNode) HttpOut ¶
func (n *EvalNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*EvalNode) InfluxDBOut ¶
func (n *EvalNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*EvalNode) Keep ¶
If called the existing fields will be preserved in addition to the new fields being set. If not called then only new fields are preserved.
Optionally intermediate values can be discarded by passing a list of field names. Only fields in the list will be kept. If no list is given then all fields, new and old, are kept.
Example:
    stream
        .eval(lambda: "value" * "value", lambda: 1.0 / "value2")
            .as('value2', 'inv_value2')
            .keep('value', 'inv_value2')
In the above example the original field `value` is preserved. In addition the new field `value2` is calculated and used in evaluating `inv_value2` but is discarded before the point is sent on to children nodes. The resulting point has only two fields `value` and `inv_value2`. tick:property
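The keep-list behavior described above amounts to a filter over the point's fields. The sketch below is an assumption-laden model, not the package's implementation: `keep` is a hypothetical helper, and fields are modeled as a simple map.

```go
package main

import "fmt"

// keep returns only the fields named in list.
// An empty list keeps every field, matching keep() called with no arguments.
func keep(fields map[string]float64, list ...string) map[string]float64 {
	if len(list) == 0 {
		return fields
	}
	out := make(map[string]float64, len(list))
	for _, name := range list {
		if v, ok := fields[name]; ok {
			out[name] = v
		}
	}
	return out
}

func main() {
	// Fields after evaluation: the original field plus both new ones.
	fields := map[string]float64{"value": 4, "value2": 16, "inv_value2": 0.0625}
	// The intermediate field value2 is discarded.
	fmt.Println(keep(fields, "value", "inv_value2"))
}
```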
func (*EvalNode) MapReduce ¶
func (n *EvalNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
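For the stream case, the grouping of points that share an exact timestamp can be pictured as follows. This is only a sketch: the `point` type and `batchByTime` function are hypothetical, and the code assumes points arrive in time order.

```go
package main

import "fmt"

// point is a hypothetical data point with an integer timestamp.
type point struct {
	t int64
	v float64
}

// batchByTime combines consecutive points with the exact same time
// into one batch. It assumes pts is ordered by time.
func batchByTime(pts []point) [][]point {
	var batches [][]point
	for _, p := range pts {
		n := len(batches)
		if n > 0 && batches[n-1][0].t == p.t {
			batches[n-1] = append(batches[n-1], p)
			continue
		}
		batches = append(batches, []point{p})
	}
	return batches
}

func main() {
	pts := []point{{1, 2}, {1, 3}, {2, 5}, {3, 1}, {3, 1}, {3, 1}}
	for _, b := range batchByTime(pts) {
		sum := 0.0
		for _, p := range b {
			sum += p.v
		}
		fmt.Printf("time=%d points=%d sum=%v\n", b[0].t, len(b), sum)
	}
}
```

Each resulting batch is what a mapper would receive; the sum in `main` stands in for a reduce step.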
func (*EvalNode) Sample ¶
func (n *EvalNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*EvalNode) Window ¶
func (n *EvalNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type GroupByNode ¶
type GroupByNode struct {
    // The dimensions by which to group the data.
    // tick:ignore
    Dimensions []interface{}
    // contains filtered or unexported fields
}
A GroupByNode will group the incoming data. Each group is then processed independently for the rest of the pipeline. Only tags that are dimensions in the grouping will be preserved; all other tags are dropped.
Example:
    stream
        .groupBy('service', 'datacenter')
        ...
The above example groups the data along two dimensions `service` and `datacenter`. Groups are dynamically created as new data arrives and each group is processed independently.
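One way to picture grouping is a key built from the dimension tags, with every other tag dropped. The sketch below is illustrative only; `groupKey` and the key format are assumptions made for this example and not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// groupKey builds a group identifier from the dimension tags and returns
// the tags that survive grouping. Tags outside dims are dropped.
func groupKey(tags map[string]string, dims []string) (string, map[string]string) {
	kept := make(map[string]string, len(dims))
	parts := make([]string, 0, len(dims))
	for _, d := range dims {
		kept[d] = tags[d]
		parts = append(parts, d+"="+tags[d])
	}
	return strings.Join(parts, ","), kept
}

func main() {
	tags := map[string]string{"service": "api", "datacenter": "us-east", "host": "h001"}
	key, kept := groupKey(tags, []string{"service", "datacenter"})
	fmt.Println(key)       // service=api,datacenter=us-east
	fmt.Println(len(kept)) // 2
}
```

Points that produce the same key belong to the same group, and a new key seen for the first time creates a new group.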
func (*GroupByNode) Alert ¶
func (n *GroupByNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*GroupByNode) Derivative ¶
func (n *GroupByNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*GroupByNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*GroupByNode) GroupBy ¶
func (n *GroupByNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*GroupByNode) HttpOut ¶
func (n *GroupByNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*GroupByNode) InfluxDBOut ¶
func (n *GroupByNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*GroupByNode) MapReduce ¶
func (n *GroupByNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*GroupByNode) Sample ¶
func (n *GroupByNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*GroupByNode) Window ¶
func (n *GroupByNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type HTTPOutNode ¶
type HTTPOutNode struct {
    // The relative path where the cached data is exposed
    // tick:ignore
    Endpoint string
    // contains filtered or unexported fields
}
An HTTPOutNode caches the most recent data for each group it has received.
The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
Example:
    stream
        .window()
            .period(10s)
            .every(5s)
        .mapReduce(influxql.top('value', 10))
        //Publish the top 10 results over the last 10s updated every 5s.
        .httpOut('top10')
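The relationship between the task endpoint and the relative endpoint is a plain path join. A minimal sketch, assuming the "/api/v1/task" prefix quoted above; `cachedDataPath` and the task name `cpu_top` are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"path"
)

// taskBase is the task endpoint prefix quoted in the description above.
const taskBase = "/api/v1/task"

// cachedDataPath joins the task endpoint with the relative httpOut endpoint.
func cachedDataPath(taskName, endpoint string) string {
	return path.Join(taskBase, taskName, endpoint)
}

func main() {
	fmt.Println(cachedDataPath("cpu_top", "top10")) // /api/v1/task/cpu_top/top10
}
```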
type InfluxDBOutNode ¶
type InfluxDBOutNode struct {
    // The name of the database.
    Database string
    // The name of the retention policy.
    RetentionPolicy string
    // The name of the measurement.
    Measurement string
    // The write consistency to use when writing the data.
    WriteConsistency string
    // The precision to use when writing the data.
    Precision string
    // Static set of tags to add to all data points before writing them.
    //tick:ignore
    Tags map[string]string
    // contains filtered or unexported fields
}
Writes the data to InfluxDB as it is received.
Example:
    stream
        .eval(lambda: "errors" / "total")
            .as('error_percent')
        // Write the transformed data to InfluxDB
        .influxDBOut()
            .database('mydb')
            .retentionPolicy('myrp')
            .measurement('errors')
            .tag('kapacitor', 'true')
            .tag('version', '0.1')
func (*InfluxDBOutNode) Tag ¶
func (i *InfluxDBOutNode) Tag(key, value string) *InfluxDBOutNode
Add a static tag to all data points. Tag can be called more than once.
tick:property
type JoinNode ¶
type JoinNode struct {
    // The alias names of the two parents.
    // Note:
    //       Names[1] corresponds to the left parent
    //       Names[0] corresponds to the right parent
    // tick:ignore
    Names []string
    // The name of this new joined data stream.
    // If empty the name of the left parent is used.
    StreamName string
    // The maximum duration of time that two incoming points
    // can be apart and still be considered to be equal in time.
    // The joined data point's time will be rounded to the nearest
    // multiple of the tolerance duration.
    Tolerance time.Duration
    // Fill the data.
    // The fill option implies the type of join: inner or full outer
    // Options are:
    //
    // - none - (default) skip rows where a point is missing, inner join.
    // - null - fill missing points with null, full outer join.
    // - Any numerical value - fill fields with given value, full outer join.
    Fill interface{}
    // contains filtered or unexported fields
}
Joins the data from any number of nodes. As each data point is received from a parent node it is paired with the next data points from the other parent nodes with a matching timestamp. Each parent node contributes at most one point to each joined point. A tolerance can be supplied to join points that do not have perfectly aligned timestamps. Any points that fall within the tolerance are joined on the timestamp. If multiple points fall within the same tolerance window then they are joined in the order they arrive.
Aliases are used to prefix all fields from the respective nodes.
The join can be an inner or outer join, see the JoinNode.Fill property.
Example:
    var errors = stream
        .from().measurement('errors')
    var requests = stream
        .from().measurement('requests')
    // Join the errors and requests streams
    errors.join(requests)
            // Provide prefix names for the fields of the data points.
            .as('errors', 'requests')
            // points that are within 1 second are considered the same time.
            .tolerance(1s)
            // fill missing values with 0, implies outer join.
            .fill(0.0)
            // name the resulting stream
            .streamName('error_rate')
        // Both the "value" fields from each parent have been prefixed
        // with the respective names 'errors' and 'requests'.
        .eval(lambda: "errors.value" / "requests.value")
            .as('rate')
        ...
In the above example the `errors` and `requests` streams are joined and then transformed to calculate a combined field.
func (*JoinNode) Alert ¶
func (n *JoinNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*JoinNode) As ¶
Prefix names for all fields from the respective nodes. Each field from the parent nodes will be prefixed with the provided name and a '.'. See the example above.
The names cannot have a dot '.' character.
tick:property
func (*JoinNode) Derivative ¶
func (n *JoinNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*JoinNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*JoinNode) GroupBy ¶
func (n *JoinNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*JoinNode) HttpOut ¶
func (n *JoinNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*JoinNode) InfluxDBOut ¶
func (n *JoinNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*JoinNode) MapReduce ¶
func (n *JoinNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*JoinNode) Sample ¶
func (n *JoinNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*JoinNode) Window ¶
func (n *JoinNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type MapNode ¶
type MapNode struct {
    // The map function
    // tick:ignore
    Map interface{}
    // contains filtered or unexported fields
}
Performs a map operation on the data stream. In the map-reduce framework it is assumed that several different partitions of the data can be 'mapped' in parallel while only one 'reduce' operation will process all of the data stream.
Example:
    stream
        .window()
            .period(10s)
            .every(10s)
        // Sum the values for each 10s window of data.
        .mapReduce(influxql.sum('value'))
        ...
func (*MapNode) Alert ¶
func (n *MapNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*MapNode) Derivative ¶
func (n *MapNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*MapNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*MapNode) GroupBy ¶
func (n *MapNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*MapNode) HttpOut ¶
func (n *MapNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*MapNode) InfluxDBOut ¶
func (n *MapNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*MapNode) MapReduce ¶
func (n *MapNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*MapNode) Sample ¶
func (n *MapNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*MapNode) Window ¶
func (n *MapNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type MapReduceInfo ¶
type MapReduceInfo struct {
    Map    interface{}
    Reduce interface{}
    Edge   EdgeType
}
tick:ignore
type Node ¶
type Node interface {
    // List of parents of this node.
    Parents() []Node
    // List of children of this node.
    Children() []Node
    // Short description of the node does not need to be unique
    Desc() string
    // Friendly readable unique name of the node
    Name() string
    SetName(string)
    // Unique id for the node
    ID() ID
    // The type of input the node wants.
    Wants() EdgeType
    // The type of output the node provides.
    Provides() EdgeType
    // contains filtered or unexported methods
}
Generic node in a pipeline
type Pipeline ¶
type Pipeline struct {
    Source Node
    // contains filtered or unexported fields
}
A complete data processing pipeline. Starts with a single source. tick:ignore
func CreatePipeline ¶
Create a pipeline from a given script. tick:ignore
type ReduceNode ¶
type ReduceNode struct {
    // The reduce function
    // tick:ignore
    Reduce interface{}
    // Whether to use the max time or the
    // time of the selected point
    // tick:ignore
    PointTimes bool
    // The name of the field, defaults to the name of
    // MR function used (i.e. influxql.mean -> 'mean')
    As string
    // contains filtered or unexported fields
}
Performs a reduce operation on the data stream. In the map-reduce framework it is assumed that several different partitions of the data can be 'mapped' in parallel while only one 'reduce' operation will process all of the data stream.
Example:
    stream
        .window()
            .period(10s)
            .every(10s)
        // Sum the values for each 10s window of data.
        .mapReduce(influxql.sum('value'))
        ...
func (*ReduceNode) Alert ¶
func (n *ReduceNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*ReduceNode) Derivative ¶
func (n *ReduceNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*ReduceNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*ReduceNode) GroupBy ¶
func (n *ReduceNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*ReduceNode) HttpOut ¶
func (n *ReduceNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*ReduceNode) InfluxDBOut ¶
func (n *ReduceNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*ReduceNode) MapReduce ¶
func (n *ReduceNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*ReduceNode) Sample ¶
func (n *ReduceNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*ReduceNode) UsePointTimes ¶
func (r *ReduceNode) UsePointTimes() *ReduceNode
Use the time of the selected point instead of the time of the batch.
Only applies to selector MR functions like first, last, top, bottom, etc. Aggregation functions always use the batch time. tick:property
func (*ReduceNode) Window ¶
func (n *ReduceNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type SampleNode ¶
type SampleNode struct {
    // Keep every Count point or batch
    // tick:ignore
    Count int64
    // Keep one point or batch every Duration
    // tick:ignore
    Duration time.Duration
    // contains filtered or unexported fields
}
Sample points or batches. One point will be emitted every count or duration specified.
Example:
    stream
        .sample(3)
Keep every third data point or batch.
Example:
    stream
        .sample(10s)
Keep only samples that land on the 10s boundary. See StreamNode.Truncate, BatchNode.GroupBy time or WindowNode.Align for ensuring data is aligned with a boundary.
func (*SampleNode) Alert ¶
func (n *SampleNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*SampleNode) Derivative ¶
func (n *SampleNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*SampleNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*SampleNode) GroupBy ¶
func (n *SampleNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*SampleNode) HttpOut ¶
func (n *SampleNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*SampleNode) InfluxDBOut ¶
func (n *SampleNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*SampleNode) MapReduce ¶
func (n *SampleNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*SampleNode) Sample ¶
func (n *SampleNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*SampleNode) Window ¶
func (n *SampleNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type SourceBatchNode ¶
type SourceBatchNode struct {
// contains filtered or unexported fields
}
A node that handles creating several child BatchNodes. Each call to `query` creates a child batch node that can be configured further. See BatchNode. The `batch` variable in batch tasks is an instance of a SourceBatchNode.
Example:
    var errors = batch
        .query('SELECT value from errors')
        ...
    var views = batch
        .query('SELECT value from views')
        ...
func (*SourceBatchNode) Query ¶
func (b *SourceBatchNode) Query(q string) *BatchNode
The query to execute. Must not contain a time condition in the `WHERE` clause or contain a `GROUP BY` clause. The time conditions are added dynamically according to the period, offset and schedule. The `GROUP BY` clause is added dynamically according to the dimensions passed to the `groupBy` method.
type StreamNode ¶
type StreamNode struct {
    // An expression to filter the data stream.
    // tick:ignore
    Expression tick.Node
    // The database name.
    // If empty any database will be used.
    Database string
    // The retention policy name
    // If empty any retention policy will be used.
    RetentionPolicy string
    // The measurement name
    // If empty any measurement will be used.
    Measurement string
    // Optional duration for truncating timestamps.
    // Helpful to ensure data points land on specific boundaries
    // Example:
    //    stream
    //       .from().measurement('mydata')
    //       .truncate(1s)
    //
    // All incoming data will be truncated to 1 second resolution.
    Truncate time.Duration
    // contains filtered or unexported fields
}
A StreamNode represents the source of data being streamed to Kapacitor via any of its inputs. The stream node allows you to select which portion of the stream you want to process. The `stream` variable in stream tasks is an instance of a StreamNode.
Example:
    stream
        .from()
            .database('mydb')
            .retentionPolicy('myrp')
            .measurement('mymeasurement')
            .where(lambda: "host" =~ /logger\d+/)
        .window()
        ...
The above example selects only data points from the database `mydb` and retention policy `myrp` and measurement `mymeasurement` where the tag `host` matches the regex `logger\d+`.
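To see which host values the pattern in the example accepts, the same regex can be tried with Go's `regexp` package. This sketch assumes the `=~` operator matches anywhere in the tag value, as Go's `MatchString` does; that is an assumption about the expression language, and `matchesHost` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// hostPattern is the regex from the example above.
var hostPattern = regexp.MustCompile(`logger\d+`)

// matchesHost reports whether the point's "host" tag matches the pattern.
func matchesHost(tags map[string]string) bool {
	return hostPattern.MatchString(tags["host"])
}

func main() {
	for _, host := range []string{"logger01", "web01", "logger"} {
		fmt.Println(host, matchesHost(map[string]string{"host": host}))
	}
}
```

Here `logger01` matches, while `web01` and `logger` do not, because the pattern requires at least one digit after `logger`.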
func (*StreamNode) Alert ¶
func (n *StreamNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*StreamNode) Derivative ¶
func (n *StreamNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*StreamNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*StreamNode) From ¶
func (s *StreamNode) From() *StreamNode
Creates a new stream node that can be further filtered using the Database, RetentionPolicy, Measurement and Where properties. From can be called multiple times to create multiple independent forks of the data stream.
Example:
    // Select the 'cpu' measurement from just the database 'mydb'
    // and retention policy 'myrp'.
    var cpu = stream.from()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('cpu')
    // Select the 'load' measurement from any database and retention policy.
    var load = stream.from()
        .measurement('load')
    // Join cpu and load streams and do further processing.
    cpu.join(load)
            .as('cpu', 'load')
        ...
func (*StreamNode) GroupBy ¶
func (n *StreamNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*StreamNode) HttpOut ¶
func (n *StreamNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*StreamNode) InfluxDBOut ¶
func (n *StreamNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*StreamNode) MapReduce ¶
func (n *StreamNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*StreamNode) Sample ¶
func (n *StreamNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*StreamNode) Where ¶
func (s *StreamNode) Where(expression tick.Node) *StreamNode
Filter the current stream using the given expression. This expression is a Kapacitor expression. Kapacitor expressions are a superset of InfluxQL WHERE expressions. See the `Expression` docs for more information.
If empty then all data points are considered to match. tick:property
func (*StreamNode) Window ¶
func (n *StreamNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type UnionNode ¶
type UnionNode struct {
    // The new name of the stream.
    // If empty the name of the left node
    // (i.e. `leftNode.union(otherNode1, otherNode2)`) is used.
    Rename string
    // contains filtered or unexported fields
}
Takes the union of all of its parents. The union is a simple pass-through: each data point received from each parent is passed on to the children nodes without modification.
Example:
    var logins = stream.from().measurement('logins')
    var logouts = stream.from().measurement('logouts')
    var frontpage = stream.from().measurement('frontpage')
    // Union all user actions into a single stream
    logins.union(logouts, frontpage)
            .rename('user_actions')
        ...
func (*UnionNode) Alert ¶
func (n *UnionNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*UnionNode) Derivative ¶
func (n *UnionNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*UnionNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*UnionNode) GroupBy ¶
func (n *UnionNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*UnionNode) HttpOut ¶
func (n *UnionNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*UnionNode) InfluxDBOut ¶
func (n *UnionNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*UnionNode) MapReduce ¶
func (n *UnionNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*UnionNode) Sample ¶
func (n *UnionNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*UnionNode) Window ¶
func (n *UnionNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type WhereNode ¶
type WhereNode struct {
    // The expression predicate.
    // tick:ignore
    Expression tick.Node
    // contains filtered or unexported fields
}
The WhereNode filters the data stream by a given expression.
Example:
    var sums = stream
        .groupBy('service', 'host')
        .mapReduce(influxql.sum('value'))
    //Watch particular host for issues.
    sums
        .where(lambda: "host" == 'h001.example.com')
        .alert()
            .crit(lambda: TRUE)
            .email('user@example.com')
func (*WhereNode) Alert ¶
func (n *WhereNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*WhereNode) Derivative ¶
func (n *WhereNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*WhereNode) Eval ¶
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*WhereNode) GroupBy ¶
func (n *WhereNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*WhereNode) HttpOut ¶
func (n *WhereNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*WhereNode) InfluxDBOut ¶
func (n *WhereNode) InfluxDBOut() *InfluxDBOutNode
Create an InfluxDB output node that will store the incoming data into InfluxDB.
func (*WhereNode) MapReduce ¶
func (n *WhereNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*WhereNode) Sample ¶
func (n *WhereNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*WhereNode) Window ¶
func (n *WhereNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type WindowNode ¶
type WindowNode struct {
    // The period, or length in time, of the window.
    Period time.Duration
    // How often the current window is emitted into the pipeline.
    Every time.Duration
    // Whether to align the window edges with the zero time
    // tick:ignore
    AlignFlag bool
    // contains filtered or unexported fields
}
Windows data over time. A window has a length defined by `period` and a frequency at which it emits the window to the pipeline.
Example:
    stream
        .window()
            .period(10m)
            .every(5m)
        .httpOut('recent')
The above windowing example emits a window to the pipeline every `5 minutes`, and the window contains the last `10 minutes` worth of data. As a result, each time the window is emitted, it contains half new data and half old data.
NOTE: Time for a window (or any node) is implemented by inspecting the times on the incoming data points. As a result, if the incoming data stream stops, no more windows are emitted, because time is no longer advancing for the window node.
func (*WindowNode) Alert ¶
func (n *WindowNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*WindowNode) Align ¶
func (w *WindowNode) Align() *WindowNode
Whether to align the window edges with the zero time. If not aligned, the window starts and ends relative to the first data point it receives. tick:property
func (*WindowNode) Derivative ¶
func (n *WindowNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*WindowNode) Eval ¶
func (n *WindowNode) Eval(expressions ...tick.Node) *EvalNode
Create an eval node that will apply the given transformation function to each data point.
A list of expressions may be provided; they are evaluated in the order given,
and the results of earlier expressions are available to later expressions.
func (*WindowNode) GroupBy ¶
func (n *WindowNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Pass a literal * to group by all dimensions. Example:
.groupBy(*)
func (*WindowNode) HttpOut ¶
func (n *WindowNode) HttpOut(endpoint string) *HTTPOutNode
Create an HTTP output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example, if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*WindowNode) InfluxDBOut ¶
func (n *WindowNode) InfluxDBOut() *InfluxDBOutNode
Create an InfluxDB output node that will store the incoming data into InfluxDB.
func (*WindowNode) MapReduce ¶
func (n *WindowNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch, each batch is passed to the mapper independently. In the case of a stream, all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*WindowNode) Sample ¶
func (n *WindowNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*WindowNode) Window ¶
func (n *WindowNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.