stages

package
v1.6.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2025 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
// Error messages for invalid drop stage configuration.
// ErrDropStageInvalidRegex is a format string whose %v verb receives the
// regex compilation failure (presumably the error from regexp.Compile).
const (
	ErrDropStageEmptyConfig = "drop stage config must contain at least one of `source`, `expression`, `older_than` or `longer_than`"
	// Go error strings should not end with punctuation because callers
	// routinely wrap or concatenate them.
	ErrDropStageInvalidConfig     = "drop stage config error, `value` and `expression` cannot both be defined at the same time"
	ErrDropStageInvalidRegex      = "drop stage regex compilation error: %v"
	ErrDropStageNoSourceWithValue = "drop stage config must contain `source` if `value` is specified"
)
View Source
// Messages about the JMESPath expressions configured for the JSON stage.
const (
	ErrExpressionsRequired = "JMES expression is required"
	ErrCouldNotCompileJMES = "could not compile JMES expression"
)

// Messages about empty JSON stage configuration, an empty source, and
// malformed JSON input.
const (
	ErrEmptyJSONStageConfig = "empty json stage configuration"
	ErrEmptyJSONStageSource = "empty source"
	ErrMalformedJSON        = "malformed json"
)

Config Errors

View Source
// ErrEmptyLabelStageConfig is the message for a labels stage with no config.
const ErrEmptyLabelStageConfig = "label stage config cannot be empty"

// ErrInvalidLabelName is a format string; its %s verb receives the rejected
// label name.
const ErrInvalidLabelName = "invalid label name: %s"
View Source
// Stage type identifiers, one per kind of pipeline stage.
const (
	StageTypeCRI                = "cri"
	StageTypeDecolorize         = "decolorize"
	StageTypeDocker             = "docker"
	StageTypeDrop               = "drop"
	StageTypeGeoIP              = "geoip"
	StageTypeJSON               = "json"
	StageTypeLabel              = "labels"
	StageTypeLabelAllow         = "labelallow"
	StageTypeLabelDrop          = "labeldrop"
	StageTypeLimit              = "limit"
	StageTypeLogfmt             = "logfmt"
	StageTypeLuhn               = "luhn"
	StageTypeMatch              = "match"
	StageTypeMetric             = "metrics"
	StageTypeMultiline          = "multiline"
	StageTypeOutput             = "output"
	StageTypePack               = "pack"
	StageTypePipeline           = "pipeline"
	StageTypeRegex              = "regex"
	StageTypeReplace            = "replace"
	StageTypeSampling           = "sampling"
	StageTypeStaticLabels       = "static_labels"
	StageTypeStructuredMetadata = "structured_metadata"
	StageTypeTemplate           = "template"
	StageTypeTenant             = "tenant"
	StageTypeTimestamp          = "timestamp"
)

// StageTypeEventLogMessage identifies the eventlogmessage stage.
//
// TODO(thampiotr): Add support for eventlogmessage stage
const StageTypeEventLogMessage = "eventlogmessage"

TODO(@tpaschalis) Let's use this as the list of stages we need to port over.

View Source
// ErrSamplingStageInvalidRate is a format string for a sampling rate outside
// the accepted [0.0, 1.0] range; its %f verb receives the rate that was
// supplied.
const (
	ErrSamplingStageInvalidRate = "sampling stage failed to parse rate, sampling rate must be between 0.0 and 1.0, received %f"
)
View Source
// ErrTimestampContainsYear is a format string; its %s verb receives the
// timestamp value that unexpectedly includes a year component.
const ErrTimestampContainsYear = "timestamp '%s' is expected to not contain the year date component"
View Source
// MinReasonableMaxDistinctLabels provides a sensible default. At roughly 80
// bytes per rate.Limiter, 10000 entries cost on the order of 1MiB of memory.
const (
	MinReasonableMaxDistinctLabels = 10000
)

MinReasonableMaxDistinctLabels provides a sensible default.

View Source
// RFC3339Nano is a named timestamp format identifier. Judging by its name it
// presumably maps to the time.RFC3339Nano layout; confirm at the use site.
const RFC3339Nano = "RFC3339Nano"
View Source
// ReservedLabelTenantID is the shared label name used to refer to the tenant ID.
const (
	ReservedLabelTenantID = "__tenant_id__"
)

ReservedLabelTenantID is a shared value used to refer to the tenant ID.

Variables

View Source
// ErrEmptyDBPathGeoIPStageConfig reports a GeoIP stage without a database path.
var ErrEmptyDBPathGeoIPStageConfig = errors.New("db path cannot be empty")

// ErrEmptySourceGeoIPStageConfig reports a GeoIP stage without a source.
var ErrEmptySourceGeoIPStageConfig = errors.New("source cannot be empty")

// ErrEmptyDBTypeGeoIPStageConfig reports a db type other than city or asn.
var ErrEmptyDBTypeGeoIPStageConfig = errors.New("db type should be either city or asn")

// ErrEmptyDBTypeAndValuesGeoIPStageConfig reports that neither a db type nor
// values were configured.
var ErrEmptyDBTypeAndValuesGeoIPStageConfig = errors.New("db type or values need to be set")
View Source
// Configuration errors returned by the limit stage.
var (
	ErrLimitStageInvalidRateOrBurst = errors.New("limit stage failed to parse rate or burst")
	// Go error strings start lowercase (staticcheck ST1005) so they read
	// correctly when wrapped; callers should match with errors.Is, not text.
	ErrLimitStageByLabelMustDrop = errors.New("when rate limiting by label, drop must be true")
)

Configuration errors.

View Source
// ErrMappingRequired reports a logfmt stage configured without a mapping.
var ErrMappingRequired = errors.New("logfmt mapping is required")

// ErrEmptyLogfmtStageConfig reports a logfmt stage with no configuration.
var ErrEmptyLogfmtStageConfig = errors.New("empty logfmt stage configuration")

Config Errors

View Source
// Configuration errors for the match stage, plus the values accepted by its
// action attribute.
var (
	ErrSelectorRequired    = errors.New("selector statement required for match stage")
	ErrMatchRequiresStages = errors.New("match stage requires at least one additional stage to be defined in '- stages'")
	ErrSelectorSyntax      = errors.New("invalid selector syntax for match stage")
	ErrStagesWithDropLine  = errors.New("match stage configured to drop entries cannot contain stages")
	ErrUnknownMatchAction  = errors.New("match stage action should be 'keep' or 'drop'")

	// Values accepted by MatchConfig.Action (see ErrUnknownMatchAction).
	MatchActionKeep = "keep"
	MatchActionDrop = "drop"
)

Configuration errors.

View Source
// ErrMultilineStageEmptyConfig reports a multiline stage without a firstline
// expression.
var ErrMultilineStageEmptyConfig = errors.New("multiline stage config must define `firstline` regular expression")

// ErrMultilineStageInvalidRegex reports a firstline expression that failed to
// compile.
var ErrMultilineStageInvalidRegex = errors.New("multiline stage first line regex compilation error")

Configuration errors.

View Source
// ErrEmptyOutputStageConfig reports an output stage with no configuration.
var ErrEmptyOutputStageConfig = errors.New("output stage config cannot be empty")

// ErrOutputSourceRequired reports an output stage without a source.
var ErrOutputSourceRequired = errors.New("output source value is required if output is specified")

Config Errors.

View Source
// ErrExpressionRequired reports a regex stage without an expression.
var ErrExpressionRequired = errors.New("expression is required")

// ErrCouldNotCompileRegex reports an expression that failed to compile.
var ErrCouldNotCompileRegex = errors.New("could not compile regular expression")

// ErrEmptyRegexStageSource reports an empty source for the regex stage.
var ErrEmptyRegexStageSource = errors.New("empty source")

Config Errors.

View Source
// Configuration errors for the tenant stage: exactly one of label, source or
// value must be provided.
var (
	ErrTenantStageEmptyLabelSourceOrValue = errors.New("label, source or value config are required")
	// "Mutually exclusive" means only one may be set; the previous wording
	// ("but not all") wrongly suggested that setting two was acceptable.
	ErrTenantStageConflictingLabelSourceAndValue = errors.New("label, source and value are mutually exclusive: you should set only one of source, value or label")
)

Configuration errors.

View Source
// Errors reported while configuring or running the timestamp stage.
//
// NOTE(review): ErrInvalidLocation and ErrInvalidActionOnFailure contain a %v
// verb, but errors.New never expands it. Callers presumably use their .Error()
// text as a fmt.Errorf format string; confirm before rewording them.
var (
	ErrEmptyTimestampStageConfig = errors.New("timestamp stage config cannot be empty")
	ErrTimestampSourceRequired   = errors.New("timestamp source value is required if timestamp is specified")
	ErrTimestampFormatRequired   = errors.New("timestamp format is required")
	ErrInvalidLocation           = errors.New("invalid location specified: %v")
	ErrInvalidActionOnFailure    = errors.New("invalid action on failure (supported values are %v)")
	ErrTimestampSourceMissing    = errors.New("extracted data did not contain a timestamp")
	ErrTimestampConversionFailed = errors.New("failed to convert extracted time to string")
	ErrTimestampParsingFailed    = errors.New("failed to parse time")
)

// Format names for Unix epoch timestamps; by their names, seconds,
// milliseconds, microseconds and nanoseconds respectively.
var (
	Unix   = "Unix"
	UnixMs = "UnixMs"
	UnixUs = "UnixUs"
	UnixNs = "UnixNs"
)

// Values for the action_on_failure attribute. TimestampActionOnFailureDefault
// aliases the fudge action.
var (
	TimestampActionOnFailureSkip  = "skip"
	TimestampActionOnFailureFudge = "fudge"

	TimestampActionOnFailureDefault = TimestampActionOnFailureFudge
)

Config errors.

View Source
// Debug guards debug-level log statements. The go-kit logger does not let us
// introspect its current level, so this global lets code skip allocating
// debug log messages when debug logging is disabled; those allocations add up
// because numerous debug entries are logged per processed log line.
var Debug = false

// Inspect is used to debug promtail pipelines by showing diffs between
// pipeline stages.
var Inspect = false
View Source
// DefaultCRIConfig contains the default CRIConfig values. Only
// MaxPartialLines (100) is set explicitly; MaxPartialLineSize (0) and
// MaxPartialLineSizeTruncate (false) are left at their zero values.
var DefaultCRIConfig = CRIConfig{MaxPartialLines: 100}

DefaultCRIConfig contains the default CRIConfig values.

View Source
// DefaultMultilineConfig holds the default MultilineConfig values: wait at
// most three seconds and collect at most 128 lines per multiline block.
var DefaultMultilineConfig = MultilineConfig{
	MaxWaitTime: 3 * time.Second,
	MaxLines:    128,
}

DefaultMultilineConfig contains the default values applied to a MultilineConfig (MaxLines 128, MaxWaitTime 3s).

View Source
// DefaultPackConfig sets the defaults: the ingest timestamp is enabled.
var DefaultPackConfig = PackConfig{IngestTimestamp: true}

DefaultPackConfig sets the defaults.

View Source
// ErrEmptyLabelAllowStageConfig is returned when the labelallow config is empty.
var (
	ErrEmptyLabelAllowStageConfig = errors.New("labelallow stage config cannot be empty")
)

ErrEmptyLabelAllowStageConfig is the error returned if the labelallow stage config is empty.

View Source
// ErrEmptyLabelDropStageConfig is returned when the labeldrop config is empty.
var (
	ErrEmptyLabelDropStageConfig = errors.New("labeldrop stage config cannot be empty")
)

ErrEmptyLabelDropStageConfig is the error returned if the labeldrop stage config is empty.

View Source
// ErrEmptyStaticLabelStageConfig is returned when the static_labels config is empty.
var (
	ErrEmptyStaticLabelStageConfig = errors.New("static_labels stage config cannot be empty")
)

ErrEmptyStaticLabelStageConfig is the error returned if the static_labels stage config is empty.

View Source
// ErrTemplateSourceRequired reports a template stage configured without a source.
var ErrTemplateSourceRequired = errors.New("template source value is required")

Config Errors.

TimestampActionOnFailureOptions defines the available options for the `action_on_failure` field.

Functions

func RunWith

func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry

RunWith reads entries from the input channel, mutates them with the process function, and returns them via the output channel.

func RunWithSkipOrSendMany

func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry

RunWithSkipOrSendMany is the same as RunWith, except that it can send multiple entries at once, and it skips sending the batch to the output channel if the `process` function returns `skip` as true.

func SanitizeFullLabelName

func SanitizeFullLabelName(input string) string

SanitizeFullLabelName sanitizes an input string, converting it into a valid Prometheus label name. TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName

func SetReadLineRateLimiter

func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool)

Types

type CRIConfig

// CRIConfig configures the pre-defined pipeline that decodes entries written
// in the CRI logging format. All attributes are optional; DefaultCRIConfig
// holds the defaults (MaxPartialLines 100, MaxPartialLineSize 0,
// MaxPartialLineSizeTruncate false).
type CRIConfig struct {
	// NOTE(review): by their names these bound the reassembly of partial CRI
	// lines, and a MaxPartialLineSize of 0 presumably means "no size limit";
	// confirm against the stage implementation.
	MaxPartialLines            int    `alloy:"max_partial_lines,attr,optional"`
	MaxPartialLineSize         uint64 `alloy:"max_partial_line_size,attr,optional"`
	MaxPartialLineSizeTruncate bool   `alloy:"max_partial_line_size_truncate,attr,optional"`
}

CRIConfig configures a pre-defined pipeline for decoding entries that are using the CRI logging format.

func (*CRIConfig) SetToDefault

func (args *CRIConfig) SetToDefault()

SetToDefault implements syntax.Defaulter.

func (*CRIConfig) Validate

func (args *CRIConfig) Validate() error

Validate implements syntax.Validator.

type DecolorizeConfig

// DecolorizeConfig configures the decolorize stage. It has no fields, so the
// stage exposes no configurable attributes.
type DecolorizeConfig struct{}

type DockerConfig

// DockerConfig is an empty struct that is used to enable a pre-defined
// pipeline for decoding entries that are using the Docker logs format.
type DockerConfig struct{}

DockerConfig is an empty struct that is used to enable a pre-defined pipeline for decoding entries that are using the Docker logs format.

type DropConfig

// DropConfig contains the configuration for a drop stage. Every attribute is
// optional, but according to the ErrDropStage* messages at least one of
// source, expression, older_than or longer_than must be set, value requires
// source, and value and expression cannot both be defined.
type DropConfig struct {
	// Presumably the reason recorded on the drop counter, going by the
	// attribute name drop_counter_reason.
	DropReason string           `alloy:"drop_counter_reason,attr,optional"`
	Source     string           `alloy:"source,attr,optional"`
	Value      string           `alloy:"value,attr,optional"`
	Separator  string           `alloy:"separator,attr,optional"`
	// Regular expression; ErrDropStageInvalidRegex is the message for a
	// compilation failure.
	Expression string           `alloy:"expression,attr,optional"`
	OlderThan  time.Duration    `alloy:"older_than,attr,optional"`
	// Size threshold expressed as units.Base2Bytes.
	LongerThan units.Base2Bytes `alloy:"longer_than,attr,optional"`
}

DropConfig contains the configuration for a dropStage

type Entry

// Entry is the unit that flows between pipeline stages: the embedded
// loki.Entry together with the map of values extracted from it so far.
type Entry struct {
	Extracted map[string]any
	loki.Entry
}

type EventLogMessageConfig

// EventLogMessageConfig configures the eventlogmessage stage. All of its
// attributes are optional; SetToDefault and Validate are defined on it.
type EventLogMessageConfig struct {
	Source            string `alloy:"source,attr,optional"`
	DropInvalidLabels bool   `alloy:"drop_invalid_labels,attr,optional"`
	OverwriteExisting bool   `alloy:"overwrite_existing,attr,optional"`
}

func (*EventLogMessageConfig) SetToDefault

func (e *EventLogMessageConfig) SetToDefault()

func (*EventLogMessageConfig) Validate

func (e *EventLogMessageConfig) Validate() error

type GenerationalMap

// GenerationalMap is a map ported from Loki's pkg/util package. NewGenMap
// builds one that keeps at most maxSize recently used entries.
type GenerationalMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

GenerationalMap is ported from Loki's pkg/util package. It did not exist in the Loki version we depended on at the time, so the implementation was copied over.

func NewGenMap

func NewGenMap[K comparable, V any](maxSize int, newV func() V, gcCb func()) GenerationalMap[K, V]

NewGenMap creates a GenerationalMap that maintains at most maxSize recently used entries.

func (*GenerationalMap[K, T]) GetOrCreate

func (m *GenerationalMap[K, T]) GetOrCreate(key K) T

type GeoIPConfig

// GeoIPConfig represents the GeoIP stage configuration. db and source are
// required attributes (their tags lack "optional"). db_type is optional, but
// ErrEmptyDBTypeGeoIPStageConfig states it should be either city or asn.
type GeoIPConfig struct {
	DB            string            `alloy:"db,attr"`
	Source        *string           `alloy:"source,attr"`
	DBType        string            `alloy:"db_type,attr,optional"`
	CustomLookups map[string]string `alloy:"custom_lookups,attr,optional"`
}

GeoIPConfig represents GeoIP stage config

type GeoIPFields

// GeoIPFields identifies individual GeoIP lookup fields. Values are assigned
// by iota in declaration order, so new members must be appended at the end to
// keep existing numeric values stable.
type GeoIPFields int
const (
	CITYNAME GeoIPFields = iota
	COUNTRYNAME
	COUNTRYCODE
	CONTINENTNAME
	CONTINENTCODE
	LOCATION
	POSTALCODE
	TIMEZONE
	SUBDIVISIONNAME
	SUBDIVISIONCODE
	ASN
	ASNORG
)

type JSONConfig

// JSONConfig represents a JSON stage configuration. expressions is required
// (ErrExpressionsRequired); source and drop_malformed are optional.
type JSONConfig struct {
	Expressions   map[string]string `alloy:"expressions,attr"`
	// NOTE(review): pointer so an unset source can be distinguished from an
	// empty one; confirm what the stage reads when Source is nil.
	Source        *string           `alloy:"source,attr,optional"`
	DropMalformed bool              `alloy:"drop_malformed,attr,optional"`
}

JSONConfig represents a JSON Stage configuration

type LabelAllowConfig

// LabelAllowConfig contains the slice of labels to allow through. values is
// required; an empty config is rejected with ErrEmptyLabelAllowStageConfig.
type LabelAllowConfig struct {
	Values []string `alloy:"values,attr"`
}

LabelAllowConfig contains the slice of labels to allow through.

type LabelDropConfig

// LabelDropConfig contains the slice of labels to be dropped. values is
// required; an empty config is rejected with ErrEmptyLabelDropStageConfig.
type LabelDropConfig struct {
	Values []string `alloy:"values,attr"`
}

LabelDropConfig contains the slice of labels to be dropped.

type LabelsConfig

// LabelsConfig is a set of labels to be extracted. StageConfig also uses this
// type for the structured_metadata block.
type LabelsConfig struct {
	// NOTE(review): values are *string, so nil entries are representable;
	// check the stage for how a nil value is interpreted.
	Values map[string]*string `alloy:"values,attr"`
}

LabelsConfig is a set of labels to be extracted

type LimitConfig

// LimitConfig sets up a limit stage. rate and burst are required. When
// rate limiting by label (by_label_name), drop must be true
// (ErrLimitStageByLabelMustDrop).
type LimitConfig struct {
	Rate              float64 `alloy:"rate,attr"`
	Burst             int     `alloy:"burst,attr"`
	Drop              bool    `alloy:"drop,attr,optional"`
	ByLabelName       string  `alloy:"by_label_name,attr,optional"`
	// MinReasonableMaxDistinctLabels is the package's suggested default;
	// presumably applied when this is unset or too small — confirm.
	MaxDistinctLabels int     `alloy:"max_distinct_labels,attr,optional"`
}

LimitConfig sets up a Limit stage.

type LogfmtConfig

// LogfmtConfig represents a logfmt stage configuration. mapping is required
// (ErrMappingRequired); source is optional.
type LogfmtConfig struct {
	Mapping map[string]string `alloy:"mapping,attr"`
	Source  string            `alloy:"source,attr,optional"`
}

LogfmtConfig represents a logfmt Stage configuration

type LuhnFilterConfig

// LuhnFilterConfig configures a processing stage that filters out Luhn-valid
// numbers. All attributes are optional.
type LuhnFilterConfig struct {
	Replacement string  `alloy:"replacement,attr,optional"`
	Source      *string `alloy:"source,attr,optional"`
	MinLength   int     `alloy:"min_length,attr,optional"`
}

LuhnFilterConfig configures a processing stage that filters out Luhn-valid numbers.

type MatchConfig

// MatchConfig contains the configuration for a match stage. selector is
// required. action must be "keep" or "drop" (MatchActionKeep /
// MatchActionDrop), and a stage configured to drop entries may not contain
// nested stages (ErrStagesWithDropLine).
type MatchConfig struct {
	Selector     string        `alloy:"selector,attr"`
	// Nested stages, declared as repeated stage blocks.
	Stages       []StageConfig `alloy:"stage,enum,optional"`
	Action       string        `alloy:"action,attr,optional"`
	PipelineName string        `alloy:"pipeline_name,attr,optional"`
	DropReason   string        `alloy:"drop_counter_reason,attr,optional"`
}

MatchConfig contains the configuration for a matcherStage

type MetricConfig

// MetricConfig is a single metrics configuration, holding optional counter,
// gauge and histogram blocks.
// NOTE(review): presumably exactly one of the three is expected to be set;
// confirm where this is validated.
type MetricConfig struct {
	Counter   *metric.CounterConfig   `alloy:"counter,block,optional"`
	Gauge     *metric.GaugeConfig     `alloy:"gauge,block,optional"`
	Histogram *metric.HistogramConfig `alloy:"histogram,block,optional"`
}

MetricConfig is a single metrics configuration.

type MetricsConfig

// MetricsConfig is a set of configured metrics, declared as repeated metric
// blocks.
type MetricsConfig struct {
	Metrics []MetricConfig `alloy:"metric,enum,optional"`
}

MetricsConfig is a set of configured metrics.

type MultilineConfig

// MultilineConfig contains the configuration for a multiline stage. firstline
// is required (ErrMultilineStageEmptyConfig). max_lines and max_wait_time are
// optional; DefaultMultilineConfig holds 128 lines and 3s.
type MultilineConfig struct {
	// Regular expression matched against the first line of a block.
	Expression  string        `alloy:"firstline,attr"`
	MaxLines    uint64        `alloy:"max_lines,attr,optional"`
	MaxWaitTime time.Duration `alloy:"max_wait_time,attr,optional"`
}

MultilineConfig contains the configuration for a Multiline stage.

func (*MultilineConfig) SetToDefault

func (args *MultilineConfig) SetToDefault()

SetToDefault implements syntax.Defaulter.

func (*MultilineConfig) Validate

func (args *MultilineConfig) Validate() error

Validate implements syntax.Validator.

type OutputConfig

// OutputConfig configures an output stage, which sets the log line to a value
// from the extracted map. source is required (ErrOutputSourceRequired).
type OutputConfig struct {
	Source string `alloy:"source,attr"`
}

OutputConfig configures an output stage, which sets the log line to a value from the extracted map.

type PackConfig

// PackConfig contains the configuration for a pack stage. labels is required;
// ingest_timestamp is optional and DefaultPackConfig sets it to true.
type PackConfig struct {
	Labels          []string `alloy:"labels,attr"`
	IngestTimestamp bool     `alloy:"ingest_timestamp,attr,optional"`
}

PackConfig contains the configuration for a packStage

func (*PackConfig) SetToDefault

func (p *PackConfig) SetToDefault()

SetToDefault implements syntax.Defaulter.

type Packed

// Packed keeps track of the labels and log entry.
//
// encoding/json has no ",inline" tag option, so the tag on Labels does not
// flatten it. The flattening, with Labels at the top level next to "_entry",
// comes from the custom MarshalJSON/UnmarshalJSON methods.
type Packed struct {
	Labels map[string]string `json:",inline"`
	Entry  string            `json:"_entry"`
}

Packed keeps track of the labels and log entry.

func (Packed) MarshalJSON

func (w Packed) MarshalJSON() ([]byte, error)

MarshalJSON encodes a Packed struct as JSON, flattening the Labels into the top level of the object alongside the _entry key.

func (*Packed) UnmarshalJSON

func (w *Packed) UnmarshalJSON(data []byte) error

UnmarshalJSON populates a Packed struct where every key except the _entry key is added to the Labels field

type Pipeline

// Pipeline passes a log entry down to each stage for mutation and/or label
// extraction. Its fields are unexported; build one with NewPipeline.
type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline passes a log entry down to each stage for mutation and/or label extraction.

func NewPipeline

func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer) (*Pipeline, error)

NewPipeline creates a new log entry pipeline from a configuration

func (*Pipeline) Cleanup added in v1.2.0

func (p *Pipeline) Cleanup()

Cleanup implements Stage.

func (*Pipeline) Name

func (p *Pipeline) Name() string

Name implements Stage

func (*Pipeline) Run

func (p *Pipeline) Run(in chan Entry) chan Entry

Run implements Stage

func (*Pipeline) Size

func (p *Pipeline) Size() int

Size gets the current number of stages in the pipeline

func (*Pipeline) Wrap

func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler

Wrap implements EntryMiddleware

type Processor

// Processor is handed an entry's labels, extracted values, timestamp and log
// line. Process returns nothing; the timestamp and line arrive as pointers
// and may be modified in place.
type Processor interface {
	Name() string
	Process(labels model.LabelSet, extracted map[string]any, time *time.Time, entry *string)
}

Processor takes an existing set of labels, extracted values, timestamp and log entry. Process returns nothing; instead it may mutate the timestamp and log entry in place through the pointers it receives.

type RegexConfig

// RegexConfig configures a processing stage that uses regular expressions to
// extract values from log lines into the shared values map. expression is
// required (ErrExpressionRequired); source is optional.
type RegexConfig struct {
	Expression string  `alloy:"expression,attr"`
	Source     *string `alloy:"source,attr,optional"`
}

RegexConfig configures a processing stage that uses regular expressions to extract values from log lines into the shared values map.

type ReplaceConfig

// ReplaceConfig contains the configuration for a replace stage. expression is
// required; source and replace are optional.
type ReplaceConfig struct {
	Expression string `alloy:"expression,attr"`
	Source     string `alloy:"source,attr,optional"`
	Replace    string `alloy:"replace,attr,optional"`
}

ReplaceConfig contains the configuration for a replace stage.

type SamplingConfig

// SamplingConfig contains the configuration for a sampling stage. rate is
// required and, per ErrSamplingStageInvalidRate, must be between 0.0 and 1.0.
type SamplingConfig struct {
	DropReason   string  `alloy:"drop_counter_reason,attr,optional"`
	SamplingRate float64 `alloy:"rate,attr"`
}

SamplingConfig contains the configuration for a samplingStage

func (*SamplingConfig) SetToDefault

func (s *SamplingConfig) SetToDefault()

func (*SamplingConfig) Validate

func (s *SamplingConfig) Validate() error

type Stage

// Stage is one step of a processing pipeline. It receives entries on an
// inbound channel and forwards possibly-mutated entries on the channel that
// Run returns.
type Stage interface {
	Run(in chan Entry) chan Entry
	Name() string
	Cleanup()
}

Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel.

func New

func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometheus.Registerer) (Stage, error)

New creates a new stage for the given type and configuration.

func NewCRI

func NewCRI(logger log.Logger, config CRIConfig, registerer prometheus.Registerer) (Stage, error)

NewCRI creates a predefined pipeline for parsing entries in the CRI log format.

func NewDocker

func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error)

NewDocker creates a predefined pipeline for parsing entries in the Docker json log format.

type StageConfig

// StageConfig defines a single stage in a processing pipeline. Every field is
// a pointer type so reflection can check that exactly one of them is set.
//
// Some Alloy block names differ from the StageType* constants: the
// label_keep and label_drop blocks hold LabelAllowConfig and LabelDropConfig,
// and structured_metadata reuses LabelsConfig.
type StageConfig struct {
	CRIConfig             *CRIConfig             `alloy:"cri,block,optional"`
	DecolorizeConfig      *DecolorizeConfig      `alloy:"decolorize,block,optional"`
	DockerConfig          *DockerConfig          `alloy:"docker,block,optional"`
	DropConfig            *DropConfig            `alloy:"drop,block,optional"`
	EventLogMessageConfig *EventLogMessageConfig `alloy:"eventlogmessage,block,optional"`
	GeoIPConfig           *GeoIPConfig           `alloy:"geoip,block,optional"`
	JSONConfig            *JSONConfig            `alloy:"json,block,optional"`
	LabelAllowConfig      *LabelAllowConfig      `alloy:"label_keep,block,optional"`
	LabelDropConfig       *LabelDropConfig       `alloy:"label_drop,block,optional"`
	LabelsConfig          *LabelsConfig          `alloy:"labels,block,optional"`
	LimitConfig           *LimitConfig           `alloy:"limit,block,optional"`
	LogfmtConfig          *LogfmtConfig          `alloy:"logfmt,block,optional"`
	LuhnFilterConfig      *LuhnFilterConfig      `alloy:"luhn,block,optional"`
	MatchConfig           *MatchConfig           `alloy:"match,block,optional"`
	MetricsConfig         *MetricsConfig         `alloy:"metrics,block,optional"`
	MultilineConfig       *MultilineConfig       `alloy:"multiline,block,optional"`
	OutputConfig          *OutputConfig          `alloy:"output,block,optional"`
	PackConfig            *PackConfig            `alloy:"pack,block,optional"`
	RegexConfig           *RegexConfig           `alloy:"regex,block,optional"`
	ReplaceConfig         *ReplaceConfig         `alloy:"replace,block,optional"`
	StaticLabelsConfig    *StaticLabelsConfig    `alloy:"static_labels,block,optional"`
	StructuredMetadata    *LabelsConfig          `alloy:"structured_metadata,block,optional"`
	SamplingConfig        *SamplingConfig        `alloy:"sampling,block,optional"`
	TemplateConfig        *TemplateConfig        `alloy:"template,block,optional"`
	TenantConfig          *TenantConfig          `alloy:"tenant,block,optional"`
	TimestampConfig       *TimestampConfig       `alloy:"timestamp,block,optional"`
}

StageConfig defines a single stage in a processing pipeline. We define these as pointer types so we can use reflection to check that exactly one is set.

type StaticLabelsConfig

// StaticLabelsConfig contains a map of static labels to be set. values is
// required; an empty config is rejected with ErrEmptyStaticLabelStageConfig.
type StaticLabelsConfig struct {
	Values map[string]*string `alloy:"values,attr"`
}

StaticLabelsConfig contains a map of static labels to be set.

type TemplateConfig

// TemplateConfig configures template value extraction. Both source and
// template are required attributes (ErrTemplateSourceRequired covers source).
type TemplateConfig struct {
	Source   string `alloy:"source,attr"`
	Template string `alloy:"template,attr"`
}

TemplateConfig configures template value extraction.

type TenantConfig

// TenantConfig configures a tenant stage. Each attribute is optional on its
// own, but one of label, source or value is required, and they are mutually
// exclusive (ErrTenantStageEmptyLabelSourceOrValue,
// ErrTenantStageConflictingLabelSourceAndValue).
type TenantConfig struct {
	Label  string `alloy:"label,attr,optional"`
	Source string `alloy:"source,attr,optional"`
	Value  string `alloy:"value,attr,optional"`
}

TenantConfig configures a tenant stage.

type TimestampConfig

// TimestampConfig configures a processing stage for timestamp extraction.
// source and format are required (ErrTimestampSourceRequired,
// ErrTimestampFormatRequired); the remaining attributes are optional.
type TimestampConfig struct {
	Source          string   `alloy:"source,attr"`
	// NOTE(review): named formats such as RFC3339Nano, Unix, UnixMs, UnixUs
	// and UnixNs are declared in this package; presumably accepted here
	// alongside Go layouts — confirm.
	Format          string   `alloy:"format,attr"`
	FallbackFormats []string `alloy:"fallback_formats,attr,optional"`
	// An unusable location is reported with ErrInvalidLocation.
	Location        *string  `alloy:"location,attr,optional"`
	// "skip" or "fudge"; TimestampActionOnFailureDefault aliases "fudge".
	ActionOnFailure string   `alloy:"action_on_failure,attr,optional"`
}

TimestampConfig configures a processing stage for timestamp extraction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL