model

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: May 1, 2022 License: MIT Imports: 9 Imported by: 0

Documentation


Constants

const (
	EventNameKey    = "eventName"
	EventRawDataKey = "rawEvent"
)
const (
	DefaultStreamsPerPod                    = 1
	DefaultMicroBatchSize                   = 500
	DefaultMicroBatchBytes                  = 5000000
	DefaultMicroBatchTimeoutMs              = 15000
	DefaultMaxEventProcessingRetries        = 5
	DefaultMaxStreamRetryBackoffIntervalSec = 300
)

General Ops defaults

const (
	HoueDefault = "default"
	HoueDiscard = "discard"
	HoueDlq     = "dlq"
	HoueFail    = "fail"
)

Available options for Ops.HandlingOfUnretryableEvents

const (
	TransformedKeyKey   = "key"
	TransformedValueKey = "value"
)
const (
	EventStreamRegistryModified = "stream_registry_modified"
)

Admin event and operation types.

const GeistIngestionTime = "@GeistIngestionTime"

Data processing and ingestion options

const (
	OperationStreamRegistration = "streamRegistration"
)

Variables

var AdminEventSpec = []byte(`
{
   "namespace": "geist",
   "streamIdSuffix": "adminevents",
   "description": "A built-in GEIST-internal admin notification stream. For example, when a new or updated spec is written to Registry db, Registry sends an event to pubsub, which are listened to by all GEIST deployments. This in turn makes Registries, not yet synchronized, load the new specs from db.",
   "version": 1,
   "source": {
      "type": "pubsub",
      "config": {
         "topics": [
            {
               "env": "all",
               "names": [
                  "geist-admin-events"
               ]
            }
         ],
         "subscription": {
            "type": "unique"
         }
      }
   },
   "transform": {
      "extractFields": [
         {
            "fields": [
               {
                  "id": "eventName",
                  "jsonPath": "name"
               },
               {
                  "id": "rawEvent",
                  "type": "string"
               }
            ]
         }
      ]
   },
   "sink": {
      "type": "admin"
   }
}`)
var AdminEventSpecInMem = []byte(`
{
   "namespace": "geist",
   "streamIdSuffix": "adminevents-inmem",
   "description": "When running Registry in-mem, there's no point in having cross-pod admin events, so this spec is a bare-bones naked one'",
   "version": 1,
   "source": {
      "type": "geistapi"
   },
   "transform": {
      "extractFields": [
         {
            "fields": [
               {
                  "id": "eventName",
                  "jsonPath": "name"
               },
               {
                  "id": "rawEvent",
                  "type": "string"
               }
            ]
         }
      ]
   },
   "sink": {
      "type": "admin"
   }
}`)
var ErrEntityShutdownRequested = errors.New("entity shutdown requested")

An entity can request to be shut down by returning this error. It is then up to the Executor to decide whether the entire stream should be shut down or whether some other action should be taken.

var SpecRegistrationSpec = []byte(`
{
   "namespace": "geist",
   "streamIdSuffix": "specs",
   "description": "A stream ingestion of GEIST Specs, with source set as GEIST REST API, no Transformation, and into a Firestore Sink (GEIST Registry).",
   "version": 1,
   "source": {
      "type": "geistapi"
   },
   "transform": {
      "extractFields": [
         {
            "fields": [
               {
                  "id": "namespace",
                  "jsonPath": "namespace"
               },
               {
                  "id": "idSuffix",
                  "jsonPath": "streamIdSuffix"
               },
               {
                  "id": "description",
                  "jsonPath": "description"
               },
               {
                  "id": "version",
                  "jsonPath": "version",
                  "type": "integer"
               },
               {
                  "id": "disabled",
                  "jsonPath": "disabled",
                  "type": "bool"
               },
               {
                  "id": "rawEvent",
                  "type": "string"
               }
            ]
         }
      ]
   },
   "sink": {
      "type": "firestore",
      "config": {
         "kinds": [
            {
               "name": "EtlSpec",
               "entityNameFromIds": {
                  "ids": [
                     "namespace",
                     "idSuffix"
                  ],
                  "delimiter": "-"
               },
               "properties": [
                  {
                     "id": "version",
                     "name": "version",
                     "index": true
                  },
                  {
                     "id": "description",
                     "name": "description",
                     "index": false
                  },
                  {
                     "id": "disabled",
                     "name": "disabled",
                     "index": true
                  },
                  {
                     "id": "rawEvent",
                     "name": "specData",
                     "index": false
                  }
               ]
            }
         ]
      }
   }
}`)

SpecRegistrationSpec is the internal, built-in GEIST spec required for handling GEIST specs themselves. It defines how specs are received from the chosen Source, and where and how they are stored. For bootstrap reasons, this spec cannot store itself in that repository. A number of spec registration flows can be supported here, and it is up to the GEIST app service init to choose the appropriate one. For example, to have GEIST spec deployments done by sending specs to pubsub instead, switch spec.source.type to "pubsub" and add pubsub topic config.
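
The source switch described above could look roughly like the following sketch. It is not part of the package. The topics/subscription layout is copied from AdminEventSpec, while the choice of a "shared" subscription and the topic and subscription names are assumptions made for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// withPubsubSource returns a copy of specJSON whose "source" object is
// replaced by a pubsub source. The config layout mirrors AdminEventSpec.
// Using a "shared" subscription here is an assumption, not documented behavior.
func withPubsubSource(specJSON []byte, topic, subscription string) ([]byte, error) {
	var spec map[string]any
	if err := json.Unmarshal(specJSON, &spec); err != nil {
		return nil, err
	}
	spec["source"] = map[string]any{
		"type": "pubsub",
		"config": map[string]any{
			"topics": []any{
				map[string]any{"env": "all", "names": []any{topic}},
			},
			"subscription": map[string]any{"type": "shared", "name": subscription},
		},
	}
	return json.MarshalIndent(spec, "", "   ")
}

// sourceType reads back source.type, returning "" if it is missing or malformed.
func sourceType(specJSON []byte) string {
	var spec struct {
		Source struct {
			Type string `json:"type"`
		} `json:"source"`
	}
	if err := json.Unmarshal(specJSON, &spec); err != nil {
		return ""
	}
	return spec.Source.Type
}

func main() {
	in := []byte(`{"namespace":"geist","streamIdSuffix":"specs","source":{"type":"geistapi"}}`)
	// Hypothetical topic and subscription names.
	out, err := withPubsubSource(in, "geist-spec-reg", "geist-spec-reg-sub")
	if err != nil {
		panic(err)
	}
	fmt.Println(sourceType(in), "->", sourceType(out))
}
```

The transform and sink parts of the spec are left untouched, which matches the statement that only the source needs to change.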

var SpecRegistrationSpecInMem = []byte(`
{
   "namespace": "geist",
   "streamIdSuffix": "specs",
   "description": "A stream ingestion of GEIST Specs, with source set as GEIST API, no Transformation, and no sink (specs stored in-memory).",
   "version": 1,
   "source": {
      "type": "geistapi"
   },
   "transform": {
      "extractFields": [
         {
            "fields": [
               {
                  "id": "rawEvent",
                  "type": "string"
               }
            ]
         }
      ]
   },
   "sink": {
      "type": "void",
      "config": {
         "properties": [
            {
               "key": "mode",
               "value": "inMemRegistrySink"
            }
         ]
      }
   }
}`)

SpecRegistrationSpecInMem is the in-mem alternative to SpecRegistrationSpec (see that one for more detailed documentation).

Functions

This section is empty.

Types

type AddFieldsToJson

type AddFieldsToJson struct {
	JsonId      string
	FieldsToAdd []string
}

type AdminEvent

type AdminEvent struct {
	Name         string           `json:"name"`
	DateOccurred time.Time        `json:"dateOccurred"`
	Version      string           `json:"version"`
	EventId      string           `json:"eventId"`
	Location     Location         `json:"location"`
	Data         []AdminEventData `json:"data"`
}

func NewAdminEvent

func NewAdminEvent(name string, operation string, streamId string) AdminEvent

type AdminEventData

type AdminEventData struct {
	Operation string `json:"operation,omitempty"`
	StreamId  string `json:"streamId,omitempty"`
}

type ArrayItems

type ArrayItems struct {
	JsonPathToArray  string           `json:"jsonPathToArray"`
	IdFromItemFields IdFromItemFields `json:"idFromItemFields"`
}

type ChanEvent

type ChanEvent struct {
	Event         any // TODO: change to []byte
	ResultChannel chan ResultChanEvent
}

type Column

type Column struct {
	// Name of the column as specified at spec registration time.
	// One of Name or NameFromId needs to be present in the column spec.
	Name string `json:"name"`

	// If NameFromId is non-nil, columns will be generated dynamically based on transformation output.
	// The name of the column will be set to the value in the Transformed map, with the key as found in NameFromId.
	// Note that the field fetched from the event to be used as the column name needs to be of string type.
	NameFromId *NameFromId `json:"nameFromId,omitempty"`

	// Mode uses the definitions as set by BigQuery with "NULLABLE", "REQUIRED" or "REPEATED"
	Mode string `json:"mode"`

	// Type uses the BigQuery Standard SQL types.
	// The type here needs to match the one used in the Transform extract field spec.
	// For date/time/timestamp types the type used in the Transform extract field spec needs to be set to
	// "isoTimestamp" or "unixTimestamp".
	Type string `json:"type"`

	Description string   `json:"description"`
	Fields      []Column `json:"fields"` // For nested columns

	// ValueFromId is not part of the schema definition per se, but specifies which value from the incoming
	// transformed data should be inserted here.
	// A special value can be set to have a column with GEIST ingestion time, which could be used together
	// with TimePartitioning config, as an alternative to the also available default BQ insert partitioning.
	// To enable this, the field should be set to "@GeistIngestionTime" (the GeistIngestionTime constant),
	// with column type set to "TIMESTAMP" and mode set to "NULLABLE".
	ValueFromId string `json:"valueFromId"`
}
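
As an illustration of how ValueFromId and the ingestion-time marker might be resolved when building a row, here is a minimal sketch. The Column type is a trimmed local copy, and buildRow is a hypothetical helper, not the BigQuery sink's actual logic.

```go
package main

import (
	"fmt"
	"time"
)

// GeistIngestionTime mirrors the package constant of the same name.
const GeistIngestionTime = "@GeistIngestionTime"

// Column is a trimmed local copy of the documented Column struct.
type Column struct {
	Name        string `json:"name"`
	Mode        string `json:"mode"`
	Type        string `json:"type"`
	ValueFromId string `json:"valueFromId"`
}

// buildRow (hypothetical) maps column names to values. Columns marked with
// GeistIngestionTime get the current time; all others are looked up in the
// transformed map by their ValueFromId key and skipped if absent.
func buildRow(cols []Column, transformed map[string]any) map[string]any {
	ingested := time.Now().UTC()
	row := make(map[string]any, len(cols))
	for _, c := range cols {
		if c.ValueFromId == GeistIngestionTime {
			row[c.Name] = ingested
			continue
		}
		if v, ok := transformed[c.ValueFromId]; ok {
			row[c.Name] = v
		}
	}
	return row
}

func main() {
	cols := []Column{
		{Name: "userId", Mode: "REQUIRED", Type: "STRING", ValueFromId: "userId"},
		{Name: "ingestionTime", Mode: "NULLABLE", Type: "TIMESTAMP", ValueFromId: GeistIngestionTime},
	}
	fmt.Println(buildRow(cols, map[string]any{"userId": "u-123"}))
}
```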

type ColumnFamily

type ColumnFamily struct {
	Name                    string                   `json:"name"`
	GarbageCollectionPolicy *GarbageCollectionPolicy `json:"garbageCollectionPolicy"`
	ColumnQualifiers        []ColumnQualifier        `json:"columnQualifiers"`
}

type ColumnQualifier

type ColumnQualifier struct {
	Id         string      `json:"id"`
	Name       string      `json:"name,omitempty"`
	NameFromId *NameFromId `json:"nameFromId,omitempty"`
}

The Id field can be used directly in the Transformed map to fetch the value to be inserted. The Name field is the actual CQ name to be used in the table. Either Name or NameFromId must be present, not both.

type DatasetCreation

type DatasetCreation struct {
	Description string `json:"description"`

	// Geo location of dataset.
	// Valid values are:
	// EU
	// europe
	// US
	// plus all regional ones as described here: https://cloud.google.com/bigquery/docs/locations
	// If omitted or empty the default location will be set to EU.
	Location string `json:"location"`
}

DatasetCreation config contains dataset creation details. It is currently only used by BigQuery sinks.

type EntityProperty

type EntityProperty struct {
	Name string `json:"name"`

	// Id is the key/field ID in the Transformed output map, which contains the actual value
	// for this property. The value type is the same as the output from the Transform.
	Id string `json:"id"`

	// For most properties this should be set to true, for improved query performance, but for big event
	// fields that might exceed 1500 bytes, this should be set to false, since that is a built-in
	// Firestore limit.
	Index bool `json:"index"`
}

type EntityType

type EntityType string

Available Stream ETL Entity Types (sources, sinks or both)

const (
	EntityInvalid   EntityType = "invalid"
	EntityVoid      EntityType = "void"
	EntityAdmin     EntityType = "admin"
	EntityGeistApi  EntityType = "geistapi"
	EntityKafka     EntityType = "kafka"
	EntityPubsub    EntityType = "pubsub"
	EntityFirestore EntityType = "firestore"
	EntityBigTable  EntityType = "bigtable"
	EntityBigQuery  EntityType = "bigquery"
)

type Environment

type Environment string

Some Stream ETL Entities need different configurations depending on environment. This cannot be set in the generic GEIST build config, since ETL entities are configured in externally provided ETL Stream Specs. The environment concept therefore needs to be known to both the entity and the stream spec.

const (
	EnvironmentAll   Environment = "all"
	EnvironmentDev   Environment = "dev"
	EnvironmentStage Environment = "stage"
	EnvironmentProd  Environment = "prod"
)

type Event

type Event struct {
	Data []byte
	Ts   time.Time
	Key  []byte
}

func (Event) String

func (e Event) String() string

type EventChannel

type EventChannel chan ChanEvent

type EventProcessingResult

type EventProcessingResult struct {
	Status     ExecutorStatus
	ResourceId string
	Error      error
	Retryable  bool
}

type ExcludeEventsWith

type ExcludeEventsWith struct {
	Key    string   `json:"key"`
	Values []string `json:"values"`
}

ExcludeEventsWith specifies if certain events should be skipped directly, without further processing. If the event field specified by Key matches any of the values in the Values array, the event will be excluded. The Key string must use JSON path syntax according to github.com/tidwall/gjson (see below). The Values field is currently limited to string values.
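
The matching rule can be sketched as follows. To stay within the standard library, this simplified version only resolves top-level keys with encoding/json instead of full gjson path syntax, and shouldExclude is a hypothetical helper rather than the package's own function.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExcludeEventsWith is a local copy of the documented struct.
type ExcludeEventsWith struct {
	Key    string   `json:"key"`
	Values []string `json:"values"`
}

// shouldExclude reports whether the event's field named by filter.Key is a
// string equal to any of filter.Values. Only top-level keys are supported here.
func shouldExclude(event []byte, filter ExcludeEventsWith) bool {
	var fields map[string]any
	if err := json.Unmarshal(event, &fields); err != nil {
		return false // not a JSON object, so nothing to match on
	}
	got, ok := fields[filter.Key].(string)
	if !ok {
		return false // field missing or not a string
	}
	for _, v := range filter.Values {
		if got == v {
			return true
		}
	}
	return false
}

func main() {
	filter := ExcludeEventsWith{Key: "eventType", Values: []string{"heartbeat", "test"}}
	fmt.Println(shouldExclude([]byte(`{"eventType":"heartbeat"}`), filter))
	fmt.Println(shouldExclude([]byte(`{"eventType":"purchase"}`), filter))
}
```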

type ExecutorStatus

type ExecutorStatus int
const (
	ExecutorStatusInvalid ExecutorStatus = iota
	ExecutorStatusSuccessful
	ExecutorStatusError
	ExecutorStatusRetriesExhausted
	ExecutorStatusShutdown
)

type ExtractFields

type ExtractFields struct {
	// ForEventsWith is used to filter which incoming events the fields should be extracted from.
	// Currently only AND type filtering is supported when supplying multiple key-value pairs.
	// If ForEventsWith is empty or omitted, fields will be taken from all events.
	ForEventsWith []ForEventsWith `json:"forEventsWith,omitempty"`

	// The Fields array contains the definitions of which fields to extract from the events matching the filter.
	Fields []Field `json:"fields,omitempty"`
}

The ExtractFields transformation type creates root level ID fields, with values retrieved by a JSON path expression from the input event.

type ExtractItemsFromArray

type ExtractItemsFromArray struct {
	Id            string          `json:"id"`
	ForEventsWith []ForEventsWith `json:"forEventsWith,omitempty"`
	Items         ArrayItems      `json:"items"`
}

ExtractItemsFromArray transformation returns all items in an arbitrary array inside the event JSON, with the ID/key of each item set according to the required IdFromItemFields spec. If the resulting ID/key of an item is an empty string, the item will be omitted from the output map. The items will be stored inside a map in the transformed output map. Its key is specified by the "Id" field.

type ExtractorQuery

type ExtractorQuery struct {
	Type         QueryType
	Key          string
	CompositeKey []KeyValueFilter
}

type Field

type Field struct {
	Id string `json:"id"`

	// JsonPath defines which field in the JSON should be extracted. It uses github.com/tidwall/gjson
	// syntax, such as "myCoolField" if we want to extract that field from json { "myCoolField": "isHere" }
	//
	// The full raw JSON event is also regarded as a 'field' and to extract that the JsonPath string should
	// be empty or omitted in the spec.
	JsonPath string `json:"jsonPath"`

	// - For normal fields, Type can be "string", "integer", "number", "boolean" or "float".
	// If omitted in the spec, string will be used.
	//
	// - For raw event fields the default type is []byte, unless Type is explicitly set to "string".
	// For performance critical streams, type should be omitted (avoiding conversions), especially when
	// having a stream with BigTable sink, which stores the data as byte anyway.
	//
	// - If a field is an iso timestamp string (e.g. "2019-11-30T14:57:23.389Z") the type
	// "isoTimestamp" can be used, to have a Go time.Time object created as the value for this field key.
	//
	// - If a field is a unix timestamp (number or str) (e.g. 1571831226950 or "1571831226950") the type
	// "unixTimestamp" can be used to have Go time.Time object created as the value for this field.
	//
	// - If a field is a User Agent string (e.g. "Mozilla%2F5.0%20(Macintosh%3B%20Intel%2...") the type
	// "userAgent" can be used to have parsed JSON output as string, with separate fields for each part of UA.
	Type string `json:"type,omitempty"`
}
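
The two timestamp types can be illustrated with the standard library. This is a sketch only: it assumes that "isoTimestamp" values are RFC 3339 strings and that "unixTimestamp" values are in milliseconds (as the example value 1571831226950 suggests). The function names are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parseIsoTimestamp converts an ISO timestamp string such as
// "2019-11-30T14:57:23.389Z" into a time.Time (RFC 3339 assumed).
func parseIsoTimestamp(s string) (time.Time, error) {
	return time.Parse(time.RFC3339, s)
}

// parseUnixTimestamp converts a unix timestamp given as a number or a quoted
// string (e.g. 1571831226950 or "1571831226950") into a time.Time.
// Millisecond resolution is an assumption based on the documented example.
func parseUnixTimestamp(s string) (time.Time, error) {
	ms, err := strconv.ParseInt(strings.Trim(s, `"`), 10, 64)
	if err != nil {
		return time.Time{}, err
	}
	return time.UnixMilli(ms).UTC(), nil
}

func main() {
	iso, err := parseIsoTimestamp("2019-11-30T14:57:23.389Z")
	fmt.Println(iso, err)
	unix, err := parseUnixTimestamp(`"1571831226950"`)
	fmt.Println(unix, err)
}
```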

type ForEventsWith

type ForEventsWith struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

The Key string must use JSON path syntax according to github.com/tidwall/gjson (see below). The Value field is currently limited to string values.

type GarbageCollectionPolicy

type GarbageCollectionPolicy struct {
	Type  string `json:"type"`
	Value int    `json:"value"`
}

TODO: Add support for Intersection and Union policies

The following types are supported:

- MaxVersions: where Value takes an integer of number of old versions to keep (-1)
- MaxAge: where Value takes an integer of number of hours before deleting the data.

type IdFromItemFields

type IdFromItemFields struct {
	Delimiter string   `json:"delimiter"`
	Fields    []string `json:"fields"`
}

type KeyValueFilter

type KeyValueFilter struct {
	Key   string
	Value string
}

type Kind

type Kind struct {
	// If Namespace here is present, it will override the global one.
	// If both are missing, the Kind will use native 'default'
	Namespace string `json:"namespace,omitempty"`
	Name      string `json:"name"`

	// If set, will be used as the actual Entity Name
	EntityName string `json:"entityName,omitempty"`

	// If set, will be used to create the Entity Name from the "id" values in the Transload output map.
	// The value is currently restricted to be of type string.
	EntityNameFromIds struct {
		Ids       []string `json:"ids,omitempty"`
		Delimiter string   `json:"delimiter,omitempty"`
	} `json:"entityNameFromIds,omitempty"`

	Properties []EntityProperty `json:"properties,omitempty"`
}

The Kind struct is used for Firestore sinks (in datastore mode). Currently, one of EntityName or EntityNameFromIds needs to be present in spec. TODO: Add creation of UUID if EntityName/Ref not present
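
A sketch of how an entity name could be derived from EntityNameFromIds is shown below. The join-with-delimiter rule is inferred from the SpecRegistrationSpec sink config (ids "namespace" and "idSuffix", delimiter "-"). entityName is a hypothetical helper, not the sink's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// entityName joins the transformed values found under each id with the given
// delimiter. Values must be strings, as stated in the Kind documentation.
func entityName(ids []string, delimiter string, transformed map[string]any) (string, error) {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		s, ok := transformed[id].(string)
		if !ok {
			return "", fmt.Errorf("id %q is missing or not a string", id)
		}
		parts = append(parts, s)
	}
	return strings.Join(parts, delimiter), nil
}

func main() {
	transformed := map[string]any{"namespace": "my", "idSuffix": "stream", "version": 1}
	name, err := entityName([]string{"namespace", "idSuffix"}, "-", transformed)
	fmt.Println(name, err)
}
```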

type Location

type Location struct {
	Service string `json:"service"`
}

type Message

type Message struct {
	// PayloadFromId is the key/field ID in the Transformed output map, which contains the actual message payload
	PayloadFromId string `json:"payloadFromId,omitempty"`
}

Message is used for sinks like PubSub and Kafka, specifying how the message should be published

type NameFromId

type NameFromId struct {
	Prefix       string `json:"prefix"`
	SuffixFromId string `json:"suffixFromId"`

	// Preset contains a list of Column/CQ names that will be added to the table directly during table creation.
	// This is not supported (not needed) by the BigTable loader, only by the BigQuery loader.
	Preset []string `json:"preset,omitempty"`
}

NameFromId creates a Column/CQ name from ID outputs in the transloaded event map.
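
One plausible reading of Prefix and SuffixFromId is that the generated name is the prefix followed by the string value stored under SuffixFromId in the transformed map. The sketch below encodes that assumption. dynamicName is hypothetical and the exact concatenation rule is not confirmed by this page.

```go
package main

import "fmt"

// NameFromId is a local copy of the documented struct.
type NameFromId struct {
	Prefix       string   `json:"prefix"`
	SuffixFromId string   `json:"suffixFromId"`
	Preset       []string `json:"preset,omitempty"`
}

// dynamicName builds Prefix + transformed[SuffixFromId], requiring the
// looked-up value to be a string (assumed concatenation rule).
func dynamicName(n NameFromId, transformed map[string]any) (string, error) {
	suffix, ok := transformed[n.SuffixFromId].(string)
	if !ok {
		return "", fmt.Errorf("value for %q is missing or not a string", n.SuffixFromId)
	}
	return n.Prefix + suffix, nil
}

func main() {
	n := NameFromId{Prefix: "metric_", SuffixFromId: "metricName"}
	name, err := dynamicName(n, map[string]any{"metricName": "latency"})
	fmt.Println(name, err)
}
```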

type Ops

type Ops struct {
	// StreamsPerPod specifies how many Executors should execute the stream concurrently,
	// each in its own goroutine.
	// This is especially important when using Kafka extractors. For max concurrency and highest throughput
	// it should be set equal to the number of topic partitions divided by expected number of pods.
	// There is negligible overhead in having more goroutines than partitions.
	// If omitted it is set to DefaultStreamsPerPod (1).
	StreamsPerPod int `json:"streamsPerPod"`

	// MicroBatch specifies if events should be processed in batches, which improves throughput.
	// If set to 'true' the involved stream entities try their best to process events in batches according
	// to each ETL entity's capability for micro-batch processing.
	// If omitted or set to false, the stream will process a single event at a time.
	// Note: Currently, only streams with source set to 'kafka' support micro batching. And it's only when
	// having sink set to 'bigquery' that this will have a beneficial effect due to BQ API capabilities for this.
	MicroBatch bool `json:"microBatch"`

	// MicroBatchSize is the maximum number of events that should be included in the batch.
	// If omitted it is set to DefaultMicroBatchSize
	MicroBatchSize int `json:"microBatchSize"`

	// MicroBatchBytes specifies the size threshold, in bytes, that when reached closes the batch regardless of
	// the number of events in it, and forwards it downstream. The final size of the batch will be this
	// threshold + size of next event.
	// If omitted it is set to DefaultMicroBatchBytes
	MicroBatchBytes int `json:"microBatchBytes"`

	// MicroBatchTimoutMs is the maximum time, in milliseconds, to wait for the batch to fill up if the max size has not been reached.
	// If the sink is set to Kafka, this value will override the pollTimeout value.
	// If omitted it is set to DefaultMicroBatchTimeoutMs
	MicroBatchTimoutMs int `json:"microBatchTimeoutMs"`

	// MaxEventProcessingRetries specifies how many times an extracted event from the source should be processed
	// again (transform/load), if deemed retryable, before the Executor restarts the stream on a longer back-off
	// interval (MaxStreamRetryBackoffIntervalSec). Retryable errors will be retried indefinitely for max self-healing.
	// If omitted it is set to DefaultMaxEventProcessingRetries.
	MaxEventProcessingRetries int `json:"maxEventProcessingRetries"`

	// MaxStreamRetryBackoffIntervalSec specifies the max time, in seconds, between stream restarts after
	// exponential backoff retries of retryable event processing failures.
	// If omitted it is set to DefaultMaxStreamRetryBackoffIntervalSec
	MaxStreamRetryBackoffIntervalSec int `json:"maxStreamRetryBackoffIntervalSec"`

	// HandlingOfUnretryableEvents specifies what to do with events that can't be properly transformed or loaded
	// to the sink, e.g. corrupt or otherwise non-compliant events vs the stream spec.
	// Available options are:
	//
	//		"default" - Default behaviour depending on Extractor type. For Kafka this means "discard" and for
	//					Pubsub it means Nack (continue retrying later but process other events as well).
	//					If this field is omitted it will take this value.
	//
	//		"discard" - Discard the event, log it with Warn, and continue processing other events.
	//
	//		"dlq"     - Move the event from the source topic to a DLQ topic named "geist.dlq.<streamId>".
	//
	//		"fail"    - The stream will be terminated with an error message.
	//
	// Note that not all sink types support all available options. See documentation for each sink type for details.
	//
	HandlingOfUnretryableEvents string `json:"handlingOfUnretryableEvents"`

	// LogEventData is useful for enabling granular event level debugging dynamically for specific streams
	// without having to redeploy GEIST. To troubleshoot a specific stream a new version of the stream spec
	// can be uploaded at run-time (e.g. with LSD) with this field set to true.
	LogEventData bool `json:"logEventData"`
}
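
The relationship between the Ops fields and the Default* constants can be sketched as below. The Ops type is a trimmed local copy, and treating zero or empty values as "omitted" is an assumption. The real SetRequiredDefaults may behave differently. streamsPerPod rounds up, relying on the note that extra goroutines beyond the partition count carry negligible overhead.

```go
package main

import "fmt"

// Local copies of the documented defaults and handling options.
const (
	DefaultStreamsPerPod                    = 1
	DefaultMicroBatchSize                   = 500
	DefaultMicroBatchBytes                  = 5000000
	DefaultMicroBatchTimeoutMs              = 15000
	DefaultMaxEventProcessingRetries        = 5
	DefaultMaxStreamRetryBackoffIntervalSec = 300

	HoueDefault = "default"
	HoueDiscard = "discard"
	HoueDlq     = "dlq"
	HoueFail    = "fail"
)

// Ops is a trimmed local copy of the documented struct.
type Ops struct {
	StreamsPerPod                    int
	MicroBatchSize                   int
	MicroBatchBytes                  int
	MicroBatchTimoutMs               int
	MaxEventProcessingRetries        int
	MaxStreamRetryBackoffIntervalSec int
	HandlingOfUnretryableEvents      string
}

// orDefault returns def when v is not a positive number.
func orDefault(v, def int) int {
	if v <= 0 {
		return def
	}
	return v
}

// withDefaults (hypothetical) fills in omitted fields with the defaults.
func withDefaults(o Ops) Ops {
	o.StreamsPerPod = orDefault(o.StreamsPerPod, DefaultStreamsPerPod)
	o.MicroBatchSize = orDefault(o.MicroBatchSize, DefaultMicroBatchSize)
	o.MicroBatchBytes = orDefault(o.MicroBatchBytes, DefaultMicroBatchBytes)
	o.MicroBatchTimoutMs = orDefault(o.MicroBatchTimoutMs, DefaultMicroBatchTimeoutMs)
	o.MaxEventProcessingRetries = orDefault(o.MaxEventProcessingRetries, DefaultMaxEventProcessingRetries)
	o.MaxStreamRetryBackoffIntervalSec = orDefault(o.MaxStreamRetryBackoffIntervalSec, DefaultMaxStreamRetryBackoffIntervalSec)
	if o.HandlingOfUnretryableEvents == "" {
		o.HandlingOfUnretryableEvents = HoueDefault
	}
	return o
}

// validHandling reports whether mode is one of the documented options.
func validHandling(mode string) bool {
	switch mode {
	case HoueDefault, HoueDiscard, HoueDlq, HoueFail:
		return true
	}
	return false
}

// streamsPerPod suggests partitions divided by pods, rounded up.
func streamsPerPod(partitions, pods int) int {
	if partitions <= 0 || pods <= 0 {
		return DefaultStreamsPerPod
	}
	return (partitions + pods - 1) / pods
}

func main() {
	fmt.Printf("%+v\n", withDefaults(Ops{MicroBatchSize: 100}))
	fmt.Println(streamsPerPod(12, 5), validHandling("dlq"))
}
```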

type ProcessEventFunc

type ProcessEventFunc func(context.Context, []Event) EventProcessingResult
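
To show how a ProcessEventFunc, EventProcessingResult.Retryable and a retry limit such as Ops.MaxEventProcessingRetries could fit together, here is a minimal sketch with local copies of the types. The retry loop is an illustration under assumptions (no backoff, one initial attempt plus maxRetries retries), not the Executor's actual algorithm. failNTimes and run are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type ExecutorStatus int

const (
	ExecutorStatusInvalid ExecutorStatus = iota
	ExecutorStatusSuccessful
	ExecutorStatusError
	ExecutorStatusRetriesExhausted
	ExecutorStatusShutdown
)

type Event struct {
	Data []byte
	Ts   time.Time
	Key  []byte
}

type EventProcessingResult struct {
	Status     ExecutorStatus
	ResourceId string
	Error      error
	Retryable  bool
}

type ProcessEventFunc func(context.Context, []Event) EventProcessingResult

// processWithRetries calls process until it succeeds, returns a non-retryable
// result, the context is cancelled, or maxRetries retries have been used up.
func processWithRetries(ctx context.Context, process ProcessEventFunc, events []Event, maxRetries int) EventProcessingResult {
	var res EventProcessingResult
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if ctx.Err() != nil {
			return EventProcessingResult{Status: ExecutorStatusShutdown, Error: ctx.Err()}
		}
		res = process(ctx, events)
		if res.Status == ExecutorStatusSuccessful || !res.Retryable {
			return res
		}
	}
	res.Status = ExecutorStatusRetriesExhausted
	return res
}

// failNTimes returns a processor that reports a retryable error n times and
// then succeeds, using the first event's key as the resource ID.
func failNTimes(n int) ProcessEventFunc {
	calls := 0
	return func(_ context.Context, events []Event) EventProcessingResult {
		calls++
		if calls <= n {
			return EventProcessingResult{Status: ExecutorStatusError, Error: errors.New("sink unavailable"), Retryable: true}
		}
		return EventProcessingResult{Status: ExecutorStatusSuccessful, ResourceId: string(events[0].Key)}
	}
}

// run wires the pieces together for one event.
func run(failures, maxRetries int) EventProcessingResult {
	events := []Event{{Data: []byte(`{"a":1}`), Ts: time.Now(), Key: []byte("k1")}}
	return processWithRetries(context.Background(), failNTimes(failures), events, maxRetries)
}

func main() {
	fmt.Println(run(2, 5).Status == ExecutorStatusSuccessful)
	fmt.Println(run(10, 5).Status == ExecutorStatusRetriesExhausted)
}
```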

type Property

type Property struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type QueryType

type QueryType int
const (
	Unknown QueryType = iota
	KeyValue
	CompositeKeyValue
	All
)

type RegStorageMode

type RegStorageMode string
const (
	RegStorageUndefined RegStorageMode = ""
	RegStorageNative    RegStorageMode = "native"
	RegStorageInMemory  RegStorageMode = "inmemory"
	RegStorageCustom    RegStorageMode = "custom"
)

type Regexp

type Regexp struct {
	// The regular expression, in RE2 syntax.
	Expression string `json:"expression,omitempty"`

	// If used in conjunction with fieldExtraction, this will be the field to apply regexp on.
	Field string `json:"field,omitempty"`

	// If extracted field should be kept in result or omitted. Default is false.
	KeepField bool `json:"keepField,omitempty"`

	// Time conversion of date field. Field specified must be extracted before.
	TimeConversion *TimeConv `json:"timeConversion,omitempty"`
}

func (*Regexp) CollectGroups

func (r *Regexp) CollectGroups(exp string) []string

TODO: Shorten, but not important now since it has no performance impact.

type ResultChanEvent

type ResultChanEvent struct {
	Id      string // Id of published event (created resource), according to rules in ETL stream spec
	Success bool   // Result status that will only be true if set explicitly, avoiding default value issues with Err
	Error   error  // Contains error info in case of Success == false
}
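
ChanEvent, EventChannel and ResultChanEvent suggest a request/response pattern over channels: the sender attaches a result channel to each event and waits for the outcome. The sketch below shows that pattern with local copies of the types. publish, consume and the returned ID are hypothetical and only illustrate the data flow.

```go
package main

import (
	"errors"
	"fmt"
)

type ResultChanEvent struct {
	Id      string
	Success bool
	Error   error
}

type ChanEvent struct {
	Event         any
	ResultChannel chan ResultChanEvent
}

type EventChannel chan ChanEvent

// publish sends one event and blocks until the consumer reports the result.
func publish(ch EventChannel, payload []byte) ResultChanEvent {
	result := make(chan ResultChanEvent, 1)
	ch <- ChanEvent{Event: payload, ResultChannel: result}
	return <-result
}

// consume handles events until the channel is closed. Success is set
// explicitly, matching the note on the Success field.
func consume(ch EventChannel) {
	for ev := range ch {
		data, ok := ev.Event.([]byte)
		if !ok || len(data) == 0 {
			ev.ResultChannel <- ResultChanEvent{Error: errors.New("empty or non-byte event")}
			continue
		}
		ev.ResultChannel <- ResultChanEvent{Id: "hypothetical-resource-id", Success: true}
	}
}

func main() {
	ch := make(EventChannel)
	go consume(ch)
	fmt.Println(publish(ch, []byte(`{"name":"x"}`)))
	fmt.Println(publish(ch, nil))
	close(ch)
}
```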

type ResultChannel

type ResultChannel <-chan ResultChanEvent

type RowItem

type RowItem struct {
	Column    string    `json:"column"`
	Timestamp time.Time `json:"timestamp"`
	Value     any       `json:"value"`
}

A RowItem, or an array of row items, can be used as the map value in a Transformed output from, for example, the Extractor.ExtractFromSink() function, e.g. providing data from a BigTable row.

type RowKey

type RowKey struct {
	Predefined string   `json:"predefined,omitempty"`
	Keys       []string `json:"keys,omitempty"`
	Delimiter  string   `json:"delimiter,omitempty"`

	// Only required when using the Predefined rowkey option "keysInMap". This id should map to the transformed
	// output map item specified in ExtractItemsFromArray.Id
	MapId string `json:"mapId,omitempty"`
}

RowKey specifies how the row-key should be generated for BigTable sinks. If one of the Predefined options is set, that will be used. Currently available Predefined options are:

"timestampIso"
"invertedTimestamp"
"uuid"
"keysInMap"

If Predefined is not set, the Keys array should be used to specify which extracted fields from the event should be used.

TODO: Add padding config

type Sink

type Sink struct {
	// Type specifies the type of sink into which data should be loaded.
	// Important stream constraints are noted below for each sink type where needed.
	//
	//		"bigquery" - Each transformed event (to be inserted as a row) should be well below 5MB to avoid
	//		             the BigQuery HTTP request size limit of 10MB for streaming inserts.
	//		"kafka"    - The max size of published events is set to 2MB by default, but topics can set this
	//		             higher, up to a max of 8MB.
	Type EntityType `json:"type"`

	Config *SinkConfig `json:"config,omitempty"`
}

Sink spec

type SinkConfig

type SinkConfig struct {
	Provider string      `json:"provider,omitempty"` // Available options (only used for Kafka sink): "native" and "confluent"
	Topic    []SinkTopic `json:"topic,omitempty"`
	Message  *Message    `json:"message,omitempty"`
	Tables   []Table     `json:"tables,omitempty"`
	Kinds    []Kind      `json:"kinds,omitempty"` // TODO: Probably remove array and keep single object

	// Synchronous is used by the Kafka sink/loader to specify whether each event is guaranteed to be persisted to
	// the broker (Synchronous: true), giving lower throughput (since a batch option is not yet provided), or whether
	// delivery reports are verified asynchronously (Synchronous: false), giving much higher throughput, but
	// possibly leading to message loss if GEIST crashes.
	Synchronous *bool `json:"synchronous,omitempty"`

	// DiscardInvalidData specifies if invalid data should be prevented from being stored in the sink and instead
	// logged and discarded.
	// It increases CPU load somewhat but can be useful to enable in case data from an unreliable source is
	// being continuously retried and the stream's HandlingOfUnretryableEvents mode is not granular enough.
	// One example is when having the MicroBatch mode enabled and we want to just discard individual invalid
	// events, instead of retrying or DLQ:ing the whole micro batch.
	// It is currently only regarded when using the BigQuery sink.
	DiscardInvalidData bool `json:"discardInvalidData,omitempty"`

	// Direct low-level entity properties like Kafka producer props, without impact on GEIST implementation.
	Properties []Property `json:"properties,omitempty"`
}

type SinkTopic

type SinkTopic struct {
	Env       Environment         `json:"env,omitempty"`
	TopicSpec *TopicSpecification `json:"topicSpec,omitempty"`
}

type Source

type Source struct {
	Type   EntityType   `json:"type"`
	Config SourceConfig `json:"config"`
}

Source spec

type SourceConfig

type SourceConfig struct {
	Provider     string        `json:"provider,omitempty"` // Available options (only used for Kafka Source): "native" and "confluent"
	Topics       []Topics      `json:"topics,omitempty"`
	Subscription *Subscription `json:"subscription,omitempty"`

	// PollTimeoutMs is a Kafka consumer specific property, specifying after how long time to return from the Poll()
	// call, if no messages are available for consumption. If this is omitted the value will be set to GEIST config
	// default (app.kafka.pollTimeoutMs). Normally this is not needed to be provided in the stream spec, nor changed
	// in the config. It has no impact on throughput. A higher value will lower the cpu load on idle streams.
	PollTimeoutMs *int `json:"pollTimeoutMs,omitempty"`

	// MaxOutstandingMessages is a PubSub consumer specific property, specifying max number of fetched but not yet
	// acknowledged messages in pubsub consumer. If this is omitted the value will be set to GEIST
	// config default (app.pubsub.maxOutstandingMessages).
	// For time consuming transform/sink streams decrease this value while increasing ops.streamsPerPod
	MaxOutstandingMessages *int `json:"maxOutstandingMessages,omitempty"`

	// MaxOutstandingBytes is a PubSub consumer specific property, specifying max size of fetched but not yet
	// acknowledged messages.
	MaxOutstandingBytes *int `json:"maxOutstandingBytes,omitempty"`

	// Synchronous is a PubSub consumer specific property that can be used to tune certain types of streams (e.g.
	// spiky input flow of messages with very heavy transforms or slow sinks), where setting this to true could
	// reduce the number of expired messages. Default is false.
	Synchronous *bool `json:"synchronous,omitempty"`

	// NumGoroutines is a PubSub consumer specific property used for increasing rate of incoming messages in case
	// downstream ETL is not cpu starved or blocked on sink ops, while Extractor cannot keep up with consuming
	// incoming messages. Depending on type of Sink/Loader a better/alternative approach is to increase ops.streamsPerPod.
	// If omitted it is set to 1.
	NumGoroutines *int `json:"numGoroutines,omitempty"`

	// Direct low-level entity properties like Kafka consumer props, without impact on GEIST implementation.
	Properties []Property `json:"properties,omitempty"`
}

type Spec

type Spec struct {
	Namespace      string    `json:"namespace"`
	StreamIdSuffix string    `json:"streamIdSuffix"`
	Description    string    `json:"description"`
	Version        int       `json:"version"`
	Disabled       bool      `json:"disabled"`
	Ops            Ops       `json:"ops"`
	Source         Source    `json:"source"`
	Transform      Transform `json:"transform"`
	Sink           Sink      `json:"sink"`
}

Spec implements the GEIST ETL Stream Spec interface and specifies how each ETL stream should be executed from Source to Transform to Sink. Specs are registered and updated through an ETL stream of its own, as specified by the built-in SpecRegistrationSpec. The Namespace + StreamIdSuffix combination must be unique (forming a GEIST ETL Stream ID). To succeed with an upgrade of an existing spec the version number needs to be incremented.
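
The uniqueness and versioning rules can be sketched as follows. The Spec type is a trimmed local copy. Joining Namespace and StreamIdSuffix with "-" is an assumption (it matches the delimiter used for the registry entity name in SpecRegistrationSpec), and checkUpgrade is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Spec is a trimmed local copy of the documented struct.
type Spec struct {
	Namespace      string `json:"namespace"`
	StreamIdSuffix string `json:"streamIdSuffix"`
	Version        int    `json:"version"`
}

// streamId builds the stream ID from Namespace and StreamIdSuffix.
// The "-" separator is an assumption.
func streamId(s Spec) string {
	return s.Namespace + "-" + s.StreamIdSuffix
}

// checkUpgrade returns an error unless incoming targets the same stream as
// existing and carries a higher version number.
func checkUpgrade(existing, incoming Spec) error {
	if streamId(existing) != streamId(incoming) {
		return fmt.Errorf("stream ID mismatch: %s vs %s", streamId(existing), streamId(incoming))
	}
	if incoming.Version <= existing.Version {
		return fmt.Errorf("version %d must be higher than existing version %d", incoming.Version, existing.Version)
	}
	return nil
}

func main() {
	current := Spec{Namespace: "my", StreamIdSuffix: "stream", Version: 1}
	next := Spec{Namespace: "my", StreamIdSuffix: "stream", Version: 2}
	fmt.Println(streamId(current), checkUpgrade(current, next), checkUpgrade(next, current))
}
```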

func NewEmptySpec

func NewEmptySpec() *Spec

func NewSpec

func NewSpec(specData []byte) (*Spec, error)

NewSpec creates a new Spec from JSON and validates both against JSON schema and the transformation logic on the created spec.

func (*Spec) Id

func (s *Spec) Id() string

func (*Spec) IsDisabled

func (s *Spec) IsDisabled() bool

func (*Spec) JSON

func (s *Spec) JSON() []byte

func (*Spec) SetRequiredDefaults

func (s *Spec) SetRequiredDefaults()

func (*Spec) Validate

func (s *Spec) Validate() error

Stream spec JSON schema validation will be handled by NewSpec() using validateRawJson() against geist spec json schema. This func contains more complex validation such as Regexp validation.

type Subscription

type Subscription struct {
	// Type can be:
	//
	// 		"shared" - meaning multiple consumers share this subscription in a competing consumer pattern.
	//				   Only one of the subscribers will receive each event.
	//				   If this is set, the name of the subscription needs to be present in the "Name" field.
	//
	//		"unique" - meaning each transloading stream instance will have its own unique subscription.
	//				   All instances will thus get all events from the topic.
	//				   If this is set, a unique subscription name will be created and the Name field is
	//				   ignored. This one is used internally by each pod's Supervisor to receive notifications
	//                 about registry updates, from other Supervisors' registry instances.
	Type string `json:"type,omitempty"`

	Name string `json:"name,omitempty"`
}
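
The two subscription types documented in the struct comment can be checked with a sketch like the one below. checkSubscription is a hypothetical helper, not part of this package, and rejecting any other Type value (including an empty one) is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Subscription is a local copy of the documented struct.
type Subscription struct {
	Type string `json:"type,omitempty"`
	Name string `json:"name,omitempty"`
}

// checkSubscription applies the documented rules: "shared" needs a Name,
// "unique" ignores it. Rejecting other values is an assumption of this sketch.
func checkSubscription(s Subscription) error {
	switch s.Type {
	case "shared":
		if s.Name == "" {
			return errors.New(`subscription type "shared" requires a name`)
		}
		return nil
	case "unique":
		// A unique subscription name is generated, so Name is not inspected.
		return nil
	default:
		return fmt.Errorf("unsupported subscription type %q", s.Type)
	}
}

func main() {
	fmt.Println(checkSubscription(Subscription{Type: "shared"}))
	fmt.Println(checkSubscription(Subscription{Type: "unique", Name: "ignored"}))
}
```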

type Table

type Table struct {
	Name string `json:"name"`

	// Dataset is optional depending on sink type. Currently only used by BigQuery.
	Dataset string `json:"dataset"`

	// DatasetCreation is only required if the dataset is meant to be created by this stream
	// *and* if other values than the default ones are required.
	// Default values are location: EU and empty description.
	DatasetCreation *DatasetCreation `json:"datasetCreation,omitempty"`

	// Table spec for SQL type sinks such as BigQuery
	Columns       []Column       `json:"columns"`
	TableCreation *TableCreation `json:"tableCreation,omitempty"`

	// InsertIdFromId defines which value in the Transformed output map will contain the insert ID,
	// as extracted from one of the input event fields.
	// The value referred to in the transloaded output map needs to be of string type.
	// This is used for BigQuery best-effort deduplication.
	InsertIdFromId string `json:"insertIdFromId"`

	// Table specs for BigTable are built up from RowKey and ColumnFamilies
	RowKey         RowKey         `json:"rowKey"`
	ColumnFamilies []ColumnFamily `json:"columnFamilies"`

	// Only input transformations satisfying the whitelist key/value filter will be
	// processed by the sink (mostly needed in multi-table Sink specs)
	Whitelist *Whitelist `json:"whitelist,omitempty"`
}

The Table struct is used for BigTable, BigQuery and other table based sinks.

type TableCreation

type TableCreation struct {
	Description string `json:"description"`

	// If non-nil, the table is partitioned by time. Only one of
	// time partitioning or range partitioning can be specified.
	TimePartitioning *TimePartitioning `json:"timePartitioning,omitempty"`

	// If set to true, queries that reference this table must specify a
	// partition filter (e.g. a WHERE clause) that can be used to eliminate
	// partitions. Used to prevent unintentional full data scans on large
	// partitioned tables.
	RequirePartitionFilter bool `json:"requirePartitionFilter"`

	// Clustering specifies the data clustering configuration for the table.
	Clustering []string `json:"clustering,omitempty"`
}

TableCreation config contains table creation details. It is currently only used by BigQuery sinks and most of the fields/comments in the struct are copied directly from BQ client, with modifications to fit with the GEIST spec format.

type TimeConv

type TimeConv struct {
	// Field where the data is located and should be converted.
	Field string `json:"field,omitempty"`

	// Input format of date to be converted. Mandatory.
	InputFormat string `json:"inputFormat,omitempty"`

	// Output format of date, if omitted, ISO-8601 is used.
	OutputFormat string `json:"outputFormat,omitempty"`
}

type TimePartitioning

type TimePartitioning struct {
	// Defines the partition interval type. Supported values are "DAY" or "HOUR".
	Type string `json:"type"`

	// The number of hours to keep the storage for a partition.
	// If the duration is empty (0), the data in the partitions do not expire.
	ExpirationHours int `json:"expirationHours"`

	// If empty, the table is partitioned by pseudo column '_PARTITIONTIME'; if set, the
	// table is partitioned by this field. The field must be a top-level TIMESTAMP or
	// DATE field. Its mode must be NULLABLE or REQUIRED.
	Field string `json:"field"`
}

TimePartitioning describes the time-based date partitioning on a table. It is currently only used by BigQuery sinks and most of the fields/docs in the struct are copied directly from BQ client, with modifications to fit with the GEIST spec format. For more information see: https://cloud.google.com/bigquery/docs/creating-partitioned-tables.
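
As a sketch of how the three fields can be interpreted from their comments (the helper names below are hypothetical and not part of this package): ExpirationHours of 0 means no expiry, an empty Field falls back to the '_PARTITIONTIME' pseudo column, and Type is limited to "DAY" or "HOUR".

```go
package main

import (
	"fmt"
	"time"
)

// TimePartitioning is a local copy of the documented struct.
type TimePartitioning struct {
	Type            string `json:"type"`
	ExpirationHours int    `json:"expirationHours"`
	Field           string `json:"field"`
}

// validPartitionType checks Type against the two documented values.
func validPartitionType(tp TimePartitioning) bool {
	return tp.Type == "DAY" || tp.Type == "HOUR"
}

// partitionExpiration converts ExpirationHours to a duration.
// The second return value is false when partitions never expire (0 hours).
func partitionExpiration(tp TimePartitioning) (time.Duration, bool) {
	if tp.ExpirationHours == 0 {
		return 0, false
	}
	return time.Duration(tp.ExpirationHours) * time.Hour, true
}

// partitionColumn returns the column the table is partitioned by.
func partitionColumn(tp TimePartitioning) string {
	if tp.Field == "" {
		return "_PARTITIONTIME"
	}
	return tp.Field
}

func main() {
	tp := TimePartitioning{Type: "DAY", ExpirationHours: 72}
	d, expires := partitionExpiration(tp)
	fmt.Println(validPartitionType(tp), partitionColumn(tp), d, expires)
}
```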

type TopicSpecification

type TopicSpecification struct {
	Name              string            `json:"name"`
	NumPartitions     int               `json:"numPartitions"`
	ReplicationFactor int               `json:"replicationFactor"`
	Config            map[string]string `json:"config,omitempty"` // not yet supported
}

Name, NumPartitions and ReplicationFactor are required. If the sink topic refers to an existing topic, only Name is used.
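
One way to read this rule is sketched below. checkTopic and its topicExists parameter are hypothetical, and treating the two numeric fields as "must be at least 1" is an assumption; the package's own schema validation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// TopicSpecification is a local copy of the documented struct.
type TopicSpecification struct {
	Name              string            `json:"name"`
	NumPartitions     int               `json:"numPartitions"`
	ReplicationFactor int               `json:"replicationFactor"`
	Config            map[string]string `json:"config,omitempty"`
}

// checkTopic validates the fields needed to create a topic. When the topic
// already exists, only Name is checked, since only Name is used in that case.
func checkTopic(t TopicSpecification, topicExists bool) error {
	if t.Name == "" {
		return errors.New("topic name is required")
	}
	if topicExists {
		return nil
	}
	if t.NumPartitions < 1 {
		return errors.New("numPartitions must be at least 1")
	}
	if t.ReplicationFactor < 1 {
		return errors.New("replicationFactor must be at least 1")
	}
	return nil
}

func main() {
	t := TopicSpecification{Name: "events", NumPartitions: 6, ReplicationFactor: 3}
	fmt.Println(checkTopic(t, false))
	fmt.Println(checkTopic(TopicSpecification{Name: "events"}, true))
}
```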

type Topics

type Topics struct {
	Env   Environment `json:"env,omitempty"`
	Names []string    `json:"names,omitempty"`
}

type Transform

type Transform struct {
	// ImplId denotes the Implementation ID (type of Transform implementation).
	// The GEIST built-in type is named 'native' and is currently the only one supported.
	// TODO: Change this name to type? or just id?
	ImplId EntityType `json:"implId,omitempty"`

	// ExcludeEventsWith is checked first, to exclude events matching its conditions from all other transformations.
	// If multiple filter objects are provided they are handled as OR type of filters.
	ExcludeEventsWith []ExcludeEventsWith `json:"excludeEventsWith,omitempty"`

	// The ExtractFields transformation type picks out fields from the input event JSON.
	// The first ExtractFields object that matches the ForEventsWith filter will be used
	// to create the resulting Transformed object.
	ExtractFields []ExtractFields `json:"extractFields,omitempty"`

	ExtractItemsFromArray []ExtractItemsFromArray `json:"extractItemsFromArray,omitempty"`

	// AddFieldsToJson can contain a list of extracted fields to be added to another extracted field,
	// if that field is JSON.
	AddFieldsToJson AddFieldsToJson `json:"addFieldsToJson,omitempty"`

	// The Regexp transformation transforms a string into JSON based on the groupings in
	// the regular expression.
	// At least one grouping is required.
	Regexp *Regexp `json:"regexp,omitempty"`
}

Transform spec

type Transformed

type Transformed struct {
	Data map[string]any `json:"data"`
}

func NewTransformed

func NewTransformed() *Transformed

func (*Transformed) String

func (t *Transformed) String() string

type TransformedItemMap

type TransformedItemMap map[string]any

TransformedItemMap is the type used for transforms creating a map of items to be stored in the output map. One example is the ExtractItemsFromArray transform, which extracts JSON array items into such a map and stores that map inside the output Transformed map array.

type Whitelist

type Whitelist struct {
	Id     string   `json:"id"`
	Type   string   `json:"type"`
	Values []string `json:"values"`
}
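
The Table comment describes the whitelist as a key/value filter on the transformed output. A minimal sketch of that idea follows, under these assumptions: Id names a key in the transformed map, Values lists the accepted string values, and Type is not interpreted. The function allowed is hypothetical and not part of this package.

```go
package main

import "fmt"

// Whitelist is a local copy of the documented struct.
type Whitelist struct {
	Id     string   `json:"id"`
	Type   string   `json:"type"`
	Values []string `json:"values"`
}

// allowed reports whether a transformed item passes the whitelist.
// A nil whitelist lets everything through (the field is optional in Table).
func allowed(w *Whitelist, transformed map[string]any) bool {
	if w == nil {
		return true
	}
	// Assumption: the whitelisted value is stored as a string under key w.Id.
	v, ok := transformed[w.Id].(string)
	if !ok {
		return false
	}
	for _, accepted := range w.Values {
		if v == accepted {
			return true
		}
	}
	return false
}

func main() {
	w := &Whitelist{Id: "eventName", Type: "string", Values: []string{"created", "updated"}}
	fmt.Println(allowed(w, map[string]any{"eventName": "created"}))
	fmt.Println(allowed(w, map[string]any{"eventName": "deleted"}))
}
```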
