stages

package
v0.31.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 26, 2023 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Messages for JSON stage configuration and parsing problems. These are
// untyped string constants, not error values.
const (
	ErrExpressionsRequired  = "JMES expression is required"
	ErrCouldNotCompileJMES  = "could not compile JMES expression"
	ErrEmptyJSONStageConfig = "empty json stage configuration"
	ErrEmptyJSONStageSource = "empty source"
	ErrMalformedJSON        = "malformed json"
)

Config Errors

View Source
// Messages for label stage configuration problems. These are untyped string
// constants; ErrInvalidLabelName is a format string with a single %s verb.
const (
	ErrEmptyLabelStageConfig = "label stage config cannot be empty"
	ErrInvalidLabelName      = "invalid label name: %s"
)
View Source
// String identifiers for each stage type.
//
// NOTE(review): in this version StageConfig has no block for the metrics,
// template, pipeline, tenant, limit or pack types listed here — presumably
// those stages are not ported yet; confirm before relying on them.
const (
	StageTypeJSON         = "json"
	StageTypeLogfmt       = "logfmt"
	StageTypeRegex        = "regex"
	StageTypeReplace      = "replace"
	StageTypeMetric       = "metrics"
	StageTypeLabel        = "labels"
	StageTypeLabelDrop    = "labeldrop"
	StageTypeTimestamp    = "timestamp"
	StageTypeOutput       = "output"
	StageTypeDocker       = "docker"
	StageTypeCRI          = "cri"
	StageTypeMatch        = "match"
	StageTypeTemplate     = "template"
	StageTypePipeline     = "pipeline"
	StageTypeTenant       = "tenant"
	StageTypeDrop         = "drop"
	StageTypeLimit        = "limit"
	StageTypeMultiline    = "multiline"
	StageTypePack         = "pack"
	StageTypeLabelAllow   = "labelallow"
	StageTypeStaticLabels = "static_labels"
)

TODO(@tpaschalis) Let's use this as the list of stages we need to port over.

View Source
const (
	// ErrTimestampContainsYear is a format string (one %s verb) for a
	// timestamp that unexpectedly contains a year component.
	ErrTimestampContainsYear = "timestamp '%s' is expected to not contain the year date component"
)
View Source
const MaxPartialLinesSize = 100

MaxPartialLinesSize is the max buffer size to hold partial lines when parsing the CRI stage format.

Variables

View Source
var (
	ErrDropStageEmptyConfig   = errors.New("drop stage config must contain at least one of `source`, `expression`, `older_than` or `longer_than`")
	ErrDropStageInvalidConfig = errors.New("drop stage config error, `value` and `expression` cannot both be defined at the same time")
	ErrDropStageInvalidRegex  = errors.New("drop stage regex compilation error")
)

Configuration errors.

View Source
var (
	ErrMappingRequired        = errors.New("logfmt mapping is required")
	ErrEmptyLogfmtStageConfig = errors.New("empty logfmt stage configuration")
	ErrEmptyLogfmtStageSource = errors.New("empty source")
)

Config Errors

View Source
var (
	// Error values describing invalid match stage configurations.
	ErrEmptyMatchStageConfig = errors.New("match stage config cannot be empty")
	ErrPipelineNameRequired  = errors.New("match stage pipeline name can be omitted but cannot be an empty string")
	ErrSelectorRequired      = errors.New("selector statement required for match stage")
	ErrMatchRequiresStages   = errors.New("match stage requires at least one additional stage to be defined in '- stages'")
	ErrSelectorSyntax        = errors.New("invalid selector syntax for match stage")
	ErrStagesWithDropLine    = errors.New("match stage configured to drop entries cannot contains stages")
	ErrUnknownMatchAction    = errors.New("match stage action should be 'keep' or 'drop'")

	// The two action values named in ErrUnknownMatchAction's message.
	MatchActionKeep = "keep"
	MatchActionDrop = "drop"
)

Configuration errors.

View Source
var (
	ErrMultilineStageEmptyConfig        = errors.New("multiline stage config must define `firstline` regular expression")
	ErrMultilineStageInvalidRegex       = errors.New("multiline stage first line regex compilation error")
	ErrMultilineStageInvalidMaxWaitTime = errors.New("multiline stage `max_wait_time` parse error")
)

Configuration errors.

View Source
var (
	ErrEmptyOutputStageConfig = errors.New("output stage config cannot be empty")
	ErrOutputSourceRequired   = errors.New("output source value is required if output is specified")
)

Config Errors.

View Source
var (
	ErrExpressionRequired    = errors.New("expression is required")
	ErrCouldNotCompileRegex  = errors.New("could not compile regular expression")
	ErrEmptyRegexStageSource = errors.New("empty source")
)

Config Errors.

View Source
var (
	ErrEmptyReplaceStageConfig = errors.New("empty replace stage configuration")
	ErrEmptyReplaceStageSource = errors.New("empty source in replace stage")
)

Config Errors

View Source
var (
	// Error values for the timestamp stage.
	//
	// NOTE(review): ErrInvalidLocation and ErrInvalidActionOnFailure carry %v
	// verbs, but errors.New does not format its argument; presumably callers
	// format the message text themselves — confirm at the use sites.
	ErrEmptyTimestampStageConfig = errors.New("timestamp stage config cannot be empty")
	ErrTimestampSourceRequired   = errors.New("timestamp source value is required if timestamp is specified")
	ErrTimestampFormatRequired   = errors.New("timestamp format is required")
	ErrInvalidLocation           = errors.New("invalid location specified: %v")
	ErrInvalidActionOnFailure    = errors.New("invalid action on failure (supported values are %v)")
	ErrTimestampSourceMissing    = errors.New("extracted data did not contain a timestamp")
	ErrTimestampConversionFailed = errors.New("failed to convert extracted time to string")
	ErrTimestampParsingFailed    = errors.New("failed to parse time")

	// Presumably names of Unix-epoch timestamp formats accepted by the
	// `format` attribute at different precisions — TODO confirm.
	Unix   = "Unix"
	UnixMs = "UnixMs"
	UnixUs = "UnixUs"
	UnixNs = "UnixNs"

	// Values for the `action_on_failure` attribute; the default is "fudge".
	TimestampActionOnFailureSkip    = "skip"
	TimestampActionOnFailureFudge   = "fudge"
	TimestampActionOnFailureDefault = TimestampActionOnFailureFudge
)

Config errors.

View Source
var (
	// Debug is used to wrap debug log statements, the go-kit logger won't let us introspect the current log level
	// so this global is used for that purpose. This allows us to skip allocations of log messages at the
	// debug level when debug level logging is not enabled. Log level allocations can become very expensive
	// as we log numerous log entries per log line at debug level.
	Debug = false

	// Inspect is used to debug promtail pipelines by showing diffs between pipeline stages
	Inspect = false
)
View Source
// DefaultMultilineConfig is a MultilineConfig with MaxLines set to 128 and
// MaxWaitTime set to 3 seconds.
var DefaultMultilineConfig = MultilineConfig{
	MaxLines:    128,
	MaxWaitTime: 3 * time.Second,
}

DefaultMultilineConfig holds the default values for a MultilineConfig (MaxLines of 128 and MaxWaitTime of 3 seconds).

View Source
var ErrEmptyLabelAllowStageConfig = errors.New("labelallow stage config cannot be empty")

ErrEmptyLabelAllowStageConfig error is returned if the config is empty.

View Source
var ErrEmptyLabelDropStageConfig = errors.New("labeldrop stage config cannot be empty")

ErrEmptyLabelDropStageConfig error returned if the config is empty.

View Source
var ErrEmptyStaticLabelStageConfig = errors.New("static_labels stage config cannot be empty")

ErrEmptyStaticLabelStageConfig error returned if the config is empty.

TimestampActionOnFailureOptions defines the available options for the `action_on_failure` field.

Functions

func RunWith

func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry

RunWith reads entries from the input channel, mutates them with the process function, and returns them via the output channel.

func RunWithSkip

func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry

RunWithSkip is the same as RunWith, except that it skips sending an entry to the output channel if the `process` function returns `skip` as true.

func SetReadLineRateLimiter

func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool)

Types

type CRIConfig added in v0.31.0

// CRIConfig is an empty struct; its presence enables a pre-defined pipeline
// for decoding entries in the CRI logging format.
type CRIConfig struct{}

CRIConfig is an empty struct that is used to enable a pre-defined pipeline for decoding entries that are using the CRI logging format.

type DockerConfig added in v0.31.0

// DockerConfig is an empty struct; its presence enables a pre-defined
// pipeline for decoding entries in the Docker logs format.
type DockerConfig struct{}

DockerConfig is an empty struct that is used to enable a pre-defined pipeline for decoding entries that are using the Docker logs format.

type DropConfig added in v0.31.0

// DropConfig contains the configuration for a drop stage.
//
// Every attribute is optional in the river schema. Per the message of
// ErrDropStageEmptyConfig, at least one of `source`, `expression`,
// `older_than` or `longer_than` must be set. Per ErrDropStageInvalidConfig,
// `value` and `expression` cannot both be set.
type DropConfig struct {
	DropReason string           `river:"drop_counter_reason,attr,optional"`
	Source     string           `river:"source,attr,optional"`
	Value      string           `river:"value,attr,optional"`
	Expression string           `river:"expression,attr,optional"`
	OlderThan  time.Duration    `river:"older_than,attr,optional"`
	LongerThan units.Base2Bytes `river:"longer_than,attr,optional"`
	// contains filtered or unexported fields
}

DropConfig contains the configuration for a dropStage

type Entry

// Entry is the value sent over the channels that connect stages. It embeds a
// loki.Entry and adds a free-form map of extracted values.
type Entry struct {
	// Extracted is keyed by name; values may be of any type.
	Extracted map[string]interface{}
	loki.Entry
}

type JSONConfig

// JSONConfig represents a JSON stage configuration.
//
// `expressions` is the only attribute not marked optional; `source` and
// `drop_malformed` are optional.
type JSONConfig struct {
	// Expressions maps a name to an expression string; the related message
	// ErrExpressionsRequired calls these JMES expressions.
	Expressions   map[string]string `river:"expressions,attr"`
	Source        *string           `river:"source,attr,optional"`
	DropMalformed bool              `river:"drop_malformed,attr,optional"`
}

JSONConfig represents a JSON Stage configuration

type LabelAllowConfig added in v0.31.0

// LabelAllowConfig contains the slice of labels to allow through, given in
// the `values` attribute.
type LabelAllowConfig struct {
	Values []string `river:"values,attr"`
}

LabelAllowConfig contains the slice of labels to allow through.

type LabelDropConfig added in v0.31.0

// LabelDropConfig contains the slice of labels to be dropped, given in the
// `values` attribute.
type LabelDropConfig struct {
	Values []string `river:"values,attr"`
}

LabelDropConfig contains the slice of labels to be dropped.

type LabelsConfig

// LabelsConfig is a set of labels to be extracted, given in the `values`
// attribute.
type LabelsConfig struct {
	// Values maps a name to a string pointer, so an entry may be nil.
	// NOTE(review): presumably key = label name and value = source key in
	// the extracted map, with nil meaning "same as the label name" — confirm.
	Values map[string]*string `river:"values,attr"`
}

LabelsConfig is a set of labels to be extracted

type LogfmtConfig added in v0.31.0

// LogfmtConfig represents a logfmt stage configuration.
//
// `mapping` is not marked optional (see also ErrMappingRequired); `source`
// is optional.
type LogfmtConfig struct {
	Mapping map[string]string `river:"mapping,attr"`
	Source  string            `river:"source,attr,optional"`
}

LogfmtConfig represents a logfmt Stage configuration

type MatchConfig added in v0.31.0

// MatchConfig contains the configuration for a match stage.
//
// Constraints, as stated by the match stage error messages:
//   - `selector` is required (ErrSelectorRequired).
//   - `action` should be 'keep' or 'drop' (ErrUnknownMatchAction).
//   - `pipeline_name` can be omitted but cannot be an empty string
//     (ErrPipelineNameRequired).
//   - a match stage configured to drop entries cannot contain nested stages
//     (ErrStagesWithDropLine).
type MatchConfig struct {
	Selector     string        `river:"selector,attr"`
	Stages       []StageConfig `river:"stage,block,optional"`
	Action       string        `river:"action,attr,optional"`
	PipelineName string        `river:"pipeline_name,attr,optional"`
	DropReason   string        `river:"drop_counter_reason,attr,optional"`
}

MatchConfig contains the configuration for a matcherStage

type MultilineConfig added in v0.31.0

// MultilineConfig contains the configuration for a multiline stage.
//
// `firstline` must be defined as a regular expression
// (ErrMultilineStageEmptyConfig). DefaultMultilineConfig sets MaxLines to 128
// and MaxWaitTime to 3 seconds.
type MultilineConfig struct {
	Expression  string        `river:"firstline,attr"`
	MaxLines    uint64        `river:"max_lines,attr,optional"`
	MaxWaitTime time.Duration `river:"max_wait_time,attr,optional"`
	// contains filtered or unexported fields
}

MultilineConfig contains the configuration for a Multiline stage.

func (*MultilineConfig) UnmarshalRiver added in v0.31.0

func (args *MultilineConfig) UnmarshalRiver(f func(interface{}) error) error

UnmarshalRiver implements river.Unmarshaler, applying defaults and validating the provided config.

type OutputConfig added in v0.31.0

// OutputConfig configures a stage that sets the log line to a value from the
// extracted map. `source` is required (ErrOutputSourceRequired).
type OutputConfig struct {
	Source string `river:"source,attr"`
}

OutputConfig initializes a configuration stage which sets the log line to a value from the extracted map.

type Pipeline

// Pipeline passes a log entry down to each of its stages for mutation and/or
// label extraction. Its Name and Run methods implement Stage, and Wrap
// implements EntryMiddleware.
type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline passes a log entry down to each stage for mutation and/or label extraction.

func NewPipeline

func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer) (*Pipeline, error)

NewPipeline creates a new log entry pipeline from a configuration

func (*Pipeline) Name

func (p *Pipeline) Name() string

Name implements Stage

func (*Pipeline) Run

func (p *Pipeline) Run(in chan Entry) chan Entry

Run implements Stage

func (*Pipeline) Size

func (p *Pipeline) Size() int

Size gets the current number of stages in the pipeline

func (*Pipeline) Wrap

func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler

Wrap implements EntryMiddleware

type Processor

// Processor handles a single log entry.
type Processor interface {
	// Process receives the label set, the extracted values map, and pointers
	// to the timestamp and the log line. It has no return value; the pointer
	// parameters let an implementation modify the timestamp and line in place.
	Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string)
	// Name returns the processor's name.
	Name() string
}

Processor takes an existing set of labels, the extracted values map, a timestamp and a log entry; the timestamp and log entry are passed as pointers so that the processor can mutate them in place.

type RegexConfig added in v0.31.0

// RegexConfig configures a processing stage that uses regular expressions to
// extract values from log lines into the shared values map. `expression` is
// required (ErrExpressionRequired); `source` is optional.
type RegexConfig struct {
	Expression string  `river:"expression,attr"`
	Source     *string `river:"source,attr,optional"`
}

RegexConfig configures a processing stage that uses regular expressions to extract values from log lines into the shared values map.

type ReplaceConfig added in v0.31.0

// ReplaceConfig contains the configuration for a replace stage. `expression`
// is the only attribute not marked optional; `source` and `replace` are
// optional.
type ReplaceConfig struct {
	Expression string `river:"expression,attr"`
	Source     string `river:"source,attr,optional"`
	Replace    string `river:"replace,attr,optional"`
}

ReplaceConfig contains the configuration for a replace stage.

type Stage

// Stage receives entries via an inbound channel and forwards (possibly
// mutated) entries to an outbound channel.
type Stage interface {
	// Name returns the stage's name.
	Name() string
	// Run takes the inbound channel and returns the outbound channel.
	Run(chan Entry) chan Entry
}

Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel.

func New

func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometheus.Registerer) (Stage, error)

New creates a new stage for the given type and configuration.

func NewCRI added in v0.31.0

func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)

NewCRI creates a predefined pipeline for parsing entries in the CRI log format.

func NewDocker added in v0.31.0

func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error)

NewDocker creates a predefined pipeline for parsing entries in the Docker json log format.

type StageConfig

// StageConfig defines a single stage in a processing pipeline. Each field is
// an optional block held as a pointer, so that reflection can be used to
// check that exactly one of them is set.
//
// NOTE(review): the river block names `label_keep` and `label_drop` differ
// from the StageTypeLabelAllow ("labelallow") and StageTypeLabelDrop
// ("labeldrop") constants — presumably intentional for the river syntax;
// confirm.
type StageConfig struct {
	// TODO (@tpaschalis) The fact that this type belongs to an internal package
	// _and_ is part of the loki.process component Arguments, means that an
	// external caller will not be able to construct the component Arguments by
	// hand. This will be fixed once we've gained confidence in the ported
	// processing stages and the package is made non-internal.
	JSONConfig         *JSONConfig         `river:"json,block,optional"`
	LogfmtConfig       *LogfmtConfig       `river:"logfmt,block,optional"`
	LabelsConfig       *LabelsConfig       `river:"labels,block,optional"`
	LabelAllowConfig   *LabelAllowConfig   `river:"label_keep,block,optional"`
	LabelDropConfig    *LabelDropConfig    `river:"label_drop,block,optional"`
	StaticLabelsConfig *StaticLabelsConfig `river:"static_labels,block,optional"`
	DockerConfig       *DockerConfig       `river:"docker,block,optional"`
	CRIConfig          *CRIConfig          `river:"cri,block,optional"`
	RegexConfig        *RegexConfig        `river:"regex,block,optional"`
	TimestampConfig    *TimestampConfig    `river:"timestamp,block,optional"`
	OutputConfig       *OutputConfig       `river:"output,block,optional"`
	ReplaceConfig      *ReplaceConfig      `river:"replace,block,optional"`
	MultilineConfig    *MultilineConfig    `river:"multiline,block,optional"`
	MatchConfig        *MatchConfig        `river:"match,block,optional"`
	DropConfig         *DropConfig         `river:"drop,block,optional"`
}

StageConfig defines a single stage in a processing pipeline. We define these as pointer types so we can use reflection to check that exactly one is set.

func (*StageConfig) UnmarshalRiver

func (arg *StageConfig) UnmarshalRiver(f func(interface{}) error) error

UnmarshalRiver implements river.Unmarshaler.

type StaticLabelsConfig added in v0.31.0

// StaticLabelsConfig contains a map of static labels to be set, given in the
// `values` attribute.
type StaticLabelsConfig struct {
	// Values maps a name to a string pointer, so an entry may be nil.
	Values map[string]*string `river:"values,attr"`
}

StaticLabelsConfig contains a map of static labels to be set.

type TimestampConfig added in v0.31.0

// TimestampConfig configures a processing stage for timestamp extraction.
//
// `source` and `format` are required (ErrTimestampSourceRequired,
// ErrTimestampFormatRequired); the remaining attributes are optional.
type TimestampConfig struct {
	Source          string   `river:"source,attr"`
	Format          string   `river:"format,attr"`
	FallbackFormats []string `river:"fallback_formats,attr,optional"`
	Location        *string  `river:"location,attr,optional"`
	// ActionOnFailure takes one of the TimestampActionOnFailure* values
	// ("skip" or "fudge"); TimestampActionOnFailureDefault is "fudge".
	ActionOnFailure string   `river:"action_on_failure,attr,optional"`
}

TimestampConfig configures a processing stage for timestamp extraction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL