Documentation ¶
Index ¶
- Constants
- type Config
- type StreamRegistry
- func (r *StreamRegistry) Delete(ctx context.Context, id string) error
- func (r *StreamRegistry) Exists(id string) bool
- func (r *StreamRegistry) ExistsWithSameOrHigherVersion(specBytes []byte) (bool, error)
- func (r *StreamRegistry) Fetch(ctx context.Context) error
- func (r *StreamRegistry) Get(ctx context.Context, id string) (igeist.Spec, error)
- func (r *StreamRegistry) GetAll(ctx context.Context) (map[string]igeist.Spec, error)
- func (r *StreamRegistry) Metrics() entity.Metrics
- func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
- func (r *StreamRegistry) Put(ctx context.Context, id string, spec igeist.Spec) error
- func (r *StreamRegistry) Run(ctx context.Context, wg *sync.WaitGroup)
- func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)
- func (r *StreamRegistry) SetLoader(loader entity.Loader)
- func (r *StreamRegistry) Shutdown()
- func (r *StreamRegistry) Stream() igeist.Stream
- func (r *StreamRegistry) StreamId() string
- func (r *StreamRegistry) Validate(specBytes []byte) (igeist.Spec, error)
Constants ¶
const RawEventField = "rawEvent"
Regardless of DB implementation for Registry, it requires the ETL spec to use RawEventField as the key for storing each spec.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { StorageMode admin.RegStorageMode RegSpec []byte // currently only needed outside during initialization of StreamRegistry }
type StreamRegistry ¶
type StreamRegistry struct {
// contains filtered or unexported fields
}
StreamRegistry implements both the Registry and the Executor interfaces so that it can serve both as the bootstrap/CRUD Registry service for the supervisor, and as the continuous spec registration service via GEIST REST API, ingesting specs in an ETL Stream with arbitrary Sinks/Loaders.
func NewStreamRegistry ¶
func NewStreamRegistry(config Config, executor igeist.Executor, notifyChan entity.NotifyChan, logging bool) *StreamRegistry
func (*StreamRegistry) Delete ¶
func (r *StreamRegistry) Delete(ctx context.Context, id string) error
func (*StreamRegistry) Exists ¶
func (r *StreamRegistry) Exists(id string) bool
func (*StreamRegistry) ExistsWithSameOrHigherVersion ¶ added in v0.4.2
func (r *StreamRegistry) ExistsWithSameOrHigherVersion(specBytes []byte) (bool, error)
func (*StreamRegistry) Metrics ¶ added in v0.4.0
func (r *StreamRegistry) Metrics() entity.Metrics
func (*StreamRegistry) ProcessEvent ¶
func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
func (*StreamRegistry) Put ¶
The StreamRegistry implementation of Registry.Put() only caches the spec in-memory, since the actual persistence of specs are done with its Stream ETL Executor implementation inside ProcessEvent(), which in turn relies on the assigned Loader to store the spec. It is currently only used internally from StreamRegistry's ProcessEvent function as part of Supervisor managed updated of new specs, thus no mutex needed on specs map.
func (*StreamRegistry) SetAdminStream ¶
func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)
func (*StreamRegistry) SetLoader ¶
func (r *StreamRegistry) SetLoader(loader entity.Loader)
func (*StreamRegistry) Shutdown ¶
func (r *StreamRegistry) Shutdown()
func (*StreamRegistry) Stream ¶
func (r *StreamRegistry) Stream() igeist.Stream
func (*StreamRegistry) StreamId ¶
func (r *StreamRegistry) StreamId() string