Documentation ¶
Index ¶
- Constants
- Variables
- func AddDecoupleStep(p *bitflow.SamplePipeline, params map[string]interface{}) error
- func AddTagChangeListenerParams(step *reg.RegisteredStep)
- func AppendToSample(s *bitflow.Sample, values []float64)
- func FillSample(s *bitflow.Sample, values []float64)
- func FillSampleFromMatrix(s *bitflow.Sample, row int, mat *mat.Dense)
- func FillSamplesFromMatrix(s []*bitflow.Sample, mat *mat.Dense)
- func GetMinMax(header *bitflow.Header, samples []*bitflow.Sample) ([]float64, []float64)
- func IsValidNumber(val float64) bool
- func MakeBatchProcessor(params map[string]interface{}) (res *bitflow.BatchProcessor, err error)
- func NewSampleShuffler() *bitflow.SimpleBatchProcessingStep
- func NewTagMapper(sourceTag, targetTag string, mapping map[string]string) bitflow.SampleProcessor
- func NewTaggingProcessor(tags map[string]string) bitflow.SampleProcessor
- func RegisterAppendTimeDifference(b reg.ProcessorRegistry)
- func RegisterConsoleBoxOutput(e *bitflow.EndpointFactory)
- func RegisterDecouple(b reg.ProcessorRegistry)
- func RegisterDrop(b reg.ProcessorRegistry)
- func RegisterDropErrorsStep(b reg.ProcessorRegistry)
- func RegisterDropInvalid(b reg.ProcessorRegistry)
- func RegisterDuplicateTimestampFilter(b reg.ProcessorRegistry)
- func RegisterDynamicSource(factory *bitflow.EndpointFactory)
- func RegisterExcludeMetricsFilter(b reg.ProcessorRegistry)
- func RegisterExpression(b reg.ProcessorRegistry)
- func RegisterFillUpStep(b reg.ProcessorRegistry)
- func RegisterFilterExpression(b reg.ProcessorRegistry)
- func RegisterForks(b reg.ProcessorRegistry)
- func RegisterGeneratorSource(factory *bitflow.EndpointFactory)
- func RegisterGraphiteOutput(b reg.ProcessorRegistry)
- func RegisterHttpTagger(b reg.ProcessorRegistry)
- func RegisterIncludeMetricsFilter(b reg.ProcessorRegistry)
- func RegisterLoggingSteps(b reg.ProcessorRegistry)
- func RegisterMergeHeaders(b reg.ProcessorRegistry)
- func RegisterMetricMapper(b reg.ProcessorRegistry)
- func RegisterMetricRenamer(b reg.ProcessorRegistry)
- func RegisterMetricSplitter(b reg.ProcessorRegistry)
- func RegisterNoop(b reg.ProcessorRegistry)
- func RegisterOpentsdbOutput(b reg.ProcessorRegistry)
- func RegisterOutputFiles(b reg.ProcessorRegistry)
- func RegisterParseTags(b reg.ProcessorRegistry)
- func RegisterPauseTagger(b reg.ProcessorRegistry)
- func RegisterPickHead(b reg.ProcessorRegistry)
- func RegisterPickPercent(b reg.ProcessorRegistry)
- func RegisterPickTail(b reg.ProcessorRegistry)
- func RegisterPipelineRateSynchronizer(b reg.ProcessorRegistry)
- func RegisterPrometheusMarshaller(endpoints *bitflow.EndpointFactory)
- func RegisterResendStep(b reg.ProcessorRegistry)
- func RegisterSampleShuffler(b reg.ProcessorRegistry)
- func RegisterSampleSorter(b reg.ProcessorRegistry)
- func RegisterSetCurrentTime(b reg.ProcessorRegistry)
- func RegisterSkipHead(b reg.ProcessorRegistry)
- func RegisterSleep(b reg.ProcessorRegistry)
- func RegisterStoreStats(b reg.ProcessorRegistry)
- func RegisterStripMetrics(b reg.ProcessorRegistry)
- func RegisterSubProcessRunner(b reg.ProcessorRegistry)
- func RegisterSubpipelineStreamMerger(b reg.ProcessorRegistry)
- func RegisterTagChangeRunner(b reg.ProcessorRegistry)
- func RegisterTagMapping(b reg.ProcessorRegistry)
- func RegisterTagSynchronizer(b reg.ProcessorRegistry)
- func RegisterTaggingProcessor(b reg.ProcessorRegistry)
- func RegisterVarianceMetricsFilter(b reg.ProcessorRegistry)
- func SampleToVector(sample *bitflow.Sample) []float64
- func ScaleMinMax(val, min, max, outputMin, outputMax float64) float64
- func ScaleStddev(val float64, mean, stddev, min, max float64) float64
- func SendPeriodically(sample *bitflow.Sample, header *bitflow.Header, receiver bitflow.SampleSink, ...) golib.StopChan
- func SplitShellCommand(s string) []string
- func ValuesToVector(input []bitflow.Value) []float64
- type AbstractBatchMetricMapper
- type AbstractMetricFilter
- type AbstractMetricMapper
- type BlockManager
- func (m *BlockManager) GetList(key string) *BlockerList
- func (m *BlockManager) NewBlocker(key string) *BlockingProcessor
- func (m *BlockManager) NewReleaser(key string) *ReleasingProcessor
- func (m *BlockManager) RegisterBlockingProcessor(b reg.ProcessorRegistry)
- func (m *BlockManager) RegisterReleasingProcessor(b reg.ProcessorRegistry)
- type BlockerList
- type BlockingProcessor
- type ConsoleBoxSink
- func (sink *ConsoleBoxSink) Close()
- func (sink *ConsoleBoxSink) Sample(sample *bitflow.Sample, header *bitflow.Header) error
- func (sink *ConsoleBoxSink) Start(wg *sync.WaitGroup) golib.StopChan
- func (sink *ConsoleBoxSink) Stop()
- func (sink *ConsoleBoxSink) String() string
- func (sink *ConsoleBoxSink) WritesToConsole() bool
- type DecouplingProcessor
- type DropErrorsProcessor
- type DynamicSource
- type Expression
- type ExpressionProcessor
- func (p *ExpressionProcessor) AddExpression(expressionString string) error
- func (p *ExpressionProcessor) MergeProcessor(otherProcessor bitflow.SampleProcessor) bool
- func (p *ExpressionProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error
- func (p *ExpressionProcessor) String() string
- type FeatureStats
- type FillUpProcessor
- type GeneratorSource
- type HttpTagger
- type MetricFilter
- func (filter *MetricFilter) Exclude(regex *regexp.Regexp) *MetricFilter
- func (filter *MetricFilter) ExcludeRegex(regexStr string) (*MetricFilter, error)
- func (filter *MetricFilter) ExcludeStr(substr string) *MetricFilter
- func (filter *MetricFilter) Include(regex *regexp.Regexp) *MetricFilter
- func (filter *MetricFilter) IncludeRegex(regexStr string) (*MetricFilter, error)
- func (filter *MetricFilter) IncludeStr(substr string) *MetricFilter
- func (filter *MetricFilter) MergeProcessor(other bitflow.SampleProcessor) bool
- func (filter *MetricFilter) String() string
- type MetricMapper
- type MetricMapperHelper
- type MetricRenamer
- type MetricSplitter
- type MetricWindow
- func (w *MetricWindow) Data() []bitflow.Value
- func (w *MetricWindow) Empty() bool
- func (w *MetricWindow) FastData() []bitflow.Value
- func (w *MetricWindow) FillData(target []bitflow.Value) []bitflow.Value
- func (w *MetricWindow) Full() bool
- func (w *MetricWindow) Pop() bitflow.Value
- func (w *MetricWindow) Push(val bitflow.Value)
- func (w *MetricWindow) Size() int
- type MultiHeaderMerger
- type NoopProcessor
- type PauseTagger
- type PipelineRateSynchronizer
- type PrometheusMarshaller
- func (PrometheusMarshaller) ShouldCloseAfterFirstSample() bool
- func (PrometheusMarshaller) String() string
- func (PrometheusMarshaller) WriteHeader(header *bitflow.Header, withTags bool, output io.Writer) error
- func (m PrometheusMarshaller) WriteSample(sample *bitflow.Sample, header *bitflow.Header, withTags bool, ...) error
- type ReleasingProcessor
- type ResendProcessor
- type RestDataSource
- func (source *RestDataSource) Close()
- func (source *RestDataSource) EmitSample(sample *bitflow.Sample, header *bitflow.Header)
- func (source *RestDataSource) EmitSampleAndHeader(sample bitflow.SampleAndHeader)
- func (source *RestDataSource) EmitSampleAndHeaderTimeout(sample bitflow.SampleAndHeader, timeout time.Duration) bool
- func (source *RestDataSource) EmitSampleTimeout(sample *bitflow.Sample, header *bitflow.Header, timeout time.Duration) bool
- func (source *RestDataSource) EmitSamples(samples []bitflow.SampleAndHeader)
- func (source *RestDataSource) EmitSamplesTimeout(samples []bitflow.SampleAndHeader, timeout time.Duration) bool
- func (source *RestDataSource) Endpoint() string
- func (source *RestDataSource) Serve(verb string, path string, handlers ...gin.HandlerFunc) error
- func (source *RestDataSource) Start(wg *sync.WaitGroup) golib.StopChan
- func (source *RestDataSource) String() string
- type RestEndpoint
- type RestEndpointFactory
- type RestReplyHelpers
- func (h RestReplyHelpers) Reply(context *gin.Context, message string, statusCode int, contentType string)
- func (h RestReplyHelpers) ReplyCode(context *gin.Context, message string, statusCode int)
- func (h RestReplyHelpers) ReplyError(context *gin.Context, errorMessage string)
- func (h RestReplyHelpers) ReplyGenericError(context *gin.Context, errorMessage string)
- func (h RestReplyHelpers) ReplySuccess(context *gin.Context, message string)
- type SampleFilter
- type SampleSlice
- type SampleSorter
- type SimpleTextMarshaller
- func (SimpleTextMarshaller) ShouldCloseAfterFirstSample() bool
- func (o *SimpleTextMarshaller) String() string
- func (o *SimpleTextMarshaller) WriteHeader(header *bitflow.Header, hasTags bool, writer io.Writer) error
- func (o *SimpleTextMarshaller) WriteSample(sample *bitflow.Sample, header *bitflow.Header, hasTags bool, writer io.Writer) error
- type SimpleTextMarshallerFactory
- type StoreStats
- type SubProcessRunner
- func (r *SubProcessRunner) Close()
- func (r *SubProcessRunner) Configure(marshallingFormat string, f *bitflow.EndpointFactory) error
- func (r *SubProcessRunner) Sample(sample *bitflow.Sample, header *bitflow.Header) error
- func (r *SubProcessRunner) Start(wg *sync.WaitGroup) golib.StopChan
- func (r *SubProcessRunner) String() string
- type SynchronizedStreamMerger
- func (p *SynchronizedStreamMerger) Close()
- func (p *SynchronizedStreamMerger) Sample(sample *bitflow.Sample, header *bitflow.Header) error
- func (p *SynchronizedStreamMerger) StreamClosed(name string)
- func (p *SynchronizedStreamMerger) StreamOfSampleClosed(lastSample *bitflow.Sample, lastHeader *bitflow.Header)
- func (p *SynchronizedStreamMerger) String() string
- type TagChangeCallback
- type TagChangeListener
- func (t *TagChangeListener) Close()
- func (t *TagChangeListener) ReadParameters(params map[string]interface{})
- func (t *TagChangeListener) Sample(sample *bitflow.Sample, header *bitflow.Header) error
- func (t *TagChangeListener) Start(wg *sync.WaitGroup) golib.StopChan
- func (t *TagChangeListener) String() string
- type TagChangeRunner
- type TagSynchronizer
- type UniqueTagPrinter
Constants ¶
const ConsoleBoxEndpoint = bitflow.EndpointType("box")
const DynamicSourceEndpointType = "dynamic"
const GeneratorSourceEndpointType = "generate"
const PrometheusMarshallingFormat = bitflow.MarshallingFormat("prometheus")
Variables ¶
var ( ConsoleBoxSettings = gotermBox.CliLogBox{ NoUtf8: false, LogLines: 10, MessageBuffer: 500, } ConsoleBoxUpdateInterval = 500 * time.Millisecond ConsoleBoxMinUpdateInterval = 50 * time.Millisecond // ConsoleBoxOutputTestMode is a flag used by tests to suppress initialization routines // that are not testable. It is a hack to keep the EndpointFactory easy to use // while making it testable. ConsoleBoxOutputTestMode = false )
var BatchProcessorParameters = reg.RegisteredParameters{}. Optional("flush-tags", reg.List(reg.String()), []string{}, "Flush the current batch when one or more of the given tags change"). Optional("flush-no-samples-timeout", reg.Duration(), time.Duration(0)). Optional("flush-sample-lag-timeout", reg.Duration(), time.Duration(0)). Optional("flush-num-samples", reg.Int(), 0). Optional("flush-time-diff", reg.Duration(), time.Duration(0)). Optional("ignore-close", reg.Bool(), false, "Do not flush the remaining samples, when the pipeline is closed", "The default behavior is to flush on close"). Optional("forward-immediately", reg.Bool(), false, "In addition to the regular batching functionality, output each incoming sample immediately", "This will possibly duplicate each incoming sample, since the regular batch processing results are forwarded as well")
TODO "ignore-header-change"
var DynamicSourceParameters = reg.RegisteredParameters{}. Optional("update-time", reg.Duration(), 2*time.Second)
var GeneratorSourceParameters = reg.RegisteredParameters{}. Optional("interval", reg.Duration(), 500*time.Millisecond)
var MetricSplitterDescription = "" /* 237-byte string literal not displayed */
Functions ¶
func AddDecoupleStep ¶
func AddDecoupleStep(p *bitflow.SamplePipeline, params map[string]interface{}) error
func AddTagChangeListenerParams ¶ added in v0.0.42
func AddTagChangeListenerParams(step *reg.RegisteredStep)
func AppendToSample ¶
func FillSample ¶
func IsValidNumber ¶
func MakeBatchProcessor ¶ added in v0.0.28
func MakeBatchProcessor(params map[string]interface{}) (res *bitflow.BatchProcessor, err error)
func NewSampleShuffler ¶
func NewSampleShuffler() *bitflow.SimpleBatchProcessingStep
func NewTagMapper ¶ added in v0.0.52
func NewTagMapper(sourceTag, targetTag string, mapping map[string]string) bitflow.SampleProcessor
func NewTaggingProcessor ¶
func NewTaggingProcessor(tags map[string]string) bitflow.SampleProcessor
func RegisterAppendTimeDifference ¶
func RegisterAppendTimeDifference(b reg.ProcessorRegistry)
func RegisterConsoleBoxOutput ¶ added in v0.0.52
func RegisterConsoleBoxOutput(e *bitflow.EndpointFactory)
func RegisterDecouple ¶
func RegisterDecouple(b reg.ProcessorRegistry)
func RegisterDrop ¶
func RegisterDrop(b reg.ProcessorRegistry)
func RegisterDropErrorsStep ¶
func RegisterDropErrorsStep(b reg.ProcessorRegistry)
func RegisterDropInvalid ¶ added in v0.0.50
func RegisterDropInvalid(b reg.ProcessorRegistry)
func RegisterDuplicateTimestampFilter ¶
func RegisterDuplicateTimestampFilter(b reg.ProcessorRegistry)
func RegisterDynamicSource ¶ added in v0.0.37
func RegisterDynamicSource(factory *bitflow.EndpointFactory)
func RegisterExcludeMetricsFilter ¶
func RegisterExcludeMetricsFilter(b reg.ProcessorRegistry)
func RegisterExpression ¶
func RegisterExpression(b reg.ProcessorRegistry)
func RegisterFillUpStep ¶
func RegisterFillUpStep(b reg.ProcessorRegistry)
func RegisterFilterExpression ¶
func RegisterFilterExpression(b reg.ProcessorRegistry)
func RegisterForks ¶
func RegisterForks(b reg.ProcessorRegistry)
This function is placed in this package to avoid circular dependency between the fork and the query package.
func RegisterGeneratorSource ¶ added in v0.0.42
func RegisterGeneratorSource(factory *bitflow.EndpointFactory)
func RegisterGraphiteOutput ¶
func RegisterGraphiteOutput(b reg.ProcessorRegistry)
func RegisterHttpTagger ¶
func RegisterHttpTagger(b reg.ProcessorRegistry)
func RegisterIncludeMetricsFilter ¶
func RegisterIncludeMetricsFilter(b reg.ProcessorRegistry)
func RegisterLoggingSteps ¶
func RegisterLoggingSteps(b reg.ProcessorRegistry)
func RegisterMergeHeaders ¶
func RegisterMergeHeaders(b reg.ProcessorRegistry)
func RegisterMetricMapper ¶
func RegisterMetricMapper(b reg.ProcessorRegistry)
func RegisterMetricRenamer ¶
func RegisterMetricRenamer(b reg.ProcessorRegistry)
func RegisterMetricSplitter ¶ added in v0.0.6
func RegisterMetricSplitter(b reg.ProcessorRegistry)
func RegisterNoop ¶
func RegisterNoop(b reg.ProcessorRegistry)
func RegisterOpentsdbOutput ¶
func RegisterOpentsdbOutput(b reg.ProcessorRegistry)
func RegisterOutputFiles ¶
func RegisterOutputFiles(b reg.ProcessorRegistry)
func RegisterParseTags ¶
func RegisterParseTags(b reg.ProcessorRegistry)
func RegisterPauseTagger ¶
func RegisterPauseTagger(b reg.ProcessorRegistry)
func RegisterPickHead ¶
func RegisterPickHead(b reg.ProcessorRegistry)
func RegisterPickPercent ¶
func RegisterPickPercent(b reg.ProcessorRegistry)
func RegisterPickTail ¶
func RegisterPickTail(b reg.ProcessorRegistry)
func RegisterPipelineRateSynchronizer ¶
func RegisterPipelineRateSynchronizer(b reg.ProcessorRegistry)
func RegisterPrometheusMarshaller ¶ added in v0.0.52
func RegisterPrometheusMarshaller(endpoints *bitflow.EndpointFactory)
func RegisterResendStep ¶
func RegisterResendStep(b reg.ProcessorRegistry)
func RegisterSampleShuffler ¶
func RegisterSampleShuffler(b reg.ProcessorRegistry)
func RegisterSampleSorter ¶
func RegisterSampleSorter(b reg.ProcessorRegistry)
func RegisterSetCurrentTime ¶
func RegisterSetCurrentTime(b reg.ProcessorRegistry)
func RegisterSkipHead ¶
func RegisterSkipHead(b reg.ProcessorRegistry)
func RegisterSleep ¶
func RegisterSleep(b reg.ProcessorRegistry)
func RegisterStoreStats ¶
func RegisterStoreStats(b reg.ProcessorRegistry)
func RegisterStripMetrics ¶
func RegisterStripMetrics(b reg.ProcessorRegistry)
func RegisterSubProcessRunner ¶ added in v0.0.55
func RegisterSubProcessRunner(b reg.ProcessorRegistry)
func RegisterSubpipelineStreamMerger ¶
func RegisterSubpipelineStreamMerger(b reg.ProcessorRegistry)
func RegisterTagChangeRunner ¶ added in v0.0.42
func RegisterTagChangeRunner(b reg.ProcessorRegistry)
func RegisterTagMapping ¶ added in v0.0.52
func RegisterTagMapping(b reg.ProcessorRegistry)
func RegisterTagSynchronizer ¶
func RegisterTagSynchronizer(b reg.ProcessorRegistry)
func RegisterTaggingProcessor ¶
func RegisterTaggingProcessor(b reg.ProcessorRegistry)
func RegisterVarianceMetricsFilter ¶
func RegisterVarianceMetricsFilter(b reg.ProcessorRegistry)
func SampleToVector ¶
func ScaleMinMax ¶
func ScaleStddev ¶
func SendPeriodically ¶
func SplitShellCommand ¶
func ValuesToVector ¶
Types ¶
type AbstractBatchMetricMapper ¶
type AbstractBatchMetricMapper struct { Description fmt.Stringer ConstructIndices func(header *bitflow.Header, samples []*bitflow.Sample) ([]int, []string) }
func NewMetricVarianceFilter ¶
func NewMetricVarianceFilter(minimumWeightedStddev float64) *AbstractBatchMetricMapper
func (*AbstractBatchMetricMapper) ProcessBatch ¶
func (*AbstractBatchMetricMapper) String ¶
func (mapper *AbstractBatchMetricMapper) String() string
type AbstractMetricFilter ¶
type AbstractMetricFilter struct { AbstractMetricMapper IncludeFilter func(name string) bool // Return true if metric should be included }
type AbstractMetricMapper ¶
type AbstractMetricMapper struct { bitflow.NoopProcessor Description fmt.Stringer ConstructIndices func(header *bitflow.Header) ([]int, []string) // contains filtered or unexported fields }
func (*AbstractMetricMapper) String ¶
func (m *AbstractMetricMapper) String() string
type BlockManager ¶
type BlockManager struct {
// contains filtered or unexported fields
}
func NewBlockManager ¶
func NewBlockManager() *BlockManager
func (*BlockManager) GetList ¶
func (m *BlockManager) GetList(key string) *BlockerList
func (*BlockManager) NewBlocker ¶
func (m *BlockManager) NewBlocker(key string) *BlockingProcessor
func (*BlockManager) NewReleaser ¶
func (m *BlockManager) NewReleaser(key string) *ReleasingProcessor
func (*BlockManager) RegisterBlockingProcessor ¶
func (m *BlockManager) RegisterBlockingProcessor(b reg.ProcessorRegistry)
func (*BlockManager) RegisterReleasingProcessor ¶
func (m *BlockManager) RegisterReleasingProcessor(b reg.ProcessorRegistry)
type BlockerList ¶
type BlockerList struct {
Blockers []*BlockingProcessor
}
func (*BlockerList) Add ¶
func (l *BlockerList) Add(blocker *BlockingProcessor)
func (*BlockerList) ReleaseAll ¶
func (l *BlockerList) ReleaseAll()
type BlockingProcessor ¶
type BlockingProcessor struct { bitflow.NoopProcessor // contains filtered or unexported fields }
func (*BlockingProcessor) Close ¶
func (p *BlockingProcessor) Close()
func (*BlockingProcessor) Release ¶
func (p *BlockingProcessor) Release()
func (*BlockingProcessor) String ¶
func (p *BlockingProcessor) String() string
type ConsoleBoxSink ¶ added in v0.0.52
type ConsoleBoxSink struct { bitflow.AbstractSampleOutput gotermBox.CliLogBoxTask // ImmediateScreenUpdate causes the console box to be updated immediately // whenever a sample is received by this ConsoleBoxSink. Otherwise, the screen // will be updated in regular intervals based on the settings in CliLogBoxTask. ImmediateScreenUpdate bool // contains filtered or unexported fields }
ConsoleBoxSink implements the SampleSink interface by printing the received samples to the standard out. Contrary to the ConsoleSink, the screen is erased before printing a new sample, and the output is embedded in a box that shows the last lines of log output at the bottom. ConsoleBoxSink does not implement MarshallingSampleSink, because it uses its own, fixed marshaller.
Multiple embedded fields provide access to configuration options.
Init() must be called as early as possible when using ConsoleBoxSink, to make sure that all log messages are capture and none are overwritten by the box.
func (*ConsoleBoxSink) Close ¶ added in v0.0.52
func (sink *ConsoleBoxSink) Close()
Close implements the SampleSink interface. It stops the screen refresh goroutine.
func (*ConsoleBoxSink) Sample ¶ added in v0.0.52
Sample implements the SampleSink interface. The latest sample is stored and displayed on the console on the next screen refresh. Intermediate samples might get lost without being displayed.
func (*ConsoleBoxSink) Start ¶ added in v0.0.52
func (sink *ConsoleBoxSink) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleSink interface. It starts a goroutine that regularly refreshes the screen to display the current sample values and latest log output lines.
func (*ConsoleBoxSink) Stop ¶ added in v0.0.52
func (sink *ConsoleBoxSink) Stop()
Stop shadows the Stop() method from gotermBox.CliLogBoxTask to make sure that this SampleSink is actually closed in the Close() method.
func (*ConsoleBoxSink) String ¶ added in v0.0.52
func (sink *ConsoleBoxSink) String() string
String implements the SampleSink interface.
func (*ConsoleBoxSink) WritesToConsole ¶ added in v0.0.52
func (sink *ConsoleBoxSink) WritesToConsole() bool
Implement the bitflow.ConsoleSampleSink interface
type DecouplingProcessor ¶
type DecouplingProcessor struct { bitflow.NoopProcessor ChannelBuffer int // Must be set before calling Start() // contains filtered or unexported fields }
Decouple the incoming samples from the MetricSink through a looping goroutine and a channel. Creates potential parallelism in the pipeline.
func (*DecouplingProcessor) Close ¶
func (p *DecouplingProcessor) Close()
func (*DecouplingProcessor) Start ¶
func (p *DecouplingProcessor) Start(wg *sync.WaitGroup) golib.StopChan
func (*DecouplingProcessor) String ¶
func (p *DecouplingProcessor) String() string
type DropErrorsProcessor ¶
type DropErrorsProcessor struct { bitflow.NoopProcessor LogError bool LogWarning bool LogDebug bool LogInfo bool }
func (*DropErrorsProcessor) String ¶
func (p *DropErrorsProcessor) String() string
type DynamicSource ¶ added in v0.0.37
type DynamicSource struct { bitflow.AbstractSampleSource URL string FetchTimeout time.Duration Endpoints *bitflow.EndpointFactory // contains filtered or unexported fields }
func (*DynamicSource) Close ¶ added in v0.0.37
func (s *DynamicSource) Close()
func (*DynamicSource) Start ¶ added in v0.0.37
func (s *DynamicSource) Start(wg *sync.WaitGroup) (_ golib.StopChan)
func (*DynamicSource) String ¶ added in v0.0.37
func (s *DynamicSource) String() string
type Expression ¶
type Expression struct {
// contains filtered or unexported fields
}
func NewExpression ¶
func NewExpression(expressionString string) (*Expression, error)
func (*Expression) EvaluateBool ¶
func (*Expression) UpdateHeader ¶
func (p *Expression) UpdateHeader(header *bitflow.Header) error
type ExpressionProcessor ¶
type ExpressionProcessor struct { bitflow.NoopProcessor Filter bool // contains filtered or unexported fields }
func (*ExpressionProcessor) AddExpression ¶
func (p *ExpressionProcessor) AddExpression(expressionString string) error
func (*ExpressionProcessor) MergeProcessor ¶
func (p *ExpressionProcessor) MergeProcessor(otherProcessor bitflow.SampleProcessor) bool
func (*ExpressionProcessor) String ¶
func (p *ExpressionProcessor) String() string
type FeatureStats ¶
func NewFeatureStats ¶
func NewFeatureStats() *FeatureStats
func (*FeatureStats) Push ¶
func (stats *FeatureStats) Push(values ...float64)
func (*FeatureStats) ScaleMinMax ¶
func (stats *FeatureStats) ScaleMinMax(val float64, outputMin, outputMax float64) float64
func (*FeatureStats) ScaleStddev ¶
func (stats *FeatureStats) ScaleStddev(val float64) float64
type FillUpProcessor ¶
type FillUpProcessor struct { bitflow.NoopProcessor MinMissingInterval time.Duration StepInterval time.Duration // contains filtered or unexported fields }
func (*FillUpProcessor) String ¶
func (p *FillUpProcessor) String() string
type GeneratorSource ¶ added in v0.0.42
type GeneratorSource struct { bitflow.AbstractSampleSource // contains filtered or unexported fields }
func (*GeneratorSource) Close ¶ added in v0.0.42
func (s *GeneratorSource) Close()
func (*GeneratorSource) Start ¶ added in v0.0.42
func (s *GeneratorSource) Start(wg *sync.WaitGroup) (_ golib.StopChan)
func (*GeneratorSource) String ¶ added in v0.0.42
func (s *GeneratorSource) String() string
type HttpTagger ¶
type HttpTagger struct { bitflow.NoopProcessor // contains filtered or unexported fields }
func NewHttpTagger ¶
func NewHttpTagger(pathPrefix string, r *mux.Router) *HttpTagger
func NewStandaloneHttpTagger ¶
func NewStandaloneHttpTagger(pathPrefix string, endpoint string) *HttpTagger
func (*HttpTagger) HasTags ¶
func (tagger *HttpTagger) HasTags() bool
func (*HttpTagger) NumTags ¶
func (tagger *HttpTagger) NumTags() int
func (*HttpTagger) String ¶
func (tagger *HttpTagger) String() string
func (*HttpTagger) Tags ¶
func (tagger *HttpTagger) Tags() map[string]string
type MetricFilter ¶
type MetricFilter struct { AbstractMetricFilter // contains filtered or unexported fields }
func NewMetricFilter ¶
func NewMetricFilter() *MetricFilter
func (*MetricFilter) Exclude ¶
func (filter *MetricFilter) Exclude(regex *regexp.Regexp) *MetricFilter
func (*MetricFilter) ExcludeRegex ¶
func (filter *MetricFilter) ExcludeRegex(regexStr string) (*MetricFilter, error)
func (*MetricFilter) ExcludeStr ¶
func (filter *MetricFilter) ExcludeStr(substr string) *MetricFilter
func (*MetricFilter) Include ¶
func (filter *MetricFilter) Include(regex *regexp.Regexp) *MetricFilter
func (*MetricFilter) IncludeRegex ¶
func (filter *MetricFilter) IncludeRegex(regexStr string) (*MetricFilter, error)
func (*MetricFilter) IncludeStr ¶
func (filter *MetricFilter) IncludeStr(substr string) *MetricFilter
func (*MetricFilter) MergeProcessor ¶
func (filter *MetricFilter) MergeProcessor(other bitflow.SampleProcessor) bool
func (*MetricFilter) String ¶
func (filter *MetricFilter) String() string
type MetricMapper ¶
type MetricMapper struct { AbstractMetricMapper Metrics []string }
func NewMetricMapper ¶
func NewMetricMapper(metrics []string) *MetricMapper
func (*MetricMapper) String ¶
func (mapper *MetricMapper) String() string
type MetricMapperHelper ¶
type MetricMapperHelper struct { bitflow.HeaderChecker // contains filtered or unexported fields }
type MetricRenamer ¶
type MetricRenamer struct { AbstractMetricMapper // contains filtered or unexported fields }
func NewMetricRenamer ¶
func NewMetricRenamer(regexes []*regexp.Regexp, replacements []string) *MetricRenamer
func (*MetricRenamer) MergeProcessor ¶
func (r *MetricRenamer) MergeProcessor(other bitflow.SampleProcessor) bool
func (*MetricRenamer) String ¶
func (r *MetricRenamer) String() string
type MetricSplitter ¶ added in v0.0.6
type MetricSplitter struct { NoopProcessor Splitters []*regexp.Regexp // contains filtered or unexported fields }
func NewMetricSplitter ¶ added in v0.0.6
func NewMetricSplitter(regexes []string) (*MetricSplitter, error)
func (*MetricSplitter) Split ¶ added in v0.0.6
func (m *MetricSplitter) Split(sample *bitflow.Sample, header *bitflow.Header) []bitflow.SampleAndHeader
func (*MetricSplitter) String ¶ added in v0.0.6
func (m *MetricSplitter) String() string
type MetricWindow ¶
type MetricWindow struct {
// contains filtered or unexported fields
}
func NewMetricWindow ¶
func NewMetricWindow(size int) *MetricWindow
func (*MetricWindow) Data ¶
func (w *MetricWindow) Data() []bitflow.Value
func (*MetricWindow) Empty ¶
func (w *MetricWindow) Empty() bool
func (*MetricWindow) FastData ¶
func (w *MetricWindow) FastData() []bitflow.Value
Avoid copying if possible. Dangerous.
func (*MetricWindow) FillData ¶
func (w *MetricWindow) FillData(target []bitflow.Value) []bitflow.Value
func (*MetricWindow) Full ¶
func (w *MetricWindow) Full() bool
func (*MetricWindow) Pop ¶
func (w *MetricWindow) Pop() bitflow.Value
Remove and return the oldest value. The oldest value is also deleted by Push() when the window is full.
func (*MetricWindow) Push ¶
func (w *MetricWindow) Push(val bitflow.Value)
func (*MetricWindow) Size ¶
func (w *MetricWindow) Size() int
type MultiHeaderMerger ¶
type MultiHeaderMerger struct { bitflow.NoopProcessor // contains filtered or unexported fields }
Can tolerate multiple headers, fills missing data up with default values.
func NewMultiHeaderMerger ¶
func NewMultiHeaderMerger() *MultiHeaderMerger
func (*MultiHeaderMerger) Close ¶
func (p *MultiHeaderMerger) Close()
func (*MultiHeaderMerger) OutputSampleSize ¶
func (p *MultiHeaderMerger) OutputSampleSize(sampleSize int) int
func (*MultiHeaderMerger) String ¶
func (p *MultiHeaderMerger) String() string
type NoopProcessor ¶
type NoopProcessor struct {
bitflow.NoopProcessor
}
func (*NoopProcessor) String ¶
func (*NoopProcessor) String() string
type PauseTagger ¶
type PauseTagger struct { bitflow.NoopProcessor MinimumPause time.Duration Tag string // contains filtered or unexported fields }
func (*PauseTagger) String ¶
func (d *PauseTagger) String() string
type PipelineRateSynchronizer ¶
type PipelineRateSynchronizer struct { ChannelSize int ChannelCloseHook func(lastSample *bitflow.Sample, lastHeader *bitflow.Header) // contains filtered or unexported fields }
func (*PipelineRateSynchronizer) NewSynchronizationStep ¶
func (s *PipelineRateSynchronizer) NewSynchronizationStep() bitflow.SampleProcessor
type PrometheusMarshaller ¶ added in v0.0.52
type PrometheusMarshaller struct { }
PrometheusMarshaller marshals Headers and Samples to the prometheus exposition format
func (PrometheusMarshaller) ShouldCloseAfterFirstSample ¶ added in v0.0.52
func (PrometheusMarshaller) ShouldCloseAfterFirstSample() bool
ShouldCloseAfterFirstSample defines that prometheus streams should close after first sent sample
func (PrometheusMarshaller) String ¶ added in v0.0.52
func (PrometheusMarshaller) String() string
String implements the Marshaller interface.
func (PrometheusMarshaller) WriteHeader ¶ added in v0.0.52
func (PrometheusMarshaller) WriteHeader(header *bitflow.Header, withTags bool, output io.Writer) error
WriteHeader implements the Marshaller interface. It is empty, because the prometheus exposition format doesn't need one
func (PrometheusMarshaller) WriteSample ¶ added in v0.0.52
func (m PrometheusMarshaller) WriteSample(sample *bitflow.Sample, header *bitflow.Header, withTags bool, writer io.Writer) error
WriteSample implements the Marshaller interface. See the PrometheusMarshaller godoc for information about the format.
type ReleasingProcessor ¶
type ReleasingProcessor struct { bitflow.NoopProcessor // contains filtered or unexported fields }
func (*ReleasingProcessor) Close ¶
func (p *ReleasingProcessor) Close()
func (*ReleasingProcessor) String ¶
func (p *ReleasingProcessor) String() string
type ResendProcessor ¶
type ResendProcessor struct { bitflow.NoopProcessor Interval time.Duration // contains filtered or unexported fields }
func (*ResendProcessor) Close ¶
func (p *ResendProcessor) Close()
func (*ResendProcessor) String ¶
func (p *ResendProcessor) String() string
type RestDataSource ¶
type RestDataSource struct { bitflow.AbstractSampleSource // contains filtered or unexported fields }
func (*RestDataSource) Close ¶
func (source *RestDataSource) Close()
func (*RestDataSource) EmitSample ¶
func (source *RestDataSource) EmitSample(sample *bitflow.Sample, header *bitflow.Header)
func (*RestDataSource) EmitSampleAndHeader ¶ added in v0.0.32
func (source *RestDataSource) EmitSampleAndHeader(sample bitflow.SampleAndHeader)
func (*RestDataSource) EmitSampleAndHeaderTimeout ¶ added in v0.0.37
func (source *RestDataSource) EmitSampleAndHeaderTimeout(sample bitflow.SampleAndHeader, timeout time.Duration) bool
func (*RestDataSource) EmitSampleTimeout ¶ added in v0.0.37
func (*RestDataSource) EmitSamples ¶ added in v0.0.32
func (source *RestDataSource) EmitSamples(samples []bitflow.SampleAndHeader)
func (*RestDataSource) EmitSamplesTimeout ¶ added in v0.0.37
func (source *RestDataSource) EmitSamplesTimeout(samples []bitflow.SampleAndHeader, timeout time.Duration) bool
func (*RestDataSource) Endpoint ¶
func (source *RestDataSource) Endpoint() string
func (*RestDataSource) Serve ¶
func (source *RestDataSource) Serve(verb string, path string, handlers ...gin.HandlerFunc) error
func (*RestDataSource) Start ¶
func (source *RestDataSource) Start(wg *sync.WaitGroup) golib.StopChan
func (*RestDataSource) String ¶ added in v0.0.32
func (source *RestDataSource) String() string
type RestEndpoint ¶
type RestEndpoint struct {
// contains filtered or unexported fields
}
func (*RestEndpoint) NewDataSource ¶
func (endpoint *RestEndpoint) NewDataSource(outgoingSampleBuffer int) *RestDataSource
type RestEndpointFactory ¶
type RestEndpointFactory struct {
// contains filtered or unexported fields
}
func (*RestEndpointFactory) GetEndpoint ¶
func (s *RestEndpointFactory) GetEndpoint(endpointString string) *RestEndpoint
type RestReplyHelpers ¶
type RestReplyHelpers struct { }
func (RestReplyHelpers) ReplyCode ¶ added in v0.0.39
func (h RestReplyHelpers) ReplyCode(context *gin.Context, message string, statusCode int)
func (RestReplyHelpers) ReplyError ¶
func (h RestReplyHelpers) ReplyError(context *gin.Context, errorMessage string)
func (RestReplyHelpers) ReplyGenericError ¶
func (h RestReplyHelpers) ReplyGenericError(context *gin.Context, errorMessage string)
func (RestReplyHelpers) ReplySuccess ¶
func (h RestReplyHelpers) ReplySuccess(context *gin.Context, message string)
type SampleFilter ¶
type SampleFilter struct { bitflow.NoopProcessor Description fmt.Stringer IncludeFilter func(sample *bitflow.Sample, header *bitflow.Header) (bool, error) // Return true if sample should be included }
func (*SampleFilter) String ¶
func (p *SampleFilter) String() string
type SampleSlice ¶
type SampleSlice struct {
// contains filtered or unexported fields
}
func (SampleSlice) Len ¶
func (s SampleSlice) Len() int
func (SampleSlice) Less ¶
func (s SampleSlice) Less(i, j int) bool
func (SampleSlice) Swap ¶
func (s SampleSlice) Swap(i, j int)
type SampleSorter ¶
type SampleSorter struct {
Tags []string
}
Sort based on given Tags, use Timestamp as last sort criterion
func (*SampleSorter) ProcessBatch ¶
func (*SampleSorter) String ¶
func (sorter *SampleSorter) String() string
type SimpleTextMarshaller ¶
type SimpleTextMarshaller struct { Description string MetricPrefix string NameFixer func(string) string WriteValue func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error }
func (SimpleTextMarshaller) ShouldCloseAfterFirstSample ¶ added in v0.0.26
func (SimpleTextMarshaller) ShouldCloseAfterFirstSample() bool
ShouldCloseAfterFirstSample defines that text streams can stream without closing
func (*SimpleTextMarshaller) String ¶
func (o *SimpleTextMarshaller) String() string
func (*SimpleTextMarshaller) WriteHeader ¶
type StoreStats ¶
type StoreStats struct { bitflow.NoopProcessor TargetFile string // contains filtered or unexported fields }
func NewStoreStats ¶
func NewStoreStats(targetFile string) *StoreStats
func (*StoreStats) Close ¶
func (stats *StoreStats) Close()
func (*StoreStats) StoreStatistics ¶
func (stats *StoreStats) StoreStatistics() error
func (*StoreStats) String ¶
func (stats *StoreStats) String() string
type SubProcessRunner ¶ added in v0.0.55
type SubProcessRunner struct { bitflow.NoopProcessor Cmd string Args []string Reader bitflow.SampleReader Writer bitflow.SampleWriter Marshaller bitflow.Marshaller // contains filtered or unexported fields }
func (*SubProcessRunner) Close ¶ added in v0.0.55
func (r *SubProcessRunner) Close()
func (*SubProcessRunner) Configure ¶ added in v0.0.55
func (r *SubProcessRunner) Configure(marshallingFormat string, f *bitflow.EndpointFactory) error
func (*SubProcessRunner) Start ¶ added in v0.0.55
func (r *SubProcessRunner) Start(wg *sync.WaitGroup) golib.StopChan
func (*SubProcessRunner) String ¶ added in v0.0.55
func (r *SubProcessRunner) String() string
type SynchronizedStreamMerger ¶
type SynchronizedStreamMerger struct { bitflow.NoopProcessor MergeTag string MergeInterval time.Duration ExpectedStreams int MergeSamples func([]*bitflow.Sample, []*bitflow.Header) (*bitflow.Sample, *bitflow.Header) Description string DebugQueueLengths bool DebugWaitingQueues bool // contains filtered or unexported fields }
func (*SynchronizedStreamMerger) Close ¶
func (p *SynchronizedStreamMerger) Close()
func (*SynchronizedStreamMerger) StreamClosed ¶
func (p *SynchronizedStreamMerger) StreamClosed(name string)
func (*SynchronizedStreamMerger) StreamOfSampleClosed ¶
func (p *SynchronizedStreamMerger) StreamOfSampleClosed(lastSample *bitflow.Sample, lastHeader *bitflow.Header)
func (*SynchronizedStreamMerger) String ¶
func (p *SynchronizedStreamMerger) String() string
type TagChangeCallback ¶ added in v0.0.42
type TagChangeListener ¶ added in v0.0.42
type TagChangeListener struct { bitflow.AbstractSampleProcessor Tag string UpdateInterval time.Duration ExpirationTimeout time.Duration ExpireOnClose bool Callback TagChangeCallback // contains filtered or unexported fields }
func (*TagChangeListener) Close ¶ added in v0.0.42
func (t *TagChangeListener) Close()
func (*TagChangeListener) ReadParameters ¶ added in v0.0.42
func (t *TagChangeListener) ReadParameters(params map[string]interface{})
func (*TagChangeListener) Start ¶ added in v0.0.42
func (t *TagChangeListener) Start(wg *sync.WaitGroup) golib.StopChan
func (*TagChangeListener) String ¶ added in v0.0.42
func (t *TagChangeListener) String() string
type TagChangeRunner ¶ added in v0.0.42
type TagChangeRunner struct { TagChangeListener Program string Args []string PreserveStdout bool // contains filtered or unexported fields }
func (*TagChangeRunner) Expired ¶ added in v0.0.42
func (r *TagChangeRunner) Expired(value string, allValues []string) bool
func (*TagChangeRunner) String ¶ added in v0.0.42
func (r *TagChangeRunner) String() string
type TagSynchronizer ¶
type TagSynchronizer struct { bitflow.NoopProcessor StreamIdentifierTag string ReferenceStream string NumTargetStreams int // contains filtered or unexported fields }
This processor copies tags from a "reference" sample stream to a number of "target" sample streams. Streams are identified by the value of a given tag, where the reference stream holds a special value that must be given. The target streams can have arbitrary values. The tag synchronization is done by time: one reference sample affects all target samples after its timestamp, and before the timestamp of the follow-up reference sample. Target samples with timestamps BEFORE any reference sample are forwarded unmodified (with a warning). Target samples AFTER the last reference sample will receive the tags from the last reference sample. All streams are assumed to be sorted by time, arrive in parallel, and are forwarded in the same order.
func (*TagSynchronizer) Close ¶
func (s *TagSynchronizer) Close()
func (*TagSynchronizer) String ¶
func (s *TagSynchronizer) String() string
type UniqueTagPrinter ¶
type UniqueTagPrinter struct { bitflow.NoopProcessor Tag string Count bool // contains filtered or unexported fields }
func NewUniqueTagCounter ¶
func NewUniqueTagCounter(tag string) *UniqueTagPrinter
func NewUniqueTagPrinter ¶
func NewUniqueTagPrinter(tag string) *UniqueTagPrinter
func (*UniqueTagPrinter) Close ¶
func (printer *UniqueTagPrinter) Close()
func (*UniqueTagPrinter) String ¶
func (printer *UniqueTagPrinter) String() string
Source Files ¶
- batch.go
- block.go
- decouple.go
- drop.go
- error_handling.go
- expression.go
- expression_processor.go
- filter.go
- filter_duplicate_timestamps.go
- fork.go
- generate-samples.go
- helpers.go
- input-dynamic.go
- input_http.go
- logging.go
- marshall_prometheus.go
- metrics.go
- metrics_misc.go
- metrics_split.go
- multi_header_merger.go
- noop.go
- output.go
- output_console_box.go
- output_tcp_text.go
- pause_tagger.go
- pick_samples.go
- rate_synchronizer.go
- resend.go
- sample_merger.go
- shuffle.go
- sleep.go
- sort.go
- stats.go
- subprocess.go
- synchronize_tags.go
- tag-change-callback.go
- tag-change-runner.go
- tags-http.go
- tags.go
- window.go