mqtt

package
v0.0.0-...-01fdce4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2019 License: BSD-3-Clause Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Will-handling modes, typed uint8. ForceDisconnect takes a send_will
// argument of this type; presumably these are its accepted values — confirm.
const (
	SEND_WILL      uint8 = iota // 0
	DONT_SEND_WILL              // 1
)
View Source
// Delivery states, typed uint8 and numbered from 1. The comment on
// FlyingMessage.Status names these as example values for that field.
const (
	PENDING_PUB uint8 = iota + 1 // 1
	PENDING_ACK                  // 2
)
View Source
// MQTT message types, typed uint8 and numbered consecutively from
// CONNECT = 1 through DISCONNECT = 14; these values coincide with the
// MQTT 3.1 message type codes.
const (
	CONNECT     uint8 = iota + 1 // 1
	CONNACK                      // 2
	PUBLISH                      // 3
	PUBACK                       // 4
	PUBREC                       // 5
	PUBREL                       // 6
	PUBCOMP                      // 7
	SUBSCRIBE                    // 8
	SUBACK                       // 9
	UNSUBSCRIBE                  // 10
	UNSUBACK                     // 11
	PINGREQ                      // 12
	PINGRESP                     // 13
	DISCONNECT                   // 14
)

Types

View Source
// Connection return codes, typed uint8 and numbered from ACCEPTED = 0
// through NOT_AUTHORIZED = 5; these values coincide with the MQTT 3.1
// CONNACK return codes. Presumably the values passed as rc to SendConnack —
// confirm.
const (
	ACCEPTED                      uint8 = iota // 0
	UNACCEPTABLE_PROTOCOL_VERSION              // 1
	IDENTIFIER_REJECTED                        // 2
	SERVER_UNAVAILABLE                         // 3
	BAD_USERNAME_OR_PASSWORD                   // 4
	NOT_AUTHORIZED                             // 5
)

Variables

View Source
// G_clients is a package-level map from a string key (presumably the client
// id — confirm) to that client's *ClientRep. It starts out empty.
var G_clients = make(map[string]*ClientRep)

Global status

View Source
// G_clients_lock is a package-level mutex; presumably it guards G_clients —
// confirm against the callers.
var G_clients_lock = &sync.Mutex{}
View Source
// G_messages maps a message's internal id (uint64) to the stored
// *MqttMessage. It starts out empty.
var G_messages = make(map[uint64]*MqttMessage)

InternalId -> Message. FIXME: Add code to store G_messages to disk.

View Source
// G_messages_lock is a package-level mutex; presumably it guards
// G_messages — confirm against the callers.
var G_messages_lock = &sync.Mutex{}
View Source
// G_subs is a two-level package-level map with string keys at both levels
// and uint8 values. It starts out empty.
var G_subs = make(map[string]map[string]uint8)

Map topic => sub. A sub is implemented as a map: the key is client_id, the value is qos.

View Source
// G_subs_lock is a package-level mutex; presumably it guards G_subs —
// confirm against the callers.
var G_subs_lock = &sync.Mutex{}
View Source
// G_topics_lockk is a package-level mutex; presumably it guards G_topicss —
// confirm against the callers.
// NOTE(review): the doubled trailing "k" looks like a typo, but the name is
// exported and is therefore left unchanged.
var G_topics_lockk = &sync.Mutex{}
View Source
// G_topicss is a package-level map from a string key (presumably the topic
// string — confirm) to its *Topic. It starts out empty.
// NOTE(review): the doubled trailing "s" looks like a typo, but the name is
// exported and is therefore left unchanged.
var G_topicss = make(map[string]*Topic)
View Source
// NextClientMessageId is a package-level map from a string key (presumably
// the client id — confirm) to a uint16 message id. It starts out empty.
var NextClientMessageId = make(map[string]uint16)

Functions

func CheckTimeout

func CheckTimeout(client *ClientRep)

Checking timeout

func Deliver

func Deliver(dest_client_id string, dest_qos uint8, msg *MqttMessage)

func DeliverMessage

func DeliverMessage(dest_client_id string, qos uint8, msg *MqttMessage)

Does the real heavy lifting of delivering a message.

func DeliverOnConnection

func DeliverOnConnection(client_id string)

func Encode

func Encode(mqtt *Mqtt) ([]byte, error)

func ForceDisconnect

func ForceDisconnect(client *ClientRep, lock *sync.Mutex, send_will uint8)

func GetNextMessageInternalId

func GetNextMessageInternalId() uint64

func HandleConnect

func HandleConnect(mqtt *Mqtt, conn *net.Conn, client **ClientRep)

Handle CONNECT

func HandleDisconnect

func HandleDisconnect(mqtt *Mqtt, conn *net.Conn, client **ClientRep)

func HandlePingreq

func HandlePingreq(mqtt *Mqtt, conn *net.Conn, client **ClientRep)

func HandlePuback

func HandlePuback(mqtt *Mqtt, conn *net.Conn, client **ClientRep)

Handle PUBACK

func HandlePublish

func HandlePublish(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
Handle PUBLISH

FIXME: support qos = 2

func HandleSubscribe

func HandleSubscribe(mqtt *Mqtt, conn *net.Conn, client **ClientRep)

func HandleUnsubscribe

func HandleUnsubscribe(mqtt *Mqtt, conn *net.Conn, client **ClientRep)

func MessageTypeStr

func MessageTypeStr(mt uint8) string

func MqttSendToClient

func MqttSendToClient(bytes []byte, conn *net.Conn, lock *sync.Mutex)

This is the main place to change if we need to use channel rather than lock

func NextOutMessageIdForClient

func NextOutMessageIdForClient(client_id string) uint16

func PublishMessage

func PublishMessage(mqtt_msg *MqttMessage)

func RecoverFromRedis

func RecoverFromRedis()

This function should be called on startup. It tries to recover the global status (G_subs, etc.) from Redis.

func RemoveAllSubscriptionsOnConnect

func RemoveAllSubscriptionsOnConnect(client_id string)

On connection, if clean session is set, call this method to clear all connections. This is the scenario where a previous CONNECT didn't set the clean session bit but the current one does.

func RetryDeliver

func RetryDeliver(sleep uint64, dest_client_id string, qos uint8, msg *MqttMessage)

func SendConnack

func SendConnack(rc uint8, conn *net.Conn, lock *sync.Mutex)

func SendPingresp

func SendPingresp(conn *net.Conn, lock *sync.Mutex)

func SendPuback

func SendPuback(msg_id uint16, conn *net.Conn, lock *sync.Mutex)

func SendSuback

func SendSuback(msg_id uint16, qos_list []uint8, conn *net.Conn, lock *sync.Mutex)

func SendUnsuback

func SendUnsuback(msg_id uint16, conn *net.Conn, lock *sync.Mutex)

Types

type ClientRep

// ClientRep groups the state tracked for one client: its id, a pointer to its
// network connection, and bookkeeping fields.
type ClientRep struct {
	ClientId      string
	Conn          *net.Conn
	// presumably serializes writes to Conn — TODO confirm
	WriteLock     *sync.Mutex
	LastTime      int64 // Last Unix timestamp when a message was received from this client
	Shuttingdown  chan uint8
	// presumably topic -> qos for this client — TODO confirm
	Subscriptions map[string]uint8
	Mqtt          *Mqtt
	Disconnected  bool
}

func CreateClientRep

func CreateClientRep(client_id string, conn *net.Conn, mqtt *Mqtt) *ClientRep

func (*ClientRep) UpdateLastTime

func (cr *ClientRep) UpdateLastTime()

type ConnectFlags

// ConnectFlags holds five boolean flags followed by the will QoS level.
// The Mqtt struct carries a pointer to one of these in its ConnectFlags field.
type ConnectFlags struct {
	UsernameFlag bool
	PasswordFlag bool
	WillRetain   bool
	WillFlag     bool
	CleanSession bool
	WillQos      uint8
}

func (*ConnectFlags) Show

func (flags *ConnectFlags) Show()

type ConnectInfo

// ConnectInfo holds connection parameters: protocol name and version, the
// username/password/will/clean-session flags, the will QoS, and the
// keepalive value.
type ConnectInfo struct {
	// Must be 'MQIsdp' for now
	Protocol string
	Version  uint8

	UsernameFlag, PasswordFlag, WillRetain bool

	WillQos uint8

	WillFlag, CleanSession bool

	Keepalive uint16
}

func (*ConnectInfo) Show

func (con_info *ConnectInfo) Show()

type FixedHeader

// FixedHeader holds the fields of a message's fixed header: the message
// type, the dup and retain flags, the QoS level, and a length.
type FixedHeader struct {
	MessageType uint8

	DupFlag, Retain bool

	QosLevel uint8
	Length   uint32
}

func ReadCompleteCommand

func ReadCompleteCommand(conn *net.Conn) (*FixedHeader, []byte)

func ReadFixedHeader

func ReadFixedHeader(conn *net.Conn) *FixedHeader

func (*FixedHeader) Show

func (header *FixedHeader) Show()

type FlyingMessage

// FlyingMessage represents a message that should be delivered to a specific
// client. It refers to the underlying MqttMessage by internal id.
type FlyingMessage struct {
	Qos               uint8 // the Qos in effect
	DestClientId      string
	MessageInternalId uint64 // The MqttMessage of interest
	Status            uint8  // The status of this message, like PENDING_PUB (delivery occurred
	// while the client was offline), PENDING_ACK, etc.
	ClientMessageId uint16 // The message id to be used in the MQTT packet
}

This type represents a message that should be delivered to a specific client.

func CreateFlyingMessage

func CreateFlyingMessage(dest_id string, message_internal_id uint64,
	qos uint8, status uint8, message_id uint16) *FlyingMessage

type Mqtt

// Mqtt is the in-memory form of one MQTT message: its fixed header plus the
// string, numeric, and payload fields declared below. Decode and
// DecodeAfterFixedHeader return a *Mqtt and Encode accepts one.
type Mqtt struct {
	FixedHeader     *FixedHeader
	ProtocolName    string
	TopicName       string
	ClientId        string
	WillTopic       string
	WillMessage     string
	Username        string
	Password        string
	ProtocolVersion uint8
	ConnectFlags    *ConnectFlags
	KeepAliveTimer  uint16
	MessageId       uint16
	Data            []byte
	Topics          []string
	Topics_qos      []uint8
	ReturnCode      uint8
}

func CreateMqtt

func CreateMqtt(msg_type uint8) *Mqtt

func Decode

func Decode(buf []byte) (*Mqtt, error)

func DecodeAfterFixedHeader

func DecodeAfterFixedHeader(fixed_header *FixedHeader, buf []byte) (*Mqtt, error)

func (*Mqtt) Show

func (mqtt *Mqtt) Show()

type MqttMessage

// MqttMessage represents a message received from a publisher. FlyingMessage
// values refer to an MqttMessage through its InternalId.
type MqttMessage struct {
	Topic, Payload string

	Qos            uint8
	SenderClientId string
	MessageId      uint16
	InternalId     uint64
	CreatedAt      int64
	Retain         bool
}

This type represents a message received from a publisher. A FlyingMessage (a message that should be delivered to a specific subscriber) references an MqttMessage.

func CreateMqttMessage

func CreateMqttMessage(topic, payload, sender_id string,
	qos uint8, message_id uint16,
	created_at int64, retain bool) *MqttMessage

func GetMqttMessageById

func GetMqttMessageById(internal_id uint64) *MqttMessage

This is thread-safe

func (*MqttMessage) RedisKey

func (msg *MqttMessage) RedisKey() string

func (*MqttMessage) Show

func (msg *MqttMessage) Show()

func (*MqttMessage) Store

func (msg *MqttMessage) Store()

type RedisClient

// RedisClient wraps a pointer to a redis.Conn; the package's Redis helper
// methods are declared on *RedisClient.
type RedisClient struct {
	Conn *redis.Conn
}
var G_redis_client *RedisClient = StartRedisClient()

func StartRedisClient

func StartRedisClient() *RedisClient

func (*RedisClient) AddFlyingMessage

func (client *RedisClient) AddFlyingMessage(dest_id string,
	fly_msg *FlyingMessage)

func (*RedisClient) Delete

func (client *RedisClient) Delete(key string)

func (*RedisClient) Expire

func (client *RedisClient) Expire(key string, sec uint64)

func (*RedisClient) Fetch

func (client *RedisClient) Fetch(key string, value interface{}) int

func (*RedisClient) FetchNoLock

func (client *RedisClient) FetchNoLock(key string, value interface{}) int

func (*RedisClient) GetFlyingMessagesForClient

func (client *RedisClient) GetFlyingMessagesForClient(client_id string) *map[uint16]FlyingMessage

func (*RedisClient) GetRetainMessage

func (client *RedisClient) GetRetainMessage(topic string) *MqttMessage

func (*RedisClient) GetSubsClients

func (client *RedisClient) GetSubsClients() []string

func (*RedisClient) IsFlyingMessagePendingAck

func (client *RedisClient) IsFlyingMessagePendingAck(client_id string, message_id uint16) bool

func (*RedisClient) Reconnect

func (client *RedisClient) Reconnect()

func (*RedisClient) RemoveAllFlyingMessagesForClient

func (client *RedisClient) RemoveAllFlyingMessagesForClient(client_id string)

func (*RedisClient) SetFlyingMessagesForClient

func (client *RedisClient) SetFlyingMessagesForClient(client_id string,
	messages *map[uint16]FlyingMessage)

func (*RedisClient) SetRetainMessage

func (client *RedisClient) SetRetainMessage(topic string, msg *MqttMessage)

func (*RedisClient) Store

func (client *RedisClient) Store(key string, value interface{})

func (*RedisClient) StoreNoLock

func (client *RedisClient) StoreNoLock(key string, value interface{})

type Topic

// Topic pairs a content string with a pointer to a retained MqttMessage.
type Topic struct {
	// presumably the topic name — TODO confirm against CreateTopic
	Content         string
	RetainedMessage *MqttMessage
}

func CreateTopic

func CreateTopic(content string) *Topic

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL