Documentation ¶
Overview ¶
Package tsdb implements a durable time series database.
Index ¶
- Constants
- Variables
- func MakeTagsKey(keys []string, tags models.Tags) []byte
- func MarshalTags(tags map[string]string) []byte
- func MeasurementFromSeriesKey(key []byte) []byte
- func NewFieldKeysIterator(engine Engine, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func NewShardError(id uint64, err error) error
- func NewTagKeysIterator(engine Engine, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func RegisterEngine(name string, fn NewEngineFunc)
- func RegisterIndex(name string, fn NewIndexFunc)
- func RegisteredEngines() []string
- func RegisteredIndexes() []string
- type Config
- type Cursor
- type Engine
- type EngineFormat
- type EngineOptions
- type Field
- type FieldCreate
- type Index
- type IndexFormat
- type KeyValue
- type KeyValues
- type LimitError
- type MeasurementFieldSet
- type MeasurementFields
- func (m *MeasurementFields) Clone() *MeasurementFields
- func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType, limitCount bool) error
- func (m *MeasurementFields) Field(name string) *Field
- func (m *MeasurementFields) FieldBytes(name []byte) *Field
- func (m *MeasurementFields) FieldN() int
- func (m *MeasurementFields) FieldSet() map[string]influxql.DataType
- func (m *MeasurementFields) HasField(name string) bool
- func (m *MeasurementFields) MarshalBinary() ([]byte, error)
- func (m *MeasurementFields) UnmarshalBinary(buf []byte) error
- type NewEngineFunc
- type NewIndexFunc
- type PartialWriteError
- type PointBatcher
- type PointBatcherStats
- type Shard
- func (s *Shard) Backup(w io.Writer, basePath string, since time.Time) error
- func (s *Shard) Close() error
- func (s *Shard) CloseFast() error
- func (s *Shard) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func (s *Shard) CreateSnapshot() (string, error)
- func (s *Shard) Database() string
- func (s *Shard) DeleteMeasurement(name []byte) error
- func (s *Shard) DeleteSeries(seriesKeys [][]byte) error
- func (s *Shard) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error
- func (s *Shard) DiskSize() (int64, error)
- func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
- func (s *Shard) ForEachMeasurementName(fn func(name []byte) error) error
- func (s *Shard) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error
- func (s *Shard) ID() uint64
- func (s *Shard) Import(r io.Reader, basePath string) error
- func (s *Shard) IndexType() string
- func (s *Shard) IsIdle() bool
- func (s *Shard) LastModified() time.Time
- func (s *Shard) MeasurementExists(name []byte) (bool, error)
- func (s *Shard) MeasurementFields(name []byte) *MeasurementFields
- func (s *Shard) MeasurementNamesByExpr(cond influxql.Expr) ([][]byte, error)
- func (s *Shard) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
- func (s *Shard) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr) ([][]byte, error)
- func (s *Shard) MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
- func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
- func (s *Shard) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
- func (s *Shard) Open() error
- func (s *Shard) Path() string
- func (s *Shard) Restore(r io.Reader, basePath string) error
- func (s *Shard) RetentionPolicy() string
- func (s *Shard) SeriesN() int64
- func (s *Shard) SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
- func (s *Shard) SetCompactionsEnabled(enabled bool)
- func (s *Shard) SetEnabled(enabled bool)
- func (s *Shard) Statistics(tags map[string]string) []models.Statistic
- func (s *Shard) TagKeyCardinality(name, key []byte) int
- func (s *Shard) UnloadIndex()
- func (s *Shard) WithLogger(log zap.Logger)
- func (s *Shard) WritePoints(points []models.Point) error
- func (s *Shard) WriteTo(w io.Writer) (int64, error)
- type ShardError
- type ShardGroup
- type ShardStatistics
- type Shards
- func (a Shards) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
- func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
- func (a Shards) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
- func (a Shards) Len() int
- func (a Shards) Less(i, j int) bool
- func (a Shards) MapType(measurement, field string) influxql.DataType
- func (a Shards) MeasurementsByRegex(re *regexp.Regexp) []string
- func (a Shards) Swap(i, j int)
- type Store
- func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error
- func (s *Store) Close() error
- func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
- func (s *Store) CreateShardSnapshot(id uint64) (string, error)
- func (s *Store) Databases() []string
- func (s *Store) DeleteDatabase(name string) error
- func (s *Store) DeleteMeasurement(database, name string) error
- func (s *Store) DeleteRetentionPolicy(database, name string) error
- func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
- func (s *Store) DeleteShard(shardID uint64) error
- func (s *Store) DiskSize() (int64, error)
- func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error)
- func (s *Store) ImportShard(id uint64, r io.Reader) error
- func (s *Store) MeasurementNames(database string, cond influxql.Expr) ([][]byte, error)
- func (s *Store) MeasurementSeriesCounts(database string) (measuments int, series int)
- func (s *Store) MeasurementsCardinality(database string) (int64, error)
- func (s *Store) Open() error
- func (s *Store) Path() string
- func (s *Store) RestoreShard(id uint64, r io.Reader) error
- func (s *Store) SeriesCardinality(database string) (int64, error)
- func (s *Store) SetShardEnabled(shardID uint64, enabled bool) error
- func (s *Store) Shard(id uint64) *Shard
- func (s *Store) ShardGroup(ids []uint64) ShardGroup
- func (s *Store) ShardIDs() []uint64
- func (s *Store) ShardN() int
- func (s *Store) ShardRelativePath(id uint64) (string, error)
- func (s *Store) Shards(ids []uint64) []*Shard
- func (s *Store) Statistics(tags map[string]string) []models.Statistic
- func (s *Store) TagValues(database string, cond influxql.Expr) ([]TagValues, error)
- func (s *Store) WithLogger(log zap.Logger)
- func (s *Store) WriteToShard(shardID uint64, points []models.Point) error
- type TagValues
- type TagValuesSlice
Constants ¶
const ( // DefaultEngine is the default engine for new shards DefaultEngine = "tsm1" // DefaultIndex is the default index for new shards DefaultIndex = "inmem" // DefaultCacheMaxMemorySize is the maximum size a shard's cache can // reach before it starts rejecting writes. DefaultCacheMaxMemorySize = 1024 * 1024 * 1024 // 1GB // DefaultCacheSnapshotMemorySize is the size at which the engine will // snapshot the cache and write it to a TSM file, freeing up memory DefaultCacheSnapshotMemorySize = 25 * 1024 * 1024 // 25MB // DefaultCacheSnapshotWriteColdDuration is the length of time at which // the engine will snapshot the cache and write it to a new TSM file if // the shard hasn't received writes or deletes DefaultCacheSnapshotWriteColdDuration = time.Duration(10 * time.Minute) // DefaultCompactFullWriteColdDuration is the duration at which the engine // will compact all TSM files in a shard if it hasn't received a write or delete DefaultCompactFullWriteColdDuration = time.Duration(4 * time.Hour) // DefaultMaxPointsPerBlock is the maximum number of points in an encoded // block in a TSM file DefaultMaxPointsPerBlock = 1000 // DefaultMaxSeriesPerDatabase is the maximum number of series a node can hold per database. // This limit only applies to the "inmem" index. DefaultMaxSeriesPerDatabase = 1000000 // DefaultMaxValuesPerTag is the maximum number of values a tag can have within a measurement. DefaultMaxValuesPerTag = 100000 // DefaultMaxConcurrentCompactions is the maximum number of concurrent full and level compactions // that can run at one time. A value of results in runtime.GOMAXPROCS(0) used at runtime. DefaultMaxConcurrentCompactions = 0 )
const EOF = influxql.ZeroTime
EOF represents a "not found" key returned by a Cursor.
Variables ¶
var ( // ErrFormatNotFound is returned when no format can be determined from a path. ErrFormatNotFound = errors.New("format not found") // ErrUnknownEngineFormat is returned when the engine format is // unknown. ErrUnknownEngineFormat is currently returned if a format // other than tsm1 is encountered. ErrUnknownEngineFormat = errors.New("unknown engine format") )
var ( // ErrFieldOverflow is returned when too many fields are created on a measurement. ErrFieldOverflow = errors.New("field overflow") // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") // ErrFieldNotFound is returned when a field cannot be found. ErrFieldNotFound = errors.New("field not found") // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID // there is no mapping for. ErrFieldUnmappedID = errors.New("field ID not mapped") // ErrEngineClosed is returned when a caller attempts indirectly to // access the shard's underlying engine. ErrEngineClosed = errors.New("engine is closed") // ErrShardDisabled is returned when a the shard is not available for // queries or writes. ErrShardDisabled = errors.New("shard is disabled") )
var ( // ErrShardNotFound is returned when trying to get a non existing shard. ErrShardNotFound = fmt.Errorf("shard not found") // ErrStoreClosed is returned when trying to use a closed Store. ErrStoreClosed = fmt.Errorf("store is closed") )
var NewInmemIndex func(name string) (interface{}, error)
NewInmemIndex returns a new "inmem" index type.
Functions ¶
func MakeTagsKey ¶ added in v1.3.0
MakeTagsKey converts a tag set to bytes for use as a lookup key.
func MarshalTags ¶ added in v0.9.3
MarshalTags converts a tag set to bytes for use as a lookup key.
func MeasurementFromSeriesKey ¶ added in v0.9.3
MeasurementFromSeriesKey returns the name of the measurement from a key that contains a measurement name.
func NewFieldKeysIterator ¶ added in v0.11.0
NewFieldKeysIterator returns an iterator that can be iterated over to retrieve field keys.
func NewShardError ¶ added in v0.11.0
NewShardError returns a new ShardError.
func NewTagKeysIterator ¶ added in v0.11.0
NewTagKeysIterator returns a new instance of TagKeysIterator.
func RegisterEngine ¶ added in v0.9.3
func RegisterEngine(name string, fn NewEngineFunc)
RegisterEngine registers a storage engine initializer by name.
func RegisterIndex ¶ added in v1.3.0
func RegisterIndex(name string, fn NewIndexFunc)
RegisterIndex registers a storage index initializer by name.
func RegisteredEngines ¶ added in v0.9.5
func RegisteredEngines() []string
RegisteredEngines returns the slice of currently registered engines.
func RegisteredIndexes ¶ added in v1.3.0
func RegisteredIndexes() []string
RegisteredIndexs returns the slice of currently registered indexes.
Types ¶
type Config ¶
type Config struct { Dir string `toml:"dir"` Engine string `toml:"-"` Index string `toml:"index-version"` // General WAL configuration options WALDir string `toml:"wal-dir"` // WALFsyncDelay is the amount of time that a write will wait before fsyncing. A duration // greater than 0 can be used to batch up multiple fsync calls. This is useful for slower // disks or when WAL write contention is seen. A value of 0 fsyncs every write to the WAL. WALFsyncDelay toml.Duration `toml:"wal-fsync-delay"` // Query logging QueryLogEnabled bool `toml:"query-log-enabled"` // Compaction options for tsm1 (descriptions above with defaults) CacheMaxMemorySize uint64 `toml:"cache-max-memory-size"` CacheSnapshotMemorySize uint64 `toml:"cache-snapshot-memory-size"` CacheSnapshotWriteColdDuration toml.Duration `toml:"cache-snapshot-write-cold-duration"` CompactFullWriteColdDuration toml.Duration `toml:"compact-full-write-cold-duration"` // MaxSeriesPerDatabase is the maximum number of series a node can hold per database. // When this limit is exceeded, writes return a 'max series per database exceeded' error. // A value of 0 disables the limit. This limit only applies when using the "inmem" index. MaxSeriesPerDatabase int `toml:"max-series-per-database"` // MaxValuesPerTag is the maximum number of tag values a single tag key can have within // a measurement. When the limit is execeeded, writes return an error. // A value of 0 disables the limit. MaxValuesPerTag int `toml:"max-values-per-tag"` // MaxConcurrentCompactions is the maximum number of concurrent level and full compactions // that can be running at one time across all shards. Compactions scheduled to run when the // limit is reached are blocked until a running compaction completes. Snapshot compactions are // not affected by this limit. A value of 0 limits compactions to runtime.GOMAXPROCS(0). MaxConcurrentCompactions int `toml:"max-concurrent-compactions"` TraceLoggingEnabled bool `toml:"trace-logging-enabled"` }
Config holds the configuration for the tsbd package.
func (Config) Diagnostics ¶ added in v1.3.0
func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)
Diagnostics returns a diagnostics representation of a subset of the Config.
type Cursor ¶ added in v0.9.3
type Cursor interface { SeekTo(seek int64) (key int64, value interface{}) Next() (key int64, value interface{}) Ascending() bool }
Cursor represents an iterator over a series.
type Engine ¶ added in v0.9.3
type Engine interface { Open() error Close() error SetEnabled(enabled bool) SetCompactionsEnabled(enabled bool) WithLogger(zap.Logger) LoadMetadataIndex(shardID uint64, index Index) error CreateSnapshot() (string, error) Backup(w io.Writer, basePath string, since time.Time) error Restore(r io.Reader, basePath string) error Import(r io.Reader, basePath string) error CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error) WritePoints(points []models.Point) error CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error DeleteSeriesRange(keys [][]byte, min, max int64) error SeriesSketches() (estimator.Sketch, estimator.Sketch, error) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) SeriesN() int64 MeasurementExists(name []byte) (bool, error) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) MeasurementFields(measurement []byte) *MeasurementFields ForEachMeasurementName(fn func(name []byte) error) error DeleteMeasurement(name []byte) error // TagKeys(name []byte) ([][]byte, error) HasTagKey(name, key []byte) (bool, error) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error TagKeyCardinality(name, key []byte) int // InfluxQL iterators MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) // Statistics will return statistics relevant to this engine. Statistics(tags map[string]string) []models.Statistic LastModified() time.Time DiskSize() int64 IsIdle() bool io.WriterTo }
Engine represents a swappable storage engine for the shard.
type EngineFormat ¶ added in v0.9.5
type EngineFormat int
EngineFormat represents the format for an engine.
const ( // TSM1Format is the format used by the tsm1 engine. TSM1Format EngineFormat = 2 )
type EngineOptions ¶ added in v0.9.3
type EngineOptions struct { EngineVersion string IndexVersion string ShardID uint64 InmemIndex interface{} // shared in-memory index CompactionLimiter limiter.Fixed Config Config }
EngineOptions represents the options used to initialize the engine.
func NewEngineOptions ¶ added in v0.9.3
func NewEngineOptions() EngineOptions
NewEngineOptions returns the default options.
type Field ¶ added in v0.9.3
type Field struct { ID uint8 `json:"id,omitempty"` Name string `json:"name,omitempty"` Type influxql.DataType `json:"type,omitempty"` }
Field represents a series field.
type FieldCreate ¶ added in v0.9.3
FieldCreate holds information for a field to create on a measurement.
type Index ¶ added in v1.3.0
type Index interface { Open() error Close() error WithLogger(zap.Logger) MeasurementExists(name []byte) (bool, error) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) DropMeasurement(name []byte) error ForEachMeasurementName(fn func(name []byte) error) error InitializeSeries(key, name []byte, tags models.Tags) error CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error DropSeries(key []byte) error SeriesSketches() (estimator.Sketch, estimator.Sketch, error) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) SeriesN() int64 HasTagKey(name, key []byte) (bool, error) TagSets(name []byte, options influxql.IteratorOptions) ([]*influxql.TagSet, error) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error) MeasurementTagKeyValuesByExpr(name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) ForEachMeasurementTagKey(name []byte, fn func(key []byte) error) error TagKeyCardinality(name, key []byte) int // InfluxQL system iterators MeasurementSeriesKeysByExpr(name []byte, condition influxql.Expr) ([][]byte, error) ForEachMeasurementSeriesByExpr(name []byte, expr influxql.Expr, fn func(tags models.Tags) error) error SeriesPointIterator(opt influxql.IteratorOptions) (influxql.Iterator, error) // Sets a shared fieldset from the engine. SetFieldSet(fs *MeasurementFieldSet) // Creates hard links inside path for snapshotting. SnapshotTo(path string) error // To be removed w/ tsi1. SetFieldName(measurement []byte, name string) AssignShard(k string, shardID uint64) UnassignShard(k string, shardID uint64) error RemoveShard(shardID uint64) Type() string }
func MustOpenIndex ¶ added in v1.3.0
func MustOpenIndex(id uint64, database, path string, options EngineOptions) Index
type IndexFormat ¶ added in v1.3.0
type IndexFormat int
IndexFormat represents the format for an index.
const ( // InMemFormat is the format used by the original in-memory shared index. InMemFormat IndexFormat = 1 // TSI1Format is the format used by the tsi1 index. TSI1Format IndexFormat = 2 )
type KeyValue ¶ added in v1.0.0
type KeyValue struct {
Key, Value string
}
KeyValue holds a string key and a string value.
type KeyValues ¶ added in v1.0.0
type KeyValues []KeyValue
KeyValues is a sortable slice of KeyValue.
type LimitError ¶ added in v1.3.0
type LimitError struct {
Reason string
}
LimitError represents an error caused by a configurable limit.
func (*LimitError) Error ¶ added in v1.3.0
func (e *LimitError) Error() string
type MeasurementFieldSet ¶ added in v1.3.0
type MeasurementFieldSet struct {
// contains filtered or unexported fields
}
MeasurementFieldSet represents a collection of fields by measurement. This safe for concurrent use.
func NewMeasurementFieldSet ¶ added in v1.3.0
func NewMeasurementFieldSet() *MeasurementFieldSet
NewMeasurementFieldSet returns a new instance of MeasurementFieldSet.
func (*MeasurementFieldSet) CreateFieldsIfNotExists ¶ added in v1.3.0
func (fs *MeasurementFieldSet) CreateFieldsIfNotExists(name []byte) *MeasurementFields
CreateFieldsIfNotExists returns fields for a measurement by name.
func (*MeasurementFieldSet) Delete ¶ added in v1.3.0
func (fs *MeasurementFieldSet) Delete(name string)
Delete removes a field set for a measurement.
func (*MeasurementFieldSet) DeleteWithLock ¶ added in v1.3.0
func (fs *MeasurementFieldSet) DeleteWithLock(name string, fn func() error) error
DeleteWithLock executes fn and removes a field set from a measurement under lock.
func (*MeasurementFieldSet) Fields ¶ added in v1.3.0
func (fs *MeasurementFieldSet) Fields(name string) *MeasurementFields
Fields returns fields for a measurement by name.
type MeasurementFields ¶ added in v0.9.3
type MeasurementFields struct {
// contains filtered or unexported fields
}
MeasurementFields holds the fields of a measurement and their codec.
func NewMeasurementFields ¶ added in v0.12.1
func NewMeasurementFields() *MeasurementFields
NewMeasurementFields returns an initialised *MeasurementFields value.
func (*MeasurementFields) Clone ¶ added in v1.3.0
func (m *MeasurementFields) Clone() *MeasurementFields
Clone returns copy of the MeasurementFields
func (*MeasurementFields) CreateFieldIfNotExists ¶ added in v0.9.3
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType, limitCount bool) error
CreateFieldIfNotExists creates a new field with an autoincrementing ID. Returns an error if 255 fields have already been created on the measurement or the fields already exists with a different type.
func (*MeasurementFields) Field ¶ added in v0.12.1
func (m *MeasurementFields) Field(name string) *Field
Field returns the field for name, or nil if there is no field for name.
func (*MeasurementFields) FieldBytes ¶ added in v1.1.0
func (m *MeasurementFields) FieldBytes(name []byte) *Field
FieldBytes returns the field for name, or nil if there is no field for name. FieldBytes should be preferred to Field when the caller has a []byte, because it avoids a string allocation, which can't be avoided if the caller converts the []byte to a string and calls Field.
func (*MeasurementFields) FieldN ¶ added in v1.3.0
func (m *MeasurementFields) FieldN() int
func (*MeasurementFields) FieldSet ¶ added in v1.0.0
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType
FieldSet returns the set of fields and their types for the measurement.
func (*MeasurementFields) HasField ¶ added in v1.3.0
func (m *MeasurementFields) HasField(name string) bool
func (*MeasurementFields) MarshalBinary ¶ added in v0.9.3
func (m *MeasurementFields) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*MeasurementFields) UnmarshalBinary ¶ added in v0.9.3
func (m *MeasurementFields) UnmarshalBinary(buf []byte) error
UnmarshalBinary decodes the object from a binary format.
type NewEngineFunc ¶ added in v0.9.3
type NewEngineFunc func(id uint64, i Index, database, path string, walPath string, options EngineOptions) Engine
NewEngineFunc creates a new engine.
type NewIndexFunc ¶ added in v1.3.0
type NewIndexFunc func(id uint64, database, path string, options EngineOptions) Index
NewIndexFunc creates a new index.
type PartialWriteError ¶ added in v1.1.0
type PartialWriteError struct { Reason string Dropped int // The set of series keys that were dropped. Can be nil. DroppedKeys map[string]struct{} }
PartialWriteError indicates a write request could only write a portion of the requested values.
func (PartialWriteError) Error ¶ added in v1.1.0
func (e PartialWriteError) Error() string
type PointBatcher ¶
type PointBatcher struct {
// contains filtered or unexported fields
}
PointBatcher accepts Points and will emit a batch of those points when either a) the batch reaches a certain size, or b) a certain time passes.
func NewPointBatcher ¶
func NewPointBatcher(sz int, bp int, d time.Duration) *PointBatcher
NewPointBatcher returns a new PointBatcher. sz is the batching size, bp is the maximum number of batches that may be pending. d is the time after which a batch will be emitted after the first point is received for the batch, regardless of its size.
func (*PointBatcher) Flush ¶
func (b *PointBatcher) Flush()
Flush instructs the batcher to emit any pending points in a batch, regardless of batch size. If there are no pending points, no batch is emitted.
func (*PointBatcher) In ¶
func (b *PointBatcher) In() chan<- models.Point
In returns the channel to which points should be written.
func (*PointBatcher) Out ¶
func (b *PointBatcher) Out() <-chan []models.Point
Out returns the channel from which batches should be read.
func (*PointBatcher) Start ¶
func (b *PointBatcher) Start()
Start starts the batching process. Returns the in and out channels for points and point-batches respectively.
func (*PointBatcher) Stats ¶
func (b *PointBatcher) Stats() *PointBatcherStats
Stats returns a PointBatcherStats object for the PointBatcher. While the each statistic should be closely correlated with each other statistic, it is not guaranteed.
func (*PointBatcher) Stop ¶
func (b *PointBatcher) Stop()
Stop stops the batching process. Stop waits for the batching routine to stop before returning.
type PointBatcherStats ¶
type PointBatcherStats struct { BatchTotal uint64 // Total count of batches transmitted. PointTotal uint64 // Total count of points processed. SizeTotal uint64 // Number of batches that reached size threshold. TimeoutTotal uint64 // Number of timeouts that occurred. }
PointBatcherStats are the statistics each batcher tracks.
type Shard ¶
type Shard struct { EnableOnOpen bool // contains filtered or unexported fields }
Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.
func NewShard ¶
func NewShard(id uint64, path string, walPath string, opt EngineOptions) *Shard
NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func (*Shard) Backup ¶ added in v1.3.6
Backup backs up the shard by creating a tar archive of all TSM files that have been modified since the provided time. See Engine.Backup for more details.
func (*Shard) CloseFast ¶ added in v1.3.0
CloseFast closes the shard without cleaning up the shard ID or any of the shard's series keys from the index it belongs to.
CloseFast can be called when the entire index is being removed, e.g., when the database the shard belongs to is being dropped.
func (*Shard) CreateIterator ¶ added in v0.11.0
func (s *Shard) CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error)
CreateIterator returns an iterator for the data in the shard.
func (*Shard) CreateSnapshot ¶ added in v1.0.0
CreateSnapshot will return a path to a temp directory containing hard links to the underlying shard files.
func (*Shard) DeleteMeasurement ¶ added in v0.9.3
DeleteMeasurement deletes a measurement and all underlying series.
func (*Shard) DeleteSeries ¶ added in v0.9.3
DeleteSeries deletes a list of series.
func (*Shard) DeleteSeriesRange ¶ added in v0.13.0
DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
func (*Shard) FieldDimensions ¶ added in v0.11.0
func (s *Shard) FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)
FieldDimensions returns unique sets of fields and dimensions across a list of sources.
func (*Shard) ForEachMeasurementName ¶ added in v1.3.6
ForEachMeasurementName iterates over each measurement in the shard.
func (*Shard) ForEachMeasurementTagKey ¶ added in v1.3.0
func (*Shard) Import ¶ added in v1.3.0
Import imports data to the underlying engine for the shard. r should be a reader from a backup created by Backup.
func (*Shard) IsIdle ¶ added in v1.3.0
IsIdle return true if the shard is not receiving writes and is fully compacted.
func (*Shard) LastModified ¶ added in v1.2.0
LastModified returns the time when this shard was last modified.
func (*Shard) MeasurementExists ¶ added in v1.3.0
MeasurementExists returns true if the shard contains name. TODO(edd): This method is currently only being called from tests; do we really need it?
func (*Shard) MeasurementFields ¶ added in v1.3.0
func (s *Shard) MeasurementFields(name []byte) *MeasurementFields
MeasurementFields returns fields for a measurement. TODO(edd): This method is currently only being called from tests; do we really need it?
func (*Shard) MeasurementNamesByExpr ¶ added in v1.3.0
MeasurementNamesByExpr returns names of measurements matching the condition. If cond is nil then all measurement names are returned.
func (*Shard) MeasurementNamesByRegex ¶ added in v1.3.6
MeasurementNamesByRegex returns names of measurements matching the regular expression.
func (*Shard) MeasurementSeriesKeysByExpr ¶ added in v1.3.6
MeasurementSeriesKeysByExpr returns a list of series keys from the shard matching expr.
func (*Shard) MeasurementTagKeyValuesByExpr ¶ added in v1.3.6
func (s *Shard) MeasurementTagKeyValuesByExpr(name []byte, key []string, expr influxql.Expr, keysSorted bool) ([][]string, error)
MeasurementTagKeyValuesByExpr returns all the tag keys values for the provided expression.
func (*Shard) MeasurementTagKeysByExpr ¶ added in v1.3.6
func (s *Shard) MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
MeasurementTagKeysByExpr returns all the tag keys for the provided expression.
func (*Shard) MeasurementsSketches ¶ added in v1.3.0
MeasurementsSketches returns the measurement sketches for the shard.
func (*Shard) Restore ¶ added in v1.0.0
Restore restores data to the underlying engine for the shard. The shard is reopened after restore.
func (*Shard) RetentionPolicy ¶ added in v1.3.0
RetentionPolicy returns the retention policy of the shard.
func (*Shard) SeriesSketches ¶ added in v1.3.0
SeriesSketches returns the series sketches for the shard.
func (*Shard) SetCompactionsEnabled ¶ added in v1.3.0
SetCompactionsEnabled enables or disable shard background compactions.
func (*Shard) SetEnabled ¶ added in v1.0.0
SetEnabled enables the shard for queries and write. When disabled, all writes and queries return an error and compactions are stopped for the shard.
func (*Shard) Statistics ¶ added in v1.0.0
Statistics returns statistics for periodic monitoring.
func (*Shard) TagKeyCardinality ¶ added in v1.3.0
func (*Shard) UnloadIndex ¶ added in v1.0.1
func (s *Shard) UnloadIndex()
UnloadIndex removes all references to this shard from the DatabaseIndex
func (*Shard) WithLogger ¶ added in v1.2.0
WithLogger sets the logger on the shard.
func (*Shard) WritePoints ¶
WritePoints will write the raw data points and any new metadata to the index in the shard.
type ShardError ¶ added in v0.11.0
type ShardError struct { Err error // contains filtered or unexported fields }
A ShardError implements the error interface, and contains extra context about the shard that generated the error.
func (ShardError) Error ¶ added in v0.11.0
func (e ShardError) Error() string
Error returns the string representation of the error, to satisfy the error interface.
type ShardGroup ¶ added in v1.2.0
type ShardGroup interface { MeasurementsByRegex(re *regexp.Regexp) []string FieldDimensions(measurements []string) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error) MapType(measurement, field string) influxql.DataType CreateIterator(measurement string, opt influxql.IteratorOptions) (influxql.Iterator, error) ExpandSources(sources influxql.Sources) (influxql.Sources, error) }
type ShardStatistics ¶ added in v1.0.0
type ShardStatistics struct { WriteReq int64 WriteReqOK int64 WriteReqErr int64 FieldsCreated int64 WritePointsErr int64 WritePointsDropped int64 WritePointsOK int64 BytesWritten int64 DiskBytes int64 }
ShardStatistics maintains statistics for a shard.
type Shards ¶ added in v0.11.0
type Shards []*Shard
Shards represents a sortable list of shards.
func (Shards) CreateIterator ¶ added in v1.2.0
func (Shards) ExpandSources ¶ added in v1.2.0
func (Shards) FieldDimensions ¶ added in v1.2.0
func (Shards) MeasurementsByRegex ¶ added in v1.2.0
MeasurementsByRegex returns the unique set of measurements matching the provided regex, for all the shards.
type Store ¶
type Store struct { EngineOptions EngineOptions Logger zap.Logger // contains filtered or unexported fields }
Store manages shards and indexes for databases.
func NewStore ¶
NewStore returns a new store with the given path and a default configuration. The returned store must be initialized by calling Open before using it.
func (*Store) BackupShard ¶ added in v0.10.0
BackupShard will get the shard and have the engine backup since the passed in time to the writer.
func (*Store) Close ¶
Close closes the store and all associated shards. After calling Close accessing shards through the Store will result in ErrStoreClosed being returned.
func (*Store) CreateShard ¶
CreateShard creates a shard with the given id and retention policy on a database.
func (*Store) CreateShardSnapshot ¶ added in v1.0.0
CreateShardSnapShot will create a hard link to the underlying shard and return a path. The caller is responsible for cleaning up (removing) the file path returned.
func (*Store) Databases ¶ added in v0.9.4
Databases returns the names of all databases managed by the store.
func (*Store) DeleteDatabase ¶
DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (*Store) DeleteMeasurement ¶ added in v0.11.0
DeleteMeasurement removes a measurement and all associated series from a database.
func (*Store) DeleteRetentionPolicy ¶ added in v0.11.0
DeleteRetentionPolicy will close all shards associated with the provided retention policy, remove the retention policy directories on both the DB and WAL, and remove all shard files from disk.
func (*Store) DeleteSeries ¶ added in v0.11.0
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeries loops through the local shards and deletes the series data for the passed in series keys.
func (*Store) DeleteShard ¶
DeleteShard removes a shard from disk.
func (*Store) DiskSize ¶ added in v0.9.4
DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size.
func (*Store) ExpandSources ¶ added in v0.11.0
ExpandSources expands sources against all local shards.
func (*Store) ImportShard ¶ added in v1.3.0
ImportShard imports the contents of r to a given shard. All files in the backup are added as new files which may cause duplicated data to occur requiring more expensive compactions.
func (*Store) MeasurementNames ¶ added in v1.3.0
MeasurementNames returns a slice of all measurements. Measurements accepts an optional condition expression. If cond is nil, then all measurements for the database will be returned.
func (*Store) MeasurementSeriesCounts ¶ added in v1.3.0
MeasurementSeriesCounts returns the number of measurements and series in all the shards' indices.
func (*Store) MeasurementsCardinality ¶ added in v1.3.0
MeasurementsCardinality returns the measurement cardinality for the provided database.
func (*Store) Open ¶
Open initializes the store, creating all necessary directories, loading all shards as well as initializing periodic maintenance of them.
func (*Store) RestoreShard ¶ added in v1.0.0
RestoreShard restores a backup from r to a given shard. This will only overwrite files included in the backup.
func (*Store) SeriesCardinality ¶ added in v1.3.0
SeriesCardinality returns the series cardinality for the provided database.
func (*Store) SetShardEnabled ¶ added in v1.0.0
SetShardEnabled enables or disables a shard for read and writes.
func (*Store) ShardGroup ¶ added in v1.2.0
func (s *Store) ShardGroup(ids []uint64) ShardGroup
ShardGroup returns a ShardGroup with a list of shards by id.
func (*Store) ShardRelativePath ¶ added in v0.10.0
ShardRelativePath will return the relative path to the shard, i.e., <database>/<retention>/<id>.
func (*Store) Statistics ¶ added in v1.0.0
Statistics returns statistics for period monitoring.
func (*Store) TagValues ¶ added in v1.0.0
TagValues returns the tag keys and values in the given database, matching the condition.
func (*Store) WithLogger ¶ added in v1.2.0
WithLogger sets the logger for the store.
type TagValuesSlice ¶ added in v1.3.0
type TagValuesSlice []TagValues
func (TagValuesSlice) Len ¶ added in v1.3.0
func (a TagValuesSlice) Len() int
func (TagValuesSlice) Less ¶ added in v1.3.0
func (a TagValuesSlice) Less(i, j int) bool
func (TagValuesSlice) Swap ¶ added in v1.3.0
func (a TagValuesSlice) Swap(i, j int)
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package engine can be imported to initialize and register all available TSDB engines.
|
Package engine can be imported to initialize and register all available TSDB engines. |
tsm1
Package tsm1 provides a TSDB in the Time Structured Merge tree format.
|
Package tsm1 provides a TSDB in the Time Structured Merge tree format. |
inmem
Package inmem implements a shared, in-memory index for each database.
|
Package inmem implements a shared, in-memory index for each database. |
tsi1
Package tsi1 provides a memory-mapped index implementation that supports high cardinality series.
|
Package tsi1 provides a memory-mapped index implementation that supports high cardinality series. |
Package meta is a generated protocol buffer package.
|
Package meta is a generated protocol buffer package. |