amqp

v0.4.1
Published: Aug 4, 2023 | License: Apache-2.0 | Imports: 7 | Imported by: 0

README

xk6-amqp

A k6 extension for publishing and consuming messages from queues and exchanges. This project utilizes AMQP 0.9.1, the most common AMQP protocol in use today.

⚠ This project is not compatible with AMQP 1.0. A list of AMQP 1.0 brokers and other AMQP 1.0 resources may be found at github.com/xinchen10/awesome-amqp.

Build

To build a k6 binary with this extension, first make sure the Go toolchain is installed (the go install command below requires it).

Then:

  1. Download xk6:
     $ go install go.k6.io/xk6/cmd/xk6@latest
  2. Build the k6 binary:
     $ xk6 build --with github.com/grafana/xk6-amqp@latest

Development

To make development a little smoother, use the Makefile in the root folder. The default target will format your code, run tests, and create a k6 binary with your local code rather than from GitHub.

git clone git@github.com:grafana/xk6-amqp.git
cd xk6-amqp
make

Example

import Amqp from 'k6/x/amqp';
import Queue from 'k6/x/amqp/queue';

export default function () {
  console.log("K6 amqp extension enabled, version: " + Amqp.version)
  const url = "amqp://guest:guest@localhost:5672/"
  Amqp.start({
    connection_url: url
  })
  console.log("Connection opened: " + url)

  const queueName = 'K6 general'

  Queue.declare({
    name: queueName,
    // durable: false,
    // delete_when_unused: false,
    // exclusive: false,
    // no_wait: false,
    // args: null
  })

  console.log(queueName + " queue is ready")

  Amqp.publish({
    queue_name: queueName,
    body: "Ping from k6",
    content_type: "text/plain",
    // timestamp: Math.round(Date.now() / 1000),
    // exchange: '',
    // mandatory: false,
    // immediate: false,
    // headers: {
    //   'header-1': '',
    // },
  })

  const listener = function(data) { console.log('received data: ' + data) }
  Amqp.listen({
    queue_name: queueName,
    listener: listener,
    // consumer: '',
    // auto_ack: true,
    // exclusive: false,
    // no_local: false,
    // no_wait: false,
    // args: null
  })
}

Result output:

$ ./k6 run script.js

          /\      |‾‾| /‾‾/   /‾‾/
     /\  /  \     |  |/  /   /  /
    /  \/    \    |     (   /   ‾‾\
   /          \   |  |\  \ |  (‾)  |
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: ../xk6-amqp/examples/test.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] K6 amqp extension enabled, version: v0.0.1    source=console
INFO[0000] Connection opened: amqp://guest:guest@localhost:5672/  source=console
INFO[0000] K6 general queue is ready                     source=console
INFO[0000] received data: Ping from k6                   source=console

running (00m00.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m00.0s/10m0s  1/1 iters, 1 per VU

     data_received........: 0 B 0 B/s
     data_sent............: 0 B 0 B/s
     iteration_duration...: avg=31.37ms min=31.37ms med=31.37ms max=31.37ms p(90)=31.37ms p(95)=31.37ms
     iterations...........: 1   30.855627/s

See the examples folder for more details.
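The connection URL in the example above packs credentials, host, and port into one string. A minimal sketch of assembling it from parts follows. The helper name buildAmqpUrl and its parameter names are hypothetical and not part of the extension; the default port 5672 matches the URL used in the example.

```javascript
// Build an AMQP 0.9.1 connection URL from its parts.
// buildAmqpUrl and its parameter names are illustrative, not part of xk6-amqp.
function buildAmqpUrl({ user, password, host, port = 5672, vhost = '' }) {
  // Percent-encode each part so characters such as '@' or '/' cannot break the URL.
  const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  return `amqp://${credentials}@${host}:${port}/${encodeURIComponent(vhost)}`;
}

const url = buildAmqpUrl({ user: 'guest', password: 'guest', host: 'localhost' });
// In a k6 script this value would be passed as: Amqp.start({ connection_url: url })
```

Encoding each part separately keeps a password or virtual host that contains reserved characters from being misread as URL structure.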

Testing Locally

This repository includes a docker-compose.yml file that starts RabbitMQ with the management plugin enabled, for testing the extension locally.

⚠ This environment is intended for testing only and should not be used for production purposes.

  1. Start the docker compose environment.
    docker compose up -d
    
    Output should appear similar to the following:
    ✔ Network xk6-amqp_default       Created               ...    0.0s
    ✔ Container xk6-amqp-rabbitmq-1  Started               ...    0.2s
    
  2. Use your custom k6 binary to run a k6 test script connecting to your RabbitMQ server started in the previous step.
    ./k6 run examples/test.js
    
  3. Open the RabbitMQ admin console at http://localhost:15672/ and log in using guest as both the username and the password. The console lets you monitor activity within your messaging server.

Documentation

Overview

Package amqp provides an AMQP API for communicating with a remote server.

Types

type AMQP

type AMQP struct {
	Version     string
	Connections *map[int]*amqpDriver.Connection
	MaxConnID   *int
	Queue       *Queue
	Exchange    *Exchange
}

The AMQP type holds connections to a remote AMQP server.

func (*AMQP) GetConn added in v0.4.0

func (amqp *AMQP) GetConn(connID int) (*amqpDriver.Connection, error)

GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0.
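The lookup rule described above (a specific ID selects that connection, 0 selects the most recently initialised one) can be modelled in a few lines. This is an illustrative model only, not the package's Go code. The class name ConnectionRegistry, the add method, and the assumption that IDs start at 1 and increase by one are all hypothetical.

```javascript
// Illustrative model of the documented GetConn rule; not the package's implementation.
class ConnectionRegistry {
  constructor() {
    this.connections = new Map(); // connection ID -> connection object
    this.maxConnId = 0;           // ID of the most recently added connection
  }

  // Register a connection and return its ID (assumed to start at 1).
  add(conn) {
    this.maxConnId += 1;
    this.connections.set(this.maxConnId, conn);
    return this.maxConnId;
  }

  // ID 0 means "the last initialised connection"; any other ID is looked up directly.
  getConn(connId) {
    const id = connId === 0 ? this.maxConnId : connId;
    const conn = this.connections.get(id);
    if (conn === undefined) {
      throw new Error(`connection ${id} not initialised`);
    }
    return conn;
  }
}
```

Under this model, option structs that leave ConnectionID at its zero value fall back to the latest connection, which is why a single-connection script never needs to pass an ID.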

func (*AMQP) Listen

func (amqp *AMQP) Listen(options ListenOptions) error

Listen binds to an AMQP queue in order to receive message(s) as they arrive.

func (*AMQP) Publish

func (amqp *AMQP) Publish(options PublishOptions) error

Publish delivers the payload using options provided.

func (*AMQP) Start

func (amqp *AMQP) Start(options Options) (int, error)

Start establishes a session with an AMQP server given the provided options.

type ConsumeOptions

type ConsumeOptions struct {
	Consumer  string
	AutoAck   bool
	Exclusive bool
	NoLocal   bool
	NoWait    bool
	Args      amqpDriver.Table
}

ConsumeOptions defines options for use when consuming a message.

type DeclareOptions

type DeclareOptions struct {
	ConnectionID     int
	Name             string
	Durable          bool
	DeleteWhenUnused bool
	Exclusive        bool
	NoWait           bool
	Args             amqpDriver.Table
}

DeclareOptions provides queue options when declaring (creating) a queue.

type Exchange

type Exchange struct {
	Version     string
	Connections *map[int]*amqpDriver.Connection
	MaxConnID   *int
}

Exchange defines a connection to publish/subscribe destinations.

func (*Exchange) Bind

func (exchange *Exchange) Bind(options ExchangeBindOptions) error

Bind subscribes one exchange to another.

func (*Exchange) Declare

func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error

Declare creates a new exchange given the provided options.

func (*Exchange) Delete

func (exchange *Exchange) Delete(name string, options ExchangeDeleteOptions) error

Delete removes an exchange from the remote server given the exchange name.

func (*Exchange) GetConn added in v0.4.0

func (exchange *Exchange) GetConn(connID int) (*amqpDriver.Connection, error)

GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0.

func (*Exchange) Unbind

func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error

Unbind removes a subscription from one exchange to another.

type ExchangeBindOptions

type ExchangeBindOptions struct {
	ConnectionID            int
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

ExchangeBindOptions provides options when binding (subscribing) one exchange to another.

type ExchangeDeclareOptions

type ExchangeDeclareOptions struct {
	ConnectionID int
	Name         string
	Kind         string
	Durable      bool
	AutoDelete   bool
	Internal     bool
	NoWait       bool
	Args         amqpDriver.Table
}

ExchangeDeclareOptions provides options when declaring (creating) an exchange.
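As a sketch of how these options might be assembled in a k6 script, the helper below validates Kind against the four standard AMQP 0.9.1 exchange types before building the options object. The helper name buildExchangeDeclareOptions is hypothetical, and the snake_case keys (auto_delete, no_wait, and so on) are an assumption inferred from the naming pattern in the README example, not confirmed by this page.

```javascript
// Standard exchange types defined by AMQP 0.9.1.
const EXCHANGE_KINDS = ['direct', 'fanout', 'topic', 'headers'];

// Hypothetical helper: key names are assumed from the README's snake_case pattern.
function buildExchangeDeclareOptions(name, kind, overrides = {}) {
  if (!EXCHANGE_KINDS.includes(kind)) {
    throw new Error(`unknown exchange kind: ${kind}`);
  }
  return {
    name: name,
    kind: kind,
    durable: false,     // explicit false mirrors the Go zero value
    auto_delete: false,
    internal: false,
    no_wait: false,
    ...overrides,       // caller-supplied values win over the defaults
  };
}

const exchangeOptions = buildExchangeDeclareOptions('orders', 'topic', { durable: true });
```

Validating the kind in the script surfaces a typo immediately instead of as a broker-side error during the test run.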

type ExchangeDeleteOptions added in v0.4.0

type ExchangeDeleteOptions struct {
	ConnectionID int
}

ExchangeDeleteOptions provides options when deleting an exchange.

type ExchangeOptions

type ExchangeOptions struct {
	ConnectionURL string
}

ExchangeOptions defines configuration settings for accessing an exchange.

type ExchangeUnbindOptions

type ExchangeUnbindOptions struct {
	ConnectionID            int
	DestinationExchangeName string
	SourceExchangeName      string
	RoutingKey              string
	NoWait                  bool
	Args                    amqpDriver.Table
}

ExchangeUnbindOptions provides options when unbinding (unsubscribing) one exchange from another.

type ListenOptions

type ListenOptions struct {
	ConnectionID int
	Listener     ListenerType
	QueueName    string
	Consumer     string
	AutoAck      bool
	Exclusive    bool
	NoLocal      bool
	NoWait       bool
	Args         amqpDriver.Table
}

ListenOptions defines options for subscribing to message(s) within a queue.

type ListenerType

type ListenerType func(string) error

ListenerType is the message handler implemented within JavaScript.
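Because the handler receives the message body as a string, structured payloads have to be decoded inside the listener. A minimal sketch, assuming JSON message bodies, is shown below. The helper name makeJsonListener and its two callback parameters are hypothetical.

```javascript
// Wrap a handler so it receives parsed JSON instead of the raw string body.
// makeJsonListener is an illustrative helper, not part of the extension.
function makeJsonListener(onMessage, onInvalid) {
  return function (data) {
    let parsed;
    try {
      parsed = JSON.parse(data);
    } catch (err) {
      // Malformed bodies are reported separately rather than crashing the listener.
      onInvalid(data, err);
      return;
    }
    onMessage(parsed);
  };
}

// In a k6 script the result would be passed as the listener option, for example:
// Amqp.listen({ queue_name: queueName, listener: makeJsonListener(handle, reportBad) })
```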

type Options

type Options struct {
	ConnectionURL string
}

Options defines configuration options for an AMQP session.

type PublishOptions

type PublishOptions struct {
	ConnectionID  int
	QueueName     string
	Body          string
	Headers       amqpDriver.Table
	Exchange      string
	ContentType   string
	Mandatory     bool
	Immediate     bool
	Persistent    bool
	CorrelationID string
	ReplyTo       string
	Expiration    string
	MessageID     string
	Timestamp     int64 // unix epoch timestamp in seconds
	Type          string
	UserID        string
	AppID         string
}

PublishOptions defines a message payload with delivery options.
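Body is a string and Timestamp is expressed in seconds, while JavaScript's Date.now() returns milliseconds. The sketch below builds publish options for a JSON payload using only the keys that appear in the README example (queue_name, body, content_type, timestamp). The helper name buildJsonPublishOptions and its nowMs parameter are hypothetical.

```javascript
// Build publish options for a JSON payload.
// buildJsonPublishOptions is an illustrative helper, not part of the extension.
function buildJsonPublishOptions(queueName, payload, nowMs = Date.now()) {
  return {
    queue_name: queueName,
    body: JSON.stringify(payload),       // Body must be a string
    content_type: 'application/json',
    timestamp: Math.floor(nowMs / 1000), // milliseconds -> whole seconds
  };
}

const publishOptions = buildJsonPublishOptions('K6 general', { id: 1 }, 1700000000123);
// In a k6 script: Amqp.publish(publishOptions)
```

Passing the clock value as a parameter keeps the helper deterministic, which makes it easy to check outside of k6.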

type Queue

type Queue struct {
	Version     string
	Connections *map[int]*amqpDriver.Connection
	MaxConnID   *int
}

Queue defines a connection to a point-to-point destination.

func (*Queue) Bind

func (queue *Queue) Bind(options QueueBindOptions) error

Bind subscribes a queue to an exchange in order to receive message(s).

func (*Queue) Declare

func (queue *Queue) Declare(options DeclareOptions) (amqpDriver.Queue, error)

Declare creates a new queue given the provided options.

func (*Queue) Delete

func (queue *Queue) Delete(name string, options QueueDeleteOptions) error

Delete removes a queue from the remote server given the queue name.

func (*Queue) GetConn added in v0.4.0

func (queue *Queue) GetConn(connID int) (*amqpDriver.Connection, error)

GetConn gets an initialised connection by ID, or returns the last initialised one if ID is 0.

func (*Queue) Inspect

func (queue *Queue) Inspect(name string, options QueueInspectOptions) (amqpDriver.Queue, error)

Inspect provides queue metadata given queue name.

func (*Queue) Purge

func (queue *Queue) Purge(name string, noWait bool, options QueuePurgeOptions) (int, error)

Purge removes all non-consumed message(s) from the specified queue.

func (*Queue) Unbind

func (queue *Queue) Unbind(options QueueUnbindOptions) error

Unbind removes a queue subscription from an exchange to discontinue receiving message(s).

type QueueBindOptions

type QueueBindOptions struct {
	ConnectionID int
	QueueName    string
	ExchangeName string
	RoutingKey   string
	NoWait       bool
	Args         amqpDriver.Table
}

QueueBindOptions provides options when binding a queue to an exchange in order to receive message(s).
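When the exchange is of the topic kind, AMQP 0.9.1 compares each message's routing key with the binding's RoutingKey pattern: words are separated by dots, * matches exactly one word, and # matches zero or more words. The sketch below models that rule so a binding pattern can be checked before a test run. The function name topicMatches is hypothetical, and the broker performs the real matching.

```javascript
// Model of AMQP 0.9.1 topic-exchange matching; the broker does the real work.
// topicMatches is an illustrative helper, not part of the extension.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // match(i, j): does pattern[i..] match key[j..]?
  function match(i, j) {
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // '#' may consume zero or more words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    // '*' consumes exactly one word; anything else must match literally.
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}
```

For direct exchanges the routing key must equal the binding key exactly, and fanout exchanges ignore it, so this check applies only to topic bindings.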

type QueueDeleteOptions added in v0.4.0

type QueueDeleteOptions struct {
	ConnectionID int
}

QueueDeleteOptions provides options when deleting a queue.

type QueueInspectOptions added in v0.4.0

type QueueInspectOptions struct {
	ConnectionID int
}

QueueInspectOptions provides options when inspecting a queue.

type QueueOptions

type QueueOptions struct {
	ConnectionURL string
}

QueueOptions defines configuration settings for accessing a queue.

type QueuePurgeOptions added in v0.4.0

type QueuePurgeOptions struct {
	ConnectionID int
}

QueuePurgeOptions provides options when purging (emptying) a queue.

type QueueUnbindOptions

type QueueUnbindOptions struct {
	ConnectionID int
	QueueName    string
	ExchangeName string
	RoutingKey   string
	Args         amqpDriver.Table
}

QueueUnbindOptions provides options when unbinding a queue from an exchange to stop receiving message(s).
