Documentation ¶
Index ¶
- Constants
- Variables
- func CheckTimeout(client *ClientRep)
- func Deliver(dest_client_id string, dest_qos uint8, msg *MqttMessage)
- func DeliverMessage(dest_client_id string, qos uint8, msg *MqttMessage)
- func DeliverOnConnection(client_id string)
- func Encode(mqtt *Mqtt) ([]byte, error)
- func ForceDisconnect(client *ClientRep, lock *sync.Mutex, send_will uint8)
- func GetNextMessageInternalId() uint64
- func HandleConnect(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
- func HandleDisconnect(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
- func HandlePingreq(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
- func HandlePuback(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
- func HandlePublish(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
- func HandleSubscribe(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
- func HandleUnsubscribe(mqtt *Mqtt, conn *net.Conn, client **ClientRep)
- func MessageTypeStr(mt uint8) string
- func MqttSendToClient(bytes []byte, conn *net.Conn, lock *sync.Mutex)
- func NextOutMessageIdForClient(client_id string) uint16
- func PublishMessage(mqtt_msg *MqttMessage)
- func RecoverFromRedis()
- func RemoveAllSubscriptionsOnConnect(client_id string)
- func RetryDeliver(sleep uint64, dest_client_id string, qos uint8, msg *MqttMessage)
- func SendConnack(rc uint8, conn *net.Conn, lock *sync.Mutex)
- func SendPingresp(conn *net.Conn, lock *sync.Mutex)
- func SendPuback(msg_id uint16, conn *net.Conn, lock *sync.Mutex)
- func SendSuback(msg_id uint16, qos_list []uint8, conn *net.Conn, lock *sync.Mutex)
- func SendUnsuback(msg_id uint16, conn *net.Conn, lock *sync.Mutex)
- type ClientRep
- type ConnectFlags
- type ConnectInfo
- type FixedHeader
- type FlyingMessage
- type Mqtt
- type MqttMessage
- type RedisClient
- func (client *RedisClient) AddFlyingMessage(dest_id string, fly_msg *FlyingMessage)
- func (client *RedisClient) Delete(key string)
- func (client *RedisClient) Expire(key string, sec uint64)
- func (client *RedisClient) Fetch(key string, value interface{}) int
- func (client *RedisClient) FetchNoLock(key string, value interface{}) int
- func (client *RedisClient) GetFlyingMessagesForClient(client_id string) *map[uint16]FlyingMessage
- func (client *RedisClient) GetRetainMessage(topic string) *MqttMessage
- func (client *RedisClient) GetSubsClients() []string
- func (client *RedisClient) IsFlyingMessagePendingAck(client_id string, message_id uint16) bool
- func (client *RedisClient) Reconnect()
- func (client *RedisClient) RemoveAllFlyingMessagesForClient(client_id string)
- func (client *RedisClient) SetFlyingMessagesForClient(client_id string, messages *map[uint16]FlyingMessage)
- func (client *RedisClient) SetRetainMessage(topic string, msg *MqttMessage)
- func (client *RedisClient) Store(key string, value interface{})
- func (client *RedisClient) StoreNoLock(key string, value interface{})
- type Topic
Constants ¶
const ( SEND_WILL = uint8(iota) DONT_SEND_WILL )
const ( PENDING_PUB = uint8(iota + 1) PENDING_ACK )
const ( CONNECT = uint8(iota + 1) CONNACK PUBLISH PUBACK PUBREC PUBREL PUBCOMP SUBSCRIBE SUBACK UNSUBSCRIBE UNSUBACK PINGREQ PINGRESP DISCONNECT )
Types
const ( ACCEPTED = uint8(iota) UNACCEPTABLE_PROTOCOL_VERSION IDENTIFIER_REJECTED SERVER_UNAVAILABLE BAD_USERNAME_OR_PASSWORD NOT_AUTHORIZED )
Variables ¶
Global status
var G_messages map[uint64]*MqttMessage = make(map[uint64]*MqttMessage)
InternalId -> Message. FIXME: Add code to store G_messages to disk.
Map topic => sub. The sub is implemented as a map: the key is client_id, the value is qos.
Functions ¶
func Deliver ¶
func Deliver(dest_client_id string, dest_qos uint8, msg *MqttMessage)
func DeliverMessage ¶
func DeliverMessage(dest_client_id string, qos uint8, msg *MqttMessage)
Does the real heavy lifting for delivering a message.
func DeliverOnConnection ¶
func DeliverOnConnection(client_id string)
func GetNextMessageInternalId ¶
func GetNextMessageInternalId() uint64
func HandleConnect ¶
Handle CONNECT
func MessageTypeStr ¶
func MqttSendToClient ¶
This is the main place to change if we need to use a channel rather than a lock.
func PublishMessage ¶
func PublishMessage(mqtt_msg *MqttMessage)
func RecoverFromRedis ¶
func RecoverFromRedis()
This function should be called upon starting up. It will try to recover global status (G_subs, etc.) from Redis.
func RemoveAllSubscriptionsOnConnect ¶
func RemoveAllSubscriptionsOnConnect(client_id string)
On connection, if clean session is set, call this method to clear all connections. This is the scenario where the previous CONNECT didn't set the clean session bit but the current one does.
func RetryDeliver ¶
func RetryDeliver(sleep uint64, dest_client_id string, qos uint8, msg *MqttMessage)
func SendSuback ¶
Types ¶
type ClientRep ¶
type ClientRep struct { ClientId string Conn *net.Conn WriteLock *sync.Mutex LastTime int64 // Last Unix timestamp when received message from this client Shuttingdown chan uint8 Subscriptions map[string]uint8 Mqtt *Mqtt Disconnected bool }
func CreateClientRep ¶
func (*ClientRep) UpdateLastTime ¶
func (cr *ClientRep) UpdateLastTime()
type ConnectFlags ¶
type ConnectFlags struct {
UsernameFlag, PasswordFlag, WillRetain, WillFlag, CleanSession bool
WillQos uint8
}
func (*ConnectFlags) Show ¶
func (flags *ConnectFlags) Show()
type ConnectInfo ¶
type ConnectInfo struct { Protocol string // Must be 'MQIsdp' for now Version uint8 UsernameFlag bool PasswordFlag bool WillRetain bool WillQos uint8 WillFlag bool CleanSession bool Keepalive uint16 }
func (*ConnectInfo) Show ¶
func (con_info *ConnectInfo) Show()
type FixedHeader ¶
func ReadCompleteCommand ¶
func ReadCompleteCommand(conn *net.Conn) (*FixedHeader, []byte)
func ReadFixedHeader ¶
func ReadFixedHeader(conn *net.Conn) *FixedHeader
func (*FixedHeader) Show ¶
func (header *FixedHeader) Show()
type FlyingMessage ¶
type FlyingMessage struct { Qos uint8 // the Qos in effect DestClientId string MessageInternalId uint64 // The MqttMessage of interest Status uint8 // The status of this message, like PENDING_PUB(delivery occurred // when client is offline), PENDING_ACK, etc ClientMessageId uint16 // The message id to be used in MQTT packet }
This type represents a message that should be delivered to a specific client.
func CreateFlyingMessage ¶
type Mqtt ¶
type Mqtt struct { FixedHeader *FixedHeader ProtocolName, TopicName, ClientId, WillTopic, WillMessage, Username, Password string ProtocolVersion uint8 ConnectFlags *ConnectFlags KeepAliveTimer, MessageId uint16 Data []byte Topics []string Topics_qos []uint8 ReturnCode uint8 }
func CreateMqtt ¶
func DecodeAfterFixedHeader ¶
func DecodeAfterFixedHeader(fixed_header *FixedHeader, buf []byte) (*Mqtt, error)
type MqttMessage ¶
type MqttMessage struct { Topic string Payload string Qos uint8 SenderClientId string MessageId uint16 InternalId uint64 CreatedAt int64 Retain bool }
This type represents a message received from a publisher. FlyingMessage (a message that should be delivered to specific subscribers) references MqttMessage.
func CreateMqttMessage ¶
func GetMqttMessageById ¶
func GetMqttMessageById(internal_id uint64) *MqttMessage
This is thread-safe
func (*MqttMessage) RedisKey ¶
func (msg *MqttMessage) RedisKey() string
func (*MqttMessage) Show ¶
func (msg *MqttMessage) Show()
func (*MqttMessage) Store ¶
func (msg *MqttMessage) Store()
type RedisClient ¶
var G_redis_client *RedisClient = StartRedisClient()
func StartRedisClient ¶
func StartRedisClient() *RedisClient
func (*RedisClient) AddFlyingMessage ¶
func (client *RedisClient) AddFlyingMessage(dest_id string, fly_msg *FlyingMessage)
func (*RedisClient) Delete ¶
func (client *RedisClient) Delete(key string)
func (*RedisClient) Expire ¶
func (client *RedisClient) Expire(key string, sec uint64)
func (*RedisClient) Fetch ¶
func (client *RedisClient) Fetch(key string, value interface{}) int
func (*RedisClient) FetchNoLock ¶
func (client *RedisClient) FetchNoLock(key string, value interface{}) int
func (*RedisClient) GetFlyingMessagesForClient ¶
func (client *RedisClient) GetFlyingMessagesForClient(client_id string) *map[uint16]FlyingMessage
func (*RedisClient) GetRetainMessage ¶
func (client *RedisClient) GetRetainMessage(topic string) *MqttMessage
func (*RedisClient) GetSubsClients ¶
func (client *RedisClient) GetSubsClients() []string
func (*RedisClient) IsFlyingMessagePendingAck ¶
func (client *RedisClient) IsFlyingMessagePendingAck(client_id string, message_id uint16) bool
func (*RedisClient) Reconnect ¶
func (client *RedisClient) Reconnect()
func (*RedisClient) RemoveAllFlyingMessagesForClient ¶
func (client *RedisClient) RemoveAllFlyingMessagesForClient(client_id string)
func (*RedisClient) SetFlyingMessagesForClient ¶
func (client *RedisClient) SetFlyingMessagesForClient(client_id string, messages *map[uint16]FlyingMessage)
func (*RedisClient) SetRetainMessage ¶
func (client *RedisClient) SetRetainMessage(topic string, msg *MqttMessage)
func (*RedisClient) Store ¶
func (client *RedisClient) Store(key string, value interface{})
func (*RedisClient) StoreNoLock ¶
func (client *RedisClient) StoreNoLock(key string, value interface{})