bridge

Version: v0.1.1
Published: Nov 10, 2024 License: MIT Imports: 25 Imported by: 0

README

MQTT-Bridge

MQTT-Bridge is a library that allows you to bridge protocols over MQTT, with a focus on gRPC. It provides two implementations:

  • Network Bridge: A low-level implementation that allows existing gRPC clients and servers to communicate over MQTT without modification. The network bridge implements the net.Listener and net.Conn interfaces.
  • gRPC Bridge: A higher-level implementation that works directly with MQTT messages while maintaining gRPC-style APIs. The gRPC bridge implements the grpc.ServiceRegistrar and grpc.ServiceInfoProvider interfaces.

In theory, the network bridge should work with anything that uses the net.Listener and net.Conn interfaces, such as HTTP servers and other networking libraries.
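To illustrate why any net.Listener can carry HTTP, here is a minimal sketch that serves net/http over a custom listener. The pipeListener type is a hypothetical in-memory stand-in built on net.Pipe; it is not part of this package, and an MQTT-backed listener would take its place in real use.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"sync"
)

// pipeListener is a hypothetical in-memory net.Listener, used here only as a
// stand-in for an MQTT-backed listener.
type pipeListener struct {
	conns chan net.Conn
	done  chan struct{}
	once  sync.Once
}

type pipeAddr struct{}

func (pipeAddr) Network() string { return "pipe" }
func (pipeAddr) String() string  { return "in-memory" }

func newPipeListener() *pipeListener {
	return &pipeListener{conns: make(chan net.Conn), done: make(chan struct{})}
}

// Accept blocks until dial hands over a connection or the listener is closed.
func (l *pipeListener) Accept() (net.Conn, error) {
	select {
	case c := <-l.conns:
		return c, nil
	case <-l.done:
		return nil, net.ErrClosed
	}
}

func (l *pipeListener) Close() error {
	l.once.Do(func() { close(l.done) })
	return nil
}

func (l *pipeListener) Addr() net.Addr { return pipeAddr{} }

// dial gives one end of a net.Pipe to Accept and returns the other end.
func (l *pipeListener) dial(ctx context.Context) (net.Conn, error) {
	client, server := net.Pipe()
	select {
	case l.conns <- server:
		return client, nil
	case <-l.done:
		return nil, net.ErrClosed
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// fetchOverListener serves HTTP on the custom listener and issues one request.
func fetchOverListener() (string, error) {
	l := newPipeListener()
	defer l.Close()

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "hello over a custom listener")
	})
	go http.Serve(l, handler) // returns once the listener is closed

	transport := &http.Transport{
		// The host in the URL is ignored; every connection goes through dial.
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			return l.dial(ctx)
		},
	}
	defer transport.CloseIdleConnections()

	resp, err := (&http.Client{Transport: transport}).Get("http://bridge.invalid/")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := fetchOverListener()
	fmt.Println(body, err)
}
```

The HTTP server never learns what transport sits underneath; it only calls Accept, Read, and Write, which is the property the network bridge relies on.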

Example Overview

This example demonstrates how to use mqtt-bridge to enable gRPC-style communication over MQTT. The project includes two different implementations showing how to bridge gRPC and MQTT communications.

Overview

The example implements a simple Echo service with three types of RPCs:

  • Unary calls (simple request-response)
  • Server streaming (server sends multiple responses)
  • Bidirectional streaming (both client and server can send multiple messages)

Prerequisites

  • Go 1.19 or later
  • An MQTT broker (e.g., Mosquitto) running on localhost:1883
  • Protocol buffer compiler (protoc)

Service Definition

The Echo service is defined in the proto file (lines 6-15).

Implementation Options

1. Network Bridge Implementation

The network bridge provides a low-level network implementation that allows existing gRPC clients and servers to communicate over MQTT without modification.

Server Setup: source lines 34-45

Client Setup: source lines 37-55

To run:

# Start the server
go run example/net_bridge/server/main.go

# In another terminal, start the client
go run example/net_bridge/client/main.go

2. gRPC Bridge Implementation

The gRPC bridge provides a higher-level abstraction that works directly with MQTT messages while maintaining gRPC-style APIs.

Server Setup: source lines 33-39

Client Setup: source lines 30-51

To run:

# Start the server
go run example/grpc_bridge/server/main.go

# In another terminal, start the client
go run example/grpc_bridge/client/main.go

Service Implementation

The Echo service implements three types of RPCs:

  1. Unary Call - Simple request-response (source lines 23-33)
  2. Server Streaming - Server sends multiple responses (source lines 36-55)
  3. Bidirectional Streaming - Both sides can send messages (source lines 58-82)

Key Features

  • Seamless conversion between gRPC and MQTT communication
  • Support for all gRPC communication patterns:
    • Unary calls
    • Server streaming
    • Client streaming
    • Bidirectional streaming
  • Automatic message framing and protocol handling
  • Integration with existing gRPC tooling
  • Choice between network-level and message-level implementations

Notes

  • The network bridge implementation is ideal when you want to use existing gRPC code over MQTT
  • The gRPC bridge implementation is better when you want to work directly with MQTT messages while maintaining gRPC-style APIs
  • Both implementations are designed to support the full range of gRPC features, but the gRPC bridge implementation has so far been tested only with unary RPCs; streaming RPCs over the gRPC bridge are untested
  • Ensure your MQTT broker is properly configured and accessible before running the examples

License

MIT License

Copyright 2024 Golain Systems Private Limited.

Documentation

Index

Constants

const (
	MinHeaderSize          = 16        // Fixed header size without StreamID
	MaxStreamIDSize        = 256       // Maximum length for StreamID
	MinSequenceNum         = 1         // Minimum sequence number
	MaxSequenceNum  uint64 = 1<<64 - 1 // Maximum sequence number (uint64 max)
)

Variables

var (
	MaxFragmentSize = 10 * 1024 // 10KB per fragment
	HeaderSize      = 16        // Fixed header size

)

Functions

func BuildTopicPath

func BuildTopicPath(pkg, service, method, sessionID, direction string) string

BuildTopicPath creates an MQTT topic path for a given service and method

func NewLargeMessage

func NewLargeMessage(size int) *bytes.Buffer

NewLargeMessage is a helper for creating large message buffers

func ParseTopicPath

func ParseTopicPath(topic string) (pkg, service, method, sessionID, direction string, err error)

ParseTopicPath extracts components from an MQTT topic path
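The documentation does not state the topic layout, so the following is only a sketch of how a build/parse pair over the same five components could round-trip. The "/"-joined layout, the topicParts type, and the buildTopic and parseTopic helpers are all assumptions made for illustration, not the package's actual format.

```go
package main

import (
	"fmt"
	"strings"
)

// topicParts groups the five components named in the BuildTopicPath signature.
type topicParts struct {
	Pkg, Service, Method, SessionID, Direction string
}

// buildTopic joins the components with "/". This layout is hypothetical.
func buildTopic(p topicParts) string {
	return strings.Join([]string{p.Pkg, p.Service, p.Method, p.SessionID, p.Direction}, "/")
}

// parseTopic reverses buildTopic and rejects malformed topics.
func parseTopic(topic string) (topicParts, error) {
	fields := strings.Split(topic, "/")
	if len(fields) != 5 {
		return topicParts{}, fmt.Errorf("topic %q has %d levels, want 5", topic, len(fields))
	}
	for _, f := range fields {
		// Empty levels and MQTT wildcards cannot name a concrete endpoint.
		if f == "" || strings.ContainsAny(f, "+#") {
			return topicParts{}, fmt.Errorf("topic %q has an empty or wildcard level", topic)
		}
	}
	return topicParts{
		Pkg:       fields[0],
		Service:   fields[1],
		Method:    fields[2],
		SessionID: fields[3],
		Direction: fields[4],
	}, nil
}

func main() {
	topic := buildTopic(topicParts{
		Pkg: "echo", Service: "EchoService", Method: "Echo",
		SessionID: "session-1", Direction: "request",
	})
	parts, err := parseTopic(topic)
	fmt.Println(topic, parts, err)
}
```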

func ReleaseLargeMessage

func ReleaseLargeMessage(buf *bytes.Buffer)

ReleaseLargeMessage is a helper that returns a large message buffer to the pool
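The pairing of a "new" helper with a "release" helper suggests buffer pooling. The sketch below shows that general pattern with sync.Pool from the standard library; getBuffer and putBuffer are hypothetical names, and the package's own pool may be organised differently.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool recycles buffers so large messages do not allocate on every call.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// getBuffer returns an empty buffer with room for at least size bytes.
func getBuffer(size int) *bytes.Buffer {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	buf.Grow(size)
	return buf
}

// putBuffer clears the buffer and hands it back to the pool.
// The caller must not touch buf after this call.
func putBuffer(buf *bytes.Buffer) {
	if buf == nil {
		return
	}
	buf.Reset()
	bufPool.Put(buf)
}

func main() {
	buf := getBuffer(64 * 1024)
	buf.WriteString("payload")
	fmt.Println(buf.Len(), buf.Cap() >= 64*1024)
	putBuffer(buf)
}
```

A buffer obtained this way should always be released exactly once; using it after release risks sharing memory with another caller.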

func SetMaxFragmentSize

func SetMaxFragmentSize(size int)

func WithConnectionTimeout

func WithConnectionTimeout(timeout time.Duration) option

func WithLogger

func WithLogger(logger *zap.Logger) option

func WithMQTTClient

func WithMQTTClient(client mqtt.Client) option
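The With... functions above follow Go's functional options pattern. Because the option type is unexported and the constructor that accepts it is not shown here, the sketch below uses hypothetical settings and setting types to show how such options are typically applied over defaults.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical configuration struct.
type settings struct {
	timeout time.Duration
	name    string
}

// setting mutates settings; it plays the role of the package's option type.
type setting func(*settings)

func withTimeout(d time.Duration) setting {
	return func(s *settings) { s.timeout = d }
}

func withName(n string) setting {
	return func(s *settings) { s.name = n }
}

// newSettings starts from defaults and applies each option in order.
func newSettings(opts ...setting) settings {
	s := settings{timeout: 30 * time.Second, name: "default"}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

// demo returns the default timeout and an overridden timeout as strings.
func demo() (string, string) {
	def := newSettings()
	custom := newSettings(withTimeout(5*time.Second), withName("edge"))
	return def.timeout.String(), custom.timeout.String()
}

func main() {
	def, custom := demo()
	fmt.Println(def, custom)
}
```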

Types

type BridgeError

type BridgeError struct {
	Op  string
	Err error
}

func (*BridgeError) Error

func (e *BridgeError) Error() string

func (*BridgeError) Unwrap

func (e *BridgeError) Unwrap() error
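Since BridgeError exposes Unwrap, callers can presumably inspect it with errors.Is and errors.As. The sketch below uses a local look-alike type, opError, with the same two fields; the message format and the errTimeout sentinel are assumptions, not the package's behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// opError mirrors the shape of BridgeError: an operation name plus a cause.
type opError struct {
	Op  string
	Err error
}

// Error formats as "op: cause". The real format is not documented here.
func (e *opError) Error() string { return e.Op + ": " + e.Err.Error() }

// Unwrap exposes the cause so errors.Is and errors.As can see through it.
func (e *opError) Unwrap() error { return e.Err }

// errTimeout is a hypothetical sentinel error.
var errTimeout = errors.New("timeout")

// wrappedPublish simulates a failure wrapped twice on its way up the stack.
func wrappedPublish() error {
	return fmt.Errorf("send failed: %w", &opError{Op: "publish", Err: errTimeout})
}

// classify extracts the failing operation and checks for the sentinel.
func classify(err error) (op string, isTimeout bool) {
	var oe *opError
	if errors.As(err, &oe) {
		op = oe.Op
	}
	return op, errors.Is(err, errTimeout)
}

func main() {
	op, isTimeout := classify(wrappedPublish())
	fmt.Println(op, isTimeout)
}
```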

type Frame

type Frame struct {
	Header *Header
	Data   []byte
}

Frame represents a complete protocol frame including header and payload

func FrameMessage

func FrameMessage(data []byte, seqNum uint64, msgType MessageType) []Frame

FrameMessage splits a large message into frames
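As a rough illustration of splitting a payload into bounded fragments, the sketch below chunks a byte slice and tags each chunk with a sequence number, a total, and a last-fragment flag, echoing the fragmentation fields of Header. The fragment type, zero-based numbering, and the handling of empty payloads are assumptions rather than the package's actual algorithm.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// fragment is a hypothetical carrier for one chunk of a larger message.
type fragment struct {
	Seq    uint16 // zero-based position (an assumption)
	Total  uint16
	IsLast bool
	Data   []byte
}

// splitIntoFragments cuts data into chunks of at most maxSize bytes.
func splitIntoFragments(data []byte, maxSize int) ([]fragment, error) {
	if maxSize <= 0 {
		return nil, errors.New("maxSize must be positive")
	}
	total := (len(data) + maxSize - 1) / maxSize
	if total == 0 {
		total = 1 // an empty message still travels as one fragment (assumption)
	}
	if total > math.MaxUint16 {
		return nil, errors.New("message needs more fragments than uint16 can count")
	}
	frags := make([]fragment, 0, total)
	for i := 0; i < total; i++ {
		start := i * maxSize
		end := start + maxSize
		if end > len(data) {
			end = len(data)
		}
		frags = append(frags, fragment{
			Seq:    uint16(i),
			Total:  uint16(total),
			IsLast: i == total-1,
			Data:   data[start:end],
		})
	}
	return frags, nil
}

// reassemble concatenates fragments that are already in sequence order.
func reassemble(frags []fragment) []byte {
	var out []byte
	for _, f := range frags {
		out = append(out, f.Data...)
	}
	return out
}

func main() {
	frags, err := splitIntoFragments(make([]byte, 25), 10)
	fmt.Println(len(frags), err)
}
```

With the package's default MaxFragmentSize of 10 * 1024 bytes, a 25 KB message would need three fragments under this scheme.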

func NewFrame

func NewFrame(msgType MessageType, seqNum uint64, data []byte, opts ...FrameOption) (*Frame, error)

NewFrame creates a new Frame with validated parameters

func UnmarshalFrame

func UnmarshalFrame(data []byte) (Frame, error)

UnmarshalFrame converts a byte slice into a Frame

func (*Frame) Marshal

func (f *Frame) Marshal() []byte

Marshal converts a Frame into a single byte slice ready for transmission

type FrameOption

type FrameOption func(*Frame) error

FrameOption defines options for frame creation

func WithFragmentation

func WithFragmentation(fragmentID uint16, total uint16, seq uint16, isLast bool) FrameOption

WithFragmentation sets fragmentation parameters for a frame

func WithStreamID

func WithStreamID(streamID string) FrameOption

WithStreamID sets the StreamID for a frame

type Header

type Header struct {
	Type           MessageType // 1 byte
	SequenceNumber uint64      // 8 bytes
	FragmentID     uint16      // 2 bytes
	FragmentTotal  uint16      // 2 bytes
	FragmentSeq    uint16      // 2 bytes
	IsLastFragment bool        // 1 byte
	StreamID       string      // Variable length - we'll need to adjust marshaling/unmarshaling
}
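The byte sizes in the struct comments add up to 16 (1 + 8 + 2 + 2 + 2 + 1), which matches MinHeaderSize. The sketch below packs and unpacks just those fixed fields with encoding/binary. The field order, the big-endian byte order, and the fixedHeader type are assumptions; the actual wire format, including how StreamID is encoded, is not documented here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// fixedHeader mirrors the fixed-size fields of Header; StreamID is left out.
type fixedHeader struct {
	Type           uint8
	SequenceNumber uint64
	FragmentID     uint16
	FragmentTotal  uint16
	FragmentSeq    uint16
	IsLastFragment bool
}

// fixedHeaderSize is 16 bytes, the same value as MinHeaderSize.
const fixedHeaderSize = 1 + 8 + 2 + 2 + 2 + 1

// marshalFixed packs the fields in declaration order (assumed, big-endian).
func marshalFixed(h fixedHeader) []byte {
	buf := make([]byte, fixedHeaderSize)
	buf[0] = h.Type
	binary.BigEndian.PutUint64(buf[1:9], h.SequenceNumber)
	binary.BigEndian.PutUint16(buf[9:11], h.FragmentID)
	binary.BigEndian.PutUint16(buf[11:13], h.FragmentTotal)
	binary.BigEndian.PutUint16(buf[13:15], h.FragmentSeq)
	if h.IsLastFragment {
		buf[15] = 1
	}
	return buf
}

// unmarshalFixed reverses marshalFixed and rejects short input.
func unmarshalFixed(buf []byte) (fixedHeader, error) {
	if len(buf) < fixedHeaderSize {
		return fixedHeader{}, errors.New("buffer shorter than fixed header")
	}
	return fixedHeader{
		Type:           buf[0],
		SequenceNumber: binary.BigEndian.Uint64(buf[1:9]),
		FragmentID:     binary.BigEndian.Uint16(buf[9:11]),
		FragmentTotal:  binary.BigEndian.Uint16(buf[11:13]),
		FragmentSeq:    binary.BigEndian.Uint16(buf[13:15]),
		IsLastFragment: buf[15] != 0,
	}, nil
}

func main() {
	h := fixedHeader{Type: 1, SequenceNumber: 42, FragmentTotal: 3, FragmentSeq: 2, IsLastFragment: true}
	raw := marshalFixed(h)
	back, err := unmarshalFixed(raw)
	fmt.Println(len(raw), back == h, err)
}
```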

type MQTTAddr

type MQTTAddr struct {
	// contains filtered or unexported fields
}

MQTTAddr implements net.Addr for MQTT connections

func (*MQTTAddr) Network

func (a *MQTTAddr) Network() string

func (*MQTTAddr) String

func (a *MQTTAddr) String() string

type MQTTBridge

type MQTTBridge struct {
	// contains filtered or unexported fields
}

func NewMQTTBridge

func NewMQTTBridge(mqttClient mqtt.Client, logger *zap.Logger, timeout time.Duration) *MQTTBridge

func (*MQTTBridge) GetServiceInfo

func (b *MQTTBridge) GetServiceInfo() map[string]grpc.ServiceInfo

GetServiceInfo implements grpc.ServiceInfoProvider

func (*MQTTBridge) RegisterService

func (b *MQTTBridge) RegisterService(desc *grpc.ServiceDesc, impl interface{})

RegisterService implements grpc.ServiceRegistrar

type MQTTNetBridge

type MQTTNetBridge struct {
	// contains filtered or unexported fields
}

MQTTNetBridge implements net.Listener over MQTT

func NewMQTTNetBridge

func NewMQTTNetBridge(mqttClient mqtt.Client, logger *zap.Logger, bridgeID string) *MQTTNetBridge

NewMQTTNetBridge creates a new bridge that listens on a specific bridgeID

func (*MQTTNetBridge) Accept

func (b *MQTTNetBridge) Accept() (net.Conn, error)

Accept implements net.Listener.Accept

func (*MQTTNetBridge) Addr

func (b *MQTTNetBridge) Addr() net.Addr

Addr implements net.Listener.Addr

func (*MQTTNetBridge) Build added in v0.1.1

func (*MQTTNetBridge) Close

func (b *MQTTNetBridge) Close() error

Close implements net.Listener.Close

func (*MQTTNetBridge) Dial

func (b *MQTTNetBridge) Dial(ctx context.Context, targetBridgeID string) (net.Conn, error)

Dial creates a new connection to a specific bridge

func (*MQTTNetBridge) Scheme added in v0.1.1

func (b *MQTTNetBridge) Scheme() string

type MQTTNetBridgeConn

type MQTTNetBridgeConn struct {
	// contains filtered or unexported fields
}

MQTTNetBridgeConn implements net.Conn over MQTT

func (*MQTTNetBridgeConn) Close

func (c *MQTTNetBridgeConn) Close() error

func (*MQTTNetBridgeConn) LocalAddr

func (c *MQTTNetBridgeConn) LocalAddr() net.Addr

func (*MQTTNetBridgeConn) Read

func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)

Read is part of the net.Conn implementation; the source comment describes these methods as stubs to be fleshed out

func (*MQTTNetBridgeConn) RemoteAddr

func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr

func (*MQTTNetBridgeConn) SetDeadline

func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error

func (*MQTTNetBridgeConn) SetReadDeadline

func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error

func (*MQTTNetBridgeConn) SetWriteDeadline

func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error

func (*MQTTNetBridgeConn) Write

func (c *MQTTNetBridgeConn) Write(b []byte) (n int, err error)

type MessageType

type MessageType uint8

const (
	MessageTypeData MessageType = iota
	MessageTypeStreamInit
	MessageTypeStreamEnd
	MessageTypeHeader
	MessageTypeError
)
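Because the constants are declared with iota, their numeric values run from 0 (MessageTypeData) to 4 (MessageTypeError). When logging frames it can help to print names instead of numbers; the sketch below adds a String method to a local mirror of the type. The package itself does not document such a method, so messageType and its labels are illustrative only.

```go
package main

import "fmt"

// messageType is a local mirror of MessageType with the same iota ordering.
type messageType uint8

const (
	messageTypeData messageType = iota
	messageTypeStreamInit
	messageTypeStreamEnd
	messageTypeHeader
	messageTypeError
)

// String makes the type satisfy fmt.Stringer for readable log output.
func (t messageType) String() string {
	switch t {
	case messageTypeData:
		return "Data"
	case messageTypeStreamInit:
		return "StreamInit"
	case messageTypeStreamEnd:
		return "StreamEnd"
	case messageTypeHeader:
		return "Header"
	case messageTypeError:
		return "Error"
	default:
		return fmt.Sprintf("Unknown(%d)", uint8(t))
	}
}

func main() {
	fmt.Println(messageTypeStreamInit, messageType(9))
}
```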

type Session

type Session struct {
	ID          string
	ServiceName string
	LastActive  time.Time
	State       SessionState
	// contains filtered or unexported fields
}

Session represents an active MQTT-gRPC bridge session

func NewSession

func NewSession(id, serviceName string, eventHandler SessionEventHandler) *Session

NewSession creates a new session with proper initialization

func (*Session) AddStream

func (s *Session) AddStream(methodName string) (*StreamContext, error)

AddStream adds a new stream to the session

func (*Session) Close

func (s *Session) Close()

Close closes all streams and the session itself

func (*Session) CloseStream

func (s *Session) CloseStream(streamID string) error

CloseStream gracefully closes a stream

func (*Session) UpdateActivity

func (s *Session) UpdateActivity(streamID string)

UpdateActivity updates the last active timestamp for the session and stream

type SessionEvent

type SessionEvent int

SessionEvent represents different session lifecycle events

const (
	SessionEventCreated SessionEvent = iota
	SessionEventActivated
	SessionEventStreamStarted
	SessionEventStreamEnded
	SessionEventClosing
	SessionEventClosed
	SessionEventTimeout
)

type SessionEventHandler

type SessionEventHandler func(session *Session, event SessionEvent)

SessionEventHandler is called when session state changes occur
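A handler of this shape can be used to observe session lifecycles, for example to count events for metrics. The sketch below uses local mirror types (session, sessionEvent, sessionEventHandler) and a hypothetical eventCounter; the order and timing in which the package actually fires events are not documented here, so the simulated sequence is made up.

```go
package main

import (
	"fmt"
	"sync"
)

// sessionEvent mirrors SessionEvent with the same iota ordering.
type sessionEvent int

const (
	sessionEventCreated sessionEvent = iota
	sessionEventActivated
	sessionEventStreamStarted
	sessionEventStreamEnded
	sessionEventClosing
	sessionEventClosed
	sessionEventTimeout
)

// session is a minimal stand-in for Session.
type session struct{ ID string }

// sessionEventHandler has the same shape as SessionEventHandler.
type sessionEventHandler func(s *session, e sessionEvent)

// eventCounter tallies events; the mutex guards against concurrent callbacks.
type eventCounter struct {
	mu     sync.Mutex
	counts map[sessionEvent]int
}

func newEventCounter() *eventCounter {
	return &eventCounter{counts: make(map[sessionEvent]int)}
}

func (c *eventCounter) handle(s *session, e sessionEvent) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[e]++
}

func (c *eventCounter) count(e sessionEvent) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[e]
}

// simulate feeds a made-up event sequence through the handler.
func simulate() *eventCounter {
	c := newEventCounter()
	var h sessionEventHandler = c.handle
	s := &session{ID: "s1"}
	for _, e := range []sessionEvent{
		sessionEventCreated, sessionEventStreamStarted, sessionEventStreamEnded,
		sessionEventStreamStarted, sessionEventClosed,
	} {
		h(s, e)
	}
	return c
}

func main() {
	c := simulate()
	fmt.Println(c.count(sessionEventStreamStarted), c.count(sessionEventTimeout))
}
```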

type SessionState

type SessionState int

const (
	SessionStateActive SessionState = iota
	SessionStateClosing
	SessionStateClosed
)

type StreamContext

type StreamContext struct {
	StreamID   string // Usually method name + unique identifier
	Method     string
	Created    time.Time
	LastActive time.Time
	State      StreamState
	Cancel     context.CancelFunc // For cancelling individual streams
	Context    context.Context
	Extra      interface{} // For storing the serverStream
}

StreamContext holds the context for an active stream

type StreamState

type StreamState int

const (
	StreamStateActive StreamState = iota
	StreamStateClosing
	StreamStateClosed
)
