pipeline

package
v0.42.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2024 License: BSD-3-Clause Imports: 29 Imported by: 0

README

Match modes

And

match_mode: and — matches fields with AND operator

Example:

pipelines:
  test:
    actions:
      - type: discard
        match_fields:
          k8s_namespace: [payment, tarifficator] # use exact match
          k8s_pod: /^payment-api.*/              # use regexp match
        match_mode: and

result:

{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd"}         # discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"payment-api"}         # discarded
{"k8s_namespace": "payment-tarifficator", "k8s_pod":"payment-api"} # won't be discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"no-payment-api"}      # won't be discarded

Or

match_mode: or — matches fields with OR operator

Example:

pipelines:
  test:
    actions:
      - type: discard
        match_fields:
          k8s_namespace: [payment, tarifficator] # use exact match
          k8s_pod: /^payment-api.*/              # use regexp match
        match_mode: or

result:

{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd"}         # discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"payment-api"}         # discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api"}                  # discarded
{"k8s_namespace": "payment", "k8s_pod":"map-api"}                  # discarded
{"k8s_namespace": "tarifficator", "k8s_pod":"tarifficator-go-api"} # discarded
{"k8s_namespace": "sre", "k8s_pod":"cpu-quotas-abcd-1234"}         # won't be discarded

AndPrefix

match_mode: and_prefix — matches fields by prefix with AND operator

Example:

pipelines:
  test:
    actions:
      - type: discard
        match_fields:
          k8s_namespace: payment # use prefix match
          k8s_pod: payment-api- # use prefix match
        match_mode: and_prefix

result:

{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd-1234"}    # discarded
{"k8s_namespace": "payment-2", "k8s_pod":"payment-api-abcd-1234"}  # discarded
{"k8s_namespace": "payment", "k8s_pod":"checkout"}                 # won't be discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api-abcd-1234"}        # won't be discarded
{"k8s_namespace": "payment-abcd", "k8s_pod":"payment-api"}         # won't be discarded

OrPrefix

match_mode: or_prefix — matches fields by prefix with OR operator

Example:

pipelines:
  test:
    actions:
      - type: discard
        match_fields:
          k8s_namespace: [payment, tarifficator] # use prefix match
          k8s_pod: /-api-.*/ # use regexp match
        match_mode: or_prefix

result:

{"k8s_namespace": "payment", "k8s_pod":"payment-api-abcd-1234"}    # discarded
{"k8s_namespace": "payment", "k8s_pod":"checkout"}                 # discarded
{"k8s_namespace": "map", "k8s_pod":"map-go-api-abcd-1234"}         # discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api"}                  # won't be discarded
{"k8s_namespace": "map", "k8s_pod":"payment-api-abcd-1234"}        # discarded
{"k8s_namespace": "tariff", "k8s_pod":"tarifficator"}              # won't be discarded


Generated using insane-doc

Documentation

Index

Constants

View Source
const (
	DefaultAntispamThreshold       = 0
	DefaultSourceNameMetaField     = ""
	DefaultDecoder                 = "auto"
	DefaultIsStrict                = false
	DefaultStreamField             = "stream"
	DefaultCapacity                = 1024
	DefaultAvgInputEventSize       = 4 * 1024
	DefaultMaxInputEventSize       = 0
	DefaultCutOffEventByLimit      = false
	DefaultCutOffEventByLimitField = ""
	DefaultJSONNodePoolSize        = 1024
	DefaultMaintenanceInterval     = time.Second * 5
	DefaultEventTimeout            = time.Second * 30
	DefaultFieldValue              = "not_set"
	DefaultStreamName              = StreamName("not_set")
	DefaultMetricHoldDuration      = time.Minute * 30
	DefaultMetaCacheSize           = 1024

	EventSeqIDError = uint64(0)
)
View Source
const (
	ByNameSelector = iota + 1
)
View Source
const LevelUnknownStr = ""
View Source
const (
	UnixTime = "unixtime"
)

Variables

View Source
var MatchModes = map[string]MatchMode{
	"": MatchModeAnd,

	"and": MatchModeAnd,

	"or": MatchModeOr,

	"and_prefix": MatchModeAndPrefix,

	"or_prefix": MatchModeOrPrefix,
}

Functions

func ByteToStringUnsafe

func ByteToStringUnsafe(b []byte) string

ByteToStringUnsafe converts a byte slice to a string without a memory copy. This creates a mutable string, so the method is unsafe and should be used with caution (never modify the provided byte slice).

func CloneString added in v0.4.8

func CloneString(s string) string

CloneString deeply copies the string.

func CreateNestedField added in v0.5.10

func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node

CreateNestedField creates a nested field by the path. For example, []string{"path.to", "object"} creates: { "path.to": {"object": {}} }. Warning: it overrides a field on the path if that field has a non-object type. For example: in: { "path.to": [{"userId":"12345"}] }, out: { "path.to": {"object": {}} }

func GetBackoff added in v0.21.0

func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff

func ParseFormatName

func ParseFormatName(formatName string) (string, error)

func ParseLevelAsString added in v0.5.10

func ParseLevelAsString(level string) string

ParseLevelAsString converts log level to the string representation according to the RFC-5424.

func ParseTime added in v0.8.12

func ParseTime(format, value string) (time.Time, error)

func StringToByteUnsafe

func StringToByteUnsafe(s string) []byte

StringToByteUnsafe converts a string to a byte slice without a memory copy. This creates a mutable string, so the method is unsafe and should be used with caution (never modify the resulting byte slice).

Types

type ActionPlugin

type ActionPlugin interface {
	Start(config AnyConfig, params *ActionPluginParams)
	Stop()
	Do(*Event) ActionResult
}

type ActionPluginController

type ActionPluginController interface {
	Propagate(event *Event) // throw held event back to pipeline
	Spawn(parent *Event, nodes []*insaneJSON.Node)
	IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter
}

type ActionPluginInfo

type ActionPluginInfo struct {
	*ActionPluginStaticInfo
	*PluginRuntimeInfo
}

type ActionPluginParams

type ActionPluginParams struct {
	PluginDefaultParams
	Controller ActionPluginController
	Logger     *zap.SugaredLogger
}

type ActionPluginStaticInfo

type ActionPluginStaticInfo struct {
	*PluginStaticInfo

	MetricName       string
	MetricLabels     []string
	MetricSkipStatus bool
	MatchConditions  MatchConditions
	MatchMode        MatchMode
	MatchInvert      bool

	DoIfChecker *doif.Checker
}

type ActionResult

type ActionResult int
const (
	// ActionPass pass event to the next action in a pipeline
	ActionPass ActionResult = iota
	// ActionCollapse skip further processing of event and request next event from the same stream and source as current
	// plugin may receive event with EventKindTimeout if it takes too long to read next event from same stream
	ActionCollapse
	// ActionDiscard skip further processing of event and request next event from any stream and source
	ActionDiscard
	// ActionHold hold event in a plugin and request next event from the same stream and source as current.
	// same as ActionCollapse but held event should be manually committed or returned into pipeline.
	// check out Commit()/Propagate() functions in InputPluginController.
	// plugin may receive event with EventKindTimeout if it takes too long to read next event from same stream.
	ActionHold
	// ActionBreak abort the event processing and pass it to an output.
	ActionBreak
)

type AnyConfig

type AnyConfig any

func GetConfig added in v0.18.0

func GetConfig(info *PluginStaticInfo, configJson []byte, values map[string]int) (AnyConfig, error)

type AnyPlugin

type AnyPlugin any

type BackoffOpts added in v0.21.0

type BackoffOpts struct {
	MinRetention time.Duration
	Multiplier   float64
	AttemptNum   int
}

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

func NewPreparedBatch added in v0.16.2

func NewPreparedBatch(events []*Event) *Batch

func (*Batch) ForEach added in v0.16.2

func (b *Batch) ForEach(cb func(event *Event))

type BatchStatus added in v0.12.3

type BatchStatus byte
const (
	BatchStatusNotReady BatchStatus = iota
	BatchStatusMaxSizeExceeded
	BatchStatusTimeoutExceeded
)

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

func NewBatcher

func NewBatcher(opts BatcherOptions) *Batcher

func (*Batcher) Add

func (b *Batcher) Add(event *Event)

func (*Batcher) Start

func (b *Batcher) Start(ctx context.Context)

func (*Batcher) Stop

func (b *Batcher) Stop()

type BatcherMaintenanceFn

type BatcherMaintenanceFn func(*WorkerData)

type BatcherOptions added in v0.5.15

type BatcherOptions struct {
	PipelineName        string
	OutputType          string
	OutFn               BatcherOutFn
	MaintenanceFn       BatcherMaintenanceFn
	Controller          OutputPluginController
	Workers             int
	BatchSizeCount      int
	BatchSizeBytes      int
	FlushTimeout        time.Duration
	MaintenanceInterval time.Duration
	MetricCtl           *metric.Ctl
}

type BatcherOutFn

type BatcherOutFn func(*WorkerData, *Batch)

type ConditionType

type ConditionType int

type DeltaWrapper added in v0.5.5

type DeltaWrapper struct {
	// contains filtered or unexported fields
}

DeltaWrapper acts as a wrapper around int64 and returns the difference between the new and the old value when a new value is set. This is useful for metrics (see the maintenance function).

type Event

type Event struct {
	Root *insaneJSON.Root
	Buf  []byte

	SeqID      uint64
	Offset     int64
	SourceID   SourceID
	SourceName string

	// Size in bytes of the raw event before any action plugin.
	Size int
	// contains filtered or unexported fields
}

func (*Event) Encode

func (e *Event) Encode(outBuf []byte) ([]byte, int)

func (*Event) IsChildKind added in v0.16.2

func (e *Event) IsChildKind() bool

func (*Event) IsChildParentKind added in v0.16.2

func (e *Event) IsChildParentKind() bool

func (*Event) IsIgnoreKind

func (e *Event) IsIgnoreKind() bool

func (*Event) IsRegularKind

func (e *Event) IsRegularKind() bool

func (*Event) IsTimeoutKind

func (e *Event) IsTimeoutKind() bool

func (*Event) IsUnlockKind

func (e *Event) IsUnlockKind() bool

func (*Event) SetChildKind added in v0.16.2

func (e *Event) SetChildKind()

func (*Event) SetChildParentKind added in v0.16.2

func (e *Event) SetChildParentKind()

func (*Event) SetTimeoutKind

func (e *Event) SetTimeoutKind()

func (*Event) SetUnlockKind

func (e *Event) SetUnlockKind()

func (*Event) StreamNameBytes

func (e *Event) StreamNameBytes() []byte

func (*Event) String

func (e *Event) String() string

type InputPlugin

type InputPlugin interface {
	Start(config AnyConfig, params *InputPluginParams)
	Stop()
	Commit(*Event)
	PassEvent(event *Event) bool
}

type InputPluginController

type InputPluginController interface {
	In(sourceID SourceID, sourceName string, offsets Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
	UseSpread()                            // don't use stream field and spread all events across all processors
	DisableStreams()                       // don't use stream field
	SuggestDecoder(t decoder.Type)         // set decoder type if pipeline uses "auto" value for decoder
	IncReadOps()                           // inc read ops for metric
	IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter
}

type InputPluginInfo

type InputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type InputPluginParams

type InputPluginParams struct {
	PluginDefaultParams
	Controller InputPluginController
	Logger     *zap.SugaredLogger
}

type Kind added in v0.13.0

type Kind byte
const (
	EventKindRegular Kind = iota

	EventKindTimeout
	EventKindUnlock
)

func (Kind) String added in v0.13.1

func (k Kind) String() string

type LogLevel added in v0.5.10

type LogLevel int
const (
	LevelUnknown LogLevel = iota - 1
	LevelEmergency
	LevelAlert
	LevelCritical
	LevelError
	LevelWarning
	LevelNotice
	LevelInformational
	LevelDebug
)

func ParseLevelAsNumber added in v0.5.10

func ParseLevelAsNumber(level string) LogLevel

ParseLevelAsNumber converts log level to the int representation according to the RFC-5424.

type MatchCondition

type MatchCondition struct {
	// Slice for nested fields. Separator is a dot symbol.
	Field  []string
	Values []string
	Regexp *regexp.Regexp
}

type MatchConditions

type MatchConditions []MatchCondition

type MatchMode

type MatchMode int
const (
	MatchModeAnd MatchMode = iota
	MatchModeOr
	MatchModeAndPrefix
	MatchModeOrPrefix
	MatchModeUnknown
)

func MatchModeFromString added in v0.7.3

func MatchModeFromString(mm string) MatchMode

type Offsets added in v0.40.4

type Offsets struct {
	// contains filtered or unexported fields
}

func NewOffsets added in v0.41.1

func NewOffsets(current int64, streamOffsets SliceMap) Offsets

type OutputPlugin

type OutputPlugin interface {
	Start(config AnyConfig, params *OutputPluginParams)
	Stop()
	Out(*Event)
}

type OutputPluginController

type OutputPluginController interface {
	Commit(event *Event) // notify input plugin that event is successfully processed and save offsets
	Error(err string)
}

type OutputPluginInfo

type OutputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type OutputPluginParams

type OutputPluginParams struct {
	PluginDefaultParams
	Controller OutputPluginController
	Logger     *zap.SugaredLogger
}

type Pipeline

type Pipeline struct {
	Name string

	Procs []*processor
	// contains filtered or unexported fields
}

func New

func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline

New creates new pipeline. Consider using `SetupHTTPHandlers` next.

func (*Pipeline) AddAction

func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)

func (*Pipeline) Commit

func (p *Pipeline) Commit(event *Event)

func (*Pipeline) DisableParallelism

func (p *Pipeline) DisableParallelism()

func (*Pipeline) DisableStreams

func (p *Pipeline) DisableStreams()

func (*Pipeline) EnableEventLog

func (p *Pipeline) EnableEventLog()

func (*Pipeline) Error

func (p *Pipeline) Error(err string)

func (*Pipeline) GetEventLogItem

func (p *Pipeline) GetEventLogItem(index int) string

func (*Pipeline) GetEventsTotal

func (p *Pipeline) GetEventsTotal() int

func (*Pipeline) GetInput

func (p *Pipeline) GetInput() InputPlugin

func (*Pipeline) GetOutput

func (p *Pipeline) GetOutput() OutputPlugin

func (*Pipeline) In

func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64)

In decodes message and passes it to event stream.

func (*Pipeline) IncCountEventPanicsRecovered added in v0.40.2

func (p *Pipeline) IncCountEventPanicsRecovered()

func (*Pipeline) IncMaxEventSizeExceeded added in v0.5.11

func (p *Pipeline) IncMaxEventSizeExceeded(lvs ...string)

func (*Pipeline) IncReadOps added in v0.5.5

func (p *Pipeline) IncReadOps()

func (*Pipeline) SetInput

func (p *Pipeline) SetInput(info *InputPluginInfo)

func (*Pipeline) SetOutput

func (p *Pipeline) SetOutput(info *OutputPluginInfo)

func (*Pipeline) SetupHTTPHandlers

func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)

SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.

func (*Pipeline) Start

func (p *Pipeline) Start()

func (*Pipeline) Stop

func (p *Pipeline) Stop()

func (*Pipeline) SuggestDecoder

func (p *Pipeline) SuggestDecoder(t decoder.Type)

func (*Pipeline) UseSpread

func (p *Pipeline) UseSpread()

type PluginDefaultParams

type PluginDefaultParams struct {
	PipelineName     string
	PipelineSettings *Settings
	MetricCtl        *metric.Ctl
}

type PluginFactory

type PluginFactory func() (AnyPlugin, AnyConfig)

type PluginKind

type PluginKind string
const (
	PluginKindInput  PluginKind = "input"
	PluginKindAction PluginKind = "action"
	PluginKindOutput PluginKind = "output"
)

type PluginRuntimeInfo

type PluginRuntimeInfo struct {
	Plugin AnyPlugin
	ID     string
}

type PluginSelector

type PluginSelector struct {
	CondType  ConditionType
	CondValue string
}

PluginSelector: the only valid condition type for now is ByNameSelector, and the condition value is always a string. It can be expanded with a custom type in the future.

type PluginStaticInfo

type PluginStaticInfo struct {
	Type    string
	Factory PluginFactory
	Config  AnyConfig

	// Endpoints is the map of endpoint name to handlerFunc.
	// Every plugin can provide their own API through Endpoints.
	Endpoints         map[string]func(http.ResponseWriter, *http.Request)
	AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config
}

type PluginsStarterData

type PluginsStarterData struct {
	Config AnyConfig
	Params *OutputPluginParams
}

type PluginsStarterMap

type PluginsStarterMap map[string]PluginsStarterData

type PoolType added in v0.40.4

type PoolType string
const (
	PoolTypeStd    PoolType = "std"
	PoolTypeLowMem PoolType = "low_memory"
)

type RetriableBatcher added in v0.21.0

type RetriableBatcher struct {
	// contains filtered or unexported fields
}

func NewRetriableBatcher added in v0.21.0

func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher

func (*RetriableBatcher) Add added in v0.21.0

func (b *RetriableBatcher) Add(event *Event)

func (*RetriableBatcher) Out added in v0.21.0

func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch)

func (*RetriableBatcher) Start added in v0.21.0

func (b *RetriableBatcher) Start(ctx context.Context)

func (*RetriableBatcher) Stop added in v0.21.0

func (b *RetriableBatcher) Stop()

type RetriableBatcherOutFn added in v0.21.0

type RetriableBatcherOutFn func(*WorkerData, *Batch) error

type Settings

type Settings struct {
	Decoder                 string
	DecoderParams           map[string]any
	Capacity                int
	MetaCacheSize           int
	MaintenanceInterval     time.Duration
	EventTimeout            time.Duration
	AntispamThreshold       int
	AntispamExceptions      antispam.Exceptions
	SourceNameMetaField     string
	AvgEventSize            int
	MaxEventSize            int
	CutOffEventByLimit      bool
	CutOffEventByLimitField string
	StreamField             string
	IsStrict                bool
	MetricHoldDuration      time.Duration
	Pool                    PoolType
}

type SliceMap added in v0.41.1

type SliceMap []kv

SliceMap is a map of streamName to offset. It could be just map[k]v, but the internal map implementation in Go > 1.17 can't work with mutable strings, which occur when using an unsafe cast from []byte. Also, it should not be slower on 1-2 keys, like a linked list, which is often the case for streams per job.

func SliceFromMap added in v0.41.1

func SliceFromMap(m map[StreamName]int64) SliceMap

func (*SliceMap) Get added in v0.41.1

func (so *SliceMap) Get(streamName StreamName) (int64, bool)

func (*SliceMap) Set added in v0.41.1

func (so *SliceMap) Set(streamName StreamName, offset int64)

type SourceID

type SourceID uint64

type StreamID added in v0.5.20

type StreamID uint64

type StreamName

type StreamName string

type WorkerData

type WorkerData any

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL