README
AutoPaho
AutoPaho has a number of aims:
- Provide an easy-to-use MQTT v5 client that provides commonly requested functionality (e.g. connection, automatic reconnection, message queueing).
- Demonstrate the use of paho.golang/paho.
- Enable us to smoke test paho.golang/paho features (ensuring they are usable in a real-world situation).
Basic Usage
The following code demonstrates basic usage; the full code is available under examples/basics:
func main() {
	// App will run until cancelled by user (e.g. ctrl-c)
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// We will connect to the Eclipse test server (note that you may see messages that other users publish)
	u, err := url.Parse("mqtt://mqtt.eclipseprojects.io:1883")
	if err != nil {
		panic(err)
	}

	cliCfg := autopaho.ClientConfig{
		ServerUrls: []*url.URL{u},
		KeepAlive:  20, // Keepalive message should be sent every 20 seconds
		// CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.
		CleanStartOnInitialConnection: false,
		// SessionExpiryInterval - Seconds that a session will survive after disconnection.
		// It is important to set this because otherwise, any queued messages will be lost if the connection drops and
		// the server will not queue messages while it is down. The specific setting will depend upon your needs
		// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 0xFFFFFFFE = 136 years, 0xFFFFFFFF = don't expire)
		SessionExpiryInterval: 60,
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			fmt.Println("mqtt connection up")
			// Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if
			// the connection drops)
			if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
				Subscriptions: []paho.SubscribeOptions{
					{Topic: topic, QoS: 1},
				},
			}); err != nil {
				fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
			}
			fmt.Println("mqtt subscription made")
		},
		OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
		// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
		ClientConfig: paho.ClientConfig{
			// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
			ClientID: clientID,
			// OnPublishReceived is a slice of functions that will be called when a message is received.
			// You can write the function(s) yourself or use the supplied Router
			OnPublishReceived: []func(paho.PublishReceived) (bool, error){
				func(pr paho.PublishReceived) (bool, error) {
					fmt.Printf("received message on topic %s; body: %s (retain: %t)\n", pr.Packet.Topic, pr.Packet.Payload, pr.Packet.Retain)
					return true, nil
				}},
			OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
			OnServerDisconnect: func(d *paho.Disconnect) {
				if d.Properties != nil {
					fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
				} else {
					fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
				}
			},
		},
	}

	c, err := autopaho.NewConnection(ctx, cliCfg) // starts process; will reconnect until context cancelled
	if err != nil {
		panic(err)
	}
	// Wait for the connection to come up
	if err = c.AwaitConnection(ctx); err != nil {
		panic(err)
	}

	ticker := time.NewTicker(time.Second)
	msgCount := 0
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			msgCount++
			// Publish a test message (use PublishViaQueue if you don't want to wait for a response)
			if _, err = c.Publish(ctx, &paho.Publish{
				QoS:     1,
				Topic:   topic,
				Payload: []byte("TestMessage: " + strconv.Itoa(msgCount)),
			}); err != nil {
				if ctx.Err() == nil {
					panic(err) // Publish will exit when context cancelled or if something went wrong
				}
			}
			continue
		case <-ctx.Done():
		}
		break
	}

	fmt.Println("signal caught - exiting")
	<-c.Done() // Wait for clean shutdown (cancelling the context triggered the shutdown)
}
See the other examples for further information on usage.
QOS 1 & 2
QOS 1 & 2 provide assurances that messages will be delivered. To implement this, a session state is required that holds information on messages that have not been fully acknowledged. By default, autopaho holds this state in memory, meaning that messages will not be lost following a reconnection but may be lost if the program is restarted (a file-based store can be used to avoid this).
A range of settings impact message delivery; if you want guaranteed delivery, then remember to:
- Use a unique client ID (you need to ensure any subsequent connections use the same ID).
- Configure CleanStartOnInitialConnection and SessionExpiryInterval appropriately (e.g. false, 600).
- Use file-based persistence if you wish the session to survive an application restart.
- Specify QOS 1 or 2 when publishing/subscribing.
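The checklist above can be expressed as a small pre-flight check. The following is a minimal sketch that uses only the standard library; `deliverySettings` and `checkDeliverySettings` are hypothetical names invented for this illustration and are not part of autopaho.

```go
package main

import "fmt"

// deliverySettings is a hypothetical stand-in for the options discussed
// above; it is not an autopaho type.
type deliverySettings struct {
	ClientID              string
	CleanStart            bool   // mirrors CleanStartOnInitialConnection
	SessionExpiryInterval uint32 // seconds
	QoS                   byte
}

// checkDeliverySettings returns one message per setting that would undermine
// guaranteed delivery (an empty slice means nothing obvious is wrong).
func checkDeliverySettings(s deliverySettings) []string {
	var problems []string
	if s.ClientID == "" {
		problems = append(problems, "client ID must be set and reused on every connection")
	}
	if s.CleanStart {
		problems = append(problems, "clean start discards the existing session")
	}
	if s.SessionExpiryInterval == 0 {
		problems = append(problems, "a session expiry of 0 ends the session on disconnect")
	}
	if s.QoS != 1 && s.QoS != 2 {
		problems = append(problems, "QOS must be 1 or 2")
	}
	return problems
}

func main() {
	good := deliverySettings{ClientID: "sensor-1", SessionExpiryInterval: 600, QoS: 1}
	fmt.Println(len(checkDeliverySettings(good))) // 0

	for _, p := range checkDeliverySettings(deliverySettings{}) {
		fmt.Println("problem:", p)
	}
}
```

A check like this cannot confirm that persistence is configured correctly; it only catches the settings that are easy to get wrong.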
When subscribing at QOS 1/2:
- Remember that messages will not be queued until after the initial Subscribe call.
- If you subscribed previously (and the session is live) then expect to receive messages upon connection (you do not need to call Subscribe when reconnecting; however, this is recommended in case the session was lost).
examples/docker provides a demonstration of how this can work. You can confirm this yourself using two terminal windows:
| Terminal1 | Terminal2 |
|---|---|
| `docker compose up -d` | |
| | `docker compose logs --follow` |
| | Wait until you see the subscriber receiving messages (e.g. `docker-sub-1 received message: {"Count":1}`) |
| `docker compose stop sub` | |
| Wait 20 seconds | |
| `docker compose up -d` | |
| | Verify that sub received all messages despite the stop/start. |
Note: The logs can be easier to follow if you comment out the log_type all setting in mosquitto.conf.
Queue
When publishing a message, there are a number of things that can go wrong; for example:
- The connection to the server may drop (or not have even come up before your initial message is ready)
- The application might be restarted (but you still want messages previously published to be delivered)
- ConnectionManager.Publish may time out because you are attempting to publish a lot of messages in a short space of time.
With MQTT v3.1 this was generally handled by adding messages to the session, meaning they would be retried if the connection dropped and was reestablished. MQTT v5 introduces a Receive Maximum, which limits the number of messages that can be in flight (and, hence, in the session).
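To make the effect of Receive Maximum concrete, here is a minimal sketch of an in-flight limiter built on a buffered channel. It illustrates the idea only and is not how autopaho or paho implement it; `inflightLimiter` and its methods are hypothetical names.

```go
package main

import "fmt"

// inflightLimiter caps the number of unacknowledged messages, which is the
// effect Receive Maximum has on a client. Hypothetical type for illustration.
type inflightLimiter struct {
	slots chan struct{}
}

func newInflightLimiter(receiveMaximum int) *inflightLimiter {
	return &inflightLimiter{slots: make(chan struct{}, receiveMaximum)}
}

// tryAcquire reserves a slot without blocking; false means the caller must
// wait (or queue the message) until an acknowledgement frees a slot.
func (l *inflightLimiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot once a message has been fully acknowledged.
func (l *inflightLimiter) release() { <-l.slots }

func main() {
	l := newInflightLimiter(2)
	fmt.Println(l.tryAcquire(), l.tryAcquire(), l.tryAcquire()) // true true false
	l.release()                                                 // an acknowledgement arrives
	fmt.Println(l.tryAcquire())                                 // true
}
```

Once every slot is taken, further publishes have nowhere to go, which is why a separate queue becomes useful.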
ConnectionManager.PublishViaQueue provides a solution; messages passed to this function are added to a queue and transmitted when possible. By default, this queue is held in memory, but you can use an alternate ClientConfig.Queue (e.g. queue/disk) if you wish the queue to survive an application restart.
See examples/queue.
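The queueing idea can be sketched with the standard library alone. The example below is a simplified illustration, not autopaho's queue implementation: `memoryQueue`, `drain` and `errDown` are hypothetical names, and a single consumer is assumed. The point it shows is that a message is removed from the queue only after it has been sent successfully.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDown simulates a send attempted while the connection is down.
var errDown = errors.New("connection down")

// memoryQueue is a hypothetical FIFO used only for this sketch.
type memoryQueue struct {
	mu   sync.Mutex
	msgs [][]byte
}

// Enqueue appends a message to the back of the queue.
func (q *memoryQueue) Enqueue(m []byte) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.msgs = append(q.msgs, m)
}

// Len returns the number of messages still waiting.
func (q *memoryQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.msgs)
}

// drain sends queued messages in order. A message is removed only after send
// succeeds, so a failure leaves it at the head for the next attempt.
// It assumes a single goroutine calls drain at a time.
func (q *memoryQueue) drain(send func([]byte) error) (int, error) {
	sent := 0
	for {
		q.mu.Lock()
		if len(q.msgs) == 0 {
			q.mu.Unlock()
			return sent, nil
		}
		next := q.msgs[0]
		q.mu.Unlock()

		if err := send(next); err != nil {
			return sent, err
		}

		q.mu.Lock()
		q.msgs = q.msgs[1:]
		q.mu.Unlock()
		sent++
	}
}

func main() {
	q := &memoryQueue{}
	q.Enqueue([]byte("one"))
	q.Enqueue([]byte("two"))

	// Connection down: nothing is sent and nothing is lost.
	n, err := q.drain(func([]byte) error { return errDown })
	fmt.Println(n, err, q.Len()) // 0 connection down 2

	// Connection back up: the backlog is delivered in order.
	n, err = q.drain(func(m []byte) error { fmt.Println("sent", string(m)); return nil })
	fmt.Println(n, err, q.Len()) // 2 <nil> 0
}
```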
Documentation
Index ¶
- Variables
- type ClientConfig
- func (cfg *ClientConfig) ResetUsernamePassword()
- func (cfg *ClientConfig) SetConnectPacketConfigurator(fn func(*paho.Connect) *paho.Connect) bool (deprecated)
- func (cfg *ClientConfig) SetDisConnectPacketConfigurator(fn func() *paho.Disconnect) (deprecated)
- func (cfg *ClientConfig) SetUsernamePassword(username string, password []byte) (deprecated)
- func (cfg *ClientConfig) SetWillMessage(topic string, payload []byte, qos byte, retain bool) (deprecated)
- type ConnackError
- type ConnectionManager
- func (c *ConnectionManager) AddOnPublishReceived(f func(PublishReceived) (bool, error)) func()
- func (c *ConnectionManager) Authenticate(ctx context.Context, a *paho.Auth) (*paho.AuthResponse, error)
- func (c *ConnectionManager) AwaitConnection(ctx context.Context) error
- func (c *ConnectionManager) Disconnect(ctx context.Context) error
- func (c *ConnectionManager) Done() <-chan struct{}
- func (c *ConnectionManager) Publish(ctx context.Context, p *paho.Publish) (*paho.PublishResponse, error)
- func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish) error
- func (c *ConnectionManager) Subscribe(ctx context.Context, s *paho.Subscribe) (*paho.Suback, error)
- func (c *ConnectionManager) TerminateConnectionForTest()
- func (c *ConnectionManager) Unsubscribe(ctx context.Context, u *paho.Unsubscribe) (*paho.Unsuback, error)
- type DisconnectError
- type PublishReceived
- type QueuePublish
- type WebSocketConfig
Variables ¶
var ConnectionDownError = errors.New("connection with the MQTT server is currently down")
ConnectionDownError will be returned when a request is made but the connection to the server is down. Note: It is possible that the connection will drop between the request being made and a response being received, in which case a different error will be received (this is only returned if the connection is down at the time the request is made).
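Because this is a sentinel error, callers would normally test for it with `errors.Is`, which also matches when the error has been wrapped. The sketch below uses a local stand-in (`errConnectionDown`) so that it compiles without the library; in real code you would compare against `autopaho.ConnectionDownError`. The helper name `shouldRetryLater` is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errConnectionDown is a local stand-in for autopaho.ConnectionDownError so
// that the sketch compiles without the library.
var errConnectionDown = errors.New("connection with the MQTT server is currently down")

// shouldRetryLater reports whether a request failed only because the
// connection was down at the time it was made.
func shouldRetryLater(err error) bool {
	return errors.Is(err, errConnectionDown)
}

func main() {
	wrapped := fmt.Errorf("publish failed: %w", errConnectionDown)
	fmt.Println(shouldRetryLater(wrapped))               // true
	fmt.Println(shouldRetryLater(errors.New("timeout"))) // false
}
```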
Types ¶
type ClientConfig ¶
type ClientConfig struct {
	ServerUrls []*url.URL  // URL(s) for the MQTT server (schemes supported include 'mqtt' and 'tls')
	TlsCfg     *tls.Config // Configuration used when connecting using TLS
	KeepAlive  uint16      // Keepalive period in seconds (the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one MQTT Control Packet and the point it starts sending the next)

	CleanStartOnInitialConnection bool   // Clean Start flag, if true, existing session information will be cleared on the first connection (it will be false for subsequent connections)
	SessionExpiryInterval         uint32 // Session Expiry Interval in seconds (if 0 the Session ends when the Network Connection is closed)

	ConnectRetryDelay time.Duration    // How long to wait between connection attempts (defaults to 10s)
	ConnectTimeout    time.Duration    // How long to wait for the connection process to complete (defaults to 10s)
	WebSocketCfg      *WebSocketConfig // Enables customisation of the websocket connection

	Queue queue.Queue // Used to queue up publish messages (if nil an error will be returned if publish could not be transmitted)

	// Deprecated: Use ServerUrls instead (this will be used if ServerUrls is empty). Will be removed in a future release.
	BrokerUrls []*url.URL

	// AttemptConnection, if provided, will be called to establish a network connection.
	// The returned `conn` must support thread safe writing; most wrapped net.Conn implementations like tls.Conn
	// are not thread safe for writing.
	// To fix, use packets.NewThreadSafeConn wrapper or extend the custom net.Conn struct with sync.Locker.
	AttemptConnection func(context.Context, ClientConfig, *url.URL) (net.Conn, error)

	OnConnectionUp func(*ConnectionManager, *paho.Connack) // Called (within a goroutine) when a connection is made (including reconnection). Connection Manager passed to simplify subscriptions.
	OnConnectError func(error)                             // Called (within a goroutine) whenever a connection attempt fails. Will wrap autopaho.ConnackError on server deny.

	Debug      log.Logger // By default set to NOOPLogger{}, set to a logger for debugging info
	Errors     log.Logger // By default set to NOOPLogger{}, set to a logger for errors
	PahoDebug  log.Logger // debugger passed to the paho package (will default to NOOPLogger{})
	PahoErrors log.Logger // error logger passed to the paho package (will default to NOOPLogger{})

	ConnectUsername string
	ConnectPassword []byte

	WillMessage    *paho.WillMessage
	WillProperties *paho.WillProperties

	ConnectPacketBuilder func(*paho.Connect, *url.URL) *paho.Connect // called prior to connection allowing customisation of the CONNECT packet

	// DisconnectPacketBuilder - called prior to disconnection allowing customisation of the DISCONNECT
	// packet. If the function returns nil, then no DISCONNECT packet will be sent; if DisconnectPacketBuilder
	// itself is nil, a default packet is sent.
	DisconnectPacketBuilder func() *paho.Disconnect

	// We include the full paho.ClientConfig in order to simplify moving between the two packages.
	// Note that Conn will be ignored.
	paho.ClientConfig
}
ClientConfig adds a few values, required to manage the connection, to the standard paho.ClientConfig (note that conn will be ignored)
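The AttemptConnection comment asks for a connection that is safe for concurrent writes. One way to meet that requirement, sketched here under the assumption that serialising Write calls with a mutex is sufficient, is to embed the net.Conn in a small wrapper. `lockedConn` and `sendConcurrently` are hypothetical names for this illustration; the library's own packets.NewThreadSafeConn, mentioned above, is the ready-made alternative.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// lockedConn serialises Write calls on the embedded net.Conn; all other
// methods are promoted unchanged.
type lockedConn struct {
	net.Conn
	mu sync.Mutex
}

// Write holds the mutex for the duration of the underlying write so that
// packets written from different goroutines cannot interleave.
func (c *lockedConn) Write(p []byte) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.Conn.Write(p)
}

// sendConcurrently writes msg from n goroutines through a lockedConn and
// returns how many bytes arrived at the other end of an in-memory pipe.
func sendConcurrently(n int, msg string) int {
	client, server := net.Pipe()
	defer server.Close()
	conn := &lockedConn{Conn: client}

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = conn.Write([]byte(msg))
		}()
	}
	go func() {
		wg.Wait()
		_ = conn.Close() // closing the writer ends the ReadAll below
	}()

	data, _ := io.ReadAll(server)
	return len(data)
}

func main() {
	fmt.Println(sendConcurrently(3, "ping")) // 12
}
```

An AttemptConnection implementation could return such a wrapper around whatever connection it dials.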
func (*ClientConfig) ResetUsernamePassword ¶
func (cfg *ClientConfig) ResetUsernamePassword()
ResetUsernamePassword clears any configured username and password on the client configuration.
Set ConnectUsername and ConnectPassword directly instead.
func (*ClientConfig) SetConnectPacketConfigurator deprecated
func (cfg *ClientConfig) SetConnectPacketConfigurator(fn func(*paho.Connect) *paho.Connect) bool
SetConnectPacketConfigurator assigns a callback for modification of the Connect packet, called before the connection is opened, allowing the application to adjust its configuration before establishing a connection. This function should be treated as asynchronous, and expected to have no side effects.
Deprecated: Set ConnectPacketBuilder directly instead. This function exists for backwards compatibility only (and may be removed in the future).
func (*ClientConfig) SetDisConnectPacketConfigurator deprecated added in v0.20.0
func (cfg *ClientConfig) SetDisConnectPacketConfigurator(fn func() *paho.Disconnect)
SetDisConnectPacketConfigurator assigns a callback for the provision of a DISCONNECT packet. By default, a DISCONNECT is sent to the server when Disconnect is called; setting a callback allows a custom packet to be provided, or no packet (by returning nil).
Deprecated: Set DisconnectPacketBuilder directly instead. This function exists for backwards compatibility only (and may be removed in the future).
func (*ClientConfig) SetUsernamePassword deprecated
func (cfg *ClientConfig) SetUsernamePassword(username string, password []byte)
SetUsernamePassword configures username and password properties for the Connect packets. These values are staged in the ClientConfig, and preparation of the Connect packet is deferred.
Deprecated: Set ConnectUsername and ConnectPassword directly instead.
func (*ClientConfig) SetWillMessage deprecated
func (cfg *ClientConfig) SetWillMessage(topic string, payload []byte, qos byte, retain bool)
SetWillMessage configures the Will topic, payload, QOS and Retain facets of the client connection. These values are staged in the ClientConfig, for later preparation of the Connect packet.
Deprecated: Set WillMessage and WillProperties directly instead.
type ConnackError ¶ added in v0.12.0
type ConnackError struct {
	ReasonCode byte   // CONNACK reason code
	Reason     string // CONNACK Reason string from properties
	Err        error  // underlying error
}
ConnackError will be passed when the server denies connection in CONNACK packet
func NewConnackError ¶ added in v0.12.0
func NewConnackError(err error, connack *paho.Connack) *ConnackError
NewConnackError returns a new ConnackError
func (*ConnackError) Error ¶ added in v0.12.0
func (c *ConnackError) Error() string
func (*ConnackError) Unwrap ¶ added in v0.12.0
func (c *ConnackError) Unwrap() error
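Since ConnackError implements both Error and Unwrap, it can be detected inside a wrapped error with `errors.As`, for example in an OnConnectError callback. The sketch below uses a local stand-in type (`connackError`) with the same three fields so that it compiles without the library; the error message format and the `reasonCode` helper are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// connackError is a local stand-in mirroring the fields of
// autopaho.ConnackError; the message format is an assumption.
type connackError struct {
	ReasonCode byte
	Reason     string
	Err        error
}

func (c *connackError) Error() string {
	return fmt.Sprintf("connack reason 0x%02X (%s): %v", c.ReasonCode, c.Reason, c.Err)
}

func (c *connackError) Unwrap() error { return c.Err }

// reasonCode extracts the CONNACK reason code from err, if err wraps a
// connackError anywhere in its chain.
func reasonCode(err error) (byte, bool) {
	var ce *connackError
	if errors.As(err, &ce) {
		return ce.ReasonCode, true
	}
	return 0, false
}

func main() {
	denied := &connackError{ReasonCode: 0x87, Reason: "not authorized", Err: errors.New("server refused connection")}
	wrapped := fmt.Errorf("connection attempt failed: %w", denied)

	if code, ok := reasonCode(wrapped); ok {
		fmt.Printf("server denied connection, reason code 0x%02X\n", code)
	}
	_, ok := reasonCode(errors.New("dial timeout"))
	fmt.Println(ok) // false
}
```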
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
ConnectionManager manages the connection with the server and provides the ability to publish messages
func NewConnection ¶
func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, error)
NewConnection creates a connection manager and begins the connection process (will retry until the context is cancelled)
func (*ConnectionManager) AddOnPublishReceived ¶ added in v0.20.0
func (c *ConnectionManager) AddOnPublishReceived(f func(PublishReceived) (bool, error)) func()
AddOnPublishReceived adds a function that will be called when a PUBLISH is received. The new function will be called after any functions already in the list. Returns a function that can be called to remove the callback.
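The "add a callback, get back a remover" pattern described here can be sketched as follows. This is an illustration of the pattern only, not the library's implementation; `message`, `handler` and `registry` are hypothetical names, and the handler signature simply mirrors the `(bool, error)` return used above.

```go
package main

import (
	"fmt"
	"sync"
)

// message and handler are hypothetical stand-ins for the library's types.
type message struct {
	Topic   string
	Payload []byte
}

type handler func(message) (bool, error)

type entry struct {
	id int
	fn handler
}

// registry keeps handlers in registration order.
type registry struct {
	mu      sync.Mutex
	nextID  int
	entries []entry
}

// add appends h after any existing handlers and returns a function that
// removes it again.
func (r *registry) add(h handler) (remove func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.nextID
	r.nextID++
	r.entries = append(r.entries, entry{id: id, fn: h})
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		for i, e := range r.entries {
			if e.id == id {
				r.entries = append(r.entries[:i], r.entries[i+1:]...)
				return
			}
		}
	}
}

// dispatch calls every handler in order and reports how many handled m.
func (r *registry) dispatch(m message) int {
	r.mu.Lock()
	snapshot := append([]entry(nil), r.entries...) // copy so handlers may add or remove safely
	r.mu.Unlock()

	handled := 0
	for _, e := range snapshot {
		if ok, err := e.fn(m); err == nil && ok {
			handled++
		}
	}
	return handled
}

func main() {
	r := &registry{}
	remove := r.add(func(m message) (bool, error) {
		fmt.Printf("got %s on %s\n", m.Payload, m.Topic)
		return true, nil
	})
	fmt.Println(r.dispatch(message{Topic: "demo", Payload: []byte("hi")})) // 1
	remove()
	fmt.Println(r.dispatch(message{Topic: "demo", Payload: []byte("hi")})) // 0
}
```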
func (*ConnectionManager) Authenticate ¶ added in v0.20.0
func (c *ConnectionManager) Authenticate(ctx context.Context, a *paho.Auth) (*paho.AuthResponse, error)
Authenticate is used to initiate a reauthentication of credentials with the server. This function sends the initial Auth packet to start the reauthentication then relies on the client AuthHandler managing any further requests from the server until either a successful Auth packet is passed back, or a Disconnect is received.
func (*ConnectionManager) AwaitConnection ¶
func (c *ConnectionManager) AwaitConnection(ctx context.Context) error
AwaitConnection will return when the connection comes up or the context is cancelled (only returns an error if context is cancelled). If you require more complex connection management then consider using the OnConnectionUp callback.
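Because AwaitConnection only returns an error when the context is cancelled, a context with a deadline is the natural way to bound the wait. The sketch below reproduces that behaviour with a plain channel so it can run without a broker; `awaitUp` and `awaitWithTimeout` are hypothetical helpers, and with the real library you would pass the timeout context to `AwaitConnection` instead.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// awaitUp blocks until up is closed (connection established) or ctx is
// cancelled, returning an error only in the latter case.
func awaitUp(ctx context.Context, up <-chan struct{}) error {
	select {
	case <-up:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// awaitWithTimeout wraps awaitUp with a deadline given in milliseconds.
func awaitWithTimeout(up <-chan struct{}, timeoutMs int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return awaitUp(ctx, up)
}

func main() {
	connected := make(chan struct{})
	close(connected) // pretend the connection is already up
	fmt.Println(awaitWithTimeout(connected, 1000)) // <nil>

	neverUp := make(chan struct{})
	fmt.Println(awaitWithTimeout(neverUp, 10)) // context deadline exceeded
}
```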
func (*ConnectionManager) Disconnect ¶
func (c *ConnectionManager) Disconnect(ctx context.Context) error
Disconnect closes the connection (if one is up) and shuts down any active processes before returning
func (*ConnectionManager) Done ¶
func (c *ConnectionManager) Done() <-chan struct{}
Done returns a channel that will be closed when the connection handler has shut down cleanly. Note: We cannot currently tell when the mqtt has fully shut down (so it may still be in the process of closing down).
func (*ConnectionManager) Publish ¶
func (c *ConnectionManager) Publish(ctx context.Context, p *paho.Publish) (*paho.PublishResponse, error)
Publish is used to send a publication to the MQTT server. It is passed a pre-prepared `PUBLISH` packet and blocks waiting for the appropriate response, or for the timeout to fire. Any response message is returned from the function, along with any errors.
func (*ConnectionManager) PublishViaQueue ¶ added in v0.20.0
func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish) error
PublishViaQueue is used to send a publication to the MQTT server via a queue (by default memory based). An error will be returned if the message could not be added to the queue; otherwise the message will be delivered in the background with no status updates available. Use this function when you wish to rely upon the library's best effort to transmit the message; it is anticipated that this will generally be in situations where the network link or power supply is unreliable. Messages will be written to a queue (configuring a disk-based queue is recommended) and transmitted where possible. To maximise the chance of a successful delivery:
- Leave CleanStartOnInitialConnection set to false
- Set SessionExpiryInterval such that sessions will outlive anticipated outages (this impacts inflight messages only)
- Set ClientConfig.Session to a session manager with persistent storage
- Set ClientConfig.Queue to a queue with persistent storage
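func (*ConnectionManager) Subscribe ¶
func (c *ConnectionManager) Subscribe(ctx context.Context, s *paho.Subscribe) (*paho.Suback, error)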
Subscribe is used to send a Subscription request to the MQTT server. It is passed a pre-prepared Subscribe packet and blocks waiting for a response Suback, or for the timeout to fire. Any response Suback is returned from the function, along with any errors.
func (*ConnectionManager) TerminateConnectionForTest ¶ added in v0.20.0
func (c *ConnectionManager) TerminateConnectionForTest()
TerminateConnectionForTest closes the active connection (if any). This function is intended for testing only, it simulates connection loss which supports testing QOS1 and 2 message delivery.
func (*ConnectionManager) Unsubscribe ¶
func (c *ConnectionManager) Unsubscribe(ctx context.Context, u *paho.Unsubscribe) (*paho.Unsuback, error)
Unsubscribe is used to send an Unsubscribe request to the MQTT server. It is passed a pre-prepared Unsubscribe packet and blocks waiting for a response Unsuback, or for the timeout to fire. Any response Unsuback is returned from the function, along with any errors.
type DisconnectError ¶
type DisconnectError struct {
// contains filtered or unexported fields
}
DisconnectError will be passed when the server requests disconnection (allows this error type to be detected)
func (*DisconnectError) Error ¶
func (d *DisconnectError) Error() string
type PublishReceived ¶ added in v0.20.0
type PublishReceived struct {
	paho.PublishReceived
	ConnectionManager *ConnectionManager
}
type QueuePublish ¶ added in v0.20.0
QueuePublish holds info required to publish a message. A separate struct is used so options can be added in the future without breaking existing code
type WebSocketConfig ¶
type WebSocketConfig struct {
	Dialer func(url *url.URL, tlsCfg *tls.Config) *websocket.Dialer // If non-nil this will be called before each websocket connection (allows full configuration of the dialer used)
	Header func(url *url.URL, tlsCfg *tls.Config) http.Header       // If non-nil this will be called before each connection attempt to get headers to include with request
}
WebSocketConfig enables customisation of the websocket connection
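As the Header function is called before each connection attempt, it is a convenient place to supply credentials that may change between attempts. The following sketch builds a function with the signature shown above using only the standard library; `bearerHeader`, the token source and the use of an Authorization header are assumptions for the example, and your server may expect something different.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"net/url"
)

// bearerHeader returns a function matching the signature of
// WebSocketConfig.Header. The token callback is invoked on every call, so a
// refreshed token is picked up on reconnection. Hypothetical helper.
func bearerHeader(token func() string) func(*url.URL, *tls.Config) http.Header {
	return func(_ *url.URL, _ *tls.Config) http.Header {
		h := http.Header{}
		h.Set("Authorization", "Bearer "+token())
		return h
	}
}

func main() {
	headerFn := bearerHeader(func() string { return "example-token" })

	u, err := url.Parse("wss://broker.example.com/mqtt") // placeholder URL
	if err != nil {
		panic(err)
	}
	fmt.Println(headerFn(u, nil).Get("Authorization")) // Bearer example-token
}
```

The returned function could then be assigned to the Header field of a WebSocketConfig.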