storage

package
v2.0.0-beta.15
Published: Jul 23, 2020 License: MIT Imports: 33 Imported by: 1

Documentation

Constants

const (
	DefaultRetentionInterval       = time.Hour
	DefaultSeriesFileDirectoryName = "_series"
	DefaultIndexDirectoryName      = "index"
	DefaultWALDirectoryName        = "wal"
	DefaultEngineDirectoryName     = "data"
)

Default configuration values.

Variables

var ErrEngineClosed = errors.New("engine is closed")

ErrEngineClosed is returned when a caller attempts to use the engine while it's closed.

var ErrServiceClosed = errors.New("service is currently closed")

ErrServiceClosed is returned when the service is unavailable.

Functions

func RetentionPrometheusCollectors

func RetentionPrometheusCollectors() []prometheus.Collector

RetentionPrometheusCollectors returns all prometheus metrics for retention.

Types

type BucketDeleter

type BucketDeleter interface {
	DeleteBucket(context.Context, influxdb.ID, influxdb.ID) error
}

BucketDeleter defines the behaviour of deleting a bucket.

type BucketFinder

type BucketFinder interface {
	FindBuckets(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
}

A BucketFinder is responsible for providing access to buckets via a filter.

type BucketService

type BucketService struct {
	// contains filtered or unexported fields
}

BucketService wraps an existing influxdb.BucketService implementation.

BucketService ensures that when a bucket is deleted, all stored data associated with the bucket is either removed, or marked to be removed via a future compaction.

func NewBucketService

func NewBucketService(s influxdb.BucketService, engine BucketDeleter) *BucketService

NewBucketService returns a new BucketService for the provided BucketDeleter, which typically will be an Engine.

func (*BucketService) CreateBucket

func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) error

CreateBucket creates a new bucket and sets b.ID with the new identifier.

func (*BucketService) DeleteBucket

func (s *BucketService) DeleteBucket(ctx context.Context, bucketID influxdb.ID) error

DeleteBucket removes a bucket by ID.

func (*BucketService) FindBucket

func (s *BucketService) FindBucket(ctx context.Context, filter influxdb.BucketFilter) (*influxdb.Bucket, error)

FindBucket returns the first bucket that matches filter.

func (*BucketService) FindBucketByID

func (s *BucketService) FindBucketByID(ctx context.Context, id influxdb.ID) (*influxdb.Bucket, error)

FindBucketByID returns a single bucket by ID.

func (*BucketService) FindBucketByName

func (s *BucketService) FindBucketByName(ctx context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error)

FindBucketByName returns a single bucket by name.

func (*BucketService) FindBuckets

func (s *BucketService) FindBuckets(ctx context.Context, filter influxdb.BucketFilter, opt ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)

FindBuckets returns a list of buckets that match filter and the total count of matching buckets. Additional options provide pagination & sorting.

func (*BucketService) UpdateBucket

func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error)

UpdateBucket updates a single bucket with changeset. Returns the new bucket state after update.

type BufferedPointsWriter

type BufferedPointsWriter struct {
	// contains filtered or unexported fields
}

func NewBufferedPointsWriter

func NewBufferedPointsWriter(size int, pointswriter PointsWriter) *BufferedPointsWriter

func (*BufferedPointsWriter) Available

func (b *BufferedPointsWriter) Available() int

Available returns how many models.Points are unused in the buffer.

func (*BufferedPointsWriter) Buffered

func (b *BufferedPointsWriter) Buffered() int

Buffered returns the number of models.Points that have been written into the current buffer.

func (*BufferedPointsWriter) Flush

func (b *BufferedPointsWriter) Flush(ctx context.Context) error

Flush writes any buffered data to the underlying PointsWriter.

func (*BufferedPointsWriter) WritePoints

func (b *BufferedPointsWriter) WritePoints(ctx context.Context, p []models.Point) error

WritePoints writes the points to the underlying PointsWriter.

type Config

type Config struct {
	// Frequency of retention in seconds.
	RetentionInterval toml.Duration `toml:"retention-interval"`

	// Series file config.
	SeriesFilePath string `toml:"series-file-path"` // Overrides the default path.

	// Series file config.
	SeriesFile seriesfile.Config `toml:"tsdb"`

	// WAL config.
	WAL     tsm1.WALConfig `toml:"wal"`
	WALPath string         `toml:"wal-path"` // Overrides the default path.

	// Engine config.
	Engine     tsm1.Config `toml:"engine"`
	EnginePath string      `toml:"engine-path"` // Overrides the default path.

	// Index config.
	Index     tsi1.Config `toml:"index"`
	IndexPath string      `toml:"index-path"` // Overrides the default path.
}

Config holds the configuration for an Engine.

func NewConfig

func NewConfig() Config

NewConfig initialises a new config for an Engine.

func (Config) GetEnginePath

func (c Config) GetEnginePath(base string) string

GetEnginePath returns the path to the engine.

func (Config) GetIndexPath

func (c Config) GetIndexPath(base string) string

GetIndexPath returns the path to the index.

func (Config) GetSeriesFilePath

func (c Config) GetSeriesFilePath(base string) string

GetSeriesFilePath returns the path to the series file.

func (Config) GetWALPath

func (c Config) GetWALPath(base string) string

GetWALPath returns the path to the WAL.

type Deleter

type Deleter interface {
	DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error
}

A Deleter implementation is capable of deleting data from a storage engine.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(path string, c Config, options ...Option) *Engine

NewEngine initialises a new storage engine, including a series file, index and TSM engine.

func (*Engine) AcquireSegments

func (e *Engine) AcquireSegments(ctx context.Context, fn func(segs []string) error) error

AcquireSegments closes the current WAL segment, gets the set of all the currently closed segments, and calls the callback. It does all of this under the lock on the engine.

func (*Engine) Close

func (e *Engine) Close() error

Close closes the store and all underlying resources. It returns an error if any of the underlying systems fail to close.

func (*Engine) CommitSegments

func (e *Engine) CommitSegments(ctx context.Context, segs []string, fn func() error) error

CommitSegments calls the callback and if that does not return an error, removes the segment files from the WAL. It does all of this under the lock on the engine.

func (*Engine) CreateBackup

func (e *Engine) CreateBackup(ctx context.Context) (int, []string, error)

CreateBackup creates a "snapshot" of all TSM data in the Engine.

  1. Snapshot the cache to ensure the backup includes all data written before now.
  2. Create hard links to all TSM files, in a new directory within the engine root directory.
  3. Return a unique backup ID (invalid after the process terminates) and list of files.

func (*Engine) CreateCursorIterator

func (e *Engine) CreateCursorIterator(ctx context.Context) (cursors.CursorIterator, error)

CreateCursorIterator creates a CursorIterator for usage with the read service.

func (*Engine) CreateSeriesCursor

func (e *Engine) CreateSeriesCursor(ctx context.Context, orgID, bucketID influxdb.ID, cond influxql.Expr) (SeriesCursor, error)

CreateSeriesCursor creates a SeriesCursor for usage with the read service.

func (*Engine) DeleteBucket

func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID) error

DeleteBucket deletes an entire bucket from the storage engine.

func (*Engine) DeleteBucketRange

func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error

DeleteBucketRange deletes all data in the time range [min, max] for the given bucket from the storage engine.

func (*Engine) DeleteBucketRangePredicate

func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error

DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data deleted must be in [min, max], and the key must match the predicate if provided.

func (*Engine) DisableCompactions

func (e *Engine) DisableCompactions()

DisableCompactions disables compactions in the series file, index, & engine.

func (*Engine) EnableCompactions

func (e *Engine) EnableCompactions()

EnableCompactions allows the series file, index, & underlying engine to compact.

func (*Engine) FetchBackupFile

func (e *Engine) FetchBackupFile(ctx context.Context, backupID int, backupFile string, w io.Writer) error

FetchBackupFile writes a given backup file to the provided writer. After a successful write, the internal copy is removed.

func (*Engine) InternalBackupPath

func (e *Engine) InternalBackupPath(backupID int) string

InternalBackupPath provides the internal, full path directory name of the backup. This should not be exposed via API.

func (*Engine) MeasurementCardinalityStats

func (e *Engine) MeasurementCardinalityStats() (tsi1.MeasurementCardinalityStats, error)

MeasurementCardinalityStats returns cardinality stats for all measurements.

func (*Engine) MeasurementFields

func (e *Engine) MeasurementFields(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, start, end int64, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error)

MeasurementFields returns an iterator which enumerates the field schema for the given bucket and measurement, filtered using the optional predicate and limited to the time range [start, end].

MeasurementFields will always return a MeasurementFieldsIterator if there is no error.

If the context is canceled before MeasurementFields has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) MeasurementFieldsNoTime

func (e *Engine) MeasurementFieldsNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, predicate influxql.Expr) (cursors.MeasurementFieldsIterator, error)

MeasurementFieldsNoTime returns an iterator which enumerates the field schema for the given bucket and measurement, filtered using the optional predicate.

MeasurementFieldsNoTime will always return a MeasurementFieldsIterator if there is no error.

If the context is canceled before MeasurementFieldsNoTime has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) MeasurementNames

func (e *Engine) MeasurementNames(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)

MeasurementNames returns an iterator which enumerates the measurements for the given bucket and limited to the time range [start, end].

MeasurementNames will always return a StringIterator if there is no error.

If the context is canceled before MeasurementNames has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) MeasurementNamesNoTime

func (e *Engine) MeasurementNamesNoTime(ctx context.Context, orgID, bucketID influxdb.ID, predicate influxql.Expr) (cursors.StringIterator, error)

MeasurementNamesNoTime returns an iterator which enumerates the measurements for the given bucket.

MeasurementNamesNoTime will always return a StringIterator if there is no error.

If the context is canceled before MeasurementNamesNoTime has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) MeasurementStats

func (e *Engine) MeasurementStats() (tsm1.MeasurementStats, error)

MeasurementStats returns the current measurement stats for the engine.

func (*Engine) MeasurementTagKeys

func (e *Engine) MeasurementTagKeys(ctx context.Context, orgID, bucketID influxdb.ID, measurement string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)

MeasurementTagKeys returns an iterator which enumerates the tag keys for the given bucket and measurement, filtered using the optional predicate and limited to the time range [start, end].

MeasurementTagKeys will always return a StringIterator if there is no error.

If the context is canceled before MeasurementTagKeys has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) MeasurementTagKeysNoTime

func (e *Engine) MeasurementTagKeysNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, predicate influxql.Expr) (cursors.StringIterator, error)

MeasurementTagKeysNoTime returns an iterator which enumerates the tag keys for the given bucket, measurement and tag key, filtered using the optional predicate.

MeasurementTagKeysNoTime will always return a StringIterator if there is no error.

If the context is canceled before MeasurementTagKeysNoTime has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) MeasurementTagValues

func (e *Engine) MeasurementTagValues(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)

MeasurementTagValues returns an iterator which enumerates the tag values for the given bucket, measurement and tag key, filtered using the optional predicate and limited to the time range [start, end].

MeasurementTagValues will always return a StringIterator if there is no error.

If the context is canceled before MeasurementTagValues has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) MeasurementTagValuesNoTime

func (e *Engine) MeasurementTagValuesNoTime(ctx context.Context, orgID, bucketID influxdb.ID, measurement, tagKey string, predicate influxql.Expr) (cursors.StringIterator, error)

MeasurementTagValuesNoTime returns an iterator which enumerates the tag values for the given bucket, measurement and tag key, filtered using the optional predicate.

MeasurementTagValuesNoTime will always return a StringIterator if there is no error.

If the context is canceled before MeasurementTagValuesNoTime has finished processing, a non-nil error will be returned along with statistics for the already scanned data.

func (*Engine) Open

func (e *Engine) Open(ctx context.Context) (err error)

Open opens the store and all underlying resources. It returns an error if any of the underlying systems fail to open.

func (*Engine) Path

func (e *Engine) Path() string

Path returns the path of the engine's base directory.

func (*Engine) PrometheusCollectors

func (e *Engine) PrometheusCollectors() []prometheus.Collector

PrometheusCollectors returns all the prometheus collectors associated with the engine and its components.

func (*Engine) SeriesCardinality

func (e *Engine) SeriesCardinality() int64

SeriesCardinality returns the number of series in the engine.

func (*Engine) TagKeys

func (e *Engine) TagKeys(ctx context.Context, orgID, bucketID influxdb.ID, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)

TagKeys returns an iterator where the values are tag keys for the bucket matching the predicate within the time range [start, end].

TagKeys will always return a StringIterator if there is no error.

func (*Engine) TagValues

func (e *Engine) TagValues(ctx context.Context, orgID, bucketID influxdb.ID, tagKey string, start, end int64, predicate influxql.Expr) (cursors.StringIterator, error)

TagValues returns an iterator which enumerates the values for the specific tagKey in the given bucket matching the predicate within the time range [start, end].

TagValues will always return a StringIterator if there is no error.
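The schema methods above hand back iterators rather than slices. A typical consumption loop might look like the sketch below. The stringIterator interface is a local stand-in assumed to resemble cursors.StringIterator (a Next/Value pair); the real interface may carry additional methods, such as one for the statistics mentioned above.

```go
package main

import "fmt"

// stringIterator is a local stand-in assumed to resemble
// cursors.StringIterator.
type stringIterator interface {
	Next() bool
	Value() string
}

// sliceIterator is a hypothetical iterator backed by a slice.
type sliceIterator struct {
	vals []string
	i    int
}

// Next advances the iterator and reports whether a value is available.
func (s *sliceIterator) Next() bool {
	if s.i >= len(s.vals) {
		return false
	}
	s.i++
	return true
}

// Value returns the value selected by the most recent successful Next.
func (s *sliceIterator) Value() string { return s.vals[s.i-1] }

// collect drains an iterator into a slice.
func collect(it stringIterator) []string {
	var out []string
	for it.Next() {
		out = append(out, it.Value())
	}
	return out
}

func main() {
	it := &sliceIterator{vals: []string{"host", "region"}}
	fmt.Println(collect(it))
}
```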

func (*Engine) WithLogger

func (e *Engine) WithLogger(log *zap.Logger)

WithLogger sets the logger on the Engine. It must be called before Open.

func (*Engine) WritePoints

func (e *Engine) WritePoints(ctx context.Context, points []models.Point) error

WritePoints writes the provided points to the engine.

The Engine expects all points to have been correctly validated by the caller. However, WritePoints will determine if any tag key-pairs are missing, or if there are any field type conflicts.

Appropriate errors are returned in those cases.

type LoggingPointsWriter

type LoggingPointsWriter struct {
	// Wrapped points writer. Errored writes from here will be logged.
	Underlying PointsWriter

	// Service used to look up logging bucket.
	BucketFinder BucketFinder

	// Name of the bucket to log to.
	LogBucketName string
}

LoggingPointsWriter wraps an underlying points writer but writes logs to another bucket when an error occurs.

func (*LoggingPointsWriter) WritePoints

func (w *LoggingPointsWriter) WritePoints(ctx context.Context, p []models.Point) error

WritePoints writes points to the underlying PointsWriter. Logs on error.

type Option

type Option func(*Engine)

Option is a function that configures an Engine. Options are passed to NewEngine.

func WithCompactionLimiter

func WithCompactionLimiter(limiter limiter.Fixed) Option

WithCompactionLimiter allows the caller to set the limiter that a storage engine uses. A typical use-case for this would be if multiple engines should share the same limiter.

func WithCompactionPlanner

func WithCompactionPlanner(planner tsm1.CompactionPlanner) Option

WithCompactionPlanner makes the engine have the provided compaction planner.

func WithCompactionSemaphore

func WithCompactionSemaphore(s influxdb.Semaphore) Option

WithCompactionSemaphore sets the semaphore used to coordinate full compactions across multiple storage engines.

func WithCurrentGenerationFunc

func WithCurrentGenerationFunc(fn func() int) Option

WithCurrentGenerationFunc sets a function for obtaining the current generation.

func WithEngineID

func WithEngineID(id int) Option

WithEngineID sets an engine id, which can be useful for logging when multiple engines are in use.

func WithFileStoreObserver

func WithFileStoreObserver(obs tsm1.FileStoreObserver) Option

WithFileStoreObserver makes the engine have the provided file store observer.

func WithNodeID

func WithNodeID(id int) Option

WithNodeID sets a node id on the engine, which can be useful for logging when a system has engines running on multiple nodes.

func WithPageFaultLimiter

func WithPageFaultLimiter(limiter *rate.Limiter) Option

WithPageFaultLimiter allows the caller to set the limiter for restricting the frequency of page faults.

func WithRetentionEnforcer

func WithRetentionEnforcer(finder BucketFinder) Option

WithRetentionEnforcer initialises a retention enforcer on the engine. WithRetentionEnforcer must be called after other options to ensure that all metrics are labelled correctly.

func WithRetentionEnforcerLimiter

func WithRetentionEnforcerLimiter(f runnable) Option

WithRetentionEnforcerLimiter sets a limiter used to control when the retention enforcer can proceed. If this option is not used then the default limiter (or the absence of one) is a no-op, and no limitations will be put on running the retention enforcer.

func WithTSMFilenameFormatter

func WithTSMFilenameFormatter(fn tsm1.FormatFileNameFunc) Option

WithTSMFilenameFormatter sets a function on the underlying tsm1.Engine to specify how TSM files are named.

func WithWritePointsValidationEnabled

func WithWritePointsValidationEnabled(v bool) Option

WithWritePointsValidationEnabled sets whether written points should be validated.
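The With... constructors above follow the functional options pattern: each returns a closure that mutates the engine. The sketch below shows the pattern on a hypothetical engine type with two invented fields. It assumes options are applied in the order given, which would be consistent with the note that WithRetentionEnforcer must come after other options.

```go
package main

import "fmt"

// engine is a hypothetical stand-in with two configurable fields.
type engine struct {
	engineID int
	validate bool
}

// option mirrors the shape of storage.Option.
type option func(*engine)

func withEngineID(id int) option {
	return func(e *engine) { e.engineID = id }
}

func withValidation(v bool) option {
	return func(e *engine) { e.validate = v }
}

// newEngine applies each option in order, so later options can observe or
// override the effect of earlier ones.
func newEngine(opts ...option) *engine {
	e := &engine{}
	for _, opt := range opts {
		opt(e)
	}
	return e
}

func main() {
	e := newEngine(withEngineID(3), withValidation(true))
	fmt.Println(e.engineID, e.validate)
}
```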

type PointsWriter

type PointsWriter interface {
	WritePoints(context.Context, []models.Point) error
}

PointsWriter describes the ability to write points into a storage engine.

type SeriesCursor

type SeriesCursor interface {
	Close()
	Next() (*SeriesCursorRow, error)
}

type SeriesCursorRow

type SeriesCursorRow struct {
	Name []byte
	Tags models.Tags
}
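SeriesCursor and SeriesCursorRow are undocumented here, so the loop below is only a guess at typical usage. It assumes that Next returns a nil row with a nil error once the cursor is exhausted; that convention is not stated on this page. The row type uses a plain map in place of models.Tags, and sliceCursor is a hypothetical implementation.

```go
package main

import "fmt"

// row is a stand-in for SeriesCursorRow, with a map in place of models.Tags.
type row struct {
	Name []byte
	Tags map[string]string
}

// seriesCursor mirrors the method set of the SeriesCursor interface.
type seriesCursor interface {
	Close()
	Next() (*row, error)
}

// sliceCursor is a hypothetical cursor backed by a slice of rows.
type sliceCursor struct {
	rows   []row
	i      int
	closed bool
}

func (c *sliceCursor) Close() { c.closed = true }

// Next returns the next row, or (nil, nil) when there are no more rows.
func (c *sliceCursor) Next() (*row, error) {
	if c.i >= len(c.rows) {
		return nil, nil
	}
	r := &c.rows[c.i]
	c.i++
	return r, nil
}

// seriesNames drains the cursor and always closes it before returning.
func seriesNames(cur seriesCursor) ([]string, error) {
	defer cur.Close()
	var names []string
	for {
		r, err := cur.Next()
		if err != nil {
			return nil, err
		}
		if r == nil {
			return names, nil
		}
		names = append(names, string(r.Name))
	}
}

func main() {
	cur := &sliceCursor{rows: []row{{Name: []byte("cpu")}, {Name: []byte("mem")}}}
	fmt.Println(seriesNames(cur))
}
```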

type Snapshotter

type Snapshotter interface {
	WriteSnapshot(ctx context.Context, status tsm1.CacheStatus) error
}

A Snapshotter implementation can take snapshots of the entire engine.

Directories

Path Synopsis
package compat helps with migrating toml files from influxdb.
