Documentation ¶
Overview ¶
Package tsdb implements a durable time series database.
Index ¶
- Constants
- Variables
- func ErrDatabaseNotFound(name string) error
- func ErrMeasurementNotFound(name string) error
- func IsRetryable(err error) bool
- func MarshalTags(tags map[string]string) []byte
- func MeasurementFromSeriesKey(key string) string
- func NewSnapshotWriter(meta []byte, store *Store) (*snapshot.Writer, error)
- func NopWriteToCloser(w io.WriterTo) interface{ ... }
- func ProcessAggregateDerivative(results [][]interface{}, isNonNegative bool, interval time.Duration) [][]interface{}
- func RegisterEngine(name string, fn NewEngineFunc)
- type Config
- type Cursor
- type DatabaseIndex
- func (s *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement
- func (s *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series
- func (db *DatabaseIndex) DropMeasurement(name string)
- func (db *DatabaseIndex) DropSeries(keys []string)
- func (d *DatabaseIndex) Measurement(name string) *Measurement
- func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
- func (db *DatabaseIndex) Measurements() Measurements
- func (d *DatabaseIndex) Names() []string
- func (d *DatabaseIndex) Series(key string) *Series
- func (d *DatabaseIndex) SeriesN() int
- type Engine
- type EngineOptions
- type ErrAuthorize
- type Executor
- type FieldCodec
- func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error)
- func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
- func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
- func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
- func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
- type FieldCreate
- type Fields
- type LocalMapper
- type Mapper
- type MapperOutput
- type MapperValue
- type MapperValues
- type Measurement
- func (m *Measurement) AddSeries(s *Series) bool
- func (m *Measurement) DropSeries(seriesID uint64)
- func (m *Measurement) FieldNames() (a []string)
- func (m *Measurement) HasField(name string) bool
- func (m *Measurement) HasSeries() bool
- func (m *Measurement) HasTagKey(k string) bool
- func (m *Measurement) SeriesKeys() []string
- func (m *Measurement) SetFieldName(name string)
- func (m *Measurement) TagKeys() []string
- func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error)
- func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error
- type MeasurementFields
- type Measurements
- type Monitor
- type NewEngineFunc
- type Point
- type PointBatcher
- type PointBatcherStats
- type QueryExecutor
- func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *influxql.Query, database string) error
- func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
- func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Executor, error)
- type RawQueryDerivativeProcessor
- type Series
- type SeriesCreate
- type SeriesIDs
- func (a SeriesIDs) Equals(other SeriesIDs) bool
- func (a SeriesIDs) Intersect(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Len() int
- func (a SeriesIDs) Less(i, j int) bool
- func (a SeriesIDs) Reject(other SeriesIDs) SeriesIDs
- func (a SeriesIDs) Swap(i, j int)
- func (a SeriesIDs) Union(other SeriesIDs) SeriesIDs
- type Shard
- func (s *Shard) Close() error
- func (s *Shard) DeleteMeasurement(name string, seriesKeys []string) error
- func (s *Shard) DeleteSeries(keys []string) error
- func (s *Shard) FieldCodec(measurementName string) *FieldCodec
- func (s *Shard) Open() error
- func (s *Shard) Path() string
- func (s *Shard) SeriesCount() (int, error)
- func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error
- func (s *Shard) WritePoints(points []Point) error
- type StatefulMapper
- type Store
- func (s *Store) Close() error
- func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mapper, error)
- func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error
- func (s *Store) DatabaseIndex(name string) *DatabaseIndex
- func (s *Store) DatabaseIndexN() int
- func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error
- func (s *Store) DeleteShard(shardID uint64) error
- func (s *Store) Measurement(database, name string) *Measurement
- func (s *Store) Open() error
- func (s *Store) Path() string
- func (s *Store) Shard(id uint64) *Shard
- func (s *Store) ShardIDs() []uint64
- func (s *Store) ShardN() int
- func (s *Store) ValidateAggregateFieldsInStatement(shardID uint64, measurementName string, stmt *influxql.SelectStatement) error
- func (s *Store) WriteToShard(shardID uint64, points []Point) error
- type TagFilter
- type Tags
- type Tx
Constants ¶
const ( // DefaultMaxWALSize is the default size of the WAL before it is flushed. DefaultMaxWALSize = 100 * 1024 * 1024 // 100MB // DefaultWALFlushInterval is the frequency the WAL will get flushed if // it doesn't reach its size threshold. DefaultWALFlushInterval = 10 * time.Minute // DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes. DefaultWALPartitionFlushDelay = 2 * time.Second )
const ( // Return an error if the user is trying to select more than this number of points in a group by statement. // Most likely they specified a group by interval without time boundaries. MaxGroupByPoints = 100000 // Since time is always selected, the column count when selecting only a single other value will be 2 SelectColumnCountWithOneValue = 2 // IgnoredChunkSize is what gets passed into Mapper.Begin for aggregate queries as they don't chunk points out IgnoredChunkSize = 0 )
const DefaultEngine = "v1"
DefaultEngine is the default engine used by the shard when initializing.
Variables ¶
var ( // ErrInvalidQuery is returned when executing an unknown query type. ErrInvalidQuery = errors.New("invalid query") // ErrNotExecuted is returned when a statement is not executed in a query. // This can occur when a previous statement in the same query has errored. ErrNotExecuted = errors.New("not executed") )
var ( // ErrFieldOverflow is returned when too many fields are created on a measurement. ErrFieldOverflow = errors.New("field overflow") // ErrFieldTypeConflict is returned when a new field already exists with a different type. ErrFieldTypeConflict = errors.New("field type conflict") // ErrFieldNotFound is returned when a field cannot be found. ErrFieldNotFound = errors.New("field not found") // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID // there is no mapping for. ErrFieldUnmappedID = errors.New("field ID not mapped") )
var ( // ErrFormatNotFound is returned when no format can be determined from a path. ErrFormatNotFound = errors.New("format not found") )
var (
ErrShardNotFound = fmt.Errorf("shard not found")
)
Functions ¶
func ErrDatabaseNotFound ¶
func ErrMeasurementNotFound ¶
func IsRetryable ¶
IsRetryable returns true if this error is temporary and could be retried
func MarshalTags ¶
used to convert the tag set to bytes for use as a lookup key
func NewSnapshotWriter ¶
NewSnapshotWriter returns a new snapshot.Writer that will write metadata and the store's shards to an archive.
func NopWriteToCloser ¶
NopWriteToCloser returns an io.WriterTo that implements io.Closer.
func ProcessAggregateDerivative ¶
func ProcessAggregateDerivative(results [][]interface{}, isNonNegative bool, interval time.Duration) [][]interface{}
ProcessAggregateDerivative returns the derivatives of an aggregate result set
func RegisterEngine ¶
func RegisterEngine(name string, fn NewEngineFunc)
RegisterEngine registers a storage engine initializer by name.
Types ¶
type Config ¶
type DatabaseIndex ¶
type DatabaseIndex struct {
// contains filtered or unexported fields
}
DatabaseIndex is the in memory index of a collection of measurements, time series, and their tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewDatabaseIndex ¶
func NewDatabaseIndex() *DatabaseIndex
func (*DatabaseIndex) CreateMeasurementIndexIfNotExists ¶
func (s *DatabaseIndex) CreateMeasurementIndexIfNotExists(name string) *Measurement
CreateMeasurementIndexIfNotExists creates or retrieves an in memory index object for the measurement
func (*DatabaseIndex) CreateSeriesIndexIfNotExists ¶
func (s *DatabaseIndex) CreateSeriesIndexIfNotExists(measurementName string, series *Series) *Series
CreateSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object
func (*DatabaseIndex) DropMeasurement ¶
func (db *DatabaseIndex) DropMeasurement(name string)
DropMeasurement removes the measurement and all of its underlying series from the database index
func (*DatabaseIndex) DropSeries ¶
func (db *DatabaseIndex) DropSeries(keys []string)
DropSeries removes the series keys and their tags from the index
func (*DatabaseIndex) Measurement ¶
func (d *DatabaseIndex) Measurement(name string) *Measurement
Measurement returns the measurement object from the index by the name
func (*DatabaseIndex) MeasurementSeriesCounts ¶
func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int)
MeasurementSeriesCounts returns the number of measurements and series currently indexed by the database. Useful for reporting and monitoring.
func (*DatabaseIndex) Measurements ¶
func (db *DatabaseIndex) Measurements() Measurements
Measurements returns a list of all measurements.
func (*DatabaseIndex) Names ¶
func (d *DatabaseIndex) Names() []string
Names returns a sorted list of measurement names.
func (*DatabaseIndex) Series ¶
func (d *DatabaseIndex) Series(key string) *Series
Series returns a series by key.
func (*DatabaseIndex) SeriesN ¶
func (d *DatabaseIndex) SeriesN() int
SeriesN returns the number of series.
type Engine ¶
type Engine interface { Open() error Close() error SetLogOutput(io.Writer) LoadMetadataIndex(index *DatabaseIndex, measurementFields map[string]*MeasurementFields) error Begin(writable bool) (Tx, error) WritePoints(points []Point, measurementFieldsToSave map[string]*MeasurementFields, seriesToCreate []*SeriesCreate) error DeleteSeries(keys []string) error DeleteMeasurement(name string, seriesKeys []string) error SeriesCount() (n int, err error) }
Engine represents a swappable storage engine for the shard.
type EngineOptions ¶
type EngineOptions struct { MaxWALSize int WALFlushInterval time.Duration WALPartitionFlushDelay time.Duration }
EngineOptions represents the options used to initialize the engine.
func NewEngineOptions ¶
func NewEngineOptions() EngineOptions
NewEngineOptions returns the default options.
type ErrAuthorize ¶
type ErrAuthorize struct {
// contains filtered or unexported fields
}
ErrAuthorize represents an authorization error.
func NewErrAuthorize ¶
func NewErrAuthorize(qe *QueryExecutor, q *influxql.Query, u, db, m string) *ErrAuthorize
newAuthorizationError returns a new instance of AuthorizationError.
func (ErrAuthorize) Error ¶
func (e ErrAuthorize) Error() string
Error returns the text of the error.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int) *Executor
NewRawExecutor returns a new RawExecutor.
type FieldCodec ¶
type FieldCodec struct {
// contains filtered or unexported fields
}
FieldCodec provides encoding and decoding functionality for the fields of a given Measurement. It is a distinct type to avoid locking writes on this node while potentially long-running queries are executing.
It is not affected by changes to the Measurement object after codec creation. TODO: this shouldn't be exported. nothing outside the shard should know about field encodings.
However, this is here until tx.go and the engine get refactored into tsdb.
func NewFieldCodec ¶
func NewFieldCodec(fields map[string]*field) *FieldCodec
NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with a RLock that protects the Measurement.
func (*FieldCodec) DecodeByID ¶
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error)
DecodeByID scans a byte slice for a field with the given ID, converts it to its expected type, and return that value. TODO: shouldn't be exported. refactor engine
func (*FieldCodec) DecodeByName ¶
func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error)
DecodeByName scans a byte slice for a field with the given name, converts it to its expected type, and return that value.
func (*FieldCodec) DecodeFields ¶
func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error)
DecodeFields decodes a byte slice into a set of field ids and values.
func (*FieldCodec) DecodeFieldsWithNames ¶
func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error)
DecodeFieldsWithNames decodes a byte slice into a set of field names and values TODO: shouldn't be exported. refactor engine
func (*FieldCodec) EncodeFields ¶
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error)
EncodeFields converts a map of values with string keys to a byte slice of field IDs and values.
If a field exists in the codec, but its type is different, an error is returned. If a field is not present in the codec, the system panics.
func (*FieldCodec) FieldIDByName ¶
func (f *FieldCodec) FieldIDByName(s string) (uint8, error)
TODO: this shouldn't be exported. remove when tx.go and engine.go get refactored into tsdb
type FieldCreate ¶
type FieldCreate struct { Measurement string Field *field }
struct to hold information for a field to create on a measurement
type LocalMapper ¶
type LocalMapper struct {
// contains filtered or unexported fields
}
LocalMapper is for retrieving data for a query, from a given shard.
func NewLocalMapper ¶
func NewLocalMapper(shard *Shard, stmt influxql.Statement, chunkSize int) *LocalMapper
NewLocalMapper returns a mapper for the given shard, which will return data for the SELECT statement.
func (*LocalMapper) NextChunk ¶
func (lm *LocalMapper) NextChunk() (interface{}, error)
func (*LocalMapper) TagSets ¶
func (lm *LocalMapper) TagSets() []string
TagSets returns the list of TagSets for which this mapper has data.
type MapperOutput ¶
type MapperOutput struct { Name string `json:"name,omitempty"` Tags map[string]string `json:"tags,omitempty"` Values []*MapperValue `json:"values,omitempty"` // For aggregates contains a single value at [0] }
type MapperValue ¶
type MapperValue struct { Time int64 `json:"time,omitempty"` // Ignored for aggregate output. Value interface{} `json:"value,omitempty"` // For aggregate, contains interval time multiple values. }
MapperValue is a complex type, which can encapsulate data from both raw and aggregate mappers. This currently allows marshalling and network system to remain simpler. For aggregate output Time is ignored, and actual Time-Value pairs are contained soley within the Value field.
type MapperValues ¶
type MapperValues []*MapperValue
func (MapperValues) Len ¶
func (a MapperValues) Len() int
func (MapperValues) Less ¶
func (a MapperValues) Less(i, j int) bool
func (MapperValues) Swap ¶
func (a MapperValues) Swap(i, j int)
type Measurement ¶
type Measurement struct { Name string `json:"name,omitempty"` // contains filtered or unexported fields }
Measurement represents a collection of time series in a database. It also contains in memory structures for indexing tags. Exported functions are goroutine safe while un-exported functions assume the caller will use the appropriate locks
func NewMeasurement ¶
func NewMeasurement(name string, idx *DatabaseIndex) *Measurement
NewMeasurement allocates and initializes a new Measurement.
func (*Measurement) AddSeries ¶
func (m *Measurement) AddSeries(s *Series) bool
AddSeries will add a series to the measurementIndex. Returns false if already present
func (*Measurement) DropSeries ¶
func (m *Measurement) DropSeries(seriesID uint64)
DropSeries will remove a series from the measurementIndex.
func (*Measurement) FieldNames ¶
func (m *Measurement) FieldNames() (a []string)
FieldNames returns a list of the measurement's field names
func (*Measurement) HasField ¶
func (m *Measurement) HasField(name string) bool
HasField returns true if the measurement has a field by the given name
func (*Measurement) HasSeries ¶
func (m *Measurement) HasSeries() bool
HasSeries returns true if there is at least 1 series under this measurement
func (*Measurement) HasTagKey ¶
func (m *Measurement) HasTagKey(k string) bool
HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (*Measurement) SeriesKeys ¶
func (m *Measurement) SeriesKeys() []string
SeriesKeys returns the keys of every series in this measurement
func (*Measurement) SetFieldName ¶
func (m *Measurement) SetFieldName(name string)
SetFieldName adds the field name to the measurement.
func (*Measurement) TagKeys ¶
func (m *Measurement) TagKeys() []string
TagKeys returns a list of the measurement's tag names.
func (*Measurement) TagSets ¶
func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error)
tagSets returns the unique tag sets that exist for the given tag keys. This is used to determine what composite series will be created by a group by. i.e. "group by region" should return: {"region":"uswest"}, {"region":"useast"} or region, service returns {"region": "uswest", "service": "redis"}, {"region": "uswest", "service": "mysql"}, etc... This will also populate the TagSet objects with the series IDs that match each tagset and any influx filter expression that goes with the series TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (*Measurement) ValidateGroupBy ¶
func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error
ValidateGroupBy ensures that the GROUP BY is not a field.
type MeasurementFields ¶
type MeasurementFields struct { Fields map[string]*field `json:"fields"` Codec *FieldCodec }
func (*MeasurementFields) MarshalBinary ¶
func (m *MeasurementFields) MarshalBinary() ([]byte, error)
MarshalBinary encodes the object to a binary format.
func (*MeasurementFields) UnmarshalBinary ¶
func (m *MeasurementFields) UnmarshalBinary(buf []byte) error
UnmarshalBinary decodes the object from a binary format.
type Measurements ¶
type Measurements []*Measurement
Measurements represents a list of *Measurement.
func (Measurements) Len ¶
func (a Measurements) Len() int
func (Measurements) Less ¶
func (a Measurements) Less(i, j int) bool
func (Measurements) Swap ¶
func (a Measurements) Swap(i, j int)
type Monitor ¶
type Monitor struct {
Store interface{}
}
Monitor represents a TSDB monitoring service.
type NewEngineFunc ¶
type NewEngineFunc func(path string, options EngineOptions) Engine
NewEngineFunc creates a new engine.
type Point ¶
type Point interface { Name() string SetName(string) Tags() Tags AddTag(key, value string) SetTags(tags Tags) Fields() Fields AddField(name string, value interface{}) Time() time.Time SetTime(t time.Time) UnixNano() int64 HashID() uint64 Key() []byte Data() []byte SetData(buf []byte) String() string }
Point defines the values that will be written to the database
func NewPoint ¶
NewPoint returns a new point with the given measurement name, tags, fields and timestamp
func ParsePoints ¶
ParsePoints returns a slice of Points from a text representation of a point with each point separated by newlines.
func ParsePointsString ¶
type PointBatcher ¶
type PointBatcher struct {
// contains filtered or unexported fields
}
PointBatcher accepts Points and will emit a batch of those points when either a) the batch reaches a certain size, or b) a certain time passes.
func NewPointBatcher ¶
func NewPointBatcher(sz int, d time.Duration) *PointBatcher
NewPointBatcher returns a new PointBatcher.
func (*PointBatcher) Flush ¶
func (b *PointBatcher) Flush()
Flush instructs the batcher to emit any pending points in a batch, regardless of batch size. If there are no pending points, no batch is emitted.
func (*PointBatcher) In ¶
func (b *PointBatcher) In() chan<- Point
In returns the channel to which points should be written.
func (*PointBatcher) Out ¶
func (b *PointBatcher) Out() <-chan []Point
Out returns the channel from which batches should be read.
func (*PointBatcher) Start ¶
func (b *PointBatcher) Start()
Start starts the batching process. Returns the in and out channels for points and point-batches respectively.
func (*PointBatcher) Stats ¶
func (b *PointBatcher) Stats() *PointBatcherStats
Stats returns a PointBatcherStats object for the PointBatcher. While the each statistic should be closely correlated with each other statistic, it is not guaranteed.
func (*PointBatcher) Stop ¶
func (b *PointBatcher) Stop()
type PointBatcherStats ¶
type PointBatcherStats struct { BatchTotal uint64 // Total count of batches transmitted. PointTotal uint64 // Total count of points processed. SizeTotal uint64 // Number of batches that reached size threshold. TimeoutTotal uint64 // Number of timeouts that occurred. }
PointBatcherStats are the statistics each batcher tracks.
type QueryExecutor ¶
type QueryExecutor struct { // The meta store for accessing and updating cluster and schema data. MetaStore interface { Database(name string) (*meta.DatabaseInfo, error) Databases() ([]meta.DatabaseInfo, error) User(name string) (*meta.UserInfo, error) AdminUserExists() (bool, error) Authenticate(username, password string) (*meta.UserInfo, error) RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) UserCount() (int, error) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) NodeID() uint64 } // Executes statements relating to meta data. MetaStatementExecutor interface { ExecuteStatement(stmt influxql.Statement) *influxql.Result } // Maps shards for queries. ShardMapper interface { CreateMapper(shard meta.ShardInfo, stmt string, chunkSize int) (Mapper, error) } Logger *log.Logger // the local data store Store *Store }
QueryExecutor executes every statement in an influxdb Query. It is responsible for coordinating between the local tsdb.Store, the meta.Store, and the other nodes in the cluster to run the query against their local tsdb.Stores. There should be one executor in a running process
func NewQueryExecutor ¶
func NewQueryExecutor(store *Store) *QueryExecutor
NewQueryExecutor returns an initialized QueryExecutor
func (*QueryExecutor) Authorize ¶
Authorize user u to execute query q on database. database can be "" for queries that do not require a database. If no user is provided it will return an error unless the query's first statement is to create a root user.
func (*QueryExecutor) ExecuteQuery ¶
func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
ExecuteQuery executes an InfluxQL query against the server. It sends results down the passed in chan and closes it when done. It will close the chan on the first statement that throws an error.
func (*QueryExecutor) Plan ¶
func (q *QueryExecutor) Plan(stmt *influxql.SelectStatement, chunkSize int) (*Executor, error)
Plan creates an execution plan for the given SelectStatement and returns an Executor.
type RawQueryDerivativeProcessor ¶
type RawQueryDerivativeProcessor struct { LastValueFromPreviousChunk *MapperValue IsNonNegative bool // Whether to drop negative differences DerivativeInterval time.Duration }
func (*RawQueryDerivativeProcessor) Process ¶
func (rqdp *RawQueryDerivativeProcessor) Process(input []*MapperValue) []*MapperValue
type Series ¶
Series belong to a Measurement and represent unique time series in a database
func (*Series) MarshalBinary ¶
MarshalBinary encodes the object to a binary format.
func (*Series) UnmarshalBinary ¶
UnmarshalBinary decodes the object from a binary format.
type SeriesCreate ¶
struct to hold information for a series to create
type SeriesIDs ¶
type SeriesIDs []uint64
SeriesIDs is a convenience type for sorting, checking equality, and doing union and intersection of collections of series ids.
func (SeriesIDs) Intersect ¶
Intersect returns a new collection of series ids in sorted order that is the intersection of the two. The two collections must already be sorted.
type Shard ¶
type Shard struct { // The writer used by the logger. LogOutput io.Writer // contains filtered or unexported fields }
Shard represents a self-contained time series database. An inverted index of the measurement and tag data is kept along with the raw time series data. Data can be split across many shards. The query engine in TSDB is responsible for combining the output of many shards into a single query result.
func NewShard ¶
func NewShard(index *DatabaseIndex, path string, options EngineOptions) *Shard
NewShard returns a new initialized Shard
func (*Shard) DeleteMeasurement ¶
DeleteMeasurement deletes a measurement and all underlying series.
func (*Shard) DeleteSeries ¶
DeleteSeries deletes a list of series.
func (*Shard) FieldCodec ¶
func (s *Shard) FieldCodec(measurementName string) *FieldCodec
TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func (*Shard) SeriesCount ¶
SeriesCount returns the number of series buckets on the shard.
func (*Shard) ValidateAggregateFieldsInStatement ¶
func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error
func (*Shard) WritePoints ¶
WritePoints will write the raw data points and any new metadata to the index in the shard
type StatefulMapper ¶
type StatefulMapper struct { Mapper // contains filtered or unexported fields }
StatefulMapper encapsulates a Mapper and some state that the executor needs to track for that mapper.
func (*StatefulMapper) NextChunk ¶
func (sm *StatefulMapper) NextChunk() (*MapperOutput, error)
NextChunk wraps a RawMapper and some state.
type Store ¶
type Store struct { EngineOptions EngineOptions Logger *log.Logger // contains filtered or unexported fields }
func (*Store) CreateMapper ¶
func (*Store) CreateShard ¶
func (*Store) DatabaseIndex ¶
func (s *Store) DatabaseIndex(name string) *DatabaseIndex
func (*Store) DatabaseIndexN ¶
DatabaseIndexN returns the number of databases indicies in the store.
func (*Store) DeleteDatabase ¶
DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (*Store) DeleteShard ¶
DeleteShard removes a shard from disk.
func (*Store) Measurement ¶
func (s *Store) Measurement(database, name string) *Measurement