Documentation
Overview ¶
Package statsd implements functionality for creating servers compatible with the statsd protocol. See https://github.com/etsy/statsd/blob/master/docs/metric_types.md for a description of the protocol.
The main components of the library are Receiver, Dispatcher, Aggregator and Flusher. Receiver is responsible for receiving metrics from the socket. Dispatcher distributes received metrics among several Aggregators, which aggregate them based on the type of the metric. At every FlushInterval, Flusher flushes the aggregated metrics via the associated Backend objects.
Currently the library implements just a few types of Backend: one compatible with Graphite (http://graphite.wikidot.org), one for Datadog, and one that writes to stdout. Any object implementing the Backend interface can be used with the library. See available backends at https://github.com/hligit/gostatsd/tree/master/backend/backends.
As with the original etsy statsd, multiple backends can be used simultaneously.
Index ¶
- type AggregateProcesser
- type Aggregator
- type AggregatorFactory
- type AggregatorFactoryFunc
- type BackendHandler
- func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
- func (bh *BackendHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)
- func (bh *BackendHandler) EstimatedTags() int
- func (bh *BackendHandler) Process(ctx context.Context, f DispatcherProcessFunc) gostatsd.Wait
- func (bh *BackendHandler) Run(ctx context.Context)
- func (bh *BackendHandler) RunMetrics(ctx context.Context, statser stats.Statser)
- func (bh *BackendHandler) RunMetricsContext(ctx context.Context)
- func (bh *BackendHandler) WaitForEvents()
- type BatchReader
- type CloudHandler
- func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
- func (ch *CloudHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)
- func (ch *CloudHandler) EstimatedTags() int
- func (ch *CloudHandler) Run(ctx context.Context)
- func (ch *CloudHandler) RunMetrics(ctx context.Context, statser stats.Statser)
- func (ch *CloudHandler) WaitForEvents()
- type Datagram
- type DatagramParser
- type DatagramReceiver
- type DispatcherProcessFunc
- type Filter
- type GenericBatchReader
- type HttpForwarderHandlerV2
- func (hfh *HttpForwarderHandlerV2) DispatchEvent(ctx context.Context, e *gostatsd.Event)
- func (hfh *HttpForwarderHandlerV2) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)
- func (hfh *HttpForwarderHandlerV2) EstimatedTags() int
- func (hfh *HttpForwarderHandlerV2) Run(ctx context.Context)
- func (hfh *HttpForwarderHandlerV2) RunMetricsContext(ctx context.Context)
- func (hfh *HttpForwarderHandlerV2) WaitForEvents()
- type Message
- type MetricAggregator
- type MetricEmitter
- type MetricFlusher
- type ProcessFunc
- type Server
- type SocketFactory
- type TagChanger
- type TagHandler
- type V6BatchReader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AggregateProcesser ¶
type AggregateProcesser interface {
Process(ctx context.Context, fn DispatcherProcessFunc) gostatsd.Wait
}
AggregateProcesser is an interface to run a function against each Aggregator, in the goroutine context of that Aggregator.
type Aggregator ¶
type Aggregator interface {
	ReceiveMap(mm *gostatsd.MetricMap)
	Flush(interval time.Duration)
	Process(ProcessFunc)
	Reset()
}
Aggregator is an object that aggregates statsd metrics. The function NewMetricAggregator should be used to create the objects.
Incoming metrics should be passed via ReceiveMap function.
type AggregatorFactory ¶
type AggregatorFactory interface {
	// Create creates Aggregator objects.
	Create() Aggregator
}
AggregatorFactory creates Aggregator objects.
type AggregatorFactoryFunc ¶
type AggregatorFactoryFunc func() Aggregator
AggregatorFactoryFunc type is an adapter to allow the use of ordinary functions as AggregatorFactory.
func (AggregatorFactoryFunc) Create ¶
func (f AggregatorFactoryFunc) Create() Aggregator
Create calls f().
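This is the same adapter idiom as http.HandlerFunc in the standard library. The sketch below re-declares trimmed-down copies of the types so it runs on its own; `noopAggregator` and `newCountingFactory` are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Aggregator is a trimmed-down stand-in for the package's Aggregator interface.
type Aggregator interface {
	Reset()
}

// AggregatorFactory and AggregatorFactoryFunc mirror the declarations above.
type AggregatorFactory interface {
	Create() Aggregator
}

type AggregatorFactoryFunc func() Aggregator

// Create calls f().
func (f AggregatorFactoryFunc) Create() Aggregator { return f() }

// noopAggregator is a hypothetical implementation used only in this sketch.
type noopAggregator struct{ id int }

func (n *noopAggregator) Reset() {}

// newCountingFactory wraps a closure as an AggregatorFactory. Each call to
// Create returns a fresh aggregator with the next sequential id.
func newCountingFactory() AggregatorFactory {
	next := 0
	return AggregatorFactoryFunc(func() Aggregator {
		next++
		return &noopAggregator{id: next}
	})
}

func main() {
	af := newCountingFactory()
	a := af.Create().(*noopAggregator)
	b := af.Create().(*noopAggregator)
	fmt.Println(a.id, b.id) // prints "1 2"
}
```

Because the closure captures its own state, a plain function is enough wherever an AggregatorFactory is expected, with no extra struct type.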
type BackendHandler ¶
type BackendHandler struct {
// contains filtered or unexported fields
}
BackendHandler dispatches metrics and events to all configured backends (via Aggregators).
func NewBackendHandler ¶
func NewBackendHandler(backends []gostatsd.Backend, maxConcurrentEvents uint, numWorkers int, perWorkerBufferSize int, af AggregatorFactory) *BackendHandler
NewBackendHandler initialises a new Handler which sends metrics and events to all backends
func (*BackendHandler) DispatchEvent ¶
func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
func (*BackendHandler) DispatchMetricMap ¶
func (bh *BackendHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)
DispatchMetricMap splits a MetricMap into per-aggregator buckets and distributes it.
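One way to picture the bucketing is to hash each metric name and take the result modulo the number of aggregators, so a given name always reaches the same aggregator. This is only a sketch under that assumption: the hash function, the key, and the `metricMap` type below are hypothetical and are not taken from the package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// metricMap is a hypothetical stand-in for gostatsd.MetricMap: name -> value.
type metricMap map[string]float64

// split distributes entries into n buckets by hashing the metric name, so the
// same name always lands in the same bucket (and hence the same aggregator).
func split(mm metricMap, n int) []metricMap {
	buckets := make([]metricMap, n)
	for i := range buckets {
		buckets[i] = metricMap{}
	}
	for name, v := range mm {
		h := fnv.New32a()
		h.Write([]byte(name))
		buckets[h.Sum32()%uint32(n)][name] = v
	}
	return buckets
}

func main() {
	buckets := split(metricMap{"a": 1, "b": 2, "c": 3}, 2)
	total := 0
	for _, b := range buckets {
		total += len(b)
	}
	fmt.Println(len(buckets), total) // 2 buckets holding 3 entries in total
}
```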
func (*BackendHandler) EstimatedTags ¶
func (bh *BackendHandler) EstimatedTags() int
EstimatedTags returns a guess for how many tags to pre-allocate
func (*BackendHandler) Process ¶
func (bh *BackendHandler) Process(ctx context.Context, f DispatcherProcessFunc) gostatsd.Wait
Process concurrently executes the provided function in the goroutines that own the Aggregators. The DispatcherProcessFunc may be executed anywhere from zero to numWorkers times; it is executed fewer than numWorkers times if the context signals "done" first.
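The "run a function in the goroutine that owns the state" pattern can be sketched with one command channel per worker. The `worker`, `aggregator`, `process` and `runDemo` names are hypothetical, and the returned wait function stands in for gostatsd.Wait; the package's actual mechanics may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// aggregator is a hypothetical stand-in for the package's Aggregator.
type aggregator struct{ flushed bool }

type processFunc func(workerID int, a *aggregator)

// worker owns one aggregator; only its goroutine ever touches it.
type worker struct {
	id   int
	agg  *aggregator
	cmds chan func() // unbuffered: a successful send means the worker took it
}

func (w *worker) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case cmd := <-w.cmds:
			cmd()
		}
	}
}

// process hands f to every worker and returns a wait function. If ctx is
// cancelled first, f is skipped for the workers not yet reached.
func process(ctx context.Context, workers []*worker, f processFunc) func() {
	var wg sync.WaitGroup
	for _, w := range workers {
		w := w
		wg.Add(1)
		cmd := func() { defer wg.Done(); f(w.id, w.agg) }
		select {
		case w.cmds <- cmd:
		case <-ctx.Done():
			wg.Done()
		}
	}
	return wg.Wait
}

// runDemo starts n workers, marks every aggregator, and counts the marks.
func runDemo(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	workers := make([]*worker, n)
	for i := range workers {
		workers[i] = &worker{id: i, agg: &aggregator{}, cmds: make(chan func())}
		go workers[i].run(ctx)
	}
	wait := process(ctx, workers, func(_ int, a *aggregator) { a.flushed = true })
	wait()
	count := 0
	for _, w := range workers {
		if w.agg.flushed {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(runDemo(3)) // prints 3
}
```

Because each aggregator is only ever touched by its own goroutine, no lock is needed around its state.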
func (*BackendHandler) Run ¶
func (bh *BackendHandler) Run(ctx context.Context)
Run runs the BackendHandler workers until the Context is closed.
func (*BackendHandler) RunMetrics ¶
func (bh *BackendHandler) RunMetrics(ctx context.Context, statser stats.Statser)
RunMetrics attaches a Statser to the BackendHandler. Stops when the context is closed.
func (*BackendHandler) RunMetricsContext ¶
func (bh *BackendHandler) RunMetricsContext(ctx context.Context)
RunMetricsContext pulls a Statser from the Context and invokes RunMetrics. Allows a BackendHandler to still conform to MetricEmitter.
func (*BackendHandler) WaitForEvents ¶
func (bh *BackendHandler) WaitForEvents()
WaitForEvents waits for all event-dispatching goroutines to finish.
type BatchReader ¶
func NewBatchReader ¶
func NewBatchReader(conn net.PacketConn) BatchReader
type CloudHandler ¶
type CloudHandler struct {
// contains filtered or unexported fields
}
CloudHandler enriches metrics and events with additional information fetched from cloud provider.
func NewCloudHandler ¶
func NewCloudHandler(cachedInstances gostatsd.CachedInstances, handler gostatsd.PipelineHandler) *CloudHandler
NewCloudHandler initialises a new cloud handler.
func (*CloudHandler) DispatchEvent ¶
func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
func (*CloudHandler) DispatchMetricMap ¶
func (ch *CloudHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)
DispatchMetricMap re-dispatches a MetricMap through CloudHandler.processMetrics.
TODO: This is inefficient and should be handled first class; however, that is a major refactor of the CloudHandler. It is also recommended not to use a CloudHandler in an http receiver based service, as the IP is not propagated.
func (*CloudHandler) EstimatedTags ¶
func (ch *CloudHandler) EstimatedTags() int
EstimatedTags returns a guess for how many tags to pre-allocate
func (*CloudHandler) Run ¶
func (ch *CloudHandler) Run(ctx context.Context)
func (*CloudHandler) RunMetrics ¶
func (ch *CloudHandler) RunMetrics(ctx context.Context, statser stats.Statser)
func (*CloudHandler) WaitForEvents ¶
func (ch *CloudHandler) WaitForEvents()
WaitForEvents waits for all event-dispatching goroutines to finish.
type Datagram ¶
type Datagram struct {
	IP        gostatsd.Source
	Msg       []byte
	Timestamp gostatsd.Nanotime
	DoneFunc  func() // to be called once the datagram has been parsed and msg can be freed
}
Datagram is a received UDP datagram that has not been parsed into Metric/Event(s)
type DatagramParser ¶
type DatagramParser struct {
// contains filtered or unexported fields
}
DatagramParser receives datagrams and parses them into Metrics/Events. For each Metric/Event it calls Handler.HandleMetric/Event().
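For orientation, a basic statsd line has the shape `<name>:<value>|<type>`. The sketch below parses only that core form for numeric values; it ignores sample rates, tags, events and multi-line datagrams, and the `metric` type and `parseLine` function are hypothetical rather than the package's parser.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// metric is a hypothetical, simplified parse result.
type metric struct {
	Name  string
	Value float64
	Type  string // for example "c" (counter), "g" (gauge), "ms" (timer)
}

// parseLine parses one "<name>:<value>|<type>" line with a numeric value.
// Any further "|"-separated sections (such as a sample rate) are ignored.
func parseLine(line string) (metric, error) {
	name, rest, ok := strings.Cut(line, ":")
	if !ok || name == "" {
		return metric{}, errors.New("missing name or ':' separator")
	}
	fields := strings.Split(rest, "|")
	if len(fields) < 2 || fields[1] == "" {
		return metric{}, errors.New("missing '|<type>' section")
	}
	v, err := strconv.ParseFloat(fields[0], 64)
	if err != nil {
		return metric{}, fmt.Errorf("bad value %q: %w", fields[0], err)
	}
	return metric{Name: name, Value: v, Type: fields[1]}, nil
}

func main() {
	m, err := parseLine("page.views:3|c")
	fmt.Println(m.Name, m.Value, m.Type, err)

	_, err = parseLine("no-separator")
	fmt.Println(err != nil)
}
```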
func NewDatagramParser ¶
func NewDatagramParser(
	in <-chan []*Datagram,
	ns string,
	ignoreHost bool,
	estimatedTags int,
	handler gostatsd.PipelineHandler,
	badLineRateLimitPerSecond rate.Limit,
	logRawMetric bool,
	logger logrus.FieldLogger,
) *DatagramParser
NewDatagramParser initialises a new DatagramParser.
func (*DatagramParser) Run ¶
func (dp *DatagramParser) Run(ctx context.Context)
func (*DatagramParser) RunMetricsContext ¶
func (dp *DatagramParser) RunMetricsContext(ctx context.Context)
type DatagramReceiver ¶
type DatagramReceiver struct {
// contains filtered or unexported fields
}
DatagramReceiver receives datagrams on its PacketConn and passes them off to be parsed
func NewDatagramReceiver ¶
func NewDatagramReceiver(out chan<- []*Datagram, sf SocketFactory, numReaders, receiveBatchSize int) *DatagramReceiver
NewDatagramReceiver initialises a new DatagramReceiver.
func (*DatagramReceiver) Receive ¶
func (dr *DatagramReceiver) Receive(ctx context.Context, c net.PacketConn)
Receive accepts incoming datagrams on c, and passes them off to be parsed
func (*DatagramReceiver) Run ¶
func (dr *DatagramReceiver) Run(ctx context.Context)
func (*DatagramReceiver) RunMetricsContext ¶
func (dr *DatagramReceiver) RunMetricsContext(ctx context.Context)
type DispatcherProcessFunc ¶
type DispatcherProcessFunc func(int, Aggregator)
DispatcherProcessFunc is a function that gets executed by Dispatcher for each Aggregator, passing it into the function.
type Filter ¶
type Filter struct {
	MatchMetrics   gostatsd.StringMatchList // Name must match
	ExcludeMetrics gostatsd.StringMatchList // Name must not match
	MatchTags      gostatsd.StringMatchList // Any tag must match
	DropTags       gostatsd.StringMatchList // Any tag matching anything will be dropped
	DropMetric     bool                     // Drop the entire metric
	DropHost       bool                     // Clears Hostname if present
}
func NewFilterFromViper ¶
NewFilterFromViper creates a new Filter given a *viper.Viper
type GenericBatchReader ¶
type GenericBatchReader struct {
// contains filtered or unexported fields
}
type HttpForwarderHandlerV2 ¶
type HttpForwarderHandlerV2 struct {
// contains filtered or unexported fields
}
HttpForwarderHandlerV2 is a PipelineHandler which sends metrics to another gostatsd instance
func NewHttpForwarderHandlerV2 ¶
func NewHttpForwarderHandlerV2(
	logger logrus.FieldLogger,
	transport, apiEndpoint string,
	consolidatorSlots, maxRequests int,
	compress bool,
	maxRequestElapsedTime time.Duration,
	flushInterval time.Duration,
	xheaders map[string]string,
	dynHeaderNames []string,
	pool *transport.TransportPool,
) (*HttpForwarderHandlerV2, error)
NewHttpForwarderHandlerV2 returns a new handler which dispatches metrics over http to another gostatsd server.
func NewHttpForwarderHandlerV2FromViper ¶
func NewHttpForwarderHandlerV2FromViper(logger logrus.FieldLogger, v *viper.Viper, pool *transport.TransportPool) (*HttpForwarderHandlerV2, error)
NewHttpForwarderHandlerV2FromViper returns a new http API client.
func (*HttpForwarderHandlerV2) DispatchEvent ¶
func (hfh *HttpForwarderHandlerV2) DispatchEvent(ctx context.Context, e *gostatsd.Event)
func (*HttpForwarderHandlerV2) DispatchMetricMap ¶
func (hfh *HttpForwarderHandlerV2) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)
DispatchMetricMap dispatches a metric map to the MetricConsolidator
func (*HttpForwarderHandlerV2) EstimatedTags ¶
func (hfh *HttpForwarderHandlerV2) EstimatedTags() int
func (*HttpForwarderHandlerV2) Run ¶
func (hfh *HttpForwarderHandlerV2) Run(ctx context.Context)
func (*HttpForwarderHandlerV2) RunMetricsContext ¶
func (hfh *HttpForwarderHandlerV2) RunMetricsContext(ctx context.Context)
func (*HttpForwarderHandlerV2) WaitForEvents ¶
func (hfh *HttpForwarderHandlerV2) WaitForEvents()
type MetricAggregator ¶
type MetricAggregator struct {
// contains filtered or unexported fields
}
MetricAggregator aggregates metrics.
func NewMetricAggregator ¶
func NewMetricAggregator(
	percentThresholds []float64,
	expiryIntervalCounter time.Duration,
	expiryIntervalGauge time.Duration,
	expiryIntervalSet time.Duration,
	expiryIntervalTimer time.Duration,
	disabled gostatsd.TimerSubtypes,
	histogramLimit uint32,
) *MetricAggregator
NewMetricAggregator creates a new MetricAggregator object.
func (*MetricAggregator) Flush ¶
func (a *MetricAggregator) Flush(flushInterval time.Duration)
Flush prepares the contents of a MetricAggregator for sending via the Sender.
func (*MetricAggregator) Process ¶
func (a *MetricAggregator) Process(f ProcessFunc)
func (*MetricAggregator) ReceiveMap ¶
func (a *MetricAggregator) ReceiveMap(mm *gostatsd.MetricMap)
ReceiveMap takes a single metric map and will aggregate the values
func (*MetricAggregator) Reset ¶
func (a *MetricAggregator) Reset()
Reset clears the contents of a MetricAggregator.
func (*MetricAggregator) RunMetrics ¶
func (a *MetricAggregator) RunMetrics(ctx context.Context, statser stats.Statser)
type MetricEmitter ¶
MetricEmitter is an object that emits metrics. Used to pass a Statser to the object after initialization, as Statsers may be created after MetricEmitters
type MetricFlusher ¶
type MetricFlusher struct {
// contains filtered or unexported fields
}
MetricFlusher periodically flushes metrics from all Aggregators to Senders.
func NewMetricFlusher ¶
func NewMetricFlusher(flushInterval, flushOffset time.Duration, aligned bool, aggregateProcesser AggregateProcesser, backends []gostatsd.Backend) *MetricFlusher
NewMetricFlusher creates a new MetricFlusher with provided configuration.
func (*MetricFlusher) Run ¶
func (f *MetricFlusher) Run(ctx context.Context)
Run runs the MetricFlusher.
type ProcessFunc ¶
ProcessFunc is a function that gets executed by Aggregator with its state passed into the function.
type Server ¶
type Server struct {
	Runnables                 []gostatsd.Runnable
	Backends                  []gostatsd.Backend
	CachedInstances           gostatsd.CachedInstances
	InternalTags              gostatsd.Tags
	InternalNamespace         string
	DefaultTags               gostatsd.Tags
	ExpiryIntervalCounter     time.Duration
	ExpiryIntervalGauge       time.Duration
	ExpiryIntervalSet         time.Duration
	ExpiryIntervalTimer       time.Duration
	FlushInterval             time.Duration
	FlushOffset               time.Duration
	FlushAligned              bool
	MaxReaders                int
	MaxParsers                int
	MaxWorkers                int
	MaxQueueSize              int
	MaxConcurrentEvents       int
	MaxEventQueueSize         int
	EstimatedTags             int
	MetricsAddr               string
	Namespace                 string
	StatserType               string
	PercentThreshold          []float64
	IgnoreHost                bool
	ConnPerReader             bool
	HeartbeatEnabled          bool
	HeartbeatTags             gostatsd.Tags
	ReceiveBatchSize          int
	DisabledSubTypes          gostatsd.TimerSubtypes
	HistogramLimit            uint32
	BadLineRateLimitPerSecond rate.Limit
	ServerMode                string
	Hostname                  gostatsd.Source
	LogRawMetric              bool
	Viper                     *viper.Viper
	TransportPool             *transport.TransportPool
}
Server encapsulates all of the parameters necessary for starting up the statsd server. These can either be set via command line or directly.
func (*Server) RunWithCustomSocket ¶
func (s *Server) RunWithCustomSocket(ctx context.Context, sf SocketFactory) error
RunWithCustomSocket runs the server until context signals done. Listening socket is created using sf.
type SocketFactory ¶
type SocketFactory func() (net.PacketConn, error)
SocketFactory is an indirection layer over net.ListenPacket() to allow for different implementations.
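As an illustration, a SocketFactory that binds a UDP socket might look like the sketch below. The type is re-declared locally so the example runs on its own, and `udpFactory` is a hypothetical helper; port 0 asks the operating system for any free port.

```go
package main

import (
	"fmt"
	"net"
)

// SocketFactory mirrors the declaration above.
type SocketFactory func() (net.PacketConn, error)

// udpFactory returns a SocketFactory that binds a UDP socket to addr.
func udpFactory(addr string) SocketFactory {
	return func() (net.PacketConn, error) {
		return net.ListenPacket("udp", addr)
	}
}

func main() {
	sf := udpFactory("127.0.0.1:0")
	conn, err := sf()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("listening on", conn.LocalAddr().Network())
}
```

A test could pass a factory that returns an in-memory net.PacketConn implementation instead, which is the kind of substitution the indirection allows.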
type TagChanger ¶
type TagChanger interface {
AddTagsSetSource(additionalTags gostatsd.Tags, newSource gostatsd.Source)
}
TagChanger is an interface that Metric/Event can implement to update their tags and source. It is so the CloudHandler can change the tags without worrying about the TagsKey cache.
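A minimal sketch of a type satisfying this interface is shown below. `Tags` and `Source` are local stand-ins for the gostatsd types, and the `event` struct with its cached `tagsKey` field is hypothetical; it only illustrates why routing changes through one method keeps a derived cache consistent.

```go
package main

import "fmt"

// Tags and Source are simplified stand-ins for gostatsd.Tags and gostatsd.Source.
type Tags []string
type Source string

// TagChanger mirrors the declaration above.
type TagChanger interface {
	AddTagsSetSource(additionalTags Tags, newSource Source)
}

// event is a hypothetical value carrying tags, a source and a derived cache.
type event struct {
	Tags    Tags
	Source  Source
	tagsKey string // hypothetical cached key derived from Tags and Source
}

// AddTagsSetSource appends the tags, replaces the source, and drops the
// cached key, which is stale after either change.
func (e *event) AddTagsSetSource(additionalTags Tags, newSource Source) {
	e.Tags = append(e.Tags, additionalTags...)
	e.Source = newSource
	e.tagsKey = ""
}

func main() {
	e := &event{Tags: Tags{"env:prod"}, Source: "10.0.0.1", tagsKey: "stale"}
	var tc TagChanger = e
	tc.AddTagsSetSource(Tags{"region:us-east-1"}, "i-0abc")
	fmt.Println(e.Tags, e.Source, e.tagsKey == "")
}
```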
type TagHandler ¶
type TagHandler struct {
// contains filtered or unexported fields
}
func NewTagHandler ¶
func NewTagHandler(handler gostatsd.PipelineHandler, tags gostatsd.Tags, filters []Filter) *TagHandler
NewTagHandler initialises a new handler which adds unique tags, and sends metrics/events to the next handler based on filter rules.
func NewTagHandlerFromViper ¶
func NewTagHandlerFromViper(v *viper.Viper, handler gostatsd.PipelineHandler, tags gostatsd.Tags) *TagHandler
func (*TagHandler) DispatchEvent ¶
func (th *TagHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
DispatchEvent adds the unique tags from the TagHandler to the event and passes it to the next stage in the pipeline
func (*TagHandler) DispatchMetricMap ¶
func (th *TagHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap)
DispatchMetricMap adds the unique tags from the TagHandler to each consolidated metric in the map and passes it to the next stage in the pipeline
There is potential to optimize here: if the tagsKey doesn't change, we don't need to re-calculate it. But we're keeping things simple for now.
func (*TagHandler) EstimatedTags ¶
func (th *TagHandler) EstimatedTags() int
EstimatedTags returns a guess for how many tags to pre-allocate
func (*TagHandler) WaitForEvents ¶
func (th *TagHandler) WaitForEvents()
WaitForEvents waits for all event-dispatching goroutines to finish.
type V6BatchReader ¶
type V6BatchReader struct {
// contains filtered or unexported fields
}