Documentation ¶
Index ¶
- Constants
- Variables
- type AggregateFn
- type DynamicAggregateFn
- type Event
- type EventDB
- type EventMetrics
- func (m *EventMetrics) AggregateMetric(stream string, tags map[string]string, period time.Duration, agg AggregateFn) error
- func (m *EventMetrics) Close() error
- func (m *EventMetrics) DynamicMetric(stream string, tags map[string]string, period time.Duration, ...) error
- func (m *EventMetrics) GetEventStream(name string) *EventStream
- func (m *EventMetrics) LogMetrics()
- func (m *EventMetrics) Start(ctx context.Context)
- func (m *EventMetrics) UpdateMetrics() error
- type EventStream
- func (s *EventStream) AggregateMetric(tags map[string]string, period time.Duration, agg AggregateFn) error
- func (s *EventStream) Append(data []byte) error
- func (s *EventStream) DynamicMetric(tags map[string]string, period time.Duration, agg DynamicAggregateFn) error
- func (s *EventStream) Insert(e *Event) error
- func (s *EventStream) Range(start, end time.Time) ([]*Event, error)
Constants ¶
const (
	// We use a single BigTable table to store all event metrics.
	BT_TABLE = "metrics-eventdb"
	// We use a single BigTable column family.
	BT_COLUMN_FAMILY = "EVTS"
	// We use a single BigTable column which stores gob-encoded Events.
	BT_COLUMN = "EVT"

	INSERT_TIMEOUT = 30 * time.Second
	QUERY_TIMEOUT  = 3 * 60 * time.Second
)
const (
	// Tag key indicating the time period over which a metric is calculated.
	TAG_PERIOD = "period"
	// Tag key indicating which event stream a metric is calculated from.
	TAG_STREAM = "stream"
)
Variables ¶
var (
	// Fully-qualified BigTable column name.
	BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN)
)
var (
	// Callers may not use these tag keys.
	RESERVED_TAGS = []string{TAG_PERIOD, TAG_STREAM}
)
Functions ¶
This section is empty.
Types ¶
type AggregateFn ¶
AggregateFn is a function which reduces a number of Events into a single data point.
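The declaration itself is not reproduced above; judging from the AggregateMetric examples later in this documentation, an AggregateFn takes the slice of Events recorded during the aggregation period and returns a single float64. A minimal sketch under that assumption, defining a reusable aggregation which simply counts events:

// countEvents is a sketch of an AggregateFn which reports how many events
// occurred during the period. The signature is inferred from the
// AggregateMetric examples below, not from a declaration shown here.
var countEvents = func(ev []*Event) (float64, error) {
	return float64(len(ev)), nil
}

Because it is an ordinary function value, the same aggregation can be defined once and passed to AggregateMetric for several streams and periods.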
type DynamicAggregateFn ¶
DynamicAggregateFn is a function which reduces a number of Events into several data points, each with its own set of tags. Each tag set comprises its own metric, and the aggregation function may return different tag sets at each iteration, eg. due to events with different properties occurring within each time period.
type EventDB ¶
type EventDB interface {
	// Append inserts an Event with the given data into the given stream at
	// the current time.
	Append(string, []byte) error

	// Close frees up resources used by the eventDB.
	Close() error

	// Insert inserts the given Event into the DB.
	Insert(*Event) error

	// Range returns all Events in the given range from the given stream.
	// The beginning of the range is inclusive, while the end is exclusive.
	Range(string, time.Time, time.Time) ([]*Event, error)
}
EventDB is an interface used for storing Events in a database; the implementation provided here is backed by BigTable.
func NewBTEventDB ¶
func NewBTEventDB(ctx context.Context, btProject, btInstance string, ts oauth2.TokenSource) (EventDB, error)
NewBTEventDB returns an EventDB which is backed by BigTable.
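A sketch of constructing the BigTable-backed EventDB, assuming the golang.org/x/oauth2/google package is used to obtain the token source via Application Default Credentials. The project and instance names are placeholders, the cloud-platform scope is a broad stand-in for a narrower BigTable scope, and errors are simply passed to log.Fatal:

ctx := context.Background()

// Application Default Credentials for the token source.
ts, err := google.DefaultTokenSource(ctx, "https://www.googleapis.com/auth/cloud-platform")
if err != nil {
	log.Fatal(err)
}

// "my-project" and "my-bigtable-instance" are placeholder names.
db, err := NewBTEventDB(ctx, "my-project", "my-bigtable-instance", ts)
if err != nil {
	log.Fatal(err)
}
defer db.Close()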
type EventMetrics ¶
type EventMetrics struct {
// contains filtered or unexported fields
}
EventMetrics is a struct used for creating aggregate metrics on streams of events.
func NewEventMetrics ¶
func NewEventMetrics(db EventDB, measurement string) (*EventMetrics, error)
NewEventMetrics returns an EventMetrics instance.
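A sketch of wiring an EventDB into an EventMetrics instance and starting its background goroutines, reusing the db, ctx, and countEvents placeholders from the sketches above; "my_measurement", "my-stream", and myTags are likewise placeholders:

m, err := NewEventMetrics(db, "my_measurement")
if err != nil {
	log.Fatal(err)
}

// Register an aggregate metric (see AggregateMetric below), then start the
// background goroutines which keep the gauges up to date.
if err := m.AggregateMetric("my-stream", myTags, 24*time.Hour, countEvents); err != nil {
	log.Fatal(err)
}
m.Start(ctx)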
func (*EventMetrics) AggregateMetric ¶
func (m *EventMetrics) AggregateMetric(stream string, tags map[string]string, period time.Duration, agg AggregateFn) error
AggregateMetric sets the given aggregation function on the event stream and adds a gauge for it. For example, to compute the sum of all int64 events over a 24-hour period:
s.AggregateMetric("my-stream", myTags, 24*time.Hour, func(ev []Event) (float64, error) { sum := int64(0) for _, e := range ev { sum += decodeInt64(e) } return float64(sum), nil })
func (*EventMetrics) DynamicMetric ¶
func (m *EventMetrics) DynamicMetric(stream string, tags map[string]string, period time.Duration, agg DynamicAggregateFn) error
DynamicMetric sets the given aggregation function on the event stream. Gauges will be added and removed dynamically based on the results of the aggregation function. Here's a toy example:
s.DynamicMetric("my-stream", myTags, 24*time.Hour, func(ev []Event) (map[string]float64, error) { counts := map[string]int64{} for _, e := range ev { counts[fmt.Sprintf("%d", decodeInt64(e))]++ } rv := make(map[string]float64, len(counts)) for k, v := range counts { rv[k] = float64(v) } return rv })
func (*EventMetrics) GetEventStream ¶
func (m *EventMetrics) GetEventStream(name string) *EventStream
GetEventStream returns an EventStream instance.
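A sketch of working with a single stream: fetch it once by name, then configure metrics or append data through it rather than going through EventMetrics each time. The stream name and tags are placeholders, and countEvents is the sketch AggregateFn from earlier:

s := m.GetEventStream("my-stream")

// Per-stream equivalent of EventMetrics.AggregateMetric.
if err := s.AggregateMetric(myTags, time.Hour, countEvents); err != nil {
	log.Fatal(err)
}

// Record a raw event payload at the current time.
if err := s.Append([]byte("some-event-data")); err != nil {
	log.Fatal(err)
}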
func (*EventMetrics) LogMetrics ¶
func (m *EventMetrics) LogMetrics()
LogMetrics logs the current values for all metrics.
func (*EventMetrics) Start ¶
func (m *EventMetrics) Start(ctx context.Context)
Start initiates the EventMetrics goroutines.
func (*EventMetrics) UpdateMetrics ¶
func (m *EventMetrics) UpdateMetrics() error
UpdateMetrics recalculates values for all metrics.
type EventStream ¶
type EventStream struct {
// contains filtered or unexported fields
}
EventStream is a struct which deals with a single stream of related events.
func (*EventStream) AggregateMetric ¶
func (s *EventStream) AggregateMetric(tags map[string]string, period time.Duration, agg AggregateFn) error
AggregateMetric sets the given aggregation function on the event stream and adds a gauge for it. For example, to compute the sum of all int64 events over a 24-hour period:
s.AggregateMetric(myTags, 24*time.Hour, func(ev []*Event) (float64, error) {
	sum := int64(0)
	for _, e := range ev {
		sum += decodeInt64(e)
	}
	return float64(sum), nil
})
func (*EventStream) Append ¶
func (s *EventStream) Append(data []byte) error
Append adds the given data to the stream at the current time.
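The payload is an opaque []byte: the caller chooses the encoding, and the aggregation functions decode it again (the decodeInt64 helper in the examples above is assumed to reverse whatever encoding is used here). A sketch which appends an int64 encoded with encoding/binary:

// Encode an int64 counter value; decodeInt64 in the metric examples is
// assumed to perform the matching binary.BigEndian decode.
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(42))
if err := s.Append(buf); err != nil {
	log.Fatal(err)
}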
func (*EventStream) DynamicMetric ¶
func (s *EventStream) DynamicMetric(tags map[string]string, period time.Duration, agg DynamicAggregateFn) error
DynamicMetric sets the given aggregation function on the event stream. Gauges will be added and removed dynamically based on the results of the aggregation function. Here's a toy example:
s.DynamicMetric(myTags, 24*time.Hour, func(ev []Event) (map[string]float64, error) {
	counts := map[int64]int64{}
	for _, e := range ev {
		counts[decodeInt64(e)]++
	}
	rv := make(map[string]float64, len(counts))
	for k, v := range counts {
		rv[fmt.Sprintf("%d", k)] = float64(v)
	}
	return rv, nil
})
func (*EventStream) Insert ¶
func (s *EventStream) Insert(e *Event) error
Insert inserts the Event into the stream, overwriting any existing Event at the same timestamp.