adaptive

package
v1.61.0
Published: Sep 15, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

README

Adaptive Sampling

Adaptive sampling works in Jaeger collector by observing the spans received from services and recalculating sampling probabilities for each service/endpoint combination to ensure that the volume of collected traces matches the desired target of traces per second. When a new service or endpoint is detected, it is initially sampled with "initial-sampling-probability" until enough data is collected to calculate the rate appropriate for the traffic going through the endpoint.

Adaptive sampling requires a storage backend to store the observed traffic data and computed probabilities. At the moment, memory (for all-in-one deployments), Cassandra, Badger, Elasticsearch, and OpenSearch are supported as sampling storage backends.

Note: adaptive sampling in Jaeger backend does not actually do the sampling. The sampling is performed by OTEL SDKs, and sampling decisions are propagated through trace context. The job of Adaptive Sampling is to dynamically calculate sampling probabilities and expose them as sampling strategies via Jaeger's Remote Sampling protocol.
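For example, an OTel Go SDK can be configured to poll these strategies with the jaegerremote sampler. The sketch below is only a minimal illustration; the endpoint URL, refresh interval, and initial sampling ratio are assumptions that should be adjusted for your deployment.

package tracingdemo

import (
	"time"

	"go.opentelemetry.io/contrib/samplers/jaegerremote"
	"go.opentelemetry.io/otel/sdk/trace"
)

// newTracerProvider builds an SDK tracer provider whose sampler periodically
// polls the Jaeger remote sampling endpoint for the strategies computed by
// adaptive sampling. The URL, interval, and initial ratio are assumptions.
func newTracerProvider() *trace.TracerProvider {
	sampler := jaegerremote.New(
		"my-service", // service name used to look up strategies
		jaegerremote.WithSamplingServerURL("http://localhost:5778/sampling"),
		jaegerremote.WithSamplingRefreshInterval(time.Minute),
		// Used until the first strategy is fetched, similar in spirit to the
		// "initial sampling probability" on the backend side.
		jaegerremote.WithInitialSampler(trace.TraceIDRatioBased(0.001)),
	)
	return trace.NewTracerProvider(trace.WithSampler(sampler))
}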

Implementation details

There are three main components of Adaptive Sampling: the Aggregator, the Post-aggregator (could use a better name), and the Provider.

Aggregator

Aggregator is a component that runs in the ingestion pipeline (e.g. as a trace processor in the OTEL Collector). It inspects all spans passing through that instance of the collector and looks for root spans. Each root span indicates a new trace, so the aggregator counts those traces (grouped by service name and span name) and periodically flushes the counts (called "throughput") to storage.
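The sketch below illustrates the idea with hypothetical types (it is not this package's internal API): treat spans without a parent as new traces, count them per service/operation, and periodically flush the counts as throughput.

package main

import (
	"fmt"
	"sync"
)

// span is a hypothetical, minimal stand-in for a span in the ingestion pipeline.
type span struct {
	Service      string
	Operation    string
	ParentSpanID string // empty => root span, i.e. the start of a new trace
}

// throughputAggregator counts root spans per service/operation, the way the
// real aggregator accumulates "throughput" before flushing it to storage.
type throughputAggregator struct {
	mu     sync.Mutex
	counts map[string]map[string]int64 // service -> operation -> trace count
}

func (a *throughputAggregator) observe(s span) {
	if s.ParentSpanID != "" {
		return // only root spans indicate new traces
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.counts[s.Service] == nil {
		a.counts[s.Service] = map[string]int64{}
	}
	a.counts[s.Service][s.Operation]++
}

// flush stands in for the periodic write of aggregated throughput to the
// sampling storage backend; here it just prints and resets the counts.
func (a *throughputAggregator) flush() {
	a.mu.Lock()
	defer a.mu.Unlock()
	for svc, ops := range a.counts {
		for op, n := range ops {
			fmt.Printf("throughput %s / %s = %d\n", svc, op, n)
		}
	}
	a.counts = map[string]map[string]int64{}
}

func main() {
	agg := &throughputAggregator{counts: map[string]map[string]int64{}}
	agg.observe(span{Service: "frontend", Operation: "GET /home"})                      // root span: counted
	agg.observe(span{Service: "frontend", Operation: "GET /home", ParentSpanID: "abc"}) // child span: ignored
	agg.flush()
}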

Post-aggregator

Post-aggregator contains the main logic responsible for the adaptive part of this sampling strategy implementation. Its job is to load all throughput data from storage (because multiple collector instances could have written different aggregates), combine it into a final aggregate, and compute the desired sampling probabilities, which are then written back to storage.

In typical production usage a Jaeger deployment consists of many collectors. Each collector runs an independent aggregator; they require no coordination as long as they share a storage backend. Each collector also runs a post-aggregator, but only one of them should be combining the output of all aggregators and producing the final sampling probabilities. This is achieved via a simple leader-follower election with the help of the storage backend. The leader post-aggregator performs the main computation, while the follower post-aggregators only load the throughput from storage and aggregate it in memory, so that each of them is ready to assume the leader role if needed; they do not compute probabilities or write them back to storage.
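The snippet below is a deliberately simplified illustration of the feedback idea only: scale the previous probability by the ratio of target to currently sampled rate, then clamp. The actual post-aggregator additionally uses a weighted QPS over several buckets, a delta tolerance band, and per-operation bookkeeping.

package main

import "fmt"

// nextProbability is a simplified version of the adaptive update: scale the
// previous probability so the sampled rate approaches the target
// samples-per-second, then clamp to [minProb, 1.0]. This is not the exact
// formula used by this package.
func nextProbability(prevProb, observedQPS, targetSPS, minProb float64) float64 {
	if observedQPS <= 0 {
		return prevProb // no traffic observed; keep the current probability
	}
	if prevProb < minProb {
		prevProb = minProb // avoid dividing by a zero sampling rate
	}
	sampledSPS := prevProb * observedQPS // roughly how many traces/sec are sampled now
	p := prevProb * (targetSPS / sampledSPS)
	if p < minProb {
		p = minProb
	}
	if p > 1.0 {
		p = 1.0
	}
	return p
}

func main() {
	// An endpoint receiving 200 traces/sec, currently sampled at 1%, with a
	// target of 1 sample/sec: the probability is lowered to 0.5%.
	fmt.Println(nextProbability(0.01, 200, 1, 0.00001)) // 0.005
}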

Provider

Provider is responsible for providing the sampling strategy to the SDKs when they poll the /sampling endpoint. It periodically reads the computed sampling probabilities from storage and translates them into sampling strategy output expected by the Jaeger Remote Sampling protocol.
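The sketch below shows what a poll of the remote sampling endpoint roughly looks like from a client's perspective. The URL and the JSON field names are assumptions based on the Jaeger Remote Sampling protocol and may differ in your deployment.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// Partial mirror of the Remote Sampling JSON response; only the fields used
// below are declared, and their names are assumptions based on the protocol.
type samplingStrategyResponse struct {
	OperationSampling struct {
		DefaultSamplingProbability float64 `json:"defaultSamplingProbability"`
		PerOperationStrategies     []struct {
			Operation             string `json:"operation"`
			ProbabilisticSampling struct {
				SamplingRate float64 `json:"samplingRate"`
			} `json:"probabilisticSampling"`
		} `json:"perOperationStrategies"`
	} `json:"operationSampling"`
}

func main() {
	// The host and port are deployment-specific.
	resp, err := http.Get("http://localhost:5778/sampling?service=frontend")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var strategies samplingStrategyResponse
	if err := json.NewDecoder(resp.Body).Decode(&strategies); err != nil {
		log.Fatal(err)
	}
	fmt.Println("default:", strategies.OperationSampling.DefaultSamplingProbability)
	for _, s := range strategies.OperationSampling.PerOperationStrategies {
		fmt.Printf("%s -> %.6f\n", s.Operation, s.ProbabilisticSampling.SamplingRate)
	}
}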

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddFlags

func AddFlags(flagSet *flag.FlagSet)

AddFlags adds flags for Options

func FloatEquals

func FloatEquals(a, b float64) bool

FloatEquals compares two floats to a precision of 10 decimal places.

func NewAggregator

func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (samplingstrategy.Aggregator, error)

NewAggregator creates a throughput aggregator that simply emits metrics about the number of operations seen over the aggregationInterval.

func TruncateFloat

func TruncateFloat(v float64) string

TruncateFloat truncates a float to 6 decimal places and converts it to a string.
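A small usage sketch of these two helpers, based on the signatures above; the import path is an assumption and may differ between Jaeger versions.

package main

import (
	"fmt"

	// Import path is an assumption; it has moved between Jaeger versions.
	"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)

func main() {
	p := 0.0333333333333337
	fmt.Println(adaptive.TruncateFloat(p))             // "0.033333" (6 decimal places)
	fmt.Println(adaptive.FloatEquals(p, 0.0333333333)) // true: the difference is below the precision
}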

Types

type Factory

type Factory struct {
	// contains filtered or unexported fields
}

Factory implements samplingstrategy.Factory for an adaptive strategy store.

func NewFactory

func NewFactory() *Factory

NewFactory creates a new Factory.

func (*Factory) AddFlags

func (*Factory) AddFlags(flagSet *flag.FlagSet)

AddFlags implements plugin.Configurable

func (*Factory) Close

func (f *Factory) Close() error

Close closes the factory.

func (*Factory) CreateStrategyProvider

func (f *Factory) CreateStrategyProvider() (samplingstrategy.Provider, samplingstrategy.Aggregator, error)

CreateStrategyProvider implements samplingstrategy.Factory

func (*Factory) InitFromViper

func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger)

InitFromViper implements plugin.Configurable

func (*Factory) Initialize

func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error

Initialize implements samplingstrategy.Factory
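A possible wiring of the Factory, based on the methods above. The import paths and the source of the SamplingStoreFactory are assumptions; in a real deployment this wiring is performed by the Jaeger collector itself.

package samplingdemo

import (
	"flag"

	"github.com/spf13/viper"
	"go.uber.org/zap"

	"github.com/jaegertracing/jaeger/pkg/metrics"
	"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
	"github.com/jaegertracing/jaeger/storage"
)

// buildAdaptive sketches the Factory lifecycle: flags, viper, Initialize,
// then CreateStrategyProvider. ssFactory is assumed to come from the
// configured sampling storage backend.
func buildAdaptive(v *viper.Viper, logger *zap.Logger, ssFactory storage.SamplingStoreFactory) error {
	f := adaptive.NewFactory()

	// Register the adaptive sampling flags and read their values from viper.
	f.AddFlags(flag.NewFlagSet("adaptive", flag.ContinueOnError))
	f.InitFromViper(v, logger)

	if err := f.Initialize(metrics.NullFactory, ssFactory, logger); err != nil {
		return err
	}

	provider, aggregator, err := f.CreateStrategyProvider()
	if err != nil {
		return err
	}
	_ = provider   // serve via the Remote Sampling endpoint
	_ = aggregator // plug into the span ingestion pipeline
	// Call f.Close() on shutdown.
	return nil
}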

type Options

type Options struct {
	// TargetSamplesPerSecond is the global target rate of samples per operation.
	// TODO implement manual overrides per service/operation.
	TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"`

	// DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target)
	// throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation)
	// means that if abs((actual-expected) / expected) < 0.3, then the actual sampling rate is "close enough"
	// and the system does not need to send an updated sampling probability (the "control signal" u(t)
	// in the PID Controller terminology) to the sampler in the application.
	//
	// Increase this to reduce the amount of fluctuation in the calculated probabilities.
	DeltaTolerance float64 `mapstructure:"delta_tolerance"`

	// CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute,
	// new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth
	// of aggregated throughput data.
	CalculationInterval time.Duration `mapstructure:"calculation_interval"`

	// AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if
	// the CalculationInterval is 1 minute (each bucket contains 1 minute of throughput data) and the
	// AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for
	// all operations.
	// TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice.
	AggregationBuckets int `mapstructure:"aggregation_buckets"`

	// BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS,
	// ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS.
	BucketsForCalculation int `mapstructure:"calculation_buckets"`

	// Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval
	// is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time
	// we'll have [now()-12m,now()-2m] range of throughput data in memory to base the calculations
	// off of. This delay is necessary to counteract the rate at which the jaeger clients poll for
	// the latest sampling probabilities. The default client poll rate is 1 minute, which means that
	// during any 1 minute interval, the clients will be fetching new probabilities in a uniformly
	// distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can
	// guarantee that all clients can use the latest calculated probabilities for at least 1 minute.
	Delay time.Duration `mapstructure:"calculation_delay"`

	// InitialSamplingProbability is the initial sampling probability for all new operations.
	InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"`

	// MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling
	// probability will be in the range [MinSamplingProbability, 1.0].
	MinSamplingProbability float64 `mapstructure:"min_sampling_probability"`

	// MinSamplesPerSecond determines the min number of traces that are sampled per second.
	// For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do
	// its best to sample at least one trace a minute for an operation. This is useful for low QPS operations
	// that may never be sampled by the probabilistic sampler.
	MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"`

	// LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before
	// attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval
	// to reduce lock thrashing.
	LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"`

	// FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower
	// (ie. failed to gain the leader lock).
	FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"`
}

Options holds configuration for the adaptive sampling strategy store. The abbreviation SPS refers to "samples-per-second", which is the target of the optimization/control implemented by the adaptive sampling.

func DefaultOptions added in v1.60.0

func DefaultOptions() Options
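A sketch of configuring Options programmatically, starting from DefaultOptions. The specific values are illustrative only, and the import path is an assumption.

package main

import (
	"fmt"
	"time"

	// Import path is an assumption; adjust to your Jaeger version.
	"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)

func main() {
	// Start from the defaults and override a few knobs; the values below are
	// illustrative, not recommendations.
	opts := adaptive.DefaultOptions()
	opts.TargetSamplesPerSecond = 2.0       // aim for ~2 sampled traces/sec per operation
	opts.CalculationInterval = time.Minute  // recompute probabilities once a minute
	opts.AggregationBuckets = 10            // keep 10 one-minute buckets in memory
	opts.Delay = 2 * time.Minute            // give pollers time to pick up new probabilities
	opts.InitialSamplingProbability = 0.001 // used until enough throughput is observed
	fmt.Printf("%+v\n", opts)
}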

func (*Options) InitFromViper

func (opts *Options) InitFromViper(v *viper.Viper) *Options

InitFromViper initializes Options with properties from viper

type PostAggregator

type PostAggregator struct {
	sync.RWMutex
	Options
	// contains filtered or unexported fields
}

PostAggregator retrieves service throughput over a lookback interval and calculates sampling probabilities per operation such that each operation is sampled at a specified target QPS. It achieves this by retrieving discrete buckets of operation throughput, computing a weighted average of that throughput, and generating a probability to match the target QPS.

func (*PostAggregator) Start

func (p *PostAggregator) Start() error

Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities.

type Provider

type Provider struct {
	sync.RWMutex
	Options
	// contains filtered or unexported fields
}

Provider is responsible for providing sampling strategies for services. It periodically loads sampling probabilities from storage and converts them into sampling strategies that are cached and served to clients. Provider relies on sampling probabilities being periodically updated by the aggregator & post-aggregator.

func NewProvider

func NewProvider(options Options, logger *zap.Logger, participant leaderelection.ElectionParticipant, store samplingstore.Store) *Provider

NewProvider creates a strategy store that holds adaptive sampling strategies.

func (*Provider) Close

func (p *Provider) Close() error

Close stops the service from loading probabilities and generating strategies.

func (*Provider) GetSamplingStrategy

func (p *Provider) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error)

GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service.
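A minimal sketch of querying a running Provider for one service, assuming it was created and started elsewhere; the import path is an assumption, and the response is the protobuf api_v2.SamplingStrategyResponse.

package samplingdemo

import (
	"context"
	"fmt"

	// Import path is an assumption; it has moved between Jaeger versions.
	"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)

// printStrategy assumes p was constructed (e.g. via NewProvider or the Factory),
// Start()-ed, and that probabilities have already been computed and stored.
func printStrategy(ctx context.Context, p *adaptive.Provider) error {
	resp, err := p.GetSamplingStrategy(ctx, "frontend")
	if err != nil {
		return err
	}
	// resp is an *api_v2.SamplingStrategyResponse; adaptive per-operation
	// probabilities, when present, are carried in its OperationSampling field.
	fmt.Println(resp.GetOperationSampling())
	return nil
}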

func (*Provider) Start

func (p *Provider) Start() error

Start initializes and starts the sampling service which regularly loads sampling probabilities and generates strategies.

type SamplingCache

type SamplingCache map[string]map[string]*SamplingCacheEntry

SamplingCache is a nested map: service -> operation -> cache entry.

func (SamplingCache) Get

func (s SamplingCache) Get(service, operation string) *SamplingCacheEntry

Get retrieves the entry for given service/operation. Returns nil if not found.

func (SamplingCache) Set

func (s SamplingCache) Set(service, operation string, entry *SamplingCacheEntry)

Set adds a new entry for given service/operation.
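A usage sketch of SamplingCache based on the signatures above; the import path is an assumption.

package main

import (
	"fmt"

	// Import path is an assumption; adjust to your Jaeger version.
	"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)

func main() {
	cache := make(adaptive.SamplingCache)
	cache.Set("frontend", "GET /home", &adaptive.SamplingCacheEntry{
		Probability:   0.002,
		UsingAdaptive: true,
	})

	if e := cache.Get("frontend", "GET /home"); e != nil {
		fmt.Println(e.Probability, e.UsingAdaptive) // 0.002 true
	}
	fmt.Println(cache.Get("frontend", "GET /checkout") == nil) // true: not found
}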

type SamplingCacheEntry

type SamplingCacheEntry struct {
	Probability   float64
	UsingAdaptive bool
}

SamplingCacheEntry keeps track of the probability and whether a service-operation is observed using adaptive sampling.

type WeightVectorCache

type WeightVectorCache struct {
	sync.Mutex
	// contains filtered or unexported fields
}

WeightVectorCache stores normalizing weight vectors of different lengths. The head of each weight vector contains the largest weight.

func NewWeightVectorCache

func NewWeightVectorCache() *WeightVectorCache

NewWeightVectorCache returns a new weights vector cache.

func (*WeightVectorCache) GetWeights

func (c *WeightVectorCache) GetWeights(length int) []float64

GetWeights returns weights for the specified length { w(i) = i ^ 4, i=1..L }, normalized.
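For example, per the documented formula, GetWeights(3) should produce raw weights 81, 16, 1 (largest first), normalized by their sum 98 to approximately 0.8265, 0.1633, 0.0102. A usage sketch follows; the import path is an assumption.

package main

import (
	"fmt"

	// Import path is an assumption; adjust to your Jaeger version.
	"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)

func main() {
	c := adaptive.NewWeightVectorCache()
	w := c.GetWeights(3)
	// Per the documented formula w(i) = i^4 with the largest weight first,
	// the normalized weights should be approximately [0.8265, 0.1633, 0.0102].
	fmt.Println(w)
}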

