Documentation ¶
Overview ¶
Package pipeline provides all implemented functionality to move data through transporter.
A transporter pipeline consists of a tree of Nodes. The root Node is attached to the source database, and each child Node is either a data transformer or a database sink. Nodes can be defined like:
    source := transporter.NewNode("source", "mongo", map[string]interface{}{"uri": "mongodb://localhost/", "namespace": "test.colln", "debug": false, "tail": true}).
        Add(transporter.NewNode("out", "file", map[string]interface{}{"uri": "stdout://"}))
and pipelines can be defined like:
    pipeline, err := transporter.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    pipeline.Run()
The event emitters are defined in transporter/events and are used to deliver errors, metrics, and other events about the running process.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Node ¶
    type Node struct {
        Name       string  `json:"name"`     // the name of this node
        Type       string  `json:"type"`     // the node's type, used to create the implementation
        Children   []*Node `json:"children"` // the nodes are set up as a tree; this is an array of this node's children
        Parent     *Node   `json:"parent"`   // this node's parent node; if this is nil, this is a 'source' node
        Transforms []*Transform
        // contains filtered or unexported fields
    }
A Node is the basic building block of transporter pipelines. Nodes are constructed in a tree, with the first node broadcasting data to each of its children. Node trees can be constructed as follows:
    source := transporter.NewNode("name1", "mongo", adaptor.Config{"uri": "mongodb://localhost/boom", "namespace": "boom.foo", "debug": true})
    sink1 := transporter.NewNode("foofile", "file", adaptor.Config{"uri": "stdout://"})
    sink2 := transporter.NewNode("foofile2", "file", adaptor.Config{"uri": "stdout://"})
    source.Add(sink1)
    source.Add(sink2)
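To make the tree shape concrete, here is a minimal, self-contained sketch of how an Add method could link a child to its parent and return the parent so calls can be chained. The `node` type and `newNode` helper are hypothetical stand-ins written for this example; they are not transporter's implementation.

```go
package main

import "fmt"

// node is a hypothetical stand-in for transporter's Node, reduced to the
// fields needed to show the tree shape.
type node struct {
	Name     string
	Type     string
	Children []*node
	Parent   *node
}

// newNode builds an unattached node (assumed helper for this sketch).
func newNode(name, kind string) *node {
	return &node{Name: name, Type: kind}
}

// Add attaches child beneath n and returns n so calls can be chained.
func (n *node) Add(child *node) *node {
	child.Parent = n
	n.Children = append(n.Children, child)
	return n
}

func main() {
	source := newNode("name1", "mongo").
		Add(newNode("foofile", "file")).
		Add(newNode("foofile2", "file"))

	// Each child points back at the root it was added to.
	for _, c := range source.Children {
		fmt.Println(c.Parent.Name, "->", c.Name)
	}
}
```

Returning the parent rather than the child is what lets a whole source-plus-sinks tree be built in a single expression.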
func (*Node) Endpoints ¶
Endpoints recurses down the node tree and accumulates a map associating each node's name with its type. This is primarily used with the boot event.
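The recursion described here can be sketched in a few lines. This is an illustration under assumptions, not the library's code: the `node` type and the lowercase `endpoints` method are hypothetical, and the map type `map[string]string` is a guess at a reasonable return shape.

```go
package main

import "fmt"

// node is a hypothetical, reduced version of a pipeline node.
type node struct {
	Name     string
	Type     string
	Children []*node
}

// endpoints returns a map of node name to node type covering n and every
// descendant of n.
func (n *node) endpoints() map[string]string {
	m := map[string]string{n.Name: n.Type}
	for _, child := range n.Children {
		// Merge the child's subtree into this node's result.
		for name, kind := range child.endpoints() {
			m[name] = kind
		}
	}
	return m
}

func main() {
	root := &node{
		Name: "name1",
		Type: "mongo",
		Children: []*node{
			{Name: "foofile", Type: "file"},
			{Name: "foofile2", Type: "file"},
		},
	}
	fmt.Println(root.endpoints())
}
```

Because the result is keyed by name, two nodes that share a name would collapse into one entry in this sketch, so unique node names matter.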
func (*Node) Path ¶
Path returns the names of the node and all of its ancestors, joined with "/", for use in metrics. E.g. for the following tree:

    source := transporter.NewNode("name1", "mongo", adaptor.Config{"uri": "mongodb://localhost/boom", "namespace": "boom.foo", "debug": true})
    sink1 := transporter.NewNode("foofile", "file", adaptor.Config{"uri": "stdout://"})
    source.Add(sink1)

'source' will have a Path of 'name1', and 'sink1' will have a Path of 'name1/foofile'.
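One way to compute such a path is to walk the Parent pointers up to the root and join the names. The sketch below assumes a hypothetical `node` type with only `Name` and `Parent`; it shows the idea and is not taken from the transporter source.

```go
package main

import (
	"fmt"
	"strings"
)

// node is a hypothetical, reduced version of a pipeline node.
type node struct {
	Name   string
	Parent *node
}

// path walks from n up to the root, collecting names root-first, and joins
// them with "/".
func (n *node) path() string {
	var names []string
	for cur := n; cur != nil; cur = cur.Parent {
		names = append([]string{cur.Name}, names...)
	}
	return strings.Join(names, "/")
}

func main() {
	source := &node{Name: "name1"}
	sink1 := &node{Name: "foofile", Parent: source}

	fmt.Println(source.path()) // name1
	fmt.Println(sink1.path())  // name1/foofile
}
```

The path is built from node names, not Go variable names, so the sink named "foofile" appears as "name1/foofile".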
func (*Node) Start ¶
Start starts the node's children in a goroutine, and then runs either Start() or Listen() on the node's adaptor. Root nodes (nodes with no parent) run Start() and emit messages to their children. All descendant nodes run Listen() on the adaptor.
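The control flow described above can be sketched with a fake adaptor that records which method each node invokes. Everything here is an assumption made for illustration: the reduced `adaptor` interface, the `recorder` type, and the `sync.WaitGroup` used so the example terminates deterministically are not part of transporter.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// adaptor is a hypothetical, reduced interface for this sketch.
type adaptor interface {
	Start() error
	Listen() error
}

// recorder is a fake adaptor that logs which method was called.
type recorder struct {
	name string
	mu   *sync.Mutex
	log  *[]string
}

func (r recorder) record(method string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	*r.log = append(*r.log, r.name+":"+method)
	return nil
}

func (r recorder) Start() error  { return r.record("Start") }
func (r recorder) Listen() error { return r.record("Listen") }

type node struct {
	Name     string
	Parent   *node
	Children []*node
	adaptor  adaptor
}

// start launches every child in its own goroutine, then runs Start on a
// root node or Listen on any descendant. It waits for the children before
// returning so that this sketch finishes predictably.
func (n *node) start() error {
	var wg sync.WaitGroup
	for _, child := range n.Children {
		wg.Add(1)
		go func(c *node) {
			defer wg.Done()
			_ = c.start() // a real pipeline would surface this error
		}(child)
	}

	var err error
	if n.Parent == nil {
		err = n.adaptor.Start()
	} else {
		err = n.adaptor.Listen()
	}
	wg.Wait()
	return err
}

// run builds a three-node tree, starts it, and returns the sorted call log.
func run() []string {
	var mu sync.Mutex
	var log []string
	mk := func(name string, parent *node) *node {
		n := &node{Name: name, Parent: parent, adaptor: recorder{name, &mu, &log}}
		if parent != nil {
			parent.Children = append(parent.Children, n)
		}
		return n
	}
	root := mk("name1", nil)
	mk("foofile", root)
	mk("foofile2", root)

	_ = root.start()
	sort.Strings(log)
	return log
}

func main() {
	fmt.Println(run())
}
```

The log is sorted before it is returned because the children run concurrently, so the order in which they call Listen is not fixed.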
func (*Node) Stop ¶
func (n *Node) Stop()
Stop stops this node's adaptor and sends a stop to each child of this node.
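A stop that propagates down the tree can be sketched as follows. The `stopper` interface, `fakeAdaptor`, and the returned slice of names are hypothetical devices for this example, used only to make the stop order observable.

```go
package main

import "fmt"

// stopper is a hypothetical, reduced adaptor interface for this sketch.
type stopper interface {
	Stop() error
}

// fakeAdaptor records whether Stop was called.
type fakeAdaptor struct{ stopped bool }

func (f *fakeAdaptor) Stop() error {
	f.stopped = true
	return nil
}

type node struct {
	Name     string
	Children []*node
	adaptor  stopper
}

// stop stops this node's adaptor first, then forwards the stop to each
// child. It returns the node names in the order they were stopped.
func (n *node) stop() []string {
	_ = n.adaptor.Stop() // a real pipeline would report this error
	order := []string{n.Name}
	for _, child := range n.Children {
		order = append(order, child.stop()...)
	}
	return order
}

func main() {
	sink := &node{Name: "foofile", adaptor: &fakeAdaptor{}}
	root := &node{Name: "name1", adaptor: &fakeAdaptor{}, Children: []*node{sink}}
	fmt.Println(root.stop())
}
```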
type Pipeline ¶
    type Pipeline struct {
        // Err is the fatal error that was sent from the adaptor
        // that caused us to stop this process. If this is nil, then
        // the transporter is running
        Err error
        // contains filtered or unexported fields
    }
A Pipeline is the end-to-end description of a transporter data flow, including the source, sink, and all the transformers along the way.
func NewDefaultPipeline ¶
func NewDefaultPipeline(source *Node, uri, key, pid, version string, interval time.Duration) (*Pipeline, error)
NewDefaultPipeline returns a new Transporter Pipeline with the given node tree, and uses the events.HttpPostEmitter to deliver metrics, e.g.:
    source := transporter.NewNode("source", "mongo", adaptor.Config{"uri": "mongodb://localhost/", "namespace": "boom.foo", "debug": false, "tail": true}).
        Add(transporter.NewNode("out", "file", adaptor.Config{"uri": "stdout://"}))
    // "key", "pid", and "version" are placeholder values
    pipeline, err := transporter.NewDefaultPipeline(source, "http://localhost/endpoint", "key", "pid", "version", 1*time.Second)
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    pipeline.Run()
func NewPipeline ¶
func NewPipeline(version string, source *Node, emit events.EmitFunc, interval time.Duration) (*Pipeline, error)
NewPipeline creates a new Transporter Pipeline using the given tree of nodes and event emitter, e.g.:
    source := transporter.NewNode("source", "mongo", adaptor.Config{"uri": "mongodb://localhost/", "namespace": "boom.foo", "debug": false, "tail": true}).
        Add(transporter.NewNode("out", "file", adaptor.Config{"uri": "stdout://"}))
    pipeline, err := transporter.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    pipeline.Run()