etltest

package
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SpecApiSrcBigtableSinkMinimal        = "ApiSrcBigtableSinkMinimal"
	SpecApiSrcBigtableSinkFooRound       = "ApiSrcBigtableSinkFooRound"
	SpecKafkaSrcBigtableSinkPlayer       = "KafkaSrcBigtableSinkPlayer"
	SpecKafkaSrcBigtableSinkMultiSession = "KafkaSrcBigtableSinkMultiSession"
	SpecKafkaSrcBigtableSinkFeatureX     = "KafkaSrcBigtableSinkFeatureX"
	SpecRegSpecPubsub                    = "PubsubSrcFirestoreSinkRegSpec"
	SpecPubsubSrcKafkaSinkFoologs        = "PubsubSrcKafkaSinkFoologs"
)
View Source
const (
	EntityKafka     = "kafka"
	EntityPubsub    = "pubsub"
	EntityFirestore = "firestore"
	EntityBigTable  = "bigtable"
	EntityBigQuery  = "bigquery"
)

The following constants are only used here locally for test spec purposes, and there are no dependencies to the real plugin entities, like kafka, etc.

Variables

This section is empty.

Functions

func GetAllSpecsRaw

func GetAllSpecsRaw(testDirPath string) map[string][]byte

Maybe not needed (although useful for test purposes)

func SpecSpec

func SpecSpec() *entity.Spec

Convenience test function to get Spec for handling GEIST specs without loading Registry

func SpecSpecInMem

func SpecSpecInMem() *entity.Spec

Types

type MockExecutor

type MockExecutor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(stream igeist.Stream) *MockExecutor

func (*MockExecutor) Metrics added in v0.4.0

func (r *MockExecutor) Metrics() entity.Metrics

func (*MockExecutor) ProcessEvent

func (e *MockExecutor) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult

func (*MockExecutor) Run

func (e *MockExecutor) Run(ctx context.Context, wg *sync.WaitGroup)

func (*MockExecutor) Shutdown

func (e *MockExecutor) Shutdown(ctx context.Context)

func (*MockExecutor) Spec

func (e *MockExecutor) Spec() *entity.Spec

func (*MockExecutor) Stream

func (e *MockExecutor) Stream() igeist.Stream

func (*MockExecutor) StreamId

func (e *MockExecutor) StreamId() string

type MockExtractor

type MockExtractor struct {
	// contains filtered or unexported fields
}

func NewMockExtractor

func NewMockExtractor(spec entity.SourceConfig) *MockExtractor

func (*MockExtractor) Extract

func (m *MockExtractor) Extract(ctx context.Context, query entity.ExtractorQuery, result any) (error, bool)

func (*MockExtractor) ExtractFromSink

func (m *MockExtractor) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)

func (*MockExtractor) SendToSource

func (m *MockExtractor) SendToSource(ctx context.Context, eventData any) (string, error)

func (*MockExtractor) StreamExtract

func (m *MockExtractor) StreamExtract(
	ctx context.Context,
	reportEvent entity.ProcessEventFunc,
	err *error,
	retryable *bool)

type MockLoader

type MockLoader struct {
}

func NewMockLoader

func NewMockLoader() *MockLoader

func (*MockLoader) Shutdown

func (s *MockLoader) Shutdown(ctx context.Context)

func (*MockLoader) StreamLoad

func (s *MockLoader) StreamLoad(ctx context.Context, data []*entity.Transformed) (string, error, bool)

type MockTransformer

type MockTransformer struct {
	// contains filtered or unexported fields
}

func NewMockTransformer

func NewMockTransformer(spec entity.Transform) *MockTransformer

func (*MockTransformer) Transform

func (s *MockTransformer) Transform(
	ctx context.Context,
	event []byte,
	retryable *bool) ([]*entity.Transformed, error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(
	spec *entity.Spec,
	extractor entity.Extractor,
	transformer igeist.Transformer,
	loader entity.Loader,
	sinkExtractor entity.Extractor) *Stream

func (*Stream) ExtractFromSink

func (s *Stream) ExtractFromSink(ctx context.Context, query entity.ExtractorQuery, result *[]*entity.Transformed) (error, bool)

func (*Stream) Extractor

func (s *Stream) Extractor() entity.Extractor

func (*Stream) Instance

func (s *Stream) Instance() string

func (*Stream) Loader

func (s *Stream) Loader() entity.Loader

func (*Stream) Publish

func (s *Stream) Publish(ctx context.Context, event []byte) (string, error)

func (*Stream) Spec

func (s *Stream) Spec() *entity.Spec

func (*Stream) Transformer

func (s *Stream) Transformer() igeist.Transformer

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder(entityFactory igeist.StreamEntityFactory) *StreamBuilder

func (*StreamBuilder) Build

func (s *StreamBuilder) Build(ctx context.Context, spec *entity.Spec) (igeist.Stream, error)

type StreamEntityFactory

type StreamEntityFactory struct {
	// contains filtered or unexported fields
}

func NewStreamEntityFactory

func NewStreamEntityFactory() *StreamEntityFactory

func (*StreamEntityFactory) CreateExtractor

func (s *StreamEntityFactory) CreateExtractor(ctx context.Context, spec *entity.Spec, instanceId string) (entity.Extractor, error)

func (*StreamEntityFactory) CreateLoader

func (s *StreamEntityFactory) CreateLoader(ctx context.Context, spec *entity.Spec, instanceId string) (entity.Loader, error)

func (*StreamEntityFactory) CreateSinkExtractor

func (s *StreamEntityFactory) CreateSinkExtractor(ctx context.Context, spec *entity.Spec, instanceId string) (entity.Extractor, error)

func (*StreamEntityFactory) CreateTransformer

func (s *StreamEntityFactory) CreateTransformer(ctx context.Context, spec *entity.Spec) (igeist.Transformer, error)

func (*StreamEntityFactory) SetAdminLoader

func (s *StreamEntityFactory) SetAdminLoader(loader entity.Loader)

type StreamRegistry

type StreamRegistry struct {
	// contains filtered or unexported fields
}

In-memory SpecRegistry implementation for GEIST specs for testing purposes For real usage this is replaced by a proper cloud db backed registry implementation like a cloud firestore one, with dynamic updates of specs.

func NewStreamRegistry

func NewStreamRegistry(testDirPath string) *StreamRegistry

func (*StreamRegistry) Delete

func (r *StreamRegistry) Delete(ctx context.Context, id string) error

func (*StreamRegistry) Exists

func (r *StreamRegistry) Exists(id string) bool

func (*StreamRegistry) ExistsWithSameOrHigherVersion added in v0.4.2

func (r *StreamRegistry) ExistsWithSameOrHigherVersion(specBytes []byte) (bool, error)

func (*StreamRegistry) Fetch

func (r *StreamRegistry) Fetch(ctx context.Context) error

func (*StreamRegistry) Get

func (r *StreamRegistry) Get(ctx context.Context, id string) (*entity.Spec, error)

func (*StreamRegistry) GetAll

func (r *StreamRegistry) GetAll(ctx context.Context) (map[string]*entity.Spec, error)

func (*StreamRegistry) Metrics added in v0.4.0

func (r *StreamRegistry) Metrics() entity.Metrics

func (*StreamRegistry) ProcessEvent

func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult

func (*StreamRegistry) Put

func (r *StreamRegistry) Put(ctx context.Context, id string, spec *entity.Spec) error

func (*StreamRegistry) Run

func (r *StreamRegistry) Run(ctx context.Context, wg *sync.WaitGroup)

func (*StreamRegistry) SetAdminStream

func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)

func (*StreamRegistry) Shutdown

func (r *StreamRegistry) Shutdown(ctx context.Context)

func (*StreamRegistry) Spec

func (r *StreamRegistry) Spec() *entity.Spec

func (*StreamRegistry) Stream

func (r *StreamRegistry) Stream() igeist.Stream

func (*StreamRegistry) StreamId

func (r *StreamRegistry) StreamId() string

func (*StreamRegistry) Validate

func (r *StreamRegistry) Validate(specBytes []byte) (*entity.Spec, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL