agent

package
v1.2.1
Warning

This package is not in the latest version of its module.
Published: Apr 13, 2017 | License: MIT | Imports: 10 | Imported by: 0

README

UDF Agents and Servers

A UDF is a User Defined Function: you write your own functions or algorithms and plug them into Kapacitor. Your custom function runs in its own process, and Kapacitor communicates with it via a defined protocol (see udf.proto). To make the protocol easier to work with, agents have been written in several languages; each one wraps the protocol communication behind an interface native to that language. The agent implementations live in this directory, in subdirectories named after each language.

Example uses of the agents can be found in the examples directory. The examples are working code and run as part of the test suite (see server_test.go).

Child process vs Socket

There are two approaches for writing UDFs.

  • A child process based approach where Kapacitor spawns a child process and communicates over STDIN/STDOUT.
  • A socket based approach where you start the UDF process externally and Kapacitor connects to it over a socket.

With the socket based approach there is only ever one instance of your UDF process running, and each use of the UDF in a TICKscript opens a new connection to the socket. With the process based approach, each use of the UDF in a TICKscript spawns a new child process.

Design

The protocol for communicating with Kapacitor consists of Request and Response messages. The agents wrap the communication and serialization and expose an interface that you implement to handle each request/response. In addition to the request/response paradigm, agents provide a way to stream data back to Kapacitor. Your UDF controls when new points or batches are sent back to Kapacitor.

Agents and Servers

The current implementations provide two main objects: an Agent and a Server. The Agent manages the communication over input and output streams. The Server accepts new connections and creates a new Agent to handle each one.

Both process based and socket based UDFs need an Agent to handle the communication and serialization aspects of the protocol. Only socket based UDFs need to use the Server.
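The division of labor between a Server and its Agents can be sketched with the standard library alone. This is a rough illustration, not the package's implementation: `accepter` mirrors the shape of the Accepter interface, while `echoAccepter`, `serve`, `pipeListener`, and `demo` are hypothetical names invented here. An in-memory listener built on `net.Pipe` is assumed so the sketch runs without a real socket; a socket based UDF would presumably pass a listener obtained from `net.Listen` instead.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"sync"
)

// accepter mirrors the shape of the package's Accepter interface.
type accepter interface {
	Accept(net.Conn)
}

// echoAccepter (hypothetical) starts one worker per connection,
// which is the role an Agent plays for a socket based UDF.
type echoAccepter struct{ wg sync.WaitGroup }

func (e *echoAccepter) Accept(c net.Conn) {
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		defer c.Close()
		sc := bufio.NewScanner(c)
		for sc.Scan() {
			fmt.Fprintf(c, "echo: %s\n", sc.Text())
		}
	}()
}

// serve plays the Server role: accept until the listener is closed.
func serve(l net.Listener, a accepter) {
	for {
		c, err := l.Accept()
		if err != nil {
			return // listener closed
		}
		a.Accept(c)
	}
}

// pipeListener is an in-memory net.Listener used only for this sketch.
type pipeListener struct {
	conns chan net.Conn
	done  chan struct{}
	once  sync.Once
}

func newPipeListener() *pipeListener {
	return &pipeListener{conns: make(chan net.Conn), done: make(chan struct{})}
}

func (p *pipeListener) Accept() (net.Conn, error) {
	select {
	case c := <-p.conns:
		return c, nil
	case <-p.done:
		return nil, net.ErrClosed
	}
}

func (p *pipeListener) Close() error {
	p.once.Do(func() { close(p.done) })
	return nil
}

func (p *pipeListener) Addr() net.Addr {
	return &net.UnixAddr{Name: "in-memory", Net: "unix"}
}

// dial hands the server side of a pipe to Accept and returns the client side.
func (p *pipeListener) dial() net.Conn {
	client, server := net.Pipe()
	p.conns <- server
	return client
}

// demo opens n connections in turn; each one gets its own worker.
func demo(n int) ([]string, error) {
	l := newPipeListener()
	defer l.Close()
	a := &echoAccepter{}
	served := make(chan struct{})
	go func() { serve(l, a); close(served) }()

	var replies []string
	for i := 0; i < n; i++ {
		c := l.dial()
		if _, err := fmt.Fprintf(c, "client %d\n", i); err != nil {
			return nil, err
		}
		line, err := bufio.NewReader(c).ReadString('\n')
		c.Close()
		if err != nil {
			return nil, err
		}
		replies = append(replies, line)
	}
	l.Close()
	<-served
	a.wg.Wait()
	return replies, nil
}

func main() {
	replies, err := demo(2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, r := range replies {
		fmt.Print(r)
	}
}
```

Each connection corresponds to one use of the UDF in a TICKscript, so per-connection state belongs in the worker rather than in the accepter.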

Writing an Agent for a new Language

The UDF protocol is designed to be simple and consists of reading and writing protocol buffer messages.

To write a UDF in the language of your choice, the language must have protocol buffer support and be able to read from and write to a socket.

The basic steps are:

  1. Add the language to the udf/io.go generate comment so the udf.proto code exists for your language.
  2. Implement a Varint encoder/decoder. This is trivial; see the python implementation.
  3. Implement a method for reading and writing streamed protobuf messages. See udf.proto for more details.
  4. Create an interface for handling each of the request/responses.
  5. Write a loop for reading from an input stream and calling the handler interface, and write responses to an output stream.
  6. Provide a thread-safe mechanism for writing points and batches to the output stream independent of the handler interface. This is easily accomplished with a synchronized write method; see the python implementation.
  7. Implement the examples using your new agent.
  8. Add your example to the test suite in cmd/kapacitord/run/server_test.go.
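Steps 2, 3, and 6 can be sketched together in Go. The sketch assumes that each streamed message is framed as a varint-encoded length followed by the message bytes; udf.proto remains the authority on the exact framing. Payloads here are opaque byte slices standing in for serialized protocol buffer messages, and `writeFrame`, `readFrame`, `syncWriter`, `roundTrip`, and the `maxFrame` limit are hypothetical names, not part of any agent.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"sync"
)

// maxFrame is a hypothetical safety limit on a single message.
const maxFrame = 1 << 20

// writeFrame writes one message as <uvarint length><payload>.
func writeFrame(w io.Writer, payload []byte) error {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(payload)))
	if _, err := w.Write(hdr[:n]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one length-prefixed message. It returns io.EOF only
// when the stream ends before the first byte of a length prefix.
func readFrame(r *bufio.Reader) ([]byte, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	if size > maxFrame {
		return nil, fmt.Errorf("frame of %d bytes exceeds limit %d", size, maxFrame)
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		if err == io.EOF {
			err = io.ErrUnexpectedEOF // stream ended inside a payload
		}
		return nil, err
	}
	return buf, nil
}

// syncWriter serializes writers so a length prefix and its payload are
// never interleaved with another goroutine's frame (step 6).
type syncWriter struct {
	mu sync.Mutex
	w  io.Writer
}

func (s *syncWriter) WriteFrame(payload []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return writeFrame(s.w, payload)
}

// roundTrip writes the payloads from concurrent goroutines and reads
// them back. The order of the result is not deterministic.
func roundTrip(payloads []string) ([]string, error) {
	var buf bytes.Buffer
	sw := &syncWriter{w: &buf}
	errs := make(chan error, len(payloads))
	var wg sync.WaitGroup
	for _, p := range payloads {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			if err := sw.WriteFrame([]byte(p)); err != nil {
				errs <- err
			}
		}(p)
	}
	wg.Wait()
	close(errs)
	if err, ok := <-errs; ok {
		return nil, err
	}

	r := bufio.NewReader(&buf)
	var got []string
	for {
		msg, err := readFrame(r)
		if err == io.EOF {
			return got, nil
		}
		if err != nil {
			return nil, err
		}
		got = append(got, string(msg))
	}
}

func main() {
	got, err := roundTrip([]string{"alpha", "beta", "gamma"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("frames read:", len(got))
}
```

Holding the lock across both writes is the important detail: locking only the payload write would let two goroutines interleave their length prefixes.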

For process based UDFs, the process is expected to terminate once STDIN is closed and the remaining requests have been processed. After STDIN is closed, the agent process can continue to send Responses to Kapacitor as long as a keepalive timeout does not occur. If a keepalive timeout is reached and the process has still not terminated after a grace period of 2*keepalive_time, it is forcefully terminated.

Docker

Each example is expected to run inside the test suite. Because generating protocol buffer code for each language requires different plugins and libraries, we use Docker to provide the necessary environment. This makes testing easier, since the developer does not have to install every supported language locally.

Documentation

Types

type Accepter added in v0.13.0

type Accepter interface {
	// Accept new connections from the listener and handle them accordingly.
	// The typical action is to create a new Agent with the connection as both its in and out objects.
	Accept(net.Conn)
}

type Agent

type Agent struct {

	// A channel for writing Responses, specifically Batch and Point responses.
	Responses chan<- *udf.Response

	// The handler for requests.
	Handler Handler
	// contains filtered or unexported fields
}

Go implementation of a Kapacitor UDF agent. This agent is responsible for reading and writing messages over a socket.

The Agent requires a Handler object in order to fulfill requests.

func New

func New(in io.ReadCloser, out io.WriteCloser) *Agent

Create a new Agent with the provided in/out objects. To create an Agent that reads from STDIN and writes to STDOUT of the process, use New(os.Stdin, os.Stdout).

func (*Agent) Start

func (a *Agent) Start() error

Start the Agent. You must set a Handler on the Agent before starting it.

func (*Agent) Wait

func (a *Agent) Wait() error

Wait for the Agent to terminate. The Agent will not terminate until the Responses channel is closed. You must close this channel yourself, typically in the Stop method of the Handler. The Agent will terminate if the In reader is closed or an error occurs.
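The relationship between the Responses channel and Wait can be illustrated with a simplified stand-in. This sketch does not use the package: `miniAgent` and `run` are hypothetical, messages are plain strings instead of `*udf.Response` values, and the read loop doubles as the handler. It assumes a write loop that ranges over the channel, which is why nothing finishes until the channel is closed.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// miniAgent is a hypothetical, simplified stand-in for Agent.
type miniAgent struct {
	in        io.Reader
	out       io.Writer
	Responses chan string
	stop      func() // plays the role of Handler.Stop
	wg        sync.WaitGroup
}

// Start launches a read loop and a write loop.
func (a *miniAgent) Start() {
	a.wg.Add(2)
	go func() { // read loop: ends when the input is exhausted
		defer a.wg.Done()
		defer a.stop()
		sc := bufio.NewScanner(a.in)
		for sc.Scan() {
			a.Responses <- "seen " + sc.Text()
		}
	}()
	go func() { // write loop: ends only when Responses is closed
		defer a.wg.Done()
		for r := range a.Responses {
			fmt.Fprintln(a.out, r)
		}
	}()
}

// Wait blocks until both loops have finished.
func (a *miniAgent) Wait() { a.wg.Wait() }

// run feeds input through a miniAgent and returns everything it wrote.
func run(input string) string {
	var out strings.Builder
	a := &miniAgent{
		in:        strings.NewReader(input),
		out:       &out,
		Responses: make(chan string),
	}
	// Closing the channel in stop is what lets Wait return.
	a.stop = func() { close(a.Responses) }
	a.Start()
	a.Wait()
	return out.String()
}

func main() {
	fmt.Print(run("p1\np2\n"))
}
```

If `stop` did not close the channel, the write loop would block forever and `Wait` would never return, which is the failure mode the documentation warns about.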

type Handler

type Handler interface {
	// Return the InfoResponse, which describes the properties of this Handler.
	Info() (*udf.InfoResponse, error)
	// Initialize the Handler with the provided options.
	Init(*udf.InitRequest) (*udf.InitResponse, error)
	// Create a snapshot of the running state of the handler.
	Snaphost() (*udf.SnapshotResponse, error)
	// Restore a previous snapshot.
	Restore(*udf.RestoreRequest) (*udf.RestoreResponse, error)

	// A batch has begun.
	BeginBatch(*udf.BeginBatch) error
	// A point has arrived.
	Point(*udf.Point) error
	// The batch is complete.
	EndBatch(*udf.EndBatch) error

	// Gracefully stop the Handler.
	// No other methods will be called.
	Stop()
}

The Agent calls the appropriate methods on the Handler as it receives requests over a socket.

Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent. Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself. These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.

The Handler is called from a single goroutine, meaning methods will not be called concurrently.

To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.
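As an illustration of a handler deciding when to send data back, the sketch below buffers a batch and emits a single averaged point when the batch ends. It is written against stand-in types so that it runs on its own: `point`, `avgHandler`, `feed`, and `averageBatches` are hypothetical, and the method signatures only loosely follow the Handler interface above (the real methods take `*udf.BeginBatch`, `*udf.Point`, and `*udf.EndBatch`).

```go
package main

import "fmt"

// point is a hypothetical stand-in for *udf.Point.
type point struct{ Value float64 }

// avgHandler accumulates a batch and emits its mean on EndBatch.
type avgHandler struct {
	responses chan<- point // stands in for Agent.Responses
	sum       float64
	count     int
}

func (h *avgHandler) BeginBatch() error {
	h.sum, h.count = 0, 0
	return nil
}

func (h *avgHandler) Point(p point) error {
	h.sum += p.Value
	h.count++
	return nil
}

// EndBatch is where this handler chooses to write back a result.
func (h *avgHandler) EndBatch() error {
	if h.count > 0 {
		h.responses <- point{Value: h.sum / float64(h.count)}
	}
	return nil
}

// Stop closes the channel so the consumer can finish.
func (h *avgHandler) Stop() { close(h.responses) }

// feed drives one batch through the handler.
func feed(h *avgHandler, batch []float64) error {
	if err := h.BeginBatch(); err != nil {
		return err
	}
	for _, v := range batch {
		if err := h.Point(point{Value: v}); err != nil {
			return err
		}
	}
	return h.EndBatch()
}

// averageBatches calls the handler from a single goroutine and
// collects whatever it writes to the responses channel.
func averageBatches(batches [][]float64) []float64 {
	responses := make(chan point)
	h := &avgHandler{responses: responses}
	go func() {
		defer h.Stop()
		for _, b := range batches {
			if err := feed(h, b); err != nil {
				return
			}
		}
	}()
	var out []float64
	for p := range responses {
		out = append(out, p.Value)
	}
	return out
}

func main() {
	fmt.Println(averageBatches([][]float64{{1, 2, 3}, {10, 20}}))
}
```

Because the handler methods run on one goroutine, the running sum and count need no locking; only the channel is shared with the consumer.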

type Server added in v0.13.0

type Server struct {
	// contains filtered or unexported fields
}

A Server accepts connections on a listener and spawns a new Agent for each connection.

func NewServer added in v0.13.0

func NewServer(l net.Listener, a Accepter) *Server

Create a new server.

func (*Server) Serve added in v0.13.0

func (s *Server) Serve() error

func (*Server) Stop added in v0.13.0

func (s *Server) Stop()

func (*Server) StopOnSignals added in v0.13.0

func (s *Server) StopOnSignals(signals ...os.Signal)

Register a signal handler to stop the Server for the given signals.

Directories

Path Synopsis
examples
