Documentation ¶
Index ¶
- Constants
- Variables
- func BuildTopicPath(pkg, service, method, sessionID, direction string) string
- func NewLargeMessage(size int) *bytes.Buffer
- func ParseTopicPath(topic string) (pkg, service, method, sessionID, direction string, err error)
- func ReleaseLargeMessage(buf *bytes.Buffer)
- func SetMaxFragmentSize(size int)
- func WithConnectionTimeout(timeout time.Duration) option
- func WithLogger(logger *zap.Logger) option
- func WithMQTTClient(client mqtt.Client) option
- type BridgeError
- type Frame
- type FrameOption
- type Header
- type MQTTAddr
- type MQTTBridge
- type MQTTNetBridge
- func (b *MQTTNetBridge) Accept() (net.Conn, error)
- func (b *MQTTNetBridge) Addr() net.Addr
- func (b *MQTTNetBridge) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error)
- func (b *MQTTNetBridge) Close() error
- func (b *MQTTNetBridge) Dial(ctx context.Context, targetBridgeID string) (net.Conn, error)
- func (b *MQTTNetBridge) Scheme() string
- type MQTTNetBridgeConn
- func (c *MQTTNetBridgeConn) Close() error
- func (c *MQTTNetBridgeConn) LocalAddr() net.Addr
- func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)
- func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr
- func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error
- func (c *MQTTNetBridgeConn) Write(b []byte) (n int, err error)
- type MessageType
- type Session
- type SessionEvent
- type SessionEventHandler
- type SessionState
- type StreamContext
- type StreamState
Constants ¶
const ( MinHeaderSize = 16 // Fixed header size without StreamID MaxStreamIDSize = 256 // Maximum length for StreamID MinSequenceNum = 1 // Minimum sequence number MaxSequenceNum uint64 = 1<<64 - 1 // Maximum sequence number (uint64 max) )
Variables ¶
var ( MaxFragmentSize = 10 * 1024 // 10KB per fragment HeaderSize = 16 // Fixed header size )
Functions ¶
func BuildTopicPath ¶
BuildTopicPath creates an MQTT topic path for a given service and method
func NewLargeMessage ¶
Add a new helper for creating large messages
func ParseTopicPath ¶
ParseTopicPath extracts components from an MQTT topic path
func ReleaseLargeMessage ¶
Add a helper to return large message buffers to the pool
func SetMaxFragmentSize ¶
func SetMaxFragmentSize(size int)
func WithConnectionTimeout ¶
func WithLogger ¶
func WithMQTTClient ¶
Types ¶
type BridgeError ¶
func (*BridgeError) Error ¶
func (e *BridgeError) Error() string
func (*BridgeError) Unwrap ¶
func (e *BridgeError) Unwrap() error
type Frame ¶
Frame represents a complete protocol frame including header and payload
func FrameMessage ¶
func FrameMessage(data []byte, seqNum uint64, msgType MessageType) []Frame
FrameMessage splits a large message into frames
func NewFrame ¶
func NewFrame(msgType MessageType, seqNum uint64, data []byte, opts ...FrameOption) (*Frame, error)
NewFrame creates a new Frame with validated parameters
func UnmarshalFrame ¶
UnmarshalFrame converts a byte slice into a Frame
type FrameOption ¶
FrameOption defines options for frame creation
func WithFragmentation ¶
func WithFragmentation(fragmentID uint16, total uint16, seq uint16, isLast bool) FrameOption
WithFragmentation sets fragmentation parameters for a frame
func WithStreamID ¶
func WithStreamID(streamID string) FrameOption
WithStreamID sets the StreamID for a frame
type MQTTAddr ¶
type MQTTAddr struct {
// contains filtered or unexported fields
}
MQTTAddr implements net.Addr for MQTT connections
type MQTTBridge ¶
type MQTTBridge struct {
// contains filtered or unexported fields
}
func NewMQTTBridge ¶
func (*MQTTBridge) GetServiceInfo ¶
func (b *MQTTBridge) GetServiceInfo() map[string]grpc.ServiceInfo
GetServiceInfo implements grpc.ServiceInfoProvider
func (*MQTTBridge) RegisterService ¶
func (b *MQTTBridge) RegisterService(desc *grpc.ServiceDesc, impl interface{})
RegisterService implements grpc.ServiceRegistrar
type MQTTNetBridge ¶
type MQTTNetBridge struct {
// contains filtered or unexported fields
}
MQTTNetBridge implements net.Listener over MQTT
func NewMQTTNetBridge ¶
NewMQTTNetBridge creates a new bridge that listens on a specific bridgeID
func (*MQTTNetBridge) Accept ¶
func (b *MQTTNetBridge) Accept() (net.Conn, error)
Accept implements net.Listener.Accept
func (*MQTTNetBridge) Addr ¶
func (b *MQTTNetBridge) Addr() net.Addr
Addr implements net.Listener.Addr
func (*MQTTNetBridge) Build ¶ added in v0.1.1
func (b *MQTTNetBridge) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error)
func (*MQTTNetBridge) Close ¶
func (b *MQTTNetBridge) Close() error
Close implements net.Listener.Close
func (*MQTTNetBridge) Scheme ¶ added in v0.1.1
func (b *MQTTNetBridge) Scheme() string
type MQTTNetBridgeConn ¶
type MQTTNetBridgeConn struct {
// contains filtered or unexported fields
}
MQTTNetBridgeConn implements net.Conn over MQTT
func (*MQTTNetBridgeConn) Close ¶
func (c *MQTTNetBridgeConn) Close() error
func (*MQTTNetBridgeConn) LocalAddr ¶
func (c *MQTTNetBridgeConn) LocalAddr() net.Addr
func (*MQTTNetBridgeConn) Read ¶
func (c *MQTTNetBridgeConn) Read(b []byte) (n int, err error)
Implement net.Conn interface stubs (we'll flesh these out next)
func (*MQTTNetBridgeConn) RemoteAddr ¶
func (c *MQTTNetBridgeConn) RemoteAddr() net.Addr
func (*MQTTNetBridgeConn) SetDeadline ¶
func (c *MQTTNetBridgeConn) SetDeadline(t time.Time) error
func (*MQTTNetBridgeConn) SetReadDeadline ¶
func (c *MQTTNetBridgeConn) SetReadDeadline(t time.Time) error
func (*MQTTNetBridgeConn) SetWriteDeadline ¶
func (c *MQTTNetBridgeConn) SetWriteDeadline(t time.Time) error
type MessageType ¶
type MessageType uint8
const ( MessageTypeData MessageType = iota MessageTypeStreamInit MessageTypeStreamEnd MessageTypeHeader MessageTypeError )
type Session ¶
type Session struct { ID string ServiceName string LastActive time.Time State SessionState // contains filtered or unexported fields }
Session represents an active MQTT-GRPC bridge session
func NewSession ¶
func NewSession(id, serviceName string, eventHandler SessionEventHandler) *Session
NewSession creates a new session with proper initialization
func (*Session) AddStream ¶
func (s *Session) AddStream(methodName string) (*StreamContext, error)
AddStream adds a new stream to the session
func (*Session) CloseStream ¶
CloseStream gracefully closes a stream
func (*Session) UpdateActivity ¶
UpdateActivity updates the last active timestamp for the session and stream
type SessionEvent ¶
type SessionEvent int
SessionEvent represents different session lifecycle events
const ( SessionEventCreated SessionEvent = iota SessionEventActivated SessionEventStreamStarted SessionEventStreamEnded SessionEventClosing SessionEventClosed SessionEventTimeout )
type SessionEventHandler ¶
type SessionEventHandler func(session *Session, event SessionEvent)
SessionEventHandler is called when session state changes occur
type SessionState ¶
type SessionState int
const ( SessionStateActive SessionState = iota SessionStateClosing SessionStateClosed )
type StreamContext ¶
type StreamContext struct { StreamID string // Usually method name + unique identifier Method string Created time.Time LastActive time.Time State StreamState Cancel context.CancelFunc // For cancelling individual streams Context context.Context Extra interface{} // For storing the serverStream }
StreamContext holds the context for an active stream
type StreamState ¶
type StreamState int
const ( StreamStateActive StreamState = iota StreamStateClosing StreamStateClosed )