autopaho

package
v0.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 13, 2024 License: EPL-2.0 Imports: 22 Imported by: 102

README

AutoPaho

AutoPaho has a number of aims:

  • Provide an easy-to-use MQTT v5 client that provides commonly requested functionality (e.g. connection, automatic reconnection, message queueing).
  • Demonstrate the use of paho.golang/paho.
  • Enable us to smoke test paho.golang/paho features (ensuring they are they usable in a real world situation)

Basic Usage

The following code demonstrates basic usage; the full code is available under examples/basics:

func main() {
	// App will run until cancelled by user (e.g. ctrl-c)
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// We will connect to the Eclipse test server (note that you may see messages that other users publish)
	u, err := url.Parse("mqtt://mqtt.eclipseprojects.io:1883")
	if err != nil {
		panic(err)
	}

	cliCfg := autopaho.ClientConfig{
		ServerUrls: []*url.URL{u},
		KeepAlive:  20, // Keepalive message should be sent every 20 seconds
		// CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.
		CleanStartOnInitialConnection: false,
		// SessionExpiryInterval - Seconds that a session will survive after disconnection.
		// It is important to set this because otherwise, any queued messages will be lost if the connection drops and
		// the server will not queue messages while it is down. The specific setting will depend upon your needs
		// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 0xFFFFFFFE = 136 years, 0xFFFFFFFF = don't expire)
		SessionExpiryInterval: 60,
		OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
			fmt.Println("mqtt connection up")
			// Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if
			// the connection drops)
			if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
				Subscriptions: []paho.SubscribeOptions{
					{Topic: topic, QoS: 1},
				},
			}); err != nil {
				fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
			}
			fmt.Println("mqtt subscription made")
		},
		OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
		// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
		ClientConfig: paho.ClientConfig{
			// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
			ClientID: clientID,
			// OnPublishReceived is a slice of functions that will be called when a message is received.
			// You can write the function(s) yourself or use the supplied Router
			OnPublishReceived: []func(paho.PublishReceived) (bool, error){
				func(pr paho.PublishReceived) (bool, error) {
					fmt.Printf("received message on topic %s; body: %s (retain: %t)\n", pr.Packet.Topic, pr.Packet.Payload, pr.Packet.Retain)
					return true, nil
				}},
			OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
			OnServerDisconnect: func(d *paho.Disconnect) {
				if d.Properties != nil {
					fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
				} else {
					fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
				}
			},
		},
	}

	c, err := autopaho.NewConnection(ctx, cliCfg) // starts process; will reconnect until context cancelled
	if err != nil {
		panic(err)
	}
	// Wait for the connection to come up
	if err = c.AwaitConnection(ctx); err != nil {
		panic(err)
	}

	ticker := time.NewTicker(time.Second)
	msgCount := 0
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			msgCount++
			// Publish a test message (use PublishViaQueue if you don't want to wait for a response)
			if _, err = c.Publish(ctx, &paho.Publish{
				QoS:     1,
				Topic:   topic,
				Payload: []byte("TestMessage: " + strconv.Itoa(msgCount)),
			}); err != nil {
				if ctx.Err() == nil {
					panic(err) // Publish will exit when context cancelled or if something went wrong
				}
			}
			continue
		case <-ctx.Done():
		}
		break
	}

	fmt.Println("signal caught - exiting")
	<-c.Done() // Wait for clean shutdown (cancelling the context triggered the shutdown)
}

See the other examples for further information on usage.

QOS 1 & 2

QOS 1 & 2 provide assurances that messages will be delivered. To implement this a session state is required that holds information on messages that have not been fully acknowledged. By default autopaho holds this state in memory meaning that messages will not be lost following a reconnection, but may be lost if the program is restarted (a file-based store can be used to avoid this).

A range of settings impact message delivery; if you want guaranteed delivery, then remember to:

  • Use a unique, client ID (you need to ensue any subsequent connections use the same ID)
  • Configure CleanStartOnInitialConnection and SessionExpiryInterval appropriately (e.g. false, 600).
  • Use file-based persistence if you wish the session to survive an application restart
  • Specify QOS 1 or 2 when publishing/subscribing.

When subscribing at QOS1/2:

  • Remember that messages will not be queued until after the initial Subscribe call.
  • If you subscribed previously (and the session is live) then expect to receive messages upon connection (you do not need to call Subscribe when reconnecting; however this is recommended in case the session was lost).

example/docker provides a demonstration of how this can work. You can confirm this yourself using two terminal windows:

Terminal1 Terminal2
docker compose up -d
docker compose logs --follow
Wait until you see the subscriber receiving messages (e.g. docker-sub-1 received message: {"Count":1})
docker compose stop sub
Wait 20 seconds
docker compose up -d
Verify that sub received all messages despite the stop/start.

Note: The logs can be easier fo follow if you comment out the log_type all in mosquitto.conf.

Queue

When publishing a message, there are a number of things that can go wrong; for example:

  • The connection to the server may drop (or not have even come up before your initial message is ready)
  • The application might be restarted (but you still want messages previously published to be delivered)
  • ConnectionManager.Publish may timeout because you are attempting to publish a lot of messages in a short space of time.

With MQTT v3.1 this was generally handled by adding messages to the session; meaning they would be retried if the connection droped and was reestablished. MQTT v5 introduces a Receive Maximum which limits the number of messages that can be in flight (and, hence, in the session).

ConnectionManager.PublishViaQueue provides a solution; messages passed to this function are added to a queue and transmitted when possible. By default, this queue is held in memory but you can use an alternate ClientConfig.Queue (e.g. queue/disk) if you wish the queue to survive an application restart.

See examples/queue.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ConnectionDownError = errors.New("connection with the MQTT server is currently down")

ConnectionDownError Down will be returned when a request is made but the connection to the server is down Note: It is possible that the connection will drop between the request being made and a response being received, in which case a different error will be received (this is only returned if the connection is down at the time the request is made).

Functions

This section is empty.

Types

type Backoff added in v0.22.0

type Backoff func(attempt int) time.Duration

Backoff function to compute backoff duration for the Nth attempt attempt starts at "0" indicating the delay BEFORE the first attempt

func DefaultExponentialBackoff added in v0.22.0

func DefaultExponentialBackoff() Backoff

DefaultExponentialBackoff returns an exponential backoff with default values.

The default values are:

  • min delay: 5 seconds
  • max delay: 10 minutes
  • initial max delay: 10 seconds
  • factor: 1.5

func NewConstantBackoff added in v0.22.0

func NewConstantBackoff(delay time.Duration) Backoff

Creates a new backoff with constant delay (for attempt > 0, otherwise the backoff is 0).

func NewExponentialBackoff added in v0.22.0

func NewExponentialBackoff(
	minDelay time.Duration,
	maxDelay time.Duration,
	initialMaxDelay time.Duration,
	factor float32,
) Backoff

NewExponentialBackoff provides a random duration within a range starting from a fixed min value up to a "moving" max value that increases exponentially for each attempt up to the specified max value.

The "moving" max is computed by multiplying the initial max value with the factor for each attemt up the specified max value.

Configuration parameters:

  • minDelay - lower bound for computed backoff
  • maxDelay - upper bound for computed backoff
  • initialMaxDelay - initial max value which wiil incerease exponentially up to the max delay
  • factor - factor for the exponential increase of initial max delay

type ClientConfig

type ClientConfig struct {
	ServerUrls                    []*url.URL  // URL(s) for the MQTT server (schemes supported include 'mqtt' and 'tls')
	TlsCfg                        *tls.Config // Configuration used when connecting using TLS
	KeepAlive                     uint16      // Keepalive period in seconds (the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one MQTT Control Packet and the point it starts sending the next)
	CleanStartOnInitialConnection bool        //  Clean Start flag, if true, existing session information will be cleared on the first connection (it will be false for subsequent connections)
	SessionExpiryInterval         uint32      // Session Expiry Interval in seconds (if 0 the Session ends when the Network Connection is closed)

	// Deprecated: ConnectRetryDelay is deprecated and its functionality is replaced by ReconnectBackoff.
	ConnectRetryDelay time.Duration           // How long to wait between connection attempts (defaults to 10s)
	ReconnectBackoff  func(int) time.Duration // How long to wait after failed connection attempt N (defaults to 10s)
	ConnectTimeout    time.Duration           // How long to wait for the connection process to complete (defaults to 10s)
	WebSocketCfg      *WebSocketConfig        // Enables customisation of the websocket connection

	Queue queue.Queue // Used to queue up publish messages (if nil an error will be returned if publish could not be transmitted)

	// Depreciated: Use ServerUrls instead (this will be used if ServerUrls is empty). Will be removed in a future release.
	BrokerUrls []*url.URL

	// AttemptConnection, if provided, will be called to establish a network connection.
	// The returned `conn` must support thread safe writing; most wrapped net.Conn implementations like tls.Conn
	// are not thread safe for writing.
	// To fix, use packets.NewThreadSafeConn wrapper or extend the custom net.Conn struct with sync.Locker.
	AttemptConnection func(context.Context, ClientConfig, *url.URL) (net.Conn, error)

	OnConnectionUp func(*ConnectionManager, *paho.Connack) // Called (within a goroutine) when a connection is made (including reconnection). Connection Manager passed to simplify subscriptions.
	OnConnectError func(error)                             // Called (within a goroutine) whenever a connection attempt fails. Will wrap autopaho.ConnackError on server deny.

	Debug      log.Logger // By default set to NOOPLogger{},set to a logger for debugging info
	Errors     log.Logger // By default set to NOOPLogger{},set to a logger for errors
	PahoDebug  log.Logger // debugger passed to the paho package (will default to NOOPLogger{})
	PahoErrors log.Logger // error logger passed to the paho package (will default to NOOPLogger{})

	ConnectUsername string
	ConnectPassword []byte

	WillMessage    *paho.WillMessage
	WillProperties *paho.WillProperties

	ConnectPacketBuilder func(*paho.Connect, *url.URL) (*paho.Connect, error) // called prior to connection allowing customisation of the CONNECT packet

	// DisconnectPacketBuilder - called prior to disconnection allowing customisation of the DISCONNECT
	// packet. If the function returns nil, then no DISCONNECT packet will be passed; if nil a default packet is sent.
	DisconnectPacketBuilder func() *paho.Disconnect

	// We include the full paho.ClientConfig in order to simplify moving between the two packages.
	// Note that Conn will be ignored.
	paho.ClientConfig
}

ClientConfig adds a few values, required to manage the connection, to the standard paho.ClientConfig (note that conn will be ignored)

func (*ClientConfig) ResetUsernamePassword

func (cfg *ClientConfig) ResetUsernamePassword()

ResetUsernamePassword clears any configured username and password on the client configuration

Set ConnectUsername and ConnectPassword directly instead.

func (*ClientConfig) SetConnectPacketConfigurator deprecated

func (cfg *ClientConfig) SetConnectPacketConfigurator(fn func(*paho.Connect) (*paho.Connect, error)) bool

SetConnectPacketConfigurator assigns a callback for modification of the Connect packet, called before the connection is opened, allowing the application to adjust its configuration before establishing a connection. This function should be treated as asynchronous, and expected to have no side effects.

Deprecated: Set ConnectPacketBuilder directly instead. This function exists for backwards compatibility only (and may be removed in the future).

func (*ClientConfig) SetDisConnectPacketConfigurator deprecated added in v0.20.0

func (cfg *ClientConfig) SetDisConnectPacketConfigurator(fn func() *paho.Disconnect)

SetDisConnectPacketConfigurator assigns a callback for the provision of a DISCONNECT packet. By default, a DISCONNECT is sent to the server when Disconnect is called; setting a callback allows a custom packet to be provided, or no packet (by returning nil).

Deprecated: Set DisconnectPacketBuilder directly instead. This function exists for backwards compatibility only (and may be removed in the future).

func (*ClientConfig) SetUsernamePassword deprecated

func (cfg *ClientConfig) SetUsernamePassword(username string, password []byte)

SetUsernamePassword configures username and password properties for the Connect packets These values are staged in the ClientConfig, and preparation of the Connect packet is deferred.

Deprecated: Set ConnectUsername and ConnectPassword directly instead.

func (*ClientConfig) SetWillMessage deprecated

func (cfg *ClientConfig) SetWillMessage(topic string, payload []byte, qos byte, retain bool)

SetWillMessage configures the Will topic, payload, QOS and Retain facets of the client connection These values are staged in the ClientConfig, for later preparation of the Connect packet.

Deprecated: Set WillMessage and WillProperties directly instead.

type ConnackError added in v0.12.0

type ConnackError struct {
	ReasonCode byte   // CONNACK reason code
	Reason     string // CONNACK Reason string from properties
	Err        error  // underlying error
}

ConnackError will be passed when the server denies connection in CONNACK packet

func NewConnackError added in v0.12.0

func NewConnackError(err error, connack *paho.Connack) *ConnackError

NewConnackError returns a new ConnackError

func (*ConnackError) Error added in v0.12.0

func (c *ConnackError) Error() string

func (*ConnackError) Unwrap added in v0.12.0

func (c *ConnackError) Unwrap() error

type ConnectionManager

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager manages the connection with the server and provides the ability to publish messages

func NewConnection

func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, error)

NewConnection creates a connection manager and begins the connection process (will retry until the context is cancelled)

func (*ConnectionManager) AddOnPublishReceived added in v0.20.0

func (c *ConnectionManager) AddOnPublishReceived(f func(PublishReceived) (bool, error)) func()

AddOnPublishReceived adds a function that will be called when a PUBLISH is received The new function will be called after any functions already in the list Returns a function that can be called to remove the callback

func (*ConnectionManager) Authenticate added in v0.20.0

func (c *ConnectionManager) Authenticate(ctx context.Context, a *paho.Auth) (*paho.AuthResponse, error)

Authenticate is used to initiate a reauthentication of credentials with the server. This function sends the initial Auth packet to start the reauthentication then relies on the client AuthHandler managing any further requests from the server until either a successful Auth packet is passed back, or a Disconnect is received.

func (*ConnectionManager) AwaitConnection

func (c *ConnectionManager) AwaitConnection(ctx context.Context) error

AwaitConnection will return when the connection comes up or the context is cancelled (only returns an error if context is cancelled). If you require more complex connection management then consider using the OnConnectionUp callback.

func (*ConnectionManager) Disconnect

func (c *ConnectionManager) Disconnect(ctx context.Context) error

Disconnect closes the connection (if one is up) and shuts down any active processes before returning

func (*ConnectionManager) Done

func (c *ConnectionManager) Done() <-chan struct{}

Done returns a channel that will be closed when the connection handler has shutdown cleanly Note: We cannot currently tell when the mqtt has fully shutdown (so it may still be in the process of closing down)

func (*ConnectionManager) Publish

Publish is used to send a publication to the MQTT server. It is passed a pre-prepared `PUBLISH` packet and blocks waiting for the appropriate response, or for the timeout to fire. Any response message is returned from the function, along with any errors.

func (*ConnectionManager) PublishViaQueue added in v0.20.0

func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish) error

PublishViaQueue is used to send a publication to the MQTT server via a queue (by default memory based). An error will be returned if the message could not be added to the queue, otherwise the message will be delivered in the background with no status updates available. Use this function when you wish to rely upon the libraries best-effort to transmit the message; it is anticipated that this will generally be in situations where the network link or power supply is unreliable. Messages will be written to a queue (configuring a disk-based queue is recommended) and transmitted where possible. To maximise the chance of a successful delivery:

  • Leave CleanStartOnInitialConnection set to false
  • Set SessionExpiryInterval such that sessions will outlive anticipated outages (this impacts inflight messages only)
  • Set ClientConfig.Session to a session manager with persistent storage
  • Set ClientConfig.Queue to a queue with persistent storage

func (*ConnectionManager) Subscribe

func (c *ConnectionManager) Subscribe(ctx context.Context, s *paho.Subscribe) (*paho.Suback, error)

Subscribe is used to send a Subscription request to the MQTT server. It is passed a pre-prepared Subscribe packet and blocks waiting for a response Suback, or for the timeout to fire. Any response Suback is returned from the function, along with any errors.

func (*ConnectionManager) TerminateConnectionForTest added in v0.20.0

func (c *ConnectionManager) TerminateConnectionForTest()

TerminateConnectionForTest closes the active connection (if any). This function is intended for testing only, it simulates connection loss which supports testing QOS1 and 2 message delivery.

func (*ConnectionManager) Unsubscribe

func (c *ConnectionManager) Unsubscribe(ctx context.Context, u *paho.Unsubscribe) (*paho.Unsuback, error)

Unsubscribe is used to send an Unsubscribe request to the MQTT server. It is passed a pre-prepared Unsubscribe packet and blocks waiting for a response Unsuback, or for the timeout to fire. Any response Unsuback is returned from the function, along with any errors.

type DisconnectError

type DisconnectError struct {
	// contains filtered or unexported fields
}

DisconnectError will be passed when the server requests disconnection (allows this error type to be detected)

func (*DisconnectError) Error

func (d *DisconnectError) Error() string

type PublishReceived added in v0.20.0

type PublishReceived struct {
	paho.PublishReceived
	ConnectionManager *ConnectionManager
}

type QueuePublish added in v0.20.0

type QueuePublish struct {
	*paho.Publish
}

QueuePublish holds info required to publish a message. A separate struct is used so options can be added in the future without breaking existing code

type WebSocketConfig

type WebSocketConfig struct {
	Dialer func(url *url.URL, tlsCfg *tls.Config) *websocket.Dialer // If non-nil this will be called before each websocket connection (allows full configuration of the dialer used)
	Header func(url *url.URL, tlsCfg *tls.Config) http.Header       // If non-nil this will be called before each connection attempt to get headers to include with request
}

WebSocketConfig enables customisation of the websocket connection

Directories

Path Synopsis
examples
rpc
extensions
rpc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL