KAFQA
Kafka quality analyser: measures data loss, operations, and latency.
Running
- Ensure Go modules are enabled (`GO111MODULE=on`) if the project lives inside `GOPATH` and you are on an older Go version.
- Ensure the Kafka broker mentioned in the config is up.
- Build and run:

```shell
source kafqa.env && go build && ./kafqa
```

- Run `make` to run tests and linting.
Report
The tool generates a report containing the following information:
- Latency: average, min, and max consumption latency (from when the message was produced until it was received)
- Total messages sent, received, and lost
- App run time
+---+--------------------------------+--------------+
|   |          DESCRIPTION           |    VALUE     |
+---+--------------------------------+--------------+
| 1 | Messages Lost                  |        49995 |
| 2 | Messages Sent                  |        50000 |
| 3 | Messages Received              |            5 |
| 4 | Min Consumption Latency Millis |         7446 |
| 5 | Max Consumption Latency Millis |         7461 |
| 6 | App Run Time                   | 8.801455502s |
+---+--------------------------------+--------------+
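As a quick sanity check, the data-loss percentage implied by the report above can be derived from the sent/lost counts. A small sketch using `awk`, with the numbers taken from the sample report:

```shell
# Compute the data-loss percentage from the sample report above:
# 49995 messages lost out of 50000 sent.
sent=50000
lost=49995
awk -v s="$sent" -v l="$lost" 'BEGIN { printf "%.2f%% lost\n", (l / s) * 100 }'
# prints: 99.99% lost
```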
Dashboard
Prometheus metrics can be viewed in Grafana by importing the dashboard in `scripts/dashboard`.
Data
Message format sent over Kafka:

```
message {
    sequence id
    id (unique UUID)
    timestamp
    random payload (size: s/m/l)
}
```
Running separate consumers and producers
- `CONSUMER_ENABLED` and `PRODUCER_ENABLED` can be set to run only a specific component.
- Setting `PRODUCER_TOTAL_MESSAGES=-1` will produce messages indefinitely.

```shell
# run only the consumer
CONSUMER_ENABLED="true"
PRODUCER_ENABLED="false"
```
- To consume messages produced in proto format by a non-kafqa producer, enable the proto parser. Latency is then measured from the consumption time to the timestamp given in the proto.

```shell
export PROTO_PARSER_ENABLED="true"
export PROTO_PARSER_MESSAGE_NAME="com.test.user.UserLocationLogMessage"
export PROTO_PARSER_FILE_PATH=/proto/test.proto
export PROTO_PARSER_TIMESTAMP_INDEX=3
```
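Putting the pieces together, a consumer-only run against an external proto producer might look like the following. This is a sketch: the binary name and the proto values are illustrative.

```shell
# Run only the consumer, parsing proto messages from a non-kafqa producer.
export CONSUMER_ENABLED="true"
export PRODUCER_ENABLED="false"
export PROTO_PARSER_ENABLED="true"
export PROTO_PARSER_MESSAGE_NAME="com.test.user.UserLocationLogMessage"
export PROTO_PARSER_FILE_PATH=/proto/test.proto
export PROTO_PARSER_TIMESTAMP_INDEX=3
./kafqa
```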
- Requires a Redis store to track and ack messages:

```shell
STORE_TYPE="redis"
STORE_REDIS_HOST="127.0.0.1:6379"
STORE_RUN_ID="run-$CONSUMER_GROUP_ID"
```
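For local experimentation, the Redis store can be pointed at a throwaway container. A sketch, assuming Docker is available and reusing the store variables above:

```shell
# Start a local Redis for the tracking store, then run with the redis store enabled.
docker run -d --name kafqa-redis -p 6379:6379 redis
export STORE_TYPE="redis"
export STORE_REDIS_HOST="127.0.0.1:6379"
export STORE_RUN_ID="run-$CONSUMER_GROUP_ID"
./kafqa
```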
SSL Setup
Both the producer and the consumer support SSL; set the following env configuration (shown here for the consumer):

```shell
CONSUMER_SECURITY_PROTOCOL="ssl"
CONSUMER_CA_LOCATION="/certs/ca/rootCA.crt"               # public root CA certificate
CONSUMER_CERTIFICATE_LOCATION="/certs/client/client.crt"  # certificate signed by the ICA / root CA
CONSUMER_KEY_LOCATION="/certs/client/client.key"          # private key
```
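Since the producer also supports SSL, the producer-side settings would presumably mirror the consumer ones. The `PRODUCER_*` variable names below are an assumption for illustration, not confirmed from the config:

```shell
# Assumed producer-side equivalents of the consumer SSL settings above (names unverified).
PRODUCER_SECURITY_PROTOCOL="ssl"
PRODUCER_CA_LOCATION="/certs/ca/rootCA.crt"
PRODUCER_CERTIFICATE_LOCATION="/certs/client/client.crt"
PRODUCER_KEY_LOCATION="/certs/client/client.key"
```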
Disable consumer auto commit
If the consumer is restarted, some messages may go untracked, since offsets are committed before processing. To disable auto commit and instead commit after processing the messages (this increases the run time, though), set `CONSUMER_ENABLE_AUTO_COMMIT="false"`.

Application configuration is customisable via `kafqa.env`,
e.g. tweak the concurrency of producers/consumers.
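For example, a one-off run can source the defaults and override a single setting at launch. A sketch: `PRODUCER_TOTAL_MESSAGES` is documented above, and the env file name is assumed to match the one used in the build step.

```shell
# Source the defaults, then override one setting for this run only.
source kafqa.env
export PRODUCER_TOTAL_MESSAGES=100000
./kafqa
```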
Todo
- Compute (now - Kafka timestamp) and report it
- Generate random consumer group and topic IDs (for development)
- Add more metrics on messages which are lost (ID/sequence/duplicates)
- Producer to handle high throughput (queue-full issue)
- Measure % of data loss and average latency
Done:
- Convert fmt to log
- Add timestamp to Kafka message
- Makefile
- Compute lag (receive time - produce time)
- Consumer
- Listen to interrupt and kill the consumer, or stop with a timeout
- Add store to keep track of messages (producer) [interface]
- Ack received messages in the store (consumer)
- Generate basic produce & consume report
- Prometheus exporter for metrics
- CI (vet/lint/golangci) (Travis)
- Capture throughput metrics