routing

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2024 License: Apache-2.0, EPL-2.0 Imports: 19 Imported by: 6

Documentation

Index

Constants

View Source
const (
	// TopicCommandRequest defines the topic filter for command requests.
	// NOTE(review): '+' and '#' appear to be MQTT single-level and
	// multi-level wildcards; the empty level ("//") is part of the value.
	TopicCommandRequest = "command//+/req/#"

	// TopicCommandResponse defines the topic filters for command responses.
	// The value holds two comma-separated filters: a long form
	// ("command//+/res/#") and a short form ("c//+/s/#").
	TopicCommandResponse = "command//+/res/#,c//+/s/#"
)
View Source
const (
	// TopicsEvent defines the event topic filters as a comma-separated
	// list: the long form "event/#" and the short form "e/#".
	TopicsEvent = "event/#,e/#"

	// TopicsTelemetry defines the telemetry topic filters as a
	// comma-separated list: the long form "telemetry/#" and the short
	// form "t/#".
	TopicsTelemetry = "telemetry/#,t/#"
)
View Source
const (
	// TopicGwParamsRequest defines the topic on which the thing's data
	// (the gateway parameters) is requested.
	TopicGwParamsRequest = "edge/thing/request"
	// TopicGwParamsResponse defines the topic on which the thing's data
	// (the gateway parameters) is published in response.
	TopicGwParamsResponse = "edge/thing/response"

	// TopicConnectionStatus defines the topic for connection status
	// messages.
	TopicConnectionStatus = "edge/connection/remote/status"
)
View Source
const (
	// StatusConfigError defines the configuration error status.
	StatusConfigError = "CONFIG_ERROR"

	// StatusProvisioningError defines a provisioning error status.
	StatusProvisioningError = "PROVISIONING_ERROR"
	// StatusProvisioningUpdate defines a running provisioning update status.
	StatusProvisioningUpdate = "PROVISIONING_UPDATE"
	// StatusProvisioningMissing defines a missing provisioning status.
	StatusProvisioningMissing = "PROVISIONING_MISSING"

	// StatusConnectionClosed defines the connection closed status.
	StatusConnectionClosed = "CONNECTION_CLOSED"
	// StatusConnectionUnknown defines the unknown connection status.
	StatusConnectionUnknown = "CONNECTION_UNKNOWN"
	// StatusConnectionError defines a connection error status.
	StatusConnectionError = "CONNECTION_ERROR"
	// StatusConnectionNotAuthenticated defines a not authenticated connection status.
	StatusConnectionNotAuthenticated = "CONNECTION_NOT_AUTHENTICATED"
)
View Source
const (
	// Subscription is the subscription log message type (iota value 0).
	Subscription logMessageType = iota
	// UnSubscription is the unsubscription log message type (value 1).
	UnSubscription
	// Disconnection is the disconnection log message type (value 2).
	Disconnection
	// Termination is the termination log message type (value 3).
	Termination
)
View Source
const (
	// TopicLogSubscribe is the broker log topic for messages about
	// subscriptions.
	TopicLogSubscribe = "$SYS/broker/log/M/subscribe"
	// TopicLogUnsubscribe is the broker log topic for messages about
	// unsubscriptions.
	TopicLogUnsubscribe = "$SYS/broker/log/M/unsubscribe"
	// TopicLogNoticeLevel is the broker log topic for messages with
	// notice level.
	TopicLogNoticeLevel = "$SYS/broker/log/N"
)

Variables

This section is empty.

Functions

func AddTimestamp

func AddTimestamp(h message.HandlerFunc) message.HandlerFunc

AddTimestamp creates a middleware handler that adds a timestamp to the message body.

func CommandsReqBus

func CommandsReqBus(router *message.Router,
	mosquittoPub message.Publisher,
	honoSub message.Subscriber,
	reqCache *cache.Cache,
	tenantID, deviceID string,
	generic bool,
) *message.Handler

CommandsReqBus creates the commands request bus.

func CommandsResBus

func CommandsResBus(router *message.Router,
	honoPub message.Publisher,
	mosquittoSub message.Subscriber,
	reqCache *cache.Cache,
	tenantID, deviceID string,
	generic bool,
) *message.Handler

CommandsResBus creates the commands response bus.

func CreateLogHandler

func CreateLogHandler(router *message.Router,
	manager connector.SubscriptionManager,
	localClient *connector.MQTTConnection,
)

CreateLogHandler creates a handler that subscribes to the Mosquitto logging topic.

func CreateServiceRouter

func CreateServiceRouter(localClient *connector.MQTTConnection,
	manager connector.SubscriptionManager,
	logger watermill.LoggerAdapter,
) *message.Router

CreateServiceRouter creates the service router.

func EventsBus

func EventsBus(router *message.Router,
	honoPub message.Publisher,
	mosquittoSub message.Subscriber,
	tenantID, deviceID string,
	generic bool,
) *message.Handler

EventsBus creates the bus for event messages.

func NewCommandRequestHandler

func NewCommandRequestHandler(reqCache *cache.Cache,
	tenantID, deviceID string,
	generic bool,
) message.HandlerFunc

NewCommandRequestHandler returns the commands handler function.

func NewCommandResponseHandler

func NewCommandResponseHandler(
	reqCache *cache.Cache,
	prefix string,
	tenantID, deviceID string,
	generic bool,
) message.HandlerFunc

NewCommandResponseHandler returns the responses handler function.

func NewEventsHandler

func NewEventsHandler(prefix string, tenantID string, deviceID string, generic bool) message.HandlerFunc

NewEventsHandler returns the events handler function.

func NewGwParamsMessage

func NewGwParamsMessage(params *GwParams) *message.Message

NewGwParamsMessage creates GWParams message.

func ParamsBus

func ParamsBus(router *message.Router,
	params *GwParams,
	mosquittoPub message.Publisher,
	mosquittoSub message.Subscriber,
	logger logger.Logger,
) *message.Handler

ParamsBus creates the GW parameters bus.

func SendGwParams

func SendGwParams(params *GwParams, retain bool, pub message.Publisher, logger logger.Logger)

SendGwParams publishes the GwParams.

func SendStatus

func SendStatus(cause string, statusPub message.Publisher, logger logger.Logger)

SendStatus publishes a connection status.

func StatusCause

func StatusCause(err error) (string, bool)

StatusCause defines the connection error status on MQTT connection error.

func TelemetryBus

func TelemetryBus(router *message.Router,
	honoPub message.Publisher,
	mosquittoSub message.Subscriber,
	tenantID, deviceID string,
	generic bool,
) *message.Handler

TelemetryBus creates the bus for telemetry messages.

Types

type CloudConnectionHandler

// CloudConnectionHandler defines a connection handler.
type CloudConnectionHandler struct {
	// CloudClient is the MQTT connection this handler operates on.
	// NOTE(review): presumably the cloud (remote) connection, judging by
	// the name; confirm against the Connected implementation.
	CloudClient *connector.MQTTConnection
	// Logger is the logger adapter available to the handler.
	Logger      watermill.LoggerAdapter
}

CloudConnectionHandler defines a connection handler.

func (*CloudConnectionHandler) Connected

func (h *CloudConnectionHandler) Connected(connected bool, err error)

Connected is called to notify CloudConnectionHandler of a connection status change.

type ConnectionStatus

// ConnectionStatus defines the connection status data and its JSON
// representation.
type ConnectionStatus struct {
	// Connected is always serialized, under the "connected" key.
	Connected bool   `json:"connected"`
	// Timestamp is serialized under "timestamp" and omitted when zero.
	// NOTE(review): the time unit is not visible here; confirm.
	Timestamp int64  `json:"timestamp,omitempty"`
	// Cause is serialized under "cause" and omitted when empty.
	// NOTE(review): presumably one of the Status* constants; confirm.
	Cause     string `json:"cause,omitempty"`
}

ConnectionStatus defines a connection status data.

type ConnectionStatusHandler

// ConnectionStatusHandler represents a connection status handler.
type ConnectionStatusHandler struct {
	// Pub is the message publisher held by the handler.
	// NOTE(review): presumably used to publish the connection status;
	// confirm against the Connected implementation.
	Pub message.Publisher

	// Logger is the logger available to the handler.
	Logger logger.Logger
}

ConnectionStatusHandler represents a connection status handler.

func (*ConnectionStatusHandler) Connected

func (h *ConnectionStatusHandler) Connected(connected bool, err error)

Connected is called to notify ConnectionStatusHandler of a connection status change.

type ErrorsHandler

// ErrorsHandler represents an error handler.
type ErrorsHandler struct {
	// StatusPub is the message publisher held by the handler.
	// NOTE(review): presumably used to publish error statuses; confirm
	// against the Connected implementation.
	StatusPub message.Publisher

	// Logger is the logger available to the handler.
	Logger logger.Logger
}

ErrorsHandler represents an error handler.

func (*ErrorsHandler) Connected

func (h *ErrorsHandler) Connected(connected bool, err error)

Connected is called to notify ErrorsHandler of a connection status change.

type GwParams

// GwParams defines the GW parameters and their JSON representation.
type GwParams struct {
	// DeviceID is always serialized, under the "deviceId" key.
	DeviceID string `json:"deviceId"`
	// TenantID is always serialized, under the "tenantId" key.
	TenantID string `json:"tenantId"`
	// PolicyID is serialized under "policyId" and omitted when empty.
	PolicyID string `json:"policyId,omitempty"`
}

GwParams defines the GW parameters.

func NewGwParams

func NewGwParams(deviceID, tenantID, policyID string) *GwParams

NewGwParams creates GWParams.

type LogHandler

// LogHandler processes Mosquitto log messages.
type LogHandler struct {
	// SubcriptionList is the list held by the handler.
	// NOTE(review): presumably holds the tracked subscriptions; confirm.
	// The name is misspelled ("Subcription"), but the field is exported,
	// so renaming it would break callers.
	SubcriptionList *list.List
	// Manager is the subscription manager held by the handler.
	Manager         connector.SubscriptionManager

	// Logger is the logger adapter available to the handler.
	Logger watermill.LoggerAdapter
}

LogHandler processes Mosquitto log messages.

func (*LogHandler) ProcessLogs

func (h *LogHandler) ProcessLogs(msg *message.Message) error

ProcessLogs is called when a new log notification arrives from Mosquitto. It evaluates the topic and the message and, if they relate to local subscriptions, takes the proper action.

type LogMessage

// LogMessage represents a subscription message data.
type LogMessage struct {
	// ClientID is the client identifier carried by the log message.
	ClientID  string
	// Timestamp is the timestamp carried by the log message.
	Timestamp logTimestamp
	// Text is the text carried by the log message.
	// NOTE(review): which part of the raw log line this holds is not
	// visible here; confirm against ParseLogMessage.
	Text      string
	// Type is the message type; the declared logMessageType values are
	// Subscription, UnSubscription, Disconnection and Termination.
	Type      logMessageType
}

LogMessage represents a subscription message data.

func ParseLogMessage

func ParseLogMessage(topic, message string) *LogMessage

ParseLogMessage parses a log message into a LogMessage structure.

type ReconnectHandler

// ReconnectHandler defines the reconnection handler.
type ReconnectHandler struct {
	// Pub is the message publisher held by the handler.
	Pub message.Publisher

	// Params are the GW parameters held by the handler.
	// NOTE(review): presumably re-published through Pub on reconnect;
	// confirm against the Connected implementation.
	Params *GwParams

	// Logger is the logger available to the handler.
	Logger logger.Logger
}

ReconnectHandler defines reconnection handler.

func (*ReconnectHandler) Connected

func (h *ReconnectHandler) Connected(connected bool, err error)

Connected is called to notify ReconnectHandler of a connection status change.

type SubscriptionItem

// SubscriptionItem represents a subscription data and its JSON
// representation.
type SubscriptionItem struct {
	// ClientID is serialized under "clientId" and omitted when empty.
	ClientID  string       `json:"clientId,omitempty"`
	// Timestamp is serialized under the "timestamp" key; the tag has no
	// omitempty option.
	Timestamp logTimestamp `json:"timestamp"`
	// TopicID is serialized under "topicId" and omitted when empty.
	TopicID   string       `json:"topicId,omitempty"`
	// TopicReal is serialized under "topicReal" and omitted when empty.
	// NOTE(review): how TopicReal differs from TopicID is not visible
	// here; confirm against the code that fills these fields.
	TopicReal string       `json:"topicReal,omitempty"`
}

SubscriptionItem represents a subscription data.

func (SubscriptionItem) String

func (s SubscriptionItem) String() string

String returns the SubscriptionItem string representation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL