Documentation ¶
Index ¶
- Variables
- func EnrichEvent(event []byte, path string, value any) ([]byte, error)
- type AdminStreamConfig
- type Config
- type Geist
- func (g *Geist) Entities() map[string]map[string]bool
- func (g *Geist) GetStreamSpec(streamId string) (specData []byte, err error)
- func (g *Geist) GetStreamSpecs(ctx context.Context) (specs map[string][]byte, err error)
- func (g *Geist) Publish(ctx context.Context, streamId string, event []byte) (id string, err error)
- func (g *Geist) RegisterStream(ctx context.Context, specData []byte) (id string, err error)
- func (g *Geist) Run(ctx context.Context) (err error)
- func (g *Geist) Shutdown(ctx context.Context) (err error)
- func (g *Geist) ValidateStreamSpec(specData []byte) (specId string, err error)
- type HookConfig
- type OpsConfig
- type SpecRegistryConfig
Constants ¶
This section is empty.
Variables ¶
var (
	ErrConfigNotInitialized   = errors.New("geist.Config need to be created with NewConfig()")
	ErrGeistNotInitialized    = errors.New("geist not initialized")
	ErrSpecAlreadyExists      = errors.New("stream ID already exists with that version - increment version number to upgrade")
	ErrInvalidStreamSpec      = errors.New("stream Specification is not valid")
	ErrInvalidStreamId        = errors.New("invalid Stream ID")
	ErrProtectedStreamId      = errors.New("spec format is valid but but stream ID is protected and cannot be used")
	ErrCodeInvalidSpecRegOp   = errors.New("use RegisterStream() instead to register new streams")
	ErrInternalDataProcessing = errors.New("internal data processing error")
	ErrInvalidEntityId        = errors.New("invalid source/sink ID")
)
Error values returned by Geist API. Many of these errors will also contain additional details about the error. Error matching can still be done with 'if errors.Is(err, ErrInvalidStreamId)' etc. due to error wrapping.
Functions ¶
func EnrichEvent ¶ added in v0.3.1
EnrichEvent is a convenience function that can be used for event enrichment inside a hook function, as specified in geist.Config.Hooks. It wraps the sjson package; see the documentation at https://github.com/tidwall/sjson.
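To illustrate the shape of such an enrichment call without depending on external packages, here is a rough stdlib-only stand-in. The function setTopLevelField is hypothetical and is not the library's implementation: it takes the same kind of arguments as EnrichEvent but only handles top-level keys, whereas the real function accepts sjson path syntax.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// setTopLevelField is a hypothetical stand-in with the same argument shape
// as EnrichEvent (event bytes, key, value). Unlike sjson paths, it only
// supports setting a key at the top level of a JSON object.
func setTopLevelField(event []byte, key string, value any) ([]byte, error) {
	var fields map[string]any
	if err := json.Unmarshal(event, &fields); err != nil {
		return nil, fmt.Errorf("event is not a JSON object: %w", err)
	}
	if fields == nil {
		// A JSON null unmarshals to a nil map; start from an empty object.
		fields = map[string]any{}
	}
	fields[key] = value
	return json.Marshal(fields)
}

func main() {
	event := []byte(`{"user":"alice"}`)
	enriched, err := setTopLevelField(event, "region", "eu")
	if err != nil {
		fmt.Println("enrichment failed:", err)
		return
	}
	fmt.Println(string(enriched))
}
```

Inside a real hook, the call would go to EnrichEvent instead, with the returned bytes replacing the event.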
Types ¶
type AdminStreamConfig ¶
type AdminStreamConfig struct {
StreamSpec []byte
}
AdminStreamConfig specifies how the internal cross-pod admin event propagation should be set up. Providing a custom admin spec is advanced usage and should be done with care. It only needs to be filled in when deviating from the default behavior, which is the following:
- If SpecRegistryConfig.StorageMode is set to "inmemory" (default), the admin stream is disabled (cross-pod sync not needed)
- If SpecRegistryConfig.StorageMode is set to "native" (Firestore) or "custom", the admin stream uses GCP Pub/Sub for event propagation ("pubsub" is set as its stream source type).
For complete customization, set StorageMode to "custom" and provide both a Stream and an Admin Spec.
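The default rules above can be summarized as a small lookup. This is only a sketch of the documented behavior, not Geist's internal code: defaultAdminSource is a hypothetical helper, and treating an empty string as "inmemory" is an assumption based on "inmemory" being the default.

```go
package main

import "fmt"

// defaultAdminSource is a hypothetical helper that restates the documented
// defaults: which admin stream source applies for a given StorageMode.
func defaultAdminSource(storageMode string) (string, error) {
	switch storageMode {
	case "", "inmemory": // assumption: an unset mode means the default
		return "disabled", nil
	case "native", "custom":
		return "pubsub", nil
	default:
		return "", fmt.Errorf("unknown storage mode %q", storageMode)
	}
}

func main() {
	for _, mode := range []string{"inmemory", "native", "custom"} {
		src, err := defaultAdminSource(mode)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%-8s -> admin stream: %s\n", mode, src)
	}
}
```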
type Config ¶
type Config struct {
	Registry    SpecRegistryConfig
	AdminStream AdminStreamConfig
	Hooks       HookConfig
	Ops         OpsConfig
	// contains filtered or unexported fields
}
Config must be created with NewConfig(), filled in as applicable for the intended setup, and passed in the call to geist.New(). All config fields are optional.
func NewConfig ¶ added in v0.3.0
func NewConfig() *Config
NewConfig returns an initialized Config struct, which is required for geist.New(). Use this config to register the applicable Source/Sink extractors/loaders before calling geist.New().
func (*Config) RegisterExtractorType ¶ added in v0.3.0
func (c *Config) RegisterExtractorType(extractorFactory entity.ExtractorFactory) error
RegisterExtractorType prepares the config so that Geist makes this particular Source/Extractor type available for stream specs to use. This can only be done after calling geist.NewConfig() and before creating Geist with geist.New().
func (*Config) RegisterLoaderType ¶ added in v0.3.0
func (c *Config) RegisterLoaderType(loaderFactory entity.LoaderFactory) error
RegisterLoaderType prepares the config so that Geist makes this particular Sink/Loader type available for stream specs to use. This can only be done after calling geist.NewConfig() and before creating Geist with geist.New().
type Geist ¶
type Geist struct {
// contains filtered or unexported fields
}
func New ¶
New creates and configures Geist's internal services and all previously registered streams, based on the provided config, which needs to be initially created with NewConfig().
func (*Geist) Entities ¶ added in v0.3.0
Entities returns the IDs of all registered Extractors/Loaders for each Source/Sink. The keys of the outer map are:
"extractor"
"loader"
Each of those keys holds a map keyed by the ID/name of each source/sink type that has been registered.
Example of output if marshalled to JSON:
{"extractor":{"source1":true,"source2":true},"loader":{"sink1":true,"sink2":true}}
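The JSON above can be reproduced from a map of the same type as the Entities() return value. The sketch below uses placeholder IDs taken from the example output rather than a live Geist instance, and marshalEntities is a helper introduced only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshalEntities is an illustrative helper that renders a map of the
// same type as the Entities() return value as JSON.
func marshalEntities(entities map[string]map[string]bool) (string, error) {
	out, err := json.Marshal(entities)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	// Placeholder data in the documented shape; real IDs depend on which
	// extractor/loader types have been registered.
	entities := map[string]map[string]bool{
		"extractor": {"source1": true, "source2": true},
		"loader":    {"sink1": true, "sink2": true},
	}

	s, err := marshalEntities(entities)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(s)

	// The nested maps also allow a direct membership check.
	if entities["loader"]["sink1"] {
		fmt.Println("sink1 is registered")
	}
}
```

Because encoding/json sorts map keys when marshalling, the output order is stable across runs.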
func (*Geist) GetStreamSpec ¶
GetStreamSpec returns the full stream spec for a specific stream ID.
func (*Geist) GetStreamSpecs ¶
GetStreamSpecs returns all registered stream specs.
func (*Geist) Publish ¶
Publish sends the provided event to the source extractor of the stream identified by streamId, if that extractor type supports this optional functionality. The source types currently known to support this are the internal "geistapi" and the GCP source "pubsub".
The returned ID string (or resource ID) depends on the sink type, but is defined as the key to use for key lookups of the event. It is created based on the Sink configuration in the Stream Spec for sink types supporting it, for example:
- Firestore: ID is the generated Firestore entity name, as specified in the entityNameFromIds section of the sink spec.
- BigTable: ID is the generated row-key, as specified in the rowKey section of the sink spec.
func (*Geist) RegisterStream ¶
RegisterStream validates and persists the stream spec in the chosen registry implementation. If successful, the registered stream is started up immediately, and the generated ID of the stream is returned.
func (*Geist) Run ¶
Run starts up Geist's internal services and all previously registered streams (if Geist is configured to use a persistent registry), as prepared by New(). It blocks until Geist is shut down, either by a call to Shutdown or by cancellation of its parent context. Geist must be up and running before any of the other Geist API functions are used.
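Since Run blocks, a caller would typically start it in its own goroutine and stop it through the context. The sketch below shows that general pattern with a hypothetical stand-in type, service, rather than the real *Geist. The ready channel exists only in the stand-in; how readiness is signalled by Geist itself is not described here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// service is a hypothetical stand-in for *Geist, reduced to the blocking
// behavior of Run. The ready channel is an assumption of this sketch.
type service struct {
	ready chan struct{}
}

// Run blocks until ctx is canceled, mimicking the documented behavior.
func (s *service) Run(ctx context.Context) error {
	close(s.ready) // signal that the stand-in is up
	<-ctx.Done()
	return nil
}

// runAndStop starts Run in a goroutine, waits until the service is up,
// then cancels the parent context and waits for Run to return.
func runAndStop() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	s := &service{ready: make(chan struct{})}
	done := make(chan error, 1)
	go func() { done <- s.Run(ctx) }()

	<-s.ready
	// Other API calls (Publish, RegisterStream, ...) would go here.

	cancel() // canceling the parent context unblocks Run
	select {
	case err := <-done:
		return err
	case <-time.After(time.Second):
		return errors.New("run did not return after cancel")
	}
}

func main() {
	if err := runAndStop(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("run returned after context cancellation")
}
```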
type HookConfig ¶ added in v0.3.0
type HookConfig struct {
PreTransformHookFunc entity.PreTransformHookFunc
}
HookConfig enables a Geist client to inject custom logic into the stream processing, such as enrichment, deduplication, and filtering (if the existing spec transform options are not suitable).
type SpecRegistryConfig ¶
type SpecRegistryConfig struct {
	// StorageMode specifies how Geist should store stream specs to be run.
	// The following values are available:
	//
	//   "inmemory": (default) registered specs are stored in memory only
	//   "native":   specs are persisted in the sink specified in the native stream spec for the
	//               spec registration stream (using Firestore as default sink)
	//   "custom":   the client of Geist needs to provide a Stream Spec for the Spec Registration Stream
	//               in the StreamSpec field (advanced usage)
	StorageMode string

	// StreamSpec specifies the Stream Spec for the internal stream, handling Stream Spec registrations,
	// if StorageMode is set to "custom".
	StreamSpec []byte
}
SpecRegistryConfig only needs to be filled in if persistence of specs is required. Normally only the StorageMode field is needed to set up the intended behaviour, switching from in-memory storage to native persistence. Future updates might add more built-in "native" storage modes.