bridge

package
v0.0.0-...-782b6ae Latest
Warning

This package is not in the latest version of its module.

Published: May 1, 2020 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package bridge contains the MQTT-NATS bridge server implementation.

Index

Constants

const (
	// StateInfant is set when the client is created and awaits a Connect packet
	StateInfant = byte(iota)

	// StateConnected is set once a successful connection has been established
	StateConnected

	// StateDisconnected is set when a disconnect packet arrives or when a non-recoverable error occurs.
	StateDisconnected
)
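The three states are plain byte values, so code that logs or branches on the result of Client.State() may want a readable name for each. A minimal sketch follows. It mirrors the constants locally so that it compiles on its own, and stateName is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local mirror of the constants documented above, so that the sketch
// compiles without importing the bridge package.
const (
	StateInfant = byte(iota)
	StateConnected
	StateDisconnected
)

// stateName is a hypothetical helper (not part of the bridge package)
// that maps a client state to a readable name.
func stateName(s byte) string {
	switch s {
	case StateInfant:
		return "infant"
	case StateConnected:
		return "connected"
	case StateDisconnected:
		return "disconnected"
	default:
		return fmt.Sprintf("unknown(%d)", s)
	}
}

func main() {
	for _, s := range []byte{StateInfant, StateConnected, StateDisconnected} {
		fmt.Printf("%d => %s\n", s, stateName(s))
	}
}
```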

Variables

This section is empty.

Functions

This section is empty.

Types

type Bridge

type Bridge interface {
	jsonstream.Consumer
	jsonstream.Streamer

	Server
	Done() <-chan bool
	Restart(ready *sync.WaitGroup) error
	Serve(ready *sync.WaitGroup) error
	ServeClient(conn net.Conn)
	Shutdown() error
}

A Bridge extends the Server with methods needed to start, restart, terminate, and save/restore the Server state.

This functionality is broken into a separate interface to make it easier to mock the Server part when testing a Client.
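The benefit of the split can be illustrated with a small test double. The sketch below uses heavily trimmed stand-ins for Client and Server (one method each) so that it compiles on its own. recordingServer, fakeClient, and register are hypothetical names, and the real interfaces carry more methods than shown here.

```go
package main

import "fmt"

// Trimmed stand-ins for the interfaces documented on this page.
// The real Client and Server interfaces have more methods.
type Client interface {
	State() byte
}

type Server interface {
	ManageClient(c Client)
}

// recordingServer is a hypothetical test double that only records
// which clients it was asked to manage.
type recordingServer struct {
	managed []Client
}

func (r *recordingServer) ManageClient(c Client) {
	r.managed = append(r.managed, c)
}

// fakeClient is a hypothetical client with a fixed state.
type fakeClient struct {
	state byte
}

func (f fakeClient) State() byte { return f.state }

// register stands in for code under test that depends only on the
// narrow Server interface, never on the full Bridge.
func register(s Server, c Client) {
	s.ManageClient(c)
}

func main() {
	srv := &recordingServer{}
	register(srv, fakeClient{state: 0})
	fmt.Println("managed clients:", len(srv.managed))
}
```

Because the code under test accepts a Server rather than a Bridge, the double does not need to implement Serve, Restart, Shutdown, or the persistence methods.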

func New

func New(opts *Options, logger logger.Logger) (Bridge, error)

New creates a new Bridge configured using the given options and logger.

type Client

type Client interface {
	// Serve starts the read and write loops and then waits for them to finish which
	// normally happens after the receipt of a disconnect packet
	Serve()

	// State returns the current client state
	State() byte

	// PublishResponse publishes a packet to the client in response to a subscription
	PublishResponse(qos byte, pp *pkg.Publish)

	// SetDisconnected will end the read and write loop and eventually cause Serve() to end.
	SetDisconnected(error)
}

A Client represents a connection from a client.

func NewClient

func NewClient(s Server, log logger.Logger, conn net.Conn) Client

NewClient returns a new Client instance with StateInfant state.

type Options

type Options struct {
	// Path to file where the bridge is persisted. Can be empty if no persistence is desired
	StoragePath string

	// NATSUrls is a comma separated list of URLs used when connecting to NATS
	NATSUrls string

	// RetainedRequestTopic is a NATS topic that a NATS client can publish to after subscribing
	// in order to retrieve any messages that are retained for that subscription. The payload must be
	// the verbatim NATS subscription. Retained messages that match the subscription will be published
	// to the reply-to topic in the form of a JSON list of objects with a "subject" string and a "payload"
	// base64 encoded string.
	RetainedRequestTopic string

	// Port is the MQTT port
	Port int

	// RepeatRate is the delay in milliseconds between publishing packets that originated in this server
	// that have QoS > 0 but haven't been acknowledged.
	RepeatRate int

	// NATSOpts are options specific to the NATS connection
	NATSOpts []nats.Option

	TLSTimeout float64
	TLSCert    string
	TLSKey     string
	TLSCaCert  string
	TLSConfig  *tls.Config
	TLS        bool
	TLSVerify  bool
	TLSMap     bool

	// Debug enables debug level log output
	Debug bool
}

Options contains all configuration options for the mqtt-nats bridge.
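The reply format described for RetainedRequestTopic (a JSON list of objects with a "subject" string and a base64 encoded "payload") can be decoded with the standard library alone. The following is a sketch under assumptions: retainedMessage and decodeRetained are hypothetical names, the sample reply is made up for illustration, and the payload is assumed to use standard padded base64, which is what encoding/json expects for a []byte field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// retainedMessage mirrors one element of the JSON list described for
// RetainedRequestTopic. The type name is hypothetical.
type retainedMessage struct {
	Subject string `json:"subject"`
	// encoding/json decodes a base64 JSON string into a []byte field
	// (assumption: the bridge uses standard padded base64).
	Payload []byte `json:"payload"`
}

// decodeRetained parses the body of a reply to a retained request.
func decodeRetained(data []byte) ([]retainedMessage, error) {
	var msgs []retainedMessage
	if err := json.Unmarshal(data, &msgs); err != nil {
		return nil, err
	}
	return msgs, nil
}

func main() {
	// Made-up sample reply; "MjEuNQ==" is the base64 form of "21.5".
	reply := []byte(`[{"subject":"sensors.temp","payload":"MjEuNQ=="}]`)
	msgs, err := decodeRetained(reply)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, m := range msgs {
		fmt.Printf("%s: %s\n", m.Subject, m.Payload)
	}
}
```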

type ReplyTopic

type ReplyTopic struct {
	// contains filtered or unexported fields
}

ReplyTopic represents the decoded form of the NATS reply-topic that the bridge uses to track messages that are in need of an ACK.

func NewReplyTopic

func NewReplyTopic(s Session, pp *pkg.Publish) *ReplyTopic

NewReplyTopic creates a new ReplyTopic based on a pkg.Publish packet.

func ParseReplyTopic

func ParseReplyTopic(s string) *ReplyTopic

ParseReplyTopic creates a new ReplyTopic by parsing a NATS reply-to string

func (*ReplyTopic) ClientID

func (r *ReplyTopic) ClientID() string

ClientID returns the ID of the client where the message originated

func (*ReplyTopic) Flags

func (r *ReplyTopic) Flags() byte

Flags returns the packet flags of the original packet

func (*ReplyTopic) PacketID

func (r *ReplyTopic) PacketID() uint16

PacketID returns the packet identifier of the original packet

func (*ReplyTopic) SessionID

func (r *ReplyTopic) SessionID() string

SessionID returns the ID of the client session within the mqtt-nats bridge

func (*ReplyTopic) String

func (r *ReplyTopic) String() string

String returns the NATS string form of the reply-topic

type Server

type Server interface {
	pkg.IDManager
	SessionManager() SessionManager
	ManageClient(c Client)
	NatsConn(creds *pkg.Credentials) (*nats.Conn, error)
	HandleRetain(pp *pkg.Publish) *pkg.Publish
	PublishMatching(sp *pkg.Subscribe, c Client)
	PublishWill(will *pkg.Will, creds *pkg.Credentials) error
}

A Server implements the methods needed to support a Client connection.

type Session

type Session interface {
	jsonstream.Consumer
	jsonstream.Streamer

	// ID returns an identifier that is unique for this session
	ID() string

	// ClientID returns the id of the client that this session belongs to
	ClientID() string

	// Destroy the session
	Destroy()

	// AckRequested remembers the given subscription which represents an awaited ACK
	// for the given packetID
	AckRequested(uint16, *nats.Subscription)

	// AwaitsAck returns true if a subscription associated with the given packet identifier
	// is currently waiting for an Ack.
	AwaitsAck(uint16) bool

	// AckReceived will delete the pending ack subscriptions from the session and return them. It is up
	// to the caller to cancel the returned subscriptions.
	AckReceived(uint16) []*nats.Subscription

	// ClientAckRequested remembers the id of a packet which has been sent to the client. The packet stems from a NATS
	// subscription with QoS level > 0 and it is now expected that the client sends a PubACK back, which can then be
	// propagated to the reply-to address.
	ClientAckRequested(*pkg.Publish)

	// ClientAckReceived will close a pending response ack subscription and forward the ACK to the
	// replyTo subject. It returns whether or not such an ack was pending
	ClientAckReceived(uint16, *nats.Conn) bool

	// ResendClientUnack resends all messages that the client hasn't acknowledged
	ResendClientUnack(c *client)

	// RestoreAckSubscriptions is called when a client restores an old session. The method restores subscriptions that
	// were persisted and then loaded again.
	RestoreAckSubscriptions(c *client)
}

A Session contains data associated with a client ID. The session might survive client connections.
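The contract of AckRequested, AwaitsAck, and AckReceived can be sketched with a map keyed by packet identifier. This is an illustration of the documented behavior, not the bridge's own implementation: ackTracker is a hypothetical type, and subscription is a local stand-in for *nats.Subscription so that the sketch has no external dependencies.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a local stand-in for *nats.Subscription.
type subscription struct {
	subject string
}

// ackTracker is a hypothetical in-memory tracker of pending ACKs.
type ackTracker struct {
	mu      sync.Mutex
	pending map[uint16][]*subscription
}

func newAckTracker() *ackTracker {
	return &ackTracker{pending: make(map[uint16][]*subscription)}
}

// AckRequested remembers a subscription that awaits an ACK for packetID.
func (t *ackTracker) AckRequested(packetID uint16, sub *subscription) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending[packetID] = append(t.pending[packetID], sub)
}

// AwaitsAck reports whether any subscription is waiting on packetID.
func (t *ackTracker) AwaitsAck(packetID uint16) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.pending[packetID]) > 0
}

// AckReceived removes and returns the pending subscriptions for packetID.
// The caller is responsible for cancelling them.
func (t *ackTracker) AckReceived(packetID uint16) []*subscription {
	t.mu.Lock()
	defer t.mu.Unlock()
	subs := t.pending[packetID]
	delete(t.pending, packetID)
	return subs
}

func main() {
	tracker := newAckTracker()
	tracker.AckRequested(7, &subscription{subject: "reply.7"})
	fmt.Println("awaits ack for 7:", tracker.AwaitsAck(7))
	subs := tracker.AckReceived(7)
	fmt.Println("returned subscriptions:", len(subs))
	fmt.Println("awaits ack for 7:", tracker.AwaitsAck(7))
}
```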

type SessionManager

type SessionManager interface {
	// Create creates a new session for the given clientID. Any previous session registered for
	// the given id is discarded
	Create(clientID string) Session

	// Get returns an existing session for the given clientID or nil if no such session exists
	Get(clientID string) Session

	// Remove removes any session for the given clientID
	Remove(clientID string)
}

A SessionManager manages sessions.
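The documented semantics (Create discards any previous session, Get returns nil when no session exists, Remove deletes) can be sketched as an in-memory map. The code below is an assumption-laden illustration, not the bridge's implementation: the Session interface is trimmed to two methods, and memorySession, memorySessionManager, and the ID scheme are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Session is a trimmed stand-in for the interface documented above.
type Session interface {
	ID() string
	ClientID() string
}

// memorySession is a hypothetical Session implementation.
type memorySession struct {
	id       string
	clientID string
}

func (s *memorySession) ID() string       { return s.id }
func (s *memorySession) ClientID() string { return s.clientID }

// memorySessionManager is a hypothetical in-memory session manager.
type memorySessionManager struct {
	mu       sync.Mutex
	seq      int
	sessions map[string]*memorySession
}

func newMemorySessionManager() *memorySessionManager {
	return &memorySessionManager{sessions: make(map[string]*memorySession)}
}

// Create registers a new session, replacing any previous one for clientID.
func (m *memorySessionManager) Create(clientID string) Session {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seq++
	s := &memorySession{id: fmt.Sprintf("s%d", m.seq), clientID: clientID}
	m.sessions[clientID] = s
	return s
}

// Get returns the session for clientID, or nil if none exists.
func (m *memorySessionManager) Get(clientID string) Session {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.sessions[clientID]
	if !ok {
		// Return an untyped nil so that callers can compare against nil.
		return nil
	}
	return s
}

// Remove deletes any session registered for clientID.
func (m *memorySessionManager) Remove(clientID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.sessions, clientID)
}

func main() {
	m := newMemorySessionManager()
	first := m.Create("client-a")
	second := m.Create("client-a") // discards the first session
	fmt.Println("replaced:", first.ID() != second.ID())
	m.Remove("client-a")
	fmt.Println("exists after remove:", m.Get("client-a") != nil)
}
```

Get returns a literal nil on a miss rather than a nil *memorySession, because a nil pointer wrapped in an interface value would not compare equal to nil.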
