geist

package module
Published: Jan 31, 2023 License: MIT Imports: 9 Imported by: 0

README

Generic Event Ingestion and Stream Transloading (GEIST)


Geist provides cost-efficient, high-performance capabilities for developing generic (or specific) services that execute an arbitrary number of event streams between various sources and sinks, with a set of built-in single-message transforms. It's an alternative to other more heavy-weight, constrained and/or costly products, although with a slimmer feature scope.

Use cases include consuming events from Kafka or Pubsub and storing them, or chosen parts of them, in BigTable, Firestore, BigQuery, or any other custom sink, with a schema as specified in a dynamically registered stream spec via the Geist API.

It also provides the Geist API as a Source, with which external services can publish events to a registered Geist stream, which then transforms and stores them in a chosen Sink.

Geist can concurrently execute an arbitrary number of different registered stream specs in the same instance/pod, e.g. one stream with Kafka → BigTable and another with Pubsub → BigQuery.

Although single event transform capabilities are provided, Geist is not meant to support complex stream processing, including stream joins and aggregations. That is better handled in products such as Spark, Kafka Streams, Dataflow/Beam, Flink, etc.

Its main purpose is to efficiently load streaming data, with optional transformations and enrichment, into a sink, where the analytical processing is done with other more appropriate products.

The Geist Go package is completely generic, only comprising the core engine and its domain model. Geist Source and Sink connectors are found in separate repos. Custom connectors can also be provided dynamically by the service using Geist, registering them via Geist API for immediate use in stream specs.

Quick Start

Prerequisites: Go 1.18

Install with:

go get github.com/zpiroux/geist

Example of simplified Geist test usage (error handling omitted) with an interactive stream where the sink simply logs info on transformed event data (using the built-in void debug/test sink).

package main

import (
	"context"

	"github.com/zpiroux/geist"
)

func main() {
	ctx := context.Background()
	g, _ := geist.New(ctx, geist.NewConfig())

	go func() {
		streamId, _ := g.RegisterStream(ctx, spec)
		_, _ = g.Publish(ctx, streamId, []byte("Hi there!"))
		g.Shutdown(ctx)
	}()

	g.Run(ctx)
}

var spec = []byte(`
    {
        "namespace": "my",
        "streamIdSuffix": "tiny-stream",
        "description": "Tiny test stream logging event data to console.",
        "version": 1,
        "source": {
            "type": "geistapi"
        },
        "transform": {
            "extractFields": [
                {
                    "fields": [
                        {
                            "id": "rawEvent"
                        }
                    ]
                }
            ]
        },
        "sink": {
            "type": "void",
            "config": {
                "properties": [
                    {
                        "key": "logEventData",
                        "value": "true"
                    }
                ]
            }
        }
    }
`)

Using Connectors

The Quick Start example is a minimal integration. To do something more useful, external Source/Sink connectors need to be registered. As an example, say we want to deploy a stream consuming events from Kafka and persisting them in BigTable. For this we need to register a Kafka Source Extractor and a BigTable Sink Loader.

Install with:

go get github.com/zpiroux/geist-connector-kafka
go get github.com/zpiroux/geist-connector-gcp

Register connectors prior to starting up Geist with (error handling omitted):

import (
	"github.com/zpiroux/geist"
	"github.com/zpiroux/geist-connector-gcp/gbigtable"
	"github.com/zpiroux/geist-connector-kafka/gkafka"
)

...
geistConfig := geist.NewConfig()

kafkaConfig := &gkafka.Config{ /* add config */ }
btConfig := gbigtable.Config{ /* add config */ }

ef := gkafka.NewExtractorFactory(kafkaConfig)
lf, err := gbigtable.NewLoaderFactory(ctx, btConfig)

err = geistConfig.RegisterExtractorType(ef)
err = geistConfig.RegisterLoaderType(lf)

g, err := geist.New(ctx, geistConfig)
...

In this way, the service using Geist can also register its own custom connectors, provided they adhere to the connector interfaces.

For a complete working example, see the Emitter Stream.

  • Validated source connectors
  • Validated sink connectors

Enrichment and custom stream logic

The native transformation entity provides a set of common transformations. For complete flexibility, such as enriching events, deduplicating, or applying complex filtering when the native transform options are insufficient, a client-managed hook function can be set in geist.Config prior to calling geist.New().

The function assigned to geist.Config.Hooks.PreTransformHookFunc will be called for each event retrieved by the Extractor. A pointer to the raw event is provided to the func, which could modify its contents before Geist continues with downstream transformations and Sink processing.

The following example shows a hook func injecting a new field "myNewField" in the event JSON:

c := geist.NewConfig()
c.Hooks.PreTransformHookFunc = MyEnricher
g, err := geist.New(ctx, c)
...
func MyEnricher(ctx context.Context, event *[]byte) entity.HookAction {
    var err error
    *event, err = geist.EnrichEvent(*event, "myNewField", "coolValue")
    if err != nil {
        return entity.HookActionUnretryableError
    }
    return entity.HookActionProceed
}

Continued processing of each event can be controlled via the returned action value:

HookActionProceed          // continue processing of this event
HookActionSkip             // skip processing of this event and take next
HookActionUnretryableError // let Geist handle this event as an unretryable error
HookActionShutdown         // shut down this stream instance
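
These return values also make the hook usable for simple filtering. A minimal sketch, assuming JSON events with a hypothetical eventType field (imports of bytes and the entity package omitted, as above):

func MyFilter(ctx context.Context, event *[]byte) entity.HookAction {
	// Skip (hypothetical) heartbeat events; all others proceed downstream.
	if bytes.Contains(*event, []byte(`"eventType":"heartbeat"`)) {
		return entity.HookActionSkip
	}
	return entity.HookActionProceed
}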

Stream Categories

The quick start example showed one category of streams (interactive) where the Geist client can publish events explicitly.

The other category is when a Geist client sets up an autonomous stream between a source and sink.

Autonomous streams

Source set to kafka or pubsub.

The host service sets up a fully automated stream between a source and a sink with geist.RegisterStream(), e.g. consuming from Kafka and inserting into BigTable, and only manages start and stop of the stream.

Interactive streams

Source set to geistapi.

The host service has its own API or logic on top and explicitly publishes events to the Geist stream with geist.Publish(), for further transformation and insertion into the chosen sink, as specified in the stream spec when registering the stream with geist.RegisterStream(). One example could be a thin REST API service using Geist to publish events to Kafka or any other sink.

Service modes of operation

There are two main types of use cases, or deployment setups, for the hosting service using Geist.

Service with specific use-case with self-contained stream specs

This is the simplest mode and useful for small dedicated services.

Example use case: a service receiving (or producing) a certain type of event that needs to be published to Kafka and/or stored in BigTable (etc.).

In this mode Geist merely works as a generic adaptor for the sinks. The stream specs to be used could be built into the hosting service and registered during startup.

Generic stream platform service with stream spec persistence

This is the mode for which Geist was developed in the first place, and is meant to run as a Kubernetes deployment.

In this mode Geist functions as a generic run-time and stream management service, with an external wrapper API (e.g. a REST API) provided by the hosting service, with which other services can dynamically register and deploy their own streams without any downtime or re-deployment of Geist or its hosting service.

Registered specs are persisted by Geist with an internally defined (but customizable) Stream Spec. When a Geist host pod is started, either from a new deployment or pod scaling, all registered specs are fetched from the native sink storage and booted up. The default native spec is found in regspec.go.

Any cross-pod synchronization, e.g. notification of newly updated stream specs, is done with an internal admin stream. It is managed and customizable in similar fashion as the Reg Spec and found in adminspec.go.

Stream Spec Format

A stream spec has the following required main fields/parts (additional fields are described further below):

{
  "namespace": "<service or feature acting as stream owner>",
  "streamIdSuffix": "<stream subtype>",
  "version": <stream spec version, of int type>,
  "description": "<description of the spec>",
  "source": {
    <source spec>
  },
  "transform": {
    <transform spec>
  },
  "sink": {
    <sink spec>
  }
}

The ID of the new stream returned from geist.RegisterStream() in a successful registration is constructed as:

<"namespace">-<"streamIdSuffix">

For full spec structure with all optional and additional fields, see spec.go.

During spec registration the JSON spec is converted to an entity.Spec struct for internal usage. If needed, this can also be done externally by using entity.NewSpec(specBytes), which ensures schema validation and proper default values.
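
A minimal sketch of such external validation (assuming the entity package resides at github.com/zpiroux/geist/entity and that NewSpec returns the parsed spec together with an error):

import "github.com/zpiroux/geist/entity"
...
// Validate and apply defaults outside of registration (sketch).
spec, err := entity.NewSpec(specData)
if err != nil {
	// specData failed schema validation
}
// spec is now an entity.Spec with default values applied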

Operational Config

For more detailed operational control and tuning of each stream, the optional "ops" config can be used.

The full default config, used if not specified in the stream spec, is the following:

{
  ...
  "ops": {
    "streamsPerPod": 1,
    "microBatch": false,
    "microBatchSize": 500,
    "microBatchBytes": 5000000,
    "microBatchTimeoutMs": 15000,
    "maxEventProcessingRetries": 5,
    "maxStreamRetryBackoffIntervalSec": 300,
    "handlingOfUnretryableEvents": "default",
    "logEventData": false
  },
  ...
}

See the Ops struct in spec.go for full documentation.

For scenarios where the streams need to be deployed to multiple environments with different surrounding contexts, an additional environment-specific config option is available via the opsPerEnv field. With this field, the above Ops config can look different for each named environment.

Say we have two different production environments, prod-x and prod-y; we can then specify different config per environment using the following format (example):

{
  ...
  "opsPerEnv": {
    "prod-x": {
      "streamsPerPod": 8
    },
    "prod-y": {
      "streamsPerPod": 16
    }
  },
  ...
}

The environment string to match with the one provided in the spec is set in geist.Config.Registry.Env when creating Geist with geist.New(ctx, config).

If opsPerEnv is provided and a match is found for the deployed environment type, the specified opsPerEnv config will be set as the main "ops" specification to use in this specific deployment.
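
For the prod-x deployment in the example above, the matching config could then be set roughly as follows (a sketch; only the relevant field shown):

cfg := geist.NewConfig()
cfg.Registry.Env = "prod-x" // matched against the keys in "opsPerEnv"
g, err := geist.New(ctx, cfg)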

Example spec

The simple spec below exemplifies an autonomous stream using Kafka as source and BigTable as sink, inserting raw events in a table with a single column named event, row-key as <fooId>#<barType>, and a TTL of 31 days.

{
    "namespace": "my",
    "streamIdSuffix": "kafkabigtable-stream",
    "description": "Example of a simple stream spec Kafka --> BigTable",
    "version": 1,
    "ops": {
        "streamsPerPod": 2
    },
    "source": {
        "type": "kafka",
        "config": {
            "provider": "confluent",
            "topics": [
                {
                    "env": "all",
                    "names": [
                        "foo.events"
                    ]
                }
            ],
            "properties": [
                {
                    "key": "group.id",
                    "value": "my-kafkabigtable-stream"
                }
            ]
        }
    },
    "transform": {
        "extractFields": [
            {
                "fields": [
                    {
                        "id": "someField",
                        "jsonPath": "data.fooId"
                    },
                    {
                        "id": "someOtherField",
                        "jsonPath": "data.barType"
                    },
                    {
                        "id": "rawEvent"
                    }
                ]
            }
        ]
    },
    "sink": {
        "type": "bigtable",
        "config": {
            "tables": [
                {
                    "name": "foo_events",
                    "rowKey": {
                        "keys": [
                            "someField",
                            "someOtherField"
                        ],
                        "delimiter": "#"
                    },
                    "columnFamilies": [
                        {
                            "name": "d",
                            "garbageCollectionPolicy": {
                                "type": "maxAge",
                                "value": 744
                            },
                            "columnQualifiers": [
                                {
                                    "id": "rawEvent",
                                    "name": "event"
                                }
                            ]
                        }
                    ]
                }
            ]
        }
    }
}

More example specs, with more sink, source and transform types, can be found in test/specs.

Updating or disabling a stream

A running stream can be updated dynamically with a new call to geist.RegisterStream(), provided the spec's mandatory version number field is incremented. The provided modified spec will fully replace the existing one. This will seamlessly cause an internal restart of the stream's executors using the updated spec.

To disable a spec it needs to have the root field "disabled" added and set to true. This will shut down all the stream's executors directly. To enable the stream again, do a new update of the spec (increase the version number) and set "disabled" to false.

Getting information about/from streams

The following data can be retrieved:

  • The full list of specs for all registered streams.
  • The full list of stream IDs together with status on enabled/disabled.
  • The spec for a single stream from a stream ID.
  • All the events stored in a stream. Note that this is only meant for small datasets (supported by Firestore and BigTable connectors).
  • Single event key-value lookup, retrieving the event stored in the sink with a given key/ID, provided it was stored in key/value mode (supported by Firestore and BigTable connectors).

Although functional, the event data retrieval options are currently only used internally and not yet exposed by a Geist wrapper function. The spec retrievals, however, are available directly on the Geist API, as shown below.
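
A usage sketch of the spec retrievals (stream ID taken from the Quick Start example):

// All registered stream specs, keyed by stream ID.
specs, err := g.GetStreamSpecs(ctx)

// The spec for a single stream.
specData, err := g.GetStreamSpec(ctx, "my-tiny-stream")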

Performance Characteristics and Tuning

Geist has several stream config parameters exposed via the Stream Spec, to enable high-performance operation and stream throughput.

The main one governing concurrency is Ops.StreamsPerPod. This defines the number of internal stream executors that will execute the full stream concurrently, for that particular stream.

As an example, say we have Kafka as a source and a topic with 32 partitions. To maximize throughput (and/or minimize latency), the total number of stream executors should be 32 as well (or in practice fewer, since it's usually enough for each stream executor to consume from several partitions). If the service using Geist is meant to run with two pods, Ops.StreamsPerPod should be set to 16. There are no significant disadvantages to having the total number of stream executors exceed the number of partitions, so having pods auto-scaled (creating more than two pods) will not be an issue.

Since each stream executor runs as a light-weight goroutine, thousands of stream executors can easily be handled by a single pod, provided pod resources are dimensioned for the particular type of stream processing logic.

But it's usually a good approach to start with a low number of streams per pod, and increase based on test results.

Further parameters can be found in the stream spec definition (spec.go), with general ones in the Ops struct and Source/Sink specific ones in the respective SourceConfig and SinkConfig structs.

Observability

Logging and Notification Events

Geist sends important and informational log events to a notification channel accessible from geist.NotifyChannel(). The size of the channel buffer can be specified during Geist creation with Config.Ops.NotifyChanSize.

If usage of the notification channel is not needed, Geist can also be configured to perform standard logging of those events in its native format. This can be enabled by setting Config.Ops.Log to true.
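
A minimal sketch of consuming the channel with an external logger (here simply the standard log package; the concrete event type is defined in the entity package):

notifyChan, err := g.NotifyChannel()
if err == nil {
	go func() {
		for event := range notifyChan {
			log.Printf("geist notification: %+v", event) // forward to any logging framework
		}
	}()
}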

Metrics

Real-time event processing metrics can be retrieved with geist.Metrics(). The following metrics are provided per stream ID:

type Metrics struct {

	// Total number of events sent to Executor's ProcessEvent() by the Extractor,
	// regardless of the outcome of downstream processing.
	EventsProcessed int64

	// Total time spent by Executor processing all extracted events
	EventProcessingTimeMicros int64

	// Total number of event batches sent from Extractor to Sink loader via Executor
	Microbatches int64

	// Total amount of event data processed (as sent from Extractor)
	BytesProcessed int64

	// Total number of events successfully processed by the sink.
	EventsStoredInSink int64

	// Total time spent ingesting transformed events in the sink successfully
	SinkProcessingTimeMicros int64

	// Total number of successful calls to the Sink's StreamLoad method
	SinkOperations int64

	// Total amount of data successfully ingested
	BytesIngested int64
}
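
A sketch of exporting a few of these counters, e.g. from a periodic scrape (field names as in the struct above):

for streamId, m := range g.Metrics() {
	fmt.Printf("%s: events=%d storedInSink=%d bytesProcessed=%d\n",
		streamId, m.EventsProcessed, m.EventsStoredInSink, m.BytesProcessed)
}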

Limitations and improvement areas

Although Geist has been run in production with heavy load, no data loss, and zero downtime for roughly two years, there are no guarantees that all combinations of stream spec options will work fully in all cases.

The following types of streams have been run extensively and concurrently with high throughput:

  • Kafka → BigTable
  • Kafka → BigQuery
  • Kafka → Various custom-built connectors

Additionally, Pubsub as source and Firestore as sink have been used extensively but with limited traffic.

Transforms

The current native transform package provides features as required by a set of use cases. In contrast to source/sink connectors, it's not yet possible to add custom transforms. However, any custom transformation logic can still be injected via the PreTransformHook functionality.

Chaining of transforms could be improved.

Although functional, regex transform options have some remaining todos.

Geist is not meant to support complex stream processing, including stream joins and aggregations. That is better handled in products such as Spark, Kafka Streams, Dataflow/Beam, Flink, etc.

Event schema

Only JSON is currently supported.

Spec schema

For historical reasons some fields in the Spec schema are only used for specific source/sink types, even though the customConfig enables any arbitrary config to be used.

Contact

info @ zpiroux . com

License

Geist source code is available under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrConfigNotInitialized   = errors.New("geist.Config need to be created with NewConfig()")
	ErrGeistNotInitialized    = errors.New("geist not initialized")
	ErrSpecAlreadyExists      = errors.New("stream ID already exists with equal or higher version - increment version number to upgrade")
	ErrInvalidStreamSpec      = errors.New("stream Specification is not valid")
	ErrInvalidStreamId        = errors.New("invalid Stream ID")
	ErrProtectedStreamId      = errors.New("spec format is valid but stream ID is protected and cannot be used")
	ErrCodeInvalidSpecRegOp   = errors.New("use RegisterStream() instead to register new streams")
	ErrInternalDataProcessing = errors.New("internal data processing error")
	ErrInvalidEntityId        = errors.New("invalid source/sink ID")
)

Error values returned by Geist API. Many of these errors will also contain additional details about the error. Error matching can still be done with 'if errors.Is(err, ErrInvalidStreamId)' etc. due to error wrapping.

Functions

func EnrichEvent added in v0.3.1

func EnrichEvent(event []byte, path string, value any) ([]byte, error)

EnrichEvent is a convenience function that could be used for event enrichment purposes inside a hook function as specified in geist.Config.Hooks. It's a wrapper on the sjson package. See doc at https://github.com/tidwall/sjson.

Types

type AdminStreamConfig

type AdminStreamConfig struct {
	StreamSpec []byte
}

AdminStreamConfig specifies how the internal cross-pod admin event propagation should be set up. Providing a custom admin spec is advanced usage and should be done with care. It is only required to be filled in if diverting from default behavior, which is the following:

  • If SpecRegistryConfig.StorageMode is set to "inmemory" (default), the admin stream is disabled (cross-pod sync not needed)

  • If SpecRegistryConfig.StorageMode is set to "native" (firestore) or "custom", the admin stream will use GCP Pubsub for event propagation (pubsub set as its stream source type).

For complete customization (e.g. running in AWS or Azure), set StorageMode to "custom" and provide both a Stream and an Admin Spec, with appropriate sink connectors loaded.
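
A sketch of such a customized setup (myRegSpec and myAdminSpec are hypothetical client-provided spec documents):

cfg := geist.NewConfig()
cfg.Registry.StorageMode = "custom"
cfg.Registry.StreamSpec = myRegSpec      // hypothetical custom spec registration stream
cfg.AdminStream.StreamSpec = myAdminSpec // hypothetical custom admin stream
g, err := geist.New(ctx, cfg)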

type Config

type Config struct {
	Registry    SpecRegistryConfig
	AdminStream AdminStreamConfig
	Ops         OpsConfig
	Hooks       HookConfig
	// contains filtered or unexported fields
}

Config needs to be created with NewConfig() and filled in with config as applicable for the intended setup, and provided in the call to geist.New(). All config fields are optional. See individual struct types for documentation.

func NewConfig added in v0.3.0

func NewConfig() *Config

NewConfig returns an initialized Config struct, required for geist.New(). Applicable Source/Sink extractors/loaders should be registered on this config before calling geist.New().

func (*Config) RegisterExtractorType added in v0.3.0

func (c *Config) RegisterExtractorType(extractorFactory entity.ExtractorFactory) error

RegisterExtractorType is used to prepare config for Geist to make this particular Source/Extractor type available for stream specs to use. This can only be done after a geist.NewConfig() and prior to creating Geist with geist.New().

func (*Config) RegisterLoaderType added in v0.3.0

func (c *Config) RegisterLoaderType(loaderFactory entity.LoaderFactory) error

RegisterLoaderType is used to prepare config for Geist to make this particular Sink/Loader type available for stream specs to use. This can only be done after a geist.NewConfig() and prior to creating Geist with geist.New().

type Geist

type Geist struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, config *Config) (g *Geist, err error)

New creates and configures Geist's internal services and all previously registered streams, based on the provided config, which needs to be initially created with NewConfig().

func (*Geist) Entities added in v0.3.0

func (g *Geist) Entities() map[string]map[string]bool

Entities returns IDs of all registered Extractors/Loaders for each Source/Sink. The keys for the first map are:

"extractor"
"loader"

Each of those keys holds the IDs/names of the source/sink types that have been registered.

Example of output if marshalled to JSON:

{"extractor":{"source1":true,"source2":true},"loader":{"sink1":true,"sink2":true}}

func (*Geist) GetStreamSpec

func (g *Geist) GetStreamSpec(ctx context.Context, streamId string) (specData []byte, err error)

GetStreamSpec returns the full stream spec for a specific stream ID

func (*Geist) GetStreamSpecs

func (g *Geist) GetStreamSpecs(ctx context.Context) (specs map[string][]byte, err error)

GetStreamSpecs returns all registered stream specs

func (*Geist) Metrics added in v0.4.0

func (g *Geist) Metrics() map[string]entity.Metrics

Metrics provides real-time metrics per stream ID.

func (*Geist) NotifyChannel added in v0.4.0

func (g *Geist) NotifyChannel() (entity.NotifyChan, error)

NotifyChannel returns the notification channel, from which it's possible to receive notify/log events. By setting geist.Config.Ops.Log to false (default) and using the events from this channel in some external logging framework, it's possible to completely replace Geist's internal logging with an external one. The size of the channel buffer can be adjusted with geist.Config.Ops.NotifyChanSize at Geist creation time.

func (*Geist) Publish

func (g *Geist) Publish(ctx context.Context, streamId string, event []byte) (id string, err error)

Publish sends the provided event to the source extractor of the stream identified by streamId, if that extractor type supports this optional functionality. Currently known source types that support this are the internal "geistapi" and the GCP source "pubsub".

The returned ID string (or resource ID) is dependent on the sink type, but is defined as the key to be used for key lookups of the event. It is created based on the Sink configuration in the Stream Spec for sink types supporting it, for example:

	Firestore: ID is the generated Firestore entity name, as specified in the
	           entityNameFromIds section of the sink spec.
	BigTable:  ID is the generated row-key, as specified in the rowKey section
	           of the sink spec.

func (*Geist) RegisterStream

func (g *Geist) RegisterStream(ctx context.Context, specData []byte) (id string, err error)

RegisterStream validates and persists the stream spec in the chosen registry implementation. If successful, the registered stream is started immediately, and the generated ID of the stream is returned.

func (*Geist) Run

func (g *Geist) Run(ctx context.Context) (err error)

Run starts up Geist's internal services and all previously registered streams (if Geist is configured to use a persistent registry), as prepared by New(). It is a blocking call until Geist is shut down by a call to Shutdown or by cancellation of its parent context. Geist must be up and running prior to usage of the other Geist API functions.

func (*Geist) Shutdown

func (g *Geist) Shutdown(ctx context.Context) (err error)

Shutdown should be called when the app is terminating.

func (*Geist) ValidateStreamSpec

func (g *Geist) ValidateStreamSpec(specData []byte) (specId string, err error)

ValidateStreamSpec returns an error if the provided stream spec is invalid.

type HookConfig added in v0.3.0

type HookConfig struct {
	PreTransformHookFunc entity.PreTransformHookFunc
}

HookConfig enables a Geist client to inject custom logic into the stream processing, such as enrichment, deduplication, and filtering (if the existing spec transform options are not suitable).

type OpsConfig

type OpsConfig struct {

	// The maximum interval used by the stream executors during exponential backoff when
	// retrying operations that failed with errors set as retryable.
	MaxStreamRetryIntervalSec int

	// Size of the notification channel buffer
	NotifyChanSize int

	// If set to true native logging will be used (debug, info, warn, and error logs).
	// If set to false (default) no standard logging will be done, but the same type of
	// information will be provided on the notification channel, accessible with geist.NotifyChannel().
	Log bool

	// The interval used for providing metric updates on number of events processed.
	// This might be deprecated due to log/notify/metric changes.
	EventLogInterval int
}

OpsConfig provides options for observability and resilience.

type SpecRegistryConfig

type SpecRegistryConfig struct {

	// StorageMode specifies how Geist should store stream specs to be run.
	// The following values are available:
	//
	// 	"inmemory": (default) registered specs are stored in memory only
	// 	"native":   specs are persisted in the sink specified in the native stream spec for the
	//              spec registration stream (using Firestore as default sink, for historical reasons)
	// 	"custom":   the client of Geist need to provide a Stream Spec for the Spec Registration
	//	            Stream in the RegSpec field, for example using an S3 sink connector instead
	StorageMode string

	// StreamSpec specifies the Stream Spec for the internal stream, handling Stream Spec registrations,
	// if StorageMode is set to "custom".
	StreamSpec []byte

	// Env specifies which environment string to match against stream specs using the OpsPerEnv
	// part of the spec. If empty only the common Spec.Ops will be regarded.
	Env string
}

SpecRegistryConfig is only required to be filled in if persistence of specs is required. Normally only the StorageMode field is needed to set up the intended behavior, switching from in-memory storage to native persistence. Future updates might add more built-in "native" storage modes.

Directories

Path                  Synopsis
transform             Package transform is the native/default implementation of a transform provider.
internal
internal/pkg
internal/pkg/notify   Package notify is used internally by Geist to send/log operational events.
test
