Documentation
¶
Index ¶
- func CreateConnection(_ api.StreamContext) modules.Connection
- func GetSink() api.Sink
- func GetSource() api.Source
- func ValidateConfig(props map[string]any) error
- type AdConf
- type Conf
- type Connection
- func (conn *Connection) Close(ctx api.StreamContext) error
- func (conn *Connection) DetachSub(ctx api.StreamContext, props map[string]any)
- func (conn *Connection) Dial(ctx api.StreamContext) error
- func (conn *Connection) GetId(_ api.StreamContext) string
- func (conn *Connection) ParseMsg(ctx api.StreamContext, msg any) ([]byte, map[string]any, map[string]string)
- func (conn *Connection) Ping(ctx api.StreamContext) error
- func (conn *Connection) Provision(ctx api.StreamContext, conId string, props map[string]any) error
- func (conn *Connection) Publish(ctx api.StreamContext, topic string, qos byte, retained bool, payload []byte, ...) error
- func (conn *Connection) SetStatusChangeHandler(ctx api.StreamContext, sch api.StatusChangeHandler)
- func (conn *Connection) Status(_ api.StreamContext) modules.ConnectionStatus
- func (conn *Connection) Subscribe(ctx api.StreamContext, topic string, qos byte, callback client.MessageHandler) error
- type Sink
- func (ms *Sink) Close(ctx api.StreamContext) error
- func (ms *Sink) Collect(ctx api.StreamContext, item api.RawTuple) error
- func (ms *Sink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (ms *Sink) Ping(ctx api.StreamContext, props map[string]any) error
- func (ms *Sink) Provision(ctx api.StreamContext, ps map[string]any) error
- type SourceConnector
- func (ms *SourceConnector) Close(ctx api.StreamContext) error
- func (ms *SourceConnector) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (ms *SourceConnector) Ping(ctx api.StreamContext, props map[string]interface{}) error
- func (ms *SourceConnector) Provision(ctx api.StreamContext, props map[string]any) error
- func (ms *SourceConnector) SetEofIngest(eof api.EOFIngest)
- func (ms *SourceConnector) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, _ api.ErrorIngest) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateConnection ¶
func CreateConnection(_ api.StreamContext) modules.Connection
func ValidateConfig ¶
Types ¶
type AdConf ¶
type AdConf struct { Tpc string `json:"topic"` Qos byte `json:"qos"` Retained bool `json:"retained"` SelId string `json:"connectionSelector"` Props map[string]string `json:"properties"` PVersion string `json:"protocolVersion"` }
AdConf is the advanced configuration for the mqtt sink
type Connection ¶
func (*Connection) Close ¶
func (conn *Connection) Close(ctx api.StreamContext) error
func (*Connection) DetachSub ¶
func (conn *Connection) DetachSub(ctx api.StreamContext, props map[string]any)
func (*Connection) Dial ¶
func (conn *Connection) Dial(ctx api.StreamContext) error
func (*Connection) GetId ¶
func (conn *Connection) GetId(_ api.StreamContext) string
func (*Connection) ParseMsg ¶
func (conn *Connection) ParseMsg(ctx api.StreamContext, msg any) ([]byte, map[string]any, map[string]string)
func (*Connection) Ping ¶
func (conn *Connection) Ping(ctx api.StreamContext) error
func (*Connection) Provision ¶
func (conn *Connection) Provision(ctx api.StreamContext, conId string, props map[string]any) error
func (*Connection) Publish ¶
func (conn *Connection) Publish(ctx api.StreamContext, topic string, qos byte, retained bool, payload []byte, properties map[string]string) error
func (*Connection) SetStatusChangeHandler ¶
func (conn *Connection) SetStatusChangeHandler(ctx api.StreamContext, sch api.StatusChangeHandler)
func (*Connection) Status ¶
func (conn *Connection) Status(_ api.StreamContext) modules.ConnectionStatus
func (*Connection) Subscribe ¶
func (conn *Connection) Subscribe(ctx api.StreamContext, topic string, qos byte, callback client.MessageHandler) error
type Sink ¶
type Sink struct {
// contains filtered or unexported fields
}
func (*Sink) Connect ¶
func (ms *Sink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
type SourceConnector ¶
type SourceConnector struct {
// contains filtered or unexported fields
}
SourceConnector is the connector for mqtt source When sharing the same connection, each topic will have one single sourceConnector as the shared source node
func (*SourceConnector) Close ¶
func (ms *SourceConnector) Close(ctx api.StreamContext) error
func (*SourceConnector) Connect ¶
func (ms *SourceConnector) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
func (*SourceConnector) Ping ¶
func (ms *SourceConnector) Ping(ctx api.StreamContext, props map[string]interface{}) error
func (*SourceConnector) Provision ¶
func (ms *SourceConnector) Provision(ctx api.StreamContext, props map[string]any) error
func (*SourceConnector) SetEofIngest ¶
func (ms *SourceConnector) SetEofIngest(eof api.EOFIngest)
func (*SourceConnector) Subscribe ¶
func (ms *SourceConnector) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, _ api.ErrorIngest) error
Subscribe is a one time only operation for source. It connects to the mqtt broker and subscribe to the topic Run open before subscribe
Click to show internal directories.
Click to hide internal directories.