Documentation ¶
Index ¶
- Constants
- Variables
- func ByteToStringUnsafe(b []byte) string
- func CloneString(s string) string
- func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node
- func GetBackoff(minRetention time.Duration, multiplier float64, attemptNum uint64) backoff.BackOff
- func ParseFormatName(formatName string) (string, error)
- func ParseLevelAsString(level string) string
- func ParseTime(format, value string) (time.Time, error)
- func StringToByteUnsafe(s string) []byte
- type ActionPlugin
- type ActionPluginController
- type ActionPluginInfo
- type ActionPluginParams
- type ActionPluginStaticInfo
- type ActionResult
- type AnyConfig
- type AnyPlugin
- type BackoffOpts
- type Batch
- type BatchStatus
- type Batcher
- type BatcherMaintenanceFn
- type BatcherOptions
- type BatcherOutFn
- type ConditionType
- type DeltaWrapper
- type Event
- func (e *Event) Encode(outBuf []byte) ([]byte, int)
- func (e *Event) IsChildKind() bool
- func (e *Event) IsChildParentKind() bool
- func (e *Event) IsIgnoreKind() bool
- func (e *Event) IsRegularKind() bool
- func (e *Event) IsTimeoutKind() bool
- func (e *Event) IsUnlockKind() bool
- func (e *Event) SetChildKind()
- func (e *Event) SetChildParentKind()
- func (e *Event) SetTimeoutKind()
- func (e *Event) SetUnlockKind()
- func (e *Event) StreamNameBytes() []byte
- func (e *Event) String() string
- type InputPlugin
- type InputPluginController
- type InputPluginInfo
- type InputPluginParams
- type Kind
- type LogLevel
- type MatchCondition
- type MatchConditions
- type MatchMode
- type OutputPlugin
- type OutputPluginController
- type OutputPluginInfo
- type OutputPluginParams
- type Pipeline
- func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)
- func (p *Pipeline) Commit(event *Event)
- func (p *Pipeline) DisableParallelism()
- func (p *Pipeline) DisableStreams()
- func (p *Pipeline) EnableEventLog()
- func (p *Pipeline) Error(err string)
- func (p *Pipeline) GetEventLogItem(index int) string
- func (p *Pipeline) GetEventsTotal() int
- func (p *Pipeline) GetInput() InputPlugin
- func (p *Pipeline) GetOutput() OutputPlugin
- func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, ...) (seqID uint64)
- func (p *Pipeline) IncMaxEventSizeExceeded()
- func (p *Pipeline) IncReadOps()
- func (p *Pipeline) SetInput(info *InputPluginInfo)
- func (p *Pipeline) SetOutput(info *OutputPluginInfo)
- func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)
- func (p *Pipeline) Start()
- func (p *Pipeline) Stop()
- func (p *Pipeline) SuggestDecoder(t decoder.Type)
- func (p *Pipeline) UseSpread()
- type PluginDefaultParams
- type PluginFactory
- type PluginKind
- type PluginRuntimeInfo
- type PluginSelector
- type PluginStaticInfo
- type PluginsStarterData
- type PluginsStarterMap
- type RetriableBatcher
- type RetriableBatcherOutFn
- type Settings
- type SourceID
- type StreamID
- type StreamName
- type WorkerData
Constants ¶
const ( DefaultStreamField = "stream" DefaultCapacity = 1024 DefaultAvgInputEventSize = 4 * 1024 DefaultMaxInputEventSize = 0 DefaultJSONNodePoolSize = 1024 DefaultMaintenanceInterval = time.Second * 5 DefaultEventTimeout = time.Second * 30 DefaultFieldValue = "not_set" DefaultStreamName = StreamName("not_set") DefaultMetricHoldDuration = time.Minute * 30 EventSeqIDError = uint64(0) )
const (
ByNameSelector = iota + 1
)
const LevelUnknownStr = ""
const (
UnixTime = "unixtime"
)
Variables ¶
var MatchModes = map[string]MatchMode{ "": MatchModeAnd, "and": MatchModeAnd, "or": MatchModeOr, "and_prefix": MatchModeAndPrefix, "or_prefix": MatchModeOrPrefix, }
Functions ¶
func ByteToStringUnsafe ¶
ByteToStringUnsafe converts a byte slice to a string without a memory copy. This creates a mutable string, so the method is unsafe and should be used with caution (never modify the provided byte slice).
func CreateNestedField ¶ added in v0.5.10
func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node
CreateNestedField creates a nested field by the path. For example, []string{"path.to", "object"} creates: { "path.to": {"object": {}} } Warning: it overrides a field on the path if that field has a non-object type. For example: in: { "path.to": [{"userId":"12345"}] }, out: { "path.to": {"object": {}} }
func GetBackoff ¶ added in v0.21.0
func ParseFormatName ¶
func ParseLevelAsString ¶ added in v0.5.10
ParseLevelAsString converts log level to the string representation according to the RFC-5424.
func StringToByteUnsafe ¶
StringToByteUnsafe converts a string to a byte slice without a memory copy. This creates a mutable string, so the method is unsafe and should be used with caution (never modify the resulting byte slice).
Types ¶
type ActionPlugin ¶
type ActionPlugin interface { Start(config AnyConfig, params *ActionPluginParams) Stop() Do(*Event) ActionResult }
type ActionPluginController ¶
type ActionPluginController interface { Propagate(event *Event) // throw held event back to pipeline Spawn(parent *Event, nodes []*insaneJSON.Node) IncMaxEventSizeExceeded() // inc max event size exceeded counter }
type ActionPluginInfo ¶
type ActionPluginInfo struct { *ActionPluginStaticInfo *PluginRuntimeInfo }
type ActionPluginParams ¶
type ActionPluginParams struct { PluginDefaultParams Controller ActionPluginController Logger *zap.SugaredLogger }
type ActionPluginStaticInfo ¶
type ActionPluginStaticInfo struct { *PluginStaticInfo MetricName string MetricLabels []string MetricSkipStatus bool MatchConditions MatchConditions MatchMode MatchMode MatchInvert bool DoIfChecker *doif.Checker }
type ActionResult ¶
type ActionResult int
const ( // ActionPass pass event to the next action in a pipeline ActionPass ActionResult = iota // ActionCollapse skip further processing of event and request next event from the same stream and source as current // plugin may receive event with EventKindTimeout if it takes too long to read next event from same stream ActionCollapse // ActionDiscard skip further processing of event and request next event from any stream and source ActionDiscard // ActionHold hold event in a plugin and request next event from the same stream and source as current. // same as ActionCollapse but held event should be manually committed or returned into pipeline. // check out Commit()/Propagate() functions in InputPluginController. // plugin may receive event with EventKindTimeout if it takes too long to read next event from same stream. ActionHold // ActionBreak abort the event processing and pass it to an output. ActionBreak )
type BackoffOpts ¶ added in v0.21.0
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
func NewPreparedBatch ¶ added in v0.16.2
type BatchStatus ¶ added in v0.12.3
type BatchStatus byte
const ( BatchStatusNotReady BatchStatus = iota BatchStatusMaxSizeExceeded BatchStatusTimeoutExceeded )
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
func NewBatcher ¶
func NewBatcher(opts BatcherOptions) *Batcher
type BatcherMaintenanceFn ¶
type BatcherMaintenanceFn func(*WorkerData)
type BatcherOptions ¶ added in v0.5.15
type BatcherOptions struct { PipelineName string OutputType string OutFn BatcherOutFn MaintenanceFn BatcherMaintenanceFn Controller OutputPluginController Workers int BatchSizeCount int BatchSizeBytes int FlushTimeout time.Duration MaintenanceInterval time.Duration MetricCtl *metric.Ctl }
type BatcherOutFn ¶
type BatcherOutFn func(*WorkerData, *Batch)
type ConditionType ¶
type ConditionType int
type DeltaWrapper ¶ added in v0.5.5
type DeltaWrapper struct {
// contains filtered or unexported fields
}
DeltaWrapper acts as a wrapper around int64 and returns the difference between the new and the old value when a new value is set. This is useful for metrics (see the maintenance function).
type Event ¶
type Event struct { Root *insaneJSON.Root Buf []byte SeqID uint64 Offset int64 SourceID SourceID SourceName string Size int // last known event size, it may be out of date // contains filtered or unexported fields }
func (*Event) IsChildKind ¶ added in v0.16.2
func (*Event) IsChildParentKind ¶ added in v0.16.2
func (*Event) IsIgnoreKind ¶
func (*Event) IsRegularKind ¶
func (*Event) IsTimeoutKind ¶
func (*Event) IsUnlockKind ¶
func (*Event) SetChildKind ¶ added in v0.16.2
func (e *Event) SetChildKind()
func (*Event) SetChildParentKind ¶ added in v0.16.2
func (e *Event) SetChildParentKind()
func (*Event) SetTimeoutKind ¶
func (e *Event) SetTimeoutKind()
func (*Event) SetUnlockKind ¶
func (e *Event) SetUnlockKind()
func (*Event) StreamNameBytes ¶
type InputPlugin ¶
type InputPlugin interface { Start(config AnyConfig, params *InputPluginParams) Stop() Commit(*Event) PassEvent(event *Event) bool }
type InputPluginController ¶
type InputPluginController interface { In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64 UseSpread() // don't use stream field and spread all events across all processors DisableStreams() // don't use stream field SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder IncReadOps() // inc read ops for metric IncMaxEventSizeExceeded() // inc max event size exceeded counter }
type InputPluginInfo ¶
type InputPluginInfo struct { *PluginStaticInfo *PluginRuntimeInfo }
type InputPluginParams ¶
type InputPluginParams struct { PluginDefaultParams Controller InputPluginController Logger *zap.SugaredLogger }
type LogLevel ¶ added in v0.5.10
type LogLevel int
func ParseLevelAsNumber ¶ added in v0.5.10
ParseLevelAsNumber converts log level to the int representation according to the RFC-5424.
type MatchCondition ¶
type MatchConditions ¶
type MatchConditions []MatchCondition
type OutputPlugin ¶
type OutputPlugin interface { Start(config AnyConfig, params *OutputPluginParams) Stop() Out(*Event) }
type OutputPluginController ¶
type OutputPluginInfo ¶
type OutputPluginInfo struct { *PluginStaticInfo *PluginRuntimeInfo }
type OutputPluginParams ¶
type OutputPluginParams struct { PluginDefaultParams Controller OutputPluginController Logger *zap.SugaredLogger }
type Pipeline ¶
type Pipeline struct { Name string Procs []*processor // contains filtered or unexported fields }
func New ¶
func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline
New creates a new pipeline. Consider using `SetupHTTPHandlers` next.
func (*Pipeline) AddAction ¶
func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)
func (*Pipeline) DisableParallelism ¶
func (p *Pipeline) DisableParallelism()
func (*Pipeline) DisableStreams ¶
func (p *Pipeline) DisableStreams()
func (*Pipeline) EnableEventLog ¶
func (p *Pipeline) EnableEventLog()
func (*Pipeline) GetEventLogItem ¶
func (*Pipeline) GetEventsTotal ¶
func (*Pipeline) GetInput ¶
func (p *Pipeline) GetInput() InputPlugin
func (*Pipeline) GetOutput ¶
func (p *Pipeline) GetOutput() OutputPlugin
func (*Pipeline) In ¶
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64)
In decodes message and passes it to event stream.
func (*Pipeline) IncMaxEventSizeExceeded ¶ added in v0.5.11
func (p *Pipeline) IncMaxEventSizeExceeded()
func (*Pipeline) IncReadOps ¶ added in v0.5.5
func (p *Pipeline) IncReadOps()
func (*Pipeline) SetInput ¶
func (p *Pipeline) SetInput(info *InputPluginInfo)
func (*Pipeline) SetOutput ¶
func (p *Pipeline) SetOutput(info *OutputPluginInfo)
func (*Pipeline) SetupHTTPHandlers ¶
SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.
func (*Pipeline) SuggestDecoder ¶
type PluginDefaultParams ¶
type PluginFactory ¶
type PluginKind ¶
type PluginKind string
const ( PluginKindInput PluginKind = "input" PluginKindAction PluginKind = "action" PluginKindOutput PluginKind = "output" )
type PluginRuntimeInfo ¶
type PluginSelector ¶
type PluginSelector struct { CondType ConditionType CondValue string }
PluginSelector: the only valid condition type for now is ByNameSelector, and the only supported condition value type is string. It can be expanded with a custom type in the future.
type PluginStaticInfo ¶
type PluginStaticInfo struct { Type string Factory PluginFactory Config AnyConfig // Endpoints is the map of endpoint name to handlerFunc. // Every plugin can provide their own API through Endpoints. Endpoints map[string]func(http.ResponseWriter, *http.Request) AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config }
type PluginsStarterData ¶
type PluginsStarterData struct { Config AnyConfig Params *OutputPluginParams }
type PluginsStarterMap ¶
type PluginsStarterMap map[string]PluginsStarterData
type RetriableBatcher ¶ added in v0.21.0
type RetriableBatcher struct {
// contains filtered or unexported fields
}
func NewRetriableBatcher ¶ added in v0.21.0
func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher
func (*RetriableBatcher) Add ¶ added in v0.21.0
func (b *RetriableBatcher) Add(event *Event)
func (*RetriableBatcher) Out ¶ added in v0.21.0
func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch)
func (*RetriableBatcher) Start ¶ added in v0.21.0
func (b *RetriableBatcher) Start(ctx context.Context)
func (*RetriableBatcher) Stop ¶ added in v0.21.0
func (b *RetriableBatcher) Stop()
type RetriableBatcherOutFn ¶ added in v0.21.0
type RetriableBatcherOutFn func(*WorkerData, *Batch) error
type Settings ¶
type Settings struct { Decoder string DecoderParams map[string]any Capacity int MaintenanceInterval time.Duration EventTimeout time.Duration AntispamThreshold int AntispamField string AntispamExceptions antispam.Exceptions AvgEventSize int MaxEventSize int StreamField string IsStrict bool MetricHoldDuration time.Duration }
type StreamName ¶
type StreamName string
type WorkerData ¶
type WorkerData any