tsdb

package
v0.49.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2024 License: Apache-2.0 Imports: 51 Imported by: 0

README

TSDB

GoPkg

This directory contains the Prometheus TSDB (Time Series DataBase) library, which handles storage and querying of all Prometheus v2 data.

Documentation

External resources

A series of blog posts explaining different components of TSDB:

Documentation

Overview

Package tsdb implements a time series storage for float64 sample data.

Example
package main

import (
	"context"
	"fmt"
	"math"
	"os"
	"time"

	"github.com/go-follow/prometheus/model/labels"
	"github.com/go-follow/prometheus/tsdb/chunkenc"
)

// main walks through the basic TSDB lifecycle: open a database in a scratch
// directory, append two samples for one series, commit them, read them back
// through a querier, and finally close and remove everything again.
func main() {
	// Work inside a throwaway directory. Open() would create the directory on
	// its own, but a temp dir guarantees we never touch anything that matters.
	tmpDir, err := os.MkdirTemp("", "tsdb-test")
	noErr(err)

	// Open the TSDB; the same handle serves both reads and writes.
	db, err := Open(tmpDir, nil, nil, DefaultOptions(), nil)
	noErr(err)

	ctx := context.Background()

	// Writes go through an appender.
	appender := db.Appender(ctx)

	lset := labels.FromStrings("foo", "bar")

	// The series reference is not known yet, so the first append passes 0.
	seriesRef, err := appender.Append(0, lset, time.Now().Unix(), 123)
	noErr(err)

	// Append again one second later. Passing the reference obtained above
	// (same series) makes the append faster.
	time.Sleep(time.Second)
	_, err = appender.Append(seriesRef, lset, time.Now().Unix(), 124)
	noErr(err)

	// Persist the appended samples.
	noErr(appender.Commit())

	// An appender cannot be reused after Commit(); further appends need a
	// fresh one.
	appender = db.Appender(ctx)
	// ... adding more samples.

	// Reads go through a querier; here it spans the whole time range.
	q, err := db.Querier(math.MinInt64, math.MaxInt64)
	noErr(err)
	matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")
	set := q.Select(ctx, false, nil, matcher)

	for set.Next() {
		s := set.At()
		fmt.Println("series:", s.Labels().String())

		samples := s.Iterator(nil)
		for samples.Next() == chunkenc.ValFloat {
			// The timestamp is dropped on purpose so that the printed output
			// is predictable and can be tested against (below).
			_, value := samples.At()
			fmt.Println("sample", value)
		}

		fmt.Println("it.Err():", samples.Err())
	}
	fmt.Println("ss.Err():", set.Err())
	if warnings := set.Warnings(); len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	noErr(q.Close())

	// Release the remaining resources and remove the scratch directory.
	noErr(db.Close())
	noErr(os.RemoveAll(tmpDir))
}

// noErr aborts the example by panicking with err when err is non-nil; a nil
// error is a no-op.
func noErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
Output:

series: {foo="bar"}
sample 123
sample 124
it.Err(): <nil>
ss.Err(): <nil>

Index

Examples

Constants

View Source
const (
	// DefaultOutOfOrderCapMax is the maximum size an in-memory out-of-order
	// chunk may reach by default.
	DefaultOutOfOrderCapMax = int64(32)
	// DefaultSamplesPerChunk is the default number of samples a chunk is
	// targeted to hold.
	DefaultSamplesPerChunk = 120
)
View Source
const (
	// WALMagic is the 4 byte number found at the start of every WAL segment file.
	WALMagic uint32 = 0x43AF00EF

	// WALFormatDefault is the version flag identifying the default outer
	// segment file format.
	WALFormatDefault byte = 1
)
View Source
// CompactionHintFromOutOfOrder is the hint recorded for a block that was
// created from out-of-order chunks.
const CompactionHintFromOutOfOrder = "from-out-of-order"
View Source
// DefaultBlockDuration is the default duration of a block, expressed in
// milliseconds (two hours).
const DefaultBlockDuration = int64(2 * time.Hour / time.Millisecond)
View Source
// DefaultStripeSize is how many entries the stripeSeries hash map allocates
// by default.
const DefaultStripeSize = 1 << 14

Variables

View Source
// ErrInvalidSample reports that an appended sample is invalid and therefore
// cannot be ingested.
var ErrInvalidSample = errors.New("invalid sample")

// ErrInvalidExemplar reports that an appended exemplar is invalid and
// therefore cannot be ingested.
var ErrInvalidExemplar = errors.New("invalid exemplar")

// ErrAppenderClosed reports that the appender has already been successfully
// rolled back or committed.
var ErrAppenderClosed = errors.New("appender closed")
View Source
var (
	// ErrClosed is the error returned when the db is closed.
	ErrClosed = errors.New("db already closed")
)

ErrClosed is returned when the db is closed.

View Source
var (
	// ErrClosing is the error returned while a block is in the process of
	// being closed.
	ErrClosing = errors.New("block is closing")
)

ErrClosing is returned when a block is in the process of being closed.

View Source
// ErrInvalidTimes is the sentinel error for a time range whose max time is
// smaller than its min time. It is built with errors.New, like the other
// sentinel errors of this package, because the message has no formatting verbs.
var ErrInvalidTimes = errors.New("max time is lesser than min time")
View Source
var (
	// ErrNoSeriesAppended is the error returned when the series count is zero
	// while flushing blocks.
	ErrNoSeriesAppended = errors.New("no series appended, aborting")
)

ErrNoSeriesAppended is returned if the series count is zero while flushing blocks.

View Source
var (
	// ErrNotReady is the error returned when the underlying storage is not
	// ready yet.
	ErrNotReady = errors.New("TSDB not ready")
)

ErrNotReady is returned if the underlying storage is not ready yet.

Functions

func BeyondSizeRetention

func BeyondSizeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})

BeyondSizeRetention returns those blocks which are beyond the size retention set in the db options.

func BeyondTimeRetention

func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struct{})

BeyondTimeRetention returns those blocks which are beyond the time retention set in the db options.

func CreateBlock

func CreateBlock(series []storage.Series, dir string, chunkRange int64, logger log.Logger) (string, error)

CreateBlock creates a chunkrange block from the samples passed to it, and writes it to disk.

func DeleteChunkSnapshots

func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error

DeleteChunkSnapshots deletes all chunk snapshots in a directory below a given index.

func ExponentialBlockRanges

func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64

ExponentialBlockRanges returns the time ranges based on the stepSize.

func LastChunkSnapshot

func LastChunkSnapshot(dir string) (string, int, int, error)

LastChunkSnapshot returns the directory name, index and offset of the most recent chunk snapshot. If dir does not contain any chunk snapshots, ErrNotFound is returned.

func MigrateWAL

func MigrateWAL(logger log.Logger, dir string) (err error)

MigrateWAL rewrites the deprecated write ahead log into the new format.

func NewBlockChunkQuerier

func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)

NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.

func NewBlockChunkSeriesSet

func NewBlockChunkSeriesSet(id ulid.ULID, i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64, disableTrimming bool) storage.ChunkSeriesSet

func NewBlockQuerier

func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error)

NewBlockQuerier returns a querier against the block reader and requested min and max time range.

func NewMergedStringIter

func NewMergedStringIter(a, b index.StringIter) index.StringIter

NewMergedStringIter returns string iterator that allows to merge symbols on demand and stream result.

func PostingsForMatchers

func PostingsForMatchers(ctx context.Context, ix IndexReader, ms ...*labels.Matcher) (index.Postings, error)

PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.

Types

type Block

type Block struct {
	// contains filtered or unexported fields
}

Block represents a directory of time series data covering a continuous time range.

func OpenBlock

func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error)

OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used to instantiate chunk structs.

func (*Block) Chunks

func (pb *Block) Chunks() (ChunkReader, error)

Chunks returns a new ChunkReader against the block data.

func (*Block) CleanTombstones

func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, bool, error)

CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones). If there was a rewrite, then it returns the ULID of the new block written, else nil. If the resultant block is empty (tombstones covered the whole block), then it deletes the new block and returns a nil ULID. It also returns a boolean indicating whether the parent block can be deleted safely or not.

func (*Block) Close

func (pb *Block) Close() error

Close closes the on-disk block. It blocks as long as there are readers reading from the block.

func (*Block) Delete

func (pb *Block) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error

Delete matching series between mint and maxt in the block.

func (*Block) Dir

func (pb *Block) Dir() string

Dir returns the directory of the block.

func (*Block) GetSymbolTableSize

func (pb *Block) GetSymbolTableSize() uint64

GetSymbolTableSize returns the Symbol Table Size in the index of this block.

func (*Block) Index

func (pb *Block) Index() (IndexReader, error)

Index returns a new IndexReader against the block data.

func (*Block) LabelNames

func (pb *Block) LabelNames(ctx context.Context) ([]string, error)

LabelNames returns all the unique label names present in the Block in sorted order.

func (*Block) MaxTime

func (pb *Block) MaxTime() int64

MaxTime returns the max time of the meta.

func (*Block) Meta

func (pb *Block) Meta() BlockMeta

Meta returns meta information about the block.

func (*Block) MinTime

func (pb *Block) MinTime() int64

MinTime returns the min time of the meta.

func (*Block) OverlapsClosedInterval

func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool

OverlapsClosedInterval returns true if the block overlaps [mint, maxt].

func (*Block) Size

func (pb *Block) Size() int64

Size returns the number of bytes that the block takes up.

func (*Block) Snapshot

func (pb *Block) Snapshot(dir string) error

Snapshot creates snapshot of the block into dir.

func (*Block) String

func (pb *Block) String() string

func (*Block) Tombstones

func (pb *Block) Tombstones() (tombstones.Reader, error)

Tombstones returns a new TombstoneReader against the block data.

type BlockDesc

// BlockDesc describes a block by ULID and time range.
type BlockDesc struct {
	// ULID identifies the described block.
	ULID    ulid.ULID `json:"ulid"`
	// MinTime and MaxTime bound the block's time range.
	MinTime int64     `json:"minTime"`
	MaxTime int64     `json:"maxTime"`
}

BlockDesc describes a block by ULID and time range.

type BlockMeta

// BlockMeta provides meta information about a block. All fields carry JSON
// tags, so the struct can be marshalled to and from JSON.
type BlockMeta struct {
	// Unique identifier for the block and its contents. Changes on compaction.
	ULID ulid.ULID `json:"ulid"`

	// MinTime and MaxTime specify the time range all samples
	// in the block are in.
	MinTime int64 `json:"minTime"`
	MaxTime int64 `json:"maxTime"`

	// Stats about the contents of the block.
	Stats BlockStats `json:"stats,omitempty"`

	// Information on compactions the block was created from.
	Compaction BlockMetaCompaction `json:"compaction"`

	// Version of the index format.
	Version int `json:"version"`
}

BlockMeta provides meta information about a block.

func CompactBlockMetas

func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta

CompactBlockMetas merges many block metas into one, combining their source blocks together and adjusting the compaction level. The min/max time of the resulting block meta covers all input blocks.

type BlockMetaCompaction

// BlockMetaCompaction holds information about compactions a block went through.
type BlockMetaCompaction struct {
	// Maximum number of compaction cycles any source block has
	// gone through.
	Level int `json:"level"`
	// ULIDs of all source head blocks that went into the block.
	Sources []ulid.ULID `json:"sources,omitempty"`
	// Indicates that during compaction it resulted in a block without any samples
	// so it should be deleted on the next reloadBlocks.
	Deletable bool `json:"deletable,omitempty"`
	// Short descriptions of the direct blocks that were used to create
	// this block.
	Parents []BlockDesc `json:"parents,omitempty"`
	// NOTE(review): presumably set when a compaction involving this block
	// failed; the declaration itself does not say, so confirm against the compactor.
	Failed  bool        `json:"failed,omitempty"`
	// Additional information about the compaction, for example, block created from out-of-order chunks.
	Hints []string `json:"hints,omitempty"`
}

BlockMetaCompaction holds information about compactions a block went through.

func (*BlockMetaCompaction) FromOutOfOrder

func (bm *BlockMetaCompaction) FromOutOfOrder() bool

func (*BlockMetaCompaction) SetOutOfOrder

func (bm *BlockMetaCompaction) SetOutOfOrder()

type BlockPopulator

// BlockPopulator is implemented by types that can populate a block. Per the
// PopulateBlock signature, an implementation is handed the source block
// readers and the target block's meta together with the index and chunk
// writers, plus metrics, a logger, a chunk pool and a vertical merge function.
// DefaultBlockPopulator in this package has a method with this signature.
type BlockPopulator interface {
	PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
}

type BlockReader

// BlockReader provides reading access to a data block: its index, chunks and
// tombstones, plus its meta information and on-disk size.
type BlockReader interface {
	// Index returns an IndexReader over the block's data.
	Index() (IndexReader, error)

	// Chunks returns a ChunkReader over the block's data.
	Chunks() (ChunkReader, error)

	// Tombstones returns a tombstones.Reader over the block's deleted data.
	Tombstones() (tombstones.Reader, error)

	// Meta provides meta information about the block reader.
	Meta() BlockMeta

	// Size returns the number of bytes that the block takes up on disk.
	Size() int64
}

BlockReader provides reading access to a data block.

type BlockStats

// BlockStats contains stats about contents of a block. Every counter is tagged
// omitempty, so zero values are left out of the JSON encoding.
type BlockStats struct {
	NumSamples    uint64 `json:"numSamples,omitempty"`
	NumSeries     uint64 `json:"numSeries,omitempty"`
	NumChunks     uint64 `json:"numChunks,omitempty"`
	NumTombstones uint64 `json:"numTombstones,omitempty"`
}

BlockStats contains stats about contents of a block.

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter is a block writer that allows appending and flushing series to disk.

func NewBlockWriter

func NewBlockWriter(logger log.Logger, dir string, blockSize int64) (*BlockWriter, error)

NewBlockWriter creates a new block writer.

The returned writer accumulates all the series in the Head block until `Flush` is called.

Note that the writer will not check if the target directory exists or contains anything at all. It is the caller's responsibility to ensure that the resulting blocks do not overlap etc. Writer ensures the block flush is atomic (via rename).

func (*BlockWriter) Appender

func (w *BlockWriter) Appender(ctx context.Context) storage.Appender

Appender returns a new appender on the database. Appender can't be called concurrently. However, the returned Appender can safely be used concurrently.

func (*BlockWriter) Close

func (w *BlockWriter) Close() error

func (*BlockWriter) Flush

func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error)

Flush implements the Writer interface. This is where actual block writing happens. After flush completes, no writes can be done.

type BlocksToDeleteFunc

// BlocksToDeleteFunc is a function type that receives a list of blocks and
// returns a set of block ULIDs (the map keys).
// NOTE(review): going by the name, the returned set identifies the blocks to
// delete — confirm against the call site.
type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}

func DefaultBlocksToDelete

func DefaultBlocksToDelete(db *DB) BlocksToDeleteFunc

DefaultBlocksToDelete returns a filter which decides time based and size based retention from the options of the db.

type ChunkReader

// ChunkReader provides reading access of serialized time series data.
type ChunkReader interface {
	// ChunkOrIterable returns the series data for the given chunks.Meta.
	// Either a single chunk will be returned, or an iterable.
	// A single chunk should be returned if chunks.Meta maps to a chunk that
	// already exists and doesn't need modifications.
	// An iterable should be returned if chunks.Meta maps to a subset of the
	// samples in a stored chunk, or multiple chunks. (E.g. OOOHeadChunkReader
	// could return an iterable where multiple histogram samples have counter
	// resets. There can only be one counter reset per histogram chunk so
	// multiple chunks would be created from the iterable in this case.)
	// Only one of chunk or iterable should be returned. In some cases you may
	// always expect a chunk to be returned. You can check that iterable is nil
	// in those cases.
	ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error)

	// Close releases all underlying resources of the reader.
	Close() error
}

ChunkReader provides reading access of serialized time series data.

type ChunkSnapshotStats

// ChunkSnapshotStats holds stats about a created chunk snapshot; it is the
// value type returned by Head.ChunkSnapshot.
type ChunkSnapshotStats struct {
	// NOTE(review): presumably the number of series written to the snapshot — confirm.
	TotalSeries int
	// NOTE(review): presumably the directory the snapshot was written to — confirm.
	Dir         string
}

ChunkSnapshotStats returns stats about a created chunk snapshot.

type ChunkWriter

// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
	// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
	// must be populated.
	// After returning successfully, the Ref fields in the ChunkMetas
	// are set and can be used to retrieve the chunks from the written data.
	WriteChunks(chunks ...chunks.Meta) error

	// Close writes any required finalization and closes the resources
	// associated with the underlying writer.
	Close() error
}

ChunkWriter serializes a time block of chunked series data.

type CircularExemplarStorage

type CircularExemplarStorage struct {
	// contains filtered or unexported fields
}

func (*CircularExemplarStorage) AddExemplar

func (*CircularExemplarStorage) Appender

func (*CircularExemplarStorage) ApplyConfig

func (ce *CircularExemplarStorage) ApplyConfig(cfg *config.Config) error

func (*CircularExemplarStorage) ExemplarQuerier

func (*CircularExemplarStorage) IterateExemplars

func (ce *CircularExemplarStorage) IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error

IterateExemplars iterates through all the exemplars from oldest to newest appended and calls the given function on all of them till the end (or) till the first function call that returns an error.

func (*CircularExemplarStorage) Querier

func (*CircularExemplarStorage) Resize

func (ce *CircularExemplarStorage) Resize(l int64) int

Resize changes the size of exemplar buffer by allocating a new buffer and migrating data to it. Exemplars are kept when possible. Shrinking will discard oldest data (in order of ingest) as needed.

func (*CircularExemplarStorage) Select

func (ce *CircularExemplarStorage) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error)

Select returns exemplars for a given set of label matchers.

func (*CircularExemplarStorage) ValidateExemplar

func (ce *CircularExemplarStorage) ValidateExemplar(l labels.Labels, e exemplar.Exemplar) error

type Compactor

// Compactor provides compaction against an underlying storage of time series
// data. It covers planning (Plan), persisting a single block (Write) and
// compacting a set of block directories (Compact).
type Compactor interface {
	// Plan returns a set of directories that can be compacted concurrently.
	// The directories can be overlapping.
	// Results returned when compactions are in progress are undefined.
	Plan(dir string) ([]string, error)

	// Write persists a Block into a directory.
	// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
	Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

	// Compact runs compaction against the provided directories. Must
	// only be called concurrently with results of Plan().
	// Can optionally pass a list of already open blocks,
	// to avoid having to reopen them.
	// When resulting Block has 0 samples
	//  * No block is written.
	//  * The source dirs are marked Deletable.
	//  * Returns empty ulid.ULID{}.
	Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}

Compactor provides compaction against an underlying storage of time series data.

type CompactorMetrics

// CompactorMetrics groups the Prometheus collectors (counters, a gauge and
// histograms) used for compaction; a pointer to it is passed to
// BlockPopulator.PopulateBlock.
// NOTE(review): what each collector measures is not stated in the declaration;
// the field names are the only hint — confirm against where they are registered.
type CompactorMetrics struct {
	Ran               prometheus.Counter
	PopulatingBlocks  prometheus.Gauge
	OverlappingBlocks prometheus.Counter
	Duration          prometheus.Histogram
	ChunkSize         prometheus.Histogram
	ChunkSamples      prometheus.Histogram
	ChunkRange        prometheus.Histogram
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB handles reads and writes of time series falling into a hashed partition of a seriedb.

func Open

func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, stats *DBStats) (db *DB, err error)

Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used.

func (*DB) Appender

func (db *DB) Appender(ctx context.Context) storage.Appender

Appender opens a new appender against the database.

func (*DB) ApplyConfig

func (db *DB) ApplyConfig(conf *config.Config) error

ApplyConfig applies a new config to the DB. Behaviour of 'OutOfOrderTimeWindow' is as follows: OOO enabled = oooTimeWindow > 0. OOO disabled = oooTimeWindow is 0. 1) Before: OOO disabled, Now: OOO enabled =>

  • A new WBL is created for the head block.
  • OOO compaction is enabled.
  • Overlapping queries are enabled.

2) Before: OOO enabled, Now: OOO enabled =>

  • Only the time window is updated.

3) Before: OOO enabled, Now: OOO disabled =>

  • Time Window set to 0. So no new OOO samples will be allowed.
  • OOO WBL will stay and will be eventually cleaned up.
  • OOO Compaction and overlapping queries will remain enabled until a restart or until all OOO samples are compacted.

4) Before: OOO disabled, Now: OOO disabled => no-op.

func (*DB) Blocks

func (db *DB) Blocks() []*Block

Blocks returns the database's persisted blocks.

func (*DB) ChunkQuerier

func (db *DB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)

ChunkQuerier returns a new chunk querier over the data partition for the given time range.

func (*DB) CleanTombstones

func (db *DB) CleanTombstones() (err error)

CleanTombstones re-writes any blocks with tombstones.

func (*DB) Close

func (db *DB) Close() error

Close the partition.

func (*DB) Compact

func (db *DB) Compact(ctx context.Context) (returnErr error)

Compact data if possible. After successful compaction blocks are reloaded which will also delete the blocks that fall out of the retention window. Old blocks are only deleted on reloadBlocks based on the new block's parent information. See DB.reloadBlocks documentation for further information.

func (*DB) CompactHead

func (db *DB) CompactHead(head *RangeHead) error

CompactHead compacts the given RangeHead.

func (*DB) CompactOOOHead

func (db *DB) CompactOOOHead(ctx context.Context) error

CompactOOOHead compacts the OOO Head.

func (*DB) Delete

func (db *DB) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error

Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.

func (*DB) Dir

func (db *DB) Dir() string

Dir returns the directory of the database.

func (*DB) DisableCompactions

func (db *DB) DisableCompactions()

DisableCompactions disables auto compactions.

func (*DB) DisableNativeHistograms

func (db *DB) DisableNativeHistograms()

DisableNativeHistograms disables the native histogram feature.

func (*DB) EnableCompactions

func (db *DB) EnableCompactions()

EnableCompactions enables auto compactions.

func (*DB) EnableNativeHistograms

func (db *DB) EnableNativeHistograms()

EnableNativeHistograms enables the native histogram feature.

func (*DB) ExemplarQuerier

func (db *DB) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)

func (*DB) ForceHeadMMap

func (db *DB) ForceHeadMMap()

ForceHeadMMap is intended for use only in tests and benchmarks.

func (*DB) Head

func (db *DB) Head() *Head

Head returns the database's head.

func (*DB) Querier

func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error)

Querier returns a new querier over the data partition for the given time range.

func (*DB) SetWriteNotified

func (db *DB) SetWriteNotified(wn wlog.WriteNotified)

func (*DB) Snapshot

func (db *DB) Snapshot(dir string, withHead bool) error

Snapshot writes the current data to the directory. If withHead is set to true it will create a new block containing all data that's currently in the memory buffer/WAL.

func (*DB) StartTime

func (db *DB) StartTime() (int64, error)

StartTime implements the Storage interface.

func (*DB) String

func (db *DB) String() string

type DBReadOnly

type DBReadOnly struct {
	// contains filtered or unexported fields
}

DBReadOnly provides APIs for read-only operations on a database. The current implementation doesn't support concurrency, so all API calls should happen in the same goroutine.

func OpenDBReadOnly

func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error)

OpenDBReadOnly opens DB in the given directory for read only operations.

func (*DBReadOnly) Block

func (db *DBReadOnly) Block(blockID string) (BlockReader, error)

Block returns a block reader by given block id.

func (*DBReadOnly) Blocks

func (db *DBReadOnly) Blocks() ([]BlockReader, error)

Blocks returns a slice of block readers for persisted blocks.

func (*DBReadOnly) ChunkQuerier

func (db *DBReadOnly) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)

ChunkQuerier loads blocks and the wal and returns a new chunk querier over the data partition for the given time range. Current implementation doesn't support multiple ChunkQueriers.

func (*DBReadOnly) Close

func (db *DBReadOnly) Close() error

Close all block readers.

func (*DBReadOnly) FlushWAL

func (db *DBReadOnly) FlushWAL(dir string) (returnErr error)

FlushWAL creates a new block containing all data that's currently in the memory buffer/WAL. Samples that are in existing blocks will not be written to the new block. Note that if the read only database is running concurrently with a writable database then writing the WAL to the database directory can race.

func (*DBReadOnly) LastBlockID

func (db *DBReadOnly) LastBlockID() (string, error)

LastBlockID returns the BlockID of latest block.

func (*DBReadOnly) Querier

func (db *DBReadOnly) Querier(mint, maxt int64) (storage.Querier, error)

Querier loads the blocks and wal and returns a new querier over the data partition for the given time range. Current implementation doesn't support multiple Queriers.

type DBStats

// DBStats contains statistics about the DB separated by component (e.g. head).
// They are available before the DB has finished initializing.
type DBStats struct {
	// Head holds the statistics of the head component.
	Head *HeadStats
}

DBStats contains statistics about the DB separated by component (e.g. head). They are available before the DB has finished initializing.

func NewDBStats

func NewDBStats() *DBStats

NewDBStats returns a new DBStats object initialized using the new function from each component.

type DefaultBlockPopulator

// DefaultBlockPopulator is a field-less type whose PopulateBlock method matches
// the BlockPopulator interface; that method fills the index and chunk writers
// with the union of the provided blocks.
type DefaultBlockPopulator struct{}

func (DefaultBlockPopulator) PopulateBlock

func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error)

PopulateBlock fills the index and chunk writers with new data gathered as the union of the provided blocks. It returns meta information for the new block. It expects sorted blocks input by mint.

type DeletedIterator

// DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are
// not returned.
type DeletedIterator struct {
	// Iter is an Iterator to be wrapped.
	Iter chunkenc.Iterator
	// Intervals are the deletion intervals.
	Intervals tombstones.Intervals
}

DeletedIterator wraps chunk Iterator and makes sure any deleted metrics are not returned.

func (*DeletedIterator) At

func (it *DeletedIterator) At() (int64, float64)

func (*DeletedIterator) AtFloatHistogram

func (it *DeletedIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram)

func (*DeletedIterator) AtHistogram

func (it *DeletedIterator) AtHistogram() (int64, *histogram.Histogram)

func (*DeletedIterator) AtT

func (it *DeletedIterator) AtT() int64

func (*DeletedIterator) Err

func (it *DeletedIterator) Err() error

func (*DeletedIterator) Next

func (it *DeletedIterator) Next() chunkenc.ValueType

func (*DeletedIterator) Seek

type ExemplarMetrics

type ExemplarMetrics struct {
	// contains filtered or unexported fields
}

func NewExemplarMetrics

func NewExemplarMetrics(reg prometheus.Registerer) *ExemplarMetrics

type ExemplarStorage

// ExemplarStorage combines storage.ExemplarQueryable with methods for adding,
// validating and iterating over exemplars. NewCircularExemplarStorage returns
// a value of this interface type.
type ExemplarStorage interface {
	storage.ExemplarQueryable
	// AddExemplar takes the labels of a series and an exemplar for it.
	AddExemplar(labels.Labels, exemplar.Exemplar) error
	// ValidateExemplar takes the same arguments as AddExemplar and reports
	// its verdict through the returned error.
	ValidateExemplar(labels.Labels, exemplar.Exemplar) error
	// IterateExemplars calls f with series labels and an exemplar; both f and
	// IterateExemplars itself can return an error.
	IterateExemplars(f func(seriesLabels labels.Labels, e exemplar.Exemplar) error) error
}

func NewCircularExemplarStorage

func NewCircularExemplarStorage(length int64, m *ExemplarMetrics) (ExemplarStorage, error)

NewCircularExemplarStorage creates a circular in-memory exemplar storage. If we assume the average case of 95 bytes per exemplar, we can fit 5651272 exemplars in 1GB of extra memory, accounting for the fact that this is heap-allocated space. If len <= 0, then the exemplar storage is essentially a noop storage but can later be resized to store exemplars.

type Head struct {
	// contains filtered or unexported fields
}

Head handles reads and writes of time series data within a time window.

func NewHead

func NewHead(r prometheus.Registerer, l log.Logger, wal, wbl *wlog.WL, opts *HeadOptions, stats *HeadStats) (*Head, error)

NewHead opens the head block in dir.

func (*Head) AppendableMinValidTime

func (h *Head) AppendableMinValidTime() (int64, bool)

AppendableMinValidTime returns the minimum valid time for samples to be appended to the Head. Returns false if Head hasn't been initialized yet and the minimum time isn't known yet.

func (*Head) Appender

func (h *Head) Appender(_ context.Context) storage.Appender

Appender returns a new Appender on the database.

func (*Head) ApplyConfig

func (h *Head) ApplyConfig(cfg *config.Config, wbl *wlog.WL)

func (*Head) ChunkSnapshot

func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error)

ChunkSnapshot creates a snapshot of all the series and tombstones in the head. It deletes the old chunk snapshots if the chunk snapshot creation is successful.

The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written using the WAL package. N is the last WAL segment present during snapshotting and M is the offset in segment N up to which data was written.

The snapshot first contains all series (each in individual records and not sorted), followed by tombstones (a single record), and finally exemplars (>= 1 record). Exemplars are in the order they were written to the circular buffer.

func (*Head) Chunks

func (h *Head) Chunks() (ChunkReader, error)

Chunks returns a ChunkReader against the block.

func (*Head) Close

func (h *Head) Close() error

Close flushes the WAL and closes the head. It also takes a snapshot of in-memory chunks if enabled.

func (*Head) Delete

func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error

Delete all samples in the range of [mint, maxt] for series that satisfy the given label matchers.

func (*Head) DisableNativeHistograms

func (h *Head) DisableNativeHistograms()

DisableNativeHistograms disables the native histogram feature.

func (*Head) EnableNativeHistograms

func (h *Head) EnableNativeHistograms()

EnableNativeHistograms enables the native histogram feature.

func (*Head) ExemplarQuerier

func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error)

func (*Head) Index

func (h *Head) Index() (IndexReader, error)

Index returns an IndexReader against the block.

func (*Head) Init

func (h *Head) Init(minValidTime int64) error

Init loads data from the write ahead log and prepares the head for writes. It should be called before using an appender so that it limits the ingested samples to the head min valid time.

func (*Head) IsQuerierCollidingWithTruncation

func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64) (shouldClose, getNew bool, newMint int64)

IsQuerierCollidingWithTruncation returns whether the current querier needs to be closed and whether a new querier has to be created. In the latter case, the method also returns the new mint to be used for creating the new range head and the new querier. This method helps prevent races with the truncation of in-memory data.

NOTE: The querier should already be taken before calling this.

func (*Head) MaxOOOTime

func (h *Head) MaxOOOTime() int64

MaxOOOTime returns the highest timestamp on visible data in the out of order head.

func (*Head) MaxTime

func (h *Head) MaxTime() int64

MaxTime returns the highest timestamp seen in data of the head.

func (*Head) Meta

func (h *Head) Meta() BlockMeta

Meta returns meta information about the head. The head is dynamic so will return dynamic results.

func (*Head) MinOOOTime

func (h *Head) MinOOOTime() int64

MinOOOTime returns the lowest time bound on visible data in the out of order head.

func (*Head) MinTime

func (h *Head) MinTime() int64

MinTime returns the lowest time bound on visible data in the head.

func (*Head) NumSeries

func (h *Head) NumSeries() uint64

NumSeries returns the number of active series in the head.

func (*Head) OverlapsClosedInterval

func (h *Head) OverlapsClosedInterval(mint, maxt int64) bool

OverlapsClosedInterval returns true if the head overlaps [mint, maxt].

func (*Head) PostingsCardinalityStats

func (h *Head) PostingsCardinalityStats(statsByLabelName string, limit int) *index.PostingsStats

PostingsCardinalityStats returns highest cardinality stats by label and value names.

func (*Head) SetMinValidTime

func (h *Head) SetMinValidTime(minValidTime int64)

SetMinValidTime sets the minimum timestamp the head can ingest.

func (*Head) SetOutOfOrderTimeWindow

func (h *Head) SetOutOfOrderTimeWindow(oooTimeWindow int64, wbl *wlog.WL)

SetOutOfOrderTimeWindow updates the out of order related parameters. If the Head already has a WBL set, then the wbl will be ignored.

func (*Head) Size

func (h *Head) Size() int64

func (*Head) Stats

func (h *Head) Stats(statsByLabelName string, limit int) *Stats

Stats returns important current HEAD statistics. Note that it is expensive to calculate these.

func (*Head) String

func (h *Head) String() string

String returns a human-readable representation of the TSDB head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.

func (*Head) Tombstones

func (h *Head) Tombstones() (tombstones.Reader, error)

Tombstones returns a new reader over the head's tombstones.

func (*Head) Truncate

func (h *Head) Truncate(mint int64) (err error)

Truncate removes old data before mint from the head and WAL.

func (*Head) WaitForAppendersOverlapping

func (h *Head) WaitForAppendersOverlapping(maxt int64)

WaitForAppendersOverlapping waits for appends overlapping maxt to finish.

func (*Head) WaitForPendingReadersForOOOChunksAtOrBefore

func (h *Head) WaitForPendingReadersForOOOChunksAtOrBefore(chunk chunks.ChunkDiskMapperRef)

WaitForPendingReadersForOOOChunksAtOrBefore is like WaitForPendingReadersInTimeRange, except it waits for queries touching OOO chunks less than or equal to chunk to finish querying.

func (*Head) WaitForPendingReadersInTimeRange

func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64)

WaitForPendingReadersInTimeRange waits for queries overlapping with given range to finish querying. The query timeout limits the max wait time of this function implicitly. The mint is inclusive and maxt is the truncation time hence exclusive.

type HeadOptions

type HeadOptions struct {
	// Runtime reloadable option. At the top of the struct for 32 bit OS:
	// https://pkg.go.dev/sync/atomic#pkg-note-BUG
	MaxExemplars atomic.Int64

	OutOfOrderTimeWindow atomic.Int64
	OutOfOrderCapMax     atomic.Int64

	// EnableNativeHistograms enables the ingestion of native histograms.
	EnableNativeHistograms atomic.Bool

	ChunkRange int64
	// ChunkDirRoot is the parent directory of the chunks directory.
	ChunkDirRoot         string
	ChunkPool            chunkenc.Pool
	ChunkWriteBufferSize int
	ChunkWriteQueueSize  int

	SamplesPerChunk int

	// StripeSize sets the number of entries in the hash map, it must be a power of 2.
	// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
	// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
	StripeSize                     int
	SeriesCallback                 SeriesLifecycleCallback
	EnableExemplarStorage          bool
	EnableMemorySnapshotOnShutdown bool

	IsolationDisabled bool

	// Maximum number of CPUs that can simultaneously process WAL replay.
	// The default value is GOMAXPROCS.
	// If it is set to a negative value or zero, the default value is used.
	WALReplayConcurrency int
}

HeadOptions are parameters for the Head block.

func DefaultHeadOptions

func DefaultHeadOptions() *HeadOptions

type HeadStats

type HeadStats struct {
	WALReplayStatus *WALReplayStatus
}

HeadStats are the statistics for the head component of the DB.

func NewHeadStats

func NewHeadStats() *HeadStats

NewHeadStats returns a new HeadStats object.

type IndexReader

type IndexReader interface {
	// Symbols returns an iterator over sorted string symbols that may occur in
	// series' labels and indices. It is not safe to use the returned strings
	// beyond the lifetime of the index reader.
	Symbols() index.StringIter

	// SortedLabelValues returns sorted possible label values.
	SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

	// LabelValues returns possible label values which may not be sorted.
	LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections.
	Postings(ctx context.Context, name string, values ...string) (index.Postings, error)

	// SortedPostings returns a postings list that is reordered to be sorted
	// by the label set of the underlying series.
	SortedPostings(index.Postings) index.Postings

	// Series populates the given builder and chunk metas for the series identified
	// by the reference.
	// Returns storage.ErrNotFound if the ref does not resolve to a known series.
	Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error)

	// LabelValueFor returns label value for the given label name in the series referred to by ID.
	// If the series couldn't be found or the series doesn't have the requested label a
	// storage.ErrNotFound is returned as error.
	LabelValueFor(ctx context.Context, id storage.SeriesRef, label string) (string, error)

	// LabelNamesFor returns all the label names for the series referred to by IDs.
	// The names returned are sorted.
	LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

IndexReader provides reading access of serialized index data.

func NewOOOCompactionHeadIndexReader

func NewOOOCompactionHeadIndexReader(ch *OOOCompactionHead) IndexReader

type IndexWriter

type IndexWriter interface {
	// AddSymbol registers a single symbol.
	// Symbols must be registered in sorted order.
	AddSymbol(sym string) error

	// AddSeries populates the index writer with a series and its offsets
	// of chunks that the index can reference.
	// Implementations may require series to be inserted in strictly increasing order by
	// their labels. The reference numbers are used to resolve entries in postings lists
	// that are added later.
	AddSeries(ref storage.SeriesRef, l labels.Labels, chunks ...chunks.Meta) error

	// Close writes any finalization and closes the resources associated with
	// the underlying writer.
	Close() error
}

IndexWriter serializes the index for a block of series data. The methods must be called in the order they are specified in.

type LeveledCompactor

type LeveledCompactor struct {
	// contains filtered or unexported fields
}

LeveledCompactor implements the Compactor interface.

func NewLeveledCompactor

func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)

NewLeveledCompactor returns a LeveledCompactor.

func NewLeveledCompactorWithChunkSize

func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error)

func (*LeveledCompactor) Compact

func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error)

Compact creates a new block in the compactor's directory from the blocks in the provided directories.

func (*LeveledCompactor) CompactWithBlockPopulator

func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error)

func (*LeveledCompactor) Plan

func (c *LeveledCompactor) Plan(dir string) ([]string, error)

Plan returns a list of compactable blocks in the provided directory.

func (*LeveledCompactor) Write

func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

type OOOChunk

type OOOChunk struct {
	// contains filtered or unexported fields
}

OOOChunk maintains samples in time-ascending order. Inserts for timestamps already seen are dropped. Samples are stored uncompressed to allow easy sorting. Perhaps we can be more efficient later.

func NewOOOChunk

func NewOOOChunk() *OOOChunk

func (*OOOChunk) Insert

func (o *OOOChunk) Insert(t int64, v float64) bool

Insert inserts the sample such that order is maintained. Returns false if insert was not possible due to the same timestamp already existing.

func (*OOOChunk) NumSamples

func (o *OOOChunk) NumSamples() int

func (*OOOChunk) ToXOR

func (o *OOOChunk) ToXOR() (*chunkenc.XORChunk, error)

func (*OOOChunk) ToXORBetweenTimestamps

func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk, error)

type OOOCompactionHead

type OOOCompactionHead struct {
	// contains filtered or unexported fields
}

func NewOOOCompactionHead

func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, error)

NewOOOCompactionHead does the following: 1. M-maps all the in-memory ooo chunks. 2. Computes the expected block ranges while iterating through all ooo series and stores them. 3. Stores the list of postings having ooo series. 4. Cuts a new WBL file for the OOO WBL. All of the above together have a bit of CPU and memory overhead, and can have a bit of impact on the sample append latency. So call NewOOOCompactionHead only right before compaction.

func (*OOOCompactionHead) ChunkRange

func (ch *OOOCompactionHead) ChunkRange() int64

func (*OOOCompactionHead) Chunks

func (ch *OOOCompactionHead) Chunks() (ChunkReader, error)

func (*OOOCompactionHead) CloneForTimeRange

func (ch *OOOCompactionHead) CloneForTimeRange(mint, maxt int64) *OOOCompactionHead

CloneForTimeRange clones the OOOCompactionHead such that the IndexReader and ChunkReader obtained from it only look at the m-map chunks within the given time range while not looking beyond ch.lastMmapRef. Only the methods of the BlockReader interface are valid for the cloned OOOCompactionHead.

func (*OOOCompactionHead) Index

func (ch *OOOCompactionHead) Index() (IndexReader, error)

func (*OOOCompactionHead) LastMmapRef

func (ch *OOOCompactionHead) LastMmapRef() chunks.ChunkDiskMapperRef

func (*OOOCompactionHead) LastWBLFile

func (ch *OOOCompactionHead) LastWBLFile() int

func (*OOOCompactionHead) MaxTime

func (ch *OOOCompactionHead) MaxTime() int64

func (*OOOCompactionHead) Meta

func (ch *OOOCompactionHead) Meta() BlockMeta

func (*OOOCompactionHead) MinTime

func (ch *OOOCompactionHead) MinTime() int64

func (*OOOCompactionHead) Size

func (ch *OOOCompactionHead) Size() int64

func (*OOOCompactionHead) Tombstones

func (ch *OOOCompactionHead) Tombstones() (tombstones.Reader, error)

type OOOCompactionHeadIndexReader

type OOOCompactionHeadIndexReader struct {
	// contains filtered or unexported fields
}

func (*OOOCompactionHeadIndexReader) Close

func (ir *OOOCompactionHeadIndexReader) Close() error

func (*OOOCompactionHeadIndexReader) LabelNames

func (*OOOCompactionHeadIndexReader) LabelNamesFor

func (ir *OOOCompactionHeadIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error)

func (*OOOCompactionHeadIndexReader) LabelValueFor

func (*OOOCompactionHeadIndexReader) LabelValues

func (ir *OOOCompactionHeadIndexReader) LabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

func (*OOOCompactionHeadIndexReader) Postings

func (ir *OOOCompactionHeadIndexReader) Postings(_ context.Context, name string, values ...string) (index.Postings, error)

func (*OOOCompactionHeadIndexReader) PostingsForMatchers

func (ir *OOOCompactionHeadIndexReader) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error)

func (*OOOCompactionHeadIndexReader) Series

func (*OOOCompactionHeadIndexReader) SortedLabelValues

func (ir *OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

func (*OOOCompactionHeadIndexReader) SortedPostings

func (*OOOCompactionHeadIndexReader) Symbols

type OOOHeadChunkReader

type OOOHeadChunkReader struct {
	// contains filtered or unexported fields
}

func NewOOOHeadChunkReader

func NewOOOHeadChunkReader(head *Head, mint, maxt int64, isoState *oooIsolationState) *OOOHeadChunkReader

func (OOOHeadChunkReader) ChunkOrIterable

func (cr OOOHeadChunkReader) ChunkOrIterable(meta chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error)

func (OOOHeadChunkReader) Close

func (cr OOOHeadChunkReader) Close() error

type OOOHeadIndexReader

type OOOHeadIndexReader struct {
	// contains filtered or unexported fields
}

OOOHeadIndexReader implements IndexReader so ooo samples in the head can be accessed. It also has a reference to headIndexReader so we can leverage its IndexReader implementation for all the methods that remain the same. We decided to do this to avoid code duplication. The only methods that change are the ones about getting Series and Postings.

func NewOOOHeadIndexReader

func NewOOOHeadIndexReader(head *Head, mint, maxt int64, lastGarbageCollectedMmapRef chunks.ChunkDiskMapperRef) *OOOHeadIndexReader

func (OOOHeadIndexReader) Close

func (h OOOHeadIndexReader) Close() error

func (OOOHeadIndexReader) LabelNames

func (h OOOHeadIndexReader) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, error)

LabelNames returns all the unique label names present in the head that are within the time range mint to maxt.

func (OOOHeadIndexReader) LabelNamesFor

func (h OOOHeadIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error)

LabelNamesFor returns all the label names for the series referred to by IDs. The names returned are sorted.

func (OOOHeadIndexReader) LabelValueFor

func (h OOOHeadIndexReader) LabelValueFor(_ context.Context, id storage.SeriesRef, label string) (string, error)

LabelValueFor returns label value for the given label name in the series referred to by ID.

func (*OOOHeadIndexReader) LabelValues

func (oh *OOOHeadIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

LabelValues needs to be overridden from the headIndexReader implementation due to the check that happens at the beginning, where we make sure that the query interval overlaps with the head's MinOOOTime and MaxOOOTime.

func (*OOOHeadIndexReader) Postings

func (oh *OOOHeadIndexReader) Postings(ctx context.Context, name string, values ...string) (index.Postings, error)

func (*OOOHeadIndexReader) Series

func (oh *OOOHeadIndexReader) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error

func (OOOHeadIndexReader) SortedLabelValues

func (h OOOHeadIndexReader) SortedLabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error)

SortedLabelValues returns label values present in the head for the specific label name that are within the time range mint to maxt. If matchers are specified the returned result set is reduced to label values of metrics matching the matchers.

func (OOOHeadIndexReader) SortedPostings

func (h OOOHeadIndexReader) SortedPostings(p index.Postings) index.Postings

func (OOOHeadIndexReader) Symbols

func (h OOOHeadIndexReader) Symbols() index.StringIter

type OOORangeHead

type OOORangeHead struct {
	// contains filtered or unexported fields
}

OOORangeHead allows querying Head out of order samples via BlockReader interface implementation.

func NewOOORangeHead

func NewOOORangeHead(head *Head, mint, maxt int64, minRef chunks.ChunkDiskMapperRef) *OOORangeHead

func (*OOORangeHead) Chunks

func (oh *OOORangeHead) Chunks() (ChunkReader, error)

func (*OOORangeHead) Index

func (oh *OOORangeHead) Index() (IndexReader, error)

func (*OOORangeHead) MaxTime

func (oh *OOORangeHead) MaxTime() int64

func (*OOORangeHead) Meta

func (oh *OOORangeHead) Meta() BlockMeta

func (*OOORangeHead) MinTime

func (oh *OOORangeHead) MinTime() int64

func (*OOORangeHead) Size

func (oh *OOORangeHead) Size() int64

Size returns the size taken by the Head block.

func (*OOORangeHead) String

func (oh *OOORangeHead) String() string

String returns a human-readable representation of the out of order range head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.

func (*OOORangeHead) Tombstones

func (oh *OOORangeHead) Tombstones() (tombstones.Reader, error)

type Options

type Options struct {
	// Segments (wal files) max size.
	// WALSegmentSize = 0, segment size is default size.
	// WALSegmentSize > 0, segment size is WALSegmentSize.
	// WALSegmentSize < 0, wal is disabled.
	WALSegmentSize int

	// MaxBlockChunkSegmentSize is the max size of block chunk segment files.
	// MaxBlockChunkSegmentSize = 0, chunk segment size is default size.
	// MaxBlockChunkSegmentSize > 0, chunk segment size is MaxBlockChunkSegmentSize.
	MaxBlockChunkSegmentSize int64

	// Duration of persisted data to keep.
	// Unit agnostic as long as unit is consistent with MinBlockDuration and MaxBlockDuration.
	// Typically it is in milliseconds.
	RetentionDuration int64

	// Maximum number of bytes in blocks to be retained.
	// 0 or less means disabled.
	// NOTE: For proper storage calculations need to consider
	// the size of the WAL folder which is not added when calculating
	// the current size of the database.
	MaxBytes int64

	// NoLockfile disables creation and consideration of a lock file.
	NoLockfile bool

	// Compaction of overlapping blocks is allowed if AllowOverlappingCompaction is true.
	// This is an optional flag for overlapping blocks.
	// The reason why this flag exists is because there are various users of the TSDB
	// that do not want vertical compaction happening on ingest time. Instead,
	// they'd rather keep overlapping blocks and let another component do the overlapping compaction later.
	// For Prometheus, this will always be true.
	AllowOverlappingCompaction bool

	// WALCompression configures the compression type to use on records in the WAL.
	WALCompression wlog.CompressionType

	// Maximum number of CPUs that can simultaneously process WAL replay.
	// If it is <=0, then GOMAXPROCS is used.
	WALReplayConcurrency int

	// StripeSize is the size in entries of the series hash map. Reducing the size will save memory but impact performance.
	StripeSize int

	// The timestamp range of head blocks after which they get persisted.
	// It's the minimum duration of any persisted block.
	// Unit agnostic as long as unit is consistent with RetentionDuration and MaxBlockDuration.
	// Typically it is in milliseconds.
	MinBlockDuration int64

	// The maximum timestamp range of compacted blocks.
	// Unit agnostic as long as unit is consistent with MinBlockDuration and RetentionDuration.
	// Typically it is in milliseconds.
	MaxBlockDuration int64

	// HeadChunksWriteBufferSize configures the write buffer size used by the head chunks mapper.
	HeadChunksWriteBufferSize int

	// HeadChunksWriteQueueSize configures the size of the chunk write queue used in the head chunks mapper.
	HeadChunksWriteQueueSize int

	// SamplesPerChunk configures the target number of samples per chunk.
	SamplesPerChunk int

	// SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series.
	// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
	SeriesLifecycleCallback SeriesLifecycleCallback

	// BlocksToDelete is a function which returns the blocks which can be deleted.
	// It is always the default time and size based retention in Prometheus and
	// mainly meant for external users who import TSDB.
	BlocksToDelete BlocksToDeleteFunc

	// Enables the in memory exemplar storage.
	EnableExemplarStorage bool

	// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
	EnableMemorySnapshotOnShutdown bool

	// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
	// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and its constructor NewCircularExemplarStorage.
	MaxExemplars int64

	// Disables isolation between reads and in-flight appends.
	IsolationDisabled bool

	// EnableNativeHistograms enables the ingestion of native histograms.
	EnableNativeHistograms bool

	// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
	// This can change during run-time, so this value from here should only be used
	// while initialising.
	OutOfOrderTimeWindow int64

	// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
	// If it is <=0, the default value is assumed.
	OutOfOrderCapMax int64
}

Options of the DB storage.

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions used for the DB. They are reasonable for setups using millisecond precision timestamps.

type Overlaps

type Overlaps map[TimeRange][]BlockMeta

Overlaps contains overlapping blocks aggregated by overlapping range.

func OverlappingBlocks

func OverlappingBlocks(bm []BlockMeta) Overlaps

OverlappingBlocks returns all overlapping blocks from given meta files.

func (Overlaps) String

func (o Overlaps) String() string

String returns human readable string form of overlapped blocks.

type RangeHead

type RangeHead struct {
	// contains filtered or unexported fields
}

RangeHead allows querying Head via an IndexReader, ChunkReader and tombstones.Reader but only within a restricted range. Used for queries and compactions.

func NewRangeHead

func NewRangeHead(head *Head, mint, maxt int64) *RangeHead

NewRangeHead returns a *RangeHead. There are no restrictions on mint/maxt.

func NewRangeHeadWithIsolationDisabled

func NewRangeHeadWithIsolationDisabled(head *Head, mint, maxt int64) *RangeHead

NewRangeHeadWithIsolationDisabled returns a *RangeHead that does not create an isolationState.

func (*RangeHead) BlockMaxTime

func (h *RangeHead) BlockMaxTime() int64

BlockMaxTime returns the max time of the potential block created from this head. It differs from MaxTime because we need to add 1 millisecond to the block maxt: block intervals are half-open, [b.MinTime, b.MaxTime), so a block's max time is always 1 greater than the timestamp of the last sample it includes.

func (*RangeHead) Chunks

func (h *RangeHead) Chunks() (ChunkReader, error)

func (*RangeHead) Index

func (h *RangeHead) Index() (IndexReader, error)

func (*RangeHead) MaxTime

func (h *RangeHead) MaxTime() int64

MaxTime returns the max time of actual data fetch-able from the head. This controls the chunks time range which is closed [b.MinTime, b.MaxTime].

func (*RangeHead) Meta

func (h *RangeHead) Meta() BlockMeta

func (*RangeHead) MinTime

func (h *RangeHead) MinTime() int64

func (*RangeHead) NumSeries

func (h *RangeHead) NumSeries() uint64

func (*RangeHead) Size

func (h *RangeHead) Size() int64

func (*RangeHead) String

func (h *RangeHead) String() string

String returns a human-readable representation of the range head. It's important to keep this function in order to avoid the struct dump when the head is stringified in errors or logs.

func (*RangeHead) Tombstones

func (h *RangeHead) Tombstones() (tombstones.Reader, error)

type SegmentWAL deprecated

type SegmentWAL struct {
	// contains filtered or unexported fields
}

SegmentWAL is a write ahead log for series data.

Deprecated: use wlog pkg combined with the record coders instead.

func OpenSegmentWAL

func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error)

OpenSegmentWAL opens or creates a write ahead log in the given directory. The WAL must be read completely before new data is written.

func (*SegmentWAL) Close

func (w *SegmentWAL) Close() error

Close syncs all data and closes the underlying resources.

func (*SegmentWAL) LogDeletes

func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error

LogDeletes writes a batch of new deletes to the log.

func (*SegmentWAL) LogSamples

func (w *SegmentWAL) LogSamples(samples []record.RefSample) error

LogSamples writes a batch of new samples to the log.

func (*SegmentWAL) LogSeries

func (w *SegmentWAL) LogSeries(series []record.RefSeries) error

LogSeries writes a batch of new series labels to the log. The series have to be ordered.

func (*SegmentWAL) Reader

func (w *SegmentWAL) Reader() WALReader

Reader returns a new reader over the write ahead log data. It must be completely consumed before writing to the WAL.

func (*SegmentWAL) Sync

func (w *SegmentWAL) Sync() error

Sync flushes the changes to disk.

func (*SegmentWAL) Truncate

func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool) error

Truncate deletes the values prior to mint and the series which the keep function does not indicate to preserve.

type SeriesLifecycleCallback

type SeriesLifecycleCallback interface {
	// PreCreation is called before creating a series to indicate if the series can be created.
	// A non nil error means the series should not be created.
	PreCreation(labels.Labels) error
	// PostCreation is called after creating a series to indicate a creation of series.
	PostCreation(labels.Labels)
	// PostDeletion is called after deletion of series.
	PostDeletion(map[chunks.HeadSeriesRef]labels.Labels)
}

SeriesLifecycleCallback specifies a list of callbacks that will be called during a lifecycle of a series. It is always a no-op in Prometheus and mainly meant for external users who import TSDB. All the callbacks should be safe to be called concurrently. It is up to the user to implement soft or hard consistency by making the callbacks atomic or non-atomic. Atomic callbacks can cause performance degradation.

type Stats

type Stats struct {
	NumSeries         uint64
	MinTime, MaxTime  int64
	IndexPostingStats *index.PostingsStats
}

type TimeRange

type TimeRange struct {
	Min, Max int64
}

TimeRange specifies minTime and maxTime range.

type WAL deprecated

type WAL interface {
	Reader() WALReader
	LogSeries([]record.RefSeries) error
	LogSamples([]record.RefSample) error
	LogDeletes([]tombstones.Stone) error
	Truncate(mint int64, keep func(uint64) bool) error
	Close() error
}

WAL is a write ahead log that can log new series labels and samples. It must be completely read before new entries are logged.

Deprecated: use wlog pkg combined with the record coders instead.

type WALEntryType

type WALEntryType uint8

WALEntryType indicates what data a WAL entry contains.

const (
	WALEntrySymbols WALEntryType = 1
	WALEntrySeries  WALEntryType = 2
	WALEntrySamples WALEntryType = 3
	WALEntryDeletes WALEntryType = 4
)

Entry types in a segment file.

type WALReader

type WALReader interface {
	Read(
		seriesf func([]record.RefSeries),
		samplesf func([]record.RefSample),
		deletesf func([]tombstones.Stone),
	) error
}

WALReader reads entries from a WAL.

type WALReplayStatus

type WALReplayStatus struct {
	sync.RWMutex
	Min     int
	Max     int
	Current int
}

WALReplayStatus contains status information about the WAL replay.

func (*WALReplayStatus) GetWALReplayStatus

func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus

GetWALReplayStatus returns the WAL replay status information.

Directories

Path Synopsis
Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
Package fileutil provides utility methods used when dealing with the filesystem in tsdb.
Package goversion enforces the go version supported by the tsdb module.
Package goversion enforces the go version supported by the tsdb module.
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.
Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL