Documentation ¶
Overview ¶
Package mdata stands for "managed data" or "metrics data" if you will it has all the stuff to keep metric data in memory, store it, and synchronize save states over the network
Index ¶
- Constants
- Variables
- func AggBoundary(ts uint32, span uint32) uint32
- func ConfigProcess()
- func ConfigSetup()
- func InitPersistNotifier(not ...Notifier)
- func MatchAgg(key string) (uint16, conf.Aggregation)
- func MatchSchema(key string, interval int) (uint16, conf.Schema)
- func MaxChunkSpan() uint32
- func SendPersistMessage(key string, t0 uint32)
- func SetSingleAgg(met ...conf.Method)
- func SetSingleSchema(ret conf.Retentions)
- func TS(ts interface{}) string
- func TTLs() []uint32
- type AggMetric
- func (a *AggMetric) Add(ts uint32, val float64)
- func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) (uint32, bool)
- func (a *AggMetric) Get(from, to uint32) (Result, error)
- func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
- func (a *AggMetric) SyncAggregatedChunkSaveState(ts uint32, consolidator consolidation.Consolidator, aggSpan uint32)
- func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback
- type AggMetrics
- type Aggregation
- type Aggregator
- type ChunkSaveCallback
- type ChunkWriteRequest
- type ChunkWriteRequestPayload
- func (z *ChunkWriteRequestPayload) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *ChunkWriteRequestPayload) EncodeMsg(en *msgp.Writer) (err error)
- func (z *ChunkWriteRequestPayload) MarshalMsg(b []byte) (o []byte, err error)
- func (z *ChunkWriteRequestPayload) Msgsize() (s int)
- func (z *ChunkWriteRequestPayload) UnmarshalMsg(bts []byte) (o []byte, err error)
- type DefaultNotifierHandler
- type Metric
- type Metrics
- type MockStore
- func (c *MockStore) Add(cwr *ChunkWriteRequest)
- func (c *MockStore) Items() int
- func (c *MockStore) Reset()
- func (c *MockStore) Search(ctx context.Context, metric schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error)
- func (c *MockStore) SetTracer(t opentracing.Tracer)
- func (c *MockStore) Stop()
- type Notifier
- type NotifierHandler
- type PersistMessageBatch
- type ReorderBuffer
- type Result
- type SavedChunk
- type Store
Constants ¶
const PersistMessageBatchV1 = 1
PersistMessage format version
Variables ¶
var ( // set either via ConfigProcess or from the unit tests. other code should not touch Aggregations conf.Aggregations Schemas conf.Schemas PromDiscardedSamples = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "metrictank", Name: "discarded_samples_total", Help: "Total # of samples that were discarded", }, []string{"reason", "org"}) )
var ErrInvalidRange = errors.New("AggMetric: invalid range: from must be less than to")
Functions ¶
func AggBoundary ¶
AggBoundary returns ts if it is a boundary, or the next boundary otherwise. see description for Aggregator and unit tests, for more details
func ConfigProcess ¶
func ConfigProcess()
func ConfigSetup ¶
func ConfigSetup()
func InitPersistNotifier ¶
func InitPersistNotifier(not ...Notifier)
func MatchAgg ¶
func MatchAgg(key string) (uint16, conf.Aggregation)
MatchAgg returns the aggregation definition for the given metric key, and the index of it (to efficiently reference it) it will always find the aggregation definition because Aggregations has a catchall default
func MatchSchema ¶
MatchSchema returns the schema for the given metric key, and the index of the schema (to efficiently reference it) it will always find the schema because Schemas has a catchall default
func MaxChunkSpan ¶
func MaxChunkSpan() uint32
func SendPersistMessage ¶
func SetSingleAgg ¶
func SetSingleSchema ¶
func SetSingleSchema(ret conf.Retentions)
Types ¶
type AggMetric ¶
AggMetric takes in new values, updates the in-memory data and streams the points to aggregators it uses a circular buffer of chunks each chunk starts at their respective t0 a t0 is a timestamp divisible by chunkSpan without a remainder (e.g. 2 hour boundaries) firstT0's data is held at index 0, indexes go up and wrap around from numChunks-1 to 0 in addition, keep in mind that the last chunk is always a work in progress and not useable for aggregation AggMetric is concurrency-safe
func NewAggMetric ¶
func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow, interval uint32, agg *conf.Aggregation, reorderAllowUpdate, dropFirstChunk bool, ingestFrom int64) *AggMetric
NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long it optionally also creates aggregations with the given settings the 0th retention is the native archive of this metric. if there's several others, we create aggregators, using agg. it's the callers responsibility to make sure agg is not nil in that case! If reorderWindow is greater than 0, a reorder buffer is enabled. In that case data points with duplicate timestamps the behavior is defined by reorderAllowUpdate
func (*AggMetric) GC ¶
GC returns whether or not this AggMetric is stale and can be removed, and its pointcount if so chunkMinTs -> min timestamp of a chunk before to be considered stale and to be persisted to Cassandra metricMinTs -> min timestamp for a metric before to be considered stale and to be purged from the tank
func (*AggMetric) Get ¶
Get all data between the requested time ranges. From is inclusive, to is exclusive. from <= x < to more data then what's requested may be included specifically, returns: * points from the ROB (if enabled) * iters from matching chunks * oldest point we have, so that if your query needs data before it, the caller knows when to query the store
func (*AggMetric) GetAggregated ¶
func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
func (*AggMetric) SyncAggregatedChunkSaveState ¶
func (a *AggMetric) SyncAggregatedChunkSaveState(ts uint32, consolidator consolidation.Consolidator, aggSpan uint32)
Sync the saved state of a chunk by its T0.
func (*AggMetric) SyncChunkSaveState ¶
func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback
Sync the saved state of a chunk by its T0.
type AggMetrics ¶
type AggMetrics struct { sync.RWMutex Metrics map[uint32]map[schema.Key]*AggMetric // contains filtered or unexported fields }
AggMetrics is an in-memory store of AggMetric objects note: they are keyed by MKey here because each AggMetric manages access to, and references of, their rollup archives themselves
func NewAggMetrics ¶
func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bool, ingestFrom map[uint32]int64, chunkMaxStale, metricMaxStale uint32, gcInterval time.Duration) *AggMetrics
func (*AggMetrics) GC ¶
func (ms *AggMetrics) GC()
periodically scan chunks and close any that have not received data in a while
func (*AggMetrics) GetOrCreate ¶
type Aggregation ¶
Aggregation is a container for all summary statistics / aggregated data for 1 metric, in 1 time frame if the Cnt is 0, the numbers don't necessarily make sense.
func NewAggregation ¶
func NewAggregation() *Aggregation
func (*Aggregation) Add ¶
func (a *Aggregation) Add(val float64)
func (*Aggregation) Reset ¶
func (a *Aggregation) Reset()
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
receives data and builds aggregations note: all points with timestamps t1, t2, t3, t4, [t5] get aggregated into a point with ts t5 where t5 % span = 0. in other words: * an aggregation point reflects the data in the timeframe preceding it. * the timestamps for the aggregated series is quantized to the given span, unlike the raw series which may have an offset (be non-quantized)
func NewAggregator ¶
func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey, retOrig string, ret conf.Retention, agg conf.Aggregation, dropFirstChunk bool, ingestFrom int64) *Aggregator
func (*Aggregator) Add ¶
func (agg *Aggregator) Add(ts uint32, val float64)
Add adds the point to the in-progress aggregation, and flushes it if we reached the boundary points going back in time are accepted, unless they go into a previous bucket, in which case they are ignored
type ChunkSaveCallback ¶ added in v0.13.0
type ChunkSaveCallback func()
type ChunkWriteRequest ¶
type ChunkWriteRequest struct { ChunkWriteRequestPayload Callback ChunkSaveCallback Key schema.AMKey }
ChunkWriteRequest is a request to write a chunk into a store
func NewChunkWriteRequest ¶
func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest
type ChunkWriteRequestPayload ¶ added in v0.13.0
func (*ChunkWriteRequestPayload) DecodeMsg ¶ added in v0.13.0
func (z *ChunkWriteRequestPayload) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*ChunkWriteRequestPayload) EncodeMsg ¶ added in v0.13.0
func (z *ChunkWriteRequestPayload) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*ChunkWriteRequestPayload) MarshalMsg ¶ added in v0.13.0
func (z *ChunkWriteRequestPayload) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*ChunkWriteRequestPayload) Msgsize ¶ added in v0.13.0
func (z *ChunkWriteRequestPayload) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*ChunkWriteRequestPayload) UnmarshalMsg ¶ added in v0.13.0
func (z *ChunkWriteRequestPayload) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
type DefaultNotifierHandler ¶
type DefaultNotifierHandler struct {
// contains filtered or unexported fields
}
func NewDefaultNotifierHandler ¶
func NewDefaultNotifierHandler(metrics Metrics, idx idx.MetricIndex) DefaultNotifierHandler
func (DefaultNotifierHandler) Handle ¶
func (dn DefaultNotifierHandler) Handle(data []byte)
func (DefaultNotifierHandler) PartitionOf ¶
func (dn DefaultNotifierHandler) PartitionOf(key schema.MKey) (int32, bool)
type Metric ¶
type Metric interface { Add(ts uint32, val float64) Get(from, to uint32) (Result, error) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error) }
type MockStore ¶
type MockStore struct { // dont save any data. Drop bool // contains filtered or unexported fields }
MockStore is an in-memory Store implementation for unit tests
func NewMockStore ¶
func NewMockStore() *MockStore
func (*MockStore) Add ¶
func (c *MockStore) Add(cwr *ChunkWriteRequest)
Add adds a chunk to the store
func (*MockStore) Search ¶
func (c *MockStore) Search(ctx context.Context, metric schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error)
searches through the mock results and returns the right ones according to start / end
func (*MockStore) SetTracer ¶
func (c *MockStore) SetTracer(t opentracing.Tracer)
type Notifier ¶
type Notifier interface {
Send(SavedChunk)
}
type NotifierHandler ¶
type PersistMessageBatch ¶
type PersistMessageBatch struct { Instance string `json:"instance"` SavedChunks []SavedChunk `json:"saved_chunks"` }
type ReorderBuffer ¶
type ReorderBuffer struct {
// contains filtered or unexported fields
}
ReorderBuffer keeps a window of data during which it is ok to send data out of order. The reorder buffer itself is not thread safe because it is only used by AggMetric, which is thread safe, so there is no locking in the buffer.
newest=0 may mean no points added yet, or newest point is at position 0. we use the Ts of points in the buffer to check for valid points. Ts == 0 means no point in particular newest.Ts == 0 means the buffer is empty the buffer is evenly spaced (points are `interval` apart) and may be sparsely populated
func NewReorderBuffer ¶
func NewReorderBuffer(reorderWindow, interval uint32, allowUpdate bool) *ReorderBuffer
func (*ReorderBuffer) Add ¶
Add adds the point if it falls within the window. it returns points that have been purged out of the buffer, as well as whether the add succeeded.
func (*ReorderBuffer) Flush ¶
func (rob *ReorderBuffer) Flush() []schema.Point
func (*ReorderBuffer) Get ¶
func (rob *ReorderBuffer) Get() []schema.Point
Get returns the points in the buffer
func (*ReorderBuffer) IsEmpty ¶
func (rob *ReorderBuffer) IsEmpty() bool
func (*ReorderBuffer) Reset ¶
func (rob *ReorderBuffer) Reset()
type SavedChunk ¶
SavedChunk represents a chunk persisted to the store Key is a stringified schema.AMKey
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
package chunk encodes timeseries in chunks of data see devdocs/chunk-format.md for more information.
|
package chunk encodes timeseries in chunks of data see devdocs/chunk-format.md for more information. |
tsz
Package tsz implements time-series compression it is a fork of https://github.com/dgryski/go-tsz which implements http://www.vldb.org/pvldb/vol8/p1816-teller.pdf see devdocs/chunk-format.md for more info
|
Package tsz implements time-series compression it is a fork of https://github.com/dgryski/go-tsz which implements http://www.vldb.org/pvldb/vol8/p1816-teller.pdf see devdocs/chunk-format.md for more info |