stages

package
v0.30.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrExpressionsRequired  = "JMES expression is required"
	ErrCouldNotCompileJMES  = "could not compile JMES expression"
	ErrEmptyJSONStageConfig = "empty json stage configuration"
	ErrEmptyJSONStageSource = "empty source"
	ErrMalformedJSON        = "malformed json"
)

Config Errors

View Source
const (
	ErrEmptyLabelStageConfig = "label stage config cannot be empty"
	ErrInvalidLabelName      = "invalid label name: %s"
)
View Source
const (
	StageTypeJSON         = "json"
	StageTypeLogfmt       = "logfmt"
	StageTypeRegex        = "regex"
	StageTypeReplace      = "replace"
	StageTypeMetric       = "metrics"
	StageTypeLabel        = "labels"
	StageTypeLabelDrop    = "labeldrop"
	StageTypeTimestamp    = "timestamp"
	StageTypeOutput       = "output"
	StageTypeDocker       = "docker"
	StageTypeCRI          = "cri"
	StageTypeMatch        = "match"
	StageTypeTemplate     = "template"
	StageTypePipeline     = "pipeline"
	StageTypeTenant       = "tenant"
	StageTypeDrop         = "drop"
	StageTypeLimit        = "limit"
	StageTypeMultiline    = "multiline"
	StageTypePack         = "pack"
	StageTypeLabelAllow   = "labelallow"
	StageTypeStaticLabels = "static_labels"
)

TODO(@tpaschalis) Let's use this as the list of stages we need to port over.

Variables

View Source
var (
	// Debug is used to wrap debug log statements, the go-kit logger won't let us introspect the current log level
	// so this global is used for that purpose. This allows us to skip allocations of log messages at the
	// debug level when debug level logging is not enabled. Log level allocations can become very expensive
	// as we log numerous log entries per log line at debug level.
	Debug = false

	// Inspect is used to debug promtail pipelines by showing diffs between pipeline stages
	Inspect = false
)

Functions

func RunWith

func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry

RunWith will reads from the input channel entries, mutate them with the process function and returns them via the output channel.

func RunWithSkip

func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry

RunWithSkip same as RunWith, except it skip sending it to output channel, if `process` functions returns `skip` true.

func SetReadLineRateLimiter

func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool)

Types

type Entry

type Entry struct {
	Extracted map[string]interface{}
	loki.Entry
}

type JSONConfig

type JSONConfig struct {
	Expressions   map[string]string `river:"expressions,attr"`
	Source        *string           `river:"source,attr,optional"`
	DropMalformed bool              `river:"drop_malformed,attr,optional"`
}

JSONConfig represents a JSON Stage configuration

type LabelsConfig

type LabelsConfig struct {
	Values map[string]*string `river:"values,attr"`
}

LabelsConfig is a set of labels to be extracted

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline pass down a log entry to each stage for mutation and/or label extraction.

func NewPipeline

func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer) (*Pipeline, error)

NewPipeline creates a new log entry pipeline from a configuration

func (*Pipeline) Name

func (p *Pipeline) Name() string

Name implements Stage

func (*Pipeline) Run

func (p *Pipeline) Run(in chan Entry) chan Entry

Run implements Stage

func (*Pipeline) Size

func (p *Pipeline) Size() int

Size gets the current number of stages in the pipeline

func (*Pipeline) Wrap

func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler

Wrap implements EntryMiddleware

type Processor

type Processor interface {
	Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string)
	Name() string
}

Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated timestamp and log entry

type Stage

type Stage interface {
	Name() string
	Run(chan Entry) chan Entry
}

Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel.

func New

func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometheus.Registerer) (Stage, error)

New creates a new stage for the given type and configuration.

type StageConfig

type StageConfig struct {
	JSONConfig   *JSONConfig   `river:"json,block,optional"`
	LabelsConfig *LabelsConfig `river:"labels,block,optional"`
}

StageConfig defines a single stage in a processing pipeline. We define these as pointers types so we can use reflection to check that exactly one is set.

func (*StageConfig) UnmarshalRiver

func (arg *StageConfig) UnmarshalRiver(f func(interface{}) error) error

UnmarshalRiver implements river.Unmarshaler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL