Documentation ¶
Overview ¶
Package gmqtt provides an MQTT v3.1.1 server library.
Example ¶
See /examples for more details.
ln, err := net.Listen("tcp", ":1883") if err != nil { fmt.Println(err.Error()) return } ws := &WsServer{ Server: &http.Server{Addr: ":8080"}, Path: "/", } l, _ := zap.NewProduction() srv := NewServer( WithTCPListener(ln), WithWebsocketServer(ws), // add config WithConfig(DefaultConfig), // add plugins // WithPlugin(prometheus.New(&http.Server{Addr: ":8082"}, "/metrics")), // add Hook WithHook(Hooks{ OnConnect: func(ctx context.Context, client Client) (code uint8) { return packets.CodeAccepted }, OnSubscribe: func(ctx context.Context, client Client, topic packets.Topic) (qos uint8) { fmt.Println("register onSubscribe callback") return packets.QOS_1 }, }), // add logger WithLogger(l), ) srv.Run() fmt.Println("started...") signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) <-signalCh srv.Stop(context.Background()) fmt.Println("stopped")
Output:
Index ¶
- Constants
- Variables
- func LoggerWithField(fields ...zap.Field) *zap.Logger
- func NewMessage(topic string, payload []byte, qos uint8, opts ...msgOptions) packets.Message
- func NewServer(opts ...Options) *server
- func Retained(retained bool) msgOptions
- type Client
- type ClientOptionsReader
- type ClientStats
- type Config
- type DeliveryMode
- type HookWrapper
- type Hooks
- type MessageStats
- type OnAccept
- type OnAcceptWrapper
- type OnAcked
- type OnAckedWrapper
- type OnClose
- type OnCloseWrapper
- type OnConnect
- type OnConnectWrapper
- type OnConnected
- type OnConnectedWrapper
- type OnDeliver
- type OnDeliverWrapper
- type OnMsgArrived
- type OnMsgArrivedWrapper
- type OnMsgDropped
- type OnMsgDroppedWrapper
- type OnSessionCreated
- type OnSessionCreatedWrapper
- type OnSessionResumed
- type OnSessionResumedWrapper
- type OnSessionTerminated
- type OnSessionTerminatedWrapper
- type OnStop
- type OnStopWrapper
- type OnSubscribe
- type OnSubscribeWrapper
- type OnSubscribed
- type OnSubscribedWrapper
- type OnUnsubscribe
- type OnUnsubscribeWrapper
- type OnUnsubscribed
- type OnUnsubscribedWrapper
- type Options
- type PacketBytes
- type PacketCount
- type PacketStats
- type Plugable
- type PublishService
- type Server
- type ServerStats
- type SessionStats
- type SessionStatsManager
- type SessionTerminatedReason
- type StatsManager
- type WsServer
Examples ¶
Constants ¶
const ( Connecting = iota Connected Switiching Disconnected )
Client status
const ( DefaultMsgRouterLen = 4096 DefaultRegisterLen = 2048 DefaultUnRegisterLen = 2048 )
Default configuration
Variables ¶
var ( ErrInvalStatus = errors.New("invalid connection status") ErrConnectTimeOut = errors.New("connect time out") )
Error
var DefaultConfig = Config{ RetryInterval: 20 * time.Second, RetryCheckInterval: 20 * time.Second, SessionExpiryInterval: 0 * time.Second, SessionExpiryCheckInterval: 0 * time.Second, QueueQos0Messages: true, MaxInflight: 32, MaxAwaitRel: 100, MaxMsgQueue: 1000, DeliveryMode: OnlyOnce, MsgRouterLen: DefaultMsgRouterLen, RegisterLen: DefaultRegisterLen, UnregisterLen: DefaultUnRegisterLen, }
DefaultConfig default config used by NewServer()
var ( // ErrInvalWsMsgType [MQTT-6.0.0-1] ErrInvalWsMsgType = errors.New("invalid websocket message type") )
Functions ¶
func LoggerWithField ¶
LoggerWithField adds fields to a new logger. Plugins can use this method to add a plugin name field.
func NewMessage ¶
NewMessage creates a message for publish service.
Types ¶
type Client ¶
type Client interface { // OptionsReader returns ClientOptionsReader for reading options data. OptionsReader() ClientOptionsReader // IsConnected returns whether the client is connected. IsConnected() bool // ConnectedAt returns the connected time ConnectedAt() time.Time // DisconnectedAt return the disconnected time DisconnectedAt() time.Time // Connection returns the raw net.Conn Connection() net.Conn // Close closes the client connection. The returned channel will be closed after unregister process has been done Close() <-chan struct{} GetSessionStatsManager() SessionStatsManager }
Client represents an MQTT client.
type ClientOptionsReader ¶
type ClientOptionsReader interface { ClientID() string Username() string Password() string KeepAlive() uint16 CleanSession() bool WillFlag() bool WillRetain() bool WillQos() uint8 WillTopic() string WillPayload() []byte LocalAddr() net.Addr RemoteAddr() net.Addr }
ClientOptionsReader is mainly used in callback functions.
type ClientStats ¶
type ClientStats struct { ConnectedTotal uint64 DisconnectedTotal uint64 // ActiveCurrent is the number of current active session. ActiveCurrent uint64 // InactiveCurrent is the number of current inactive session. InactiveCurrent uint64 // ExpiredTotal is the number of expired session. ExpiredTotal uint64 }
ClientStats provides the statistics of client connections.
type Config ¶
type Config struct { RetryInterval time.Duration RetryCheckInterval time.Duration SessionExpiryInterval time.Duration SessionExpiryCheckInterval time.Duration QueueQos0Messages bool MaxInflight int MaxAwaitRel int MaxMsgQueue int DeliveryMode DeliveryMode MsgRouterLen int RegisterLen int UnregisterLen int }
type DeliveryMode ¶
type DeliveryMode int
const ( Overlap DeliveryMode = 0 OnlyOnce DeliveryMode = 1 )
type HookWrapper ¶
type HookWrapper struct { OnConnectWrapper OnConnectWrapper OnConnectedWrapper OnConnectedWrapper OnSessionCreatedWrapper OnSessionCreatedWrapper OnSessionResumedWrapper OnSessionResumedWrapper OnSessionTerminatedWrapper OnSessionTerminatedWrapper OnSubscribeWrapper OnSubscribeWrapper OnSubscribedWrapper OnSubscribedWrapper OnUnsubscribeWrapper OnUnsubscribeWrapper OnUnsubscribedWrapper OnUnsubscribedWrapper OnMsgArrivedWrapper OnMsgArrivedWrapper OnAckedWrapper OnAckedWrapper OnMsgDroppedWrapper OnMsgDroppedWrapper OnDeliverWrapper OnDeliverWrapper OnCloseWrapper OnCloseWrapper OnAcceptWrapper OnAcceptWrapper OnStopWrapper OnStopWrapper }
HookWrapper groups all hook wrapper functions.
type MessageStats ¶
type MessageStats struct { Qos0 struct { DroppedTotal uint64 ReceivedTotal uint64 SentTotal uint64 } Qos1 struct { DroppedTotal uint64 ReceivedTotal uint64 SentTotal uint64 } Qos2 struct { DroppedTotal uint64 ReceivedTotal uint64 SentTotal uint64 } QueuedCurrent uint64 }
MessageStats represents the statistics of PUBLISH packet, separated by QOS.
type OnAccept ¶
OnAccept 会在新连接建立的时候调用,只在TCP server中有效。如果返回false,则会直接关闭连接
OnAccept will be called after a new connection is established in the TCP server. If it returns false, the connection will be closed directly.
type OnAcceptWrapper ¶
type OnAcked ¶
OnAcked 当客户端对qos1或qos2返回确认的时候调用
OnAcked will be called when receiving the ack packet for a published qos1 or qos2 message.
type OnAckedWrapper ¶
type OnClose ¶
OnClose tcp连接关闭之后触发
OnClose will be called after the tcp connection of the client has been closed
type OnCloseWrapper ¶
type OnConnect ¶
OnConnect 当合法的connect报文到达的时候触发,返回connack中响应码
OnConnect will be called when a valid connect packet is received. It returns the code of the connack packet
type OnConnectWrapper ¶
type OnConnected ¶
OnConnected 当客户端成功连接后触发
OnConnected will be called when an MQTT client connects successfully.
type OnConnectedWrapper ¶
type OnConnectedWrapper func(OnConnected) OnConnected
type OnDeliverWrapper ¶
type OnMsgArrived ¶
OnMsgArrived 返回接收到的publish报文是否允许转发,返回false则该报文不会被继续转发
OnMsgArrived returns whether the publish packet will be delivered or not. If it returns false, the packet will not be delivered to any clients.
type OnMsgArrivedWrapper ¶
type OnMsgArrivedWrapper func(OnMsgArrived) OnMsgArrived
type OnMsgDroppedWrapper ¶
type OnMsgDroppedWrapper func(OnMsgDropped) OnMsgDropped
type OnSessionCreated ¶
OnSessionCreated 新建session时触发
OnSessionCreated will be called when a session is created.
type OnSessionCreatedWrapper ¶
type OnSessionCreatedWrapper func(OnSessionCreated) OnSessionCreated
type OnSessionResumed ¶
OnSessionResumed 恢复session时触发
OnSessionResumed will be called when a session is resumed.
type OnSessionResumedWrapper ¶
type OnSessionResumedWrapper func(OnSessionResumed) OnSessionResumed
type OnSessionTerminated ¶
type OnSessionTerminated func(ctx context.Context, client Client, reason SessionTerminatedReason)
OnSessionTerminated session 下线时触发
OnSessionTerminated will be called when a session is terminated.
type OnSessionTerminatedWrapper ¶
type OnSessionTerminatedWrapper func(OnSessionTerminated) OnSessionTerminated
type OnStopWrapper ¶
type OnSubscribe ¶
OnSubscribe 返回topic允许订阅的最高QoS等级
OnSubscribe returns the maximum available QoS for the topic:
0x00 - Success - Maximum QoS 0 0x01 - Success - Maximum QoS 1 0x02 - Success - Maximum QoS 2 0x80 - Failure
type OnSubscribeWrapper ¶
type OnSubscribeWrapper func(OnSubscribe) OnSubscribe
type OnSubscribed ¶
OnSubscribed will be called after the topic has been subscribed successfully.
type OnSubscribedWrapper ¶
type OnSubscribedWrapper func(OnSubscribed) OnSubscribed
type OnUnsubscribe ¶
OnUnsubscribe will be called when the topic is being unsubscribed
type OnUnsubscribeWrapper ¶
type OnUnsubscribeWrapper func(OnUnsubscribe) OnUnsubscribe
type OnUnsubscribed ¶
OnUnsubscribed will be called after the topic has been unsubscribed
type OnUnsubscribedWrapper ¶
type OnUnsubscribedWrapper func(OnUnsubscribed) OnUnsubscribed
type Options ¶
type Options func(srv *server)
func WithLogger ¶
func WithPlugin ¶
WithPlugin sets the plugin(s) of the server.
func WithTCPListener ¶
WithTCPListener sets the TCP listener(s) of the server. By default, the server listens on :1883.
func WithWebsocketServer ¶
WithWebsocketServer sets the websocket server(s) of the server.
type PacketBytes ¶
type PacketBytes struct { Connect uint64 Connack uint64 Disconnect uint64 Pingreq uint64 Pingresp uint64 Puback uint64 Pubcomp uint64 Publish uint64 Pubrec uint64 Pubrel uint64 Suback uint64 Subscribe uint64 Unsuback uint64 Unsubscribe uint64 }
PacketBytes represents the total bytes of each packet type that have been received or sent.
type PacketCount ¶
type PacketCount struct { Connect uint64 Connack uint64 Disconnect uint64 Pingreq uint64 Pingresp uint64 Puback uint64 Pubcomp uint64 Publish uint64 Pubrec uint64 Pubrel uint64 Suback uint64 Subscribe uint64 Unsuback uint64 Unsubscribe uint64 }
PacketCount represents the total number of packets of each type that have been received or sent.
type PacketStats ¶
type PacketStats struct { BytesReceived *PacketBytes ReceivedTotal *PacketCount BytesSent *PacketBytes SentTotal *PacketCount }
PacketStats represents the statistics of MQTT Packet.
type Plugable ¶
type Plugable interface { // Load will be called in server.Run(). If return error, the server will panic. Load(service Server) error // Unload will be called when the server is shutdown, the return error is only for logging Unload() error // HookWrapper returns all hook wrappers that used by the plugin. // Return a empty wrapper if the plugin does not need any hooks HookWrapper() HookWrapper // Name return the plugin name Name() string }
Plugable is the interface that every plugin needs to implement.
type PublishService ¶
type PublishService interface { // Publish publish a message to broker. // Calling this method will not trigger OnMsgArrived hook. Publish(message packets.Message) // PublishToClient publish a message to a specific client. // If match sets to true, the message will send to the client // only if the client is subscribed to a topic that matches the message. // If match sets to false, the message will send to the client directly even // there are no matched subscriptions. // Calling this method will not trigger OnMsgArrived hook. PublishToClient(clientID string, message packets.Message, match bool) }
PublishService provides the ability to publish messages to the broker.
type Server ¶
type Server interface { // SubscriptionStore returns the subscription.Store. SubscriptionStore() subscription.Store // RetainedStore returns the retained.Store. RetainedStore() retained.Store // PublishService returns the PublishService PublishService() PublishService // Client return the client specified by clientID. Client(clientID string) Client // GetConfig returns the config of the server GetConfig() Config // GetStatsManager returns StatsManager GetStatsManager() StatsManager }
Server interface represents a mqtt server instance.
type ServerStats ¶
type ServerStats struct { PacketStats *PacketStats ClientStats *ClientStats MessageStats *MessageStats SubscriptionStats *subscription.Stats }
ServerStats is the collection of global statistics.
type SessionStats ¶
type SessionStats struct { // InflightCurrent, the current length of the inflight queue. InflightCurrent uint64 // AwaitRelCurrent, the current length of the awaitRel queue. AwaitRelCurrent uint64 MessageStats }
SessionStats is the collection of statistics for each session.
type SessionStatsManager ¶
type SessionStatsManager interface { // GetStats return the session statistics GetStats() *SessionStats // contains filtered or unexported methods }
SessionStatsManager interface provides the ability to access the statistics of the session
type SessionTerminatedReason ¶
type SessionTerminatedReason byte
const ( NormalTermination SessionTerminatedReason = iota ConflictTermination ExpiredTermination )
type StatsManager ¶
type StatsManager interface { // GetStats return the server statistics GetStats() *ServerStats // contains filtered or unexported methods }
StatsManager interface provides the ability to access the statistics of the server