rollupprocessor

package
v2.21.0-rc.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

README

Rollup processor

Supported pipeline types: traces, logs.

The rollup processor accepts traces or logs and rolls them up, based on attributes.

It is highly recommended to configure batch processor just before rollup processor. Rollup processor rolls up traces or logs which are a part of one message.

Configuration

processors:
  batch/prerollup:
    timeout: 1s
    send_batch_size: 10000
  rollup:
    rollups:
      - from: body_size
        to: body_size_min
        type: min
      - from: body_size
        to: body_size_max
        type: max
      - from: body_size
        to: body_size_sum
        type: sum
Field name Type Required Description
rollups []Rollup No Array of rollups. One per resulting rolled up field.
rollups.from string Yes Field from which value to be rolled up is read.
rollups.to string Yes Field to which rolled up value is written.
rollups.type string Yes Rollup type. See below for possible values.
Rollup types
Type name From type To type Description
min string int64 Returns minimum value.
max string int64 Returns maximum value.
sum string int64 Returns sum of values.

Design

Rollup processor is basically merging maps. In order to merge two maps, they need to have the same key attribues. Key attribute is an attribute which was not specified as rollup attribute in the configuration i.e. was not specified in any of rollups.from fields.

Merging of such maps results with a map with key attributes and rollup result attributes. In addition, there is rollup_count attribute, which describes how many maps were rolled up into this single result. Please refer to the example below to fully understand.

Example

Assume the rollup processor is configured with config given in the previous section. On the input we have following maps (they are parts of either Traces of Logs).

[
  { "path": "/foo", "user-agent": "fizz", "body_size": "5" },
  { "path": "/foo", "user-agent": "fizz", "body_size": "6" },
  { "path": "/bar", "user-agent": "fizz", "body_size": "7" }
]

This will result in:

[
  {
    "path": "/foo",
    "user-agent": "fizz",
    "body_size_min": 5,
    "body_size_max": 6,
    "body_size_sum": 11,
    "rollup_count": 2
  },
  {
    "path": "/bar",
    "user-agent": "fizz",
    "body_size_min": 7,
    "body_size_max": 7,
    "body_size_sum": 7,
    "rollup_count": 1
  }
]

Documentation

Index

Constants

View Source
const (

	// RedactedAttributeValue is a value that replaces actual attribute value
	// in case it exceeds cardinality limit.
	RedactedAttributeValue = "REDACTED_VIA_CARDINALITY_LIMIT"
)
View Source
const (
	// RollupCountKey is the key used to store the count of the rollup.
	RollupCountKey = "rollup_count"
)

Variables

This section is empty.

Functions

func AggregateField

func AggregateField(field string, rollupType RollupType) string

AggregateField returns the aggregate field name for the given field and rollup type.

func CreateLogsProcessor

func CreateLogsProcessor(
	_ context.Context,
	set processor.CreateSettings,
	cfg component.Config,
	nextConsumer consumer.Logs,
) (processor.Logs, error)

CreateLogsProcessor returns rollupProcessor handling logs.

func NewFactory

func NewFactory(promRegistry *prometheus.Registry) processor.Factory

NewFactory returns a new factory for the Rollup processor.

Types

type Config

type Config struct {
	AttributeCardinalityLimit int       `mapstructure:"attribute_cardinality_limit"`
	RollupBuckets             []float64 `mapstructure:"rollup_buckets"`
	// contains filtered or unexported fields
}

Config defines configuration for rollup processor.

type Rollup

type Rollup struct {
	FromField      string
	ToField        string
	Type           RollupType
	TreatAsMissing []string
}

Rollup represents single rollup operation. It describes Type of operation to be done on all `FromField`s from logs/traces. Result of operation is stored in `ToField`.

func NewRollups

func NewRollups(groups []RollupGroup) []*Rollup

NewRollups creates individual rollups based on rollup groups.

func (*Rollup) GetFromFieldValue

func (rollup *Rollup) GetFromFieldValue(attributes pcommon.Map) (float64, bool)

GetFromFieldValue returns value of `FromField` from attributes as float64.

func (*Rollup) GetToFieldValue

func (rollup *Rollup) GetToFieldValue(attributes pcommon.Map) (float64, bool)

GetToFieldValue returns value of `ToField` from attributes as float64.

type RollupGroup

type RollupGroup struct {
	FromField      string
	WithDatasketch bool
	TreatAsMissing []string
}

RollupGroup represents a group of rollup operations of different types that all use the same FromField.

By default all basic rollup types will be used (sum, max, min, sum of squares). WithDatasketch enables also the Datasketch rollup type.

The name of ToField of Rollup will be inferred from the type of the rollup.

type RollupType

type RollupType string

RollupType represents rollup type available in the processor.

const (
	// RollupSum rolls up fields by adding them.
	RollupSum RollupType = "sum"
	// RollupMax rolls up fields by getting max value of them.
	RollupMax RollupType = "max"
	// RollupMin rolls up fields by getting min value of them.
	RollupMin RollupType = "min"
	// RollupSumOfSquares rolls up fields by summing squares of them.
	RollupSumOfSquares RollupType = "sumOfSquares"
	// RollupDatasketch rolls up fields by creating datasketch from them.
	RollupDatasketch RollupType = "datasketch"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL