Documentation ¶
Overview ¶
Package client implements an MQTT client and service for interacting with brokers.
Index ¶
- Variables
- func ClearRetainedMessage(config *Config, topic string, timeout time.Duration) error
- func ClearSession(config *Config, timeout time.Duration) error
- func PublishMessage(config *Config, msg *packet.Message, timeout time.Duration) error
- func ReceiveMessage(config *Config, topic string, qos byte, timeout time.Duration) (*packet.Message, error)
- type Callback
- type Client
- func (c *Client) Close() error
- func (c *Client) Connect(config *Config) (ConnectFuture, error)
- func (c *Client) Disconnect(timeout ...time.Duration) error
- func (c *Client) Publish(topic string, payload []byte, qos uint8, retain bool) (GenericFuture, error)
- func (c *Client) PublishMessage(msg *packet.Message) (GenericFuture, error)
- func (c *Client) Subscribe(topic string, qos uint8) (SubscribeFuture, error)
- func (c *Client) SubscribeMultiple(subscriptions []packet.Subscription) (SubscribeFuture, error)
- func (c *Client) Unsubscribe(topic string) (GenericFuture, error)
- func (c *Client) UnsubscribeMultiple(topics []string) (GenericFuture, error)
- type Config
- type ConnectFuture
- type ErrorCallback
- type GenericFuture
- type Logger
- type MessageCallback
- type OfflineCallback
- type OnlineCallback
- type Service
- func (s *Service) Publish(topic string, payload []byte, qos uint8, retain bool) GenericFuture
- func (s *Service) PublishMessage(msg *packet.Message) GenericFuture
- func (s *Service) Start(config *Config)
- func (s *Service) Stop(clearFutures bool)
- func (s *Service) Subscribe(topic string, qos uint8) SubscribeFuture
- func (s *Service) SubscribeMultiple(subscriptions []packet.Subscription) SubscribeFuture
- func (s *Service) Unsubscribe(topic string) GenericFuture
- func (s *Service) UnsubscribeMultiple(topics []string) GenericFuture
- type Session
- type SubscribeFuture
Constants ¶
This section is empty.
Variables ¶
var ErrClientAlreadyConnecting = errors.New("client already connecting")
ErrClientAlreadyConnecting is returned by Connect if a connection attempt has already been made.
var ErrClientConnectionDenied = errors.New("client connection denied")
ErrClientConnectionDenied is returned in the Callback if the connection has been rejected by the broker.
var ErrClientExpectedConnack = errors.New("client expected connack")
ErrClientExpectedConnack is returned when the first received packet is not a ConnackPacket.
var ErrClientMissingID = errors.New("client missing id")
ErrClientMissingID is returned by Connect if no ClientID has been provided in the config while requesting to resume a session.
var ErrClientMissingPong = errors.New("client missing pong")
ErrClientMissingPong is returned in the Callback if the broker did not respond in time to a PingreqPacket.
var ErrClientNotConnected = errors.New("client not connected")
ErrClientNotConnected is returned by Publish, Subscribe and Unsubscribe if the client is not currently connected.
var ErrFailedSubscription = errors.New("failed subscription")
ErrFailedSubscription is returned when a submitted subscription is marked as failed. This requires Config.ValidateSubs to be set to true.
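Because these errors are exported sentinel values, callers can test for them by identity. The following is a minimal sketch that runs without the package: errClientNotConnected is a local stand-in for ErrClientNotConnected and publish is a hypothetical helper. errors.Is is used because it matches both a directly returned sentinel and one that a caller has wrapped.

```go
package main

import (
	"errors"
	"fmt"
)

// errClientNotConnected is a local stand-in for ErrClientNotConnected,
// declared here only so the sketch compiles without the package.
var errClientNotConnected = errors.New("client not connected")

// publish is a hypothetical helper that fails the way Publish is documented
// to fail when the client is not connected. It wraps the sentinel to show
// that errors.Is still matches.
func publish(connected bool) error {
	if !connected {
		return fmt.Errorf("publish: %w", errClientNotConnected)
	}
	return nil
}

// isNotConnected reports whether err is, or wraps, the sentinel error.
func isNotConnected(err error) bool {
	return errors.Is(err, errClientNotConnected)
}

func main() {
	fmt.Println(isNotConnected(publish(false))) // true
	fmt.Println(isNotConnected(publish(true)))  // false
}
```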
Functions ¶
func ClearRetainedMessage ¶
ClearRetainedMessage will connect to the specified broker and send an empty retained message to force any already retained message to be cleared.
func ClearSession ¶
ClearSession will connect to the specified broker and request a clean session.
func PublishMessage ¶
PublishMessage will connect to the specified broker to publish the passed message.
Types ¶
type Callback ¶
A Callback is a function called by the client upon received messages or internal errors. If the callback was not itself called with an error, it can return an error to instantly close the client and prevent it from sending any acknowledgments for the specified message.
Note: Execution of the client is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the client.
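One way to avoid the deadlock described in the note is to keep the callback short and hand the message to a worker goroutine, which is then free to wait on futures. The sketch below illustrates the pattern only: message is a stand-in for packet.Message, and newHandoff and its queue size are hypothetical names, not part of this package.

```go
package main

import "fmt"

// message is a stand-in for packet.Message with only the fields used here.
type message struct {
	Topic   string
	Payload []byte
}

// newHandoff returns a callback with the documented Callback shape (adapted
// to the stand-in type) and a stop function. The callback only enqueues the
// message; the worker goroutine runs handle, where blocking is safe.
func newHandoff(size int, handle func(*message)) (func(*message, error) error, func()) {
	queue := make(chan *message, size)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for m := range queue {
			handle(m)
		}
	}()
	callback := func(m *message, err error) error {
		if err != nil {
			return nil // called with an internal error: nothing to enqueue
		}
		queue <- m // blocks only if the buffer is full
		return nil
	}
	stop := func() {
		close(queue) // no further callbacks may be delivered after this
		<-done       // wait until the worker has drained the queue
	}
	return callback, stop
}

func main() {
	var got []string
	callback, stop := newHandoff(8, func(m *message) {
		got = append(got, m.Topic+": "+string(m.Payload))
	})
	_ = callback(&message{Topic: "test", Payload: []byte("test")}, nil)
	stop()
	fmt.Println(got) // [test: test]
}
```

The buffer size bounds how many messages can be pending before the callback itself starts to block, so it should be chosen with the expected message rate in mind.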
type Client ¶
type Client struct {
	// The session used by the client to store unacknowledged packets.
	Session Session
	// The callback to be called by the client upon receiving a message or
	// encountering an error while processing incoming packets.
	Callback Callback
	// The logger that is used to log low level information about packets
	// that have been successfully sent and received and details about the
	// automatic keep alive handler.
	Logger Logger
	// contains filtered or unexported fields
}
A Client connects to a broker and handles the transmission of packets. It will automatically send PingreqPackets to keep the connection alive. Outgoing publish related packets will be stored in the session and resent when the connection gets closed abruptly. All methods return Futures that get completed when the packets get acknowledged by the broker. Once the connection is closed all waiting futures get canceled.
Note: If clean session is set to false and there are packets in the session, messages might get completed after connecting without triggering any futures to complete.
Example ¶
done := make(chan struct{})
c := New()
c.Callback = func(msg *packet.Message, err error) error {
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s: %s\n", msg.Topic, msg.Payload)
	close(done)
	return nil
}
config := NewConfigWithClientID("mqtt://0.0.0.0", "gomqtt/client")
connectFuture, err := c.Connect(config)
if err != nil {
	panic(err)
}
err = connectFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}
subscribeFuture, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}
err = subscribeFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}
publishFuture, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}
err = publishFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}
<-done
err = c.Disconnect()
if err != nil {
	panic(err)
}
Output: test: test
func (*Client) Close ¶
Close closes the client immediately without sending a DisconnectPacket or waiting for outgoing transmissions to finish.
func (*Client) Connect ¶
func (c *Client) Connect(config *Config) (ConnectFuture, error)
Connect opens the connection to the broker and sends a ConnectPacket. It will return a ConnectFuture that gets completed once a ConnackPacket has been received. If the ConnectPacket couldn't be transmitted it will return an error.
func (*Client) Disconnect ¶
Disconnect will send a DisconnectPacket and close the connection.
If a timeout is specified, the client will wait the specified amount of time for all queued futures to complete or cancel. If no timeout is specified it will not wait at all.
func (*Client) Publish ¶
func (c *Client) Publish(topic string, payload []byte, qos uint8, retain bool) (GenericFuture, error)
Publish will send a PublishPacket containing the passed parameters. It will return a GenericFuture that gets completed once the quality of service flow has been completed.
func (*Client) PublishMessage ¶
func (c *Client) PublishMessage(msg *packet.Message) (GenericFuture, error)
PublishMessage will send a PublishPacket containing the passed message. It will return a GenericFuture that gets completed once the quality of service flow has been completed.
func (*Client) Subscribe ¶
func (c *Client) Subscribe(topic string, qos uint8) (SubscribeFuture, error)
Subscribe will send a SubscribePacket containing one topic to subscribe. It will return a SubscribeFuture that gets completed once a SubackPacket has been received.
func (*Client) SubscribeMultiple ¶
func (c *Client) SubscribeMultiple(subscriptions []packet.Subscription) (SubscribeFuture, error)
SubscribeMultiple will send a SubscribePacket containing multiple topics to subscribe. It will return a SubscribeFuture that gets completed once a SubackPacket has been received.
func (*Client) Unsubscribe ¶
func (c *Client) Unsubscribe(topic string) (GenericFuture, error)
Unsubscribe will send an UnsubscribePacket containing one topic to unsubscribe. It will return a GenericFuture that gets completed once an UnsubackPacket has been received.
func (*Client) UnsubscribeMultiple ¶
func (c *Client) UnsubscribeMultiple(topics []string) (GenericFuture, error)
UnsubscribeMultiple will send an UnsubscribePacket containing multiple topics to unsubscribe. It will return a GenericFuture that gets completed once an UnsubackPacket has been received.
type Config ¶
type Config struct {
	Dialer       *transport.Dialer
	BrokerURL    string
	ClientID     string
	CleanSession bool
	KeepAlive    string
	WillMessage  *packet.Message
	ValidateSubs bool
}
A Config holds information about establishing a connection to a broker.
func NewConfigWithClientID ¶
NewConfigWithClientID creates a new Config using the specified URL and client ID.
type ConnectFuture ¶
type ConnectFuture interface {
	GenericFuture
	// SessionPresent will return whether a session was present.
	SessionPresent() bool
	// ReturnCode will return the connack code returned by the broker.
	ReturnCode() packet.ConnackCode
}
A ConnectFuture is returned by the connect method.
type ErrorCallback ¶
type ErrorCallback func(error)
An ErrorCallback is a function that is called when an error occurs.
Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.
type GenericFuture ¶ added in v0.2.0
type GenericFuture interface {
	// Wait will block until the future is completed or canceled. It will return
	// future.ErrCanceled if the future gets canceled. If the timeout is reached,
	// future.ErrTimeoutExceeded is returned.
	//
	// Note: Wait will not return any Client related errors.
	Wait(timeout time.Duration) error
}
A GenericFuture is returned by publish and unsubscribe methods.
type Logger ¶
type Logger func(msg string)
A Logger is a function called by the client to log activity.
type MessageCallback ¶
A MessageCallback is a function that is called when a message is received. If an error is returned the underlying client will be prevented from acknowledging the specified message and closes immediately.
Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.
type OfflineCallback ¶
type OfflineCallback func()
An OfflineCallback is a function that is called when the service is disconnected.
Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.
type OnlineCallback ¶
type OnlineCallback func(resumed bool)
An OnlineCallback is a function that is called when the service is connected.
Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.
type Service ¶
type Service struct {
	// The session used by the client to store unacknowledged packets.
	Session Session
	// The callback that is used to notify that the service is online.
	OnlineCallback OnlineCallback
	// The callback to be called by the service upon receiving a message.
	MessageCallback MessageCallback
	// The callback to be called by the service upon encountering an error.
	ErrorCallback ErrorCallback
	// The callback that is used to notify that the service is offline.
	OfflineCallback OfflineCallback
	// The logger that is used to log low level information like packets
	// that have been successfully sent and received, details about the
	// automatic keep alive handler, reconnection and occurring errors.
	Logger Logger
	// The minimum delay between reconnects.
	//
	// Note: The value must be changed before calling Start.
	MinReconnectDelay time.Duration
	// The maximum delay between reconnects.
	//
	// Note: The value must be changed before calling Start.
	MaxReconnectDelay time.Duration
	// The allowed timeout until a connection attempt is canceled.
	ConnectTimeout time.Duration
	// The allowed timeout until a connection is forcefully closed.
	DisconnectTimeout time.Duration
	// contains filtered or unexported fields
}
Service is an abstraction for Client that provides a stable interface to the application, while it automatically connects and reconnects clients in the background. Errors are not returned but emitted using the ErrorCallback. All methods return Futures that get completed once the acknowledgements are received. Once the service is stopped all waiting futures get canceled.
Note: If clean session is false and there are packets in the store, messages might get completed after starting without triggering any futures to complete.
Example ¶
wait := make(chan struct{})
done := make(chan struct{})
config := NewConfigWithClientID("mqtt://0.0.0.0", "gomqtt/service")
config.CleanSession = false
s := NewService()
s.OnlineCallback = func(resumed bool) {
	fmt.Println("online!")
	fmt.Printf("resumed: %v\n", resumed)
}
s.OfflineCallback = func() {
	fmt.Println("offline!")
	close(done)
}
s.MessageCallback = func(msg *packet.Message) error {
	fmt.Printf("message: %s - %s\n", msg.Topic, msg.Payload)
	close(wait)
	return nil
}
err := ClearSession(config, 1*time.Second)
if err != nil {
	panic(err)
}
s.Start(config)
s.Subscribe("test", 0).Wait(10 * time.Second)
s.Publish("test", []byte("test"), 0, false)
<-wait
s.Stop(true)
<-done
Output:
online!
resumed: false
message: test - test
offline!
func NewService ¶
NewService allocates and returns a new service. The optional parameter queueSize specifies how many Subscribe, Unsubscribe and Publish commands can be queued up before actually sending them on the wire. The default queueSize is 100.
func (*Service) Publish ¶
Publish will send a PublishPacket containing the passed parameters. It will return a GenericFuture that gets completed once the quality of service flow has been completed.
func (*Service) PublishMessage ¶
func (s *Service) PublishMessage(msg *packet.Message) GenericFuture
PublishMessage will send a PublishPacket containing the passed message. It will return a GenericFuture that gets completed once the quality of service flow has been completed.
func (*Service) Start ¶
Start will start the service with the specified configuration. From now on the service will automatically reconnect on any error until Stop is called.
func (*Service) Stop ¶
Stop will disconnect the client if online and cancel all futures if requested. After the service is stopped it can be started again.
Note: You should clear the futures on the last stop before exiting to ensure that all goroutines waiting on futures return.
func (*Service) Subscribe ¶
func (s *Service) Subscribe(topic string, qos uint8) SubscribeFuture
Subscribe will send a SubscribePacket containing one topic to subscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.
func (*Service) SubscribeMultiple ¶
func (s *Service) SubscribeMultiple(subscriptions []packet.Subscription) SubscribeFuture
SubscribeMultiple will send a SubscribePacket containing multiple topics to subscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.
func (*Service) Unsubscribe ¶
func (s *Service) Unsubscribe(topic string) GenericFuture
Unsubscribe will send an UnsubscribePacket containing one topic to unsubscribe. It will return a GenericFuture that gets completed once the acknowledgements have been received.
func (*Service) UnsubscribeMultiple ¶
func (s *Service) UnsubscribeMultiple(topics []string) GenericFuture
UnsubscribeMultiple will send an UnsubscribePacket containing multiple topics to unsubscribe. It will return a GenericFuture that gets completed once the acknowledgements have been received.
type Session ¶
type Session interface {
	// NextID will return the next id for outgoing packets.
	NextID() packet.ID
	// SavePacket will store a packet in the session. Any existing
	// packet with the same id gets quietly overwritten.
	SavePacket(session.Direction, packet.GenericPacket) error
	// LookupPacket will retrieve a packet from the session using a packet id.
	LookupPacket(session.Direction, packet.ID) (packet.GenericPacket, error)
	// DeletePacket will remove a packet from the session. The method must not
	// return an error if no packet with the specified id exists.
	DeletePacket(session.Direction, packet.ID) error
	// AllPackets will return all packets currently saved in the session.
	AllPackets(session.Direction) ([]packet.GenericPacket, error)
	// Reset will completely reset the session.
	Reset() error
}
A Session is used to persist incoming and outgoing packets.
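The contract above (overwrite on save, no error when deleting a missing packet, full reset) can be sketched with a mutex-guarded map. Everything in this example is an assumption made so that it runs without the package: packetID, direction and storedPacket are simplified stand-ins for packet.ID, session.Direction and packet.GenericPacket, and returning nil for a missing packet is a choice of this sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for packet.ID and session.Direction.
type packetID uint16
type direction int

const (
	incoming direction = iota
	outgoing
)

// storedPacket is a stand-in for packet.GenericPacket.
type storedPacket struct {
	ID      packetID
	Payload string
}

// key identifies a stored packet by direction and id.
type key struct {
	dir direction
	id  packetID
}

// memorySession keeps all packets in a map guarded by a mutex.
type memorySession struct {
	mu      sync.Mutex
	counter packetID
	store   map[key]*storedPacket
}

func newMemorySession() *memorySession {
	return &memorySession{store: make(map[key]*storedPacket)}
}

// NextID returns the next id and skips zero when the counter wraps, since
// MQTT packet identifiers must be non-zero.
func (s *memorySession) NextID() packetID {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counter++
	if s.counter == 0 {
		s.counter = 1
	}
	return s.counter
}

// SavePacket stores p and quietly overwrites a packet with the same id.
func (s *memorySession) SavePacket(dir direction, p *storedPacket) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.store[key{dir, p.ID}] = p
	return nil
}

// LookupPacket returns the stored packet, or nil if none exists.
func (s *memorySession) LookupPacket(dir direction, id packetID) (*storedPacket, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.store[key{dir, id}], nil
}

// DeletePacket removes a packet; a missing packet is not an error.
func (s *memorySession) DeletePacket(dir direction, id packetID) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.store, key{dir, id})
	return nil
}

// AllPackets returns all packets for one direction in unspecified order.
func (s *memorySession) AllPackets(dir direction) ([]*storedPacket, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var all []*storedPacket
	for k, p := range s.store {
		if k.dir == dir {
			all = append(all, p)
		}
	}
	return all, nil
}

// Reset drops all packets and restarts the id counter.
func (s *memorySession) Reset() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counter = 0
	s.store = make(map[key]*storedPacket)
	return nil
}

func main() {
	s := newMemorySession()
	id := s.NextID()
	_ = s.SavePacket(outgoing, &storedPacket{ID: id, Payload: "hello"})
	p, _ := s.LookupPacket(outgoing, id)
	fmt.Println(p.Payload) // hello
	_ = s.DeletePacket(outgoing, id)
	_ = s.DeletePacket(outgoing, id) // deleting again is not an error
	all, _ := s.AllPackets(outgoing)
	fmt.Println(len(all)) // 0
}
```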
type SubscribeFuture ¶
type SubscribeFuture interface {
	GenericFuture
	// ReturnCodes will return the suback codes returned by the broker.
	ReturnCodes() []uint8
}
A SubscribeFuture is returned by the subscribe methods.