Continuously send your MongoDB data stream to Kafka
running route81
setup MongoDB
Before running route81 you will need to ensure that MongoDB is set up as a replica set.
Follow the instructions for deploying a replica set.
If you haven't already done so, follow the 5-step procedure to initiate and validate your replica set.
For local testing your replica set may contain a single member.
setup Kafka
If you are new to Kafka you can use the following steps to get a simple server running to test with. Run the following commands in separate terminals:
Start Zookeeper
cd kafka
bin/ config/
Start Kafka
cd kafka
bin/ config/
Start a console consumer to view data produced by route81
cd kafka
bin/ --bootstrap-server localhost:9092 --topic test.test --from-beginning
run a release version of route81
The easiest way to run route81 is via Docker.
docker run --rm --net=host rwynn/route81:1.0.1
build yourself and run route81 (optional)
route81 uses the Confluent Go driver for Kafka. This driver wraps the C client librdkafka. You will need to follow the instructions for installing librdkafka 1.0 on your system as a prerequisite of running route81.
You may also need the pkg-config package to be installed on your system.
After installing the dependencies you can build and run route81 using the latest go binary.
# git clone route81 to somewhere outside your $GOPATH dir
cd route81
go run route81.go
If you have trouble installing librdkafka 1.0
and getting route81 to run you can alternatively run route81
via Docker. The Docker build handles building and installing route81 with librdkafka for you.
cd route81/docker/release/
docker run --rm --net=host rwynn/route81:1.0.1
publish MongoDB data to Kafka
At this point route81 should have connected to a MongoDB replica set on localhost. You can then use route81 to publish data into the test.test topic by interacting with the test collection of the test database in the mongo shell:
rs1:PRIMARY> use test;
switched to db test
rs1:PRIMARY> for (var i=0; i<100; ++i) db.test.insert({i: i});
WriteResult({ "nInserted" : 1 })
rs1:PRIMARY> db.test.update({}, {$set: {j: 1}}, {multi:true});
WriteResult({ "nMatched" : 100, "nUpserted" : 0, "nModified" : 100 })
rs1:PRIMARY> db.test.remove({});
WriteResult({ "nRemoved" : 100 })
As you perform these operations you should see a log of them being produced to Kafka in your terminal window.
example messages
insert operation
When you insert a document into MongoDB
rs1:PRIMARY> db.test.insert({foo: 1});
You can expect a message like the following in Kafka in topic test.test
update operation
When you update a document in MongoDB
rs1:PRIMARY> db.test.update({}, {$unset: {foo:1}, $set: {bar:1}}, {multi:true});
You can expect a message like the following in Kafka in topic test.test
remove operation
When you remove a document from MongoDB
rs1:PRIMARY> db.test.remove({});
You can expect a message like the following in Kafka in topic test.test
GridFS operation
When you upload a file using GridFS
$ echo 'hello world' > test.txt
$ base64 test.txt
$ mongofiles -d test put test.txt
2019-05-28T13:52:30.807+0000 connected to: localhost
2019-05-28T13:52:30.807+0000 added file: test.txt
You can expect 1 message in Kafka for each file in topic test.fs.files
You can also expect 1 or more messages in Kafka for the chunks of each file in topic test.fs.chunks
Notice that the chunk data is sent to Kafka base64 encoded. Since only 1 chunk was sent the value matches the input base64.
The chunk's parent file _id is captured in the field files_id.
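To check a chunk payload by hand, the encoding step from the walkthrough can be reproduced locally. This is a minimal sketch, assuming a base64 tool that accepts --decode (GNU coreutils and macOS both do); the temporary file stands in for test.txt and the variable names are only illustrative.

```shell
# Recreate the sample file from the walkthrough in a temporary location.
sample="$(mktemp)"
echo 'hello world' > "$sample"

# Encode it the same way the walkthrough does with `base64 test.txt`.
encoded="$(base64 < "$sample")"
echo "$encoded"   # aGVsbG8gd29ybGQK

# Decode a chunk payload back to the original file content.
decoded="$(printf '%s' "$encoded" | base64 --decode)"
echo "$decoded"   # hello world

rm -f "$sample"
```

For a file small enough to fit in one chunk, the encoded string should match the data value seen in the test.fs.chunks message.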
configure route81
You can configure route81 via flags and/or a TOML config file. The most important options are the connection settings. By default route81 will look to connect to MongoDB and Kafka on localhost at the default ports. You can change this with the -mongo and -kafka flags:
route81 -mongo mongodb://user:pass@hostname:port -kafka hostname:9092
The MongoDB URL can be customized with advanced settings for security, etc.
For Kafka you can provide a comma separated list of host:port,host:port... for the bootstrap servers.
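If the broker addresses live in a script or an inventory, the comma separated list can be assembled rather than typed by hand. The sketch below is only an illustration: join_brokers is a hypothetical helper, the kafka1, kafka2 and kafka3 hosts are placeholders, and the command is printed instead of executed.

```shell
# Join broker addresses into the comma separated form used by the -kafka flag.
join_brokers() {
  # The subshell keeps the IFS change from leaking into the caller.
  (IFS=','; printf '%s\n' "$*")
}

# Hypothetical broker hosts; replace with your own.
brokers="$(join_brokers kafka1:9092 kafka2:9092 kafka3:9092)"

# Print the command rather than running it, so the sketch has no side effects.
echo "route81 -mongo mongodb://localhost:27017 -kafka $brokers"
```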
configure what gets read and sent from MongoDB to Kafka
By default route81 will open a change stream against the entire MongoDB deployment and send change data as JSON to Kafka. You can target specific databases or collections in MongoDB.
route81 -change-stream-namespace mydb -change-stream-namespace anotherdb.mycol
These can be put into the config file as
change-stream-namespaces = [ "mydb", "anotherdb.mycol" ]
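A namespace here is either a bare database name or a database and collection joined by a dot. The following sketch shows how such a value reads, assuming the usual MongoDB convention that the database name ends at the first dot (collection names may contain further dots). describe_namespace is a hypothetical helper for illustration and is not part of route81.

```shell
# Describe what a change stream namespace targets.
describe_namespace() {
  ns="$1"
  case "$ns" in
    # Split at the first dot: everything before is the database,
    # everything after is the collection.
    *.*) echo "database=${ns%%.*} collection=${ns#*.}" ;;
    # No dot means the whole database is watched.
    *)   echo "database=$ns collection=(all)" ;;
  esac
}

describe_namespace mydb
describe_namespace anotherdb.mycol
```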
sending entire collections or views to Kafka
In addition to watching and sending changes in MongoDB to Kafka, you can send entire collections or views
direct-read-namespaces = [ "mydb.mycol", "mydb.myview" ]
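Both namespace settings can live in the same config file. As a rough sketch, the snippet below writes the two lines shown in this section to a temporary file and prints the command that would load it; the file location is arbitrary and the namespaces are the placeholder names used above.

```shell
# Write a small config file containing the two namespace settings.
config_file="$(mktemp)"
cat > "$config_file" <<'EOF'
change-stream-namespaces = [ "mydb", "anotherdb.mycol" ]
direct-read-namespaces = [ "mydb.mycol", "mydb.myview" ]
EOF

# Print the command that would start route81 with this file.
echo "route81 -f $config_file"
```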
save and resume your progress across restarts
Turn on resume mode to periodically save timestamps in MongoDB for data that has been sent to Kafka. When started in this mode route81 will resume progress at the last save point. This requires route81 to be able to save documents in the collection route81.resume in MongoDB.
route81 -resume
configure advanced kafka producer settings
For the advanced kafka settings you will need a config file. For example,
enable-idempotence = true
request-timeout-ms = 10000
message-timeout-ms = 10000
message-max-retries = 100
retry-backoff-ms = 50
If you have built librdkafka with SSL support, you can also add
security-protocol = "ssl"
ssl-ca-location = "ca-cert"
ssl-certificate-location = "client_?????_client.pem"
ssl-key-location = "client_?????_client.key"
ssl-key-password = "abcdefgh"
route81 -f /path/to/above.toml
confluent cloud
Auto-creation of topics is disabled in Confluent Cloud. You need to pre-create topics for each MongoDB namespace that route81 sends messages for.
$ ccloud topic create mydb.mycol
If you set a topic-prefix, e.g. route81, then you would need to include that prefix in each topic name
$ ccloud topic create route81.mydb.mycol
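When several namespaces are involved, the topic names to pre-create can be generated from a list. This sketch assumes, based on the example above, that a topic name is the prefix, a dot, and the namespace, or just the namespace when no prefix is set. topic_name is a hypothetical helper, mydb.other is a placeholder namespace, and the ccloud commands are printed rather than executed.

```shell
# Build the Kafka topic name for a MongoDB namespace and an optional prefix.
topic_name() {
  prefix="$1"
  namespace="$2"
  if [ -n "$prefix" ]; then
    echo "$prefix.$namespace"
  else
    echo "$namespace"
  fi
}

# Print one create command per namespace so they can be reviewed first.
for ns in mydb.mycol mydb.other; do
  echo "ccloud topic create $(topic_name route81 "$ns")"
done
```

Remember that GridFS uploads produce messages for both the fs.files and fs.chunks collections, so both namespaces need topics.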
You also need to provide kafka settings specific to your Confluent Cloud account.
broker-version-fallback = ""
api-version-fallback = true
api-version-fallback-ms = 0
sasl-mechanisms = "PLAIN"
security-protocol = "SASL_SSL"
sasl-username = "<ccloud key>"
sasl-password = "<ccloud secret>"