Documentation ¶
Index ¶
- func GetConfig(network string, rpcClient *rpc.Client, addressesExt []string, ...) tlstatshouse.GetConfigResult
- func SpareShardReplica(shardReplica int, timestamp uint32) int
- func ValidateConfigSource(c Config) error
- type Agent
- func (s *Agent) AddCounter(key data_model.Key, count float64)
- func (s *Agent) AddCounterHost(key data_model.Key, count float64, hostTag int32, ...)
- func (s *Agent) AddCounterHostStringBytes(key data_model.Key, str []byte, count float64, hostTag int32, ...)
- func (s *Agent) AddUniqueHostStringBytes(key data_model.Key, hostTag int32, str []byte, hashes []int64, count float64, ...)
- func (s *Agent) AddValueArrayCounterHostStringBytes(key data_model.Key, values []float64, mult float64, hostTag int32, str []byte, ...)
- func (s *Agent) AddValueCounter(key data_model.Key, value float64, counter float64, ...)
- func (s *Agent) AddValueCounterHost(key data_model.Key, value float64, counter float64, hostTag int32)
- func (s *Agent) AddValueCounterHostArray(key data_model.Key, values []float64, mult float64, hostTag int32, ...)
- func (s *Agent) AddValueCounterHostStringBytes(key data_model.Key, value float64, counter float64, hostTag int32, str []byte)
- func (s *Agent) AggKey(time uint32, metricID int32, keys [format.MaxTags]int32) data_model.Key
- func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetricHeader, ...)
- func (s *Agent) AutoCreateMetric(ctx context.Context, args tlstatshouse.AutoCreate) error
- func (s *Agent) Close()
- func (s *Agent) CreateBuiltInItemValue(key data_model.Key) *BuiltInItemValue
- func (s *Agent) GetTagMappingBootstrap(ctxParent context.Context) ([]tlstatshouse.Mapping, time.Duration, error)
- func (s *Agent) LoadMetaMetricJournal(ctxParent context.Context, version int64, returnIfEmpty bool) ([]tlmetadata.Event, int64, error)
- func (s *Agent) LoadOrCreateMapping(ctxParent context.Context, key string, floodLimitKey interface{}) (pcache.Value, time.Duration, error)
- func (s *Agent) LoadPromTargets(ctxParent context.Context, version string) (res *tlstatshouse.GetTargetsResult, versionHash string, err error)
- func (s *Agent) MergeItemValue(key data_model.Key, item *data_model.ItemValue, ...)
- func (s *Agent) NumShardReplicas() int
- func (s *Agent) Run(aggHost int32, aggShardKey int32, aggReplicaKey int32)
- type BuiltInItemValue
- type Config
- type DiskBucketStorage
- func (d *DiskBucketStorage) Close() error
- func (d *DiskBucketStorage) EraseBucket(shardID int, time uint32) error
- func (d *DiskBucketStorage) GetBucket(shardID int, time uint32, scratchPad *[]byte) ([]byte, error)
- func (d *DiskBucketStorage) PutBucket(shardID int, time uint32, data []byte) error
- func (d *DiskBucketStorage) ReadNextTailSecond(shardID int) (uint32, bool)
- func (d *DiskBucketStorage) TotalFileSize(shardID int) int64
- type Shard
- func (s *Shard) AddCounterHost(key data_model.Key, keyHash uint64, count float64, hostTag int32, ...)
- func (s *Shard) AddCounterHostStringBytes(key data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, ...)
- func (s *Shard) AddUniqueHostStringBytes(key data_model.Key, hostTag int32, str []byte, keyHash uint64, hashes []int64, ...)
- func (s *Shard) AddValueArrayCounterHost(key data_model.Key, keyHash uint64, values []float64, mult float64, ...)
- func (s *Shard) AddValueArrayCounterHostStringBytes(key data_model.Key, keyHash uint64, values []float64, mult float64, ...)
- func (s *Shard) AddValueCounterHost(key data_model.Key, keyHash uint64, value float64, counter float64, ...)
- func (s *Shard) AddValueCounterHostStringBytes(key data_model.Key, keyHash uint64, value float64, count float64, ...)
- func (s *Shard) ApplyCounter(key data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, ...)
- func (s *Shard) ApplyUnique(key data_model.Key, keyHash uint64, str []byte, hashes []int64, count float64, ...)
- func (s *Shard) ApplyValues(key data_model.Key, keyHash uint64, str []byte, values []float64, ...)
- func (s *Shard) CreateBuiltInItemValue(key data_model.Key) *BuiltInItemValue
- func (s *Shard) HistoricBucketsDataSizeDisk() int64
- func (s *Shard) HistoricBucketsDataSizeMemory() int
- func (s *Shard) IsAlive() bool
- func (s *Shard) MergeItemValue(key data_model.Key, keyHash uint64, item *data_model.ItemValue, ...)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SpareShardReplica ¶
func SpareShardReplica(shardReplica int, timestamp uint32) int
func ValidateConfigSource ¶
func ValidateConfigSource(c Config) error
Types ¶
type Agent ¶
type Agent struct {
    Shards          []*Shard                     // Actually those are shard-replicas
    GetConfigResult tlstatshouse.GetConfigResult // for ingress proxy

    // Used for builtin metrics when running inside aggregator
    AggregatorShardKey   int32
    AggregatorReplicaKey int32
    AggregatorHost       int32
    // contains filtered or unexported fields
}
func MakeAgent ¶
func MakeAgent(network string, storageDir string, aesPwd string, config Config, hostName string, componentTag int32, metricStorage format.MetaStorageInterface, logF func(format string, args ...interface{}), beforeFlushBucketFunc func(now time.Time), getConfigResult *tlstatshouse.GetConfigResult) (*Agent, error)
All shard aggregators must be on the same network
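A minimal construction sketch, assuming this package is imported as agent; the aggregator addresses, storage directory, componentTag and metricStorage values are placeholders (metricStorage is anything implementing format.MetaStorageInterface):
    cfg := agent.DefaultConfig()
    cfg.AggregatorAddresses = []string{"agg-1:13336", "agg-2:13336", "agg-3:13336"} // placeholder addresses
    cfg.Cluster = "example"                                                         // placeholder cluster name
    a, err := agent.MakeAgent("tcp4", "/var/lib/statshouse/cache", "", cfg,
        "example-host", componentTag, metricStorage, log.Printf, nil, nil)
    if err != nil {
        log.Fatalln("failed to create agent:", err)
    }
    a.Run(0, 0, 0) // aggregator host/shard/replica keys, assumed zero when not running inside the aggregator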
func (*Agent) AddCounter ¶
func (s *Agent) AddCounter(key data_model.Key, count float64)
count should be > 0 and not NaN
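A usage sketch (a is an *Agent; the metric ID and tag values are hypothetical, and AggKey is used here only as a convenient way to build a data_model.Key for the current second):
    key := a.AggKey(uint32(time.Now().Unix()), 12345, [format.MaxTags]int32{0, 1, 2}) // hypothetical metric ID and tags
    a.AddCounter(key, 1)                                                              // count must be > 0 and not NaN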
func (*Agent) AddCounterHost ¶
func (s *Agent) AddCounterHost(key data_model.Key, count float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Agent) AddCounterHostStringBytes ¶
func (s *Agent) AddCounterHostStringBytes(key data_model.Key, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue)
str should be reasonably short. An empty string will be indistinguishable from "the rest". count should be > 0 and not NaN.
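For example (key is built as in the AddCounter sketch above; passing 0 as hostTag and nil as metricInfo is an assumption of this sketch):
    a.AddCounterHostStringBytes(key, []byte("GET /users"), 1, 0, nil)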
func (*Agent) AddUniqueHostStringBytes ¶
func (s *Agent) AddUniqueHostStringBytes(key data_model.Key, hostTag int32, str []byte, hashes []int64, count float64, metricInfo *format.MetricMetaValue)
func (*Agent) AddValueArrayCounterHostStringBytes ¶
func (s *Agent) AddValueArrayCounterHostStringBytes(key data_model.Key, values []float64, mult float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue)
func (*Agent) AddValueCounter ¶
func (s *Agent) AddValueCounter(key data_model.Key, value float64, counter float64, metricInfo *format.MetricMetaValue)
value should not be NaN.
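Sketch (key as above; metricInfo is assumed to be the metric's *format.MetricMetaValue obtained elsewhere, or nil if unknown):
    a.AddValueCounter(key, 0.037, 1, metricInfo) // e.g. a request duration in seconds; value must not be NaN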
func (*Agent) AddValueCounterHost ¶
func (s *Agent) AddValueCounterHost(key data_model.Key, value float64, counter float64, hostTag int32)
func (*Agent) AddValueCounterHostArray ¶
func (s *Agent) AddValueCounterHostArray(key data_model.Key, values []float64, mult float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Agent) AddValueCounterHostStringBytes ¶
func (s *Agent) AddValueCounterHostStringBytes(key data_model.Key, value float64, counter float64, hostTag int32, str []byte)
func (*Agent) ApplyMetric ¶
func (s *Agent) ApplyMetric(m tlstatshouse.MetricBytes, h data_model.MappedMetricHeader, ingestionStatusOKTag int32)
func (*Agent) AutoCreateMetric ¶
func (s *Agent) AutoCreateMetric(ctx context.Context, args tlstatshouse.AutoCreate) error
func (*Agent) CreateBuiltInItemValue ¶
func (s *Agent) CreateBuiltInItemValue(key data_model.Key) *BuiltInItemValue
Do not create too many: shards iterate through these values before flushing each bucket. Useful for watermark metrics.
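A watermark-style sketch: create the value once, then overwrite it periodically (queueLen is a hypothetical function reporting the current queue length):
    v := a.CreateBuiltInItemValue(key)
    go func() {
        for range time.Tick(time.Second) {
            v.SetValueCounter(float64(queueLen()), 1) // overwrite with the latest watermark
        }
    }()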
func (*Agent) GetTagMappingBootstrap ¶
func (s *Agent) GetTagMappingBootstrap(ctxParent context.Context) ([]tlstatshouse.Mapping, time.Duration, error)
func (*Agent) LoadMetaMetricJournal ¶
func (s *Agent) LoadMetaMetricJournal(ctxParent context.Context, version int64, returnIfEmpty bool) ([]tlmetadata.Event, int64, error)
func (*Agent) LoadOrCreateMapping ¶
func (s *Agent) LoadOrCreateMapping(ctxParent context.Context, key string, floodLimitKey interface{}) (pcache.Value, time.Duration, error)
func (*Agent) LoadPromTargets ¶
func (s *Agent) LoadPromTargets(ctxParent context.Context, version string) (res *tlstatshouse.GetTargetsResult, versionHash string, err error)
func (*Agent) MergeItemValue ¶
func (s *Agent) MergeItemValue(key data_model.Key, item *data_model.ItemValue, metricInfo *format.MetricMetaValue)
func (*Agent) NumShardReplicas ¶
func (s *Agent) NumShardReplicas() int
type BuiltInItemValue ¶
type BuiltInItemValue struct {
// contains filtered or unexported fields
}
func (*BuiltInItemValue) AddValueCounter ¶
func (s *BuiltInItemValue) AddValueCounter(value float64, count float64)
For counters, use AddValueCounter(0, 1)
func (*BuiltInItemValue) Merge ¶
func (s *BuiltInItemValue) Merge(s2 *data_model.ItemValue)
func (*BuiltInItemValue) SetValueCounter ¶
func (s *BuiltInItemValue) SetValueCounter(value float64, count float64)
type Config ¶
type Config struct {
    AggregatorAddresses []string

    // Shard Sampling Algorithm
    SampleBudget        int   // for all shards, in bytes
    SampleGroups        bool  // use group weights. Experimental, will be turned on unconditionally later
    MaxHistoricDiskSize int64 // for all shards, in bytes

    // How many strings (per key) are stored and sent to the aggregator
    StringTopCapacity  int
    StringTopCountSend int

    // Liveness detector to switch between original and spare
    LivenessResponsesWindowLength    int
    LivenessResponsesWindowSuccesses int
    KeepAliveSuccessTimeout          time.Duration // LivenessResponsesWindowLength subsequent keepalives must take < this

    SaveSecondsImmediately bool // If false, will only go to disk if first send fails
    StatsHouseEnv          string
    Cluster                string
    SkipFirstNShards       int // if cluster is extended, first shards might be almost full, so we can skip them for some time

    RemoteWriteEnabled bool
    RemoteWriteAddr    string
    RemoteWritePath    string

    AutoCreate bool
}
func DefaultConfig ¶
func DefaultConfig() Config
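Typical usage is to start from defaults, override what is needed, and validate; the overrides below are placeholders:
    cfg := DefaultConfig()
    cfg.AggregatorAddresses = []string{"agg-1:13336", "agg-2:13336", "agg-3:13336"}
    cfg.Cluster = "example"
    cfg.SampleBudget = 1 << 20 // placeholder budget, in bytes, shared by all shards
    if err := ValidateConfigSource(cfg); err != nil {
        log.Fatalln("invalid config:", err)
    }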
type DiskBucketStorage ¶
type DiskBucketStorage struct {
// contains filtered or unexported fields
}
func MakeDiskBucketStorage ¶
func MakeDiskBucketStorage(dirPath string, numShards int, logf func(format string, args ...interface{})) (*DiskBucketStorage, error)
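A sketch of saving one compressed bucket and draining whatever is left on disk for shard 0. The directory, shard count, nowUnix and compressedData are placeholders, and treating the second return value of ReadNextTailSecond as "another tail second is available" is an assumption:
    d, err := MakeDiskBucketStorage("/var/lib/statshouse/cache", 16, log.Printf)
    if err != nil {
        log.Fatalln(err)
    }
    defer d.Close()

    _ = d.PutBucket(0, nowUnix, compressedData) // errors ignored only for brevity

    var scratch []byte
    for {
        sec, ok := d.ReadNextTailSecond(0)
        if !ok {
            break // assumed: no more tail seconds for this shard
        }
        data, err := d.GetBucket(0, sec, &scratch)
        if err != nil {
            break
        }
        _ = data // send to the aggregator, then erase
        _ = d.EraseBucket(0, sec)
    }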
func (*DiskBucketStorage) Close ¶
func (d *DiskBucketStorage) Close() error
func (*DiskBucketStorage) EraseBucket ¶
func (d *DiskBucketStorage) EraseBucket(shardID int, time uint32) error
func (*DiskBucketStorage) PutBucket ¶
func (d *DiskBucketStorage) PutBucket(shardID int, time uint32, data []byte) error
func (*DiskBucketStorage) ReadNextTailSecond ¶
func (d *DiskBucketStorage) ReadNextTailSecond(shardID int) (uint32, bool)
func (*DiskBucketStorage) TotalFileSize ¶
func (d *DiskBucketStorage) TotalFileSize(shardID int) int64
type Shard ¶
type Shard struct {
    ShardReplicaNum int
    CurrentTime     uint32
    CurrentBuckets  [][]*data_model.MetricsBucket // [resolution][shard]. All disallowed resolutions are always skipped
    MissedSeconds   uint32                        // If disk is slow or computer sleeps/slows, several seconds can get into single bucket
    FutureQueue     [][]*data_model.MetricsBucket // 60 seconds long circular buffer.

    CurentLowResBucket [][]*data_model.MetricsBucket // [resolution][shard]
    LowResFutureQueue  []*data_model.MetricsBucket   // Max 60 seconds long. Shorter if max resolution is lower.

    BucketsToSend     chan compressedBucketData
    BuiltInItemValues []*BuiltInItemValue // Moved into CurrentBuckets before flush

    PreprocessingBucketTime    uint32
    PreprocessingBuckets       []*data_model.MetricsBucket // CurrentBuckets is moved here, if PreviousBucket empty
    PreprocessingMissedSeconds uint32                      // copy of MissedSeconds for bucket being processed

    HistoricBucketsToSend   []compressedBucketData // Slightly out of order here
    HistoricBucketsDataSize int                    // if too many are with data, will put without data, which will be read from disk
    // contains filtered or unexported fields
}
Shard gets data after initial hashing and shard number selection.
func (*Shard) AddCounterHost ¶
func (s *Shard) AddCounterHost(key data_model.Key, keyHash uint64, count float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Shard) AddCounterHostStringBytes ¶
func (s *Shard) AddCounterHostStringBytes(key data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Shard) AddUniqueHostStringBytes ¶
func (s *Shard) AddUniqueHostStringBytes(key data_model.Key, hostTag int32, str []byte, keyHash uint64, hashes []int64, count float64, metricInfo *format.MetricMetaValue)
func (*Shard) AddValueArrayCounterHost ¶
func (s *Shard) AddValueArrayCounterHost(key data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Shard) AddValueArrayCounterHostStringBytes ¶
func (s *Shard) AddValueArrayCounterHostStringBytes(key data_model.Key, keyHash uint64, values []float64, mult float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue)
func (*Shard) AddValueCounterHost ¶
func (s *Shard) AddValueCounterHost(key data_model.Key, keyHash uint64, value float64, counter float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Shard) AddValueCounterHostStringBytes ¶
func (s *Shard) AddValueCounterHostStringBytes(key data_model.Key, keyHash uint64, value float64, count float64, hostTag int32, str []byte, metricInfo *format.MetricMetaValue)
func (*Shard) ApplyCounter ¶
func (s *Shard) ApplyCounter(key data_model.Key, keyHash uint64, str []byte, count float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Shard) ApplyUnique ¶
func (s *Shard) ApplyUnique(key data_model.Key, keyHash uint64, str []byte, hashes []int64, count float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Shard) ApplyValues ¶
func (s *Shard) ApplyValues(key data_model.Key, keyHash uint64, str []byte, values []float64, count float64, hostTag int32, metricInfo *format.MetricMetaValue)
func (*Shard) CreateBuiltInItemValue ¶
func (s *Shard) CreateBuiltInItemValue(key data_model.Key) *BuiltInItemValue
func (*Shard) HistoricBucketsDataSizeDisk ¶
func (s *Shard) HistoricBucketsDataSizeDisk() int64
func (*Shard) HistoricBucketsDataSizeMemory ¶
func (s *Shard) HistoricBucketsDataSizeMemory() int
func (*Shard) MergeItemValue ¶
func (s *Shard) MergeItemValue(key data_model.Key, keyHash uint64, item *data_model.ItemValue, metricInfo *format.MetricMetaValue)