Documentation ¶
Index ¶
- Constants
- func AddRecurrentSeries(newSerie *metrics.Serie)
- func GetDogStatsDWorkerAndPipelineCount() (int, int)
- type AgentDemultiplexer
- func (d *AgentDemultiplexer) AddAgentStartupTelemetry(agentVersion string)
- func (d *AgentDemultiplexer) AggregateCheckSample(_ metrics.MetricSample)
- func (d *AgentDemultiplexer) AggregateSample(sample metrics.MetricSample)
- func (d *AgentDemultiplexer) AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch)
- func (d *AgentDemultiplexer) Aggregator() *BufferedAggregator
- func (d *AgentDemultiplexer) DestroySender(id checkid.ID)
- func (d *AgentDemultiplexer) DumpDogstatsdContexts(dest io.Writer) error
- func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool)
- func (d *AgentDemultiplexer) GetDefaultSender() (sender.Sender, error)
- func (d *AgentDemultiplexer) GetDogStatsDPipelinesCount() int
- func (d *AgentDemultiplexer) GetEventPlatformForwarder() (eventplatform.Forwarder, error)
- func (d *AgentDemultiplexer) GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)
- func (d *AgentDemultiplexer) GetMetricSamplePool() *metrics.MetricSamplePool
- func (d *AgentDemultiplexer) GetSender(id checkid.ID) (sender.Sender, error)
- func (d *AgentDemultiplexer) Options() AgentDemultiplexerOptions
- func (d *AgentDemultiplexer) SendSamplesWithoutAggregation(samples metrics.MetricSampleBatch)
- func (d *AgentDemultiplexer) Serializer() serializer.MetricSerializer
- func (d *AgentDemultiplexer) SetSender(s sender.Sender, id checkid.ID) error
- func (d *AgentDemultiplexer) Stop(flush bool)
- type AgentDemultiplexerOptions
- type AgentDemultiplexerPrinter
- type BufferedAggregator
- func (agg *BufferedAggregator) Flush(trigger flushTrigger)
- func (agg *BufferedAggregator) GetBufferedChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)
- func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message
- func (agg *BufferedAggregator) GetEventPlatformForwarder() (eventplatform.Forwarder, error)
- func (agg *BufferedAggregator) GetEvents() event.Events
- func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)
- func (agg *BufferedAggregator) GetServiceChecks() servicecheck.ServiceChecks
- func (agg *BufferedAggregator) IsInputQueueEmpty() bool
- func (agg *BufferedAggregator) Stop()
- type CheckSampler
- type Context
- type ContextDebugRepr
- type Demultiplexer
- type DemultiplexerWithAggregator
- type FlushAndSerializeInParallel
- type HostTagProvider
- type RawSender
- type SerieSignature
- type ServerlessDemultiplexer
- func (d *ServerlessDemultiplexer) AggregateSample(sample metrics.MetricSample)
- func (d *ServerlessDemultiplexer) AggregateSamples(_ TimeSamplerID, samples metrics.MetricSampleBatch)
- func (s ServerlessDemultiplexer) DestroySender(id checkid.ID)
- func (d *ServerlessDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool)
- func (s ServerlessDemultiplexer) GetDefaultSender() (sender.Sender, error)
- func (d *ServerlessDemultiplexer) GetMetricSamplePool() *metrics.MetricSamplePool
- func (s ServerlessDemultiplexer) GetSender(cid checkid.ID) (sender.Sender, error)
- func (d *ServerlessDemultiplexer) Run()
- func (d *ServerlessDemultiplexer) SendSamplesWithoutAggregation(_ metrics.MetricSampleBatch)
- func (d *ServerlessDemultiplexer) Serializer() serializer.MetricSerializer
- func (s ServerlessDemultiplexer) SetSender(sender sender.Sender, id checkid.ID) error
- func (d *ServerlessDemultiplexer) Stop(flush bool)
- type Stats
- type TimeSampler
- type TimeSamplerID
Constants ¶
const (
    // DefaultFlushInterval aggregator default flush interval
    DefaultFlushInterval = 15 * time.Second // flush interval
    // MetricSamplePoolBatchSize is the batch size of the metric sample pool.
    MetricSamplePoolBatchSize = 32
)
const (
    // AutoAdjustStrategyMaxThroughput will adapt the number of pipelines for maximum throughput
    AutoAdjustStrategyMaxThroughput = "max_throughput"
    // AutoAdjustStrategyPerOrigin will adapt the number of pipelines for better container isolation
    AutoAdjustStrategyPerOrigin = "per_origin"
)
const (
    // ContextSizeInBytes is the size of a context in bytes
    // We count the size of the context key with the context.
    ContextSizeInBytes = int(unsafe.Sizeof(Context{})) + int(unsafe.Sizeof(ckey.ContextKey(0)))
)
Variables ¶
This section is empty.
Functions ¶
func AddRecurrentSeries ¶
AddRecurrentSeries adds a serie to the series that are sent at every flush
func GetDogStatsDWorkerAndPipelineCount ¶
GetDogStatsDWorkerAndPipelineCount returns how many routines should be spawned for the DogStatsD workers and how many DogStatsD pipelines should be running.
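For illustration, a minimal sketch of how a caller might read these counts; the surrounding logging is hypothetical and the package is assumed to be imported as `aggregator`:

// Sketch: size the DogStatsD intake from the configured worker/pipeline counts.
workers, pipelines := aggregator.GetDogStatsDWorkerAndPipelineCount()
fmt.Printf("spawning %d DogStatsD workers and %d pipelines\n", workers, pipelines)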
Types ¶
type AgentDemultiplexer ¶
type AgentDemultiplexer struct {
// contains filtered or unexported fields
}
AgentDemultiplexer is the demultiplexer implementation for the main Agent.
func InitAndStartAgentDemultiplexer ¶
func InitAndStartAgentDemultiplexer(
    log log.Component,
    sharedForwarder forwarder.Forwarder,
    orchestratorForwarder orchestratorforwarder.Component,
    options AgentDemultiplexerOptions,
    eventPlatformForwarder eventplatform.Component,
    haAgent haagent.Component,
    compressor compression.Component,
    tagger tagger.Component,
    hostname string) *AgentDemultiplexer
InitAndStartAgentDemultiplexer creates a new Demultiplexer and runs what's necessary in goroutines. As of today, only the embedded BufferedAggregator needs a separate goroutine. In the future, goroutines will be started for the event platform forwarder and/or orchestrator forwarder.
func (*AgentDemultiplexer) AddAgentStartupTelemetry ¶
func (d *AgentDemultiplexer) AddAgentStartupTelemetry(agentVersion string)
AddAgentStartupTelemetry adds a startup event and count (in a DSD time sampler) to be sent on the next flush.
func (*AgentDemultiplexer) AggregateCheckSample ¶
func (d *AgentDemultiplexer) AggregateCheckSample(_ metrics.MetricSample)
AggregateCheckSample adds a check sample sent by a check from one of the collectors into a check sampler pipeline.
func (*AgentDemultiplexer) AggregateSample ¶
func (d *AgentDemultiplexer) AggregateSample(sample metrics.MetricSample)
AggregateSample adds a MetricSample in the first DogStatsD time sampler.
func (*AgentDemultiplexer) AggregateSamples ¶
func (d *AgentDemultiplexer) AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch)
AggregateSamples adds a batch of MetricSample into the given DogStatsD time sampler shard. If you have to submit a single metric sample, see `AggregateSample`.
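As an illustration, a sketch of submitting samples through both methods. It assumes `metrics.MetricSample` exposes `Name`, `Value`, `Mtype`, `Tags` and `SampleRate` fields and that `metrics.MetricSampleBatch` is a slice of samples, which may differ across agent versions; the metric name is hypothetical:

// Sketch: submit a single sample, then a small batch to the first sampler shard.
func submitSamples(demux *aggregator.AgentDemultiplexer) {
    sample := metrics.MetricSample{
        Name:       "myapp.queue.depth", // hypothetical metric
        Value:      42,
        Mtype:      metrics.GaugeType,
        Tags:       []string{"env:dev"},
        SampleRate: 1,
    }
    demux.AggregateSample(sample)

    batch := metrics.MetricSampleBatch{sample}
    demux.AggregateSamples(aggregator.TimeSamplerID(0), batch)
}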
func (*AgentDemultiplexer) Aggregator ¶
func (d *AgentDemultiplexer) Aggregator() *BufferedAggregator
Aggregator returns an aggregator that anyone can use. This method exists to keep compatibility with existing code while introducing the Demultiplexer; however, the plan is to remove it in the near future.
Deprecated.
func (*AgentDemultiplexer) DestroySender ¶
func (d *AgentDemultiplexer) DestroySender(id checkid.ID)
DestroySender frees up the resources used by the sender with the passed ID (by deregistering it from the aggregator). It should be called when no sender with this ID is used anymore. The metrics of this sender that haven't been flushed yet will be lost.
func (*AgentDemultiplexer) DumpDogstatsdContexts ¶
func (d *AgentDemultiplexer) DumpDogstatsdContexts(dest io.Writer) error
DumpDogstatsdContexts writes the current state of the context resolver to dest.
This blocks metrics processing, so dest is expected to be reasonably fast and not block for too long.
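For example, a sketch that dumps the resolver state into an in-memory buffer (writing to a file would work the same way):

// Sketch: capture the current DogStatsD context-resolver state for debugging.
func dumpContexts(demux *aggregator.AgentDemultiplexer) (string, error) {
    var buf bytes.Buffer
    if err := demux.DumpDogstatsdContexts(&buf); err != nil {
        return "", err
    }
    return buf.String(), nil
}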
func (*AgentDemultiplexer) ForceFlushToSerializer ¶
func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool)
ForceFlushToSerializer triggers a flush of all data from the samplers and the BufferedAggregator to the serializer. Safe to call from multiple threads.
func (*AgentDemultiplexer) GetDefaultSender ¶
func (d *AgentDemultiplexer) GetDefaultSender() (sender.Sender, error)
GetDefaultSender returns a default sender.
func (*AgentDemultiplexer) GetDogStatsDPipelinesCount ¶
func (d *AgentDemultiplexer) GetDogStatsDPipelinesCount() int
GetDogStatsDPipelinesCount returns how many sampling pipelines are running for the DogStatsD samples.
func (*AgentDemultiplexer) GetEventPlatformForwarder ¶
func (d *AgentDemultiplexer) GetEventPlatformForwarder() (eventplatform.Forwarder, error)
GetEventPlatformForwarder returns the underlying event platform forwarder.
func (*AgentDemultiplexer) GetEventsAndServiceChecksChannels ¶
func (d *AgentDemultiplexer) GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)
GetEventsAndServiceChecksChannels returns the underlying events and service checks channels.
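A sketch of pushing an event into the aggregator through these channels; the event field names are an assumption about the event.Event struct and the event itself is hypothetical:

// Sketch: send a batch of events into the aggregator via the buffered channel.
func emitEvent(demux *aggregator.AgentDemultiplexer) {
    eventsCh, _ := demux.GetEventsAndServiceChecksChannels()
    eventsCh <- []*event.Event{{
        Title: "deployment finished", // hypothetical event; field names assumed
        Text:  "myapp v1.2.3 deployed",
        Tags:  []string{"env:dev"},
    }}
}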
func (*AgentDemultiplexer) GetMetricSamplePool ¶
func (d *AgentDemultiplexer) GetMetricSamplePool() *metrics.MetricSamplePool
GetMetricSamplePool returns a shared resource used across the whole DogStatsD pipeline to re-use metric sample slices: the server gets a slice and fills it with samples, the rest of the pipeline processes them, and the end of the line (the time sampler) puts the slice back in the pool. The main idea is to reduce the garbage generated by slice allocations.
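A sketch of the borrow/return pattern on the shared pool. The `GetBatch`/`PutBatch` method names are an assumption about the metrics.MetricSamplePool API and may differ between versions:

// Sketch: borrow a sample slice from the shared pool and return it when done.
// In the real pipeline the time sampler, not the caller, puts the slice back.
func useSamplePool(demux *aggregator.AgentDemultiplexer) {
    pool := demux.GetMetricSamplePool()
    batch := pool.GetBatch() // assumed pool method
    // ... fill batch with parsed samples and hand it to a sampler shard ...
    pool.PutBatch(batch) // assumed pool method
}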
func (*AgentDemultiplexer) GetSender ¶
GetSender returns a sender.Sender with the passed ID, properly registered with the aggregator. If no error is returned here, DestroySender must be called with the same ID once the sender is no longer used.
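A sketch of the sender lifecycle this implies for a check; the check ID and metric are hypothetical, and the `Gauge`/`Commit` calls assume the usual sender.Sender API:

// Sketch: acquire a sender for a check, emit a metric, then release the sender.
func runOnce(demux *aggregator.AgentDemultiplexer, id checkid.ID) error {
    s, err := demux.GetSender(id)
    if err != nil {
        return err
    }
    s.Gauge("mycheck.up", 1, "", []string{"env:dev"}) // assumed sender.Sender method
    s.Commit()                                        // assumed sender.Sender method

    demux.DestroySender(id) // only once no code uses this sender ID anymore
    return nil
}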
func (*AgentDemultiplexer) Options ¶
func (d *AgentDemultiplexer) Options() AgentDemultiplexerOptions
Options returns options used during the demux initialization.
func (*AgentDemultiplexer) SendSamplesWithoutAggregation ¶
func (d *AgentDemultiplexer) SendSamplesWithoutAggregation(samples metrics.MetricSampleBatch)
SendSamplesWithoutAggregation buffers a batch of metrics with timestamps. This data will be directly transmitted "as-is" (i.e. no aggregation, no sampling) to the serializer.
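A sketch of pushing an already-timestamped sample through the no-aggregation pipeline, assuming `metrics.MetricSample` carries a `Timestamp` field expressed as a Unix time:

// Sketch: forward a sample carrying its own timestamp, bypassing sampling.
func sendTimestamped(demux *aggregator.AgentDemultiplexer) {
    sample := metrics.MetricSample{
        Name:      "myapp.batch.duration", // hypothetical metric
        Value:     1.5,
        Mtype:     metrics.GaugeType,
        Timestamp: float64(time.Now().Unix()), // assumed field and unit
    }
    demux.SendSamplesWithoutAggregation(metrics.MetricSampleBatch{sample})
}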
func (*AgentDemultiplexer) Serializer ¶
func (d *AgentDemultiplexer) Serializer() serializer.MetricSerializer
Serializer returns a serializer that anyone can use. This method exists to keep compatibility with existing code while introducing the Demultiplexer; however, the plan is to remove it in the near future.
Deprecated.
func (*AgentDemultiplexer) SetSender ¶
SetSender sets the sender to be used with the passed ID. This is largely for testing purposes.
func (*AgentDemultiplexer) Stop ¶
func (d *AgentDemultiplexer) Stop(flush bool)
Stop stops the demultiplexer. Resources are released, the instance should not be used after a call to `Stop()`.
type AgentDemultiplexerOptions ¶
type AgentDemultiplexerOptions struct {
    FlushInterval               time.Duration
    EnableNoAggregationPipeline bool
    DontStartForwarders         bool // unit tests don't need the forwarders to be instantiated
    UseDogstatsdContextLimiter  bool
    DogstatsdMaxMetricsTags     int
}
AgentDemultiplexerOptions are the options used to initialize a Demultiplexer.
func DefaultAgentDemultiplexerOptions ¶
func DefaultAgentDemultiplexerOptions() AgentDemultiplexerOptions
DefaultAgentDemultiplexerOptions returns the default options to initialize an AgentDemultiplexer.
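A sketch of tweaking the defaults before handing them to InitAndStartAgentDemultiplexer; the field names come from the AgentDemultiplexerOptions struct above:

// Sketch: start from the defaults and adjust a couple of options.
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.FlushInterval = 10 * time.Second
opts.EnableNoAggregationPipeline = true
// opts is then passed to InitAndStartAgentDemultiplexer along with the
// forwarders, tagger and other components required by its signature.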
type AgentDemultiplexerPrinter ¶
type AgentDemultiplexerPrinter struct {
DemultiplexerWithAggregator
}
AgentDemultiplexerPrinter is used to output series, sketches, service checks and events. Today, this is only used by the `agent check` command.
func (AgentDemultiplexerPrinter) GetMetricsDataForPrint ¶
func (p AgentDemultiplexerPrinter) GetMetricsDataForPrint() map[string]interface{}
GetMetricsDataForPrint returns metrics data for series and sketches for printing purposes.
func (AgentDemultiplexerPrinter) PrintMetrics ¶
func (p AgentDemultiplexerPrinter) PrintMetrics(checkFileOutput *bytes.Buffer, formatTable bool)
PrintMetrics prints the metrics aggregated in the Demultiplexer's check samplers (series and sketches), the service checks buffer, and the events buffers.
type BufferedAggregator ¶
type BufferedAggregator struct {
    // metricSamplePool is a pool of slices of metric sample to avoid allocations.
    // Used by the Dogstatsd Batcher.
    MetricSamplePool *metrics.MetricSamplePool
    // contains filtered or unexported fields
}
BufferedAggregator aggregates metrics in buckets for DogStatsD metrics.
func NewBufferedAggregator ¶
func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder eventplatform.Component, haAgent haagent.Component, tagger tagger.Component, hostname string, flushInterval time.Duration) *BufferedAggregator
NewBufferedAggregator instantiates a BufferedAggregator
func (*BufferedAggregator) Flush ¶ added in v0.9.0
func (agg *BufferedAggregator) Flush(trigger flushTrigger)
Flush flushes the data contained in the BufferedAggregator into the Forwarder. This method can be called from multiple routines.
func (*BufferedAggregator) GetBufferedChannels ¶
func (agg *BufferedAggregator) GetBufferedChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)
GetBufferedChannels returns the channels which can subsequently be used to send Events or ServiceChecks.
func (*BufferedAggregator) GetEventPlatformEvents ¶ added in v0.9.0
func (agg *BufferedAggregator) GetEventPlatformEvents() map[string][]*message.Message
GetEventPlatformEvents grabs the event platform events from the queue and clears them. Note that this works only if using the 'noop' event platform forwarder
func (*BufferedAggregator) GetEventPlatformForwarder ¶
func (agg *BufferedAggregator) GetEventPlatformForwarder() (eventplatform.Forwarder, error)
GetEventPlatformForwarder returns an event platform forwarder.
func (*BufferedAggregator) GetEvents ¶
func (agg *BufferedAggregator) GetEvents() event.Events
GetEvents grabs the events from the queue and clears it
func (*BufferedAggregator) GetSeriesAndSketches ¶
func (agg *BufferedAggregator) GetSeriesAndSketches(before time.Time) (metrics.Series, metrics.SketchSeriesList)
GetSeriesAndSketches grabs all the series and sketches from the queue and clears the queue. The parameter `before` is used as the end of the interval when retrieving series and sketches from the time sampler: only metrics and sketches before this timestamp should be returned.
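A sketch of draining the buffered data up to the current time; the print-out is hypothetical:

// Sketch: retrieve everything buffered before "now" and inspect the sizes.
func drain(agg *aggregator.BufferedAggregator) {
    series, sketches := agg.GetSeriesAndSketches(time.Now())
    fmt.Printf("got %d series and %d sketch series\n", len(series), len(sketches))
}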
func (*BufferedAggregator) GetServiceChecks ¶
func (agg *BufferedAggregator) GetServiceChecks() servicecheck.ServiceChecks
GetServiceChecks grabs all the service checks from the queue and clears the queue
func (*BufferedAggregator) IsInputQueueEmpty ¶
func (agg *BufferedAggregator) IsInputQueueEmpty() bool
IsInputQueueEmpty returns true if every input channel for the aggregator is empty. This is mainly useful for tests and benchmarks.
type CheckSampler ¶
type CheckSampler struct {
// contains filtered or unexported fields
}
CheckSampler aggregates metrics from one Check instance
type Context ¶
Context holds the elements that form a context, and can be serialized into a context key
func (*Context) DataSizeInBytes ¶
DataSizeInBytes returns the size of the context data in bytes
func (*Context) SizeInBytes ¶
SizeInBytes returns the size of the context in bytes
func (*Context) Tags ¶
func (c *Context) Tags() tagset.CompositeTags
Tags returns tags for the context.
type ContextDebugRepr ¶
type ContextDebugRepr struct {
    Name       string
    Host       string
    Type       string
    TaggerTags []string
    MetricTags []string
    NoIndex    bool
    Source     metrics.MetricSource
}
ContextDebugRepr is the on-disk representation of a context.
type Demultiplexer ¶
type Demultiplexer interface {
    // General
    // --

    // Serializer returns the serializer used by the Demultiplexer instance.
    Serializer() serializer.MetricSerializer
    // AggregateSample sends a MetricSample to the DogStatsD time sampler.
    // In sharded implementation, the metric is sent to the first time sampler.
    AggregateSample(sample metrics.MetricSample)
    // AggregateSamples sends a batch of MetricSample to the given DogStatsD
    // time sampler shard.
    // Implementation not supporting sharding may ignore the `shard` parameter.
    AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch)
    // SendSamplesWithoutAggregation pushes metrics in the no-aggregation pipeline: a pipeline
    // where the metrics are not sampled and sent as-is.
    // This is the method to use to send metrics with a valid timestamp attached.
    SendSamplesWithoutAggregation(metrics metrics.MetricSampleBatch)
    // ForceFlushToSerializer flushes all the aggregated data from the different samplers to
    // the serialization/forwarding parts.
    ForceFlushToSerializer(start time.Time, waitForSerializer bool)
    // GetMetricSamplePool returns a shared resource used in the whole DogStatsD
    // pipeline to re-use metric samples slices: the server is getting a slice
    // and filling it with samples, the rest of the pipeline process them the
    // end of line (the time sampler) is putting back the slice in the pool.
    // Main idea is to reduce the garbage generated by slices allocation.
    GetMetricSamplePool() *metrics.MetricSamplePool

    // Senders API, mainly used by collectors/checks
    // --

    sender.SenderManager
}
Demultiplexer is composed of multiple samplers (check and time/dogstatsd), a shared forwarder, the event platform forwarder, orchestrator data buffers and other data that need to be sent to the forwarders. AgentDemultiplexerOptions let you configure which forwarders have to be started.
type DemultiplexerWithAggregator ¶
type DemultiplexerWithAggregator interface {
    Demultiplexer
    Aggregator() *BufferedAggregator
    // AggregateCheckSample adds check sample sent by a check from one of the collectors into a check sampler pipeline.
    AggregateCheckSample(sample metrics.MetricSample)
    Options() AgentDemultiplexerOptions
    GetEventPlatformForwarder() (eventplatform.Forwarder, error)
    GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)
    DumpDogstatsdContexts(io.Writer) error
}
DemultiplexerWithAggregator is a Demultiplexer running an Aggregator. This flavor uses an AgentDemultiplexerOptions struct for startup configuration.
type FlushAndSerializeInParallel ¶
FlushAndSerializeInParallel contains options for flushing metrics and serializing in parallel.
func NewFlushAndSerializeInParallel ¶
func NewFlushAndSerializeInParallel(config model.Config) FlushAndSerializeInParallel
NewFlushAndSerializeInParallel creates a new instance of FlushAndSerializeInParallel.
type HostTagProvider ¶
func NewHostTagProvider ¶
func NewHostTagProvider() *HostTagProvider
func (*HostTagProvider) GetHostTags ¶
func (p *HostTagProvider) GetHostTags() []string
type RawSender ¶
type RawSender interface {
    SendRawMetricSample(sample *metrics.MetricSample)
    SendRawServiceCheck(sc *servicecheck.ServiceCheck)
    Event(e event.Event)
}
RawSender is an interface to submit samples to the aggregator directly.
type SerieSignature ¶
type SerieSignature struct {
// contains filtered or unexported fields
}
SerieSignature holds the elements that determine whether two similar `Serie`s from the same bucket can be merged into one. Series must have the same contextKey.
type ServerlessDemultiplexer ¶
type ServerlessDemultiplexer struct {
// contains filtered or unexported fields
}
ServerlessDemultiplexer is a simple demultiplexer used by the serverless flavor of the Agent
func InitAndStartServerlessDemultiplexer ¶
func InitAndStartServerlessDemultiplexer(keysPerDomain map[string][]string, forwarderTimeout time.Duration, tagger tagger.Component) *ServerlessDemultiplexer
InitAndStartServerlessDemultiplexer creates and starts new Demultiplexer for the serverless agent.
func (*ServerlessDemultiplexer) AggregateSample ¶
func (d *ServerlessDemultiplexer) AggregateSample(sample metrics.MetricSample)
AggregateSample sends a MetricSample to the TimeSampler.
func (*ServerlessDemultiplexer) AggregateSamples ¶
func (d *ServerlessDemultiplexer) AggregateSamples(_ TimeSamplerID, samples metrics.MetricSampleBatch)
AggregateSamples sends a MetricSampleBatch to the TimeSampler. The ServerlessDemultiplexer does not use sharding in its DogStatsD pipeline, so the `shard` parameter is ignored. In the Serverless Agent, consider using `AggregateSample` instead.
func (ServerlessDemultiplexer) DestroySender ¶
DestroySender frees up the resources used by the sender with the passed ID (by deregistering it from the aggregator). It should be called when no sender with this ID is used anymore. The metrics of this sender that haven't been flushed yet will be lost.
func (*ServerlessDemultiplexer) ForceFlushToSerializer ¶
func (d *ServerlessDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool)
ForceFlushToSerializer flushes all data from the time sampler to the serializer.
func (ServerlessDemultiplexer) GetDefaultSender ¶
GetDefaultSender returns a default sender.
func (*ServerlessDemultiplexer) GetMetricSamplePool ¶
func (d *ServerlessDemultiplexer) GetMetricSamplePool() *metrics.MetricSamplePool
GetMetricSamplePool returns a shared resource used across the whole DogStatsD pipeline to re-use metric sample slices: the server gets a slice and fills it with samples, the rest of the pipeline processes them, and the end of the line (the time sampler) puts the slice back in the pool. The main idea is to reduce the garbage generated by slice allocations.
func (ServerlessDemultiplexer) GetSender ¶
GetSender returns a sender.Sender with the passed ID, properly registered with the aggregator. If no error is returned here, DestroySender must be called with the same ID once the sender is no longer used.
func (*ServerlessDemultiplexer) Run ¶
func (d *ServerlessDemultiplexer) Run()
Run runs all demultiplexer parts
func (*ServerlessDemultiplexer) SendSamplesWithoutAggregation ¶
func (d *ServerlessDemultiplexer) SendSamplesWithoutAggregation(_ metrics.MetricSampleBatch)
SendSamplesWithoutAggregation is not supported in the Serverless Agent implementation.
func (*ServerlessDemultiplexer) Serializer ¶
func (d *ServerlessDemultiplexer) Serializer() serializer.MetricSerializer
Serializer returns the shared serializer
func (ServerlessDemultiplexer) SetSender ¶
SetSender sets the sender to be used with the passed ID. This is largely for testing purposes.
func (*ServerlessDemultiplexer) Stop ¶
func (d *ServerlessDemultiplexer) Stop(flush bool)
Stop stops the wrapped aggregator and the forwarder.
type Stats ¶
type Stats struct {
    Flushes    [32]int64 // circular buffer of recent flushes stat
    FlushIndex int       // last flush position in circular buffer
    LastFlush  int64     // most recent flush stat, provided for convenience
    Name       string
    // contains filtered or unexported fields
}
Stats stores statistics from several past flushes, allowing computations such as median or percentiles.
type TimeSampler ¶
type TimeSampler struct {
// contains filtered or unexported fields
}
TimeSampler aggregates metrics by buckets of 'interval' seconds
func NewTimeSampler ¶
func NewTimeSampler(id TimeSamplerID, interval int64, cache *tags.Store, tagger tagger.Component, hostname string) *TimeSampler
NewTimeSampler returns a newly initialized TimeSampler
Source Files ¶
- aggregator.go
- check_sampler.go
- context_resolver.go
- context_resolver_debug.go
- demultiplexer.go
- demultiplexer_agent.go
- demultiplexer_agent_printer.go
- demultiplexer_senders.go
- demultiplexer_serverless.go
- host_tag_provider.go
- no_aggregation_stream_worker.go
- no_orchestrator.go
- sender.go
- sketch_map.go
- tagset_telem.go
- time_sampler.go
- time_sampler_worker.go