Documentation ¶
Index ¶
- Constants
- func GetAllSpecsRaw(testDirPath string) map[string][]byte
- func SpecSpec() *entity.Spec
- func SpecSpecInMem() *entity.Spec
- type MockExecutor
- func (r *MockExecutor) Metrics() entity.Metrics
- func (e *MockExecutor) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
- func (e *MockExecutor) Run(ctx context.Context, wg *sync.WaitGroup)
- func (e *MockExecutor) Shutdown()
- func (e *MockExecutor) Spec() igeist.Spec
- func (e *MockExecutor) Stream() igeist.Stream
- func (e *MockExecutor) StreamId() string
- type MockExtractor
- func (m *MockExtractor) Extract(ctx context.Context, query entity.ExtractorQuery, result any) (error, bool)
- func (m *MockExtractor) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, ...) (error, bool)
- func (m *MockExtractor) SendToSource(ctx context.Context, eventData any) (string, error)
- func (m *MockExtractor) StreamExtract(ctx context.Context, reportEvent entity.ProcessEventFunc, err *error, ...)
- type MockLoader
- type MockTransformer
- type Stream
- func (s *Stream) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, ...) (error, bool)
- func (s *Stream) Extractor() entity.Extractor
- func (s *Stream) Instance() string
- func (s *Stream) Loader() entity.Loader
- func (s *Stream) Publish(ctx context.Context, event []byte) (string, error)
- func (s *Stream) Spec() igeist.Spec
- func (s *Stream) Transformer() igeist.Transformer
- type StreamBuilder
- type StreamEntityFactory
- func (s *StreamEntityFactory) CreateExtractor(ctx context.Context, etlSpec igeist.Spec, instanceId string) (entity.Extractor, error)
- func (s *StreamEntityFactory) CreateLoader(ctx context.Context, etlSpec igeist.Spec, instanceId string) (entity.Loader, error)
- func (s *StreamEntityFactory) CreateSinkExtractor(ctx context.Context, etlSpec igeist.Spec, instanceId string) (entity.Extractor, error)
- func (s *StreamEntityFactory) CreateTransformer(ctx context.Context, etlSpec igeist.Spec) (igeist.Transformer, error)
- func (s *StreamEntityFactory) SetAdminLoader(loader entity.Loader)
- type StreamRegistry
- func (r *StreamRegistry) Delete(ctx context.Context, id string) error
- func (r *StreamRegistry) Exists(id string) bool
- func (r *StreamRegistry) ExistsSameVersion(specBytes []byte) (bool, error)
- func (r *StreamRegistry) Fetch(ctx context.Context) error
- func (r *StreamRegistry) Get(ctx context.Context, id string) (igeist.Spec, error)
- func (r *StreamRegistry) GetAll(ctx context.Context) (map[string]igeist.Spec, error)
- func (r *StreamRegistry) Metrics() entity.Metrics
- func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
- func (r *StreamRegistry) Put(ctx context.Context, id string, spec igeist.Spec) error
- func (r *StreamRegistry) Run(ctx context.Context, wg *sync.WaitGroup)
- func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)
- func (r *StreamRegistry) Shutdown()
- func (r *StreamRegistry) Spec() igeist.Spec
- func (r *StreamRegistry) Stream() igeist.Stream
- func (r *StreamRegistry) StreamId() string
- func (r *StreamRegistry) Validate(specBytes []byte) (igeist.Spec, error)
Constants ¶
View Source
const ( SpecApiSrcBigtableSinkMinimal = "ApiSrcBigtableSinkMinimal" SpecApiSrcBigtableSinkFooRound = "ApiSrcBigtableSinkFooRound" SpecKafkaSrcBigtableSinkPlayer = "KafkaSrcBigtableSinkPlayer" SpecKafkaSrcBigtableSinkMultiSession = "KafkaSrcBigtableSinkMultiSession" SpecKafkaSrcBigtableSinkFeatureX = "KafkaSrcBigtableSinkFeatureX" SpecRegSpecPubsub = "PubsubSrcFirestoreSinkRegSpec" SpecPubsubSrcKafkaSinkFoologs = "PubsubSrcKafkaSinkFoologs" )
View Source
const ( EntityKafka = "kafka" EntityPubsub = "pubsub" EntityFirestore = "firestore" EntityBigTable = "bigtable" EntityBigQuery = "bigquery" )
The following constants are only used here locally for test spec purposes, and there are no dependencies to the real plugin entities, like kafka, etc.
Variables ¶
This section is empty.
Functions ¶
func GetAllSpecsRaw ¶
Maybe not needed (although useful for test purposes)
func SpecSpec ¶
Convenience test function to get Spec for handling GEIST specs without loading Registry
func SpecSpecInMem ¶
Types ¶
type MockExecutor ¶
type MockExecutor struct {
// contains filtered or unexported fields
}
func NewExecutor ¶
func NewExecutor(stream igeist.Stream) *MockExecutor
func (*MockExecutor) Metrics ¶ added in v0.4.0
func (r *MockExecutor) Metrics() entity.Metrics
func (*MockExecutor) ProcessEvent ¶
func (e *MockExecutor) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
func (*MockExecutor) Shutdown ¶
func (e *MockExecutor) Shutdown()
func (*MockExecutor) Spec ¶
func (e *MockExecutor) Spec() igeist.Spec
func (*MockExecutor) Stream ¶
func (e *MockExecutor) Stream() igeist.Stream
func (*MockExecutor) StreamId ¶
func (e *MockExecutor) StreamId() string
type MockExtractor ¶
type MockExtractor struct {
// contains filtered or unexported fields
}
func NewMockExtractor ¶
func NewMockExtractor(spec entity.SourceConfig) *MockExtractor
func (*MockExtractor) Extract ¶
func (m *MockExtractor) Extract(ctx context.Context, query entity.ExtractorQuery, result any) (error, bool)
func (*MockExtractor) ExtractFromSink ¶
func (m *MockExtractor) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)
func (*MockExtractor) SendToSource ¶
func (*MockExtractor) StreamExtract ¶
func (m *MockExtractor) StreamExtract( ctx context.Context, reportEvent entity.ProcessEventFunc, err *error, retryable *bool)
type MockLoader ¶
type MockLoader struct { }
func NewMockLoader ¶
func NewMockLoader() *MockLoader
func (*MockLoader) Shutdown ¶
func (s *MockLoader) Shutdown()
func (*MockLoader) StreamLoad ¶
func (s *MockLoader) StreamLoad(ctx context.Context, data []*entity.Transformed) (string, error, bool)
type MockTransformer ¶
type MockTransformer struct {
// contains filtered or unexported fields
}
func NewMockTransformer ¶
func NewMockTransformer(spec entity.Transform) *MockTransformer
func (*MockTransformer) Transform ¶
func (s *MockTransformer) Transform( ctx context.Context, event []byte, retryable *bool) ([]*entity.Transformed, error)
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
func (*Stream) ExtractFromSink ¶
func (s *Stream) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)
func (*Stream) Transformer ¶
func (s *Stream) Transformer() igeist.Transformer
type StreamBuilder ¶
type StreamBuilder struct {
// contains filtered or unexported fields
}
func NewStreamBuilder ¶
func NewStreamBuilder(entityFactory igeist.StreamEntityFactory) *StreamBuilder
type StreamEntityFactory ¶
type StreamEntityFactory struct {
// contains filtered or unexported fields
}
func NewStreamEntityFactory ¶
func NewStreamEntityFactory() *StreamEntityFactory
func (*StreamEntityFactory) CreateExtractor ¶
func (*StreamEntityFactory) CreateLoader ¶
func (*StreamEntityFactory) CreateSinkExtractor ¶
func (*StreamEntityFactory) CreateTransformer ¶
func (s *StreamEntityFactory) CreateTransformer(ctx context.Context, etlSpec igeist.Spec) (igeist.Transformer, error)
func (*StreamEntityFactory) SetAdminLoader ¶
func (s *StreamEntityFactory) SetAdminLoader(loader entity.Loader)
type StreamRegistry ¶
type StreamRegistry struct {
// contains filtered or unexported fields
}
In-memory SpecRegistry implementation for GEIST specs for testing purposes For real usage this is replaced by a proper cloud db backed registry implementation like a cloud firestore one, with dynamic updates of specs.
func NewStreamRegistry ¶
func NewStreamRegistry(testDirPath string) *StreamRegistry
func (*StreamRegistry) Delete ¶
func (r *StreamRegistry) Delete(ctx context.Context, id string) error
func (*StreamRegistry) Exists ¶
func (r *StreamRegistry) Exists(id string) bool
func (*StreamRegistry) ExistsSameVersion ¶
func (r *StreamRegistry) ExistsSameVersion(specBytes []byte) (bool, error)
func (*StreamRegistry) Metrics ¶ added in v0.4.0
func (r *StreamRegistry) Metrics() entity.Metrics
func (*StreamRegistry) ProcessEvent ¶
func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
func (*StreamRegistry) SetAdminStream ¶
func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)
func (*StreamRegistry) Shutdown ¶
func (r *StreamRegistry) Shutdown()
func (*StreamRegistry) Spec ¶
func (r *StreamRegistry) Spec() igeist.Spec
func (*StreamRegistry) Stream ¶
func (r *StreamRegistry) Stream() igeist.Stream
func (*StreamRegistry) StreamId ¶
func (r *StreamRegistry) StreamId() string
Click to show internal directories.
Click to hide internal directories.