controller

package
v0.39.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrUnevaluated = errors.New("managed component not built")

ErrUnevaluated is returned if ComponentNode.Run is called before a managed component is built.

Functions

This section is empty.

Types

type ArgumentConfigNode added in v0.34.0

type ArgumentConfigNode struct {
	// contains filtered or unexported fields
}

func NewArgumentConfigNode added in v0.34.0

func NewArgumentConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ArgumentConfigNode

NewArgumentConfigNode creates a new ArgumentConfigNode from an initial ast.BlockStmt. The underlying config isn't applied until Evaluate is called.

func (*ArgumentConfigNode) Block added in v0.34.0

func (cn *ArgumentConfigNode) Block() *ast.BlockStmt

Block implements BlockNode and returns the current block of the managed config node.

func (*ArgumentConfigNode) Default added in v0.34.0

func (cn *ArgumentConfigNode) Default() any

func (*ArgumentConfigNode) Evaluate added in v0.34.0

func (cn *ArgumentConfigNode) Evaluate(scope *vm.Scope) error

Evaluate implements BlockNode and updates the arguments for the managed config block by re-evaluating its River block with the provided scope. The managed config block will be built the first time Evaluate is called.

Evaluate will return an error if the River block cannot be evaluated or if decoding to arguments fails.

func (*ArgumentConfigNode) Label added in v0.34.0

func (cn *ArgumentConfigNode) Label() string

func (*ArgumentConfigNode) NodeID added in v0.34.0

func (cn *ArgumentConfigNode) NodeID() string

NodeID implements dag.Node and returns the unique ID for the config node.

func (*ArgumentConfigNode) Optional added in v0.34.0

func (cn *ArgumentConfigNode) Optional() bool

type BlockNode added in v0.33.0

type BlockNode interface {
	dag.Node

	// Block returns the current block of the managed config node.
	Block() *ast.BlockStmt

	// Evaluate updates the arguments for the managed component
	// by re-evaluating its River block with the provided scope. The managed component
	// will be built the first time Evaluate is called.
	//
	// Evaluate will return an error if the River block cannot be evaluated or if
	// decoding to arguments fails.
	Evaluate(scope *vm.Scope) error
}

BlockNode is a node in the DAG which manages a River block and can be evaluated.

func NewConfigNode added in v0.29.0

func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, diag.Diagnostics)

NewConfigNode creates a new ConfigNode from an initial ast.BlockStmt. The underlying config isn't applied until Evaluate is called.

type ComponentGlobals

type ComponentGlobals struct {
	Logger              *logging.Logger                        // Logger shared between all managed components.
	TraceProvider       trace.TracerProvider                   // Tracer shared between all managed components.
	DataPath            string                                 // Shared directory where component data may be stored
	OnComponentUpdate   func(cn *ComponentNode)                // Informs controller that we need to reevaluate
	OnExportsChange     func(exports map[string]any)           // Invoked when the managed component updated its exports
	Registerer          prometheus.Registerer                  // Registerer for serving agent and component metrics
	ControllerID        string                                 // ID of controller.
	NewModuleController func(id string) ModuleController       // Func to generate a module controller.
	GetServiceData      func(name string) (interface{}, error) // Get data for a service.
}

ComponentGlobals are used by ComponentNodes to build managed components. All ComponentNodes should use the same ComponentGlobals.

type ComponentID

type ComponentID []string

ComponentID is a fully-qualified name of a component. Each element in ComponentID corresponds to a fragment of the period-delimited string; "remote.http.example" is ComponentID{"remote", "http", "example"}.

func BlockComponentID

func BlockComponentID(b *ast.BlockStmt) ComponentID

BlockComponentID returns the ComponentID specified by an River block.

func (ComponentID) Equals

func (id ComponentID) Equals(other ComponentID) bool

Equals returns true if id == other.

func (ComponentID) String

func (id ComponentID) String() string

String returns the string representation of a component ID.

type ComponentNode

type ComponentNode struct {
	OnComponentUpdate func(cn *ComponentNode) // Informs controller that we need to reevaluate
	// contains filtered or unexported fields
}

ComponentNode is a controller node which manages a user-defined component.

ComponentNode manages the underlying component and caches its current arguments and exports. ComponentNode manages the arguments for the component from a River block.

func NewComponentNode

func NewComponentNode(globals ComponentGlobals, reg component.Registration, b *ast.BlockStmt) *ComponentNode

NewComponentNode creates a new ComponentNode from an initial ast.BlockStmt. The underlying managed component isn't created until Evaluate is called.

func (*ComponentNode) Arguments

func (cn *ComponentNode) Arguments() component.Arguments

Arguments returns the current arguments of the managed component.

func (*ComponentNode) Block added in v0.33.0

func (cn *ComponentNode) Block() *ast.BlockStmt

Block implements BlockNode and returns the current block of the managed component.

func (*ComponentNode) Component added in v0.35.0

func (cn *ComponentNode) Component() component.Component

Component returns the instance of the managed component. Component may be nil if the ComponentNode has not been successfully evaluated yet.

func (*ComponentNode) ComponentName added in v0.28.0

func (cn *ComponentNode) ComponentName() string

ComponentName returns the component's type, i.e. `local.file.test` returns `local.file`.

func (*ComponentNode) CurrentHealth

func (cn *ComponentNode) CurrentHealth() component.Health

CurrentHealth returns the current health of the ComponentNode.

The health of a ComponentNode is determined by combining:

  1. Health from the call to Run().
  2. Health from the last call to Evaluate().
  3. Health reported from the component.

func (*ComponentNode) DebugInfo

func (cn *ComponentNode) DebugInfo() interface{}

DebugInfo returns debugging information from the managed component (if any).

func (*ComponentNode) Evaluate

func (cn *ComponentNode) Evaluate(scope *vm.Scope) error

Evaluate implements BlockNode and updates the arguments for the managed component by re-evaluating its River block with the provided scope. The managed component will be built the first time Evaluate is called.

Evaluate will return an error if the River block cannot be evaluated or if decoding to arguments fails.

func (*ComponentNode) Exports

func (cn *ComponentNode) Exports() component.Exports

Exports returns the current set of exports from the managed component. Exports returns nil if the managed component does not have exports.

func (*ComponentNode) ID

func (cn *ComponentNode) ID() ComponentID

ID returns the component ID of the managed component from its River block.

func (*ComponentNode) Label added in v0.28.0

func (cn *ComponentNode) Label() string

Label returns the label for the block or "" if none was specified.

func (*ComponentNode) ModuleIDs added in v0.35.0

func (cn *ComponentNode) ModuleIDs() []string

ModuleIDs returns the current list of modules that this component is managing.

func (*ComponentNode) NodeID

func (cn *ComponentNode) NodeID() string

NodeID implements dag.Node and returns the unique ID for this node. The NodeID is the string representation of the component's ID from its River block.

func (*ComponentNode) Registration added in v0.35.0

func (cn *ComponentNode) Registration() component.Registration

Registration returns the original registration of the component.

func (*ComponentNode) Run

func (cn *ComponentNode) Run(ctx context.Context) error

Run runs the managed component in the calling goroutine until ctx is canceled. Evaluate must have been called at least once without returning an error before calling Run.

Run will immediately return ErrUnevaluated if Evaluate has never been called successfully. Otherwise, Run will return nil.

func (*ComponentNode) UpdateBlock

func (cn *ComponentNode) UpdateBlock(b *ast.BlockStmt)

UpdateBlock updates the River block used to construct arguments for the managed component. The new block isn't used until the next time Evaluate is invoked.

UpdateBlock will panic if the block does not match the component ID of the ComponentNode.

type ComponentRegistry added in v0.36.0

type ComponentRegistry interface {
	// Get looks up a component by name.
	Get(name string) (component.Registration, bool)
}

ComponentRegistry is a collection of registered components.

type ConfigNodeMap added in v0.34.0

type ConfigNodeMap struct {
	// contains filtered or unexported fields
}

ConfigNodeMap represents the config BlockNodes in their explicit types. This is helpful when validating node conditions specific to config node types.

func NewConfigNodeMap added in v0.34.0

func NewConfigNodeMap() *ConfigNodeMap

NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called to populate NewConfigNodeMap.

func (*ConfigNodeMap) Append added in v0.34.0

func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics

Append will add a config node to the ConfigNodeMap. This will overwrite values on the ConfigNodeMap that are matched and previously set.

func (*ConfigNodeMap) Validate added in v0.34.0

func (nodeMap *ConfigNodeMap) Validate(isInModule bool, args map[string]any) diag.Diagnostics

Validate wraps all validators for ConfigNodeMap.

func (*ConfigNodeMap) ValidateModuleConstraints added in v0.34.0

func (nodeMap *ConfigNodeMap) ValidateModuleConstraints(isInModule bool) diag.Diagnostics

ValidateModuleConstraints will make sure config blocks with module constraints get followed.

func (*ConfigNodeMap) ValidateUnsupportedArguments added in v0.34.0

func (nodeMap *ConfigNodeMap) ValidateUnsupportedArguments(args map[string]any) diag.Diagnostics

ValidateUnsupportedArguments will validate each provided argument is supported in the config.

type DefaultComponentRegistry added in v0.36.0

type DefaultComponentRegistry struct{}

DefaultComponentRegistry is the default ComponentRegistry which gets components registered to github.com/grafana/agent/component.

func (DefaultComponentRegistry) Get added in v0.36.0

Get retrieves a component using component.Get.

type DialFunc added in v0.34.0

type DialFunc func(ctx context.Context, network, address string) (net.Conn, error)

DialFunc is a function to establish a network connection.

type ExportConfigNode added in v0.33.0

type ExportConfigNode struct {
	// contains filtered or unexported fields
}

func NewExportConfigNode added in v0.33.0

func NewExportConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ExportConfigNode

NewExportConfigNode creates a new ExportConfigNode from an initial ast.BlockStmt. The underlying config isn't applied until Evaluate is called.

func (*ExportConfigNode) Block added in v0.33.0

func (cn *ExportConfigNode) Block() *ast.BlockStmt

Block implements BlockNode and returns the current block of the managed config node.

func (*ExportConfigNode) Evaluate added in v0.33.0

func (cn *ExportConfigNode) Evaluate(scope *vm.Scope) error

Evaluate implements BlockNode and updates the arguments for the managed config block by re-evaluating its River block with the provided scope. The managed config block will be built the first time Evaluate is called.

Evaluate will return an error if the River block cannot be evaluated or if decoding to arguments fails.

func (*ExportConfigNode) Label added in v0.34.0

func (cn *ExportConfigNode) Label() string

func (*ExportConfigNode) NodeID added in v0.33.0

func (cn *ExportConfigNode) NodeID() string

NodeID implements dag.Node and returns the unique ID for the config node.

func (*ExportConfigNode) Value added in v0.34.0

func (cn *ExportConfigNode) Value() any

Value returns the value of the export.

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

The Loader builds and evaluates ComponentNodes from River blocks.

func NewLoader

func NewLoader(opts LoaderOptions) *Loader

NewLoader creates a new Loader. Components built by the Loader will be built with co for their options.

func (*Loader) Apply

func (l *Loader) Apply(args map[string]any, componentBlocks []*ast.BlockStmt, configBlocks []*ast.BlockStmt) diag.Diagnostics

Apply loads a new set of components into the Loader. Apply will drop any previously loaded component which is not described in the set of River blocks.

Apply will reuse existing components if there is an existing component which matches the component ID specified by any of the provided River blocks. Reused components will be updated to point at the new River block.

Apply will perform an evaluation of all loaded components before returning. The provided parentContext can be used to provide global variables and functions to components. A child context will be constructed from the parent to expose values of other components.

func (*Loader) Cleanup added in v0.35.0

func (l *Loader) Cleanup(stopWorkerPool bool)

Cleanup unregisters any existing metrics and optionally stops the worker pool.

func (*Loader) Components

func (l *Loader) Components() []*ComponentNode

Components returns the current set of loaded components.

func (*Loader) EvaluateDependants added in v0.39.0

func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*ComponentNode)

EvaluateDependants sends components which depend directly on components in updatedNodes for evaluation to the workerPool. It should be called whenever components update their exports. It is beneficial to call EvaluateDependants with a batch of components, as it will enqueue the entire batch before the worker pool starts to evaluate them, resulting in smaller number of total evaluations when node updates are frequent. If the worker pool's queue is full, EvaluateDependants will retry with a backoff until it succeeds or until the ctx is cancelled.

func (*Loader) Graph

func (l *Loader) Graph() *dag.Graph

Graph returns a copy of the DAG managed by the Loader.

func (*Loader) OriginalGraph added in v0.28.0

func (l *Loader) OriginalGraph() *dag.Graph

OriginalGraph returns a copy of the graph before Reduce was called. This can be used if you want to show a UI of the original graph before the reduce function was called.

func (*Loader) Services added in v0.36.0

func (l *Loader) Services() []*ServiceNode

Services returns the current set of service nodes.

func (*Loader) Variables added in v0.27.0

func (l *Loader) Variables() map[string]interface{}

Variables returns the Variables the Loader exposes for other Flow components to reference.

type LoaderOptions added in v0.36.0

type LoaderOptions struct {
	// ComponentGlobals contains data to use when creating components.
	ComponentGlobals ComponentGlobals

	Services          []service.Service // Services to load into the DAG.
	Host              service.Host      // Service host (when running services).
	ComponentRegistry ComponentRegistry // Registry to search for components.
	WorkerPool        worker.Pool       // Worker pool to use for async tasks.
}

LoaderOptions holds options for creating a Loader.

type LoggingConfigNode added in v0.33.0

type LoggingConfigNode struct {
	// contains filtered or unexported fields
}

func NewDefaultLoggingConfigNode added in v0.33.0

func NewDefaultLoggingConfigNode(globals ComponentGlobals) *LoggingConfigNode

NewDefaultLoggingConfigNode creates a new LoggingConfigNode with nil block and eval. This will force evaluate to use the default logging options for this node.

func NewLoggingConfigNode added in v0.33.0

func NewLoggingConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *LoggingConfigNode

NewLoggingConfigNode creates a new LoggingConfigNode from an initial ast.BlockStmt. The underlying config isn't applied until Evaluate is called.

func (*LoggingConfigNode) Block added in v0.33.0

func (cn *LoggingConfigNode) Block() *ast.BlockStmt

Block implements BlockNode and returns the current block of the managed config node.

func (*LoggingConfigNode) Evaluate added in v0.33.0

func (cn *LoggingConfigNode) Evaluate(scope *vm.Scope) error

Evaluate implements BlockNode and updates the arguments for the managed config block by re-evaluating its River block with the provided scope. The managed config block will be built the first time Evaluate is called.

Evaluate will return an error if the River block cannot be evaluated or if decoding to arguments fails.

func (*LoggingConfigNode) NodeID added in v0.33.0

func (cn *LoggingConfigNode) NodeID() string

NodeID implements dag.Node and returns the unique ID for the config node.

type ModuleController added in v0.35.0

type ModuleController interface {
	component.ModuleController

	// ModuleIDs returns the list of managed modules in unspecified order.
	ModuleIDs() []string
}

ModuleController is a lower-level interface for module controllers which allows probing for the list of managed modules.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a thread-safe, insertion-ordered set of components.

Queue is intended for tracking components that have updated their Exports for later reevaluation.

func NewQueue

func NewQueue() *Queue

NewQueue returns a new queue.

func (*Queue) Chan

func (q *Queue) Chan() <-chan struct{}

Chan returns a channel which is written to when the queue is non-empty.

func (*Queue) DequeueAll added in v0.37.0

func (q *Queue) DequeueAll() []*ComponentNode

DequeueAll removes all components from the queue and returns them.

func (*Queue) Enqueue

func (q *Queue) Enqueue(c *ComponentNode)

Enqueue inserts a new component into the Queue. Enqueue is a no-op if the component is already in the Queue.

type Reference

type Reference struct {
	Target BlockNode // BlockNode being referenced

	// Traversal describes which nested field relative to Target is being
	// accessed.
	Traversal Traversal
}

Reference describes an River expression reference to a BlockNode.

func ComponentReferences

func ComponentReferences(cn dag.Node, g *dag.Graph) ([]Reference, diag.Diagnostics)

ComponentReferences returns the list of references a component is making to other components.

type RegistryMap added in v0.36.0

type RegistryMap map[string]component.Registration

RegistryMap is a map which implements ComponentRegistry.

func (RegistryMap) Get added in v0.36.0

func (m RegistryMap) Get(name string) (component.Registration, bool)

Get retrieves a component using component.Get.

type RunnableNode

type RunnableNode interface {
	NodeID() string
	Run(ctx context.Context) error
}

RunnableNode is any dag.Node which can also be run.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler runs components.

func NewScheduler

func NewScheduler() *Scheduler

NewScheduler creates a new Scheduler. Call Synchronize to manage the set of components which are running.

Call Close to stop the Scheduler and all running components.

func (*Scheduler) Close

func (s *Scheduler) Close() error

Close stops the Scheduler and returns after all running goroutines have exited.

func (*Scheduler) Synchronize

func (s *Scheduler) Synchronize(rr []RunnableNode) error

Synchronize synchronizes the running components to those defined by rr.

New RunnableNodes will be launched as new goroutines. RunnableNodes already managed by Scheduler will be kept running, while running RunnableNodes that are not in rr will be shut down and removed.

Existing components will be restarted if they stopped since the previous call to Synchronize.

type ServiceMap added in v0.36.0

type ServiceMap map[string]service.Service

ServiceMap is a map of service name to services.

func NewServiceMap added in v0.36.0

func NewServiceMap(services []service.Service) ServiceMap

NewServiceMap creates a ServiceMap from a slice of services.

func (ServiceMap) FilterByName added in v0.36.0

func (sm ServiceMap) FilterByName(keepNames []string) ServiceMap

FilterByName creates a new ServiceMap where services that are not defined in keepNames are removed.

func (ServiceMap) Get added in v0.36.0

func (sm ServiceMap) Get(name string) (svc service.Service, found bool)

Get looks up a service by name.

func (ServiceMap) List added in v0.36.0

func (sm ServiceMap) List() []service.Service

List returns a slice of all the services.

type ServiceNode added in v0.36.0

type ServiceNode struct {
	// contains filtered or unexported fields
}

ServiceNode is a Flow DAG node which represents a running service.

func NewServiceNode added in v0.36.0

func NewServiceNode(host service.Host, svc service.Service) *ServiceNode

NewServiceNode creates a new instance of a ServiceNode from an instance of a Service. The provided host is used when running the service.

func (*ServiceNode) Block added in v0.36.0

func (sn *ServiceNode) Block() *ast.BlockStmt

Block implements BlockNode. It returns nil, since ServiceNodes don't have associated configs.

func (*ServiceNode) Definition added in v0.36.0

func (sn *ServiceNode) Definition() service.Definition

Definition returns the service definition associated with the node.

func (*ServiceNode) Evaluate added in v0.36.0

func (sn *ServiceNode) Evaluate(scope *vm.Scope) error

Evaluate implements BlockNode, evaluating the configuration for a service. Evalute returns an error if the service doesn't support being configured and the ServiceNode has an associated block from a call to UpdateBlock.

func (*ServiceNode) NodeID added in v0.36.0

func (sn *ServiceNode) NodeID() string

NodeID returns the ID of the ServiceNode, which is equal to the service's name.

func (*ServiceNode) Run added in v0.36.0

func (sn *ServiceNode) Run(ctx context.Context) error

func (*ServiceNode) Service added in v0.36.0

func (sn *ServiceNode) Service() service.Service

Service returns the service instance associated with the node.

func (*ServiceNode) UpdateBlock added in v0.36.0

func (sn *ServiceNode) UpdateBlock(b *ast.BlockStmt)

UpdateBlock updates the River block used to construct arguments for the service. The new block isn't used until the next time Evaluate is called.

UpdateBlock will panic if the block does not match the ID of the ServiceNode.

Call UpdateBlock with a nil block to remove the block associated with the ServiceNode.

type TracingConfigNode added in v0.33.0

type TracingConfigNode struct {
	// contains filtered or unexported fields
}

func NewDefaulTracingConfigNode added in v0.33.0

func NewDefaulTracingConfigNode(globals ComponentGlobals) *TracingConfigNode

NewDefaulTracingConfigNode creates a new TracingConfigNode with nil block and eval. This will force evaluate to use the default tracing options for this node.

func NewTracingConfigNode added in v0.33.0

func NewTracingConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *TracingConfigNode

NewTracingConfigNode creates a new TracingConfigNode from an initial ast.BlockStmt. The underlying config isn't applied until Evaluate is called.

func (*TracingConfigNode) Block added in v0.33.0

func (cn *TracingConfigNode) Block() *ast.BlockStmt

Block implements BlockNode and returns the current block of the managed config node.

func (*TracingConfigNode) Evaluate added in v0.33.0

func (cn *TracingConfigNode) Evaluate(scope *vm.Scope) error

Evaluate implements BlockNode and updates the arguments for the managed config block by re-evaluating its River block with the provided scope. The managed config block will be built the first time Evaluate is called.

Evaluate will return an error if the River block cannot be evaluated or if decoding to arguments fails.

func (*TracingConfigNode) NodeID added in v0.33.0

func (cn *TracingConfigNode) NodeID() string

NodeID implements dag.Node and returns the unique ID for the config node.

type Traversal added in v0.27.0

type Traversal []*ast.Ident

Traversal describes accessing a sequence of fields relative to a component. Traversal only include uninterrupted sequences of field accessors; for an expression "component.field_a.field_b.field_c[0].inner_field", the Traversal will be (field_a, field_b, field_c).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL