mqtt

package module
v0.0.0-...-e544f28 Latest Latest
Published: Dec 22, 2024 License: MIT Imports: 24 Imported by: 0

README

mqtt

Standing on the shoulders of giants

implement

https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html

mqtt.Client -> http.Transport -> http.Conn -> net.Conn

http.Server -> http.ResponseWriter -> http.Conn -> net.Conn

https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/06-WebSocket.html

P2P mode: https://help.aliyun.com/zh/apsaramq-for-mqtt/cloud-message-queue-mqtt-upgraded/developer-reference/features-p2p-messaging-model?spm=a2c4g.11186623.0.0.44ff2ab0JCP9Sa#concept-96176-zh

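Packet Injection

If you want more control on the server side, or want to set specific MQTT v5 properties and other values, you can create your own publish packets from a client of your choice. This method lets you inject MQTT packets directly into the running server, as though the server had received them from that client.

Packet injection can be used for any MQTT packet, including ping requests, subscriptions, and so on. Because client details are accessible, you can even publish a packet from the server on behalf of a connected client.

Most of the time you will want to use the built-in Inline Client, because it has unique privileges: it bypasses all ACL and topic validation checks, which means it can even publish to $SYS topics. You can also create an inline client from scratch; it behaves the same as the built-in inline client.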

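// Create a client with no network connection; the final argument marks it as inline.
cl := server.NewClient(nil, "local", "inline", true)
// Inject a PUBLISH packet as though it had been received from cl.
server.InjectPacket(cl, packets.Packet{
  FixedHeader: packets.FixedHeader{
    Type: packets.Publish,
  },
  TopicName: "direct/publish",
  Payload: []byte("scheduled message"),
})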

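MQTT packets still need to be correctly structured, so refer to the packet definitions in the test cases and to the MQTTv5 specification for help.

See the hooks example for details on usage.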

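Testing

Unit Tests
Paho Interoperability Test

You can check the broker against the Paho Interoperability Test by starting the server with examples/paho/main.go and then running python3 client_test5.py from the interoperability folder. This covers both the MQTT v5 and v3 tests.

Note that the paho test suite has a number of unresolved issues, so certain compatibility modes are enabled in the paho/main.go example.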

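Performance Benchmarks

Mochi MQTT's performance is comparable to that of other popular MQTT brokers such as Mosquitto and EMQX.

The benchmarks were run with MQTT-Stresser on an Apple Macbook Air M2, using the default settings of cmd/main.go. Given the bursts of high and low throughput, the median scores are the most useful. Higher is better.

The values presented in the benchmarks do not represent true messages-per-second throughput. They rely on an unusual calculation by mqtt-stresser, but they are consistent across all brokers. The results are provided for reference only. All comparisons use out-of-the-box default configurations.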

mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000

| Broker | publish fastest | publish median | publish slowest | receive fastest | receive median | receive slowest |
| --- | --- | --- | --- | --- | --- | --- |
| Mochi v2.2.10 | 124,772 | 125,456 | 124,614 | 314,461 | 313,186 | 311,910 |
| Mosquitto v2.0.15 | 155,920 | 155,919 | 155,918 | 185,485 | 185,097 | 184,709 |
| EMQX v5.0.11 | 156,945 | 156,257 | 155,568 | 17,918 | 17,783 | 17,649 |
| Rumqtt v0.21.0 | 112,208 | 108,480 | 104,753 | 135,784 | 126,446 | 117,108 |

mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000

| Broker | publish fastest | publish median | publish slowest | receive fastest | receive median | receive slowest |
| --- | --- | --- | --- | --- | --- | --- |
| Mochi v2.2.10 | 41,825 | 31,663 | 23,008 | 144,058 | 65,903 | 37,618 |
| Mosquitto v2.0.15 | 42,729 | 38,633 | 29,879 | 23,241 | 19,714 | 18,806 |
| EMQX v5.0.11 | 21,553 | 17,418 | 14,356 | 4,257 | 3,980 | 3,756 |
| Rumqtt v0.21.0 | 42,213 | 23,153 | 20,814 | 49,465 | 36,626 | 19,283 |
| mqtt-server[v0.0.1] | 24,374 | 16,040 | 10,856 | 71,731 | 32,081 | 15,785 |
| mqtt-server[v0.0.2] - buffer | 38,435 | 23,407 | 30,615 | 40,351 | 27,848 | 22,182 |
| mqtt-server-buffer/sync.Map | 14,191 | 2,912 | 1,180 | 89,528 | 5,605 | 1,468 |

Million Message Challenge (send 1 million messages to the server at once):

mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000

| Broker | publish fastest | publish median | publish slowest | receive fastest | receive median | receive slowest |
| --- | --- | --- | --- | --- | --- | --- |
| Mochi v2.2.10 | 13,532 | 4,425 | 2,344 | 52,120 | 7,274 | 2,701 |
| Mosquitto v2.0.15 | 3,826 | 3,395 | 3,032 | 1,200 | 1,150 | 1,118 |
| EMQX v5.0.11 | 4,086 | 2,432 | 2,274 | 434 | 333 | 311 |
| Rumqtt v0.21.0 | 78,972 | 5,047 | 3,804 | 4,286 | 3,249 | 2,027 |
| mqtt-server | 75,974 | 6,059 | 1,685 | 24,711 | 2,724 | 1,819 |
| mqtt-server-buffer/sync.Map | 14,191 | 2,912 | 1,180 | 89,528 | 5,605 | 1,468 |
| mqtt-server-linux-server | 21,724 | 3,379 | 2,306 | 2,608 | 2,107 | 1,454 |

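It is not yet clear whether something went wrong with EMQX here. The Docker out-of-the-box configuration may not be optimal, so take these numbers with a pinch of salt, because we know for a fact that it is a solid piece of software.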

Documentation

Index

Constants

const (
	RESERVED    byte = 0x0
	CONNECT     byte = 0x1
	CONNACK     byte = 0x2
	PUBLISH     byte = 0x3
	PUBACK      byte = 0x4
	PUBREC      byte = 0x5
	PUBREL      byte = 0x6
	PUBCOMP     byte = 0x7
	SUBSCRIBE   byte = 0x8
	SUBACK      byte = 0x9
	UNSUBSCRIBE byte = 0xA
	UNSUBACK    byte = 0xB
	PINGREQ     byte = 0xC
	PINGRESP    byte = 0xD
	DISCONNECT  byte = 0xE
	AUTH        byte = 0xF
)

Control packet types. Position: byte 1, bits 7-4
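As a small illustration of "byte 1, bits 7-4", the sketch below splits the first byte of a fixed header into the control packet type and the flag bits. The helper name splitFixedHeader is hypothetical and not part of this package; the two constants are copied from the list above.

```go
package main

import "fmt"

// A subset of the control packet type constants listed above.
const (
	CONNECT byte = 0x1
	PUBLISH byte = 0x3
)

// splitFixedHeader is a hypothetical helper. It splits the first byte of an
// MQTT fixed header into the control packet type (bits 7-4) and the
// type-specific flags (bits 3-0).
func splitFixedHeader(first byte) (kind, flags byte) {
	return first >> 4, first & 0x0F
}

func main() {
	// 0x32 = 0011 0010: packet type 3 (PUBLISH) with flag bits 0010 (QoS 1).
	kind, flags := splitFixedHeader(0x32)
	fmt.Printf("type=%d flags=%04b isPublish=%t\n", kind, flags, kind == PUBLISH)
}
```

Shifting right by four discards the flag nibble, so the result can be compared directly against the constants.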

const KB = 1024 * 1
const MB = 1024 * KB

Variables

var CONFIG = &config{
	Auth: map[string]string{
		"":     "",
		"root": "admin",
	},
}
var ErrAbortHandler = errors.New("mqtt: abort Handler")

ErrAbortHandler is a sentinel panic value to abort a handler. While any panic from ServeMQTT aborts the response to the client, panicking with ErrAbortHandler also suppresses logging of a stack trace to the server's error log.
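The sentinel-panic pattern described here can be sketched as follows. This is a minimal, self-contained illustration and not this package's implementation: errAbortHandler is a local stand-in for ErrAbortHandler, and runHandler is a hypothetical wrapper around a handler call.

```go
package main

import (
	"errors"
	"fmt"
)

// errAbortHandler is a local stand-in for the package's ErrAbortHandler.
var errAbortHandler = errors.New("mqtt: abort Handler")

// runHandler (hypothetical) calls h and recovers from any panic. It reports
// whether h panicked and whether the panic was logged. A panic carrying
// errAbortHandler aborts the handler quietly, without a log entry.
func runHandler(h func()) (panicked, logged bool) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		panicked = true
		if err, ok := r.(error); ok && errors.Is(err, errAbortHandler) {
			return // sentinel value: suppress logging
		}
		logged = true
		fmt.Println("handler panic:", r)
	}()
	h()
	return
}

func main() {
	fmt.Println(runHandler(func() { panic(errAbortHandler) }))
	fmt.Println(runHandler(func() { panic("boom") }))
}
```

A handler that wants to drop the current exchange without adding noise to the log would panic with the sentinel rather than with an arbitrary value.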

var ErrServerClosed = errors.New("mqtt: Server closed")

ErrServerClosed is returned by the Server.Serve, Server.ServeTLS, Server.ListenAndServe, and Server.ListenAndServeTLS methods after a call to Server.Shutdown.
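A caller usually wants to treat ErrServerClosed as a clean exit rather than a failure. The sketch below shows that check with errors.Is. Because the import path of this module is not given here, fakeServer is a hypothetical stand-in whose ListenAndServe and Shutdown only mimic the documented behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errServerClosed is a local stand-in for the package's ErrServerClosed.
var errServerClosed = errors.New("mqtt: Server closed")

// fakeServer is a hypothetical stand-in for Server: ListenAndServe blocks
// until Shutdown is called and then returns errServerClosed.
type fakeServer struct{ done chan struct{} }

func (s *fakeServer) ListenAndServe() error {
	<-s.done
	return errServerClosed
}

func (s *fakeServer) Shutdown(ctx context.Context) error {
	close(s.done)
	return nil
}

// serve runs the server and maps the shutdown sentinel to a nil error.
func serve(s *fakeServer) error {
	if err := s.ListenAndServe(); err != nil && !errors.Is(err, errServerClosed) {
		return err
	}
	return nil
}

// runAndShutdown starts serve in a goroutine, shuts the server down, and
// returns whatever serve reported.
func runAndShutdown() error {
	s := &fakeServer{done: make(chan struct{})}
	errc := make(chan error, 1)
	go func() { errc <- serve(s) }()
	if err := s.Shutdown(context.Background()); err != nil {
		return err
	}
	return <-errc
}

func main() {
	fmt.Println("serve returned:", runAndShutdown())
}
```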

var Kind = map[byte]string{
	0x0: "[0x0]RESERVED",
	0x1: "[0x1]CONNECT",
	0x2: "[0x2]CONNACK",
	0x3: "[0x3]PUBLISH",
	0x4: "[0x4]PUBACK",
	0x5: "[0x5]PUBREC",
	0x6: "[0x6]PUBREL",
	0x7: "[0x7]PUBCOMP",
	0x8: "[0x8]SUBSCRIBE",
	0x9: "[0x9]SUBACK",
	0xA: "[0xA]UNSUBSCRIBE",
	0xB: "[0xB]UNSUBACK",
	0xC: "[0xC]PINGREQ",
	0xD: "[0xD]PINGRESP",
	0xE: "[0xE]DISCONNECT",
	0xF: "[0xF]AUTH",
}
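One plausible use of this map is labelling packets in log output. The sketch below looks up the label from the first fixed-header byte and falls back to a generic label for missing keys. The kind map here is a trimmed local copy, and kindName is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// kind is a trimmed local copy of the Kind map shown above.
var kind = map[byte]string{
	0x3: "[0x3]PUBLISH",
	0xC: "[0xC]PINGREQ",
}

// kindName (hypothetical) returns the label for the packet whose fixed
// header starts with first, or a fallback label when the type is not in
// the map.
func kindName(first byte) string {
	t := first >> 4 // control packet type lives in bits 7-4
	if name, ok := kind[t]; ok {
		return name
	}
	return fmt.Sprintf("[0x%X]UNKNOWN", t)
}

func main() {
	fmt.Println(kindName(0x30), kindName(0xC0), kindName(0x10))
}
```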

Functions

func Fedstart

func Fedstart(ctx context.Context, listen string, join string) error

func Httpd

func Httpd() error

func ServerLog

func ServerLog(ctx context.Context, stat *requests.Stat)

Types

type Client

type Client struct {
	// URL specifies either the URI being requested (for server requests) or the URL to access (for client requests).
	//
	// For server requests, the URL is parsed from the URI supplied on the Request-Line as stored in RequestURI.
	// For most requests, fields other than Path and RawQuery will be empty. (See RFC 7230, Section 5.3)
	//
	// For client requests, the URL's Host specifies the server to
	// connect to, while the Request's Host field optionally
	// specifies the Host header value to send in the MQTT request.
	URL *url.URL

	// DialContext specifies the dial function for creating unencrypted TCP connections.
	// If DialContext is nil (and the deprecated Dial below is also nil), then the transport dials using package net.
	//
	// DialContext runs concurrently with calls to RoundTrip.
	// A RoundTrip call that initiates a dial may end up using
	// a connection dialed previously when the earlier connection
	// becomes idle before the later DialContext completes.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLSContext specifies an optional dial function for creating TLS connections for non-proxied HTTPS requests.
	//
	// If DialTLSContext is nil (and the deprecated DialTLS below is also nil), DialContext and TLSClientConfig are used.
	//
	// If DialTLSContext is set, the Dial and DialContext hooks are not used for HTTPS
	// requests and the TLSClientConfig and TLSHandshakeTimeout are ignored.
	// The returned net.Conn is assumed to already be past the TLS handshake.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	// If non-nil, HTTP/2 support may not be enabled by default.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout specifies the maximum amount of time to wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// Timeout specifies a time limit for requests made by this Client.
	// The timeout includes connection time, any redirects, and reading the response body.
	// The timer remains running after Get, Head, Post, or Do return and will interrupt reading of the Response.Body.
	//
	// A Timeout of zero means no timeout.
	//
	// The Client cancels requests to the underlying Transport as if the Request's Context ended.
	//
	// For compatibility, the Client will also use the deprecated CancelRequest method on Transport if found.
	// New RoundTripper implementations should use the Request's Context
	// for cancellation instead of implementing CancelRequest.
	Timeout time.Duration
	// contains filtered or unexported fields
}

A Client is an MQTT client. Create one with New.

A Client typically has internal state (cached TCP connections), so Clients should be reused instead of created as needed. Clients are safe for concurrent use by multiple goroutines.

func New

func New(opts ...Option) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

func (*Client) ConnectAndSubscribe

func (c *Client) ConnectAndSubscribe(ctx context.Context) error

func (*Client) ID

func (c *Client) ID() string

func (*Client) OnMessage

func (c *Client) OnMessage(fn func(*packet.Message))

func (*Client) RoundTrip

func (c *Client) RoundTrip(req packet.Packet) (packet.Packet, error)

RoundTrip sends the packet req and returns the packet received in response.

The error types returned by RoundTrip are unspecified.

func (*Client) ServeMessage

func (c *Client) ServeMessage(ctx context.Context) error

func (*Client) ServeMessageLoop

func (c *Client) ServeMessageLoop(ctx context.Context) error

func (*Client) SubmitMessage

func (c *Client) SubmitMessage(message *packet.Message) error

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context) error

type ConnState

type ConnState int

A ConnState represents the state of a client connection to a server. It's used by the optional Server.ConnState hook.

const (
	// StateNew represents a new connection that is expected to
	// send a request immediately. Connections begin at this
	// state and then transition to either StateActive or
	// StateClosed.
	StateNew ConnState = iota

	// StateActive represents a connection that has read 1 or more
	// bytes of a request. The Server.ConnState hook for
	// StateActive fires before the request has entered a handler
	// and doesn't fire again until the request has been
	// handled. After the request is handled, the state
	// transitions to StateClosed, StateHijacked, or StateIdle.
	// For HTTP/2, StateActive fires on the transition from zero
	// to one active request, and only transitions away once all
	// active requests are complete. That means that ConnState
	// cannot be used to do per-request work; ConnState only notes
	// the overall state of the connection.
	StateActive

	// StateIdle represents a connection that has finished
	// handling a request and is in the keep-alive state, waiting
	// for a new request. Connections transition from StateIdle
	// to either StateActive or StateClosed.
	StateIdle

	// StateHijacked represents a hijacked connection.
	// This is a terminal state. It does not transition to StateClosed.
	StateHijacked

	// StateClosed represents a closed connection.
	// This is a terminal state. Hijacked connections do not
	// transition to StateClosed.
	StateClosed
)
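A ConnState hook is typically used to keep a view of open connections. The sketch below counts them from the state transitions. The ConnState type and constants are redeclared locally so the example is self-contained, tracker is a hypothetical type, and the assumption that the server reports StateNew first and a terminal state last comes from the comments above rather than from the implementation.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// ConnState and its constants are redeclared locally, mirroring the list above.
type ConnState int

const (
	StateNew ConnState = iota
	StateActive
	StateIdle
	StateHijacked
	StateClosed
)

// tracker (hypothetical) counts open connections from ConnState transitions.
type tracker struct {
	mu   sync.Mutex
	open map[net.Conn]struct{}
}

// hook has the same signature as the Server.ConnState field.
func (t *tracker) hook(c net.Conn, s ConnState) {
	t.mu.Lock()
	defer t.mu.Unlock()
	switch s {
	case StateNew:
		t.open[c] = struct{}{}
	case StateHijacked, StateClosed: // terminal states
		delete(t.open, c)
	}
}

func (t *tracker) count() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.open)
}

// demo drives the hook by hand with two in-memory connections.
func demo() int {
	t := &tracker{open: make(map[net.Conn]struct{})}
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()
	t.hook(a, StateNew)
	t.hook(b, StateNew)
	t.hook(a, StateActive)
	t.hook(a, StateClosed)
	return t.count()
}

func main() {
	fmt.Println("open connections:", demo())
}
```

With the real package, the hook would be assigned to the ConnState field of a Server instead of being called by hand.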

type Endpoint

type Endpoint struct {
	// contains filtered or unexported fields
}

Endpoint does not provide transactional capabilities; transactions must be handled by a higher-level protocol.

func (*Endpoint) List

func (e *Endpoint) List() map[string]string

func (*Endpoint) Ping

func (e *Endpoint) Ping()

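Ping has a known issue: if a node exits and immediately rejoins the cluster, the stale entry for the previous node may not have been cleaned up yet. The current workaround is for the node to sleep for 5 seconds before rejoining the cluster.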

func (*Endpoint) Send

func (e *Endpoint) Send(content []byte) error

type Handler

type Handler interface {
	ServeMQTT(ResponseWriter, packet.Packet)
}

A Handler responds to an MQTT request.

type HandlerFunc

type HandlerFunc func(ResponseWriter, packet.Packet)

func (HandlerFunc) ServeMQTT

func (f HandlerFunc) ServeMQTT(rw ResponseWriter, r packet.Packet)
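HandlerFunc follows the same adapter idea as http.HandlerFunc: it lets an ordinary function satisfy Handler. The sketch below redeclares simplified versions of the types so that it runs on its own. The Packet interface with a Kind method, the pingReq and pingResp types, and the recorder are all hypothetical stand-ins; the real packet.Packet is not documented here.

```go
package main

import "fmt"

// Packet is a simplified, hypothetical stand-in for packet.Packet.
type Packet interface{ Kind() byte }

// ResponseWriter, Handler, and HandlerFunc mirror the declarations above.
type ResponseWriter interface {
	OnSend(request Packet) error
}

type Handler interface {
	ServeMQTT(ResponseWriter, Packet)
}

type HandlerFunc func(ResponseWriter, Packet)

// ServeMQTT calls f(rw, r), which is what makes a plain function a Handler.
func (f HandlerFunc) ServeMQTT(rw ResponseWriter, r Packet) { f(rw, r) }

type pingReq struct{}

func (pingReq) Kind() byte { return 0xC }

type pingResp struct{}

func (pingResp) Kind() byte { return 0xD }

// recorder is a ResponseWriter that remembers every packet sent through it.
type recorder struct{ sent []Packet }

func (r *recorder) OnSend(p Packet) error {
	r.sent = append(r.sent, p)
	return nil
}

// pong answers every PINGREQ with a PINGRESP and ignores everything else.
var pong Handler = HandlerFunc(func(rw ResponseWriter, r Packet) {
	if r.Kind() == 0xC {
		_ = rw.OnSend(pingResp{})
	}
})

func main() {
	rec := &recorder{}
	pong.ServeMQTT(rec, pingReq{})
	fmt.Printf("sent %d packet(s), first kind %#x\n", len(rec.sent), rec.sent[0].Kind())
}
```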

type InFight

type InFight struct {
	// contains filtered or unexported fields
}

func (*InFight) Get

func (i *InFight) Get(id uint16) (*packet.PUBLISH, bool)

func (*InFight) Put

func (i *InFight) Put(pkt *packet.PUBLISH) bool
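The Get and Put signatures suggest a store of PUBLISH packets keyed by packet identifier, as used for QoS 1 and QoS 2 delivery. The sketch below is one way such a store could look. The publish type is a hypothetical stand-in for packet.PUBLISH, and the meaning of Put's boolean result (false when the identifier is already tracked) is an assumption of this sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// publish is a hypothetical stand-in for packet.PUBLISH.
type publish struct {
	PacketID uint16
	Payload  []byte
}

// inflight tracks PUBLISH packets that still await acknowledgement.
type inflight struct {
	mu   sync.Mutex
	pkts map[uint16]*publish
}

func newInflight() *inflight {
	return &inflight{pkts: make(map[uint16]*publish)}
}

// Put stores pkt under its packet identifier. Returning false when the
// identifier is already tracked is an assumption of this sketch.
func (i *inflight) Put(pkt *publish) bool {
	i.mu.Lock()
	defer i.mu.Unlock()
	if _, exists := i.pkts[pkt.PacketID]; exists {
		return false
	}
	i.pkts[pkt.PacketID] = pkt
	return true
}

// Get returns the packet stored under id, if any.
func (i *inflight) Get(id uint16) (*publish, bool) {
	i.mu.Lock()
	defer i.mu.Unlock()
	pkt, ok := i.pkts[id]
	return pkt, ok
}

func main() {
	store := newInflight()
	fmt.Println(store.Put(&publish{PacketID: 7, Payload: []byte("hi")}))
	fmt.Println(store.Put(&publish{PacketID: 7}))
	pkt, ok := store.Get(7)
	fmt.Println(ok, string(pkt.Payload))
}
```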

type Listen

type Listen struct {
	URL      string `yaml:"url"`
	CertFile string `yaml:"certFile"`
	KeyFile  string `yaml:"keyFile"`
}
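How a Listen entry maps to a listener is not documented. As a hedged sketch, the code below parses the URL field with net/url and decides whether TLS material is required. The scheme names "tcp" and "tls" and the helper listenTarget are assumptions for illustration; only the tcp://localhost:1883 form appears in this document (in the mqtt-stresser commands).

```go
package main

import (
	"fmt"
	"net/url"
)

// Listen mirrors the struct above, without the yaml tags.
type Listen struct {
	URL      string
	CertFile string
	KeyFile  string
}

// listenTarget (hypothetical) returns the address to listen on and whether
// TLS should be used. The "tcp" and "tls" schemes are assumptions.
func listenTarget(l Listen) (string, bool, error) {
	u, err := url.Parse(l.URL)
	if err != nil {
		return "", false, err
	}
	switch u.Scheme {
	case "tcp":
		return u.Host, false, nil
	case "tls":
		if l.CertFile == "" || l.KeyFile == "" {
			return "", false, fmt.Errorf("listen %q: certFile and keyFile are required", l.URL)
		}
		return u.Host, true, nil
	default:
		return "", false, fmt.Errorf("listen %q: unsupported scheme %q", l.URL, u.Scheme)
	}
}

func main() {
	addr, secure, err := listenTarget(Listen{URL: "tcp://localhost:1883"})
	fmt.Println(addr, secure, err)
}
```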

type MemorySubscribed

type MemorySubscribed struct {
	// contains filtered or unexported fields
}

func NewMemorySubscribed

func NewMemorySubscribed(s *Server) *MemorySubscribed

func (*MemorySubscribed) CleanEmptyTopic

func (m *MemorySubscribed) CleanEmptyTopic()

func (*MemorySubscribed) Print

func (m *MemorySubscribed) Print()

func (*MemorySubscribed) Publish

func (m *MemorySubscribed) Publish(message *packet.Message) error

Publish publishes a message. If the topic is new, the subscription lists of the existing connections must additionally be built for it.

func (*MemorySubscribed) Subscribe

func (m *MemorySubscribed) Subscribe(c *conn)

func (*MemorySubscribed) Unsubscribe

func (m *MemorySubscribed) Unsubscribe(c *conn)

type Option

type Option func(*Options)

func Subscription

func Subscription(subscription ...packet.Subscription) Option

func URL

func URL(url string) Option

func Version

func Version[T ~string | ~byte](version T) Option

type Options

type Options struct {
	URL           string // client used
	ClientID      string
	Version       byte
	Subscriptions []packet.Subscription
}
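Option and Options follow the functional options pattern: each Option is a function that mutates an Options value, and a constructor applies them in order. The sketch below reproduces the pattern with local types. The topicSub type stands in for packet.Subscription, newOptions is a hypothetical helper, and the default URL is an assumption, not this package's default.

```go
package main

import "fmt"

// topicSub is a hypothetical stand-in for packet.Subscription.
type topicSub struct{ TopicFilter string }

// Options and Option mirror the declarations above (trimmed).
type Options struct {
	URL           string
	Subscriptions []topicSub
}

type Option func(*Options)

// URL sets the broker URL.
func URL(url string) Option {
	return func(o *Options) { o.URL = url }
}

// Subscription appends one or more subscriptions.
func Subscription(subs ...topicSub) Option {
	return func(o *Options) { o.Subscriptions = append(o.Subscriptions, subs...) }
}

// newOptions (hypothetical) starts from defaults and applies each option in
// order. The default URL is an assumption made for this sketch.
func newOptions(opts ...Option) *Options {
	o := &Options{URL: "tcp://localhost:1883"}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := newOptions(
		URL("tcp://broker.example:1883"),
		Subscription(topicSub{"sensors/#"}, topicSub{"alerts/+"}),
	)
	fmt.Println(o.URL, len(o.Subscriptions))
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.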

type ResponseWriter

type ResponseWriter interface {
	OnSend(request packet.Packet) error
}

type Server

type Server struct {
	Handler          Handler
	WebsocketHandler websocket.Handler

	// TLSConfig optionally provides a TLS configuration for use
	// by ServeTLS and ListenAndServeTLS. Note that this value is
	// cloned by ServeTLS and ListenAndServeTLS, so it's not
	// possible to modify the configuration with methods like
	// tls.Config.SetSessionTicketKeys. To use
	// SetSessionTicketKeys, use Server.Serve with a TLS Listener
	// instead.
	TLSConfig *tls.Config

	// ConnState specifies an optional callback function that is
	// called when a client connection changes state. See the
	// ConnState type and associated constants for details.
	ConnState func(net.Conn, ConnState)

	// ConnContext optionally specifies a function that modifies
	// the context used for a new connection c. The provided ctx
	// is derived from the base context and has a ServerContextKey
	// value.
	ConnContext func(ctx context.Context, c net.Conn) context.Context
	// contains filtered or unexported fields
}

A Server defines parameters for running an MQTT server. The zero value for Server is a valid configuration.

func NewServer

func NewServer(ctx context.Context) *Server

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(opts ...Option) error

func (*Server) ListenAndServeTLS

func (s *Server) ListenAndServeTLS(certFile, keyFile string, opts ...Option) error

func (*Server) ListenAndServeWebsocket

func (s *Server) ListenAndServeWebsocket(opts ...Option) error

ListenAndServeWebsocket TODO

func (*Server) Serve

func (s *Server) Serve(l net.Listener) error

Serve accepts incoming connections on the Listener l, creating a new service goroutine for each. The service goroutines read requests and then call s.Handler to reply to them.

Serve always returns a non-nil error and closes l. After Server.Shutdown, the returned error is ErrServerClosed.

func (*Server) ServeTLS

func (s *Server) ServeTLS(l net.Listener, certFile, keyFile string) error

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

type Stat

type Stat struct {
	Uptime            prometheus.Counter
	ActiveConnections prometheus.Gauge
	PacketReceived    prometheus.Counter
	ByteReceived      prometheus.Counter
	PacketSent        prometheus.Counter
	ByteSent          prometheus.Counter
}

func (*Stat) RefreshUptime

func (s *Stat) RefreshUptime()

func (*Stat) Register

func (s *Stat) Register()

type TopicSubscribed

type TopicSubscribed struct {
	TopicName string
	// contains filtered or unexported fields
}

TopicSubscribed stores the clients that are subscribed to a given topic.

func NewTopicSubscribed

func NewTopicSubscribed(topicName string) *TopicSubscribed

func (*TopicSubscribed) Exchange

func (s *TopicSubscribed) Exchange(message *packet.Message) error

func (*TopicSubscribed) Len

func (s *TopicSubscribed) Len() int

func (*TopicSubscribed) Subscribe

func (s *TopicSubscribed) Subscribe(c *conn)

func (*TopicSubscribed) Unsubscribe

func (s *TopicSubscribed) Unsubscribe(c *conn) int
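The method set above suggests a per-topic subscriber set with fan-out delivery. The sketch below shows one possible shape. The subscriber type is a hypothetical stand-in for the unexported conn, messages are plain strings instead of *packet.Message, and treating Unsubscribe's int result as the number of remaining subscribers is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical stand-in for the package's unexported conn.
type subscriber struct {
	id    string
	inbox []string
}

// topicSubscribed holds the set of subscribers of one topic.
type topicSubscribed struct {
	TopicName string
	mu        sync.Mutex
	subs      map[*subscriber]struct{}
}

func newTopicSubscribed(topicName string) *topicSubscribed {
	return &topicSubscribed{TopicName: topicName, subs: make(map[*subscriber]struct{})}
}

// Subscribe adds c to the topic; adding the same subscriber twice is a no-op.
func (s *topicSubscribed) Subscribe(c *subscriber) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.subs[c] = struct{}{}
}

// Unsubscribe removes c and returns how many subscribers remain (assumed
// meaning of the int result).
func (s *topicSubscribed) Unsubscribe(c *subscriber) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.subs, c)
	return len(s.subs)
}

// Len returns the current number of subscribers.
func (s *topicSubscribed) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.subs)
}

// Exchange delivers message to every current subscriber.
func (s *topicSubscribed) Exchange(message string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for c := range s.subs {
		c.inbox = append(c.inbox, message)
	}
	return nil
}

func main() {
	topic := newTopicSubscribed("direct/publish")
	a, b := &subscriber{id: "a"}, &subscriber{id: "b"}
	topic.Subscribe(a)
	topic.Subscribe(b)
	_ = topic.Exchange("hello")
	fmt.Println(topic.Unsubscribe(b), len(a.inbox), len(b.inbox))
}
```

A zero result from Unsubscribe would tell the owner that the topic has become empty, which fits the CleanEmptyTopic method on MemorySubscribed, although that link is a guess.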

Directories

Path Synopsis
cmd
