rollupprocessor

package
v0.1.3-rc.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2022 License: AGPL-3.0 Imports: 14 Imported by: 0

README

Rollup processor

Supported pipeline types: traces, logs.

The rollup processor accepts traces or logs and rolls them up, based on attributes.

It is highly recommended to configure batch processor just before rollup processor. Rollup processor rolls up traces or logs which are a part of one message.

Configuration

processors:
  batch/prerollup:
    timeout: 1s
    send_batch_size: 10000
  rollup:
    rollups:
      - from: body_size
        to: body_size_min
        type: min
      - from: body_size
        to: body_size_max
        type: max
      - from: body_size
        to: body_size_sum
        type: sum
Field name Type Required Description
rollups []Rollup No Array of rollups. One per resulting rolled up field.
rollups.from string Yes Field from which value to be rolled up is read.
rollups.to string Yes Field to which rolled up value is written.
rollups.type string Yes Rollup type. See below for possible values.
Rollup types
Type name From type To type Description
min string int64 Returns minimum value.
max string int64 Returns maximum value.
sum string int64 Returns sum of values.

Design

Rollup processor is basically merging maps. In order to merge two maps, they need to have the same key attribues. Key attribute is an attribute which was not specified as rollup attribute in the configuration i.e. was not specified in any of rollups.from fields.

Merging of such maps results with a map with key attributes and rollup result attributes. In addition, there is rollup_count attribute, which describes how many maps were rolled up into this single result. Please refer to the example below to fully understand.

Example

Assume the rollup processor is configured with config given in the previous section. On the input we have following maps (they are parts of either Traces of Logs).

[
  { "path": "/foo", "user-agent": "fizz", "body_size": "5" },
  { "path": "/foo", "user-agent": "fizz", "body_size": "6" },
  { "path": "/bar", "user-agent": "fizz", "body_size": "7" }
]

This will result in:

[
  {
    "path": "/foo",
    "user-agent": "fizz",
    "body_size_min": 5,
    "body_size_max": 6,
    "body_size_sum": 11,
    "rollup_count": 2
  },
  {
    "path": "/bar",
    "user-agent": "fizz",
    "body_size_min": 7,
    "body_size_max": 7,
    "body_size_sum": 7,
    "rollup_count": 1
  }
]

Documentation

Index

Constants

View Source
const (
	// RollupCountKey is the key used to store the count of the rollup.
	RollupCountKey = "rollup_count"
)

Variables

This section is empty.

Functions

func AggregateField added in v0.1.3

func AggregateField(field string, rollupType RollupType) string

AggregateField returns the aggregate field name for the given field and rollup type.

func CreateLogsProcessor

func CreateLogsProcessor(
	_ context.Context,
	set component.ProcessorCreateSettings,
	cfg config.Processor,
	nextConsumer consumer.Logs,
) (component.LogsProcessor, error)

CreateLogsProcessor returns rollupProcessor handling logs.

func CreateTracesProcessor

func CreateTracesProcessor(
	_ context.Context,
	set component.ProcessorCreateSettings,
	cfg config.Processor,
	nextConsumer consumer.Traces,
) (component.TracesProcessor, error)

CreateTracesProcessor returns rollupProcessor handling traces.

func NewFactory

func NewFactory() component.ProcessorFactory

NewFactory returns a new factory for the Rollup processor.

Types

type Config

type Config struct {
	config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

Config defines configuration for rollup processor.

type Rollup

type Rollup struct {
	FromField   string     `mapstructure:"from"`
	ToField     string     `mapstructure:"to"`
	Type        RollupType `mapstructure:"type"`
	TreatAsZero []string   `mapstructure:"treat_as_zero"`
}

Rollup represents singe rollup operation. It describes Type of operation to be done on all `FromField`s from logs/traces. Result of operation is stored in `ToField`.

func (*Rollup) GetFromFieldValue added in v0.1.3

func (rollup *Rollup) GetFromFieldValue(attributes pcommon.Map) (float64, bool)

GetFromFieldValue returns value of `FromField` from attributes as float64.

func (*Rollup) GetToFieldValue added in v0.1.3

func (rollup *Rollup) GetToFieldValue(attributes pcommon.Map) (float64, bool)

GetToFieldValue returns value of `ToField` from attributes as float64.

type RollupType

type RollupType string

RollupType represents rollup type available in the processor.

const (
	// RollupSum rolls up fields by adding them.
	RollupSum RollupType = "sum"
	// RollupMax rolls up fields by getting max value of them.
	RollupMax RollupType = "max"
	// RollupMin rolls up fields by getting min value of them.
	RollupMin RollupType = "min"
	// RollupSumOfSquares rolls up fields by summing squares of them.
	RollupSumOfSquares RollupType = "sumOfSquares"
	// RollupDatasketch rolls up fields by creating datasketch from them.
	RollupDatasketch RollupType = "datasketch"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL