mqttadapter

package
v0.0.0-...-aac6d55 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package mqtt provides options for configuring MQTT client. It includes various options for setting up the client, such as enabling debug mode, setting username and password, configuring TLS, setting will messages, and more.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ClientOptions

type ClientOptions struct {
	*mqtt.ClientOptions
	// contains filtered or unexported fields
}

ClientOptions represents the options for configuring the MQTT client.

type MQTTClientAdapter

type MQTTClientAdapter interface {
	// GetMqttClient returns the underlying MQTT client.
	GetMqttClient() mqtt.Client

	GetClientOptions() *mqtt.ClientOptions

	// OnConnectOnce sets a callback function to be called once when the client is connected.
	OnConnectOnce(cb OnConnectCallback)

	// OnConnectLostOnce sets a callback function to be called once when the client connection is lost.
	OnConnectLostOnce(cb OnConnectLostCallback)

	// OnConnect sets a callback function to be called when the client is connected.
	// It returns an index that can be used to remove the callback using OffConnect.
	OnConnect(cb OnConnectCallback) int

	// OffConnect removes the callback function associated with the given index.
	OffConnect(idx int)

	// OnConnectLost sets a callback function to be called when the client connection is lost.
	// It returns an index that can be used to remove the callback using OffConnectLost.
	OnConnectLost(cb OnConnectLostCallback) int

	// OffConnectLost removes the callback function associated with the given index.
	OffConnectLost(idx int)

	// Connect establishes a connection to the MQTT broker.
	// It takes a context.Context as a parameter and returns an error if the connection fails.
	Connect(ctx context.Context) error

	// EnsureConnected ensures that the client is connected to the MQTT broker.
	EnsureConnected()

	// ConnectAndWaitForSuccess establishes a connection to the MQTT broker and waits for a successful connection.
	ConnectAndWaitForSuccess()

	// Disconnect disconnects the client from the MQTT broker.
	Disconnect()

	// IsConnected returns true if the client is currently connected to the MQTT broker, false otherwise.
	IsConnected() bool

	// Subscribe subscribes to a topic with the specified quality of service (QoS) level and message callback function.
	Subscribe(ctx context.Context, topic string, qos byte, onMsg MessageCallback)

	// SubscribeWait subscribes to a topic with the specified quality of service (QoS) level and message callback function,
	// and waits for the subscription to be successful.
	SubscribeWait(ctx context.Context, topic string, qos byte, onMsg MessageCallback) error

	// SubscribeMultiple subscribes to multiple topics with the specified quality of service (QoS) levels and message callback function.
	SubscribeMultiple(ctx context.Context, filters map[string]byte, onMsg MessageCallback)

	// SubscribeMultipleWait subscribes to multiple topics with the specified quality of service (QoS) levels and message callback function,
	// and waits for the subscriptions to be successful.
	SubscribeMultipleWait(ctx context.Context, filters map[string]byte, onMsg MessageCallback) error

	// Unsubscribe unsubscribes from a topic.
	Unsubscribe(ctx context.Context, topic string)

	// UnsubscribeWait unsubscribes from a topic and waits for the unsubscribe to be successful.
	UnsubscribeWait(ctx context.Context, topic string) error

	// UnsubscribeAll unsubscribes from all topics.
	UnsubscribeAll(ctx context.Context)

	// UnsubscribeAllWait unsubscribes from all topics and waits for the unsubscribes to be successful.
	UnsubscribeAllWait(ctx context.Context) error

	// PublishBytes publishes a byte array as a message to the specified topic with the specified quality of service (QoS) level and retained flag.
	PublishBytes(ctx context.Context, topic string, qos byte, retained bool, data []byte)

	// PublishBytesWait publishes a byte array as a message to the specified topic with the specified quality of service (QoS) level and retained flag,
	// and waits for the publish to be successful.
	PublishBytesWait(ctx context.Context, topic string, qos byte, retained bool, data []byte) error

	// PublishObject publishes an object as a message to the specified topic with the specified quality of service (QoS) level and retained flag.
	PublishObject(ctx context.Context, topic string, qos byte, retained bool, payload any) error

	// PublishObjectWait publishes an object as a message to the specified topic with the specified quality of service (QoS) level and retained flag,
	// and waits for the publish to be successful.
	PublishObjectWait(ctx context.Context, topic string, qos byte, retained bool, payload any) error
}

MQTTClientAdapter is an interface that defines the methods for interacting with an MQTT client.

func New

func New(uri, clientID string, options ...Option) (MQTTClientAdapter, error)

New creates a new MQTT client with the specified URI, client ID, and options. The URI should be in the format "scheme://host:port", where scheme can be "tcp" or "ssl". The client ID is a unique identifier for the client. The options parameter allows for additional configuration of the client. Returns a pointer to the created Client and an error if any.

type MQTTClientAdapterImpl

type MQTTClientAdapterImpl struct {
	// contains filtered or unexported fields
}

MQTTClientAdapterImpl represents an MQTT client.

func (*MQTTClientAdapterImpl) Connect

func (s *MQTTClientAdapterImpl) Connect(ctx context.Context) error

func (*MQTTClientAdapterImpl) ConnectAndWaitForSuccess

func (s *MQTTClientAdapterImpl) ConnectAndWaitForSuccess()

ConnectAndWaitForSuccess connects to the MQTT broker and waits for a successful connection. If the client is already connected, it returns immediately. If the connection fails, it retries every 10 seconds until a successful connection is established. The function stops retrying if the stopRetryConnect flag is set to true.

func (*MQTTClientAdapterImpl) Disconnect

func (s *MQTTClientAdapterImpl) Disconnect()

Disconnect disconnects the MQTT client from the broker. It stops the retry connection mechanism and calls the Disconnect method of the underlying MQTT client. The timeout parameter specifies the maximum time to wait for the disconnection to complete, in milliseconds.

func (*MQTTClientAdapterImpl) EnsureConnected

func (s *MQTTClientAdapterImpl) EnsureConnected()

EnsureConnected ensures that the MQTT client is connected. It starts a goroutine to connect to the MQTT broker and waits for the connection to be successful.

func (*MQTTClientAdapterImpl) GetClientOptions

func (s *MQTTClientAdapterImpl) GetClientOptions() *mqtt.ClientOptions

func (*MQTTClientAdapterImpl) GetMqttClient

func (s *MQTTClientAdapterImpl) GetMqttClient() mqtt.Client

GetMqttClient returns the MQTT client associated with the Client instance.

func (*MQTTClientAdapterImpl) IsConnected

func (s *MQTTClientAdapterImpl) IsConnected() bool

IsConnected returns a boolean value indicating whether the client is currently connected to the MQTT broker. It checks if the connection to the broker is open.

func (*MQTTClientAdapterImpl) OffConnect

func (s *MQTTClientAdapterImpl) OffConnect(idx int)

OffConnect removes the onConnect callback function associated with the given index.

func (*MQTTClientAdapterImpl) OffConnectLost

func (s *MQTTClientAdapterImpl) OffConnectLost(idx int)

OffConnectLost removes the callback function associated with the given index from the onConnectLostCallbaks map. It locks the onConnectLostCallbakMutex to ensure thread safety and then deletes the callback function from the map.

func (*MQTTClientAdapterImpl) OnConnect

func (s *MQTTClientAdapterImpl) OnConnect(cb OnConnectCallback) int

OnConnect registers a callback function to be called when the MQTT client is connected. The callback function will be invoked immediately if the client is already connected. The function returns an index that can be used to unregister the callback using the UnregisterOnConnect method.

func (*MQTTClientAdapterImpl) OnConnectLost

func (s *MQTTClientAdapterImpl) OnConnectLost(cb OnConnectLostCallback) int

OnConnectLost registers a callback function to be called when the MQTT client loses connection. The callback function will be invoked with an integer parameter representing the index of the callback. Returns the index of the registered callback.

func (*MQTTClientAdapterImpl) OnConnectLostOnce

func (s *MQTTClientAdapterImpl) OnConnectLostOnce(cb OnConnectLostCallback)

OnConnectLostOnce registers a callback function to be called when the MQTT client loses connection to the broker. The callback function will be called only once and then automatically unregistered. The provided callback function should accept an error parameter, which represents the reason for the connection loss.

func (*MQTTClientAdapterImpl) OnConnectOnce

func (s *MQTTClientAdapterImpl) OnConnectOnce(cb OnConnectCallback)

OnConnectOnce registers a callback function to be executed once the MQTT client is connected. If the client is already connected, the callback function is executed immediately. Otherwise, the callback function is executed when the client successfully connects. The callback function is unregistered after it is executed.

func (*MQTTClientAdapterImpl) PublishBytes

func (s *MQTTClientAdapterImpl) PublishBytes(ctx context.Context, topic string, qos byte, retained bool, data []byte)

PublishBytes publishes the given data as a byte array to the specified MQTT topic. It takes the context, topic, quality of service (QoS), retained flag, and data as parameters. The QoS determines the level of assurance for message delivery. The retained flag indicates whether the message should be retained by the broker. This function is used to publish data using the MQTT client.

func (*MQTTClientAdapterImpl) PublishBytesWait

func (s *MQTTClientAdapterImpl) PublishBytesWait(ctx context.Context, topic string, qos byte, retained bool, data []byte) error

PublishBytesWait publishes the given data as bytes to the specified topic with the specified quality of service (QoS), and waits for the operation to complete or the context to be canceled. It returns an error if the operation fails or if the context is canceled.

func (*MQTTClientAdapterImpl) PublishObject

func (s *MQTTClientAdapterImpl) PublishObject(ctx context.Context, topic string, qos byte, retained bool, payload any) error

PublishObject publishes an object to the specified MQTT topic with the given quality of service (QoS), retained flag, and payload. It returns an error if the publishing operation fails.

func (*MQTTClientAdapterImpl) PublishObjectWait

func (s *MQTTClientAdapterImpl) PublishObjectWait(ctx context.Context, topic string, qos byte, retained bool, payload any) error

PublishObjectWait publishes an object to the specified MQTT topic with the given quality of service (QoS), retention flag, and payload. It waits for the operation to complete or for the context to be canceled. If the context is canceled before the operation completes, it returns the context error. If the operation completes successfully, it returns nil. If there is an error during the operation, it returns the error from the MQTT token.

func (*MQTTClientAdapterImpl) Subscribe

func (s *MQTTClientAdapterImpl) Subscribe(ctx context.Context, topic string, qos byte, onMsg MessageCallback)

Subscribe subscribes to a topic with the specified quality of service (QoS) level and registers a callback function to handle incoming messages. The topic parameter specifies the topic to subscribe to. The qos parameter specifies the desired QoS level for the subscription. The onMsg parameter is a callback function that will be called when a message is received. The callback function receives the MQTT client adapter instance and the received message as parameters. The subscription is stored in the subscribeMap for later reference.

func (*MQTTClientAdapterImpl) SubscribeMultiple

func (s *MQTTClientAdapterImpl) SubscribeMultiple(ctx context.Context, filters map[string]byte, onMsg MessageCallback)

SubscribeMultiple subscribes to multiple MQTT topics with their respective QoS levels. It takes a map of topic filters and their corresponding QoS levels, and a callback function to handle incoming messages. The callback function is invoked for each message received on any of the subscribed topics.

The function returns an mqtt.Token that can be used to track the status of the subscription. If there is an error during the subscription process, the error can be obtained from the token.

The topics and their corresponding QoS levels are stored in the filters map. The onMsg callback function is invoked with the MQTT client and the received message for each message received on any of the subscribed topics.

Example usage:

filters := map[string]byte{
  "topic1": 0,
  "topic2": 1,
  "topic3": 2,
}

onMsg := func(client *Client, message mqtt.Message) {
  // Handle incoming message
}

func (*MQTTClientAdapterImpl) SubscribeMultipleWait

func (s *MQTTClientAdapterImpl) SubscribeMultipleWait(ctx context.Context, filters map[string]byte, onMsg MessageCallback) error

SubscribeMultipleWait subscribes to multiple MQTT topics and waits for incoming messages. It takes a context.Context object for cancellation, a map of topic filters and their QoS levels, and a MessageCallback function to handle incoming messages. The MessageCallback function is called with the MQTT client adapter and the received message as parameters. This function returns an error if the subscription or message handling encounters an error, or if the context is canceled before the subscription is completed.

func (*MQTTClientAdapterImpl) SubscribeWait

func (s *MQTTClientAdapterImpl) SubscribeWait(ctx context.Context, topic string, qos byte, onMsg MessageCallback) error

SubscribeWait subscribes to a topic with the specified quality of service (QoS) level and waits for incoming messages. It registers a callback function to handle each received message. The function returns an error if the context is canceled or if there is an error while subscribing.

Parameters: - ctx: The context.Context object for cancellation. - topic: The topic to subscribe to. - qos: The quality of service level for the subscription. - onMsg: The callback function to handle incoming messages.

Returns: - error: An error if the context is canceled or if there is an error while subscribing.

func (*MQTTClientAdapterImpl) Unsubscribe

func (s *MQTTClientAdapterImpl) Unsubscribe(ctx context.Context, topic string)

Unsubscribe unsubscribes from the specified MQTT topic. It takes a context.Context and the topic string as parameters. This method logs the topic being unsubscribed and removes it from the subscribeMap. Finally, it calls the Unsubscribe method of the MQTT client to unsubscribe from the topic.

func (*MQTTClientAdapterImpl) UnsubscribeAll

func (s *MQTTClientAdapterImpl) UnsubscribeAll(ctx context.Context)

UnsubscribeAll unsubscribes from all topics that the client is currently subscribed to. It retrieves the list of topics from the subscribeMap and unsubscribes from each topic. After unsubscribing, it removes the topics from the subscribeMap.

func (*MQTTClientAdapterImpl) UnsubscribeAllWait

func (s *MQTTClientAdapterImpl) UnsubscribeAllWait(ctx context.Context) error

UnsubscribeAllWait unsubscribes from all topics that have been previously subscribed to. It waits for the operation to complete or for the context to be canceled. If the context is canceled before the operation completes, it returns the context error. If the operation completes with an error, it returns the error.

func (*MQTTClientAdapterImpl) UnsubscribeWait

func (s *MQTTClientAdapterImpl) UnsubscribeWait(ctx context.Context, topic string) error

UnsubscribeWait unsubscribes from a topic and waits for the operation to complete or the context to be canceled. It returns an error if the operation fails or the context is canceled.

type Message

type Message = mqtt.Message

Message represents a message in the MQTT protocol.

type MessageCallback

type MessageCallback func(MQTTClientAdapter, Message)

MessageCallback is a function type that represents a callback for handling MQTT messages. It takes a pointer to a Client and a Message as parameters.

type OnConnectCallback

type OnConnectCallback func()

OnConnectCallback represents a callback function that is called when a connection is established.

type OnConnectLostCallback

type OnConnectLostCallback func(err error)

OnConnectLostCallback is a function type that represents a callback function to be called when the connection to the MQTT broker is lost. The callback function takes an error parameter that indicates the reason for the connection loss.

type Option

type Option func(o *ClientOptions)

Option is a function that modifies the ClientOptions.

func WithClientID

func WithClientID(clientID string) Option

WithClientID sets the client ID for the MQTT client.

func WithConnectRetryInterval

func WithConnectRetryInterval(duration time.Duration) Option

WithConnectRetryInterval sets the interval for reconnecting to the MQTT broker.

func WithDebug

func WithDebug(debug bool) Option

WithDebug sets the debug mode for the MQTT client.

func WithFileStore

func WithFileStore(tempDir string) Option

WithFileStore sets the file store for the MQTT client.

func WithKeepAlive

func WithKeepAlive(keepalive time.Duration) Option

WithKeepAlive sets the keep alive interval for the MQTT client.

func WithMaxReconnectInterval

func WithMaxReconnectInterval(interval time.Duration) Option

WithMaxReconnectInterval sets the maximum interval for reconnecting to the MQTT broker.

func WithOfflineWill

func WithOfflineWill(topic string, payload string) Option

WithOfflineWill sets the offline will message for the MQTT client.

func WithOnlineStatus

func WithOnlineStatus(topic string, payload []byte) Option

WithOnlineStatus sets the online status topic and payload for the MQTT client.

func WithProtocolVersion

func WithProtocolVersion(pv uint) Option

WithProtocolVersion sets the MQTT protocol version for the client.

func WithQos

func WithQos(qos byte) Option

WithQos sets the QoS level for the MQTT client.

func WithStatus

func WithStatus(
	onlineTopic string, onlinePayload []byte,
	offlineTopic string, offlinePayload []byte,
) Option

WithStatus sets the online and offline status topics and payloads for the MQTT client.

func WithStore

func WithStore(store mqtt.Store) Option

WithStore sets the store for the MQTT client.

func WithTlsConfig

func WithTlsConfig(cfg *tls.Config) Option

WithTlsConfig sets the TLS configuration for the MQTT client.

func WithUserPass

func WithUserPass(user, pass string) Option

WithUserPass sets the username and password for the MQTT client.

func WithWill

func WithWill(topic string, payload string, qos byte, retained bool) Option

WithWill sets the will message for the MQTT client.

Directories

Path Synopsis
Package mock_mqttadapter is a generated GoMock package.
Package mock_mqttadapter is a generated GoMock package.
mqtt
Package mock_mqtt is a generated GoMock package.
Package mock_mqtt is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL