data_model

package v1.0.0-beta1
Published: Nov 24, 2022 License: MPL-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

const (
	// The aggregator aggregates data in shards.
	// Clients pre-calculate the shard ID and put it into the lower byte of metricID in TL.
	// Clients also shuffle shards, so that the aggregator is less likely to block on a shard mutex.
	// LogAggregationShardsPerSecond should be 8 (it cannot be larger; there is no point in it being smaller).
	// See the packing sketch at the end of this Constants section.
	LogAggregationShardsPerSecond = 8
	AggregationShardsPerSecond    = 1 << LogAggregationShardsPerSecond

	MaxHistorySendStreams      = 10      // Do not change unless you understand exactly how the new conveyor works
	MaxHistoryInsertBatch      = 4       // Should be < MaxHistorySendStreams/2, so that enough historic buckets have arrived by the time the previous insert finishes
	HistoryInsertBodySizeLimit = 1 << 20 // We batch several historic buckets together if they fit within this limit

	MaxLivenessResponsesWindowLength = 60
	MaxHistoricBucketsMemorySize     = 50 << 20

	MaxUncompressedBucketSize = 10 << 20 // Limits memory for the uncompressed data buffer in the aggregator; still dangerous

	MinStringTopCapacity        = 20
	MinStringTopSend            = 5
	MinStringTopInsert          = 5
	AggregatorStringTopCapacity = 1000 // big, but reasonable

	AgentPercentileCompression      = 40 // TODO - will typically have 20-30 centroids for compression 40
	AggregatorPercentileCompression = 80 // TODO - clickhouse has compression of 256 by default

	MaxShortWindow       = 5     // Must be >= 2. 5 seconds to send recent data; if too late, it is sent as historic
	FutureWindow         = 2     // Allow a couple of seconds of clock difference on clients
	MaxHistoricWindow    = 86400 // 1 day to send historic data, then drop
	MaxHistoricWindowLag = 100   // Clients try to delete old data themselves, we allow some lag for those who already sent us data

	BelieveTimestampWindow = 3600 * 3 / 2 // Hour plus some margin, for crons running once per hour

	MinCardinalityWindow = 300 // Our estimators GC depends on this not being too small
	MinMaxCardinality    = 100

	InsertBudgetFixed = 50000

	AgentAggregatorDelay = 5  // Typical max
	InsertDelay          = 10 // Typical max

	MaxConveyorDelay = MaxShortWindow + FutureWindow + InsertDelay + AgentAggregatorDelay

	MaxMissedSecondsIntoContributors = 60 // If a source sends more MissedSeconds than this, they are truncated. Do not make this large: we scan 4 arrays of this size on each insert.

	ClientRPCPongTimeout = 30 * time.Second

	AgentMappingTimeout1 = 10 * time.Second
	AgentMappingTimeout2 = 30 * time.Second
	AutoConfigTimeout    = 30 * time.Second

	MaxJournalItemsSent = 1000 // TODO - increase, but limit response size in bytes
	MaxJournalBytesSent = 800 * 1024

	ClickHouseTimeoutConfig   = time.Second * 10 // either quickly autoconfig or quickly exit
	ClickhouseConfigRetries   = 5
	ClickHouseTimeout         = time.Minute
	ClickHouseTimeoutHistoric = 5 * time.Minute // reduces chance of duplicates via historic conveyor
	NoHistoricBucketsDelay    = 3 * time.Second
	ClickHouseErrorDelay      = 10 * time.Second

	KeepAliveMaxBackoff          = 30 * time.Second // for cases when aggregators quickly return error
	JournalDDOSProtectionTimeout = 50 * time.Millisecond

	InternalLogInsertInterval = 5 * time.Second

	RPCErrorMissedRecentConveyor   = -5001 // just send again through historic
	RPCErrorInsertRecentConveyor   = -5002 // just send again through historic
	RPCErrorInsertHistoricConveyor = -5003 // just send again after delay
	RPCErrorNoAutoCreate           = -5004 // just live with it, this is normal
	RPCErrorTerminateLongpoll      = -5005 // we terminate long polls because rpc.Server does not inform us about hctx disconnects. TODO - remove as soon as Server is updated

	JournalDiskNamespace        = "metric_journal_v5:"
	TagValueDiskNamespace       = "tag_value_v3:"
	TagValueInvertDiskNamespace = "tag_value_invert_v3:"
	BootstrapDiskNamespace      = "bootstrap:" // stored in aggregator only

	MappingMaxMetricsInQueue = 1000
	MappingMaxMemCacheSize   = 2_000_000
	MappingCacheTTLMinimum   = 7 * 24 * time.Hour
	MappingNegativeCacheTTL  = 5 * time.Second
	MappingMinInterval       = 1 * time.Millisecond

	SimulatorMetricPrefix = "simulator_metric_"
)
const DefaultStringTopCapacity = 100 // if capacity is 0, this one will be used instead
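
The shard packing described at the top of the first constants block can be sketched as follows. This is an illustration, not the package's code: the hash function and the exact metricID layout are assumptions.

package main

import (
	"fmt"
	"hash/fnv"
)

const (
	LogAggregationShardsPerSecond = 8
	AggregationShardsPerSecond    = 1 << LogAggregationShardsPerSecond // 256
)

// shardOf derives a shard in [0, AggregationShardsPerSecond) from a key.
// FNV-1a is illustrative only; clients may hash (and shuffle) differently.
func shardOf(key string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() & (AggregationShardsPerSecond - 1)
}

func main() {
	shard := shardOf("some-timeseries-key")
	// The lower byte of metricID in TL carries the shard.
	packed := uint32(1234)<<LogAggregationShardsPerSecond | shard
	fmt.Printf("shard=%d packed=0x%x\n", shard, packed)
}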

Variables

This section is empty.

Functions

func NextBackoffDuration

func NextBackoffDuration(backoffTimeout time.Duration) time.Duration
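
The backoff policy is not documented on this page; judging by the name and the KeepAliveMaxBackoff constant above, a plausible sketch (an assumption, not the package's implementation) is exponential backoff with a cap:

package main

import "time"

const keepAliveMaxBackoff = 30 * time.Second // KeepAliveMaxBackoff above

// nextBackoffDuration doubles the timeout up to the cap. Hypothetical.
func nextBackoffDuration(backoffTimeout time.Duration) time.Duration {
	if backoffTimeout <= 0 {
		return time.Second
	}
	if backoffTimeout *= 2; backoffTimeout > keepAliveMaxBackoff {
		backoffTimeout = keepAliveMaxBackoff
	}
	return backoffTimeout
}

func main() {
	d := time.Second
	for i := 0; i < 6; i++ {
		d = nextBackoffDuration(d) // 2s, 4s, 8s, 16s, 30s, 30s
	}
	_ = d
}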

func RoundSampleFactor

func RoundSampleFactor(rnd *rand.Rand, sf float64) float64

func SampleFactor

func SampleFactor(rnd *rand.Rand, sampleFactors map[int32]float64, metric int32) (float64, bool)

This function is used in the second sampling pass to fit all saved data into a predefined budget.
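
A usage sketch follows; the import path is assumed (this page does not show it, and the package lives under internal/, so it is importable only within the statshouse module). The sketch also assumes the returned bool reports whether the item survives sampling.

package main

import (
	"fmt"
	"math/rand"

	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	rnd := rand.New(rand.NewSource(42))
	// Metric 1001 is over budget: keep roughly 1 in 4 items,
	// scaling the retained counters by 4 to stay unbiased.
	sampleFactors := map[int32]float64{1001: 4.0}
	if sf, ok := data_model.SampleFactor(rnd, sampleFactors, 1001); ok {
		fmt.Printf("keep item, multiply counters by sf=%v\n", sf)
	} else {
		fmt.Println("drop item this time")
	}
}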

func SampleFactorDeterministic

func SampleFactorDeterministic(sampleFactors map[int32]float64, key Key, time uint32) (float64, bool)

This function assumes the structure of the hour table, with time = toStartOfHour(time). This turned out to be a bad idea, so it is no longer used anywhere.

func SilentRPCError

func SilentRPCError(err error) bool

These errors can seriously flood our logs, so we want to suppress them in a consistent manner.

func TillStartOfNextSecond

func TillStartOfNextSecond(now time.Time) time.Duration
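
By its name, the computation can be sketched as follows (an assumption, not necessarily the package's exact code):

package main

import (
	"fmt"
	"time"
)

// tillStartOfNextSecond returns the duration from now until the next
// whole-second boundary.
func tillStartOfNextSecond(now time.Time) time.Duration {
	return now.Truncate(time.Second).Add(time.Second).Sub(now)
}

func main() {
	fmt.Println(tillStartOfNextSecond(time.Now()))
}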

Types

type ChUnique

type ChUnique struct {
	// contains filtered or unexported fields
}

func (*ChUnique) Insert

func (ch *ChUnique) Insert(val uint64)

func (*ChUnique) ItemsCount

func (ch *ChUnique) ItemsCount() int

Returns the number of non-zero items in the array + int(hasZeroItem).

func (*ChUnique) Marshall

func (ch *ChUnique) Marshall(dst io.Writer) error

func (*ChUnique) MarshallAppend

func (ch *ChUnique) MarshallAppend(buf []byte) []byte

Serializes into the ClickHouse format for insertion via RowBinary or another binary format.

func (*ChUnique) MarshallAppendEstimatedSize

func (ch *ChUnique) MarshallAppendEstimatedSize() int

func (*ChUnique) Merge

func (ch *ChUnique) Merge(rhs ChUnique)

func (*ChUnique) MergeRead

func (ch *ChUnique) MergeRead(r *bytes.Buffer) error

func (*ChUnique) Reset

func (ch *ChUnique) Reset()

func (*ChUnique) Size

func (ch *ChUnique) Size(asIs bool) uint64

func (*ChUnique) UmMarshall

func (ch *ChUnique) UmMarshall(r *bytes.Buffer) error
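
A usage sketch for ChUnique (import path assumed, as above): insert pre-hashed values, merge two states, then append the ClickHouse-compatible binary state for a RowBinary insert.

package main

import (
	"fmt"

	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	var a, b data_model.ChUnique
	for i := uint64(1); i <= 1000; i++ {
		a.Insert(i)
		b.Insert(i / 2) // overlaps with a
	}
	a.Merge(b)
	fmt.Println("items stored:", a.ItemsCount())

	buf := make([]byte, 0, a.MarshallAppendEstimatedSize())
	buf = a.MarshallAppend(buf) // binary uniq state, ready for RowBinary
	_ = buf
}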

type Estimator

type Estimator struct {
	// contains filtered or unexported fields
}

func (*Estimator) GarbageCollect

func (e *Estimator) GarbageCollect(oldestTime uint32)

func (*Estimator) Init

func (e *Estimator) Init(window int, maxCardinality int)

Init must be called first; forgetting it will cause a division by zero.

func (*Estimator) ReportHourCardinality

func (e *Estimator) ReportHourCardinality(time uint32, usedMetrics map[int32]struct{}, builtInStat *map[Key]*MultiItem, aggregatorHost int32, shardKey int32, replicaKey int32, numShards int)

func (*Estimator) UpdateWithKeys

func (e *Estimator) UpdateWithKeys(time uint32, keys []Key)
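
A usage sketch (import path assumed). Init is mandatory; see the division-by-zero note above.

package main

import (
	"time"

	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	var e data_model.Estimator
	e.Init(data_model.MinCardinalityWindow, data_model.MinMaxCardinality)

	now := uint32(time.Now().Unix())
	keys := []data_model.Key{{Timestamp: now, Metric: 1001}}
	e.UpdateWithKeys(now, keys)

	// Periodically drop state older than the window to bound memory.
	e.GarbageCollect(now - data_model.MinCardinalityWindow)
}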

type ItemValue

type ItemValue struct {
	Counter                      float64
	ValueMin, ValueMax, ValueSum float64 // Aggregates of Value
	ValueSumSquare               float64 // Aggregates of Value
	MinHostTag, MaxHostTag       int32   // Mapped hostname responsible for ValueMin and ValueMax. MinHostTag is not inserted into ClickHouse for now
	ValueSet                     bool    // the first value is assigned to both ValueMin and ValueMax
}

func SimpleItemValue

func SimpleItemValue(value float64, count float64, hostTag int32) ItemValue

func (*ItemValue) AddCounterHost

func (s *ItemValue) AddCounterHost(count float64, hostTag int32)

func (*ItemValue) AddValue

func (s *ItemValue) AddValue(value float64)

func (*ItemValue) AddValueArrayHost

func (s *ItemValue) AddValueArrayHost(values []float64, mult float64, hostTag int32)

func (*ItemValue) AddValueCounter

func (s *ItemValue) AddValueCounter(value float64, count float64)

func (*ItemValue) AddValueCounterHost

func (s *ItemValue) AddValueCounterHost(value float64, count float64, hostTag int32)

func (*ItemValue) Merge

func (s *ItemValue) Merge(s2 *ItemValue)

func (*ItemValue) MergeWithTLItem

func (s *ItemValue) MergeWithTLItem(s2 *tlstatshouse.ItemBytes, hostTag int32)

func (*ItemValue) MergeWithTLItem2

func (s *ItemValue) MergeWithTLItem2(s2 *tlstatshouse.MultiValueBytes, fields_mask uint32, hostTag int32)

func (*ItemValue) TLSizeEstimate

func (s *ItemValue) TLSizeEstimate() int
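
A usage sketch (import path assumed) showing how the aggregates combine; the expected results in the comment assume AddValueCounterHost scales ValueSum by count, which is the usual convention.

package main

import (
	"fmt"

	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	a := data_model.SimpleItemValue(10, 1, 0) // one observation of 10
	a.AddValueCounterHost(2, 3, 0)            // value 2 observed 3 times

	b := data_model.SimpleItemValue(-5, 1, 0)
	a.Merge(&b)

	fmt.Println(a.Counter, a.ValueMin, a.ValueMax) // expected: 5 -5 10
}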

type Key

type Key struct {
	Timestamp uint32
	Metric    int32
	Keys      [format.MaxTags]int32 // Unused keys are set to special 0-value
}

Time series key; will be optimized into a single human-readable string.

func AggKey

func AggKey(t uint32, m int32, k [format.MaxTags]int32, hostTag int32, shardTag int32, replicaTag int32) Key

func KeyFromStatshouseItem

func KeyFromStatshouseItem(item tlstatshouse.ItemBytes, bucketTimestamp uint32) (key Key, shardID int)

func KeyFromStatshouseMultiItem

func KeyFromStatshouseMultiItem(item *tlstatshouse.MultiItemBytes, bucketTimestamp uint32) (key Key, shardID int)

func (*Key) ClearedKeys

func (k *Key) ClearedKeys() Key

func (*Key) Hash

func (k *Key) Hash() uint64

func (*Key) TLMultiItemFromKey

func (k *Key) TLMultiItemFromKey(defaultTimestamp uint32) tlstatshouse.MultiItem

func (*Key) TLSizeEstimate

func (k *Key) TLSizeEstimate(defaultTimestamp uint32) int

func (*Key) ToSlice

func (k *Key) ToSlice() []int32

func (Key) WithAgentEnvRouteArch

func (k Key) WithAgentEnvRouteArch(agentEnvTag int32, routeTag int32, buildArchTag int32) Key
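
A usage sketch (import path assumed): build a time-series key, hash it for placement, and flatten it for inserts.

package main

import (
	"fmt"
	"time"

	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	k := data_model.Key{
		Timestamp: uint32(time.Now().Unix()),
		Metric:    1001,
	}
	k.Keys[0] = 42 // remaining tags keep the special 0 value

	fmt.Println(k.Hash())    // used for map/shard placement
	fmt.Println(k.ToSlice()) // flattened tag values
}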

type MappedMetricHeader

type MappedMetricHeader struct {
	ReceiveTime time.Time // Saved at mapping start and used wherever we need time.Now. This differs from MetricBatch.T, which is sent by clients
	MetricInfo  *format.MetricMetaValue
	Key         Key
	SValue      []byte // reference to memory inside tlstatshouse.MetricBytes.
	HostTag     int32

	CheckedTagIndex int  // we check tags one by one, remembering position here, between invocations of mapTags
	ValuesChecked   bool // infs, nans, etc. This might be expensive, so done only once

	IsKeySet  [format.MaxTags]bool // used to report keys that are set more than once.
	IsSKeySet bool
	IsHKeySet bool

	// errors below
	IngestionStatus int32  // If an error happens, this will be != 0. Errors are on the fast path, so there must be no allocations
	InvalidString   []byte // reference to memory inside tlstatshouse.MetricBytes. If there is more than 1 problem, reports the last one
	IngestionTagKey int32  // +TagIDShift, as required by "tag_id" in the builtin metric. Contains the error tag ID when IngestionStatus != 0, or any tag which caused an uncached load when IngestionStatus == 0

	// warnings below
	NotFoundTagName       []byte // reference to memory inside tlstatshouse.MetricBytes. If more than 1 problem, reports the last one
	TagSetTwiceKey        int32  // +TagIDShift, as required by "tag_id" in builtin metric. If more than 1, remembers some
	LegacyCanonicalTagKey int32  // +TagIDShift, as required by "tag_id" in builtin metric. If more than 1, remembers some
}

func (*MappedMetricHeader) SetInvalidString

func (h *MappedMetricHeader) SetInvalidString(ingestionStatus int32, tagIDKey int32, invalidString []byte)

func (*MappedMetricHeader) SetKey

func (h *MappedMetricHeader) SetKey(index int, id int32, tagIDKey int32)
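
A usage sketch (import path assumed). The field comments above suggest SetKey records duplicate tag assignments via IsKeySet and TagSetTwiceKey; that behavior is assumed here, and the tag-id argument is illustrative.

package main

import (
	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	var h data_model.MappedMetricHeader
	h.SetKey(1, 42, 1) // tag index 1 mapped to value 42
	h.SetKey(1, 43, 1) // same tag set again...
	if h.TagSetTwiceKey != 0 {
		// ...which is remembered so it can be reported as a warning
	}
}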

type MetricsBucket

type MetricsBucket struct {
	Time uint32

	MultiItems map[Key]*MultiItem
}

func (*MetricsBucket) Empty

func (b *MetricsBucket) Empty() bool

type MultiItem

type MultiItem struct {
	Top  map[string]*MultiValue
	Tail MultiValue // elements not in top are collected here

	Capacity int     // algorithm supports changing on the fly, <2 means DefaultStringTopCapacity
	SF       float64 // set when Marshalling/Sampling
	// contains filtered or unexported fields
}

All our items are technically string tops, but most have an empty Top map.

func MapKeyItemMultiItem

func MapKeyItemMultiItem(other *map[Key]*MultiItem, key Key, stringTopCapacity int, created *bool) *MultiItem

func (*MultiItem) FinishStringTop

func (s *MultiItem) FinishStringTop(capacity int) float64

func (*MultiItem) MapStringTop

func (s *MultiItem) MapStringTop(str string, count float64) *MultiValue

func (*MultiItem) MapStringTopBytes

func (s *MultiItem) MapStringTopBytes(str []byte, count float64) *MultiValue

func (*MultiItem) Merge

func (s *MultiItem) Merge(s2 *MultiItem)

func (*MultiItem) MergeWithTLItem

func (s *MultiItem) MergeWithTLItem(s2 *tlstatshouse.ItemBytes, hostTag int32)

func (*MultiItem) MergeWithTLMultiItem

func (s *MultiItem) MergeWithTLMultiItem(s2 *tlstatshouse.MultiItemBytes, hostTag int32)

func (*MultiItem) RowBinarySizeEstimate

func (s *MultiItem) RowBinarySizeEstimate() int

func (*MultiItem) TLSizeEstimate

func (s *MultiItem) TLSizeEstimate() int
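
A usage sketch (import path assumed): get-or-create a MultiItem in a bucket, then account a string-top value.

package main

import (
	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	bucket := data_model.MetricsBucket{
		Time:       1700000000,
		MultiItems: map[data_model.Key]*data_model.MultiItem{},
	}
	key := data_model.Key{Timestamp: bucket.Time, Metric: 1001}

	var created bool
	// Capacity 0 (<2) falls back to DefaultStringTopCapacity.
	mi := data_model.MapKeyItemMultiItem(&bucket.MultiItems, key, 0, &created)
	mv := mi.MapStringTop("status=200", 1)
	mv.AddCounterHost(1, 0) // count 1, hostTag 0
}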

type MultiValue

type MultiValue struct {
	Value        ItemValue
	ValueTDigest *tdigest.TDigest // We do not create it until we have at least 1 value to add
	HLL          ChUnique
}

func (*MultiValue) AddCounterHost

func (s *MultiValue) AddCounterHost(count float64, hostTag int32)

func (*MultiValue) AddUniqueHost

func (s *MultiValue) AddUniqueHost(hashes []int64, count float64, hostTag int32)

func (*MultiValue) AddValueArrayHostPercentile

func (s *MultiValue) AddValueArrayHostPercentile(values []float64, mult float64, hostTag int32, compression float64)

func (*MultiValue) AddValueCounterHost

func (s *MultiValue) AddValueCounterHost(value float64, count float64, hostTag int32)

func (*MultiValue) AddValueCounterHostPercentile

func (s *MultiValue) AddValueCounterHostPercentile(value float64, count float64, hostTag int32, compression float64)

func (*MultiValue) ApplyUnique

func (s *MultiValue) ApplyUnique(hashes []int64, count float64, hostTag int32)

func (*MultiValue) ApplyValues

func (s *MultiValue) ApplyValues(values []float64, count float64, hostTag int32, compression float64, hasPercentiles bool)

func (*MultiValue) Empty

func (s *MultiValue) Empty() bool

TODO - deal with empty items. Do not allow them to exist?

func (*MultiValue) Merge

func (s *MultiValue) Merge(s2 *MultiValue)

func (*MultiValue) MergeWithTL2

func (s *MultiValue) MergeWithTL2(s2 *tlstatshouse.MultiValueBytes, fields_mask uint32, hostTag int32, compression float64)

func (*MultiValue) MultiValueToTL

func (s *MultiValue) MultiValueToTL(item *tlstatshouse.MultiValue, sampleFactor float64, fieldsMask *uint32, marshalBuf *[]byte)

func (*MultiValue) RowBinarySizeEstimate

func (s *MultiValue) RowBinarySizeEstimate() int

func (*MultiValue) TLSizeEstimate

func (s *MultiValue) TLSizeEstimate() int
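
A usage sketch (import path assumed) showing the lazy t-digest allocation mentioned above:

package main

import (
	"github.com/vkcom/statshouse/internal/data_model" // assumed import path
)

func main() {
	var mv data_model.MultiValue
	for _, v := range []float64{0.9, 1.1, 2.5, 10} {
		mv.AddValueCounterHostPercentile(v, 1, 0, data_model.AgentPercentileCompression)
	}
	if mv.ValueTDigest != nil {
		// allocated on the first value; now summarizes the
		// distribution at compression 40
	}
}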

type SamplingGroup

type SamplingGroup struct {
	GroupID     int32
	GroupWeight int64 // actually, effective weight

	MetricList      []*SamplingMetric
	SumMetricWeight int64
	SumSize         int64
}

We have an additional sampling stage for groups of metrics.

type SamplingMetric

type SamplingMetric struct {
	MetricID     int32
	MetricWeight int64 // actually, effective weight
	RoundFactors bool
	Group        *SamplingGroup // never nil when running group sampling algorithm, nil for sampling without groups

	SumSize int64
	Items   []SamplingMultiItemPair
}

type SamplingMultiItemPair

type SamplingMultiItemPair struct {
	Key         Key
	Item        *MultiItem
	WhaleWeight float64 // whale selection criterion; for now, the sum of Counters
}
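
An illustrative sketch of the idea behind these structures, not the package's actual algorithm: metrics compete for an insert budget in proportion to their weights, and a metric over its share gets a sample factor sf > 1, meaning roughly 1/sf of its items are kept.

package main

import "fmt"

// samplingMetric mirrors the SamplingMetric fields used in this sketch.
type samplingMetric struct {
	MetricWeight int64
	SumSize      int64
}

// sampleFactorFor gives the metric a share of the byte budget in
// proportion to its weight and returns sf >= 1.
func sampleFactorFor(m samplingMetric, sumWeight, budget int64) float64 {
	share := float64(budget) * float64(m.MetricWeight) / float64(sumWeight)
	if float64(m.SumSize) <= share {
		return 1 // fits within its share, no sampling
	}
	return float64(m.SumSize) / share
}

func main() {
	m := samplingMetric{MetricWeight: 1, SumSize: 200000}
	fmt.Println(sampleFactorFor(m, 2, 100000)) // share 50000 -> sf 4
}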

Directories

Path            Synopsis
constants       Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
internal        Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
tl              Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
tlengine        Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
tlmetadata      Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
tlnet           Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
tlstatshouse    Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
tlstatshouseApi Code generated by vktl/cmd/tlgen2; DO NOT EDIT.
