Documentation ¶
Overview ¶
Provides an API for constructing data processing pipelines.
The nodes defined in this package only describe how nodes can be linked together; they do not contain the actual implementation of the transformation functions.
Index ¶
- type AlertNode
- func (a *AlertNode) Alerta() *AlertaHandler
- func (n *AlertNode) Children() []Node
- func (n *AlertNode) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n *AlertNode) Desc() string
- func (a *AlertNode) Email(to ...string) *EmailHandler
- func (a *AlertNode) Exec(executable string, args ...string) *ExecHandler
- func (a *AlertNode) Flapping(low, high float64) *AlertNode
- func (a *AlertNode) HipChat() *HipChatHandler
- func (n *AlertNode) ID() ID
- func (a *AlertNode) Log(filepath string) *LogHandler
- func (n *AlertNode) Name() string
- func (a *AlertNode) OpsGenie() *OpsGenieHandler
- func (a *AlertNode) PagerDuty() *PagerDutyHandler
- func (n *AlertNode) Parents() []Node
- func (a *AlertNode) Post(url string) *PostHandler
- func (n *AlertNode) Provides() EdgeType
- func (n *AlertNode) SetName(name string)
- func (a *AlertNode) Slack() *SlackHandler
- func (a *AlertNode) StateChangesOnly() *AlertNode
- func (n *AlertNode) Stats(interval time.Duration) *StatsNode
- func (a *AlertNode) VictorOps() *VictorOpsHandler
- func (n *AlertNode) Wants() EdgeType
- type AlertaHandler
- func (n AlertaHandler) Children() []Node
- func (n AlertaHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n AlertaHandler) Desc() string
- func (n AlertaHandler) ID() ID
- func (n AlertaHandler) Name() string
- func (n AlertaHandler) Parents() []Node
- func (n AlertaHandler) Provides() EdgeType
- func (n AlertaHandler) SetName(name string)
- func (n AlertaHandler) Stats(interval time.Duration) *StatsNode
- func (n AlertaHandler) Wants() EdgeType
- type BatchNode
- func (n *BatchNode) Alert() *AlertNode
- func (n *BatchNode) Derivative(field string) *DerivativeNode
- func (n *BatchNode) Eval(expressions ...tick.Node) *EvalNode
- func (b *BatchNode) GroupBy(d ...interface{}) *BatchNode
- func (n *BatchNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *BatchNode) InfluxDBOut() *InfluxDBOutNode
- func (n *BatchNode) Join(others ...Node) *JoinNode
- func (n *BatchNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *BatchNode) Sample(rate interface{}) *SampleNode
- func (n *BatchNode) Union(node ...Node) *UnionNode
- func (n *BatchNode) Where(expression tick.Node) *WhereNode
- func (n *BatchNode) Window() *WindowNode
- type DeadmanService
- type DerivativeNode
- func (n *DerivativeNode) Alert() *AlertNode
- func (n *DerivativeNode) Derivative(field string) *DerivativeNode
- func (n *DerivativeNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *DerivativeNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *DerivativeNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *DerivativeNode) InfluxDBOut() *InfluxDBOutNode
- func (n *DerivativeNode) Join(others ...Node) *JoinNode
- func (n *DerivativeNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (d *DerivativeNode) NonNegative() *DerivativeNode
- func (n *DerivativeNode) Sample(rate interface{}) *SampleNode
- func (n *DerivativeNode) Union(node ...Node) *UnionNode
- func (n *DerivativeNode) Where(expression tick.Node) *WhereNode
- func (n *DerivativeNode) Window() *WindowNode
- type EdgeType
- type EmailHandler
- func (n EmailHandler) Children() []Node
- func (n EmailHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n EmailHandler) Desc() string
- func (n EmailHandler) ID() ID
- func (n EmailHandler) Name() string
- func (n EmailHandler) Parents() []Node
- func (n EmailHandler) Provides() EdgeType
- func (n EmailHandler) SetName(name string)
- func (n EmailHandler) Stats(interval time.Duration) *StatsNode
- func (n EmailHandler) Wants() EdgeType
- type EvalNode
- func (n *EvalNode) Alert() *AlertNode
- func (e *EvalNode) As(names ...string) *EvalNode
- func (n *EvalNode) Derivative(field string) *DerivativeNode
- func (n *EvalNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *EvalNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *EvalNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *EvalNode) InfluxDBOut() *InfluxDBOutNode
- func (n *EvalNode) Join(others ...Node) *JoinNode
- func (e *EvalNode) Keep(fields ...string) *EvalNode
- func (n *EvalNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *EvalNode) Sample(rate interface{}) *SampleNode
- func (n *EvalNode) Union(node ...Node) *UnionNode
- func (n *EvalNode) Where(expression tick.Node) *WhereNode
- func (n *EvalNode) Window() *WindowNode
- type ExecHandler
- func (n ExecHandler) Children() []Node
- func (n ExecHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n ExecHandler) Desc() string
- func (n ExecHandler) ID() ID
- func (n ExecHandler) Name() string
- func (n ExecHandler) Parents() []Node
- func (n ExecHandler) Provides() EdgeType
- func (n ExecHandler) SetName(name string)
- func (n ExecHandler) Stats(interval time.Duration) *StatsNode
- func (n ExecHandler) Wants() EdgeType
- type GroupByNode
- func (n *GroupByNode) Alert() *AlertNode
- func (n *GroupByNode) Derivative(field string) *DerivativeNode
- func (n *GroupByNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *GroupByNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *GroupByNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *GroupByNode) InfluxDBOut() *InfluxDBOutNode
- func (n *GroupByNode) Join(others ...Node) *JoinNode
- func (n *GroupByNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *GroupByNode) Sample(rate interface{}) *SampleNode
- func (n *GroupByNode) Union(node ...Node) *UnionNode
- func (n *GroupByNode) Where(expression tick.Node) *WhereNode
- func (n *GroupByNode) Window() *WindowNode
- type HTTPOutNode
- func (n *HTTPOutNode) Children() []Node
- func (n *HTTPOutNode) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n *HTTPOutNode) Desc() string
- func (n *HTTPOutNode) ID() ID
- func (n *HTTPOutNode) Name() string
- func (n *HTTPOutNode) Parents() []Node
- func (n *HTTPOutNode) Provides() EdgeType
- func (n *HTTPOutNode) SetName(name string)
- func (n *HTTPOutNode) Stats(interval time.Duration) *StatsNode
- func (n *HTTPOutNode) Wants() EdgeType
- type HipChatHandler
- func (n HipChatHandler) Children() []Node
- func (n HipChatHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n HipChatHandler) Desc() string
- func (n HipChatHandler) ID() ID
- func (n HipChatHandler) Name() string
- func (n HipChatHandler) Parents() []Node
- func (n HipChatHandler) Provides() EdgeType
- func (n HipChatHandler) SetName(name string)
- func (n HipChatHandler) Stats(interval time.Duration) *StatsNode
- func (n HipChatHandler) Wants() EdgeType
- type ID
- type InfluxDBOutNode
- func (n *InfluxDBOutNode) Children() []Node
- func (n *InfluxDBOutNode) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n *InfluxDBOutNode) Desc() string
- func (n *InfluxDBOutNode) ID() ID
- func (n *InfluxDBOutNode) Name() string
- func (n *InfluxDBOutNode) Parents() []Node
- func (n *InfluxDBOutNode) Provides() EdgeType
- func (n *InfluxDBOutNode) SetName(name string)
- func (n *InfluxDBOutNode) Stats(interval time.Duration) *StatsNode
- func (i *InfluxDBOutNode) Tag(key, value string) *InfluxDBOutNode
- func (n *InfluxDBOutNode) Wants() EdgeType
- type JoinNode
- func (n *JoinNode) Alert() *AlertNode
- func (j *JoinNode) As(names ...string) *JoinNode
- func (n *JoinNode) Derivative(field string) *DerivativeNode
- func (n *JoinNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *JoinNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *JoinNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *JoinNode) InfluxDBOut() *InfluxDBOutNode
- func (n *JoinNode) Join(others ...Node) *JoinNode
- func (n *JoinNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *JoinNode) Sample(rate interface{}) *SampleNode
- func (n *JoinNode) Union(node ...Node) *UnionNode
- func (n *JoinNode) Where(expression tick.Node) *WhereNode
- func (n *JoinNode) Window() *WindowNode
- type LogHandler
- func (n LogHandler) Children() []Node
- func (n LogHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n LogHandler) Desc() string
- func (n LogHandler) ID() ID
- func (n LogHandler) Name() string
- func (n LogHandler) Parents() []Node
- func (n LogHandler) Provides() EdgeType
- func (n LogHandler) SetName(name string)
- func (n LogHandler) Stats(interval time.Duration) *StatsNode
- func (n LogHandler) Wants() EdgeType
- type MapNode
- func (n *MapNode) Alert() *AlertNode
- func (n *MapNode) Derivative(field string) *DerivativeNode
- func (n *MapNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *MapNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *MapNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *MapNode) InfluxDBOut() *InfluxDBOutNode
- func (n *MapNode) Join(others ...Node) *JoinNode
- func (n *MapNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *MapNode) Sample(rate interface{}) *SampleNode
- func (n *MapNode) Union(node ...Node) *UnionNode
- func (n *MapNode) Where(expression tick.Node) *WhereNode
- func (n *MapNode) Window() *WindowNode
- type MapReduceInfo
- type Node
- type OpsGenieHandler
- func (n OpsGenieHandler) Children() []Node
- func (n OpsGenieHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n OpsGenieHandler) Desc() string
- func (n OpsGenieHandler) ID() ID
- func (n OpsGenieHandler) Name() string
- func (n OpsGenieHandler) Parents() []Node
- func (n OpsGenieHandler) Provides() EdgeType
- func (og *OpsGenieHandler) Recipients(recipients ...string) *OpsGenieHandler
- func (n OpsGenieHandler) SetName(name string)
- func (n OpsGenieHandler) Stats(interval time.Duration) *StatsNode
- func (og *OpsGenieHandler) Teams(teams ...string) *OpsGenieHandler
- func (n OpsGenieHandler) Wants() EdgeType
- type PagerDutyHandler
- func (n PagerDutyHandler) Children() []Node
- func (n PagerDutyHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n PagerDutyHandler) Desc() string
- func (n PagerDutyHandler) ID() ID
- func (n PagerDutyHandler) Name() string
- func (n PagerDutyHandler) Parents() []Node
- func (n PagerDutyHandler) Provides() EdgeType
- func (n PagerDutyHandler) SetName(name string)
- func (n PagerDutyHandler) Stats(interval time.Duration) *StatsNode
- func (n PagerDutyHandler) Wants() EdgeType
- type Pipeline
- type PostHandler
- func (n PostHandler) Children() []Node
- func (n PostHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n PostHandler) Desc() string
- func (n PostHandler) ID() ID
- func (n PostHandler) Name() string
- func (n PostHandler) Parents() []Node
- func (n PostHandler) Provides() EdgeType
- func (n PostHandler) SetName(name string)
- func (n PostHandler) Stats(interval time.Duration) *StatsNode
- func (n PostHandler) Wants() EdgeType
- type ReduceNode
- func (n *ReduceNode) Alert() *AlertNode
- func (n *ReduceNode) Derivative(field string) *DerivativeNode
- func (n *ReduceNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *ReduceNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *ReduceNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *ReduceNode) InfluxDBOut() *InfluxDBOutNode
- func (n *ReduceNode) Join(others ...Node) *JoinNode
- func (n *ReduceNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *ReduceNode) Sample(rate interface{}) *SampleNode
- func (n *ReduceNode) Union(node ...Node) *UnionNode
- func (r *ReduceNode) UsePointTimes() *ReduceNode
- func (n *ReduceNode) Where(expression tick.Node) *WhereNode
- func (n *ReduceNode) Window() *WindowNode
- type SampleNode
- func (n *SampleNode) Alert() *AlertNode
- func (n *SampleNode) Derivative(field string) *DerivativeNode
- func (n *SampleNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *SampleNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *SampleNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *SampleNode) InfluxDBOut() *InfluxDBOutNode
- func (n *SampleNode) Join(others ...Node) *JoinNode
- func (n *SampleNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *SampleNode) Sample(rate interface{}) *SampleNode
- func (n *SampleNode) Union(node ...Node) *UnionNode
- func (n *SampleNode) Where(expression tick.Node) *WhereNode
- func (n *SampleNode) Window() *WindowNode
- type SlackHandler
- func (n SlackHandler) Children() []Node
- func (n SlackHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n SlackHandler) Desc() string
- func (n SlackHandler) ID() ID
- func (n SlackHandler) Name() string
- func (n SlackHandler) Parents() []Node
- func (n SlackHandler) Provides() EdgeType
- func (n SlackHandler) SetName(name string)
- func (n SlackHandler) Stats(interval time.Duration) *StatsNode
- func (n SlackHandler) Wants() EdgeType
- type SourceBatchNode
- func (n *SourceBatchNode) Children() []Node
- func (n *SourceBatchNode) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n *SourceBatchNode) Desc() string
- func (n *SourceBatchNode) ID() ID
- func (n *SourceBatchNode) Name() string
- func (n *SourceBatchNode) Parents() []Node
- func (n *SourceBatchNode) Provides() EdgeType
- func (b *SourceBatchNode) Query(q string) *BatchNode
- func (n *SourceBatchNode) SetName(name string)
- func (n *SourceBatchNode) Stats(interval time.Duration) *StatsNode
- func (n *SourceBatchNode) Wants() EdgeType
- type StatsNode
- func (n *StatsNode) Alert() *AlertNode
- func (n *StatsNode) Derivative(field string) *DerivativeNode
- func (n *StatsNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *StatsNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *StatsNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *StatsNode) InfluxDBOut() *InfluxDBOutNode
- func (n *StatsNode) Join(others ...Node) *JoinNode
- func (n *StatsNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *StatsNode) Sample(rate interface{}) *SampleNode
- func (n *StatsNode) Union(node ...Node) *UnionNode
- func (n *StatsNode) Where(expression tick.Node) *WhereNode
- func (n *StatsNode) Window() *WindowNode
- type StreamNode
- func (n *StreamNode) Alert() *AlertNode
- func (n *StreamNode) Derivative(field string) *DerivativeNode
- func (n *StreamNode) Eval(expressions ...tick.Node) *EvalNode
- func (s *StreamNode) From() *StreamNode
- func (s *StreamNode) GroupBy(tag ...interface{}) *StreamNode
- func (n *StreamNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *StreamNode) InfluxDBOut() *InfluxDBOutNode
- func (n *StreamNode) Join(others ...Node) *JoinNode
- func (n *StreamNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *StreamNode) Sample(rate interface{}) *SampleNode
- func (n *StreamNode) Union(node ...Node) *UnionNode
- func (s *StreamNode) Where(expression tick.Node) *StreamNode
- func (n *StreamNode) Window() *WindowNode
- type UDFNode
- func (n *UDFNode) Alert() *AlertNode
- func (u *UDFNode) CallMethod(name string, args ...interface{}) (interface{}, error)
- func (n *UDFNode) Derivative(field string) *DerivativeNode
- func (u *UDFNode) Desc() string
- func (n *UDFNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *UDFNode) GroupBy(tag ...interface{}) *GroupByNode
- func (u *UDFNode) HasMethod(name string) bool
- func (u *UDFNode) HasProperty(name string) bool
- func (n *UDFNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *UDFNode) InfluxDBOut() *InfluxDBOutNode
- func (n *UDFNode) Join(others ...Node) *JoinNode
- func (n *UDFNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (u *UDFNode) Property(name string) interface{}
- func (n *UDFNode) Sample(rate interface{}) *SampleNode
- func (u *UDFNode) SetProperty(name string, value interface{}) error
- func (n *UDFNode) Union(node ...Node) *UnionNode
- func (n *UDFNode) Where(expression tick.Node) *WhereNode
- func (n *UDFNode) Window() *WindowNode
- type UnionNode
- func (n *UnionNode) Alert() *AlertNode
- func (n *UnionNode) Derivative(field string) *DerivativeNode
- func (n *UnionNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *UnionNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *UnionNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *UnionNode) InfluxDBOut() *InfluxDBOutNode
- func (n *UnionNode) Join(others ...Node) *JoinNode
- func (n *UnionNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *UnionNode) Sample(rate interface{}) *SampleNode
- func (n *UnionNode) Union(node ...Node) *UnionNode
- func (n *UnionNode) Where(expression tick.Node) *WhereNode
- func (n *UnionNode) Window() *WindowNode
- type VictorOpsHandler
- func (n VictorOpsHandler) Children() []Node
- func (n VictorOpsHandler) Deadman(threshold float64, interval time.Duration) *AlertNode
- func (n VictorOpsHandler) Desc() string
- func (n VictorOpsHandler) ID() ID
- func (n VictorOpsHandler) Name() string
- func (n VictorOpsHandler) Parents() []Node
- func (n VictorOpsHandler) Provides() EdgeType
- func (n VictorOpsHandler) SetName(name string)
- func (n VictorOpsHandler) Stats(interval time.Duration) *StatsNode
- func (n VictorOpsHandler) Wants() EdgeType
- type WhereNode
- func (n *WhereNode) Alert() *AlertNode
- func (n *WhereNode) Derivative(field string) *DerivativeNode
- func (n *WhereNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *WhereNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *WhereNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *WhereNode) InfluxDBOut() *InfluxDBOutNode
- func (n *WhereNode) Join(others ...Node) *JoinNode
- func (n *WhereNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *WhereNode) Sample(rate interface{}) *SampleNode
- func (n *WhereNode) Union(node ...Node) *UnionNode
- func (n *WhereNode) Where(expression tick.Node) *WhereNode
- func (n *WhereNode) Window() *WindowNode
- type WindowNode
- func (n *WindowNode) Alert() *AlertNode
- func (w *WindowNode) Align() *WindowNode
- func (n *WindowNode) Derivative(field string) *DerivativeNode
- func (n *WindowNode) Eval(expressions ...tick.Node) *EvalNode
- func (n *WindowNode) GroupBy(tag ...interface{}) *GroupByNode
- func (n *WindowNode) HttpOut(endpoint string) *HTTPOutNode
- func (n *WindowNode) InfluxDBOut() *InfluxDBOutNode
- func (n *WindowNode) Join(others ...Node) *JoinNode
- func (n *WindowNode) MapReduce(mr MapReduceInfo) *ReduceNode
- func (n *WindowNode) Sample(rate interface{}) *SampleNode
- func (n *WindowNode) Union(node ...Node) *UnionNode
- func (n *WindowNode) Where(expression tick.Node) *WhereNode
- func (n *WindowNode) Window() *WindowNode
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AlertNode ¶
type AlertNode struct { // Template for constructing a unique ID for a given alert. // // Available template data: // // * Name -- Measurement name. // * TaskName -- The name of the task // * Group -- Concatenation of all group-by tags of the form [key=value,]+. // If no groupBy is performed equal to literal 'nil'. // * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value. // // Example: // stream.from().measurement('cpu') // .groupBy('cpu') // .alert() // .id('kapacitor/{{ .Name }}/{{ .Group }}') // // ID: kapacitor/cpu/cpu=cpu0, // // Example: // stream... // .groupBy('service') // .alert() // .id('kapacitor/{{ index .Tags "service" }}') // // ID: kapacitor/authentication // // Example: // stream... // .groupBy('service', 'host') // .alert() // .id('kapacitor/{{ index .Tags "service" }}/{{ index .Tags "host" }}') // // ID: kapacitor/authentication/auth001.example.com // // Default: {{ .Name }}:{{ .Group }} Id string // Template for constructing a meaningful message for the alert. // // Available template data: // // * ID -- The ID of the alert. // * Name -- Measurement name. // * TaskName -- The name of the task // * Group -- Concatenation of all group-by tags of the form [key=value,]+. // If no groupBy is performed equal to literal 'nil'. // * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value. // * Level -- Alert Level, one of: INFO, WARNING, CRITICAL. // * Fields -- Map of fields. Use '{{ index .Fields "key" }}' to get a specific field value. // // Example: // stream... // .groupBy('service', 'host') // .alert() // .id('{{ index .Tags "service" }}/{{ index .Tags "host" }}') // .message('{{ .ID }} is {{ .Level}} value: {{ index .Fields "value" }}') // // Message: authentication/auth001.example.com is CRITICAL value:42 // // Default: {{ .ID }} is {{ .Level }} Message string // Filter expression for the INFO alert level. // An empty value indicates the level is invalid and is skipped. 
Info tick.Node // Filter expression for the WARNING alert level. // An empty value indicates the level is invalid and is skipped. Warn tick.Node // Filter expression for the CRITICAL alert level. // An empty value indicates the level is invalid and is skipped. Crit tick.Node //tick:ignore UseFlapping bool //tick:ignore FlapLow float64 //tick:ignore FlapHigh float64 // Number of previous states to remember when computing flapping levels and // checking for state changes. // Minimum value is 2 in order to keep track of current and previous states. // // Default: 21 History int64 // Send alerts only on state changes. // tick:ignore IsStateChangesOnly bool // Post the JSON alert data to the specified URL. // tick:ignore PostHandlers []*PostHandler // Email handlers // tick:ignore EmailHandlers []*EmailHandler // A commands to run when an alert triggers // tick:ignore ExecHandlers []*ExecHandler // Log JSON alert data to file. One event per line. // tick:ignore LogHandlers []*LogHandler // Send alert to VictorOps. // tick:ignore VictorOpsHandlers []*VictorOpsHandler // Send alert to PagerDuty. // tick:ignore PagerDutyHandlers []*PagerDutyHandler // Send alert to Slack. // tick:ignore SlackHandlers []*SlackHandler // Send alert to HipChat. // tick:ignore HipChatHandlers []*HipChatHandler // Send alert to Alerta. // tick:ignore AlertaHandlers []*AlertaHandler // Send alert to OpsGenie // tick:ignore OpsGenieHandlers []*OpsGenieHandler // contains filtered or unexported fields }
An AlertNode can trigger an event of varying severity levels, and pass the event to alert handlers. The criteria for triggering an alert are specified via a [lambda expression](/kapacitor/v0.2/tick/expr/). See AlertNode.Info, AlertNode.Warn, and AlertNode.Crit below.
Different event handlers can be configured for each AlertNode. Some handlers like Email, HipChat, Slack, OpsGenie, VictorOps and PagerDuty have a configuration option 'global' that indicates that all alerts implicitly use the handler.
Available event handlers:
- log -- log alert data to file.
- post -- HTTP POST data to a specified URL.
- email -- Send an email with alert data.
- exec -- Execute a command passing alert data over STDIN.
- HipChat -- Post alert message to HipChat room.
- Alerta -- Post alert message to Alerta.
- Slack -- Post alert message to Slack channel.
- OpsGenie -- Send alert to OpsGenie.
- VictorOps -- Send alert to VictorOps.
- PagerDuty -- Send alert to PagerDuty.
See below for more details on configuring each handler.
Each event that gets sent to a handler contains the following alert data:
- ID -- the ID of the alert, user defined.
- Message -- the alert message, user defined.
- Time -- the time the alert occurred.
- Level -- one of OK, INFO, WARNING or CRITICAL.
- Data -- influxql.Result containing the data that triggered the alert.
Events are sent to handlers if the alert is in a state other than 'OK' or the alert just changed to the 'OK' state from a non 'OK' state (a.k.a. the alert recovered). When the AlertNode.StateChangesOnly property is used, events will only be sent to handlers if the alert changed state.
It is valid to configure multiple alert handlers, even with the same type.
Example:
stream .groupBy('service') .alert() .id('kapacitor/{{ index .Tags "service" }}') .message('{{ .ID }} is {{ .Level }} value:{{ index .Fields "value" }}') .info(lambda: "value" > 10) .warn(lambda: "value" > 20) .crit(lambda: "value" > 30) .post("http://example.com/api/alert") .post("http://another.example.com/api/alert") .email('oncall@example.com')
It is assumed that each successive level filters a subset of the previous level. As a result, the filter will only be applied if a data point passed the previous level. In the above example, if value = 15 then the INFO and WARNING expressions would be evaluated, but not the CRITICAL expression. Each expression maintains its own state.
func (*AlertNode) Alerta ¶ added in v0.10.0
func (a *AlertNode) Alerta() *AlertaHandler
Send the alert to Alerta.
Example:
[alerta] enabled = true url = "https://alerta.yourdomain" token = "9hiWoDOZ9IbmHsOTeST123ABciWTIqXQVFDo63h9" environment = "Production" origin = "Kapacitor"
In order to not post a message every alert interval use AlertNode.StateChangesOnly so that only events where the alert changed state are sent to Alerta.
Send alerts to Alerta. The resource and event properties are required.
Example:
stream... .alert() .alerta() .resource('Hostname or service') .event('Something went wrong')
Alerta also accepts optional alert information.
Example:
stream... .alert() .alerta() .resource('Hostname or service') .event('Something went wrong') .environment('Development') .group('Dev. Servers')
NOTE: Alerta cannot be configured globally because of its required properties. tick:property
func (*AlertNode) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (*AlertNode) Email ¶
func (a *AlertNode) Email(to ...string) *EmailHandler
Email the alert data.
If the To list is empty, the To addresses from the configuration are used. The email subject is the AlertNode.Message property. The email body is the JSON alert data.
If the 'smtp' section in the configuration has the option: global = true then all alerts are sent via email without the need to explicitly state it in the TICKscript.
Example:
[smtp] enabled = true host = "localhost" port = 25 username = "" password = "" from = "kapacitor@example.com" to = ["oncall@example.com"] # Set global to true so all alerts trigger emails. global = true
Example:
stream... .alert()
Send email to 'oncall@example.com' from 'kapacitor@example.com'
**NOTE**: The global option for email also implies stateChangesOnly is set on all alerts. tick:property
func (*AlertNode) Exec ¶
func (a *AlertNode) Exec(executable string, args ...string) *ExecHandler
Execute a command whenever an alert is triggered and pass the alert data over STDIN in JSON format. tick:property
func (*AlertNode) Flapping ¶
Perform flap detection on the alerts. The method used is similar to the one used by Nagios: https://assets.nagios.com/downloads/nagioscore/docs/nagioscore/3/en/flapping.html
Each different alerting level is considered a different state. The low and high thresholds are inverted thresholds of a percentage of state changes. Meaning that if the percentage of state changes goes above the `high` threshold, the alert enters a flapping state. The alert remains in the flapping state until the percentage of state changes goes below the `low` threshold. Typical values are low: 0.25 and high: 0.5. The percentage values represent the number of state changes over the total possible number of state changes. A percentage change of 0.5 means that the alert changed state in half of the recorded history, and remained the same in the other half of the history. tick:property
func (*AlertNode) HipChat ¶ added in v0.2.4
func (a *AlertNode) HipChat() *HipChatHandler
If the 'hipchat' section in the configuration has the option: global = true then all alerts are sent to HipChat without the need to explicitly state it in the TICKscript.
Example:
[hipchat] enabled = true url = "https://orgname.hipchat.com/v2/room" room = "Test Room" token = "9hiWoDOZ9IbmHsOTeST123ABciWTIqXQVFDo63h9" global = true
Example:
stream... .alert()
Send alert to HipChat using default room 'Test Room'. **NOTE**: The global option for HipChat also implies stateChangesOnly is set on all alerts. Also, the room can either be the room id (numerical) or the room name. tick:property
func (*AlertNode) Log ¶
func (a *AlertNode) Log(filepath string) *LogHandler
Log JSON alert data to file. One event per line. Must specify the absolute path to the log file. It will be created if it does not exist. tick:property
func (*AlertNode) OpsGenie ¶ added in v0.2.4
func (a *AlertNode) OpsGenie() *OpsGenieHandler
Send alert to OpsGenie. To use OpsGenie alerting you must first enable the 'Alert Ingestion API' in the 'Integrations' section of OpsGenie. Then place the API key from the URL into the 'opsgenie' section of the Kapacitor configuration.
Example:
[opsgenie] enabled = true api-key = "xxxxx" teams = ["everyone"] recipients = ["jim", "bob"]
With the correct configuration you can now use OpsGenie in TICKscripts.
Example:
stream... .alert() .opsGenie()
Send alerts to OpsGenie using the teams and recipients in the configuration file.
Example:
stream... .alert() .opsGenie() .teams('team_rocket','team_test')
Send alerts to OpsGenie with team set to 'team_rocket' and 'team_test'
If the 'opsgenie' section in the configuration has the option: global = true then all alerts are sent to OpsGenie without the need to explicitly state it in the TICKscript.
Example:
[opsgenie] enabled = true api-key = "xxxxx" recipients = ["johndoe"] global = true
Example:
stream... .alert()
Send alert to OpsGenie using the default recipients, found in the configuration. tick:property
func (*AlertNode) PagerDuty ¶
func (a *AlertNode) PagerDuty() *PagerDutyHandler
Send the alert to PagerDuty. To use PagerDuty alerting you must first follow the steps to enable a new 'Generic API' service.
From https://developer.pagerduty.com/documentation/integration/events
- In your account, under the Services tab, click "Add New Service".
- Enter a name for the service and select an escalation policy. Then, select "Generic API" for the Service Type.
- Click the "Add Service" button.
- Once the service is created, you'll be taken to the service page. On this page, you'll see the "Service key", which is needed to access the API
Place the 'service key' into the 'pagerduty' section of the Kapacitor configuration as the option 'service-key'.
Example:
[pagerduty] enabled = true service-key = "xxxxxxxxx"
With the correct configuration you can now use PagerDuty in TICKscripts.
Example:
stream... .alert() .pagerDuty()
If the 'pagerduty' section in the configuration has the option: global = true then all alerts are sent to PagerDuty without the need to explicitly state it in the TICKscript.
Example:
[pagerduty] enabled = true service-key = "xxxxxxxxx" global = true
Example:
stream... .alert()
Send alert to PagerDuty. tick:property
func (*AlertNode) Post ¶
func (a *AlertNode) Post(url string) *PostHandler
HTTP POST JSON alert data to a specified URL. tick:property
func (*AlertNode) Slack ¶
func (a *AlertNode) Slack() *SlackHandler
Send the alert to Slack. To allow Kapacitor to post to Slack, go to the URL https://slack.com/services/new/incoming-webhook and create a new incoming webhook and place the generated URL in the 'slack' configuration section.
Example:
[slack] enabled = true url = "https://hooks.slack.com/services/xxxxxxxxx/xxxxxxxxx/xxxxxxxxxxxxxxxxxxxxxxxx" channel = "#general"
In order to not post a message every alert interval use AlertNode.StateChangesOnly so that only events where the alert changed state are posted to the channel.
Example:
stream... .alert() .slack()
Send alerts to Slack channel in the configuration file.
Example:
stream... .alert() .slack() .channel('#alerts')
Send alerts to Slack channel '#alerts'
Example:
stream... .alert() .slack() .channel('@jsmith')
Send alert to user '@jsmith'
If the 'slack' section in the configuration has the option: global = true then all alerts are sent to Slack without the need to explicitly state it in the TICKscript.
Example:
[slack] enabled = true url = "https://hooks.slack.com/services/xxxxxxxxx/xxxxxxxxx/xxxxxxxxxxxxxxxxxxxxxxxx" channel = "#general" global = true
Example:
stream... .alert()
Send alert to Slack using default channel '#general'. **NOTE**: The global option for Slack also implies stateChangesOnly is set on all alerts. tick:property
func (*AlertNode) StateChangesOnly ¶
Only sends events where the state changed. Each different alert level OK, INFO, WARNING, and CRITICAL are considered different states.
Example:
stream... .window() .period(10s) .every(10s) .alert() .crit(lambda: "value" > 10) .stateChangesOnly() .slack()
If the "value" is greater than 10 for a total of 60s, then only two events will be sent. First, when the value crosses the threshold, and second, when it falls back into an OK state. Without stateChangesOnly, the alert would have triggered 7 times: 6 times for each 10s period where the condition was met and once more for the recovery.
tick:property
func (*AlertNode) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
func (*AlertNode) VictorOps ¶
func (a *AlertNode) VictorOps() *VictorOpsHandler
Send alert to VictorOps. To use VictorOps alerting you must first enable the 'Alert Ingestion API' in the 'Integrations' section of VictorOps. Then place the API key from the URL into the 'victorops' section of the Kapacitor configuration.
Example:
[victorops] enabled = true api-key = "xxxxx" routing-key = "everyone"
With the correct configuration you can now use VictorOps in TICKscripts.
Example:
stream... .alert() .victorOps()
Send alerts to VictorOps using the routing key in the configuration file.
Example:
stream... .alert() .victorOps() .routingKey('team_rocket')
Send alerts to VictorOps with routing key 'team_rocket'
If the 'victorops' section in the configuration has the option: global = true then all alerts are sent to VictorOps without the need to explicitly state it in the TICKscript.
Example:
[victorops] enabled = true api-key = "xxxxx" routing-key = "everyone" global = true
Example:
stream... .alert()
Send alert to VictorOps using the default routing key, found in the configuration. tick:property
type AlertaHandler ¶ added in v0.10.0
type AlertaHandler struct { *AlertNode // Alerta authentication token. // If empty uses the token from the configuration. Token string // Alerta resource. // This is a required field. Resource string // Alerta event. // This is a required field. Event string // Alerta environment. // If empty uses the environment from the configuration. Environment string // Alerta group. Group string // Alerta value. Value string // Alerta origin. // If empty uses the origin from the configuration. Origin string }
tick:embedded:AlertNode.Alerta
func (AlertaHandler) Children ¶ added in v0.10.0
func (n AlertaHandler) Children() []Node
tick:ignore
func (AlertaHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (AlertaHandler) Provides ¶ added in v0.10.0
func (n AlertaHandler) Provides() EdgeType
tick:ignore
func (AlertaHandler) SetName ¶ added in v0.10.0
func (n AlertaHandler) SetName(name string)
tick:ignore
func (AlertaHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type BatchNode ¶
type BatchNode struct { // The query text //tick:ignore QueryStr string // The period or length of time that will be queried from InfluxDB Period time.Duration // How often to query InfluxDB. // // The Every property is mutually exclusive with the Cron property. Every time.Duration // Define a schedule using a cron syntax. // // The specific cron implementation is documented here: // https://github.com/gorhill/cronexpr#implementation // // The Cron property is mutually exclusive with the Every property. Cron string // How far back in time to query from the current time // // For example an Offset of 2 hours and an Every of 5m, // Kapacitor will query InfluxDB every 5 minutes for the window of data 2 hours ago. // // This applies to Cron schedules as well. If the cron specifies to run every Sunday at // 1 AM and the Offset is 1 hour. Then at 1 AM on Sunday the data from 12 AM will be queried. Offset time.Duration // The list of dimensions for the group-by clause. //tick:ignore Dimensions []interface{} // Fill the data. // Options are: // // - Any numerical value // - null - exhibits the same behavior as the default // - previous - reports the value of the previous window // - none - suppresses timestamps and values where the value is null Fill interface{} // contains filtered or unexported fields }
A BatchNode defines a source and a schedule for processing batch data. The data is queried from an InfluxDB database and then passed into the data pipeline.
Example: batch
.query(''' SELECT mean("value") FROM "telegraf"."default".cpu_usage_idle WHERE "host" = 'serverA' ''') .period(1m) .every(20s) .groupBy(time(10s), 'cpu') ...
In the above example InfluxDB is queried every 20 seconds; the window of time returned spans 1 minute and is grouped into 10 second buckets.
func (*BatchNode) Alert ¶
func (n *BatchNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*BatchNode) Derivative ¶
func (n *BatchNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*BatchNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*BatchNode) GroupBy ¶
Group the data by a set of dimensions. Can specify one time dimension.
This property adds a `GROUP BY` clause to the query so all the normal behaviors when querying InfluxDB with a `GROUP BY` apply. More details: https://influxdb.com/docs/v0.9/query_language/data_exploration.html#the-group-by-clause
Example:
batch .groupBy(time(10s), 'tag1', 'tag2')
tick:property
func (*BatchNode) HttpOut ¶
func (n *BatchNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*BatchNode) InfluxDBOut ¶
func (n *BatchNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*BatchNode) MapReduce ¶
func (n *BatchNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*BatchNode) Sample ¶
func (n *BatchNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*BatchNode) Window ¶
func (n *BatchNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type DeadmanService ¶ added in v0.10.0
type DeadmanService interface { Interval() time.Duration Threshold() float64 Id() string Message() string Global() bool }
Information relevant to configuring a deadman's switch.
type DerivativeNode ¶
type DerivativeNode struct { // The field to use when calculating the derivative // tick:ignore Field string // The new name of the derivative field. // Default is the name of the field used // when calculating the derivative. As string // The time unit of the resulting derivative value. // Default: 1s Unit time.Duration // Whether negative values are acceptable. // tick:ignore NonNegativeFlag bool // contains filtered or unexported fields }
Compute the derivative of a stream or batch. The derivative is computed on a single field and behaves similarly to the InfluxQL derivative function. Derivative is not a MapReduce function and as a result is not part of the normal influxql functions.
Example:
stream .from().measurement('net_rx_packets') .derivative('value') .unit(1s) // default .nonNegative() ...
Computes the derivative via:
(current - previous ) / ( time_difference / unit)
For batch edges the derivative is computed for each point in the batch and because of boundary conditions the number of points is reduced by one.
func (*DerivativeNode) Alert ¶
func (n *DerivativeNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*DerivativeNode) Derivative ¶
func (n *DerivativeNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*DerivativeNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*DerivativeNode) GroupBy ¶
func (n *DerivativeNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*DerivativeNode) HttpOut ¶
func (n *DerivativeNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*DerivativeNode) InfluxDBOut ¶
func (n *DerivativeNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*DerivativeNode) MapReduce ¶
func (n *DerivativeNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*DerivativeNode) NonNegative ¶
func (d *DerivativeNode) NonNegative() *DerivativeNode
If called the derivative will skip negative results. tick:property
func (*DerivativeNode) Sample ¶
func (n *DerivativeNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*DerivativeNode) Where ¶
Create a new node that filters the data stream by a given expression.
func (*DerivativeNode) Window ¶
func (n *DerivativeNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type EdgeType ¶
type EdgeType int
The type of data that travels along an edge connecting two nodes in a Pipeline.
type EmailHandler ¶ added in v0.2.4
Email AlertHandler tick:embedded:AlertNode.Email
func (EmailHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (EmailHandler) Provides ¶ added in v0.2.4
func (n EmailHandler) Provides() EdgeType
tick:ignore
func (EmailHandler) SetName ¶ added in v0.2.4
func (n EmailHandler) SetName(name string)
tick:ignore
func (EmailHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type EvalNode ¶
type EvalNode struct { // The name of the field that results from applying the expression. // tick:ignore AsList []string // tick:ignore Expressions []tick.Node // tick:ignore KeepFlag bool // List of fields to keep // if empty and KeepFlag is true // keep all fields. // tick:ignore KeepList []string // contains filtered or unexported fields }
Evaluates expressions on each data point it receives. A list of expressions may be provided and will be evaluated in the order they are given and results of previous expressions are made available to later expressions. See the property EvalNode.As for details on how to reference the results.
Example:
stream .eval(lambda: "error_count" / "total_count") .as('error_percent')
The above example will add a new field `error_percent` to each data point with the result of `error_count / total_count` where `error_count` and `total_count` are existing fields on the data point.
func (*EvalNode) Alert ¶
func (n *EvalNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*EvalNode) As ¶
List of names for each expression. The expressions are evaluated in order and the result of a previous expression will be available in later expressions via the name provided.
Example:
stream .eval(lambda: "value" * "value", lambda: 1.0 / "value2") .as('value2', 'inv_value2')
The above example calculates two fields from the value and names them `value2` and `inv_value2` respectively.
tick:property
func (*EvalNode) Derivative ¶
func (n *EvalNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*EvalNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*EvalNode) GroupBy ¶
func (n *EvalNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*EvalNode) HttpOut ¶
func (n *EvalNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*EvalNode) InfluxDBOut ¶
func (n *EvalNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*EvalNode) Keep ¶
If called the existing fields will be preserved in addition to the new fields being set. If not called then only new fields are preserved.
Optionally intermediate values can be discarded by passing a list of field names. Only fields in the list will be kept. If no list is given then all fields, new and old, are kept.
Example:
stream .eval(lambda: "value" * "value", lambda: 1.0 / "value2") .as('value2', 'inv_value2') .keep('value', 'inv_value2')
In the above example the original field `value` is preserved. In addition the new field `value2` is calculated and used in evaluating `inv_value2` but is discarded before the point is sent on to children nodes. The resulting point has only two fields `value` and `inv_value2`. tick:property
func (*EvalNode) MapReduce ¶
func (n *EvalNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*EvalNode) Sample ¶
func (n *EvalNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*EvalNode) Window ¶
func (n *EvalNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type ExecHandler ¶ added in v0.2.4
tick:embedded:AlertNode.Exec
func (ExecHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (ExecHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type GroupByNode ¶
type GroupByNode struct { //The dimensions by which to group to the data. // tick:ignore Dimensions []interface{} // contains filtered or unexported fields }
A GroupByNode will group the incoming data. Each group is then processed independently for the rest of the pipeline. Only tags that are dimensions in the grouping will be preserved; all other tags are dropped.
Example:
stream .groupBy('service', 'datacenter') ...
The above example groups the data along two dimensions `service` and `datacenter`. Groups are dynamically created as new data arrives and each group is processed independently.
func (*GroupByNode) Alert ¶
func (n *GroupByNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*GroupByNode) Derivative ¶
func (n *GroupByNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*GroupByNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*GroupByNode) GroupBy ¶
func (n *GroupByNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*GroupByNode) HttpOut ¶
func (n *GroupByNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*GroupByNode) InfluxDBOut ¶
func (n *GroupByNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*GroupByNode) MapReduce ¶
func (n *GroupByNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*GroupByNode) Sample ¶
func (n *GroupByNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*GroupByNode) Window ¶
func (n *GroupByNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type HTTPOutNode ¶
type HTTPOutNode struct { // The relative path where the cached data is exposed // tick:ignore Endpoint string // contains filtered or unexported fields }
An HTTPOutNode caches the most recent data for each group it has received.
The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
Example:
stream .window() .period(10s) .every(5s) .mapReduce(influxql.top('value', 10)) //Publish the top 10 results over the last 10s updated every 5s. .httpOut('top10')
func (*HTTPOutNode) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (*HTTPOutNode) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type HipChatHandler ¶ added in v0.2.4
type HipChatHandler struct { *AlertNode // HipChat room in which to post messages. // If empty uses the channel from the configuration. Room string // HipChat authentication token. // If empty uses the token from the configuration. Token string }
tick:embedded:AlertNode.HipChat
func (HipChatHandler) Children ¶ added in v0.2.4
func (n HipChatHandler) Children() []Node
tick:ignore
func (HipChatHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (HipChatHandler) Parents ¶ added in v0.2.4
func (n HipChatHandler) Parents() []Node
tick:ignore
func (HipChatHandler) Provides ¶ added in v0.2.4
func (n HipChatHandler) Provides() EdgeType
tick:ignore
func (HipChatHandler) SetName ¶ added in v0.2.4
func (n HipChatHandler) SetName(name string)
tick:ignore
func (HipChatHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type InfluxDBOutNode ¶
type InfluxDBOutNode struct { // The name of the database. Database string // The name of the retention policy. RetentionPolicy string // The name of the measurement. Measurement string // The write consistency to use when writing the data. WriteConsistency string // The precision to use when writing the data. Precision string // Static set of tags to add to all data points before writing them. //tick:ignore Tags map[string]string // contains filtered or unexported fields }
Writes the data to InfluxDB as it is received.
Example:
stream .eval(lambda: "errors" / "total") .as('error_percent') // Write the transformed data to InfluxDB .influxDBOut() .database('mydb') .retentionPolicy('myrp') .measurement('errors') .tag('kapacitor', 'true') .tag('version', '0.2')
func (*InfluxDBOutNode) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (*InfluxDBOutNode) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
func (*InfluxDBOutNode) Tag ¶
func (i *InfluxDBOutNode) Tag(key, value string) *InfluxDBOutNode
Add a static tag to all data points. Tag can be called more than once.
tick:property
type JoinNode ¶
type JoinNode struct { // The alias names of the two parents. // Note: // Names[1] corresponds to the left parent // Names[0] corresponds to the right parent // tick:ignore Names []string // The name of this new joined data stream. // If empty the name of the left parent is used. StreamName string // The maximum duration of time that two incoming points // can be apart and still be considered to be equal in time. // The joined data point's time will be rounded to the nearest // multiple of the tolerance duration. Tolerance time.Duration // Fill the data. // The fill option implies the type of join: inner or full outer // Options are: // // - none - (default) skip rows where a point is missing, inner join. // - null - fill missing points with null, full outer join. // - Any numerical value - fill fields with given value, full outer join. Fill interface{} // contains filtered or unexported fields }
Joins the data from any number of nodes. As each data point is received from a parent node it is paired with the next data points from the other parent nodes with a matching timestamp. Each parent node contributes at most one point to each joined point. A tolerance can be supplied to join points that do not have perfectly aligned timestamps. Any points that fall within the tolerance are joined on the timestamp. If multiple points fall within the same tolerance window then they are joined in the order they arrive.
Aliases are used to prefix all fields from the respective nodes.
The join can be an inner or outer join, see the JoinNode.Fill property.
Example:
var errors = stream .from().measurement('errors') var requests = stream .from().measurement('requests') // Join the errors and requests streams errors.join(requests) // Provide prefix names for the fields of the data points. .as('errors', 'requests') // points that are within 1 second are considered the same time. .tolerance(1s) // fill missing values with 0, implies outer join. .fill(0.0) // name the resulting stream .streamName('error_rate') // Both the "value" fields from each parent have been prefixed // with the respective names 'errors' and 'requests'. .eval(lambda: "errors.value" / "requests.value") .as('rate') ...
In the above example the `errors` and `requests` streams are joined and then transformed to calculate a combined field.
func (*JoinNode) Alert ¶
func (n *JoinNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*JoinNode) As ¶
Prefix names for all fields from the respective nodes. Each field from the parent nodes will be prefixed with the provided name and a '.'. See the example above.
The names cannot have a dot '.' character.
tick:property
func (*JoinNode) Derivative ¶
func (n *JoinNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*JoinNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*JoinNode) GroupBy ¶
func (n *JoinNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*JoinNode) HttpOut ¶
func (n *JoinNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*JoinNode) InfluxDBOut ¶
func (n *JoinNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*JoinNode) MapReduce ¶
func (n *JoinNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*JoinNode) Sample ¶
func (n *JoinNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*JoinNode) Window ¶
func (n *JoinNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type LogHandler ¶ added in v0.2.4
type LogHandler struct { *AlertNode // Absolute path to the log file. // It will be created if it does not exist. // tick:ignore FilePath string }
tick:embedded:AlertNode.Log
func (LogHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (LogHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type MapNode ¶
type MapNode struct { // The map function // tick:ignore Map interface{} // contains filtered or unexported fields }
Performs a map operation on the data stream. In the map-reduce framework it is assumed that several different partitions of the data can be 'mapped' in parallel while only one 'reduce' operation will process all of the data stream.
Example:
stream .window() .period(10s) .every(10s) // Sum the values for each 10s window of data. .mapReduce(influxql.sum('value')) ...
func (*MapNode) Alert ¶
func (n *MapNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*MapNode) Derivative ¶
func (n *MapNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*MapNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*MapNode) GroupBy ¶
func (n *MapNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*MapNode) HttpOut ¶
func (n *MapNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*MapNode) InfluxDBOut ¶
func (n *MapNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*MapNode) MapReduce ¶
func (n *MapNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*MapNode) Sample ¶
func (n *MapNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*MapNode) Window ¶
func (n *MapNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type MapReduceInfo ¶
type MapReduceInfo struct { Map interface{} Reduce interface{} Edge EdgeType }
tick:ignore
type Node ¶
type Node interface { // List of parents of this node. Parents() []Node // List of children of this node. Children() []Node // Short description of the node does not need to be unique Desc() string // Friendly readable unique name of the node Name() string SetName(string) // Unique id for the node ID() ID // The type of input the node wants. Wants() EdgeType // The type of output the node provides. Provides() EdgeType // contains filtered or unexported methods }
Generic node in a pipeline
type OpsGenieHandler ¶ added in v0.2.4
type OpsGenieHandler struct { *AlertNode // OpsGenie Teams. // tick:ignore TeamsList []string // OpsGenie Recipients. // tick:ignore RecipientsList []string }
tick:embedded:AlertNode.OpsGenie
func (OpsGenieHandler) Children ¶ added in v0.2.4
func (n OpsGenieHandler) Children() []Node
tick:ignore
func (OpsGenieHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (OpsGenieHandler) Parents ¶ added in v0.2.4
func (n OpsGenieHandler) Parents() []Node
tick:ignore
func (OpsGenieHandler) Provides ¶ added in v0.2.4
func (n OpsGenieHandler) Provides() EdgeType
tick:ignore
func (*OpsGenieHandler) Recipients ¶ added in v0.2.4
func (og *OpsGenieHandler) Recipients(recipients ...string) *OpsGenieHandler
The list of recipients to be alerted. If empty defaults to the recipients from the configuration. tick:property
func (OpsGenieHandler) SetName ¶ added in v0.2.4
func (n OpsGenieHandler) SetName(name string)
tick:ignore
func (OpsGenieHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
func (*OpsGenieHandler) Teams ¶ added in v0.2.4
func (og *OpsGenieHandler) Teams(teams ...string) *OpsGenieHandler
The list of teams to be alerted. If empty defaults to the teams from the configuration. tick:property
type PagerDutyHandler ¶ added in v0.2.4
type PagerDutyHandler struct {
*AlertNode
}
tick:embedded:AlertNode.PagerDuty
func (PagerDutyHandler) Children ¶ added in v0.2.4
func (n PagerDutyHandler) Children() []Node
tick:ignore
func (PagerDutyHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (PagerDutyHandler) Parents ¶ added in v0.2.4
func (n PagerDutyHandler) Parents() []Node
tick:ignore
func (PagerDutyHandler) Provides ¶ added in v0.2.4
func (n PagerDutyHandler) Provides() EdgeType
tick:ignore
func (PagerDutyHandler) SetName ¶ added in v0.2.4
func (n PagerDutyHandler) SetName(name string)
tick:ignore
func (PagerDutyHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
A complete data processing pipeline. Starts with a single source. tick:ignore
func CreatePipeline ¶
func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope, deadman DeadmanService) (*Pipeline, error)
Create a pipeline from a given script. tick:ignore
type PostHandler ¶ added in v0.2.4
tick:embedded:AlertNode.Post
func (PostHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (PostHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type ReduceNode ¶
type ReduceNode struct { //The reduce function // tick:ignore Reduce interface{} // Whether to use the max time or the // time of the selected point // tick:ignore PointTimes bool // The name of the field, defaults to the name of // MR function used (i.e. influxql.mean -> 'mean') As string // contains filtered or unexported fields }
Performs a reduce operation on the data stream. In the map-reduce framework it is assumed that several different partitions of the data can be 'mapped' in parallel while only one 'reduce' operation will process all of the data stream.
Example:
stream .window() .period(10s) .every(10s) // Sum the values for each 10s window of data. .mapReduce(influxql.sum('value')) ...
func (*ReduceNode) Alert ¶
func (n *ReduceNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*ReduceNode) Derivative ¶
func (n *ReduceNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*ReduceNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*ReduceNode) GroupBy ¶
func (n *ReduceNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*ReduceNode) HttpOut ¶
func (n *ReduceNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*ReduceNode) InfluxDBOut ¶
func (n *ReduceNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*ReduceNode) MapReduce ¶
func (n *ReduceNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*ReduceNode) Sample ¶
func (n *ReduceNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*ReduceNode) UsePointTimes ¶
func (r *ReduceNode) UsePointTimes() *ReduceNode
Use the time of the selected point instead of the time of the batch.
Only applies to selector MR functions like first, last, top, bottom, etc. Aggregation functions always use the batch time. tick:property
func (*ReduceNode) Window ¶
func (n *ReduceNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type SampleNode ¶
type SampleNode struct { // Keep every Count point or batch // tick:ignore Count int64 // Keep one point or batch every Duration // tick:ignore Duration time.Duration // contains filtered or unexported fields }
Sample points or batches. One point will be emitted every count or duration specified.
Example:
stream .sample(3)
Keep every third data point or batch.
Example:
stream .sample(10s)
Keep only samples that land on the 10s boundary. See StreamNode.Truncate, BatchNode.GroupBy time or WindowNode.Align for ensuring data is aligned with a boundary.
func (*SampleNode) Alert ¶
func (n *SampleNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*SampleNode) Derivative ¶
func (n *SampleNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*SampleNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*SampleNode) GroupBy ¶
func (n *SampleNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*SampleNode) HttpOut ¶
func (n *SampleNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*SampleNode) InfluxDBOut ¶
func (n *SampleNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*SampleNode) MapReduce ¶
func (n *SampleNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*SampleNode) Sample ¶
func (n *SampleNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*SampleNode) Window ¶
func (n *SampleNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type SlackHandler ¶ added in v0.2.4
type SlackHandler struct { *AlertNode // Slack channel in which to post messages. // If empty uses the channel from the configuration. Channel string }
tick:embedded:AlertNode.Slack
func (SlackHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (SlackHandler) Provides ¶ added in v0.2.4
func (n SlackHandler) Provides() EdgeType
tick:ignore
func (SlackHandler) SetName ¶ added in v0.2.4
func (n SlackHandler) SetName(name string)
tick:ignore
func (SlackHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type SourceBatchNode ¶
type SourceBatchNode struct {
// contains filtered or unexported fields
}
A node that handles creating several child BatchNodes. Each call to `query` creates a child batch node that can further be configured. See BatchNode. The `batch` variable in batch tasks is an instance of a SourceBatchNode.
Example:
var errors = batch .query('SELECT value from errors') ... var views = batch .query('SELECT value from views') ...
func (*SourceBatchNode) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (*SourceBatchNode) Query ¶
func (b *SourceBatchNode) Query(q string) *BatchNode
The query to execute. Must not contain a time condition in the `WHERE` clause or contain a `GROUP BY` clause. The time conditions are added dynamically according to the period, offset and schedule. The `GROUP BY` clause is added dynamically according to the dimensions passed to the `groupBy` method.
func (*SourceBatchNode) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type StatsNode ¶ added in v0.10.0
type StatsNode struct { // tick:ignore SourceNode Node // tick:ignore Interval time.Duration // contains filtered or unexported fields }
A StatsNode emits internal statistics about another node at a given interval.
The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the other node is receiving. As a result the StatsNode is a root node in the task pipeline.
The currently available internal statistics:
- collected -- the number of points or batches this node has received.
Each stat is available as a field in the emitted data stream.
Example:
var data = stream.from()... // Emit statistics every 1 minute and cache them via the HTTP API. data.stats(1m).httpOut('stats') // Continue normal processing of the data stream data....
WARNING: It is not recommended to join the stats stream with the original data stream. Since they operate on different clocks you could potentially create a deadlock. This is a limitation of the current implementation and may be removed in the future.
func (*StatsNode) Alert ¶ added in v0.10.0
func (n *StatsNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*StatsNode) Derivative ¶ added in v0.10.0
func (n *StatsNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*StatsNode) Eval ¶ added in v0.10.0
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*StatsNode) GroupBy ¶ added in v0.10.0
func (n *StatsNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*StatsNode) HttpOut ¶ added in v0.10.0
func (n *StatsNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*StatsNode) InfluxDBOut ¶ added in v0.10.0
func (n *StatsNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*StatsNode) Join ¶ added in v0.10.0
Join this node with other nodes. The data is joined on timestamp.
func (*StatsNode) MapReduce ¶ added in v0.10.0
func (n *StatsNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*StatsNode) Sample ¶ added in v0.10.0
func (n *StatsNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*StatsNode) Union ¶ added in v0.10.0
Perform the union of this node and all other given nodes.
func (*StatsNode) Where ¶ added in v0.10.0
Create a new node that filters the data stream by a given expression.
func (*StatsNode) Window ¶ added in v0.10.0
func (n *StatsNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type StreamNode ¶
type StreamNode struct { // An expression to filter the data stream. // tick:ignore Expression tick.Node // The dimensions by which to group the data. // tick:ignore Dimensions []interface{} // The database name. // If empty any database will be used. Database string // The retention policy name // If empty any retention policy will be used. RetentionPolicy string // The measurement name // If empty any measurement will be used. Measurement string // Optional duration for truncating timestamps. // Helpful to ensure data points land on specific boundaries // Example: // stream // .from().measurement('mydata') // .truncate(1s) // // All incoming data will be truncated to 1 second resolution. Truncate time.Duration // contains filtered or unexported fields }
A StreamNode represents the source of data being streamed to Kapacitor via any of its inputs. The stream node allows you to select which portion of the stream you want to process. The `stream` variable in stream tasks is an instance of a StreamNode.
Example:
stream .from() .database('mydb') .retentionPolicy('myrp') .measurement('mymeasurement') .where(lambda: "host" =~ /logger\d+/) .window() ...
The above example selects only data points from the database `mydb` and retention policy `myrp` and measurement `mymeasurement` where the tag `host` matches the regex `logger\d+`
func (*StreamNode) Alert ¶
func (n *StreamNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*StreamNode) Derivative ¶
func (n *StreamNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*StreamNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*StreamNode) From ¶
func (s *StreamNode) From() *StreamNode
Creates a new stream node that can be further filtered using the Database, RetentionPolicy, Measurement and Where properties. From can be called multiple times to create multiple independent forks of the data stream.
Example:
// Select the 'cpu' measurement from just the database 'mydb' // and retention policy 'myrp'. var cpu = stream.from() .database('mydb') .retentionPolicy('myrp') .measurement('cpu') // Select the 'load' measurement from any database and retention policy. var load = stream.from() .measurement('load') // Join cpu and load streams and do further processing. cpu.join(load) .as('cpu', 'load') ...
func (*StreamNode) GroupBy ¶
func (s *StreamNode) GroupBy(tag ...interface{}) *StreamNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*StreamNode) HttpOut ¶
func (n *StreamNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*StreamNode) InfluxDBOut ¶
func (n *StreamNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*StreamNode) MapReduce ¶
func (n *StreamNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*StreamNode) Sample ¶
func (n *StreamNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*StreamNode) Where ¶
func (s *StreamNode) Where(expression tick.Node) *StreamNode
Filter the current stream using the given expression. This expression is a Kapacitor expression. Kapacitor expressions are a superset of InfluxQL WHERE expressions. See the `Expression` docs for more information.
If empty then all data points are considered to match. tick:property
func (*StreamNode) Window ¶
func (n *StreamNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type UDFNode ¶ added in v0.10.0
type UDFNode struct { //tick:ignore Commander command.Commander // tick:ignore Timeout time.Duration // Options that were set on the node // tick:ignore Options []*udf.Option // contains filtered or unexported fields }
A UDFNode is a node that can run a User Defined Function (UDF) in a separate process.
A UDF is a custom script or binary that can communicate via Kapacitor's UDF RPC protocol. The path and arguments to the UDF program are specified in Kapacitor's configuration. Using TICKscripts you can invoke and configure your UDF for each task.
See the [README.md](https://github.com/influxdata/kapacitor/tree/master/udf/agent/) for details on how to write your own UDF.
UDFs are configured via Kapacitor's main configuration file.
Example:
[udf] [udf.functions] # Example moving average UDF. [udf.functions.movingAverage] prog = "/path/to/executable/moving_avg" args = [] timeout = "10s"
UDFs are first class objects in TICKscripts and are referenced via their configuration name.
Example:
// Given you have a UDF that computes a moving average // The UDF can define what its options are and then can be // invoked via a TICKscript like so: stream .from()... .movingAverage() .field('value') .size(100) .as('mavg') .httpOut('movingaverage')
NOTE: The UDF process runs as the same user as the Kapacitor daemon. As a result make sure the user is properly secured as well as the configuration file.
func (*UDFNode) Alert ¶ added in v0.10.0
func (n *UDFNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*UDFNode) CallMethod ¶ added in v0.10.0
tick:ignore
func (*UDFNode) Derivative ¶ added in v0.10.0
func (n *UDFNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*UDFNode) Eval ¶ added in v0.10.0
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*UDFNode) GroupBy ¶ added in v0.10.0
func (n *UDFNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*UDFNode) HasProperty ¶ added in v0.10.0
tick:ignore
func (*UDFNode) HttpOut ¶ added in v0.10.0
func (n *UDFNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*UDFNode) InfluxDBOut ¶ added in v0.10.0
func (n *UDFNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*UDFNode) Join ¶ added in v0.10.0
Join this node with other nodes. The data is joined on timestamp.
func (*UDFNode) MapReduce ¶ added in v0.10.0
func (n *UDFNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*UDFNode) Sample ¶ added in v0.10.0
func (n *UDFNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*UDFNode) SetProperty ¶ added in v0.10.0
tick:ignore
func (*UDFNode) Where ¶ added in v0.10.0
Create a new node that filters the data stream by a given expression.
func (*UDFNode) Window ¶ added in v0.10.0
func (n *UDFNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type UnionNode ¶
type UnionNode struct { // The new name of the stream. // If empty the name of the left node // (i.e. `leftNode.union(otherNode1, otherNode2)`) is used. Rename string // contains filtered or unexported fields }
Takes the union of all of its parents. The union is just a simple pass through. Each data point received from each parent is passed on to the children nodes without modification.
Example:
var logins = stream.from().measurement('logins') var logouts = stream.from().measurement('logouts') var frontpage = stream.from().measurement('frontpage') // Union all user actions into a single stream logins.union(logouts, frontpage) .rename('user_actions') ...
func (*UnionNode) Alert ¶
func (n *UnionNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*UnionNode) Derivative ¶
func (n *UnionNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*UnionNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*UnionNode) GroupBy ¶
func (n *UnionNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*UnionNode) HttpOut ¶
func (n *UnionNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*UnionNode) InfluxDBOut ¶
func (n *UnionNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*UnionNode) MapReduce ¶
func (n *UnionNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*UnionNode) Sample ¶
func (n *UnionNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*UnionNode) Window ¶
func (n *UnionNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type VictorOpsHandler ¶ added in v0.2.4
type VictorOpsHandler struct { *AlertNode // The routing key to use for the alert. // Defaults to the value in the configuration if empty. RoutingKey string }
tick:embedded:AlertNode.VictorOps
func (VictorOpsHandler) Children ¶ added in v0.2.4
func (n VictorOpsHandler) Children() []Node
tick:ignore
func (VictorOpsHandler) Deadman ¶ added in v0.10.0
Helper function for creating an alert on low throughput, aka deadman's switch.
- Threshold -- trigger alert if throughput drops below threshold in points/interval. - Interval -- how often to check the throughput.
Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s) //Do normal processing of data data....
The above is equivalent to this Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.stats(10s) .derivative('collected') .unit(10s) .nonNegative() .alert() .id('node \'stream0\' in task \'{{ .TaskName }}\'') .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.') .crit(lambda: "collected" <= 100.0) //Do normal processing of data data....
The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
Since the AlertNode is the last piece it can be further modified as normal. Example:
var data = stream.from()... // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. data.deadman(100.0, 10s).slack().channel('#dead_tasks') //Do normal processing of data data....
func (VictorOpsHandler) Parents ¶ added in v0.2.4
func (n VictorOpsHandler) Parents() []Node
tick:ignore
func (VictorOpsHandler) Provides ¶ added in v0.2.4
func (n VictorOpsHandler) Provides() EdgeType
tick:ignore
func (VictorOpsHandler) SetName ¶ added in v0.2.4
func (n VictorOpsHandler) SetName(name string)
tick:ignore
func (VictorOpsHandler) Stats ¶ added in v0.10.0
Create a new stream of data that contains the internal statistics of the node. The interval represents how often to emit the statistics based on real time. This means the interval time is independent of the times of the data points the source node is receiving.
type WhereNode ¶
type WhereNode struct { // The expression predicate. // tick:ignore Expression tick.Node // contains filtered or unexported fields }
The WhereNode filters the data stream by a given expression.
Example: var sums = stream
.groupBy('service', 'host') .mapReduce(influxql.sum('value'))
//Watch particular host for issues. sums
.where(lambda: "host" == 'h001.example.com') .alert() .crit(lambda: TRUE) .email('user@example.com')
func (*WhereNode) Alert ¶
func (n *WhereNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*WhereNode) Derivative ¶
func (n *WhereNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*WhereNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*WhereNode) GroupBy ¶
func (n *WhereNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*WhereNode) HttpOut ¶
func (n *WhereNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*WhereNode) InfluxDBOut ¶
func (n *WhereNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*WhereNode) MapReduce ¶
func (n *WhereNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*WhereNode) Sample ¶
func (n *WhereNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*WhereNode) Window ¶
func (n *WhereNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.
type WindowNode ¶
type WindowNode struct { // The period, or length in time, of the window. Period time.Duration // How often the current window is emitted into the pipeline. Every time.Duration // Whether to align the window edges with the zero time // tick:ignore AlignFlag bool // contains filtered or unexported fields }
Windows data over time. A window has a length defined by `period` and a frequency at which it emits the window to the pipeline.
Example:
stream .window() .period(10m) .every(5m) .httpOut('recent')
The above windowing example emits a window to the pipeline every `5 minutes` and the window contains the last `10 minutes` worth of data. As a result each time the window is emitted it contains half new data and half old data.
NOTE: Time for a window (or any node) is implemented by inspecting the times on the incoming data points. As a result if the incoming data stream stops then no more windows will be emitted because time is no longer increasing for the window node.
func (*WindowNode) Alert ¶
func (n *WindowNode) Alert() *AlertNode
Create an alert node, which can trigger alerts.
func (*WindowNode) Align ¶
func (w *WindowNode) Align() *WindowNode
Whether to align the window edges with the zero time. If not aligned the window starts and ends relative to the first data point it receives. tick:property
func (*WindowNode) Derivative ¶
func (n *WindowNode) Derivative(field string) *DerivativeNode
Create a new node that computes the derivative of adjacent points.
func (*WindowNode) Eval ¶
Create an eval node that will evaluate the given transformation function to each data point.
A list of expressions may be provided and will be evaluated in the order they are given
and results of previous expressions are made available to later expressions.
func (*WindowNode) GroupBy ¶
func (n *WindowNode) GroupBy(tag ...interface{}) *GroupByNode
Group the data by a set of tags.
Can pass literal * to group by all dimensions. Example:
.groupBy(*)
func (*WindowNode) HttpOut ¶
func (n *WindowNode) HttpOut(endpoint string) *HTTPOutNode
Create an http output node that caches the most recent data it has received. The cached data is available at the given endpoint. The endpoint is the relative path from the API endpoint of the running task. For example if the task endpoint is at "/api/v1/task/<task_name>" and endpoint is "top10", then the data can be requested from "/api/v1/task/<task_name>/top10".
func (*WindowNode) InfluxDBOut ¶
func (n *WindowNode) InfluxDBOut() *InfluxDBOutNode
Create an influxdb output node that will store the incoming data into InfluxDB.
func (*WindowNode) MapReduce ¶
func (n *WindowNode) MapReduce(mr MapReduceInfo) *ReduceNode
Perform a map-reduce operation on the data. The built-in functions under `influxql` provide the selection, aggregation, and transformation functions from the InfluxQL language.
MapReduce may be applied to either a batch or a stream edge. In the case of a batch each batch is passed to the mapper independently. In the case of a stream all incoming data points that have the exact same time are combined into a batch and sent to the mapper.
func (*WindowNode) Sample ¶
func (n *WindowNode) Sample(rate interface{}) *SampleNode
Create a new node that samples the incoming points or batches.
One point will be emitted every count or duration specified.
func (*WindowNode) Window ¶
func (n *WindowNode) Window() *WindowNode
Create a new node that windows the stream by time.
NOTE: Window can only be applied to stream edges.