service

package
v1.0.0

This package is not in the latest version of its module.

Published: Apr 20, 2024 License: Apache-2.0 Imports: 19 Imported by: 2

Documentation

Overview

Package service provides the MQTT Server and Client services in library form. See the Server and Client examples below for more detailed usage.

Constants

const (
	DefaultKeepAlive        = 300
	DefaultConnectTimeout   = 2
	DefaultAckTimeout       = 20
	DefaultTimeoutRetries   = 3
	DefaultSessionsProvider = "mem"
	DefaultAuthenticator    = "mockSuccess"
	DefaultTopicsProvider   = "mem"
)

Default server configuration.
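
The struct comments further down say that a field left unset falls back to one of these defaults. A minimal sketch of that fallback rule follows. It uses local copies of the constants so it compiles on its own, and the helper orDefault is hypothetical, not part of the package.

```go
package main

import "fmt"

// Local copies of the documented defaults, so the sketch is self-contained.
const (
	defaultKeepAlive      = 300 // seconds
	defaultConnectTimeout = 2   // seconds
	defaultAckTimeout     = 20  // seconds
	defaultTimeoutRetries = 3
)

// orDefault is a hypothetical helper: it returns def when v is unset (zero).
func orDefault(v, def int) int {
	if v == 0 {
		return def
	}
	return v
}

func main() {
	keepAlive := 0   // field left unset
	ackTimeout := 45 // field set explicitly

	fmt.Println("keep alive:", orDefault(keepAlive, defaultKeepAlive))
	fmt.Println("ack timeout:", orDefault(ackTimeout, defaultAckTimeout))
}
```

Under this rule a zero value cannot be told apart from "not set", so an explicit timeout of 0 would not be expressible.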

Variables

var (
	ErrInvalidConnectionType  error = errors.New("service: Invalid connection type")
	ErrInvalidSubscriber      error = errors.New("service: Invalid subscriber")
	ErrBufferNotReady         error = errors.New("service: buffer is not ready")
	ErrBufferInsufficientData error = errors.New("service: buffer has insufficient data")
)

List of errors.
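
Because these are exported sentinel values, callers can compare against them. The sketch below shows one way to do that with errors.Is, which also matches wrapped errors. The variable errBufferNotReady is a local stand-in for ErrBufferNotReady, and whether the package ever wraps its errors is not stated here, so the wrap helper is purely illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferNotReady is a local stand-in for service.ErrBufferNotReady.
var errBufferNotReady = errors.New("service: buffer is not ready")

// wrap adds context to an error while keeping it matchable by errors.Is.
func wrap(err error) error {
	return fmt.Errorf("publish failed: %w", err)
}

// describe classifies an error by comparing it against the sentinel.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errBufferNotReady):
		return "buffer not ready"
	default:
		return "other error"
	}
}

func main() {
	fmt.Println(describe(nil))
	fmt.Println(describe(errBufferNotReady))
	fmt.Println(describe(wrap(errBufferNotReady)))
}
```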

Functions

This section is empty.

Types

type Client

type Client struct {
	// The number of seconds to keep the connection live if there's no data.
	// If not set then default to 5 mins.
	KeepAlive int

	// The number of seconds to wait for the CONNACK message before disconnecting.
	// If not set then default to 2 seconds.
	ConnectTimeout int

	// The number of seconds to wait for any ACK messages before failing.
	// If not set then default to 20 seconds.
	AckTimeout int

	// The number of times to retry sending a packet if ACK is not received.
	// If not set then default to 3 retries.
	TimeoutRetries int

	// Size of the in and out buffers. This affects the maximum payload size. If
	// not set, the defaultBufferSize (1024*256) is used.
	BufferSize int64
	// contains filtered or unexported fields
}

Client is a library implementation of the MQTT client that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.

Example
// Instantiates a new Client
c := &Client{}

// Creates a new MQTT CONNECT message and sets the proper parameters
msg := message.NewConnectMessage()
msg.SetWillQos(1)
msg.SetVersion(4)
msg.SetCleanSession(true)
msg.SetClientID([]byte("surgemq"))
msg.SetKeepAlive(10)
msg.SetWillTopic([]byte("will"))
msg.SetWillMessage([]byte("send me home"))
msg.SetUsername([]byte("surgemq"))
msg.SetPassword([]byte("verysecret"))

// Connects to the remote server at 127.0.0.1 port 1883
c.Connect("tcp://127.0.0.1:1883", msg)

// Creates a new SUBSCRIBE message to subscribe to topic "abc"
submsg := message.NewSubscribeMessage()
submsg.AddTopic([]byte("abc"), 0)

// Subscribes to the topic by sending the message. The first nil in the function
// call is an OnCompleteFunc that should handle the SUBACK message from the server.
// Nil means we are ignoring the SUBACK messages. The second nil should be an
// OnPublishFunc that handles any messages sent to the client because of this
// subscription. Nil means we are ignoring any PUBLISH messages for this topic.
c.Subscribe(submsg, nil, nil)

// Creates a new PUBLISH message with the appropriate contents for publishing
pubmsg := message.NewPublishMessage()
pubmsg.SetTopic([]byte("abc"))
pubmsg.SetPayload(make([]byte, 1024))
pubmsg.SetQoS(0)

// Publishes to the server by sending the message
c.Publish(pubmsg, nil)

// Disconnects from the server
c.Disconnect()

func (*Client) Connect

func (cln *Client) Connect(uri string, msg *message.ConnectMessage) (err error)

Connect opens a connection from an MQTT client to a remote server. It takes the server URI, e.g., "tcp://127.0.0.1:1883", and the MQTT CONNECT message to send.

func (*Client) ConnectTLS

func (cln *Client) ConnectTLS(uri string, msg *message.ConnectMessage, cfg *tls.Config) (err error)

ConnectTLS is for MQTT clients to open a TLS connection to a remote server.
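
The cfg argument is a standard crypto/tls configuration. A minimal sketch of building one for a client is shown below. The helper newClientTLSConfig and the host name broker.example.com are assumptions made for illustration, and the sketch stops before calling ConnectTLS.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
)

// newClientTLSConfig is a hypothetical helper. It builds a *tls.Config that
// verifies the broker certificate against roots and expects serverName.
func newClientTLSConfig(serverName string, roots *x509.CertPool) *tls.Config {
	return &tls.Config{
		ServerName: serverName,
		RootCAs:    roots, // nil means the host's default root set is used
		MinVersion: tls.VersionTLS12,
	}
}

func main() {
	cfg := newClientTLSConfig("broker.example.com", nil)
	fmt.Println(cfg.ServerName, cfg.MinVersion == tls.VersionTLS12)
	// cfg would then be passed as the third argument of ConnectTLS.
}
```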

func (*Client) Disconnect

func (cln *Client) Disconnect()

Disconnect sends a single DISCONNECT message to the server. The client terminates immediately after sending it.

func (*Client) Ping

func (cln *Client) Ping(onComplete OnCompleteFunc) error

Ping sends a single PINGREQ message to the server. PINGREQ/PINGRESP messages are mainly used by the client to keep a heartbeat to the server so the connection won't be dropped.

func (*Client) Publish

func (cln *Client) Publish(msg *message.PublishMessage, onComplete OnCompleteFunc) error

Publish sends a single MQTT PUBLISH message to the server. On completion, the supplied OnCompleteFunc is called. For QOS 0 messages, onComplete is called immediately after the message is sent to the outgoing buffer. For QOS 1 messages, onComplete is called when PUBACK is received. For QOS 2 messages, onComplete is called after the PUBCOMP message is received.
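
The three cases above can be summarized as a small lookup. The function completionTrigger is hypothetical and only restates the rule from the paragraph; it does not call into the package.

```go
package main

import "fmt"

// completionTrigger is a hypothetical helper that names the event after
// which onComplete is called for a given QoS level, as described above.
func completionTrigger(qos byte) (string, error) {
	switch qos {
	case 0:
		return "message written to outgoing buffer", nil
	case 1:
		return "PUBACK received", nil
	case 2:
		return "PUBCOMP received", nil
	default:
		return "", fmt.Errorf("invalid QoS level %d", qos)
	}
}

func main() {
	for qos := byte(0); qos <= 2; qos++ {
		trigger, _ := completionTrigger(qos)
		fmt.Printf("QoS %d: %s\n", qos, trigger)
	}
}
```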

func (*Client) Subscribe

func (cln *Client) Subscribe(msg *message.SubscribeMessage, onComplete OnCompleteFunc, onPublish OnPublishFunc) error

Subscribe sends a single SUBSCRIBE message to the server. The SUBSCRIBE message can contain multiple topics that the client wants to subscribe to. On completion, which is when the client receives a SUBACK message back from the server, the supplied onComplete function is called.

When the server sends the client messages that match the subscribed topics, the onPublish function is called to handle them. In effect, the client can supply different onPublish functions for different topics.
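
To illustrate the idea of one handler per topic, here is a minimal sketch of a dispatch table. The types publish, onPublish, and router are local stand-ins, not the package's types, and the lookup is by exact topic name only; MQTT wildcard matching is left out.

```go
package main

import "fmt"

// publish is a local stand-in for *message.PublishMessage.
type publish struct {
	topic   string
	payload []byte
}

// onPublish mirrors the shape of OnPublishFunc for the stand-in type.
type onPublish func(msg *publish) error

// router keeps one handler per exact topic name.
type router map[string]onPublish

// dispatch calls the handler registered for the message's topic.
func (r router) dispatch(msg *publish) error {
	h, ok := r[msg.topic]
	if !ok {
		return fmt.Errorf("no handler for topic %q", msg.topic)
	}
	return h(msg)
}

func main() {
	r := router{
		"sensors/temp": func(msg *publish) error {
			fmt.Printf("temperature: %s\n", msg.payload)
			return nil
		},
		"alerts": func(msg *publish) error {
			fmt.Printf("ALERT: %s\n", msg.payload)
			return nil
		},
	}
	_ = r.dispatch(&publish{topic: "alerts", payload: []byte("door open")})
	fmt.Println(r.dispatch(&publish{topic: "unknown"}))
}
```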

func (*Client) Unsubscribe

func (cln *Client) Unsubscribe(msg *message.UnsubscribeMessage, onComplete OnCompleteFunc) error

Unsubscribe sends a single UNSUBSCRIBE message to the server. The UNSUBSCRIBE message can contain multiple topics that the client wants to unsubscribe from. On completion, which is when the client receives an UNSUBACK message from the server, the supplied onComplete function is called. The client will no longer handle messages from the server for those unsubscribed topics.

type OnCompleteFunc

type OnCompleteFunc func(msg, ack message.Message, err error) error

OnCompleteFunc is called when a request such as Publish, Subscribe, Unsubscribe, or Ping completes.
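
A completion callback can be bridged to a channel when the caller wants to block until the acknowledgement arrives. The sketch below is one way to do that. The types packet and onComplete are local stand-ins for message.Message and OnCompleteFunc, and newWaiter and wait are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// packet is a local stand-in for message.Message.
type packet interface{}

// onComplete mirrors the shape of OnCompleteFunc for the stand-in type.
type onComplete func(msg, ack packet, err error) error

// newWaiter returns a callback plus a channel that receives the callback's
// err argument once. Extra invocations are dropped instead of blocking.
func newWaiter() (onComplete, <-chan error) {
	done := make(chan error, 1)
	cb := func(msg, ack packet, err error) error {
		select {
		case done <- err:
		default:
		}
		return nil
	}
	return cb, done
}

// wait blocks until the callback fires or the timeout expires.
func wait(done <-chan error, timeout time.Duration) error {
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errors.New("timed out waiting for completion")
	}
}

func main() {
	cb, done := newWaiter()
	go cb(nil, nil, nil) // stands in for the client invoking the callback
	fmt.Println("completion error:", wait(done, time.Second))
}
```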

type OnPublishFunc

type OnPublishFunc func(msg *message.PublishMessage) error

OnPublishFunc is called when a PUBLISH message is received.

type Server

type Server struct {
	// The number of seconds to keep the connection live if there's no data.
	// If not set then default to 5 mins.
	KeepAlive int

	// The number of seconds to wait for the CONNECT message before disconnecting.
	// If not set then default to 2 seconds.
	ConnectTimeout int

	// The number of seconds to wait for any ACK messages before failing.
	// If not set then default to 20 seconds.
	AckTimeout int

	// The number of times to retry sending a packet if ACK is not received.
	// If not set then default to 3 retries.
	TimeoutRetries int

	// Size of the in and out buffers. This affects the maximum payload size. If
	// not set, the defaultBufferSize (1024*256) is used.
	BufferSize int64

	// Authenticator is the authenticator used to check username and password sent
	// in the CONNECT message. If not set then default to "mockSuccess".
	Authenticator string

	// SessionsProvider is the session store that keeps all the Session objects.
	// This is the store to check if CleanSession is set to 0 in the CONNECT message.
	// If not set then default to "mem".
	SessionsProvider string

	// TopicsProvider is the topic store that keeps all the subscription topics.
	// If not set then default to "mem".
	TopicsProvider string
	// contains filtered or unexported fields
}

Server is a library implementation of the MQTT server that, as best it can, complies with the MQTT 3.1 and 3.1.1 specs.

Example
// Create a new server
svr := &Server{
	KeepAlive:        300,           // seconds
	ConnectTimeout:   2,             // seconds
	SessionsProvider: "mem",         // keeps sessions in memory
	Authenticator:    "mockSuccess", // always succeed
	TopicsProvider:   "mem",         // keeps topic subscriptions in memory
}

// Listen and serve connections on port 1883 (all interfaces)
svr.ListenAndServe("tcp://:1883")

func (*Server) Close

func (svr *Server) Close() error

Close terminates the server by shutting down all the client connections and closing the listener. It will, as best it can, clean up after itself.

func (*Server) ListenAndServe

func (svr *Server) ListenAndServe(uri string) error

ListenAndServe listens for connections on the requested URI and handles any incoming MQTT client sessions. It should not return until Close() is called or a critical error stops the server from running. The URI supplied should be of the form "protocol://host:port" that can be parsed by url.Parse(). For example, a URI could be "tcp://0.0.0.0:1883".
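
To show what a "protocol://host:port" URI looks like once parsed, here is a small sketch built on url.Parse from the standard library. The helper splitURI is hypothetical; how the server itself consumes the parsed URI is not shown in this documentation.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitURI is a hypothetical helper that breaks a URI such as
// "tcp://0.0.0.0:1883" into its protocol and host:port parts.
func splitURI(uri string) (network, addr string, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", "", fmt.Errorf("uri %q must look like protocol://host:port", uri)
	}
	return u.Scheme, u.Host, nil
}

func main() {
	network, addr, err := splitURI("tcp://0.0.0.0:1883")
	fmt.Println(network, addr, err)

	// A bare host:port without a protocol is rejected.
	_, _, err = splitURI("0.0.0.0:1883")
	fmt.Println(err != nil)
}
```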

func (*Server) ListenAndServeTLS

func (svr *Server) ListenAndServeTLS(uri string, cfg *tls.Config) error

ListenAndServeTLS listens for TLS connections on the requested URI and handles any incoming MQTT client sessions. It should not return until Close() is called or a critical error stops the server from running. The URI supplied should be of the form "protocol://host:port" that can be parsed by url.Parse(). For example, a URI could be "tcp://0.0.0.0:8883".

func (*Server) Publish

func (svr *Server) Publish(msg *message.PublishMessage) error

Publish sends a single MQTT PUBLISH message to the server. The call modifies msg.

func (*Server) Subscribe

func (svr *Server) Subscribe(topic string, qos byte, onPublish *OnPublishFunc) error

Subscribe registers a callback for a topic.

func (*Server) Unsubscribe

func (svr *Server) Unsubscribe(topic string, onPublish *OnPublishFunc) error

Unsubscribe deregisters a callback for a topic.
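
Both Subscribe and Unsubscribe take a pointer to an OnPublishFunc. Go function values cannot be compared with ==, but pointers can, so one plausible reading (an assumption, not confirmed by this documentation) is that the pointer identifies which callback to remove. The sketch below demonstrates that idea with a local registry; publish, onPublish, and registry are stand-ins, not the package's types.

```go
package main

import "fmt"

// publish is a local stand-in for *message.PublishMessage.
type publish struct{ topic string }

// onPublish mirrors the shape of OnPublishFunc for the stand-in type.
type onPublish func(msg *publish) error

// registry maps a topic to the callbacks registered for it, by pointer.
type registry map[string][]*onPublish

func (r registry) subscribe(topic string, cb *onPublish) {
	r[topic] = append(r[topic], cb)
}

// unsubscribe removes cb from topic and reports whether it was registered.
func (r registry) unsubscribe(topic string, cb *onPublish) bool {
	cbs := r[topic]
	for i, c := range cbs {
		if c == cb { // pointer comparison, since funcs are not comparable
			r[topic] = append(cbs[:i], cbs[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	var handler onPublish = func(msg *publish) error {
		fmt.Println("got", msg.topic)
		return nil
	}
	other := handler // same function, different variable and address

	r := registry{}
	r.subscribe("abc", &handler)
	fmt.Println(r.unsubscribe("abc", &other))
	fmt.Println(r.unsubscribe("abc", &handler))
}
```

If the package does work this way, the caller has to keep the callback in a variable and pass the address of that same variable to both calls.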

type WebsocketHandler

type WebsocketHandler struct {
	// target address, e.g. ":1883"
	Addr string
}

WebsocketHandler provides an http.Handler that forwards MQTT websocket requests to a plain TCP socket.
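
Since WebsocketHandler is an http.Handler, it can be mounted on a mux like any other handler. The sketch below shows the mounting step only. The type standIn is a placeholder that does not perform a websocket upgrade, and the "/mqtt" path is an assumption chosen for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// standIn is a placeholder for the real handler. It only reports the target
// address instead of upgrading the request and forwarding traffic.
type standIn struct {
	Addr string // target address, e.g. ":1883"
}

func (s *standIn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "would forward to %s", s.Addr)
}

// newMux mounts the handler under the assumed "/mqtt" path.
func newMux(h http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/mqtt", h)
	return mux
}

// probe sends an in-memory GET request and returns status code and body.
func probe(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	mux := newMux(&standIn{Addr: ":1883"})
	code, body := probe(mux, "/mqtt")
	fmt.Println(code, body)
	// In a real program the mux would be passed to http.ListenAndServe.
}
```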

func (*WebsocketHandler) ServeHTTP

func (wh *WebsocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.
