Documentation ¶
Overview ¶
A data pipeline processing engine.
See the README for more complete examples and guides.
Code Organization:
The pipeline package provides an API for how nodes can be connected to form a pipeline. The individual implementations of each node exist in this kapacitor package. The reason for the separation is to keep the exported API from the pipeline package clean as it is consumed via the TICKscripts (a DSL for Kapacitor).
Other Concepts:
Stream vs Batch -- Use of the word 'stream' indicates data arrives a single data point at a time. Use of the word 'batch' indicates data arrives in sets or batches or data points.
Task -- A task represents a concrete workload to perform. It consists of a pipeline and an identifying name. Basic CRUD operations can be performed on tasks.
Task Master -- Responsible for executing a task in a specific environment.
Replay -- Replays static datasets against tasks.
Index ¶
- Constants
- Variables
- func ConvertResultTimes(r *Result)
- func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool
- func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, ...) (bool, error)
- func ReplayBatchFromChan(clck clock.Clock, batches []<-chan edge.BufferedBatchMessage, ...) <-chan error
- func ReplayBatchFromIO(clck clock.Clock, data []io.ReadCloser, collectors []BatchCollector, ...) <-chan error
- func ReplayStreamFromChan(clck clock.Clock, points <-chan edge.PointMessage, collector StreamCollector, ...) <-chan error
- func ReplayStreamFromIO(clck clock.Clock, data io.ReadCloser, collector StreamCollector, recTime bool, ...) <-chan error
- func WriteBatchForRecording(w io.Writer, b edge.BufferedBatchMessage) error
- func WritePointForRecording(w io.Writer, p edge.PointMessage, precision string) error
- type AlertNode
- type AutoscaleNode
- type BarrierNode
- type BatchCollector
- type BatchNode
- type BatchQueries
- type ChangeDetectNode
- type CombineNode
- type DBRP
- type DefaultNode
- func (n *DefaultNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *DefaultNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *DefaultNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *DefaultNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *DefaultNode) Done()
- func (n *DefaultNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *DefaultNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *DefaultNode) Wait() error
- type DeleteNode
- func (n *DeleteNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *DeleteNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *DeleteNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *DeleteNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *DeleteNode) Done()
- func (n *DeleteNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *DeleteNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *DeleteNode) Wait() error
- type DerivativeNode
- type Diagnostic
- type Edge
- type EdgeDiagnostic
- type EvalNode
- type ExecutingTask
- func (et *ExecutingTask) BatchCount() (int, error)
- func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([]BatchQueries, error)
- func (et *ExecutingTask) EDot(labels bool) []byte
- func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error)
- func (et *ExecutingTask) GetOutput(name string) (Output, error)
- func (et *ExecutingTask) Snapshot() (*TaskSnapshot, error)
- func (et *ExecutingTask) StartBatching() error
- func (et *ExecutingTask) StopStats()
- func (et *ExecutingTask) Wait() error
- type ExecutionStats
- type FlattenNode
- type FromNode
- func (n *FromNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *FromNode) BatchPoint(edge.BatchPointMessage) (edge.Message, error)
- func (n *FromNode) BeginBatch(edge.BeginBatchMessage) (edge.Message, error)
- func (n *FromNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *FromNode) Done()
- func (n *FromNode) EndBatch(edge.EndBatchMessage) (edge.Message, error)
- func (n *FromNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *FromNode) Wait() error
- type GroupByNode
- func (n *GroupByNode) Barrier(b edge.BarrierMessage) error
- func (n *GroupByNode) BatchPoint(bp edge.BatchPointMessage) error
- func (n *GroupByNode) BeginBatch(begin edge.BeginBatchMessage) error
- func (n *GroupByNode) DeleteGroup(d edge.DeleteGroupMessage) error
- func (n *GroupByNode) Done()
- func (n *GroupByNode) EndBatch(end edge.EndBatchMessage) error
- func (n *GroupByNode) Point(p edge.PointMessage) error
- func (n *GroupByNode) Wait() error
- type HTTPOutNode
- type HTTPPostNode
- type InfluxDBOutNode
- func (n *InfluxDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) Done()
- func (n *InfluxDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *InfluxDBOutNode) Wait() error
- type InfluxQLNode
- type JoinNode
- type KapacitorLoopbackNode
- func (n *KapacitorLoopbackNode) Barrier(edge.BarrierMessage) error
- func (n *KapacitorLoopbackNode) BatchPoint(bp edge.BatchPointMessage) error
- func (n *KapacitorLoopbackNode) BeginBatch(begin edge.BeginBatchMessage) error
- func (n *KapacitorLoopbackNode) DeleteGroup(edge.DeleteGroupMessage) error
- func (n *KapacitorLoopbackNode) Done()
- func (n *KapacitorLoopbackNode) EndBatch(edge.EndBatchMessage) error
- func (n *KapacitorLoopbackNode) Point(p edge.PointMessage) error
- func (n *KapacitorLoopbackNode) Wait() error
- type LogNode
- func (n *LogNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *LogNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *LogNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *LogNode) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error)
- func (n *LogNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *LogNode) Done()
- func (n *LogNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *LogNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *LogNode) Wait() error
- type LogService
- type MaxDuration
- type NoOpNode
- type Node
- type NodeDiagnostic
- type NoopMetaClient
- func (m *NoopMetaClient) Authenticate(username, password string) (ui *meta.UserInfo, err error)
- func (m *NoopMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error)
- func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
- func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
- func (m *NoopMetaClient) Database(name string) *meta.DatabaseInfo
- func (m *NoopMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
- func (m *NoopMetaClient) Users() ([]meta.UserInfo, error)
- func (m *NoopMetaClient) WaitForLeader(d time.Duration) error
- type Output
- type Query
- func (q *Query) AlignGroup()
- func (q *Query) Clone() (*Query, error)
- func (q *Query) DBRPs() ([]DBRP, error)
- func (q *Query) Dimensions(dims []interface{}) error
- func (q *Query) Fill(option influxql.FillOption, value interface{})
- func (q *Query) IsGroupedByTime() bool
- func (q *Query) SetStartTime(s time.Time)
- func (q *Query) SetStopTime(s time.Time)
- func (q *Query) StartTime() time.Time
- func (q *Query) StopTime() time.Time
- func (q *Query) String() string
- type QueryNode
- type Result
- type SampleNode
- type ShiftNode
- func (n *ShiftNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *ShiftNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *ShiftNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *ShiftNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *ShiftNode) Done()
- func (n *ShiftNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *ShiftNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *ShiftNode) Wait() error
- type SideloadNode
- func (n *SideloadNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
- func (n *SideloadNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
- func (n *SideloadNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
- func (n *SideloadNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
- func (n *SideloadNode) Done()
- func (n *SideloadNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
- func (n *SideloadNode) Point(p edge.PointMessage) (edge.Message, error)
- func (n *SideloadNode) Wait() error
- type Socket
- type StateTrackingNode
- type StatsNode
- type StreamCollector
- type StreamEdge
- type StreamNode
- type Task
- type TaskDiagnostic
- type TaskMaster
- func (tm *TaskMaster) BatchCollectors(id string) []BatchCollector
- func (tm *TaskMaster) Close() error
- func (tm *TaskMaster) CreateTICKScope() *stateful.Scope
- func (tm *TaskMaster) DelFork(id string)
- func (tm *TaskMaster) DeleteTask(id string) error
- func (tm *TaskMaster) Drain()
- func (tm *TaskMaster) ExecutingDot(id string, labels bool) string
- func (tm *TaskMaster) ExecutionStats(id string) (ExecutionStats, error)
- func (tm *TaskMaster) ID() string
- func (tm *TaskMaster) IsExecuting(id string) bool
- func (tm *TaskMaster) New(id string) *TaskMaster
- func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP, measurements []string) (edge.StatsEdge, error)
- func (tm *TaskMaster) NewTask(id, script string, tt TaskType, dbrps []DBRP, snapshotInterval time.Duration, ...) (*Task, error)
- func (tm *TaskMaster) NewTemplate(id, script string, tt TaskType) (*Template, error)
- func (tm *TaskMaster) Open() (err error)
- func (tm *TaskMaster) SnapshotTask(id string) (*TaskSnapshot, error)
- func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)
- func (tm *TaskMaster) StopTask(id string) error
- func (tm *TaskMaster) StopTasks()
- func (tm *TaskMaster) Stream(name string) (StreamCollector, error)
- func (tm *TaskMaster) WriteKapacitorPoint(p edge.PointMessage) error
- func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, ...) error
- type TaskMasterLookup
- type TaskSnapshot
- type TaskType
- type Template
- type TimeDimension
- type UDFNode
- type UDFProcess
- func (p *UDFProcess) Abort(err error)
- func (p *UDFProcess) Close() error
- func (p *UDFProcess) In() chan<- edge.Message
- func (p *UDFProcess) Info() (udf.Info, error)
- func (p *UDFProcess) Init(options []*agent.Option) error
- func (p *UDFProcess) Open() error
- func (p *UDFProcess) Out() <-chan edge.Message
- func (p *UDFProcess) Restore(snapshot []byte) error
- func (p *UDFProcess) Snapshot() ([]byte, error)
- type UDFService
- type UDFSocket
- func (s *UDFSocket) Abort(err error)
- func (s *UDFSocket) Close() error
- func (s *UDFSocket) In() chan<- edge.Message
- func (s *UDFSocket) Info() (udf.Info, error)
- func (s *UDFSocket) Init(options []*agent.Option) error
- func (s *UDFSocket) Open() error
- func (s *UDFSocket) Out() <-chan edge.Message
- func (s *UDFSocket) Restore(snapshot []byte) error
- func (s *UDFSocket) Snapshot() ([]byte, error)
- type UnionNode
- type WhereNode
- type WindowNode
Constants ¶
const (
MainTaskMaster = "main"
)
Variables ¶
var ErrAborted = errors.New("edged aborted")
var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
var ErrTaskMasterOpen = errors.New("TaskMaster is open")
var ErrWrongTaskType = errors.New("wrong task type")
Functions ¶
func ConvertResultTimes ¶ added in v0.10.1
func ConvertResultTimes(r *Result)
func CreateDBRPMap ¶
func EvalPredicate ¶
func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, p edge.FieldsTagsTimeGetter) (bool, error)
EvalPredicate - Evaluate a given expression as a boolean predicate against a set of fields and tags
func ReplayBatchFromChan ¶ added in v1.0.0
func ReplayBatchFromChan(clck clock.Clock, batches []<-chan edge.BufferedBatchMessage, collectors []BatchCollector, recTime bool) <-chan error
Replay batch data from a channel source.
func ReplayBatchFromIO ¶ added in v1.0.0
func ReplayBatchFromIO(clck clock.Clock, data []io.ReadCloser, collectors []BatchCollector, recTime bool) <-chan error
Replay batch data from an IO source.
func ReplayStreamFromChan ¶ added in v1.0.0
func ReplayStreamFromChan(clck clock.Clock, points <-chan edge.PointMessage, collector StreamCollector, recTime bool) <-chan error
Replay stream data from a channel source.
func ReplayStreamFromIO ¶ added in v1.0.0
func ReplayStreamFromIO(clck clock.Clock, data io.ReadCloser, collector StreamCollector, recTime bool, precision string) <-chan error
Replay stream data from an IO source.
func WriteBatchForRecording ¶
func WriteBatchForRecording(w io.Writer, b edge.BufferedBatchMessage) error
func WritePointForRecording ¶
Types ¶
type AlertNode ¶
type AlertNode struct {
// contains filtered or unexported fields
}
type AutoscaleNode ¶ added in v1.4.0
type AutoscaleNode struct {
// contains filtered or unexported fields
}
type BarrierNode ¶ added in v1.4.0
type BarrierNode struct {
// contains filtered or unexported fields
}
type BatchCollector ¶
type BatchCollector interface { CollectBatch(edge.BufferedBatchMessage) error Close() error }
type BatchNode ¶
type BatchNode struct {
// contains filtered or unexported fields
}
type BatchQueries ¶ added in v1.0.0
type ChangeDetectNode ¶ added in v1.5.0
type ChangeDetectNode struct {
// contains filtered or unexported fields
}
type CombineNode ¶ added in v1.0.0
type CombineNode struct {
// contains filtered or unexported fields
}
type DefaultNode ¶ added in v0.13.0
type DefaultNode struct {
// contains filtered or unexported fields
}
func (*DefaultNode) Barrier ¶ added in v1.4.0
func (n *DefaultNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
func (*DefaultNode) BatchPoint ¶ added in v1.4.0
func (n *DefaultNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
func (*DefaultNode) BeginBatch ¶ added in v1.4.0
func (n *DefaultNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
func (*DefaultNode) DeleteGroup ¶ added in v1.4.0
func (n *DefaultNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
func (*DefaultNode) Done ¶ added in v1.5.0
func (n *DefaultNode) Done()
func (*DefaultNode) EndBatch ¶ added in v1.4.0
func (n *DefaultNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
func (*DefaultNode) Point ¶ added in v1.4.0
func (n *DefaultNode) Point(p edge.PointMessage) (edge.Message, error)
type DeleteNode ¶ added in v1.0.0
type DeleteNode struct {
// contains filtered or unexported fields
}
func (*DeleteNode) Barrier ¶ added in v1.4.0
func (n *DeleteNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
func (*DeleteNode) BatchPoint ¶ added in v1.4.0
func (n *DeleteNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
func (*DeleteNode) BeginBatch ¶ added in v1.4.0
func (n *DeleteNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
func (*DeleteNode) DeleteGroup ¶ added in v1.4.0
func (n *DeleteNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
func (*DeleteNode) Done ¶ added in v1.5.0
func (n *DeleteNode) Done()
func (*DeleteNode) EndBatch ¶ added in v1.4.0
func (n *DeleteNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
func (*DeleteNode) Point ¶ added in v1.4.0
func (n *DeleteNode) Point(p edge.PointMessage) (edge.Message, error)
type DerivativeNode ¶
type DerivativeNode struct {
// contains filtered or unexported fields
}
type Diagnostic ¶ added in v1.4.0
type Diagnostic interface { WithTaskContext(task string) TaskDiagnostic WithTaskMasterContext(tm string) Diagnostic WithNodeContext(node string) NodeDiagnostic WithEdgeContext(task, parent, child string) EdgeDiagnostic TaskMasterOpened() TaskMasterClosed() StartingTask(id string) StartedTask(id string) StoppedTask(id string) StoppedTaskWithError(id string, err error) TaskMasterDot(d string) }
type EdgeDiagnostic ¶ added in v1.4.0
type EdgeDiagnostic interface {
ClosingEdge(collected, emitted int64)
}
type EvalNode ¶
type EvalNode struct {
// contains filtered or unexported fields
}
type ExecutingTask ¶
type ExecutingTask struct { Task *Task // contains filtered or unexported fields }
A task that is ready for execution.
func NewExecutingTask ¶
func NewExecutingTask(tm *TaskMaster, t *Task) (*ExecutingTask, error)
Create a new task from a defined kapacitor.
func (*ExecutingTask) BatchCount ¶
func (et *ExecutingTask) BatchCount() (int, error)
func (*ExecutingTask) BatchQueries ¶
func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([]BatchQueries, error)
Get the next `num` batch queries that the batcher will run starting at time `start`.
func (*ExecutingTask) EDot ¶ added in v0.2.3
func (et *ExecutingTask) EDot(labels bool) []byte
Return a graphviz .dot formatted byte array. Label edges with relavant execution information.
func (*ExecutingTask) ExecutionStats ¶ added in v0.11.0
func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error)
func (*ExecutingTask) GetOutput ¶
func (et *ExecutingTask) GetOutput(name string) (Output, error)
Get a named output.
func (*ExecutingTask) Snapshot ¶ added in v0.10.0
func (et *ExecutingTask) Snapshot() (*TaskSnapshot, error)
func (*ExecutingTask) StartBatching ¶
func (et *ExecutingTask) StartBatching() error
Instruct source batch node to start querying and sending batches of data
func (*ExecutingTask) StopStats ¶ added in v0.11.0
func (et *ExecutingTask) StopStats()
Stop all stats nodes
func (*ExecutingTask) Wait ¶ added in v0.11.0
func (et *ExecutingTask) Wait() error
Wait till the task finishes and return any error
type ExecutionStats ¶ added in v0.11.0
type FlattenNode ¶ added in v1.0.0
type FlattenNode struct {
// contains filtered or unexported fields
}
type FromNode ¶ added in v0.13.0
type FromNode struct {
// contains filtered or unexported fields
}
func (*FromNode) BatchPoint ¶ added in v1.4.0
func (*FromNode) BeginBatch ¶ added in v1.4.0
func (*FromNode) DeleteGroup ¶ added in v1.4.0
type GroupByNode ¶
type GroupByNode struct {
// contains filtered or unexported fields
}
func (*GroupByNode) Barrier ¶ added in v1.4.0
func (n *GroupByNode) Barrier(b edge.BarrierMessage) error
func (*GroupByNode) BatchPoint ¶ added in v1.4.0
func (n *GroupByNode) BatchPoint(bp edge.BatchPointMessage) error
func (*GroupByNode) BeginBatch ¶ added in v1.4.0
func (n *GroupByNode) BeginBatch(begin edge.BeginBatchMessage) error
func (*GroupByNode) DeleteGroup ¶ added in v1.4.0
func (n *GroupByNode) DeleteGroup(d edge.DeleteGroupMessage) error
func (*GroupByNode) Done ¶ added in v1.5.0
func (n *GroupByNode) Done()
func (*GroupByNode) EndBatch ¶ added in v1.4.0
func (n *GroupByNode) EndBatch(end edge.EndBatchMessage) error
func (*GroupByNode) Point ¶ added in v1.4.0
func (n *GroupByNode) Point(p edge.PointMessage) error
type HTTPOutNode ¶
type HTTPOutNode struct {
// contains filtered or unexported fields
}
func (*HTTPOutNode) Endpoint ¶
func (n *HTTPOutNode) Endpoint() string
type HTTPPostNode ¶ added in v1.3.0
type HTTPPostNode struct {
// contains filtered or unexported fields
}
type InfluxDBOutNode ¶
type InfluxDBOutNode struct {
// contains filtered or unexported fields
}
func (*InfluxDBOutNode) Barrier ¶ added in v1.4.0
func (n *InfluxDBOutNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
func (*InfluxDBOutNode) BatchPoint ¶ added in v1.4.0
func (n *InfluxDBOutNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
func (*InfluxDBOutNode) BeginBatch ¶ added in v1.4.0
func (n *InfluxDBOutNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
func (*InfluxDBOutNode) BufferedBatch ¶ added in v1.4.0
func (n *InfluxDBOutNode) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error)
func (*InfluxDBOutNode) DeleteGroup ¶ added in v1.4.0
func (n *InfluxDBOutNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
func (*InfluxDBOutNode) Done ¶ added in v1.5.0
func (n *InfluxDBOutNode) Done()
func (*InfluxDBOutNode) EndBatch ¶ added in v1.4.0
func (n *InfluxDBOutNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
func (*InfluxDBOutNode) Point ¶ added in v1.4.0
func (n *InfluxDBOutNode) Point(p edge.PointMessage) (edge.Message, error)
type InfluxQLNode ¶ added in v0.11.0
type InfluxQLNode struct {
// contains filtered or unexported fields
}
type JoinNode ¶
type JoinNode struct {
// contains filtered or unexported fields
}
func (*JoinNode) Barrier ¶ added in v1.4.0
func (n *JoinNode) Barrier(src int, b edge.BarrierMessage) error
func (*JoinNode) BufferedBatch ¶ added in v1.4.0
func (n *JoinNode) BufferedBatch(src int, batch edge.BufferedBatchMessage) error
type KapacitorLoopbackNode ¶ added in v1.3.0
type KapacitorLoopbackNode struct {
// contains filtered or unexported fields
}
func (*KapacitorLoopbackNode) Barrier ¶ added in v1.4.0
func (n *KapacitorLoopbackNode) Barrier(edge.BarrierMessage) error
func (*KapacitorLoopbackNode) BatchPoint ¶ added in v1.4.0
func (n *KapacitorLoopbackNode) BatchPoint(bp edge.BatchPointMessage) error
func (*KapacitorLoopbackNode) BeginBatch ¶ added in v1.4.0
func (n *KapacitorLoopbackNode) BeginBatch(begin edge.BeginBatchMessage) error
func (*KapacitorLoopbackNode) DeleteGroup ¶ added in v1.4.0
func (n *KapacitorLoopbackNode) DeleteGroup(edge.DeleteGroupMessage) error
func (*KapacitorLoopbackNode) Done ¶ added in v1.5.0
func (n *KapacitorLoopbackNode) Done()
func (*KapacitorLoopbackNode) EndBatch ¶ added in v1.4.0
func (n *KapacitorLoopbackNode) EndBatch(edge.EndBatchMessage) error
func (*KapacitorLoopbackNode) Point ¶ added in v1.4.0
func (n *KapacitorLoopbackNode) Point(p edge.PointMessage) error
type LogNode ¶ added in v0.11.0
type LogNode struct {
// contains filtered or unexported fields
}
func (*LogNode) BatchPoint ¶ added in v1.4.0
func (*LogNode) BeginBatch ¶ added in v1.4.0
func (*LogNode) BufferedBatch ¶ added in v1.4.0
func (*LogNode) DeleteGroup ¶ added in v1.4.0
type MaxDuration ¶ added in v0.11.0
type MaxDuration struct {
// contains filtered or unexported fields
}
MaxDuration is a 64-bit int variable representing a duration in nanoseconds,that satisfies the expvar.Var interface. When setting a value it will only be set if it is greater than the current value.
func (*MaxDuration) IntValue ¶ added in v0.11.0
func (v *MaxDuration) IntValue() int64
func (*MaxDuration) Set ¶ added in v0.11.0
func (v *MaxDuration) Set(next int64)
Set sets value if it is greater than current value. If set was successful and a setter exists, will pass on value to setter.
func (*MaxDuration) String ¶ added in v0.11.0
func (v *MaxDuration) String() string
func (*MaxDuration) StringValue ¶ added in v1.0.0
func (v *MaxDuration) StringValue() string
type NoOpNode ¶ added in v0.11.0
type NoOpNode struct {
// contains filtered or unexported fields
}
type Node ¶
type Node interface { pipeline.Node // wait for the node to finish processing and return any errors Wait() error // contains filtered or unexported methods }
A node that can be in an executor.
type NodeDiagnostic ¶ added in v1.4.0
type NodeDiagnostic interface { Error(msg string, err error, ctx ...keyvalue.T) // AlertNode AlertTriggered(level alert.Level, id string, message string, rows *models.Row) // AutoscaleNode SettingReplicas(new int, old int, id string) // QueryNode StartingBatchQuery(q string) // LogNode LogPointData(key, prefix string, data edge.PointMessage) LogBatchData(key, prefix string, data edge.BufferedBatchMessage) //UDF UDFLog(s string) }
type NoopMetaClient ¶ added in v1.0.0
type NoopMetaClient struct{}
func (*NoopMetaClient) Authenticate ¶ added in v1.0.0
func (m *NoopMetaClient) Authenticate(username, password string) (ui *meta.UserInfo, err error)
func (*NoopMetaClient) CreateDatabase ¶ added in v1.0.0
func (m *NoopMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error)
func (*NoopMetaClient) CreateDatabaseWithRetentionPolicy ¶ added in v1.0.0
func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
func (*NoopMetaClient) CreateRetentionPolicy ¶ added in v1.0.0
func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
func (*NoopMetaClient) Database ¶ added in v1.0.0
func (m *NoopMetaClient) Database(name string) *meta.DatabaseInfo
func (*NoopMetaClient) RetentionPolicy ¶ added in v1.0.0
func (m *NoopMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
func (*NoopMetaClient) Users ¶ added in v1.0.0
func (m *NoopMetaClient) Users() ([]meta.UserInfo, error)
func (*NoopMetaClient) WaitForLeader ¶ added in v1.0.0
func (m *NoopMetaClient) WaitForLeader(d time.Duration) error
type Output ¶
type Output interface {
Endpoint() string
}
An output of a pipeline. Still need to improve this interface to expose different types of outputs.
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
func (*Query) AlignGroup ¶ added in v1.2.0
func (q *Query) AlignGroup()
func (*Query) Dimensions ¶
Set the dimensions on the query
func (*Query) Fill ¶
func (q *Query) Fill(option influxql.FillOption, value interface{})
func (*Query) IsGroupedByTime ¶ added in v1.0.1
func (*Query) SetStartTime ¶ added in v1.0.1
Set the start time of the query
func (*Query) SetStopTime ¶ added in v1.0.1
Set the stop time of the query
type QueryNode ¶ added in v0.13.0
type QueryNode struct {
// contains filtered or unexported fields
}
func (*QueryNode) DBRPs ¶ added in v0.13.0
Return list of databases and retention policies the batcher will query.
func (*QueryNode) GroupByMeasurement ¶ added in v1.0.0
type Result ¶
The result from an output.
func ResultFromJSON ¶
Unmarshal a Result object from JSON.
type SampleNode ¶
type SampleNode struct {
// contains filtered or unexported fields
}
type ShiftNode ¶ added in v0.11.0
type ShiftNode struct {
// contains filtered or unexported fields
}
func (*ShiftNode) BatchPoint ¶ added in v1.4.0
func (*ShiftNode) BeginBatch ¶ added in v1.4.0
func (*ShiftNode) DeleteGroup ¶ added in v1.4.0
type SideloadNode ¶ added in v1.4.0
type SideloadNode struct {
// contains filtered or unexported fields
}
func (*SideloadNode) Barrier ¶ added in v1.4.0
func (n *SideloadNode) Barrier(b edge.BarrierMessage) (edge.Message, error)
func (*SideloadNode) BatchPoint ¶ added in v1.4.0
func (n *SideloadNode) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error)
func (*SideloadNode) BeginBatch ¶ added in v1.4.0
func (n *SideloadNode) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error)
func (*SideloadNode) DeleteGroup ¶ added in v1.4.0
func (n *SideloadNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error)
func (*SideloadNode) Done ¶ added in v1.5.0
func (n *SideloadNode) Done()
func (*SideloadNode) EndBatch ¶ added in v1.4.0
func (n *SideloadNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error)
func (*SideloadNode) Point ¶ added in v1.4.0
func (n *SideloadNode) Point(p edge.PointMessage) (edge.Message, error)
type Socket ¶ added in v0.13.0
func NewSocketConn ¶ added in v1.0.0
type StateTrackingNode ¶ added in v1.3.0
type StateTrackingNode struct {
// contains filtered or unexported fields
}
type StatsNode ¶ added in v0.10.0
type StatsNode struct {
// contains filtered or unexported fields
}
type StreamCollector ¶
type StreamCollector interface { CollectPoint(edge.PointMessage) error Close() error }
type StreamEdge ¶ added in v1.4.0
type StreamEdge interface { CollectPoint(edge.PointMessage) error EmitPoint() (edge.PointMessage, bool) Close() error }
type StreamNode ¶
type StreamNode struct {
// contains filtered or unexported fields
}
type Task ¶
type Task struct { ID string Pipeline *pipeline.Pipeline Type TaskType DBRPs []DBRP SnapshotInterval time.Duration }
The complete definition of a task, its id, pipeline and type.
func (*Task) Measurements ¶ added in v0.13.0
returns all the measurements from a FromNode
type TaskDiagnostic ¶ added in v1.4.0
type TaskMaster ¶
type TaskMaster struct { ServerInfo vars.Infoer HTTPDService interface { AddRoutes([]httpd.Route) error DelRoutes([]httpd.Route) URL() string } TaskStore interface { SaveSnapshot(id string, snapshot *TaskSnapshot) error HasSnapshot(id string) bool LoadSnapshot(id string) (*TaskSnapshot, error) } DeadmanService pipeline.DeadmanService UDFService UDFService AlertService interface { alertservice.AnonHandlerRegistrar alertservice.Events alertservice.TopicPersister alertservice.InhibitorLookup } InfluxDBService interface { NewNamedClient(name string) (influxdb.Client, error) } SMTPService interface { Global() bool StateChangesOnly() bool Handler(smtp.HandlerConfig, ...keyvalue.T) alert.Handler } MQTTService interface { Handler(mqtt.HandlerConfig, ...keyvalue.T) alert.Handler } OpsGenieService interface { Global() bool Handler(opsgenie.HandlerConfig, ...keyvalue.T) alert.Handler } OpsGenie2Service interface { Global() bool Handler(opsgenie2.HandlerConfig, ...keyvalue.T) alert.Handler } VictorOpsService interface { Global() bool Handler(victorops.HandlerConfig, ...keyvalue.T) alert.Handler } PagerDutyService interface { Global() bool Handler(pagerduty.HandlerConfig, ...keyvalue.T) alert.Handler } PagerDuty2Service interface { Global() bool Handler(pagerduty2.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } PushoverService interface { Handler(pushover.HandlerConfig, ...keyvalue.T) alert.Handler } HTTPPostService interface { Handler(httppost.HandlerConfig, ...keyvalue.T) alert.Handler Endpoint(string) (*httppost.Endpoint, bool) } SlackService interface { Global() bool StateChangesOnly() bool Handler(slack.HandlerConfig, ...keyvalue.T) alert.Handler } SNMPTrapService interface { Handler(snmptrap.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } TelegramService interface { Global() bool StateChangesOnly() bool Handler(telegram.HandlerConfig, ...keyvalue.T) alert.Handler } HipChatService interface { Global() bool StateChangesOnly() bool Handler(hipchat.HandlerConfig, ...keyvalue.T) alert.Handler } KafkaService interface { Handler(kafka.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } AlertaService interface { DefaultHandlerConfig() alerta.HandlerConfig Handler(alerta.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } SensuService interface { Handler(sensu.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } TalkService interface { Handler(...keyvalue.T) alert.Handler } TimingService interface { NewTimer(timer.Setter) timer.Timer } K8sService interface { Client(string) (k8s.Client, error) } SwarmService interface { Client(string) (swarm.Client, error) } EC2Service interface { Client(string) (ec2.Client, error) } SideloadService interface { Source(dir string) (sideload.Source, error) } Commander command.Commander DefaultRetentionPolicy string // contains filtered or unexported fields }
An execution framework for a set of tasks.
func NewTaskMaster ¶
func NewTaskMaster(id string, info vars.Infoer, d Diagnostic) *TaskMaster
Create a new Executor with a given clock.
func (*TaskMaster) BatchCollectors ¶
func (tm *TaskMaster) BatchCollectors(id string) []BatchCollector
func (*TaskMaster) Close ¶
func (tm *TaskMaster) Close() error
func (*TaskMaster) CreateTICKScope ¶ added in v0.10.0
func (tm *TaskMaster) CreateTICKScope() *stateful.Scope
func (*TaskMaster) DelFork ¶
func (tm *TaskMaster) DelFork(id string)
func (*TaskMaster) DeleteTask ¶ added in v1.2.0
func (tm *TaskMaster) DeleteTask(id string) error
func (*TaskMaster) Drain ¶ added in v0.2.1
func (tm *TaskMaster) Drain()
func (*TaskMaster) ExecutingDot ¶ added in v0.2.3
func (tm *TaskMaster) ExecutingDot(id string, labels bool) string
func (*TaskMaster) ExecutionStats ¶ added in v0.11.0
func (tm *TaskMaster) ExecutionStats(id string) (ExecutionStats, error)
func (*TaskMaster) ID ¶ added in v1.2.0
func (tm *TaskMaster) ID() string
func (*TaskMaster) IsExecuting ¶ added in v0.2.1
func (tm *TaskMaster) IsExecuting(id string) bool
func (*TaskMaster) New ¶
func (tm *TaskMaster) New(id string) *TaskMaster
Returns a new TaskMaster instance with the same services as the current one.
func (*TaskMaster) NewTask ¶ added in v0.10.0
func (tm *TaskMaster) NewTask( id, script string, tt TaskType, dbrps []DBRP, snapshotInterval time.Duration, vars map[string]tick.Var, ) (*Task, error)
Create a new task in the context of a TaskMaster
func (*TaskMaster) NewTemplate ¶ added in v1.0.0
func (tm *TaskMaster) NewTemplate( id, script string, tt TaskType, ) (*Template, error)
Create a new template in the context of a TaskMaster
func (*TaskMaster) Open ¶
func (tm *TaskMaster) Open() (err error)
func (*TaskMaster) SnapshotTask ¶ added in v0.10.0
func (tm *TaskMaster) SnapshotTask(id string) (*TaskSnapshot, error)
func (*TaskMaster) StartTask ¶
func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)
func (*TaskMaster) StopTask ¶
func (tm *TaskMaster) StopTask(id string) error
func (*TaskMaster) StopTasks ¶ added in v0.10.1
func (tm *TaskMaster) StopTasks()
func (*TaskMaster) Stream ¶
func (tm *TaskMaster) Stream(name string) (StreamCollector, error)
func (*TaskMaster) WriteKapacitorPoint ¶ added in v1.3.0
func (tm *TaskMaster) WriteKapacitorPoint(p edge.PointMessage) error
func (*TaskMaster) WritePoints ¶ added in v0.2.1
func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, points []imodels.Point) error
type TaskMasterLookup ¶ added in v1.0.0
func NewTaskMasterLookup ¶ added in v1.0.0
func NewTaskMasterLookup() *TaskMasterLookup
func (*TaskMasterLookup) Delete ¶ added in v1.0.0
func (tml *TaskMasterLookup) Delete(tm *TaskMaster)
func (*TaskMasterLookup) Get ¶ added in v1.0.0
func (tml *TaskMasterLookup) Get(id string) *TaskMaster
func (*TaskMasterLookup) Main ¶ added in v1.0.0
func (tml *TaskMasterLookup) Main() *TaskMaster
func (*TaskMasterLookup) Set ¶ added in v1.0.0
func (tml *TaskMasterLookup) Set(tm *TaskMaster)
type TaskSnapshot ¶ added in v0.10.0
type TaskType ¶
type TaskType int
The type of a task
func (TaskType) MarshalText ¶ added in v0.13.0
func (*TaskType) UnmarshalText ¶ added in v0.13.0
type TimeDimension ¶ added in v1.0.0
type UDFNode ¶ added in v0.10.0
type UDFNode struct {
// contains filtered or unexported fields
}
User defined function
type UDFProcess ¶ added in v0.10.0
type UDFProcess struct {
// contains filtered or unexported fields
}
UDFProcess wraps an external process and sends and receives data over STDIN and STDOUT. Lines received over STDERR are logged via normal Kapacitor logging.
func NewUDFProcess ¶ added in v0.10.0
func NewUDFProcess( taskName, nodeName string, commander command.Commander, cmdSpec command.Spec, d udf.Diagnostic, timeout time.Duration, abortCallback func(), ) *UDFProcess
func (*UDFProcess) Abort ¶ added in v0.10.0
func (p *UDFProcess) Abort(err error)
func (*UDFProcess) Close ¶ added in v0.13.0
func (p *UDFProcess) Close() error
Stop the UDFProcess cleanly.
Calling Close should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.
func (*UDFProcess) In ¶ added in v1.4.0
func (p *UDFProcess) In() chan<- edge.Message
func (*UDFProcess) Init ¶ added in v0.10.0
func (p *UDFProcess) Init(options []*agent.Option) error
func (*UDFProcess) Out ¶ added in v1.4.0
func (p *UDFProcess) Out() <-chan edge.Message
func (*UDFProcess) Restore ¶ added in v0.10.0
func (p *UDFProcess) Restore(snapshot []byte) error
func (*UDFProcess) Snapshot ¶ added in v0.10.0
func (p *UDFProcess) Snapshot() ([]byte, error)
type UDFService ¶ added in v0.10.0
type UDFSocket ¶ added in v0.13.0
type UDFSocket struct {
// contains filtered or unexported fields
}
func NewUDFSocket ¶ added in v0.13.0
type UnionNode ¶
type UnionNode struct {
// contains filtered or unexported fields
}
func (*UnionNode) Barrier ¶ added in v1.4.0
func (n *UnionNode) Barrier(src int, b edge.BarrierMessage) error
func (*UnionNode) BufferedBatch ¶ added in v1.4.0
func (n *UnionNode) BufferedBatch(src int, batch edge.BufferedBatchMessage) error
type WhereNode ¶
type WhereNode struct {
// contains filtered or unexported fields
}
type WindowNode ¶
type WindowNode struct {
// contains filtered or unexported fields
}
func (*WindowNode) DeleteGroup ¶ added in v1.4.0
func (n *WindowNode) DeleteGroup(group models.GroupID)
Source Files ¶
- alert.go
- autoscale.go
- barrier.go
- batch.go
- change_detect.go
- combine.go
- default.go
- delete.go
- derivative.go
- doc.go
- edge.go
- eval.go
- expr.go
- flatten.go
- group_by.go
- http_out.go
- http_post.go
- influxdb_out.go
- influxql.gen.go
- influxql.go
- join.go
- kapacitor_loopback.go
- log.go
- metaclient.go
- node.go
- noop.go
- output.go
- query.go
- replay.go
- result.go
- sample.go
- shift.go
- sideload.go
- state_tracking.go
- stats.go
- stream.go
- task.go
- task_master.go
- template.go
- udf.go
- union.go
- where.go
- window.go
Directories ¶
Path | Synopsis |
---|---|
Alert provides a framework for tracking events in a publish subscribe system.
|
Alert provides a framework for tracking events in a publish subscribe system. |
client
|
|
v1
Kapacitor HTTP API client written in Go
|
Kapacitor HTTP API client written in Go |
A clock that provides blocking calls that wait until absolute times have occurred.
|
A clock that provides blocking calls that wait until absolute times have occurred. |
cmd
|
|
Package edge provides mechanisms for message passing along edges.
|
Package edge provides mechanisms for message passing along edges. |
This package is a fork of the golang expvar expvar.Var types.
|
This package is a fork of the golang expvar expvar.Var types. |
Contains integration and end-to-end tests
|
Contains integration and end-to-end tests |
Provides a set of structures for passing data around Kapacitor.
|
Provides a set of structures for passing data around Kapacitor. |
Provides an API for constructing data processing pipelines.
|
Provides an API for constructing data processing pipelines. |
Provides a server type for starting and configuring a Kapacitor server.
|
Provides a server type for starting and configuring a Kapacitor server. |
services
|
|
alert
Alert provides an implementation of the HTTP API for managing alert topics, handlers and events.
|
Alert provides an implementation of the HTTP API for managing alert topics, handlers and events. |
config/override
Overrider provides an API for overriding and reading redacted values for a configuration object.
|
Overrider provides an API for overriding and reading redacted values for a configuration object. |
httpd
Provides an HTTP API exposing many components of Kapacitor.
|
Provides an HTTP API exposing many components of Kapacitor. |
reporting
Sends anonymous reports to InfluxData
|
Sends anonymous reports to InfluxData |
stats
The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retetion policy.
|
The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retetion policy. |
storage
The storage package provides a key/value based interface for storing Kapacitor metadata.
|
The storage package provides a key/value based interface for storing Kapacitor metadata. |
TICKscript is a simple invocation chaining DSL.
|
TICKscript is a simple invocation chaining DSL. |
cmd/tickdoc
Tickdoc is a simple utility similar to godoc that generates documentation from comments.
|
Tickdoc is a simple utility similar to godoc that generates documentation from comments. |
agent
Package agent is a generated protocol buffer package.
|
Package agent is a generated protocol buffer package. |
Package uuid generates and parses UUIDs.
|
Package uuid generates and parses UUIDs. |