sfxclient

package
v2.5.1+incompatible Latest
Warning

This package is not in the latest version of its module.

Published: Oct 4, 2019 License: Apache-2.0 Imports: 34 Imported by: 62

README

sfxclient

import "github.com/signalfx/golib/sfxclient"

Package sfxclient creates convenient Go functions and wrappers to send metrics to SignalFx.

The core of the library is HTTPSink, which allows users to send metrics to SignalFx ad hoc. A Scheduler is built on top of this to facilitate easy management of metrics for multiple SignalFx reporters at once in more complex libraries.

HTTPSink

The simplest way to send metrics to SignalFx is with HTTPSink. The only struct parameter that needs to be configured is AuthToken. To make it easier to create common Datapoint objects, wrappers exist for Gauge and Cumulative. An example of sending a hello world metric is shown below, after the endpoint configuration notes.

Configuring your endpoints

If no endpoints are set manually, this library uses the us0 realm by default. If you are not in this realm, you will need to explicitly set the endpoint URLs as shown below. To determine if you are in a different realm and need to explicitly set the endpoints, check your profile page in the SignalFx web application. You will also need to specify an organization access token when making requests with the client. For more information on access tokens, see the API's authentication documentation.

func SendHelloWorld() {
    client := NewHTTPSink()
    // modify endpoints if needed
    client.DatapointEndpoint = "https://ingest.{REALM}.signalfx.com/v2/datapoint"
    client.EventEndpoint = "https://ingest.{REALM}.signalfx.com/v2/event"
    client.TraceEndpoint = "https://ingest.{REALM}.signalfx.com/v1/trace"

    client.AuthToken = "ORG_TOKEN"
    ctx := context.Background()
    client.AddDatapoints(ctx, []*datapoint.Datapoint{
        GaugeF("hello.world", nil, 1.0),
    })
    dims := make(map[string]string)
    client.AddEvents(ctx, []*event.Event{
        event.New("hello.world", event.USERDEFINED, dims, time.Time{}),
    })
}
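The three endpoint URLs above differ only in the realm, so they can be derived from one value. The following is a minimal sketch under that assumption; `realmEndpoints` is a hypothetical helper that is not part of sfxclient, and "us1" is only an example realm name.

```go
package main

import "fmt"

// realmEndpoints is a hypothetical helper (not part of sfxclient) that fills
// the {REALM} placeholder in the three ingest URL patterns shown above.
func realmEndpoints(realm string) (datapointURL, eventURL, traceURL string) {
	base := fmt.Sprintf("https://ingest.%s.signalfx.com", realm)
	return base + "/v2/datapoint", base + "/v2/event", base + "/v1/trace"
}

func main() {
	// "us1" is an example value; use the realm shown on your profile page.
	dp, ev, tr := realmEndpoints("us1")
	fmt.Println(dp)
	fmt.Println(ev)
	fmt.Println(tr)
}
```

The returned strings would then be assigned to DatapointEndpoint, EventEndpoint, and TraceEndpoint on the sink.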
Scheduler

To facilitate periodic sending of datapoints to SignalFx, a Scheduler abstraction exists. You can use this to report custom metrics to SignalFx at some periodic interval.

type CustomApplication struct {
    queue chan int64
}
func (c *CustomApplication) Datapoints() []*datapoint.Datapoint {
    return []*datapoint.Datapoint {
        sfxclient.Gauge("queue.size", nil, int64(len(c.queue))),
    }
}
func main() {
    scheduler := sfxclient.NewScheduler()
    // endpoint configuration examples
    scheduler.Sink.(*sfxclient.HTTPSink).DatapointEndpoint = "https://ingest.{REALM}.signalfx.com/v2/datapoint"
    scheduler.Sink.(*sfxclient.HTTPSink).EventEndpoint = "https://ingest.{REALM}.signalfx.com/v2/event"
    scheduler.Sink.(*sfxclient.HTTPSink).TraceEndpoint = "https://ingest.{REALM}.signalfx.com/v1/trace"

    scheduler.Sink.(*sfxclient.HTTPSink).AuthToken = "ORG_TOKEN"
    app := &CustomApplication{}
    scheduler.AddCallback(app)
    go scheduler.Schedule(context.Background())
}

RollingBucket and CumulativeBucket

Because counting things and calculating percentiles like p99 or median are common operations, RollingBucket and CumulativeBucket exist to make this easier. They implement the Collector interface which allows users to add them to an existing Scheduler.

Index
Constants
const ClientVersion = "1.0"

ClientVersion is the version of this library and is embedded into the user agent

const DefaultReportingDelay = time.Second * 20

DefaultReportingDelay is the default interval Scheduler uses to report metrics to SignalFx

const DefaultTimeout = time.Second * 5

DefaultTimeout is the default time to fail signalfx datapoint requests if they don't succeed

const IngestEndpointV2 = "https://ingest.signalfx.com/v2/datapoint"

IngestEndpointV2 is the v2 version of the signalfx ingest endpoint

const TokenHeaderName = "X-Sf-Token"

TokenHeaderName is the header key for the auth token in the HTTP request

Variables
var DefaultBucketWidth = time.Second * 20

DefaultBucketWidth is the default width that a RollingBucket should flush histogram values

var DefaultErrorHandler = func(err error) error {
	log.DefaultLogger.Log(log.Err, err, "Unable to handle error")
	return nil
}

DefaultErrorHandler is the default way to handle errors by a scheduler. It simply prints them to stdout

var DefaultHistogramSize = 80

DefaultHistogramSize is the default number of windows RollingBucket uses for created histograms

var DefaultMaxBufferSize = 100

DefaultMaxBufferSize is the default number of past bucket Quantile values RollingBucket saves until a Datapoints() call

var DefaultQuantiles = []float64{.25, .5, .9, .99}

DefaultQuantiles are the default set of percentiles RollingBucket should collect

var DefaultUserAgent = fmt.Sprintf("golib-sfxclient/%s (gover %s)", ClientVersion, runtime.Version())

DefaultUserAgent is the UserAgent string sent to signalfx

func Cumulative
func Cumulative(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint

Cumulative creates a SignalFx cumulative counter for integer values.

func CumulativeF
func CumulativeF(metricName string, dimensions map[string]string, val float64) *datapoint.Datapoint

CumulativeF creates a SignalFx cumulative counter for float values.

func CumulativeP
func CumulativeP(metricName string, dimensions map[string]string, val *int64) *datapoint.Datapoint

CumulativeP creates a SignalFx cumulative counter for integer values from a pointer that is loaded atomically.

func Gauge
func Gauge(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint

Gauge creates a SignalFx gauge for integer values.

func GaugeF
func GaugeF(metricName string, dimensions map[string]string, val float64) *datapoint.Datapoint

GaugeF creates a SignalFx gauge for floating point values.

type Collector
type Collector interface {
	Datapoints() []*datapoint.Datapoint
}

Collector is anything Scheduler can track that emits points

var GoMetricsSource Collector = &goMetrics{}

GoMetricsSource is a singleton Collector that collects basic go system stats. It currently collects from runtime.ReadMemStats and adds a few extra metrics like uptime of the process and other runtime package functions.

func NewMultiCollector
func NewMultiCollector(collectors ...Collector) Collector

NewMultiCollector returns a collector that is the aggregate of every given collector. It can be used to turn multiple collectors into a single collector.

type CumulativeBucket
type CumulativeBucket struct {
	MetricName string
	Dimensions map[string]string
}

A CumulativeBucket tracks groups of values, reporting the count/sum/sum of squares as a cumulative counter.

func (*CumulativeBucket) Add
func (b *CumulativeBucket) Add(val int64)

Add an item to the bucket, later reporting the result in the next report cycle.

func (*CumulativeBucket) Datapoints
func (b *CumulativeBucket) Datapoints() []*datapoint.Datapoint

Datapoints returns the count/sum/sumsquare datapoints, or nil if there is no set metric name

func (*CumulativeBucket) MultiAdd
func (b *CumulativeBucket) MultiAdd(res *Result)

MultiAdd many items into the bucket at once using a Result. This can be more efficient as it involves only a constant number of atomic operations.

type HTTPSink
type HTTPSink struct {
	AuthToken          string
	UserAgent          string
	DatapointEndpoint  string
	EventEndpoint      string
	Client             *http.Client
}

HTTPSink will accept signalfx datapoints and forward them to SignalFx via HTTP.

func NewHTTPSink
func NewHTTPSink() *HTTPSink

NewHTTPSink creates a default HTTPSink using package level constants as defaults, including an empty auth token. If sending directly to SignalFx, you will be required to explicitly set the AuthToken.

func (*HTTPSink) AddDatapoints
func (h *HTTPSink) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) (err error)

AddDatapoints forwards the datapoints to SignalFx.

func (*HTTPSink) AddEvents
func (h *HTTPSink) AddEvents(ctx context.Context, points []*event.Event) (err error)

AddEvents forwards the events to SignalFx.

type HashableCollector
type HashableCollector struct {
	Callback func() []*datapoint.Datapoint
}

HashableCollector is a Collector function that can be inserted into a hashmap. You can use it to wrap a functional callback and insert it into a Scheduler.

func CollectorFunc
func CollectorFunc(callback func() []*datapoint.Datapoint) *HashableCollector

CollectorFunc wraps a function to make it a Collector.

func (*HashableCollector) Datapoints
func (h *HashableCollector) Datapoints() []*datapoint.Datapoint

Datapoints calls the wrapped function.

type MultiCollector
type MultiCollector []Collector

MultiCollector acts like a datapoint collector over multiple collectors.

func (MultiCollector) Datapoints
func (m MultiCollector) Datapoints() []*datapoint.Datapoint

Datapoints returns the datapoints from every collector.

type Result
type Result struct {
	Count        int64
	Sum          int64
	SumOfSquares float64
}

Result is a cumulated result of items that can be added to a CumulativeBucket at once

func (*Result) Add
func (r *Result) Add(val int64)

Add a single number to the bucket. This does not use atomic operations and is not thread safe, but adding a finished Result into a CumulativeBucket is thread safe.

type RollingBucket
type RollingBucket struct {
	// MetricName is the metric name used when the RollingBucket is reported to SignalFx
	MetricName string
	// Dimensions are the dimensions used when the RollingBucket is reported to SignalFx
	Dimensions map[string]string
	// Quantiles are an array of values [0 - 1.0] that are the histogram quantiles reported to
	// SignalFx during a Datapoints() call.  For example, [.5] would only report the median.
	Quantiles []float64
	// BucketWidth is how long in time a bucket accumulates values before a flush is forced
	BucketWidth time.Duration
	// Hist is an efficient tracker of numeric values for a histogram
	Hist *gohistogram.NumericHistogram
	// MaxFlushBufferSize is the maximum size of a window to keep for the RollingBucket before
	// quantiles are dropped.  It is ideally close to len(quantiles) * 3 + 15
	MaxFlushBufferSize int
	// Timer is used to track time.Now() during default value add calls
	Timer timekeeper.TimeKeeper
}

RollingBucket keeps histogram style metrics over a BucketWidth window of time. It allows users to collect and report percentile metrics like median or p99, as well as min/max/sum/count and sum of squares from a set of points.

func NewRollingBucket
func NewRollingBucket(metricName string, dimensions map[string]string) *RollingBucket

NewRollingBucket creates a new RollingBucket using default values for Quantiles, BucketWidth, and the histogram tracker.

func (*RollingBucket) Add
func (r *RollingBucket) Add(v float64)

Add a value to the rolling bucket histogram. If the current time is already calculated, it may be more efficient to call AddAt in order to save another time.Now() call.

func (*RollingBucket) AddAt
func (r *RollingBucket) AddAt(v float64, t time.Time)

AddAt is like Add but also takes a time to pretend the value comes at.

func (*RollingBucket) Datapoints
func (r *RollingBucket) Datapoints() []*datapoint.Datapoint

Datapoints returns basic bucket stats every time it is called, but returns a window's histogram values only on the first call for that window. Because histogram window values are returned only once, it is suggested to always forward the datapoints returned by this call to SignalFx.

type Scheduler
type Scheduler struct {
	Sink             Sink
	Timer            timekeeper.TimeKeeper
	SendZeroTime     bool
	ErrorHandler     func(error) error
	ReportingDelayNs int64
}

A Scheduler reports metrics to SignalFx at a regular interval.

func NewScheduler
func NewScheduler() *Scheduler

NewScheduler creates a default SignalFx scheduler that can report metrics to SignalFx at some interval.

func (*Scheduler) AddCallback
func (s *Scheduler) AddCallback(db Collector)

AddCallback adds a collector to the default group.

func (*Scheduler) AddGroupedCallback
func (s *Scheduler) AddGroupedCallback(group string, db Collector)

AddGroupedCallback adds a collector to a specific group.

func (*Scheduler) DefaultDimensions
func (s *Scheduler) DefaultDimensions(dims map[string]string)

DefaultDimensions adds a dimension map that is appended to all metrics in the default group.

func (*Scheduler) GroupedDefaultDimensions
func (s *Scheduler) GroupedDefaultDimensions(group string, dims map[string]string)

GroupedDefaultDimensions adds default dimensions to a specific group.

func (*Scheduler) RemoveCallback
func (s *Scheduler) RemoveCallback(db Collector)

RemoveCallback removes a collector from the default group.

func (*Scheduler) RemoveGroupedCallback
func (s *Scheduler) RemoveGroupedCallback(group string, db Collector)

RemoveGroupedCallback removes a collector from a specific group.

func (*Scheduler) ReportOnce
func (s *Scheduler) ReportOnce(ctx context.Context) error

ReportOnce will report any metrics saved in this reporter to SignalFx

func (*Scheduler) ReportingDelay
func (s *Scheduler) ReportingDelay(delay time.Duration)

ReportingDelay sets the interval metrics are reported to SignalFx.

func (*Scheduler) Schedule
func (s *Scheduler) Schedule(ctx context.Context) error

Schedule will run until either the ErrorHandler returns an error or the context is canceled. This is intended to be run inside a goroutine.
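The stopping rule described above can be sketched with a ticker and a select loop. This is only an illustration of the pattern, not the Scheduler's actual implementation; `runUntil` and `demo` are hypothetical names, and the 1 ms interval is chosen purely to keep the example fast.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runUntil is a hypothetical stand-in for a Schedule-style loop. It calls
// report on every tick and returns when ctx is canceled or when onErr
// returns a non-nil error.
func runUntil(ctx context.Context, interval time.Duration, report func() error, onErr func(error) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := report(); err != nil {
				// The handler decides whether this error ends the loop.
				if fatal := onErr(err); fatal != nil {
					return fatal
				}
			}
		}
	}
}

// demo reports successfully twice, fails on the third call, and uses a
// handler that treats every error as fatal.
func demo() (calls int, err error) {
	report := func() error {
		calls++
		if calls == 3 {
			return errors.New("boom")
		}
		return nil
	}
	err = runUntil(context.Background(), time.Millisecond, report, func(e error) error { return e })
	return calls, err
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err)
}
```

A handler that returns nil, as DefaultErrorHandler does, would keep such a loop running until the context is canceled.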

func (*Scheduler) Var
func (s *Scheduler) Var() expvar.Var

Var returns an expvar variable that prints the values of the previously reported datapoints.

type Sink
type Sink interface {
	AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) (err error)
}

Sink is anything that can receive points collected by a Scheduler. This can be useful for stubbing out your collector to test the points that will be sent to SignalFx.
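As a rough illustration of the stubbing idea, the sketch below records points in memory instead of sending them. To stay self-contained it declares simplified local stand-ins named `Datapoint` and `Sink` rather than importing the real packages; `recordingSink` and `report` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Datapoint and Sink are simplified local stand-ins for datapoint.Datapoint
// and the Sink interface, so that this sketch compiles on its own.
type Datapoint struct {
	Metric string
	Value  int64
}

type Sink interface {
	AddDatapoints(ctx context.Context, points []*Datapoint) error
}

// recordingSink is a hypothetical test double that keeps points in memory.
type recordingSink struct {
	mu     sync.Mutex
	points []*Datapoint
}

func (r *recordingSink) AddDatapoints(_ context.Context, points []*Datapoint) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.points = append(r.points, points...)
	return nil
}

// report stands in for application code that would normally push to a real sink.
func report(s Sink) error {
	return s.AddDatapoints(context.Background(), []*Datapoint{{Metric: "queue.size", Value: 3}})
}

func main() {
	sink := &recordingSink{}
	if err := report(sink); err != nil {
		panic(err)
	}
	fmt.Println(len(sink.points), sink.points[0].Metric, sink.points[0].Value)
}
```

With the real library, a type with the same AddDatapoints method could be assigned to a Scheduler's Sink field in a test.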

type WithDimensions
type WithDimensions struct {
	Dimensions map[string]string
	Collector  Collector
}

WithDimensions adds dimensions on top of the datapoints of a collector. This can be used to take an existing Collector and include extra dimensions.

func (*WithDimensions) Datapoints
func (w *WithDimensions) Datapoints() []*datapoint.Datapoint

Datapoints calls datapoints and adds on Dimensions

Documentation

Overview

Package sfxclient creates convenient go functions and wrappers to send metrics to SignalFx.

The core of the library is HTTPSink, which allows users to send metrics and events to SignalFx ad hoc. A Scheduler is built on top of this to facilitate easy management of metrics for multiple SignalFx reporters at once in more complex libraries.

HTTPSink

The simplest way to send metrics and events to SignalFx is with HTTPSink. The only struct parameter that needs to be configured is AuthToken. To make it easier to create common Datapoint objects, wrappers exist for Gauge and Cumulative. An example of sending a hello world metric would look like this:

func SendHelloWorld() {
    client := NewHTTPSink()
    client.AuthToken = "ABCDXYZ"
    ctx := context.Background()
    client.AddDatapoints(ctx, []*datapoint.Datapoint{
        GaugeF("hello.world", nil, 1.0),
    })
}

Scheduler

To facilitate periodic sending of datapoints to SignalFx, a Scheduler abstraction exists. You can use this to report custom metrics to SignalFx at some periodic interval.

type CustomApplication struct {
    queue chan int64
}
func (c *CustomApplication) Datapoints() []*datapoint.Datapoint {
    return []*datapoint.Datapoint {
      sfxclient.Gauge("queue.size", nil, int64(len(c.queue))),
    }
}
func main() {
    scheduler := sfxclient.NewScheduler()
    scheduler.Sink.(*sfxclient.HTTPSink).AuthToken = "ABCD-XYZ"
    app := &CustomApplication{}
    scheduler.AddCallback(app)
    go scheduler.Schedule(context.Background())
}

RollingBucket and CumulativeBucket

Because counting things and calculating percentiles like p99 or median are common operations, RollingBucket and CumulativeBucket exist to make this easier. They implement the Collector interface which allows users to add them to an existing Scheduler.

To run the integration tests, which send data to SignalFx with an actual token, create a file named authinfo.json that contains your auth token, similar to the following:

{
  "AuthToken": "abcdefg"
}

Then execute the following:

go test -v --tags=integration -run TestDatapointSending ./sfxclient/

Constants

const (
	// ClientVersion is the version of this library and is embedded into the user agent
	ClientVersion = "1.0"

	// IngestEndpointV2 is the v2 version of the signalfx ingest endpoint
	IngestEndpointV2 = "https://ingest.signalfx.com/v2/datapoint"

	// EventIngestEndpointV2 is the v2 version of the signalfx event endpoint
	EventIngestEndpointV2 = "https://ingest.signalfx.com/v2/event"

	// TraceIngestEndpointV1 is the v1 version of the signalfx trace endpoint
	TraceIngestEndpointV1 = "https://ingest.signalfx.com/v1/trace"

	// DefaultTimeout is the default time to fail signalfx datapoint requests if they don't succeed
	DefaultTimeout = time.Second * 5
)

const (
	// DefaultReportingDelay is the default interval Scheduler uses to report metrics to SignalFx
	DefaultReportingDelay = time.Second * 20
	// DefaultReportingTimeout is the default timeout value for Scheduler to log error message if reporting is not completed within this duration
	DefaultReportingTimeout = time.Second * 5
)

const TokenHeaderName = "X-Sf-Token"

TokenHeaderName is the header key for the auth token in the HTTP request
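For orientation, this is roughly what placing the token in that header looks like with the standard library. It is a sketch, not HTTPSink's code: `newIngestRequest` is a hypothetical helper that only builds a request and does not send it.

```go
package main

import (
	"fmt"
	"net/http"
)

// newIngestRequest is a hypothetical helper that shows where the token goes.
// It builds the request but never sends it.
func newIngestRequest(url, token string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, nil)
	if err != nil {
		return nil, err
	}
	// "X-Sf-Token" is the value of TokenHeaderName documented above.
	req.Header.Set("X-Sf-Token", token)
	return req, nil
}

func main() {
	req, err := newIngestRequest("https://ingest.signalfx.com/v2/datapoint", "ORG_TOKEN")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.Header.Get("X-Sf-Token"))
}
```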

Variables

var (
	// XDebugID Debugs the transaction via signalscope if value matches known secret
	XDebugID xKeyContextValue = "X-Debug-Id"
	// XTracingDebug Sets debug flag on trace if value matches known secret
	XTracingDebug xKeyContextValue = "X-SF-Trace-Token"
	// XTracingID if set accompanies the tracingDebug and gives a client the ability to put a value into a tag on the ingest span
	XTracingID xKeyContextValue = "X-SF-Tracing-ID"
)

var DefaultBucketWidth = time.Second * 20

DefaultBucketWidth is the default width that a RollingBucket should flush histogram values

var DefaultErrorHandler = func(err error) error {
	log.DefaultLogger.Log(log.Err, err, "Unable to handle error")
	return nil
}

DefaultErrorHandler is the default way to handle errors by a scheduler. It simply prints them to stdout

var DefaultHistogramSize = 80

DefaultHistogramSize is the default number of windows RollingBucket uses for created histograms

var DefaultMaxBufferSize = 100

DefaultMaxBufferSize is the default number of past bucket Quantile values RollingBucket saves until a Datapoints() call

var DefaultQuantiles = []float64{.25, .5, .9, .99}

DefaultQuantiles are the default set of percentiles RollingBucket should collect

var DefaultUserAgent = fmt.Sprintf("golib-sfxclient/%s (gover %s)", ClientVersion, runtime.Version())

DefaultUserAgent is the UserAgent string sent to signalfx

Functions

func Counter

func Counter(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint

Counter creates a SignalFx counter for integer values, incrementing by a set value. Generally, it is preferable to use Cumulative Counters when possible.

func Cumulative

func Cumulative(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint

Cumulative creates a SignalFx cumulative counter for integer values.

func CumulativeF

func CumulativeF(metricName string, dimensions map[string]string, val float64) *datapoint.Datapoint

CumulativeF creates a SignalFx cumulative counter for float values.

func CumulativeP

func CumulativeP(metricName string, dimensions map[string]string, val *int64) *datapoint.Datapoint

CumulativeP creates a SignalFx cumulative counter for integer values from a pointer that is loaded atomically.

Example
client := NewHTTPSink()
ctx := context.Background()
var countThing int64
go func() {
	atomic.AddInt64(&countThing, 1)
}()
if err := client.AddDatapoints(ctx, []*datapoint.Datapoint{
	CumulativeP("server.request_count", nil, &countThing),
}); err != nil {
	panic("Could not send datapoints")
}

func Gauge

func Gauge(metricName string, dimensions map[string]string, val int64) *datapoint.Datapoint

Gauge creates a SignalFx gauge for integer values.

func GaugeF

func GaugeF(metricName string, dimensions map[string]string, val float64) *datapoint.Datapoint

GaugeF creates a SignalFx gauge for floating point values.

Types

type AsyncMultiTokenSink added in v1.1.0

type AsyncMultiTokenSink struct {
	ShutdownTimeout time.Duration // ShutdownTimeout is how long the sink should wait before timing out after Close() is called

	Hasher hash.Hash32 // Hasher is used to hash access tokens to a worker

	NewHTTPClient func() *http.Client // function used to create an http client for the underlying sinks
	// contains filtered or unexported fields
}

AsyncMultiTokenSink asynchronously sends datapoints for multiple tokens
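The Hasher field suggests that each token is mapped to a worker by hashing. One plausible way to do that is sketched below; the choice of FNV-1a and the modulo step are assumptions made for illustration, and `workerFor` is a hypothetical function, not part of the package.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerFor is a hypothetical illustration of mapping a token to one of n
// workers with a 32-bit hash. FNV-1a and the modulo are assumptions; n must
// be greater than zero.
func workerFor(token string, n uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(token)) // Write on a hash.Hash never returns an error
	return h.Sum32() % n
}

func main() {
	for _, token := range []string{"TOKEN_A", "TOKEN_B", "TOKEN_A"} {
		fmt.Println(token, "-> worker", workerFor(token, 4))
	}
}
```

Because the mapping is deterministic, datapoints for the same token always land on the same worker in this sketch.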

func NewAsyncMultiTokenSink added in v1.1.0

func NewAsyncMultiTokenSink(numChannels int64, numDrainingThreads int64, buffer int, batchSize int, DatapointEndpoint, EventEndpoint, TraceEndpoint, userAgent string, httpClient func() *http.Client, errorHandler func(error) error, maxRetry int) *AsyncMultiTokenSink

NewAsyncMultiTokenSink returns a sink that asynchronously emits datapoints with different tokens

func (*AsyncMultiTokenSink) AddDatapoints added in v1.1.0

func (a *AsyncMultiTokenSink) AddDatapoints(ctx context.Context, datapoints []*datapoint.Datapoint) (err error)

AddDatapoints adds datapoints to the multitoken sink using a context that has the TokenCtxKey

func (*AsyncMultiTokenSink) AddDatapointsWithToken added in v1.1.0

func (a *AsyncMultiTokenSink) AddDatapointsWithToken(token string, datapoints []*datapoint.Datapoint) (err error)

AddDatapointsWithToken emits a list of datapoints using a supplied token

func (*AsyncMultiTokenSink) AddEvents added in v1.1.0

func (a *AsyncMultiTokenSink) AddEvents(ctx context.Context, events []*event.Event) (err error)

AddEvents adds events to the multitoken sink using a context that has the TokenCtxKey

func (*AsyncMultiTokenSink) AddEventsWithToken added in v1.1.0

func (a *AsyncMultiTokenSink) AddEventsWithToken(token string, events []*event.Event) (err error)

AddEventsWithToken emits a list of events using a supplied token

func (*AsyncMultiTokenSink) AddSpans

func (a *AsyncMultiTokenSink) AddSpans(ctx context.Context, spans []*trace.Span) (err error)

AddSpans adds spans to the multitoken sink using a context that has the TokenCtxKey

func (*AsyncMultiTokenSink) AddSpansWithToken

func (a *AsyncMultiTokenSink) AddSpansWithToken(token string, spans []*trace.Span) (err error)

AddSpansWithToken emits a list of spans using a supplied token

func (*AsyncMultiTokenSink) Close added in v1.1.0

func (a *AsyncMultiTokenSink) Close() (err error)

Close stops the existing workers and prevents additional datapoints from being added. If a ShutdownTimeout is set on the sink, it is used as the timeout for closing the sink. The default timeout is 5 seconds.

func (*AsyncMultiTokenSink) Datapoints added in v1.1.0

func (a *AsyncMultiTokenSink) Datapoints() (dps []*datapoint.Datapoint)

Datapoints returns a set of datapoints about the sink

type AsyncTokenStatusCounter added in v1.1.0

type AsyncTokenStatusCounter struct {
	// contains filtered or unexported fields
}

AsyncTokenStatusCounter is a counter and collector for http statuses by token

func NewAsyncTokenStatusCounter added in v1.1.0

func NewAsyncTokenStatusCounter(name string, buffer int, numWorkers int64, defaultDims map[string]string) *AsyncTokenStatusCounter

NewAsyncTokenStatusCounter returns a structure for counting occurrences of http statuses by token

func (*AsyncTokenStatusCounter) Datapoints added in v1.1.0

func (a *AsyncTokenStatusCounter) Datapoints() (dps []*datapoint.Datapoint)

Datapoints returns datapoints for each token and status

func (*AsyncTokenStatusCounter) Increment added in v1.1.0

func (a *AsyncTokenStatusCounter) Increment(status *tokenStatus)

Increment adds a tokenStatus object to the counter

type Collector

type Collector interface {
	Datapoints() []*datapoint.Datapoint
}

Collector is anything Scheduler can track that emits points

var GoMetricsSource Collector = &goMetrics{}

GoMetricsSource is a singleton Collector that collects basic go system stats. It currently collects from runtime.ReadMemStats and adds a few extra metrics like uptime of the process and other runtime package functions.

func NewMultiCollector

func NewMultiCollector(collectors ...Collector) Collector

NewMultiCollector returns a collector that is the aggregate of every given collector. It can be used to turn multiple collectors into a single collector.

Example
var a Collector
var b Collector
c := NewMultiCollector(a, b)
c.Datapoints()

type ContextKey added in v1.1.0

type ContextKey string

ContextKey is a custom key type for context values

const (
	// TokenCtxKey is a context key for tokens
	TokenCtxKey ContextKey = TokenHeaderName
)
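A typed context key like this is normally used with context.WithValue. The sketch below redeclares `ContextKey` and `TokenCtxKey` locally so it compiles on its own; `withToken`, `tokenFrom`, and `roundTrip` are hypothetical helpers, and storing the token as a plain string is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// ContextKey and TokenCtxKey mirror the declarations above so that this
// sketch is self-contained.
type ContextKey string

const TokenCtxKey ContextKey = "X-Sf-Token"

// withToken returns a child context carrying the token (assumed to be a string).
func withToken(ctx context.Context, token string) context.Context {
	return context.WithValue(ctx, TokenCtxKey, token)
}

// tokenFrom reads the token back; ok is false when no token was stored.
func tokenFrom(ctx context.Context) (token string, ok bool) {
	token, ok = ctx.Value(TokenCtxKey).(string)
	return token, ok
}

// roundTrip stores a token in a fresh context and reads it back.
func roundTrip(token string) (string, bool) {
	return tokenFrom(withToken(context.Background(), token))
}

func main() {
	token, ok := roundTrip("ORG_TOKEN")
	fmt.Println(token, ok)

	_, ok = tokenFrom(context.Background())
	fmt.Println(ok)
}
```

Using a dedicated key type rather than a bare string keeps the value from colliding with keys set by other packages.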

type CumulativeBucket

type CumulativeBucket struct {
	MetricName string
	Dimensions map[string]string
	// contains filtered or unexported fields
}

A CumulativeBucket tracks groups of values, reporting the count/sum/sum of squares as a cumulative counter.

Example
cb := &CumulativeBucket{
	MetricName: "mname",
	Dimensions: map[string]string{"type": "dev"},
}
cb.Add(1)
cb.Add(3)
client := NewHTTPSink()
ctx := context.Background()
// Will expect it to send count=2, sum=4, sumofsquare=10
log.IfErr(log.Panic, client.AddDatapoints(ctx, cb.Datapoints()))

func (*CumulativeBucket) Add

func (b *CumulativeBucket) Add(val int64)

Add an item to the bucket, later reporting the result in the next report cycle.

func (*CumulativeBucket) Datapoints

func (b *CumulativeBucket) Datapoints() []*datapoint.Datapoint

Datapoints returns the count/sum/sumsquare datapoints, or nil if there is no set metric name

func (*CumulativeBucket) MultiAdd

func (b *CumulativeBucket) MultiAdd(res *Result)

MultiAdd many items into the bucket at once using a Result. This can be more efficient as it involves only a constant number of atomic operations.
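To make the "constant number of atomic operations" point concrete, here is a sketch with simplified stand-ins named `result` and `bucket`. It is not the library's implementation. Storing the float sum as bits and updating it with a compare-and-swap loop is one assumed way to make that field atomic.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// result and bucket are simplified stand-ins for Result and CumulativeBucket.
type result struct {
	count, sum   int64
	sumOfSquares float64
}

// add is not safe for concurrent use; a result is filled by one goroutine.
func (r *result) add(v int64) {
	r.count++
	r.sum += v
	r.sumOfSquares += float64(v) * float64(v)
}

type bucket struct {
	count, sum       int64
	sumOfSquaresBits uint64 // float64 kept as bits so it can be updated atomically
}

// multiAdd merges a finished result with three atomic updates, however many
// values the result holds. The last update may retry under contention.
func (b *bucket) multiAdd(r *result) {
	atomic.AddInt64(&b.count, r.count)
	atomic.AddInt64(&b.sum, r.sum)
	for {
		old := atomic.LoadUint64(&b.sumOfSquaresBits)
		next := math.Float64bits(math.Float64frombits(old) + r.sumOfSquares)
		if atomic.CompareAndSwapUint64(&b.sumOfSquaresBits, old, next) {
			return
		}
	}
}

// snapshot reads the three totals.
func (b *bucket) snapshot() (count, sum int64, sumOfSquares float64) {
	return atomic.LoadInt64(&b.count), atomic.LoadInt64(&b.sum),
		math.Float64frombits(atomic.LoadUint64(&b.sumOfSquaresBits))
}

func main() {
	r := &result{}
	r.add(1)
	r.add(3)
	b := &bucket{}
	b.multiAdd(r)
	fmt.Println(b.snapshot()) // count=2, sum=4, sum of squares=10
}
```

Adding 1 and 3 gives the same count, sum, and sum of squares that the CumulativeBucket example above expects to send.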

type HTTPSink

type HTTPSink struct {
	AuthToken         string
	UserAgent         string
	EventEndpoint     string
	DatapointEndpoint string
	TraceEndpoint     string
	AdditionalHeaders map[string]string
	ResponseCallback  func(resp *http.Response, responseBody []byte)
	Client            *http.Client

	DisableCompression bool
	// contains filtered or unexported fields
}

HTTPSink accepts SignalFx datapoints, events, and spans and forwards them to SignalFx via HTTP.

Example
sink := NewHTTPSink()
sink.AuthToken = "ABCDEFG"
ctx := context.Background()
err := sink.AddDatapoints(ctx, []*datapoint.Datapoint{
	// Sending a gauge with the value 1.2
	GaugeF("a.gauge", nil, 1.2),
	// Sending a cumulative counter with dimensions
	Cumulative("a.counter", map[string]string{"type": "dev"}, 100),
})
if err != nil {
	panic(err)
}

func NewHTTPSink

func NewHTTPSink() *HTTPSink

NewHTTPSink creates a default HTTPSink using package level constants as defaults, including an empty auth token. If sending directly to SignalFx, you will be required to explicitly set the AuthToken.

func (*HTTPSink) AddDatapoints

func (h *HTTPSink) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) (err error)

AddDatapoints forwards the datapoints to SignalFx.

func (*HTTPSink) AddEvents

func (h *HTTPSink) AddEvents(ctx context.Context, events []*event.Event) (err error)

AddEvents forwards the events to SignalFx.

func (*HTTPSink) AddSpans added in v1.1.5

func (h *HTTPSink) AddSpans(ctx context.Context, traces []*trace.Span) (err error)

AddSpans forwards the traces to SignalFx.

type HashableCollector

type HashableCollector struct {
	Callback func() []*datapoint.Datapoint
}

HashableCollector is a Collector function that can be inserted into a hashmap. You can use it to wrap a functional callback and insert it into a Scheduler.
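The reason for the wrapper is that Go function values are not comparable and so cannot be map keys, while a pointer to a struct can. The sketch below shows the idea with local stand-ins; `point`, `funcCollector`, and `registry` are hypothetical names, and the map is only an assumed model of how a set of callbacks might be stored.

```go
package main

import "fmt"

// point is a simplified stand-in for a datapoint.
type point struct {
	metric string
	value  int64
}

// funcCollector wraps a callback in a struct. A *funcCollector is comparable
// and can be a map key, unlike a bare func value.
type funcCollector struct {
	callback func() []point
}

func (f *funcCollector) datapoints() []point { return f.callback() }

// registry is a hypothetical set of registered collectors.
type registry map[*funcCollector]struct{}

func main() {
	reg := registry{}
	c := &funcCollector{callback: func() []point {
		return []point{{metric: "queue.size", value: 3}}
	}}

	reg[c] = struct{}{} // register
	for collector := range reg {
		fmt.Println(collector.datapoints())
	}

	delete(reg, c) // the same pointer removes it again
	fmt.Println(len(reg))
}
```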

func CollectorFunc

func CollectorFunc(callback func() []*datapoint.Datapoint) *HashableCollector

CollectorFunc wraps a function to make it a Collector.

func (*HashableCollector) Datapoints

func (h *HashableCollector) Datapoints() []*datapoint.Datapoint

Datapoints calls the wrapped function.

type MultiCollector

type MultiCollector []Collector

MultiCollector acts like a datapoint collector over multiple collectors.

func (MultiCollector) Datapoints

func (m MultiCollector) Datapoints() []*datapoint.Datapoint

Datapoints returns the datapoints from every collector.

type Result

type Result struct {
	Count        int64
	Sum          int64
	SumOfSquares float64
}

Result is a cumulated result of items that can be added to a CumulativeBucket at once

func (*Result) Add

func (r *Result) Add(val int64)

Add a single number to the bucket. This does not use atomic operations and is not thread safe, but adding a finished Result into a CumulativeBucket is thread safe.

type RollingBucket

type RollingBucket struct {
	// MetricName is the metric name used when the RollingBucket is reported to SignalFx
	MetricName string
	// Dimensions are the dimensions used when the RollingBucket is reported to SignalFx
	Dimensions map[string]string
	// Quantiles are an array of values [0 - 1.0] that are the histogram quantiles reported to
	// SignalFx during a Datapoints() call.  For example, [.5] would only report the median.
	Quantiles []float64
	// BucketWidth is how long in time a bucket accumulates values before a flush is forced
	BucketWidth time.Duration
	// Hist is an efficient tracker of numeric values for a histogram
	Hist *gohistogram.NumericHistogram
	// MaxFlushBufferSize is the maximum size of a window to keep for the RollingBucket before
	// quantiles are dropped.  It is ideally close to len(quantiles) * 3 + 15
	MaxFlushBufferSize int
	// Timer is used to track time.Now() during default value add calls
	Timer timekeeper.TimeKeeper
	// contains filtered or unexported fields
}

RollingBucket keeps histogram style metrics over a BucketWidth window of time. It allows users to collect and report percentile metrics like median or p99, as well as min/max/sum/count and sum of squares from a set of points.
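To show what reporting a quantile such as the median or p99 means for one window of values, here is a small exact calculation using the nearest-rank method. This is not how RollingBucket computes quantiles, since it relies on a streaming histogram; `quantile` is a hypothetical helper and the sample window is made up.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// quantile returns the nearest-rank q-quantile (0 < q <= 1) of values that
// are already sorted in ascending order. It returns NaN for an empty slice.
func quantile(sorted []float64, q float64) float64 {
	if len(sorted) == 0 {
		return math.NaN()
	}
	rank := int(math.Ceil(q * float64(len(sorted))))
	if rank < 1 {
		rank = 1
	}
	if rank > len(sorted) {
		rank = len(sorted)
	}
	return sorted[rank-1]
}

func main() {
	// A made-up window of observed request times.
	window := []float64{7, 1, 9, 3, 5, 10, 2, 8, 4, 6}
	sort.Float64s(window)
	// The same quantiles as DefaultQuantiles.
	for _, q := range []float64{.25, .5, .9, .99} {
		fmt.Printf("q=%g value=%g\n", q, quantile(window, q))
	}
}
```

An exact calculation like this needs every value in memory, which is the cost a bounded histogram avoids.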

func NewRollingBucket

func NewRollingBucket(metricName string, dimensions map[string]string) *RollingBucket

NewRollingBucket creates a new RollingBucket using default values for Quantiles, BucketWidth, and the histogram tracker.

func (*RollingBucket) Add

func (r *RollingBucket) Add(v float64)

Add a value to the rolling bucket histogram. If the current time is already calculated, it may be more efficient to call AddAt in order to save another time.Now() call.

func (*RollingBucket) AddAt

func (r *RollingBucket) AddAt(v float64, t time.Time)

AddAt is like Add but also takes a time to pretend the value comes at.

func (*RollingBucket) Datapoints

func (r *RollingBucket) Datapoints() []*datapoint.Datapoint

Datapoints returns basic bucket stats every time it is called, but returns a window's histogram values only on the first call for that window. Because histogram window values are returned only once, it is suggested to always forward the datapoints returned by this call to SignalFx.

type SFXAPIError added in v1.1.0

type SFXAPIError struct {
	StatusCode   int
	ResponseBody string
}

SFXAPIError is returned when the API returns a status code other than 200.

func (SFXAPIError) Error added in v1.1.0

func (se SFXAPIError) Error() string

type Scheduler

type Scheduler struct {
	Sink         Sink
	Timer        timekeeper.TimeKeeper
	SendZeroTime bool

	ErrorHandler       func(error) error
	ReportingDelayNs   int64
	ReportingTimeoutNs int64

	Prefix string
	// contains filtered or unexported fields
}

A Scheduler reports metrics to SignalFx at a regular interval.

Example
s := NewScheduler()
s.Sink.(*HTTPSink).AuthToken = "ABCD-XYZ"
s.AddCallback(GoMetricsSource)
bucket := NewRollingBucket("req.time", map[string]string{"env": "test"})
s.AddCallback(bucket)
bucket.Add(1.2)
bucket.Add(3)
ctx := context.Background()
err := s.Schedule(ctx)
fmt.Println("Schedule result: ", err)
Output:

func NewScheduler

func NewScheduler() *Scheduler

NewScheduler creates a default SignalFx scheduler that can report metrics to SignalFx at some interval.

func (*Scheduler) AddCallback

func (s *Scheduler) AddCallback(db Collector)

AddCallback adds a collector to the default group.

func (*Scheduler) AddGroupedCallback

func (s *Scheduler) AddGroupedCallback(group string, db Collector)

AddGroupedCallback adds a collector to a specific group.

func (*Scheduler) CollectDatapoints

func (s *Scheduler) CollectDatapoints() []*datapoint.Datapoint

CollectDatapoints gives a scheduler an external entry point for collecting its datapoints on demand. It is thread safe.

func (*Scheduler) Debug

func (s *Scheduler) Debug(debug bool)

Debug is used for debugging collectDatapoints().

func (*Scheduler) DefaultDimensions

func (s *Scheduler) DefaultDimensions(dims map[string]string)

DefaultDimensions adds a dimension map that is appended to all metrics in the default group.

func (*Scheduler) GroupedDefaultDimensions

func (s *Scheduler) GroupedDefaultDimensions(group string, dims map[string]string)

GroupedDefaultDimensions adds default dimensions to a specific group.

func (*Scheduler) RemoveCallback

func (s *Scheduler) RemoveCallback(db Collector)

RemoveCallback removes a collector from the default group.

func (*Scheduler) RemoveGroupedCallback

func (s *Scheduler) RemoveGroupedCallback(group string, db Collector)

RemoveGroupedCallback removes a collector from a specific group.

func (*Scheduler) ReportOnce

func (s *Scheduler) ReportOnce(ctx context.Context) error

ReportOnce will report any metrics saved in this reporter to SignalFx.

func (*Scheduler) ReportingDelay

func (s *Scheduler) ReportingDelay(delay time.Duration)

ReportingDelay sets the interval at which metrics are reported to SignalFx.

func (*Scheduler) ReportingTimeout

func (s *Scheduler) ReportingTimeout(timeout time.Duration)

ReportingTimeout sets the timeout for reporting to SignalFx. A report that has not completed within this duration times out.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context) error

Schedule will run until either the ErrorHandler returns an error or the context is canceled. This is intended to be run inside a goroutine.

func (*Scheduler) Var

func (s *Scheduler) Var() expvar.Var

Var returns an expvar variable that prints the values of the previously reported datapoints.

type Sink

type Sink interface {
	AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) (err error)
}

Sink is anything that can receive points collected by a Scheduler. This can be useful for stubbing out your collector to test the points that will be sent to SignalFx.

type TimeCounter

type TimeCounter struct {
	NsBarrier int64
	Above     int64
	Below     int64
}

TimeCounter counts how many durations fall above or below a barrier value. NsBarrier is the barrier, expressed in nanoseconds.

func (*TimeCounter) Add

func (t *TimeCounter) Add(dur time.Duration)

Add includes a datapoint in the source, incrementing Above or Below according to the barrier.

func (*TimeCounter) Collector

func (t *TimeCounter) Collector(metric string) Collector

Collector returns a datapoint collector for this time counter that uses the given metric name.

type WithDimensions

type WithDimensions struct {
	Dimensions map[string]string
	Collector  Collector
}

WithDimensions adds dimensions on top of the datapoints of a collector. This can be used to take an existing Collector and include extra dimensions.

Example
sched := NewScheduler()
sched.AddCallback(&WithDimensions{
	Collector: GoMetricsSource,
	Dimensions: map[string]string{
		"extra": "dimension",
	},
})
Output:

func (*WithDimensions) Datapoints

func (w *WithDimensions) Datapoints() []*datapoint.Datapoint

Datapoints calls Datapoints on the wrapped Collector and adds Dimensions to the results.
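
The wrapping idea can be sketched without the library. The point, collector, collectorFunc, and withDims names below are hypothetical stand-ins for the datapoint type, Collector, and WithDimensions. The sketch assumes that an extra dimension overrides a dimension of the same name already on the point; this documentation does not say how such a conflict is resolved.

```go
package main

import "fmt"

// point is a hypothetical stand-in for *datapoint.Datapoint.
type point struct {
	Metric     string
	Dimensions map[string]string
}

// collector mirrors the shape of a Collector using the stand-in type.
type collector interface {
	Datapoints() []*point
}

// collectorFunc adapts a plain function to the collector interface.
type collectorFunc func() []*point

func (f collectorFunc) Datapoints() []*point { return f() }

// withDims is a hypothetical stand-in for WithDimensions.
type withDims struct {
	Dimensions map[string]string
	Collector  collector
}

// Datapoints collects from the wrapped collector and merges the extra
// dimensions into each point. On a key conflict the extra dimension wins,
// which is an assumption of this sketch.
func (w *withDims) Datapoints() []*point {
	pts := w.Collector.Datapoints()
	for _, p := range pts {
		merged := make(map[string]string, len(p.Dimensions)+len(w.Dimensions))
		for k, v := range p.Dimensions {
			merged[k] = v
		}
		for k, v := range w.Dimensions {
			merged[k] = v
		}
		p.Dimensions = merged
	}
	return pts
}

func main() {
	base := collectorFunc(func() []*point {
		return []*point{{Metric: "req.count", Dimensions: map[string]string{"host": "a"}}}
	})
	w := &withDims{Dimensions: map[string]string{"extra": "dimension"}, Collector: base}
	for _, p := range w.Datapoints() {
		fmt.Println(p.Metric, p.Dimensions)
	}
}
```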
