event

package
v1.10.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 19, 2024 License: MIT, MIT Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Any

func Any(fns ...func(ev Event) bool) func(ev Event) bool

Any is a helper function that combines multiple event conditions into a single condition function.
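As an illustration of the combinator, here is a minimal self-contained sketch. It assumes that "combines" means a logical OR over the conditions; the body of Any below is written for this example and is not taken from the package source. The `named` type and the `isNamed` helper are hypothetical.

```go
package main

import "fmt"

// Event mirrors the package's Event interface.
type Event interface {
	String() string
}

// Any is a sketch, assuming "combines" means logical OR:
// the result reports true as soon as one condition matches.
func Any(fns ...func(ev Event) bool) func(ev Event) bool {
	return func(ev Event) bool {
		for _, fn := range fns {
			if fn(ev) {
				return true
			}
		}
		return false
	}
}

// named is a hypothetical event type used only for this illustration.
type named string

func (n named) String() string { return string(n) }

// isNamed builds a condition that matches events with the given name.
func isNamed(name string) func(ev Event) bool {
	return func(ev Event) bool { return ev.String() == name }
}

func main() {
	cond := Any(isNamed("reset"), isNamed("tick"))
	fmt.Println(cond(named("reset")), cond(named("tick")), cond(named("other")))
	// prints: true true false
}
```

A combined condition of this shape can be handed to anything that accepts a func(ev Event) bool, such as DrainUntil.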

func Is

func Is[T Event](ev Event) bool

Is is a helper function: syntactic sugar for performing an Event type check as a boolean function.
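A minimal sketch of how such a generic type check could look and be used. It assumes the check is a plain type assertion on the type parameter; the function body and the `ResetEvent` and `TickEvent` types are illustrative and not taken from the package.

```go
package main

import "fmt"

// Event mirrors the package's Event interface.
type Event interface {
	String() string
}

// Is is a sketch, assuming the check is a plain type assertion on T.
func Is[T Event](ev Event) bool {
	_, ok := ev.(T)
	return ok
}

// ResetEvent and TickEvent are hypothetical event types.
type ResetEvent struct{}

func (ResetEvent) String() string { return "reset" }

type TickEvent struct{}

func (TickEvent) String() string { return "tick" }

func main() {
	// An instantiated Is has the func(ev Event) bool shape used by conditions.
	var isReset func(ev Event) bool = Is[ResetEvent]
	fmt.Println(isReset(ResetEvent{}), isReset(TickEvent{}))
	// prints: true false
}
```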

Types

type AnnotatedEvent

type AnnotatedEvent struct {
	Event       Event
	EmitContext uint64 // uniquely identifies the emission of the event, useful for debugging and creating diagrams
}

type AttachEmitter

type AttachEmitter interface {
	AttachEmitter(em Emitter)
}

type CriticalErrorEvent added in v1.9.3

type CriticalErrorEvent struct {
	Err error
}

func (CriticalErrorEvent) String added in v1.9.3

func (ev CriticalErrorEvent) String() string

type DebugDeriver

type DebugDeriver struct {
	Log log.Logger
}

func (DebugDeriver) OnEvent

func (d DebugDeriver) OnEvent(ev Event)

type Deriver

type Deriver interface {
	OnEvent(ev Event) bool
}

type DeriverFunc

type DeriverFunc func(ev Event) bool

DeriverFunc implements the Deriver interface as a function, similar to how the standard library's http.HandlerFunc implements http.Handler. This can be used for small in-place derivers, test helpers, etc.

func (DeriverFunc) OnEvent

func (fn DeriverFunc) OnEvent(ev Event) bool
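The adapter pattern can be sketched as follows. The types are re-declared locally so the example stands alone; the OnEvent body is assumed to simply call the function, and the boolean result is assumed to report whether the event was handled. `PingEvent` and `newPingCounter` are hypothetical.

```go
package main

import "fmt"

type Event interface {
	String() string
}

type Deriver interface {
	OnEvent(ev Event) bool
}

type DeriverFunc func(ev Event) bool

// OnEvent calls the function itself; this body is assumed, mirroring http.HandlerFunc.
func (fn DeriverFunc) OnEvent(ev Event) bool { return fn(ev) }

// PingEvent is a hypothetical event type.
type PingEvent struct{}

func (PingEvent) String() string { return "ping" }

// newPingCounter returns an in-place deriver and a pointer to its hit counter.
func newPingCounter() (Deriver, *int) {
	count := new(int)
	d := DeriverFunc(func(ev Event) bool {
		if _, ok := ev.(PingEvent); !ok {
			return false // not handled
		}
		*count++
		return true
	})
	return d, count
}

func main() {
	d, count := newPingCounter()
	handled := d.OnEvent(PingEvent{})
	fmt.Println(handled, *count)
	// prints: true 1
}
```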

type DeriverMux

type DeriverMux []Deriver

DeriverMux acts as a single Deriver: it takes an incoming event and synchronously fans it out to every Deriver it contains. Technically this is a demultiplexer: a single input fanned out to multiple outputs.

func (*DeriverMux) OnEvent

func (s *DeriverMux) OnEvent(ev Event) bool
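A possible shape for the synchronous fan-out, as a self-contained sketch. The way the individual results are merged (true if at least one deriver reported true) is an assumption of this example, and `named` and `fanOut` are hypothetical helpers.

```go
package main

import "fmt"

type Event interface {
	String() string
}

type Deriver interface {
	OnEvent(ev Event) bool
}

type DeriverFunc func(ev Event) bool

func (fn DeriverFunc) OnEvent(ev Event) bool { return fn(ev) }

type DeriverMux []Deriver

// OnEvent is a sketch: every deriver sees the event, in order, and the result
// is assumed to be true if at least one of them handled it.
func (s *DeriverMux) OnEvent(ev Event) bool {
	handled := false
	for _, d := range *s {
		// Call first, then OR, so no deriver is skipped by short-circuiting.
		handled = d.OnEvent(ev) || handled
	}
	return handled
}

// named is a hypothetical event type.
type named string

func (n named) String() string { return string(n) }

// fanOut sends one event through a mux of two recording derivers and
// returns the mux result plus the order in which the derivers ran.
func fanOut(ev Event) (bool, []string) {
	var order []string
	mux := DeriverMux{
		DeriverFunc(func(Event) bool {
			order = append(order, "a")
			return false
		}),
		DeriverFunc(func(Event) bool {
			order = append(order, "b")
			return true
		}),
	}
	return mux.OnEvent(ev), order
}

func main() {
	handled, order := fanOut(named("x"))
	fmt.Println(handled, order)
	// prints: true [a b]
}
```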

type Drainer

type Drainer interface {
	// Drain processes all events.
	Drain() error
	// DrainUntil processes all events until a condition is hit.
	// If excl, the event that matches the condition is not processed yet.
	// If not excl, the event that matches is processed.
	DrainUntil(fn func(ev Event) bool, excl bool) error
}
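The difference between exclusive and inclusive draining can be shown with a toy single-threaded queue. This is a sketch under stated assumptions, not the package's executor: the `queue` type and `named` event are hypothetical, and returning io.EOF when the queue runs dry before the condition is hit is a choice made for this example.

```go
package main

import (
	"fmt"
	"io"
)

type Event interface {
	String() string
}

// named is a hypothetical event type.
type named string

func (n named) String() string { return string(n) }

// queue is a hypothetical single-threaded executor used to show excl.
type queue struct {
	pending   []Event
	processed []string
}

func (q *queue) Emit(ev Event) { q.pending = append(q.pending, ev) }

func (q *queue) DrainUntil(fn func(ev Event) bool, excl bool) error {
	for len(q.pending) > 0 {
		ev := q.pending[0]
		match := fn(ev)
		if match && excl {
			return nil // exclusive: the matching event stays queued
		}
		q.pending = q.pending[1:]
		q.processed = append(q.processed, ev.String())
		if match {
			return nil // inclusive: the matching event was processed
		}
	}
	return io.EOF // sketch choice: queue ran dry before the condition was hit
}

func main() {
	q := &queue{}
	for _, n := range []string{"a", "b", "c"} {
		q.Emit(named(n))
	}
	isB := func(ev Event) bool { return ev.String() == "b" }

	_ = q.DrainUntil(isB, true)
	fmt.Println(q.processed, len(q.pending)) // prints: [a] 2

	_ = q.DrainUntil(isB, false)
	fmt.Println(q.processed, len(q.pending)) // prints: [a b] 1
}
```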

type Emitter

type Emitter interface {
	Emit(ev Event)
}

type EmitterDrainer

type EmitterDrainer interface {
	Emitter
	Drainer
}

type EmitterFunc

type EmitterFunc func(ev Event)

func (EmitterFunc) Emit

func (fn EmitterFunc) Emit(ev Event)

type EmitterOpts

type EmitterOpts struct {
	Limiting  bool
	Rate      rate.Limit
	Burst     int
	OnLimited func()
}

type Event

type Event interface {
	// String returns the name of the event.
	// The name must be simple and identify the event type, not the event content.
	// This name is used for metric-labeling.
	String() string
}
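To illustrate the naming rule, here is a small sketch of an event whose String method names the type and leaves the content out. `BlockSeenEvent` and `countByName` are hypothetical and exist only for this example.

```go
package main

import "fmt"

type Event interface {
	String() string
}

// BlockSeenEvent is a hypothetical event that carries content.
type BlockSeenEvent struct {
	Number uint64
}

// String names the event type only; the block number is left out on purpose
// so that every BlockSeenEvent maps to the same metric label.
func (ev BlockSeenEvent) String() string { return "block-seen" }

// countByName tallies events per name, the way a metric label would group them.
func countByName(evs ...Event) map[string]int {
	counts := make(map[string]int)
	for _, ev := range evs {
		counts[ev.String()]++
	}
	return counts
}

func main() {
	counts := countByName(BlockSeenEvent{Number: 1}, BlockSeenEvent{Number: 2})
	fmt.Println(counts) // prints: map[block-seen:2]
}
```

Had String included the block number, each event would produce its own label, which is what the comment on the interface warns against.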

type Executable

type Executable interface {
	RunEvent(ev AnnotatedEvent)
}

type ExecutableFunc

type ExecutableFunc func(ev AnnotatedEvent)

ExecutableFunc implements the Executable interface as a function, similar to how the std-lib http HandlerFunc implements a Handler. This can be used for small in-place executables, test helpers, etc.

func (ExecutableFunc) RunEvent

func (fn ExecutableFunc) RunEvent(ev AnnotatedEvent)

type Executor

type Executor interface {
	Add(d Executable, opts *ExecutorOpts) (leaveExecutor func())
	Enqueue(ev AnnotatedEvent) error
}

type ExecutorOpts

type ExecutorOpts struct {
	Capacity int // If there is a local buffer capacity
}

type GlobalSyncExec

type GlobalSyncExec struct {
	// contains filtered or unexported fields
}

func NewGlobalSynchronous

func NewGlobalSynchronous(ctx context.Context) *GlobalSyncExec

func (*GlobalSyncExec) Add

func (gs *GlobalSyncExec) Add(d Executable, _ *ExecutorOpts) (leaveExecutor func())

func (*GlobalSyncExec) Drain

func (gs *GlobalSyncExec) Drain() error

func (*GlobalSyncExec) DrainUntil

func (gs *GlobalSyncExec) DrainUntil(fn func(ev Event) bool, excl bool) error

func (*GlobalSyncExec) Enqueue

func (gs *GlobalSyncExec) Enqueue(ev AnnotatedEvent) error

type Limiter

type Limiter[E Emitter] struct {
	// contains filtered or unexported fields
}

func NewLimiter

func NewLimiter[E Emitter](ctx context.Context, em E, eventRate rate.Limit, eventBurst int, onLimited func()) *Limiter[E]

NewLimiter returns an event rate-limiter. This can be used to prevent event loops from (accidentally) running too hot. The eventRate is the number of events per second. The eventBurst is the margin of events to eat into until the rate-limit kicks in. The onLimited function is optional, and will be called if an emitted event is getting rate-limited.

func (*Limiter[E]) Emit

func (l *Limiter[E]) Emit(ev Event)

Emit is thread-safe, multiple parallel derivers can safely emit events to it.

type LimiterDrainer

type LimiterDrainer Limiter[EmitterDrainer]

LimiterDrainer is a variant of Limiter that supports event draining.

func NewLimiterDrainer

func NewLimiterDrainer(ctx context.Context, em EmitterDrainer, eventRate rate.Limit, eventBurst int, onLimited func()) *LimiterDrainer

func (*LimiterDrainer) Drain

func (l *LimiterDrainer) Drain() error

func (*LimiterDrainer) DrainUntil

func (l *LimiterDrainer) DrainUntil(fn func(ev Event) bool, excl bool) error

func (*LimiterDrainer) Emit

func (l *LimiterDrainer) Emit(ev Event)

type LogTracer

type LogTracer struct {
	// contains filtered or unexported fields
}

func NewLogTracer

func NewLogTracer(log log.Logger, lvl slog.Level) *LogTracer

func (*LogTracer) OnDeriveEnd

func (lt *LogTracer) OnDeriveEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool)

func (*LogTracer) OnDeriveStart

func (lt *LogTracer) OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time)

func (*LogTracer) OnEmit

func (lt *LogTracer) OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time)

func (*LogTracer) OnRateLimited

func (lt *LogTracer) OnRateLimited(name string, derivContext uint64)

type Metrics

type Metrics interface {
	RecordEmittedEvent(eventName string, emitter string)
	RecordProcessedEvent(eventName string, deriver string, duration time.Duration)
	RecordEventsRateLimited()
}

type MetricsTracer

type MetricsTracer struct {
	// contains filtered or unexported fields
}

func NewMetricsTracer

func NewMetricsTracer(m Metrics) *MetricsTracer

func (*MetricsTracer) OnDeriveEnd

func (mt *MetricsTracer) OnDeriveEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool)

func (*MetricsTracer) OnDeriveStart

func (mt *MetricsTracer) OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time)

func (*MetricsTracer) OnEmit

func (mt *MetricsTracer) OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time)

func (*MetricsTracer) OnRateLimited

func (mt *MetricsTracer) OnRateLimited(name string, derivContext uint64)

type NoopDeriver

type NoopDeriver struct{}

func (NoopDeriver) OnEvent

func (d NoopDeriver) OnEvent(ev Event)

type NoopEmitter

type NoopEmitter struct{}

func (NoopEmitter) Emit

func (e NoopEmitter) Emit(ev Event)

type NoopMetrics

type NoopMetrics struct {
}

func (NoopMetrics) RecordEmittedEvent

func (n NoopMetrics) RecordEmittedEvent(eventName string, emitter string)

func (NoopMetrics) RecordEventsRateLimited

func (n NoopMetrics) RecordEventsRateLimited()

func (NoopMetrics) RecordProcessedEvent

func (n NoopMetrics) RecordProcessedEvent(eventName string, deriver string, duration time.Duration)

type RegisterOpts

type RegisterOpts struct {
	Executor ExecutorOpts
	Emitter  EmitterOpts
}

RegisterOpts is the set of parameters used to configure a new deriver/emitter when it is registered with an event System. The same options may be reused for multiple registrations.

func DefaultRegisterOpts

func DefaultRegisterOpts() *RegisterOpts

type Registry added in v1.9.3

type Registry interface {
	// Register registers a named event-emitter, optionally processing events itself:
	// deriver may be nil, not all registrants have to process events.
	// A non-nil deriver may implement AttachEmitter to automatically attach the Emitter to it,
	// before the deriver itself becomes executable.
	Register(name string, deriver Deriver, opts *RegisterOpts) Emitter
	// Unregister removes a named emitter,
	// also removing it from the set of events-receiving derivers (if registered with non-nil deriver).
	Unregister(name string) (old Emitter)
}

type SequenceTracer

type SequenceTracer struct {
	StructTracer
}

func NewSequenceTracer

func NewSequenceTracer() *SequenceTracer

func (*SequenceTracer) Output

func (st *SequenceTracer) Output(showDurations bool) string

type StructTracer

type StructTracer struct {
	Entries []TraceEntry
	// contains filtered or unexported fields
}

func NewStructTracer

func NewStructTracer() *StructTracer

func (*StructTracer) OnDeriveEnd

func (st *StructTracer) OnDeriveEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool)

func (*StructTracer) OnDeriveStart

func (st *StructTracer) OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time)

func (*StructTracer) OnEmit

func (st *StructTracer) OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time)

func (*StructTracer) OnRateLimited

func (st *StructTracer) OnRateLimited(name string, derivContext uint64)

type Sys

type Sys struct {
	// contains filtered or unexported fields
}

Sys is the canonical implementation of System.

func NewSystem

func NewSystem(log log.Logger, ex Executor) *Sys

func (*Sys) AddTracer

func (s *Sys) AddTracer(t Tracer)

func (*Sys) Register

func (s *Sys) Register(name string, deriver Deriver, opts *RegisterOpts) Emitter

func (*Sys) RemoveTracer

func (s *Sys) RemoveTracer(t Tracer)

func (*Sys) Stop

func (s *Sys) Stop()

Stop shuts down the system by unregistering all emitters/derivers, freeing up executor resources.

func (*Sys) Unregister

func (s *Sys) Unregister(name string) (previous Emitter)

type System

type System interface {
	Registry
	// AddTracer registers a tracer to capture all event deriver/emitter work. It runs until RemoveTracer is called.
	// Duplicate tracers are allowed.
	AddTracer(t Tracer)
	// RemoveTracer removes a tracer. This is a no-op if the tracer was not previously added.
	// It will remove all added duplicates of the tracer.
	RemoveTracer(t Tracer)
	// Stop shuts down the System by un-registering all derivers/emitters.
	Stop()
}
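The tracer bookkeeping described in the comments above (duplicates allowed on add, all duplicates dropped on remove) can be sketched like this. The `tracerSet` and `countingTracer` types are hypothetical, and the Tracer interface is reduced to one method to keep the example short.

```go
package main

import "fmt"

// Tracer is reduced to a single method for this sketch; the real interface has four.
type Tracer interface {
	OnRateLimited(name string, derivContext uint64)
}

// countingTracer is a hypothetical tracer that counts notifications.
type countingTracer struct{ hits int }

func (c *countingTracer) OnRateLimited(name string, derivContext uint64) { c.hits++ }

// tracerSet is a hypothetical stand-in for the tracer list of a System.
type tracerSet struct{ tracers []Tracer }

// AddTracer appends without de-duplicating, so duplicates are allowed.
func (s *tracerSet) AddTracer(t Tracer) { s.tracers = append(s.tracers, t) }

// RemoveTracer drops every occurrence of t; it is a no-op if t was never added.
func (s *tracerSet) RemoveTracer(t Tracer) {
	kept := s.tracers[:0]
	for _, v := range s.tracers {
		if v != t {
			kept = append(kept, v)
		}
	}
	s.tracers = kept
}

// notify calls every registered tracer once per registration.
func (s *tracerSet) notify(name string) {
	for _, t := range s.tracers {
		t.OnRateLimited(name, 0)
	}
}

func main() {
	a, b := &countingTracer{}, &countingTracer{}
	s := &tracerSet{}
	s.AddTracer(a)
	s.AddTracer(a) // duplicate registration
	s.AddTracer(b)

	s.notify("x")
	fmt.Println(a.hits, b.hits) // prints: 2 1

	s.RemoveTracer(a) // removes both registrations of a
	s.notify("x")
	fmt.Println(a.hits, b.hits, len(s.tracers)) // prints: 2 2 1
}
```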

type TimingTracer

type TimingTracer struct {
	StructTracer
}

TimingTracer generates an HTML output with an SVG that shows, per deriver, per event-type, bands for event-execution scaled by the execution time. This trace gives an idea of patterns between events and where execution-time is spent.

func NewTimingTracer

func NewTimingTracer() *TimingTracer

func (*TimingTracer) Output

func (st *TimingTracer) Output() string

type TraceEntry

type TraceEntry struct {
	Kind TraceEntryKind

	Name         string
	DerivContext uint64

	// Not present if Kind == TraceRateLimited
	EmitContext uint64
	// Not present if Kind == TraceRateLimited
	EventName string

	// Set to deriver start-time if derive-start/end, or emit-time if emitted. Not set if Kind == TraceRateLimited
	EventTime time.Time

	// Only present if Kind == TraceDeriveEnd
	DeriveEnd struct {
		Duration time.Duration
		Effect   bool
	}
}

type TraceEntryKind

type TraceEntryKind int

const (
	TraceDeriveStart TraceEntryKind = iota
	TraceDeriveEnd
	TraceRateLimited
	TraceEmit
)
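Because the constants are declared with iota, they take the values 0 through 3 in declaration order. The sketch below re-declares them locally and adds a hypothetical `kindName` helper for printing; the label strings are made up for this example and are not part of the package.

```go
package main

import "fmt"

type TraceEntryKind int

const (
	TraceDeriveStart TraceEntryKind = iota
	TraceDeriveEnd
	TraceRateLimited
	TraceEmit
)

// kindName is a hypothetical helper; the labels are made up for this sketch.
func kindName(k TraceEntryKind) string {
	switch k {
	case TraceDeriveStart:
		return "derive-start"
	case TraceDeriveEnd:
		return "derive-end"
	case TraceRateLimited:
		return "rate-limited"
	case TraceEmit:
		return "emit"
	default:
		return fmt.Sprintf("unknown(%d)", int(k))
	}
}

func main() {
	fmt.Println(int(TraceDeriveStart), int(TraceDeriveEnd), int(TraceRateLimited), int(TraceEmit))
	// prints: 0 1 2 3
	fmt.Println(kindName(TraceRateLimited), kindName(TraceEntryKind(9)))
	// prints: rate-limited unknown(9)
}
```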

type Tracer

type Tracer interface {
	OnDeriveStart(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time)
	OnDeriveEnd(name string, ev AnnotatedEvent, derivContext uint64, startTime time.Time, duration time.Duration, effect bool)
	OnRateLimited(name string, derivContext uint64)
	OnEmit(name string, ev AnnotatedEvent, derivContext uint64, emitTime time.Time)
}
