pipeline

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2017 License: BSD-3-Clause Imports: 13 Imported by: 0

Documentation

Overview

Package pipeline provides all adaptoremented functionality to move data through transporter.

A transporter pipeline consists of a tree of Nodes, with the root Node attached to the source database, and each child node is either a data transformer or a database sink. Node's can be defined like:

transporter.NewNode("source", "mongo", map[string]interface{}{"uri": "mongodb://localhost/, "namespace": "test.colln", "debug": false, "tail": true}).
  Add(transporter.NewNode("out", "file", map[string]interface{}{"uri": "stdout://"}))

and pipelines can be defined :

pipeline, err := transporter.NewPipeline(source, events.NewNoopEmitter(), 1*time.Second)
if err != nil {
  fmt.Println(err)
  os.Exit(1)
}
pipeline.Run()

the event emitter's are defined in transporter/events, and are used to deliver error/metrics/etc about the running process

Package pipeline provides all adaptoremented functionality to move data through transporter.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Node

type Node struct {
	Name       string  `json:"name"`     // the name of this node
	Type       string  `json:"type"`     // the node's type, used to create the adaptorementation
	Children   []*Node `json:"children"` // the nodes are set up as a tree, this is an array of this nodes children
	Parent     *Node   `json:"parent"`   // this node's parent node, if this is nil, this is a 'source' node
	Transforms []*Transform
	// contains filtered or unexported fields
}

A Node is the basic building blocks of transporter pipelines. Nodes are constructed in a tree, with the first node broadcasting data to each of it's children. Node tree's can be constructed as follows:

source := transporter.NewNode("name1", "mongo", adaptor.Config{"uri": "mongodb://localhost/boom", "namespace": "boom.foo", "debug": true})
sink1 := transporter.NewNode("foofile", "file", adaptor.Config{"uri": "stdout://"})
sink2 := transporter.NewNode("foofile2", "file", adaptor.Config{"uri": "stdout://"})
source.Add(sink1)
source.Add(sink2)

func NewNode

func NewNode(name, kind, ns string, a adaptor.Adaptor, parent *Node) (*Node, error)

NewNode creates a new Node struct

func (*Node) Endpoints

func (n *Node) Endpoints() map[string]string

Endpoints recurses down the node tree and accumulates a map associating node name with node type this is primarily used with the boot event

func (*Node) Path

func (n *Node) Path() string

Path returns a string representation of the names of all the node's parents concatenated with "/" used in metrics eg. for the following tree source := transporter.NewNode("name1", "mongo", adaptor.Config{"uri": "mongodb://localhost/boom", "namespace": "boom.foo", "debug": true})

sink1 := transporter.NewNode("foofile", "file", adaptor.Config{"uri": "stdout://"})
source.Add(sink1)

'source' will have a Path of 'name1', and 'sink1' will have a path of 'name1/sink1'

func (*Node) Start

func (n *Node) Start() error

Start starts the nodes children in a go routine, and then runs either Start() or Listen() on the node's adaptor. Root nodes (nodes with no parent) will run Start() and will emit messages to it's children, All descendant nodes run Listen() on the adaptor

func (*Node) Stop

func (n *Node) Stop()

Stop this node's adaptor, and sends a stop to each child of this node

func (*Node) String

func (n *Node) String() string

String

func (*Node) Validate

func (n *Node) Validate() bool

Validate ensures that the node tree conforms to a proper structure. Node trees must have at least one source, and one sink. dangling transformers are forbidden. Validate only knows about default adaptors in the adaptor package, it can't validate any custom adaptors

type Pipeline

type Pipeline struct {

	// Err is the fatal error that was sent from the adaptor
	// that caused us to stop this process.  If this is nil, then
	// the transporter is running
	Err error
	// contains filtered or unexported fields
}

A Pipeline is a the end to end description of a transporter data flow. including the source, sink, and all the transformers along the way

func NewDefaultPipeline

func NewDefaultPipeline(source *Node, uri, key, pid, version string, interval time.Duration) (*Pipeline, error)

NewDefaultPipeline returns a new Transporter Pipeline with the given node tree, and uses the events.HttpPostEmitter to deliver metrics. eg.

  source :=
  	transporter.NewNode("source", "mongo", adaptor.Config{"uri": "mongodb://localhost/", "namespace": "boom.foo", "debug": false, "tail": true}).
	  	Add(transporter.NewNode("out", "file", adaptor.Config{"uri": "stdout://"}))
  pipeline, err := transporter.NewDefaultPipeline(source, events.Api{URI: "http://localhost/endpoint"}, 1*time.Second)
  if err != nil {
	  fmt.Println(err)
	  os.Exit(1)
  }

pipeline.Run()

func NewPipeline

func NewPipeline(version string, source *Node, emit events.EmitFunc, interval time.Duration) (*Pipeline, error)

NewPipeline creates a new Transporter Pipeline using the given tree of nodes, and Event Emitter eg.

  source :=
  	transporter.NewNode("source", "mongo", adaptor.Config{"uri": "mongodb://localhost/", "namespace": "boom.foo", "debug": false, "tail": true}).
	  	Add(transporter.NewNode("out", "file", adaptor.Config{"uri": "stdout://"}))
  pipeline, err := transporter.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
  if err != nil {
	  fmt.Println(err)
	  os.Exit(1)
  }

pipeline.Run()

func (*Pipeline) Run

func (pipeline *Pipeline) Run() error

Run the pipeline

func (*Pipeline) Stop

func (pipeline *Pipeline) Stop()

Stop sends a stop signal to the emitter and all the nodes, whether they are running or not. the node's database adaptors are expected to clean up after themselves, and stop will block until all nodes have stopped successfully

func (*Pipeline) String

func (pipeline *Pipeline) String() string

type Transform added in v0.3.0

type Transform struct {
	Name     string
	Fn       function.Function
	NsFilter *regexp.Regexp
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL