pipeline

package
v0.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2022 License: BSD-3-Clause Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultStreamField         = "stream"
	DefaultCapacity            = 1024
	DefaultAvgInputEventSize   = 4 * 1024
	DefaultMaxInputEventSize   = 0
	DefaultJSONNodePoolSize    = 1024
	DefaultMaintenanceInterval = time.Second * 5
	DefaultEventTimeout        = time.Second * 30
	DefaultFieldValue          = "not_set"
	DefaultStreamName          = StreamName("not_set")

	EventSeqIDError = uint64(0)
)
View Source
const LevelUnknownStr = ""
View Source
const PromNamespace = "file_d"

Variables

View Source
var MatchModes = map[string]MatchMode{
	"": MatchModeAnd,

	"and": MatchModeAnd,

	"or": MatchModeOr,

	"and_prefix": MatchModeAndPrefix,

	"or_prefix": MatchModeOrPrefix,
}

Functions

func ByteToStringUnsafe

func ByteToStringUnsafe(b []byte) string

ByteToStringUnsafe converts byte slice to string without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify provided byte slice)

func CloneString added in v0.4.8

func CloneString(s string) string

Clone deeply copies string

func CreateNestedField added in v0.5.10

func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node

CreateNestedField creates nested field by the path. For example, []string{"path.to", "object"} creates: { "path.to": {"object": {} } Warn: it overrides fields if it contains non-object type on the path. For example: in: { "path.to": [{"userId":"12345"}] }, out: { "path.to": {"object": {}} }

func NewProcessor

func NewProcessor(
	metricsHolder *metricsHolder,
	activeCounter *atomic.Int32,
	output OutputPlugin,
	streamer *streamer,
	finalizeFn finalizeFn,
) *processor

func ParseFormatName

func ParseFormatName(formatName string) (string, error)

func ParseLevelAsString added in v0.5.10

func ParseLevelAsString(level string) string

ParseLevelAsString converts log level to the string representation according to the RFC-5424.

func StringToByteUnsafe

func StringToByteUnsafe(s string) []byte

StringToByteUnsafe converts string to byte slice without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify resulting byte slice)

Types

type ActionPlugin

type ActionPlugin interface {
	Start(config AnyConfig, params *ActionPluginParams)
	Stop()
	Do(*Event) ActionResult
	RegisterMetrics(ctl *metric.Ctl)
}

type ActionPluginController

type ActionPluginController interface {
	Commit(event *Event)    // commit offset of held event and skip further processing
	Propagate(event *Event) // throw held event back to pipeline
}

type ActionPluginInfo

type ActionPluginInfo struct {
	*ActionPluginStaticInfo
	*PluginRuntimeInfo
}

type ActionPluginParams

type ActionPluginParams struct {
	*PluginDefaultParams
	Controller ActionPluginController
	Logger     *zap.SugaredLogger
}

type ActionPluginStaticInfo

type ActionPluginStaticInfo struct {
	*PluginStaticInfo

	MetricName      string
	MetricLabels    []string
	MatchConditions MatchConditions
	MatchMode       MatchMode
	MatchInvert     bool
}

type ActionResult

type ActionResult int
const (
	// ActionPass pass event to the next action in a pipeline
	ActionPass ActionResult = 0
	// ActionCollapse skip further processing of event and request next event from the same stream and source as current
	// plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream
	ActionCollapse ActionResult = 2
	// ActionDiscard skip further processing of event and request next event from any stream and source
	ActionDiscard ActionResult = 1
	// ActionHold hold event in a plugin and request next event from the same stream and source as current.
	// same as ActionCollapse but held event should be manually committed or returned into pipeline.
	// check out Commit()/Propagate() functions in InputPluginController.
	// plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream.
	ActionHold ActionResult = 3
)

type AnyConfig

type AnyConfig any

type AnyPlugin

type AnyPlugin any

type Batch

type Batch struct {
	Events []*Event
	// contains filtered or unexported fields
}

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

func NewBatcher

func NewBatcher(opts BatcherOptions) *Batcher

func (*Batcher) Add

func (b *Batcher) Add(event *Event)

func (*Batcher) Start

func (b *Batcher) Start(ctx context.Context)

func (*Batcher) Stop

func (b *Batcher) Stop()

type BatcherMaintenanceFn

type BatcherMaintenanceFn func(*WorkerData)

type BatcherOptions added in v0.5.15

type BatcherOptions struct {
	PipelineName        string
	OutputType          string
	OutFn               BatcherOutFn
	MaintenanceFn       BatcherMaintenanceFn
	Controller          OutputPluginController
	Workers             int
	BatchSizeCount      int
	BatchSizeBytes      int
	FlushTimeout        time.Duration
	MaintenanceInterval time.Duration
}

type BatcherOutFn

type BatcherOutFn func(*WorkerData, *Batch)

type ConditionType

type ConditionType int
const (
	// UnknownSelector value is default, therefore it's safer to use it as default unknown value.
	UnknownSelector ConditionType = iota
	ByNameSelector
)

type DeltaWrapper added in v0.5.5

type DeltaWrapper struct {
	// contains filtered or unexported fields
}

DeltaWrapper acts as a wrapper around int64 and returns the difference between the new and the old value when a new value is set. This is useful for metric (see the maintenance function).

type Event

type Event struct {
	Root *insaneJSON.Root
	Buf  []byte

	SeqID      uint64
	Offset     int64
	SourceID   SourceID
	SourceName string

	Size int // last known event size, it may not be actual
	// contains filtered or unexported fields
}

func (*Event) Encode

func (e *Event) Encode(outBuf []byte) ([]byte, int)

func (*Event) IsIgnoreKind

func (e *Event) IsIgnoreKind() bool

func (*Event) IsRegularKind

func (e *Event) IsRegularKind() bool

func (*Event) IsTimeoutKind

func (e *Event) IsTimeoutKind() bool

func (*Event) IsUnlockKind

func (e *Event) IsUnlockKind() bool

func (*Event) SetIgnoreKind

func (e *Event) SetIgnoreKind()

func (*Event) SetTimeoutKind

func (e *Event) SetTimeoutKind()

func (*Event) SetUnlockKind

func (e *Event) SetUnlockKind()

func (*Event) StreamNameBytes

func (e *Event) StreamNameBytes() []byte

func (*Event) String

func (e *Event) String() string

func (*Event) SubparseJSON

func (e *Event) SubparseJSON(json []byte) (*insaneJSON.Node, error)

type InputPlugin

type InputPlugin interface {
	Start(config AnyConfig, params *InputPluginParams)
	Stop()
	Commit(*Event)
	RegisterMetrics(ctl *metric.Ctl)
	PassEvent(event *Event) bool
}

type InputPluginController

type InputPluginController interface {
	In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
	UseSpread()                           // don't use stream field and spread all events across all processors
	DisableStreams()                      // don't use stream field
	SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder
	IncReadOps()                          // inc read ops for metric
	IncMaxEventSizeExceeded()             // inc max event size exceeded counter
}

type InputPluginInfo

type InputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type InputPluginParams

type InputPluginParams struct {
	*PluginDefaultParams
	Controller InputPluginController
	Logger     *zap.SugaredLogger
}

type LogLevel added in v0.5.10

type LogLevel int
const (
	LevelUnknown LogLevel = iota - 1
	LevelEmergency
	LevelAlert
	LevelCritical
	LevelError
	LevelWarning
	LevelNotice
	LevelInformational
	LevelDebug
)

func ParseLevelAsNumber added in v0.5.10

func ParseLevelAsNumber(level string) LogLevel

ParseLevelAsNumber converts log level to the int representation according to the RFC-5424.

type MatchCondition

type MatchCondition struct {
	// Slice for nested fields. Separator is a dot symbol.
	Field  []string
	Values []string
	Regexp *regexp.Regexp
}

type MatchConditions

type MatchConditions []MatchCondition

type MatchMode

type MatchMode int
const (
	MatchModeAnd MatchMode = iota
	MatchModeOr
	MatchModeAndPrefix
	MatchModeOrPrefix
	MatchModeUnknown
)

func MatchModeFromString added in v0.7.3

func MatchModeFromString(mm string) MatchMode

type OutputPlugin

type OutputPlugin interface {
	Start(config AnyConfig, params *OutputPluginParams)
	Stop()
	Out(*Event)
	RegisterMetrics(ctl *metric.Ctl)
}

type OutputPluginController

type OutputPluginController interface {
	Commit(event *Event) // notify input plugin that event is successfully processed and save offsets
	Error(err string)
}

type OutputPluginInfo

type OutputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type OutputPluginParams

type OutputPluginParams struct {
	*PluginDefaultParams
	Controller OutputPluginController
	Logger     *zap.SugaredLogger
}

type Pipeline

type Pipeline struct {
	Name string

	Procs []*processor
	// contains filtered or unexported fields
}

func New

func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline

New creates new pipeline. Consider using `SetupHTTPHandlers` next.

func (*Pipeline) AddAction

func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)

func (*Pipeline) Commit

func (p *Pipeline) Commit(event *Event)

func (*Pipeline) DisableParallelism

func (p *Pipeline) DisableParallelism()

func (*Pipeline) DisableStreams

func (p *Pipeline) DisableStreams()

func (*Pipeline) EnableEventLog

func (p *Pipeline) EnableEventLog()

func (*Pipeline) Error

func (p *Pipeline) Error(err string)

func (*Pipeline) GetEventLogItem

func (p *Pipeline) GetEventLogItem(index int) string

func (*Pipeline) GetEventsTotal

func (p *Pipeline) GetEventsTotal() int

func (*Pipeline) GetInput

func (p *Pipeline) GetInput() InputPlugin

func (*Pipeline) GetOutput

func (p *Pipeline) GetOutput() OutputPlugin

func (*Pipeline) In

func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) (seqID uint64)

In decodes message and passes it to event stream.

func (*Pipeline) IncMaxEventSizeExceeded added in v0.5.11

func (p *Pipeline) IncMaxEventSizeExceeded()

func (*Pipeline) IncReadOps added in v0.5.5

func (p *Pipeline) IncReadOps()

func (*Pipeline) SetInput

func (p *Pipeline) SetInput(info *InputPluginInfo)

func (*Pipeline) SetOutput

func (p *Pipeline) SetOutput(info *OutputPluginInfo)

func (*Pipeline) SetupHTTPHandlers

func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)

SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.

func (*Pipeline) Start

func (p *Pipeline) Start()

func (*Pipeline) Stop

func (p *Pipeline) Stop()

func (*Pipeline) SuggestDecoder

func (p *Pipeline) SuggestDecoder(t decoder.DecoderType)

func (*Pipeline) UseSpread

func (p *Pipeline) UseSpread()

type PluginDefaultParams

type PluginDefaultParams struct {
	PipelineName     string
	PipelineSettings *Settings
}

type PluginFactory

type PluginFactory func() (AnyPlugin, AnyConfig)

type PluginKind

type PluginKind string
const (
	PluginKindInput  PluginKind = "input"
	PluginKindAction PluginKind = "action"
	PluginKindOutput PluginKind = "output"
)

type PluginRuntimeInfo

type PluginRuntimeInfo struct {
	Plugin AnyPlugin
	ID     string
}

type PluginSelector

type PluginSelector struct {
	CondType  ConditionType
	CondValue string
}

PluginSelector the only valid value for now is ByNameSelector and only value is string type. It can be expanded with a custom type in the future.

type PluginStaticInfo

type PluginStaticInfo struct {
	Type    string
	Factory PluginFactory
	Config  AnyConfig

	// Endpoints is the map of endpoint name to handlerFunc.
	// Every plugin can provide their own API through Endpoints.
	Endpoints         map[string]func(http.ResponseWriter, *http.Request)
	AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config
}

type PluginsStarterData

type PluginsStarterData struct {
	Config AnyConfig
	Params *OutputPluginParams
}

type PluginsStarterMap

type PluginsStarterMap map[string]PluginsStarterData

type Settings

type Settings struct {
	Decoder             string
	Capacity            int
	MaintenanceInterval time.Duration
	EventTimeout        time.Duration
	AntispamThreshold   int
	AvgEventSize        int
	MaxEventSize        int
	StreamField         string
	IsStrict            bool
}

type SourceID

type SourceID uint64

type StreamID added in v0.5.20

type StreamID uint64

type StreamName

type StreamName string

type WorkerData

type WorkerData any

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL