Documentation ¶
Index ¶
- Constants
- func AddTimestamp(h message.HandlerFunc) message.HandlerFunc
- func CommandsReqBus(router *message.Router, mosquittoPub message.Publisher, ...) *message.Handler
- func CommandsResBus(router *message.Router, honoPub message.Publisher, ...) *message.Handler
- func CreateLogHandler(router *message.Router, manager connector.SubscriptionManager, ...)
- func CreateServiceRouter(localClient *connector.MQTTConnection, manager connector.SubscriptionManager, ...) *message.Router
- func EventsBus(router *message.Router, honoPub message.Publisher, ...) *message.Handler
- func NewCommandRequestHandler(reqCache *cache.Cache, tenantID, deviceID string, generic bool) message.HandlerFunc
- func NewCommandResponseHandler(reqCache *cache.Cache, prefix string, tenantID, deviceID string, generic bool) message.HandlerFunc
- func NewEventsHandler(prefix string, tenantID string, deviceID string, generic bool) message.HandlerFunc
- func NewGwParamsMessage(params *GwParams) *message.Message
- func ParamsBus(router *message.Router, params *GwParams, mosquittoPub message.Publisher, ...) *message.Handler
- func SendGwParams(params *GwParams, retain bool, pub message.Publisher, logger logger.Logger)
- func SendStatus(cause string, statusPub message.Publisher, logger logger.Logger)
- func StatusCause(err error) (string, bool)
- func TelemetryBus(router *message.Router, honoPub message.Publisher, ...) *message.Handler
- type CloudConnectionHandler
- type ConnectionStatus
- type ConnectionStatusHandler
- type ErrorsHandler
- type GwParams
- type LogHandler
- type LogMessage
- type ReconnectHandler
- type SubscriptionItem
Constants ¶
const ( // TopicCommandRequest defines command request topic TopicCommandRequest = "command//+/req/#" // TopicCommandResponse defines command responses topic TopicCommandResponse = "command//+/res/#,c//+/s/#" )
const ( // TopicsEvent defines event topics TopicsEvent = "event/#,e/#" // TopicsTelemetry defines telemetry topics TopicsTelemetry = "telemetry/#,t/#" )
const ( // TopicGwParamsRequest defines the thing's data request topic. TopicGwParamsRequest = "edge/thing/request" // TopicGwParamsResponse defines the thing's data response topic. TopicGwParamsResponse = "edge/thing/response" // TopicConnectionStatus defines the connection status message topic. TopicConnectionStatus = "edge/connection/remote/status" )
const ( // StatusConfigError defines the configuration error status. StatusConfigError = "CONFIG_ERROR" // StatusProvisioningError defines a provisioning error status. StatusProvisioningError = "PROVISIONING_ERROR" // StatusProvisioningUpdate defines a running provisioning update status. StatusProvisioningUpdate = "PROVISIONING_UPDATE" // StatusProvisioningMissing defines a missing provisioning status. StatusProvisioningMissing = "PROVISIONING_MISSING" // StatusConnectionClosed defines the connection closed status. StatusConnectionClosed = "CONNECTION_CLOSED" // StatusConnectionUnknown defines the unknown connection status. StatusConnectionUnknown = "CONNECTION_UNKNOWN" // StatusConnectionError defines a connection error. StatusConnectionError = "CONNECTION_ERROR" // StatusConnectionNotAuthenticated defines a not authenticated connection status. StatusConnectionNotAuthenticated = "CONNECTION_NOT_AUTHENTICATED" )
const ( // Subscription message type. Subscription logMessageType = iota // UnSubscription message type. UnSubscription // Disconnection message type. Disconnection // Termination message type. Termination )
const ( // TopicLogSubscribe is topic for log messages about subscriptions. TopicLogSubscribe = "$SYS/broker/log/M/subscribe" // TopicLogUnsubscribe is topic for log messages about unsubscriptions. TopicLogUnsubscribe = "$SYS/broker/log/M/unsubscribe" // TopicLogNoticeLevel is topic for log messages with notice level. TopicLogNoticeLevel = "$SYS/broker/log/N" )
Variables ¶
This section is empty.
Functions ¶
func AddTimestamp ¶
func AddTimestamp(h message.HandlerFunc) message.HandlerFunc
AddTimestamp creates a middleware handler which adds a timestamp to the message body.
func CommandsReqBus ¶
func CommandsReqBus(router *message.Router, mosquittoPub message.Publisher, honoSub message.Subscriber, reqCache *cache.Cache, tenantID, deviceID string, generic bool, ) *message.Handler
CommandsReqBus creates the commands request bus.
func CommandsResBus ¶
func CommandsResBus(router *message.Router, honoPub message.Publisher, mosquittoSub message.Subscriber, reqCache *cache.Cache, tenantID, deviceID string, generic bool, ) *message.Handler
CommandsResBus creates the commands response bus.
func CreateLogHandler ¶
func CreateLogHandler(router *message.Router, manager connector.SubscriptionManager, localClient *connector.MQTTConnection, )
CreateLogHandler creates a handler that subscribes to the Mosquitto logging topic.
func CreateServiceRouter ¶
func CreateServiceRouter(localClient *connector.MQTTConnection, manager connector.SubscriptionManager, logger watermill.LoggerAdapter, ) *message.Router
CreateServiceRouter creates the service router.
func EventsBus ¶
func EventsBus(router *message.Router, honoPub message.Publisher, mosquittoSub message.Subscriber, tenantID, deviceID string, generic bool, ) *message.Handler
EventsBus creates bus for events messages.
func NewCommandRequestHandler ¶
func NewCommandRequestHandler(reqCache *cache.Cache, tenantID, deviceID string, generic bool, ) message.HandlerFunc
NewCommandRequestHandler returns the commands handler function.
func NewCommandResponseHandler ¶
func NewCommandResponseHandler( reqCache *cache.Cache, prefix string, tenantID, deviceID string, generic bool, ) message.HandlerFunc
NewCommandResponseHandler returns the responses handler function.
func NewEventsHandler ¶
func NewEventsHandler(prefix string, tenantID string, deviceID string, generic bool) message.HandlerFunc
NewEventsHandler returns the events handler function.
func NewGwParamsMessage ¶
NewGwParamsMessage creates GWParams message.
func ParamsBus ¶
func ParamsBus(router *message.Router, params *GwParams, mosquittoPub message.Publisher, mosquittoSub message.Subscriber, logger logger.Logger, ) *message.Handler
ParamsBus creates the GW parameters bus.
func SendGwParams ¶
SendGwParams publishes the GwParams.
func SendStatus ¶
SendStatus publishes a connection status.
func StatusCause ¶
StatusCause defines the connection error status on MQTT connection error.
Types ¶
type CloudConnectionHandler ¶
type CloudConnectionHandler struct { CloudClient *connector.MQTTConnection Logger watermill.LoggerAdapter }
CloudConnectionHandler defines a connection handler.
func (*CloudConnectionHandler) Connected ¶
func (h *CloudConnectionHandler) Connected(connected bool, err error)
Connected is called to notify CloudConnectionHandler for connection status change.
type ConnectionStatus ¶
type ConnectionStatus struct { Connected bool `json:"connected"` Timestamp int64 `json:"timestamp,omitempty"` Cause string `json:"cause,omitempty"` }
ConnectionStatus defines a connection status data.
type ConnectionStatusHandler ¶
ConnectionStatusHandler represents a connection status handler.
func (*ConnectionStatusHandler) Connected ¶
func (h *ConnectionStatusHandler) Connected(connected bool, err error)
Connected is called to notify ConnectionStatusHandler for connection status change.
type ErrorsHandler ¶
ErrorsHandler represents an error handler.
func (*ErrorsHandler) Connected ¶
func (h *ErrorsHandler) Connected(connected bool, err error)
Connected is called to notify ErrorsHandler for connection status change.
type GwParams ¶
type GwParams struct { DeviceID string `json:"deviceId"` TenantID string `json:"tenantId"` PolicyID string `json:"policyId,omitempty"` }
GwParams defines the GW parameters.
func NewGwParams ¶
NewGwParams creates GWParams.
type LogHandler ¶
type LogHandler struct { SubcriptionList *list.List Manager connector.SubscriptionManager Logger watermill.LoggerAdapter }
LogHandler processes Mosquitto log messages
func (*LogHandler) ProcessLogs ¶
func (h *LogHandler) ProcessLogs(msg *message.Message) error
ProcessLogs is called whenever a new log notification arrives from Mosquitto. It evaluates the topic and message and, if they relate to local subscriptions, takes the proper action.
type LogMessage ¶
LogMessage represents a subscription message data.
func ParseLogMessage ¶
func ParseLogMessage(topic, message string) *LogMessage
ParseLogMessage parses a log message into a LogMessage structure.
type ReconnectHandler ¶
ReconnectHandler defines reconnection handler.
func (*ReconnectHandler) Connected ¶
func (h *ReconnectHandler) Connected(connected bool, err error)
Connected is called to notify ReconnectHandler for connection status change.
type SubscriptionItem ¶
type SubscriptionItem struct { ClientID string `json:"clientId,omitempty"` Timestamp logTimestamp `json:"timestamp"` TopicID string `json:"topicId,omitempty"` TopicReal string `json:"topicReal,omitempty"` }
SubscriptionItem represents a subscription data.
func (SubscriptionItem) String ¶
func (s SubscriptionItem) String() string
String returns the SubscriptionItem string representation.