entity

package
v0.14.1
Published: Dec 4, 2024 License: MIT Imports: 8 Imported by: 6

Documentation

Constants

const (
	NotifyLevelInvalid = iota
	NotifyLevelDebug
	NotifyLevelInfo
	NotifyLevelWarn
	NotifyLevelError
)
const (
	NotifyLevelStrInvalid = "INVALID"
	NotifyLevelStrDebug   = "DEBUG"
	NotifyLevelStrInfo    = "INFO"
	NotifyLevelStrWarn    = "WARN"
	NotifyLevelStrError   = "ERROR"
)
const (
	DefaultStreamsPerPod                    = 1
	DefaultMicroBatchSize                   = 500
	DefaultMicroBatchBytes                  = 5000000
	DefaultMicroBatchTimeoutMs              = 15000
	DefaultMaxEventProcessingRetries        = 5
	DefaultMaxStreamRetryBackoffIntervalSec = 300
)

General Ops defaults

const (
	HoueDefault = "default"
	HoueDiscard = "discard"
	HoueDlq     = "dlq"
	HoueFail    = "fail"
)

Available options for Ops.HandlingOfUnretryableEvents

const (
	TransformedKeyKey   = "key"
	TransformedValueKey = "value"
)
const GeistIngestionTime = "@GeistIngestionTime"

Data processing and ingestion options

Variables

var ErrEntityShutdownRequested = errors.New("entity shutdown requested")

An entity can request to be shut down by returning this error. It is up to the Executor to decide whether the entire stream should be shut down or whether some other action should be taken.
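As a minimal sketch of how a caller might detect this sentinel error, the example below uses a locally declared stand-in (errEntityShutdownRequested) and two hypothetical helpers, loadOnce and shouldShutdown. The Executor's actual decision logic is not described in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for entity.ErrEntityShutdownRequested (same message as documented).
var errEntityShutdownRequested = errors.New("entity shutdown requested")

// loadOnce is a hypothetical entity operation that asks to be shut down,
// wrapping the sentinel with extra context.
func loadOnce() error {
	return fmt.Errorf("sink unavailable: %w", errEntityShutdownRequested)
}

// shouldShutdown reports whether err carries the shutdown request,
// even when the sentinel has been wrapped.
func shouldShutdown(err error) bool {
	return errors.Is(err, errEntityShutdownRequested)
}

func main() {
	fmt.Println(shouldShutdown(loadOnce()))          // true
	fmt.Println(shouldShutdown(errors.New("other"))) // false
}
```

Using errors.Is rather than a direct comparison keeps the check working when an entity adds context to the error before returning it.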

Functions

func NotifyLevel added in v0.4.2

func NotifyLevel(notifyLevelName string) int

func NotifyLevelName added in v0.4.0

func NotifyLevelName(notifyLevel int) string
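These two functions are listed without a description. Given the two constant blocks above, a plausible reading is that they convert between level values and level names. The sketch below shows such a mapping with local stand-ins (levelName, levelFromName); falling back to the invalid level for unknown input is an assumption, not documented behaviour.

```go
package main

import "fmt"

// Local copies of the documented level values.
const (
	levelInvalid = iota
	levelDebug
	levelInfo
	levelWarn
	levelError
)

// Names in the same order as the values above.
var levelNames = []string{"INVALID", "DEBUG", "INFO", "WARN", "ERROR"}

// levelName returns the name for a level, or "INVALID" if out of range (assumed fallback).
func levelName(level int) string {
	if level < 0 || level >= len(levelNames) {
		return levelNames[levelInvalid]
	}
	return levelNames[level]
}

// levelFromName returns the level for a name, or levelInvalid if unknown (assumed fallback).
func levelFromName(name string) int {
	for i, n := range levelNames {
		if n == name {
			return i
		}
	}
	return levelInvalid
}

func main() {
	fmt.Println(levelName(levelWarn))    // WARN
	fmt.Println(levelFromName("ERROR")) // 4
}
```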

Types

type ArrayItems

type ArrayItems struct {
	JsonPathToArray  string           `json:"jsonPathToArray"`
	IdFromItemFields IdFromItemFields `json:"idFromItemFields"`
}

type Config added in v0.4.0

type Config struct {
	Spec       *Spec
	ID         string
	NotifyChan NotifyChan
	Log        bool
}

Config is the Entity Config to use with Entity factories

type EntityType

type EntityType string

Native stream entity types (sources, sinks or both)

const (
	EntityInvalid  EntityType = "invalid"
	EntityVoid     EntityType = "void"
	EntityAdmin    EntityType = "admin"
	EntityGeistApi EntityType = "geistapi"
	EntityEventSim EntityType = "eventsim"
)

type Environment

type Environment string

Some Stream ETL Entities need different configurations depending on the environment. This cannot be set in the generic GEIST build config, since ETL entities are configured in externally provided ETL Stream Specs. The environment concept therefore needs to be known to both the entity and the stream spec.

The following env types are provided by Geist for consistency across entity plugins, but any type of custom string can be used by plugin entities. For example, a custom plugin extractor could support having "env": "someregion-staging" in the stream spec using that extractor/source, since the extractor implementation can cast the Environment type back to string when matching.

const (
	EnvironmentAll   Environment = "all"
	EnvironmentDev   Environment = "dev"
	EnvironmentStage Environment = "stage"
	EnvironmentProd  Environment = "prod"
)
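The cast-back-to-string matching described above can be sketched as follows. The types are local copies, appliesTo is a hypothetical helper, and treating "all" as matching every environment is an assumption made for illustration.

```go
package main

import "fmt"

// Local copy of the documented Environment type and two of its values.
type Environment string

const (
	EnvironmentAll  Environment = "all"
	EnvironmentProd Environment = "prod"
)

// appliesTo is a hypothetical helper a plugin extractor could use to decide
// whether a spec's environment matches the one it is running in.
// Assumption: "all" matches any running environment.
func appliesTo(specEnv Environment, runningEnv string) bool {
	return specEnv == EnvironmentAll || string(specEnv) == runningEnv
}

func main() {
	custom := Environment("someregion-staging") // custom value from a stream spec
	fmt.Println(appliesTo(custom, "someregion-staging")) // true
	fmt.Println(appliesTo(EnvironmentProd, "dev"))        // false
}
```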

type Event

type Event struct {
	Data []byte
	Ts   time.Time
	Key  []byte
}

func (Event) String

func (e Event) String() string

type EventProcessingResult

type EventProcessingResult struct {
	Status     ExecutorStatus
	ResourceId string
	Error      error
	Retryable  bool
}

type ExcludeEventsWith

type ExcludeEventsWith struct {
	Key          string   `json:"key"`
	Values       []string `json:"values,omitempty"`
	ValuesNotIn  []string `json:"valuesNotIn,omitempty"`
	ValueIsEmpty *bool    `json:"valueIsEmpty,omitempty"`
}

ExcludeEventsWith specifies whether certain events should be skipped directly, without further processing. If the event field specified by Key matches any of the values in the Values array, the event is excluded. This is the blacklisting option of this filter. The Key string must use JSON path syntax according to github.com/tidwall/gjson (see below). The value field is currently limited to string values.

If the Values array is missing or empty, ValuesNotIn is checked instead. If the event field specified by Key does not match any of the values in ValuesNotIn, the event is excluded. This is the whitelisting option of this filter.

If ValueIsEmpty is set to true and the field string value is empty, the event will be excluded.
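The three rules above can be sketched as a single decision function. This is not the package's implementation: the struct is a local copy, the field lookup uses encoding/json on top-level keys only (instead of gjson paths), and checking ValueIsEmpty before the other two rules is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the documented filter struct.
type excludeEventsWith struct {
	Key          string
	Values       []string
	ValuesNotIn  []string
	ValueIsEmpty *bool
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// fieldAsString returns the top-level string field named key,
// or "" if it is missing or not a string (simplification of gjson lookup).
func fieldAsString(event []byte, key string) string {
	var fields map[string]any
	if err := json.Unmarshal(event, &fields); err != nil {
		return ""
	}
	s, _ := fields[key].(string)
	return s
}

// excluded applies the rules: ValueIsEmpty, then Values, then ValuesNotIn.
func excluded(f excludeEventsWith, event []byte) bool {
	value := fieldAsString(event, f.Key)
	if f.ValueIsEmpty != nil && *f.ValueIsEmpty && value == "" {
		return true
	}
	if len(f.Values) > 0 {
		return contains(f.Values, value)
	}
	if len(f.ValuesNotIn) > 0 {
		return !contains(f.ValuesNotIn, value)
	}
	return false
}

func main() {
	deny := excludeEventsWith{Key: "type", Values: []string{"test"}}
	allow := excludeEventsWith{Key: "type", ValuesNotIn: []string{"order"}}
	fmt.Println(excluded(deny, []byte(`{"type":"test"}`)))   // true
	fmt.Println(excluded(allow, []byte(`{"type":"order"}`))) // false
}
```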

type ExcludeEventsWithMultipleConditions added in v0.12.0

type ExcludeEventsWithMultipleConditions struct {
	Filters []ExcludeEventsWith `json:"filters"`
}

ExcludeEventsWithMultipleConditions allows us to combine multiple ExcludeEventsWith filters. All the Filters must be true for the event to be excluded, i.e. AND type of filter. Suitable for more complex event exclusion scenarios, where the combination of fields should be taken into account for filtering.
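The AND semantics within one item, combined with the OR semantics across items described for the Transform type, can be sketched like this. The types are simplified stand-ins that only compare a key against a single value, and treating an empty filter list as "no match" is an assumption.

```go
package main

import "fmt"

// Simplified stand-ins: one condition compares a field to a single value.
type condition struct{ Key, Value string }
type conditionGroup struct{ Filters []condition }

// matches is true only if every filter in the group matches (AND).
// Assumption: a group without filters never matches.
func (g conditionGroup) matches(event map[string]string) bool {
	if len(g.Filters) == 0 {
		return false
	}
	for _, f := range g.Filters {
		if event[f.Key] != f.Value {
			return false
		}
	}
	return true
}

// excludedByAnyGroup is true if at least one group matches (OR across groups).
func excludedByAnyGroup(groups []conditionGroup, event map[string]string) bool {
	for _, g := range groups {
		if g.matches(event) {
			return true
		}
	}
	return false
}

func main() {
	groups := []conditionGroup{
		{Filters: []condition{{"type", "order"}, {"region", "test"}}},
	}
	fmt.Println(excludedByAnyGroup(groups, map[string]string{"type": "order", "region": "test"})) // true
	fmt.Println(excludedByAnyGroup(groups, map[string]string{"type": "order", "region": "eu"}))   // false
}
```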

type ExecutorStatus

type ExecutorStatus int
const (
	ExecutorStatusInvalid ExecutorStatus = iota
	ExecutorStatusSuccessful
	ExecutorStatusError
	ExecutorStatusRetriesExhausted
	ExecutorStatusShutdown
)

type ExtractFields

type ExtractFields struct {
	// ForEventsWith is used to filter which incoming event the fields should be extracted from
	// Currently only AND type filter is supported if supplying multiple key-value pairs.
	// If ForEventsWith is empty or omitted, fields will be taken from all events.
	ForEventsWith []ForEventsWith `json:"forEventsWith,omitempty"`

	// ExcludeEventsWith inside ExtractField complements the top level ExcludeEventsWith by
	// only being applicable for the events filtered with ForEventsWith.
	// As an example, this simplifies event schema evolution whereby a field containing the
	// schema version number could be governed by ForEventsWith, and event exclusion (and
	// field extraction) are handled by this and subsequent specification constructs.
	ExcludeEventsWith []ExcludeEventsWith `json:"excludeEventsWith,omitempty"`

	// The Fields array contains the definitions of which fields to extract from the events selected by the filter
	Fields []Field `json:"fields,omitempty"`
}

The ExtractFields transformation type creates root level ID fields, with values retrieved from a json path expression from the input event

type ExtractItemsFromArray

type ExtractItemsFromArray struct {
	Id            string          `json:"id"`
	ForEventsWith []ForEventsWith `json:"forEventsWith,omitempty"`
	Items         ArrayItems      `json:"items"`
}

The ExtractItemsFromArray transformation returns all items in an arbitrary array inside the event JSON, with the ID/key of each item built according to the required IdFromItemFields spec. If the resulting ID/key of an item is an empty string, the item is omitted from the output map. The items are stored inside a map in the transformed output map. Its key is specified by the "Id" field.
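A rough sketch of the idea, under stated assumptions: the array is looked up by a top-level key rather than a gjson path, the item ID is built by joining the listed field values with the delimiter, and missing fields are skipped. The function name extractItems is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// extractItems returns the array items keyed by an ID built from idFields.
// Items whose resulting ID is empty are omitted, as described above.
func extractItems(eventJSON []byte, arrayKey string, idFields []string, delimiter string) (map[string]any, error) {
	var event map[string]any
	if err := json.Unmarshal(eventJSON, &event); err != nil {
		return nil, err
	}
	items, _ := event[arrayKey].([]any)
	out := make(map[string]any)
	for _, raw := range items {
		item, ok := raw.(map[string]any)
		if !ok {
			continue
		}
		parts := make([]string, 0, len(idFields))
		for _, f := range idFields {
			if v, ok := item[f]; ok {
				parts = append(parts, fmt.Sprint(v))
			}
		}
		id := strings.Join(parts, delimiter)
		if id == "" {
			continue // no usable ID: drop the item
		}
		out[id] = item
	}
	return out, nil
}

func main() {
	event := []byte(`{"orders":[{"region":"eu","sku":"a1"},{"note":"no id fields"}]}`)
	items, err := extractItems(event, "orders", []string{"region", "sku"}, "-")
	fmt.Println(len(items), err) // 1 <nil>
}
```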

type Extractor

type Extractor interface {

	// StreamExtract (required) continuously consumes events from its source (until ctx is canceled),
	// and reports each consumed event back to the Executor with reportEvent(), for further processing.
	StreamExtract(
		ctx context.Context,
		reportEvent ProcessEventFunc,
		err *error,
		retryable *bool)

	// Extract (optional) provides generic extraction from the source based on the provided query,
	// and returns directly.
	Extract(ctx context.Context, query ExtractorQuery, result any) (error, bool)

	// ExtractFromSink (optional) extracts data from the sink used in an ETL Stream, as specified
	// in the Extractor's GEIST spec. Currently only supported by Firestore and BigTable extractors.
	ExtractFromSink(ctx context.Context, query ExtractorQuery, result *[]*Transformed) (error, bool)

	// SendToSource (optional) enables external clients to send events directly to the Extractor's
	// Source with Geist.Publish().
	// For source connectors meant to be used in admin streams, this method is required.
	// Currently known connectors that implement this method are:
	//		* "geistapi" (channel) extractor
	// 		* "pubsub" GCP extractor
	//		* "kafka" extractor
	SendToSource(ctx context.Context, event any) (string, error)
}

Extractor is the interface required for stream source extractor implementations and for sink queries. The Extractor implementation should be given its GEIST Spec in a constructor.

For source stream extractors the only function required to be fully functional is StreamExtract().

For sink extractors the only function required to be fully functional is ExtractFromSink().

The others are situational depending on extractor/source entity type, and could be empty, e.g. simply returning nil, false (or someerror, false).
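To illustrate the StreamExtract contract, here is a minimal sketch of a source that reads from an in-memory slice. All types are local copies of the documented ones, sliceExtractor and runDemo are hypothetical, and the choices to return silently on context cancellation and to stop on the first non-successful result are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local copies of the documented types.
type Event struct {
	Data []byte
	Ts   time.Time
	Key  []byte
}
type ExecutorStatus int

const (
	ExecutorStatusInvalid ExecutorStatus = iota
	ExecutorStatusSuccessful
	ExecutorStatusError
	ExecutorStatusRetriesExhausted
	ExecutorStatusShutdown
)

type EventProcessingResult struct {
	Status     ExecutorStatus
	ResourceId string
	Error      error
	Retryable  bool
}
type ProcessEventFunc func(context.Context, []Event) EventProcessingResult

// sliceExtractor is a hypothetical source backed by an in-memory slice.
type sliceExtractor struct{ payloads []string }

// StreamExtract reports each payload until ctx is canceled or processing fails.
func (s *sliceExtractor) StreamExtract(ctx context.Context, reportEvent ProcessEventFunc, err *error, retryable *bool) {
	for _, p := range s.payloads {
		if ctx.Err() != nil {
			return
		}
		res := reportEvent(ctx, []Event{{Data: []byte(p), Ts: time.Now()}})
		if res.Status != ExecutorStatusSuccessful {
			*err, *retryable = res.Error, res.Retryable
			return
		}
	}
}

// runDemo plays the Executor role with a callback that accepts every event.
func runDemo() (processed int, err error) {
	src := &sliceExtractor{payloads: []string{`{"n":1}`, `{"n":2}`, `{"n":3}`}}
	var retryable bool
	src.StreamExtract(context.Background(), func(_ context.Context, events []Event) EventProcessingResult {
		processed += len(events)
		return EventProcessingResult{Status: ExecutorStatusSuccessful}
	}, &err, &retryable)
	return processed, err
}

func main() {
	fmt.Println(runDemo()) // 3 <nil>
}
```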

type ExtractorFactories

type ExtractorFactories map[string]ExtractorFactory

type ExtractorFactory

type ExtractorFactory interface {
	// SourceId returns the source ID for which the extractor is implemented
	SourceId() string

	// NewExtractor creates a new extractor entity
	NewExtractor(ctx context.Context, c Config) (Extractor, error)

	// Close is called by Geist after client has called Geist API geist.Shutdown()
	Close(ctx context.Context) error
}

ExtractorFactory enables extractors/sources to be handled as plug-ins to Geist. A factory is registered with Geist API RegisterExtractorType() for a source type to be available for stream specs.

type ExtractorQuery

type ExtractorQuery struct {
	Type         QueryType
	Key          string
	CompositeKey []KeyValueFilter
}

type Field

type Field struct {
	Id string `json:"id"`

	// JsonPath defines which field in the JSON that should be extracted. It uses github.com/tidwall/gjson
	// syntax, such as "myCoolField" if we want to extract that field from json { "myCoolField": "isHere" }
	//
	// The full raw JSON event is also regarded as a 'field' and to extract that the JsonPath string should
	// be empty or omitted in the spec.
	JsonPath string `json:"jsonPath"`

	// - For normal fields, Type can be "string", "integer", "number", "boolean" or "float".
	// If omitted in the spec, string will be used.
	//
	// - For raw event fields the default type is []byte, unless Type is explicitly set to "string".
	// For performance critical streams, type should be omitted (avoiding conversions), especially when
	// having a stream with BigTable sink, which stores the data as byte anyway.
	//
	// - If a field is an iso timestamp string (e.g. "2019-11-30T14:57:23.389Z") the type
	// "isoTimestamp" can be used, to have a Go time.Time object created as the value for this field key.
	//
	// - If a field is a unix timestamp (number or str) (e.g. 1571831226950 or "1571831226950") the type
	// "unixTimestamp" can be used to have Go time.Time object created as the value for this field.
	//
	// - If a field is a User Agent string (e.g. "Mozilla%2F5.0%20(Macintosh%3B%20Intel%2...") the type
	// "userAgent" can be used to have parsed JSON output as string, with separate fields for each part of UA.
	Type string `json:"type,omitempty"`
}

type ForEventsWith

type ForEventsWith struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

The Key string must use JSON path syntax according to github.com/tidwall/gjson (see below). Note that while the 'Value' field in the ForEventsWith spec is of string type, the actual field in the incoming event can be of another type, for example int, in which case the match is made against its string representation. For example, if 'Value' is set to "3" and the field in the incoming event is a JSON number (int) with a value of 3, the event matches.
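The string-representation matching can be sketched as below. This uses encoding/json on top-level keys instead of gjson paths, and the names forEventsWith, fieldString and matchesAll are local and hypothetical. Multiple filters are combined with AND, and an empty filter list matches every event, in line with the ExtractFields comments.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Local copy of the documented struct.
type forEventsWith struct{ Key, Value string }

// fieldString returns the string form of a top-level field.
// UseNumber keeps numbers as written in the JSON (3 stays "3").
func fieldString(event []byte, key string) (string, bool) {
	dec := json.NewDecoder(bytes.NewReader(event))
	dec.UseNumber()
	var fields map[string]any
	if err := dec.Decode(&fields); err != nil {
		return "", false
	}
	v, ok := fields[key]
	if !ok {
		return "", false
	}
	return fmt.Sprint(v), true
}

// matchesAll is true if every filter matches; no filters means all events match.
func matchesAll(filters []forEventsWith, event []byte) bool {
	for _, f := range filters {
		got, ok := fieldString(event, f.Key)
		if !ok || got != f.Value {
			return false
		}
	}
	return true
}

func main() {
	filters := []forEventsWith{{Key: "schemaVersion", Value: "3"}}
	fmt.Println(matchesAll(filters, []byte(`{"schemaVersion":3}`)))   // true
	fmt.Println(matchesAll(filters, []byte(`{"schemaVersion":"3"}`))) // true
	fmt.Println(matchesAll(filters, []byte(`{"schemaVersion":4}`)))   // false
}
```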

type HookAction

type HookAction int
const (
	HookActionInvalid          HookAction = iota // default, not to be used
	HookActionProceed                            // continue processing of this event
	HookActionSkip                               // skip processing of this event and take next
	HookActionRetryableError                     // let Geist handle this event as a retryable error
	HookActionUnretryableError                   // let Geist handle this event as an unretryable error (e.g. corrupt event to be sent to DLQ)
	HookActionShutdown                           // shut down this stream instance
)

type IdFromItemFields

type IdFromItemFields struct {
	Delimiter string   `json:"delimiter"`
	Fields    []string `json:"fields"`
}

type KeyValueFilter

type KeyValueFilter struct {
	Key   string
	Value string
}

type Loader

type Loader interface {

	// If successful the event/resource ID of the loaded event is returned.
	// If input 'data' is nil or empty, an error is to be returned.
	StreamLoad(ctx context.Context, data []*Transformed) (string, error, bool)

	// Called by Executor during shutdown of the stream
	Shutdown(ctx context.Context)
}

Loader is the interface required for stream sink Loader implementations. Only certain types of Loader implementations might support multiple input Transformed objects. For example, a BigTable implementation might only support a single (first) Transformed object, which determines the fields from a single event to insert into the table, whereas a Kafka implementation might receive multiple Transformed objects (as the result of an EventSplit transformation type), all of which should be sent as separate events to the specified Kafka topic.
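A minimal sketch of a Loader that accepts multiple Transformed objects, using local copies of the types. The memoryLoader type, the loadDemo helper and the format of the returned resource ID are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copy of the documented Transformed type.
type Transformed struct {
	Data map[string]any
}

// memoryLoader is a hypothetical sink that keeps everything in memory.
type memoryLoader struct{ stored []*Transformed }

// StreamLoad stores all provided objects and returns a made-up resource ID.
// As required by the interface comments, empty input is an error.
func (m *memoryLoader) StreamLoad(ctx context.Context, data []*Transformed) (string, error, bool) {
	if len(data) == 0 {
		return "", errors.New("no data to load"), false
	}
	m.stored = append(m.stored, data...)
	return fmt.Sprintf("mem-%d", len(m.stored)), nil, false
}

// Shutdown has nothing to release in this sketch.
func (m *memoryLoader) Shutdown(ctx context.Context) {}

// loadDemo loads two Transformed objects in one call.
func loadDemo() (string, error) {
	loader := &memoryLoader{}
	defer loader.Shutdown(context.Background())
	id, err, _ := loader.StreamLoad(context.Background(), []*Transformed{
		{Data: map[string]any{"key": "a"}},
		{Data: map[string]any{"key": "b"}},
	})
	return id, err
}

func main() {
	fmt.Println(loadDemo()) // mem-2 <nil>
}
```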

type LoaderFactories

type LoaderFactories map[string]LoaderFactory

type LoaderFactory

type LoaderFactory interface {
	// SinkId returns the sink ID for which the loader is implemented
	SinkId() string

	// NewLoader creates a new loader entity
	NewLoader(ctx context.Context, c Config) (Loader, error)

	// NewSinkExtractor creates an extractor to enable retrieving data from the sink
	// as written by the loader.
	// This functionality is optional and if not implemented the function should return nil, nil.
	NewSinkExtractor(ctx context.Context, c Config) (Extractor, error)

	// Close is called by Geist after using Geist API geist.Shutdown()
	Close(ctx context.Context) error
}

LoaderFactory enables loaders/sinks to be handled as plug-ins to Geist. A factory is registered with Geist API RegisterLoaderType() for a sink type to be available for stream specs.

type Metrics added in v0.4.0

type Metrics struct {

	// Total number of events sent to Executor's ProcessEvent() by the Extractor,
	// regardless of the outcome of downstream processing.
	EventsProcessed int64

	// Total time spent by Executor processing all extracted events
	EventProcessingTimeMicros int64

	// Total number of event batches sent from Extractor to Sink loader via Executor
	Microbatches int64

	// Total amount of event data processed (as sent from Extractor)
	BytesProcessed int64

	// Total number of events successfully processed by the sink.
	EventsStoredInSink int64

	// Total time spent ingesting transformed events in the sink successfully
	SinkProcessingTimeMicros int64

	// Total number of successful calls to the Sink's StreamLoad method
	SinkOperations int64

	// Total amount of data successfully ingested
	BytesIngested int64
}

Metrics provided by the engine about its operations. Accessible from the Geist API with geist.Metrics().

func (*Metrics) Reset added in v0.4.0

func (m *Metrics) Reset()

type NotificationEvent added in v0.4.0

type NotificationEvent struct {

	// The notification level
	Level string

	// Timestamp of the event in the format "2006-01-02T15:04:05.000000Z"
	Timestamp string

	// The entity type of the sender, e.g. "executor", "supervisor", etc
	Sender string

	// The unique instance ID of the sender
	Instance string

	// The stream ID, if applicable
	Stream string

	Message string

	// Location and stack info, from where notification was sent.
	// Func is always provided.
	// File and Line are added when notification level is WARN or above.
	// StackTrace is added when notification level is ERROR.
	Func       string
	File       string
	Line       int
	StackTrace string
}

NotificationEvent is the type of the events sent by Geist to the notification channel, which is accessible externally with geist.NotificationChannel().

type NotifyChan added in v0.4.0

type NotifyChan chan NotificationEvent

type Ops

type Ops struct {
	// StreamsPerPod specifies how many Executors should execute the stream concurrently,
	// each in its own goroutine.
	// This is especially important when using Kafka extractors. For max concurrency and highest throughput
	// it should be set equal to the number of topic partitions divided by expected number of pods.
	// There is negligible overhead in having more goroutines than partitions.
	// If omitted it is set to DefaultStreamsPerPod (1).
	StreamsPerPod int `json:"streamsPerPod"`

	// Disabled has the same meaning as the root level field with the same name. It is
	// available here as well in order to enable granular overrides per environment.
	// If this is omitted, the root level field will be used.
	Disabled *bool `json:"disabled,omitempty"`

	// MicroBatch specifies if events should be processed in batches, which improves throughput.
	// If set to 'true' the involved stream entities try their best to process events in batches according
	// to each ETL entity's capability for micro-batch processing.
	// If omitted or set to false, the stream will process a single event at a time.
	// Note: This is an optional feature for each Source/Extractor plugin. See doc for each plugin entity for availability.
	// Example of a plugin supporting this is 'kafka'.
	// Verified beneficial effect is when having sink set to 'bigquery', due to BQ API capabilities for this.
	MicroBatch bool `json:"microBatch"`

	// MicroBatchSize is the maximum number of events that should be included in the batch.
	// If omitted it is set to DefaultMicroBatchSize
	MicroBatchSize int `json:"microBatchSize,omitempty"`

	// MicroBatchBytes specifies the size threshold that, when reached, closes the batch regardless of the number of
	// events in the batch, and forwards it downstream. The final size of the batch will be this threshold + size of next event.
	// If omitted it is set to DefaultMicroBatchBytes
	MicroBatchBytes int `json:"microBatchBytes,omitempty"`

	// MicroBatchTimeoutMs is the maximum time (in milliseconds) to wait for the batch to fill up if the max size has not been reached.
	// If the sink is set to Kafka, this value will override the pollTimeout value.
	// If omitted it is set to DefaultMicroBatchTimeoutMs
	MicroBatchTimeoutMs int `json:"microBatchTimeoutMs,omitempty"`

	// MaxEventProcessingRetries specifies how many times an extracted event from the source should be processed
	// again (transform/load), if deemed retryable, before the Executor restarts the stream on a longer back-off
	// interval (MaxStreamRetryBackoffIntervalSec). Retryable errors will be retried indefinitely for max self-healing.
	// If omitted it is set to DefaultMaxEventProcessingRetries.
	MaxEventProcessingRetries int `json:"maxEventProcessingRetries"`

	// MaxStreamRetryBackoffIntervalSec specifies the max time (in seconds) between stream restarts after exponential backoff
	// retries of retryable event processing failures.
	// If omitted or zero it is set to DefaultMaxStreamRetryBackoffIntervalSec
	MaxStreamRetryBackoffIntervalSec int `json:"maxStreamRetryBackoffIntervalSec"`

	// HandlingOfUnretryableEvents specifies what to do with events that can't be properly transformed or loaded
	// to the sink, e.g. corrupt or otherwise non-compliant events vs the stream spec.
	// Available options are:
	//
	//		"default" - Default behaviour depending on Extractor type. For Kafka this means "discard" and for
	//					Pubsub it means Nack (continue retrying later but process other events as well).
	//					If this field is omitted it will take this value.
	//
	//		"discard" - Discard the event, log it with Warn, and continue processing other events.
	//
	//		"dlq"     - Move the event from the source topic to a DLQ topic specified in DLQ Config.
	//
	//		"fail"    - The stream will be terminated with an error message.
	//
	// Note that all source types might not support all available options. See documentation for each source type for details.
	//
	HandlingOfUnretryableEvents string `json:"handlingOfUnretryableEvents,omitempty"`

	// LogEventData is useful for enabling granular event level debugging dynamically for specific streams
	// without having to redeploy GEIST. To troubleshoot a specific stream a new version of the stream spec
	// can be uploaded at run-time with this field set to true.
	LogEventData bool `json:"logEventData"`

	// CustomProperties can be used to configure stream processing in any type of custom
	// connector or injected enrichment logic.
	CustomProperties map[string]string `json:"customProperties"`
}

func (*Ops) EnsureValidDefaults added in v0.5.2

func (o *Ops) EnsureValidDefaults()
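EnsureValidDefaults is listed without a description. Based on the "If omitted it is set to ..." comments in the Ops struct, one plausible behaviour is sketched below with a local subset of the struct and local copies of the default constants. Treating zero or negative values as "omitted" is an assumption, and the real method may differ.

```go
package main

import "fmt"

// Local copies of the documented defaults.
const (
	defaultStreamsPerPod                    = 1
	defaultMicroBatchSize                   = 500
	defaultMicroBatchBytes                  = 5000000
	defaultMicroBatchTimeoutMs              = 15000
	defaultMaxEventProcessingRetries        = 5
	defaultMaxStreamRetryBackoffIntervalSec = 300
	houeDefault                             = "default"
)

// ops is a local subset of the documented Ops struct.
type ops struct {
	StreamsPerPod                    int
	MicroBatchSize                   int
	MicroBatchBytes                  int
	MicroBatchTimeoutMs              int
	MaxEventProcessingRetries        int
	MaxStreamRetryBackoffIntervalSec int
	HandlingOfUnretryableEvents      string
}

// ensureValidDefaults fills in defaults for unset fields.
// Assumption: values <= 0 and empty strings count as unset.
func (o *ops) ensureValidDefaults() {
	setIfUnset := func(field *int, def int) {
		if *field <= 0 {
			*field = def
		}
	}
	setIfUnset(&o.StreamsPerPod, defaultStreamsPerPod)
	setIfUnset(&o.MicroBatchSize, defaultMicroBatchSize)
	setIfUnset(&o.MicroBatchBytes, defaultMicroBatchBytes)
	setIfUnset(&o.MicroBatchTimeoutMs, defaultMicroBatchTimeoutMs)
	setIfUnset(&o.MaxEventProcessingRetries, defaultMaxEventProcessingRetries)
	setIfUnset(&o.MaxStreamRetryBackoffIntervalSec, defaultMaxStreamRetryBackoffIntervalSec)
	if o.HandlingOfUnretryableEvents == "" {
		o.HandlingOfUnretryableEvents = houeDefault
	}
}

func main() {
	o := ops{MicroBatchSize: 100}
	o.ensureValidDefaults()
	fmt.Println(o.StreamsPerPod, o.MicroBatchSize, o.MicroBatchTimeoutMs) // 1 100 15000
}
```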

type PostTransformHookFunc added in v0.8.0

type PostTransformHookFunc func(ctx context.Context, spec *Spec, event *[]*Transformed) HookAction

PostTransformHookFunc serves the same purpose and functionality as the PreTransformHookFunc but is called after the event transformations.

type PreTransformHookFunc

type PreTransformHookFunc func(ctx context.Context, spec *Spec, event *[]byte) HookAction

PreTransformHookFunc is a client-provided function which the stream's Executor uses prior to sending the event to the Transformer. This way the client can modify/enrich each event before it is processed according to the transform part of the spec. Since errors in this func are solely part of the client domain, there is no point in returning them to the Geist executor. It is up to the client to decide appropriate actions to take, including optionally returning one of the HookAction error values. The event is provided as a mutable argument to avoid requiring the client to always return data even if not used. The stream spec governing the provided event is provided for context and filtering logic capabilities, since the function is called for all concurrently running streams.
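A sketch of what such a hook could look like, using local copies of HookAction and a reduced Spec with only the Namespace field. The enrichHook and applyHook functions, the "orders" namespace and the added "enriched" field are all hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Local copies of the documented types (Spec reduced to one field).
type HookAction int

const (
	HookActionInvalid HookAction = iota
	HookActionProceed
	HookActionSkip
	HookActionRetryableError
	HookActionUnretryableError
	HookActionShutdown
)

type Spec struct{ Namespace string }

// enrichHook adds a field to events of one namespace and leaves others untouched.
// It has the same shape as PreTransformHookFunc.
func enrichHook(ctx context.Context, spec *Spec, event *[]byte) HookAction {
	if spec.Namespace != "orders" {
		return HookActionProceed // the hook is called for all streams; filter by spec
	}
	var fields map[string]any
	if err := json.Unmarshal(*event, &fields); err != nil || fields == nil {
		return HookActionUnretryableError // corrupt event: let Geist handle it
	}
	fields["enriched"] = true
	out, err := json.Marshal(fields)
	if err != nil {
		return HookActionUnretryableError
	}
	*event = out // mutate in place instead of returning data
	return HookActionProceed
}

// applyHook runs the hook on a string payload, for demonstration.
func applyHook(namespace, event string) (string, HookAction) {
	data := []byte(event)
	action := enrichHook(context.Background(), &Spec{Namespace: namespace}, &data)
	return string(data), action
}

func main() {
	out, action := applyHook("orders", `{"id":1}`)
	fmt.Println(out, action == HookActionProceed) // {"enriched":true,"id":1} true
}
```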

type ProcessEventFunc

type ProcessEventFunc func(context.Context, []Event) EventProcessingResult

ProcessEventFunc is the type of func that an Extractor calls for each extracted event to be processed downstream.

It is important for the Extractor to properly handle the returned EventProcessingResult.

EventProcessingResult.Status values:
	ExecutorStatusSuccessful --> continue as normal
	ExecutorStatusError --> handle error depending on Houe mode in stream spec
	ExecutorStatusRetriesExhausted --> normally a shutdown of extractor is an ok action (will be restarted)
	ExecutorStatusShutdown --> shut down extractor
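The status handling listed above could be expressed as a small switch. The types are local copies, the action names and nextAction are hypothetical, and stopping on an invalid status is an assumption.

```go
package main

import "fmt"

// Local copies of the documented types.
type ExecutorStatus int

const (
	ExecutorStatusInvalid ExecutorStatus = iota
	ExecutorStatusSuccessful
	ExecutorStatusError
	ExecutorStatusRetriesExhausted
	ExecutorStatusShutdown
)

type EventProcessingResult struct {
	Status     ExecutorStatus
	ResourceId string
	Error      error
	Retryable  bool
}

// action is a hypothetical label for what the extractor does next.
type action string

const (
	actionContinue  action = "continue"
	actionApplyHoue action = "apply HandlingOfUnretryableEvents mode"
	actionStop      action = "stop extractor"
)

// nextAction maps the Executor's result to the extractor's next step.
func nextAction(res EventProcessingResult) action {
	switch res.Status {
	case ExecutorStatusSuccessful:
		return actionContinue
	case ExecutorStatusError:
		return actionApplyHoue
	case ExecutorStatusRetriesExhausted, ExecutorStatusShutdown:
		return actionStop
	default:
		return actionStop // assumption: treat an invalid status as fatal
	}
}

func main() {
	fmt.Println(nextAction(EventProcessingResult{Status: ExecutorStatusSuccessful})) // continue
	fmt.Println(nextAction(EventProcessingResult{Status: ExecutorStatusShutdown}))   // stop extractor
}
```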

type Property

type Property struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

type QueryType

type QueryType int
const (
	Unknown QueryType = iota
	QueryTypeKeyValue
	QueryTypeCompositeKeyValue
	QueryTypeAll
)

type Regexp

type Regexp struct {
	// The regular expression, in RE2 syntax.
	Expression string `json:"expression,omitempty"`

	// If used in conjunction with fieldExtraction, this will be the field to apply regexp on.
	Field string `json:"field,omitempty"`

	// If extracted field should be kept in result or omitted. Default is false.
	KeepField bool `json:"keepField,omitempty"`

	// Time conversion of date field. Field specified must be extracted before.
	TimeConversion *TimeConv `json:"timeConversion,omitempty"`
}

Regexp specifies an optional transformation type for use in the stream spec. It transforms a string into JSON based on the groupings in the regular expression. At least one grouping is required. The resulting output from the transformation is found under the key "regexppayload". An example use case is incoming events on a Kafka topic where certain fields contain plain log text strings, parts of which should be extracted into fields for downstream ingestion into the sink in a structured format.
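The general technique of turning regular expression groups into JSON can be sketched with the standard library. This assumes named capture groups supply the field names, which the documentation does not confirm, and regexpToJSON is a hypothetical function, not part of this package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
)

// regexpToJSON matches input against expression (RE2 syntax) and returns
// a JSON object with one string field per named capture group.
func regexpToJSON(expression, input string) ([]byte, error) {
	re, err := regexp.Compile(expression)
	if err != nil {
		return nil, err
	}
	match := re.FindStringSubmatch(input)
	if match == nil {
		return nil, errors.New("input does not match expression")
	}
	out := make(map[string]string)
	for i, name := range re.SubexpNames() {
		if i == 0 || name == "" {
			continue // skip the whole match and unnamed groups
		}
		out[name] = match[i]
	}
	return json.Marshal(out)
}

func main() {
	out, err := regexpToJSON(`^(?P<level>\w+) (?P<msg>.*)$`, "WARN disk almost full")
	fmt.Println(string(out), err) // {"level":"WARN","msg":"disk almost full"} <nil>
}
```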

func (*Regexp) CollectGroups

func (r *Regexp) CollectGroups(exp string) []string

TODO: Shorten, but not important now since there is no performance impact.

func (*Regexp) Validate added in v0.5.2

func (r *Regexp) Validate() (err error)

Validate returns nil if the Regexp transform spec is valid, or an error otherwise.

type RowItem

type RowItem struct {
	Column    string    `json:"column"`
	Timestamp time.Time `json:"timestamp"`
	Value     any       `json:"value"`
}

A RowItem, or an array of row items, can be used as a map value in a Transformed output, for example from the Extractor.ExtractFromSink() function when providing data from a BigTable row.

type Sink

type Sink struct {
	Type   EntityType  `json:"type"`
	Config *SinkConfig `json:"config,omitempty"`
}

Sink spec

type SinkConfig

type SinkConfig struct {

	// Generic property container
	Properties []Property `json:"properties,omitempty"`

	// CustomConfig can be used by custom source/sink plugins for config options not explicitly provided by the Spec struct
	CustomConfig any `json:"customConfig,omitempty"`
}

type Source

type Source struct {
	Type   EntityType   `json:"type"`
	Config SourceConfig `json:"config"`
}

Source spec

type SourceConfig

type SourceConfig struct {

	// Properties is a generic property container
	Properties []Property `json:"properties,omitempty"`

	// CustomConfig can be used by custom source/sink plugins for config options not explicitly provided by the Spec struct
	CustomConfig any `json:"customConfig,omitempty"`
}

type Spec

type Spec struct {
	// Main metadata (required)
	Namespace      string `json:"namespace"`
	StreamIdSuffix string `json:"streamIdSuffix"`
	Description    string `json:"description"`
	Version        int    `json:"version"`

	// Operational config (optional)
	Disabled  bool           `json:"disabled"`
	Ops       Ops            `json:"ops"`
	OpsPerEnv map[string]Ops `json:"opsPerEnv,omitempty"`

	// Stream entity config (required)
	Source    Source    `json:"source"`
	Transform Transform `json:"transform"`
	Sink      Sink      `json:"sink"`
}

Spec implements the GEIST Stream Spec interface and specifies how each ETL stream should be executed from Source to Transform to Sink. Specs are registered and updated through a stream of its own, as specified by the configurable SpecRegistrationSpec. The Namespace + StreamIdSuffix combination must be unique (forming a GEIST Stream ID). To succeed with an upgrade of an existing spec the version number needs to be incremented.

func NewEmptySpec

func NewEmptySpec() *Spec

func NewSpec

func NewSpec(specData []byte) (*Spec, error)

NewSpec creates a new Spec from JSON and validates both against JSON schema and the transformation logic on the created spec.

func (*Spec) EnsureValidDefaults added in v0.5.2

func (s *Spec) EnsureValidDefaults()

func (*Spec) Id

func (s *Spec) Id() string

func (*Spec) IsDisabled

func (s *Spec) IsDisabled() bool

func (*Spec) JSON

func (s *Spec) JSON() []byte

func (*Spec) Validate

func (s *Spec) Validate() error

Stream spec JSON schema validation will be handled by NewSpec() using validateRawJson() against Geist spec json schema. This method enables more complex validation such as Regexp validation.

type TimeConv

type TimeConv struct {
	// Field where the data is located and should be converted.
	Field string `json:"field,omitempty"`

	// Input format of date to be converted. Mandatory.
	InputFormat string `json:"inputFormat,omitempty"`

	// Output format of date, if omitted, ISO-8601 is used.
	OutputFormat string `json:"outputFormat,omitempty"`
}

type Transform

type Transform struct {
	// TODO: Remove. Legacy construct.
	ImplId EntityType `json:"implId,omitempty"`

	// ExcludeEventsWith will be checked first to exclude events, matching conditions,
	// from all other transformations. If multiple filter objects are provided they are
	// handled as OR type of filters.
	ExcludeEventsWith []ExcludeEventsWith `json:"excludeEventsWith,omitempty"`

	// ExcludeEventsWithMultipleConditions will be checked to exclude events based on a
	// combination of conditions. If multiple items are provided they are handled as OR type of filters.
	ExcludeEventsWithMultipleConditions []ExcludeEventsWithMultipleConditions `json:"excludeEventsWithMultipleConditions,omitempty"`

	// The ExtractFields transformation type picks out fields from the input event JSON.
	// The first ExtractFields object that matches the ForEventsWith filter will be used
	// to create the resulting Transformed object.
	ExtractFields []ExtractFields `json:"extractFields,omitempty"`

	ExtractItemsFromArray []ExtractItemsFromArray `json:"extractItemsFromArray,omitempty"`

	// The Regexp transformation transforms a string into JSON based on the groupings in
	// the regular expression. At least one grouping is required.
	Regexp *Regexp `json:"regexp,omitempty"`

	// CustomConfig can be used by custom enrichment logic, executed via the Pre/PostTransformHookFuncs,
	// to enable arbitrary config schemas applicable for each client's enrichment needs.
	CustomConfig any `json:"customConfig,omitempty"`
}

Transform spec. In contrast to the other stream entities (Source/Sink Connectors), it is currently not possible to register new custom Transform entities via the GEIST API. This might change in a future release to simplify extension of custom transformation logic. For now, custom logic can still be achieved via the Transform hook functions, as configured in geist.Config.Hooks.

func (*Transform) Validate added in v0.5.2

func (t *Transform) Validate() (err error)

type Transformed

type Transformed struct {
	Data map[string]any `json:"data"`
}

func NewTransformed

func NewTransformed() *Transformed

func (*Transformed) String

func (t *Transformed) String() string

type TransformedItemMap

type TransformedItemMap map[string]any

TransformedItemMap is the type used for transforms creating a map of items to be stored in the output map. One example is the ExtractItemsFromArray transform, which extracts JSON array items into such a map and stores that map inside the output Transformed map array.

Directories

Path Synopsis
Package transform is the native/default implementation of a transform provider.