Documentation ¶
Index ¶
- Constants
- type Config
- type StreamRegistry
- func (r *StreamRegistry) Delete(ctx context.Context, id string) error
- func (r *StreamRegistry) Exists(id string) bool
- func (r *StreamRegistry) ExistsWithSameOrHigherVersion(specBytes []byte) (bool, error)
- func (r *StreamRegistry) Fetch(ctx context.Context) error
- func (r *StreamRegistry) Get(ctx context.Context, id string) (*entity.Spec, error)
- func (r *StreamRegistry) GetAll(ctx context.Context) (map[string]*entity.Spec, error)
- func (r *StreamRegistry) Metrics() entity.Metrics
- func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
- func (r *StreamRegistry) Put(ctx context.Context, id string, spec *entity.Spec) (err error)
- func (r *StreamRegistry) Run(ctx context.Context, wg *sync.WaitGroup)
- func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)
- func (r *StreamRegistry) SetLoader(loader entity.Loader)
- func (r *StreamRegistry) Shutdown(ctx context.Context)
- func (r *StreamRegistry) Stream() igeist.Stream
- func (r *StreamRegistry) StreamId() string
- func (r *StreamRegistry) Validate(specBytes []byte) (*entity.Spec, error)
Constants ¶
const RawEventField = "rawEvent"
Regardless of DB implementation for Registry, it requires the ETL spec to use RawEventField as the key for storing each spec.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Specifies which mode to use (in-memory, custom, etc) StorageMode admin.RegStorageMode // RegSpec is only used when StorageMode is set to admin.RegStorageCustom and contains // the spec registration spec to use. It is currently only needed by clients to Registry // during initialization of StreamRegistry. RegSpec []byte // Specifies which environment string to match against stream specs using the OpsPerEnv // part of the spec. If empty it is disregarded. Env string }
type StreamRegistry ¶
type StreamRegistry struct {
// contains filtered or unexported fields
}
StreamRegistry implements both the Registry and the Executor interfaces so that it can serve both as the bootstrap/CRUD Registry service for the supervisor, and as the continuous spec registration service via GEIST REST API, ingesting specs in an ETL Stream with arbitrary Sinks/Loaders.
func NewStreamRegistry ¶
func NewStreamRegistry(config Config, executor igeist.Executor, notifyChan entity.NotifyChan, logging bool) *StreamRegistry
func (*StreamRegistry) Delete ¶
func (r *StreamRegistry) Delete(ctx context.Context, id string) error
func (*StreamRegistry) Exists ¶
func (r *StreamRegistry) Exists(id string) bool
func (*StreamRegistry) ExistsWithSameOrHigherVersion ¶ added in v0.4.2
func (r *StreamRegistry) ExistsWithSameOrHigherVersion(specBytes []byte) (bool, error)
func (*StreamRegistry) Metrics ¶ added in v0.4.0
func (r *StreamRegistry) Metrics() entity.Metrics
func (*StreamRegistry) ProcessEvent ¶
func (r *StreamRegistry) ProcessEvent(ctx context.Context, events []entity.Event) entity.EventProcessingResult
func (*StreamRegistry) Put ¶
The StreamRegistry implementation of Registry.Put() only caches the spec in-memory, since the actual persistence of specs are done with its Stream Executor implementation inside ProcessEvent(), which in turn relies on the assigned Loader to store the spec. It is currently only used internally from StreamRegistry's ProcessEvent function as part of Supervisor managed updated of new specs, thus no mutex needed on specs map.
func (*StreamRegistry) SetAdminStream ¶
func (r *StreamRegistry) SetAdminStream(stream igeist.Stream)
func (*StreamRegistry) SetLoader ¶
func (r *StreamRegistry) SetLoader(loader entity.Loader)
func (*StreamRegistry) Shutdown ¶
func (r *StreamRegistry) Shutdown(ctx context.Context)
func (*StreamRegistry) Stream ¶
func (r *StreamRegistry) Stream() igeist.Stream
func (*StreamRegistry) StreamId ¶
func (r *StreamRegistry) StreamId() string