Documentation ¶
Index ¶
- Variables
- func FindInPayload(p Payload, key string) (any, error)
- func NewDirectBroadcastSoftUnit(c Broadcast, in <-chan *Event, outsCount, bufferSize int) (unit *bcastSoftUnit, unitOuts []<-chan *Event)
- func NewDirectFusionSoftUnit(c Fusion, ins []<-chan *Event, bufferSize int) (unit *fusionSoftUnit, unitOut <-chan *Event)
- func NewDirectInputSoftUnit(i Input, f []Filter, stop <-chan struct{}, bufferSize int) (unit *inSoftUnit, unitOut <-chan *Event)
- func NewDirectOutputSoftUnit(o Output, f []Filter, in <-chan *Event, bufferSize int) (unit *outSoftUnit)
- func NewDirectProcessorSoftUnit(p Processor, f []Filter, in <-chan *Event, bufferSize int) (unit *procSoftUnit, unitOut <-chan *Event)
- type BaseCore
- type BaseFilter
- type BaseInput
- type BaseKeykeeper
- type BaseOutput
- type BaseParser
- type BaseProcessor
- type BaseSerializer
- type Broadcast
- type Errors
- type Event
- func (e *Event) AddTag(tag string)
- func (e *Event) Clone() *Event
- func (e *Event) Context() context.Context
- func (e *Event) DeleteField(key string) error
- func (e *Event) DeleteLabel(key string)
- func (e *Event) DeleteTag(tag string)
- func (e *Event) Done()
- func (e *Event) Duty() int32
- func (e *Event) GetField(key string) (any, error)
- func (e *Event) GetLabel(key string) (string, bool)
- func (e *Event) HasTag(tag string) bool
- func (e *Event) ReplaceContext(ctx context.Context)
- func (e *Event) SetField(key string, value any) error
- func (e *Event) SetHook(hook hookFunc)
- func (e *Event) SetLabel(key, value string)
- func (e *Event) StackError(err error)
- type Filter
- type Fusion
- type Initer
- type Input
- type Keykeeper
- type Output
- type Parser
- type Payload
- type Processor
- type Runner
- type Serializer
- type SetId
- type SetParser
- type SetSerializer
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrInvalidPath = errors.New("invalid path") ErrNoSuchField = errors.New("no such field on the provided path") // this error should be returned in any case )
Functions ¶
func NewDirectFusionSoftUnit ¶
func NewDirectInputSoftUnit ¶
func NewDirectOutputSoftUnit ¶
Types ¶
type BaseCore ¶
type BaseFilter ¶
type BaseFilter struct { Alias string Plugin string Pipeline string Log *slog.Logger Obs metrics.ObserveFunc In <-chan *Event Rej chan<- *Event Acc chan<- *Event }
func (*BaseFilter) Observe ¶
func (b *BaseFilter) Observe(status metrics.EventStatus, dur time.Duration)
func (*BaseFilter) SetChannels ¶
func (b *BaseFilter) SetChannels(in <-chan *Event, rejected chan<- *Event, accepted chan<- *Event)
type BaseInput ¶
type BaseInput struct { Alias string Plugin string Pipeline string Log *slog.Logger Obs metrics.ObserveFunc Out chan<- *Event }
func (*BaseInput) Observe ¶
func (b *BaseInput) Observe(status metrics.EventStatus, dur time.Duration)
func (*BaseInput) SetChannels ¶
type BaseKeykeeper ¶
type BaseOutput ¶
type BaseOutput struct { Alias string Plugin string Pipeline string Log *slog.Logger Obs metrics.ObserveFunc In <-chan *Event }
func (*BaseOutput) Observe ¶
func (b *BaseOutput) Observe(status metrics.EventStatus, dur time.Duration)
func (*BaseOutput) SetChannels ¶
func (b *BaseOutput) SetChannels(in <-chan *Event)
type BaseParser ¶
type BaseParser struct { Alias string Plugin string Pipeline string Log *slog.Logger Obs metrics.ObserveFunc }
func (*BaseParser) Observe ¶
func (b *BaseParser) Observe(status metrics.EventStatus, dur time.Duration)
type BaseProcessor ¶
type BaseProcessor struct { Alias string Plugin string Pipeline string Log *slog.Logger Obs metrics.ObserveFunc In <-chan *Event Out chan<- *Event }
func (*BaseProcessor) Observe ¶
func (b *BaseProcessor) Observe(status metrics.EventStatus, dur time.Duration)
func (*BaseProcessor) SetChannels ¶
func (b *BaseProcessor) SetChannels(in <-chan *Event, out chan<- *Event)
type BaseSerializer ¶
type BaseSerializer struct { Alias string Plugin string Pipeline string Log *slog.Logger Obs metrics.ObserveFunc }
func (*BaseSerializer) Observe ¶
func (b *BaseSerializer) Observe(status metrics.EventStatus, dur time.Duration)
type Event ¶
type Event struct { Id string UUID uuid.UUID // for internal usage only Timestamp time.Time RoutingKey string Tags []string Labels map[string]string Data Payload Errors Errors // contains filtered or unexported fields }
func NewEventWithData ¶
func (*Event) DeleteField ¶
func (*Event) DeleteLabel ¶
func (*Event) ReplaceContext ¶
func (*Event) StackError ¶
type Filter ¶
type Filter interface { SetChannels(in <-chan *Event, rejected chan<- *Event, accepted chan<- *Event) io.Closer Runner Initer }
filter plugin sorts events by conditions
type Payload ¶ added in v0.2.0
type Payload any
map[string]any or []any
func ClonePayload ¶ added in v0.2.0
func DeleteFromPayload ¶ added in v0.2.0
type Processor ¶
type Processor interface { SetChannels(in <-chan *Event, out chan<- *Event) io.Closer Runner Initer }
processor plugin transforms events
type Serializer ¶
serializer plugin serializes events into configured format
type SetId ¶
type SetId interface {
SetId(id uint64)
}
plugins that need unique id must implement this interface id is unique for each plugin, but it's same for one processor in multiple lines id is randomly generated at application startup
type SetParser ¶
type SetParser interface {
SetParser(p Parser)
}
plugins that need parsers must implement this interface
type SetSerializer ¶
type SetSerializer interface {
SetSerializer(s Serializer)
}
plugins that need serializers must implement this interface
Click to show internal directories.
Click to hide internal directories.