aggregator

package
v1.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2025 License: AGPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewAggregatorCollector

func NewAggregatorCollector[T any](aggregator *Aggregator[T], prefix string) prometheus.Collector

func RegisterAggregatorCollector

func RegisterAggregatorCollector[T any](aggregator *Aggregator[T], reg prometheus.Registerer)

RegisterAggregatorCollector registers aggregator metrics collector.

Types

type AggregateFn

type AggregateFn[T any] func(T) (T, error)

type AggregationResult

type AggregationResult[T any] struct {
	// contains filtered or unexported fields
}

func (*AggregationResult[T]) Close

func (r *AggregationResult[T]) Close(err error)

Close notifies all the contributors about the error encountered. Owner of the aggregated result must propagate any processing error happened with the value.

func (*AggregationResult[T]) Handler

func (r *AggregationResult[T]) Handler() func() (T, error)

Handler returns a handler of the aggregated result. The handler is nil, if it has already been acquired. The returned function is synchronous and blocks for up to the aggregation period duration.

func (*AggregationResult[T]) Value

func (r *AggregationResult[T]) Value() (v T, ok bool)

Value returns the aggregated value and indicates whether the caller owns it.

func (*AggregationResult[T]) Wait

func (r *AggregationResult[T]) Wait() error

Wait blocks until the aggregation finishes. The block duration never exceeds aggregation period.

type Aggregator

type Aggregator[T any] struct {
	// contains filtered or unexported fields
}

Aggregator aggregates values within a time window over a period of time.

func NewAggregator

func NewAggregator[T any](window, period time.Duration) *Aggregator[T]

func (*Aggregator[T]) Aggregate

func (a *Aggregator[T]) Aggregate(key uint64, timestamp int64, fn AggregateFn[T]) (*AggregationResult[T], bool, error)

func (*Aggregator[T]) Start

func (a *Aggregator[T]) Start()

func (*Aggregator[T]) Stop

func (a *Aggregator[T]) Stop()

Stop the aggregator. It does not wait for ongoing aggregations to complete as no aggregation requests expected during shutdown.

type Limits

type Limits interface {
	DistributorAggregationWindow(tenantID string) model.Duration
	DistributorAggregationPeriod(tenantID string) model.Duration
}

type MultiTenantAggregator

type MultiTenantAggregator[T any] struct {
	*services.BasicService
	// contains filtered or unexported fields
}

func NewMultiTenantAggregator

func NewMultiTenantAggregator[T any](limits Limits, registerer prometheus.Registerer) *MultiTenantAggregator[T]

func (*MultiTenantAggregator[T]) AggregatorForTenant

func (m *MultiTenantAggregator[T]) AggregatorForTenant(tenantID string) (*Aggregator[T], bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL