Documentation ¶
Overview ¶
Package tsdb implements a durable time series database.
Index ¶
- Constants
- Variables
- func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string)
- func DedupeEntries(a [][]byte) [][]byte
- func IsNumeric(c *influxql.Call) bool
- func IsRetryable(err error) bool
- func MarshalTags(tags map[string]string) []byte
- func MeasurementFromSeriesKey(key string) string
- func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func RegisterEngine(name string, fn NewEngineFunc)
- func RegisteredEngines() []string
- type ByteSlices
- type Config
- type Cursor
- type DatabaseIndex
- func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement
- func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series
- func (d *DatabaseIndex) DropMeasurement(name string)
- func (d *DatabaseIndex) DropSeries(keys []string)
- func (d *DatabaseIndex) Measurement(name string) *Measurement
- func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
- func (d *DatabaseIndex) Measurements() Measurements
- func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement
- func (d *DatabaseIndex) Series(key string) *Series
- func (d *DatabaseIndex) SeriesN() int
- func (d *DatabaseIndex) TagsForSeries(key string) map[string]string
- type Engine
- type EngineFormat
- type EngineOptions
- type Executor
- type Field
- type FieldCodec
- func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
- func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
- func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
- func (f *FieldCodec) FieldByName(name string) *Field
- func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
- func (f *FieldCodec) Fields() []*Field
- type FieldCreate
- type FilterExprs
- type FloatCursorIterator
- type Measurement
- func (m *Measurement) AddSeries(s *Series) bool
- func (m *Measurement) DropSeries(seriesID uint64)
- func (m *Measurement) FieldNames() []string
- func (m *Measurement) HasField(name string) bool
- func (m *Measurement) HasSeries() bool
- func (m *Measurement) HasTagKey(k string) bool
- func (m *Measurement) SelectFields(stmt *influxql.SelectStatement) []string
- func (m *Measurement) SelectTags(stmt *influxql.SelectStatement) []string
- func (m *Measurement) SeriesByID(id uint64) *Series
- func (m *Measurement) SeriesKeys() []string
- func (m *Measurement) SetFieldName(name string)
- func (m *Measurement) TagKeys() []string
- func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
- func (m *Measurement) TagValues(key string) []string
- func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error
- func (m *Measurement) WhereFields(stmt *influxql.SelectStatement) []string
- type MeasurementFields
- type MeasurementIterator
- type Measurements
- func (a Measurements) Len() int
- func (a Measurements) Less(i, j int) bool
- func (a Measurements) SelectFields(stmt *influxql.SelectStatement) []string
- func (a Measurements) SelectTags(stmt *influxql.SelectStatement) []string
- func (a Measurements) Swap(i, j int)
- func (a Measurements) WhereFields(stmt *influxql.SelectStatement) []string
- type NewEngineFunc
- type PointBatcher
- type PointBatcherStats
- type Series
- type SeriesCreate
- type SeriesIDs
- func (a SeriesIDs) Equals(other SeriesIDs) bool
- func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Len() int
- func (a SeriesIDs) Less(i, j int) bool
- func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Swap(i, j int)
- func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs
- type Shard
- func (s *Shard) Close() error
- func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
- func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error
- func (s *Shard) DeleteSeries(seriesKeys []string) error
- func (s *Shard) DiskSize() (int64, error)
- func (s *Shard) FieldCodec(measurementName string) *FieldCodec
- func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
- func (s *Shard) Open() error
- func (s *Shard) Path() string
- func (s *Shard) PerformMaintenance()
- func (s *Shard) SeriesCount() (int, error)
- func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
- func (s *Shard) WritePoints(points []models.Point) error
- func (s *Shard) WriteTo(w io.Writer) (int64, error)
- type Shards
- type Store
- func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error
- func (s *Store) Close() error
- func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error
- func (s *Store) DatabaseIndex(name string) *DatabaseIndex
- func (s *Store) DatabaseIndexN() int
- func (s *Store) Databases() []string
- func (s *Store) DeleteDatabase(name string) error
- func (s *Store) DeleteMeasurement(database, name string) error
- func (s *Store) DeleteRetentionPolicy(database, name string) error
- func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
- func (s *Store) DeleteShard(shardID uint64) error
- func (s *Store) DiskSize() (int64, error)
- func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
- func (s *Store) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
- func (s *Store) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
- func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
- func (s *Store) Measurement(database, name string) *Measurement
- func (s *Store) Open() error
- func (s *Store) Path() string
- func (s *Store) Shard(id uint64) *Shard
- func (s *Store) ShardIDs() []uint64
- func (s *Store) ShardIteratorCreator(id uint64) influxql.IteratorCreator
- func (s *Store) ShardN() int
- func (s *Store) ShardRelativePath(id uint64) (string, error)
- func (s *Store) Shards(ids []uint64) []*Shard
- func (s *Store) WriteToShard(shardID uint64, points []models.Point) error
- type TagFilter
Constants ¶
const (
	// DefaultEngine is the default engine for new shards.
	DefaultEngine = "tsm1"

	// DefaultMaxWALSize is the default size of the WAL before it is flushed.
	DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB

	// DefaultWALFlushInterval is the frequency the WAL will get flushed if
	// it doesn't reach its size threshold.
	DefaultWALFlushInterval = 10 * time.Minute

	// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
	DefaultWALPartitionFlushDelay = 2 * time.Second

	// DefaultReadySeriesSize specifies when a series is eligible to be flushed.
	DefaultReadySeriesSize = 30 * 1024

	// DefaultCompactionThreshold is the ratio of keys over the flush size at which
	// a partition is flushed and compacted.
	DefaultCompactionThreshold = 0.5

	// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush.
	DefaultMaxSeriesSize = 1024 * 1024

	// DefaultFlushColdInterval specifies how long after a partition has been cold
	// for writes that a full flush and compaction are forced.
	DefaultFlushColdInterval = 5 * time.Second

	// DefaultPartitionSizeThreshold specifies that once a partition reaches this size
	// in memory, writes are slowed down until it gets a chance to compact.
	// This applies backpressure to clients that write too fast. It is needed because
	// the WAL can accept writes much faster than the index; without backpressure,
	// memory would eventually fill up and the process would die. This number
	// multiplied by the partition count is roughly the maximum possible memory
	// size for the in-memory WAL cache.
	DefaultPartitionSizeThreshold = 50 * 1024 * 1024 // 50MB

	// DefaultCacheMaxMemorySize is the maximum size a shard's cache can
	// reach before it starts rejecting writes.
	DefaultCacheMaxMemorySize = 500 * 1024 * 1024 // 500MB

	// DefaultCacheSnapshotMemorySize is the size at which the engine will
	// snapshot the cache and write it to a TSM file, freeing up memory.
	DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB

	// DefaultCacheSnapshotWriteColdDuration is the length of time at which
	// the engine will snapshot the cache and write it to a new TSM file if
	// the shard hasn't received writes or deletes.
	DefaultCacheSnapshotWriteColdDuration = time.Duration(time.Hour)

	// DefaultCompactFullWriteColdDuration is the duration at which the engine
	// will compact all TSM files in a shard if it hasn't received a write or delete.
	DefaultCompactFullWriteColdDuration = time.Duration(24 * time.Hour)

	// DefaultMaxPointsPerBlock is the maximum number of points in an encoded
	// block in a TSM file.
	DefaultMaxPointsPerBlock = 1000
)
const EOF = int64(-1)
EOF represents a "not found" key returned by a Cursor.
Variables ¶
var (
	// ErrFieldOverflow is returned when too many fields are created on a measurement.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict is returned when a new field already exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound is returned when a field cannot be found.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
	// there is no mapping for.
	ErrFieldUnmappedID = errors.New("field ID not mapped")
)
var (
	// ErrShardNotFound is returned when trying to get a non-existent shard.
	ErrShardNotFound = fmt.Errorf("shard not found")

	// ErrStoreClosed is returned when trying to use a closed Store.
	ErrStoreClosed = fmt.Errorf("store is closed")
)
var (
	// ErrFormatNotFound is returned when no format can be determined from a path.
	ErrFormatNotFound = errors.New("format not found")
)
Functions ¶
func DecodeStorePath ¶
func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string)
DecodeStorePath extracts the database and retention policy names from a given shard or WAL path.
func DedupeEntries ¶
func DedupeEntries(a [][]byte) [][]byte
DedupeEntries returns slices with unique keys (the first 8 bytes).
func IsRetryable ¶
func IsRetryable(err error) bool
IsRetryable returns true if this error is temporary and could be retried.
func MarshalTags ¶
func MarshalTags(tags map[string]string) []byte
MarshalTags converts a tag set to bytes for use as a lookup key.
func MeasurementFromSeriesKey ¶
func MeasurementFromSeriesKey(key string) string
MeasurementFromSeriesKey returns the name of the measurement from a key that contains a measurement name.
func NewFieldKeysIterator ¶
func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
func NewTagKeysIterator ¶
func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
NewTagKeysIterator returns a new instance of TagKeysIterator.
func RegisterEngine ¶
func RegisterEngine(name string, fn NewEngineFunc)
RegisterEngine registers a storage engine initializer by name.
func RegisteredEngines ¶
func RegisteredEngines() []string
RegisteredEngines returns the slice of currently registered engines.
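RegisterEngine and RegisteredEngines together form the engine registry. Below is a minimal usage sketch; the import path and the "nullengine" constructor are illustrative assumptions, not part of this package, and engines normally register themselves from an init function in their own sub-package.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb" // assumed import path
)

func main() {
	// List every engine that has registered itself. An engine sub-package
	// (e.g. tsm1) must be imported, often with a blank import, for it to appear.
	for _, name := range tsdb.RegisteredEngines() {
		fmt.Println("registered engine:", name)
	}

	// A custom engine registers a constructor matching NewEngineFunc.
	// The constructor below is a placeholder, not a working engine.
	tsdb.RegisterEngine("nullengine", func(path, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
		return nil
	})
}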
Types ¶
type ByteSlices ¶
type ByteSlices [][]byte
ByteSlices wraps a list of byte-slices for sorting.
func (ByteSlices) Len ¶
func (a ByteSlices) Len() int
func (ByteSlices) Less ¶
func (a ByteSlices) Less(i, j int) bool
func (ByteSlices) Swap ¶
func (a ByteSlices) Swap(i, j int)
type Config ¶
type Config struct {
	Enabled bool   `toml:"enabled"`
	Dir     string `toml:"dir"`
	Engine  string `toml:"engine"`

	// WAL config options for b1 (introduced in 0.9.2)
	MaxWALSize             int           `toml:"max-wal-size"`
	WALFlushInterval       toml.Duration `toml:"wal-flush-interval"`
	WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`

	// WAL configuration options for bz1 (introduced in 0.9.3)
	WALDir                    string        `toml:"wal-dir"`
	WALLoggingEnabled         bool          `toml:"wal-logging-enabled"`
	WALReadySeriesSize        int           `toml:"wal-ready-series-size"`
	WALCompactionThreshold    float64       `toml:"wal-compaction-threshold"`
	WALMaxSeriesSize          int           `toml:"wal-max-series-size"`
	WALFlushColdInterval      toml.Duration `toml:"wal-flush-cold-interval"`
	WALPartitionSizeThreshold uint64        `toml:"wal-partition-size-threshold"`

	// Query logging
	QueryLogEnabled bool `toml:"query-log-enabled"`

	// Compaction options for tsm1 (descriptions above with defaults)
	CacheMaxMemorySize             uint64        `toml:"cache-max-memory-size"`
	CacheSnapshotMemorySize        uint64        `toml:"cache-snapshot-memory-size"`
	CacheSnapshotWriteColdDuration toml.Duration `toml:"cache-snapshot-write-cold-duration"`
	CompactFullWriteColdDuration   toml.Duration `toml:"compact-full-write-cold-duration"`
	MaxPointsPerBlock              int           `toml:"max-points-per-block"`

	DataLoggingEnabled bool `toml:"data-logging-enabled"`
}
Config holds the configuration for the tsdb package.
type Cursor ¶
type Cursor interface {
	SeekTo(seek int64) (key int64, value interface{})
	Next() (key int64, value interface{})
	Ascending() bool
}
Cursor represents an iterator over a series.
func MultiCursor ¶
MultiCursor returns a single cursor that combines the results of all cursors in order.
If the same key is returned from multiple cursors then the first cursor specified will take precedence. A key will only be returned once from the returned cursor.
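The sketch below shows a toy in-memory Cursor implementation and how it might be combined with MultiCursor. The memCursor type is purely illustrative, the import path is assumed, and MultiCursor's variadic signature is assumed since it is not shown above.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb" // assumed import path
)

// memCursor is a toy in-memory Cursor over ascending (key, value) pairs.
type memCursor struct {
	keys []int64
	vals []interface{}
	i    int
}

// SeekTo positions the cursor at the first key >= seek and returns it, or EOF.
func (c *memCursor) SeekTo(seek int64) (int64, interface{}) {
	for c.i = 0; c.i < len(c.keys); c.i++ {
		if c.keys[c.i] >= seek {
			k, v := c.keys[c.i], c.vals[c.i]
			c.i++
			return k, v
		}
	}
	return tsdb.EOF, nil
}

// Next returns the next key/value pair, or EOF when the cursor is exhausted.
func (c *memCursor) Next() (int64, interface{}) {
	if c.i >= len(c.keys) {
		return tsdb.EOF, nil
	}
	k, v := c.keys[c.i], c.vals[c.i]
	c.i++
	return k, v
}

// Ascending reports that this cursor iterates in ascending key order.
func (c *memCursor) Ascending() bool { return true }

func main() {
	a := &memCursor{keys: []int64{1, 3}, vals: []interface{}{"a1", "a3"}}
	b := &memCursor{keys: []int64{1, 2}, vals: []interface{}{"b1", "b2"}}

	// Key 1 appears in both cursors; the first cursor passed in wins.
	cur := tsdb.MultiCursor(a, b) // signature assumed: MultiCursor(cursors ...Cursor) Cursor
	for k, v := cur.SeekTo(0); k != tsdb.EOF; k, v = cur.Next() {
		fmt.Println(k, v)
	}
}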
type DatabaseIndex ¶
type DatabaseIndex struct {
// contains filtered or unexported fields
}
DatabaseIndex is the in-memory index of a collection of measurements, time series, and their tags. Exported functions are goroutine-safe; unexported functions assume the caller holds the appropriate locks.
func NewDatabaseIndex ¶
func NewDatabaseIndex(name string) *DatabaseIndex
NewDatabaseIndex returns a new initialized DatabaseIndex.
func (*DatabaseIndex) CreateMeasurementIndexIfNotExists ¶
func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement
CreateMeasurementIndexIfNotExists creates or retrieves the in-memory index object for the measurement.
func (*DatabaseIndex) CreateSeriesIndexIfNotExists ¶
func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series
CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID, or returns the existing series object.
func (*DatabaseIndex) DropMeasurement ¶
func (d *DatabaseIndex) DropMeasurement(name string)
DropMeasurement removes the measurement and all of its underlying series from the database index.
func (*DatabaseIndex) DropSeries ¶
func (d *DatabaseIndex) DropSeries(keys []string)
DropSeries removes the series keys and their tags from the index.
func (*DatabaseIndex) Measurement ¶
func (d *DatabaseIndex) Measurement(name string) *Measurement
Measurement returns the measurement object from the index by name.
func (*DatabaseIndex) MeasurementSeriesCounts ¶
func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database. Useful for reporting and monitoring.
func (*DatabaseIndex) Measurements ¶
func (d *DatabaseIndex) Measurements() Measurements
Measurements returns a list of all measurements.
func (*DatabaseIndex) MeasurementsByName ¶
func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement
MeasurementsByName returns a list of measurements.
func (*DatabaseIndex) Series ¶
func (d *DatabaseIndex) Series(key string) *Series
Series returns a series by key.
func (*DatabaseIndex) SeriesN ¶
func (d *DatabaseIndex) SeriesN() int
SeriesN returns the number of series.
func (*DatabaseIndex) TagsForSeries ¶
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string
TagsForSeries returns the tag map for the given series key.
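As a rough usage sketch (the import path is assumed), an index can be created and populated with measurements directly:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb" // assumed import path
)

func main() {
	// Build an in-memory index for a database and register a measurement.
	idx := tsdb.NewDatabaseIndex("mydb")
	m := idx.CreateMeasurementIndexIfNotExists("cpu")
	fmt.Println("created measurement:", m.Name)

	// Looking it up again returns the same in-memory object.
	fmt.Println("same object:", idx.Measurement("cpu") == m)

	nMeasurements, nSeries := idx.MeasurementSeriesCounts()
	fmt.Printf("%d measurements, %d series\n", nMeasurements, nSeries)
}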
type Engine ¶
type Engine interface {
	Open() error
	Close() error

	SetLogOutput(io.Writer)
	LoadMetadataIndex(shard *Shard, index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error

	CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
	SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
	WritePoints(points []models.Point, measurementFieldsToSave map[string]*MeasurementFields, seriesToCreate []*SeriesCreate) error
	DeleteSeries(keys []string) error
	DeleteMeasurement(name string, seriesKeys []string) error
	SeriesCount() (n int, err error)

	// PerformMaintenance will get called periodically by the store
	PerformMaintenance()

	// Format will return the format for the engine
	Format() EngineFormat

	io.WriterTo

	Backup(w io.Writer, basePath string, since time.Time) error
}
Engine represents a swappable storage engine for the shard.
type EngineFormat ¶
type EngineFormat int
EngineFormat represents the format for an engine.
const (
	// TSM1Format is the format used by the tsm1 engine.
	TSM1Format EngineFormat = 2
)
type EngineOptions ¶
type EngineOptions struct {
	EngineVersion          string
	MaxWALSize             int
	WALFlushInterval       time.Duration
	WALPartitionFlushDelay time.Duration

	Config Config
}
EngineOptions represents the options used to initialize the engine.
func NewEngineOptions ¶
func NewEngineOptions() EngineOptions
NewEngineOptions returns the default options.
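A brief sketch of constructing options and overriding a few Config fields; the import path is assumed and the values shown are arbitrary examples, not recommendations:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb" // assumed import path
)

func main() {
	opt := tsdb.NewEngineOptions()
	opt.EngineVersion = tsdb.DefaultEngine // "tsm1"

	// Config carries the engine-level knobs; these values are arbitrary.
	opt.Config.WALDir = "/var/lib/influxdb/wal"
	opt.Config.CacheMaxMemorySize = 1024 * 1024 * 1024 // 1GB

	fmt.Printf("engine=%s cache=%d\n", opt.EngineVersion, opt.Config.CacheMaxMemorySize)
}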
type Field ¶
type Field struct {
	ID   uint8             `json:"id,omitempty"`
	Name string            `json:"name,omitempty"`
	Type influxql.DataType `json:"type,omitempty"`
}
Field represents a series field.
type FieldCodec ¶
type FieldCodec struct {
// contains filtered or unexported fields
}
FieldCodec provides encoding and decoding functionality for the fields of a given Measurement. It is a distinct type to avoid locking writes on this node while potentially long-running queries are executing.
It is not affected by changes to the Measurement object after codec creation. TODO: this shouldn't be exported. Nothing outside the shard should know about field encodings.
However, this is here until tx.go and the engine get refactored into tsdb.
func NewFieldCodec ¶
func NewFieldCodec(fields map[string]*Field) *FieldCodec
NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with a RLock that protects the Measurement.
func (*FieldCodec) DecodeByID ¶
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
DecodeByID scans a byte slice for a field with the given ID, converts it to its expected type, and returns that value. TODO: shouldn't be exported; refactor engine.
func (*FieldCodec) DecodeByName ¶
func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error)
DecodeByName scans a byte slice for a field with the given name, converts it to its expected type, and returns that value.
func (*FieldCodec) DecodeFields ¶
func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
DecodeFields decodes a byte slice into a set of field ids and values.
func (*FieldCodec) DecodeFieldsWithNames ¶
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
DecodeFieldsWithNames decodes a byte slice into a set of field names and values. TODO: shouldn't be exported; refactor engine.
func (*FieldCodec) EncodeFields ¶
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
EncodeFields converts a map of values with string keys to a byte slice of field IDs and values.
If a field exists in the codec but its type is different, an error is returned. If a field is not present in the codec, the system panics.
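A round-trip sketch using a codec built from a single float field; the field name, ID, and import paths here are illustrative assumptions:

package main

import (
	"fmt"
	"log"

	"github.com/influxdata/influxdb/influxql" // assumed import paths
	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	// A codec for a single float field named "value" with ID 1.
	fields := map[string]*tsdb.Field{
		"value": {ID: 1, Name: "value", Type: influxql.Float},
	}
	codec := tsdb.NewFieldCodec(fields)

	// Encode a map of field values into the packed byte representation.
	b, err := codec.EncodeFields(map[string]interface{}{"value": 42.0})
	if err != nil {
		log.Fatal(err)
	}

	// Decode back into a name -> value map.
	decoded, err := codec.DecodeFieldsWithNames(b)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(decoded["value"]) // 42
}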
func (*FieldCodec) FieldByName ¶
func (f *FieldCodec) FieldByName(name string) *Field
FieldByName returns the field by its name. It returns nil if the field is not found.
func (*FieldCodec) FieldIDByName ¶
func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
FieldIDByName returns the ID of the field with the given name s. TODO: this shouldn't be exported; remove when tx.go and engine.go get refactored into tsdb.
func (*FieldCodec) Fields ¶
func (f *FieldCodec) Fields() []*Field
Fields returns an unsorted list of the codec's fields.
type FieldCreate ¶
FieldCreate holds information for a field to create on a measurement.
type FilterExprs ¶
FilterExprs represents a map of series IDs to filter expressions.
func (FilterExprs) DeleteBoolLiteralTrues ¶
func (fe FilterExprs) DeleteBoolLiteralTrues()
DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true.
type FloatCursorIterator ¶
type FloatCursorIterator struct {
// contains filtered or unexported fields
}
FloatCursorIterator represents a wrapper for Cursor to produce an influxql.FloatIterator.
func NewFloatCursorIterator ¶
func NewFloatCursorIterator(name string, tagMap map[string]string, cur Cursor, opt influxql.IteratorOptions) *FloatCursorIterator
NewFloatCursorIterator returns a new instance of FloatCursorIterator.
func (*FloatCursorIterator) Close ¶
func (itr *FloatCursorIterator) Close() error
Close closes the iterator.
func (*FloatCursorIterator) Next ¶
func (itr *FloatCursorIterator) Next() *influxql.FloatPoint
Next returns the next point from the cursor.
type Measurement ¶
type Measurement struct {
	Name string `json:"name,omitempty"`
	// contains filtered or unexported fields
}
Measurement represents a collection of time series in a database. It also contains in-memory structures for indexing tags. Exported functions are goroutine-safe; unexported functions assume the caller holds the appropriate locks.
func NewMeasurement ¶
func NewMeasurement(name string, idx *DatabaseIndex) *Measurement
NewMeasurement allocates and initializes a new Measurement.
func (*Measurement) AddSeries ¶
func (m *Measurement) AddSeries(s *Series) bool
AddSeries adds a series to the measurementIndex. It returns false if the series is already present.
func (*Measurement) DropSeries ¶
func (m *Measurement) DropSeries(seriesID uint64)
DropSeries will remove a series from the measurementIndex.
func (*Measurement) FieldNames ¶
func (m *Measurement) FieldNames() []string
FieldNames returns a list of the measurement's field names.
func (*Measurement) HasField ¶
func (m *Measurement) HasField(name string) bool
HasField returns true if the measurement has a field by the given name.
func (*Measurement) HasSeries ¶
func (m *Measurement) HasSeries() bool
HasSeries returns true if there is at least one series under this measurement.
func (*Measurement) HasTagKey ¶
func (m *Measurement) HasTagKey(k string) bool
HasTagKey returns true if at least one series in this measurement has written a value for the given tag key.
func (*Measurement) SelectFields ¶
func (m *Measurement) SelectFields(stmt *influxql.SelectStatement) []string
SelectFields returns a list of fields in the SELECT section of stmt.
func (*Measurement) SelectTags ¶
func (m *Measurement) SelectTags(stmt *influxql.SelectStatement) []string
SelectTags returns a list of non-field tags in the SELECT section of stmt.
func (*Measurement) SeriesByID ¶
func (m *Measurement) SeriesByID(id uint64) *Series
SeriesByID returns a series by identifier.
func (*Measurement) SeriesKeys ¶
func (m *Measurement) SeriesKeys() []string
SeriesKeys returns the keys of every series in this measurement.
func (*Measurement) SetFieldName ¶
func (m *Measurement) SetFieldName(name string)
SetFieldName adds the field name to the measurement.
func (*Measurement) TagKeys ¶
func (m *Measurement) TagKeys() []string
TagKeys returns a list of the measurement's tag names.
func (*Measurement) TagSets ¶
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine what composite series will be created by a GROUP BY. For example, "GROUP BY region" could return {"region":"uswest"}, {"region":"useast"}, while "GROUP BY region, service" could return {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, and so on. This will also populate the TagSet objects with the series IDs that match each tag set and any influx filter expression that goes with the series. TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (*Measurement) TagValues ¶
func (m *Measurement) TagValues(key string) []string
TagValues returns all the values for the given tag key.
func (*Measurement) ValidateGroupBy ¶
func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error
ValidateGroupBy ensures that the GROUP BY is not a field.
func (*Measurement) WhereFields ¶
func (m *Measurement) WhereFields(stmt *influxql.SelectStatement) []string
WhereFields returns a list of non-"time" fields in the WHERE section of stmt.
type MeasurementFields ¶
type MeasurementFields struct {
	Fields map[string]*Field `json:"fields"`
	Codec  *FieldCodec
}
MeasurementFields holds the fields of a measurement and their codec.
func (*MeasurementFields) CreateFieldIfNotExists ¶
func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error
CreateFieldIfNotExists creates a new field with an autoincrementing ID. Returns an error if 255 fields have already been created on the measurement or the field already exists with a different type.
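A small sketch of field creation and the type-conflict case; the import paths are assumptions, and the final call is expected (though not guaranteed here) to surface ErrFieldTypeConflict:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/influxql" // assumed import paths
	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}

	// First creation assigns the next available field ID.
	if err := mf.CreateFieldIfNotExists("value", influxql.Float, false); err != nil {
		fmt.Println("create:", err)
	}

	// Re-creating the same field with the same type is a no-op...
	fmt.Println(mf.CreateFieldIfNotExists("value", influxql.Float, false))

	// ...but a different type for an existing field is rejected.
	fmt.Println(mf.CreateFieldIfNotExists("value", influxql.String, false))
}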
func (*MeasurementFields) MarshalBinary ¶
func (m *MeasurementFields) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*MeasurementFields) UnmarshalBinary ¶
func (m *MeasurementFields) UnmarshalBinary(buf []byte) error
UnmarshalBinary decodes the object from a binary format.
type MeasurementIterator ¶
type MeasurementIterator struct {
// contains filtered or unexported fields
}
MeasurementIterator represents a string iterator that emits all measurement names in a shard.
func NewMeasurementIterator ¶
func NewMeasurementIterator(sh *Shard, opt influxql.IteratorOptions) (*MeasurementIterator, error)
NewMeasurementIterator returns a new instance of MeasurementIterator.
func (*MeasurementIterator) Close ¶
func (itr *MeasurementIterator) Close() error
Close closes the iterator.
func (*MeasurementIterator) Next ¶
func (itr *MeasurementIterator) Next() *influxql.FloatPoint
Next emits the next measurement name.
type Measurements ¶
type Measurements []*Measurement
Measurements represents a list of *Measurement.
func (Measurements) Len ¶
func (a Measurements) Len() int
func (Measurements) Less ¶
func (a Measurements) Less(i, j int) bool
func (Measurements) SelectFields ¶
func (a Measurements) SelectFields(stmt *influxql.SelectStatement) []string
SelectFields returns a list of fields in the SELECT section of stmt.
func (Measurements) SelectTags ¶
func (a Measurements) SelectTags(stmt *influxql.SelectStatement) []string
SelectTags returns a list of non-field tags in the SELECT section of stmt.
func (Measurements) Swap ¶
func (a Measurements) Swap(i, j int)
func (Measurements) WhereFields ¶
func (a Measurements) WhereFields(stmt *influxql.SelectStatement) []string
WhereFields returns a list of non-"time" fields in the WHERE section of stmt.
type NewEngineFunc ¶
type NewEngineFunc func(path string, walPath string, options EngineOptions) Engine
NewEngineFunc creates a new engine.
type PointBatcher ¶
type PointBatcher struct {
// contains filtered or unexported fields
}
PointBatcher accepts Points and will emit a batch of those points when either a) the batch reaches a certain size, or b) a certain time passes.
func NewPointBatcher ¶
func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher
NewPointBatcher returns a new PointBatcher. sz is the batching size, bp is the maximum number of batches that may be pending. d is the time after which a batch will be emitted after the first point is received for the batch, regardless of its size.
func (*PointBatcher) Flush ¶
func (b *PointBatcher) Flush()
Flush instructs the batcher to emit any pending points in a batch, regardless of batch size. If there are no pending points, no batch is emitted.
func (*PointBatcher) In ¶
func (b *PointBatcher) In() chan<- models.Point
In returns the channel to which points should be written.
func (*PointBatcher) Out ¶
func (b *PointBatcher) Out() <-chan []models.Point
Out returns the channel from which batches should be read.
func (*PointBatcher) Start ¶
func (b *PointBatcher) Start()
Start starts the batching process. Points are subsequently accepted on the In channel and emitted as batches on the Out channel.
func (*PointBatcher) Stats ¶
func (b *PointBatcher) Stats() *PointBatcherStats
Stats returns a PointBatcherStats object for the PointBatcher. While each statistic should be closely correlated with the others, this is not guaranteed.
func (*PointBatcher) Stop ¶
func (b *PointBatcher) Stop()
Stop stops the batching process. Stop waits for the batching routine to stop before returning.
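A quick sketch of driving a batcher end to end; the import paths and the use of models.ParsePointsString from the companion models package are assumptions:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/influxdata/influxdb/models" // assumed import paths
	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	// Emit a batch once 2 points arrive, allow up to 10 pending batches,
	// or emit after one second, whichever comes first.
	batcher := tsdb.NewPointBatcher(2, 10, time.Second)
	batcher.Start()
	defer batcher.Stop()

	pts, err := models.ParsePointsString("cpu,host=a value=1\ncpu,host=a value=2")
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range pts {
		batcher.In() <- p
	}

	// The size threshold (2) has been reached, so a batch is available.
	batch := <-batcher.Out()
	fmt.Println("points in batch:", len(batch))
	fmt.Printf("%+v\n", batcher.Stats())
}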
type PointBatcherStats ¶
type PointBatcherStats struct {
	BatchTotal   uint64 // Total count of batches transmitted.
	PointTotal   uint64 // Total count of points processed.
	SizeTotal    uint64 // Number of batches that reached size threshold.
	TimeoutTotal uint64 // Number of timeouts that occurred.
}
PointBatcherStats are the statistics each batcher tracks.
type Series ¶
A Series belongs to a Measurement and represents a unique time series in a database.
func (*Series) InitializeShards ¶
func (s *Series) InitializeShards()
InitializeShards initializes the list of shards.
func (*Series) MarshalBinary ¶
MarshalBinary encodes the object to a binary format.
func (*Series) UnmarshalBinary ¶
UnmarshalBinary decodes the object from a binary format.
type SeriesCreate ¶
SeriesCreate holds information for a series to create.
type SeriesIDs ¶
type SeriesIDs []uint64
SeriesIDs is a convenience type for sorting, checking equality, and doing union and intersection of collections of series ids.
func (SeriesIDs) Intersect ¶
func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs
Intersect returns a new collection of series ids in sorted order that is the intersection of the two. The two collections must already be sorted.
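The set operations assume both collections are already sorted. A small sketch (the import path is assumed; expected results are noted in comments):

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb" // assumed import path
)

func main() {
	// Both collections must already be in sorted order.
	a := tsdb.SeriesIDs{1, 2, 3, 5}
	b := tsdb.SeriesIDs{2, 3, 4}

	fmt.Println(a.Intersect(b)) // IDs present in both: [2 3]
	fmt.Println(a.Union(b))     // all IDs, deduplicated: [1 2 3 4 5]
	fmt.Println(a.Reject(b))    // IDs in a that are not in b: [1 5]
	fmt.Println(a.Equals(b))    // false
}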
type Shard ¶
type Shard struct {
	// The writer used by the logger.
	LogOutput io.Writer
	// contains filtered or unexported fields
}
Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.
func NewShard ¶
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard
NewShard returns a new initialized Shard. walPath does not apply to the b1-type index.
func (*Shard) CreateIterator ¶
func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
CreateIterator returns an iterator for the data in the shard.
func (*Shard) DeleteMeasurement ¶
func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error
DeleteMeasurement deletes a measurement and all underlying series.
func (*Shard) DeleteSeries ¶
func (s *Shard) DeleteSeries(seriesKeys []string) error
DeleteSeries deletes a list of series.
func (*Shard) FieldCodec ¶
func (s *Shard) FieldCodec(measurementName string) *FieldCodec
FieldCodec returns the field encoding for a measurement. TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func (*Shard) FieldDimensions ¶
func (s *Shard) FieldDimensions(sources influxql.Sources) (fields, dimensions map[string]struct{}, err error)
FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (*Shard) PerformMaintenance ¶
func (s *Shard) PerformMaintenance()
PerformMaintenance gets called periodically to have the engine perform any maintenance tasks, such as WAL flushing and compaction.
func (*Shard) SeriesCount ¶
func (s *Shard) SeriesCount() (int, error)
SeriesCount returns the number of series buckets on the shard.
func (*Shard) SeriesKeys ¶
func (s *Shard) SeriesKeys(opt influxql.IteratorOptions) (influxql.SeriesList, error)
SeriesKeys returns a list of series in the shard.
func (*Shard) WritePoints ¶
func (s *Shard) WritePoints(points []models.Point) error
WritePoints writes the raw data points and any new metadata to the index in the shard.
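A condensed open-write-count sketch for a single shard. The import paths, the blank engine import, and the temporary directory layout are assumptions; a real caller would choose paths that match its data and WAL directories.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"github.com/influxdata/influxdb/models" // assumed import paths
	"github.com/influxdata/influxdb/tsdb"
	_ "github.com/influxdata/influxdb/tsdb/engine" // registers the available engines (assumed path)
)

func main() {
	dir, err := ioutil.TempDir("", "tsdb-shard")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// A shard needs a database index, a data path, a WAL path, and engine options.
	idx := tsdb.NewDatabaseIndex("mydb")
	sh := tsdb.NewShard(1, idx, filepath.Join(dir, "data", "1"), filepath.Join(dir, "wal", "1"), tsdb.NewEngineOptions())
	if err := sh.Open(); err != nil {
		log.Fatal(err)
	}
	defer sh.Close()

	// Write a single point in line protocol; metadata is indexed as a side effect.
	pts, err := models.ParsePointsString("cpu,host=server01 value=0.64")
	if err != nil {
		log.Fatal(err)
	}
	if err := sh.WritePoints(pts); err != nil {
		log.Fatal(err)
	}

	n, err := sh.SeriesCount()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("series in shard:", n)
}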
type Store ¶
type Store struct {
	EngineOptions EngineOptions
	Logger        *log.Logger
	// contains filtered or unexported fields
}
Store manages shards and indexes for databases.
func NewStore ¶
NewStore returns a new store with the given path and a default configuration. The returned store must be initialized by calling Open before using it.
func (*Store) BackupShard ¶
func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error
BackupShard retrieves the shard and has its engine back up data written since the given time to the writer.
func (*Store) Close ¶
func (s *Store) Close() error
Close closes the store and all associated shards. After calling Close, accessing shards through the Store will result in ErrStoreClosed being returned.
func (*Store) CreateShard ¶
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error
CreateShard creates a shard with the given id and retention policy on a database.
func (*Store) DatabaseIndex ¶
func (s *Store) DatabaseIndex(name string) *DatabaseIndex
DatabaseIndex returns the index for a database by its name.
func (*Store) DatabaseIndexN ¶
func (s *Store) DatabaseIndexN() int
DatabaseIndexN returns the number of database indexes in the store.
func (*Store) DeleteDatabase ¶
func (s *Store) DeleteDatabase(name string) error
DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (*Store) DeleteMeasurement ¶
func (s *Store) DeleteMeasurement(database, name string) error
DeleteMeasurement removes a measurement and all associated series from a database.
func (*Store) DeleteRetentionPolicy ¶
func (s *Store) DeleteRetentionPolicy(database, name string) error
DeleteRetentionPolicy will close all shards associated with the provided retention policy, remove the retention policy directories on both the DB and WAL, and remove all shard files from disk.
func (*Store) DeleteSeries ¶
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeries loops through the local shards and deletes the series data and metadata for the given series keys.
func (*Store) DeleteShard ¶
func (s *Store) DeleteShard(shardID uint64) error
DeleteShard removes a shard from disk.
func (*Store) DiskSize ¶
func (s *Store) DiskSize() (int64, error)
DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size.
func (*Store) ExecuteShowFieldKeysStatement ¶
func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
func (*Store) ExecuteShowSeriesStatement ¶
func (s *Store) ExecuteShowSeriesStatement(stmt *influxql.ShowSeriesStatement, database string) (models.Rows, error)
func (*Store) ExecuteShowTagValuesStatement ¶
func (s *Store) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
func (*Store) ExpandSources ¶
func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
ExpandSources expands regex sources and removes duplicates. NOTE: sources must be normalized (db and rp set) before calling this function.
func (*Store) Measurement ¶
func (s *Store) Measurement(database, name string) *Measurement
Measurement returns a measurement by name from the given database.
func (*Store) Open ¶
func (s *Store) Open() error
Open initializes the store, creating all necessary directories, loading all shards and indexes, and initializing periodic maintenance of all shards.
func (*Store) ShardIteratorCreator ¶
func (s *Store) ShardIteratorCreator(id uint64) influxql.IteratorCreator
ShardIteratorCreator returns an iterator creator for a shard.
func (*Store) ShardRelativePath ¶
func (s *Store) ShardRelativePath(id uint64) (string, error)
ShardRelativePath returns the relative path to the shard, i.e. <database>/<retention>/<id>.
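To tie the pieces together, here is a minimal Store lifecycle sketch. The import paths, the blank engine import, and the temporary directory layout are assumptions; error handling is kept deliberately terse.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"github.com/influxdata/influxdb/models" // assumed import paths
	"github.com/influxdata/influxdb/tsdb"
	_ "github.com/influxdata/influxdb/tsdb/engine" // registers the available engines (assumed path)
)

func main() {
	dir, err := ioutil.TempDir("", "tsdb-store")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// NewStore takes the data path; the WAL directory is configured separately.
	store := tsdb.NewStore(filepath.Join(dir, "data"))
	store.EngineOptions.Config.WALDir = filepath.Join(dir, "wal")
	if err := store.Open(); err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	// Create a shard under database "mydb", retention policy "default".
	if err := store.CreateShard("mydb", "default", 1); err != nil {
		log.Fatal(err)
	}

	// Write a point in line protocol to that shard.
	pts, err := models.ParsePointsString("cpu,host=server01 value=0.64")
	if err != nil {
		log.Fatal(err)
	}
	if err := store.WriteToShard(1, pts); err != nil {
		log.Fatal(err)
	}

	fmt.Println("databases:", store.Databases())
	fmt.Println("shards:", store.ShardN())
}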