Documentation ¶
Overview ¶
Package component describes the interfaces which Flow components implement.
A Flow component is a distinct piece of business logic that accepts inputs (Arguments) for its configuration and can optionally export a set of outputs (Exports).
Arguments and Exports do not need to be static for the lifetime of a component. A component will be given a new Config if the runtime configuration changes. A component may also update its Exports throughout its lifetime, such as a component which outputs the current day of the week.
Components are built by users with River, where they can use River expressions to refer to any input or exported field from other components. This allows users to connect components together to declaratively form a pipeline.
Defining Arguments and Exports structs ¶
Arguments and Exports implemented by new components must be able to be encoded to and from River. "river" struct field tags are used for encoding; refer to the package documentation at pkg/river for a description of how to write these tags.
The set of River element names of a given component's Arguments and Exports types must not overlap. Additionally, the following River field and block names are reserved for use by the Flow controller:
- for_each
- enabled
- health
- debug
Default values for Arguments may be provided by implementing river.Unmarshaler.
Arguments and Exports immutability ¶
Arguments passed to a component should be treated as immutable, as memory can be shared between components as an optimization. Components should make copies for fields they need to modify. An exception to this is for fields which are expected to be mutable (e.g., interfaces which expose a goroutine-safe API).
Similarly, Exports and the fields within Exports must be considered immutable after they are written for the same reason.
Mapping River strings to custom types ¶
Custom encoding and decoding of fields is available by implementing encoding.TextMarshaler and encoding.TextUnmarshaler. Types implementing these interfaces will be represented as strings in River.
Component registration ¶
Components are registered globally by calling Register. These components are then made available by including them in the import path. The "all" child package imports all known component packages and should be updated when creating a new one.
Index ¶
- Variables
- func AllNames() []string
- func Register(r Registration)
- type Arguments
- type Component
- type DebugComponent
- type ExportFunc
- type Exports
- type Health
- type HealthComponent
- type HealthType
- type ID
- type Info
- type InfoOptions
- type Module
- type ModuleController
- type Options
- type Provider
- type Registration
Constants ¶
This section is empty.
Variables ¶
var ( // ErrComponentNotFound is returned by [Provider.GetComponent] when the // specified component isn't found. ErrComponentNotFound = errors.New("component not found") // ErrModuleNotFound is returned by [Provider.ListComponents] when the // specified module isn't found. ErrModuleNotFound = errors.New("module not found") )
Functions ¶
func Register ¶
func Register(r Registration)
Register registers a component. Register will panic if the name is in use by another component, if the name is invalid, or if the component name has a suffix length mismatch with an existing component.
Types ¶
type Arguments ¶
type Arguments interface{}
The Arguments contains the input fields for a specific component, which is unmarshaled from River.
Refer to the package documentation for details around how to build proper Arguments implementations.
type Component ¶
type Component interface { // Run starts the component, blocking until ctx is canceled or the component // suffers a fatal error. Run is guaranteed to be called exactly once per // Component. // // Implementations of Component should perform any necessary cleanup before // returning from Run. Run(ctx context.Context) error // Update provides a new Config to the component. The type of newConfig will // always match the struct type which the component registers. // // Update will be called concurrently with Run. The component must be able to // gracefully handle updating its config while still running. // // An error may be returned if the provided config is invalid. Update(args Arguments) error }
Component is the base interface for a Flow component. Components may implement extension interfaces (named <Extension>Component) to implement extra known behavior.
type DebugComponent ¶
type DebugComponent interface { Component // DebugInfo returns the current debug information of the component. May // return nil if there is no debug info to currently report. The result of // DebugInfo must be encodable to River like Arguments and Exports. // // Values from DebugInfo are not exposed to other components for use in // expressions. // // DebugInfo must be safe for calling concurrently. DebugInfo() interface{} }
DebugComponent is an extension interface for components which can report debugging information upon request.
type ExportFunc ¶ added in v0.35.0
ExportFunc is used for onExport of the Module
type Exports ¶
type Exports interface{}
Exports contains the current set of outputs for a specific component, which is then marshaled to River.
Refer to the package documentation for details around how to build proper Exports implementations.
type Health ¶
type Health struct { // The specific health value. Health HealthType `river:"state,attr"` // An optional message to describe the health; useful to say why a component // is unhealthy. Message string `river:"message,attr,optional"` // An optional time to indicate when the component last modified something // which updated its health. UpdateTime time.Time `river:"update_time,attr,optional"` }
Health is the reported health state of a component. It can be encoded to River.
func LeastHealthy ¶ added in v0.33.0
LeastHealthy returns the Health from the provided arguments which is considered to be the least healthy.
Health types are first prioritized by HealthTypeExited, followed by HealthTypeUnhealthy, HealthTypeUnknown, and HealthTypeHealthy.
If multiple arguments have the same Health type, the Health with the most recent timestamp is returned.
Finally, if multiple arguments have the same Health type and the same timestamp, the earlier argument is chosen.
type HealthComponent ¶
type HealthComponent interface { Component // CurrentHealth returns the current Health status for the component. // // CurrentHealth may be overridden by the Flow controller if there is a // higher-level issue, such as a config file being invalid or a Component // shutting down unexpectedly. CurrentHealth() Health }
HealthComponent is an optional extension interface for Components which report health information.
Health information is exposed to the end user for informational purposes and cannot be referened in a River expression.
type HealthType ¶
type HealthType uint8
HealthType holds the health value for a component.
const ( // HealthTypeUnknown is the initial health of components, set when they're // first created. HealthTypeUnknown HealthType = iota // HealthTypeHealthy represents a component which is working as expected. HealthTypeHealthy // HealthTypeUnhealthy represents a component which is not working as // expected. HealthTypeUnhealthy // HealthTypeExited represents a component which has stopped running. HealthTypeExited )
func (HealthType) MarshalText ¶
func (ht HealthType) MarshalText() (text []byte, err error)
MarshalText implements encoding.TextMarshaler.
func (HealthType) String ¶
func (ht HealthType) String() string
String returns the string representation of ht.
func (*HealthType) UnmarshalText ¶
func (ht *HealthType) UnmarshalText(text []byte) error
UnmarshalText implements encoding.TextUnmarshaler.
type ID ¶ added in v0.35.0
type ID struct { ModuleID string // Unique ID of the module that the component is running in. LocalID string // Local ID of the component, unique to the module it is running in. }
ID is a globally unique identifier for a component.
type Info ¶ added in v0.35.0
type Info struct { // Component is the instance of the component. Component may be nil if a // component exists in the controller's DAG but it has not been successfully // evaluated yet. Component Component // ModuleIDs includes the list of current module IDs that the component is // running. Module IDs are always globally unique. // // The sort order of the list is not guaranteed. ModuleIDs []string ID ID // ID of the component. Label string // Component label. Not set for singleton components. // References and ReferencedBy are the list of IDs in the same module that // this component depends on, or is depended on by, respectively. References, ReferencedBy []string ComponentName string // Name of the component. Health Health // Current component health. Arguments Arguments // Current arguments value of the component. Exports Exports // Current exports value of the component. DebugInfo interface{} // Current debug info of the component. }
Info ia detailed information about a component.
func GetAllComponents ¶ added in v0.35.0
func GetAllComponents(p Provider, opts InfoOptions) []*Info
GetAllComponents enumerates over all of the modules in p and returns the set of all components.
func (*Info) MarshalJSON ¶ added in v0.35.0
MarshalJSON returns a JSON representation of cd. The format of the representation is not stable and is subject to change.
type InfoOptions ¶ added in v0.35.0
type InfoOptions struct { GetHealth bool // When true, sets the Health field of returned components. GetArguments bool // When true, sets the Arguments field of returned components. GetExports bool // When true, sets the Exports field of returned components. GetDebugInfo bool // When true, sets the DebugInfo field of returned components. }
InfoOptions is used by to determine how much information to return with Info.
type Module ¶ added in v0.35.0
type Module interface { // LoadConfig parses River config and loads it into the Module. // LoadConfig can be called multiple times, and called prior to // [Module.Run]. LoadConfig(config []byte, args map[string]any) error // Run starts the Module. No components within the Module // will be run until Run is called. // // Run blocks until the provided context is canceled. The ID of a module as defined in // ModuleController.NewModule will not be released until Run returns. Run(context.Context) error }
Module is a controller for running components within a Module.
type ModuleController ¶ added in v0.35.0
type ModuleController interface { // NewModule creates a new, un-started Module with a given ID. Multiple calls to // NewModule must provide unique values for id. The empty string is a valid unique // value for id. // // If id is non-empty, it must be a valid River identifier, matching the // regex /[A-Za-z_][A-Za-z0-9_]/. NewModule(id string, export ExportFunc) (Module, error) }
ModuleController is a mechanism responsible for allowing components to create other components via modules.
type Options ¶
type Options struct { // ModuleController allows for the creation of modules. ModuleController ModuleController // ID of the component. Guaranteed to be globally unique across all running // components. ID string // Logger the component may use for logging. Logs emitted with the logger // always include the component ID as a field. Logger log.Logger // A path to a directory with this component may use for storage. The path is // guaranteed to be unique across all running components. // // The directory may not exist when the component is created; components // should create the directory if needed. DataPath string // OnStateChange may be invoked at any time by a component whose Export value // changes. The Flow controller then will queue re-processing components // which depend on the changed component. // // OnStateChange will panic if e does not match the Exports type registered // by the component; a component must use the same Exports type for its // lifetime. OnStateChange func(e Exports) // Registerer allows components to add their own metrics. The registerer will // come pre-wrapped with the component ID. It is not necessary for components // to unregister metrics on shutdown. Registerer prometheus.Registerer // Tracer allows components to record spans. The tracer will include an // attribute denoting the component ID. Tracer trace.TracerProvider // GetServiceData retrieves data for a service by calling // [service.Service.Data] for the specified service. // // GetServiceData will return an error if the service does not exist. // // The result of GetServiceData may be cached as the value will not change at // runtime. GetServiceData func(name string) (interface{}, error) }
Options are provided to a component when it is being constructed. Options are static for the lifetime of a component.
type Provider ¶ added in v0.35.0
type Provider interface { // GetComponent returns information about an individual running component // given its global ID. The provided opts field configures how much detail to // return; see [InfoOptions] for more information. // // GetComponent returns ErrComponentNotFound if a component is not found. GetComponent(id ID, opts InfoOptions) (*Info, error) // ListComponents returns the list of active components. The provided opts // field configures how much detail to return; see [InfoOptions] for more // information. // // Returns ErrModuleNotFound if the provided moduleID doesn't exist. ListComponents(moduleID string, opts InfoOptions) ([]*Info, error) }
A Provider is a system which exposes a list of running components.
type Registration ¶
type Registration struct { // Name of the component. Must be a list of period-delimited valid // identifiers, such as "remote.s3". Components sharing a prefix must have // the same number of identifiers; it is valid to register "remote.s3" and // "remote.http" but not "remote". // // Components may not have more than 2 identifiers. // // Each identifier must start with a valid ASCII letter, and be followed by // any number of underscores or alphanumeric ASCII characters. Name string // An example Arguments value that the registered component expects to // receive as input. Components should provide the zero value of their // Arguments type here. Args Arguments // An example Exports value that the registered component may emit as output. // A component which does not expose exports must leave this set to nil. Exports Exports // Build should construct a new component from an initial Arguments and set // of options. Build func(opts Options, args Arguments) (Component, error) }
Registration describes a single component.
func (Registration) CloneArguments ¶
func (r Registration) CloneArguments() Arguments
CloneArguments returns a new zero value of the registered Arguments type.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package all imports all known component packages.
|
Package all imports all known component packages. |
common
|
|
config
Package config contains types from github.com/prometheus/common/config, but modifies them to be serializable with River.
|
Package config contains types from github.com/prometheus/common/config, but modifies them to be serializable with River. |
net
Package http contains a River serializable definition of the dskit config in https://github.com/grafana/dskit/blob/main/server/server.go#L72.
|
Package http contains a River serializable definition of the dskit config in https://github.com/grafana/dskit/blob/main/server/server.go#L72. |
docker
Package docker implements the discovery.docker component.
|
Package docker implements the discovery.docker component. |
gce
Package gce implements the discovery.gce component.
|
Package gce implements the discovery.gce component. |
kubelet
Package kubelet implements a discovery.kubelet component.
|
Package kubelet implements a discovery.kubelet component. |
kubernetes
Package kubernetes implements a discovery.kubernetes component.
|
Package kubernetes implements a discovery.kubernetes component. |
faro
|
|
local
|
|
loki
|
|
process
The code in this package is adapted/ported over from grafana/loki/clients/pkg/logentry.
|
The code in this package is adapted/ported over from grafana/loki/clients/pkg/logentry. |
source/kubernetes
Package kubernetes implements the loki.source.kubernetes component.
|
Package kubernetes implements the loki.source.kubernetes component. |
source/kubernetes/kubetail
Package kubetail implements a log file tailer using the Kubernetes API.
|
Package kubetail implements a log file tailer using the Kubernetes API. |
source/kubernetes_events
Package kubernetes_events implements the loki.source.kubernetes_events component.
|
Package kubernetes_events implements the loki.source.kubernetes_events component. |
mimir
|
|
git
Package git implements the module.git component.
|
Package git implements the module.git component. |
auth
Package auth provides utilities to create a Flow component from OpenTelemetry Collector authentication extensions.
|
Package auth provides utilities to create a Flow component from OpenTelemetry Collector authentication extensions. |
auth/basic
Package basic provides an otelcol.auth.basic component.
|
Package basic provides an otelcol.auth.basic component. |
auth/bearer
Package bearer provides an otelcol.auth.bearer component.
|
Package bearer provides an otelcol.auth.bearer component. |
auth/headers
Package headers provides an otelcol.auth.headers component.
|
Package headers provides an otelcol.auth.headers component. |
connector
Package connector exposes utilities to create a Flow component from OpenTelemetry Collector connectors.
|
Package connector exposes utilities to create a Flow component from OpenTelemetry Collector connectors. |
connector/host_info
Package host_info provides an otelcol.connector.host_info component.
|
Package host_info provides an otelcol.connector.host_info component. |
connector/spanlogs
Package spanlogs provides an otelcol.connector.spanlogs component.
|
Package spanlogs provides an otelcol.connector.spanlogs component. |
connector/spanmetrics
Package spanmetrics provides an otelcol.connector.spanmetrics component.
|
Package spanmetrics provides an otelcol.connector.spanmetrics component. |
exporter
Package exporter exposes utilities to create a Flow component from OpenTelemetry Collector exporters.
|
Package exporter exposes utilities to create a Flow component from OpenTelemetry Collector exporters. |
exporter/loadbalancing
Package loadbalancing provides an otelcol.exporter.loadbalancing component.
|
Package loadbalancing provides an otelcol.exporter.loadbalancing component. |
exporter/logging
Package logging provides an otelcol.exporter.logging component.
|
Package logging provides an otelcol.exporter.logging component. |
exporter/loki
Package loki provides an otelcol.exporter.loki component.
|
Package loki provides an otelcol.exporter.loki component. |
exporter/loki/internal/convert
Package convert implements conversion utilities to convert between OpenTelemetry Collector and Loki data.
|
Package convert implements conversion utilities to convert between OpenTelemetry Collector and Loki data. |
exporter/otlp
Package otlp provides an otelcol.exporter.otlp component.
|
Package otlp provides an otelcol.exporter.otlp component. |
exporter/otlphttp
Package otlphttp provides an otelcol.exporter.otlphttp component.
|
Package otlphttp provides an otelcol.exporter.otlphttp component. |
exporter/prometheus
Package prometheus provides an otelcol.exporter.prometheus component.
|
Package prometheus provides an otelcol.exporter.prometheus component. |
exporter/prometheus/internal/convert
Package convert implements conversion utilities to convert between OpenTelemetry Collector data and Prometheus data.
|
Package convert implements conversion utilities to convert between OpenTelemetry Collector data and Prometheus data. |
extension
Package extension provides utilities to create a Flow component from OpenTelemetry Collector extensions.
|
Package extension provides utilities to create a Flow component from OpenTelemetry Collector extensions. |
internal/lazyconsumer
Package lazyconsumer implements a lazy OpenTelemetry Collector consumer which can lazily forward request to another consumer implementation.
|
Package lazyconsumer implements a lazy OpenTelemetry Collector consumer which can lazily forward request to another consumer implementation. |
internal/scheduler
Package scheduler exposes utilities for scheduling and running OpenTelemetry Collector components.
|
Package scheduler exposes utilities for scheduling and running OpenTelemetry Collector components. |
processor
Package processor exposes utilities to create a Flow component from OpenTelemetry Collector processors.
|
Package processor exposes utilities to create a Flow component from OpenTelemetry Collector processors. |
processor/attributes
Package attributes provides an otelcol.processor.attributes component.
|
Package attributes provides an otelcol.processor.attributes component. |
processor/batch
Package batch provides an otelcol.processor.batch component.
|
Package batch provides an otelcol.processor.batch component. |
processor/discovery
Package discovery provides an otelcol.processor.discovery component.
|
Package discovery provides an otelcol.processor.discovery component. |
processor/k8sattributes
Package attributes provides an otelcol.processor.k8sattributes component.
|
Package attributes provides an otelcol.processor.k8sattributes component. |
processor/memorylimiter
Package memorylimiter provides an otelcol.processor.memory_limiter component.
|
Package memorylimiter provides an otelcol.processor.memory_limiter component. |
processor/probabilistic_sampler
Package probabilistic_sampler provides an otelcol.processor.probabilistic_sampler component.
|
Package probabilistic_sampler provides an otelcol.processor.probabilistic_sampler component. |
processor/span
Package span provides an otelcol.processor.span component.
|
Package span provides an otelcol.processor.span component. |
processor/tail_sampling
Package tail_sampling provides an otelcol.processor.tail_sampling component.
|
Package tail_sampling provides an otelcol.processor.tail_sampling component. |
processor/transform
Package transform provides an otelcol.processor.transform component.
|
Package transform provides an otelcol.processor.transform component. |
receiver
Package receiver utilities to create a Flow component from OpenTelemetry Collector receivers.
|
Package receiver utilities to create a Flow component from OpenTelemetry Collector receivers. |
receiver/jaeger
Package jaeger provides an otelcol.receiver.jaeger component.
|
Package jaeger provides an otelcol.receiver.jaeger component. |
receiver/kafka
Package kafka provides an otelcol.receiver.kafka component.
|
Package kafka provides an otelcol.receiver.kafka component. |
receiver/loki
Package loki provides an otelcol.receiver.loki component.
|
Package loki provides an otelcol.receiver.loki component. |
receiver/opencensus
Package opencensus provides an otelcol.receiver.opencensus component.
|
Package opencensus provides an otelcol.receiver.opencensus component. |
receiver/otlp
Package otlp provides an otelcol.receiver.otlp component.
|
Package otlp provides an otelcol.receiver.otlp component. |
receiver/prometheus
Package prometheus provides an otelcol.receiver.prometheus component.
|
Package prometheus provides an otelcol.receiver.prometheus component. |
receiver/prometheus/internal
Package internal is a near copy of https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.87.0/receiver/prometheusreceiver/internal A copy was made because the upstream package is internal.
|
Package internal is a near copy of https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.87.0/receiver/prometheusreceiver/internal A copy was made because the upstream package is internal. |
receiver/vcenter
Package vcenter provides an otelcol.receiver.vcenter component.
|
Package vcenter provides an otelcol.receiver.vcenter component. |
receiver/zipkin
Package zipkin provides an otelcol.receiver.zipkin component.
|
Package zipkin provides an otelcol.receiver.zipkin component. |
scrape/internal/fastdelta
Package fastdelta tries to match up samples between two pprof profiles and take their difference.
|
Package fastdelta tries to match up samples between two pprof profiles and take their difference. |
scrape/internal/pproflite
Package pproflite implements zero-allocation pprof encoding and decoding.
|
Package pproflite implements zero-allocation pprof encoding and decoding. |
remote
|
|
http
Package http implements the remote.http component.
|
Package http implements the remote.http component. |
kubernetes
Package kubernetes implements the logic for remote.kubernetes.secret and remote.kubernetes.configmap component.
|
Package kubernetes implements the logic for remote.kubernetes.secret and remote.kubernetes.configmap component. |