Conduit Connector NATS JetStream
General
The NATS JetStream connector is one of Conduit plugins. It provides both, a source and a destination NATS JetStream connector.
Prerequisites
How to build it
Run make
.
Testing
Run make test
to run all the unit and integration tests, which require Docker and Docker Compose to be installed and running. The command will handle starting and stopping docker containers for you.
Source
Connection and authentication
The NATS JetStream connector connects to a NATS server or a cluster with the required parameters urls
, subject
and mode
. If your NATS server has a configured authentication you can pass an authentication details in the connection URL. For example, for a token authentication the url will look like: nats://mytoken@127.0.0.1:4222
, and for a username/password authentication: nats://username:password@127.0.0.1:4222
. But if your server is using NKey or Credentials file for authentication you must configure them via separate configuration parameters.
Receiving messages
The connector creates a durable NATS consumer which means it's able to read messages that were written to a NATS stream before the connector was created, unless configured otherwise. The deliverPolicy
configuration parameter allows you to control this behavior.
- If the
deliverPolicy
is equal to new
the connector will only consume messages which were created after the connector started.
- If the
deliverPolicy
is equal to all
the connector will consume all messages in a stream.
The connector allows you to configure a size of a pending message buffer. If your NATS server has hundreds of thousands of messages and a high frequency of their writing, it's highly recommended to set the bufferSize
parameter high enough (65536
or more, depending on how much RAM you have). Otherwise, you risk getting a slow consumers problem.
Position handling
The position is initialized based on incoming messages. To ensure the ability to continue reading from it, the most important message metadata is stored within it.
Configuration
The config passed to Configure can contain the following fields.
name |
description |
required |
default |
urls |
A list of connection URLs joined by comma. Must be a valid URLs. Examples:
nats://127.0.0.1:1222
nats://127.0.0.1:1222,nats://127.0.0.1:1223
nats://myname:password@127.0.0.1:4222
nats://mytoken@127.0.0.1:4222 |
true |
|
subject |
A name of a subject from which the connector should read. It is possible to specify a name of a subject that belongs to a stream, but not the one you specified, the connector in this case will handle messages properly. |
true |
|
connectionName |
Optional connection name which will come in handy when it comes to monitoring |
false |
conduit-connection-<random_uuid> |
nkeyPath |
A path pointed to a NKey pair. Must be a valid file path. Required if your NATS server is using NKey authentication. |
false |
|
credentialsFilePath |
A path pointed to a credentials file. Must be a valid file path. Required if your NATS server is using file credentials authentication. |
false |
|
tls.clientCertPath |
A path pointed to a TLS client certificate, must be present if tls.clientPrivateKeyPath field is also present. Must be a valid file path. Required if your NATS server is using TLS. |
false |
|
tls.clientPrivateKeyPath |
A path pointed to a TLS client private key, must be present if tls.clientCertPath field is also present. Must be a valid file path. Required if your NATS server is using TLS. |
false |
|
tls.rootCACertPath |
A path pointed to a TLS root certificate, provide if you want to verify server’s identity. Must be a valid file path |
false |
|
maxReconnects |
Sets the number of NATS server reconnect attempts that will be tried before giving up. If negative, then it will never give up trying to reconnect. |
false |
5 |
reconnectWait |
Sets the time to backoff after attempting a reconnect to a NATS server that the connector was already connected to previously. |
false |
5s |
bufferSize |
A buffer size for consumed messages. It must be set to avoid the slow consumers problem. Minimum allowed value is 64 |
false |
1024 |
durable |
The name of the Consumer, if set will make a consumer durable, allowing resuming consumption where left off |
false |
conduit-<random_uuid> |
deliverSubject |
Specifies the JetStream consumer deliver subject. |
false |
<durable>.conduit |
deliverPolicy |
Defines where in the stream the connector should start receiving messages. Allowed values are new and all .
-all - The connector will start receiving from the earliest available message. -new - When first consuming messages, the connector will only start receiving messages that were created after the consumer was created.
If the connector starts with non-zero position, the deliver policy will be DeliverByStartSequence and the connector will read messages from that position |
false |
all |
ackPolicy |
Defines how messages should be acknowledged. Allowed values are explicit , all and none
- explicit - each individual message must be acknowledged - all - if the connector receives a series of messages, it only has to ack the last one it received - none - the connector doesn’t have to ack any messages |
false |
explicit |
Destination
Sending messages
The connector currently only supports synchronous message sending.
Configuration
The config passed to Configure can contain the following fields.
name |
description |
required |
default |
urls |
A list of connection URLs joined by comma. Must be a valid URLs. Examples:
nats://127.0.0.1:1222
nats://127.0.0.1:1222,nats://127.0.0.1:1223
nats://myname:password@127.0.0.1:4222
nats://mytoken@127.0.0.1:4222 |
true |
|
subject |
A name of a subject to which the connector should write. |
true |
|
connectionName |
Optional connection name which will come in handy when it comes to monitoring |
false |
conduit-connection-<random_uuid> |
nkeyPath |
A path pointed to a NKey pair. Must be a valid file path. Required if your NATS server is using NKey authentication. |
false |
|
credentialsFilePath |
A path pointed to a credentials file. Must be a valid file path. Required if your NATS server is using file credentials authentication. |
false |
|
tls.clientCertPath |
A path pointed to a TLS client certificate, must be present if tls.clientPrivateKeyPath field is also present. Must be a valid file path. Required if your NATS server is using TLS. |
false |
|
tls.clientPrivateKeyPath |
A path pointed to a TLS client private key, must be present if tls.clientCertPath field is also present. Must be a valid file path. Required if your NATS server is using TLS. |
false |
|
tls.rootCACertPath |
A path pointed to a TLS root certificate, provide if you want to verify server’s identity. Must be a valid file path |
false |
|
maxReconnects |
Sets the number of NATS server reconnect attempts that will be tried before giving up. If negative, then it will never give up trying to reconnect. |
false |
5 |
reconnectWait |
Sets the time to backoff after attempting a reconnect to a NATS server that the connector was already connected to previously. |
false |
5s |
retryWait |
Sets the timeout to wait for a message to be resent, if send fails. |
false |
5s |
retryAttempts |
Sets a numbers of attempts to send a message, if send fails. |
false |
3 |