Documentation ¶
Index ¶
- Constants
- Variables
- func CalculateTickDelay(interval time.Duration, t time.Time) time.Duration
- func ConsumePanic(sentry *raven.Client, stats *statsd.Client, hostname string, err interface{})
- func NewSocket(addr *net.UDPAddr, recvBuf int, reuseport bool) (net.PacketConn, error)
- func StartSSF(s *Server, a net.Addr, tracePool *sync.Pool)
- func StartStatsd(s *Server, a net.Addr, packetPool *sync.Pool)
- type Config
- type Consul
- type DatadogTraceSpan
- type Discoverer
- type EventWorker
- type Proxy
- func (p *Proxy) HTTPServe()
- func (p *Proxy) Handler() http.Handler
- func (p *Proxy) ProxyMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
- func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)
- func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)
- func (p *Proxy) Shutdown()
- func (p *Proxy) Start()
- type ProxyConfig
- type Server
- func (s *Server) Flush(ctx context.Context)
- func (s *Server) HTTPServe()
- func (s *Server) HandleMetricPacket(packet []byte) error
- func (s *Server) HandleTracePacket(packet []byte)
- func (s *Server) Handler() http.Handler
- func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
- func (s *Server) IsLocal() bool
- func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)
- func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)
- func (s *Server) ReadSSFStreamSocket(serverConn net.Conn)
- func (s *Server) ReadTCPSocket(listener net.Listener)
- func (s *Server) Shutdown()
- func (s *Server) Start()
- type SpanWorker
- type Worker
- type WorkerMetrics
Constants ¶
const REDACTED = "REDACTED"
REDACTED is used to replace values that we don't want to leak into log lines (e.g., credentials).
Variables ¶
var BUILD_DATE = defaultLinkValue
var ErrNoSpanWorker = fmt.Errorf("Can not submit traces to an unstarted server")
var VERSION = defaultLinkValue
VERSION stores the current veneur version. It must be a var so it can be set at link time.
Functions ¶
func CalculateTickDelay ¶ added in v1.8.0
CalculateTickDelay takes the provided time, `Truncate`s it to a rounded-down multiple of `interval`, then adds `interval` back to find the "next" tick.
func ConsumePanic ¶ added in v1.5.0
ConsumePanic is intended to be called inside a deferred function when recovering from a panic. It accepts the value of recover() as its err argument, reports the panic to Sentry, prints the stack, and then repanics (to ensure your program terminates).
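The deferred-recover pattern described here looks roughly like the following. This is a stdlib-only sketch: `consumePanic`, `runGuarded`, and the `report` callback are hypothetical stand-ins (the real function takes Sentry and statsd clients), and returning early on a nil value is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"runtime/debug"
)

// consumePanic is a hypothetical stand-in for ConsumePanic. It does nothing
// when err is nil; otherwise it reports, prints the stack, and repanics.
func consumePanic(report func(msg string), err interface{}) {
	if err == nil {
		return // no panic in flight (assumed behavior)
	}
	report(fmt.Sprint(err))
	os.Stderr.Write(debug.Stack())
	panic(err) // repanic so the program still terminates
}

// runGuarded runs work under the pattern. The outer recover exists only so
// this demo can observe the repanicked value instead of crashing.
func runGuarded(report func(msg string), work func()) (repanicked interface{}) {
	defer func() { repanicked = recover() }()
	// recover must be called directly in the deferred function.
	defer func() { consumePanic(report, recover()) }()
	work()
	return nil
}

func main() {
	var reported []string
	report := func(msg string) { reported = append(reported, msg) }
	got := runGuarded(report, func() { panic("boom") })
	fmt.Println("repanicked with:", got, "reported:", reported)
}
```

In a real program the outer `recover` would be absent, so the repanic ends the process after the report has been sent.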
Types ¶
type Config ¶
type Config struct {
    Aggregates []string `yaml:"aggregates"`
    APIHostname string `yaml:"api_hostname"`
    AwsAccessKeyID string `yaml:"aws_access_key_id"`
    AwsRegion string `yaml:"aws_region"`
    AwsS3Bucket string `yaml:"aws_s3_bucket"`
    AwsSecretAccessKey string `yaml:"aws_secret_access_key"`
    DatadogAPIHostname string `yaml:"datadog_api_hostname"`
    DatadogAPIKey string `yaml:"datadog_api_key"`
    DatadogTraceAPIAddress string `yaml:"datadog_trace_api_address"`
    Debug bool `yaml:"debug"`
    EnableProfiling bool `yaml:"enable_profiling"`
    FlushFile string `yaml:"flush_file"`
    FlushMaxPerBody int `yaml:"flush_max_per_body"`
    ForwardAddress string `yaml:"forward_address"`
    Hostname string `yaml:"hostname"`
    HTTPAddress string `yaml:"http_address"`
    IndicatorSpanTimerName string `yaml:"indicator_span_timer_name"`
    Interval string `yaml:"interval"`
    KafkaBroker string `yaml:"kafka_broker"`
    KafkaCheckTopic string `yaml:"kafka_check_topic"`
    KafkaEventTopic string `yaml:"kafka_event_topic"`
    KafkaMetricBufferBytes int `yaml:"kafka_metric_buffer_bytes"`
    KafkaMetricBufferFrequency string `yaml:"kafka_metric_buffer_frequency"`
    KafkaMetricBufferMessages int `yaml:"kafka_metric_buffer_messages"`
    KafkaMetricRequireAcks string `yaml:"kafka_metric_require_acks"`
    KafkaMetricTopic string `yaml:"kafka_metric_topic"`
    KafkaPartitioner string `yaml:"kafka_partitioner"`
    KafkaRetryMax int `yaml:"kafka_retry_max"`
    KafkaSpanBufferBytes int `yaml:"kafka_span_buffer_bytes"`
    KafkaSpanBufferFrequency string `yaml:"kafka_span_buffer_frequency"`
    KafkaSpanBufferMesages int `yaml:"kafka_span_buffer_mesages"`
    KafkaSpanRequireAcks string `yaml:"kafka_span_require_acks"`
    KafkaSpanSerializationFormat string `yaml:"kafka_span_serialization_format"`
    KafkaSpanTopic string `yaml:"kafka_span_topic"`
    Key string `yaml:"key"`
    MetricMaxLength int `yaml:"metric_max_length"`
    NumReaders int `yaml:"num_readers"`
    NumWorkers int `yaml:"num_workers"`
    OmitEmptyHostname bool `yaml:"omit_empty_hostname"`
    Percentiles []float64 `yaml:"percentiles"`
    ReadBufferSizeBytes int `yaml:"read_buffer_size_bytes"`
    SentryDsn string `yaml:"sentry_dsn"`
    SignalfxAPIKey string `yaml:"signalfx_api_key"`
    SignalfxEndpointBase string `yaml:"signalfx_endpoint_base"`
    SignalfxHostnameTag string `yaml:"signalfx_hostname_tag"`
    SsfAddress string `yaml:"ssf_address"`
    SsfBufferSize int `yaml:"ssf_buffer_size"`
    SsfListenAddresses []string `yaml:"ssf_listen_addresses"`
    StatsAddress string `yaml:"stats_address"`
    StatsdListenAddresses []string `yaml:"statsd_listen_addresses"`
    SynchronizeWithInterval bool `yaml:"synchronize_with_interval"`
    Tags []string `yaml:"tags"`
    TcpAddress string `yaml:"tcp_address"`
    TLSAuthorityCertificate string `yaml:"tls_authority_certificate"`
    TLSCertificate string `yaml:"tls_certificate"`
    TLSKey string `yaml:"tls_key"`
    TraceAddress string `yaml:"trace_address"`
    TraceAPIAddress string `yaml:"trace_api_address"`
    TraceLightstepAccessToken string `yaml:"trace_lightstep_access_token"`
    TraceLightstepCollectorHost string `yaml:"trace_lightstep_collector_host"`
    TraceLightstepMaximumSpans int `yaml:"trace_lightstep_maximum_spans"`
    TraceLightstepNumClients int `yaml:"trace_lightstep_num_clients"`
    TraceLightstepReconnectPeriod string `yaml:"trace_lightstep_reconnect_period"`
    TraceMaxLengthBytes int `yaml:"trace_max_length_bytes"`
    UdpAddress string `yaml:"udp_address"`
}
func ReadConfig ¶
ReadConfig unmarshals the config file and slurps in its data.
type Consul ¶ added in v1.5.0
Consul is a Discoverer that uses Consul to find healthy instances of a given name.
type DatadogTraceSpan ¶
type DatadogTraceSpan struct {
    Duration int64 `json:"duration"`
    Error int64 `json:"error"`
    Meta map[string]string `json:"meta"`
    Metrics map[string]float64 `json:"metrics"`
    Name string `json:"name"`
    ParentID int64 `json:"parent_id,omitempty"`
    Resource string `json:"resource,omitempty"`
    Service string `json:"service"`
    SpanID int64 `json:"span_id"`
    Start int64 `json:"start"`
    TraceID int64 `json:"trace_id"`
    Type string `json:"type"`
}
DatadogTraceSpan represents a trace span as JSON for the Datadog tracing API.
type Discoverer ¶ added in v1.5.0
Discoverer is an interface for various service discovery mechanisms. You could implement your own by implementing this method! See consul.go
type EventWorker ¶
type EventWorker struct {
    EventChan chan samplers.UDPEvent
    ServiceCheckChan chan samplers.UDPServiceCheck
    // contains filtered or unexported fields
}
EventWorker is similar to a Worker but it collects events and service checks instead of metrics.
func NewEventWorker ¶
func NewEventWorker(stats *statsd.Client) *EventWorker
NewEventWorker creates an EventWorker ready to collect events and service checks.
func (*EventWorker) Flush ¶
func (ew *EventWorker) Flush() ([]samplers.UDPEvent, []samplers.UDPServiceCheck)
Flush returns the EventWorker's stored events and service checks and resets the stored contents.
func (*EventWorker) Work ¶
func (ew *EventWorker) Work()
Work will start the EventWorker listening for events and service checks. This function will never return.
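The Work/Flush split (one goroutine collecting forever, another periodically swapping the buffer out) can be sketched as follows. `eventCollector` and its fields are hypothetical and use plain strings in place of `samplers.UDPEvent`; guarding the buffer with a mutex is an assumption about how such a worker stays race-free.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// eventCollector is a hypothetical stand-in for EventWorker.
type eventCollector struct {
	EventChan chan string
	mu        sync.Mutex
	events    []string
}

func newEventCollector() *eventCollector {
	return &eventCollector{EventChan: make(chan string)}
}

// handle stores one event under the lock.
func (c *eventCollector) handle(e string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.events = append(c.events, e)
}

// Work blocks for as long as EventChan stays open, which in this sketch is
// forever, so it is meant to run in its own goroutine.
func (c *eventCollector) Work() {
	for e := range c.EventChan {
		c.handle(e)
	}
}

// Flush returns everything collected so far and resets the buffer.
func (c *eventCollector) Flush() []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := c.events
	c.events = nil
	return out
}

func main() {
	c := newEventCollector()
	go c.Work()
	c.EventChan <- "deploy started"
	c.EventChan <- "deploy finished"
	// A send returns once Work has received the event, slightly before it is
	// stored, so keep flushing until both events have shown up.
	var got []string
	for len(got) < 2 {
		got = append(got, c.Flush()...)
		runtime.Gosched()
	}
	fmt.Println(got)
}
```

A real caller would flush on a timer rather than in a polling loop; the loop is only there to make the demo deterministic.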
type Proxy ¶ added in v1.5.0
type Proxy struct {
    Sentry *raven.Client
    Hostname string
    ForwardDestinations *consistent.Consistent
    TraceDestinations *consistent.Consistent
    Discoverer Discoverer
    ConsulForwardService string
    ConsulTraceService string
    ConsulInterval time.Duration
    ForwardDestinationsMtx sync.Mutex
    TraceDestinationsMtx sync.Mutex
    HTTPAddr string
    HTTPClient *http.Client
    Statsd *statsd.Client
    AcceptingForwards bool
    AcceptingTraces bool
    // contains filtered or unexported fields
}
func NewProxyFromConfig ¶ added in v1.5.0
func NewProxyFromConfig(conf ProxyConfig) (p Proxy, err error)
func (*Proxy) HTTPServe ¶ added in v1.5.0
func (p *Proxy) HTTPServe()
HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.
func (*Proxy) Handler ¶ added in v1.5.0
Handler returns the Handler responsible for routing request processing.
func (*Proxy) ProxyMetrics ¶ added in v1.5.0
func (p *Proxy) ProxyMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
ProxyMetrics takes a slice of JSONMetrics and breaks them up into multiple HTTP requests by MetricKey using the hash ring.
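To illustrate how a hash ring splits one batch into per-destination batches, here is a small self-contained sketch. It does not use the `consistent` package the Proxy fields refer to: `ring`, `newRing`, `get`, and `groupByDestination` are hypothetical, and FNV-1a with virtual nodes is just one simple way to build a ring.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a minimal consistent-hash ring (hypothetical, for illustration).
type ring struct {
	points []uint32          // sorted positions on the ring
	owners map[uint32]string // position -> destination
}

func hashOf(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newRing places each destination on the ring at `replicas` positions.
func newRing(destinations []string, replicas int) *ring {
	r := &ring{owners: make(map[uint32]string)}
	for _, d := range destinations {
		for i := 0; i < replicas; i++ {
			p := hashOf(fmt.Sprintf("%s#%d", d, i))
			r.points = append(r.points, p)
			r.owners[p] = d
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// get returns the destination at the first position at or after the key's
// hash, wrapping around to the start of the ring.
func (r *ring) get(key string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashOf(key)
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0
	}
	return r.owners[r.points[i]]
}

// groupByDestination buckets metric keys so that each bucket can be sent in
// its own HTTP request.
func groupByDestination(r *ring, metricKeys []string) map[string][]string {
	out := make(map[string][]string)
	for _, k := range metricKeys {
		d := r.get(k)
		out[d] = append(out[d], k)
	}
	return out
}

func main() {
	r := newRing([]string{"10.0.0.1:8127", "10.0.0.2:8127"}, 16)
	keys := []string{"api.requests", "api.latency", "db.queries"}
	for dest, batch := range groupByDestination(r, keys) {
		fmt.Println(dest, batch)
	}
}
```

The point of hashing by key is that the same metric always lands on the same destination, so its samples are aggregated in one place.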
func (*Proxy) ProxyTraces ¶ added in v1.5.0
func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)
func (*Proxy) RefreshDestinations ¶ added in v1.5.0
func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)
RefreshDestinations updates the server's list of valid destinations for flushing. This should be called periodically to ensure we have the latest data.
type ProxyConfig ¶ added in v1.5.0
type ProxyConfig struct {
    ConsulForwardServiceName string `yaml:"consul_forward_service_name"`
    ConsulRefreshInterval string `yaml:"consul_refresh_interval"`
    ConsulTraceServiceName string `yaml:"consul_trace_service_name"`
    Debug bool `yaml:"debug"`
    EnableProfiling bool `yaml:"enable_profiling"`
    ForwardAddress string `yaml:"forward_address"`
    HTTPAddress string `yaml:"http_address"`
    SentryDsn string `yaml:"sentry_dsn"`
    StatsAddress string `yaml:"stats_address"`
    TraceAddress string `yaml:"trace_address"`
    TraceAPIAddress string `yaml:"trace_api_address"`
}
func ReadProxyConfig ¶ added in v1.5.0
func ReadProxyConfig(path string) (c ProxyConfig, err error)
ReadProxyConfig unmarshals the proxy config file and slurps in its data.
type Server ¶
type Server struct {
    Workers []*Worker
    EventWorker *EventWorker
    SpanWorker *SpanWorker
    Statsd *statsd.Client
    Sentry *raven.Client
    Hostname string
    Tags []string
    TagsAsMap map[string]string
    HTTPClient *http.Client
    HTTPAddr string
    ForwardAddr string
    StatsdListenAddrs []net.Addr
    SSFListenAddrs []net.Addr
    RcvbufBytes int
    HistogramPercentiles []float64
    HistogramAggregates samplers.HistogramAggregates
    TraceClient *trace.Client
    // contains filtered or unexported fields
}
A Server is the actual veneur instance that will be run.
func NewFromConfig ¶
NewFromConfig creates a new veneur server from a configuration specification.
func (*Server) HTTPServe ¶
func (s *Server) HTTPServe()
HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.
func (*Server) HandleMetricPacket ¶
HandleMetricPacket processes each packet that is sent to the server, and sends to an appropriate worker (EventWorker or Worker).
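Routing a packet to "an appropriate worker" can be pictured as a prefix check followed by a hash. The sketch below relies on the DogStatsD conventions that events start with `_e{` and service checks with `_sc`; whether the server dispatches exactly this way is an assumption, and `classify`, `workerIndex`, and the route constants are hypothetical names.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"hash/fnv"
)

type route int

const (
	routeMetric route = iota
	routeEvent
	routeServiceCheck
)

// classify decides which kind of worker should receive the packet.
func classify(packet []byte) (route, error) {
	switch {
	case len(packet) == 0:
		return routeMetric, errors.New("empty packet")
	case bytes.HasPrefix(packet, []byte("_e{")):
		return routeEvent, nil
	case bytes.HasPrefix(packet, []byte("_sc")):
		return routeServiceCheck, nil
	default:
		return routeMetric, nil
	}
}

// workerIndex hashes the metric name (the part before ':') so that every
// sample of a given metric goes to the same worker. numWorkers must be > 0.
func workerIndex(packet []byte, numWorkers int) int {
	name := packet
	if i := bytes.IndexByte(packet, ':'); i >= 0 {
		name = packet[:i]
	}
	h := fnv.New32a()
	h.Write(name)
	return int(h.Sum32() % uint32(numWorkers))
}

func main() {
	for _, p := range []string{"page.views:1|c", "_e{5,4}:title|text", "_sc|db.up|0"} {
		r, err := classify([]byte(p))
		fmt.Printf("%q -> route %d (err: %v), worker %d of 4\n", p, r, err, workerIndex([]byte(p), 4))
	}
}
```

Only the metric route would use `workerIndex` in practice; events and service checks go to the single event worker.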
func (*Server) HandleTracePacket ¶
HandleTracePacket accepts an incoming packet as bytes and sends it to the appropriate worker.
func (*Server) ImportMetrics ¶
func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
ImportMetrics feeds a slice of JSON metrics to the server's workers.
func (*Server) IsLocal ¶
IsLocal indicates whether veneur is running as a local instance (forwarding non-local data to a global veneur instance) or is running as a global instance (sending all data directly to the final destination).
func (*Server) ReadMetricSocket ¶
func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)
ReadMetricSocket listens for available packets to handle.
func (*Server) ReadSSFPacketSocket ¶ added in v1.7.0
func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)
ReadSSFPacketSocket reads SSF packets off a packet connection.
func (*Server) ReadSSFStreamSocket ¶ added in v1.7.0
ReadSSFStreamSocket reads a streaming connection in framed wire format off a streaming socket. See package github.com/stripe/veneur/protocol for details.
func (*Server) ReadTCPSocket ¶ added in v1.1.0
ReadTCPSocket listens on Server.TCPAddr for new connections, starting a goroutine for each.
type SpanWorker ¶ added in v1.6.0
SpanWorker is similar to a Worker but it collects spans instead of metrics.
func NewSpanWorker ¶ added in v1.6.0
func NewSpanWorker(sinks []sinks.SpanSink, stats *statsd.Client) *SpanWorker
NewSpanWorker creates a SpanWorker ready to collect spans.
func (*SpanWorker) Flush ¶ added in v1.6.0
func (tw *SpanWorker) Flush()
Flush invokes flush on each sink.
func (*SpanWorker) Work ¶ added in v1.6.0
func (tw *SpanWorker) Work()
Work will start the SpanWorker listening for spans. This function will never return.
type Worker ¶
type Worker struct {
    PacketChan chan samplers.UDPMetric
    ImportChan chan []samplers.JSONMetric
    QuitChan chan struct{}
    // contains filtered or unexported fields
}
Worker is the doodad that does work.
func (*Worker) Flush ¶
func (w *Worker) Flush() WorkerMetrics
Flush resets the worker's internal metrics and returns their contents.
func (*Worker) ImportMetric ¶
func (w *Worker) ImportMetric(other samplers.JSONMetric)
ImportMetric receives a metric from another veneur instance.
func (*Worker) MetricsProcessedCount ¶ added in v1.2.0
MetricsProcessedCount is a convenience method for testing that allows us to fetch the Worker's processed count in a non-racy way.
func (*Worker) ProcessMetric ¶
ProcessMetric takes a Metric and samples it.
This is standalone to facilitate testing.
type WorkerMetrics ¶
type WorkerMetrics struct {
// contains filtered or unexported fields
}
WorkerMetrics is just a plain struct bundling together the flushed contents of a worker.
func NewWorkerMetrics ¶
func NewWorkerMetrics() WorkerMetrics
NewWorkerMetrics initializes a WorkerMetrics struct.
func (WorkerMetrics) Upsert ¶
func (wm WorkerMetrics) Upsert(mk samplers.MetricKey, Scope samplers.MetricScope, tags []string) bool
Upsert creates an entry on the WorkerMetrics struct for the given metrickey (if one does not already exist) and updates the existing entry (if one already exists). Returns true if the metric entry was created and false otherwise.
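The create-if-missing half of Upsert and its boolean result can be sketched with a map. Everything here is hypothetical (`metricStore`, `metricKey`, `sampler`, `upsert`); the real struct's fields are unexported and not shown on this page. The sketch also shows why a value receiver, as in `func (wm WorkerMetrics) Upsert`, can still add entries: a Go map held in a struct is shared by every copy of that struct.

```go
package main

import "fmt"

// metricKey is a hypothetical key identifying one metric series.
type metricKey struct {
	Name, Type, JoinedTags string
}

// sampler is a hypothetical per-metric accumulator.
type sampler struct {
	Name  string
	Tags  []string
	Value float64
}

// metricStore is a hypothetical stand-in for WorkerMetrics.
type metricStore struct {
	counters map[metricKey]*sampler
}

func newMetricStore() metricStore {
	return metricStore{counters: make(map[metricKey]*sampler)}
}

// upsert creates the entry for k if it is missing and reports whether it did.
// The value receiver is enough because the map inside ms is a reference.
func (ms metricStore) upsert(k metricKey, tags []string) bool {
	if _, ok := ms.counters[k]; ok {
		return false
	}
	ms.counters[k] = &sampler{Name: k.Name, Tags: tags}
	return true
}

func main() {
	ms := newMetricStore()
	k := metricKey{Name: "api.requests", Type: "counter", JoinedTags: "env:prod"}
	fmt.Println(ms.upsert(k, []string{"env:prod"})) // true: entry created
	fmt.Println(ms.upsert(k, []string{"env:prod"})) // false: entry already exists
}
```

A caller can use the return value to count newly seen series without a second map lookup.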
Directories ¶
Path | Synopsis |
---|---|
cmd | |
protocol | Package protocol contains routines for implementing veneur's SSF wire protocol to read and write framed SSF samples on a streaming network link or other non-seekable medium. |
metrics | Package metrics provides sinks that are used by veneur internally. |
ssf | Package ssf is a generated protocol buffer package. |
tdigest | Package tdigest provides an implementation of Ted Dunning's t-digest, an approximate histogram for online, distributed applications. |
trace | Package trace provides an experimental API for initiating traces. |