Documentation ¶
Index ¶
- Constants
- Variables
- func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry
- func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry
- func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry
- func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool)
- type CRIConfig
- type DockerConfig
- type DropConfig
- type Entry
- type GenerationalMap
- type GeoIPConfig
- type GeoIPFields
- type JSONConfig
- type LabelAllowConfig
- type LabelDropConfig
- type LabelsConfig
- type LimitConfig
- type LogfmtConfig
- type MatchConfig
- type MetricConfig
- type MetricsConfig
- type MultilineConfig
- type OutputConfig
- type PackConfig
- type Packed
- type Pipeline
- type Processor
- type RegexConfig
- type ReplaceConfig
- type Stage
- type StageConfig
- type StaticLabelsConfig
- type TemplateConfig
- type TenantConfig
- type TimestampConfig
Constants ¶
const ( RFC3339Nano = "RFC3339Nano" MaxPartialLinesSize = 100 // MaxPartialLinesSize is the max buffer size to hold partial lines when parsing the CRI stage format.lines. )
const ( ErrExpressionsRequired = "JMES expression is required" ErrCouldNotCompileJMES = "could not compile JMES expression" ErrEmptyJSONStageConfig = "empty json stage configuration" ErrEmptyJSONStageSource = "empty source" ErrMalformedJSON = "malformed json" )
Config Errors
const ( ErrEmptyLabelStageConfig = "label stage config cannot be empty" ErrInvalidLabelName = "invalid label name: %s" )
const ( MetricTypeCounter = "counter" MetricTypeGauge = "gauge" MetricTypeHistogram = "histogram" )
Metric types.
const ( StageTypeJSON = "json" StageTypeLogfmt = "logfmt" StageTypeRegex = "regex" StageTypeReplace = "replace" StageTypeMetric = "metrics" StageTypeLabel = "labels" StageTypeLabelDrop = "labeldrop" StageTypeTimestamp = "timestamp" StageTypeOutput = "output" StageTypeDocker = "docker" StageTypeCRI = "cri" StageTypeMatch = "match" StageTypeTemplate = "template" StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" StageTypeLimit = "limit" StageTypeMultiline = "multiline" StageTypePack = "pack" StageTypeLabelAllow = "labelallow" StageTypeStaticLabels = "static_labels" StageTypeGeoIP = "geoip" )
TODO(@tpaschalis) Let's use this as the list of stages we need to port over.
const (
ErrTimestampContainsYear = "timestamp '%s' is expected to not contain the year date component"
)
const MinReasonableMaxDistinctLabels = 10000 // 80bytes per rate.Limiter ~ 1MiB memory
MinReasonableMaxDistinctLabels provides a sensible default.
const ReservedLabelTenantID = "__tenant_id__"
ReservedLabelTenantID is a shared value used to refer to the tenant ID.
Variables ¶
var ( ErrDropStageEmptyConfig = errors.New("drop stage config must contain at least one of `source`, `expression`, `older_than` or `longer_than`") ErrDropStageInvalidConfig = errors.New("drop stage config error, `value` and `expression` cannot both be defined at the same time") ErrDropStageInvalidRegex = errors.New("drop stage regex compilation error") )
Configuration errors.
var ( ErrEmptyGeoIPStageConfig = errors.New("geoip stage config cannot be empty") ErrEmptyDBPathGeoIPStageConfig = errors.New("db path cannot be empty") ErrEmptySourceGeoIPStageConfig = errors.New("source cannot be empty") ErrEmptyDBTypeGeoIPStageConfig = errors.New("db type should be either city or asn") ErrEmptyDBTypeAndValuesGeoIPStageConfig = errors.New("db type or values need to be set") )
var ( ErrLimitStageInvalidRateOrBurst = errors.New("limit stage failed to parse rate or burst") ErrLimitStageByLabelMustDrop = errors.New("When ratelimiting by label, drop must be true") )
Configuration errors.
var ( ErrMappingRequired = errors.New("logfmt mapping is required") ErrEmptyLogfmtStageConfig = errors.New("empty logfmt stage configuration") ErrEmptyLogfmtStageSource = errors.New("empty source") )
Config Errors
var ( ErrEmptyMatchStageConfig = errors.New("match stage config cannot be empty") ErrPipelineNameRequired = errors.New("match stage pipeline name can be omitted but cannot be an empty string") ErrSelectorRequired = errors.New("selector statement required for match stage") ErrMatchRequiresStages = errors.New("match stage requires at least one additional stage to be defined in '- stages'") ErrSelectorSyntax = errors.New("invalid selector syntax for match stage") ErrStagesWithDropLine = errors.New("match stage configured to drop entries cannot contains stages") ErrUnknownMatchAction = errors.New("match stage action should be 'keep' or 'drop'") MatchActionKeep = "keep" MatchActionDrop = "drop" )
Configuration errors.
var ( ErrEmptyMetricsStageConfig = errors.New("empty metric stage configuration") ErrMetricsStageInvalidType = errors.New("invalid metric type: must be one of 'counter', 'gauge', or 'histogram'") ErrInvalidIdleDur = errors.New("max_idle_duration could not be parsed as a time.Duration") ErrSubSecIdleDur = errors.New("max_idle_duration less than 1s not allowed") )
Configuration errors.
var ( ErrMultilineStageEmptyConfig = errors.New("multiline stage config must define `firstline` regular expression") ErrMultilineStageInvalidRegex = errors.New("multiline stage first line regex compilation error") ErrMultilineStageInvalidMaxWaitTime = errors.New("multiline stage `max_wait_time` parse error") )
Configuration errors.
var ( ErrEmptyOutputStageConfig = errors.New("output stage config cannot be empty") ErrOutputSourceRequired = errors.New("output source value is required if output is specified") )
Config Errors.
var ( ErrExpressionRequired = errors.New("expression is required") ErrCouldNotCompileRegex = errors.New("could not compile regular expression") ErrEmptyRegexStageSource = errors.New("empty source") )
Config Errors.
var ( ErrEmptyReplaceStageConfig = errors.New("empty replace stage configuration") ErrEmptyReplaceStageSource = errors.New("empty source in replace stage") )
Config Errors
var ( ErrEmptyTemplateStageConfig = errors.New("template stage config cannot be empty") ErrTemplateSourceRequired = errors.New("template source value is required") )
Config Errors.
var ( ErrTenantStageEmptyLabelSourceOrValue = errors.New("label, source or value config are required") ErrTenantStageConflictingLabelSourceAndValue = errors.New("label, source and value are mutually exclusive: you should set source, value or label but not all") )
Configuration errors.
var ( ErrEmptyTimestampStageConfig = errors.New("timestamp stage config cannot be empty") ErrTimestampSourceRequired = errors.New("timestamp source value is required if timestamp is specified") ErrTimestampFormatRequired = errors.New("timestamp format is required") ErrInvalidLocation = errors.New("invalid location specified: %v") ErrInvalidActionOnFailure = errors.New("invalid action on failure (supported values are %v)") ErrTimestampSourceMissing = errors.New("extracted data did not contain a timestamp") ErrTimestampConversionFailed = errors.New("failed to convert extracted time to string") ErrTimestampParsingFailed = errors.New("failed to parse time") Unix = "Unix" UnixMs = "UnixMs" UnixUs = "UnixUs" UnixNs = "UnixNs" TimestampActionOnFailureSkip = "skip" TimestampActionOnFailureFudge = "fudge" TimestampActionOnFailureDefault = TimestampActionOnFailureFudge )
Config errors.
var ( // Debug is used to wrap debug log statements, the go-kit logger won't let us introspect the current log level // so this global is used for that purpose. This allows us to skip allocations of log messages at the // debug level when debug level logging is not enabled. Log level allocations can become very expensive // as we log numerous log entries per log line at debug level. Debug = false // Inspect is used to debug promtail pipelines by showing diffs between pipeline stages Inspect = false )
var DefaultMultilineConfig = MultilineConfig{ MaxLines: 128, MaxWaitTime: 3 * time.Second, }
DefaultMultilineConfig applies the default values on
var DefaultPackConfig = PackConfig{ IngestTimestamp: true, }
DefaultPackConfig sets the defaults.
var ErrEmptyLabelAllowStageConfig = errors.New("labelallow stage config cannot be empty")
ErrEmptyLabelAllowStageConfig error is returned if the config is empty.
var ErrEmptyLabelDropStageConfig = errors.New("labeldrop stage config cannot be empty")
ErrEmptyLabelDropStageConfig error returned if the config is empty.
var ErrEmptyStaticLabelStageConfig = errors.New("static_labels stage config cannot be empty")
ErrEmptyStaticLabelStageConfig error returned if the config is empty.
var TimestampActionOnFailureOptions = []string{TimestampActionOnFailureSkip, TimestampActionOnFailureFudge}
TimestampActionOnFailureOptions defines the available options for the `action_on_failure` field.
Functions ¶
func RunWith ¶
RunWith will reads from the input channel entries, mutate them with the process function and returns them via the output channel.
func RunWithSkip ¶
RunWithSkip same as RunWith, except it skip sending it to output channel, if `process` functions returns `skip` true.
func RunWithSkipOrSendMany ¶
RunWithSkiporSendMany same as RunWithSkip, except it can either skip sending it to output channel, if `process` functions returns `skip` true. Or send many entries.
func SetReadLineRateLimiter ¶
Types ¶
type CRIConfig ¶
type CRIConfig struct{}
CRIConfig is an empty struct that is used to enable a pre-defined pipeline for decoding entries that are using the CRI logging format.
type DockerConfig ¶
type DockerConfig struct{}
DockerConfig is an empty struct that is used to enable a pre-defined pipeline for decoding entries that are using the Docker logs format.
type DropConfig ¶
type DropConfig struct { DropReason string `river:"drop_counter_reason,attr,optional"` Source string `river:"source,attr,optional"` Value string `river:"value,attr,optional"` Expression string `river:"expression,attr,optional"` OlderThan time.Duration `river:"older_than,attr,optional"` LongerThan units.Base2Bytes `river:"longer_than,attr,optional"` // contains filtered or unexported fields }
DropConfig contains the configuration for a dropStage
type GenerationalMap ¶
type GenerationalMap[K comparable, V any] struct { // contains filtered or unexported fields }
GenerationalMap is ported from Loki's pkg/util package. It didn't exist in our dependency at the time, so I copied the implementation over.
func NewGenMap ¶
func NewGenMap[K comparable, V any](maxSize int, newV func() V, gcCb func()) GenerationalMap[K, V]
NewGenMap created which maintains at most maxSize recently used entries
func (*GenerationalMap[K, T]) GetOrCreate ¶
func (m *GenerationalMap[K, T]) GetOrCreate(key K) T
type GeoIPConfig ¶
type GeoIPConfig struct { DB string `river:"db,attr"` Source *string `river:"source,attr"` DBType string `river:"db_type,attr,optional"` CustomLookups map[string]string `river:"custom_lookups,attr,optional"` }
GeoIPConfig represents GeoIP stage config
type GeoIPFields ¶
type GeoIPFields int
const ( CITYNAME GeoIPFields = iota COUNTRYNAME CONTINENTNAME CONTINENTCODE LOCATION POSTALCODE TIMEZONE SUBDIVISIONNAME SUBDIVISIONCODE )
type JSONConfig ¶
type JSONConfig struct { Expressions map[string]string `river:"expressions,attr"` Source *string `river:"source,attr,optional"` DropMalformed bool `river:"drop_malformed,attr,optional"` }
JSONConfig represents a JSON Stage configuration
type LabelAllowConfig ¶
type LabelAllowConfig struct {
Values []string `river:"values,attr"`
}
LabelAllowConfig contains the slice of labels to allow through.
type LabelDropConfig ¶
type LabelDropConfig struct {
Values []string `river:"values,attr"`
}
LabelDropConfig contains the slice of labels to be dropped.
type LabelsConfig ¶
LabelsConfig is a set of labels to be extracted
type LimitConfig ¶
type LimitConfig struct { Rate float64 `river:"rate,attr"` Burst int `river:"burst,attr"` Drop bool `river:"drop,attr,optional"` ByLabelName string `river:"by_label_name,attr,optional"` MaxDistinctLabels int `river:"max_distinct_labels,attr,optional"` }
LimitConfig sets up a Limit stage.
type LogfmtConfig ¶
type LogfmtConfig struct { Mapping map[string]string `river:"mapping,attr"` Source string `river:"source,attr,optional"` }
LogfmtConfig represents a logfmt Stage configuration
type MatchConfig ¶
type MatchConfig struct { Selector string `river:"selector,attr"` Stages []StageConfig `river:"stage,enum,optional"` Action string `river:"action,attr,optional"` PipelineName string `river:"pipeline_name,attr,optional"` DropReason string `river:"drop_counter_reason,attr,optional"` }
MatchConfig contains the configuration for a matcherStage
type MetricConfig ¶
type MetricConfig struct { Counter *metric.CounterConfig `river:"counter,block,optional"` Gauge *metric.GaugeConfig `river:"gauge,block,optional"` Histogram *metric.HistogramConfig `river:"histogram,block,optional"` }
MetricConfig is a single metrics configuration. TODO(@tpaschalis) Rework once River squashing is implemented.
type MetricsConfig ¶
type MetricsConfig struct {
Metrics []MetricConfig `river:"metric,enum,optional"`
}
MetricsConfig is a set of configured metrics.
type MultilineConfig ¶
type MultilineConfig struct { Expression string `river:"firstline,attr"` MaxLines uint64 `river:"max_lines,attr,optional"` MaxWaitTime time.Duration `river:"max_wait_time,attr,optional"` // contains filtered or unexported fields }
MultilineConfig contains the configuration for a Multiline stage.
func (*MultilineConfig) SetToDefault ¶
func (args *MultilineConfig) SetToDefault()
SetToDefault implements river.Defaulter.
func (*MultilineConfig) Validate ¶
func (args *MultilineConfig) Validate() error
Validate implements river.Validator.
type OutputConfig ¶
type OutputConfig struct {
Source string `river:"source,attr"`
}
OutputConfig initializes a configuration stage which sets the log line to a value from the extracted map.
type PackConfig ¶
type PackConfig struct { Labels []string `river:"labels,attr"` IngestTimestamp bool `river:"ingest_timestamp,attr,optional"` }
PackConfig contains the configuration for a packStage
func (*PackConfig) SetToDefault ¶
func (p *PackConfig) SetToDefault()
SetToDefault implements river.Defaulter.
type Packed ¶
Packed keeps track of the labels and log entry.
func (Packed) MarshalJSON ¶
MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object
func (*Packed) UnmarshalJSON ¶
UnmarshalJSON populates a Packed struct where every key except the _entry key is added to the Labels field
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline pass down a log entry to each stage for mutation and/or label extraction.
func NewPipeline ¶
func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer) (*Pipeline, error)
NewPipeline creates a new log entry pipeline from a configuration
func (*Pipeline) Wrap ¶
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler
Wrap implements EntryMiddleware
type Processor ¶
type Processor interface { Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) Name() string }
Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated timestamp and log entry
type RegexConfig ¶
type RegexConfig struct { Expression string `river:"expression,attr"` Source *string `river:"source,attr,optional"` }
RegexConfig configures a processing stage uses regular expressions to extract values from log lines into the shared values map.
type ReplaceConfig ¶
type ReplaceConfig struct { Expression string `river:"expression,attr"` Source string `river:"source,attr,optional"` Replace string `river:"replace,attr,optional"` }
ReplaceConfig contains a regexStage configuration
type Stage ¶
Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel.
func New ¶
func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometheus.Registerer) (Stage, error)
New creates a new stage for the given type and configuration.
func NewCRI ¶
func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
NewCRI creates a predefined pipeline for parsing entries in the CRI log format.
func NewDocker ¶
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
NewDocker creates a predefined pipeline for parsing entries in the Docker json log format.
type StageConfig ¶
type StageConfig struct { JSONConfig *JSONConfig `river:"json,block,optional"` LogfmtConfig *LogfmtConfig `river:"logfmt,block,optional"` LabelsConfig *LabelsConfig `river:"labels,block,optional"` LabelAllowConfig *LabelAllowConfig `river:"label_keep,block,optional"` LabelDropConfig *LabelDropConfig `river:"label_drop,block,optional"` StaticLabelsConfig *StaticLabelsConfig `river:"static_labels,block,optional"` DockerConfig *DockerConfig `river:"docker,block,optional"` CRIConfig *CRIConfig `river:"cri,block,optional"` RegexConfig *RegexConfig `river:"regex,block,optional"` TimestampConfig *TimestampConfig `river:"timestamp,block,optional"` OutputConfig *OutputConfig `river:"output,block,optional"` ReplaceConfig *ReplaceConfig `river:"replace,block,optional"` MultilineConfig *MultilineConfig `river:"multiline,block,optional"` MatchConfig *MatchConfig `river:"match,block,optional"` DropConfig *DropConfig `river:"drop,block,optional"` PackConfig *PackConfig `river:"pack,block,optional"` TemplateConfig *TemplateConfig `river:"template,block,optional"` TenantConfig *TenantConfig `river:"tenant,block,optional"` LimitConfig *LimitConfig `river:"limit,block,optional"` MetricsConfig *MetricsConfig `river:"metrics,block,optional"` GeoIPConfig *GeoIPConfig `river:"geoip,block,optional"` }
StageConfig defines a single stage in a processing pipeline. We define these as pointers types so we can use reflection to check that exactly one is set.
type StaticLabelsConfig ¶
StaticLabelsConfig contains a map of static labels to be set.
type TemplateConfig ¶
type TemplateConfig struct { Source string `river:"source,attr"` Template string `river:"template,attr"` }
TemplateConfig configures template value extraction.
type TenantConfig ¶
type TenantConfig struct { Label string `river:"label,attr,optional"` Source string `river:"source,attr,optional"` Value string `river:"value,attr,optional"` }
TenantConfig configures a tenant stage.
type TimestampConfig ¶
type TimestampConfig struct { Source string `river:"source,attr"` Format string `river:"format,attr"` FallbackFormats []string `river:"fallback_formats,attr,optional"` Location *string `river:"location,attr,optional"` ActionOnFailure string `river:"action_on_failure,attr,optional"` }
TimestampConfig configures a processing stage for timestamp extraction.