nats

package module
v1.0.2
Published: Sep 5, 2023 License: MIT Imports: 7 Imported by: 0

README

xk6-nats

This is a k6 extension, built with the xk6 system, that lets k6 scripts use the NATS protocol.

This extension is a fork of ydarias/k6-nats that adds headers support and some other features. It is provided with no warranty; use it at your own risk.

Build

To build a k6 binary with this extension:

  1. Install the xk6 framework for extending k6:
     go install go.k6.io/xk6/cmd/xk6@latest
  2. Build the binary:
     xk6 build --with github.com/fernandoescolar/xk6-nats@latest
  3. Run a test with the newly built binary:
     ./k6 run -e NATS_HOSTNAME=localhost test/test.js

To run the JetStream test, make sure a NATS server with JetStream enabled is running (e.g. nats-server -js):

./k6 run -e NATS_HOSTNAME=localhost test/test_jetstream.js

To run the publish-with-headers test, make sure a NATS server with JetStream enabled is running (e.g. nats-server -js):

./k6 run -e NATS_HOSTNAME=localhost test/test_headers.js

API

Nats

A Nats instance represents a connection to the NATS server. It is created with new Nats(configuration), where configuration has the following attributes:

Attribute  Description
servers    (mandatory) the list of servers where NATS is available (e.g. ['nats://localhost:4222'])
unsafe     (optional) boolean that allows self-signed certificates when testing against a test environment (default: false)
token      (optional) the token used to connect to the NATS server

Example:

import {Nats} from 'k6/x/nats'

const natsConfig = {
    servers: ['nats://localhost:4222'],
    unsafe: true,
}

const nats = new Nats(natsConfig)
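
The run commands in the Build section pass the server host through NATS_HOSTNAME. The following is a minimal sketch of assembling the configuration object from such values. buildNatsConfig, NATS_TOKEN and NATS_UNSAFE are hypothetical names that are not part of the extension, and port 4222 is assumed from the example above. In a k6 script the env argument would be k6's __ENV object.

```javascript
// Hypothetical helper (not part of xk6-nats): builds the configuration
// object expected by `new Nats(configuration)` from environment values.
function buildNatsConfig(env) {
    const host = env.NATS_HOSTNAME || 'localhost'
    const config = {
        servers: [`nats://${host}:4222`], // mandatory; port 4222 is assumed
        unsafe: env.NATS_UNSAFE === 'true', // optional, false unless set to 'true'
    }
    // The token attribute is optional, so only set it when one was supplied.
    if (env.NATS_TOKEN) {
        config.token = env.NATS_TOKEN
    }
    return config
}

// In a k6 script: const nats = new Nats(buildNatsConfig(__ENV))
console.log(JSON.stringify(buildNatsConfig({ NATS_HOSTNAME: 'nats.example.test' })))
```

Keeping the helper free of k6 imports means it can be checked outside k6 before it is used in a load test.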

Publishing

You can publish messages to a topic using the following functions:

Function                                     Description
publish(topic, payload)                      publishes a message to topic (string); payload is a string that is serialized to a byte array
publishWithHeaders(topic, payload, headers)  publishes a message to topic (string) with the given headers; payload is a string that is serialized to a byte array
publishMsg(message)                          publishes message (object), which has the following attributes: topic (string), data (string), raw (byte array) and headers (object)
request(topic, payload, headers)             sends a request to topic (string) with the given payload (string) and headers, and returns a message

Example:

const publisher = new Nats(natsConfig)

publisher.publish('topic', 'data')
publisher.publishWithHeaders('topic', 'data', { 'header1': 'value1' })
publisher.publishMsg({ topic: 'topic', data: 'string data', headers: { 'header1': 'value1' } })
publisher.publishMsg({ topic: 'topic', raw: [ 0, 1, 2, 3 ], headers: { 'header1': 'value1' } })
const message = publisher.request('topic', 'data', { 'header1': 'value1' })
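
publishMsg accepts the payload either as data (string) or as raw (byte array). The following sketch shows one way to pick the attribute from the payload type. buildMessage is a hypothetical helper, not part of the extension, and the byte range check is an added safeguard rather than documented behaviour.

```javascript
// Hypothetical helper (not part of xk6-nats): builds the object passed to
// publishMsg, using `data` for strings and `raw` for arrays of byte values.
function buildMessage(topic, payload, headers) {
    const message = { topic: topic, headers: headers || {} }
    if (typeof payload === 'string') {
        message.data = payload
    } else if (Array.isArray(payload)) {
        // Reject anything that is not an integer in the byte range 0..255.
        for (const b of payload) {
            if (!Number.isInteger(b) || b < 0 || b > 255) {
                throw new Error(`not a byte value: ${b}`)
            }
        }
        message.raw = payload
    } else {
        throw new Error('payload must be a string or an array of bytes')
    }
    return message
}

// In a k6 script:
//   publisher.publishMsg(buildMessage('topic', [0, 1, 2, 3], { header1: 'value1' }))
console.log(JSON.stringify(buildMessage('topic', 'string data', { header1: 'value1' })))
```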

Subscribing

You can subscribe to a topic using the following functions:

Function                    Description
subscribe(topic, callback)  subscribes to topic (string) and executes the callback function when a message is received; returns a subscription

Example:

const subscriber = new Nats(natsConfig)
const subscription = subscriber.subscribe('topic', (msg) => {
    console.log(msg.data)
})
// ...
subscription.close()
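
The subscribe/close pairing above can be wrapped so that received payloads are gathered in one place. This is a sketch only: collectMessages is a hypothetical helper, and client is assumed to be a Nats instance created as shown earlier.

```javascript
// Hypothetical helper (not part of xk6-nats): subscribes to `topic`, stores
// the string payload of every received message, and returns a function that
// closes the subscription and hands back what was collected.
function collectMessages(client, topic) {
    const received = []
    const subscription = client.subscribe(topic, (msg) => {
        received.push(msg.data)
    })
    return function stop() {
        subscription.close()
        return received
    }
}

// In a k6 script:
//   const stop = collectMessages(subscriber, 'topic')
//   // ... publish and wait for delivery ...
//   const payloads = stop()
```

Returning a stop function keeps the subscription handle private, so the caller cannot forget which subscription belongs to which list of payloads.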

JetStream

You can use JetStream Pub/Sub in the same way as NATS Pub/Sub. The only difference is that you need to set up the stream before publishing or subscribing to it.

The configuration is the same as the one used in nats-io's StreamConfig:

Attribute      Description
name           (mandatory) the name of the stream
description    (optional) a description of the stream
subjects       (mandatory) the list of subjects that the stream listens to
retention      (optional) the retention policy of the stream: limits, interest, workqueue or stream
max_consumers  (optional) the maximum number of consumers that the stream allows
max_msgs       (optional) the maximum number of messages that the stream stores
max_bytes      (optional) the maximum number of bytes that the stream stores
max_age        (optional) the maximum age of the messages that the stream stores
max_msg_size   (optional) the maximum size of a message that the stream stores
discard        (optional) the discard policy of the stream: old, new or none
storage        (optional) the type of storage that the stream uses: file or memory
replicas       (optional) the number of replicas of the stream
no_ack         (optional) boolean that indicates whether the stream uses acks

Example:

const streamConfig = {
    name: "mock",
    subjects: ["foo"],
    max_msgs_per_subject: 100,
    discard: 0,
    storage: 1
}

const publisher = new Nats(natsConfig)
publisher.jetStreamSetup(streamConfig)
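
The example above passes discard and storage as numbers. The following sketch gives those numbers names. The mapping (0 = old and 1 = new for discard, 0 = file and 1 = memory for storage) is an assumption based on the declaration order of the corresponding constants in the nats.go client and should be verified against the version in use. Discard, Storage and buildStreamConfig are hypothetical names, not part of the extension.

```javascript
// Assumed numeric values, following the declaration order of the
// corresponding constants in nats.go; verify against your version.
const Discard = Object.freeze({ old: 0, new: 1 })
const Storage = Object.freeze({ file: 0, memory: 1 })

// Hypothetical helper (not part of xk6-nats): checks the two mandatory
// attributes and merges optional ones over illustrative defaults.
function buildStreamConfig(name, subjects, options) {
    if (!name) {
        throw new Error('name is mandatory')
    }
    if (!Array.isArray(subjects) || subjects.length === 0) {
        throw new Error('subjects is mandatory')
    }
    return Object.assign(
        { name: name, subjects: subjects, discard: Discard.old, storage: Storage.memory },
        options || {}
    )
}

// Produces the same attributes as the example above.
const exampleConfig = buildStreamConfig('mock', ['foo'], { max_msgs_per_subject: 100 })
console.log(JSON.stringify(exampleConfig))
```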

JetStream operations

Once the stream is set up, you can publish and subscribe to it using the following functions:

Function                                              Description
jetStreamSetup(config)                                sets up a stream with the given configuration
jetStreamPublish(topic, payload)                      publishes a message to topic (string); payload is a string that is serialized to a byte array
jetStreamPublishWithHeaders(topic, payload, headers)  publishes a message to topic (string) with the given headers; payload is a string that is serialized to a byte array
jetStreamPublishMsg(message)                          publishes message (object), which has the following attributes: topic (string), data (string), raw (byte array) and headers (object)
jetStreamSubscribe(topic, callback)                   subscribes to topic (string) and executes the callback function when a message is received; returns a subscription

Example:

const publisher = new Nats(natsConfig)
const subscriber = new Nats(natsConfig)

// The stream must exist before subscribing or publishing.
publisher.jetStreamSetup(streamConfig)

// 'foo' is the subject the stream listens to (see streamConfig.subjects).
const subscription = subscriber.jetStreamSubscribe('foo', (msg) => {
    console.log(msg.data)
})

publisher.jetStreamPublish('foo', 'data')
publisher.jetStreamPublishWithHeaders('foo', 'data', { 'header1': 'value1' })
publisher.jetStreamPublishMsg({ topic: 'foo', data: 'string data', headers: { 'header1': 'value1' } })
publisher.jetStreamPublishMsg({ topic: 'foo', raw: [ 0, 1, 2, 3 ], headers: { 'header1': 'value1' } })

// ...

subscription.close()
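
In a load test it is often useful to count failed publishes instead of aborting the iteration. The sketch below assumes that an error returned by the extension's Go methods reaches the script as a thrown exception, which is how k6's JavaScript runtime is expected to surface Go errors; treat that as an assumption to verify. publishAll is a hypothetical helper, not part of the extension.

```javascript
// Hypothetical helper (not part of xk6-nats): sets up the stream, publishes
// every payload with the same headers, and counts failures instead of
// letting the first one stop the loop.
function publishAll(client, streamConfig, subject, payloads, headers) {
    // The stream has to exist before anything is published to it.
    client.jetStreamSetup(streamConfig)
    let failed = 0
    for (const payload of payloads) {
        try {
            client.jetStreamPublishWithHeaders(subject, payload, headers || {})
        } catch (err) {
            // Assumption: a failed publish surfaces as an exception.
            failed++
        }
    }
    return { sent: payloads.length - failed, failed: failed }
}

// In a k6 script:
//   const result = publishAll(publisher, streamConfig, 'foo', ['a', 'b'], { header1: 'value1' })
```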

Return values

A subscription return value has the following methods:

Method   Description
close()  closes the subscription

A message return value has the following attributes:

Attribute  Description
raw        the payload in byte array format
data       the payload in string format
topic      the topic where the message was published
headers    the headers of the message
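
The message attributes above can be checked inside a subscription callback or on the value returned by request. A minimal sketch follows; matchesExpectation is a hypothetical helper, not part of the extension, and it relies only on the topic and headers attributes listed in the table.

```javascript
// Hypothetical helper (not part of xk6-nats): returns true when the message
// was published on the expected topic and carries every expected header.
function matchesExpectation(msg, expectedTopic, expectedHeaders) {
    if (msg.topic !== expectedTopic) {
        return false
    }
    const headers = msg.headers || {}
    const expected = expectedHeaders || {}
    return Object.keys(expected).every((key) => headers[key] === expected[key])
}

// In a k6 script:
//   const reply = publisher.request('topic', 'data', { header1: 'value1' })
//   const ok = matchesExpectation(reply, 'topic', {})
const sample = { topic: 'topic', data: 'data', raw: [100, 97, 116, 97], headers: { header1: 'value1' } }
console.log(matchesExpectation(sample, 'topic', { header1: 'value1' }))
```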

Examples

You can find some examples in the examples folder. To run them, you need to have a NATS server running and then run the following commands with the k6 binary built above:

./k6 run -e NATS_HOSTNAME=your_nats_server_host examples/simple.js
./k6 run -e NATS_HOSTNAME=your_nats_server_host examples/withHeaders.js

Or you can check the test folder to see how to use the extension.

License

The original source code of this project belongs to ydarias and has not been released under any license.

The source code of this project is released under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Configuration

type Configuration struct {
	Servers []string
	Unsafe  bool
	Token   string
}

type Message

type Message struct {
	Raw    []byte
	Data   string
	Topic  string
	Header map[string]string
}

type MessageHandler

type MessageHandler func(Message)

type Nats

type Nats struct {
	// contains filtered or unexported fields
}

Nats represents an instance of the module for every VU.

func (*Nats) Close

func (n *Nats) Close()

func (*Nats) Exports

func (mi *Nats) Exports() modules.Exports

Exports implements the modules.Instance interface and returns the exports of the JS module.

func (*Nats) JetStreamDelete

func (n *Nats) JetStreamDelete(name string) error

func (*Nats) JetStreamPublish

func (n *Nats) JetStreamPublish(topic string, message string) error

func (*Nats) JetStreamPublishMsg

func (n *Nats) JetStreamPublishMsg(msg *Message) error

func (*Nats) JetStreamPublishWithHeaders

func (n *Nats) JetStreamPublishWithHeaders(topic, message string, headers map[string]string) error

func (*Nats) JetStreamSetup

func (n *Nats) JetStreamSetup(streamConfig *natsio.StreamConfig) error

JetStreamSetup connects to JetStream and creates a new stream, or updates it if it already exists.

func (*Nats) JetStreamSubscribe

func (n *Nats) JetStreamSubscribe(topic string, handler MessageHandler) (*Subscription, error)

func (*Nats) Publish

func (n *Nats) Publish(topic, message string) error

func (*Nats) PublishMsg

func (n *Nats) PublishMsg(msg *Message) error

func (*Nats) PublishWithHeaders

func (n *Nats) PublishWithHeaders(topic, message string, headers map[string]string) error

func (*Nats) Request

func (n *Nats) Request(subject, data string, headers map[string]string) (Message, error)

func (*Nats) Subscribe

func (n *Nats) Subscribe(topic string, handler MessageHandler) (*Subscription, error)

type RootModule

type RootModule struct{}

RootModule is the global module object type. It is instantiated once per test run and will be used to create k6/x/nats module instances for each VU.

func (*RootModule) NewModuleInstance

func (r *RootModule) NewModuleInstance(vu modules.VU) modules.Instance

NewModuleInstance implements the modules.Module interface and returns a new instance for each VU.

type Subscription

type Subscription struct {
	Close func() error
}
