events

package
v0.0.0-...-33de393 Latest
Warning: This package is not in the latest version of its module.
Published: May 9, 2021 License: BSD-3-Clause Imports: 12 Imported by: 3

Documentation

Constants

const (

	// We use a single BigTable table to store all event metrics.
	BT_TABLE = "metrics-eventdb"

	// We use a single BigTable column family.
	BT_COLUMN_FAMILY = "EVTS"

	// We use a single BigTable column which stores gob-encoded Events.
	BT_COLUMN = "EVT"

	INSERT_TIMEOUT = 30 * time.Second
	QUERY_TIMEOUT  = 3 * 60 * time.Second
)
const (
	// Tag key indicating the time period over which a metric is calculated.
	TAG_PERIOD = "period"

	// Tag key indicating which event stream a metric is calculated from.
	TAG_STREAM = "stream"
)

Variables

var (
	// Fully-qualified BigTable column name.
	BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN)
)
var (
	// Callers may not use these tag keys.
	RESERVED_TAGS = []string{TAG_PERIOD, TAG_STREAM}
)
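
Since RESERVED_TAGS lists keys that callers may not use, code that builds a tag map can check it before registering a metric. The following is a minimal sketch of such a check. The constants are copied locally so the snippet compiles on its own, and checkTags is a hypothetical helper, not a function exported by this package.

```go
package main

import "fmt"

// Local copies of the package's tag keys so this sketch compiles standalone.
const (
	tagPeriod = "period"
	tagStream = "stream"
)

// reservedTags mirrors RESERVED_TAGS.
var reservedTags = []string{tagPeriod, tagStream}

// checkTags is a hypothetical helper: it returns an error if the caller's
// tag map uses any reserved key.
func checkTags(tags map[string]string) error {
	for _, k := range reservedTags {
		if _, ok := tags[k]; ok {
			return fmt.Errorf("tag key %q is reserved", k)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkTags(map[string]string{"bot": "linux"}))  // <nil>
	fmt.Println(checkTags(map[string]string{"period": "24h"})) // tag key "period" is reserved
}
```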

Functions

This section is empty.

Types

type AggregateFn

type AggregateFn func([]*Event) (float64, error)

AggregateFn is a function which reduces a number of Events into a single data point.
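
As an illustration, the sketch below defines one possible AggregateFn. Event and AggregateFn are redeclared locally so the snippet compiles without this module, and meanPayloadSize is a hypothetical aggregate chosen only because it needs no payload decoding.

```go
package main

import (
	"fmt"
	"time"
)

// Event mirrors the package's Event struct so the sketch compiles standalone.
type Event struct {
	Stream    string
	Timestamp time.Time
	Data      []byte
}

// AggregateFn mirrors the package's type of the same name.
type AggregateFn func([]*Event) (float64, error)

// meanPayloadSize is a hypothetical aggregate: the mean size, in bytes, of
// the Data field across all events in the period. An empty period yields 0.
func meanPayloadSize(ev []*Event) (float64, error) {
	if len(ev) == 0 {
		return 0, nil
	}
	total := 0
	for _, e := range ev {
		total += len(e.Data)
	}
	return float64(total) / float64(len(ev)), nil
}

func main() {
	var agg AggregateFn = meanPayloadSize
	v, err := agg([]*Event{
		{Stream: "my-stream", Data: []byte("ab")},
		{Stream: "my-stream", Data: []byte("abcd")},
	})
	fmt.Println(v, err) // 3 <nil>
}
```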

type DynamicAggregateFn

type DynamicAggregateFn func([]*Event) ([]map[string]string, []float64, error)

DynamicAggregateFn is a function which reduces a number of Events into several data points, each with its own set of tags. Each tag set identifies its own metric, and the function may return different tag sets on each iteration, e.g. because events with different properties occur within each time period.
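
A rough sketch of a function with this signature follows. It assumes that the i-th tag set pairs with the i-th value, which the signature suggests but this page does not state. Event and DynamicAggregateFn are redeclared locally, and countByPayload and the "payload" tag key are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Event mirrors the package's Event struct so the sketch compiles standalone.
type Event struct {
	Stream    string
	Timestamp time.Time
	Data      []byte
}

// DynamicAggregateFn mirrors the package's type of the same name.
type DynamicAggregateFn func([]*Event) ([]map[string]string, []float64, error)

// countByPayload is a hypothetical dynamic aggregate: it returns one data
// point per distinct payload, tagged with that payload, whose value is the
// number of events carrying it.
func countByPayload(ev []*Event) ([]map[string]string, []float64, error) {
	counts := map[string]int{}
	for _, e := range ev {
		counts[string(e.Data)]++
	}
	// Sort the keys so the output order is deterministic.
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	tags := make([]map[string]string, 0, len(keys))
	vals := make([]float64, 0, len(keys))
	for _, k := range keys {
		tags = append(tags, map[string]string{"payload": k})
		vals = append(vals, float64(counts[k]))
	}
	return tags, vals, nil
}

func main() {
	var agg DynamicAggregateFn = countByPayload
	tags, vals, err := agg([]*Event{
		{Data: []byte("ok")}, {Data: []byte("fail")}, {Data: []byte("ok")},
	})
	fmt.Println(tags, vals, err) // [map[payload:fail] map[payload:ok]] [1 2] <nil>
}
```

If a payload value stops appearing in a later period, its tag set is simply absent from the result, which matches the description of tag sets varying between iterations.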

type Event

type Event struct {
	Stream    string
	Timestamp time.Time
	Data      []byte
}

Event contains some metadata plus whatever data the user chooses.
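
Because Data is an opaque byte slice, callers choose their own payload encoding. The examples on this page call a decodeInt64 helper that is not defined here; the sketch below shows one way such a helper could look. The 8-byte big-endian layout is an assumption of this sketch, Event is redeclared locally, and unlike the inline examples this decodeInt64 also returns an error.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"time"
)

// Event mirrors the package's Event struct so the sketch compiles standalone.
type Event struct {
	Stream    string
	Timestamp time.Time
	Data      []byte
}

// encodeInt64 packs v into 8 big-endian bytes. The encoding is an assumption
// of this sketch; the package does not prescribe a payload format.
func encodeInt64(v int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(v))
	return b
}

// decodeInt64 reverses encodeInt64 and rejects payloads of the wrong size.
func decodeInt64(e *Event) (int64, error) {
	if len(e.Data) != 8 {
		return 0, errors.New("payload is not 8 bytes")
	}
	return int64(binary.BigEndian.Uint64(e.Data)), nil
}

func main() {
	e := &Event{Stream: "my-stream", Timestamp: time.Now(), Data: encodeInt64(-42)}
	v, err := decodeInt64(e)
	fmt.Println(v, err) // -42 <nil>
}
```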

type EventDB

type EventDB interface {
	// Append inserts an Event with the given data into the given stream at
	// the current time.
	Append(string, []byte) error
	// Close frees up resources used by the eventDB.
	Close() error
	// Insert inserts the given Event into DB.
	Insert(*Event) error
	// Range returns all Events in the given range from the given stream.
	// The beginning of the range is inclusive, while the end is exclusive.
	Range(string, time.Time, time.Time) ([]*Event, error)
}

EventDB is an interface used for storing and retrieving Events. The implementation returned by NewBTEventDB is backed by BigTable.

func NewBTEventDB

func NewBTEventDB(ctx context.Context, btProject, btInstance string, ts oauth2.TokenSource) (EventDB, error)

NewBTEventDB returns an EventDB which is backed by BigTable.

type EventMetrics

type EventMetrics struct {
	// contains filtered or unexported fields
}

EventMetrics is a struct used for creating aggregate metrics on streams of events.

func NewEventMetrics

func NewEventMetrics(db EventDB, measurement string) (*EventMetrics, error)

NewEventMetrics returns an EventMetrics instance.

func (*EventMetrics) AggregateMetric

func (m *EventMetrics) AggregateMetric(stream string, tags map[string]string, period time.Duration, agg AggregateFn) error

AggregateMetric sets the given aggregation function on the event stream and adds a gauge for it. For example, to compute the sum of all int64 events over a 24-hour period:

m.AggregateMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) (float64, error) {
        sum := int64(0)
        for _, e := range ev {
                sum += decodeInt64(e)
        }
        return float64(sum), nil
})

func (*EventMetrics) Close

func (m *EventMetrics) Close() error

Close cleans up the EventMetrics.

func (*EventMetrics) DynamicMetric

func (m *EventMetrics) DynamicMetric(stream string, tags map[string]string, period time.Duration, agg DynamicAggregateFn) error

DynamicMetric sets the given aggregation function on the event stream. Gauges will be added and removed dynamically based on the results of the aggregation function. Here's a toy example:

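m.DynamicMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) ([]map[string]string, []float64, error) {
	// Count how many events carry each distinct int64 value.
	counts := map[int64]int64{}
	for _, e := range ev {
		counts[decodeInt64(e)]++
	}
	// Emit one data point per distinct value. "value" is an illustrative
	// tag key, not one defined by this package.
	tags := make([]map[string]string, 0, len(counts))
	vals := make([]float64, 0, len(counts))
	for k, v := range counts {
		tags = append(tags, map[string]string{"value": fmt.Sprintf("%d", k)})
		vals = append(vals, float64(v))
	}
	return tags, vals, nil
})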

func (*EventMetrics) GetEventStream

func (m *EventMetrics) GetEventStream(name string) *EventStream

GetEventStream returns an EventStream instance.

func (*EventMetrics) LogMetrics

func (m *EventMetrics) LogMetrics()

LogMetrics logs the current values for all metrics.

func (*EventMetrics) Start

func (m *EventMetrics) Start(ctx context.Context)

Start initiates the EventMetrics goroutines.

func (*EventMetrics) UpdateMetrics

func (m *EventMetrics) UpdateMetrics() error

UpdateMetrics recalculates values for all metrics.

type EventStream

type EventStream struct {
	// contains filtered or unexported fields
}

EventStream is a struct which deals with a single stream of related events.

func (*EventStream) AggregateMetric

func (s *EventStream) AggregateMetric(tags map[string]string, period time.Duration, agg AggregateFn) error

AggregateMetric sets the given aggregation function on the event stream and adds a gauge for it. For example, to compute the sum of all int64 events over a 24-hour period:

s.AggregateMetric(myTags, 24*time.Hour, func(ev []*Event) (float64, error) {
        sum := int64(0)
        for _, e := range ev {
                sum += decodeInt64(e)
        }
        return float64(sum), nil
})

func (*EventStream) Append

func (s *EventStream) Append(data []byte) error

Append adds the given data to the stream at the current time.

func (*EventStream) DynamicMetric

func (s *EventStream) DynamicMetric(tags map[string]string, period time.Duration, agg DynamicAggregateFn) error

DynamicMetric sets the given aggregation function on the event stream. Gauges will be added and removed dynamically based on the results of the aggregation function. Here's a toy example:

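s.DynamicMetric(myTags, 24*time.Hour, func(ev []*Event) ([]map[string]string, []float64, error) {
        // Count how many events carry each distinct int64 value.
        counts := map[int64]int64{}
        for _, e := range ev {
                counts[decodeInt64(e)]++
        }
        // Emit one data point per distinct value. "value" is an illustrative
        // tag key, not one defined by this package.
        tags := make([]map[string]string, 0, len(counts))
        vals := make([]float64, 0, len(counts))
        for k, v := range counts {
                tags = append(tags, map[string]string{"value": fmt.Sprintf("%d", k)})
                vals = append(vals, float64(v))
        }
        return tags, vals, nil
})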

func (*EventStream) Insert

func (s *EventStream) Insert(e *Event) error

Insert inserts the Event into the stream, overwriting any existing Event with the same timestamp.

func (*EventStream) Range

func (s *EventStream) Range(start, end time.Time) ([]*Event, error)

Range returns all Events in the given range.
