tsdb

package
v0.55.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2024 License: Apache-2.0 Imports: 50 Imported by: 173

README

TSDB

GoPkg

This directory contains the Prometheus TSDB (Time Series DataBase) library, which handles storage and querying of all Prometheus v2 data.

Documentation

External resources

A series of blog posts explaining different components of TSDB:

Documentation

Overview

Package tsdb implements a time series storage for float64 sample data.

Example
package main

import (
	"context"
	"fmt"
	"math"
	"os"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// main walks through a complete TSDB round trip: it opens a database in a
// throwaway directory, appends two samples to a single series, commits them,
// reads them back through a querier and finally releases every resource.
// Any error aborts the example via noErr.
func main() {
	// Create a random dir to work in.  Open() doesn't require a pre-existing dir, but
	// we want to make sure not to make a mess where we shouldn't.
	dir, err := os.MkdirTemp("", "tsdb-test")
	noErr(err)

	// Open a TSDB for reading and/or writing.
	db, err := Open(dir, nil, nil, DefaultOptions(), nil)
	noErr(err)

	// Open an appender for writing.
	app := db.Appender(context.Background())

	series := labels.FromStrings("foo", "bar")

	// Sample timestamps are milliseconds since the Unix epoch, hence UnixMilli
	// rather than Unix (which would place the samples in early 1970).
	// Ref is 0 for the first append since we don't know the reference for the series.
	ref, err := app.Append(0, series, time.Now().UnixMilli(), 123)
	noErr(err)

	// Another append for a second later.
	// Re-using the ref from above since it's the same series, makes append faster.
	time.Sleep(time.Second)
	_, err = app.Append(ref, series, time.Now().UnixMilli(), 124)
	noErr(err)

	// Commit to storage.
	err = app.Commit()
	noErr(err)

	// In case you want to do more appends after app.Commit(),
	// you need a new appender.
	app = db.Appender(context.Background())
	// ... adding more samples.

	// Every appender has to be finished with Commit (or Rollback), even when
	// nothing was appended; otherwise it is left open when the DB is closed.
	err = app.Commit()
	noErr(err)

	// Open a querier for reading.
	querier, err := db.Querier(math.MinInt64, math.MaxInt64)
	noErr(err)
	ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))

	for ss.Next() {
		series := ss.At()
		fmt.Println("series:", series.Labels().String())

		it := series.Iterator(nil)
		for it.Next() == chunkenc.ValFloat {
			_, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below)
			fmt.Println("sample", v)
		}

		fmt.Println("it.Err():", it.Err())
	}
	fmt.Println("ss.Err():", ss.Err())
	ws := ss.Warnings()
	if len(ws) > 0 {
		fmt.Println("warnings:", ws)
	}
	err = querier.Close()
	noErr(err)

	// Clean up any last resources when done.
	err = db.Close()
	noErr(err)
	err = os.RemoveAll(dir)
	noErr(err)
}

// noErr is the example's error handler: a nil error is a no-op, anything else
// stops the program by panicking with that error.
func noErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
Output:

series: {foo="bar"}
sample 123
sample 124
it.Err(): <nil>
ss.Err(): <nil>

Index

Examples

Constants

View Source
const (
	// DefaultOutOfOrderCapMax is the default maximum size of an in-memory out-of-order chunk.
	DefaultOutOfOrderCapMax int64 = 32
	// DefaultSamplesPerChunk provides a default target number of samples per chunk.
	DefaultSamplesPerChunk = 120
)
View Source
const (

	// CompactionHintFromOutOfOrder is a hint noting that the block
	// was created from out-of-order chunks.
	CompactionHintFromOutOfOrder = "from-out-of-order"
)
View Source
const (
	// DefaultBlockDuration in milliseconds.
	DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond)
)
View Source
const (
	// DefaultStripeSize is the default number of entries to allocate in the stripeSeries hash map.
	DefaultStripeSize = 1 << 14
)

Variables

View Source
var (
	// ErrInvalidSample is returned if an appended sample is not valid and can't
	// be ingested.
	ErrInvalidSample = errors.New("invalid sample")
	// ErrInvalidExemplar is returned if an appended exemplar is not valid and can't
	// be ingested.
	ErrInvalidExemplar = errors.New("invalid exemplar")
	// ErrAppenderClosed is returned if an appender has already been successfully
	// rolled back or committed.
	ErrAppenderClosed = errors.New("appender closed")
)
View Source
var ErrClosed = errors.New("db already closed")

ErrClosed is returned when the db is closed.

View Source
var ErrClosing = errors.New("block is closing")

ErrClosing is returned when a block is in the process of being closed.

View Source
// ErrInvalidTimes reports a time range whose max time is smaller than its min time.
var ErrInvalidTimes = errors.New("max time is lesser than min time")
View Source
var ErrNoSeriesAppended = errors.New("no series appended, aborting")

ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.

View Source
var ErrNotReady = errors.New("TSDB not ready")

ErrNotReady is returned if the underlying storage is not ready yet.

Functions

func AllSortedPostings added in v0.54.0

func AllSortedPostings(ctx context.Context, reader IndexReader) index.Postings

AllSortedPostings returns a sorted all posting iterator from the input index reader.

func BeyondSizeRetention

func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})

BeyondSizeRetention returns those blocks which are beyond the size retention set in the db options.

func BeyondTimeRetention

func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})

BeyondTimeRetention returns those blocks which are beyond the time retention set in the db options.

func CreateBlock

func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error)

CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk.

func DeleteChunkSnapshots

func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error

DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.

func ExponentialBlockRanges

func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64

ExponentialBlockRanges returns the time ranges based on the stepSize.

func LastChunkSnapshot

func LastChunkSnapshot(dir string) (string, int, int, error)

LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot. If dir does not contain any chunk snapshots, ErrNotFound is returned.

func NewBlockChunkQuerier

func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)

NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.

func NewBlockChunkSeriesSet added in v0.44.0

func NewBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet

func NewBlockQuerier

func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error)

NewBlockQuerier returns a querier against the block reader and requested min and max time range.

func NewHeadAndOOOChunkQuerier added in v0.55.0

func NewHeadAndOOOChunkQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.ChunkQuerier) storage.ChunkQuerier

func NewHeadAndOOOQuerier added in v0.55.0

func NewHeadAndOOOQuerier(inoMint, mint, maxt int64, head *Head, oooIsoState *oooIsolationState, querier storage.Querier) storage.Querier

func NewMergedStringIter

func NewMergedStringIter(a, b index.StringIter) index.StringIter

NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result.

func PostingsForMatchers

func PostingsForMatchers(ctx context.Context, ix IndexReader, ms ...*labels.Matcher) (index.Postings, error)

PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.

Types

type Block

type Block struct {
	// contains filtered or unexported fields
}

Block represents a directory of time series data covering a continuous time range.

func OpenBlock

func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error)

OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used to instantiate chunk structs.

func (*Block) Chunks

func (pb *Block) Chunks() (ChunkReader, error)

Chunks returns a new ChunkReader against the block data.

func (*Block) CleanTombstones

func (pb *Block) CleanTombstones(dest string, c Compactor) ([]ulid.ULID, bool, error)

CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). If there was a rewrite, then it returns the ULID of new blocks written, else nil. If a resultant block is empty (tombstones covered the whole block), then it returns an empty slice. It returns a boolean indicating if the parent block can be deleted safely or not.

func (*Block) Close

func (pb *Block) Close() error

Close closes the on-disk block. It blocks as long as there are readers reading from the block.

func (*Block) Delete

func (pb *Block) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error

Delete matching series between mint and maxt in the block.

func (*Block) Dir

func (pb *Block) Dir() string

Dir returns the directory of the block.

func (*Block) GetSymbolTableSize

func (pb *Block) GetSymbolTableSize() uint64

GetSymbolTableSize returns the Symbol Table Size in the index of this block.

func (*Block) Index

func (pb *Block) Index() (IndexReader, error)

Index returns a new IndexReader against the block data.

func (*Block) LabelNames

func (pb *Block) LabelNames(ctx context.Context) ([]string, error)

LabelNames returns all the unique label names present in the Block in sorted order.

func (*Block) MaxTime

func (pb *Block) MaxTime() int64

MaxTime returns the max time of the meta.

func (*Block) Meta

func (pb *Block) Meta() BlockMeta

Meta returns meta information about the block.

func (*Block) MinTime

func (pb *Block) MinTime() int64

MinTime returns the min time of the meta.

func (*Block) OverlapsClosedInterval

func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool

OverlapsClosedInterval returns true if the block overlaps [mint, maxt].

func (*Block) Size

func (pb *Block) Size() int64

Size returns the number of bytes that the block takes up.

func (*Block) Snapshot

func (pb *Block) Snapshot(dir string) error

Snapshot creates snapshot of the block into dir.

func (*Block) String

func (pb *Block) String() string

func (*Block) Tombstones

func (pb *Block) Tombstones() (tombstones.Reader, error)

Tombstones returns a new TombstoneReader against the block data.

type BlockChunkQuerierFunc added in v0.54.0

type BlockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)

type BlockDesc

type BlockDesc struct {
	ULID    ulid.ULID `json:"ulid"`    // Identifier of the described block.
	MinTime int64     `json:"minTime"` // Lower bound of the described block's time range.
	MaxTime int64     `json:"maxTime"` // Upper bound of the described block's time range.
}

BlockDesc describes a block by ULID and time range.

type BlockMeta

type BlockMeta struct {
	// Unique identifier for the block and its contents. Changes on compaction.
	ULID ulid.ULID `json:"ulid"`

	// MinTime and MaxTime specify the time range all samples
	// in the block are in.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`

	// Stats about the contents of the block.
	Stats BlockStats `json:"stats,omitempty"`

	// Information on compactions the block was created from.
	Compaction BlockMetaCompaction `json:"compaction"`

	// Version of the index format.
	Version int `json:"version"`
}

BlockMeta provides meta information about a block.

func CompactBlockMetas

func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta

CompactBlockMetas merges many block metas into one, combining its source blocks together and adjusting compaction level. Min/Max time of result block meta covers all input blocks.

type BlockMetaCompaction

type BlockMetaCompaction struct {
	// Maximum number of compaction cycles any source block has
	// gone through.
	Level int `json:"level"`
	// ULIDs of all source head blocks that went into the block.
	Sources []ulid.ULID `json:"sources,omitempty"`
	// Indicates that during compaction it resulted in a block without any samples
	// so it should be deleted on the next reloadBlocks.
	Deletable bool `json:"deletable,omitempty"`
	// Short descriptions of the direct blocks that were used to create
	// this block.
	Parents []BlockDesc `json:"parents,omitempty"`
	Failed  bool        `json:"failed,omitempty"` // NOTE(review): undocumented; presumably marks a block whose compaction failed — confirm.
	// Additional information about the compaction, for example, block created from out-of-order chunks
	// (see CompactionHintFromOutOfOrder).
	Hints []string `json:"hints,omitempty"`
}

BlockMetaCompaction holds information about compactions a block went through.

func (*BlockMetaCompaction) FromOutOfOrder added in v0.39.0

func (bm *BlockMetaCompaction) FromOutOfOrder() bool

func (*BlockMetaCompaction) SetOutOfOrder added in v0.39.0

func (bm *BlockMetaCompaction) SetOutOfOrder()

type BlockPopulator added in v0.44.0

// BlockPopulator is the extension point for writing the contents of a new
// block; DefaultBlockPopulator is the implementation shipped with this package.
type BlockPopulator interface {
	// PopulateBlock receives the source blocks together with the index and
	// chunk writers of the block being built. As implemented by
	// DefaultBlockPopulator, it fills indexw and chunkw with data gathered as
	// the union of the provided blocks and expects blocks sorted by mint.
	// NOTE(review): whether other implementations must honour the same
	// ordering requirement is not stated here — confirm.
	PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error
}

type BlockQuerierFunc added in v0.54.0

type BlockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error)

type BlockReader

type BlockReader interface {
	// Index returns an IndexReader over the block's data.
	Index() (IndexReader, error)

	// Chunks returns a ChunkReader over the block's data.
	Chunks() (ChunkReader, error)

	// Tombstones returns a tombstones.Reader over the block's deleted data.
	Tombstones() (tombstones.Reader, error)

	// Meta provides meta information about the block reader.
	Meta() BlockMeta

	// Size returns the number of bytes that the block takes up on disk.
	Size() int64
}

BlockReader provides reading access to a data block.

type BlockStats

type BlockStats struct {
	// Counters describing the contents of a block: samples, series, chunks
	// and tombstones. Each field is left out of the JSON encoding when it is
	// zero (omitempty).
	NumSamples    uint64 `json:"numSamples,omitempty"`
	NumSeries     uint64 `json:"numSeries,omitempty"`
	NumChunks     uint64 `json:"numChunks,omitempty"`
	NumTombstones uint64 `json:"numTombstones,omitempty"`
}

BlockStats contains stats about contents of a block.

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter is a block writer that allows appending and flushing series to disk.

func NewBlockWriter

func NewBlockWriter(logger log.Logger, dir string, blockSize int64) (*BlockWriter, error)

NewBlockWriter creates a new block writer.

The returned writer accumulates all the series in the Head block until `Flush` is called.

Note that the writer will not check if the target directory exists or contains anything at all. It is the caller's responsibility to ensure that the resulting blocks do not overlap etc. Writer ensures the block flush is atomic (via rename).

func (*BlockWriter) Appender

func (w *BlockWriter) Appender(ctx context.Context) storage.Appender

Appender returns a new appender on the database. Appender can't be called concurrently. However, the returned Appender can safely be used concurrently.

func (*BlockWriter) Close

func (w *BlockWriter) Close() error

func (*BlockWriter) Flush

func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error)

Flush implements the Writer interface. This is where actual block writing happens. After flush completes, no writes can be done.

type BlocksToDeleteFunc

type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}

func DefaultBlocksToDelete

func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc

DefaultBlocksToDelete returns a filter which decides time based and size based retention from the options of the db.

type ChunkReader

type ChunkReader interface {
	// ChunkOrIterable returns the series data for the given chunks.Meta.
	// Either a single chunk will be returned, or an iterable.
	// A single chunk should be returned if chunks.Meta maps to a chunk that
	// already exists and doesn't need modifications.
	// An iterable should be returned if chunks.Meta maps to a subset of the
	// samples in a stored chunk, or multiple chunks. (E.g. OOOHeadChunkReader
	// could return an iterable where multiple histogram samples have counter
	// resets. There can only be one counter reset per histogram chunk so
	// multiple chunks would be created from the iterable in this case.)
	// Only one of chunk or iterable should be returned. In some cases you may
	// always expect a chunk to be returned. You can check that iterable is nil
	// in those cases.
	ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error)

	// Close releases all underlying resources of the reader.
	Close() error
}

ChunkReader provides reading access of serialized time series data.

type ChunkReaderWithCopy added in v0.55.0

type ChunkReaderWithCopy interface {
	ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error)
}

type ChunkSnapshotStats

type ChunkSnapshotStats struct {
	TotalSeries int    // NOTE(review): presumably the number of series written to the snapshot — confirm.
	Dir         string // NOTE(review): presumably the directory of the created snapshot — confirm.
}

ChunkSnapshotStats returns stats about a created chunk snapshot.

type ChunkWriter

type ChunkWriter interface {
	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
	// must be populated.
	// After returning successfully, the Ref fields in the ChunkMetas
	// are set and can be used to retrieve the chunks from the written data.
	WriteChunks(chunks ...chunks.Meta) error

	// Close writes any required finalization and closes the resources
	// associated with the underlying writer.
	Close() error
}

ChunkWriter serializes a time block of chunked series data.

type CircularExemplarStorage

type CircularExemplarStorage struct {
	// contains filtered or unexported fields
}

func (*CircularExemplarStorage) AddExemplar

func (*CircularExemplarStorage) Appender

func (*CircularExemplarStorage) ApplyConfig

func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error

func (*CircularExemplarStorage) ExemplarQuerier

func (*CircularExemplarStorage) IterateExemplars

func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error

IterateExemplars iterates through all the exemplars from oldest to newest appended and calls the given function on all of them till the end (or) till the first function call that returns an error.

func (*CircularExemplarStorage) Querier

func (*CircularExemplarStorage) Resize

func (ce *CircularExemplarStorage) Resize(l int64) int

Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.

func (*CircularExemplarStorage) Select

func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)

Select returns exemplars for a given set of label matchers.

func (*CircularExemplarStorage) ValidateExemplar

func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error

type Compactor

type Compactor interface {
	// Plan returns a set of directories that can be compacted concurrently.
	// The directories can be overlapping.
	// Results returned when compactions are in progress are undefined.
	Plan(dir string) ([]string, error)

	// Write persists one or more Blocks into a directory.
	// No Block is written when resulting Block has 0 samples and returns an empty slice.
	// Prometheus always return one or no block. The interface allows returning more than one
	// block for downstream users to experiment with compactor.
	Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks,
	// to avoid having to reopen them.
	// Prometheus always return one or no block. The interface allows returning more than one
	// block for downstream users to experiment with compactor.
	// When one resulting Block has 0 samples
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * Block is not included in the result.
	Compact(dest string, dirs []string, open []*Block) ([]ulid.ULID, error)
}

Compactor provides compaction against an underlying storage of time series data.

type CompactorMetrics added in v0.44.0

type CompactorMetrics struct {
	Ran               prometheus.Counter
	PopulatingBlocks  prometheus.Gauge
	OverlappingBlocks prometheus.Counter
	Duration          prometheus.Histogram
	ChunkSize         prometheus.Histogram
	ChunkSamples      prometheus.Histogram
	ChunkRange        prometheus.Histogram
}

func NewCompactorMetrics added in v0.52.0

func NewCompactorMetrics(r prometheus.Registerer) *CompactorMetrics

NewCompactorMetrics initializes metrics for Compactor.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB handles reads and writes of time series falling into a hashed partition of a series database.

func Open

func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, stats *DBStats) (db *DB, err error)

Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.

func (*DB) Appender

func (db *DB) Appender(ctx context.Context) storage.Appender

Appender opens a new appender against the database.

func (*DB) ApplyConfig

func (db *DB) ApplyConfig(conf *config.Config) error

ApplyConfig applies a new config to the DB. Behaviour of 'OutOfOrderTimeWindow' is as follows: OOO enabled = oooTimeWindow > 0. OOO disabled = oooTimeWindow is 0. 1) Before: OOO disabled, Now: OOO enabled =>

  • A new WBL is created for the head block.
  • OOO compaction is enabled.
  • Overlapping queries are enabled.

2) Before: OOO enabled, Now: OOO enabled =>

  • Only the time window is updated.

3) Before: OOO enabled, Now: OOO disabled =>

  • Time Window set to 0. So no new OOO samples will be allowed.
  • OOO WBL will stay and will be eventually cleaned up.
  • OOO Compaction and overlapping queries will remain enabled until a restart or until all OOO samples are compacted.

4) Before: OOO disabled, Now: OOO disabled => no-op.

func (*DB) Blocks

func (db *DB) Blocks() []*Block

Blocks returns the database's persisted blocks.

func (*DB) ChunkQuerier

func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)

ChunkQuerier returns a new chunk querier over the data partition for the given time range.

func (*DB) CleanTombstones

func (db *DB) CleanTombstones() (err error)

CleanTombstones re-writes any blocks with tombstones.

func (*DB) Close

func (db *DB) Close() error

Close the partition.

func (*DB) Compact

func (db *DB) Compact(ctx context.Context) (returnErr error)

Compact data if possible. After successful compaction blocks are reloaded which will also delete the blocks that fall out of the retention window. Old blocks are only deleted on reloadBlocks based on the new block's parent information. See DB.reloadBlocks documentation for further information.

func (*DB) CompactHead

func (db *DB) CompactHead(head *RangeHead) error

CompactHead compacts the given RangeHead.

func (*DB) CompactOOOHead added in v0.39.0

func (db *DB) CompactOOOHead(ctx context.Context) error

CompactOOOHead compacts the OOO Head.

func (*DB) Delete

func (db *DB) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error

Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.

func (*DB) Dir

func (db *DB) Dir() string

Dir returns the directory of the database.

func (*DB) DisableCompactions

func (db *DB) DisableCompactions()

DisableCompactions disables auto compactions.

func (*DB) DisableNativeHistograms added in v0.40.0

func (db *DB) DisableNativeHistograms()

DisableNativeHistograms disables the native histogram feature.

func (*DB) EnableCompactions

func (db *DB) EnableCompactions()

EnableCompactions enables auto compactions.

func (*DB) EnableNativeHistograms added in v0.40.0

func (db *DB) EnableNativeHistograms()

EnableNativeHistograms enables the native histogram feature.

func (*DB) ExemplarQuerier

func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)

func (*DB) ForceHeadMMap added in v0.48.0

func (db *DB) ForceHeadMMap()

ForceHeadMMap is intended for use only in tests and benchmarks.

func (*DB) Head

func (db *DB) Head() *Head

Head returns the database's head.

func (*DB) Querier

func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error)

Querier returns a new querier over the data partition for the given time range.

func (*DB) SetWriteNotified added in v0.45.0

func (db *DB) SetWriteNotified(wn wlog.WriteNotified)

func (*DB) Snapshot

func (db *DB) Snapshot(dir string, withHead bool) error

Snapshot writes the current data to the directory. If withHead is set to true it will create a new block containing all data that's currently in the memory buffer/WAL.

func (*DB) StartTime

func (db *DB) StartTime() (int64, error)

StartTime implements the Storage interface.

func (*DB) String

func (db *DB) String() string

type DBReadOnly

type DBReadOnly struct {
	// contains filtered or unexported fields
}

DBReadOnly provides APIs for read-only operations on a database. The current implementation doesn't support concurrency, so all API calls should happen in the same goroutine.

func OpenDBReadOnly

func OpenDBReadOnly(dir, sandboxDirRoot string, l log.Logger) (*DBReadOnly, error)

OpenDBReadOnly opens DB in the given directory for read only operations.

func (*DBReadOnly) Block added in v0.45.0

func (db *DBReadOnly) Block(blockID string) (BlockReader, error)

Block returns a block reader by given block id.

func (*DBReadOnly) Blocks

func (db *DBReadOnly) Blocks() ([]BlockReader, error)

Blocks returns a slice of block readers for persisted blocks.

func (*DBReadOnly) ChunkQuerier

func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)

ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. Current implementation doesn't support multiple ChunkQueriers.

func (*DBReadOnly) Close

func (db *DBReadOnly) Close() error

Close all block readers and delete the sandbox dir.

func (*DBReadOnly) FlushWAL

func (db *DBReadOnly) FlushWAL(dir string) (returnErr error)

FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. Samples that are in existing blocks will not be written to the new block. Note that if the read only database is running concurrently with a writable database then writing the WAL to the database directory can race.

func (*DBReadOnly) LastBlockID added in v0.45.0

func (db *DBReadOnly) LastBlockID() (string, error)

LastBlockID returns the BlockID of latest block.

func (*DBReadOnly) Querier

func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error)

Querier loads the blocks and wal and returns a new querier over the data partition for the given time range. Current implementation doesn't support multiple Queriers.

type DBStats

type DBStats struct {
	// Head holds the statistics of the head component.
	Head *HeadStats
}

DBStats contains statistics about the DB separated by component (e.g. head). They are available before the DB has finished initializing.

func NewDBStats

func NewDBStats() *DBStats

NewDBStats returns a new DBStats object initialized using the new function from each component.

type DefaultBlockPopulator added in v0.44.0

type DefaultBlockPopulator struct{}

func (DefaultBlockPopulator) PopulateBlock added in v0.44.0

func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) (err error)

PopulateBlock fills the index and chunk writers with new data gathered as the union of the provided blocks. It returns meta information for the new block. It expects sorted blocks input by mint.

type DeletedIterator

type DeletedIterator struct {
	// Iter is an Iterator to be wrapped.
	Iter chunkenc.Iterator
	// Intervals are the deletion intervals.
	Intervals tombstones.Intervals
}

DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.

func (*DeletedIterator) At

func (it *DeletedIterator) At() (int64, float64)

func (*DeletedIterator) AtFloatHistogram added in v0.40.0

func (it *DeletedIterator) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram)

func (*DeletedIterator) AtHistogram added in v0.40.0

func (it *DeletedIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram)

func (*DeletedIterator) AtT added in v0.40.0

func (it *DeletedIterator) AtT() int64

func (*DeletedIterator) Err

func (it *DeletedIterator) Err() error

func (*DeletedIterator) Next

func (it *DeletedIterator) Next() chunkenc.ValueType

func (*DeletedIterator) Seek

type ExemplarMetrics

type ExemplarMetrics struct {
	// contains filtered or unexported fields
}

func NewExemplarMetrics

func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics

type ExemplarStorage

// ExemplarStorage combines exemplar querying (storage.ExemplarQueryable) with
// adding, validating and iterating exemplars. NewCircularExemplarStorage
// returns an implementation of it.
type ExemplarStorage interface {
	storage.ExemplarQueryable
	// AddExemplar takes a series label set and an exemplar.
	// NOTE(review): presumably stores the exemplar for that series — confirm.
	AddExemplar(labels.Labels, exemplar.Exemplar) error
	// ValidateExemplar takes the same arguments as AddExemplar.
	// NOTE(review): presumably checks whether the exemplar could be added
	// without storing it — confirm.
	ValidateExemplar(labels.Labels, exemplar.Exemplar) error
	// IterateExemplars calls f with the series labels and the exemplar. As
	// implemented by CircularExemplarStorage, iteration goes from the oldest
	// to the newest appended exemplar and stops at the first call of f that
	// returns an error.
	IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
}

func NewCircularExemplarStorage

func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error)

NewCircularExemplarStorage creates a circular in memory exemplar storage. If we assume the average case 95 bytes per exemplar we can fit 5651272 exemplars in 1GB of extra memory, accounting for the fact that this is heap allocated space. If len <= 0, then the exemplar storage is essentially a noop storage but can later be resized to store exemplars.

type Head struct {
	// contains filtered or unexported fields
}

Head handles reads and writes of time series data within a time window.

func NewHead

func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *HeadOptions, stats *HeadStats) (*Head, error)

NewHead opens the head block in dir.

func (*Head) AppendableMinValidTime

func (h *Head) AppendableMinValidTime() (int64, bool)

AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head. Returns false if Head hasn't been initialized yet and the minimum time isn't known yet.

func (*Head) Appender

func (h *Head) Appender(_ context.Context) storage.Appender

Appender returns a new Appender on the database.

func (*Head) ApplyConfig

func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL)

func (*Head) ChunkSnapshot

func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error)

ChunkSnapshot creates a snapshot of all the series and tombstones in the head. It deletes the old chunk snapshots if the chunk snapshot creation is successful.

The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written using the WAL package. N is the last WAL segment present during snapshotting and M is the offset in segment N up to which data was written.

The snapshot first contains all series (each in individual records and not sorted), followed by tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they were written to the circular buffer.

func (*Head) Chunks

func (h *Head) Chunks() (ChunkReader, error)

Chunks returns a ChunkReader against the block.

func (*Head) Close

func (h *Head) Close() error

Close flushes the WAL and closes the head. It also takes a snapshot of in-memory chunks if enabled.

func (*Head) Delete

func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error

Delete all samples in the range of [mint, maxt] for series that satisfy the given label matchers.

func (*Head) DisableNativeHistograms added in v0.40.0

func (h *Head) DisableNativeHistograms()

DisableNativeHistograms disables the native histogram feature.

func (*Head) EnableNativeHistograms added in v0.40.0

func (h *Head) EnableNativeHistograms()

EnableNativeHistograms enables the native histogram feature.

func (*Head) ExemplarQuerier

func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)

func (*Head) Index

func (h *Head) Index() (IndexReader, error)

Index returns an IndexReader against the block.

func (*Head) Init

func (h *Head) Init(minValidTime int64) error

Init loads data from the write ahead log and prepares the head for writes. It should be called before using an appender so that it limits the ingested samples to the head min valid time.

func (*Head) IsQuerierCollidingWithTruncation

func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64)

IsQuerierCollidingWithTruncation returns whether the current querier needs to be closed and whether a new querier has to be created. In the latter case, the method also returns the new mint to be used for creating the new range head and the new querier. This method helps prevent races with the truncation of in-memory data.

NOTE: The querier should already be taken before calling this.

func (*Head) MaxOOOTime added in v0.39.0

func (h *Head) MaxOOOTime() int64

MaxOOOTime returns the highest timestamp on visible data in the out of order head.

func (*Head) MaxTime

func (h *Head) MaxTime() int64

MaxTime returns the highest timestamp seen in data of the head.

func (*Head) Meta

func (h *Head) Meta() BlockMeta

Meta returns meta information about the head. The head is dynamic so will return dynamic results.

func (*Head) MinOOOTime added in v0.39.0

func (h *Head) MinOOOTime() int64

MinOOOTime returns the lowest time bound on visible data in the out of order head.

func (*Head) MinTime

func (h *Head) MinTime() int64

MinTime returns the lowest time bound on visible data in the head.

func (*Head) NumSeries

func (h *Head) NumSeries() uint64

NumSeries returns the number of active series in the head.

func (*Head) OverlapsClosedInterval

func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool

OverlapsClosedInterval returns true if the head overlaps [mint, maxt].

func (*Head) PostingsCardinalityStats

func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats

PostingsCardinalityStats returns highest cardinality stats by label and value names.

func (*Head) RebuildSymbolTable added in v0.54.0

func (h *Head) RebuildSymbolTable(logger log.Logger) *labels.SymbolTable

RebuildSymbolTable is a no-op when not using dedupelabels.

func (*Head) SetMinValidTime

func (h *Head) SetMinValidTime(minValidTime int64)

SetMinValidTime sets the minimum timestamp the head can ingest.

func (*Head) SetOutOfOrderTimeWindow added in v0.39.0

func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wlog.WL)

SetOutOfOrderTimeWindow updates the out of order related parameters. If the Head already has a WBL set, then the wbl will be ignored.

func (*Head) Size

func (h *Head) Size() int64

func (*Head) Stats

func (h *Head) Stats(statsByLabelName string, limit int) *Stats

Stats returns important current HEAD statistics. Note that it is expensive to calculate these.

func (*Head) String

func (h *Head) String() string

String returns a human-readable representation of the TSDB head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.

func (*Head) Tombstones

func (h *Head) Tombstones() (tombstones.Reader, error)

Tombstones returns a new reader over the head's tombstones.

func (*Head) Truncate

func (h *Head) Truncate(mint int64) (err error)

Truncate removes old data before mint from the head and WAL.

func (*Head) WaitForAppendersOverlapping added in v0.37.3

func (h *Head) WaitForAppendersOverlapping(maxt int64)

WaitForAppendersOverlapping waits for appends overlapping maxt to finish.

func (*Head) WaitForPendingReadersForOOOChunksAtOrBefore added in v0.49.0

func (h *Head) WaitForPendingReadersForOOOChunksAtOrBefore(chunk chunks.ChunkDiskMapperRef)

WaitForPendingReadersForOOOChunksAtOrBefore is like WaitForPendingReadersInTimeRange, except it waits for queries touching OOO chunks less than or equal to chunk to finish querying.

func (*Head) WaitForPendingReadersInTimeRange

func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64)

WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. The query timeout limits the max wait time of this function implicitly. The mint is inclusive and maxt is the truncation time hence exclusive.

type HeadAndOOOChunkQuerier added in v0.55.0

type HeadAndOOOChunkQuerier struct {
	// contains filtered or unexported fields
}

HeadAndOOOChunkQuerier queries both the head and the out-of-order head.

func (*HeadAndOOOChunkQuerier) Close added in v0.55.0

func (q *HeadAndOOOChunkQuerier) Close() error

func (*HeadAndOOOChunkQuerier) LabelNames added in v0.55.0

func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error)

func (*HeadAndOOOChunkQuerier) LabelValues added in v0.55.0

func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error)

func (*HeadAndOOOChunkQuerier) Select added in v0.55.0

func (q *HeadAndOOOChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet

type HeadAndOOOChunkReader added in v0.55.0

type HeadAndOOOChunkReader struct {
	// contains filtered or unexported fields
}

func NewHeadAndOOOChunkReader added in v0.55.0

func NewHeadAndOOOChunkReader(head *Head, mint, maxt int64, cr *headChunkReader, oooIsoState *oooIsolationState, maxMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOChunkReader

func (*HeadAndOOOChunkReader) ChunkOrIterable added in v0.55.0

func (cr *HeadAndOOOChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error)

func (*HeadAndOOOChunkReader) ChunkOrIterableWithCopy added in v0.55.0

func (cr *HeadAndOOOChunkReader) ChunkOrIterableWithCopy(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, int64, error)

ChunkOrIterableWithCopy implements ChunkReaderWithCopy. The special Copy behaviour is only implemented for the in-order head chunk.

func (*HeadAndOOOChunkReader) Close added in v0.55.0

func (cr *HeadAndOOOChunkReader) Close() error

type HeadAndOOOIndexReader added in v0.55.0

type HeadAndOOOIndexReader struct {
	// contains filtered or unexported fields
}

func NewHeadAndOOOIndexReader added in v0.55.0

func NewHeadAndOOOIndexReader(head *Head, inoMint, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *HeadAndOOOIndexReader

func (HeadAndOOOIndexReader) Close added in v0.55.0

func (h HeadAndOOOIndexReader) Close() error

func (HeadAndOOOIndexReader) LabelNames added in v0.55.0

func (h HeadAndOOOIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error)

LabelNames returns all the unique label names present in the head that are within the time range mint to maxt.

func (HeadAndOOOIndexReader) LabelNamesFor added in v0.55.0

func (h HeadAndOOOIndexReader) LabelNamesFor(ctx context.Context, series index.Postings) ([]string, error)

LabelNamesFor returns all the label names for the series referred to by the postings. The names returned are sorted.

func (HeadAndOOOIndexReader) LabelValueFor added in v0.55.0

func (h HeadAndOOOIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error)

LabelValueFor returns label value for the given label name in the series referred to by ID.

func (*HeadAndOOOIndexReader) LabelValues added in v0.55.0

func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

LabelValues needs to be overridden from the headIndexReader implementation so we can return labels within either in-order range or ooo range.

func (HeadAndOOOIndexReader) Postings added in v0.55.0

func (h HeadAndOOOIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error)

Postings returns the postings list iterator for the label pairs.

func (HeadAndOOOIndexReader) PostingsForLabelMatching added in v0.55.0

func (h HeadAndOOOIndexReader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) index.Postings

func (*HeadAndOOOIndexReader) Series added in v0.55.0

func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error

func (HeadAndOOOIndexReader) ShardedPostings added in v0.55.0

func (h HeadAndOOOIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings

ShardedPostings implements IndexReader. This function returns a failing postings list if sharding has not been enabled in the Head.

func (HeadAndOOOIndexReader) SortedLabelValues added in v0.55.0

func (h HeadAndOOOIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

SortedLabelValues returns label values present in the head for the specific label name that are within the time range mint to maxt. If matchers are specified the returned result set is reduced to label values of metrics matching the matchers.

func (HeadAndOOOIndexReader) SortedPostings added in v0.55.0

func (h HeadAndOOOIndexReader) SortedPostings(p index.Postings) index.Postings

func (HeadAndOOOIndexReader) Symbols added in v0.55.0

func (h HeadAndOOOIndexReader) Symbols() index.StringIter

type HeadAndOOOQuerier added in v0.55.0

type HeadAndOOOQuerier struct {
	// contains filtered or unexported fields
}

HeadAndOOOQuerier queries both the head and the out-of-order head.

func (*HeadAndOOOQuerier) Close added in v0.55.0

func (q *HeadAndOOOQuerier) Close() error

func (*HeadAndOOOQuerier) LabelNames added in v0.55.0

func (q *HeadAndOOOQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error)

func (*HeadAndOOOQuerier) LabelValues added in v0.55.0

func (q *HeadAndOOOQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error)

func (*HeadAndOOOQuerier) Select added in v0.55.0

func (q *HeadAndOOOQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet

type HeadOptions

type HeadOptions struct {
	// Runtime reloadable option. At the top of the struct for 32 bit OS:
	// https://pkg.go.dev/sync/atomic#pkg-note-BUG
	// NOTE(review): presumably the head-level counterpart of Options.MaxExemplars — confirm.
	MaxExemplars atomic.Int64

	// Out-of-order settings, stored atomically.
	// NOTE(review): presumably the head-level counterparts of Options.OutOfOrderTimeWindow
	// and Options.OutOfOrderCapMax — confirm.
	OutOfOrderTimeWindow atomic.Int64
	OutOfOrderCapMax     atomic.Int64

	// EnableNativeHistograms enables the ingestion of native histograms.
	EnableNativeHistograms atomic.Bool

	// EnableCreatedTimestampZeroIngestion enables the ingestion of the created timestamp as a synthetic zero sample.
	// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
	EnableCreatedTimestampZeroIngestion bool

	ChunkRange int64
	// ChunkDirRoot is the parent directory of the chunks directory.
	ChunkDirRoot         string
	ChunkPool            chunkenc.Pool
	ChunkWriteBufferSize int
	ChunkWriteQueueSize  int

	// NOTE(review): presumably the head-level counterpart of Options.SamplesPerChunk
	// (target number of samples per chunk) — confirm.
	SamplesPerChunk int

	// StripeSize sets the number of entries in the hash map, it must be a power of 2.
	// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
	// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
	StripeSize                     int
	SeriesCallback                 SeriesLifecycleCallback
	EnableExemplarStorage          bool
	EnableMemorySnapshotOnShutdown bool

	IsolationDisabled bool

	// Maximum number of CPUs that can simultaneously process WAL replay.
	// The default value is GOMAXPROCS.
	// If it is set to a negative value or zero, the default value is used.
	WALReplayConcurrency int

	// EnableSharding enables ShardedPostings() support in the Head.
	EnableSharding bool
}

HeadOptions are parameters for the Head block.

func DefaultHeadOptions

func DefaultHeadOptions() *HeadOptions

type HeadStats

type HeadStats struct {
	// WALReplayStatus holds the status information about the WAL replay.
	WALReplayStatus *WALReplayStatus
}

HeadStats are the statistics for the head component of the DB.

func NewHeadStats

func NewHeadStats() *HeadStats

NewHeadStats returns a new HeadStats object.

type IndexReader

type IndexReader interface {
	// Symbols returns an iterator over sorted string symbols that may occur in
	// series' labels and indices. It is not safe to use the returned strings
	// beyond the lifetime of the index reader.
	Symbols() index.StringIter

	// SortedLabelValues returns sorted possible label values.
	SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

	// LabelValues returns possible label values which may not be sorted.
	LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections.
	Postings(ctx context.Context, name string, values ...string) (index.Postings, error)

	// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
	// If no postings are found having at least one matching label, an empty iterator is returned.
	PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings

	// SortedPostings returns a postings list that is reordered to be sorted
	// by the label set of the underlying series.
	SortedPostings(index.Postings) index.Postings

	// ShardedPostings returns a postings list filtered by the provided shardIndex
	// out of shardCount. For a given posting, its shard MUST be computed hashing
	// the series labels mod shardCount, using a hash function which is consistent over time.
	ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings

	// Series populates the given builder and chunk metas for the series identified
	// by the reference.
	// Returns storage.ErrNotFound if the ref does not resolve to a known series.
	Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error)

	// LabelValueFor returns label value for the given label name in the series referred to by ID.
	// If the series couldn't be found or the series doesn't have the requested label a
	// storage.ErrNotFound is returned as error.
	LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error)

	// LabelNamesFor returns all the label names for the series referred to by the postings.
	// The names returned are sorted.
	LabelNamesFor(ctx context.Context, postings index.Postings) ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

IndexReader provides reading access of serialized index data.

func NewOOOCompactionHeadIndexReader added in v0.39.0

func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader

type IndexReaderPostingsFunc added in v0.54.0

type IndexReaderPostingsFunc func(ctx context.Context, reader IndexReader) index.Postings

IndexReaderPostingsFunc is a function to get a sorted posting iterator from a given index reader.

type IndexWriter

type IndexWriter interface {
	// AddSymbol registers a single symbol.
	// Symbols must be registered in sorted order.
	AddSymbol(sym string) error

	// AddSeries populates the index writer with a series and its offsets
	// of chunks that the index can reference.
	// Implementations may require series to be inserted in strictly increasing order by
	// their labels. The reference numbers are used to resolve entries in postings lists
	// that are added later.
	AddSeries(ref storage.SeriesRef, l labels.Labels, chunks ...chunks.Meta) error

	// Close writes any finalization and closes the resources associated with
	// the underlying writer.
	Close() error
}

IndexWriter serializes the index for a block of series data. The methods must be called in the order they are specified in.

type LeveledCompactor

type LeveledCompactor struct {
	// contains filtered or unexported fields
}

LeveledCompactor implements the Compactor interface.

func NewLeveledCompactorWithChunkSize

func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)

func NewLeveledCompactorWithOptions added in v0.50.0

func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error)

func (*LeveledCompactor) Compact

func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) ([]ulid.ULID, error)

Compact creates a new block in the compactor's directory from the blocks in the provided directories.

func (*LeveledCompactor) CompactWithBlockPopulator added in v0.44.0

func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) ([]ulid.ULID, error)

func (*LeveledCompactor) Plan

func (c *LeveledCompactor) Plan(dir string) ([]string, error)

Plan returns a list of compactable blocks in the provided directory.

func (*LeveledCompactor) Write

func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, base *BlockMeta) ([]ulid.ULID, error)

type LeveledCompactorOptions added in v0.50.0

// LeveledCompactorOptions holds the options accepted by NewLeveledCompactorWithOptions.
type LeveledCompactorOptions struct {
	// PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction.
	// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more.
	PE index.PostingsEncoder
	// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used.
	MaxBlockChunkSegmentSize int64
	// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
	MergeFunc storage.VerticalChunkSeriesMergeFunc
	// EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled.
	// It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction.
	EnableOverlappingCompaction bool
}

type NewCompactorFunc added in v0.53.0

// NewCompactorFunc is a function that returns a TSDB compactor.
type NewCompactorFunc func(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts *Options) (Compactor, error)

type OOOChunk added in v0.39.0

type OOOChunk struct {
	// contains filtered or unexported fields
}

OOOChunk maintains samples in time-ascending order. Inserts for timestamps already seen are dropped. Samples are stored uncompressed to allow easy sorting. Perhaps we can be more efficient later.

func NewOOOChunk added in v0.39.0

func NewOOOChunk() *OOOChunk

func (*OOOChunk) Insert added in v0.39.0

Insert inserts the sample such that order is maintained. Returns false if insert was not possible due to the same timestamp already existing.

func (*OOOChunk) NumSamples added in v0.39.0

func (o *OOOChunk) NumSamples() int

func (*OOOChunk) ToEncodedChunks added in v0.54.0

func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error)

ToEncodedChunks returns chunks with the samples in the OOOChunk.

type OOOCompactionHead added in v0.39.0

type OOOCompactionHead struct {
	// contains filtered or unexported fields
}

func NewOOOCompactionHead added in v0.39.0

func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error)

NewOOOCompactionHead does the following: 1. M-maps all the in-memory ooo chunks. 2. Computes the expected block ranges while iterating through all ooo series and stores them. 3. Stores the list of postings having ooo series. 4. Cuts a new WBL file for the OOO WBL. All the above together have a bit of CPU and memory overhead, and can have a bit of impact on the sample append latency. So call NewOOOCompactionHead only right before compaction.

func (*OOOCompactionHead) ChunkRange added in v0.39.0

func (ch *OOOCompactionHead) ChunkRange() int64

func (*OOOCompactionHead) Chunks added in v0.39.0

func (ch *OOOCompactionHead) Chunks() (ChunkReader, error)

func (*OOOCompactionHead) CloneForTimeRange added in v0.39.0

func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead

CloneForTimeRange clones the OOOCompactionHead such that the IndexReader and ChunkReader obtained from it only look at the m-map chunks within the given time range while not looking beyond the ch.lastMmapRef. Only the methods of the BlockReader interface are valid for the cloned OOOCompactionHead.

func (*OOOCompactionHead) Index added in v0.39.0

func (ch *OOOCompactionHead) Index() (IndexReader, error)

func (*OOOCompactionHead) LastMmapRef added in v0.39.0

func (ch *OOOCompactionHead) LastMmapRef() chunks.ChunkDiskMapperRef

func (*OOOCompactionHead) LastWBLFile added in v0.39.0

func (ch *OOOCompactionHead) LastWBLFile() int

func (*OOOCompactionHead) MaxTime added in v0.39.0

func (ch *OOOCompactionHead) MaxTime() int64

func (*OOOCompactionHead) Meta added in v0.39.0

func (ch *OOOCompactionHead) Meta() BlockMeta

func (*OOOCompactionHead) MinTime added in v0.39.0

func (ch *OOOCompactionHead) MinTime() int64

func (*OOOCompactionHead) Size added in v0.39.0

func (ch *OOOCompactionHead) Size() int64

func (*OOOCompactionHead) Tombstones added in v0.39.0

func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error)

type OOOCompactionHeadIndexReader added in v0.39.0

type OOOCompactionHeadIndexReader struct {
	// contains filtered or unexported fields
}

func (*OOOCompactionHeadIndexReader) Close added in v0.39.0

func (ir *OOOCompactionHeadIndexReader) Close() error

func (*OOOCompactionHeadIndexReader) LabelNames added in v0.39.0

func (*OOOCompactionHeadIndexReader) LabelNamesFor added in v0.39.0

func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, postings index.Postings) ([]string, error)

func (*OOOCompactionHeadIndexReader) LabelValueFor added in v0.39.0

func (*OOOCompactionHeadIndexReader) LabelValues added in v0.39.0

func (ir *OOOCompactionHeadIndexReader) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

func (*OOOCompactionHeadIndexReader) Postings added in v0.39.0

func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error)

func (*OOOCompactionHeadIndexReader) PostingsForLabelMatching added in v0.53.0

func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings

func (*OOOCompactionHeadIndexReader) PostingsForMatchers added in v0.39.0

func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error)

func (*OOOCompactionHeadIndexReader) Series added in v0.39.0

func (*OOOCompactionHeadIndexReader) ShardedPostings added in v0.51.0

func (ir *OOOCompactionHeadIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings

func (*OOOCompactionHeadIndexReader) SortedLabelValues added in v0.39.0

func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

func (*OOOCompactionHeadIndexReader) SortedPostings added in v0.39.0

func (*OOOCompactionHeadIndexReader) Symbols added in v0.39.0

type Options

type Options struct {
	// Segments (wal files) max size.
	// WALSegmentSize = 0, segment size is default size.
	// WALSegmentSize > 0, segment size is WALSegmentSize.
	// WALSegmentSize < 0, wal is disabled.
	WALSegmentSize int

	// MaxBlockChunkSegmentSize is the max size of block chunk segment files.
	// MaxBlockChunkSegmentSize = 0, chunk segment size is default size.
	// MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize.
	MaxBlockChunkSegmentSize int64

	// Duration of persisted data to keep.
	// Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration.
	// Typically it is in milliseconds.
	RetentionDuration int64

	// Maximum number of bytes in blocks to be retained.
	// 0 or less means disabled.
	// NOTE: For proper storage calculations need to consider
	// the size of the WAL folder which is not added when calculating
	// the current size of the database.
	MaxBytes int64

	// NoLockfile disables creation and consideration of a lock file.
	NoLockfile bool

	// WALCompression configures the compression type to use on records in the WAL.
	WALCompression wlog.CompressionType

	// Maximum number of CPUs that can simultaneously process WAL replay.
	// If it is <=0, then GOMAXPROCS is used.
	WALReplayConcurrency int

	// StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance.
	StripeSize int

	// The timestamp range of head blocks after which they get persisted.
	// It's the minimum duration of any persisted block.
	// Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration.
	// Typically it is in milliseconds.
	MinBlockDuration int64

	// The maximum timestamp range of compacted blocks.
	// Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration.
	// Typically it is in milliseconds.
	MaxBlockDuration int64

	// HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper.
	HeadChunksWriteBufferSize int

	// HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper.
	HeadChunksWriteQueueSize int

	// SamplesPerChunk configures the target number of samples per chunk.
	SamplesPerChunk int

	// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
	// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
	SeriesLifecycleCallback SeriesLifecycleCallback

	// BlocksToDelete is a function which returns the blocks which can be deleted.
	// It is always the default time and size based retention in Prometheus and
	// mainly meant for external users who import TSDB.
	BlocksToDelete BlocksToDeleteFunc

	// Enables the in memory exemplar storage.
	EnableExemplarStorage bool

	// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
	EnableMemorySnapshotOnShutdown bool

	// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
	// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and its constructor NewCircularExemplarStorage.
	MaxExemplars int64

	// Disables isolation between reads and in-flight appends.
	IsolationDisabled bool

	// EnableNativeHistograms enables the ingestion of native histograms.
	EnableNativeHistograms bool

	// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
	// This can change during run-time, so this value from here should only be used
	// while initialising.
	OutOfOrderTimeWindow int64

	// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
	// If it is <=0, the default value is assumed.
	OutOfOrderCapMax int64

	// Compaction of overlapping blocks is allowed if EnableOverlappingCompaction is true.
	// This is an optional flag for overlapping blocks.
	// The reason why this flag exists is because there are various users of the TSDB
	// that do not want vertical compaction happening on ingest time. Instead,
	// they'd rather keep overlapping blocks and let another component do the overlapping compaction later.
	EnableOverlappingCompaction bool

	// EnableSharding enables query sharding support in TSDB.
	EnableSharding bool

	// EnableDelayedCompaction, when set to true, assigns a random value to CompactionDelay during DB opening.
	// When set to false, delayed compaction is disabled, unless CompactionDelay is set directly.
	EnableDelayedCompaction bool
	// CompactionDelay delays the start time of auto compactions.
	// It can be increased by up to one minute if the DB does not commit too often.
	CompactionDelay time.Duration

	// NewCompactorFunc is a function that returns a TSDB compactor.
	NewCompactorFunc NewCompactorFunc

	// BlockQuerierFunc is a function to return storage.Querier from a BlockReader.
	BlockQuerierFunc BlockQuerierFunc

	// BlockChunkQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader.
	BlockChunkQuerierFunc BlockChunkQuerierFunc
}

Options of the DB storage.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions used for the DB. They are reasonable for setups using millisecond precision timestamps.

type Overlaps

type Overlaps map[TimeRange][]BlockMeta

Overlaps contains overlapping blocks aggregated by overlapping range.

func OverlappingBlocks

func OverlappingBlocks(bm []BlockMeta) Overlaps

OverlappingBlocks returns all overlapping blocks from given meta files.

func (Overlaps) String

func (o Overlaps) String() string

String returns human readable string form of overlapped blocks.

type RangeHead

type RangeHead struct {
	// contains filtered or unexported fields
}

RangeHead allows querying Head via an IndexReader, ChunkReader and tombstones.Reader but only within a restricted range. Used for queries and compactions.

func NewRangeHead

func NewRangeHead(head *Head, mint, maxt int64) *RangeHead

NewRangeHead returns a *RangeHead. There are no restrictions on mint/maxt.

func NewRangeHeadWithIsolationDisabled added in v0.37.3

func NewRangeHeadWithIsolationDisabled(head *Head, mint, maxt int64) *RangeHead

NewRangeHeadWithIsolationDisabled returns a *RangeHead that does not create an isolationState.

func (*RangeHead) BlockMaxTime

func (h *RangeHead) BlockMaxTime() int64

BlockMaxTime returns the max time of the potential block created from this head. It's different from MaxTime as we need to add +1 millisecond to block maxt because block intervals are half-open: [b.MinTime, b.MaxTime). A block's MaxTime is therefore always 1 greater than the timestamp of the last sample it includes.

func (*RangeHead) Chunks

func (h *RangeHead) Chunks() (ChunkReader, error)

func (*RangeHead) Index

func (h *RangeHead) Index() (IndexReader, error)

func (*RangeHead) MaxTime

func (h *RangeHead) MaxTime() int64

MaxTime returns the max time of actual data fetch-able from the head. This controls the chunks time range which is closed [b.MinTime, b.MaxTime].

func (*RangeHead) Meta

func (h *RangeHead) Meta() BlockMeta

func (*RangeHead) MinTime

func (h *RangeHead) MinTime() int64

func (*RangeHead) NumSeries

func (h *RangeHead) NumSeries() uint64

func (*RangeHead) Size

func (h *RangeHead) Size() int64

func (*RangeHead) String

func (h *RangeHead) String() string

String returns a human-readable representation of the range head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.

func (*RangeHead) Tombstones

func (h *RangeHead) Tombstones() (tombstones.Reader, error)

type SeriesLifecycleCallback

type SeriesLifecycleCallback interface {
	// PreCreation is called before creating a series to indicate if the series can be created.
	// A non-nil error means the series should not be created.
	PreCreation(labels.Labels) error
	// PostCreation is called after creating a series to indicate that the series was created.
	PostCreation(labels.Labels)
	// PostDeletion is called after deletion of series.
	PostDeletion(map[chunks.HeadSeriesRef]labels.Labels)
}

SeriesLifecycleCallback specifies a list of callbacks that will be called during the lifecycle of a series. It is always a no-op in Prometheus and is mainly meant for external users who import TSDB. All the callbacks should be safe to be called concurrently. It is up to the user to implement soft or hard consistency by making the callbacks atomic or non-atomic. Atomic callbacks can cause performance degradation.

type Stats

type Stats struct {
	NumSeries         uint64
	MinTime, MaxTime  int64
	IndexPostingStats *index.PostingsStats
}

type TimeRange

type TimeRange struct {
	Min, Max int64
}

TimeRange specifies a time range by its minimum (Min) and maximum (Max) time.

type WALReplayStatus

type WALReplayStatus struct {
	sync.RWMutex
	Min     int
	Max     int
	Current int
}

WALReplayStatus contains status information about the WAL replay.

func (*WALReplayStatus) GetWALReplayStatus

func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus

GetWALReplayStatus returns the WAL replay status information.

Directories

Path Synopsis
fileutil — Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
goversion — Package goversion enforces the go version supported by the tsdb module.
record — Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL