control

package
v2.7.4
Published: Nov 8, 2023 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package control keeps track of resources and manages queries.

The Controller manages the resources available to each query by controlling the query's memory allocation and concurrency usage. The Controller compiles a program using the language passed to it and starts the program using the ResourceManager.

It will guarantee that each program that is started has at least one goroutine that it can use with the dispatcher and it will ensure a minimum amount of memory is available before the program runs.

Any additional goroutines and memory are granted at the discretion of the specific resource strategy that the Controller is using.

The Controller also provides visibility into the lifetime of the query and its current resource usage.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// ConcurrencyQuota is the number of queries that are allowed to execute concurrently.
	//
	// This value is limited to an int32 because it's used to set the initial delta on the
	// controller's WaitGroup, and WG deltas have an effective limit of math.MaxInt32.
	// See: https://github.com/golang/go/issues/20687
	ConcurrencyQuota int32

	// InitialMemoryBytesQuotaPerQuery is the initial number of bytes allocated for a query
	// when it is started. If this is unset, then the MemoryBytesQuotaPerQuery will be used.
	InitialMemoryBytesQuotaPerQuery int64

	// MemoryBytesQuotaPerQuery is the maximum number of bytes (in table memory) a query is allowed to use at
	// any given time.
	//
	// A query may not be able to use its entire quota of memory if requesting more memory would conflict
	// with the maximum amount of memory that the controller can request.
	MemoryBytesQuotaPerQuery int64

	// MaxMemoryBytes is the maximum amount of memory the controller is allowed to
	// allocate to queries.
	//
	// If this is unset, then this number is ConcurrencyQuota * MemoryBytesQuotaPerQuery.
	// This number must be greater than or equal to the ConcurrencyQuota * InitialMemoryBytesQuotaPerQuery.
	// This number may be less than the ConcurrencyQuota * MemoryBytesQuotaPerQuery.
	MaxMemoryBytes int64

	// QueueSize is the number of queries that are allowed to be awaiting execution before new queries are rejected.
	//
	// This value is limited to an int32 because it's used to make(chan *Query, QueueSize) on controller startup.
	// Through trial-and-error I found that make(chan *Query, N) starts to panic for N > 1<<45 - 12, so not all
	// ints or int64s are safe to pass here. Using that max value still immediately crashes the program with an OOM,
	// because it tries to allocate TBs of memory for the channel.
	// I was able to boot influxd locally using math.MaxInt32 for this parameter.
	//
	// Less-scientifically, this was the only Config parameter other than ConcurrencyQuota to be typed as an int
	// instead of an explicit int64. When ConcurrencyQuota changed to an int32, it felt like a decent idea for
	// this to follow suit.
	QueueSize int32

	// MetricLabelKeys is a list of labels to add to the metrics produced by the controller.
	// The value for a given key will be read off the context.
	// The context value must be a string or an implementation of the Stringer interface.
	MetricLabelKeys []string

	ExecutorDependencies []flux.Dependency

	// FluxLogEnabled logs any in-progress queries that get cancelled due to the server being shut down.
	FluxLogEnabled bool
}
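
The relationships among the memory fields can be sketched as follows. This is a minimal illustration, not the package's validation code: memoryConfig and resolveMemory are hypothetical names, and treating a zero value as "unset" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// memoryConfig is a hypothetical stand-in that mirrors only the
// memory-related fields of Config; it is not the package's type.
type memoryConfig struct {
	ConcurrencyQuota                int32
	InitialMemoryBytesQuotaPerQuery int64
	MemoryBytesQuotaPerQuery        int64
	MaxMemoryBytes                  int64
}

// resolveMemory applies the documented defaults and checks the documented
// lower bound. It returns the effective initial per-query quota and the
// effective memory ceiling. Zero is assumed to mean "unset".
func resolveMemory(c memoryConfig) (initial, ceiling int64, err error) {
	initial = c.InitialMemoryBytesQuotaPerQuery
	if initial == 0 {
		// Unset: fall back to the per-query maximum.
		initial = c.MemoryBytesQuotaPerQuery
	}

	ceiling = c.MaxMemoryBytes
	if ceiling == 0 {
		// Unset: every concurrent query may use its full quota.
		ceiling = int64(c.ConcurrencyQuota) * c.MemoryBytesQuotaPerQuery
	}

	// The ceiling must cover the initial allocation of every concurrent query.
	if ceiling < int64(c.ConcurrencyQuota)*initial {
		return 0, 0, errors.New("MaxMemoryBytes is below ConcurrencyQuota * InitialMemoryBytesQuotaPerQuery")
	}
	return initial, ceiling, nil
}

func main() {
	initial, ceiling, err := resolveMemory(memoryConfig{
		ConcurrencyQuota:                4,
		InitialMemoryBytesQuotaPerQuery: 1 << 20,
		MemoryBytesQuotaPerQuery:        8 << 20,
	})
	fmt.Println(initial, ceiling, err)
}
```

A ceiling below ConcurrencyQuota * MemoryBytesQuotaPerQuery is accepted here, which matches the comment that a query may not always be able to use its entire quota.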

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller provides a central location to manage all incoming queries. The controller is responsible for compiling, queueing, and executing queries.
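
The queueing behavior that QueueSize describes (a fixed number of waiting queries, with new ones rejected once the queue is full) can be sketched with a buffered channel and a non-blocking send. This is an illustration only: boundedQueue, enqueue, and errQueueFull are hypothetical names, and the Controller's actual queue and error values are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical error used only in this sketch.
var errQueueFull = errors.New("queue full")

// boundedQueue is an illustrative fixed-capacity queue of query IDs.
type boundedQueue struct {
	pending chan int
}

// newBoundedQueue allocates the buffered channel up front, which is why the
// capacity has to be a size that make can actually satisfy.
func newBoundedQueue(size int32) *boundedQueue {
	return &boundedQueue{pending: make(chan int, size)}
}

// enqueue adds id without blocking and reports errQueueFull when the queue
// is already at capacity.
func (q *boundedQueue) enqueue(id int) error {
	select {
	case q.pending <- id:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	q := newBoundedQueue(2)
	for id := 1; id <= 3; id++ {
		fmt.Println(id, q.enqueue(id))
	}
}
```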

func New

func New(config Config, logger *zap.Logger) (*Controller, error)

func (*Controller) GetUnusedMemoryBytes

func (c *Controller) GetUnusedMemoryBytes() int64

func (*Controller) GetUsedMemoryBytes

func (c *Controller) GetUsedMemoryBytes() int64

func (*Controller) PrometheusCollectors

func (c *Controller) PrometheusCollectors() []prometheus.Collector

PrometheusCollectors satisfies the prom.PrometheusCollector interface.

func (*Controller) Queries

func (c *Controller) Queries() []*Query

Queries reports the active queries.

func (*Controller) Query

func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, error)

Query satisfies the AsyncQueryService while ensuring the request is propagated on the context.

func (*Controller) Shutdown

func (c *Controller) Shutdown(ctx context.Context) error

Shutdown signals to the Controller that it should not accept any new queries and that it should finish executing any existing queries. Shutdown returns once the Controller's run loop has exited and all queries have finished, or when the Context is canceled, whichever happens first.

type Query

type Query struct {
	// contains filtered or unexported fields
}

Query represents a single request.

func (*Query) Cancel

func (q *Query) Cancel()

Cancel will stop the query execution.

func (*Query) Done

func (q *Query) Done()

Done signals to the Controller that this query is no longer being used and resources related to the query may be freed.

func (*Query) Err

func (q *Query) Err() error

Err reports any error the query may have encountered.

func (*Query) ID

func (q *Query) ID() QueryID

ID reports an ephemeral unique ID for the query.

func (*Query) ProfilerResults

func (q *Query) ProfilerResults() (flux.ResultIterator, error)

func (*Query) Results

func (q *Query) Results() <-chan flux.Result

Results returns a channel that will deliver the query results.

It's possible that the channel is closed before any results arrive. In particular, if a query's context or the query itself is canceled, the query may close the results channel before any results are computed.

The query may also have an error during execution so the Err() function should be used to check if an error happened.

func (*Query) State

func (q *Query) State() State

State reports the current state of the query.

func (*Query) Statistics

func (q *Query) Statistics() flux.Statistics

Statistics reports the statistics for the query.

This method must be called after Done. It will block until the query has been finalized unless a context is given.

type QueryID

type QueryID uint64

type State

type State int

State is the query state.

const (
	// Created indicates the query has been created.
	Created State = iota

	// Compiling indicates that the query is in the process
	// of executing the compiler associated with the query.
	Compiling

	// Queueing indicates the query is waiting inside of the
	// scheduler to be executed.
	Queueing

	// Executing indicates that the query is currently executing.
	Executing

	// Errored indicates that there was an error when attempting
	// to execute a query within any state inside of the controller.
	Errored

	// Finished indicates that the query has been marked as Done
	// and it is awaiting removal from the Controller or has already
	// been removed.
	Finished

	// Canceled indicates that the query was signaled to be
	// canceled. A canceled query must still be released with Done.
	Canceled
)

func (State) String

func (s State) String() string
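
For orientation, the sketch below mirrors the State constants in a local type and pairs them with a String method. The numeric values follow from the iota declaration above; the strings returned here are an assumption, since this page does not document what State.String actually returns.

```go
package main

import "fmt"

// state is a local mirror of State for illustration only.
type state int

const (
	created state = iota
	compiling
	queueing
	executing
	errored
	finished
	canceled
)

// String returns a lowercase name for the state. The exact strings are
// assumed for this sketch and may differ from the package's output.
func (s state) String() string {
	names := [...]string{
		"created", "compiling", "queueing", "executing",
		"errored", "finished", "canceled",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}

func main() {
	for s := created; s <= canceled; s++ {
		fmt.Printf("%d %s\n", int(s), s)
	}
}
```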
