opencdc

v0.5.0

This package is not in the latest version of its module.
Published: Nov 8, 2024 License: Apache-2.0 Imports: 11 Imported by: 118

Documentation

Constants

const (
	// OpenCDCVersion is the value to store under the metadata key
	// MetadataOpenCDCVersion. It ensures the OpenCDC format version can be
	// easily identified in case the record gets marshaled into a different
	// untyped format (e.g. JSON).
	OpenCDCVersion = "v1"

	// MetadataOpenCDCVersion is a Record.Metadata key for the version of the
	// OpenCDC format (e.g. "v1"). This field exists to ensure the OpenCDC
	// format version can be easily identified in case the record gets marshaled
	// into a different untyped format (e.g. JSON).
	MetadataOpenCDCVersion = "opencdc.version"
	// MetadataCreatedAt is a Record.Metadata key for the time when the record
	// was created in the 3rd party system. The expected format is a unix
	// timestamp in nanoseconds.
	MetadataCreatedAt = "opencdc.createdAt"
	// MetadataReadAt is a Record.Metadata key for the time when the record was
	// read from the 3rd party system. The expected format is a unix timestamp
	// in nanoseconds.
	MetadataReadAt = "opencdc.readAt"
	// MetadataCollection is a Record.Metadata key for the name of the collection
	// where the record originated from and/or where it should be stored.
	MetadataCollection = "opencdc.collection"

	// MetadataKeySchemaSubject is a Record.Metadata key for the subject of the schema of
	// the record's .Key field.
	MetadataKeySchemaSubject = "opencdc.key.schema.subject"
	// MetadataKeySchemaVersion is a Record.Metadata key for the version of the schema of
	// the record's .Key field.
	MetadataKeySchemaVersion = "opencdc.key.schema.version"

	// MetadataPayloadSchemaSubject is a Record.Metadata key for the subject of the schema of
	// the record's .Payload field.
	MetadataPayloadSchemaSubject = "opencdc.payload.schema.subject"
	// MetadataPayloadSchemaVersion is a Record.Metadata key for the version of the schema of
	// the record's .Payload field.
	MetadataPayloadSchemaVersion = "opencdc.payload.schema.version"

	// MetadataConduitSourcePluginName is a Record.Metadata key for the name of
	// the source plugin that created this record.
	MetadataConduitSourcePluginName = "conduit.source.plugin.name"
	// MetadataConduitSourcePluginVersion is a Record.Metadata key for the
	// version of the source plugin that created this record.
	MetadataConduitSourcePluginVersion = "conduit.source.plugin.version"
	// MetadataConduitDestinationPluginName is a Record.Metadata key for the
	// name of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginName = "conduit.destination.plugin.name"
	// MetadataConduitDestinationPluginVersion is a Record.Metadata key for the
	// version of the destination plugin that has written this record
	// (only available in records once they are written by a destination).
	MetadataConduitDestinationPluginVersion = "conduit.destination.plugin.version"

	// MetadataConduitSourceConnectorID is a Record.Metadata key for the ID of
	// the source connector that produced this record.
	MetadataConduitSourceConnectorID = "conduit.source.connector.id"
	// MetadataConduitDLQNackError is a Record.Metadata key for the error that
	// caused a record to be nacked and pushed to the dead-letter queue.
	MetadataConduitDLQNackError = "conduit.dlq.nack.error"
	// MetadataConduitDLQNackNodeID is a Record.Metadata key for the ID of the
	// internal node that nacked the record.
	MetadataConduitDLQNackNodeID = "conduit.dlq.nack.node.id"
)
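
The version key exists so that a record stays identifiable after it has been flattened into an untyped format. A minimal sketch of that idea using only the standard library: the two constants are redeclared locally with the values listed above, and versionOf is a hypothetical helper, not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Values copied from the constant block above and redeclared locally so the
// sketch runs without importing the package.
const (
	openCDCVersion         = "v1"
	metadataOpenCDCVersion = "opencdc.version"
	metadataCollection     = "opencdc.collection"
)

// versionOf is a hypothetical helper: it decodes an untyped JSON record and
// reads the format version from its metadata object.
func versionOf(raw []byte) (string, error) {
	var rec struct {
		Metadata map[string]string `json:"metadata"`
	}
	if err := json.Unmarshal(raw, &rec); err != nil {
		return "", err
	}
	return rec.Metadata[metadataOpenCDCVersion], nil
}

func main() {
	raw, err := json.Marshal(map[string]any{
		"metadata": map[string]string{
			metadataOpenCDCVersion: openCDCVersion,
			metadataCollection:     "users",
		},
	})
	if err != nil {
		panic(err)
	}
	v, err := versionOf(raw)
	fmt.Println(v, err) // v1 <nil>
}
```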

Variables

var (
	// ErrMetadataFieldNotFound is returned in metadata utility functions when a
	// metadata field is not found.
	ErrMetadataFieldNotFound = errors.New("metadata field not found")
	// ErrUnknownOperation is returned when trying to parse an Operation string
	// and encountering an unknown operation.
	ErrUnknownOperation = errors.New("unknown operation")
	// ErrInvalidProtoDataType is returned when trying to convert a proto data
	// type to raw or structured data.
	ErrInvalidProtoDataType = errors.New("invalid proto data type")
)

Functions

func WithJSONMarshalOptions

func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context

WithJSONMarshalOptions attaches JSONMarshalOptions to a context.

Types

type Change

type Change struct {
	// Before contains the data before the operation occurred. This field is
	// optional and should only be populated for operations OperationUpdate
	// and OperationDelete (if the system supports fetching the data before
	// the operation).
	Before Data `json:"before"`
	// After contains the data after the operation occurred. This field should
	// be populated for all operations except OperationDelete.
	After Data `json:"after"`
}
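
The field comments above imply two rules: Before belongs only on update and delete records, and After is expected on everything except delete. A rough sketch of those rules as a check, using local stand-in types; checkChange is a hypothetical helper and the package itself does not necessarily enforce this.

```go
package main

import "fmt"

// operation and its values mirror the Operation constants of this package.
type operation int

const (
	opCreate operation = iota + 1
	opUpdate
	opDelete
	opSnapshot
)

// change is a stand-in for Change that uses plain byte slices.
type change struct {
	Before []byte
	After  []byte
}

// checkChange is a hypothetical validator for the two documented rules.
func checkChange(op operation, c change) error {
	if c.Before != nil && op != opUpdate && op != opDelete {
		return fmt.Errorf("before must be empty for operation %d", op)
	}
	if c.After == nil && op != opDelete {
		return fmt.Errorf("after must be set for operation %d", op)
	}
	return nil
}

func main() {
	row := []byte(`{"id":1}`)
	fmt.Println(checkChange(opSnapshot, change{After: row}))            // <nil>
	fmt.Println(checkChange(opCreate, change{Before: row, After: row})) // error
}
```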

func (*Change) FromProto

func (c *Change) FromProto(proto *opencdcv1.Change) error

FromProto takes data from the supplied proto object and populates the receiver. If the proto object is nil, the receiver is set to its zero value. If the function returns an error, the receiver could be partially populated.

func (Change) ToProto

func (c Change) ToProto(proto *opencdcv1.Change) error

ToProto takes data from the receiver and populates the supplied proto object. If the function returns an error, the proto object could be partially populated.

type Data

type Data interface {
	Bytes() []byte
	Clone() Data
	ToProto(*opencdcv1.Data) error
	// contains filtered or unexported methods
}

Data is a value that contains some bytes. The only types implementing Data are RawData and StructuredData.

type JSONMarshalOptions

type JSONMarshalOptions struct {
	// RawDataAsString is a flag that indicates if the RawData type should be
	// serialized as a string. If set to false, RawData will be serialized as a
	// base64 encoded string. If set to true, RawData will be serialized as a
	// string without conversion.
	RawDataAsString bool
}

JSONMarshalOptions can customize how a record is serialized to JSON. It can be attached to a context using WithJSONMarshalOptions and supplied to json.MarshalContext to customize the serialization behavior.
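
The difference between the two RawDataAsString modes can be illustrated with the standard library alone. The sketch assumes the standard base64 alphabet with padding, which is what encoding/json uses for []byte values; encodeRaw is a hypothetical helper, not part of this package.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// encodeRaw is a hypothetical helper that mimics the two documented modes:
// a plain JSON string when asString is true, a base64 JSON string otherwise.
func encodeRaw(raw []byte, asString bool) ([]byte, error) {
	if asString {
		return json.Marshal(string(raw))
	}
	return json.Marshal(base64.StdEncoding.EncodeToString(raw))
}

func main() {
	encoded, err := encodeRaw([]byte("hello"), false)
	if err != nil {
		panic(err)
	}
	plain, err := encodeRaw([]byte("hello"), true)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(encoded)) // "aGVsbG8="
	fmt.Println(string(plain))   // "hello"
}
```

The string mode is easier to read in logs, but it is only lossless when the raw bytes are valid UTF-8.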

type JSONSerializer

type JSONSerializer JSONMarshalOptions

JSONSerializer is a RecordSerializer that serializes records to JSON using the configured options.

func (JSONSerializer) Serialize

func (s JSONSerializer) Serialize(r Record) ([]byte, error)

type Metadata

type Metadata map[string]string

func (Metadata) GetCollection added in v0.2.0

func (m Metadata) GetCollection() (string, error)

GetCollection returns the value for key MetadataCollection. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.
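
All string getters on Metadata share this contract, so callers can treat a missing field and an empty field the same way. A sketch of the calling pattern with local stand-ins: errFieldNotFound plays the role of ErrMetadataFieldNotFound, and collectionOr is a hypothetical helper. Checking with errors.Is keeps working even if the sentinel is wrapped.

```go
package main

import (
	"errors"
	"fmt"
)

// errFieldNotFound stands in for ErrMetadataFieldNotFound.
var errFieldNotFound = errors.New("metadata field not found")

// metadata stands in for Metadata.
type metadata map[string]string

// getCollection mirrors the documented contract: a missing key and an empty
// value are both reported as "not found".
func (m metadata) getCollection() (string, error) {
	v := m["opencdc.collection"]
	if v == "" {
		return "", errFieldNotFound
	}
	return v, nil
}

// collectionOr is a hypothetical caller-side helper that falls back to def
// when the field is absent.
func collectionOr(m metadata, def string) (string, error) {
	v, err := m.getCollection()
	if errors.Is(err, errFieldNotFound) {
		return def, nil
	}
	return v, err
}

func main() {
	fmt.Println(collectionOr(metadata{"opencdc.collection": "users"}, "default")) // users <nil>
	fmt.Println(collectionOr(metadata{"opencdc.collection": ""}, "default"))      // default <nil>
	fmt.Println(collectionOr(nil, "default"))                                     // default <nil>
}
```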

func (Metadata) GetConduitDLQNackError

func (m Metadata) GetConduitDLQNackError() (string, error)

GetConduitDLQNackError returns the value for key MetadataConduitDLQNackError. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDLQNackNodeID

func (m Metadata) GetConduitDLQNackNodeID() (string, error)

GetConduitDLQNackNodeID returns the value for key MetadataConduitDLQNackNodeID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDestinationPluginName

func (m Metadata) GetConduitDestinationPluginName() (string, error)

GetConduitDestinationPluginName returns the value for key MetadataConduitDestinationPluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitDestinationPluginVersion

func (m Metadata) GetConduitDestinationPluginVersion() (string, error)

GetConduitDestinationPluginVersion returns the value for key MetadataConduitDestinationPluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourceConnectorID

func (m Metadata) GetConduitSourceConnectorID() (string, error)

GetConduitSourceConnectorID returns the value for key MetadataConduitSourceConnectorID. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourcePluginName

func (m Metadata) GetConduitSourcePluginName() (string, error)

GetConduitSourcePluginName returns the value for key MetadataConduitSourcePluginName. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetConduitSourcePluginVersion

func (m Metadata) GetConduitSourcePluginVersion() (string, error)

GetConduitSourcePluginVersion returns the value for key MetadataConduitSourcePluginVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetCreatedAt

func (m Metadata) GetCreatedAt() (time.Time, error)

GetCreatedAt parses the value for key MetadataCreatedAt as a unix timestamp in nanoseconds. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.

func (Metadata) GetKeySchemaSubject added in v0.3.0

func (m Metadata) GetKeySchemaSubject() (string, error)

GetKeySchemaSubject returns the value for key MetadataKeySchemaSubject. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetKeySchemaVersion added in v0.3.0

func (m Metadata) GetKeySchemaVersion() (int, error)

GetKeySchemaVersion returns the value for key MetadataKeySchemaVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetOpenCDCVersion

func (m Metadata) GetOpenCDCVersion() (string, error)

GetOpenCDCVersion returns the value for key MetadataOpenCDCVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetPayloadSchemaSubject added in v0.3.0

func (m Metadata) GetPayloadSchemaSubject() (string, error)

GetPayloadSchemaSubject returns the value for key MetadataPayloadSchemaSubject. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetPayloadSchemaVersion added in v0.3.0

func (m Metadata) GetPayloadSchemaVersion() (int, error)

GetPayloadSchemaVersion returns the value for key MetadataPayloadSchemaVersion. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound.

func (Metadata) GetReadAt

func (m Metadata) GetReadAt() (time.Time, error)

GetReadAt parses the value for key MetadataReadAt as a unix timestamp in nanoseconds. If the value does not exist or is empty the function returns ErrMetadataFieldNotFound. If the value is not a valid unix timestamp in nanoseconds the function returns an error.

func (Metadata) SetCollection added in v0.2.0

func (m Metadata) SetCollection(collection string)

SetCollection sets the metadata value for key MetadataCollection.

func (Metadata) SetConduitDLQNackError

func (m Metadata) SetConduitDLQNackError(err string)

SetConduitDLQNackError sets the metadata value for key MetadataConduitDLQNackError.

func (Metadata) SetConduitDLQNackNodeID

func (m Metadata) SetConduitDLQNackNodeID(id string)

SetConduitDLQNackNodeID sets the metadata value for key MetadataConduitDLQNackNodeID.

func (Metadata) SetConduitDestinationPluginName

func (m Metadata) SetConduitDestinationPluginName(name string)

SetConduitDestinationPluginName sets the metadata value for key MetadataConduitDestinationPluginName.

func (Metadata) SetConduitDestinationPluginVersion

func (m Metadata) SetConduitDestinationPluginVersion(version string)

SetConduitDestinationPluginVersion sets the metadata value for key MetadataConduitDestinationPluginVersion.

func (Metadata) SetConduitSourceConnectorID

func (m Metadata) SetConduitSourceConnectorID(id string)

SetConduitSourceConnectorID sets the metadata value for key MetadataConduitSourceConnectorID.

func (Metadata) SetConduitSourcePluginName

func (m Metadata) SetConduitSourcePluginName(name string)

SetConduitSourcePluginName sets the metadata value for key MetadataConduitSourcePluginName.

func (Metadata) SetConduitSourcePluginVersion

func (m Metadata) SetConduitSourcePluginVersion(version string)

SetConduitSourcePluginVersion sets the metadata value for key MetadataConduitSourcePluginVersion.

func (Metadata) SetCreatedAt

func (m Metadata) SetCreatedAt(createdAt time.Time)

SetCreatedAt sets the metadata value for key MetadataCreatedAt as a unix timestamp in nanoseconds.

func (Metadata) SetKeySchemaSubject added in v0.3.0

func (m Metadata) SetKeySchemaSubject(subject string)

SetKeySchemaSubject sets the metadata value for key MetadataKeySchemaSubject.

func (Metadata) SetKeySchemaVersion added in v0.3.0

func (m Metadata) SetKeySchemaVersion(version int)

SetKeySchemaVersion sets the metadata value for key MetadataKeySchemaVersion.

func (Metadata) SetOpenCDCVersion

func (m Metadata) SetOpenCDCVersion()

SetOpenCDCVersion sets the metadata value for key MetadataOpenCDCVersion to OpenCDCVersion, the current version of the OpenCDC format.

func (Metadata) SetPayloadSchemaSubject added in v0.3.0

func (m Metadata) SetPayloadSchemaSubject(subject string)

SetPayloadSchemaSubject sets the metadata value for key MetadataPayloadSchemaSubject.

func (Metadata) SetPayloadSchemaVersion added in v0.3.0

func (m Metadata) SetPayloadSchemaVersion(version int)

SetPayloadSchemaVersion sets the metadata value for key MetadataPayloadSchemaVersion.

func (Metadata) SetReadAt

func (m Metadata) SetReadAt(createdAt time.Time)

SetReadAt sets the metadata value for key MetadataReadAt as a unix timestamp in nanoseconds.

type Operation

type Operation int

Operation defines what triggered the creation of a record.

const (
	OperationCreate   Operation = iota + 1 // create
	OperationUpdate                        // update
	OperationDelete                        // delete
	OperationSnapshot                      // snapshot
)

func (Operation) MarshalText

func (o Operation) MarshalText() ([]byte, error)

func (Operation) String

func (i Operation) String() string

func (*Operation) UnmarshalText

func (o *Operation) UnmarshalText(b []byte) error

type Position

type Position []byte

Position is a unique identifier for a record being processed. It's a Source's responsibility to choose and assign record positions, as they will be used by the Source in subsequent pipeline runs.

func (Position) String

func (p Position) String() string

String is used when displaying the position in logs.
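
Because a Position is an opaque byte slice, each source decides what to put in it. As one illustration only, a source that reads from an append-only log could store the offset of the last record it read. The encoding below (8 bytes, big-endian) and the helper names are assumptions, not something this package prescribes.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// position stands in for Position.
type position []byte

// encodeOffset is a hypothetical encoder: it stores a log offset as 8
// big-endian bytes.
func encodeOffset(offset uint64) position {
	p := make(position, 8)
	binary.BigEndian.PutUint64(p, offset)
	return p
}

// decodeOffset reverses encodeOffset, so a restarted source can resume from
// the last position it handed out.
func decodeOffset(p position) (uint64, error) {
	if len(p) != 8 {
		return 0, fmt.Errorf("invalid position length %d, want 8", len(p))
	}
	return binary.BigEndian.Uint64(p), nil
}

func main() {
	p := encodeOffset(42)
	offset, err := decodeOffset(p)
	fmt.Println(offset, err) // 42 <nil>
}
```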

type RawData

type RawData []byte

RawData contains unstructured data in the form of a byte slice.

func (RawData) Bytes

func (d RawData) Bytes() []byte

func (RawData) Clone

func (d RawData) Clone() Data

func (RawData) MarshalJSON

func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error)

func (RawData) ToProto

func (d RawData) ToProto(proto *opencdcv1.Data) error

ToProto takes data from the receiver and populates the supplied proto object.

type Record

type Record struct {
	// Position uniquely represents the record.
	Position Position `json:"position"`
	// Operation defines what triggered the creation of a record. There are four
	// possibilities: create, update, delete or snapshot. The first three
	// operations are encountered during normal CDC operation, while "snapshot"
	// is meant to represent records during an initial load. Depending on the
	// operation, the record will contain either the payload before the change,
	// after the change, both or none (see field Payload).
	Operation Operation `json:"operation"`
	// Metadata contains additional information regarding the record.
	Metadata Metadata `json:"metadata"`

	// Key represents a value that should identify the entity (e.g. database
	// row).
	Key Data `json:"key"`
	// Payload holds the payload change (data before and after the operation
	// occurred).
	Payload Change `json:"payload"`
	// contains filtered or unexported fields
}

Record represents a single data record produced by a source and/or consumed by a destination connector. Record should be used as a value, not a pointer, except when (de)serializing the record. Note that methods related to (de)serializing the record mutate the record and are thus not thread-safe (see SetSerializer, FromProto and UnmarshalJSON).

func (Record) Bytes

func (r Record) Bytes() []byte

Bytes returns the serialized representation of the Record. By default, this function returns a JSON representation. The serialization logic can be changed using SetSerializer.

func (Record) Clone

func (r Record) Clone() Record
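
Clone has no description above. A likely reason for it to exist is that a Record holds maps and slices, so a plain assignment copies the struct but still shares the underlying data. The sketch shows that Go behavior with a local stand-in; the clone method here is a hypothetical deep copy, not the package's implementation.

```go
package main

import "fmt"

// record is a local stand-in with only the fields needed for the sketch.
type record struct {
	Position []byte
	Metadata map[string]string
}

// clone is a hypothetical deep copy: it duplicates the slice and the map so
// the copy shares no memory with the original.
func (r record) clone() record {
	out := record{
		Position: append([]byte(nil), r.Position...),
		Metadata: make(map[string]string, len(r.Metadata)),
	}
	for k, v := range r.Metadata {
		out.Metadata[k] = v
	}
	return out
}

func main() {
	orig := record{Metadata: map[string]string{"opencdc.collection": "users"}}

	shallow := orig // copies the struct, but both values share one map
	shallow.Metadata["opencdc.collection"] = "orders"
	fmt.Println(orig.Metadata["opencdc.collection"]) // orders

	deep := orig.clone()
	deep.Metadata["opencdc.collection"] = "invoices"
	fmt.Println(orig.Metadata["opencdc.collection"]) // orders
}
```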

func (*Record) FromProto

func (r *Record) FromProto(proto *opencdcv1.Record) error

FromProto takes data from the supplied proto object and populates the receiver. If the proto object is nil, the receiver is set to its zero value. If the function returns an error, the receiver could be partially populated.

func (Record) Map

func (r Record) Map() map[string]interface{}

func (*Record) SetSerializer

func (r *Record) SetSerializer(serializer RecordSerializer)

SetSerializer sets the serializer used to encode the record into bytes. If serializer is nil, the serializing behavior is reset to the default (JSON). This method mutates the receiver and is not thread-safe.

func (Record) ToProto

func (r Record) ToProto(proto *opencdcv1.Record) error

ToProto takes data from the receiver and populates the supplied proto object. If the function returns an error, the proto object could be partially populated.

func (*Record) UnmarshalJSON

func (r *Record) UnmarshalJSON(b []byte) error

type RecordSerializer

type RecordSerializer interface {
	Serialize(Record) ([]byte, error)
}

RecordSerializer is a type that can serialize a record to bytes. It's used in destination connectors to change the output structure and format.
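
A custom serializer only needs a Serialize method. The sketch below uses a trimmed-down stand-in record to show how a serializer can replace the default JSON output with just the payload bytes. The types record and payloadOnly and the methods setSerializer and bytes are hypothetical, and returning nil on a serializer error is an assumption, since that case is not documented above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// record is a trimmed-down stand-in for Record.
type record struct {
	Key        string
	Payload    string
	serializer recordSerializer
}

// recordSerializer has the same shape as RecordSerializer.
type recordSerializer interface {
	Serialize(record) ([]byte, error)
}

// payloadOnly is a hypothetical serializer that emits only the payload.
type payloadOnly struct{}

func (payloadOnly) Serialize(r record) ([]byte, error) {
	return []byte(r.Payload), nil
}

// setSerializer mutates the receiver; passing nil restores the JSON default.
func (r *record) setSerializer(s recordSerializer) { r.serializer = s }

// bytes uses the configured serializer and falls back to JSON otherwise.
func (r record) bytes() []byte {
	if r.serializer != nil {
		b, err := r.serializer.Serialize(r)
		if err != nil {
			return nil // assumption: error handling is not documented above
		}
		return b
	}
	b, _ := json.Marshal(struct{ Key, Payload string }{r.Key, r.Payload})
	return b
}

func main() {
	r := record{Key: "1", Payload: "hello"}
	fmt.Println(string(r.bytes())) // {"Key":"1","Payload":"hello"}

	r.setSerializer(payloadOnly{})
	fmt.Println(string(r.bytes())) // hello
}
```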

type StructuredData

type StructuredData map[string]interface{}

StructuredData contains data in the form of a map with string keys and arbitrary values.

func (StructuredData) Bytes

func (d StructuredData) Bytes() []byte

func (StructuredData) Clone

func (d StructuredData) Clone() Data

func (StructuredData) ToProto

func (d StructuredData) ToProto(proto *opencdcv1.Data) error

ToProto takes data from the receiver and populates the supplied proto object.
