Documentation ¶
Index ¶
- Constants
- Variables
- func CalculateTickDelay(interval time.Duration, t time.Time) time.Duration
- func ConsumePanic(cl *trace.Client, hostname string, err interface{})
- func ForwardGrpcSingle(ctx context.Context, client forwardrpc.ForwardClient, ...) error
- func ForwardGrpcStream(ctx context.Context, client forwardrpc.ForwardClient, ...) error
- func NewSocket(addr *net.UDPAddr, recvBuf int, reuseport bool) (net.PacketConn, error)
- type Config
- type DatadogTraceSpan
- type EventWorker
- type GlobalListeningPerProtocolMetrics
- type GrpcMetricsSource
- type HttpConfig
- type MetricSinkConfig
- type MetricSinkTypes
- type ParsedSourceConfig
- type ProtocolType
- type Proxy
- func (p *Proxy) HTTPServe()
- func (p *Proxy) Handler() http.Handler
- func (p *Proxy) IsListeningHTTP() bool
- func (p *Proxy) ProxyMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric, origin string)
- func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)
- func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)
- func (p *Proxy) ReportRuntimeMetrics()
- func (p *Proxy) Serve()
- func (p *Proxy) Shutdown()
- func (p *Proxy) Start()
- type ProxyConfig
- type ProxyProtocol
- type SentryHook
- type Server
- func (s *Server) Flush(ctx context.Context)
- func (s *Server) FlushWatchdog()
- func (s *Server) HTTPServe()
- func (s *Server) HandleMetricPacket(packet []byte, protocolType ProtocolType) error
- func (s *Server) HandleTracePacket(packet []byte, protocolType ProtocolType)
- func (s *Server) Handler() http.Handler
- func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
- func (s *Server) IsListeningHTTP() bool
- func (s *Server) IsLocal() bool
- func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)
- func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)
- func (s *Server) ReadSSFStreamSocket(serverConn net.Conn)
- func (s *Server) ReadStatsdDatagramSocket(serverConn *net.UnixConn, packetPool *sync.Pool)
- func (s *Server) ReadTCPSocket(listener net.Listener)
- func (s *Server) Serve()
- func (s *Server) Shutdown()
- func (s *Server) Start()
- type ServerConfig
- type SinkConfig
- type SinkRoutingConfig
- type SinkRoutingSinks
- type SourceConfig
- type SourceTypes
- type SpanSinkConfig
- type SpanSinkTypes
- type SpanWorker
- type SsfMetricsSource
- type UdpMetricsSource
- type UnknownConfigKeys
- type Worker
- func (w *Worker) Flush() WorkerMetrics
- func (w *Worker) ImportMetric(other samplers.JSONMetric)
- func (w *Worker) ImportMetricGRPC(other *metricpb.Metric) (err error)
- func (w *Worker) IngestMetrics(ms []*metricpb.Metric)
- func (w *Worker) IngestUDP(metric samplers.UDPMetric)
- func (w *Worker) MetricsProcessedCount() int64
- func (w *Worker) ProcessMetric(m *samplers.UDPMetric)
- func (w *Worker) SampleTimeseries(m *samplers.UDPMetric)
- func (w *Worker) Stop()
- func (w *Worker) Work()
- type WorkerMetrics
Constants ¶
const (
	CounterTypeName   = "counter"
	GaugeTypeName     = "gauge"
	HistogramTypeName = "histogram"
	SetTypeName       = "set"
	TimerTypeName     = "timer"
	StatusTypeName    = "status"
)
const SentryFlushTimeout = 10 * time.Second
SentryFlushTimeout is set to 10 seconds. If events are not flushed to Sentry within the time limit, they are dropped.
Variables ¶
var BUILD_DATE = defaultLinkValue
var VERSION = defaultLinkValue
VERSION stores the current veneur version. It must be a var so it can be set at link time.
Functions ¶
func CalculateTickDelay ¶
CalculateTickDelay takes the provided time, `Truncate`s it to a rounded-down multiple of `interval`, then adds `interval` back to find the "next" tick.
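A minimal sketch of using it, assuming the package is imported as veneur and the 10-second interval is purely illustrative:

	delay := veneur.CalculateTickDelay(10*time.Second, time.Now())
	time.Sleep(delay) // wake up at (approximately) the next interval boundary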
func ConsumePanic ¶
ConsumePanic is intended to be called inside a deferred function when recovering from a panic. It accepts the value of recover() as its final argument, reports the panic to Sentry, prints the stack, and then re-panics (to ensure your program terminates).
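A hedged sketch of the intended call pattern; the trace client and hostname are placeholders, and ConsumePanic is assumed to return quietly when recover() yields nil:

	func doWork(cl *trace.Client, hostname string) {
		defer func() {
			// Reports to Sentry, prints the stack, and re-panics if a panic occurred.
			veneur.ConsumePanic(cl, hostname, recover())
		}()
		// ... work that might panic ...
	}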
func ForwardGrpcSingle ¶ added in v14.2.0
func ForwardGrpcSingle( ctx context.Context, client forwardrpc.ForwardClient, metrics []*metricpb.Metric, ) error
func ForwardGrpcStream ¶ added in v14.2.0
func ForwardGrpcStream( ctx context.Context, client forwardrpc.ForwardClient, metrics []*metricpb.Metric, ) error
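A hedged sketch of forwarding a batch of metrics with ForwardGrpcSingle. The destination address is a placeholder, forwardrpc.NewForwardClient is assumed to be the gRPC-generated client constructor, and the forwardrpc and metricpb import paths (github.com/stripe/veneur/v14/forwardrpc and github.com/stripe/veneur/v14/samplers/metricpb) are assumptions:

	func forwardBatch(ctx context.Context, metrics []*metricpb.Metric) error {
		conn, err := grpc.Dial("global-veneur.example.com:8128",
			grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			return err
		}
		defer conn.Close()

		client := forwardrpc.NewForwardClient(conn)
		// ForwardGrpcStream could be used instead to send the same batch as a stream.
		return veneur.ForwardGrpcSingle(ctx, client, metrics)
	}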
Types ¶
type Config ¶
type Config struct {
	Aggregates                             []string          `yaml:"aggregates"`
	AwsAccessKeyID                         util.StringSecret `yaml:"aws_access_key_id"`
	AwsRegion                              string            `yaml:"aws_region"`
	AwsS3Bucket                            string            `yaml:"aws_s3_bucket"`
	AwsSecretAccessKey                     util.StringSecret `yaml:"aws_secret_access_key"`
	BlockProfileRate                       int               `yaml:"block_profile_rate"`
	CountUniqueTimeseries                  bool              `yaml:"count_unique_timeseries"`
	DatadogAPIHostname                     string            `yaml:"datadog_api_hostname"`
	DatadogAPIKey                          util.StringSecret `yaml:"datadog_api_key"`
	DatadogExcludeTagsPrefixByPrefixMetric []struct {
		MetricPrefix string   `yaml:"metric_prefix"`
		Tags         []string `yaml:"tags"`
	} `yaml:"datadog_exclude_tags_prefix_by_prefix_metric"`
	DatadogFlushMaxPerBody       int           `yaml:"datadog_flush_max_per_body"`
	DatadogMetricNamePrefixDrops []string      `yaml:"datadog_metric_name_prefix_drops"`
	DatadogSpanBufferSize        int           `yaml:"datadog_span_buffer_size"`
	DatadogTraceAPIAddress       string        `yaml:"datadog_trace_api_address"`
	Debug                        bool          `yaml:"debug"`
	DebugFlushedMetrics          bool          `yaml:"debug_flushed_metrics"`
	DebugIngestedSpans           bool          `yaml:"debug_ingested_spans"`
	EnableProfiling              bool          `yaml:"enable_profiling"`
	ExtendTags                   []string      `yaml:"extend_tags"`
	FalconerAddress              string        `yaml:"falconer_address"`
	Features                     struct {
		EnableMetricSinkRouting bool   `yaml:"enable_metric_sink_routing"`
		MigrateMetricSinks      bool   `yaml:"migrate_metric_sinks"`
		MigrateSpanSinks        bool   `yaml:"migrate_span_sinks"`
		ProxyProtocol           string `yaml:"proxy_protocol"`
	} `yaml:"features"`
	FlushFile                    string        `yaml:"flush_file"`
	FlushMaxPerBody              int           `yaml:"flush_max_per_body"`
	FlushOnShutdown              bool          `yaml:"flush_on_shutdown"`
	FlushWatchdogMissedFlushes   int           `yaml:"flush_watchdog_missed_flushes"`
	ForwardAddress               string        `yaml:"forward_address"`
	ForwardUseGrpc               bool          `yaml:"forward_use_grpc"`
	GrpcAddress                  string        `yaml:"grpc_address"`
	GrpcListenAddresses          []util.Url    `yaml:"grpc_listen_addresses"`
	Hostname                     string        `yaml:"hostname"`
	HTTP                         HttpConfig    `yaml:"http"`
	HTTPAddress                  string        `yaml:"http_address"`
	HTTPQuit                     bool          `yaml:"http_quit"`
	IndicatorSpanTimerName       string        `yaml:"indicator_span_timer_name"`
	Interval                     time.Duration `yaml:"interval"`
	KafkaBroker                  string        `yaml:"kafka_broker"`
	KafkaCheckTopic              string        `yaml:"kafka_check_topic"`
	KafkaEventTopic              string        `yaml:"kafka_event_topic"`
	KafkaMetricBufferBytes       int           `yaml:"kafka_metric_buffer_bytes"`
	KafkaMetricBufferFrequency   time.Duration `yaml:"kafka_metric_buffer_frequency"`
	KafkaMetricBufferMessages    int           `yaml:"kafka_metric_buffer_messages"`
	KafkaMetricRequireAcks       string        `yaml:"kafka_metric_require_acks"`
	KafkaMetricTopic             string        `yaml:"kafka_metric_topic"`
	KafkaPartitioner             string        `yaml:"kafka_partitioner"`
	KafkaRetryMax                int           `yaml:"kafka_retry_max"`
	KafkaSpanBufferBytes         int           `yaml:"kafka_span_buffer_bytes"`
	KafkaSpanBufferFrequency     time.Duration `yaml:"kafka_span_buffer_frequency"`
	KafkaSpanBufferMesages       int           `yaml:"kafka_span_buffer_mesages"`
	KafkaSpanRequireAcks         string        `yaml:"kafka_span_require_acks"`
	KafkaSpanSampleRatePercent   float64       `yaml:"kafka_span_sample_rate_percent"`
	KafkaSpanSampleTag           string        `yaml:"kafka_span_sample_tag"`
	KafkaSpanSerializationFormat string        `yaml:"kafka_span_serialization_format"`
	KafkaSpanTopic               string        `yaml:"kafka_span_topic"`
	LightstepAccessToken         util.StringSecret `yaml:"lightstep_access_token"`
	LightstepCollectorHost       util.Url          `yaml:"lightstep_collector_host"`
	LightstepMaximumSpans        int               `yaml:"lightstep_maximum_spans"`
	LightstepNumClients          int               `yaml:"lightstep_num_clients"`
	LightstepReconnectPeriod     time.Duration     `yaml:"lightstep_reconnect_period"`
	MetricMaxLength              int               `yaml:"metric_max_length"`
	MetricSinkRouting            []SinkRoutingConfig `yaml:"metric_sink_routing"`
	MetricSinks                  []SinkConfig        `yaml:"metric_sinks"`
	MutexProfileFraction         int                 `yaml:"mutex_profile_fraction"`
	NewrelicAccountID            int                 `yaml:"newrelic_account_id"`
	NewrelicCommonTags           []string            `yaml:"newrelic_common_tags"`
	NewrelicEventType            string              `yaml:"newrelic_event_type"`
	NewrelicInsertKey            util.StringSecret   `yaml:"newrelic_insert_key"`
	NewrelicRegion               string              `yaml:"newrelic_region"`
	NewrelicServiceCheckEventType string             `yaml:"newrelic_service_check_event_type"`
	NewrelicTraceObserverURL     string              `yaml:"newrelic_trace_observer_url"`
	NumReaders                   int                 `yaml:"num_readers"`
	NumSpanWorkers               int                 `yaml:"num_span_workers"`
	NumWorkers                   int                 `yaml:"num_workers"`
	ObjectiveSpanTimerName       string              `yaml:"objective_span_timer_name"`
	OmitEmptyHostname            bool                `yaml:"omit_empty_hostname"`
	Percentiles                  []float64           `yaml:"percentiles"`
	PrometheusNetworkType        string              `yaml:"prometheus_network_type"`
	PrometheusRepeaterAddress    string              `yaml:"prometheus_repeater_address"`
	ReadBufferSizeBytes          int                 `yaml:"read_buffer_size_bytes"`
	SentryDsn                    util.StringSecret   `yaml:"sentry_dsn"`
	SignalfxAPIKey               util.StringSecret   `yaml:"signalfx_api_key"`
	SignalfxDynamicPerTagAPIKeysEnable        bool          `yaml:"signalfx_dynamic_per_tag_api_keys_enable"`
	SignalfxDynamicPerTagAPIKeysRefreshPeriod time.Duration `yaml:"signalfx_dynamic_per_tag_api_keys_refresh_period"`
	SignalfxEndpointAPI          string              `yaml:"signalfx_endpoint_api"`
	SignalfxEndpointBase         string              `yaml:"signalfx_endpoint_base"`
	SignalfxFlushMaxPerBody      int                 `yaml:"signalfx_flush_max_per_body"`
	SignalfxHostnameTag          string              `yaml:"signalfx_hostname_tag"`
	SignalfxMetricNamePrefixDrops []string           `yaml:"signalfx_metric_name_prefix_drops"`
	SignalfxMetricTagPrefixDrops  []string           `yaml:"signalfx_metric_tag_prefix_drops"`
	SignalfxPerTagAPIKeys        []struct {
		APIKey util.StringSecret `yaml:"api_key"`
		Name   string            `yaml:"name"`
	} `yaml:"signalfx_per_tag_api_keys"`
	SignalfxVaryKeyBy            string              `yaml:"signalfx_vary_key_by"`
	SignalfxVaryKeyByFavorCommonDimensions bool      `yaml:"signalfx_vary_key_by_favor_common_dimensions"`
	Sources                      []SourceConfig      `yaml:"sources"`
	SpanChannelCapacity          int                 `yaml:"span_channel_capacity"`
	SpanSinks                    []SinkConfig        `yaml:"span_sinks"`
	SplunkHecAddress             string              `yaml:"splunk_hec_address"`
	SplunkHecBatchSize           int                 `yaml:"splunk_hec_batch_size"`
	SplunkHecConnectionLifetimeJitter time.Duration  `yaml:"splunk_hec_connection_lifetime_jitter"`
	SplunkHecIngestTimeout       time.Duration       `yaml:"splunk_hec_ingest_timeout"`
	SplunkHecMaxConnectionLifetime time.Duration     `yaml:"splunk_hec_max_connection_lifetime"`
	SplunkHecSendTimeout         time.Duration       `yaml:"splunk_hec_send_timeout"`
	SplunkHecSubmissionWorkers   int                 `yaml:"splunk_hec_submission_workers"`
	SplunkHecTLSValidateHostname string              `yaml:"splunk_hec_tls_validate_hostname"`
	SplunkHecToken               string              `yaml:"splunk_hec_token"`
	SplunkSpanSampleRate         int                 `yaml:"splunk_span_sample_rate"`
	SsfBufferSize                int                 `yaml:"ssf_buffer_size"`
	SsfListenAddresses           []util.Url          `yaml:"ssf_listen_addresses"`
	StatsAddress                 string              `yaml:"stats_address"`
	StatsdListenAddresses        []util.Url          `yaml:"statsd_listen_addresses"`
	SynchronizeWithInterval      bool                `yaml:"synchronize_with_interval"`
	Tags                         []string            `yaml:"tags"`
	TagsExclude                  []string            `yaml:"tags_exclude"`
	TLSAuthorityCertificate      string              `yaml:"tls_authority_certificate"`
	TLSCertificate               string              `yaml:"tls_certificate"`
	TLSKey                       util.StringSecret   `yaml:"tls_key"`
	TraceLightstepAccessToken    util.StringSecret   `yaml:"trace_lightstep_access_token"`
	TraceLightstepCollectorHost  util.Url            `yaml:"trace_lightstep_collector_host"`
	TraceLightstepMaximumSpans   int                 `yaml:"trace_lightstep_maximum_spans"`
	TraceLightstepNumClients     int                 `yaml:"trace_lightstep_num_clients"`
	TraceLightstepReconnectPeriod time.Duration      `yaml:"trace_lightstep_reconnect_period"`
	TraceMaxLengthBytes          int                 `yaml:"trace_max_length_bytes"`
	VeneurMetricsAdditionalTags  []string            `yaml:"veneur_metrics_additional_tags"`
	VeneurMetricsScopes          struct {
		Counter   string `yaml:"counter"`
		Gauge     string `yaml:"gauge"`
		Histogram string `yaml:"histogram"`
		Set       string `yaml:"set"`
		Status    string `yaml:"status"`
	} `yaml:"veneur_metrics_scopes"`
	XrayAddress                  string              `yaml:"xray_address"`
	XrayAnnotationTags           []string            `yaml:"xray_annotation_tags"`
	XraySamplePercentage         float64             `yaml:"xray_sample_percentage"`
}
type DatadogTraceSpan ¶
type DatadogTraceSpan struct {
	Duration int64              `json:"duration"`
	Error    int64              `json:"error"`
	Meta     map[string]string  `json:"meta"`
	Metrics  map[string]float64 `json:"metrics"`
	Name     string             `json:"name"`
	ParentID int64              `json:"parent_id,omitempty"`
	Resource string             `json:"resource,omitempty"`
	Service  string             `json:"service"`
	SpanID   int64              `json:"span_id"`
	Start    int64              `json:"start"`
	TraceID  int64              `json:"trace_id"`
	Type     string             `json:"type"`
}
DatadogTraceSpan represents a trace span as JSON for the Datadog tracing API.
type EventWorker ¶
type EventWorker struct {
// contains filtered or unexported fields
}
EventWorker is similar to a Worker but it collects events and service checks instead of metrics.
func NewEventWorker ¶
func NewEventWorker(cl *trace.Client, stats scopedstatsd.Client) *EventWorker
NewEventWorker creates an EventWorker ready to collect events and service checks.
func (*EventWorker) Flush ¶
func (ew *EventWorker) Flush() []ssf.SSFSample
Flush returns the EventWorker's stored events and service checks and resets the stored contents.
func (*EventWorker) Work ¶
func (ew *EventWorker) Work()
Work will start the EventWorker listening for events and service checks. This function will never return.
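A minimal sketch of the EventWorker lifecycle, assuming cl and stats are an existing *trace.Client and scopedstatsd.Client:

	ew := veneur.NewEventWorker(cl, stats)
	go ew.Work() // Work never returns, so run it in its own goroutine
	// ... later, typically once per flush interval:
	samples := ew.Flush() // drains the stored events and service checks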
type GlobalListeningPerProtocolMetrics ¶ added in v14.2.0
type GlobalListeningPerProtocolMetrics struct {
// contains filtered or unexported fields
}
type GrpcMetricsSource ¶ added in v14.2.0
type GrpcMetricsSource struct {
// contains filtered or unexported fields
}
type HttpConfig ¶ added in v14.2.0
type HttpConfig struct {
	// Enables /config/json and /config/yaml endpoints for displaying the current
	// configuration. Entries of type util.StringSecret will be redacted unless
	// the -print-secrets flag is set.
	Config bool `yaml:"config"`
}
type MetricSinkConfig ¶ added in v14.2.0
type MetricSinkConfig interface{}
type MetricSinkTypes ¶ added in v14.2.0
type MetricSinkTypes = map[string]struct {
	// Creates a new metric sink instance.
	Create func(
		*Server, string, *logrus.Entry, Config, MetricSinkConfig,
	) (sinks.MetricSink, error)
	// Parses the config for the sink into a format that is validated and safe to
	// log.
	ParseConfig func(string, interface{}) (MetricSinkConfig, error)
}
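A hedged sketch of registering an extra sink kind. The "blackhole" kind and its stubbed constructor are hypothetical, and the sinks import path (github.com/stripe/veneur/v14/sinks) is an assumption; a real Create would return a working sinks.MetricSink:

	sinkTypes := veneur.MetricSinkTypes{
		"blackhole": {
			Create: func(
				srv *veneur.Server, name string, logger *logrus.Entry,
				cfg veneur.Config, sinkCfg veneur.MetricSinkConfig,
			) (sinks.MetricSink, error) {
				return nil, errors.New("blackhole sink: not implemented")
			},
			ParseConfig: func(name string, raw interface{}) (veneur.MetricSinkConfig, error) {
				// Pass the raw YAML through untouched; a real sink would decode
				// and validate it here.
				return raw, nil
			},
		},
	}
	// sinkTypes can then be passed to NewFromConfig via ServerConfig.MetricSinkTypes.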
type ParsedSourceConfig ¶ added in v14.2.0
type ParsedSourceConfig interface{}
type ProtocolType ¶ added in v14.2.0
type ProtocolType int
const (
	DOGSTATSD_TCP ProtocolType = iota
	DOGSTATSD_UDP
	DOGSTATSD_UNIX
	DOGSTATSD_GRPC
	SSF_UNIX
	SSF_UDP
	SSF_GRPC
)
func (ProtocolType) String ¶ added in v14.2.0
func (p ProtocolType) String() string
type Proxy ¶
type Proxy struct {
	Hostname                   string
	ForwardDestinations        *consistent.Consistent
	TraceDestinations          *consistent.Consistent
	ForwardGRPCDestinations    *consistent.Consistent
	Discoverer                 discovery.Discoverer
	ConsulForwardService       string
	ConsulTraceService         string
	ConsulForwardGRPCService   string
	ConsulInterval             time.Duration
	MetricsInterval            time.Duration
	ForwardDestinationsMtx     sync.Mutex
	TraceDestinationsMtx       sync.Mutex
	ForwardGRPCDestinationsMtx sync.Mutex
	HTTPAddr                   string
	HTTPClient                 *http.Client
	AcceptingForwards          bool
	AcceptingTraces            bool
	AcceptingGRPCForwards      bool
	ForwardTimeout             time.Duration
	TraceClient                *trace.Client
	// contains filtered or unexported fields
}
func NewProxyFromConfig ¶
func NewProxyFromConfig( logger *logrus.Logger, conf ProxyConfig, ) (*Proxy, error)
func (*Proxy) HTTPServe ¶
func (p *Proxy) HTTPServe()
HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.
func (*Proxy) IsListeningHTTP ¶ added in v14.2.0
IsListeningHTTP returns whether the Proxy is currently listening over HTTP.
func (*Proxy) ProxyMetrics ¶
ProxyMetrics takes a slice of JSONMetrics and breaks them up into multiple HTTP requests by MetricKey using the hash ring.
func (*Proxy) ProxyTraces ¶
func (p *Proxy) ProxyTraces(ctx context.Context, traces []DatadogTraceSpan)
func (*Proxy) RefreshDestinations ¶
func (p *Proxy) RefreshDestinations(serviceName string, ring *consistent.Consistent, mtx *sync.Mutex)
RefreshDestinations updates the server's list of valid destinations for flushing. This should be called periodically to ensure we have the latest data.
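A minimal sketch of the periodic refresh described above, assuming p is a running *veneur.Proxy:

	ticker := time.NewTicker(p.ConsulInterval)
	defer ticker.Stop()
	for range ticker.C {
		p.RefreshDestinations(p.ConsulForwardService, p.ForwardDestinations, &p.ForwardDestinationsMtx)
	}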
func (*Proxy) ReportRuntimeMetrics ¶
func (p *Proxy) ReportRuntimeMetrics()
func (*Proxy) Serve ¶
func (p *Proxy) Serve()
Serve starts all of the configured servers (gRPC or HTTP) and blocks until one of them exits. At that point, it stops them both.
type ProxyConfig ¶
type ProxyConfig struct {
	ConsulForwardGrpcServiceName string               `yaml:"consul_forward_grpc_service_name"`
	ConsulForwardServiceName     string               `yaml:"consul_forward_service_name"`
	ConsulRefreshInterval        string               `yaml:"consul_refresh_interval"`
	ConsulTraceServiceName       string               `yaml:"consul_trace_service_name"`
	Debug                        bool                 `yaml:"debug"`
	EnableProfiling              bool                 `yaml:"enable_profiling"`
	ForwardAddress               string               `yaml:"forward_address"`
	ForwardTimeout               string               `yaml:"forward_timeout"`
	GrpcAddress                  string               `yaml:"grpc_address"`
	GrpcForwardAddress           string               `yaml:"grpc_forward_address"`
	HTTPAddress                  string               `yaml:"http_address"`
	IdleConnectionTimeout        string               `yaml:"idle_connection_timeout"`
	IgnoreTags                   []matcher.TagMatcher `yaml:"ignore_tags"`
	MaxIdleConns                 int                  `yaml:"max_idle_conns"`
	MaxIdleConnsPerHost          int                  `yaml:"max_idle_conns_per_host"`
	RuntimeMetricsInterval       string               `yaml:"runtime_metrics_interval"`
	SentryDsn                    string               `yaml:"sentry_dsn"`
	SsfDestinationAddress        util.Url             `yaml:"ssf_destination_address"`
	StatsAddress                 string               `yaml:"stats_address"`
	TraceAddress                 string               `yaml:"trace_address"`
	TraceAPIAddress              string               `yaml:"trace_api_address"`
	TracingClientCapacity        int                  `yaml:"tracing_client_capacity"`
	TracingClientFlushInterval   string               `yaml:"tracing_client_flush_interval"`
	TracingClientMetricsInterval string               `yaml:"tracing_client_metrics_interval"`
}
func ReadProxyConfig ¶
func ReadProxyConfig( logger *logrus.Entry, path string, ) (c ProxyConfig, err error)
ReadProxyConfig unmarshals the proxy config file and slurps in its data.
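A hedged end-to-end sketch of reading a proxy config and running the proxy; the config path is a placeholder:

	func main() {
		logger := logrus.New()
		conf, err := veneur.ReadProxyConfig(logrus.NewEntry(logger), "proxy.yaml")
		if err != nil {
			logger.WithError(err).Fatal("could not read proxy config")
		}
		proxy, err := veneur.NewProxyFromConfig(logger, conf)
		if err != nil {
			logger.WithError(err).Fatal("could not create proxy")
		}
		proxy.Start()
		proxy.Serve() // blocks until one of the proxy's servers exits
	}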
type ProxyProtocol ¶ added in v14.2.0
type ProxyProtocol int64
const (
	ProxyProtocolUnknown ProxyProtocol = iota
	ProxyProtocolRest
	ProxyProtocolGrpcSingle
	ProxyProtocolGrpcStream
)
type SentryHook ¶ added in v14.2.0
SentryHook is a logrus hook that sends error, fatal, and panic messages to Sentry.
func (SentryHook) Levels ¶ added in v14.2.0
func (s SentryHook) Levels() []logrus.Level
type Server ¶
type Server struct {
	Config                Config
	Workers               []*Worker
	EventWorker           *EventWorker
	SpanChan              chan *ssf.SSFSpan
	SpanWorker            *SpanWorker
	SpanWorkerGoroutines  int
	CountUniqueTimeseries bool
	Statsd                *scopedstatsd.ScopedClient
	Hostname              string
	Tags                  []string
	TagsAsMap             map[string]string
	HTTPClient            *http.Client
	HTTPAddr              string
	ForwardAddr           string
	StatsdListenAddrs     []net.Addr
	SSFListenAddrs        []net.Addr
	GRPCListenAddrs       []net.Addr
	RcvbufBytes           int
	FlushOnShutdown       bool
	Interval              time.Duration
	HistogramPercentiles  []float64
	HistogramAggregates   samplers.HistogramAggregates
	TraceClient           *trace.Client
	// contains filtered or unexported fields
}
A Server is the actual veneur instance that will be run.
func NewFromConfig ¶
func NewFromConfig(config ServerConfig) (*Server, error)
NewFromConfig creates a new veneur server from a configuration specification and sets up the passed logger according to the configuration.
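A hedged sketch of building a Server from an in-code Config; the field values are illustrative placeholders, and a real deployment would normally load the YAML config file instead:

	logger := logrus.New()
	conf := veneur.Config{
		Hostname:    "veneur-example",
		Interval:    10 * time.Second,
		HTTPAddress: "127.0.0.1:8127",
		NumWorkers:  4,
	}
	srv, err := veneur.NewFromConfig(veneur.ServerConfig{
		Config: conf,
		Logger: logger,
	})
	if err != nil {
		logger.WithError(err).Fatal("could not create server")
	}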
func (*Server) FlushWatchdog ¶
func (s *Server) FlushWatchdog()
FlushWatchdog periodically checks that at most `flush_watchdog_missed_flushes` were skipped in a Server. If more than that number was skipped, it panics (assuming that flushing is stuck) with a full level of detail on that panic's backtraces.
It never terminates, so is ideally run from a goroutine in a program's main function.
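A minimal sketch of the suggested wiring, continuing from the srv created above:

	srv.Start()
	go srv.FlushWatchdog() // panics (with backtraces) if flushes stop happening
	srv.Serve()            // blocks until one of the configured servers exits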
func (*Server) HTTPServe ¶
func (s *Server) HTTPServe()
HTTPServe starts the HTTP server and listens perpetually until it encounters an unrecoverable error.
func (*Server) HandleMetricPacket ¶
func (s *Server) HandleMetricPacket(packet []byte, protocolType ProtocolType) error
HandleMetricPacket processes each packet that is sent to the server, and sends to an appropriate worker (EventWorker or Worker).
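A minimal sketch of feeding a single DogStatsD-format packet through HandleMetricPacket, assuming s is a started *veneur.Server:

	err := s.HandleMetricPacket([]byte("example.counter:1|c|#env:dev"), veneur.DOGSTATSD_UDP)
	if err != nil {
		log.Printf("bad packet: %v", err)
	}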
func (*Server) HandleTracePacket ¶
func (s *Server) HandleTracePacket(packet []byte, protocolType ProtocolType)
HandleTracePacket accepts an incoming packet as bytes and sends it to the appropriate worker.
func (*Server) ImportMetrics ¶
func (s *Server) ImportMetrics(ctx context.Context, jsonMetrics []samplers.JSONMetric)
ImportMetrics feeds a slice of JSON metrics to the server's workers.
func (*Server) IsListeningHTTP ¶ added in v14.2.0
IsListeningHTTP returns whether the Server is currently listening over HTTP.
func (*Server) IsLocal ¶
IsLocal indicates whether veneur is running as a local instance (forwarding non-local data to a global veneur instance) or is running as a global instance (sending all data directly to the final destination).
func (*Server) ReadMetricSocket ¶
func (s *Server) ReadMetricSocket(serverConn net.PacketConn, packetPool *sync.Pool)
ReadMetricSocket listens for available packets to handle.
func (*Server) ReadSSFPacketSocket ¶
func (s *Server) ReadSSFPacketSocket(serverConn net.PacketConn, packetPool *sync.Pool)
ReadSSFPacketSocket reads SSF packets off a packet connection.
func (*Server) ReadSSFStreamSocket ¶
ReadSSFStreamSocket reads a streaming connection in framed wire format off a streaming socket. See package github.com/stripe/veneur/v14/protocol for details.
func (*Server) ReadStatsdDatagramSocket ¶
ReadStatsdDatagramSocket reads statsd metric packets off a unix datagram socket connection.
func (*Server) ReadTCPSocket ¶
ReadTCPSocket listens on Server.TCPAddr for new connections, starting a goroutine for each.
func (*Server) Serve ¶
func (s *Server) Serve()
Serve starts all of the configured servers (gRPC or HTTP) and blocks until one of them exits. At that point, it stops them both.
type ServerConfig ¶ added in v14.2.0
type ServerConfig struct {
	Config          Config
	Logger          *logrus.Logger
	MetricSinkTypes MetricSinkTypes
	SourceTypes     SourceTypes
	SpanSinkTypes   SpanSinkTypes
}
ServerConfig is the configuration used to create a new Server.
type SinkConfig ¶ added in v14.2.0
type SinkConfig struct {
	Kind      string               `yaml:"kind"`
	Name      string               `yaml:"name"`
	Config    interface{}          `yaml:"config"`
	StripTags []matcher.TagMatcher `yaml:"strip_tags"`
}
type SinkRoutingConfig ¶ added in v14.2.0
type SinkRoutingConfig struct {
	Name  string            `yaml:"name"`
	Match []matcher.Matcher `yaml:"match"`
	Sinks SinkRoutingSinks  `yaml:"sinks"`
}
type SinkRoutingSinks ¶ added in v14.2.0
type SourceConfig ¶ added in v14.2.0
type SourceTypes ¶ added in v14.2.0
type SourceTypes = map[string]struct {
	// Creates a new source instance.
	Create func(
		*Server, string, *logrus.Entry, ParsedSourceConfig,
	) (sources.Source, error)
	// Parses the config for the source into a format that is validated and safe
	// to log.
	ParseConfig func(string, interface{}) (ParsedSourceConfig, error)
}
type SpanSinkConfig ¶ added in v14.2.0
type SpanSinkConfig interface{}
type SpanSinkTypes ¶ added in v14.2.0
type SpanSinkTypes = map[string]struct {
	// Creates a new span sink instance.
	Create func(
		*Server, string, *logrus.Entry, Config, SpanSinkConfig,
	) (sinks.SpanSink, error)
	// Parses the config for the sink into a format that is validated and safe to
	// log.
	ParseConfig func(string, interface{}) (SpanSinkConfig, error)
}
type SpanWorker ¶
SpanWorker is similar to a Worker, but it collects spans instead of metrics.
func NewSpanWorker ¶
func NewSpanWorker( sinks []sinks.SpanSink, cl *trace.Client, statsd scopedstatsd.Client, spanChan <-chan *ssf.SSFSpan, commonTags map[string]string, logger *logrus.Entry, ) *SpanWorker
NewSpanWorker creates a SpanWorker ready to collect spans.
func (*SpanWorker) Work ¶
func (tw *SpanWorker) Work()
Work will start the SpanWorker listening for spans. This function will never return.
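A hedged sketch of wiring up a SpanWorker; the sink slice, clients, common tags, and channel capacity are placeholders:

	spanChan := make(chan *ssf.SSFSpan, 1024)
	sw := veneur.NewSpanWorker(
		spanSinks,   // []sinks.SpanSink to flush spans to
		traceClient, // *trace.Client
		statsClient, // scopedstatsd.Client
		spanChan,
		map[string]string{"service": "example"},
		logrus.NewEntry(logrus.New()),
	)
	go sw.Work()            // consumes spans from spanChan; never returns
	spanChan <- &ssf.SSFSpan{} // a real span would have its fields populated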
type SsfMetricsSource ¶ added in v14.2.0
type SsfMetricsSource struct {
// contains filtered or unexported fields
}
type UdpMetricsSource ¶ added in v14.2.0
type UdpMetricsSource struct {
// contains filtered or unexported fields
}
func (*UdpMetricsSource) StartStatsd ¶ added in v14.2.0
func (source *UdpMetricsSource) StartStatsd( s *Server, a net.Addr, packetPool *sync.Pool, ) net.Addr
StartStatsd spawns a goroutine that listens for metrics in statsd format on the address a, and returns the concrete listening address. As this is a setup routine, if any error occurs, it panics.
type UnknownConfigKeys ¶
type UnknownConfigKeys struct {
// contains filtered or unexported fields
}
UnknownConfigKeys represents a failure to strictly parse a configuration YAML file, indicating that the file contains unknown keys.
func (*UnknownConfigKeys) Error ¶
func (e *UnknownConfigKeys) Error() string
type Worker ¶
type Worker struct {
	PacketChan       chan samplers.UDPMetric
	ImportChan       chan []samplers.JSONMetric
	ImportMetricChan chan []*metricpb.Metric
	QuitChan         chan struct{}
	// contains filtered or unexported fields
}
Worker is the doodad that does work.
func NewWorker ¶
func NewWorker(id int, isLocal bool, countUniqueTimeseries bool, cl *trace.Client, logger *logrus.Logger, stats scopedstatsd.Client) *Worker
NewWorker creates and returns a new Worker object.
func (*Worker) Flush ¶
func (w *Worker) Flush() WorkerMetrics
Flush resets the worker's internal metrics and returns their contents.
func (*Worker) ImportMetric ¶
func (w *Worker) ImportMetric(other samplers.JSONMetric)
ImportMetric receives a metric from another veneur instance.
func (*Worker) ImportMetricGRPC ¶
ImportMetricGRPC receives a metric from another veneur instance over gRPC.
In practice, this is only called when in the aggregation tier, so we don't handle LocalOnly scope.
func (*Worker) IngestMetrics ¶
func (*Worker) MetricsProcessedCount ¶
MetricsProcessedCount is a convenience method for testing that allows us to fetch the Worker's processed count in a non-racy way.
func (*Worker) ProcessMetric ¶
ProcessMetric takes a Metric and samples it.
func (*Worker) SampleTimeseries ¶
SampleTimeseries takes a metric and counts whether the timeseries has already been seen by the worker in this flush interval.
type WorkerMetrics ¶
type WorkerMetrics struct {
// contains filtered or unexported fields
}
WorkerMetrics is just a plain struct bundling together the flushed contents of a worker.
func NewWorkerMetrics ¶
func NewWorkerMetrics() WorkerMetrics
NewWorkerMetrics initializes a WorkerMetrics struct.
func (WorkerMetrics) ForwardableMetrics ¶
func (wm WorkerMetrics) ForwardableMetrics( cl *trace.Client, logger *logrus.Entry, ) []*metricpb.Metric
ForwardableMetrics converts all metrics that should be forwarded to metricpb.Metric (protobuf-compatible).
func (WorkerMetrics) Upsert ¶
func (wm WorkerMetrics) Upsert(mk samplers.MetricKey, Scope samplers.MetricScope, tags []string) bool
Upsert creates an entry on the WorkerMetrics struct for the given metrickey (if one does not already exist) and updates the existing entry (if one already exists). Returns true if the metric entry was created and false otherwise.
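A hedged sketch of the create-vs-update behaviour; the MetricKey field names and the MixedScope constant are assumptions about the samplers package:

	wm := veneur.NewWorkerMetrics()
	key := samplers.MetricKey{Name: "example.counter", Type: "counter"}
	created := wm.Upsert(key, samplers.MixedScope, []string{"env:dev"}) // true: entry created
	updated := wm.Upsert(key, samplers.MixedScope, []string{"env:dev"}) // false: entry already existed
	_ = created
	_ = updated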
Source Files ¶
Directories ¶
Path | Synopsis
---|---
cmd |
internal |
protocol | Package protocol contains routines for implementing veneur's SSF wire protocol to read and write framed SSF samples on a streaming network link or other non-seekable medium.
proxysrv | Package proxysrv proxies metrics over gRPC to global veneurs using consistent hashing. The Server provided accepts a hash ring of destinations, and then listens for metrics over gRPC.
mock | Package mock is a generated GoMock package.
ssfmetrics | Package ssfmetrics provides sinks that are used by veneur internally.
mock | Package mock is a generated GoMock package.
proxy |
importsrv | Package importsrv receives metrics over gRPC and sends them to workers.
ssf | Package ssf provides an implementation of the Sensor Sensibility Format.
tdigest | Package tdigest provides an implementation of Ted Dunning's t-digest, an approximate histogram for online, distributed applications.
trace | Package trace provides an API for initiating and reporting SSF traces and attaching spans to them.
metrics | Package metrics provides routines for conveniently reporting metrics attached to SSF spans.
testbackend | Package testbackend contains helpers to make it easier to test the tracing behavior of code using veneur's trace API.