Documentation ¶
Index ¶
- Variables
- type AdminEventHandler
- type Config
- type Executor
- func (e *Executor) Metrics() entity.Metrics
- func (e *Executor) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
- func (e *Executor) Run(ctx context.Context, wg *sync.WaitGroup)
- func (e *Executor) Shutdown(ctx context.Context)
- func (e *Executor) Stream() igeist.Stream
- func (e *Executor) StreamId() string
- type ExecutorMap
- type ProcessingMetrics
- type Stream
- func (s *Stream) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, ...) (error, bool)
- func (s *Stream) Extractor() entity.Extractor
- func (s *Stream) Instance() string
- func (s *Stream) Loader() entity.Loader
- func (s *Stream) Publish(ctx context.Context, event []byte) (string, error)
- func (s *Stream) Spec() *entity.Spec
- func (s *Stream) Transformer() igeist.Transformer
- type StreamBuilder
- type Supervisor
- func (s *Supervisor) AdminEventHandler() entity.Loader
- func (s *Supervisor) Init(ctx context.Context) error
- func (s *Supervisor) Metrics() map[string]entity.Metrics
- func (s *Supervisor) RegisterExecutor(ctx context.Context, executor igeist.Executor)
- func (s *Supervisor) Registry() igeist.StreamRegistry
- func (s *Supervisor) Run(ctx context.Context, ready *sync.WaitGroup) error
- func (s *Supervisor) Shutdown(ctx context.Context)
- func (s *Supervisor) Stream(id string) (igeist.Stream, error)
Variables ¶
var (
	ErrHookUnretryableError = errors.New("PreTransfromHookFunc reported unretryable error")
	ErrHookInvalidAction    = errors.New("PreTransfromHookFunc returned invalid action value")
)
Types ¶
type AdminEventHandler ¶
type AdminEventHandler struct {
// contains filtered or unexported fields
}
AdminEventHandler implements the igeist.Loader interface. It is called in the stream whose loader entity is set to 'Admin', where it acts as the sink that listens for admin events.
func (*AdminEventHandler) Shutdown ¶
func (a *AdminEventHandler) Shutdown(ctx context.Context)
func (*AdminEventHandler) StreamLoad ¶
func (a *AdminEventHandler) StreamLoad(ctx context.Context, data []*entity.Transformed) (string, error, bool)
type Config ¶
type Config struct {
	RegSpec                   *entity.Spec // Built-in spec for GEIST spec registrations
	AdminSpec                 *entity.Spec // Built-in spec for GEIST admin event notifications
	PreTransformHookFunc      entity.PreTransformHookFunc `json:"-"`
	MaxStreamRetryIntervalSec int
	NotifyChan                entity.NotifyChan `json:"-"`
	Log                       bool
	EventLogInterval          int
}
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
A Stream Executor operates an ETL stream, from Source to Transform to Sink, as specified by a single GEIST stream spec. The stream it executes is configured and instantiated by the Supervisor.
func (*Executor) ProcessEvent ¶
func (e *Executor) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
ProcessEvent is called by the Extractor when events have been extracted from the source. This design was chosen over a channel-based one to ensure that the offset commit or pubsub ack happens, efficiently and reliably, only once sink success is ensured. It also reduces transloading latency to a minimum.
TODO: Add better description and usage of the key parameter (it is currently sent by Kafka Extractors, as the message key).
If event processing is successful, result.Error will be nil and result.Status will be set to ExecutorStatusSuccessful. If the executor is shutting down, result.Error will be non-nil and result.Status will be set to ExecutorStatusShuttingDown.
type ExecutorMap ¶
type ProcessingMetrics ¶ added in v0.4.4
Event processing metrics. Using int64 is safe here:
- Total Events processed will work for 3 million years at 100k events/sec
- Total processing DurationMicros will work for 290k years
- Total Bytes processed will work for 2856 years if ingesting at 100 MiB/sec
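The overflow horizons quoted above can be checked with a few lines of arithmetic. This is a rough sketch under stated assumptions: a year is taken as 365.25 days, the duration counter is assumed to grow by at most one million microseconds per wall-clock second, and yearsUntilOverflow is a hypothetical helper rather than part of the package.

```go
package main

import (
	"fmt"
	"math"
)

// secondsPerYear assumes a 365.25-day year.
const secondsPerYear = 365.25 * 24 * 3600

// yearsUntilOverflow returns how many years an int64 counter lasts when it
// grows by ratePerSec units every second.
func yearsUntilOverflow(ratePerSec float64) float64 {
	return float64(math.MaxInt64) / ratePerSec / secondsPerYear
}

func main() {
	fmt.Printf("events at 100k/sec:   %.0f years\n", yearsUntilOverflow(100_000))
	fmt.Printf("duration in micros:   %.0f years\n", yearsUntilOverflow(1_000_000))
	fmt.Printf("bytes at 100 MiB/sec: %.0f years\n", yearsUntilOverflow(100*1024*1024))
}
```

Under these assumptions the results come out at roughly 2.9 million, 292 thousand and 2.8 thousand years, the same order of magnitude as the figures in the comment. The exact byte figure depends on how a year and 100 MiB are defined.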
func (ProcessingMetrics) String ¶ added in v0.4.4
func (p ProcessingMetrics) String() string
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
func (*Stream) ExtractFromSink ¶
func (s *Stream) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)
ExtractFromSink fetches data stored in the Stream's sink. It uses a sink extractor for that purpose.
func (*Stream) Publish ¶
func (s *Stream) Publish(ctx context.Context, event []byte) (string, error)
Publish makes it possible for Stream clients to send events directly to the source of the stream.
func (*Stream) Transformer ¶
func (s *Stream) Transformer() igeist.Transformer
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder(entityFactory igeist.StreamEntityFactory) *StreamBuilder
type Supervisor ¶
type Supervisor struct {
// contains filtered or unexported fields
}
Supervisor is responsible for high-level lifecycle management of Geist streams. It initializes and starts up one or more Executors per Stream Spec, each in its own goroutine. Each Executor is given a newly created Stream entity comprising Extractor, Transformer and Loader objects, based on Spec and Deployment config.
func NewSupervisor ¶
func NewSupervisor( ctx context.Context, config Config, streamBuilder igeist.StreamBuilder, registry igeist.StreamRegistry) (*Supervisor, error)
Supervisor expects the provided registry to be initialized with cached specs
func (*Supervisor) AdminEventHandler ¶
func (s *Supervisor) AdminEventHandler() entity.Loader
AdminEventHandler returns the event receiver for admin events from pubsub
func (*Supervisor) Metrics ¶ added in v0.4.0
func (s *Supervisor) Metrics() map[string]entity.Metrics
func (*Supervisor) RegisterExecutor ¶
func (s *Supervisor) RegisterExecutor(ctx context.Context, executor igeist.Executor)
RegisterExecutor registers a single executor as the main one for a stream
func (*Supervisor) Registry ¶
func (s *Supervisor) Registry() igeist.StreamRegistry
Registry returns the registry containing all registered stream specs
func (*Supervisor) Shutdown ¶
func (s *Supervisor) Shutdown(ctx context.Context)
Shutdown is called by the service during shutdown