Documentation ¶
Overview ¶
Package tsdb implements a durable time series database.
Index ¶
- Constants
- Variables
- func ErrDatabaseNotFound(name string) error
- func ErrMeasurementNotFound(name string) error
- type Config
- type DatabaseIndex
- type ErrAuthorize
- type FieldCodec
- func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
- func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
- func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
- func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
- type Fields
- type LocalMapper
- func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error
- func (l *LocalMapper) Close()
- func (l *LocalMapper) IsEmpty(tmax int64) bool
- func (l *LocalMapper) Next() (seriesKey string, timestamp int64, value interface{})
- func (l *LocalMapper) NextInterval() (interface{}, error)
- func (l *LocalMapper) Open() error
- type Measurement
- func (m *Measurement) AddSeries(s *Series) bool
- func (m *Measurement) DropSeries(seriesID uint64) bool
- func (m *Measurement) FieldNames() (a []string)
- func (m *Measurement) HasField(name string) bool
- func (m *Measurement) HasSeries() bool
- func (m *Measurement) HasTagKey(k string) bool
- func (m *Measurement) SeriesKeys() []string
- func (m *Measurement) TagKeys() []string
- func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error)
- type Measurements
- type Monitor
- type Point
- type PointBatcher
- type PointBatcherStats
- type QueryExecutor
- func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *influxql.Query, database string) error
- func (q *QueryExecutor) Begin() (influxql.Tx, error)
- func (q *QueryExecutor) DiagnosticsAsRows() []*influxql.Row
- func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
- type Series
- type Shard
- func (s *Shard) Close() error
- func (s *Shard) DB() *bolt.DB
- func (s *Shard) FieldCodec(measurementName string) *FieldCodec
- func (s *Shard) Open() error
- func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error
- func (s *Shard) WritePoints(points []Point) error
- type Store
- func (s *Store) Close() error
- func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error
- func (s *Store) DatabaseIndex(name string) *DatabaseIndex
- func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error
- func (s *Store) DeleteShard(shardID uint64) error
- func (s *Store) Measurement(database, name string) *Measurement
- func (s *Store) Open() error
- func (s *Store) Shard(shardID uint64) *Shard
- func (s *Store) ShardIDs() []uint64
- func (s *Store) ValidateAggregateFieldsInStatement(shardID uint64, measurementName string, stmt *influxql.SelectStatement) error
- func (s *Store) WriteToShard(shardID uint64, points []Point) error
- type TagFilter
- type Tags
Constants ¶
const ( // DefaultRetentionAutoCreate is the default for auto-creating retention policies DefaultRetentionAutoCreate = true // DefaultRetentionCheckEnabled is the default for checking for retention policy enforcement DefaultRetentionCheckEnabled = true // DefaultRetentionCreatePeriod represents how often the server will check to see if new // shard groups need to be created in advance for writing DefaultRetentionCreatePeriod = 45 * time.Minute // DefaultRetentionCheckPeriod is the period of time between retention policy checks are run DefaultRetentionCheckPeriod = 10 * time.Minute )
Variables ¶
var ( // ErrInvalidQuery is returned when executing an unknown query type. ErrInvalidQuery = errors.New("invalid query") // ErrNotExecuted is returned when a statement is not executed in a query. // This can occur when a previous statement in the same query has errored. ErrNotExecuted = errors.New("not executed") )
var ( // ErrFieldOverflow is returned when too many fields are created on a measurement. ErrFieldOverflow = errors.New("field overflow") // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") // ErrFieldNotFound is returned when a field cannot be found. ErrFieldNotFound = errors.New("field not found") // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID // there is no mapping for. ErrFieldUnmappedID = errors.New("field ID not mapped") )
var (
ErrShardNotFound = fmt.Errorf("shard not found")
)
Functions ¶
func ErrDatabaseNotFound ¶
func ErrMeasurementNotFound ¶
Types ¶
type Config ¶
type Config struct { Dir string `toml:"dir"` RetentionAutoCreate bool `toml:"retention-auto-create"` RetentionCheckEnabled bool `toml:"retention-check-enabled"` RetentionCheckPeriod toml.Duration `toml:"retention-check-period"` RetentionCreatePeriod toml.Duration `toml:"retention-create-period"` }
func (*Config) ShardGroupPreCreateCheckPeriod ¶
ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups. If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod
type DatabaseIndex ¶
type DatabaseIndex struct {
// contains filtered or unexported fields
}
DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewDatabaseIndex ¶
func NewDatabaseIndex() *DatabaseIndex
func (*DatabaseIndex) DropMeasurement ¶
func (db *DatabaseIndex) DropMeasurement(name string)
DropMeasurement removes the measurement and all of its underlying series from the database index
func (*DatabaseIndex) DropSeries ¶
func (db *DatabaseIndex) DropSeries(keys []string)
DropSeries removes the series keys and their tags from the index
func (*DatabaseIndex) Measurement ¶
func (d *DatabaseIndex) Measurement(name string) *Measurement
Measurement returns the measurement object from the index by the name
func (*DatabaseIndex) Measurements ¶
func (db *DatabaseIndex) Measurements() Measurements
Measurements returns a list of all measurements.
type ErrAuthorize ¶
type ErrAuthorize struct {
// contains filtered or unexported fields
}
ErrAuthorize represents an authorization error.
func (ErrAuthorize) Error ¶
func (e ErrAuthorize) Error() string
Error returns the text of the error.
type FieldCodec ¶
type FieldCodec struct {
// contains filtered or unexported fields
}
FieldCodec providecs encoding and decoding functionality for the fields of a given Measurement. It is a distinct type to avoid locking writes on this node while potentially long-running queries are executing.
It is not affected by changes to the Measurement object after codec creation. TODO: this shouldn't be exported. nothing outside the shard should know about field encodings.
However, this is here until tx.go and the engine get refactored into tsdb.
func (*FieldCodec) DecodeByID ¶
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
DecodeByID scans a byte slice for a field with the given ID, converts it to its expected type, and return that value. TODO: shouldn't be exported. refactor engine
func (*FieldCodec) DecodeFields ¶
func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
DecodeFields decodes a byte slice into a set of field ids and values.
func (*FieldCodec) DecodeFieldsWithNames ¶
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
DecodeFieldsWithNames decodes a byte slice into a set of field names and values TODO: shouldn't be exported. refactor engine
func (*FieldCodec) EncodeFields ¶
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
EncodeFields converts a map of values with string keys to a byte slice of field IDs and values.
If a field exists in the codec, but its type is different, an error is returned. If a field is not present in the codec, the system panics.
func (*FieldCodec) FieldIDByName ¶
func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb
type LocalMapper ¶
type LocalMapper struct {
// contains filtered or unexported fields
}
LocalMapper implements the influxql.Mapper interface for running map tasks over a shard that is local to this server
func (*LocalMapper) Begin ¶
Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time
func (*LocalMapper) IsEmpty ¶
func (l *LocalMapper) IsEmpty(tmax int64) bool
IsEmpty returns true if either all cursors are nil or all cursors are past the passed in max time
func (*LocalMapper) Next ¶
func (l *LocalMapper) Next() (seriesKey string, timestamp int64, value interface{})
Next returns the next matching timestamped value for the LocalMapper.
func (*LocalMapper) NextInterval ¶
func (l *LocalMapper) NextInterval() (interface{}, error)
NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read. If this is a raw query, interval should be the max time to hit in the query
type Measurement ¶
type Measurement struct { Name string `json:"name,omitempty"` // contains filtered or unexported fields }
Measurement represents a collection of time series in a database. It also contains in memory structures for indexing tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewMeasurement ¶
func NewMeasurement(name string, idx *DatabaseIndex) *Measurement
NewMeasurement allocates and initializes a new Measurement.
func (*Measurement) AddSeries ¶
func (m *Measurement) AddSeries(s *Series) bool
AddSeries will add a series to the measurementIndex. Returns false if already present
func (*Measurement) DropSeries ¶
func (m *Measurement) DropSeries(seriesID uint64) bool
DropSeries will remove a series from the measurementIndex. Returns true if already removed
func (*Measurement) FieldNames ¶
func (m *Measurement) FieldNames() (a []string)
FieldNames returns a list of the measurement's field names
func (*Measurement) HasField ¶
func (m *Measurement) HasField(name string) bool
HasField returns true if the measurement has a field by the given name
func (*Measurement) HasSeries ¶
func (m *Measurement) HasSeries() bool
HasSeries returns true if there is at least 1 series under this measurement
func (*Measurement) HasTagKey ¶
func (m *Measurement) HasTagKey(k string) bool
HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (*Measurement) SeriesKeys ¶
func (m *Measurement) SeriesKeys() []string
SeriesKeys returns the keys of every series in this measurement
func (*Measurement) TagKeys ¶
func (m *Measurement) TagKeys() []string
TagKeys returns a list of the measurement's tag names.
func (*Measurement) TagSets ¶
func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error)
tagSets returns the unique tag sets that exist for the given tag keys. This is used to determine what composite series will be created by a group by. i.e. "group by region" should return: {"region":"uswest"}, {"region":"useast"} or region, service returns {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc... This will also populate the TagSet objects with the series IDs that match each tagset and any influx filter expression that goes with the series TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
type Measurements ¶
type Measurements []*Measurement
Measurements represents a list of *Measurement.
func (Measurements) Len ¶
func (a Measurements) Len() int
func (Measurements) Less ¶
func (a Measurements) Less(i, j int) bool
func (Measurements) Swap ¶
func (a Measurements) Swap(i, j int)
type Monitor ¶
type Monitor struct {
Store interface{}
}
Monitor represents a TSDB monitoring service.
type Point ¶
type Point interface { Name() string SetName(string) Tags() Tags AddTag(key, value string) SetTags(tags Tags) Fields() Fields AddField(name string, value interface{}) Time() time.Time SetTime(t time.Time) UnixNano() int64 HashID() uint64 Key() []byte Data() []byte SetData(buf []byte) String() string }
Point defines the values that will be written to the database
func NewPoint ¶
NewPoint returns a new point with the given measurement name, tags, fiels and timestamp
func ParsePoints ¶
ParsePoints returns a slice of Points from a text representation of a point with each point separated by newlines.
func ParsePointsString ¶
type PointBatcher ¶
type PointBatcher struct {
// contains filtered or unexported fields
}
PointBatcher accepts Points and will emit a batch of those points when either a) the batch reaches a certain size, or b) a certain time passes.
func NewPointBatcher ¶
func NewPointBatcher(sz int, d time.Duration) *PointBatcher
NewPointBatcher returns a new PointBatcher.
func (*PointBatcher) Flush ¶
func (b *PointBatcher) Flush()
Flush instructs the batcher to emit any pending points in a batch, regardless of batch size. If there are no pending points, no batch is emitted.
func (*PointBatcher) In ¶
func (b *PointBatcher) In() chan<- Point
In returns the channel to which points should be written.
func (*PointBatcher) Out ¶
func (b *PointBatcher) Out() <-chan []Point
Out returns the channel from which batches should be read.
func (*PointBatcher) Start ¶
func (b *PointBatcher) Start()
Start starts the batching process. Returns the in and out channels for points and point-batches respectively.
func (*PointBatcher) Stats ¶
func (b *PointBatcher) Stats() *PointBatcherStats
Stats returns a PointBatcherStats object for the PointBatcher. While the each statistic should be closely correlated with each other statistic, it is not guaranteed.
func (*PointBatcher) Stop ¶
func (b *PointBatcher) Stop()
type PointBatcherStats ¶
type PointBatcherStats struct { BatchTotal uint64 // Total count of batches transmitted. PointTotal uint64 // Total count of points processed. SizeTotal uint64 // Number of batches that reached size threshold. TimeoutTotal uint64 // Number of timeouts that occurred. }
PointBatcherStats are the statistics each batcher tracks.
type QueryExecutor ¶
type QueryExecutor struct { // The meta store for accessing and updating cluster and schema data. MetaStore interface { Database(name string) (*meta.DatabaseInfo, error) Databases() ([]meta.DatabaseInfo, error) User(name string) (*meta.UserInfo, error) AdminUserExists() (bool, error) Authenticate(username, password string) (*meta.UserInfo, error) RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) UserCount() (int, error) } // Executes statements relating to meta data. MetaStatementExecutor interface { ExecuteStatement(stmt influxql.Statement) *influxql.Result } // The stats service to report to Stats *influxdb.Stats Logger *log.Logger // contains filtered or unexported fields }
QueryExecutor executes every statement in an influxdb Query. It is responsible for coordinating between the local tsdb.Store, the meta.Store, and the other nodes in the cluster to run the query against their local tsdb.Stores. There should be one executor in a running process
func NewQueryExecutor ¶
func NewQueryExecutor(store *Store) *QueryExecutor
NewQueryExecutor returns an initialized QueryExecutor
func (*QueryExecutor) Authorize ¶
Authorize user u to execute query q on database. database can be "" for queries that do not require a database. If no user is provided it will return an error unless the query's first statement is to create a root user.
func (*QueryExecutor) Begin ¶
func (q *QueryExecutor) Begin() (influxql.Tx, error)
Begin is for influxql/engine.go to use to get a transaction object to start the query
func (*QueryExecutor) DiagnosticsAsRows ¶
func (q *QueryExecutor) DiagnosticsAsRows() []*influxql.Row
DiagnosticsAsRows returns diagnostic information about the server, as a slice of InfluxQL rows.
func (*QueryExecutor) ExecuteQuery ¶
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
ExecuteQuery executes an InfluxQL query against the server. It sends results down the passed in chan and closes it when done. It will close the chan on the first statement that throws an error.
type Series ¶
Series belong to a Measurement and represent unique time series in a database
func (*Series) MarshalBinary ¶
MarshalBinary encodes the object to a binary format.
func (*Series) UnmarshalBinary ¶
UnmarshalBinary decodes the object from a binary format.
type Shard ¶
type Shard struct {
// contains filtered or unexported fields
}
Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.
func NewShard ¶
func NewShard(index *DatabaseIndex, path string) *Shard
NewShard returns a new initialized Shard
func (*Shard) DB ¶
TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying store.
func (*Shard) FieldCodec ¶
func (s *Shard) FieldCodec(measurementName string) *FieldCodec
TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func (*Shard) ValidateAggregateFieldsInStatement ¶
func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error
func (*Shard) WritePoints ¶
WritePoints will write the raw data points and any new metadata to the index in the shard
type Store ¶
func (*Store) CreateShard ¶
func (*Store) DatabaseIndex ¶
func (s *Store) DatabaseIndex(name string) *DatabaseIndex
func (*Store) DeleteDatabase ¶
DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (*Store) DeleteShard ¶
DeleteShard removes a shard from disk.
func (*Store) Measurement ¶
func (s *Store) Measurement(database, name string) *Measurement