client

package
v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2018 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package client implements a MQTT client and service for interacting with brokers.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClientAlreadyConnecting = errors.New("client already connecting")

ErrClientAlreadyConnecting is returned by Connect if there has been already a connection attempt.

View Source
var ErrClientConnectionDenied = errors.New("client connection denied")

ErrClientConnectionDenied is returned in the Callback if the connection has been reject by the broker.

View Source
var ErrClientExpectedConnack = errors.New("client expected connack")

ErrClientExpectedConnack is returned when the first received packet is not a Connack.

View Source
var ErrClientMissingID = errors.New("client missing id")

ErrClientMissingID is returned by Connect if no ClientID has been provided in the config while requesting to resume a session.

View Source
var ErrClientMissingPong = errors.New("client missing pong")

ErrClientMissingPong is returned in the Callback if the broker did not respond in time to a Pingreq.

View Source
var ErrClientNotConnected = errors.New("client not connected")

ErrClientNotConnected is returned by Publish, Subscribe and Unsubscribe if the client is not currently connected.

View Source
var ErrFailedSubscription = errors.New("failed subscription")

ErrFailedSubscription is returned when a submitted subscription is marked as failed when Config.ValidateSubs must be set to true.

Functions

func ClearRetainedMessage

func ClearRetainedMessage(config *Config, topic string, timeout time.Duration) error

ClearRetainedMessage will connect to the specified broker and send an empty retained message to force any already retained message to be cleared.

func ClearSession

func ClearSession(config *Config, timeout time.Duration) error

ClearSession will connect to the specified broker and request a clean session.

func PublishMessage

func PublishMessage(config *Config, msg *packet.Message, timeout time.Duration) error

PublishMessage will connect to the specified broker to publish the passed message.

func ReceiveMessage

func ReceiveMessage(config *Config, topic string, qos byte, timeout time.Duration) (*packet.Message, error)

ReceiveMessage will connect to the specified broker and issue a subscription for the specified topic and return the first message received.

Types

type Callback

type Callback func(msg *packet.Message, err error) error

A Callback is a function called by the client upon received messages or internal errors. An error can be returned if the callback is not already called with an error to instantly close the client and prevent it from sending any acknowledgments for the specified message.

Note: Execution of the client is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the client.

type Client

type Client struct {

	// The session used by the client to store unacknowledged packets.
	Session Session

	// The callback to be called by the client upon receiving a message or
	// encountering an error while processing incoming packets.
	Callback Callback

	// The logger that is used to log low level information about packets
	// that have been successfully sent and received and details about the
	// automatic keep alive handler.
	Logger Logger
	// contains filtered or unexported fields
}

A Client connects to a broker and handles the transmission of packets. It will automatically send PingreqPackets to keep the connection alive. Outgoing publish related packets will be stored in session and resent when the connection gets closed abruptly. All methods return Futures that get completed when the packets get acknowledged by the broker. Once the connection is closed all waiting futures get canceled.

Note: If clean session is set to false and there are packets in the session, messages might get completed after connecting without triggering any futures to complete.

Example
done := make(chan struct{})

c := New()

c.Callback = func(msg *packet.Message, err error) error {
	if err != nil {
		panic(err)
	}

	fmt.Printf("%s: %s\n", msg.Topic, msg.Payload)
	close(done)

	return nil
}

config := NewConfigWithClientID("mqtt://0.0.0.0", "gomqtt/client")

connectFuture, err := c.Connect(config)
if err != nil {
	panic(err)
}

err = connectFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

subscribeFuture, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}

err = subscribeFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

publishFuture, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}

err = publishFuture.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

<-done

err = c.Disconnect()
if err != nil {
	panic(err)
}
Output:

test: test

func New

func New() *Client

New returns a new client that by default uses a fresh MemorySession.

func (*Client) Close

func (c *Client) Close() error

Close closes the client immediately without sending a Disconnect packet and waiting for outgoing transmissions to finish.

func (*Client) Connect

func (c *Client) Connect(config *Config) (ConnectFuture, error)

Connect opens the connection to the broker and sends a Connect packet. It will return a ConnectFuture that gets completed once a Connack has been received. If the Connect packet couldn't be transmitted it will return an error.

func (*Client) Disconnect

func (c *Client) Disconnect(timeout ...time.Duration) error

Disconnect will send a Disconnect packet and close the connection.

If a timeout is specified, the client will wait the specified amount of time for all queued futures to complete or cancel. If no timeout is specified it will not wait at all.

func (*Client) Publish

func (c *Client) Publish(topic string, payload []byte, qos uint8, retain bool) (GenericFuture, error)

Publish will send a Publish packet containing the passed parameters. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Client) PublishMessage

func (c *Client) PublishMessage(msg *packet.Message) (GenericFuture, error)

PublishMessage will send a Publish containing the passed message. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Client) Subscribe

func (c *Client) Subscribe(topic string, qos uint8) (SubscribeFuture, error)

Subscribe will send a Subscribe packet containing one topic to subscribe. It will return a SubscribeFuture that gets completed once a Suback packet has been received.

func (*Client) SubscribeMultiple

func (c *Client) SubscribeMultiple(subscriptions []packet.Subscription) (SubscribeFuture, error)

SubscribeMultiple will send a Subscribe packet containing multiple topics to subscribe. It will return a SubscribeFuture that gets completed once a Suback packet has been received.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic string) (GenericFuture, error)

Unsubscribe will send a Unsubscribe packet containing one topic to unsubscribe. It will return a UnsubscribeFuture that gets completed once an Unsuback packet has been received.

func (*Client) UnsubscribeMultiple

func (c *Client) UnsubscribeMultiple(topics []string) (GenericFuture, error)

UnsubscribeMultiple will send a Unsubscribe packet containing multiple topics to unsubscribe. It will return a UnsubscribeFuture that gets completed once an Unsuback packet has been received.

type Config

type Config struct {
	Dialer       *transport.Dialer
	BrokerURL    string
	ClientID     string
	CleanSession bool
	KeepAlive    string
	WillMessage  *packet.Message
	ValidateSubs bool
}

A Config holds information about establishing a connection to a broker.

func NewConfig

func NewConfig(url string) *Config

NewConfig creates a new Config using the specified URL.

func NewConfigWithClientID

func NewConfigWithClientID(url, id string) *Config

NewConfigWithClientID creates a new Config using the specified URL and client ID.

type ConnectFuture

type ConnectFuture interface {
	GenericFuture

	// SessionPresent will return whether a session was present.
	SessionPresent() bool

	// ReturnCode will return the connack code returned by the broker.
	ReturnCode() packet.ConnackCode
}

A ConnectFuture is returned by the connect method.

type ErrorCallback

type ErrorCallback func(error)

An ErrorCallback is a function that is called when an error occurred.

Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.

type GenericFuture added in v0.2.0

type GenericFuture interface {
	// Wait will block until the future is completed or canceled. It will return
	// future.ErrCanceled if the future gets canceled. If the timeout is reached,
	// future.ErrTimeoutExceeded is returned.
	//
	// Note: Wait will not return any Client related errors.
	Wait(timeout time.Duration) error
}

A GenericFuture is returned by publish and unsubscribe methods.

type Logger

type Logger func(msg string)

A Logger is a function called by the client to log activity.

type MessageCallback

type MessageCallback func(*packet.Message) error

A MessageCallback is a function that is called when a message is received. If an error is returned the underlying client will be prevented from acknowledging the specified message and closes immediately.

Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.

type OfflineCallback

type OfflineCallback func()

An OfflineCallback is a function that is called when the service is disconnected.

Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.

type OnlineCallback

type OnlineCallback func(resumed bool)

An OnlineCallback is a function that is called when the service is connected.

Note: Execution of the service is resumed after the callback returns. This means that waiting on a future inside the callback will deadlock the service.

type Service

type Service struct {

	// The session used by the client to store unacknowledged packets.
	Session Session

	// The callback that is used to notify that the service is online.
	OnlineCallback OnlineCallback

	// The callback to be called by the service upon receiving a message.
	MessageCallback MessageCallback

	// The callback to be called by the service upon encountering an error.
	ErrorCallback ErrorCallback

	// The callback that is used to notify that the service is offline.
	OfflineCallback OfflineCallback

	// The logger that is used to log write low level information like packets
	// that have ben successfully sent and received, details about the
	// automatic keep alive handler, reconnection and occurring errors.
	Logger Logger

	// The minimum delay between reconnects.
	//
	// Note: The value must be changed before calling Start.
	MinReconnectDelay time.Duration

	// The maximum delay between reconnects.
	//
	// Note: The value must be changed before calling Start.
	MaxReconnectDelay time.Duration

	// The allowed timeout until a connection attempt is canceled.
	ConnectTimeout time.Duration

	// The allowed timeout until a connection is forcefully closed.
	DisconnectTimeout time.Duration
	// contains filtered or unexported fields
}

Service is an abstraction for Client that provides a stable interface to the application, while it automatically connects and reconnects clients in the background. Errors are not returned but emitted using the ErrorCallback. All methods return Futures that get completed once the acknowledgements are received. Once the services is stopped all waiting futures get canceled.

Note: If clean session is false and there are packets in the store, messages might get completed after starting without triggering any futures to complete.

Example
wait := make(chan struct{})
done := make(chan struct{})

config := NewConfigWithClientID("mqtt://0.0.0.0", "gomqtt/service")
config.CleanSession = false

s := NewService()

s.OnlineCallback = func(resumed bool) {
	fmt.Println("online!")
	fmt.Printf("resumed: %v\n", resumed)
}

s.OfflineCallback = func() {
	fmt.Println("offline!")
	close(done)
}

s.MessageCallback = func(msg *packet.Message) error {
	fmt.Printf("message: %s - %s\n", msg.Topic, msg.Payload)
	close(wait)
	return nil
}

err := ClearSession(config, 1*time.Second)
if err != nil {
	panic(err)
}

s.Start(config)

s.Subscribe("test", 0).Wait(10 * time.Second)

s.Publish("test", []byte("test"), 0, false)

<-wait

s.Stop(true)

<-done
Output:

online!
resumed: false
message: test - test
offline!

func NewService

func NewService(queueSize ...int) *Service

NewService allocates and returns a new service. The optional parameter queueSize specifies how many Subscribe, Unsubscribe and Publish commands can be queued up before actually sending them on the wire. The default queueSize is 100.

func (*Service) Publish

func (s *Service) Publish(topic string, payload []byte, qos uint8, retain bool) GenericFuture

Publish will send a Publish packet containing the passed parameters. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Service) PublishMessage

func (s *Service) PublishMessage(msg *packet.Message) GenericFuture

PublishMessage will send a Publish packet containing the passed message. It will return a PublishFuture that gets completed once the quality of service flow has been completed.

func (*Service) Start

func (s *Service) Start(config *Config)

Start will start the service with the specified configuration. From now on the service will automatically reconnect on any error until Stop is called.

func (*Service) Stop

func (s *Service) Stop(clearFutures bool)

Stop will disconnect the client if online and cancel all futures if requested. After the service is stopped in can be started again.

Note: You should clear the futures on the last stop before exiting to ensure that all goroutines return that wait on futures.

func (*Service) Subscribe

func (s *Service) Subscribe(topic string, qos uint8) SubscribeFuture

Subscribe will send a Subscribe packet containing one topic to subscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

func (*Service) SubscribeMultiple

func (s *Service) SubscribeMultiple(subscriptions []packet.Subscription) SubscribeFuture

SubscribeMultiple will send a Subscribe packet containing multiple topics to subscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

func (*Service) Unsubscribe

func (s *Service) Unsubscribe(topic string) GenericFuture

Unsubscribe will send a Unsubscribe packet containing one topic to unsubscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

func (*Service) UnsubscribeMultiple

func (s *Service) UnsubscribeMultiple(topics []string) GenericFuture

UnsubscribeMultiple will send a Unsubscribe packet containing multiple topics to unsubscribe. It will return a SubscribeFuture that gets completed once the acknowledgements have been received.

type Session

type Session interface {
	// NextID will return the next id for outgoing packets.
	NextID() packet.ID

	// SavePacket will store a packet in the session. An eventual existing
	// packet with the same id gets quietly overwritten.
	SavePacket(session.Direction, packet.Generic) error

	// LookupPacket will retrieve a packet from the session using a packet id.
	LookupPacket(session.Direction, packet.ID) (packet.Generic, error)

	// DeletePacket will remove a packet from the session. The method must not
	// return an error if no packet with the specified id does exists.
	DeletePacket(session.Direction, packet.ID) error

	// AllPackets will return all packets currently saved in the session.
	AllPackets(session.Direction) ([]packet.Generic, error)

	// Reset will completely reset the session.
	Reset() error
}

A Session is used to persist incoming and outgoing packets.

type SubscribeFuture

type SubscribeFuture interface {
	GenericFuture

	// ReturnCodes will return the suback codes returned by the broker.
	ReturnCodes() []uint8
}

A SubscribeFuture is returned by the subscribe methods.

Directories

Path Synopsis
Package future implements a generic future handling system.
Package future implements a generic future handling system.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL