etltest

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SpecApiSrcBigtableSinkMinimal        = "ApiSrcBigtableSinkMinimal"
	SpecApiSrcBigtableSinkFooRound       = "ApiSrcBigtableSinkFooRound"
	SpecKafkaSrcBigtableSinkPlayer       = "KafkaSrcBigtableSinkPlayer"
	SpecKafkaSrcBigtableSinkMultiSession = "KafkaSrcBigtableSinkMultiSession"
	SpecKafkaSrcBigtableSinkFeatureX     = "KafkaSrcBigtableSinkFeatureX"
	SpecRegSpecPubsub                    = "PubsubSrcFirestoreSinkRegSpec"
	SpecPubsubSrcKafkaSinkFoologs        = "PubsubSrcKafkaSinkFoologs"
)

Variables

This section is empty.

Functions

func GetAllSpecsRaw

func GetAllSpecsRaw(testDirPath string) map[string][]byte

GetAllSpecsRaw may not be needed, although it is useful for test purposes.

func SpecSpec

func SpecSpec() *model.Spec

SpecSpec is a convenience test function that returns the Spec used for handling GEIST specs, without loading the Registry.

func SpecSpecInMem

func SpecSpecInMem() *model.Spec

Types

type MockExecutor

type MockExecutor struct {
	// contains filtered or unexported fields
}

func NewExecutor

func NewExecutor(stream igeist.Stream) *MockExecutor

func (*MockExecutor) ProcessEvent

func (e *MockExecutor) ProcessEvent(ctx context.Context, events []model.Event) model.EventProcessingResult

func (*MockExecutor) Run

func (e *MockExecutor) Run(ctx context.Context, wg *sync.WaitGroup)

func (*MockExecutor) Shutdown

func (e *MockExecutor) Shutdown()

func (*MockExecutor) Spec

func (e *MockExecutor) Spec() igeist.Spec

func (*MockExecutor) Stream

func (e *MockExecutor) Stream() igeist.Stream

func (*MockExecutor) StreamId

func (e *MockExecutor) StreamId() string

type MockExtractor

type MockExtractor struct {
	// contains filtered or unexported fields
}

func NewMockExtractor

func NewMockExtractor(spec model.SourceConfig) *MockExtractor

func (*MockExtractor) Extract

func (m *MockExtractor) Extract(ctx context.Context, query model.ExtractorQuery, result any) (error, bool)

func (*MockExtractor) ExtractFromSink

func (m *MockExtractor) ExtractFromSink(ctx context.Context, query model.ExtractorQuery, result *[]*model.Transformed) (error, bool)

func (*MockExtractor) SendToSource

func (m *MockExtractor) SendToSource(ctx context.Context, eventData any) (string, error)

func (*MockExtractor) StreamExtract

func (m *MockExtractor) StreamExtract(
	ctx context.Context,
	reportEvent model.ProcessEventFunc,
	err *error,
	retryable *bool)

type MockLoader

type MockLoader struct {
}

func NewMockLoader

func NewMockLoader() *MockLoader

func (*MockLoader) Shutdown

func (s *MockLoader) Shutdown()

func (*MockLoader) StreamLoad

func (s *MockLoader) StreamLoad(ctx context.Context, data []*model.Transformed) (string, error, bool)

type MockTransformer

type MockTransformer struct {
	// contains filtered or unexported fields
}

func NewMockTransformer

func NewMockTransformer(spec model.Transform) *MockTransformer

func (*MockTransformer) Transform

func (s *MockTransformer) Transform(
	ctx context.Context,
	event []byte,
	retryable *bool) ([]*model.Transformed, error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(
	spec igeist.Spec,
	extractor igeist.Extractor,
	transformer igeist.Transformer,
	loader igeist.Loader,
	sinkExtractor igeist.Extractor) *Stream

func (*Stream) ExtractFromSink

func (s *Stream) ExtractFromSink(ctx context.Context, query model.ExtractorQuery, result *[]*model.Transformed) (error, bool)

func (*Stream) Extractor

func (s *Stream) Extractor() igeist.Extractor

func (*Stream) Instance

func (s *Stream) Instance() string

func (*Stream) Loader

func (s *Stream) Loader() igeist.Loader

func (*Stream) Publish

func (s *Stream) Publish(ctx context.Context, event []byte) (string, error)

func (*Stream) Spec

func (s *Stream) Spec() igeist.Spec

func (*Stream) Transformer

func (s *Stream) Transformer() igeist.Transformer

type StreamBuilder

type StreamBuilder struct {
	// contains filtered or unexported fields
}

func NewStreamBuilder

func NewStreamBuilder(entityFactory igeist.StreamEntityFactory) *StreamBuilder

func (*StreamBuilder) Build

func (s *StreamBuilder) Build(ctx context.Context, spec igeist.Spec) (igeist.Stream, error)

type StreamEntityFactory

type StreamEntityFactory struct {
	// contains filtered or unexported fields
}

func NewStreamEntityFactory

func NewStreamEntityFactory() *StreamEntityFactory

func (*StreamEntityFactory) CreateExtractor

func (s *StreamEntityFactory) CreateExtractor(ctx context.Context, etlSpec igeist.Spec, instanceId string) (igeist.Extractor, error)

func (*StreamEntityFactory) CreateLoader

func (s *StreamEntityFactory) CreateLoader(ctx context.Context, etlSpec igeist.Spec, instanceId string) (igeist.Loader, error)

func (*StreamEntityFactory) CreateSinkExtractor

func (s *StreamEntityFactory) CreateSinkExtractor(ctx context.Context, etlSpec igeist.Spec, instanceId string) (igeist.Extractor, error)

func (*StreamEntityFactory) CreateTransformer

func (s *StreamEntityFactory) CreateTransformer(ctx context.Context, etlSpec igeist.Spec) (igeist.Transformer, error)

func (*StreamEntityFactory) SetAdminLoader

func (s *StreamEntityFactory) SetAdminLoader(loader igeist.Loader)

type StreamRegistry

type StreamRegistry struct {
	// contains filtered or unexported fields
}

StreamRegistry is an in-memory SpecRegistry implementation for GEIST specs, intended for testing purposes. For real usage, it is replaced by a proper cloud-database-backed registry implementation, such as a Cloud Firestore one, with dynamic updates of specs.

func NewStreamRegistry

func NewStreamRegistry(testDirPath string) *StreamRegistry

func (*StreamRegistry) Delete

func (r *StreamRegistry) Delete(ctx context.Context, id string) error

func (*StreamRegistry) Exists

func (r *StreamRegistry) Exists(id string) bool

func (*StreamRegistry) ExistsSameVersion

func (r *StreamRegistry) ExistsSameVersion(specBytes []byte) (bool, error)

func (*StreamRegistry) Fetch

func (r *StreamRegistry) Fetch(ctx context.Context) error

func (*StreamRegistry) Get

func (r *StreamRegistry) Get(ctx context.Context, id string) (igeist.Spec, error)

func (*StreamRegistry) GetAll

func (r *StreamRegistry) GetAll(ctx context.Context) (map[string]igeist.Spec, error)

func (*StreamRegistry) ProcessEvent

func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []model.Event) model.EventProcessingResult

func (*StreamRegistry) Put

func (r *StreamRegistry) Put(ctx context.Context, id string, spec igeist.Spec) error

func (*StreamRegistry) Run

func (r *StreamRegistry) Run(ctx context.Context, wg *sync.WaitGroup)

func (*StreamRegistry) SetAdminStream

func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)

func (*StreamRegistry) Shutdown

func (r *StreamRegistry) Shutdown()

func (*StreamRegistry) Spec

func (r *StreamRegistry) Spec() igeist.Spec

func (*StreamRegistry) Stream

func (r *StreamRegistry) Stream() igeist.Stream

func (*StreamRegistry) StreamId

func (r *StreamRegistry) StreamId() string

func (*StreamRegistry) Validate

func (r *StreamRegistry) Validate(specBytes []byte) (igeist.Spec, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL