Documentation ¶
Overview ¶
Package mqtt provides a client for the Message Queuing Telemetry Transport protocol.
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Publish and Disconnect do a fire-and-forget submission. ErrClosed, ErrDown, ErrCanceled or an IsDeny [Publish only] imply no request submission. Any other error implies that the request submission was interrupted by either a connection failure or a PauseTimeout expiry.
Ping, Subscribe and Unsubscribe await response from the broker. ErrClosed, ErrDown, ErrMax, ErrCanceled or an IsDeny [Subscribe and Unsubscribe only] imply no request submission. ErrBreak and ErrAbandoned leave with the broker response unknown. Subscribe responses may cause a SubscribeError. All other errors imply that the request submission was interrupted by either a connection failure or a PauseTimeout expiry.
PublishAtLeastOnce and PublishExactlyOnce enqueue requests to a Persistence. Errors [either ErrClosed, ErrMax, IsDeny, or a Save return] imply that the message was dropped. Once persisted, the client will execute the transfer with endless retries, and report to the respective exchange channel.
Index ¶
- Constants
- Variables
- func IsConnectionRefused(err error) bool
- func IsDeny(err error) bool
- type BigMessage
- type Client
- func (c *Client) Close() error
- func (c *Client) Disconnect(quit <-chan struct{}) error
- func (c *Client) Offline() <-chan struct{}
- func (c *Client) Online() <-chan struct{}
- func (c *Client) Ping(quit <-chan struct{}) error
- func (c *Client) Publish(quit <-chan struct{}, message []byte, topic string) error
- func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-chan error, err error)
- func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (exchange <-chan error, err error)
- func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-chan error, err error)
- func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (exchange <-chan error, err error)
- func (c *Client) PublishRetained(quit <-chan struct{}, message []byte, topic string) error
- func (c *Client) ReadSlices() (message, topic []byte, err error)
- func (c *Client) Subscribe(quit <-chan struct{}, topicFilters ...string) error
- func (c *Client) SubscribeLimitAtLeastOnce(quit <-chan struct{}, topicFilters ...string) error
- func (c *Client) SubscribeLimitAtMostOnce(quit <-chan struct{}, topicFilters ...string) error
- func (c *Client) Unsubscribe(quit <-chan struct{}, topicFilters ...string) error
- type Config
- type Dialer
- type Persistence
- type SubscribeError
- Bugs
Constants ¶
const (
	// ErrProtocolLevel means that the server does not support the level of
	// the MQTT protocol requested by the Client.
	ErrProtocolLevel connectReturn

	// ErrClientID means that the client identifier is correct UTF-8 but not
	// allowed by the Server.
	ErrClientID

	// ErrUnavailable means that the MQTT service is unavailable.
	ErrUnavailable

	// ErrAuthBad means that the data in the user name or password is
	// malformed.
	ErrAuthBad

	// ErrAuth means that the client is not authorized to connect.
	ErrAuth
)
Connect return errors are predefined reasons for a broker to deny a connect request. IsConnectionRefused returns true for each of these.
Variables ¶
var ErrAbandoned = errors.New("mqtt: request abandoned after submission")
ErrAbandoned means that a quit signal got applied after the request was sent. The broker received the request, yet the result/response remains unknown.
var ErrBreak = errors.New("mqtt: connection lost while awaiting response")
ErrBreak means that the connection broke up after the request was sent. The broker received the request, yet the result/response remains unknown.
var ErrCanceled = errors.New("mqtt: request canceled before submission")
ErrCanceled means that a quit signal got applied before the request was sent. The transaction never happened, as opposed to ErrAbandoned.
var ErrClosed = errors.New("mqtt: client closed")
ErrClosed signals use after Close. The state is permanent. Further invocation will result again in an ErrClosed error.
var ErrDown = errors.New("mqtt: connection unavailable")
ErrDown signals no-service after a failed connect attempt. The error state will clear once a connect retry succeeds.
var ErrMax = errors.New("mqtt: maximum number of pending requests reached")
ErrMax denies a request on transit capacity, which prevents the Client from blocking. Ping has a limit of 1 slot. Subscribe and Unsubscribe share a large number of slots. PublishAtLeastOnce and PublishExactlyOnce each have a limit defined by Config. A plain Publish (at most once) has no limit.
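The limit rules that Config documents for AtLeastOnceMax and ExactlyOnceMax can be read as a small function. The sketch below is an illustration of those documented rules only; effectiveMax and clientLimit are hypothetical names, not part of package mqtt, and the library's internal handling may differ.

```go
package main

import "fmt"

// clientLimit is the Client limit stated in the Config documentation.
const clientLimit = 16384

// effectiveMax is a hypothetical helper (not part of package mqtt) that
// applies the documented rules to a configured maximum.
func effectiveMax(configured int) int {
	switch {
	case configured < 0:
		return clientLimit // negative values default to the Client limit
	case configured > clientLimit:
		return clientLimit // higher values are truncated silently
	default:
		return configured // zero disables the quality-of-service level
	}
}

func main() {
	for _, n := range []int{-1, 0, 8, 20000} {
		fmt.Printf("configured %d: effective %d\n", n, effectiveMax(n))
	}
}
```

With an effective limit of zero, every request at that quality-of-service level would be denied with ErrMax.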
Functions ¶
func IsConnectionRefused ¶
IsConnectionRefused returns whether the broker denied a connect request from the Client.
Types ¶
type BigMessage ¶
BigMessage signals reception beyond the read buffer capacity. Receivers may or may not allocate the memory with ReadAll. The next ReadSlices will acknowledge reception either way.
func (*BigMessage) Error ¶
func (e *BigMessage) Error() string
Error implements the standard error interface.
func (*BigMessage) ReadAll ¶
func (e *BigMessage) ReadAll() ([]byte, error)
ReadAll returns the message in a new/dedicated buffer. Messages can be read only once, after reception (from ReadSlices), and before the next ReadSlices. The invocation must occur from within the same goroutine.
type Client ¶
type Client struct {
	Config // read-only
	// contains filtered or unexported fields
}
Client manages a network connection until Close or Disconnect. Clients always start in the Offline state. The (un)subscribe, publish and ping methods block until the first connect attempt (from ReadSlices) completes. When the connect attempt fails, then requests receive ErrDown until a retry succeeds. The same goes for reconnects on connection loss.
A single goroutine must invoke ReadSlices consecutively until ErrClosed. Some backoff on error reception is recommended though.
Multiple goroutines may invoke methods on a Client simultaneously, except for ReadSlices.
Example (Setup) ¶
It is good practice to install the client from main.
package main

import (
	"errors"
	"log"
	"time"

	"github.com/pascaldekloe/mqtt"
)

// Publish is a method from mqtt.Client.
var Publish func(quit <-chan struct{}, message []byte, topic string) error

func main() {
	client, err := mqtt.VolatileSession("demo-client", &mqtt.Config{
		Dialer:       mqtt.NewDialer("tcp", "localhost:1883"),
		PauseTimeout: 4 * time.Second,
	})
	if err != nil {
		log.Fatal("exit on broken setup: ", err)
	}

	// launch read-routine
	go func() {
		var big *mqtt.BigMessage
		for {
			message, topic, err := client.ReadSlices()
			switch {
			case err == nil:
				// do something with inbound message
				log.Printf("📥 %q: %q", topic, message)

			case errors.As(err, &big):
				log.Printf("📥 %q: %d byte message omitted", big.Topic, big.Size)

			case errors.Is(err, mqtt.ErrClosed):
				log.Print(err)
				return // terminated

			case mqtt.IsConnectionRefused(err):
				log.Print(err) // explains rejection
				// mqtt.ErrDown for a while
				time.Sleep(15 * time.Minute)

			default:
				log.Print("broker unavailable: ", err)
				// mqtt.ErrDown during backoff
				time.Sleep(2 * time.Second)
			}
		}
	}()

	// Install each method in use as a package variable. Such setup is
	// compatible with the tools provided from the mqtttest subpackage.
	Publish = client.Publish
}
func AdoptSession ¶
func AdoptSession(p Persistence, c *Config) (client *Client, warn []error, fatal error)
AdoptSession continues with a Persistence which had an InitSession already.
A fatal implies either a broken setup or persistence failure. Connection issues, if any, are reported by ReadSlices. The Client recovers from corrupt states (in Persistence) automatically with warn entries.
func InitSession ¶
func InitSession(clientID string, p Persistence, c *Config) (*Client, error)
InitSession configures the Persistence for first use. Brokers use clientID to uniquely identify the session. The session may be continued with AdoptSession on another Client.
func VolatileSession ¶
VolatileSession operates solely in-memory. This setup is recommended for delivery with the “at most once” guarantee [Publish], and for reception without the “exactly once” guarantee [SubscribeLimitAtLeastOnce], and for testing.
Brokers use clientID to uniquely identify the session. Volatile sessions may be continued by using the same clientID again. Use CleanSession to prevent reuse of an existing state.
An error implies either a broken setup or persistence failure. Connection issues, if any, are reported by ReadSlices.
func (*Client) Close ¶
Close terminates the connection establishment. The Client is closed regardless of the error return. Closing an already closed Client has no effect.
func (*Client) Disconnect ¶
Disconnect tries a graceful termination, which discards the Will. The Client is closed regardless of the error return.
Quit is optional, as nil just blocks. Applying quit will strictly result in ErrCanceled.
BUG(pascaldekloe): The MQTT protocol has no confirmation for the disconnect request. As a result, a client can never know for sure whether the operation actually succeeded.
func (*Client) Offline ¶
func (c *Client) Offline() <-chan struct{}
Offline returns a channel that's closed when the client has no connection.
func (*Client) Online ¶
func (c *Client) Online() <-chan struct{}
Online returns a channel that's closed when the client has a connection.
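Because the returned channel is closed while a connection is present, it can be used in a select statement together with a timer. The following is a minimal sketch under that reading; waitOnline is a hypothetical helper, and the online argument stands in for the Online method of a Client so that the program runs without a broker.

```go
package main

import (
	"fmt"
	"time"
)

// waitOnline is a hypothetical helper (not part of package mqtt). The online
// argument stands in for the Online method of a Client. It reports whether
// the state channel got closed before the timeout expired.
func waitOnline(online func() <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-online():
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	// Simulated state channels: a closed channel signals "connected".
	up := make(chan struct{})
	close(up)
	down := make(chan struct{}) // never closed

	fmt.Println(waitOnline(func() <-chan struct{} { return up }, time.Second))
	fmt.Println(waitOnline(func() <-chan struct{} { return down }, 10*time.Millisecond))
}
```

The same pattern applies to Offline, for example to detect connection loss within a bounded wait.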
func (*Client) Ping ¶
Ping makes a roundtrip to validate the connection. Only one request is permitted [ErrMax] at a time.
Quit is optional, as nil just blocks. Applying quit will strictly result in either ErrCanceled or ErrAbandoned.
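A quit channel can bound how long a request may block. The sketch below shows one way to build such a channel from a duration. The names quitAfter, slowRequest and errQuit are hypothetical stand-ins so that the program runs without a broker; with a real Client the channel would be passed to Ping, Subscribe and similar methods, and the error would be ErrCanceled or ErrAbandoned as documented.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// quitAfter returns a channel that is closed once d has elapsed.
func quitAfter(d time.Duration) <-chan struct{} {
	quit := make(chan struct{})
	time.AfterFunc(d, func() { close(quit) })
	return quit
}

// errQuit is a local stand-in for the errors that a quit signal causes.
var errQuit = errors.New("demo: quit signal received")

// slowRequest is a hypothetical stand-in for a blocking request such as
// Ping. It never completes on its own; only the quit signal ends it.
func slowRequest(quit <-chan struct{}) error {
	<-quit // a nil quit channel would block here forever
	return errQuit
}

func main() {
	err := slowRequest(quitAfter(20 * time.Millisecond))
	fmt.Println(err)
}
```

The Done channel of a context.Context with a deadline serves the same purpose.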
func (*Client) Publish ¶
Publish delivers the message with an “at most once” guarantee. Subscribers may or may not receive the message when subject to error. This delivery method is the most efficient option.
Quit is optional, as nil just blocks. Applying quit will strictly result in ErrCanceled.
func (*Client) PublishAtLeastOnce ¶
func (c *Client) PublishAtLeastOnce(message []byte, topic string) (exchange <-chan error, err error)
PublishAtLeastOnce delivers the message with an “at least once” guarantee. Subscribers may receive the message more than once when subject to error. This delivery method requires a response transmission plus persistence on both client-side and broker-side.
The exchange channel is closed upon receipt confirmation by the broker. ErrClosed leaves the channel blocked (with no further input).
Example (Critical) ¶
Demonstrates all error scenarios and the respective recovery options.
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/pascaldekloe/mqtt"
)

// PublishAtLeastOnce is a method from mqtt.Client.
var PublishAtLeastOnce func(message []byte, topic string) (ack <-chan error, err error)

func main() {
	for {
		exchange, err := PublishAtLeastOnce([]byte("🍸🆘"), "demo/alert")
		switch {
		case err == nil:
			fmt.Println("alert submitted…")
			break

		case mqtt.IsDeny(err), errors.Is(err, mqtt.ErrClosed):
			fmt.Println("🚨 alert not send:", err)
			return

		case errors.Is(err, mqtt.ErrMax):
			fmt.Println("⚠️ alert submission hold-up:", err)
			time.Sleep(time.Second / 4)
			continue

		default:
			fmt.Println("⚠️ alert submission blocked on persistence malfunction:", err)
			time.Sleep(4 * time.Second)
			continue
		}

		for err := range exchange {
			if errors.Is(err, mqtt.ErrClosed) {
				fmt.Println("🚨 alert exchange suspended:", err)
				// An AdoptSession may continue the transaction.
				return
			}

			fmt.Println("⚠️ alert request transfer interrupted:", err)
		}
		fmt.Println("alert acknowledged ✓")
		break
	}
}
Output: alert submitted… alert acknowledged ✓
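The exchange channel is not closed after ErrClosed, so a plain range loop over it would never return in that case. The following sketch isolates that detail. It uses a simulated channel and demo errors so that it runs without a broker; awaitAck, errInterrupted and errFinal are hypothetical names, and in real use the final check would be errors.Is(err, mqtt.ErrClosed).

```go
package main

import (
	"errors"
	"fmt"
)

// Demo errors. In real use the final error would be mqtt.ErrClosed.
var (
	errInterrupted = errors.New("demo: transfer interrupted")
	errFinal       = errors.New("demo: client closed")
)

// awaitAck is a hypothetical helper. It reads exchange until the channel is
// closed (acknowledged) or until isFinal matches an error, because the
// channel stays open after a final error.
func awaitAck(exchange <-chan error, isFinal func(error) bool) (acked bool, seen []error) {
	for err := range exchange {
		seen = append(seen, err)
		if isFinal(err) {
			return false, seen
		}
	}
	return true, seen
}

func main() {
	// Simulated exchange: one interruption, then confirmation (close).
	exchange := make(chan error, 1)
	exchange <- errInterrupted
	close(exchange)

	acked, seen := awaitAck(exchange, func(err error) bool {
		return errors.Is(err, errFinal)
	})
	fmt.Println(acked, len(seen)) // true 1
}
```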
func (*Client) PublishAtLeastOnceRetained ¶
func (c *Client) PublishAtLeastOnceRetained(message []byte, topic string) (exchange <-chan error, err error)
PublishAtLeastOnceRetained is like PublishAtLeastOnce, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. When a new subscription is established, the last retained message, if any, on each matching topic name must be sent to the subscriber.
func (*Client) PublishExactlyOnce ¶
func (c *Client) PublishExactlyOnce(message []byte, topic string) (exchange <-chan error, err error)
PublishExactlyOnce delivers the message with an “exactly once” guarantee. This delivery method eliminates the duplicate-delivery risk from PublishAtLeastOnce at the expense of an additional network roundtrip.
func (*Client) PublishExactlyOnceRetained ¶
func (c *Client) PublishExactlyOnceRetained(message []byte, topic string) (exchange <-chan error, err error)
PublishExactlyOnceRetained is like PublishExactlyOnce, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. When a new subscription is established, the last retained message, if any, on each matching topic name must be sent to the subscriber.
func (*Client) PublishRetained ¶
PublishRetained is like Publish, but the broker must store the message, so that it can be delivered to future subscribers whose subscriptions match the topic name. The broker may choose to discard the message at any time though. Upon reception, the broker must discard any message previously retained for the topic name.
func (*Client) ReadSlices ¶
ReadSlices should be invoked consecutively from a single goroutine until ErrClosed. Both message and topic are slices from a read buffer. The bytes stop being valid at the next read. Care must be taken to avoid freezing the Client. Use any of the following techniques for blocking operations.
- start a goroutine with a copy of message and/or topic
- start a goroutine with the bytes parsed/unmarshalled
- persist message and/or topic; then continue from there
- apply low timeouts in a strict manner
Each invocation acknowledges ownership of the previously returned message, if any, including BigMessage.
BigMessage leaves memory allocation beyond the read buffer as a choice to the consumer. Any error other than BigMessage puts the Client in an ErrDown state. Invocation should apply a backoff once down. Retries on IsConnectionRefused, if any, should probably apply a rather large backoff. See the Client example for a complete setup.
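The first technique from the list above, a goroutine with a copy of message and topic, can be sketched as follows. The read buffer is simulated with a plain byte slice so that the program runs without a broker; inbound and detach are hypothetical names, not part of package mqtt.

```go
package main

import (
	"fmt"
	"sync"
)

// inbound owns its bytes, unlike the slices returned by ReadSlices.
type inbound struct {
	topic   string
	message []byte
}

// detach copies message and topic out of the shared read buffer.
func detach(message, topic []byte) inbound {
	return inbound{
		topic:   string(topic),                    // the conversion copies
		message: append([]byte(nil), message...), // dedicated backing array
	}
}

func main() {
	// buf simulates the read buffer, which the next read overwrites.
	buf := []byte("demo/tempcold")
	topic, message := buf[:9], buf[9:]

	// Copy first; then let a goroutine do the slow work.
	in := detach(message, topic)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Printf("%s: %s\n", in.topic, in.message) // demo/temp: cold
	}()

	copy(buf, "XXXXXXXXXXXXX") // the "next read" reuses the buffer
	wg.Wait()
}
```

The copy must happen before the next ReadSlices invocation. Starting the goroutine with the original slices would leave it reading bytes that are no longer valid.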
func (*Client) Subscribe ¶
Subscribe requests subscription for all topics that match any of the filter arguments.
Quit is optional, as nil just blocks. Applying quit will strictly result in either ErrCanceled or ErrAbandoned.
Example (Sticky) ¶
Demonstrates all error scenarios and the respective recovery options.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/pascaldekloe/mqtt"
)

// Subscribe is a method from mqtt.Client.
var Subscribe func(quit <-chan struct{}, topicFilters ...string) error

// Online is a method from mqtt.Client.
var Online func() <-chan struct{}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	for {
		err := Subscribe(ctx.Done(), "demo/+")
		switch {
		case err == nil:
			fmt.Println("subscribe confirmed by broker")
			return

		case errors.As(err, new(mqtt.SubscribeError)):
			fmt.Println("subscribe failed by broker")
			return

		case mqtt.IsDeny(err): // illegal topic filter
			fmt.Println(err)
			return

		case errors.Is(err, mqtt.ErrClosed):
			fmt.Println("no subscribe due client close")
			return

		case errors.Is(err, mqtt.ErrCanceled):
			fmt.Println("no subscribe due timeout")
			return

		case errors.Is(err, mqtt.ErrAbandoned):
			fmt.Println("subscribe state unknown due timeout")
			return

		case errors.Is(err, mqtt.ErrBreak):
			fmt.Println("subscribe state unknown due connection loss")
			select {
			case <-Online():
				fmt.Println("subscribe retry with new connection")
			case <-ctx.Done():
				fmt.Println("subscribe timeout")
				return
			}

		case errors.Is(err, mqtt.ErrDown):
			fmt.Println("subscribe delay while service is down")
			select {
			case <-Online():
				fmt.Println("subscribe retry with new connection")
			case <-ctx.Done():
				fmt.Println("subscribe timeout")
				return
			}

		case errors.Is(err, mqtt.ErrMax):
			fmt.Println("subscribe hold-up due excessive number of pending requests")
			time.Sleep(2 * time.Second) // backoff

		default:
			fmt.Println("subscribe request transfer interrupted:", err)
			time.Sleep(time.Second / 2) // backoff
		}
	}
}
Output: subscribe confirmed by broker
func (*Client) SubscribeLimitAtLeastOnce ¶
SubscribeLimitAtLeastOnce is like Subscribe, but it limits message reception to quality-of-service level 1—acknowledged transfer.
func (*Client) SubscribeLimitAtMostOnce ¶
SubscribeLimitAtMostOnce is like Subscribe, but it limits message reception to quality-of-service level 0—fire and forget.
func (*Client) Unsubscribe ¶
Unsubscribe requests subscription cancelation for each of the filter arguments.
Quit is optional, as nil just blocks. Applying quit will strictly result in either ErrCanceled or ErrAbandoned.
type Config ¶
type Config struct {
	Dialer // chooses the broker

	// PauseTimeout sets the minimum transfer rate as one byte per duration.
	// Zero disables timeout protection entirely, which leaves the Client
	// vulnerable to blocking on stale connections.
	//
	// Any pauses during MQTT packet submission that exceed the timeout will
	// be treated as fatal to the connection, if they are detected in time.
	// Expiry causes automated reconnects just like any other fatal network
	// error. Operations which got interrupted by a PauseTimeout receive a
	// net.Error with Timeout true.
	PauseTimeout time.Duration

	// The maximum number of transactions at a time. Excess is denied with
	// ErrMax. Zero effectively disables the respective quality-of-service
	// level. Negative values default to the Client limit of 16,384. Higher
	// values are truncated silently.
	AtLeastOnceMax, ExactlyOnceMax int

	// The user name may be used by the broker for authentication and/or
	// authorization purposes. An empty string omits the option, except for
	// when password is not nil.
	UserName string
	Password []byte // option omitted when nil

	// The Will Message is published when the connection terminates without
	// Disconnect. A nil Message disables the Will option.
	Will struct {
		Topic       string // destination
		Message     []byte // payload
		Retain      bool   // see PublishRetained
		AtLeastOnce bool   // see PublishAtLeastOnce
		ExactlyOnce bool   // overrides AtLeastOnce
	}

	// KeepAlive sets the activity timeout in seconds, with zero for none.
	// The server must disconnect after no control-packet reception for one
	// and a half times the keep-alive duration. Use Ping when idle.
	KeepAlive uint16

	// Brokers must resume communications with the client (identified by
	// ClientID) when CleanSession is false. Otherwise, brokers must create
	// a new session when either CleanSession is true or when no session is
	// associated to the client identifier.
	//
	// Reconnects do not clean the session, regardless of this setting.
	CleanSession bool
}
Config is a Client configuration. Dialer is the only required field.
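Operations interrupted by a PauseTimeout receive a net.Error with Timeout true, which callers can tell apart from other failures with errors.As. The sketch below shows such a check using only the standard library; isTimeout and the stalled demo error are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// isTimeout reports whether err, or any error it wraps, is a net.Error
// with Timeout true.
func isTimeout(err error) bool {
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}

// stalled is a demo error that implements net.Error with Timeout true.
type stalled struct{}

func (stalled) Error() string   { return "demo: transfer stalled" }
func (stalled) Timeout() bool   { return true }
func (stalled) Temporary() bool { return false }

func main() {
	wrapped := fmt.Errorf("demo: submission interrupted: %w", stalled{})
	fmt.Println(isTimeout(wrapped))                              // true
	fmt.Println(isTimeout(errors.New("demo: connection reset"))) // false
}
```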
type Dialer ¶
Dialer abstracts the transport layer establishment.
func NewDialer ¶
NewDialer provides plain network connections. See net.Dial for details on the network & address syntax.
func NewTLSDialer ¶
NewTLSDialer provides secured network connections. See net.Dial for details on the network & address syntax.
Example (ClientCertificate) ¶
Some brokers permit authentication with TLS client certificates.
package main

import (
	"crypto/tls"
	"log"

	"github.com/pascaldekloe/mqtt"
)

func main() {
	certPEM := []byte(`-----BEGIN CERTIFICATE-----
MIIBhTCCASugAwIBAgIQIRi6zePL6mKjOipn+dNuaTAKBggqhkjOPQQDAjASMRAw
DgYDVQQKEwdBY21lIENvMB4XDTE3MTAyMDE5NDMwNloXDTE4MTAyMDE5NDMwNlow
EjEQMA4GA1UEChMHQWNtZSBDbzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABD0d
7VNhbWvZLWPuj/RtHFjvtJBEwOkhbN/BnnE8rnZR8+sbwnc/KhCk3FhnpHZnQz7B
5aETbbIgmuvewdjvSBSjYzBhMA4GA1UdDwEB/wQEAwICpDATBgNVHSUEDDAKBggr
BgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MCkGA1UdEQQiMCCCDmxvY2FsaG9zdDo1
NDUzgg4xMjcuMC4wLjE6NTQ1MzAKBggqhkjOPQQDAgNIADBFAiEA2zpJEPQyz6/l
Wf86aX6PepsntZv2GYlA5UpabfT2EZICICpJ5h/iI+i341gBmLiAFQOyTDT+/wQc
6MF9+Yw1Yy0t
-----END CERTIFICATE-----`)
	keyPEM := []byte(`-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIIrYSSNQFaA2Hwf1duRSxKtLYX5CB04fSeQ6tF1aY/PuoAoGCCqGSM49
AwEHoUQDQgAEPR3tU2Fta9ktY+6P9G0cWO+0kETA6SFs38GecTyudlHz6xvCdz8q
EKTcWGekdmdDPsHloRNtsiCa697B2O9IFA==
-----END EC PRIVATE KEY-----`)

	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		log.Fatal(err)
	}
	mqtt.NewTLSDialer("tcp", "mq1.example.com:8883", &tls.Config{
		Certificates: []tls.Certificate{cert},
	})
}
type Persistence ¶
type Persistence interface {
	// Load resolves the value of a key. A nil return means “not found”.
	Load(key uint) ([]byte, error)

	// Save defines the value of a key.
	Save(key uint, value net.Buffers) error

	// Delete clears the value of a key, whether it existed or not. Failures
	// will be overwritten eventually due to the limited address space.
	Delete(key uint) error

	// List enumerates all available in any order.
	List() (keys []uint, err error)
}
Persistence tracks the session state as a key–value store. An instance may serve only one Client at a time.
Values are addressed by a 17-bit key, mask 0x1ffff. The minimum size is 12 B. The maximum size is 256 MiB + 17 B. Clients apply integrity checks all round.
Multiple goroutines may invoke methods on a Persistence simultaneously.
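A custom Persistence only needs the four methods of the interface plus safety for concurrent use. The following map-backed store is a minimal sketch under those two requirements. The interface is repeated locally so that the program compiles on its own, and memStore is a hypothetical name; this is not how the package implements volatile sessions.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// Persistence repeats the documented interface for a standalone sketch.
type Persistence interface {
	Load(key uint) ([]byte, error)
	Save(key uint, value net.Buffers) error
	Delete(key uint) error
	List() (keys []uint, err error)
}

// memStore is a hypothetical map-backed Persistence. The mutex makes it
// safe for use from multiple goroutines.
type memStore struct {
	mu     sync.Mutex
	values map[uint][]byte
}

func newMemStore() *memStore {
	return &memStore{values: make(map[uint][]byte)}
}

// Load returns a copy of the value, or nil when the key is not found.
func (m *memStore) Load(key uint) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	v, ok := m.values[key]
	if !ok {
		return nil, nil
	}
	return append(make([]byte, 0, len(v)), v...), nil
}

// Save joins the buffers into one dedicated slice.
func (m *memStore) Save(key uint, value net.Buffers) error {
	joined := []byte{}
	for _, b := range value {
		joined = append(joined, b...)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.values[key] = joined
	return nil
}

// Delete succeeds whether the key existed or not.
func (m *memStore) Delete(key uint) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.values, key)
	return nil
}

// List returns the keys in no particular order.
func (m *memStore) List() (keys []uint, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for k := range m.values {
		keys = append(keys, k)
	}
	return keys, nil
}

func main() {
	var p Persistence = newMemStore()
	if err := p.Save(7, net.Buffers{[]byte("hello "), []byte("world")}); err != nil {
		fmt.Println("save:", err)
		return
	}
	v, _ := p.Load(7)
	fmt.Printf("%s\n", v) // hello world
	_ = p.Delete(7)
	v, _ = p.Load(7)
	fmt.Println(v == nil) // true
}
```

Save copies the buffers rather than retaining them, on the assumption that the caller may reuse its memory after the call returns.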
func FileSystem ¶
func FileSystem(dir string) Persistence
FileSystem stores values per file in a directory. Callers must ensure the availability, including write permission for the user. The empty string defaults to the working directory.
type SubscribeError ¶
type SubscribeError []string
SubscribeError holds one or more topic filters which were failed by the broker. The element order matches the originating request's.
func (SubscribeError) Error ¶
func (e SubscribeError) Error() string
Error implements the standard error interface.
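Since SubscribeError is a string slice, the rejected topic filters can be read back from an error with errors.As. The sketch below repeats the documented type locally so that it runs standalone; the Error text and the failedFilters helper are assumptions made for illustration, not the package's own.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// SubscribeError repeats the documented type for a standalone sketch.
// The Error text here is an assumption.
type SubscribeError []string

func (e SubscribeError) Error() string {
	return "demo: subscribe failed for " + strings.Join(e, ", ")
}

// failedFilters returns the topic filters that were failed by the broker,
// or nil when err holds no SubscribeError.
func failedFilters(err error) []string {
	var se SubscribeError
	if errors.As(err, &se) {
		return se
	}
	return nil
}

func main() {
	err := fmt.Errorf("demo: wrapped: %w", SubscribeError{"demo/+", "other/#"})
	fmt.Println(failedFilters(err)) // [demo/+ other/#]
}
```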
Notes ¶
Bugs ¶
The MQTT protocol has no confirmation for the disconnect request. As a result, a client can never know for sure whether the operation actually succeeded.
Save errors from a Persistence may cause duplicate reception for deliveries with an “exactly once” guarantee, if the respective Client goes down before a recovery/retry succeeds.
AdoptSession assumes that all publish-at-least-once packets were submitted before already. Persisting the actual state after each network submission seems like too much just for the DUP flag to be slightly more precise.
AdoptSession assumes that all publish-exactly-once packets were submitted before already. Persisting the actual state after each network submission seems like too much just for the DUP flag to be slightly more precise.