Documentation ¶
Overview ¶
Package tsdb implements a time series storage for float64 sample data.
Example ¶
package main import ( "context" "fmt" "math" "os" "time" "github.com/emmalidtdg/prometheus/model/labels" "github.com/emmalidtdg/prometheus/tsdb/chunkenc" ) func main() { // Create a random dir to work in. Open() doesn't require a pre-existing dir, but // we want to make sure not to make a mess where we shouldn't. dir, err := os.MkdirTemp("", "tsdb-test") noErr(err) // Open a TSDB for reading and/or writing. db, err := Open(dir, nil, nil, DefaultOptions(), nil) noErr(err) // Open an appender for writing. app := db.Appender(context.Background()) series := labels.FromStrings("foo", "bar") // Ref is 0 for the first append since we don't know the reference for the series. ref, err := app.Append(0, series, time.Now().Unix(), 123) noErr(err) // Another append for a second later. // Re-using the ref from above since it's the same series, makes append faster. time.Sleep(time.Second) _, err = app.Append(ref, series, time.Now().Unix(), 124) noErr(err) // Commit to storage. err = app.Commit() noErr(err) // In case you want to do more appends after app.Commit(), // you need a new appender. app = db.Appender(context.Background()) // ... adding more samples. // Open a querier for reading. querier, err := db.Querier(math.MinInt64, math.MaxInt64) noErr(err) ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) for ss.Next() { series := ss.At() fmt.Println("series:", series.Labels().String()) it := series.Iterator(nil) for it.Next() == chunkenc.ValFloat { _, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below) fmt.Println("sample", v) } fmt.Println("it.Err():", it.Err()) } fmt.Println("ss.Err():", ss.Err()) ws := ss.Warnings() if len(ws) > 0 { fmt.Println("warnings:", ws) } err = querier.Close() noErr(err) // Clean up any last resources when done. err = db.Close() noErr(err) err = os.RemoveAll(dir) noErr(err) } func noErr(err error) { if err != nil { panic(err) } }
Output: series: {foo="bar"} sample 123 sample 124 it.Err(): <nil> ss.Err(): <nil>
Index ¶
- Constants
- Variables
- func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})
- func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})
- func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error)
- func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error
- func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64
- func LastChunkSnapshot(dir string) (string, int, int, error)
- func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)
- func NewBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombstones.Reader, ...) storage.ChunkSeriesSet
- func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error)
- func NewMergedStringIter(a, b index.StringIter) index.StringIter
- func PostingsForMatchers(ctx context.Context, ix IndexReader, ms ...*labels.Matcher) (index.Postings, error)
- type Block
- func (pb *Block) Chunks() (ChunkReader, error)
- func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error)
- func (pb *Block) Close() error
- func (pb *Block) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error
- func (pb *Block) Dir() string
- func (pb *Block) GetSymbolTableSize() uint64
- func (pb *Block) Index() (IndexReader, error)
- func (pb *Block) LabelNames(ctx context.Context) ([]string, error)
- func (pb *Block) MaxTime() int64
- func (pb *Block) Meta() BlockMeta
- func (pb *Block) MinTime() int64
- func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool
- func (pb *Block) Size() int64
- func (pb *Block) Snapshot(dir string) error
- func (pb *Block) String() string
- func (pb *Block) Tombstones() (tombstones.Reader, error)
- type BlockDesc
- type BlockMeta
- type BlockMetaCompaction
- type BlockPopulator
- type BlockReader
- type BlockStats
- type BlockWriter
- type BlocksToDeleteFunc
- type ChunkReader
- type ChunkSnapshotStats
- type ChunkWriter
- type CircularExemplarStorage
- func (ce *CircularExemplarStorage) AddExemplar(l labels.Labels, e exemplar.Exemplar) error
- func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage
- func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error
- func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error)
- func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
- func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error)
- func (ce *CircularExemplarStorage) Resize(l int64) int
- func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)
- func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error
- type Compactor
- type CompactorMetrics
- type DB
- func (db *DB) Appender(ctx context.Context) storage.Appender
- func (db *DB) ApplyConfig(conf *config.Config) error
- func (db *DB) Blocks() []*Block
- func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
- func (db *DB) CleanTombstones() (err error)
- func (db *DB) Close() error
- func (db *DB) Compact(ctx context.Context) (returnErr error)
- func (db *DB) CompactHead(head *RangeHead) error
- func (db *DB) CompactOOOHead(ctx context.Context) error
- func (db *DB) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error
- func (db *DB) Dir() string
- func (db *DB) DisableCompactions()
- func (db *DB) DisableNativeHistograms()
- func (db *DB) EnableCompactions()
- func (db *DB) EnableNativeHistograms()
- func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)
- func (db *DB) ForceHeadMMap()
- func (db *DB) Head() *Head
- func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error)
- func (db *DB) SetWriteNotified(wn wlog.WriteNotified)
- func (db *DB) Snapshot(dir string, withHead bool) error
- func (db *DB) StartTime() (int64, error)
- func (db *DB) String() string
- type DBReadOnly
- func (db *DBReadOnly) Block(blockID string) (BlockReader, error)
- func (db *DBReadOnly) Blocks() ([]BlockReader, error)
- func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
- func (db *DBReadOnly) Close() error
- func (db *DBReadOnly) FlushWAL(dir string) (returnErr error)
- func (db *DBReadOnly) LastBlockID() (string, error)
- func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error)
- type DBStats
- type DefaultBlockPopulator
- type DeletedIterator
- func (it *DeletedIterator) At() (int64, float64)
- func (it *DeletedIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram)
- func (it *DeletedIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram)
- func (it *DeletedIterator) AtT() int64
- func (it *DeletedIterator) Err() error
- func (it *DeletedIterator) Next() chunkenc.ValueType
- func (it *DeletedIterator) Seek(t int64) chunkenc.ValueType
- type ExemplarMetrics
- type ExemplarStorage
- type Head
- func (h *Head) AppendableMinValidTime() (int64, bool)
- func (h *Head) Appender(_ context.Context) storage.Appender
- func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL)
- func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error)
- func (h *Head) Chunks() (ChunkReader, error)
- func (h *Head) Close() error
- func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error
- func (h *Head) DisableNativeHistograms()
- func (h *Head) EnableNativeHistograms()
- func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)
- func (h *Head) Index() (IndexReader, error)
- func (h *Head) Init(minValidTime int64) error
- func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64)
- func (h *Head) MaxOOOTime() int64
- func (h *Head) MaxTime() int64
- func (h *Head) Meta() BlockMeta
- func (h *Head) MinOOOTime() int64
- func (h *Head) MinTime() int64
- func (h *Head) NumSeries() uint64
- func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool
- func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats
- func (h *Head) SetMinValidTime(minValidTime int64)
- func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wlog.WL)
- func (h *Head) Size() int64
- func (h *Head) Stats(statsByLabelName string, limit int) *Stats
- func (h *Head) String() string
- func (h *Head) Tombstones() (tombstones.Reader, error)
- func (h *Head) Truncate(mint int64) (err error)
- func (h *Head) WaitForAppendersOverlapping(maxt int64)
- func (h *Head) WaitForPendingReadersForOOOChunksAtOrBefore(chunk chunks.ChunkDiskMapperRef)
- func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64)
- type HeadOptions
- type HeadStats
- type IndexReader
- type IndexWriter
- type LeveledCompactor
- func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, ...) (*LeveledCompactor, error)
- func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, ...) (*LeveledCompactor, error)
- func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, ...) (*LeveledCompactor, error)
- func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error)
- func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error)
- func (c *LeveledCompactor) Plan(dir string) ([]string, error)
- func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error)
- type LeveledCompactorOptions
- type OOOChunk
- type OOOCompactionHead
- func (ch *OOOCompactionHead) ChunkRange() int64
- func (ch *OOOCompactionHead) Chunks() (ChunkReader, error)
- func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead
- func (ch *OOOCompactionHead) Index() (IndexReader, error)
- func (ch *OOOCompactionHead) LastMmapRef() chunks.ChunkDiskMapperRef
- func (ch *OOOCompactionHead) LastWBLFile() int
- func (ch *OOOCompactionHead) MaxTime() int64
- func (ch *OOOCompactionHead) Meta() BlockMeta
- func (ch *OOOCompactionHead) MinTime() int64
- func (ch *OOOCompactionHead) Size() int64
- func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error)
- type OOOCompactionHeadIndexReader
- func (ir *OOOCompactionHeadIndexReader) Close() error
- func (ir *OOOCompactionHeadIndexReader) LabelNames(context.Context, ...*labels.Matcher) ([]string, error)
- func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error)
- func (ir *OOOCompactionHeadIndexReader) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error)
- func (ir *OOOCompactionHeadIndexReader) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
- func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error)
- func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error)
- func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error
- func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
- func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
- func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings
- func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter
- type OOOHeadChunkReader
- type OOOHeadIndexReader
- func (h OOOHeadIndexReader) Close() error
- func (h OOOHeadIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error)
- func (h OOOHeadIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error)
- func (h OOOHeadIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error)
- func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
- func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error)
- func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error
- func (h OOOHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
- func (h OOOHeadIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
- func (h OOOHeadIndexReader) SortedPostings(p index.Postings) index.Postings
- func (h OOOHeadIndexReader) Symbols() index.StringIter
- type OOORangeHead
- func (oh *OOORangeHead) Chunks() (ChunkReader, error)
- func (oh *OOORangeHead) Index() (IndexReader, error)
- func (oh *OOORangeHead) MaxTime() int64
- func (oh *OOORangeHead) Meta() BlockMeta
- func (oh *OOORangeHead) MinTime() int64
- func (oh *OOORangeHead) Size() int64
- func (oh *OOORangeHead) String() string
- func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error)
- type Options
- type Overlaps
- type RangeHead
- func (h *RangeHead) BlockMaxTime() int64
- func (h *RangeHead) Chunks() (ChunkReader, error)
- func (h *RangeHead) Index() (IndexReader, error)
- func (h *RangeHead) MaxTime() int64
- func (h *RangeHead) Meta() BlockMeta
- func (h *RangeHead) MinTime() int64
- func (h *RangeHead) NumSeries() uint64
- func (h *RangeHead) Size() int64
- func (h *RangeHead) String() string
- func (h *RangeHead) Tombstones() (tombstones.Reader, error)
- type SeriesLifecycleCallback
- type Stats
- type TimeRange
- type WALReplayStatus
Examples ¶
Constants ¶
const ( // DefaultOutOfOrderCapMax is the default maximum size of an in-memory out-of-order chunk. DefaultOutOfOrderCapMax int64 = 32 // DefaultSamplesPerChunk provides a default target number of samples per chunk. DefaultSamplesPerChunk = 120 )
const ( // CompactionHintFromOutOfOrder is a hint noting that the block // was created from out-of-order chunks. CompactionHintFromOutOfOrder = "from-out-of-order" )
const ( // Default duration of a block in milliseconds. DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond) )
const (
// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
DefaultStripeSize = 1 << 14
)
Variables ¶
var ( // ErrInvalidSample is returned if an appended sample is not valid and can't // be ingested. ErrInvalidSample = errors.New("invalid sample") // ErrInvalidExemplar is returned if an appended exemplar is not valid and can't // be ingested. ErrInvalidExemplar = errors.New("invalid exemplar") // ErrAppenderClosed is returned if an appender has already be successfully // rolled back or committed. ErrAppenderClosed = errors.New("appender closed") )
var ErrClosed = errors.New("db already closed")
ErrClosed is returned when the db is closed.
var ErrClosing = errors.New("block is closing")
ErrClosing is returned when a block is in the process of being closed.
var ErrInvalidTimes = fmt.Errorf("max time is lesser than min time")
var ErrNoSeriesAppended = errors.New("no series appended, aborting")
ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.
var ErrNotReady = errors.New("TSDB not ready")
ErrNotReady is returned if the underlying storage is not ready yet.
Functions ¶
func BeyondSizeRetention ¶
BeyondSizeRetention returns those blocks which are beyond the size retention set in the db options.
func BeyondTimeRetention ¶
BeyondTimeRetention returns those blocks which are beyond the time retention set in the db options.
func CreateBlock ¶
func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error)
CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk.
func DeleteChunkSnapshots ¶
DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.
func ExponentialBlockRanges ¶
ExponentialBlockRanges returns the time ranges based on the stepSize.
func LastChunkSnapshot ¶
LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot. If dir does not contain any chunk snapshots, ErrNotFound is returned.
func NewBlockChunkQuerier ¶
func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)
NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.
func NewBlockChunkSeriesSet ¶
func NewBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet
func NewBlockQuerier ¶
func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error)
NewBlockQuerier returns a querier against the block reader and requested min and max time range.
func NewMergedStringIter ¶
func NewMergedStringIter(a, b index.StringIter) index.StringIter
NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result.
func PostingsForMatchers ¶
func PostingsForMatchers(ctx context.Context, ix IndexReader, ms ...*labels.Matcher) (index.Postings, error)
PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.
Types ¶
type Block ¶
type Block struct {
// contains filtered or unexported fields
}
Block represents a directory of time series data covering a continuous time range.
func OpenBlock ¶
OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used to instantiate chunk structs.
func (*Block) Chunks ¶
func (pb *Block) Chunks() (ChunkReader, error)
Chunks returns a new ChunkReader against the block data.
func (*Block) CleanTombstones ¶
CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). If there was a rewrite, then it returns the ULID of the new block written, else nil. If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and return nil UID. It returns a boolean indicating if the parent block can be deleted safely of not.
func (*Block) Close ¶
Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (*Block) GetSymbolTableSize ¶
GetSymbolTableSize returns the Symbol Table Size in the index of this block.
func (*Block) Index ¶
func (pb *Block) Index() (IndexReader, error)
Index returns a new IndexReader against the block data.
func (*Block) LabelNames ¶
LabelNames returns all the unique label names present in the Block in sorted order.
func (*Block) OverlapsClosedInterval ¶
OverlapsClosedInterval returns true if the block overlaps [mint, maxt].
func (*Block) Tombstones ¶
func (pb *Block) Tombstones() (tombstones.Reader, error)
Tombstones returns a new TombstoneReader against the block data.
type BlockDesc ¶
type BlockDesc struct { ULID ulid.ULID `json:"ulid"` MinTime int64 `json:"minTime"` MaxTime int64 `json:"maxTime"` }
BlockDesc describes a block by ULID and time range.
type BlockMeta ¶
type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. ULID ulid.ULID `json:"ulid"` // MinTime and MaxTime specify the time range all samples // in the block are in. MinTime int64 `json:"minTime"` MaxTime int64 `json:"maxTime"` // Stats about the contents of the block. Stats BlockStats `json:"stats,omitempty"` // Information on compactions the block was created from. Compaction BlockMetaCompaction `json:"compaction"` // Version of the index format. Version int `json:"version"` }
BlockMeta provides meta information about a block.
type BlockMetaCompaction ¶
type BlockMetaCompaction struct { // Maximum number of compaction cycles any source block has // gone through. Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` // Indicates that during compaction it resulted in a block without any samples // so it should be deleted on the next reloadBlocks. Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` Failed bool `json:"failed,omitempty"` // Additional information about the compaction, for example, block created from out-of-order chunks. Hints []string `json:"hints,omitempty"` }
BlockMetaCompaction holds information about compactions a block went through.
func (*BlockMetaCompaction) FromOutOfOrder ¶
func (bm *BlockMetaCompaction) FromOutOfOrder() bool
func (*BlockMetaCompaction) SetOutOfOrder ¶
func (bm *BlockMetaCompaction) SetOutOfOrder()
type BlockPopulator ¶
type BlockPopulator interface {
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
}
type BlockReader ¶
type BlockReader interface { // Index returns an IndexReader over the block's data. Index() (IndexReader, error) // Chunks returns a ChunkReader over the block's data. Chunks() (ChunkReader, error) // Tombstones returns a tombstones.Reader over the block's deleted data. Tombstones() (tombstones.Reader, error) // Meta provides meta information about the block reader. Meta() BlockMeta // Size returns the number of bytes that the block takes up on disk. Size() int64 }
BlockReader provides reading access to a data block.
type BlockStats ¶
type BlockStats struct { NumSamples uint64 `json:"numSamples,omitempty"` NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` }
BlockStats contains stats about contents of a block.
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter is a block writer that allows appending and flushing series to disk.
func NewBlockWriter ¶
NewBlockWriter create a new block writer.
The returned writer accumulates all the series in the Head block until `Flush` is called.
Note that the writer will not check if the target directory exists or contains anything at all. It is the caller's responsibility to ensure that the resulting blocks do not overlap etc. Writer ensures the block flush is atomic (via rename).
func (*BlockWriter) Appender ¶
func (w *BlockWriter) Appender(ctx context.Context) storage.Appender
Appender returns a new appender on the database. Appender can't be called concurrently. However, the returned Appender can safely be used concurrently.
func (*BlockWriter) Close ¶
func (w *BlockWriter) Close() error
type BlocksToDeleteFunc ¶
func DefaultBlocksToDelete ¶
func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc
DefaultBlocksToDelete returns a filter which decides time based and size based retention from the options of the db.
type ChunkReader ¶
type ChunkReader interface { // ChunkOrIterable returns the series data for the given chunks.Meta. // Either a single chunk will be returned, or an iterable. // A single chunk should be returned if chunks.Meta maps to a chunk that // already exists and doesn't need modifications. // An iterable should be returned if chunks.Meta maps to a subset of the // samples in a stored chunk, or multiple chunks. (E.g. OOOHeadChunkReader // could return an iterable where multiple histogram samples have counter // resets. There can only be one counter reset per histogram chunk so // multiple chunks would be created from the iterable in this case.) // Only one of chunk or iterable should be returned. In some cases you may // always expect a chunk to be returned. You can check that iterable is nil // in those cases. ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) // Close releases all underlying resources of the reader. Close() error }
ChunkReader provides reading access of serialized time series data.
type ChunkSnapshotStats ¶
ChunkSnapshotStats returns stats about a created chunk snapshot.
type ChunkWriter ¶
type ChunkWriter interface { // WriteChunks writes several chunks. The Chunk field of the ChunkMetas // must be populated. // After returning successfully, the Ref fields in the ChunkMetas // are set and can be used to retrieve the chunks from the written data. WriteChunks(chunks ...chunks.Meta) error // Close writes any required finalization and closes the resources // associated with the underlying writer. Close() error }
ChunkWriter serializes a time block of chunked series data.
type CircularExemplarStorage ¶
type CircularExemplarStorage struct {
// contains filtered or unexported fields
}
func (*CircularExemplarStorage) AddExemplar ¶
func (*CircularExemplarStorage) Appender ¶
func (ce *CircularExemplarStorage) Appender() *CircularExemplarStorage
func (*CircularExemplarStorage) ApplyConfig ¶
func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error
func (*CircularExemplarStorage) ExemplarQuerier ¶
func (ce *CircularExemplarStorage) ExemplarQuerier(_ context.Context) (storage.ExemplarQuerier, error)
func (*CircularExemplarStorage) IterateExemplars ¶
func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
IterateExemplars iterates through all the exemplars from oldest to newest appended and calls the given function on all of them till the end (or) till the first function call that returns an error.
func (*CircularExemplarStorage) Querier ¶
func (ce *CircularExemplarStorage) Querier(_ context.Context) (storage.ExemplarQuerier, error)
func (*CircularExemplarStorage) Resize ¶
func (ce *CircularExemplarStorage) Resize(l int64) int
Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.
func (*CircularExemplarStorage) Select ¶
func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)
Select returns exemplars for a given set of label matchers.
func (*CircularExemplarStorage) ValidateExemplar ¶
type Compactor ¶
type Compactor interface { // Plan returns a set of directories that can be compacted concurrently. // The directories can be overlapping. // Results returned when compactions are in progress are undefined. Plan(dir string) ([]string, error) // Write persists a Block into a directory. // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. // When resulting Block has 0 samples // * No block is written. // * The source dirs are marked Deletable. // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) }
Compactor provides compaction against an underlying storage of time series data.
type CompactorMetrics ¶
type CompactorMetrics struct { Ran prometheus.Counter PopulatingBlocks prometheus.Gauge OverlappingBlocks prometheus.Counter Duration prometheus.Histogram ChunkSize prometheus.Histogram ChunkSamples prometheus.Histogram ChunkRange prometheus.Histogram }
func NewCompactorMetrics ¶
func NewCompactorMetrics(r prometheus.Registerer) *CompactorMetrics
NewCompactorMetrics initializes metrics for Compactor.
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB handles reads and writes of time series falling into a hashed partition of a seriedb.
func Open ¶
func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, stats *DBStats) (db *DB, err error)
Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.
func (*DB) ApplyConfig ¶
ApplyConfig applies a new config to the DB. Behaviour of 'OutOfOrderTimeWindow' is as follows: OOO enabled = oooTimeWindow > 0. OOO disabled = oooTimeWindow is 0. 1) Before: OOO disabled, Now: OOO enabled =>
- A new WBL is created for the head block.
- OOO compaction is enabled.
- Overlapping queries are enabled.
2) Before: OOO enabled, Now: OOO enabled =>
- Only the time window is updated.
3) Before: OOO enabled, Now: OOO disabled =>
- Time Window set to 0. So no new OOO samples will be allowed.
- OOO WBL will stay and will be eventually cleaned up.
- OOO Compaction and overlapping queries will remain enabled until a restart or until all OOO samples are compacted.
4) Before: OOO disabled, Now: OOO disabled => no-op.
func (*DB) ChunkQuerier ¶
func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
ChunkQuerier returns a new chunk querier over the data partition for the given time range.
func (*DB) CleanTombstones ¶
CleanTombstones re-writes any blocks with tombstones.
func (*DB) Compact ¶
Compact data if possible. After successful compaction blocks are reloaded which will also delete the blocks that fall out of the retention window. Old blocks are only deleted on reloadBlocks based on the new block's parent information. See DB.reloadBlocks documentation for further information.
func (*DB) CompactHead ¶
CompactHead compacts the given RangeHead.
func (*DB) CompactOOOHead ¶
CompactOOOHead compacts the OOO Head.
func (*DB) Delete ¶
Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
func (*DB) DisableCompactions ¶
func (db *DB) DisableCompactions()
DisableCompactions disables auto compactions.
func (*DB) DisableNativeHistograms ¶
func (db *DB) DisableNativeHistograms()
DisableNativeHistograms disables the native histogram feature.
func (*DB) EnableCompactions ¶
func (db *DB) EnableCompactions()
EnableCompactions enables auto compactions.
func (*DB) EnableNativeHistograms ¶
func (db *DB) EnableNativeHistograms()
EnableNativeHistograms enables the native histogram feature.
func (*DB) ExemplarQuerier ¶
func (*DB) ForceHeadMMap ¶
func (db *DB) ForceHeadMMap()
ForceHeadMMap is intended for use only in tests and benchmarks.
func (*DB) Querier ¶
Querier returns a new querier over the data partition for the given time range.
func (*DB) SetWriteNotified ¶
func (db *DB) SetWriteNotified(wn wlog.WriteNotified)
func (*DB) Snapshot ¶
Snapshot writes the current data to the directory. If withHead is set to true it will create a new block containing all data that's currently in the memory buffer/WAL.
type DBReadOnly ¶
type DBReadOnly struct {
// contains filtered or unexported fields
}
DBReadOnly provides APIs for read only operations on a database. Current implementation doesn't support concurrency so all API calls should happen in the same go routine.
func OpenDBReadOnly ¶
func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error)
OpenDBReadOnly opens DB in the given directory for read only operations.
func (*DBReadOnly) Block ¶
func (db *DBReadOnly) Block(blockID string) (BlockReader, error)
Block returns a block reader by given block id.
func (*DBReadOnly) Blocks ¶
func (db *DBReadOnly) Blocks() ([]BlockReader, error)
Blocks returns a slice of block readers for persisted blocks.
func (*DBReadOnly) ChunkQuerier ¶
func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. Current implementation doesn't support multiple ChunkQueriers.
func (*DBReadOnly) FlushWAL ¶
func (db *DBReadOnly) FlushWAL(dir string) (returnErr error)
FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. Samples that are in existing blocks will not be written to the new block. Note that if the read only database is running concurrently with a writable database then writing the WAL to the database directory can race.
func (*DBReadOnly) LastBlockID ¶
func (db *DBReadOnly) LastBlockID() (string, error)
LastBlockID returns the BlockID of latest block.
type DBStats ¶
type DBStats struct {
Head *HeadStats
}
DBStats contains statistics about the DB separated by component (eg. head). They are available before the DB has finished initializing.
func NewDBStats ¶
func NewDBStats() *DBStats
NewDBStats returns a new DBStats object initialized using the new function from each component.
type DefaultBlockPopulator ¶
type DefaultBlockPopulator struct{}
func (DefaultBlockPopulator) PopulateBlock ¶
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error)
PopulateBlock fills the index and chunk writers with new data gathered as the union of the provided blocks. It returns meta information for the new block. It expects sorted blocks input by mint.
type DeletedIterator ¶
type DeletedIterator struct { // Iter is an Iterator to be wrapped. Iter chunkenc.Iterator // Intervals are the deletion intervals. Intervals tombstones.Intervals }
DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.
func (*DeletedIterator) At ¶
func (it *DeletedIterator) At() (int64, float64)
func (*DeletedIterator) AtFloatHistogram ¶
func (it *DeletedIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram)
func (*DeletedIterator) AtHistogram ¶
func (*DeletedIterator) AtT ¶
func (it *DeletedIterator) AtT() int64
func (*DeletedIterator) Err ¶
func (it *DeletedIterator) Err() error
func (*DeletedIterator) Next ¶
func (it *DeletedIterator) Next() chunkenc.ValueType
type ExemplarMetrics ¶
type ExemplarMetrics struct {
// contains filtered or unexported fields
}
func NewExemplarMetrics ¶
func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics
type ExemplarStorage ¶
type ExemplarStorage interface { storage.ExemplarQueryable AddExemplar(labels.Labels, exemplar.Exemplar) error ValidateExemplar(labels.Labels, exemplar.Exemplar) error IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error }
func NewCircularExemplarStorage ¶
func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error)
NewCircularExemplarStorage creates an circular in memory exemplar storage. If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in 1GB of extra memory, accounting for the fact that this is heap allocated space. If len <= 0, then the exemplar storage is essentially a noop storage but can later be resized to store exemplars.
type Head ¶
type Head struct {
// contains filtered or unexported fields
}
Head handles reads and writes of time series data within a time window.
func NewHead ¶
func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *HeadOptions, stats *HeadStats) (*Head, error)
NewHead opens the head block in dir.
func (*Head) AppendableMinValidTime ¶
AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head. Returns false if Head hasn't been initialized yet and the minimum time isn't known yet.
func (*Head) ChunkSnapshot ¶
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error)
ChunkSnapshot creates a snapshot of all the series and tombstones in the head. It deletes the old chunk snapshots if the chunk snapshot creation is successful.
The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written using the WAL package. N is the last WAL segment present during snapshotting and M is the offset in segment N upto which data was written.
The snapshot first contains all series (each in individual records and not sorted), followed by tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they were written to the circular buffer.
func (*Head) Chunks ¶
func (h *Head) Chunks() (ChunkReader, error)
Chunks returns a ChunkReader against the block.
func (*Head) Close ¶
Close flushes the WAL and closes the head. It also takes a snapshot of in-memory chunks if enabled.
func (*Head) Delete ¶
Delete all samples in the range of [mint, maxt] for series that satisfy the given label matchers.
func (*Head) DisableNativeHistograms ¶
func (h *Head) DisableNativeHistograms()
DisableNativeHistograms disables the native histogram feature.
func (*Head) EnableNativeHistograms ¶
func (h *Head) EnableNativeHistograms()
EnableNativeHistograms enables the native histogram feature.
func (*Head) ExemplarQuerier ¶
func (*Head) Index ¶
func (h *Head) Index() (IndexReader, error)
Index returns an IndexReader against the block.
func (*Head) Init ¶
Init loads data from the write ahead log and prepares the head for writes. It should be called before using an appender so that it limits the ingested samples to the head min valid time.
func (*Head) IsQuerierCollidingWithTruncation ¶
func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64)
IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier has to be created. In the latter case, the method also returns the new mint to be used for creating the new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.
NOTE: The querier should already be taken before calling this.
func (*Head) MaxOOOTime ¶
MaxOOOTime returns the highest timestamp on visible data in the out of order head.
func (*Head) Meta ¶
Meta returns meta information about the head. The head is dynamic so will return dynamic results.
func (*Head) MinOOOTime ¶
MinOOOTime returns the lowest time bound on visible data in the out of order head.
func (*Head) OverlapsClosedInterval ¶
OverlapsClosedInterval returns true if the head overlaps [mint, maxt].
func (*Head) PostingsCardinalityStats ¶
func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats
PostingsCardinalityStats returns highest cardinality stats by label and value names.
func (*Head) SetMinValidTime ¶
SetMinValidTime sets the minimum timestamp the head can ingest.
func (*Head) SetOutOfOrderTimeWindow ¶
SetOutOfOrderTimeWindow updates the out of order related parameters. If the Head already has a WBL set, then the wbl will be ignored.
func (*Head) Stats ¶
Stats returns important current HEAD statistics. Note that it is expensive to calculate these.
func (*Head) String ¶
String returns an human readable representation of the TSDB head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.
func (*Head) Tombstones ¶
func (h *Head) Tombstones() (tombstones.Reader, error)
Tombstones returns a new reader over the head's tombstones.
func (*Head) WaitForAppendersOverlapping ¶
WaitForAppendersOverlapping waits for appends overlapping maxt to finish.
func (*Head) WaitForPendingReadersForOOOChunksAtOrBefore ¶
func (h *Head) WaitForPendingReadersForOOOChunksAtOrBefore(chunk chunks.ChunkDiskMapperRef)
WaitForPendingReadersForOOOChunksAtOrBefore is like WaitForPendingReadersInTimeRange, except it waits for queries touching OOO chunks less than or equal to chunk to finish querying.
func (*Head) WaitForPendingReadersInTimeRange ¶
WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. The query timeout limits the max wait time of this function implicitly. The mint is inclusive and maxt is the truncation time hence exclusive.
type HeadOptions ¶
type HeadOptions struct { // Runtime reloadable option. At the top of the struct for 32 bit OS: // https://pkg.go.dev/sync/atomic#pkg-note-BUG MaxExemplars atomic.Int64 OutOfOrderTimeWindow atomic.Int64 OutOfOrderCapMax atomic.Int64 // EnableNativeHistograms enables the ingestion of native histograms. EnableNativeHistograms atomic.Bool // EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample. // See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md EnableCreatedTimestampZeroIngestion bool ChunkRange int64 // ChunkDirRoot is the parent directory of the chunks directory. ChunkDirRoot string ChunkPool chunkenc.Pool ChunkWriteBufferSize int ChunkWriteQueueSize int SamplesPerChunk int // StripeSize sets the number of entries in the hash map, it must be a power of 2. // A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series. // A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series. StripeSize int SeriesCallback SeriesLifecycleCallback EnableExemplarStorage bool EnableMemorySnapshotOnShutdown bool IsolationDisabled bool // Maximum number of CPUs that can simultaneously processes WAL replay. // The default value is GOMAXPROCS. // If it is set to a negative value or zero, the default value is used. WALReplayConcurrency int // EnableSharding enables ShardedPostings() support in the Head. EnableSharding bool }
HeadOptions are parameters for the Head block.
func DefaultHeadOptions ¶
func DefaultHeadOptions() *HeadOptions
type HeadStats ¶
type HeadStats struct {
WALReplayStatus *WALReplayStatus
}
HeadStats are the statistics for the head component of the DB.
type IndexReader ¶
type IndexReader interface { // Symbols return an iterator over sorted string symbols that may occur in // series' labels and indices. It is not safe to use the returned strings // beyond the lifetime of the index reader. Symbols() index.StringIter // SortedLabelValues returns sorted possible label values. SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) // LabelValues returns possible label values which may not be sorted. LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) // Postings returns the postings list iterator for the label pairs. // The Postings here contain the offsets to the series inside the index. // Found IDs are not strictly required to point to a valid Series, e.g. // during background garbage collections. Postings(ctx context.Context, name string, values ...string) (index.Postings, error) // SortedPostings returns a postings list that is reordered to be sorted // by the label set of the underlying series. SortedPostings(index.Postings) index.Postings // ShardedPostings returns a postings list filtered by the provided shardIndex // out of shardCount. For a given posting, its shard MUST be computed hashing // the series labels mod shardCount, using a hash function which is consistent over time. ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings // Series populates the given builder and chunk metas for the series identified // by the reference. // Returns storage.ErrNotFound if the ref does not resolve to a known series. Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error // LabelNames returns all the unique label names present in the index in sorted order. LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error) // LabelValueFor returns label value for the given label name in the series referred to by ID. // If the series couldn't be found or the series doesn't have the requested label a // storage.ErrNotFound is returned as error. LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error) // LabelNamesFor returns all the label names for the series referred to by IDs. // The names returned are sorted. LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error) // Close releases the underlying resources of the reader. Close() error }
IndexReader provides reading access of serialized index data.
func NewOOOCompactionHeadIndexReader ¶
func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader
type IndexWriter ¶
type IndexWriter interface { // AddSymbol registers a single symbol. // Symbols must be registered in sorted order. AddSymbol(sym string) error // AddSeries populates the index writer with a series and its offsets // of chunks that the index can reference. // Implementations may require series to be insert in strictly increasing order by // their labels. The reference numbers are used to resolve entries in postings lists // that are added later. AddSeries(ref storage.SeriesRef, l labels.Labels, chunks ...chunks.Meta) error // Close writes any finalization and closes the resources associated with // the underlying writer. Close() error }
IndexWriter serializes the index for a block of series data. The methods must be called in the order they are specified in.
type LeveledCompactor ¶
type LeveledCompactor struct {
// contains filtered or unexported fields
}
LeveledCompactor implements the Compactor interface.
func NewLeveledCompactor ¶
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)
func NewLeveledCompactorWithChunkSize ¶
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)
func NewLeveledCompactorWithOptions ¶
func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error)
func (*LeveledCompactor) Compact ¶
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error)
Compact creates a new block in the compactor's directory from the blocks in the provided directories.
func (*LeveledCompactor) CompactWithBlockPopulator ¶
func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error)
func (*LeveledCompactor) Plan ¶
func (c *LeveledCompactor) Plan(dir string) ([]string, error)
Plan returns a list of compactable blocks in the provided directory.
func (*LeveledCompactor) Write ¶
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) (ulid.ULID, error)
type LeveledCompactorOptions ¶
type LeveledCompactorOptions struct { // PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction. // If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more. PE index.PostingsEncoder // MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used. MaxBlockChunkSegmentSize int64 // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. MergeFunc storage.VerticalChunkSeriesMergeFunc // EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled. // It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction. EnableOverlappingCompaction bool }
type OOOChunk ¶
type OOOChunk struct {
// contains filtered or unexported fields
}
OOOChunk maintains samples in time-ascending order. Inserts for timestamps already seen, are dropped. Samples are stored uncompressed to allow easy sorting. Perhaps we can be more efficient later.
func NewOOOChunk ¶
func NewOOOChunk() *OOOChunk
func (*OOOChunk) Insert ¶
Insert inserts the sample such that order is maintained. Returns false if insert was not possible due to the same timestamp already existing.
func (*OOOChunk) NumSamples ¶
type OOOCompactionHead ¶
type OOOCompactionHead struct {
// contains filtered or unexported fields
}
func NewOOOCompactionHead ¶
func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error)
NewOOOCompactionHead does the following: 1. M-maps all the in-memory ooo chunks. 2. Compute the expected block ranges while iterating through all ooo series and store it. 3. Store the list of postings having ooo series. 4. Cuts a new WBL file for the OOO WBL. All the above together have a bit of CPU and memory overhead, and can have a bit of impact on the sample append latency. So call NewOOOCompactionHead only right before compaction.
func (*OOOCompactionHead) ChunkRange ¶
func (ch *OOOCompactionHead) ChunkRange() int64
func (*OOOCompactionHead) Chunks ¶
func (ch *OOOCompactionHead) Chunks() (ChunkReader, error)
func (*OOOCompactionHead) CloneForTimeRange ¶
func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead
CloneForTimeRange clones the OOOCompactionHead such that the IndexReader and ChunkReader obtained from this only looks at the m-map chunks within the given time ranges while not looking beyond the ch.lastMmapRef. Only the method of BlockReader interface are valid for the cloned OOOCompactionHead.
func (*OOOCompactionHead) Index ¶
func (ch *OOOCompactionHead) Index() (IndexReader, error)
func (*OOOCompactionHead) LastMmapRef ¶
func (ch *OOOCompactionHead) LastMmapRef() chunks.ChunkDiskMapperRef
func (*OOOCompactionHead) LastWBLFile ¶
func (ch *OOOCompactionHead) LastWBLFile() int
func (*OOOCompactionHead) MaxTime ¶
func (ch *OOOCompactionHead) MaxTime() int64
func (*OOOCompactionHead) Meta ¶
func (ch *OOOCompactionHead) Meta() BlockMeta
func (*OOOCompactionHead) MinTime ¶
func (ch *OOOCompactionHead) MinTime() int64
func (*OOOCompactionHead) Size ¶
func (ch *OOOCompactionHead) Size() int64
func (*OOOCompactionHead) Tombstones ¶
func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error)
type OOOCompactionHeadIndexReader ¶
type OOOCompactionHeadIndexReader struct {
// contains filtered or unexported fields
}
func (*OOOCompactionHeadIndexReader) Close ¶
func (ir *OOOCompactionHeadIndexReader) Close() error
func (*OOOCompactionHeadIndexReader) LabelNames ¶
func (*OOOCompactionHeadIndexReader) LabelNamesFor ¶
func (*OOOCompactionHeadIndexReader) LabelValueFor ¶
func (*OOOCompactionHeadIndexReader) LabelValues ¶
func (*OOOCompactionHeadIndexReader) PostingsForMatchers ¶
func (*OOOCompactionHeadIndexReader) Series ¶
func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error
func (*OOOCompactionHeadIndexReader) ShardedPostings ¶
func (*OOOCompactionHeadIndexReader) SortedLabelValues ¶
func (*OOOCompactionHeadIndexReader) SortedPostings ¶
func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings
func (*OOOCompactionHeadIndexReader) Symbols ¶
func (ir *OOOCompactionHeadIndexReader) Symbols() index.StringIter
type OOOHeadChunkReader ¶
type OOOHeadChunkReader struct {
// contains filtered or unexported fields
}
func NewOOOHeadChunkReader ¶
func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader
func (OOOHeadChunkReader) ChunkOrIterable ¶
func (OOOHeadChunkReader) Close ¶
func (cr OOOHeadChunkReader) Close() error
type OOOHeadIndexReader ¶
type OOOHeadIndexReader struct {
// contains filtered or unexported fields
}
OOOHeadIndexReader implements IndexReader so ooo samples in the head can be accessed. It also has a reference to headIndexReader so we can leverage on its IndexReader implementation for all the methods that remain the same. We decided to do this to avoid code duplication. The only methods that change are the ones about getting Series and Postings.
func NewOOOHeadIndexReader ¶
func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader
func (OOOHeadIndexReader) LabelNames ¶
func (h OOOHeadIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error)
LabelNames returns all the unique label names present in the head that are within the time range mint to maxt.
func (OOOHeadIndexReader) LabelNamesFor ¶
func (h OOOHeadIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error)
LabelNamesFor returns all the label names for the series referred to by IDs. The names returned are sorted.
func (OOOHeadIndexReader) LabelValueFor ¶
func (h OOOHeadIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error)
LabelValueFor returns label value for the given label name in the series referred to by ID.
func (*OOOHeadIndexReader) LabelValues ¶
func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
LabelValues needs to be overridden from the headIndexReader implementation due to the check that happens at the beginning where we make sure that the query interval overlaps with the head minooot and maxooot.
func (*OOOHeadIndexReader) Series ¶
func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error
func (OOOHeadIndexReader) ShardedPostings ¶
func (h OOOHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
ShardedPostings implements IndexReader. This function returns an failing postings list if sharding has not been enabled in the Head.
func (OOOHeadIndexReader) SortedLabelValues ¶
func (h OOOHeadIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)
SortedLabelValues returns label values present in the head for the specific label name that are within the time range mint to maxt. If matchers are specified the returned result set is reduced to label values of metrics matching the matchers.
func (OOOHeadIndexReader) SortedPostings ¶
func (OOOHeadIndexReader) Symbols ¶
func (h OOOHeadIndexReader) Symbols() index.StringIter
type OOORangeHead ¶
type OOORangeHead struct {
// contains filtered or unexported fields
}
OOORangeHead allows querying Head out of order samples via BlockReader interface implementation.
func NewOOORangeHead ¶
func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead
func (*OOORangeHead) Chunks ¶
func (oh *OOORangeHead) Chunks() (ChunkReader, error)
func (*OOORangeHead) Index ¶
func (oh *OOORangeHead) Index() (IndexReader, error)
func (*OOORangeHead) MaxTime ¶
func (oh *OOORangeHead) MaxTime() int64
func (*OOORangeHead) Meta ¶
func (oh *OOORangeHead) Meta() BlockMeta
func (*OOORangeHead) MinTime ¶
func (oh *OOORangeHead) MinTime() int64
func (*OOORangeHead) Size ¶
func (oh *OOORangeHead) Size() int64
Size returns the size taken by the Head block.
func (*OOORangeHead) String ¶
func (oh *OOORangeHead) String() string
String returns an human readable representation of the out of order range head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.
func (*OOORangeHead) Tombstones ¶
func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error)
type Options ¶
type Options struct { // Segments (wal files) max size. // WALSegmentSize = 0, segment size is default size. // WALSegmentSize > 0, segment size is WALSegmentSize. // WALSegmentSize < 0, wal is disabled. WALSegmentSize int // MaxBlockChunkSegmentSize is the max size of block chunk segment files. // MaxBlockChunkSegmentSize = 0, chunk segment size is default size. // MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize. MaxBlockChunkSegmentSize int64 // Duration of persisted data to keep. // Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration. // Typically it is in milliseconds. RetentionDuration int64 // Maximum number of bytes in blocks to be retained. // 0 or less means disabled. // NOTE: For proper storage calculations need to consider // the size of the WAL folder which is not added when calculating // the current size of the database. MaxBytes int64 // NoLockfile disables creation and consideration of a lock file. NoLockfile bool // WALCompression configures the compression type to use on records in the WAL. WALCompression wlog.CompressionType // Maximum number of CPUs that can simultaneously processes WAL replay. // If it is <=0, then GOMAXPROCS is used. WALReplayConcurrency int // StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance. StripeSize int // The timestamp range of head blocks after which they get persisted. // It's the minimum duration of any persisted block. // Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration. // Typically it is in milliseconds. MinBlockDuration int64 // The maximum timestamp range of compacted blocks. // Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration. // Typically it is in milliseconds. MaxBlockDuration int64 // HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper. HeadChunksWriteBufferSize int // HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper. HeadChunksWriteQueueSize int // SamplesPerChunk configures the target number of samples per chunk. SamplesPerChunk int // SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. // It is always a no-op in Prometheus and mainly meant for external users who import TSDB. SeriesLifecycleCallback SeriesLifecycleCallback // BlocksToDelete is a function which returns the blocks which can be deleted. // It is always the default time and size based retention in Prometheus and // mainly meant for external users who import TSDB. BlocksToDelete BlocksToDeleteFunc // Enables the in memory exemplar storage. EnableExemplarStorage bool // Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster. EnableMemorySnapshotOnShutdown bool // MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory. // See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage. MaxExemplars int64 // Disables isolation between reads and in-flight appends. IsolationDisabled bool // EnableNativeHistograms enables the ingestion of native histograms. EnableNativeHistograms bool // OutOfOrderTimeWindow specifies how much out of order is allowed, if any. // This can change during run-time, so this value from here should only be used // while initialising. OutOfOrderTimeWindow int64 // OutOfOrderCapMax is maximum capacity for OOO chunks (in samples). // If it is <=0, the default value is assumed. OutOfOrderCapMax int64 // Compaction of overlapping blocks are allowed if EnableOverlappingCompaction is true. // This is an optional flag for overlapping blocks. // The reason why this flag exists is because there are various users of the TSDB // that do not want vertical compaction happening on ingest time. Instead, // they'd rather keep overlapping blocks and let another component do the overlapping compaction later. // For Prometheus, this will always be true. EnableOverlappingCompaction bool // EnableSharding enables query sharding support in TSDB. EnableSharding bool }
Options of the DB storage.
func DefaultOptions ¶
func DefaultOptions() *Options
DefaultOptions used for the DB. They are reasonable for setups using millisecond precision timestamps.
type Overlaps ¶
Overlaps contains overlapping blocks aggregated by overlapping range.
func OverlappingBlocks ¶
OverlappingBlocks returns all overlapping blocks from given meta files.
type RangeHead ¶
type RangeHead struct {
// contains filtered or unexported fields
}
RangeHead allows querying Head via an IndexReader, ChunkReader and tombstones.Reader but only within a restricted range. Used for queries and compactions.
func NewRangeHead ¶
NewRangeHead returns a *RangeHead. There are no restrictions on mint/maxt.
func NewRangeHeadWithIsolationDisabled ¶
NewRangeHeadWithIsolationDisabled returns a *RangeHead that does not create an isolationState.
func (*RangeHead) BlockMaxTime ¶
BlockMaxTime returns the max time of the potential block created from this head. It's different to MaxTime as we need to add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
func (*RangeHead) Chunks ¶
func (h *RangeHead) Chunks() (ChunkReader, error)
func (*RangeHead) Index ¶
func (h *RangeHead) Index() (IndexReader, error)
func (*RangeHead) MaxTime ¶
MaxTime returns the max time of actual data fetch-able from the head. This controls the chunks time range which is closed [b.MinTime, b.MaxTime].
func (*RangeHead) String ¶
String returns an human readable representation of the range head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.
func (*RangeHead) Tombstones ¶
func (h *RangeHead) Tombstones() (tombstones.Reader, error)
type SeriesLifecycleCallback ¶
type SeriesLifecycleCallback interface { // PreCreation is called before creating a series to indicate if the series can be created. // A non nil error means the series should not be created. PreCreation(labels.Labels) error // PostCreation is called after creating a series to indicate a creation of series. PostCreation(labels.Labels) // PostDeletion is called after deletion of series. PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) }
SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. It is always a no-op in Prometheus and mainly meant for external users who import TSDB. All the callbacks should be safe to be called concurrently. It is up to the user to implement soft or hard consistency by making the callbacks atomic or non-atomic. Atomic callbacks can cause degradation performance.
type Stats ¶
type Stats struct { NumSeries uint64 MinTime, MaxTime int64 IndexPostingStats *index.PostingsStats }
type TimeRange ¶
type TimeRange struct {
Min, Max int64
}
TimeRange specifies minTime and maxTime range.
type WALReplayStatus ¶
WALReplayStatus contains status information about the WAL replay.
func (*WALReplayStatus) GetWALReplayStatus ¶
func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus
GetWALReplayStatus returns the WAL replay status information.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
|
Package fileutil provides utility methods used when dealing with the filesystem in tsdb. |
Package goversion enforces the go version supported by the tsdb module.
|
Package goversion enforces the go version supported by the tsdb module. |
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.
|
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot. |