Documentation ¶
Overview ¶
Implementation of MQTT V3.1 encoding and decoding.
See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html for the MQTT protocol specification. This package does not implement the semantics of MQTT, but purely the encoding and decoding of its messages.
Decoding Messages:
Use the DecodeOneMessage function to read one message from an io.Reader; it returns a Message value. If more control is required, the same behaviour can be implemented using the public API of this package. For example:
for {
	msg, err := mqtt.DecodeOneMessage(conn, nil)
	if err != nil {
		// handle err, then stop reading from conn
	}
	switch msg := msg.(type) {
	case *mqtt.Connect:
		// ...
	case *mqtt.Publish:
		// ...
	// etc.
	}
}
Encoding Messages:
Create a message value, and use its Encode method to write it to an io.Writer. For example:
someData := []byte{1, 2, 3}
msg := &mqtt.Publish{
	Header: mqtt.Header{
		DupFlag:  false,
		QosLevel: mqtt.QosAtLeastOnce,
		Retain:   false,
	},
	TopicName: "a/b",
	MessageId: 10,
	Payload:   mqtt.BytesPayload(someData),
}
if err := msg.Encode(conn); err != nil {
	// handle err
}
Advanced PUBLISH payload handling:
The default behaviour for decoding PUBLISH payloads, and the most common way to supply payloads for encoding, is BytesPayload, a type derived from []byte.
More complex handling is possible by implementing the Payload interface, which can be injected into DecodeOneMessage via the `config` parameter, or into an outgoing Publish message via its Payload field. Potential benefits of this include:
* Data can be (un)marshalled directly on a connection, without an unnecessary round-trip via bytes.Buffer.
* Data can be streamed directly on readers/writers (e.g. files, other connections, pipes) without the requirement to buffer an entire message payload in memory at once.
The limitations of these streaming features are:
* When encoding a payload, the encoded size of the payload must be known and declared upfront.
* The payload size (and PUBLISH variable header) can be no more than 256MiB minus 1 byte. This is a specified limitation of MQTT v3.1 itself.
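As an illustration of the points above, here is a minimal sketch of a custom payload whose encoded size is known upfront. The Payload interface is redeclared locally so the example compiles on its own; CounterPayload and roundTrip are hypothetical names that are not part of this package, and a bytes.Buffer merely stands in for a network connection.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// Payload mirrors the interface documented by this package so that the
// sketch is self-contained. Real code would use the package's own type.
type Payload interface {
	Size() int
	WritePayload(w io.Writer) error
	ReadPayload(r io.Reader) error
}

// CounterPayload is a hypothetical payload holding one big-endian uint64.
type CounterPayload struct {
	Value uint64
}

// Size can be declared upfront: a uint64 always encodes to 8 bytes.
func (p *CounterPayload) Size() int { return 8 }

// WritePayload marshals the value straight onto w.
func (p *CounterPayload) WritePayload(w io.Writer) error {
	return binary.Write(w, binary.BigEndian, p.Value)
}

// ReadPayload unmarshals the value straight from r.
func (p *CounterPayload) ReadPayload(r io.Reader) error {
	return binary.Read(r, binary.BigEndian, &p.Value)
}

// roundTrip writes v to an in-memory "wire" and reads it back. It returns
// the decoded value and the number of bytes that were written.
func roundTrip(v uint64) (uint64, int, error) {
	var wire bytes.Buffer // stands in for a connection in this sketch
	var out Payload = &CounterPayload{Value: v}
	if err := out.WritePayload(&wire); err != nil {
		return 0, 0, err
	}
	written := wire.Len()
	in := &CounterPayload{}
	if err := in.ReadPayload(&wire); err != nil {
		return 0, 0, err
	}
	return in.Value, written, nil
}

func main() {
	v, n, err := roundTrip(42)
	fmt.Println(v, n, err)
}
```

A payload like this could be assigned to the Payload field of an outgoing Publish message, or returned from a DecoderConfig when decoding.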
Index ¶
- Constants
- type BytesPayload
- type ConnAck
- type Connect
- type DecoderConfig
- type DefaultDecoderConfig
- type Disconnect
- type Header
- type Message
- type MessageType
- type Payload
- type PingReq
- type PingResp
- type PubAck
- type PubComp
- type PubRec
- type PubRel
- type Publish
- type QosLevel
- type ReturnCode
- type StreamedPayload
- type SubAck
- type Subscribe
- type TopicQos
- type UnsubAck
- type Unsubscribe
- type ValueConfig
Constants ¶
const (
	MsgConnect = MessageType(iota + 1)
	MsgConnAck
	MsgPublish
	MsgPubAck
	MsgPubRec
	MsgPubRel
	MsgPubComp
	MsgSubscribe
	MsgSubAck
	MsgUnsubscribe
	MsgUnsubAck
	MsgPingReq
	MsgPingResp
	MsgDisconnect
)
MessageType constants.
const (
	QosAtMostOnce = QosLevel(iota)
	QosAtLeastOnce
	QosExactlyOnce
)
const (
	RetCodeAccepted = ReturnCode(iota)
	RetCodeUnacceptableProtocolVersion
	RetCodeIdentifierRejected
	RetCodeBadUsernameOrPassword
	RetCodeNotAuthorized
)
const (
// Maximum payload size in bytes (256MiB - 1B).
MaxPayloadSize = (1 << (4 * 7)) - 1
)
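The 4 * 7 in this constant reflects the MQTT "remaining length" field, which uses at most four bytes, each carrying 7 bits of length plus a continuation bit. The following is a minimal sketch of that encoding; encodeLength and decodeLength are hypothetical helpers written for illustration and are not functions exported by this package.

```go
package main

import (
	"errors"
	"fmt"
)

// maxPayloadSize mirrors MaxPayloadSize: four length bytes of 7 bits each.
const maxPayloadSize = (1 << (4 * 7)) - 1

// encodeLength encodes n using the MQTT variable-length scheme: 7 bits of
// length per byte, least significant group first.
func encodeLength(n int) ([]byte, error) {
	if n < 0 || n > maxPayloadSize {
		return nil, errors.New("length out of range")
	}
	var out []byte
	for {
		b := byte(n % 128)
		n /= 128
		if n > 0 {
			b |= 0x80 // continuation bit: more length bytes follow
		}
		out = append(out, b)
		if n == 0 {
			return out, nil
		}
	}
}

// decodeLength reverses encodeLength. It reads at most four bytes.
func decodeLength(buf []byte) (int, error) {
	n := 0
	shift := uint(0)
	for i, b := range buf {
		if i == 4 {
			return 0, errors.New("length field longer than 4 bytes")
		}
		n |= int(b&0x7f) << shift
		if b&0x80 == 0 {
			return n, nil
		}
		shift += 7
	}
	return 0, errors.New("truncated length field")
}

func main() {
	enc, err := encodeLength(maxPayloadSize)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	dec, err := decodeLength(enc)
	fmt.Println(len(enc), dec, err)
}
```

Any length above maxPayloadSize would need a fifth byte, which the encoding does not allow; this is why the limit is a property of the protocol rather than of this package.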
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BytesPayload ¶
type BytesPayload []byte
BytesPayload reads/writes a plain slice of bytes.
func (BytesPayload) ReadPayload ¶
func (p BytesPayload) ReadPayload(r io.Reader) error
func (BytesPayload) Size ¶
func (p BytesPayload) Size() int
func (BytesPayload) WritePayload ¶
func (p BytesPayload) WritePayload(w io.Writer) error
type ConnAck ¶
type ConnAck struct {
	Header
	SessionPresent bool
	ReturnCode     ReturnCode
}
ConnAck represents an MQTT CONNACK message.
type Connect ¶
type Connect struct {
	Header
	ProtocolName               string
	ProtocolVersion            uint8
	WillRetain                 bool
	WillFlag                   bool
	CleanSession               bool
	WillQos                    QosLevel
	KeepAliveTimer             uint16
	ClientId                   string
	WillTopic, WillMessage     string
	UsernameFlag, PasswordFlag bool
	Username, Password         string
}
Connect represents an MQTT CONNECT message.
type DecoderConfig ¶
type DecoderConfig interface {
	// MakePayload returns a Payload for the given Publish message. r is a Reader
	// that will read the payload data, and n is the number of bytes in the
	// payload. The Payload.ReadPayload method is called on the returned payload
	// by the decoding process.
	MakePayload(msg *Publish, r io.Reader, n int) (Payload, error)
}
DecoderConfig provides configuration for decoding messages.
type DefaultDecoderConfig ¶
type DefaultDecoderConfig struct{}
func (DefaultDecoderConfig) MakePayload ¶
type Disconnect ¶
type Disconnect struct {
Header
}
Disconnect represents an MQTT DISCONNECT message.
func (*Disconnect) Decode ¶
func (msg *Disconnect) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) error
type Header ¶
Header contains the common attributes of all messages. Some attributes are not applicable to some message types.
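For orientation, the MQTT fixed header packs these attributes into a single byte: the message type in bits 7-4, DUP in bit 3, QoS in bits 2-1 and RETAIN in bit 0. The sketch below shows that packing under stated assumptions: Header is a stand-in with only the three fields used in the overview example, and firstByte is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Stand-in declarations mirroring the documented types and constants.
type MessageType uint8
type QosLevel uint8

const (
	MsgPublish     = MessageType(3)
	QosAtLeastOnce = QosLevel(1)
)

// Header is a stand-in holding only the fields shown in the overview
// example. The package's real Header may differ.
type Header struct {
	DupFlag  bool
	QosLevel QosLevel
	Retain   bool
}

// firstByte packs the message type and header flags into the first byte
// of the MQTT fixed header.
func firstByte(t MessageType, h Header) byte {
	b := byte(t)<<4 | byte(h.QosLevel&0x03)<<1
	if h.DupFlag {
		b |= 0x08
	}
	if h.Retain {
		b |= 0x01
	}
	return b
}

func main() {
	h := Header{QosLevel: QosAtLeastOnce}
	fmt.Printf("%#x\n", firstByte(MsgPublish, h)) // prints 0x32
}
```

This also shows why some attributes do not apply to every message type: the flag bits are always present in the byte, but only certain messages give them meaning.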
type Message ¶
type Message interface {
	// Encode writes the message to w.
	Encode(w io.Writer) error

	// Decode reads the message extended headers and payload from
	// r. Typically the values for hdr and packetRemaining will
	// be returned from Header.Decode.
	Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) error
}
Message is the interface that all MQTT messages implement.
func DecodeOneMessage ¶
func DecodeOneMessage(r io.Reader, config DecoderConfig) (msg Message, err error)
DecodeOneMessage decodes one message from r. config provides specifics on how to decode messages; if it is nil, the DefaultDecoderConfig is used.
func NewMessage ¶
func NewMessage(msgType MessageType) (msg Message, err error)
NewMessage creates an instance of a Message value for the given message type. An error is returned if msgType is invalid.
type MessageType ¶
type MessageType uint8
func (MessageType) IsValid ¶
func (mt MessageType) IsValid() bool
IsValid returns true if the MessageType value is valid.
type Payload ¶
type Payload interface {
	// Size returns the number of bytes that WritePayload will write.
	Size() int

	// WritePayload writes the payload data to w. Implementations must write
	// Size() bytes of data, but it is *not* required to do so prior to
	// returning. Size() bytes must have been written to w prior to another
	// message being encoded to the underlying connection.
	WritePayload(w io.Writer) error

	// ReadPayload reads the payload data from r (r will EOF at the end of the
	// payload). It is *not* required for r to have been consumed prior to this
	// returning. r must have been consumed completely prior to another message
	// being decoded from the underlying connection.
	ReadPayload(r io.Reader) error
}
Payload is the interface for Publish payloads. Typically the BytesPayload implementation will be sufficient for small payloads whose full contents will exist in memory. However, other implementations can read or write payloads without holding their complete contents in memory.
type PingReq ¶
type PingReq struct {
Header
}
PingReq represents an MQTT PINGREQ message.
type PingResp ¶
type PingResp struct {
Header
}
PingResp represents an MQTT PINGRESP message.
type ReturnCode ¶
type ReturnCode uint8
func (ReturnCode) IsValid ¶
func (rc ReturnCode) IsValid() bool
type StreamedPayload ¶
type StreamedPayload struct {
	// N indicates payload size to the encoder. This many bytes will be read from
	// the reader when encoding. The number of bytes in the payload will be
	// stored here when decoding.
	N int

	// EncodingSource is used to copy data from when encoding a Publish message
	// onto the wire. This can be nil if the payload is only being used for
	// decoding.
	EncodingSource io.Reader

	// DecodingSink is used to copy data to when decoding a Publish message from
	// the wire. This can be nil if the payload is only being used for encoding.
	DecodingSink io.Writer
}
StreamedPayload writes payload data from a reader, or reads payload data into a writer.
func (*StreamedPayload) ReadPayload ¶
func (p *StreamedPayload) ReadPayload(r io.Reader) error
func (*StreamedPayload) Size ¶
func (p *StreamedPayload) Size() int
func (*StreamedPayload) WritePayload ¶
func (p *StreamedPayload) WritePayload(w io.Writer) error
type Unsubscribe ¶
Unsubscribe represents an MQTT UNSUBSCRIBE message.
func (*Unsubscribe) Decode ¶
func (msg *Unsubscribe) Decode(r io.Reader, hdr Header, packetRemaining int32, config DecoderConfig) (err error)
type ValueConfig ¶
type ValueConfig struct {
Payload Payload
}
ValueConfig always returns the given Payload when MakePayload is called.