streamaggr

package
v0.13.0-victorialogs Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregators

type Aggregators struct {
	// contains filtered or unexported fields
}

Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.

func LoadFromFile

func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, error)

LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.

opts can contain additional options. If opts is nil, then default options are used.

The returned Aggregators must be stopped with MustStop() when no longer needed.

func (*Aggregators) Equal added in v1.90.0

func (a *Aggregators) Equal(b *Aggregators) bool

Equal returns true if a and b are initialized from identical configs.

func (*Aggregators) IsEnabled added in v1.97.7

func (a *Aggregators) IsEnabled() bool

IsEnabled returns true if Aggregators has at least one configured aggregator

func (*Aggregators) MustStop

func (a *Aggregators) MustStop()

MustStop stops a.

func (*Aggregators) Push

func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) []byte

Push pushes tss to a.

Push sets matchIdxs[idx] to 1 if the corresponding tss[idx] was used in aggregations. Otherwise matchIdxs[idx] is set to 0.

Push returns matchIdxs with len equal to len(tss). It re-uses the matchIdxs if it has enough capacity to hold len(tss) items. Otherwise it allocates new matchIdxs.

type Config

type Config struct {
	// Match is a label selector for filtering time series for the given selector.
	//
	// If the match isn't set, then all the input time series are processed.
	Match *promrelabel.IfExpression `yaml:"match,omitempty"`

	// Interval is the interval between aggregations.
	Interval string `yaml:"interval"`

	// NoAlighFlushToInterval disables aligning of flushes to multiples of Interval.
	// By default flushes are aligned to Interval.
	//
	// See also FlushOnShutdown.
	NoAlignFlushToInterval *bool `yaml:"no_align_flush_to_interval,omitempty"`

	// FlushOnShutdown defines whether to flush incomplete aggregation state on startup and shutdown.
	// By default incomplete aggregation state is dropped, since it may confuse users.
	FlushOnShutdown *bool `yaml:"flush_on_shutdown,omitempty"`

	// DedupInterval is an optional interval for deduplication.
	DedupInterval string `yaml:"dedup_interval,omitempty"`

	// Staleness interval is interval after which the series state will be reset if no samples have been sent during it.
	// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
	StalenessInterval string `yaml:"staleness_interval,omitempty"`

	// Outputs is a list of output aggregate functions to produce.
	//
	// The following names are allowed:
	//
	// - rate_sum - calculates sum of rate for input counters
	// - rate_avg - calculates average of rate for input counters
	// - total - aggregates input counters
	// - total_prometheus - aggregates input counters, ignoring the first sample in new time series
	// - increase - calculates the increase over input series
	// - increase_prometheus - calculates the increase over input series, ignoring the first sample in new time series
	// - count_series - counts the number of unique input series
	// - count_samples - counts the input samples
	// - unique_samples - counts the number of unique sample values
	// - sum_samples - sums the input sample values
	// - last - the last biggest sample value
	// - min - the minimum sample value
	// - max - the maximum sample value
	// - avg - the average value across all the samples
	// - stddev - standard deviation across all the samples
	// - stdvar - standard variance across all the samples
	// - histogram_bucket - creates VictoriaMetrics histogram for input samples
	// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
	//
	// The output time series will have the following names by default:
	//
	//   input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
	//
	// See also KeepMetricNames
	//
	Outputs []string `yaml:"outputs"`

	// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
	KeepMetricNames *bool `yaml:"keep_metric_names,omitempty"`

	// IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval.
	IgnoreOldSamples *bool `yaml:"ignore_old_samples,omitempty"`

	// IgnoreFirstIntervals sets number of aggregation intervals to be ignored on start.
	IgnoreFirstIntervals *int `yaml:"ignore_first_intervals,omitempty"`

	// By is an optional list of labels for grouping input series.
	//
	// See also Without.
	//
	// If neither By nor Without are set, then the Outputs are calculated
	// individually per each input time series.
	By []string `yaml:"by,omitempty"`

	// Without is an optional list of labels, which must be excluded when grouping input series.
	//
	// See also By.
	//
	// If neither By nor Without are set, then the Outputs are calculated
	// individually per each input time series.
	Without []string `yaml:"without,omitempty"`

	// DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples.
	//
	// Labels are dropped before de-duplication and aggregation.
	DropInputLabels *[]string `yaml:"drop_input_labels,omitempty"`

	// InputRelabelConfigs is an optional relabeling rules, which are applied on the input
	// before aggregation.
	InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`

	// OutputRelabelConfigs is an optional relabeling rules, which are applied
	// on the aggregated output before being sent to remote storage.
	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}

Config is a configuration for a single stream aggregation.

type Deduplicator added in v1.97.7

type Deduplicator struct {
	// contains filtered or unexported fields
}

Deduplicator deduplicates samples per each time series.

func NewDeduplicator added in v1.97.7

func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string) *Deduplicator

NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.

The de-duplicated samples are passed to pushFunc once per dedupInterval.

An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples. Common case is to drop `replica`-like labels from samples received from HA datasources.

MustStop must be called on the returned deduplicator in order to free up occupied resources.

func (*Deduplicator) MustStop added in v1.97.7

func (d *Deduplicator) MustStop()

MustStop stops d.

func (*Deduplicator) Push added in v1.97.7

func (d *Deduplicator) Push(tss []prompbmarshal.TimeSeries)

Push pushes tss to d.

type Options added in v1.97.7

type Options struct {
	// DedupInterval is deduplication interval for samples received for the same time series.
	//
	// The last sample per each series is left per each DedupInterval if DedupInterval > 0.
	//
	// By default deduplication is disabled.
	//
	// The deduplication can be set up individually per each aggregation via dedup_interval option.
	DedupInterval time.Duration

	// DropInputLabels is an optional list of labels to drop from samples before de-duplication and stream aggregation.
	DropInputLabels []string

	// NoAlignFlushToInterval disables alignment of flushes to the aggregation interval.
	//
	// By default flushes are aligned to aggregation interval.
	//
	// The alignment of flushes can be disabled individually per each aggregation via no_align_flush_to_interval option.
	NoAlignFlushToInterval bool

	// FlushOnShutdown enables flush of incomplete aggregation state on startup and shutdown.
	//
	// By default incomplete state is dropped.
	//
	// The flush of incomplete state can be enabled individually per each aggregation via flush_on_shutdown option.
	FlushOnShutdown bool

	// KeepMetricNames instructs to leave metric names as is for the output time series without adding any suffix.
	//
	// By default the following suffix is added to every output time series:
	//
	//     input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
	//
	// This option can be overridden individually per each aggregation via keep_metric_names option.
	KeepMetricNames bool

	// IgnoreOldSamples instructs to ignore samples with timestamps older than the current aggregation interval.
	//
	// By default all the samples are taken into account.
	//
	// This option can be overridden individually per each aggregation via ignore_old_samples option.
	IgnoreOldSamples bool

	// IgnoreFirstIntervals sets amount of aggregation intervals to ignore on start.
	//
	// By default, no intervals will be ignored.
	//
	// This option can be overridden individually per each aggregation via ignore_first_intervals option.
	IgnoreFirstIntervals int
}

Options contains optional settings for the Aggregators.

type PushFunc

type PushFunc func(tss []prompbmarshal.TimeSeries)

PushFunc is called by Aggregators when it needs to push its state to metrics storage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL