Documentation ¶
Overview ¶
Package gmqtt provides an MQTT v3.1.1 server library.
Example ¶
see /examples for more details.
ln, err := net.Listen("tcp", ":1883") if err != nil { fmt.Println(err.Error()) return } ws := &WsServer{ Server: &http.Server{Addr: ":8080"}, Path: "/", } l, _ := zap.NewProduction() srv := NewServer( WithTCPListener(ln), WithWebsocketServer(ws), // add config WithConfig(DefaultConfig), // add plugins // WithPlugin(prometheus.New(&http.Server{Addr: ":8082"}, "/metrics")), // add Hook WithHook(Hooks{ OnConnect: func(ctx context.Context, client Client) (code uint8) { return packets.CodeAccepted }, OnSubscribe: func(ctx context.Context, client Client, topic packets.Topic) (qos uint8) { fmt.Println("register onSubscribe callback") return packets.QOS_1 }, }), // add logger WithLogger(l), ) srv.Run() fmt.Println("started...") signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) <-signalCh srv.Stop(context.Background()) fmt.Println("stopped")
Output:
Index ¶
- Constants
- Variables
- func LoggerWithField(fields ...zap.Field) *zap.Logger
- func NewMessage(topic string, payload []byte, qos uint8, opts ...msgOptions) packets.Message
- func NewServer(opts ...Options) *server
- func Retained(retained bool) msgOptions
- type Client
- type ClientOptionsReader
- type ClientStats
- type Config
- type DeliveryMode
- type HookWrapper
- type Hooks
- type MessageStats
- type OnAccept
- type OnAcceptWrapper
- type OnAcked
- type OnAckedWrapper
- type OnClose
- type OnCloseWrapper
- type OnConnect
- type OnConnectWrapper
- type OnConnected
- type OnConnectedWrapper
- type OnDeliver
- type OnDeliverWrapper
- type OnMsgArrived
- type OnMsgArrivedWrapper
- type OnMsgDropped
- type OnMsgDroppedWrapper
- type OnSessionCreated
- type OnSessionCreatedWrapper
- type OnSessionResumed
- type OnSessionResumedWrapper
- type OnSessionTerminated
- type OnSessionTerminatedWrapper
- type OnStop
- type OnStopWrapper
- type OnSubscribe
- type OnSubscribeWrapper
- type OnSubscribed
- type OnSubscribedWrapper
- type OnUnsubscribed
- type OnUnsubscribedWrapper
- type Options
- type PacketBytes
- type PacketCount
- type PacketStats
- type Plugable
- type PublishService
- type Server
- type ServerStats
- type SessionStats
- type SessionStatsManager
- type SessionTerminatedReason
- type StatsManager
- type WsServer
Examples ¶
Constants ¶
const ( Connecting = iota Connected Switiching Disconnected )
Client status
const ( DefaultMsgRouterLen = 4096 DefaultRegisterLen = 2048 DefaultUnRegisterLen = 2048 )
Default configration
Variables ¶
var ( ErrInvalStatus = errors.New("invalid connection status") ErrConnectTimeOut = errors.New("connect time out") )
Error
var DefaultConfig = Config{ RetryInterval: 20 * time.Second, RetryCheckInterval: 20 * time.Second, SessionExpiryInterval: 0 * time.Second, SessionExpiryCheckInterval: 0 * time.Second, QueueQos0Messages: true, MaxInflight: 32, MaxAwaitRel: 100, MaxMsgQueue: 1000, DeliveryMode: OnlyOnce, MsgRouterLen: DefaultMsgRouterLen, RegisterLen: DefaultRegisterLen, UnregisterLen: DefaultUnRegisterLen, }
DefaultConfig default config used by NewServer()
var ( // ErrInvalWsMsgType [MQTT-6.0.0-1] ErrInvalWsMsgType = errors.New("invalid websocket message type") )
Functions ¶
func LoggerWithField ¶
LoggerWithField add fields to a new logger. Plugins can use this method to add plugin name field.
func NewMessage ¶
NewMessage creates a message for publish service.
Types ¶
type Client ¶
type Client interface { // OptionsReader returns ClientOptionsReader for reading options data. OptionsReader() ClientOptionsReader // IsConnected returns whether the client is connected. IsConnected() bool // ConnectedAt returns the connected time ConnectedAt() time.Time // DisconnectedAt return the disconnected time DisconnectedAt() time.Time // Close closes the client connection. The returned channel will be closed after unregister process has been done Close() <-chan struct{} GetSessionStatsManager() SessionStatsManager }
Client represent
type ClientOptionsReader ¶
type ClientOptionsReader interface { ClientID() string Username() string Password() string KeepAlive() uint16 CleanSession() bool WillFlag() bool WillRetain() bool WillQos() uint8 WillTopic() string WillPayload() []byte LocalAddr() net.Addr RemoteAddr() net.Addr }
ClientOptionsReader is mainly used in callback functions.
type ClientStats ¶
type ClientStats struct { ConnectedTotal uint64 DisconnectedTotal uint64 // ActiveCurrent is the number of current active session. ActiveCurrent uint64 // InactiveCurrent is the number of current inactive session. InactiveCurrent uint64 // ExpiredTotal is the number of expired session. ExpiredTotal uint64 }
ClientStats provides the statistics of client connections.
type Config ¶
type Config struct { RetryInterval time.Duration RetryCheckInterval time.Duration SessionExpiryInterval time.Duration SessionExpiryCheckInterval time.Duration QueueQos0Messages bool MaxInflight int MaxAwaitRel int MaxMsgQueue int DeliveryMode DeliveryMode MsgRouterLen int RegisterLen int UnregisterLen int }
type DeliveryMode ¶
type DeliveryMode int
const ( Overlap DeliveryMode = 0 OnlyOnce DeliveryMode = 1 )
type HookWrapper ¶
type HookWrapper struct { OnConnectWrapper OnConnectWrapper OnConnectedWrapper OnConnectedWrapper OnSessionCreatedWrapper OnSessionCreatedWrapper OnSessionResumedWrapper OnSessionResumedWrapper OnSessionTerminatedWrapper OnSessionTerminatedWrapper OnSubscribeWrapper OnSubscribeWrapper OnSubscribedWrapper OnSubscribedWrapper OnUnsubscribedWrapper OnUnsubscribedWrapper OnMsgArrivedWrapper OnMsgArrivedWrapper OnAckedWrapper OnAckedWrapper OnMsgDroppedWrapper OnMsgDroppedWrapper OnDeliverWrapper OnDeliverWrapper OnCloseWrapper OnCloseWrapper OnAcceptWrapper OnAcceptWrapper OnStopWrapper OnStopWrapper }
HookWrapper groups all hook wrappers function
type MessageStats ¶
type MessageStats struct { Qos0 struct { DroppedTotal uint64 ReceivedTotal uint64 SentTotal uint64 } Qos1 struct { DroppedTotal uint64 ReceivedTotal uint64 SentTotal uint64 } Qos2 struct { DroppedTotal uint64 ReceivedTotal uint64 SentTotal uint64 } QueuedCurrent uint64 }
MessageStats represents the statistics of PUBLISH packet, separated by QOS.
type OnAccept ¶
OnAccept 会在新连接建立的时候调用,只在TCP server中有效。如果返回false,则会直接关闭连接
OnAccept will be called after a new connection established in TCP server. If returns false, the connection will be close directly.
type OnAcceptWrapper ¶
type OnAcked ¶
OnAcked 当客户端对qos1或qos2返回确认的时候调用
OnAcked will be called when receiving the ack packet for a published qos1 or qos2 message.
type OnAckedWrapper ¶
type OnClose ¶
OnClose tcp连接关闭之后触发
OnClose will be called after the tcp connection of the client has been closed
type OnCloseWrapper ¶
type OnConnect ¶
OnConnect 当合法的connect报文到达的时候触发,返回connack中响应码
OnConnect will be called when a valid connect packet is received. It returns the code of the connack packet
type OnConnectWrapper ¶
type OnConnected ¶
OnConnected 当客户端成功连接后触发
OnConnected will be called when a mqtt client connect successfully.
type OnConnectedWrapper ¶
type OnConnectedWrapper func(OnConnected) OnConnected
type OnDeliverWrapper ¶
type OnMsgArrived ¶
OnMsgArrived 返回接收到的publish报文是否允许转发,返回false则该报文不会被继续转发
OnMsgArrived returns whether the publish packet will be delivered or not. If returns false, the packet will not be delivered to any clients.
type OnMsgArrivedWrapper ¶
type OnMsgArrivedWrapper func(OnMsgArrived) OnMsgArrived
type OnMsgDroppedWrapper ¶
type OnMsgDroppedWrapper func(OnMsgDropped) OnMsgDropped
type OnSessionCreated ¶
OnSessionCreated 新建session时触发
OnSessionCreated will be called when session created.
type OnSessionCreatedWrapper ¶
type OnSessionCreatedWrapper func(OnSessionCreated) OnSessionCreated
type OnSessionResumed ¶
OnSessionResumed 恢复session时触发
OnSessionResumed will be called when session resumed.
type OnSessionResumedWrapper ¶
type OnSessionResumedWrapper func(OnSessionResumed) OnSessionResumed
type OnSessionTerminated ¶
type OnSessionTerminated func(ctx context.Context, client Client, reason SessionTerminatedReason)
OnSessionTerminated session 下线时触发
OnSessionTerminated will be called when session terminated.
type OnSessionTerminatedWrapper ¶
type OnSessionTerminatedWrapper func(OnSessionTerminated) OnSessionTerminated
type OnStopWrapper ¶
type OnSubscribe ¶
OnSubscribe 返回topic允许订阅的最高QoS等级
OnSubscribe returns the maximum available QoS for the topic:
0x00 - Success - Maximum QoS 0 0x01 - Success - Maximum QoS 1 0x02 - Success - Maximum QoS 2 0x80 - Failure
type OnSubscribeWrapper ¶
type OnSubscribeWrapper func(OnSubscribe) OnSubscribe
type OnSubscribed ¶
OnSubscribed will be called after the topic subscribe successfully
type OnSubscribedWrapper ¶
type OnSubscribedWrapper func(OnSubscribed) OnSubscribed
type OnUnsubscribed ¶
OnUnsubscribed will be called after the topic has been unsubscribed
type OnUnsubscribedWrapper ¶
type OnUnsubscribedWrapper func(OnUnsubscribed) OnUnsubscribed
type Options ¶
type Options func(srv *server)
func WithLogger ¶
func WithPlugin ¶
WithPlugin set plugin(s) of the server.
func WithTCPListener ¶
WithTCPListener set tcp listener(s) of the server. Default listen on :1883.
func WithWebsocketServer ¶
WithWebsocketServer set websocket server(s) of the server.
type PacketBytes ¶
type PacketBytes struct { Connect uint64 Connack uint64 Disconnect uint64 Pingreq uint64 Pingresp uint64 Puback uint64 Pubcomp uint64 Publish uint64 Pubrec uint64 Pubrel uint64 Suback uint64 Subscribe uint64 Unsuback uint64 Unsubscribe uint64 }
PacketBytes represents total bytes of each packet type have been received or sent.
type PacketCount ¶
type PacketCount struct { Connect uint64 Connack uint64 Disconnect uint64 Pingreq uint64 Pingresp uint64 Puback uint64 Pubcomp uint64 Publish uint64 Pubrec uint64 Pubrel uint64 Suback uint64 Subscribe uint64 Unsuback uint64 Unsubscribe uint64 }
PacketCount represents total number of each packet type have been received or sent.
type PacketStats ¶
type PacketStats struct { BytesReceived *PacketBytes ReceivedTotal *PacketCount BytesSent *PacketBytes SentTotal *PacketCount }
PacketStats represents the statistics of MQTT Packet.
type Plugable ¶
type Plugable interface { // Load will be called in server.Run(). If return error, the server will panic. Load(service Server) error // Unload will be called when the server is shutdown, the return error is only for logging Unload() error // HookWrapper returns all hook wrappers that used by the plugin. // Return a empty wrapper if the plugin does not need any hooks HookWrapper() HookWrapper // Name return the plugin name Name() string }
Plugable is the interface need to be implemented for every plugins.
type PublishService ¶
type PublishService interface { // Publish publish a message to broker. // Calling this method will not trigger OnMsgArrived hook. Publish(message packets.Message) // PublishToClient publish a message to a specific client. // If match sets to true, the message will send to the client // only if the client is subscribed to a topic that matches the message. // If match sets to false, the message will send to the client directly even // there are no matched subscriptions. // Calling this method will not trigger OnMsgArrived hook. PublishToClient(clientID string, message packets.Message, match bool) }
PublishService provides the ability to publish messages to the broker.
type Server ¶
type Server interface { // SubscriptionStore returns the subscription.Store. SubscriptionStore() subscription.Store // RetainedStore returns the retained.Store. RetainedStore() retained.Store // PublishService returns the PublishService PublishService() PublishService // Client return the client specified by clientID. Client(clientID string) Client // GetConfig returns the config of the server GetConfig() Config // GetStatsManager returns StatsManager GetStatsManager() StatsManager }
Server interface represents a mqtt server instance.
type ServerStats ¶
type ServerStats struct { PacketStats *PacketStats ClientStats *ClientStats MessageStats *MessageStats SubscriptionStats *subscription.Stats }
ServerStats is the collection of global statistics.
type SessionStats ¶
type SessionStats struct { // InflightCurrent, the current length of the inflight queue. InflightCurrent uint64 // AwaitRelCurrent, the current length of the awaitRel queue. AwaitRelCurrent uint64 MessageStats }
SessionStats the collection of statistics of each session.
type SessionStatsManager ¶
type SessionStatsManager interface { // GetStats return the session statistics GetStats() *SessionStats // contains filtered or unexported methods }
SessionStatsManager interface provides the ability to access the statistics of the session
type SessionTerminatedReason ¶
type SessionTerminatedReason byte
const ( NormalTermination SessionTerminatedReason = iota ConflictTermination ExpiredTermination )
type StatsManager ¶
type StatsManager interface { // GetStats return the server statistics GetStats() *ServerStats // contains filtered or unexported methods }
StatsManager interface provides the ability to access the statistics of the server