README ¶
Overview
The Registry service exposes Kafka topic and broker metadata via a gRPC & HTTP API. Resources can be queried and filtered using inherent attributes along with custom, user-defined tags.
Some example questions a user might ask by querying the Registry service:
- Give me all broker IDs where the rack equals us-east-1a
- Give me the configurations for all topics tagged "environment:dev"
Additionally, Registry is continuously receiving support for write operations. An example operations that's possible using tags and topic creation (see further into the README for the target_broker_tags
feature):
- Tag brokers 1001, 1002, 1003 with "pool:inbound"
- Create a topic named "inbound" with 32 partitions, a replication factor of 2, and place all partitions on brokers tagged "pool:inbound", and tag the topic "primary:true"
Installation
go get github.com/DataDog/kafka-kit/cmd/registry
Binary will be found at $GOPATH/bin/registry
Tested with Kafka 0.10, 2.2-2.7, ZooKeeper 3.4, 3.5
Usage
Flags
Usage of registry:
-bootstrap-servers string
Kafka bootstrap servers [REGISTRY_BOOTSTRAP_SERVERS] (default "localhost")
-enable-locking
Enable distributed locking for write operations [REGISTRY_ENABLE_LOCKING]
-enable-profiling
Enable Datadog continuous profiling [REGISTRY_ENABLE_PROFILING]
-grpc-listen string
Server gRPC listen address [REGISTRY_GRPC_LISTEN] (default "localhost:8090")
-http-listen string
Server HTTP listen address [REGISTRY_HTTP_LISTEN] (default "localhost:8080")
-kafka-sasl-mechanism string
SASL mechanism to use for authentication. Supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 [REGISTRY_KAFKA_SASL_MECHANISM]
-kafka-sasl-password string
SASL password for use with the PLAIN and SASL-SCRAM-* mechanisms [REGISTRY_KAFKA_SASL_PASSWORD]
-kafka-sasl-username string
SASL username for use with the PLAIN and SASL-SCRAM-* mechanisms [REGISTRY_KAFKA_SASL_USERNAME]
-kafka-security-protocol string
Protocol used to communicate with brokers. Supported: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL [REGISTRY_KAFKA_SECURITY_PROTOCOL]
-kafka-ssl-ca-location string
CA certificate path (.pem/.crt) for verifying broker's identity. Needed for SSL and SASL_SSL protocols. [REGISTRY_KAFKA_SSL_CA_LOCATION]
-kafka-version string
Kafka release (Semantic Versioning) [REGISTRY_KAFKA_VERSION] (default "v0.10.2")
-read-rate-limit int
Read request rate limit (reqs/s) [REGISTRY_READ_RATE_LIMIT] (default 5)
-tag-allowed-staleness int
Minutes before tags with no associated resource are deleted [REGISTRY_TAG_ALLOWED_STALENESS] (default 60)
-tag-cleanup-frequency int
Minutes between runs of tag cleanup [REGISTRY_TAG_CLEANUP_FREQUENCY] (default 20)
-version
version [REGISTRY_VERSION]
-write-rate-limit int
Write request rate limit (reqs/s) [REGISTRY_WRITE_RATE_LIMIT] (default 1)
-zk-addr string
ZooKeeper connect string [REGISTRY_ZK_ADDR] (default "localhost:2181")
-zk-prefix string
ZooKeeper prefix (if Kafka is configured with a chroot path prefix) [REGISTRY_ZK_PREFIX]
-zk-tags-prefix string
Tags storage ZooKeeper prefix [REGISTRY_ZK_TAGS_PREFIX] (default "registry")
Setup
Run Registry, point it at your ZooKeeper cluster:
$ registry --zk-addr zk-test-0.service.consul:2181 --zk-prefix kafka --bootstrap-servers kafka1:9092
2019/12/10 21:48:42 Registry running
2019/12/10 21:48:42 Connected to ZooKeeper: zk-test-0.service.consul:2181
2019/12/10 21:48:42 KafkaAdmin connected to bootstrap servers: kafka1:9092
2019/12/10 21:48:42 gRPC up: 0.0.0.0:8090
2019/12/10 21:48:42 HTTP up: 0.0.0.0:8080
For multi-node setups, it's strongly advised to set --enable-locking=true
; this backs write/update operations with a ZooKeeper based distributed lock.
API Examples
See the Registry proto definition for further details. The API is designed gRPC-first and provides HTTP using grpc-gateway; the mappings are described in the proto file.
List Topics
Lists topic names.
$ curl -s localhost:8080/v1/topics/list | jq
{
"names": [
"__consumer_offsets",
"test0",
"test1"
]
}
Get Topic
Returns full metadata for topic.
$ curl -s "localhost:8080/v1/topics?name=test0" | jq
{
"topics": {
"test0": {
"name": "test0",
"partitions": 32,
"replication": 2
}
}
}
Get Topics
Returns full metadata for all topics.
$ curl -s localhost:8080/v1/topics
<...>
List/Get Topics with Tag Filtering
Works with both list and get. Any number of tags can be specified (multiple tags currently become a logical AND).
$ curl -s "localhost:8080/v1/topics/list?tag=replication:3" | jq
{
"names": [
"__consumer_offsets",
"test0",
"test1",
"test3",
"test4"
]
}
$ curl -s "localhost:8080/v1/topics/list?tag=replication:2&tag=partitions:32" | jq
{
"names": [
"test0"
]
}
List Reassigning Topics
Lists topics that are undergoing reassignment.
$ curl -s "localhost:8080/v1/topics/reassigning" | jq
{
"names": [
"test3",
]
}
List Under Replicated Topics
Lists topics that have under-replicated ISRs.
$ curl -s "localhost:8080/v1/topics/underreplicated" | jq
{
"names": [
"test4",
]
}
Create a Topic
Topics can be created through the Registry service. Additionally, all partitions can be scoped to specific brokers by tag. This call embeds topicmappr placement constraints logic to ensure safe and optimal partition placement.
$ curl -XPOST localhost:8080/v1/topics/create -d '{
"topic": {
"name": "test2",
"partitions": 6,
"replication": 2,
"tags": {
"team": "eng"
}
},
"target_broker_tags": [
"pool:test"
]
}'
Delete a Topic
Deletes the specified topic. Attempting to delete a non-existent topic will return an error.
$ curl localhost:8080/v1/topics/list
{"names":["__consumer_offsets","test1","test2"]}
$ curl -XDELETE "localhost:8080/v1/topics/test2"
{}
$ curl localhost:8080/v1/topics/list
{"names":["__consumer_offsets","test1"]}
$ curl -XDELETE "localhost:8080/v1/topics/test2"
{"error":"topic does not exist","code":2,"message":"topic does not exist"}
List Brokers
Lists broker IDs.
$ curl -s localhost:8080/v1/brokers/list | jq
{
"ids": [
1001,
1002,
1003,
1004,
1005
]
}
Get Broker
Returns full metadata for broker.
$ curl -s "localhost:8080/v1/brokers?id=1001" | jq
{
"brokers": {
"1001": {
"id": 1001,
"listenersecurityprotocolmap": {
"PLAINTEXT": "PLAINTEXT"
},
"endpoints": [
"SASL_SSL://733fad76d5be:9093"
],
"rack": "europe-west3-a",
"jmxport": 9999,
"host": "10.14.224.198",
"timestamp": "1548442165222",
"port": 9092,
"version": 4
}
}
}
Get Brokers
Returns full metadata for all brokers.
$ curl -s localhost:8080/v1/brokers | jq
<...>
List/Get Brokers with Tag Filtering
Works with both list and get. Any number of tags can be specified, filtering is a "match all".
$ curl -s "localhost:8080/v1/brokers?tag=rack:a" | jq
{
"brokers": {
"1004": {
"id": 1004,
"listenersecurityprotocolmap": {
"PLAINTEXT": "PLAINTEXT"
},
"endpoints": [
"SASL_SSL://776d5be33fad:9093"
],
"rack": "a",
"jmxport": 9999,
"host": "10.0.1.103",
"timestamp": "1545171891",
"port": 9092,
"version": 4
}
}
}
$ curl -s "localhost:8080/v1/brokers/list?tag=port:9092&tag=rack:a" | jq
{
"ids": [
1004
]
}
Get unmapped brokers
Returns brokers that host no partitions. Optionally, exclude
topic names can be specified where any partitions belonging to excluded topics are not counted as to whether a broker is considered mapped.
$ curl -s "localhost:8080/v1/brokers/unmapped?exclude=test1&exclude=test2&exclude=__consumer_offsets" | jq
{
"ids": [
1001,
1002,
1003
]
}
Broker<->Topic Mappings
Returns brokers by topic or topics by brokers.
$ curl -s "localhost:8080/v1/mappings/topic/test0" | jq
{
"ids": [
1001,
1002,
1003
]
}
$ curl -s "localhost:8080/v1/mappings/broker/1001" | jq
{
"names": [
"__consumer_offsets",
"test0",
"test1"
]
}
Set Custom Tags
Add user-defined custom tags. Multiple tags can be set at once. These show in Get
requests and can be used seamlessly alongside default tags in lookup filtering. Since the tag filtering make no distinction between inherent attribute and custom tags, setting custom tags names that conflict with default tags are rejected and indicated as such through the API.
$ curl -XPUT "localhost:8080/v1/topics/tag/test0?tag=team:eng&tag=use:testing"
{"message":"success"}
View the above tag:
$ curl -s "localhost:8080/v1/topics?name=test0" | jq
{
"topics": {
"test0": {
"tags": {
"team": "eng",
"use": "testing"
},
"name": "test0",
"partitions": 32,
"replication": 2
}
}
}
Delete Custom Tags
Custom tags can be deleted, optionally many at once.
$ curl -XDELETE "localhost:8080/v1/topics/tag/test0?tag=team&tag=use"
{"message":"success"}
MirrorMaker2 Offset Translation
Reports upstream and local offsets for MirrorMaker2 replicated topics.
$ curl "localhost:8080/v1/translate-offsets/source/test4 | jq"
{
"offsets": {
"source.test4-1:0": {
"upstream_offset": "3",
"local_offset": "3"
},
"source.test4-2:0": {
"upstream_offset": "5",
"local_offset": "5"
},
"source.test4-2:1": {
"upstream_offset": "3",
"local_offset": "3"
},
"source.test4-2:2": {
"upstream_offset": "7",
"local_offset": "7"
}
}
}
Documentation ¶
There is no documentation for this package.