kafka

package
v0.2.0-nightly.20220218

Published: Feb 17, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

README

General

The Conduit Kafka plugin provides both a source and a destination Kafka connector for Conduit.

How it works

Under the hood, the plugin uses Segment's Go client for Apache Kafka(tm). It was chosen because it has no CGo dependency, which makes it possible to build the plugin for a wider range of platforms and architectures. It also supports contexts, which we will likely use in the future.
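
To illustrate the client's context support, here is a minimal, self-contained sketch of consuming with segmentio/kafka-go directly; this is not the plugin's code, and the broker address, group ID, and topic are made up:

package main

import (
	"context"
	"fmt"
	"log"

	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	r := kafkago.NewReader(kafkago.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		GroupID: "example-group",
		Topic:   "example-topic",
	})
	defer r.Close()

	// ReadMessage blocks until a message is available or the context is canceled.
	msg, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
}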

Source

A Kafka source connector is represented by a single consumer in a Kafka consumer group. By virtue of that, a source's logical position is the respective consumer's offset in Kafka. Internally, though, we're not saving the offset as the position: instead, we're saving the consumer group ID, since that's all Kafka needs to find the offsets for our consumer.

A source is associated with a consumer group ID the first time its Read() method is called.

Destination

The destination connector uses synchronous writes to Kafka. Support for proper buffering, which will enable asynchronous (and more efficient) writes, is planned.

How to build

Run make build-kafka-plugin.

Testing

Run make test to run all the unit tests. Run make test-integration to run the integration tests.

The integration tests assume that a Kafka instance is running at localhost:9092. The Docker Compose file at test/docker-compose.yml can be used to quickly start one.

Configuration

There's no global plugin configuration. Each connector instance is configured separately, using the parameters below.

| name              | part of             | description | required | default value |
| ----------------- | ------------------- | ----------- | -------- | ------------- |
| servers           | destination, source | A list of bootstrap servers to which the plugin will connect. | true  |       |
| topic             | destination, source | The topic to read from or write to. | true  |       |
| acks              | destination         | The number of acknowledgments required before considering a record written to Kafka. Valid values: 0, 1, all. | false | all   |
| deliveryTimeout   | destination         | Message delivery timeout. | false | 10s   |
| readFromBeginning | source              | Whether to read a topic from the beginning (i.e. existing messages, not only new ones). | false | false |
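
For illustration only, a destination's raw settings could look like the map below; all values are made up, and treating servers as a comma-separated list is an assumption, not something this README specifies:

// Hypothetical raw settings for a destination connector (values are made up).
settings := map[string]string{
	"servers":         "localhost:9092,localhost:9093", // list separator assumed to be a comma
	"topic":           "my-topic",
	"acks":            "all", // 0, 1 or all
	"deliveryTimeout": "10s",
}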

Planned work

The planned work is tracked through GitHub issues.

Documentation

Index

Constants

const (
	Servers           = "servers"
	Topic             = "topic"
	SecurityProtocol  = "securityProtocol"
	Acks              = "acks"
	DeliveryTimeout   = "deliveryTimeout"
	ReadFromBeginning = "readFromBeginning"
)

Variables

var (
	ErrServersMissing = cerrors.New("servers missing")
	ErrTopicMissing   = cerrors.New("topic missing")
)

var Required = []string{Servers, Topic}

Functions

This section is empty.

Types

type Config

type Config struct {
	// A list of bootstrap servers, which will be used to discover all the servers in a cluster.
	Servers []string
	Topic   string
	// Required acknowledgments when writing messages to a topic:
	// Can be: 0, 1, -1 (all)
	Acks            kafka.RequiredAcks
	DeliveryTimeout time.Duration
	// Read all messages present in a source topic.
	// Default value: false (only new messages are read)
	ReadFromBeginning bool
}

Config contains all the possible configuration parameters for Kafka sources and destinations. When changing this struct, please also update the plugin specification (in main.go) and the README.

func Parse

func Parse(cfg map[string]string) (Config, error)
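
A hedged sketch of calling Parse with the exported key constants; the import path is assumed, and error handling is minimal:

package main

import (
	"fmt"
	"log"

	// Import path assumed from the module layout; adjust as needed.
	kafka "github.com/conduitio/conduit/pkg/plugins/kafka"
)

func main() {
	cfg, err := kafka.Parse(map[string]string{
		kafka.Servers: "localhost:9092",
		kafka.Topic:   "my-topic",
	})
	if err != nil {
		// Missing required keys surface as ErrServersMissing or ErrTopicMissing.
		log.Fatal(err)
	}
	// Per the README, acks and deliveryTimeout fall back to their defaults (all, 10s).
	fmt.Println(cfg.Servers, cfg.Topic, cfg.Acks, cfg.DeliveryTimeout)
}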

type Consumer

type Consumer interface {
	// StartFrom instructs the consumer to connect to a broker and a topic, using the provided consumer group ID.
	// The group ID is significant for this consumer's offsets.
	// By using the same group ID after a restart, we make sure that the consumer continues from where it left off.
	// Returns an error if the consumer could not be set to read from the given position, nil otherwise.
	StartFrom(config Config, groupID string) error

	// Get returns a message from the configured topic. It waits until a message is available
	// or until it errors out.
	// Returns: a message (if available), the consumer group ID and an error (if there was one).
	Get(ctx context.Context) (*kafka.Message, string, error)

	Ack() error

	// Close this consumer and the associated resources (e.g. connections to the broker)
	Close()
}

Consumer represents a Kafka consumer in a simplified form, with just the functionality needed by this plugin. A Consumer's offsets are managed by the broker.

func NewConsumer

func NewConsumer() (Consumer, error)

NewConsumer creates a new Kafka consumer. The consumer needs to be started (using the StartFrom method) before actually being used.
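
A usage sketch of the interface (imports omitted; the group ID is made up and error handling is minimal):

// Sketch only: drive the Consumer interface directly.
func readOne(ctx context.Context, cfg kafka.Config) error {
	c, err := kafka.NewConsumer()
	if err != nil {
		return err
	}
	defer c.Close()

	// Reusing the same group ID after a restart resumes from the stored offsets.
	if err := c.StartFrom(cfg, "my-consumer-group"); err != nil {
		return err
	}

	msg, groupID, err := c.Get(ctx) // msg is a *kafka.Message (segmentio/kafka-go)
	if err != nil {
		return err
	}
	fmt.Printf("group %s read: %s\n", groupID, msg.Value)
	return c.Ack()
}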

type Destination

type Destination struct {
	Client Producer
	Config Config
}

func (*Destination) Open

func (s *Destination) Open(ctx context.Context, cfg plugins.Config) error

func (*Destination) Teardown

func (s *Destination) Teardown() error

Teardown shuts down the Kafka client.

func (*Destination) Validate

func (s *Destination) Validate(cfg plugins.Config) error

Validate takes config and returns an error if some values are missing or incorrect.

func (*Destination) Write

func (s *Destination) Write(ctx context.Context, record record.Record) (record.Position, error)
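
A sketch of the destination lifecycle; how plugins.Config and record.Record are populated is assumed here, not taken from this documentation:

// Sketch only: open a destination, write one record, then tear down.
func writeOne(ctx context.Context, pluginCfg plugins.Config, rec record.Record) error {
	var d kafka.Destination
	if err := d.Open(ctx, pluginCfg); err != nil {
		return err
	}
	defer d.Teardown()

	pos, err := d.Write(ctx, rec)
	if err != nil {
		return err
	}
	fmt.Printf("wrote record, position: %v\n", pos)
	return nil
}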

type Producer

type Producer interface {
	// Send synchronously delivers a message.
	// Returns an error if the message could not be delivered.
	Send(key []byte, payload []byte) error

	// Close this producer and the associated resources (e.g. connections to the broker)
	Close()
}

func NewProducer

func NewProducer(config Config) (Producer, error)

NewProducer creates a new Kafka producer. The current implementation uses Segment's kafka-go client.
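
A minimal sketch of the Producer, assuming cfg is a parsed Config:

// Sketch only: synchronously send one message (key and payload are made up).
func sendOne(cfg kafka.Config) error {
	p, err := kafka.NewProducer(cfg)
	if err != nil {
		return err
	}
	defer p.Close()

	// Send blocks until the write completes or fails, presumably honoring
	// the configured acks and deliveryTimeout.
	return p.Send([]byte("key-1"), []byte(`{"hello":"world"}`))
}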

type Source

type Source struct {
	Consumer Consumer
	Config   Config
	// contains filtered or unexported fields
}

func (*Source) Ack

func (*Source) Open

func (s *Source) Open(ctx context.Context, cfg plugins.Config) error

func (*Source) Read

func (s *Source) Read(ctx context.Context, position record.Position) (record.Record, error)
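
A hedged sketch of the source lifecycle; per the README above, the position passed to Read is a consumer group ID rather than a Kafka offset:

// Sketch only: open a source and read one record, resuming from a stored
// position. How plugins.Config is populated is an assumption here.
func readRecord(ctx context.Context, pluginCfg plugins.Config, position record.Position) (record.Record, error) {
	var s kafka.Source
	if err := s.Open(ctx, pluginCfg); err != nil {
		return record.Record{}, err
	}
	defer s.Teardown()

	// position would presumably be nil on a first run.
	return s.Read(ctx, position)
}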

func (*Source) Teardown

func (s *Source) Teardown() error

func (*Source) Validate

func (s *Source) Validate(cfg plugins.Config) error

type Spec

type Spec struct {
}

func (Spec) Specify

func (s Spec) Specify() (plugins.Specification, error)

Specify returns the Kafka plugin's specification. Any changes here must also be reflected in the README.

Directories

Path Synopsis
cmd
Package mock is a generated GoMock package.
