amqp

package module
v0.0.1
Published: Dec 3, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

README

Updated!!!

This extension is based on the archived and deprecated https://github.com/grafana/xk6-amqp, but all methods and objects have been refactored.
I used https://github.com/mostafa/xk6-kafka as an example of an approved extension.

USE AT YOUR OWN RISK!


xk6-amqp-custom

A k6 extension for publishing and consuming messages from queues and exchanges. This project utilizes AMQP 0.9.1, the most common AMQP protocol in use today.

⚠ This project is not compatible with AMQP 1.0. A list of AMQP 1.0 brokers and other AMQP 1.0 resources may be found at github.com/xinchen10/awesome-amqp.

Build

To build a k6 binary with this extension, first ensure you have the prerequisites (the commands below need at least a working Go toolchain).

Then:

  1. Download xk6:
$ go install go.k6.io/xk6/cmd/xk6@latest
  2. Build the k6 binary:
$ xk6 build --with github.com/ptsypyshev/xk6-amqp-custom@latest

Development

To make development a little smoother, use the Makefile in the root folder. The default target will format your code, run tests, and create a k6 binary with your local code rather than from GitHub.

git clone git@github.com:ptsypyshev/xk6-amqp-custom.git
cd xk6-amqp-custom
make

Example

import {Publisher, Consumer} from "k6/x/amqp"; // import Amqp extension

const url = "amqp://guest:guest@localhost:5672/"
const exchangeName = 'K6 exchange'
const queueName = 'K6 queue'
const routingKey = 'rkey123'

const exchange = {
  name: exchangeName,
  kind: "direct",
  durable: true,
};

const queue = {
  name: queueName,
  routing_key: routingKey,
  durable: true,
};

const publisher = new Publisher({
  connection_url: url,
  exchange: exchange,
  queue: queue,
});

const consumer = new Consumer({
  connection_url: url,
  exchange: exchange,
  queue: queue,
});

// init
export let options = {
  vus: 5,
  iterations: 10,
  // duration: '20s',
};

export default function () {
  console.log("Publisher and Consumer are ready")

  // Publish messages
  const publish = function(mark) {
    publisher.publish({
      exchange  : exchangeName,
      routing_key: routingKey,
      content_type: "text/plain",
      body: "Ping from k6 -> " + mark
    })
  }
  // Char codes 65..90 are 'A'..'Z', so each iteration publishes 26 messages
  for (let i = 65; i <= 90; i++) {
    publish(String.fromCharCode(i))
  }

  // Consume messages
  let result = consumer.consume({
    read_timeout: '3s',
    consume_limit: 26,
  })

  result.forEach(msg => {
    console.log("msg: " + msg.body)
  });  
}

Result output:

$ ./k6 run ./examples/publish-listen.js
         /\      Grafana   /‾‾/  
    /\  /  \     |\  __   /  /   
   /  \/    \    | |/ /  /   ‾‾\ 
  /          \   |   (  |  (‾)  |
 / __________ \  |_|\_\  \_____/ 

     execution: local
        script: ./examples/publish-listen.js
        output: -

     scenarios: (100.00%) 1 scenario, 5 max VUs, 10m30s max duration (incl. graceful stop):
              * default: 10 iterations shared among 5 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] Publisher and Consumer are ready              source=console
INFO[0000] Publisher and Consumer are ready              source=console
INFO[0000] Publisher and Consumer are ready              source=console
INFO[0000] Publisher and Consumer are ready              source=console
INFO[0000] Publisher and Consumer are ready              source=console
INFO[0000] msg: Ping from k6 -> X                        source=console

... more of the same console output ...

INFO[0000] msg: Ping from k6 -> F                        source=console

     consumer_message_count....: 156 49.882524/s
     consumer_queue_load.......: 0   min=0       max=90  
     data_received.............: 0 B 0 B/s
     data_sent.................: 0 B 0 B/s
     iteration_duration........: avg=1.26s min=62.09ms med=69.95ms max=3.06s p(90)=3.05s p(95)=3.06s
     iterations................: 10  3.197598/s
     publisher_message_count...: 260 83.13754/s
     publisher_queue_load......: 0   min=0       max=1539
     vus.......................: 4   min=4       max=4   
     vus_max...................: 5   min=5       max=5   


running (00m03.1s), 0/5 VUs, 10 complete and 0 interrupted iterations
default ✓ [======================================] 5 VUs  00m03.1s/10m0s  10/10 shared iters

See the examples folder for more details.

Testing Locally

This repository includes a docker-compose.yml file that starts RabbitMQ with the Management Plugin so you can test the extension locally.

⚠ This environment is intended for testing only and should not be used for production purposes.

  1. Start the docker compose environment.
    docker compose up -d
    
    Output should appear similar to the following:
    ✔ Network xk6-amqp_default       Created               ...    0.0s
    ✔ Container xk6-amqp-rabbitmq-1  Started               ...    0.2s
    
  2. Use your custom k6 binary to run a k6 test script connecting to your RabbitMQ server started in the previous step.
    ./k6 run ./examples/publish-listen.js
    
  3. Open the RabbitMQ admin console at http://localhost:15672/ and log in using guest for both the Username and Password. This will allow you to monitor activity within your messaging server.

Documentation

Overview

Package amqp is an extension for k6 to work with RabbitMQ.

Index

Constants

This section is empty.

Variables

var (
	// ErrForbiddenInInitContext is used when a function is called during VU initialization.
	ErrForbiddenInInitContext = errors.New("publishing RabbitMQ messages in the init context is not supported")

	// ErrNotEnoughArguments is used when a function is called with too few arguments.
	ErrNotEnoughArguments = errors.New("not enough arguments")

	// ErrEmptyContext is used when a function is called with empty vu context.
	ErrEmptyContext = errors.New("empty vu context")

	// ErrGetChannel is used when a function cannot get a channel from connection.
	ErrGetChannel = errors.New("failed to get channel from connection")

	// ErrDeclareExchange is used when a function cannot declare exchange.
	ErrDeclareExchange = errors.New("failed to declare exchange")

	// ErrDeleteExchange is used when a function cannot delete exchange.
	ErrDeleteExchange = errors.New("failed to delete exchange")

	// ErrBindExchange is used when a function cannot bind exchange.
	ErrBindExchange = errors.New("failed to bind exchange")

	// ErrUnbindExchange is used when a function cannot unbind exchange.
	ErrUnbindExchange = errors.New("failed to unbind exchange")

	// ErrDeclareQueue is used when a function cannot declare queue.
	ErrDeclareQueue = errors.New("failed to declare queue")

	// ErrDeleteQueue is used when a function cannot delete queue.
	ErrDeleteQueue = errors.New("failed to delete queue")

	// ErrBindQueue is used when a function cannot bind queue.
	ErrBindQueue = errors.New("failed to bind queue")

	// ErrUnbindQueue is used when a function cannot unbind queue.
	ErrUnbindQueue = errors.New("failed to unbind queue")

	// ErrInspectQueue is used when a function cannot inspect queue.
	ErrInspectQueue = errors.New("failed to inspect queue")

	// ErrPurgeQueue is used when a function cannot purge queue.
	ErrPurgeQueue = errors.New("failed to purge queue")

	// ErrGetQueueStats is used when a function cannot get queue stats.
	ErrGetQueueStats = errors.New("failed to get queue stats")
)
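
The variables above are sentinel errors, so callers would normally match them with errors.Is rather than by comparing message strings. The sketch below is only an illustration: it redefines ErrDeclareQueue locally so that it compiles on its own, and declareQueue is a hypothetical helper. Whether the package itself wraps its sentinels with %w is an assumption, not something this page confirms.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrDeclareQueue is a local stand-in for the package's sentinel of the same
// name; it is redefined here only so the sketch compiles on its own.
var ErrDeclareQueue = errors.New("failed to declare queue")

// declareQueue is a hypothetical helper that always fails. It wraps the
// sentinel with %w so that callers can still match it after context is added.
func declareQueue(name string) error {
	return fmt.Errorf("queue %q: %w", name, ErrDeclareQueue)
}

// isDeclareQueueError reports whether err is, or wraps, ErrDeclareQueue.
func isDeclareQueueError(err error) bool {
	return errors.Is(err, ErrDeclareQueue)
}

func main() {
	err := declareQueue("K6 queue")
	fmt.Println(err)
	fmt.Println(isDeclareQueueError(err))
}
```

Matching on the sentinel keeps the check stable even if the surrounding message text changes.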

Functions

This section is empty.

Types

type AMQP

type AMQP struct {
	// contains filtered or unexported fields
}

AMQP is the main struct for the xk6-amqp extension.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

Channel is a struct used to prepare publishing.

type ConnectionConfig

type ConnectionConfig struct {
	ConnectionURL string `json:"connection_url"`
}

ConnectionConfig is used to pass the connection URL to a constructor.

type ConsumeConfig

type ConsumeConfig struct {
	ConsumeLimit int      `json:"consume_limit"`
	ReadTimeout  Duration `json:"read_timeout"`
}

ConsumeConfig is used to pass parameters to the consume method.

type Consumer

type Consumer struct {
	Channel
	// contains filtered or unexported fields
}

Consumer is the main struct for consuming messages from RabbitMQ.

func (*Consumer) Close

func (c *Consumer) Close() error

Close is a method of Consumer that shuts it down safely.

type ConsumerConfig

type ConsumerConfig struct {
	ConnectionURL string                 `json:"connection_url"`
	Exchange      ExchangeDeclareOptions `json:"exchange"`
	Queue         QueueDeclareOptions    `json:"queue"`
}

ConsumerConfig is used to construct a Consumer.
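
The json tags suggest how the object passed to new Consumer({...}) in the README maps onto these fields. The sketch below is a rough illustration using local mirror types and encoding/json. The Args fields use map[string]any as a stand-in for amqpDriver.Table, and decodeConsumerConfig is a hypothetical helper. How the extension actually converts the JavaScript object is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented option types. Args uses map[string]any as
// a stand-in for amqpDriver.Table so the sketch needs only the standard library.
type ExchangeDeclareOptions struct {
	Args       map[string]any `json:"args"`
	Name       string         `json:"name"`
	Kind       string         `json:"kind"`
	Durable    bool           `json:"durable"`
	AutoDelete bool           `json:"auto_delete"`
	Internal   bool           `json:"internal"`
	NoWait     bool           `json:"no_wait"`
}

type QueueDeclareOptions struct {
	Args             map[string]any `json:"args"`
	RoutingKey       string         `json:"routing_key"`
	Name             string         `json:"name"`
	Durable          bool           `json:"durable"`
	DeleteWhenUnused bool           `json:"delete_when_unused"`
	Exclusive        bool           `json:"exclusive"`
	NoWait           bool           `json:"no_wait"`
}

type ConsumerConfig struct {
	ConnectionURL string                 `json:"connection_url"`
	Exchange      ExchangeDeclareOptions `json:"exchange"`
	Queue         QueueDeclareOptions    `json:"queue"`
}

// decodeConsumerConfig is a hypothetical helper that parses a JSON document
// into a ConsumerConfig using the struct tags above.
func decodeConsumerConfig(raw string) (ConsumerConfig, error) {
	var cfg ConsumerConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	// The same values as the README example, written as JSON.
	raw := `{
		"connection_url": "amqp://guest:guest@localhost:5672/",
		"exchange": {"name": "K6 exchange", "kind": "direct", "durable": true},
		"queue": {"name": "K6 queue", "routing_key": "rkey123", "durable": true}
	}`
	cfg, err := decodeConsumerConfig(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("exchange %q (%s), queue %q bound with %q\n",
		cfg.Exchange.Name, cfg.Exchange.Kind, cfg.Queue.Name, cfg.Queue.RoutingKey)
	// Keys that are omitted keep their Go zero values.
	fmt.Println("auto_delete:", cfg.Exchange.AutoDelete)
}
```

Under this reading, any option left out of the script, such as auto_delete or exclusive, stays false.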

type Duration

type Duration struct {
	time.Duration
}

Duration is a custom wrapper for time.Duration type.

func (Duration) MarshalJSON

func (d Duration) MarshalJSON() ([]byte, error)

MarshalJSON is a wrapper of json.Marshal.

func (*Duration) UnmarshalJSON

func (d *Duration) UnmarshalJSON(b []byte) error

UnmarshalJSON is a wrapper of json.Unmarshal.
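
The README passes read_timeout: '3s' to consume, which suggests Duration accepts duration strings. The sketch below shows one common way to write such a wrapper, assuming the string format understood by time.ParseDuration. It is not the package's actual implementation, and parseConsumeConfig is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration mirrors the documented wrapper around time.Duration.
type Duration struct {
	time.Duration
}

// UnmarshalJSON accepts a JSON string such as "3s" (assumed format) and
// parses it with time.ParseDuration.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	parsed, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	d.Duration = parsed
	return nil
}

// MarshalJSON writes the duration back as a string, for example "3s".
func (d Duration) MarshalJSON() ([]byte, error) {
	return json.Marshal(d.String())
}

// ConsumeConfig mirrors the documented struct and its json tags.
type ConsumeConfig struct {
	ConsumeLimit int      `json:"consume_limit"`
	ReadTimeout  Duration `json:"read_timeout"`
}

// parseConsumeConfig is a hypothetical helper that decodes a JSON document
// into a ConsumeConfig.
func parseConsumeConfig(raw string) (ConsumeConfig, error) {
	var cfg ConsumeConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseConsumeConfig(`{"read_timeout": "3s", "consume_limit": 26}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(cfg.ReadTimeout.Duration, cfg.ConsumeLimit)
}
```

With this approach, an unparseable value such as "soon" surfaces as a decode error instead of silently becoming a zero timeout.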

type ExchangeBindOptions

type ExchangeBindOptions struct {
	Args                    amqpDriver.Table `json:"args"`
	DestinationExchangeName string           `json:"destination_exchange_name"`
	SourceExchangeName      string           `json:"source_exchange_name"`
	RoutingKey              string           `json:"routing_key"`
	NoWait                  bool             `json:"no_wait"`
}

ExchangeBindOptions provides options when binding (subscribing) one exchange to another.

type ExchangeDeclareOptions

type ExchangeDeclareOptions struct {
	Args       amqpDriver.Table `json:"args"`
	Name       string           `json:"name"`
	Kind       string           `json:"kind"`
	Durable    bool             `json:"durable"`
	AutoDelete bool             `json:"auto_delete"`
	Internal   bool             `json:"internal"`
	NoWait     bool             `json:"no_wait"`
}

ExchangeDeclareOptions provides options when declaring (creating) an exchange.

type ExchangeDeleteOptions

type ExchangeDeleteOptions struct {
	Name string `json:"name"`
}

ExchangeDeleteOptions provides options when deleting an exchange.

type ExchangeOptions

type ExchangeOptions struct {
	ConnectionURL string `json:"connection_url"`
}

ExchangeOptions defines configuration settings for accessing an exchange.

type ExchangeUnbindOptions

type ExchangeUnbindOptions struct {
	Args                    amqpDriver.Table `json:"args"`
	DestinationExchangeName string           `json:"destination_exchange_name"`
	SourceExchangeName      string           `json:"source_exchange_name"`
	RoutingKey              string           `json:"routing_key"`
	NoWait                  bool             `json:"no_wait"`
}

ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.

type Message

type Message struct {
	Headers     map[string]any `json:"headers"`
	Exchange    string         `json:"exchange"`
	RoutingKey  string         `json:"routing_key"`
	Body        string         `json:"body"`
	ContentType string         `json:"content_type"`
	Mandatory   bool           `json:"mandatory"`
	Immediate   bool           `json:"immediate"`
}

Message is a struct describing a message to publish.
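
As an illustration of how these fields fit together, the sketch below builds the same 26 "Ping" messages that the README script publishes in each iteration. Message is redefined locally so the sketch compiles on its own, and pingMessages is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Message mirrors the documented struct so the sketch compiles on its own.
type Message struct {
	Headers     map[string]any `json:"headers"`
	Exchange    string         `json:"exchange"`
	RoutingKey  string         `json:"routing_key"`
	Body        string         `json:"body"`
	ContentType string         `json:"content_type"`
	Mandatory   bool           `json:"mandatory"`
	Immediate   bool           `json:"immediate"`
}

// pingMessages is a hypothetical helper that builds one plain-text message
// per letter A..Z, matching the bodies used in the README example.
func pingMessages(exchange, routingKey string) []Message {
	msgs := make([]Message, 0, 26)
	for c := 'A'; c <= 'Z'; c++ {
		msgs = append(msgs, Message{
			Exchange:    exchange,
			RoutingKey:  routingKey,
			ContentType: "text/plain",
			Body:        "Ping from k6 -> " + string(c),
		})
	}
	return msgs
}

func main() {
	msgs := pingMessages("K6 exchange", "rkey123")
	fmt.Println(len(msgs), msgs[0].Body, "...", msgs[len(msgs)-1].Body)
}
```

Headers, Mandatory and Immediate are left at their zero values here, as they are in the README script.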

type Metrics

type Metrics struct {
	ConsumerQueueMessages  *metrics.Metric
	ConsumerQueueLoad      *metrics.Metric
	PublisherQueueMessages *metrics.Metric
	PublisherQueueLoad     *metrics.Metric
}

Metrics contains metrics for consumer and publisher.

type Module

type Module struct {
	*AMQP
}

Module implements the modules.Instance interface.

func (*Module) Exports

func (m *Module) Exports() modules.Exports

Exports returns the exports of the AMQP module, which are the functions that can be called from the JS code.

type Publisher

type Publisher struct {
	Channel
	// contains filtered or unexported fields
}

Publisher is the main struct for publishing messages to RabbitMQ.

func (*Publisher) Close

func (p *Publisher) Close() error

Close is a method of Publisher that shuts it down safely.

type PublisherConfig

type PublisherConfig struct {
	ConnectionURL string                 `json:"connection_url"`
	Exchange      ExchangeDeclareOptions `json:"exchange"`
	Queue         QueueDeclareOptions    `json:"queue"`
}

PublisherConfig is used to construct a Publisher.

type QueueBindOptions

type QueueBindOptions struct {
	Args         amqpDriver.Table `json:"args"`
	QueueName    string           `json:"queue_name"`
	ExchangeName string           `json:"exchange_name"`
	RoutingKey   string           `json:"routing_key"`
	NoWait       bool             `json:"no_wait"`
}

QueueBindOptions provides options when binding a queue to an exchange in order to receive message(s).

type QueueDeclareOptions

type QueueDeclareOptions struct {
	Args             amqpDriver.Table `json:"args"`
	RoutingKey       string           `json:"routing_key"`
	Name             string           `json:"name"`
	Durable          bool             `json:"durable"`
	DeleteWhenUnused bool             `json:"delete_when_unused"`
	Exclusive        bool             `json:"exclusive"`
	NoWait           bool             `json:"no_wait"`
}

QueueDeclareOptions provides queue options when declaring (creating) a queue.

type QueueDeleteOptions

type QueueDeleteOptions struct {
	Name string `json:"name"`
}

QueueDeleteOptions provides options when deleting a queue.

type QueueInspectOptions

type QueueInspectOptions struct {
	Args             amqpDriver.Table `json:"args"`
	Name             string           `json:"name"`
	Durable          bool             `json:"durable"`
	DeleteWhenUnused bool             `json:"delete_when_unused"`
	Exclusive        bool             `json:"exclusive"`
	NoWait           bool             `json:"no_wait"`
}

QueueInspectOptions provides options when inspecting a queue.

type QueueOptions

type QueueOptions struct {
	ConnectionURL string `json:"connection_url"`
}

QueueOptions defines configuration settings for accessing a queue.

type QueuePurgeOptions

type QueuePurgeOptions struct {
	QueueName string `json:"queue_name"`
	NoWait    bool   `json:"no_wait"`
}

QueuePurgeOptions provides options when purging (emptying) a queue.

type QueueUnbindOptions

type QueueUnbindOptions struct {
	Args         amqpDriver.Table `json:"args"`
	QueueName    string           `json:"queue_name"`
	ExchangeName string           `json:"exchange_name"`
	RoutingKey   string           `json:"routing_key"`
}

QueueUnbindOptions provides options when unbinding a queue from an exchange to stop receiving message(s).

type RootModule

type RootModule struct{}

RootModule implements the modules.Module interface.

func New

func New() *RootModule

New creates a new instance of the root module.

func (*RootModule) NewModuleInstance

func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance

NewModuleInstance creates a new instance of the AMQP module.
