Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config interface { PluginDefinitions() []PluginDefinition AppendPluginDefinition(pdef PluginDefinition) error OnPluginDefinitionAdded(func(PluginDefinition)) OnPluginDefinitionActivated(func(PluginDefinition)) OnPluginDefinitionDeactivated(func(PluginDefinition)) Activate(pdef PluginDefinition) Deactivate(pdef PluginDefinition) }
Config is an abstraction for configuration settings, usually loaded from a TOML based configuration file. It describes which plugins to use, in which order they run, and how to handle plugin settings. `inherit_settings_from_env` is a boolean that indicates if there are plugin settings available as environment variables. If `true` then any plugin setting will first try to load from it's corresponding environment variable, and secondly try to load from a specified `settings_file` for the plugin.
Example of a config file: ----- inherit_settings_from_env = true
[plugin] name = "kafka-stream-reader" path = "/usr/share/ouretl/plugins/ouretl-plugin-kafka-stream-reader.so.1.0.0" version = "1.0.0" priority = 1 settings_file = "/usr/share/ouretl/config-kafka-stream-reader.toml"
[plugin] name = "elasticsearch-writer" path = "/usr/share/ouretl/plugins/ouretl-plugin-elasticsearch-writer.so.4.1.0" version = "4.1.0" priority = 10 settings_file = "/usr/share/ouretl/config-elasticsearch-writer.toml" -----
Example of a plugin settings file: ----- string_variable = "value" int_variable = 1 -----
type DataHandlerPlugin ¶
type DataHandlerPlugin interface {
Handle(dm DataMessage, next func([]byte) error) error
}
DataHandlerPlugin defines a handler for a data message, where any type of handling can occur. A DataHandlerPlugin can be used either as a data transformation step or as a ETL `sink`. When the plugin has completed it's job, it should always call the provided `next` function with current state of the data message - unless an error occur. The `next` function, available as a function parameter, is a caller for the next plugin in line.
Example: ----- package main
type pluginDef struct{}
type jsonMessage struct { DataMessage string `json:"data"` ReceivedAt string `json:"received_at"` }
func GetHandler(_ ouretl.Config, _ ouretl.PluginSettings) (ouretl.DataHandlerPlugin, error) { return &pluginDef{} }
func (def *pluginDef) Handle(data ouretl.DataMessage, next func([]byte) error) error { s := string(data.Content()) m := &jsonMessage{ DataMessage: s, ReceivedAt: time.Now().Local().Format("2006-01-02 15:04:05"), } output, err := json.Marshal(m) if err != nil { return err } return next(output) }
-----
type DataMessage ¶
DataMessage is a wrapper abstraction for the raw message provided to the DataHandlerPlugins. Apart from the actual data content, the message has an `Origin` field containing a WorkerPlugin name and an `ID` field with a unique message ID.
type PluginDefinition ¶
type PluginDefinition interface { Name() string FilePath() string Version() string Priority() int IsActive() bool Settings() PluginSettings }
PluginDefinition is an abstraction for a plugin, and describes its runtime behavior and load it
type PluginSettings ¶
PluginSettings is an abstraction to access plugin specific settings, simply pass the setting name to extract the correct value, either from an environment variable or from the plugins settings file. Casting to the actual type is necessary when retrieving a setting value; ----- myValue, ok := psettings.Get("int_variable")
if ok { myIntValue := myValue.(int) }
-----
type WorkerPlugin ¶
WorkerPlugin defines a worker (or ETL `source`) that's always running, under ouretl-core. A worker plugin can choose to publish messages into the plugin pipeline, using the provided `target` function.
Example: ----- package main
type pluginDef struct{ portNumber string }
func GetWorker(_ ouretl.Config, settings ouretl.PluginSettings) (ouretl.WorkerPlugin, error) { return &pluginDef{ portNumber: settings.Get("PortSettings"), } }
func (def *pluginDef) Start(target func([]byte)) error { router := httprouter.New() router.POST("/", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { defer r.Body.Close() message, err := ioutil.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return } target(message) w.WriteHeader(http.StatusAccepted) }) addr := fmt.Sprintf(":%s", def.portNumber) return http.ListenAndServe(addr, router) }
-----