README
Performance Tests
The performance_test package contains functionality to orchestrate configurable tests that stress a Kafka cluster. It allows you to schedule scenarios, which consist of multiple tests.
We use Kafka's Trogdor framework to achieve this and plan on supporting multiple test types (throughput tests, connection/authentication/rebalance storms, etc).
Each kind of test has a type, specified in JSON format. The currently supported types are:
- ProgressiveWorkload
- TailConsume
- ConnectionStress
- SustainedConnection
The framework allows you to schedule multiple tests at once with flexible schedules (one after the other, overlapping, etc.).
Definitions
There can be confusion when reading this code because we divide it into many subdivisions of work, all with similar names. Here is what each subdivision means.
- Workload (Java) - What is actually running inside Trogdor.
- Task - The smallest unit of work in the Go client. It has a 1:1 mapping to a workload within Trogdor.
- Step - A group of tasks, all sharing the same configuration. In general, a step has as many tasks as there are Trogdor agents, but that can be overridden.
- Fanout - A property of a step that duplicates its tasks linearly. For example, a fanout of 2 will double the tasks, and a fanout of 3 will triple them.
- Scenario - A metadata wrapper around step configurations.
- Workload (Go) - An overarching configuration that defines all scenario configurations.
General JSON Parameters
- scenario_name
- schedule - defines when each test in the scenario should run in relation to the others (a minimal schedule sketch follows this list)
  - start_after_begin - a list of tests that should all begin before the given test starts
  - start_after_end - a list of tests that should all end before the given test starts
    - configuring both start_after_begin and start_after_end at once is not supported
  - start_delay - defines a duration that should pass before we start the test
  - run_until_end_of - defines the duration of the given test: it runs until the end of whichever test in the list ends last
- test_definitions - defines the different tests this scenario consists of
  - test_name - name of the test
  - test_type - defines the type of test we will run; each test type supports different test_parameters
  - test_parameters - custom parameters for this specific test type
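The schedule block is keyed by test_name. As an illustration only (the test names here are hypothetical, and each test still needs its own duration or a run_until_end_of entry), a schedule that uses start_after_end could look like:

{
  "schedule": {
    "produce-warmup": {},
    "consume-later": {
      "start_after_end": ["produce-warmup"]
    }
  }
}

Full examples using start_after_begin, start_delay, and run_until_end_of are shown further below.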
Progressive Workload
We define a progressive workload as a continuous series of steps, where each step progressively puts more load on the cluster. Each step essentially consists of multiple Trogdor tasks. We schedule exactly one Trogdor task per Trogdor agent at any given time.
- workload_type - Currently only supports "Produce".
- topic_name (optional) - The topic to produce to. The default is derived from the name parameter: [workload_name]-topic.
- partition_count - The number of partitions the topic should have.
- step_duration_ms - The duration, in milliseconds, of a single iteration.
- start_throughput_mbs - The throughput, in MB/s, we want to start at.
- end_throughput_mbs - The throughput, in MB/s, we want this test to end at (inclusive).
- step_cooldown_ms (optional) - A configurable amount of time, in milliseconds, between each iteration. Only applicable if the start and end throughputs differ.
- throughput_increase_per_step_mbs (optional) - The amount of throughput, in MB/s, to progressively increase each step by. Only applicable if the start and end throughputs differ.
- message_size_bytes (optional) - An approximation of the message size. The default is 900 bytes.
- message_padding_bytes (optional) - The number of zero bytes to append to the end of the message as padding so compression can work. The default is 100 bytes, and this value is only used when message_size_bytes is specified as well.
- tasks_per_step (optional) - The number of Trogdor tasks to create per step. The default is equal to the number of Trogdor agents.
- slow_start_per_step_ms (optional) - If specified, each task in a given step will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N starts delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration is shortened by the same amount. The default is 0, i.e. no ramp-up.
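A sketch of a ProgressiveWorkload test definition that uses most of these parameters (all values here are illustrative, not recommendations):

{
  "test_type": "ProgressiveWorkload",
  "test_name": "produce-ramp",
  "test_parameters": {
    "workload_type": "Produce",
    "topic_name": "produce-ramp-topic",
    "partition_count": 12,
    "step_duration_ms": 60000,
    "step_cooldown_ms": 30000,
    "start_throughput_mbs": 5,
    "end_throughput_mbs": 25,
    "throughput_increase_per_step_mbs": 5,
    "message_size_bytes": 1000,
    "message_padding_bytes": 100,
    "slow_start_per_step_ms": 5000
  }
}

This would run five one-minute steps at 5, 10, 15, 20, and 25 MB/s, with 30 seconds of cooldown between steps and each task within a step starting 5 seconds after the previous one.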
Tail Consumer
A tail consumer test consists of multiple consumers subscribed to a topic. They read from the end of the log at all times with no throttling. We schedule exactly one Trogdor ConsumeBench task per Trogdor agent for every consumer group at any one given time.
- fanout (optional) - Defines the number of consumer groups and sets the number of tasks created to [fanout] * [number of Trogdor agents]. If consumer_group is specified, the test will not create a different consumer group for each fanout.
- topics_from_test (optional) - The name of the test that produces to the topics which consumers of this test will subscribe to. Exactly one of this or topics must be set.
  - It is expected that this produce test is defined in the scenario, that the tail consumer is scheduled to run until the end of said produce test, and that the topics field is not populated.
- topics (optional) - The topics these consumers will subscribe to. Exactly one of this or topics_from_test must be set.
- duration (optional) - The duration, as a Go duration string (e.g. "1m"), that this test will run for.
- step_messages_per_second (optional) - The number of messages per second this workload will limit itself to. Note: this number is divided between the tasks of each step (fanout). The default is math.MaxInt32.
- tasks_per_step (optional) - The number of Trogdor tasks to create per step (fanout). The default is equal to the number of Trogdor agents.
- slow_start_per_step_ms (optional) - If specified, each task in a given step (fanout) will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N starts delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration is shortened by the same amount. The default is 0, i.e. no ramp-up.
- consumer_group (optional) - Override the generated consumer groups and use this one instead. If specified, fanout does not generate new consumer groups, and all tasks are part of the same one.
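A sketch of a standalone TailConsume test that subscribes to an existing topic instead of a produce test (values are illustrative):

{
  "test_type": "TailConsume",
  "test_name": "tail-existing-topic",
  "test_parameters": {
    "topics": ["existing-topic"],
    "duration": "10m",
    "fanout": 2,
    "step_messages_per_second": 10000
  }
}

Because topics is set, topics_from_test is omitted; with a fanout of 2, two consumer groups are created unless consumer_group is specified.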
Connection Stress
This test rapidly creates and closes connections against the cluster.
- duration - The duration, as a Go duration string, that this test will run for.
- target_connections_per_sec - The number of connections to create and close per second. For best results, this should be a multiple of num_threads.
- num_threads - The number of threads used per task/fanout. Overall work is split between all available threads.
- action - The action this test will take. Valid values are:
  - CONNECT - Uses basic Java connection classes to initiate a TCP connection, skipping all Kafka client code.
  - FETCH_METADATA - Uses the Kafka AdminClient to fetch basic cluster metadata.
- tasks_per_step (optional) - The number of Trogdor tasks to create per step. The default is equal to the number of Trogdor agents.
- slow_start_per_step_ms (optional) - If specified, each task in a given step will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N starts delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration is shortened by the same amount. The default is 0, i.e. no ramp-up.
- fanout - The number of times to duplicate this workload as a new step.
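A sketch of a ConnectionStress test definition (values are illustrative):

{
  "test_type": "ConnectionStress",
  "test_name": "metadata-connection-storm",
  "test_parameters": {
    "duration": "5m",
    "target_connections_per_sec": 100,
    "num_threads": 10,
    "action": "FETCH_METADATA",
    "fanout": 1
  }
}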
Sustained Connections
This creates a test that generates sustained connections against Kafka. There are three components we can stress with this: KafkaConsumer, KafkaProducer, and AdminClient. The test tries to use minimal bandwidth per connection to reduce its overhead.
- duration - The duration, as a Go duration string, that this test will run for.
- producer_connection_count - The total number of producer connections to maintain per task.
- consumer_connection_count - The total number of consumer connections to maintain per task.
- metadata_connection_count - The total number of metadata connections to maintain per task.
- num_threads - The number of threads used per task/fanout to maintain the above connections.
- refresh_rate_ms - The rate, in milliseconds, at which to refresh every connection.
- topic_name (optional) - The topic that this test will run against. This must be specified if either producer_connection_count or consumer_connection_count is greater than 0.
- message_size_bytes (optional) - The size, in bytes, for the produce task to use when sending records. The default is 512 bytes.
- tasks_per_step (optional) - The number of Trogdor tasks to create per step. The default is equal to the number of Trogdor agents.
- slow_start_per_step_ms (optional) - If specified, each task in a given step will be progressively delayed by this amount, in milliseconds, as a way to ramp up the load. The task numbered N starts delayed by (N-1) * [slow_start_per_step_ms] milliseconds, and its duration is shortened by the same amount. The default is 0, i.e. no ramp-up.
- fanout - The number of times to duplicate this workload as a new step.
Note: With a 1:1:1 connection ratio, you will not see the same number of connections on the brokers as specified in the test. Testing has shown roughly 2.4x fewer connections than specified. For example, 333:333:333 connections will result in about 999 / 2.4 ≈ 416 connections on the cluster.
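A sketch of a SustainedConnection test definition (values are illustrative):

{
  "test_type": "SustainedConnection",
  "test_name": "sustained-connections",
  "test_parameters": {
    "duration": "30m",
    "producer_connection_count": 333,
    "consumer_connection_count": 333,
    "metadata_connection_count": 333,
    "num_threads": 25,
    "refresh_rate_ms": 5000,
    "topic_name": "sustained-connections-topic",
    "message_size_bytes": 512,
    "fanout": 1
  }
}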
See example.json for a sample configuration. A configuration like
{
"scenario_name": "ExampleTest",
"test_definitions": [{
"test_type": "ProgressiveWorkload",
"test_name": "test-produce",
"test_parameters": {
"workload_type": "Produce",
"step_duration_ms": 60000,
"partition_count": 10,
"step_cooldown_ms": 60000,
"start_throughput_mbs": 10,
"end_throughput_mbs": 20,
"throughput_increase_per_step_mbs": 5,
"message_size_bytes": 1000
}
}]
}
would result in 3 steps, consisting of the following throughputs (10 MB/s, 15 MB/s, 20 MB/s). Each step would last one minute and there would be one minute of downtime in between each step. Note that the schedule field is optional. If omitted, all tasks get scheduled at once.
{
"scenario_name": "TestCPKAFKA",
"schedule": {
"A": {},
"B": {
"start_delay": "1m",
"start_after_begin": ["A"]
},
"C": {
"run_until_end_of": ["A"]
},
"D": {
"start_delay": "0s",
"start_after_begin": ["B"],
"run_until_end_of": ["B"]
}
},
"test_definitions": [
{
"test_type": "ProgressiveWorkload",
"test_name": "A",
"test_parameters": {
"workload_type": "Produce",
"step_duration_ms": 60000,
"partition_count": 10,
"step_cooldown_ms": 60000,
"start_throughput_mbs": 10,
"end_throughput_mbs": 20,
"throughput_increase_per_step_mbs": 5,
"message_size_bytes": 1000
}
},
{
"test_type": "ProgressiveWorkload",
"test_name": "B",
"test_parameters": {
"workload_type": "Produce",
"step_duration_ms": 30000,
"partition_count": 15,
"step_cooldown_ms": 1000,
"start_throughput_mbs": 10,
"end_throughput_mbs": 20,
"throughput_increase_per_step_mbs": 5,
"message_size_bytes": 255
}
},
{
"test_type": "TailConsume",
"test_name": "C",
"test_parameters": {
"fanout": 2,
"topics_from_test": "A"
}
},
{
"test_type": "TailConsume",
"test_name": "D",
"test_parameters": {
"fanout": 2,
"topics_from_test": "B"
}
}
]
}
In this example, we have two produce tasks. The second task, B, will start one minute after A starts. Task C will start in the beginning with A and run until A ends, and task D will start with B and run until B ends.
How to Run
Prerequisite: have the Trogdor and soak-clients Helm charts deployed (see cc-services/README.md).
# open a shell into the running soak-clients pod
$ kubectl get pods --all-namespaces | grep clients-cli
soak-tests cc-soak-clients-clients-cli-76568867b5-bcmdh 0/1 Running 0 1h
$ kubectl exec -it -n soak-tests cc-soak-clients-clients-cli-76568867b5-bcmdh sh
Once inside the pod, create a JSON test configuration and run the tests with it:
vi /mnt/test/test_config.json
export PERFORMANCE_TEST_CONFIG_PATH=/mnt/test/test_config.json
./soak-clients performance-tests
Documentation
Index
- Constants
- func Run(testConfigPath string, trogdorCoordinatorHost string, trogdorAgentsCount int, ...)
- type ConnectionStress
- func (cs *ConnectionStress) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
- func (cs *ConnectionStress) GetDuration() time.Duration
- func (cs *ConnectionStress) GetEndTime() (time.Time, error)
- func (cs *ConnectionStress) GetName() string
- func (cs *ConnectionStress) GetStartTime() (time.Time, error)
- func (cs *ConnectionStress) SetEndTime(endTime time.Time)
- func (cs *ConnectionStress) SetStartTime(startTime time.Time)
- type NotEnoughContextError
- type PerformanceTestConfig
- type ScenarioContext
- type ScenarioTestConfig
- type SchedulableTest
- type Schedule
- type ScheduleDef
- type Step
- type SustainedConnection
- func (sc *SustainedConnection) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
- func (sc *SustainedConnection) GetDuration() time.Duration
- func (sc *SustainedConnection) GetEndTime() (time.Time, error)
- func (sc *SustainedConnection) GetName() string
- func (sc *SustainedConnection) GetStartTime() (time.Time, error)
- func (sc *SustainedConnection) SetEndTime(endTime time.Time)
- func (sc *SustainedConnection) SetStartTime(startTime time.Time)
- type TailConsumer
- func (tc *TailConsumer) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
- func (tc *TailConsumer) GetDuration() time.Duration
- func (tc *TailConsumer) GetEndTime() (time.Time, error)
- func (tc *TailConsumer) GetName() string
- func (tc *TailConsumer) GetStartTime() (time.Time, error)
- func (tc *TailConsumer) SetEndTime(endTime time.Time)
- func (tc *TailConsumer) SetStartTime(startTime time.Time)
- type TestWithTopics
- type Workload
- func (workload *Workload) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
- func (workload *Workload) GetDuration() time.Duration
- func (workload *Workload) GetEndTime() (time.Time, error)
- func (workload *Workload) GetName() string
- func (workload *Workload) GetStartTime() (time.Time, error)
- func (workload *Workload) SetEndTime(endTime time.Time)
- func (workload *Workload) SetStartTime(startTime time.Time)
- func (workload *Workload) TopicNames() []string
Constants
const CONNECTION_STRESS_TEST_TYPE = "ConnectionStress"
const PRODUCE_WORKLOAD_TYPE = "Produce"
const PROGRESSIVE_WORKLOAD_TEST_TYPE = "ProgressiveWorkload"
const SUSTAINED_CONNECTION_TEST_TYPE = "SustainedConnection"
const TAIL_CONSUMER_TEST_TYPE = "TailConsume"
Variables
This section is empty.
Functions
Types
type ConnectionStress
type ConnectionStress struct {
    Name string
    Duration common.Duration `json:"duration"`
    TargetConnectionsPerSec int `json:"target_connections_per_sec"`
    NumThreads int `json:"num_threads"`
    Fanout int `json:"fanout"`
    TasksPerStep int `json:"tasks_per_step"`
    Action string `json:"action"`
    SlowStartPerStepMs uint64 `json:"slow_start_per_step_ms"`
    // contains filtered or unexported fields
}
func (*ConnectionStress) CreateTest
func (cs *ConnectionStress) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
func (*ConnectionStress) GetDuration
func (cs *ConnectionStress) GetDuration() time.Duration
func (*ConnectionStress) GetEndTime
func (cs *ConnectionStress) GetEndTime() (time.Time, error)
func (*ConnectionStress) GetName
func (cs *ConnectionStress) GetName() string
func (*ConnectionStress) GetStartTime
func (cs *ConnectionStress) GetStartTime() (time.Time, error)
func (*ConnectionStress) SetEndTime
func (cs *ConnectionStress) SetEndTime(endTime time.Time)
func (*ConnectionStress) SetStartTime
func (cs *ConnectionStress) SetStartTime(startTime time.Time)
type NotEnoughContextError
type NotEnoughContextError struct {
    // contains filtered or unexported fields
}
NotEnoughContextError indicates that a test needs more than the provided scenario context to be parsed correctly.
func (*NotEnoughContextError) Error
func (nec *NotEnoughContextError) Error() string
type PerformanceTestConfig
type PerformanceTestConfig struct {
    Type string `json:"test_type"`
    Name string `json:"test_name"`
    Parameters json.RawMessage `json:"test_parameters"`
    // contains filtered or unexported fields
}
PerformanceTestConfig is a generic definition of a test. It is meant to support different types of tests, each of which defines its own set of test_parameters. Each test should implement the SchedulableTest interface.
func (*PerformanceTestConfig) CreateTest
func (*PerformanceTestConfig) ParseTest
func (ptc *PerformanceTestConfig) ParseTest(context *ScenarioContext) error
ParseTest parses the configuration into the concrete test struct. It can return a retriable error of type NotEnoughContextError, which means that we should try parsing this test again when we have more context from the scenario. Parsing will be done only once, if successful.
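For illustration, a caller might handle the retriable error roughly like this (a sketch only, not the package's actual scheduling code; it assumes it lives inside this package and imports "errors"):

func parseAllTests(defs []*PerformanceTestConfig, ctx *ScenarioContext) error {
    var needMoreContext []*PerformanceTestConfig
    for _, def := range defs {
        if err := def.ParseTest(ctx); err != nil {
            var nec *NotEnoughContextError
            if errors.As(err, &nec) {
                // This test references scenario context that is not parsed yet; retry it later.
                needMoreContext = append(needMoreContext, def)
                continue
            }
            return err
        }
    }
    // Second pass, now that the rest of the scenario has been parsed into ctx.
    for _, def := range needMoreContext {
        if err := def.ParseTest(ctx); err != nil {
            return err
        }
    }
    return nil
}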
type ScenarioContext
type ScenarioContext struct {
    TestsWithTopics map[string]TestWithTopics
    SchedulableTests map[string]SchedulableTest
}
ScenarioContext holds the tests that are parsed for this scenario run.
func (*ScenarioContext) AddSchedulableTest
func (sc *ScenarioContext) AddSchedulableTest(st SchedulableTest)
func (*ScenarioContext) AddTestWithTopics
func (sc *ScenarioContext) AddTestWithTopics(twt TestWithTopics)
type ScenarioTestConfig
type ScenarioTestConfig struct {
    Name string `json:"scenario_name"`
    TestDefinitions []*PerformanceTestConfig `json:"test_definitions"`
    ScheduleDefinition ScheduleDef `json:"schedule"`
    // contains filtered or unexported fields
}
ScenarioTestConfig is the top-most definition for all the performance tests scheduled to run.
func (*ScenarioTestConfig) CreateSchedules
func (sct *ScenarioTestConfig) CreateSchedules(startTime time.Time) error
CreateSchedules parses the user-defined scheduling and sets each test's startTime/endTime accordingly.
func (*ScenarioTestConfig) CreateTests
func (sct *ScenarioTestConfig) CreateTests(trogdorAgentsCount int, bootstrapServers string) ([]trogdor.TaskSpec, error)
CreateTests creates all the Trogdor tasks for each test that is part of this scenario. It requires that the config is parsed and that the schedules are created.
func (*ScenarioTestConfig) ParseConfig
func (sct *ScenarioTestConfig) ParseConfig(configPath string) error
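For illustration, the typical call order for these methods might look like the following sketch (not the package's Run implementation; the function name and argument values are hypothetical, and it assumes it lives inside this package and imports "time"):

func buildScenarioTasks(configPath string, trogdorAgentsCount int, bootstrapServers string) ([]trogdor.TaskSpec, error) {
    var sct ScenarioTestConfig
    // 1. Parse the JSON scenario configuration from disk.
    if err := sct.ParseConfig(configPath); err != nil {
        return nil, err
    }
    // 2. Resolve the user-defined schedule into concrete start/end times.
    if err := sct.CreateSchedules(time.Now()); err != nil {
        return nil, err
    }
    // 3. Turn every test definition into Trogdor task specifications.
    return sct.CreateTests(trogdorAgentsCount, bootstrapServers)
}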
type SchedulableTest
type SchedulableTest interface {
    // CreateTest() should return Trogdor task specifications that compose the whole test.
    // Said tasks should start no earlier than GetStartTime(),
    // should have at least one task that ends at GetEndTime() and none ending later than that.
    CreateTest(trogdorAgentsCount int, bootstrapServers string) ([]trogdor.TaskSpec, error)

    GetName() string

    // returns the duration of the test. If the test is scheduled to run until another test, this method should return 0
    GetDuration() time.Duration

    // returns an error if StartTime is not set
    GetStartTime() (time.Time, error)

    // returns an error if EndTime is not set
    GetEndTime() (time.Time, error)

    SetStartTime(time.Time)
    SetEndTime(time.Time)
}
SchedulableTest is an interface for a test that is schedulable. To be eligible for scheduling, the test should have a known duration or be scheduled to run until a test with a known duration. After the scheduling is determined, the start and end times of the test will be set via the appropriate methods.
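For illustration, a minimal sketch of what an implementation of this contract could look like (this is not one of the package's concrete tests; the fixedDurationTest type and its fields are hypothetical, and it assumes the "errors" and "time" imports inside this package):

// fixedDurationTest is a hypothetical SchedulableTest with a known, fixed duration.
type fixedDurationTest struct {
    name      string
    duration  time.Duration
    startTime time.Time
    endTime   time.Time
}

func (f *fixedDurationTest) CreateTest(trogdorAgentsCount int, bootstrapServers string) ([]trogdor.TaskSpec, error) {
    // A real implementation would build one task per Trogdor agent here,
    // starting no earlier than f.startTime and ending no later than f.endTime.
    return []trogdor.TaskSpec{}, nil
}

func (f *fixedDurationTest) GetName() string { return f.name }

// A test scheduled to run until another test would return 0 here instead.
func (f *fixedDurationTest) GetDuration() time.Duration { return f.duration }

func (f *fixedDurationTest) GetStartTime() (time.Time, error) {
    if f.startTime.IsZero() {
        return time.Time{}, errors.New("start time not set")
    }
    return f.startTime, nil
}

func (f *fixedDurationTest) GetEndTime() (time.Time, error) {
    if f.endTime.IsZero() {
        return time.Time{}, errors.New("end time not set")
    }
    return f.endTime, nil
}

func (f *fixedDurationTest) SetStartTime(t time.Time) { f.startTime = t }
func (f *fixedDurationTest) SetEndTime(t time.Time)   { f.endTime = t }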
type ScheduleDef
type Step
type Step struct {
    // contains filtered or unexported fields
}
A Step is a part of a Workload. It is to be converted to multiple Trogdor tasks which in combination achieve the desired throughput.
type SustainedConnection
type SustainedConnection struct {
    Name string
    Duration common.Duration `json:"duration"`
    ProducerConnectionCount uint64 `json:"producer_connection_count"`
    ConsumerConnectionCount uint64 `json:"consumer_connection_count"`
    MetadataConnectionCount uint64 `json:"metadata_connection_count"`
    NumThreads uint64 `json:"num_threads"`
    Fanout int `json:"fanout"`
    TasksPerStep int `json:"tasks_per_step"`
    RefreshRateMs uint64 `json:"refresh_rate_ms"`
    TopicName string `json:"topic_name"`
    MessageSizeBytes uint64 `json:"message_size_bytes"`
    SlowStartPerStepMs uint64 `json:"slow_start_per_step_ms"`
    // contains filtered or unexported fields
}
func (*SustainedConnection) CreateTest
func (sc *SustainedConnection) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
func (*SustainedConnection) GetDuration
func (sc *SustainedConnection) GetDuration() time.Duration
func (*SustainedConnection) GetEndTime
func (sc *SustainedConnection) GetEndTime() (time.Time, error)
func (*SustainedConnection) GetName
func (sc *SustainedConnection) GetName() string
func (*SustainedConnection) GetStartTime
func (sc *SustainedConnection) GetStartTime() (time.Time, error)
func (*SustainedConnection) SetEndTime
func (sc *SustainedConnection) SetEndTime(endTime time.Time)
func (*SustainedConnection) SetStartTime
func (sc *SustainedConnection) SetStartTime(startTime time.Time)
type TailConsumer
type TailConsumer struct {
    Name string
    // the ProduceTestName must refer to a test that implements the TestWithTopics interface
    ProduceTestName string `json:"topics_from_test"`
    Fanout int `json:"fanout"`
    Topics []string `json:"topics"`
    Duration common.Duration `json:"duration"`
    StepMessagesPerSecond uint64 `json:"step_messages_per_second"`
    TasksPerStep int `json:"tasks_per_step"`
    SlowStartPerStepMs uint64 `json:"slow_start_per_step_ms"`
    ConsumerGroup string `json:"consumer_group"`
    // contains filtered or unexported fields
}
func (*TailConsumer) CreateTest
func (tc *TailConsumer) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
func (*TailConsumer) GetDuration
func (tc *TailConsumer) GetDuration() time.Duration
func (*TailConsumer) GetEndTime
func (tc *TailConsumer) GetEndTime() (time.Time, error)
func (*TailConsumer) GetName
func (tc *TailConsumer) GetName() string
func (*TailConsumer) GetStartTime
func (tc *TailConsumer) GetStartTime() (time.Time, error)
func (*TailConsumer) SetEndTime
func (tc *TailConsumer) SetEndTime(endTime time.Time)
func (*TailConsumer) SetStartTime
func (tc *TailConsumer) SetStartTime(startTime time.Time)
type TestWithTopics
type TestWithTopics interface {
    GetName() string
    // TopicNames() should return all the topics this test will use
    TopicNames() []string
}
TestWithTopics is an interface for a test that makes use of topics.
type Workload
type Workload struct {
    Name string
    Type string `json:"workload_type"`
    PartitionCount uint64 `json:"partition_count"`
    StepDurationMs uint64 `json:"step_duration_ms"`
    StepCooldownMs uint64 `json:"step_cooldown_ms"`
    StartThroughputMbs float32 `json:"start_throughput_mbs"`
    EndThroughputMbs float32 `json:"end_throughput_mbs"`
    ThroughputIncreaseMbs float32 `json:"throughput_increase_per_step_mbs"`
    MessageSizeBytes uint64 `json:"message_size_bytes"`
    MessagePaddingBytes uint64 `json:"message_padding_bytes"`
    TasksPerStep int `json:"tasks_per_step"`
    SlowStartPerStepMs uint64 `json:"slow_start_per_step_ms"`
    TopicName string `json:"topic_name"`
    // contains filtered or unexported fields
}
func (*Workload) CreateTest
func (workload *Workload) CreateTest(trogdorAgentsCount int, bootstrapServers string) (tasks []trogdor.TaskSpec, err error)
Returns all the Trogdor tasks that should be run as part of this workload.