receivercreator

package module
v0.118.0
Published: Jan 21, 2025 License: Apache-2.0 Imports: 28 Imported by: 7

README

Receiver Creator

Status
Stability: alpha (logs, traces), beta (metrics)
Distributions: contrib, k8s
Code Owners: @dmitryax
Emeritus: @rmfitzpatrick

This receiver can instantiate other receivers at runtime based on whether observed endpoints match a configured rule. To use the receiver creator, you must first configure one or more observers that will discover networked endpoints that you may be interested in. The configured rules will be evaluated for each endpoint discovered. If the rule evaluates to true then the receiver for that rule will be started against the matched endpoint.

If you use the receiver creator in multiple pipelines of differing telemetry types, but a dynamically instantiated receiver doesn't support one of those pipeline types, the unsupported combination results in a logged no-op rather than a collector service failure.
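
To make this concrete, here is a minimal sketch that assumes a k8s_observer extension and the debug exporter, both of which appear in the examples later in this README. The redis receiver only produces metrics, so wiring the same receiver creator into a logs pipeline is expected to produce the logged no-op described above on the logs side, not a startup failure.

```yaml
extensions:
  k8s_observer:

receivers:
  receiver_creator:
    watch_observers: [k8s_observer]
    receivers:
      redis:
        rule: type == "port" && port == 6379

exporters:
  debug:

service:
  extensions: [k8s_observer]
  pipelines:
    metrics:
      # redis is started here for each matching endpoint.
      receivers: [receiver_creator]
      exporters: [debug]
    logs:
      # redis has no logs support, so nothing is started for this pipeline.
      receivers: [receiver_creator]
      exporters: [debug]
```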

Configuration

watch_observers

A list of observers previously defined to be run in the extensions section. receiver_creator will watch for endpoints generated by these observers.

receivers

A map of receiver names (e.g. redis/1) to a template for when and how to instantiate that receiver.

receivers.<receiver_type/id>.rule

Rule expression using expr syntax. The available variables are detailed below in Rule Expressions.

Note: The built-in type function introduced in v1.14.1 has been relocated to typeOf.

receivers.<receiver_type/id>.config

This is configuration that will be used when creating the receiver at runtime.

This option can use static and dynamic configuration values. Static values are normal YAML values. However, the value can also be dynamically constructed from the discovered endpoint object. Dynamic values are surrounded by backticks (`). If a literal backtick is needed use \` to escape it. Dynamic values can be used with static values in which case they are concatenated. For example:

config:
   secure_url: https://`pod.labels["secure_host"]`

The value of secure_url will be https:// concatenated with the value of the secure_host label.

This can also be used when the discovered endpoint needs to be changed dynamically. For instance, suppose the IP 1.2.3.4 is discovered without a port but the port needs to be set inside endpoint. You could do:

config:
   endpoint: '`endpoint`:8080'

If your target receiver provides an endpoint config field and you aren't manually setting it like the above example, the observer endpoint target value will automatically be sourced. If no endpoint field is available you are required to specify any necessary fields.
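
As a hedged illustration of the two endpoint behaviours described above, the sketch below contrasts a receiver whose endpoint is sourced automatically with one that sets it explicitly. The receiver IDs redis/auto and redis/explicit and the app label value are made up for this example. It also assumes that a pod endpoint is discovered as a bare IP while a port endpoint already includes the port.

```yaml
receivers:
  receiver_creator:
    watch_observers: [k8s_observer]
    receivers:
      redis/auto:
        # No endpoint is set here, so the discovered target of the
        # matched port endpoint is used as the receiver's endpoint.
        rule: type == "port" && port == 6379
        config:
          collection_interval: 10s
      redis/explicit:
        # The pod endpoint has no port, so one is appended to the
        # dynamic `endpoint` value.
        rule: type == "pod" && labels["app"] == "redis"
        config:
          endpoint: '`endpoint`:6379'
```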

resource_attributes

resource_attributes:
  <endpoint type>:
    <attribute>: <attribute value>

This setting controls what resource attributes are set on telemetry emitted from the created receiver. These attributes can be set from values in the endpoint that was matched by the rule. These attributes vary based on the endpoint type. These defaults can be disabled by setting the attribute to be removed to an empty value. Note that the values can be dynamic and processed the same as in config.
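
For instance, a minimal sketch of this setting for pod endpoints might look like the following. The service_name label is an assumption borrowed from the examples later in this README. The empty value illustrates how a default attribute can be disabled.

```yaml
receivers:
  receiver_creator:
    watch_observers: [k8s_observer]
    resource_attributes:
      pod:
        # An empty value disables the default k8s.pod.uid attribute.
        k8s.pod.uid: ""
        # Dynamic value taken from the matched endpoint, as in config.
        service.name: '`labels["service_name"]`'
```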

Note that the backticks below are not typos--they indicate the value is set dynamically.

type == "pod"

Resource Attribute Default
k8s.pod.name `name`
k8s.pod.uid `uid`
k8s.namespace.name `namespace`

type == "port"

Resource Attribute Default
k8s.pod.name `pod.name`
k8s.pod.uid `pod.uid`
k8s.namespace.name `pod.namespace`

type == "pod.container"

Resource Attribute Default
k8s.pod.name `pod.name`
k8s.pod.uid `pod.uid`
k8s.namespace.name `pod.namespace`
container.name `name`
k8s.container.name `container_name`
container.image.name `container_image`
container.id `container_id`

type == "container"

Resource Attribute Default
container.name `name`
container.image.name `image`

type == "hostport"

None

type == "k8s.service"

Resource Attribute Default
k8s.namespace.name `namespace`

type == "k8s.node"

Resource Attribute Default
k8s.node.name `name`
k8s.node.uid `uid`

type == "k8s.ingress"

Resource Attribute Default
k8s.namespace.name `namespace`

See redis/2 in examples.

receivers.<receiver_type/id>.resource_attributes

receivers:
  <receiver_type>:
    resource_attributes:
      <attribute>: <attribute string value>

Similar to the per-endpoint type resource_attributes described above but for individual receiver instances. Duplicate attribute entries (including the empty string) in this receiver-specific mapping take precedence. These attribute values also support expansion from endpoint environment content. At this time their values must be strings.
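
The precedence described above can be sketched as follows. The label names service_name and app_version are illustrative assumptions. Both layers set service.name, and the receiver-specific entry is the one expected to win for telemetry from that receiver.

```yaml
receivers:
  receiver_creator:
    watch_observers: [k8s_observer]
    resource_attributes:
      port:
        service.name: '`pod.labels["service_name"]`'
    receivers:
      redis:
        rule: type == "port" && port == 6379
        resource_attributes:
          # Same key as the port-level entry above; this one takes precedence.
          service.name: redis
          # Expanded from the endpoint; label values are already strings.
          app.version: '`pod.labels["app_version"]`'
```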

Rule Expressions

Each rule must start with type == ("pod"|"port"|"pod.container"|"hostport"|"container"|"k8s.service"|"k8s.node"|"k8s.ingress") && such that the rule matches only one endpoint type. Depending on the type of endpoint the rule is targeting it will have different variables available.

Pod
Variable Description Data Type
type "pod" String
id ID of source endpoint String
name name of the pod String
namespace namespace of the pod String
uid unique id of the pod String
labels map of labels set on the pod Map with String key and value
annotations map of annotations set on the pod Map with String key and value
Port
Variable Description Data Type
type "port" String
id ID of source endpoint String
name container port name String
port port number Integer
protocol The transport protocol ("TCP" or "UDP") String
pod.name name of the owning pod String
pod.namespace namespace of the pod String
pod.uid unique id of the pod String
pod.labels map of labels of the owning pod Map with String key and value
pod.annotations map of annotations of the owning pod Map with String key and value
Pod Container
Variable Description Data Type
type "pod.container" String
id ID of source endpoint String
container_name container name String
container_id container id String
container_image container image String
pod.name name of the owning pod String
pod.namespace namespace of the pod String
pod.uid unique id of the pod String
pod.labels map of labels of the owning pod Map with String key and value
pod.annotations map of annotations of the owning pod Map with String key and value
Host Port
Variable Description Data Type
type "hostport" String
id ID of source endpoint String
process_name Name of the process String
command Command line used to invoke the process String
is_ipv6 true if endpoint is IPv6, otherwise false Boolean
port Port number Integer
transport The transport protocol ("TCP" or "UDP") String
Container
Variable Description Data Type
type "container" String
id ID of source endpoint String
name Primary name of the container String
image Name of the container image String
port Exposed port of the container Integer
alternate_port Exposed port accessed through redirection, such as a mapped port Integer
command The command used to invoke the process of the container String
container_id ID of the container String
host Hostname or IP of the underlying host the container is running on String
transport Transport protocol used by the endpoint (TCP or UDP) String
labels User-specified metadata labels on the container Map with String key and value
Kubernetes Service
Variable Description Data Type
type "k8s.service" String
id ID of source endpoint String
name The name of the Kubernetes service String
namespace The namespace of the service String
uid The unique ID for the service String
labels The map of labels set on the service Map with String key and value
annotations The map of annotations set on the service Map with String key and value
service_type The type of the kubernetes service: ClusterIP, NodePort, LoadBalancer, ExternalName String
cluster_ip The cluster IP assigned to the service String
Kubernetes Ingress
Variable Description Data Type
type "k8s.ingress" String
id ID of source endpoint String
name The name of the Kubernetes ingress String
namespace The namespace of the ingress String
uid The unique ID for the ingress String
labels The map of labels set on the ingress Map with String key and value
annotations The map of annotations set on the ingress Map with String key and value
scheme Scheme represents whether the ingress path is accessible via HTTPS or HTTP. String
host Host is the FQDN that map to backends String
path Path that map requests to backends String
Kubernetes Node
Variable Description Data Type
type "k8s.node" String
id ID of source endpoint String
name The name of the Kubernetes node String
uid The unique ID for the node String
hostname The node's hostname as reported by its Status object String
external_ip The node's external IP address as reported by its Status object String
internal_ip The node's internal IP address as reported by its Status object String
external_dns The node's external DNS record as reported by its Status object String
internal_dns The node's internal DNS record as reported by its Status object String
annotations A key-value map of non-identifying, user-specified node metadata Map with String key and value
labels A key-value map of user-specified node metadata Map with String key and value
kubelet_endpoint_port The node Status object's DaemonEndpoints.KubeletEndpoint.Port value Integer
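
The fragment below sketches one rule for each of several endpoint types, using only variables from the tables above and operators (==, &&, in, matches) that appear elsewhere in this README. The receiver IDs and the matched values (process name, image pattern, label) are illustrative assumptions. Each rule would sit under a receiver creator that watches an observer emitting that endpoint type.

```yaml
receivers:
  redis/host:
    rule: type == "hostport" && process_name == "redis-server" && port == 6379
  nginx/container:
    rule: type == "container" && image matches "nginx" && port == 80
  httpcheck/service:
    rule: type == "k8s.service" && service_type == "ClusterIP" && "prometheus.io/probe" in annotations
  kubeletstats/node:
    rule: type == "k8s.node" && labels["kubernetes.io/os"] == "linux"
```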

Examples

extensions:
  # Configures the Kubernetes observer to watch for pod start and stop events.
  k8s_observer:
    observe_nodes: true
    observe_services: true
    observe_ingresses: true
  host_observer:

receivers:
  receiver_creator/1:
    # Name of the extensions to watch for endpoints to start and stop.
    watch_observers: [k8s_observer]
    receivers:
      prometheus_simple:
        # Configure prometheus scraping if standard prometheus annotations are set on the pod.
        rule: type == "pod" && annotations["prometheus.io/scrape"] == "true"
        config:
          metrics_path: '`"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/metrics"`'
          endpoint: '`endpoint`:`"prometheus.io/port" in annotations ? annotations["prometheus.io/port"] : 9090`'
        resource_attributes:
          an.attribute: a.value
          # Dynamic configuration values
          app.version: '`labels["app_version"]`'

      redis/1:
        # If this rule matches an instance of this receiver will be started.
        rule: type == "port" && port == 6379
        config:
          # Static receiver-specific config.
          password: secret
          # Dynamic configuration value.
          collection_interval: '`pod.annotations["collection_interval"]`'

      redis/2:
        # Set a resource attribute based on endpoint value.
        rule: type == "port" && port == 6379

      sqlserver:
        rule: type == "port" && pod.name matches "(?i)mssql"
        config:
          server: '`host`'
          port: '`port`'
          username: sa
          password: password

    resource_attributes:
      # Dynamic configuration values, overwriting default attributes
      pod:
        service.name: '`labels["service_name"]`'
        app: '`labels["app"]`'
      port:
        service.name: '`pod.labels["service_name"]`'
        app: '`pod.labels["app"]`'
  receiver_creator/2:
    # Name of the extensions to watch for endpoints to start and stop.
    watch_observers: [host_observer]
    receivers:
      redis/on_host:
        # If this rule matches an instance of this receiver will be started.
        rule: type == "hostport" && port == 6379 && is_ipv6 == true
        resource_attributes:
          service.name: redis_on_host
  receiver_creator/3:
    watch_observers: [k8s_observer]
    receivers:
      kubeletstats:
        rule: type == "k8s.node"
        config:
          auth_type: serviceAccount
          collection_interval: 10s
          endpoint: '`endpoint`:`kubelet_endpoint_port`'
          extra_metadata_labels:
            - container.id
          metric_groups:
            - container
            - pod
            - node
      httpcheck:
        # Configure probing if standard prometheus annotations are set on the service.
        rule: type == "k8s.service" && annotations["prometheus.io/probe"] == "true"
        config:
          targets:
          - endpoint: 'http://`endpoint`:`"prometheus.io/port" in annotations ? annotations["prometheus.io/port"] : 9090``"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/health"`'
            method: GET
          collection_interval: 10s
  receiver_creator/4:
    watch_observers: [k8s_observer]
    receivers:
      httpcheck:
        # Configure probing if standard prometheus annotations are set on the ingress.
        rule: type == "k8s.ingress" && annotations["prometheus.io/probe"] == "true"
        config:
          targets:
          - endpoint: '`scheme`://`endpoint`:`port``"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/health"`'
            method: GET
          collection_interval: 10s
  receiver_creator/logs:
    watch_observers: [ k8s_observer ]
    receivers:
      filelog/busybox:
        rule: type == "pod.container" && container_name == "busybox"
        config:
          include:
            - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
          include_file_name: false
          include_file_path: true
          operators:
            - id: container-parser
              type: container
            - type: add
              field: attributes.log.template
              value: busybox
      filelog/lazybox:
        rule: type == "pod.container" && container_name == "lazybox"
        config:
          include:
            - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
          include_file_name: false
          include_file_path: true
          operators:
            - id: container-parser
              type: container
            - type: add
              field: attributes.log.template
              value: lazybox

processors:
  exampleprocessor:

exporters:
  exampleexporter:

service:
  pipelines:
    metrics:
      receivers: [receiver_creator/1, receiver_creator/2, receiver_creator/3, receiver_creator/4]
      processors: [exampleprocessor]
      exporters: [exampleexporter]
    logs:
      receivers: [receiver_creator/logs]
      processors: [exampleprocessor]
      exporters: [exampleexporter]
  extensions: [k8s_observer, host_observer]

The full list of settings exposed for this receiver is documented in config.go, with detailed sample configurations in testdata/config.yaml.

Generate receiver configurations from provided Hints

Note: When the hints feature is enabled and hints are present for an endpoint, no receiver templates will be evaluated for that endpoint.

Currently this feature is only supported for K8s environments and the k8sobserver.

The discovery feature for K8s is enabled with the following setting:

receiver_creator/metrics:
  watch_observers: [ k8s_observer ]
  discovery:
    enabled: true
    # Define which receivers should be ignored when provided through annotations
    # ignore_receivers: []

Find below the supported annotations that users can define to automatically enable receivers to start collecting metrics and logs signals from the target Pods/containers.

Supported metrics annotations

Enable/disable discovery

io.opentelemetry.discovery.metrics/enabled (Required. "true" or "false")

Define scraper

io.opentelemetry.discovery.metrics/scraper (example: "nginx")

Define configuration

io.opentelemetry.discovery.metrics/config

For the "endpoint" setting specifically, only URLs that include "`endpoint`" are accepted. This value comes from the Port endpoint and has the form pod_ip:container_port, which ensures that each Pod can only generate configuration that targets itself and not other Pods. If no endpoint is provided, the Pod's endpoint is used (in the form pod_ip:container_port).

Example:

io.opentelemetry.discovery.metrics/config: |
  endpoint: "http://`endpoint`/nginx_status"
  collection_interval: "20s"
  initial_delay: "20s"
  read_buffer_size: "10"
  xyz: "abc"

Support multiple target containers

Users can target the annotation to a specific container by suffixing it with the port number that the container exposes: io.opentelemetry.discovery.metrics.<container_port>/config. For example:

io.opentelemetry.discovery.metrics.80/config: |
  endpoint: "http://`endpoint`/nginx_status"

where 80 is the port that the target container exposes.

If a Pod is annotated with both container-level hints and Pod-level hints, the container-level hints have priority and the Pod-level hints are used as a fallback (see the detailed example below).
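
A hedged sketch of this priority rule is shown below. It assumes that the fallback is resolved separately for each annotation key (enabled, scraper, config), which this README does not spell out. Under that assumption, port 80 would take enabled and scraper from the Pod-level hints and config from its own container-level hint.

```yaml
metadata:
  annotations:
    # Pod-level hints: used for any port that has no hint of its own for a key.
    io.opentelemetry.discovery.metrics/enabled: "true"
    io.opentelemetry.discovery.metrics/scraper: nginx
    io.opentelemetry.discovery.metrics/config: |
      collection_interval: "30s"
    # Container-level hint for port 80: takes priority over the Pod-level config.
    io.opentelemetry.discovery.metrics.80/config: |
      endpoint: "http://`endpoint`/nginx_status"
      collection_interval: "10s"
```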

The current implementation relies on the implementation of the k8sobserver extension, specifically the pod_endpoint. The hints are evaluated per container by extracting the annotations from each Port endpoint that is emitted.

Supported logs annotations

This feature enables the filelog receiver to collect logs from the discovered Pods.

Enable/disable discovery

io.opentelemetry.discovery.logs/enabled (Required. Example: "true")

By default "false".

Define configuration

The default configuration for the filelog receiver is the following:

include:
  - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
include_file_name: false
include_file_path: true
operators:
  - id: container-parser
    type: container

This default can be extended or overridden using the respective annotation: io.opentelemetry.discovery.logs/config

Example:

io.opentelemetry.discovery.logs/config: |
  include_file_name: true
  max_log_size: "2MiB"
  operators:
    - type: container
      id: container-parser
    - type: regex_parser
      regex: "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$"

include cannot be overridden and is fixed to the discovered container's log file path.

Support multiple target containers

Users can target the annotation to a specific container by suffixing it with the name of that container: io.opentelemetry.discovery.logs.<container_name>/config. For example:

io.opentelemetry.discovery.logs.busybox/config: |
  max_log_size: "3MiB"
  operators:
    - type: container
      id: container-parser
    - id: some
      type: add
      field: attributes.tag
      value: hints

where busybox is the name of the target container.

If a Pod is annotated with both container-level hints and Pod-level hints, the container-level hints have priority and the Pod-level hints are used as a fallback (see the detailed example below).
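
As a minimal sketch for logs, again assuming the fallback is resolved per annotation key, the annotations below would enable log collection for every container through the Pod-level hint. The container named busybox would use its own config, and other containers would fall back to the Pod-level config.

```yaml
metadata:
  annotations:
    # Pod-level hints: apply to every container without its own hint for a key.
    io.opentelemetry.discovery.logs/enabled: "true"
    io.opentelemetry.discovery.logs/config: |
      max_log_size: "2MiB"
    # Container-level hint: only the busybox container uses this config.
    io.opentelemetry.discovery.logs.busybox/config: |
      max_log_size: "3MiB"
```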

The current implementation relies on the implementation of the k8sobserver extension, specifically the pod_endpoint. The hints are evaluated per container by extracting the annotations from each Pod Container endpoint that is emitted.

Examples

Metrics and Logs example

Collector's configuration:

receivers:
  receiver_creator/metrics:
    watch_observers: [ k8s_observer ]
    discovery:
      enabled: true
    receivers:
  
  receiver_creator/logs:
    watch_observers: [ k8s_observer ]
    discovery:
      enabled: true
    receivers:

service:
  extensions: [ k8s_observer ]
  pipelines:
    metrics:
      receivers: [ receiver_creator/metrics ]
      processors: []
      exporters: [ debug ]
    logs:
      receivers: [ receiver_creator/logs ]
      processors: []
      exporters: [ debug ]

Target Pod annotated with hints:

apiVersion: v1
kind: ConfigMap
metadata:
  name: nginx-conf
data:
  nginx.conf: |
    user  nginx;
    worker_processes  1;
    error_log  /dev/stderr warn;
    pid        /var/run/nginx.pid;
    events {
      worker_connections  1024;
    }
    http {
      include       /etc/nginx/mime.types;
      default_type  application/octet-stream;

      log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                        '$status $body_bytes_sent "$http_referer" '
                        '"$http_user_agent" "$http_x_forwarded_for"';
      access_log  /dev/stdout main;
      server {
          listen 80;
          server_name localhost;

          location /nginx_status {
              stub_status on;
          }
      }
      include /etc/nginx/conf.d/*;
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-deployment
  labels:
    app: redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
      annotations:
        # redis container port metrics hints
        io.opentelemetry.discovery.metrics.6379/enabled: "true"
        io.opentelemetry.discovery.metrics.6379/scraper: redis
        io.opentelemetry.discovery.metrics.6379/config: |
          collection_interval: "20s"
          timeout: "10s"

        # nginx container port metrics hints
        io.opentelemetry.discovery.metrics.80/enabled: "true"
        io.opentelemetry.discovery.metrics.80/scraper: nginx
        io.opentelemetry.discovery.metrics.80/config: |
          endpoint: "http://`endpoint`/nginx_status"
          collection_interval: "30s"
          timeout: "20s"

        # redis pod container logs hints
        io.opentelemetry.discovery.logs.redis/enabled: "true"
        io.opentelemetry.discovery.logs.redis/config: |
          max_log_size: "4MiB"
          operators:
            - type: container
              id: container-parser
            - id: some
              type: add
              field: attributes.tag
              value: logs_hints

        # nginx pod container logs hints
        io.opentelemetry.discovery.logs.webserver/enabled: "true"
        io.opentelemetry.discovery.logs.webserver/config: |
          max_log_size: "3MiB"
    spec:
      volumes:
        - name: nginx-conf
          configMap:
            name: nginx-conf
            items:
              - key: nginx.conf
                path: nginx.conf
      containers:
        - name: webserver
          image: nginx:latest
          ports:
            - containerPort: 80
              name: webserver
          volumeMounts:
            - mountPath: /etc/nginx/nginx.conf
              readOnly: true
              subPath: nginx.conf
              name: nginx-conf
        - image: redis
          imagePullPolicy: IfNotPresent
          name: redis
          ports:
            - name: redis
              containerPort: 6379
              protocol: TCP

Documentation

Overview

Package receivercreator implements receiver_creator that can instantiate other receivers at runtime.

Functions

func NewFactory added in v0.8.0

func NewFactory() receiver.Factory

NewFactory creates a factory for receiver creator.

Types

type Config

type Config struct {

	// WatchObservers are the extensions to listen to endpoints from.
	WatchObservers []component.ID `mapstructure:"watch_observers"`
	// ResourceAttributes is a map of default resource attributes to add to each resource
	// object received by this receiver from dynamically created receivers.
	ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"`
	Discovery          DiscoveryConfig    `mapstructure:"discovery"`
	// contains filtered or unexported fields
}

Config defines configuration for receiver_creator.

func (*Config) Unmarshal added in v0.25.0

func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error

type DiscoveryConfig added in v0.115.0

type DiscoveryConfig struct {
	Enabled         bool     `mapstructure:"enabled"`
	IgnoreReceivers []string `mapstructure:"ignore_receivers"`
}

Directories

Path Synopsis
internal
