metrics

package
v3.0.0-...-7ba4d6b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 12 Imported by: 0

Documentation

Overview

Package metrics implements the Beam metrics API, described at http://s.apache.org/beam-metrics-api

Metrics in the Beam model are uniquely identified by a namespace, a name, and the PTransform context in which they are used. Further, they are reported as a delta against the bundle being processed, so that overcounting doesn't occur if a bundle needs to be retried. Each metric is scoped to their bundle, and ptransform.

Cells (or metric cells) are defined for each Beam model metric type, and the serve as concurrency safe storage of a given metric's values. Proxys are exported values representing the metric, for use in user ptransform code. They don't retain their cells, since they don't have the context to be able to store them for export back to the pipeline runner.

Metric cells aren't initialized until their first mutation, which follows from the Beam model design, where metrics are only sent for a bundle if they have changed. This is particularly convenient for distributions which means their min and max fields can be set to the first value on creation rather than have some marker of uninitialized state, which would otherwise need to be checked for on every update.

Metric values are implemented as lightweight proxies of the user provided namespace and name. This allows them to be declared globally, and used in any ParDo. Further, as per the design, they can be declared dynamically at runtime.

To handle reporting deltas on the metrics by bundle, metrics are keyed by bundleID,PTransformID,namespace, and name, so metrics that are identical except for bundles are treated as distinct, effectively providing per bundle deltas, since a new value cell is used per bundle.

Index

Constants

View Source
const (
	// StartBundle indicates starting state of a bundle
	StartBundle bundleProcState = 0
	// ProcessBundle indicates processing state of a bundle
	ProcessBundle bundleProcState = 1
	// FinishBundle indicates finishing state of a bundle
	FinishBundle bundleProcState = 2
	// TotalBundle (not a state) used for aggregating above states of a bundle
	TotalBundle bundleProcState = 3
)

Variables

This section is empty.

Functions

func DumpToLog

func DumpToLog(ctx context.Context)

DumpToLog is a debugging function that outputs all metrics available locally to beam.Log.

func DumpToLogFromStore

func DumpToLogFromStore(ctx context.Context, store *Store)

DumpToLogFromStore dumps the metrics in the provided Store to beam.Log.

func DumpToOutFromContext

func DumpToOutFromContext(ctx context.Context)

DumpToOutFromContext is a debugging function that outputs all metrics available locally to std out, extracting the metric store from the context.

func DumpToOutFromStore

func DumpToOutFromStore(store *Store)

DumpToOutFromStore is a debugging function that outputs all metrics available locally to std out directly from the store.

func GetBundleID

func GetBundleID(ctx context.Context) string

GetBundleID sources the Bundle's instruction ID from a context, if available.

For Beam internal use only. Subject to change.

func GetTransformID

func GetTransformID(ctx context.Context) string

GetTransformID sources the TransformID from a context, if available.

For Beam internal use only. Subject to change.

func SetBundleID

func SetBundleID(ctx context.Context, id string) context.Context

SetBundleID sets the id of the current Bundle, and populates the store.

func SetPTransformID

func SetPTransformID(ctx context.Context, id string) context.Context

SetPTransformID sets the id of the current PTransform. Must only be called on a context returned by SetBundleID.

Types

type BundleState

type BundleState struct {
	// contains filtered or unexported fields
}

BundleState stores information about a PTransform for execution time metrics.

func (BundleState) String

func (b BundleState) String() string

String implements the Stringer interface.

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter is a simple counter for incrementing and decrementing a value.

func NewCounter

func NewCounter(ns, n string) *Counter

NewCounter returns the Counter with the given namespace and name.

func (*Counter) Dec

func (m *Counter) Dec(ctx context.Context, v int64)

Dec decrements the counter within the given PTransform context by v.

func (*Counter) Inc

func (m *Counter) Inc(ctx context.Context, v int64)

Inc increments the counter within the given PTransform context by v.

func (*Counter) String

func (m *Counter) String() string

type CounterResult

type CounterResult struct {
	Attempted, Committed int64
	Key                  StepKey
}

CounterResult is an attempted and a commited value of a counter metric plus key.

func MergeCounters

func MergeCounters(
	attempted map[StepKey]int64,
	committed map[StepKey]int64) []CounterResult

MergeCounters combines counter metrics that share a common key.

func (CounterResult) Name

func (r CounterResult) Name() string

Name returns the Name of this Counter.

func (CounterResult) Namespace

func (r CounterResult) Namespace() string

Namespace returns the Namespace of this Counter.

func (CounterResult) Result

func (r CounterResult) Result() int64

Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).

func (CounterResult) Transform

func (r CounterResult) Transform() string

Transform returns the Transform step for this CounterResult.

type Distribution

type Distribution struct {
	// contains filtered or unexported fields
}

Distribution is a simple distribution of values.

func NewDistribution

func NewDistribution(ns, n string) *Distribution

NewDistribution returns the Distribution with the given namespace and name.

func (*Distribution) String

func (m *Distribution) String() string

func (*Distribution) Update

func (m *Distribution) Update(ctx context.Context, v int64)

Update updates the distribution within the given PTransform context with v.

type DistributionResult

type DistributionResult struct {
	Attempted, Committed DistributionValue
	Key                  StepKey
}

DistributionResult is an attempted and a commited value of a distribution metric plus key.

func MergeDistributions

func MergeDistributions(
	attempted map[StepKey]DistributionValue,
	committed map[StepKey]DistributionValue) []DistributionResult

MergeDistributions combines distribution metrics that share a common key.

func (DistributionResult) Name

func (r DistributionResult) Name() string

Name returns the Name of this Distribution.

func (DistributionResult) Namespace

func (r DistributionResult) Namespace() string

Namespace returns the Namespace of this Distribution.

func (DistributionResult) Result

Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).

func (DistributionResult) Transform

func (r DistributionResult) Transform() string

Transform returns the Transform step for this DistributionResult.

type DistributionValue

type DistributionValue struct {
	Count, Sum, Min, Max int64
}

DistributionValue is the value of a Distribution metric.

type ExecutionState

type ExecutionState struct {
	State        bundleProcState
	IsProcessing bool // set to true when sent as a response to ProcessBundleProgress Request
	TotalTime    time.Duration
}

ExecutionState stores the information about a bundle in a particular state.

func (ExecutionState) String

func (e ExecutionState) String() string

String implements the Stringer interface.

type Extractor

type Extractor struct {
	// SumInt64 extracts data from Sum Int64 counters.
	SumInt64 func(labels Labels, v int64)
	// DistributionInt64 extracts data from Distribution Int64 counters.
	DistributionInt64 func(labels Labels, count, sum, min, max int64)
	// GaugeInt64 extracts data from Gauge Int64 counters.
	GaugeInt64 func(labels Labels, v int64, t time.Time)

	// MsecsInt64 extracts data from StateRegistry of ExecutionState.
	// Extraction of Msec counters is experimental and subject to change.
	MsecsInt64 func(labels string, e *[4]ExecutionState)
}

Extractor allows users to access metrics programatically after pipeline completion. Users assign functions to fields that interest them, and that function is called for each metric of the associated kind.

func (Extractor) ExtractFrom

func (e Extractor) ExtractFrom(store *Store) error

ExtractFrom the given metrics Store all the metrics for populated function fields. Returns an error if no fields were set.

type Gauge

type Gauge struct {
	// contains filtered or unexported fields
}

Gauge is a time, value pair metric.

func NewGauge

func NewGauge(ns, n string) *Gauge

NewGauge returns the Gauge with the given namespace and name.

func (*Gauge) Set

func (m *Gauge) Set(ctx context.Context, v int64)

Set sets the gauge to the given value, and associates it with the current time on the clock.

func (*Gauge) String

func (m *Gauge) String() string

type GaugeResult

type GaugeResult struct {
	Attempted, Committed GaugeValue
	Key                  StepKey
}

GaugeResult is an attempted and a commited value of a gauge metric plus key.

func MergeGauges

func MergeGauges(
	attempted map[StepKey]GaugeValue,
	committed map[StepKey]GaugeValue) []GaugeResult

MergeGauges combines gauge metrics that share a common key.

func (GaugeResult) Name

func (r GaugeResult) Name() string

Name returns the Name of this Gauge.

func (GaugeResult) Namespace

func (r GaugeResult) Namespace() string

Namespace returns the Namespace of this Gauge.

func (GaugeResult) Result

func (r GaugeResult) Result() GaugeValue

Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).

func (GaugeResult) Transform

func (r GaugeResult) Transform() string

Transform returns the Transform step for this GaugeResult.

type GaugeValue

type GaugeValue struct {
	Value     int64
	Timestamp time.Time
}

GaugeValue is the value of a Gauge metric.

type Labels

type Labels struct {
	// contains filtered or unexported fields
}

Labels provide the context for the given metric.

func PCollectionLabels

func PCollectionLabels(pcollection string) Labels

PCollectionLabels builds a Labels for pcollection metrics. Intended for framework use.

func PTransformLabels

func PTransformLabels(transform string) Labels

PTransformLabels builds a Labels for transform metrics. Intended for framework use.

func UserLabels

func UserLabels(transform, namespace, name string) Labels

UserLabels builds a Labels for user metrics. Intended for framework use.

func (Labels) Map

func (l Labels) Map() map[string]string

Map produces a map of present labels to their values.

Returns nil map if invalid.

func (Labels) Name

func (l Labels) Name() string

Name returns the name for this metric.

func (Labels) Namespace

func (l Labels) Namespace() string

Namespace returns the namespace context for this metric.

func (Labels) PCollection

func (l Labels) PCollection() string

PCollection returns the PCollection id for this metric.

func (Labels) Transform

func (l Labels) Transform() string

Transform returns the transform context for this metric, if available.

type MsecResult

type MsecResult struct {
	Attempted, Committed MsecValue
	Key                  StepKey
}

MsecResult is an attempted and a commited value of a counter metric plus key.

func MergeMsecs

func MergeMsecs(
	attempted map[StepKey]MsecValue,
	committed map[StepKey]MsecValue) []MsecResult

MergeMsecs combines counter metrics that share a common key.

func (MsecResult) Name

func (r MsecResult) Name() string

Name returns the Name of this MsecResult.

func (MsecResult) Namespace

func (r MsecResult) Namespace() string

Namespace returns the Namespace of this MsecResult.

func (MsecResult) Result

func (r MsecResult) Result() MsecValue

Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).

func (MsecResult) Transform

func (r MsecResult) Transform() string

Transform returns the Transform step for this MsecResult.

type MsecValue

type MsecValue struct {
	Start, Process, Finish, Total time.Duration
}

MsecValue is the value of a single msec metric.

type PColResult

type PColResult struct {
	Attempted, Committed PColValue
	Key                  StepKey
}

PColResult is an attempted and a commited value of a pcollection metric plus key.

func MergePCols

func MergePCols(
	attempted map[StepKey]PColValue,
	committed map[StepKey]PColValue) []PColResult

MergePCols combines pcollection metrics that share a common key.

func (PColResult) Name

func (r PColResult) Name() string

Name returns the Name of this Pcollection Result.

func (PColResult) Namespace

func (r PColResult) Namespace() string

Namespace returns the Namespace of this Pcollection Result.

func (PColResult) Result

func (r PColResult) Result() PColValue

Result returns committed metrics. Falls back to attempted metrics if committed are not populated (e.g. due to not being supported on a given runner).

func (PColResult) Transform

func (r PColResult) Transform() string

Transform returns the Transform step for this Pcollection Result.

type PColValue

type PColValue struct {
	ElementCount    int64
	SampledByteSize DistributionValue
}

PColValue is the value of a single PCollection metric.

type PTransformState

type PTransformState struct {
	// contains filtered or unexported fields
}

PTransformState stores the state of PTransform for DoFn metrics.

func NewPTransformState

func NewPTransformState(pid string) *PTransformState

NewPTransformState creates a new PTransformState.

func (*PTransformState) Set

func (s *PTransformState) Set(ctx context.Context, state bundleProcState)

Set stores the state of PTransform in its bundle.

type QueryResults

type QueryResults struct {
	// contains filtered or unexported fields
}

QueryResults is the result of a query. Allows accessing all of the metrics that matched the filter.

func (QueryResults) Counters

func (qr QueryResults) Counters() []CounterResult

Counters returns a slice of counter metrics.

func (QueryResults) Distributions

func (qr QueryResults) Distributions() []DistributionResult

Distributions returns a slice of distribution metrics.

func (QueryResults) Gauges

func (qr QueryResults) Gauges() []GaugeResult

Gauges returns a slice of gauge metrics.

func (QueryResults) Msecs

func (qr QueryResults) Msecs() []MsecResult

Msecs returns a slice of DoFn metrics

func (QueryResults) PCols

func (qr QueryResults) PCols() []PColResult

PCols returns a slice of PCollection metrics.

type Results

type Results struct {
	// contains filtered or unexported fields
}

Results represents all metrics gathered during the job's execution. It allows for querying metrics using a provided filter.

func NewResults

func NewResults(
	counters []CounterResult,
	distributions []DistributionResult,
	gauges []GaugeResult,
	msecs []MsecResult,
	pCols []PColResult) *Results

NewResults creates a new Results.

func ResultsExtractor

func ResultsExtractor(ctx context.Context) Results

ResultsExtractor extracts the metrics.Results from Store using ctx. This is same as what metrics.dumperExtractor and metrics.dumpTo would do together.

func (Results) AllMetrics

func (mr Results) AllMetrics() QueryResults

AllMetrics returns all metrics from a Results instance.

func (Results) Query

func (mr Results) Query(f func(SingleResult) bool) QueryResults

Query allows metrics querying with filter. The filter takes the form of predicate function. Example:

qr = pr.Metrics().Query(func(mr beam.MetricResult) bool {
    return sr.Namespace() == test.namespace
})

type SingleResult

type SingleResult interface {
	Name() string
	Namespace() string
	Transform() string
}

SingleResult interface facilitates metrics query filtering methods.

type StateSampler

type StateSampler struct {
	// contains filtered or unexported fields
}

StateSampler tracks the state of a bundle.

func NewSampler

func NewSampler(store *Store) StateSampler

NewSampler creates a new state sampler.

func (*StateSampler) Sample

func (s *StateSampler) Sample(ctx context.Context, t time.Duration)

Sample checks for state transition in processing a DoFn

func (*StateSampler) SetLogInterval

func (s *StateSampler) SetLogInterval(t time.Duration)

SetLogInterval sets the logging interval for lull reporting.

type StepKey

type StepKey struct {
	Step, Name, Namespace string
}

StepKey uniquely identifies a metric within a pipeline graph.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store retains per transform countersets, intended for per bundle use.

func GetStore

func GetStore(ctx context.Context) *Store

GetStore extracts the metrics Store for the given context for a bundle.

Returns nil if the context doesn't contain a metric Store.

func (*Store) BundleState

func (b *Store) BundleState() string

BundleState returns the bundle state.

func (*Store) StateRegistry

func (b *Store) StateRegistry() string

StateRegistry returns the state registry that stores bundleID to executions states mapping.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL