pipeline

package
v0.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: BSD-3-Clause Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultStreamField         = "stream"
	DefaultCapacity            = 1024
	DefaultAvgInputEventSize   = 4 * 1024
	DefaultMaxInputEventSize   = 0
	DefaultJSONNodePoolSize    = 1024
	DefaultMaintenanceInterval = time.Second * 5
	DefaultEventTimeout        = time.Second * 30
	DefaultFieldValue          = "not_set"
	DefaultStreamName          = StreamName("not_set")
	DefaultMetricHoldDuration  = time.Minute * 30

	EventSeqIDError = uint64(0)
)
View Source
const (
	ByNameSelector = iota + 1
)
View Source
const LevelUnknownStr = ""
View Source
const (
	UnixTime = "unixtime"
)

Variables

View Source
var MatchModes = map[string]MatchMode{
	"": MatchModeAnd,

	"and": MatchModeAnd,

	"or": MatchModeOr,

	"and_prefix": MatchModeAndPrefix,

	"or_prefix": MatchModeOrPrefix,
}

Functions

func ByteToStringUnsafe

func ByteToStringUnsafe(b []byte) string

ByteToStringUnsafe converts a byte slice to a string without copying memory. The resulting string shares memory with the slice and is therefore mutable, so this method is unsafe and should be used with caution (never modify the provided byte slice).

func CloneString added in v0.4.8

func CloneString(s string) string

CloneString deeply copies the string.

func CreateNestedField added in v0.5.10

func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node

CreateNestedField creates a nested field by the path. For example, []string{"path.to", "object"} creates: { "path.to": {"object": {}} }. Warning: it overrides a field on the path if that field contains a non-object type. For example: in: { "path.to": [{"userId":"12345"}] }, out: { "path.to": {"object": {}} }

func GetBackoff added in v0.21.0

func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff

func ParseFormatName

func ParseFormatName(formatName string) (string, error)

func ParseLevelAsString added in v0.5.10

func ParseLevelAsString(level string) string

ParseLevelAsString converts log level to the string representation according to the RFC-5424.

func ParseTime added in v0.8.12

func ParseTime(format, value string) (time.Time, error)

func StringToByteUnsafe

func StringToByteUnsafe(s string) []byte

StringToByteUnsafe converts a string to a byte slice without copying memory. The resulting slice shares memory with the string, so this method is unsafe and should be used with caution (never modify the resulting byte slice).

Types

type ActionPlugin

type ActionPlugin interface {
	Start(config AnyConfig, params *ActionPluginParams)
	Stop()
	Do(*Event) ActionResult
}

type ActionPluginController

type ActionPluginController interface {
	Propagate(event *Event) // throw held event back to pipeline
	Spawn(parent *Event, nodes []*insaneJSON.Node)
	IncMaxEventSizeExceeded() // inc max event size exceeded counter
}

type ActionPluginInfo

type ActionPluginInfo struct {
	*ActionPluginStaticInfo
	*PluginRuntimeInfo
}

type ActionPluginParams

type ActionPluginParams struct {
	PluginDefaultParams
	Controller ActionPluginController
	Logger     *zap.SugaredLogger
}

type ActionPluginStaticInfo

type ActionPluginStaticInfo struct {
	*PluginStaticInfo

	MetricName       string
	MetricLabels     []string
	MetricSkipStatus bool
	MatchConditions  MatchConditions
	MatchMode        MatchMode
	MatchInvert      bool

	DoIfChecker *DoIfChecker
}

type ActionResult

type ActionResult int
const (
	// ActionPass pass event to the next action in a pipeline
	ActionPass ActionResult = iota
	// ActionCollapse skip further processing of event and request next event from the same stream and source as current
	// plugin may receive event with EventKindTimeout if it takes too long to read next event from same stream
	ActionCollapse
	// ActionDiscard skip further processing of event and request next event from any stream and source
	ActionDiscard
	// ActionHold hold event in a plugin and request next event from the same stream and source as current.
	// same as ActionCollapse but held event should be manually committed or returned into pipeline.
	// check out Commit() in OutputPluginController and Propagate() in ActionPluginController.
	// plugin may receive event with EventKindTimeout if it takes too long to read next event from same stream.
	ActionHold
	// ActionBreak abort the event processing and pass it to an output.
	ActionBreak
)

type AnyConfig

type AnyConfig any

func GetConfig added in v0.18.0

func GetConfig(info *PluginStaticInfo, configJson []byte, values map[string]int) (AnyConfig, error)

type AnyPlugin

type AnyPlugin any

type BackoffOpts added in v0.21.0

type BackoffOpts struct {
	MinRetention time.Duration
	Multiplier   float64
	AttemptNum   int
}

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

func NewPreparedBatch added in v0.16.2

func NewPreparedBatch(events []*Event) *Batch

func (*Batch) ForEach added in v0.16.2

func (b *Batch) ForEach(cb func(event *Event))

type BatchStatus added in v0.12.3

type BatchStatus byte
const (
	BatchStatusNotReady BatchStatus = iota
	BatchStatusMaxSizeExceeded
	BatchStatusTimeoutExceeded
)

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

func NewBatcher

func NewBatcher(opts BatcherOptions) *Batcher

func (*Batcher) Add

func (b *Batcher) Add(event *Event)

func (*Batcher) Start

func (b *Batcher) Start(ctx context.Context)

func (*Batcher) Stop

func (b *Batcher) Stop()

type BatcherMaintenanceFn

type BatcherMaintenanceFn func(*WorkerData)

type BatcherOptions added in v0.5.15

type BatcherOptions struct {
	PipelineName        string
	OutputType          string
	OutFn               BatcherOutFn
	MaintenanceFn       BatcherMaintenanceFn
	Controller          OutputPluginController
	Workers             int
	BatchSizeCount      int
	BatchSizeBytes      int
	FlushTimeout        time.Duration
	MaintenanceInterval time.Duration
	MetricCtl           *metric.Ctl
}

type BatcherOutFn

type BatcherOutFn func(*WorkerData, *Batch)

type ConditionType

type ConditionType int

type DeltaWrapper added in v0.5.5

type DeltaWrapper struct {
	// contains filtered or unexported fields
}

DeltaWrapper acts as a wrapper around int64 and returns the difference between the new and the old value when a new value is set. This is useful for metrics (see the maintenance function).

type DoIfChecker added in v0.16.2

type DoIfChecker struct {
	// contains filtered or unexported fields
}

func NewDoIfChecker added in v0.16.2

func NewDoIfChecker(root DoIfNode) *DoIfChecker

func (*DoIfChecker) Check added in v0.16.2

func (c *DoIfChecker) Check(eventRoot *insaneJSON.Root) bool

func (*DoIfChecker) IsEqualTo added in v0.16.2

func (c *DoIfChecker) IsEqualTo(c2 *DoIfChecker) error

type DoIfNode added in v0.16.2

type DoIfNode interface {
	Type() DoIfNodeType
	Check(*insaneJSON.Root) bool
	// contains filtered or unexported methods
}

func NewFieldOpNode added in v0.16.2

func NewFieldOpNode(op string, field string, caseSensitive bool, values [][]byte) (DoIfNode, error)

func NewLogicalNode added in v0.16.2

func NewLogicalNode(op string, operands []DoIfNode) (DoIfNode, error)

type DoIfNodeType added in v0.16.2

type DoIfNodeType int
const (
	DoIfNodeEmpty DoIfNodeType = iota

	// > Type of node where matching rules for fields are stored.
	DoIfNodeFieldOp // *

	// > Type of node where logical rules for applying other rules are stored.
	DoIfNodeLogicalOp // *
)

type Event

type Event struct {
	Root *insaneJSON.Root
	Buf  []byte

	SeqID      uint64
	Offset     int64
	SourceID   SourceID
	SourceName string

	Size int // last known event size, it may not be actual
	// contains filtered or unexported fields
}

func (*Event) Encode

func (e *Event) Encode(outBuf []byte) ([]byte, int)

func (*Event) IsChildKind added in v0.16.2

func (e *Event) IsChildKind() bool

func (*Event) IsChildParentKind added in v0.16.2

func (e *Event) IsChildParentKind() bool

func (*Event) IsIgnoreKind

func (e *Event) IsIgnoreKind() bool

func (*Event) IsRegularKind

func (e *Event) IsRegularKind() bool

func (*Event) IsTimeoutKind

func (e *Event) IsTimeoutKind() bool

func (*Event) IsUnlockKind

func (e *Event) IsUnlockKind() bool

func (*Event) SetChildKind added in v0.16.2

func (e *Event) SetChildKind()

func (*Event) SetChildParentKind added in v0.16.2

func (e *Event) SetChildParentKind()

func (*Event) SetTimeoutKind

func (e *Event) SetTimeoutKind()

func (*Event) SetUnlockKind

func (e *Event) SetUnlockKind()

func (*Event) StreamNameBytes

func (e *Event) StreamNameBytes() []byte

func (*Event) String

func (e *Event) String() string

func (*Event) SubparseJSON

func (e *Event) SubparseJSON(json []byte) (*insaneJSON.Node, error)

type InputPlugin

type InputPlugin interface {
	Start(config AnyConfig, params *InputPluginParams)
	Stop()
	Commit(*Event)
	PassEvent(event *Event) bool
}

type InputPluginController

type InputPluginController interface {
	In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
	UseSpread()                           // don't use stream field and spread all events across all processors
	DisableStreams()                      // don't use stream field
	SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder
	IncReadOps()                          // inc read ops for metric
	IncMaxEventSizeExceeded()             // inc max event size exceeded counter
}

type InputPluginInfo

type InputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type InputPluginParams

type InputPluginParams struct {
	PluginDefaultParams
	Controller InputPluginController
	Logger     *zap.SugaredLogger
}

type Kind added in v0.13.0

type Kind byte
const (
	EventKindRegular Kind = iota

	EventKindTimeout
	EventKindUnlock
)

func (Kind) String added in v0.13.1

func (k Kind) String() string

type LogLevel added in v0.5.10

type LogLevel int
const (
	LevelUnknown LogLevel = iota - 1
	LevelEmergency
	LevelAlert
	LevelCritical
	LevelError
	LevelWarning
	LevelNotice
	LevelInformational
	LevelDebug
)

func ParseLevelAsNumber added in v0.5.10

func ParseLevelAsNumber(level string) LogLevel

ParseLevelAsNumber converts log level to the int representation according to the RFC-5424.

type MatchCondition

type MatchCondition struct {
	// Slice for nested fields. Separator is a dot symbol.
	Field  []string
	Values []string
	Regexp *regexp.Regexp
}

type MatchConditions

type MatchConditions []MatchCondition

type MatchMode

type MatchMode int
const (
	MatchModeAnd MatchMode = iota
	MatchModeOr
	MatchModeAndPrefix
	MatchModeOrPrefix
	MatchModeUnknown
)

func MatchModeFromString added in v0.7.3

func MatchModeFromString(mm string) MatchMode

type OutputPlugin

type OutputPlugin interface {
	Start(config AnyConfig, params *OutputPluginParams)
	Stop()
	Out(*Event)
}

type OutputPluginController

type OutputPluginController interface {
	Commit(event *Event) // notify input plugin that event is successfully processed and save offsets
	Error(err string)
}

type OutputPluginInfo

type OutputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type OutputPluginParams

type OutputPluginParams struct {
	PluginDefaultParams
	Controller OutputPluginController
	Logger     *zap.SugaredLogger
}

type Pipeline

type Pipeline struct {
	Name string

	Procs []*processor
	// contains filtered or unexported fields
}

func New

func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline

New creates new pipeline. Consider using `SetupHTTPHandlers` next.

func (*Pipeline) AddAction

func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)

func (*Pipeline) Commit

func (p *Pipeline) Commit(event *Event)

func (*Pipeline) DisableParallelism

func (p *Pipeline) DisableParallelism()

func (*Pipeline) DisableStreams

func (p *Pipeline) DisableStreams()

func (*Pipeline) EnableEventLog

func (p *Pipeline) EnableEventLog()

func (*Pipeline) Error

func (p *Pipeline) Error(err string)

func (*Pipeline) GetEventLogItem

func (p *Pipeline) GetEventLogItem(index int) string

func (*Pipeline) GetEventsTotal

func (p *Pipeline) GetEventsTotal() int

func (*Pipeline) GetInput

func (p *Pipeline) GetInput() InputPlugin

func (*Pipeline) GetOutput

func (p *Pipeline) GetOutput() OutputPlugin

func (*Pipeline) In

func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) (seqID uint64)

In decodes message and passes it to event stream.

func (*Pipeline) IncMaxEventSizeExceeded added in v0.5.11

func (p *Pipeline) IncMaxEventSizeExceeded()

func (*Pipeline) IncReadOps added in v0.5.5

func (p *Pipeline) IncReadOps()

func (*Pipeline) SetInput

func (p *Pipeline) SetInput(info *InputPluginInfo)

func (*Pipeline) SetOutput

func (p *Pipeline) SetOutput(info *OutputPluginInfo)

func (*Pipeline) SetupHTTPHandlers

func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)

SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.

func (*Pipeline) Start

func (p *Pipeline) Start()

func (*Pipeline) Stop

func (p *Pipeline) Stop()

func (*Pipeline) SuggestDecoder

func (p *Pipeline) SuggestDecoder(t decoder.DecoderType)

func (*Pipeline) UseSpread

func (p *Pipeline) UseSpread()

type PluginDefaultParams

type PluginDefaultParams struct {
	PipelineName     string
	PipelineSettings *Settings
	MetricCtl        *metric.Ctl
}

type PluginFactory

type PluginFactory func() (AnyPlugin, AnyConfig)

type PluginKind

type PluginKind string
const (
	PluginKindInput  PluginKind = "input"
	PluginKindAction PluginKind = "action"
	PluginKindOutput PluginKind = "output"
)

type PluginRuntimeInfo

type PluginRuntimeInfo struct {
	Plugin AnyPlugin
	ID     string
}

type PluginSelector

type PluginSelector struct {
	CondType  ConditionType
	CondValue string
}

PluginSelector: the only valid condition type for now is ByNameSelector, and the only supported value type is string. It can be expanded with a custom type in the future.

type PluginStaticInfo

type PluginStaticInfo struct {
	Type    string
	Factory PluginFactory
	Config  AnyConfig

	// Endpoints is the map of endpoint name to handlerFunc.
	// Every plugin can provide their own API through Endpoints.
	Endpoints         map[string]func(http.ResponseWriter, *http.Request)
	AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config
}

type PluginsStarterData

type PluginsStarterData struct {
	Config AnyConfig
	Params *OutputPluginParams
}

type PluginsStarterMap

type PluginsStarterMap map[string]PluginsStarterData

type RetriableBatcher added in v0.21.0

type RetriableBatcher struct {
	// contains filtered or unexported fields
}

func NewRetriableBatcher added in v0.21.0

func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher

func (*RetriableBatcher) Add added in v0.21.0

func (b *RetriableBatcher) Add(event *Event)

func (*RetriableBatcher) Out added in v0.21.0

func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch)

func (*RetriableBatcher) Start added in v0.21.0

func (b *RetriableBatcher) Start(ctx context.Context)

func (*RetriableBatcher) Stop added in v0.21.0

func (b *RetriableBatcher) Stop()

type RetriableBatcherOutFn added in v0.21.0

type RetriableBatcherOutFn func(*WorkerData, *Batch) error

type Settings

type Settings struct {
	Decoder             string
	Capacity            int
	MaintenanceInterval time.Duration
	EventTimeout        time.Duration
	AntispamThreshold   int
	AntispamExceptions  matchrule.RuleSets
	AvgEventSize        int
	MaxEventSize        int
	StreamField         string
	IsStrict            bool
	MetricHoldDuration  time.Duration
}

type SourceID

type SourceID uint64

type StreamID added in v0.5.20

type StreamID uint64

type StreamName

type StreamName string

type WorkerData

type WorkerData any

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL