tsdb

package
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2024 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ShardSegmentIndicator

func ShardSegmentIndicator(database string, shardID models.ShardID, interval timeutil.Interval, name string) string

ShardSegmentIndicator returns the segment name indicator information.

func ShardSegmentPath

func ShardSegmentPath(database string, shardID models.ShardID, interval timeutil.Interval) string

ShardSegmentPath returns segment path in shard dir.

Types

type DataFamily

type DataFamily interface {
	// Indicator returns the data family indicator's string.
	Indicator() string
	// Shard returns the shard which this family belongs to.
	Shard() Shard
	// Interval returns the data family's interval.
	Interval() timeutil.Interval
	// FamilyTime returns the current family's time.
	FamilyTime() int64
	// TimeRange returns the data family's base time range.
	TimeRange() timeutil.TimeRange
	// Family returns the underlying raw kv family.
	Family() kv.Family
	// WriteRows writes metric rows with the same family in batch.
	WriteRows(rows []metric.StorageRow) error
	// ValidateSequence validates whether the replica sequence is valid.
	ValidateSequence(leader int32, seq int64) bool
	// CommitSequence commits the written sequence after writing data.
	CommitSequence(leader int32, seq int64)
	// AckSequence acknowledges the sequence after the memory database flushes successfully.
	AckSequence(leader int32, fn func(seq int64))

	// NeedFlush checks whether the memory database needs to be flushed.
	NeedFlush() bool
	// IsFlushing returns whether a flush job is running in the background.
	IsFlushing() bool
	// Flush flushes the memory database.
	Flush() error
	// MemDBSize returns the memory database's heap size.
	MemDBSize() int64

	// GetState returns the current state, including the memory database state.
	GetState() models.DataFamilyState
	// Evict evicts the family if no data has been written for a long term.
	Evict()
	// Compact compacts all data if no data has been written for a long term.
	Compact()
	// Retain increments the write reference count.
	Retain()
	// Release decrements the write reference count;
	// when the count reaches 0, no more data will be written to this family.
	Release()

	// DataFilter filters data under the data family based on the query condition.
	flow.DataFilter
	io.Closer
}

DataFamily represents a storage unit for time series data, supporting multiple versions.

type DataFlushChecker

type DataFlushChecker interface {
	// Start starts the checker goroutine in the background.
	Start()
	// Stop stops the background check goroutine.
	Stop()
	// contains filtered or unexported methods
}

DataFlushChecker represents the memory database flush checker. There are 4 flush policies of the Engine as below:

  1. FullFlush the highest priority, triggered by external API from the users. this action will block any other flush checkers.
  2. GlobalMemoryUsageChecker This checker will check the global memory usage of the host periodically, when the metric is above MemoryHighWaterMark, a `watermarkFlusher` will be spawned whose responsibility is to flush the biggest family until memory is lower than MemoryLowWaterMark.
  3. FamilyMemoryUsageChecker This checker will check each family's memory usage periodically. If a family's usage is above FamilyMemoryUsedThreshold, it will be flushed to disk.
  4. DatabaseMetaFlusher It is a simple checker which flushes the metadata of the database to disk periodically.

a). Each family or database is restricted to being flushed by only one goroutine at a time via a CAS operation; b). The flush workers run concurrently; c). All units will be flushed when closing;

type Database

type Database interface {
	// Name returns the time series database's name.
	Name() string
	// NumOfShards returns the number of shards in the time series database.
	NumOfShards() int
	// GetConfig returns the configuration of the database.
	GetConfig() *models.DatabaseConfig
	// GetOption returns the database options.
	GetOption() *option.DatabaseOption
	// CreateShards creates shards for data partitioning.
	CreateShards(shardIDs []models.ShardID) error
	// GetShard returns the shard by given shard id.
	GetShard(shardID models.ShardID) (Shard, bool)
	// ExecutorPool returns the pool for querying tasks.
	ExecutorPool() *ExecutorPool
	// Closer closes the database's underlying resources.
	io.Closer
	// Metadata returns the metadata, including metric/tag info.
	Metadata() metadb.Metadata
	// FlushMeta flushes metadata to disk.
	FlushMeta() error
	// WaitFlushMetaCompleted waits until the flush-metadata job is completed.
	WaitFlushMetaCompleted()
	// Flush flushes memory data of all families to disk.
	Flush() error
	// Drop drops the current database, including all of its data.
	Drop() error
	// TTL expires the data of each shard based on time to live.
	TTL()
	// EvictSegment evicts segments that have had no read operations for a long term.
	EvictSegment()
	// SetLimits sets the database's limits.
	SetLimits(limits *models.Limits)
	// GetLimits returns the database's limits.
	GetLimits() *models.Limits
}

Database represents an abstract time series database

type Engine

type Engine interface {

	// CreateShards creates shards for data partitioning by given options:
	// 1) dump the engine option onto local disk
	// 2) create the shard storage structs
	CreateShards(
		databaseName string,
		databaseOption *option.DatabaseOption,
		shardIDs ...models.ShardID,
	) error
	// SetDatabaseLimits sets the database's limits.
	SetDatabaseLimits(database string, limits *models.Limits)
	// GetShard returns the shard by given database name and shard id.
	GetShard(databaseName string, shardID models.ShardID) (Shard, bool)
	// GetDatabase returns the time series database by given name.
	GetDatabase(databaseName string) (Database, bool)
	// GetAllDatabases returns all databases.
	GetAllDatabases() map[string]Database
	// FlushDatabase produces a signal to workers for flushing the memory database by name.
	FlushDatabase(ctx context.Context, databaseName string) bool
	// DropDatabases drops databases, keeping the given active databases.
	DropDatabases(activeDatabases map[string]struct{})
	// TTL expires the data of each database based on time to live.
	TTL()
	// EvictSegment evicts segments that have had no read operations for a long term.
	EvictSegment()
	// Close closes the cached time series databases.
	Close()
	// contains filtered or unexported methods
}

Engine represents a time series engine

func NewEngine

func NewEngine() (Engine, error)

NewEngine creates an engine for manipulating the databases

type ExecutorPool

type ExecutorPool struct {
	// Filtering is the worker pool for the filtering stage of query execution.
	Filtering concurrent.Pool
	// Grouping is the worker pool for the grouping stage of query execution.
	Grouping  concurrent.Pool
	// Scanner is the worker pool for the scanning stage of query execution.
	Scanner   concurrent.Pool
}

ExecutorPool represents the executor pool used by query flow for each storage engine

type FamilyManager

type FamilyManager interface {
	// AddFamily adds the family.
	AddFamily(family DataFamily)
	// RemoveFamily removes the family.
	RemoveFamily(family DataFamily)
	// WalkEntry walks each family entry via the given fn.
	WalkEntry(fn func(family DataFamily))
	// GetFamiliesByShard returns the families for the specified shard.
	GetFamiliesByShard(shard Shard) []DataFamily
}

FamilyManager represents the data family manager.

func GetFamilyManager

func GetFamilyManager() FamilyManager

GetFamilyManager returns the data family manager singleton instance. FIXME: read-only families need to be cleaned up when there have been no reads for a long term.

type IntervalSegment

type IntervalSegment interface {
	// GetOrCreateSegment creates a new segment if it does not exist, otherwise returns the existing one.
	GetOrCreateSegment(segmentName string) (Segment, error)
	// GetDataFamilies returns the data family list by time range; returns nil if nothing matches.
	GetDataFamilies(timeRange timeutil.TimeRange) []DataFamily
	// Close closes the interval segment and releases its resources.
	Close()
	// TTL expires segments based on time to live.
	TTL() error
	// EvictSegment evicts segments that have had no read operations for a long term.
	EvictSegment()
}

IntervalSegment represents an interval segment; there are several such segments in a shard.

type Segment

type Segment interface {
	// BaseTime returns the segment's base time.
	BaseTime() int64
	// GetOrCreateDataFamily returns the data family based on the given timestamp.
	GetOrCreateDataFamily(timestamp int64) (DataFamily, error)
	// GetDataFamilies returns the data family list by time range; returns nil if nothing matches.
	GetDataFamilies(timeRange timeutil.TimeRange) []DataFamily
	// NeedEvict checks whether the segment can be evicted (no read operations for a long term).
	NeedEvict() bool
	// EvictFamily evicts the data family for the given family time.
	EvictFamily(familyTime int64)
	// Close closes the segment, including its kv store.
	Close()
}

Segment represents a time-based segment; there are several such segments in an interval segment. A segment uses a k/v store for storing time series data.

type Shard

type Shard interface {
	// Database returns the database.
	Database() Database
	// ShardID returns the shard id.
	ShardID() models.ShardID
	// CurrentInterval returns the current interval for metric writes.
	CurrentInterval() timeutil.Interval
	// Indicator returns the unique shard info.
	Indicator() string
	// GetOrCrateDataFamily returns the data family; if it does not exist, a new data family is created.
	// NOTE(review): method name contains a typo ("Crate" instead of "Create");
	// kept as-is for API compatibility with existing callers.
	GetOrCrateDataFamily(familyTime int64) (DataFamily, error)
	// GetDataFamilies returns the data family list by interval type and time range; returns nil if nothing matches.
	GetDataFamilies(intervalType timeutil.IntervalType, timeRange timeutil.TimeRange) []DataFamily
	// IndexDatabase returns the index database.
	IndexDatabase() indexdb.IndexDatabase
	// BufferManager returns the write temp memory manager.
	BufferManager() memdb.BufferManager
	// LookupRowMetricMeta looks up the metadata of metric data for each row with the same family in batch.
	LookupRowMetricMeta(rows []metric.StorageRow) error
	// FlushIndex flushes index data to disk.
	FlushIndex() error
	// WaitFlushIndexCompleted waits until the flush-index job is completed.
	WaitFlushIndexCompleted()

	// TTL expires the data of each segment based on time to live.
	TTL()
	// EvictSegment evicts segments that have had no read operations for a long term.
	EvictSegment()

	// Closer releases the shard's resources, such as flushing data, stopping spawned goroutines, etc.
	io.Closer
	// contains filtered or unexported methods
}

Shard is a horizontal partition of metrics for LinDB.

Directories

Path Synopsis
tblstore

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL