Documentation ¶
Index ¶
- Constants
- Variables
- func Fedstart(ctx context.Context, listen string, join string) error
- func Httpd() error
- func ServerLog(ctx context.Context, stat *requests.Stat)
- type Client
- func (c *Client) Close() error
- func (c *Client) Connect(ctx context.Context) error
- func (c *Client) ConnectAndSubscribe(ctx context.Context) error
- func (c *Client) ID() string
- func (c *Client) OnMessage(fn func(*packet.Message))
- func (c *Client) RoundTrip(req packet.Packet) (packet.Packet, error)
- func (c *Client) ServeMessage(ctx context.Context) error
- func (c *Client) ServeMessageLoop(ctx context.Context) error
- func (c *Client) SubmitMessage(message *packet.Message) error
- func (c *Client) Subscribe(ctx context.Context) error
- type ConnState
- type Endpoint
- type Handler
- type HandlerFunc
- type InFight
- type Listen
- type MemorySubscribed
- type Option
- type Options
- type ResponseWriter
- type Server
- func (s *Server) ListenAndServe(opts ...Option) error
- func (s *Server) ListenAndServeTLS(certFile, keyFile string, opts ...Option) error
- func (s *Server) ListenAndServeWebsocket(opts ...Option) error
- func (s *Server) Serve(l net.Listener) error
- func (s *Server) ServeTLS(l net.Listener, certFile, keyFile string) error
- func (s *Server) Shutdown(ctx context.Context) error
- type Stat
- type TopicSubscribed
Constants ¶
const (
	RESERVED    byte = 0x0
	CONNECT     byte = 0x1
	CONNACK     byte = 0x2
	PUBLISH     byte = 0x3
	PUBACK      byte = 0x4
	PUBREC      byte = 0x5
	PUBREL      byte = 0x6
	PUBCOMP     byte = 0x7
	SUBSCRIBE   byte = 0x8
	SUBACK      byte = 0x9
	UNSUBSCRIBE byte = 0xA
	UNSUBACK    byte = 0xB
	PINGREQ     byte = 0xC
	PINGRESP    byte = 0xD
	DISCONNECT  byte = 0xE
	AUTH        byte = 0xF
)
Control packet types. Position: byte 1, bits 7-4
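Because the packet type occupies bits 7-4 of the first header byte, it can be recovered with a right shift, and the remaining low nibble holds the per-packet flags. The following is a minimal sketch of that split; `packetType` and `packetFlags` are hypothetical helper names that are not part of this package, and the `PUBLISH` constant is redeclared locally only to keep the example self-contained.

```go
package main

import "fmt"

// PUBLISH mirrors the package constant of the same name (0x3).
const PUBLISH byte = 0x3

// packetType returns the control packet type stored in bits 7-4
// of the first fixed-header byte. Hypothetical helper.
func packetType(first byte) byte { return first >> 4 }

// packetFlags returns the flag bits stored in bits 3-0 of the
// first fixed-header byte. Hypothetical helper.
func packetFlags(first byte) byte { return first & 0x0F }

func main() {
	first := byte(0x32) // high nibble 0x3 (PUBLISH), low nibble 0x2 (flags)
	fmt.Printf("type=0x%X flags=0x%X publish=%v\n",
		packetType(first), packetFlags(first), packetType(first) == PUBLISH)
}
```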
const KB = 1024 * 1
const MB = 1024 * KB
Variables ¶
var CONFIG = &config{
	Auth: map[string]string{
		"":     "",
		"root": "admin",
	},
}
var ErrAbortHandler = errors.New("mqtt: abort Handler")
ErrAbortHandler is a sentinel panic value to abort a handler. While any panic from ServeMQTT aborts the response to the client, panicking with ErrAbortHandler also suppresses logging of a stack trace to the server's error log.
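The sentinel-panic behavior described above can be illustrated with a small recover wrapper. This is only a sketch of the general pattern, assuming the server recovers handler panics in roughly this way; `runHandler` is a hypothetical function, and `errAbortHandler` is a local stand-in for the package's ErrAbortHandler.

```go
package main

import (
	"errors"
	"fmt"
)

// errAbortHandler is a local stand-in for the package's ErrAbortHandler.
var errAbortHandler = errors.New("mqtt: abort Handler")

// runHandler calls h and recovers from any panic. It reports whether h
// panicked and whether that panic should be logged. Hypothetical helper:
// every panic aborts the handler, but the sentinel suppresses logging.
func runHandler(h func()) (panicked, shouldLog bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
			err, ok := r.(error)
			shouldLog = !(ok && errors.Is(err, errAbortHandler))
		}
	}()
	h()
	return false, false
}

func main() {
	panicked, shouldLog := runHandler(func() { panic(errAbortHandler) })
	fmt.Println("sentinel:", panicked, shouldLog)

	panicked, shouldLog = runHandler(func() { panic("boom") })
	fmt.Println("other:", panicked, shouldLog)
}
```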
var ErrServerClosed = errors.New("mqtt: Server closed")
ErrServerClosed is returned by the Server.Serve, [ServeTLS], [ListenAndServe], and [ListenAndServeTLS] methods after a call to Server.Shutdown or [Server.Close].
var Kind = map[byte]string{
0x0: "[0x0]RESERVED",
0x1: "[0x1]CONNECT",
0x2: "[0x2]CONNACK",
0x3: "[0x3]PUBLISH",
0x4: "[0x4]PUBACK",
0x5: "[0x5]PUBREC",
0x6: "[0x6]PUBREL",
0x7: "[0x7]PUBCOMP",
0x8: "[0x8]SUBSCRIBE",
0x9: "[0x9]SUBACK",
0xA: "[0xA]UNSUBSCRIBE",
0xB: "[0xB]UNSUBACK",
0xC: "[0xC]PINGREQ",
0xD: "[0xD]PINGRESP",
0xE: "[0xE]DISCONNECT",
0xF: "[0xF]AUTH",
}
Functions ¶
Types ¶
type Client ¶
type Client struct {
	// URL specifies either the URI being requested (for server requests) or the URL to access (for client requests).
	//
	// For server requests, the URL is parsed from the URI supplied on the Request-Line as stored in RequestURI.
	// For most requests, fields other than Path and RawQuery will be empty. (See RFC 7230, Section 5.3)
	//
	// For client requests, the URL's Host specifies the server to
	// connect to, while the Request's Host field optionally
	// specifies the Host header value to send in the MQTT request.
	URL *url.URL

	// DialContext specifies the dial function for creating unencrypted TCP connections.
	// If DialContext is nil (and the deprecated Dial below is also nil), then the transport dials using package net.
	//
	// DialContext runs concurrently with calls to RoundTrip.
	// A RoundTrip call that initiates a dial may end up using
	// a connection dialed previously when the earlier connection
	// becomes idle before the later DialContext completes.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLSContext specifies an optional dial function for creating TLS connections for non-proxied HTTPS requests.
	//
	// If DialTLSContext is nil (and the deprecated DialTLS below is also nil), DialContext and TLSClientConfig are used.
	//
	// If DialTLSContext is set, the Dial and DialContext hooks are not used for HTTPS
	// requests and the TLSClientConfig and TLSHandshakeTimeout are ignored.
	// The returned net.Conn is assumed to already be past the TLS handshake.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	// If non-nil, HTTP/2 support may not be enabled by default.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout specifies the maximum amount of time to wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// Timeout specifies a time limit for requests made by this Client.
	// The timeout includes connection time, any redirects, and reading the response body.
	// The timer remains running after Get, Head, Post, or Do return and will interrupt reading of the Response.Body.
	//
	// A Timeout of zero means no timeout.
	//
	// The Client cancels requests to the underlying Transport as if the Request's Context ended.
	//
	// For compatibility, the Client will also use the deprecated CancelRequest method on Transport if found.
	// New RoundTripper implementations should use the Request's Context
	// for cancellation instead of implementing CancelRequest.
	Timeout time.Duration
	// contains filtered or unexported fields
}
A Client is an MQTT client. Its zero value ([DefaultClient]) is a usable client that uses [DefaultTransport].
The [Client.Transport] typically has internal state (cached TCP connections), so Clients should be reused instead of created as needed. Clients are safe for concurrent use by multiple goroutines.
A Client is higher-level than a [RoundTripper] (such as [Transport]) and additionally handles HTTP details such as cookies and redirects.
func (*Client) ConnectAndSubscribe ¶
type ConnState ¶
type ConnState int
A ConnState represents the state of a client connection to a server. It's used by the optional [Server.ConnState] hook.
const (
	// StateNew represents a new connection that is expected to
	// send a request immediately. Connections begin at this
	// state and then transition to either StateActive or
	// StateClosed.
	StateNew ConnState = iota

	// StateActive represents a connection that has read 1 or more
	// bytes of a request. The Server.ConnState hook for
	// StateActive fires before the request has entered a handler
	// and doesn't fire again until the request has been
	// handled. After the request is handled, the state
	// transitions to StateClosed, StateHijacked, or StateIdle.
	// For HTTP/2, StateActive fires on the transition from zero
	// to one active request, and only transitions away once all
	// active requests are complete. That means that ConnState
	// cannot be used to do per-request work; ConnState only notes
	// the overall state of the connection.
	StateActive

	// StateIdle represents a connection that has finished
	// handling a request and is in the keep-alive state, waiting
	// for a new request. Connections transition from StateIdle
	// to either StateActive or StateClosed.
	StateIdle

	// StateHijacked represents a hijacked connection.
	// This is a terminal state. It does not transition to StateClosed.
	StateHijacked

	// StateClosed represents a closed connection.
	// This is a terminal state. Hijacked connections do not
	// transition to StateClosed.
	StateClosed
)
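A common use of a connection-state hook is to keep a count of open connections by removing a connection once it reaches a terminal state. The sketch below assumes the hook is invoked with the transitions described in the comments above; `tracker` and its methods are hypothetical, and the ConnState type and constants are redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// ConnState and its constants mirror the declarations documented above.
type ConnState int

const (
	StateNew ConnState = iota
	StateActive
	StateIdle
	StateHijacked
	StateClosed
)

// tracker records the last known state of every open connection.
// Hypothetical type, not part of the package.
type tracker struct {
	mu   sync.Mutex
	open map[net.Conn]ConnState
}

func newTracker() *tracker {
	return &tracker{open: make(map[net.Conn]ConnState)}
}

// hook has the same shape as the Server.ConnState field.
func (t *tracker) hook(c net.Conn, s ConnState) {
	t.mu.Lock()
	defer t.mu.Unlock()
	switch s {
	case StateHijacked, StateClosed:
		delete(t.open, c) // terminal states: stop tracking
	default:
		t.open[c] = s
	}
}

// count returns the number of connections currently tracked.
func (t *tracker) count() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.open)
}

func main() {
	a, b := net.Pipe() // in-memory connection pair, used only as map keys
	defer a.Close()
	defer b.Close()

	tr := newTracker()
	tr.hook(a, StateNew)
	tr.hook(b, StateActive)
	fmt.Println("open:", tr.count())
	tr.hook(a, StateClosed)
	tr.hook(b, StateHijacked)
	fmt.Println("open:", tr.count())
}
```

In a real program the method value `tr.hook` would be assigned to the server's ConnState field; the mutex is needed because the hook may be called from multiple goroutines.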
type Endpoint ¶
type Endpoint struct {
// contains filtered or unexported fields
}
Endpoint does not provide transaction capabilities; transactions must be handled by a higher-level protocol.
type Handler ¶
type Handler interface {
ServeMQTT(ResponseWriter, packet.Packet)
}
A Handler responds to an MQTT request.
type HandlerFunc ¶
type HandlerFunc func(ResponseWriter, packet.Packet)
func (HandlerFunc) ServeMQTT ¶
func (f HandlerFunc) ServeMQTT(rw ResponseWriter, r packet.Packet)
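HandlerFunc follows the familiar adapter pattern: a plain function with the right signature gains a ServeMQTT method and can be used wherever a Handler is expected. The sketch below assumes ServeMQTT simply calls the function, which the signature suggests but this page does not state. `Packet`, `ResponseWriter`, `recorder`, and `pingHandler` are hypothetical placeholders, since the real packet.Packet and ResponseWriter method sets are not shown here.

```go
package main

import "fmt"

// Packet and ResponseWriter are placeholders for packet.Packet and the
// package's ResponseWriter; their real method sets are assumptions.
type Packet interface{ Kind() byte }
type ResponseWriter interface{ Write(Packet) }

// Handler and HandlerFunc mirror the shapes documented above.
type Handler interface {
	ServeMQTT(ResponseWriter, Packet)
}

type HandlerFunc func(ResponseWriter, Packet)

// ServeMQTT calls f(rw, r), so a plain function satisfies Handler.
func (f HandlerFunc) ServeMQTT(rw ResponseWriter, r Packet) { f(rw, r) }

type pingReq struct{}

func (pingReq) Kind() byte { return 0xC } // PINGREQ

type pingResp struct{}

func (pingResp) Kind() byte { return 0xD } // PINGRESP

// recorder is a test double that stores every packet written to it.
type recorder struct{ sent []Packet }

func (r *recorder) Write(p Packet) { r.sent = append(r.sent, p) }

// pingHandler answers PINGREQ with PINGRESP and ignores everything else.
func pingHandler() Handler {
	return HandlerFunc(func(rw ResponseWriter, r Packet) {
		if r.Kind() == 0xC {
			rw.Write(pingResp{})
		}
	})
}

func main() {
	rec := &recorder{}
	pingHandler().ServeMQTT(rec, pingReq{})
	fmt.Printf("replies=%d kind=0x%X\n", len(rec.sent), rec.sent[0].Kind())
}
```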
type MemorySubscribed ¶
type MemorySubscribed struct {
// contains filtered or unexported fields
}
func NewMemorySubscribed ¶
func NewMemorySubscribed(s *Server) *MemorySubscribed
func (*MemorySubscribed) CleanEmptyTopic ¶
func (m *MemorySubscribed) CleanEmptyTopic()
func (*MemorySubscribed) Print ¶
func (m *MemorySubscribed) Print()
func (*MemorySubscribed) Publish ¶
func (m *MemorySubscribed) Publish(message *packet.Message) error
Publish publishes a message. If the topic is new, the subscription lists of the existing connections must additionally be built for it.
func (*MemorySubscribed) Subscribe ¶
func (m *MemorySubscribed) Subscribe(c *conn)
func (*MemorySubscribed) Unsubscribe ¶
func (m *MemorySubscribed) Unsubscribe(c *conn)
type Option ¶
type Option func(*Options)
func Subscription ¶
func Subscription(subscription ...packet.Subscription) Option
type Options ¶
type Options struct {
	URL string

	// client used
	ClientID      string
	Version       byte
	Subscriptions []packet.Subscription
}
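Option and Options together form the functional-options pattern: each Option is a function that mutates an Options value, and the variadic `opts ...Option` parameters on the Server methods apply them in order. The sketch below illustrates the pattern with local types. `WithSubscription` imitates the shape of the package's Subscription option, while `WithURL`, `newOptions`, the `TopicFilter` field, and the example URL are hypothetical and not confirmed by this page.

```go
package main

import "fmt"

// Subscription is a placeholder for packet.Subscription; its field is assumed.
type Subscription struct{ TopicFilter string }

// Options and Option mirror the shapes documented above.
type Options struct {
	URL           string
	ClientID      string
	Version       byte
	Subscriptions []Subscription
}

type Option func(*Options)

// WithSubscription appends subscriptions, imitating the shape of the
// package's Subscription option.
func WithSubscription(subs ...Subscription) Option {
	return func(o *Options) {
		o.Subscriptions = append(o.Subscriptions, subs...)
	}
}

// WithURL sets the URL. Hypothetical option, shown only for illustration.
func WithURL(u string) Option {
	return func(o *Options) { o.URL = u }
}

// newOptions applies opts in order to a zero-valued Options. Hypothetical helper.
func newOptions(opts ...Option) *Options {
	o := &Options{}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	o := newOptions(
		WithURL("mqtt://localhost:1883"), // example address, an assumption
		WithSubscription(Subscription{TopicFilter: "sensors/#"}),
	)
	fmt.Println(o.URL, len(o.Subscriptions))
}
```

The pattern lets new settings be added later without changing the signatures of the functions that accept `...Option`.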
type ResponseWriter ¶
type Server ¶
type Server struct {
	Handler          Handler
	WebsocketHandler websocket.Handler

	// TLSConfig optionally provides a TLS configuration for use
	// by ServeTLS and ListenAndServeTLS. Note that this value is
	// cloned by ServeTLS and ListenAndServeTLS, so it's not
	// possible to modify the configuration with methods like
	// tls.Config.SetSessionTicketKeys. To use
	// SetSessionTicketKeys, use Server.Serve with a TLS Listener
	// instead.
	TLSConfig *tls.Config

	// ConnState specifies an optional callback function that is
	// called when a client connection changes state. See the
	// ConnState type and associated constants for details.
	ConnState func(net.Conn, ConnState)

	// ConnContext optionally specifies a function that modifies
	// the context used for a new connection c. The provided ctx
	// is derived from the base context and has a ServerContextKey
	// value.
	ConnContext func(ctx context.Context, c net.Conn) context.Context
	// contains filtered or unexported fields
}
A Server defines parameters for running an MQTT server. The zero value for Server is a valid configuration.
func (*Server) ListenAndServe ¶
func (*Server) ListenAndServeTLS ¶
func (*Server) ListenAndServeWebsocket ¶
ListenAndServeWebsocket TODO
func (*Server) Serve ¶
Serve accepts incoming connections on the Listener l, creating a new service goroutine for each. The service goroutines read requests and then call srv.Handler to reply to them.
HTTP/2 support is only enabled if the Listener returns *tls.Conn connections and they were configured with "h2" in the TLS Config.NextProtos.
Serve always returns a non-nil error and closes l. After Server.Shutdown or [Server.Close], the returned error is ErrServerClosed.
type Stat ¶
type Stat struct {
	Uptime            prometheus.Counter
	ActiveConnections prometheus.Gauge
	PacketReceived    prometheus.Counter
	ByteReceived      prometheus.Counter
	PacketSent        prometheus.Counter
	ByteSent          prometheus.Counter
}
func (*Stat) RefreshUptime ¶
func (s *Stat) RefreshUptime()
type TopicSubscribed ¶
type TopicSubscribed struct {
	TopicName string
	// contains filtered or unexported fields
}
TopicSubscribed stores which clients are currently subscribed to the topic.
func NewTopicSubscribed ¶
func NewTopicSubscribed(topicName string) *TopicSubscribed
func (*TopicSubscribed) Exchange ¶
func (s *TopicSubscribed) Exchange(message *packet.Message) error
func (*TopicSubscribed) Len ¶
func (s *TopicSubscribed) Len() int
func (*TopicSubscribed) Subscribe ¶
func (s *TopicSubscribed) Subscribe(c *conn)
func (*TopicSubscribed) Unsubscribe ¶
func (s *TopicSubscribed) Unsubscribe(c *conn) int