stages

package
v1.6.2-0...-b66c343 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: AGPL-3.0 Imports: 48 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrDropStageEmptyConfig       = "drop stage config must contain at least one of `source`, `expression`, `older_than` or `longer_than`"
	ErrDropStageInvalidDuration   = "drop stage invalid duration, %v cannot be converted to a duration: %v"
	ErrDropStageInvalidConfig     = "drop stage config error, `value` and `expression` cannot both be defined at the same time."
	ErrDropStageInvalidRegex      = "drop stage regex compilation error: %v"
	ErrDropStageInvalidByteSize   = "drop stage failed to parse longer_than to bytes: %v"
	ErrDropStageInvalidSource     = "drop stage source invalid type should be string or list of strings"
	ErrDropStageNoSourceWithValue = "drop stage config must contain `source` if `value` is specified"
)
View Source
const (
	RFC3339Nano         = "RFC3339Nano"
	MaxPartialLinesSize = 100 // Max buffer size to hold partial lines.
)
View Source
const (
	ErrEmptyGeoIPStageConfig       = "geoip stage config cannot be empty"
	ErrEmptyDBPathGeoIPStageConfig = "db path cannot be empty"
	ErrEmptySourceGeoIPStageConfig = "source cannot be empty"
	ErrEmptyDBTypeGeoIPStageConfig = "db type should be either city or asn"
)
View Source
const (
	ErrExpressionsRequired  = "JMES expression is required"
	ErrCouldNotCompileJMES  = "could not compile JMES expression"
	ErrEmptyJSONStageConfig = "empty json stage configuration"
	ErrEmptyJSONStageSource = "empty source"
	ErrMalformedJSON        = "malformed json"
)

Config Errors

View Source
const (
	ErrEmptyLabelStageConfig = "label stage config cannot be empty"
	ErrInvalidLabelName      = "invalid label name: %s"
)
View Source
const (
	ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst"
	ErrLimitStageByLabelMustDrop    = "When ratelimiting by label, drop must be true"
	MinReasonableMaxDistinctLabels  = 10000 // 80bytes per rate.Limiter ~ 1MiB memory
)
View Source
const (
	ErrMappingRequired        = "logfmt mapping is required"
	ErrEmptyLogfmtStageConfig = "empty logfmt stage configuration"
	ErrEmptyLogfmtStageSource = "empty source"
)

Config Errors

View Source
const (
	ErrEmptyMatchStageConfig = "match stage config cannot be empty"
	ErrPipelineNameRequired  = "match stage pipeline name can be omitted but cannot be an empty string"
	ErrSelectorRequired      = "selector statement required for match stage"
	ErrMatchRequiresStages   = "match stage requires at least one additional stage to be defined in '- stages'"
	ErrSelectorSyntax        = "invalid selector syntax for match stage"
	ErrStagesWithDropLine    = "match stage configured to drop entries cannot contains stages"
	ErrUnknownMatchAction    = "match stage action should be 'keep' or 'drop'"
	MatchActionKeep          = "keep"
	MatchActionDrop          = "drop"
)
View Source
const (
	MetricTypeCounter   = "counter"
	MetricTypeGauge     = "gauge"
	MetricTypeHistogram = "histogram"

	ErrEmptyMetricsStageConfig = "empty metric stage configuration"
	ErrMetricsStageInvalidType = "invalid metric type '%s', metric type must be one of 'counter', 'gauge', or 'histogram'"
	ErrInvalidIdleDur          = "max_idle_duration could not be parsed as a time.Duration: '%s'"
	ErrSubSecIdleDur           = "max_idle_duration less than 1s not allowed"
)
View Source
const (
	ErrMultilineStageEmptyConfig        = "multiline stage config must define `firstline` regular expression"
	ErrMultilineStageInvalidRegex       = "multiline stage first line regex compilation error: %v"
	ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v"
)
View Source
const (
	ErrEmptyOutputStageConfig = "output stage config cannot be empty"
	ErrOutputSourceRequired   = "output source value is required if output is specified"
)

Config Errors

View Source
const (
	ErrExpressionRequired    = "expression is required"
	ErrCouldNotCompileRegex  = "could not compile regular expression"
	ErrEmptyRegexStageConfig = "empty regex stage configuration"
	ErrEmptyRegexStageSource = "empty source"
)

Config Errors

View Source
const (
	ErrEmptyReplaceStageConfig = "empty replace stage configuration"
	ErrEmptyReplaceStageSource = "empty source in replace stage"
)

Config Errors

View Source
const (
	StageTypeJSON            = "json"
	StageTypeLogfmt          = "logfmt"
	StageTypeRegex           = "regex"
	StageTypeReplace         = "replace"
	StageTypeMetric          = "metrics"
	StageTypeLabel           = "labels"
	StageTypeLabelDrop       = "labeldrop"
	StageTypeTimestamp       = "timestamp"
	StageTypeOutput          = "output"
	StageTypeDocker          = "docker"
	StageTypeCRI             = "cri"
	StageTypeMatch           = "match"
	StageTypeTemplate        = "template"
	StageTypePipeline        = "pipeline"
	StageTypeTenant          = "tenant"
	StageTypeDrop            = "drop"
	StageTypeSampling        = "sampling"
	StageTypeLimit           = "limit"
	StageTypeMultiline       = "multiline"
	StageTypePack            = "pack"
	StageTypeLabelAllow      = "labelallow"
	StageTypeStaticLabels    = "static_labels"
	StageTypeDecolorize      = "decolorize"
	StageTypeEventLogMessage = "eventlogmessage"
	StageTypeGeoIP           = "geoip"
	// Deprecated. Renamed to `structured_metadata`. Will be removed after the migration.
	StageTypeNonIndexedLabels   = "non_indexed_labels"
	StageTypeStructuredMetadata = "structured_metadata"
)
View Source
const (
	ErrEmptyTemplateStageConfig = "template stage config cannot be empty"
	ErrTemplateSourceRequired   = "template source value is required"
)

Config Errors

View Source
const (
	ErrTenantStageEmptyLabelSourceOrValue        = "label, source or value config are required"
	ErrTenantStageConflictingLabelSourceAndValue = "label, source and value are mutually exclusive: you should set source, value or label but not all"
)
View Source
const (
	ErrEmptyTimestampStageConfig = "timestamp stage config cannot be empty"
	ErrTimestampSourceRequired   = "timestamp source value is required if timestamp is specified"
	ErrTimestampFormatRequired   = "timestamp format is required"
	ErrInvalidLocation           = "invalid location specified: %v"
	ErrInvalidActionOnFailure    = "invalid action on failure (supported values are %v)"
	ErrTimestampSourceMissing    = "extracted data did not contain a timestamp"
	ErrTimestampConversionFailed = "failed to convert extracted time to string"
	ErrTimestampParsingFailed    = "failed to parse time"

	Unix   = "Unix"
	UnixMs = "UnixMs"
	UnixUs = "UnixUs"
	UnixNs = "UnixNs"

	TimestampActionOnFailureSkip    = "skip"
	TimestampActionOnFailureFudge   = "fudge"
	TimestampActionOnFailureDefault = TimestampActionOnFailureFudge
)
View Source
const (
	ErrEmptyEvtLogMsgStageConfig = "empty event log message stage configuration"
)
View Source
const (
	// ErrEmptyLabelAllowStageConfig error returned if config is empty
	ErrEmptyLabelAllowStageConfig = "labelallow stage config cannot be empty"
)
View Source
const (
	// ErrEmptyLabelDropStageConfig error returned if config is empty
	ErrEmptyLabelDropStageConfig = "labeldrop stage config cannot be empty"
)
View Source
const (
	// ErrEmptyStaticLabelStageConfig error returned if config is empty
	ErrEmptyStaticLabelStageConfig = "static_labels stage config cannot be empty"
)
View Source
const (
	ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"
)
View Source
const (
	ErrTimestampContainsYear = "timestamp '%s' is expected to not contain the year date component"
)

Variables

View Source
var (
	// Debug is used to wrap debug log statements, the go-kit logger won't let us introspect the current log level
	// so this global is used for that purpose. This allows us to skip allocations of log messages at the
	// debug level when debug level logging is not enabled. Log level allocations can become very expensive
	// as we log numerous log entries per log line at debug level.
	Debug = false

	// Inspect is used to debug promtail pipelines by showing diffs between pipeline stages
	Inspect = false
)
View Source
var (
	TimestampActionOnFailureOptions = []string{TimestampActionOnFailureSkip, TimestampActionOnFailureFudge}
)

Functions

func RunWith

func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry

RunWith will reads from the input channel entries, mutate them with the process function and returns them via the output channel.

func RunWithSkip

func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry

RunWithSkip same as RunWith, except it skip sending it to output channel, if `process` functions returns `skip` true.

func RunWithSkipOrSendMany

func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry

RunWithSkiporSendMany same as RunWithSkip, except it can either skip sending it to output channel, if `process` functions returns `skip` true. Or send many entries.

func SanitizeFullLabelName

func SanitizeFullLabelName(input string) string

Sanitize a input string to convert it into a valid prometheus label TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName

func SetReadLineRateLimiter

func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool)

Types

type CriConfig

type CriConfig struct {
	MaxPartialLines            int              `mapstructure:"max_partial_lines"`
	MaxPartialLineSize         flagext.ByteSize `mapstructure:"max_partial_line_size"`
	MaxPartialLineSizeTruncate bool             `mapstructure:"max_partial_line_size_truncate"`
}

CriConfig contains the configuration for the cri stage

type DropConfig

type DropConfig struct {
	DropReason *string     `mapstructure:"drop_counter_reason"`
	Source     interface{} `mapstructure:"source"`

	Value      *string `mapstructure:"value"`
	Separator  *string `mapstructure:"separator"`
	Expression *string `mapstructure:"expression"`

	OlderThan *string `mapstructure:"older_than"`

	LongerThan *string `mapstructure:"longer_than"`
	// contains filtered or unexported fields
}

DropConfig contains the configuration for a dropStage

type Entry

type Entry struct {
	Extracted map[string]interface{}
	api.Entry
}

type EventLogMessageConfig

type EventLogMessageConfig struct {
	Source            *string `mapstructure:"source"`
	DropInvalidLabels bool    `mapstructure:"drop_invalid_labels"`
	OverwriteExisting bool    `mapstructure:"overwrite_existing"`
}

type GeoIPConfig

type GeoIPConfig struct {
	DB     string  `mapstructure:"db"`
	Source *string `mapstructure:"source"`
	DBType string  `mapstructure:"db_type"`
}

GeoIPConfig represents GeoIP stage config

type GeoIPFields

type GeoIPFields int
const (
	CITYNAME GeoIPFields = iota
	COUNTRYNAME
	CONTINENTNAME
	CONTINENTCODE
	LOCATION
	POSTALCODE
	TIMEZONE
	SUBDIVISIONNAME
	SUBDIVISIONCODE
)

type JSONConfig

type JSONConfig struct {
	Expressions   map[string]string `mapstructure:"expressions"`
	Source        *string           `mapstructure:"source"`
	DropMalformed bool              `mapstructure:"drop_malformed"`
}

JSONConfig represents a JSON Stage configuration

type LabelAllowConfig

type LabelAllowConfig []string

labelallowConfig is a slice of labels to be included

type LabelDropConfig

type LabelDropConfig []string

LabelDropConfig is a slice of labels to be dropped

type LabelsConfig

type LabelsConfig map[string]*string

LabelsConfig is a set of labels to be extracted

type LimitConfig

type LimitConfig struct {
	Rate              float64 `mapstructure:"rate"`
	Burst             int     `mapstructure:"burst"`
	Drop              bool    `mapstructure:"drop"`
	ByLabelName       string  `mapstructure:"by_label_name"`
	MaxDistinctLabels int     `mapstructure:"max_distinct_labels"`
}

type LogfmtConfig

type LogfmtConfig struct {
	Mapping map[string]string `mapstructure:"mapping"`
	Source  *string           `mapstructure:"source"`
}

LogfmtConfig represents a logfmt Stage configuration

type MatcherConfig

type MatcherConfig struct {
	PipelineName *string        `mapstructure:"pipeline_name"`
	Selector     string         `mapstructure:"selector"`
	Stages       PipelineStages `mapstructure:"stages"`
	Action       string         `mapstructure:"action"`
	DropReason   *string        `mapstructure:"drop_counter_reason"`
}

MatcherConfig contains the configuration for a matcherStage

type MetricConfig

type MetricConfig struct {
	MetricType   string  `mapstructure:"type"`
	Description  string  `mapstructure:"description"`
	Source       *string `mapstructure:"source"`
	Prefix       string  `mapstructure:"prefix"`
	IdleDuration *string `mapstructure:"max_idle_duration"`

	Config interface{} `mapstructure:"config"`
	// contains filtered or unexported fields
}

MetricConfig is a single metrics configuration.

type MetricsConfig

type MetricsConfig map[string]MetricConfig

MetricsConfig is a set of configured metrics.

type MultilineConfig

type MultilineConfig struct {
	Expression *string `mapstructure:"firstline"`

	MaxLines    *uint64 `mapstructure:"max_lines"`
	MaxWaitTime *string `mapstructure:"max_wait_time"`
	// contains filtered or unexported fields
}

MultilineConfig contains the configuration for a multilineStage

type OutputConfig

type OutputConfig struct {
	Source string `mapstructure:"source"`
}

OutputConfig configures output value extraction

type PackConfig

type PackConfig struct {
	Labels          []string `mapstrcuture:"labels"`
	IngestTimestamp *bool    `mapstructure:"ingest_timestamp"`
}

PackConfig contains the configuration for a packStage

type Packed

type Packed struct {
	Labels map[string]string `json:",inline"`
	Entry  string            `json:"_entry"`
}

func (Packed) MarshalJSON

func (w Packed) MarshalJSON() ([]byte, error)

MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object

func (*Packed) UnmarshalJSON

func (w *Packed) UnmarshalJSON(data []byte) error

UnmarshalJSON populates a Packed struct where every key except the _entry key is added to the Labels field

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline pass down a log entry to each stage for mutation and/or label extraction.

func NewPipeline

func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error)

NewPipeline creates a new log entry pipeline from a configuration

func (*Pipeline) Name

func (p *Pipeline) Name() string

Name implements Stage

func (*Pipeline) Run

func (p *Pipeline) Run(in chan Entry) chan Entry

Run implements Stage

func (*Pipeline) Size

func (p *Pipeline) Size() int

Size gets the current number of stages in the pipeline

func (*Pipeline) Wrap

func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler

Wrap implements EntryMiddleware

type PipelineStage

type PipelineStage = map[interface{}]interface{}

PipelineStage contains configuration for a single pipeline stage

type PipelineStages

type PipelineStages = []interface{}

PipelineStages contains configuration for each stage within a pipeline

type Processor

type Processor interface {
	Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string)
	Name() string
}

Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated timestamp and log entry

type RegexConfig

type RegexConfig struct {
	Expression string  `mapstructure:"expression"`
	Source     *string `mapstructure:"source"`
}

RegexConfig contains a regexStage configuration

type ReplaceConfig

type ReplaceConfig struct {
	Expression string  `mapstructure:"expression"`
	Source     *string `mapstructure:"source"`
	Replace    string  `mapstructure:"replace"`
}

ReplaceConfig contains a regexStage configuration

type SamplingConfig

type SamplingConfig struct {
	DropReason *string `mapstructure:"drop_counter_reason"`
	//
	SamplingRate float64 `mapstructure:"rate"`
}

SamplingConfig contains the configuration for a samplingStage

type Stage

type Stage interface {
	Name() string
	Run(chan Entry) chan Entry
}

Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel.

func New

func New(logger log.Logger, jobName *string, stageType string,
	cfg interface{}, registerer prometheus.Registerer) (Stage, error)

New creates a new stage for the given type and configuration.

func NewCRI

func NewCRI(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error)

NewCRI creates a CRI format specific pipeline stage

func NewDocker

func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error)

NewDocker creates a Docker json log format specific pipeline stage.

type StageCreationParams

type StageCreationParams struct {
	// contains filtered or unexported fields
}

type StaticLabelConfig

type StaticLabelConfig map[string]*string

StaticLabelConfig is a slice of static-labels to be included

type StaticLabelStage

type StaticLabelStage struct {
	// contains filtered or unexported fields
}

func (*StaticLabelStage) Name

func (l *StaticLabelStage) Name() string

Name implements Stage

func (*StaticLabelStage) Process

func (l *StaticLabelStage) Process(labels model.LabelSet, _ map[string]interface{}, _ *time.Time, _ *string)

Process implements Stage

type TemplateConfig

type TemplateConfig struct {
	Source   string `mapstructure:"source"`
	Template string `mapstructure:"template"`
}

TemplateConfig configures template value extraction

type TenantConfig

type TenantConfig struct {
	Label  string `mapstructure:"label"`
	Source string `mapstructure:"source"`
	Value  string `mapstructure:"value"`
}

type TimestampConfig

type TimestampConfig struct {
	Source          string   `mapstructure:"source"`
	Format          string   `mapstructure:"format"`
	FallbackFormats []string `mapstructure:"fallback_formats"`
	Location        *string  `mapstructure:"location"`
	ActionOnFailure *string  `mapstructure:"action_on_failure"`
}

TimestampConfig configures timestamp extraction

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL