steps

package
v0.0.35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2019 License: GPL-3.0 Imports: 29 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BatchProcessorParameters = reg.RegisteredParameters{}.
	Optional("tags", reg.List(reg.String()), []string{}).
	Optional("timeout", reg.Duration(), time.Duration(0)).
	Optional("ignore-close", reg.Bool(), false).
	Optional("forward-immediately", reg.Bool(), false)

TODO "ignore-header-change"

View Source
var MetricSplitterDescription = "" /* 237-byte string literal not displayed */

Functions

func AddDecoupleStep

func AddDecoupleStep(p *bitflow.SamplePipeline, params map[string]interface{}) error

func AppendToSample

func AppendToSample(s *bitflow.Sample, values []float64)

func FillSample

func FillSample(s *bitflow.Sample, values []float64)

func FillSampleFromMatrix

func FillSampleFromMatrix(s *bitflow.Sample, row int, mat *mat.Dense)

func FillSamplesFromMatrix

func FillSamplesFromMatrix(s []*bitflow.Sample, mat *mat.Dense)

func GetMinMax

func GetMinMax(header *bitflow.Header, samples []*bitflow.Sample) ([]float64, []float64)

func IsValidNumber

func IsValidNumber(val float64) bool

func MakeBatchProcessor added in v0.0.28

func MakeBatchProcessor(params map[string]interface{}) (res *bitflow.BatchProcessor, err error)

func NewSampleShuffler

func NewSampleShuffler() *bitflow.SimpleBatchProcessingStep

func NewTaggingProcessor

func NewTaggingProcessor(tags map[string]string) bitflow.SampleProcessor

func RegisterAppendTimeDifference

func RegisterAppendTimeDifference(b reg.ProcessorRegistry)

func RegisterDecouple

func RegisterDecouple(b reg.ProcessorRegistry)

func RegisterDrop

func RegisterDrop(b reg.ProcessorRegistry)

func RegisterDropErrorsStep

func RegisterDropErrorsStep(b reg.ProcessorRegistry)

func RegisterDuplicateTimestampFilter

func RegisterDuplicateTimestampFilter(b reg.ProcessorRegistry)

func RegisterExcludeMetricsFilter

func RegisterExcludeMetricsFilter(b reg.ProcessorRegistry)

func RegisterExpression

func RegisterExpression(b reg.ProcessorRegistry)

func RegisterFillUpStep

func RegisterFillUpStep(b reg.ProcessorRegistry)

func RegisterFilterExpression

func RegisterFilterExpression(b reg.ProcessorRegistry)

func RegisterForks

func RegisterForks(b reg.ProcessorRegistry)

This function is placed in this package to avoid circular dependency between the fork and the query package.

func RegisterGraphiteOutput

func RegisterGraphiteOutput(b reg.ProcessorRegistry)

func RegisterHttpTagger

func RegisterHttpTagger(b reg.ProcessorRegistry)

func RegisterIncludeMetricsFilter

func RegisterIncludeMetricsFilter(b reg.ProcessorRegistry)

func RegisterLoggingSteps

func RegisterLoggingSteps(b reg.ProcessorRegistry)

func RegisterMergeHeaders

func RegisterMergeHeaders(b reg.ProcessorRegistry)

func RegisterMetricMapper

func RegisterMetricMapper(b reg.ProcessorRegistry)

func RegisterMetricRenamer

func RegisterMetricRenamer(b reg.ProcessorRegistry)

func RegisterMetricSplitter added in v0.0.6

func RegisterMetricSplitter(b reg.ProcessorRegistry)

func RegisterNoop

func RegisterNoop(b reg.ProcessorRegistry)

func RegisterOpentsdbOutput

func RegisterOpentsdbOutput(b reg.ProcessorRegistry)

func RegisterOutputFiles

func RegisterOutputFiles(b reg.ProcessorRegistry)

func RegisterParseTags

func RegisterParseTags(b reg.ProcessorRegistry)

func RegisterPauseTagger

func RegisterPauseTagger(b reg.ProcessorRegistry)

func RegisterPickHead

func RegisterPickHead(b reg.ProcessorRegistry)

func RegisterPickPercent

func RegisterPickPercent(b reg.ProcessorRegistry)

func RegisterPickTail

func RegisterPickTail(b reg.ProcessorRegistry)

func RegisterPipelineRateSynchronizer

func RegisterPipelineRateSynchronizer(b reg.ProcessorRegistry)

func RegisterResendStep

func RegisterResendStep(b reg.ProcessorRegistry)

func RegisterSampleShuffler

func RegisterSampleShuffler(b reg.ProcessorRegistry)

func RegisterSampleSorter

func RegisterSampleSorter(b reg.ProcessorRegistry)

func RegisterSetCurrentTime

func RegisterSetCurrentTime(b reg.ProcessorRegistry)

func RegisterSkipHead

func RegisterSkipHead(b reg.ProcessorRegistry)

func RegisterSleep

func RegisterSleep(b reg.ProcessorRegistry)

func RegisterStoreStats

func RegisterStoreStats(b reg.ProcessorRegistry)

func RegisterStripMetrics

func RegisterStripMetrics(b reg.ProcessorRegistry)

func RegisterSubpipelineStreamMerger

func RegisterSubpipelineStreamMerger(b reg.ProcessorRegistry)

func RegisterSubprocessRunner

func RegisterSubprocessRunner(b reg.ProcessorRegistry)

func RegisterTagSynchronizer

func RegisterTagSynchronizer(b reg.ProcessorRegistry)

func RegisterTaggingProcessor

func RegisterTaggingProcessor(b reg.ProcessorRegistry)

func RegisterVarianceMetricsFilter

func RegisterVarianceMetricsFilter(b reg.ProcessorRegistry)

func SampleToVector

func SampleToVector(sample *bitflow.Sample) []float64

func ScaleMinMax

func ScaleMinMax(val, min, max, outputMin, outputMax float64) float64

func ScaleStddev

func ScaleStddev(val float64, mean, stddev, min, max float64) float64

func SendPeriodically

func SendPeriodically(sample *bitflow.Sample, header *bitflow.Header, receiver bitflow.SampleSink, interval time.Duration, wg *sync.WaitGroup) golib.StopChan

func SplitShellCommand

func SplitShellCommand(s string) []string

func ValuesToVector

func ValuesToVector(input []bitflow.Value) []float64

Types

type AbstractBatchMetricMapper

type AbstractBatchMetricMapper struct {
	Description      fmt.Stringer
	ConstructIndices func(header *bitflow.Header, samples []*bitflow.Sample) ([]int, []string)
}

func NewMetricVarianceFilter

func NewMetricVarianceFilter(minimumWeightedStddev float64) *AbstractBatchMetricMapper

func (*AbstractBatchMetricMapper) ProcessBatch

func (mapper *AbstractBatchMetricMapper) ProcessBatch(header *bitflow.Header, samples []*bitflow.Sample) (*bitflow.Header, []*bitflow.Sample, error)

func (*AbstractBatchMetricMapper) String

func (mapper *AbstractBatchMetricMapper) String() string

type AbstractMetricFilter

type AbstractMetricFilter struct {
	AbstractMetricMapper
	IncludeFilter func(name string) bool // Return true if metric should be included
}

type AbstractMetricMapper

type AbstractMetricMapper struct {
	bitflow.NoopProcessor
	Description      fmt.Stringer
	ConstructIndices func(header *bitflow.Header) ([]int, []string)
	// contains filtered or unexported fields
}

func (*AbstractMetricMapper) Sample

func (m *AbstractMetricMapper) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*AbstractMetricMapper) String

func (m *AbstractMetricMapper) String() string

type BlockManager

type BlockManager struct {
	// contains filtered or unexported fields
}

func NewBlockManager

func NewBlockManager() *BlockManager

func (*BlockManager) GetList

func (m *BlockManager) GetList(key string) *BlockerList

func (*BlockManager) NewBlocker

func (m *BlockManager) NewBlocker(key string) *BlockingProcessor

func (*BlockManager) NewReleaser

func (m *BlockManager) NewReleaser(key string) *ReleasingProcessor

func (*BlockManager) RegisterBlockingProcessor

func (m *BlockManager) RegisterBlockingProcessor(b reg.ProcessorRegistry)

func (*BlockManager) RegisterReleasingProcessor

func (m *BlockManager) RegisterReleasingProcessor(b reg.ProcessorRegistry)

type BlockerList

type BlockerList struct {
	Blockers []*BlockingProcessor
}

func (*BlockerList) Add

func (l *BlockerList) Add(blocker *BlockingProcessor)

func (*BlockerList) ReleaseAll

func (l *BlockerList) ReleaseAll()

type BlockingProcessor

type BlockingProcessor struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

func (*BlockingProcessor) Close

func (p *BlockingProcessor) Close()

func (*BlockingProcessor) Release

func (p *BlockingProcessor) Release()

func (*BlockingProcessor) Sample

func (p *BlockingProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*BlockingProcessor) String

func (p *BlockingProcessor) String() string

type DecouplingProcessor

type DecouplingProcessor struct {
	bitflow.NoopProcessor

	ChannelBuffer int // Must be set before calling Start()
	// contains filtered or unexported fields
}

Decouple the incoming samples from the MetricSink through a looping goroutine and a channel. Creates potential parallelism in the pipeline.

func (*DecouplingProcessor) Close

func (p *DecouplingProcessor) Close()

func (*DecouplingProcessor) Sample

func (p *DecouplingProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*DecouplingProcessor) Start

func (*DecouplingProcessor) String

func (p *DecouplingProcessor) String() string

type DropErrorsProcessor

type DropErrorsProcessor struct {
	bitflow.NoopProcessor
	LogError   bool
	LogWarning bool
	LogDebug   bool
	LogInfo    bool
}

func (*DropErrorsProcessor) Sample

func (p *DropErrorsProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*DropErrorsProcessor) String

func (p *DropErrorsProcessor) String() string

type Expression

type Expression struct {
	// contains filtered or unexported fields
}

func NewExpression

func NewExpression(expressionString string) (*Expression, error)

func (*Expression) Evaluate

func (p *Expression) Evaluate(sample *bitflow.Sample, header *bitflow.Header) (interface{}, error)

func (*Expression) EvaluateBool

func (p *Expression) EvaluateBool(sample *bitflow.Sample, header *bitflow.Header) (bool, error)

func (*Expression) UpdateHeader

func (p *Expression) UpdateHeader(header *bitflow.Header) error

type ExpressionProcessor

type ExpressionProcessor struct {
	bitflow.NoopProcessor
	Filter bool
	// contains filtered or unexported fields
}

func (*ExpressionProcessor) AddExpression

func (p *ExpressionProcessor) AddExpression(expressionString string) error

func (*ExpressionProcessor) MergeProcessor

func (p *ExpressionProcessor) MergeProcessor(otherProcessor bitflow.SampleProcessor) bool

func (*ExpressionProcessor) Sample

func (p *ExpressionProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*ExpressionProcessor) String

func (p *ExpressionProcessor) String() string

type FeatureStats

type FeatureStats struct {
	onlinestats.Running
	Min float64
	Max float64
}

func GetStats

func GetStats(header *bitflow.Header, samples []*bitflow.Sample) []FeatureStats

func NewFeatureStats

func NewFeatureStats() *FeatureStats

func (*FeatureStats) Push

func (stats *FeatureStats) Push(values ...float64)

func (*FeatureStats) ScaleMinMax

func (stats *FeatureStats) ScaleMinMax(val float64, outputMin, outputMax float64) float64

func (*FeatureStats) ScaleStddev

func (stats *FeatureStats) ScaleStddev(val float64) float64

type FillUpProcessor

type FillUpProcessor struct {
	bitflow.NoopProcessor
	MinMissingInterval time.Duration
	StepInterval       time.Duration
	// contains filtered or unexported fields
}

func (*FillUpProcessor) Sample

func (p *FillUpProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*FillUpProcessor) String

func (p *FillUpProcessor) String() string

type HttpTagger

type HttpTagger struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

func NewHttpTagger

func NewHttpTagger(pathPrefix string, r *mux.Router) *HttpTagger

func NewStandaloneHttpTagger

func NewStandaloneHttpTagger(pathPrefix string, endpoint string) *HttpTagger

func (*HttpTagger) HasTags

func (tagger *HttpTagger) HasTags() bool

func (*HttpTagger) NumTags

func (tagger *HttpTagger) NumTags() int

func (*HttpTagger) Sample

func (tagger *HttpTagger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*HttpTagger) String

func (tagger *HttpTagger) String() string

func (*HttpTagger) Tags

func (tagger *HttpTagger) Tags() map[string]string

type MetricFilter

type MetricFilter struct {
	AbstractMetricFilter
	// contains filtered or unexported fields
}

func NewMetricFilter

func NewMetricFilter() *MetricFilter

func (*MetricFilter) Exclude

func (filter *MetricFilter) Exclude(regex *regexp.Regexp) *MetricFilter

func (*MetricFilter) ExcludeRegex

func (filter *MetricFilter) ExcludeRegex(regexStr string) (*MetricFilter, error)

func (*MetricFilter) ExcludeStr

func (filter *MetricFilter) ExcludeStr(substr string) *MetricFilter

func (*MetricFilter) Include

func (filter *MetricFilter) Include(regex *regexp.Regexp) *MetricFilter

func (*MetricFilter) IncludeRegex

func (filter *MetricFilter) IncludeRegex(regexStr string) (*MetricFilter, error)

func (*MetricFilter) IncludeStr

func (filter *MetricFilter) IncludeStr(substr string) *MetricFilter

func (*MetricFilter) MergeProcessor

func (filter *MetricFilter) MergeProcessor(other bitflow.SampleProcessor) bool

func (*MetricFilter) String

func (filter *MetricFilter) String() string

type MetricMapper

type MetricMapper struct {
	AbstractMetricMapper
	Metrics []string
}

func NewMetricMapper

func NewMetricMapper(metrics []string) *MetricMapper

func (*MetricMapper) String

func (mapper *MetricMapper) String() string

type MetricMapperHelper

type MetricMapperHelper struct {
	bitflow.HeaderChecker
	// contains filtered or unexported fields
}

type MetricRenamer

type MetricRenamer struct {
	AbstractMetricMapper
	// contains filtered or unexported fields
}

func NewMetricRenamer

func NewMetricRenamer(regexes []*regexp.Regexp, replacements []string) *MetricRenamer

func (*MetricRenamer) MergeProcessor

func (r *MetricRenamer) MergeProcessor(other bitflow.SampleProcessor) bool

func (*MetricRenamer) String

func (r *MetricRenamer) String() string

type MetricSplitter added in v0.0.6

type MetricSplitter struct {
	NoopProcessor

	Splitters []*regexp.Regexp
	// contains filtered or unexported fields
}

func NewMetricSplitter added in v0.0.6

func NewMetricSplitter(regexes []string) (*MetricSplitter, error)

func (*MetricSplitter) Sample added in v0.0.6

func (m *MetricSplitter) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*MetricSplitter) Split added in v0.0.6

func (m *MetricSplitter) Split(sample *bitflow.Sample, header *bitflow.Header) []bitflow.SampleAndHeader

func (*MetricSplitter) String added in v0.0.6

func (m *MetricSplitter) String() string

type MetricWindow

type MetricWindow struct {
	// contains filtered or unexported fields
}

func NewMetricWindow

func NewMetricWindow(size int) *MetricWindow

func (*MetricWindow) Data

func (w *MetricWindow) Data() []bitflow.Value

func (*MetricWindow) Empty

func (w *MetricWindow) Empty() bool

func (*MetricWindow) FastData

func (w *MetricWindow) FastData() []bitflow.Value

Avoid copying if possible. Dangerous.

func (*MetricWindow) FillData

func (w *MetricWindow) FillData(target []bitflow.Value) []bitflow.Value

func (*MetricWindow) Full

func (w *MetricWindow) Full() bool

func (*MetricWindow) Pop

func (w *MetricWindow) Pop() bitflow.Value

Remove and return the oldest value. The oldest value is also deleted by Push() when the window is full.

func (*MetricWindow) Push

func (w *MetricWindow) Push(val bitflow.Value)

func (*MetricWindow) Size

func (w *MetricWindow) Size() int

type MultiHeaderMerger

type MultiHeaderMerger struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

Can tolerate multiple headers, fills missing data up with default values.

func NewMultiHeaderMerger

func NewMultiHeaderMerger() *MultiHeaderMerger

func (*MultiHeaderMerger) Close

func (p *MultiHeaderMerger) Close()

func (*MultiHeaderMerger) OutputSampleSize

func (p *MultiHeaderMerger) OutputSampleSize(sampleSize int) int

func (*MultiHeaderMerger) Sample

func (p *MultiHeaderMerger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*MultiHeaderMerger) String

func (p *MultiHeaderMerger) String() string

type NoopProcessor

type NoopProcessor struct {
	bitflow.NoopProcessor
}

func (*NoopProcessor) String

func (*NoopProcessor) String() string

type PauseTagger

type PauseTagger struct {
	bitflow.NoopProcessor
	MinimumPause time.Duration
	Tag          string
	// contains filtered or unexported fields
}

func (*PauseTagger) Sample

func (d *PauseTagger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*PauseTagger) String

func (d *PauseTagger) String() string

type PipelineRateSynchronizer

type PipelineRateSynchronizer struct {
	ChannelSize      int
	ChannelCloseHook func(lastSample *bitflow.Sample, lastHeader *bitflow.Header)
	// contains filtered or unexported fields
}

func (*PipelineRateSynchronizer) NewSynchronizationStep

func (s *PipelineRateSynchronizer) NewSynchronizationStep() bitflow.SampleProcessor

type ReleasingProcessor

type ReleasingProcessor struct {
	bitflow.NoopProcessor
	// contains filtered or unexported fields
}

func (*ReleasingProcessor) Close

func (p *ReleasingProcessor) Close()

func (*ReleasingProcessor) String

func (p *ReleasingProcessor) String() string

type ResendProcessor

type ResendProcessor struct {
	bitflow.NoopProcessor
	Interval time.Duration
	// contains filtered or unexported fields
}

func (*ResendProcessor) Close

func (p *ResendProcessor) Close()

func (*ResendProcessor) Sample

func (p *ResendProcessor) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*ResendProcessor) Start

func (*ResendProcessor) String

func (p *ResendProcessor) String() string

type RestDataSource

type RestDataSource struct {
	bitflow.AbstractSampleSource
	// contains filtered or unexported fields
}

func (*RestDataSource) Close

func (source *RestDataSource) Close()

func (*RestDataSource) EmitSample

func (source *RestDataSource) EmitSample(sample *bitflow.Sample, header *bitflow.Header)

func (*RestDataSource) EmitSampleAndHeader added in v0.0.32

func (source *RestDataSource) EmitSampleAndHeader(sample bitflow.SampleAndHeader)

func (*RestDataSource) EmitSamples added in v0.0.32

func (source *RestDataSource) EmitSamples(samples []bitflow.SampleAndHeader)

func (*RestDataSource) Endpoint

func (source *RestDataSource) Endpoint() string

func (*RestDataSource) Serve

func (source *RestDataSource) Serve(verb string, path string, httpLogFile string, serve func(*gin.Context)) error

func (*RestDataSource) Start

func (source *RestDataSource) Start(wg *sync.WaitGroup) golib.StopChan

func (*RestDataSource) String added in v0.0.32

func (source *RestDataSource) String() string

type RestEndpoint

type RestEndpoint struct {
	// contains filtered or unexported fields
}

func (*RestEndpoint) NewDataSource

func (endpoint *RestEndpoint) NewDataSource(outgoingSampleBuffer int) *RestDataSource

type RestEndpointFactory

type RestEndpointFactory struct {
	// contains filtered or unexported fields
}

func (*RestEndpointFactory) GetEndpoint

func (s *RestEndpointFactory) GetEndpoint(endpointString string) *RestEndpoint

type RestReplyHelpers

type RestReplyHelpers struct {
}

func (RestReplyHelpers) ReplyError

func (h RestReplyHelpers) ReplyError(context *gin.Context, errorMessage string)

func (RestReplyHelpers) ReplyGenericError

func (h RestReplyHelpers) ReplyGenericError(context *gin.Context, errorMessage string)

func (RestReplyHelpers) ReplySuccess

func (h RestReplyHelpers) ReplySuccess(context *gin.Context, message string)

type SampleFilter

type SampleFilter struct {
	bitflow.NoopProcessor
	Description   fmt.Stringer
	IncludeFilter func(sample *bitflow.Sample, header *bitflow.Header) (bool, error) // Return true if sample should be included
}

func (*SampleFilter) Sample

func (p *SampleFilter) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*SampleFilter) String

func (p *SampleFilter) String() string

type SampleSlice

type SampleSlice struct {
	// contains filtered or unexported fields
}

func (SampleSlice) Len

func (s SampleSlice) Len() int

func (SampleSlice) Less

func (s SampleSlice) Less(i, j int) bool

func (SampleSlice) Swap

func (s SampleSlice) Swap(i, j int)

type SampleSorter

type SampleSorter struct {
	Tags []string
}

Sort based on given Tags, use Timestamp as last sort criterion

func (*SampleSorter) ProcessBatch

func (sorter *SampleSorter) ProcessBatch(header *bitflow.Header, samples []*bitflow.Sample) (*bitflow.Header, []*bitflow.Sample, error)

func (*SampleSorter) String

func (sorter *SampleSorter) String() string

type SimpleTextMarshaller

type SimpleTextMarshaller struct {
	Description  string
	MetricPrefix string
	NameFixer    func(string) string
	WriteValue   func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error
}

func (SimpleTextMarshaller) ShouldCloseAfterFirstSample added in v0.0.26

func (SimpleTextMarshaller) ShouldCloseAfterFirstSample() bool

ShouldCloseAfterFirstSample defines that text streams can stream without closing

func (*SimpleTextMarshaller) String

func (o *SimpleTextMarshaller) String() string

func (*SimpleTextMarshaller) WriteHeader

func (o *SimpleTextMarshaller) WriteHeader(header *bitflow.Header, hasTags bool, writer io.Writer) error

func (*SimpleTextMarshaller) WriteSample

func (o *SimpleTextMarshaller) WriteSample(sample *bitflow.Sample, header *bitflow.Header, hasTags bool, writer io.Writer) error

type SimpleTextMarshallerFactory

type SimpleTextMarshallerFactory struct {
	Description string
	NameFixer   func(string) string
	WriteValue  func(name string, val float64, sample *bitflow.Sample, writer io.Writer) error
}

type StoreStats

type StoreStats struct {
	bitflow.NoopProcessor
	TargetFile string
	// contains filtered or unexported fields
}

func NewStoreStats

func NewStoreStats(targetFile string) *StoreStats

func (*StoreStats) Close

func (stats *StoreStats) Close()

func (*StoreStats) Sample

func (stats *StoreStats) Sample(inSample *bitflow.Sample, header *bitflow.Header) error

func (*StoreStats) StoreStatistics

func (stats *StoreStats) StoreStatistics() error

func (*StoreStats) String

func (stats *StoreStats) String() string

type SubprocessRunner

type SubprocessRunner struct {
	bitflow.NoopProcessor
	Cmd  string
	Args []string

	Reader     bitflow.SampleReader
	Writer     bitflow.SampleWriter
	Marshaller bitflow.Marshaller
	// contains filtered or unexported fields
}

func (*SubprocessRunner) Close

func (r *SubprocessRunner) Close()

func (*SubprocessRunner) Configure

func (r *SubprocessRunner) Configure(marshallingFormat string, f *bitflow.EndpointFactory) error

func (*SubprocessRunner) Sample

func (r *SubprocessRunner) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*SubprocessRunner) Start

func (*SubprocessRunner) String

func (r *SubprocessRunner) String() string

type SynchronizedStreamMerger

type SynchronizedStreamMerger struct {
	bitflow.NoopProcessor

	MergeTag        string
	MergeInterval   time.Duration
	ExpectedStreams int
	MergeSamples    func([]*bitflow.Sample, []*bitflow.Header) (*bitflow.Sample, *bitflow.Header)

	Description        string
	DebugQueueLengths  bool
	DebugWaitingQueues bool
	// contains filtered or unexported fields
}

func (*SynchronizedStreamMerger) Close

func (p *SynchronizedStreamMerger) Close()

func (*SynchronizedStreamMerger) Sample

func (p *SynchronizedStreamMerger) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*SynchronizedStreamMerger) StreamClosed

func (p *SynchronizedStreamMerger) StreamClosed(name string)

func (*SynchronizedStreamMerger) StreamOfSampleClosed

func (p *SynchronizedStreamMerger) StreamOfSampleClosed(lastSample *bitflow.Sample, lastHeader *bitflow.Header)

func (*SynchronizedStreamMerger) String

func (p *SynchronizedStreamMerger) String() string

type TagSynchronizer

type TagSynchronizer struct {
	bitflow.NoopProcessor

	StreamIdentifierTag string
	ReferenceStream     string
	NumTargetStreams    int
	// contains filtered or unexported fields
}

This processor copies tags from a "reference" sample stream to a number of "target" sample streams. Streams are identified by the value of a given tag, where the reference stream holds a special value that must be given. The target streams can have arbitrary values. The tag synchronization is done by time: one reference sample affects all target samples after its timestamp, and before the timestamp of the follow-up reference sample. Target samples with timestamps BEFORE any reference sample are forwarded unmodified (with a warning). Target samples AFTER the last reference sample will receive the tags from the last reference sample. All streams are assumed to be sorted by time, arrive in parallel, and are forwarded in the same order.

func (*TagSynchronizer) Close

func (s *TagSynchronizer) Close()

func (*TagSynchronizer) Sample

func (s *TagSynchronizer) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*TagSynchronizer) Start

func (*TagSynchronizer) String

func (s *TagSynchronizer) String() string

type UniqueTagPrinter

type UniqueTagPrinter struct {
	bitflow.NoopProcessor
	Tag   string
	Count bool
	// contains filtered or unexported fields
}

func NewUniqueTagCounter

func NewUniqueTagCounter(tag string) *UniqueTagPrinter

func NewUniqueTagPrinter

func NewUniqueTagPrinter(tag string) *UniqueTagPrinter

func (*UniqueTagPrinter) Close

func (printer *UniqueTagPrinter) Close()

func (*UniqueTagPrinter) Sample

func (printer *UniqueTagPrinter) Sample(sample *bitflow.Sample, header *bitflow.Header) error

func (*UniqueTagPrinter) String

func (printer *UniqueTagPrinter) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL