Documentation ¶
Index ¶
- Constants
- func ExtractFieldKey(config processor.Config) (processor.Interface, error)
- func ExtractFieldPayload(config processor.Config) (processor.Interface, error)
- func FilterFieldKey(config processor.Config) (processor.Interface, error)
- func FilterFieldPayload(config processor.Config) (processor.Interface, error)
- func HTTPRequest(config processor.Config) (processor.Interface, error)
- func HoistFieldKey(config processor.Config) (processor.Interface, error)
- func HoistFieldPayload(config processor.Config) (processor.Interface, error)
- func InsertFieldKey(config processor.Config) (processor.Interface, error)
- func InsertFieldPayload(config processor.Config) (processor.Interface, error)
- func MaskFieldKey(config processor.Config) (processor.Interface, error)
- func MaskFieldPayload(config processor.Config) (processor.Interface, error)
- func ParseJSONKey(config processor.Config) (processor.Interface, error)
- func ParseJSONPayload(config processor.Config) (processor.Interface, error)
- func ReplaceFieldKey(config processor.Config) (processor.Interface, error)
- func ReplaceFieldPayload(config processor.Config) (processor.Interface, error)
- func TimestampConvertorKey(config processor.Config) (processor.Interface, error)
- func TimestampConvertorPayload(config processor.Config) (processor.Interface, error)
- func Unwrap(config processor.Config) (processor.Interface, error)
- func ValueToKey(config processor.Config) (processor.Interface, error)
- type FuncWrapper
Constants ¶
const (
	FormatDebezium     = "debezium"
	FormatKafkaConnect = "kafka-connect"
)
Functions ¶
func ExtractFieldKey ¶
ExtractFieldKey builds the following processor:
- If the key is raw and has a schema attached, extract the field and use it to replace the entire key.
- If the key is raw and has no schema, return an error (not supported).
- If the key is structured, extract the field and use it to replace the entire key.
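The structured case above can be sketched as follows. This is a minimal illustration, not the package's implementation: `structuredData`, `extractField`, and `errFieldNotFound` are hypothetical names, and returning an error for a missing field is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// structuredData is a hypothetical stand-in for a structured record key.
type structuredData map[string]any

// errFieldNotFound is a hypothetical error; how the real processor reports
// a missing field is not stated in this documentation.
var errFieldNotFound = errors.New("field not found")

// extractField returns the value stored under field. The caller would use
// that value to replace the entire key.
func extractField(key structuredData, field string) (any, error) {
	v, ok := key[field]
	if !ok {
		return nil, fmt.Errorf("%w: %q", errFieldNotFound, field)
	}
	return v, nil
}

func main() {
	key := structuredData{"id": 42, "region": "eu"}
	newKey, err := extractField(key, "id")
	fmt.Println(newKey, err) // prints "42 <nil>"
}
```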
func ExtractFieldPayload ¶
ExtractFieldPayload builds the same processor as ExtractFieldKey, except that it operates on the field Record.Payload.After.
func FilterFieldKey ¶
FilterFieldKey builds a processor with the following config fields:
- `type` sets whether to "include" or "exclude" the record when `condition` matches.
- `condition` is a user-defined XPath query expression whose result decides whether the record is forwarded or dropped.
- `missingornull` defines how to handle the record when the fields the query relies on don't exist.
- `exists` lets the user define an XPath existence query for the filter.
If `condition` matches, the record is handled immediately as `type` dictates. If `condition` doesn't match and `exists` matches nothing, the record is handled as `missingornull` specifies.
Example processor config with noted possible values:
{
  "type": "include",                 // [include, exclude]
  "condition": "<xpath expression>",
  "exists": "<xpath expression>",
  "missingornull": "fail"            // [fail, include, exclude]
}
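The decision order described above can be sketched as a small function. This is an illustration under stated assumptions: `action`, `decide`, and `fromSetting` are hypothetical names, the XPath evaluation is replaced by two booleans, config validation is omitted, and the case where `condition` fails but `exists` matches something is left as "unspecified" because the text above does not describe it.

```go
package main

import "fmt"

// action is a hypothetical enumeration of what happens to a record.
type action int

const (
	actionForward     action = iota // record is passed on
	actionDrop                      // record is dropped
	actionFail                      // processing fails with an error
	actionUnspecified               // not described by the documentation
)

// fromSetting maps a config value to an action.
func fromSetting(s string) action {
	switch s {
	case "include":
		return actionForward
	case "exclude":
		return actionDrop
	case "fail":
		return actionFail
	default:
		return actionUnspecified
	}
}

// decide applies the documented order: a matching condition is handled as
// "type" dictates; otherwise, if "exists" matched nothing, "missingornull"
// decides.
func decide(typ string, conditionMatched, existsMatched bool, missingOrNull string) action {
	if conditionMatched {
		return fromSetting(typ)
	}
	if !existsMatched {
		return fromSetting(missingOrNull)
	}
	return actionUnspecified
}

func main() {
	// Condition matched: "type" decides.
	fmt.Println(decide("exclude", true, true, "fail") == actionDrop)
	// Condition did not match and "exists" matched nothing: "missingornull" decides.
	fmt.Println(decide("include", false, false, "fail") == actionFail)
}
```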
func FilterFieldPayload ¶
FilterFieldPayload builds the same processor as FilterFieldKey, except that it operates on the field Record.Payload.After.
func HTTPRequest ¶
HTTPRequest builds a processor that sends an HTTP request to the specified URL with the specified HTTP method (default is POST). Record.Payload.After is used as the request body, and the raw response body overwrites that field.
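The request flow can be sketched with the standard library alone. This is not the processor's code: `sendPayload`, `roundTripFunc`, and `echoClient` are hypothetical helpers, the URL is a placeholder, and the in-memory transport exists only so the example runs without a network.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// sendPayload sends body to url using method (POST when method is empty)
// and returns the raw response body, which would overwrite the payload.
func sendPayload(client *http.Client, method, url string, body []byte) ([]byte, error) {
	if method == "" {
		method = http.MethodPost
	}
	req, err := http.NewRequest(method, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

// roundTripFunc adapts a function to http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// echoClient returns a client whose transport answers in memory with
// "<method>:<request body>" instead of contacting a server.
func echoClient() *http.Client {
	return &http.Client{Transport: roundTripFunc(func(r *http.Request) (*http.Response, error) {
		in, err := io.ReadAll(r.Body)
		if err != nil {
			return nil, err
		}
		return &http.Response{
			StatusCode: http.StatusOK,
			Header:     make(http.Header),
			Body:       io.NopCloser(strings.NewReader(r.Method + ":" + string(in))),
			Request:    r,
		}, nil
	})}
}

func main() {
	out, err := sendPayload(echoClient(), "", "http://example.invalid/hook", []byte("hello"))
	fmt.Println(string(out), err) // prints "POST:hello <nil>"
}
```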
func HoistFieldKey ¶
HoistFieldKey builds the following processor:
- If the key is raw and has a schema attached, wrap it using the specified field name in a struct.
- If the key is raw and has no schema, transform it into structured data by creating a map with the hoisted field as the key and the raw data as the value.
- If the key is structured, wrap it using the specified field name in a map.
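The schema-less raw case and the structured case can be sketched like this. `hoistRaw` and `hoistStructured` are hypothetical names, and plain maps stand in for the record data types.

```go
package main

import "fmt"

// hoistRaw turns schema-less raw bytes into structured data by storing
// them under the hoisted field name.
func hoistRaw(raw []byte, field string) map[string]any {
	return map[string]any{field: raw}
}

// hoistStructured wraps already structured data in a new map under the
// hoisted field name.
func hoistStructured(key map[string]any, field string) map[string]any {
	return map[string]any{field: key}
}

func main() {
	fmt.Println(hoistStructured(map[string]any{"id": 1}, "key"))
	fmt.Println(string(hoistRaw([]byte("abc"), "key")["key"].([]byte)))
}
```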
func HoistFieldPayload ¶
HoistFieldPayload builds the same processor as HoistFieldKey, except that it operates on the field Record.Payload.After.
func InsertFieldKey ¶
InsertFieldKey builds the following processor:
- If the key is raw and has a schema attached, insert the field(s) in the key data.
- If the key is raw and has no schema, return an error (not supported).
- If the key is structured, set the field(s) in the key data.
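For the structured case, setting fields amounts to a map update. The sketch below assumes the fields to insert are given as a map of names to values, which this documentation does not specify. `insertFields` is a hypothetical name.

```go
package main

import "fmt"

// insertFields returns a copy of key with every entry of fields set on it.
// Existing entries with the same name are overwritten (an assumption).
func insertFields(key map[string]any, fields map[string]any) map[string]any {
	out := make(map[string]any, len(key)+len(fields))
	for k, v := range key {
		out[k] = v
	}
	for k, v := range fields {
		out[k] = v
	}
	return out
}

func main() {
	out := insertFields(map[string]any{"id": 1}, map[string]any{"source": "db"})
	fmt.Println(out["id"], out["source"]) // prints "1 db"
}
```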
func InsertFieldPayload ¶
InsertFieldPayload builds the same processor as InsertFieldKey, except that it operates on the field Record.Payload.After.
func MaskFieldKey ¶
MaskFieldKey builds the following processor:
- If the key is raw and has a schema attached, replace the field with the zero value of the field's type.
- If the key is raw and has no schema, return an error (not supported).
- If the key is structured, replace the field with the zero value of the field's type.
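Replacing a field with the zero value of its type can be sketched with the `reflect` package. `maskField` is a hypothetical helper operating on a plain map, and the error for a missing field is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// maskField returns a copy of data in which field holds the zero value of
// its current type ("" for strings, 0 for numbers, false for booleans).
func maskField(data map[string]any, field string) (map[string]any, error) {
	v, ok := data[field]
	if !ok {
		return nil, fmt.Errorf("field %q not found", field)
	}
	out := make(map[string]any, len(data))
	for k, val := range data {
		out[k] = val
	}
	if v == nil {
		// A nil value has no type to derive a zero value from; keep it nil.
		return out, nil
	}
	out[field] = reflect.Zero(reflect.TypeOf(v)).Interface()
	return out, nil
}

func main() {
	out, err := maskField(map[string]any{"name": "alice", "age": 30}, "name")
	fmt.Printf("%q %v %v\n", out["name"], out["age"], err) // prints `"" 30 <nil>`
}
```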
func MaskFieldPayload ¶
MaskFieldPayload builds the same processor as MaskFieldKey, except that it operates on the field Record.Payload.After.
func ParseJSONKey ¶ added in v0.5.0
ParseJSONKey builds a processor that parses the record key from raw to structured data.
func ParseJSONPayload ¶ added in v0.5.0
ParseJSONPayload builds a processor that parses the record payload from raw to structured data.
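The raw-to-structured conversion can be sketched with `encoding/json`. `parseJSON` is a hypothetical helper, and it assumes the raw data is a JSON object. How the real processors treat other JSON values is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseJSON decodes raw JSON bytes into a map, the structured form used in
// this sketch. Numbers decode to float64, as encoding/json does by default.
func parseJSON(raw []byte) (map[string]any, error) {
	var out map[string]any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, fmt.Errorf("parsing JSON: %w", err)
	}
	return out, nil
}

func main() {
	data, err := parseJSON([]byte(`{"id": 1, "name": "a"}`))
	fmt.Println(data["id"], data["name"], err) // prints "1 a <nil>"
}
```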
func ReplaceFieldKey ¶
ReplaceFieldKey builds a processor that replaces a field in the key, either in raw data with a schema or in structured data. Raw data without a schema is not supported. The processor is controlled by three variables:
- "exclude" is a comma-separated list of fields that should be excluded from the processed record ("exclude" takes precedence over "include").
- "include" is a comma-separated list of fields that should be included in the processed record.
- "rename" is a comma-separated list of colon-separated pairs that maps old field names to new field names.
If "include" is not configured or is empty, all fields in the record are included by default (except those configured in "exclude"). If "include" is not empty, all fields are excluded by default and only the fields in "include" are added to the processed record.
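The interplay of the three variables can be sketched on a plain map. This is an illustration, not the package's code: `splitList`, `parseRename`, and `replaceFields` are hypothetical helpers, each rename pair is assumed to be written as `old:new`, and "include" and "exclude" are assumed to refer to the old field names.

```go
package main

import (
	"fmt"
	"strings"
)

// splitList parses a comma-separated list into a set, skipping empty items.
func splitList(s string) map[string]bool {
	set := map[string]bool{}
	for _, item := range strings.Split(s, ",") {
		if item = strings.TrimSpace(item); item != "" {
			set[item] = true
		}
	}
	return set
}

// parseRename parses comma-separated "old:new" pairs into a map.
func parseRename(s string) (map[string]string, error) {
	ren := map[string]string{}
	for pair := range splitList(s) {
		from, to, ok := strings.Cut(pair, ":")
		if !ok || from == "" || to == "" {
			return nil, fmt.Errorf("invalid rename pair %q", pair)
		}
		ren[from] = to
	}
	return ren, nil
}

// replaceFields applies exclude, include, and rename to data.
func replaceFields(data map[string]any, exclude, include, rename string) (map[string]any, error) {
	ex, in := splitList(exclude), splitList(include)
	ren, err := parseRename(rename)
	if err != nil {
		return nil, err
	}
	out := map[string]any{}
	for name, v := range data {
		if ex[name] {
			continue // "exclude" takes precedence over "include"
		}
		if len(in) > 0 && !in[name] {
			continue // non-empty "include": only listed fields are kept
		}
		if newName, ok := ren[name]; ok {
			name = newName
		}
		out[name] = v
	}
	return out, nil
}

func main() {
	data := map[string]any{"a": 1, "b": 2, "c": 3}
	out, err := replaceFields(data, "b", "", "c:z")
	fmt.Println(out["a"], out["z"], len(out), err) // prints "1 3 2 <nil>"
}
```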
func ReplaceFieldPayload ¶
ReplaceFieldPayload builds the same processor as ReplaceFieldKey, except that it operates on the field Record.Payload.After.
func TimestampConvertorKey ¶
TimestampConvertorKey todo
func TimestampConvertorPayload ¶
TimestampConvertorPayload todo
func ValueToKey ¶
ValueToKey builds a processor that replaces the record key with a new key formed from a subset of fields in the record value.
- If Payload.After is raw and has a schema attached, the created key will also have a schema with a subset of fields.
- If Payload.After is structured, the created key will also be structured with a subset of fields.
- If Payload.After is raw and has no schema, return an error.
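For the structured case, building the new key is a field projection. The sketch below uses a hypothetical `pickFields` helper on plain maps and assumes a missing field is an error, which this documentation does not state.

```go
package main

import "fmt"

// pickFields builds a new structured key from the named subset of fields
// in the record value.
func pickFields(value map[string]any, fields []string) (map[string]any, error) {
	key := make(map[string]any, len(fields))
	for _, name := range fields {
		v, ok := value[name]
		if !ok {
			return nil, fmt.Errorf("field %q not found in value", name)
		}
		key[name] = v
	}
	return key, nil
}

func main() {
	value := map[string]any{"id": 7, "name": "a", "email": "a@example.com"}
	key, err := pickFields(value, []string{"id", "name"})
	fmt.Println(key["id"], key["name"], len(key), err) // prints "7 a 2 <nil>"
}
```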
Types ¶
type FuncWrapper ¶ added in v0.5.0
type FuncWrapper struct {
// contains filtered or unexported fields
}
FuncWrapper is an adapter allowing use of a function as an Interface.
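The adapter idea resembles `http.HandlerFunc` in the standard library: a function is wrapped in a type that satisfies an interface. The sketch below is illustrative only. `processorIface`, `funcWrapper`, `newFuncWrapper`, `upper`, and `run` are hypothetical, and the real Interface and NewFuncWrapper signatures are not shown in this documentation, so a simplified string-based signature is assumed.

```go
package main

import (
	"fmt"
	"strings"
)

// processorIface is a hypothetical, simplified processor interface.
type processorIface interface {
	Process(record string) (string, error)
}

// funcWrapper adapts a plain function to processorIface.
type funcWrapper struct {
	f func(string) (string, error)
}

// newFuncWrapper wraps f so it can be used wherever a processorIface is expected.
func newFuncWrapper(f func(string) (string, error)) funcWrapper {
	return funcWrapper{f: f}
}

// Process delegates to the wrapped function.
func (w funcWrapper) Process(record string) (string, error) {
	return w.f(record)
}

// upper is an example processing function.
func upper(record string) (string, error) {
	return strings.ToUpper(record), nil
}

// run feeds every record through p and collects the results.
func run(p processorIface, records []string) ([]string, error) {
	out := make([]string, 0, len(records))
	for _, r := range records {
		res, err := p.Process(r)
		if err != nil {
			return nil, err
		}
		out = append(out, res)
	}
	return out, nil
}

func main() {
	out, err := run(newFuncWrapper(upper), []string{"a", "b"})
	fmt.Println(out, err) // prints "[A B] <nil>"
}
```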
func NewFuncWrapper ¶ added in v0.5.0
func (FuncWrapper) Close ¶ added in v0.6.0
func (f FuncWrapper) Close()
func (FuncWrapper) InspectIn ¶ added in v0.5.0
func (f FuncWrapper) InspectIn(ctx context.Context) *inspector.Session
func (FuncWrapper) InspectOut ¶ added in v0.5.0
func (f FuncWrapper) InspectOut(ctx context.Context) *inspector.Session