fluxd

package
v0.0.0-...-035149e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2014 License: Unlicense Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultConfig is the default configuration used for NSQ communications by each binding if one is not specified.
	DefaultConfig *nsq.Config
)

Functions

This section is empty.

Types

type BatchBinding

type BatchBinding struct {
	//Topic is the NSQ topic to listen on
	Topic string

	// Channel is the NSQ channel to listen on
	Channel string

	// Endpoint is the database and decode logic
	Endpoint BatchDB

	// Workers sets the number of concurent goroutines serving this binding; defaults to 1
	Workers int

	// MaxMsg sets the maximum number of messages collected before upload; defaults to 50
	MaxMsg int

	// BatchTime sets the maximum time spend waiting to collect messages before upload; defaults to 250ms
	BatchTime time.Duration
	// contains filtered or unexported fields
}

BatchBinding types connect NSQ channels, flux/msg schemas, and database endpoints, but they use database request batching.

type BatchDB

type BatchDB interface {
	// A BatchDB must fulfill the DB interface. The Req
	// method should point the request towards the batch
	// endpoint of the database, if it is different
	// from the standard endpoint.
	DB
	// EntryPrefix is written immediately before each individual entry
	EntryPrefix() []byte

	// EntryPostfix is written immediately after each individual entry
	EntryPostfix() []byte

	// BatchPrefix is written at the beggining of the request body
	// (before all entries, e.g. '[')
	BatchPrefix() []byte

	// BatchPostfix is written at the end of the request body
	// (after all entries, e.g. ']')
	BatchPostfix() []byte

	// Concat is placed between every entry
	// (e.g. ',')
	Concat() []byte
}

BatchDB represents a database connection that handles batch uploads. XxxPrefix and XxxPostfix methods are used to format the body.

type Binding

type Binding struct {
	// Topic is the NSQ topic to listen on
	Topic string
	// Channel is the NSQ channel to listen on
	Channel string
	// Endpoint is the database and decode logic used for this topic & channel
	Endpoint DB
	// Workers sets the number of concurrent goroutines serving this binding; defaults to 1
	Workers int
	// contains filtered or unexported fields
}

Binding types connect NSQ channels, flux/msg schemas, and database endpoints.

type DB

type DB interface {
	// Init gives you an opportunity to do custom initialization and
	// validation on the DB. Calls to Server.Run will cause the
	// driver to call Init() exactly once.
	Init() error

	// Translate should turn the body of an NSQ message
	// into a valid body to be used to write
	// to the database.
	Translate(p []byte, w msg.Writer) error

	// Req should return a valid *http.Request to be performed
	// by an http client. 'r' should be used
	// as the body of the request.
	Req(r io.Reader) *http.Request

	// Validate is used to validate a response from a server
	// after data is sent. Validate() should return
	// a non-nil error to mark the response as failed.
	// Validate is also responsible for closing the response
	// body.
	Validate(*http.Response) error
}

DB is the interface that fluxd uses to communicate with a database. All calls should be thread-safe.

type ElasticsearchDB

type ElasticsearchDB struct {
	Schema msg.Schema
	Addr   string
	Index  string
	Dtype  string
	// contains filtered or unexported fields
}

ElasticsearchDB conforms to the DB interface. It POSTs to http://{Addr}/{Index}/{Dtype}/ using the output of Translate() as the message body.

func (*ElasticsearchDB) Address

func (e *ElasticsearchDB) Address() string

Address returns the endpoint that this db POSTs to.

func (*ElasticsearchDB) Init

func (e *ElasticsearchDB) Init() error

func (*ElasticsearchDB) Req

func (e *ElasticsearchDB) Req(r io.Reader) *http.Request

Req returns the proper POST request to Addr/Index/Dtype

func (*ElasticsearchDB) Translate

func (e *ElasticsearchDB) Translate(p []byte, w msg.Writer) error

Translate uses e.Schema to write json into 'w'. Per the elasticsearch type specification, binary types are encoded to base64-encoded quoted strings.

func (*ElasticsearchDB) Validate

func (e *ElasticsearchDB) Validate(res *http.Response) error

Validate returns an error if res.StatusCode is not 200 or 201

type InfluxDB

type InfluxDB struct {
	Schema msg.Schema
	Addr   string
	DBname string
	// contains filtered or unexported fields
}

InfluxDB implements the BatchBinding interface. It uses the first field in the Schema as the series name.

func (*InfluxDB) Address

func (d *InfluxDB) Address() string

Address returns {Addr}/db/{DBname}/series?u=root&p=root, but only computes the string once.

func (*InfluxDB) BatchPostfix

func (d *InfluxDB) BatchPostfix() []byte

BatchPostfix returns ']'

func (*InfluxDB) BatchPrefix

func (d *InfluxDB) BatchPrefix() []byte

BatchPrefix returns '['

func (*InfluxDB) Concat

func (d *InfluxDB) Concat() []byte

Concat returns ','

func (*InfluxDB) EntryPostfix

func (d *InfluxDB) EntryPostfix() []byte

EntryPostfix returns nil

func (*InfluxDB) EntryPrefix

func (d *InfluxDB) EntryPrefix() []byte

EntryPrefix returns nil

func (*InfluxDB) Init

func (d *InfluxDB) Init() error

Init must be called before the call to Server.Run() in order to initialize unexported struct members.

func (*InfluxDB) Req

func (d *InfluxDB) Req(r io.Reader) (req *http.Request)

Req resturns a POST request to d.Address() with 'r' as the body.

func (*InfluxDB) Translate

func (d *InfluxDB) Translate(p []byte, w msg.Writer) error

Tranlsate writes InfluxDB-compatible JSON from 'p' into 'w', returning an error if it encounters a problem decoding the data. Note that the msg.PackExt type is not supported, as it cannot be written as "flat" data. The first value in the Schema is assumed to be the series name. (Any other arrangement requires a significantly more complicated implementation.)

func (*InfluxDB) Validate

func (d *InfluxDB) Validate(res *http.Response) error

Validate returns an error if the response status code is not 200 or 201.

type Server

type Server struct {
	// Config used for NSQ connections; defaults to DefaultConfig
	NSQConfig *nsq.Config
	// Address(es) of nsqlookupd(s)
	Lookupdaddrs []string
	// If true, requests are redirected to StdOut instead of
	// to the http endpoint. Useful for testing.
	UseStdout bool
	// contains filtered or unexported fields
}

Server represents a collection of bindings to NSQLookupd instances.

func (*Server) Bind

func (s *Server) Bind(b *Binding)

Bind adds a binding to the server. Bind must be called before Run.

func (*Server) BindBatch

func (s *Server) BindBatch(b *BatchBinding)

BindBatch adds a batched binding to the server. Bind must be called before Run.

func (*Server) Run

func (s *Server) Run() error

Run blocks until all bindings exit gracefully, usually after a call to Stop. Run immediately returns an error if the server is not configured correctly.

func (*Server) Stop

func (s *Server) Stop()

Stop ends all of the server processes gracefully. It does not block.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL