procbuiltin

package
v0.9.0-nightly.20240206
Published: Feb 5, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Constants

const (
	FormatDebezium     = "debezium"
	FormatKafkaConnect = "kafka-connect"
	FormatOpenCDC      = "opencdc"
)

Variables

This section is empty.

Functions

func DecodeWithSchemaKey added in v0.7.0

func DecodeWithSchemaKey(config processor.Config) (processor.Interface, error)

DecodeWithSchemaKey builds a processor with the following config fields:

  • `url` (Required) - URL of the schema registry (e.g. http://localhost:8085)
  • `auth.basic.username` (Optional) - Configures the username to use with basic authentication. This option is required if `auth.basic.password` contains a value. If both `auth.basic.username` and `auth.basic.password` are empty, basic authentication is disabled.
  • `auth.basic.password` (Optional) - Configures the password to use with basic authentication. This option is required if `auth.basic.username` contains a value. If both `auth.basic.username` and `auth.basic.password` are empty, basic authentication is disabled.
  • `tls.ca.cert` (Optional) - Path to a file containing PEM encoded CA certificates. If this option is empty, Conduit falls back to using the host's root CA set.
  • `tls.client.cert` (Optional) - Path to a file containing a PEM encoded certificate. This option is required if `tls.client.key` contains a value. If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.
  • `tls.client.key` (Optional) - Path to a file containing a PEM encoded private key. This option is required if `tls.client.cert` contains a value. If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.

The processor takes raw data (bytes) and decodes it from the Confluent wire format into structured data. It extracts the schema ID from the data, downloads the associated schema from the schema registry and decodes the payload. The schema is cached locally after it's first downloaded. Currently, the processor only supports the Avro format. If the processor encounters structured data or the data can't be decoded it returns an error.

More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html
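The step "extracts the schema ID from the data" can be sketched as follows. In the Confluent wire format a message starts with a magic byte (0), followed by a 4-byte big-endian schema ID, followed by the encoded payload. The helper name `splitWireFormat` is hypothetical and not part of this package; the sketch skips the registry lookup and the Avro decoding.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// splitWireFormat is a hypothetical helper (not part of procbuiltin).
// It splits a Confluent wire format message into the schema ID and the
// still-encoded payload.
func splitWireFormat(raw []byte) (schemaID uint32, payload []byte, err error) {
	// Layout: 1 magic byte (0x00), 4-byte big-endian schema ID, payload.
	if len(raw) < 5 {
		return 0, nil, errors.New("message is shorter than the 5-byte header")
	}
	if raw[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte %#x", raw[0])
	}
	return binary.BigEndian.Uint32(raw[1:5]), raw[5:], nil
}

func main() {
	id, payload, err := splitWireFormat([]byte{0, 0, 0, 0, 7, 0x02, 0x61})
	fmt.Println(id, payload, err) // 7 [2 97] <nil>
}
```

The schema ID returned here is what a decoder would use to look up (and then cache) the schema in the registry.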

func DecodeWithSchemaPayload added in v0.7.0

func DecodeWithSchemaPayload(config processor.Config) (processor.Interface, error)

DecodeWithSchemaPayload builds the same processor as DecodeWithSchemaKey, except that it operates on the field Record.Payload.After.

func EncodeWithSchemaKey added in v0.7.0

func EncodeWithSchemaKey(config processor.Config) (processor.Interface, error)

EncodeWithSchemaKey builds a processor with the following config fields:

  • `url` (Required) - URL of the schema registry (e.g. http://localhost:8085)
  • `schema.strategy` (Required, Enum: `preRegistered`, `autoRegister`) - Specifies which strategy to use to determine the schema for the record. Available strategies:
      • `preRegistered` (recommended) - Download an existing schema from the schema registry. This strategy is further configured with options starting with `schema.preRegistered.*`.
      • `autoRegister` (for development purposes) - Infer the schema from the record and register it in the schema registry. This strategy is further configured with options starting with `schema.autoRegister.*`.
  • `schema.preRegistered.subject` (Required if `schema.strategy` = `preRegistered`) - Specifies the subject of the schema in the schema registry used to encode the record.
  • `schema.preRegistered.version` (Required if `schema.strategy` = `preRegistered`) - Specifies the version of the schema in the schema registry used to encode the record.
  • `schema.autoRegister.subject` (Required if `schema.strategy` = `autoRegister`) - Specifies the subject name under which the inferred schema will be registered in the schema registry.
  • `schema.autoRegister.format` (Required if `schema.strategy` = `autoRegister`, Enum: `avro`) - Specifies the schema format that should be inferred. Currently the only supported format is `avro`.
  • `auth.basic.username` (Optional) - Configures the username to use with basic authentication. This option is required if `auth.basic.password` contains a value. If both `auth.basic.username` and `auth.basic.password` are empty, basic authentication is disabled.
  • `auth.basic.password` (Optional) - Configures the password to use with basic authentication. This option is required if `auth.basic.username` contains a value. If both `auth.basic.username` and `auth.basic.password` are empty, basic authentication is disabled.
  • `tls.ca.cert` (Optional) - Path to a file containing PEM encoded CA certificates. If this option is empty, Conduit falls back to using the host's root CA set.
  • `tls.client.cert` (Optional) - Path to a file containing a PEM encoded certificate. This option is required if `tls.client.key` contains a value. If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.
  • `tls.client.key` (Optional) - Path to a file containing a PEM encoded private key. This option is required if `tls.client.cert` contains a value. If both `tls.client.cert` and `tls.client.key` are empty, TLS is disabled.

The processor takes structured data and encodes it using a schema into the Confluent wire format. It provides two strategies for determining the schema:

  • `preRegistered` (recommended)

    This strategy downloads an existing schema from the schema registry and uses it to encode the record. This requires the schema to already be registered in the schema registry. The schema is downloaded only once and cached locally.

  • `autoRegister` (for development purposes)

    This strategy infers the schema by inspecting the structured data and registers it in the schema registry. If the record schema is known in advance it's recommended to use the `preRegistered` strategy and manually register the schema, as this strategy comes with limitations.

    The strategy uses reflection to traverse the structured data of each record and determine the type of each field. If a specific field is set to `nil` the processor won't have enough information to determine the type and will default to a nullable string. Because of this it is not guaranteed that two records with the same structure produce the same schema or even a backwards compatible schema. The processor registers each inferred schema in the schema registry with the same subject, therefore the schema compatibility checks need to be disabled for this schema to prevent failures. If the schema subject does not exist before running this processor, it will automatically set the correct compatibility settings the first time it registers the schema.

The processor currently only supports the Avro format.

More info about the Confluent wire format: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

More info about the Confluent schema registry: https://docs.confluent.io/platform/current/schema-registry/index.html
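The "required if" rules in the config list above can be sketched as a small validation function. This is an illustration only: `validateEncodeSettings` is a hypothetical name, the settings are assumed to arrive as a plain `map[string]string`, and the package's own validation may differ in order and error messages.

```go
package main

import (
	"errors"
	"fmt"
)

// validateEncodeSettings is a hypothetical check that mirrors the documented
// rules; it is not the validation code used by procbuiltin.
func validateEncodeSettings(s map[string]string) error {
	if s["url"] == "" {
		return errors.New("url is required")
	}
	switch s["schema.strategy"] {
	case "preRegistered":
		if s["schema.preRegistered.subject"] == "" || s["schema.preRegistered.version"] == "" {
			return errors.New("preRegistered requires schema.preRegistered.subject and schema.preRegistered.version")
		}
	case "autoRegister":
		if s["schema.autoRegister.subject"] == "" {
			return errors.New("autoRegister requires schema.autoRegister.subject")
		}
		if s["schema.autoRegister.format"] != "avro" {
			return errors.New("schema.autoRegister.format must be avro")
		}
	default:
		return fmt.Errorf("unknown schema.strategy %q", s["schema.strategy"])
	}
	// Username and password must be set together or both left empty.
	if (s["auth.basic.username"] == "") != (s["auth.basic.password"] == "") {
		return errors.New("auth.basic.username and auth.basic.password must be set together")
	}
	// The same pairing rule applies to the client certificate and key.
	if (s["tls.client.cert"] == "") != (s["tls.client.key"] == "") {
		return errors.New("tls.client.cert and tls.client.key must be set together")
	}
	return nil
}

func main() {
	settings := map[string]string{
		"url":                          "http://localhost:8085",
		"schema.strategy":              "preRegistered",
		"schema.preRegistered.subject": "orders", // example subject, made up
		"schema.preRegistered.version": "1",
	}
	fmt.Println(validateEncodeSettings(settings)) // <nil>
}
```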

func EncodeWithSchemaPayload added in v0.7.0

func EncodeWithSchemaPayload(config processor.Config) (processor.Interface, error)

EncodeWithSchemaPayload builds the same processor as EncodeWithSchemaKey, except that it operates on the field Record.Payload.After.

func ExtractFieldKey

func ExtractFieldKey(config processor.Config) (processor.Interface, error)

ExtractFieldKey builds the following processor:

  • If the key is raw, return an error (not supported yet).
  • If the key is structured, extract the field and use it to replace the entire key.
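For a structured key, the extraction described above amounts to looking up one field and using its value as the new key. A minimal sketch, assuming structured data is represented as `map[string]any`; the function name and the error on a missing field are assumptions, not the package's documented behavior.

```go
package main

import "fmt"

// extractField is a hypothetical helper: it returns the value of one field
// so that the caller can use it to replace the whole key.
func extractField(key map[string]any, field string) (any, error) {
	v, ok := key[field]
	if !ok {
		// Assumption: a missing field is treated as an error.
		return nil, fmt.Errorf("field %q not found", field)
	}
	return v, nil
}

func main() {
	key := map[string]any{"id": 42, "region": "eu"}
	newKey, err := extractField(key, "id")
	fmt.Println(newKey, err) // 42 <nil>
}
```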

func ExtractFieldPayload

func ExtractFieldPayload(config processor.Config) (processor.Interface, error)

ExtractFieldPayload builds the same processor as ExtractFieldKey, except that it operates on the field Record.Payload.After.

func FilterFieldKey

func FilterFieldKey(config processor.Config) (processor.Interface, error)

FilterFieldKey builds a processor with the following config fields:

  • `type` - whether to "include" or "exclude" the record when `condition` matches.
  • `condition` - an XPath query expression whose result decides whether the record is forwarded or dropped.
  • `missingornull` - how to handle the record when the fields used by the query don't exist.
  • `exists` - an XPath existence query for the filter.

If `condition` matches, the processor immediately handles the record as `type` dictates. If `condition` doesn't match and `exists` matches nothing, it handles the record as `missingornull` specifies.

Example processor config with noted possible values:

  {
    "type": "include", // [include, exclude]
    "condition": "<xpath expression>",
    "exists": "<xpath expression>",
    "missingornull": "fail" // [fail, include, exclude]
  }
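The decision rules described above can be sketched as a pure function over the two query results. All names here are hypothetical. The text does not say what happens when `condition` does not match but `exists` does match, so the sketch reports that case with a sentinel error instead of guessing.

```go
package main

import (
	"errors"
	"fmt"
)

type action string

const (
	forward action = "forward" // record is kept
	drop    action = "drop"    // record is filtered out
)

// errUndocumented marks the case the documentation does not describe.
var errUndocumented = errors.New("case not covered by the documentation")

// byMode maps "include"/"exclude" to an action.
func byMode(mode string) (action, error) {
	switch mode {
	case "include":
		return forward, nil
	case "exclude":
		return drop, nil
	}
	return "", fmt.Errorf("invalid value %q", mode)
}

// decide is a hypothetical sketch of the documented rules only.
func decide(typ, missingOrNull string, conditionMatched, existsMatched bool) (action, error) {
	if conditionMatched {
		return byMode(typ)
	}
	if !existsMatched {
		if missingOrNull == "fail" {
			return "", errors.New("queried fields are missing or null")
		}
		return byMode(missingOrNull)
	}
	return "", errUndocumented
}

func main() {
	fmt.Println(decide("include", "fail", true, false))     // forward <nil>
	fmt.Println(decide("include", "exclude", false, false)) // drop <nil>
}
```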

func FilterFieldPayload

func FilterFieldPayload(config processor.Config) (processor.Interface, error)

FilterFieldPayload builds the same processor as FilterFieldKey, except that it operates on the field Record.Payload.After.

func HTTPRequest

func HTTPRequest(config processor.Config) (processor.Interface, error)

HTTPRequest builds a processor that sends an HTTP request to the specified URL using the specified HTTP method (default is POST) and Content-Type header (default is application/json). The whole record, encoded as JSON, is used as the request body, and the raw response body is stored in Record.Payload.After. If the response code is 204 (No Content), the record is filtered out.

func HoistFieldKey

func HoistFieldKey(config processor.Config) (processor.Interface, error)

HoistFieldKey builds the following processor:

  • If the key is raw and has a schema attached, return an error (not supported yet).
  • If the key is raw and has no schema, transforms it into structured data by creating a map with the hoisted field and raw data as the value.
  • If the key is structured, wrap it using the specified field name in a map.
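Both supported cases reduce to wrapping the existing key in a single-entry map. A minimal sketch, assuming raw data is a `[]byte` and structured data is a `map[string]any`; `hoist` is a hypothetical name and the schema check for raw keys is left out.

```go
package main

import "fmt"

// hoist is a hypothetical helper: it nests the current key under field.
// It works the same way for raw bytes and for structured data.
func hoist(key any, field string) map[string]any {
	return map[string]any{field: key}
}

func main() {
	fmt.Println(hoist([]byte("abc"), "id"))              // raw key without schema
	fmt.Println(hoist(map[string]any{"a": 1}, "wrapper")) // structured key
}
```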

func HoistFieldPayload

func HoistFieldPayload(config processor.Config) (processor.Interface, error)

HoistFieldPayload builds the same processor as HoistFieldKey, except that it operates on the field Record.Payload.After.

func InsertFieldKey

func InsertFieldKey(config processor.Config) (processor.Interface, error)

InsertFieldKey builds the following processor:

  • If the key is raw, return an error (not supported yet).
  • If the key is structured, set the field(s) in the key data.

func InsertFieldPayload

func InsertFieldPayload(config processor.Config) (processor.Interface, error)

InsertFieldPayload builds the same processor as InsertFieldKey, except that it operates on the field Record.Payload.After.

func MaskFieldKey

func MaskFieldKey(config processor.Config) (processor.Interface, error)

MaskFieldKey builds the following processor:

  • If the key is raw, return an error (not supported yet).
  • If the key is structured, replace the field with the zero value of the field's type.
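"Zero value of the field's type" can be illustrated with the `reflect` package. This is a sketch under assumptions: structured data is a `map[string]any`, `maskField` is a hypothetical name, and the handling of missing or nil fields is a guess rather than documented behavior.

```go
package main

import (
	"fmt"
	"reflect"
)

// maskField is a hypothetical helper that overwrites one field with the zero
// value of its dynamic type ("" for strings, 0 for numbers, and so on).
func maskField(key map[string]any, field string) error {
	v, ok := key[field]
	if !ok {
		// Assumption: a missing field is an error.
		return fmt.Errorf("field %q not found", field)
	}
	if v == nil {
		return nil // nil carries no type, so there is nothing to zero
	}
	key[field] = reflect.Zero(reflect.TypeOf(v)).Interface()
	return nil
}

func main() {
	key := map[string]any{"name": "alice", "age": 42}
	_ = maskField(key, "name")
	_ = maskField(key, "age")
	fmt.Printf("%q %v\n", key["name"], key["age"]) // "" 0
}
```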

func MaskFieldPayload

func MaskFieldPayload(config processor.Config) (processor.Interface, error)

MaskFieldPayload builds the same processor as MaskFieldKey, except that it operates on the field Record.Payload.After.

func ParseJSONKey added in v0.5.0

func ParseJSONKey(_ processor.Config) (processor.Interface, error)

ParseJSONKey parses the record key from raw to structured data.

func ParseJSONPayload added in v0.5.0

func ParseJSONPayload(_ processor.Config) (processor.Interface, error)

ParseJSONPayload parses the record payload from raw to structured data.
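For both ParseJSONKey and ParseJSONPayload, the conversion from raw to structured data can be pictured as a JSON unmarshal into a map. The sketch assumes the raw bytes hold a JSON object and that structured data is a `map[string]any`; `parseJSON` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseJSON is a hypothetical helper that turns raw JSON bytes into
// structured data. Non-object JSON and invalid JSON produce an error.
func parseJSON(raw []byte) (map[string]any, error) {
	var out map[string]any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	data, err := parseJSON([]byte(`{"id": 1, "name": "alice"}`))
	fmt.Println(data, err) // map[id:1 name:alice] <nil>
}
```

Note that `encoding/json` decodes JSON numbers into `float64` when the target is `any`.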

func ReplaceFieldKey

func ReplaceFieldKey(config processor.Config) (processor.Interface, error)

ReplaceFieldKey builds a processor which replaces a field in a structured key. Raw data is not supported. The processor is controlled by three config fields:

  • "exclude" - a comma-separated list of fields that should be excluded from the processed record ("exclude" takes precedence over "include").
  • "include" - a comma-separated list of fields that should be included in the processed record.
  • "rename" - a comma-separated list of colon-separated pairs that maps old field names to new field names.

If "include" is not configured or is empty then all fields in the record will be included by default (except if they are configured in "exclude"). If "include" is not empty, then all fields are excluded by default and only fields in "include" will be added to the processed record.
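The interplay of the three options can be sketched as follows. The names are hypothetical, structured data is assumed to be a `map[string]any`, and the sketch assumes that "exclude" and "include" refer to the old field names, with renaming applied last.

```go
package main

import (
	"fmt"
	"strings"
)

// splitList splits a comma-separated option; an empty option yields no items.
func splitList(s string) []string {
	if s == "" {
		return nil
	}
	return strings.Split(s, ",")
}

// replaceFields is a hypothetical sketch of the exclude/include/rename rules.
func replaceFields(data map[string]any, exclude, include, rename string) map[string]any {
	excluded := map[string]bool{}
	for _, f := range splitList(exclude) {
		excluded[f] = true
	}
	included := map[string]bool{}
	for _, f := range splitList(include) {
		included[f] = true
	}
	renames := map[string]string{}
	for _, pair := range splitList(rename) {
		if oldName, newName, ok := strings.Cut(pair, ":"); ok {
			renames[oldName] = newName
		}
	}

	out := make(map[string]any, len(data))
	for name, v := range data {
		if excluded[name] {
			continue // "exclude" takes precedence over "include"
		}
		if len(included) > 0 && !included[name] {
			continue // a non-empty "include" acts as an allow list
		}
		if n, ok := renames[name]; ok {
			name = n
		}
		out[name] = v
	}
	return out
}

func main() {
	data := map[string]any{"a": 1, "b": 2, "c": 3}
	fmt.Println(replaceFields(data, "b", "", "c:z")) // map[a:1 z:3]
}
```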

func ReplaceFieldPayload

func ReplaceFieldPayload(config processor.Config) (processor.Interface, error)

ReplaceFieldPayload builds the same processor as ReplaceFieldKey, except that it operates on the field Record.Payload.After.

func TimestampConverterKey added in v0.6.0

func TimestampConverterKey(config processor.Config) (processor.Interface, error)

TimestampConverterKey builds a processor which converts a timestamp in a field in the key into a different type. The supported types are:

  • "string"
  • "unix"
  • "time.Time"

Any combination of the supported types is possible. For example, it's possible to convert from a Unix timestamp to Go's time.Time or to convert from a string to a Unix timestamp.

The processor supports only structured data.

func TimestampConverterPayload added in v0.6.0

func TimestampConverterPayload(config processor.Config) (processor.Interface, error)

TimestampConverterPayload builds the same processor as TimestampConverterKey, except that it operates on the field Record.Payload.After.

func Unwrap added in v0.5.0

func Unwrap(config processor.Config) (processor.Interface, error)

func ValueToKey

func ValueToKey(config processor.Config) (processor.Interface, error)

ValueToKey builds a processor that replaces the record key with a new key formed from a subset of fields in the record value.

  • If Payload.After is structured, the created key will also be structured with a subset of fields.
  • If Payload.After is raw, return an error (not supported yet).
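Building a structured key from a subset of payload fields can be sketched as a simple map projection. The function name, the `fields` slice, and the error on a missing field are assumptions made for illustration.

```go
package main

import "fmt"

// valueToKey is a hypothetical helper that copies the listed fields from the
// structured payload into a new structured key.
func valueToKey(after map[string]any, fields []string) (map[string]any, error) {
	key := make(map[string]any, len(fields))
	for _, f := range fields {
		v, ok := after[f]
		if !ok {
			// Assumption: a missing field is an error.
			return nil, fmt.Errorf("field %q not found in payload", f)
		}
		key[f] = v
	}
	return key, nil
}

func main() {
	after := map[string]any{"id": 7, "tenant": "acme", "total": 12.5}
	key, err := valueToKey(after, []string{"id", "tenant"})
	fmt.Println(key, err) // map[id:7 tenant:acme] <nil>
}
```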

Types

type FuncWrapper added in v0.5.0

type FuncWrapper struct {
	// contains filtered or unexported fields
}

FuncWrapper is an adapter allowing use of a function as an Interface.

func NewFuncWrapper added in v0.5.0

func NewFuncWrapper(f func(context.Context, record.Record) (record.Record, error)) FuncWrapper

func (FuncWrapper) Close added in v0.6.0

func (f FuncWrapper) Close()

func (FuncWrapper) InspectIn added in v0.5.0

func (f FuncWrapper) InspectIn(ctx context.Context, id string) *inspector.Session

func (FuncWrapper) InspectOut added in v0.5.0

func (f FuncWrapper) InspectOut(ctx context.Context, id string) *inspector.Session

func (FuncWrapper) Process added in v0.5.0

func (f FuncWrapper) Process(ctx context.Context, inRec record.Record) (record.Record, error)
