Documentation ¶
Overview ¶
Package tsdb implements a time-series-based storage engine. It provides:
- Partition data based on a time axis.
- Sharding data based on a series id which represents a unique entity of stream/measure
- Retrieving data based on index.Filter.
- Cleaning expired data, or the data retention.
Index ¶
- Variables
- func GlobalSeriesID(scope Entry) common.SeriesID
- func Hash(entry []byte) []byte
- func HashEntity(entity Entity) []byte
- func MarshalEntityValues(evs EntityValues) ([]byte, error)
- func SeriesID(entity Entity) common.SeriesID
- type BlockID
- type BlockState
- type Buffer
- type BufferSupplier
- type CompressionMethod
- type CompressionType
- type Database
- type DatabaseOpts
- type EncodingMethod
- type Entity
- type EntityValue
- type EntityValues
- type Entry
- type GlobalItemID
- type IndexDatabase
- type IndexGranularity
- type IndexWriter
- type IndexWriterBuilder
- type IntervalRule
- type IntervalUnit
- type InvertedIndexOpts
- type Item
- type Iterator
- type OrderBy
- type Path
- type SectionID
- type Seeker
- type SeekerBuilder
- type Series
- type SeriesDatabase
- type SeriesList
- type SeriesSpan
- type Shard
- type ShardState
- type Supplier
- type TSTable
- type TSTableFactory
- type Writer
- type WriterBuilder
Constants ¶
This section is empty.
Variables ¶
var ( // ErrUnknownShard indicates that the shard is not found. ErrUnknownShard = errors.New("unknown shard") // OptionsKey is the key of options in context. OptionsKey = contextOptionsKey{} )
var AnyEntry = Entry(nil)
AnyEntry is the `*` for a regular expression. It could match "any" Entry in an Entity.
var ( // ErrEmptySeriesSpan hints there is no any data blocks based on the input time range. ErrEmptySeriesSpan = errors.New("there is no data in such time range") )
Functions ¶
func GlobalSeriesID ¶
GlobalSeriesID encodes Entry to common.SeriesID.
func HashEntity ¶
HashEntity runs hash function (e.g. with xxhash algorithm) on each segment of the Entity, and concatenates all uint64 in byte array. So the return length of the byte array will be 8 (every uint64 has 8 bytes) * length of the input.
func MarshalEntityValues ¶ added in v0.3.0
func MarshalEntityValues(evs EntityValues) ([]byte, error)
MarshalEntityValues encodes EntityValues to bytes.
Types ¶
type BlockState ¶
BlockState is a sample of a block's runtime state.
type Buffer ¶ added in v0.4.0
type Buffer struct {
// contains filtered or unexported fields
}
Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
func NewBuffer ¶ added in v0.4.0
func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int) (*Buffer, error)
NewBuffer creates a new Buffer instance with the given parameters.
func NewBufferWithWal ¶ added in v0.5.0
func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, enableWal bool, walPath string, ) (*Buffer, error)
NewBufferWithWal creates a new Buffer instance with the given parameters.
func (*Buffer) Close ¶ added in v0.4.0
Close gracefully closes the Buffer and ensures that all pending operations are completed.
func (*Buffer) Read ¶ added in v0.4.0
Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
func (*Buffer) Register ¶ added in v0.5.0
Register registers a callback function that will be called when a shard bucket is flushed.
func (*Buffer) Unregister ¶ added in v0.5.0
Unregister unregisters a callback function that will be called when a shard bucket is flushed.
type BufferSupplier ¶ added in v0.5.0
type BufferSupplier struct {
// contains filtered or unexported fields
}
BufferSupplier lends a Buffer to a caller and returns it when the caller is done with it.
func NewBufferSupplier ¶ added in v0.5.0
func NewBufferSupplier(l *logger.Logger, p common.Position, writeConcurrency, numShards int, enableWAL bool, path string) *BufferSupplier
NewBufferSupplier creates a new BufferSupplier instance with the given parameters.
func (*BufferSupplier) Borrow ¶ added in v0.5.0
func (b *BufferSupplier) Borrow(bufferName, name string, bufferSize int, onFlushFn onFlush) (buffer *Buffer, err error)
Borrow borrows a Buffer from the BufferSupplier.
func (*BufferSupplier) Close ¶ added in v0.5.0
func (b *BufferSupplier) Close() error
Close closes all Buffers in the BufferSupplier.
func (*BufferSupplier) Return ¶ added in v0.5.0
func (b *BufferSupplier) Return(bufferName, name string)
Return returns a Buffer to the BufferSupplier.
func (*BufferSupplier) Volume ¶ added in v0.5.0
func (b *BufferSupplier) Volume() int
Volume returns the number of Buffers in the BufferSupplier.
type CompressionMethod ¶ added in v0.3.0
type CompressionMethod struct { Type CompressionType ChunkSizeInBytes int }
CompressionMethod denotes how to compress a single chunk.
type CompressionType ¶ added in v0.3.0
type CompressionType int
CompressionType specifies how a chunk should be compressed.
const ( // CompressionTypeNone mode indicates that a chunk is not compressed. CompressionTypeNone CompressionType = iota // CompressionTypeZSTD mode indicates that a chunk is compressed using CompressionTypeZSTD algorithm. CompressionTypeZSTD )
type Database ¶
type Database interface { io.Closer CreateShardsAndGetByID(id common.ShardID) (Shard, error) Shards() []Shard Shard(id common.ShardID) (Shard, error) }
Database allows listing and getting shard details.
func OpenDatabase ¶
func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error)
OpenDatabase returns a new tsdb runtime. This constructor will create a new database if it's absent, or load an existing one.
type DatabaseOpts ¶
type DatabaseOpts struct { TSTableFactory TSTableFactory Location string SegmentInterval IntervalRule BlockInterval IntervalRule TTL IntervalRule BlockInvertedIndex InvertedIndexOpts SeriesMemSize run.Bytes GlobalIndexMemSize run.Bytes IndexGranularity IndexGranularity ShardNum uint32 EnableGlobalIndex bool EnableWAL bool }
DatabaseOpts wraps options to create a tsdb.
type EncodingMethod ¶
type EncodingMethod struct { EncoderPool encoding.SeriesEncoderPool DecoderPool encoding.SeriesDecoderPool ChunkSizeInBytes int }
EncodingMethod wraps encoder/decoder pools to flush/compact data on disk.
type Entity ¶
type Entity []Entry
Entity denotes an identity of a Series. It defined by Stream or Measure schema.
type EntityValue ¶ added in v0.3.0
EntityValue represents the value of a tag which is a part of an entity.
func Int64Value ¶ added in v0.3.0
func Int64Value(v int64) EntityValue
Int64Value returns an EntityValue which wraps a int64 value.
func StrValue ¶ added in v0.3.0
func StrValue(v string) EntityValue
StrValue returns an EntityValue which wraps a string value.
type EntityValues ¶ added in v0.3.0
type EntityValues []EntityValue
EntityValues is the encoded Entity.
func DecodeEntityValues ¶ added in v0.3.0
func DecodeEntityValues(tvv []*modelv1.TagValue) (result EntityValues)
DecodeEntityValues decodes tag values to EntityValues.
func UnmarshalEntityValues ¶ added in v0.3.0
func UnmarshalEntityValues(evs []byte) (result EntityValues, err error)
UnmarshalEntityValues decodes EntityValues from bytes.
func (EntityValues) Encode ¶ added in v0.3.0
func (evs EntityValues) Encode() (result []*modelv1.TagValue)
Encode EntityValues to tag values.
func (EntityValues) Prepend ¶ added in v0.3.0
func (evs EntityValues) Prepend(scope EntityValue) EntityValues
Prepend inserts an EntityValue before the first EntityValue as the prefix.
func (EntityValues) String ¶ added in v0.3.0
func (evs EntityValues) String() string
String outputs the string represent of an EntityValue.
func (EntityValues) ToEntity ¶ added in v0.3.0
func (evs EntityValues) ToEntity() (result Entity, err error)
ToEntity transforms EntityValues to Entity.
type Entry ¶
type Entry []byte
Entry is an element in an Entity.
func EntityValueToEntry ¶ added in v0.3.0
func EntityValueToEntry(ev EntityValue) (Entry, error)
EntityValueToEntry transforms EntityValue to Entry.
type GlobalItemID ¶
type GlobalItemID struct { ShardID common.ShardID SeriesID common.SeriesID ID common.ItemID // contains filtered or unexported fields }
GlobalItemID is the top level identity of an item. The item could be retrieved by a GlobalItemID in a tsdb.
type IndexDatabase ¶
type IndexDatabase interface { WriterBuilder() IndexWriterBuilder Seek(field index.Field) ([]GlobalItemID, error) }
IndexDatabase allows stocking index data.
type IndexGranularity ¶ added in v0.4.0
type IndexGranularity int
IndexGranularity denotes the granularity of the local index.
const ( IndexGranularityBlock IndexGranularity = iota IndexGranularitySeries )
The options of the local index granularity.
type IndexWriter ¶
type IndexWriter interface { WriteLSMIndex(field []index.Field) error WriteInvertedIndex(field []index.Field) error }
IndexWriter allows ingesting index data.
func NewSeriesIndexWriter ¶ added in v0.4.0
func NewSeriesIndexWriter(seriesID common.SeriesID, seriesDB SeriesDatabase) IndexWriter
NewSeriesIndexWriter returns a new series index writer.
type IndexWriterBuilder ¶
type IndexWriterBuilder interface { Scope(scope Entry) IndexWriterBuilder Time(ts time.Time) IndexWriterBuilder GlobalItemID(itemID GlobalItemID) IndexWriterBuilder Build() (IndexWriter, error) }
IndexWriterBuilder is a helper to build IndexWriter.
type IntervalRule ¶
type IntervalRule struct { Unit IntervalUnit Num int }
IntervalRule defines a length of two points in time.
type IntervalUnit ¶
type IntervalUnit int
IntervalUnit denotes the unit of a time point.
const ( HOUR IntervalUnit = iota DAY )
Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.
func (IntervalUnit) String ¶
func (iu IntervalUnit) String() string
type InvertedIndexOpts ¶ added in v0.3.0
type InvertedIndexOpts struct {
BatchWaitSec int64
}
InvertedIndexOpts wraps options to create the block inverted index.
type Item ¶
type Item interface { Family(family []byte) ([]byte, error) Val() ([]byte, error) ID() common.ItemID SortedField() []byte Time() uint64 }
Item allows retrieving raw data from an item.
type OrderBy ¶ added in v0.4.0
type OrderBy struct { Index *databasev1.IndexRule Sort modelv1.Sort }
OrderBy specifies the order of the result.
type Path ¶
type Path struct {
// contains filtered or unexported fields
}
Path denotes a expression to match a Series. It supports the fuzzy matching more than EQ by setting an entry to AnyEntry.
type SectionID ¶ added in v0.3.0
type SectionID uint32
SectionID is the kind of a block/segment.
func GenerateInternalID ¶
func GenerateInternalID(unit IntervalUnit, suffix int) SectionID
GenerateInternalID returns a identity of a section(segment or block) based on IntervalRule.
type SeekerBuilder ¶
type SeekerBuilder interface { Filter(predicator index.Filter) SeekerBuilder OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) SeekerBuilder OrderByTime(order modelv1.Sort) SeekerBuilder Build() (Seeker, error) }
SeekerBuilder a helper to build a Seeker.
type Series ¶
type Series interface { ID() common.SeriesID Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error) Create(ctx context.Context, t time.Time) (SeriesSpan, error) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error) String() string }
Series denotes a series of data points group by a common.SeriesID common.SeriesID is encoded by a entity defined by Stream or Measure.
type SeriesDatabase ¶
type SeriesDatabase interface { io.Closer GetByID(id common.SeriesID) (Series, error) Get(key []byte, entityValues EntityValues) (Series, error) List(ctx context.Context, path Path) (SeriesList, error) Search(ctx context.Context, path Path, filter index.Filter, order *OrderBy) (SeriesList, error) SizeOnDisk() int64 // contains filtered or unexported methods }
SeriesDatabase allows retrieving series.
type SeriesList ¶
type SeriesList []Series
SeriesList is a collection of Series.
func (SeriesList) Len ¶
func (a SeriesList) Len() int
func (SeriesList) Less ¶
func (a SeriesList) Less(i, j int) bool
func (SeriesList) Merge ¶ added in v0.2.0
func (a SeriesList) Merge(other SeriesList) SeriesList
Merge other SeriesList with this one to create a new SeriesList.
func (SeriesList) Swap ¶
func (a SeriesList) Swap(i, j int)
type SeriesSpan ¶
type SeriesSpan interface { io.Closer WriterBuilder() WriterBuilder SeekerBuilder() SeekerBuilder }
SeriesSpan is a span in a time series. It contains data blocks in such time range.
type Shard ¶
type Shard interface { io.Closer ID() common.ShardID Series() SeriesDatabase Index() IndexDatabase State() ShardState // Only works with MockClock TriggerSchedule(task string) bool }
Shard allows accessing data of tsdb.
func NewScopedShard ¶
NewScopedShard returns a shard in a scope.
type ShardState ¶
type ShardState struct { Blocks []BlockState OpenBlocks []BlockID StrategyManagers []string }
ShardState is a sample of a shard's runtime state.
type Supplier ¶
type Supplier interface {
SupplyTSDB() Database
}
Supplier allows getting a tsdb's runtime.
type TSTable ¶ added in v0.4.0
type TSTable interface { // Put a value with a timestamp/version Put(key, val []byte, ts time.Time) error // Get a value by its key and timestamp/version Get(key []byte, ts time.Time) ([]byte, error) // CollectStats collects statistics of the underlying storage. CollectStats() *badger.Statistics // SizeOnDisk returns the size of the underlying storage. SizeOnDisk() int64 io.Closer }
TSTable is time series table.
type TSTableFactory ¶ added in v0.4.0
type TSTableFactory interface { // NewTSTable creates a new TSTable. NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position, l *logger.Logger, timeRange timestamp.TimeRange) (TSTable, error) }
TSTableFactory is the factory of TSTable.
type Writer ¶
type Writer interface { IndexWriter Write() (GlobalItemID, error) ItemID() GlobalItemID String() string }
Writer allow ingesting data into a tsdb.
type WriterBuilder ¶
type WriterBuilder interface { Family(name []byte, val []byte) WriterBuilder Time(ts time.Time) WriterBuilder Val(val []byte) WriterBuilder Build() (Writer, error) }
WriterBuilder is a helper to build a Writer.