Documentation ¶
Overview ¶
Package sfxclient creates convenient Go functions and wrappers to send metrics to SignalFx.
The core of the library is HTTPSink, which allows users to send metrics and events to SignalFx ad hoc. A Scheduler is built on top of this to facilitate easy management of metrics for multiple SignalFx reporters at once in more complex libraries.
HTTPSink ¶
The simplest way to send metrics and events to SignalFx is with HTTPSink. The only struct parameter that needs to be configured is AuthToken. To make it easier to create common Datapoint objects, wrappers exist for Gauge and Cumulative. An example of sending a hello world metric would look like this:
func SendHelloWorld() {
    client := NewHTTPSink()
    client.AuthToken = "ABCDXYZ"
    ctx := context.Background()
    client.AddDatapoints(ctx, []*datapoint.Datapoint{
        GaugeF("hello.world", nil, 1.0),
    })
}
Scheduler ¶
To facilitate periodic sending of datapoints to SignalFx, a Scheduler abstraction exists. You can use this to report custom metrics to SignalFx at some periodic interval.
type CustomApplication struct {
    queue chan int64
}

func (c *CustomApplication) Datapoints() []*datapoint.Datapoint {
    return []*datapoint.Datapoint{
        sfxclient.Gauge("queue.size", nil, int64(len(c.queue))),
    }
}

func main() {
    scheduler := sfxclient.NewScheduler()
    scheduler.Sink.(*sfxclient.HTTPSink).AuthToken = "ABCD-XYZ"
    app := &CustomApplication{}
    scheduler.AddCallback(app)
    go scheduler.Schedule(context.Background())
}
RollingBucket and CumulativeBucket ¶
Because counting things and calculating percentiles like p99 or median are common operations, RollingBucket and CumulativeBucket exist to make this easier. They implement the Collector interface which allows users to add them to an existing Scheduler.
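For example, a RollingBucket can be registered with a Scheduler alongside other collectors. A minimal sketch (the token and metric names here are illustrative):

scheduler := sfxclient.NewScheduler()
scheduler.Sink.(*sfxclient.HTTPSink).AuthToken = "ABCD-XYZ"

// Track request latencies in a rolling histogram window
latencies := sfxclient.NewRollingBucket("request.latency", map[string]string{"env": "prod"})
scheduler.AddCallback(latencies)
latencies.Add(0.031)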
To run the integration tests, which test sending to SignalFx with an actual token, create a file named authinfo.json that contains your auth token, similar to the following:
{ "AuthToken": "abcdefg" }
Then execute the following:
go test -v --tags=integration -run TestDatapointSending ./sfxclient/
Index ¶
- Constants
- Variables
- func Counter(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint
- func Cumulative(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint
- func CumulativeF(metricName string, dimensions map[string]string, val float64) *datapoint.Datapoint
- func CumulativeP(metricName string, dimensions map[string]string, val *int64) *datapoint.Datapoint
- func Gauge(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint
- func GaugeF(metricName string, dimensions map[string]string, val float64) *datapoint.Datapoint
- type AsyncMultiTokenSink
- func (a *AsyncMultiTokenSink) AddDatapoints(ctx context.Context, datapoints []*datapoint.Datapoint) (err error)
- func (a *AsyncMultiTokenSink) AddDatapointsWithToken(token string, datapoints []*datapoint.Datapoint) (err error)
- func (a *AsyncMultiTokenSink) AddEvents(ctx context.Context, events []*event.Event) (err error)
- func (a *AsyncMultiTokenSink) AddEventsWithToken(token string, events []*event.Event) (err error)
- func (a *AsyncMultiTokenSink) AddSpans(ctx context.Context, spans []*trace.Span) (err error)
- func (a *AsyncMultiTokenSink) AddSpansWithToken(token string, spans []*trace.Span) (err error)
- func (a *AsyncMultiTokenSink) Close() (err error)
- func (a *AsyncMultiTokenSink) Datapoints() (dps []*datapoint.Datapoint)
- type AsyncTokenStatusCounter
- type Collector
- type ContextKey
- type CumulativeBucket
- type HTTPSink
- type HTTPSinkOption
- type HashableCollector
- type MemCallback
- type MultiCollector
- type Result
- type RollingBucket
- type SFXAPIError
- type Scheduler
- func (s *Scheduler) AddCallback(db Collector)
- func (s *Scheduler) AddGroupedCallback(group string, db Collector)
- func (s *Scheduler) CollectDatapoints() []*datapoint.Datapoint
- func (s *Scheduler) Debug(debug bool)
- func (s *Scheduler) DefaultDimensions(dims map[string]string)
- func (s *Scheduler) GroupedDefaultDimensions(group string, dims map[string]string)
- func (s *Scheduler) RemoveCallback(db Collector)
- func (s *Scheduler) RemoveGroupedCallback(group string, db Collector)
- func (s *Scheduler) ReportOnce(ctx context.Context) error
- func (s *Scheduler) ReportingDelay(delay time.Duration)
- func (s *Scheduler) ReportingTimeout(timeout time.Duration)
- func (s *Scheduler) Schedule(ctx context.Context) error
- func (s *Scheduler) Var() expvar.Var
- type Sink
- type TimeCounter
- type TooManyRequestError
- type WithDimensions
Examples ¶
Constants ¶
const (
    // ClientVersion is the version of this library and is embedded into the user agent
    ClientVersion = "1.0"
    // IngestEndpointV2 is the v2 version of the signalfx ingest endpoint
    IngestEndpointV2 = "https://ingest.us0.signalfx.com/v2/datapoint"
    // EventIngestEndpointV2 is the v2 version of the signalfx event endpoint
    EventIngestEndpointV2 = "https://ingest.us0.signalfx.com/v2/event"
    // TraceIngestEndpointV1 is migrated to the Splunk Observability APM product endpoint
    // Deprecated
    TraceIngestEndpointV1 = "https://ingest.us0.signalfx.com/v2/trace"
    // TraceIngestSAPMEndpointV2 is the URL of the SAPM trace endpoint
    TraceIngestSAPMEndpointV2 = "https://ingest.us0.signalfx.com/v2/trace"
    // DefaultTimeout is the default time to fail signalfx datapoint requests if they don't succeed
    DefaultTimeout = time.Second * 5
)
const (
    // DefaultReportingDelay is the default interval Scheduler uses to report metrics to SignalFx
    DefaultReportingDelay = time.Second * 20
    // DefaultReportingTimeout is the default duration after which Scheduler logs an error message if reporting has not completed
    DefaultReportingTimeout = time.Second * 5
)
const TokenHeaderName = "X-Sf-Token"
TokenHeaderName is the header key for the auth token in the HTTP request
Variables ¶
var (
    // XDebugID debugs the transaction via signalscope if value matches known secret
    XDebugID xKeyContextValue = "X-Debug-Id"
    // XTracingDebug sets debug flag on trace if value matches known secret
    XTracingDebug xKeyContextValue = "X-SF-Trace-Token"
    // XTracingID, if set, accompanies XTracingDebug and gives a client the ability to put a value into a tag on the ingest span
    XTracingID xKeyContextValue = "X-SF-Tracing-ID"
)
var DefaultBucketWidth = time.Second * 20
DefaultBucketWidth is the default width of the window over which a RollingBucket accumulates histogram values before flushing them
var DefaultErrorHandler = func(err error) error {
    log.DefaultLogger.Log(log.Err, err, "Unable to handle error")
    return nil
}
DefaultErrorHandler is the default way errors are handled by a scheduler. It simply logs them via log.DefaultLogger
var DefaultHistogramSize = 80
DefaultHistogramSize is the default number of windows RollingBucket uses for created histograms
var DefaultMaxBufferSize = 100
DefaultMaxBufferSize is the default number of past bucket Quantile values RollingBucket saves until a Datapoints() call
var DefaultQuantiles = []float64{.25, .5, .9, .99}
DefaultQuantiles are the default set of percentiles RollingBucket should collect
var DefaultUserAgent = fmt.Sprintf("golib-sfxclient/%s (gover %s)", ClientVersion, runtime.Version())
DefaultUserAgent is the UserAgent string sent to signalfx
Functions ¶
func Counter ¶
Counter creates a SignalFx counter for integer values, incrementing by a set value. Generally, it is preferable to use Cumulative Counters when possible.
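A minimal sketch of emitting a counter datapoint (the sink, token, and metric name are illustrative):

client := NewHTTPSink()
client.AuthToken = "ABCD-XYZ"
// Report that 5 requests failed since the last report
err := client.AddDatapoints(context.Background(), []*datapoint.Datapoint{
    Counter("requests.failed", map[string]string{"handler": "index"}, 5),
})
if err != nil {
    panic(err)
}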
func Cumulative ¶
Cumulative creates a SignalFx cumulative counter for integer values.
func CumulativeF ¶
CumulativeF creates a SignalFx cumulative counter for float values.
func CumulativeP ¶
CumulativeP creates a SignalFx cumulative counter for integer values from a pointer that is loaded atomically.
Example ¶
client := NewHTTPSink()
ctx := context.Background()
var countThing int64
go func() {
    atomic.AddInt64(&countThing, 1)
}()
if err := client.AddDatapoints(ctx, []*datapoint.Datapoint{
    CumulativeP("server.request_count", nil, &countThing),
}); err != nil {
    panic("Could not send datapoints")
}
Output:
Types ¶
type AsyncMultiTokenSink ¶
type AsyncMultiTokenSink struct {
    ShutdownTimeout time.Duration       // ShutdownTimeout is how long the sink should wait before timing out after Close() is called
    Hasher          hash.Hash32         // Hasher is used to hash access tokens to a worker
    NewHTTPClient   func() *http.Client // function used to create an http client for the underlying sinks
    // contains filtered or unexported fields
}
AsyncMultiTokenSink asynchronously sends datapoints for multiple tokens
func NewAsyncMultiTokenSink ¶
func NewAsyncMultiTokenSink(numChannels int64, numDrainingThreads int64, buffer int, batchSize int, datapointEndpoint, eventEndpoint, traceEndpoint, userAgent string, httpClient func() *http.Client, errorHandler func(error) error, maxRetry int) *AsyncMultiTokenSink
NewAsyncMultiTokenSink returns a sink that asynchronously emits datapoints with different tokens
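The constructor takes quite a few knobs. A minimal sketch of building one (the numeric values are illustrative choices, not recommended defaults):

httpFactory := func() *http.Client {
    return &http.Client{Timeout: DefaultTimeout}
}
sink := NewAsyncMultiTokenSink(
    4,                         // numChannels to shard tokens across
    2,                         // numDrainingThreads per channel
    100,                       // buffer size of each channel
    50,                        // batchSize per outgoing request
    IngestEndpointV2,          // datapointEndpoint
    EventIngestEndpointV2,     // eventEndpoint
    TraceIngestSAPMEndpointV2, // traceEndpoint
    DefaultUserAgent,          // userAgent
    httpFactory,               // httpClient factory
    DefaultErrorHandler,       // errorHandler
    0,                         // maxRetry
)
defer sink.Close()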
func (*AsyncMultiTokenSink) AddDatapoints ¶
func (a *AsyncMultiTokenSink) AddDatapoints(ctx context.Context, datapoints []*datapoint.Datapoint) (err error)
AddDatapoints adds datapoints to the multi token sink using a context that has the TokenCtxKey
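A sketch of routing datapoints by the token stored on the context (assumes sink is an *AsyncMultiTokenSink as constructed above; the token value is illustrative):

// The sink looks up the token under TokenCtxKey on the context
ctx := context.WithValue(context.Background(), TokenCtxKey, "ABCD-XYZ")
err := sink.AddDatapoints(ctx, []*datapoint.Datapoint{
    Gauge("queue.size", nil, 42),
})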
func (*AsyncMultiTokenSink) AddDatapointsWithToken ¶
func (a *AsyncMultiTokenSink) AddDatapointsWithToken(token string, datapoints []*datapoint.Datapoint) (err error)
AddDatapointsWithToken emits a list of datapoints using a supplied token
func (*AsyncMultiTokenSink) AddEvents ¶
func (a *AsyncMultiTokenSink) AddEvents(ctx context.Context, events []*event.Event) (err error)
AddEvents adds events to the multi token sink using a context that has the TokenCtxKey
func (*AsyncMultiTokenSink) AddEventsWithToken ¶
func (a *AsyncMultiTokenSink) AddEventsWithToken(token string, events []*event.Event) (err error)
AddEventsWithToken emits a list of events using a supplied token
func (*AsyncMultiTokenSink) AddSpans ¶
func (a *AsyncMultiTokenSink) AddSpans(ctx context.Context, spans []*trace.Span) (err error)
AddSpans adds spans to the multi token sink using a context that has the TokenCtxKey
func (*AsyncMultiTokenSink) AddSpansWithToken ¶
func (a *AsyncMultiTokenSink) AddSpansWithToken(token string, spans []*trace.Span) (err error)
AddSpansWithToken emits a list of spans using a supplied token
func (*AsyncMultiTokenSink) Close ¶
func (a *AsyncMultiTokenSink) Close() (err error)
Close stops the existing workers and prevents additional datapoints from being added. If a ShutdownTimeout is set on the sink, it is used as the timeout for closing the sink; the default timeout is 5 seconds
func (*AsyncMultiTokenSink) Datapoints ¶
func (a *AsyncMultiTokenSink) Datapoints() (dps []*datapoint.Datapoint)
Datapoints returns a set of datapoints about the sink
type AsyncTokenStatusCounter ¶
type AsyncTokenStatusCounter struct {
// contains filtered or unexported fields
}
AsyncTokenStatusCounter is a counter and collector for http statuses by token
func NewAsyncTokenStatusCounter ¶
func NewAsyncTokenStatusCounter(name string, buffer int, numWorkers int64, defaultDims map[string]string) *AsyncTokenStatusCounter
NewAsyncTokenStatusCounter returns a structure for counting occurrences of http statuses by token
func (*AsyncTokenStatusCounter) Datapoints ¶
func (a *AsyncTokenStatusCounter) Datapoints() (dps []*datapoint.Datapoint)
Datapoints returns datapoints for each token and status
func (*AsyncTokenStatusCounter) Increment ¶
func (a *AsyncTokenStatusCounter) Increment(status *tokenStatus)
Increment adds a tokenStatus object to the counter
type Collector ¶
Collector is anything Scheduler can track that emits points
func NewMultiCollector ¶
NewMultiCollector returns a collector that is the aggregate of every given collector. It can be used to turn multiple collectors into a single collector.
Example ¶
var a Collector
var b Collector
c := NewMultiCollector(a, b)
So(len(c.Datapoints()), ShouldEqual, 0)
Output:
type ContextKey ¶
type ContextKey string
ContextKey is a custom key type for context values
const (
    // TokenCtxKey is a context key for tokens
    TokenCtxKey ContextKey = TokenHeaderName
)
type CumulativeBucket ¶
type CumulativeBucket struct {
    MetricName string
    Dimensions map[string]string
    // contains filtered or unexported fields
}
A CumulativeBucket tracks groups of values, reporting the count/sum/sum of squares as a cumulative counter.
Example ¶
cb := &CumulativeBucket{
    MetricName: "mname",
    Dimensions: map[string]string{"type": "dev"},
}
cb.Add(1)
cb.Add(3)
client := NewHTTPSink()
ctx := context.Background()
// Will expect it to send count=2, sum=4, sumofsquare=10
log.IfErr(log.Panic, client.AddDatapoints(ctx, cb.Datapoints()))
Output:
func (*CumulativeBucket) Add ¶
func (b *CumulativeBucket) Add(val int64)
Add an item to the bucket, later reporting the result in the next report cycle.
func (*CumulativeBucket) Datapoints ¶
func (b *CumulativeBucket) Datapoints() []*datapoint.Datapoint
Datapoints returns the count/sum/sumsquare datapoints, or nil if there is no set metric name
func (*CumulativeBucket) MultiAdd ¶
func (b *CumulativeBucket) MultiAdd(res *Result)
MultiAdd adds many items into the bucket at once using a Result. This can be more efficient as it involves only a constant number of atomic operations.
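A sketch of batching values into the bucket, assuming Result exposes Count, Sum, and SumOfSquares fields:

// Equivalent to calling cb.Add(1) and cb.Add(3), but with fewer atomic operations
cb.MultiAdd(&Result{
    Count:        2,
    Sum:          4,
    SumOfSquares: 10,
})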
type HTTPSink ¶
type HTTPSink struct {
    AuthToken          string
    UserAgent          string
    EventEndpoint      string
    DatapointEndpoint  string
    TraceEndpoint      string
    AdditionalHeaders  map[string]string
    ResponseCallback   func(resp *http.Response, responseBody []byte)
    Client             *http.Client
    DisableCompression bool
    // contains filtered or unexported fields
}
HTTPSink sends datapoints, events, and trace spans to SignalFx over HTTP.
Example ¶
sink := NewHTTPSink()
sink.AuthToken = "ABCDEFG"
ctx := context.Background()
err := sink.AddDatapoints(ctx, []*datapoint.Datapoint{
    // Sending a gauge with the value 1.2
    GaugeF("a.gauge", nil, 1.2),
    // Sending a cumulative counter with dimensions
    Cumulative("a.counter", map[string]string{"type": "dev"}, 100),
})
if err != nil {
    panic(err)
}
Output:
func NewHTTPSink ¶
func NewHTTPSink(opts ...HTTPSinkOption) *HTTPSink
NewHTTPSink creates a default HTTPSink using package-level constants as defaults, including an empty auth token. If sending directly to SignalFx, you are required to explicitly set the AuthToken
func (*HTTPSink) AddDatapoints ¶
AddDatapoints forwards the datapoints to SignalFx.
type HTTPSinkOption ¶ added in v3.1.0
type HTTPSinkOption func(*HTTPSink)
HTTPSinkOption can be passed to NewHTTPSink to customize its behaviour
func WithSAPMTraceExporter ¶ added in v3.1.0
func WithSAPMTraceExporter() HTTPSinkOption
WithSAPMTraceExporter takes a reference to HTTPSink and configures it to export using the SAPM protocol.
func WithZipkinTraceExporter ¶ added in v3.1.0
func WithZipkinTraceExporter() HTTPSinkOption
WithZipkinTraceExporter takes a reference to HTTPSink and configures it to export using the Zipkin protocol.
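For example, a sink configured to export traces using SAPM might be constructed like this (the token is illustrative):

sink := NewHTTPSink(WithSAPMTraceExporter())
sink.AuthToken = "ABCD-XYZ"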
type HashableCollector ¶
HashableCollector is a Collector function that can be inserted into a hashmap. You can use it to wrap a functional callback and insert it into a Scheduler.
func CollectorFunc ¶
func CollectorFunc(callback func() []*datapoint.Datapoint) *HashableCollector
CollectorFunc wraps a function to make it a Collector.
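A sketch of wrapping an ad-hoc function as a Collector and registering it (the scheduler and metric name are assumed to be in scope and illustrative):

scheduler.AddCallback(CollectorFunc(func() []*datapoint.Datapoint {
    return []*datapoint.Datapoint{
        Gauge("goroutines.count", nil, int64(runtime.NumGoroutine())),
    }
}))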
func (*HashableCollector) Datapoints ¶
func (h *HashableCollector) Datapoints() []*datapoint.Datapoint
Datapoints calls the wrapped function.
type MemCallback ¶ added in v3.3.10
MemCallback is an interface implementing the necessary APIs for goMetrics
var GoMetricsSource MemCallback = &goMetrics{}
GoMetricsSource is a singleton Collector that collects basic go system stats. It currently collects from runtime.ReadMemStats and adds a few extra metrics like uptime of the process and other runtime package functions.
type MultiCollector ¶
type MultiCollector []Collector
MultiCollector acts like a datapoint collector over multiple collectors.
func (MultiCollector) Datapoints ¶
func (m MultiCollector) Datapoints() []*datapoint.Datapoint
Datapoints returns the datapoints from every collector.
type RollingBucket ¶
type RollingBucket struct {
    // MetricName is the metric name used when the RollingBucket is reported to SignalFx
    MetricName string
    // Dimensions are the dimensions used when the RollingBucket is reported to SignalFx
    Dimensions map[string]string
    // Quantiles are an array of values [0 - 1.0] that are the histogram quantiles reported to
    // SignalFx during a Datapoints() call. For example, [.5] would only report the median.
    Quantiles []float64
    // BucketWidth is how long in time a bucket accumulates values before a flush is forced
    BucketWidth time.Duration
    // Hist is an efficient tracker of numeric values for a histogram
    Hist *gohistogram.NumericHistogram
    // MaxFlushBufferSize is the maximum size of a window to keep for the RollingBucket before
    // quantiles are dropped. It is ideally close to len(quantiles) * 3 + 15
    MaxFlushBufferSize int
    // Timer is used to track time.Now() during default value add calls
    Timer timekeeper.TimeKeeper
    // IncludeSum turns on Sum and SumOfSquares
    IncludeSum bool
    // contains filtered or unexported fields
}
RollingBucket keeps histogram-style metrics over a BucketWidth window of time. It allows users to collect and report percentile metrics like median or p99, as well as min/max/sum/count and sum of squares from a set of points.
func NewRollingBucket ¶
func NewRollingBucket(metricName string, dimensions map[string]string) *RollingBucket
NewRollingBucket creates a new RollingBucket using default values for Quantiles, BucketWidth, and the histogram tracker.
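A minimal sketch of creating one and registering it with a scheduler (the scheduler, metric name, and dimensions are illustrative):

rb := NewRollingBucket("request.duration.seconds", map[string]string{"handler": "index"})
scheduler.AddCallback(rb)
rb.Add(0.023) // record a 23ms request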
func (*RollingBucket) Add ¶
func (r *RollingBucket) Add(v float64)
Add a value to the rolling bucket histogram. If the current time has already been obtained, it may be more efficient to call AddAt in order to save another time.Now() call.
func (*RollingBucket) AddAt ¶
func (r *RollingBucket) AddAt(v float64, t time.Time)
AddAt is like Add but also takes an explicit time to record the value at.
func (*RollingBucket) Datapoints ¶
func (r *RollingBucket) Datapoints() []*datapoint.Datapoint
Datapoints returns basic bucket stats on every call, but for efficiency's sake it returns each window's histogram values only on the first call for that window. Because of this, it is suggested to always forward datapoints returned by this call to SignalFx.
type SFXAPIError ¶
SFXAPIError is returned when the API returns a status code other than 200.
func (SFXAPIError) Error ¶
func (se SFXAPIError) Error() string
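A sketch of checking for this error type after a send (sink, ctx, and dps are assumed to be in scope; errors.As handles both wrapped and unwrapped errors):

if err := sink.AddDatapoints(ctx, dps); err != nil {
    var apiErr SFXAPIError
    if errors.As(err, &apiErr) {
        fmt.Println("SignalFx API error:", apiErr.Error())
    }
}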
type Scheduler ¶
type Scheduler struct {
    Sink               Sink
    Timer              timekeeper.TimeKeeper
    SendZeroTime       bool
    ErrorHandler       func(error) error
    ReportingDelayNs   int64
    ReportingTimeoutNs int64
    Prefix             string
    // contains filtered or unexported fields
}
A Scheduler reports metrics to SignalFx at some regular interval.
Example ¶
s := NewScheduler()
s.Sink.(*HTTPSink).AuthToken = "ABCD-XYZ"
s.AddCallback(GoMetricsSource)
bucket := NewRollingBucket("req.time", map[string]string{"env": "test"})
s.AddCallback(bucket)
bucket.Add(1.2)
bucket.Add(3)
ctx := context.Background()
err := s.Schedule(ctx)
fmt.Println("Schedule result: ", err)
Output:
func NewScheduler ¶
func NewScheduler() *Scheduler
NewScheduler creates a default SignalFx scheduler that can report metrics to SignalFx at some interval.
func (*Scheduler) AddCallback ¶
AddCallback adds a collector to the default group.
func (*Scheduler) AddGroupedCallback ¶
AddGroupedCallback adds a collector to a specific group.
func (*Scheduler) CollectDatapoints ¶
CollectDatapoints gives a scheduler an external endpoint to be called; it is thread safe
func (*Scheduler) DefaultDimensions ¶
DefaultDimensions adds a dimension map that is appended to all metrics in the default group.
func (*Scheduler) GroupedDefaultDimensions ¶
GroupedDefaultDimensions adds default dimensions to a specific group.
func (*Scheduler) RemoveCallback ¶
RemoveCallback removes a collector from the default group.
func (*Scheduler) RemoveGroupedCallback ¶
RemoveGroupedCallback removes a collector from a specific group.
func (*Scheduler) ReportOnce ¶
ReportOnce will report any metrics saved in this reporter to SignalFx
func (*Scheduler) ReportingDelay ¶
ReportingDelay sets the interval metrics are reported to SignalFx.
func (*Scheduler) ReportingTimeout ¶
ReportingTimeout sets the duration after which an error is logged if a report to SignalFx has not completed
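For example, to report every 30 seconds and log an error if a report takes longer than 10 (the values are illustrative):

s := NewScheduler()
s.ReportingDelay(time.Second * 30)
s.ReportingTimeout(time.Second * 10)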
type Sink ¶
type Sink interface {
AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) (err error)
}
Sink is anything that can receive points collected by a Scheduler. This can be useful for stubbing out your collector to test the points that will be sent to SignalFx.
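A minimal sketch of a stub Sink for tests (the recordingSink type is hypothetical):

type recordingSink struct {
    points []*datapoint.Datapoint
}

func (r *recordingSink) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) error {
    r.points = append(r.points, points...)
    return nil
}

A Scheduler under test can then be pointed at it with s.Sink = &recordingSink{} and its recorded points inspected after a ReportOnce call.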
type TimeCounter ¶
TimeCounter counts how many durations fall above and below a barrier value. NsBarrier is the barrier, expressed in nanoseconds
func (*TimeCounter) Add ¶
func (t *TimeCounter) Add(dur time.Duration)
Add includes a datapoint in the source, incrementing above or below according to barrier
func (*TimeCounter) Collector ¶
func (t *TimeCounter) Collector(metric string) Collector
Collector returns a datapoint collector for this time counter that uses the given metric name.
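A sketch of timing an operation against a 100ms barrier (handleRequest, the scheduler, and the metric name are hypothetical):

tc := &TimeCounter{NsBarrier: (100 * time.Millisecond).Nanoseconds()}

start := time.Now()
handleRequest()
tc.Add(time.Since(start)) // increments the above or below count

scheduler.AddCallback(tc.Collector("request.time.vs_100ms"))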
type TooManyRequestError ¶ added in v3.3.1
type TooManyRequestError struct {
    // Throttle-Type header returned with the 429, indicating the reason.
    // This is a SignalFx-specific header, and the value may be empty.
    ThrottleType string
    // The client should retry after this interval.
    // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After for details.
    RetryAfter time.Duration
    // wrapped error.
    Err error
}
TooManyRequestError is returned when the API returns an HTTP 429 error. See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429 for details.
func (TooManyRequestError) Error ¶ added in v3.3.1
func (e TooManyRequestError) Error() string
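A sketch of honoring RetryAfter with a single retry (sink, ctx, and dps are assumed to be in scope):

if err := sink.AddDatapoints(ctx, dps); err != nil {
    var tooMany TooManyRequestError
    if errors.As(err, &tooMany) {
        time.Sleep(tooMany.RetryAfter) // back off for the server-requested interval
        err = sink.AddDatapoints(ctx, dps)
    }
}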
type WithDimensions ¶
WithDimensions adds dimensions on top of the datapoints of a collector. This can be used to take an existing Collector and include extra dimensions.
Example ¶
sched := NewScheduler()
sched.AddCallback(&WithDimensions{
    Collector: GoMetricsSource,
    Dimensions: map[string]string{
        "extra": "dimension",
    },
})
Output:
func (*WithDimensions) Datapoints ¶
func (w *WithDimensions) Datapoints() []*datapoint.Datapoint
Datapoints calls the wrapped Collector's Datapoints and adds on Dimensions