Documentation ¶
Index ¶
- Constants
- Variables
- func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context
- type Change
- type Data
- type JSONMarshalOptions
- type JSONSerializer
- type Metadata
- func (m Metadata) GetCollection() (string, error)
- func (m Metadata) GetConduitDLQNackError() (string, error)
- func (m Metadata) GetConduitDLQNackNodeID() (string, error)
- func (m Metadata) GetConduitDestinationPluginName() (string, error)
- func (m Metadata) GetConduitDestinationPluginVersion() (string, error)
- func (m Metadata) GetConduitSourceConnectorID() (string, error)
- func (m Metadata) GetConduitSourcePluginName() (string, error)
- func (m Metadata) GetConduitSourcePluginVersion() (string, error)
- func (m Metadata) GetCreatedAt() (time.Time, error)
- func (m Metadata) GetKeySchemaSubject() (string, error)
- func (m Metadata) GetKeySchemaVersion() (int, error)
- func (m Metadata) GetOpenCDCVersion() (string, error)
- func (m Metadata) GetPayloadSchemaSubject() (string, error)
- func (m Metadata) GetPayloadSchemaVersion() (int, error)
- func (m Metadata) GetReadAt() (time.Time, error)
- func (m Metadata) SetCollection(collection string)
- func (m Metadata) SetConduitDLQNackError(err string)
- func (m Metadata) SetConduitDLQNackNodeID(id string)
- func (m Metadata) SetConduitDestinationPluginName(name string)
- func (m Metadata) SetConduitDestinationPluginVersion(version string)
- func (m Metadata) SetConduitSourceConnectorID(id string)
- func (m Metadata) SetConduitSourcePluginName(name string)
- func (m Metadata) SetConduitSourcePluginVersion(version string)
- func (m Metadata) SetCreatedAt(createdAt time.Time)
- func (m Metadata) SetKeySchemaSubject(subject string)
- func (m Metadata) SetKeySchemaVersion(version int)
- func (m Metadata) SetOpenCDCVersion()
- func (m Metadata) SetPayloadSchemaSubject(subject string)
- func (m Metadata) SetPayloadSchemaVersion(version int)
- func (m Metadata) SetReadAt(createdAt time.Time)
- type Operation
- type Position
- type RawData
- type Record
- func (r Record) Bytes() []byte
- func (r Record) Clone() Record
- func (r *Record) FromProto(proto *opencdcv1.Record) error
- func (r Record) Map() map[string]interface{}
- func (r *Record) SetSerializer(serializer RecordSerializer)
- func (r Record) ToProto(proto *opencdcv1.Record) error
- func (r *Record) UnmarshalJSON(b []byte) error
- type RecordSerializer
- type StructuredData
Constants ¶
const ( // OpenCDCVersion is a constant that should be used as the value in the // metadata field MetadataVersion. It ensures the OpenCDC format version can // be easily identified in case the record gets marshaled into a different // untyped format (e.g. JSON). OpenCDCVersion = "v1" // MetadataOpenCDCVersion is a Record.Metadata key for the version of the // OpenCDC format (e.g. "v1"). This field exists to ensure the OpenCDC // format version can be easily identified in case the record gets marshaled // into a different untyped format (e.g. JSON). MetadataOpenCDCVersion = "opencdc.version" // MetadataCreatedAt is a Record.Metadata key for the time when the record // was created in the 3rd party system. The expected format is a unix // timestamp in nanoseconds. MetadataCreatedAt = "opencdc.createdAt" // MetadataReadAt is a Record.Metadata key for the time when the record was // read from the 3rd party system. The expected format is a unix timestamp // in nanoseconds. MetadataReadAt = "opencdc.readAt" // MetadataCollection is a Record.Metadata key for the name of the collection // where the record originated from and/or where it should be stored. MetadataCollection = "opencdc.collection" // MetadataKeySchemaSubject is a Record.Metadata key for the subject of the schema of // the record's .Key field. MetadataKeySchemaSubject = "opencdc.key.schema.subject" // MetadataKeySchemaVersion is a Record.Metadata key for the version of the schema of // the record's .Key field. MetadataKeySchemaVersion = "opencdc.key.schema.version" // MetadataPayloadSchemaSubject is a Record.Metadata key for the subject of the schema of // the record's .Payload field. MetadataPayloadSchemaSubject = "opencdc.payload.schema.subject" // MetadataPayloadSchemaVersion is a Record.Metadata key for the version of the schema of // the record's .Payload field. MetadataPayloadSchemaVersion = "opencdc.payload.schema.version" // MetadataConduitSourcePluginName is a Record.Metadata key for the name of // the source plugin that created this record. MetadataConduitSourcePluginName = "conduit.source.plugin.name" // MetadataConduitSourcePluginVersion is a Record.Metadata key for the // version of the source plugin that created this record. MetadataConduitSourcePluginVersion = "conduit.source.plugin.version" // MetadataConduitDestinationPluginName is a Record.Metadata key for the // name of the destination plugin that has written this record // (only available in records once they are written by a destination). MetadataConduitDestinationPluginName = "conduit.destination.plugin.name" // MetadataConduitDestinationPluginVersion is a Record.Metadata key for the // version of the destination plugin that has written this record // (only available in records once they are written by a destination). MetadataConduitDestinationPluginVersion = "conduit.destination.plugin.version" // MetadataConduitSourceConnectorID is a Record.Metadata key for the ID of // the source connector that produced this record. MetadataConduitSourceConnectorID = "conduit.source.connector.id" // MetadataConduitDLQNackError is a Record.Metadata key for the error that // caused a record to be nacked and pushed to the dead-letter queue. MetadataConduitDLQNackError = "conduit.dlq.nack.error" // MetadataConduitDLQNackNodeID is a Record.Metadata key for the ID of the // internal node that nacked the record. MetadataConduitDLQNackNodeID = "conduit.dlq.nack.node.id" )
Variables ¶
var ( // ErrMetadataFieldNotFound is returned in metadata utility functions when a // metadata field is not found. ErrMetadataFieldNotFound = errors.New("metadata field not found") // ErrUnknownOperation is returned when trying to parse an Operation string // and encountering an unknown operation. ErrUnknownOperation = errors.New("unknown operation") // ErrInvalidProtoDataType is returned when trying to convert a proto data // type to raw or structured data. ErrInvalidProtoDataType = errors.New("invalid proto data type") )
Functions ¶
func WithJSONMarshalOptions ¶
func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context
WithJSONMarshalOptions attaches JSONMarshalOptions to a context.
Types ¶
type Change ¶
type Change struct { // Before contains the data before the operation occurred. This field is // optional and should only be populated for operations OperationUpdate // OperationDelete (if the system supports fetching the data before the // operation). Before Data `json:"before"` // After contains the data after the operation occurred. This field should // be populated for all operations except OperationDelete. After Data `json:"after"` }
type Data ¶
type Data interface { Bytes() []byte Clone() Data ToProto(*opencdcv1.Data) error // contains filtered or unexported methods }
Data is a structure that contains some bytes. The only structs implementing Data are RawData and StructuredData.
type JSONMarshalOptions ¶
type JSONMarshalOptions struct { // RawDataAsString is a flag that indicates if the RawData type should be // serialized as a string. If set to false, RawData will be serialized as a // base64 encoded string. If set to true, RawData will be serialized as a // string without conversion. RawDataAsString bool }
JSONMarshalOptions can customize how a record is serialized to JSON. It can be attached to a context using WithJSONMarshalOptions and supplied to json.MarshalContext to customize the serialization behavior.
type JSONSerializer ¶
type JSONSerializer JSONMarshalOptions
JSONSerializer is a RecordSerializer that serializes records to JSON using the configured options.
type Metadata ¶
func (Metadata) GetCollection ¶ added in v0.2.0
GetCollection returns the value for key MetadataCollection. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDLQNackError ¶
GetConduitDLQNackError returns the value for key MetadataConduitDLQNackError. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDLQNackNodeID ¶
GetConduitDLQNackNodeID returns the value for key MetadataConduitDLQNackNodeID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDestinationPluginName ¶
GetConduitDestinationPluginName returns the value for key MetadataConduitDestinationPluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitDestinationPluginVersion ¶
GetConduitDestinationPluginVersion returns the value for key MetadataConduitDestinationPluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitSourceConnectorID ¶
GetConduitSourceConnectorID returns the value for key MetadataConduitSourceConnectorID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitSourcePluginName ¶
GetConduitSourcePluginName returns the value for key MetadataConduitSourcePluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetConduitSourcePluginVersion ¶
GetConduitSourcePluginVersion returns the value for key MetadataConduitSourcePluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetCreatedAt ¶
GetCreatedAt parses the value for key MetadataCreatedAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.
func (Metadata) GetKeySchemaSubject ¶ added in v0.3.0
GetKeySchemaSubject returns the value for key MetadataKeySchemaSubject. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetKeySchemaVersion ¶ added in v0.3.0
GetKeySchemaVersion returns the value for key MetadataKeySchemaVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetOpenCDCVersion ¶
GetOpenCDCVersion returns the value for key MetadataOpenCDCVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetPayloadSchemaSubject ¶ added in v0.3.0
GetPayloadSchemaSubject returns the value for key MetadataPayloadSchemaSubject. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetPayloadSchemaVersion ¶ added in v0.3.0
GetPayloadSchemaVersion returns the value for key MetadataPayloadSchemaVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
func (Metadata) GetReadAt ¶
GetReadAt parses the value for key MetadataReadAt as a unix timestamp. If the value does not exist or the value is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.
func (Metadata) SetCollection ¶ added in v0.2.0
SetCollection sets the metadata value for key MetadataCollection.
func (Metadata) SetConduitDLQNackError ¶
SetConduitDLQNackError sets the metadata value for key MetadataConduitDLQNackError.
func (Metadata) SetConduitDLQNackNodeID ¶
SetConduitDLQNackNodeID sets the metadata value for key MetadataConduitDLQNackNodeID.
func (Metadata) SetConduitDestinationPluginName ¶
SetConduitDestinationPluginName sets the metadata value for key MetadataConduitDestinationPluginName.
func (Metadata) SetConduitDestinationPluginVersion ¶
SetConduitDestinationPluginVersion sets the metadata value for key MetadataConduitDestinationPluginVersion.
func (Metadata) SetConduitSourceConnectorID ¶
SetConduitSourceConnectorID sets the metadata value for key MetadataConduitSourceConnectorID.
func (Metadata) SetConduitSourcePluginName ¶
SetConduitSourcePluginName sets the metadata value for key MetadataConduitSourcePluginName.
func (Metadata) SetConduitSourcePluginVersion ¶
SetConduitSourcePluginVersion sets the metadata value for key MetadataConduitSourcePluginVersion.
func (Metadata) SetCreatedAt ¶
SetCreatedAt sets the metadata value for key MetadataCreatedAt as a unix timestamp in nanoseconds.
func (Metadata) SetKeySchemaSubject ¶ added in v0.3.0
SetKeySchemaSubject sets the metadata value for key MetadataKeySchemaSubject.
func (Metadata) SetKeySchemaVersion ¶ added in v0.3.0
SetKeySchemaVersion sets the metadata value for key MetadataKeySchemaVersion.
func (Metadata) SetOpenCDCVersion ¶
func (m Metadata) SetOpenCDCVersion()
SetOpenCDCVersion sets the metadata value for key MetadataVersion to the current version of OpenCDC used.
func (Metadata) SetPayloadSchemaSubject ¶ added in v0.3.0
SetPayloadSchemaSubject sets the metadata value for key MetadataPayloadSchemaSubject.
func (Metadata) SetPayloadSchemaVersion ¶ added in v0.3.0
SetPayloadSchemaVersion sets the metadata value for key MetadataPayloadSchemaVersion.
type Operation ¶
type Operation int
Operation defines what triggered the creation of a record.
func (Operation) MarshalText ¶
func (*Operation) UnmarshalText ¶
type Position ¶
type Position []byte
Position is a unique identifier for a record being processed. It's a Source's responsibility to choose and assign record positions, as they will be used by the Source in subsequent pipeline runs.
type Record ¶
type Record struct { // Position uniquely represents the record. Position Position `json:"position"` // Operation defines what triggered the creation of a record. There are four // possibilities: create, update, delete or snapshot. The first three // operations are encountered during normal CDC operation, while "snapshot" // is meant to represent records during an initial load. Depending on the // operation, the record will contain either the payload before the change, // after the change, both or none (see field Payload). Operation Operation `json:"operation"` // Metadata contains additional information regarding the record. Metadata Metadata `json:"metadata"` // Key represents a value that should identify the entity (e.g. database // row). Key Data `json:"key"` // Payload holds the payload change (data before and after the operation // occurred). Payload Change `json:"payload"` // contains filtered or unexported fields }
Record represents a single data record produced by a source and/or consumed by a destination connector. Record should be used as a value, not a pointer, except when (de)serializing the record. Note that methods related to (de)serializing the record mutate the record and are thus not thread-safe (see SetSerializer, FromProto and UnmarshalJSON).
func (Record) Bytes ¶
Bytes returns the serialized representation of the Record. By default, this function returns a JSON representation. The serialization logic can be changed using SetSerializer.
func (*Record) FromProto ¶
FromProto takes data from the supplied proto object and populates the receiver. If the proto object is nil, the receiver is set to its zero value. If the function returns an error, the receiver could be partially populated.
func (*Record) SetSerializer ¶
func (r *Record) SetSerializer(serializer RecordSerializer)
SetSerializer sets the serializer used to encode the record into bytes. If serializer is nil, the serializing behavior is reset to the default (JSON). This method mutates the receiver and is not thread-safe.
func (Record) ToProto ¶
ToProto takes data from the receiver and populates the supplied proto object. If the function returns an error, the proto object could be partially populated.
func (*Record) UnmarshalJSON ¶
type RecordSerializer ¶
RecordSerializer is a type that can serialize a record to bytes. It's used in destination connectors to change the output structure and format.
type StructuredData ¶
type StructuredData map[string]interface{}
StructuredData contains data in form of a map with string keys and arbitrary values.
func (StructuredData) Bytes ¶
func (d StructuredData) Bytes() []byte
func (StructuredData) Clone ¶
func (d StructuredData) Clone() Data