Documentation ¶
Index ¶
- Variables
- type Backoff
- type ClientConfig
- func (cfg *ClientConfig) ResetUsernamePassword()
- func (cfg *ClientConfig) SetConnectPacketConfigurator(fn func(*paho.Connect) (*paho.Connect, error)) bool (deprecated)
- func (cfg *ClientConfig) SetDisConnectPacketConfigurator(fn func() *paho.Disconnect) (deprecated)
- func (cfg *ClientConfig) SetUsernamePassword(username string, password []byte) (deprecated)
- func (cfg *ClientConfig) SetWillMessage(topic string, payload []byte, qos byte, retain bool) (deprecated)
- type ConnackError
- type ConnectionManager
- func (c *ConnectionManager) AddOnPublishReceived(f func(PublishReceived) (bool, error)) func()
- func (c *ConnectionManager) Authenticate(ctx context.Context, a *paho.Auth) (*paho.AuthResponse, error)
- func (c *ConnectionManager) AwaitConnection(ctx context.Context) error
- func (c *ConnectionManager) Disconnect(ctx context.Context) error
- func (c *ConnectionManager) Done() <-chan struct{}
- func (c *ConnectionManager) Publish(ctx context.Context, p *paho.Publish) (*paho.PublishResponse, error)
- func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish) error
- func (c *ConnectionManager) Subscribe(ctx context.Context, s *paho.Subscribe) (*paho.Suback, error)
- func (c *ConnectionManager) TerminateConnectionForTest()
- func (c *ConnectionManager) Unsubscribe(ctx context.Context, u *paho.Unsubscribe) (*paho.Unsuback, error)
- type DisconnectError
- type PublishReceived
- type QueuePublish
- type WebSocketConfig
Constants ¶
This section is empty.
Variables ¶
var ConnectionDownError = errors.New("connection with the MQTT server is currently down")
ConnectionDownError will be returned when a request is made but the connection to the server is down. Note: It is possible that the connection will drop between the request being made and a response being received, in which case a different error will be received (this error is only returned if the connection is down at the time the request is made).
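As a minimal sketch of how a caller might detect this sentinel, the example below uses `errors.Is` against a local stand-in variable. The names `errConnectionDown`, `publish` and `isConnectionDown` are hypothetical and exist only for illustration; real code would compare against `autopaho.ConnectionDownError` instead.

```go
package main

import (
	"errors"
	"fmt"
)

// errConnectionDown is a local stand-in for autopaho.ConnectionDownError.
var errConnectionDown = errors.New("connection with the MQTT server is currently down")

// publish is a hypothetical request function: it fails immediately with the
// sentinel error when the connection is already down at the time of the call.
func publish(connected bool) error {
	if !connected {
		return fmt.Errorf("publish rejected: %w", errConnectionDown)
	}
	return nil
}

// isConnectionDown reports whether err is, or wraps, the sentinel error.
func isConnectionDown(err error) bool {
	return errors.Is(err, errConnectionDown)
}

func main() {
	fmt.Println(isConnectionDown(publish(false))) // true
	fmt.Println(isConnectionDown(publish(true)))  // false
}
```

Using `errors.Is` rather than `==` keeps the check working if the error has been wrapped on its way back to the caller.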
Functions ¶
This section is empty.
Types ¶
type Backoff ¶
Backoff is a function that computes the backoff duration for the Nth attempt. The attempt number starts at "0", which indicates the delay BEFORE the first attempt.
func DefaultExponentialBackoff ¶
func DefaultExponentialBackoff() Backoff
DefaultExponentialBackoff returns an exponential backoff with default values.
The default values are:
- min delay: 5 seconds
- max delay: 10 minutes
- initial max delay: 10 seconds
- factor: 1.5
func NewConstantBackoff ¶
Creates a new backoff with constant delay (for attempt > 0, otherwise the backoff is 0).
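The behaviour described above can be sketched as follows. This is an illustration only, not the library's implementation; `constantBackoff` is a hypothetical name, and the returned function has the `func(int) time.Duration` shape used by `ReconnectBackoff`.

```go
package main

import (
	"fmt"
	"time"
)

// constantBackoff returns a backoff function that yields 0 for attempt 0
// (no delay before the first attempt) and the fixed delay for every later attempt.
func constantBackoff(delay time.Duration) func(attempt int) time.Duration {
	return func(attempt int) time.Duration {
		if attempt <= 0 {
			return 0
		}
		return delay
	}
}

func main() {
	b := constantBackoff(10 * time.Second)
	fmt.Println(b(0), b(1), b(5)) // 0s 10s 10s
}
```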
func NewExponentialBackoff ¶
func NewExponentialBackoff(minDelay time.Duration, maxDelay time.Duration, initialMaxDelay time.Duration, factor float32) Backoff
NewExponentialBackoff provides a random duration within a range starting from a fixed min value up to a "moving" max value that increases exponentially for each attempt up to the specified max value.
The "moving" max is computed by multiplying the initial max value by the factor for each attempt, up to the specified max value.
Configuration parameters:
- minDelay - lower bound for computed backoff
- maxDelay - upper bound for computed backoff
- initialMaxDelay - initial max value, which will increase exponentially up to the max delay
- factor - factor for the exponential increase of initial max delay
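The calculation described above might look roughly like the sketch below. It is not the library's code: `exponentialBackoff` is a hypothetical name, and two details are assumptions, namely that attempt 0 yields no delay and that the moving max equals `initialMaxDelay` on attempt 1 before growing by `factor` on each later attempt.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// exponentialBackoff returns a function producing a random duration between
// minDelay and a "moving" max that grows by factor per attempt, capped at maxDelay.
func exponentialBackoff(minDelay, maxDelay, initialMaxDelay time.Duration, factor float32) func(attempt int) time.Duration {
	return func(attempt int) time.Duration {
		if attempt <= 0 {
			return 0 // assumption: no delay before the first attempt
		}
		// Moving max: initialMaxDelay * factor^(attempt-1), capped at maxDelay.
		grown := float64(initialMaxDelay) * math.Pow(float64(factor), float64(attempt-1))
		movingMax := time.Duration(math.Min(grown, float64(maxDelay)))
		if movingMax < minDelay {
			movingMax = minDelay
		}
		// Uniform random duration in [minDelay, movingMax].
		return minDelay + time.Duration(rand.Int63n(int64(movingMax-minDelay)+1))
	}
}

func main() {
	// Same values as the documented defaults: 5s min, 10m max, 10s initial max, factor 1.5.
	b := exponentialBackoff(5*time.Second, 10*time.Minute, 10*time.Second, 1.5)
	for attempt := 0; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, b(attempt))
	}
}
```

The random component spreads reconnection attempts from many clients over time, so they do not all retry at the same instant after a server outage.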
type ClientConfig ¶
type ClientConfig struct {
	ServerUrls []*url.URL // URL(s) for the MQTT server (schemes supported include 'mqtt' and 'tls')
	TlsCfg *tls.Config // Configuration used when connecting using TLS
	KeepAlive uint16 // Keepalive period in seconds (the maximum time interval that is permitted to elapse between the point at which the Client finishes transmitting one MQTT Control Packet and the point it starts sending the next)
	CleanStartOnInitialConnection bool // Clean Start flag, if true, existing session information will be cleared on the first connection (it will be false for subsequent connections)
	SessionExpiryInterval uint32 // Session Expiry Interval in seconds (if 0 the Session ends when the Network Connection is closed)

	// Deprecated: ConnectRetryDelay is deprecated and its functionality is replaced by ReconnectBackoff.
	ConnectRetryDelay time.Duration // How long to wait between connection attempts (defaults to 10s)
	ReconnectBackoff func(int) time.Duration // How long to wait after failed connection attempt N (defaults to 10s)
	ConnectTimeout time.Duration // How long to wait for the connection process to complete (defaults to 10s)
	WebSocketCfg *WebSocketConfig // Enables customisation of the websocket connection

	Queue queue.Queue // Used to queue up publish messages (if nil an error will be returned if publish could not be transmitted)

	// Deprecated: Use ServerUrls instead (this will be used if ServerUrls is empty). Will be removed in a future release.
	BrokerUrls []*url.URL

	// AttemptConnection, if provided, will be called to establish a network connection.
	// The returned `conn` must support thread safe writing; most wrapped net.Conn implementations like tls.Conn
	// are not thread safe for writing.
	// To fix, use packets.NewThreadSafeConn wrapper or extend the custom net.Conn struct with sync.Locker.
	AttemptConnection func(context.Context, ClientConfig, *url.URL) (net.Conn, error)

	OnConnectionUp func(*ConnectionManager, *paho.Connack) // Called when a connection is made (including reconnection). Connection Manager passed to simplify subscriptions. Supplied function must not block.
	OnConnectError func(error) // Called (within a goroutine) whenever a connection attempt fails. Will wrap autopaho.ConnackError on server deny.

	Debug log.Logger // By default set to NOOPLogger{}, set to a logger for debugging info
	Errors log.Logger // By default set to NOOPLogger{}, set to a logger for errors
	PahoDebug log.Logger // debugger passed to the paho package (will default to NOOPLogger{})
	PahoErrors log.Logger // error logger passed to the paho package (will default to NOOPLogger{})

	ConnectUsername string
	ConnectPassword []byte

	WillMessage *paho.WillMessage
	WillProperties *paho.WillProperties

	ConnectPacketBuilder func(*paho.Connect, *url.URL) (*paho.Connect, error) // called prior to connection allowing customisation of the CONNECT packet

	// DisconnectPacketBuilder - called prior to disconnection allowing customisation of the DISCONNECT
	// packet. If the function returns nil, then no DISCONNECT packet will be passed; if the builder itself is nil, a default packet is sent.
	DisconnectPacketBuilder func() *paho.Disconnect

	// We include the full paho.ClientConfig in order to simplify moving between the two packages.
	// Note that Conn will be ignored.
	paho.ClientConfig
}
ClientConfig adds a few values, required to manage the connection, to the standard paho.ClientConfig (note that Conn will be ignored).
func (*ClientConfig) ResetUsernamePassword ¶
func (cfg *ClientConfig) ResetUsernamePassword()
ResetUsernamePassword clears any configured username and password on the client configuration
Set ConnectUsername and ConnectPassword directly instead.
func (*ClientConfig) SetConnectPacketConfigurator
deprecated
func (cfg *ClientConfig) SetConnectPacketConfigurator(fn func(*paho.Connect) (*paho.Connect, error)) bool
SetConnectPacketConfigurator assigns a callback for modification of the Connect packet, called before the connection is opened, allowing the application to adjust its configuration before establishing a connection. This function should be treated as asynchronous, and expected to have no side effects.
Deprecated: Set ConnectPacketBuilder directly instead. This function exists for backwards compatibility only (and may be removed in the future).
func (*ClientConfig) SetDisConnectPacketConfigurator
deprecated
func (cfg *ClientConfig) SetDisConnectPacketConfigurator(fn func() *paho.Disconnect)
SetDisConnectPacketConfigurator assigns a callback for the provision of a DISCONNECT packet. By default, a DISCONNECT is sent to the server when Disconnect is called; setting a callback allows a custom packet to be provided, or no packet (by returning nil).
Deprecated: Set DisconnectPacketBuilder directly instead. This function exists for backwards compatibility only (and may be removed in the future).
func (*ClientConfig) SetUsernamePassword
deprecated
func (cfg *ClientConfig) SetUsernamePassword(username string, password []byte)
SetUsernamePassword configures username and password properties for the Connect packets. These values are staged in the ClientConfig, and preparation of the Connect packet is deferred.
Deprecated: Set ConnectUsername and ConnectPassword directly instead.
func (*ClientConfig) SetWillMessage
deprecated
func (cfg *ClientConfig) SetWillMessage(topic string, payload []byte, qos byte, retain bool)
SetWillMessage configures the Will topic, payload, QOS and Retain facets of the client connection. These values are staged in the ClientConfig, for later preparation of the Connect packet.
Deprecated: Set WillMessage and WillProperties directly instead.
type ConnackError ¶
type ConnackError struct {
	ReasonCode byte // CONNACK reason code
	Reason string // CONNACK Reason string from properties
	Err error // underlying error
}
ConnackError will be passed when the server denies the connection in its CONNACK packet.
func NewConnackError ¶
func NewConnackError(err error, connack *paho.Connack) *ConnackError
NewConnackError returns a new ConnackError
func (*ConnackError) Error ¶
func (c *ConnackError) Error() string
func (*ConnackError) Unwrap ¶
func (c *ConnackError) Unwrap() error
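Because the type provides both Error and Unwrap, a callback such as OnConnectError can look for it with `errors.As`. The sketch below shows that pattern with a local stand-in type that mirrors the documented fields. The names `connackError`, `reasonCode`, `failedAttempt` and `errDenied` are hypothetical, and the way the error is wrapped here is an assumption made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// connackError is a local stand-in mirroring the documented ConnackError fields.
type connackError struct {
	ReasonCode byte
	Reason     string
	Err        error
}

func (c *connackError) Error() string {
	return fmt.Sprintf("server denied connect (code %d, reason %q): %v", c.ReasonCode, c.Reason, c.Err)
}

// Unwrap exposes the underlying error so errors.Is and errors.As can see through it.
func (c *connackError) Unwrap() error { return c.Err }

// errDenied is a hypothetical underlying error.
var errDenied = errors.New("connection refused")

// failedAttempt builds the kind of wrapped error a callback might receive.
func failedAttempt(code byte) error {
	return fmt.Errorf("connect attempt failed: %w",
		&connackError{ReasonCode: code, Reason: "not authorized", Err: errDenied})
}

// reasonCode extracts the CONNACK reason code from err if one is wrapped inside it.
func reasonCode(err error) (byte, bool) {
	var ce *connackError
	if errors.As(err, &ce) {
		return ce.ReasonCode, true
	}
	return 0, false
}

func main() {
	err := failedAttempt(0x87) // 0x87 is "Not authorized" in MQTT v5
	code, ok := reasonCode(err)
	fmt.Println(code, ok, errors.Is(err, errDenied)) // 135 true true
}
```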
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
ConnectionManager manages the connection with the server and provides the ability to publish messages
func NewConnection ¶
func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, error)
NewConnection creates a connection manager and begins the connection process (will retry until the context is cancelled)
func (*ConnectionManager) AddOnPublishReceived ¶
func (c *ConnectionManager) AddOnPublishReceived(f func(PublishReceived) (bool, error)) func()
AddOnPublishReceived adds a function that will be called when a PUBLISH is received. The new function will be called after any functions already in the list. Returns a function that can be called to remove the callback.
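The add-then-remove pattern described here can be illustrated with a small self-contained registry. This is a sketch of the general technique rather than the ConnectionManager's internals: `handlerList`, `add` and `dispatch` are hypothetical names, and the handlers take a plain topic string instead of a `PublishReceived` to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

type handlerEntry struct {
	id int
	fn func(topic string)
}

// handlerList keeps callbacks in registration order.
type handlerList struct {
	mu       sync.Mutex
	nextID   int
	handlers []handlerEntry
}

// add appends fn to the end of the list and returns a function that removes it.
func (l *handlerList) add(fn func(topic string)) (remove func()) {
	l.mu.Lock()
	defer l.mu.Unlock()
	id := l.nextID
	l.nextID++
	l.handlers = append(l.handlers, handlerEntry{id: id, fn: fn})
	return func() {
		l.mu.Lock()
		defer l.mu.Unlock()
		for i, h := range l.handlers {
			if h.id == id {
				l.handlers = append(l.handlers[:i], l.handlers[i+1:]...)
				return
			}
		}
	}
}

// dispatch calls every registered handler in order, using a snapshot so that
// handlers may add or remove callbacks without deadlocking.
func (l *handlerList) dispatch(topic string) {
	l.mu.Lock()
	snapshot := append([]handlerEntry(nil), l.handlers...)
	l.mu.Unlock()
	for _, h := range snapshot {
		h.fn(topic)
	}
}

func main() {
	var l handlerList
	removeFirst := l.add(func(t string) { fmt.Println("first:", t) })
	l.add(func(t string) { fmt.Println("second:", t) })
	l.dispatch("a") // prints "first: a" then "second: a"
	removeFirst()
	l.dispatch("b") // prints "second: b"
}
```

Returning a removal function avoids the need to compare function values, which Go does not allow.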
func (*ConnectionManager) Authenticate ¶
func (c *ConnectionManager) Authenticate(ctx context.Context, a *paho.Auth) (*paho.AuthResponse, error)
Authenticate is used to initiate a reauthentication of credentials with the server. This function sends the initial Auth packet to start the reauthentication then relies on the client AuthHandler managing any further requests from the server until either a successful Auth packet is passed back, or a Disconnect is received.
func (*ConnectionManager) AwaitConnection ¶
func (c *ConnectionManager) AwaitConnection(ctx context.Context) error
AwaitConnection will return when the connection comes up or the context is cancelled (only returns an error if context is cancelled). If you require more complex connection management then consider using the OnConnectionUp callback.
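The blocking behaviour described above (return nil once the connection is up, or an error only if the context ends first) corresponds to a simple `select`. The sketch below shows that pattern with a plain channel standing in for the connection state. `awaitUp` and `awaitUpWithin` are hypothetical helpers, not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// awaitUp blocks until up is closed or ctx is done; it only returns an error
// in the latter case.
func awaitUp(ctx context.Context, up <-chan struct{}) error {
	select {
	case <-up:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// awaitUpWithin is a convenience wrapper that applies a timeout.
func awaitUpWithin(up <-chan struct{}, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return awaitUp(ctx, up)
}

func main() {
	up := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate the connection coming up
		close(up)
	}()
	fmt.Println(awaitUpWithin(up, time.Second)) // <nil>

	never := make(chan struct{})
	fmt.Println(awaitUpWithin(never, 20*time.Millisecond)) // context deadline exceeded
}
```

With the real API, the equivalent is to pass a context created with `context.WithTimeout` to AwaitConnection so that startup does not block indefinitely when the server is unreachable.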
func (*ConnectionManager) Disconnect ¶
func (c *ConnectionManager) Disconnect(ctx context.Context) error
Disconnect closes the connection (if one is up) and shuts down any active processes before returning
func (*ConnectionManager) Done ¶
func (c *ConnectionManager) Done() <-chan struct{}
Done returns a channel that will be closed when the connection handler has shut down cleanly. Note: We cannot currently tell when the MQTT connection has fully shut down (so it may still be in the process of closing down).
func (*ConnectionManager) Publish ¶
func (c *ConnectionManager) Publish(ctx context.Context, p *paho.Publish) (*paho.PublishResponse, error)
Publish is used to send a publication to the MQTT server. It is passed a pre-prepared `PUBLISH` packet and blocks waiting for the appropriate response, or for the timeout to fire. Any response message is returned from the function, along with any errors.
func (*ConnectionManager) PublishViaQueue ¶
func (c *ConnectionManager) PublishViaQueue(ctx context.Context, p *QueuePublish) error
PublishViaQueue is used to send a publication to the MQTT server via a queue (by default memory based). An error will be returned if the message could not be added to the queue; otherwise the message will be delivered in the background with no status updates available. Use this function when you wish to rely upon the library's best effort to transmit the message; it is anticipated that this will generally be in situations where the network link or power supply is unreliable. Messages will be written to a queue (configuring a disk-based queue is recommended) and transmitted where possible. To maximise the chance of a successful delivery:
- Leave CleanStartOnInitialConnection set to false
- Set SessionExpiryInterval such that sessions will outlive anticipated outages (this impacts inflight messages only)
- Set ClientConfig.Session to a session manager with persistent storage
- Set ClientConfig.Queue to a queue with persistent storage
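The store-and-forward idea behind queued publishing can be sketched with a small in-memory queue: enqueueing fails only if the message cannot be stored, and delivery is retried later without reporting status to the caller. This is an illustration under assumptions, not the library's queue implementation; `memoryQueue`, `enqueue`, `drain` and `errSendFailed` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSendFailed is a hypothetical transmission error.
var errSendFailed = errors.New("send failed")

// memoryQueue is a minimal FIFO; a persistent queue would store items on disk instead.
type memoryQueue struct {
	mu    sync.Mutex
	items [][]byte
}

// enqueue stores the message; the caller only learns whether storing succeeded.
func (q *memoryQueue) enqueue(msg []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, msg)
	return nil
}

// drain sends queued messages in order and stops at the first failure, leaving
// that message and all later ones queued. It returns the number still queued.
// For simplicity the lock is held while sending.
func (q *memoryQueue) drain(send func([]byte) error) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) > 0 {
		if err := send(q.items[0]); err != nil {
			break
		}
		q.items = q.items[1:]
	}
	return len(q.items)
}

func main() {
	q := &memoryQueue{}
	for _, m := range []string{"a", "b", "c"} {
		if err := q.enqueue([]byte(m)); err != nil {
			fmt.Println("could not queue message:", err)
		}
	}
	linkDown := func([]byte) error { return errSendFailed }
	linkUp := func(b []byte) error { fmt.Println("sent", string(b)); return nil }
	fmt.Println(q.drain(linkDown)) // 3
	fmt.Println(q.drain(linkUp))   // prints "sent a", "sent b", "sent c", then 0
}
```

A memory-based queue like this loses its contents when the process exits, which is why persistent storage for the queue and session is suggested when power or network outages are expected.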
func (*ConnectionManager) Subscribe ¶
func (c *ConnectionManager) Subscribe(ctx context.Context, s *paho.Subscribe) (*paho.Suback, error)
Subscribe is used to send a Subscription request to the MQTT server. It is passed a pre-prepared Subscribe packet and blocks waiting for a response Suback, or for the timeout to fire. Any response Suback is returned from the function, along with any errors.
func (*ConnectionManager) TerminateConnectionForTest ¶
func (c *ConnectionManager) TerminateConnectionForTest()
TerminateConnectionForTest closes the active connection (if any). This function is intended for testing only; it simulates connection loss, which supports testing QOS1 and 2 message delivery.
func (*ConnectionManager) Unsubscribe ¶
func (c *ConnectionManager) Unsubscribe(ctx context.Context, u *paho.Unsubscribe) (*paho.Unsuback, error)
Unsubscribe is used to send an Unsubscribe request to the MQTT server. It is passed a pre-prepared Unsubscribe packet and blocks waiting for a response Unsuback, or for the timeout to fire. Any response Unsuback is returned from the function, along with any errors.
type DisconnectError ¶
type DisconnectError struct {
// contains filtered or unexported fields
}
DisconnectError will be passed when the server requests disconnection (allows this error type to be detected)
func (*DisconnectError) Error ¶
func (d *DisconnectError) Error() string
type PublishReceived ¶
type PublishReceived struct {
	paho.PublishReceived
	ConnectionManager *ConnectionManager
}
type QueuePublish ¶
QueuePublish holds info required to publish a message. A separate struct is used so options can be added in the future without breaking existing code
type WebSocketConfig ¶
type WebSocketConfig struct {
	Dialer func(url *url.URL, tlsCfg *tls.Config) *websocket.Dialer // If non-nil this will be called before each websocket connection (allows full configuration of the dialer used)
	Header func(url *url.URL, tlsCfg *tls.Config) http.Header // If non-nil this will be called before each connection attempt to get headers to include with request
}
WebSocketConfig enables customisation of the websocket connection