Documentation ¶
Overview ¶
Package gmqtt provides an MQTT v3.1.1 server library.
Example ¶
see /examples for more details.
s := NewServer() ln, err := net.Listen("tcp", ":1883") if err != nil { fmt.Println(err.Error()) return } s.AddTCPListenner(ln) ws := &WsServer{ Server: &http.Server{Addr: ":8080"}, } s.AddWebSocketServer(ws) s.RegisterOnConnect(func(client *Client) (code uint8) { return packets.CodeAccepted }) s.RegisterOnSubscribe(func(client *Client, topic packets.Topic) uint8 { fmt.Println("register onSubscribe callback") return packets.QOS_1 }) //register other callback before s.Run() s.Run() fmt.Println("started...") signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) <-signalCh s.Stop(context.Background()) fmt.Println("stopped")
Output:
Index ¶
- Constants
- Variables
- func SetLogger(l *logger.Logger)
- type Client
- type ClientInfo
- type ClientList
- type ClientOptions
- type InflightElem
- type Monitor
- func (m *Monitor) ClientSubscriptions(clientID string) SubscriptionList
- func (m *Monitor) Clients() ClientList
- func (m *Monitor) GetClient(clientID string) (ClientInfo, bool)
- func (m *Monitor) GetSession(clientID string) (SessionInfo, bool)
- func (m *Monitor) Sessions() SessionList
- func (m *Monitor) Subscriptions() SubscriptionList
- type MonitorRepository
- type MonitorStore
- func (m *MonitorStore) ClientSubscriptions(clientID string) SubscriptionList
- func (m *MonitorStore) Clients() ClientList
- func (m *MonitorStore) Close() error
- func (m *MonitorStore) DelClient(clientID string)
- func (m *MonitorStore) DelClientSubscriptions(clientID string)
- func (m *MonitorStore) DelSession(clientID string)
- func (m *MonitorStore) DelSubscription(clientID string, topicName string)
- func (m *MonitorStore) GetClient(clientID string) (ClientInfo, bool)
- func (m *MonitorStore) GetSession(clientID string) (SessionInfo, bool)
- func (m *MonitorStore) Open() error
- func (m *MonitorStore) PutClient(info ClientInfo)
- func (m *MonitorStore) PutSession(info SessionInfo)
- func (m *MonitorStore) PutSubscription(info SubscriptionsInfo)
- func (m *MonitorStore) Sessions() SessionList
- func (m *MonitorStore) Subscriptions() SubscriptionList
- type OnAccept
- type OnClose
- type OnConnect
- type OnPublish
- type OnStop
- type OnSubscribe
- type Server
- func (srv *Server) AddTCPListenner(ln ...net.Listener)
- func (srv *Server) AddWebSocketServer(Server ...*WsServer)
- func (srv *Server) Broadcast(publish *packets.Publish, clientIDs ...string)
- func (srv *Server) Client(clientID string) *Client
- func (srv *Server) Publish(publish *packets.Publish, clientIDs ...string)
- func (srv *Server) RegisterOnAccept(callback OnAccept)
- func (srv *Server) RegisterOnClose(callback OnClose)
- func (srv *Server) RegisterOnConnect(callback OnConnect)
- func (srv *Server) RegisterOnPublish(callback OnPublish)
- func (srv *Server) RegisterOnStop(callback OnStop)
- func (srv *Server) RegisterOnSubscribe(callback OnSubscribe)
- func (srv *Server) Run()
- func (srv *Server) SetDeliveryRetryInterval(duration time.Duration)
- func (srv *Server) SetMaxInflightMessages(i int)
- func (srv *Server) SetMaxQueueMessages(nums int)
- func (srv *Server) SetMsgRouterLen(i int)
- func (srv *Server) SetQueueQos0Messages(b bool)
- func (srv *Server) SetRegisterLen(i int)
- func (srv *Server) SetUnregisterLen(i int)
- func (srv *Server) Status() int32
- func (srv *Server) Stop(ctx context.Context) error
- func (srv *Server) Subscribe(clientID string, topics []packets.Topic)
- func (srv *Server) UnSubscribe(clientID string, topics []string)
- type SessionInfo
- type SessionList
- type SubscriptionList
- type SubscriptionsInfo
- type WsServer
Examples ¶
Constants ¶
const ( Connecting = iota Connected Switiching Disconnected )
Client status
const ( StatusOnline = "online" StatusOffline = "offline" )
Client Status
const ( DefaultDeliveryRetryInterval = 20 * time.Second DefaultQueueQos0Messages = true DefaultMaxInflightMessages = 20 DefaultMaxQueueMessages = 2048 DefaultMsgRouterLen = 4096 DefaultRegisterLen = 2048 DefaultUnRegisterLen = 2048 )
Default configration
Variables ¶
var ( ErrInvalStatus = errors.New("invalid connection status") ErrConnectTimeOut = errors.New("connect time out") )
Error
var ( // ErrInvalWsMsgType [MQTT-6.0.0-1] ErrInvalWsMsgType = errors.New("invalid websocket message type") )
Functions ¶
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a MQTT client
func (*Client) ClientOptions ¶
func (client *Client) ClientOptions() ClientOptions
ClientOptions returns the ClientOptions. This is mainly used in callback functions. See ./example/hook
func (*Client) Close ¶
func (client *Client) Close() <-chan struct{}
Close 关闭客户端连接,连接关闭完毕会将返回的channel关闭。
Close closes the client connection. The returned channel will be closed after unregister process has been done
func (*Client) IsConnected ¶
IsConnected returns whether the client is connected or not.
func (*Client) SetUserData ¶
func (client *Client) SetUserData(data interface{})
SetUserData is used to bind user data to the client
type ClientInfo ¶
type ClientInfo struct { ClientID string `json:"client_id"` Username string `json:"username"` RemoteAddr string `json:"remote_addr"` CleanSession bool `json:"clean_session"` KeepAlive uint16 `json:"keep_alive"` ConnectedAt time.Time `json:"connected_at"` }
ClientInfo represents a connected client
type ClientList ¶
type ClientList []ClientInfo
ClientList represents ClientInfo slice
func (ClientList) Len ¶
func (c ClientList) Len() int
func (ClientList) Less ¶
func (c ClientList) Less(i, j int) bool
func (ClientList) Swap ¶
func (c ClientList) Swap(i, j int)
type ClientOptions ¶
type ClientOptions struct { ClientID string Username string Password string KeepAlive uint16 CleanSession bool WillFlag bool WillRetain bool WillQos uint8 WillTopic string WillPayload []byte }
ClientOptions is mainly used in callback functions. See ClientOptions()
type InflightElem ¶
type InflightElem struct { //At is the entry time At time.Time //Pid is the packetID Pid packets.PacketID //Packet represents Publish packet Packet *packets.Publish Step int }
InflightElem is the element type in inflight queue
type Monitor ¶
type Monitor struct { sync.Mutex Repository MonitorRepository }
Monitor is used internally to save and get monitor data
func (*Monitor) ClientSubscriptions ¶
func (m *Monitor) ClientSubscriptions(clientID string) SubscriptionList
ClientSubscriptions returns the subscription info for the given clientID
func (*Monitor) Clients ¶
func (m *Monitor) Clients() ClientList
Clients returns the info for all connected clients
func (*Monitor) GetClient ¶
func (m *Monitor) GetClient(clientID string) (ClientInfo, bool)
GetClient returns the client info for the given clientID
func (*Monitor) GetSession ¶
func (m *Monitor) GetSession(clientID string) (SessionInfo, bool)
GetSession returns the session info for the given clientID
func (*Monitor) Sessions ¶
func (m *Monitor) Sessions() SessionList
Sessions returns the session info for all sessions
func (*Monitor) Subscriptions ¶
func (m *Monitor) Subscriptions() SubscriptionList
Subscriptions returns all subscription info
type MonitorRepository ¶
type MonitorRepository interface { //Open opens the repository Open() error //Close closes the repository Close() error //PutClient puts a ClientInfo into the repository when the client connects PutClient(info ClientInfo) //GetClient returns the ClientInfo for the given clientID GetClient(clientID string) (ClientInfo, bool) //Clients returns ClientList which is the list for all connected clients, this method should be idempotency Clients() ClientList //DelClient deletes the ClientInfo from repository DelClient(clientID string) //PutSession puts a SessionInfo into monitor repository when the client is connects PutSession(info SessionInfo) //GetSession returns the SessionInfo for the given clientID GetSession(clientID string) (SessionInfo, bool) //Sessions returns SessionList which is the list for all sessions including online sessions and offline sessions, this method should be idempotency Sessions() SessionList //DelSession deletes the SessionInfo from repository DelSession(clientID string) //ClientSubscriptions returns the SubscriptionList for given clientID, this method should be idempotency ClientSubscriptions(clientID string) SubscriptionList //DelClientSubscriptions deletes the subscription info for given clientID from the repository DelClientSubscriptions(clientID string) //PutSubscription puts the SubscriptionsInfo into the repository when a new subscription is made PutSubscription(info SubscriptionsInfo) //DelSubscription deletes the topic for given clientID from repository DelSubscription(clientID string, topicName string) //Subscriptions returns all subscriptions of the server Subscriptions() SubscriptionList }
MonitorRepository is an interface which can be used to provide a persistence mechanics for the monitor data
type MonitorStore ¶
type MonitorStore struct {
// contains filtered or unexported fields
}
MonitorStore implements the MonitorRepository interface to provide an in-memory monitor repository
func (*MonitorStore) ClientSubscriptions ¶
func (m *MonitorStore) ClientSubscriptions(clientID string) SubscriptionList
ClientSubscriptions returns the SubscriptionList for given clientID, this method should be idempotency
func (*MonitorStore) Clients ¶
func (m *MonitorStore) Clients() ClientList
Clients returns ClientList which is the list for all connected clients, this method should be idempotency
func (*MonitorStore) DelClient ¶
func (m *MonitorStore) DelClient(clientID string)
DelClient deletes the ClientInfo from repository
func (*MonitorStore) DelClientSubscriptions ¶
func (m *MonitorStore) DelClientSubscriptions(clientID string)
DelClientSubscriptions deletes the subscription info for given clientID from the repository
func (*MonitorStore) DelSession ¶
func (m *MonitorStore) DelSession(clientID string)
DelSession deletes the SessionInfo from repository
func (*MonitorStore) DelSubscription ¶
func (m *MonitorStore) DelSubscription(clientID string, topicName string)
DelSubscription deletes the topic for given clientID from repository
func (*MonitorStore) GetClient ¶
func (m *MonitorStore) GetClient(clientID string) (ClientInfo, bool)
GetClient returns the ClientInfo for the given clientID
func (*MonitorStore) GetSession ¶
func (m *MonitorStore) GetSession(clientID string) (SessionInfo, bool)
GetSession returns the SessionInfo for the given clientID
func (*MonitorStore) PutClient ¶
func (m *MonitorStore) PutClient(info ClientInfo)
PutClient puts a ClientInfo into the repository when the client connects
func (*MonitorStore) PutSession ¶
func (m *MonitorStore) PutSession(info SessionInfo)
PutSession puts a SessionInfo into monitor repository when the client is connects
func (*MonitorStore) PutSubscription ¶
func (m *MonitorStore) PutSubscription(info SubscriptionsInfo)
PutSubscription puts the SubscriptionsInfo into the repository when a new subscription is made
func (*MonitorStore) Sessions ¶
func (m *MonitorStore) Sessions() SessionList
Sessions returns SessionList which is the list for all sessions including online sessions and offline sessions, this method should be idempotency
func (*MonitorStore) Subscriptions ¶
func (m *MonitorStore) Subscriptions() SubscriptionList
Subscriptions returns all subscriptions of the server
type OnAccept ¶
OnAccept 会在新连接建立的时候调用,只在TCP server中有效。如果返回false,则会直接关闭连接
OnAccept will be called after a new connection established in TCP server. If returns false, the connection will be close directly.
type OnClose ¶
OnClose tcp连接关闭之后触发
OnClose will be called after the tcp connection of the client has been closed
type OnConnect ¶
OnConnect 当合法的connect报文到达的时候触发,返回connack中响应码
OnConnect will be called when a valid connect packet is received. It returns the code of the connack packet
type OnPublish ¶
OnPublish 返回接收到的publish报文是否允许转发,返回false则该报文不会被继续转发
OnPublish returns whether the publish packet will be delivered or not. If returns false, the packet will not be delivered to any clients.
type OnSubscribe ¶
OnSubscribe 返回topic允许订阅的最高QoS等级
OnSubscribe returns the maximum available QoS for the topic:
0x00 - Success - Maximum QoS 0 0x01 - Success - Maximum QoS 1 0x02 - Success - Maximum QoS 2 0x80 - Failure
type Server ¶
type Server struct { //Monitor Monitor *Monitor // contains filtered or unexported fields }
Server represents a mqtt server instance. Create an instance of Server, by using NewServer()
func (*Server) AddTCPListenner ¶
AddTCPListenner adds tcp listeners to mqtt server. This method enables the mqtt server to serve on multiple ports.
func (*Server) AddWebSocketServer ¶
AddWebSocketServer adds websocket server to mqtt server.
func (*Server) Broadcast ¶
Broadcast 广播一个消息,此消息不受主题限制。默认广播到所有的客户端中去,如果clientIDs有设置,则只会广播到clientIDs指定的客户端。
Broadcast broadcasts the message to all clients. If the second param clientIDs is set, the message will only send to the clients specified by the clientIDs.
Notice: This method will not trigger the onPublish callback
func (*Server) Publish ¶
Publish 主动发布一个主题,如果clientIDs没有设置,则默认会转发到所有有匹配主题的客户端,如果clientIDs有设置,则只会转发到clientIDs指定的有匹配主题的客户端。
Publish publishs a message to the broker. If the second param is not set, the message will be distributed to any clients that has matched subscriptions. If the second param clientIDs is set, the message will only try to distributed to the clients specified by the clientIDs
Notice: This method will not trigger the onPublish callback
func (*Server) RegisterOnAccept ¶
RegisterOnAccept registers a onAccept callback. A panic will cause if any RegisterOnXXX is called after server.Run()
func (*Server) RegisterOnClose ¶
RegisterOnClose registers a onClose callback.
func (*Server) RegisterOnConnect ¶
RegisterOnConnect registers a onConnect callback.
func (*Server) RegisterOnPublish ¶
RegisterOnPublish registers a onPublish callback.
func (*Server) RegisterOnStop ¶
RegisterOnStop registers a onStop callback.
func (*Server) RegisterOnSubscribe ¶
func (srv *Server) RegisterOnSubscribe(callback OnSubscribe)
RegisterOnSubscribe registers a onSubscribe callback.
func (*Server) Run ¶
func (srv *Server) Run()
Run starts the mqtt server. This method is non-blocking
func (*Server) SetDeliveryRetryInterval ¶
SetDeliveryRetryInterval sets the delivery retry interval.
func (*Server) SetMaxInflightMessages ¶
SetMaxInflightMessages sets the maximum inflight messages.
func (*Server) SetMaxQueueMessages ¶
SetMaxQueueMessages sets the maximum queue messages.
func (*Server) SetMsgRouterLen ¶
SetMsgRouterLen sets the length of msgRouter channel.
func (*Server) SetQueueQos0Messages ¶
SetQueueQos0Messages sets whether to queue QoS 0 messages. Default to true.
func (*Server) SetRegisterLen ¶
SetRegisterLen sets the length of register channel.
func (*Server) SetUnregisterLen ¶
SetUnregisterLen sets the length of unregister channel.
func (*Server) Stop ¶
Stop gracefully stops the mqtt server by the following steps:
- Closing all open TCP listeners and shutting down all open websocket servers
- Closing all idle connections
- Waiting for all connections have been closed
- Triggering OnStop()
func (*Server) Subscribe ¶
Subscribe 为某一个客户端订阅主题
Subscribe subscribes topics for the client specified by clientID.
Notice: This method will not trigger the onSubscribe callback
func (*Server) UnSubscribe ¶
UnSubscribe 为某一个客户端取消订阅某个主题
UnSubscribe unsubscribes topics for the client specified by clientID.
type SessionInfo ¶
type SessionInfo struct { ClientID string `json:"client_id"` Status string `json:"status"` RemoteAddr string `json:"remote_addr"` CleanSession bool `json:"clean_session"` Subscriptions int `json:"subscriptions"` MaxInflight int `json:"max_inflight"` InflightLen int `json:"inflight_len"` MaxMsgQueue int `json:"max_msg_queue"` MsgQueueLen int `json:"msg_queue_len"` MsgQueueDropped int `json:"msg_queue_dropped"` ConnectedAt time.Time `json:"connected_at"` OfflineAt time.Time `json:"offline_at,omitempty"` }
SessionInfo represents a session
type SessionList ¶
type SessionList []SessionInfo
SessionList represent SessionInfo slice
func (SessionList) Len ¶
func (s SessionList) Len() int
func (SessionList) Less ¶
func (s SessionList) Less(i, j int) bool
func (SessionList) Swap ¶
func (s SessionList) Swap(i, j int)
type SubscriptionList ¶
type SubscriptionList []SubscriptionsInfo
SubscriptionList is SubscriptionsInfo slice
func (SubscriptionList) Len ¶
func (s SubscriptionList) Len() int
func (SubscriptionList) Less ¶
func (s SubscriptionList) Less(i, j int) bool
func (SubscriptionList) Swap ¶
func (s SubscriptionList) Swap(i, j int)