mqtt

package module
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2024 License: CC0-1.0 Imports: 17 Imported by: 1

README

MQTT🤖

About

MQTT is a protocol for message queueing over a network. This project provides a client library for the Go programming language. Message-delivery guarantees are maintained at all costs, even when in error. The client recovers from failure with automatic reconnects. Message transfers in both directions do zero-copy.

The development was kindly sponsored by Northvolt as a gift to the open-source community. Many 🤖 navigated over unreliable connections since then.

This is free and unencumbered software released into the public domain.

Go Reference Build Status

Usage

Client instantiation validates the configuration only. Network management itself operates from the read routine.

client, err := mqtt.VolatileSession("demo-client", &mqtt.Config{
	Dialer:       mqtt.NewDialer("tcp", "mq1.example.com:1883"),
	PauseTimeout: 4 * time.Second,
	CleanSession: true,
})
if err != nil {
	log.Fatal("exit on broken setup: ", err)
}

A read routine sees inbound messages from any of the subscribed topics.

for {
	message, topic, err := client.ReadSlices()
	switch {
	case err == nil:
		r, _ := utf8.DecodeLastRune(message)
		switch r {
		case '℃', '℉':
			log.Printf("%q at %q", message, topic)
		}

	case errors.Is(err, mqtt.ErrClosed):
		return // client terminated

	default:
		log.Print("broker unavailable: ", err)
		time.Sleep(time.Second) // backoff
	}
}

The client supports confirmed message delivery with full progress disclosure. Message transfers without confirmation can be as simple as the following.

err := client.Publish(ctx.Done(), []byte("20.8℃"), "bedroom")
if err != nil {
	log.Print("thermostat update lost: ", err)
}

See the examples from the package documentation for more detail.

Command-Line Client

Run go install github.com/pascaldekloe/mqtt/cmd/mqttc@latest to build the binary.

NAME
	mqttc — MQTT broker access

SYNOPSIS
	mqttc [options] address

DESCRIPTION
	The command connects to the address argument, with an option to
	publish a message and/or subscribe with topic filters.

	When the address does not specify a port, then the defaults are
	applied, which is 1883 for plain connections and 8883 for TLS.

OPTIONS
  -ca file
    	Amend the trusted certificate authorities with a PEM file.
  -cert file
    	Use a client certificate from a PEM file (with a corresponding
    	-key option).
  -client identifier
    	Use a specific client identifier. (default "generated")
  -key file
    	Use a private key (matching the client certificate) from a PEM
    	file.
  -net name
    	Select the network by name. Valid alternatives include tcp4,
    	tcp6 and unix. (default "tcp")
  -pass file
    	The file content is used as a password.
  -prefix string
    	Print a string before each inbound message.
  -publish topic
    	Send a message to a topic. The payload is read from standard
    	input.
  -quiet
    	Suppress all output to standard error. Error reporting is
    	deduced to the exit code only.
  -quote
    	Print inbound topics and messages as quoted strings.
  -server name
    	Use a specific server name with TLS
  -subscribe filter
    	Listen with a topic filter. Inbound messages are printed to
    	standard output until interrupted by a signal(3). Multiple
    	-subscribe options may be applied together.
  -suffix string
    	Print a string after each inbound message. (default "\n")
  -timeout duration
    	Network operation expiry. (default 4s)
  -tls
    	Secure the connection with TLS.
  -topic
    	Print the respective topic of each inbound message.
  -user name
    	The user name may be used by the broker for authentication
    	and/or authorization purposes.
  -verbose
    	Produces more output to standard error for debug purposes.

EXIT STATUS
	(0) no error
	(1) MQTT operational error
	(2) illegal command invocation
	(5) connection refused: unacceptable protocol version
	(6) connection refused: identifier rejected
	(7) connection refused: server unavailable
	(8) connection refused: bad username or password
	(9) connection refused: not authorized
	(130) close on SIGINT
	(143) disconnect on SIGTERM

EXAMPLES
	Send a message:

		echo "hello" | mqttc -publish chat/misc localhost

	Print messages:

		mqttc -subscribe "news/#" -prefix "📥 " :1883

	Health check:

		mqttc -tls q1.example.com:8883 || echo "exit $?"

BUGS
	Report bugs at <https://github.com/pascaldekloe/mqtt/issues>.

SEE ALSO
	mosquitto_pub(1)

Standard Compliance

The implementation follows version 3.1.1 of the OASIS specification in a strict manner. Support for the original IBM-specification may be added at some point in time.

There are no plans to support protocol version 5. Version 3 is lean and well suited for IOT. The additions in version 5 may be more of a fit for backend computing.

See the Broker wiki for implementation specifics, especially the AWS IoT users.

Documentation

Overview

Package mqtt provides a client for the Message Queuing Telemetry Transport protocol.

http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Publish and Disconnect do a fire-and-forget submission. ErrClosed, ErrDown, ErrCanceled or an IsDeny [Publish only] imply no request submission. Any other error implies that the request submission was interrupted by either a connection failure or a by PauseTimeout appliance.

Ping, Subscribe and Unsubscribe await response from the broker. ErrClosed, ErrDown, ErrMax, ErrCanceled or an IsDeny [Subscribe and Unsubscribe only] imply no request submission. ErrBreak and ErrAbandoned leave with the broker response unknown. Subscribe responses may cause an SubscribeError. All other errors imply that the request submission was interrupted by either a connection failure or by a PauseTimeout appliance.

PublishAtLeastOnce and PublishExactlyOnce enqueue requests to a Persistence. Errors [either ErrClosed, ErrMax, IsDeny, or a Save return] imply that the message was dropped. Once persisted, the client will execute the transfer with endless retries, and report to the respective exchange channel.

Index

Examples

Constants

View Source
const (

	// ErrProtocolLevel means that the server does not support the level of
	// the MQTT protocol requested by the Client.
	ErrProtocolLevel connectReturn

	// ErrClientID means that the client identifier is correct UTF-8 but not
	// allowed by the Server.
	ErrClientID

	// ErrUnavailable means that the network connection has been made but
	// the MQTT service is unavailable.
	ErrUnavailable

	// ErrAuthBad means that the data in the user name or password is
	// malformed.
	ErrAuthBad

	// ErrAuth means that the client is not authorized to connect.
	ErrAuth
)

Connect return errors are predefined reasons for a broker to deny a connect request. IsConnectionRefused returns true for each of these.

Variables

View Source
var ErrAbandoned = errors.New("mqtt: request abandoned after submission")

ErrAbandoned means that a quit signal got applied after the request was send. The broker received the request, yet the result/response remains unknown.

View Source
var ErrBreak = errors.New("mqtt: connection lost while awaiting response")

ErrBreak means that the connection broke up after the request was send. The broker received the request, yet the result/response remains unknown.

View Source
var ErrCanceled = errors.New("mqtt: request canceled before submission")

ErrCanceled means that a quit signal got applied before the request was send. The transacion never happened, as opposed to ErrAbandoned.

View Source
var ErrClosed = errors.New("mqtt: client closed")

ErrClosed signals use after Close. The state is permanent. Further invocation will result again in an ErrClosed error.

View Source
var ErrDown = errors.New("mqtt: connection unavailable")

ErrDown signals no-service after a failed connect attempt. The error state will clear once a connect retry succeeds.

View Source
var ErrMax = errors.New("mqtt: maximum number of pending requests reached")

ErrMax denies a request on transit capacity, which prevents the Client from blocking. Ping has a limit of 1 slot. Subscribe and Unsubscribe share a large number of slots. PublishAtLeastOnce and PublishExactlyOnce each have a limit defined by Config. A plain Publish (at most once) has no limit.

Functions

func IsConnectionRefused

func IsConnectionRefused(err error) bool

IsConnectionRefused returns whether the broker denied a connect request from the Client.

func IsDeny

func IsDeny(err error) bool

IsDeny returns whether execution was rejected by the Client based on some validation constraint, like a size limitation or an illegal UTF-8 encoding. The rejection is permanent in such case. Another invocation with the same arguments will result in the same error again.

Types

type BigMessage

type BigMessage struct {
	*Client        // source
	Topic   string // destination
	Size    int    // byte count
}

BigMessage signals reception beyond the read buffer capacity. Receivers may or may not allocate the memory with ReadAll. The next ReadSlices will acknowledge reception either way.

func (*BigMessage) Error

func (e *BigMessage) Error() string

Error implements the standard error interface.

func (*BigMessage) ReadAll

func (e *BigMessage) ReadAll() ([]byte, error)

ReadAll returns the message in a new/dedicated buffer. Messages can be read only once, after reception (from ReadSlices), and before the next ReadSlices. The invocation must occur from within the same goroutine.

type Client

type Client struct {
	Config // read-only
	// contains filtered or unexported fields
}

Client manages a network connection until Close or Disconnect. Clients always start in the Offline state. The (un)subscribe, publish and ping methods block until the first connect attempt (from ReadSlices) completes. When the connect attempt fails, then requests receive ErrDown until a retry succeeds. The same goes for reconnects on connection loss.

A single goroutine must invoke ReadSlices consecutively until ErrClosed. Some backoff on error reception comes recommended though.

Multiple goroutines may invoke methods on a Client simultaneously, except for ReadSlices.

Example (Setup)

It is good practice to install the client from main.

package main

import (
	"errors"
	"log"
	"time"

	"github.com/pascaldekloe/mqtt"
)

// Publish is a method from mqtt.Client.
var Publish func(quit <-chan struct{}, message []byte, topic string) error

func main() {
	client, err := mqtt.VolatileSession("demo-client", &mqtt.Config{
		Dialer:       mqtt.NewDialer("tcp", "localhost:1883"),
		PauseTimeout: 4 * time.Second,
	})
	if err != nil {
		log.Fatal("exit on broken setup: ", err)
	}

	// launch read-routine
	go func() {
		var big *mqtt.BigMessage
		for {
			message, topic, err := client.ReadSlices()
			switch {
			case err == nil:
				// do something with inbound message
				log.Printf("📥 %q: %q", topic, message)

			case errors.As(err, &big):
				log.Printf("📥 %q: %d byte message omitted", big.Topic, big.Size)

			case errors.Is(err, mqtt.ErrClosed):
				log.Print(err)
				return // terminated

			case mqtt.IsConnectionRefused(err):
				log.Print(err) // explains rejection
				// mqtt.ErrDown for a while
				time.Sleep(15 * time.Minute)

			default:
				log.Print("broker unavailable: ", err)
				// mqtt.ErrDown during backoff
				time.Sleep(2 * time.Second)
			}
		}
	}()

	// Install each method in use as a package variable. Such setup is
	// compatible with the tools proveded from the mqtttest subpackage.
	Publish = client.Publish
}
Output:

func AdoptSession

func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal error)

AdoptSession continues with a Persistence which had an InitSession already.

A fatal implies either a broken setup or persistence failure. Connection issues, if any, are reported by ReadSlices. The Client recovers from corrupt states (in Persistence) automatically with warn entries.

func InitSession

func InitSession(clientID string, p Persistence, c *Config) (*Client, error)

InitSession configures the Persistence for first use. Brokers use clientID to uniquely identify the session. The session may be continued with AdoptSession on another Client.

func VolatileSession

func VolatileSession(clientID string, c *Config) (*Client, error)

VolatileSession operates solely in-memory. This setup is recommended for delivery with the “at most once” guarantee [Publish], and for reception without the “exactly once” guarantee [SubscribeLimitAtLeastOnce], and for testing.

Brokers use clientID to uniquely identify the session. Volatile sessions may be continued by using the same clientID again. Use CleanSession to prevent reuse of an existing state.

An error implies either a broken setup or persistence failure. Connection issues, if any, are reported by ReadSlices.

func (*Client) Close

func (c *Client) Close() error

Close terminates the connection establishment. The Client is closed regardless of the error return. Closing an already closed Client has no effect.

func (*Client) Disconnect

func (c *Client) Disconnect(quit <-chan struct{}) error

Disconnect tries a graceful termination, which discards the Will. The Client is closed regardless of the error return.

Quit is optional, as nil just blocks. Appliance of quit will strictly result in ErrCanceled.

BUG(pascaldekloe): The MQTT protocol has no confirmation for the disconnect request. As a result, a client can never know for sure whether the operation actually succeeded.

func (*Client) Offline

func (c *Client) Offline() <-chan struct{}

Offline returns a chanel that's closed when the client has no connection.

func (*Client) Online

func (c *Client) Online() <-chan struct{}

Online returns a chanel that's closed when the client has a connection.

func (*Client) Ping

func (c *Client) Ping(quit <-chan struct{}) error

Ping makes a roundtrip to validate the connection. Only one request is permitted ErrMax at a time.

Quit is optional, as nil just blocks. Appliance of quit will strictly result in either ErrCanceled or ErrAbandoned.

func (*Client) Publish

func (c *Client) Publish(quit <-chan struct{}, message []byte, topic string) error

Publish delivers the message with an “at most once” guarantee. Subscribers may or may not receive the message when subject to error. This delivery method is the most efficient option.

Quit is optional, as nil just blocks. Appliance of quit will strictly result in ErrCanceled.

func (*Client) PublishAtLeastOnce

func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-chan error, err error)

PublishAtLeastOnce delivers the message with an “at least once” guarantee. Subscribers may receive the message more than once when subject to error. This delivery method requires a response transmission plus persistence on both client-side and broker-side.

The exchange channel is closed uppon receival confirmation by the broker. ErrClosed leaves the channel blocked (with no further input).

Example (Critical)

Demonstrates all error scenario and the respective recovery options.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/pascaldekloe/mqtt"
)

// PublishAtLeastOnce is a method from mqtt.Client.
var PublishAtLeastOnce func(message []byte, topic string) (ack <-chan error, err error)

func main() {
	for {
		exchange, err := PublishAtLeastOnce([]byte("🍸🆘"), "demo/alert")
		switch {
		case err == nil:
			fmt.Println("alert submitted…")
			break

		case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
			fmt.Println("🚨 alert not send:", err)
			return

		case errors.Is(err, mqtt.ErrMax):
			fmt.Println("⚠️ alert submission hold-up:", err)
			time.Sleep(time.Second / 4)
			continue

		default:
			fmt.Println("⚠️ alert submission blocked on persistence malfunction:", err)
			time.Sleep(4 * time.Second)
			continue
		}

		for err := range exchange {
			if errors.Is(err, mqtt.ErrClosed) {
				fmt.Println("🚨 alert exchange suspended:", err)
				// An AdoptSession may continue the transaction.
				return
			}

			fmt.Println("⚠️ alert request transfer interrupted:", err)
		}
		fmt.Println("alert acknowledged ✓")
		break
	}

}
Output:

alert submitted…
alert acknowledged ✓

func (*Client) PublishAtLeastOnceRetained

func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (exchange <-chan error, err error)

PublishAtLeastOnceRetained is like PublishAtLeastOnce, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. When a new subscription is established, the last retained message, if any, on each matching topic name must be sent to the subscriber.

func (*Client) PublishExactlyOnce

func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-chan error, err error)

PublishExactlyOnce delivers the message with an “exactly once” guarantee. This delivery method eliminates the duplicate-delivery risk from PublishAtLeastOnce at the expense of an additional network roundtrip.

func (*Client) PublishExactlyOnceRetained

func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (exchange <-chan error, err error)

PublishExactlyOnceRetained is like PublishExactlyOnce, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. When a new subscription is established, the last retained message, if any, on each matching topic name must be sent to the subscriber.

func (*Client) PublishRetained

func (c *Client) PublishRetained(quit <-chan struct{}, message []byte, topic string) error

PublishRetained is like Publish, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. The broker may choose to discard the message at any time though. Uppon reception, the broker must discard any message previously retained for the topic name.

func (*Client) ReadSlices

func (c *Client) ReadSlices() (message, topic []byte, err error)

ReadSlices should be invoked consecutively from a single goroutine until ErrClosed. Both message and topic are slices from a read buffer. The bytes stop being valid at the next read. Care must be taken to avoid freezing the Client. Use any of the following techniques for blocking operations.

  • start a goroutine with a copy of message and/or topic
  • start a goroutine with the bytes parsed/unmarshalled
  • persist message and/or topic; then continue from there
  • apply low timeouts in a strict manner

Each invocation acknowledges ownership of the previous returned, if any, including BigMessage.

BigMessage leaves memory allocation beyond the read buffer as a choice to the consumer. Any error other than BigMessage puts the Client in an ErrDown state. Invocation should apply a backoff once down. Retries on IsConnectionRefused, if any, should probably apply a rather large backoff. See the Client example for a complete setup.

func (*Client) Subscribe

func (c *Client) Subscribe(quit <-chan struct{}, topicFilters ...string) error

Subscribe requests subscription for all topics that match any of the filter arguments.

Quit is optional, as nil just blocks. Appliance of quit will strictly result in either ErrCanceled or ErrAbandoned.

Example (Sticky)

Demonstrates all error scenario and the respective recovery options.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pascaldekloe/mqtt"
)

// Subscribe is a method from mqtt.Client.
var Subscribe func(quit <-chan struct{}, topicFilters ...string) error

// Online is a method from mqtt.Client.
var Online func() <-chan struct{}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	for {
		err := Subscribe(ctx.Done(), "demo/+")
		switch {
		case err == nil:
			fmt.Println("subscribe confirmed by broker")
			return

		case errors.As(err, new(mqtt.SubscribeError)):
			fmt.Println("subscribe failed by broker")
			return

		case mqtt.IsDeny(err): // illegal topic filter
			fmt.Println(err)
			return

		case errors.Is(err, mqtt.ErrClosed):
			fmt.Println("no subscribe due client close")
			return

		case errors.Is(err, mqtt.ErrCanceled):
			fmt.Println("no subscribe due timeout")
			return

		case errors.Is(err, mqtt.ErrAbandoned):
			fmt.Println("subscribe state unknown due timeout")
			return

		case errors.Is(err, mqtt.ErrBreak):
			fmt.Println("subscribe state unknown due connection loss")
			select {
			case <-Online():
				fmt.Println("subscribe retry with new connection")
			case <-ctx.Done():
				fmt.Println("subscribe timeout")
				return
			}

		case errors.Is(err, mqtt.ErrDown):
			fmt.Println("subscribe delay while service is down")
			select {
			case <-Online():
				fmt.Println("subscribe retry with new connection")
			case <-ctx.Done():
				fmt.Println("subscribe timeout")
				return
			}

		case errors.Is(err, mqtt.ErrMax):
			fmt.Println("subscribe hold-up due excessive number of pending requests")
			time.Sleep(2 * time.Second) // backoff

		default:
			fmt.Println("subscribe request transfer interrupted:", err)
			time.Sleep(time.Second / 2) // backoff
		}
	}
}
Output:

subscribe confirmed by broker

func (*Client) SubscribeLimitAtLeastOnce

func (c *Client) SubscribeLimitAtLeastOnce(quit <-chan struct{}, topicFilters ...string) error

SubscribeLimitAtLeastOnce is like Subscribe, but it limits message reception to quality-of-service level 1—acknowledged transfer.

func (*Client) SubscribeLimitAtMostOnce

func (c *Client) SubscribeLimitAtMostOnce(quit <-chan struct{}, topicFilters ...string) error

SubscribeLimitAtMostOnce is like Subscribe, but it limits message reception to quality-of-service level 0—fire and forget.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error

Unsubscribe requests subscription cancelation for each of the filter arguments.

Quit is optional, as nil just blocks. Appliance of quit will strictly result in either ErrCanceled or ErrAbandoned.

type Config

type Config struct {
	Dialer // chooses the broker

	// PauseTimeout sets the minimim transfer rate as one byte per duration.
	// Zero disables timeout protection entirely, which leaves the Client
	// vulnerable to blocking on stale connections.
	//
	// Any pauses during MQTT packet submission that exceed the timeout will
	// be treated as fatal to the connection, if they are detected in time.
	// Expiry causes automated reconnects just like any other fatal network
	// error. Operations which got interrupted by a PauseTimeout receive a
	// net.Error with Timeout true.
	PauseTimeout time.Duration

	// The maximum number of transactions at a time. Excess is denied with
	// ErrMax. Zero effectively disables the respective quality-of-service
	// level. Negative values default to the Client limit of 16,384. Higher
	// values are truncated silently.
	AtLeastOnceMax, ExactlyOnceMax int

	// The user name may be used by the broker for authentication and/or
	// authorization purposes. An empty string omits the option, except for
	// when password is not nil.
	UserName string
	Password []byte // option omitted when nil

	// The Will Message is published when the connection terminates without
	// Disconnect. A nil Message disables the Will option.
	Will struct {
		Topic   string // destination
		Message []byte // payload

		Retain      bool // see PublishRetained
		AtLeastOnce bool // see PublishAtLeastOnce
		ExactlyOnce bool // overrides AtLeastOnce
	}

	// KeepAlive sets the activity timeout in seconds, with zero for none.
	// The server must disconnect after no control-packet reception for one
	// and a half times the keep-alive duration. Use Ping when idle.
	KeepAlive uint16

	// Brokers must resume communications with the client (identified by
	// ClientID) when CleanSession is false. Otherwise, brokers must create
	// a new session when either CleanSession is true or when no session is
	// associated to the client identifier.
	//
	// Reconnects do not clean the session, regardless of this setting.
	CleanSession bool
}

Config is a Client configuration. Dialer is the only required field.

type Dialer

type Dialer func(ctx context.Context) (net.Conn, error)

Dialer abstracts the transport layer establishment.

func NewDialer

func NewDialer(network, address string) Dialer

NewDialer provides plain network connections. See net.Dial for details on the network & address syntax.

func NewTLSDialer

func NewTLSDialer(network, address string, config *tls.Config) Dialer

NewTLSDialer provides secured network connections. See net.Dial for details on the network & address syntax.

Example (ClientCertificate)

Some brokers permit authentication with TLS client certificates.

package main

import (
	"crypto/tls"
	"log"

	"github.com/pascaldekloe/mqtt"
)

func main() {
	certPEM := []byte(`-----BEGIN CERTIFICATE-----
MIIBhTCCASugAwIBAgIQIRi6zePL6mKjOipn+dNuaTAKBggqhkjOPQQDAjASMRAw
DgYDVQQKEwdBY21lIENvMB4XDTE3MTAyMDE5NDMwNloXDTE4MTAyMDE5NDMwNlow
EjEQMA4GA1UEChMHQWNtZSBDbzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABD0d
7VNhbWvZLWPuj/RtHFjvtJBEwOkhbN/BnnE8rnZR8+sbwnc/KhCk3FhnpHZnQz7B
5aETbbIgmuvewdjvSBSjYzBhMA4GA1UdDwEB/wQEAwICpDATBgNVHSUEDDAKBggr
BgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MCkGA1UdEQQiMCCCDmxvY2FsaG9zdDo1
NDUzgg4xMjcuMC4wLjE6NTQ1MzAKBggqhkjOPQQDAgNIADBFAiEA2zpJEPQyz6/l
Wf86aX6PepsntZv2GYlA5UpabfT2EZICICpJ5h/iI+i341gBmLiAFQOyTDT+/wQc
6MF9+Yw1Yy0t
-----END CERTIFICATE-----`)
	keyPEM := []byte(`-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIIrYSSNQFaA2Hwf1duRSxKtLYX5CB04fSeQ6tF1aY/PuoAoGCCqGSM49
AwEHoUQDQgAEPR3tU2Fta9ktY+6P9G0cWO+0kETA6SFs38GecTyudlHz6xvCdz8q
EKTcWGekdmdDPsHloRNtsiCa697B2O9IFA==
-----END EC PRIVATE KEY-----`)

	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		log.Fatal(err)
	}
	mqtt.NewTLSDialer("tcp", "mq1.example.com:8883", &tls.Config{
		Certificates: []tls.Certificate{cert},
	})
}
Output:

type Persistence

type Persistence interface {
	// Load resolves the value of a key. A nil return means “not found”.
	Load(key uint) ([]byte, error)

	// Save defines the value of a key.
	Save(key uint, value net.Buffers) error

	// Delete clears the value of a key, whether it existed or not. Failures
	// will be overwitten eventually due to the limited address space.
	Delete(key uint) error

	// List enumerates all available in any order.
	List() (keys []uint, err error)
}

Persistence tracks the session state as a key–value store. An instance may serve only one Client at a time.

Values are addressed by a 17-bit key, mask 0x1ffff. The minimum size is 12 B. The maximum size is 256 MiB + 17 B. Clients apply integrity checks all round.

Multiple goroutines may invoke methods on a Persistence simultaneously.

func FileSystem

func FileSystem(dir string) Persistence

FileSystem stores values per file in a directory. Callers must ensure the availability, including write permission for the user. The empty string defaults to the working directory.

type SubscribeError

type SubscribeError []string

SubscribeError holds one or more topic filters which were failed by the broker. The element order matches the originating request's.

func (SubscribeError) Error

func (e SubscribeError) Error() string

Error implements the standard error interface.

Notes

Bugs

  • The MQTT protocol has no confirmation for the disconnect request. As a result, a client can never know for sure whether the operation actually succeeded.

  • Save errors from a Persistence may cause duplicate reception for deliveries with an “exactly once guarantee”, if the respective Client goes down before a recovery/retry succeeds.

  • AdoptSession assumes that all publish-at-least-once packets were submitted before already. Persisting the actual state after each network submission seems like too much just for the DUP flag to be slightly more precise.

  • AdoptSession assumes that all publish-exactly-once packets were submitted before already. Persisting the actual state after each network submission seems like too much just for the DUP flag to be slightly more precise.

Directories

Path Synopsis
cmd
mqttc
Package main provides a command-line utility.
Package main provides a command-line utility.
Package mqtttest provides utilities for MQTT testing.
Package mqtttest provides utilities for MQTT testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL