Documentation ¶
Index ¶
- Constants
- Variables
- type AggregateFn
- type DynamicAggregateFn
- type Event
- type EventDB
- type EventMetrics
- func (m *EventMetrics) AggregateMetric(stream string, tags map[string]string, period time.Duration, agg AggregateFn) error
- func (m *EventMetrics) Close() error
- func (m *EventMetrics) ComputeStatsMetric(stream string, tags map[string]string, period time.Duration, ...) error
- func (m *EventMetrics) DynamicMetric(stream string, tags map[string]string, period time.Duration, ...) error
- func (m *EventMetrics) GetEventStream(name string) *EventStream
- func (m *EventMetrics) LogMetrics()
- func (m *EventMetrics) Start(ctx context.Context)
- func (m *EventMetrics) UpdateMetrics() error
- type EventStream
- func (s *EventStream) AggregateMetric(tags map[string]string, period time.Duration, agg AggregateFn) error
- func (s *EventStream) Append(data []byte) error
- func (s *EventStream) ComputeStatsMetric(tags map[string]string, period time.Duration, obs ObservationsFn) error
- func (s *EventStream) DynamicMetric(tags map[string]string, period time.Duration, agg DynamicAggregateFn) error
- func (s *EventStream) Insert(e *Event) error
- func (s *EventStream) Range(start, end time.Time) ([]*Event, error)
- type ObservationsFn
Constants ¶
const (
	// We use a single BigTable table to store all event metrics.
	BT_TABLE = "metrics-eventdb"

	// We use a single BigTable column family.
	BT_COLUMN_FAMILY = "EVTS"

	// We use a single BigTable column which stores gob-encoded Events.
	BT_COLUMN = "EVT"

	INSERT_TIMEOUT = 30 * time.Second
	QUERY_TIMEOUT  = 3 * 60 * time.Second
)
Variables ¶
var (
	// Fully-qualified BigTable column name.
	BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN)
)
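As a quick check on how the fully-qualified column name is assembled, the following standalone sketch repeats the constant values documented above. The lowercase names and the fullColumn helper are local stand-ins introduced for illustration, not identifiers from this package.

```go
package main

import "fmt"

// Local copies of the documented values (stand-ins, not the package's
// exported identifiers).
const (
	btColumnFamily = "EVTS"
	btColumn       = "EVT"
)

// fullColumn joins a column family and a column qualifier in the same
// "family:column" form used to build BT_COLUMN_FULL.
func fullColumn(family, column string) string {
	return fmt.Sprintf("%s:%s", family, column)
}

func main() {
	fmt.Println(fullColumn(btColumnFamily, btColumn)) // prints "EVTS:EVT"
}
```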
var (
	// Callers may not use these tag keys.
	RESERVED_TAGS = []string{tagPeriod, tagStream}
)
Functions ¶
This section is empty.
Types ¶
type AggregateFn ¶
AggregateFn is a function which reduces a number of Events into a single data point.
type DynamicAggregateFn ¶
DynamicAggregateFn is a function which reduces a number of Events into several data points, each with its own set of tags. Each tag set comprises its own metric, and the aggregation function may return different tag sets at each iteration, e.g. due to events with different properties occurring within each time period.
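To make the idea of per-period tag sets concrete, here is a minimal standalone sketch. The event type, the "status" tag key, and the return shape (parallel slices of tag sets and values) are assumptions made for illustration; the actual definition of DynamicAggregateFn is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a hypothetical stand-in for the package's Event type; only
// the field used by this sketch is modeled.
type event struct {
	Status string
}

// countByStatus returns one tag set and one value per distinct status
// seen in the input, so the set of resulting metrics can differ from
// one time period to the next.
func countByStatus(evs []*event) ([]map[string]string, []float64) {
	counts := map[string]float64{}
	for _, e := range evs {
		counts[e.Status]++
	}
	// Sort the keys so the output order is deterministic.
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	tags := make([]map[string]string, 0, len(keys))
	vals := make([]float64, 0, len(keys))
	for _, k := range keys {
		tags = append(tags, map[string]string{"status": k})
		vals = append(vals, counts[k])
	}
	return tags, vals
}

func main() {
	period1 := []*event{{Status: "ok"}, {Status: "ok"}, {Status: "fail"}}
	period2 := []*event{{Status: "ok"}}
	for _, evs := range [][]*event{period1, period2} {
		tags, vals := countByStatus(evs)
		for i := range tags {
			fmt.Println(tags[i], vals[i])
		}
	}
}
```

The first period yields two data points (one for "fail", one for "ok") while the second yields only one, which is the situation in which a gauge would need to be added or removed.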
type EventDB ¶
type EventDB interface {
	// Append inserts an Event with the given data into the given stream at
	// the current time.
	Append(string, []byte) error

	// Close frees up resources used by the eventDB.
	Close() error

	// Insert inserts the given Event into DB.
	Insert(*Event) error

	// Range returns all Events in the given range from the given stream.
	// The beginning of the range is inclusive, while the end is exclusive.
	Range(string, time.Time, time.Time) ([]*Event, error)
}
EventDB is an interface used for storing and retrieving Events. NewBTEventDB returns an implementation backed by BigTable.
func NewBTEventDB ¶
func NewBTEventDB(ctx context.Context, btProject, btInstance string, ts oauth2.TokenSource) (EventDB, error)
NewBTEventDB returns an EventDB which is backed by BigTable.
type EventMetrics ¶
type EventMetrics struct {
// contains filtered or unexported fields
}
EventMetrics is a struct used for creating aggregate metrics on streams of events.
func NewEventMetrics ¶
func NewEventMetrics(db EventDB, measurement string) (*EventMetrics, error)
NewEventMetrics returns an EventMetrics instance.
func (*EventMetrics) AggregateMetric ¶
func (m *EventMetrics) AggregateMetric(stream string, tags map[string]string, period time.Duration, agg AggregateFn) error
AggregateMetric sets the given aggregation function on the event stream and adds a gauge for it. For example, to compute the sum of all int64 events over a 24-hour period:
m.AggregateMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) (float64, error) {
	sum := int64(0)
	for _, e := range ev {
		sum += decodeInt64(e)
	}
	return float64(sum), nil
})
func (*EventMetrics) ComputeStatsMetric ¶
func (m *EventMetrics) ComputeStatsMetric(stream string, tags map[string]string, period time.Duration, obs ObservationsFn) error
ComputeStatsMetric sets the given observation function on the event stream. Gauges will be added for various aggregation types on the observations generated by the function, e.g. mean, standard deviation, and quantiles. For example, to compute statistics for the duration of all events over a 24-hour period:
m.ComputeStatsMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) ([]float64, error) {
	vals := make([]float64, 0, len(ev))
	for _, e := range ev {
		data := decodeEvent(e)
		vals = append(vals, float64(data.End.Sub(data.Start)))
	}
	return vals, nil
})
func (*EventMetrics) DynamicMetric ¶
func (m *EventMetrics) DynamicMetric(stream string, tags map[string]string, period time.Duration, agg DynamicAggregateFn) error
DynamicMetric sets the given aggregation function on the event stream. Gauges will be added and removed dynamically based on the results of the aggregation function. Here's a toy example:
m.DynamicMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) (map[string]float64, error) {
	counts := map[string]int64{}
	for _, e := range ev {
		counts[fmt.Sprintf("%d", decodeInt64(e))]++
	}
	rv := make(map[string]float64, len(counts))
	for k, v := range counts {
		rv[k] = float64(v)
	}
	return rv, nil
})
func (*EventMetrics) GetEventStream ¶
func (m *EventMetrics) GetEventStream(name string) *EventStream
GetEventStream returns an EventStream instance.
func (*EventMetrics) LogMetrics ¶
func (m *EventMetrics) LogMetrics()
LogMetrics logs the current values for all metrics.
func (*EventMetrics) Start ¶
func (m *EventMetrics) Start(ctx context.Context)
Start initiates the EventMetrics goroutines.
func (*EventMetrics) UpdateMetrics ¶
func (m *EventMetrics) UpdateMetrics() error
UpdateMetrics recalculates values for all metrics.
type EventStream ¶
type EventStream struct {
// contains filtered or unexported fields
}
EventStream is a struct which deals with a single stream of related events.
func (*EventStream) AggregateMetric ¶
func (s *EventStream) AggregateMetric(tags map[string]string, period time.Duration, agg AggregateFn) error
AggregateMetric sets the given aggregation function on the event stream and adds a gauge for it. For example, to compute the sum of all int64 events over a 24-hour period:
s.AggregateMetric(myTags, 24*time.Hour, func(ev []*Event) (float64, error) {
	sum := int64(0)
	for _, e := range ev {
		sum += decodeInt64(e)
	}
	return float64(sum), nil
})
func (*EventStream) Append ¶
func (s *EventStream) Append(data []byte) error
Append adds the given data to the stream at the current time.
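The examples in this documentation call a decodeInt64 helper that is not defined here. One way such a payload could be produced for Append and read back is sketched below. Both function names and the 8-byte big-endian layout are assumptions chosen for illustration, and this decodeInt64 takes the raw payload bytes rather than an Event.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeInt64 packs v into 8 big-endian bytes, suitable for passing as
// the data argument of an Append-style call.
func encodeInt64(v int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(v))
	return b
}

// decodeInt64 reverses encodeInt64 and rejects payloads of the wrong size.
func decodeInt64(b []byte) (int64, error) {
	if len(b) != 8 {
		return 0, errors.New("payload must be exactly 8 bytes")
	}
	return int64(binary.BigEndian.Uint64(b)), nil
}

func main() {
	payload := encodeInt64(-42)
	v, err := decodeInt64(payload)
	fmt.Println(v, err) // prints "-42 <nil>"
}
```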
func (*EventStream) ComputeStatsMetric ¶
func (s *EventStream) ComputeStatsMetric(tags map[string]string, period time.Duration, obs ObservationsFn) error
ComputeStatsMetric sets the given observation function on the event stream. Gauges will be added for various aggregation types on the observations generated by the function, e.g. mean, standard deviation, and quantiles. For example, to compute statistics for the duration of all events over a 24-hour period:
s.ComputeStatsMetric(myTags, 24*time.Hour, func(ev []*Event) ([]float64, error) {
	vals := make([]float64, 0, len(ev))
	for _, e := range ev {
		data := decodeEvent(e)
		vals = append(vals, float64(data.End.Sub(data.Start)))
	}
	return vals, nil
})
func (*EventStream) DynamicMetric ¶
func (s *EventStream) DynamicMetric(tags map[string]string, period time.Duration, agg DynamicAggregateFn) error
DynamicMetric sets the given aggregation function on the event stream. Gauges will be added and removed dynamically based on the results of the aggregation function. Here's a toy example:
s.DynamicMetric(myTags, 24*time.Hour, func(ev []*Event) (map[string]float64, error) {
	counts := map[int64]int64{}
	for _, e := range ev {
		counts[decodeInt64(e)]++
	}
	rv := make(map[string]float64, len(counts))
	for k, v := range counts {
		rv[fmt.Sprintf("%d", k)] = float64(v)
	}
	return rv, nil
})
func (*EventStream) Insert ¶
func (s *EventStream) Insert(e *Event) error
Insert inserts the Event into the stream. Overrides any Event at the given timestamp.
type ObservationsFn ¶
ObservationsFn is a function which derives a number of float64 observations from a set of Events. The number of observations does not need to equal the number of Events.