series

package
v0.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2023 License: Apache-2.0 Imports: 29 Imported by: 2

Documentation

Overview

Package series provides tools for collecting and aggregating timeseries events as part of the logging infrastructure.

The series "system" includes a few basic types and concepts: an "event" which is a single data point, a Metric which is a single series of datapoints, and a Collector which is responsible for tracking and publishing metrics.

In general, as a developer, to use grip/series for your metrics: you configure a series.Collector, and embed it in your grip sending pipeline, and then embed metric events in your message.

The x/metrics package contains message types that use github.com/shirou/gopsutil to collect and generate structred logging messages with metrics information. These tools also integrate with the `tychoish/birch` bson library and it's `birch/x/ftdc` timeseries compression format. Additionally, `bson` formatted output renders for metric events are also provided here.

WARNING: This implementation is alpha quality at the moment. Pull requests welcome.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GoRuntimeEventProducer

func GoRuntimeEventProducer(labels ...dt.Pair[string, string]) fun.Producer[[]*Event]

func RenderHistogramJSON

func RenderHistogramJSON(
	buf *bytes.Buffer,
	key string,
	labels fun.Future[[]byte],
	sample *dt.Pairs[float64, int64],
	ts time.Time,
)

func RenderLabelsGraphite

func RenderLabelsGraphite(builder *bytes.Buffer, labels []dt.Pair[string, string], extra ...dt.Pair[string, string])

func RenderLabelsJSON

func RenderLabelsJSON(buf *bytes.Buffer, labels []dt.Pair[string, string], extra ...dt.Pair[string, string])

func RenderLabelsOpenTSB

func RenderLabelsOpenTSB(builder *bytes.Buffer, labels []dt.Pair[string, string], extra ...dt.Pair[string, string])

func RenderMetricGraphite

func RenderMetricGraphite(buf *bytes.Buffer, key string, labels fun.Future[[]byte], value int64, ts time.Time)

func RenderMetricJSON

func RenderMetricJSON(buf *bytes.Buffer, key string, labels fun.Future[[]byte], value int64, ts time.Time)

func RenderMetricOpenTSB

func RenderMetricOpenTSB(buf *bytes.Buffer, key string, labels fun.Future[[]byte], value int64, ts time.Time)

func Sender

func Sender(s send.Sender, coll *Collector) send.Sender

Sender wraps a send.Sender and a collector and unifies them: if there are any events wrapped or embedded in the message the sender extracts them, propagating the events to the collector and then separately passing the message along to the underlying sender.

The events are, typically, not part of the message sent to the underlying sender: while Events do have a string form that can be logged, and most senders will handle them appropriately, events are not logged with *this* sender.

func WithMetrics

func WithMetrics(c any, events ...*Event) message.Composer

WithMetrics inspects a value that might have *Event, (or related types, including functions that produce events and slices of events) embedded in them.

Types

type Collector

type Collector struct {
	CollectorConf
	// contains filtered or unexported fields
}

Collector maintains the local state of collected metrics: metric series are registered lazily when they are first sent, and the collector tracks the value and is responsible for orchestrating.

func NewCollector

func NewCollector(ctx context.Context, opts ...CollectorOptionProvider) (*Collector, error)

NewCollector constructs a collector service that is responsible for collecting and distributing metric events. There are several basic modes of operation:

- Embedded: Use series.Sender to create in a grip/send.Sender: here the collector wraps the sender and intercepts events from normal logger messages. The series.WithMetrics helper can attach metrics.

- Directly: You can use the Push/Publish/Stream/PushEvent methods to send events to the collector.

- Background: Using the Register() method you can add a function to the Collector which will collect its result and distribute them on the provided backend.

Output from a collector is managed by CollectorBackends, which may be implemented externally (a backend is a fun.Processor function that consumes (and processes!) fun.Iterator[series.MetricPublisher] objects. Metrics publishers, then are closures that write the metrics format to an io.Writer, while the formatting of a message is controlled by the <>Renderer function in the Collector configuration.

func (*Collector) Close

func (c *Collector) Close() error

func (*Collector) Iterator

func (c *Collector) Iterator() *fun.Iterator[MetricSnapshot]

Iterator iterates through every metric and label combination, and takes a (rough) snapshot of each metric. Rough only because the timestamps and last metric may not always be (exactly) synchronized with regards to eachother.

func (*Collector) Publish

func (c *Collector) Publish(events []*Event)

func (*Collector) Push

func (c *Collector) Push(events ...*Event)

func (*Collector) PushEvent

func (c *Collector) PushEvent(e *Event)

func (*Collector) Register

func (c *Collector) Register(prod fun.Producer[[]*Event], dur time.Duration)

Register runs an event producing function,

func (*Collector) Stream

func (c *Collector) Stream(
	iter *fun.Iterator[*Event],
	opts ...fun.OptionProvider[*fun.WorkerGroupConf],
) fun.Worker

type CollectorBackend

type CollectorBackend fun.Processor[*fun.Iterator[MetricPublisher]]

func LoggerBackend

func LoggerBackend(sender send.Sender) CollectorBackend

func (CollectorBackend) Worker

type CollectorBackendFileConf

type CollectorBackendFileConf struct {
	Directory      string
	FilePrefix     string
	Extension      string
	CounterPadding int
	Megabytes      int
	Gzip           bool
}

func (*CollectorBackendFileConf) RotatingFilePath

func (conf *CollectorBackendFileConf) RotatingFilePath() fun.Producer[string]

func (*CollectorBackendFileConf) RotatingFileProducer

func (conf *CollectorBackendFileConf) RotatingFileProducer() fun.Producer[io.WriteCloser]

func (*CollectorBackendFileConf) Validate

func (conf *CollectorBackendFileConf) Validate() error

type CollectorBackendSocketConf

type CollectorBackendSocketConf struct {
	Dialer  net.Dialer
	Network string // tcp or udb
	Address string

	DialWorkers       int
	IdleConns         int
	MinDialRetryDelay time.Duration
	MaxDialRetryDelay time.Duration
	DialErrorHandling CollectorBackendSocketErrorOption

	MessageWorkers       int
	NumMessageRetries    int
	MinMessageRetryDelay time.Duration
	MaxMessageRetryDelay time.Duration
	MessageErrorHandling CollectorBackendSocketErrorOption
}

func (*CollectorBackendSocketConf) Validate

func (conf *CollectorBackendSocketConf) Validate() error

type CollectorBackendSocketErrorOption

type CollectorBackendSocketErrorOption int8
const (
	CollectorBackendSocketErrorINVALID CollectorBackendSocketErrorOption = iota
	CollectorBackendSocketErrorAbort
	CollectorBackendSocketErrorContinue
	CollectorBackendSocketErrorCollect
	CollectorBackendSocketErrorPanic
	CollectorBackendSocketErrorUNSPECIFIED
)

func (CollectorBackendSocketErrorOption) Validate

type CollectorBakendFileOptionProvider

type CollectorBakendFileOptionProvider = fun.OptionProvider[*CollectorBackendFileConf]

func CollectorBackendFileConfCounterPadding

func CollectorBackendFileConfCounterPadding(v int) CollectorBakendFileOptionProvider

func CollectorBackendFileConfDirectory

func CollectorBackendFileConfDirectory(path string) CollectorBakendFileOptionProvider

func CollectorBackendFileConfExtension

func CollectorBackendFileConfExtension(ext string) CollectorBakendFileOptionProvider

func CollectorBackendFileConfPrefix

func CollectorBackendFileConfPrefix(prefix string) CollectorBakendFileOptionProvider

func CollectorBackendFileConfRotationSizeMB

func CollectorBackendFileConfRotationSizeMB(v int) CollectorBakendFileOptionProvider

type CollectorBakendSocketOptionProvider

type CollectorBakendSocketOptionProvider = fun.OptionProvider[*CollectorBackendSocketConf]

func CollectorBackendSocketConfAddress

func CollectorBackendSocketConfAddress(addr string) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfDialWorkers

func CollectorBackendSocketConfDialWorkers(n int) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfDialer

func CollectorBackendSocketConfDialer(d net.Dialer) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfIdleConns

func CollectorBackendSocketConfIdleConns(n int) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMaxDialRetryDelay

func CollectorBackendSocketConfMaxDialRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMaxMessageRetryDelay

func CollectorBackendSocketConfMaxMessageRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMessageWorkers

func CollectorBackendSocketConfMessageWorkers(n int) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMinDialRetryDelay

func CollectorBackendSocketConfMinDialRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMinMessageRetryDelay

func CollectorBackendSocketConfMinMessageRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfNetowrkTCP

func CollectorBackendSocketConfNetowrkTCP() CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfNetowrkUDP

func CollectorBackendSocketConfNetowrkUDP() CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfNumMessageRetries

func CollectorBackendSocketConfNumMessageRetries(n int) CollectorBakendSocketOptionProvider

type CollectorConf

type CollectorConf struct {
	Backends      []CollectorBackend
	BrokerOptions pubsub.BrokerOptions
	Buffer        int

	LabelRenderer          MetricLabelRenderer
	MetricRenderer         MetricValueRenderer
	DefaultHistogramRender MetricHistogramRenderer
}

func (*CollectorConf) Validate

func (conf *CollectorConf) Validate() error

type CollectorOptionProvider

type CollectorOptionProvider = fun.OptionProvider[*CollectorConf]

func CollectorConfAppendBackends

func CollectorConfAppendBackends(bs ...CollectorBackend) CollectorOptionProvider

func CollectorConfBuffer

func CollectorConfBuffer(size int) CollectorOptionProvider

func CollectorConfFileLoggerBackend

func CollectorConfFileLoggerBackend(opts *CollectorBackendFileConf) CollectorOptionProvider

func CollectorConfOutputGraphite

func CollectorConfOutputGraphite() CollectorOptionProvider

func CollectorConfOutputJSON

func CollectorConfOutputJSON() CollectorOptionProvider

func CollectorConfOutputOpenTSB

func CollectorConfOutputOpenTSB() CollectorOptionProvider

func CollectorConfSet

func CollectorConfSet(c *CollectorConf) CollectorOptionProvider

func CollectorConfWithLoggerBackend

func CollectorConfWithLoggerBackend(sender send.Sender) CollectorOptionProvider

type Event

type Event struct {
	// contains filtered or unexported fields
}

func Extract

func Extract(c any) []*Event

Extract takes an arbitrary object and attempts to introspect it to find events.

func (*Event) Export

func (e *Event) Export() Record

func (*Event) MarshalJSON

func (e *Event) MarshalJSON() ([]byte, error)

func (*Event) String

func (e *Event) String() string

type EventExtractor

type EventExtractor interface {
	Events() []*Event
}

EventExtractor is a type that is implementable by arbitrary types to create events.

type HistogramConf

type HistogramConf struct {
	Min               int64
	Max               int64
	SignificantDigits int
	Quantiles         []float64
	OutOfRange        HistogramOutOfRangeOption
	Interval          time.Duration
	Renderer          MetricHistogramRenderer
}

func MakeDefaultHistogramConf

func MakeDefaultHistogramConf() *HistogramConf

func (*HistogramConf) Apply

func (conf *HistogramConf) Apply(opts ...HistogramOptionProvider) error

func (*HistogramConf) Validate

func (conf *HistogramConf) Validate() error

type HistogramOptionProvider

type HistogramOptionProvider = fun.OptionProvider[*HistogramConf]

func HistogramConfBounds

func HistogramConfBounds(min, max int64) HistogramOptionProvider

func HistogramConfInterval

func HistogramConfInterval(dur time.Duration) HistogramOptionProvider

func HistogramConfLowerBound

func HistogramConfLowerBound(in int64) HistogramOptionProvider

func HistogramConfReset

func HistogramConfReset() HistogramOptionProvider

func HistogramConfSet

func HistogramConfSet(arg *HistogramConf) HistogramOptionProvider

func HistogramConfSetQuantiles

func HistogramConfSetQuantiles(quant []float64) HistogramOptionProvider

func HistogramConfSignifcantDigits

func HistogramConfSignifcantDigits(in int) HistogramOptionProvider

func HistogramConfUpperBound

func HistogramConfUpperBound(in int64) HistogramOptionProvider

type HistogramOutOfRangeOption

type HistogramOutOfRangeOption int8
const (
	HistogramOutOfRangeINVALID HistogramOutOfRangeOption = iota
	HistogramOutOfRangePanic
	HistogramOutOfRangeIgnore
	HistogramOutOfRangeTruncate
	HistogramOutOfRangeUNSPECIFIED
)

type Metric

type Metric struct {
	ID   string
	Type MetricType
	// contains filtered or unexported fields
}

func Collect

func Collect(id string) *Metric

func Counter

func Counter(id string) *Metric

func Delta

func Delta(id string) *Metric

func Gauge

func Gauge(id string) *Metric

func Histogram

func Histogram(id string, opts ...HistogramOptionProvider) *Metric

func (*Metric) Add

func (m *Metric) Add(v int64) *Event

func (*Metric) Annotate

func (m *Metric) Annotate(pairs ...dt.Pair[string, string]) *Metric

func (*Metric) Collect

func (m *Metric) Collect(fn fun.Future[int64]) *Event

func (*Metric) CollectAdd

func (m *Metric) CollectAdd(fn fun.Future[int64]) *Event

func (*Metric) Dec

func (m *Metric) Dec() *Event

func (*Metric) Equal

func (m *Metric) Equal(two *Metric) bool

func (*Metric) Inc

func (m *Metric) Inc() *Event

func (*Metric) Label

func (m *Metric) Label(k, v string) *Metric

func (*Metric) Labels

func (m *Metric) Labels(set *dt.Set[dt.Pair[string, string]]) *Metric

func (*Metric) MetricType

func (m *Metric) MetricType(t MetricType) *Metric

func (*Metric) Periodic

func (m *Metric) Periodic(dur time.Duration) *Metric

Periodic sets an interval for the metrics to be reported: new events aren't reported for this metric (id+labels) regardless of periodic being set on future matching events, but the periodic reporting remains.

This periodicity only refers to the _reporting_ of the event, not the collection of the event. Register a fun.Producer[[]*Events] on the series.Collector for periodic collection.

func (*Metric) Set

func (m *Metric) Set(v int64) *Event

type MetricHistogramRenderer

type MetricHistogramRenderer func(
	wr *bytes.Buffer,
	key string,
	labels fun.Future[[]byte],
	sample *dt.Pairs[float64, int64],
	ts time.Time,
)

func MakeDefaultHistogramMetricRenderer

func MakeDefaultHistogramMetricRenderer(mr MetricValueRenderer) MetricHistogramRenderer

type MetricLabelRenderer

type MetricLabelRenderer func(output *bytes.Buffer, labels []dt.Pair[string, string], extra ...dt.Pair[string, string])

MetricLabelRenderer provides an implementation for an ordered set of labels (tags) for a specific metric series. MetricLabels are rendered and cached in the Collector, and the buffered output, is passed as a future to the MetricRenderer function.

type MetricMessage

type MetricMessage struct {
	message.Composer
	Events []*Event
}

MetricMessage is a collection of events and a message.Composer object that can be used as a message.Composer but that also contains some number of events.

func Message

func Message(m message.Composer, events ...*Event) *MetricMessage

Message is a simple constructor around *MetricMessage (which implements message.Composer) and includes a slice of event pointers.

func (*MetricMessage) Raw

func (m *MetricMessage) Raw() any

func (*MetricMessage) String

func (m *MetricMessage) String() string

func (*MetricMessage) Structured

func (m *MetricMessage) Structured() bool

type MetricPublisher

type MetricPublisher func(io.Writer) error

type MetricSnapshot

type MetricSnapshot struct {
	Name      string
	Labels    string
	Value     int64
	Timestamp time.Time
}

MetricSnapshot is the export format for a metric series at a given point of time.

type MetricType

type MetricType string
const (
	MetricTypeDeltas    MetricType = "deltas"
	MetricTypeCounter   MetricType = "counter"
	MetricTypeGuage     MetricType = "gauge"
	MetricTypeHistogram MetricType = "histogram"
)

type MetricValueRenderer

type MetricValueRenderer func(writer *bytes.Buffer, key string, labels fun.Future[[]byte], value int64, ts time.Time)

MetricValueRenderer takes an event and writes the output to a buffer. This makes it possible to use the metrics system with arbitrary output formats and targets.

type Record

type Record struct {
	ID     string                    `bson:"metric" json:"metric" yaml:"metric"`
	Value  int64                     `bson:"Value" json:"Value" yaml:"Value"`
	Labels *dt.Pairs[string, string] `bson:"labels" json:"labels" yaml:"labels"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL