Documentation ¶
Index ¶
- Constants
- func NextBackoffDuration(backoffTimeout time.Duration) time.Duration
- func RoundSampleFactor(rnd *rand.Rand, sf float64) float64
- func SampleFactor(rnd *rand.Rand, sampleFactors map[int32]float64, metric int32) (float64, bool)
- func SampleFactorDeterministic(sampleFactors map[int32]float64, key Key, time uint32) (float64, bool)
- func SilentRPCError(err error) bool
- func TillStartOfNextSecond(now time.Time) time.Duration
- type ChUnique
- func (ch *ChUnique) Insert(val uint64)
- func (ch *ChUnique) ItemsCount() int
- func (ch *ChUnique) Marshall(dst io.Writer) error
- func (ch *ChUnique) MarshallAppend(buf []byte) []byte
- func (ch *ChUnique) MarshallAppendEstimatedSize() int
- func (ch *ChUnique) Merge(rhs ChUnique)
- func (ch *ChUnique) MergeRead(r *bytes.Buffer) error
- func (ch *ChUnique) Reset()
- func (ch *ChUnique) Size(asIs bool) uint64
- func (ch *ChUnique) UmMarshall(r *bytes.Buffer) error
- type Estimator
- type ItemValue
- func (s *ItemValue) AddCounterHost(count float64, hostTag int32)
- func (s *ItemValue) AddValue(value float64)
- func (s *ItemValue) AddValueArrayHost(values []float64, mult float64, hostTag int32)
- func (s *ItemValue) AddValueCounter(value float64, count float64)
- func (s *ItemValue) AddValueCounterHost(value float64, count float64, hostTag int32)
- func (s *ItemValue) Merge(s2 *ItemValue)
- func (s *ItemValue) MergeWithTLItem(s2 *tlstatshouse.ItemBytes, hostTag int32)
- func (s *ItemValue) MergeWithTLItem2(s2 *tlstatshouse.MultiValueBytes, fields_mask uint32, hostTag int32)
- func (s *ItemValue) TLSizeEstimate() int
- type Key
- func AggKey(t uint32, m int32, k [format.MaxTags]int32, hostTag int32, shardTag int32, ...) Key
- func KeyFromStatshouseItem(item tlstatshouse.ItemBytes, bucketTimestamp uint32) (key Key, shardID int)
- func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32) (key Key, shardID int)
- func (k *Key) ClearedKeys() Key
- func (k *Key) Hash() uint64
- func (k *Key) TLMultiItemFromKey(defaultTimestamp uint32) tlstatshouse.MultiItem
- func (k *Key) TLSizeEstimate(defaultTimestamp uint32) int
- func (k *Key) ToSlice() []int32
- func (k Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) Key
- type MappedMetricHeader
- type MetricsBucket
- type MultiItem
- func (s *MultiItem) FinishStringTop(capacity int) float64
- func (s *MultiItem) MapStringTop(str string, count float64) *MultiValue
- func (s *MultiItem) MapStringTopBytes(str []byte, count float64) *MultiValue
- func (s *MultiItem) Merge(s2 *MultiItem)
- func (s *MultiItem) MergeWithTLItem(s2 *tlstatshouse.ItemBytes, hostTag int32)
- func (s *MultiItem) MergeWithTLMultiItem(s2 *tlstatshouse.MultiItemBytes, hostTag int32)
- func (s *MultiItem) RowBinarySizeEstimate() int
- func (s *MultiItem) TLSizeEstimate() int
- type MultiValue
- func (s *MultiValue) AddCounterHost(count float64, hostTag int32)
- func (s *MultiValue) AddUniqueHost(hashes []int64, count float64, hostTag int32)
- func (s *MultiValue) AddValueArrayHostPercentile(values []float64, mult float64, hostTag int32, compression float64)
- func (s *MultiValue) AddValueCounterHost(value float64, count float64, hostTag int32)
- func (s *MultiValue) AddValueCounterHostPercentile(value float64, count float64, hostTag int32, compression float64)
- func (s *MultiValue) ApplyUnique(hashes []int64, count float64, hostTag int32)
- func (s *MultiValue) ApplyValues(values []float64, count float64, hostTag int32, compression float64, ...)
- func (s *MultiValue) Empty() bool
- func (s *MultiValue) Merge(s2 *MultiValue)
- func (s *MultiValue) MergeWithTL2(s2 *tlstatshouse.MultiValueBytes, fields_mask uint32, hostTag int32, ...)
- func (s *MultiValue) MultiValueToTL(item *tlstatshouse.MultiValue, sampleFactor float64, fieldsMask *uint32, ...)
- func (s *MultiValue) RowBinarySizeEstimate() int
- func (s *MultiValue) TLSizeEstimate() int
- type SamplingGroup
- type SamplingMetric
- type SamplingMultiItemPair
Constants ¶
const (
	// Aggregator aggregates data in shards
	// Clients pre calculate shard ID and puts into lower byte of metricID in TL
	// Also clients shuffle shards, so that aggregator is less likely to stop on shard mutex
	// LogAggregationShardsPerSecond should be 8 (Cannot be larger, no point in being smaller)
	LogAggregationShardsPerSecond = 8
	AggregationShardsPerSecond    = 1 << LogAggregationShardsPerSecond

	MaxHistorySendStreams      = 10      // Do not change, unless you understand how exactly new conveyor works
	MaxHistoryInsertBatch      = 4       // Should be < MaxHistorySendStreams/2 so that we have enough historic buckets received when we finish previous insert
	HistoryInsertBodySizeLimit = 1 << 20 // We will batch several historic buckets together if they fit in this limit

	MaxLivenessResponsesWindowLength = 60
	MaxHistoricBucketsMemorySize     = 50 << 20
	MaxUncompressedBucketSize        = 10 << 20 // limits memory for uncompressed data buffer in aggregator, still dangerous

	MinStringTopCapacity        = 20
	MinStringTopSend            = 5
	MinStringTopInsert          = 5
	AggregatorStringTopCapacity = 1000 // big, but reasonable

	AgentPercentileCompression      = 40 // TODO - will typically have 20-30 centroids for compression 40
	AggregatorPercentileCompression = 80 // TODO - clickhouse has compression of 256 by default

	MaxShortWindow       = 5     // Must be >= 2, 5 seconds to send recent data, if too late - send as historic
	FutureWindow         = 2     // Allow a couple of seconds clocks difference on clients
	MaxHistoricWindow    = 86400 // 1 day to send historic data, then drop
	MaxHistoricWindowLag = 100   // Clients try to delete old data themselves, we allow some lag for those who already sent us data

	BelieveTimestampWindow = 3600 * 3 / 2 // Hour plus some margin, for crons running once per hour

	MinCardinalityWindow = 300 // Our estimators GC depends on this not being too small
	MinMaxCardinality    = 100

	InsertBudgetFixed = 50000

	AgentAggregatorDelay = 5  // Typical max
	InsertDelay          = 10 // Typical max
	MaxConveyorDelay     = MaxShortWindow + FutureWindow + InsertDelay + AgentAggregatorDelay

	MaxMissedSecondsIntoContributors = 60 // If source sends more MissedSeconds, they will be truncated. Do not make large. We scan 4 arrays of this size on each insert.

	ClientRPCPongTimeout = 30 * time.Second

	AgentMappingTimeout1 = 10 * time.Second
	AgentMappingTimeout2 = 30 * time.Second
	AutoConfigTimeout    = 30 * time.Second

	MaxJournalItemsSent = 1000 // TODO - increase, but limit response size in bytes
	MaxJournalBytesSent = 800 * 1024

	ClickHouseTimeoutConfig   = time.Second * 10 // either quickly autoconfig or quickly exit
	ClickhouseConfigRetries   = 5
	ClickHouseTimeout         = time.Minute
	ClickHouseTimeoutHistoric = 5 * time.Minute // reduces chance of duplicates via historic conveyor
	NoHistoricBucketsDelay    = 3 * time.Second
	ClickHouseErrorDelay      = 10 * time.Second

	KeepAliveMaxBackoff          = 30 * time.Second // for cases when aggregators quickly return error
	JournalDDOSProtectionTimeout = 50 * time.Millisecond

	InternalLogInsertInterval = 5 * time.Second

	RPCErrorMissedRecentConveyor   = -5001 // just send again through historic
	RPCErrorInsertRecentConveyor   = -5002 // just send again through historic
	RPCErrorInsertHistoricConveyor = -5003 // just send again after delay
	RPCErrorNoAutoCreate           = -5004 // just live with it, this is normal
	RPCErrorTerminateLongpoll      = -5005 // we terminate long polls because rpc.Server does not inform us about hctx disconnects. TODO - remove as soon as Server is updated

	JournalDiskNamespace        = "metric_journal_v5:"
	TagValueDiskNamespace       = "tag_value_v3:"
	TagValueInvertDiskNamespace = "tag_value_invert_v3:"
	BootstrapDiskNamespace      = "bootstrap:" // stored in aggregator only

	MappingMaxMetricsInQueue = 1000
	MappingMaxMemCacheSize   = 2_000_000
	MappingCacheTTLMinimum   = 7 * 24 * time.Hour
	MappingNegativeCacheTTL  = 5 * time.Second
	MappingMinInterval       = 1 * time.Millisecond

	SimulatorMetricPrefix = "simulator_metric_"
)
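The shard layout described in the comments above can be illustrated with a small standalone sketch. The helper name below is hypothetical and not part of the package; it only assumes, as the comment states, that clients pre-calculate the shard ID and store it in the lower byte of the metric ID.

package main

import "fmt"

const (
	LogAggregationShardsPerSecond = 8
	AggregationShardsPerSecond    = 1 << LogAggregationShardsPerSecond // 256 shards
)

// shardFromMetricID is a hypothetical helper: it extracts the shard index
// from the lower byte of metricID, which the constant comments say clients
// pre-calculate and store there.
func shardFromMetricID(metricID int32) int {
	return int(uint32(metricID) & (AggregationShardsPerSecond - 1))
}

func main() {
	fmt.Println(shardFromMetricID(0x12345)) // 0x45 == 69
}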
const DefaultStringTopCapacity = 100 // if capacity is 0, this one will be used instead
Variables ¶
This section is empty.
Functions ¶
func SampleFactor ¶
func SampleFactor(rnd *rand.Rand, sampleFactors map[int32]float64, metric int32) (float64, bool)
This function will be used in the second sampling pass to fit all saved data into a predefined budget.
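A minimal usage sketch follows. The import path, the package name, and the interpretation of the return values are assumptions not confirmed by this page.

package main

import (
	"fmt"
	"math/rand"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

func main() {
	rnd := rand.New(rand.NewSource(1))
	sampleFactors := map[int32]float64{42: 4} // per-metric factors produced by the budgeting pass
	sf, ok := data_model.SampleFactor(rnd, sampleFactors, 42)
	// Interpretation assumed, not confirmed by this page: ok reports whether this
	// row survives sampling, and sf is the factor used to scale what is kept.
	fmt.Println(sf, ok)
}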
func SampleFactorDeterministic ¶
func SampleFactorDeterministic(sampleFactors map[int32]float64, key Key, time uint32) (float64, bool)
This function assumes the structure of the hour table with time = toStartOfHour(time). This turned out to be a bad idea, so we do not use it anywhere now.
func SilentRPCError ¶
func SilentRPCError(err error) bool
These errors can seriously fill our logs; we want to avoid that in a consistent manner.
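A sketch of how a caller might use this predicate to keep logs quiet. The wrapper function and the import path are illustrative assumptions; only SilentRPCError itself comes from this page.

package main

import (
	"log"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

// logRPCError is a hypothetical wrapper: it drops errors that SilentRPCError
// classifies as too noisy to log, so log volume stays bounded.
func logRPCError(err error) {
	if err == nil || data_model.SilentRPCError(err) {
		return
	}
	log.Printf("rpc error: %v", err)
}

func main() {
	logRPCError(nil) // nothing logged
}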
Types ¶
type ChUnique ¶
type ChUnique struct {
// contains filtered or unexported fields
}
func (*ChUnique) ItemsCount ¶
Returns the number of non-zero items in the array + int(hasZeroItem).
func (*ChUnique) MarshallAppend ¶
Serializes into the ClickHouse format for insertion using RowBinary or another binary format.
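A compact usage sketch, assuming the import path below and assuming the zero value of ChUnique is ready for Insert; it only uses methods listed on this page.

package main

import (
	"fmt"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

func main() {
	var u data_model.ChUnique
	for _, h := range []uint64{1, 2, 2, 3} {
		u.Insert(h) // add pre-hashed values
	}
	// Reserve roughly enough space, then append the ClickHouse binary encoding.
	buf := make([]byte, 0, u.MarshallAppendEstimatedSize())
	buf = u.MarshallAppend(buf)
	fmt.Println(u.ItemsCount(), len(buf))
}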
func (*ChUnique) MarshallAppendEstimatedSize ¶
type Estimator ¶
type Estimator struct {
// contains filtered or unexported fields
}
func (*Estimator) GarbageCollect ¶
func (*Estimator) ReportHourCardinality ¶
func (*Estimator) UpdateWithKeys ¶
type ItemValue ¶
type ItemValue struct {
	Counter                      float64
	ValueMin, ValueMax, ValueSum float64 // Aggregates of Value
	ValueSumSquare               float64 // Aggregates of Value
	MinHostTag, MaxHostTag       int32   // Mapped hostname responsible for ValueMin and ValueMax. MinHostTag is not inserted in clickhouse for now
	ValueSet                     bool    // first value is assigned to Min&Max
}
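A sketch of accumulating these aggregates, using only methods and fields listed on this page; the import path is assumed, as above.

package main

import (
	"fmt"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

func main() {
	var a, b data_model.ItemValue
	a.AddValue(1.5)           // first value also initializes ValueMin/ValueMax
	a.AddValueCounter(3.0, 2) // value 3.0 observed with count 2
	b.AddValue(10)
	a.Merge(&b) // combine aggregates from another item
	fmt.Println(a.Counter, a.ValueMin, a.ValueMax, a.ValueSum)
}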
func (*ItemValue) AddCounterHost ¶
func (*ItemValue) AddValueArrayHost ¶
func (*ItemValue) AddValueCounter ¶
func (*ItemValue) AddValueCounterHost ¶
func (*ItemValue) MergeWithTLItem ¶
func (s *ItemValue) MergeWithTLItem(s2 *tlstatshouse.ItemBytes, hostTag int32)
func (*ItemValue) MergeWithTLItem2 ¶
func (s *ItemValue) MergeWithTLItem2(s2 *tlstatshouse.MultiValueBytes, fields_mask uint32, hostTag int32)
func (*ItemValue) TLSizeEstimate ¶
type Key ¶
type Key struct {
	Timestamp uint32
	Metric    int32
	Keys      [format.MaxTags]int32 // Unused keys are set to special 0-value
}
Time series key; will be optimized to a single human-readable string.
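A sketch of building a key and calling the helpers listed above; the import path is assumed, and no claim is made about the exact contents returned by ToSlice beyond its signature.

package main

import (
	"fmt"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

func main() {
	k := data_model.Key{
		Timestamp: 1700000000,
		Metric:    42,
	}
	k.Keys[0] = 1            // set one tag; unused keys stay at the special 0-value
	fmt.Println(k.Hash())    // 64-bit hash of the key
	fmt.Println(k.ToSlice()) // slice form of the key (see the index above)
}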
func KeyFromStatshouseItem ¶
func KeyFromStatshouseItem(item tlstatshouse.ItemBytes, bucketTimestamp uint32) (key Key, shardID int)
func KeyFromStatshouseMultiItem ¶
func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32) (key Key, shardID int)
func (*Key) ClearedKeys ¶
func (*Key) TLMultiItemFromKey ¶
func (k *Key) TLMultiItemFromKey(defaultTimestamp uint32) tlstatshouse.MultiItem
func (*Key) TLSizeEstimate ¶
type MappedMetricHeader ¶
type MappedMetricHeader struct {
	ReceiveTime time.Time // Saved at mapping start and used where we need time.Now. This is different from MetricBatch.T, which is sent by clients
	MetricInfo  *format.MetricMetaValue
	Key         Key
	SValue      []byte // reference to memory inside tlstatshouse.MetricBytes.
	HostTag     int32

	CheckedTagIndex int  // we check tags one by one, remembering position here, between invocations of mapTags
	ValuesChecked   bool // infs, nans, etc. This might be expensive, so done only once

	IsKeySet  [format.MaxTags]bool // report setting keys more than once.
	IsSKeySet bool
	IsHKeySet bool

	// errors below
	IngestionStatus int32  // if error happens, this will be != 0. errors are in fast path, so there must be no allocations
	InvalidString   []byte // reference to memory inside tlstatshouse.MetricBytes. If more than 1 problem, reports the last one
	IngestionTagKey int32  // +TagIDShift, as required by "tag_id" in builtin metric. Contains error tag ID for IngestionStatus != 0, or any tag which caused uncached load if IngestionStatus == 0

	// warnings below
	NotFoundTagName       []byte // reference to memory inside tlstatshouse.MetricBytes. If more than 1 problem, reports the last one
	TagSetTwiceKey        int32  // +TagIDShift, as required by "tag_id" in builtin metric. If more than 1, remembers some
	LegacyCanonicalTagKey int32  // +TagIDShift, as required by "tag_id" in builtin metric. If more than 1, remembers some
}
func (*MappedMetricHeader) SetInvalidString ¶
func (h *MappedMetricHeader) SetInvalidString(ingestionStatus int32, tagIDKey int32, invalidString []byte)
type MetricsBucket ¶
func (*MetricsBucket) Empty ¶
func (b *MetricsBucket) Empty() bool
type MultiItem ¶
type MultiItem struct {
	Top      map[string]*MultiValue
	Tail     MultiValue // elements not in top are collected here
	Capacity int        // algorithm supports changing on the fly, <2 means DefaultStringTopCapacity
	SF       float64    // set when Marshalling/Sampling
	// contains filtered or unexported fields
}
All our items are technically string tops, but most have an empty Top map.
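A sketch of the string-top flow using only methods listed on this page: map strings into the Top map (overflow is collected in Tail), aggregate into the returned bucket, then finish with the desired capacity. The import path, the zero-value readiness, and the exact role of the count argument are assumptions.

package main

import (
	"fmt"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

func main() {
	item := data_model.MultiItem{Capacity: 2} // <2 would mean DefaultStringTopCapacity
	for _, s := range []string{"get", "get", "post", "put"} {
		// Assumed usage pattern: MapStringTop returns the bucket for this string,
		// and the caller aggregates into it.
		item.MapStringTop(s, 1).AddCounterHost(1, 0)
	}
	_ = item.FinishStringTop(2) // keep the top strings; meaning of the return value is not documented here
	fmt.Println(len(item.Top))
}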
func MapKeyItemMultiItem ¶
func (*MultiItem) FinishStringTop ¶
func (*MultiItem) MapStringTop ¶
func (s *MultiItem) MapStringTop(str string, count float64) *MultiValue
func (*MultiItem) MapStringTopBytes ¶
func (s *MultiItem) MapStringTopBytes(str []byte, count float64) *MultiValue
func (*MultiItem) MergeWithTLItem ¶
func (s *MultiItem) MergeWithTLItem(s2 *tlstatshouse.ItemBytes, hostTag int32)
func (*MultiItem) MergeWithTLMultiItem ¶
func (s *MultiItem) MergeWithTLMultiItem(s2 *tlstatshouse.MultiItemBytes, hostTag int32)
func (*MultiItem) RowBinarySizeEstimate ¶
func (*MultiItem) TLSizeEstimate ¶
type MultiValue ¶
type MultiValue struct {
	Value        ItemValue
	ValueTDigest *tdigest.TDigest // We do not create it until we have at least 1 value to add
	HLL          ChUnique
}
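A sketch exercising two of the aggregate kinds held by MultiValue: value aggregates with a lazily created t-digest for percentiles, and unique counting via the HLL part. AgentPercentileCompression comes from the constants above; the import path is assumed.

package main

import (
	"fmt"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

func main() {
	var val, uniq data_model.MultiValue
	// Percentile-bearing value: the t-digest is created on the first value added.
	val.AddValueCounterHostPercentile(12.5, 1, 0, data_model.AgentPercentileCompression)
	// Unique counting: pre-hashed values go into the HLL (ChUnique) part.
	uniq.AddUniqueHost([]int64{111, 222, 222}, 3, 0)
	fmt.Println(val.Empty(), uniq.Empty())
}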
func (*MultiValue) AddCounterHost ¶
func (s *MultiValue) AddCounterHost(count float64, hostTag int32)
func (*MultiValue) AddUniqueHost ¶
func (s *MultiValue) AddUniqueHost(hashes []int64, count float64, hostTag int32)
func (*MultiValue) AddValueArrayHostPercentile ¶
func (s *MultiValue) AddValueArrayHostPercentile(values []float64, mult float64, hostTag int32, compression float64)
func (*MultiValue) AddValueCounterHost ¶
func (s *MultiValue) AddValueCounterHost(value float64, count float64, hostTag int32)
func (*MultiValue) AddValueCounterHostPercentile ¶
func (s *MultiValue) AddValueCounterHostPercentile(value float64, count float64, hostTag int32, compression float64)
func (*MultiValue) ApplyUnique ¶
func (s *MultiValue) ApplyUnique(hashes []int64, count float64, hostTag int32)
func (*MultiValue) ApplyValues ¶
func (*MultiValue) Empty ¶
func (s *MultiValue) Empty() bool
TODO - deal with empty items. Do not allow them to exist?
func (*MultiValue) Merge ¶
func (s *MultiValue) Merge(s2 *MultiValue)
func (*MultiValue) MergeWithTL2 ¶
func (s *MultiValue) MergeWithTL2(s2 *tlstatshouse.MultiValueBytes, fields_mask uint32, hostTag int32, compression float64)
func (*MultiValue) MultiValueToTL ¶
func (s *MultiValue) MultiValueToTL(item *tlstatshouse.MultiValue, sampleFactor float64, fieldsMask *uint32, marshalBuf *[]byte)
func (*MultiValue) RowBinarySizeEstimate ¶
func (s *MultiValue) RowBinarySizeEstimate() int
func (*MultiValue) TLSizeEstimate ¶
func (s *MultiValue) TLSizeEstimate() int
type SamplingGroup ¶
type SamplingGroup struct {
	GroupID     int32
	GroupWeight int64 // actually, effective weight

	MetricList      []*SamplingMetric
	SumMetricWeight int64
	SumSize         int64
}
We have an additional sampling stage for groups of metrics.
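The group-sampling algorithm itself is not described on this page. The sketch below only illustrates one plausible reading of the fields above, namely a byte budget split across groups in proportion to GroupWeight; treat it as an assumption, not the package's algorithm.

package main

import (
	"fmt"

	"github.com/VKCOM/statshouse/internal/data_model" // assumed import path
)

// groupBudget is a hypothetical helper: it splits totalBudget across groups in
// proportion to GroupWeight. Illustration only, not the package's actual logic.
func groupBudget(groups []*data_model.SamplingGroup, totalBudget int64) map[int32]int64 {
	var sumWeight int64
	for _, g := range groups {
		sumWeight += g.GroupWeight
	}
	res := map[int32]int64{}
	for _, g := range groups {
		if sumWeight > 0 {
			res[g.GroupID] = totalBudget * g.GroupWeight / sumWeight
		}
	}
	return res
}

func main() {
	groups := []*data_model.SamplingGroup{{GroupID: 1, GroupWeight: 1}, {GroupID: 2, GroupWeight: 3}}
	fmt.Println(groupBudget(groups, 1_000_000)) // map[1:250000 2:750000]
}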
type SamplingMetric ¶
type SamplingMetric struct {
	MetricID     int32
	MetricWeight int64 // actually, effective weight
	RoundFactors bool
	Group        *SamplingGroup // never nil when running group sampling algorithm, nil for sampling without groups

	SumSize int64
	Items   []SamplingMultiItemPair
}
type SamplingMultiItemPair ¶
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
constants | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |
internal | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |
tl | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |
tlengine | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |
tlmetadata | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |
tlnet | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |
tlstatshouse | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |
tlstatshouseApi | Code generated by vktl/cmd/tlgen2; DO NOT EDIT. |