README ¶
package aggregator
The Aggregator is the first component a metric reaches on its journey towards the intake. This package is responsible for receiving and aggregating metrics before passing them to the forwarder: it computes rates and histograms and hands the results to the Serializer.
For now the sources of metrics are DogStatsD and Python/Go checks. DogStatsD sends MetricSamples directly to the Aggregator, while checks go through the Sender to do so.
MetricSamples are the raw metric values that flow from our two sources into the different metric types (Gauge, Count, ...).
+===========+                          +===============+
+ DogStatsD +                          +    checks     +
+===========+                          | Python and Go |
     ||                                +===============+
     ||                                       ||
     ||                                       vv
     ||                                   .+------+.
     ||                                   . Sender .
     ||                                   '+---+--+'
     ||                                        |
     vv                                        v
.+---------------------------------------------+--+.
+                    Aggregator                    +
'+--+-----------------------------------------+---+'
    |                                         |
    |                                         |
    v                                         v
.+-----+-----+.                       .+-----+------+.
+ TimeSampler +                       + CheckSampler +
'+-----+-----+'                       '+-----+------+'
      |                                       |
      |                                       |
      +          .+---------------+.          +
      '+-------->+ ContextMetrics +<---------+'
                 '+-------+-------+'
                          |
                          v
                 .+-------+-------+.
                 +     Metrics     +
                 |      Gauge      |
                 |      Count      |
                 |    Histogram    |
                 |       Rate      |
                 |       Set       |
                 +       ...       +
                 '+--------+------+'
                         ||         +=================+
                         ++========>+   Serializer    +
                                    +=================+
Sender
The Sender is used by calling code (namely: checks) that wants to send metric samples upstream. It exposes a high-level interface mapping to the different metric types supported upstream (Gauge, Count, etc.). To get an instance of the global default sender, call GetDefaultSender; the function takes care of initialising everything, the Aggregator included.
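As an illustration of the submit-then-Commit pattern a check follows, the sketch below uses a hypothetical in-memory `recordingSender` implementing a small subset of the interface; in the agent itself a check would obtain the real implementation via GetDefaultSender.

```go
package main

import "fmt"

// gaugeSender is an illustrative subset of the package's Sender interface;
// the real one has many more methods (Rate, Count, Histogram, ...).
type gaugeSender interface {
	Gauge(metric string, value float64, hostname string, tags []string)
	Commit()
}

// recordingSender is a hypothetical in-memory implementation, used here
// only to show the pattern of submitting samples and then committing.
type recordingSender struct {
	pending   []string
	committed []string
}

func (s *recordingSender) Gauge(metric string, value float64, hostname string, tags []string) {
	s.pending = append(s.pending, fmt.Sprintf("%s=%v", metric, value))
}

// Commit signals that the check run is complete; buffered samples become
// eligible for flushing.
func (s *recordingSender) Commit() {
	s.committed = append(s.committed, s.pending...)
	s.pending = nil
}

func main() {
	// In the agent, a check would call aggregator.GetDefaultSender() instead.
	var sender gaugeSender = &recordingSender{}
	sender.Gauge("myapp.queue_depth", 42, "", []string{"env:dev"})
	sender.Commit()
	fmt.Println(sender.(*recordingSender).committed) // [myapp.queue_depth=42]
}
```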
Aggregator
For now the package provides only one Aggregator implementation, the BufferedAggregator, named after its ability to store the samples it receives in memory. The Aggregator should be used as a singleton: the function InitAggregator takes care of this and should be considered the right way to get an Aggregator instance at any time. An Aggregator has its own loop that needs to be started with the run method; in the case of the BufferedAggregator, the buffer is flushed at defined intervals. An Aggregator receives metric samples through one or more channels, and those samples are processed by different samplers (TimeSampler or CheckSampler).
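The run loop can be sketched with a toy buffered aggregator: samples arriving on a channel are buffered until a flush hands them off. This is an illustrative stand-in, not the real BufferedAggregator, which also handles events, service checks, and a time-based ticker.

```go
package main

import "fmt"

// miniAggregator is a toy sketch of the BufferedAggregator's run loop:
// it buffers samples arriving on a channel and hands them off on flush.
type miniAggregator struct {
	in      chan float64
	flushCh chan chan []float64
	stop    chan struct{}
}

func newMiniAggregator() *miniAggregator {
	a := &miniAggregator{
		in:      make(chan float64),
		flushCh: make(chan chan []float64),
		stop:    make(chan struct{}),
	}
	go a.run()
	return a
}

func (a *miniAggregator) run() {
	var buf []float64
	for {
		select {
		case v := <-a.in:
			buf = append(buf, v) // buffer until the next flush
		case reply := <-a.flushCh:
			reply <- buf // hand the buffered samples to the "serializer"
			buf = nil
		case <-a.stop:
			return
		}
	}
}

// Flush asks the run loop for everything buffered so far.
func (a *miniAggregator) Flush() []float64 {
	reply := make(chan []float64)
	a.flushCh <- reply
	return <-reply
}

func main() {
	agg := newMiniAggregator()
	agg.in <- 1.5
	agg.in <- 2.5
	fmt.Println(agg.Flush()) // [1.5 2.5]
	close(agg.stop)
}
```

In the real implementation the flush is driven by a time.Ticker at the configured flush interval rather than on demand.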
Sampler
Metrics arrive as samples (e.g. in the case of rates, the actual metric is computed over samples within a given time window), and samplers take care of storing and processing them depending on where the samples come from. We currently use two different samplers: one for samples coming from DogStatsD, the other for samples coming from checks. In the latter case, we have one sampler instance per check instance (this is to support running the same check at different intervals).
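For instance, a rate can only be produced once at least two samples have been stored; a minimal, hypothetical sketch of that computation:

```go
package main

import "fmt"

// rate computes the per-second rate between two consecutive samples
// (value v1 at time t1, value v2 at time t2). Conceptually this is what
// a sampler enables for Rate metrics by storing samples over time.
func rate(v1, t1, v2, t2 float64) float64 {
	return (v2 - v1) / (t2 - t1)
}

func main() {
	// A counter that went from 10 to 30 over 10 seconds yields 2/s.
	fmt.Println(rate(10, 0, 30, 10)) // 2
}
```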
Metric
We have different kinds of metrics (Gauge, Count, ...). Each is responsible for computing the final Serie (a set of points) to forward to the Datadog backend.
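As an illustration of how a metric type turns samples into a Serie, here is a simplified, hypothetical gauge that keeps only the most recent sample per flush (the real metric types live in the `metrics` package and are more involved):

```go
package main

import "fmt"

// Point and Serie are simplified stand-ins for the real metrics types.
type Point struct{ Ts, Value float64 }

type Serie struct {
	Name   string
	Points []Point
}

// gauge keeps only the most recent sample; flush turns it into a
// single-point Serie, mirroring "last value wins" gauge semantics.
type gauge struct {
	value   float64
	sampled bool
}

func (g *gauge) addSample(v float64) {
	g.value, g.sampled = v, true
}

func (g *gauge) flush(name string, ts float64) (Serie, bool) {
	if !g.sampled {
		return Serie{}, false // nothing sampled since the last flush
	}
	g.sampled = false
	return Serie{Name: name, Points: []Point{{Ts: ts, Value: g.value}}}, true
}

func main() {
	g := &gauge{}
	g.addSample(20.5)
	g.addSample(21.0)
	s, _ := g.flush("temp.celsius", 60)
	fmt.Println(s.Name, s.Points[0].Value) // temp.celsius 21
}
```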
Documentation ¶
Index ¶
- Constants
- func AddRecurrentSeries(newSerie *metrics.Serie)
- func DestroySender(id check.ID)
- func SetDefaultAggregator(agg *BufferedAggregator)
- func SetSender(sender Sender, id check.ID) error
- func StopDefaultAggregator()
- type BufferedAggregator
- func (agg *BufferedAggregator) AddAgentStartupTelemetry(agentVersion string)
- func (agg *BufferedAggregator) Flush(start time.Time, waitForSerializer bool)
- func (agg *BufferedAggregator) GetBufferedChannels() (chan []metrics.MetricSample, chan []*metrics.Event, ...)
- func (agg *BufferedAggregator) GetBufferedMetricsWithTsChannel() chan []metrics.MetricSample
- func (agg *BufferedAggregator) GetChannels() (chan *metrics.MetricSample, chan metrics.Event, chan metrics.ServiceCheck)
- func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message
- func (agg *BufferedAggregator) GetEvents() metrics.Events
- func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)
- func (agg *BufferedAggregator) GetServiceChecks() metrics.ServiceChecks
- func (agg *BufferedAggregator) IsInputQueueEmpty() bool
- func (agg *BufferedAggregator) SetHostname(hostname string)
- func (agg *BufferedAggregator) Stop()
- type CheckSampler
- type Context
- type RawSender
- type Sender
- type SerieSignature
- type Stats
- type TimeSampler
Constants ¶
const DefaultFlushInterval = 15 * time.Second // flush interval
DefaultFlushInterval is the aggregator's default flush interval
const MetricSamplePoolBatchSize = 32
MetricSamplePoolBatchSize is the batch size of the metric sample pool.
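A sample pool of this kind can be sketched with sync.Pool handing out fixed-size batches; the names below are illustrative, not the real metrics.MetricSamplePool API.

```go
package main

import (
	"fmt"
	"sync"
)

const batchSize = 32 // mirrors MetricSamplePoolBatchSize

// sample is a minimal stand-in for metrics.MetricSample.
type sample struct {
	Name  string
	Value float64
}

// samplePool hands out fixed-size batches so callers can recycle slices
// instead of allocating a new one per packet.
var samplePool = sync.Pool{
	New: func() interface{} { return make([]sample, batchSize) },
}

func getBatch() []sample  { return samplePool.Get().([]sample) }
func putBatch(b []sample) { samplePool.Put(b) }

func main() {
	b := getBatch()
	fmt.Println(len(b)) // 32
	putBatch(b) // return the batch for reuse once processed
}
```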
Variables ¶
This section is empty.
Functions ¶
func AddRecurrentSeries ¶
AddRecurrentSeries adds a serie to the list of series that are sent at every flush
func DestroySender ¶
DestroySender frees up the resources used by the sender with the passed ID (by deregistering it from the aggregator). It should be called when no sender with this ID is used anymore. Any metrics from this sender that haven't been flushed yet will be lost.
func SetDefaultAggregator ¶
func SetDefaultAggregator(agg *BufferedAggregator)
SetDefaultAggregator forces a custom Aggregator as the default one and runs it. This is useful for testing or benchmarking.
func SetSender ¶
SetSender registers the passed sender under the passed ID. This is largely for testing purposes.
func StopDefaultAggregator ¶
func StopDefaultAggregator()
StopDefaultAggregator stops the default aggregator. Depending on 'flushData', waiting metrics (from checks or closed DogStatsD buckets) will be sent to the serializer before stopping.
Types ¶
type BufferedAggregator ¶
type BufferedAggregator struct {
	// metricSamplePool is a pool of slices of metric sample to avoid allocations.
	// Used by the Dogstatsd Batcher.
	MetricSamplePool *metrics.MetricSamplePool

	// TickerChan allows the flush to be controlled from the outside,
	// for test/benchmark purposes.
	TickerChan <-chan time.Time

	ServerlessFlush     chan bool
	ServerlessFlushDone chan struct{}

	// [sts] MetricPrefix is the prefix used for metrics generated in the aggregator.
	MetricPrefix string
	// contains filtered or unexported fields
}
BufferedAggregator aggregates DogStatsD metrics in buckets
func InitAggregator ¶
func InitAggregator(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string) *BufferedAggregator
InitAggregator returns the Singleton instance
func InitAggregatorWithFlushInterval ¶
func InitAggregatorWithFlushInterval(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string, flushInterval time.Duration) *BufferedAggregator
InitAggregatorWithFlushInterval returns the Singleton instance with a configured flush interval
func NewBufferedAggregator ¶
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder epforwarder.EventPlatformForwarder, hostname string, flushInterval time.Duration) *BufferedAggregator
NewBufferedAggregator instantiates a BufferedAggregator
func (*BufferedAggregator) AddAgentStartupTelemetry ¶
func (agg *BufferedAggregator) AddAgentStartupTelemetry(agentVersion string)
AddAgentStartupTelemetry adds a startup event and count to be sent on the next flush
func (*BufferedAggregator) Flush ¶
func (agg *BufferedAggregator) Flush(start time.Time, waitForSerializer bool)
Flush flushes the data contained in the BufferedAggregator into the Forwarder. This method can be called from multiple goroutines.
func (*BufferedAggregator) GetBufferedChannels ¶
func (agg *BufferedAggregator) GetBufferedChannels() (chan []metrics.MetricSample, chan []*metrics.Event, chan []*metrics.ServiceCheck)
GetBufferedChannels returns the channels which can subsequently be used to send MetricSamples, Events, or ServiceChecks
func (*BufferedAggregator) GetBufferedMetricsWithTsChannel ¶
func (agg *BufferedAggregator) GetBufferedMetricsWithTsChannel() chan []metrics.MetricSample
GetBufferedMetricsWithTsChannel returns the channel to send MetricSamples containing their timestamp.
func (*BufferedAggregator) GetChannels ¶
func (agg *BufferedAggregator) GetChannels() (chan *metrics.MetricSample, chan metrics.Event, chan metrics.ServiceCheck)
GetChannels returns the channels which can subsequently be used to send MetricSamples, Events, or ServiceChecks
func (*BufferedAggregator) GetEventPlatformEvents ¶
func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message
GetEventPlatformEvents grabs the event platform events from the queue and clears them. Note that this works only if using the 'noop' event platform forwarder
func (*BufferedAggregator) GetEvents ¶
func (agg *BufferedAggregator) GetEvents() metrics.Events
GetEvents grabs the events from the queue and clears it
func (*BufferedAggregator) GetSeriesAndSketches ¶
func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)
GetSeriesAndSketches grabs all the series and sketches from the queue and clears the queue. The parameter `before` is used as the end of the interval when retrieving series and sketches from the time sampler: metrics and sketches before this timestamp are returned.
func (*BufferedAggregator) GetServiceChecks ¶
func (agg *BufferedAggregator) GetServiceChecks() metrics.ServiceChecks
GetServiceChecks grabs all the service checks from the queue and clears the queue
func (*BufferedAggregator) IsInputQueueEmpty ¶
func (agg *BufferedAggregator) IsInputQueueEmpty() bool
IsInputQueueEmpty returns true if every input channel of the aggregator is empty. This is mainly useful for tests and benchmarks.
func (*BufferedAggregator) SetHostname ¶
func (agg *BufferedAggregator) SetHostname(hostname string)
SetHostname sets the hostname that the aggregator uses by default on all the data it sends. It blocks until the main aggregator goroutine has finished handling the update.
func (*BufferedAggregator) Stop ¶
func (agg *BufferedAggregator) Stop()
Stop stops the aggregator. Depending on 'flushData', waiting metrics (from checks or closed DogStatsD buckets) will be sent to the serializer before stopping.
type CheckSampler ¶
type CheckSampler struct {
// contains filtered or unexported fields
}
CheckSampler aggregates metrics from one Check instance
type Context ¶
Context holds the elements that form a context, and can be serialized into a context key
type RawSender ¶
type RawSender interface {
	SendRawMetricSample(sample *metrics.MetricSample)
	SendRawServiceCheck(sc *metrics.ServiceCheck)
	Event(e metrics.Event)
}
RawSender is an interface to submit samples directly to the aggregator
type Sender ¶
type Sender interface {
	Commit()
	Gauge(metric string, value float64, hostname string, tags []string)
	Rate(metric string, value float64, hostname string, tags []string)
	Count(metric string, value float64, hostname string, tags []string)
	MonotonicCount(metric string, value float64, hostname string, tags []string)
	MonotonicCountWithFlushFirstValue(metric string, value float64, hostname string, tags []string, flushFirstValue bool)
	Counter(metric string, value float64, hostname string, tags []string)
	Histogram(metric string, value float64, hostname string, tags []string)
	Historate(metric string, value float64, hostname string, tags []string)
	ServiceCheck(checkName string, status metrics.ServiceCheckStatus, hostname string, tags []string, message string)
	HistogramBucket(metric string, value int64, lowerBound, upperBound float64, monotonic bool, hostname string, tags []string, flushFirstValue bool)
	Event(e metrics.Event)
	EventPlatformEvent(rawEvent string, eventType string)
	GetSenderStats() check.SenderStats
	DisableDefaultHostname(disable bool)
	SetCheckCustomTags(tags []string)
	SetCheckService(service string)
	FinalizeCheckServiceTag()
	OrchestratorMetadata(msgs []serializer.ProcessMessageBody, clusterID string, nodeType int)
}
Sender allows sending metrics from a check
func GetDefaultSender ¶
GetDefaultSender returns the default sender
type SerieSignature ¶
type SerieSignature struct {
// contains filtered or unexported fields
}
SerieSignature holds the elements that make it possible to know whether two similar `Serie`s from the same bucket can be merged into one
type Stats ¶
type Stats struct {
	Flushes    [32]int64 // circular buffer of recent flushes stat
	FlushIndex int       // last flush position in circular buffer
	LastFlush  int64     // most recent flush stat, provided for convenience
	Name       string
	// contains filtered or unexported fields
}
Stats stores statistics from several past flushes, allowing computations like median or percentiles
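The circular-buffer bookkeeping can be sketched as follows (a hypothetical simplification of the real Stats update logic):

```go
package main

import "fmt"

// flushStats mirrors the exported fields of Stats: a fixed-size circular
// buffer that keeps only the most recent flush statistics.
type flushStats struct {
	Flushes    [32]int64 // circular buffer of recent flushes stat
	FlushIndex int       // last flush position in circular buffer
	LastFlush  int64     // most recent flush stat
}

// add records one flush stat, overwriting the oldest entry once the
// buffer wraps around.
func (s *flushStats) add(stat int64) {
	s.FlushIndex = (s.FlushIndex + 1) % len(s.Flushes)
	s.Flushes[s.FlushIndex] = stat
	s.LastFlush = stat
}

func main() {
	s := &flushStats{}
	for i := int64(1); i <= 40; i++ {
		s.add(i) // 40 flushes wrap the 32-slot buffer
	}
	fmt.Println(s.LastFlush) // 40
}
```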
type TimeSampler ¶
type TimeSampler struct {
// contains filtered or unexported fields
}
TimeSampler aggregates metrics by buckets of 'interval' seconds
func NewTimeSampler ¶
func NewTimeSampler(interval int64) *TimeSampler
NewTimeSampler returns a newly initialized TimeSampler
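Bucketing by 'interval' seconds amounts to aligning each sample's timestamp to the start of its bucket; a minimal sketch of that alignment (the TimeSampler's actual internals are more involved):

```go
package main

import "fmt"

// bucketStart aligns a Unix timestamp to the start of its interval
// bucket, so all samples within the same interval share one bucket key.
func bucketStart(ts, interval int64) int64 {
	return ts - ts%interval
}

func main() {
	// Two samples 3 seconds apart land in the same 10-second bucket.
	fmt.Println(bucketStart(1609459217, 10)) // 1609459210
	fmt.Println(bucketStart(1609459214, 10)) // 1609459210
}
```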