Documentation ¶
Overview ¶
Package tsdb implements a durable time series database.
Index ¶
- Constants
- Variables
- func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string)
- func MarshalTags(tags map[string]string) []byte
- func MeasurementFromSeriesKey(key string) string
- func NewFieldKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func NewSeriesIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func NewShardError(id uint64, err error) error
- func NewTagKeysIterator(sh *Shard, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func RegisterEngine(name string, fn NewEngineFunc)
- func RegisteredEngines() []string
- type Config
- type Cursor
- type DatabaseIndex
- func (d *DatabaseIndex) AssignShard(k string, shardID uint64)
- func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement
- func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series
- func (d *DatabaseIndex) DropMeasurement(name string)
- func (d *DatabaseIndex) DropSeries(keys []string)
- func (d *DatabaseIndex) Measurement(name string) *Measurement
- func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
- func (d *DatabaseIndex) Measurements() Measurements
- func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
- func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement
- func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements
- func (d *DatabaseIndex) RemoveShard(shardID uint64)
- func (d *DatabaseIndex) Series(key string) *Series
- func (d *DatabaseIndex) SeriesKeys() []string
- func (d *DatabaseIndex) SeriesN() int
- func (d *DatabaseIndex) SeriesShardN(shardID uint64) int
- func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic
- func (d *DatabaseIndex) TagsForSeries(key string) map[string]string
- func (d *DatabaseIndex) UnassignShard(k string, shardID uint64)
- type Engine
- type EngineFormat
- type EngineOptions
- type Field
- type FieldCreate
- type FilterExprs
- type IndexStatistics
- type KeyValue
- type KeyValues
- type Measurement
- func (m *Measurement) AddSeries(s *Series) bool
- func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string
- func (m *Measurement) DropSeries(series *Series)
- func (m *Measurement) FieldNames() []string
- func (m *Measurement) HasField(name string) bool
- func (m *Measurement) HasSeries() bool
- func (m *Measurement) HasTagKey(k string) bool
- func (m *Measurement) SeriesByID(id uint64) *Series
- func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series
- func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error)
- func (m *Measurement) SeriesKeys() []string
- func (m *Measurement) SetFieldName(name string)
- func (m *Measurement) TagKeys() []string
- func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
- func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
- func (m *Measurement) TagValues(key string) []string
- func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error
- type MeasurementFields
- func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error
- func (m *MeasurementFields) Field(name string) *Field
- func (m *MeasurementFields) FieldSet() map[string]influxql.DataType
- func (m *MeasurementFields) MarshalBinary() ([]byte, error)
- func (m *MeasurementFields) UnmarshalBinary(buf []byte) error
- type Measurements
- type NewEngineFunc
- type PointBatcher
- type PointBatcherStats
- type Series
- type SeriesCreate
- type SeriesIDs
- func (a SeriesIDs) Equals(other SeriesIDs) bool
- func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Len() int
- func (a SeriesIDs) Less(i, j int) bool
- func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Swap(i, j int)
- func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs
- type Shard
- func (s *Shard) Close() error
- func (s *Shard) ContainsSeries(seriesKeys []string) (map[string]bool, error)
- func (s *Shard) CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error)
- func (s *Shard) CreateSnapshot() (string, error)
- func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error
- func (s *Shard) DeleteSeries(seriesKeys []string) error
- func (s *Shard) DeleteSeriesRange(seriesKeys []string, min, max int64) error
- func (s *Shard) DiskSize() (int64, error)
- func (s *Shard) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
- func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
- func (s *Shard) Open() error
- func (s *Shard) Path() string
- func (s *Shard) Restore(r io.Reader, basePath string) error
- func (s *Shard) SeriesCount() (int, error)
- func (s *Shard) SetEnabled(enabled bool)
- func (s *Shard) SetLogOutput(w io.Writer)
- func (s *Shard) Statistics(tags map[string]string) []models.Statistic
- func (s *Shard) WritePoints(points []models.Point) error
- func (s *Shard) WriteTo(w io.Writer) (int64, error)
- type ShardError
- type ShardStatistics
- type Shards
- type Store
- func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error
- func (s *Store) Close() error
- func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
- func (s *Store) CreateShardSnapshot(id uint64) (string, error)
- func (s *Store) DatabaseIndex(name string) *DatabaseIndex
- func (s *Store) DatabaseIndexN() int
- func (s *Store) Databases() []string
- func (s *Store) DeleteDatabase(name string) error
- func (s *Store) DeleteMeasurement(database, name string) error
- func (s *Store) DeleteRetentionPolicy(database, name string) error
- func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
- func (s *Store) DeleteShard(shardID uint64) error
- func (s *Store) DiskSize() (int64, error)
- func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
- func (s *Store) IteratorCreator(shards []uint64, opt *influxql.SelectOptions) (influxql.IteratorCreator, error)
- func (s *Store) IteratorCreators() influxql.IteratorCreators
- func (s *Store) Measurement(database, name string) *Measurement
- func (s *Store) Measurements(database string, cond influxql.Expr) ([]string, error)
- func (s *Store) Open() error
- func (s *Store) Path() string
- func (s *Store) RestoreShard(id uint64, r io.Reader) error
- func (s *Store) SetLogOutput(w io.Writer)
- func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error
- func (s *Store) Shard(id uint64) *Shard
- func (s *Store) ShardIDs() []uint64
- func (s *Store) ShardIteratorCreator(id uint64, opt *influxql.SelectOptions) influxql.IteratorCreator
- func (s *Store) ShardN() int
- func (s *Store) ShardRelativePath(id uint64) (string, error)
- func (s *Store) Shards(ids []uint64) []*Shard
- func (s *Store) Statistics(tags map[string]string) []models.Statistic
- func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error)
- func (s *Store) WriteToShard(shardID uint64, points []models.Point) error
- type TagFilter
- type TagValues
Constants ¶
const ( // DefaultEngine is the default engine for new shards DefaultEngine = "tsm1" // DefaultCacheMaxMemorySize is the maximum size a shard's cache can // reach before it starts rejecting writes. DefaultCacheMaxMemorySize = 500 * 1024 * 1024 // 500MB // DefaultCacheSnapshotMemorySize is the size at which the engine will // snapshot the cache and write it to a TSM file, freeing up memory DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB // DefaultCacheSnapshotWriteColdDuration is the length of time at which // the engine will snapshot the cache and write it to a new TSM file if // the shard hasn't received writes or deletes DefaultCacheSnapshotWriteColdDuration = time.Duration(time.Hour) // DefaultCompactFullWriteColdDuration is the duration at which the engine // will compact all TSM files in a shard if it hasn't received a write or delete DefaultCompactFullWriteColdDuration = time.Duration(24 * time.Hour) // DefaultMaxPointsPerBlock is the maximum number of points in an encoded // block in a TSM file DefaultMaxPointsPerBlock = 1000 // DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database. DefaultMaxSeriesPerDatabase = 1000000 )
const EOF = int64(-1)
EOF represents a "not found" key returned by a Cursor.
Variables ¶
var ( // ErrFormatNotFound is returned when no format can be determined from a path. ErrFormatNotFound = errors.New("format not found") // ErrUnknownEngineFormat is returned when the engine format is // unknown. ErrUnknownEngineFormat is currently returned if a format // other than tsm1 is encountered. ErrUnknownEngineFormat = errors.New("unknown engine format") )
var ( // ErrFieldOverflow is returned when too many fields are created on a measurement. ErrFieldOverflow = errors.New("field overflow") // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") // ErrFieldNotFound is returned when a field cannot be found. ErrFieldNotFound = errors.New("field not found") // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID // there is no mapping for. ErrFieldUnmappedID = errors.New("field ID not mapped") // ErrEngineClosed is returned when a caller attempts indirectly to // access the shard's underlying engine. ErrEngineClosed = errors.New("engine is closed") // ErrShardDisabled is returned when a the shard is not available for // queries or writes. ErrShardDisabled = errors.New("shard is disabled") )
var ( // ErrShardNotFound gets returned when trying to get a non existing shard. ErrShardNotFound = fmt.Errorf("shard not found") // ErrStoreClosed gets returned when trying to use a closed Store. ErrStoreClosed = fmt.Errorf("store is closed") )
Functions ¶
func DecodeStorePath ¶ added in v0.11.0
DecodeStorePath extracts the database and retention policy names from a given shard or WAL path.
func MarshalTags ¶ added in v0.9.3
MarshalTags converts a tag set to bytes for use as a lookup key
func MeasurementFromSeriesKey ¶ added in v0.9.3
MeasurementFromSeriesKey returns the name of the measurement from a key that contains a measurement name.
func NewFieldKeysIterator ¶ added in v0.11.0
func NewSeriesIterator ¶ added in v0.11.0
NewSeriesIterator returns a new instance of SeriesIterator.
func NewShardError ¶ added in v0.11.0
NewShardError returns a new ShardError.
func NewTagKeysIterator ¶ added in v0.11.0
NewTagKeysIterator returns a new instance of TagKeysIterator.
func RegisterEngine ¶ added in v0.9.3
func RegisterEngine(name string, fn NewEngineFunc)
RegisterEngine registers a storage engine initializer by name.
func RegisteredEngines ¶ added in v0.9.5
func RegisteredEngines() []string
RegisteredEngines returns the slice of currently registered engines.
Types ¶
type Config ¶
type Config struct { Dir string `toml:"dir"` Engine string `toml:"engine"` // General WAL configuration options WALDir string `toml:"wal-dir"` WALLoggingEnabled bool `toml:"wal-logging-enabled"` // Query logging QueryLogEnabled bool `toml:"query-log-enabled"` // Compaction options for tsm1 (descriptions above with defaults) CacheMaxMemorySize uint64 `toml:"cache-max-memory-size"` CacheSnapshotMemorySize uint64 `toml:"cache-snapshot-memory-size"` CacheSnapshotWriteColdDuration toml.Duration `toml:"cache-snapshot-write-cold-duration"` CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"` MaxPointsPerBlock int `toml:"max-points-per-block"` // MaxSeriesPerDatabase is the maximum number of series a node can hold per database. // When this limit is exceeded, writes return a 'max series per database exceeded' error. // A value of 0 disables the limit. MaxSeriesPerDatabase int `toml:"max-series-per-database"` TraceLoggingEnabled bool `toml:"trace-logging-enabled"` }
Config holds the configuration for the tsdb package.
type Cursor ¶ added in v0.9.3
type Cursor interface { SeekTo(seek int64) (key int64, value interface{}) Next() (key int64, value interface{}) Ascending() bool }
Cursor represents an iterator over a series.
type DatabaseIndex ¶
type DatabaseIndex struct {
// contains filtered or unexported fields
}
DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewDatabaseIndex ¶
func NewDatabaseIndex(name string) *DatabaseIndex
NewDatabaseIndex returns a new initialized DatabaseIndex.
func (*DatabaseIndex) AssignShard ¶ added in v0.12.0
func (d *DatabaseIndex) AssignShard(k string, shardID uint64)
AssignShard updates the index to indicate that series k exists in the given shardID
func (*DatabaseIndex) CreateMeasurementIndexIfNotExists ¶ added in v0.9.3
func (d *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement
CreateMeasurementIndexIfNotExists creates or retrieves an in memory index object for the measurement
func (*DatabaseIndex) CreateSeriesIndexIfNotExists ¶ added in v0.9.3
func (d *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series
CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object
func (*DatabaseIndex) DropMeasurement ¶
func (d *DatabaseIndex) DropMeasurement(name string)
DropMeasurement removes the measurement and all of its underlying series from the database index
func (*DatabaseIndex) DropSeries ¶
func (d *DatabaseIndex) DropSeries(keys []string)
DropSeries removes the series keys and their tags from the index
func (*DatabaseIndex) Measurement ¶
func (d *DatabaseIndex) Measurement(name string) *Measurement
Measurement returns the measurement object from the index by the name
func (*DatabaseIndex) MeasurementSeriesCounts ¶
func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database. Useful for reporting and monitoring.
func (*DatabaseIndex) Measurements ¶
func (d *DatabaseIndex) Measurements() Measurements
Measurements returns a list of all measurements.
func (*DatabaseIndex) MeasurementsByExpr ¶ added in v1.0.0
func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error)
MeasurementsByExpr takes an expression containing only tags and returns a list of matching *Measurement. The bool return value reports whether the expression was a measurement expression. It is used to differentiate an empty list returned because all measurements were filtered out (when the bool is true) from an empty list returned because the expression wasn't evaluated (when the bool is false).
func (*DatabaseIndex) MeasurementsByName ¶ added in v0.9.5
func (d *DatabaseIndex) MeasurementsByName(names []string) []*Measurement
MeasurementsByName returns a list of measurements.
func (*DatabaseIndex) MeasurementsByRegex ¶ added in v0.11.0
func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements
MeasurementsByRegex returns the measurements that match the regex.
func (*DatabaseIndex) RemoveShard ¶ added in v0.13.0
func (d *DatabaseIndex) RemoveShard(shardID uint64)
RemoveShard removes all references to shardID from any series or measurements in the index. If the shard was the only owner of data for the series, the series is removed from the index.
func (*DatabaseIndex) Series ¶ added in v0.9.3
func (d *DatabaseIndex) Series(key string) *Series
Series returns a series by key.
func (*DatabaseIndex) SeriesKeys ¶ added in v0.13.0
func (d *DatabaseIndex) SeriesKeys() []string
func (*DatabaseIndex) SeriesN ¶ added in v0.9.3
func (d *DatabaseIndex) SeriesN() int
SeriesN returns the number of series.
func (*DatabaseIndex) SeriesShardN ¶ added in v1.0.0
func (d *DatabaseIndex) SeriesShardN(shardID uint64) int
SeriesShardN returns the series count for a shard.
func (*DatabaseIndex) Statistics ¶ added in v1.0.0
func (d *DatabaseIndex) Statistics(tags map[string]string) []models.Statistic
Statistics returns statistics for periodic monitoring.
func (*DatabaseIndex) TagsForSeries ¶ added in v0.9.3
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string
TagsForSeries returns the tag map for the passed in series
func (*DatabaseIndex) UnassignShard ¶ added in v0.13.0
func (d *DatabaseIndex) UnassignShard(k string, shardID uint64)
UnassignShard updates the index to indicate that series k does not exist in the given shardID
type Engine ¶ added in v0.9.3
type Engine interface { Open() error Close() error SetLogOutput(io.Writer) LoadMetadataIndex(shardID uint64, index *DatabaseIndex) error Backup(w io.Writer, basePath string, since time.Time) error Restore(r io.Reader, basePath string) error CreateIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) WritePoints(points []models.Point) error ContainsSeries(keys []string) (map[string]bool, error) DeleteSeries(keys []string) error DeleteSeriesRange(keys []string, min, max int64) error DeleteMeasurement(name string, seriesKeys []string) error SeriesCount() (n int, err error) MeasurementFields(measurement string) *MeasurementFields CreateSnapshot() (string, error) SetEnabled(enabled bool) // Format will return the format for the engine Format() EngineFormat // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic io.WriterTo }
Engine represents a swappable storage engine for the shard.
type EngineFormat ¶ added in v0.9.5
type EngineFormat int
EngineFormat represents the format for an engine.
const ( // TSM1Format is the format used by the tsm1 engine. TSM1Format EngineFormat = 2 )
type EngineOptions ¶ added in v0.9.3
EngineOptions represents the options used to initialize the engine.
func NewEngineOptions ¶ added in v0.9.3
func NewEngineOptions() EngineOptions
NewEngineOptions returns the default options.
type Field ¶ added in v0.9.3
type Field struct { ID uint8 `json:"id,omitempty"` Name string `json:"name,omitempty"` Type influxql.DataType `json:"type,omitempty"` }
Field represents a series field.
type FieldCreate ¶ added in v0.9.3
FieldCreate holds information for a field to create on a measurement
type FilterExprs ¶ added in v0.9.5
FilterExprs represents a map of series IDs to filter expressions.
func (FilterExprs) DeleteBoolLiteralTrues ¶ added in v0.9.5
func (fe FilterExprs) DeleteBoolLiteralTrues()
DeleteBoolLiteralTrues deletes all elements whose filter expression is a boolean literal true.
func (FilterExprs) Len ¶ added in v0.9.5
func (fe FilterExprs) Len() int
Len returns the number of elements.
type IndexStatistics ¶ added in v1.0.0
IndexStatistics maintains statistics for the index.
type Measurement ¶
type Measurement struct { Name string `json:"name,omitempty"` // contains filtered or unexported fields }
Measurement represents a collection of time series in a database. It also contains in memory structures for indexing tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewMeasurement ¶
func NewMeasurement(name string) *Measurement
NewMeasurement allocates and initializes a new Measurement.
func (*Measurement) AddSeries ¶
func (m *Measurement) AddSeries(s *Series) bool
AddSeries will add a series to the measurementIndex. Returns false if already present
func (*Measurement) AppendSeriesKeysByID ¶ added in v0.13.0
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string
AppendSeriesKeysByID appends keys for a list of series ids to a buffer.
func (*Measurement) DropSeries ¶
func (m *Measurement) DropSeries(series *Series)
DropSeries will remove a series from the measurementIndex.
func (*Measurement) FieldNames ¶
func (m *Measurement) FieldNames() []string
FieldNames returns a list of the measurement's field names
func (*Measurement) HasField ¶
func (m *Measurement) HasField(name string) bool
HasField returns true if the measurement has a field by the given name
func (*Measurement) HasSeries ¶
func (m *Measurement) HasSeries() bool
HasSeries returns true if there is at least 1 series under this measurement
func (*Measurement) HasTagKey ¶
func (m *Measurement) HasTagKey(k string) bool
HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (*Measurement) SeriesByID ¶ added in v0.9.3
func (m *Measurement) SeriesByID(id uint64) *Series
SeriesByID returns a series by identifier.
func (*Measurement) SeriesByIDSlice ¶ added in v1.0.0
func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series
SeriesByIDSlice returns a list of series by identifiers.
func (*Measurement) SeriesIDsAllOrByExpr ¶ added in v1.0.0
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error)
SeriesIDsAllOrByExpr walks an expression for matching series IDs or, if no expression is given, returns all series IDs for the measurement.
func (*Measurement) SeriesKeys ¶
func (m *Measurement) SeriesKeys() []string
SeriesKeys returns the keys of every series in this measurement
func (*Measurement) SetFieldName ¶ added in v0.9.3
func (m *Measurement) SetFieldName(name string)
SetFieldName adds the field name to the measurement.
func (*Measurement) TagKeys ¶
func (m *Measurement) TagKeys() []string
TagKeys returns a list of the measurement's tag names.
func (*Measurement) TagKeysByExpr ¶ added in v1.0.0
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
TagKeysByExpr extracts the tag keys wanted by the expression.
func (*Measurement) TagSets ¶
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error)
TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine what composite series will be created by a group by. For example, "group by region" should return: {"region":"uswest"}, {"region":"useast"}, and "group by region, service" returns {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc. This will also populate the TagSet objects with the series IDs that match each tagset and any influx filter expression that goes with the series. TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (*Measurement) TagValues ¶ added in v0.9.4
func (m *Measurement) TagValues(key string) []string
TagValues returns all the values for the given tag key
func (*Measurement) ValidateGroupBy ¶ added in v0.9.2
func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error
ValidateGroupBy ensures that the GROUP BY is not a field.
type MeasurementFields ¶ added in v0.9.3
type MeasurementFields struct {
// contains filtered or unexported fields
}
MeasurementFields holds the fields of a measurement and their codec.
func NewMeasurementFields ¶ added in v0.12.1
func NewMeasurementFields() *MeasurementFields
func (*MeasurementFields) CreateFieldIfNotExists ¶ added in v0.9.3
func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType, limitCount bool) error
CreateFieldIfNotExists creates a new field with an autoincrementing ID. Returns an error if 255 fields have already been created on the measurement or the field already exists with a different type.
func (*MeasurementFields) Field ¶ added in v0.12.1
func (m *MeasurementFields) Field(name string) *Field
func (*MeasurementFields) FieldSet ¶ added in v1.0.0
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType
func (*MeasurementFields) MarshalBinary ¶ added in v0.9.3
func (m *MeasurementFields) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*MeasurementFields) UnmarshalBinary ¶ added in v0.9.3
func (m *MeasurementFields) UnmarshalBinary(buf []byte) error
UnmarshalBinary decodes the object from a binary format.
type Measurements ¶
type Measurements []*Measurement
Measurements represents a list of *Measurement.
func (Measurements) Len ¶
func (a Measurements) Len() int
func (Measurements) Less ¶
func (a Measurements) Less(i, j int) bool
func (Measurements) Swap ¶
func (a Measurements) Swap(i, j int)
type NewEngineFunc ¶ added in v0.9.3
type NewEngineFunc func(path string, walPath string, options EngineOptions) Engine
NewEngineFunc creates a new engine.
type PointBatcher ¶
type PointBatcher struct {
// contains filtered or unexported fields
}
PointBatcher accepts Points and will emit a batch of those points when either a) the batch reaches a certain size, or b) a certain time passes.
func NewPointBatcher ¶
func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher
NewPointBatcher returns a new PointBatcher. sz is the batching size, bp is the maximum number of batches that may be pending. d is the time after which a batch will be emitted after the first point is received for the batch, regardless of its size.
func (*PointBatcher) Flush ¶
func (b *PointBatcher) Flush()
Flush instructs the batcher to emit any pending points in a batch, regardless of batch size. If there are no pending points, no batch is emitted.
func (*PointBatcher) In ¶
func (b *PointBatcher) In() chan<- models.Point
In returns the channel to which points should be written.
func (*PointBatcher) Out ¶
func (b *PointBatcher) Out() <-chan []models.Point
Out returns the channel from which batches should be read.
func (*PointBatcher) Start ¶
func (b *PointBatcher) Start()
Start starts the batching process. Use In and Out to obtain the channels for points and point batches, respectively.
func (*PointBatcher) Stats ¶
func (b *PointBatcher) Stats() *PointBatcherStats
Stats returns a PointBatcherStats object for the PointBatcher. While each statistic should be closely correlated with each other statistic, it is not guaranteed.
func (*PointBatcher) Stop ¶
func (b *PointBatcher) Stop()
Stop stops the batching process. Stop waits for the batching routine to stop before returning.
type PointBatcherStats ¶
type PointBatcherStats struct { BatchTotal uint64 // Total count of batches transmitted. PointTotal uint64 // Total count of points processed. SizeTotal uint64 // Number of batches that reached size threshold. TimeoutTotal uint64 // Number of timeouts that occurred. }
PointBatcherStats are the statistics each batcher tracks.
type Series ¶
Series belongs to a Measurement and represents a unique time series in a database.
func (*Series) AssignShard ¶ added in v0.12.0
func (*Series) MarshalBinary ¶
MarshalBinary encodes the object to a binary format.
func (*Series) UnassignShard ¶ added in v0.13.0
func (*Series) UnmarshalBinary ¶
UnmarshalBinary decodes the object from a binary format.
type SeriesCreate ¶ added in v0.9.3
SeriesCreate holds information for a series to create
type SeriesIDs ¶ added in v0.9.3
type SeriesIDs []uint64
SeriesIDs is a convenience type for sorting, checking equality, and doing union and intersection of collections of series ids.
func (SeriesIDs) Intersect ¶ added in v0.9.3
Intersect returns a new collection of series ids in sorted order that is the intersection of the two. The two collections must already be sorted.
type Shard ¶
type Shard struct { EnableOnOpen bool // contains filtered or unexported fields }
Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.
func NewShard ¶
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard
NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func (*Shard) ContainsSeries ¶ added in v0.13.0
func (*Shard) CreateIterator ¶ added in v0.11.0
CreateIterator returns an iterator for the data in the shard.
func (*Shard) CreateSnapshot ¶ added in v1.0.0
CreateSnapshot will return a path to a temp directory containing hard links to the underlying shard files
func (*Shard) DeleteMeasurement ¶ added in v0.9.3
DeleteMeasurement deletes a measurement and all underlying series.
func (*Shard) DeleteSeries ¶ added in v0.9.3
DeleteSeries deletes a list of series.
func (*Shard) DeleteSeriesRange ¶ added in v0.13.0
DeleteSeriesRange deletes all values for seriesKeys between min and max (inclusive)
func (*Shard) ExpandSources ¶ added in v0.12.0
ExpandSources expands regex sources and removes duplicates. NOTE: sources must be normalized (db and rp set) before calling this function.
func (*Shard) FieldDimensions ¶ added in v0.11.0
func (s *Shard) FieldDimensions(sources influxql.Sources) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (*Shard) Restore ¶ added in v1.0.0
Restore restores data to the underlying engine for the shard. The shard is reopened after restore.
func (*Shard) SeriesCount ¶ added in v0.9.1
SeriesCount returns the number of series buckets on the shard.
func (*Shard) SetEnabled ¶ added in v1.0.0
SetEnabled enables or disables the shard for queries and writes. When disabled, all writes and queries return an error and compactions are stopped for the shard.
func (*Shard) SetLogOutput ¶ added in v0.13.0
SetLogOutput sets the writer to which log output will be written. It is safe for concurrent use.
func (*Shard) Statistics ¶ added in v1.0.0
Statistics returns statistics for periodic monitoring.
func (*Shard) WritePoints ¶
WritePoints will write the raw data points and any new metadata to the index in the shard
type ShardError ¶ added in v0.11.0
type ShardError struct { Err error // contains filtered or unexported fields }
A ShardError implements the error interface, and contains extra context about the shard that generated the error.
func (ShardError) Error ¶ added in v0.11.0
func (e ShardError) Error() string
type ShardStatistics ¶ added in v1.0.0
type ShardStatistics struct { WriteReq int64 SeriesCreated int64 FieldsCreated int64 WritePointsFail int64 WritePointsOK int64 BytesWritten int64 DiskBytes int64 }
ShardStatistics maintains statistics for a shard.
type Store ¶
type Store struct { EngineOptions EngineOptions Logger *log.Logger // contains filtered or unexported fields }
Store manages shards and indexes for databases.
func NewStore ¶
NewStore returns a new store with the given path and a default configuration. The returned store must be initialized by calling Open before using it.
func (*Store) BackupShard ¶ added in v0.10.0
BackupShard will get the shard and have the engine backup since the passed in time to the writer
func (*Store) Close ¶
Close closes the store and all associated shards. After calling Close accessing shards through the Store will result in ErrStoreClosed being returned.
func (*Store) CreateShard ¶
CreateShard creates a shard with the given id and retention policy on a database.
func (*Store) CreateShardSnapshot ¶ added in v1.0.0
CreateShardSnapshot will create a hard link to the underlying shard and return a path. The caller is responsible for cleaning up (removing) the file path returned.
func (*Store) DatabaseIndex ¶
func (s *Store) DatabaseIndex(name string) *DatabaseIndex
DatabaseIndex returns the index for a database by its name.
func (*Store) DatabaseIndexN ¶ added in v0.9.3
DatabaseIndexN returns the number of database indices in the store.
func (*Store) DeleteDatabase ¶
DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (*Store) DeleteMeasurement ¶ added in v0.11.0
DeleteMeasurement removes a measurement and all associated series from a database.
func (*Store) DeleteRetentionPolicy ¶ added in v0.11.0
DeleteRetentionPolicy will close all shards associated with the provided retention policy, remove the retention policy directories on both the DB and WAL, and remove all shard files from disk.
func (*Store) DeleteSeries ¶ added in v0.11.0
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
func (*Store) DeleteShard ¶
DeleteShard removes a shard from disk.
func (*Store) DiskSize ¶ added in v0.9.4
DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size.
func (*Store) ExpandSources ¶ added in v0.11.0
ExpandSources expands sources against all local shards.
func (*Store) IteratorCreator ¶ added in v0.13.0
func (s *Store) IteratorCreator(shards []uint64, opt *influxql.SelectOptions) (influxql.IteratorCreator, error)
func (*Store) IteratorCreators ¶ added in v0.12.0
func (s *Store) IteratorCreators() influxql.IteratorCreators
IteratorCreators returns a set of all local shards as iterator creators.
func (*Store) Measurement ¶
func (s *Store) Measurement(database, name string) *Measurement
Measurement returns a measurement by name from the given database.
func (*Store) Measurements ¶ added in v1.0.0
func (*Store) Open ¶
Open initializes the store, creating all necessary directories, loading all shards and indexes and initializing periodic maintenance of all shards.
func (*Store) RestoreShard ¶ added in v1.0.0
RestoreShard restores a backup from r to a given shard. This will only overwrite files included in the backup.
func (*Store) SetLogOutput ¶ added in v0.13.0
SetLogOutput sets the writer to which all logs are written. It is safe for concurrent use.
func (*Store) SetShardEnabled ¶ added in v1.0.0
SetShardEnabled enables or disables a shard for reads and writes
func (*Store) ShardIteratorCreator ¶ added in v0.11.0
func (s *Store) ShardIteratorCreator(id uint64, opt *influxql.SelectOptions) influxql.IteratorCreator
ShardIteratorCreator returns an iterator creator for a shard.
func (*Store) ShardRelativePath ¶ added in v0.10.0
ShardRelativePath will return the relative path to the shard, i.e. <database>/<retention>/<id>