pipeline

package
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2022 License: BSD-3-Clause Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Package-level default values.
// NOTE(review): where each default is applied is not visible in this listing;
// several names resemble Settings fields (StreamField, Capacity,
// MaintenanceInterval, EventTimeout) — confirm the mapping at the use sites.
const (
	DefaultStreamField         = "stream"
	DefaultCapacity            = 1024
	// 16 * 1024 = 16384. NOTE(review): presumably a size in bytes — confirm.
	DefaultAvgInputEventSize   = 16 * 1024
	// NOTE(review): presumably 0 means "no limit" — confirm.
	DefaultMaxInputEventSize   = 0
	DefaultJSONNodePoolSize    = 1024
	DefaultMaintenanceInterval = time.Second * 5
	DefaultEventTimeout        = time.Second * 30
	// DefaultFieldValue and DefaultStreamName carry the same text, "not_set";
	// the latter is typed as StreamName.
	DefaultFieldValue          = "not_set"
	DefaultStreamName          = StreamName("not_set")
	DefaultWaitForPanicTimeout = time.Minute
)
View Source
// PromNamespace is the string "file_d".
// NOTE(review): presumably the Prometheus metrics namespace — confirm at the use sites.
const PromNamespace = "file_d"

Variables

This section is empty.

Functions

func ByteToStringUnsafe

func ByteToStringUnsafe(b []byte) string

func NewProcessor

func NewProcessor(
	metricsHolder *metricsHolder,
	activeCounter *atomic.Int32,
	output OutputPlugin,
	streamer *streamer,
	finalizeFn finalizeFn,
) *processor

func ParseFormatName

func ParseFormatName(formatName string) (string, error)

func ParseLevel

func ParseLevel(level string) int

func StringToByteUnsafe

func StringToByteUnsafe(s string) []byte

func TrimSpaceFunc

func TrimSpaceFunc(r rune) bool

Types

type ActionPlugin

// ActionPlugin is the interface of an action plugin.
type ActionPlugin interface {
	// Start receives the plugin config and the action plugin parameters.
	Start(config AnyConfig, params *ActionPluginParams)
	Stop()
	// Do takes one event and returns an ActionResult; the ActionResult
	// constants describe what happens to the event next.
	Do(*Event) ActionResult
}

type ActionPluginController

// ActionPluginController lets an action plugin resolve an event it is holding
// (see ActionHold). It is handed to the plugin through
// ActionPluginParams.Controller.
type ActionPluginController interface {
	Commit(event *Event)    // commit offset of held event and skip further processing
	Propagate(event *Event) // throw held event back to pipeline
}

type ActionPluginInfo

// ActionPluginInfo combines an action plugin's static description and its
// runtime info by embedding both structs.
type ActionPluginInfo struct {
	*ActionPluginStaticInfo
	*PluginRuntimeInfo
}

type ActionPluginParams

// ActionPluginParams is the params argument of ActionPlugin.Start.
// It embeds PluginDefaultParams (PipelineName, PipelineSettings) and adds
// the action controller and a logger.
type ActionPluginParams struct {
	*PluginDefaultParams
	Controller ActionPluginController
	Logger     *zap.SugaredLogger
}

type ActionPluginStaticInfo

// ActionPluginStaticInfo extends the embedded PluginStaticInfo with metric
// and match settings. It is the argument type of Pipeline.AddAction.
type ActionPluginStaticInfo struct {
	*PluginStaticInfo

	MetricName      string
	MetricLabels    []string
	// NOTE(review): presumably MatchConditions, MatchMode and MatchInvert
	// together decide which events the action is applied to — confirm.
	MatchConditions MatchConditions
	MatchMode       MatchMode
	MatchInvert     bool
}

type ActionResult

// ActionResult is the value returned by ActionPlugin.Do.
type ActionResult int
const (
	// ActionPass passes the event to the next action in the pipeline.
	ActionPass ActionResult = 0
	// ActionCollapse skips further processing of the event and requests the next event from the same stream and
	// source as the current one. The plugin may receive an event with eventKindTimeout if it takes too long to
	// read the next event from the same stream.
	// Note: the numeric values are not in declaration order (ActionCollapse is 2, ActionDiscard is 1).
	ActionCollapse ActionResult = 2
	// ActionDiscard skips further processing of the event and requests the next event from any stream and source.
	ActionDiscard ActionResult = 1
	// ActionHold holds the event in the plugin and requests the next event from the same stream and source as
	// the current one. It is the same as ActionCollapse, but the held event should be manually committed or
	// returned into the pipeline: see the Commit()/Propagate() functions in ActionPluginController.
	// The plugin may receive an event with eventKindTimeout if it takes too long to read the next event from
	// the same stream.
	ActionHold ActionResult = 3
)

type AnyConfig

// AnyConfig is an empty interface used wherever a plugin config is passed:
// the Start methods of the plugin interfaces, PluginFactory's second result,
// PluginStaticInfo.Config and PluginsStarterData.Config.
type AnyConfig interface{}

type AnyPlugin

// AnyPlugin is an empty interface used where a plugin of any kind is stored
// or returned: PluginFactory's first result and PluginRuntimeInfo.Plugin.
type AnyPlugin interface{}

type Batch

// Batch is the second argument of BatcherOutFn.
type Batch struct {
	// Events is the exported list of events held by the batch.
	Events []*Event
	// contains filtered or unexported fields
}

type Batcher

// Batcher has no exported fields; it is created with NewBatcher and driven
// through its Start, Add and Stop methods.
type Batcher struct {
	// contains filtered or unexported fields
}

func NewBatcher

func NewBatcher(
	pipelineName string,
	outputType string,
	outFn BatcherOutFn,
	maintenanceFn BatcherMaintenanceFn,
	controller OutputPluginController,
	workers int,
	batchSize int,
	flushTimeout time.Duration,
	maintenanceInterval time.Duration,
) *Batcher

func (*Batcher) Add

func (b *Batcher) Add(event *Event)

func (*Batcher) Start

func (b *Batcher) Start()

func (*Batcher) Stop

func (b *Batcher) Stop()

type BatcherMaintenanceFn

// BatcherMaintenanceFn is the type of the maintenanceFn parameter of NewBatcher.
// NOTE(review): presumably invoked every maintenanceInterval — confirm.
type BatcherMaintenanceFn func(*WorkerData)

type BatcherOutFn

// BatcherOutFn is the type of the outFn parameter of NewBatcher; it receives
// a worker's data and a batch.
type BatcherOutFn func(*WorkerData, *Batch)

type ConditionType

// ConditionType is the type of PluginSelector.CondType.
type ConditionType int
const (
	// UnknownSelector is the zero value of ConditionType (iota starts at 0),
	// so a selector whose condition type was never set reads as unknown.
	UnknownSelector ConditionType = iota
	// ByNameSelector has the value 1.
	ByNameSelector
)

type Event

// Event is the unit handed to plugins: ActionPlugin.Do, OutputPlugin.Out and
// InputPlugin.Commit all take a *Event.
type Event struct {
	Root *insaneJSON.Root
	Buf  []byte

	// NOTE(review): Offset, SourceID and SourceName mirror the offset,
	// sourceID and sourceName parameters of InputPluginController.In, and
	// SeqID has the same type as In's result — presumably they are filled
	// from that call; confirm.
	SeqID      uint64
	Offset     int64
	SourceID   SourceID
	SourceName string

	Size int // last known event size; it may not be up to date
	// contains filtered or unexported fields
}

func (*Event) Encode

func (e *Event) Encode(outBuf []byte) ([]byte, int)

func (*Event) IsIgnoreKind

func (e *Event) IsIgnoreKind() bool

func (*Event) IsRegularKind

func (e *Event) IsRegularKind() bool

func (*Event) IsTimeoutKind

func (e *Event) IsTimeoutKind() bool

func (*Event) IsUnlockKind

func (e *Event) IsUnlockKind() bool

func (*Event) SetIgnoreKind

func (e *Event) SetIgnoreKind()

func (*Event) SetTimeoutKind

func (e *Event) SetTimeoutKind()

func (*Event) SetUnlockKind

func (e *Event) SetUnlockKind()

func (*Event) StreamNameBytes

func (e *Event) StreamNameBytes() []byte

func (*Event) String

func (e *Event) String() string

func (*Event) SubparseJSON

func (e *Event) SubparseJSON(json []byte) (*insaneJSON.Node, error)

type InputPlugin

// InputPlugin is the interface of an input plugin.
type InputPlugin interface {
	// Start receives the plugin config and the input plugin parameters.
	Start(config AnyConfig, params *InputPluginParams)
	Stop()
	// NOTE(review): presumably called when OutputPluginController.Commit
	// reports an event as successfully processed — confirm.
	Commit(*Event)
}

type InputPluginController

// InputPluginController is the interface handed to an input plugin through
// InputPluginParams.Controller. *Pipeline declares methods with the same
// signatures (In, UseSpread, DisableStreams, SuggestDecoder).
type InputPluginController interface {
	// In takes raw event data together with its source ID, source name and
	// offset. NOTE(review): the uint64 result is presumably the event's
	// sequence ID (cf. Event.SeqID) — confirm.
	In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
	UseSpread()                           // don't use stream field and spread all events across all processors
	DisableStreams()                      // don't use stream field
	SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder
}

type InputPluginInfo

// InputPluginInfo combines a plugin's static description and its runtime
// info by embedding both structs. It is the argument type of Pipeline.SetInput.
type InputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type InputPluginParams

// InputPluginParams is the params argument of InputPlugin.Start.
// It embeds PluginDefaultParams (PipelineName, PipelineSettings) and adds
// the input controller and a logger.
type InputPluginParams struct {
	*PluginDefaultParams
	Controller InputPluginController
	Logger     *zap.SugaredLogger
}

type MatchCondition

// MatchCondition is the element type of MatchConditions.
type MatchCondition struct {
	// Field is a slice so that nested fields can be addressed.
	// The separator is a dot symbol.
	// NOTE(review): presumably the dot applies to the textual (config) form
	// and each element here is one path segment — confirm.
	Field  []string
	// NOTE(review): presumably a condition uses either Value or Regexp — confirm.
	Value  string
	Regexp *regexp.Regexp
}

type MatchConditions

// MatchConditions is a list of MatchCondition values; it is the type of
// ActionPluginStaticInfo.MatchConditions.
type MatchConditions []MatchCondition

type MatchMode

// MatchMode is the type of ActionPluginStaticInfo.MatchMode.
type MatchMode int
const (
	// MatchModeAnd is 0 and therefore the zero value of MatchMode: a mode
	// that was never set reads as MatchModeAnd, not MatchModeUnknown.
	// NOTE(review): presumably "all conditions must match" — confirm.
	MatchModeAnd     MatchMode = 0
	// NOTE(review): presumably "any condition may match" — confirm.
	MatchModeOr      MatchMode = 1
	MatchModeUnknown MatchMode = 2
)

type OutputPlugin

// OutputPlugin is the interface of an output plugin.
type OutputPlugin interface {
	// Start receives the plugin config and the output plugin parameters.
	Start(config AnyConfig, params *OutputPluginParams)
	Stop()
	// Out takes one event for output.
	Out(*Event)
}

type OutputPluginController

// OutputPluginController is the interface handed to an output plugin through
// OutputPluginParams.Controller. *Pipeline declares methods with the same
// signatures (Commit, Error).
type OutputPluginController interface {
	Commit(event *Event) // notify input plugin that event is successfully processed and save offsets
	// Error takes an error message as a string.
	// NOTE(review): what is done with the message is not visible here — confirm.
	Error(err string)
}

type OutputPluginInfo

// OutputPluginInfo combines a plugin's static description and its runtime
// info by embedding both structs. It is the argument type of Pipeline.SetOutput.
type OutputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type OutputPluginParams

// OutputPluginParams is the params argument of OutputPlugin.Start and the
// type of PluginsStarterData.Params. It embeds PluginDefaultParams
// (PipelineName, PipelineSettings) and adds the output controller and a logger.
type OutputPluginParams struct {
	*PluginDefaultParams
	Controller OutputPluginController
	Logger     *zap.SugaredLogger
}

type Pipeline

// Pipeline is created with New. Its methods include those required by
// InputPluginController (In, UseSpread, DisableStreams, SuggestDecoder) and
// OutputPluginController (Commit, Error).
type Pipeline struct {
	Name string

	// Procs is exported, but its element type processor is not;
	// NewProcessor returns a *processor.
	Procs []*processor
	// contains filtered or unexported fields
}

func New

func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline

New creates a new pipeline. Consider calling `SetupHTTPHandlers` next.

func (*Pipeline) AddAction

func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)

func (*Pipeline) Commit

func (p *Pipeline) Commit(event *Event)

func (*Pipeline) DisableParallelism

func (p *Pipeline) DisableParallelism()

func (*Pipeline) DisableStreams

func (p *Pipeline) DisableStreams()

func (*Pipeline) EnableEventLog

func (p *Pipeline) EnableEventLog()

func (*Pipeline) Error

func (p *Pipeline) Error(err string)

func (*Pipeline) GetEventLogItem

func (p *Pipeline) GetEventLogItem(index int) string

func (*Pipeline) GetEventsTotal

func (p *Pipeline) GetEventsTotal() int

func (*Pipeline) GetInput

func (p *Pipeline) GetInput() InputPlugin

func (*Pipeline) GetOutput

func (p *Pipeline) GetOutput() OutputPlugin

func (*Pipeline) In

func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) uint64

func (*Pipeline) SetInput

func (p *Pipeline) SetInput(info *InputPluginInfo)

func (*Pipeline) SetOutput

func (p *Pipeline) SetOutput(info *OutputPluginInfo)

func (*Pipeline) SetupHTTPHandlers

func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)

SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.

func (*Pipeline) Start

func (p *Pipeline) Start()

func (*Pipeline) Stop

func (p *Pipeline) Stop()

func (*Pipeline) SuggestDecoder

func (p *Pipeline) SuggestDecoder(t decoder.DecoderType)

func (*Pipeline) UseSpread

func (p *Pipeline) UseSpread()

type PluginDefaultParams

// PluginDefaultParams holds the parameters shared by all plugin kinds; it is
// embedded in ActionPluginParams, InputPluginParams and OutputPluginParams.
type PluginDefaultParams struct {
	PipelineName     string
	PipelineSettings *Settings
}

type PluginFactory

// PluginFactory is a function returning a plugin and a config value;
// it is the type of PluginStaticInfo.Factory.
// NOTE(review): presumably both results are fresh, empty instances — confirm.
type PluginFactory func() (AnyPlugin, AnyConfig)

type PluginKind

// PluginKind is a string naming one of the three plugin kinds.
type PluginKind string
const (
	PluginKindInput  PluginKind = "input"
	PluginKindAction PluginKind = "action"
	PluginKindOutput PluginKind = "output"
)

type PluginRuntimeInfo

// PluginRuntimeInfo holds a plugin value and a string ID; it is embedded in
// ActionPluginInfo, InputPluginInfo and OutputPluginInfo.
type PluginRuntimeInfo struct {
	Plugin AnyPlugin
	ID     string
}

type PluginSelector

type PluginSelector struct {
	// CondType is a ConditionType; its zero value is UnknownSelector.
	CondType  ConditionType
	// CondValue is the condition value, stored as a string.
	CondValue string
}

PluginSelector holds a selection condition. For now the only valid condition type is ByNameSelector, and the condition value is always a string. It can be expanded with a custom type in the future.

type PluginStaticInfo

// PluginStaticInfo is the static description of a plugin; it is embedded in
// InputPluginInfo, OutputPluginInfo and ActionPluginStaticInfo.
type PluginStaticInfo struct {
	Type    string
	// Factory returns a plugin and a config value (see PluginFactory).
	Factory PluginFactory
	Config  AnyConfig

	// Endpoints is the map of endpoint name to handlerFunc.
	// Every plugin can provide their own API through Endpoints.
	Endpoints         map[string]func(http.ResponseWriter, *http.Request)
	AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config
}

type PluginsStarterData

// PluginsStarterData pairs a config with output plugin parameters; it is the
// value type of PluginsStarterMap.
// NOTE(review): Params is specifically *OutputPluginParams, so this presumably
// applies to output plugins only — confirm.
type PluginsStarterData struct {
	Config AnyConfig
	Params *OutputPluginParams
}

type PluginsStarterMap

// PluginsStarterMap maps a string key to PluginsStarterData.
// NOTE(review): the meaning of the key is not visible in this listing
// (presumably a plugin type or name) — confirm.
type PluginsStarterMap map[string]PluginsStarterData

type Settings

// Settings is the pipeline configuration passed to New and exposed to
// plugins through PluginDefaultParams.PipelineSettings.
// NOTE(review): Capacity, MaintenanceInterval, EventTimeout, AvgEventSize,
// MaxEventSize and StreamField resemble the package's Default* constants;
// where (and whether) those defaults are applied is not visible here — confirm.
type Settings struct {
	// NOTE(review): the SuggestDecoder comment mentions an "auto" value for
	// the decoder, so "auto" is presumably accepted here — confirm.
	Decoder             string
	Capacity            int
	MaintenanceInterval time.Duration
	EventTimeout        time.Duration
	AntispamThreshold   int
	AvgEventSize        int
	MaxEventSize        int
	StreamField         string
	IsStrict            bool
}

type SourceID

// SourceID is a uint64 identifying an event source; it is the type of
// Event.SourceID and of the sourceID parameter of In.
type SourceID uint64

type StreamName

// StreamName is a string type for stream names; DefaultStreamName is
// StreamName("not_set").
type StreamName string

type WorkerData

// WorkerData is an empty interface; a *WorkerData is passed to BatcherOutFn
// and BatcherMaintenanceFn.
// NOTE(review): presumably per-worker state owned by the output plugin — confirm.
type WorkerData interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL