Documentation ¶
Overview ¶
Package telemetry is a library for collecting various Metric values, for example from the standard runtime/metrics package, and sending or writing them to one or more Forwarder. Each Forwarder can format the Metric with its Formatter before sending or writing it.
Index ¶
- Variables
- func ContextForwardWait(ctx context.Context) context.Context
- type Agent
- type AgentOptions
- type BufferForwarder
- type Collector
- type DsvFormatter
- type FileForwarder
- type Formatter
- type Forwarder
- type GoMemStatsCollector
- type GoMetricsCollector
- type IlpFormatter
- type Metadata
- func (md *Metadata) Delete(key string)
- func (md *Metadata) Get(key string) string
- func (md *Metadata) Keys() (keys []string)
- func (md *Metadata) KeysMap() (keys []string, vals map[string]string)
- func (md *Metadata) Set(key, value string)
- func (md *Metadata) String() string
- func (md *Metadata) Version() int
- type Metric
- type QuestdbForwarder
- type QuestdbOptions
- type StdoutForwarder
- type Timestamper
Constants ¶
This section is empty.
Variables ¶
var RuntimeMetricsAlias = map[string]string{
`/cgo/go-to-c-calls:calls`: `go_cgo_calls`,
`/cpu/classes/gc/mark/assist:cpu-seconds`: `go_cpu_gc_mark_assist_seconds`,
`/cpu/classes/gc/mark/dedicated:cpu-seconds`: `go_cpu_gc_mark_dedicated_seconds`,
`/cpu/classes/gc/mark/idle:cpu-seconds`: `go_cpu_gc_mark_idle_seconds`,
`/cpu/classes/gc/pause:cpu-seconds`: `go_cpu_gc_pause_seconds`,
`/cpu/classes/gc/total:cpu-seconds`: `go_cpu_gc_total_seconds`,
`/cpu/classes/idle:cpu-seconds`: `go_cpu_idle_seconds`,
`/cpu/classes/scavenge/assist:cpu-seconds`: `go_cpu_scavenge_assist_seconds`,
`/cpu/classes/scavenge/background:cpu-seconds`: `go_cpu_scavenge_background_seconds`,
`/cpu/classes/scavenge/total:cpu-seconds`: `go_cpu_scavenge_total_seconds`,
`/cpu/classes/total:cpu-seconds`: `go_cpu_total_seconds`,
`/cpu/classes/user:cpu-seconds`: `go_cpu_user_seconds`,
`/gc/cycles/automatic:gc-cycles`: `go_gc_cycles_automatic`,
`/gc/cycles/forced:gc-cycles`: `go_gc_cycles_forced`,
`/gc/cycles/total:gc-cycles`: `go_gc_cycles_total`,
`/gc/heap/allocs-by-size:bytes`: `go_gc_heap_alloc_by_size_bytes`,
`/gc/heap/allocs:bytes`: `go_gc_heap_allocs_bytes`,
`/gc/heap/allocs:objects`: `go_gc_heap_allocs_objects`,
`/gc/heap/frees-by-size:bytes`: `go_gc_heap_frees_by_size_bytes`,
`/gc/heap/frees:bytes`: `go_gc_heap_frees_bytes`,
`/gc/heap/frees:objects`: `go_gc_heap_frees_objects`,
`/gc/heap/goal:bytes`: `go_gc_heap_goal_bytes`,
`/gc/heap/objects:objects`: `go_gc_heap_objects`,
`/gc/heap/tiny/allocs:objects`: `go_gc_heap_tiny_allocs_objects`,
`/gc/limiter/last-enabled:gc-cycle`: `go_gc_limiter_last_enabled`,
`/gc/pauses:seconds`: `go_gc_pauses_seconds`,
`/gc/stack/starting-size:bytes`: `go_gc_stack_starting_size_bytes`,
`/godebug/non-default-behavior/execerrdot:events`: `go_godebug_execerrdot_events`,
`/godebug/non-default-behavior/http2client:events`: `go_godebug_http2client_events`,
`/godebug/non-default-behavior/http2server:events`: `go_godebug_http2server_events`,
`/godebug/non-default-behavior/installgoroot:events`: `go_godebug_installgoroot_events`,
`/godebug/non-default-behavior/panicnil:events`: `go_godebug_panicnil_events`,
`/godebug/non-default-behavior/randautoseed:events`: `go_godebug_randautoseed_events`,
`/godebug/non-default-behavior/tarinsecurepath:events`: `go_godebug_trainsecurepath_events`,
`/godebug/non-default-behavior/x509sha1:events`: `go_godebug_x509sha1_events`,
`/godebug/non-default-behavior/x509usefallbackroots:events`: `go_godebug_x509usefallbackroots_events`,
`/godebug/non-default-behavior/zipinsecurepath:events`: `go_godebug_zipinsecurepath_events`,
`/memory/classes/heap/free:bytes`: `go_memory_heap_free_bytes`,
`/memory/classes/heap/objects:bytes`: `go_memory_heap_objects_bytes`,
`/memory/classes/heap/released:bytes`: `go_memory_heap_released_bytes`,
`/memory/classes/heap/stacks:bytes`: `go_memory_heap_stacks_bytes`,
`/memory/classes/heap/unused:bytes`: `go_memory_heap_unused_bytes`,
`/memory/classes/metadata/mcache/free:bytes`: `go_memory_metadata_mcache_free_bytes`,
`/memory/classes/metadata/mcache/inuse:bytes`: `go_memory_metadata_mcache_inuse_bytes`,
`/memory/classes/metadata/mspan/free:bytes`: `go_memory_metadata_mspan_free_bytes`,
`/memory/classes/metadata/mspan/inuse:bytes`: `go_memory_metadata_mspan_inuse_bytes`,
`/memory/classes/metadata/other:bytes`: `go_memory_metadata_other_bytes`,
`/memory/classes/os-stacks:bytes`: `go_memory_os_stacks_bytes`,
`/memory/classes/other:bytes`: `go_memory_other_bytes`,
`/memory/classes/profiling/buckets:bytes`: `go_memory_profiling_buckets_bytes`,
`/memory/classes/total:bytes`: `go_memory_total_bytes`,
`/sched/gomaxprocs:threads`: `go_sched_gomaxprocs`,
`/sched/goroutines:goroutines`: `go_sched_goroutines`,
`/sched/latencies:seconds`: `go_sched_latencies_seconds`,
`/sync/mutex/wait/total:seconds`: `go_sync_mutex_wait_total_seconds`,
}
RuntimeMetricsAlias defines the alias for each runtime/metrics name, to be exported by GoMetricsCollector.
Functions ¶
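func ContextForwardWait ¶
func ContextForwardWait(ctx context.Context) context.Context
ContextForwardWait returns a context that makes Agent.Forward or Agent.BulkForward wait until the forwarding has finished.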
Types ¶
type Agent ¶
type Agent struct {
// contains filtered or unexported fields
}
Agent is responsible for collecting and forwarding the metrics.
func NewAgent ¶
func NewAgent(opts AgentOptions) (agent *Agent)
NewAgent creates, initializes, and runs a new Agent. The agent starts collecting the metrics in the background every [AgentOptions.Interval] and forwards them to each Forwarder.
func (*Agent) BulkForward ¶
BulkForward pushes a list of Metric asynchronously. If ctx was created with ContextForwardWait, the metrics are forwarded synchronously.
type AgentOptions ¶
type AgentOptions struct {
	// Name of the agent.
	Name string

	// Metadata provides static, additional information to be forwarded
	// along with the collected metrics.
	Metadata *Metadata

	// Timestamp defines the function to be called to set the
	// [Metric.Timestamp].
	// Defaults to NanoTimestamp.
	Timestamp Timestamper

	// Collectors contains the list of Collector that provide the metrics
	// to be forwarded.
	// An empty Collectors means no metrics will be collected and
	// forwarded.
	Collectors []Collector

	// Forwarders contains the list of targets where the collected
	// metrics will be forwarded.
	Forwarders []Forwarder

	// Interval for collecting metrics.
	// The default value is one minute and the minimum value is
	// 10 seconds.
	Interval time.Duration
}
AgentOptions contains options to run the Agent.
type BufferForwarder ¶
BufferForwarder writes the metrics to an underlying bytes.Buffer.
Example ¶
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/shuLhan/share/lib/telemetry"
)

func main() {
	// Create the Formatter and Forwarder.
	var (
		dsvFmt = telemetry.NewDsvFormatter(';', telemetry.RuntimeMetricsAlias)
		bufFwd = telemetry.NewBufferForwarder(dsvFmt)
	)

	// Create metadata.
	var md = telemetry.NewMetadata()
	md.Set(`name`, `BufferForwarder`)
	md.Set(`version`, `0.1.0`)

	// Create the Agent.
	var (
		agentOpts = telemetry.AgentOptions{
			Metadata:   md,
			Forwarders: []telemetry.Forwarder{bufFwd},
			Timestamp:  telemetry.DummyTimestamp(),
		}
		agent = telemetry.NewAgent(agentOpts)
	)
	defer agent.Stop()

	// Forward single metric and print the result.
	var (
		m = telemetry.Metric{
			Name:  `usage`,
			Value: 0.5,
		}
		ctx = telemetry.ContextForwardWait(context.Background())
		err error
	)

	err = agent.Forward(ctx, m)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf(`%s`, bufFwd.Bytes())

	// Forward list of Metric and print the result.
	var list = []telemetry.Metric{
		{Name: `usage`, Value: 0.4},
		{Name: `usage`, Value: 0.3},
	}
	err = agent.BulkForward(ctx, list)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf(`%s`, bufFwd.Bytes())
}
Output:
1678606568;"usage";0.500000;"name=BufferForwarder,version=0.1.0"
1678606568;"usage";0.400000;"name=BufferForwarder,version=0.1.0"
1678606568;"usage";0.300000;"name=BufferForwarder,version=0.1.0"
func NewBufferForwarder ¶
func NewBufferForwarder(f Formatter) *BufferForwarder
NewBufferForwarder creates a new BufferForwarder using f as its Formatter.
func (*BufferForwarder) Bytes ¶
func (buf *BufferForwarder) Bytes() (b []byte)
Bytes returns the metrics that have been written to the Buffer. Once this method is called, the underlying Buffer is reset.
func (*BufferForwarder) Close ¶
func (buf *BufferForwarder) Close() error
Close on Buffer is a no-op.
func (*BufferForwarder) Formatter ¶
func (buf *BufferForwarder) Formatter() Formatter
Formatter returns the Formatter used by this BufferForwarder.
type Collector ¶
type Collector interface {
	// Collect the metrics with timestamp.
	Collect(timestamp int64) []Metric
}
Collector provides an interface to collect the metrics.
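As an illustration of implementing this interface, the sketch below defines a custom collector that reports the current number of goroutines. To stay self-contained it declares local Metric and Collector types that mirror the documented ones; the type goroutineCollector and the metric name app_goroutines are hypothetical.

```go
package main

import (
	"fmt"
	"runtime"
)

// Metric and Collector mirror the documented telemetry types so that the
// sketch compiles without importing the package.
type Metric struct {
	Name      string
	Value     float64
	Timestamp int64
}

type Collector interface {
	Collect(timestamp int64) []Metric
}

// goroutineCollector is a hypothetical Collector that reports the number
// of goroutines that currently exist.
type goroutineCollector struct{}

func (goroutineCollector) Collect(timestamp int64) []Metric {
	return []Metric{{
		Name:      `app_goroutines`,
		Value:     float64(runtime.NumGoroutine()),
		Timestamp: timestamp,
	}}
}

func main() {
	var col Collector = goroutineCollector{}
	for _, m := range col.Collect(1678606568) {
		fmt.Printf("%d %s %.0f\n", m.Timestamp, m.Name, m.Value)
	}
}
```

With the real package, a value implementing Collect would be listed in AgentOptions.Collectors.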
type DsvFormatter ¶
type DsvFormatter struct {
// contains filtered or unexported fields
}
DsvFormatter formats a Metric as a single line in which the values are separated by a single character. The fields are written in the following order:
Timestamp SEP Name SEP Value *(SEP metadata)
metadata = Metadata.Key "=" Metadata.Value *("," metadata)
The Name, Value, and metadata are enclosed in double quotes.
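The layout can be sketched with the standard library alone. The function formatDsv and the type kv below are hypothetical and are not the package's implementation. The sketch follows the BufferForwarder example output, in which the value appears unquoted with six decimal places, and it performs no escaping of quotes or separators inside the fields.

```go
package main

import (
	"fmt"
	"strings"
)

// kv is a hypothetical key-value pair standing in for one Metadata entry.
type kv struct{ Key, Value string }

// formatDsv writes one line: Timestamp SEP "Name" SEP Value, followed by
// SEP and the quoted, comma-joined metadata when there is any.
func formatDsv(sep rune, ts int64, name string, value float64, md []kv) string {
	var sb strings.Builder
	fmt.Fprintf(&sb, `%d%c"%s"%c%f`, ts, sep, name, sep, value)
	if len(md) > 0 {
		pairs := make([]string, 0, len(md))
		for _, e := range md {
			pairs = append(pairs, e.Key+`=`+e.Value)
		}
		fmt.Fprintf(&sb, `%c"%s"`, sep, strings.Join(pairs, `,`))
	}
	sb.WriteByte('\n')
	return sb.String()
}

func main() {
	md := []kv{{`name`, `BufferForwarder`}, {`version`, `0.1.0`}}
	fmt.Print(formatDsv(';', 1678606568, `usage`, 0.5, md))
}
```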
func NewDsvFormatter ¶
func NewDsvFormatter(sep rune, metricsAlias map[string]string) (dsv *DsvFormatter)
NewDsvFormatter creates a new DsvFormatter using sep as the separator, with the option to change the metric output names using metricsAlias. See RuntimeMetricsAlias for an example.
func (*DsvFormatter) BulkFormat ¶
func (dsv *DsvFormatter) BulkFormat(listm []Metric, md *Metadata) []byte
BulkFormat formats a list of Metric with the Metadata.
func (*DsvFormatter) Format ¶
func (dsv *DsvFormatter) Format(m Metric, md *Metadata) []byte
Format formats a single Metric into a single DSV line.
func (*DsvFormatter) Name ¶
func (dsv *DsvFormatter) Name() string
Name returns the name of the DsvFormatter, "dsv".
type FileForwarder ¶
type FileForwarder struct {
// contains filtered or unexported fields
}
FileForwarder forwards the raw metrics into a file.
func NewFileForwarder ¶
func NewFileForwarder(fmt Formatter, file io.WriteCloser) (fwd *FileForwarder)
NewFileForwarder creates a new FileForwarder using fmt as the Formatter.
func (*FileForwarder) Close ¶
func (fwd *FileForwarder) Close() (err error)
Close closes the underlying file. Calling Forward after closing the Forwarder may cause a panic.
func (*FileForwarder) Formatter ¶
func (fwd *FileForwarder) Formatter() Formatter
Formatter returns the Formatter that is used by this FileForwarder.
func (*FileForwarder) Forward ¶
func (fwd *FileForwarder) Forward(wire []byte) (err error)
Forward writes the formatted metrics into the file.
type Formatter ¶
type Formatter interface {
	// BulkFormat format list of Metric with metadata for transfer.
	BulkFormat(listm []Metric, md *Metadata) []byte

	// Format the Metric m and metadata for transfer.
	Format(m Metric, md *Metadata) []byte

	// Name return the name of formatter.
	Name() string
}
Formatter defines the interface responsible for converting a single Metric or a list of Metric into its wire format.
type Forwarder ¶
type Forwarder interface {
	// Implement the Close and Write from package [io].
	// Calling Forward after Close may cause panic.
	io.WriteCloser

	// Formatter return the Formatter being used to format the metrics.
	Formatter() Formatter
}
Forwarder provides the interface to be implemented by a forwarder in order to store the collected metrics.
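To show how the Formatter and Forwarder interfaces fit together, here is a self-contained sketch with a toy formatter and a forwarder that only counts the bytes it receives. All types are local mirrors or hypothetical (plainFormatter, countingForwarder, the empty Metadata placeholder); none of them are part of the package.

```go
package main

import (
	"fmt"
	"io"
)

// Local mirrors of the documented types, so the sketch is self-contained.
type Metric struct {
	Name      string
	Value     float64
	Timestamp int64
}

type Metadata struct{} // placeholder; the real type is a versioned map

type Formatter interface {
	BulkFormat(listm []Metric, md *Metadata) []byte
	Format(m Metric, md *Metadata) []byte
	Name() string
}

type Forwarder interface {
	io.WriteCloser
	Formatter() Formatter
}

// plainFormatter is a hypothetical formatter that writes "name=value\n".
type plainFormatter struct{}

func (plainFormatter) Format(m Metric, _ *Metadata) []byte {
	return []byte(fmt.Sprintf("%s=%g\n", m.Name, m.Value))
}

func (f plainFormatter) BulkFormat(listm []Metric, md *Metadata) []byte {
	var out []byte
	for _, m := range listm {
		out = append(out, f.Format(m, md)...)
	}
	return out
}

func (plainFormatter) Name() string { return `plain` }

// countingForwarder is a hypothetical forwarder that discards the wire
// data and only counts how many bytes were written.
type countingForwarder struct {
	fmtr  Formatter
	total int
}

func (fwd *countingForwarder) Write(wire []byte) (int, error) {
	fwd.total += len(wire)
	return len(wire), nil
}

func (fwd *countingForwarder) Close() error         { return nil }
func (fwd *countingForwarder) Formatter() Formatter { return fwd.fmtr }

func main() {
	var fwd Forwarder = &countingForwarder{fmtr: plainFormatter{}}
	wire := fwd.Formatter().Format(Metric{Name: `usage`, Value: 0.5}, nil)
	n, err := fwd.Write(wire)
	fmt.Println(n, err)
}
```

The split keeps the wire format independent of the destination: the caller asks the forwarder for its formatter, formats the metrics, then writes the result.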
type GoMemStatsCollector ¶
type GoMemStatsCollector struct {
// contains filtered or unexported fields
}
GoMemStatsCollector collects Go statistics about the memory allocator, as reported by runtime.ReadMemStats.
This collector exports the following metric names, each taking its value from the corresponding field of runtime.MemStats:
- go_memstats_alloc_bytes: runtime.MemStats.Alloc
- go_memstats_total_alloc_bytes: runtime.MemStats.TotalAlloc
- go_memstats_sys_bytes: runtime.MemStats.Sys
- go_memstats_lookups: runtime.MemStats.Lookups
- go_memstats_mallocs_objects: runtime.MemStats.Mallocs
- go_memstats_frees_objects: runtime.MemStats.Frees
- go_memstats_heap_alloc_bytes: runtime.MemStats.HeapAlloc
- go_memstats_heap_sys_bytes: runtime.MemStats.HeapSys
- go_memstats_heap_idle_bytes: runtime.MemStats.HeapIdle
- go_memstats_heap_inuse_bytes: runtime.MemStats.HeapInuse
- go_memstats_heap_released_bytes: runtime.MemStats.HeapReleased
- go_memstats_heap_objects: runtime.MemStats.HeapObjects
- go_memstats_stack_inuse_bytes: runtime.MemStats.StackInuse
- go_memstats_stack_sys_bytes: runtime.MemStats.StackSys
- go_memstats_mspan_inuse_bytes: runtime.MemStats.MSpanInuse
- go_memstats_mspan_sys_bytes: runtime.MemStats.MSpanSys
- go_memstats_mcache_inuse_bytes: runtime.MemStats.MCacheInuse
- go_memstats_mcache_sys_bytes: runtime.MemStats.MCacheSys
- go_memstats_buck_hash_sys_bytes: runtime.MemStats.BuckHashSys
- go_memstats_gc_sys_bytes: runtime.MemStats.GCSys
- go_memstats_other_sys_bytes: runtime.MemStats.OtherSys
- go_memstats_gc_next_bytes: runtime.MemStats.NextGC
- go_memstats_gc_last: runtime.MemStats.LastGC
- go_memstats_pause_total_ns: runtime.MemStats.PauseTotalNs
- go_memstats_pause_ns: runtime.MemStats.PauseNs
- go_memstats_pause_end_ns: runtime.MemStats.PauseEnd
- go_memstats_gc_num: runtime.MemStats.NumGC
- go_memstats_gc_forced_num: runtime.MemStats.NumForcedGC
- go_memstats_gc_cpu_fraction: runtime.MemStats.GCCPUFraction
func NewGoMemStatsCollector ¶
func NewGoMemStatsCollector(filter *regexp.Regexp) (col *GoMemStatsCollector)
NewGoMemStatsCollector creates a new MemStats collector with the option to filter the metrics by name using a regular expression.
If filter is nil, none of the metrics will be collected.
func (*GoMemStatsCollector) Collect ¶
func (col *GoMemStatsCollector) Collect(ts int64) (list []Metric)
Collect the Go MemStats.
type GoMetricsCollector ¶
type GoMetricsCollector struct {
// contains filtered or unexported fields
}
GoMetricsCollector collects the metrics using runtime/metrics.Read.
func NewGoMetricsCollector ¶
func NewGoMetricsCollector(filter *regexp.Regexp) (col *GoMetricsCollector)
NewGoMetricsCollector creates a new collector for runtime/metrics with the option to filter specific metrics by alias name using a regular expression.
For example, to collect all metrics pass the regex `^.*$`; to collect only the memory metrics pass `^go_memory_.*$`. A nil filter means no metrics will be collected.
func (*GoMetricsCollector) Collect ¶
func (col *GoMetricsCollector) Collect(timestamp int64) (list []Metric)
Collect the runtime/metrics.
type IlpFormatter ¶
type IlpFormatter struct {
// contains filtered or unexported fields
}
IlpFormatter formats the Metric using the Influxdata Line Protocol (ILP). Syntax:
ILP = measurement [METADATA] " " METRIC [" " timestamp] LF
METADATA = *("," key "=" value)
METRIC = key "=" value *("," METRIC)
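The grammar above can be turned into a small formatting function. The sketch below is hypothetical (formatIlp and kv are not package names), writes a single metric per line, and omits the escaping that the line protocol requires for spaces, commas, and equals signs inside keys and values.

```go
package main

import (
	"fmt"
	"strings"
)

// kv is a hypothetical key-value pair standing in for one Metadata entry.
type kv struct{ Key, Value string }

// formatIlp writes: measurement *("," key "=" value) " " name "=" value
// [" " timestamp] LF. The timestamp is omitted when ts is not positive.
func formatIlp(measurement string, md []kv, name string, value float64, ts int64) string {
	var sb strings.Builder
	sb.WriteString(measurement)
	for _, e := range md {
		fmt.Fprintf(&sb, `,%s=%s`, e.Key, e.Value)
	}
	fmt.Fprintf(&sb, ` %s=%f`, name, value)
	if ts > 0 {
		fmt.Fprintf(&sb, ` %d`, ts)
	}
	sb.WriteByte('\n')
	return sb.String()
}

func main() {
	md := []kv{{`name`, `agent`}}
	fmt.Print(formatIlp(`host`, md, `usage`, 0.5, 1678606568))
}
```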
func NewIlpFormatter ¶
func NewIlpFormatter(measurement string) (ilp *IlpFormatter)
NewIlpFormatter creates and initializes a new IlpFormatter.
func (*IlpFormatter) BulkFormat ¶
func (ilp *IlpFormatter) BulkFormat(list []Metric, md *Metadata) []byte
BulkFormat formats a list of Metric with the metadata.
func (*IlpFormatter) Format ¶
func (ilp *IlpFormatter) Format(m Metric, md *Metadata) []byte
Format formats a single Metric.
func (*IlpFormatter) Name ¶
func (ilp *IlpFormatter) Name() string
Name returns the unique name of the formatter, "ilp".
type Metadata ¶
type Metadata struct {
// contains filtered or unexported fields
}
Metadata provides a versioned map with stable key order.
Example ¶
package main

import (
	"fmt"

	"github.com/shuLhan/share/lib/telemetry"
)

func main() {
	var md = telemetry.NewMetadata()

	// The new Metadata has version=0.
	fmt.Println(md.Version(), md.String(), ".")

	// Setting a key increases the version to 1
	md.Set(`host`, `localhost`)
	fmt.Println(md.Version(), md.String(), ".")

	// ... even if the key already exists.
	md.Set(`host`, `my.localhost`)
	fmt.Println(md.Version(), md.String(), ".")

	// Deleting a key increases the version too.
	md.Delete(`host`)
	fmt.Println(md.Version(), md.String(), ".")

	// But if the key does not exist, the version is not increased.
	md.Delete(`host`)
	fmt.Println(md.Version(), md.String(), ".")
}
Output:
0  .
1 host=localhost .
2 host=my.localhost .
3  .
3  .
func NewMetadata ¶
func NewMetadata() (md *Metadata)
NewMetadata creates and initializes a new Metadata.
func (*Metadata) Delete ¶
func (md *Metadata) Delete(key string)
Delete removes the key from the Metadata. The version is increased only if the key exists.
func (*Metadata) Set ¶
func (md *Metadata) Set(key, value string)
Set stores the key with its value into the Metadata. This method always increases the version.
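The versioning rules for Set and Delete can be modelled with a small type. The sketch below is a hypothetical stand-in for Metadata, not its implementation: it assumes that "stable key order" means lexicographically sorted keys, and it is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// metadata is a hypothetical versioned map.
type metadata struct {
	vals    map[string]string
	version int
}

func newMetadata() *metadata {
	return &metadata{vals: map[string]string{}}
}

// Set always increases the version, even when the key already exists.
func (md *metadata) Set(key, value string) {
	md.vals[key] = value
	md.version++
}

// Delete increases the version only if the key exists.
func (md *metadata) Delete(key string) {
	if _, ok := md.vals[key]; !ok {
		return
	}
	delete(md.vals, key)
	md.version++
}

// String joins the pairs as "k=v,k=v".
// Assumption: keys are ordered lexicographically.
func (md *metadata) String() string {
	keys := make([]string, 0, len(md.vals))
	for k := range md.vals {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, k+`=`+md.vals[k])
	}
	return strings.Join(pairs, `,`)
}

func main() {
	md := newMetadata()
	md.Set(`version`, `0.1.0`)
	md.Set(`name`, `agent`)
	md.Delete(`missing`)
	fmt.Println(md.version, md.String())
}
```

A version counter like this lets a consumer cache the formatted metadata and rebuild it only when the version changes.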
type Metric ¶
type Metric struct {
	Name      string  `json:"name"`
	Value     float64 `json:"value"`
	Timestamp int64   `json:"timestamp"`
}
Metric contains name, value, and timestamp.
type QuestdbForwarder ¶
type QuestdbForwarder struct {
// contains filtered or unexported fields
}
QuestdbForwarder forwards the metrics to QuestDB using TCP.
func NewQuestdbForwarder ¶
func NewQuestdbForwarder(opts QuestdbOptions) (fwd *QuestdbForwarder, err error)
NewQuestdbForwarder creates a new forwarder for QuestDB.
func (*QuestdbForwarder) Close ¶
func (fwd *QuestdbForwarder) Close() (err error)
Close closes the connection to the QuestDB server.
func (*QuestdbForwarder) Formatter ¶
func (fwd *QuestdbForwarder) Formatter() Formatter
Formatter returns the Formatter used by this QuestdbForwarder.
type QuestdbOptions ¶
type QuestdbOptions struct {
	// Fmt is the Formatter used to convert the Metric.
	// Usually set to IlpFormatter.
	Fmt Formatter

	// ServerUrl defines the QuestDB server URL.
	// Currently, it only supports the TCP scheme using the following
	// format "tcp://<host>:<port>".
	ServerUrl string

	// Timeout defines the default timeout for Write.
	// Defaults to 10 seconds.
	Timeout time.Duration
	// contains filtered or unexported fields
}
QuestdbOptions contains the options for QuestdbForwarder.
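Validating a ServerUrl of the form "tcp://<host>:<port>" might look like the following sketch, which uses only net/url. The function parseServerUrl is hypothetical, and the address and port in main are example values; the package's own validation is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// parseServerUrl checks that raw has the form "tcp://<host>:<port>" and
// returns the "host:port" part.
func parseServerUrl(raw string) (addr string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return ``, err
	}
	if u.Scheme != `tcp` {
		return ``, fmt.Errorf(`unsupported scheme %q`, u.Scheme)
	}
	if u.Hostname() == `` || u.Port() == `` {
		return ``, errors.New(`missing host or port`)
	}
	return u.Host, nil
}

func main() {
	// Example address only.
	fmt.Println(parseServerUrl(`tcp://127.0.0.1:9009`))
}
```

The returned address could then be passed to net.DialTimeout("tcp", addr, timeout) to open the connection with a bounded wait.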
type StdoutForwarder ¶
type StdoutForwarder struct {
// contains filtered or unexported fields
}
StdoutForwarder writes the metrics to os.Stdout. This type serves as an example and as a wrapper for os.Stdout, since users should not call Close on os.Stdout.
func NewStdoutForwarder ¶
func NewStdoutForwarder(f Formatter) *StdoutForwarder
NewStdoutForwarder creates a new StdoutForwarder using f as its Formatter.
func (*StdoutForwarder) Close ¶
func (stdout *StdoutForwarder) Close() error
Close on StdoutForwarder syncs os.Stdout.
func (*StdoutForwarder) Formatter ¶
func (stdout *StdoutForwarder) Formatter() Formatter
Formatter returns the Formatter used by this StdoutForwarder.
type Timestamper ¶
type Timestamper func() int64
Timestamper is a function type that generates a timestamp.
func DummyTimestamp ¶
func DummyTimestamp() Timestamper
DummyTimestamp returns a Timestamper that always yields the fixed epoch 1678606568; for testing only.
func MilliTimestamp ¶
func MilliTimestamp() Timestamper
MilliTimestamp returns a Timestamper that yields the number of milliseconds elapsed since January 1, 1970 UTC.
func NanoTimestamp ¶
func NanoTimestamp() Timestamper
NanoTimestamp returns a Timestamper that yields the number of nanoseconds elapsed since January 1, 1970 UTC.
func SecondTimestamp ¶
func SecondTimestamp() Timestamper
SecondTimestamp returns a Timestamper that yields the number of seconds elapsed since January 1, 1970 UTC.
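Because a Timestamper is just a func() int64, constructors like the ones above can be written as closures. The sketch below re-declares the type locally and uses hypothetical lowercase names; it illustrates the pattern and is not the package's source.

```go
package main

import (
	"fmt"
	"time"
)

// Timestamper mirrors the documented type.
type Timestamper func() int64

// fixedTimestamp returns a Timestamper that always yields epoch, which
// keeps test output deterministic.
func fixedTimestamp(epoch int64) Timestamper {
	return func() int64 { return epoch }
}

// secondTimestamp yields the current Unix time in seconds.
func secondTimestamp() Timestamper {
	return func() int64 { return time.Now().Unix() }
}

// nanoTimestamp yields the current Unix time in nanoseconds.
func nanoTimestamp() Timestamper {
	return func() int64 { return time.Now().UnixNano() }
}

func main() {
	var ts Timestamper = fixedTimestamp(1678606568)
	fmt.Println(ts())

	ts = secondTimestamp()
	fmt.Println(ts() > 0)
}
```

Choosing the resolution at construction time lets the same agent code serve targets that expect seconds, milliseconds, or nanoseconds.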
Source Files ¶
- agent.go
- agent_options.go
- buffer_forwarder.go
- collector.go
- context.go
- dsv_formatter.go
- file_forwarder.go
- formatter.go
- forwarder.go
- go_memstats_collector.go
- go_metrics_collector.go
- ilp_formatter.go
- metadata.go
- metric.go
- questdb_forwarder.go
- questdb_options.go
- stdout_forwarder.go
- telemetry.go
- timestamper.go
Directories ¶
Path | Synopsis |
---|---|
internal | |
cmd/agent-example | Program agent-example provides an example of how to create an agent. |