kapacitor

package module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2015 License: MIT Imports: 33 Imported by: 0

README

Kapacitor Circle CI

Open source framework for processing, monitoring, and alerting on time series data

Installation

Kapacitor has two binaries:

  • kapacitor – a CLI program for calling the Kapacitor API.
  • kapacitord – the Kapacitor server daemon.

You can either download the binaries directly from the downloads page or go get them:

go get github.com/influxdb/kapacitor/cmd/kapacitor
go get github.com/influxdb/kapacitor/cmd/kapacitord

Configuration

An example configuration file can be found here

Kapacitor can also provide an example config for you using this command:

kapacitord config

Getting Started

This README gives you a high level overview of what Kapacitor is and what its like to use it. As well as some details of how it works. To get started using Kapacitor see this guide.

Basic Example

Kapacitor use a DSL named TICKscript to define tasks.

A simple TICKscript that alerts on high cpu usage looks like this:

stream
    .from().measurement('cpu_usage_idle')
    .groupBy('host')
    .window()
        .period(1m)
        .every(1m)
    .mapReduce(influxql.mean('value'))
    .eval(lambda: 100.0 - "mean")
        .as('used')
    .alert()
        .message('{{ .Level}}: {{ .Name }}/{{ index .Tags "host" }} has high cpu usage: {{ index .Fields "used" }}')
        .warn(lambda: "used" > 70.0)
        .crit(lambda: "used" > 85.0)

        // Send alert to hander of choice.

        // Slack
        .slack()
        .channel('#alerts')

        // VictorOps
        .victorOps()
        .routingKey('team_rocket')

        // PagerDuty
        .pagerDuty()

Place the above script into a file cpu_alert.tick then run these commands to start the task:

# Define the task (assumes cpu data is in db 'telegraf')
kapacitor define \
    -name cpu_alert \
    -type stream \
    -dbrp telegraf.default \
    -tick ./cpu_alert.tick
# Start the task
kapacitor enable cpu_alert

For more complete examples see the documentation

Documentation

Overview

A data pipeline processing engine.

See the README for more complete examples and guides.

Code Organization:

The pipeline package provides an API for how nodes can be connected to form a pipeline. The individual implementations of each node exist in this kapacitor package. The reason for the separation is to keep the exported API from the pipeline package clean as it is consumed via the TICKscripts (a DSL for Kapacitor).

Other Concepts:

Stream vs Batch -- Use of the word 'stream' indicates data arrives a single data point at a time. Use of the word 'batch' indicates data arrives in sets or batches or data points.

Task -- A task represents a concrete workload to perform. It consists of a pipeline and an identifying name. Basic CRUD operations can be performed on tasks.

Task Master -- Responsible for executing a task in a specific environment.

Replay -- Replays static datasets against tasks.

Index

Constants

View Source
const (
	// List of names for top-level exported vars
	ClusterIDVarName = "cluster_id"
	ServerIDVarName  = "server_id"
	HostVarName      = "host"
	ProductVarName   = "product"
	VersionVarName   = "version"

	NumTasksVarName         = "num_tasks"
	NumEnabledTasksVarName  = "num_enabled_tasks"
	NumSubscriptionsVarName = "num_subscriptions"

	UptimeVarName = "uptime"

	// The name of the product
	Product = "kapacitor"
)

Variables

View Source
var (
	// Global expvars
	NumTasks         = &expvar.Int{}
	NumEnabledTasks  = &expvar.Int{}
	NumSubscriptions = &expvar.Int{}
)
View Source
var ErrAborted = errors.New("edged aborted")
View Source
var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
View Source
var ErrTaskMasterOpen = errors.New("TaskMaster is open")
View Source
var ErrWrongTaskType = errors.New("wrong task type")

Functions

func CreateDBRPMap

func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool

func EvalPredicate

func EvalPredicate(se *tick.StatefulExpr, fields models.Fields, tags map[string]string) (bool, error)

Evaluate a given expression as a boolean predicate against a set of fields and tags

func GetFloatVar

func GetFloatVar(name string) float64

Gets an exported var and returns its float value

func GetIntVar

func GetIntVar(name string) int64

Gets an exported var and returns its int value

func GetStringVar

func GetStringVar(name string) string

Gets an exported var and returns its unquoted string contents

func NewStatistics

func NewStatistics(name string, tags map[string]string) *expvar.Map

NewStatistics creates an expvar-based map. Within there "name" is the Measurement name, "tags" are the tags, and values are placed at the key "values". The "values" map is returned so that statistics can be set.

func Uptime added in v0.2.0

func Uptime() time.Duration

func WriteBatchForRecording

func WriteBatchForRecording(w io.Writer, b models.Batch) error

func WritePointForRecording

func WritePointForRecording(w io.Writer, p models.Point, precision string) error

Types

type AlertData

type AlertData struct {
	ID      string          `json:"id"`
	Message string          `json:"message"`
	Time    time.Time       `json:"time"`
	Level   AlertLevel      `json:"level"`
	Data    influxql.Result `json:"data"`
}

type AlertHandler

type AlertHandler func(ad *AlertData)

type AlertLevel

type AlertLevel int
const (
	OKAlert AlertLevel = iota
	InfoAlert
	WarnAlert
	CritAlert
)

func (AlertLevel) MarshalText

func (l AlertLevel) MarshalText() ([]byte, error)

func (AlertLevel) String

func (l AlertLevel) String() string

type AlertNode

type AlertNode struct {
	// contains filtered or unexported fields
}

func (*AlertNode) Err

func (n *AlertNode) Err() error

type BatchCollector

type BatchCollector interface {
	CollectBatch(models.Batch) error
	Close()
}

type BatchNode

type BatchNode struct {
	// contains filtered or unexported fields
}

func (*BatchNode) DBRPs

func (b *BatchNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*BatchNode) Err

func (n *BatchNode) Err() error

func (*BatchNode) Queries

func (b *BatchNode) Queries(start, stop time.Time) []string

func (*BatchNode) Start

func (b *BatchNode) Start()

type DBRP

type DBRP struct {
	Database        string `json:"db"`
	RetentionPolicy string `json:"rp"`
}

func (DBRP) String

func (d DBRP) String() string

type DerivativeNode

type DerivativeNode struct {
	// contains filtered or unexported fields
}

func (*DerivativeNode) Err

func (n *DerivativeNode) Err() error

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

func (*Edge) Abort added in v0.2.1

func (e *Edge) Abort()

Abort all next and collect calls. Items in flight may or may not be processed.

func (*Edge) Close

func (e *Edge) Close()

Close the edge, this can only be called after all collect calls to the edge have finished.

func (*Edge) CollectBatch

func (e *Edge) CollectBatch(b models.Batch) error

func (*Edge) CollectMaps

func (e *Edge) CollectMaps(m *MapResult) error

func (*Edge) CollectPoint

func (e *Edge) CollectPoint(p models.Point) error

func (*Edge) Next

func (e *Edge) Next() (p models.PointInterface, ok bool)

func (*Edge) NextBatch

func (e *Edge) NextBatch() (b models.Batch, ok bool)

func (*Edge) NextMaps

func (e *Edge) NextMaps() (m *MapResult, ok bool)

func (*Edge) NextPoint

func (e *Edge) NextPoint() (p models.Point, ok bool)

type EvalNode

type EvalNode struct {
	// contains filtered or unexported fields
}

func (*EvalNode) Err

func (n *EvalNode) Err() error

type ExecutingTask

type ExecutingTask struct {
	Task *Task
	// contains filtered or unexported fields
}

A task that is ready for execution.

func NewExecutingTask

func NewExecutingTask(tm *TaskMaster, t *Task) (*ExecutingTask, error)

Create a new task from a defined kapacitor.

func (*ExecutingTask) BatchCount

func (et *ExecutingTask) BatchCount() (int, error)

func (*ExecutingTask) BatchQueries

func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([][]string, error)

Get the next `num` batch queries that the batcher will run starting at time `start`.

func (*ExecutingTask) Err

func (et *ExecutingTask) Err() error

Wait till the task finishes and return any error

func (*ExecutingTask) GetOutput

func (et *ExecutingTask) GetOutput(name string) (Output, error)

Get a named output.

func (*ExecutingTask) StartBatching

func (et *ExecutingTask) StartBatching() error

Instruct source batch node to start querying and sending batches of data

type GroupByNode

type GroupByNode struct {
	// contains filtered or unexported fields
}

func (*GroupByNode) Err

func (n *GroupByNode) Err() error

type HTTPOutNode

type HTTPOutNode struct {
	// contains filtered or unexported fields
}

func (*HTTPOutNode) Endpoint

func (h *HTTPOutNode) Endpoint() string

func (*HTTPOutNode) Err

func (n *HTTPOutNode) Err() error

type InfluxDBOutNode

type InfluxDBOutNode struct {
	// contains filtered or unexported fields
}

func (*InfluxDBOutNode) Err

func (n *InfluxDBOutNode) Err() error

type JoinNode

type JoinNode struct {
	// contains filtered or unexported fields
}

func (*JoinNode) Err

func (n *JoinNode) Err() error

type LogService

type LogService interface {
	NewLogger(prefix string, flag int) *log.Logger
}

type MapFunc

type MapFunc func(in *tsdb.MapInput) interface{}

type MapInfo

type MapInfo struct {
	Field string
	Func  MapFunc
}

type MapNode

type MapNode struct {
	// contains filtered or unexported fields
}

func (*MapNode) Err

func (n *MapNode) Err() error

type MapResult

type MapResult struct {
	Name  string
	Group models.GroupID
	Dims  []string
	Tags  map[string]string
	TMax  time.Time
	Outs  []interface{}
}

type Node

type Node interface {
	pipeline.Node

	// wait for the node to finish processing and return any errors
	Err() error
	// contains filtered or unexported methods
}

A node that can be in an executor.

type Output

type Output interface {
	Endpoint() string
}

An output of a pipeline. Still need to improve this interface to expose different types of outputs.

type Query

type Query struct {
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery(q string) (*Query, error)

func (*Query) DBRPs

func (q *Query) DBRPs() ([]DBRP, error)

Return the db rp pairs of the query

func (*Query) Dimensions

func (q *Query) Dimensions(dims []interface{}) error

Set the dimensions on the query

func (*Query) Fill

func (q *Query) Fill(option influxql.FillOption, value interface{})

func (*Query) Start

func (q *Query) Start(s time.Time)

Set the start time of the query

func (*Query) Stop

func (q *Query) Stop(s time.Time)

Set the stop time of the query

func (*Query) String

func (q *Query) String() string

type ReduceFunc

type ReduceFunc func(in []interface{}, tmax time.Time, useTMax bool, as string) interface{}

type ReduceNode

type ReduceNode struct {
	// contains filtered or unexported fields
}

func (*ReduceNode) Err

func (n *ReduceNode) Err() error

type Replay

type Replay struct {
	Setter clock.Setter
	// contains filtered or unexported fields
}

Replay engine that can replay static data sets against a specific executor and its tasks.

func NewReplay

func NewReplay(c clock.Clock) *Replay

Create a new replay engine.

func (*Replay) ReplayBatch

func (r *Replay) ReplayBatch(data []io.ReadCloser, batches []BatchCollector, recTime bool) <-chan error

Replay a data set against an executor. If source time is true then the replay will use the times stored in the recording instead of the clock time.

func (*Replay) ReplayStream

func (r *Replay) ReplayStream(data io.ReadCloser, stream StreamCollector, recTime bool, precision string) <-chan error

Replay a data set against an executor.

type Result

type Result influxql.Result

The result from an output.

func ResultFromJSON

func ResultFromJSON(in io.Reader) (r Result)

Unmarshal a Result object from JSON.

type SampleNode

type SampleNode struct {
	// contains filtered or unexported fields
}

func (*SampleNode) Err

func (n *SampleNode) Err() error

type SourceBatchNode

type SourceBatchNode struct {
	// contains filtered or unexported fields
}

func (*SourceBatchNode) Count

func (s *SourceBatchNode) Count() int

func (*SourceBatchNode) DBRPs

func (s *SourceBatchNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*SourceBatchNode) Err

func (s *SourceBatchNode) Err() error

func (*SourceBatchNode) Queries

func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string

func (*SourceBatchNode) Start

func (s *SourceBatchNode) Start()

type StatsData added in v0.2.1

type StatsData struct {
	Name   string                 `json:"name"`
	Tags   map[string]string      `json:"tags"`
	Values map[string]interface{} `json:"values"`
}

func GetStatsData

func GetStatsData() ([]StatsData, error)

Return all stats data from the expvars.

type StreamCollector

type StreamCollector interface {
	CollectPoint(models.Point) error
	Close()
}

type StreamNode

type StreamNode struct {
	// contains filtered or unexported fields
}

func (*StreamNode) Err

func (n *StreamNode) Err() error

type Task

type Task struct {
	Name     string
	Pipeline *pipeline.Pipeline
	Type     TaskType
	DBRPs    []DBRP
}

The complete definition of a task, its name, pipeline and type.

func NewBatcher

func NewBatcher(name, script string, dbrps []DBRP) (*Task, error)

Create a new batcher task from a script.

func NewStreamer

func NewStreamer(name, script string, dbrps []DBRP) (*Task, error)

Create a new streamer task from a script.

func NewTask

func NewTask(name, script string, tt TaskType, dbrps []DBRP) (*Task, error)

func (*Task) Dot

func (t *Task) Dot() []byte

type TaskMaster

type TaskMaster struct {
	HTTPDService interface {
		AddRoutes([]httpd.Route) error
		DelRoutes([]httpd.Route)
		URL() string
	}
	InfluxDBService interface {
		NewClient() (*client.Client, error)
	}
	SMTPService interface {
		Global() bool
		SendMail(to []string, subject string, msg string)
	}
	VictorOpsService interface {
		Global() bool
		Alert(routingKey, messageType, message, entityID string, t time.Time, extra interface{}) error
	}
	PagerDutyService interface {
		Global() bool
		Alert(incidentKey, desc string, details interface{}) error
	}
	SlackService interface {
		Global() bool
		Alert(channel, message string, level AlertLevel) error
	}
	LogService LogService
	// contains filtered or unexported fields
}

An execution framework for a set of tasks.

func NewTaskMaster

func NewTaskMaster(l LogService) *TaskMaster

Create a new Executor with a given clock.

func (*TaskMaster) BatchCollectors

func (tm *TaskMaster) BatchCollectors(name string) []BatchCollector

func (*TaskMaster) Close

func (tm *TaskMaster) Close() error

func (*TaskMaster) DelFork

func (tm *TaskMaster) DelFork(name string)

func (*TaskMaster) Drain added in v0.2.1

func (tm *TaskMaster) Drain()

func (*TaskMaster) IsExecuting added in v0.2.1

func (tm *TaskMaster) IsExecuting(name string) bool

func (*TaskMaster) New

func (tm *TaskMaster) New() *TaskMaster

Returns a new TaskMaster instance with the same services as the current one.

func (*TaskMaster) NewFork

func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) (*Edge, error)

func (*TaskMaster) Open

func (tm *TaskMaster) Open() (err error)

func (*TaskMaster) StartTask

func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)

func (*TaskMaster) StopTask

func (tm *TaskMaster) StopTask(name string) error

func (*TaskMaster) Stream

func (tm *TaskMaster) Stream(name string) (StreamCollector, error)

func (*TaskMaster) WritePoints added in v0.2.1

func (tm *TaskMaster) WritePoints(pts *cluster.WritePointsRequest) error

type TaskType

type TaskType int

The type of a task

const (
	StreamTask TaskType = iota
	BatchTask
)

func (TaskType) String

func (t TaskType) String() string

type UnionNode

type UnionNode struct {
	// contains filtered or unexported fields
}

func (*UnionNode) Err

func (n *UnionNode) Err() error

type WhereNode

type WhereNode struct {
	// contains filtered or unexported fields
}

func (*WhereNode) Err

func (n *WhereNode) Err() error

type WindowNode

type WindowNode struct {
	// contains filtered or unexported fields
}

func (*WindowNode) Err

func (n *WindowNode) Err() error

Directories

Path Synopsis
A clock that provides blocking calls that wait until absolute times have occurred.
A clock that provides blocking calls that wait until absolute times have occurred.
cmd
Contains integration and end-to-end tests
Contains integration and end-to-end tests
Provides a set of structures for passing data around Kapacitor.
Provides a set of structures for passing data around Kapacitor.
Provides an API for constructing data processing pipelines.
Provides an API for constructing data processing pipelines.
services
httpd
Provides an HTTP API exposing many components of Kapacitor.
Provides an HTTP API exposing many components of Kapacitor.
reporting
Sends anonymous reports to InfluxData
Sends anonymous reports to InfluxData
stats
The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retetion policy.
The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retetion policy.
udp
TICKscript is a simple invocation chaining DSL.
TICKscript is a simple invocation chaining DSL.
cmd/tickdoc
Tickdoc is a simple utility similiar to godoc that generates documentation from comments.
Tickdoc is a simple utility similiar to godoc that generates documentation from comments.
Provides an io.Writer that filters log messages based on a log level.
Provides an io.Writer that filters log messages based on a log level.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL