Versions in this module Expand all Collapse all v0 v0.4.0 Jun 23, 2016 Changes in this version + const DefaultMaxWALSize + const DefaultWALFlushInterval + const DefaultWALPartitionFlushDelay + const IgnoredChunkSize + const MaxGroupByPoints + const SelectColumnCountWithOneValue + const WALPartitionN + var ErrFieldNotFound = errors.New("field not found") + var ErrFieldOverflow = errors.New("field overflow") + var ErrFieldTypeConflict = errors.New("field type conflict") + var ErrFieldUnmappedID = errors.New("field ID not mapped") + var ErrInvalidQuery = errors.New("invalid query") + var ErrNotExecuted = errors.New("not executed") + var ErrShardNotFound = fmt.Errorf("shard not found") + var ErrWALPartitionNotFound = errors.New("wal partition not found") + func ErrDatabaseNotFound(name string) error + func ErrMeasurementNotFound(name string) error + func IsRetryable(err error) bool + func NewSnapshotWriter(meta []byte, store *Store) (*snapshot.Writer, error) + func NopWriteToCloser(w io.WriterTo) interface + func WALPartition(key []byte) uint8 + type AggMapper struct + func NewAggMapper(shard *Shard, stmt *influxql.SelectStatement) *AggMapper + func (am *AggMapper) Close() + func (am *AggMapper) NextChunk() (interface{}, error) + func (am *AggMapper) Open() error + func (am *AggMapper) TagSets() []string + type AggregateExecutor struct + func NewAggregateExecutor(stmt *influxql.SelectStatement, mappers []Mapper) *AggregateExecutor + func (ae *AggregateExecutor) Execute() <-chan *influxql.Row + type Config struct + Dir string + MaxWALSize int + WALFlushInterval toml.Duration + WALPartitionFlushDelay toml.Duration + func NewConfig() Config + type DatabaseIndex struct + func NewDatabaseIndex() *DatabaseIndex + func (d *DatabaseIndex) Measurement(name string) *Measurement + func (d *DatabaseIndex) MeasurementSeriesCounts() (nMeasurements int, nSeries int) + func (db *DatabaseIndex) DropMeasurement(name string) + func (db *DatabaseIndex) DropSeries(keys []string) + func (db *DatabaseIndex) Measurements() Measurements + type ErrAuthorize struct + func NewErrAuthorize(qe *QueryExecutor, q *influxql.Query, u, db, m string) *ErrAuthorize + func (e ErrAuthorize) Error() string + type Executor interface + Execute func() <-chan *influxql.Row + type FieldCodec struct + func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) + func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) + func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) + func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error) + func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) + func (f *FieldCodec) FieldIDByName(s string) (uint8, error) + type Fields map[string]interface + func (p Fields) MarshalBinary() []byte + type Mapper interface + Close func() + NextChunk func() (interface{}, error) + Open func() error + TagSets func() []string + type MapperResponse struct + Data []byte + TagSets []string + type Measurement struct + Name string + func NewMeasurement(name string, idx *DatabaseIndex) *Measurement + func (m *Measurement) AddSeries(s *Series) bool + func (m *Measurement) DropSeries(seriesID uint64) + func (m *Measurement) FieldNames() (a []string) + func (m *Measurement) HasField(name string) bool + func (m *Measurement) HasSeries() bool + func (m *Measurement) HasTagKey(k string) bool + func (m *Measurement) SeriesKeys() []string + func (m *Measurement) TagKeys() []string + func (m *Measurement) TagSets(stmt *influxql.SelectStatement, dimensions []string) ([]*influxql.TagSet, error) + func (m *Measurement) ValidateGroupBy(stmt *influxql.SelectStatement) error + type Measurements []*Measurement + func (a Measurements) Len() int + func (a Measurements) Less(i, j int) bool + func (a Measurements) Swap(i, j int) + type Monitor struct + Store interface{} + func (m *Monitor) Close() error + func (m *Monitor) Open() error + type Point interface + AddField func(name string, value interface{}) + AddTag func(key, value string) + Data func() []byte + Fields func() Fields + HashID func() uint64 + Key func() []byte + Name func() string + SetData func(buf []byte) + SetName func(string) + SetTags func(tags Tags) + SetTime func(t time.Time) + String func() string + Tags func() Tags + Time func() time.Time + UnixNano func() int64 + func NewPoint(name string, tags Tags, fields Fields, time time.Time) Point + func ParsePoints(buf []byte) ([]Point, error) + func ParsePointsString(buf string) ([]Point, error) + func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) + type PointBatcher struct + func NewPointBatcher(sz int, d time.Duration) *PointBatcher + func (b *PointBatcher) Flush() + func (b *PointBatcher) In() chan<- Point + func (b *PointBatcher) Out() <-chan []Point + func (b *PointBatcher) Start() + func (b *PointBatcher) Stats() *PointBatcherStats + func (b *PointBatcher) Stop() + type PointBatcherStats struct + BatchTotal uint64 + PointTotal uint64 + SizeTotal uint64 + TimeoutTotal uint64 + type QueryExecutor struct + Logger *log.Logger + MetaStatementExecutor interface{ ... } + MetaStore interface{ ... } + ShardMapper interface{ ... } + func NewQueryExecutor(store *Store) *QueryExecutor + func (q *QueryExecutor) Authorize(u *meta.UserInfo, query *influxql.Query, database string) error + func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) + type RawExecutor struct + func NewRawExecutor(stmt *influxql.SelectStatement, mappers []Mapper, chunkSize int) *RawExecutor + func (re *RawExecutor) Execute() <-chan *influxql.Row + type RawMapper struct + func NewRawMapper(shard *Shard, stmt *influxql.SelectStatement, chunkSize int) *RawMapper + func (rm *RawMapper) Close() + func (rm *RawMapper) NextChunk() (interface{}, error) + func (rm *RawMapper) Open() error + func (rm *RawMapper) TagSets() []string + type Series struct + Key string + Tags map[string]string + func (s *Series) MarshalBinary() ([]byte, error) + func (s *Series) UnmarshalBinary(buf []byte) error + type Shard struct + LogOutput io.Writer + MaxWALSize int + WALFlushInterval time.Duration + WALPartitionFlushDelay time.Duration + func NewShard(index *DatabaseIndex, path string) *Shard + func (s *Shard) Close() error + func (s *Shard) DB() *bolt.DB + func (s *Shard) FieldCodec(measurementName string) *FieldCodec + func (s *Shard) Flush(partitionFlushDelay time.Duration) error + func (s *Shard) FlushPartition(partitionID uint8) error + func (s *Shard) Open() error + func (s *Shard) Path() string + func (s *Shard) SeriesCount() (n int, err error) + func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt *influxql.SelectStatement) error + func (s *Shard) WritePoints(points []Point) error + type StatefulMapper struct + func (srm *StatefulMapper) NextChunk() (*mapperOutput, error) + type Store struct + Logger *log.Logger + MaxWALSize int + WALFlushInterval time.Duration + WALPartitionFlushDelay time.Duration + func NewStore(path string) *Store + func (s *Store) Close() error + func (s *Store) CreateMapper(shardID uint64, query string, chunkSize int) (Mapper, error) + func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) error + func (s *Store) DatabaseIndex(name string) *DatabaseIndex + func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error + func (s *Store) DeleteShard(shardID uint64) error + func (s *Store) Flush() error + func (s *Store) Measurement(database, name string) *Measurement + func (s *Store) Open() error + func (s *Store) Path() string + func (s *Store) Shard(shardID uint64) *Shard + func (s *Store) ShardIDs() []uint64 + func (s *Store) ValidateAggregateFieldsInStatement(shardID uint64, measurementName string, stmt *influxql.SelectStatement) error + func (s *Store) WriteToShard(shardID uint64, points []Point) error + type TagFilter struct + Key string + Op influxql.Token + Regex *regexp.Regexp + Value string + type Tags map[string]string