engine

package
v0.13.0
Published: Jun 4, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrHookRetryableError   = errors.New("Pre/PostTransfromHookFunc reported retryable error")
	ErrHookUnretryableError = errors.New("Pre/PostTransfromHookFunc reported unretryable error")
	ErrHookInvalidAction    = errors.New("Pre/PostTransfromHookFunc returned invalid action value")
)
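
Callers can match these sentinel values with errors.Is. The sketch below uses local stand-in variables instead of importing the package, so that it runs on its own. The names errHookRetryable, errHookUnretryable, errHookInvalid and classify are hypothetical, and whether the engine wraps these errors before returning them is an assumption, not something this page states.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the exported sentinel errors. Real code would import
// the engine package and reference engine.ErrHookRetryableError and friends.
var (
	errHookRetryable   = errors.New("hook reported retryable error")
	errHookUnretryable = errors.New("hook reported unretryable error")
	errHookInvalid     = errors.New("hook returned invalid action value")
)

// classify maps an error to a coarse handling decision using errors.Is,
// which also matches sentinels that were wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errHookRetryable):
		return "retry"
	case errors.Is(err, errHookUnretryable), errors.Is(err, errHookInvalid):
		return "abort"
	default:
		return "unknown"
	}
}

// wrappedRetryable returns a retryable sentinel wrapped with extra context.
func wrappedRetryable() error {
	return fmt.Errorf("stream my-stream: %w", errHookRetryable)
}

func main() {
	fmt.Println(classify(wrappedRetryable()))
	fmt.Println(classify(errHookInvalid))
}
```

Comparing with errors.Is instead of == keeps the check valid even if a layer in between adds context to the error.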

Functions

This section is empty.

Types

type AdminEventHandler

type AdminEventHandler struct {
	// contains filtered or unexported fields
}

AdminEventHandler implements the igeist.Loader interface. It is called in the stream whose loader entity is set to 'Admin', and acts as the sink in that stream, listening for admin events.

func (*AdminEventHandler) Shutdown

func (a *AdminEventHandler) Shutdown(ctx context.Context)

func (*AdminEventHandler) StreamLoad

func (a *AdminEventHandler) StreamLoad(ctx context.Context, data []*entity.Transformed) (string, error, bool)

type Config

type Config struct {
	RegSpec                   *entity.Spec                 // Built-in spec for GEIST spec registrations
	AdminSpec                 *entity.Spec                 // Built-in spec for GEIST admin event notifications
	PreTransformHookFunc      entity.PreTransformHookFunc  `json:"-"`
	PostTransformHookFunc     entity.PostTransformHookFunc `json:"-"`
	MaxStreamRetryIntervalSec int
	NotifyChan                entity.NotifyChan `json:"-"`
	Log                       bool
	EventLogInterval          int
}
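
The `json:"-"` tags on the hook and notification-channel fields exclude them from JSON encoding. This matters because encoding/json cannot marshal func or chan values. The sketch below illustrates that behavior with a trimmed local struct. configSketch, its placeholder field types and encode are hypothetical, and why the engine would serialize its Config at all is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// configSketch is a trimmed, hypothetical stand-in for engine.Config.
// The func and chan field types are placeholders, not the entity types.
type configSketch struct {
	PreTransformHookFunc      func([]byte) int `json:"-"`
	MaxStreamRetryIntervalSec int
	NotifyChan                chan string `json:"-"`
	Log                       bool
	EventLogInterval          int
}

// encode returns the JSON form of the config; fields tagged "-" are skipped.
func encode(c configSketch) (string, error) {
	b, err := json.Marshal(c)
	return string(b), err
}

// sample builds a config with both the hook and the channel populated.
func sample() configSketch {
	return configSketch{
		PreTransformHookFunc:      func([]byte) int { return 0 },
		MaxStreamRetryIntervalSec: 30,
		NotifyChan:                make(chan string),
		Log:                       true,
		EventLogInterval:          100,
	}
}

func main() {
	out, err := encode(sample())
	fmt.Println(out, err)
}
```

Without the tags, json.Marshal would return an unsupported-type error for the func field instead of producing output.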

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

A stream Executor operates an ETL stream, from Source to Transform to Sink, as specified by a single GEIST stream spec. The stream it executes is configured and instantiated by the Supervisor.

func NewExecutor

func NewExecutor(config Config, stream igeist.Stream) *Executor

func (*Executor) Metrics added in v0.4.0

func (e *Executor) Metrics() entity.Metrics

func (*Executor) ProcessEvent

func (e *Executor) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult

ProcessEvent is called by the Extractor when events have been extracted from the source. This design was chosen over a channel-based one to ensure that offset commits and pubsub acks are done efficiently and reliably, and only once sink success is ensured. It also keeps transloading latency to a minimum.

TODO: Add better description and usage of the key parameter (it is currently sent by Kafka Extractors, as the message key).

If event processing is successful, result.Error will be nil and result.Status will be set to ExecutorStatusSuccessful. If the executor is shutting down, result.Error will be non-nil and result.Status will be set to ExecutorStatusShuttingDown.
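
The callback design described for ProcessEvent can be sketched as a source loop that calls a processing function synchronously and acknowledges a batch only after that call reports success. This is a simplified illustration, not the engine's implementation. The types result, processFunc and fakeSource are hypothetical stand-ins for entity.EventProcessingResult, Executor.ProcessEvent and a real Extractor.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// result is a hypothetical, simplified stand-in for entity.EventProcessingResult.
type result struct {
	Err error
}

// processFunc plays the role of Executor.ProcessEvent: it returns only
// after transform and sink load have finished or failed.
type processFunc func(ctx context.Context, events [][]byte) result

// fakeSource is a hypothetical in-memory source that tracks a committed offset.
type fakeSource struct {
	batches   [][][]byte
	committed int // number of batches acknowledged so far
}

// run feeds batches to process one at a time and commits each batch only
// after process reports success, so a failed batch is never acknowledged.
func (s *fakeSource) run(ctx context.Context, process processFunc) error {
	for s.committed < len(s.batches) {
		res := process(ctx, s.batches[s.committed])
		if res.Err != nil {
			return res.Err // offset stays put; the batch can be redelivered
		}
		s.committed++
	}
	return nil
}

// demo runs three batches where the second one fails in the sink.
func demo() (int, error) {
	src := &fakeSource{batches: [][][]byte{{[]byte("a")}, {[]byte("bad")}, {[]byte("c")}}}
	err := src.run(context.Background(), func(_ context.Context, events [][]byte) result {
		if string(events[0]) == "bad" {
			return result{Err: errors.New("sink unavailable")}
		}
		return result{}
	})
	return src.committed, err
}

func main() {
	committed, err := demo()
	fmt.Println(committed, err)
}
```

Because the call returns only after the sink has been written, the source never has to guess whether an event handed off through a channel was actually stored before it commits.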

func (*Executor) Run

func (e *Executor) Run(ctx context.Context, wg *sync.WaitGroup)

func (*Executor) Shutdown

func (e *Executor) Shutdown(ctx context.Context)

func (*Executor) Stream

func (e *Executor) Stream() igeist.Stream

func (*Executor) StreamId

func (e *Executor) StreamId() string

type ExecutorMap

type ExecutorMap map[string][]igeist.Executor

type ProcessingMetrics added in v0.4.4

type ProcessingMetrics struct {
	Events         int64
	DurationMicros int64
	Bytes          int64
	Operations     int64
}

Event processing metrics. Using int64 is safe here:

	Total Events processed will work for 3 million years at 100k events/sec.
	Total processing DurationMicros will work for 290k years.
	Total Bytes processed will work for roughly 2,800 years when ingesting at 100 MiB/sec.
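
The headroom estimates can be checked with a short calculation that divides the int64 maximum by a growth rate. The helper yearsUntilOverflow and the constant secondsPerYear are hypothetical names. The DurationMicros case assumes one microsecond of processing time accumulates per wall-clock microsecond, which is an assumption about how that counter is used. The results land close to the figures quoted above.

```go
package main

import (
	"fmt"
	"math"
)

// secondsPerYear uses the average Julian year of 365.25 days.
const secondsPerYear = 365.25 * 24 * 60 * 60

// yearsUntilOverflow returns how many years an int64 counter lasts when it
// grows by ratePerSec units every second.
func yearsUntilOverflow(ratePerSec float64) float64 {
	return float64(math.MaxInt64) / ratePerSec / secondsPerYear
}

func main() {
	fmt.Printf("Events at 100k/sec:       %.0f years\n", yearsUntilOverflow(100_000))
	fmt.Printf("DurationMicros (1e6/sec): %.0f years\n", yearsUntilOverflow(1_000_000))
	fmt.Printf("Bytes at 100 MiB/sec:     %.0f years\n", yearsUntilOverflow(100*1024*1024))
}
```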

func (ProcessingMetrics) String added in v0.4.4

func (p ProcessingMetrics) String() string

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(
	spec *entity.Spec,
	instance string,
	extractor entity.Extractor,
	transformer igeist.Transformer,
	loader entity.Loader,
	sinkExtractor entity.Extractor) *Stream

func (*Stream) ExtractFromSink

func (s *Stream) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)

ExtractFromSink fetches data stored in the Stream's sink. It uses a sink extractor for that purpose.

func (*Stream) Extractor

func (s *Stream) Extractor() entity.Extractor

func (*Stream) Instance

func (s *Stream) Instance() string

func (*Stream) Loader

func (s *Stream) Loader() entity.Loader

func (*Stream) Publish

func (s *Stream) Publish(ctx context.Context, event []byte) (string, error)

Publish lets Stream clients send events directly to the source of the stream.

func (*Stream) Spec

func (s *Stream) Spec() *entity.Spec

func (*Stream) Transformer

func (s *Stream) Transformer() igeist.Transformer

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder(entityFactory igeist.StreamEntityFactory) *StreamBuilder

func (*StreamBuilder) Build

func (s *StreamBuilder) Build(ctx context.Context, spec *entity.Spec) (igeist.Stream, error)

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor is responsible for high-level lifecycle management of GEIST streams. It initializes and starts up one or more Executors per Stream Spec, each in its own goroutine. Each Executor is given a newly created Stream entity comprising Extractor, Transformer and Loader objects, based on Spec and Deployment config.
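
The one-goroutine-per-Executor arrangement can be sketched with a map shaped like ExecutorMap and a sync.WaitGroup. This is an illustration under stated assumptions, not the Supervisor's code. The runner interface, countingExecutor and startAll are hypothetical, and having Run call wg.Done is an assumption about how the WaitGroup parameter is used.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// runner is a hypothetical interface with the same Run shape as Executor.Run.
type runner interface {
	Run(ctx context.Context, wg *sync.WaitGroup)
}

// countingExecutor is a hypothetical executor that only records that it ran.
type countingExecutor struct {
	id   string
	mu   *sync.Mutex
	seen map[string]int
}

// Run records one execution for the stream id and signals completion.
func (e *countingExecutor) Run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done() // assumption: Run signals completion through wg
	e.mu.Lock()
	e.seen[e.id]++
	e.mu.Unlock()
}

// startAll launches every executor in its own goroutine and waits for all
// of them to return. The map mirrors the shape of ExecutorMap.
func startAll(ctx context.Context, executors map[string][]runner) {
	var wg sync.WaitGroup
	for _, group := range executors {
		for _, ex := range group {
			wg.Add(1)
			go ex.Run(ctx, &wg)
		}
	}
	wg.Wait()
}

// demo starts two executors for stream-a and one for stream-b.
func demo() map[string]int {
	mu := &sync.Mutex{}
	seen := map[string]int{}
	executors := map[string][]runner{}
	for id, n := range map[string]int{"stream-a": 2, "stream-b": 1} {
		for i := 0; i < n; i++ {
			executors[id] = append(executors[id], &countingExecutor{id: id, mu: mu, seen: seen})
		}
	}
	startAll(context.Background(), executors)
	return seen
}

func main() {
	seen := demo()
	fmt.Println(seen["stream-a"], seen["stream-b"])
}
```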

func NewSupervisor

func NewSupervisor(
	ctx context.Context,
	config Config,
	streamBuilder igeist.StreamBuilder,
	registry igeist.StreamRegistry) (*Supervisor, error)

NewSupervisor expects the provided registry to already be initialized with cached specs.

func (*Supervisor) AdminEventHandler

func (s *Supervisor) AdminEventHandler() entity.Loader

AdminEventHandler returns the event receiver for admin events from pubsub

func (*Supervisor) Init

func (s *Supervisor) Init(ctx context.Context) error

func (*Supervisor) Metrics added in v0.4.0

func (s *Supervisor) Metrics() map[string]entity.Metrics

func (*Supervisor) RegisterExecutor

func (s *Supervisor) RegisterExecutor(ctx context.Context, executor igeist.Executor)

RegisterExecutor registers a single executor as the main one for a stream

func (*Supervisor) Registry

func (s *Supervisor) Registry() igeist.StreamRegistry

Registry returns the registry containing all registered stream specs

func (*Supervisor) Run

func (s *Supervisor) Run(ctx context.Context, ready *sync.WaitGroup) error

Run is the main entry point for GEIST execution of all streams

func (*Supervisor) Shutdown

func (s *Supervisor) Shutdown(ctx context.Context)

Shutdown is called by the service during shutdown

func (*Supervisor) Stream

func (s *Supervisor) Stream(id string) (igeist.Stream, error)

Stream returns the first (main) stream instance for a stream id, for use with getting stream spec info and stream publishing.
