Documentation ¶
Overview ¶
Package mdata stands for "managed data" or "metrics data", if you will. It has everything needed to keep metric data in memory, store it, and synchronize save states over the network.
Index ¶
- Constants
- Variables
- func AggBoundary(ts uint32, span uint32) uint32
- func ConfigProcess()
- func ConfigSetup()
- func InitPersistNotifier(not ...Notifier)
- func MatchAgg(key string) (uint16, conf.Aggregation)
- func MatchSchema(key string, interval int) (uint16, conf.Schema)
- func MaxChunkSpan() uint32
- func SendPersistMessage(key string, t0 uint32)
- func SetSingleAgg(met ...conf.Method)
- func SetSingleSchema(ret ...conf.Retention)
- func TS(ts interface{}) string
- func TTLs() []uint32
- type AggMetric
- func (a *AggMetric) Add(ts uint32, val float64)
- func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) (uint32, bool)
- func (a *AggMetric) Get(from, to uint32) (Result, error)
- func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
- func (a *AggMetric) SyncAggregatedChunkSaveState(ts uint32, consolidator consolidation.Consolidator, aggSpan uint32)
- func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback
- type AggMetrics
- type Aggregation
- type Aggregator
- type ChunkSaveCallback
- type ChunkWriteRequest
- type ChunkWriteRequestPayload
- func (z *ChunkWriteRequestPayload) DecodeMsg(dc *msgp.Reader) (err error)
- func (z *ChunkWriteRequestPayload) EncodeMsg(en *msgp.Writer) (err error)
- func (z *ChunkWriteRequestPayload) MarshalMsg(b []byte) (o []byte, err error)
- func (z *ChunkWriteRequestPayload) Msgsize() (s int)
- func (z *ChunkWriteRequestPayload) UnmarshalMsg(bts []byte) (o []byte, err error)
- type DefaultNotifierHandler
- type Metric
- type Metrics
- type MockStore
- func (c *MockStore) Add(cwr *ChunkWriteRequest)
- func (c *MockStore) Items() int
- func (c *MockStore) Reset()
- func (c *MockStore) Search(ctx context.Context, metric schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error)
- func (c *MockStore) SetTracer(t opentracing.Tracer)
- func (c *MockStore) Stop()
- type Notifier
- type NotifierHandler
- type PersistMessageBatch
- type ReorderBuffer
- type Result
- type SavedChunk
- type Store
Constants ¶
const PersistMessageBatchV1 = 1
PersistMessage format version
Variables ¶
var (
    // set either via ConfigProcess or from the unit tests. other code should not touch
    Aggregations conf.Aggregations
    Schemas      conf.Schemas

    PromDiscardedSamples = promauto.NewCounterVec(prometheus.CounterOpts{
        Namespace: "metrictank",
        Name:      "discarded_samples_total",
        Help:      "Total # of samples that were discarded",
    }, []string{"reason", "org"})
)
var ErrInvalidRange = errors.New("AggMetric: invalid range: from must be less than to")
Functions ¶
func AggBoundary ¶
func AggBoundary(ts uint32, span uint32) uint32
AggBoundary returns ts if it is a boundary, or the next boundary otherwise. See the description of Aggregator and the unit tests for more details.
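For illustration (assuming the package is imported as mdata), a 60-second span gives:

    mdata.AggBoundary(120, 60) // returns 120: already a multiple of 60
    mdata.AggBoundary(121, 60) // returns 180: rounded up to the next boundary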
func ConfigProcess ¶
func ConfigProcess()
func ConfigSetup ¶
func ConfigSetup()
func InitPersistNotifier ¶
func InitPersistNotifier(not ...Notifier)
func MatchAgg ¶
func MatchAgg(key string) (uint16, conf.Aggregation)
MatchAgg returns the aggregation definition for the given metric key, and its index (to efficiently reference it). It always finds an aggregation definition because Aggregations has a catchall default.
func MatchSchema ¶
func MatchSchema(key string, interval int) (uint16, conf.Schema)
MatchSchema returns the schema for the given metric key, and the index of the schema (to efficiently reference it). It always finds a schema because Schemas has a catchall default.
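A minimal sketch of looking up storage settings for a hypothetical key, assuming ConfigProcess (or SetSingleSchema / SetSingleAgg in tests) has populated Schemas and Aggregations:

    schemaId, schemaDef := mdata.MatchSchema("env.app.requests.count", 10)
    aggId, aggDef := mdata.MatchAgg("env.app.requests.count")
    // both lookups always succeed thanks to the catchall defaults;
    // the returned ids can be kept around to reference the definitions cheaply.
    _, _, _, _ = schemaId, schemaDef, aggId, aggDef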
func MaxChunkSpan ¶
func MaxChunkSpan() uint32
func SendPersistMessage ¶
func SendPersistMessage(key string, t0 uint32)
func SetSingleAgg ¶
func SetSingleAgg(met ...conf.Method)
func SetSingleSchema ¶
func SetSingleSchema(ret ...conf.Retention)
Types ¶
type AggMetric ¶
AggMetric takes in new values, updates the in-memory data and streams the points to aggregators. It uses a circular buffer of chunks; each chunk starts at its respective t0, a timestamp divisible by chunkSpan without remainder (e.g. 2 hour boundaries). firstT0's data is held at index 0; indexes go up and wrap around from numChunks-1 to 0. In addition, keep in mind that the last chunk is always a work in progress and not usable for aggregation. AggMetric is concurrency-safe.
func NewAggMetric ¶
func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey, retentions conf.Retentions, reorderWindow, interval uint32, agg *conf.Aggregation, dropFirstChunk bool) *AggMetric
NewAggMetric creates a metric with the given key. It retains the given number of chunks, each chunkSpan seconds long, and optionally also creates aggregations with the given settings. The 0th retention is the native archive of this metric. If there are several others, we create aggregators using agg; it is the caller's responsibility to make sure agg is not nil in that case!
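A minimal write/read sketch; store, cachePusher, key, retentions, and agg are assumed to come from surrounding code (e.g. via ConfigProcess, MatchSchema, and MatchAgg):

    // reorderWindow=0 (no reorder buffer), interval=10s, dropFirstChunk=false
    am := mdata.NewAggMetric(store, cachePusher, key, retentions, 0, 10, &agg, false)
    am.Add(1000, 123.45)
    am.Add(1010, 67.89)
    res, err := am.Get(990, 1020) // from is inclusive, to is exclusive
    if err != nil {
        // e.g. ErrInvalidRange if from >= to
    }
    _ = res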
func (*AggMetric) GC ¶
func (a *AggMetric) GC(now, chunkMinTs, metricMinTs uint32) (uint32, bool)
GC returns whether or not this AggMetric is stale and can be removed, and its point count if so. chunkMinTs is the minimum timestamp a chunk must have to not be considered stale and persisted to Cassandra; metricMinTs is the minimum timestamp a metric must have to not be considered stale and purged from the tank.
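A sketch of how a caller might drive GC; am, chunkMaxStale, and metricMaxStale are assumed from surrounding code:

    now := uint32(time.Now().Unix())
    points, stale := am.GC(now, now-chunkMaxStale, now-metricMaxStale)
    if stale {
        _ = points // point count being dropped; safe to remove am from the map
    }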
func (*AggMetric) Get ¶
func (a *AggMetric) Get(from, to uint32) (Result, error)
Get returns all data between the requested time ranges. From is inclusive, to is exclusive: from <= x < to. More data than what's requested may be included. Specifically, it returns: points from the ROB (if enabled), iters from matching chunks, and the oldest point we have, so that if your query needs data before it, the caller knows when to query the store.
func (*AggMetric) GetAggregated ¶
func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
func (*AggMetric) SyncAggregatedChunkSaveState ¶
func (a *AggMetric) SyncAggregatedChunkSaveState(ts uint32, consolidator consolidation.Consolidator, aggSpan uint32)
SyncAggregatedChunkSaveState syncs the saved state of an aggregated chunk by its T0.
func (*AggMetric) SyncChunkSaveState ¶
func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback
SyncChunkSaveState syncs the saved state of a chunk by its T0.
type AggMetrics ¶
type AggMetrics struct {
    sync.RWMutex
    Metrics map[uint32]map[schema.Key]*AggMetric
    // contains filtered or unexported fields
}
AggMetrics is an in-memory store of AggMetric objects. Note: they are keyed by MKey here because each AggMetric manages access to, and references of, its rollup archives itself.
func NewAggMetrics ¶
func NewAggMetrics(store Store, cachePusher cache.CachePusher, dropFirstChunk bool, chunkMaxStale, metricMaxStale uint32, gcInterval time.Duration) *AggMetrics
func (*AggMetrics) GC ¶
func (ms *AggMetrics) GC()
GC periodically scans chunks and closes any that have not received data in a while.
func (*AggMetrics) GetOrCreate ¶
type Aggregation ¶
Aggregation is a container for all summary statistics / aggregated data for 1 metric, in 1 time frame. If Cnt is 0, the numbers don't necessarily make sense.
func NewAggregation ¶
func NewAggregation() *Aggregation
func (*Aggregation) Add ¶
func (a *Aggregation) Add(val float64)
func (*Aggregation) Reset ¶
func (a *Aggregation) Reset()
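A short usage sketch: accumulate the values of one time frame, then reset for the next.

    a := mdata.NewAggregation()
    for _, v := range []float64{1.5, 2.5, 4.0} {
        a.Add(v)
    }
    // a now holds the summary statistics for this frame; remember that
    // if Cnt is 0, the numbers don't necessarily make sense.
    a.Reset() // start the next frame empty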
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Aggregator receives data and builds aggregations. Note: all points with timestamps t1, t2, t3, t4, [t5] get aggregated into a point with ts t5, where t5 % span = 0. In other words: an aggregation point reflects the data in the timeframe preceding it, and the timestamps of the aggregated series are quantized to the given span, unlike the raw series, which may have an offset (be non-quantized).
func NewAggregator ¶
func NewAggregator(store Store, cachePusher cache.CachePusher, key schema.AMKey, ret conf.Retention, agg conf.Aggregation, dropFirstChunk bool) *Aggregator
func (*Aggregator) Add ¶
func (agg *Aggregator) Add(ts uint32, val float64)
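A sketch of feeding an Aggregator; store, cachePusher, key, ret, and agg are assumed from surrounding code, with a 60-second aggregation span configured via ret. The three points below are all rolled into one aggregation point at ts 180:

    ag := mdata.NewAggregator(store, cachePusher, key, ret, agg, false)
    ag.Add(130, 1.0)
    ag.Add(150, 2.0)
    ag.Add(180, 3.0) // 180 % 60 == 0: this ts closes the frame it summarizes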
type ChunkSaveCallback ¶
type ChunkSaveCallback func()
type ChunkWriteRequest ¶
type ChunkWriteRequest struct {
    ChunkWriteRequestPayload
    Callback ChunkSaveCallback
    Key      schema.AMKey
}
ChunkWriteRequest is a request to write a chunk into a store
func NewChunkWriteRequest ¶
func NewChunkWriteRequest(callback ChunkSaveCallback, key schema.AMKey, ttl, t0 uint32, data []byte, ts time.Time) ChunkWriteRequest
type ChunkWriteRequestPayload ¶
func (*ChunkWriteRequestPayload) DecodeMsg ¶
func (z *ChunkWriteRequestPayload) DecodeMsg(dc *msgp.Reader) (err error)
DecodeMsg implements msgp.Decodable
func (*ChunkWriteRequestPayload) EncodeMsg ¶
func (z *ChunkWriteRequestPayload) EncodeMsg(en *msgp.Writer) (err error)
EncodeMsg implements msgp.Encodable
func (*ChunkWriteRequestPayload) MarshalMsg ¶
func (z *ChunkWriteRequestPayload) MarshalMsg(b []byte) (o []byte, err error)
MarshalMsg implements msgp.Marshaler
func (*ChunkWriteRequestPayload) Msgsize ¶
func (z *ChunkWriteRequestPayload) Msgsize() (s int)
Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (*ChunkWriteRequestPayload) UnmarshalMsg ¶
func (z *ChunkWriteRequestPayload) UnmarshalMsg(bts []byte) (o []byte, err error)
UnmarshalMsg implements msgp.Unmarshaler
type DefaultNotifierHandler ¶
type DefaultNotifierHandler struct {
// contains filtered or unexported fields
}
func NewDefaultNotifierHandler ¶
func NewDefaultNotifierHandler(metrics Metrics, idx idx.MetricIndex) DefaultNotifierHandler
func (DefaultNotifierHandler) Handle ¶
func (dn DefaultNotifierHandler) Handle(data []byte)
func (DefaultNotifierHandler) PartitionOf ¶
func (dn DefaultNotifierHandler) PartitionOf(key schema.MKey) (int32, bool)
type Metric ¶
type Metric interface {
    Add(ts uint32, val float64)
    Get(from, to uint32) (Result, error)
    GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
}
type MockStore ¶
type MockStore struct {
    // don't save any data.
    Drop bool
    // contains filtered or unexported fields
}
MockStore is an in-memory Store implementation for unit tests
func NewMockStore ¶
func NewMockStore() *MockStore
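A test-oriented sketch; key (a schema.AMKey) and data (serialized chunk bytes) are assumed to be prepared elsewhere:

    store := mdata.NewMockStore()
    cwr := mdata.NewChunkWriteRequest(func() {}, key, 3600, 1000, data, time.Now())
    store.Add(&cwr)
    // Items now reports 1; Search returns the itergens that overlap the range
    igens, err := store.Search(context.Background(), key, 3600, 0, 2000)
    _, _ = igens, err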
func (*MockStore) Add ¶
func (c *MockStore) Add(cwr *ChunkWriteRequest)
Add adds a chunk to the store
func (*MockStore) Search ¶
func (c *MockStore) Search(ctx context.Context, metric schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error)
Search searches through the mock results and returns the right ones according to start / end.
func (*MockStore) SetTracer ¶
func (c *MockStore) SetTracer(t opentracing.Tracer)
type Notifier ¶
type Notifier interface {
Send(SavedChunk)
}
type NotifierHandler ¶
type PersistMessageBatch ¶
type PersistMessageBatch struct {
    Instance    string       `json:"instance"`
    SavedChunks []SavedChunk `json:"saved_chunks"`
}
type ReorderBuffer ¶
type ReorderBuffer struct {
// contains filtered or unexported fields
}
ReorderBuffer keeps a window of data during which it is ok to send data out of order. The reorder buffer itself is not thread safe because it is only used by AggMetric, which is thread safe, so there is no locking in the buffer.
newest=0 may mean no points have been added yet, or that the newest point is at position 0. We use the Ts of points in the buffer to check for valid points: Ts == 0 means no point in particular; newest.Ts == 0 means the buffer is empty. The buffer is evenly spaced (points are `interval` apart) and may be sparsely populated.
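A sketch of the out-of-order tolerance, using a 3-slot window with a 10-second interval:

    rob := mdata.NewReorderBuffer(3, 10)
    rob.Add(100, 1.0)
    rob.Add(120, 3.0)
    rob.Add(110, 2.0) // late, but still inside the window: accepted
    for _, p := range rob.Get() {
        fmt.Println(p.Ts, p.Val) // points come back in timestamp order
    }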
func NewReorderBuffer ¶
func NewReorderBuffer(reorderWindow, interval uint32) *ReorderBuffer
func (*ReorderBuffer) Add ¶
Add adds the point if it falls within the window. It returns points that have been purged out of the buffer, as well as whether the add succeeded.
func (*ReorderBuffer) Flush ¶
func (rob *ReorderBuffer) Flush() []schema.Point
func (*ReorderBuffer) Get ¶
func (rob *ReorderBuffer) Get() []schema.Point
Get returns the points in the buffer
func (*ReorderBuffer) IsEmpty ¶
func (rob *ReorderBuffer) IsEmpty() bool
func (*ReorderBuffer) Reset ¶
func (rob *ReorderBuffer) Reset()
type SavedChunk ¶
SavedChunk represents a chunk persisted to the store. Key is a stringified schema.AMKey.
Source Files ¶
Directories ¶
Path | Synopsis
---|---
chunk | package chunk encodes timeseries in chunks of data; see devdocs/chunk-format.md for more information.
chunk/tsz | Package tsz implements time-series compression. It is a fork of https://github.com/dgryski/go-tsz, which implements http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. See devdocs/chunk-format.md for more info.