nats

package
v0.12.5
Published: Apr 1, 2023 License: Apache-2.0 Imports: 24 Imported by: 2

README

NATS as a Pub-Sub Protocol Module

NATS is a CNCF-hosted pub-sub, streaming, and communications system written in Go for cloud, IoT, and edge deployments.

NATS is forked into the aperture-2.0 branch, which removes all listeners and other complex OS-specific code, significantly simplifying the codebase. The fork then implements those functions with directives to the Aperture stack.

NATS uses nkeys (public-private key cryptography) internally, behind flexible wrappers. This made it simple to plug in Aperture's public/private key interface system with a small amount of glue code. NATS Accounts map directly to Bifrost Peers. The NATS 2.0 systems for authentication, authorization, and permissions are all integrated with the Aperture stack.

The result is a highly automated NATS deployment which can work across arbitrary network transports, firewalls, air gaps, and compute architectures, and even in the web browser. NATS now communicates over QUIC (on UDP) instead of TCP, and could communicate over more exotic transports such as XBee or even sound (chirp.js).

All NATS protocols are fully supported. Previously, NATS required manual configuration of TLS, accounts, and HTTP listeners. With Bifrost, these protocols are exposed as stream endpoints bound to the peer's public key.

Bifrost's one-size-fits-all PubSub controller manages and automates connections between next-hop routing peers. Because the fork keeps drop-in protocol compatibility with upstream NATS and makes only minimal modifications, it also interoperates with existing or third-party NATS clusters and clients.

NATS as a communications backend for Bifrost

An existing NATS deployment, or any cluster compatible with the NATS client protocol, can also be used as backing infrastructure for Bifrost. The "NATS Client Controller" operates as a client to existing NATS infrastructure, importing the nats-go client and speaking the protocol over HTTP(S).

Bifrost imports the following capabilities from the NATS 2.0 client:

  • Account: the controller has a private key which is used to authenticate.
    • Bifrost Peer maps to Nkeys PKI and uses the PKI challenge auth method.
  • Stream: peers communicate over a Link via Transport backed by a NATS Stream
    • Provides full two-way communication connectivity between peers.
    • Requires permission to import/export streams + services.
    • Peer A publishes a "Session Establish" service to NATS.
    • Peer B discovers the Service and sends a request to dial the Transport.
    • Peer A acks with a unique identifier for the session.
    • The unique identifiers from both peers are mixed to form a shared secret S.
    • Secret S is used to generate the session ID and stream ID prefix.
    • Peer A publishes a Stream {session-id} to peer B and vice versa.
    • The packet-conn protocol (currently QUIC) is used to perform the TLS handshake.
    • Session continues as usual.
    • If session is closed for any reason the streams are unpublished.
    • Embedded NATS hosts can communicate over this encrypted Link.
  • PubSub: use existing NATS.io infrastructure for pub-sub.
    • Requires permission to publish/subscribe to arbitrary topics below a user-specified prefix.
    • The Bifrost pub-sub topic is transformed with the prefix and passed to NATS.
    • Trusts the underlying NATS infrastructure to not forge messages.
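The session-establishment steps in the Stream bullets above can be sketched as follows. This is a minimal illustration, not the Bifrost implementation: the README does not say how the identifiers are mixed, so the SHA-256 construction, the `session-id:` label, the hex encoding, and the `stream.` prefix format are all assumptions, and `deriveSession` is a hypothetical helper name.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deriveSession mixes the unique identifiers of both peers into a shared
// secret S, then derives a session ID and a stream ID prefix from S.
// The hash construction and string formats are assumptions for illustration.
func deriveSession(idA, idB []byte) (sessionID, streamPrefix string) {
	// Order the identifiers so both peers compute the same secret
	// regardless of which side they are on.
	lo, hi := idA, idB
	if bytes.Compare(lo, hi) > 0 {
		lo, hi = hi, lo
	}

	// Hash each identifier first so the concatenation is unambiguous.
	a := sha256.Sum256(lo)
	b := sha256.Sum256(hi)
	secret := sha256.Sum256(append(a[:], b[:]...))

	// Derive the session ID from the secret under a fixed label.
	sid := sha256.Sum256(append([]byte("session-id:"), secret[:]...))
	sessionID = hex.EncodeToString(sid[:8])
	streamPrefix = "stream." + sessionID + "."
	return sessionID, streamPrefix
}

func main() {
	// Each peer contributes its own unique identifier.
	sidA, prefixA := deriveSession([]byte("peer-a-nonce"), []byte("peer-b-nonce"))
	sidB, prefixB := deriveSession([]byte("peer-b-nonce"), []byte("peer-a-nonce"))
	fmt.Println(sidA == sidB, prefixA == prefixB)
}
```

Sorting the identifiers before hashing is what lets both peers arrive at the same session ID without agreeing on an ordering in advance.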

This allows a NATS deployment managed with existing NATS tooling to serve as backing infrastructure through which Bifrost and other systems communicate.
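The topic transformation mentioned in the PubSub bullets above might look like the sketch below. The README only says the topic is transformed with a user-specified prefix; joining with "." (the NATS subject token separator) and the helper name `prefixTopic` are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// prefixTopic maps a Bifrost pub-sub channel ID onto a NATS subject below a
// user-specified prefix. The "." join is an assumption, not the documented
// behavior of the controller.
func prefixTopic(prefix, channelID string) string {
	// Tolerate a prefix given with or without a trailing separator.
	prefix = strings.TrimSuffix(prefix, ".")
	if prefix == "" {
		return channelID
	}
	return prefix + "." + channelID
}

func main() {
	fmt.Println(prefixTopic("bifrost.pubsub", "my-channel"))
}
```

Keeping every Bifrost topic under one prefix means the NATS account only needs publish/subscribe permission for that subtree.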

Documentation

Constants

const (
	NatsRouterID = protocol.ID("nats.io/2/router") // nats 2.0 router
	NatsClientID = protocol.ID("nats.io/2/client") // nats 2.0 client API
)

Variables

var (
	NatsConnType_name = map[int32]string{
		0: "NatsConnType_UNKNOWN",
		1: "NatsConnType_CLIENT",
		2: "NatsConnType_ROUTER",
	}
	NatsConnType_value = map[string]int32{
		"NatsConnType_UNKNOWN": 0,
		"NatsConnType_CLIENT":  1,
		"NatsConnType_ROUTER":  2,
	}
)

Enum value maps for NatsConnType.
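As a small illustration of how such enum maps are typically used, the sketch below converts between numbers and names. The maps are local copies of the ones shown above so that the example runs on its own; `connTypeName` and `connTypeValue` are hypothetical helpers, and falling back to the decimal number for unknown values is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the generated enum maps, so the sketch is self-contained.
var natsConnTypeName = map[int32]string{
	0: "NatsConnType_UNKNOWN",
	1: "NatsConnType_CLIENT",
	2: "NatsConnType_ROUTER",
}

var natsConnTypeValue = map[string]int32{
	"NatsConnType_UNKNOWN": 0,
	"NatsConnType_CLIENT":  1,
	"NatsConnType_ROUTER":  2,
}

// connTypeName returns the enum name for a number, or the number itself as a
// string when the value is not a known enum member.
func connTypeName(v int32) string {
	if name, ok := natsConnTypeName[v]; ok {
		return name
	}
	return strconv.Itoa(int(v))
}

// connTypeValue parses an enum name back into its number.
func connTypeValue(name string) (int32, bool) {
	v, ok := natsConnTypeValue[name]
	return v, ok
}

func main() {
	fmt.Println(connTypeName(2))
	fmt.Println(connTypeValue("NatsConnType_CLIENT"))
}
```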

var (
	ErrInvalidLength        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflow          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group")
)
var File_github_com_aperturerobotics_bifrost_pubsub_nats_nats_proto protoreflect.FileDescriptor

Functions

func NewNats

func NewNats(
	ctx context.Context,
	le *logrus.Entry,
	handler pubsub.PubSubHandler,
	cc *Config,
	peer peer.Peer,
) (pubsub.PubSub, error)

NewNats constructs a new Nats PubSub router.

Types

type Config

type Config struct {

	// ClusterName is the cluster ID string to use.
	// This must be the same on all nodes.
	// If unset, uses the protocol ID.
	ClusterName string `protobuf:"bytes,1,opt,name=cluster_name,json=clusterName,proto3" json:"cluster_name,omitempty"`
	// PublishHashType is the hash type to use when signing published messages.
	// Defaults to sha256
	PublishHashType hash.HashType `` /* 128-byte string literal not displayed */
	// LogDebug turns on extended debugging logging.
	LogDebug bool `protobuf:"varint,3,opt,name=log_debug,json=logDebug,proto3" json:"log_debug,omitempty"`
	// LogTrace turns on tracing logging.
	// Implies log_debug.
	LogTrace bool `protobuf:"varint,4,opt,name=log_trace,json=logTrace,proto3" json:"log_trace,omitempty"`
	// LogTraceVerbose turns on verbose tracing logging.
	// Implies log_trace and log_debug.
	LogTraceVerbose bool `protobuf:"varint,5,opt,name=log_trace_verbose,json=logTraceVerbose,proto3" json:"log_trace_verbose,omitempty"`
	// contains filtered or unexported fields
}

Config configures the nats router, hosting a nats.io routing node. This uses nats 2.0 accounts - an Account maps to a Peer.

func (*Config) ApplyOptions

func (c *Config) ApplyOptions(opts *nats_server.Options) error

ApplyOptions applies the nats server options.

func (*Config) CloneGenericVT added in v0.8.3

func (m *Config) CloneGenericVT() proto.Message

func (*Config) CloneVT added in v0.8.3

func (m *Config) CloneVT() *Config

func (*Config) Descriptor deprecated

func (*Config) Descriptor() ([]byte, []int)

Deprecated: Use Config.ProtoReflect.Descriptor instead.

func (*Config) EqualVT added in v0.3.0

func (this *Config) EqualVT(that *Config) bool

func (*Config) GetClusterName

func (x *Config) GetClusterName() string

func (*Config) GetLogDebug

func (x *Config) GetLogDebug() bool

func (*Config) GetLogTrace

func (x *Config) GetLogTrace() bool

func (*Config) GetLogTraceVerbose

func (x *Config) GetLogTraceVerbose() bool

func (*Config) GetPublishHashType

func (x *Config) GetPublishHashType() hash.HashType

func (*Config) MarshalToSizedBufferVT added in v0.2.0

func (m *Config) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*Config) MarshalToVT added in v0.2.0

func (m *Config) MarshalToVT(dAtA []byte) (int, error)

func (*Config) MarshalVT added in v0.2.0

func (m *Config) MarshalVT() (dAtA []byte, err error)

func (*Config) ProtoMessage

func (*Config) ProtoMessage()

func (*Config) ProtoReflect added in v0.2.0

func (x *Config) ProtoReflect() protoreflect.Message

func (*Config) Reset

func (x *Config) Reset()

func (*Config) SizeVT added in v0.2.0

func (m *Config) SizeVT() (n int)

func (*Config) String

func (x *Config) String() string

func (*Config) UnmarshalVT added in v0.2.0

func (m *Config) UnmarshalVT(dAtA []byte) error

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type KeyPair

type KeyPair struct {
	// contains filtered or unexported fields
}

KeyPair wraps a Bifrost key pair.

func NewKeyPair

func NewKeyPair(privKey crypto.PrivKey, pubKey crypto.PubKey) (*KeyPair, error)

NewKeyPair builds a new KeyPair. At least one of the private key and the public key must be specified.

func (*KeyPair) PrivateKey

func (k *KeyPair) PrivateKey() ([]byte, error)

PrivateKey will return an error since this is not available for public key only KeyPairs.

func (*KeyPair) PublicKey

func (k *KeyPair) PublicKey() (string, error)

PublicKey will return the encoded public key associated with the KeyPair. All KeyPairs have a public key.

func (*KeyPair) Seed

func (k *KeyPair) Seed() ([]byte, error)

Seed returns the seed used to make the key.

func (*KeyPair) Sign

func (k *KeyPair) Sign(input []byte) ([]byte, error)

Sign will return an error since this is not available for public key only KeyPairs.

func (*KeyPair) Verify

func (k *KeyPair) Verify(input []byte, sig []byte) error

Verify will verify the input against a signature utilizing the public key.

func (*KeyPair) Wipe

func (k *KeyPair) Wipe()

type Nats

type Nats struct {
	// contains filtered or unexported fields
}

Nats implements the nats router.

func (*Nats) AddPeerStream

func (m *Nats) AddPeerStream(tpl pubsub.PeerLinkTuple, initiator bool, mstrm link.MountedStream)

AddPeerStream adds a negotiated peer stream. Two streams will be negotiated, one outgoing, one incoming. The pubsub should communicate over the stream.

func (*Nats) AddSubscription

func (n *Nats) AddSubscription(ctx context.Context, privKey crypto.PrivKey, channelID string) (pubsub.Subscription, error)

AddSubscription adds a channel subscription, returning a subscription handle.

Uses the router peer private key.

An alternate approach is to use a client connection.

func (*Nats) BuildClient

func (n *Nats) BuildClient(ctx context.Context, privKey crypto.PrivKey, opts ...nats_client.Option) (*nats_client.Conn, error)

BuildClient builds a client for the nats server, creating a client connection.

Note: the servers list & dialer will be overwritten.

func (*Nats) Close

func (m *Nats) Close()

Close closes the pubsub.

func (*Nats) Execute

func (m *Nats) Execute(ctx context.Context) error

Execute executes the PubSub routines.

func (*Nats) GetOrBuildCommonClient

func (n *Nats) GetOrBuildCommonClient(ctx context.Context) (*nats_client.Conn, error)

GetOrBuildCommonClient returns the common nats client.

func (*Nats) SubscribeSync

func (n *Nats) SubscribeSync(
	ctx context.Context,
	channelID string,
) (*nats_client.Subscription, *nats_client.Conn, error)

SubscribeSync passes through to the nats client SubscribeSync call.

type NatsConnType

type NatsConnType int32

NatsConnType indicates the type of nats conn a stream represents.

const (
	// NatsConnType_UNKNOWN is the unknown type.
	NatsConnType_NatsConnType_UNKNOWN NatsConnType = 0
	// NatsConnType_CLIENT is the client connection type.
	NatsConnType_NatsConnType_CLIENT NatsConnType = 1
	// NatsConnType_ROUTER is the router-router connection type.
	NatsConnType_NatsConnType_ROUTER NatsConnType = 2 // TODO other types
)

func ProtocolIDToStreamType

func ProtocolIDToStreamType(id protocol.ID) NatsConnType

ProtocolIDToStreamType converts a protocol ID to a conn type.
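A plausible shape for this conversion, given the two protocol ID constants and the three enum values documented on this page, is sketched below. The types are local stand-ins for `protocol.ID` and `NatsConnType`, and the mapping of unrecognized IDs to the unknown type is an assumption; the actual function may differ.

```go
package main

import "fmt"

// protocolID and natsConnType are local stand-ins for the documented types.
type protocolID string
type natsConnType int32

const (
	natsRouterID protocolID = "nats.io/2/router"
	natsClientID protocolID = "nats.io/2/client"
)

const (
	connTypeUnknown natsConnType = 0
	connTypeClient  natsConnType = 1
	connTypeRouter  natsConnType = 2
)

// protocolIDToStreamType maps a protocol ID to a connection type and returns
// the unknown type for any ID it does not recognize.
func protocolIDToStreamType(id protocolID) natsConnType {
	switch id {
	case natsRouterID:
		return connTypeRouter
	case natsClientID:
		return connTypeClient
	default:
		return connTypeUnknown
	}
}

func main() {
	fmt.Println(protocolIDToStreamType(natsClientID))
	fmt.Println(protocolIDToStreamType("something/else"))
}
```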

func (NatsConnType) Descriptor added in v0.2.0

func (NatsConnType) Descriptor() protoreflect.EnumDescriptor

func (NatsConnType) Enum added in v0.2.0

func (x NatsConnType) Enum() *NatsConnType

func (NatsConnType) EnumDescriptor deprecated

func (NatsConnType) EnumDescriptor() ([]byte, []int)

Deprecated: Use NatsConnType.Descriptor instead.

func (NatsConnType) Number added in v0.2.0

func (x NatsConnType) Number() protoreflect.EnumNumber

func (NatsConnType) String

func (x NatsConnType) String() string

func (NatsConnType) Type added in v0.2.0

func (NatsConnType) Type() protoreflect.EnumType
