Documentation ¶
Overview ¶
Package inmem implements a shared, in-memory index for each database.
The in-memory index is the original index implementation and provides fast access to index data. However, it also forces high memory usage for large datasets and can cause OOM errors.
Index is the shared index structure that provides most of the functionality. However, ShardIndex is a light per-shard wrapper that adapts this original shared index format to the new per-shard format.
Index ¶
- Constants
- func NewShardIndex(id uint64, database, path string, opt tsdb.EngineOptions) tsdb.Index
- type FilterExprs
- type Index
- func (i *Index) AssignShard(k string, shardID uint64)
- func (i *Index) Close() error
- func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement
- func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags models.Tags, opt *tsdb.EngineOptions, ...) error
- func (i *Index) DropMeasurement(name []byte) error
- func (i *Index) DropSeries(key []byte) error
- func (i *Index) ForEachMeasurementName(fn func(name []byte) error) error
- func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
- func (i *Index) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
- func (i *Index) HasTagKey(name, key []byte) (bool, error)
- func (i *Index) HasTagValue(name, key, value []byte) bool
- func (i *Index) Measurement(name []byte) (*Measurement, error)
- func (i *Index) MeasurementExists(name []byte) (bool, error)
- func (i *Index) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
- func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
- func (i *Index) MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error)
- func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
- func (i *Index) MeasurementsByName(names [][]byte) ([]*Measurement, error)
- func (i *Index) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
- func (i *Index) Open() (err error)
- func (i *Index) RemoveShard(shardID uint64)
- func (i *Index) Series(key []byte) (*Series, error)
- func (i *Index) SeriesKeys() []string
- func (i *Index) SeriesN() int64
- func (i *Index) SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
- func (i *Index) SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
- func (i *Index) SetFieldName(measurement []byte, name string)
- func (i *Index) SetFieldSet(*tsdb.MeasurementFieldSet)
- func (i *Index) SnapshotTo(path string) error
- func (i *Index) TagKeyCardinality(name, key []byte) int
- func (i *Index) TagSets(shardID uint64, name []byte, opt influxql.IteratorOptions) ([]*influxql.TagSet, error)
- func (i *Index) TagValueN(name, key []byte) int
- func (i *Index) TagsForSeries(key string) (models.Tags, error)
- func (i *Index) Type() string
- func (i *Index) UnassignShard(k string, shardID uint64) error
- func (i *Index) WithLogger(zap.Logger)
- type Measurement
- func (m *Measurement) AddSeries(s *Series) bool
- func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string
- func (m *Measurement) Cardinality(key string) int
- func (m *Measurement) CardinalityBytes(key []byte) int
- func (m *Measurement) DropSeries(series *Series)
- func (m *Measurement) FieldNames() []string
- func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error
- func (m *Measurement) HasField(name string) bool
- func (m *Measurement) HasSeries() bool
- func (m *Measurement) HasTagKey(k string) bool
- func (m *Measurement) HasTagKeyValue(k, v []byte) bool
- func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs
- func (m *Measurement) SeriesByID(id uint64) *Series
- func (m *Measurement) SeriesByIDMap() map[uint64]*Series
- func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series
- func (m *Measurement) SeriesByTagKeyValue(key string) map[string]SeriesIDs
- func (m *Measurement) SeriesIDs() SeriesIDs
- func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error)
- func (m *Measurement) SeriesKeys() [][]byte
- func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte
- func (m *Measurement) SetFieldName(name string)
- func (m *Measurement) TagKeys() []string
- func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error)
- func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*influxql.TagSet, error)
- func (m *Measurement) TagValues(key string) []string
- func (m *Measurement) WalkTagKeys(fn func(k string))
- func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error)
- type Measurements
- type Series
- func (s *Series) AssignShard(shardID uint64)
- func (s *Series) Assigned(shardID uint64) bool
- func (s *Series) CopyTags()
- func (s *Series) ForEachTag(fn func(models.Tag))
- func (s *Series) GetTagString(key string) string
- func (s *Series) Measurement() *Measurement
- func (s *Series) SetMeasurement(m *Measurement)
- func (s *Series) ShardN() int
- func (s *Series) Tags() models.Tags
- func (s *Series) UnassignShard(shardID uint64)
- type SeriesIDs
- func (a SeriesIDs) Equals(other SeriesIDs) bool
- func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Len() int
- func (a SeriesIDs) Less(i, j int) bool
- func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Swap(i, j int)
- func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs
- type ShardIndex
- func (i *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
- func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error
- func (i *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error
- func (i *ShardIndex) TagSets(name []byte, opt influxql.IteratorOptions) ([]*influxql.TagSet, error)
- type TagFilter
Constants ¶
const IndexName = "inmem"
IndexName is the name of this index.
Variables ¶
This section is empty.
Functions ¶
func NewShardIndex ¶
NewShardIndex returns a new index for a shard.
Types ¶
type FilterExprs ¶
FilterExprs represents a map of series IDs to filter expressions.
func (FilterExprs) DeleteBoolLiteralTrues ¶
func (fe FilterExprs) DeleteBoolLiteralTrues()
DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true.
type Index ¶
type Index struct {
// contains filtered or unexported fields
}
Index is the in memory index of a collection of measurements, time series, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks.
func (*Index) AssignShard ¶
AssignShard updates the index to indicate that series k exists in the given shardID.
func (*Index) CreateMeasurementIndexIfNotExists ¶
func (i *Index) CreateMeasurementIndexIfNotExists(name []byte) *Measurement
CreateMeasurementIndexIfNotExists creates or retrieves an in memory index object for the measurement
func (*Index) CreateSeriesIfNotExists ¶
func (i *Index) CreateSeriesIfNotExists(shardID uint64, key, name []byte, tags models.Tags, opt *tsdb.EngineOptions, ignoreLimits bool) error
CreateSeriesIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object
func (*Index) DropMeasurement ¶
DropMeasurement removes the measurement and all of its underlying series from the database index
func (*Index) DropSeries ¶
DropSeries removes the series key and its tags from the index.
func (*Index) ForEachMeasurementName ¶
ForEachMeasurementName iterates over each measurement name.
func (*Index) ForEachMeasurementSeriesByExpr ¶
func (i *Index) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error
ForEachMeasurementSeriesByExpr iterates over all series in a measurement filtered by an expression.
func (*Index) ForEachMeasurementTagKey ¶
ForEachMeasurementTagKey iterates over all tag keys for a measurement.
func (*Index) HasTagValue ¶
HasTagValue returns true if tag value exists.
func (*Index) Measurement ¶
func (i *Index) Measurement(name []byte) (*Measurement, error)
Measurement returns the measurement object from the index by the name
func (*Index) MeasurementExists ¶
MeasurementExists returns true if the measurement exists.
func (*Index) MeasurementNamesByExpr ¶
MeasurementNamesByExpr takes an expression containing only tags and returns a list of matching measurement names.
func (*Index) MeasurementNamesByRegex ¶
MeasurementNamesByRegex returns the measurements that match the regex.
func (*Index) MeasurementSeriesKeysByExpr ¶
func (*Index) MeasurementTagKeysByExpr ¶
func (i *Index) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeysByExpr returns an ordered set of tag keys filtered by an expression.
func (*Index) MeasurementsByName ¶
func (i *Index) MeasurementsByName(names [][]byte) ([]*Measurement, error)
MeasurementsByName returns a list of measurements.
func (*Index) MeasurementsSketches ¶
MeasurementsSketches returns the sketches for the measurements.
func (*Index) RemoveShard ¶
RemoveShard removes all references to shardID from any series or measurements in the index. If the shard was the only owner of data for the series, the series is removed from the index.
func (*Index) SeriesKeys ¶
func (*Index) SeriesN ¶
SeriesN returns the number of unique non-tombstoned series in the index. Since indexes are not shared across shards, the count returned by SeriesN cannot be combined with other shards' counts.
func (*Index) SeriesPointIterator ¶
SeriesPointIterator returns an influxql iterator over all series.
func (*Index) SeriesSketches ¶
SeriesSketches returns the sketches for the series.
func (*Index) SetFieldName ¶
SetFieldName adds a field name to a measurement.
func (*Index) SetFieldSet ¶
func (i *Index) SetFieldSet(*tsdb.MeasurementFieldSet)
SetFieldSet sets a shared field set from the engine.
func (*Index) SnapshotTo ¶
SnapshotTo is a no-op since this is an in-memory index.
func (*Index) TagKeyCardinality ¶
TagKeyCardinality returns the number of values for a measurement/tag key.
func (*Index) TagSets ¶
func (i *Index) TagSets(shardID uint64, name []byte, opt influxql.IteratorOptions) ([]*influxql.TagSet, error)
TagSets returns a list of tag sets.
func (*Index) TagsForSeries ¶
TagsForSeries returns the tag map for the passed in series
func (*Index) UnassignShard ¶
UnassignShard updates the index to indicate that series k does not exist in the given shardID.
func (*Index) WithLogger ¶
type Measurement ¶
type Measurement struct { Name string `json:"name,omitempty"` // contains filtered or unexported fields }
Measurement represents a collection of time series in a database. It also contains in memory structures for indexing tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks.
func NewMeasurement ¶
func NewMeasurement(database, name string) *Measurement
NewMeasurement allocates and initializes a new Measurement.
func (*Measurement) AddSeries ¶
func (m *Measurement) AddSeries(s *Series) bool
AddSeries adds a series to the measurement's index. It returns true if the series was added successfully or false if the series was already present.
func (*Measurement) AppendSeriesKeysByID ¶
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string
AppendSeriesKeysByID appends keys for a list of series ids to a buffer.
func (*Measurement) Cardinality ¶
func (m *Measurement) Cardinality(key string) int
Cardinality returns the number of values associated with the given tag key.
func (*Measurement) CardinalityBytes ¶
func (m *Measurement) CardinalityBytes(key []byte) int
CardinalityBytes returns the number of values associated with the given tag key.
func (*Measurement) DropSeries ¶
func (m *Measurement) DropSeries(series *Series)
DropSeries removes a series from the measurement's index.
func (*Measurement) FieldNames ¶
func (m *Measurement) FieldNames() []string
FieldNames returns a list of the measurement's field names, in an arbitrary order.
func (*Measurement) ForEachSeriesByExpr ¶
func (m *Measurement) ForEachSeriesByExpr(condition influxql.Expr, fn func(tags models.Tags) error) error
ForEachSeriesByExpr iterates over all series filtered by condition.
func (*Measurement) HasField ¶
func (m *Measurement) HasField(name string) bool
func (*Measurement) HasSeries ¶
func (m *Measurement) HasSeries() bool
HasSeries returns true if there is at least 1 series under this measurement.
func (*Measurement) HasTagKey ¶
func (m *Measurement) HasTagKey(k string) bool
HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (*Measurement) HasTagKeyValue ¶
func (m *Measurement) HasTagKeyValue(k, v []byte) bool
func (*Measurement) IDsForExpr ¶
func (m *Measurement) IDsForExpr(n *influxql.BinaryExpr) SeriesIDs
IDsForExpr returns the series IDs that are candidates to match the given expression.
func (*Measurement) SeriesByID ¶
func (m *Measurement) SeriesByID(id uint64) *Series
SeriesByID returns a series by identifier.
func (*Measurement) SeriesByIDMap ¶
func (m *Measurement) SeriesByIDMap() map[uint64]*Series
SeriesByIDMap returns the internal seriesByID map.
func (*Measurement) SeriesByIDSlice ¶
func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series
SeriesByIDSlice returns a list of series by identifiers.
func (*Measurement) SeriesByTagKeyValue ¶
func (m *Measurement) SeriesByTagKeyValue(key string) map[string]SeriesIDs
func (*Measurement) SeriesIDs ¶
func (m *Measurement) SeriesIDs() SeriesIDs
func (*Measurement) SeriesIDsAllOrByExpr ¶
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error)
SeriesIDsAllOrByExpr walks an expression for matching series IDs or, if no expression is given, returns all series IDs for the measurement.
func (*Measurement) SeriesKeys ¶
func (m *Measurement) SeriesKeys() [][]byte
SeriesKeys returns the keys of every series in this measurement
func (*Measurement) SeriesKeysByID ¶
func (m *Measurement) SeriesKeysByID(ids SeriesIDs) [][]byte
SeriesKeysByID returns a list of keys for a set of ids.
func (*Measurement) SetFieldName ¶
func (m *Measurement) SetFieldName(name string)
SetFieldName adds the field name to the measurement.
func (*Measurement) TagKeys ¶
func (m *Measurement) TagKeys() []string
TagKeys returns a list of the measurement's tag names, in sorted order.
func (*Measurement) TagKeysByExpr ¶
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (map[string]struct{}, error)
TagKeysByExpr extracts the tag keys wanted by the expression.
func (*Measurement) TagSets ¶
func (m *Measurement) TagSets(shardID uint64, opt influxql.IteratorOptions) ([]*influxql.TagSet, error)
TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine what composite series will be created by a group by, e.g. "group by region" should return {"region": "uswest"}, {"region": "useast"}, while "group by region, service" returns {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc. This will also populate the TagSet objects with the series IDs that match each tagset and any influx filter expression that goes with the series. TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (*Measurement) TagValues ¶
func (m *Measurement) TagValues(key string) []string
TagValues returns all the values for the given tag key, in an arbitrary order.
func (*Measurement) WalkTagKeys ¶
func (m *Measurement) WalkTagKeys(fn func(k string))
WalkTagKeys calls fn for each tag key associated with m. The order of the keys is undefined.
func (*Measurement) WalkWhereForSeriesIds ¶
func (m *Measurement) WalkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, FilterExprs, error)
WalkWhereForSeriesIds recursively walks the WHERE clause and returns an ordered set of series IDs and a map from those series IDs to filter expressions that should be used to limit points returned in the final query result.
type Measurements ¶
type Measurements []*Measurement
Measurements represents a list of *Measurement.
func (Measurements) Intersect ¶
func (a Measurements) Intersect(other Measurements) Measurements
func (Measurements) Less ¶
func (a Measurements) Less(i, j int) bool
Less implements sort.Interface.
func (Measurements) Union ¶
func (a Measurements) Union(other Measurements) Measurements
type Series ¶
Series belong to a Measurement and represent unique time series in a database.
func (*Series) AssignShard ¶
func (*Series) CopyTags ¶
func (s *Series) CopyTags()
CopyTags clones the tags on the series in-place.
func (*Series) ForEachTag ¶
ForEachTag executes fn for every tag. Iteration occurs under lock.
func (*Series) GetTagString ¶
GetTagString returns a tag value under lock.
func (*Series) Measurement ¶
func (s *Series) Measurement() *Measurement
Measurement returns the measurement on the series.
func (*Series) SetMeasurement ¶
func (s *Series) SetMeasurement(m *Measurement)
SetMeasurement sets the measurement on the series.
func (*Series) UnassignShard ¶
type SeriesIDs ¶
type SeriesIDs []uint64
SeriesIDs is a convenience type for sorting, checking equality, and doing union and intersection of collections of series ids.
func (SeriesIDs) Intersect ¶
Intersect returns a new collection of series ids in sorted order that is the intersection of the two. The two collections must already be sorted.
type ShardIndex ¶
type ShardIndex struct { *Index // contains filtered or unexported fields }
ShardIndex represents a shim between the TSDB index interface and the shared in-memory index. This is required because per-shard in-memory indexes will grow the heap size too large.
func (*ShardIndex) CreateSeriesIfNotExists ¶
func (i *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
func (*ShardIndex) CreateSeriesListIfNotExists ¶
func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error
CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
func (*ShardIndex) InitializeSeries ¶
func (i *ShardIndex) InitializeSeries(key, name []byte, tags models.Tags) error
InitializeSeries is called during startup. This works the same as CreateSeriesIfNotExists except it ignores limit errors.
func (*ShardIndex) TagSets ¶
func (i *ShardIndex) TagSets(name []byte, opt influxql.IteratorOptions) ([]*influxql.TagSet, error)
TagSets returns a list of tag sets based on series filtering.