core

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidPath = errors.New("invalid path")
	ErrNoSuchField = errors.New("no such field on the provided path") // this error should be returned in any case
)

Functions

func FindInPayload added in v0.2.0

func FindInPayload(p Payload, key string) (any, error)

func NewDirectBroadcastSoftUnit

func NewDirectBroadcastSoftUnit(c Broadcast, in <-chan *Event, outsCount, bufferSize int) (unit *bcastSoftUnit, unitOuts []<-chan *Event)

func NewDirectFusionSoftUnit

func NewDirectFusionSoftUnit(c Fusion, ins []<-chan *Event, bufferSize int) (unit *fusionSoftUnit, unitOut <-chan *Event)

func NewDirectInputSoftUnit

func NewDirectInputSoftUnit(i Input, f []Filter, stop <-chan struct{}, bufferSize int) (unit *inSoftUnit, unitOut <-chan *Event)

func NewDirectOutputSoftUnit

func NewDirectOutputSoftUnit(o Output, f []Filter, in <-chan *Event, bufferSize int) (unit *outSoftUnit)

func NewDirectProcessorSoftUnit

func NewDirectProcessorSoftUnit(p Processor, f []Filter, in <-chan *Event, bufferSize int) (unit *procSoftUnit, unitOut <-chan *Event)

Types

type BaseCore

type BaseCore struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
	Obs metrics.ObserveFunc
}

func (*BaseCore) Observe

func (b *BaseCore) Observe(status metrics.EventStatus, dur time.Duration)

type BaseFilter

type BaseFilter struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
	Obs metrics.ObserveFunc
	In  <-chan *Event
	Rej chan<- *Event
	Acc chan<- *Event
}

func (*BaseFilter) Observe

func (b *BaseFilter) Observe(status metrics.EventStatus, dur time.Duration)

func (*BaseFilter) SetChannels

func (b *BaseFilter) SetChannels(in <-chan *Event, rejected chan<- *Event, accepted chan<- *Event)

type BaseInput

type BaseInput struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
	Obs metrics.ObserveFunc
	Out chan<- *Event
}

func (*BaseInput) Observe

func (b *BaseInput) Observe(status metrics.EventStatus, dur time.Duration)

func (*BaseInput) SetChannels

func (b *BaseInput) SetChannels(out chan<- *Event)

type BaseKeykeeper

type BaseKeykeeper struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
}

type BaseOutput

type BaseOutput struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
	Obs metrics.ObserveFunc
	In  <-chan *Event
}

func (*BaseOutput) Observe

func (b *BaseOutput) Observe(status metrics.EventStatus, dur time.Duration)

func (*BaseOutput) SetChannels

func (b *BaseOutput) SetChannels(in <-chan *Event)

type BaseParser

type BaseParser struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
	Obs metrics.ObserveFunc
}

func (*BaseParser) Observe

func (b *BaseParser) Observe(status metrics.EventStatus, dur time.Duration)

type BaseProcessor

type BaseProcessor struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
	Obs metrics.ObserveFunc
	In  <-chan *Event
	Out chan<- *Event
}

func (*BaseProcessor) Observe

func (b *BaseProcessor) Observe(status metrics.EventStatus, dur time.Duration)

func (*BaseProcessor) SetChannels

func (b *BaseProcessor) SetChannels(in <-chan *Event, out chan<- *Event)

type BaseSerializer

type BaseSerializer struct {
	Alias    string
	Plugin   string
	Pipeline string

	Log *slog.Logger
	Obs metrics.ObserveFunc
}

func (*BaseSerializer) Observe

func (b *BaseSerializer) Observe(status metrics.EventStatus, dur time.Duration)

type Broadcast

type Broadcast interface {
	SetChannels(in <-chan *Event, outs []chan<- *Event)
	Runner
}

type Errors

type Errors []error

func (Errors) MarshalJSON

func (me Errors) MarshalJSON() ([]byte, error)

func (Errors) Slice

func (me Errors) Slice() []string

type Event

type Event struct {
	Id         string
	UUID       uuid.UUID // for internal usage only
	Timestamp  time.Time
	RoutingKey string
	Tags       []string
	Labels     map[string]string
	Data       Payload
	Errors     Errors
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent(routingKey string) *Event

func NewEventWithData

func NewEventWithData(routingKey string, data Payload) *Event

func (*Event) AddTag

func (e *Event) AddTag(tag string)

func (*Event) Clone

func (e *Event) Clone() *Event

func (*Event) Context

func (e *Event) Context() context.Context

func (*Event) DeleteField

func (e *Event) DeleteField(key string) error

func (*Event) DeleteLabel

func (e *Event) DeleteLabel(key string)

func (*Event) DeleteTag

func (e *Event) DeleteTag(tag string)

func (*Event) Done

func (e *Event) Done()

func (*Event) Duty

func (e *Event) Duty() int32

func (*Event) GetField

func (e *Event) GetField(key string) (any, error)

func (*Event) GetLabel

func (e *Event) GetLabel(key string) (string, bool)

func (*Event) HasTag

func (e *Event) HasTag(tag string) bool

func (*Event) ReplaceContext

func (e *Event) ReplaceContext(ctx context.Context)

func (*Event) SetField

func (e *Event) SetField(key string, value any) error

func (*Event) SetHook

func (e *Event) SetHook(hook hookFunc)

func (*Event) SetLabel

func (e *Event) SetLabel(key, value string)

func (*Event) StackError

func (e *Event) StackError(err error)

type Filter

type Filter interface {
	SetChannels(in <-chan *Event, rejected chan<- *Event, accepted chan<- *Event)
	io.Closer
	Runner
	Initer
}

Filter is a plugin that sorts events by conditions.

type Fusion

type Fusion interface {
	SetChannels(ins []<-chan *Event, out chan<- *Event)
	Runner
}

Fusion is a core plugin; core plugins are used in core units only.

type Initer

type Initer interface {
	Init() error
}

type Input

type Input interface {
	SetChannels(out chan<- *Event)
	io.Closer
	Runner
	Initer
}

Input is a plugin that consumes events from the outside world.

type Keykeeper

type Keykeeper interface {
	Get(key string) (any, error)
	io.Closer
	Initer
}

type Output

type Output interface {
	SetChannels(in <-chan *Event)
	io.Closer
	Runner
	Initer
}

Output is a plugin that produces events to the outside world.

type Parser

type Parser interface {
	Parse(data []byte, routingKey string) ([]*Event, error)
	io.Closer
	Initer
}

Parser is a plugin that parses raw-format data into events.

type Payload added in v0.2.0

type Payload any

Payload is either a map[string]any or a []any.

func ClonePayload added in v0.2.0

func ClonePayload(p Payload) Payload

func DeleteFromPayload added in v0.2.0

func DeleteFromPayload(p Payload, key string) (Payload, error)

func PutInPayload added in v0.2.0

func PutInPayload(p Payload, key string, val any) (Payload, error)

type Processor

type Processor interface {
	SetChannels(in <-chan *Event, out chan<- *Event)
	io.Closer
	Runner
	Initer
}

Processor is a plugin that transforms events.

type Runner

type Runner interface {
	Run()
}

type Serializer

type Serializer interface {
	Serialize(event ...*Event) ([]byte, error)
	io.Closer
	Initer
}

Serializer is a plugin that serializes events into the configured format.

type SetId

type SetId interface {
	SetId(id uint64)
}

Plugins that need a unique id must implement this interface. The id is unique for each plugin, but it is the same for one processor in multiple lines. The id is randomly generated at application startup.

type SetParser

type SetParser interface {
	SetParser(p Parser)
}

Plugins that need parsers must implement this interface.

type SetSerializer

type SetSerializer interface {
	SetSerializer(s Serializer)
}

Plugins that need serializers must implement this interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL