Documentation ¶
Overview ¶
Package service provides the MQTT Server and Client services in a library form. See Server and Client examples below for more detailed usage.
Index ¶
- Constants
- Variables
- type Client
- func (cln *Client) Connect(uri string, msg *message.ConnectMessage) (err error)
- func (cln *Client) ConnectTLS(uri string, msg *message.ConnectMessage, cfg *tls.Config) (err error)
- func (cln *Client) Disconnect()
- func (cln *Client) Ping(onComplete OnCompleteFunc) error
- func (cln *Client) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error
- func (cln *Client) Subscribe(msg *message.SubscribeMessage, onComplete OnCompleteFunc, ...) error
- func (cln *Client) Unsubscribe(msg *message.UnsubscribeMessage, onComplete OnCompleteFunc) error
- type OnCompleteFunc
- type OnPublishFunc
- type Server
- func (svr *Server) Close() error
- func (svr *Server) ListenAndServe(uri string) error
- func (svr *Server) ListenAndServeTLS(uri string, cfg *tls.Config) error
- func (svr *Server) Publish(msg *message.PublishMessage) error
- func (svr *Server) Subscribe(topic string, qos byte, onPublish *OnPublishFunc) error
- func (svr *Server) Unsubscribe(topic string, onPublish *OnPublishFunc) error
- type WebsocketHandler
Examples ¶
Constants ¶
const ( DefaultKeepAlive = 300 DefaultConnectTimeout = 2 DefaultAckTimeout = 20 DefaultTimeoutRetries = 3 DefaultSessionsProvider = "mem" DefaultAuthenticator = "mockSuccess" DefaultTopicsProvider = "mem" )
Default server configuration.
Variables ¶
var ( ErrInvalidConnectionType error = errors.New("service: Invalid connection type") ErrInvalidSubscriber error = errors.New("service: Invalid subscriber") ErrBufferNotReady error = errors.New("service: buffer is not ready") ErrBufferInsufficientData error = errors.New("service: buffer has insufficient data") )
List of errors.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct { // The number of seconds to keep the connection live if there's no data. // If not set then default to 5 mins. KeepAlive int // The number of seconds to wait for the CONNACK message before disconnecting. // If not set then default to 2 seconds. ConnectTimeout int // The number of seconds to wait for any ACK messages before failing. // If not set then default to 20 seconds. AckTimeout int // The number of times to retry sending a packet if ACK is not received. // If no set then default to 3 retries. TimeoutRetries int // Size of the in and out buffers. This affects the maximum payload size. If // not set, the defaultBufferSize (1024*256) is used. BufferSize int64 // contains filtered or unexported fields }
Client is a library implementation of the MQTT client that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.
Example ¶
// Instantiates a new Client c := &Client{} // Creates a new MQTT CONNECT message and sets the proper parameters msg := message.NewConnectMessage() msg.SetWillQos(1) msg.SetVersion(4) msg.SetCleanSession(true) msg.SetClientID([]byte("surgemq")) msg.SetKeepAlive(10) msg.SetWillTopic([]byte("will")) msg.SetWillMessage([]byte("send me home")) msg.SetUsername([]byte("surgemq")) msg.SetPassword([]byte("verysecret")) // Connects to the remote server at 127.0.0.1 port 1883 c.Connect("tcp://127.0.0.1:1883", msg) // Creates a new SUBSCRIBE message to subscribe to topic "abc" submsg := message.NewSubscribeMessage() submsg.AddTopic([]byte("abc"), 0) // Subscribes to the topic by sending the message. The first nil in the function // call is a OnCompleteFunc that should handle the SUBACK message from the server. // Nil means we are ignoring the SUBACK messages. The second nil should be a // OnPublishFunc that handles any messages send to the client because of this // subscription. Nil means we are ignoring any PUBLISH messages for this topic. c.Subscribe(submsg, nil, nil) // Creates a new PUBLISH message with the appropriate contents for publishing pubmsg := message.NewPublishMessage() pubmsg.SetTopic([]byte("abc")) pubmsg.SetPayload(make([]byte, 1024)) pubmsg.SetQoS(0) // Publishes to the server by sending the message c.Publish(pubmsg, nil) // Disconnects from the server c.Disconnect()
Output:
func (*Client) Connect ¶
func (cln *Client) Connect(uri string, msg *message.ConnectMessage) (err error)
Connect is for MQTT clients to open a connection to a remote server. It needs to know the URI, e.g., "tcp://127.0.0.1:1883", so it knows where to connect to. It also needs to be supplied with the MQTT CONNECT message.
func (*Client) ConnectTLS ¶
ConnectTLS is for MQTT clients to open a TLS connection to a remote server.
func (*Client) Disconnect ¶
func (cln *Client) Disconnect()
Disconnect sends a single DISCONNECT message to the server. The client immediately terminates after the sending of the DISCONNECT message.
func (*Client) Ping ¶
func (cln *Client) Ping(onComplete OnCompleteFunc) error
Ping sends a single PINGREQ message to the server. PINGREQ/PINGRESP messages are mainly used by the client to keep a heartbeat to the server so the connection won't be dropped.
func (*Client) Publish ¶
func (cln *Client) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error
Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.
func (*Client) Subscribe ¶
func (cln *Client) Subscribe(msg *message.SubscribeMessage, onComplete OnCompleteFunc, onPublish OnPublishFunc) error
Subscribe sends a single SUBSCRIBE message to the server. The SUBSCRIBE message can contain multiple topics that the client wants to subscribe to. On completion, which is when the client receives a SUBACK messsage back from the server, the supplied onComplete funciton is called.
When messages are sent to the client from the server that matches the topics the client subscribed to, the onPublish function is called to handle those messages. So in effect, the client can supply different onPublish functions for different topics.
func (*Client) Unsubscribe ¶
func (cln *Client) Unsubscribe(msg *message.UnsubscribeMessage, onComplete OnCompleteFunc) error
Unsubscribe sends a single UNSUBSCRIBE message to the server. The UNSUBSCRIBE message can contain multiple topics that the client wants to unsubscribe. On completion, which is when the client receives a UNSUBACK message from the server, the supplied onComplete function is called. The client will no longer handle messages from the server for those unsubscribed topics.
type OnCompleteFunc ¶
OnCompleteFunc is called, when a publish is completed.
type OnPublishFunc ¶
type OnPublishFunc func(msg *message.PublishMessage) error
OnPublishFunc is called, when a publish message is received.
type Server ¶
type Server struct { // The number of seconds to keep the connection live if there's no data. // If not set then default to 5 mins. KeepAlive int // The number of seconds to wait for the CONNECT message before disconnecting. // If not set then default to 2 seconds. ConnectTimeout int // The number of seconds to wait for any ACK messages before failing. // If not set then default to 20 seconds. AckTimeout int // The number of times to retry sending a packet if ACK is not received. // If no set then default to 3 retries. TimeoutRetries int // Size of the in and out buffers. This affects the maximum payload size. If // not set, the defaultBufferSize (1024*256) is used. BufferSize int64 // Authenticator is the authenticator used to check username and password sent // in the CONNECT message. If not set then default to "mockSuccess". Authenticator string // SessionsProvider is the session store that keeps all the Session objects. // This is the store to check if CleanSession is set to 0 in the CONNECT message. // If not set then default to "mem". SessionsProvider string // TopicsProvider is the topic store that keeps all the subscription topics. // If not set then default to "mem". TopicsProvider string // contains filtered or unexported fields }
Server is a library implementation of the MQTT server that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.
Example ¶
// Create a new server svr := &Server{ KeepAlive: 300, // seconds ConnectTimeout: 2, // seconds SessionsProvider: "mem", // keeps sessions in memory Authenticator: "mockSuccess", // always succeed TopicsProvider: "mem", // keeps topic subscriptions in memory } // Listen and serve connections at localhost:1883 svr.ListenAndServe("tcp://:1883")
Output:
func (*Server) Close ¶
Close terminates the server by shutting down all the client connections and closing the listener. It will, as best it can, clean up after itself.
func (*Server) ListenAndServe ¶
ListenAndServe listents to connections on the URI requested, and handles any incoming MQTT client sessions. It should not return until Close() is called or if there's some critical error that stops the server from running. The URI supplied should be of the form "protocol://host:port" that can be parsed by url.Parse(). For example, an URI could be "tcp://0.0.0.0:1883".
func (*Server) ListenAndServeTLS ¶
ListenAndServeTLS listents to connections on the URI requested, and handles any incoming MQTT client sessions. It should not return until Close() is called or if there's some critical error that stops the server from running. The URI supplied should be of the form "protocol://host:port" that can be parsed by url.Parse(). For example, an URI could be "tcp://0.0.0.0:8883".
func (*Server) Publish ¶
func (svr *Server) Publish(msg *message.PublishMessage) error
Publish sends a single MQTT PUBLISH message to the server. msg is modified.
func (*Server) Subscribe ¶
func (svr *Server) Subscribe(topic string, qos byte, onPublish *OnPublishFunc) error
Subscribe registers a callback for a topic.
func (*Server) Unsubscribe ¶
func (svr *Server) Unsubscribe(topic string, onPublish *OnPublishFunc) error
Unsubscribe deregisters a callback for a topic.
type WebsocketHandler ¶
type WebsocketHandler struct { // target address, e.g. ":1883" Addr string }
WebsocketHandler provides an http.Handler that forwards MQTT websocket requests to a plain TCP socket.
func (*WebsocketHandler) ServeHTTP ¶
func (wh *WebsocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler.