Documentation ¶
Overview ¶
A data pipeline processing engine.
See the README for more complete examples and guides.
Code Organization:
The pipeline package provides an API for connecting nodes to form a pipeline. The individual implementations of each node live in this kapacitor package. The separation keeps the exported API of the pipeline package clean, since that API is consumed via TICKscript (a DSL for Kapacitor).
Other Concepts:
Stream vs Batch -- Use of the word 'stream' indicates data arrives a single data point at a time. Use of the word 'batch' indicates data arrives in sets or batches of data points.
Task -- A task represents a concrete workload to perform. It consists of a pipeline and an identifying name. Basic CRUD operations can be performed on tasks.
Task Master -- Responsible for executing a task in a specific environment.
Replay -- Replays static datasets against tasks.
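The stream and batch distinction described above can be illustrated with a small sketch. It does not use this package's types: `Point` and `toBatches` are hypothetical names invented for the example, and channels stand in for edges. It only shows the difference between receiving one point at a time and receiving points in groups.

```go
package main

import "fmt"

// Point is a hypothetical stand-in for a single data point.
type Point struct {
	Name  string
	Value float64
}

// toBatches reads a stream (one point at a time) and emits batches
// (slices of points) holding at most size points each.
func toBatches(stream <-chan Point, size int) <-chan []Point {
	out := make(chan []Point)
	go func() {
		defer close(out)
		batch := make([]Point, 0, size)
		for p := range stream {
			batch = append(batch, p)
			if len(batch) == size {
				out <- batch
				batch = make([]Point, 0, size)
			}
		}
		// Flush whatever is left when the stream ends.
		if len(batch) > 0 {
			out <- batch
		}
	}()
	return out
}

func main() {
	stream := make(chan Point)
	go func() {
		defer close(stream)
		for i := 0; i < 5; i++ {
			stream <- Point{Name: "cpu", Value: float64(i)}
		}
	}()
	for b := range toBatches(stream, 2) {
		fmt.Println(len(b), "points in batch")
	}
}
```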
Index ¶
- Constants
- Variables
- func ConvertResultTimes(r *Result)
- func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool
- func DeleteStatistics(key string)
- func EvalPredicate(se *tick.StatefulExpr, now time.Time, fields models.Fields, tags models.Tags) (bool, error)
- func NewStatistics(name string, tags map[string]string) (string, *kexpvar.Map)
- func Uptime() time.Duration
- func WriteBatchForRecording(w io.Writer, b models.Batch) error
- func WritePointForRecording(w io.Writer, p models.Point, precision string) error
- type AlertData
- type AlertHandler
- type AlertLevel
- type AlertNode
- type BatchCollector
- type BatchNode
- type DBRP
- type DerivativeNode
- type Edge
- func (e *Edge) Abort()
- func (e *Edge) Close()
- func (e *Edge) CollectBatch(b models.Batch) error
- func (e *Edge) CollectMaps(m *MapResult) error
- func (e *Edge) CollectPoint(p models.Point) error
- func (e *Edge) Next() (p models.PointInterface, ok bool)
- func (e *Edge) NextBatch() (b models.Batch, ok bool)
- func (e *Edge) NextMaps() (m *MapResult, ok bool)
- func (e *Edge) NextPoint() (p models.Point, ok bool)
- type EvalNode
- type ExecutingTask
- func (et *ExecutingTask) BatchCount() (int, error)
- func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([][]string, error)
- func (et *ExecutingTask) EDot(labels bool) []byte
- func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error)
- func (et *ExecutingTask) GetOutput(name string) (Output, error)
- func (et *ExecutingTask) Snapshot() (*TaskSnapshot, error)
- func (et *ExecutingTask) StartBatching() error
- func (et *ExecutingTask) StopStats()
- func (et *ExecutingTask) Wait() error
- type ExecutionStats
- type GroupByNode
- type HTTPOutNode
- type InfluxDBOutNode
- type InfluxQLNode
- type JoinNode
- type LogNode
- type LogService
- type MapFunc
- type MapInfo
- type MapNode
- type MapResult
- type MaxDuration
- type NoOpNode
- type Node
- type Output
- type Query
- type ReduceFunc
- type ReduceNode
- type Replay
- type Result
- type SampleNode
- type ShiftNode
- type SourceBatchNode
- type SourceStreamNode
- type StatsData
- type StatsNode
- type StreamCollector
- type StreamNode
- type Task
- type TaskMaster
- func (tm *TaskMaster) BatchCollectors(name string) []BatchCollector
- func (tm *TaskMaster) Close() error
- func (tm *TaskMaster) CreateTICKScope() *tick.Scope
- func (tm *TaskMaster) DelFork(name string)
- func (tm *TaskMaster) Drain()
- func (tm *TaskMaster) ExecutingDot(name string, labels bool) string
- func (tm *TaskMaster) ExecutionStats(name string) (ExecutionStats, error)
- func (tm *TaskMaster) IsExecuting(name string) bool
- func (tm *TaskMaster) New() *TaskMaster
- func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) (*Edge, error)
- func (tm *TaskMaster) NewTask(name, script string, tt TaskType, dbrps []DBRP, snapshotInterval time.Duration) (*Task, error)
- func (tm *TaskMaster) Open() (err error)
- func (tm *TaskMaster) SnapshotTask(name string) (*TaskSnapshot, error)
- func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)
- func (tm *TaskMaster) StopTask(name string) error
- func (tm *TaskMaster) StopTasks()
- func (tm *TaskMaster) Stream(name string) (StreamCollector, error)
- func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, ...) error
- type TaskSnapshot
- type TaskType
- type UDFNode
- type UDFProcess
- func (p *UDFProcess) Abort(err error)
- func (p *UDFProcess) Info() (UDFProcessInfo, error)
- func (p *UDFProcess) Init(options []*udf.Option) error
- func (p *UDFProcess) Restore(snapshot []byte) error
- func (p *UDFProcess) Snapshot() ([]byte, error)
- func (p *UDFProcess) Start() error
- func (p *UDFProcess) Stop() error
- type UDFProcessInfo
- type UDFService
- type UnionNode
- type WhereNode
- type WindowNode
Constants ¶
const ( // List of names for top-level exported vars ClusterIDVarName = "cluster_id" ServerIDVarName = "server_id" HostVarName = "host" ProductVarName = "product" VersionVarName = "version" NumTasksVarName = "num_tasks" NumEnabledTasksVarName = "num_enabled_tasks" NumSubscriptionsVarName = "num_subscriptions" UptimeVarName = "uptime" // The name of the product Product = "kapacitor" )
Variables ¶
var ( // Global expvars NumTasksVar = &kexpvar.Int{} NumEnabledTasksVar = &kexpvar.Int{} NumSubscriptionsVar = &kexpvar.Int{} ClusterIDVar = &kexpvar.String{} ServerIDVar = &kexpvar.String{} HostVar = &kexpvar.String{} ProductVar = &kexpvar.String{} VersionVar = &kexpvar.String{} )
var ErrAborted = errors.New("edged aborted")
var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
var ErrTaskMasterOpen = errors.New("TaskMaster is open")
var ErrUDFProcessStopped = errors.New("process already stopped")
var ErrWrongTaskType = errors.New("wrong task type")
Functions ¶
func ConvertResultTimes ¶ added in v0.10.1
func ConvertResultTimes(r *Result)
func CreateDBRPMap ¶
func EvalPredicate ¶
func EvalPredicate(se *tick.StatefulExpr, now time.Time, fields models.Fields, tags models.Tags) (bool, error)
Evaluate a given expression as a boolean predicate against a set of fields and tags.
func NewStatistics ¶
NewStatistics creates an expvar-based map. Within the map, "name" holds the measurement name, "tags" holds the tags, and values are placed under the key "values". The "values" map is returned so that statistics can be set.
Types ¶
type AlertHandler ¶
type AlertHandler func(ad *AlertData)
type AlertLevel ¶
type AlertLevel int
const ( OKAlert AlertLevel = iota InfoAlert WarnAlert CritAlert )
func (AlertLevel) MarshalText ¶
func (l AlertLevel) MarshalText() ([]byte, error)
func (AlertLevel) String ¶
func (l AlertLevel) String() string
func (*AlertLevel) UnmarshalText ¶ added in v0.10.1
func (l *AlertLevel) UnmarshalText(text []byte) error
type BatchCollector ¶
type BatchNode ¶
type BatchNode struct {
// contains filtered or unexported fields
}
type DerivativeNode ¶
type DerivativeNode struct {
// contains filtered or unexported fields
}
type Edge ¶
type Edge struct {
// contains filtered or unexported fields
}
func (*Edge) Abort ¶ added in v0.2.1
func (e *Edge) Abort()
Abort all next and collect calls. Items in flight may or may not be processed.
func (*Edge) Close ¶
func (e *Edge) Close()
Close the edge. This can only be called after all collect calls to the edge have finished.
func (*Edge) CollectMaps ¶
type ExecutingTask ¶
type ExecutingTask struct { Task *Task // contains filtered or unexported fields }
A task that is ready for execution.
func NewExecutingTask ¶
func NewExecutingTask(tm *TaskMaster, t *Task) (*ExecutingTask, error)
Create a new executing task from a defined task.
func (*ExecutingTask) BatchCount ¶
func (et *ExecutingTask) BatchCount() (int, error)
func (*ExecutingTask) BatchQueries ¶
func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([][]string, error)
Get the batch queries that the batcher will run between time `start` and time `stop`.
func (*ExecutingTask) EDot ¶ added in v0.2.3
func (et *ExecutingTask) EDot(labels bool) []byte
Return a graphviz .dot formatted byte array. Label edges with relevant execution information.
func (*ExecutingTask) ExecutionStats ¶ added in v0.11.0
func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error)
func (*ExecutingTask) GetOutput ¶
func (et *ExecutingTask) GetOutput(name string) (Output, error)
Get a named output.
func (*ExecutingTask) Snapshot ¶ added in v0.10.0
func (et *ExecutingTask) Snapshot() (*TaskSnapshot, error)
func (*ExecutingTask) StartBatching ¶
func (et *ExecutingTask) StartBatching() error
Instruct source batch node to start querying and sending batches of data
func (*ExecutingTask) StopStats ¶ added in v0.11.0
func (et *ExecutingTask) StopStats()
Stop all stats nodes
func (*ExecutingTask) Wait ¶ added in v0.11.0
func (et *ExecutingTask) Wait() error
Wait till the task finishes and return any error
type ExecutionStats ¶ added in v0.11.0
type GroupByNode ¶
type GroupByNode struct {
// contains filtered or unexported fields
}
type HTTPOutNode ¶
type HTTPOutNode struct {
// contains filtered or unexported fields
}
func (*HTTPOutNode) Endpoint ¶
func (h *HTTPOutNode) Endpoint() string
type InfluxDBOutNode ¶
type InfluxDBOutNode struct {
// contains filtered or unexported fields
}
type InfluxQLNode ¶ added in v0.11.0
type InfluxQLNode struct {
// contains filtered or unexported fields
}
type MaxDuration ¶ added in v0.11.0
type MaxDuration struct {
// contains filtered or unexported fields
}
MaxDuration is a 64-bit int variable representing a duration in nanoseconds that satisfies the expvar.Var interface. When setting a value, it will only be set if it is greater than the current value.
func (*MaxDuration) IntValue ¶ added in v0.11.0
func (v *MaxDuration) IntValue() int64
func (*MaxDuration) Set ¶ added in v0.11.0
func (v *MaxDuration) Set(next int64)
Set sets the value if it is greater than the current value. If the set succeeds and a setter exists, the value is passed on to the setter.
func (*MaxDuration) String ¶ added in v0.11.0
func (v *MaxDuration) String() string
type Node ¶
type Node interface { pipeline.Node // wait for the node to finish processing and return any errors Wait() error // contains filtered or unexported methods }
A node that can be in an executor.
type Output ¶
type Output interface {
Endpoint() string
}
An output of a pipeline. Still need to improve this interface to expose different types of outputs.
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
func (*Query) Dimensions ¶
Set the dimensions on the query
func (*Query) Fill ¶
func (q *Query) Fill(option influxql.FillOption, value interface{})
type ReduceFunc ¶
type ReduceNode ¶
type ReduceNode struct {
// contains filtered or unexported fields
}
type Replay ¶
Replay engine that can replay static data sets against a specific executor and its tasks.
func (*Replay) ReplayBatch ¶
func (r *Replay) ReplayBatch(data []io.ReadCloser, batches []BatchCollector, recTime bool) <-chan error
Replay a data set against an executor. If recTime is true then the replay will use the times stored in the recording instead of the clock time.
func (*Replay) ReplayStream ¶
func (r *Replay) ReplayStream(data io.ReadCloser, stream StreamCollector, recTime bool, precision string) <-chan error
Replay a data set against an executor.
type SampleNode ¶
type SampleNode struct {
// contains filtered or unexported fields
}
type ShiftNode ¶ added in v0.11.0
type ShiftNode struct {
// contains filtered or unexported fields
}
type SourceBatchNode ¶
type SourceBatchNode struct {
// contains filtered or unexported fields
}
func (*SourceBatchNode) Abort ¶ added in v0.10.1
func (s *SourceBatchNode) Abort()
func (*SourceBatchNode) Count ¶
func (s *SourceBatchNode) Count() int
func (*SourceBatchNode) DBRPs ¶
func (s *SourceBatchNode) DBRPs() ([]DBRP, error)
Return list of databases and retention policies the batcher will query.
func (*SourceBatchNode) Queries ¶
func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string
func (*SourceBatchNode) Start ¶
func (s *SourceBatchNode) Start()
func (*SourceBatchNode) Wait ¶ added in v0.11.0
func (s *SourceBatchNode) Wait() error
type SourceStreamNode ¶ added in v0.11.0
type SourceStreamNode struct {
// contains filtered or unexported fields
}
type StatsData ¶ added in v0.2.1
type StatsData struct { Name string `json:"name"` Tags map[string]string `json:"tags"` Values map[string]interface{} `json:"values"` }
func GetStatsData ¶
Return all stats data from the expvars.
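The struct tags on StatsData determine its JSON shape. The sketch below copies the struct definition shown above into a standalone program and encodes one value; the helper `encode` and the sample field values are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StatsData is copied from the definition above so the example
// compiles on its own.
type StatsData struct {
	Name   string                 `json:"name"`
	Tags   map[string]string      `json:"tags"`
	Values map[string]interface{} `json:"values"`
}

// encode is a hypothetical helper that renders one StatsData as JSON.
func encode(sd StatsData) (string, error) {
	b, err := json.Marshal(sd)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(StatsData{
		Name:   "edges",
		Tags:   map[string]string{"task": "cpu_alert"},
		Values: map[string]interface{}{"collected": 42},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// prints {"name":"edges","tags":{"task":"cpu_alert"},"values":{"collected":42}}
}
```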
type StatsNode ¶ added in v0.10.0
type StatsNode struct {
// contains filtered or unexported fields
}
type StreamCollector ¶
type StreamNode ¶
type StreamNode struct {
// contains filtered or unexported fields
}
type Task ¶
type Task struct { Name string Pipeline *pipeline.Pipeline Type TaskType DBRPs []DBRP SnapshotInterval time.Duration }
The complete definition of a task: its name, pipeline and type.
type TaskMaster ¶
type TaskMaster struct { HTTPDService interface { AddRoutes([]httpd.Route) error DelRoutes([]httpd.Route) URL() string } TaskStore interface { SaveSnapshot(name string, snapshot *TaskSnapshot) error HasSnapshot(name string) bool LoadSnapshot(name string) (*TaskSnapshot, error) } DeadmanService pipeline.DeadmanService UDFService UDFService InfluxDBService interface { NewDefaultClient() (client.Client, error) NewNamedClient(name string) (client.Client, error) } SMTPService interface { Global() bool StateChangesOnly() bool SendMail(to []string, subject string, msg string) error } OpsGenieService interface { Global() bool Alert(teams []string, recipients []string, messageType, message, entityID string, t time.Time, details interface{}) error } VictorOpsService interface { Global() bool Alert(routingKey, messageType, message, entityID string, t time.Time, extra interface{}) error } PagerDutyService interface { Global() bool Alert(incidentKey, desc string, level AlertLevel, details interface{}) error } SlackService interface { Global() bool StateChangesOnly() bool Alert(channel, message string, level AlertLevel) error } HipChatService interface { Global() bool StateChangesOnly() bool Alert(room, token, message string, level AlertLevel) error } AlertaService interface { Alert(token, resource, event, environment, severity, status, group, value, message, origin string, service []string, data interface{}) error } SensuService interface { Alert(name, output string, level AlertLevel) error } TalkService interface { Alert(title, text string) error } TimingService interface { NewTimer(timer.Setter) timer.Timer } LogService LogService // contains filtered or unexported fields }
An execution framework for a set of tasks.
func NewTaskMaster ¶
func NewTaskMaster(l LogService) *TaskMaster
Create a new TaskMaster with the given LogService.
func (*TaskMaster) BatchCollectors ¶
func (tm *TaskMaster) BatchCollectors(name string) []BatchCollector
func (*TaskMaster) Close ¶
func (tm *TaskMaster) Close() error
func (*TaskMaster) CreateTICKScope ¶ added in v0.10.0
func (tm *TaskMaster) CreateTICKScope() *tick.Scope
func (*TaskMaster) DelFork ¶
func (tm *TaskMaster) DelFork(name string)
func (*TaskMaster) Drain ¶ added in v0.2.1
func (tm *TaskMaster) Drain()
func (*TaskMaster) ExecutingDot ¶ added in v0.2.3
func (tm *TaskMaster) ExecutingDot(name string, labels bool) string
func (*TaskMaster) ExecutionStats ¶ added in v0.11.0
func (tm *TaskMaster) ExecutionStats(name string) (ExecutionStats, error)
func (*TaskMaster) IsExecuting ¶ added in v0.2.1
func (tm *TaskMaster) IsExecuting(name string) bool
func (*TaskMaster) New ¶
func (tm *TaskMaster) New() *TaskMaster
Returns a new TaskMaster instance with the same services as the current one.
func (*TaskMaster) NewFork ¶
func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) (*Edge, error)
func (*TaskMaster) NewTask ¶ added in v0.10.0
func (tm *TaskMaster) NewTask( name, script string, tt TaskType, dbrps []DBRP, snapshotInterval time.Duration, ) (*Task, error)
Create a new task in the context of a TaskMaster
func (*TaskMaster) Open ¶
func (tm *TaskMaster) Open() (err error)
func (*TaskMaster) SnapshotTask ¶ added in v0.10.0
func (tm *TaskMaster) SnapshotTask(name string) (*TaskSnapshot, error)
func (*TaskMaster) StartTask ¶
func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)
func (*TaskMaster) StopTask ¶
func (tm *TaskMaster) StopTask(name string) error
func (*TaskMaster) StopTasks ¶ added in v0.10.1
func (tm *TaskMaster) StopTasks()
func (*TaskMaster) Stream ¶
func (tm *TaskMaster) Stream(name string) (StreamCollector, error)
func (*TaskMaster) WritePoints ¶ added in v0.2.1
func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, points []imodels.Point) error
type TaskSnapshot ¶ added in v0.10.0
type UDFNode ¶ added in v0.10.0
type UDFNode struct {
// contains filtered or unexported fields
}
User defined function
type UDFProcess ¶ added in v0.10.0
type UDFProcess struct { PointIn chan<- models.Point BatchIn chan<- models.Batch PointOut <-chan models.Point BatchOut <-chan models.Batch // contains filtered or unexported fields }
Wraps an external process and sends and receives data over STDIN and STDOUT. Lines received over STDERR are logged via normal Kapacitor logging.
Once a UDFProcess is created and started the owner can send points or batches to the subprocess by writing them to the PointIn or BatchIn channels respectively, and according to the type of process created.
The UDFProcess may be Aborted at any time for various reasons. It is the owner's responsibility via the abortCallback to stop writing to the *In channels since no more selects on the channels will be performed.
Calling Stop on the process should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.
Calling Info returns information about available options the process has.
Calling Init is required to process data. The behavior is undefined if you send points/batches to the process without calling Init.
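The ownership rule above (stop writing to the input channel first, then call Stop so the remaining data drains) can be sketched without any subprocess. The `worker` type below is a hypothetical in-process stand-in, not a UDFProcess: it has no Init, Info, or Abort, and simply doubles integers so the ordering of the calls is easy to follow.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in that exposes a send-only input
// and a receive-only output, similar in shape to PointIn and PointOut.
type worker struct {
	In  chan<- int
	Out <-chan int
	in  chan int // kept so Stop can close the input
}

func newWorker() *worker {
	in := make(chan int)
	out := make(chan int)
	go func() {
		defer close(out) // signals that all remaining data was processed
		for v := range in {
			out <- v * 2
		}
	}()
	return &worker{In: in, Out: out, in: in}
}

// Stop must only be called once the owner has stopped writing to In.
func (w *worker) Stop() { close(w.in) }

// doubleAll owns the worker: it writes every value, stops the worker
// only after the writer has finished, and drains the output.
func doubleAll(values []int) []int {
	w := newWorker()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for _, v := range values {
			w.In <- v
		}
	}()
	go func() {
		wg.Wait() // the owner has stopped writing
		w.Stop()
	}()
	var results []int
	for v := range w.Out {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3}))
}
```

Calling Stop before the writer finishes would make the next send panic on a closed channel, which is the in-process analogue of the rule stated above.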
func NewUDFProcess ¶ added in v0.10.0
func (*UDFProcess) Abort ¶ added in v0.10.0
func (p *UDFProcess) Abort(err error)
Abort the process. Data in-flight will not be processed.
func (*UDFProcess) Info ¶ added in v0.10.0
func (p *UDFProcess) Info() (UDFProcessInfo, error)
Get information about the process, available options etc. Info need not be called every time a process is started.
func (*UDFProcess) Init ¶ added in v0.10.0
func (p *UDFProcess) Init(options []*udf.Option) error
Initialize the process with a set of Options. Calling Init is required even if you do not have any specific Options; in that case, pass nil.
func (*UDFProcess) Restore ¶ added in v0.10.0
func (p *UDFProcess) Restore(snapshot []byte) error
Request to restore a snapshot.
func (*UDFProcess) Snapshot ¶ added in v0.10.0
func (p *UDFProcess) Snapshot() ([]byte, error)
Request a snapshot from the process.
func (*UDFProcess) Stop ¶ added in v0.10.0
func (p *UDFProcess) Stop() error
Stop the UDFProcess cleanly.
Calling Stop should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.
type UDFProcessInfo ¶ added in v0.10.0
type UDFService ¶ added in v0.10.0
type UDFService interface { FunctionList() []string FunctionInfo(name string) (UDFProcessInfo, bool) }
type WindowNode ¶
type WindowNode struct {
// contains filtered or unexported fields
}
Source Files ¶
- alert.go
- batch.go
- derivative.go
- doc.go
- edge.go
- eval.go
- expr.go
- functions.go
- global_stats.go
- group_by.go
- http_out.go
- influxdb_out.go
- influxql.gen.go
- influxql.go
- join.go
- log.go
- map_reduce.go
- node.go
- noop.go
- output.go
- query.go
- replay.go
- result.go
- sample.go
- shift.go
- stats.go
- stream.go
- task.go
- task_master.go
- udf.go
- udf_process.go
- union.go
- where.go
- window.go
Directories ¶
Path | Synopsis |
---|---|
 | A clock that provides blocking calls that wait until absolute times have occurred. |
cmd | |
 | This package is a fork of the golang expvar expvar.Var types. |
 | Contains integration and end-to-end tests |
 | Provides a set of structures for passing data around Kapacitor. |
 | Provides an API for constructing data processing pipelines. |
services | |
httpd | Provides an HTTP API exposing many components of Kapacitor. |
reporting | Sends anonymous reports to InfluxData |
stats | The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retention policy. |
 | TICKscript is a simple invocation chaining DSL. |
cmd/tickdoc | Tickdoc is a simple utility similar to godoc that generates documentation from comments. |
 | Package udf is a generated protocol buffer package. |
 | Provides an io.Writer that filters log messages based on a log level. |