processor

package
v0.0.0-...-1f7aab1
Published: Jan 27, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Constants

const (
	DefaultKafkaMessageBufferCap = 128
)

Variables

This section is empty.

Functions

func AnyFilterMatches

func AnyFilterMatches(filters []Filter, event *kube.EnhancedEvent) bool

AnyFilterMatches returns true if any of the provided filters matches the event.

func RelevantFieldSelection

func RelevantFieldSelection(event *kube.EnhancedEvent) map[string]string

RelevantFieldSelection returns a map of key-value pairs containing the relevant fields from the event.

Types

type FieldSelectionFallback

type FieldSelectionFallback func(event *kube.EnhancedEvent, err error) map[string]string

type Filter

type Filter struct {
	// Kind matches the kind of the involved object.
	Kind string `yaml:"kind"`
	// Namespace matches the namespace of the event.
	Namespace string `yaml:"namespace"`
	// Reason matches the reason of the event.
	Reason string `yaml:"reason"`
	// Message matches the human-readable message of the event.
	Message string `yaml:"message"`
	// Type matches the type of the event.
	Type string `yaml:"type"`
	// Component matches the component of the event source.
	Component string `yaml:"component"`
}

Filter is used to filter events based on their attributes. All fields in the filter are optional and are treated as regular expressions. If a field is empty, it is ignored.

func (*Filter) MatchEvent

func (f *Filter) MatchEvent(event *kube.EnhancedEvent) bool

MatchEvent checks if the provided event matches the filter. This means that the event must match all the non-empty fields of the filter.
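The matching rules described for Filter, MatchEvent and AnyFilterMatches can be illustrated with a minimal, self-contained sketch. It is not the package's implementation: the event and filter types below are hypothetical flattened stand-ins for kube.EnhancedEvent and Filter, patterns are assumed to be unanchored (regexp.MatchString semantics), and an invalid pattern is assumed never to match.

```go
package main

import (
	"fmt"
	"regexp"
)

// event is a hypothetical, flattened stand-in for kube.EnhancedEvent.
type event struct {
	Kind, Namespace, Reason, Message, Type, Component string
}

// filter mirrors the documented Filter fields. Each one is an optional
// regular expression.
type filter struct {
	Kind, Namespace, Reason, Message, Type, Component string
}

// fieldMatches reports whether value satisfies pattern. An empty pattern is
// ignored, so it always matches. Assumption: an invalid pattern never matches.
func fieldMatches(pattern, value string) bool {
	if pattern == "" {
		return true
	}
	ok, err := regexp.MatchString(pattern, value)
	return err == nil && ok
}

// matchEvent requires every non-empty field of f to match the event.
func matchEvent(f filter, e event) bool {
	return fieldMatches(f.Kind, e.Kind) &&
		fieldMatches(f.Namespace, e.Namespace) &&
		fieldMatches(f.Reason, e.Reason) &&
		fieldMatches(f.Message, e.Message) &&
		fieldMatches(f.Type, e.Type) &&
		fieldMatches(f.Component, e.Component)
}

// anyFilterMatches returns true as soon as one filter matches the event.
// In this sketch an empty filter list matches nothing.
func anyFilterMatches(filters []filter, e event) bool {
	for _, f := range filters {
		if matchEvent(f, e) {
			return true
		}
	}
	return false
}

func main() {
	e := event{Kind: "Pod", Namespace: "kube-system", Reason: "BackOff", Type: "Warning"}
	fmt.Println(matchEvent(filter{Kind: "^Pod$", Type: "Warning"}, e))        // both fields match
	fmt.Println(matchEvent(filter{Kind: "^Pod$", Namespace: "^default$"}, e)) // namespace differs
}
```

Because every field is a regular expression, an exact match in this sketch needs explicit anchors such as ^Pod$. Whether the package anchors patterns itself is not stated in this documentation.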

func (*Filter) Validate

func (f *Filter) Validate() error

Validate checks whether all non-empty fields of the filter are valid regular expressions.
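As a rough sketch of this kind of validation, the helper below compiles each non-empty pattern with the standard regexp package and reports a pattern that fails to compile. The function name validatePatterns and the map-based input are hypothetical; the actual Validate method works on the Filter fields directly.

```go
package main

import (
	"fmt"
	"regexp"
)

// validatePatterns compiles every non-empty pattern and returns an error for
// a pattern that is not a valid regular expression. The map keys are
// hypothetical field names used only to label the error.
func validatePatterns(patterns map[string]string) error {
	for name, pattern := range patterns {
		if pattern == "" {
			continue // empty fields are ignored, as in Filter
		}
		if _, err := regexp.Compile(pattern); err != nil {
			return fmt.Errorf("field %q: %w", name, err)
		}
	}
	return nil
}

func main() {
	fmt.Println(validatePatterns(map[string]string{"kind": "^Pod$", "reason": ""}))
	fmt.Println(validatePatterns(map[string]string{"message": "("}))
}
```

Validating filters once at configuration time means a malformed pattern is reported up front rather than during event matching.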

type KafkaMessageBuffer

type KafkaMessageBuffer = circular.RingBuffer[*kafka.Message]

func NewKafkaMessageBuffer

func NewKafkaMessageBuffer(capacity int) *KafkaMessageBuffer

type Option

type Option func(*Processor)

func WithFilters

func WithFilters(filters []Filter) Option

WithFilters sets the filters to be applied to the events before processing.

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger sets the logger to be used by the processor.

func WithSelectors

func WithSelectors(selectors []Selector) Option

WithSelectors sets the selectors used to customize the final event payload.

func WriteTo

func WriteTo(output *KafkaMessageBuffer) Option

WriteTo sets the buffer to which the processor writes processed events as ready-to-send kafka.Message values.
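Option and the With... constructors follow the functional options pattern. The sketch below shows the general shape of that pattern with hypothetical lowercase stand-ins (processor, option, withFilters, withSelectors, newProcessor); the real Processor fields are unexported and are not documented here, so the string slices are placeholders.

```go
package main

import "fmt"

// processor is a hypothetical stand-in for Processor.
type processor struct {
	filters   []string
	selectors []string
}

// option mutates a processor during construction, like Option.
type option func(*processor)

// withFilters returns an option that stores the given filters.
func withFilters(filters []string) option {
	return func(p *processor) { p.filters = filters }
}

// withSelectors returns an option that stores the given selectors.
func withSelectors(selectors []string) option {
	return func(p *processor) { p.selectors = selectors }
}

// newProcessor builds a processor and applies the options in order, so a
// later option overrides an earlier one that sets the same field.
func newProcessor(opts ...option) *processor {
	p := &processor{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newProcessor(
		withFilters([]string{"^Pod$"}),
		withSelectors([]string{"reason"}),
	)
	fmt.Println(len(p.filters), len(p.selectors))
}
```

The pattern keeps the constructor signature stable: new settings can be added as further options without changing existing callers.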

type PayloadCustomizer

type PayloadCustomizer struct {
	Selectors []Selector
	// OnSelectionError is a function that is called when an error occurs during the
	// selection of a field. As some fields in kube.EnhancedEvent are pointers, it is
	// possible that the field is nil, which would cause an error during the selection.
	// This function returns a map of key-value pairs that represents the final event payload.
	OnSelectionError FieldSelectionFallback
}

PayloadCustomizer is used to customize the final event payload by selecting specific fields from the event and storing them under a specific key in the payload struct.

func (*PayloadCustomizer) Customize

func (pc *PayloadCustomizer) Customize(event *kube.EnhancedEvent) map[string]string

Customize selects fields from the event according to the selectors and returns a map of key-value pairs that represents the final event payload.
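The interaction between selectors and OnSelectionError might look roughly like the following sketch. It assumes that selector values are Go text/template strings, which this documentation does not confirm, and it uses hypothetical stand-in types (event, pod, selector, fallback) in place of kube.EnhancedEvent, Selector and FieldSelectionFallback.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// pod and event are hypothetical stand-ins. Pod is a pointer and may be nil,
// which is the situation the OnSelectionError comment describes.
type pod struct{ Name string }

type event struct {
	Reason string
	Pod    *pod
}

// selector mirrors Selector: Key names the payload entry, Value is a template.
type selector struct{ Key, Value string }

// fallback mirrors FieldSelectionFallback.
type fallback func(e *event, err error) map[string]string

// customize renders every selector against the event. On the first parse or
// execution error it returns whatever the fallback produces instead.
func customize(selectors []selector, e *event, onErr fallback) map[string]string {
	payload := make(map[string]string, len(selectors))
	for _, s := range selectors {
		tmpl, err := template.New(s.Key).Parse(s.Value)
		if err != nil {
			return onErr(e, err)
		}
		var buf bytes.Buffer
		if err := tmpl.Execute(&buf, e); err != nil {
			return onErr(e, err) // e.g. a field read through a nil pointer
		}
		payload[s.Key] = buf.String()
	}
	return payload
}

// reasonOnly is a hypothetical fallback that keeps just the event reason.
func reasonOnly(e *event, _ error) map[string]string {
	return map[string]string{"reason": e.Reason}
}

func main() {
	selectors := []selector{{Key: "pod", Value: "{{.Pod.Name}}"}}
	fmt.Println(customize(selectors, &event{Reason: "BackOff", Pod: &pod{Name: "web-0"}}, reasonOnly))
	fmt.Println(customize(selectors, &event{Reason: "BackOff"}, reasonOnly))
}
```

In this sketch the fallback replaces the whole payload rather than a single key, which matches the documented return type of FieldSelectionFallback.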

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func New

func New(source *watcher.EventBuffer, opts ...Option) *Processor

func (*Processor) GetBuffer

func (p *Processor) GetBuffer() *KafkaMessageBuffer

GetBuffer returns the buffer to which the processor writes processed events as ready-to-send kafka.Message values.

func (*Processor) Process

func (p *Processor) Process(ctx context.Context)

Process reads events from the source buffer, applies the filters, customizes the event payload, and writes the result to the output buffer as a ready-to-send kafka.Message.
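The read, filter, customize, write loop can be sketched as below. This is an illustration under several assumptions, not the package's code: channels stand in for the source and output ring buffers, the payload is assumed to be JSON-encoded, and the names event, message, process and runOnce are hypothetical. This documentation does not say whether a matching filter keeps or drops an event, so the sketch takes a keep predicate instead of deciding.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// event and message are hypothetical stand-ins for kube.EnhancedEvent and
// kafka.Message.
type event struct{ Reason, Message string }

type message struct{ Value []byte }

// process drains source until it is closed or ctx is cancelled. Events that
// pass keep are turned into a payload and written to out.
func process(ctx context.Context, source <-chan event, keep func(event) bool,
	customize func(event) map[string]string, out chan<- message) {
	for {
		select {
		case <-ctx.Done():
			return
		case e, ok := <-source:
			if !ok {
				return
			}
			if !keep(e) {
				continue
			}
			value, err := json.Marshal(customize(e)) // assumed encoding
			if err != nil {
				continue
			}
			out <- message{Value: value}
		}
	}
}

// runOnce feeds a fixed batch of events through process and collects the
// resulting message values as strings.
func runOnce(events []event) []string {
	source := make(chan event, len(events))
	for _, e := range events {
		source <- e
	}
	close(source)

	out := make(chan message, len(events))
	keep := func(e event) bool { return e.Reason == "BackOff" }
	customize := func(e event) map[string]string {
		return map[string]string{"reason": e.Reason}
	}
	process(context.Background(), source, keep, customize, out)
	close(out)

	var values []string
	for m := range out {
		values = append(values, string(m.Value))
	}
	return values
}

func main() {
	fmt.Println(runOnce([]event{{Reason: "BackOff"}, {Reason: "Pulled"}}))
}
```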

type Selector

type Selector struct {
	Key   string `yaml:"key"`
	Value string `yaml:"value"`
}

Selector is used to select a specific field from the event. The key is the name under which the value is stored in the payload struct, whereas the value is a template string that is used to extract the desired field.

func (*Selector) Validate

func (s *Selector) Validate() error

Validate checks whether the selector is valid. This means that the key must not be empty and the value must be a valid template string.
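A validation of this shape can be sketched as follows, assuming the template syntax is Go's text/template, which this documentation does not state. The function name validateSelector is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"text/template"
)

// validateSelector rejects an empty key and a value that does not parse as a
// text/template template (an assumption about the template syntax).
func validateSelector(key, value string) error {
	if key == "" {
		return errors.New("selector key must not be empty")
	}
	if _, err := template.New(key).Parse(value); err != nil {
		return fmt.Errorf("selector %q: invalid template: %w", key, err)
	}
	return nil
}

func main() {
	fmt.Println(validateSelector("reason", "{{.Reason}}"))
	fmt.Println(validateSelector("", "{{.Reason}}"))
	fmt.Println(validateSelector("reason", "{{.Reason"))
}
```

Parsing only checks the template syntax. A template that references a missing or nil field still parses and fails later, at execution time.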
