telemetry

package
v0.9.0 Latest
Published: Dec 15, 2024 License: MIT Imports: 18 Imported by: 2

README

/pkg/telemetry

[!NOTE] Asyncmachine-go is an AOP Actor Model library for distributed workflows, built on top of a clock-based state machine. It has atomic transitions, subscriptions, RPC, logging, TUI debugger, metrics, tracing, and soon diagrams.

/pkg/telemetry provides several telemetry exporters and a Grafana dashboard:

dbg

dbg is a simple telemetry exporter used by the am-dbg TUI Debugger. It delivers DbgMsg and DbgMsgStruct messages via the standard net/rpc package, so they can also be consumed by a custom client.
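
To illustrate what "delivered via standard net/rpc" means in practice, here is a minimal stdlib-only sketch of an RPC receiver and a caller talking over an in-memory pipe. The names TxMsg, Sink and the Sink.Push method are hypothetical; they are not the am-dbg wire protocol, whose method names and payloads are not described on this page.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// TxMsg is a hypothetical stand-in for a telemetry message.
// It is NOT the real DbgMsgTx wire format.
type TxMsg struct {
	MachineID string
	Accepted  bool
}

// Sink is a hypothetical RPC receiver collecting incoming messages.
type Sink struct {
	Received []TxMsg
}

// Push has the shape net/rpc requires: an exported method with an argument,
// a pointer reply, and an error result.
func (s *Sink) Push(msg TxMsg, ack *bool) error {
	s.Received = append(s.Received, msg)
	*ack = true
	return nil
}

// roundTrip serves sink on one end of an in-memory pipe and sends msg from
// the other end, returning the acknowledgement.
func roundTrip(sink *Sink, msg TxMsg) (bool, error) {
	srv := rpc.NewServer()
	if err := srv.Register(sink); err != nil {
		return false, err
	}
	serverConn, clientConn := net.Pipe()
	go srv.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var ack bool
	err := client.Call("Sink.Push", msg, &ack)
	return ack, err
}

func main() {
	sink := &Sink{}
	ack, err := roundTrip(sink, TxMsg{MachineID: "mach1", Accepted: true})
	if err != nil {
		panic(err)
	}
	fmt.Println("acknowledged:", ack, "messages:", len(sink.Received))
}
```

A real custom client would replace net.Pipe with a TCP listener and use the message types exported by this package.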

OpenTelemetry Traces

The OpenTelemetry traces integration exposes a machine's states and transitions as OTel traces, compatible with Jaeger. Tracers are inherited from parent machines and form a tree.

Tree Structure
- mach:ID
  - states
    - Foo
      - Foo (trace)
      - Foo (trace)
      - ...
    - ...
  - transitions
    - [add] Foo
      - FooEnter (trace)
      - FooState (trace)
      - ...
    - ...
  - submachines
    - mach:ID2
      - ...
    - ...

You can import an existing asset into your Jaeger instance for an interactive demo.

Otel tracing Setup
import (
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
    "go.opentelemetry.io/otel/trace"
)

// ...

var mach *am.Machine
var tracer trace.Tracer

machTracer := amtele.NewOtelMachTracer(tracer, &amtele.OtelMachTracerOpts{
    SkipTransitions: false,
})
mach.BindTracer(machTracer)

OpenTelemetry Logger

The OpenTelemetry logger integration exposes a machine's logs (at any level) in the structured OTLP format. It can be very handy for stdout logging.

import (
    "go.opentelemetry.io/otel/sdk/log"
    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
)

// ...

var mach *am.Machine
var logExporter log.Exporter

// activate logs
mach.SetLogLevel(am.LogLevelOps)

// create a log provider
logProvider := amtele.NewOtelLoggerProvider(logExporter)
// bind provider to a machine
amtele.BindOtelLogger(mach, logProvider, "myserviceid")

Prometheus Metrics

pkg/telemetry/prometheus binds to a machine's transitions, collects values within a defined interval, and exposes averaged metrics. Use it with the provided Grafana dashboard. Tracers are inherited from parent machines.

Prometheus Setup
import (
    "time"

    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amprom "github.com/pancsta/asyncmachine-go/pkg/telemetry/prometheus"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/push"
)

// ...

var mach *am.Machine
var promRegistry *prometheus.Registry
var promPusher *push.Pusher

// bind transition to metrics
metrics := amprom.TransitionsToPrometheus(mach, 5 * time.Minute)

// bind metrics to either a registry or a pusher
amprom.BindToRegistry(promRegistry)
amprom.BindToPusher(promPusher)
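
The "collect values within an interval, then expose averages" idea mentioned for the Prometheus exporter can be sketched with the standard library alone. IntervalAvg below is a hypothetical illustration of that pattern, not the package's actual collector.

```go
package main

import (
	"fmt"
	"sync"
)

// IntervalAvg accumulates samples and reports their mean on Flush.
// Hypothetical helper, not part of pkg/telemetry/prometheus.
type IntervalAvg struct {
	mu    sync.Mutex
	sum   float64
	count int
}

// Observe records one sample, e.g. the number of steps in a transition.
func (a *IntervalAvg) Observe(v float64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.sum += v
	a.count++
}

// Flush returns the mean of the samples seen since the last Flush and
// resets the accumulator. It returns 0 when no samples were observed.
func (a *IntervalAvg) Flush() float64 {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.count == 0 {
		return 0
	}
	avg := a.sum / float64(a.count)
	a.sum, a.count = 0, 0
	return avg
}

func main() {
	var steps IntervalAvg
	for _, v := range []float64{2, 4, 6} {
		steps.Observe(v)
	}
	// In a real exporter, a time.Ticker would trigger Flush once per interval.
	fmt.Println(steps.Flush()) // prints 4
	fmt.Println(steps.Flush()) // prints 0
}
```

A longer interval smooths the exposed values at the cost of resolution, which is the trade-off behind the interval argument in the setup above.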

Loki Logger

Loki is the easiest way to persist distributed logs from asyncmachine. You'll need a promtail client.

import (
    "github.com/ic2hrmk/promtail"

    am "github.com/pancsta/asyncmachine-go/pkg/machine"
    amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
)

// ...

var mach *am.Machine
var service string

// init promtail and bind AM logger
identifiers := map[string]string{
    "service_name": service,
}
promtailClient, err := promtail.NewJSONv1Client("localhost:3100", identifiers)
if err != nil {
    panic(err)
}
defer promtailClient.Close()
amtele.BindLokiLogger(mach, promtailClient)

Grafana Dashboard

Grafana dashboards need to be generated per "source" (e.g. a process) by passing all monitored machine IDs and the source name (service_name for Loki, job for Prometheus); see am-gen grafana. The generator can optionally auto-sync the dashboard using K-Phoen/grabana (requires GRAFANA_TOKEN).

am-gen grafana \
  --name "My Dashboard" \
  --ids MyMach1,MyMach2 \
  --source service_name_or_job \
  --grafana-url http://localhost:3000

Panels:

  • Number of transitions
  • Transition Mutations
    • Queue size
    • States added
    • States removed
    • States touched
  • Transition Details
    • Transition ticks
    • Number of steps
    • Number of handlers
  • States & Relations
    • Number of states
    • Number of relations
    • Referenced states
    • Active states
    • Inactive states
  • Average Transition Time
  • Errors
  • Log view

Inheritance

Most exporters are automatically inherited from parent machines, so child machines report without extra setup. To define a parent-child relationship, pass am.Opts.Parent when initializing a machine. Alternatively, tracers can be copied using OptsWithParentTracers, or manually via Machine.Tracers.
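
As a rough mental model of inheritance, a child machine can be thought of as copying those parent tracers that report themselves as inheritable (OtelMachTracer exposes an Inheritable method). The sketch below is stdlib-only; Tracer, Mach and newChild are hypothetical and simplified, and the real logic lives in the machine package.

```go
package main

import "fmt"

// Tracer is a hypothetical, reduced tracer interface.
type Tracer interface {
	Name() string
	Inheritable() bool
}

// namedTracer is a trivial Tracer used for the demonstration.
type namedTracer struct {
	name        string
	inheritable bool
}

func (t namedTracer) Name() string      { return t.name }
func (t namedTracer) Inheritable() bool { return t.inheritable }

// Mach is a hypothetical stand-in for a machine holding tracers.
type Mach struct {
	ID      string
	Tracers []Tracer
}

// newChild creates a machine and copies only the parent's inheritable
// tracers, mimicking what passing a parent in the options is assumed to do.
func newChild(id string, parent *Mach) *Mach {
	child := &Mach{ID: id}
	if parent == nil {
		return child
	}
	for _, t := range parent.Tracers {
		if t.Inheritable() {
			child.Tracers = append(child.Tracers, t)
		}
	}
	return child
}

func main() {
	parent := &Mach{ID: "parent", Tracers: []Tracer{
		namedTracer{name: "otel", inheritable: true},
		namedTracer{name: "local-only", inheritable: false},
	}}
	child := newChild("child", parent)
	for _, t := range child.Tracers {
		fmt.Println(child.ID, "inherited", t.Name())
	}
}
```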

Status

Testing, not semantically versioned.

monorepo

Go back to the monorepo root to continue reading.

Documentation

Overview

Package telemetry provides telemetry exporters for asyncmachine: am-dbg, Prometheus, and OpenTelemetry.

Index

Constants

const (
	// DbgAddr is the default address of the am-dbg server.
	DbgAddr = "localhost:6831"
	// EnvAmDbgAddr sets the address of a running am-dbg instance.
	// "localhost:6831" | "" (default)
	EnvAmDbgAddr = "AM_DBG_ADDR"
)
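
One way these two constants might be used together is to enable am-dbg telemetry only when the environment variable is set. The sketch below redeclares the constants locally so it runs with the standard library alone; dbgAddrFromEnv is a hypothetical helper, and treating an empty variable as "disabled" is an assumption based on the comment above.

```go
package main

import (
	"fmt"
	"os"
)

// Local copies of the package constants, so the sketch is self-contained.
const (
	DbgAddr      = "localhost:6831"
	EnvAmDbgAddr = "AM_DBG_ADDR"
)

// dbgAddrFromEnv returns the am-dbg address and whether telemetry should be
// enabled. An empty variable is treated as "disabled" (an assumption).
// The getenv parameter makes the function easy to test.
func dbgAddrFromEnv(getenv func(string) string) (string, bool) {
	addr := getenv(EnvAmDbgAddr)
	if addr == "" {
		return "", false
	}
	return addr, true
}

func main() {
	if addr, ok := dbgAddrFromEnv(os.Getenv); ok {
		// A real program would call amtele.TransitionsToDbg(mach, addr) here.
		fmt.Println("am-dbg telemetry enabled, addr:", addr)
	} else {
		fmt.Println("am-dbg telemetry disabled; default server address is", DbgAddr)
	}
}
```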

Variables

This section is empty.

Functions

func BindLokiLogger added in v0.8.0

func BindLokiLogger(mach am.Api, client promtail.Client)

func BindOtelLogger added in v0.8.0

func BindOtelLogger(
	mach am.Api, provider *ologsdk.LoggerProvider, service string,
)

BindOtelLogger binds an OpenTelemetry logger to a machine.

func NewOtelLoggerProvider added in v0.8.0

func NewOtelLoggerProvider(exporter ologsdk.Exporter) *ologsdk.LoggerProvider

NewOtelLoggerProvider creates a new OpenTelemetry logger provider bound to the given exporter.

func NormalizeId added in v0.8.0

func NormalizeId(id string) string

func TransitionsToDbg added in v0.7.0

func TransitionsToDbg(mach am.Api, addr string) error

TransitionsToDbg sends transitions to the am-dbg server.

Types

type DbgMsg added in v0.5.0

type DbgMsg interface {
	// Clock returns the state's clock, using the passed index
	Clock(statesIndex am.S, state string) uint64
	// Is returns true if the state is active, using the passed index
	Is(statesIndex am.S, states am.S) bool
}

DbgMsg is the interface for the messages to be sent to the am-dbg server.

type DbgMsgStruct added in v0.5.0

type DbgMsgStruct struct {
	// Machine ID
	ID string
	// state names defining the indexes for diffs
	StatesIndex am.S
	// all the states with relations
	States am.Struct
	// parent machine ID
	Parent string
	// machine tags
	Tags []string
}

DbgMsgStruct contains the state and relations data.

func (*DbgMsgStruct) Clock added in v0.5.0

func (d *DbgMsgStruct) Clock(_ am.S, _ string) uint64

func (*DbgMsgStruct) Is added in v0.5.0

func (d *DbgMsgStruct) Is(_ am.S, _ am.S) bool

type DbgMsgTx added in v0.5.0

type DbgMsgTx struct {
	MachineID string
	// Transition ID
	ID string
	// Clocks is [am.TimeAfter], state indexes to tick values after this tx
	// TODO refac to TimeAfter, re-gen all the assets
	Clocks am.Time
	// result of the transition
	Accepted bool
	// mutation type
	Type am.MutationType
	// called states
	// TODO index optimization
	CalledStates []string
	// TODO rename to CalledStates, re-gen all assets
	CalledStatesIdxs []int
	// all the transition steps
	Steps []*am.Step
	// log entries created during the transition
	LogEntries []*am.LogEntry
	// log entries before the transition, which happened after the prev one
	PreLogEntries []*am.LogEntry
	// transition was triggered by an auto state
	IsAuto bool
	// queue length at the start of the transition
	Queue int
	// Time is human time. Don't send this over the wire.
	// TODO remove, re-gen all the assets
	Time *time.Time
}

DbgMsgTx contains transition data.

func (*DbgMsgTx) ActiveStates added in v0.8.0

func (d *DbgMsgTx) ActiveStates(statesIndex am.S) am.S

func (*DbgMsgTx) CalledStateNames added in v0.8.0

func (d *DbgMsgTx) CalledStateNames(statesIndex am.S) am.S

func (*DbgMsgTx) Clock added in v0.5.0

func (d *DbgMsgTx) Clock(statesIndex am.S, state string) uint64

func (*DbgMsgTx) Index added in v0.8.0

func (d *DbgMsgTx) Index(statesIndex am.S, state string) int

func (*DbgMsgTx) Is added in v0.5.0

func (d *DbgMsgTx) Is(statesIndex am.S, states am.S) bool

func (*DbgMsgTx) Is1 added in v0.5.0

func (d *DbgMsgTx) Is1(statesIndex am.S, state string) bool

func (*DbgMsgTx) String added in v0.8.0

func (d *DbgMsgTx) String(statesIndex am.S) string

func (*DbgMsgTx) TimeSum added in v0.5.0

func (d *DbgMsgTx) TimeSum() uint64
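
The DbgMsgTx methods above all resolve state names through a states index, because Clocks holds tick values by position only. The following stdlib-only sketch shows how such lookups could work. S and Time are local stand-ins for am.S and am.Time, the lowercase functions are hypothetical, and "an odd tick means the state is active" is an assumption about asyncmachine's clock convention.

```go
package main

import "fmt"

// S and Time are local stand-ins for am.S (state names) and am.Time (ticks);
// both are assumed to share the same indexes.
type S []string
type Time []uint64

// index returns the position of state in statesIndex, or -1 when missing.
func index(statesIndex S, state string) int {
	for i, name := range statesIndex {
		if name == state {
			return i
		}
	}
	return -1
}

// clock returns the tick value of state, or 0 when the state is unknown.
func clock(statesIndex S, clocks Time, state string) uint64 {
	i := index(statesIndex, state)
	if i < 0 || i >= len(clocks) {
		return 0
	}
	return clocks[i]
}

// is1 reports whether state is active. ASSUMPTION: odd ticks mean active.
func is1(statesIndex S, clocks Time, state string) bool {
	return clock(statesIndex, clocks, state)%2 == 1
}

// timeSum adds up all ticks, giving a single "machine time" value.
func timeSum(clocks Time) uint64 {
	var sum uint64
	for _, t := range clocks {
		sum += t
	}
	return sum
}

func main() {
	states := S{"Foo", "Bar"}
	clocks := Time{3, 2}

	fmt.Println(clock(states, clocks, "Foo")) // prints 3
	fmt.Println(is1(states, clocks, "Foo"))   // prints true
	fmt.Println(is1(states, clocks, "Bar"))   // prints false
	fmt.Println(timeSum(clocks))              // prints 5
}
```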

type DbgTracer added in v0.7.0

type DbgTracer struct {
	*am.NoOpTracer

	Addr string
	Mach am.Api
	// contains filtered or unexported fields
}

func NewDbgTracer added in v0.8.0

func NewDbgTracer(mach am.Api, addr string) *DbgTracer

func (*DbgTracer) MachineDispose added in v0.8.0

func (t *DbgTracer) MachineDispose(id string)

func (*DbgTracer) MachineInit added in v0.8.0

func (t *DbgTracer) MachineInit(mach am.Api) context.Context

func (*DbgTracer) StructChange added in v0.7.0

func (t *DbgTracer) StructChange(mach am.Api, _ am.Struct)

func (*DbgTracer) TransitionEnd added in v0.7.0

func (t *DbgTracer) TransitionEnd(tx *am.Transition)

type OtelMachTracer added in v0.5.0

type OtelMachTracer struct {
	Tracer        trace.Tracer
	Machines      map[string]*OtelMachineData
	MachinesMx    sync.Mutex
	MachinesOrder []string
	// TODO bind to env var
	Logf func(format string, args ...any)
	// contains filtered or unexported fields
}

OtelMachTracer implements machine.Tracer for OpenTelemetry. It supports tracing of multiple state machines.

func NewOtelMachTracer added in v0.5.0

func NewOtelMachTracer(tracer trace.Tracer, opts *OtelMachTracerOpts,
) *OtelMachTracer

NewOtelMachTracer creates a new machine tracer from an OpenTelemetry tracer. Requires OtelMachTracer.Dispose to be called at the end.

func (*OtelMachTracer) End added in v0.5.0

func (ot *OtelMachTracer) End()

func (*OtelMachTracer) HandlerEnd added in v0.5.0

func (ot *OtelMachTracer) HandlerEnd(tx *am.Transition, _ string, _ string)

func (*OtelMachTracer) HandlerStart added in v0.5.0

func (ot *OtelMachTracer) HandlerStart(
	tx *am.Transition, emitter string, handler string,
)

func (*OtelMachTracer) Inheritable added in v0.5.0

func (ot *OtelMachTracer) Inheritable() bool

func (*OtelMachTracer) MachineDispose added in v0.5.0

func (ot *OtelMachTracer) MachineDispose(id string)

func (*OtelMachTracer) MachineInit added in v0.5.0

func (ot *OtelMachTracer) MachineInit(mach *am.Machine) context.Context

func (*OtelMachTracer) NewSubmachine added in v0.5.0

func (ot *OtelMachTracer) NewSubmachine(parent, mach *am.Machine)

NewSubmachine links 2 machines with a parent-child relationship.

func (*OtelMachTracer) QueueEnd added in v0.6.0

func (ot *OtelMachTracer) QueueEnd(*am.Machine)

func (*OtelMachTracer) TransitionEnd added in v0.5.0

func (ot *OtelMachTracer) TransitionEnd(tx *am.Transition)

func (*OtelMachTracer) TransitionInit added in v0.5.0

func (ot *OtelMachTracer) TransitionInit(tx *am.Transition)

type OtelMachTracerOpts added in v0.5.0

type OtelMachTracerOpts struct {
	// if true, only state changes will be traced
	SkipTransitions bool
	Logf            func(format string, args ...any)
}

type OtelMachineData added in v0.5.0

type OtelMachineData struct {
	ID string
	// handler ctx & span to be used for more detailed tracing inside handlers
	HandlerTrace context.Context
	HandlerSpan  trace.Span
	Lock         sync.Mutex
	Ended        bool
	// contains filtered or unexported fields
}

Directories

Path        Synopsis
prometheus  Package prometheus provides Prometheus metrics for asyncmachine.
