README ¶
Updated!!!
This extension is based on the archived and deprecated https://github.com/grafana/xk6-amqp, but all methods and objects were refactored.
I used https://github.com/mostafa/xk6-kafka as an example of an approved extension.
USE AT YOUR OWN RISK!
xk6-amqp-custom
A k6 extension for publishing and consuming messages from queues and exchanges. This project utilizes AMQP 0.9.1, the most common AMQP protocol in use today.
⚠ This project is not compatible with AMQP 1.0. A list of AMQP 1.0 brokers and other AMQP 1.0 resources may be found at github.com/xinchen10/awesome-amqp.
Build
To build a k6 binary with this extension, first ensure you have the prerequisites:
- Go toolchain
- Git
Then:
- Download xk6:
$ go install go.k6.io/xk6/cmd/xk6@latest
- Build the k6 binary:
$ xk6 build --with github.com/ptsypyshev/xk6-amqp-custom@latest
Development
To make development a little smoother, use the Makefile in the root folder. The default target will format your code, run tests, and create a k6 binary with your local code rather than from GitHub.
git clone git@github.com:ptsypyshev/xk6-amqp-custom.git
cd xk6-amqp-custom
make
Example
import {Publisher, Consumer} from "k6/x/amqp"; // import Amqp extension

const url = "amqp://guest:guest@localhost:5672/"
const exchangeName = 'K6 exchange'
const queueName = 'K6 queue'
const routingKey = 'rkey123'

const exchange = {
    name: exchangeName,
    kind: "direct",
    durable: true,
};

const queue = {
    name: queueName,
    routing_key: routingKey,
    durable: true,
};

const publisher = new Publisher({
    connection_url: url,
    exchange: exchange,
    queue: queue,
});

const consumer = new Consumer({
    connection_url: url,
    exchange: exchange,
    queue: queue,
});

// init
export let options = {
    vus: 5,
    iterations: 10,
    // duration: '20s',
};

export default function () {
    console.log("Publisher and Consumer are ready")

    // Publish messages
    const publish = function(mark) {
        publisher.publish({
            exchange : exchangeName,
            routing_key: routingKey,
            content_type: "text/plain",
            body: "Ping from k6 -> " + mark
        })
    }

    for (let i = 65; i <= 90; i++) {
        publish(String.fromCharCode(i))
    }

    // Consume messages
    let result = consumer.consume({
        read_timeout: '3s',
        consume_limit: 26,
    })

    result.forEach(msg => {
        console.log("msg: " + msg.body)
    });
}
Result output:
$ ./k6 run ./examples/publish-listen.js
/\ Grafana /‾‾/
/\ / \ |\ __ / /
/ \/ \ | |/ / / ‾‾\
/ \ | ( | (‾) |
/ __________ \ |_|\_\ \_____/
execution: local
script: ./examples/publish-listen.js
output: -
scenarios: (100.00%) 1 scenario, 5 max VUs, 10m30s max duration (incl. graceful stop):
* default: 10 iterations shared among 5 VUs (maxDuration: 10m0s, gracefulStop: 30s)
INFO[0000] Publisher and Consumer are ready source=console
INFO[0000] Publisher and Consumer are ready source=console
INFO[0000] Publisher and Consumer are ready source=console
INFO[0000] Publisher and Consumer are ready source=console
INFO[0000] Publisher and Consumer are ready source=console
INFO[0000] msg: Ping from k6 -> X source=console
... more of the same console output ...
INFO[0000] msg: Ping from k6 -> F source=console
consumer_message_count....: 156 49.882524/s
consumer_queue_load.......: 0 min=0 max=90
data_received.............: 0 B 0 B/s
data_sent.................: 0 B 0 B/s
iteration_duration........: avg=1.26s min=62.09ms med=69.95ms max=3.06s p(90)=3.05s p(95)=3.06s
iterations................: 10 3.197598/s
publisher_message_count...: 260 83.13754/s
publisher_queue_load......: 0 min=0 max=1539
vus.......................: 4 min=4 max=4
vus_max...................: 5 min=5 max=5
running (00m03.1s), 0/5 VUs, 10 complete and 0 interrupted iterations
default ✓ [======================================] 5 VUs 00m03.1s/10m0s 10/10 shared iters
See the examples folder for more details.
Testing Locally
This repository includes a docker-compose.yml file that starts RabbitMQ with the Management Plugin for testing the extension locally.
⚠ This environment is intended for testing only and should not be used for production purposes.
- Start the docker compose environment.
docker compose up -d
Output should appear similar to the following:
✔ Network xk6-amqp_default Created ... 0.0s
✔ Container xk6-amqp-rabbitmq-1 Started ... 0.2s
- Use your custom k6 binary to run a k6 test script connecting to your RabbitMQ server started in the previous step.
./k6 run ./examples/publish-listen.js
- Use the RabbitMQ admin console by accessing http://localhost:15672/, then log in using guest for both the Username and Password. This will allow you to monitor activity within your messaging server.
Documentation ¶
Overview ¶
Package amqp is an extension for K6 to work with RabbitMQ.
Index ¶
- Variables
- type AMQP
- type Channel
- type ConnectionConfig
- type ConsumeConfig
- type Consumer
- type ConsumerConfig
- type Duration
- type ExchangeBindOptions
- type ExchangeDeclareOptions
- type ExchangeDeleteOptions
- type ExchangeOptions
- type ExchangeUnbindOptions
- type Message
- type Metrics
- type Module
- type Publisher
- type PublisherConfig
- type QueueBindOptions
- type QueueDeclareOptions
- type QueueDeleteOptions
- type QueueInspectOptions
- type QueueOptions
- type QueuePurgeOptions
- type QueueUnbindOptions
- type RootModule
Constants ¶
This section is empty.
Variables ¶
var (
    // ErrForbiddenInInitContext is used when a function is called during VU initialization.
    ErrForbiddenInInitContext = errors.New("publishing RabbitMQ messages in the init context is not supported")
    // ErrNotEnoughArguments is used when a function is called with too few arguments.
    ErrNotEnoughArguments = errors.New("not enough arguments")
    // ErrEmptyContext is used when a function is called with empty vu context.
    ErrEmptyContext = errors.New("empty vu context")
    // ErrGetChannel is used when a function cannot get a channel from connection.
    ErrGetChannel = errors.New("failed to get channel from connection")
    // ErrDeclareExchange is used when a function cannot declare exchange.
    ErrDeclareExchange = errors.New("failed to declare exchange")
    // ErrDeleteExchange is used when a function cannot delete exchange.
    ErrDeleteExchange = errors.New("failed to delete exchange")
    // ErrBindExchange is used when a function cannot bind exchange.
    ErrBindExchange = errors.New("failed to bind exchange")
    // ErrUnbindExchange is used when a function cannot unbind exchange.
    ErrUnbindExchange = errors.New("failed to unbind exchange")
    // ErrDeclareQueue is used when a function cannot declare queue.
    ErrDeclareQueue = errors.New("failed to declare queue")
    // ErrDeleteQueue is used when a function cannot delete queue.
    ErrDeleteQueue = errors.New("failed to delete queue")
    // ErrBindQueue is used when a function cannot bind queue.
    ErrBindQueue = errors.New("failed to bind queue")
    // ErrUnbindQueue is used when a function cannot unbind queue.
    ErrUnbindQueue = errors.New("failed to unbind queue")
    // ErrInspectQueue is used when a function cannot inspect queue.
    ErrInspectQueue = errors.New("failed to inspect queue")
    // ErrPurgeQueue is used when a function cannot purge queue.
    ErrPurgeQueue = errors.New("failed to purge queue")
    // ErrGetQueueStats is used when a function cannot get queue stats.
    ErrGetQueueStats = errors.New("failed to get queue stats")
)
Functions ¶
This section is empty.
Types ¶
type AMQP ¶
type AMQP struct {
// contains filtered or unexported fields
}
AMQP is the main struct of the xk6-amqp extension.
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel is a struct used to prepare publishing.
type ConnectionConfig ¶
type ConnectionConfig struct {
ConnectionURL string `json:"connection_url"`
}
ConnectionConfig is used to pass the connection URL to the constructor.
type ConsumeConfig ¶
type ConsumeConfig struct {
    ConsumeLimit int      `json:"consume_limit"`
    ReadTimeout  Duration `json:"read_timeout"`
}
ConsumeConfig is used to pass parameters to the consume method.
type Consumer ¶
type Consumer struct {
    Channel
    // contains filtered or unexported fields
}
Consumer is the main struct for consuming messages from RabbitMQ.
type ConsumerConfig ¶
type ConsumerConfig struct {
    ConnectionURL string                 `json:"connection_url"`
    Exchange      ExchangeDeclareOptions `json:"exchange"`
    Queue         QueueDeclareOptions    `json:"queue"`
}
ConsumerConfig is used to construct a Consumer.
type Duration ¶
Duration is a custom wrapper for time.Duration type.
func (Duration) MarshalJSON ¶
MarshalJSON is a wrapper of json.Marshal.
func (*Duration) UnmarshalJSON ¶
UnmarshalJSON is a wrapper of json.Unmarshal.
type ExchangeBindOptions ¶
type ExchangeBindOptions struct {
    Args                    amqpDriver.Table `json:"args"`
    DestinationExchangeName string           `json:"destination_exchange_name"`
    SourceExchangeName      string           `json:"source_exchange_name"`
    RoutingKey              string           `json:"routing_key"`
    NoWait                  bool             `json:"no_wait"`
}
ExchangeBindOptions provides options when binding (subscribing) one exchange to another.
type ExchangeDeclareOptions ¶
type ExchangeDeclareOptions struct {
    Args       amqpDriver.Table `json:"args"`
    Name       string           `json:"name"`
    Kind       string           `json:"kind"`
    Durable    bool             `json:"durable"`
    AutoDelete bool             `json:"auto_delete"`
    Internal   bool             `json:"internal"`
    NoWait     bool             `json:"no_wait"`
}
ExchangeDeclareOptions provides options when declaring (creating) an exchange.
type ExchangeDeleteOptions ¶
type ExchangeDeleteOptions struct {
Name string `json:"name"`
}
ExchangeDeleteOptions provides options when deleting an exchange.
type ExchangeOptions ¶
type ExchangeOptions struct {
ConnectionURL string `json:"connection_url"`
}
ExchangeOptions defines configuration settings for accessing an exchange.
type ExchangeUnbindOptions ¶
type ExchangeUnbindOptions struct {
    Args                    amqpDriver.Table `json:"args"`
    DestinationExchangeName string           `json:"destination_exchange_name"`
    SourceExchangeName      string           `json:"source_exchange_name"`
    RoutingKey              string           `json:"routing_key"`
    NoWait                  bool             `json:"no_wait"`
}
ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.
type Message ¶
type Message struct {
    Headers     map[string]any `json:"headers"`
    Exchange    string         `json:"exchange"`
    RoutingKey  string         `json:"routing_key"`
    Body        string         `json:"body"`
    ContentType string         `json:"content_type"`
    Mandatory   bool           `json:"mandatory"`
    Immediate   bool           `json:"immediate"`
}
Message is a struct describing a message to publish.
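The JSON tags on Message line up with the keys passed to publisher.publish in the example script. As a rough illustration, and assuming the script object reaches the struct through those tags (the actual conversion path is not described here), the sketch below decodes an equivalent JSON document into a local copy of the struct. decodeMessage is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local copy of the documented struct.
type Message struct {
	Headers     map[string]any `json:"headers"`
	Exchange    string         `json:"exchange"`
	RoutingKey  string         `json:"routing_key"`
	Body        string         `json:"body"`
	ContentType string         `json:"content_type"`
	Mandatory   bool           `json:"mandatory"`
	Immediate   bool           `json:"immediate"`
}

// decodeMessage is a hypothetical helper that decodes a JSON document into a
// Message. Keys absent from the input keep their zero values.
func decodeMessage(raw string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(raw), &m)
	return m, err
}

func main() {
	m, err := decodeMessage(`{
		"exchange": "K6 exchange",
		"routing_key": "rkey123",
		"content_type": "text/plain",
		"body": "Ping from k6 -> A"
	}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q via %q/%q mandatory=%t\n", m.Body, m.Exchange, m.RoutingKey, m.Mandatory)
}
```

Fields omitted by the script, such as mandatory and immediate, stay false, and headers stays nil.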
type Metrics ¶
type Metrics struct {
    ConsumerQueueMessages  *metrics.Metric
    ConsumerQueueLoad      *metrics.Metric
    PublisherQueueMessages *metrics.Metric
    PublisherQueueLoad     *metrics.Metric
}
Metrics contains metrics for consumer and publisher.
type Publisher ¶
type Publisher struct {
    Channel
    // contains filtered or unexported fields
}
Publisher is the main struct for publishing messages to RabbitMQ.
type PublisherConfig ¶
type PublisherConfig struct {
    ConnectionURL string                 `json:"connection_url"`
    Exchange      ExchangeDeclareOptions `json:"exchange"`
    Queue         QueueDeclareOptions    `json:"queue"`
}
PublisherConfig is used to construct a Publisher.
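PublisherConfig nests the exchange and queue declarations, which matches the object passed to new Publisher(...) in the example script. The sketch below decodes that same shape into local copies of the documented structs. It assumes the nested objects are mapped through their JSON tags, uses map[string]any as a stand-in for amqpDriver.Table, and introduces decodePublisherConfig purely as a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExchangeDeclareOptions is a local copy of the documented struct, with
// map[string]any standing in for amqpDriver.Table.
type ExchangeDeclareOptions struct {
	Args       map[string]any `json:"args"`
	Name       string         `json:"name"`
	Kind       string         `json:"kind"`
	Durable    bool           `json:"durable"`
	AutoDelete bool           `json:"auto_delete"`
	Internal   bool           `json:"internal"`
	NoWait     bool           `json:"no_wait"`
}

// QueueDeclareOptions is a local copy of the documented struct.
type QueueDeclareOptions struct {
	Args             map[string]any `json:"args"`
	RoutingKey       string         `json:"routing_key"`
	Name             string         `json:"name"`
	Durable          bool           `json:"durable"`
	DeleteWhenUnused bool           `json:"delete_when_unused"`
	Exclusive        bool           `json:"exclusive"`
	NoWait           bool           `json:"no_wait"`
}

// PublisherConfig is a local copy of the documented struct.
type PublisherConfig struct {
	ConnectionURL string                 `json:"connection_url"`
	Exchange      ExchangeDeclareOptions `json:"exchange"`
	Queue         QueueDeclareOptions    `json:"queue"`
}

// decodePublisherConfig is a hypothetical helper that decodes a JSON
// document into a PublisherConfig.
func decodePublisherConfig(raw string) (PublisherConfig, error) {
	var cfg PublisherConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	cfg, err := decodePublisherConfig(`{
		"connection_url": "amqp://guest:guest@localhost:5672/",
		"exchange": {"name": "K6 exchange", "kind": "direct", "durable": true},
		"queue": {"name": "K6 queue", "routing_key": "rkey123", "durable": true}
	}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Exchange.Kind, cfg.Queue.RoutingKey, cfg.Queue.Durable)
}
```

ConsumerConfig has the same three fields, so the same shape applies to new Consumer(...).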
type QueueBindOptions ¶
type QueueBindOptions struct {
    Args         amqpDriver.Table `json:"args"`
    QueueName    string           `json:"queue_name"`
    ExchangeName string           `json:"exchange_name"`
    RoutingKey   string           `json:"routing_key"`
    NoWait       bool             `json:"no_wait"`
}
QueueBindOptions provides options when binding a queue to an exchange in order to receive message(s).
type QueueDeclareOptions ¶
type QueueDeclareOptions struct {
    Args             amqpDriver.Table `json:"args"`
    RoutingKey       string           `json:"routing_key"`
    Name             string           `json:"name"`
    Durable          bool             `json:"durable"`
    DeleteWhenUnused bool             `json:"delete_when_unused"`
    Exclusive        bool             `json:"exclusive"`
    NoWait           bool             `json:"no_wait"`
}
QueueDeclareOptions provides queue options when declaring (creating) a queue.
type QueueDeleteOptions ¶
type QueueDeleteOptions struct {
Name string `json:"name"`
}
QueueDeleteOptions provides options when deleting a queue.
type QueueInspectOptions ¶
type QueueInspectOptions struct {
    Args             amqpDriver.Table `json:"args"`
    Name             string           `json:"name"`
    Durable          bool             `json:"durable"`
    DeleteWhenUnused bool             `json:"delete_when_unused"`
    Exclusive        bool             `json:"exclusive"`
    NoWait           bool             `json:"no_wait"`
}
QueueInspectOptions provides options when inspecting a queue.
type QueueOptions ¶
type QueueOptions struct {
ConnectionURL string `json:"connection_url"`
}
QueueOptions defines configuration settings for accessing a queue.
type QueuePurgeOptions ¶
QueuePurgeOptions provides options when purging (emptying) a queue.
type QueueUnbindOptions ¶
type QueueUnbindOptions struct {
    Args         amqpDriver.Table `json:"args"`
    QueueName    string           `json:"queue_name"`
    ExchangeName string           `json:"exchange_name"`
    RoutingKey   string           `json:"routing_key"`
}
QueueUnbindOptions provides options when unbinding a queue from an exchange to stop receiving message(s).
type RootModule ¶
type RootModule struct{}
RootModule implements Module interface.
func (*RootModule) NewModuleInstance ¶
func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance
NewModuleInstance creates a new instance of the AMQP module.