continuous_querier

package
v0.0.0-...-2f6135f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2020 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package continuous_querier provides the continuous query service.

Index

Constants

View Source
const (
	// The default value of how often to check whether any CQs need to be run.
	DefaultRunInterval = time.Second
)

Default values for aspects of interval computation.

View Source
const (
	// NoChunkingSize specifies when not to chunk results. When planning
	// a select statement, passing zero tells it not to chunk results.
	// Only applies to raw queries.
	NoChunkingSize = 0
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Enables logging in CQ service to display when CQ's are processed and how many points were written.
	LogEnabled bool `toml:"log-enabled"`

	// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
	Enabled bool `toml:"enabled"`

	// QueryStatsEnabled enables logging of individual query execution statistics to the self-monitoring data
	// store. The default is false.
	QueryStatsEnabled bool `toml:"query-stats-enabled"`

	// Run interval for checking continuous queries. This should be set to the least common factor
	// of the interval for running continuous queries. If you only aggregate continuous queries
	// every minute, this should be set to 1 minute. The default is set to '1s' so the interval
	// is compatible with most aggregations.
	RunInterval toml.Duration `toml:"run-interval"`
}

Config represents a configuration for the continuous query service.

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of Config with defaults.

func (Config) Diagnostics

func (c Config) Diagnostics() (*diagnostics.Diagnostics, error)

Diagnostics returns a diagnostics representation of a subset of the Config.

func (Config) Validate

func (c Config) Validate() error

Validate returns an error if the Config is invalid.

type ContinuousQuerier

type ContinuousQuerier interface {
	// Run executes the named query in the named database.  Blank database or name matches all.
	Run(database, name string, t time.Time) error
}

ContinuousQuerier represents a service that executes continuous queries.

type ContinuousQuery

type ContinuousQuery struct {
	Database string
	Info     *meta.ContinuousQueryInfo
	HasRun   bool
	LastRun  time.Time
	Resample ResampleOptions
	// contains filtered or unexported fields
}

ContinuousQuery is a local wrapper / helper around continuous queries.

func NewContinuousQuery

func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error)

NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement.

type Monitor

type Monitor interface {
	Enabled() bool
	WritePoints(models.Points) error
}

type ResampleOptions

type ResampleOptions struct {
	// The query will be resampled at this time interval. The first query will be
	// performed at this time interval. If this option is not given, the resample
	// interval is set to the group by interval.
	Every time.Duration

	// The query will continue being resampled for this time duration. If this
	// option is not given, the resample duration is the same as the group by
	// interval. A bucket's time is calculated based on the bucket's start time,
	// so a 40m resample duration with a group by interval of 10m will resample
	// the bucket 4 times (using the default time interval).
	For time.Duration
}

ResampleOptions controls the resampling intervals and duration of this continuous query.

type RunRequest

type RunRequest struct {
	// Now tells the CQ serivce what the current time is.
	Now time.Time
	// CQs tells the CQ service which queries to run.
	// If nil, all queries will be run.
	CQs []string
}

RunRequest is a request to run one or more CQs.

type Service

type Service struct {
	MetaClient    metaClient
	QueryExecutor *query.Executor
	Monitor       Monitor
	Config        *Config
	RunInterval   time.Duration
	// RunCh can be used by clients to signal service to run CQs.
	RunCh  chan *RunRequest
	Logger *zap.Logger
	// contains filtered or unexported fields
}

Service manages continuous query execution.

func NewService

func NewService(c Config) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

Close stops the service.

func (*Service) ExecuteContinuousQuery

func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) (bool, error)

ExecuteContinuousQuery may execute a single CQ. This will return false if there were no errors and the CQ was not run.

func (*Service) Open

func (s *Service) Open() error

Open starts the service.

func (*Service) Run

func (s *Service) Run(database, name string, t time.Time) error

Run runs the specified continuous query, or all CQs if none is specified.

func (*Service) Statistics

func (s *Service) Statistics(tags map[string]string) []models.Statistic

Statistics returns statistics for periodic monitoring.

func (*Service) WithLogger

func (s *Service) WithLogger(log *zap.Logger)

WithLogger sets the logger on the service.

type Statistics

type Statistics struct {
	QueryOK   int64
	QueryFail int64
}

Statistics maintains the statistics for the continuous query service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL