Documentation ¶
Index ¶
- Constants
- Variables
- func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry
- func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry
- func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry
- func SanitizeFullLabelName(input string) string
- func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool)
- type CriConfig
- type DropConfig
- type Entry
- type EventLogMessageConfig
- type GeoIPConfig
- type GeoIPFields
- type JSONConfig
- type LabelAllowConfig
- type LabelDropConfig
- type LabelsConfig
- type LimitConfig
- type LogfmtConfig
- type MatcherConfig
- type MetricConfig
- type MetricsConfig
- type MultilineConfig
- type OutputConfig
- type PackConfig
- type Packed
- type Pipeline
- type PipelineStage
- type PipelineStages
- type Processor
- type RegexConfig
- type ReplaceConfig
- type SamplingConfig
- type Stage
- type StageCreationParams
- type StaticLabelConfig
- type StaticLabelStage
- type TemplateConfig
- type TenantConfig
- type TimestampConfig
Constants ¶
const ( ErrDropStageEmptyConfig = "drop stage config must contain at least one of `source`, `expression`, `older_than` or `longer_than`" ErrDropStageInvalidDuration = "drop stage invalid duration, %v cannot be converted to a duration: %v" ErrDropStageInvalidConfig = "drop stage config error, `value` and `expression` cannot both be defined at the same time." ErrDropStageInvalidRegex = "drop stage regex compilation error: %v" ErrDropStageInvalidByteSize = "drop stage failed to parse longer_than to bytes: %v" ErrDropStageInvalidSource = "drop stage source invalid type should be string or list of strings" ErrDropStageNoSourceWithValue = "drop stage config must contain `source` if `value` is specified" )
const ( RFC3339Nano = "RFC3339Nano" MaxPartialLinesSize = 100 // Max buffer size to hold partial lines. )
const ( ErrEmptyGeoIPStageConfig = "geoip stage config cannot be empty" ErrEmptyDBPathGeoIPStageConfig = "db path cannot be empty" ErrEmptySourceGeoIPStageConfig = "source cannot be empty" ErrEmptyDBTypeGeoIPStageConfig = "db type should be either city or asn" )
const ( ErrExpressionsRequired = "JMES expression is required" ErrCouldNotCompileJMES = "could not compile JMES expression" ErrEmptyJSONStageConfig = "empty json stage configuration" ErrEmptyJSONStageSource = "empty source" ErrMalformedJSON = "malformed json" )
Config Errors
const ( ErrEmptyLabelStageConfig = "label stage config cannot be empty" ErrInvalidLabelName = "invalid label name: %s" )
const ( ErrLimitStageInvalidRateOrBurst = "limit stage failed to parse rate or burst" ErrLimitStageByLabelMustDrop = "When ratelimiting by label, drop must be true" MinReasonableMaxDistinctLabels = 10000 // 80bytes per rate.Limiter ~ 1MiB memory )
const ( ErrMappingRequired = "logfmt mapping is required" ErrEmptyLogfmtStageConfig = "empty logfmt stage configuration" ErrEmptyLogfmtStageSource = "empty source" )
Config Errors
const ( ErrEmptyMatchStageConfig = "match stage config cannot be empty" ErrPipelineNameRequired = "match stage pipeline name can be omitted but cannot be an empty string" ErrSelectorRequired = "selector statement required for match stage" ErrMatchRequiresStages = "match stage requires at least one additional stage to be defined in '- stages'" ErrSelectorSyntax = "invalid selector syntax for match stage" ErrStagesWithDropLine = "match stage configured to drop entries cannot contains stages" ErrUnknownMatchAction = "match stage action should be 'keep' or 'drop'" MatchActionKeep = "keep" MatchActionDrop = "drop" )
const ( MetricTypeCounter = "counter" MetricTypeGauge = "gauge" MetricTypeHistogram = "histogram" ErrEmptyMetricsStageConfig = "empty metric stage configuration" ErrMetricsStageInvalidType = "invalid metric type '%s', metric type must be one of 'counter', 'gauge', or 'histogram'" ErrInvalidIdleDur = "max_idle_duration could not be parsed as a time.Duration: '%s'" ErrSubSecIdleDur = "max_idle_duration less than 1s not allowed" )
const ( ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression" ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v" ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v" )
const ( ErrEmptyOutputStageConfig = "output stage config cannot be empty" ErrOutputSourceRequired = "output source value is required if output is specified" )
Config Errors
const ( ErrExpressionRequired = "expression is required" ErrCouldNotCompileRegex = "could not compile regular expression" ErrEmptyRegexStageConfig = "empty regex stage configuration" ErrEmptyRegexStageSource = "empty source" )
Config Errors
const ( ErrEmptyReplaceStageConfig = "empty replace stage configuration" ErrEmptyReplaceStageSource = "empty source in replace stage" )
Config Errors
const ( StageTypeJSON = "json" StageTypeLogfmt = "logfmt" StageTypeRegex = "regex" StageTypeReplace = "replace" StageTypeMetric = "metrics" StageTypeLabel = "labels" StageTypeLabelDrop = "labeldrop" StageTypeTimestamp = "timestamp" StageTypeOutput = "output" StageTypeDocker = "docker" StageTypeCRI = "cri" StageTypeMatch = "match" StageTypeTemplate = "template" StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" StageTypeSampling = "sampling" StageTypeLimit = "limit" StageTypeMultiline = "multiline" StageTypePack = "pack" StageTypeLabelAllow = "labelallow" StageTypeStaticLabels = "static_labels" StageTypeDecolorize = "decolorize" StageTypeEventLogMessage = "eventlogmessage" StageTypeGeoIP = "geoip" // Deprecated. Renamed to `structured_metadata`. Will be removed after the migration. StageTypeNonIndexedLabels = "non_indexed_labels" StageTypeStructuredMetadata = "structured_metadata" )
const ( ErrEmptyTemplateStageConfig = "template stage config cannot be empty" ErrTemplateSourceRequired = "template source value is required" )
Config Errors
const ( ErrTenantStageEmptyLabelSourceOrValue = "label, source or value config are required" ErrTenantStageConflictingLabelSourceAndValue = "label, source and value are mutually exclusive: you should set source, value or label but not all" )
const ( ErrEmptyTimestampStageConfig = "timestamp stage config cannot be empty" ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified" ErrTimestampFormatRequired = "timestamp format is required" ErrInvalidLocation = "invalid location specified: %v" ErrInvalidActionOnFailure = "invalid action on failure (supported values are %v)" ErrTimestampSourceMissing = "extracted data did not contain a timestamp" ErrTimestampConversionFailed = "failed to convert extracted time to string" ErrTimestampParsingFailed = "failed to parse time" Unix = "Unix" UnixMs = "UnixMs" UnixUs = "UnixUs" UnixNs = "UnixNs" TimestampActionOnFailureSkip = "skip" TimestampActionOnFailureFudge = "fudge" TimestampActionOnFailureDefault = TimestampActionOnFailureFudge )
const (
ErrEmptyEvtLogMsgStageConfig = "empty event log message stage configuration"
)
const (
// ErrEmptyLabelAllowStageConfig error returned if config is empty
ErrEmptyLabelAllowStageConfig = "labelallow stage config cannot be empty"
)
const (
// ErrEmptyLabelDropStageConfig error returned if config is empty
ErrEmptyLabelDropStageConfig = "labeldrop stage config cannot be empty"
)
const (
// ErrEmptyStaticLabelStageConfig error returned if config is empty
ErrEmptyStaticLabelStageConfig = "static_labels stage config cannot be empty"
)
const (
ErrSamplingStageInvalidRate = "sampling stage failed to parse rate,Sampling Rate must be between 0.0 and 1.0, received %f"
)
const (
ErrTimestampContainsYear = "timestamp '%s' is expected to not contain the year date component"
)
Variables ¶
var ( // Debug is used to wrap debug log statements, the go-kit logger won't let us introspect the current log level // so this global is used for that purpose. This allows us to skip allocations of log messages at the // debug level when debug level logging is not enabled. Log level allocations can become very expensive // as we log numerous log entries per log line at debug level. Debug = false // Inspect is used to debug promtail pipelines by showing diffs between pipeline stages Inspect = false )
var (
TimestampActionOnFailureOptions = []string{TimestampActionOnFailureSkip, TimestampActionOnFailureFudge}
)
Functions ¶
func RunWith ¶
RunWith reads entries from the input channel, mutates them with the process function, and returns them via the output channel.
func RunWithSkip ¶
RunWithSkip is the same as RunWith, except that it skips sending the entry to the output channel if the `process` function returns `skip` as true.
func RunWithSkipOrSendMany ¶
RunWithSkipOrSendMany is the same as RunWithSkip, except that it can either skip sending the entry to the output channel, if the `process` function returns `skip` as true, or send many entries.
func SanitizeFullLabelName ¶
SanitizeFullLabelName sanitizes an input string to convert it into a valid Prometheus label. TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SetReadLineRateLimiter ¶
Types ¶
type CriConfig ¶
type CriConfig struct { MaxPartialLines int `mapstructure:"max_partial_lines"` MaxPartialLineSize flagext.ByteSize `mapstructure:"max_partial_line_size"` MaxPartialLineSizeTruncate bool `mapstructure:"max_partial_line_size_truncate"` }
CriConfig contains the configuration for the cri stage
type DropConfig ¶
type DropConfig struct { DropReason *string `mapstructure:"drop_counter_reason"` Source interface{} `mapstructure:"source"` Value *string `mapstructure:"value"` Separator *string `mapstructure:"separator"` Expression *string `mapstructure:"expression"` OlderThan *string `mapstructure:"older_than"` LongerThan *string `mapstructure:"longer_than"` // contains filtered or unexported fields }
DropConfig contains the configuration for a dropStage
type EventLogMessageConfig ¶
type GeoIPConfig ¶
type GeoIPConfig struct { DB string `mapstructure:"db"` Source *string `mapstructure:"source"` DBType string `mapstructure:"db_type"` }
GeoIPConfig represents GeoIP stage config
type GeoIPFields ¶
type GeoIPFields int
const ( CITYNAME GeoIPFields = iota COUNTRYNAME CONTINENTNAME CONTINENTCODE LOCATION POSTALCODE TIMEZONE SUBDIVISIONNAME SUBDIVISIONCODE )
type JSONConfig ¶
type JSONConfig struct { Expressions map[string]string `mapstructure:"expressions"` Source *string `mapstructure:"source"` DropMalformed bool `mapstructure:"drop_malformed"` }
JSONConfig represents a JSON Stage configuration
type LabelAllowConfig ¶
type LabelAllowConfig []string
LabelAllowConfig is a slice of labels to be included
type LabelDropConfig ¶
type LabelDropConfig []string
LabelDropConfig is a slice of labels to be dropped
type LabelsConfig ¶
LabelsConfig is a set of labels to be extracted
type LimitConfig ¶
type LogfmtConfig ¶
type LogfmtConfig struct { Mapping map[string]string `mapstructure:"mapping"` Source *string `mapstructure:"source"` }
LogfmtConfig represents a logfmt Stage configuration
type MatcherConfig ¶
type MatcherConfig struct { PipelineName *string `mapstructure:"pipeline_name"` Selector string `mapstructure:"selector"` Stages PipelineStages `mapstructure:"stages"` Action string `mapstructure:"action"` DropReason *string `mapstructure:"drop_counter_reason"` }
MatcherConfig contains the configuration for a matcherStage
type MetricConfig ¶
type MetricConfig struct { MetricType string `mapstructure:"type"` Description string `mapstructure:"description"` Source *string `mapstructure:"source"` Prefix string `mapstructure:"prefix"` IdleDuration *string `mapstructure:"max_idle_duration"` Config interface{} `mapstructure:"config"` // contains filtered or unexported fields }
MetricConfig is a single metrics configuration.
type MetricsConfig ¶
type MetricsConfig map[string]MetricConfig
MetricsConfig is a set of configured metrics.
type MultilineConfig ¶
type MultilineConfig struct { Expression *string `mapstructure:"firstline"` MaxLines *uint64 `mapstructure:"max_lines"` MaxWaitTime *string `mapstructure:"max_wait_time"` // contains filtered or unexported fields }
MultilineConfig contains the configuration for a multilineStage
type OutputConfig ¶
type OutputConfig struct {
Source string `mapstructure:"source"`
}
OutputConfig configures output value extraction
type PackConfig ¶
type PackConfig struct { Labels []string `mapstrcuture:"labels"` IngestTimestamp *bool `mapstructure:"ingest_timestamp"` }
PackConfig contains the configuration for a packStage
type Packed ¶
func (Packed) MarshalJSON ¶
MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object
func (*Packed) UnmarshalJSON ¶
UnmarshalJSON populates a Packed struct where every key except the _entry key is added to the Labels field
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline passes a log entry down to each stage for mutation and/or label extraction.
func NewPipeline ¶
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error)
NewPipeline creates a new log entry pipeline from a configuration
func (*Pipeline) Wrap ¶
func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler
Wrap implements EntryMiddleware
type PipelineStage ¶
type PipelineStage = map[interface{}]interface{}
PipelineStage contains configuration for a single pipeline stage
type PipelineStages ¶
type PipelineStages = []interface{}
PipelineStages contains configuration for each stage within a pipeline
type Processor ¶
type Processor interface { Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) Name() string }
Processor takes an existing set of labels, a timestamp and a log entry, and may mutate the timestamp and log entry in place through the pointers passed to Process.
type RegexConfig ¶
type RegexConfig struct { Expression string `mapstructure:"expression"` Source *string `mapstructure:"source"` }
RegexConfig contains a regexStage configuration
type ReplaceConfig ¶
type ReplaceConfig struct { Expression string `mapstructure:"expression"` Source *string `mapstructure:"source"` Replace string `mapstructure:"replace"` }
ReplaceConfig contains the configuration for a replace stage
type SamplingConfig ¶
type SamplingConfig struct { DropReason *string `mapstructure:"drop_counter_reason"` // SamplingRate float64 `mapstructure:"rate"` }
SamplingConfig contains the configuration for a samplingStage
type Stage ¶
Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel.
func New ¶
func New(logger log.Logger, jobName *string, stageType string, cfg interface{}, registerer prometheus.Registerer) (Stage, error)
New creates a new stage for the given type and configuration.
func NewCRI ¶
func NewCRI(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error)
NewCRI creates a CRI format specific pipeline stage
func NewDocker ¶
func NewDocker(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
NewDocker creates a Docker json log format specific pipeline stage.
type StageCreationParams ¶
type StageCreationParams struct {
// contains filtered or unexported fields
}
type StaticLabelConfig ¶
StaticLabelConfig is a slice of static-labels to be included
type StaticLabelStage ¶
type StaticLabelStage struct {
// contains filtered or unexported fields
}
type TemplateConfig ¶
type TemplateConfig struct { Source string `mapstructure:"source"` Template string `mapstructure:"template"` }
TemplateConfig configures template value extraction
type TenantConfig ¶
type TimestampConfig ¶
type TimestampConfig struct { Source string `mapstructure:"source"` Format string `mapstructure:"format"` FallbackFormats []string `mapstructure:"fallback_formats"` Location *string `mapstructure:"location"` ActionOnFailure *string `mapstructure:"action_on_failure"` }
TimestampConfig configures timestamp extraction
Source Files ¶
- decolorize.go
- drop.go
- eventlogmessage.go
- extensions.go
- geoip.go
- inspector.go
- json.go
- labelallow.go
- labeldrop.go
- labels.go
- limit.go
- logfmt.go
- match.go
- metrics.go
- multiline.go
- output.go
- pack.go
- pipeline.go
- regex.go
- replace.go
- sampling.go
- stage.go
- static_labels.go
- structuredmetadata.go
- template.go
- tenant.go
- timestamp.go
- util.go