phlaredb

package
v1.0.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2023 License: AGPL-3.0 Imports: 62 Imported by: 0

Documentation

Overview

nolint unused

nolint unused

nolint unused

nolint unused

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Compact

func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Meta, err error)

func InRange

func InRange(q Querier, start, end model.Time) bool

func MergeProfilesLabels

func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error

func MergeProfilesPprof

func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error

func MergeProfilesStacktraces

func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error

func NewSingleBlockQuerierFromMeta

func NewSingleBlockQuerierFromMeta(phlarectx context.Context, bucketReader phlareobj.Bucket, meta *block.Meta) *singleBlockQuerier

func PostingsForMatchers

func PostingsForMatchers(ix IndexReader, shard *index.ShardAnnotation, ms ...*labels.Matcher) (index.Postings, error)

PostingsForMatchers assembles a single postings iterator against the index reader based on the given matchers. The resulting postings are not ordered by series.

func SelectMatchingProfiles

func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfilesRequest, queriers Queriers) ([]iter.Iterator[Profile], error)

SelectMatchingProfiles returns a list iterator of profiles matching the given request.

func SplitFiltersAndMatchers

func SplitFiltersAndMatchers(allMatchers []*labels.Matcher) (filters, matchers []*labels.Matcher)

SplitFiltersAndMatchers splits empty matchers off, which are treated as filters, see #220

Types

type BidiServerMerge

type BidiServerMerge[Res any, Req any] interface {
	Send(Res) error
	Receive() (Req, error)
}

type BlockGetter

type BlockGetter func(ctx context.Context, start, end model.Time) (Queriers, error)

type BlockInfo

type BlockInfo struct {
	ID          ulid.ULID
	MinTime     model.Time
	MaxTime     model.Time
	Profiles    TableInfo
	Stacktraces TableInfo
	Locations   TableInfo
	Functions   TableInfo
	Mappings    TableInfo
	Strings     TableInfo
	Series      uint64
}

type BlockProfile

type BlockProfile struct {
	RowNum int64
	// contains filtered or unexported fields
}

func (BlockProfile) Fingerprint

func (p BlockProfile) Fingerprint() model.Fingerprint

func (BlockProfile) Labels

func (p BlockProfile) Labels() phlaremodel.Labels

func (BlockProfile) RowNumber

func (p BlockProfile) RowNumber() int64

func (BlockProfile) StacktracePartition

func (p BlockProfile) StacktracePartition() uint64

func (BlockProfile) Timestamp

func (p BlockProfile) Timestamp() model.Time

type BlockQuerier

type BlockQuerier struct {
	// contains filtered or unexported fields
}

func NewBlockQuerier

func NewBlockQuerier(phlarectx context.Context, bucketReader phlareobj.Bucket) *BlockQuerier

func (*BlockQuerier) AddBlockQuerierByMeta

func (b *BlockQuerier) AddBlockQuerierByMeta(m *block.Meta)

func (*BlockQuerier) BlockInfo

func (b *BlockQuerier) BlockInfo() []BlockInfo

func (*BlockQuerier) BlockMetas

func (b *BlockQuerier) BlockMetas(ctx context.Context) (metas []*block.Meta, _ error)

func (*BlockQuerier) Close

func (b *BlockQuerier) Close() error

func (*BlockQuerier) Queriers

func (b *BlockQuerier) Queriers() Queriers

func (*BlockQuerier) Sync

func (b *BlockQuerier) Sync(ctx context.Context) error

Sync gradually scans the available blocks. If there are any changes to the last run it will Open/Close new/no longer existing ones.

type BlockReader

type BlockReader interface {
	Meta() block.Meta
	Profiles() []parquet.RowGroup
	Index() IndexReader
	Symbols() SymbolsReader
}

type Config

type Config struct {
	DataPath string `yaml:"data_path,omitempty"`
	// Blocks are generally cut once they reach 1000M of memory size, this will setup an upper limit to the duration of data that a block has that is cut by the ingester.
	MaxBlockDuration time.Duration `yaml:"max_block_duration,omitempty"`

	// TODO: docs
	RowGroupTargetSize uint64 `yaml:"row_group_target_size"`

	Parquet *ParquetConfig `yaml:"-"` // Those configs should not be exposed to the user, rather they should be determined by phlare itself. Currently, they are solely used for test cases.
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)
type Head struct {
	// contains filtered or unexported fields
}

func NewHead

func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Head, error)

func (*Head) Bounds

func (h *Head) Bounds() (mint, maxt model.Time)

func (*Head) Flush

func (h *Head) Flush(ctx context.Context) error

Flush closes the head and writes data to disk. No ingestion requests should be made concurrently with the call, or after it returns. The call is thread-safe for reads.

func (*Head) Ingest

func (h *Head) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, externalLabels ...*typesv1.LabelPair) error

func (*Head) LabelNames

func (h *Head) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)

LabelNames returns the possible label values for a given label name.

func (*Head) LabelValues

func (h *Head) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)

LabelValues returns the possible label values for a given label name.

func (*Head) MemorySize

func (h *Head) MemorySize() uint64

func (*Head) Move

func (h *Head) Move() error

Move moves the head directory to local blocks. The call is not thread-safe: no concurrent reads and writes are allowed.

After the call, head in-memory representation is not valid and should not be accessed for querying.

func (*Head) ProfileTypes

func (h *Head) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)

ProfileTypes returns the possible profile types.

func (*Head) Queriers

func (h *Head) Queriers() Queriers

Returns underlying queries, the queriers should be roughly ordered in TS increasing order

func (*Head) Series

func (h *Head) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error)

func (*Head) Size

func (h *Head) Size() uint64

func (*Head) Sort

func (h *Head) Sort(in []Profile) []Profile

func (*Head) SymDBFiles

func (h *Head) SymDBFiles() ([]block.File, error)

SymDBFiles lists files in symdb folder

type Helper

type Helper[M Models, K comparable] interface {
	// contains filtered or unexported methods
}

type IndexReader

type IndexReader interface {
	// Bounds returns the earliest and latest samples in the index
	Bounds() (int64, int64)

	Checksum() uint32

	// Symbols return an iterator over sorted string symbols that may occur in
	// series' labels and indices. It is not safe to use the returned strings
	// beyond the lifetime of the index reader.
	Symbols() index.StringIter

	// SortedLabelValues returns sorted possible label values.
	SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// LabelValues returns possible label values which may not be sorted.
	LabelValues(name string, matchers ...*labels.Matcher) ([]string, error)

	// Postings returns the postings list iterator for the label pairs.
	// The Postings here contain the offsets to the series inside the index.
	// Found IDs are not strictly required to point to a valid Series, e.g.
	// during background garbage collections. Input values must be sorted.
	Postings(name string, shard *index.ShardAnnotation, values ...string) (index.Postings, error)

	// Series populates the given labels and chunk metas for the series identified
	// by the reference.
	// Returns storage.ErrNotFound if the ref does not resolve to a known series.
	Series(ref storage.SeriesRef, lset *phlaremodel.Labels, chks *[]index.ChunkMeta) (uint64, error)

	// LabelNames returns all the unique label names present in the index in sorted order.
	LabelNames(matchers ...*labels.Matcher) ([]string, error)

	// LabelValueFor returns label value for the given label name in the series referred to by ID.
	// If the series couldn't be found or the series doesn't have the requested label a
	// storage.ErrNotFound is returned as error.
	LabelValueFor(id storage.SeriesRef, label string) (string, error)

	// LabelNamesFor returns all the label names for the series referred to by IDs.
	// The names returned are sorted.
	LabelNamesFor(ids ...storage.SeriesRef) ([]string, error)

	// Close releases the underlying resources of the reader.
	Close() error
}

IndexReader provides reading access of serialized index data.

type ParquetConfig

type ParquetConfig struct {
	MaxBufferRowCount int
	MaxRowGroupBytes  uint64 // This is the maximum row group size in bytes that the raw data uses in memory.
	MaxBlockBytes     uint64 // This is the size of all parquet tables in memory after which a new block is cut
}

type PhlareDB

type PhlareDB struct {
	services.Service
	// contains filtered or unexported fields
}

func New

func New(phlarectx context.Context, cfg Config, limiter TenantLimiter, fs phlareobj.Bucket) (*PhlareDB, error)

func (*PhlareDB) BlockMetas

func (f *PhlareDB) BlockMetas(ctx context.Context) ([]*block.Meta, error)

func (*PhlareDB) Close

func (f *PhlareDB) Close() error

func (*PhlareDB) Evict

func (f *PhlareDB) Evict(blockID ulid.ULID, fn func() error) (bool, error)

Evict removes the given local block from the PhlareDB. Note that the block files are not deleted from the disk. No evictions should be done after and during the Close call.

func (*PhlareDB) Flush

func (f *PhlareDB) Flush(ctx context.Context) (err error)

func (*PhlareDB) Ingest

func (f *PhlareDB) Ingest(ctx context.Context, p *profilev1.Profile, id uuid.UUID, externalLabels ...*typesv1.LabelPair) (err error)

func (*PhlareDB) LabelNames

func (f *PhlareDB) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (resp *connect.Response[typesv1.LabelNamesResponse], err error)

LabelNames returns the possible label names.

func (*PhlareDB) LabelValues

func (f *PhlareDB) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (resp *connect.Response[typesv1.LabelValuesResponse], err error)

LabelValues returns the possible label values for a given label name.

func (*PhlareDB) LocalDataPath

func (f *PhlareDB) LocalDataPath() string

func (*PhlareDB) MergeProfilesLabels

func (f *PhlareDB) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error

func (*PhlareDB) MergeProfilesPprof

func (f *PhlareDB) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error

func (*PhlareDB) MergeProfilesStacktraces

func (f *PhlareDB) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error

func (*PhlareDB) ProfileTypes

func (f *PhlareDB) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (resp *connect.Response[ingestv1.ProfileTypesResponse], err error)

ProfileTypes returns the possible profile types.

func (*PhlareDB) Series

func (f *PhlareDB) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (resp *connect.Response[ingestv1.SeriesResponse], err error)

Series returns labels series for the given set of matchers.

type Profile

type Profile interface {
	StacktracePartition() uint64
	Timestamp() model.Time
	Fingerprint() model.Fingerprint
	Labels() phlaremodel.Labels
}

type ProfileSelectorIterator

type ProfileSelectorIterator struct {
	// contains filtered or unexported fields
}

func NewProfileSelectorIterator

func NewProfileSelectorIterator() *ProfileSelectorIterator

func (*ProfileSelectorIterator) At

func (*ProfileSelectorIterator) Close

func (it *ProfileSelectorIterator) Close() error

func (*ProfileSelectorIterator) Err

func (it *ProfileSelectorIterator) Err() error

func (*ProfileSelectorIterator) Next

func (it *ProfileSelectorIterator) Next() bool

func (*ProfileSelectorIterator) Push

func (it *ProfileSelectorIterator) Push(batch []Profile)

type ProfileWithIndex

type ProfileWithIndex struct {
	Profile
	Index int
}

type ProfileWithLabels

type ProfileWithLabels struct {
	// contains filtered or unexported fields
}

func (ProfileWithLabels) Fingerprint

func (p ProfileWithLabels) Fingerprint() model.Fingerprint

func (ProfileWithLabels) Labels

func (ProfileWithLabels) Samples

func (p ProfileWithLabels) Samples() schemav1.Samples

func (ProfileWithLabels) StacktracePartition

func (p ProfileWithLabels) StacktracePartition() uint64

func (ProfileWithLabels) Timestamp

func (p ProfileWithLabels) Timestamp() model.Time

func (ProfileWithLabels) Total

func (p ProfileWithLabels) Total() int64

type Querier

type Querier interface {
	Bounds() (model.Time, model.Time)
	SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error)
	MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error)
	MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error)
	MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error)
	Open(ctx context.Context) error
	// Sorts profiles for retrieval.
	Sort([]Profile) []Profile
}

type Queriers

type Queriers []Querier

func (Queriers) ForTimeRange

func (queriers Queriers) ForTimeRange(_ context.Context, start, end model.Time) (Queriers, error)

func (Queriers) Open

func (queriers Queriers) Open(ctx context.Context) error

func (Queriers) SelectMatchingProfiles

func (queriers Queriers) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error)

type ResultWithRowNum

type ResultWithRowNum[M any] struct {
	Result M
	RowNum int64
}

type SeriesIterator

type SeriesIterator struct {
	iter.Iterator[*schemav1.InMemoryProfile]
	// contains filtered or unexported fields
}

func NewSeriesIterator

func NewSeriesIterator(labels phlaremodel.Labels, fingerprint model.Fingerprint, it iter.Iterator[*schemav1.InMemoryProfile]) *SeriesIterator

func (*SeriesIterator) At

func (it *SeriesIterator) At() Profile

func (*SeriesIterator) Next

func (it *SeriesIterator) Next() bool

type Source

type Source interface {
	Schema() *parquet.Schema
	RowGroups() []parquet.RowGroup
}

type StacktraceDB

type StacktraceDB interface {
	Open(ctx context.Context) error
	Close() error

	// Load the database into memory entirely.
	// This method is used at compaction.
	Load(context.Context) error
	WriteStats(partition uint64, s *symdb.Stats)

	Resolve(ctx context.Context, partition uint64, locs symdb.StacktraceInserter, stacktraceIDs []uint32) error
}

type SymbolsAppender

type SymbolsAppender interface {
	AppendStacktraces([]uint32, []*schemav1.Stacktrace)
	AppendLocations([]uint32, []*schemav1.InMemoryLocation)
	AppendMappings([]uint32, []*schemav1.InMemoryMapping)
	AppendFunctions([]uint32, []*schemav1.InMemoryFunction)
	AppendStrings([]uint32, []string)
}

type SymbolsReader

type SymbolsReader interface {
	SymbolsResolver(partition uint64) (SymbolsResolver, error)
}

type SymbolsResolver

type SymbolsResolver interface {
	ResolveStacktraces(ctx context.Context, dst symdb.StacktraceInserter, stacktraces []uint32) error

	Locations(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryLocation]
	Mappings(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryMapping]
	Functions(iter.Iterator[uint32]) iter.Iterator[*schemav1.InMemoryFunction]
	Strings(iter.Iterator[uint32]) iter.Iterator[string]

	WriteStats(*symdb.Stats)
}

type SymbolsWriter

type SymbolsWriter interface {
	SymbolsAppender(partition uint64) (SymbolsAppender, error)
}

type Table

type Table interface {
	Name() string
	Size() uint64       // Size estimates the uncompressed byte size of the table in memory and on disk.
	MemorySize() uint64 // MemorySize estimates the uncompressed byte size of the table in memory.
	Init(path string, cfg *ParquetConfig, metrics *headMetrics) error
	Flush(context.Context) (numRows uint64, numRowGroups uint64, err error)
	Close() error
}

type TableInfo

type TableInfo struct {
	Rows      uint64
	RowGroups uint64
	Bytes     uint64
}

type TenantLimiter

type TenantLimiter interface {
	AllowProfile(fp model.Fingerprint, lbs phlaremodel.Labels, tsNano int64) error
	Stop()
}

Directories

Path Synopsis
schemas
v1
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL