# Geist Kafka Connector
Geist Kafka Connector enables Kafka as source and sink type in stream specs when using Geist.
## Usage
See the [Geist core repo](https://github.com/zpiroux/geist) for general information.

Install with:

```sh
go get github.com/zpiroux/geist-connector-kafka
```
## Geist Integration
Register the connector prior to starting up Geist with (error handling omitted):

```go
import (
	"github.com/zpiroux/geist"
	"github.com/zpiroux/geist-connector-kafka/gkafka"
)

...

geistConfig := geist.NewConfig()
kafkaConfig := &gkafka.Config{ /* add config */ }

err = geistConfig.RegisterExtractorType(gkafka.NewExtractorFactory(kafkaConfig))
err = geistConfig.RegisterLoaderType(gkafka.NewLoaderFactory(kafkaConfig))

g, err := geist.New(ctx, geistConfig)
...
```
For details on the available config options in `gkafka.Config`, see gkafka.go.
## Stream Spec Configuration
Common default Kafka properties for the streams can be provided when creating the factory, using the fields in `gkafka.Config`.

Any Kafka property can also be provided in the Stream Spec itself, which then overrides the factory-provided ones.
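The override behavior can be sketched as a simple map merge, where spec-level properties replace factory-level defaults with the same key (an illustrative sketch only, not the connector's actual implementation; the property values shown are arbitrary examples):

```go
package main

import "fmt"

// mergeProps illustrates the precedence described above: factory-level
// defaults are applied first, then any property set in the stream spec
// replaces the default with the same key.
func mergeProps(factoryDefaults, specProps map[string]string) map[string]string {
	merged := make(map[string]string, len(factoryDefaults)+len(specProps))
	for k, v := range factoryDefaults {
		merged[k] = v
	}
	for k, v := range specProps {
		merged[k] = v // spec-provided value wins
	}
	return merged
}

func main() {
	defaults := map[string]string{
		"bootstrap.servers": "broker:9092",
		"auto.offset.reset": "earliest",
	}
	spec := map[string]string{
		"auto.offset.reset": "latest", // overrides the factory default
	}
	fmt.Println(mergeProps(defaults, spec))
}
```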
The only special config option provided in addition to the Kafka-native ones is the config for generating unique group IDs. If the `group.id` property is assigned a value on the format `"@UniqueWithPrefix.my-groupid-prefix"`, a unique `group.id` value will be generated on the format `"my-groupid-prefix.<unique extractor id>.<ISO UTC timestamp micros>"`.
In addition to the Kafka config properties, other fields are available for topic spec, poll timeout, DLQ config, etc. See spec.go for details.

Example specs (as used in unit tests) can be found in gkafka_test.go.
## Limitations and improvement areas
### Kafka Sink entity (Loader)
Although the Kafka Source entity (Extractor) exhibits high performance with guaranteed delivery, the Kafka Sink entity has lower throughput when the stream is configured for at-least-once guarantees.
There is an option in the Stream Spec to increase the sink throughput for a given stream (set `sink.config.synchronous` to `false`), but in rare cases this could lead to message loss, e.g. if the Geist host crashes, so it should only be used for non-critical streams.
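The trade-off can be illustrated with a mock producer (the names and types here are hypothetical stand-ins, not the connector's API): synchronous mode waits for each delivery report before sending the next message, while asynchronous mode sends everything first and collects reports afterwards, so a crash before collection can lose unconfirmed messages.

```go
package main

import "fmt"

// deliveryReport is a stand-in for the per-message acknowledgement a
// Kafka client emits once the broker has persisted the message.
type deliveryReport struct {
	offset int
	err    error
}

// mockProducer imitates a Kafka producer: Publish returns a channel on
// which the delivery report for that message will arrive.
type mockProducer struct{ nextOffset int }

func (p *mockProducer) Publish(msg string) <-chan deliveryReport {
	ch := make(chan deliveryReport, 1)
	p.nextOffset++
	ch <- deliveryReport{offset: p.nextOffset} // in reality this arrives later
	return ch
}

// publishSync mirrors synchronous mode: each message blocks on its
// delivery report before the next is sent (at-least-once, lower throughput).
func publishSync(p *mockProducer, msgs []string) []int {
	var offsets []int
	for _, m := range msgs {
		rep := <-p.Publish(m) // wait until this message is confirmed
		offsets = append(offsets, rep.offset)
	}
	return offsets
}

// publishAsync mirrors asynchronous mode: all messages are sent first,
// reports are collected later (higher throughput, possible loss on crash).
func publishAsync(p *mockProducer, msgs []string) []int {
	pending := make([]<-chan deliveryReport, 0, len(msgs))
	for _, m := range msgs {
		pending = append(pending, p.Publish(m))
	}
	var offsets []int
	for _, ch := range pending {
		offsets = append(offsets, (<-ch).offset)
	}
	return offsets
}

func main() {
	p := &mockProducer{}
	fmt.Println(publishSync(p, []string{"a", "b"}))
	fmt.Println(publishAsync(p, []string{"c", "d"}))
}
```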
When this connector was developed, the underlying Kafka library (librdkafka) did not support batch publish/delivery reports, which could otherwise improve sink throughput; newer versions might have added this.
## Contact

info @ zpiroux . com
## License
Geist Kafka Connector source code is available under the MIT License.