kafka

package module
v0.8.0
Published: Apr 25, 2024 · License: Apache-2.0 · Imports: 7 · Imported by: 1

README

Conduit Connector Kafka


The Kafka connector is one of Conduit's built-in plugins. It provides both a source and a destination connector for Apache Kafka.

How to build?

Run make build to build the connector.

Testing

Run make test to run all the unit and integration tests. The tests require Docker to be installed and running; the command handles starting and stopping the Docker containers for you.

Tests will run twice, once against an Apache Kafka instance and a second time against a Redpanda instance.

Source

A Kafka source connector is represented by a single consumer in a Kafka consumer group. Because of that, a source's logical position is the respective consumer's offset in Kafka. Internally, though, we don't save the offset as the position; instead, we save the consumer group ID, since that's all Kafka needs to find the offsets for our consumer.

A source is associated with a consumer group ID the first time its Read() method is called.
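
To make that lifecycle concrete, here is a minimal sketch that drives the source by hand. In a real deployment Conduit calls these methods for you; the broker address and topics are placeholders, and the rec.Position field access assumes the SDK's record type for this version.

package main

import (
	"context"
	"log"

	kafka "github.com/conduitio/conduit-connector-kafka"
)

func main() {
	ctx := context.Background()
	src := kafka.NewSource()

	cfg := map[string]string{
		"servers": "localhost:9092", // placeholder broker address
		"topics":  "topic1,topic2",
	}
	if err := src.Configure(ctx, cfg); err != nil {
		log.Fatal(err)
	}
	// A nil position means a fresh start; the consumer group ID gets
	// associated with the source on the first Read.
	if err := src.Open(ctx, nil); err != nil {
		log.Fatal(err)
	}
	defer src.Teardown(ctx)

	rec, err := src.Read(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// Ack ignores its position argument; offsets are committed through
	// the consumer group.
	if err := src.Ack(ctx, rec.Position); err != nil {
		log.Fatal(err)
	}
}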

Configuration
| name | description | required | default value |
|------|-------------|----------|---------------|
| servers | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| topics | Topics is a comma-separated list of Kafka topics from which records will be read, e.g. "topic1,topic2". | true | |
| topic | Topic is the Kafka topic to read from. Deprecated: use topics instead. | false | |
| clientID | A Kafka client ID. | false | conduit-connector-kafka |
| readFromBeginning | Determines where the consumer group should begin consuming when it finds a partition without a committed offset. If set to true, it starts with the first message in that partition. | false | false |
| groupID | Defines the consumer group ID. | false | |
| tls.enabled | Defines whether TLS is enabled. | false | false |
| clientCert | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| clientKey | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| caCert | The Kafka broker's certificate, in PEM format. | false | |
| insecureSkipVerify | Controls whether a client verifies the server's certificate chain and host name. If true, accepts any certificate presented by the server and any host name in that certificate. | false | false |
| saslMechanism | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| saslUsername | SASL username. If provided, a password needs to be provided too. | false | |
| saslPassword | SASL password. If provided, a username needs to be provided too. | false | |
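
As a sketch, here are some of these options expressed as the map[string]string that Configure accepts. All values are placeholders; the group ID and credentials are hypothetical.

// Example source configuration with consumer-group and SASL options.
var sourceCfg = map[string]string{
	"servers":           "kafka-1:9092,kafka-2:9092", // placeholder addresses
	"topics":            "topic1,topic2",
	"groupID":           "conduit-example", // hypothetical group ID
	"readFromBeginning": "true",
	"saslMechanism":     "SCRAM-SHA-256",
	"saslUsername":      "user",   // placeholder credentials
	"saslPassword":      "secret",
}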

Destination

The destination connector sends records to Kafka.

Configuration

There's no global connector configuration; each connector instance is configured separately.

| name | description | required | default value |
|------|-------------|----------|---------------|
| servers | Servers is a list of Kafka bootstrap servers, which will be used to discover all the servers in a cluster. | true | |
| topic | Topic is the Kafka topic. It can contain a Go template that will be executed for each record to determine the topic. By default, the topic is the value of the opencdc.collection metadata field. | false | {{ index .Metadata "opencdc.collection" }} |
| clientID | A Kafka client ID. | false | conduit-connector-kafka |
| acks | Acks defines the number of acknowledgments from partition replicas required before receiving a response to a produce request. none = fire and forget, one = wait for the leader to acknowledge the writes, all = wait for the full ISR to acknowledge the writes. | false | all |
| deliveryTimeout | Message delivery timeout. | false | |
| batchBytes | Limits the maximum size of a request in bytes before being sent to a partition. This mirrors Kafka's max.message.bytes. | false | 1000012 |
| compression | Compression applied to messages. Possible values: none, gzip, snappy, lz4, zstd. | false | snappy |
| clientCert | A certificate for the Kafka client, in PEM format. If provided, the private key needs to be provided too. | false | |
| clientKey | A private key for the Kafka client, in PEM format. If provided, the certificate needs to be provided too. | false | |
| caCert | The Kafka broker's certificate, in PEM format. | false | |
| insecureSkipVerify | Controls whether a client verifies the server's certificate chain and host name. If true, accepts any certificate presented by the server and any host name in that certificate. | false | false |
| saslMechanism | SASL mechanism to be used. Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512. If empty, authentication won't be performed. | false | |
| saslUsername | SASL username. If provided, a password needs to be provided too. | false | |
| saslPassword | SASL password. If provided, a username needs to be provided too. | false | |
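
For illustration, a destination configuration as the map passed to Configure, using the default topic template from the table. The broker address is a placeholder.

// Example destination configuration.
var destCfg = map[string]string{
	"servers": "localhost:9092", // placeholder
	// Route each record to the collection it came from (the default template).
	"topic":       `{{ index .Metadata "opencdc.collection" }}`,
	"acks":        "all",
	"compression": "snappy",
	"batchBytes":  "1000012",
}
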
Output format

The output format can be adjusted using configuration options provided by the connector SDK (a short sketch follows below):

  • sdk.record.format: used to choose the format
  • sdk.record.format.options: used to configure the specifics of the chosen format

See this article for more info on configuring the output format.
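
As a sketch, these keys go into the same configuration map as the connector's own options. The format name below is taken from the Conduit documentation's default; verify it against your SDK version.

// Keep the SDK's default OpenCDC JSON envelope explicitly.
var formatOpts = map[string]string{
	"sdk.record.format": "opencdc/json",
	// "sdk.record.format.options": "...", // format-specific options, if any
}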

Batching

Batching can also be configured using options provided by the connector SDK (a sketch follows the list):

  • sdk.batch.size: maximum number of records in a batch before it gets written to the destination (defaults to 0, i.e. no batching)
  • sdk.batch.delay: maximum delay before an incomplete batch is written to the destination (defaults to 0, i.e. no limit)
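
A sketch of both options in the destination's configuration map. The values are arbitrary, and the Go duration string for sdk.batch.delay is an assumption to verify.

// Flush a batch after 1000 records or 100ms, whichever comes first.
var batchOpts = map[string]string{
	"sdk.batch.size":  "1000",
	"sdk.batch.delay": "100ms",
}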

Documentation

Overview

Package kafka contains implementations for Kafka source and destination connectors for Conduit.

Index

Constants

const (
	MetadataKafkaHeaderPrefix = "kafka.header."
)
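
Judging by the constant's name, Kafka message headers presumably surface in record metadata under this prefix. A hedged sketch; the helper and the header name it would be called with (e.g. "trace-id") are hypothetical.

package example

import (
	kafka "github.com/conduitio/conduit-connector-kafka"
	sdk "github.com/conduitio/conduit-connector-sdk"
)

// headerValue looks up a Kafka message header in a record's metadata,
// assuming the source stores headers under the "kafka.header." prefix.
func headerValue(rec sdk.Record, name string) (string, bool) {
	v, ok := rec.Metadata[kafka.MetadataKafkaHeaderPrefix+name]
	return v, ok
}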

Variables

var Connector = sdk.Connector{
	NewSpecification: Specification,
	NewSource:        NewSource,
	NewDestination:   NewDestination,
}
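
This is the value a standalone plugin binary serves. A minimal sketch of the typical cmd entrypoint, using the connector SDK's Serve function:

package main

import (
	kafka "github.com/conduitio/conduit-connector-kafka"
	sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
	// Run the connector as a standalone Conduit plugin process.
	sdk.Serve(kafka.Connector)
}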

Functions

func NewDestination

func NewDestination() sdk.Destination

func NewSource

func NewSource() sdk.Source

func Specification

func Specification() sdk.Specification

Specification returns the Kafka plugin's specification. Any changes here must also be reflected in the README.

Types

type Destination

type Destination struct {
	sdk.UnimplementedDestination
	// contains filtered or unexported fields
}

func (*Destination) Configure

func (d *Destination) Configure(_ context.Context, cfg map[string]string) error

func (*Destination) Open

func (d *Destination) Open(ctx context.Context) error

func (*Destination) Parameters added in v0.3.0

func (d *Destination) Parameters() map[string]sdk.Parameter

func (*Destination) Teardown

func (d *Destination) Teardown(ctx context.Context) error

Teardown shuts down the Kafka client.

func (*Destination) Write

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error)
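
Write returns an int along with any error; interpreting it as the number of records successfully written before the error follows the usual SDK convention and is an assumption here. A small sketch using only this signature:

package example

import (
	"context"
	"fmt"

	sdk "github.com/conduitio/conduit-connector-sdk"
)

// writeAll sends a batch to an opened Destination and reports partial writes.
func writeAll(ctx context.Context, d sdk.Destination, recs []sdk.Record) error {
	n, err := d.Write(ctx, recs)
	if err != nil {
		return fmt.Errorf("wrote %d of %d records: %w", n, len(recs), err)
	}
	return nil
}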

type Source

type Source struct {
	sdk.UnimplementedSource
	// contains filtered or unexported fields
}

func (*Source) Ack

func (s *Source) Ack(ctx context.Context, _ sdk.Position) error

func (*Source) Configure

func (s *Source) Configure(ctx context.Context, cfg map[string]string) error

func (*Source) Open

func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) error

func (*Source) Parameters added in v0.3.0

func (s *Source) Parameters() map[string]sdk.Parameter

func (*Source) Read

func (s *Source) Read(ctx context.Context) (sdk.Record, error)

func (*Source) Teardown

func (s *Source) Teardown(ctx context.Context) error

Directories

| Path | Synopsis |
|------|----------|
| cmd | |
| destination | Package destination is a generated GoMock package. |
| source | Package source is a generated GoMock package. |
