Documentation
Index
- Variables
- func EnrichEvent(event []byte, path string, value any) ([]byte, error)
- type AdminStreamConfig
- type Config
- type Geist
- func (g *Geist) Entities() map[string]map[string]bool
- func (g *Geist) GetStreamSpec(streamId string) (specData []byte, err error)
- func (g *Geist) GetStreamSpecs(ctx context.Context) (specs map[string][]byte, err error)
- func (g *Geist) Metrics() map[string]entity.Metrics
- func (g *Geist) NotifyChannel() (entity.NotifyChan, error)
- func (g *Geist) Publish(ctx context.Context, streamId string, event []byte) (id string, err error)
- func (g *Geist) RegisterStream(ctx context.Context, specData []byte) (id string, err error)
- func (g *Geist) Run(ctx context.Context) (err error)
- func (g *Geist) Shutdown(ctx context.Context) (err error)
- func (g *Geist) ValidateStreamSpec(specData []byte) (specId string, err error)
- type HookConfig
- type OpsConfig
- type SpecRegistryConfig
Constants
This section is empty.
Variables
var (
	ErrConfigNotInitialized   = errors.New("geist.Config need to be created with NewConfig()")
	ErrGeistNotInitialized    = errors.New("geist not initialized")
	ErrSpecAlreadyExists      = errors.New("stream ID already exists with equal or higher version - increment version number to upgrade")
	ErrInvalidStreamSpec      = errors.New("stream Specification is not valid")
	ErrInvalidStreamId        = errors.New("invalid Stream ID")
	ErrProtectedStreamId      = errors.New("spec format is valid but but stream ID is protected and cannot be used")
	ErrCodeInvalidSpecRegOp   = errors.New("use RegisterStream() instead to register new streams")
	ErrInternalDataProcessing = errors.New("internal data processing error")
	ErrInvalidEntityId        = errors.New("invalid source/sink ID")
)
Error values returned by the Geist API. Many of these errors are wrapped with additional details about the failure, so match them with errors.Is rather than ==, e.g. 'if errors.Is(err, ErrInvalidStreamId)'.
Functions
func EnrichEvent added in v0.3.1
func EnrichEvent(event []byte, path string, value any) ([]byte, error)
EnrichEvent is a convenience function for event enrichment, e.g. inside a hook function as specified in geist.Config.Hooks. It is a wrapper around the sjson package; see the documentation at https://github.com/tidwall/sjson for details, including the path syntax.
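To illustrate what enrichment means here, the sketch below sets a value at a dot-separated path in a JSON event using only the standard library. It is a hypothetical approximation named enrichEvent, not geist.EnrichEvent itself: it handles plain object paths only (no sjson array indices, wildcards, or escapes) and, because it round-trips through a Go map, it does not preserve the original field order. In real hook code, call geist.EnrichEvent directly.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// enrichEvent is a hypothetical, stdlib-only approximation of geist.EnrichEvent
// for simple dot-separated object paths such as "meta.source". Unlike sjson,
// it does not support array indices, wildcards, or escaped dots, and it does
// not preserve the original field order (json.Marshal sorts map keys).
func enrichEvent(event []byte, path string, value any) ([]byte, error) {
	var doc map[string]any
	if err := json.Unmarshal(event, &doc); err != nil {
		return nil, err
	}
	if doc == nil { // the event was the JSON literal null
		doc = map[string]any{}
	}
	keys := strings.Split(path, ".")
	node := doc
	// Walk the intermediate objects, creating any that are missing.
	for _, k := range keys[:len(keys)-1] {
		next, exists := node[k]
		if !exists {
			child := map[string]any{}
			node[k] = child
			node = child
			continue
		}
		child, ok := next.(map[string]any)
		if !ok {
			return nil, fmt.Errorf("path segment %q is not a JSON object", k)
		}
		node = child
	}
	node[keys[len(keys)-1]] = value
	return json.Marshal(doc)
}

func main() {
	// For example, a hook could tag each event with the component that handled it.
	out, err := enrichEvent([]byte(`{"name":"x"}`), "meta.source", "geist")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"meta":{"source":"geist"},"name":"x"}
}
```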
Types
type AdminStreamConfig
type AdminStreamConfig struct {
StreamSpec []byte
}
AdminStreamConfig specifies how the internal cross-pod admin event propagation should be set up. Providing a custom admin spec is advanced usage and should be done with care. It only needs to be filled in when deviating from the default behavior, which is the following:
- If SpecRegistryConfig.StorageMode is set to "inmemory" (default), the admin stream is disabled (cross-pod sync is not needed).
- If SpecRegistryConfig.StorageMode is set to "native" (Firestore) or "custom", the admin stream uses GCP Pub/Sub for event propagation (with pubsub as its stream source type).
For complete customization (e.g. running in AWS or Azure), set StorageMode to "custom" and provide both a registry Stream Spec (SpecRegistryConfig.StreamSpec) and an admin Stream Spec (AdminStreamConfig.StreamSpec), with the appropriate sink connectors loaded.
type Config
type Config struct {
	Registry    SpecRegistryConfig
	AdminStream AdminStreamConfig
	Ops         OpsConfig
	Hooks       HookConfig
	// contains filtered or unexported fields
}
Config must be created with NewConfig(), filled in as applicable for the intended setup, and passed to geist.New(). All config fields are optional; see the individual struct types for documentation.
func NewConfig added in v0.3.0
func NewConfig() *Config
NewConfig returns an initialized Config struct, as required by geist.New(). The Source/Sink extractor and loader types needed by the intended streams should be registered on this config, with RegisterExtractorType and RegisterLoaderType, before calling geist.New().
func (*Config) RegisterExtractorType added in v0.3.0
func (c *Config) RegisterExtractorType(extractorFactory entity.ExtractorFactory) error
RegisterExtractorType prepares the config so that this particular Source/Extractor type is available for stream specs to use. It can only be called after geist.NewConfig() and before Geist is created with geist.New().
func (*Config) RegisterLoaderType added in v0.3.0
func (c *Config) RegisterLoaderType(loaderFactory entity.LoaderFactory) error
RegisterLoaderType prepares the config so that this particular Sink/Loader type is available for stream specs to use. It can only be called after geist.NewConfig() and before Geist is created with geist.New().
type Geist
type Geist struct {
// contains filtered or unexported fields
}
func New
New creates and configures Geist's internal services and all previously registered streams, based on the provided config, which needs to be initially created with NewConfig().
func (*Geist) Entities added in v0.3.0
func (g *Geist) Entities() map[string]map[string]bool
Entities returns the IDs of all registered Extractors (for Sources) and Loaders (for Sinks). The keys of the outer map are "extractor" and "loader"; each holds the set of IDs/names of the source or sink types that have been registered.
Example output, marshalled to JSON:
{"extractor":{"source1":true,"source2":true},"loader":{"sink1":true,"sink2":true}}
func (*Geist) GetStreamSpec
func (g *Geist) GetStreamSpec(streamId string) (specData []byte, err error)
GetStreamSpec returns the full stream spec for a specific stream ID.
func (*Geist) GetStreamSpecs
func (g *Geist) GetStreamSpecs(ctx context.Context) (specs map[string][]byte, err error)
GetStreamSpecs returns all registered stream specs.
func (*Geist) NotifyChannel added in v0.4.0
func (g *Geist) NotifyChannel() (entity.NotifyChan, error)
NotifyChannel returns the notification channel, from which notify/log events can be received. By setting geist.Config.Ops.Log to false (default) and forwarding the events from this channel to an external logging framework, Geist's internal logging can be completely replaced with an external one. The size of the channel buffer can be set with geist.Config.Ops.NotifyChanSize at Geist creation time.
func (*Geist) Publish
func (g *Geist) Publish(ctx context.Context, streamId string, event []byte) (id string, err error)
Publish sends the provided event to the source extractor of the stream identified by streamId, if that extractor type supports this optional functionality. Source types currently known to support it are the internal "geistapi" and the GCP source "pubsub".
The returned ID string (or resource ID) depends on the sink type, and is defined as the key to be used for key lookups of the event. For sink types supporting it, the ID is created based on the Sink configuration in the Stream Spec, for example:
- Firestore: the ID is the generated Firestore entity name, as specified in the entityNameFromIds section of the sink spec.
- BigTable: the ID is the generated row-key, as specified in the rowKey section of the sink spec.
func (*Geist) RegisterStream
func (g *Geist) RegisterStream(ctx context.Context, specData []byte) (id string, err error)
RegisterStream validates and persists the stream spec in the chosen registry implementation. If successful, the registered stream is started immediately and the generated ID of the stream is returned.
func (*Geist) Run
func (g *Geist) Run(ctx context.Context) (err error)
Run starts Geist's internal services and all previously registered streams (if Geist is configured to use a persistent registry), as prepared by New(). It blocks until Geist is shut down, either by a call to Shutdown or by cancellation of its parent context. Geist must be up and running before any of the other Geist API functions are used.
type HookConfig added in v0.3.0
type HookConfig struct {
PreTransformHookFunc entity.PreTransformHookFunc
}
HookConfig enables a Geist client to inject custom logic into the stream processing, such as enrichment, deduplication, and filtering (if the existing spec transform options are not suitable).
type OpsConfig
type OpsConfig struct {
	// The maximum interval used by the stream executors during exponential backoff when
	// retrying operations that failed with errors set as retryable.
	MaxStreamRetryIntervalSec int

	// Size of the notification channel buffer.
	NotifyChanSize int

	// If set to true, native logging will be used (debug, info, warn, and error logs).
	// If set to false (default), no standard logging will be done, but the same type of
	// information will be provided on the notification channel, accessible with
	// (*Geist).NotifyChannel().
	Log bool

	// The interval used for providing metric updates on number of events processed.
	// This might be deprecated due to log/notify/metric changes.
	EventLogInterval int
}
OpsConfig provides options for observability and resilience.
type SpecRegistryConfig
type SpecRegistryConfig struct {
	// StorageMode specifies how Geist should store stream specs to be run.
	// The following values are available:
	//
	//	"inmemory": (default) registered specs are stored in memory only
	//	"native":   specs are persisted in the sink specified in the native stream spec for the
	//	            spec registration stream (using Firestore as default sink, for historical reasons)
	//	"custom":   the client of Geist needs to provide a Stream Spec for the Spec Registration
	//	            Stream in the StreamSpec field, for example using an S3 sink connector instead
	StorageMode string

	// StreamSpec specifies the Stream Spec for the internal stream handling Stream Spec
	// registrations, if StorageMode is set to "custom".
	StreamSpec []byte
}
SpecRegistryConfig only needs to be filled in if stream specs must be persisted. Normally only the StorageMode field needs to be set, to switch from in-memory storage to native persistence. Future updates might add more built-in "native" storage modes.
Directories
Path | Synopsis |
---|---|
transform | Package transform is the native/default implementation of a transform provider. |
internal | |
pkg | |
notify | Package notify is used internally by Geist to send/log operational events. |
test | |