coordinator

package
v2.7.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2024 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package coordinator contains abstractions for writing points, executing statements, and accessing meta data.

Index

Constants

View Source
const (
	// DefaultMaxConcurrentQueries is the maximum number of running queries.
	// A value of zero will make the maximum query limit unlimited.
	DefaultMaxConcurrentQueries = 0

	// DefaultMaxSelectPointN is the maximum number of points a SELECT can process.
	// A value of zero will make the maximum point count unlimited.
	DefaultMaxSelectPointN = 0

	// DefaultMaxSelectSeriesN is the maximum number of series a SELECT can run.
	// A value of zero will make the maximum series count unlimited.
	DefaultMaxSelectSeriesN = 0
)

Variables

View Source
var (
	// ErrTimeout is returned when a write times out.
	ErrTimeout = errors.New("timeout")

	// ErrWriteFailed is returned when no writes succeeded.
	ErrWriteFailed = errors.New("write failed")
)
View Source
var ErrDatabaseNameRequired = errors.New("database name required")

ErrDatabaseNameRequired is returned when executing statements that require a database, when a database has not been provided.

Functions

func PrometheusCollectors added in v2.2.0

func PrometheusCollectors() []prometheus.Collector

PrometheusCollectors returns all prometheus metrics for the tsm1 package.

Types

type Config

type Config struct {
	MaxConcurrentQueries int           `toml:"max-concurrent-queries"`
	LogQueriesAfter      toml.Duration `toml:"log-queries-after"`
	MaxSelectPointN      int           `toml:"max-select-point"`
	MaxSelectSeriesN     int           `toml:"max-select-series"`
	MaxSelectBucketsN    int           `toml:"max-select-buckets"`
}

Config represents the configuration for the coordinator service.

func NewConfig

func NewConfig() Config

NewConfig returns an instance of Config with defaults.

type IteratorCreator

type IteratorCreator interface {
	query.IteratorCreator
	influxql.FieldMapper
	io.Closer
}

IteratorCreator is an interface that combines mapping fields and creating iterators.

type LocalShardMapper

type LocalShardMapper struct {
	MetaClient interface {
		ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
	}

	TSDBStore interface {
		ShardGroup(ids []uint64) tsdb.ShardGroup
	}

	DBRP influxdb.DBRPMappingService
}

LocalShardMapper implements a ShardMapper for local shards.

func (*LocalShardMapper) MapShards

MapShards maps the sources to the appropriate shards into an IteratorCreator.

type LocalShardMapping

type LocalShardMapping struct {
	ShardMap map[Source]tsdb.ShardGroup

	// MinTime is the minimum time that this shard mapper will allow.
	// Any attempt to use a time before this one will automatically result in using
	// this time instead.
	MinTime time.Time

	// MaxTime is the maximum time that this shard mapper will allow.
	// Any attempt to use a time after this one will automatically result in using
	// this time instead.
	MaxTime time.Time
}

ShardMapper maps data sources to a list of shard information.

func (*LocalShardMapping) Close

func (a *LocalShardMapping) Close() error

Close clears out the list of mapped shards.

func (*LocalShardMapping) CreateIterator

func (*LocalShardMapping) FieldDimensions

func (a *LocalShardMapping) FieldDimensions(ctx context.Context, m *influxql.Measurement) (fields map[string]influxql.DataType, dimensions map[string]struct{}, err error)

func (*LocalShardMapping) IteratorCost

func (*LocalShardMapping) MapType

type LocalTSDBStore

type LocalTSDBStore struct {
	*tsdb.Store
}

LocalTSDBStore embeds a tsdb.Store and implements IteratorCreator to satisfy the TSDBStore interface.

type MetaClient

type MetaClient interface {
	CreateContinuousQuery(database, name, query string) error
	CreateDatabase(name string) (*meta.DatabaseInfo, error)
	CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
	CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
	CreateSubscription(database, rp, name, mode string, destinations []string) error
	CreateUser(name, password string, admin bool) (meta.User, error)
	Database(name string) *meta.DatabaseInfo
	Databases() []meta.DatabaseInfo
	DropShard(id uint64) error
	DropContinuousQuery(database, name string) error
	DropDatabase(name string) error
	DropRetentionPolicy(database, name string) error
	DropSubscription(database, rp, name string) error
	DropUser(name string) error
	RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
	SetAdminPrivilege(username string, admin bool) error
	SetPrivilege(username, database string, p influxql.Privilege) error
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
	TruncateShardGroups(t time.Time) error
	UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
	UpdateUser(name, password string) error
	UserPrivilege(username, database string) (*influxql.Privilege, error)
	UserPrivileges(username string) (map[string]influxql.Privilege, error)
	Users() []meta.UserInfo
}

MetaClient is an interface for accessing meta data.

type PointsWriter

type PointsWriter struct {
	WriteTimeout time.Duration
	Logger       *zap.Logger

	Node *influxdb.Node

	MetaClient interface {
		Database(name string) (di *meta.DatabaseInfo)
		RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
		CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
	}

	TSDBStore interface {
		CreateShard(ctx context.Context, database, retentionPolicy string, shardID uint64, enabled bool) error
		WriteToShard(ctx context.Context, shardID uint64, points []models.Point) error
	}
	// contains filtered or unexported fields
}

PointsWriter handles writes across multiple local and remote data nodes.

func NewPointsWriter

func NewPointsWriter(writeTimeout time.Duration, path string) *PointsWriter

NewPointsWriter returns a new instance of PointsWriter for a node.

func (*PointsWriter) Close

func (w *PointsWriter) Close() error

Close closes the communication channel with the point writer.

func (*PointsWriter) MapShards

func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)

MapShards maps the points contained in wp to a ShardMapping. If a point maps to a shard group or shard that does not currently exist, it will be created before returning the mapping.

func (*PointsWriter) Open

func (w *PointsWriter) Open() error

Open opens the communication channel with the point writer.

func (*PointsWriter) WithLogger

func (w *PointsWriter) WithLogger(log *zap.Logger)

WithLogger sets the Logger on w.

func (*PointsWriter) WritePoints

func (w *PointsWriter) WritePoints(
	ctx context.Context,
	database, retentionPolicy string,
	consistencyLevel models.ConsistencyLevel,
	user meta.User,
	points []models.Point,
) error

WritePoints writes the data to the underlying storage. consistencyLevel and user are only used for clustered scenarios

func (*PointsWriter) WritePointsPrivileged

func (w *PointsWriter) WritePointsPrivileged(
	ctx context.Context,
	database, retentionPolicy string,
	consistencyLevel models.ConsistencyLevel,
	points []models.Point,
) error

WritePointsPrivileged writes the data to the underlying storage, consistencyLevel is only used for clustered scenarios

type ShardMapping

type ShardMapping struct {
	Points  map[uint64][]models.Point  // The points associated with a shard ID
	Shards  map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
	Dropped []models.Point             // Points that were dropped
	// contains filtered or unexported fields
}

ShardMapping contains a mapping of shards to points.

func NewShardMapping

func NewShardMapping(n int) *ShardMapping

NewShardMapping creates an empty ShardMapping.

func (*ShardMapping) MapPoint

func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point)

MapPoint adds the point to the ShardMapping, associated with the given shardInfo.

type Source

type Source struct {
	Database        string
	RetentionPolicy string
}

Source contains the database and retention policy source for data.

type StatementExecutor

type StatementExecutor struct {
	MetaClient MetaClient

	// TSDB storage for local node.
	TSDBStore TSDBStore

	// ShardMapper for mapping shards when executing a SELECT statement.
	ShardMapper query.ShardMapper

	DBRP influxdb.DBRPMappingService

	// Select statement limits
	MaxSelectPointN   int
	MaxSelectSeriesN  int
	MaxSelectBucketsN int
}

StatementExecutor executes a statement in the query.

func (*StatementExecutor) ExecuteStatement

func (e *StatementExecutor) ExecuteStatement(ctx context.Context, stmt influxql.Statement, ectx *query.ExecutionContext) error

ExecuteStatement executes the given statement with the given execution context.

func (*StatementExecutor) NormalizeStatement

func (e *StatementExecutor) NormalizeStatement(ctx context.Context, stmt influxql.Statement, defaultDatabase, defaultRetentionPolicy string, ectx *query.ExecutionContext) (err error)

NormalizeStatement adds a default database and policy to the measurements in statement. Parameter defaultRetentionPolicy can be "".

type TSDBStore

type TSDBStore interface {
	DeleteMeasurement(ctx context.Context, database, name string) error
	DeleteSeries(ctx context.Context, database string, sources []influxql.Source, condition influxql.Expr) error
	MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
	TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
	TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
}

TSDBStore is an interface for accessing the time series data store.

type WritePointsRequest

type WritePointsRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point
}

WritePointsRequest represents a request to write point data to the cluster.

func (*WritePointsRequest) AddPoint

func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string)

AddPoint adds a point to the WritePointRequest with field key 'value'

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL