Documentation ¶
Overview ¶
Package mqtt provides an MQTT v3.1.1 client library.
Index ¶
- Constants
- Variables
- func ConnectMQTT(conn net.Conn, cm *packets.ConnectPacket, protocolVersion uint) (byte, bool, []byte)
- func DefaultConnectionLostHandler(client Client, reason error)
- func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, ...) (net.Conn, error)
- func WaitTokenTimeout(t Token, d time.Duration) error
- type Client
- type ClientOptions
- func (o *ClientOptions) AddBroker(server string) *ClientOptions
- func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions
- func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions
- func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, retained bool) *ClientOptions
- func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions
- func (o *ClientOptions) SetClientID(id []byte) *ClientOptions
- func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions
- func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions
- func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions
- func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions
- func (o *ClientOptions) SetConnectionLostHandler(onLost ConnectionLostHandler) *ClientOptions
- func (o *ClientOptions) SetCredentialsProvider(p CredentialsProvider) *ClientOptions
- func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenConnectionFunc) *ClientOptions
- func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions
- func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions
- func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions
- func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions
- func (o *ClientOptions) SetMaxReconnectInterval(t time.Duration) *ClientOptions
- func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *ClientOptions
- func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions
- func (o *ClientOptions) SetOnConnectHandler(onConn OnConnectHandler) *ClientOptions
- func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions
- func (o *ClientOptions) SetPassword(p string) *ClientOptions
- func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions
- func (o *ClientOptions) SetProtocolVersion(pv uint) *ClientOptions
- func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptions
- func (o *ClientOptions) SetResumeSubs(resume bool) *ClientOptions
- func (o *ClientOptions) SetStore(s Store) *ClientOptions
- func (o *ClientOptions) SetTLSConfig(t *tls.Config) *ClientOptions
- func (o *ClientOptions) SetUsername(u string) *ClientOptions
- func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions
- func (o *ClientOptions) SetWill(topic string, payload string, qos byte, retained bool) *ClientOptions
- func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions
- func (o *ClientOptions) UnsetWill() *ClientOptions
- type ClientOptionsReader
- func (r *ClientOptionsReader) AutoReconnect() bool
- func (r *ClientOptionsReader) CleanSession() bool
- func (r *ClientOptionsReader) ClientID() []byte
- func (r *ClientOptionsReader) ConnectRetry() bool
- func (r *ClientOptionsReader) ConnectRetryInterval() time.Duration
- func (r *ClientOptionsReader) ConnectTimeout() time.Duration
- func (r *ClientOptionsReader) HTTPHeaders() http.Header
- func (r *ClientOptionsReader) KeepAlive() time.Duration
- func (r *ClientOptionsReader) MaxReconnectInterval() time.Duration
- func (r *ClientOptionsReader) MessageChannelDepth() uint
- func (r *ClientOptionsReader) Order() bool
- func (r *ClientOptionsReader) Password() string
- func (r *ClientOptionsReader) PingTimeout() time.Duration
- func (r *ClientOptionsReader) ProtocolVersion() uint
- func (r *ClientOptionsReader) ResumeSubs() bool
- func (r *ClientOptionsReader) Servers() []*url.URL
- func (r *ClientOptionsReader) TLSConfig() *tls.Config
- func (r *ClientOptionsReader) Username() string
- func (r *ClientOptionsReader) WebsocketOptions() *WebsocketOptions
- func (r *ClientOptionsReader) WillEnabled() bool
- func (r *ClientOptionsReader) WillPayload() []byte
- func (r *ClientOptionsReader) WillQos() byte
- func (r *ClientOptionsReader) WillRetained() bool
- func (r *ClientOptionsReader) WillTopic() string
- func (r *ClientOptionsReader) WriteTimeout() time.Duration
- type ConnectToken
- type ConnectionAttemptHandler
- type ConnectionLostHandler
- type CredentialsProvider
- type DisconnectToken
- type DummyToken
- type FileStore
- func (store *FileStore) All() []string
- func (store *FileStore) Close()
- func (store *FileStore) Del(key string)
- func (store *FileStore) Get(key string) packets.ControlPacket
- func (store *FileStore) Open()
- func (store *FileStore) Put(key string, m packets.ControlPacket)
- func (store *FileStore) Reset()
- type Logger
- type MId
- type MQTToTConnectResponsePacket
- type MemoryStore
- func (store *MemoryStore) All() []string
- func (store *MemoryStore) Close()
- func (store *MemoryStore) Del(key string)
- func (store *MemoryStore) Get(key string) packets.ControlPacket
- func (store *MemoryStore) Open()
- func (store *MemoryStore) Put(key string, message packets.ControlPacket)
- func (store *MemoryStore) Reset()
- type Message
- type MessageHandler
- type NOOPLogger
- type OnConnectHandler
- type OpenConnectionFunc
- type OrderedMemoryStore
- func (store *OrderedMemoryStore) All() []string
- func (store *OrderedMemoryStore) Close()
- func (store *OrderedMemoryStore) Del(key string)
- func (store *OrderedMemoryStore) Get(key string) packets.ControlPacket
- func (store *OrderedMemoryStore) Open()
- func (store *OrderedMemoryStore) Put(key string, message packets.ControlPacket)
- func (store *OrderedMemoryStore) Reset()
- type PacketAndToken
- type PlaceHolderToken
- type ProxyFunction
- type PublishToken
- type ReconnectHandler
- type Store
- type SubscribeToken
- type Token
- type TokenErrorSetter
- type UnsubscribeToken
- type WebsocketOptions
Constants ¶
const ( NET component = "[net] " PNG component = "[pinger] " CLI component = "[client] " DEC component = "[decode] " MES component = "[message] " STR component = "[store] " MID component = "[msgids] " TST component = "[test] " STA component = "[state] " ERR component = "[error] " ROU component = "[router] " )
Component names for debug output
Variables ¶
var ErrInvalidQos = errors.New("invalid QoS")
ErrInvalidQos is the error returned when an packet is to be sent with an invalid Qos value
var ErrInvalidTopicEmptyString = errors.New("invalid Topic; empty string")
ErrInvalidTopicEmptyString is the error returned when a topic string is passed in that is 0 length
var ErrInvalidTopicMultilevel = errors.New("invalid Topic; multi-level wildcard must be last level")
ErrInvalidTopicMultilevel is the error returned when a topic string is passed in that has the multi level wildcard in any position but the last
var ErrNotConnected = errors.New("not Connected")
ErrNotConnected is the error returned from function calls that are made when the client is not connected to a broker
var TimedOut = errors.New("context canceled")
TimedOut is the error returned by WaitTimeout when the timeout expires
Functions ¶
func ConnectMQTT ¶
func ConnectMQTT(conn net.Conn, cm *packets.ConnectPacket, protocolVersion uint) (byte, bool, []byte)
ConnectMQTT takes a connected net.Conn and performs the initial MQTT handshake. Parameters are: conn - Connected net.Conn cm - Connect Packet with everything other than the protocol name/version populated (historical reasons) protocolVersion - The protocol version to attempt to connect with
Note that, for backward compatibility, ConnectMQTT() suppresses the actual connection error (compare to connectMQTT()).
func DefaultConnectionLostHandler ¶
DefaultConnectionLostHandler is a definition of a function that simply reports to the DEBUG log the reason for the client losing a connection.
func NewWebsocket ¶
func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header, options *WebsocketOptions) (net.Conn, error)
NewWebsocket returns a new websocket and returns a net.Conn compatible interface using the gorilla/websocket package
func WaitTokenTimeout ¶
WaitTokenTimeout is a utility function used to simplify the use of token.WaitTimeout token.WaitTimeout may return `false` due to time out but t.Error() still results in nil. `if t := client.X(); t.WaitTimeout(time.Second) && t.Error() != nil {` may evaluate to false even if the operation fails. It is important to note that if TimedOut is returned, then the operation may still be running and could eventually complete successfully.
Types ¶
type Client ¶
type Client interface { // IsConnected returns a bool signifying whether // the client is connected or not. IsConnected() bool // IsConnectionOpen return a bool signifying whether the client has an active // connection to mqtt broker, i.e not in disconnected or reconnect mode IsConnectionOpen() bool // Connect will create a connection to the message broker, by default // it will attempt to connect at v3.1.1 and auto retry at v3.1 if that // fails Connect() Token // Disconnect will end the connection with the server, but not before waiting // the specified number of milliseconds to wait for existing work to be // completed. Disconnect(quiesce uint) // Publish will publish a message with the specified QoS and content // to the specified topic. // Returns a token to track delivery of the message to the broker Publish(topic string, qos byte, retained bool, payload interface{}) Token // Subscribe starts a new subscription. Provide a MessageHandler to be executed when // a message is published on the topic provided, or nil for the default handler. // // If options.OrderMatters is true (the default) then callback must not block or // call functions within this package that may block (e.g. Publish) other than in // a new go routine. // callback must be safe for concurrent use by multiple goroutines. Subscribe(topic string, qos byte, callback MessageHandler) Token // SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to // be executed when a message is published on one of the topics provided, or nil for the // default handler. // // If options.OrderMatters is true (the default) then callback must not block or // call functions within this package that may block (e.g. Publish) other than in // a new go routine. // callback must be safe for concurrent use by multiple goroutines. SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token // Unsubscribe will end the subscription from each of the topics provided. // Messages published to those topics from other clients will no longer be // received. Unsubscribe(topics ...string) Token // AddRoute allows you to add a handler for messages on a specific topic // without making a subscription. For example having a different handler // for parts of a wildcard subscription or for receiving retained messages // upon connection (before Sub scribe can be processed). // // If options.OrderMatters is true (the default) then callback must not block or // call functions within this package that may block (e.g. Publish) other than in // a new go routine. // callback must be safe for concurrent use by multiple goroutines. AddRoute(topic string, callback MessageHandler) // OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions // in use by the client. OptionsReader() ClientOptionsReader Payload() []byte }
Client is the interface definition for a Client as used by this library, the interface is primarily to allow mocking tests.
It is an MQTT v3.1.1 client for communicating with an MQTT server using non-blocking methods that allow work to be done in the background. An application may connect to an MQTT server using:
A plain TCP socket (e.g. mqtt://test.mosquitto.org:1833) A secure SSL/TLS socket (e.g. tls://test.mosquitto.org:8883) A websocket (e.g ws://test.mosquitto.org:8080 or wss://test.mosquitto.org:8081) Something else (using `options.CustomOpenConnectionFn`)
To enable ensured message delivery at Quality of Service (QoS) levels described in the MQTT spec, a message persistence mechanism must be used. This is done by providing a type which implements the Store interface. For convenience, FileStore and MemoryStore are provided implementations that should be sufficient for most use cases. More information can be found in their respective documentation. Numerous connection options may be specified by configuring a and then supplying a ClientOptions type. Implementations of Client must be safe for concurrent use by multiple goroutines
func NewClient ¶
func NewClient(o *ClientOptions) Client
NewClient will create an MQTT v3.1.1 client with all of the options specified in the provided ClientOptions. The client must have the Connect method called on it before it may be used. This is to make sure resources (such as a net connection) are created before the application is actually ready.
type ClientOptions ¶
type ClientOptions struct { Servers []*url.URL ClientID []byte Username string Password string CredentialsProvider CredentialsProvider CleanSession bool Order bool WillEnabled bool WillTopic string WillPayload []byte WillQos byte WillRetained bool ProtocolVersion uint TLSConfig *tls.Config KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0. PingTimeout time.Duration ConnectTimeout time.Duration MaxReconnectInterval time.Duration AutoReconnect bool ConnectRetryInterval time.Duration ConnectRetry bool Store Store DefaultPublishHandler MessageHandler OnConnect OnConnectHandler OnConnectionLost ConnectionLostHandler OnReconnecting ReconnectHandler OnConnectAttempt ConnectionAttemptHandler WriteTimeout time.Duration MessageChannelDepth uint ResumeSubs bool HTTPHeaders http.Header WebsocketOptions *WebsocketOptions MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming Dialer *net.Dialer CustomOpenConnectionFn OpenConnectionFunc AutoAckDisabled bool // contains filtered or unexported fields }
ClientOptions contains configurable options for an Client. Note that these should be set using the relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy to create a configuration with difficult to trace issues (e.g. Mosquitto 2.0.12+ will reject connections with KeepAlive=0 by default).
func NewClientOptions ¶
func NewClientOptions() *ClientOptions
NewClientOptions will create a new ClientClientOptions type with some default values.
Port: 1883 CleanSession: True Order: True (note: it is recommended that this be set to FALSE unless order is important) KeepAlive: 30 (seconds) ConnectTimeout: 30 (seconds) MaxReconnectInterval 10 (minutes) AutoReconnect: True
func (*ClientOptions) AddBroker ¶
func (o *ClientOptions) AddBroker(server string) *ClientOptions
AddBroker adds a broker URI to the list of brokers to be used. The format should be scheme://host:port Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname) and "port" is the port on which the broker is accepting connections.
Default values for hostname is "127.0.0.1", for schema is "tcp://".
An example broker URI would look like: tcp://foobar.com:1883
func (*ClientOptions) SetAutoAckDisabled ¶
func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions
SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler.
By default it is set to false. Setting it to true will disable the auto-ack globally.
func (*ClientOptions) SetAutoReconnect ¶
func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions
SetAutoReconnect sets whether the automatic reconnection logic should be used when the connection is lost, even if disabled the ConnectionLostHandler is still called
func (*ClientOptions) SetBinaryWill ¶
func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, retained bool) *ClientOptions
SetBinaryWill accepts a []byte will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.
func (*ClientOptions) SetCleanSession ¶
func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions
SetCleanSession will set the "clean session" flag in the connect message when this client connects to an MQTT broker. By setting this flag, you are indicating that no messages saved by the broker for this client should be delivered. Any messages that were going to be sent by this client before disconnecting previously but didn't will not be sent upon connecting to the broker.
func (*ClientOptions) SetClientID ¶
func (o *ClientOptions) SetClientID(id []byte) *ClientOptions
SetClientID will set the client id to be used by this client when connecting to the MQTT broker. According to the MQTT v3.1 specification, a client id must be no longer than 23 characters.
func (*ClientOptions) SetConnectRetry ¶
func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions
SetConnectRetry sets whether the connect function will automatically retry the connection in the event of a failure (when true the token returned by the Connect function will not complete until the connection is up or it is cancelled) If ConnectRetry is true then subscriptions should be requested in OnConnect handler Setting this to TRUE permits messages to be published before the connection is established
func (*ClientOptions) SetConnectRetryInterval ¶
func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions
SetConnectRetryInterval sets the time that will be waited between connection attempts when initially connecting if ConnectRetry is TRUE
func (*ClientOptions) SetConnectTimeout ¶
func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions
SetConnectTimeout limits how long the client will wait when trying to open a connection to an MQTT server before timing out. A duration of 0 never times out. Default 30 seconds. Currently only operational on TCP/TLS connections.
func (*ClientOptions) SetConnectionAttemptHandler ¶
func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions
SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL). This allows connection specific changes to be made to the *tls.Config.
func (*ClientOptions) SetConnectionLostHandler ¶
func (o *ClientOptions) SetConnectionLostHandler(onLost ConnectionLostHandler) *ClientOptions
SetConnectionLostHandler will set the OnConnectionLost callback to be executed in the case where the client unexpectedly loses connection with the MQTT broker.
func (*ClientOptions) SetCredentialsProvider ¶
func (o *ClientOptions) SetCredentialsProvider(p CredentialsProvider) *ClientOptions
SetCredentialsProvider will set a method to be called by this client when connecting to the MQTT broker that provide the current username and password. Note: without the use of SSL/TLS, this information will be sent in plaintext across the wire.
func (*ClientOptions) SetCustomOpenConnectionFn ¶
func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenConnectionFunc) *ClientOptions
SetCustomOpenConnectionFn replaces the inbuilt function that establishes a network connection with a custom function. The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example) It enables custom networking types in addition to the defaults (tcp, tls, websockets...)
func (*ClientOptions) SetDefaultPublishHandler ¶
func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions
SetDefaultPublishHandler sets the MessageHandler that will be called when a message is received that does not match any known subscriptions.
If OrderMatters is true (the defaultHandler) then callback must not block or call functions within this package that may block (e.g. Publish) other than in a new go routine. defaultHandler must be safe for concurrent use by multiple goroutines.
func (*ClientOptions) SetDialer ¶
func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions
SetDialer sets the tcp dialer options used in a tcp connection
func (*ClientOptions) SetHTTPHeaders ¶
func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions
SetHTTPHeaders sets the additional HTTP headers that will be sent in the WebSocket opening handshake.
func (*ClientOptions) SetKeepAlive ¶
func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions
SetKeepAlive will set the amount of time (in seconds) that the client should wait before sending a PING request to the broker. This will allow the client to know that a connection has not been lost with the server.
func (*ClientOptions) SetMaxReconnectInterval ¶
func (o *ClientOptions) SetMaxReconnectInterval(t time.Duration) *ClientOptions
SetMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts when connection is lost
func (*ClientOptions) SetMaxResumePubInFlight ¶
func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *ClientOptions
SetMaxResumePubInFlight sets the maximum simultaneous publish messages that will be sent while resuming. Note that this only applies to messages coming from the store (so additional sends may push us over the limit) Note that the connect token will not be flagged as complete until all messages have been sent from the store. If broker does not respond to messages then resume may not complete. This option was put in place because resuming after downtime can saturate low capacity links.
func (*ClientOptions) SetMessageChannelDepth ¶
func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions
SetMessageChannelDepth DEPRECATED The value set here no longer has any effect, this function remains so the API is not altered.
func (*ClientOptions) SetOnConnectHandler ¶
func (o *ClientOptions) SetOnConnectHandler(onConn OnConnectHandler) *ClientOptions
SetOnConnectHandler sets the function to be called when the client is connected. Both at initial connection time and upon automatic reconnect.
func (*ClientOptions) SetOrderMatters ¶
func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions
SetOrderMatters will set the message routing to guarantee order within each QoS level. By default, this value is true. If set to false (recommended), this flag indicates that messages can be delivered asynchronously from the client to the application and possibly arrive out of order. Specifically, the message handler is called in its own go routine. Note that setting this to true does not guarantee in-order delivery (this is subject to broker settings like "max_inflight_messages=1" in mosquitto) and if true then handlers must not block.
func (*ClientOptions) SetPassword ¶
func (o *ClientOptions) SetPassword(p string) *ClientOptions
SetPassword will set the password to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext across the wire.
func (*ClientOptions) SetPingTimeout ¶
func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions
SetPingTimeout will set the amount of time (in seconds) that the client will wait after sending a PING request to the broker, before deciding that the connection has been lost. Default is 10 seconds.
func (*ClientOptions) SetProtocolVersion ¶
func (o *ClientOptions) SetProtocolVersion(pv uint) *ClientOptions
SetProtocolVersion sets the MQTT version to be used to connect to the broker. Legitimate values are currently 3 - MQTT 3.1 or 4 - MQTT 3.1.1
func (*ClientOptions) SetReconnectingHandler ¶
func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptions
SetReconnectingHandler sets the OnReconnecting callback to be executed prior to the client attempting a reconnect to the MQTT broker.
func (*ClientOptions) SetResumeSubs ¶
func (o *ClientOptions) SetResumeSubs(resume bool) *ClientOptions
SetResumeSubs will enable resuming of stored (un)subscribe messages when connecting but not reconnecting if CleanSession is false. Otherwise these messages are discarded.
func (*ClientOptions) SetStore ¶
func (o *ClientOptions) SetStore(s Store) *ClientOptions
SetStore will set the implementation of the Store interface used to provide message persistence in cases where QoS levels QoS_ONE or QoS_TWO are used. If no store is provided, then the client will use MemoryStore by default.
func (*ClientOptions) SetTLSConfig ¶
func (o *ClientOptions) SetTLSConfig(t *tls.Config) *ClientOptions
SetTLSConfig will set an SSL/TLS configuration to be used when connecting to an MQTT broker. Please read the official Go documentation for more information.
func (*ClientOptions) SetUsername ¶
func (o *ClientOptions) SetUsername(u string) *ClientOptions
SetUsername will set the username to be used by this client when connecting to the MQTT broker. Note: without the use of SSL/TLS, this information will be sent in plaintext across the wire.
func (*ClientOptions) SetWebsocketOptions ¶
func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions
SetWebsocketOptions sets the additional websocket options used in a WebSocket connection
func (*ClientOptions) SetWill ¶
func (o *ClientOptions) SetWill(topic string, payload string, qos byte, retained bool) *ClientOptions
SetWill accepts a string will message to be set. When the client connects, it will give this will message to the broker, which will then publish the provided payload (the will) to any clients that are subscribed to the provided topic.
func (*ClientOptions) SetWriteTimeout ¶
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions
SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a timeout error. A duration of 0 never times out. Default never times out
func (*ClientOptions) UnsetWill ¶
func (o *ClientOptions) UnsetWill() *ClientOptions
UnsetWill will cause any set will message to be disregarded.
type ClientOptionsReader ¶
type ClientOptionsReader struct {
// contains filtered or unexported fields
}
ClientOptionsReader provides an interface for reading ClientOptions after the client has been initialized.
func NewOptionsReader ¶
func NewOptionsReader(o *ClientOptions) ClientOptionsReader
NewOptionsReader creates a ClientOptionsReader, this should only be used for mocking purposes.
An example implementation:
func (c *mqttClientMock) OptionsReader() mqtt.ClientOptionsReader { opts := mqtt.NewClientOptions() opts.UserName = "TestUserName" return mqtt.NewOptionsReader(opts) }
func (*ClientOptionsReader) AutoReconnect ¶
func (r *ClientOptionsReader) AutoReconnect() bool
func (*ClientOptionsReader) CleanSession ¶
func (r *ClientOptionsReader) CleanSession() bool
CleanSession returns whether Cleansession is set
func (*ClientOptionsReader) ClientID ¶
func (r *ClientOptionsReader) ClientID() []byte
ClientID returns the set client id
func (*ClientOptionsReader) ConnectRetry ¶
func (r *ClientOptionsReader) ConnectRetry() bool
ConnectRetry returns whether the initial connection request will be retried until connection established
func (*ClientOptionsReader) ConnectRetryInterval ¶
func (r *ClientOptionsReader) ConnectRetryInterval() time.Duration
ConnectRetryInterval returns the delay between retries on the initial connection (if ConnectRetry true)
func (*ClientOptionsReader) ConnectTimeout ¶
func (r *ClientOptionsReader) ConnectTimeout() time.Duration
func (*ClientOptionsReader) HTTPHeaders ¶
func (r *ClientOptionsReader) HTTPHeaders() http.Header
func (*ClientOptionsReader) KeepAlive ¶
func (r *ClientOptionsReader) KeepAlive() time.Duration
func (*ClientOptionsReader) MaxReconnectInterval ¶
func (r *ClientOptionsReader) MaxReconnectInterval() time.Duration
func (*ClientOptionsReader) MessageChannelDepth ¶
func (r *ClientOptionsReader) MessageChannelDepth() uint
func (*ClientOptionsReader) Order ¶
func (r *ClientOptionsReader) Order() bool
func (*ClientOptionsReader) Password ¶
func (r *ClientOptionsReader) Password() string
Password returns the set password
func (*ClientOptionsReader) PingTimeout ¶
func (r *ClientOptionsReader) PingTimeout() time.Duration
func (*ClientOptionsReader) ProtocolVersion ¶
func (r *ClientOptionsReader) ProtocolVersion() uint
func (*ClientOptionsReader) ResumeSubs ¶
func (r *ClientOptionsReader) ResumeSubs() bool
ResumeSubs returns true if resuming stored (un)sub is enabled
func (*ClientOptionsReader) Servers ¶
func (r *ClientOptionsReader) Servers() []*url.URL
Servers returns a slice of the servers defined in the clientoptions
func (*ClientOptionsReader) TLSConfig ¶
func (r *ClientOptionsReader) TLSConfig() *tls.Config
func (*ClientOptionsReader) Username ¶
func (r *ClientOptionsReader) Username() string
Username returns the set username
func (*ClientOptionsReader) WebsocketOptions ¶
func (r *ClientOptionsReader) WebsocketOptions() *WebsocketOptions
WebsocketOptions returns the currently configured WebSocket options
func (*ClientOptionsReader) WillEnabled ¶
func (r *ClientOptionsReader) WillEnabled() bool
func (*ClientOptionsReader) WillPayload ¶
func (r *ClientOptionsReader) WillPayload() []byte
func (*ClientOptionsReader) WillQos ¶
func (r *ClientOptionsReader) WillQos() byte
func (*ClientOptionsReader) WillRetained ¶
func (r *ClientOptionsReader) WillRetained() bool
func (*ClientOptionsReader) WillTopic ¶
func (r *ClientOptionsReader) WillTopic() string
func (*ClientOptionsReader) WriteTimeout ¶
func (r *ClientOptionsReader) WriteTimeout() time.Duration
type ConnectToken ¶
type ConnectToken struct {
// contains filtered or unexported fields
}
ConnectToken is an extension of Token containing the extra fields required to provide information about calls to Connect()
func (*ConnectToken) Done ¶
func (b *ConnectToken) Done() <-chan struct{}
Done implements the Token Done method.
func (*ConnectToken) ReturnCode ¶
func (c *ConnectToken) ReturnCode() byte
ReturnCode returns the acknowledgement code in the connack sent in response to a Connect()
func (*ConnectToken) SessionPresent ¶
func (c *ConnectToken) SessionPresent() bool
SessionPresent returns a bool representing the value of the session present field in the connack sent in response to a Connect()
func (*ConnectToken) Wait ¶
func (b *ConnectToken) Wait() bool
Wait implements the Token Wait method.
func (*ConnectToken) WaitTimeout ¶
WaitTimeout implements the Token WaitTimeout method.
type ConnectionAttemptHandler ¶
ConnectionAttemptHandler is invoked prior to making the initial connection.
type ConnectionLostHandler ¶
ConnectionLostHandler is a callback type which can be set to be executed upon an unintended disconnection from the MQTT broker. Disconnects caused by calling Disconnect or ForceDisconnect will not cause an OnConnectionLost callback to execute.
type CredentialsProvider ¶
CredentialsProvider allows the username and password to be updated before reconnecting. It should return the current username and password.
type DisconnectToken ¶
type DisconnectToken struct {
// contains filtered or unexported fields
}
DisconnectToken is an extension of Token containing the extra fields required to provide information about calls to Disconnect()
func (*DisconnectToken) Done ¶
func (b *DisconnectToken) Done() <-chan struct{}
Done implements the Token Done method.
func (*DisconnectToken) Wait ¶
func (b *DisconnectToken) Wait() bool
Wait implements the Token Wait method.
func (*DisconnectToken) WaitTimeout ¶
WaitTimeout implements the Token WaitTimeout method.
type DummyToken ¶
type DummyToken struct {
// contains filtered or unexported fields
}
func (*DummyToken) Done ¶
func (d *DummyToken) Done() <-chan struct{}
Done implements the Token Done method.
func (*DummyToken) Error ¶
func (d *DummyToken) Error() error
func (*DummyToken) WaitTimeout ¶
func (d *DummyToken) WaitTimeout(t time.Duration) bool
WaitTimeout implements the Token WaitTimeout method.
type FileStore ¶
FileStore implements the store interface using the filesystem to provide true persistence, even across client failure. This is designed to use a single directory per running client. If you are running multiple clients on the same filesystem, you will need to be careful to specify unique store directories for each.
func NewFileStore ¶
NewFileStore will create a new FileStore which stores its messages in the directory provided.
func (*FileStore) All ¶
All will provide a list of all of the keys associated with messages currently residing in the FileStore.
func (*FileStore) Close ¶
func (store *FileStore) Close()
Close will disallow the FileStore from being used.
func (*FileStore) Del ¶
Del will remove the persisted message associated with the provided key from the FileStore.
func (*FileStore) Get ¶
func (store *FileStore) Get(key string) packets.ControlPacket
Get will retrieve a message from the store, the one associated with the provided key value.
type Logger ¶
type Logger interface { Println(v ...interface{}) Printf(format string, v ...interface{}) }
Logger interface allows implementations to provide to this package any object that implements the methods defined in it.
var ( ERROR Logger = NOOPLogger{} CRITICAL Logger = NOOPLogger{} WARN Logger = NOOPLogger{} DEBUG Logger = NOOPLogger{} )
Internal levels of library output that are initialised to not print anything but can be overridden by programmer
type MId ¶
type MId uint16
MId is 16 bit message id as specified by the MQTT spec. In general, these values should not be depended upon by the client application.
type MQTToTConnectResponsePacket ¶
MQTToTConnectResponsePacket defines the structure for the MQTT Connect Acknowledgment packet
type MemoryStore ¶
MemoryStore implements the store interface to provide a "persistence" mechanism wholly stored in memory. This is only useful for as long as the client instance exists.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore returns a pointer to a new instance of MemoryStore, the instance is not initialized and ready to use until Open() has been called on it.
func (*MemoryStore) All ¶
func (store *MemoryStore) All() []string
All returns a slice of strings containing all the keys currently in the MemoryStore.
func (*MemoryStore) Close ¶
func (store *MemoryStore) Close()
Close will disallow modifications to the state of the store.
func (*MemoryStore) Del ¶
func (store *MemoryStore) Del(key string)
Del takes a key, searches the MemoryStore and if the key is found deletes the Message pointer associated with it.
func (*MemoryStore) Get ¶
func (store *MemoryStore) Get(key string) packets.ControlPacket
Get takes a key and looks in the store for a matching Message returning either the Message pointer or nil.
func (*MemoryStore) Open ¶
func (store *MemoryStore) Open()
Open initializes a MemoryStore instance.
func (*MemoryStore) Put ¶
func (store *MemoryStore) Put(key string, message packets.ControlPacket)
Put takes a key and a pointer to a Message and stores the message.
func (*MemoryStore) Reset ¶
func (store *MemoryStore) Reset()
Reset eliminates all persisted message data in the store.
type Message ¶
type Message interface { Duplicate() bool Qos() byte Retained() bool Topic() string MessageID() uint16 Payload() []byte Ack() }
Message defines the externals that a message implementation must support these are received messages that are passed to the callbacks, not internal messages
type MessageHandler ¶
MessageHandler is a callback type which can be set to be executed upon the arrival of messages published to topics to which the client is subscribed.
type NOOPLogger ¶
type NOOPLogger struct{}
NOOPLogger implements the logger that does not perform any operation by default. This allows us to efficiently discard the unwanted messages.
func (NOOPLogger) Printf ¶
func (NOOPLogger) Printf(format string, v ...interface{})
func (NOOPLogger) Println ¶
func (NOOPLogger) Println(v ...interface{})
type OnConnectHandler ¶
type OnConnectHandler func(Client)
OnConnectHandler is a callback that is called when the client state changes from unconnected/disconnected to connected. Both at initial connection and on reconnection
type OpenConnectionFunc ¶
OpenConnectionFunc is invoked to establish the underlying network connection Its purpose if for custom network transports. Does not carry out any MQTT specific handshakes.
type OrderedMemoryStore ¶
OrderedMemoryStore implements the store interface to provide a "persistence" mechanism wholly stored in memory. This is only useful for as long as the client instance exists.
func NewOrderedMemoryStore ¶
func NewOrderedMemoryStore() *OrderedMemoryStore
NewOrderedMemoryStore returns a pointer to a new instance of OrderedMemoryStore, the instance is not initialized and ready to use until Open() has been called on it.
func (*OrderedMemoryStore) All ¶
func (store *OrderedMemoryStore) All() []string
All returns a slice of strings containing all the keys currently in the OrderedMemoryStore.
func (*OrderedMemoryStore) Close ¶
func (store *OrderedMemoryStore) Close()
Close will disallow modifications to the state of the store.
func (*OrderedMemoryStore) Del ¶
func (store *OrderedMemoryStore) Del(key string)
Del takes a key, searches the OrderedMemoryStore and if the key is found deletes the Message pointer associated with it.
func (*OrderedMemoryStore) Get ¶
func (store *OrderedMemoryStore) Get(key string) packets.ControlPacket
Get takes a key and looks in the store for a matching Message returning either the Message pointer or nil.
func (*OrderedMemoryStore) Open ¶
func (store *OrderedMemoryStore) Open()
Open initializes a OrderedMemoryStore instance.
func (*OrderedMemoryStore) Put ¶
func (store *OrderedMemoryStore) Put(key string, message packets.ControlPacket)
Put takes a key and a pointer to a Message and stores the message.
func (*OrderedMemoryStore) Reset ¶
func (store *OrderedMemoryStore) Reset()
Reset eliminates all persisted message data in the store.
type PacketAndToken ¶
type PacketAndToken struct {
// contains filtered or unexported fields
}
PacketAndToken is a struct that contains both a ControlPacket and a Token. This struct is passed via channels between the client interface code and the underlying code responsible for sending and receiving MQTT messages.
type PlaceHolderToken ¶
type PlaceHolderToken struct {
// contains filtered or unexported fields
}
PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved it differs from DummyToken in that calling flowComplete does not generate an error (it is expected that flowComplete will be called when the token is overwritten with a real token)
func (*PlaceHolderToken) Done ¶
func (p *PlaceHolderToken) Done() <-chan struct{}
Done implements the Token Done method.
func (*PlaceHolderToken) Error ¶
func (p *PlaceHolderToken) Error() error
func (*PlaceHolderToken) Wait ¶
func (p *PlaceHolderToken) Wait() bool
Wait implements the Token Wait method.
func (*PlaceHolderToken) WaitTimeout ¶
func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool
WaitTimeout implements the Token WaitTimeout method.
type PublishToken ¶
type PublishToken struct {
// contains filtered or unexported fields
}
PublishToken is an extension of Token containing the extra fields required to provide information about calls to Publish()
func (*PublishToken) Done ¶
func (b *PublishToken) Done() <-chan struct{}
Done implements the Token Done method.
func (*PublishToken) MessageID ¶
func (p *PublishToken) MessageID() uint16
MessageID returns the MQTT message ID that was assigned to the Publish packet when it was sent to the broker
func (*PublishToken) Wait ¶
func (b *PublishToken) Wait() bool
Wait implements the Token Wait method.
func (*PublishToken) WaitTimeout ¶
WaitTimeout implements the Token WaitTimeout method.
type ReconnectHandler ¶
type ReconnectHandler func(Client, *ClientOptions)
ReconnectHandler is invoked prior to reconnecting after the initial connection is lost
type Store ¶
type Store interface { Open() Put(key string, message packets.ControlPacket) Get(key string) packets.ControlPacket All() []string Del(key string) Close() Reset() }
Store is an interface which can be used to provide implementations for message persistence. Because we may have to store distinct messages with the same message ID, we need a unique key for each message. This is possible by prepending "i." or "o." to each message id
type SubscribeToken ¶
type SubscribeToken struct {
// contains filtered or unexported fields
}
SubscribeToken is an extension of Token containing the extra fields required to provide information about calls to Subscribe()
func (*SubscribeToken) Done ¶
func (b *SubscribeToken) Done() <-chan struct{}
Done implements the Token Done method.
func (*SubscribeToken) Result ¶
func (s *SubscribeToken) Result() map[string]byte
Result returns a map of topics that were subscribed to along with the matching return code from the broker. This is either the Qos value of the subscription or an error code.
func (*SubscribeToken) Wait ¶
func (b *SubscribeToken) Wait() bool
Wait implements the Token Wait method.
func (*SubscribeToken) WaitTimeout ¶
WaitTimeout implements the Token WaitTimeout method.
type Token ¶
type Token interface { // Wait will wait indefinitely for the Token to complete, ie the Publish // to be sent and confirmed receipt from the broker. Wait() bool // WaitTimeout takes a time.Duration to wait for the flow associated with the // Token to complete, returns true if it returned before the timeout or // returns false if the timeout occurred. In the case of a timeout the Token // does not have an error set in case the caller wishes to wait again. WaitTimeout(time.Duration) bool // Done returns a channel that is closed when the flow associated // with the Token completes. Clients should call Error after the // channel is closed to check if the flow completed successfully. // // Done is provided for use in select statements. Simple use cases may // use Wait or WaitTimeout. Done() <-chan struct{} Error() error }
Token defines the interface for the tokens used to indicate when actions have completed.
type TokenErrorSetter ¶
type TokenErrorSetter interface {
// contains filtered or unexported methods
}
type UnsubscribeToken ¶
type UnsubscribeToken struct {
// contains filtered or unexported fields
}
UnsubscribeToken is an extension of Token containing the extra fields required to provide information about calls to Unsubscribe()
func (*UnsubscribeToken) Done ¶
func (b *UnsubscribeToken) Done() <-chan struct{}
Done implements the Token Done method.
func (*UnsubscribeToken) Wait ¶
func (b *UnsubscribeToken) Wait() bool
Wait implements the Token Wait method.
func (*UnsubscribeToken) WaitTimeout ¶
WaitTimeout implements the Token WaitTimeout method.
type WebsocketOptions ¶
type WebsocketOptions struct { ReadBufferSize int WriteBufferSize int Proxy ProxyFunction }
WebsocketOptions are config options for a websocket dialer