Documentation ¶
Index ¶
- Constants
- Variables
- func Register(name, desc string, ...)
- type Config
- type Elasticsearch
- type ErrAdaptor
- type Error
- type ErrorLevel
- type File
- type FileConfig
- type Mongodb
- type MongodbConfig
- type Registry
- type RegistryEntry
- type Rethinkdb
- type SslConfig
- type StopStartListener
- func Createadaptor(kind, path string, extra Config, p *pipe.Pipe) (adaptor StopStartListener, err error)
- func NewElasticsearch(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)
- func NewFile(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)
- func NewMongodb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)
- func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error)
- func NewTransformer(pipe *pipe.Pipe, path string, extra Config) (StopStartListener, error)
- type SyncDoc
- type Transformer
- type TransformerConfig
Constants ¶
const (
    MONGO_BUFFER_SIZE int = 1e6
    MONGO_BUFFER_LEN  int = 5e5
)
Variables ¶
var (
    // Adaptors is a registry of adaptor types, constructors and configs
    Adaptors = make(Registry)
)
Functions ¶
func Register ¶
func Register(name, desc string, fn func(*pipe.Pipe, string, Config) (StopStartListener, error), config interface{})
Register registers an adaptor (database adaptor) for use with Transporter. The third argument, fn, is a constructor that returns an instance of the given adaptor; config is an instance of the adaptor's config struct.
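A minimal sketch of how registration might look, using local stand-in types so that it compiles on its own. Pipe, Constructor, nullAdaptor, nullConfig, newNull and the body of Register are assumptions made for illustration; only the signature shapes are taken from the documentation above.

```go
package main

import "fmt"

// Pipe is a stand-in for pipe.Pipe, which lives in a separate package.
type Pipe struct{}

// Config mirrors the documented Config type.
type Config map[string]interface{}

// StopStartListener is reduced to the one method this sketch needs.
type StopStartListener interface {
	Start() error
}

// Constructor names the function type accepted as Register's fn argument.
type Constructor func(*Pipe, string, Config) (StopStartListener, error)

// RegistryEntry and Registry mirror the documented types.
type RegistryEntry struct {
	Name        string
	Description string
	Constructor Constructor
	Config      interface{}
}

type Registry map[string]RegistryEntry

var Adaptors = make(Registry)

// Register stores the entry under its name (assumed behaviour).
func Register(name, desc string, fn Constructor, config interface{}) {
	Adaptors[name] = RegistryEntry{Name: name, Description: desc, Constructor: fn, Config: config}
}

// nullAdaptor is a hypothetical adaptor that does nothing.
type nullAdaptor struct{ path string }

func (n *nullAdaptor) Start() error { return nil }

// nullConfig is the hypothetical config struct for nullAdaptor.
type nullConfig struct {
	URI string `json:"uri" doc:"the uri to connect to"`
}

// newNull has the constructor shape that Register expects.
func newNull(p *Pipe, path string, extra Config) (StopStartListener, error) {
	return &nullAdaptor{path: path}, nil
}

func main() {
	Register("null", "an adaptor that discards everything", newNull, nullConfig{})
	entry := Adaptors["null"]
	fmt.Println(entry.Name, "-", entry.Description)
}
```

In a real adaptor the Register call would more likely sit in an init function, so that importing the package is enough to make the adaptor available.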
Types ¶
type Config ¶
type Config map[string]interface{}
Config is an alias to map[string]interface{} and helps us turn a fuzzy document into a concrete named struct
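One way to turn such a map into a named struct is to round-trip it through JSON, so that the struct's json tags select the keys. The sketch below illustrates that idea only; construct and fileConfig are hypothetical names, and the package may perform the conversion differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config mirrors the documented type.
type Config map[string]interface{}

// fileConfig is a local copy of the documented FileConfig field.
type fileConfig struct {
	URI string `json:"uri"`
}

// construct copies c into conf by marshalling the map to JSON and
// unmarshalling it into the struct that conf points to.
// Hypothetical helper, not part of the documented API.
func construct(c Config, conf interface{}) error {
	b, err := json.Marshal(c)
	if err != nil {
		return err
	}
	return json.Unmarshal(b, conf)
}

func main() {
	var fc fileConfig
	if err := construct(Config{"uri": "stdout://"}, &fc); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(fc.URI) // prints "stdout://"
}
```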
type Elasticsearch ¶
type Elasticsearch struct {
// contains filtered or unexported fields
}
Elasticsearch is an adaptor to connect a pipeline to an elasticsearch cluster.
func (*Elasticsearch) Start ¶
func (e *Elasticsearch) Start() error
Start the adaptor as a source (not implemented)
type ErrAdaptor ¶ added in v0.0.3
type ErrAdaptor struct {
// contains filtered or unexported fields
}
ErrAdaptor gives the details of the failed adaptor
func (ErrAdaptor) Error ¶ added in v0.0.3
func (a ErrAdaptor) Error() string
type Error ¶
type Error struct {
    Lvl    ErrorLevel
    Str    string
    Path   string
    Record interface{}
}
Error is an error that happened during an adaptor's operation. Errors include both an indication of the severity, Lvl, and a reference to the Record that was being processed when the error occurred.
func NewError ¶
func NewError(lvl ErrorLevel, path, str string, record interface{}) Error
NewError creates an Error type with the specified level, path, message and record
type ErrorLevel ¶
type ErrorLevel int
ErrorLevel indicates the severity of the error
const (
    NOTICE ErrorLevel = iota
    WARNING
    ERROR
    CRITICAL
)
Adaptor errors have levels to indicate their severity. CRITICAL errors indicate that the program cannot continue running.
ERROR errors indicate a problem with a specific document or message. A document might not have been applied properly to a source, but the program can continue.
WARNING Todo
NOTICE ToDo
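Because the levels are declared with iota in ascending order of severity, a caller can compare them numerically. The sketch below redeclares the documented types locally so it is self-contained; the body of NewError and the shouldStop policy are assumptions for illustration, not the package's own behaviour.

```go
package main

import "fmt"

// ErrorLevel and its constants mirror the documented declarations.
type ErrorLevel int

const (
	NOTICE ErrorLevel = iota
	WARNING
	ERROR
	CRITICAL
)

// Error mirrors the documented struct.
type Error struct {
	Lvl    ErrorLevel
	Str    string
	Path   string
	Record interface{}
}

// NewError mirrors the documented constructor's signature.
func NewError(lvl ErrorLevel, path, str string, record interface{}) Error {
	return Error{Lvl: lvl, Path: path, Str: str, Record: record}
}

// shouldStop is a hypothetical policy: only CRITICAL errors halt the program.
func shouldStop(e Error) bool {
	return e.Lvl >= CRITICAL
}

func main() {
	doc := map[string]interface{}{"_id": 1}
	fmt.Println(shouldStop(NewError(ERROR, "source/sink", "could not apply document", doc)))
	fmt.Println(shouldStop(NewError(CRITICAL, "source", "connection lost", nil)))
}
```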
type File ¶
type File struct {
// contains filtered or unexported fields
}
File is an adaptor that can be used as a source / sink for files on disk, as well as a sink to stdout.
type FileConfig ¶
type FileConfig struct {
    // URI pointing to the resource. We only recognize file:// and stdout:// currently
    URI string `json:"uri" doc:"the uri to connect to, ie stdout://, file:///tmp/output"`
}
FileConfig is used to configure the File Adaptor.
type Mongodb ¶
type Mongodb struct {
// contains filtered or unexported fields
}
Mongodb is an adaptor to read / write to mongodb. It works as a source by copying files, and then optionally tailing the oplog.
type MongodbConfig ¶
type MongodbConfig struct {
    URI       string     `json:"uri" doc:"the uri to connect to, in the form mongodb://user:password@host.com:27017/auth_database"`
    Namespace string     `json:"namespace" doc:"mongo namespace to read/write"`
    Ssl       *SslConfig `json:"ssl,omitempty" doc:"ssl options for connection"`
    Timeout   string     `json:timeout" doc:"timeout for establishing connection, format must be parsable by time.ParseDuration and defaults to 10s"`
    Debug     bool       `json:"debug" doc:"display debug information"`
    Tail      bool       `json:"tail" doc:"if tail is true, then the mongodb source will tail the oplog after copying the namespace"`
    Wc        int        `` /* 143-byte string literal not displayed */
    FSync     bool       `json:"fsync" doc:"When writing, should we flush to disk before returning success"`
    Bulk      bool       `json:"bulk" doc:"use a buffer to bulk insert documents"`
}
MongodbConfig provides configuration options for a mongodb adaptor. The notable difference between this and dbConfig is the presence of the Tail option.
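The Timeout field is documented as a string that time.ParseDuration can parse, with a default of 10s. A small sketch of that handling follows; parseTimeout is a hypothetical helper, and treating the empty string as "use the default" is an assumption about how the default is applied.

```go
package main

import (
	"fmt"
	"time"
)

// parseTimeout returns the parsed duration, or 10s when s is empty.
// The empty-string fallback is assumed, not taken from the package.
func parseTimeout(s string) (time.Duration, error) {
	if s == "" {
		return 10 * time.Second, nil
	}
	return time.ParseDuration(s)
}

func main() {
	// "soon" is not a valid duration and produces an error.
	for _, s := range []string{"", "30s", "1m30s", "soon"} {
		d, err := parseTimeout(s)
		fmt.Printf("%q -> %v, err=%v\n", s, d, err)
	}
}
```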
type Registry ¶
type Registry map[string]RegistryEntry
Registry maps the adaptor's name to the RegistryEntry
type RegistryEntry ¶
type RegistryEntry struct {
    Name        string
    Description string
    Constructor func(*pipe.Pipe, string, Config) (StopStartListener, error)
    Config      interface{}
}
RegistryEntry stores the adaptor constructor and configuration struct
func (*RegistryEntry) About ¶
func (r *RegistryEntry) About() string
About inspects the RegistryEntry's Config object, and uses each field's tags as a docstring
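Reading struct tags as documentation can be done with the reflect package. The following sketch shows the general technique on a local copy of FileConfig; describe is a hypothetical helper and its output format is invented here, so it should not be read as the actual output of About.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// fileConfig is a local copy of the documented FileConfig.
type fileConfig struct {
	URI string `json:"uri" doc:"the uri to connect to, ie stdout://, file:///tmp/output"`
}

// describe returns one line per struct field containing the field's
// json name, its Go type and the text of its doc tag.
// Hypothetical helper, not the package's About method.
func describe(config interface{}) string {
	t := reflect.TypeOf(config)
	if t == nil {
		return ""
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return ""
	}
	var lines []string
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		// Drop options such as ",omitempty" from the json tag.
		name := strings.Split(f.Tag.Get("json"), ",")[0]
		lines = append(lines, fmt.Sprintf("%-10s %-8s %s", name, f.Type, f.Tag.Get("doc")))
	}
	return strings.Join(lines, "\n")
}

func main() {
	fmt.Println(describe(fileConfig{}))
}
```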
type Rethinkdb ¶
type Rethinkdb struct {
// contains filtered or unexported fields
}
Rethinkdb is an adaptor that writes metrics to rethinkdb (http://rethinkdb.com/), an open-source distributed database.
type SslConfig ¶ added in v0.0.4
type SslConfig struct {
CaCerts []string `json:"cacerts,omitempty" doc:"array of root CAs to use in order to verify the server certificates"`
}
type StopStartListener ¶
StopStartListener defines the interface that all database connectors and nodes must follow. Start() consumes data from the interface. Listen() listens on a pipe, processes data, and then emits it. Stop() shuts down the adaptor.
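The interface definition itself is not shown in this listing, so the sketch below reconstructs it from the method descriptions. The error return on Stop is an assumption, and recorder is a hypothetical adaptor used only to show a type satisfying the three methods.

```go
package main

import "fmt"

// StopStartListener as described above. Start and Listen return an error
// elsewhere in this documentation; the return type of Stop is assumed.
type StopStartListener interface {
	Start() error
	Listen() error
	Stop() error
}

// recorder is a hypothetical adaptor that records which methods were called.
type recorder struct{ calls []string }

func (r *recorder) Start() error  { r.calls = append(r.calls, "start"); return nil }
func (r *recorder) Listen() error { r.calls = append(r.calls, "listen"); return nil }
func (r *recorder) Stop() error   { r.calls = append(r.calls, "stop"); return nil }

// Compile-time check that *recorder satisfies the interface.
var _ StopStartListener = (*recorder)(nil)

func main() {
	r := &recorder{}
	var a StopStartListener = r
	_ = a.Listen()
	_ = a.Stop()
	fmt.Println(r.calls)
}
```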
func Createadaptor ¶
func Createadaptor(kind, path string, extra Config, p *pipe.Pipe) (adaptor StopStartListener, err error)
Createadaptor instantiates an adaptor given the adaptor type and the Config. Constructors are expected to be in the form
func NewWhatever(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) {}
and are expected to conform to the adaptor interface
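The lookup described here can be sketched as a map from adaptor kind to constructor, with an error for unknown kinds. Everything below is a simplified stand-in: Pipe, registry, echo, newEcho and create are hypothetical names, and the error message is invented rather than taken from ErrAdaptor.

```go
package main

import "fmt"

// Pipe is a stand-in for pipe.Pipe.
type Pipe struct{}

// Config mirrors the documented type.
type Config map[string]interface{}

// StopStartListener is reduced to one method for brevity.
type StopStartListener interface{ Start() error }

type constructor func(*Pipe, string, Config) (StopStartListener, error)

// registry is a simplified stand-in for the Adaptors registry.
var registry = map[string]constructor{}

// echo is a hypothetical adaptor that remembers its path.
type echo struct{ path string }

func (e *echo) Start() error { return nil }

func newEcho(p *Pipe, path string, extra Config) (StopStartListener, error) {
	return &echo{path: path}, nil
}

// create looks up kind and calls its constructor, or fails for unknown kinds.
func create(kind, path string, extra Config, p *Pipe) (StopStartListener, error) {
	fn, ok := registry[kind]
	if !ok {
		return nil, fmt.Errorf("adaptor %q not found in registry", kind)
	}
	return fn(p, path, extra)
}

func main() {
	registry["echo"] = newEcho

	if _, err := create("echo", "source/echo", Config{}, &Pipe{}); err == nil {
		fmt.Println("echo adaptor created")
	}
	if _, err := create("missing", "source/missing", Config{}, &Pipe{}); err != nil {
		fmt.Println("error:", err)
	}
}
```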
func NewElasticsearch ¶
NewElasticsearch creates a new Elasticsearch adaptor. Elasticsearch adaptors cannot be used as a source.
func NewMongodb ¶
NewMongodb creates a new Mongodb adaptor
func NewRethinkdb ¶
NewRethinkdb creates a new Rethinkdb database adaptor
func NewTransformer ¶
NewTransformer creates a new transformer object
type Transformer ¶
type Transformer struct {
// contains filtered or unexported fields
}
Transformer is an adaptor which consumes data from a source, transforms it using a supplied javascript function, and then emits it. The javascript transformation function is supplied as a separate file on disk, and is called by calling the defined module.exports function.
func (*Transformer) Listen ¶
func (t *Transformer) Listen() (err error)
Listen starts the transformer's listener, reads each message from the incoming channel, transforms it into mejson, and then uses the supplied javascript module.exports function to transform the document. The document is then emitted to this adaptor's children.
func (*Transformer) Start ¶
func (t *Transformer) Start() error
Start the adaptor as a source (not implemented for this adaptor)
type TransformerConfig ¶
type TransformerConfig struct {
    // file containing transformer javascript
    // must define a module.exports = function(doc) { .....; return doc }
    Filename string `json:"filename" doc:"the filename containing the javascript transform fn"`
    Namespace string `json:"namespace" doc:"namespace to transform"`

    // verbose output
    Debug bool `json:"debug" doc:"display debug information"` // debug mode
}
TransformerConfig holds config options for a transformer adaptor