coordinator

package
v0.0.0-...-2f6135f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2020 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package coordinator contains abstractions for writing points, executing statements, and accessing meta data.

Index

Constants

View Source
const (
	// DefaultWriteTimeout is the default timeout for a complete write to succeed.
	DefaultWriteTimeout = 10 * time.Second

	// DefaultMaxConcurrentQueries is the maximum number of running queries.
	// A value of zero will make the maximum query limit unlimited.
	DefaultMaxConcurrentQueries = 0

	// DefaultMaxSelectPointN is the maximum number of points a SELECT can process.
	// A value of zero will make the maximum point count unlimited.
	DefaultMaxSelectPointN = 0

	// DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run.
	// A value of zero will make the maximum series count unlimited.
	DefaultMaxSelectSeriesN = 0
)

Variables

View Source
var (
	// ErrTimeout is returned when a write times out.
	ErrTimeout = errors.New("timeout")

	// ErrPartialWrite is returned when a write partially succeeds but does
	// not meet the requested consistency level.
	ErrPartialWrite = errors.New("partial write")

	// ErrWriteFailed is returned when no writes succeeded.
	ErrWriteFailed = errors.New("write failed")
)
View Source
var ErrDatabaseNameRequired = errors.New("database name required")

ErrDatabaseNameRequired is returned when executing statements that require a database, when a database has not been provided.

Functions

This section is empty.

Types

type BufferedPointsWriter

type BufferedPointsWriter struct {
	// contains filtered or unexported fields
}

BufferedPointsWriter adds buffering to a pointsWriter so that SELECT INTO queries write their points to the destination in batches.

func NewBufferedPointsWriter

func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter

NewBufferedPointsWriter returns a new BufferedPointsWriter.

func (*BufferedPointsWriter) Cap

func (w *BufferedPointsWriter) Cap() int

Cap returns the capacity (in points) of the buffer.

func (*BufferedPointsWriter) Flush

func (w *BufferedPointsWriter) Flush() error

Flush writes all buffered points to the underlying writer.

func (*BufferedPointsWriter) Len

func (w *BufferedPointsWriter) Len() int

Len returns the number of points buffered.

func (*BufferedPointsWriter) WritePointsInto

func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error

WritePointsInto implements pointsWriter for BufferedPointsWriter.

type Config

type Config struct {
	WriteTimeout         toml.Duration `toml:"write-timeout"`
	MaxConcurrentQueries int           `toml:"max-concurrent-queries"`
	QueryTimeout         toml.Duration `toml:"query-timeout"`
	LogQueriesAfter      toml.Duration `toml:"log-queries-after"`
	MaxSelectPointN      int           `toml:"max-select-point"`
	MaxSelectSeriesN     int           `toml:"max-select-series"`
	MaxSelectBucketsN    int           `toml:"max-select-buckets"`
}

Config represents the configuration for the coordinator service.

func NewConfig

func NewConfig() Config

NewConfig returns an instance of Config with defaults.

func (Config) Diagnostics

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

type IntoWriteRequest

type IntoWriteRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point
}

IntoWriteRequest is a partial copy of cluster.WriteRequest

type IteratorCreator

type IteratorCreator interface {
	query.IteratorCreator
	influxql.FieldMapper
	io.Closer
}

IteratorCreator is an interface that combines mapping fields and creating iterators.

type LocalShardMapper

type LocalShardMapper struct {
	MetaClient interface {
		ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
	}

	TSDBStore interface {
		ShardGroup(ids []uint64) tsdb.ShardGroup
	}
}

LocalShardMapper implements a ShardMapper for local shards.

func (*LocalShardMapper) MapShards

MapShards maps the sources to the appropriate shards into an IteratorCreator.

type LocalShardMapping

type LocalShardMapping struct {
	ShardMap map[Source]tsdb.ShardGroup

	// MinTime is the minimum time that this shard mapper will allow.
	// Any attempt to use a time before this one will automatically result in using
	// this time instead.
	MinTime time.Time

	// MaxTime is the maximum time that this shard mapper will allow.
	// Any attempt to use a time after this one will automatically result in using
	// this time instead.
	MaxTime time.Time
}

ShardMapper maps data sources to a list of shard information.

func (*LocalShardMapping) Close

func (a *LocalShardMapping) Close() error

Close clears out the list of mapped shards.

func (*LocalShardMapping) CreateIterator

func (*LocalShardMapping) FieldDimensions

func (a *LocalShardMapping) FieldDimensions(m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)

func (*LocalShardMapping) IteratorCost

func (*LocalShardMapping) MapType

type LocalTSDBStore

type LocalTSDBStore struct {
	*tsdb.Store
}

LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator to satisfy the TSDBStore interface.

type MetaClient

type MetaClient interface {
	CreateContinuousQuery(database, name, query string) error
	CreateDatabase(name string) (*meta.DatabaseInfo, error)
	CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
	CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
	CreateSubscription(database, rp, name, mode string, destinations []string) error
	CreateUser(name, password string, admin bool) (meta.User, error)
	Database(name string) *meta.DatabaseInfo
	Databases() []meta.DatabaseInfo
	DropShard(id uint64) error
	DropContinuousQuery(database, name string) error
	DropDatabase(name string) error
	DropRetentionPolicy(database, name string) error
	DropSubscription(database, rp, name string) error
	DropUser(name string) error
	RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
	SetAdminPrivilege(username string, admin bool) error
	SetPrivilege(username, database string, p influxql.Privilege) error
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
	TruncateShardGroups(t time.Time) error
	UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
	UpdateUser(name, password string) error
	UserPrivilege(username, database string) (*influxql.Privilege, error)
	UserPrivileges(username string) (map[string]influxql.Privilege, error)
	Users() []meta.UserInfo
}

MetaClient is an interface for accessing meta data.

type PointsWriter

type PointsWriter struct {
	WriteTimeout time.Duration
	Logger       *zap.Logger

	Node *influxdb.Node

	MetaClient interface {
		Database(name string) (di *meta.DatabaseInfo)
		RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
		CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
	}

	TSDBStore interface {
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
		WriteToShard(shardID uint64, points []models.Point) error
	}
	// contains filtered or unexported fields
}

PointsWriter handles writes across multiple local and remote data nodes.

func NewPointsWriter

func NewPointsWriter() *PointsWriter

NewPointsWriter returns a new instance of PointsWriter for a node.

func (*PointsWriter) AddWriteSubscriber

func (w *PointsWriter) AddWriteSubscriber(c chan<- *WritePointsRequest)

func (*PointsWriter) Close

func (w *PointsWriter) Close() error

Close closes the communication channel with the point writer.

func (*PointsWriter) MapShards

func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)

MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.

func (*PointsWriter) Open

func (w *PointsWriter) Open() error

Open opens the communication channel with the point writer.

func (*PointsWriter) Statistics

func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*PointsWriter) WithLogger

func (w *PointsWriter) WithLogger(log *zap.Logger)

WithLogger sets the Logger on w.

func (*PointsWriter) WritePoints

func (w *PointsWriter) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error

WritePoints writes the data to the underlying storage. consitencyLevel and user are only used for clustered scenarios

func (*PointsWriter) WritePointsInto

func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error

WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of a cluster structure for information. This is to avoid a circular dependency.

func (*PointsWriter) WritePointsPrivileged

func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error

WritePointsPrivileged writes the data to the underlying storage, consitencyLevel is only used for clustered scenarios

type ShardIteratorCreator

type ShardIteratorCreator interface {
	ShardIteratorCreator(id uint64) query.IteratorCreator
}

ShardIteratorCreator is an interface for creating an IteratorCreator to access a specific shard.

type ShardMapping

type ShardMapping struct {
	Points  map[uint64][]models.Point  // The points associated with a shard ID
	Shards  map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
	Dropped []models.Point             // Points that were dropped
	// contains filtered or unexported fields
}

ShardMapping contains a mapping of shards to points.

func NewShardMapping

func NewShardMapping(n int) *ShardMapping

NewShardMapping creates an empty ShardMapping.

func (*ShardMapping) MapPoint

func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point)

MapPoint adds the point to the ShardMapping, associated with the given shardInfo.

type Source

type Source struct {
	Database        string
	RetentionPolicy string
}

Source contains the database and retention policy source for data.

type StatementExecutor

type StatementExecutor struct {
	MetaClient MetaClient

	// TaskManager holds the StatementExecutor that handles task-related commands.
	TaskManager query.StatementExecutor

	// TSDB storage for local node.
	TSDBStore TSDBStore

	// ShardMapper for mapping shards when executing a SELECT statement.
	ShardMapper query.ShardMapper

	// Holds monitoring data for SHOW STATS and SHOW DIAGNOSTICS.
	Monitor *monitor.Monitor

	// Used for rewriting points back into system for SELECT INTO statements.
	PointsWriter pointsWriter

	// Select statement limits
	MaxSelectPointN   int
	MaxSelectSeriesN  int
	MaxSelectBucketsN int
}

StatementExecutor executes a statement in the query.

func (*StatementExecutor) ExecuteStatement

func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *query.ExecutionContext) error

ExecuteStatement executes the given statement with the given execution context.

func (*StatementExecutor) NormalizeStatement

func (e *StatementExecutor) NormalizeStatement(stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string) (err error)

NormalizeStatement adds a default database and policy to the measurements in statement. Parameter defaultRetentionPolicy can be "".

type TSDBStore

type TSDBStore interface {
	CreateShard(database, policy string, shardID uint64, enabled bool) error
	WriteToShard(shardID uint64, points []models.Point) error

	RestoreShard(id uint64, r io.Reader) error
	BackupShard(id uint64, since time.Time, w io.Writer) error

	DeleteDatabase(name string) error
	DeleteMeasurement(database, name string) error
	DeleteRetentionPolicy(database, name string) error
	DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
	DeleteShard(id uint64) error

	MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
	TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
	TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)

	SeriesCardinality(database string) (int64, error)
	MeasurementsCardinality(database string) (int64, error)
}

TSDBStore is an interface for accessing the time series data store.

type WritePointsRequest

type WritePointsRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point
}

WritePointsRequest represents a request to write point data to the cluster.

func (*WritePointsRequest) AddPoint

func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)

AddPoint adds a point to the WritePointRequest with field key 'value'

type WriteStatistics

type WriteStatistics struct {
	WriteReq           int64
	PointWriteReq      int64
	PointWriteReqLocal int64
	WriteOK            int64
	WriteDropped       int64
	WriteTimeout       int64
	WriteErr           int64
	SubWriteOK         int64
	SubWriteDrop       int64
}

WriteStatistics keeps statistics related to the PointsWriter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL