pipeline

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2022 License: BSD-3-Clause Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultStreamField         = "stream"
	DefaultCapacity            = 1024
	DefaultAvgInputEventSize   = 16 * 1024
	DefaultMaxInputEventSize   = 0
	DefaultJSONNodePoolSize    = 1024
	DefaultMaintenanceInterval = time.Second * 5
	DefaultEventTimeout        = time.Second * 30
	DefaultFieldValue          = "not_set"
	DefaultStreamName          = StreamName("not_set")
	DefaultWaitForPanicTimeout = time.Minute
)
View Source
const PromNamespace = "file_d"

Variables

This section is empty.

Functions

func ByteToStringUnsafe

func ByteToStringUnsafe(b []byte) string

ByteToStringUnsafe converts a byte slice to a string without a memory copy. This creates a mutable string, so the method is unsafe and should be used with caution (never modify the provided byte slice).

func CloneString added in v0.4.8

func CloneString(s string) string

CloneString deeply copies a string.

func NewProcessor

func NewProcessor(
	metricsHolder *metricsHolder,
	activeCounter *atomic.Int32,
	output OutputPlugin,
	streamer *streamer,
	finalizeFn finalizeFn,
) *processor

func ParseFormatName

func ParseFormatName(formatName string) (string, error)

func ParseLevel

func ParseLevel(level string) int

func StringToByteUnsafe

func StringToByteUnsafe(s string) []byte

StringToByteUnsafe converts a string to a byte slice without a memory copy. This creates a mutable string, so the method is unsafe and should be used with caution (never modify the resulting byte slice).

func TrimSpaceFunc

func TrimSpaceFunc(r rune) bool

Types

type ActionPlugin

type ActionPlugin interface {
	Start(config AnyConfig, params *ActionPluginParams)
	Stop()
	Do(*Event) ActionResult
}

type ActionPluginController

type ActionPluginController interface {
	Commit(event *Event)    // commit offset of held event and skip further processing
	Propagate(event *Event) // throw held event back to pipeline
}

type ActionPluginInfo

type ActionPluginInfo struct {
	*ActionPluginStaticInfo
	*PluginRuntimeInfo
}

type ActionPluginParams

type ActionPluginParams struct {
	*PluginDefaultParams
	Controller ActionPluginController
	Logger     *zap.SugaredLogger
}

type ActionPluginStaticInfo

type ActionPluginStaticInfo struct {
	*PluginStaticInfo

	MetricName      string
	MetricLabels    []string
	MatchConditions MatchConditions
	MatchMode       MatchMode
	MatchInvert     bool
}

type ActionResult

type ActionResult int
const (
	// ActionPass pass event to the next action in a pipeline
	ActionPass ActionResult = 0
	// ActionCollapse skip further processing of event and request next event from the same stream and source as current
	// plugin may receive event with eventKindTimeout if it takes too long to read next event from same stream
	ActionCollapse ActionResult = 2
	// ActionDiscard skip further processing of event and request next event from any stream and source
	ActionDiscard ActionResult = 1
	// ActionHold hold event in a plugin and request next event from the same stream and source as current.
	// same as ActionCollapse but held event should be manually committed or returned into pipeline.
	// check out Commit()/Propagate() functions in ActionPluginController.
	// plugin may receive event with eventKindTimeout if it takes too long to read next event from same stream.
	ActionHold ActionResult = 3
)

type AnyConfig

type AnyConfig interface{}

type AnyPlugin

type AnyPlugin interface{}

type Batch

type Batch struct {
	Events []*Event
	// contains filtered or unexported fields
}

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

func NewBatcher

func NewBatcher(
	pipelineName string,
	outputType string,
	outFn BatcherOutFn,
	maintenanceFn BatcherMaintenanceFn,
	controller OutputPluginController,
	workers int,
	batchSize int,
	flushTimeout time.Duration,
	maintenanceInterval time.Duration,
) *Batcher

func (*Batcher) Add

func (b *Batcher) Add(event *Event)

func (*Batcher) Start

func (b *Batcher) Start(ctx context.Context)

func (*Batcher) Stop

func (b *Batcher) Stop()

type BatcherMaintenanceFn

type BatcherMaintenanceFn func(*WorkerData)

type BatcherOutFn

type BatcherOutFn func(*WorkerData, *Batch)

type ConditionType

type ConditionType int
const (
	// UnknownSelector is the zero value, so it is safer to use it as the default unknown value.
	UnknownSelector ConditionType = iota
	ByNameSelector
)

type Event

type Event struct {
	Root *insaneJSON.Root
	Buf  []byte

	SeqID      uint64
	Offset     int64
	SourceID   SourceID
	SourceName string

	Size int // last known event size, it may not be actual
	// contains filtered or unexported fields
}

func (*Event) Encode

func (e *Event) Encode(outBuf []byte) ([]byte, int)

func (*Event) IsIgnoreKind

func (e *Event) IsIgnoreKind() bool

func (*Event) IsRegularKind

func (e *Event) IsRegularKind() bool

func (*Event) IsTimeoutKind

func (e *Event) IsTimeoutKind() bool

func (*Event) IsUnlockKind

func (e *Event) IsUnlockKind() bool

func (*Event) SetIgnoreKind

func (e *Event) SetIgnoreKind()

func (*Event) SetTimeoutKind

func (e *Event) SetTimeoutKind()

func (*Event) SetUnlockKind

func (e *Event) SetUnlockKind()

func (*Event) StreamNameBytes

func (e *Event) StreamNameBytes() []byte

func (*Event) String

func (e *Event) String() string

func (*Event) SubparseJSON

func (e *Event) SubparseJSON(json []byte) (*insaneJSON.Node, error)

type InputPlugin

type InputPlugin interface {
	Start(config AnyConfig, params *InputPluginParams)
	Stop()
	Commit(*Event)
}

type InputPluginController

type InputPluginController interface {
	In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64
	UseSpread()                           // don't use stream field and spread all events across all processors
	DisableStreams()                      // don't use stream field
	SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder
}

type InputPluginInfo

type InputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type InputPluginParams

type InputPluginParams struct {
	*PluginDefaultParams
	Controller InputPluginController
	Logger     *zap.SugaredLogger
}

type MatchCondition

type MatchCondition struct {
	// Slice for nested fields. Separator is a dot symbol.
	Field  []string
	Value  string
	Regexp *regexp.Regexp
}

type MatchConditions

type MatchConditions []MatchCondition

type MatchMode

type MatchMode int
const (
	MatchModeAnd     MatchMode = 0
	MatchModeOr      MatchMode = 1
	MatchModeUnknown MatchMode = 2
)

type OutputPlugin

type OutputPlugin interface {
	Start(config AnyConfig, params *OutputPluginParams)
	Stop()
	Out(*Event)
}

type OutputPluginController

type OutputPluginController interface {
	Commit(event *Event) // notify input plugin that event is successfully processed and save offsets
	Error(err string)
}

type OutputPluginInfo

type OutputPluginInfo struct {
	*PluginStaticInfo
	*PluginRuntimeInfo
}

type OutputPluginParams

type OutputPluginParams struct {
	*PluginDefaultParams
	Controller OutputPluginController
	Logger     *zap.SugaredLogger
}

type Pipeline

type Pipeline struct {
	Name string

	Procs []*processor
	// contains filtered or unexported fields
}

func New

func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline

New creates new pipeline. Consider using `SetupHTTPHandlers` next.

func (*Pipeline) AddAction

func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)

func (*Pipeline) Commit

func (p *Pipeline) Commit(event *Event)

func (*Pipeline) DisableParallelism

func (p *Pipeline) DisableParallelism()

func (*Pipeline) DisableStreams

func (p *Pipeline) DisableStreams()

func (*Pipeline) EnableEventLog

func (p *Pipeline) EnableEventLog()

func (*Pipeline) Error

func (p *Pipeline) Error(err string)

func (*Pipeline) GetEventLogItem

func (p *Pipeline) GetEventLogItem(index int) string

func (*Pipeline) GetEventsTotal

func (p *Pipeline) GetEventsTotal() int

func (*Pipeline) GetInput

func (p *Pipeline) GetInput() InputPlugin

func (*Pipeline) GetOutput

func (p *Pipeline) GetOutput() OutputPlugin

func (*Pipeline) In

func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) uint64

func (*Pipeline) SetInput

func (p *Pipeline) SetInput(info *InputPluginInfo)

func (*Pipeline) SetOutput

func (p *Pipeline) SetOutput(info *OutputPluginInfo)

func (*Pipeline) SetupHTTPHandlers

func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)

SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.

func (*Pipeline) Start

func (p *Pipeline) Start()

func (*Pipeline) Stop

func (p *Pipeline) Stop()

func (*Pipeline) SuggestDecoder

func (p *Pipeline) SuggestDecoder(t decoder.DecoderType)

func (*Pipeline) UseSpread

func (p *Pipeline) UseSpread()

type PluginDefaultParams

type PluginDefaultParams struct {
	PipelineName     string
	PipelineSettings *Settings
}

type PluginFactory

type PluginFactory func() (AnyPlugin, AnyConfig)

type PluginKind

type PluginKind string
const (
	PluginKindInput  PluginKind = "input"
	PluginKindAction PluginKind = "action"
	PluginKindOutput PluginKind = "output"
)

type PluginRuntimeInfo

type PluginRuntimeInfo struct {
	Plugin AnyPlugin
	ID     string
}

type PluginSelector

type PluginSelector struct {
	CondType  ConditionType
	CondValue string
}

PluginSelector: the only valid condition type for now is ByNameSelector, and the only supported value type is string. It can be expanded with a custom type in the future.

type PluginStaticInfo

type PluginStaticInfo struct {
	Type    string
	Factory PluginFactory
	Config  AnyConfig

	// Endpoints is the map of endpoint name to handlerFunc.
	// Every plugin can provide their own API through Endpoints.
	Endpoints         map[string]func(http.ResponseWriter, *http.Request)
	AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config
}

type PluginsStarterData

type PluginsStarterData struct {
	Config AnyConfig
	Params *OutputPluginParams
}

type PluginsStarterMap

type PluginsStarterMap map[string]PluginsStarterData

type Settings

type Settings struct {
	Decoder             string
	Capacity            int
	MaintenanceInterval time.Duration
	EventTimeout        time.Duration
	AntispamThreshold   int
	AvgEventSize        int
	MaxEventSize        int
	StreamField         string
	IsStrict            bool
}

type SourceID

type SourceID uint64

type StreamName

type StreamName string

type WorkerData

type WorkerData interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL