Documentation ¶
Index ¶
- Constants
- Variables
- func RetentionPrometheusCollectors() []prometheus.Collector
- type BucketDeleter
- type BucketFinder
- type BucketService
- func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error
- func (s *BucketService) DeleteBucket(ctx context.Context, bucketID influxdb.ID) error
- func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error)
- func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error)
- func (s *BucketService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error)
- func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
- func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error)
- type BufferedPointsWriter
- type Config
- type Deleter
- type Engine
- func (e *Engine) AcquireSegments(ctx context.Context, fn func(segs []string) error) error
- func (e *Engine) Close() error
- func (e *Engine) CommitSegments(ctx context.Context, segs []string, fn func() error) error
- func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error)
- func (e *Engine) CreateCursorIterator(ctx context.Context) (cursors.CursorIterator, error)
- func (e *Engine) CreateSeriesCursor(ctx context.Context, orgID, bucketID influxdb.ID, cond influxql.Expr) (SeriesCursor, error)
- func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID) error
- func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error
- func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, ...) error
- func (e *Engine) DisableCompactions()
- func (e *Engine) EnableCompactions()
- func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error
- func (e *Engine) InternalBackupPath(backupID int) string
- func (e *Engine) MeasurementCardinalityStats() (tsi1.MeasurementCardinalityStats, error)
- func (e *Engine) MeasurementFields(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, ...) (cursors.MeasurementFieldsIterator, error)
- func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, ...) (cursors.StringIterator, error)
- func (e *Engine) MeasurementStats() (tsm1.MeasurementStats, error)
- func (e *Engine) MeasurementTagKeys(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, ...) (cursors.StringIterator, error)
- func (e *Engine) MeasurementTagValues(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, ...) (cursors.StringIterator, error)
- func (e *Engine) Open(ctx context.Context) (err error)
- func (e *Engine) Path() string
- func (e *Engine) PrometheusCollectors() []prometheus.Collector
- func (e *Engine) SeriesCardinality() int64
- func (e *Engine) TagKeys(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, ...) (cursors.StringIterator, error)
- func (e *Engine) TagValues(ctx context.Context, orgID, bucketID influxdb.ID, tagKey string, ...) (cursors.StringIterator, error)
- func (e *Engine) WithLogger(log *zap.Logger)
- func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error
- type LoggingPointsWriter
- type Option
- func WithCompactionLimiter(limiter limiter.Fixed) Option
- func WithCompactionPlanner(planner tsm1.CompactionPlanner) Option
- func WithCompactionSemaphore(s influxdb.Semaphore) Option
- func WithCurrentGenerationFunc(fn func() int) Option
- func WithEngineID(id int) Option
- func WithFileStoreObserver(obs tsm1.FileStoreObserver) Option
- func WithNodeID(id int) Option
- func WithRetentionEnforcer(finder BucketFinder) Option
- func WithRetentionEnforcerLimiter(f runnable) Option
- func WithTSMFilenameFormatter(fn tsm1.FormatFileNameFunc) Option
- func WithWritePointsValidationEnabled(v bool) Option
- type PointsWriter
- type SeriesCursor
- type SeriesCursorRow
- type Snapshotter
Constants ¶
const ( DefaultRetentionInterval = time.Hour DefaultSeriesFileDirectoryName = "_series" DefaultIndexDirectoryName = "index" DefaultWALDirectoryName = "wal" DefaultEngineDirectoryName = "data" )
Default configuration values.
Variables ¶
var ErrEngineClosed = errors.New("engine is closed")
ErrEngineClosed is returned when a caller attempts to use the engine while it's closed.
var ErrServiceClosed = errors.New("service is currently closed")
ErrServiceClosed is returned when the service is unavailable.
Functions ¶
func RetentionPrometheusCollectors ¶
func RetentionPrometheusCollectors() []prometheus.Collector
RetentionPrometheusCollectors returns all prometheus metrics for retention.
Types ¶
type BucketDeleter ¶
BucketDeleter defines the behaviour of deleting a bucket.
type BucketFinder ¶
type BucketFinder interface {
FindBuckets(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
}
A BucketFinder is responsible for providing access to buckets via a filter.
type BucketService ¶
type BucketService struct {
// contains filtered or unexported fields
}
BucketService wraps an existing influxdb.BucketService implementation.
BucketService ensures that when a bucket is deleted, all stored data associated with the bucket is either removed, or marked to be removed via a future compaction.
func NewBucketService ¶
func NewBucketService(s influxdb.BucketService, engine BucketDeleter) *BucketService
NewBucketService returns a new BucketService for the provided BucketDeleter, which typically will be an Engine.
func (*BucketService) CreateBucket ¶
func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error
CreateBucket creates a new bucket and sets b.ID with the new identifier.
func (*BucketService) DeleteBucket ¶
func (s *BucketService) DeleteBucket(ctx context.Context, bucketID influxdb.ID) error
DeleteBucket removes a bucket by ID.
func (*BucketService) FindBucket ¶
func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error)
FindBucket returns the first bucket that matches filter.
func (*BucketService) FindBucketByID ¶
func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error)
FindBucketByID returns a single bucket by ID.
func (*BucketService) FindBucketByName ¶
func (s *BucketService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error)
FindBucketByName returns a single bucket by name.
func (*BucketService) FindBuckets ¶
func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
FindBuckets returns a list of buckets that match filter and the total count of matching buckets. Additional options provide pagination & sorting.
func (*BucketService) UpdateBucket ¶
func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error)
UpdateBucket updates a single bucket with changeset. Returns the new bucket state after update.
type BufferedPointsWriter ¶
type BufferedPointsWriter struct {
// contains filtered or unexported fields
}
func NewBufferedPointsWriter ¶
func NewBufferedPointsWriter(size int, pointswriter PointsWriter) *BufferedPointsWriter
func (*BufferedPointsWriter) Available ¶
func (b *BufferedPointsWriter) Available() int
Available returns how many models.Points are unused in the buffer.
func (*BufferedPointsWriter) Buffered ¶
func (b *BufferedPointsWriter) Buffered() int
Buffered returns the number of models.Points that have been written into the current buffer.
func (*BufferedPointsWriter) Flush ¶
func (b *BufferedPointsWriter) Flush(ctx context.Context) error
Flush writes any buffered data to the underlying PointsWriter.
func (*BufferedPointsWriter) WritePoints ¶
WritePoints writes the points to the underlying PointsWriter.
type Config ¶
type Config struct { // Frequency of retention in seconds. RetentionInterval toml.Duration `toml:"retention-interval"` // Series file config. SeriesFilePath string `toml:"series-file-path"` // Overrides the default path. // Series file config. SeriesFile seriesfile.Config `toml:"tsdb"` // WAL config. WAL tsm1.WALConfig `toml:"wal"` WALPath string `toml:"wal-path"` // Overrides the default path. // Engine config. Engine tsm1.Config `toml:"engine"` EnginePath string `toml:"engine-path"` // Overrides the default path. // Index config. Index tsi1.Config `toml:"index"` IndexPath string `toml:"index-path"` // Overrides the default path. }
Config holds the configuration for an Engine.
func (Config) GetEnginePath ¶
GetEnginePath returns the path to the engine.
func (Config) GetIndexPath ¶
GetIndexPath returns the path to the index.
func (Config) GetSeriesFilePath ¶
GetSeriesFilePath returns the path to the series file.
func (Config) GetWALPath ¶
GetWALPath returns the path to the WAL.
type Deleter ¶
type Deleter interface {
DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error
}
A Deleter implementation is capable of deleting data from a storage engine.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func NewEngine ¶
NewEngine initialises a new storage engine, including a series file, index and TSM engine.
func (*Engine) AcquireSegments ¶
AcquireSegments closes the current WAL segment, gets the set of all the currently closed segments, and calls the callback. It does all of this under the lock on the engine.
func (*Engine) Close ¶
Close closes the store and all underlying resources. It returns an error if any of the underlying systems fail to close.
func (*Engine) CommitSegments ¶
CommitSegments calls the callback and if that does not return an error, removes the segment files from the WAL. It does all of this under the lock on the engine.
func (*Engine) CreateBackup ¶
CreateBackup creates a "snapshot" of all TSM data in the Engine.
- Snapshot the cache to ensure the backup includes all data written before now.
- Create hard links to all TSM files, in a new directory within the engine root directory.
- Return a unique backup ID (invalid after the process terminates) and list of files.
func (*Engine) CreateCursorIterator ¶
CreateCursorIterator creates a CursorIterator for usage with the read service.
func (*Engine) CreateSeriesCursor ¶
func (e *Engine) CreateSeriesCursor(ctx context.Context, orgID, bucketID influxdb.ID, cond influxql.Expr) (SeriesCursor, error)
CreateSeriesCursor creates a SeriesCursor for usage with the read service.
func (*Engine) DeleteBucket ¶
DeleteBucket deletes an entire bucket from the storage engine.
func (*Engine) DeleteBucketRange ¶
func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error
DeleteBucketRange deletes all data in a bucket within the time range [min, max] from the storage engine.
func (*Engine) DeleteBucketRangePredicate ¶
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error
DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data deleted must be in [min, max], and the key must match the predicate if provided.
func (*Engine) DisableCompactions ¶
func (e *Engine) DisableCompactions()
DisableCompactions disables compactions in the series file, index, & engine.
func (*Engine) EnableCompactions ¶
func (e *Engine) EnableCompactions()
EnableCompactions allows the series file, index, & underlying engine to compact.
func (*Engine) FetchBackupFile ¶
func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error
FetchBackupFile writes a given backup file to the provided writer. After a successful write, the internal copy is removed.
func (*Engine) InternalBackupPath ¶
InternalBackupPath provides the internal, full path directory name of the backup. This should not be exposed via API.
func (*Engine) MeasurementCardinalityStats ¶
func (e *Engine) MeasurementCardinalityStats() (tsi1.MeasurementCardinalityStats, error)
MeasurementCardinalityStats returns cardinality stats for all measurements.
func (*Engine) MeasurementFields ¶
func (e *Engine) MeasurementFields(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, start, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error)
MeasurementFields returns an iterator which enumerates the field schema for the given bucket and measurement, filtered using the optional predicate and limited to the time range [start, end].
MeasurementFields will always return a MeasurementFieldsIterator if there is no error.
If the context is canceled before MeasurementFields has finished processing, a non-nil error will be returned along with statistics for the already scanned data.
func (*Engine) MeasurementNames ¶
func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)
MeasurementNames returns an iterator which enumerates the measurements for the given bucket, limited to the time range [start, end].
MeasurementNames will always return a StringIterator if there is no error.
If the context is canceled before MeasurementNames has finished processing, a non-nil error will be returned along with statistics for the already scanned data.
func (*Engine) MeasurementStats ¶
func (e *Engine) MeasurementStats() (tsm1.MeasurementStats, error)
MeasurementStats returns the current measurement stats for the engine.
func (*Engine) MeasurementTagKeys ¶
func (e *Engine) MeasurementTagKeys(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)
MeasurementTagKeys returns an iterator which enumerates the tag keys for the given bucket and measurement, filtered using the optional predicate and limited to the time range [start, end].
MeasurementTagKeys will always return a StringIterator if there is no error.
If the context is canceled before MeasurementTagKeys has finished processing, a non-nil error will be returned along with statistics for the already scanned data.
func (*Engine) MeasurementTagValues ¶
func (e *Engine) MeasurementTagValues(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)
MeasurementTagValues returns an iterator which enumerates the tag values for the given bucket, measurement and tag key, filtered using the optional predicate and limited to the time range [start, end].
MeasurementTagValues will always return a StringIterator if there is no error.
If the context is canceled before MeasurementTagValues has finished processing, a non-nil error will be returned along with statistics for the already scanned data.
func (*Engine) Open ¶
Open opens the store and all underlying resources. It returns an error if any of the underlying systems fail to open.
func (*Engine) PrometheusCollectors ¶
func (e *Engine) PrometheusCollectors() []prometheus.Collector
PrometheusCollectors returns all the prometheus collectors associated with the engine and its components.
func (*Engine) SeriesCardinality ¶
SeriesCardinality returns the number of series in the engine.
func (*Engine) TagKeys ¶
func (e *Engine) TagKeys(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)
TagKeys returns an iterator where the values are tag keys for the bucket matching the predicate within the time range [start, end].
TagKeys will always return a StringIterator if there is no error.
func (*Engine) TagValues ¶
func (e *Engine) TagValues(ctx context.Context, orgID, bucketID influxdb.ID, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)
TagValues returns an iterator which enumerates the values for the specific tagKey in the given bucket matching the predicate within the time range [start, end].
TagValues will always return a StringIterator if there is no error.
func (*Engine) WithLogger ¶
WithLogger sets the logger on the Engine. It must be called before Open.
func (*Engine) WritePoints ¶
WritePoints writes the provided points to the engine.
The Engine expects all points to have been correctly validated by the caller. However, WritePoints will determine if any tag key-pairs are missing, or if there are any field type conflicts.
Appropriate errors are returned in those cases.
type LoggingPointsWriter ¶
type LoggingPointsWriter struct { // Wrapped points writer. Errored writes from here will be logged. Underlying PointsWriter // Service used to look up logging bucket. BucketFinder BucketFinder // Name of the bucket to log to. LogBucketName string }
LoggingPointsWriter wraps an underlying points writer but writes logs to another bucket when an error occurs.
func (*LoggingPointsWriter) WritePoints ¶
WritePoints writes points to the underlying PointsWriter. Logs on error.
type Option ¶
type Option func(*Engine)
Option is a function that applies an optional configuration setting to an Engine.
func WithCompactionLimiter ¶
WithCompactionLimiter allows the caller to set the limiter that a storage engine uses. A typical use-case for this would be if multiple engines should share the same limiter.
func WithCompactionPlanner ¶
func WithCompactionPlanner(planner tsm1.CompactionPlanner) Option
WithCompactionPlanner makes the engine have the provided compaction planner.
func WithCompactionSemaphore ¶
func WithCompactionSemaphore(s influxdb.Semaphore) Option
WithCompactionSemaphore sets the semaphore used to coordinate full compactions across multiple storage engines.
func WithCurrentGenerationFunc ¶
WithCurrentGenerationFunc sets a function for obtaining the current generation.
func WithEngineID ¶
WithEngineID sets an engine id, which can be useful for logging when multiple engines are in use.
func WithFileStoreObserver ¶
func WithFileStoreObserver(obs tsm1.FileStoreObserver) Option
WithFileStoreObserver makes the engine have the provided file store observer.
func WithNodeID ¶
WithNodeID sets a node id on the engine, which can be useful for logging when a system has engines running on multiple nodes.
func WithRetentionEnforcer ¶
func WithRetentionEnforcer(finder BucketFinder) Option
WithRetentionEnforcer initialises a retention enforcer on the engine. WithRetentionEnforcer must be called after other options to ensure that all metrics are labelled correctly.
func WithRetentionEnforcerLimiter ¶
func WithRetentionEnforcerLimiter(f runnable) Option
WithRetentionEnforcerLimiter sets a limiter used to control when the retention enforcer can proceed. If this option is not used then the default limiter (or the absence of one) is a no-op, and no limitations will be put on running the retention enforcer.
func WithTSMFilenameFormatter ¶
func WithTSMFilenameFormatter(fn tsm1.FormatFileNameFunc) Option
WithTSMFilenameFormatter sets a function on the underlying tsm1.Engine to specify how TSM files are named.
func WithWritePointsValidationEnabled ¶
WithWritePointsValidationEnabled sets whether written points should be validated.
type PointsWriter ¶
PointsWriter describes the ability to write points into a storage engine.
type SeriesCursor ¶
type SeriesCursor interface { Close() Next() (*SeriesCursorRow, error) }
type SeriesCursorRow ¶
type Snapshotter ¶
type Snapshotter interface {
WriteSnapshot(ctx context.Context, status tsm1.CacheStatus) error
}
A Snapshotter implementation can take snapshots of the entire engine.