client

package
v0.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2021 License: Apache-2.0 Imports: 14 Imported by: 14

Documentation

Overview

Package client implements a MQTT client and service for interacting with brokers.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClientAlreadyConnecting = errors.New("client already connecting")

ErrClientAlreadyConnecting is returned by Connect if another connection attempt is already underway.

View Source
var ErrClientConnectionDenied = errors.New("client connection denied")

ErrClientConnectionDenied is returned to the Callback if the connection has been reject by the broker.

View Source
var ErrClientExpectedConnack = errors.New("client expected connack")

ErrClientExpectedConnack is returned when the first received packet is not a Connack.

View Source
var ErrClientMissingID = errors.New("client missing id")

ErrClientMissingID is returned by Connect if no ClientID has been provided in the config while requesting to resume a session.

View Source
var ErrClientMissingPong = errors.New("client missing pong")

ErrClientMissingPong is returned to the Callback if the broker did not respond in time to a Pingreq.

View Source
var ErrClientNotConnected = errors.New("client not connected")

ErrClientNotConnected is returned by Publish, Subscribe and Unsubscribe if the client is not currently connected.

View Source
var ErrFailedSubscription = errors.New("failed subscription")

ErrFailedSubscription is returned when a subscription attempt failed and Config.ValidateSubs has been set to true.

Functions

func ClearRetainedMessage

func ClearRetainedMessage(config *Config, topic string, timeout time.Duration) error

ClearRetainedMessage will connect to the specified broker and send an empty retained message to force any already retained message to be cleared.

func ClearSession

func ClearSession(config *Config, timeout time.Duration) error

ClearSession will connect to the specified broker and request a clean session.

func PublishMessage

func PublishMessage(config *Config, msg *packet.Message, timeout time.Duration) error

PublishMessage will connect to the specified broker to publish the passed message.

func ReceiveMessage

func ReceiveMessage(config *Config, topic string, qos packet.QOS, timeout time.Duration) (*packet.Message, error)

ReceiveMessage will connect to the specified broker and issue a subscription for the specified topic and return the first message received.

Types

type Client

type Client struct {

	// The session used by the client to store unacknowledged packets.
	Session Session

	// The Callback is called by the client upon received messages or internal
	// errors. An error can be returned if the callback is not already called
	// with an error to instantly close the client and prevent it from sending
	// any acknowledgments for the specified message. In this case the callback
	// is called again with the error.
	//
	// Note: Execution of the client is stopped before the callback is called and
	// resumed after the callback returns. This means that waiting on a future
	// inside the callback will deadlock the client.
	Callback func(msg *packet.Message, err error) error

	// The logger that is used to log low level information about packets
	// that have been successfully sent and received and details about the
	// automatic keep alive handler.
	Logger func(msg string)
	// contains filtered or unexported fields
}

A Client connects to a broker and handles the transmission of packets. It will automatically send PingreqPackets to keep the connection alive. Outgoing publish related packets will be stored in session and resent when the connection gets closed abruptly. All methods return Futures that get completed when the packets get acknowledged by the broker. Once the connection is closed all waiting futures get canceled.

Note: If clean session is set to false and there are packets in the session, messages might get completed after connecting without triggering any futures to complete.

Example
done := make(chan struct{})

c := New()

c.Callback = func(msg *packet.Message, err error) error {
	if err != nil {
		panic(err)
	}

	fmt.Printf("%s: %s\n", msg.Topic, msg.Payload)
	close(done)

	return nil
}

config := NewConfigWithClientID("mqtt://0.0.0.0", "gomqtt/client")

connectFuture, err := c.Connect(config)
if err != nil {
	panic(err)
}

err = connectFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

subscribeFuture, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}

err = subscribeFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

publishFuture, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}

err = publishFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

<-done

err = c.Disconnect()
if err != nil {
	panic(err)
}
Output:

test: test

func New

func New() *Client

New returns a new client that by default uses a fresh MemorySession.

func (*Client) Close

func (c *Client) Close() error

Close closes the client immediately without sending a Disconnect packet and waiting for outgoing transmissions to finish.

func (*Client) Connect

func (c *Client) Connect(config *Config) (ConnectFuture, error)

Connect opens the connection to the broker and sends a Connect packet. It will return a ConnectFuture that gets completed once a Connack has been received. If the Connect packet couldn't be transmitted it will return an error.

func (*Client) Disconnect

func (c *Client) Disconnect(timeout ...time.Duration) error

Disconnect will send a Disconnect packet and close the connection.

If a timeout is specified, the client will wait the specified amount of time for all queued futures to complete or cancel. If no timeout is specified it will not wait at all.

func (*Client) Publish

func (c *Client) Publish(topic string, payload []byte, qos packet.QOS, retain bool) (GenericFuture, error)

Publish will send a Publish packet containing the passed parameters. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Client) PublishMessage

func (c *Client) PublishMessage(msg *packet.Message) (GenericFuture, error)

PublishMessage will send a Publish containing the passed message. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Client) Subscribe

func (c *Client) Subscribe(topic string, qos packet.QOS) (SubscribeFuture, error)

Subscribe will send a Subscribe packet containing one topic to subscribe. It will return a SubscribeFuture that gets completed once a Suback packet has been received.

func (*Client) SubscribeMultiple

func (c *Client) SubscribeMultiple(subscriptions []packet.Subscription) (SubscribeFuture, error)

SubscribeMultiple will send a Subscribe packet containing multiple topics to subscribe. It will return a SubscribeFuture that gets completed once a Suback packet has been received.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic string) (GenericFuture, error)

Unsubscribe will send a Unsubscribe packet containing one topic to unsubscribe. It will return a UnsubscribeFuture that gets completed once an Unsuback packet has been received.

func (*Client) UnsubscribeMultiple

func (c *Client) UnsubscribeMultiple(topics []string) (GenericFuture, error)

UnsubscribeMultiple will send a Unsubscribe packet containing multiple topics to unsubscribe. It will return a UnsubscribeFuture that gets completed once an Unsuback packet has been received.

type Config

type Config struct {
	// Dialer can be set to use a custom dialer.
	Dialer Dialer

	// BrokerURL is the url that is used to infer options to open the connection.
	BrokerURL string

	// ClientID can be set to the client's id.
	ClientID string

	// CleanSession can be set to request a clean session.
	CleanSession bool

	// KeepAlive should be time a duration string e.g. "30s".
	KeepAlive string

	// Will message is registered on the broker upon connect if set.
	WillMessage *packet.Message

	// ValidateSubs will cause the client to fail if subscriptions failed.
	ValidateSubs bool

	// ReadLimit defines the maximum size of a packet that can be received.
	ReadLimit int64

	// MaxWriteDelay defines the maximum allowed delay when flushing the
	// underlying buffered writer.
	MaxWriteDelay time.Duration

	// AlwaysAnnounceOnPublish defines when the message callback is called.
	// - QOS 0 and 1: Callback always occurs on reception of Publish.
	// - QOS 2 and AlwaysAnnounceOnPublish == false: Callback occurs on
	//   reception of PubRel and returning and error in the callback will close
	//   the connection before PubComp is sent.
	// - QOS 2 and AlwaysAnnounceOnPublish == true: Callback occurs on reception
	//   of Publish and returning an error in the callback will close the
	//   connection, preventing PubRec being sent, thus ensuring the redelivery
	//   of Publish.
	AlwaysAnnounceOnPublish bool
}

A Config holds information about establishing a connection to a broker.

func NewConfig

func NewConfig(url string) *Config

NewConfig creates a new Config using the specified URL.

func NewConfigWithClientID

func NewConfigWithClientID(url, id string) *Config

NewConfigWithClientID creates a new Config using the specified URL and client ID.

type ConnectFuture

type ConnectFuture interface {
	GenericFuture

	// SessionPresent will return whether a session was present.
	SessionPresent() bool

	// ReturnCode will return the connack code returned by the broker.
	ReturnCode() packet.ConnackCode
}

A ConnectFuture is returned by the connect method.

type Dialer added in v0.9.3

type Dialer interface {
	Dial(urlString string) (transport.Conn, error)
}

Dialer defines the dialer used by a client.

type GenericFuture added in v0.2.0

type GenericFuture interface {
	// Wait will wait the given amount of time and return whether the future has
	// been completed, canceled or the request timed out. If no time has been
	// provided the wait will never time out.
	//
	// Note: Wait will not return any Client related errors.
	Wait(timeout time.Duration) error
}

A GenericFuture is returned by publish and unsubscribe methods.

type Service

type Service struct {
	// The session used by the client to store unacknowledged packets.
	Session Session

	// The OnlineCallback is called when the service is connected, but before
	// topics are resubscribed.
	//
	// Note: Execution of the service is resumed after the callback returns.
	// This means that waiting on a future inside the callback will deadlock the
	// service.
	OnlineCallback func(resumed bool)

	// The MessageCallback is called when a message is received. If an error is
	// returned the underlying client will be prevented from acknowledging the
	// specified message and closed immediately. The error is logged and a
	// reconnect attempt initiated.
	//
	// Note: Execution of the service is resumed after the callback returns.
	// This means that waiting on a future inside the callback will deadlock the
	// service.
	MessageCallback func(*packet.Message) error

	// The ErrorCallback is called when an error occurred.
	//
	// Note: Execution of the service is resumed after the callback returns.
	// This means that waiting on a future inside the callback will deadlock the
	// service.
	ErrorCallback func(error)

	// The OfflineCallback is called when the service is disconnected.
	//
	// Note: Execution of the service is resumed after the callback returns.
	// This means that waiting on a future inside the callback will deadlock the
	// service.
	OfflineCallback func()

	// The logger that is used to log write low level information like packets
	// that have been successfully sent and received, details about the automatic
	// keep alive handler, reconnection and occurring errors.
	Logger func(msg string)

	// The minimum delay between reconnects.
	//
	// Note: The value must be changed before calling Start.
	MinReconnectDelay time.Duration

	// The maximum delay between reconnects.
	//
	// Note: The value must be changed before calling Start.
	MaxReconnectDelay time.Duration

	// The allowed timeout until a connection attempt is canceled.
	ConnectTimeout time.Duration

	// The allowed timeout until a connection is forcefully closed.
	DisconnectTimeout time.Duration

	// The allowed timeout until a subscribe action is forcefully closed during
	// reconnect.
	ResubscribeTimeout time.Duration

	// Whether to resubscribe all subscriptions after reconnecting. Can be
	// disabled if the broker supports persistent sessions and the client is
	// configured to request one.
	ResubscribeAllSubscriptions bool

	// The time after which the queueing of a command is aborted.
	QueueTimeout time.Duration
	// contains filtered or unexported fields
}

Service is an abstraction for Client that provides a stable interface to the application, while it automatically connects and reconnects clients in the background. Errors are not returned but emitted using the ErrorCallback. All methods return Futures that get completed once the acknowledgements are received. Once the service is stopped all waiting futures get canceled.

Note: If clean session is false and there are packets in the store, messages might get completed after starting without triggering any futures to complete.

Example
wait := make(chan struct{})
done := make(chan struct{})

config := NewConfigWithClientID("mqtt://0.0.0.0", "gomqtt/service")
config.CleanSession = false

s := NewService()

s.OnlineCallback = func(resumed bool) {
	fmt.Println("online!")
	fmt.Printf("resumed: %v\n", resumed)
}

s.OfflineCallback = func() {
	fmt.Println("offline!")
	close(done)
}

s.MessageCallback = func(msg *packet.Message) error {
	fmt.Printf("message: %s - %s\n", msg.Topic, msg.Payload)
	close(wait)
	return nil
}

err := ClearSession(config, 1*time.Second)
if err != nil {
	panic(err)
}

s.Start(config)

err = s.Subscribe("test", 0).Wait(10 * time.Second)
if err != nil {
	panic(err)
}

err = s.Publish("test", []byte("test"), 0, false).Wait(10 * time.Second)
if err != nil {
	panic(err)
}

<-wait

s.Stop(true)

<-done
Output:

online!
resumed: false
message: test - test
offline!

func NewService

func NewService(queueSize ...int) *Service

NewService allocates and returns a new service. The optional parameter queueSize specifies how many Subscribe, Unsubscribe and Publish commands can be queued up before actually sending them on the wire. The default queueSize is 100.

func (*Service) Publish

func (s *Service) Publish(topic string, payload []byte, qos packet.QOS, retain bool) GenericFuture

Publish will send a Publish packet containing the passed parameters. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Service) PublishMessage

func (s *Service) PublishMessage(msg *packet.Message) GenericFuture

PublishMessage will send a Publish packet containing the passed message. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Service) Start

func (s *Service) Start(config *Config) bool

Start will start the service with the specified configuration. From now on the service will automatically reconnect on any error until Stop is called. It returns false if the service was already started.

func (*Service) Stop

func (s *Service) Stop(clearFutures bool) bool

Stop will disconnect the client if online and cancel all futures if requested. After the service is stopped in can be started again. It returns false if the service was not running.

Note: You should clear the futures on the last stop before exiting to ensure that all goroutines return that wait on futures.

func (*Service) Subscribe

func (s *Service) Subscribe(topic string, qos packet.QOS) SubscribeFuture

Subscribe will send a Subscribe packet containing one topic to subscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

func (*Service) SubscribeMultiple

func (s *Service) SubscribeMultiple(subscriptions []packet.Subscription) SubscribeFuture

SubscribeMultiple will send a Subscribe packet containing multiple topics to subscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

func (*Service) Unsubscribe

func (s *Service) Unsubscribe(topic string) GenericFuture

Unsubscribe will send a Unsubscribe packet containing one topic to unsubscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

func (*Service) UnsubscribeMultiple

func (s *Service) UnsubscribeMultiple(topics []string) GenericFuture

UnsubscribeMultiple will send a Unsubscribe packet containing multiple topics to unsubscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

type Session

type Session interface {
	// NextID will return the next id for outgoing packets.
	NextID() packet.ID

	// SavePacket will store a packet in the session. An eventual existing
	// packet with the same id gets quietly overwritten.
	SavePacket(session.Direction, packet.Generic) error

	// LookupPacket will retrieve a packet from the session using a packet id.
	LookupPacket(session.Direction, packet.ID) (packet.Generic, error)

	// DeletePacket will remove a packet from the session. The method must not
	// return an error if no packet with the specified id does exist.
	DeletePacket(session.Direction, packet.ID) error

	// AllPackets will return all packets currently saved in the session.
	AllPackets(session.Direction) ([]packet.Generic, error)

	// Reset will completely reset the session.
	Reset() error
}

A Session is used to persist incoming and outgoing packets.

type SubscribeFuture

type SubscribeFuture interface {
	GenericFuture

	// ReturnCodes will return the suback codes returned by the broker.
	ReturnCodes() []packet.QOS
}

A SubscribeFuture is returned by the subscribe methods.

type Tracker added in v0.8.2

type Tracker struct {
	// contains filtered or unexported fields
}

A Tracker keeps track of keep alive intervals.

func NewTracker added in v0.8.2

func NewTracker(timeout time.Duration) *Tracker

NewTracker returns a new tracker.

func (*Tracker) Pending added in v0.8.2

func (t *Tracker) Pending() bool

Pending returns if pings are pending.

func (*Tracker) Ping added in v0.8.2

func (t *Tracker) Ping()

Ping marks a ping.

func (*Tracker) Pong added in v0.8.2

func (t *Tracker) Pong()

Pong marks a pong.

func (*Tracker) Reset added in v0.8.2

func (t *Tracker) Reset()

Reset will reset the tracker.

func (*Tracker) Window added in v0.8.2

func (t *Tracker) Window() time.Duration

Window returns the time until a new ping should be sent.

Directories

Path Synopsis
Package future implements a generic future handling system.
Package future implements a generic future handling system.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL