veneur

package module
v9.0.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Nov 12, 2018 License: MIT Imports: 75 Imported by: 0

README


What Is Veneur?

Veneur (/vɛnˈʊr/, rhymes with “assure”) is a distributed, fault-tolerant pipeline for runtime data. It provides a server implementation of the DogStatsD protocol or SSF for aggregating metrics and sending them downstream to one or more supported sinks. It can also act as a global aggregator for histograms, sets and counters.

More generically, Veneur is a convenient sink for various observability primitives with lots of outputs!

Use Case

Once you cross a threshold into dozens, hundreds or (gasp!) thousands of machines emitting metric data for an application, you've moved into that world where data about individual hosts is uninteresting except in aggregate form. Instead of paying to store tons of data points and then aggregating them later at read-time, Veneur can calculate global aggregates, like percentiles, and forward those along to your time series database, etc.

Veneur is also a StatsD or DogStatsD protocol transport, forwarding the locally collected metrics over more reliable TCP implementations.

Here are some examples of why Stripe and other companies are using Veneur today:

  • reducing cost by pre-aggregating metrics such as timers into percentiles
  • creating a vendor-agnostic metric collection pipeline
  • consolidating disparate observability data (from trace spans to metrics, and more!)
  • improving efficiency over other metric aggregator implementations
  • improving reliability by building a more resilient forwarding system over single points of failure

See Also

We wanted percentiles, histograms and sets to be global. We wanted to unify our observability clients, be vendor agnostic and build automatic features like SLI measurement. Veneur helps us do all this and more!

Status

Veneur is currently handling all metrics for Stripe and is considered production ready. It is under active development and maintenance! Starting with v1.6, Veneur operates on a six-week release cycle, and all releases are tagged in git. If you'd like to contribute, see CONTRIBUTING!

Building Veneur requires Go 1.9 or later.

Features

Vendor And Backend Agnostic

Veneur has many sinks, so your data can be sent to one or more vendors, TSDBs or tracing stores!

Modern Metrics Format (Or Others!)

Unify metrics, spans and logs via the Sensor Sensibility Format. Also works with DogStatsD, StatsD and Prometheus.

Global Aggregation

If configured to do so, Veneur can selectively aggregate global metrics to be cumulative across all instances that report to a central Veneur, allowing global percentile calculation, global counters or global sets.

For example, say you emit a timer foo.bar.call_duration_ms from 20 hosts that are configured to forward to a central Veneur. You'll see the following:

  • Metrics that have been "globalized"
    • foo.bar.call_duration_ms.50percentile: the p50 across all hosts, by tag
    • foo.bar.call_duration_ms.90percentile: the p90 across all hosts, by tag
    • foo.bar.call_duration_ms.95percentile: the p95 across all hosts, by tag
    • foo.bar.call_duration_ms.99percentile: the p99 across all hosts, by tag
  • Metrics that remain host-local
    • foo.bar.call_duration_ms.avg: by-host tagged average
    • foo.bar.call_duration_ms.count: by-host tagged count which (when summed) shows the total count of times this metric was emitted
    • foo.bar.call_duration_ms.max: by-host tagged maximum value
    • foo.bar.call_duration_ms.median: by-host tagged median value
    • foo.bar.call_duration_ms.min: by-host tagged minimum value
    • foo.bar.call_duration_ms.sum: by-host tagged sum value representing the total time

Clients can choose to override this behavior by including the tag veneurlocalonly.
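The naming scheme in the list above can be sketched in Go. This is only an illustration: globalName and localName are hypothetical helpers, and mapping a fraction such as 0.5 to the suffix 50percentile is an assumption inferred from the names shown, not taken from Veneur's source.

```go
package main

import (
	"fmt"
	"math"
)

// globalName returns the name of a globally aggregated percentile series,
// e.g. 0.5 -> "<metric>.50percentile". The fraction-to-suffix mapping is an
// assumption based on the names listed in the README.
func globalName(metric string, percentile float64) string {
	return fmt.Sprintf("%s.%dpercentile", metric, int(math.Round(percentile*100)))
}

// localName returns the name of a host-local aggregate such as "avg" or "max".
func localName(metric, aggregate string) string {
	return metric + "." + aggregate
}

func main() {
	metric := "foo.bar.call_duration_ms"
	for _, p := range []float64{0.5, 0.9, 0.95, 0.99} {
		fmt.Println("global:", globalName(metric, p))
	}
	for _, a := range []string{"avg", "count", "max", "median", "min", "sum"} {
		fmt.Println("local: ", localName(metric, a))
	}
}
```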

Approximate Histograms

Because Veneur is built to handle lots and lots of data, it uses approximate histograms. We have our own implementation of Dunning's t-digest, which has bounded memory consumption and reduced error at extreme quantiles. Metrics are consistently routed to the same worker to distribute load and to be added to the same histogram.

Datadog's DogStatsD — and StatsD — uses an exact histogram which retains all samples and is reset every flush period. This means that there is a loss of precision when using Veneur, but the resulting percentile values are meant to be more representative of a global view.

Approximate Sets

Veneur uses HyperLogLogs for approximate unique sets. These are a very efficient unique counter with fixed memory consumption.

Global Counters

Via an optional magic tag Veneur will forward counters to a global host for accumulation. This feature was primarily developed to control tag cardinality. Some counters are valuable but do not require per-host tagging.

Concepts

  • Global metrics are those that benefit from being aggregated for chunks — or all — of your infrastructure. These are histograms (including the percentiles generated by timers) and sets.
  • Metrics that are sent to another Veneur instance for aggregation are said to be "forwarded". This terminology helps to decipher configuration and metric options below.
  • Flushed, in Veneur, means metrics or spans processed by a sink.

By Metric Type Behavior

To clarify how each metric type behaves in Veneur, please use the following:

  • Counters: Locally accrued, flushed to sinks (see magic tags for global version)
  • Gauges: Locally accrued, flushed to sinks (see magic tags for global version)
  • Histograms: Locally accrued, count, max and min flushed to sinks, percentiles forwarded to forward_address for global aggregation when set.
  • Timers: Locally accrued, count, max and min flushed to sinks, percentiles forwarded to forward_address for global aggregation when set.
  • Sets: Locally accrued, forwarded to forward_address for global aggregation when set.

Expiration

Veneur expires all metrics on each flush. If a metric is no longer being sent (or is sent sparsely) Veneur will not send it as zeros! This was chosen because the combination of the approximation's features and the additional hysteresis imposed by retaining these approximations over time was deemed more complex than desirable.

Other Notes

  • Veneur aligns its flush timing with the local clock. For the default interval of 10s Veneur will generally emit metrics at 00, 10, 20, 30, … seconds after the minute.
  • Veneur will delay its first metric emission to align with the clock as stated above. On a restart this may result in a brief quiet period that is, at worst, slightly less than one interval long.

Usage

veneur -f example.yaml

See example.yaml for a sample config. Be sure to set the appropriate *_api_key!

Setup

Here we'll document some explanations of setup choices you may make when using Veneur.

Clients

Veneur is capable of ingesting:

  • DogStatsD including events and service checks
  • SSF
  • StatsD as a subset of DogStatsD, but this may cause trouble depending on where you store your metrics.

To use clients with Veneur you need only configure your client of choice to the proper host and port combination. This port should match one of:

  • statsd_listen_addresses for UDP- and TCP-based clients
  • ssf_listen_addresses for SSF-based clients using UDP or UNIX domain sockets.
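As a rough sketch of pointing a client at Veneur, the Go program below turns a listen address into arguments for net.Dial and sends one timer sample. The dialArgs helper is hypothetical, and the address udp://127.0.0.1:8126 is an assumed example modeled on the tcp://localhost:8129 form used elsewhere in this README; substitute an entry from your own statsd_listen_addresses.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// dialArgs splits a listen address such as "udp://127.0.0.1:8126" into the
// network and host:port that net.Dial expects. It is a hypothetical helper.
func dialArgs(listenAddr string) (network, hostPort string, err error) {
	u, err := url.Parse(listenAddr)
	if err != nil {
		return "", "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", "", fmt.Errorf("listen address %q is not of the form scheme://host:port", listenAddr)
	}
	return u.Scheme, u.Host, nil
}

func main() {
	// Assumed example address; it must match the server's configuration.
	network, hostPort, err := dialArgs("udp://127.0.0.1:8126")
	if err != nil {
		fmt.Println("bad address:", err)
		return
	}
	conn, err := net.Dial(network, hostPort)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()

	// One timer sample in DogStatsD form: name:value|type.
	if _, err := conn.Write([]byte("foo.bar.call_duration_ms:12|ms")); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("sent one timer sample to", hostPort)
}
```

UDP is fire and forget, so a successful write does not prove that anything received the packet.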

Einhorn Usage

When you upgrade Veneur (deploy, stop, start with new binary) there will be a brief period where Veneur will not be able to handle HTTP requests. At Stripe we use Einhorn as a shared socket manager to bridge the gap until Veneur is ready to handle HTTP requests again.

You'll need to consult Einhorn's documentation for installation, setup and usage. But once you've done that you can tell Veneur to use Einhorn by setting http_address to einhorn@0. This informs goji/bind to use its Einhorn handling code to bind to the file descriptor for HTTP.

Forwarding

Veneur instances can be configured to forward their global metrics to another Veneur instance. You can use this feature to get the best of both worlds: metrics that benefit from global aggregation can be passed up to a single global Veneur, but other metrics can be published locally with host-scoped information. Note: Forwarding adds an additional delay to metric availability corresponding to the value of the interval configuration option, as the local Veneur will flush metrics to its configured upstream, which will then flush any received metrics when its own interval expires.

If a local instance receives a histogram or set, it will publish the local parts of that metric (the count, min and max) directly to sinks, but instead of publishing percentiles, it will package the entire histogram and send it to the global instance. The global instance will aggregate all the histograms together and publish their percentiles to sinks.

Note that the global instance can also receive metrics over UDP. It will publish a count, min and max for the samples that were sent directly to it, but not counting any samples from other Veneur instances (this ensures that things don't get double-counted). You can even chain multiple levels of forwarding together if you want. This might be useful if, for example, your global Veneur is under too much load. The root of the tree will be the Veneur instance that has an empty forward_address. (Do not tell a Veneur instance to forward metrics to itself. We don't support that and it doesn't really make sense in the first place.)

With respect to the tags configuration option, the tags that will be added are those of the Veneur that actually publishes to a sink. If a local instance forwards its histograms and sets to a global instance, the local instance's tags will not be attached to the forwarded structures. It will still use its own tags for the other metrics it publishes, but the percentiles will get extra tags only from the global instance.

Proxy

To improve availability, you can leverage veneur-proxy in conjunction with Consul service discovery.

The proxy can be configured to query the Consul API for instances of a service using consul_forward_service_name. Each healthy instance is then entered in to a hash ring. When choosing which host to forward to, Veneur will use a combination of metric name and tags to consistently choose the same host for forwarding.
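The idea of consistently choosing a host from a hash ring can be illustrated with a small, simplified ring in Go. This is not the proxy's actual implementation: the ring type, the FNV hash, the replica count and the metricKey format are all assumptions made for the sketch.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strings"
)

// ring is a simplified consistent-hash ring (hypothetical, for illustration).
type ring struct {
	points []uint32          // sorted hash points on the ring
	owner  map[uint32]string // hash point -> host
}

func hashKey(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newRing places each host on the ring at `replicas` pseudo-random points.
func newRing(hosts []string, replicas int) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, host := range hosts {
		for i := 0; i < replicas; i++ {
			p := hashKey(fmt.Sprintf("%s#%d", host, i))
			if _, taken := r.owner[p]; taken {
				continue // hash collision: keep the first owner
			}
			r.owner[p] = host
			r.points = append(r.points, p)
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// get returns the host owning the first point at or after the key's hash,
// wrapping around to the start of the ring.
func (r *ring) get(key string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashKey(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

// metricKey combines the metric name with its sorted tags so that tag order
// does not change the destination.
func metricKey(name string, tags []string) string {
	sorted := append([]string(nil), tags...)
	sort.Strings(sorted)
	return name + "|" + strings.Join(sorted, ",")
}

func main() {
	r := newRing([]string{"10.0.0.1:8127", "10.0.0.2:8127", "10.0.0.3:8127"}, 20)
	key := metricKey("foo.bar.call_duration_ms", []string{"env:prod", "az:a"})
	fmt.Println(key, "->", r.get(key))
}
```

Because only the points owned by a departed host change hands, most metrics keep their destination when an instance becomes unhealthy.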

See more documentation for Proxy Veneur.

Static Configuration

For static configuration you need one Veneur, which we'll call the global instance, and one or more other Veneurs, which we'll call local instances. The local instances should have their forward_address configured to the global instance's http_address. The global instance should have an empty forward_address (i.e., just don't set it). You can then report metrics to any Veneur's statsd_listen_addresses as usual.

Magic Tag

If you want a metric to be strictly host-local, you can tell Veneur not to forward it by including a veneurlocalonly tag in the metric packet, eg foo:1|h|#veneurlocalonly. This tag will not actually appear in storage; Veneur removes it.

Global Counters And Gauges

Relatedly, if you want to forward a counter or gauge to the global Veneur instance to reduce tag cardinality, you can tell Veneur to flush it to the global instance by including a veneurglobalonly tag in the metric's packet. This veneurglobalonly tag is stripped and will not be passed on to sinks.

Note: For global counters to report correctly, the local and global Veneur instances should be configured to have the same flush interval.

Note: Global gauges are "random write wins" since they are merged in a non-deterministic order at the global Veneur.

Routing metrics

Veneur supports specifying that metrics should only be routed to a specific metric sink, with the veneursinkonly:<sink_name> tag. The <sink_name> value can be any configured metric sink. Currently, that's datadog, kafka, signalfx. It's possible to specify multiple sink destination tags on a metric, which will cause the metric to be routed to each sink specified.
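A sketch of how a client might attach the magic tags described above follows. The tag names come from this README; the packet and routeTo helpers and the env:dev tag are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Magic tag names as given in the README.
const (
	localOnlyTag  = "veneurlocalonly"
	globalOnlyTag = "veneurglobalonly"
	sinkOnlyKey   = "veneursinkonly"
)

// routeTo returns a copy of tags with one veneursinkonly:<sink_name> tag
// appended per requested sink.
func routeTo(tags []string, sinks ...string) []string {
	out := append([]string(nil), tags...)
	for _, s := range sinks {
		out = append(out, sinkOnlyKey+":"+s)
	}
	return out
}

// packet renders name:value|type|#tag1,tag2 (hypothetical helper).
func packet(name, value, metricType string, tags []string) string {
	p := name + ":" + value + "|" + metricType
	if len(tags) > 0 {
		p += "|#" + strings.Join(tags, ",")
	}
	return p
}

func main() {
	// Histogram kept strictly host-local.
	fmt.Println(packet("foo", "1", "h", []string{localOnlyTag}))
	// Counter accumulated at the global instance.
	fmt.Println(packet("requests", "1", "c", []string{globalOnlyTag}))
	// Counter routed to two specific sinks.
	fmt.Println(packet("foo", "1", "c", routeTo([]string{"env:dev"}, "datadog", "kafka")))
}
```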

Configuration

Veneur expects to have a config file supplied via -f PATH. The included example.yaml explains all the options!

The config file can be validated using a pair of flags:

  • -validate-config: checks that the config file specified via -f is valid YAML, and has correct datatypes for all fields.
  • -validate-config-strict: checks the above, and also that there are no unknown fields.

Configuration via Environment Variables

Veneur and veneur-proxy each allow configuration via environment variables using envconfig. Options provided via environment variables take precedence over those in the config file. This allows stuff like:

VENEUR_DEBUG=true veneur -f someconfig.yml

Note: The environment variables used for configuration map to the field names in config.go, capitalized, with the prefix VENEUR_. For example, the environment variable equivalent of datadog_api_hostname is VENEUR_DATADOGAPIHOSTNAME.

You may specify configurations that are arrays by separating them with a comma, for example VENEUR_AGGREGATES="min,max"
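The naming rule can be sketched as follows. envName is a hypothetical helper, and it assumes that the Go field name is simply the YAML key with underscores removed, which holds for the example above but is not guaranteed for every field.

```go
package main

import (
	"fmt"
	"strings"
)

// envName derives the environment variable for a YAML key, assuming the Go
// field name equals the key with underscores removed (an assumption).
func envName(yamlKey string) string {
	return "VENEUR_" + strings.ToUpper(strings.Replace(yamlKey, "_", "", -1))
}

// splitList shows how a comma-separated value maps to an array option.
func splitList(value string) []string {
	return strings.Split(value, ",")
}

func main() {
	fmt.Println(envName("datadog_api_hostname"))
	fmt.Println(envName("aggregates"), "=", splitList("min,max"))
}
```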

Monitoring

Here are the important things to monitor with Veneur:

At Local Node

When running as a local instance, you will be primarily concerned with the following metrics:

  • veneur.flush*.error_total as a count of errors when flushing metrics. This should rarely happen. Occasional errors are fine, but sustained errors indicate a problem.

Forwarding

If you are forwarding metrics to central Veneur, you'll want to monitor these:

  • veneur.forward.error_total and the cause tag. This should pretty much never happen and definitely not be sustained.
  • veneur.forward.duration_ns and veneur.forward.duration_ns.count. These metrics track the per-host time spent performing a forward. The time should be minimal!

At Global Node

When forwarding you'll want to also monitor the global nodes you're using for aggregation:

  • veneur.import.request_error_total and the cause tag. This should pretty much never happen and definitely not be sustained.
  • veneur.import.response_duration_ns and veneur.import.response_duration_ns.count to monitor duration and number of received forwards. This should not fail and not take very long. How long it takes will depend on how many metrics you're forwarding.
  • And the same veneur.flush.* metrics from the "At Local Node" section.

Metrics

Veneur will emit metrics to the stats_address configured above in DogStatsD form. Those metrics are:

  • veneur.sink.metric_flush_total_duration_ns.* - Duration of flushes per-sink, tagged by sink.
  • veneur.packet.error_total - Number of packets that Veneur could not parse due to some sort of formatting error by the client. Tagged by packet_type and reason.
  • veneur.forward.post_metrics_total - Indicates how many metrics are being forwarded in a given POST request. A "metric", in this context, refers to a unique combination of name, tags and metric type.
  • veneur.*.content_length_bytes.* - The number of bytes in a single POST body. Remember that Veneur POSTs large sets of metrics in multiple separate bodies in parallel. Uses a histogram, so there are multiple metrics generated depending on your local DogStatsD config.
  • veneur.forward.duration_ns - Same as flush.duration_ns, but for forwarding requests.
  • veneur.flush.error_total - Number of errors received POSTing via sinks.
  • veneur.forward.error_total - Number of errors received POSTing to an upstream Veneur. See also import.request_error_total below.
  • veneur.gc.number - Number of completed GC cycles.
  • veneur.gc.pause_total_ns - Total nanoseconds of stop-the-world (STW) GC pause time since the program started.
  • veneur.mem.heap_alloc_bytes - Total bytes of allocated heap objects, both reachable and unreachable but not yet collected.
  • veneur.worker.metrics_processed_total - Total number of metric packets processed between flushes by workers, tagged by worker. This helps you find hot spots where a single worker is handling a lot of metrics. The sum across all workers should be approximately proportional to the number of packets received.
  • veneur.worker.metrics_flushed_total - Total number of metrics flushed at each flush time, tagged by metric_type. A "metric", in this context, refers to a unique combination of name, tags and metric type. You can use this metric to detect when your clients are introducing new instrumentation, or when you acquire new clients.
  • veneur.worker.metrics_imported_total - Total number of metrics received via the importing endpoint. A "metric", in this context, refers to a unique combination of name, tags, type and originating host. This metric indicates how much of a Veneur instance's load is coming from imports.
  • veneur.import.response_duration_ns - Time spent responding to import HTTP requests. This metric is broken into part tags for request (time spent blocking the client) and merge (time spent sending metrics to workers).
  • veneur.import.request_error_total - A counter for the number of import requests that have errored out. You can use this for monitoring and alerting when imports fail.

Error Handling

In addition to logging, Veneur will dutifully send any errors it generates to a Sentry instance. This will occur if you set the sentry_dsn configuration option. Not setting the option will disable Sentry reporting.

Performance

Processing packets quickly is the name of the game.

Benchmarks

The common use case for Veneur is as an aggregator and host-local replacement for DogStatsD, therefore processing UDP fast is no longer the priority. That said, we were processing > 60k packets/second in production before shifting to the current local aggregation method. This outperformed both the Datadog-provided DogStatsD and StatsD in our infrastructure.

SO_REUSEPORT

As other implementations have observed, there's a limit to how many UDP packets a single kernel thread can consume before it starts to fall over. Veneur supports the SO_REUSEPORT socket option on Linux, allowing multiple threads to share the UDP socket with kernel-space balancing between them. If you've tried throwing more cores at Veneur and it's just not going fast enough, this feature can probably help by allowing more of those cores to work on the socket (which is Veneur's hottest code path by far). Note that this is only supported on Linux (right now). We have not added support for other platforms, like darwin and BSDs.

TCP connections

Veneur supports reading the statsd protocol from TCP connections. This is mostly to support TLS encryption and authentication, but might be useful on its own. Since TCP is a continuous stream of bytes, this requires each stat to be terminated by a new line character ('\n'). Most statsd clients only add new lines between stats within a single UDP packet, and omit the final trailing new line. This means you will likely need to modify your client to use this feature.
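A minimal sketch of the framing a TCP client needs is shown below. frameForTCP and writeStats are hypothetical helpers; the bytes.Buffer stands in for a connection returned by net.Dial("tcp", ...).

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// frameForTCP joins stats with newlines and, unlike most UDP clients, also
// terminates the final stat with '\n'.
func frameForTCP(stats []string) string {
	if len(stats) == 0 {
		return ""
	}
	return strings.Join(stats, "\n") + "\n"
}

// writeStats writes newline-terminated stats to any stream, such as a net.Conn.
func writeStats(w io.Writer, stats []string) error {
	_, err := io.WriteString(w, frameForTCP(stats))
	return err
}

func main() {
	var buf bytes.Buffer // stand-in for a TCP connection
	if err := writeStats(&buf, []string{"foo:1|c", "bar:2.5|g"}); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Printf("%q\n", buf.String())
}
```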

TLS encryption and authentication

If you specify the tls_key and tls_certificate options, Veneur will only accept TLS connections on its TCP port. This allows the metrics sent to Veneur to be encrypted.

If you specify the tls_authority_certificate option, Veneur will require clients to present a client certificate, signed by this authority. This ensures that only authenticated clients can connect.

You can generate your own set of keys using openssl:

# Generate the authority key and certificate (2048-bit RSA signed using SHA-256)
openssl genrsa -out cakey.pem 2048
openssl req -new -x509 -sha256 -key cakey.pem -out cacert.pem -days 1095 -subj "/O=Example Inc/CN=Example Certificate Authority"

# Generate the server key and certificate, signed by the authority
openssl genrsa -out serverkey.pem 2048
openssl req -new -sha256 -key serverkey.pem -out serverkey.csr -days 1095 -subj "/O=Example Inc/CN=veneur.example.com"
openssl x509 -sha256 -req -in serverkey.csr -CA cacert.pem -CAkey cakey.pem -CAcreateserial -out servercert.pem -days 1095

# Generate a client key and certificate, signed by the authority
openssl genrsa -out clientkey.pem 2048
openssl req -new -sha256 -key clientkey.pem -out clientkey.csr -days 1095 -subj "/O=Example Inc/CN=Veneur client key"
openssl x509 -sha256 -req -in clientkey.csr -CA cacert.pem -CAkey cakey.pem -CAcreateserial -out clientcert.pem -days 1095

Set statsd_listen_addresses, tls_key, tls_certificate, and tls_authority_certificate:

statsd_listen_addresses:
  - "tcp://localhost:8129"
tls_certificate: |
  -----BEGIN CERTIFICATE-----
  MIIC8TCCAdkCCQDc2V7P5nCDLjANBgkqhkiG9w0BAQsFADBAMRUwEwYDVQQKEwxC
  ...
  -----END CERTIFICATE-----
tls_key: |
  -----BEGIN RSA PRIVATE KEY-----
  MIIEpAIBAAKCAQEA7Sntp4BpEYGzgwQR8byGK99YOIV2z88HHtPDwdvSP0j5ZKdg
  ...
  -----END RSA PRIVATE KEY-----
tls_authority_certificate: |
  -----BEGIN CERTIFICATE-----
  ...
  -----END CERTIFICATE-----

Performance implications of TLS

Establishing a TLS connection is fairly expensive, so you should reuse connections as much as possible. RSA keys are also far more expensive than using ECDH keys. Using localhost on a machine with one CPU, Veneur was able to establish ~700 connections/second using ECDH prime256v1 keys, but only ~110 connections/second using RSA 2048-bit keys. According to the Go profiling for a Veneur instance using TLS with RSA keys, approximately 25% of the CPU time was in the TLS handshake, and 13% was decrypting data.
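Since the handshake dominates the cost, a client would typically dial once and write many stats over the same connection. The sketch below assumes the PEM files produced by the openssl commands earlier and the example listen address tcp://localhost:8129; clientTLSConfig is a hypothetical helper. Recent Go versions verify host names against subject alternative names, which those openssl commands do not set, so the handshake may be rejected unless the server certificate is reissued with a SAN.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// clientTLSConfig builds a client configuration that trusts the given
// authority and presents a client certificate (hypothetical helper).
func clientTLSConfig(caPEM, certPEM, keyPEM []byte, serverName string) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificate found in PEM input")
	}
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, err
	}
	return &tls.Config{
		RootCAs:      pool,
		Certificates: []tls.Certificate{cert},
		ServerName:   serverName,
	}, nil
}

func main() {
	ca, err1 := os.ReadFile("cacert.pem")
	cert, err2 := os.ReadFile("clientcert.pem")
	key, err3 := os.ReadFile("clientkey.pem")
	if err1 != nil || err2 != nil || err3 != nil {
		fmt.Println("PEM files not found; generate them with openssl first")
		return
	}
	cfg, err := clientTLSConfig(ca, cert, key, "veneur.example.com")
	if err != nil {
		fmt.Println("bad TLS material:", err)
		return
	}
	conn, err := tls.Dial("tcp", "localhost:8129", cfg) // one handshake
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
	// Reuse the same connection for every stat; each ends with '\n'.
	for i := 0; i < 3; i++ {
		if _, err := fmt.Fprintf(conn, "foo:1|c\n"); err != nil {
			fmt.Println("write failed:", err)
			return
		}
	}
}
```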

Name

The veneur is a person acting as superintendent of the chase and especially of hounds in French medieval venery and being an important officer of the royal household. In other words, it is the master of dogs. :)

Documentation


Constants

const REDACTED = "REDACTED"

REDACTED is used to replace values that we don't want to leak into loglines (e.g., credentials)

Variables

var BUILD_DATE = defaultLinkValue
var VERSION = defaultLinkValue

VERSION stores the current veneur version. It must be a var so it can be set at link time.

Functions

func CalculateTickDelay added in v1.8.0

func CalculateTickDelay(interval time.Duration, t time.Time) time.Duration

CalculateTickDelay takes the provided time, `Truncate`s it to a rounded-down multiple of `interval`, then adds `interval` back to find the "next" tick.
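The description translates almost directly into Go. This is a sketch written from the sentence above, not a copy of the package's source; tickDelay and delaySeconds are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// tickDelay truncates t to a multiple of interval, adds interval to reach the
// next tick, and returns how long to wait from t until then.
func tickDelay(interval time.Duration, t time.Time) time.Duration {
	return t.Truncate(interval).Add(interval).Sub(t)
}

// delaySeconds is a convenience wrapper working in whole seconds.
func delaySeconds(intervalSec, unixSec int64) int64 {
	d := tickDelay(time.Duration(intervalSec)*time.Second, time.Unix(unixSec, 0))
	return int64(d / time.Second)
}

func main() {
	t := time.Date(2018, 11, 12, 9, 30, 7, 0, time.UTC)
	fmt.Println(tickDelay(10*time.Second, t)) // prints "3s"
}
```

Note that under this formula a time that already sits exactly on a tick waits a full interval.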

func ConsumePanic added in v1.5.0

func ConsumePanic(sentry *raven.Client, cl *trace.Client, hostname string, err interface{})

ConsumePanic is intended to be called inside a deferred function when recovering from a panic. It accepts the value of recover() as its err argument, reports the panic to Sentry, prints the stack, and then repanics (to ensure your program terminates).
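The calling pattern can be sketched without Sentry. Here reportAndRepanic is a hypothetical stand-in for ConsumePanic that takes a reporting callback; returning early when nothing was recovered is an assumption about the real function's behavior.

```go
package main

import "fmt"

// reportAndRepanic mimics the shape of ConsumePanic: report the recovered
// value, then panic again so the program still terminates.
func reportAndRepanic(report func(interface{}), recovered interface{}) {
	if recovered == nil {
		return // no panic in flight (assumed behavior)
	}
	report(recovered)
	panic(recovered)
}

// guarded runs work with the deferred recovery hook installed. recover() must
// be called directly by the deferred function, so it is evaluated there and
// its result passed along.
func guarded(work func(), report func(interface{})) {
	defer func() { reportAndRepanic(report, recover()) }()
	work()
}

func main() {
	guarded(
		func() { fmt.Println("work finished without panicking") },
		func(v interface{}) { fmt.Println("reporting panic:", v) },
	)
}
```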

func SetLogger

func SetLogger(logger *logrus.Logger)

SetLogger sets the default logger in veneur to the passed value.

func StartSSF added in v1.7.0

func StartSSF(s *Server, a net.Addr, tracePool *sync.Pool) net.Addr

StartSSF starts listening for SSF on an address a, and returns the concrete address that the server is listening on.

func StartStatsd added in v1.7.0

func StartStatsd(s *Server, a net.Addr, packetPool *sync.Pool) net.Addr

StartStatsd spawns a goroutine that listens for metrics in statsd format on the address a, and returns the concrete listening address. As this is a setup routine, if any error occurs, it panics.

Types

type Config

type Config struct {
	Aggregates                    []string  `yaml:"aggregates"`
	AwsAccessKeyID                string    `yaml:"aws_access_key_id"`
	AwsRegion                     string    `yaml:"aws_region"`
	AwsS3Bucket                   string    `yaml:"aws_s3_bucket"`
	AwsSecretAccessKey            string    `yaml:"aws_secret_access_key"`
	BlockProfileRate              int       `yaml:"block_profile_rate"`
	DatadogAPIHostname            string    `yaml:"datadog_api_hostname"`
	DatadogAPIKey                 string    `yaml:"datadog_api_key"`
	DatadogFlushMaxPerBody        int       `yaml:"datadog_flush_max_per_body"`
	DatadogSpanBufferSize         int       `yaml:"datadog_span_buffer_size"`
	DatadogTraceAPIAddress        string    `yaml:"datadog_trace_api_address"`
	Debug                         bool      `yaml:"debug"`
	DebugFlushedMetrics           bool      `yaml:"debug_flushed_metrics"`
	DebugIngestedSpans            bool      `yaml:"debug_ingested_spans"`
	EnableProfiling               bool      `yaml:"enable_profiling"`
	FalconerAddress               string    `yaml:"falconer_address"`
	FlushFile                     string    `yaml:"flush_file"`
	FlushMaxPerBody               int       `yaml:"flush_max_per_body"`
	ForwardAddress                string    `yaml:"forward_address"`
	ForwardUseGrpc                bool      `yaml:"forward_use_grpc"`
	GrpcAddress                   string    `yaml:"grpc_address"`
	Hostname                      string    `yaml:"hostname"`
	HTTPAddress                   string    `yaml:"http_address"`
	IndicatorSpanTimerName        string    `yaml:"indicator_span_timer_name"`
	Interval                      string    `yaml:"interval"`
	KafkaBroker                   string    `yaml:"kafka_broker"`
	KafkaCheckTopic               string    `yaml:"kafka_check_topic"`
	KafkaEventTopic               string    `yaml:"kafka_event_topic"`
	KafkaMetricBufferBytes        int       `yaml:"kafka_metric_buffer_bytes"`
	KafkaMetricBufferFrequency    string    `yaml:"kafka_metric_buffer_frequency"`
	KafkaMetricBufferMessages     int       `yaml:"kafka_metric_buffer_messages"`
	KafkaMetricRequireAcks        string    `yaml:"kafka_metric_require_acks"`
	KafkaMetricTopic              string    `yaml:"kafka_metric_topic"`
	KafkaPartitioner              string    `yaml:"kafka_partitioner"`
	KafkaRetryMax                 int       `yaml:"kafka_retry_max"`
	KafkaSpanBufferBytes          int       `yaml:"kafka_span_buffer_bytes"`
	KafkaSpanBufferFrequency      string    `yaml:"kafka_span_buffer_frequency"`
	KafkaSpanBufferMesages        int       `yaml:"kafka_span_buffer_mesages"`
	KafkaSpanRequireAcks          string    `yaml:"kafka_span_require_acks"`
	KafkaSpanSampleRatePercent    int       `yaml:"kafka_span_sample_rate_percent"`
	KafkaSpanSampleTag            string    `yaml:"kafka_span_sample_tag"`
	KafkaSpanSerializationFormat  string    `yaml:"kafka_span_serialization_format"`
	KafkaSpanTopic                string    `yaml:"kafka_span_topic"`
	LightstepAccessToken          string    `yaml:"lightstep_access_token"`
	LightstepCollectorHost        string    `yaml:"lightstep_collector_host"`
	LightstepMaximumSpans         int       `yaml:"lightstep_maximum_spans"`
	LightstepNumClients           int       `yaml:"lightstep_num_clients"`
	LightstepReconnectPeriod      string    `yaml:"lightstep_reconnect_period"`
	MetricMaxLength               int       `yaml:"metric_max_length"`
	MutexProfileFraction          int       `yaml:"mutex_profile_fraction"`
	NumReaders                    int       `yaml:"num_readers"`
	NumSpanWorkers                int       `yaml:"num_span_workers"`
	NumWorkers                    int       `yaml:"num_workers"`
	OmitEmptyHostname             bool      `yaml:"omit_empty_hostname"`
	Percentiles                   []float64 `yaml:"percentiles"`
	ReadBufferSizeBytes           int       `yaml:"read_buffer_size_bytes"`
	SentryDsn                     string    `yaml:"sentry_dsn"`
	SignalfxAPIKey                string    `yaml:"signalfx_api_key"`
	SignalfxEndpointBase          string    `yaml:"signalfx_endpoint_base"`
	SignalfxHostnameTag           string    `yaml:"signalfx_hostname_tag"`
	SignalfxMetricNamePrefixDrops []string  `yaml:"signalfx_metric_name_prefix_drops"`
	SignalfxMetricTagPrefixDrops  []string  `yaml:"signalfx_metric_tag_prefix_drops"`
	SignalfxPerTagAPIKeys         []struct {
		APIKey string `yaml:"api_key"`
		Name   string `yaml:"name"`
	} `yaml:"signalfx_per_tag_api_keys"`
	SignalfxVaryKeyBy                 string   `yaml:"signalfx_vary_key_by"`
	SpanChannelCapacity               int      `yaml:"span_channel_capacity"`
	SplunkHecAddress                  string   `yaml:"splunk_hec_address"`
	SplunkHecBatchSize                int      `yaml:"splunk_hec_batch_size"`
	SplunkHecConnectionLifetimeJitter string   `yaml:"splunk_hec_connection_lifetime_jitter"`
	SplunkHecIngestTimeout            string   `yaml:"splunk_hec_ingest_timeout"`
	SplunkHecMaxConnectionLifetime    string   `yaml:"splunk_hec_max_connection_lifetime"`
	SplunkHecSendTimeout              string   `yaml:"splunk_hec_send_timeout"`
	SplunkHecSubmissionWorkers        int      `yaml:"splunk_hec_submission_workers"`
	SplunkHecTLSValidateHostname      string   `yaml:"splunk_hec_tls_validate_hostname"`
	SplunkHecToken                    string   `yaml:"splunk_hec_token"`
	SplunkSpanSampleRate              int      `yaml:"splunk_span_sample_rate"`
	SsfBufferSize                     int      `yaml:"ssf_buffer_size"`
	SsfListenAddresses                []string `yaml:"ssf_listen_addresses"`
	StatsAddress                      string   `yaml:"stats_address"`
	StatsdListenAddresses             []string `yaml:"statsd_listen_addresses"`
	SynchronizeWithInterval           bool     `yaml:"synchronize_with_interval"`
	Tags                              []string `yaml:"tags"`
	TagsExclude                       []string `yaml:"tags_exclude"`
	TLSAuthorityCertificate           string   `yaml:"tls_authority_certificate"`
	TLSCertificate                    string   `yaml:"tls_certificate"`
	TLSKey                            string   `yaml:"tls_key"`
	TraceLightstepAccessToken         string   `yaml:"trace_lightstep_access_token"`
	TraceLightstepCollectorHost       string   `yaml:"trace_lightstep_collector_host"`
	TraceLightstepMaximumSpans        int      `yaml:"trace_lightstep_maximum_spans"`
	TraceLightstepNumClients          int      `yaml:"trace_lightstep_num_clients"`
	TraceLightstepReconnectPeriod     string   `yaml:"trace_lightstep_reconnect_period"`
	TraceMaxLengthBytes               int      `yaml:"trace_max_length_bytes"`
}

func ReadConfig

func ReadConfig(path string) (c Config, err error)

ReadConfig unmarshals the config file and slurps in its data. ReadConfig can return an error of type *UnknownConfigKeys, which means that the file is usable, but contains unknown fields.

func (Config) ParseInterval

func (c Config) ParseInterval() (time.Duration, error)

ParseInterval handles parsing the flush interval as a time.Duration
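Interval is a string field, so a value such as "10s" has to be converted before use. A minimal sketch, assuming the conversion is equivalent to time.ParseDuration; the config type and intervalSeconds helper are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

// config is a stand-in for the Interval field of Config.
type config struct {
	Interval string
}

// parseInterval converts the configured string into a time.Duration, assuming
// the standard Go duration syntax ("10s", "1m30s", ...).
func (c config) parseInterval() (time.Duration, error) {
	return time.ParseDuration(c.Interval)
}

// intervalSeconds returns the interval in seconds, or -1 if it cannot be parsed.
func intervalSeconds(s string) float64 {
	d, err := config{Interval: s}.parseInterval()
	if err != nil {
		return -1
	}
	return d.Seconds()
}

func main() {
	d, err := config{Interval: "10s"}.parseInterval()
	fmt.Println(d, err)
}
```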

type Consul added in v1.5.0

type Consul struct {
	ConsulHealth *api.Health
}

Consul is a Discoverer that uses Consul to find healthy instances of a given name.

func NewConsul added in v1.5.0

func NewConsul(config *api.Config) (*Consul, error)

NewConsul creates a new instance of a Consul Discoverer

func (*Consul) GetDestinationsForService added in v1.5.0

func (c *Consul) GetDestinationsForService(serviceName string) ([]string, error)

GetDestinationsForService updates the list of destinations based on healthy nodes found via Consul. It returns destinations in the form "<host>:<port>".

type DatadogTraceSpan

type DatadogTraceSpan struct {
	Duration int64              `json:"duration"`
	Error    int64              `json:"error"`
	Meta     map[string]string  `json:"meta"`
	Metrics  map[string]float64 `json:"metrics"`
	Name     string             `json:"name"`
	ParentID int64              `json:"parent_id,omitempty"`
	Resource string             `json:"resource,omitempty"`
	Service  string             `json:"service"`
	SpanID   int64              `json:"span_id"`
	Start    int64              `json:"start"`
	TraceID  int64              `json:"trace_id"`
	Type     string             `json:"type"`
}

DatadogTraceSpan represents a trace span as JSON for the Datadog tracing API.

type Discoverer added in v1.5.0

type Discoverer interface {
	GetDestinationsForService(string) ([]string, error)
}

Discoverer is an interface for various service discovery mechanisms. You could implement your own by implementing this method! See consul.go
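As an illustration of implementing your own, here is a Discoverer backed by a fixed map. The interface is repeated from the definition above so the example is self-contained; StaticDiscoverer, the service name and the address are hypothetical.

```go
package main

import "fmt"

// Discoverer is repeated from the package documentation.
type Discoverer interface {
	GetDestinationsForService(string) ([]string, error)
}

// StaticDiscoverer maps service names to a fixed list of "<host>:<port>"
// destinations. It is a hypothetical example, not part of the package.
type StaticDiscoverer map[string][]string

// GetDestinationsForService returns a copy of the destinations configured for
// the service, or an error if none are known.
func (s StaticDiscoverer) GetDestinationsForService(name string) ([]string, error) {
	dests, ok := s[name]
	if !ok || len(dests) == 0 {
		return nil, fmt.Errorf("no destinations for service %q", name)
	}
	return append([]string(nil), dests...), nil
}

// Compile-time check that StaticDiscoverer satisfies the interface.
var _ Discoverer = StaticDiscoverer{}

func main() {
	d := StaticDiscoverer{"veneur-global": {"10.0.0.1:8127", "10.0.0.2:8127"}}
	fmt.Println(d.GetDestinationsForService("veneur-global"))
}
```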

type EventWorker

type EventWorker struct {
	// contains filtered or unexported fields
}

EventWorker is similar to a Worker but it collects events and service checks instead of metrics.

func NewEventWorker

func NewEventWorker(cl *trace.Client, stats *statsd.Client) *EventWorker

NewEventWorker creates an EventWorker ready to collect events and service checks.

func (*EventWorker) Flush

func (ew *EventWorker) Flush() []ssf.SSFSample

Flush returns the EventWorker's stored events and service checks and resets the stored contents.
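The "return and reset" behavior can be sketched as a swap under a mutex. The `eventBuffer` type below is a hypothetical stand-in that stores plain strings; EventWorker's fields are unexported, so how it actually stores samples is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// eventBuffer is a hypothetical stand-in for a worker's stored samples.
type eventBuffer struct {
	mu      sync.Mutex
	samples []string
}

// add appends one sample to the buffer.
func (b *eventBuffer) add(s string) {
	b.mu.Lock()
	b.samples = append(b.samples, s)
	b.mu.Unlock()
}

// flush returns everything stored so far and leaves the buffer empty,
// so the next flush only sees samples added after this call.
func (b *eventBuffer) flush() []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.samples
	b.samples = nil
	return out
}

func main() {
	b := &eventBuffer{}
	b.add("deploy started")
	b.add("healthcheck ok")
	fmt.Println(len(b.flush()), len(b.flush()))
}
```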

func (*EventWorker) Work

func (ew *EventWorker) Work()

Work will start the EventWorker listening for events and service checks. This function will never return.

type KubernetesDiscoverer

type KubernetesDiscoverer struct {
	// contains filtered or unexported fields
}

func NewKubernetesDiscoverer

func NewKubernetesDiscoverer() (*KubernetesDiscoverer, error)

func (*KubernetesDiscoverer) GetDestinationsForService

func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([]string, error)

type Proxy added in v1.5.0

type Proxy struct {
	Sentry                     *raven.Client
	Hostname                   string
	ForwardDestinations        *consistent.Consistent
	TraceDestinations          *consistent.Consistent
	ForwardGRPCDestinations    *consistent.Consistent
	Discoverer                 Discoverer
	ConsulForwardService       string
	ConsulTraceService         string
	ConsulForwardGRPCService   string
	ConsulInterval             time.Duration
	MetricsInterval            time.Duration
	ForwardDestinationsMtx     sync.Mutex
	TraceDestinationsMtx       sync.Mutex
	ForwardGRPCDestinationsMtx sync.Mutex
	HTTPAddr                   string
	HTTPClient                 *http.Client
	AcceptingForwards          bool
	AcceptingTraces            bool
	AcceptingGRPCForwards      bool
	ForwardTimeout             time.Duration

	TraceClient *trace.Client
	// contains filtered or unexported fields
}

func NewProxyFromConfig added in v1.5.0

func NewProxyFromConfig(logger *logrus.Logger, conf ProxyConfig) (p Proxy, err error)

func (*Proxy) HTTPServe added in v1.5.0

func (p *Proxy) HTTPServe()

HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.

func (*Proxy) Handler added in v1.5.0

func (p *Proxy) Handler() http.Handler

Handler returns the Handler responsible for routing request processing.

func (*Proxy) ProxyMetrics added in v1.5.0

func (p *Proxy) ProxyMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric, origin string)

ProxyMetrics takes a slice of JSONMetrics and breaks them up into multiple HTTP requests by MetricKey using the hash ring.
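The idea of batching by hash ring can be sketched with a small hand-rolled ring. This is a simplified stand-in written with the standard library only, not the `consistent.Consistent` type the Proxy holds; the `ring` type, the FNV hash, the replica count, and the string keys are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a simplified consistent-hash ring (hypothetical, for illustration).
type ring struct {
	points []uint32          // sorted positions on the ring
	owner  map[uint32]string // position -> destination
}

func hashKey(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newRing places each destination at several positions on the ring.
func newRing(dests []string, replicas int) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, d := range dests {
		for i := 0; i < replicas; i++ {
			p := hashKey(fmt.Sprintf("%s#%d", d, i))
			r.points = append(r.points, p)
			r.owner[p] = d
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// get returns the destination owning the first position at or after
// the key's hash, wrapping around at the end of the ring.
func (r *ring) get(key string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashKey(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owner[r.points[i]]
}

// groupByDestination batches keys so each destination needs one request.
func groupByDestination(r *ring, keys []string) map[string][]string {
	batches := make(map[string][]string)
	for _, k := range keys {
		d := r.get(k)
		batches[d] = append(batches[d], k)
	}
	return batches
}

func main() {
	r := newRing([]string{"10.0.0.1:8127", "10.0.0.2:8127"}, 16)
	keys := []string{"api.latency", "api.errors", "db.queries", "cache.hits"}
	for dest, batch := range groupByDestination(r, keys) {
		fmt.Println(dest, batch)
	}
}
```

Because a given key always hashes to the same position, every sample for that key reaches the same global instance, which is what makes global aggregation possible.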

func (*Proxy) ProxyTraces added in v1.5.0

func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)

func (*Proxy) RefreshDestinations added in v1.5.0

func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)

RefreshDestinations updates the server's list of valid destinations for flushing. This should be called periodically to ensure we have the latest data.
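A periodic refresh of this kind might look like the sketch below. The `destinationList` type and the injected `discover` function are hypothetical. Keeping the previous list when discovery fails or returns nothing is a design choice assumed here, not behavior documented on this page.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// destinationList is a hypothetical mutex-guarded set of destinations.
type destinationList struct {
	mu    sync.Mutex
	addrs []string
}

// refresh replaces the list with whatever discover returns. On an error
// or an empty result it leaves the previous list in place (assumption).
func (d *destinationList) refresh(discover func() ([]string, error)) error {
	found, err := discover()
	if err != nil {
		return err
	}
	if len(found) == 0 {
		return errors.New("discovery returned no destinations; keeping previous list")
	}
	d.mu.Lock()
	d.addrs = found
	d.mu.Unlock()
	return nil
}

// snapshot returns a copy of the current list.
func (d *destinationList) snapshot() []string {
	d.mu.Lock()
	defer d.mu.Unlock()
	return append([]string(nil), d.addrs...)
}

func main() {
	// Simulated discovery results for three consecutive ticks.
	results := [][]string{{"10.0.0.1:8127"}, {}, {"10.0.0.1:8127", "10.0.0.2:8127"}}
	d := &destinationList{}
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for _, res := range results {
		<-ticker.C
		res := res
		if err := d.refresh(func() ([]string, error) { return res, nil }); err != nil {
			fmt.Println("refresh skipped:", err)
		}
		fmt.Println(d.snapshot())
	}
}
```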

func (*Proxy) ReportRuntimeMetrics

func (p *Proxy) ReportRuntimeMetrics()

func (*Proxy) Serve

func (p *Proxy) Serve()

Start all of the configured servers (gRPC or HTTP) and block until one of them exits. At that point, stop them both.

func (*Proxy) Shutdown added in v1.5.0

func (p *Proxy) Shutdown()

Shutdown signals the server to shut down after closing all current connections.

func (*Proxy) Start added in v1.5.0

func (p *Proxy) Start()

Start fires up the various goroutines that run on behalf of the server. This is separated from the constructor for testing convenience.

type ProxyConfig added in v1.5.0

type ProxyConfig struct {
	ConsulForwardGrpcServiceName string `yaml:"consul_forward_grpc_service_name"`
	ConsulForwardServiceName     string `yaml:"consul_forward_service_name"`
	ConsulRefreshInterval        string `yaml:"consul_refresh_interval"`
	ConsulTraceServiceName       string `yaml:"consul_trace_service_name"`
	Debug                        bool   `yaml:"debug"`
	EnableProfiling              bool   `yaml:"enable_profiling"`
	ForwardAddress               string `yaml:"forward_address"`
	ForwardTimeout               string `yaml:"forward_timeout"`
	GrpcAddress                  string `yaml:"grpc_address"`
	GrpcForwardAddress           string `yaml:"grpc_forward_address"`
	HTTPAddress                  string `yaml:"http_address"`
	IdleConnectionTimeout        string `yaml:"idle_connection_timeout"`
	MaxIdleConns                 int    `yaml:"max_idle_conns"`
	MaxIdleConnsPerHost          int    `yaml:"max_idle_conns_per_host"`
	RuntimeMetricsInterval       string `yaml:"runtime_metrics_interval"`
	SentryDsn                    string `yaml:"sentry_dsn"`
	SsfDestinationAddress        string `yaml:"ssf_destination_address"`
	StatsAddress                 string `yaml:"stats_address"`
	TraceAddress                 string `yaml:"trace_address"`
	TraceAPIAddress              string `yaml:"trace_api_address"`
	TracingClientCapacity        int    `yaml:"tracing_client_capacity"`
	TracingClientFlushInterval   string `yaml:"tracing_client_flush_interval"`
	TracingClientMetricsInterval string `yaml:"tracing_client_metrics_interval"`
}

func ReadProxyConfig added in v1.5.0

func ReadProxyConfig(path string) (c ProxyConfig, err error)

ReadProxyConfig unmarshals the proxy config file and slurps in its data.

type Server

type Server struct {
	Workers              []*Worker
	EventWorker          *EventWorker
	SpanChan             chan *ssf.SSFSpan
	SpanWorker           *SpanWorker
	SpanWorkerGoroutines int

	Statsd *statsd.Client
	Sentry *raven.Client

	Hostname  string
	Tags      []string
	TagsAsMap map[string]string

	HTTPClient *http.Client

	HTTPAddr string

	ForwardAddr string

	StatsdListenAddrs []net.Addr
	SSFListenAddrs    []net.Addr
	RcvbufBytes       int

	HistogramPercentiles []float64

	HistogramAggregates samplers.HistogramAggregates

	TraceClient *trace.Client
	// contains filtered or unexported fields
}

A Server is the actual veneur instance that will be run.

func NewFromConfig

func NewFromConfig(logger *logrus.Logger, conf Config) (*Server, error)

NewFromConfig creates a new veneur server from a configuration specification and sets up the passed logger according to the configuration.

func (*Server) Flush

func (s *Server) Flush(ctx context.Context)

Flush collects sampler's metrics and passes them to sinks.

func (*Server) HTTPServe

func (s *Server) HTTPServe()

HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.

func (*Server) HandleMetricPacket

func (s *Server) HandleMetricPacket(packet []byte) error

HandleMetricPacket processes each packet that is sent to the server, and sends to an appropriate worker (EventWorker or Worker).

func (*Server) HandleTracePacket

func (s *Server) HandleTracePacket(packet []byte)

HandleTracePacket accepts an incoming packet as bytes and sends it to the appropriate worker.

func (*Server) Handler

func (s *Server) Handler() http.Handler

Handler returns the Handler responsible for routing request processing.

func (*Server) ImportMetrics

func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)

ImportMetrics feeds a slice of JSON metrics to the server's workers.

func (*Server) IsLocal

func (s *Server) IsLocal() bool

IsLocal indicates whether veneur is running as a local instance (forwarding non-local data to a global veneur instance) or is running as a global instance (sending all data directly to the final destination).

func (*Server) ReadMetricSocket

func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)

ReadMetricSocket listens for available packets to handle.

func (*Server) ReadSSFPacketSocket added in v1.7.0

func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)

ReadSSFPacketSocket reads SSF packets off a packet connection.

func (*Server) ReadSSFStreamSocket added in v1.7.0

func (s *Server) ReadSSFStreamSocket(serverConn net.Conn)

ReadSSFStreamSocket reads a streaming connection in framed wire format off a streaming socket. See package github.com/stripe/veneur/protocol for details.

func (*Server) ReadTCPSocket added in v1.1.0

func (s *Server) ReadTCPSocket(listener net.Listener)

ReadTCPSocket listens on Server.TCPAddr for new connections, starting a goroutine for each.

func (*Server) Serve

func (s *Server) Serve()

Start all of the configured servers (gRPC or HTTP) and block until one of them exits. At that point, stop them both.

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown signals the server to shut down after closing all current connections.

func (*Server) Start added in v1.1.0

func (s *Server) Start()

Start spins up the Server to do actual work, firing off goroutines for various workers and utilities.

type SpanWorker added in v1.6.0

type SpanWorker struct {
	SpanChan <-chan *ssf.SSFSpan
	// contains filtered or unexported fields
}

SpanWorker is similar to a Worker but it collects spans instead of metrics.

func NewSpanWorker added in v1.6.0

func NewSpanWorker(sinks []sinks.SpanSink, cl *trace.Client, statsd *statsd.Client, spanChan <-chan *ssf.SSFSpan, commonTags map[string]string) *SpanWorker

NewSpanWorker creates a SpanWorker ready to collect spans.

func (*SpanWorker) Flush added in v1.6.0

func (tw *SpanWorker) Flush()

Flush invokes flush on each sink.

func (*SpanWorker) Work added in v1.6.0

func (tw *SpanWorker) Work()

Work will start the SpanWorker listening for spans. This function will never return.

type UnknownConfigKeys

type UnknownConfigKeys struct {
	// contains filtered or unexported fields
}

UnknownConfigKeys represents a failure to strictly parse a configuration YAML file, indicating that the file contains unknown keys.

func (*UnknownConfigKeys) Error

func (e *UnknownConfigKeys) Error() string

type Worker

type Worker struct {
	PacketChan       chan samplers.UDPMetric
	ImportChan       chan []samplers.JSONMetric
	ImportMetricChan chan []*metricpb.Metric
	QuitChan         chan struct{}
	// contains filtered or unexported fields
}

Worker is the doodad that does work.

func NewWorker

func NewWorker(id int, cl *trace.Client, logger *logrus.Logger, stats *statsd.Client) *Worker

NewWorker creates and returns a new Worker object.

func (*Worker) Flush

func (w *Worker) Flush() WorkerMetrics

Flush resets the worker's internal metrics and returns their contents.

func (*Worker) ImportMetric

func (w *Worker) ImportMetric(other samplers.JSONMetric)

ImportMetric receives a metric from another veneur instance

func (*Worker) ImportMetricGRPC

func (w *Worker) ImportMetricGRPC(other *metricpb.Metric) (err error)

ImportMetricGRPC receives a metric from another veneur instance over gRPC.

In practice, this is only called when in the aggregation tier, so we don't handle LocalOnly scope.

func (*Worker) IngestMetrics

func (w *Worker) IngestMetrics(ms []*metricpb.Metric)

func (*Worker) IngestUDP

func (w *Worker) IngestUDP(metric samplers.UDPMetric)

IngestUDP on a Worker feeds the metric into the worker's PacketChan.

func (*Worker) MetricsProcessedCount added in v1.2.0

func (w *Worker) MetricsProcessedCount() int64

MetricsProcessedCount is a convenience method for testing that allows us to fetch the Worker's processed count in a non-racy way.

func (*Worker) ProcessMetric

func (w *Worker) ProcessMetric(m *samplers.UDPMetric)

ProcessMetric takes a Metric and samples it

func (*Worker) Stop

func (w *Worker) Stop()

Stop tells the worker to stop listening for work requests.

Note that the worker will only stop *after* it has finished its work.

func (*Worker) Work

func (w *Worker) Work()

Work will start the worker listening for metrics to process or import. It will not return until the worker is sent a message to terminate using Stop().
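The Work and Stop pairing follows a common Go pattern: a select loop that exits when a quit channel is closed. The sketch below illustrates that pattern only. The `worker` type, its integer "packets", and the `run` helper are hypothetical, and closing the channel in `stop` is an assumption about how such a signal is typically sent.

```go
package main

import "fmt"

// worker is a hypothetical stand-in that sums integers it receives.
type worker struct {
	packets chan int
	quit    chan struct{}
	sum     int
}

func newWorker() *worker {
	return &worker{packets: make(chan int), quit: make(chan struct{})}
}

// work blocks, processing packets until stop is called.
func (w *worker) work() {
	for {
		select {
		case p := <-w.packets:
			w.sum += p // the current item is finished before quit is observed
		case <-w.quit:
			return
		}
	}
}

// stop signals the work loop to return.
func (w *worker) stop() { close(w.quit) }

// run feeds values to a worker, stops it, and returns the total.
func run(values []int) int {
	w := newWorker()
	done := make(chan struct{})
	go func() {
		w.work()
		close(done)
	}()
	for _, v := range values {
		w.packets <- v
	}
	w.stop()
	<-done // wait for work to return before reading sum
	return w.sum
}

func main() {
	fmt.Println(run([]int{1, 2, 3}))
}
```

Waiting on `done` before reading `sum` keeps the example free of data races.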

type WorkerMetrics

type WorkerMetrics struct {
	// contains filtered or unexported fields
}

WorkerMetrics is just a plain struct bundling together the flushed contents of a worker

func NewWorkerMetrics

func NewWorkerMetrics() WorkerMetrics

NewWorkerMetrics initializes a WorkerMetrics struct

func (WorkerMetrics) ForwardableMetrics

func (wm WorkerMetrics) ForwardableMetrics(cl *trace.Client) []*metricpb.Metric

ForwardableMetrics converts all metrics that should be forwarded to metricpb.Metric (protobuf-compatible).

func (WorkerMetrics) Upsert

func (wm WorkerMetrics) Upsert(mk samplers.MetricKey, Scope samplers.MetricScope, tags []string) bool

Upsert creates an entry on the WorkerMetrics struct for the given metrickey (if one does not already exist) and updates the existing entry (if one already exists). Returns true if the metric entry was created and false otherwise.
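The "create if absent, report whether created" contract can be sketched with a map. Everything below is a hypothetical stand-in: WorkerMetrics' fields are unexported, so the map-backed layout, the `metricKey` fields, and the `counter` type are assumptions for illustration.

```go
package main

import "fmt"

// metricKey, counter, and workerMetrics are hypothetical stand-ins.
type metricKey struct {
	Name       string
	Type       string
	JoinedTags string
}

type counter struct {
	tags  []string
	value int64
}

type workerMetrics struct {
	counters map[metricKey]*counter
}

func newWorkerMetrics() workerMetrics {
	return workerMetrics{counters: make(map[metricKey]*counter)}
}

// upsert ensures an entry exists for mk and reports whether it had to
// create one. A value receiver is enough here because the map inside
// the struct is shared between copies.
func (wm workerMetrics) upsert(mk metricKey, tags []string) bool {
	if _, present := wm.counters[mk]; present {
		return false
	}
	wm.counters[mk] = &counter{tags: tags}
	return true
}

func main() {
	wm := newWorkerMetrics()
	mk := metricKey{Name: "api.requests", Type: "counter", JoinedTags: "env:prod"}

	fmt.Println(wm.upsert(mk, []string{"env:prod"})) // entry is created
	wm.counters[mk].value++

	fmt.Println(wm.upsert(mk, []string{"env:prod"})) // entry already exists
	wm.counters[mk].value++

	fmt.Println(wm.counters[mk].value)
}
```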

Directories

Path Synopsis
cmd
Package forwardrpc is a generated protocol buffer package.
Package importsrv receives metrics over gRPC and sends them to workers.
internal
s3
Package protocol contains routines for implementing veneur's SSF wire protocol to read and write framed SSF samples on a streaming network link or other non-seekable medium.
Package proxysrv proxies metrics over gRPC to global veneurs using consistent hashing. The Server provided accepts a hash ring of destinations, and then listens for metrics over gRPC.
metricpb
Package metricpb is a generated protocol buffer package.
grpsink
Package grpsink is a generated protocol buffer package.
ssfmetrics
Package ssfmetrics provides sinks that are used by veneur internally.
Package ssf provides an implementation of the Sensor Sensibility Format.
Package tdigest provides an implementation of Ted Dunning's t-digest, an approximate histogram for online, distributed applications.
Package trace provides an experimental API for initiating traces.
metrics
Package metrics provides routines for conveniently reporting metrics attached to SSF spans.
