Documentation ¶
Index ¶
- Constants
- Variables
- func ByteToStringUnsafe(b []byte) string
- func CloneString(s string) string
- func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node
- func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff
- func ParseFormatName(formatName string) (string, error)
- func ParseLevelAsString(level string) string
- func ParseTime(format, value string) (time.Time, error)
- func StringToByteUnsafe(s string) []byte
- type ActionPlugin
- type ActionPluginController
- type ActionPluginInfo
- type ActionPluginParams
- type ActionPluginStaticInfo
- type ActionResult
- type AnyConfig
- type AnyPlugin
- type BackoffOpts
- type Batch
- type BatchStatus
- type Batcher
- type BatcherMaintenanceFn
- type BatcherOptions
- type BatcherOutFn
- type ConditionType
- type DeltaWrapper
- type Event
- func (e *Event) Encode(outBuf []byte) ([]byte, int)
- func (e *Event) IsChildKind() bool
- func (e *Event) IsChildParentKind() bool
- func (e *Event) IsIgnoreKind() bool
- func (e *Event) IsRegularKind() bool
- func (e *Event) IsTimeoutKind() bool
- func (e *Event) IsUnlockKind() bool
- func (e *Event) SetChildKind()
- func (e *Event) SetChildParentKind()
- func (e *Event) SetTimeoutKind()
- func (e *Event) SetUnlockKind()
- func (e *Event) StreamNameBytes() []byte
- func (e *Event) String() string
- type InputPlugin
- type InputPluginController
- type InputPluginInfo
- type InputPluginParams
- type Kind
- type LogLevel
- type MatchCondition
- type MatchConditions
- type MatchMode
- type Offsets
- type OutputPlugin
- type OutputPluginController
- type OutputPluginInfo
- type OutputPluginParams
- type Pipeline
- func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)
- func (p *Pipeline) Commit(event *Event)
- func (p *Pipeline) DisableParallelism()
- func (p *Pipeline) DisableStreams()
- func (p *Pipeline) EnableEventLog()
- func (p *Pipeline) Error(err string)
- func (p *Pipeline) GetEventLogItem(index int) string
- func (p *Pipeline) GetEventsTotal() int
- func (p *Pipeline) GetInput() InputPlugin
- func (p *Pipeline) GetOutput() OutputPlugin
- func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, bytes []byte, ...) (seqID uint64)
- func (p *Pipeline) IncCountEventPanicsRecovered()
- func (p *Pipeline) IncMaxEventSizeExceeded(lvs ...string)
- func (p *Pipeline) IncReadOps()
- func (p *Pipeline) SetInput(info *InputPluginInfo)
- func (p *Pipeline) SetOutput(info *OutputPluginInfo)
- func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)
- func (p *Pipeline) Start()
- func (p *Pipeline) Stop()
- func (p *Pipeline) SuggestDecoder(t decoder.Type)
- func (p *Pipeline) UseSpread()
- type PluginDefaultParams
- type PluginFactory
- type PluginKind
- type PluginRuntimeInfo
- type PluginSelector
- type PluginStaticInfo
- type PluginsStarterData
- type PluginsStarterMap
- type PoolType
- type RetriableBatcher
- type RetriableBatcherOutFn
- type Settings
- type SliceMap
- type SourceID
- type StreamID
- type StreamName
- type WorkerData
Constants ¶
const ( DefaultAntispamThreshold = 0 DefaultSourceNameMetaField = "" DefaultDecoder = "auto" DefaultIsStrict = false DefaultStreamField = "stream" DefaultCapacity = 1024 DefaultAvgInputEventSize = 4 * 1024 DefaultMaxInputEventSize = 0 DefaultCutOffEventByLimit = false DefaultCutOffEventByLimitField = "" DefaultJSONNodePoolSize = 1024 DefaultMaintenanceInterval = time.Second * 5 DefaultEventTimeout = time.Second * 30 DefaultFieldValue = "not_set" DefaultStreamName = StreamName("not_set") DefaultMetricHoldDuration = time.Minute * 30 DefaultMetaCacheSize = 1024 EventSeqIDError = uint64(0) )
const (
ByNameSelector = iota + 1
)
const LevelUnknownStr = ""
const (
UnixTime = "unixtime"
)
Variables ¶
var MatchModes = map[string]MatchMode{ "": MatchModeAnd, "and": MatchModeAnd, "or": MatchModeOr, "and_prefix": MatchModeAndPrefix, "or_prefix": MatchModeOrPrefix, }
Functions ¶
func ByteToStringUnsafe ¶
ByteToStringUnsafe converts byte slice to string without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify provided byte slice)
func CreateNestedField ¶ added in v0.5.10
func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node
CreateNestedField creates nested field by the path. For example, []string{"path.to", "object"} creates: { "path.to": {"object": {} } Warn: it overrides fields if it contains non-object type on the path. For example: in: { "path.to": [{"userId":"12345"}] }, out: { "path.to": {"object": {}} }
func GetBackoff ¶ added in v0.21.0
func ParseFormatName ¶
func ParseLevelAsString ¶ added in v0.5.10
ParseLevelAsString converts log level to the string representation according to the RFC-5424.
func StringToByteUnsafe ¶
StringToByteUnsafe converts string to byte slice without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify resulting byte slice)
Types ¶
type ActionPlugin ¶
type ActionPlugin interface { Start(config AnyConfig, params *ActionPluginParams) Stop() Do(*Event) ActionResult }
type ActionPluginController ¶
type ActionPluginController interface { Propagate(event *Event) // throw held event back to pipeline Spawn(parent *Event, nodes []*insaneJSON.Node) IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter }
type ActionPluginInfo ¶
type ActionPluginInfo struct { *ActionPluginStaticInfo *PluginRuntimeInfo }
type ActionPluginParams ¶
type ActionPluginParams struct { PluginDefaultParams Controller ActionPluginController Logger *zap.SugaredLogger }
type ActionPluginStaticInfo ¶
type ActionPluginStaticInfo struct { *PluginStaticInfo MetricName string MetricLabels []string MetricSkipStatus bool MatchConditions MatchConditions MatchMode MatchMode MatchInvert bool DoIfChecker *doif.Checker }
type ActionResult ¶
type ActionResult int
const ( // ActionPass pass event to the next action in a pipeline ActionPass ActionResult = iota // ActionCollapse skip further processing of event and request next event from the same stream and source as current // plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream ActionCollapse // ActionDiscard skip further processing of event and request next event from any stream and source ActionDiscard // ActionHold hold event in a plugin and request next event from the same stream and source as current. // same as ActionCollapse but held event should be manually committed or returned into pipeline. // check out Commit()/Propagate() functions in InputPluginController. // plugin may receive event with EventKindTimeout if it takes to long to read next event from same stream. ActionHold // ActionBreak abort the event processing and pass it to an output. ActionBreak )
type BackoffOpts ¶ added in v0.21.0
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
func NewPreparedBatch ¶ added in v0.16.2
type BatchStatus ¶ added in v0.12.3
type BatchStatus byte
const ( BatchStatusNotReady BatchStatus = iota BatchStatusMaxSizeExceeded BatchStatusTimeoutExceeded )
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
func NewBatcher ¶
func NewBatcher(opts BatcherOptions) *Batcher
type BatcherMaintenanceFn ¶
type BatcherMaintenanceFn func(*WorkerData)
type BatcherOptions ¶ added in v0.5.15
type BatcherOptions struct { PipelineName string OutputType string OutFn BatcherOutFn MaintenanceFn BatcherMaintenanceFn Controller OutputPluginController Workers int BatchSizeCount int BatchSizeBytes int FlushTimeout time.Duration MaintenanceInterval time.Duration MetricCtl *metric.Ctl }
type BatcherOutFn ¶
type BatcherOutFn func(*WorkerData, *Batch)
type ConditionType ¶
type ConditionType int
type DeltaWrapper ¶ added in v0.5.5
type DeltaWrapper struct {
// contains filtered or unexported fields
}
DeltaWrapper acts as a wrapper around int64 and returns the difference between the new and the old value when a new value is set. This is useful for metric (see the maintenance function).
type Event ¶
type Event struct { Root *insaneJSON.Root Buf []byte SeqID uint64 Offset int64 SourceID SourceID SourceName string // Size in bytes of the raw event before any action plugin. Size int // contains filtered or unexported fields }
func (*Event) IsChildKind ¶ added in v0.16.2
func (*Event) IsChildParentKind ¶ added in v0.16.2
func (*Event) IsIgnoreKind ¶
func (*Event) IsRegularKind ¶
func (*Event) IsTimeoutKind ¶
func (*Event) IsUnlockKind ¶
func (*Event) SetChildKind ¶ added in v0.16.2
func (e *Event) SetChildKind()
func (*Event) SetChildParentKind ¶ added in v0.16.2
func (e *Event) SetChildParentKind()
func (*Event) SetTimeoutKind ¶
func (e *Event) SetTimeoutKind()
func (*Event) SetUnlockKind ¶
func (e *Event) SetUnlockKind()
func (*Event) StreamNameBytes ¶
type InputPlugin ¶
type InputPlugin interface { Start(config AnyConfig, params *InputPluginParams) Stop() Commit(*Event) PassEvent(event *Event) bool }
type InputPluginController ¶
type InputPluginController interface { In(sourceID SourceID, sourceName string, offsets Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64 UseSpread() // don't use stream field and spread all events across all processors DisableStreams() // don't use stream field SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder IncReadOps() // inc read ops for metric IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter }
type InputPluginInfo ¶
type InputPluginInfo struct { *PluginStaticInfo *PluginRuntimeInfo }
type InputPluginParams ¶
type InputPluginParams struct { PluginDefaultParams Controller InputPluginController Logger *zap.SugaredLogger }
type LogLevel ¶ added in v0.5.10
type LogLevel int
func ParseLevelAsNumber ¶ added in v0.5.10
ParseLevelAsNumber converts log level to the int representation according to the RFC-5424.
type MatchCondition ¶
type MatchConditions ¶
type MatchConditions []MatchCondition
type Offsets ¶ added in v0.40.4
type Offsets struct {
// contains filtered or unexported fields
}
func NewOffsets ¶ added in v0.41.1
type OutputPlugin ¶
type OutputPlugin interface { Start(config AnyConfig, params *OutputPluginParams) Stop() Out(*Event) }
type OutputPluginController ¶
type OutputPluginInfo ¶
type OutputPluginInfo struct { *PluginStaticInfo *PluginRuntimeInfo }
type OutputPluginParams ¶
type OutputPluginParams struct { PluginDefaultParams Controller OutputPluginController Logger *zap.SugaredLogger }
type Pipeline ¶
type Pipeline struct { Name string Procs []*processor // contains filtered or unexported fields }
func New ¶
func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline
New creates new pipeline. Consider using `SetupHTTPHandlers` next.
func (*Pipeline) AddAction ¶
func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)
func (*Pipeline) DisableParallelism ¶
func (p *Pipeline) DisableParallelism()
func (*Pipeline) DisableStreams ¶
func (p *Pipeline) DisableStreams()
func (*Pipeline) EnableEventLog ¶
func (p *Pipeline) EnableEventLog()
func (*Pipeline) GetEventLogItem ¶
func (*Pipeline) GetEventsTotal ¶
func (*Pipeline) GetInput ¶
func (p *Pipeline) GetInput() InputPlugin
func (*Pipeline) GetOutput ¶
func (p *Pipeline) GetOutput() OutputPlugin
func (*Pipeline) In ¶
func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64)
In decodes message and passes it to event stream.
func (*Pipeline) IncCountEventPanicsRecovered ¶ added in v0.40.2
func (p *Pipeline) IncCountEventPanicsRecovered()
func (*Pipeline) IncMaxEventSizeExceeded ¶ added in v0.5.11
func (*Pipeline) IncReadOps ¶ added in v0.5.5
func (p *Pipeline) IncReadOps()
func (*Pipeline) SetInput ¶
func (p *Pipeline) SetInput(info *InputPluginInfo)
func (*Pipeline) SetOutput ¶
func (p *Pipeline) SetOutput(info *OutputPluginInfo)
func (*Pipeline) SetupHTTPHandlers ¶
SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.
func (*Pipeline) SuggestDecoder ¶
type PluginDefaultParams ¶
type PluginFactory ¶
type PluginKind ¶
type PluginKind string
const ( PluginKindInput PluginKind = "input" PluginKindAction PluginKind = "action" PluginKindOutput PluginKind = "output" )
type PluginRuntimeInfo ¶
type PluginSelector ¶
type PluginSelector struct { CondType ConditionType CondValue string }
PluginSelector the only valid value for now is ByNameSelector and only value is string type. It can be expanded with a custom type in the future.
type PluginStaticInfo ¶
type PluginStaticInfo struct { Type string Factory PluginFactory Config AnyConfig // Endpoints is the map of endpoint name to handlerFunc. // Every plugin can provide their own API through Endpoints. Endpoints map[string]func(http.ResponseWriter, *http.Request) AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config }
type PluginsStarterData ¶
type PluginsStarterData struct { Config AnyConfig Params *OutputPluginParams }
type PluginsStarterMap ¶
type PluginsStarterMap map[string]PluginsStarterData
type RetriableBatcher ¶ added in v0.21.0
type RetriableBatcher struct {
// contains filtered or unexported fields
}
func NewRetriableBatcher ¶ added in v0.21.0
func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher
func (*RetriableBatcher) Add ¶ added in v0.21.0
func (b *RetriableBatcher) Add(event *Event)
func (*RetriableBatcher) Out ¶ added in v0.21.0
func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch)
func (*RetriableBatcher) Start ¶ added in v0.21.0
func (b *RetriableBatcher) Start(ctx context.Context)
func (*RetriableBatcher) Stop ¶ added in v0.21.0
func (b *RetriableBatcher) Stop()
type RetriableBatcherOutFn ¶ added in v0.21.0
type RetriableBatcherOutFn func(*WorkerData, *Batch) error
type Settings ¶
type Settings struct { Decoder string DecoderParams map[string]any Capacity int MetaCacheSize int MaintenanceInterval time.Duration EventTimeout time.Duration AntispamThreshold int AntispamExceptions antispam.Exceptions SourceNameMetaField string AvgEventSize int MaxEventSize int CutOffEventByLimit bool CutOffEventByLimitField string StreamField string IsStrict bool MetricHoldDuration time.Duration Pool PoolType }
type SliceMap ¶ added in v0.41.1
type SliceMap []kv
SliceMap is a map of streamName to offset. It could be just map[k]v, but Go > 1.17 internal map implementation can't work with mutable strings that occurs when using unsafe cast from []byte. Also it should be not slower on 1-2 keys like linked list, which is often the case for streams per job.
func SliceFromMap ¶ added in v0.41.1
func SliceFromMap(m map[StreamName]int64) SliceMap
func (*SliceMap) Get ¶ added in v0.41.1
func (so *SliceMap) Get(streamName StreamName) (int64, bool)
func (*SliceMap) Set ¶ added in v0.41.1
func (so *SliceMap) Set(streamName StreamName, offset int64)
type StreamName ¶
type StreamName string
type WorkerData ¶
type WorkerData any