Documentation ¶
Overview ¶
Package engine provides a plugin-based pipeline engine that decouples Input/Filter/Output plugins.
Index ¶
- Variables
- func RegisterPlugin(name string, factory func() Plugin)
- type APIHandler
- type Acker
- type Channel
- type DAGBuilder
- type Engine
- func (e *Engine) ClonePacket(p *Packet) *Packet
- func (e *Engine) ClusterManager() cluster.Manager
- func (e *Engine) ExportDiagram(outfile string)
- func (e *Engine) LoadFrom(loc string) *Engine
- func (e *Engine) RegisterAPI(path string, handlerFunc APIHandler) *mux.Route
- func (e *Engine) ServeForever() (ret error)
- func (e *Engine) Shutdown()
- func (e *Engine) SubmitDAG(cf *conf.Conf) *Engine
- type Exchange
- type Filter
- type FilterOutputRunner
- type FilterRunner
- type GlobalConfig
- type Input
- type InputRunner
- type KeyValuer
- type Output
- type OutputRunner
- type Packet
- type Pauser
- type Payloader
- type Plugin
- type PluginHelper
- type PluginRunner
- type Restarter
- type Router
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInvalidParam = errors.New("invalid param")
	ErrQuitingSigal = errors.New("engine received quit signal")
)
var (
	// Globals returns the global configurations of dbus.
	// We export func instead of var to prevent the global var from being overwritten.
	Globals func() *GlobalConfig
)
Functions ¶
func RegisterPlugin ¶
func RegisterPlugin(name string, factory func() Plugin)
RegisterPlugin lets a plugin register itself with the engine under the given name. It panics if the name is already registered.
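The duplicate-name rule can be illustrated with a small self-contained sketch. It does not import the real package: `Plugin` is trimmed to one method, and `registry`, `register`, `tryRegister`, and `mockInput` are hypothetical names, so this shows the documented contract rather than the package's actual implementation.

```go
package main

import "fmt"

// Plugin is a trimmed stand-in for engine.Plugin (assumption: the real
// interface also has Init, which needs the external conf package).
type Plugin interface {
	SampleConfig() string
}

// registry maps a plugin name to its factory (hypothetical name).
var registry = map[string]func() Plugin{}

// register mimics the documented contract of RegisterPlugin:
// registering the same name twice panics.
func register(name string, factory func() Plugin) {
	if _, dup := registry[name]; dup {
		panic(fmt.Sprintf("plugin %q already registered", name))
	}
	registry[name] = factory
}

// mockInput is a hypothetical plugin used only for this illustration.
type mockInput struct{}

func (mockInput) SampleConfig() string { return `sleep: "1s"` }

// tryRegister calls register and reports whether it panicked.
func tryRegister(name string, factory func() Plugin) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	register(name, factory)
	return false
}

func main() {
	factory := func() Plugin { return mockInput{} }
	fmt.Println(tryRegister("MockInput", factory)) // first registration
	fmt.Println(tryRegister("MockInput", factory)) // duplicate name
}
```

Because a duplicate is a programming error rather than a runtime condition, a panic at registration time surfaces it as early as possible.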
Types ¶
type APIHandler ¶
type APIHandler func(w http.ResponseWriter, req *http.Request, params map[string]interface{}) (interface{}, error)
APIHandler is the HTTP API server handler signature.
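A handler with this signature might look like the sketch below. The signature is copied from above; everything else is an assumption made for illustration: `pingHandler`, `adapt`, and `call` are hypothetical names, and filling `params` from the query string is a guess, since how the engine populates `params` is not documented here.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// APIHandler repeats the documented signature so the sketch compiles alone.
type APIHandler func(w http.ResponseWriter, req *http.Request, params map[string]interface{}) (interface{}, error)

// pingHandler is a hypothetical handler that echoes the "name" param.
func pingHandler(w http.ResponseWriter, req *http.Request, params map[string]interface{}) (interface{}, error) {
	name, ok := params["name"].(string)
	if !ok || name == "" {
		return nil, errors.New("missing name")
	}
	return map[string]string{"pong": name}, nil
}

// adapt is an assumed adapter: it turns an APIHandler into an
// http.HandlerFunc, JSON-encoding the result or replying 400 on error.
func adapt(h APIHandler) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		params := map[string]interface{}{}
		for k, v := range req.URL.Query() {
			if len(v) > 0 {
				params[k] = v[0]
			}
		}
		out, err := h(w, req, params)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(out)
	}
}

// call runs pingHandler against an in-memory request.
func call(target string) (int, string) {
	rec := httptest.NewRecorder()
	adapt(pingHandler)(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := call("/ping?name=dbus")
	fmt.Print(code, " ", body)
}
```

Returning `(interface{}, error)` keeps encoding and error reporting in one place, so individual handlers only deal with their own logic.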
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel is a lock-free ring buffer that is 2X faster than a Go channel. However, it lacks the rich features of a Go channel.
func NewChannel ¶
func NewChannel() *Channel
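To give a feel for what a lock-free ring buffer involves, here is a minimal single-producer/single-consumer sketch built on `sync/atomic`. It is not the code behind `Channel`, whose fields are unexported and not shown here; `ring`, `newRing`, `put`, and `get` are hypothetical names, and the power-of-two capacity is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// ring is a fixed-size queue that is safe for exactly one producer
// goroutine and one consumer goroutine.
type ring struct {
	head uint64 // next slot to read; written only by the consumer
	tail uint64 // next slot to write; written only by the producer
	mask uint64
	buf  []int
}

// newRing creates a ring; size must be a power of two.
func newRing(size uint64) *ring {
	return &ring{mask: size - 1, buf: make([]int, size)}
}

// put stores v and reports false when the ring is full.
func (r *ring) put(v int) bool {
	t := atomic.LoadUint64(&r.tail)
	if t-atomic.LoadUint64(&r.head) == uint64(len(r.buf)) {
		return false
	}
	r.buf[t&r.mask] = v
	atomic.StoreUint64(&r.tail, t+1) // publish the slot to the consumer
	return true
}

// get removes the oldest value and reports false when the ring is empty.
func (r *ring) get() (int, bool) {
	h := atomic.LoadUint64(&r.head)
	if h == atomic.LoadUint64(&r.tail) {
		return 0, false
	}
	v := r.buf[h&r.mask]
	atomic.StoreUint64(&r.head, h+1) // release the slot to the producer
	return v, true
}

func main() {
	r := newRing(8)
	done := make(chan int)
	go func() { // consumer: sum 1000 values
		sum := 0
		for n := 0; n < 1000; {
			if v, ok := r.get(); ok {
				sum += v
				n++
			} else {
				runtime.Gosched()
			}
		}
		done <- sum
	}()
	for i := 1; i <= 1000; i++ { // producer
		for !r.put(i) {
			runtime.Gosched()
		}
	}
	fmt.Println(<-done) // prints 500500
}
```

The trade-off mirrors the one described above: there is no `select`, no close, and no blocking receive, so callers must poll or yield themselves.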
type DAGBuilder ¶
DAGBuilder is the builder that helps to build a pipeline DAG.
Example:
package main

import (
	"time"

	"github.com/funkygao/dbus/engine"
)

func main() {
	// Wire a mock pipeline: in.mock -> filter.mock -> out.mock.
	builder := engine.NewDAGBuilder()
	builder.Input("in.mock", "MockInput").
		Filter("filter.mock", "MockFilter", "in.mock").
		Output("out.mock", "MockOutput", "filter.mock").
		SetDuration("sleep", time.Second)

	// BuildDAG returns the *conf.Conf that SubmitDAG expects.
	engine.New(nil).SubmitDAG(builder.BuildDAG()).ServeForever()
}
func (*DAGBuilder) BuildDAG ¶
func (b *DAGBuilder) BuildDAG() *conf.Conf
func (*DAGBuilder) Filter ¶
func (b *DAGBuilder) Filter(name, class string, matches ...string) *DAGBuilder
func (*DAGBuilder) Input ¶
func (b *DAGBuilder) Input(name, class string) *DAGBuilder
func (*DAGBuilder) Output ¶
func (b *DAGBuilder) Output(name, class string, matches ...string) *DAGBuilder
func (*DAGBuilder) Validate ¶
func (b *DAGBuilder) Validate() error
type Engine ¶
type Engine struct {
	sync.RWMutex
	*conf.Conf
	InputRunners  map[string]*iRunner
	FilterRunners map[string]FilterRunner
	OutputRunners map[string]OutputRunner
	// contains filtered or unexported fields
}
Engine is the pipeline engine of the data bus system which manages the core loop.
func (*Engine) ClonePacket ¶
ClonePacket is used by Filter plugins to generate a new Packet (copy on write). The generated Packet uses a dedicated filter recycle channel.
func (*Engine) ClusterManager ¶
ClusterManager returns the cluster manager. If cluster is disabled, returns nil.
func (*Engine) ExportDiagram ¶
ExportDiagram exports the pipeline dataflow to a diagram.
func (*Engine) LoadFrom ¶
LoadFrom loads the configuration from the given location. If loc is empty, the default zk zone /dbus/conf is used. If the config is stored in a file, loc is the file path. If the config is stored on ZooKeeper, loc looks like localhost:2181/foo/bar.
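The three location forms can be told apart roughly as in the sketch below. The rule used here (a leading `host:port` with a numeric port means ZooKeeper, anything else is a file path) is an assumption chosen for illustration, and `classify` is a hypothetical helper; the package may decide differently.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// classify guesses how a LoadFrom-style location would be interpreted.
// It returns the kind of source and the path within that source.
func classify(loc string) (kind, path string) {
	if loc == "" {
		// Documented default when the location is empty.
		return "zookeeper-default", "/dbus/conf"
	}
	hostport, rest := loc, ""
	if i := strings.IndexByte(loc, '/'); i >= 0 {
		hostport, rest = loc[:i], loc[i:]
	}
	// Assumption: a host:port prefix with a numeric port marks ZooKeeper.
	if _, port, err := net.SplitHostPort(hostport); err == nil {
		if _, err := strconv.Atoi(port); err == nil {
			return "zookeeper", rest
		}
	}
	return "file", loc
}

func main() {
	for _, loc := range []string{"", "localhost:2181/foo/bar", "/etc/dbus.cf"} {
		kind, path := classify(loc)
		fmt.Printf("%q -> %s %s\n", loc, kind, path)
	}
}
```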
func (*Engine) RegisterAPI ¶
func (e *Engine) RegisterAPI(path string, handlerFunc APIHandler) *mux.Route
func (*Engine) ServeForever ¶
type Exchange ¶
type Exchange interface {
	// InChan returns input channel from which Inputs can get reusable Packets.
	// The returned channel will be closed when engine stops.
	InChan() <-chan *Packet

	// Emit emits Packet into engine for consumers.
	Emit(pack *Packet)
}
Exchange is the packet transport channel between plugins. All packets flow through an Exchange.
type Filter ¶
type Filter interface {
	Plugin

	// Run starts the main loop of the Filter plugin.
	Run(r FilterRunner, h PluginHelper) (err error)
}
Filter is the filter plugin.
type FilterOutputRunner ¶
type FilterOutputRunner interface {
	PluginRunner
	// contains filtered or unexported methods
}
FilterOutputRunner is the common interface shared by FilterRunner and OutputRunner.
type FilterRunner ¶
type FilterRunner interface {
	FilterOutputRunner

	// Filter returns the underlying Filter plugin.
	Filter() Filter
}
FilterRunner is a helper for Filter plugin to access some context data.
type GlobalConfig ¶
type GlobalConfig struct {
	*conf.Conf

	Zone                  string
	Cluster               string
	StartedAt             time.Time
	Debug                 bool
	ClusterEnabled        bool
	RouterTrack           bool
	WatchdogTick          time.Duration
	RPCPort               int
	APIPort               int
	InputRecyclePoolSize  int
	FilterRecyclePoolSize int
	HubChanSize           int
	PluginChanSize        int
	// contains filtered or unexported fields
}
GlobalConfig is the struct for holding global config values.
func DefaultGlobals ¶
func DefaultGlobals() *GlobalConfig
func (*GlobalConfig) GetOrRegisterZkzone ¶
func (g *GlobalConfig) GetOrRegisterZkzone(zone string) *zk.ZkZone
type Input ¶
type Input interface {
	Plugin
	Acker

	// End notifies the Input plugin that it is safe to close the Acker.
	// When an Input stops, Filter|Output might still depend on the Input's ack;
	// that is what End is for.
	End(r InputRunner)

	// Run starts the main loop of the Input plugin.
	Run(r InputRunner, h PluginHelper) (err error)
}
Input is the input plugin.
type InputRunner ¶
type InputRunner interface {
	PluginRunner

	// Input returns the associated Input plugin object.
	Input() Input

	// Stopper returns a channel for plugins to get notified when engine stops.
	Stopper() <-chan struct{}

	// Resources returns a channel that notifies Input plugin of the newly assigned resources in a cluster.
	// The newly assigned resource might be empty, which means the Input plugin should stop consuming the resource.
	Resources() <-chan []cluster.Resource
}
InputRunner is a helper for Input plugin to access some context data.
type KeyValuer ¶
type KeyValuer interface {
	// Get returns value of the key.
	Get(k string) (v interface{}, ok bool)

	// Set stores a value for the key.
	Set(k string, v interface{})
}
KeyValuer is an interface that can be applied on Payloader.
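A payload can opt in to KeyValuer in addition to Payloader, and a plugin can detect that with a type assertion. The sketch below repeats both interfaces locally so that it compiles on its own; `row` and `tag` are hypothetical names, and JSON is only one possible encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payloader and KeyValuer repeat the documented interfaces.
type Payloader interface {
	Length() int
	Encode() ([]byte, error)
}

type KeyValuer interface {
	Get(k string) (v interface{}, ok bool)
	Set(k string, v interface{})
}

// row is a hypothetical map-backed payload implementing both interfaces.
type row struct {
	fields map[string]interface{}
}

func (r *row) Get(k string) (interface{}, bool) {
	v, ok := r.fields[k]
	return v, ok
}

func (r *row) Set(k string, v interface{}) { r.fields[k] = v }

// Encode marshals the fields as JSON (an assumption of this sketch).
func (r *row) Encode() ([]byte, error) { return json.Marshal(r.fields) }

// Length returns the encoded size in bytes, or 0 if encoding fails.
func (r *row) Length() int {
	b, err := r.Encode()
	if err != nil {
		return 0
	}
	return len(b)
}

// tag sets k=v on payloads that support KeyValuer and reports whether it did.
func tag(p Payloader, k string, v interface{}) bool {
	kv, ok := p.(KeyValuer)
	if !ok {
		return false
	}
	kv.Set(k, v)
	return true
}

func main() {
	r := &row{fields: map[string]interface{}{}}
	fmt.Println(tag(r, "id", 1))
	b, _ := r.Encode()
	fmt.Println(string(b), r.Length())
}
```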
type Output ¶
type Output interface {
	Plugin

	// Run starts the main loop of the Output plugin.
	Run(r OutputRunner, h PluginHelper) (err error)
}
Output is the output plugin.
type OutputRunner ¶
type OutputRunner interface {
	FilterOutputRunner

	// Output returns the underlying Output plugin.
	Output() Output

	// Ack notifies the packet's source Input plugin that it is processed successfully.
	Ack(*Packet) error
}
OutputRunner is a helper for Output plugin to access some context data.
type Packet ¶
type Packet struct {
	// Ident is used for routing.
	Ident string

	// Metadata is used to hold arbitrary data you wish to include.
	// Engine completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	Payload Payloader
	// contains filtered or unexported fields
}
Packet is the pipeline data structure that is transferred between plugins.
TODO hide it to private.
type Pauser ¶
type Pauser interface {
	Pause(InputRunner) error
	Resume(InputRunner) error
}
Pauser is used for Input plugin. If a Plugin implements Pauser, it can pause/resume.
type Payloader ¶
type Payloader interface {
	// Length returns the size of the payload in bytes.
	Length() int

	// Encode returns the marshalled byte array of the payload.
	Encode() ([]byte, error)
}
Payloader defines the contract of the Packet payload. Any data transferred between plugins must implement this interface.
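The smallest payload satisfying this contract is a plain string or byte slice. In the sketch below the interface is repeated locally so that the code compiles alone; `text` and `totalBytes` are hypothetical names.

```go
package main

import "fmt"

// Payloader repeats the documented interface.
type Payloader interface {
	Length() int
	Encode() ([]byte, error)
}

// text is a hypothetical payload holding raw text.
type text string

// Length returns the size of the payload in bytes.
func (t text) Length() int { return len(t) }

// Encode returns the payload bytes; it cannot fail for plain text.
func (t text) Encode() ([]byte, error) { return []byte(t), nil }

// totalBytes sums the sizes of several payloads, for example for metrics.
func totalBytes(ps ...Payloader) int {
	n := 0
	for _, p := range ps {
		n += p.Length()
	}
	return n
}

func main() {
	fmt.Println(totalBytes(text("hello"), text("dbus")))
}
```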
type Plugin ¶
type Plugin interface {
	// Init is called when engine is initializing the plugin.
	Init(config *conf.Conf)

	// SampleConfig returns a sample config section for this plugin.
	SampleConfig() string
}
Plugin is the base interface for all plugins.
type PluginHelper ¶
type PluginHelper interface {
	// ClonePacket is used for plugin Filter to generate new Packet.
	ClonePacket(*Packet) *Packet

	// RegisterAPI allows plugins to register handlers on the global API server.
	RegisterAPI(path string, handlerFunc APIHandler) *mux.Route
}
PluginHelper is a helper for plugins to access partial functions of the singleton Engine.
type PluginRunner ¶
type PluginRunner interface {
	// Name returns the name of the underlying plugin.
	Name() string

	// Class returns the class name of the underlying plugin.
	Class() string

	// Exchange returns an Exchange for the plugin to exchange packets.
	Exchange() Exchange

	// Plugin returns the underlying plugin object.
	Plugin() Plugin

	// Conf returns the underlying plugin specific configuration.
	Conf() *conf.Conf

	// SampleConfigItems returns a list of sample config items for the underlying plugin.
	SampleConfigItems() []string
	// contains filtered or unexported methods
}
PluginRunner is the base interface for the plugin runners.
type Restarter ¶
type Restarter interface {
CleanupForRestart() bool
}
Restarter is an optional callback interface for plugins: CleanupForRestart is called when the plugin restarts. Its return value determines whether the plugin is restarted or runs only once.
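One way a runner could use this callback is sketched below. The description above does not say which return value means "restart", so this sketch assumes that `true` asks for another run and that a restart is only considered after a failed run. `runner`, `supervise`, `flaky`, and `oneShot` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Restarter repeats the documented interface.
type Restarter interface {
	CleanupForRestart() bool
}

// runner is a trimmed stand-in for a plugin with a main loop.
type runner interface {
	Run() error
}

// supervise runs p and, if p failed and asks for it through Restarter,
// runs it again. It returns how many times Run was called.
func supervise(p runner) (runs int) {
	for {
		runs++
		if err := p.Run(); err == nil {
			return runs
		}
		r, ok := p.(Restarter)
		if !ok || !r.CleanupForRestart() {
			return runs // run once: no restart requested
		}
	}
}

// flaky fails a fixed number of times and always asks to be restarted.
type flaky struct{ failures int }

func (f *flaky) Run() error {
	if f.failures > 0 {
		f.failures--
		return errors.New("transient failure")
	}
	return nil
}

func (f *flaky) CleanupForRestart() bool { return true }

// oneShot does not implement Restarter, so it is never restarted.
type oneShot struct{}

func (oneShot) Run() error { return errors.New("fatal") }

func main() {
	fmt.Println(supervise(&flaky{failures: 2})) // prints 3
	fmt.Println(supervise(oneShot{}))           // prints 1
}
```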
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router is the router/hub shared among all plugins; it dispatches packets among the plugins.
A normal packet lifecycle:
+---------------->----------------+
|                                 |
|               +-----------------+-----------------+
|               |                                   |
|       +--------------+                    +--------------+
|       | Input(ipool) |                    | Input(ipool) |
|       +--------------+                    +--------------+
|               |                                   |
|               +-----------------+-----------------+
|                                 |
|                         +--------------+
|                         |  Hub(hpool)  |<-----------------------------+
|                         +--------------+                              |
|                                 |                                     |
|               +-----------------+-----------------+                   |
|               |                                   |                   |
|   +----------------------+            +----------------------+        |
|   | Output/Filter(ppool) |            | Output/Filter(ppool) |        |
|   +----------------------+            +----------------------+        |
|               |                                   |                   |
|               +-----------------+-----------------+                   |
|                                 |                                     |
+----------------<----------------+------------------>------------------+
              Recycle                               Emit
A normal cloned packet lifecycle:
    +----------------------+
+---|    Engine(fpool)     |
|   +----------------------+
|              |
|              | ClonePacket
|              |
|   +----------------------+
|   |        Filter        |
|   +----------------------+
|              |
|   +----------------------+
|   | Output/Filter(ppool) |
|   +----------------------+
|              |
+-------<------+
    Recycle
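The recycle loop in the first lifecycle can be modelled with two buffered Go channels. This is a simplified sketch and not the engine's code: `Packet` is trimmed to two string fields, `ipool` and `hub` stand in for the input recycle pool and the Router, and `run` is a hypothetical helper.

```go
package main

import "fmt"

// Packet is a trimmed stand-in for engine.Packet (assumption).
type Packet struct {
	Ident   string
	Payload string
}

// run pushes n payloads through a pool of poolSize reusable packets.
// It returns the payloads seen by the output and the number of distinct
// Packet objects that were used along the way.
func run(n, poolSize int) (payloads []string, distinct int) {
	ipool := make(chan *Packet, poolSize) // recycle pool owned by the input
	hub := make(chan *Packet, poolSize)   // stand-in for the Router/Hub
	for i := 0; i < poolSize; i++ {
		ipool <- &Packet{}
	}

	// Input: take a reusable packet, fill it, emit it into the hub.
	go func() {
		for i := 0; i < n; i++ {
			p := <-ipool // blocks while all packets are in flight
			p.Ident, p.Payload = "in.mock", fmt.Sprint(i)
			hub <- p
		}
		close(hub)
	}()

	// Output: consume each packet, then recycle it to the input pool.
	seen := map[*Packet]bool{}
	for p := range hub {
		payloads = append(payloads, p.Payload)
		seen[p] = true
		ipool <- p
	}
	return payloads, len(seen)
}

func main() {
	payloads, distinct := run(5, 2)
	fmt.Println(payloads, distinct)
}
```

Because the input can only emit packets it has taken from the pool, the pool size bounds both memory use and the number of packets in flight, which gives natural back pressure when outputs are slow.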
Source Files ¶
- admin_v1.go
- api_v1.go
- api_v2.go
- apiserver.go
- builder.go
- channel.go
- diagram.go
- doc.go
- engine.go
- errors.go
- exchange.go
- filter.go
- global.go
- helper.go
- input.go
- matcher.go
- metrics.go
- output.go
- packet.go
- plugin.go
- rebalance.go
- router.go
- rpc_v1.go
- rpc_v2.go
- rpcserver.go
- runner.go
- sos.go
- upgrade.go
- util.go
- watchdog.go