README ¶
Line Protocol
The line protocol is a text-based format for writing points to InfluxDB. Each line defines a single point. Multiple lines must be separated by the newline character \n. The format of the line consists of three parts:
[key] [fields] [timestamp]
Each section is separated by spaces. The minimum required point consists of a measurement name and at least one field. Points without a specified timestamp will be written using the server's local timestamp. Timestamps are assumed to be in nanoseconds unless a precision value is passed in the query string.
Key
The key is the measurement name and any optional tags separated by commas. Measurement names, tag keys, and tag values must escape any spaces or commas using a backslash (\). For example: "\ " (an escaped space) and "\," (an escaped comma). All tag values are stored as strings and should not be surrounded by quotes.
Tags should be sorted by key before being sent for best performance. The sort should match that of the Go bytes.Compare function (http://golang.org/pkg/bytes/#Compare).
Examples
# measurement only
cpu
# measurement and tags
cpu,host=serverA,region=us-west
# measurement with commas
cpu\,01,host=serverA,region=us-west
# tag value with spaces
cpu,host=server\ A,region=us\ west
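The key rules above (escape commas and spaces, sort tags by key) can be sketched in a few lines of Go. This is a minimal illustration, not the package's own encoder: escapeKeyPart and buildKey are hypothetical helper names, and the sketch assumes that sorting the unescaped tag keys as Go strings is acceptable, since Go string comparison is byte-wise like bytes.Compare.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// escapeKeyPart backslash-escapes commas and spaces, as required for
// measurement names, tag keys, and tag values.
// (Hypothetical helper, not part of the tsdb package.)
func escapeKeyPart(s string) string {
	s = strings.ReplaceAll(s, ",", `\,`)
	return strings.ReplaceAll(s, " ", `\ `)
}

// buildKey joins the escaped measurement name with its tags, sorted by tag key.
// (Hypothetical helper, not part of the tsdb package.)
func buildKey(measurement string, tags map[string]string) string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	// Go compares strings byte by byte, which gives the same order as bytes.Compare.
	sort.Strings(keys)

	var b strings.Builder
	b.WriteString(escapeKeyPart(measurement))
	for _, k := range keys {
		b.WriteByte(',')
		b.WriteString(escapeKeyPart(k))
		b.WriteByte('=')
		b.WriteString(escapeKeyPart(tags[k]))
	}
	return b.String()
}

func main() {
	tags := map[string]string{"region": "us west", "host": "server A"}
	fmt.Println(buildKey("cpu,01", tags))
}
```

Running the sketch prints a key with the measurement comma and the tag-value spaces escaped, and with host ordered before region.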
Fields
Fields are key-value metrics associated with the measurement. Every line must have at least one field. Multiple fields must be separated with commas and not spaces.
Field keys are always strings and follow the same syntactical rules as described above for tag keys and values. Field values can be one of four types. The first value written for a given field on a given measurement defines the type of that field for all series under that measurement.
- integer - Numeric values that do not include a decimal. (e.g. 1, 345, 2015, -10)
- float - Numeric values that include a decimal. (e.g. 1.0, -3.14, 6.0e+5). Note that all values must have a decimal even if the decimal value is zero (1 is an integer, 1.0 is a float).
- boolean - A value indicating true or false. Valid boolean strings are (t, T, true, TRUE, f, F, false, and FALSE).
- string - A text value. All string values must be surrounded by double quotes ("). If the string contains a double quote, it must be escaped with a backslash, e.g. \".
# integer value
cpu value=1
# float value
cpu_load value=1.2
# boolean value
error fatal=true
# string value
event msg="logged out"
# multiple values
cpu load=10.0,alert=true,reason="value above maximum threshold"
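The four field value types can be rendered from Go values roughly as follows. This is a sketch under stated assumptions: formatFieldValue is a hypothetical helper (not a tsdb function), it only handles int64, float64, bool, and string, and it assumes float values are finite.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatFieldValue renders a Go value in the textual form described above.
// (Hypothetical helper, not part of the tsdb package.)
func formatFieldValue(v interface{}) (string, error) {
	switch x := v.(type) {
	case int64:
		// Integers are written without a decimal point.
		return strconv.FormatInt(x, 10), nil
	case float64:
		// Floats must carry a decimal point, even when the fraction is zero.
		// Assumes x is finite (NaN and Inf are not handled).
		s := strconv.FormatFloat(x, 'f', -1, 64)
		if !strings.Contains(s, ".") {
			s += ".0"
		}
		return s, nil
	case bool:
		return strconv.FormatBool(x), nil
	case string:
		// Strings are double-quoted, with embedded double quotes escaped.
		return `"` + strings.ReplaceAll(x, `"`, `\"`) + `"`, nil
	default:
		return "", fmt.Errorf("unsupported field type %T", v)
	}
}

func main() {
	for _, v := range []interface{}{int64(1), 10.0, true, "logged out"} {
		s, err := formatFieldValue(v)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(s)
	}
}
```

The float branch is the one that matters most: because the first value written fixes the field's type, writing 10 where 10.0 was intended would create an integer field.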
Timestamp
The timestamp section is optional but should be specified if possible. The value is an integer representing nanoseconds since the epoch. If the timestamp is not provided the point will inherit the server's local timestamp.
Some write APIs allow passing a lower precision. If the API supports a lower precision, the timestamp may also be an integer epoch in microseconds, milliseconds, seconds, minutes or hours.
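As a rough illustration of the precision options, the sketch below scales a lower-precision epoch value up to nanoseconds. The toNanoseconds helper and the precision labels ("n", "u", "ms", "s", "m", "h") are assumptions made for this example; check the write API you are using for the labels it actually accepts.

```go
package main

import (
	"fmt"
	"time"
)

// toNanoseconds scales an epoch timestamp given in the named precision to
// nanoseconds. The helper and its precision labels are hypothetical.
func toNanoseconds(ts int64, precision string) (int64, error) {
	var unit time.Duration
	switch precision {
	case "n":
		unit = time.Nanosecond
	case "u":
		unit = time.Microsecond
	case "ms":
		unit = time.Millisecond
	case "s":
		unit = time.Second
	case "m":
		unit = time.Minute
	case "h":
		unit = time.Hour
	default:
		return 0, fmt.Errorf("unknown precision %q", precision)
	}
	// time.Duration counts nanoseconds, so the unit doubles as the scale factor.
	return ts * int64(unit), nil
}

func main() {
	ns, err := toNanoseconds(1434055562, "s")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ns)
}
```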
Full Example
A full example is shown below.
cpu,host=server01,region=uswest value=1.0 1434055562000000000
cpu,host=server02,region=uswest value=3.0 1434055562000010000
In this example the first line shows a measurement of "cpu", there are two tags, "host" and "region", the value is 1.0, and the timestamp is 1434055562000000000. Following this is a second line, also a point in the measurement "cpu" but belonging to a different "host".
cpu,host=server\ 01,region=uswest value=1.0,msg="all systems nominal"
cpu,host=server\ 01,region=us\,west value_int=1
In these examples, the "host" is set to server 01. The field value associated with field key msg is double-quoted, as it is a string. The second example shows a region of us,west with the comma properly escaped. In the first example value is written as a floating point number. In the second, value_int is an integer.
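To show how escaping and quoting interact with the space-separated sections, here is a minimal sketch that splits one line into its key, fields, and optional timestamp. It is not the package's ParsePoints implementation: splitSections is a hypothetical helper, it does no validation of the individual tags or fields, and it assumes double quotes only carry meaning inside the fields section.

```go
package main

import (
	"errors"
	"fmt"
)

// splitSections splits a line into key, fields, and optional timestamp.
// A space separates sections only when it is not backslash-escaped and not
// inside a double-quoted string value.
// (Hypothetical helper, not part of the tsdb package.)
func splitSections(line string) (key, fields, timestamp string, err error) {
	var parts []string
	start, inQuotes := 0, false
	for i := 0; i < len(line); i++ {
		c := line[i]
		switch {
		case c == '\\':
			i++ // skip the escaped character
		case c == '"' && len(parts) == 1:
			inQuotes = !inQuotes // quotes are only tracked in the fields section
		case c == ' ' && !inQuotes:
			parts = append(parts, line[start:i])
			start = i + 1
		}
	}
	parts = append(parts, line[start:])

	if inQuotes || len(parts) < 2 || len(parts) > 3 {
		return "", "", "", errors.New("malformed line: want key fields [timestamp]")
	}
	for _, p := range parts {
		if p == "" {
			return "", "", "", errors.New("malformed line: empty section")
		}
	}
	key, fields = parts[0], parts[1]
	if len(parts) == 3 {
		timestamp = parts[2]
	}
	return key, fields, timestamp, nil
}

func main() {
	line := `cpu,host=server\ 01,region=uswest value=1.0,msg="all systems nominal"`
	key, fields, ts, err := splitSections(line)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("key=%s\nfields=%s\ntimestamp=%q\n", key, fields, ts)
}
```

Note that the escaped space in the host tag and the spaces inside the quoted msg value stay within their sections, and the missing timestamp comes back as an empty string.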
Documentation ¶
Overview ¶
Package tsdb implements a durable time series database.
Index ¶
- Constants
- Variables
- func ErrDatabaseNotFound(name string) error
- func ErrMeasurementNotFound(name string) error
- func IsRetryable(err error) bool
- func NewSnapshotWriter(meta []byte, store *Store) (*snapshot.Writer, error)
- func NopWriteToCloser(w io.WriterTo) interface{ ... }
- func WALPartition(key []byte) uint8
- type Config
- type DatabaseIndex
- func (db *DatabaseIndex) DropMeasurement(name string)
- func (db *DatabaseIndex) DropSeries(keys []string)
- func (d *DatabaseIndex) Measurement(name string) *Measurement
- func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
- func (db *DatabaseIndex) Measurements() Measurements
- type ErrAuthorize
- type FieldCodec
- func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
- func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
- func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
- func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
- type Fields
- type LocalMapper
- func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int) error
- func (l *LocalMapper) Close()
- func (l *LocalMapper) IsEmpty(tmax int64) bool
- func (l *LocalMapper) Next() (seriesKey string, timestamp int64, value interface{})
- func (l *LocalMapper) NextInterval() (interface{}, error)
- func (l *LocalMapper) Open() error
- type Measurement
- func (m *Measurement) AddSeries(s *Series) bool
- func (m *Measurement) DropSeries(seriesID uint64)
- func (m *Measurement) FieldNames() (a []string)
- func (m *Measurement) HasField(name string) bool
- func (m *Measurement) HasSeries() bool
- func (m *Measurement) HasTagKey(k string) bool
- func (m *Measurement) SeriesKeys() []string
- func (m *Measurement) TagKeys() []string
- func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error)
- type Measurements
- type Monitor
- type Point
- type PointBatcher
- type PointBatcherStats
- type QueryExecutor
- type Series
- type Shard
- func (s *Shard) Close() error
- func (s *Shard) DB() *bolt.DB
- func (s *Shard) FieldCodec(measurementName string) *FieldCodec
- func (s *Shard) Flush() error
- func (s *Shard) FlushPartition(partitionID uint8) error
- func (s *Shard) Open() error
- func (s *Shard) Path() string
- func (s *Shard) SeriesCount() (n int, err error)
- func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error
- func (s *Shard) WritePoints(points []Point) error
- type Store
- func (s *Store) Close() error
- func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error
- func (s *Store) DatabaseIndex(name string) *DatabaseIndex
- func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error
- func (s *Store) DeleteShard(shardID uint64) error
- func (s *Store) Flush() error
- func (s *Store) Measurement(database, name string) *Measurement
- func (s *Store) Open() error
- func (s *Store) Path() string
- func (s *Store) Shard(shardID uint64) *Shard
- func (s *Store) ShardIDs() []uint64
- func (s *Store) ValidateAggregateFieldsInStatement(shardID uint64, measurementName string, stmt *influxql.SelectStatement) error
- func (s *Store) WriteToShard(shardID uint64, points []Point) error
- type TagFilter
- type Tags
Constants ¶
const (
	// DefaultRetentionAutoCreate is the default for auto-creating retention policies
	DefaultRetentionAutoCreate = true

	// DefaultRetentionCheckEnabled is the default for checking for retention policy enforcement
	DefaultRetentionCheckEnabled = true

	// DefaultRetentionCreatePeriod represents how often the server will check to see if new
	// shard groups need to be created in advance for writing
	DefaultRetentionCreatePeriod = 45 * time.Minute

	// DefaultRetentionCheckPeriod is the period of time between retention policy checks are run
	DefaultRetentionCheckPeriod = 10 * time.Minute

	// DefaultMaxWALSize is the default size of the WAL before it is flushed.
	DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB

	// DefaultWALFlushInterval is the frequency the WAL will get flushed if
	// it doesn't reach its size threshold.
	DefaultWALFlushInterval = 10 * time.Minute

	// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
	DefaultWALPartitionFlushDelay = 2 * time.Second
)
const WALPartitionN = 8
WALPartitionN is the number of partitions in the write ahead log.
Variables ¶
var (
	// ErrInvalidQuery is returned when executing an unknown query type.
	ErrInvalidQuery = errors.New("invalid query")

	// ErrNotExecuted is returned when a statement is not executed in a query.
	// This can occur when a previous statement in the same query has errored.
	ErrNotExecuted = errors.New("not executed")
)
var (
	// ErrFieldOverflow is returned when too many fields are created on a measurement.
	ErrFieldOverflow = errors.New("field overflow")

	// ErrFieldTypeConflict is returned when a new field already exists with a different type.
	ErrFieldTypeConflict = errors.New("field type conflict")

	// ErrFieldNotFound is returned when a field cannot be found.
	ErrFieldNotFound = errors.New("field not found")

	// ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID
	// there is no mapping for.
	ErrFieldUnmappedID = errors.New("field ID not mapped")

	// ErrWALPartitionNotFound is returned when flushing a WAL partition that
	// does not exist.
	ErrWALPartitionNotFound = errors.New("wal partition not found")
)
var (
ErrShardNotFound = fmt.Errorf("shard not found")
)
Functions ¶
func ErrDatabaseNotFound ¶
func ErrMeasurementNotFound ¶
func IsRetryable ¶
IsRetryable returns true if this error is temporary and could be retried
func NewSnapshotWriter ¶
NewSnapshotWriter returns a new snapshot.Writer that will write metadata and the store's shards to an archive.
func NopWriteToCloser ¶
NopWriteToCloser returns an io.WriterTo that implements io.Closer.
func WALPartition ¶ added in v0.9.1
WALPartition returns the partition number that key belongs to.
Types ¶
type Config ¶
type Config struct {
	Dir                    string        `toml:"dir"`
	MaxWALSize             int           `toml:"max-wal-size"`
	WALFlushInterval       toml.Duration `toml:"wal-flush-interval"`
	WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`

	RetentionAutoCreate   bool          `toml:"retention-auto-create"`
	RetentionCheckEnabled bool          `toml:"retention-check-enabled"`
	RetentionCheckPeriod  toml.Duration `toml:"retention-check-period"`
	RetentionCreatePeriod toml.Duration `toml:"retention-create-period"`
}
func (*Config) ShardGroupPreCreateCheckPeriod ¶
ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups. If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod
type DatabaseIndex ¶
type DatabaseIndex struct {
// contains filtered or unexported fields
}
DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewDatabaseIndex ¶
func NewDatabaseIndex() *DatabaseIndex
func (*DatabaseIndex) DropMeasurement ¶
func (db *DatabaseIndex) DropMeasurement(name string)
DropMeasurement removes the measurement and all of its underlying series from the database index
func (*DatabaseIndex) DropSeries ¶
func (db *DatabaseIndex) DropSeries(keys []string)
DropSeries removes the series keys and their tags from the index
func (*DatabaseIndex) Measurement ¶
func (d *DatabaseIndex) Measurement(name string) *Measurement
Measurement returns the measurement object from the index by the name
func (*DatabaseIndex) MeasurementSeriesCounts ¶
func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database. Useful for reporting and monitoring.
func (*DatabaseIndex) Measurements ¶
func (db *DatabaseIndex) Measurements() Measurements
Measurements returns a list of all measurements.
type ErrAuthorize ¶
type ErrAuthorize struct {
// contains filtered or unexported fields
}
ErrAuthorize represents an authorization error.
func (ErrAuthorize) Error ¶
func (e ErrAuthorize) Error() string
Error returns the text of the error.
type FieldCodec ¶
type FieldCodec struct {
// contains filtered or unexported fields
}
FieldCodec provides encoding and decoding functionality for the fields of a given Measurement. It is a distinct type to avoid locking writes on this node while potentially long-running queries are executing.
It is not affected by changes to the Measurement object after codec creation.
TODO: this shouldn't be exported. Nothing outside the shard should know about field encodings. However, this is here until tx.go and the engine get refactored into tsdb.
func (*FieldCodec) DecodeByID ¶
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
DecodeByID scans a byte slice for a field with the given ID, converts it to its expected type, and returns that value. TODO: shouldn't be exported. Refactor engine.
func (*FieldCodec) DecodeFields ¶
func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
DecodeFields decodes a byte slice into a set of field ids and values.
func (*FieldCodec) DecodeFieldsWithNames ¶
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
DecodeFieldsWithNames decodes a byte slice into a set of field names and values. TODO: shouldn't be exported. Refactor engine.
func (*FieldCodec) EncodeFields ¶
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
EncodeFields converts a map of values with string keys to a byte slice of field IDs and values.
If a field exists in the codec, but its type is different, an error is returned. If a field is not present in the codec, the system panics.
func (*FieldCodec) FieldIDByName ¶
func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb
type LocalMapper ¶
type LocalMapper struct {
// contains filtered or unexported fields
}
LocalMapper implements the influxql.Mapper interface for running map tasks over a shard that is local to this server
func (*LocalMapper) Begin ¶
Begin will set up the mapper to run the map function for a given aggregate call starting at the passed in time
func (*LocalMapper) IsEmpty ¶
func (l *LocalMapper) IsEmpty(tmax int64) bool
IsEmpty returns true if either all cursors are nil or all cursors are past the passed in max time
func (*LocalMapper) Next ¶
func (l *LocalMapper) Next() (seriesKey string, timestamp int64, value interface{})
Next returns the next matching timestamped value for the LocalMapper.
func (*LocalMapper) NextInterval ¶
func (l *LocalMapper) NextInterval() (interface{}, error)
NextInterval will get the time ordered next interval of the given interval size from the mapper. This is a forward only operation from the start time passed into Begin. Will return nil when there is no more data to be read. If this is a raw query, interval should be the max time to hit in the query
type Measurement ¶
type Measurement struct {
	Name string `json:"name,omitempty"`
	// contains filtered or unexported fields
}
Measurement represents a collection of time series in a database. It also contains in memory structures for indexing tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewMeasurement ¶
func NewMeasurement(name string, idx *DatabaseIndex) *Measurement
NewMeasurement allocates and initializes a new Measurement.
func (*Measurement) AddSeries ¶
func (m *Measurement) AddSeries(s *Series) bool
AddSeries will add a series to the measurementIndex. Returns false if already present
func (*Measurement) DropSeries ¶
func (m *Measurement) DropSeries(seriesID uint64)
DropSeries will remove a series from the measurementIndex.
func (*Measurement) FieldNames ¶
func (m *Measurement) FieldNames() (a []string)
FieldNames returns a list of the measurement's field names
func (*Measurement) HasField ¶
func (m *Measurement) HasField(name string) bool
HasField returns true if the measurement has a field by the given name
func (*Measurement) HasSeries ¶
func (m *Measurement) HasSeries() bool
HasSeries returns true if there is at least 1 series under this measurement
func (*Measurement) HasTagKey ¶
func (m *Measurement) HasTagKey(k string) bool
HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (*Measurement) SeriesKeys ¶
func (m *Measurement) SeriesKeys() []string
SeriesKeys returns the keys of every series in this measurement
func (*Measurement) TagKeys ¶
func (m *Measurement) TagKeys() []string
TagKeys returns a list of the measurement's tag names.
func (*Measurement) TagSets ¶
func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error)
TagSets returns the unique tag sets that exist for the given tag keys. This is used to determine what composite series will be created by a group by. For example, "group by region" should return {"region": "uswest"}, {"region": "useast"}, while "group by region, service" returns {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, and so on. This will also populate the TagSet objects with the series IDs that match each tag set and any influx filter expression that goes with the series.
TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
type Measurements ¶
type Measurements []*Measurement
Measurements represents a list of *Measurement.
func (Measurements) Len ¶
func (a Measurements) Len() int
func (Measurements) Less ¶
func (a Measurements) Less(i, j int) bool
func (Measurements) Swap ¶
func (a Measurements) Swap(i, j int)
type Monitor ¶
type Monitor struct {
Store interface{}
}
Monitor represents a TSDB monitoring service.
type Point ¶
type Point interface {
	Name() string
	SetName(string)

	Tags() Tags
	AddTag(key, value string)
	SetTags(tags Tags)

	Fields() Fields
	AddField(name string, value interface{})

	Time() time.Time
	SetTime(t time.Time)
	UnixNano() int64

	HashID() uint64
	Key() []byte

	Data() []byte
	SetData(buf []byte)

	String() string
}
Point defines the values that will be written to the database
func NewPoint ¶
NewPoint returns a new point with the given measurement name, tags, fields and timestamp
func ParsePoints ¶
ParsePoints returns a slice of Points from a text representation of a point with each point separated by newlines.
func ParsePointsString ¶
type PointBatcher ¶
type PointBatcher struct {
// contains filtered or unexported fields
}
PointBatcher accepts Points and will emit a batch of those points when either a) the batch reaches a certain size, or b) a certain time passes.
func NewPointBatcher ¶
func NewPointBatcher(sz int, d time.Duration) *PointBatcher
NewPointBatcher returns a new PointBatcher.
func (*PointBatcher) Flush ¶
func (b *PointBatcher) Flush()
Flush instructs the batcher to emit any pending points in a batch, regardless of batch size. If there are no pending points, no batch is emitted.
func (*PointBatcher) In ¶
func (b *PointBatcher) In() chan<- Point
In returns the channel to which points should be written.
func (*PointBatcher) Out ¶
func (b *PointBatcher) Out() <-chan []Point
Out returns the channel from which batches should be read.
func (*PointBatcher) Start ¶
func (b *PointBatcher) Start()
Start starts the batching process. Returns the in and out channels for points and point-batches respectively.
func (*PointBatcher) Stats ¶
func (b *PointBatcher) Stats() *PointBatcherStats
Stats returns a PointBatcherStats object for the PointBatcher. While each statistic should be closely correlated with each other statistic, it is not guaranteed.
func (*PointBatcher) Stop ¶
func (b *PointBatcher) Stop()
type PointBatcherStats ¶
type PointBatcherStats struct {
	BatchTotal   uint64 // Total count of batches transmitted.
	PointTotal   uint64 // Total count of points processed.
	SizeTotal    uint64 // Number of batches that reached size threshold.
	TimeoutTotal uint64 // Number of timeouts that occurred.
}
PointBatcherStats are the statistics each batcher tracks.
type QueryExecutor ¶
type QueryExecutor struct {
	// The meta store for accessing and updating cluster and schema data.
	MetaStore interface {
		Database(name string) (*meta.DatabaseInfo, error)
		Databases() ([]meta.DatabaseInfo, error)
		User(name string) (*meta.UserInfo, error)
		AdminUserExists() (bool, error)
		Authenticate(username, password string) (*meta.UserInfo, error)
		RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
		UserCount() (int, error)
	}

	// Executes statements relating to meta data.
	MetaStatementExecutor interface {
		ExecuteStatement(stmt influxql.Statement) *influxql.Result
	}

	Logger *log.Logger
	// contains filtered or unexported fields
}
QueryExecutor executes every statement in an influxdb Query. It is responsible for coordinating between the local tsdb.Store, the meta.Store, and the other nodes in the cluster to run the query against their local tsdb.Stores. There should be one executor in a running process
func NewQueryExecutor ¶
func NewQueryExecutor(store *Store) *QueryExecutor
NewQueryExecutor returns an initialized QueryExecutor
func (*QueryExecutor) Authorize ¶
Authorize user u to execute query q on database. database can be "" for queries that do not require a database. If no user is provided it will return an error unless the query's first statement is to create a root user.
func (*QueryExecutor) Begin ¶
func (q *QueryExecutor) Begin() (influxql.Tx, error)
Begin is for influxql/engine.go to use to get a transaction object to start the query
func (*QueryExecutor) ExecuteQuery ¶
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
ExecuteQuery executes an InfluxQL query against the server. It sends results down the passed in chan and closes it when done. It will close the chan on the first statement that throws an error.
type Series ¶
Series belong to a Measurement and represent unique time series in a database
func (*Series) MarshalBinary ¶
MarshalBinary encodes the object to a binary format.
func (*Series) UnmarshalBinary ¶
UnmarshalBinary decodes the object from a binary format.
type Shard ¶
type Shard struct {
	// The maximum size and time thresholds for flushing the WAL.
	MaxWALSize             int
	WALFlushInterval       time.Duration
	WALPartitionFlushDelay time.Duration

	// The writer used by the logger.
	LogOutput io.Writer
	// contains filtered or unexported fields
}
Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.
func NewShard ¶
func NewShard(index *DatabaseIndex, path string) *Shard
NewShard returns a new initialized Shard
func (*Shard) DB ¶
TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying store.
func (*Shard) FieldCodec ¶
func (s *Shard) FieldCodec(measurementName string) *FieldCodec
TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func (*Shard) Flush ¶ added in v0.9.1
Flush writes all points from the write ahead log to the index.
func (*Shard) FlushPartition ¶ added in v0.9.1
FlushPartition flushes a single WAL partition.
func (*Shard) SeriesCount ¶ added in v0.9.1
SeriesCount returns the number of series buckets on the shard. This does not include a count from the WAL.
func (*Shard) ValidateAggregateFieldsInStatement ¶
func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error
func (*Shard) WritePoints ¶
WritePoints will write the raw data points and any new metadata to the index in the shard
type Store ¶
type Store struct {
	MaxWALSize             int
	WALFlushInterval       time.Duration
	WALPartitionFlushDelay time.Duration

	Logger *log.Logger
	// contains filtered or unexported fields
}
func (*Store) CreateShard ¶
func (*Store) DatabaseIndex ¶
func (s *Store) DatabaseIndex(name string) *DatabaseIndex
func (*Store) DeleteDatabase ¶
DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (*Store) DeleteShard ¶
DeleteShard removes a shard from disk.
func (*Store) Measurement ¶
func (s *Store) Measurement(database, name string) *Measurement