telemetry

package
v0.47.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2023 License: BSD-3-Clause Imports: 18 Imported by: 0

Documentation

Overview

Package telemetry is a library for collecting various Metric, for example from standard runtime/metrics, and send or write it to one or more Forwarder. Each Forwarder has capability to format the Metric before sending or writing it using Formatter.

Index

Examples

Constants

This section is empty.

Variables

View Source
var RuntimeMetricsAlias = map[string]string{
	`/cgo/go-to-c-calls:calls`: `go_cgo_calls`,

	`/cpu/classes/gc/mark/assist:cpu-seconds`:    `go_cpu_gc_mark_assist_seconds`,
	`/cpu/classes/gc/mark/dedicated:cpu-seconds`: `go_cpu_gc_mark_dedicated_seconds`,
	`/cpu/classes/gc/mark/idle:cpu-seconds`:      `go_cpu_gc_mark_idle_seconds`,
	`/cpu/classes/gc/pause:cpu-seconds`:          `go_cpu_gc_pause_seconds`,
	`/cpu/classes/gc/total:cpu-seconds`:          `go_cpu_gc_total_seconds`,

	`/cpu/classes/idle:cpu-seconds`:                `go_cpu_idle_seconds`,
	`/cpu/classes/scavenge/assist:cpu-seconds`:     `go_cpu_scavenge_assist_seconds`,
	`/cpu/classes/scavenge/background:cpu-seconds`: `go_cpu_scavenge_background_seconds`,
	`/cpu/classes/scavenge/total:cpu-seconds`:      `go_cpu_scavenge_total_seconds`,
	`/cpu/classes/total:cpu-seconds`:               `go_cpu_total_seconds`,
	`/cpu/classes/user:cpu-seconds`:                `go_cpu_user_seconds`,

	`/gc/cycles/automatic:gc-cycles`: `go_gc_cycles_automatic`,
	`/gc/cycles/forced:gc-cycles`:    `go_gc_cycles_forced`,
	`/gc/cycles/total:gc-cycles`:     `go_gc_cycles_total`,

	`/gc/heap/allocs-by-size:bytes`: `go_gc_heap_alloc_by_size_bytes`,
	`/gc/heap/allocs:bytes`:         `go_gc_heap_allocs_bytes`,
	`/gc/heap/allocs:objects`:       `go_gc_heap_allocs_objects`,
	`/gc/heap/frees-by-size:bytes`:  `go_gc_heap_frees_by_size_bytes`,
	`/gc/heap/frees:bytes`:          `go_gc_heap_frees_bytes`,
	`/gc/heap/frees:objects`:        `go_gc_heap_frees_objects`,
	`/gc/heap/goal:bytes`:           `go_gc_heap_goal_bytes`,
	`/gc/heap/objects:objects`:      `go_gc_heap_objects`,
	`/gc/heap/tiny/allocs:objects`:  `go_gc_heap_tiny_allocs_objects`,

	`/gc/limiter/last-enabled:gc-cycle`: `go_gc_limiter_last_enabled`,

	`/gc/pauses:seconds`: `go_gc_pauses_seconds`,

	`/gc/stack/starting-size:bytes`: `go_gc_stack_starting_size_bytes`,

	`/godebug/non-default-behavior/execerrdot:events`:           `go_godebug_execerrdot_events`,
	`/godebug/non-default-behavior/http2client:events`:          `go_godebug_http2client_events`,
	`/godebug/non-default-behavior/http2server:events`:          `go_godebug_http2server_events`,
	`/godebug/non-default-behavior/installgoroot:events`:        `go_godebug_installgoroot_events`,
	`/godebug/non-default-behavior/panicnil:events`:             `go_godebug_panicnil_events`,
	`/godebug/non-default-behavior/randautoseed:events`:         `go_godebug_randautoseed_events`,
	`/godebug/non-default-behavior/tarinsecurepath:events`:      `go_godebug_trainsecurepath_events`,
	`/godebug/non-default-behavior/x509sha1:events`:             `go_godebug_x509sha1_events`,
	`/godebug/non-default-behavior/x509usefallbackroots:events`: `go_godebug_x509usefallbackroots_events`,
	`/godebug/non-default-behavior/zipinsecurepath:events`:      `go_godebug_zipinsecurepath_events`,

	`/memory/classes/heap/free:bytes`:     `go_memory_heap_free_bytes`,
	`/memory/classes/heap/objects:bytes`:  `go_memory_heap_objects_bytes`,
	`/memory/classes/heap/released:bytes`: `go_memory_heap_released_bytes`,
	`/memory/classes/heap/stacks:bytes`:   `go_memory_heap_stacks_bytes`,
	`/memory/classes/heap/unused:bytes`:   `go_memory_heap_unused_bytes`,

	`/memory/classes/metadata/mcache/free:bytes`:  `go_memory_metadata_mcache_free_bytes`,
	`/memory/classes/metadata/mcache/inuse:bytes`: `go_memory_metadata_mcache_inuse_bytes`,
	`/memory/classes/metadata/mspan/free:bytes`:   `go_memory_metadata_mspan_free_bytes`,
	`/memory/classes/metadata/mspan/inuse:bytes`:  `go_memory_metadata_mspan_inuse_bytes`,
	`/memory/classes/metadata/other:bytes`:        `go_memory_metadata_other_bytes`,

	`/memory/classes/os-stacks:bytes`:         `go_memory_os_stacks_bytes`,
	`/memory/classes/other:bytes`:             `go_memory_other_bytes`,
	`/memory/classes/profiling/buckets:bytes`: `go_memory_profiling_buckets_bytes`,
	`/memory/classes/total:bytes`:             `go_memory_total_bytes`,

	`/sched/gomaxprocs:threads`:    `go_sched_gomaxprocs`,
	`/sched/goroutines:goroutines`: `go_sched_goroutines`,
	`/sched/latencies:seconds`:     `go_sched_latencies_seconds`,

	`/sync/mutex/wait/total:seconds`: `go_sync_mutex_wait_total_seconds`,
}

RuntimeMetricsAlias define an alias for runtime/metrics.Name to be exported by GoMetricsCollector.

Functions

func ContextForwardWait

func ContextForwardWait(ctx context.Context) context.Context

ContextForwardWait wait for the Agent.Forward or Agent.BulkForward to be finished.

Types

type Agent

type Agent struct {
	// contains filtered or unexported fields
}

Agent is the one that responsible to collect and forward the metrics.

func NewAgent

func NewAgent(opts AgentOptions) (agent *Agent)

NewAgent create, initalize, and run the new Agent. The agent will start auto collecting the metrics in the background every [AgentOptions.Interval] and forward it to each Forwarder.

func (*Agent) BulkForward

func (agent *Agent) BulkForward(ctx context.Context, list []Metric) error

BulkForward push list of Metric asynchronously. If ctx contains ContextForwardWait, it will forward the metric synchronously.

func (*Agent) Forward

func (agent *Agent) Forward(ctx context.Context, m Metric) (err error)

Forward single metric to agent asynchronously. If ctx contains ContextForwardWait, it will forward the metric synchronously.

func (*Agent) Stop

func (agent *Agent) Stop()

Stop the agent and close all Forwarder.

type AgentOptions

type AgentOptions struct {
	// Name of the agent.
	Name string

	// Metadata provides static, additional information to be forwarded
	// along with the collected metrics.
	Metadata *Metadata

	// Timestamp define the function to be called to set the
	// [Metric.Timestamp].
	// Default to NanoTimestamp.
	Timestamp Timestamper

	// Collectors contains list of Collector that provide the metrics to
	// be forwarded.
	// An empty Collectors means no metrics will be collected and
	// forwarded.
	Collectors []Collector

	// Forwarders contains list of target where collected metrics will be
	// forwarded.
	Forwarders []Forwarder

	// Interval for collecting metrics.
	// Default value is one minutes with the minimium value is 10 seconds.
	Interval time.Duration
}

AgentOptions contains options to run the Agent.

type BufferForwarder

type BufferForwarder struct {
	sync.Mutex
	// contains filtered or unexported fields
}

BufferForwarder write the metrics to underlying bytes.Buffer.

Example
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/shuLhan/share/lib/telemetry"
)

func main() {
	// Create the Formatter and Forwarder.
	var (
		dsvFmt = telemetry.NewDsvFormatter(';', telemetry.RuntimeMetricsAlias)
		bufFwd = telemetry.NewBufferForwarder(dsvFmt)
	)

	// Create metadata.
	var md = telemetry.NewMetadata()
	md.Set(`name`, `BufferForwarder`)
	md.Set(`version`, `0.1.0`)

	// Create the Agent.
	var (
		agentOpts = telemetry.AgentOptions{
			Metadata:   md,
			Forwarders: []telemetry.Forwarder{bufFwd},
			Timestamp:  telemetry.DummyTimestamp(),
		}
		agent = telemetry.NewAgent(agentOpts)
	)
	defer agent.Stop()

	// Forward single metric and print the result.
	var (
		m = telemetry.Metric{
			Name:  `usage`,
			Value: 0.5,
		}
		ctx = telemetry.ContextForwardWait(context.Background())

		err error
	)
	err = agent.Forward(ctx, m)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf(`%s`, bufFwd.Bytes())

	// Forward list of Metric and print the result.
	var list = []telemetry.Metric{
		{Name: `usage`, Value: 0.4},
		{Name: `usage`, Value: 0.3},
	}
	err = agent.BulkForward(ctx, list)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf(`%s`, bufFwd.Bytes())
}
Output:

1678606568;"usage";0.500000;"name=BufferForwarder,version=0.1.0"
1678606568;"usage";0.400000;"name=BufferForwarder,version=0.1.0"
1678606568;"usage";0.300000;"name=BufferForwarder,version=0.1.0"

func NewBufferForwarder

func NewBufferForwarder(f Formatter) *BufferForwarder

NewBufferForwarder create new BufferForwarder using f as Formatter.

func (*BufferForwarder) Bytes

func (buf *BufferForwarder) Bytes() (b []byte)

Bytes return the metrics that has been written to Buffer. Once this method called the underlying Buffer will be resetted.

func (*BufferForwarder) Close

func (buf *BufferForwarder) Close() error

Close on Buffer is a no-op.

func (*BufferForwarder) Formatter

func (buf *BufferForwarder) Formatter() Formatter

Formatter return the Formatter used by this BufferForwarder.

func (*BufferForwarder) Write

func (buf *BufferForwarder) Write(wire []byte) (n int, err error)

Write the raw metrics to Buffer.

type Collector

type Collector interface {
	// Collect the metrics with timestamp.
	Collect(timestamp int64) []Metric
}

Collector provides an interface to collect the metrics.

type DsvFormatter

type DsvFormatter struct {
	// contains filtered or unexported fields
}

DsvFormatter format the Metric in single line where each value is separated by single character. The metric are formatted in the following order,

Timestamp SEP Name SEP Value *(SEP metadata)
metadata = Metadata.Key "=" Metadata.Value *("," metadata)

The Name, Value, and metadata are enclosed with double quoted.

func NewDsvFormatter

func NewDsvFormatter(sep rune, metricsAlias map[string]string) (dsv *DsvFormatter)

NewDsvFormatter create new DsvFormatter using sep as separater and options to change the metric output name using metricsAlias. See RuntimeMetricsAlias for example.

func (*DsvFormatter) BulkFormat

func (dsv *DsvFormatter) BulkFormat(listm []Metric, md *Metadata) []byte

BulkFormat bulk format list of Metric with Metadata.

func (*DsvFormatter) Format

func (dsv *DsvFormatter) Format(m Metric, md *Metadata) []byte

Format single Metric into single line DSV.

func (*DsvFormatter) Name

func (dsv *DsvFormatter) Name() string

Name return the Name of DsvFormatter as "dsv".

type FileForwarder

type FileForwarder struct {
	// contains filtered or unexported fields
}

FileForwarder forward the raw metrics into file.

func NewFileForwarder

func NewFileForwarder(fmt Formatter, file io.WriteCloser) (fwd *FileForwarder)

NewFileForwarder create new FileForwarder using fmt as the Formatter.

func (*FileForwarder) Close

func (fwd *FileForwarder) Close() (err error)

Close the underlying file. Calling Forward after closing Forwarder may cause panic.

func (*FileForwarder) Formatter

func (fwd *FileForwarder) Formatter() Formatter

Formatter return the Formatter that is used by this FileForwarder.

func (*FileForwarder) Forward

func (fwd *FileForwarder) Forward(wire []byte) (err error)

Forward write the formatted metrics into file.

type Formatter

type Formatter interface {
	// BulkFormat format list of Metric with metadata for transfer.
	BulkFormat(listm []Metric, md *Metadata) []byte

	// Format the Metric m and metadata for transfer.
	Format(m Metric, md *Metadata) []byte

	// Name return the name of formatter.
	Name() string
}

Formatter define the interface that responsible to convert single or bulk of Metric into its wire format.

type Forwarder

type Forwarder interface {
	// Implement the Close and Write from package [io].
	// Calling Forward after Close may cause panic.
	io.WriteCloser

	// Formatter return the Formatter being used to format the metrics.
	Formatter() Formatter
}

Forwarder provide the interface to be implemented by forwarder in order to store the collected metrics.

type GoMemStatsCollector

type GoMemStatsCollector struct {
	// contains filtered or unexported fields
}

GoMemStatsCollector collect Go statistics about memory allocator, as in calling runtime.ReadMemStats.

This collector export the following metric names, with value from the field of runtime.MemStats:

func NewGoMemStatsCollector

func NewGoMemStatsCollector(filter *regexp.Regexp) (col *GoMemStatsCollector)

NewGoMemStatsCollector create new MemStats collector with options to filter the metrics by its name using regular expression.

If filter is nil, none of the metrics will be collected.

func (*GoMemStatsCollector) Collect

func (col *GoMemStatsCollector) Collect(ts int64) (list []Metric)

Collect the Go MemStats.

type GoMetricsCollector

type GoMetricsCollector struct {
	// contains filtered or unexported fields
}

GoMetricsCollector collect the metrics using runtime/metrics.Read.

func NewGoMetricsCollector

func NewGoMetricsCollector(filter *regexp.Regexp) (col *GoMetricsCollector)

NewGoMetricsCollector create new collector for runtime/metrics with options to filter specific metric by alias name using regular expression.

For example, to collect all metrics pass regex `^.*$`, to collect memory only pass `^go_memory_.*$`. A nil filter means no metrics will be collected.

func (*GoMetricsCollector) Collect

func (col *GoMetricsCollector) Collect(timestamp int64) (list []Metric)

Collect the runtime/metrics.

type IlpFormatter

type IlpFormatter struct {
	// contains filtered or unexported fields
}

IlpFormatter format the Metric using the Influxdata Line Protocol, ILP. Syntax,

ILP      = measurement [METADATA] " " METRIC [" " timestamp] LF
METADATA = *("," key "=" value)
METRIC   = key "=" value *("," METRIC)

func NewIlpFormatter

func NewIlpFormatter(measurement string) (ilp *IlpFormatter)

NewIlpFormatter create and initialize new IlpFormatter.

func (*IlpFormatter) BulkFormat

func (ilp *IlpFormatter) BulkFormat(list []Metric, md *Metadata) []byte

BulkFormat format list of Metric with metadata.

func (*IlpFormatter) Format

func (ilp *IlpFormatter) Format(m Metric, md *Metadata) []byte

Format single Metric.

func (*IlpFormatter) Name

func (ilp *IlpFormatter) Name() string

Name return the unique name of the formatter, "ilp".

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

Metadata provides versioned Map with stable key order.

Example
package main

import (
	"fmt"

	"github.com/shuLhan/share/lib/telemetry"
)

func main() {
	var md = telemetry.NewMetadata()

	// The new Metadata has version=0.
	fmt.Println(md.Version(), md.String(), ".")

	// Setting a key increase the version to 1
	md.Set(`host`, `localhost`)
	fmt.Println(md.Version(), md.String(), ".")

	// ... even if the key already exist.
	md.Set(`host`, `my.localhost`)
	fmt.Println(md.Version(), md.String(), ".")

	// Deleting a key increase the version too.
	md.Delete(`host`)
	fmt.Println(md.Version(), md.String(), ".")

	// But if the key is not exist, it will not increase the version.
	md.Delete(`host`)
	fmt.Println(md.Version(), md.String(), ".")

}
Output:

0  .
1 host=localhost .
2 host=my.localhost .
3  .
3  .

func NewMetadata

func NewMetadata() (md *Metadata)

NewMetadata create and initialize new metadata.

func (*Metadata) Delete

func (md *Metadata) Delete(key string)

Delete Metadata by its key. The versioning will be increased only if the key exist.

func (*Metadata) Get

func (md *Metadata) Get(key string) string

Get the Metadata value by its key.

func (*Metadata) Keys

func (md *Metadata) Keys() (keys []string)

Keys return the Metadata keys sorted lexicographically.

func (*Metadata) KeysMap

func (md *Metadata) KeysMap() (keys []string, vals map[string]string)

KeysMap return the Metadata keys sorted lexicographically and its map.

func (*Metadata) Set

func (md *Metadata) Set(key, value string)

Set store the key with value into Metadata. This method always increase the version.

func (*Metadata) String

func (md *Metadata) String() string

String return the Metadata where each item separated by comma and the key-value separated by equal character.

func (*Metadata) Version

func (md *Metadata) Version() int

Version return the current version of Metadata.

type Metric

type Metric struct {
	Name      string  `json:"name"`
	Value     float64 `json:"value"`
	Timestamp int64   `json:"timestamp"`
}

Metric contains name, value, and timestamp.

type QuestdbForwarder

type QuestdbForwarder struct {
	// contains filtered or unexported fields
}

QuestdbForwarder forward the metrics to QuestDB using TCP.

func NewQuestdbForwarder

func NewQuestdbForwarder(opts QuestdbOptions) (fwd *QuestdbForwarder, err error)

NewQuestdbForwarder create new forwarder for QuestDB.

func (*QuestdbForwarder) Close

func (fwd *QuestdbForwarder) Close() (err error)

Close the connection to the questdb server.

func (*QuestdbForwarder) Formatter

func (fwd *QuestdbForwarder) Formatter() Formatter

Formatter return the Formatter used by questdb.

func (*QuestdbForwarder) Write

func (fwd *QuestdbForwarder) Write(b []byte) (n int, err error)

Write forward the formatted Metric into the questdb server.

type QuestdbOptions

type QuestdbOptions struct {
	// Fmt the Formatter to use to convert the Metric.
	// Usually set to IlpFormatter.
	Fmt Formatter

	// ServerUrl define the QuestDB server URL.
	// Currently, it only support the TCP scheme using the following
	// format "tcp://<host>:<port>".
	ServerUrl string

	// Timeout define default timeout for Write.
	// Default to 10 seconds.
	Timeout time.Duration
	// contains filtered or unexported fields
}

QuestdbOptions options for QuestdbForwarder.

type StdoutForwarder

type StdoutForwarder struct {
	// contains filtered or unexported fields
}

StdoutForwarder write the metrics to os.Stdout. This type is used as example and to provide wrapper for os.Stdout, since user should not call Close on os.Stdout.

func NewStdoutForwarder

func NewStdoutForwarder(f Formatter) *StdoutForwarder

NewStdoutForwarder create new StdoutForwarder using f as Formatter.

func (*StdoutForwarder) Close

func (stdout *StdoutForwarder) Close() error

Close on StdoutForwarder sync the Stdout.

func (*StdoutForwarder) Formatter

func (stdout *StdoutForwarder) Formatter() Formatter

Formatter return the Formatter used by this StdoutForwarder.

func (*StdoutForwarder) Write

func (stdout *StdoutForwarder) Write(wire []byte) (n int, err error)

Write the raw metrics to stdout.

type Timestamper

type Timestamper func() int64

Timestamper a type that return a function to generate timestamp.

func DummyTimestamp

func DummyTimestamp() Timestamper

DummyTimestamp return fixed epoch 1678606568, for testing only.

func MilliTimestamp

func MilliTimestamp() Timestamper

MilliTimestamp return the number of milliseconds elapsed since January 1, 1970 UTC.

func NanoTimestamp

func NanoTimestamp() Timestamper

NanoTimestamp return the number of nanoseconds elapsed since January 1, 1970 UTC

func SecondTimestamp

func SecondTimestamp() Timestamper

SecondTimestamp return the number of seconds elapsed since January 1, 1970 UTC

Directories

Path Synopsis
internal
cmd/agent-example
Program agent-example provide an example of how to create agent.
Program agent-example provide an example of how to create agent.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL