adaptive

package
v1.46.0-tgt.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2023 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddFlags

func AddFlags(flagSet *flag.FlagSet)

AddFlags adds flags for Options

func FloatEquals

func FloatEquals(a, b float64) bool

FloatEquals compares two floats with 10 decimal positions precision.

func NewAggregator

func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator

NewAggregator creates a throughput aggregator that simply emits metrics about the number of operations seen over the aggregationInterval.

func TruncateFloat

func TruncateFloat(v float64) string

TruncateFloat truncates float to 6 decimal positions and converts to string.

Types

type Factory

type Factory struct {
	// contains filtered or unexported fields
}

Factory implements strategystore.Factory for an adaptive strategy store.

func NewFactory

func NewFactory() *Factory

NewFactory creates a new Factory.

func (*Factory) AddFlags

func (f *Factory) AddFlags(flagSet *flag.FlagSet)

AddFlags implements plugin.Configurable

func (*Factory) CreateStrategyStore

func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error)

CreateStrategyStore implements strategystore.Factory

func (*Factory) InitFromViper

func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger)

InitFromViper implements plugin.Configurable

func (*Factory) Initialize

func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error

Initialize implements strategystore.Factory

type Options

type Options struct {
	// TargetSamplesPerSecond is the global target rate of samples per operation.
	// TODO implement manual overrides per service/operation.
	TargetSamplesPerSecond float64

	// DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target)
	// throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation)
	// means that if abs((actual-expected) / expected) < 0.3, then the actual sampling rate is "close enough"
	// and the system does not need to send an updated sampling probability (the "control signal" u(t)
	// in the PID Controller terminology) to the sampler in the application.
	//
	// Increase this to reduce the amount of fluctuation in the calculated probabilities.
	DeltaTolerance float64

	// CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute,
	// new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth
	// of aggregated throughput data.
	CalculationInterval time.Duration

	// AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if
	// the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the
	// AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for
	// all operations.
	// TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice.
	AggregationBuckets int

	// BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS,
	// ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS.
	BucketsForCalculation int

	// Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval
	// is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time
	// we'll have [now()-12m,now()-2m] range of throughput data in memory to base the calculations
	// off of. This delay is necessary to counteract the rate at which the jaeger clients poll for
	// the latest sampling probabilities. The default client poll rate is 1 minute, which means that
	// during any 1 minute interval, the clients will be fetching new probabilities in a uniformly
	// distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can
	// guarantee that all clients can use the latest calculated probabilities for at least 1 minute.
	Delay time.Duration

	// InitialSamplingProbability is the initial sampling probability for all new operations.
	InitialSamplingProbability float64

	// MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling
	// probability will be in the range [MinSamplingProbability, 1.0].
	MinSamplingProbability float64

	// MinSamplesPerSecond determines the min number of traces that are sampled per second.
	// For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do
	// its best to sample at least one trace a minute for an operation. This is useful for low QPS operations
	// that may never be sampled by the probabilistic sampler.
	MinSamplesPerSecond float64

	// LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before
	// attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval
	// to reduce lock thrashing.
	LeaderLeaseRefreshInterval time.Duration

	// FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower
	// (ie. failed to gain the leader lock).
	FollowerLeaseRefreshInterval time.Duration
}

Options holds configuration for the adaptive sampling strategy store. The abbreviation SPS refers to "samples-per-second", which is the target of the optimization/control implemented by the adaptive sampling.

func (*Options) InitFromViper

func (opts *Options) InitFromViper(v *viper.Viper) *Options

InitFromViper initializes Options with properties from viper

type Processor

type Processor struct {
	sync.RWMutex
	Options
	// contains filtered or unexported fields
}

Processor retrieves service throughput over a look back interval and calculates sampling probabilities per operation such that each operation is sampled at a specified target QPS. It achieves this by retrieving discrete buckets of operation throughput and doing a weighted average of the throughput and generating a probability to match the targetQPS.

func NewStrategyStore

func NewStrategyStore(options Options, metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) (*Processor, error)

NewStrategyStore creates a strategy store that holds adaptive sampling strategies.

func (*Processor) Close

func (p *Processor) Close() error

Close stops the processor from calculating probabilities.

func (*Processor) GetSamplingStrategy

func (p *Processor) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error)

GetSamplingStrategy implements Thrift endpoint for retrieving sampling strategy for a service.

func (*Processor) Start

func (p *Processor) Start() error

Start initializes and starts the sampling processor which regularly calculates sampling probabilities.

type SamplingCache

type SamplingCache map[string]map[string]*SamplingCacheEntry

SamplingCache is a nested map: service -> operation -> cache entry.

func (SamplingCache) Get

func (s SamplingCache) Get(service, operation string) *SamplingCacheEntry

Get retrieves the entry for given service/operation. Returns nil if not found.

func (SamplingCache) Set

func (s SamplingCache) Set(service, operation string, entry *SamplingCacheEntry)

Set adds a new entry for given service/operation.

type SamplingCacheEntry

type SamplingCacheEntry struct {
	Probability   float64
	UsingAdaptive bool
}

SamplingCacheEntry keeps track of the probability and whether a service-operation is observed using adaptive sampling.

type WeightVectorCache

type WeightVectorCache struct {
	sync.Mutex
	// contains filtered or unexported fields
}

WeightVectorCache stores normalizing weight vectors of different lengths. The head of each weight vector contains the largest weight.

func NewWeightVectorCache

func NewWeightVectorCache() *WeightVectorCache

NewWeightVectorCache returns a new weights vector cache.

func (*WeightVectorCache) GetWeights

func (c *WeightVectorCache) GetWeights(length int) []float64

GetWeights returns weights for the specified length { w(i) = i ^ 4, i=1..L }, normalized.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL