mqtt

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2015 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	QosAtMostOnce = TagQosLevel(iota)
	QosAtLeastOnce
	QosExactlyOnce
	QosInvalid
)

OoS only support QoS 0

View Source
const (
	MsgConnect = TagMessageType(iota + 1)
	MsgConnAck
	MsgPublish
	MsgPubAck
	MsgPubRec
	MsgPubRel
	MsgPubComp
	MsgSubscribe
	MsgSubAck
	MsgUnsubscribe
	MsgUnsubAck
	MsgPingReq
	MsgPingResp
	MsgDisconnect
	MsgInvalid
)

Message Type

View Source
const (
	RetCodeAccepted = TagRetCode(iota)
	RetCodeUnacceptableProtocolVersion
	RetCodeIdentifierRejected
	RetCodeServerUnavailable
	RetCodeBadUsernameOrPassword
	RetCodeNotAuthorized
	RetCodeInvalid
)

retcode

View Source
const (
	MaxPayloadSize = (1 << (4 * 7)) - 1
)

Max Payload size

View Source
const (
	SendChanLen = 16
)

Variables

This section is empty.

Functions

func ByteToUint16

func ByteToUint16(buf []byte) uint16

func ClientIdToDeviceId

func ClientIdToDeviceId(identify string) (uint64, error)

func DeviceIdToClientId

func DeviceIdToClientId(deviceid uint64) string

func Uint16ToByte

func Uint16ToByte(value uint16) []byte

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(p Provider) *Broker

func (*Broker) GetToken

func (b *Broker) GetToken(deviceid uint64) ([]byte, error)

func (*Broker) Handle

func (b *Broker) Handle(conn net.Conn)

func (*Broker) SendMessageToDevice

func (b *Broker) SendMessageToDevice(deviceid uint64, msgtype string, message []byte, timeout time.Duration) error

type BytesPayload

type BytesPayload []byte

func (BytesPayload) ReadPayload

func (p BytesPayload) ReadPayload(r io.Reader, n int) error

func (BytesPayload) Size

func (p BytesPayload) Size() int

func (BytesPayload) WritePayload

func (p BytesPayload) WritePayload(b *bytes.Buffer) error

type ConnAck

type ConnAck struct {
	Header
	ReturnCode TagRetCode
}

ConnAck represents an MQTT CONNACK message.

func (*ConnAck) Decode

func (msg *ConnAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*ConnAck) Encode

func (msg *ConnAck) Encode(w io.Writer) (err error)

type Connect

type Connect struct {
	Header
	ProtocolName               string
	ProtocolVersion            uint8
	WillRetain                 bool
	WillFlag                   bool
	CleanSession               bool
	WillQos                    TagQosLevel
	KeepAliveTimer             uint16
	ClientId                   string
	WillTopic, WillMessage     string
	UsernameFlag, PasswordFlag bool
	Username, Password         string
}

Connect represents an MQTT CONNECT message.

func (*Connect) Decode

func (msg *Connect) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*Connect) Encode

func (msg *Connect) Encode(w io.Writer) (err error)

type Connection

type Connection struct {
	Mgr             *Manager
	DeviceId        uint64
	Conn            net.Conn
	SendChan        chan Message
	MessageId       uint16
	MessageWaitChan map[uint16]chan error
	KeepAlive       uint16
	LastHbTime      int64
	Token           []byte
}

func NewConnection

func NewConnection(conn net.Conn, mgr *Manager) *Connection

func (*Connection) Close

func (c *Connection) Close()

func (*Connection) Publish

func (c *Connection) Publish(msg Message, timeout time.Duration) error

Publish will publish a message , and return a chan to wait for completion.

func (*Connection) RcvMsgFromClient

func (c *Connection) RcvMsgFromClient()

func (*Connection) SendMsgToClient

func (c *Connection) SendMsgToClient()

func (*Connection) Submit

func (c *Connection) Submit(msg Message)

func (*Connection) ValidateToken

func (c *Connection) ValidateToken(token []byte) error

type Disconnect

type Disconnect struct {
	Header
}

Disconnect represents an MQTT DISCONNECT message.

func (*Disconnect) Decode

func (msg *Disconnect) Decode(r io.Reader, hdr Header, packetRemaining int32) error

func (*Disconnect) Encode

func (msg *Disconnect) Encode(w io.Writer) error
type Header struct {
	DupFlag  bool
	QosLevel TagQosLevel
	Retain   bool
}

message fix header

func (*Header) Decode

func (hdr *Header) Decode(r io.Reader) (msgType TagMessageType, remainingLength int32, err error)

func (*Header) Encode

func (hdr *Header) Encode(w io.Writer, msgType TagMessageType, remainingLength int32) error

func (*Header) EncodeInto

func (hdr *Header) EncodeInto(buf *bytes.Buffer, msgType TagMessageType, remainingLength int32) error

type Manager

type Manager struct {
	Provider Provider
	CxtMutex sync.RWMutex
	IdToConn map[uint64]*Connection
}

func NewManager

func NewManager(p Provider) *Manager

func (*Manager) AddConn

func (m *Manager) AddConn(id uint64, c *Connection)

func (*Manager) CleanWorker

func (m *Manager) CleanWorker()

func (*Manager) DelConn

func (m *Manager) DelConn(id uint64)

func (*Manager) GetToken

func (m *Manager) GetToken(deviceid uint64) ([]byte, error)

func (*Manager) NewConn

func (m *Manager) NewConn(conn net.Conn)

func (*Manager) PublishMessage2Device

func (m *Manager) PublishMessage2Device(deviceid uint64, msg *Publish, timeout time.Duration) error

func (*Manager) PublishMessage2Server

func (m *Manager) PublishMessage2Server(deviceid uint64, msg *Publish) error

type Message

type Message interface {
	Encode(w io.Writer) error
	Decode(r io.Reader, hdr Header, packetRemaining int32) error
}

message interface

func DecodeOneMessage

func DecodeOneMessage(r io.Reader) (msg Message, err error)

DecodeOneMessage decodes one message from r. config provides specifics on how to decode messages, nil indicates that the DefaultDecoderConfig should be used.

func NewMessage

func NewMessage(msgType TagMessageType) (msg Message, err error)

type Payload

type Payload interface {
	// Size returns the number of bytes that WritePayload will write.
	Size() int

	// WritePayload writes the payload data to w. Implementations must write
	// Size() bytes of data, but it is *not* required to do so prior to
	// returning. Size() bytes must have been written to w prior to another
	// message being encoded to the underlying connection.
	WritePayload(b *bytes.Buffer) error

	// ReadPayload reads the payload data from r (r will EOF at the end of the
	// payload). It is *not* required for r to have been consumed prior to this
	// returning. r must have been consumed completely prior to another message
	// being decoded from the underlying connection.
	ReadPayload(r io.Reader, n int) error
}

Payload is the interface for Publish payloads. Typically the BytesPayload implementation will be sufficient for small payloads whose full contents will exist in memory. However, other implementations can read or write payloads requiring them holding their complete contents in memory.

type PingReq

type PingReq struct {
	Header
}

PingReq represents an MQTT PINGREQ message.

func (*PingReq) Decode

func (msg *PingReq) Decode(r io.Reader, hdr Header, packetRemaining int32) error

func (*PingReq) Encode

func (msg *PingReq) Encode(w io.Writer) error

type PingResp

type PingResp struct {
	Header
}

PingResp represents an MQTT PINGRESP message.

func (*PingResp) Decode

func (msg *PingResp) Decode(r io.Reader, hdr Header, packetRemaining int32) error

func (*PingResp) Encode

func (msg *PingResp) Encode(w io.Writer) error

type Provider

type Provider interface {
	ValidateDeviceToken(deviceid uint64, token []byte) error
	OnDeviceOnline(args rpcs.ArgsGetOnline) error
	OnDeviceOffline(deviceid uint64) error
	OnDeviceHeartBeat(deviceid uint64) error
	OnDeviceMessage(deviceid uint64, msgtype string, message []byte)
}

type PubAck

type PubAck struct {
	Header
	MessageId uint16
}

PubAck represents an MQTT PUBACK message.

func (*PubAck) Decode

func (msg *PubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*PubAck) Encode

func (msg *PubAck) Encode(w io.Writer) error

type PubComp

type PubComp struct {
	Header
	MessageId uint16
}

PubComp represents an MQTT PUBCOMP message.

func (*PubComp) Decode

func (msg *PubComp) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*PubComp) Encode

func (msg *PubComp) Encode(w io.Writer) error

type PubRec

type PubRec struct {
	Header
	MessageId uint16
}

PubRec represents an MQTT PUBREC message.

func (*PubRec) Decode

func (msg *PubRec) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*PubRec) Encode

func (msg *PubRec) Encode(w io.Writer) error

type PubRel

type PubRel struct {
	Header
	MessageId uint16
}

PubRel represents an MQTT PUBREL message.

func (*PubRel) Decode

func (msg *PubRel) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*PubRel) Encode

func (msg *PubRel) Encode(w io.Writer) error

type Publish

type Publish struct {
	Header
	TopicName string
	MessageId uint16
	Payload   Payload
}

Publish represents an MQTT PUBLISH message.

func (*Publish) Decode

func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*Publish) Encode

func (msg *Publish) Encode(w io.Writer) (err error)

type ResponseType

type ResponseType struct {
	SendTime    uint8
	PublishType uint8
	DataType    string
}

type SubAck

type SubAck struct {
	Header
	MessageId uint16
	TopicsQos []TagQosLevel
}

SubAck represents an MQTT SUBACK message.

func (*SubAck) Decode

func (msg *SubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*SubAck) Encode

func (msg *SubAck) Encode(w io.Writer) (err error)

type Subscribe

type Subscribe struct {
	Header
	MessageId uint16
	Topics    []TopicQos
}

Subscribe represents an MQTT SUBSCRIBE message.

func (*Subscribe) Decode

func (msg *Subscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*Subscribe) Encode

func (msg *Subscribe) Encode(w io.Writer) (err error)

type TagMessageType

type TagMessageType uint8

func (TagMessageType) IsValid

func (msg TagMessageType) IsValid() bool

type TagQosLevel

type TagQosLevel uint8

func (TagQosLevel) HasId

func (qos TagQosLevel) HasId() bool

func (TagQosLevel) IsAtLeastOnce

func (qos TagQosLevel) IsAtLeastOnce() bool

func (TagQosLevel) IsExactlyOnce

func (qos TagQosLevel) IsExactlyOnce() bool

func (TagQosLevel) IsValid

func (qos TagQosLevel) IsValid() bool

type TagRetCode

type TagRetCode uint8

func (TagRetCode) IsValid

func (rc TagRetCode) IsValid() bool

type TopicQos

type TopicQos struct {
	Topic string
	Qos   TagQosLevel
}

type UnsubAck

type UnsubAck struct {
	Header
	MessageId uint16
}

UnsubAck represents an MQTT UNSUBACK message.

func (*UnsubAck) Decode

func (msg *UnsubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*UnsubAck) Encode

func (msg *UnsubAck) Encode(w io.Writer) error

type Unsubscribe

type Unsubscribe struct {
	Header
	MessageId uint16
	Topics    []string
}

Unsubscribe represents an MQTT UNSUBSCRIBE message.

func (*Unsubscribe) Decode

func (msg *Unsubscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error)

func (*Unsubscribe) Encode

func (msg *Unsubscribe) Encode(w io.Writer) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL