Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeDatapoint(mName []byte, tags *TagsHolder, dp float64, timestamp uint32, nBytes uint64, ...) error
- func ExtractInfluxPayloadAndInsertDp(rawCSV []byte, tags *TagsHolder, orgid uint64) (uint32, []error)
- func ExtractOTSDBPayload(rawJson []byte, tags *TagsHolder) ([]byte, float64, uint32, error)
- func ForceFlushMetricsBlock()
- func GetFinalTagsTreeDir(mid string, suffix uint64) string
- func GetTotalEncodedSize() uint64
- func GetUniqueTagKeysForUnrotated(tRange *dtu.MetricsTimeRange, myid uint64) (map[string]struct{}, error)
- func GetUnrotatedMetricStats(orgid uint64) (uint64, uint64, uint64)
- func GetUnrotatedMetricsSegmentRequests(tRange *dtu.MetricsTimeRange, querySummary *summary.QuerySummary, orgid uint64) (map[string][]*structs.MetricsSearchRequest, error)
- func InitMetricsSegStore()
- func InitTestingConfig()
- func ResetMetricsSegStore_TestOnly()
- type MetricsAndTagsHolder
- type MetricsBlock
- type MetricsSegment
- func (ms *MetricsSegment) AddMNameToBloom(mName []byte)
- func (ms *MetricsSegment) CheckAndRotate(forceRotate bool) error
- func (ms *MetricsSegment) FlushMetricNames() error
- func (ms *MetricsSegment) FlushMetricNamesBloom() error
- func (ms *MetricsSegment) LoadMetricNamesIntoMap(resultContainer map[string]bool)
- func (ms *MetricsSegment) SetMockMetricSegmentMNamesBloom() string
- func (ms *MetricsSegment) SetMockMetricSegmentMNamesMap(mNamesCount uint32, mNameBase string) string
- type TagTree
- type TagsHolder
- type TagsTreeHolder
- type TimeSeries
Constants ¶
const METRICS_BLK_FLUSH_SLEEP_DURATION = 60 // 1 min
const METRICS_BLK_ROTATE_SLEEP_DURATION = 10 // 10 seconds
Variables ¶
var OrgMetricsAndTags map[uint64]*MetricsAndTagsHolder = make(map[uint64]*MetricsAndTagsHolder)
var TAGS_TREE_FLUSH_SLEEP_DURATION = 60 // 1 min
Functions ¶
func EncodeDatapoint ¶
func EncodeDatapoint(mName []byte, tags *TagsHolder, dp float64, timestamp uint32, nBytes uint64, orgid uint64) error
For a given metric name, tags, datapoint value, and timestamp, adds the datapoint to the respective in-memory series.
Internally, this function tries to find the series and then encodes the datapoint into it. If it cannot find the series, or no space exists in the metrics segment, it returns an error.
Returns any error encountered.
func ExtractInfluxPayloadAndInsertDp ¶
func ExtractInfluxPayloadAndInsertDp(rawCSV []byte, tags *TagsHolder, orgid uint64) (uint32, []error)
For an input raw CSV row ([]byte), extracts the metric name, datapoint value, timestamp, and all tags.
Calls EncodeDatapoint to add the datapoint to the respective series.
Returns the number of datapoints ingested and any errors encountered.
func ExtractOTSDBPayload ¶
func ExtractOTSDBPayload(rawJson []byte, tags *TagsHolder) ([]byte, float64, uint32, error)
For an input raw JSON []byte, returns the metric name, datapoint value, timestamp, all tags, and any error that occurred.
The metric name is returned as a raw []byte.
The tags
func ForceFlushMetricsBlock ¶
func ForceFlushMetricsBlock()
func GetFinalTagsTreeDir ¶
func GetFinalTagsTreeDir(mid string, suffix uint64) string
func GetTotalEncodedSize ¶
func GetTotalEncodedSize() uint64
func GetUniqueTagKeysForUnrotated ¶
func GetUniqueTagKeysForUnrotated(tRange *dtu.MetricsTimeRange, myid uint64) (map[string]struct{}, error)
func GetUnrotatedMetricStats ¶
func GetUnrotatedMetricStats(orgid uint64) (uint64, uint64, uint64)
Returns the total incoming bytes, the total on-disk bytes, and the approximate number of datapoints across all metric segments.
func GetUnrotatedMetricsSegmentRequests ¶
func GetUnrotatedMetricsSegmentRequests(tRange *dtu.MetricsTimeRange, querySummary *summary.QuerySummary, orgid uint64) (map[string][]*structs.MetricsSearchRequest, error)
func InitMetricsSegStore ¶
func InitMetricsSegStore()
TODO: pre-allocate as many metrics buffers as can fit and set the hash range. To evenly distribute metric names, the hash range can simply be metricsId mod numMetricsBuffers.
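The mod-based distribution mentioned in the TODO could look roughly like the sketch below. It is an illustration only: bufferIndex is a hypothetical helper, the metrics id is assumed to be numeric, and the package's actual allocation logic is not shown in this documentation.

```go
package main

import "fmt"

// bufferIndex maps a numeric metrics id onto one of numMetricsBuffers buffers.
// Hypothetical helper for illustration; not part of the documented API.
func bufferIndex(metricsID uint64, numMetricsBuffers uint64) uint64 {
	if numMetricsBuffers == 0 {
		return 0 // guard against division by zero; callers should pass n > 0
	}
	return metricsID % numMetricsBuffers
}

func main() {
	// Consecutive ids cycle through the available buffers.
	for _, id := range []uint64{0, 1, 7, 8, 15} {
		fmt.Printf("metricsId %d -> buffer %d\n", id, bufferIndex(id, 4))
	}
}
```

If metrics ids are strings rather than integers, they would first need to be hashed to an integer before taking the modulus.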
func InitTestingConfig ¶
func InitTestingConfig()
func ResetMetricsSegStore_TestOnly ¶
func ResetMetricsSegStore_TestOnly()
Types ¶
type MetricsAndTagsHolder ¶
type MetricsAndTagsHolder struct {
	MetricSegments map[string]*MetricsSegment
	TagHolders     map[string]*TagsTreeHolder
}
type MetricsBlock ¶
type MetricsBlock struct {
// contains filtered or unexported fields
}
A metrics buffer represents a 15 minute (or 1GB size) window of encoded series.
A metrics buffer's suffix determines the path of the generated files in relation to the metrics segment.
Every 5s, this metrics buffer should persist to disk and will create / update two files:
- Raw TS encoded file. Format [tsid][packed-len][raw-values]
- TSID offset file. Format [tsid][soff]
func (*MetricsBlock) FlushTSOAndTSGFiles ¶
func (mb *MetricsBlock) FlushTSOAndTSGFiles(file string) error
Format of TSO file: [version - 1 byte][number of tsids - 2 bytes][tsid - 8 bytes][offset - 4 bytes][tsid - 8 bytes]...
Format of TSG file: [version - 1 byte][tsid - 8 bytes][len - 4 bytes][raw series - n bytes][tsid - 8 bytes]...
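The two layouts above can be sketched as follows. This is a minimal illustration under stated assumptions, not the package's implementation: tsoEntry, encodeTSG, and encodeTSO are hypothetical names, little-endian byte order is assumed, and the offset is assumed to point at the start of each series record in the TSG file.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// tsoEntry pairs a tsid with the offset of its record in the TSG bytes.
// Hypothetical type for illustration.
type tsoEntry struct {
	tsid   uint64
	offset uint32
}

// encodeTSG lays out [version][tsid][len][raw series]... and records where
// each series record starts. tsids and series must have the same length.
func encodeTSG(version byte, tsids []uint64, series [][]byte) ([]byte, []tsoEntry) {
	buf := []byte{version}
	entries := make([]tsoEntry, 0, len(tsids))
	var hdr [12]byte
	for i, tsid := range tsids {
		entries = append(entries, tsoEntry{tsid: tsid, offset: uint32(len(buf))})
		binary.LittleEndian.PutUint64(hdr[0:8], tsid)
		binary.LittleEndian.PutUint32(hdr[8:12], uint32(len(series[i])))
		buf = append(buf, hdr[:]...)
		buf = append(buf, series[i]...)
	}
	return buf, entries
}

// encodeTSO lays out [version][number of tsids][tsid][offset]...
// The 2-byte count limits one file to 65535 entries.
func encodeTSO(version byte, entries []tsoEntry) []byte {
	buf := make([]byte, 3+12*len(entries))
	buf[0] = version
	binary.LittleEndian.PutUint16(buf[1:3], uint16(len(entries)))
	pos := 3
	for _, e := range entries {
		binary.LittleEndian.PutUint64(buf[pos:pos+8], e.tsid)
		binary.LittleEndian.PutUint32(buf[pos+8:pos+12], e.offset)
		pos += 12
	}
	return buf
}

func main() {
	tsg, entries := encodeTSG(1, []uint64{10, 20}, [][]byte{{1, 2, 3}, {4}})
	tso := encodeTSO(1, entries)
	fmt.Printf("TSG: %d bytes, TSO: %d bytes\n", len(tsg), len(tso))
}
```

Keeping the offsets in a separate, fixed-width file lets a reader locate one series by tsid without scanning the variable-length records.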
func (*MetricsBlock) GetTimeSeries ¶
func (mb *MetricsBlock) GetTimeSeries(tsid uint64) (*TimeSeries, bool, error)
Returns:
- the *TimeSeries corresponding to tsid, if found
- a bool indicating whether the tsid was found
This will create the time series if it doesn't exist already
func (*MetricsBlock) InsertTimeSeries ¶
func (mb *MetricsBlock) InsertTimeSeries(tsid uint64, ts *TimeSeries) (bool, int, error)
Inserts a time series for the given tsid
The caller is responsible for acquiring and releasing the required locks
Returns a bool indicating whether the tsid already existed, the index it exists at, and any error encountered
type MetricsSegment ¶
type MetricsSegment struct {
	Suffix uint64 // current suffix
	Mid    string // metrics id for this metric segment
	Orgid  uint64
	// contains filtered or unexported fields
}
A metrics segment represents a 2hr window and consists of many metrics blocks and tagTrees.
Only a single metrics buffer per metrics segment can be in memory at a time. Prior metrics buffers will be flushed to disk.
The tagsTree will be shared across the metrics in this metrics segment.
A metrics segment generates the following set of files:
- A tagTree file for each incoming tagKey seen across this segment
- A metricsBlock file for each incoming 15-minute window
- A bloomfilter for all metric names in the metrics segment
TODO: this metrics segment should reject samples not in 2hr window
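The window check described in the TODO, together with the mapping from a timestamp to its 15-minute block, could be sketched as below. The helper names are hypothetical, timestamps are assumed to be epoch seconds (matching the uint32 timestamps used elsewhere in this package), and the segment is assumed to start at a known segStart.

```go
package main

import "fmt"

const (
	segmentWindowSecs = 2 * 60 * 60 // 2hr segment window
	blockWindowSecs   = 15 * 60     // 15-minute block window
)

// inSegmentWindow reports whether ts falls inside [segStart, segStart+2hr).
// Hypothetical helper for illustration.
func inSegmentWindow(segStart, ts uint32) bool {
	return ts >= segStart && ts-segStart < segmentWindowSecs
}

// blockIndex returns which 15-minute block of the segment ts belongs to,
// or false when the sample lies outside the segment window.
func blockIndex(segStart, ts uint32) (uint32, bool) {
	if !inSegmentWindow(segStart, ts) {
		return 0, false
	}
	return (ts - segStart) / blockWindowSecs, true
}

func main() {
	segStart := uint32(1000)
	for _, ts := range []uint32{999, 1000, 1900, 8199, 8200} {
		idx, ok := blockIndex(segStart, ts)
		fmt.Printf("ts=%d accepted=%v block=%d\n", ts, ok, idx)
	}
}
```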
func GetAllMetricsSegments ¶
func GetAllMetricsSegments() []*MetricsSegment
func GetMetricSegments ¶
func GetMetricSegments(orgid uint64) []*MetricsSegment
func GetUnrotatedMetricSegmentsOverTheTimeRange ¶
func GetUnrotatedMetricSegmentsOverTheTimeRange(tRange *dtu.MetricsTimeRange, orgid uint64) ([]*MetricsSegment, error)
func InitMetricsSegment ¶
func InitMetricsSegment(orgid uint64, mId string) (*MetricsSegment, error)
func (*MetricsSegment) AddMNameToBloom ¶
func (ms *MetricsSegment) AddMNameToBloom(mName []byte)
func (*MetricsSegment) CheckAndRotate ¶
func (ms *MetricsSegment) CheckAndRotate(forceRotate bool) error
Wrapper function to check and rotate the current metrics block or the metrics segment
Caller is responsible for acquiring locks
func (*MetricsSegment) FlushMetricNames ¶
func (ms *MetricsSegment) FlushMetricNames() error
- Flushes the metrics segment's mNamesMap to disk
- The metric names are stored in the Length and Value format.
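A length-and-value layout of this kind could be sketched as follows. The width of the length prefix (2 bytes) and the little-endian byte order are assumptions, and encodeNames / decodeNames are hypothetical names, not the package's functions.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeNames writes each name as [length - 2 bytes][value - length bytes].
// Names longer than 65535 bytes are not supported by this sketch.
func encodeNames(names []string) []byte {
	var buf []byte
	var lenBuf [2]byte
	for _, n := range names {
		binary.LittleEndian.PutUint16(lenBuf[:], uint16(len(n)))
		buf = append(buf, lenBuf[:]...)
		buf = append(buf, n...)
	}
	return buf
}

// decodeNames reverses encodeNames and reports truncated input.
func decodeNames(buf []byte) ([]string, error) {
	var names []string
	for len(buf) > 0 {
		if len(buf) < 2 {
			return nil, errors.New("truncated length prefix")
		}
		n := int(binary.LittleEndian.Uint16(buf[:2]))
		buf = buf[2:]
		if len(buf) < n {
			return nil, errors.New("truncated value")
		}
		names = append(names, string(buf[:n]))
		buf = buf[n:]
	}
	return names, nil
}

func main() {
	enc := encodeNames([]string{"cpu", "mem.used"})
	names, err := decodeNames(enc)
	fmt.Println(len(enc), names, err)
}
```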
func (*MetricsSegment) FlushMetricNamesBloom ¶
func (ms *MetricsSegment) FlushMetricNamesBloom() error
func (*MetricsSegment) LoadMetricNamesIntoMap ¶
func (ms *MetricsSegment) LoadMetricNamesIntoMap(resultContainer map[string]bool)
func (*MetricsSegment) SetMockMetricSegmentMNamesBloom ¶
func (ms *MetricsSegment) SetMockMetricSegmentMNamesBloom() string
This is a mock function and is only used during tests.
func (*MetricsSegment) SetMockMetricSegmentMNamesMap ¶
func (ms *MetricsSegment) SetMockMetricSegmentMNamesMap(mNamesCount uint32, mNameBase string) string
This is a mock function and is only used during tests.
type TagTree ¶
type TagTree struct {
// contains filtered or unexported fields
}
TagTree is a two-level tree, containing the metricName at level 1 and a tagValue at level 2. The leaf nodes store the tsids that match a certain tagValue.
The tags for a metric will be inserted via xxhash to allow for O(log n) search.
TODO: how to make flushes write only the updates
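The two-level shape can be illustrated with nested maps keyed by hashed values. This is only a sketch of the idea: tagTreeSketch is a hypothetical type, FNV-1a from the standard library stands in for xxhash to keep the example self-contained, and Go maps give average constant-time lookup rather than the tree search described above.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hash64 hashes a byte slice. FNV-1a is a stand-in for xxhash here.
func hash64(b []byte) uint64 {
	h := fnv.New64a()
	h.Write(b)
	return h.Sum64()
}

// tagTreeSketch: level 1 is keyed by the hashed metric name, level 2 by the
// hashed tag value, and the leaves hold the matching tsids.
type tagTreeSketch struct {
	levels map[uint64]map[uint64][]uint64
}

func newTagTreeSketch() *tagTreeSketch {
	return &tagTreeSketch{levels: make(map[uint64]map[uint64][]uint64)}
}

// add records that tsid carries tagValue under metric mName.
func (t *tagTreeSketch) add(mName, tagValue []byte, tsid uint64) {
	m := hash64(mName)
	if t.levels[m] == nil {
		t.levels[m] = make(map[uint64][]uint64)
	}
	v := hash64(tagValue)
	t.levels[m][v] = append(t.levels[m][v], tsid)
}

// lookup returns the tsids stored for (mName, tagValue), or nil if none.
func (t *tagTreeSketch) lookup(mName, tagValue []byte) []uint64 {
	return t.levels[hash64(mName)][hash64(tagValue)]
}

func main() {
	tree := newTagTreeSketch()
	tree.add([]byte("cpu"), []byte("host-a"), 1)
	tree.add([]byte("cpu"), []byte("host-a"), 2)
	tree.add([]byte("cpu"), []byte("host-b"), 3)
	fmt.Println(tree.lookup([]byte("cpu"), []byte("host-a")))
}
```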
func InitTagsTree ¶
type TagsHolder ¶
type TagsHolder struct {
// contains filtered or unexported fields
}
func GetTagsHolder ¶
func GetTagsHolder() *TagsHolder
Allocates and returns a TagsHolder
Caller is responsible for calling ReturnTagsHolder
func (*TagsHolder) GetTSID ¶
func (th *TagsHolder) GetTSID(mName []byte) (uint64, error)
Gets the TSID given a metric name
Internally, makes sure the tag keys are sorted
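Sorting the tag keys before hashing makes the resulting id independent of insertion order. The sketch below shows that property under stated assumptions: tsidSketch is a hypothetical function, tags are modeled as a plain map, and FNV-1a from the standard library is used in place of whatever hash the package actually applies.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// tsidSketch derives a series id from a metric name and its tags.
// Keys are sorted first so that the same tag set always hashes identically.
func tsidSketch(mName []byte, tags map[string]string) uint64 {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := fnv.New64a()
	h.Write(mName)
	for _, k := range keys {
		// A zero byte separates fields so "ab"+"c" differs from "a"+"bc".
		h.Write([]byte{0})
		h.Write([]byte(k))
		h.Write([]byte{0})
		h.Write([]byte(tags[k]))
	}
	return h.Sum64()
}

func main() {
	a := tsidSketch([]byte("cpu"), map[string]string{"host": "a", "dc": "x"})
	b := tsidSketch([]byte("cpu"), map[string]string{"dc": "x", "host": "a"})
	fmt.Println(a == b)
}
```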
func (*TagsHolder) Insert ¶
func (th *TagsHolder) Insert(key string, value []byte, vType jp.ValueType)
func (*TagsHolder) String ¶
func (th *TagsHolder) String() string
type TagsTreeHolder ¶
type TagsTreeHolder struct {
// contains filtered or unexported fields
}
Holder struct for all tagTrees
Internally, will expose functions to check and add tags to the tree
func GetAllTagsTreeHolders ¶
func GetAllTagsTreeHolders() []*TagsTreeHolder
func GetTagsTreeHolder ¶
func GetTagsTreeHolder(orgid uint64, mid string) *TagsTreeHolder
func InitTagsTreeHolder ¶
func InitTagsTreeHolder(mid string) (*TagsTreeHolder, error)
func (*TagsTreeHolder) AddTagsForTSID ¶
func (tth *TagsTreeHolder) AddTagsForTSID(mName []byte, tags *TagsHolder, tsid uint64) error
Adds the input tags into the corresponding tagsTree.
Internally, uses the internal bloom to check whether the tsid has already been added.
Returns any error encountered.
func (*TagsTreeHolder) EncodeTagsTreeHolder ¶
func (tt *TagsTreeHolder) EncodeTagsTreeHolder() error
type TimeSeries ¶
type TimeSeries struct {
// contains filtered or unexported fields
}
Represents a single timeseries
func (*TimeSeries) AddSingleEntry ¶
func (ts *TimeSeries) AddSingleEntry(dpVal float64, dpTS uint32) (uint64, error)
Adds this single dp and time entry to the time series.
Encodes dpVal and dpTS using dod (delta-of-delta) / floating point compression.
Every 15 mins, if a series was updated, we need to flush it.
Returns the number of bytes written, or any errors encountered
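The delta-of-delta transform applied to timestamps can be sketched as below. This shows only the arithmetic, not the package's encoder: deltaOfDeltas and restore are hypothetical names, and a real encoder would additionally bit-pack the resulting small integers and compress the float values separately.

```go
package main

import "fmt"

// deltaOfDeltas returns the first timestamp, then the first delta, then the
// change between consecutive deltas for every later sample.
func deltaOfDeltas(ts []uint32) []int64 {
	out := make([]int64, 0, len(ts))
	var prev, prevDelta int64
	for i, t := range ts {
		cur := int64(t)
		if i == 0 {
			out = append(out, cur) // first timestamp is stored as-is
		} else {
			delta := cur - prev
			out = append(out, delta-prevDelta) // prevDelta is 0 for i == 1
			prevDelta = delta
		}
		prev = cur
	}
	return out
}

// restore reverses deltaOfDeltas.
func restore(enc []int64) []uint32 {
	out := make([]uint32, 0, len(enc))
	var prev, prevDelta int64
	for i, v := range enc {
		if i == 0 {
			prev = v
		} else {
			prevDelta += v
			prev += prevDelta
		}
		out = append(out, uint32(prev))
	}
	return out
}

func main() {
	ts := []uint32{1000, 1010, 1020, 1031}
	enc := deltaOfDeltas(ts)
	fmt.Println(enc, restore(enc))
}
```

For regularly spaced samples most encoded values are zero, which is what makes this representation compress well.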