Documentation ¶
Overview ¶
Package module contains the low-level utilities for running Metricbeat modules and metricsets. This is useful for building your own tool that has a module and sub-module concept. If you want to reuse the whole Metricbeat framework see the github.com/elastic/beats/metricbeat/beater package that provides a higher level interface.
This contains the tools for instantiating modules, running them, and connecting their outputs to the Beat's output pipeline.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PublishChannels ¶
PublishChannels publishes the events read from each channel to the given publisher client. If the publisher client blocks for any reason then events will not be read from the given channels.
This method blocks until all of the channels have been closed and are fully read. To stop the method immediately, close the channels and close the publisher client to ensure that publishing does not block. This may result is some events being discarded.
Types ¶
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector configures and establishes a beat.Client for publishing events to the publisher pipeline.
func NewConnector ¶
type Factory ¶
type Factory struct {
// contains filtered or unexported fields
}
Factory creates new Runner instances from configuration objects. It is used to register and reload modules.
func NewFactory ¶
NewFactory creates new Reloader instance for the given config
func (*Factory) CheckConfig ¶
CheckConfig checks if a config is valid or not
type Option ¶
type Option func(wrapper *Wrapper)
Option specifies some optional arguments used for configuring the behavior of a module Wrapper.
func WithEventModifier ¶
func WithEventModifier(modifier mb.EventModifier) Option
WithEventModifier attaches an EventModifier that will be executed for each event generated by the MetricSets of the module. Multiple EventModifiers can be added and they will be executed in the order in which they were added.
func WithMaxStartDelay ¶
WithMaxStartDelay specifies the upper bound for the random startup delay for each MetricSet in the module. By default there is no delay.
func WithMetricSetInfo ¶
func WithMetricSetInfo() Option
WithMetricSetInfo attaches an EventModifier that adds information about the MetricSet that generated the event. It will always add the metricset and module names. And it will add the host and rtt (round-trip time in microseconds) values if they are non-zero values.
"metricset": { "host": "apache", "module": "apache", "name": "status", "rtt": 115 }
func WithServiceName ¶
func WithServiceName() Option
WithServiceName sets the `service.name` field of the event with the value given to the `service.name` setting in the module configuration.
type Runner ¶
type Runner interface { // fmt.Stringer is required here because when used as a cfgfile.Runner // we need a way to print a threadsafe set of fields since it will likely // be printed from a concurrent goroutine. fmt.Stringer // Start starts the Module. If Start is called more than once, only the // first will start the Module. Start() // Stop stops the Module and waits for module's MetricSets to exit. The // publisher.Client will be closed by Stop. If Stop is called more than // once, only the first stop the Module and wait for it to exit. Stop() }
Runner is a facade for a Wrapper that provides a simple interface for starting and stopping a Module.
Example ¶
ExampleRunner demonstrates how to use Runner to start and stop a module.
// A *beat.Beat is injected into a Beater when it runs and contains the // Publisher used to publish events. This Beat pointer is created here only // for demonstration purposes. var b *beat.Beat config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, "metricsets": []string{eventFetcherName}, }) if err != nil { return } // Create a new Wrapper based on the configuration. m, err := module.NewWrapper(config, mb.Registry, module.WithMetricSetInfo()) if err != nil { return } connector, err := module.NewConnector(b.Publisher, config, nil) if err != nil { return } client, err := connector.Connect() if err != nil { return } // Create the Runner facade. runner := module.NewRunner(client, m) // Start the module and have it publish to a new publisher.Client. runner.Start() // Stop the module. This blocks until all MetricSets in the Module have // stopped and the publisher.Client is closed. runner.Stop()
Output:
type Wrapper ¶
Wrapper contains the Module and the private data associated with running the Module and its MetricSets.
Use NewWrapper or NewWrappers to construct new Wrappers.
Example ¶
ExampleWrapper demonstrates how to create a single Wrapper from configuration, start the module, and consume events generated by the module.
// Build a configuration object. config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, "metricsets": []string{eventFetcherName}, }) if err != nil { fmt.Println("Error:", err) return } // Create a new Wrapper based on the configuration. m, err := module.NewWrapper(config, mb.Registry, module.WithMetricSetInfo()) if err != nil { fmt.Println("Error:", err) return } // Run the module until done is closed. done := make(chan struct{}) output := m.Start(done) // Process events from the output channel until it is closed. var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() for event := range output { event.Fields.Put("event.duration", 111) output, err := encodeEvent(event) if err == nil { fmt.Println(output) } } }() // Simulate running for a while. time.Sleep(50 * time.Millisecond) // When finished with the module, close the done channel. When the Module // stops it will automatically close its output channel so that the output // for loop stops. close(done) wg.Wait()
Output: { "@metadata": { "beat": "noindex", "type": "_doc", "version": "1.2.3" }, "@timestamp": "2016-05-10T23:27:58.485Z", "event": { "dataset": "fake.eventfetcher", "duration": 111, "module": "fake" }, "fake": { "eventfetcher": { "metric": 1 } }, "metricset": { "name": "eventfetcher", "period": 10000 }, "service": { "type": "fake" } }
func NewWrapper ¶
NewWrapper create a new Module and its associated MetricSets based on the given configuration.
func (*Wrapper) MetricSets ¶
func (mw *Wrapper) MetricSets() []*metricSetWrapper
MetricSets return the list of metricsets of the module
func (*Wrapper) Start ¶
Start starts the Module's MetricSet workers which are responsible for fetching metrics. The workers will continue to periodically fetch until the done channel is closed. When the done channel is closed all MetricSet workers will stop and the returned output channel will be closed.
The returned channel is buffered with a length one one. It must drained to prevent blocking the operation of the MetricSets.
Start should be called only once in the life of a Wrapper.