Documentation ¶
Index ¶
- func AddFlags(flagSet *flag.FlagSet)
- func FloatEquals(a, b float64) bool
- func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, ...) (samplingstrategy.Aggregator, error)
- func TruncateFloat(v float64) string
- type Factory
- func (*Factory) AddFlags(flagSet *flag.FlagSet)
- func (f *Factory) Close() error
- func (f *Factory) CreateStrategyProvider() (samplingstrategy.Provider, samplingstrategy.Aggregator, error)
- func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger)
- func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, ...) error
- type Options
- type PostAggregator
- type Provider
- type SamplingCache
- type SamplingCacheEntry
- type WeightVectorCache
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FloatEquals ¶
func FloatEquals(a, b float64) bool
FloatEquals compares two floats to a precision of 10 decimal places.
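For illustration only (a standalone sketch, not the package's code), such a comparison can be implemented with an absolute-difference threshold using the standard math package:

func floatEquals(a, b float64) bool {
    // consider the values equal when they differ by less than 1e-10
    return math.Abs(a-b) < 1e-10
}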
func NewAggregator ¶
func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (samplingstrategy.Aggregator, error)
NewAggregator creates a throughput aggregator that simply emits metrics about the number of operations seen over the aggregationInterval.
func TruncateFloat ¶
func TruncateFloat(v float64) string
TruncateFloat truncates a float to 6 decimal places and converts it to a string.
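A rough sketch of one way to do this (an assumption, not the package's implementation), using the standard math and strconv packages:

func truncateFloat(v float64) string {
    // drop digits beyond the 6th decimal place, then format the result
    return strconv.FormatFloat(math.Trunc(v*1e6)/1e6, 'f', -1, 64)
}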
Types ¶
type Factory ¶
type Factory struct {
// contains filtered or unexported fields
}
Factory implements samplingstrategy.Factory for an adaptive strategy store.
func (*Factory) CreateStrategyProvider ¶
func (f *Factory) CreateStrategyProvider() (samplingstrategy.Provider, samplingstrategy.Aggregator, error)
CreateStrategyProvider implements samplingstrategy.Factory
func (*Factory) InitFromViper ¶
func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger)
InitFromViper implements plugin.Configurable
func (*Factory) Initialize ¶
func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error
Initialize implements samplingstrategy.Factory
type Options ¶
type Options struct {
    // TargetSamplesPerSecond is the global target rate of samples per operation.
    // TODO implement manual overrides per service/operation.
    TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"`

    // DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target)
    // throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation)
    // means that if abs((actual-expected) / expected) < 0.3, then the actual sampling rate is "close enough"
    // and the system does not need to send an updated sampling probability (the "control signal" u(t)
    // in the PID Controller terminology) to the sampler in the application.
    //
    // Increase this to reduce the amount of fluctuation in the calculated probabilities.
    DeltaTolerance float64 `mapstructure:"delta_tolerance"`

    // CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute,
    // new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth
    // of aggregated throughput data.
    CalculationInterval time.Duration `mapstructure:"calculation_interval"`

    // AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if
    // the CalculationInterval is 1 minute (each bucket contains 1 minute of throughput data) and the
    // AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for
    // all operations.
    // TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice.
    AggregationBuckets int `mapstructure:"aggregation_buckets"`

    // BucketsForCalculation determines how many previous buckets are used in calculating the weighted QPS,
    // ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS.
    BucketsForCalculation int `mapstructure:"calculation_buckets"`

    // Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval
    // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time
    // we'll have [now()-12m,now()-2m] range of throughput data in memory to base the calculations
    // off of. This delay is necessary to counteract the rate at which the jaeger clients poll for
    // the latest sampling probabilities. The default client poll rate is 1 minute, which means that
    // during any 1 minute interval, the clients will be fetching new probabilities in a uniformly
    // distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can
    // guarantee that all clients can use the latest calculated probabilities for at least 1 minute.
    Delay time.Duration `mapstructure:"calculation_delay"`

    // InitialSamplingProbability is the initial sampling probability for all new operations.
    InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"`

    // MinSamplingProbability is the minimum sampling probability for all operations, ie. the calculated sampling
    // probability will be in the range [MinSamplingProbability, 1.0].
    MinSamplingProbability float64 `mapstructure:"min_sampling_probability"`

    // MinSamplesPerSecond determines the minimum number of traces that are sampled per second.
    // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do
    // its best to sample at least one trace a minute for an operation. This is useful for low QPS operations
    // that may never be sampled by the probabilistic sampler.
    MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"`

    // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before
    // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval
    // to reduce lock thrashing.
    LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"`

    // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower
    // (ie. failed to gain the leader lock).
    FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"`
}
Options holds configuration for the adaptive sampling strategy store. The abbreviation SPS refers to "samples-per-second", which is the target of the optimization/control implemented by the adaptive sampling.
func DefaultOptions ¶ added in v1.60.0
func DefaultOptions() Options
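A minimal usage sketch, assuming the package is imported as adaptive (the import alias, the surrounding program, and the chosen values are illustrative, not defaults): start from DefaultOptions and override individual fields before handing the Options to NewAggregator or NewProvider.

opts := adaptive.DefaultOptions()
opts.TargetSamplesPerSecond = 2        // aim for ~2 sampled traces per second per operation
opts.DeltaTolerance = 0.3              // tolerate 30% deviation before pushing a new probability
opts.CalculationInterval = time.Minute // recalculate probabilities once a minute
opts.AggregationBuckets = 10           // keep 10 one-minute throughput buckets in memory
opts.Delay = 2 * time.Minute           // give clients time to pick up the previous probabilities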
type PostAggregator ¶
PostAggregator retrieves service throughput over a lookback interval and calculates sampling probabilities per operation such that each operation is sampled at a specified target QPS. It achieves this by retrieving discrete buckets of operation throughput, computing a weighted average of that throughput, and generating a probability that matches the target QPS.
func (*PostAggregator) Start ¶
func (p *PostAggregator) Start() error
Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities.
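The control loop can be pictured with a minimal sketch (an assumption for illustration, not the actual PostAggregator code): scale the current probability by the ratio of target to observed QPS and clamp the result to the configured range.

func adjustProbability(current, observedQPS, targetQPS, minProbability float64) float64 {
    if observedQPS <= 0 {
        return current // no observed traffic; keep the current probability
    }
    p := current * targetQPS / observedQPS // raise or lower the probability toward the target rate
    return math.Min(1.0, math.Max(minProbability, p))
}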
type Provider ¶
Provider is responsible for providing sampling strategies for services. It periodically loads sampling probabilities from storage and converts them into sampling strategies that are cached and served to clients. Provider relies on sampling probabilities being periodically updated by the aggregator & post-aggregator.
func NewProvider ¶
func NewProvider(options Options, logger *zap.Logger, participant leaderelection.ElectionParticipant, store samplingstore.Store) *Provider
NewProvider creates a strategy store that holds adaptive sampling strategies.
func (*Provider) Close ¶
Close stops the service from loading probabilities and generating strategies.
func (*Provider) GetSamplingStrategy ¶
func (p *Provider) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error)
GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service.
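A brief usage sketch (the provider variable and the service name are assumptions; see NewProvider above for construction):

strategy, err := provider.GetSamplingStrategy(context.Background(), "frontend")
if err != nil {
    // handle the lookup error
}
_ = strategy // *api_v2.SamplingStrategyResponse holding the cached strategy for the service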
type SamplingCache ¶
type SamplingCache map[string]map[string]*SamplingCacheEntry
SamplingCache is a nested map: service -> operation -> cache entry.
func (SamplingCache) Get ¶
func (s SamplingCache) Get(service, operation string) *SamplingCacheEntry
Get retrieves the entry for given service/operation. Returns nil if not found.
func (SamplingCache) Set ¶
func (s SamplingCache) Set(service, operation string, entry *SamplingCacheEntry)
Set adds a new entry for given service/operation.
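A brief usage sketch of the nested-map cache (the service and operation names are arbitrary examples, and the empty SamplingCacheEntry literal stands in for real values):

cache := adaptive.SamplingCache{}
cache.Set("frontend", "GET /search", &adaptive.SamplingCacheEntry{})
if entry := cache.Get("frontend", "GET /search"); entry != nil {
    // entry tracks the probability and whether adaptive sampling was observed
}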
type SamplingCacheEntry ¶
SamplingCacheEntry keeps track of the probability and whether a service-operation is observed using adaptive sampling.
type WeightVectorCache ¶
WeightVectorCache stores normalizing weight vectors of different lengths. The head of each weight vector contains the largest weight.
func NewWeightVectorCache ¶
func NewWeightVectorCache() *WeightVectorCache
NewWeightVectorCache returns a new weight vector cache.
func (*WeightVectorCache) GetWeights ¶
func (c *WeightVectorCache) GetWeights(length int) []float64
GetWeights returns weights for the specified length { w(i) = i ^ 4, i=1..L }, normalized.
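A minimal sketch of the formula above (not the cache's implementation, which also stores the computed vectors per length): compute w(i) = i^4 with the largest weight at the head, then normalize so the weights sum to 1.

func weights(length int) []float64 {
    w := make([]float64, length)
    var sum float64
    for i := 0; i < length; i++ {
        w[i] = math.Pow(float64(length-i), 4) // head of the vector gets the largest weight
        sum += w[i]
    }
    for i := range w {
        w[i] /= sum // normalize the weights to sum to 1
    }
    return w
}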