Documentation ¶
Index ¶
- func AddFlags(flagSet *flag.FlagSet)
- func FloatEquals(a, b float64) bool
- func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, ...) strategystore.Aggregator
- func TruncateFloat(v float64) string
- type Factory
- func (f *Factory) AddFlags(flagSet *flag.FlagSet)
- func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error)
- func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger)
- func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, ...) error
- type Options
- type Processor
- type SamplingCache
- type SamplingCacheEntry
- type WeightVectorCache
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FloatEquals ¶
func FloatEquals(a, b float64) bool
FloatEquals compares two floats with 10 decimal places of precision.
func NewAggregator ¶
func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator
NewAggregator creates a throughput aggregator that simply emits metrics about the number of operations seen over the aggregationInterval.
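A minimal wiring sketch, assuming Jaeger's repository layout for the import paths and using the no-op metrics factory; the store argument stands in for whatever samplingstore.Store implementation the deployment provides:
package example

import (
	"time"

	"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
	"github.com/jaegertracing/jaeger/pkg/metrics"
	"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
	"github.com/jaegertracing/jaeger/storage/samplingstore"
)

// buildAggregator creates an aggregator that rolls up operation throughput
// over 1-minute windows; store is the deployment's sampling store.
func buildAggregator(store samplingstore.Store) strategystore.Aggregator {
	return adaptive.NewAggregator(metrics.NullFactory, time.Minute, store)
}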
func TruncateFloat ¶
func TruncateFloat(v float64) string
TruncateFloat truncates a float to 6 decimal places and converts it to a string.
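An illustrative sketch of both helpers above, assuming this package is imported from .../plugin/sampling/strategystore/adaptive as in the Jaeger source tree:
package main

import (
	"fmt"

	"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
)

func main() {
	// A difference beyond the 10th decimal place is treated as equal.
	fmt.Println(adaptive.FloatEquals(0.001, 0.001000000000001)) // true
	fmt.Println(adaptive.FloatEquals(0.001, 0.0012))            // false

	// Only 6 decimal places are kept in the string form.
	fmt.Println(adaptive.TruncateFloat(0.12345641)) // "0.123456"
}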
Types ¶
type Factory ¶
type Factory struct {
// contains filtered or unexported fields
}
Factory implements strategystore.Factory for an adaptive strategy store.
func (*Factory) CreateStrategyStore ¶
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error)
CreateStrategyStore implements strategystore.Factory
func (*Factory) InitFromViper ¶
func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger)
InitFromViper implements plugin.Configurable
func (*Factory) Initialize ¶
func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error
Initialize implements strategystore.Factory
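Putting the Factory methods together, a rough sketch of the configure-then-create call order. The factory is assumed to be already constructed (a constructor is not shown in this listing), ssFactory is whatever storage.SamplingStoreFactory the storage backend provides, and the import paths follow Jaeger's repository layout:
package example

import (
	"flag"

	"github.com/spf13/viper"
	"go.uber.org/zap"

	"github.com/jaegertracing/jaeger/pkg/metrics"
	"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
	"github.com/jaegertracing/jaeger/storage"
)

// buildStrategyStore shows the call order only.
func buildStrategyStore(f *adaptive.Factory, v *viper.Viper, ssFactory storage.SamplingStoreFactory) error {
	logger := zap.NewNop()

	// 1. Register the adaptive sampling command-line flags. In the collector
	//    this FlagSet is bound to viper before InitFromViper is called; that
	//    plumbing is omitted here.
	flagSet := flag.NewFlagSet("adaptive", flag.ContinueOnError)
	f.AddFlags(flagSet)

	// 2. Read the configured values out of viper into the factory's options.
	f.InitFromViper(v, logger)

	// 3. Provide metrics, the sampling-store factory, and a logger.
	if err := f.Initialize(metrics.NullFactory, ssFactory, logger); err != nil {
		return err
	}

	// 4. Create the strategy store (a *Processor) and the throughput Aggregator.
	strategyStore, aggregator, err := f.CreateStrategyStore()
	if err != nil {
		return err
	}
	_, _ = strategyStore, aggregator
	return nil
}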
type Options ¶
type Options struct {
	// TargetSamplesPerSecond is the global target rate of samples per operation.
	// TODO implement manual overrides per service/operation.
	TargetSamplesPerSecond float64

	// DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target)
	// throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation)
	// means that if abs((actual-expected) / expected) < 0.3, then the actual sampling rate is "close enough"
	// and the system does not need to send an updated sampling probability (the "control signal" u(t)
	// in the PID Controller terminology) to the sampler in the application.
	//
	// Increase this to reduce the amount of fluctuation in the calculated probabilities.
	DeltaTolerance float64

	// CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute,
	// new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth
	// of aggregated throughput data.
	CalculationInterval time.Duration

	// AggregationBuckets is the total number of aggregated throughput buckets kept in memory, i.e. if
	// the CalculationInterval is 1 minute (each bucket contains 1 minute of throughput data) and
	// AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for
	// all operations.
	// TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice.
	AggregationBuckets int

	// BucketsForCalculation determines how many previous buckets are used in calculating the weighted QPS,
	// i.e. if BucketsForCalculation is 1, only the most recent bucket is used in calculating the weighted QPS.
	BucketsForCalculation int

	// Delay is the amount of time to delay probability generation by, i.e. if the CalculationInterval
	// is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at any one time
	// we'll have the [now()-12m, now()-2m] range of throughput data in memory to base the calculations
	// on. This delay is necessary to counteract the rate at which the Jaeger clients poll for
	// the latest sampling probabilities. The default client poll rate is 1 minute, which means that
	// during any 1 minute interval, the clients will be fetching new probabilities in a uniformly
	// distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can
	// guarantee that all clients can use the latest calculated probabilities for at least 1 minute.
	Delay time.Duration

	// InitialSamplingProbability is the initial sampling probability for all new operations.
	InitialSamplingProbability float64

	// MinSamplingProbability is the minimum sampling probability for all operations, i.e. the calculated
	// sampling probability will be in the range [MinSamplingProbability, 1.0].
	MinSamplingProbability float64

	// MinSamplesPerSecond determines the minimum number of traces that are sampled per second.
	// For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do
	// its best to sample at least one trace a minute for an operation. This is useful for low QPS operations
	// that may never be sampled by the probabilistic sampler.
	MinSamplesPerSecond float64

	// LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before
	// attempting to renew the lease on the leader lock. N.B. This should be less than
	// FollowerLeaseRefreshInterval to reduce lock thrashing.
	LeaderLeaseRefreshInterval time.Duration

	// FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower
	// (i.e. failed to gain the leader lock).
	FollowerLeaseRefreshInterval time.Duration
}
Options holds configuration for the adaptive sampling strategy store. The abbreviation SPS refers to "samples-per-second", which is the target of the optimization/control implemented by the adaptive sampling.
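A hypothetical Options value; the numbers below are purely illustrative, not recommended defaults:
package example

import (
	"time"

	"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
)

func exampleOptions() adaptive.Options {
	return adaptive.Options{
		TargetSamplesPerSecond:       1.0,              // aim for ~1 sampled trace per second per operation
		DeltaTolerance:               0.3,              // ignore deviations under 30%
		CalculationInterval:          time.Minute,      // recompute probabilities every minute
		AggregationBuckets:           10,               // keep up to 10 one-minute buckets in memory
		BucketsForCalculation:        1,                // weight only the most recent bucket
		Delay:                        2 * time.Minute,  // lag calculations behind real time by 2 minutes
		InitialSamplingProbability:   0.001,            // starting probability for new operations
		MinSamplingProbability:       1e-5,             // floor for calculated probabilities
		MinSamplesPerSecond:          1.0 / 60,         // at least ~1 trace per minute per operation
		LeaderLeaseRefreshInterval:   5 * time.Second,  // leader renews its lock frequently
		FollowerLeaseRefreshInterval: 60 * time.Second, // followers poll less often to reduce thrashing
	}
}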
type Processor ¶
Processor retrieves service throughput over a look-back interval and calculates sampling probabilities per operation such that each operation is sampled at the specified target QPS. It achieves this by retrieving discrete buckets of operation throughput, computing a weighted average of that throughput, and generating a probability that matches the target QPS.
func NewStrategyStore ¶
func NewStrategyStore(options Options, metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) (*Processor, error)
NewStrategyStore creates a strategy store that holds adaptive sampling strategies.
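A minimal construction sketch, assuming Jaeger's layout for the distributedlock and samplingstore import paths; lock and store stand in for backend-specific implementations (e.g. Cassandra or Elasticsearch based):
package example

import (
	"go.uber.org/zap"

	"github.com/jaegertracing/jaeger/pkg/distributedlock"
	"github.com/jaegertracing/jaeger/pkg/metrics"
	"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
	"github.com/jaegertracing/jaeger/storage/samplingstore"
)

// buildProcessor assembles an adaptive sampling Processor from its
// dependencies: a distributed election lock and a sampling store,
// both provided by the deployment's storage backend.
func buildProcessor(opts adaptive.Options, lock distributedlock.Lock, store samplingstore.Store) (*adaptive.Processor, error) {
	return adaptive.NewStrategyStore(opts, metrics.NullFactory, zap.NewNop(), lock, store)
}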
func (*Processor) GetSamplingStrategy ¶
func (p *Processor) GetSamplingStrategy(_ context.Context, service string) (*sampling.SamplingStrategyResponse, error)
GetSamplingStrategy implements the Thrift endpoint for retrieving the sampling strategy for a service.
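A sketch of querying the processor for a service's current strategies; "frontend" is a hypothetical service name and the response is only inspected generically here:
package example

import (
	"context"
	"fmt"

	"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
)

// printStrategy asks the processor for the strategies currently computed
// for a single service.
func printStrategy(p *adaptive.Processor) error {
	resp, err := p.GetSamplingStrategy(context.Background(), "frontend")
	if err != nil {
		return err
	}
	// resp is a *sampling.SamplingStrategyResponse; the per-operation strategies
	// reflect the probabilities most recently calculated by the processor.
	fmt.Printf("%+v\n", resp)
	return nil
}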
type SamplingCache ¶
type SamplingCache map[string]map[string]*SamplingCacheEntry
SamplingCache is a nested map: service -> operation -> cache entry.
func (SamplingCache) Get ¶
func (s SamplingCache) Get(service, operation string) *SamplingCacheEntry
Get retrieves the entry for given service/operation. Returns nil if not found.
func (SamplingCache) Set ¶
func (s SamplingCache) Set(service, operation string, entry *SamplingCacheEntry)
Set adds a new entry for given service/operation.
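A small usage sketch of the nested cache. The exported fields of SamplingCacheEntry are not shown in this listing, so a zero-value entry is used, and Set is assumed to create the inner operation map as needed:
package main

import (
	"fmt"

	"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
)

func main() {
	cache := adaptive.SamplingCache{}

	// Outer key: service name; inner key: operation name.
	entry := &adaptive.SamplingCacheEntry{} // fields elided in this listing
	cache.Set("frontend", "GET /products", entry)

	if cache.Get("frontend", "GET /products") != nil {
		fmt.Println("hit")
	}
	if cache.Get("frontend", "GET /unknown") == nil {
		fmt.Println("miss returns nil")
	}
}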
type SamplingCacheEntry ¶
SamplingCacheEntry keeps track of the probability and whether a service-operation is observed using adaptive sampling.
type WeightVectorCache ¶
WeightVectorCache stores normalizing weight vectors of different lengths. The head of each weight vector contains the largest weight.
func NewWeightVectorCache ¶
func NewWeightVectorCache() *WeightVectorCache
NewWeightVectorCache returns a new weight vector cache.
func (*WeightVectorCache) GetWeights ¶
func (c *WeightVectorCache) GetWeights(length int) []float64
GetWeights returns weights for the specified length, computed as w(i) = i^4 for i = 1..L and then normalized.
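To make the formula concrete, the following standalone sketch mirrors the documented w(i) = i^4 weighting for a vector of length 3; it does not call into the package, and it assumes "normalized" means dividing by the sum so that the head of the vector holds the largest weight:
package main

import "fmt"

// weights mirrors the documented formula: w(i) = i^4 for i = 1..length,
// ordered so the head of the vector holds the largest weight, then divided
// by the sum (an assumption about what "normalized" means here).
func weights(length int) []float64 {
	w := make([]float64, length)
	var sum float64
	for i := 0; i < length; i++ {
		// i=0 gets length^4, i=1 gets (length-1)^4, ..., the last gets 1^4.
		v := float64(length - i)
		w[i] = v * v * v * v
		sum += w[i]
	}
	for i := range w {
		w[i] /= sum
	}
	return w
}

func main() {
	// For length 3: raw weights 81, 16, 1 (sum 98),
	// normalized to roughly 0.827, 0.163, 0.010.
	fmt.Println(weights(3))
}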