Documentation
Index ¶
- Constants
- Variables
- func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry
- func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry
- func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool)
- type Entry
- type JSONConfig
- type LabelsConfig
- type Pipeline
- type Processor
- type Stage
- type StageConfig
Constants ¶
const (
	ErrExpressionsRequired  = "JMES expression is required"
	ErrCouldNotCompileJMES  = "could not compile JMES expression"
	ErrEmptyJSONStageConfig = "empty json stage configuration"
	ErrEmptyJSONStageSource = "empty source"
	ErrMalformedJSON        = "malformed json"
)
Config Errors
const (
	ErrEmptyLabelStageConfig = "label stage config cannot be empty"
	ErrInvalidLabelName      = "invalid label name: %s"
)
const (
	StageTypeJSON         = "json"
	StageTypeLogfmt       = "logfmt"
	StageTypeRegex        = "regex"
	StageTypeReplace      = "replace"
	StageTypeMetric       = "metrics"
	StageTypeLabel        = "labels"
	StageTypeLabelDrop    = "labeldrop"
	StageTypeTimestamp    = "timestamp"
	StageTypeOutput       = "output"
	StageTypeDocker       = "docker"
	StageTypeCRI          = "cri"
	StageTypeMatch        = "match"
	StageTypeTemplate     = "template"
	StageTypePipeline     = "pipeline"
	StageTypeTenant       = "tenant"
	StageTypeDrop         = "drop"
	StageTypeLimit        = "limit"
	StageTypeMultiline    = "multiline"
	StageTypePack         = "pack"
	StageTypeLabelAllow   = "labelallow"
	StageTypeStaticLabels = "static_labels"
)
TODO(@tpaschalis) Let's use this as the list of stages we need to port over.
Variables ¶
var (
	// Debug is used to wrap debug log statements, the go-kit logger won't let us introspect the current log level
	// so this global is used for that purpose. This allows us to skip allocations of log messages at the
	// debug level when debug level logging is not enabled. Log level allocations can become very expensive
	// as we log numerous log entries per log line at debug level.
	Debug = false

	// Inspect is used to debug promtail pipelines by showing diffs between pipeline stages
	Inspect = false
)
Functions ¶
func RunWith ¶
RunWith reads entries from the input channel, mutates them with the process function, and returns them via the output channel.
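The behaviour described for RunWith can be illustrated with a minimal sketch. This is not the package's source: `runWith` is a hypothetical re-implementation, `Entry` is a simplified stand-in with a single `Line` field, and closing the output channel once the input is drained is an assumption.

```go
package main

import "fmt"

// Entry is a simplified stand-in for the package's Entry type
// (assumption: the real type carries more than a single line of text).
type Entry struct {
	Line string
}

// runWith is a hypothetical sketch of the documented behaviour: read each
// entry from input, apply process, and forward the result on a new channel.
func runWith(input chan Entry, process func(e Entry) Entry) chan Entry {
	out := make(chan Entry)
	go func() {
		// Assumption: the output is closed once the input is drained,
		// so that downstream range loops terminate.
		defer close(out)
		for e := range input {
			out <- process(e)
		}
	}()
	return out
}

func main() {
	in := make(chan Entry)
	out := runWith(in, func(e Entry) Entry {
		e.Line = "[seen] " + e.Line
		return e
	})

	// Feed two entries, then close the input to end the pipeline.
	go func() {
		defer close(in)
		in <- Entry{Line: "hello"}
		in <- Entry{Line: "world"}
	}()

	for e := range out {
		fmt.Println(e.Line)
	}
}
```

Because the function returns a channel of the same type it consumes, calls of this shape can be chained to form a sequence of stages.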
func RunWithSkip ¶
RunWithSkip is the same as RunWith, except that it skips sending an entry to the output channel if the `process` function returns `skip` as true.
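A minimal sketch of the skip behaviour, under the same caveats: `runWithSkip` is a hypothetical re-implementation, `Entry` is a simplified stand-in, and the channel-closing behaviour is an assumption rather than something the documentation states.

```go
package main

import "fmt"

// Entry is a simplified stand-in for the package's Entry type (assumption).
type Entry struct {
	Line string
}

// runWithSkip is a hypothetical sketch: it forwards the processed entry
// unless process reports that the entry should be skipped.
func runWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry {
	out := make(chan Entry)
	go func() {
		defer close(out) // assumption: output closes when input is drained
		for e := range input {
			mutated, skip := process(e)
			if skip {
				continue // entry is not sent downstream
			}
			out <- mutated
		}
	}()
	return out
}

func main() {
	in := make(chan Entry)
	// Skip entries whose line is empty; pass everything else through.
	out := runWithSkip(in, func(e Entry) (Entry, bool) {
		return e, e.Line == ""
	})

	go func() {
		defer close(in)
		in <- Entry{Line: "first"}
		in <- Entry{Line: ""}
		in <- Entry{Line: "second"}
	}()

	for e := range out {
		fmt.Println(e.Line)
	}
}
```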
func SetReadLineRateLimiter ¶
Types ¶
type JSONConfig ¶
type JSONConfig struct {
	Expressions   map[string]string `river:"expressions,attr"`
	Source        *string           `river:"source,attr,optional"`
	DropMalformed bool              `river:"drop_malformed,attr,optional"`
}
JSONConfig represents a JSON Stage configuration
type LabelsConfig ¶
LabelsConfig is a set of labels to be extracted
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline passes a log entry down to each stage for mutation and/or label extraction.
func NewPipeline ¶
func NewPipeline(logger log.Logger, stages []StageConfig, jobName *string, registerer prometheus.Registerer) (*Pipeline, error)
NewPipeline creates a new log entry pipeline from a configuration
func (*Pipeline) Wrap ¶
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler
Wrap implements EntryMiddleware
type Processor ¶
type Processor interface {
	Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string)
	Name() string
}
Processor takes an existing set of labels, a timestamp, and a log entry, and may mutate the timestamp and log entry in place through the pointers it receives.
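Because Process has no return value, any change to the timestamp or log line has to happen through the pointer arguments. The sketch below shows one way an implementation might look. `LabelSet` is a local stand-in for `model.LabelSet`, the interface is redeclared locally so the example is self-contained, and `upperProcessor` is a hypothetical processor that does not exist in the package.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// LabelSet is a local stand-in for model.LabelSet (assumption).
type LabelSet map[string]string

// Processor mirrors the documented interface, redeclared for this sketch.
type Processor interface {
	Process(labels LabelSet, extracted map[string]interface{}, ts *time.Time, entry *string)
	Name() string
}

// upperProcessor is a hypothetical processor that upper-cases the log line
// and records the original text in the extracted map.
type upperProcessor struct{}

func (upperProcessor) Process(labels LabelSet, extracted map[string]interface{}, ts *time.Time, entry *string) {
	if entry == nil {
		return
	}
	extracted["original"] = *entry
	*entry = strings.ToUpper(*entry) // mutate in place via the pointer
}

func (upperProcessor) Name() string { return "upper" }

func main() {
	var p Processor = upperProcessor{}

	line := "level=info msg=started"
	ts := time.Now()
	extracted := map[string]interface{}{}

	p.Process(LabelSet{"job": "demo"}, extracted, &ts, &line)
	fmt.Println(p.Name(), line)
}
```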
type Stage ¶
Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel.
func New ¶
func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometheus.Registerer) (Stage, error)
New creates a new stage for the given type and configuration.
type StageConfig ¶
type StageConfig struct {
	JSONConfig   *JSONConfig   `river:"json,block,optional"`
	LabelsConfig *LabelsConfig `river:"labels,block,optional"`
}
StageConfig defines a single stage in a processing pipeline. We define these as pointer types so we can use reflection to check that exactly one is set.
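The "exactly one is set" check can be sketched with the standard `reflect` package. This illustrates the idea only and is not the package's actual validation code: the types are trimmed-down local copies, `LabelsConfig` is an empty placeholder because its fields are not shown here, and `countSet` and `validate` are hypothetical helper names.

```go
package main

import (
	"fmt"
	"reflect"
)

// Trimmed-down local copies of the config types (assumptions for the sketch).
type JSONConfig struct {
	Expressions map[string]string
}

type LabelsConfig struct{} // placeholder; the real fields are not shown here

type StageConfig struct {
	JSONConfig   *JSONConfig
	LabelsConfig *LabelsConfig
}

// countSet reports how many pointer fields of cfg are non-nil.
func countSet(cfg StageConfig) int {
	v := reflect.ValueOf(cfg)
	n := 0
	for i := 0; i < v.NumField(); i++ {
		f := v.Field(i)
		if f.Kind() == reflect.Ptr && !f.IsNil() {
			n++
		}
	}
	return n
}

// validate returns an error unless exactly one stage is configured.
func validate(cfg StageConfig) error {
	if n := countSet(cfg); n != 1 {
		return fmt.Errorf("exactly one stage must be set, found %d", n)
	}
	return nil
}

func main() {
	ok := StageConfig{JSONConfig: &JSONConfig{Expressions: map[string]string{"level": "level"}}}
	fmt.Println(validate(ok))
	fmt.Println(validate(StageConfig{}))
}
```

Pointer fields make the unset case distinguishable from a zero-valued config, which is what lets a generic loop over the struct fields work without per-stage code.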
func (*StageConfig) UnmarshalRiver ¶
func (arg *StageConfig) UnmarshalRiver(f func(interface{}) error) error
UnmarshalRiver implements river.Unmarshaler.