kapacitor

package module
v1.1.1
Published: Dec 2, 2016 License: MIT Imports: 44 Imported by: 0

README

Kapacitor

Open source framework for processing, monitoring, and alerting on time series data

Installation

Kapacitor has two binaries:

  • kapacitor – a CLI program for calling the Kapacitor API.
  • kapacitord – the Kapacitor server daemon.

You can either download the binaries directly from the downloads page or go get them:

go get github.com/influxdata/kapacitor/cmd/kapacitor
go get github.com/influxdata/kapacitor/cmd/kapacitord

Configuration

An example configuration file can be found here

Kapacitor can also provide an example config for you using this command:

kapacitord config

Getting Started

This README gives you a high-level overview of what Kapacitor is, what it's like to use it, and some details of how it works. To get started using Kapacitor see this guide. After you finish the getting started exercise you can check out the TICKscripts for different Telegraf plugins.

Basic Example

Kapacitor uses a DSL named TICKscript to define tasks.

A simple TICKscript that alerts on high CPU usage looks like this:

stream
    |from()
        .measurement('cpu_usage_idle')
        .groupBy('host')
    |window()
        .period(1m)
        .every(1m)
    |mean('value')
    |eval(lambda: 100.0 - "mean")
        .as('used')
    |alert()
        .message('{{ .Level}}: {{ .Name }}/{{ index .Tags "host" }} has high cpu usage: {{ index .Fields "used" }}')
        .warn(lambda: "used" > 70.0)
        .crit(lambda: "used" > 85.0)

        // Send alert to handler of choice.

        // Slack
        .slack()
        .channel('#alerts')

        // VictorOps
        .victorOps()
        .routingKey('team_rocket')

        // PagerDuty
        .pagerDuty()

Place the above script into a file named cpu_alert.tick, then run these commands to start the task:

# Define the task (assumes cpu data is in db 'telegraf')
kapacitor define \
    cpu_alert \
    -type stream \
    -dbrp telegraf.default \
    -tick ./cpu_alert.tick
# Start the task
kapacitor enable cpu_alert

For more complete examples see the documentation.

Documentation

Overview

A data pipeline processing engine.

See the README for more complete examples and guides.

Code Organization:

The pipeline package provides an API for how nodes can be connected to form a pipeline. The individual implementations of each node exist in this kapacitor package. The reason for the separation is to keep the exported API from the pipeline package clean as it is consumed via the TICKscripts (a DSL for Kapacitor).

Other Concepts:

Stream vs Batch -- Use of the word 'stream' indicates data arrives a single data point at a time. Use of the word 'batch' indicates data arrives in sets or batches of data points.

Task -- A task represents a concrete workload to perform. It consists of a pipeline and an identifying name. Basic CRUD operations can be performed on tasks.

Task Master -- Responsible for executing a task in a specific environment.

Replay -- Replays static datasets against tasks.

Index

Constants

const (
	// List of names for top-level exported vars
	ClusterIDVarName = "cluster_id"
	ServerIDVarName  = "server_id"
	HostVarName      = "host"
	ProductVarName   = "product"
	VersionVarName   = "version"

	NumTasksVarName         = "num_tasks"
	NumEnabledTasksVarName  = "num_enabled_tasks"
	NumSubscriptionsVarName = "num_subscriptions"

	UptimeVarName = "uptime"

	// The name of the product
	Product = "kapacitor"
)
const (
	MainTaskMaster = "main"
)

Variables

var (
	// Global expvars
	NumTasksVar         = &kexpvar.Int{}
	NumEnabledTasksVar  = &kexpvar.Int{}
	NumSubscriptionsVar = kexpvar.NewIntSum()

	ClusterIDVar = &kexpvar.String{}
	ServerIDVar  = &kexpvar.String{}
	HostVar      = &kexpvar.String{}
	ProductVar   = &kexpvar.String{}
	VersionVar   = &kexpvar.String{}
)
var ErrAborted = errors.New("edged aborted")
var ErrEmptyEmit = errors.New("error call to emit produced no results")
var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
var ErrTaskMasterOpen = errors.New("TaskMaster is open")
var ErrWrongTaskType = errors.New("wrong task type")

Functions

func ConvertResultTimes added in v0.10.1

func ConvertResultTimes(r *Result)

func CreateDBRPMap

func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool

func DeleteStatistics added in v0.11.0

func DeleteStatistics(key string)

Remove a statistics map.

func EvalPredicate

func EvalPredicate(se stateful.Expression, scopePool stateful.ScopePool, now time.Time, fields models.Fields, tags models.Tags) (bool, error)

EvalPredicate - Evaluate a given expression as a boolean predicate against a set of fields and tags

func NewStatistics

func NewStatistics(name string, tags map[string]string) (string, *kexpvar.Map)

NewStatistics creates an expvar-based map. Within the map, "name" is the measurement name, "tags" are the tags, and values are placed under the key "values". The "values" map is returned so that statistics can be set.
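
For example, a node might register its own statistics and update them as data flows through. This is a minimal sketch; it assumes the kexpvar fork (the expvar package in this repo) mirrors the standard expvar Map.Set and Int.Add APIs, and the measurement name and tags are placeholders:

key, values := kapacitor.NewStatistics(
	"node_stats",                           // measurement name (placeholder)
	map[string]string{"task": "cpu_alert"}, // tags (placeholder)
)
pointsReceived := &kexpvar.Int{}
values.Set("points_received", pointsReceived)

pointsReceived.Add(1) // update as points are processed

// Drop the map again when the node stops.
kapacitor.DeleteStatistics(key)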

func ReplayBatchFromChan added in v1.0.0

func ReplayBatchFromChan(clck clock.Clock, batches []<-chan models.Batch, collectors []BatchCollector, recTime bool) <-chan error

Replay batch data from a channel source.

func ReplayBatchFromIO added in v1.0.0

func ReplayBatchFromIO(clck clock.Clock, data []io.ReadCloser, collectors []BatchCollector, recTime bool) <-chan error

Replay batch data from an IO source.

func ReplayStreamFromChan added in v1.0.0

func ReplayStreamFromChan(clck clock.Clock, points <-chan models.Point, collector StreamCollector, recTime bool) <-chan error

Replay stream data from a channel source.

func ReplayStreamFromIO added in v1.0.0

func ReplayStreamFromIO(clck clock.Clock, data io.ReadCloser, collector StreamCollector, recTime bool, precision string) <-chan error

Replay stream data from an IO source.
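
For instance, a recorded stream can be replayed into a collector obtained from a TaskMaster. A rough sketch, assuming an open TaskMaster tm (see NewTaskMaster below), a placeholder recording file, and that clock.New from the clock package creates a clock starting at the given time:

f, err := os.Open("./cpu.srpl") // placeholder recording file
if err != nil {
	log.Fatal(err)
}
stream, err := tm.Stream("replay")
if err != nil {
	log.Fatal(err)
}
// Replay as fast as possible rather than at the recorded pace.
errC := kapacitor.ReplayStreamFromIO(clock.New(time.Now()), f, stream, false, "s")
if err := <-errC; err != nil {
	log.Fatal(err)
}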

func Uptime added in v0.2.0

func Uptime() time.Duration

func WriteBatchForRecording

func WriteBatchForRecording(w io.Writer, b models.Batch) error

func WritePointForRecording

func WritePointForRecording(w io.Writer, p models.Point, precision string) error

Types

type AlertData

type AlertData struct {
	ID       string          `json:"id"`
	Message  string          `json:"message"`
	Details  string          `json:"details"`
	Time     time.Time       `json:"time"`
	Duration time.Duration   `json:"duration"`
	Level    AlertLevel      `json:"level"`
	Data     influxql.Result `json:"data"`
	// contains filtered or unexported fields
}

type AlertHandler

type AlertHandler func(ad *AlertData)
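
For example, a handler that simply logs each alert, using the AlertData fields above:

func logAlerts(ad *kapacitor.AlertData) {
	log.Printf("%v %s: %s", ad.Level, ad.ID, ad.Message)
}

var handler kapacitor.AlertHandler = logAlerts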

type AlertLevel

type AlertLevel int
const (
	OKAlert AlertLevel = iota
	InfoAlert
	WarnAlert
	CritAlert
)

func (AlertLevel) MarshalText

func (l AlertLevel) MarshalText() ([]byte, error)

func (AlertLevel) String

func (l AlertLevel) String() string

func (*AlertLevel) UnmarshalText added in v0.10.1

func (l *AlertLevel) UnmarshalText(text []byte) error

type AlertNode

type AlertNode struct {
	// contains filtered or unexported fields
}

func (*AlertNode) Wait added in v0.11.0

func (n *AlertNode) Wait() error

type BatchCollector

type BatchCollector interface {
	CollectBatch(models.Batch) error
	Close()
}

type BatchNode

type BatchNode struct {
	// contains filtered or unexported fields
}

func (*BatchNode) Abort added in v0.10.1

func (s *BatchNode) Abort()

func (*BatchNode) Count added in v0.13.0

func (s *BatchNode) Count() int

func (*BatchNode) DBRPs

func (s *BatchNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*BatchNode) Queries

func (s *BatchNode) Queries(start, stop time.Time) ([]BatchQueries, error)

func (*BatchNode) Start

func (s *BatchNode) Start()

func (*BatchNode) Wait added in v0.11.0

func (s *BatchNode) Wait() error

type BatchQueries added in v1.0.0

type BatchQueries struct {
	Queries            []*Query
	Cluster            string
	GroupByMeasurement bool
}

type CombineNode added in v1.0.0

type CombineNode struct {
	// contains filtered or unexported fields
}

func (*CombineNode) Wait added in v1.0.0

func (n *CombineNode) Wait() error

type Command added in v0.13.0

type Command interface {
	Start() error
	Wait() error

	StdinPipe() (io.WriteCloser, error)
	StdoutPipe() (io.Reader, error)
	StderrPipe() (io.Reader, error)

	Kill()
}

type CommandInfo added in v0.13.0

type CommandInfo struct {
	Prog string
	Args []string
	Env  []string
}

Necessary information to create a new command

func (CommandInfo) NewCommand added in v0.13.0

func (ci CommandInfo) NewCommand() Command

Create a new Command using the Go os/exec package and the information in the CommandInfo.
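
A brief sketch of launching a process this way; the program and arguments are placeholders for a hypothetical UDF executable:

ci := kapacitor.CommandInfo{
	Prog: "python2",
	Args: []string{"-u", "./my_udf.py"},
}
cmd := ci.NewCommand() // CommandInfo satisfies the Commander interface
if err := cmd.Start(); err != nil {
	log.Fatal(err)
}
if err := cmd.Wait(); err != nil {
	log.Fatal(err)
}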

type Commander added in v0.13.0

type Commander interface {
	NewCommand() Command
}

type DBRP

type DBRP struct {
	Database        string `json:"db"`
	RetentionPolicy string `json:"rp"`
}

func (DBRP) String

func (d DBRP) String() string
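
A quick sketch of the String form:

dbrp := kapacitor.DBRP{Database: "telegraf", RetentionPolicy: "default"}
fmt.Println(dbrp) // the quoted pair, e.g. "telegraf"."default"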

type DefaultNode added in v0.13.0

type DefaultNode struct {
	// contains filtered or unexported fields
}

func (*DefaultNode) Wait added in v0.13.0

func (n *DefaultNode) Wait() error

type DeleteNode added in v1.0.0

type DeleteNode struct {
	// contains filtered or unexported fields
}

func (*DeleteNode) Wait added in v1.0.0

func (n *DeleteNode) Wait() error

type DerivativeNode

type DerivativeNode struct {
	// contains filtered or unexported fields
}

func (*DerivativeNode) Wait added in v0.11.0

func (n *DerivativeNode) Wait() error

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

func (*Edge) Abort added in v0.2.1

func (e *Edge) Abort()

Abort all next and collect calls. Items in flight may or may not be processed.

func (*Edge) Close

func (e *Edge) Close()

Close the edge. This can only be called after all collect calls to the edge have finished.

func (*Edge) CollectBatch

func (e *Edge) CollectBatch(b models.Batch) error

func (*Edge) CollectPoint

func (e *Edge) CollectPoint(p models.Point) error

func (*Edge) Next

func (e *Edge) Next() (p models.PointInterface, ok bool)

func (*Edge) NextBatch

func (e *Edge) NextBatch() (b models.Batch, ok bool)

func (*Edge) NextPoint

func (e *Edge) NextPoint() (p models.Point, ok bool)

type EvalNode

type EvalNode struct {
	// contains filtered or unexported fields
}

func (*EvalNode) Wait added in v0.11.0

func (n *EvalNode) Wait() error

type ExecutingTask

type ExecutingTask struct {
	Task *Task
	// contains filtered or unexported fields
}

A task that is ready for execution.

func NewExecutingTask

func NewExecutingTask(tm *TaskMaster, t *Task) (*ExecutingTask, error)

Create a new ExecutingTask from a defined Task.

func (*ExecutingTask) BatchCount

func (et *ExecutingTask) BatchCount() (int, error)

func (*ExecutingTask) BatchQueries

func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([]BatchQueries, error)

Get the batch queries that the batcher will run between the times `start` and `stop`.

func (*ExecutingTask) EDot added in v0.2.3

func (et *ExecutingTask) EDot(labels bool) []byte

Return a graphviz .dot formatted byte array. Label edges with relevant execution information.

func (*ExecutingTask) ExecutionStats added in v0.11.0

func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error)

func (*ExecutingTask) GetOutput

func (et *ExecutingTask) GetOutput(name string) (Output, error)

Get a named output.

func (*ExecutingTask) Snapshot added in v0.10.0

func (et *ExecutingTask) Snapshot() (*TaskSnapshot, error)

func (*ExecutingTask) StartBatching

func (et *ExecutingTask) StartBatching() error

Instruct the source batch node to start querying and sending batches of data.

func (*ExecutingTask) StopStats added in v0.11.0

func (et *ExecutingTask) StopStats()

Stop all stats nodes

func (*ExecutingTask) Wait added in v0.11.0

func (et *ExecutingTask) Wait() error

Wait until the task finishes and return any error.

type ExecutionStats added in v0.11.0

type ExecutionStats struct {
	TaskStats map[string]interface{}
	NodeStats map[string]map[string]interface{}
}

type FlattenNode added in v1.0.0

type FlattenNode struct {
	// contains filtered or unexported fields
}

func (*FlattenNode) Wait added in v1.0.0

func (n *FlattenNode) Wait() error

type FromNode added in v0.13.0

type FromNode struct {
	// contains filtered or unexported fields
}

func (*FromNode) Wait added in v0.13.0

func (n *FromNode) Wait() error

type GroupByNode

type GroupByNode struct {
	// contains filtered or unexported fields
}

func (*GroupByNode) Wait added in v0.11.0

func (n *GroupByNode) Wait() error

type HTTPOutNode

type HTTPOutNode struct {
	// contains filtered or unexported fields
}

func (*HTTPOutNode) Endpoint

func (h *HTTPOutNode) Endpoint() string

func (*HTTPOutNode) Wait added in v0.11.0

func (n *HTTPOutNode) Wait() error

type InfluxDBOutNode

type InfluxDBOutNode struct {
	// contains filtered or unexported fields
}

func (*InfluxDBOutNode) Wait added in v0.11.0

func (n *InfluxDBOutNode) Wait() error

type InfluxQLNode added in v0.11.0

type InfluxQLNode struct {
	// contains filtered or unexported fields
}

func (*InfluxQLNode) Wait added in v0.11.0

func (n *InfluxQLNode) Wait() error

type JoinNode

type JoinNode struct {
	// contains filtered or unexported fields
}

func (*JoinNode) Wait added in v0.11.0

func (n *JoinNode) Wait() error

type K8sAutoscaleNode added in v1.1.0

type K8sAutoscaleNode struct {
	// contains filtered or unexported fields
}

func (*K8sAutoscaleNode) Wait added in v1.1.0

func (n *K8sAutoscaleNode) Wait() error

type LogNode added in v0.11.0

type LogNode struct {
	// contains filtered or unexported fields
}

func (*LogNode) Wait added in v0.11.0

func (n *LogNode) Wait() error

type LogService

type LogService interface {
	NewLogger(prefix string, flag int) *log.Logger
}

type MaxDuration added in v0.11.0

type MaxDuration struct {
	// contains filtered or unexported fields
}

MaxDuration is a 64-bit int variable representing a duration in nanoseconds that satisfies the expvar.Var interface. When setting a value it will only be set if it is greater than the current value.

func (*MaxDuration) IntValue added in v0.11.0

func (v *MaxDuration) IntValue() int64

func (*MaxDuration) Set added in v0.11.0

func (v *MaxDuration) Set(next int64)

Set sets the value if it is greater than the current value. If the set was successful and a setter exists, the value is passed on to the setter.

func (*MaxDuration) String added in v0.11.0

func (v *MaxDuration) String() string

func (*MaxDuration) StringValue added in v1.0.0

func (v *MaxDuration) StringValue() string
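
A short sketch of the high-water-mark behavior described above:

var d kapacitor.MaxDuration
d.Set(int64(5 * time.Second))
d.Set(int64(time.Second)) // ignored: not greater than the current value
fmt.Println(d.IntValue()) // still the 5s value, in nanoseconds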

type NoOpNode added in v0.11.0

type NoOpNode struct {
	// contains filtered or unexported fields
}

func (*NoOpNode) Wait added in v0.11.0

func (n *NoOpNode) Wait() error

type Node

type Node interface {
	pipeline.Node

	// wait for the node to finish processing and return any errors
	Wait() error
	// contains filtered or unexported methods
}

A node that can be in an executor.

type NoopMetaClient added in v1.0.0

type NoopMetaClient struct{}

func (*NoopMetaClient) Authenticate added in v1.0.0

func (m *NoopMetaClient) Authenticate(username, password string) (ui *meta.UserInfo, err error)

func (*NoopMetaClient) CreateDatabase added in v1.0.0

func (m *NoopMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error)

func (*NoopMetaClient) CreateDatabaseWithRetentionPolicy added in v1.0.0

func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)

func (*NoopMetaClient) CreateRetentionPolicy added in v1.0.0

func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)

func (*NoopMetaClient) Database added in v1.0.0

func (m *NoopMetaClient) Database(name string) *meta.DatabaseInfo

func (*NoopMetaClient) RetentionPolicy added in v1.0.0

func (m *NoopMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)

func (*NoopMetaClient) Users added in v1.0.0

func (m *NoopMetaClient) Users() ([]meta.UserInfo, error)

func (*NoopMetaClient) WaitForLeader added in v1.0.0

func (m *NoopMetaClient) WaitForLeader(d time.Duration) error

type Output

type Output interface {
	Endpoint() string
}

An output of a pipeline. Still need to improve this interface to expose different types of outputs.

type Query

type Query struct {
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery(queryString string) (*Query, error)

func (*Query) Clone added in v1.0.1

func (q *Query) Clone() (*Query, error)

Deep clone this query

func (*Query) DBRPs

func (q *Query) DBRPs() ([]DBRP, error)

Return the database/retention-policy pairs of the query.

func (*Query) Dimensions

func (q *Query) Dimensions(dims []interface{}) error

Set the dimensions on the query

func (*Query) Fill

func (q *Query) Fill(option influxql.FillOption, value interface{})

func (*Query) IsGroupedByTime added in v1.0.1

func (q *Query) IsGroupedByTime() bool

func (*Query) SetStartTime added in v1.0.1

func (q *Query) SetStartTime(s time.Time)

Set the start time of the query

func (*Query) SetStopTime added in v1.0.1

func (q *Query) SetStopTime(s time.Time)

Set the stop time of the query

func (*Query) StartTime added in v1.0.1

func (q *Query) StartTime() time.Time

Return the start time of the query.

func (*Query) StopTime added in v1.0.1

func (q *Query) StopTime() time.Time

Return the stop time of the query.

func (*Query) String

func (q *Query) String() string
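
A minimal sketch of preparing a query window, assuming the string parses as a valid SELECT statement:

q, err := kapacitor.NewQuery(`SELECT mean("usage_idle") FROM "telegraf"."default"."cpu"`)
if err != nil {
	log.Fatal(err)
}
now := time.Now()
q.SetStartTime(now.Add(-time.Minute))
q.SetStopTime(now)
fmt.Println(q.String()) // the query with its time bounds applied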

type QueryNode added in v0.13.0

type QueryNode struct {
	// contains filtered or unexported fields
}

func (*QueryNode) Abort added in v0.13.0

func (b *QueryNode) Abort()

func (*QueryNode) Cluster added in v1.0.0

func (b *QueryNode) Cluster() string

func (*QueryNode) DBRPs added in v0.13.0

func (b *QueryNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*QueryNode) GroupByMeasurement added in v1.0.0

func (b *QueryNode) GroupByMeasurement() bool

func (*QueryNode) Queries added in v0.13.0

func (b *QueryNode) Queries(start, stop time.Time) ([]*Query, error)

func (*QueryNode) Start added in v0.13.0

func (b *QueryNode) Start()

func (*QueryNode) Wait added in v0.13.0

func (n *QueryNode) Wait() error

type Result

type Result influxql.Result

The result from an output.

func ResultFromJSON

func ResultFromJSON(in io.Reader) (r Result)

Unmarshal a Result object from JSON.
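
For example, decoding a result from a JSON payload whose shape follows influxql.Result:

payload := strings.NewReader(`{"series":[{"name":"cpu","columns":["time","used"],"values":[["2016-12-02T00:00:00Z",42]]}]}`)
r := kapacitor.ResultFromJSON(payload)
fmt.Println(len(r.Series)) // 1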

type SampleNode

type SampleNode struct {
	// contains filtered or unexported fields
}

func (*SampleNode) Wait added in v0.11.0

func (n *SampleNode) Wait() error

type ShiftNode added in v0.11.0

type ShiftNode struct {
	// contains filtered or unexported fields
}

func (*ShiftNode) Wait added in v0.11.0

func (n *ShiftNode) Wait() error

type Socket added in v0.13.0

type Socket interface {
	Open() error
	Close() error
	In() io.WriteCloser
	Out() io.Reader
}

func NewSocketConn added in v1.0.0

func NewSocketConn(path string) Socket

type StatsData added in v0.2.1

type StatsData struct {
	Name   string                 `json:"name"`
	Tags   map[string]string      `json:"tags"`
	Values map[string]interface{} `json:"values"`
}

func GetStatsData

func GetStatsData() ([]StatsData, error)

Return all stats data from the expvars.

type StatsNode added in v0.10.0

type StatsNode struct {
	// contains filtered or unexported fields
}

func (*StatsNode) Wait added in v0.11.0

func (n *StatsNode) Wait() error

type StreamCollector

type StreamCollector interface {
	CollectPoint(models.Point) error
	Close()
}

type StreamNode

type StreamNode struct {
	// contains filtered or unexported fields
}

func (*StreamNode) Wait added in v0.11.0

func (n *StreamNode) Wait() error

type Task

type Task struct {
	ID               string
	Pipeline         *pipeline.Pipeline
	Type             TaskType
	DBRPs            []DBRP
	SnapshotInterval time.Duration
}

The complete definition of a task, its id, pipeline and type.

func (*Task) Dot

func (t *Task) Dot() []byte

func (*Task) Measurements added in v0.13.0

func (t *Task) Measurements() []string

Measurements returns all the measurements from a FromNode.

type TaskMaster

type TaskMaster struct {
	HTTPDService interface {
		AddRoutes([]httpd.Route) error
		DelRoutes([]httpd.Route)
		URL() string
	}
	TaskStore interface {
		SaveSnapshot(id string, snapshot *TaskSnapshot) error
		HasSnapshot(id string) bool
		LoadSnapshot(id string) (*TaskSnapshot, error)
	}
	DeadmanService pipeline.DeadmanService

	UDFService UDFService

	InfluxDBService interface {
		NewNamedClient(name string) (influxdb.Client, error)
	}
	SMTPService interface {
		Global() bool
		StateChangesOnly() bool
		SendMail(to []string, subject string, msg string) error
	}
	OpsGenieService interface {
		Global() bool
		Alert(teams []string, recipients []string, messageType, message, entityID string, t time.Time, details interface{}) error
	}
	VictorOpsService interface {
		Global() bool
		Alert(routingKey, messageType, message, entityID string, t time.Time, extra interface{}) error
	}
	PagerDutyService interface {
		Global() bool
		Alert(serviceKey, incidentKey, desc string, level AlertLevel, details interface{}) error
	}
	SlackService interface {
		Global() bool
		StateChangesOnly() bool
		Alert(channel, message, username, iconEmoji string, level AlertLevel) error
	}
	TelegramService interface {
		Global() bool
		StateChangesOnly() bool
		Alert(chatId, parseMode, message string, disableWebPagePreview, disableNotification bool) error
	}
	HipChatService interface {
		Global() bool
		StateChangesOnly() bool
		Alert(room, token, message string, level AlertLevel) error
	}
	AlertaService interface {
		Alert(token,
			resource,
			event,
			environment,
			severity,
			group,
			value,
			message,
			origin string,
			service []string,
			data interface{}) error
	}
	SensuService interface {
		Alert(name, output string, level AlertLevel) error
	}
	TalkService interface {
		Alert(title, text string) error
	}
	TimingService interface {
		NewTimer(timer.Setter) timer.Timer
	}
	K8sService interface {
		Client() (k8s.Client, error)
	}
	LogService LogService

	DefaultRetentionPolicy string
	// contains filtered or unexported fields
}

An execution framework for a set of tasks.

func NewTaskMaster

func NewTaskMaster(id string, l LogService) *TaskMaster

Create a new TaskMaster with the given id and LogService.
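
A minimal sketch of constructing and opening a TaskMaster; the stderr-backed LogService is an arbitrary choice for illustration:

type stderrLogService struct{}

func (stderrLogService) NewLogger(prefix string, flag int) *log.Logger {
	return log.New(os.Stderr, prefix, flag)
}

func main() {
	tm := kapacitor.NewTaskMaster("main", stderrLogService{})
	if err := tm.Open(); err != nil {
		log.Fatal(err)
	}
	defer tm.Close()
	// define and start tasks here, e.g. via tm.NewTask and tm.StartTask
}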

func (*TaskMaster) BatchCollectors

func (tm *TaskMaster) BatchCollectors(id string) []BatchCollector

func (*TaskMaster) Close

func (tm *TaskMaster) Close() error

func (*TaskMaster) CreateTICKScope added in v0.10.0

func (tm *TaskMaster) CreateTICKScope() *stateful.Scope

func (*TaskMaster) DelFork

func (tm *TaskMaster) DelFork(id string)

func (*TaskMaster) Drain added in v0.2.1

func (tm *TaskMaster) Drain()

func (*TaskMaster) ExecutingDot added in v0.2.3

func (tm *TaskMaster) ExecutingDot(id string, labels bool) string

func (*TaskMaster) ExecutionStats added in v0.11.0

func (tm *TaskMaster) ExecutionStats(id string) (ExecutionStats, error)

func (*TaskMaster) IsExecuting added in v0.2.1

func (tm *TaskMaster) IsExecuting(id string) bool

func (*TaskMaster) New

func (tm *TaskMaster) New(id string) *TaskMaster

Returns a new TaskMaster instance with the same services as the current one.

func (*TaskMaster) NewFork

func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP, measurements []string) (*Edge, error)

func (*TaskMaster) NewTask added in v0.10.0

func (tm *TaskMaster) NewTask(
	id,
	script string,
	tt TaskType,
	dbrps []DBRP,
	snapshotInterval time.Duration,
	vars map[string]tick.Var,
) (*Task, error)

Create a new task in the context of a TaskMaster
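
Continuing the TaskMaster sketch above, defining and starting a stream task might look like this; the id, script, and DBRP are placeholders:

script := `stream
    |from()
        .measurement('cpu_usage_idle')
    |alert()
        .crit(lambda: "value" < 30.0)
`
task, err := tm.NewTask(
	"cpu_alert",
	script,
	kapacitor.StreamTask,
	[]kapacitor.DBRP{{Database: "telegraf", RetentionPolicy: "default"}},
	0,   // no periodic snapshots
	nil, // no template vars
)
if err != nil {
	log.Fatal(err)
}
et, err := tm.StartTask(task)
if err != nil {
	log.Fatal(err)
}
defer tm.StopTask(task.ID)
_ = et // use et to inspect stats, wait for completion, etc.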

func (*TaskMaster) NewTemplate added in v1.0.0

func (tm *TaskMaster) NewTemplate(
	id,
	script string,
	tt TaskType,
) (*Template, error)

Create a new template in the context of a TaskMaster

func (*TaskMaster) Open

func (tm *TaskMaster) Open() (err error)

func (*TaskMaster) SnapshotTask added in v0.10.0

func (tm *TaskMaster) SnapshotTask(id string) (*TaskSnapshot, error)

func (*TaskMaster) StartTask

func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)

func (*TaskMaster) StopTask

func (tm *TaskMaster) StopTask(id string) error

func (*TaskMaster) StopTasks added in v0.10.1

func (tm *TaskMaster) StopTasks()

func (*TaskMaster) Stream

func (tm *TaskMaster) Stream(name string) (StreamCollector, error)
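
Points written to a named stream feed the tasks consuming it. A sketch; the models.Point field names are assumptions based on the models package:

stream, err := tm.Stream("example")
if err != nil {
	log.Fatal(err)
}
defer stream.Close()
if err := stream.CollectPoint(models.Point{
	Name:            "cpu_usage_idle",
	Database:        "telegraf",
	RetentionPolicy: "default",
	Tags:            models.Tags{"host": "server01"},
	Fields:          models.Fields{"value": 92.6},
	Time:            time.Now(),
}); err != nil {
	log.Fatal(err)
}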

func (*TaskMaster) WritePoints added in v0.2.1

func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, points []imodels.Point) error

type TaskMasterLookup added in v1.0.0

type TaskMasterLookup struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTaskMasterLookup added in v1.0.0

func NewTaskMasterLookup() *TaskMasterLookup

func (*TaskMasterLookup) Delete added in v1.0.0

func (tml *TaskMasterLookup) Delete(tm *TaskMaster)

func (*TaskMasterLookup) Get added in v1.0.0

func (tml *TaskMasterLookup) Get(id string) *TaskMaster

func (*TaskMasterLookup) Main added in v1.0.0

func (tml *TaskMasterLookup) Main() *TaskMaster

func (*TaskMasterLookup) Set added in v1.0.0

func (tml *TaskMasterLookup) Set(tm *TaskMaster)

type TaskSnapshot added in v0.10.0

type TaskSnapshot struct {
	NodeSnapshots map[string][]byte
}

type TaskType

type TaskType int

The type of a task

const (
	StreamTask TaskType = iota
	BatchTask
)

func (TaskType) MarshalText added in v0.13.0

func (t TaskType) MarshalText() ([]byte, error)

func (TaskType) String

func (t TaskType) String() string

func (*TaskType) UnmarshalText added in v0.13.0

func (t *TaskType) UnmarshalText(text []byte) error

type Template added in v1.0.0

type Template struct {
	// contains filtered or unexported fields
}

func (*Template) Dot added in v1.0.0

func (t *Template) Dot() string

func (*Template) Vars added in v1.0.0

func (t *Template) Vars() map[string]tick.Var

type TimeDimension added in v1.0.0

type TimeDimension struct {
	Length time.Duration
	Offset time.Duration
}

type UDFNode added in v0.10.0

type UDFNode struct {
	// contains filtered or unexported fields
}

User defined function

func (*UDFNode) Wait added in v0.11.0

func (n *UDFNode) Wait() error

type UDFProcess added in v0.10.0

type UDFProcess struct {
	// contains filtered or unexported fields
}

UDFProcess wraps an external process and sends and receives data over STDIN and STDOUT. Lines received over STDERR are logged via normal Kapacitor logging.

func NewUDFProcess added in v0.10.0

func NewUDFProcess(
	commander Commander,
	l *log.Logger,
	timeout time.Duration,
	abortCallback func(),
) *UDFProcess
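
A sketch tying this to the CommandInfo example above; the logger, timeout, and abort callback are placeholder choices:

ci := kapacitor.CommandInfo{Prog: "python2", Args: []string{"-u", "./my_udf.py"}}
logger := log.New(os.Stderr, "[udf] ", log.LstdFlags)
p := kapacitor.NewUDFProcess(ci, logger, 10*time.Second, func() {
	logger.Println("udf process aborted")
})
if err := p.Open(); err != nil {
	log.Fatal(err)
}
defer p.Close()
info, err := p.Info()
if err != nil {
	log.Fatal(err)
}
_ = info // describes the options the UDF accepts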

func (*UDFProcess) Abort added in v0.10.0

func (p *UDFProcess) Abort(err error)

func (*UDFProcess) BatchIn added in v0.10.0

func (p *UDFProcess) BatchIn() chan<- models.Batch

func (*UDFProcess) BatchOut added in v0.10.0

func (p *UDFProcess) BatchOut() <-chan models.Batch

func (*UDFProcess) Close added in v0.13.0

func (p *UDFProcess) Close() error

Stop the UDFProcess cleanly.

Calling Close should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.

func (*UDFProcess) Info added in v0.10.0

func (p *UDFProcess) Info() (udf.Info, error)

func (*UDFProcess) Init added in v0.10.0

func (p *UDFProcess) Init(options []*udf.Option) error

func (*UDFProcess) Open added in v0.13.0

func (p *UDFProcess) Open() error

Open the UDFProcess

func (*UDFProcess) PointIn added in v0.10.0

func (p *UDFProcess) PointIn() chan<- models.Point

func (*UDFProcess) PointOut added in v0.10.0

func (p *UDFProcess) PointOut() <-chan models.Point

func (*UDFProcess) Restore added in v0.10.0

func (p *UDFProcess) Restore(snapshot []byte) error

func (*UDFProcess) Snapshot added in v0.10.0

func (p *UDFProcess) Snapshot() ([]byte, error)

type UDFService added in v0.10.0

type UDFService interface {
	List() []string
	Info(name string) (udf.Info, bool)
	Create(name string, l *log.Logger, abortCallback func()) (udf.Interface, error)
}

type UDFSocket added in v0.13.0

type UDFSocket struct {
	// contains filtered or unexported fields
}

func NewUDFSocket added in v0.13.0

func NewUDFSocket(
	socket Socket,
	l *log.Logger,
	timeout time.Duration,
	abortCallback func(),
) *UDFSocket

func (*UDFSocket) Abort added in v0.13.0

func (s *UDFSocket) Abort(err error)

func (*UDFSocket) BatchIn added in v0.13.0

func (s *UDFSocket) BatchIn() chan<- models.Batch

func (*UDFSocket) BatchOut added in v0.13.0

func (s *UDFSocket) BatchOut() <-chan models.Batch

func (*UDFSocket) Close added in v0.13.0

func (s *UDFSocket) Close() error

func (*UDFSocket) Info added in v0.13.0

func (s *UDFSocket) Info() (udf.Info, error)

func (*UDFSocket) Init added in v0.13.0

func (s *UDFSocket) Init(options []*udf.Option) error

func (*UDFSocket) Open added in v0.13.0

func (s *UDFSocket) Open() error

func (*UDFSocket) PointIn added in v0.13.0

func (s *UDFSocket) PointIn() chan<- models.Point

func (*UDFSocket) PointOut added in v0.13.0

func (s *UDFSocket) PointOut() <-chan models.Point

func (*UDFSocket) Restore added in v0.13.0

func (s *UDFSocket) Restore(snapshot []byte) error

func (*UDFSocket) Snapshot added in v0.13.0

func (s *UDFSocket) Snapshot() ([]byte, error)

type UnionNode

type UnionNode struct {
	// contains filtered or unexported fields
}

func (*UnionNode) Wait added in v0.11.0

func (n *UnionNode) Wait() error

type WhereNode

type WhereNode struct {
	// contains filtered or unexported fields
}

func (*WhereNode) Wait added in v0.11.0

func (n *WhereNode) Wait() error

type WindowNode

type WindowNode struct {
	// contains filtered or unexported fields
}

func (*WindowNode) Wait added in v0.11.0

func (n *WindowNode) Wait() error

Directories

Path                      Synopsis
client/v1                 Kapacitor HTTP API client written in Go
clock                     A clock that provides blocking calls that wait until absolute times have occurred.
cmd
expvar                    This package is a fork of the golang expvar expvar.Var types.
integrations              Contains integration and end-to-end tests
models                    Provides a set of structures for passing data around Kapacitor.
pipeline                  Provides an API for constructing data processing pipelines.
server                    Provides a server type for starting and configuring a Kapacitor server.
services
services/config/override  Overrider provides an API for overriding and reading redacted values for a configuration object.
services/httpd            Provides an HTTP API exposing many components of Kapacitor.
services/k8s
services/reporting        Sends anonymous reports to InfluxData
services/stats            The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retention policy.
services/storage          The storage package provides a key/value based interface for storing Kapacitor metadata.
services/udf
services/udp
tick                      TICKscript is a simple invocation chaining DSL.
tick/ast
tick/cmd/tickdoc          Tickdoc is a simple utility similar to godoc that generates documentation from comments.
udf                       Package udf is a generated protocol buffer package.
