Documentation ¶
Index ¶
- Constants
- func ByteToStringUnsafe(b []byte) string
- func CloneString(s string) string
- func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node
- func NewProcessor(metricsHolder *metricsHolder, activeCounter *atomic.Int32, output OutputPlugin, ...) *processor
- func ParseFormatName(formatName string) (string, error)
- func ParseLevelAsString(level string) string
- func StringToByteUnsafe(s string) []byte
- type ActionPlugin
- type ActionPluginController
- type ActionPluginInfo
- type ActionPluginParams
- type ActionPluginStaticInfo
- type ActionResult
- type AnyConfig
- type AnyPlugin
- type Batch
- type Batcher
- type BatcherMaintenanceFn
- type BatcherOutFn
- type ConditionType
- type DeltaWrapper
- type Event
- func (e *Event) Encode(outBuf []byte) ([]byte, int)
- func (e *Event) IsIgnoreKind() bool
- func (e *Event) IsRegularKind() bool
- func (e *Event) IsTimeoutKind() bool
- func (e *Event) IsUnlockKind() bool
- func (e *Event) SetIgnoreKind()
- func (e *Event) SetTimeoutKind()
- func (e *Event) SetUnlockKind()
- func (e *Event) StreamNameBytes() []byte
- func (e *Event) String() string
- func (e *Event) SubparseJSON(json []byte) (*insaneJSON.Node, error)
- type InputPlugin
- type InputPluginController
- type InputPluginInfo
- type InputPluginParams
- type LogLevel
- type MatchCondition
- type MatchConditions
- type MatchMode
- type OutputPlugin
- type OutputPluginController
- type OutputPluginInfo
- type OutputPluginParams
- type Pipeline
- func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)
- func (p *Pipeline) Commit(event *Event)
- func (p *Pipeline) DisableParallelism()
- func (p *Pipeline) DisableStreams()
- func (p *Pipeline) EnableEventLog()
- func (p *Pipeline) Error(err string)
- func (p *Pipeline) GetEventLogItem(index int) string
- func (p *Pipeline) GetEventsTotal() int
- func (p *Pipeline) GetInput() InputPlugin
- func (p *Pipeline) GetOutput() OutputPlugin
- func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, ...) (seqID uint64)
- func (p *Pipeline) IncMaxEventSizeExceeded()
- func (p *Pipeline) IncReadOps()
- func (p *Pipeline) SetInput(info *InputPluginInfo)
- func (p *Pipeline) SetOutput(info *OutputPluginInfo)
- func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux)
- func (p *Pipeline) Start()
- func (p *Pipeline) Stop()
- func (p *Pipeline) SuggestDecoder(t decoder.DecoderType)
- func (p *Pipeline) UseSpread()
- type PluginDefaultParams
- type PluginFactory
- type PluginKind
- type PluginRuntimeInfo
- type PluginSelector
- type PluginStaticInfo
- type PluginsStarterData
- type PluginsStarterMap
- type Settings
- type SourceID
- type StreamName
- type WorkerData
Constants ¶
const ( DefaultStreamField = "stream" DefaultCapacity = 1024 DefaultAvgInputEventSize = 4 * 1024 DefaultMaxInputEventSize = 0 DefaultJSONNodePoolSize = 1024 DefaultMaintenanceInterval = time.Second * 5 DefaultEventTimeout = time.Second * 30 DefaultFieldValue = "not_set" DefaultStreamName = StreamName("not_set") EventSeqIDError = uint64(0) )
const LevelUnknownStr = ""
const PromNamespace = "file_d"
Variables ¶
This section is empty.
Functions ¶
func ByteToStringUnsafe ¶
ByteToStringUnsafe converts byte slice to string without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify provided byte slice)
func CreateNestedField ¶ added in v0.5.10
func CreateNestedField(root *insaneJSON.Root, path []string) *insaneJSON.Node
CreateNestedField creates nested field by the path. For example, []string{"path.to", "object"} creates: { "path.to": {"object": {} } Warn: it overrides fields if it contains non-object type on the path. For example: in: { "path.to": [{"userId":"12345"}] }, out: { "path.to": {"object": {}} }
func NewProcessor ¶
func NewProcessor( metricsHolder *metricsHolder, activeCounter *atomic.Int32, output OutputPlugin, streamer *streamer, finalizeFn finalizeFn, ) *processor
func ParseFormatName ¶
func ParseLevelAsString ¶ added in v0.5.10
ParseLevelAsString converts log level to the string representation according to the RFC-5424.
func StringToByteUnsafe ¶
StringToByteUnsafe converts string to byte slice without memory copy This creates mutable string, thus unsafe method, should be used with caution (never modify resulting byte slice)
Types ¶
type ActionPlugin ¶
type ActionPlugin interface { Start(config AnyConfig, params *ActionPluginParams) Stop() Do(*Event) ActionResult }
type ActionPluginController ¶
type ActionPluginInfo ¶
type ActionPluginInfo struct { *ActionPluginStaticInfo *PluginRuntimeInfo }
type ActionPluginParams ¶
type ActionPluginParams struct { *PluginDefaultParams Controller ActionPluginController Logger *zap.SugaredLogger }
type ActionPluginStaticInfo ¶
type ActionPluginStaticInfo struct { *PluginStaticInfo MetricName string MetricLabels []string MatchConditions MatchConditions MatchMode MatchMode MatchInvert bool }
type ActionResult ¶
type ActionResult int
const ( // ActionPass pass event to the next action in a pipeline ActionPass ActionResult = 0 // ActionCollapse skip further processing of event and request next event from the same stream and source as current // plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream ActionCollapse ActionResult = 2 // ActionDiscard skip further processing of event and request next event from any stream and source ActionDiscard ActionResult = 1 // ActionHold hold event in a plugin and request next event from the same stream and source as current. // same as ActionCollapse but held event should be manually committed or returned into pipeline. // check out Commit()/Propagate() functions in InputPluginController. // plugin may receive event with eventKindTimeout if it takes to long to read next event from same stream. ActionHold ActionResult = 3 )
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
func NewBatcher ¶
func NewBatcher( pipelineName string, outputType string, outFn BatcherOutFn, maintenanceFn BatcherMaintenanceFn, controller OutputPluginController, workers int, batchSize int, flushTimeout time.Duration, maintenanceInterval time.Duration, ) *Batcher
type BatcherMaintenanceFn ¶
type BatcherMaintenanceFn func(*WorkerData)
type BatcherOutFn ¶
type BatcherOutFn func(*WorkerData, *Batch)
type ConditionType ¶
type ConditionType int
const ( // UnknownSelector value is default, therefore it's safer to use it as default unknown value. UnknownSelector ConditionType = iota ByNameSelector )
type DeltaWrapper ¶ added in v0.5.5
type DeltaWrapper struct {
// contains filtered or unexported fields
}
DeltaWrapper acts as a wrapper around int64 and returns the difference between the new and the old value when a new value is set. This is useful for metrics (see the maintenance function).
type Event ¶
type Event struct { Root *insaneJSON.Root Buf []byte SeqID uint64 Offset int64 SourceID SourceID SourceName string Size int // last known event size, it may not be actual // contains filtered or unexported fields }
func (*Event) IsIgnoreKind ¶
func (*Event) IsRegularKind ¶
func (*Event) IsTimeoutKind ¶
func (*Event) IsUnlockKind ¶
func (*Event) SetIgnoreKind ¶
func (e *Event) SetIgnoreKind()
func (*Event) SetTimeoutKind ¶
func (e *Event) SetTimeoutKind()
func (*Event) SetUnlockKind ¶
func (e *Event) SetUnlockKind()
func (*Event) StreamNameBytes ¶
func (*Event) SubparseJSON ¶
func (e *Event) SubparseJSON(json []byte) (*insaneJSON.Node, error)
type InputPlugin ¶
type InputPlugin interface { Start(config AnyConfig, params *InputPluginParams) Stop() Commit(*Event) }
type InputPluginController ¶
type InputPluginController interface { In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool) uint64 UseSpread() // don't use stream field and spread all events across all processors DisableStreams() // don't use stream field SuggestDecoder(t decoder.DecoderType) // set decoder if pipeline uses "auto" value for decoder IncReadOps() // inc read ops for stats IncMaxEventSizeExceeded() // inc max event size exceeded counter }
type InputPluginInfo ¶
type InputPluginInfo struct { *PluginStaticInfo *PluginRuntimeInfo }
type InputPluginParams ¶
type InputPluginParams struct { *PluginDefaultParams Controller InputPluginController Logger *zap.SugaredLogger }
type LogLevel ¶ added in v0.5.10
type LogLevel int
func ParseLevelAsNumber ¶ added in v0.5.10
ParseLevelAsNumber converts log level to the int representation according to the RFC-5424.
type MatchCondition ¶
type MatchConditions ¶
type MatchConditions []MatchCondition
type OutputPlugin ¶
type OutputPlugin interface { Start(config AnyConfig, params *OutputPluginParams) Stop() Out(*Event) }
type OutputPluginController ¶
type OutputPluginInfo ¶
type OutputPluginInfo struct { *PluginStaticInfo *PluginRuntimeInfo }
type OutputPluginParams ¶
type OutputPluginParams struct { *PluginDefaultParams Controller OutputPluginController Logger *zap.SugaredLogger }
type Pipeline ¶
type Pipeline struct { Name string Procs []*processor // contains filtered or unexported fields }
func New ¶
func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline
New creates new pipeline. Consider using `SetupHTTPHandlers` next.
func (*Pipeline) AddAction ¶
func (p *Pipeline) AddAction(info *ActionPluginStaticInfo)
func (*Pipeline) DisableParallelism ¶
func (p *Pipeline) DisableParallelism()
func (*Pipeline) DisableStreams ¶
func (p *Pipeline) DisableStreams()
func (*Pipeline) EnableEventLog ¶
func (p *Pipeline) EnableEventLog()
func (*Pipeline) GetEventLogItem ¶
func (*Pipeline) GetEventsTotal ¶
func (*Pipeline) GetInput ¶
func (p *Pipeline) GetInput() InputPlugin
func (*Pipeline) GetOutput ¶
func (p *Pipeline) GetOutput() OutputPlugin
func (*Pipeline) In ¶
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool) (seqID uint64)
In decodes message and passes it to event stream.
func (*Pipeline) IncMaxEventSizeExceeded ¶ added in v0.5.11
func (p *Pipeline) IncMaxEventSizeExceeded()
func (*Pipeline) IncReadOps ¶ added in v0.5.5
func (p *Pipeline) IncReadOps()
func (*Pipeline) SetInput ¶
func (p *Pipeline) SetInput(info *InputPluginInfo)
func (*Pipeline) SetOutput ¶
func (p *Pipeline) SetOutput(info *OutputPluginInfo)
func (*Pipeline) SetupHTTPHandlers ¶
SetupHTTPHandlers creates handlers for plugin endpoints and pipeline info. Plugin endpoints can be accessed via URL `/pipelines/<pipeline_name>/<plugin_index_in_config>/<plugin_endpoint>`. Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`.
func (*Pipeline) SuggestDecoder ¶
func (p *Pipeline) SuggestDecoder(t decoder.DecoderType)
type PluginDefaultParams ¶
type PluginFactory ¶
type PluginKind ¶
type PluginKind string
const ( PluginKindInput PluginKind = "input" PluginKindAction PluginKind = "action" PluginKindOutput PluginKind = "output" )
type PluginRuntimeInfo ¶
type PluginSelector ¶
type PluginSelector struct { CondType ConditionType CondValue string }
PluginSelector the only valid value for now is ByNameSelector and only value is string type. It can be expanded with a custom type in the future.
type PluginStaticInfo ¶
type PluginStaticInfo struct { Type string Factory PluginFactory Config AnyConfig // Endpoints is the map of endpoint name to handlerFunc. // Every plugin can provide their own API through Endpoints. Endpoints map[string]func(http.ResponseWriter, *http.Request) AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config }
type PluginsStarterData ¶
type PluginsStarterData struct { Config AnyConfig Params *OutputPluginParams }
type PluginsStarterMap ¶
type PluginsStarterMap map[string]PluginsStarterData
type StreamName ¶
type StreamName string
type WorkerData ¶
type WorkerData interface{}