Documentation ¶
Index ¶
- Constants
- Variables
- func CalculateTickDelay(interval time.Duration, t time.Time) time.Duration
- func ConsumePanic(sentry *raven.Client, cl *trace.Client, hostname string, err interface{})
- func NewSocket(addr *net.UDPAddr, recvBuf int, reuseport bool) (net.PacketConn, error)
- func SetLogger(logger *logrus.Logger)
- func StartSSF(s *Server, a net.Addr, tracePool *sync.Pool) net.Addr
- func StartStatsd(s *Server, a net.Addr, packetPool *sync.Pool) net.Addr
- type Config
- type Consul
- type DatadogTraceSpan
- type Discoverer
- type EventWorker
- type KubernetesDiscoverer
- type Proxy
- func (p *Proxy) HTTPServe()
- func (p *Proxy) Handler() http.Handler
- func (p *Proxy) ProxyMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric, origin string)
- func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)
- func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)
- func (p *Proxy) ReportRuntimeMetrics()
- func (p *Proxy) Serve()
- func (p *Proxy) Shutdown()
- func (p *Proxy) Start()
- type ProxyConfig
- type Server
- func (s *Server) Flush(ctx context.Context)
- func (s *Server) HTTPServe()
- func (s *Server) HandleMetricPacket(packet []byte) error
- func (s *Server) HandleTracePacket(packet []byte)
- func (s *Server) Handler() http.Handler
- func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
- func (s *Server) IsLocal() bool
- func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)
- func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)
- func (s *Server) ReadSSFStreamSocket(serverConn net.Conn)
- func (s *Server) ReadTCPSocket(listener net.Listener)
- func (s *Server) Serve()
- func (s *Server) Shutdown()
- func (s *Server) Start()
- type SpanWorker
- type UnknownConfigKeys
- type Worker
- func (w *Worker) Flush() WorkerMetrics
- func (w *Worker) ImportMetric(other samplers.JSONMetric)
- func (w *Worker) ImportMetricGRPC(other *metricpb.Metric) (err error)
- func (w *Worker) IngestMetrics(ms []*metricpb.Metric)
- func (w *Worker) IngestUDP(metric samplers.UDPMetric)
- func (w *Worker) MetricsProcessedCount() int64
- func (w *Worker) ProcessMetric(m *samplers.UDPMetric)
- func (w *Worker) Stop()
- func (w *Worker) Work()
- type WorkerMetrics
Constants ¶
const REDACTED = "REDACTED"
REDACTED is used to replace values that we don't want to leak into log lines (e.g., credentials).
Variables ¶
var BUILD_DATE = defaultLinkValue
var VERSION = defaultLinkValue
VERSION stores the current veneur version. It must be a var so it can be set at link time.
Functions ¶
func CalculateTickDelay ¶ added in v1.8.0
CalculateTickDelay takes the provided time, `Truncate`s it to a rounded-down multiple of `interval`, then adds `interval` back to find the "next" tick.
func ConsumePanic ¶ added in v1.5.0
ConsumePanic is intended to be called inside a deferred function when recovering from a panic. It accepts the value of recover() as its err argument, reports the panic to Sentry, prints the stack, and then repanics (to ensure your program terminates).
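The call pattern this describes, a deferred function that hands `recover()` to a reporter which then panics again, might look like the sketch below. Everything here is hypothetical: `reportAndRepanic` records the panic in a slice instead of sending it to Sentry, and `runAndCatch` exists only so the demo can observe the repanic without terminating.

```go
package main

import "fmt"

// reportAndRepanic is a hypothetical stand-in for ConsumePanic. It takes the
// value returned by recover(), records it (a slice stands in for Sentry), and
// panics again so the failure is not silently swallowed.
func reportAndRepanic(reports *[]string, err interface{}) {
	if err == nil {
		return // no panic in flight, nothing to report
	}
	*reports = append(*reports, fmt.Sprint(err))
	panic(err)
}

// runTask shows the intended call site: recover() is evaluated directly in
// the deferred function and its result is passed to the reporter.
func runTask(reports *[]string) {
	defer func() {
		reportAndRepanic(reports, recover())
	}()
	panic("boom")
}

// runAndCatch catches the repanic so the demo can inspect it. A real program
// would normally let the repanic terminate the process.
func runAndCatch(reports *[]string) (repanicked interface{}) {
	defer func() { repanicked = recover() }()
	runTask(reports)
	return nil
}

func main() {
	var reports []string
	fmt.Println(runAndCatch(&reports), reports)
}
```

`recover()` only returns the panic value when it is called directly by the deferred function, which is why it appears at the call site instead of inside the reporter.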
func StartSSF ¶ added in v1.7.0
StartSSF starts listening for SSF on an address a, and returns the concrete address that the server is listening on.
func StartStatsd ¶ added in v1.7.0
StartStatsd spawns a goroutine that listens for metrics in statsd format on the address a, and returns the concrete listening address. As this is a setup routine, if any error occurs, it panics.
Types ¶
type Config ¶
type Config struct {
	Aggregates []string `yaml:"aggregates"`
	AwsAccessKeyID string `yaml:"aws_access_key_id"`
	AwsRegion string `yaml:"aws_region"`
	AwsS3Bucket string `yaml:"aws_s3_bucket"`
	AwsSecretAccessKey string `yaml:"aws_secret_access_key"`
	BlockProfileRate int `yaml:"block_profile_rate"`
	DatadogAPIHostname string `yaml:"datadog_api_hostname"`
	DatadogAPIKey string `yaml:"datadog_api_key"`
	DatadogFlushMaxPerBody int `yaml:"datadog_flush_max_per_body"`
	DatadogSpanBufferSize int `yaml:"datadog_span_buffer_size"`
	DatadogTraceAPIAddress string `yaml:"datadog_trace_api_address"`
	Debug bool `yaml:"debug"`
	DebugFlushedMetrics bool `yaml:"debug_flushed_metrics"`
	DebugIngestedSpans bool `yaml:"debug_ingested_spans"`
	EnableProfiling bool `yaml:"enable_profiling"`
	FalconerAddress string `yaml:"falconer_address"`
	FlushFile string `yaml:"flush_file"`
	FlushMaxPerBody int `yaml:"flush_max_per_body"`
	ForwardAddress string `yaml:"forward_address"`
	ForwardUseGrpc bool `yaml:"forward_use_grpc"`
	GrpcAddress string `yaml:"grpc_address"`
	Hostname string `yaml:"hostname"`
	HTTPAddress string `yaml:"http_address"`
	IndicatorSpanTimerName string `yaml:"indicator_span_timer_name"`
	Interval string `yaml:"interval"`
	KafkaBroker string `yaml:"kafka_broker"`
	KafkaCheckTopic string `yaml:"kafka_check_topic"`
	KafkaEventTopic string `yaml:"kafka_event_topic"`
	KafkaMetricBufferBytes int `yaml:"kafka_metric_buffer_bytes"`
	KafkaMetricBufferFrequency string `yaml:"kafka_metric_buffer_frequency"`
	KafkaMetricBufferMessages int `yaml:"kafka_metric_buffer_messages"`
	KafkaMetricRequireAcks string `yaml:"kafka_metric_require_acks"`
	KafkaMetricTopic string `yaml:"kafka_metric_topic"`
	KafkaPartitioner string `yaml:"kafka_partitioner"`
	KafkaRetryMax int `yaml:"kafka_retry_max"`
	KafkaSpanBufferBytes int `yaml:"kafka_span_buffer_bytes"`
	KafkaSpanBufferFrequency string `yaml:"kafka_span_buffer_frequency"`
	KafkaSpanBufferMesages int `yaml:"kafka_span_buffer_mesages"`
	KafkaSpanRequireAcks string `yaml:"kafka_span_require_acks"`
	KafkaSpanSampleRatePercent int `yaml:"kafka_span_sample_rate_percent"`
	KafkaSpanSampleTag string `yaml:"kafka_span_sample_tag"`
	KafkaSpanSerializationFormat string `yaml:"kafka_span_serialization_format"`
	KafkaSpanTopic string `yaml:"kafka_span_topic"`
	LightstepAccessToken string `yaml:"lightstep_access_token"`
	LightstepCollectorHost string `yaml:"lightstep_collector_host"`
	LightstepMaximumSpans int `yaml:"lightstep_maximum_spans"`
	LightstepNumClients int `yaml:"lightstep_num_clients"`
	LightstepReconnectPeriod string `yaml:"lightstep_reconnect_period"`
	MetricMaxLength int `yaml:"metric_max_length"`
	MutexProfileFraction int `yaml:"mutex_profile_fraction"`
	NumReaders int `yaml:"num_readers"`
	NumSpanWorkers int `yaml:"num_span_workers"`
	NumWorkers int `yaml:"num_workers"`
	OmitEmptyHostname bool `yaml:"omit_empty_hostname"`
	Percentiles []float64 `yaml:"percentiles"`
	ReadBufferSizeBytes int `yaml:"read_buffer_size_bytes"`
	SentryDsn string `yaml:"sentry_dsn"`
	SignalfxAPIKey string `yaml:"signalfx_api_key"`
	SignalfxEndpointBase string `yaml:"signalfx_endpoint_base"`
	SignalfxHostnameTag string `yaml:"signalfx_hostname_tag"`
	SignalfxMetricNamePrefixDrops []string `yaml:"signalfx_metric_name_prefix_drops"`
	SignalfxMetricTagPrefixDrops []string `yaml:"signalfx_metric_tag_prefix_drops"`
	SignalfxPerTagAPIKeys []struct {
		APIKey string `yaml:"api_key"`
		Name string `yaml:"name"`
	} `yaml:"signalfx_per_tag_api_keys"`
	SignalfxVaryKeyBy string `yaml:"signalfx_vary_key_by"`
	SpanChannelCapacity int `yaml:"span_channel_capacity"`
	SplunkHecAddress string `yaml:"splunk_hec_address"`
	SplunkHecBatchSize int `yaml:"splunk_hec_batch_size"`
	SplunkHecConnectionLifetimeJitter string `yaml:"splunk_hec_connection_lifetime_jitter"`
	SplunkHecIngestTimeout string `yaml:"splunk_hec_ingest_timeout"`
	SplunkHecMaxConnectionLifetime string `yaml:"splunk_hec_max_connection_lifetime"`
	SplunkHecSendTimeout string `yaml:"splunk_hec_send_timeout"`
	SplunkHecSubmissionWorkers int `yaml:"splunk_hec_submission_workers"`
	SplunkHecTLSValidateHostname string `yaml:"splunk_hec_tls_validate_hostname"`
	SplunkHecToken string `yaml:"splunk_hec_token"`
	SplunkSpanSampleRate int `yaml:"splunk_span_sample_rate"`
	SsfBufferSize int `yaml:"ssf_buffer_size"`
	SsfListenAddresses []string `yaml:"ssf_listen_addresses"`
	StatsAddress string `yaml:"stats_address"`
	StatsdListenAddresses []string `yaml:"statsd_listen_addresses"`
	SynchronizeWithInterval bool `yaml:"synchronize_with_interval"`
	Tags []string `yaml:"tags"`
	TagsExclude []string `yaml:"tags_exclude"`
	TLSAuthorityCertificate string `yaml:"tls_authority_certificate"`
	TLSCertificate string `yaml:"tls_certificate"`
	TLSKey string `yaml:"tls_key"`
	TraceLightstepAccessToken string `yaml:"trace_lightstep_access_token"`
	TraceLightstepCollectorHost string `yaml:"trace_lightstep_collector_host"`
	TraceLightstepMaximumSpans int `yaml:"trace_lightstep_maximum_spans"`
	TraceLightstepNumClients int `yaml:"trace_lightstep_num_clients"`
	TraceLightstepReconnectPeriod string `yaml:"trace_lightstep_reconnect_period"`
	TraceMaxLengthBytes int `yaml:"trace_max_length_bytes"`
}
func ReadConfig ¶
ReadConfig unmarshals the config file and slurps in its data. ReadConfig can return an error of type *UnknownConfigKeys, which means that the file is usable, but contains unknown fields.
type Consul ¶ added in v1.5.0
Consul is a Discoverer that uses Consul to find healthy instances of a given name.
type DatadogTraceSpan ¶
type DatadogTraceSpan struct {
	Duration int64 `json:"duration"`
	Error int64 `json:"error"`
	Meta map[string]string `json:"meta"`
	Metrics map[string]float64 `json:"metrics"`
	Name string `json:"name"`
	ParentID int64 `json:"parent_id,omitempty"`
	Resource string `json:"resource,omitempty"`
	Service string `json:"service"`
	SpanID int64 `json:"span_id"`
	Start int64 `json:"start"`
	TraceID int64 `json:"trace_id"`
	Type string `json:"type"`
}
DatadogTraceSpan represents a trace span as JSON for the Datadog tracing API.
type Discoverer ¶ added in v1.5.0
Discoverer is an interface for various service discovery mechanisms. You can add your own mechanism by implementing its method. See consul.go.
type EventWorker ¶
type EventWorker struct {
// contains filtered or unexported fields
}
EventWorker is similar to a Worker but it collects events and service checks instead of metrics.
func NewEventWorker ¶
func NewEventWorker(cl *trace.Client, stats *statsd.Client) *EventWorker
NewEventWorker creates an EventWorker ready to collect events and service checks.
func (*EventWorker) Flush ¶
func (ew *EventWorker) Flush() []ssf.SSFSample
Flush returns the EventWorker's stored events and service checks and resets the stored contents.
func (*EventWorker) Work ¶
func (ew *EventWorker) Work()
Work will start the EventWorker listening for events and service checks. This function will never return.
type KubernetesDiscoverer ¶
type KubernetesDiscoverer struct {
// contains filtered or unexported fields
}
func NewKubernetesDiscoverer ¶
func NewKubernetesDiscoverer() (*KubernetesDiscoverer, error)
func (*KubernetesDiscoverer) GetDestinationsForService ¶
func (kd *KubernetesDiscoverer) GetDestinationsForService(serviceName string) ([]string, error)
type Proxy ¶ added in v1.5.0
type Proxy struct {
	Sentry *raven.Client
	Hostname string
	ForwardDestinations *consistent.Consistent
	TraceDestinations *consistent.Consistent
	ForwardGRPCDestinations *consistent.Consistent
	Discoverer Discoverer
	ConsulForwardService string
	ConsulTraceService string
	ConsulForwardGRPCService string
	ConsulInterval time.Duration
	MetricsInterval time.Duration
	ForwardDestinationsMtx sync.Mutex
	TraceDestinationsMtx sync.Mutex
	ForwardGRPCDestinationsMtx sync.Mutex
	HTTPAddr string
	HTTPClient *http.Client
	AcceptingForwards bool
	AcceptingTraces bool
	AcceptingGRPCForwards bool
	ForwardTimeout time.Duration
	TraceClient *trace.Client
	// contains filtered or unexported fields
}
func NewProxyFromConfig ¶ added in v1.5.0
func NewProxyFromConfig(logger *logrus.Logger, conf ProxyConfig) (p Proxy, err error)
func (*Proxy) HTTPServe ¶ added in v1.5.0
func (p *Proxy) HTTPServe()
HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.
func (*Proxy) Handler ¶ added in v1.5.0
Handler returns the Handler responsible for routing request processing.
func (*Proxy) ProxyMetrics ¶ added in v1.5.0
ProxyMetrics takes a slice of JSONMetrics and breaks them up into multiple HTTP requests by MetricKey using the hash ring.
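The batching idea, one outgoing request per destination with each metric routed by a hash of its key, can be sketched as follows. This is a simplification and not the proxy's code: it uses plain modulo hashing where the proxy uses a consistent hash ring, it treats metric keys as strings, and `pickDestination`, `groupByDestination`, and the destination addresses are all hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickDestination maps a metric key to one destination. Modulo hashing is a
// deliberate simplification: a consistent hash ring would remap far fewer
// keys when the destination list changes.
func pickDestination(key string, destinations []string) string {
	h := fnv.New32a()
	h.Write([]byte(key))
	return destinations[h.Sum32()%uint32(len(destinations))]
}

// groupByDestination batches metric keys so that each destination receives a
// single request containing all of the keys that hash to it.
func groupByDestination(keys, destinations []string) map[string][]string {
	batches := make(map[string][]string)
	for _, k := range keys {
		d := pickDestination(k, destinations)
		batches[d] = append(batches[d], k)
	}
	return batches
}

func main() {
	dests := []string{"veneur-a:8127", "veneur-b:8127"}
	keys := []string{"api.requests", "api.errors", "db.latency"}
	for dest, batch := range groupByDestination(keys, dests) {
		fmt.Println(dest, batch)
	}
}
```

The property that matters for aggregation is that the same key always lands on the same destination, so every sample for a metric is combined in one place.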
func (*Proxy) ProxyTraces ¶ added in v1.5.0
func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)
func (*Proxy) RefreshDestinations ¶ added in v1.5.0
func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)
RefreshDestinations updates the server's list of valid destinations for flushing. This should be called periodically to ensure we have the latest data.
func (*Proxy) ReportRuntimeMetrics ¶
func (p *Proxy) ReportRuntimeMetrics()
func (*Proxy) Serve ¶
func (p *Proxy) Serve()
Serve starts all of the configured servers (gRPC or HTTP) and blocks until one of them exits. At that point, it stops them both.
type ProxyConfig ¶ added in v1.5.0
type ProxyConfig struct {
	ConsulForwardGrpcServiceName string `yaml:"consul_forward_grpc_service_name"`
	ConsulForwardServiceName string `yaml:"consul_forward_service_name"`
	ConsulRefreshInterval string `yaml:"consul_refresh_interval"`
	ConsulTraceServiceName string `yaml:"consul_trace_service_name"`
	Debug bool `yaml:"debug"`
	EnableProfiling bool `yaml:"enable_profiling"`
	ForwardAddress string `yaml:"forward_address"`
	ForwardTimeout string `yaml:"forward_timeout"`
	GrpcAddress string `yaml:"grpc_address"`
	GrpcForwardAddress string `yaml:"grpc_forward_address"`
	HTTPAddress string `yaml:"http_address"`
	IdleConnectionTimeout string `yaml:"idle_connection_timeout"`
	MaxIdleConns int `yaml:"max_idle_conns"`
	MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
	RuntimeMetricsInterval string `yaml:"runtime_metrics_interval"`
	SentryDsn string `yaml:"sentry_dsn"`
	SsfDestinationAddress string `yaml:"ssf_destination_address"`
	StatsAddress string `yaml:"stats_address"`
	TraceAddress string `yaml:"trace_address"`
	TraceAPIAddress string `yaml:"trace_api_address"`
	TracingClientCapacity int `yaml:"tracing_client_capacity"`
	TracingClientFlushInterval string `yaml:"tracing_client_flush_interval"`
	TracingClientMetricsInterval string `yaml:"tracing_client_metrics_interval"`
}
func ReadProxyConfig ¶ added in v1.5.0
func ReadProxyConfig(path string) (c ProxyConfig, err error)
ReadProxyConfig unmarshals the proxy config file and slurps in its data.
type Server ¶
type Server struct {
	Workers []*Worker
	EventWorker *EventWorker
	SpanChan chan *ssf.SSFSpan
	SpanWorker *SpanWorker
	SpanWorkerGoroutines int
	Statsd *statsd.Client
	Sentry *raven.Client
	Hostname string
	Tags []string
	TagsAsMap map[string]string
	HTTPClient *http.Client
	HTTPAddr string
	ForwardAddr string
	StatsdListenAddrs []net.Addr
	SSFListenAddrs []net.Addr
	RcvbufBytes int
	HistogramPercentiles []float64
	HistogramAggregates samplers.HistogramAggregates
	TraceClient *trace.Client
	// contains filtered or unexported fields
}
A Server is the actual veneur instance that will be run.
func NewFromConfig ¶
NewFromConfig creates a new veneur server from a configuration specification and sets up the passed logger according to the configuration.
func (*Server) HTTPServe ¶
func (s *Server) HTTPServe()
HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.
func (*Server) HandleMetricPacket ¶
HandleMetricPacket processes each packet that is sent to the server and sends it to the appropriate worker (EventWorker or Worker).
func (*Server) HandleTracePacket ¶
HandleTracePacket accepts an incoming packet as bytes and sends it to the appropriate worker.
func (*Server) ImportMetrics ¶
func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
ImportMetrics feeds a slice of JSON metrics to the server's workers.
func (*Server) IsLocal ¶
IsLocal indicates whether veneur is running as a local instance (forwarding non-local data to a global veneur instance) or is running as a global instance (sending all data directly to the final destination).
func (*Server) ReadMetricSocket ¶
func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)
ReadMetricSocket listens for available packets to handle.
func (*Server) ReadSSFPacketSocket ¶ added in v1.7.0
func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)
ReadSSFPacketSocket reads SSF packets off a packet connection.
func (*Server) ReadSSFStreamSocket ¶ added in v1.7.0
ReadSSFStreamSocket reads a streaming connection in framed wire format off a streaming socket. See package github.com/stripe/veneur/protocol for details.
func (*Server) ReadTCPSocket ¶ added in v1.1.0
ReadTCPSocket listens on Server.TCPAddr for new connections, starting a goroutine for each.
func (*Server) Serve ¶
func (s *Server) Serve()
Serve starts all of the configured servers (gRPC or HTTP) and blocks until one of them exits. At that point, it stops them both.
type SpanWorker ¶ added in v1.6.0
SpanWorker is similar to a Worker but it collects spans instead of metrics.
func NewSpanWorker ¶ added in v1.6.0
func NewSpanWorker(sinks []sinks.SpanSink, cl *trace.Client, statsd *statsd.Client, spanChan <-chan *ssf.SSFSpan, commonTags map[string]string) *SpanWorker
NewSpanWorker creates a SpanWorker ready to collect spans.
func (*SpanWorker) Flush ¶ added in v1.6.0
func (tw *SpanWorker) Flush()
Flush invokes flush on each sink.
func (*SpanWorker) Work ¶ added in v1.6.0
func (tw *SpanWorker) Work()
Work will start the SpanWorker listening for spans. This function will never return.
type UnknownConfigKeys ¶
type UnknownConfigKeys struct {
// contains filtered or unexported fields
}
UnknownConfigKeys represents a failure to strictly parse a configuration YAML file, indicating that the file contains unknown keys.
func (*UnknownConfigKeys) Error ¶
func (e *UnknownConfigKeys) Error() string
type Worker ¶
type Worker struct {
	PacketChan chan samplers.UDPMetric
	ImportChan chan []samplers.JSONMetric
	ImportMetricChan chan []*metricpb.Metric
	QuitChan chan struct{}
	// contains filtered or unexported fields
}
Worker is the doodad that does work.
func (*Worker) Flush ¶
func (w *Worker) Flush() WorkerMetrics
Flush resets the worker's internal metrics and returns their contents.
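A reset-and-return flush is often implemented by swapping in a fresh container under a lock and handing the old one to the caller. The sketch below shows that pattern on a hypothetical `counterStore`; it is an illustration of the idea, not the Worker's internals, which are unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// counterStore is a hypothetical, much simplified stand-in for a worker's
// internal metric storage.
type counterStore struct {
	mu     sync.Mutex
	counts map[string]int64
}

func newCounterStore() *counterStore {
	return &counterStore{counts: make(map[string]int64)}
}

// Add accumulates n into the named counter.
func (s *counterStore) Add(name string, n int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counts[name] += n
}

// Flush swaps in an empty map and returns the old one, so the caller gets the
// accumulated contents and the store starts the next interval from scratch.
func (s *counterStore) Flush() map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.counts
	s.counts = make(map[string]int64)
	return old
}

func main() {
	s := newCounterStore()
	s.Add("requests", 1)
	s.Add("requests", 2)
	first := s.Flush()
	second := s.Flush()
	fmt.Println(first["requests"], len(second))
}
```

Swapping the map keeps the critical section short: the caller can process the flushed contents without holding the lock.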
func (*Worker) ImportMetric ¶
func (w *Worker) ImportMetric(other samplers.JSONMetric)
ImportMetric receives a metric from another veneur instance.
func (*Worker) ImportMetricGRPC ¶
ImportMetricGRPC receives a metric from another veneur instance over gRPC.
In practice, this is only called when in the aggregation tier, so we don't handle LocalOnly scope.
func (*Worker) IngestMetrics ¶
func (*Worker) MetricsProcessedCount ¶ added in v1.2.0
MetricsProcessedCount is a convenience method for testing that allows us to fetch the Worker's processed count in a non-racy way.
func (*Worker) ProcessMetric ¶
ProcessMetric takes a Metric and samples it.
type WorkerMetrics ¶
type WorkerMetrics struct {
// contains filtered or unexported fields
}
WorkerMetrics is just a plain struct bundling together the flushed contents of a worker.
func NewWorkerMetrics ¶
func NewWorkerMetrics() WorkerMetrics
NewWorkerMetrics initializes a WorkerMetrics struct.
func (WorkerMetrics) ForwardableMetrics ¶
func (wm WorkerMetrics) ForwardableMetrics(cl *trace.Client) []*metricpb.Metric
ForwardableMetrics converts all metrics that should be forwarded to metricpb.Metric (protobuf-compatible).
func (WorkerMetrics) Upsert ¶
func (wm WorkerMetrics) Upsert(mk samplers.MetricKey, Scope samplers.MetricScope, tags []string) bool
Upsert creates an entry on the WorkerMetrics struct for the given MetricKey (if one does not already exist) and updates the existing entry (if one already exists). Returns true if the metric entry was created and false otherwise.
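The created-or-existing return value can be illustrated with a map-backed store. This is a minimal sketch under the assumption that entries live in a map keyed by metric key; `metricKey`, `metricStore`, and `upsert` are hypothetical and much simpler than the real types.

```go
package main

import "fmt"

// metricKey is a hypothetical, simplified key identifying one metric.
type metricKey struct {
	Name string
	Type string
}

// metricStore is a hypothetical stand-in for WorkerMetrics holding one map.
type metricStore struct {
	counters map[metricKey]int64
}

// upsert ensures an entry exists for k. It returns true if the entry was
// created by this call and false if it was already present. A value receiver
// is enough here because copying the struct copies the map header, which
// still refers to the same underlying map.
func (s metricStore) upsert(k metricKey) bool {
	if _, ok := s.counters[k]; ok {
		return false
	}
	s.counters[k] = 0
	return true
}

func main() {
	s := metricStore{counters: make(map[metricKey]int64)}
	k := metricKey{Name: "api.requests", Type: "counter"}
	fmt.Println(s.upsert(k), s.upsert(k))
}
```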
Directories ¶
Path | Synopsis |
---|---|
cmd | |
| | Package forwardrpc is a generated protocol buffer package. |
| | Package importsrv receives metrics over gRPC and sends them to workers. |
internal | |
| | Package protocol contains routines for implementing veneur's SSF wire protocol to read and write framed SSF samples on a streaming network link or other non-seekable medium. |
| | Package proxysrv proxies metrics over gRPC to global veneurs using consistent hashing. The Server provided accepts a hash ring of destinations, and then listens for metrics over gRPC. |
metricpb | Package metricpb is a generated protocol buffer package. |
grpsink | Package grpsink is a generated protocol buffer package. |
ssfmetrics | Package ssfmetrics provides sinks that are used by veneur internally. |
| | Package ssf provides an implementation of the Sensor Sensibility Format. |
| | Package tdigest provides an implementation of Ted Dunning's t-digest, an approximate histogram for online, distributed applications. |
| | Package trace provides an experimental API for initiating traces. |
metrics | Package metrics provides routines for conveniently reporting metrics attached to SSF spans. |