kafka package

v0.1.1 · Published: Jan 27, 2022 · License: Apache-2.0

README

General

The Conduit Kafka plugin provides both a source and a destination Kafka connector for Conduit.

How it works

Under the hood, the plugin uses Confluent's Golang Client for Apache Kafka™. This client supports a wide range of configuration parameters, which makes it possible to fine-tune the plugin.
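For instance, fine-tuning a consumer through that client might look like this (a hypothetical sketch; the property names come from librdkafka and the values are only examples):

import kafka "github.com/confluentinc/confluent-kafka-go/kafka"

func newTunedConsumer() (*kafka.Consumer, error) {
	// Any librdkafka property can be set through the ConfigMap;
	// session.timeout.ms is shown purely as an example.
	return kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092",
		"group.id":           "example-group",
		"session.timeout.ms": 6000,
	})
}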

Source

The Kafka source manages the offsets manually. The main reason for this is that the source connector needs to be able to "seek" to any offset in a Kafka topic.

If a message is not received from a broker within a specified timeout (5 seconds, defined by msgTimeout in source.go), the Kafka source returns a "recoverable error", which indicates to Conduit that it should retry reading after some time.
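A minimal sketch of the read loop that contract implies, written against the Consumer interface documented below ('handle' is a hypothetical callback):

func readLoop(c Consumer, handle func(*kafka.Message, map[int32]int64)) error {
	for {
		// 5 * time.Second mirrors msgTimeout in source.go.
		msg, pos, err := c.Get(5 * time.Second)
		if err != nil {
			return err // e.g. broker down; surfaced to Conduit as recoverable
		}
		if msg == nil {
			continue // timeout elapsed without a message; try again
		}
		handle(msg, pos)
	}
}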

Destination

The destination connector uses synchronous writes to Kafka. Proper buffering support, which will enable asynchronous (and more efficient) writes, is planned.
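A minimal sketch of what this means in terms of the Producer interface documented below (names and values are illustrative):

func writeOne(cfg Config, key, payload []byte) error {
	p, err := NewProducer(cfg)
	if err != nil {
		return err
	}
	defer p.Close()
	// Send blocks until the write is acknowledged (per 'acks') or the
	// delivery timeout expires, so each record costs a broker round trip.
	return p.Send(key, payload)
}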

How to build

Run make build-kafka-plugin.

Testing

Run make test to run all the unit tests. Run make test-integration to run the integration tests.

The integration tests assume that an instance of Kafka at localhost:9092 is running. The Docker compose file at test/docker-compose.yml can be used to quickly start a Kafka instance.

Configuration

There is no global plugin configuration; each connector instance is configured separately.

| name | part of | description | required | default value |
| ---- | ------- | ----------- | -------- | ------------- |
| servers | destination, source | A list of bootstrap servers to which the plugin will connect. | true | |
| topic | destination, source | The topic to which records will be written (destination) or from which they will be read (source). | true | |
| securityProtocol | destination, source | Protocol used to communicate with brokers. Valid values: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | false | |
| acks | destination | The number of acknowledgments required before considering a record written to Kafka. Valid values: 0, 1, all. | false | all |
| deliveryTimeout | destination | Message delivery timeout. | false | 10s |
| readFromBeginning | source | Whether to read a topic from the beginning (i.e. including existing messages) or only new messages. | false | false |
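For example, a destination connector might be configured with settings like the following (values are illustrative; the plugin's Parse function, documented below, validates the required keys):

settings := map[string]string{
	"servers":          "localhost:9092",
	"topic":            "orders",
	"securityProtocol": "PLAINTEXT",
	"acks":             "all",
	"deliveryTimeout":  "10s",
}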

Planned work

The planned work is tracked through GitHub issues.

Documentation

Index

Constants

const (
	Servers           = "servers"
	Topic             = "topic"
	SecurityProtocol  = "securityProtocol"
	Acks              = "acks"
	DeliveryTimeout   = "deliveryTimeout"
	ReadFromBeginning = "readFromBeginning"
)

Variables

var (
	ErrServersMissing = cerrors.New("servers missing")
	ErrTopicMissing   = cerrors.New("topic missing")
)
var Required = []string{Servers, Topic}

Functions

This section is empty.

Types

type Config

type Config struct {
	// A list of bootstrap servers, which will be used to discover all the servers in a cluster.
	// Maps to "bootstrap.servers" in a Kafka consumer's configuration
	Servers string
	Topic   string
	// Maps to "security.protocol" in a Kafka consumer's configuration
	SecurityProtocol string
	// Maps to "acks" in a Kafka consumer's configuration
	Acks            skafka.RequiredAcks
	DeliveryTimeout time.Duration
	// Read all messages present in a source topic.
	// Default value: false (only new messages are read)
	ReadFromBeginning bool
}

Config contains all the possible configuration parameters for Kafka sources and destinations. When changing this struct, please also update the plugin specification (in main.go) and the README.

func Parse

func Parse(cfg map[string]string) (Config, error)

func (Config) AsKafkaCfg

func (c Config) AsKafkaCfg() *kafka.ConfigMap
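A sketch of how Parse and AsKafkaCfg fit together, written as if inside this package (server address and topic are illustrative):

func newConfiguredConsumer() (*kafka.Consumer, error) {
	cfg, err := Parse(map[string]string{
		Servers: "localhost:9092",
		Topic:   "orders",
	})
	if err != nil {
		// ErrServersMissing or ErrTopicMissing when a key from Required is absent
		return nil, err
	}
	// AsKafkaCfg converts the parsed Config into a librdkafka ConfigMap.
	return kafka.NewConsumer(cfg.AsKafkaCfg())
}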

type Consumer

type Consumer interface {
	// Get returns a message from the configured topic, waiting at most 'timeout'.
	// Returns:
	// A message and the client's 'position' in Kafka, if there's no error, OR
	// A nil message, the client's position in Kafka, and a nil error,
	// if no message was retrieved within the specified timeout, OR
	// A nil message, nil position and an error if there was an error while retrieving the message (e.g. broker down).
	Get(timeout time.Duration) (*kafka.Message, map[int32]int64, error)

	// Close this consumer and the associated resources (e.g. connections to the broker)
	Close()

	// StartFrom reads messages from the given topic, starting from the given positions.
	// For new partitions, or partitions not present in 'position', the reading
	// behavior is specified by the 'readFromBeginning' parameter:
	// if 'true', all messages will be read; if 'false', only new messages will be read.
	// Returns: An error, if the consumer could not be set to read from the given position, nil otherwise.
	StartFrom(topic string, position map[int32]int64, readFromBeginning bool) error
}
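For instance, resuming a source from previously stored offsets might look like this (topic name and offsets are illustrative):

func resume(c Consumer) error {
	// Partition 0 resumes at offset 1042, partition 1 at 987. Partitions
	// absent from the map follow readFromBeginning (false: only new messages).
	return c.StartFrom("orders", map[int32]int64{0: 1042, 1: 987}, false)
}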

func NewConsumer

func NewConsumer(config Config) (Consumer, error)

NewConsumer creates a new Kafka consumer. The current implementation uses Confluent's Kafka client. The full list of configuration properties is available here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

type Destination

type Destination struct {
	Client Producer
	Config Config
}

func (*Destination) Open

func (s *Destination) Open(ctx context.Context, cfg plugins.Config) error

func (*Destination) Teardown

func (s *Destination) Teardown() error

Teardown shuts down the Kafka client.

func (*Destination) Validate

func (s *Destination) Validate(cfg plugins.Config) error

Validate takes config and returns an error if some values are missing or incorrect.

func (*Destination) Write

func (s *Destination) Write(ctx context.Context, record record.Record) (record.Position, error)

type Producer

type Producer interface {
	// Send synchronously delivers a message.
	// Returns an error, if the message could not be delivered.
	Send(key []byte, payload []byte) error

	// Close this producer and the associated resources (e.g. connections to the broker)
	Close()
}

func NewProducer

func NewProducer(config Config) (Producer, error)

NewProducer creates a new Kafka producer. The current implementation uses Segment's kafka-go client.
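For context, a synchronous write through Segment's kafka-go looks roughly like this (a sketch of the underlying client, not necessarily the plugin's actual wiring):

import (
	"context"

	skafka "github.com/segmentio/kafka-go"
)

func sendSync(key, value []byte) error {
	w := &skafka.Writer{
		Addr:         skafka.TCP("localhost:9092"),
		Topic:        "orders",
		RequiredAcks: skafka.RequireAll, // corresponds to acks=all
	}
	defer w.Close()
	// WriteMessages blocks until the message is acknowledged or fails.
	return w.WriteMessages(context.Background(),
		skafka.Message{Key: key, Value: value},
	)
}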

type Source

type Source struct {
	Consumer Consumer
	Config   Config
	// contains filtered or unexported fields
}

func (*Source) Ack

func (*Source) Open

func (s *Source) Open(ctx context.Context, cfg plugins.Config) error

func (*Source) Read

func (s *Source) Read(ctx context.Context, position record.Position) (record.Record, error)

func (*Source) Teardown

func (s *Source) Teardown() error

func (*Source) Validate

func (s *Source) Validate(cfg plugins.Config) error

type Spec

type Spec struct {
}

func (Spec) Specify

func (s Spec) Specify() (plugins.Specification, error)

Specify returns the Kafka plugin's specification. Any changes here must also be reflected in the README.

Directories

Path Synopsis
cmd
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.
