Documentation ¶
Index ¶
- Constants
- func AddRecurrentSeries(newSerie *metrics.Serie)
- func DestroySender(id check.ID)
- func SetDefaultAggregator(agg *BufferedAggregator)
- func SetSender(sender Sender, id check.ID) error
- func StopDefaultAggregator()
- type BufferedAggregator
- func (agg *BufferedAggregator) AddAgentStartupTelemetry(agentVersion string)
- func (agg *BufferedAggregator) Flush(start time.Time, waitForSerializer bool)
- func (agg *BufferedAggregator) GetBufferedChannels() (chan []metrics.MetricSample, chan []*metrics.Event, ...)
- func (agg *BufferedAggregator) GetBufferedMetricsWithTsChannel() chan []metrics.MetricSample
- func (agg *BufferedAggregator) GetChannels() (chan *metrics.MetricSample, chan metrics.Event, chan metrics.ServiceCheck)
- func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message
- func (agg *BufferedAggregator) GetEvents() metrics.Events
- func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)
- func (agg *BufferedAggregator) GetServiceChecks() metrics.ServiceChecks
- func (agg *BufferedAggregator) IsInputQueueEmpty() bool
- func (agg *BufferedAggregator) SetHostname(hostname string)
- func (agg *BufferedAggregator) Stop()
- type CheckSampler
- type Context
- type RawSender
- type Sender
- type SerieSignature
- type Stats
- type TimeSampler
Constants ¶
const DefaultFlushInterval = 15 * time.Second // flush interval
DefaultFlushInterval aggregator default flush interval
const MetricSamplePoolBatchSize = 32
MetricSamplePoolBatchSize is the batch size of the metric sample pool.
Variables ¶
This section is empty.
Functions ¶
func AddRecurrentSeries ¶
AddRecurrentSeries adds a serie to the series that are sent at every flush
func DestroySender ¶
DestroySender frees up the resources used by the sender with passed ID (by deregistering it from the aggregator) Should be called when no sender with this ID is used anymore The metrics of this (these) sender(s) that haven't been flushed yet will be lost
func SetDefaultAggregator ¶
func SetDefaultAggregator(agg *BufferedAggregator)
SetDefaultAggregator allows to force a custom Aggregator as the default one and run it. This is useful for testing or benchmarking.
func SetSender ¶
SetSender returns the passed sender with the passed ID. This is largely for testing purposes
func StopDefaultAggregator ¶
func StopDefaultAggregator()
StopDefaultAggregator stops the default aggregator. Based on 'flushData' waiting metrics (from checks or closed dogstatsd buckets) will be sent to the serializer before stopping.
Types ¶
type BufferedAggregator ¶
type BufferedAggregator struct { // metricSamplePool is a pool of slices of metric sample to avoid allocations. // Used by the Dogstatsd Batcher. MetricSamplePool *metrics.MetricSamplePool TickerChan <-chan time.Time // For test/benchmark purposes: it allows the flush to be controlled from the outside ServerlessFlush chan bool ServerlessFlushDone chan struct{} // [sts] MetricPrefix string // The prefix used for metrics generated in the aggregator. // contains filtered or unexported fields }
BufferedAggregator aggregates metrics in buckets for dogstatsd Metrics
func InitAggregator ¶
func InitAggregator(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string) *BufferedAggregator
InitAggregator returns the Singleton instance
func InitAggregatorWithFlushInterval ¶
func InitAggregatorWithFlushInterval(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string, flushInterval time.Duration) *BufferedAggregator
InitAggregatorWithFlushInterval returns the Singleton instance with a configured flush interval
func NewBufferedAggregator ¶
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string, flushInterval time.Duration) *BufferedAggregator
NewBufferedAggregator instantiates a BufferedAggregator
func (*BufferedAggregator) AddAgentStartupTelemetry ¶
func (agg *BufferedAggregator) AddAgentStartupTelemetry(agentVersion string)
AddAgentStartupTelemetry adds a startup event and count to be sent on the next flush
func (*BufferedAggregator) Flush ¶
func (agg *BufferedAggregator) Flush(start time.Time, waitForSerializer bool)
Flush flushes the data contained in the BufferedAggregator into the Forwarder. This method can be called from multiple routines.
func (*BufferedAggregator) GetBufferedChannels ¶
func (agg *BufferedAggregator) GetBufferedChannels() (chan []metrics.MetricSample, chan []*metrics.Event, chan []*metrics.ServiceCheck)
GetBufferedChannels returns a channel which can be subsequently used to send MetricSamples, Event or ServiceCheck
func (*BufferedAggregator) GetBufferedMetricsWithTsChannel ¶
func (agg *BufferedAggregator) GetBufferedMetricsWithTsChannel() chan []metrics.MetricSample
GetBufferedMetricsWithTsChannel returns the channel to send MetricSamples containing their timestamp.
func (*BufferedAggregator) GetChannels ¶
func (agg *BufferedAggregator) GetChannels() (chan *metrics.MetricSample, chan metrics.Event, chan metrics.ServiceCheck)
GetChannels returns a channel which can be subsequently used to send MetricSamples, Event or ServiceCheck
func (*BufferedAggregator) GetEventPlatformEvents ¶
func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message
GetEventPlatformEvents grabs the event platform events from the queue and clears them. Note that this works only if using the 'noop' event platform forwarder
func (*BufferedAggregator) GetEvents ¶
func (agg *BufferedAggregator) GetEvents() metrics.Events
GetEvents grabs the events from the queue and clears it
func (*BufferedAggregator) GetSeriesAndSketches ¶
func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)
GetSeriesAndSketches grabs all the series & sketches from the queue and clears the queue The parameter `before` is used as an end interval while retrieving series and sketches from the time sampler. Metrics and sketches before this timestamp should be returned.
func (*BufferedAggregator) GetServiceChecks ¶
func (agg *BufferedAggregator) GetServiceChecks() metrics.ServiceChecks
GetServiceChecks grabs all the service checks from the queue and clears the queue
func (*BufferedAggregator) IsInputQueueEmpty ¶
func (agg *BufferedAggregator) IsInputQueueEmpty() bool
IsInputQueueEmpty returns true if every input channel for the aggregator are empty. This is mainly useful for tests and benchmark
func (*BufferedAggregator) SetHostname ¶
func (agg *BufferedAggregator) SetHostname(hostname string)
SetHostname sets the hostname that the aggregator uses by default on all the data it sends Blocks until the main aggregator goroutine has finished handling the update
func (*BufferedAggregator) Stop ¶
func (agg *BufferedAggregator) Stop()
Stop stops the aggregator. Based on 'flushData' waiting metrics (from checks or closed dogstatsd buckets) will be sent to the serializer before stopping.
type CheckSampler ¶
type CheckSampler struct {
// contains filtered or unexported fields
}
CheckSampler aggregates metrics from one Check instance
type Context ¶
Context holds the elements that form a context, and can be serialized into a context key
type RawSender ¶
type RawSender interface { SendRawMetricSample(sample *metrics.MetricSample) SendRawServiceCheck(sc *metrics.ServiceCheck) Event(e metrics.Event) }
RawSender interface to submit samples to aggregator directly
type Sender ¶
type Sender interface { Commit() Gauge(metric string, value float64, hostname string, tags []string) Rate(metric string, value float64, hostname string, tags []string) Count(metric string, value float64, hostname string, tags []string) MonotonicCount(metric string, value float64, hostname string, tags []string) MonotonicCountWithFlushFirstValue(metric string, value float64, hostname string, tags []string, flushFirstValue bool) Counter(metric string, value float64, hostname string, tags []string) Histogram(metric string, value float64, hostname string, tags []string) Historate(metric string, value float64, hostname string, tags []string) ServiceCheck(checkName string, status metrics.ServiceCheckStatus, hostname string, tags []string, message string) HistogramBucket(metric string, value int64, lowerBound, upperBound float64, monotonic bool, hostname string, tags []string, flushFirstValue bool) Event(e metrics.Event) EventPlatformEvent(rawEvent string, eventType string) GetSenderStats() check.SenderStats DisableDefaultHostname(disable bool) SetCheckCustomTags(tags []string) SetCheckService(service string) FinalizeCheckServiceTag() OrchestratorMetadata(msgs []serializer.ProcessMessageBody, clusterID string, nodeType int) }
Sender allows sending metrics from checks/a check
func GetDefaultSender ¶
GetDefaultSender returns the default sender
type SerieSignature ¶
type SerieSignature struct {
// contains filtered or unexported fields
}
SerieSignature holds the elements that allow to know whether two similar `Serie`s from the same bucket can be merged into one
type Stats ¶
type Stats struct { Flushes [32]int64 // circular buffer of recent flushes stat FlushIndex int // last flush position in circular buffer LastFlush int64 // most recent flush stat, provided for convenience Name string // contains filtered or unexported fields }
Stats stores a statistic from several past flushes allowing computations like median or percentiles
type TimeSampler ¶
type TimeSampler struct {
// contains filtered or unexported fields
}
TimeSampler aggregates metrics by buckets of 'interval' seconds
func NewTimeSampler ¶
func NewTimeSampler(interval int64) *TimeSampler
NewTimeSampler returns a newly initialized TimeSampler