aggregator

package
v0.0.0-...-949e29e Latest
Warning

This package is not in the latest version of its module.

Published: Jan 9, 2025 License: Apache-2.0 Imports: 54 Imported by: 37

README

package aggregator

The Aggregator is the first thing a metric hits during its journey towards the intake. This package is responsible for metrics reception and aggregation before passing them to the forwarder. It computes rates and histograms and passes them to the Serializer.

For now, the sources of metrics are DogStatsD and Python/Go checks. DogStatsD sends MetricSamples directly to the Aggregator, while checks use the Sender to do so.

MetricSamples are the raw metric values that flow from our two sources to the different metric types (Gauge, Count, ...).

     +===========+                       +===============+
     + DogStatsD +                       +    checks     +
     +===========+                       | Python and Go |
          ++                             +===============+
          ||                                    ++
          ||                                    vv
          ||                                .+------+.
          ||                                . Sender .
          ||                                '+---+--+'
          ||                                     |
          vv                                     v
       .+----------------------------------------+--+.
       +                 Aggregator                  +
       '+--+-------------------------------------+--+'
           |                                     |
           |                                     |
           v                                     v
    .+-----+-----+.                       .+-----+------+.
    + TimeSampler +                       + CheckSampler +
    '+-----+-----+'                       '+-----+------+'
           |                                     |
           |                                     |
           +         .+---------------+.         +
           '+------->+ ContextMetrics  +<-------+'
                     '+-------+-------+'
                              |
                              v
                     .+-------+-------+.
                     +     Metrics     +
                     | Gauge           |
                     | Count           |
                     | Histogram       |
                     | Rate            |
                     | Set             |
                     + ...             +
                     '+--------+------+'
                              ||               +=================+
                              ++==============>+  Serializer     |
                                               +=================+

Sender

The Sender is used by calling code (namely, checks) that wants to send metric samples upstream. Sender exposes a high-level interface mapping to the different metric types supported upstream (Gauges, Counters, etc.). To get an instance of the global default sender, call GetDefaultSender; the function will take care of initialising everything, Aggregator included.

Aggregator

For now the package provides only one Aggregator implementation, the BufferedAggregator, named after its ability to store the samples it receives in memory. The Aggregator should be used as a singleton; the function InitAggregator takes care of this and should be considered the right way to get an Aggregator instance at any time. An Aggregator has its own loop that needs to be started with the run method; in the case of the BufferedAggregator, the buffer is flushed at defined intervals. An Aggregator receives metric samples over one or more channels, and those samples are processed by different samplers (TimeSampler or CheckSampler).

Sampler

Metrics come this way as samples (e.g. in the case of rates, the actual metric is computed over samples in a given time), and samplers take care of storing and processing them depending on where the samples come from. We currently use two different samplers: one for samples coming from DogStatsD, the other for samples coming from checks. In the latter case, we have one sampler instance per check instance (this is to support running the same check at different intervals).
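To make "computed over samples in a given time" concrete, here is a minimal sketch of a rate derived from the two most recent samples. It assumes a simple (last - previous) / elapsed-seconds definition; the point and rate types are hypothetical and not taken from this package.

```go
package main

import (
	"errors"
	"fmt"
)

// point is a hypothetical (timestamp, value) sample; timestamps are in seconds.
type point struct {
	ts    float64
	value float64
}

// rate keeps the two most recent samples and derives a per-second rate.
type rate struct {
	prev, last point
	count      int
}

// addSample records p as the latest sample, shifting the old one to prev.
func (r *rate) addSample(p point) {
	r.prev, r.last = r.last, p
	r.count++
}

// flush returns the value change per second between the last two samples.
func (r *rate) flush() (float64, error) {
	if r.count < 2 {
		return 0, errors.New("need at least two samples to compute a rate")
	}
	dt := r.last.ts - r.prev.ts
	if dt <= 0 {
		return 0, errors.New("samples must have increasing timestamps")
	}
	return (r.last.value - r.prev.value) / dt, nil
}

func main() {
	var r rate
	r.addSample(point{ts: 100, value: 10})
	r.addSample(point{ts: 110, value: 40})
	v, err := r.flush()
	fmt.Println(v, err) // prints "3 <nil>"
}
```

A single sample is not enough to produce a value here, which is one reason a sampler has to hold state between submissions.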

Metric

We have different kinds of metrics (Gauge, Count, ...). They are responsible for computing the final Serie (set of points) to forward to the Datadog backend.

Documentation

Index

Constants

const (
	// DefaultFlushInterval aggregator default flush interval
	DefaultFlushInterval = 15 * time.Second // flush interval

	// MetricSamplePoolBatchSize is the batch size of the metric sample pool.
	MetricSamplePoolBatchSize = 32
)
const (
	// AutoAdjustStrategyMaxThroughput will adapt the number of pipelines for maximum throughput
	AutoAdjustStrategyMaxThroughput = "max_throughput"
	// AutoAdjustStrategyPerOrigin will adapt the number of pipelines for better container isolation
	AutoAdjustStrategyPerOrigin = "per_origin"
)
const (
	// ContextSizeInBytes is the size of a context in bytes
	// We count the size of the context key with the context.
	ContextSizeInBytes = int(unsafe.Sizeof(Context{})) + int(unsafe.Sizeof(ckey.ContextKey(0)))
)

Variables

This section is empty.

Functions

func AddRecurrentSeries

func AddRecurrentSeries(newSerie *metrics.Serie)

AddRecurrentSeries adds a serie to the series that are sent at every flush

func GetDogStatsDWorkerAndPipelineCount

func GetDogStatsDWorkerAndPipelineCount() (int, int)

GetDogStatsDWorkerAndPipelineCount returns how many routines should be spawned for the DogStatsD workers and how many DogStatsD pipelines should be running.

Types

type AgentDemultiplexer

type AgentDemultiplexer struct {
	// contains filtered or unexported fields
}

AgentDemultiplexer is the demultiplexer implementation for the main Agent.

func InitAndStartAgentDemultiplexer

func InitAndStartAgentDemultiplexer(
	log log.Component,
	sharedForwarder forwarder.Forwarder,
	orchestratorForwarder orchestratorforwarder.Component,
	options AgentDemultiplexerOptions,
	eventPlatformForwarder eventplatform.Component,
	haAgent haagent.Component,
	compressor compression.Component,
	tagger tagger.Component,
	hostname string) *AgentDemultiplexer

InitAndStartAgentDemultiplexer creates a new Demultiplexer and runs what's necessary in goroutines. As of today, only the embedded BufferedAggregator needs a separate goroutine. In the future, goroutines will be started for the event platform forwarder and/or orchestrator forwarder.

func (*AgentDemultiplexer) AddAgentStartupTelemetry

func (d *AgentDemultiplexer) AddAgentStartupTelemetry(agentVersion string)

AddAgentStartupTelemetry adds a startup event and count (in a DSD time sampler) to be sent on the next flush.

func (*AgentDemultiplexer) AggregateCheckSample

func (d *AgentDemultiplexer) AggregateCheckSample(_ metrics.MetricSample)

AggregateCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline.

func (*AgentDemultiplexer) AggregateSample

func (d *AgentDemultiplexer) AggregateSample(sample metrics.MetricSample)

AggregateSample adds a MetricSample in the first DogStatsD time sampler.

func (*AgentDemultiplexer) AggregateSamples

func (d *AgentDemultiplexer) AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch)

AggregateSamples adds a batch of MetricSample into the given DogStatsD time sampler shard. If you have to submit a single metric sample see `AggregateSample`.

func (*AgentDemultiplexer) Aggregator

func (d *AgentDemultiplexer) Aggregator() *BufferedAggregator

Aggregator returns an aggregator that anyone can use. This method exists to keep compatibility with existing code while introducing the Demultiplexer; however, the plan is to remove it soon.

Deprecated.

func (*AgentDemultiplexer) DestroySender

func (d *AgentDemultiplexer) DestroySender(id checkid.ID)

DestroySender frees up the resources used by the sender with the passed ID (by deregistering it from the aggregator). It should be called when no sender with this ID is used anymore. The metrics of this (these) sender(s) that haven't been flushed yet will be lost.

func (*AgentDemultiplexer) DumpDogstatsdContexts

func (d *AgentDemultiplexer) DumpDogstatsdContexts(dest io.Writer) error

DumpDogstatsdContexts writes the current state of the context resolver to dest.

This blocks metrics processing, so dest is expected to be reasonably fast and not block for too long.

func (*AgentDemultiplexer) ForceFlushToSerializer

func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool)

ForceFlushToSerializer triggers the execution of a flush from all data of samplers and the BufferedAggregator to the serializer. Safe to call from multiple threads.

func (*AgentDemultiplexer) GetDefaultSender

func (d *AgentDemultiplexer) GetDefaultSender() (sender.Sender, error)

GetDefaultSender returns a default sender.

func (*AgentDemultiplexer) GetDogStatsDPipelinesCount

func (d *AgentDemultiplexer) GetDogStatsDPipelinesCount() int

GetDogStatsDPipelinesCount returns how many sampling pipelines are running for the DogStatsD samples.

func (*AgentDemultiplexer) GetEventPlatformForwarder

func (d *AgentDemultiplexer) GetEventPlatformForwarder() (eventplatform.Forwarder, error)

GetEventPlatformForwarder returns the underlying event platform forwarder.

func (*AgentDemultiplexer) GetEventsAndServiceChecksChannels

func (d *AgentDemultiplexer) GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)

GetEventsAndServiceChecksChannels returns the underlying events and service checks channels.

func (*AgentDemultiplexer) GetMetricSamplePool

func (d *AgentDemultiplexer) GetMetricSamplePool() *metrics.MetricSamplePool

GetMetricSamplePool returns a shared resource used throughout the DogStatsD pipeline to reuse metric sample slices: the server gets a slice and fills it with samples, the rest of the pipeline processes them, and the end of the line (the time sampler) puts the slice back in the pool. The main idea is to reduce the garbage generated by slice allocations.
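The get/fill/put-back cycle described above can be sketched with sync.Pool. This is an assumption-laden illustration, not the metrics.MetricSamplePool implementation: samplePool, getBatch and putBatch are hypothetical names, and the batch size of 32 simply mirrors the MetricSamplePoolBatchSize constant.

```go
package main

import (
	"fmt"
	"sync"
)

// metricSample is a hypothetical stand-in for metrics.MetricSample.
type metricSample struct {
	Name  string
	Value float64
}

// samplePool hands out fixed-size batches and takes them back for reuse.
type samplePool struct {
	pool      sync.Pool
	batchSize int
}

func newSamplePool(batchSize int) *samplePool {
	p := &samplePool{batchSize: batchSize}
	p.pool.New = func() any {
		return make([]metricSample, batchSize)
	}
	return p
}

// getBatch returns a slice of length batchSize; its contents may be stale.
func (p *samplePool) getBatch() []metricSample {
	return p.pool.Get().([]metricSample)
}

// putBatch returns a batch to the pool, restoring its full length first.
func (p *samplePool) putBatch(b []metricSample) {
	if cap(b) < p.batchSize {
		return // too small to reuse; leave it to the garbage collector
	}
	p.pool.Put(b[:p.batchSize])
}

func main() {
	pool := newSamplePool(32) // 32 mirrors MetricSamplePoolBatchSize

	// Producer side: take a batch and fill part of it.
	batch := pool.getBatch()
	n := 0
	batch[n] = metricSample{Name: "requests", Value: 1}
	n++
	fmt.Println(n, "sample(s) filled out of", len(batch))

	// Consumer side: once the samples are processed, give the slice back.
	pool.putBatch(batch)
}
```

Because a reused batch still holds old samples, the producer has to track how many entries it actually filled rather than relying on the slice length.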

func (*AgentDemultiplexer) GetSender

func (d *AgentDemultiplexer) GetSender(id checkid.ID) (sender.Sender, error)

GetSender returns a sender.Sender with the passed ID, properly registered with the aggregator. If no error is returned here, DestroySender must be called with the same ID once the sender is not used anymore.
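The GetSender / DestroySender pairing amounts to a registry keyed by check ID. The sketch below shows that lifecycle under simplifying assumptions: senderRegistry, toySender and checkID are hypothetical types, and the real sender manager also wires each sender to the aggregator, which is omitted here.

```go
package main

import (
	"fmt"
	"sync"
)

// checkID is a hypothetical stand-in for checkid.ID.
type checkID string

// toySender is a hypothetical sender that only remembers its owner.
type toySender struct{ id checkID }

// senderRegistry maps check IDs to their senders.
type senderRegistry struct {
	mu      sync.Mutex
	senders map[checkID]*toySender
}

func newSenderRegistry() *senderRegistry {
	return &senderRegistry{senders: make(map[checkID]*toySender)}
}

// getSender returns the sender registered for id, creating it on first use.
func (r *senderRegistry) getSender(id checkID) *toySender {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.senders[id]; ok {
		return s
	}
	s := &toySender{id: id}
	r.senders[id] = s
	return s
}

// destroySender deregisters the sender for id; unflushed data would be lost.
func (r *senderRegistry) destroySender(id checkID) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.senders, id)
}

func main() {
	reg := newSenderRegistry()
	s := reg.getSender("cpu:1234")
	defer reg.destroySender(s.id) // pair every successful get with a destroy
	fmt.Println("using sender for", s.id)
}
```

Deferring the destroy call right after a successful get is one way to make sure the registry does not keep entries for checks that no longer run.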

func (*AgentDemultiplexer) Options

Options returns options used during the demux initialization.

func (*AgentDemultiplexer) SendSamplesWithoutAggregation

func (d *AgentDemultiplexer) SendSamplesWithoutAggregation(samples metrics.MetricSampleBatch)

SendSamplesWithoutAggregation buffers a bunch of metrics with timestamp. This data will be directly transmitted "as-is" (i.e. no aggregation, no sampling) to the serializer.

func (*AgentDemultiplexer) Serializer

Serializer returns a serializer that anyone can use. This method exists to keep compatibility with existing code while introducing the Demultiplexer; however, the plan is to remove it soon.

Deprecated.

func (*AgentDemultiplexer) SetSender

func (d *AgentDemultiplexer) SetSender(s sender.Sender, id checkid.ID) error

SetSender registers the passed sender with the passed ID. This is largely for testing purposes.

func (*AgentDemultiplexer) Stop

func (d *AgentDemultiplexer) Stop(flush bool)

Stop stops the demultiplexer. Resources are released, so the instance should not be used after a call to `Stop()`.

type AgentDemultiplexerOptions

type AgentDemultiplexerOptions struct {
	FlushInterval time.Duration

	EnableNoAggregationPipeline bool

	DontStartForwarders bool // unit tests don't need the forwarders to be instantiated

	UseDogstatsdContextLimiter bool
	DogstatsdMaxMetricsTags    int
}

AgentDemultiplexerOptions are the options used to initialize a Demultiplexer.

func DefaultAgentDemultiplexerOptions

func DefaultAgentDemultiplexerOptions() AgentDemultiplexerOptions

DefaultAgentDemultiplexerOptions returns the default options to initialize an AgentDemultiplexer.

type AgentDemultiplexerPrinter

type AgentDemultiplexerPrinter struct {
	DemultiplexerWithAggregator
}

AgentDemultiplexerPrinter is used to output series, sketches, service checks and events. Today, this is only used by the `agent check` command.

func (AgentDemultiplexerPrinter) GetMetricsDataForPrint

func (p AgentDemultiplexerPrinter) GetMetricsDataForPrint() map[string]interface{}

GetMetricsDataForPrint returns metrics data for series and sketches for printing purpose.

func (AgentDemultiplexerPrinter) PrintMetrics

func (p AgentDemultiplexerPrinter) PrintMetrics(checkFileOutput *bytes.Buffer, formatTable bool)

PrintMetrics prints the metrics aggregated in the Demultiplexer's check samplers (series and sketches), service checks buffer, and events buffers.

type BufferedAggregator

type BufferedAggregator struct {

	// metricSamplePool is a pool of slices of metric sample to avoid allocations.
	// Used by the Dogstatsd Batcher.
	MetricSamplePool *metrics.MetricSamplePool
	// contains filtered or unexported fields
}

BufferedAggregator aggregates metrics in buckets for dogstatsd Metrics

func NewBufferedAggregator

func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator

NewBufferedAggregator instantiates a BufferedAggregator

func (*BufferedAggregator) Flush added in v0.9.0

func (agg *BufferedAggregator) Flush(trigger flushTrigger)

Flush flushes the data contained in the BufferedAggregator into the Forwarder. This method can be called from multiple routines.

func (*BufferedAggregator) GetBufferedChannels

func (agg *BufferedAggregator) GetBufferedChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)

GetBufferedChannels returns the channels which can be subsequently used to send Events or ServiceChecks.

func (*BufferedAggregator) GetEventPlatformEvents added in v0.9.0

func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message

GetEventPlatformEvents grabs the event platform events from the queue and clears them. Note that this works only if using the 'noop' event platform forwarder

func (*BufferedAggregator) GetEventPlatformForwarder

func (agg *BufferedAggregator) GetEventPlatformForwarder() (eventplatform.Forwarder, error)

GetEventPlatformForwarder returns an event platform forwarder.

func (*BufferedAggregator) GetEvents

func (agg *BufferedAggregator) GetEvents() event.Events

GetEvents grabs the events from the queue and clears it

func (*BufferedAggregator) GetSeriesAndSketches

func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)

GetSeriesAndSketches grabs all the series and sketches from the queue and clears the queue. The parameter `before` is used as an end interval while retrieving series and sketches from the time sampler: metrics and sketches before this timestamp are returned.

func (*BufferedAggregator) GetServiceChecks

func (agg *BufferedAggregator) GetServiceChecks() servicecheck.ServiceChecks

GetServiceChecks grabs all the service checks from the queue and clears the queue

func (*BufferedAggregator) IsInputQueueEmpty

func (agg *BufferedAggregator) IsInputQueueEmpty() bool

IsInputQueueEmpty returns true if every input channel for the aggregator is empty. This is mainly useful for tests and benchmarks.

func (*BufferedAggregator) Stop

func (agg *BufferedAggregator) Stop()

Stop stops the aggregator.

type CheckSampler

type CheckSampler struct {
	// contains filtered or unexported fields
}

CheckSampler aggregates metrics from one Check instance

type Context

type Context struct {
	Name string
	Host string
	// contains filtered or unexported fields
}

Context holds the elements that form a context, and can be serialized into a context key
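As an illustration of how the elements of a context can be reduced to a single key, the sketch below hashes a name, a host and a tag list with FNV-1a from the standard library. It is not the ckey implementation: toyContext and its key method are hypothetical, and the choice of hash and of sorting tags first are assumptions made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// toyContext is a hypothetical, simplified context: name, host and tags.
type toyContext struct {
	Name string
	Host string
	Tags []string
}

// key hashes the context elements into a single 64-bit value.
// Tags are sorted on a copy so that tag order does not change the key.
func (c toyContext) key() uint64 {
	tags := append([]string(nil), c.Tags...)
	sort.Strings(tags)

	h := fnv.New64a()
	write := func(s string) {
		h.Write([]byte(s))
		h.Write([]byte{0}) // separator so "ab"+"c" differs from "a"+"bc"
	}
	write(c.Name)
	write(c.Host)
	for _, t := range tags {
		write(t)
	}
	return h.Sum64()
}

func main() {
	a := toyContext{Name: "http.requests", Host: "web-1", Tags: []string{"env:prod", "team:core"}}
	b := toyContext{Name: "http.requests", Host: "web-1", Tags: []string{"team:core", "env:prod"}}
	fmt.Println(a.key() == b.key()) // prints "true": same elements, same key
}
```

Samples whose name, host and tags produce the same key can then be aggregated together.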

func (*Context) DataSizeInBytes

func (c *Context) DataSizeInBytes() int

DataSizeInBytes returns the size of the context data in bytes

func (*Context) SizeInBytes

func (c *Context) SizeInBytes() int

SizeInBytes returns the size of the context in bytes

func (*Context) Tags

func (c *Context) Tags() tagset.CompositeTags

Tags returns tags for the context.

type ContextDebugRepr

type ContextDebugRepr struct {
	Name       string
	Host       string
	Type       string
	TaggerTags []string
	MetricTags []string
	NoIndex    bool
	Source     metrics.MetricSource
}

ContextDebugRepr is the on-disk representation of a context.

type Demultiplexer

type Demultiplexer interface {
	// General
	// --
	// Serializer returns the serializer used by the Demultiplexer instance.
	Serializer() serializer.MetricSerializer

	// AggregateSample sends a MetricSample to the DogStatsD time sampler.
	// In sharded implementation, the metric is sent to the first time sampler.
	AggregateSample(sample metrics.MetricSample)
	// AggregateSamples sends a batch of MetricSample to the given DogStatsD
	// time sampler shard.
	// Implementation not supporting sharding may ignore the `shard` parameter.
	AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch)

	// SendSamplesWithoutAggregation pushes metrics in the no-aggregation pipeline: a pipeline
	// where the metrics are not sampled and sent as-is.
	// This is the method to use to send metrics with a valid timestamp attached.
	SendSamplesWithoutAggregation(metrics metrics.MetricSampleBatch)

	// ForceFlushToSerializer flushes all the aggregated data from the different samplers to
	// the serialization/forwarding parts.
	ForceFlushToSerializer(start time.Time, waitForSerializer bool)
	// GetMetricSamplePool returns a shared resource used throughout the DogStatsD
	// pipeline to reuse metric sample slices: the server gets a slice and fills
	// it with samples, the rest of the pipeline processes them, and the end of
	// the line (the time sampler) puts the slice back in the pool.
	// The main idea is to reduce the garbage generated by slice allocations.
	GetMetricSamplePool() *metrics.MetricSamplePool

	// Senders API, mainly used by collectors/checks
	// --
	sender.SenderManager
}

Demultiplexer is composed of multiple samplers (check and time/dogstatsd), a shared forwarder, the event platform forwarder, orchestrator data buffers and other data that need to be sent to the forwarders. AgentDemultiplexerOptions lets you configure which forwarders have to be started.

type DemultiplexerWithAggregator

type DemultiplexerWithAggregator interface {
	Demultiplexer
	Aggregator() *BufferedAggregator
	// AggregateCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline.
	AggregateCheckSample(sample metrics.MetricSample)
	Options() AgentDemultiplexerOptions
	GetEventPlatformForwarder() (eventplatform.Forwarder, error)
	GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)
	DumpDogstatsdContexts(io.Writer) error
}

DemultiplexerWithAggregator is a Demultiplexer running an Aggregator. This flavor uses an AgentDemultiplexerOptions struct for startup configuration.

type FlushAndSerializeInParallel

type FlushAndSerializeInParallel struct {
	ChannelSize int
	BufferSize  int
}

FlushAndSerializeInParallel contains options for flushing metrics and serializing in parallel.

func NewFlushAndSerializeInParallel

func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInParallel

NewFlushAndSerializeInParallel creates a new instance of FlushAndSerializeInParallel.

type HostTagProvider

type HostTagProvider struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewHostTagProvider

func NewHostTagProvider() *HostTagProvider

func (*HostTagProvider) GetHostTags

func (p *HostTagProvider) GetHostTags() []string

type RawSender

type RawSender interface {
	SendRawMetricSample(sample *metrics.MetricSample)
	SendRawServiceCheck(sc *servicecheck.ServiceCheck)
	Event(e event.Event)
}

RawSender interface to submit samples to aggregator directly

type SerieSignature

type SerieSignature struct {
	// contains filtered or unexported fields
}

SerieSignature holds the elements used to decide whether two similar `Serie`s from the same bucket can be merged into one. Series must have the same contextKey.

type ServerlessDemultiplexer

type ServerlessDemultiplexer struct {
	// contains filtered or unexported fields
}

ServerlessDemultiplexer is a simple demultiplexer used by the serverless flavor of the Agent

func InitAndStartServerlessDemultiplexer

func InitAndStartServerlessDemultiplexer(keysPerDomain map[string][]string, forwarderTimeout time.Duration, tagger tagger.Component) *ServerlessDemultiplexer

InitAndStartServerlessDemultiplexer creates and starts a new Demultiplexer for the serverless agent.

func (*ServerlessDemultiplexer) AggregateSample

func (d *ServerlessDemultiplexer) AggregateSample(sample metrics.MetricSample)

AggregateSample sends a MetricSample to the TimeSampler.

func (*ServerlessDemultiplexer) AggregateSamples

func (d *ServerlessDemultiplexer) AggregateSamples(_ TimeSamplerID, samples metrics.MetricSampleBatch)

AggregateSamples sends a MetricSampleBatch to the TimeSampler. The ServerlessDemultiplexer does not use sharding in its DogStatsD pipeline, so the `shard` parameter is ignored. In the Serverless Agent, consider using `AggregateSample` instead.

func (ServerlessDemultiplexer) DestroySender

func (s ServerlessDemultiplexer) DestroySender(id checkid.ID)

DestroySender frees up the resources used by the sender with the passed ID (by deregistering it from the aggregator). It should be called when no sender with this ID is used anymore. The metrics of this (these) sender(s) that haven't been flushed yet will be lost.

func (*ServerlessDemultiplexer) ForceFlushToSerializer

func (d *ServerlessDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool)

ForceFlushToSerializer flushes all data from the time sampler to the serializer.

func (ServerlessDemultiplexer) GetDefaultSender

func (s ServerlessDemultiplexer) GetDefaultSender() (sender.Sender, error)

GetDefaultSender returns a default sender.

func (*ServerlessDemultiplexer) GetMetricSamplePool

func (d *ServerlessDemultiplexer) GetMetricSamplePool() *metrics.MetricSamplePool

GetMetricSamplePool returns a shared resource used throughout the DogStatsD pipeline to reuse metric sample slices: the server gets a slice and fills it with samples, the rest of the pipeline processes them, and the end of the line (the time sampler) puts the slice back in the pool. The main idea is to reduce the garbage generated by slice allocations.

func (ServerlessDemultiplexer) GetSender

func (s ServerlessDemultiplexer) GetSender(cid checkid.ID) (sender.Sender, error)

GetSender returns a sender.Sender with the passed ID, properly registered with the aggregator. If no error is returned here, DestroySender must be called with the same ID once the sender is not used anymore.

func (*ServerlessDemultiplexer) Run

func (d *ServerlessDemultiplexer) Run()

Run runs all demultiplexer parts

func (*ServerlessDemultiplexer) SendSamplesWithoutAggregation

func (d *ServerlessDemultiplexer) SendSamplesWithoutAggregation(_ metrics.MetricSampleBatch)

SendSamplesWithoutAggregation is not supported in the Serverless Agent implementation.

func (*ServerlessDemultiplexer) Serializer

Serializer returns the shared serializer

func (ServerlessDemultiplexer) SetSender

func (s ServerlessDemultiplexer) SetSender(sender sender.Sender, id checkid.ID) error

SetSender registers the passed sender with the passed ID. This is largely for testing purposes.

func (*ServerlessDemultiplexer) Stop

func (d *ServerlessDemultiplexer) Stop(flush bool)

Stop stops the wrapped aggregator and the forwarder.

type Stats

type Stats struct {
	Flushes    [32]int64 // circular buffer of recent flushes stat
	FlushIndex int       // last flush position in circular buffer
	LastFlush  int64     // most recent flush stat, provided for convenience
	Name       string
	// contains filtered or unexported fields
}

Stats stores a statistic from several past flushes allowing computations like median or percentiles
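The exported fields suggest a fixed-size ring of the 32 most recent flush values. A minimal sketch of how such a ring can be filled and queried for a median is shown below; flushStats copies the exported field layout of Stats, but the add and median methods and the filled counter are hypothetical and not taken from this package.

```go
package main

import (
	"fmt"
	"sort"
)

// flushStats mirrors the exported shape of Stats; its methods are hypothetical.
type flushStats struct {
	Flushes    [32]int64 // circular buffer of recent flush values
	FlushIndex int       // last written position in the circular buffer
	LastFlush  int64     // most recent flush value
	Name       string
	filled     int // number of slots written so far, capped at len(Flushes)
}

func newFlushStats(name string) *flushStats {
	return &flushStats{Name: name, FlushIndex: -1}
}

// add records a new value, overwriting the oldest one once the ring is full.
func (s *flushStats) add(v int64) {
	s.FlushIndex = (s.FlushIndex + 1) % len(s.Flushes)
	s.Flushes[s.FlushIndex] = v
	s.LastFlush = v
	if s.filled < len(s.Flushes) {
		s.filled++
	}
}

// median returns the median of the recorded values (0 if there are none).
// For an even number of values it returns the upper of the two middle ones.
func (s *flushStats) median() int64 {
	if s.filled == 0 {
		return 0
	}
	vals := make([]int64, s.filled)
	copy(vals, s.Flushes[:s.filled])
	sort.Slice(vals, func(i, j int) bool { return vals[i] < vals[j] })
	return vals[len(vals)/2]
}

func main() {
	s := newFlushStats("ChecksMetricSampleFlushTime") // name chosen for illustration
	for _, v := range []int64{5, 1, 9} {
		s.add(v)
	}
	fmt.Println(s.LastFlush, s.median()) // prints "9 5"
}
```

Sorting a copy keeps the ring itself in insertion order, so the index arithmetic in add stays valid.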

type TimeSampler

type TimeSampler struct {
	// contains filtered or unexported fields
}

TimeSampler aggregates metrics by buckets of 'interval' seconds
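Bucketing by 'interval' seconds usually means aligning each sample's timestamp down to a multiple of the interval. The sketch below assumes that scheme and sums values per bucket and metric name; toySampler and bucketStart are hypothetical, and the real TimeSampler keys on contexts and supports many metric types.

```go
package main

import "fmt"

// bucketStart aligns a non-negative Unix timestamp (seconds) to the start
// of its bucket, e.g. 109 with a 10-second interval falls in bucket 100.
func bucketStart(ts, interval int64) int64 {
	return ts - ts%interval
}

// toySampler sums values per (bucket, metric name); a deliberate simplification.
type toySampler struct {
	interval int64
	buckets  map[int64]map[string]float64
}

func newToySampler(interval int64) *toySampler {
	return &toySampler{
		interval: interval,
		buckets:  make(map[int64]map[string]float64),
	}
}

// add places the sample in the bucket its timestamp belongs to.
func (s *toySampler) add(name string, value float64, ts int64) {
	start := bucketStart(ts, s.interval)
	if s.buckets[start] == nil {
		s.buckets[start] = make(map[string]float64)
	}
	s.buckets[start][name] += value
}

// flush returns and removes every bucket that ended at or before cutoff.
// Buckets that are still open stay in the sampler.
func (s *toySampler) flush(cutoff int64) map[int64]map[string]float64 {
	out := make(map[int64]map[string]float64)
	for start, b := range s.buckets {
		if start+s.interval <= cutoff {
			out[start] = b
			delete(s.buckets, start)
		}
	}
	return out
}

func main() {
	s := newToySampler(10)
	s.add("hits", 1, 101)
	s.add("hits", 2, 109)
	s.add("hits", 5, 112) // lands in the next bucket, which is still open
	fmt.Println(s.flush(110)) // prints "map[100:map[hits:3]]"
}
```

Flushing only closed buckets is what lets late samples for the current interval still be counted before their bucket is sent.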

func NewTimeSampler

func NewTimeSampler(id TimeSamplerID, interval int64, cache *tags.Store, tagger tagger.Component, hostname string) *TimeSampler

NewTimeSampler returns a newly initialized TimeSampler

type TimeSamplerID

type TimeSamplerID int

TimeSamplerID is a type ID for sharded time samplers.

Directories

Path Synopsis
ckey module
internal
