mqttx

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

mqttx

mqtt client for golang

Documentation

Index

Constants

View Source
// Log severity levels, numbered from 0 (LevelEmergency) up to 7 (LevelDebug).
// NOTE(review): the names match the syslog severity keywords — presumably
// intentional; confirm before relying on wire compatibility.
const (
	LevelEmergency     = 0
	LevelAlert         = 1
	LevelCritical      = 2
	LevelError         = 3
	LevelWarning       = 4
	LevelNotice        = 5
	LevelInformational = 6
	LevelDebug         = 7
)
View Source
// Integer limits for the platform's native word size, derived from
// bits.UintSize so they are correct on both 32-bit and 64-bit targets.
const (
	MaxUint uint = 1<<bits.UintSize - 1      // largest value of type uint
	MaxInt  int  = 1<<(bits.UintSize-1) - 1  // largest value of type int
	MinInt  int  = -1 << (bits.UintSize - 1) // smallest value of type int
)
View Source
// Identifiers for the supported MQTT broker software / cloud vendors.
// NOTE(review): presumably these are the values expected in the Vendor
// field of MQTTxServer / MQTTxClient — confirm against callers.
const (
	MQTT_BROKER_MOSQUITTO = "mosquitto"
	MQTT_BROKER_EMQX      = "emqx"
	MQTT_BROKER_MOSCA     = "mosca"
	MQTT_BROKER_VERNEMQ   = "vernemq"
	MQTT_BROKER_RABBITMQ  = "rabbitmq"
	MQTT_BROKER_ALIYUN    = "aliyun"
	MQTT_BROKER_QCLOUD    = "qcloud"
	MQTT_BROKER_BAIDU     = "baidu"
	MQTT_BROKER_JD        = "jd"
)

MQTT服务软件厂商 mosquitto/emqx/mosca/vernemq/rabbitmq/aliyun/qcloud/baidu/jd/...

Variables

View Source
// MQTTxHandlerConnectionLost logs the lost connection (first configured
// server address and client ID) through TraceError, then forwards the event
// to callbackConnectionLostHandler when one has been registered.
//
// It indexes the first entry of the client's server list, so it assumes the
// client was configured with at least one server.
var MQTTxHandlerConnectionLost mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	reader := client.OptionsReader()
	firstServer := reader.Servers()[0]
	addr := FormatServerAddr(firstServer.String())
	TraceError("mqtt(%v) - client(%v) connection lost, error: %s", addr, reader.ClientID(), err)

	// Nothing more to do when no user callback is registered.
	if callbackConnectionLostHandler == nil {
		return
	}
	callbackConnectionLostHandler(client, err)
}

MQTTxHandlerConnectionLost 在与MQTT服务器连接断开时回调

View Source
// MQTTxHandlerDefault logs the received message (first configured server
// address, client ID, topic, and the payload in upper-case hex) through
// TraceInfo, then forwards it to callbackDefaultHandler when one has been
// registered.
//
// It indexes the first entry of the client's server list, so it assumes the
// client was configured with at least one server.
var MQTTxHandlerDefault mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	reader := client.OptionsReader()
	firstServer := reader.Servers()[0]
	addr := FormatServerAddr(firstServer.String())
	TraceInfo("mqtt(%v) - client(%v) topic - default: %s, message: %X", addr, reader.ClientID(), msg.Topic(), msg.Payload())

	// Nothing more to do when no user callback is registered.
	if callbackDefaultHandler == nil {
		return
	}
	callbackDefaultHandler(client, msg)
}

MQTTxHandlerDefault 默认的mqtt消息处理函数, 没有匹配的topic时, 使用此函数处理

View Source
// MQTTxHandlerOnConnect logs the established connection (first configured
// server address and client ID) through TraceInfo, then forwards the event
// to callbackOnConnectHandler when one has been registered.
//
// It indexes the first entry of the client's server list, so it assumes the
// client was configured with at least one server.
var MQTTxHandlerOnConnect mqtt.OnConnectHandler = func(client mqtt.Client) {
	reader := client.OptionsReader()
	firstServer := reader.Servers()[0]
	addr := FormatServerAddr(firstServer.String())
	TraceInfo("mqtt(%v) - client(%v) connected", addr, reader.ClientID())

	// Nothing more to do when no user callback is registered.
	if callbackOnConnectHandler == nil {
		return
	}
	callbackOnConnectHandler(client)
}

MQTTxHandlerOnConnect 在与MQTT服务器建立连接时回调 (可在用户注册的回调中订阅主题)

View Source
// MQTTxHandlerReconnecting logs the reconnect attempt (first configured
// server address and client ID) through TraceInfo, then forwards the event,
// together with the supplied client options, to callbackReconnectingHandler
// when one has been registered.
//
// It indexes the first entry of the client's server list, so it assumes the
// client was configured with at least one server.
var MQTTxHandlerReconnecting mqtt.ReconnectHandler = func(client mqtt.Client, clientOptions *mqtt.ClientOptions) {
	reader := client.OptionsReader()
	firstServer := reader.Servers()[0]
	addr := FormatServerAddr(firstServer.String())
	TraceInfo("mqtt(%v) - client(%v) reconnecting", addr, reader.ClientID())

	// Nothing more to do when no user callback is registered.
	if callbackReconnectingHandler == nil {
		return
	}
	callbackReconnectingHandler(client, clientOptions)
}

MQTTxHandlerReconnecting 在与MQTT服务器重连时回调

Functions

func CallbackConnectionLostHandlerSet

func CallbackConnectionLostHandlerSet(handler mqtt.ConnectionLostHandler)

CallbackConnectionLostHandlerSet set callbackConnectionLostHandler

func CallbackDefaultHandlerSet

func CallbackDefaultHandlerSet(handler mqtt.MessageHandler)

CallbackDefaultHandlerSet set callbackDefaultHandler

func CallbackOnConnectHandlerSet

func CallbackOnConnectHandlerSet(handler mqtt.OnConnectHandler)

CallbackOnConnectHandlerSet set callbackOnConnectHandler

func CallbackReconnectingHandlerSet

func CallbackReconnectingHandlerSet(handler mqtt.ReconnectHandler)

CallbackReconnectingHandlerSet set callbackReconnectingHandler

func Connect

func Connect(pool *MQTTxClientPool, servers []*MQTTxServer, defaultPublishHandler mqtt.MessageHandler, onConnectHandler mqtt.OnConnectHandler, connectionLostHandler mqtt.ConnectionLostHandler, reconnectingHandler mqtt.ReconnectHandler) error

Connect 初始化MQTT服务

func Debug

func Debug(enable bool)

Debug Enable debug

func FormatServerAddr added in v1.0.9

func FormatServerAddr(addr string) string

FormatServerAddr 转换 ssl:// 地址为 tls://

func MqttLibDebug added in v1.0.11

func MqttLibDebug(enable bool, level int)

MqttLibDebug Enable mqtt lib debug

func NewTLSConfig

func NewTLSConfig(cafile, clientCertFile, clientKeyFile string) (*tls.Config, error)

NewTLSConfig 创建TLS配置

func SetUserDebug

func SetUserDebug(f TraceFunc)

SetUserDebug 配置其他日志输出

func SleepSecond added in v1.0.4

func SleepSecond(seconds int)

SleepSecond 睡眠指定的秒数

func TraceDebug

func TraceDebug(format string, v ...interface{})

TraceDebug Debug调试信息日志

func TraceError

func TraceError(format string, v ...interface{})

TraceError 错误日志

func TraceInfo

func TraceInfo(format string, v ...interface{})

TraceInfo 调试信息日志

Types

type Cert

// Cert holds the file paths of the certificate material for an MQTT
// connection.
type Cert struct {
	Cafile         string `json:"cafile"`           // path to the CA certificate file
	ClientCertFile string `json:"client_cert_file"` // path to the client certificate file
	ClientKeyFile  string `json:"client_key_file"`  // path to the client private key file
}

Cert 证书信息

func (*Cert) IsValid

func (c *Cert) IsValid() bool

IsValid 是否有效

func (Cert) String

func (c Cert) String() string

String 序列化成字符串

type MQTTLibDebug added in v1.0.11

// MQTTLibDebug is a logger value exposing Printf and Println methods.
// NOTE(review): presumably installed as the logger of the underlying MQTT
// library by MqttLibDebug — confirm.
type MQTTLibDebug struct {
	// Level is the log level associated with this logger.
	// NOTE(review): presumably one of the Level* constants — confirm.
	Level int
}

func (*MQTTLibDebug) Printf added in v1.0.11

func (l *MQTTLibDebug) Printf(format string, v ...interface{})

func (*MQTTLibDebug) Println added in v1.0.11

func (l *MQTTLibDebug) Println(v ...interface{})

type MQTTxClient

// MQTTxClient describes one MQTT client: the server it targets, its
// credentials, and the live connection objects.
type MQTTxClient struct {
	Vendor   string `json:"vendor"`    // MQTT broker software vendor
	Scheme   string `json:"scheme"`    // MQTT transport scheme: tcp/tls(ssl)/ws
	Domain   string `json:"domain"`    // MQTT server address
	IP       string `json:"ip"`        // MQTT server IP
	Port     string `json:"port"`      // MQTT server port
	Cert     Cert   `json:"cert"`      // MQTT certificate information
	ClientID string `json:"client_id"` // MQTT client ID, GID_GW101@@@deviceid
	Username string `json:"username"`  // MQTT server username
	// NOTE(review): Password is serialized to JSON here, whereas
	// MQTTxServer.Password is tagged `json:"-"` for security reasons —
	// confirm whether this difference is intentional.
	Password string `json:"password"`  // MQTT server password

	// MQTT connection state (never serialized).
	Opts   *mqtt.ClientOptions `json:"-"` // MQTT connection options
	Client mqtt.Client         `json:"-"` // MQTT client connection

	// Other attributes (never serialized).
	ServerConnectionCount int               `json:"-"` // number of client connections on the server this client is connected to
	OtherOpts             map[string]string `json:"-"` // other attribute values
	// contains filtered or unexported fields
}

MQTTxClient MQTT客户端信息

func NewMQTTxClient

func NewMQTTxClient(server *MQTTxServer) (*MQTTxClient, error)

NewMQTTxClient 根据服务器的信息,创建MQTT客户端

func (*MQTTxClient) ClearOtherOpts added in v1.0.5

func (m *MQTTxClient) ClearOtherOpts()

ClearOtherOpts 清空其他属性值

func (*MQTTxClient) Connect

func (m *MQTTxClient) Connect(defaultPublishHandler mqtt.MessageHandler, onConnectHandler mqtt.OnConnectHandler, connectionLostHandler mqtt.ConnectionLostHandler, reconnectingHandler mqtt.ReconnectHandler) error

Connect 连接MQTT服务器

func (*MQTTxClient) Disconnect

func (m *MQTTxClient) Disconnect() error

Disconnect 断开MQTT服务器

func (*MQTTxClient) GetCert

func (m *MQTTxClient) GetCert() Cert

GetCert 获取MQTT证书信息

func (*MQTTxClient) GetClientID

func (m *MQTTxClient) GetClientID() string

GetClientID 获取MQTT客户端ID

func (*MQTTxClient) GetDomain

func (m *MQTTxClient) GetDomain() string

GetDomain 获取MQTT服务器地址

func (*MQTTxClient) GetIP

func (m *MQTTxClient) GetIP() string

GetIP 获取MQTT服务器IP

func (*MQTTxClient) GetOtherOpts added in v1.0.4

func (m *MQTTxClient) GetOtherOpts(key string) string

GetOtherOpts 获取其他属性值

func (*MQTTxClient) GetOtherOptsAll added in v1.0.8

func (m *MQTTxClient) GetOtherOptsAll() map[string]string

GetOtherOptsAll 获取其他所有属性值

func (*MQTTxClient) GetPassword

func (m *MQTTxClient) GetPassword() string

GetPassword 获取MQTT服务器密码

func (*MQTTxClient) GetPort

func (m *MQTTxClient) GetPort() string

GetPort 获取MQTT服务器端口

func (*MQTTxClient) GetScheme

func (m *MQTTxClient) GetScheme() string

GetScheme 获取MQTT协议

func (*MQTTxClient) GetServerConnectionCount added in v1.0.3

func (m *MQTTxClient) GetServerConnectionCount() int

GetServerConnectionCount 获取MQTT客户端所连接的服务器客户端连接数

func (*MQTTxClient) GetUsername

func (m *MQTTxClient) GetUsername() string

GetUsername 获取MQTT服务器用户名

func (*MQTTxClient) GetVendor

func (m *MQTTxClient) GetVendor() string

GetVendor 获取MQTT服务软件厂商

func (*MQTTxClient) Publish

func (m *MQTTxClient) Publish(topic string, qos byte, retained bool, payload interface{}) error

Publish 发布消息

func (*MQTTxClient) Server added in v1.0.3

func (m *MQTTxClient) Server() string

Server 获取MQTT服务器信息

func (*MQTTxClient) SetCert

func (m *MQTTxClient) SetCert(cert Cert)

SetCert 设置MQTT证书信息

func (*MQTTxClient) SetClientID

func (m *MQTTxClient) SetClientID(clientID string)

SetClientID 设置MQTT客户端ID

func (*MQTTxClient) SetDomain

func (m *MQTTxClient) SetDomain(domain string)

SetDomain 设置MQTT服务器地址

func (*MQTTxClient) SetIP

func (m *MQTTxClient) SetIP(ip string)

SetIP 设置MQTT服务器IP

func (*MQTTxClient) SetOtherOpts added in v1.0.4

func (m *MQTTxClient) SetOtherOpts(key, value string)

SetOtherOpts 设置其他属性值

func (*MQTTxClient) SetPassword

func (m *MQTTxClient) SetPassword(password string)

SetPassword 设置MQTT服务器密码

func (*MQTTxClient) SetPort

func (m *MQTTxClient) SetPort(port string)

SetPort 设置MQTT服务器端口

func (*MQTTxClient) SetScheme

func (m *MQTTxClient) SetScheme(scheme string)

SetScheme 设置MQTT协议

func (*MQTTxClient) SetServerConnectionCount added in v1.0.4

func (m *MQTTxClient) SetServerConnectionCount(count int)

SetServerConnectionCount 设置MQTT客户端所连接的服务器客户端连接数

func (*MQTTxClient) SetUsername

func (m *MQTTxClient) SetUsername(username string)

SetUsername 设置MQTT服务器用户名

func (*MQTTxClient) SetVendor

func (m *MQTTxClient) SetVendor(vendor string)

SetVendor 设置MQTT服务软件厂商

func (MQTTxClient) String

func (m MQTTxClient) String() string

String 序列化成字符串

func (*MQTTxClient) Subscribe

func (m *MQTTxClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error

Subscribe 订阅消息

type MQTTxClientPool added in v1.0.3

// MQTTxClientPool is a pool of MQTT client connections.
type MQTTxClientPool struct {
	Clients []*MQTTxClient `json:"clients"` // the pooled MQTT clients
	// contains filtered or unexported fields
}

MQTTxClientPool MQTT客户端连接池

func NewMQTTxClientPool added in v1.0.4

func NewMQTTxClientPool() *MQTTxClientPool

NewMQTTxClientPool 创建一个MQTT客户端连接池

func (*MQTTxClientPool) Add added in v1.0.3

func (p *MQTTxClientPool) Add(client *MQTTxClient)

Add 添加一个客户端连接

func (*MQTTxClientPool) Get added in v1.0.3

func (p *MQTTxClientPool) Get(server string) *MQTTxClient

Get 获取一个客户端连接

func (*MQTTxClientPool) GetClients added in v1.0.6

func (p *MQTTxClientPool) GetClients() []*MQTTxClient

GetClients 获取所有客户端连接

func (*MQTTxClientPool) GetMinConnectionCountClient added in v1.0.3

func (p *MQTTxClientPool) GetMinConnectionCountClient() *MQTTxClient

GetMinConnectionCountClient 获取所有客户端里面连接数最少的一个

func (*MQTTxClientPool) Iterate added in v1.1.1

func (p *MQTTxClientPool) Iterate(f func(c *MQTTxClient))

Iterate 遍历所有客户端连接, 主要用于定时主动检查连接状态,如有必要进行重连操作

func (*MQTTxClientPool) Len added in v1.0.3

func (p *MQTTxClientPool) Len() int

Len 返回连接池的长度

func (*MQTTxClientPool) Remove added in v1.0.3

func (p *MQTTxClientPool) Remove(server string)

Remove 移除一个客户端连接

func (MQTTxClientPool) String added in v1.0.3

func (p MQTTxClientPool) String() string

String 序列化成字符串

type MQTTxServer

// MQTTxServer describes an MQTT server and the credentials used to connect
// to it.
type MQTTxServer struct {
	Vendor   string `json:"vendor"`    // MQTT broker software vendor
	Scheme   string `json:"scheme"`    // MQTT transport scheme: tcp/tls(ssl)/ws
	Domain   string `json:"domain"`    // MQTT server address
	IP       string `json:"ip"`        // MQTT server IP
	Port     string `json:"port"`      // MQTT server port
	Cert     Cert   `json:"cert"`      // MQTT certificate information
	ClientID string `json:"client_id"` // MQTT client ID, GID_GW101@@@deviceid
	Username string `json:"username"`  // MQTT server username
	Password string `json:"-"`         // MQTT server password; excluded from serialization for security reasons
}

MQTTxServer MQTT服务器信息

func (*MQTTxServer) GetCert

func (m *MQTTxServer) GetCert() Cert

GetCert 获取MQTT证书信息

func (*MQTTxServer) GetClientID

func (m *MQTTxServer) GetClientID() string

GetClientID 获取MQTT客户端ID

func (*MQTTxServer) GetDomain

func (m *MQTTxServer) GetDomain() string

GetDomain 获取MQTT服务器地址

func (*MQTTxServer) GetIP

func (m *MQTTxServer) GetIP() string

GetIP 获取MQTT服务器IP

func (*MQTTxServer) GetPassword

func (m *MQTTxServer) GetPassword() string

GetPassword 获取MQTT服务器密码

func (*MQTTxServer) GetPort

func (m *MQTTxServer) GetPort() string

GetPort 获取MQTT服务器端口

func (*MQTTxServer) GetScheme

func (m *MQTTxServer) GetScheme() string

GetScheme 获取MQTT协议

func (*MQTTxServer) GetUsername

func (m *MQTTxServer) GetUsername() string

GetUsername 获取MQTT服务器用户名

func (*MQTTxServer) GetVendor

func (m *MQTTxServer) GetVendor() string

GetVendor 获取MQTT服务软件厂商

func (*MQTTxServer) Server added in v1.0.4

func (m *MQTTxServer) Server() string

Server 获取MQTT服务器信息

func (*MQTTxServer) SetCert

func (m *MQTTxServer) SetCert(cert Cert)

SetCert 设置MQTT证书信息

func (*MQTTxServer) SetClientID

func (m *MQTTxServer) SetClientID(clientID string)

SetClientID 设置MQTT客户端ID

func (*MQTTxServer) SetDomain

func (m *MQTTxServer) SetDomain(domain string)

SetDomain 设置MQTT服务器地址

func (*MQTTxServer) SetIP

func (m *MQTTxServer) SetIP(ip string)

SetIP 设置MQTT服务器IP

func (*MQTTxServer) SetPassword

func (m *MQTTxServer) SetPassword(password string)

SetPassword 设置MQTT服务器密码

func (*MQTTxServer) SetPort

func (m *MQTTxServer) SetPort(port string)

SetPort 设置MQTT服务器端口

func (*MQTTxServer) SetScheme

func (m *MQTTxServer) SetScheme(scheme string)

SetScheme 设置MQTT协议

func (*MQTTxServer) SetUsername

func (m *MQTTxServer) SetUsername(username string)

SetUsername 设置MQTT服务器用户名

func (*MQTTxServer) SetVendor

func (m *MQTTxServer) SetVendor(vendor string)

SetVendor 设置MQTT服务软件厂商

func (*MQTTxServer) String

func (m *MQTTxServer) String() string

String 序列化成字符串

type TraceFunc

// TraceFunc is the signature of a pluggable log sink: a printf-style format
// string, an integer level, and the format arguments.
type TraceFunc func(format string, level int, v ...interface{})

// UserTrace is the user-supplied log sink; it is nil until one is assigned.
// NOTE(review): presumably assigned by SetUserDebug — confirm.
var UserTrace TraceFunc

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL