Documentation ¶
Index ¶
- func DetachConnection(clientId string, topic string)
- type AdConf
- type Conf
- type Connection
- type ConnectionConfig
- type MQTTSink
- func (ms *MQTTSink) Close(ctx api.StreamContext) error
- func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error
- func (ms *MQTTSink) CollectResend(ctx api.StreamContext, item interface{}) error
- func (ms *MQTTSink) Configure(ps map[string]interface{}) error
- func (ms *MQTTSink) Open(ctx api.StreamContext) error
- func (ms *MQTTSink) Ping(_ string, props map[string]interface{}) error
- type SourceConnector
- func (ms *SourceConnector) Close(ctx api.StreamContext) error
- func (ms *SourceConnector) Configure(topic string, props map[string]any) error
- func (ms *SourceConnector) Connect(ctx api.StreamContext) error
- func (ms *SourceConnector) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error)
- func (ms *SourceConnector) Ping(dataSource string, props map[string]interface{}) error
- func (ms *SourceConnector) SetupStats(stats metric.StatManager)
- func (ms *SourceConnector) Subscribe(ctx api.StreamContext) error
- type SubscriptionInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DetachConnection ¶
func DetachConnection(clientId string, topic string)
Types ¶
type AdConf ¶
type AdConf struct {
	Tpc         string `json:"topic"`
	Qos         byte   `json:"qos"`
	Retained    bool   `json:"retained"`
	Compression string `json:"compression"`
	ResendTopic string `json:"resendDestination"`
}
AdConf is the advanced configuration for the MQTT sink.
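The struct tags on AdConf indicate which property keys map to which fields. The following is a minimal sketch of that mapping, assuming plain `encoding/json` decoding; how the package itself decodes its properties is not shown here. The type `adConf` and the helper `parseAdConf` are hypothetical local names, declared only so the example compiles on its own, and the topic strings are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// adConf is a local copy of the AdConf fields listed above. It is declared
// here only so the example is self-contained; it is not the package's type.
type adConf struct {
	Tpc         string `json:"topic"`
	Qos         byte   `json:"qos"`
	Retained    bool   `json:"retained"`
	Compression string `json:"compression"`
	ResendTopic string `json:"resendDestination"`
}

// parseAdConf (hypothetical helper) decodes a JSON document into adConf
// using the struct tags. Keys that are absent leave the zero value in place.
func parseAdConf(raw []byte) (adConf, error) {
	var c adConf
	err := json.Unmarshal(raw, &c)
	return c, err
}

func main() {
	// Example property values; the topic names are invented for illustration.
	raw := []byte(`{"topic":"devices/result","qos":1,"retained":true,"resendDestination":"devices/resend"}`)
	c, err := parseAdConf(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", c)
}
```

Note that the key for ResendTopic is `resendDestination`, not `resendTopic`, so a property map has to use the tag name rather than the field name.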
type Connection ¶
func CreateClient ¶
func CreateClient(ctx api.StreamContext, selId string, props map[string]any) (*Connection, error)
CreateClient creates a new MQTT client. The client is anonymous and does not require a name.
func GetConnection ¶
func GetConnection(ctx api.StreamContext, props map[string]any) (*Connection, error)
func (*Connection) Close ¶
func (conn *Connection) Close()
func (*Connection) GetClientId ¶
func (conn *Connection) GetClientId() string
func (*Connection) Ping ¶
func (conn *Connection) Ping() error
func (*Connection) Subscribe ¶
func (conn *Connection) Subscribe(topic string, info *SubscriptionInfo) error
type ConnectionConfig ¶
type MQTTSink ¶
type MQTTSink struct {
// contains filtered or unexported fields
}
func (*MQTTSink) Collect ¶
func (ms *MQTTSink) Collect(ctx api.StreamContext, item interface{}) error
func (*MQTTSink) CollectResend ¶
func (ms *MQTTSink) CollectResend(ctx api.StreamContext, item interface{}) error
type SourceConnector ¶
type SourceConnector struct {
// contains filtered or unexported fields
}
SourceConnector is the connector for the MQTT source. When the same connection is shared, each topic has a single SourceConnector that serves as the shared source node.
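The one-connector-per-topic idea can be illustrated with a small registry keyed by topic. This is only a sketch of the general pattern, not the package's implementation: `connector`, `registry`, `newRegistry`, `acquire`, and the reference counter are all hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// connector is a hypothetical stand-in for a per-topic source connector.
type connector struct {
	topic string
	refs  int // how many callers currently share this connector
}

// registry hands out at most one connector per topic.
type registry struct {
	mu      sync.Mutex
	byTopic map[string]*connector
}

func newRegistry() *registry {
	return &registry{byTopic: make(map[string]*connector)}
}

// acquire returns the existing connector for topic, creating it on first use.
// Every call increments the reference count of the returned connector.
func (r *registry) acquire(topic string) *connector {
	r.mu.Lock()
	defer r.mu.Unlock()
	c, ok := r.byTopic[topic]
	if !ok {
		c = &connector{topic: topic}
		r.byTopic[topic] = c
	}
	c.refs++
	return c
}

func main() {
	r := newRegistry()
	a := r.acquire("sensors/temp")
	b := r.acquire("sensors/temp")     // same topic: same connector as a
	c := r.acquire("sensors/humidity") // different topic: a new connector
	fmt.Println(a == b, a == c, a.refs, c.refs)
}
```

Keying by topic means a second consumer of the same topic reuses the existing node instead of creating a second subscription on the shared connection.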
func (*SourceConnector) Close ¶
func (ms *SourceConnector) Close(ctx api.StreamContext) error
func (*SourceConnector) Configure ¶
func (ms *SourceConnector) Configure(topic string, props map[string]any) error
func (*SourceConnector) Connect ¶
func (ms *SourceConnector) Connect(ctx api.StreamContext) error
func (*SourceConnector) Open ¶
func (ms *SourceConnector) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, _ chan<- error)
Open is a continuous process that keeps reading data from the MQTT broker. It starts a goroutine that reads data and sends it to the consumer channel. Run Open first, then Subscribe.
func (*SourceConnector) Ping ¶
func (ms *SourceConnector) Ping(dataSource string, props map[string]interface{}) error
func (*SourceConnector) SetupStats ¶
func (ms *SourceConnector) SetupStats(stats metric.StatManager)
func (*SourceConnector) Subscribe ¶
func (ms *SourceConnector) Subscribe(ctx api.StreamContext) error
Subscribe is a one-time operation for the source. It connects to the MQTT broker and subscribes to the topic. Run Open before Subscribe.
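The Open-before-Subscribe ordering exists so that a reader is already draining messages by the time the subscription starts delivering them. The sketch below models only that ordering with stand-in types. `fakeConnector`, its simplified `Open` and `Subscribe` signatures, the `run` helper, and the guard that rejects an early Subscribe are all assumptions made for illustration; the real methods take an `api.StreamContext` and behave as documented above.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeConnector is a hypothetical stand-in for SourceConnector. It models
// only the call order: Open first, then Subscribe.
type fakeConnector struct {
	incoming chan string // messages delivered by the simulated broker
	opened   bool
}

func newFakeConnector() *fakeConnector {
	return &fakeConnector{incoming: make(chan string, 8)}
}

// Open starts a goroutine that forwards every incoming message to consumer
// and closes consumer once the incoming stream ends.
func (c *fakeConnector) Open(consumer chan<- string) {
	c.opened = true
	go func() {
		defer close(consumer)
		for msg := range c.incoming {
			consumer <- msg
		}
	}()
}

// Subscribe simulates a one-time subscription that delivers payloads.
// The guard is part of this sketch, not documented package behavior.
func (c *fakeConnector) Subscribe(payloads ...string) error {
	if !c.opened {
		return errors.New("subscribe called before open")
	}
	for _, p := range payloads {
		c.incoming <- p
	}
	close(c.incoming)
	return nil
}

// run wires the two calls in the documented order and collects the output.
func run(payloads ...string) ([]string, error) {
	c := newFakeConnector()
	consumer := make(chan string, len(payloads))
	c.Open(consumer) // reader goroutine is running before anything arrives
	if err := c.Subscribe(payloads...); err != nil {
		return nil, err
	}
	var got []string
	for msg := range consumer {
		got = append(got, msg)
	}
	return got, nil
}

func main() {
	got, err := run("temp=21", "temp=22")
	fmt.Println(got, err)
}
```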
type SubscriptionInfo ¶
type SubscriptionInfo struct {
	Qos        byte
	Handler    pahoMqtt.MessageHandler
	ErrHandler func(error)
}