kafka

package module
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

README

xk6-kafka

This project is a k6 extension that can be used to load test Kafka, using a producer. Per each connection to Kafka, many messages can be sent. These messages are an array of objects containing a key and a value. There is also a consumer for testing purposes, i.e. to make sure you send the correct data to Kafka. The consumer is not meant to be used for testing Kafka under load. The extension supports producing and consuming messages in Avro format, given a schema for key and/or value.

The real purpose of this extension is not only to test Apache Kafka, but also the system you've designed that uses Apache Kafka. So, you can test your consumers, and hence your system, by auto-generating messages and sending them to your system via Apache Kafka.

In order to build the source, you should have the latest version of Go installed. The latest version should match k6 and xk6. I recommend you to have gvm installed.

If you want to learn more about the extension, visit How to Load Test Your Kafka Producers and Consumers using k6 article on the k6 blog.

Supported Features

  • Produce/consume messages in JSON and Avro format (custom schema)
  • Authentication with SASL PLAIN and SCRAM
  • Create and list topics
  • Support for user-provided Avro key and value schemas
  • Support for loading Avro schemas from Schema Registry
  • Support to consume from all partitions with group ID
  • Support Kafka message compression: Gzip, Snappy, Lz4 & Zstd

Build

To build a k6 binary with this extension, first ensure you have the prerequisites:

Then, install xk6 and build your custom k6 binary with the Kafka extension:

  1. Install xk6:
$ go install go.k6.io/xk6/cmd/xk6@latest
  1. Build the binary:
$ xk6 build --with github.com/mostafa/xk6-kafka@latest

Note: you can always use the latest version of k6 to build the extension, but the earliest version of k6 that supports extensions via xk6 is v0.32.0.

Run & Test

First, you need to have your Kafka development environment setup. I recommend you to use Lenses.io fast-data-dev Docker image, which is a complete Kafka setup for development that includes: Kafka, Zookeeper, Schema Registry, Kafka-Connect, Landoop Tools, 20+ connectors. It is fairly easy to setup, if you have Docker installed. Just make sure to monitor Docker logs to have a working setup, before attempting to test. Initial setup, leader election and test data ingestion takes time.

Development Environment

Run the Kafka environment and expose the container ports:

sudo docker run -d --rm --name lenseio -p 2181:2181 -p 3030:3030 \
       -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092  \
       -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev

After running the command, visit localhost:3030 to get into the fast-data-dev environment.

You can run the command to see the container logs:

sudo docker logs -f -t lensesio

If you have errors running the Kafka development environment, refer to the fast-data-dev documentation.

k6 Test

The following k6 test script is used to test this extension and Apache Kafka in turn. The script is available as test_<format>.js with more code and commented sections. The scripts have 4 parts:

  1. The imports at the top shows the exposed functions that are imported from k6 and the extension, check from k6 and the writer, produce, reader, consume from the extension using the k6/x/kafka extension loading convention.
  2. The Avro schema defines a key and a value schema that are used by both producer and consumer, according to the Avro schema specification. These are defined in the test_avro.js script.
  3. The Avro/JSON message producer:
    1. The writer function is used to open a connection to the bootstrap servers. The first argument is an array of strings that signifies the bootstrap server addresses and the second is the topic you want to write to. You can reuse this writer object to produce as many messages as you want. This object is created in init code and is reused in the exported default function.
    2. The produce function is used to send a list of messages to Kafka. The first argument is the producer object, the second is the list of messages (with key and value). The third and the fourth arguments are the key schema and value schema in Avro format, if Avro format is used. If the schema are not passed to the function for either the key or the value, the values are treated as normal strings. Use an empty string, "" if either of the schema is Avro and the other is going to be a string. The produce function returns an error if it fails. The check is optional, but error being undefined means that produce function successfully sent the message.
    3. The producer.close() function closes the producer object (in tearDown).
  4. The Avro/JSON message consumer:
    1. The reader function is used to open a connection to the bootstrap servers. The first argument is an array of strings that signifies the bootstrap server addresses and the second is the topic you want to reader from. This object is created in init code and is reused in the exported default function.
    2. The consume function is used to read a list of messages from Kafka. The first argument is the consumer object, the second is the number of messages to read in one go. The third and the fourth arguments are the key schema and value schema in Avro format, if Avro format is used. If the schema are not passed to the function for either the key or the value, the values are treated as normal strings. Use an empty string, "" if either of the schema is Avro and the other is going to be a string. The consume function returns an empty array if it fails. The check is optional, but it checks to see if the length of the message array is exactly 10.
    3. The consumer.close() function closes the consumer object (in tearDown).

You can run k6 with the Kafka extension using the following command:

$ ./k6 run --vus 50 --duration 60s scripts/test_json.js

And here's the test result output:


          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: scripts/test_json.js
     output: -

  scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
           * default: 50 looping VUs for 1m0s (gracefulStop: 30s)


running (1m00.4s), 00/50 VUs, 6554 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs  1m0s

    ✓ is sent
    ✓ 10 messages returned

    checks.........................: 100.00% ✓ 661954 ✗ 0
    data_received..................: 0 B     0 B/s
    data_sent......................: 0 B     0 B/s
    iteration_duration.............: avg=459.31ms min=188.19ms med=456.26ms max=733.67ms p(90)=543.22ms p(95)=572.76ms
    iterations.....................: 6554    108.563093/s
    kafka.reader.dial.count........: 6554    108.563093/s
    kafka.reader.error.count.......: 0       0/s
    kafka.reader.fetches.count.....: 6554    108.563093/s
    kafka.reader.message.bytes.....: 6.4 MB  106 kB/s
    kafka.reader.message.count.....: 77825   1289.124612/s
    kafka.reader.rebalance.count...: 0       0/s
    kafka.reader.timeouts.count....: 0       0/s
    kafka.writer.dial.count........: 6554    108.563093/s
    kafka.writer.error.count.......: 0       0/s
    kafka.writer.message.bytes.....: 54 MB   890 kB/s
    kafka.writer.message.count.....: 655400  10856.309293/s
    kafka.writer.rebalance.count...: 6554    108.563093/s
    kafka.writer.write.count.......: 655400  10856.309293/s
    vus............................: 50      min=50   max=50
    vus_max........................: 50      min=50   max=50
Troubleshooting

To avoid getting the following error while running the test:

Failed to write message: [5] Leader Not Available: the cluster is in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes

You can now use createTopic function to create topics in Kafka. The scripts/test_topics.js script shows how to list topics on all Kakfa partitions and also how to create a topic.

You always have the option to create it using kafka-topics command:

$ docker exec -it lensesio bash
(inside container)$ kafka-topics --create --topic xk6_kafka_avro_topic --bootstrap-server localhost:9092
(inside container)$ kafka-topics --create --topic xk6_kafka_json_topic --bootstrap-server localhost:9092

If you want to test SASL authentication, have a look at this commmit message, where I describe how to run a test environment.

Disclaimer

This is a proof of concept, isn't supported by the k6 team, and may break in the future. USE AT YOUR OWN RISK!

This work is licensed under the Apache License 2.0.

Documentation

Index

Constants

View Source
const (
	Plain  = "plain"
	SHA256 = "sha256"
	SHA512 = "sha512"
)

Variables

View Source
var (
	ReaderDials      = stats.New("kafka.reader.dial.count", stats.Counter)
	ReaderFetches    = stats.New("kafka.reader.fetches.count", stats.Counter)
	ReaderMessages   = stats.New("kafka.reader.message.count", stats.Counter)
	ReaderBytes      = stats.New("kafka.reader.message.bytes", stats.Counter, stats.Data)
	ReaderRebalances = stats.New("kafka.reader.rebalance.count", stats.Counter)
	ReaderTimeouts   = stats.New("kafka.reader.timeouts.count", stats.Counter)
	ReaderErrors     = stats.New("kafka.reader.error.count", stats.Counter)

	ReaderDialTime   = stats.New("kafka.reader.dial.seconds", stats.Trend, stats.Time)
	ReaderReadTime   = stats.New("kafka.reader.read.seconds", stats.Trend, stats.Time)
	ReaderWaitTime   = stats.New("kafka.reader.wait.seconds", stats.Trend, stats.Time)
	ReaderFetchSize  = stats.New("kafka.reader.fetch.size", stats.Counter)
	ReaderFetchBytes = stats.New("kafka.reader.fetch.bytes", stats.Counter, stats.Data)

	ReaderOffset        = stats.New("kafka.reader.offset", stats.Gauge)
	ReaderLag           = stats.New("kafka.reader.lag", stats.Gauge)
	ReaderMinBytes      = stats.New("kafka.reader.fetch_bytes.min", stats.Gauge)
	ReaderMaxBytes      = stats.New("kafka.reader.fetch_bytes.max", stats.Gauge)
	ReaderMaxWait       = stats.New("kafka.reader.fetch_wait.max", stats.Gauge, stats.Time)
	ReaderQueueLength   = stats.New("kafka.reader.queue.length", stats.Gauge)
	ReaderQueueCapacity = stats.New("kafka.reader.queue.capacity", stats.Gauge)

	WriterDials      = stats.New("kafka.writer.dial.count", stats.Counter)
	WriterWrites     = stats.New("kafka.writer.write.count", stats.Counter)
	WriterMessages   = stats.New("kafka.writer.message.count", stats.Counter)
	WriterBytes      = stats.New("kafka.writer.message.bytes", stats.Counter, stats.Data)
	WriterRebalances = stats.New("kafka.writer.rebalance.count", stats.Counter)
	WriterErrors     = stats.New("kafka.writer.error.count", stats.Counter)

	WriterDialTime   = stats.New("kafka.writer.dial.seconds", stats.Trend, stats.Time)
	WriterWriteTime  = stats.New("kafka.writer.write.seconds", stats.Trend, stats.Time)
	WriterWaitTime   = stats.New("kafka.writer.wait.seconds", stats.Trend, stats.Time)
	WriterRetries    = stats.New("kafka.writer.retries.count", stats.Counter)
	WriterBatchSize  = stats.New("kafka.writer.batch.size", stats.Counter)
	WriterBatchBytes = stats.New("kafka.writer.batch.bytes", stats.Counter, stats.Data)

	WriterMaxAttempts       = stats.New("kafka.writer.attempts.max", stats.Gauge)
	WriterMaxBatchSize      = stats.New("kafka.writer.batch.max", stats.Gauge)
	WriterBatchTimeout      = stats.New("kafka.writer.batch.timeout", stats.Gauge, stats.Time)
	WriterReadTimeout       = stats.New("kafka.writer.read.timeout", stats.Gauge, stats.Time)
	WriterWriteTimeout      = stats.New("kafka.writer.write.timeout", stats.Gauge, stats.Time)
	WriterRebalanceInterval = stats.New("kafka.writer.rebalance.interval", stats.Gauge, stats.Time)
	WriterRequiredAcks      = stats.New("kafka.writer.acks.required", stats.Gauge)
	WriterAsync             = stats.New("kafka.writer.async", stats.Rate)
	WriterQueueLength       = stats.New("kafka.writer.queue.length", stats.Gauge)
	WriterQueueCapacity     = stats.New("kafka.writer.queue.capacity", stats.Gauge)
)
View Source
var (
	CompressionCodecs = map[string]compress.Codec{
		"Gzip":   &compress.GzipCodec,
		"Snappy": &compress.SnappyCodec,
		"Lz4":    &compress.Lz4Codec,
		"Zstd":   &compress.ZstdCodec,
	}
)

Functions

func ConsumeInternal

func ConsumeInternal(
	ctx context.Context, reader *kafkago.Reader, limit int64,
	configuration Configuration, keySchema string, valueSchema string) []map[string]interface{}

func FromAvro

func FromAvro(message []byte, schema string) interface{}

func ProduceInternal

func ProduceInternal(
	ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{},
	configuration Configuration, keySchema string, valueSchema string) error

func ReportError

func ReportError(err error, msg string)

func ReportReaderStats

func ReportReaderStats(ctx context.Context, currentStats kafkago.ReaderStats) error

func ReportWriterStats

func ReportWriterStats(ctx context.Context, currentStats kafkago.WriterStats) error

func SerializeAvro

func SerializeAvro(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error)

func SerializeByteArray

func SerializeByteArray(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error)

func SerializeString

func SerializeString(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error)

func ToAvro

func ToAvro(value string, schema string) []byte

Types

type BasicAuth

type BasicAuth struct {
	CredentialsSource string `json:"credentialsSource"`
	UserInfo          string `json:"userInfo"`
}

type Configuration

type Configuration struct {
	Consumer       ConsumerConfiguration       `json:"consumer"`
	Producer       ProducerConfiguration       `json:"producer"`
	SchemaRegistry SchemaRegistryConfiguration `json:"schemaRegistry"`
}

type ConsumerConfiguration

type ConsumerConfiguration struct {
	KeyDeserializer   string `json:"keyDeserializer"`
	ValueDeserializer string `json:"valueDeserializer"`
}

type Credentials

type Credentials struct {
	Username  string `json:"username"`
	Password  string `json:"password"`
	Algorithm string `json:"algorithm"`
}

type Kafka

type Kafka struct{}

func (*Kafka) Consume

func (*Kafka) Consume(
	ctx context.Context, reader *kafkago.Reader, limit int64,
	keySchema string, valueSchema string) []map[string]interface{}

func (*Kafka) ConsumeWithConfiguration

func (*Kafka) ConsumeWithConfiguration(
	ctx context.Context, reader *kafkago.Reader, limit int64, configurationJson string,
	keySchema string, valueSchema string) []map[string]interface{}

func (*Kafka) CreateTopic

func (*Kafka) CreateTopic(address, topic string, partitions, replicationFactor int, compression string) error

func (*Kafka) ListTopics

func (*Kafka) ListTopics(address string) ([]string, error)

func (*Kafka) Produce

func (*Kafka) Produce(
	ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{},
	keySchema string, valueSchema string) error

func (*Kafka) ProduceWithConfiguration

func (*Kafka) ProduceWithConfiguration(
	ctx context.Context, writer *kafkago.Writer, messages []map[string]interface{},
	configurationJson string, keySchema string, valueSchema string) error

func (*Kafka) Reader

func (*Kafka) Reader(
	brokers []string, topic string, partition int,
	groupID string, offset int64, auth string) *kafkago.Reader

func (*Kafka) Writer

func (*Kafka) Writer(brokers []string, topic string, auth string, compression string) *kafkago.Writer

type ProducerConfiguration

type ProducerConfiguration struct {
	KeySerializer   string `json:"keySerializer"`
	ValueSerializer string `json:"valueSerializer"`
}

type SchemaRegistryConfiguration

type SchemaRegistryConfiguration struct {
	Url       string    `json:"url"`
	BasicAuth BasicAuth `json:"basicAuth"`
}

type Serializer

type Serializer func(configuration Configuration, topic string, data interface{}, keyOrValue string, schema string) ([]byte, error)

func GetSerializer

func GetSerializer(serializer string) Serializer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL