atlassiansamplingprocessor

package module
v0.0.0-...-5f22753
Published: Dec 19, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

README

Atlassian Sampling Processor

This is a tail sampling processor. Small parts of the code are copied and modified from the collector's tailsamplingprocessor.

Open Source

This component is open source; however, the source of truth is the closed source version. The open source version is periodically synced with the closed source version. External contributors are still welcome.

Any code that is Atlassian-specific and therefore cannot be consumed by the wider community will be rejected.

Detailed Documentation and Design Rationale

Detailed design documentation for this processor can be found in DESIGN.md.

Contrasting to Collector Contrib's tail sampling processor

This processor takes quite a different approach from the collector-contrib tail sampling processor. Some key differences are noted here.

  • Makes quick decisions when possible (doesn't wait a specified time period to make a decision on a trace). This is possible via mandatory decision caches, so when subsequent spans arrive the correct decision is made for them (see the sketch after this list). Additionally, this allows us to identify "garbage" quickly and occasionally take a hardline "not sampled" stance before the whole trace arrives. For example, a root span with no other spans in the trace can be dropped immediately instead of waiting. Such spans can also be marked as "low priority" and put in the secondary cache, which is evicted more quickly (see below for more).
  • Policy evaluations follow a strict order, and the first definitive decision is used. This allows one policy to take priority over another. There is no concept of "inverting" a decision, but there is a policy that can downgrade a "Sampled" decision to another decision.
  • Supports horizontal scaling. Can flush its cached data on shutdown.
  • Can optionally compress spans in-memory, saving memory at the cost of processing time and CPU usage.
  • Optional variable cache size. The number of traces kept in memory can automatically self-adjust if heap usage exceeds the configured target, keeping memory usage roughly constant and squeezing the most out of the available memory.
  • Allows remote sampling percentage configuration, via an optional extension that implements a RateGetter interface. The extension is not provided in open source.
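
To make the first bullet concrete, here is a small illustrative sketch of the decision-cache fast path. It is not this processor's actual code: the type and field names below are invented for the sketch, while the behaviour (consult the sampled/non-sampled decision caches before doing any policy evaluation or waiting) follows the description above.

// Illustrative model only; the real processor uses LRU caches, stores full
// span data, and evaluates configured policies in order.
type traceID [16]byte

type fastPath struct {
	sampled    map[traceID]struct{} // decision cache of sampled trace IDs
	nonSampled map[traceID]struct{} // decision cache of non-sampled trace IDs
	pending    map[traceID]int      // stands in for the primary/secondary trace caches
}

// onSpans is called as spans arrive. If a decision for the trace already
// exists, it is applied immediately; only undecided traces are cached.
func (f *fastPath) onSpans(id traceID, spanCount int) (release bool) {
	if _, ok := f.sampled[id]; ok {
		return true // already sampled: forward these spans straight away
	}
	if _, ok := f.nonSampled[id]; ok {
		return false // already not sampled: drop straight away
	}
	f.pending[id] += spanCount // no decision yet: hold the trace and evaluate policies
	return false
}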

Config

primary_cache_size

The number of non-low-priority traces held in the primary internal LRU cache. When the cache reaches this maximum, the least-recently-used trace is evicted and considered "not sampled".

The primary_cache_size value should be greater than 0, and should be set to a value that is appropriate for the trace volume.

primary_cache_size is the most important value to tune in terms of memory usage. Keep an eye on the processor_atlassian_sampling_trace_eviction_time metric to tune how long you would like your traces to stay pending in memory before being considered not-sampled.

The primary cache size is initially set to 80% of the primary_cache_size value. It is automatically adjusted depending on heap memory usage at runtime, but will not exceed the primary_cache_size value.

secondary_cache_size

The number of low-priority traces held in the secondary internal LRU cache. When the cache reaches this maximum, the least-recently-used trace is evicted and considered "not sampled".

The secondary_cache_size value should be less than 50% of primary_cache_size.

If left at 0, there will be no secondary cache, and only the primary cache will be used.

The default value is 0.

Note: a Put into either cache overwrites any entry with the same key in the other cache, so a key never appears in both the primary and secondary cache. If the caller wants to promote an existing key from secondary to primary, they can Put it with non-low priority.
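
As a rough sketch of how the two cache size options fit together, the snippet below fills in the corresponding fields of the exported Config type documented further down this page. The numbers are illustrative only, not recommendations.

// Illustrative values; field names are from this package's Config type.
var cacheSizingExample = Config{
	// Hard upper bound on non-low-priority traces held in memory. The cache
	// starts at 80% of this value (here, 80,000) and self-adjusts at runtime.
	PrimaryCacheSize: 100_000,

	// Low-priority traces; keep this below 50% of PrimaryCacheSize.
	SecondaryCacheSize: 10_000,

	// Optional heap target (runtime.MemStats.HeapAlloc) driving the automatic
	// cache-size adjustment, e.g. roughly 75% of the available memory.
	TargetHeapBytes: 6 << 30, // ~6 GiB
}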

decision_cache

This provides two size config options, sampled_cache_size and non_sampled_cache_size, which configure the sizes of the decision caches. The decision caches hold the sets of trace IDs that have been sampled or not sampled, respectively. This allows newly arriving spans to shortcut policy evaluation when a decision for their trace has already been made.

It is recommended that these values be at least an order of magnitude higher than max_traces, since the internal memory usage is much lower (the decision caches store only trace IDs, not span data). It is therefore valuable to hold on to decisions for longer than you hold on to any trace data, in case there are late-arriving spans.

Keep an eye on processor_atlassian_sampling_decision_eviction_time to make sure that decisions are lasting an appropriate amount of time.
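
Continuing the illustrative sketch above, these options map onto the DecisionCacheCfg struct embedded in Config; the figures below simply apply the "order of magnitude larger" guidance to the earlier example values.

// Illustrative values; field names are from this package's DecisionCacheCfg type.
var decisionCacheExample = DecisionCacheCfg{
	// These caches hold only trace IDs, so they are cheap; sizing them ~10x the
	// trace caches lets decisions outlive the span data for late-arriving spans.
	SampledCacheSize:    1_000_000,
	NonSampledCacheSize: 1_000_000,
}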

flush_on_shutdown

This is false by default. When set to true, calling Shutdown() on the component causes all traces pending in the trace data cache to be flushed to the next consumer. This prevents data loss when a node running this component shuts down.

Before being flushed, a resource attribute atlassiansampling.flushes is added to the resource spans. This enables downstream components to detect which resource spans were flushed due to a shutdown and to route them accordingly, for example using the routingconnector. Additionally, the attribute counts how many times the data has been flushed and re-ingested, enabling the detection of infinite cycling.
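
As a hedged illustration of how a downstream component might use this attribute: the helper below is not part of this package, only the attribute name atlassiansampling.flushes comes from the documentation above, and it assumes the counter is stored as an integer attribute.

import (
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// wasFlushed reports whether a ResourceSpans was emitted by a shutdown flush,
// and how many flush/re-ingest cycles it has been through so far.
func wasFlushed(rs ptrace.ResourceSpans) (flushed bool, count int64) {
	v, ok := rs.Resource().Attributes().Get("atlassiansampling.flushes")
	if !ok {
		return false, 0
	}
	return true, v.Int() // Int returns 0 if the attribute is not an integer
}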

compression_enabled

If this is enabled, trace data stored in the primary and secondary caches is marshalled and compressed using the Snappy algorithm, and decompressed once a sampling decision is made. The default value is false.
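
A minimal sketch of the round trip described here, using the collector's ptrace proto marshaler and the github.com/golang/snappy package. This mirrors the description above rather than the processor's internal code.

import (
	"github.com/golang/snappy"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// compressTraces marshals trace data to protobuf and compresses it with Snappy,
// as described for cache entries when compression_enabled is true.
func compressTraces(td ptrace.Traces) ([]byte, error) {
	raw, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(td)
	if err != nil {
		return nil, err
	}
	return snappy.Encode(nil, raw), nil
}

// decompressTraces reverses compressTraces once a sampling decision is made.
func decompressTraces(buf []byte) (ptrace.Traces, error) {
	raw, err := snappy.Decode(nil, buf)
	if err != nil {
		return ptrace.NewTraces(), err
	}
	return (&ptrace.ProtoUnmarshaler{}).UnmarshalTraces(raw)
}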

policies

policies is a list of policies, which configure how the sampling decisions are evaluated against the incoming data.

The order of the policies is important, as the first one that returns a non-pending decision is used as the final decision.

Policies include a name, a type, and further configuration depending on the type.

Current supported policy types are:

  • span_count - samples the trace if it contains a minimum number of spans.
  • probabilistic - evaluates the hash of the trace ID against a configured percentage.
  • remote_probabilistic - fetches sampling rates using the specified rate getter at runtime and samples traces based on the fetched sampling rate. The RateGetter interface can be implemented by a custom extension, and the ID of the component can be provided in the configuration.
  • and - combines any number of sampling policies together.
  • root_spans - specifies a sub-policy that only operates on lone root spans, but eagerly converts the sub-policy's "pending" decisions into "not sampled" decisions. A span is considered "lone" if no other spans for the same trace are present when it arrives, and it is considered a root span if it has no parent ID, or has a parent ID equal to the right 64 bits of the trace ID.
  • latency - samples traces with duration equal to or greater than threshold_ms. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between.
  • status_code - samples based on the status code (OK, ERROR or UNSET).
  • ottl_condition - samples based on the given boolean OTTL conditions (span and span event).
  • downgrader - downgrades any "Sampled" decision from the sub_policy, to what is specified in downgrade_to.
  • threshold - inspects the span attribute sampling.tail.threshold, and makes a "Sampled" decision if the numerical value of the attribute is larger than the rightmost 56 bits of the trace ID. The attribute takes the string form "0x[0-9a-fA-F]{1,14}". If the numerical part of the attribute is less than 14 hex digits (56 bits) long, it is right-padded with zeroes as per OTEP-235.

Example

For a full example of multiple sampling policies being configured, see the example test file.
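
As a rough sketch (not the example test file itself), two of the policy types above could be expressed with the exported config types documented below. Policy names and values here are illustrative.

// Illustrative ordering: keep 1% of lone root spans and eagerly drop the rest,
// then sample any remaining trace that accumulates at least 3 spans.
var examplePolicies = []PolicyConfig{
	{
		SharedPolicyConfig: SharedPolicyConfig{
			Name: "drop-most-lone-root-spans",
			Type: RootSpans,
		},
		RootSpansConfig: RootSpansConfig{
			SubPolicyCfg: SharedPolicyConfig{
				Name:                "one-percent-of-lone-roots",
				Type:                Probabilistic,
				ProbabilisticConfig: ProbabilisticConfig{SamplingPercentage: 1},
			},
		},
	},
	{
		SharedPolicyConfig: SharedPolicyConfig{
			Name:            "min-three-spans",
			Type:            SpanCount,
			SpanCountConfig: SpanCountConfig{MinSpans: 3},
		},
	},
}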

Metrics

Metrics emitted from this component are documented in documentation.md.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFactory

func NewFactory() processor.Factory
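
For context, a custom collector build registers this factory alongside the others. The sketch below assumes it is compiled in the same module (the import path for this package is elided); only NewFactory comes from this package, the rest is standard collector builder wiring.

import (
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/otelcol"
	"go.opentelemetry.io/collector/processor"
)

// registerSampling adds the Atlassian sampling processor factory to the
// factories set used by a custom collector build. Sketch only.
func registerSampling(factories *otelcol.Factories) {
	f := NewFactory()
	if factories.Processors == nil {
		factories.Processors = map[component.Type]processor.Factory{}
	}
	factories.Processors[f.Type()] = f
}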

Types

type AndConfig

type AndConfig struct {
	SubPolicyCfg []AndSubPolicyConfig `mapstructure:"and_sub_policy"`
}

AndConfig holds the common configuration to all and policies.

type AndSubPolicyConfig

type AndSubPolicyConfig struct {
	SharedPolicyConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

AndSubPolicyConfig holds the common configuration to all policies under and policy.

type Config

type Config struct {
	// PolicyConfig sets the tail-based sampling policy which makes a sampling decision
	// for a given trace when requested.
	PolicyConfig []PolicyConfig `mapstructure:"policies"`

	// TargetHeapBytes is the optional target heap size, measured against runtime.MemStats.HeapAlloc.
	// If set, the processor may adjust cache sizes dynamically in order to keep within the target.
	// A good starting point to set this is about 75% of overall memory resource allocation.
	TargetHeapBytes uint64 `mapstructure:"target_heap_bytes"`

	// PrimaryCacheSize sets the initial and maximum size of the primary cache that holds non-low priority traces.
	PrimaryCacheSize int `mapstructure:"primary_cache_size"`

	// SecondaryCacheSize defaults to 10% of the primary cache size.
	// It should not be more than 50% of the primary cache size.
	SecondaryCacheSize int `mapstructure:"secondary_cache_size"`

	DecisionCacheCfg `mapstructure:"decision_cache"`

	// FlushOnShutdown determines whether to flush the pending/cached trace data upon shutdown.
	FlushOnShutdown bool `mapstructure:"flush_on_shutdown"`

	// CompressionEnabled compresses trace data in the primary and secondary caches if enabled
	CompressionEnabled bool `mapstructure:"compression_enabled"`
}

func (*Config) Validate

func (cfg *Config) Validate() (errors error)

type DecisionCacheCfg

type DecisionCacheCfg struct {
	// SampledCacheSize specifies the size of the cache that holds the sampled trace IDs.
	// This value will be the maximum number of trace IDs that the cache can hold before overwriting previous IDs.
	// For effective use, this value should be at least an order of magnitude higher than Config.MaxTraces.
	// By default, 10x the Config.MaxTraces value will be used.
	SampledCacheSize int `mapstructure:"sampled_cache_size"`
	// NonSampledCacheSize specifies the size of the cache that holds the non-sampled trace IDs.
	// This value will be the maximum number of trace IDs that the cache can hold before overwriting previous IDs.
	// For effective use, this value should be at least an order of magnitude higher than Config.MaxTraces.
	// By default, 10x the Config.MaxTraces value will be used.
	NonSampledCacheSize int `mapstructure:"non_sampled_cache_size"`
}

type DowngraderConfig

type DowngraderConfig struct {
	DowngradeTo  string             `mapstructure:"downgrade_to"`
	SubPolicyCfg SharedPolicyConfig `mapstructure:"sub_policy"`
}
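
DowngraderConfig holds the configurable settings to create a downgrader sampling policy evaluator, which downgrades a "Sampled" decision from its sub-policy to the decision named in downgrade_to.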

type LatencyConfig

type LatencyConfig struct {
	// Lower bound in milliseconds
	ThresholdMs int64 `mapstructure:"threshold_ms"`
}

LatencyConfig holds the configurable settings to create a latency filter sampling policy evaluator

type OTTLConditionConfig

type OTTLConditionConfig struct {
	ErrorMode           ottl.ErrorMode `mapstructure:"error_mode"`
	SpanConditions      []string       `mapstructure:"span"`
	SpanEventConditions []string       `mapstructure:"spanevent"`
}

OTTLConditionConfig holds the configurable settings to create an OTTL condition filter sampling policy evaluator.

type PolicyConfig

type PolicyConfig struct {
	SharedPolicyConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

	// Configs for defining and policy
	AndConfig AndConfig `mapstructure:"and"`

	RootSpansConfig RootSpansConfig `mapstructure:"root_spans"`

	DowngraderConfig DowngraderConfig `mapstructure:"downgrader"`
}

PolicyConfig holds the common configuration to all policies.

type PolicyType

type PolicyType string

PolicyType indicates the type of sampling policy.

const (
	// Probabilistic samples a given percentage of traces.
	Probabilistic PolicyType = "probabilistic"
	// And allows defining a policy that combines other policies into one
	And PolicyType = "and"
	// Downgrader downgrades a sampled decision
	Downgrader PolicyType = "downgrader"
	// SpanCount samples traces that have more spans per Trace than a given threshold.
	SpanCount PolicyType = "span_count"
	// RootSpans allows a sub-policy to be defined, and operates the sub-policy only on root spans with no children.
	RootSpans PolicyType = "root_spans"
	// Latency samples traces that are longer than a given threshold.
	Latency PolicyType = "latency"
	// StatusCode samples traces that have a given status code.
	StatusCode PolicyType = "status_code"
	// OTTLCondition samples traces that match user-provided OpenTelemetry Transformation Language
	// conditions.
	OTTLCondition PolicyType = "ottl_condition"
	// Threshold retrieves the threshold from sampling.tail.threshold attribute.
	// It compares the threshold to the trace ID.
	Threshold PolicyType = "threshold"
	// RemoteProbabilistic fetches the sampling rate and samples traces based on the returned rate at runtime.
	RemoteProbabilistic PolicyType = "remote_probabilistic"
)

type ProbabilisticConfig

type ProbabilisticConfig struct {
	// HashSalt allows one to configure the hashing salts. This is important in scenarios where multiple layers of collectors
	// have different sampling rates: if they use the same salt all passing one layer may pass the other even if they have
	// different sampling rates, configuring different salts avoids that.
	HashSalt string `mapstructure:"hash_salt"`
	// SamplingPercentage is the percentage rate at which traces are going to be sampled. Defaults to zero, i.e. no sampling.
	// Values greater than or equal to 100 are treated as "sample all traces".
	SamplingPercentage float64 `mapstructure:"sampling_percentage"`
}

ProbabilisticConfig holds the configurable settings to create a probabilistic sampling policy evaluator.

type RemoteProbabilisticConfig

type RemoteProbabilisticConfig struct {
	// HashSalt allows one to configure the hashing salts. This is important in scenarios where multiple layers of collectors
	// have different sampling rates: if they use the same salt all passing one layer may pass the other even if they have
	// different sampling rates, configuring different salts avoids that.
	HashSalt string `mapstructure:"hash_salt"`
	// DefaultRate is the default rate at which traces are going to be sampled if there is an error from the specified rate getter.
	// Defaults to zero, i.e. no sampling. Values greater than or equal to 100 are treated as "sample all traces".
	DefaultRate float64 `mapstructure:"default_rate"`
	// RateGetterExt is the component id of the rate getter extension to use to fetch the sampling rate at runtime.
	// The extension must implement the RateGetter interface.
	RateGetterExt component.ID `mapstructure:"rate_getter"`
}

RemoteProbabilisticConfig holds the configurable settings to create a remote probabilistic sampling policy evaluator.

type RootSpansConfig

type RootSpansConfig struct {
	SubPolicyCfg SharedPolicyConfig `mapstructure:"sub_policy"`
}
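
RootSpansConfig holds the configurable settings to create a root_spans sampling policy evaluator, which applies its sub-policy only to lone root spans.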

type SharedPolicyConfig

type SharedPolicyConfig struct {
	// Name given to the instance of the policy, to make it easy to identify in metrics and logs.
	Name string `mapstructure:"name"`
	// Type of the policy; this will be used to match the proper configuration of the policy.
	Type PolicyType `mapstructure:"type"`
	// Configs for probabilistic sampling policy evaluator.
	ProbabilisticConfig `mapstructure:"probabilistic"`
	// Configs for span count filter sampling policy evaluator.
	SpanCountConfig `mapstructure:"span_count"`
	// Configs for latency filter sampling policy evaluator.
	LatencyConfig `mapstructure:"latency"`
	// Configs for status code filter sampling policy evaluator.
	StatusCodeConfig `mapstructure:"status_code"`
	// Configs for OTTL condition filter sampling policy evaluator
	OTTLConditionConfig `mapstructure:"ottl_condition"`
	// Configs for remote probabilistic sampling policy evaluator
	RemoteProbabilisticConfig `mapstructure:"remote_probabilistic"`
}

SharedPolicyConfig holds the common configuration to all policies that are used in derivative policy configurations such as the "and" policy.

type SpanCountConfig

type SpanCountConfig struct {
	// MinSpans is the minimum number of spans in a Trace for it to be sampled
	MinSpans int32 `mapstructure:"min_spans"`
	// LogSampled indicates whether to emit a log when a trace is sampled.
	// It will log trace ID, services in trace, and span count.
	LogSampled bool `mapstructure:"log_sampled"`
}

SpanCountConfig holds the configurable settings to create a Span Count filter sampling policy evaluator

type StatusCodeConfig

type StatusCodeConfig struct {
	StatusCodes []string `mapstructure:"status_codes"`
}

StatusCodeConfig holds the configurable settings to create a status code filter sampling policy evaluator.

Directories

Path Synopsis
internal
