mqtt

package
v1.0.2
Published: Sep 23, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

README

MQTT adapter

The MQTT adapter provides an MQTT API for sending messages through the platform. It uses mProxy to proxy traffic between clients and the MQTT broker.

Configuration

The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values.

| Variable | Description | Default |
| --- | --- | --- |
| MG_MQTT_ADAPTER_LOG_LEVEL | Log level for the MQTT Adapter (debug, info, warn, error) | info |
| MG_MQTT_ADAPTER_MQTT_PORT | mProxy port | 1883 |
| MG_MQTT_ADAPTER_MQTT_TARGET_HOST | MQTT broker host | localhost |
| MG_MQTT_ADAPTER_MQTT_TARGET_PORT | MQTT broker port | 1883 |
| MG_MQTT_ADAPTER_MQTT_QOS | MQTT broker QoS | 1 |
| MG_MQTT_ADAPTER_FORWARDER_TIMEOUT | MQTT forwarder for multiprotocol communication timeout | 30s |
| MG_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK | URL of broker health check | "" |
| MG_MQTT_ADAPTER_WS_PORT | mProxy MQTT over WS port | 8080 |
| MG_MQTT_ADAPTER_WS_TARGET_HOST | MQTT broker host for MQTT over WS | localhost |
| MG_MQTT_ADAPTER_WS_TARGET_PORT | MQTT broker port for MQTT over WS | 8080 |
| MG_MQTT_ADAPTER_WS_TARGET_PATH | MQTT broker MQTT over WS path | /mqtt |
| MG_MQTT_ADAPTER_INSTANCE | Instance name for MQTT adapter | "" |
| MG_THINGS_AUTH_GRPC_URL | Things service Auth gRPC URL | localhost:7016 |
| MG_THINGS_AUTH_GRPC_TIMEOUT | Things service Auth gRPC request timeout in seconds | 1s |
| MG_THINGS_AUTH_GRPC_CLIENT_CERT | Path to the PEM encoded things service Auth gRPC client certificate file | "" |
| MG_THINGS_AUTH_GRPC_CLIENT_KEY | Path to the PEM encoded things service Auth gRPC client key file | "" |
| MG_THINGS_AUTH_GRPC_SERVER_CERTS | Path to the PEM encoded things server Auth gRPC server trusted CA certificate file | "" |
| MG_ES_URL | Event sourcing URL | nats://localhost:4222 |
| MG_MESSAGE_BROKER_URL | Message broker instance URL | nats://localhost:4222 |
| MG_JAEGER_URL | Jaeger server URL | http://localhost:14268/api/traces |
| MG_JAEGER_TRACE_RATIO | Jaeger sampling ratio | 1.0 |
| MG_SEND_TELEMETRY | Send telemetry to magistrala call home server | true |
| MG_MQTT_ADAPTER_INSTANCE_ID | Service instance ID | "" |
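
The "unset variables fall back to defaults" behaviour described above can be illustrated with a small Go sketch. This is not the adapter's actual configuration loader: the `config` struct, `envOr`, and `loadConfig` are hypothetical names introduced for illustration, and only a few of the variables from the table are covered.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// envOr returns the value of the environment variable key, or def when the
// variable is not set at all. (Hypothetical helper, not part of the package.)
func envOr(key, def string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return def
}

// config is a hypothetical subset of the adapter settings listed in the table.
type config struct {
	LogLevel         string
	MQTTPort         string
	TargetHost       string
	ForwarderTimeout time.Duration
}

// loadConfig reads the settings from the environment, applying the defaults
// documented in the table for any variable that is unset.
func loadConfig() (config, error) {
	timeout, err := time.ParseDuration(envOr("MG_MQTT_ADAPTER_FORWARDER_TIMEOUT", "30s"))
	if err != nil {
		return config{}, fmt.Errorf("invalid forwarder timeout: %w", err)
	}
	return config{
		LogLevel:         envOr("MG_MQTT_ADAPTER_LOG_LEVEL", "info"),
		MQTTPort:         envOr("MG_MQTT_ADAPTER_MQTT_PORT", "1883"),
		TargetHost:       envOr("MG_MQTT_ADAPTER_MQTT_TARGET_HOST", "localhost"),
		ForwarderTimeout: timeout,
	}, nil
}

func main() {
	cfg, err := loadConfig()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Parsing the timeout with `time.ParseDuration` means values such as `30s` or `1m` are accepted, which matches the format of the documented default.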

Deployment

The service itself is distributed as a Docker container. Check the mqtt-adapter service section in the docker-compose file to see how the service is deployed.

Running this service outside of a container requires working instances of the message broker service, the things service and a Jaeger server. To start the service outside of the container, execute the following shell script:

# download the latest version of the service
git clone https://github.com/andychao217/magistrala

cd magistrala

# compile the mqtt adapter
make mqtt

# copy binary to bin
make install

# set the environment variables and run the service
MG_MQTT_ADAPTER_LOG_LEVEL=info \
MG_MQTT_ADAPTER_MQTT_PORT=1883 \
MG_MQTT_ADAPTER_MQTT_TARGET_HOST=localhost \
MG_MQTT_ADAPTER_MQTT_TARGET_PORT=1883 \
MG_MQTT_ADAPTER_MQTT_QOS=1 \
MG_MQTT_ADAPTER_FORWARDER_TIMEOUT=30s \
MG_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK="" \
MG_MQTT_ADAPTER_WS_PORT=8080 \
MG_MQTT_ADAPTER_WS_TARGET_HOST=localhost \
MG_MQTT_ADAPTER_WS_TARGET_PORT=8080 \
MG_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt \
MG_MQTT_ADAPTER_INSTANCE="" \
MG_THINGS_AUTH_GRPC_URL=localhost:7016 \
MG_THINGS_AUTH_GRPC_TIMEOUT=1s \
MG_THINGS_AUTH_GRPC_CLIENT_CERT="" \
MG_THINGS_AUTH_GRPC_CLIENT_KEY="" \
MG_THINGS_AUTH_GRPC_SERVER_CERTS="" \
MG_ES_URL=nats://localhost:4222 \
MG_MESSAGE_BROKER_URL=nats://localhost:4222 \
MG_JAEGER_URL=http://localhost:14268/api/traces \
MG_JAEGER_TRACE_RATIO=1.0 \
MG_SEND_TELEMETRY=true \
MG_MQTT_ADAPTER_INSTANCE_ID="" \
$GOBIN/magistrala-mqtt

Setting MG_THINGS_AUTH_GRPC_CLIENT_CERT and MG_THINGS_AUTH_GRPC_CLIENT_KEY will enable TLS against the things service. The service expects a file in PEM format for both the certificate and the key. Setting MG_THINGS_AUTH_GRPC_SERVER_CERTS will enable TLS against the things service trusting only those CAs that are provided. The service expects a file in PEM format of trusted CAs.
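
As a rough illustration of how these three settings could map onto Go's standard `crypto/tls` package, consider the sketch below. It is an assumption about the general approach, not the adapter's actual code: `buildTLSConfig` is a hypothetical helper, and returning a nil configuration to mean "no TLS" is a choice made only for this example.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical helper. It returns nil when no file is
// configured (plaintext connection), otherwise a TLS config that presents the
// client certificate and/or trusts only the provided CAs.
func buildTLSConfig(certFile, keyFile, caFile string) (*tls.Config, error) {
	if certFile == "" && keyFile == "" && caFile == "" {
		return nil, nil
	}
	cfg := &tls.Config{MinVersion: tls.VersionTLS12}

	if certFile != "" || keyFile != "" {
		// Both files must be PEM encoded and belong together.
		cert, err := tls.LoadX509KeyPair(certFile, keyFile)
		if err != nil {
			return nil, fmt.Errorf("loading client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}

	if caFile != "" {
		pem, err := os.ReadFile(caFile)
		if err != nil {
			return nil, fmt.Errorf("reading CA file: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no valid CA certificates found")
		}
		// Only the CAs in this pool are trusted when verifying the server.
		cfg.RootCAs = pool
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig(
		os.Getenv("MG_THINGS_AUTH_GRPC_CLIENT_CERT"),
		os.Getenv("MG_THINGS_AUTH_GRPC_CLIENT_KEY"),
		os.Getenv("MG_THINGS_AUTH_GRPC_SERVER_CERTS"),
	)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("TLS enabled:", cfg != nil)
}
```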

For more information about service capabilities and usage, please check out the API documentation.

Documentation

Overview

Package mqtt contains the domain concept definitions needed to support Magistrala MQTT service functionality.

Constants

const (
	LogInfoSubscribed   = "subscribed with client_id %s to topics %s"
	LogInfoUnsubscribed = "unsubscribed client_id %s from topics %s"
	LogInfoConnected    = "connected with client_id %s"
	LogInfoDisconnected = "disconnected client_id %s and username %s"
	LogInfoPublished    = "published with client_id %s to the topic %s"
)

Log message formats.
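
These constants are ordinary `fmt`-style format strings. The sketch below shows one way they might be filled in. The constant is copied locally so the example is self-contained (in real code it would be referenced as `mqtt.LogInfoSubscribed`), and `formatSubscribed` as well as the comma-separated rendering of topics are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copy of the package constant, so the sketch compiles on its own.
const LogInfoSubscribed = "subscribed with client_id %s to topics %s"

// formatSubscribed is a hypothetical helper that fills in the format string.
// Joining topics with commas is an assumption, not documented behaviour.
func formatSubscribed(clientID string, topics []string) string {
	return fmt.Sprintf(LogInfoSubscribed, clientID, strings.Join(topics, ","))
}

func main() {
	fmt.Println(formatSubscribed("client-1", []string{"channels/1/messages"}))
}
```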

Variables

var (
	ErrMalformedSubtopic            = errors.New("malformed subtopic")
	ErrClientNotInitialized         = errors.New("client is not initialized")
	ErrMalformedTopic               = errors.New("malformed topic")
	ErrMissingClientID              = errors.New("client_id not found")
	ErrMissingTopicPub              = errors.New("failed to publish due to missing topic")
	ErrMissingTopicSub              = errors.New("failed to subscribe due to missing topic")
	ErrFailedConnect                = errors.New("failed to connect")
	ErrFailedSubscribe              = errors.New("failed to subscribe")
	ErrFailedUnsubscribe            = errors.New("failed to unsubscribe")
	ErrFailedPublish                = errors.New("failed to publish")
	ErrFailedDisconnect             = errors.New("failed to disconnect")
	ErrFailedPublishDisconnectEvent = errors.New("failed to publish disconnect event")
	ErrFailedParseSubtopic          = errors.New("failed to parse subtopic")
	ErrFailedPublishConnectEvent    = errors.New("failed to publish connect event")
	ErrFailedPublishToMsgBroker     = errors.New("failed to publish to magistrala message broker")
)

Error wrappers for MQTT errors.
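
Because these are sentinel values created with `errors.New`, callers can match them with `errors.Is` even after they have been wrapped with more context. The following sketch uses only the standard library and a local copy of one sentinel. `publish` and `isPublishFailure` are hypothetical stand-ins, and the package itself may wrap errors differently.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the package sentinel, so the sketch compiles on its own.
var ErrFailedPublish = errors.New("failed to publish")

// publish is a hypothetical operation that always fails, wrapping the
// sentinel with the underlying cause.
func publish(topic string) error {
	cause := fmt.Errorf("broker unavailable for topic %s", topic)
	return fmt.Errorf("%w: %v", ErrFailedPublish, cause)
}

// isPublishFailure reports whether err is, or wraps, ErrFailedPublish.
func isPublishFailure(err error) bool {
	return errors.Is(err, ErrFailedPublish)
}

func main() {
	err := publish("channels/1/messages")
	if isPublishFailure(err) {
		fmt.Println("publish failed:", err)
	}
}
```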

Functions

func NewHandler

func NewHandler(publisher messaging.Publisher, es events.EventStore, logger *slog.Logger, authClient magistrala.AuthzServiceClient) session.Handler

NewHandler creates a new Handler entity.

Types

type Forwarder

type Forwarder interface {
	// Forward subscribes to the Subscriber and
	// publishes messages using provided Publisher.
	Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error
}

Forwarder specifies MQTT forwarder interface API.

func NewForwarder

func NewForwarder(topic string, logger *slog.Logger) Forwarder

NewForwarder returns a new Forwarder implementation.

Directories

Path Synopsis
Package events provides the domain concept definitions needed to support mqtt events functionality.
Package mocks contains mocks for testing purposes.
Package tracing provides tracing instrumentation for Magistrala MQTT adapter service.
