sdk

package module
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 39 Imported by: 323

README

Conduit Connector SDK

License Test Go Report Card Go Reference

This repository contains the Go software development kit for implementing a connector for Conduit. If you want to implement a connector in another language please have a look at the connector protocol.

Quickstart

Create a new folder and initialize a fresh go module:

go mod init example.com/conduit-connector-demo

Add the connector SDK dependency:

go get github.com/conduitio/conduit-connector-sdk

With this you can start implementing the connector. To implement a source (a connector that reads from a 3rd party resource and sends data to Conduit) create a struct that implements sdk.Source. To implement a destination (a connector that receives data from Conduit and writes it to a 3rd party resource) create a struct that implements sdk.Destination. You can implement both to make a connector that can be used both as a source or a destination.

Apart from the source and/or destination you should create a global variable of type sdk.Connector that contains references to constructors for sdk.Source, sdk.Destination and sdk.Specification.

The last part is the entrypoint, it needs to call sdk.Serve and pass in the connector mentioned above.

package main

import (
	demo "example.com/conduit-connector-demo"
	sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
	sdk.Serve(demo.Connector)
}

Now you can build the standalone connector:

go build path/to/main.go

You will get a compiled binary which Conduit can use as a connector. To run your connector as part of a Conduit pipeline you can create it using the connectors API and specify the path to the compiled connector binary in the field plugin.

Here is an example request to POST /v1/connectors (find more about the Conduit API):

{
  "type": "TYPE_SOURCE",
  "plugin": "/path/to/compiled/connector/binary",
  "pipelineId": "...",
  "config": {
    "name": "my-connector",
    "settings": {
      "my-key": "my-value"
    }
  }
}

Find out more information on building a connector in the Go doc reference.

FAQ

Q: How to identify the source from which a record originated?

A connector can use whatever means available to associate a record with the originating source. However, to promote compatibility between connectors, we highly recommend that a record's metadata is used to indicate from which collection[^1] a record originated.

The metadata key to be used is opencdc.collection, which can be accessed through the sdk.MetadataCollection constant.

For example, if a record was read from a database table called employees, it should have the following in its metadata:

{
  "opencdc.collection": "employees",
  // other metadata
}

Additionally, Conduit automatically adds the following metadata to each record:

  • conduit.source.plugin.name: the source plugin that created a record
  • conduit.source.plugin.version: version of the source plugin that created this record

More information about metadata in OpenCDC records can be found here.

Q: If a destination connector is able to write to multiple tables (topics, collections, indexes, etc.), how should a record be routed to the correct destination?

Similarly to above, we recommend that the metadata key "opencdc.collection" is used.

For example, if a record has the metadata field "opencdc.collection" set to employees, then the PostgreSQL destination connector will write it to the employees table.

Q: Is there a standard format for errors?

Conduit doesn't expect any specific error format. We still encourage developers to follow the conventional error message formatting and include enough contextual information to make debugging as easy as possible (e.g. stack trace, information about the value that caused the error, internal state).

Q: Is there a standard format for logging?

Developers should use sdk.Logger to retrieve a *zerolog.Logger instance. It can be used to emit structured and leveled log messages that will be included in Conduit logs.

Keep in mind that logging in the hot path (e.g. reading or writing a record) can have a negative impact on performance and should be avoided. If you really want to add a log message in the hot path please use the "trace" level.

Q: How do I enable logging in my tests?

By default, logging calls made using the sdk.Logger in your tests will not produce any output. To enable logging while running your connector tests or debugging, you need to pass a custom context with a zerolog logger attached:

func TestFoo(t *testing.T) {
	logger := zerolog.New(zerolog.NewTestWriter(t))
	ctx := logger.WithContext(context.Background())

	// pass ctx to connector functions ...
}

Q: Do I need to worry about ordering?

In case of the destination connector you do not have to worry about ordering. Conduit will supply records one by one in the order they were produced in the source.

On the other hand, the source connector is in charge of producing records and thus dictates the order. That said, you do not have to worry about concurrent reads, the SDK will call Source.Read repeatedly and only in one goroutine, all you have to do is return one record at a time.

Examples

For examples of simple connectors you can look at existing connectors like conduit-connector-generator or conduit-connector-file.

[^1]: Collection is a generic term used in Conduit to describe an entity in a 3rd party system from which records are read from or to which records they are written to. Examples are: topics (in Kafka), tables (in a database), indexes (in a search engine), and collections (in NoSQL databases).

Documentation

Overview

Package sdk implements utilities for implementing a Conduit connector.

Getting started

Conduit connectors can be thought of as the edges of a Conduit pipeline. They are responsible for reading records from and writing records to third party systems. Conduit uses connectors as plugins that hide the intricacies of working with a particular third party system, so that Conduit itself can focus on efficiently processing records and moving them safely from sources to destinations.

To implement a connector, start by defining a global variable of type Connector, preferably in connector.go at the root of your project to make it easy to discover.

var Connector = sdk.Connector {
    NewSpecification: Specification,  // Specification is my connector's specification
    NewSource:        NewSource,      // NewSource is the constructor for my source
    NewDestination:   NewDestination, // NewDestination is the constructor for my destination
}

Connector will be used as the starting point for accessing three main connector components that you need to provide:

  • Specification contains general information about the plugin like its name and what it does. Writing a specification is relatively simple and straightforward, for more info check the corresponding field docs of Specification.
  • Source is the connector part that knows how to fetch data from the third party system and convert it to a [Record].
  • Destination is the connector part that knows how to write a [Record] to the third party system.

General advice for implementing connectors:

  • The SDK provides a structured logger that can be retrieved with Logger. It allows you to create structured and leveled output that will be included as part of the Conduit logs.
  • If you want to add logging to the hot path (i.e. code that is executed for every record that is read or written) you should use the log level "trace", otherwise it can greatly impact the performance of your connector.

Source

A Source is responsible for continuously reading data from a third party system and returning it in form of a [Record].

Every Source implementation needs to include an UnimplementedSource to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Source implementations.

type Source struct {
  sdk.UnimplementedSource
}

You need to implement the functions required by Source and provide your own implementations. Please look at the documentation of Source for further information about individual functions.

You should also create a constructor function for your source struct. Note that this is the same function that should be set as the value of Connector.NewSource. The constructor should be used to wrap your source in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.

func NewSource() sdk.Source {
  return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
}

Additional tips for implementing a source:

  • The SDK provides utilities for certain operations like creating records in SourceUtil. You can access it through the global variable Util.Source.
  • The function Source.Ack is optional and does not have to be implemented.
  • Source is responsible for creating record positions that should ideally uniquely identify a record. Think carefully about what you will store in the position, it should give the source enough information to resume reading records at that specific position.
  • The SDK provides acceptance tests, if your source doesn't pass it means your implementation has a bug¹.

Destination

A Destination is responsible for writing [Record] to third party systems.

Every Destination implementation needs to include an UnimplementedDestination to satisfy the interface. This allows us to potentially change the interface in the future while remaining backwards compatible with existing Destination implementations.

type Destination struct {
  sdk.UnimplementedSource
}

You need to implement the functions required by Destination and provide your own implementations. Please look at the documentation of Destination for further information about individual functions.

You should also create a constructor function for your destination struct. Note that this is the same function that should be set as the value of Connector.NewDestination. The constructor should be used to wrap your destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.

func NewDestination() sdk.Destination {
  return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

Additional tips for implementing a destination:

  • The SDK provides utilities for certain operations like routing records based on their operation in DestinationUtil. You can access it through the global variable Util.Destination.
  • If your destination writes records as a whole to the destination you should use [Record].Bytes to get the raw record representation.
  • If possible, make your destination writes idempotent. It is possible that the destination will receive the same record twice after a pipeline restart.
  • Some sources won't be able to distinguish create and update operations. In case your destination is updating data in place, we recommend to upsert the record on a create or update.
  • The SDK provides acceptance tests, if your destination doesn't pass it means your implementation has a bug¹.

Acceptance tests

The SDK provides acceptance tests that can be run in a simple Go test.¹

To run acceptance tests you should create a test file, preferably named acceptance_test.go at the root of your project to make it easy to discover. Inside create a Go test where you trigger the function AcceptanceTest.

func TestAcceptance(t *testing.T) {
  // set up dependencies here
  sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
    Config: sdk.ConfigurableAcceptanceTestDriverConfig{
      Connector: Connector, // Connector is the global variable from your connector
      SourceConfig: map[string]string{ … },
      DestinationConfig: map[string]string{ … },
    },
  }
}

AcceptanceTest uses the AcceptanceTestDriver for certain operations. The SDK already provides a default implementation for the driver with ConfigurableAcceptanceTestDriver, although you can supply your own implementation if you need to adjust the behavior of acceptance tests for your connector.

Some acceptance tests will try to write data using the destination and then read the same data using the source. Because of that you need to make sure that the configurations point both to the same exact data store (e.g. in case of the file connector the source and destination need to read and write to the same file).

If your connector does not implement both sides of the connector (a source and a destination) you will need to write a custom driver that knows how to read or write, depending on which side of the connector is not implemented. Here is an example how to do that:

type CustomAcceptanceTestDriver struct {
  sdk.ConfigurableAcceptanceTestDriver
}
func (d *CustomAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record {
  // implement read
}
func (d *CustomAcceptanceTestDriver) WriteToSource(t *testing.T, records []opencdc.Record) []opencdc.Record {
  // implement write
}

For more information about what behavior can be customized please refer to the AcceptanceTestDriver interface.

¹Acceptance tests are currently still experimental.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrBackoffRetry can be returned by Source.Read to signal the SDK there is
	// no record to fetch right now and it should try again later.
	ErrBackoffRetry = errors.New("backoff retry")

	// ErrUnimplemented is returned in functions of plugins that don't implement
	// a certain method.
	ErrUnimplemented = errors.New("the connector plugin does not implement this action, please check the source code of the connector and make sure all required connector methods are implemented")
)
View Source
var Util = struct {
	// SourceUtil provides utility methods for implementing a source.
	Source SourceUtil
	// SourceUtil provides utility methods for implementing a destination.
	Destination DestinationUtil

	// ParseConfig sanitizes the configuration, applies defaults, validates it and
	// copies the values into the target object. It combines the functionality
	// provided by github.com/conduitio/conduit-commons/config.Config into a single
	// convenient function. It is intended to be used in the Configure method of a
	// connector to parse the configuration map.
	//
	// The configuration parameters should be provided through NewSource().Parameters()
	// or NewDestination().Parameters() so that parameters from SDK middlewares are
	// included too.
	//
	// The function does the following:
	//   - Removes leading and trailing spaces from all keys and values in the
	//     configuration.
	//   - Applies the default values defined in the parameter specifications to the
	//     configuration.
	//   - Validates the configuration by checking for unrecognized parameters, type
	//     validations, and value validations.
	//   - Copies configuration values into the target object. The target object must
	//     be a pointer to a struct.
	ParseConfig func(ctx context.Context, cfg config.Config, target any, params config.Parameters) error
}{
	ParseConfig: parseConfig,
}

Util provides utilities for implementing connectors.

Functions

func AcceptanceTest added in v0.3.0

func AcceptanceTest(t *testing.T, driver AcceptanceTestDriver)

AcceptanceTest is the acceptance test that all connector implementations should pass. It should manually be called from a test case in each implementation:

func TestAcceptance(t *testing.T) {
    // set up test dependencies ...
    sdk.AcceptanceTest(t, sdk.ConfigurableAcceptanceTestDriver{
        Config: sdk.ConfigurableAcceptanceTestDriverConfig{
            Connector: myConnector,
            SourceConfig: config.Config{...},      // valid source config
            DestinationConfig: config.Config{...}, // valid destination config
        },
    })
}

func BenchmarkSource added in v0.9.0

func BenchmarkSource(
	b *testing.B,
	s Source,
	cfg map[string]string,
)

BenchmarkSource is a benchmark that any source implementation can run to figure out its performance. The benchmark expects that the source resource contains at least b.N number of records. This should be prepared before the benchmark is executed. The function should be manually called from a benchmark function:

func BenchmarkConnector(b *testing.B) {
    // set up test dependencies and write b.N records to source resource ...
    sdk.BenchmarkSource(
        b,
        mySourceConnector,
        map[string]string{...}, // valid source config
    )
}

The benchmark can be run with a specific number of records by supplying the option -benchtime=Nx, where N is the number of records to be benchmarked (e.g. -benchtime=100x benchmarks reading 100 records).

func ConnectorIDFromContext added in v0.10.1

func ConnectorIDFromContext(ctx context.Context) string

ConnectorIDFromContext fetches the connector ID from the context. If the context does not contain a connector ID it returns an empty string.

func Logger

func Logger(ctx context.Context) *zerolog.Logger

Logger returns an instance of a logger that can be used for leveled and structured logging in a plugin. The logger will respect the log level configured in Conduit.

func NewDestinationPlugin

func NewDestinationPlugin(impl Destination, cfg pconnector.PluginConfig) pconnector.DestinationPlugin

NewDestinationPlugin takes a Destination and wraps it into an adapter that converts it into a pconnector.DestinationPlugin. If the parameter is nil it will wrap UnimplementedDestination instead.

func NewSourcePlugin

func NewSourcePlugin(impl Source, cfg pconnector.PluginConfig) pconnector.SourcePlugin

NewSourcePlugin takes a Source and wraps it into an adapter that converts it into a pconnector.SourcePlugin. If the parameter is nil it will wrap UnimplementedSource instead.

func NewSpecifierPlugin

func NewSpecifierPlugin(specs Specification, source Source, dest Destination) pconnector.SpecifierPlugin

NewSpecifierPlugin takes a Specification and wraps it into an adapter that converts it into a pconnector.SpecifierPlugin.

func Serve

func Serve(c Connector)

Serve starts the plugin and takes care of its whole lifecycle by blocking until the plugin can safely stop running. Any fixable errors will be output to os.Stderr and the process will exit with a status code of 1. Serve will panic for unexpected conditions where a user's fix is unknown.

It is essential that nothing gets written to stdout or stderr before this function is called, as the first output is used to perform the initial handshake.

Plugins should call Serve in their main() functions.

Types

type AcceptanceTestDriver added in v0.3.0

type AcceptanceTestDriver interface {
	// Context returns the context to use in tests.
	Context() context.Context
	// Connector is the connector to be tested.
	Connector() Connector

	// SourceConfig should be a valid config for a source connector, reading
	// from the same location as the destination will write to.
	SourceConfig(*testing.T) config.Config
	// DestinationConfig should be a valid config for a destination connector,
	// writing to the same location as the source will read from.
	DestinationConfig(*testing.T) config.Config

	// BeforeTest is executed before each acceptance test.
	BeforeTest(*testing.T)
	// AfterTest is executed after each acceptance test.
	AfterTest(*testing.T)

	// GoleakOptions will be applied to goleak.VerifyNone. Can be used to
	// suppress false positive goroutine leaks.
	GoleakOptions(*testing.T) []goleak.Option

	// GenerateRecord will generate a new Record for a certain Operation. It's
	// the responsibility of the AcceptanceTestDriver implementation to provide
	// records with appropriate contents (e.g. appropriate type of payload).
	// The generated record will contain mixed data types in the field Key and
	// Payload (i.e. RawData and StructuredData), unless configured otherwise
	// (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
	GenerateRecord(*testing.T, opencdc.Operation) opencdc.Record

	// WriteToSource receives a slice of records that should be prepared in the
	// 3rd party system so that the source will read them. The returned slice
	// will be used to verify the source connector can successfully execute
	// reads.
	// It is encouraged for the driver to return the same slice, unless there is
	// no way to write the records to the 3rd party system, then the returning
	// slice should contain the expected records a source should read.
	WriteToSource(*testing.T, []opencdc.Record) []opencdc.Record
	// ReadFromDestination should return a slice with the records that were
	// written to the destination. The slice will be used to verify the
	// destination has successfully executed writes.
	// The parameter contains records that were actually written to the
	// destination. These will be compared to the returned slice of records. It
	// is encouraged for the driver to only touch the input records to change
	// the order of records and to not change the records themselves.
	ReadFromDestination(*testing.T, []opencdc.Record) []opencdc.Record

	// ReadTimeout controls the time the test should wait for a read operation
	// to return before it considers the operation as failed.
	ReadTimeout() time.Duration
	// WriteTimeout controls the time the test should wait for a write operation
	// to return before it considers the operation as failed.
	WriteTimeout() time.Duration
}

AcceptanceTestDriver is the object that each test uses for fetching the connector and its configurations. The SDK provides a default implementation ConfigurableAcceptanceTestDriver that should fit most use cases. In case more flexibility is needed you can create your own driver, include the default driver in the struct and override methods as needed.

type ConfigurableAcceptanceTestDriver added in v0.3.0

type ConfigurableAcceptanceTestDriver struct {
	Config ConfigurableAcceptanceTestDriverConfig
	// contains filtered or unexported fields
}

ConfigurableAcceptanceTestDriver is the default implementation of AcceptanceTestDriver. It provides a convenient way of configuring the driver without the need of implementing a custom driver from scratch.

func (ConfigurableAcceptanceTestDriver) AfterTest added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) AfterTest(t *testing.T)

func (ConfigurableAcceptanceTestDriver) BeforeTest added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) BeforeTest(t *testing.T)

func (ConfigurableAcceptanceTestDriver) Connector added in v0.3.0

func (ConfigurableAcceptanceTestDriver) Context added in v0.9.0

func (ConfigurableAcceptanceTestDriver) DestinationConfig added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) DestinationConfig(*testing.T) config.Config

func (ConfigurableAcceptanceTestDriver) GenerateData added in v0.3.0

GenerateData generates either RawData or StructuredData depending on the configured data type (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).

func (ConfigurableAcceptanceTestDriver) GenerateRecord added in v0.3.0

func (ConfigurableAcceptanceTestDriver) GenerateValue added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) GenerateValue(t *testing.T) interface{}

GenerateValue generates a random value of a random builtin type.

func (ConfigurableAcceptanceTestDriver) GoleakOptions added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.Option

func (ConfigurableAcceptanceTestDriver) ReadFromDestination added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record

ReadFromDestination by default opens the source and reads all records from the source. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a source the function will fail the test.

func (ConfigurableAcceptanceTestDriver) ReadTimeout added in v0.3.0

func (ConfigurableAcceptanceTestDriver) Skip added in v0.3.0

func (ConfigurableAcceptanceTestDriver) SourceConfig added in v0.3.0

func (ConfigurableAcceptanceTestDriver) WriteTimeout added in v0.3.0

func (ConfigurableAcceptanceTestDriver) WriteToSource added in v0.3.0

func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []opencdc.Record) []opencdc.Record

WriteToSource by default opens the destination and writes records to the destination. It is expected that the destination is writing to the same location the source is reading from. If the connector does not implement a destination the function will fail the test.

type ConfigurableAcceptanceTestDriverConfig added in v0.3.0

type ConfigurableAcceptanceTestDriverConfig struct {
	// Context is the context to use in tests. The default is a context with a
	// logger that writes to the test output.
	//nolint:containedctx // We are using this as a configuration option.
	Context context.Context
	// Connector is the connector to be tested.
	Connector Connector

	// SourceConfig should be a valid config for a source connector, reading
	// from the same location as the destination will write to.
	SourceConfig config.Config
	// DestinationConfig should be a valid config for a destination connector,
	// writing to the same location as the source will read from.
	DestinationConfig config.Config

	// BeforeTest is executed before each acceptance test.
	BeforeTest func(t *testing.T)
	// AfterTest is executed after each acceptance test.
	AfterTest func(t *testing.T)

	// GoleakOptions will be applied to goleak.VerifyNone. Can be used to
	// suppress false positive goroutine leaks.
	GoleakOptions []goleak.Option

	// Skip is a slice of regular expressions used to identify tests that should
	// be skipped. The full test name will be matched against all regular
	// expressions and the test will be skipped if a match is found.
	Skip []string

	// GenerateDataType controls which Data type will be generated in test
	// records. The default is GenerateMixedData which will produce both RawData
	// and StructuredData. To generate only one type of data set this field to
	// GenerateRawData or GenerateStructuredData.
	GenerateDataType GenerateDataType

	// ReadTimeout controls the time the test should wait for a read operation
	// to return a record before it considers the operation as failed. The
	// default timeout is 5 seconds. This value should be changed only if there
	// is a good reason (uncontrollable limitations of the 3rd party system).
	ReadTimeout time.Duration
	// WriteTimeout controls the time the test should wait for a write operation
	// to return a record before it considers the operation as failed. The
	// default timeout is 5 seconds. This value should be changed only if there
	// is a good reason (uncontrollable limitations of the 3rd party system).
	WriteTimeout time.Duration
}

ConfigurableAcceptanceTestDriverConfig contains the configuration for ConfigurableAcceptanceTestDriver.

type Connector added in v0.3.0

type Connector struct {
	// NewSpecification should create a new Specification that describes the
	// connector. This field is mandatory, if it is empty the connector won't
	// work.
	NewSpecification func() Specification
	// NewSource should create a new Source plugin. If the plugin doesn't
	// implement a source connector this field can be nil.
	NewSource func() Source
	// NewDestination should create a new Destination plugin. If the plugin
	// doesn't implement a destination connector this field can be nil.
	NewDestination func() Destination
}

Connector combines all constructors for each plugin into one struct.

type Converter added in v0.5.0

type Converter interface {
	Name() string
	Configure(map[string]string) (Converter, error)
	Convert(opencdc.Record) (any, error)
}

Converter is a type that can change the structure of a Record. It's used in destination connectors to change the output structure (e.g. opencdc records, debezium records etc.).

type DebeziumConverter added in v0.5.0

type DebeziumConverter struct {
	SchemaName string
	RawDataKey string
}

DebeziumConverter outputs a Debezium record.

func (DebeziumConverter) Configure added in v0.5.0

func (c DebeziumConverter) Configure(opt map[string]string) (Converter, error)

func (DebeziumConverter) Convert added in v0.5.0

func (c DebeziumConverter) Convert(r opencdc.Record) (any, error)

func (DebeziumConverter) Name added in v0.5.0

func (c DebeziumConverter) Name() string

type Destination

type Destination interface {
	// Parameters is a map of named Parameters that describe how to configure
	// the Destination.
	Parameters() config.Parameters

	// Configure is the first function to be called in a connector. It provides the
	// connector with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Testing if your connector can reach the configured data source should be
	// done in Open, not in Configure.
	// The connector SDK will sanitize, apply defaults and validate the
	// configuration before calling this function. This means that the
	// configuration will always contain all keys defined in Parameters
	// (unprovided keys will have their default values) and all non-empty
	// values will be of the correct type.
	Configure(context.Context, config.Config) error

	// Open is called after Configure to signal the plugin it can prepare to
	// start writing records. If needed, the plugin should open connections in
	// this function.
	Open(context.Context) error

	// Write writes len(r) records from r to the destination right away without
	// caching. It should return the number of records written from r
	// (0 <= n <= len(r)) and any error encountered that caused the write to
	// stop early. Write must return a non-nil error if it returns n < len(r).
	Write(ctx context.Context, r []opencdc.Record) (n int, err error)

	// Teardown signals to the plugin that all records were written and there
	// will be no more calls to any other function. After Teardown returns, the
	// plugin should be ready for a graceful shutdown.
	Teardown(context.Context) error

	// LifecycleOnCreated is called after Configure and before Open when the
	// connector is run for the first time. This call will be skipped if the
	// connector was already started before. This method can be used to do some
	// initialization that needs to happen only once in the lifetime of a
	// connector (e.g. create a bucket). Anything that the connector creates in
	// this method is considered to be owned by this connector and should be
	// cleaned up in LifecycleOnDeleted.
	LifecycleOnCreated(ctx context.Context, config config.Config) error
	// LifecycleOnUpdated is called after Configure and before Open when the
	// connector configuration has changed since the last run. This call will be
	// skipped if the connector configuration did not change. It can be used to
	// update anything that was initialized in LifecycleOnCreated, in case the
	// configuration change affects it.
	LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error
	// LifecycleOnDeleted is called when the connector was deleted. It will be
	// the only method that is called in that case. This method can be used to
	// clean up anything that was initialized in LifecycleOnCreated.
	LifecycleOnDeleted(ctx context.Context, config config.Config) error
	// contains filtered or unexported methods
}

Destination receives records from Conduit and writes them to 3rd party resources. All implementations must embed UnimplementedDestination for forward compatibility.

func DestinationWithMiddleware added in v0.3.0

func DestinationWithMiddleware(d Destination, middleware ...DestinationMiddleware) Destination

DestinationWithMiddleware wraps the destination into the supplied middleware.

type DestinationMiddleware added in v0.3.0

type DestinationMiddleware interface {
	Wrap(Destination) Destination
}

DestinationMiddleware wraps a Destination and adds functionality to it.

func DefaultDestinationMiddleware added in v0.3.0

func DefaultDestinationMiddleware(opts ...DestinationMiddlewareOption) []DestinationMiddleware

DefaultDestinationMiddleware returns a slice of middleware that should be added to all destinations unless there's a good reason not to.

type DestinationMiddlewareOption added in v0.10.0

type DestinationMiddlewareOption interface {
	Apply(DestinationMiddleware)
}

DestinationMiddlewareOption can be used to change the behavior of the default destination middleware created with DefaultDestinationMiddleware.

type DestinationUtil added in v0.3.0

type DestinationUtil struct{}

DestinationUtil provides utility methods for implementing a destination. Use it by calling Util.Destination.*.

func (DestinationUtil) Route added in v0.3.0

func (DestinationUtil) Route(
	ctx context.Context,
	rec opencdc.Record,
	handleCreate func(context.Context, opencdc.Record) error,
	handleUpdate func(context.Context, opencdc.Record) error,
	handleDelete func(context.Context, opencdc.Record) error,
	handleSnapshot func(context.Context, opencdc.Record) error,
) error

Route makes it easier to implement a destination that mutates entities in place and thus handles different operations differently. It will inspect the operation on the record and based on that choose which handler to call.

Example usage:

func (d *Destination) Write(ctx context.Context, r opencdc.Record) error {
  return d.Util.Route(ctx, r,
    d.handleInsert,
    d.handleUpdate,
    d.handleDelete,
    d.handleSnapshot, // we could also reuse d.handleInsert
  )
}
func (d *Destination) handleInsert(ctx context.Context, r opencdc.Record) error {
  ...
}

type DestinationWithBatch added in v0.3.0

type DestinationWithBatch struct {
	Config DestinationWithBatchConfig
}

DestinationWithBatch adds support for batching on the destination. It adds two parameters to the destination config:

  • `sdk.batch.size` - Maximum size of batch before it gets written to the destination.
  • `sdk.batch.delay` - Maximum delay before an incomplete batch is written to the destination.

To change the defaults of these parameters use the fields of this struct.

func (*DestinationWithBatch) Wrap added in v0.3.0

Wrap a Destination into the batching middleware.

type DestinationWithBatchConfig added in v0.10.0

type DestinationWithBatchConfig struct {
	// BatchSize is the default value for the batch size.
	BatchSize int
	// BatchDelay is the default value for the batch delay.
	BatchDelay time.Duration
}

DestinationWithBatchConfig is the configuration for the DestinationWithBatch middleware. Fields set to their zero value are ignored and will be set to the default value.

DestinationWithBatchConfig can be used as a DestinationMiddlewareOption.

func (DestinationWithBatchConfig) Apply added in v0.10.0

Apply sets the default configuration for the DestinationWithBatch middleware.

func (DestinationWithBatchConfig) BatchDelayParameterName added in v0.10.0

func (c DestinationWithBatchConfig) BatchDelayParameterName() string

func (DestinationWithBatchConfig) BatchSizeParameterName added in v0.10.0

func (c DestinationWithBatchConfig) BatchSizeParameterName() string

type DestinationWithRateLimit added in v0.3.0

type DestinationWithRateLimit struct {
	Config DestinationWithRateLimitConfig
}

DestinationWithRateLimit adds support for rate limiting to the destination. It adds two parameters to the destination config:

  • `sdk.rate.perSecond` - Maximum number of records written per second (0 means no rate limit).
  • `sdk.rate.burst` - Allow bursts of at most X records (0 or less means that bursts are not limited). Only takes effect if a rate limit per second is set. Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch size will be equal to `sdk.rate.burst`.

To change the defaults of these parameters use the fields of this struct.

func (*DestinationWithRateLimit) Wrap added in v0.3.0

Wrap a Destination into the rate limiting middleware.

type DestinationWithRateLimitConfig added in v0.10.0

type DestinationWithRateLimitConfig struct {
	// RatePerSecond is the default value for the rate per second.
	RatePerSecond float64
	// Burst is the default value for the allowed burst count.
	Burst int
}

DestinationWithRateLimitConfig is the configuration for the DestinationWithRateLimit middleware. Fields set to their zero value are ignored and will be set to the default value.

DestinationWithRateLimitConfig can be used as a DestinationMiddlewareOption.

func (DestinationWithRateLimitConfig) Apply added in v0.10.0

Apply sets the default configuration for the DestinationWithRateLimit middleware.

func (DestinationWithRateLimitConfig) RateBurstParameterName added in v0.10.0

func (c DestinationWithRateLimitConfig) RateBurstParameterName() string

func (DestinationWithRateLimitConfig) RatePerSecondParameterName added in v0.10.0

func (c DestinationWithRateLimitConfig) RatePerSecondParameterName() string

type DestinationWithRecordFormat added in v0.5.0

type DestinationWithRecordFormat struct {
	Config DestinationWithRecordFormatConfig
}

DestinationWithRecordFormat adds support for changing the output format of records, specifically of the Record.Bytes method. It adds two parameters to the destination config:

  • `sdk.record.format` - The format of the output record. The inclusion validation exposes a list of valid options.
  • `sdk.record.format.options` - Options are used to configure the format.

func (*DestinationWithRecordFormat) Wrap added in v0.5.0

Wrap a Destination into the record format middleware.

type DestinationWithRecordFormatConfig added in v0.10.0

type DestinationWithRecordFormatConfig struct {
	// DefaultRecordFormat is the default record format.
	DefaultRecordFormat string
	RecordSerializers   []RecordSerializer
}

DestinationWithRecordFormatConfig is the configuration for the DestinationWithRecordFormat middleware. Fields set to their zero value are ignored and will be set to the default value.

DestinationWithRecordFormatConfig can be used as a DestinationMiddlewareOption.

func (DestinationWithRecordFormatConfig) Apply added in v0.10.0

Apply sets the default configuration for the DestinationWithRecordFormat middleware.

func (DestinationWithRecordFormatConfig) DefaultRecordSerializers added in v0.10.0

func (c DestinationWithRecordFormatConfig) DefaultRecordSerializers() []RecordSerializer

DefaultRecordSerializers returns the list of record serializers that are used if DestinationWithRecordFormatConfig.RecordSerializers is nil.

func (DestinationWithRecordFormatConfig) RecordFormatOptionsParameterName added in v0.10.0

func (c DestinationWithRecordFormatConfig) RecordFormatOptionsParameterName() string

func (DestinationWithRecordFormatConfig) RecordFormatParameterName added in v0.10.0

func (c DestinationWithRecordFormatConfig) RecordFormatParameterName() string

type DestinationWithSchemaExtraction added in v0.10.0

type DestinationWithSchemaExtraction struct {
	Config DestinationWithSchemaExtractionConfig
}

DestinationWithSchemaExtraction is a middleware that extracts and decodes the key and/or payload of a record using a schema. It takes the schema subject and version from the record metadata, fetches the schema from the schema service, and decodes the key and/or payload using the schema. If the schema subject and version is not found in the record metadata, it will log a warning and skip decoding the key and/or payload. This middleware is useful when the source connector sends the data with the schema attached. This middleware is the counterpart of SourceWithSchemaExtraction.

It adds two parameters to the destination config:

  • `sdk.schema.extract.key.enabled` - Whether to extract and decode the record key with a schema.
  • `sdk.schema.extract.payload.enabled` - Whether to extract and decode the record payload with a schema.

func (*DestinationWithSchemaExtraction) Wrap added in v0.10.0

Wrap a Destination into the schema middleware. It will apply default configuration values if they are not explicitly set.

type DestinationWithSchemaExtractionConfig added in v0.10.0

type DestinationWithSchemaExtractionConfig struct {
	// Whether to extract and decode the record payload with a schema.
	// If unset, defaults to true.
	PayloadEnabled *bool
	// Whether to extract and decode the record key with a schema.
	// If unset, defaults to true.
	KeyEnabled *bool
}

DestinationWithSchemaExtractionConfig is the configuration for the DestinationWithSchemaExtraction middleware. Fields set to their zero value are ignored and will be set to the default value.

DestinationWithSchemaExtractionConfig can be used as a DestinationMiddlewareOption.

func (DestinationWithSchemaExtractionConfig) Apply added in v0.10.0

Apply sets the default configuration for the DestinationWithSchemaExtraction middleware.

func (DestinationWithSchemaExtractionConfig) SchemaKeyEnabledParameterName added in v0.10.0

func (c DestinationWithSchemaExtractionConfig) SchemaKeyEnabledParameterName() string

func (DestinationWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName added in v0.10.0

func (c DestinationWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName() string

type Encoder added in v0.5.0

type Encoder interface {
	Name() string
	Configure(options map[string]string) (Encoder, error)
	Encode(r any) ([]byte, error)
}

Encoder is a type that can encode a random struct into a byte slice. It's used in destination connectors to encode records into different formats (e.g. JSON, Avro etc.).

type GenerateDataType added in v0.3.0

type GenerateDataType int

GenerateDataType is used in acceptance tests to control what data type will be generated.

const (
	GenerateMixedData GenerateDataType = iota
	GenerateRawData
	GenerateStructuredData
)

type GenericRecordSerializer added in v0.10.0

type GenericRecordSerializer struct {
	Converter
	Encoder
}

GenericRecordSerializer is a serializer that uses a Converter and Encoder to serialize a record.

func (GenericRecordSerializer) Configure added in v0.10.0

func (rf GenericRecordSerializer) Configure(optRaw string) (RecordSerializer, error)

func (GenericRecordSerializer) Name added in v0.10.0

func (rf GenericRecordSerializer) Name() string

Name returns the name of the record serializer combined from the converter name and encoder name.

func (GenericRecordSerializer) Serialize added in v0.10.0

func (rf GenericRecordSerializer) Serialize(r opencdc.Record) ([]byte, error)

Serialize converts and encodes record into a byte array.

type JSONEncoder added in v0.5.0

type JSONEncoder struct{}

JSONEncoder is an Encoder that outputs JSON.

func (JSONEncoder) Configure added in v0.5.0

func (e JSONEncoder) Configure(map[string]string) (Encoder, error)

func (JSONEncoder) Encode added in v0.5.0

func (e JSONEncoder) Encode(v any) ([]byte, error)

func (JSONEncoder) Name added in v0.5.0

func (e JSONEncoder) Name() string

type OpenCDCConverter added in v0.5.0

type OpenCDCConverter struct{}

OpenCDCConverter outputs an OpenCDC record (it does not change the structure of the record).

func (OpenCDCConverter) Configure added in v0.5.0

func (c OpenCDCConverter) Configure(map[string]string) (Converter, error)

func (OpenCDCConverter) Convert added in v0.5.0

func (c OpenCDCConverter) Convert(r opencdc.Record) (any, error)

func (OpenCDCConverter) Name added in v0.5.0

func (c OpenCDCConverter) Name() string

type RecordSerializer added in v0.10.0

type RecordSerializer interface {
	Name() string
	Configure(string) (RecordSerializer, error)
	opencdc.RecordSerializer
}

RecordSerializer is a type that can format a record to bytes. It's used in destination connectors to change the output structure and format.

type Source

type Source interface {
	// Parameters is a map of named Parameters that describe how to configure
	// the Source.
	Parameters() config.Parameters

	// Configure is the first function to be called in a connector. It provides the
	// connector with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Testing if your connector can reach the configured data source should be
	// done in Open, not in Configure.
	// The connector SDK will sanitize, apply defaults and validate the
	// configuration before calling this function. This means that the
	// configuration will always contain all keys defined in Parameters
	// (unprovided keys will have their default values) and all non-empty
	// values will be of the correct type.
	Configure(context.Context, config.Config) error

	// Open is called after Configure to signal the plugin it can prepare to
	// start producing records. If needed, the plugin should open connections in
	// this function. The position parameter will contain the position of the
	// last record that was successfully processed, Source should therefore
	// start producing records after this position. The context passed to Open
	// will be cancelled once the plugin receives a stop signal from Conduit.
	Open(context.Context, opencdc.Position) error

	// Read returns a new Record and is supposed to block until there is either
	// a new record or the context gets cancelled. It can also return the error
	// ErrBackoffRetry to signal to the SDK it should call Read again with a
	// backoff retry.
	// If Read receives a cancelled context or the context is cancelled while
	// Read is running it must stop retrieving new records from the source
	// system and start returning records that have already been buffered. If
	// there are no buffered records left Read must return the context error to
	// signal a graceful stop. If Read returns ErrBackoffRetry while the context
	// is cancelled it will also signal that there are no records left and Read
	// won't be called again.
	// After Read returns an error the function won't be called again (except if
	// the error is ErrBackoffRetry, as mentioned above).
	// Read can be called concurrently with Ack.
	Read(context.Context) (opencdc.Record, error)

	// ReadN is the same as Read, but returns a batch of records. The connector
	// is expected to return at most n records. If there are fewer records
	// available, it should return all of them. If there are no records available
	// it should block until there are records available or the context is
	// cancelled. If the context is cancelled while ReadN is running, it should
	// return the context error.
	ReadN(context.Context, int) ([]opencdc.Record, error)

	// Ack signals to the implementation that the record with the supplied
	// position was successfully processed. This method might be called after
	// the context of Read is already cancelled, since there might be
	// outstanding acks that need to be delivered. When Teardown is called it is
	// guaranteed there won't be any more calls to Ack.
	// Ack can be called concurrently with Read.
	Ack(context.Context, opencdc.Position) error

	// Teardown signals to the plugin that there will be no more calls to any
	// other function. After Teardown returns, the plugin should be ready for a
	// graceful shutdown.
	Teardown(context.Context) error

	// LifecycleOnCreated is called after Configure and before Open when the
	// connector is run for the first time. This call will be skipped if the
	// connector was already started before. This method can be used to do some
	// initialization that needs to happen only once in the lifetime of a
	// connector (e.g. create a logical replication slot). Anything that the
	// connector creates in this method is considered to be owned by this
	// connector and should be cleaned up in LifecycleOnDeleted.
	LifecycleOnCreated(ctx context.Context, config config.Config) error
	// LifecycleOnUpdated is called after Configure and before Open when the
	// connector configuration has changed since the last run. This call will be
	// skipped if the connector configuration did not change. It can be used to
	// update anything that was initialized in LifecycleOnCreated, in case the
	// configuration change affects it.
	LifecycleOnUpdated(ctx context.Context, configBefore, configAfter config.Config) error
	// LifecycleOnDeleted is called when the connector was deleted. It will be
	// the only method that is called in that case. This method can be used to
	// clean up anything that was initialized in LifecycleOnCreated.
	LifecycleOnDeleted(ctx context.Context, config config.Config) error
	// contains filtered or unexported methods
}

Source fetches records from 3rd party resources and sends them to Conduit. All implementations must embed UnimplementedSource for forward compatibility.

func SourceWithMiddleware added in v0.3.0

func SourceWithMiddleware(s Source, middleware ...SourceMiddleware) Source

SourceWithMiddleware wraps the source into the supplied middleware.

type SourceMiddleware added in v0.3.0

type SourceMiddleware interface {
	Wrap(Source) Source
}

SourceMiddleware wraps a Source and adds functionality to it.

func DefaultSourceMiddleware added in v0.3.0

func DefaultSourceMiddleware(opts ...SourceMiddlewareOption) []SourceMiddleware

DefaultSourceMiddleware returns a slice of middleware that should be added to all sources unless there's a good reason not to.

type SourceMiddlewareOption added in v0.10.0

type SourceMiddlewareOption interface {
	Apply(SourceMiddleware)
}

SourceMiddlewareOption can be used to change the behavior of the default source middleware created with DefaultSourceMiddleware.

type SourceUtil added in v0.3.0

type SourceUtil struct{}

SourceUtil provides utility methods for implementing a source. Use it by calling Util.Source.*.

func (SourceUtil) NewRecordCreate added in v0.3.0

func (SourceUtil) NewRecordCreate(
	position opencdc.Position,
	metadata opencdc.Metadata,
	key opencdc.Data,
	payload opencdc.Data,
) opencdc.Record

NewRecordCreate can be used to instantiate a record with OperationCreate.

func (SourceUtil) NewRecordDelete added in v0.3.0

func (SourceUtil) NewRecordDelete(
	position opencdc.Position,
	metadata opencdc.Metadata,
	key opencdc.Data,
	payloadBefore opencdc.Data,
) opencdc.Record

NewRecordDelete can be used to instantiate a record with OperationDelete.

func (SourceUtil) NewRecordSnapshot added in v0.3.0

func (SourceUtil) NewRecordSnapshot(
	position opencdc.Position,
	metadata opencdc.Metadata,
	key opencdc.Data,
	payload opencdc.Data,
) opencdc.Record

NewRecordSnapshot can be used to instantiate a record with OperationSnapshot.

func (SourceUtil) NewRecordUpdate added in v0.3.0

func (SourceUtil) NewRecordUpdate(
	position opencdc.Position,
	metadata opencdc.Metadata,
	key opencdc.Data,
	payloadBefore opencdc.Data,
	payloadAfter opencdc.Data,
) opencdc.Record

NewRecordUpdate can be used to instantiate a record with OperationUpdate.

type SourceWithBatch added in v0.11.1

type SourceWithBatch struct {
	Config SourceWithBatchConfig
}

SourceWithBatch adds support for batching on the source. It adds two parameters to the source config:

  • `sdk.batch.size` - Maximum size of batch before it gets written to the source.
  • `sdk.batch.delay` - Maximum delay before an incomplete batch is written to the source.

To change the defaults of these parameters use the fields of this struct.

func (*SourceWithBatch) Wrap added in v0.11.1

func (s *SourceWithBatch) Wrap(impl Source) Source

Wrap a Source into the batching middleware.

type SourceWithBatchConfig added in v0.11.1

type SourceWithBatchConfig struct {
	// BatchSize is the default value for the batch size.
	BatchSize *int
	// BatchDelay is the default value for the batch delay.
	BatchDelay *time.Duration
}

SourceWithBatchConfig is the configuration for the SourceWithBatch middleware. Fields set to their zero value are ignored and will be set to the default value.

SourceWithBatchConfig can be used as a SourceMiddlewareOption.

func (SourceWithBatchConfig) Apply added in v0.11.1

Apply sets the default configuration for the SourceWithBatch middleware.

func (SourceWithBatchConfig) BatchDelayParameterName added in v0.11.1

func (c SourceWithBatchConfig) BatchDelayParameterName() string

func (SourceWithBatchConfig) BatchSizeParameterName added in v0.11.1

func (c SourceWithBatchConfig) BatchSizeParameterName() string

type SourceWithEncoding added in v0.11.1

type SourceWithEncoding struct{}

SourceWithEncoding is a middleware that encodes the record payload and key with the provided schema. The schema is registered with the schema service and the schema subject is attached to the record metadata.

func (SourceWithEncoding) Apply added in v0.11.1

func (SourceWithEncoding) Wrap added in v0.11.1

func (s SourceWithEncoding) Wrap(impl Source) Source

type SourceWithSchemaContext added in v0.10.0

type SourceWithSchemaContext struct {
	Config SourceWithSchemaContextConfig
}

SourceWithSchemaContext is a middleware that makes it possible to configure the schema context for records read by a source.

func (*SourceWithSchemaContext) Wrap added in v0.10.0

func (s *SourceWithSchemaContext) Wrap(impl Source) Source

Wrap a Source into the schema middleware. It will apply default configuration values if they are not explicitly set.

type SourceWithSchemaContextConfig added in v0.10.0

type SourceWithSchemaContextConfig struct {
	Enabled *bool
	Name    *string
}

SourceWithSchemaContextConfig is the configuration for the SourceWithSchemaContext middleware. Fields set to their zero value are ignored and will be set to the default value.

SourceWithSchemaContextConfig can be used as a SourceMiddlewareOption.

func (SourceWithSchemaContextConfig) Apply added in v0.10.0

Apply sets the default configuration for the SourceWithSchemaExtraction middleware.

func (SourceWithSchemaContextConfig) EnabledParameterName added in v0.10.0

func (c SourceWithSchemaContextConfig) EnabledParameterName() string

func (SourceWithSchemaContextConfig) NameParameterName added in v0.10.0

func (c SourceWithSchemaContextConfig) NameParameterName() string

type SourceWithSchemaExtraction added in v0.10.0

type SourceWithSchemaExtraction struct {
	Config SourceWithSchemaExtractionConfig
}

SourceWithSchemaExtraction is a middleware that extracts a record's payload and key schemas. The schema is extracted from the record data for each record produced by the source. The schema is registered with the schema service and the schema subject is attached to the record metadata.

func (*SourceWithSchemaExtraction) Wrap added in v0.10.0

Wrap a Source into the schema middleware. It will apply default configuration values if they are not explicitly set.

type SourceWithSchemaExtractionConfig added in v0.10.0

type SourceWithSchemaExtractionConfig struct {
	// The type of the payload schema. Defaults to Avro.
	SchemaType schema.Type
	// Whether to extract and encode the record payload with a schema.
	// If unset, defaults to true.
	PayloadEnabled *bool
	// The subject of the payload schema. If unset, defaults to "payload".
	PayloadSubject *string
	// Whether to extract and encode the record key with a schema.
	// If unset, defaults to true.
	KeyEnabled *bool
	// The subject of the key schema. If unset, defaults to "key".
	KeySubject *string
}

SourceWithSchemaExtractionConfig is the configuration for the SourceWithSchemaExtraction middleware. Fields set to their zero value are ignored and will be set to the default value.

SourceWithSchemaExtractionConfig can be used as a SourceMiddlewareOption.

func (SourceWithSchemaExtractionConfig) Apply added in v0.10.0

Apply sets the default configuration for the SourceWithSchemaExtraction middleware.

func (SourceWithSchemaExtractionConfig) SchemaKeyEnabledParameterName added in v0.10.0

func (c SourceWithSchemaExtractionConfig) SchemaKeyEnabledParameterName() string

func (SourceWithSchemaExtractionConfig) SchemaKeySubjectParameterName added in v0.10.0

func (c SourceWithSchemaExtractionConfig) SchemaKeySubjectParameterName() string

func (SourceWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName added in v0.10.0

func (c SourceWithSchemaExtractionConfig) SchemaPayloadEnabledParameterName() string

func (SourceWithSchemaExtractionConfig) SchemaPayloadSubjectParameterName added in v0.10.0

func (c SourceWithSchemaExtractionConfig) SchemaPayloadSubjectParameterName() string

func (SourceWithSchemaExtractionConfig) SchemaTypeParameterName added in v0.10.0

func (c SourceWithSchemaExtractionConfig) SchemaTypeParameterName() string

type Specification

type Specification pconnector.Specification

Specification contains general information regarding the plugin like its name and what it does.

type TemplateRecordSerializer added in v0.10.0

type TemplateRecordSerializer struct {
	// contains filtered or unexported fields
}

TemplateRecordSerializer is a RecordSerializer that serializes a record using a Go template.

func (TemplateRecordSerializer) Configure added in v0.10.0

func (TemplateRecordSerializer) Name added in v0.10.0

func (TemplateRecordSerializer) Serialize added in v0.10.0

func (e TemplateRecordSerializer) Serialize(r opencdc.Record) ([]byte, error)

type UnimplementedDestination

type UnimplementedDestination struct{}

UnimplementedDestination should be embedded to have forward compatible implementations.

func (UnimplementedDestination) Configure

Configure needs to be overridden in the actual implementation.

func (UnimplementedDestination) LifecycleOnCreated added in v0.6.0

LifecycleOnCreated won't do anything by default.

func (UnimplementedDestination) LifecycleOnDeleted added in v0.6.0

LifecycleOnDeleted won't do anything by default.

func (UnimplementedDestination) LifecycleOnUpdated added in v0.6.0

LifecycleOnUpdated won't do anything by default.

func (UnimplementedDestination) Open

Open needs to be overridden in the actual implementation.

func (UnimplementedDestination) Parameters added in v0.3.0

Parameters needs to be overridden in the actual implementation.

func (UnimplementedDestination) Teardown

Teardown needs to be overridden in the actual implementation.

func (UnimplementedDestination) Write

Write needs to be overridden in the actual implementation.

type UnimplementedSource

type UnimplementedSource struct{}

UnimplementedSource should be embedded to have forward compatible implementations.

func (UnimplementedSource) Ack

Ack should be overridden if acks need to be forwarded to the source, otherwise it is optional.

func (UnimplementedSource) Configure

Configure needs to be overridden in the actual implementation.

func (UnimplementedSource) LifecycleOnCreated added in v0.6.0

func (UnimplementedSource) LifecycleOnCreated(context.Context, config.Config) error

LifecycleOnCreated won't do anything by default.

func (UnimplementedSource) LifecycleOnDeleted added in v0.6.0

func (UnimplementedSource) LifecycleOnDeleted(context.Context, config.Config) error

LifecycleOnDeleted won't do anything by default.

func (UnimplementedSource) LifecycleOnUpdated added in v0.6.0

LifecycleOnUpdated won't do anything by default.

func (UnimplementedSource) Open

Open needs to be overridden in the actual implementation.

func (UnimplementedSource) Parameters added in v0.3.0

func (UnimplementedSource) Parameters() config.Parameters

Parameters needs to be overridden in the actual implementation.

func (UnimplementedSource) Read

Read needs to be overridden in the actual implementation.

func (UnimplementedSource) ReadN added in v0.11.1

ReadN can be overridden. If it's not implemented, Read will be used as a fallback.

func (UnimplementedSource) Teardown

Teardown needs to be overridden in the actual implementation.

Directories

Path Synopsis
Package kafkaconnect contains utility functions and structures for processing Kafka Connect compatible data.
Package kafkaconnect contains utility functions and structures for processing Kafka Connect compatible data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL