gostatsd

package module
v0.0.0-...-7a0dfb2
Warning

This package is not in the latest version of its module.

Published: Nov 20, 2020 License: MIT Imports: 17 Imported by: 0

README

gostatsd


An implementation of Etsy's statsd in Go, based on original code from @kisielk.

The project provides both a server called "gostatsd", which works much like Etsy's version, and a library for developing customized servers.

Backends are pluggable and only need to implement the Backend interface.

Because it is written in Go, it can use all available cores, which makes it easy to scale the server up to match the load.

Building the server

Gostatsd currently targets Go 1.13.6. If you are compiling from source, please ensure you are running this version.

From the gostatsd directory run make build. The binary will be built in build/bin/<arch>/gostatsd.

You will need to install the Golang build dependencies by running make setup in the gostatsd directory. This must be done before the first build, and again if the dependencies change. A protobuf installation is expected to be found in the tools/ directory. Managing this in a platform-agnostic way is difficult, but PRs are welcome. Hopefully the generated protobuf files will be sufficient in the majority of cases.

If you are unable to build gostatsd please check your Go version, and try running make setup again before reporting a bug.

Running the server

gostatsd --help gives a complete description of available options and their defaults. You can use make run to run the server with just the stdout backend to display info on screen.

You can also run it through Docker with make run-docker, which uses docker-compose to run gostatsd with a Graphite backend and a Grafana dashboard.

While gostatsd is not generally tested on Windows, it should work there. Maximum throughput is likely to be better on a Linux system, however.

Configuring the server mode

The server can currently run in two modes: standalone and forwarder. It is configured through the top level server-mode configuration setting. The default is standalone.

In standalone mode, raw metrics are processed and aggregated as normal, and the aggregated data is submitted to the configured backends (see below).

This mode supports the following configuration options:

  • expiry-interval: interval before metrics are expired; see the Metric expiry and persistence section. Defaults to 5m. Set to 0 to disable expiry, or -1 for immediate expiry.
  • expiry-interval-counter: interval before counters are expired, defaults to the value of expiry-interval.
  • expiry-interval-gauge: interval before gauges are expired, defaults to the value of expiry-interval.
  • expiry-interval-set: interval before sets are expired, defaults to the value of expiry-interval.
  • expiry-interval-timer: interval before timers are expired, defaults to the value of expiry-interval.
  • flush-aligned: whether or not the flush should be aligned. Setting this will flush at an exact time interval. With a 10 second flush-interval, if the service happens to be started at 12:47:13, then flushing will occur at 12:47:20, 12:47:30, etc., rather than 12:47:23, 12:47:33, etc. This removes query time ambiguity in a multi-server environment. Defaults to false.
  • flush-interval: duration for how long to batch metrics before flushing. Should be an order of magnitude less than the upstream flush interval. Defaults to 1s.
  • flush-offset: offset for the flush interval when flush alignment is enabled. For example, with an offset of 7s and an interval of 10s, it will flush at 12:47:10+7 = 12:47:17, etc. Defaults to 0.
  • ignore-host: indicates whether or not an explicit host field will be added to all incoming metrics and events. Defaults to false.
  • max-readers: the number of UDP receivers to run. Defaults to 8 or the number of logical cores, whichever is less.
  • max-parsers: the number of workers available to parse metrics. Defaults to the number of logical cores.
  • max-workers: the number of aggregators to process metrics. Defaults to the number of logical cores.
  • max-queue-size: the size of the buffers between parsers and workers. Defaults to 10000. Monitored via the channel.* metrics for the dispatch_aggregator_batch and dispatch_aggregator_map channels.
  • max-concurrent-events: the maximum number of events being dispatched concurrently. Defaults to 1024. Monitored via the channel.* metrics for the backend_events_sem channel.
  • estimated-tags: a hint to the system about how many tags to expect on any particular metric, so that memory can be pre-allocated, reducing churn. Defaults to 4. Note: this is only a hint, and it is safe to send more.
  • log-raw-metric: logs raw metrics received from the network. Defaults to false.
  • metrics-addr: the address to listen on for metrics. Defaults to :8125.
  • namespace: a namespace to prefix all metrics with. Defaults to ''.
  • statser-type: configures where internal metrics are sent. May be internal, which sends them to the internal processing pipeline; logging, which logs them; or null, which drops them. Defaults to internal, or null if the NewRelic backend is enabled.
  • percent-threshold: configures the "percentiles" sent on timers. Space separated string. Defaults to 90.
  • heartbeat-enabled: emits a metric named heartbeat every flush interval, tagged by version and commit. Defaults to false.
  • receive-batch-size: the number of datagrams to attempt to read in each batch. Reading multiple datagrams at once is more CPU efficient, but it takes extra memory. See the Memory allocation for read buffers section below for details. Defaults to 50.
  • conn-per-reader: attempts to create a connection for every UDP receiver. Not supported by all OS versions. Defaults to false.
  • bad-lines-per-minute: the number of metrics that fail to parse which may be logged per minute. This prevents a bad client from flooding the logs with malformed statsd data, while still logging enough information to enable troubleshooting. Defaults to 0.
  • hostname: sets the hostname on internal metrics.
  • timer-histogram-limit: specifies the maximum number of buckets on histograms. Defaults to math.MaxUint32 (effectively unlimited). See the Timer histograms section below.

In forwarder mode, raw metrics are collected from a frontend, and instead of being aggregated they are sent via http to another gostatsd server after passing through the processing pipeline (cloud provider, static tags, filtering, etc).

A forwarder server is intended to run on-host and collect metrics, forwarding them on to a central aggregation service. At present the central aggregation service can only scale vertically, but horizontal scaling through clustering is planned.

Aligned flushing is deliberately not supported in forwarder mode: it would impact the central aggregation server because all forwarder nodes would transmit at once, and many forwarding flushes are expected to occur per central flush anyway.

Configuring forwarder mode requires a configuration file, with a section named http-transport. The raw version spoken is not configurable per server (see HTTP.md for version guarantees). The configuration section allows the following configuration options:

  • compress: boolean indicating if the payload should be compressed. Defaults to true.
  • api-endpoint: configures the endpoint to submit raw metrics to. This setting should be just a base URL, for example https://statsd-aggregator.private, with no path. Required, no default.
  • max-requests: maximum number of requests in flight. Defaults to 1000 (which is probably too high)
  • max-request-elapsed-time: duration for the maximum amount of time to try submitting data before giving up. This includes retries. Defaults to 30s (which is probably too high). Setting this value to -1 will disable retries.
  • consolidator-slots: number of slots in the metric consolidator. Memory usage is a function of this. Lower values may cause blocking in the pipeline (back pressure). A UDP-only receiver will never use more than the number of configured parsers (--max-parsers option). Defaults to the value of --max-parsers, but may require tuning for HTTP based servers.
  • transport: see TRANSPORT.md for how to configure the transport.
  • custom-headers: a map of header names to values that are added to each request sent, to allow for additional network routing / request inspection. Not required, default is empty. Example: --custom-headers='{"region" : "us-east-1", "service" : "event-producer"}'
  • dynamic-headers: similar to custom-headers, but the header values are extracted from metric tags matching the provided list of strings. Tag names are canonicalized into header names by first replacing underscores with hyphens, then converting the first letter and each letter after a hyphen to uppercase; the rest are converted to lowercase. If a tag is specified in both custom-headers and dynamic-headers, the value set by custom-headers takes precedence. Not required, default is empty. Example: --dynamic-headers='["region", "service"]'. This is an experimental feature and it may be removed or changed in future versions.
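
The header canonicalization and precedence rules for dynamic-headers can be sketched as below. This is a reading of the rules as documented, not gostatsd's code: canonicalHeaderName and buildHeaders are hypothetical names, tags are assumed to be "name:value" strings, and the custom-headers keys are assumed to be canonicalized the same way so that conflicts can be detected.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// canonicalHeaderName (hypothetical) applies the documented rule: underscores
// become hyphens, the first letter and every letter after a hyphen are
// upper-cased, and all other letters are lower-cased.
func canonicalHeaderName(tag string) string {
	var b strings.Builder
	upper := true
	for _, r := range strings.ReplaceAll(tag, "_", "-") {
		if upper {
			b.WriteRune(unicode.ToUpper(r))
		} else {
			b.WriteRune(unicode.ToLower(r))
		}
		upper = r == '-'
	}
	return b.String()
}

// buildHeaders (hypothetical) takes header values from "name:value" tags
// listed in dynamic, then applies custom headers last so that custom-headers
// wins when both define the same header.
func buildHeaders(custom map[string]string, dynamic []string, tags []string) map[string]string {
	headers := make(map[string]string)
	for _, name := range dynamic {
		prefix := name + ":"
		for _, tag := range tags {
			if strings.HasPrefix(tag, prefix) {
				headers[canonicalHeaderName(name)] = strings.TrimPrefix(tag, prefix)
				break
			}
		}
	}
	for name, value := range custom {
		headers[canonicalHeaderName(name)] = value
	}
	return headers
}

func main() {
	h := buildHeaders(
		map[string]string{"region": "us-east-1"},
		[]string{"region", "service"},
		[]string{"region:ap-southeast-2", "service:event-producer"},
	)
	fmt.Println(h["Region"], h["Service"])
}
```

Here the region tag value is overridden by the custom header, while service is taken from the metric tag.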

The following settings from the previous section are also supported:

  • expiry-*
  • ignore-host
  • max-readers
  • max-parsers
  • estimated-tags
  • log-raw-metric
  • metrics-addr
  • namespace
  • statser-type
  • heartbeat-enabled
  • receive-batch-size
  • conn-per-reader
  • bad-lines-per-minute
  • hostname

Metric expiry and persistence

After a metric has been sent to the server, the server will continue to send the metric to the configured backend until it expires, even if no additional metrics are sent from the client. The value sent depends on the metric type:

  • counter: sends 0 for both rate and count.
  • gauge: sends the last received value.
  • set: sends 0.
  • timer: sends non-percentile values of 0. Percentile values are not sent at all (see issue #135)

Setting an expiry interval of 0 will persist metrics forever. If metrics are not carefully controlled in such an environment, the server may run out of memory or overload the backend receiving the metrics. Setting a negative expiry interval will result in metrics not being persisted at all.

Each metric type has its own interval, which is configured using the following precedence (from highest to lowest): expiry-interval-<type> > expiry-interval > default (5 minutes).

Configuring HTTP servers

The service supports multiple HTTP servers, with different configurations for different requirements. All http servers are named in the top level http-servers setting. It should be a space separated list of names. Each server is then configured by creating a section in the configuration file named http.<servername>. An http server section has the following configuration options:

  • address: the address to bind to.
  • enable-prof: boolean indicating if profiler endpoints should be enabled. Defaults to false.
  • enable-expvar: boolean indicating if expvar endpoints should be enabled. Defaults to false.
  • enable-ingestion: boolean indicating if ingestion should be enabled. Defaults to false.
  • enable-healthcheck: boolean indicating if healthchecks should be enabled. Defaults to true.

For example, to configure a server with a localhost only diagnostics endpoint, and a regular ingestion endpoint that can sit behind an ELB, the following configuration could be used:

backends='stdout'
http-servers='receiver profiler'

[http.receiver]
address='0.0.0.0:8080'
enable-ingestion=true

[http.profiler]
address='127.0.0.1:6060'
enable-expvar=true
enable-prof=true

There is currently no capability to run an HTTPS server, and no authentication (which is why you might want different addresses). You could also put a reverse proxy in front of the service. Documentation for the endpoints can be found in HTTP.md.

Configuring backends

Refer to backends for configuration options for the backends.

Cloud providers

Cloud providers are a way to automatically enrich metrics with metadata from a cloud vendor.

Refer to cloud providers for configuration options for the cloud providers.

They should be disabled on the aggregation server when using http forwarding, as the source IP isn't propagated, and that information should be collected on the ingestion server.

Configuring timer sub-metrics

By default, timer metrics will result in aggregated metrics of the form (exact name varies by backend):

<base>.Count
<base>.CountPerSecond
<base>.Mean
<base>.Median
<base>.Lower
<base>.Upper
<base>.StdDev
<base>.Sum
<base>.SumSquares

In addition, the following aggregated metrics will be emitted for each configured percentile:

<base>.Count_XX
<base>.Mean_XX
<base>.Sum_XX
<base>.SumSquares_XX
<base>.Upper_XX - for positive only
<base>.Lower_-XX - for negative only

These can be controlled through the disabled-sub-metrics configuration section:

[disabled-sub-metrics]
# Regular metrics
count=false
count-per-second=false
mean=false
median=false
lower=false
upper=false
stddev=false
sum=false
sum-squares=false

# Percentile metrics
count-pct=false
mean-pct=false
sum-pct=false
sum-squares-pct=false
lower-pct=false
upper-pct=false

By default (for compatibility), they are all false and the metrics will be emitted.

Timer histograms (experimental feature)

Timer histograms, inspired by the Prometheus implementation, can be enabled on a per-time-series basis using the gsd_histogram meta tag, whose value contains the histogram bucket definition (bounds joined with _), e.g. gsd_histogram:-10_0_2.5_5_10_25_50.

It will:

  • output additional counter time series with name <base>.histogram and le tags specifying histogram buckets.
  • disable default sub-aggregations for timers e.g. <base>.Count, <base>.Mean, <base>.Upper, <base>.Upper_XX, etc.

For a timer with the gsd_histogram:-10_0_2.5_5_10_25_50 meta tag, the following time series will be generated:

  • <base>.histogram with tag le:-10
  • <base>.histogram with tag le:0
  • <base>.histogram with tag le:2.5
  • <base>.histogram with tag le:5
  • <base>.histogram with tag le:10
  • <base>.histogram with tag le:25
  • <base>.histogram with tag le:50
  • <base>.histogram with tag le:+Inf

Each time series contains the total number of timer data points with a value less than or equal to its le value; e.g. the counter <base>.histogram with the tag le:5 contains the number of all observations with a value not greater than 5. The counter <base>.histogram with the tag le:+Inf is equivalent to <base>.Count and contains the total number.

All original timer tags are preserved and added to all the time series.

To limit cardinality, the timer-histogram-limit option can be specified to limit the number of buckets that will be created (default is math.MaxUint32). A value of 0 does not disable the feature; instead, zero buckets will be emitted, which effectively drops metrics with gsd_histogram tags.

Incorrect meta tag values will be handled in a best-effort manner, for example:

  • gsd_histogram:10__20_50 & gsd_histogram:10_incorrect_20_50 will generate le:10, le:20, le:50 and le:+Inf buckets
  • gsd_histogram:incorrect will result in only le:+Inf bucket
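
The bucketing and best-effort parsing described above can be sketched like this. The function names are hypothetical, spec is the tag value after the colon, and two details are assumptions of this sketch rather than documented behaviour: bounds are sorted, and NaN or infinite bounds are skipped.

```go
package main

import (
	"fmt"
	"math"
	"sort"
	"strconv"
	"strings"
)

// bucketCount is one <base>.histogram series: the number of observations
// less than or equal to Le.
type bucketCount struct {
	Le    string
	Count uint64
}

// parseHistogramBuckets (hypothetical) parses a value such as
// "-10_0_2.5_5_10_25_50", skipping empty or non-numeric parts.
func parseHistogramBuckets(spec string) []float64 {
	var bounds []float64
	for _, part := range strings.Split(spec, "_") {
		f, err := strconv.ParseFloat(part, 64)
		if err != nil || math.IsNaN(f) || math.IsInf(f, 0) {
			continue
		}
		bounds = append(bounds, f)
	}
	sort.Float64s(bounds)
	return bounds
}

// histogram (hypothetical) counts observations cumulatively per bound and
// always appends le:+Inf, which equals the total number of observations.
func histogram(bounds, values []float64) []bucketCount {
	out := make([]bucketCount, 0, len(bounds)+1)
	for _, b := range bounds {
		var n uint64
		for _, v := range values {
			if v <= b {
				n++
			}
		}
		out = append(out, bucketCount{Le: strconv.FormatFloat(b, 'f', -1, 64), Count: n})
	}
	return append(out, bucketCount{Le: "+Inf", Count: uint64(len(values))})
}

func main() {
	bounds := parseHistogramBuckets("-10_0_2.5_5_10_25_50")
	fmt.Println(histogram(bounds, []float64{-20, 1, 3, 7, 100}))
}
```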

This is an experimental feature and it may be removed or changed in future versions.

Load testing

There is a tool under cmd/loader with support for a number of options which can be used to generate synthetic statsd load. There is also another load generation tool under cmd/tester which is deprecated and will be removed in a future release.

Help for the loader tool can be found through --help.

Sending metrics

The server listens for UDP packets on the address given by the --metrics-addr flag, aggregates them, then sends them to the backend servers given by the --backends flag (space separated list of backend names).

Currently supported backends are:

  • cloudwatch
  • datadog
  • graphite
  • influxdb
  • newrelic
  • statsdaemon
  • stdout

The format of each metric is:

<bucket name>:<value>|<type>\n
  • <bucket name> is a string like abc.def.g, just like a graphite bucket name
  • <value> is a string representation of a floating point number
  • <type> is one of c, g, or ms for "counter", "gauge", and "timer" respectively.

A single packet can contain multiple metrics, each ending with a newline.

Optionally, gostatsd supports sample rates (for simple counters, and for timer counters) and tags:

  • <bucket name>:<value>|c|@<sample rate>\n where sample rate is a float between 0 and 1
  • <bucket name>:<value>|c|@<sample rate>|#<tags>\n where tags is a comma separated list of tags
  • <bucket name>:<value>|<type>|#<tags>\n where tags is a comma separated list of tags

Each tag is either a simple value or a key:value pair.
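
A client-side formatter for the line format above might look like the following sketch. formatMetric is a hypothetical helper; it writes the sample rate only when it is strictly between 0 and 1 and does not check which metric types accept a sample rate.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatMetric (hypothetical) builds one line of the form
// <bucket name>:<value>|<type>[|@<sample rate>][|#<tags>]\n
func formatMetric(name string, value float64, typ string, sampleRate float64, tags []string) string {
	var b strings.Builder
	b.WriteString(name)
	b.WriteByte(':')
	b.WriteString(strconv.FormatFloat(value, 'f', -1, 64))
	b.WriteByte('|')
	b.WriteString(typ)
	if sampleRate > 0 && sampleRate < 1 {
		b.WriteString("|@")
		b.WriteString(strconv.FormatFloat(sampleRate, 'f', -1, 64))
	}
	if len(tags) > 0 {
		b.WriteString("|#")
		b.WriteString(strings.Join(tags, ",")) // tags are simple values or key:value pairs
	}
	b.WriteByte('\n')
	return b.String()
}

func main() {
	fmt.Print(formatMetric("abc.def.g", 10, "c", 0.5, []string{"env:dev", "canary"}))
}
```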

A simple way to test your installation or send metrics from a script is to use echo and the netcat utility nc:

echo 'abc.def.g:10|c' | nc -w1 -u localhost 8125

Monitoring

Many metrics for the internal processes are emitted. See METRICS.md for details. Go expvar is also exposed if the --profile flag is used.

Memory allocation for read buffers

By default gostatsd will batch read multiple packets to optimise read performance. The amount of memory allocated for these read buffers is determined by the config options:

max-readers * receive-batch-size * 64KB (max packet size)

The metric avg_packets_in_batch can be used to track the average number of datagrams received per batch, and the --receive-batch-size flag used to tune it. There may be some benefit to tuning the --max-readers flag as well.
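
The formula above can be evaluated for a given configuration as in this sketch. readBufferBytes is a hypothetical helper, and it assumes 64KB means 64 KiB (65536 bytes). With 8 readers and the default batch size of 50, it comes to 8 × 50 × 65536 = 26,214,400 bytes (25 MiB).

```go
package main

import (
	"fmt"
	"runtime"
)

// maxPacketSize is the per-datagram buffer from the formula, assumed to be
// 64 KiB.
const maxPacketSize = 64 * 1024

// readBufferBytes (hypothetical) evaluates max-readers * receive-batch-size * 64KB.
func readBufferBytes(maxReaders, receiveBatchSize int) int {
	return maxReaders * receiveBatchSize * maxPacketSize
}

func main() {
	// Documented defaults: max-readers = min(8, logical cores), receive-batch-size = 50.
	readers := runtime.NumCPU()
	if readers > 8 {
		readers = 8
	}
	b := readBufferBytes(readers, 50)
	fmt.Printf("%d readers x 50 datagrams: %d bytes (%.1f MiB)\n", readers, b, float64(b)/(1<<20))
}
```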

Using the library

In your source code:

import "github.com/atlassian/gostatsd/pkg/statsd"

Note that this project uses Go modules for dependency management.

Documentation can be found via go doc github.com/atlassian/gostatsd/pkg/statsd or at https://godoc.org/github.com/atlassian/gostatsd/pkg/statsd

Versioning

Gostatsd uses semantic versioning (semver) for both its API and configuration settings; however, it does not apply it to packages.

This is due to gostatsd being an application first and a library second. Breaking API changes occur regularly, and the overhead of managing this is too burdensome.

Contributors

Pull requests, issues and comments welcome. For pull requests:

  • Add tests for new features and bug fixes
  • Follow the existing style
  • Separate unrelated changes into multiple pull requests

See the existing issues for things to start contributing.

For bigger changes, make sure you start a discussion first by creating an issue and explaining the intended change.

Atlassian requires contributors to sign a Contributor License Agreement, known as a CLA. This serves as a record stating that the contributor is entitled to contribute the code/documentation/translation to the project and is willing to have it used in distributions and derivative works (or is willing to transfer ownership).

Prior to accepting your contributions we ask that you please follow the appropriate link below to digitally sign the CLA. The Corporate CLA is for those who are contributing as a member of an organization and the individual CLA is for those contributing as an individual.

License

Copyright (c) 2012 Kamil Kisiel. Copyright @ 2016-2020 Atlassian Pty Ltd and others.

Licensed under the MIT license. See LICENSE file.

Documentation


Constants

const (
	// StatserInternal is the name used to indicate the use of the internal statser.
	StatserInternal = "internal"
	// StatserLogging is the name used to indicate the use of the logging statser.
	StatserLogging = "logging"
	// StatserNull is the name used to indicate the use of the null statser.
	StatserNull = "null"
	// StatserTagged is the name used to indicate the use of the tagged statser.
	StatserTagged = "tagged"
)
const (
	// DefaultMaxCloudRequests is the maximum number of cloud provider requests per second.
	DefaultMaxCloudRequests = 10
	// DefaultBurstCloudRequests is the burst number of cloud provider requests per second.
	DefaultBurstCloudRequests = DefaultMaxCloudRequests + 5
	// DefaultExpiryInterval is the default expiry interval for metrics.
	DefaultExpiryInterval = 5 * time.Minute
	// DefaultFlushInterval is the default metrics flush interval.
	DefaultFlushInterval = 1 * time.Second
	// DefaultFlushOffset is the default metrics flush interval offset when alignment is enabled
	DefaultFlushOffset = 0
	// DefaultFlushAligned is the default for whether metric flushing should be aligned
	DefaultFlushAligned = false
	// DefaultIgnoreHost is the default value for whether the source should be used as the host
	DefaultIgnoreHost = false
	// DefaultMetricsAddr is the default address on which to listen for metrics.
	DefaultMetricsAddr = ":8125"
	// DefaultMaxQueueSize is the default maximum number of buffered metrics per worker.
	DefaultMaxQueueSize = 10000 // arbitrary
	// DefaultMaxConcurrentEvents is the default maximum number of events sent concurrently.
	DefaultMaxConcurrentEvents = 1024 // arbitrary
	// DefaultCacheRefreshPeriod is the default cache refresh period.
	DefaultCacheRefreshPeriod = 1 * time.Minute
	// DefaultCacheEvictAfterIdlePeriod is the default idle cache eviction period.
	DefaultCacheEvictAfterIdlePeriod = 10 * time.Minute
	// DefaultCacheTTL is the default cache TTL for successful lookups.
	DefaultCacheTTL = 30 * time.Minute
	// DefaultCacheNegativeTTL is the default cache TTL for failed lookups (errors or when instance was not found).
	DefaultCacheNegativeTTL = 1 * time.Minute
	// DefaultInternalNamespace is the default internal namespace
	DefaultInternalNamespace = "statsd"
	// DefaultHeartbeatEnabled is the default heartbeat enabled flag
	DefaultHeartbeatEnabled = false
	// DefaultReceiveBatchSize is the number of datagrams to read in each receive batch
	DefaultReceiveBatchSize = 50
	// DefaultEstimatedTags is the estimated number of expected tags on an individual metric submitted externally
	DefaultEstimatedTags = 4
	// DefaultConnPerReader is the default for whether to create a connection per reader
	DefaultConnPerReader = false
	// DefaultStatserType is the default statser type
	DefaultStatserType = StatserInternal
	// DefaultBadLinesPerMinute is the default number of bad lines to allow to log per minute
	DefaultBadLinesPerMinute = 0
	// DefaultServerMode is the default mode to run as, standalone|forwarder
	DefaultServerMode = "standalone"
	// DefaultTimerHistogramLimit default upper limit for timer histograms (effectively unlimited)
	DefaultTimerHistogramLimit = math.MaxUint32
	// DefaultLogRawMetric is the default value for whether to log the metrics received from network
	DefaultLogRawMetric = false
)
const (
	// ParamBackends is the name of parameter with backends.
	ParamBackends = "backends"
	// ParamCloudProvider is the name of parameter with the name of cloud provider.
	ParamCloudProvider = "cloud-provider"
	// ParamMaxCloudRequests is the name of parameter with maximum number of cloud provider requests per second.
	ParamMaxCloudRequests = "max-cloud-requests"
	// ParamBurstCloudRequests is the name of parameter with burst number of cloud provider requests per second.
	ParamBurstCloudRequests = "burst-cloud-requests"
	// ParamDefaultTags is the name of parameter with the list of additional tags.
	ParamDefaultTags = "default-tags"
	// ParamInternalTags is the name of parameter with the list of tags for internal metrics.
	ParamInternalTags = "internal-tags"
	// ParamInternalNamespace is the name of parameter with the namespace for internal metrics.
	ParamInternalNamespace = "internal-namespace"
	// ParamExpiryInterval is the name of parameter with expiry interval for metrics.
	ParamExpiryInterval = "expiry-interval"
	// ParamExpiryIntervalCounter is the name of parameter which overrides counter expiry interval for metrics.
	ParamExpiryIntervalCounter = "expiry-interval-counter"
	// ParamExpiryIntervalGauge is the name of parameter which overrides gauge expiry interval for metrics.
	ParamExpiryIntervalGauge = "expiry-interval-gauge"
	// ParamExpiryIntervalSet is the name of parameter which overrides set expiry interval for metrics.
	ParamExpiryIntervalSet = "expiry-interval-set"
	// ParamExpiryIntervalTimer is the name of parameter which overrides timer expiry interval for metrics.
	ParamExpiryIntervalTimer = "expiry-interval-timer"
	// ParamFlushInterval is the name of parameter with metrics flush interval.
	ParamFlushInterval = "flush-interval"
	// ParamFlushOffset is the name of parameter with metrics flush interval offset used when alignment is enabled.
	ParamFlushOffset = "flush-offset"
	// ParamFlushAligned is the name of parameter with metrics flush interval alignment enable state.
	ParamFlushAligned = "flush-aligned"
	// ParamIgnoreHost is the name of parameter indicating if the source should be used as the host
	ParamIgnoreHost = "ignore-host"
	// ParamMaxReaders is the name of parameter with number of socket readers.
	ParamMaxReaders = "max-readers"
	// ParamMaxParsers is the name of the parameter with the number of goroutines that parse datagrams into metrics.
	ParamMaxParsers = "max-parsers"
	// ParamMaxWorkers is the name of parameter with number of goroutines that aggregate metrics.
	ParamMaxWorkers = "max-workers"
	// ParamMaxQueueSize is the name of parameter with maximum number of buffered metrics per worker.
	ParamMaxQueueSize = "max-queue-size"
	// ParamMaxConcurrentEvents is the name of parameter with maximum number of events sent concurrently.
	ParamMaxConcurrentEvents = "max-concurrent-events"
	// ParamEstimatedTags is the name of parameter with estimated number of tags per metric
	ParamEstimatedTags = "estimated-tags"
	// ParamCacheRefreshPeriod is the name of parameter with cache refresh period.
	ParamCacheRefreshPeriod = "cloud-cache-refresh-period"
	// ParamCacheEvictAfterIdlePeriod is the name of parameter with idle cache eviction period.
	ParamCacheEvictAfterIdlePeriod = "cloud-cache-evict-after-idle-period"
	// ParamCacheTTL is the name of parameter with cache TTL for successful lookups.
	ParamCacheTTL = "cloud-cache-ttl"
	// ParamCacheNegativeTTL is the name of parameter with cache TTL for failed lookups (errors or when instance was not found).
	ParamCacheNegativeTTL = "cloud-cache-negative-ttl"
	// ParamMetricsAddr is the name of parameter with address on which to listen for metrics.
	ParamMetricsAddr = "metrics-addr"
	// ParamNamespace is the name of parameter with namespace for all metrics.
	ParamNamespace = "namespace"
	// ParamStatserType is the name of parameter with type of statser.
	ParamStatserType = "statser-type"
	// ParamPercentThreshold is the name of parameter with list of applied percentiles.
	ParamPercentThreshold = "percent-threshold"
	// ParamHeartbeatEnabled is the name of the parameter with the heartbeat enabled
	ParamHeartbeatEnabled = "heartbeat-enabled"
	// ParamReceiveBatchSize is the name of the parameter with the number of datagrams to read in each receive batch
	ParamReceiveBatchSize = "receive-batch-size"
	// ParamConnPerReader is the name of the parameter indicating whether to create a connection per reader
	ParamConnPerReader = "conn-per-reader"
	// ParamBadLinesPerMinute is the name of the parameter indicating how many bad lines can be logged per minute
	ParamBadLinesPerMinute = "bad-lines-per-minute"
	// ParamServerMode is the name of the parameter used to configure the server mode.
	ParamServerMode = "server-mode"
	// ParamHostname allows hostname overrides
	ParamHostname = "hostname"
	// ParamTimerHistogramLimit upper limit of timer histogram buckets that can be specified
	ParamTimerHistogramLimit = "timer-histogram-limit"
	// ParamLogRawMetric enables custom metrics to be printed to stdout
	ParamLogRawMetric = "log-raw-metric"
)
const StatsdSourceID = "s"

StatsdSourceID stores the key used to tag metrics with the origin IP address. Should be short to avoid extra hashing and memory overhead for map operations.

Variables

var DefaultBackends = []string{"graphite"}

DefaultBackends is the list of default backends' names.

var DefaultInternalTags = Tags{}

DefaultInternalTags is the default list of additional tags on internal metrics

var DefaultMaxParsers = runtime.NumCPU()

DefaultMaxParsers is the default number of goroutines that parse datagrams into metrics.

var DefaultMaxReaders = minInt(8, runtime.NumCPU())

DefaultMaxReaders is the default number of socket reading goroutines.

var DefaultMaxWorkers = runtime.NumCPU()

DefaultMaxWorkers is the default number of goroutines that aggregate metrics.

var DefaultPercentThreshold = []float64{90}

DefaultPercentThreshold is the default list of applied percentiles.

var DefaultTags = Tags{}

DefaultTags is the default list of additional tags.

Functions

func AddFlags

func AddFlags(fs *pflag.FlagSet)

AddFlags adds flags to the specified FlagSet.

func Bucket

func Bucket(metricName string, source Source, max int) int

func FormatTagsKey

func FormatTagsKey(source Source, tags Tags) string

func NormalizeTagKey

func NormalizeTagKey(key string) string

NormalizeTagKey cleans up the key of a tag.

Types

type AggregatedMetrics

type AggregatedMetrics interface {
	MetricsName() string
	Delete(string)
	DeleteChild(string, string)
	HasChildren(string) bool
}

AggregatedMetrics is an interface for aggregated metrics.

type AlertType

type AlertType byte

AlertType is the type of alert.

const (
	// AlertInfo is alert level "info".
	AlertInfo AlertType = iota // Must be zero to work as default
	// AlertWarning is alert level "warning".
	AlertWarning
	// AlertError is alert level "error".
	AlertError
	// AlertSuccess is alert level "success".
	AlertSuccess
)

func (AlertType) String

func (a AlertType) String() string

func (AlertType) StringWithEmptyDefault

func (a AlertType) StringWithEmptyDefault() string

StringWithEmptyDefault returns empty string for default alert type.

type Backend

type Backend interface {
	// Name returns the name of the backend.
	Name() string
	// SendMetricsAsync flushes the metrics to the backend, preparing payload synchronously but doing the send asynchronously.
	// Must not read/write MetricMap asynchronously.
	SendMetricsAsync(context.Context, *MetricMap, SendCallback)
	// SendEvent sends event to the backend.
	SendEvent(context.Context, *Event) error
}

Backend represents a backend. If Backend implements the Runner interface, it's started in a new goroutine at creation.

type BackendFactory

type BackendFactory func(config *viper.Viper, logger logrus.FieldLogger, pool *transport.TransportPool) (Backend, error)

BackendFactory is a function that returns a Backend.

type CacheOptions

type CacheOptions struct {
	CacheRefreshPeriod        time.Duration
	CacheEvictAfterIdlePeriod time.Duration
	CacheTTL                  time.Duration
	CacheNegativeTTL          time.Duration
}

CacheOptions holds cache behaviour configuration.

type CachedInstances

type CachedInstances interface {
	// Peek fetches instance information from the cache.
	// The cache is also a negative cache - may be a cache hit but the returned instance is nil.
	Peek(Source) (*Instance, bool)
	// IpSink returns a channel that can be used to supply IP addresses for which information needs
	// to be fetched and cached.
	IpSink() chan<- Source
	// InfoSource returns a channel that can be used to receive information about IPs.
	InfoSource() <-chan InstanceInfo
	// EstimatedTags returns a guess for how many tags to pre-allocate
	EstimatedTags() int
}

type CachedInstancesFactory

type CachedInstancesFactory func(v *viper.Viper, logger logrus.FieldLogger, version string) (CachedInstances, error)

CachedInstancesFactory is a function that returns a CachedInstances instance.

type CloudProvider

type CloudProvider interface {
	// Name returns the name of the cloud provider.
	Name() string
	// Instance returns instance details from the cloud provider.
	// An IP maps to a nil pointer if its instance was not found.
	// The map is returned even in case of errors because it may contain partial data.
	Instance(context.Context, ...Source) (map[Source]*Instance, error)
	// MaxInstancesBatch returns the maximum number of instances that can be requested via the Instance method.
	MaxInstancesBatch() int
	// EstimatedTags returns a guess of how many tags are likely to be added by the CloudProvider
	EstimatedTags() int
}

CloudProvider represents a cloud provider. If CloudProvider implements the Runner interface, it's started in a new goroutine at creation.
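Since Instance accepts at most MaxInstancesBatch() sources per call, a caller holding more IPs has to split them into batches first. This is a minimal sketch of that splitting step. `batchSources` is a hypothetical helper, not a gostatsd function, and `Source` is re-declared locally.

```go
package main

import "fmt"

// Source is a local re-declaration of the documented type.
type Source string

// batchSources splits ips into consecutive batches of at most max elements,
// so that each batch can be passed to a single CloudProvider.Instance call.
// Hypothetical helper, not part of gostatsd.
func batchSources(ips []Source, max int) [][]Source {
	if max <= 0 {
		max = 1 // guard against a non-positive limit
	}
	var batches [][]Source
	for len(ips) > 0 {
		n := max
		if len(ips) < n {
			n = len(ips)
		}
		// The full slice expression caps capacity so a later append to one
		// batch cannot overwrite the start of the next.
		batches = append(batches, ips[:n:n])
		ips = ips[n:]
	}
	return batches
}

func main() {
	ips := []Source{"10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4", "10.0.0.5"}
	for i, b := range batchSources(ips, 2) {
		fmt.Println(i, b)
	}
}
```

Because Instance may return a partial map together with an error, a caller would typically merge whatever each batch returns before deciding how to handle the error.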

type CloudProviderFactory

type CloudProviderFactory func(v *viper.Viper, logger logrus.FieldLogger, version string) (CloudProvider, error)

CloudProviderFactory is a function that returns a CloudProvider.

type Counter

type Counter struct {
	PerSecond float64  // The calculated per second rate
	Value     int64    // The numeric value of the metric
	Timestamp Nanotime // Last time value was updated
	Source    Source   // Source of the metric
	Tags      Tags     // The tags for the counter
}

Counter is used for storing aggregated values for counters.

func NewCounter

func NewCounter(timestamp Nanotime, value int64, source Source, tags Tags) Counter

NewCounter initialises a new counter.

type Counters

type Counters map[string]map[string]Counter

Counters stores a map of counters by tags.

func (Counters) Delete

func (c Counters) Delete(k string)

Delete deletes the metrics from the collection.

func (Counters) DeleteChild

func (c Counters) DeleteChild(k, t string)

DeleteChild deletes the metrics from the collection for the given tags.

func (Counters) Each

func (c Counters) Each(f func(metricName string, tagsKey string, c Counter))

Each iterates over each counter.

func (Counters) HasChildren

func (c Counters) HasChildren(k string) bool

HasChildren returns whether there are more children nested under the key.

func (Counters) MetricsName

func (c Counters) MetricsName() string

MetricsName returns the name of the aggregated metrics collection.
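The nested map shape suggests how these methods fit together. The outer key is the metric name, as the MetricMap description states, and the inner key is the tags key passed to Each. This is a minimal sketch that re-implements the methods on a local copy of the type, with Counter trimmed to one field. The behaviour it assumes is not confirmed by the text above: Delete drops a whole name entry, DeleteChild drops one tag set, and HasChildren checks whether any tag sets remain. Gauges, Sets and Timers follow the same pattern.

```go
package main

import (
	"fmt"
	"sort"
)

// Counter is trimmed to a single field for this sketch.
type Counter struct {
	Value int64
}

// Counters mirrors the documented shape: metric name -> tags key -> Counter.
type Counters map[string]map[string]Counter

// Each calls f for every (metric name, tags key) pair.
func (c Counters) Each(f func(metricName, tagsKey string, ctr Counter)) {
	for name, byTags := range c {
		for tagsKey, ctr := range byTags {
			f(name, tagsKey, ctr)
		}
	}
}

// Delete removes a metric name together with all of its tag sets.
func (c Counters) Delete(k string) { delete(c, k) }

// DeleteChild removes a single tag set under metric name k.
// Deleting from a missing (nil) inner map is a no-op in Go.
func (c Counters) DeleteChild(k, t string) { delete(c[k], t) }

// HasChildren reports whether any tag sets remain under metric name k.
func (c Counters) HasChildren(k string) bool { return len(c[k]) != 0 }

func main() {
	c := Counters{
		"requests": {"env:prod": {Value: 5}, "env:dev": {Value: 2}},
		"errors":   {"env:prod": {Value: 1}},
	}

	var seen []string
	c.Each(func(name, tagsKey string, ctr Counter) {
		seen = append(seen, fmt.Sprintf("%s[%s]=%d", name, tagsKey, ctr.Value))
	})
	sort.Strings(seen) // map iteration order is random
	fmt.Println(seen)

	c.DeleteChild("errors", "env:prod")
	if !c.HasChildren("errors") {
		c.Delete("errors") // drop the now-empty name entry
	}
	fmt.Println(len(c), c.HasChildren("requests"))
}
```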

type Event

type Event struct {
	// Title of the event.
	Title string
	// Text of the event. Supports line breaks.
	Text string
	// DateHappened of the event. Unix epoch timestamp. Default is now when not specified in incoming metric.
	DateHappened int64
	// AggregationKey of the event, to group it with some other events.
	AggregationKey string
	// SourceTypeName of the event.
	SourceTypeName string
	// Tags of the event.
	Tags Tags
	// Source of the event.
	Source Source
	// Priority of the event.
	Priority Priority
	// AlertType of the event.
	AlertType AlertType
}

Event represents an event, described at http://docs.datadoghq.com/guides/dogstatsd/

func (*Event) AddTagsSetSource

func (e *Event) AddTagsSetSource(additionalTags Tags, newSource Source)

type Events

type Events []*Event

Events represents a list of events.

type Gauge

type Gauge struct {
	Value     float64  // The numeric value of the metric
	Timestamp Nanotime // Last time value was updated
	Source    Source   // Source of the metric
	Tags      Tags     // The tags for the gauge
}

Gauge is used for storing aggregated values for gauges.

func NewGauge

func NewGauge(timestamp Nanotime, value float64, source Source, tags Tags) Gauge

NewGauge initialises a new gauge.

type Gauges

type Gauges map[string]map[string]Gauge

Gauges stores a map of gauges by tags.

func (Gauges) Delete

func (g Gauges) Delete(k string)

Delete deletes the metrics from the collection.

func (Gauges) DeleteChild

func (g Gauges) DeleteChild(k, t string)

DeleteChild deletes the metrics from the collection for the given tags.

func (Gauges) Each

func (g Gauges) Each(f func(metricName string, tagsKey string, g Gauge))

Each iterates over each gauge.

func (Gauges) HasChildren

func (g Gauges) HasChildren(k string) bool

HasChildren returns whether there are more children nested under the key.

func (Gauges) MetricsName

func (g Gauges) MetricsName() string

MetricsName returns the name of the aggregated metrics collection.

type HistogramThreshold

type HistogramThreshold float64

type Instance

type Instance struct {
	ID   Source
	Tags Tags
}

Instance represents a cloud instance.

type InstanceInfo

type InstanceInfo struct {
	IP Source
	// Instance may be nil if the lookup resulted in an error or instance was not found.
	Instance *Instance
}

type Metric

type Metric struct {
	Name        string  // The name of the metric
	Value       float64 // The numeric value of the metric
	Rate        float64 // The sampling rate of the metric
	Tags        Tags    // The tags for the metric
	TagsKey     string  // The tags rendered as a string to uniquely identify the tagset in a map.  Sort of a cache.  Will be removed at some point.
	StringValue string  // The string value for some metrics e.g. Set
	// Source is the source of the metric, its lifecycle is:
	// - If ignore-host is set, it will be set to the `host` tag if present, otherwise blank.  If ignore-host is not set, it will be set to the sending IP
	// - If the cloud provider is enabled, it will attempt to perform a lookup of this value to find a new value (instance ID, pod ID, etc)
	// - If the tag handler matches a `drop-host` filter, it will be removed
	// - Backends treat it inconsistently
	Source    Source     // Source of the metric.  In order of
	Timestamp Nanotime   // Most accurate known timestamp of this metric
	Type      MetricType // The type of metric
	DoneFunc  func()     // Returns the metric to the pool. May be nil. Call Metric.Done(), not this.
}

Metric represents a single collected data point.

func (*Metric) AddTagsSetSource

func (m *Metric) AddTagsSetSource(additionalTags Tags, newSource Source)

func (*Metric) Bucket

func (m *Metric) Bucket(max int) int

Bucket will pick a distribution bucket for this metric to land in. max is exclusive.
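A bucket choice like this is usually a hash of the metric reduced modulo `max`, so the same metric always lands in the same bucket and the result stays in `[0, max)`. Below is a minimal sketch. The hash gostatsd actually uses is not stated here, so FNV-1a from the standard library is an assumption, as is hashing only the name. `bucketFor` is a hypothetical stand-in for Metric.Bucket.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bucketFor is a hypothetical stand-in for Metric.Bucket: it hashes the metric
// name (FNV-1a is an assumption) so every metric with the same name lands in
// the same bucket. The result is in [0, max), i.e. max is exclusive.
func bucketFor(name string, max int) int {
	h := fnv.New32a()
	h.Write([]byte(name)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(max))
}

func main() {
	for _, name := range []string{"requests", "errors", "requests"} {
		fmt.Println(name, bucketFor(name, 4))
	}
}
```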

func (*Metric) Done

func (m *Metric) Done()

Done invokes DoneFunc if it's set, returning the metric to the pool.

func (*Metric) FormatTagsKey

func (m *Metric) FormatTagsKey() string

func (*Metric) Reset

func (m *Metric) Reset()

Reset is used to reset a metric to a clean state; it is called on re-use from the pool.

func (*Metric) String

func (m *Metric) String() string

type MetricConsolidator

type MetricConsolidator struct {
	// contains filtered or unexported fields
}

MetricConsolidator will consolidate metrics randomly into a slice of MetricMaps, and either send the slice to the provided channel, or make them available synchronously through Drain/Fill. Run can also be started in a long-running goroutine to perform flushing, or Flush can be called externally to trigger the channel send.

Used to consolidate metrics such as:

- counter[name=x, value=1]
- counter[name=x, value=1]
- counter[name=x, value=1]
- counter[name=x, value=1]
- counter[name=x, value=1]

into:

- counter[name=x, value=5]

Similar consolidation is performed for other metric types.
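For counters, consolidation amounts to summing the values of every counter that shares a name and tag set. The sketch below shows only that arithmetic. `rawCounter`, `key` and `consolidateCounters` are hypothetical simplified types, not gostatsd's Metric or MetricMap.

```go
package main

import "fmt"

// rawCounter is a simplified, hypothetical stand-in for a counter Metric.
type rawCounter struct {
	Name    string
	TagsKey string
	Value   int64
}

// key identifies a counter series: same name and same tag set.
type key struct{ name, tagsKey string }

// consolidateCounters sums the values of counters that share a name and tag set.
func consolidateCounters(in []rawCounter) map[key]int64 {
	out := make(map[key]int64)
	for _, m := range in {
		out[key{m.Name, m.TagsKey}] += m.Value
	}
	return out
}

func main() {
	var in []rawCounter
	for i := 0; i < 5; i++ {
		in = append(in, rawCounter{Name: "x", Value: 1})
	}
	// Five counter[name=x, value=1] collapse into a single series for x.
	fmt.Println(consolidateCounters(in)[key{name: "x"}])
}
```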

func NewMetricConsolidator

func NewMetricConsolidator(spots int, flushInterval time.Duration, sink chan<- []*MetricMap) *MetricConsolidator

func (*MetricConsolidator) Drain

func (mc *MetricConsolidator) Drain(ctx context.Context) []*MetricMap

Drain will collect all the MetricMaps in the MetricConsolidator and return them. If the context.Context is canceled before everything can be collected, they are returned to the MetricConsolidator and nil is returned.

func (*MetricConsolidator) Fill

func (mc *MetricConsolidator) Fill()

Fill re-populates the MetricConsolidator with empty MetricMaps. It is the counterpart to Drain: it must be called after a successful Drain, and must not be called after a failed Drain.

func (*MetricConsolidator) Flush

func (mc *MetricConsolidator) Flush(ctx context.Context)

Flush will collect all the MetricMaps into a slice, send them to the channel provided, then create new MetricMaps for new metrics to land in. It is not thread-safe.

func (*MetricConsolidator) ReceiveMetricMap

func (mc *MetricConsolidator) ReceiveMetricMap(mm *MetricMap)

ReceiveMetricMap will merge a MetricMap into one of the consolidator's MetricMaps.

func (*MetricConsolidator) ReceiveMetrics

func (mc *MetricConsolidator) ReceiveMetrics(metrics []*Metric)

ReceiveMetrics will push a slice of Metrics into one of the consolidator's MetricMaps.

func (*MetricConsolidator) Run

func (mc *MetricConsolidator) Run(ctx context.Context)

type MetricMap

type MetricMap struct {
	Counters Counters
	Timers   Timers
	Gauges   Gauges
	Sets     Sets
}

MetricMap is used for storing aggregated or consolidated Metric values. The keys of each map are metric names.

func MergeMaps

func MergeMaps(mms []*MetricMap) *MetricMap

func NewMetricMap

func NewMetricMap() *MetricMap

func (*MetricMap) AsMetrics

func (mm *MetricMap) AsMetrics() []*Metric

AsMetrics will synthesize Metrics from the MetricMap and return them as a slice.

func (*MetricMap) IsEmpty

func (mm *MetricMap) IsEmpty() bool

func (*MetricMap) Merge

func (mm *MetricMap) Merge(mmFrom *MetricMap)

func (*MetricMap) Receive

func (mm *MetricMap) Receive(m *Metric)

Receive adds a single Metric to the MetricMap, and releases the Metric.

func (*MetricMap) Split

func (mm *MetricMap) Split(count int) []*MetricMap

Split will split a MetricMap into count MetricMaps, where each one contains only the metrics that fall in its bucket.

func (*MetricMap) SplitByTags

func (mm *MetricMap) SplitByTags(tagNames []string) map[string]*MetricMap

func (*MetricMap) String

func (mm *MetricMap) String() string

type MetricType

type MetricType byte

MetricType is an enumeration of all the possible types of Metric.

const (

	// COUNTER is statsd counter type
	COUNTER MetricType = iota
	// TIMER is statsd timer type
	TIMER
	// GAUGE is statsd gauge type
	GAUGE
	// SET is statsd set type
	SET
)

func (MetricType) String

func (m MetricType) String() string

type MetricsRunner

type MetricsRunner interface {
	RunMetricsContext(context.Context)
}

type Nanotime

type Nanotime int64

Nanotime is the number of nanoseconds elapsed since January 1, 1970 UTC. Get the value with time.Now().UnixNano().

func NanoMax

func NanoMax(t1, t2 Nanotime) Nanotime

func NanoNow

func NanoNow() Nanotime
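NanoNow and NanoMax have no description, but the Nanotime documentation says to obtain the value with `time.Now().UnixNano()`. The following sketch is based on that reading. `nanoNow` and `nanoMax` are local stand-ins rather than the package's own functions. Using NanoMax to keep the latest update time when merging aggregates is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Nanotime mirrors the documented type: nanoseconds since the Unix epoch.
type Nanotime int64

// nanoNow is a sketch of NanoNow, following the type's documentation.
func nanoNow() Nanotime { return Nanotime(time.Now().UnixNano()) }

// nanoMax returns the later of two timestamps, e.g. to keep the most recent
// update time when two aggregates are merged (assumed use).
func nanoMax(t1, t2 Nanotime) Nanotime {
	if t1 > t2 {
		return t1
	}
	return t2
}

func main() {
	earlier := Nanotime(time.Date(2020, 11, 20, 0, 0, 0, 0, time.UTC).UnixNano())
	latest := nanoMax(earlier, nanoNow())
	// Convert back to time.Time for display.
	fmt.Println(time.Unix(0, int64(latest)).UTC())
}
```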

type Percentile

type Percentile struct {
	Float float64
	Str   string
}

Percentile is used to store the aggregation for a percentile.

func (*Percentile) String

func (p *Percentile) String() string

String returns the string value of a percentile.

type Percentiles

type Percentiles []Percentile

Percentiles represents an array of percentiles.

func (*Percentiles) Set

func (p *Percentiles) Set(s string, f float64)

Set appends a percentile aggregation to the percentiles.

func (*Percentiles) String

func (p *Percentiles) String() string

String returns the string value of percentiles.

type PipelineHandler

type PipelineHandler interface {
	RawMetricHandler
	// EstimatedTags returns a guess for how many tags to pre-allocate
	EstimatedTags() int
	// DispatchEvent dispatches event to the next step in a pipeline.
	DispatchEvent(context.Context, *Event)
	// WaitForEvents waits for all event-dispatching goroutines to finish.
	WaitForEvents()
}

PipelineHandler can be used to handle metrics and events; it also provides an estimate of how many tags it may add.

type Priority

type Priority byte

Priority of an event.

const (
	// PriNormal is normal priority.
	PriNormal Priority = iota // Must be zero to work as default
	// PriLow is low priority.
	PriLow
)

func (Priority) String

func (p Priority) String() string

func (Priority) StringWithEmptyDefault

func (p Priority) StringWithEmptyDefault() string

StringWithEmptyDefault returns empty string for default priority.

type RawMetricHandler

type RawMetricHandler interface {
	DispatchMetricMap(ctx context.Context, mm *MetricMap)
}

RawMetricHandler is an interface that accepts metrics, in the form of a MetricMap, for processing. Raw refers to pre-aggregation, not pre-consolidation.

type Runnable

type Runnable func(context.Context)

Runnable is a long running function intended to be launched in a goroutine.

func MaybeAppendRunnable

func MaybeAppendRunnable(runnables []Runnable, maybeRunner interface{}) []Runnable

type Runner

type Runner interface {
	Run(context.Context)
}

Runner exposes a Runnable through an interface.
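MaybeAppendRunnable has no description. Its name and signature, together with the note that Backends and CloudProviders implementing Runner are started in their own goroutine, suggest it appends `maybeRunner`'s Run method only when the value implements Runner. The sketch below follows that reading. The types are re-declared locally, and `pollingBackend` and `staticBackend` are hypothetical components.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type Runnable func(context.Context)

type Runner interface {
	Run(context.Context)
}

// maybeAppendRunnable appends maybeRunner's Run method value if it implements
// Runner; any other value leaves the slice unchanged.
func maybeAppendRunnable(runnables []Runnable, maybeRunner interface{}) []Runnable {
	if r, ok := maybeRunner.(Runner); ok {
		return append(runnables, r.Run)
	}
	return runnables
}

// pollingBackend is a hypothetical component with a background loop.
type pollingBackend struct{}

func (pollingBackend) Run(ctx context.Context) {
	<-ctx.Done() // a real component would do periodic work until cancelled
}

// staticBackend is a hypothetical component with no background work.
type staticBackend struct{}

func main() {
	var runnables []Runnable
	runnables = maybeAppendRunnable(runnables, pollingBackend{})
	runnables = maybeAppendRunnable(runnables, staticBackend{})
	fmt.Println("runnables:", len(runnables))

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	for _, r := range runnables {
		wg.Add(1)
		go func(r Runnable) { // each Runnable gets its own goroutine
			defer wg.Done()
			r(ctx)
		}(r)
	}
	cancel() // signal shutdown
	wg.Wait()
}
```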

type SendCallback

type SendCallback func([]error)

SendCallback is called by Backend.SendMetricsAsync() to notify about the result of operation. A list of errors is passed to the callback. It may be empty or contain nil values. Every non-nil value is an error that happened while sending metrics.

type Set

type Set struct {
	Values    map[string]struct{}
	Timestamp Nanotime // Last time value was updated
	Source    Source   // Hostname of the source of the metric
	Tags      Tags     // The tags for the set
}

Set is used for storing aggregated values for sets.

func NewSet

func NewSet(timestamp Nanotime, values map[string]struct{}, source Source, tags Tags) Set

NewSet initialises a new set.

type Sets

type Sets map[string]map[string]Set

Sets stores a map of sets by tags.

func (Sets) Delete

func (s Sets) Delete(k string)

Delete deletes the metrics from the collection.

func (Sets) DeleteChild

func (s Sets) DeleteChild(k, t string)

DeleteChild deletes the metrics from the collection for the given tags.

func (Sets) Each

func (s Sets) Each(f func(metricName string, tagsKey string, s Set))

Each iterates over each set.

func (Sets) HasChildren

func (s Sets) HasChildren(k string) bool

HasChildren returns whether there are more children nested under the key.

func (Sets) MetricsName

func (s Sets) MetricsName() string

MetricsName returns the name of the aggregated metrics collection.

type Source

type Source string

Source is a v4/v6 IP address. We do not use net.IP because it would involve conversion to string and back several times.

const UnknownSource Source = ""

UnknownSource is an IP of an unknown source.

type StringMatch

type StringMatch struct {
	// contains filtered or unexported fields
}

func NewStringMatch

func NewStringMatch(s string) StringMatch

func (StringMatch) Match

func (sm StringMatch) Match(s string) bool

Match indicates whether the provided string matches the criteria for this StringMatch.

type StringMatchList

type StringMatchList []StringMatch

func (StringMatchList) MatchAny

func (sml StringMatchList) MatchAny(s string) bool

MatchAny indicates whether s matches anything in the list; it returns false if the list is empty.

func (StringMatchList) MatchAnyMultiple

func (sml StringMatchList) MatchAnyMultiple(tests []string) bool

MatchAnyMultiple indicates whether any string in tests matches anything in the list; it returns false if sml or tests is empty.
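The empty-input rules for MatchAny and MatchAnyMultiple fall out naturally from a pair of loops. The criteria StringMatch itself applies are not described here, so the sketch below assumes plain equality. `stringMatch` and `stringMatchList` are hypothetical local stand-ins, and the real type may support richer patterns.

```go
package main

import "fmt"

// stringMatch is a hypothetical stand-in for StringMatch; exact equality is
// assumed because the real matching criteria are not documented above.
type stringMatch struct{ s string }

func (sm stringMatch) Match(s string) bool { return sm.s == s }

type stringMatchList []stringMatch

// MatchAny reports whether s matches any entry; false for an empty list.
func (sml stringMatchList) MatchAny(s string) bool {
	for _, sm := range sml {
		if sm.Match(s) {
			return true
		}
	}
	return false
}

// MatchAnyMultiple reports whether any string in tests matches any entry;
// false if either sml or tests is empty.
func (sml stringMatchList) MatchAnyMultiple(tests []string) bool {
	for _, t := range tests {
		if sml.MatchAny(t) {
			return true
		}
	}
	return false
}

func main() {
	drop := stringMatchList{{s: "host:a"}, {s: "env:dev"}}
	fmt.Println(drop.MatchAny("env:dev"))
	fmt.Println(drop.MatchAnyMultiple([]string{"env:prod", "host:a"}))
	fmt.Println(stringMatchList(nil).MatchAny("anything"))
}
```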

type Tags

type Tags []string

Tags represents a list of tags. Tags can be of two forms:

1. "key:value". "value" may contain colon(s) as well.
2. "tag". No colon.

Each tag's key and/or value may contain characters invalid for a particular backend. Backends are expected to handle them appropriately. Different backends may have different sets of valid characters, so it is undesirable to have restrictions on the input side.
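Because a value may itself contain colons, a consumer that needs the key and value apart should split on the first colon only. A tag with no colon is a bare tag. The sketch below shows this. `splitTag` is a hypothetical helper and does not come from gostatsd.

```go
package main

import (
	"fmt"
	"strings"
)

// splitTag is a hypothetical helper that splits a tag into key and value.
// Only the first colon separates key from value, since the value may contain
// colons; a tag without a colon is returned as a bare key.
func splitTag(tag string) (key, value string, hasValue bool) {
	if i := strings.IndexByte(tag, ':'); i >= 0 {
		return tag[:i], tag[i+1:], true
	}
	return tag, "", false
}

func main() {
	for _, t := range []string{"env:prod", "url:http://example.com:8080", "canary"} {
		k, v, ok := splitTag(t)
		fmt.Printf("%q -> key=%q value=%q hasValue=%v\n", t, k, v, ok)
	}
}
```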

func (Tags) Concat

func (tags Tags) Concat(additional Tags) Tags

Concat returns a new Tags with the additional ones added.

func (Tags) Copy

func (tags Tags) Copy() Tags

Copy returns a copy of the Tags

func (Tags) SortedString

func (tags Tags) SortedString() string

SortedString sorts the tags alphabetically and returns a comma-separated string representation of the tags. Note that this method may mutate the original object.
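Since SortedString may reorder the receiver, code that needs the original order should sort a copy. This minimal sketch re-implements both methods on a local Tags type. The in-place sort is how the sketch reproduces the documented "may mutate" behaviour, and it is an assumption about the real implementation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Tags is a local re-declaration of the documented type.
type Tags []string

// Copy returns an independent copy of the tags.
func (tags Tags) Copy() Tags {
	if tags == nil {
		return nil
	}
	out := make(Tags, len(tags))
	copy(out, tags)
	return out
}

// SortedString sorts the receiver in place, then joins it with commas.
func (tags Tags) SortedString() string {
	sort.Strings(tags)
	return strings.Join(tags, ",")
}

func main() {
	original := Tags{"b:2", "a:1", "c:3"}

	fmt.Println(original.Copy().SortedString()) // sorts the copy only
	fmt.Println(original)                        // original order preserved

	fmt.Println(original.SortedString()) // sorts original in place
	fmt.Println(original)                // now reordered
}
```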

func (Tags) String

func (tags Tags) String() string

String returns a comma-separated string representation of the tags.

type Timer

type Timer struct {
	Count        int         // The number of timers in the series
	SampledCount float64     // Number of timings received, divided by sampling rate
	PerSecond    float64     // The calculated per second rate
	Mean         float64     // The mean time of the series
	Median       float64     // The median time of the series
	Min          float64     // The minimum time of the series
	Max          float64     // The maximum time of the series
	StdDev       float64     // The standard deviation for the series
	Sum          float64     // The sum for the series
	SumSquares   float64     // The sum squares for the series
	Values       []float64   // The numeric value of the metric
	Percentiles  Percentiles // The percentile aggregations of the metric
	Timestamp    Nanotime    // Last time value was updated
	Source       Source      // Hostname of the source of the metric
	Tags         Tags        // The tags for the timer

	// Map bounds to count of measures seen in that bucket.
	// This map is only non-empty if the metric specifies histogram aggregation in its tags.
	Histogram map[HistogramThreshold]int
}

Timer is used for storing aggregated values for timers.

func NewTimer

func NewTimer(timestamp Nanotime, values []float64, source Source, tags Tags) Timer

NewTimer initialises a new timer.

func NewTimerValues

func NewTimerValues(values []float64) Timer

NewTimerValues initialises a new timer from a slice of values only.

type TimerSubtypes

type TimerSubtypes struct {
	Lower          bool
	LowerPct       bool // pct
	Upper          bool
	UpperPct       bool // pct
	Count          bool
	CountPct       bool // pct
	CountPerSecond bool
	Mean           bool
	MeanPct        bool // pct
	Median         bool
	StdDev         bool
	Sum            bool
	SumPct         bool // pct
	SumSquares     bool
	SumSquaresPct  bool // pct
}

func DisabledSubMetrics

func DisabledSubMetrics(viper *viper.Viper) TimerSubtypes

type Timers

type Timers map[string]map[string]Timer

Timers stores a map of timers by tags.

func (Timers) Delete

func (t Timers) Delete(k string)

Delete deletes the metrics from the collection.

func (Timers) DeleteChild

func (t Timers) DeleteChild(k, tags string)

DeleteChild deletes the metrics from the collection for the given tags.

func (Timers) Each

func (t Timers) Each(f func(metricName string, tagsKey string, t Timer))

Each iterates over each timer.

func (Timers) HasChildren

func (t Timers) HasChildren(k string) bool

HasChildren returns whether there are more children nested under the key.

func (Timers) MetricsName

func (t Timers) MetricsName() string

MetricsName returns the name of the aggregated metrics collection.

type Wait

type Wait func()

Directories

Path Synopsis
cmd
internal
fixtures
This is for test fixtures only.
pkg
statsd
Package statsd implements functionality for creating servers compatible with the statsd protocol.
web
