engine

package
v0.0.0-...-c6e60ff
Published: Jun 9, 2017 License: Apache-2.0 Imports: 35 Imported by: 4

Documentation

Overview

Package engine provides a plugin-based pipeline engine that decouples Input/Filter/Output plugins.

Constants

This section is empty.

Variables

var (
	ErrInvalidParam = errors.New("invalid param")
	ErrQuitingSigal = errors.New("engine received quit signal")
)
var (

	// Globals returns the global configurations of dbus.
	// We export func instead of var to prevent the global var from being overwritten.
	Globals func() *GlobalConfig
)

Functions

func RegisterPlugin

func RegisterPlugin(name string, factory func() Plugin)

RegisterPlugin lets a plugin register itself with the engine. It panics if the name is already registered.
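The registration contract can be illustrated with a minimal sketch. It does not import the package: Plugin is a reduced local stand-in, and registry, registerPlugin, mockInput and registeredTwice are hypothetical names chosen for this illustration. Only the "duplicate name panics" behaviour is taken from the description above.

```go
package main

import "fmt"

// Plugin is a local stand-in for engine.Plugin, reduced to one method.
type Plugin interface {
	SampleConfig() string
}

// registry maps a plugin name to its factory (assumed layout, not the package's).
var registry = map[string]func() Plugin{}

// registerPlugin mimics the documented contract: a duplicate name panics.
func registerPlugin(name string, factory func() Plugin) {
	if _, dup := registry[name]; dup {
		panic(fmt.Sprintf("plugin %q is already registered", name))
	}
	registry[name] = factory
}

// mockInput is a hypothetical plugin used only for this illustration.
type mockInput struct{}

func (mockInput) SampleConfig() string { return `sleep = "1s"` }

// registeredTwice reports whether registering name panics.
func registeredTwice(name string) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	registerPlugin(name, func() Plugin { return mockInput{} })
	return false
}

func main() {
	registerPlugin("MockInput", func() Plugin { return mockInput{} })
	fmt.Println(registeredTwice("MockInput")) // true: the second registration panics
}
```

Plugins would typically call the real RegisterPlugin from an init function, so that a name clash surfaces at program start rather than at runtime.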

Types

type APIHandler

type APIHandler func(w http.ResponseWriter, req *http.Request, params map[string]interface{}) (interface{}, error)

APIHandler is the HTTP API server handler signature.
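A handler with this signature might look like the sketch below. The APIHandler type is copied from the signature above; statusHandler, adapt, call and the /v1/status path are hypothetical. How the engine fills params and how it serialises the returned value are not documented here, so the adapt wrapper (JSON on success, 400 on error) is only an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// APIHandler copies the signature documented above.
type APIHandler func(w http.ResponseWriter, req *http.Request, params map[string]interface{}) (interface{}, error)

// statusHandler is a hypothetical handler that reports on a named plugin.
func statusHandler(w http.ResponseWriter, req *http.Request, params map[string]interface{}) (interface{}, error) {
	name, ok := params["name"].(string)
	if !ok {
		return nil, fmt.Errorf("missing name")
	}
	return map[string]string{"plugin": name, "status": "running"}, nil
}

// adapt is an assumed wrapper: it encodes the result as JSON and maps errors to 400.
func adapt(h APIHandler, params map[string]interface{}) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		v, err := h(w, req, params)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(v)
	}
}

// call runs the adapted handler against an in-memory request.
func call(params map[string]interface{}) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/v1/status", nil)
	adapt(statusHandler, params)(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := call(map[string]interface{}{"name": "in.mock"})
	fmt.Println(code, body)
}
```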

type Acker

type Acker interface {
	Ack(*Packet) error
}

Acker is a callback interface that is called when a packet is processed successfully.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is a lock-free ring buffer that is 2x faster than a native Go channel. In exchange, it lacks the rich feature set of Go channels.

func NewChannel

func NewChannel() *Channel

func (*Channel) Get

func (ch *Channel) Get() interface{}

func (*Channel) Put

func (ch *Channel) Put(value interface{})
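The page does not show how Channel is implemented. As a rough illustration of the general technique, here is a single-producer/single-consumer ring buffer built on sync/atomic. This is not the package's code: spscRing, ringSize and sumThrough are hypothetical, and the real Channel may support more producers or consumers and may wait differently.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

const ringSize = 8 // a power of two, so that index&(ringSize-1) wraps around

// spscRing is safe for exactly one producer goroutine and one consumer goroutine.
type spscRing struct {
	buf  [ringSize]interface{}
	head uint64 // next slot to read; advanced only by the consumer
	tail uint64 // next slot to write; advanced only by the producer
}

// Put spins until a slot is free, then publishes value.
func (r *spscRing) Put(value interface{}) {
	tail := atomic.LoadUint64(&r.tail)
	for tail-atomic.LoadUint64(&r.head) == ringSize {
		runtime.Gosched() // buffer full: yield to the consumer
	}
	r.buf[tail&(ringSize-1)] = value
	atomic.StoreUint64(&r.tail, tail+1) // makes the slot visible to the consumer
}

// Get spins until a value is available, then returns it.
func (r *spscRing) Get() interface{} {
	head := atomic.LoadUint64(&r.head)
	for atomic.LoadUint64(&r.tail) == head {
		runtime.Gosched() // buffer empty: yield to the producer
	}
	v := r.buf[head&(ringSize-1)]
	r.buf[head&(ringSize-1)] = nil      // drop the reference so it can be collected
	atomic.StoreUint64(&r.head, head+1) // hands the slot back to the producer
	return v
}

// sumThrough pushes 1..n through the ring and returns the sum seen by the consumer.
func sumThrough(n int) int {
	r := &spscRing{}
	go func() {
		for i := 1; i <= n; i++ {
			r.Put(i)
		}
	}()
	sum := 0
	for i := 0; i < n; i++ {
		sum += r.Get().(int)
	}
	return sum
}

func main() {
	fmt.Println(sumThrough(1000)) // 500500
}
```

Unlike a Go channel, a structure like this has no close, no select support and no blocking wait, which is the kind of trade-off the description above alludes to.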

type DAGBuilder

type DAGBuilder struct {
	*conf.Conf
}

DAGBuilder is the builder that helps to build a pipeline DAG.

Example:

package main

import (
	"time"

	"github.com/funkygao/dbus/engine"
)

func main() {
	// Chain one input, one filter and one output.
	builder := engine.NewDAGBuilder()
	builder.Input("in.mock", "MockInput").
		Filter("filter.mock", "MockFilter", "in.mock").
		Output("out.mock", "MockOutput", "filter.mock").
		SetDuration("sleep", time.Second)

	engine.New(nil).SubmitDAG(builder.BuildDAG()).ServeForever()
}

func NewDAGBuilder

func NewDAGBuilder() *DAGBuilder

NewDAGBuilder creates a DAGBuilder.

func (*DAGBuilder) BuildDAG

func (b *DAGBuilder) BuildDAG() *conf.Conf

func (*DAGBuilder) Filter

func (b *DAGBuilder) Filter(name, class string, matches ...string) *DAGBuilder

func (*DAGBuilder) Input

func (b *DAGBuilder) Input(name, class string) *DAGBuilder

func (*DAGBuilder) Output

func (b *DAGBuilder) Output(name, class string, matches ...string) *DAGBuilder

func (*DAGBuilder) Validate

func (b *DAGBuilder) Validate() error

type Engine

type Engine struct {
	sync.RWMutex
	*conf.Conf

	InputRunners map[string]*iRunner

	FilterRunners map[string]FilterRunner

	OutputRunners map[string]OutputRunner
	// contains filtered or unexported fields
}

Engine is the pipeline engine of the data bus system which manages the core loop.

func New

func New(globals *GlobalConfig) *Engine

New creates an engine.

func (*Engine) ClonePacket

func (e *Engine) ClonePacket(p *Packet) *Packet

ClonePacket is used by Filter plugins to generate a new Packet (copy on write). The generated Packet uses a dedicated filter recycle channel.

func (*Engine) ClusterManager

func (e *Engine) ClusterManager() cluster.Manager

ClusterManager returns the cluster manager. If cluster is disabled, returns nil.

func (*Engine) ExportDiagram

func (e *Engine) ExportDiagram(outfile string)

ExportDiagram exports the pipeline dataflow to a diagram.

func (*Engine) LoadFrom

func (e *Engine) LoadFrom(loc string) *Engine

LoadFrom loads the configuration from the given location. If loc is empty, the default zk zone /dbus/conf is used. If the config is stored in a file, loc is the file path. If the config is stored in ZooKeeper, loc looks like localhost:2181/foo/bar.

func (*Engine) RegisterAPI

func (e *Engine) RegisterAPI(path string, handlerFunc APIHandler) *mux.Route

func (*Engine) ServeForever

func (e *Engine) ServeForever() (ret error)

func (*Engine) Shutdown

func (e *Engine) Shutdown()

func (*Engine) SubmitDAG

func (e *Engine) SubmitDAG(cf *conf.Conf) *Engine

SubmitDAG submits a DAG configuration to the engine.

type Exchange

type Exchange interface {

	// InChan returns input channel from which Inputs can get reusable Packets.
	// The returned channel will be closed when engine stops.
	InChan() <-chan *Packet

	// Emit emits Packet into engine for consumers.
	Emit(pack *Packet)
}

Exchange is the packet transport channel between plugins. All packets flow through an Exchange.
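The two methods suggest a simple producer pattern: take a reusable Packet from InChan, fill it, and Emit it. The sketch below shows that pattern with local stand-ins only. Packet is reduced to two fields (the real Payload is a Payloader, not a string), and memExchange, newMemExchange and produce are hypothetical names.

```go
package main

import "fmt"

// Packet is a reduced local stand-in for engine.Packet.
type Packet struct {
	Ident   string
	Payload string // assumption: a string keeps the sketch short
}

// Exchange copies the interface documented above.
type Exchange interface {
	InChan() <-chan *Packet
	Emit(pack *Packet)
}

// memExchange is a hypothetical in-memory Exchange backed by two channels.
type memExchange struct {
	pool chan *Packet // reusable packets handed out to the Input
	out  chan *Packet // emitted packets
}

func (e *memExchange) InChan() <-chan *Packet { return e.pool }
func (e *memExchange) Emit(p *Packet)         { e.out <- p }

// newMemExchange pre-fills the pool with poolSize empty packets.
func newMemExchange(poolSize int) *memExchange {
	ex := &memExchange{pool: make(chan *Packet, poolSize), out: make(chan *Packet, poolSize)}
	for i := 0; i < poolSize; i++ {
		ex.pool <- &Packet{}
	}
	return ex
}

// produce plays the role of an Input loop and returns how many packets it emitted.
func produce(ex Exchange, ident string, lines []string) int {
	n := 0
	for _, line := range lines {
		pack, ok := <-ex.InChan()
		if !ok {
			break // channel closed: the engine is stopping
		}
		pack.Ident = ident
		pack.Payload = line
		ex.Emit(pack)
		n++
	}
	return n
}

func main() {
	ex := newMemExchange(2)
	fmt.Println(produce(ex, "in.mock", []string{"a", "b"})) // 2
	close(ex.pool)
	fmt.Println(produce(ex, "in.mock", []string{"c"})) // 0: pool drained and closed
}
```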

type Filter

type Filter interface {
	Plugin

	// Run starts the main loop of the Filter plugin.
	Run(r FilterRunner, h PluginHelper) (err error)
}

Filter is the filter plugin.

type FilterOutputRunner

type FilterOutputRunner interface {
	PluginRunner
	// contains filtered or unexported methods
}

FilterOutputRunner is the common interface shared by FilterRunner and OutputRunner.

type FilterRunner

type FilterRunner interface {
	FilterOutputRunner

	// Filter returns the underlying Filter plugin.
	Filter() Filter
}

FilterRunner is a helper for Filter plugin to access some context data.

type GlobalConfig

type GlobalConfig struct {
	*conf.Conf

	Zone    string
	Cluster string

	StartedAt      time.Time
	Debug          bool
	ClusterEnabled bool
	RouterTrack    bool
	WatchdogTick   time.Duration

	RPCPort int
	APIPort int

	InputRecyclePoolSize  int
	FilterRecyclePoolSize int
	HubChanSize           int
	PluginChanSize        int
	// contains filtered or unexported fields
}

GlobalConfig is the struct for holding global config values.

func DefaultGlobals

func DefaultGlobals() *GlobalConfig

func (*GlobalConfig) GetOrRegisterZkzone

func (g *GlobalConfig) GetOrRegisterZkzone(zone string) *zk.ZkZone

type Input

type Input interface {
	Plugin

	Acker

	// End notifies the Input plugin that it is safe to close the Acker.
	// When an Input stops, Filter|Output plugins might still depend on its acks;
	// that is what End is for.
	End(r InputRunner)

	// Run starts the main loop of the Input plugin.
	Run(r InputRunner, h PluginHelper) (err error)
}

Input is the input plugin.

type InputRunner

type InputRunner interface {
	PluginRunner

	// Input returns the associated Input plugin object.
	Input() Input

	// Stopper returns a channel for plugins to get notified when engine stops.
	Stopper() <-chan struct{}

	// Resources returns a channel that notifies Input plugin of the newly assigned resources in a cluster.
	// The newly assigned resource might be empty, which means the Input plugin should stop consuming the resource.
	Resources() <-chan []cluster.Resource
}

InputRunner is a helper for Input plugin to access some context data.
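An Input's main loop would usually wait on both Stopper and Resources. The sketch below shows one possible shape of that loop using plain channels instead of a real InputRunner. Resource is a stand-in for cluster.Resource, and consume and demo are hypothetical helpers.

```go
package main

import "fmt"

// Resource is a local stand-in for cluster.Resource.
type Resource struct{ Name string }

// consume records the size of every assignment until stopper is closed.
// An empty assignment means "stop consuming", but the loop keeps waiting
// for either a new assignment or the stop signal.
func consume(stopper <-chan struct{}, resources <-chan []Resource) []int {
	var sizes []int
	for {
		select {
		case <-stopper:
			return sizes
		case rs := <-resources:
			sizes = append(sizes, len(rs))
		}
	}
}

// demo drives consume with one assignment, one empty assignment and a stop.
func demo() []int {
	stopper := make(chan struct{})
	resources := make(chan []Resource) // unbuffered: each send waits for the loop
	done := make(chan []int)
	go func() { done <- consume(stopper, resources) }()

	resources <- []Resource{{Name: "topic-1"}, {Name: "topic-2"}}
	resources <- nil // empty assignment
	close(stopper)
	return <-done
}

func main() {
	fmt.Println(demo()) // [2 0]
}
```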

type KeyValuer

type KeyValuer interface {

	// Get returns value of the key.
	Get(k string) (v interface{}, ok bool)

	// Set stores a value for the key.
	Set(k string, v interface{})
}

KeyValuer is an interface that can be applied on Payloader.
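Since KeyValuer is optional, a consumer would presumably discover it with a type assertion on the payload. The sketch below copies both interfaces locally; taggedPayload, plainPayload and tag are hypothetical.

```go
package main

import "fmt"

// KeyValuer and Payloader copy the interfaces documented on this page.
type KeyValuer interface {
	Get(k string) (v interface{}, ok bool)
	Set(k string, v interface{})
}

type Payloader interface {
	Length() int
	Encode() ([]byte, error)
}

// taggedPayload is a hypothetical payload that also implements KeyValuer.
type taggedPayload struct {
	body []byte
	tags map[string]interface{}
}

func (p *taggedPayload) Length() int             { return len(p.body) }
func (p *taggedPayload) Encode() ([]byte, error) { return p.body, nil }

func (p *taggedPayload) Get(k string) (interface{}, bool) {
	v, ok := p.tags[k]
	return v, ok
}

func (p *taggedPayload) Set(k string, v interface{}) {
	if p.tags == nil {
		p.tags = make(map[string]interface{})
	}
	p.tags[k] = v
}

// plainPayload is a hypothetical payload without key/value support.
type plainPayload []byte

func (p plainPayload) Length() int             { return len(p) }
func (p plainPayload) Encode() ([]byte, error) { return p, nil }

// tag sets k=v only when the payload opts in to KeyValuer.
func tag(p Payloader, k string, v interface{}) bool {
	kv, ok := p.(KeyValuer)
	if ok {
		kv.Set(k, v)
	}
	return ok
}

func main() {
	tp := &taggedPayload{body: []byte("hello")}
	fmt.Println(tag(tp, "table", "users")) // true
	v, ok := tp.Get("table")
	fmt.Println(v, ok)                                     // users true
	fmt.Println(tag(plainPayload("hi"), "table", "users")) // false
}
```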

type Output

type Output interface {
	Plugin

	// Run starts the main loop of the Output plugin.
	Run(r OutputRunner, h PluginHelper) (err error)
}

Output is the output plugin.

type OutputRunner

type OutputRunner interface {
	FilterOutputRunner

	// Output returns the underlying Output plugin.
	Output() Output

	// Ack notifies the packet's source Input plugin that it is processed successfully.
	Ack(*Packet) error
}

OutputRunner is a helper for Output plugin to access some context data.

type Packet

type Packet struct {

	// Ident is used for routing.
	Ident string

	// Metadata is used to hold arbitrary data you wish to include.
	// The engine ignores this field completely; it is only meant for
	// pass-through data.
	Metadata interface{}

	Payload Payloader
	// contains filtered or unexported fields
}

Packet is the pipeline data structure that is transferred between plugins.

TODO: make it private.

func (*Packet) Recycle

func (p *Packet) Recycle()

Recycle decrements the packet's reference count and places it back in its recycle pool if possible.
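Reference-counted recycling can be sketched as follows. The unexported fields of Packet are not shown on this page, so refCount, pool and retain are assumed names, and this Recycle returns a bool purely so the example can show when the packet actually went back to the pool (the documented method returns nothing).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// packet is a local stand-in for engine.Packet with assumed bookkeeping fields.
type packet struct {
	Ident    string
	refCount int32
	pool     chan *packet
}

// retain adds one reference, e.g. when a packet is handed to a second consumer.
func (p *packet) retain() { atomic.AddInt32(&p.refCount, 1) }

// Recycle drops one reference. The last holder resets the packet and tries
// to return it to its pool; it reports whether this call was the last one.
func (p *packet) Recycle() bool {
	if atomic.AddInt32(&p.refCount, -1) > 0 {
		return false // somebody else still holds the packet
	}
	p.Ident = ""
	p.refCount = 1 // ready for the next user
	select {
	case p.pool <- p:
	default: // pool is full: leave the packet to the garbage collector
	}
	return true
}

func main() {
	pool := make(chan *packet, 1)
	p := &packet{Ident: "in.mock", refCount: 1, pool: pool}
	p.retain() // two holders now

	first := p.Recycle()
	fmt.Println(first, len(pool)) // false 0

	second := p.Recycle()
	fmt.Println(second, len(pool)) // true 1
}
```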

func (*Packet) Reset

func (p *Packet) Reset()

func (*Packet) String

func (p *Packet) String() string

type Pauser

type Pauser interface {
	Pause(InputRunner) error
	Resume(InputRunner) error
}

Pauser is an optional interface for Input plugins. An Input that implements Pauser can be paused and resumed.
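Because Pauser is optional, a caller would presumably probe for it with a type assertion before pausing. In the sketch below InputRunner is reduced to a single method, and runner, pausableInput, plainInput, errNotPausable and pause are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// InputRunner is a reduced local stand-in for engine.InputRunner.
type InputRunner interface{ Name() string }

// Pauser copies the interface documented above.
type Pauser interface {
	Pause(InputRunner) error
	Resume(InputRunner) error
}

// runner is a trivial InputRunner used only for this illustration.
type runner string

func (r runner) Name() string { return string(r) }

// pausableInput opts in to Pauser; plainInput does not.
type pausableInput struct{ paused bool }

func (p *pausableInput) Pause(InputRunner) error  { p.paused = true; return nil }
func (p *pausableInput) Resume(InputRunner) error { p.paused = false; return nil }

type plainInput struct{}

var errNotPausable = errors.New("input does not implement Pauser")

// pause pauses the plugin if, and only if, it implements Pauser.
func pause(plugin interface{}, r InputRunner) error {
	p, ok := plugin.(Pauser)
	if !ok {
		return errNotPausable
	}
	return p.Pause(r)
}

func main() {
	r := runner("in.mock")
	in := &pausableInput{}

	err := pause(in, r)
	fmt.Println(err, in.paused) // <nil> true

	fmt.Println(pause(plainInput{}, r)) // input does not implement Pauser
}
```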

type Payloader

type Payloader interface {
	// Length returns the size of the payload in bytes.
	Length() int

	// Encode returns the marshalled bytes of the payload.
	Encode() ([]byte, error)
}

Payloader defines the contract of the Packet payload. Any data transferable between plugins must implement this interface.
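A payload type only needs the two methods. The sketch below uses a hypothetical rowEvent encoded as JSON; the interface is copied locally, and the choice of JSON is an assumption made for the example, not something the package prescribes.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payloader copies the interface documented above.
type Payloader interface {
	Length() int
	Encode() ([]byte, error)
}

// rowEvent is a hypothetical payload describing one changed row.
type rowEvent struct {
	Table string `json:"table"`
	ID    int    `json:"id"`
}

// Encode marshals the event as JSON.
func (e rowEvent) Encode() ([]byte, error) { return json.Marshal(e) }

// Length reports the encoded size in bytes, or 0 if encoding fails.
func (e rowEvent) Length() int {
	b, err := e.Encode()
	if err != nil {
		return 0
	}
	return len(b)
}

// Compile-time check that rowEvent satisfies Payloader.
var _ Payloader = rowEvent{}

func main() {
	var p Payloader = rowEvent{Table: "users", ID: 7}
	b, err := p.Encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b), p.Length()) // {"table":"users","id":7} 24
}
```

Encoding on every Length call is wasteful; a payload on a hot path would more likely cache the encoded bytes.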

type Plugin

type Plugin interface {

	// Init is called when engine is initializing the plugin.
	Init(config *conf.Conf)

	// SampleConfig returns a sample config section for this plugin.
	SampleConfig() string
}

Plugin is the base interface for all plugins.

type PluginHelper

type PluginHelper interface {

	// ClonePacket is used for plugin Filter to generate new Packet.
	ClonePacket(*Packet) *Packet

	// RegisterAPI allows plugins to register handlers on the global API server.
	RegisterAPI(path string, handlerFunc APIHandler) *mux.Route
}

PluginHelper lets plugins access a subset of the singleton Engine's functions.

type PluginRunner

type PluginRunner interface {

	// Name returns the name of the underlying plugin.
	Name() string

	// Class returns the class name of the underlying plugin.
	Class() string

	// Exchange returns an Exchange for the plugin to exchange packets.
	Exchange() Exchange

	// Plugin returns the underlying plugin object.
	Plugin() Plugin

	// Conf returns the underlying plugin specific configuration.
	Conf() *conf.Conf

	// SampleConfigItems returns a list of sample config items for the underlying plugin.
	SampleConfigItems() []string
	// contains filtered or unexported methods
}

PluginRunner is the base interface for the plugin runners.

type Restarter

type Restarter interface {
	CleanupForRestart() bool
}

Restarter is an optional callback interface that is invoked when a plugin restarts. The return value determines whether the plugin is restarted or run only once.
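One way a supervisor could use this callback is sketched below. The page does not say which return value means what, so the sketch assumes that true means "restart me" and false means "run once". flaky, oneShot and supervise are hypothetical, and the run function stands in for a plugin's Run method.

```go
package main

import "fmt"

// Restarter copies the interface documented above.
type Restarter interface {
	CleanupForRestart() bool
}

// flaky is a hypothetical plugin that allows a fixed number of restarts.
type flaky struct{ restartsLeft int }

func (f *flaky) CleanupForRestart() bool {
	if f.restartsLeft == 0 {
		return false
	}
	f.restartsLeft--
	return true
}

// oneShot does not implement Restarter, so it runs exactly once.
type oneShot struct{}

// supervise runs the plugin and reruns it for as long as it implements
// Restarter and CleanupForRestart returns true (assumed meaning).
// It returns the number of runs.
func supervise(plugin interface{}, run func()) int {
	runs := 0
	for {
		run()
		runs++
		r, ok := plugin.(Restarter)
		if !ok || !r.CleanupForRestart() {
			return runs
		}
	}
}

func main() {
	noop := func() {}
	fmt.Println(supervise(&flaky{restartsLeft: 2}, noop)) // 3
	fmt.Println(supervise(oneShot{}, noop))               // 1
}
```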

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router is the router/hub shared among all plugins; it dispatches packets between the plugins.

A normal packet lifecycle:

    +--------->---------------+
    |                         |
    |                         |
    |          +-------------------------------+
    |          |                               |
    |   +-------------+                 +-------------+
    |   | | | | | | | | Input(ipool)    | | | | | | | | Input(ipool)
    |   +-------------+                 +-------------+
    |          |                               |
    |          +-------------------------------+
    |                          |
    |                   +-------------+
    |                   | | | | | | | | Hub(hpool) <------------------------------------+
    |                   +-------------+                                                 |
    |                          |                                                        |
    |          +-------------------------------------+                                  |
    |          |                                     |                                  |
    |   +-------------+                       +-------------+                           |
    |   | | | | | | | | Output/Filter(ppool)  | | | | | | | | Output/Filter(ppool)      |
    |   +-------------+                       +-------------+                           |
    |          |                                     |                                  |
    |          +-------------------------------------+                                  |
    |                        |    |                                                     |
    +-------<----------------+    +------------->---------------------------------------+
         Recycle                             Emit

A normal cloned packet lifecycle:

        +-------------+
    +-- | | | | | | | | Engine(fpool)
    |   +-------------+
    |          |
    |          | ClonePacket
    |          |
    |   +-------------+
    |   | | | | | | | | Filter
    |   +-------------+
    |          |
    |   +-------------+
    |   | | | | | | | | Output/Filter(ppool)
    |   +-------------+
    |          |
    +-------<--+
         Recycle

func (*Router) Start

func (r *Router) Start(wg *sync.WaitGroup)

Start starts the router, which dispatches packets from Inputs to MatchRunners.
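The dispatch step can be pictured as a fan-out keyed on Packet.Ident. The sketch below assumes that a runner receives every packet whose Ident appears in its match list, which is how the DAGBuilder example reads but is not confirmed by this page. packet, matchRunner, newRunner and route are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// packet is a reduced stand-in for engine.Packet.
type packet struct{ Ident string }

// matchRunner is a hypothetical Filter/Output runner with its match list.
type matchRunner struct {
	name    string
	matches map[string]bool
	inbox   chan *packet
}

// newRunner builds a runner that matches the given upstream idents.
func newRunner(name string, matches ...string) *matchRunner {
	m := make(map[string]bool, len(matches))
	for _, ident := range matches {
		m[ident] = true
	}
	return &matchRunner{name: name, matches: m, inbox: make(chan *packet, 8)}
}

// route drains hub and delivers each packet to every runner that matches
// its Ident. It returns the number of deliveries.
func route(hub <-chan *packet, runners []*matchRunner) int {
	delivered := 0
	for p := range hub {
		for _, r := range runners {
			if r.matches[p.Ident] {
				r.inbox <- p
				delivered++
			}
		}
	}
	return delivered
}

func main() {
	filter := newRunner("filter.mock", "in.mock")
	output := newRunner("out.mock", "filter.mock")

	hub := make(chan *packet, 3)
	hub <- &packet{Ident: "in.mock"}
	hub <- &packet{Ident: "filter.mock"}
	hub <- &packet{Ident: "unknown"} // matches nobody and is dropped here
	close(hub)

	n := route(hub, []*matchRunner{filter, output})
	fmt.Println(n, len(filter.inbox), len(output.inbox)) // 2 1 1
}
```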

func (*Router) Stop

func (r *Router) Stop()
