Documentation ¶
Overview ¶
Package bridge contains the MQTT-NATS bridge server implementation
Index ¶
Constants ¶
const ( // StateInfant is set when the client is created and awaits a Connect packet StateInfant = byte(iota) // StateConnected is set once a successful connection has been established StateConnected // StateDisconnected is set when a disconnect packet arrives or when a non recoverable error occurs. StateDisconnected )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bridge ¶
type Bridge interface { jsonstream.Consumer jsonstream.Streamer Server Done() <-chan bool Restart(ready *sync.WaitGroup) error Serve(ready *sync.WaitGroup) error ServeClient(conn net.Conn) Shutdown() error }
A Bridge extends the Server with methods needed to start, restard, terminate, and save/restore the Server state.
This functionality is broken into a separate interface to make it easier to mock the Server part when testing a Client
type Client ¶
type Client interface { // Serve starts the read and write loops and then waits for them to finish which // normally happens after the receipt of a disconnect packet Serve() // State returns the current client state State() byte // PublishResponse publishes a packet to the client in response to a subscription PublishResponse(qos byte, pp *pkg.Publish) // SetDisconnected will end the read and write loop and eventually cause Serve() to end. SetDisconnected(error) }
A Client represents a connection from a client.
type Options ¶
type Options struct { // Path to file where the bridge is persisted. Can be empty if no persistence is desired StoragePath string // NATSUrls is a comma separated list of URLs used when connecting to NATS NATSUrls string // RetainedRequestTopic is a NATS topic that a NATS client can publish to after doing a subscribe // in order to retrieve any messages that are retained for that subscription. The payload must be // the verbatim NATS subscription. Retained messages that matches the subscription will be published // to the reply-to topic in the form of a JSON list of objects with a "subject" string and a "payload" // base64 encoded string RetainedRequestTopic string // Port is the MQTT port Port int // RepeatRate is the delay in milliseconds between publishing packets that originated in this server // that have QoS > 0 but hasn't been acknowledged. RepeatRate int // NATSOpts are options specific to the NATS connection NATSOpts []nats.Option TLSTimeout float64 TLSCert string TLSKey string TLSCaCert string TLSConfig *tls.Config TLS bool TLSVerify bool TLSMap bool // Debug enables debug level log output Debug bool }
Options contains all configuration options for the mqtt-nats bridge.
type ReplyTopic ¶
type ReplyTopic struct {
// contains filtered or unexported fields
}
ReplyTopic represents the decoded form of the NATS reply-topic that the bridge uses to track messages that are in need of an ACK.
func NewReplyTopic ¶
func NewReplyTopic(s Session, pp *pkg.Publish) *ReplyTopic
NewReplyTopic creates a new ReplyTopic based on a pkg.Publish packet.
func ParseReplyTopic ¶
func ParseReplyTopic(s string) *ReplyTopic
ParseReplyTopic creates a new ReplyTopic by parsing a NATS reply-to string
func (*ReplyTopic) ClientID ¶
func (r *ReplyTopic) ClientID() string
ClientID returns the ID of the client where the message originated
func (*ReplyTopic) Flags ¶
func (r *ReplyTopic) Flags() byte
Flags returns the packet flags of the original packet
func (*ReplyTopic) PacketID ¶
func (r *ReplyTopic) PacketID() uint16
PacketID returns the packet identifier of the original packet
func (*ReplyTopic) SessionID ¶
func (r *ReplyTopic) SessionID() string
SessionID returns the ID of the client session within the mqtt-nats bridge
func (*ReplyTopic) String ¶
func (r *ReplyTopic) String() string
String returns the NATS string form of the reply-topic
type Server ¶
type Server interface { pkg.IDManager SessionManager() SessionManager ManageClient(c Client) NatsConn(creds *pkg.Credentials) (*nats.Conn, error) HandleRetain(pp *pkg.Publish) *pkg.Publish PublishMatching(sp *pkg.Subscribe, c Client) PublishWill(will *pkg.Will, creds *pkg.Credentials) error }
A Server implements the methods needed to support a Client connection.
type Session ¶
type Session interface { jsonstream.Consumer jsonstream.Streamer // ID returns an identifier that is unique for this session ID() string // ClientID returns the id of the client that this session belongs to ClientID() string // Destroy the session Destroy() // AckRequested remembers the given subscription which represents an awaited ACK // for the given packetID AckRequested(uint16, *nats.Subscription) // AwaitsAck returns true if a subscription associated with the given packet identifier // is currently waiting for an Ack. AwaitsAck(uint16) bool // AckReceived will delete pending ack subscription from the session and return them. It is up // to the caller to cancel the returned subscriptions. AckReceived(uint16) []*nats.Subscription // ClientAckRequested remembers the id of a packet which has been sent to the client. The packet stems from a NATS // subscription with QoS level > 0 and it is now expected that the client sends an PubACK back to which can be // propagated to the reply-to address. ClientAckRequested(*pkg.Publish) // ClientAckReceived will close a pending response ack subscription and forward the ACK to the // replyTo subject. It returns whether or not such an ack was pending ClientAckReceived(uint16, *nats.Conn) bool // Resend all messages that the client hasn't acknowledged ResendClientUnack(c *client) // RestoreAckSubscriptions called when a client restores an old session. THe method restores subscriptions that // were peristed and then loaded again. RestoreAckSubscriptions(c *client) }
A Session contains data associated with a client ID. The session might survive client connections.
type SessionManager ¶
type SessionManager interface { // Create creates a new session for the given clientID. Any previous session registered for // the given id is discarded Create(clientID string) Session // Get returns an existing session for the given clientID or nil if no such session exists Get(clientID string) Session // Remove removes any session for the given clientID Remove(clientID string) }
A SessionManager manages sessions.