Documentation ¶
Overview ¶
Package statsd implements functionality for creating servers compatible with the statsd protocol. See https://github.com/etsy/statsd/blob/master/docs/metric_types.md for a description of the protocol.
The main components of the library are Receiver, Dispatcher, Aggregator and Flusher. Receiver is responsible for receiving metrics from the socket. Dispatcher dispatches received metrics among several Aggregators, which do aggregation based on type of the metric. At every FlushInterval Flusher flushes metrics via associated Backend objects.
Currently the library implements just a few types of Backend, one compatible with Graphite (http://graphite.wikidot.org), one for Datadog and one just for stdout, but any object implementing the Backend interface can be used with the library. See available backends at https://github.com/atlassian/gostatsd/tree/master/backend/backends.
As with the original etsy statsd, multiple backends can be used simultaneously.
Index ¶
- Constants
- Variables
- func AddFlags(fs *pflag.FlagSet)
- type AggregateProcesser
- type Aggregator
- type AggregatorFactory
- type AggregatorFactoryFunc
- type BackendHandler
- func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) error
- func (bh *BackendHandler) DispatchMetric(ctx context.Context, m *gostatsd.Metric) error
- func (bh *BackendHandler) EstimatedTags() int
- func (bh *BackendHandler) Process(ctx context.Context, f DispatcherProcessFunc) gostatsd.Wait
- func (bh *BackendHandler) Run(ctx context.Context)
- func (bh *BackendHandler) RunMetrics(ctx context.Context, statser stats.Statser)
- func (bh *BackendHandler) WaitForEvents()
- type BatchReader
- type CacheOptions
- type CloudHandler
- func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) error
- func (ch *CloudHandler) DispatchMetric(ctx context.Context, m *gostatsd.Metric) error
- func (ch *CloudHandler) EstimatedTags() int
- func (ch *CloudHandler) Run(ctx context.Context)
- func (ch *CloudHandler) RunMetrics(ctx context.Context, statser stats.Statser)
- func (ch *CloudHandler) WaitForEvents()
- type Datagram
- type DatagramParser
- type DatagramReceiver
- type DispatcherProcessFunc
- type EventHandler
- type GenericBatchReader
- type Message
- type MetricAggregator
- func (a *MetricAggregator) Flush(flushInterval time.Duration)
- func (a *MetricAggregator) Process(f ProcessFunc)
- func (a *MetricAggregator) Receive(m *gostatsd.Metric, now time.Time)
- func (a *MetricAggregator) Reset()
- func (a *MetricAggregator) RunMetrics(ctx context.Context, statser statser.Statser)
- type MetricEmitter
- type MetricFlusher
- type MetricHandler
- type ProcessFunc
- type Server
- type SocketFactory
- type TagHandler
- type V6BatchReader
Constants ¶
const ( // StatserInternal is the name used to indicate the use of the internal statser. StatserInternal = "internal" // StatserLogging is the name used to indicate the use of the logging statser. StatserLogging = "logging" // StatserNull is the name used to indicate the use of the null statser. StatserNull = "null" // StatserTagged is the name used to indicate the use of the tagged statser. StatserTagged = "tagged" )
const ( // DefaultMaxCloudRequests is the maximum number of cloud provider requests per second. DefaultMaxCloudRequests = 10 // DefaultBurstCloudRequests is the burst number of cloud provider requests per second. DefaultBurstCloudRequests = DefaultMaxCloudRequests + 5 // DefaultExpiryInterval is the default expiry interval for metrics. DefaultExpiryInterval = 5 * time.Minute // DefaultFlushInterval is the default metrics flush interval. DefaultFlushInterval = 1 * time.Second // DefaultIgnoreHost is the default value for whether the source should be used as the host DefaultIgnoreHost = false // DefaultMetricsAddr is the default address on which to listen for metrics. DefaultMetricsAddr = ":8125" // DefaultMaxQueueSize is the default maximum number of buffered metrics per worker. DefaultMaxQueueSize = 10000 // arbitrary // DefaultMaxConcurrentEvents is the default maximum number of events sent concurrently. DefaultMaxConcurrentEvents = 1024 // arbitrary // DefaultCacheRefreshPeriod is the default cache refresh period. DefaultCacheRefreshPeriod = 1 * time.Minute // DefaultCacheEvictAfterIdlePeriod is the default idle cache eviction period. DefaultCacheEvictAfterIdlePeriod = 10 * time.Minute // DefaultCacheTTL is the default cache TTL for successful lookups. DefaultCacheTTL = 30 * time.Minute // DefaultCacheNegativeTTL is the default cache TTL for failed lookups (errors or when instance was not found). 
DefaultCacheNegativeTTL = 1 * time.Minute // DefaultInternalNamespace is the default internal namespace DefaultInternalNamespace = "statsd" // DefaultHeartbeatEnabled is the default heartbeat enabled flag DefaultHeartbeatEnabled = false // DefaultReceiveBatchSize is the number of datagrams to read in each receive batch DefaultReceiveBatchSize = 50 // DefaultEstimatedTags is the estimated number of expected tags on an individual metric submitted externally DefaultEstimatedTags = 4 // DefaultConnPerReader is the default for whether to create a connection per reader DefaultConnPerReader = false // DefaultStatserType is the default statser type DefaultStatserType = StatserInternal // DefaultBadLinesPerMinute is the default number of bad lines to allow to log per minute DefaultBadLinesPerMinute = 0 )
const ( // ParamBackends is the name of parameter with backends. ParamBackends = "backends" // ParamCloudProvider is the name of parameter with the name of cloud provider. ParamCloudProvider = "cloud-provider" // ParamMaxCloudRequests is the name of parameter with maximum number of cloud provider requests per second. ParamMaxCloudRequests = "max-cloud-requests" // ParamBurstCloudRequests is the name of parameter with burst number of cloud provider requests per second. ParamBurstCloudRequests = "burst-cloud-requests" // ParamDefaultTags is the name of parameter with the list of additional tags. ParamDefaultTags = "default-tags" // ParamInternalTags is the name of parameter with the list of tags for internal metrics. ParamInternalTags = "internal-tags" // ParamInternalNamespace is the name of parameter with the namespace for internal metrics. ParamInternalNamespace = "internal-namespace" // ParamExpiryInterval is the name of parameter with expiry interval for metrics. ParamExpiryInterval = "expiry-interval" // ParamFlushInterval is the name of parameter with metrics flush interval. ParamFlushInterval = "flush-interval" // ParamIgnoreHost is the name of parameter indicating if the source should be used as the host ParamIgnoreHost = "ignore-host" // ParamMaxReaders is the name of parameter with number of socket readers. ParamMaxReaders = "max-readers" // ParamMaxParsers is the name of the parameter with the number of goroutines that parse datagrams into metrics. ParamMaxParsers = "max-parsers" // ParamMaxWorkers is the name of parameter with number of goroutines that aggregate metrics. ParamMaxWorkers = "max-workers" // ParamMaxQueueSize is the name of parameter with maximum number of buffered metrics per worker. ParamMaxQueueSize = "max-queue-size" // ParamMaxConcurrentEvents is the name of parameter with maximum number of events sent concurrently. 
ParamMaxConcurrentEvents = "max-concurrent-events" // ParamEstimatedTags is the name of parameter with estimated number of tags per metric ParamEstimatedTags = "estimated-tags" // ParamCacheRefreshPeriod is the name of parameter with cache refresh period. ParamCacheRefreshPeriod = "cloud-cache-refresh-period" // ParamCacheEvictAfterIdlePeriod is the name of parameter with idle cache eviction period. ParamCacheEvictAfterIdlePeriod = "cloud-cache-evict-after-idle-period" // ParamCacheTTL is the name of parameter with cache TTL for successful lookups. ParamCacheTTL = "cloud-cache-ttl" // ParamCacheNegativeTTL is the name of parameter with cache TTL for failed lookups (errors or when instance was not found). ParamCacheNegativeTTL = "cloud-cache-negative-ttl" // ParamMetricsAddr is the name of parameter with address on which to listen for metrics. ParamMetricsAddr = "metrics-addr" // ParamNamespace is the name of parameter with namespace for all metrics. ParamNamespace = "namespace" // ParamStatserType is the name of parameter with type of statser. ParamStatserType = "statser-type" // ParamPercentThreshold is the name of parameter with list of applied percentiles. ParamPercentThreshold = "percent-threshold" // ParamHeartbeatEnabled is the name of the parameter with the heartbeat enabled ParamHeartbeatEnabled = "heartbeat-enabled" // ParamReceiveBatchSize is the name of the parameter with the number of datagrams to read in each receive batch ParamReceiveBatchSize = "receive-batch-size" // ParamConnPerReader is the name of the parameter indicating whether to create a connection per reader ParamConnPerReader = "conn-per-reader" // ParamBadLinesPerMinute is the name of the parameter indicating how many bad lines can be logged per minute ParamBadLinesPerMinute = "bad-lines-per-minute" )
Variables ¶
var DefaultBackends = []string{"graphite"}
DefaultBackends is the list of default backends' names.
var DefaultInternalTags = gostatsd.Tags{}
DefaultInternalTags is the default list of additional tags on internal metrics
var DefaultMaxParsers = runtime.NumCPU()
DefaultMaxParsers is the default number of goroutines that parse datagrams into metrics.
var DefaultMaxReaders = minInt(8, runtime.NumCPU())
DefaultMaxReaders is the default number of socket reading goroutines.
var DefaultMaxWorkers = runtime.NumCPU()
DefaultMaxWorkers is the default number of goroutines that aggregate metrics.
var DefaultPercentThreshold = []float64{90}
DefaultPercentThreshold is the default list of applied percentiles.
var DefaultTags = gostatsd.Tags{}
DefaultTags is the default list of additional tags.
Functions ¶
Types ¶
type AggregateProcesser ¶
type AggregateProcesser interface {
Process(ctx context.Context, fn DispatcherProcessFunc) gostatsd.Wait
}
AggregateProcesser is an interface to run a function against each Aggregator, in the goroutine context of that Aggregator.
type Aggregator ¶
type Aggregator interface { Receive(*gostatsd.Metric, time.Time) Flush(interval time.Duration) Process(ProcessFunc) Reset() }
Aggregator is an object that aggregates statsd metrics. The function NewMetricAggregator should be used to create the objects.
Incoming metrics should be passed via Receive function.
type AggregatorFactory ¶
type AggregatorFactory interface { // Create creates Aggregator objects. Create() Aggregator }
AggregatorFactory creates Aggregator objects.
type AggregatorFactoryFunc ¶
type AggregatorFactoryFunc func() Aggregator
AggregatorFactoryFunc type is an adapter to allow the use of ordinary functions as AggregatorFactory.
func (AggregatorFactoryFunc) Create ¶
func (f AggregatorFactoryFunc) Create() Aggregator
Create calls f().
type BackendHandler ¶
type BackendHandler struct {
// contains filtered or unexported fields
}
BackendHandler dispatches metrics and events to all configured backends (via Aggregators).
func NewBackendHandler ¶
func NewBackendHandler(backends []gostatsd.Backend, maxConcurrentEvents uint, numWorkers int, perWorkerBufferSize int, af AggregatorFactory) *BackendHandler
NewBackendHandler initialises a new Handler which sends metrics and events to all backends
func (*BackendHandler) DispatchEvent ¶
func (*BackendHandler) DispatchMetric ¶
DispatchMetric dispatches metric to a corresponding Aggregator.
func (*BackendHandler) EstimatedTags ¶
func (bh *BackendHandler) EstimatedTags() int
EstimatedTags returns a guess for how many tags to pre-allocate
func (*BackendHandler) Process ¶
func (bh *BackendHandler) Process(ctx context.Context, f DispatcherProcessFunc) gostatsd.Wait
Process concurrently executes the provided function in the goroutines that own Aggregators. The DispatcherProcessFunc may be executed anywhere from zero to numWorkers times; it is executed fewer than numWorkers times if the context signals "done".
func (*BackendHandler) Run ¶
func (bh *BackendHandler) Run(ctx context.Context)
Run runs the BackendHandler workers until the Context is closed.
func (*BackendHandler) RunMetrics ¶
func (bh *BackendHandler) RunMetrics(ctx context.Context, statser stats.Statser)
RunMetrics attaches a Statser to the BackendHandler. Stops when the context is closed.
func (*BackendHandler) WaitForEvents ¶
func (bh *BackendHandler) WaitForEvents()
WaitForEvents waits for all event-dispatching goroutines to finish.
type BatchReader ¶
func NewBatchReader ¶
func NewBatchReader(conn net.PacketConn) BatchReader
type CacheOptions ¶
type CacheOptions struct { CacheRefreshPeriod time.Duration CacheEvictAfterIdlePeriod time.Duration CacheTTL time.Duration CacheNegativeTTL time.Duration }
CacheOptions holds cache behaviour configuration.
type CloudHandler ¶
type CloudHandler struct {
// contains filtered or unexported fields
}
CloudHandler enriches metrics and events with additional information fetched from cloud provider.
func NewCloudHandler ¶
func NewCloudHandler(cloud gostatsd.CloudProvider, metrics MetricHandler, events EventHandler, limiter *rate.Limiter, cacheOptions *CacheOptions) *CloudHandler
NewCloudHandler initialises a new cloud handler.
func (*CloudHandler) DispatchEvent ¶
func (*CloudHandler) DispatchMetric ¶
func (*CloudHandler) EstimatedTags ¶
func (ch *CloudHandler) EstimatedTags() int
EstimatedTags returns a guess for how many tags to pre-allocate
func (*CloudHandler) Run ¶
func (ch *CloudHandler) Run(ctx context.Context)
func (*CloudHandler) RunMetrics ¶
func (ch *CloudHandler) RunMetrics(ctx context.Context, statser stats.Statser)
func (*CloudHandler) WaitForEvents ¶
func (ch *CloudHandler) WaitForEvents()
WaitForEvents waits for all event-dispatching goroutines to finish.
type Datagram ¶
type Datagram struct { IP gostatsd.IP Msg []byte DoneFunc func() // to be called once the datagram has been parsed and msg can be freed }
Datagram is a received UDP datagram that has not been parsed into Metric/Event(s)
type DatagramParser ¶
type DatagramParser struct {
// contains filtered or unexported fields
}
DatagramParser receives datagrams and parses them into Metrics/Events. For each Metric it calls MetricHandler.DispatchMetric, and for each Event it calls EventHandler.DispatchEvent.
func NewDatagramParser ¶
func NewDatagramParser(in <-chan []*Datagram, ns string, ignoreHost bool, estimatedTags int, metrics MetricHandler, events EventHandler, statser statser.Statser, badLineLimiter *rate.Limiter) *DatagramParser
NewDatagramParser initialises a new DatagramParser.
func (*DatagramParser) Run ¶
func (dp *DatagramParser) Run(ctx context.Context)
func (*DatagramParser) RunMetrics ¶
func (dp *DatagramParser) RunMetrics(ctx context.Context)
type DatagramReceiver ¶
type DatagramReceiver struct {
// contains filtered or unexported fields
}
DatagramReceiver receives datagrams on its PacketConn and passes them off to be parsed
func NewDatagramReceiver ¶
func NewDatagramReceiver(out chan<- []*Datagram, receiveBatchSize int) *DatagramReceiver
NewDatagramReceiver initialises a new DatagramReceiver.
func (*DatagramReceiver) Receive ¶
func (dr *DatagramReceiver) Receive(ctx context.Context, c net.PacketConn)
Receive accepts incoming datagrams on c, and passes them off to be parsed
func (*DatagramReceiver) RunMetrics ¶
func (dr *DatagramReceiver) RunMetrics(ctx context.Context, statser stats.Statser)
type DispatcherProcessFunc ¶
type DispatcherProcessFunc func(int, Aggregator)
DispatcherProcessFunc is a function that gets executed by Dispatcher for each Aggregator, passing it into the function.
type EventHandler ¶
type EventHandler interface { // DispatchEvent dispatches event to the next step in a pipeline. DispatchEvent(ctx context.Context, e *gostatsd.Event) error // WaitForEvents waits for all event-dispatching goroutines to finish. WaitForEvents() }
EventHandler can be used to handle events
type GenericBatchReader ¶
type GenericBatchReader struct {
// contains filtered or unexported fields
}
type MetricAggregator ¶
MetricAggregator aggregates metrics.
func NewMetricAggregator ¶
func NewMetricAggregator(percentThresholds []float64, expiryInterval time.Duration, disabled gostatsd.TimerSubtypes) *MetricAggregator
NewMetricAggregator creates a new MetricAggregator object.
func (*MetricAggregator) Flush ¶
func (a *MetricAggregator) Flush(flushInterval time.Duration)
Flush prepares the contents of a MetricAggregator for sending via the Sender.
func (*MetricAggregator) Process ¶
func (a *MetricAggregator) Process(f ProcessFunc)
func (*MetricAggregator) Receive ¶
func (a *MetricAggregator) Receive(m *gostatsd.Metric, now time.Time)
Receive aggregates an incoming metric.
func (*MetricAggregator) Reset ¶
func (a *MetricAggregator) Reset()
Reset clears the contents of a MetricAggregator.
func (*MetricAggregator) RunMetrics ¶
func (a *MetricAggregator) RunMetrics(ctx context.Context, statser statser.Statser)
type MetricEmitter ¶
MetricEmitter is an object that emits metrics. It is used to pass a Statser to the object after initialization, as Statsers may be created after MetricEmitters.
type MetricFlusher ¶
type MetricFlusher struct {
// contains filtered or unexported fields
}
MetricFlusher periodically flushes metrics from all Aggregators to Backends.
func NewMetricFlusher ¶
func NewMetricFlusher(flushInterval time.Duration, aggregateProcesser AggregateProcesser, backends []gostatsd.Backend, hostname string, statser statser.Statser) *MetricFlusher
NewMetricFlusher creates a new MetricFlusher with provided configuration.
func (*MetricFlusher) Run ¶
func (f *MetricFlusher) Run(ctx context.Context)
Run runs the MetricFlusher.
type MetricHandler ¶
type MetricHandler interface { // EstimatedTags returns a guess for how many tags to pre-allocate EstimatedTags() int // DispatchMetric dispatches a metric to the next step in a pipeline. DispatchMetric(ctx context.Context, m *gostatsd.Metric) error }
MetricHandler can be used to handle metrics
type ProcessFunc ¶
ProcessFunc is a function that gets executed by Aggregator with its state passed into the function.
type Server ¶
type Server struct { Backends []gostatsd.Backend CloudProvider gostatsd.CloudProvider Limiter *rate.Limiter InternalTags gostatsd.Tags InternalNamespace string DefaultTags gostatsd.Tags ExpiryInterval time.Duration FlushInterval time.Duration MaxReaders int MaxParsers int MaxWorkers int MaxQueueSize int MaxConcurrentEvents int MaxEventQueueSize int EstimatedTags int MetricsAddr string Namespace string StatserType string PercentThreshold []float64 IgnoreHost bool ConnPerReader bool HeartbeatEnabled bool HeartbeatTags gostatsd.Tags ReceiveBatchSize int DisabledSubTypes gostatsd.TimerSubtypes BadLineRateLimitPerSecond rate.Limit CacheOptions Viper *viper.Viper }
Server encapsulates all of the parameters necessary for starting up the statsd server. These can either be set via command line or directly.
func (*Server) RunWithCustomSocket ¶
func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) error
RunWithCustomSocket runs the server until context signals done. Listening socket is created using sf.
type SocketFactory ¶
type SocketFactory func() (net.PacketConn, error)
SocketFactory is an indirection layer over net.ListenPacket() to allow for different implementations.
type TagHandler ¶
type TagHandler struct {
// contains filtered or unexported fields
}
func NewTagHandler ¶
func NewTagHandler(metrics MetricHandler, events EventHandler, tags gostatsd.Tags) *TagHandler
NewTagHandler initialises a new handler which adds tags and sends metrics/events to the next handler
func (*TagHandler) DispatchEvent ¶
DispatchEvent adds the tags from the TagHandler to the event and passes it to the next stage in the pipeline
func (*TagHandler) DispatchMetric ¶
DispatchMetric adds the tags from the TagHandler to the metric and passes it to the next stage in the pipeline
func (*TagHandler) EstimatedTags ¶
func (th *TagHandler) EstimatedTags() int
EstimatedTags returns a guess for how many tags to pre-allocate
func (*TagHandler) WaitForEvents ¶
func (th *TagHandler) WaitForEvents()
WaitForEvents waits for all event-dispatching goroutines to finish.
type V6BatchReader ¶
type V6BatchReader struct {
// contains filtered or unexported fields
}