mqtt

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: MIT Imports: 22 Imported by: 0

README ¶

Mochi-MQTT Server

build status Coverage Status Go Report Card Go Reference contributions welcome

English | 简体中文 | 日本語 | Translators Wanted!

🎆 mochi-co/mqtt is now part of the new mochi-mqtt organisation. Read about this announcement here.

Mochi-MQTT is a fully compliant, embeddable high-performance Go MQTT v5 (and v3.1.1) broker/server

Mochi MQTT is an embeddable fully compliant MQTT v5 broker server written in Go, designed for the development of telemetry and internet-of-things projects. The server can be used either as a standalone binary or embedded as a library in your own applications, and has been designed to be as lightweight and fast as possible, with great care taken to ensure the quality and maintainability of the project.

What is MQTT?

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks (Learn more). Mochi MQTT fully implements version 5.0.0 of the MQTT protocol.

Mochi-MQTT Features
  • Full MQTTv5 Feature Compliance, compatibility for MQTT v3.1.1 and v3.0.0:
    • User and MQTTv5 Packet Properties
    • Topic Aliases
    • Shared Subscriptions
    • Subscription Options and Subscription Identifiers
    • Message Expiry
    • Client Session Expiry
    • Send and Receive QoS Flow Control Quotas
    • Server-side Disconnect and Auth Packets
    • Will Delay Intervals
    • Plus all the original MQTT features of Mochi MQTT v1, such as Full QoS(0,1,2), $SYS topics, retained messages, etc.
  • Developer-centric:
    • Most core broker code is now exported and accessible, for total developer control.
    • Full-featured and flexible Hook-based interfacing system to provide easy 'plugin' development.
    • Direct Packet Injection using special inline client, or masquerade as existing clients.
  • Performant and Stable:
    • Our classic trie-based Topic-Subscription model.
    • Client-specific write buffers to avoid issues with slow-reading or irregular client behaviour.
    • Passes all Paho Interoperability Tests for MQTT v5 and MQTT v3.
    • Over a thousand carefully considered unit test scenarios.
  • TCP, Websocket (including SSL/TLS), and $SYS Dashboard listeners.
  • Built-in Redis, Badger, Pebble and Bolt Persistence using Hooks (but you can also make your own).
  • Built-in Rule-based Authentication and ACL Ledger using Hooks (also make your own).
Compatibility Notes

Because of the overlap between the v5 specification and previous versions of mqtt, the server can accept both v5 and v3 clients, but note that in cases where both v5 an v3 clients are connected, properties and features provided for v5 clients will be downgraded for v3 clients (such as user properties).

Support for MQTT v3.0.0 and v3.1.1 is considered hybrid-compatibility. Where not specifically restricted in the v3 specification, more modern and safety-first v5 behaviours are used instead - such as expiry for inflight and retained messages, and clients - and quality-of-service flow control limits.

When is this repo updated?

Unless it's a critical issue, new releases typically go out over the weekend.

Roadmap

  • Please open an issue to request new features or event hooks!
  • Cluster support.
  • Enhanced Metrics support.

Quick Start

Running the Broker with Go

Mochi MQTT can be used as a standalone broker. Simply checkout this repository and run the cmd/main.go entrypoint in the cmd folder which will expose tcp (:1883), websocket (:1882), and dashboard (:8080) listeners.

cd cmd
go build -o mqtt && ./mqtt
Using Docker

You can now pull and run the official Mochi MQTT image from our Docker repo:

docker pull mochimqtt/server
or
docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server 

For most use cases, you can use File Based Configuration to configure the server, by specifying a valid yaml or json config file.

A simple Dockerfile is provided for running the cmd/main.go Websocket, TCP, and Stats server, using the allow-all auth hook.

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest
File Based Configuration

You can use File Based Configuration with either the Docker image (described above), or by running the build binary with the --config=config.yaml or --config=config.json parameter.

Configuration files provide a convenient mechanism for easily preparing a server with the most common configurations. You can enable and configure built-in hooks and listeners, and specify server options and compatibilities:

listeners:
  - type: "tcp"
    id: "tcp12"
    address: ":1883"
  - type: "ws"
    id: "ws1"
    address: ":1882"
  - type: "sysinfo"
    id: "stats"
    address: ":1880"
hooks:
  auth:
    allow_all: true
options:
  inline_client: true

Please review the examples found in examples/config for all available configuration options.

There are a few conditions to note:

  1. If you use file-based configuration, the supported hook types for configuration are currently limited to auth, storage, and debug. Each type of hook can only have one instance.
  2. You can only use built in hooks with file-based configuration, as the type and configuration structure needs to be known by the server in order for it to be applied.
  3. You can only use built in listeners, for the reasons above.

If you need to implement custom hooks or listeners, please do so using the traditional manner indicated in cmd/main.go.

Developing with Mochi MQTT

Importing as a package

Importing Mochi MQTT as a package requires just a few lines of code to get started.

import (
  "log"

  mqtt "github.com/aixj1984/mqtt-server"
  "github.com/aixj1984/mqtt-server/hooks/auth"
  "github.com/aixj1984/mqtt-server/listeners"
)

func main() {
  // Create signals channel to run server until interrupted
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()

  // Create the new MQTT Server.
  server := mqtt.New(nil)
  
  // Allow all connections.
  _ = server.AddHook(new(auth.AllowHook), nil)
  
  // Create a TCP listener on a standard port.
  tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }
  

  go func() {
    err := server.Serve()
    if err != nil {
      log.Fatal(err)
    }
  }()

  // Run server until interrupted
  <-done

  // Cleanup
}

Examples of running the broker with various configurations can be found in the examples folder.

Network Listeners

The server comes with a variety of pre-packaged network listeners which allow the broker to accept connections on different protocols. The current listeners are:

Listener Usage
listeners.NewTCP A TCP listener
listeners.NewUnixSock A Unix Socket listener
listeners.NewNet A net.Listener listener
listeners.NewWebsocket A Websocket listener
listeners.NewHTTPStats An HTTP $SYS info dashboard
listeners.NewHTTPHealthCheck An HTTP healthcheck listener to provide health check responses for e.g. cloud infrastructure

Use the listeners.Listener interface to develop new listeners. If you do, please let us know!

A *listeners.Config may be passed to configure TLS.

Examples of usage can be found in the examples folder or cmd/main.go.

Server Options and Capabilities

A number of configurable options are available which can be used to alter the behaviour or restrict access to certain features in the server.

server := mqtt.New(&mqtt.Options{
  Capabilities: mqtt.Capabilities{
    MaximumSessionExpiryInterval: 3600,
    MaximumClientWritesPending: 3,
    Compatibilities: mqtt.Compatibilities{
      ObscureNotAuthorized: true,
    },
  },
  ClientNetWriteBufferSize: 4096,
  ClientNetReadBufferSize: 4096,
  SysTopicResendInterval: 10,
  InlineClient: false,
})

Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. ClientNetWriteBufferSize and ClientNetReadBufferSize can be configured to adjust memory usage per client, based on your needs. The size of Capabilities.MaximumClientWritesPending will affect the memory usage of the server. If the number of IoT devices online at the same time is large, and the set value is very large, even if there is no data transmission, the memory usage of the server will increase a lot. The default value is 1024*8, and this parameter can be adjusted according to the actual situation.

Default Configuration Notes

Some choices were made when deciding the default configuration that need to be mentioned here:

  • By default, the value of server.Options.Capabilities.MaximumMessageExpiryInterval is set to 86400 (24 hours), in order to prevent exposing the broker to DOS attacks on hostile networks when using the out-of-the-box configuration (as an infinite expiry would allow an infinite number of retained/inflight messages to accumulate). If you are operating in a trusted environment, or you have capacity for a larger retention period, you may wish to override this (set to 0 for no expiry).

Event Hooks

A universal event hooks system allows developers to hook into various parts of the server and client life cycle to add and modify functionality of the broker. These universal hooks are used to provide everything from authentication, persistent storage, to debugging tools.

Hooks are stackable - you can add multiple hooks to a server, and they will be run in the order they were added. Some hooks modify values, and these modified values will be passed to the subsequent hooks before being returned to the runtime code.

Type Import Info
Access Control mochi-mqtt/server/hooks/auth . AllowHook Allow access to all connecting clients and read/write to all topics.
Access Control mochi-mqtt/server/hooks/auth . Auth Rule-based access control ledger.
Persistence mochi-mqtt/server/hooks/storage/bolt Persistent storage using BoltDB (deprecated).
Persistence mochi-mqtt/server/hooks/storage/badger Persistent storage using BadgerDB.
Persistence mochi-mqtt/server/hooks/storage/pebble Persistent storage using PebbleDB.
Persistence mochi-mqtt/server/hooks/storage/redis Persistent storage using Redis.
Debugging mochi-mqtt/server/hooks/debug Additional debugging output to visualise packet flow.

Many of the internal server functions are now exposed to developers, so you can make your own Hooks by using the above as examples. If you do, please Open an issue and let everyone know!

Access Control
Allow Hook

By default, Mochi MQTT uses a DENY-ALL access control rule. To allow connections, this must overwritten using an Access Control hook. The simplest of these hooks is the auth.AllowAll hook, which provides ALLOW-ALL rules to all connections, subscriptions, and publishing. It's also the simplest hook to use:

server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)

Don't do this if you are exposing your server to the internet or untrusted networks - it should really be used for development, testing, and debugging only.

Auth Ledger

The Auth Ledger hook provides a sophisticated mechanism for defining access rules in a struct format. Auth ledger rules come in two forms: Auth rules (connection), and ACL rules (publish subscribe).

Auth rules have 4 optional criteria and an assertion flag:

Criteria Usage
Client client id of the connecting client
Username username of the connecting client
Password password of the connecting client
Remote the remote address or ip of the client
Allow true (allow this user) or false (deny this user)

ACL rules have 3 optional criteria and an filter match:

Criteria Usage
Client client id of the connecting client
Username username of the connecting client
Remote the remote address or ip of the client
Filters an array of filters to match

Rules are processed in index order (0,1,2,3), returning on the first matching rule. See hooks/auth/ledger.go to review the structs.

server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
    Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // Auth disallows all by default
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL allows all by default
      {Remote: "127.0.0.1:*"}, // local superuser allow all
      {
        // user melon can read and write to their own topic
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, // can write to updates, but can't read updates from others
        },
      },
      {
        // Otherwise, no clients have publishing permissions
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  }
})

The ledger can also be stored as JSON or YAML and loaded using the Data field:

err := server.AddHook(new(auth.Hook), &auth.Options{
    Data: data, // build ledger from byte slice: yaml or json
})

See examples/auth/encoded/main.go for more information.

Persistent Storage
Redis

A basic Redis storage hook is available which provides persistence for the broker. It can be added to the server in the same fashion as any other hook, with several options. It uses github.com/go-redis/redis/v8 under the hook, and is completely configurable through the Options value.

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // default redis address
    Password: "",               // your password
    DB:       0,                // your redis db
  },
})
if err != nil {
  log.Fatal(err)
}

For more information on how the redis hook works, or how to use it, see the examples/persistence/redis/main.go or hooks/storage/redis code.

Pebble DB

There's also a Pebble Db storage hook if you prefer file-based storage. It can be added and configured in much the same way as the other hooks (with somewhat less options).

err := server.AddHook(new(pebble.Hook), &pebble.Options{
  Path: pebblePath,
  Mode: pebble.NoSync,
})
if err != nil {
  log.Fatal(err)
}

For more information on how the pebble hook works, or how to use it, see the examples/persistence/pebble/main.go or hooks/storage/pebble code.

Badger DB

Similarly, for file-based storage, there is also a BadgerDB storage hook available. It can be added and configured in much the same way as the other hooks.

err := server.AddHook(new(badger.Hook), &badger.Options{
  Path: badgerPath,
})
if err != nil {
  log.Fatal(err)
}

For more information on how the badger hook works, or how to use it, see the examples/persistence/badger/main.go or hooks/storage/badger code.

There is also a BoltDB hook which has been deprecated in favour of Badger, but if you need it, check examples/persistence/bolt/main.go.

Developing with Event Hooks

Many hooks are available for interacting with the broker and client lifecycle. The function signatures for all the hooks and mqtt.Hook interface can be found in hooks.go.

The most flexible event hooks are OnPacketRead, OnPacketEncode, and OnPacketSent - these hooks be used to control and modify all incoming and outgoing packets.

Function Usage
OnStarted Called when the server has successfully started.
OnStopped Called when the server has successfully stopped.
OnConnectAuthenticate Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed.
OnACLCheck Called when a user attempts to publish or subscribe to a topic filter. As above.
OnSysInfoTick Called when the $SYS topic values are published out.
OnConnect Called when a new client connects, may return an error or packet code to halt the client connection process.
OnSessionEstablish Called immediately after a new client connects and authenticates and immediately before the session is established and CONNACK is sent.
OnSessionEstablished Called when a new client successfully establishes a session (after OnConnect)
OnDisconnect Called when a client is disconnected for any reason.
OnAuthPacket Called when an auth packet is received. It is intended to allow developers to create their own mqtt v5 Auth Packet handling mechanisms. Allows packet modification.
OnPacketRead Called when a packet is received from a client. Allows packet modification.
OnPacketEncode Called immediately before a packet is encoded to be sent to a client. Allows packet modification.
OnPacketSent Called when a packet has been sent to a client.
OnPacketProcessed Called when a packet has been received and successfully handled by the broker.
OnSubscribe Called when a client subscribes to one or more filters. Allows packet modification.
OnSubscribed Called when a client successfully subscribes to one or more filters.
OnSelectSubscribers Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows receipient modification.
OnUnsubscribe Called when a client unsubscribes from one or more filters. Allows packet modification.
OnUnsubscribed Called when a client successfully unsubscribes from one or more filters.
OnPublish Called when a client publishes a message. Allows packet modification.
OnPublished Called when a client has published a message to subscribers.
OnPublishDropped Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond.
OnRetainMessage Called then a published message is retained.
OnRetainPublished Called then a retained message is published to a client.
OnQosPublish Called when a publish packet with Qos >= 1 is issued to a subscriber.
OnQosComplete Called when the Qos flow for a message has been completed.
OnQosDropped Called when an inflight message expires before completion.
OnPacketIDExhausted Called when a client runs out of unused packet ids to assign.
OnWill Called when a client disconnects and intends to issue a will message. Allows packet modification.
OnWillSent Called when an LWT message has been issued from a disconnecting client.
OnClientExpired Called when a client session has expired and should be deleted.
OnRetainedExpired Called when a retained message has expired and should be deleted.
StoredClients Returns clients, eg. from a persistent store.
StoredSubscriptions Returns client subscriptions, eg. from a persistent store.
StoredInflightMessages Returns inflight messages, eg. from a persistent store.
StoredRetainedMessages Returns retained messages, eg. from a persistent store.
StoredSysInfo Returns stored system info values, eg. from a persistent store.

If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need OnACLCheck and OnConnectAuthenticate.

Inline Client (v2.4.0+)

It's now possible to subscribe and publish to topics directly from the embedding code, by using the inline client feature. Currently, the inline client does not support shared subscriptions. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options:

server := mqtt.New(&mqtt.Options{
  InlineClient: true,
})

Once enabled, you will be able to use the server.Publish, server.Subscribe, and server.Unsubscribe methods to issue and received messages from broker-adjacent code.

See direct examples for real-life usage examples.

Inline Publish

To publish basic message to a topic from within the embedding application, you can use the server.Publish(topic string, payload []byte, retain bool, qos byte) error method.

err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)

The Qos byte in this case is only used to set the upper qos limit available for subscribers, as per MQTT v5 spec.

Inline Subscribe

To subscribe to a topic filter from within the embedding application, you can use the server.Subscribe(filter string, subscriptionId int, handler InlineSubFn) error method with a callback function. Note that only QoS 0 is supported for inline subscriptions. If you wish to have multiple callbacks for the same filter, you can use the MQTTv5 subscriptionId property to differentiate.

callbackFn := func(cl *mqtt.Client, sub packets.Subscription, pk packets.Packet) {
    server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload))
}
server.Subscribe("direct/#", 1, callbackFn)
Inline Unsubscribe

You may wish to unsubscribe if you have subscribed to a filter using the inline client. You can do this easily with the server.Unsubscribe(filter string, subscriptionId int) error method:

server.Unsubscribe("direct/#", 1)
Packet Injection

If you want more control, or want to set specific MQTT v5 properties and other values you can create your own publish packets from a client of your choice. This method allows you to inject MQTT packets (no just publish) directly into the runtime as though they had been received by a specific client.

Packet injection can be used for any MQTT packet, including ping requests, subscriptions, etc. And because the Clients structs and methods are now exported, you can even inject packets on behalf of a connected client (if you have a very custom requirements).

Most of the time you'll want to use the Inline Client described above, as it has unique privileges: it bypasses all ACL and topic validation checks, meaning it can even publish to $SYS topics. In this case, you can create an inline client from scratch which will behave the same as the built-in inline client.

cl := server.NewClient(nil, "local", "inline", true)
server.InjectPacket(cl, packets.Packet{
  FixedHeader: packets.FixedHeader{
    Type: packets.Publish,
  },
  TopicName: "direct/publish",
  Payload: []byte("scheduled message"),
})

MQTT packets still need to be correctly formed, so refer our the test packets catalogue and MQTTv5 Specification for inspiration.

See the hooks example to see this feature in action.

Testing
Unit Tests

Mochi MQTT tests over a thousand scenarios with thoughtfully hand written unit tests to ensure each function does exactly what we expect. You can run the tests using go:

go run --cover ./...
Paho Interoperability Test

You can check the broker against the Paho Interoperability Test by starting the broker using examples/paho/main.go, and then running the mqtt v5 and v3 tests with python3 client_test5.py from the interoperability folder.

Note that there are currently a number of outstanding issues regarding false negatives in the paho suite, and as such, certain compatibility modes are enabled in the paho/main.go example.

Performance Benchmarks

Mochi MQTT performance is comparable with popular brokers such as Mosquitto, EMQX, and others.

Performance benchmarks were tested using MQTT-Stresser on a Apple Macbook Air M2, using cmd/main.go default settings. Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better.

The values presented in the benchmark are not representative of true messages per second throughput. They rely on an unusual calculation by mqtt-stresser, but are usable as they are consistent across all brokers. Benchmarks are provided as a general performance expectation guideline only. Comparisons are performed using out-of-the-box default configurations.

mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Mochi v2.2.10 124,772 125,456 124,614 314,461 313,186 311,910
Mosquitto v2.0.15 155,920 155,919 155,918 185,485 185,097 184,709
EMQX v5.0.11 156,945 156,257 155,568 17,918 17,783 17,649
Rumqtt v0.21.0 112,208 108,480 104,753 135,784 126,446 117,108

mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Mochi v2.2.10 41,825 31,663 23,008 144,058 65,903 37,618
Mosquitto v2.0.15 42,729 38,633 29,879 23,241 19,714 18,806
EMQX v5.0.11 21,553 17,418 14,356 4,257 3,980 3,756
Rumqtt v0.21.0 42,213 23,153 20,814 49,465 36,626 19,283

Million Message Challenge (hit the server with 1 million messages immediately):

mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Mochi v2.2.10 13,532 4,425 2,344 52,120 7,274 2,701
Mosquitto v2.0.15 3,826 3,395 3,032 1,200 1,150 1,118
EMQX v5.0.11 4,086 2,432 2,274 434 333 311
Rumqtt v0.21.0 78,972 5,047 3,804 4,286 3,249 2,027

Not sure what's going on with EMQX here, perhaps the docker out-of-the-box settings are not optimal, so take it with a pinch of salt as we know for a fact it's a solid piece of software.

Contribution Guidelines

Contributions and feedback are both welcomed and encouraged! Open an issue to report a bug, ask a question, or make a feature request. If you open a pull request, please try to follow the following guidelines:

  • Try to maintain test coverage where reasonably possible.
  • Clearly state what the PR does and why.
  • Please remember to add your SPDX FileContributor tag to files where you have made a meaningful contribution.

SPDX Annotations are used to clearly indicate the license, copyright, and contributions of each file in a machine-readable format. If you are adding a new file to the repository, please ensure it has the following SPDX header:

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt
// SPDX-FileContributor: Your name or alias <optional@email.address>

package name

Please ensure to add a new SPDX-FileContributor line for each contributor to the file. Refer to other files for examples. Please remember to do this, your contributions to this project are valuable and appreciated - it's important to receive credit!

Stargazers over time 🥰

Stargazers over time Are you using Mochi MQTT in a project? Let us know!

Documentation ¶

Overview ¶

Package mqtt provides a high performance, fully compliant MQTT v5 broker server with v3.1.1 backward compatibility.

Index ¶

Constants ¶

View Source
const (
	SetOptions byte = iota
	OnSysInfoTick
	OnStarted
	OnStopped
	OnConnectAuthenticate
	OnACLCheck
	OnConnect
	OnSessionEstablish
	OnSessionEstablished
	OnDisconnect
	OnAuthPacket
	OnPacketRead
	OnPacketEncode
	OnPacketSent
	OnPacketProcessed
	OnSubscribe
	OnSubscribed
	OnSelectSubscribers
	OnUnsubscribe
	OnUnsubscribed
	OnPublish
	OnPublished
	OnPublishDropped
	OnRetainMessage
	OnRetainPublished
	OnQosPublish
	OnQosComplete
	OnQosDropped
	OnPacketIDExhausted
	OnWill
	OnWillSent
	OnClientExpired
	OnRetainedExpired
	StoredClients
	StoredSubscriptions
	StoredInflightMessages
	StoredRetainedMessages
	StoredSysInfo
)
View Source
const (
	Version = "2.6.5" // the current server version.

	LocalListener  = "local"
	InlineClientId = "inline"
)

Variables ¶

View Source
var (
	// Deprecated: Use NewDefaultServerCapabilities to avoid data race issue.
	DefaultServerCapabilities = NewDefaultServerCapabilities()

	ErrListenerIDExists       = errors.New("listener id already exists")                               // a listener with the same id already exists
	ErrConnectionClosed       = errors.New("connection not open")                                      // connection is closed
	ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") // inline client is not enabled by default
	ErrOptionsUnreadable      = errors.New("unable to read options from bytes")
)
View Source
var (
	SharePrefix = "$SHARE" // the prefix indicating a share topic
	SysPrefix   = "$SYS"   // the prefix indicating a system info topic
)
View Source
var (
	// ErrInvalidConfigType indicates a different Type of config value was expected to what was received.
	ErrInvalidConfigType = errors.New("invalid config type provided")
)
View Source
var (
	ErrMinimumKeepalive = errors.New("client keepalive is below minimum recommended value and may exhibit connection instability")
)

Functions ¶

func Int64toa ¶

func Int64toa(v int64) string

Int64toa converts an int64 to a string.

func IsSharedFilter ¶

func IsSharedFilter(filter string) bool

IsSharedFilter returns true if the filter uses the share prefix.

func IsValidFilter ¶

func IsValidFilter(filter string, forPublish bool) bool

IsValidFilter returns true if the filter is valid.

Types ¶

type Capabilities ¶

type Capabilities struct {
	MaximumClients               int64  `yaml:"maximum_clients" json:"maximum_clients"`                                 // maximum number of connected clients
	MaximumMessageExpiryInterval int64  `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"` // maximum message expiry if message expiry is 0 or over
	MaximumClientWritesPending   int32  `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"`     // maximum number of pending message writes for a client
	MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"` // maximum number of seconds to keep disconnected sessions
	MaximumPacketSize            uint32 `yaml:"maximum_packet_size" json:"maximum_packet_size"`                         // maximum packet size, no limit if 0

	ReceiveMaximum         uint16          `yaml:"receive_maximum" json:"receive_maximum"`                   // maximum number of concurrent qos messages per client
	MaximumInflight        uint16          `yaml:"maximum_inflight" json:"maximum_inflight"`                 // maximum number of qos > 0 messages can be stored, 0(=8192)-65535
	TopicAliasMaximum      uint16          `yaml:"topic_alias_maximum" json:"topic_alias_maximum"`           // maximum topic alias value
	SharedSubAvailable     byte            `yaml:"shared_sub_available" json:"shared_sub_available"`         // support of shared subscriptions
	MinimumProtocolVersion byte            `yaml:"minimum_protocol_version" json:"minimum_protocol_version"` // minimum supported mqtt version
	Compatibilities        Compatibilities `yaml:"compatibilities" json:"compatibilities"`                   // version compatibilities the server provides
	MaximumQos             byte            `yaml:"maximum_qos" json:"maximum_qos"`                           // maximum qos value available to clients
	RetainAvailable        byte            `yaml:"retain_available" json:"retain_available"`                 // support of retain messages
	WildcardSubAvailable   byte            `yaml:"wildcard_sub_available" json:"wildcard_sub_available"`     // support of wildcard subscriptions
	SubIDAvailable         byte            `yaml:"sub_id_available" json:"sub_id_available"`                 // support of subscription identifiers
	// contains filtered or unexported fields
}

Capabilities indicates the capabilities and features provided by the server.

func NewDefaultServerCapabilities ¶

func NewDefaultServerCapabilities() *Capabilities

NewDefaultServerCapabilities defines the default features and capabilities provided by the server.

type Client ¶

type Client struct {
	Properties ClientProperties // client properties
	State      ClientState      // the operational state of the client.
	Net        ClientConnection // network connection state of the client
	ID         string           // the client id.

	sync.RWMutex // mutex
	// contains filtered or unexported fields
}

Client contains information about a client known by the broker.

func (*Client) ClearExpiredInflights ¶

func (cl *Client) ClearExpiredInflights(now, maximumExpiry int64) []uint16

ClearExpiredInflights deletes any inflight messages which have expired.

func (*Client) ClearInflights ¶

func (cl *Client) ClearInflights()

ClearInflights deletes all inflight messages for the client, e.g. for a disconnected user with a clean session.

func (*Client) Closed ¶

func (cl *Client) Closed() bool

Closed returns true if client connection is closed.

func (*Client) NextPacketID ¶

func (cl *Client) NextPacketID() (i uint32, err error)

NextPacketID returns the next available (unused) packet id for the client. If no unused packet ids are available, an error is returned and the client should be disconnected.

func (*Client) ParseConnect ¶

func (cl *Client) ParseConnect(lid string, pk packets.Packet)

ParseConnect parses the connect parameters and properties for a client.

func (*Client) Read ¶

func (cl *Client) Read(packetHandler ReadFn) error

Read reads incoming packets from the connected client and transforms them into packets to be handled by the packetHandler.

func (*Client) ReadFixedHeader ¶

func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error

ReadFixedHeader reads in the values of the next packet's fixed header.

func (*Client) ReadPacket ¶

func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err error)

ReadPacket reads the remaining buffer into an MQTT packet.

func (*Client) ResendInflightMessages ¶

func (cl *Client) ResendInflightMessages(force bool) error

ResendInflightMessages attempts to resend any pending inflight messages to connected clients.

func (*Client) Stop ¶

func (cl *Client) Stop(err error)

Stop instructs the client to shut down all processing goroutines and disconnect.

func (*Client) StopCause ¶

func (cl *Client) StopCause() error

StopCause returns the reason the client connection was stopped, if any.

func (*Client) StopTime ¶

func (cl *Client) StopTime() int64

StopTime returns the the time the client disconnected in unix time, else zero.

func (*Client) WriteLoop ¶

func (cl *Client) WriteLoop()

WriteLoop ranges over pending outbound messages and writes them to the client connection.

func (*Client) WritePacket ¶

func (cl *Client) WritePacket(pk packets.Packet) error

WritePacket encodes and writes a packet to the client.

type ClientConnection ¶

type ClientConnection struct {
	Conn net.Conn // the net.Conn used to establish the connection

	Remote   string // the remote address of the client
	Listener string // listener id of the client
	Inline   bool   // if true, the client is the built-in 'inline' embedded client
	// contains filtered or unexported fields
}

ClientConnection contains the connection transport and metadata for the client.

type ClientProperties ¶

type ClientProperties struct {
	Props           packets.Properties
	Will            Will
	Username        []byte
	ProtocolVersion byte
	Clean           bool
}

ClientProperties contains the properties which define the client behaviour.

type ClientState ¶

type ClientState struct {
	TopicAliases TopicAliases // a map of topic aliases

	Inflight      *Inflight      // a map of in-flight qos messages
	Subscriptions *Subscriptions // a map of the subscription filters a client maintains

	Keepalive       uint16 // the number of seconds the connection can wait
	ServerKeepalive bool   // keepalive was set by the server
	// contains filtered or unexported fields
}

ClientState tracks the state of the client.

type ClientSubscriptions ¶

type ClientSubscriptions map[string]packets.Subscription

ClientSubscriptions is a map of aggregated subscriptions for a client.

type Clients ¶

type Clients struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Clients contains a map of the clients known by the broker.

func NewClients ¶

func NewClients() *Clients

NewClients returns an instance of Clients.

func (*Clients) Add ¶

func (cl *Clients) Add(val *Client)

Add adds a new client to the clients map, keyed on client id.

func (*Clients) Delete ¶

func (cl *Clients) Delete(id string)

Delete removes a client from the internal map.

func (*Clients) Get ¶

func (cl *Clients) Get(id string) (*Client, bool)

Get returns the value of a client if it exists.

func (*Clients) GetAll ¶

func (cl *Clients) GetAll() map[string]*Client

GetAll returns all the clients.

func (*Clients) GetByListener ¶

func (cl *Clients) GetByListener(id string) []*Client

GetByListener returns clients matching a listener id.

func (*Clients) Len ¶

func (cl *Clients) Len() int

Len returns the length of the clients map.

type Compatibilities ¶

type Compatibilities struct {
	ObscureNotAuthorized       bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"`                 // return unspecified errors instead of not authorized
	PassiveClientDisconnect    bool `yaml:"passive_client_disconnect" json:"passive_client_disconnect"`           // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation)
	AlwaysReturnResponseInfo   bool `yaml:"always_return_response_info" json:"always_return_response_info"`       // always return response info (useful for testing)
	RestoreSysInfoOnRestart    bool `yaml:"restore_sys_info_on_restart" json:"restore_sys_info_on_restart"`       // restore system info from store as if server never stopped
	NoInheritedPropertiesOnAck bool `yaml:"no_inherited_properties_on_ack" json:"no_inherited_properties_on_ack"` // don't allow inherited user properties on ack (paho - spec violation)
}

Compatibilities provides flags for using compatibility modes.

type Hook ¶

type Hook interface {
	ID() string
	Provides(b byte) bool
	Init(config any) error
	Stop() error
	SetOpts(l *slog.Logger, o *HookOptions)

	OnStarted()
	OnStopped()
	OnConnectAuthenticate(cl *Client, pk packets.Packet) bool
	OnACLCheck(cl *Client, topic string, write bool) bool
	OnSysInfoTick(*system.Info)
	OnConnect(cl *Client, pk packets.Packet) error
	OnSessionEstablish(cl *Client, pk packets.Packet)
	OnSessionEstablished(cl *Client, pk packets.Packet)
	OnDisconnect(cl *Client, err error, expire bool)
	OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error)
	OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error) // triggers when a new packet is received by a client, but before packet validation
	OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet        // modify a packet before it is byte-encoded and written to the client
	OnPacketSent(cl *Client, pk packets.Packet, b []byte)               // triggers when packet bytes have been written to the client
	OnPacketProcessed(cl *Client, pk packets.Packet, err error)         // triggers after a packet from the client been processed (handled)
	OnSubscribe(cl *Client, pk packets.Packet) packets.Packet
	OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte)
	OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers
	OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet
	OnUnsubscribed(cl *Client, pk packets.Packet)
	OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error)
	OnPublished(cl *Client, pk packets.Packet)
	OnPublishDropped(cl *Client, pk packets.Packet)
	OnRetainMessage(cl *Client, pk packets.Packet, r int64)
	OnRetainPublished(cl *Client, pk packets.Packet)
	OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)
	OnQosComplete(cl *Client, pk packets.Packet)
	OnQosDropped(cl *Client, pk packets.Packet)
	OnPacketIDExhausted(cl *Client, pk packets.Packet)
	OnWill(cl *Client, will Will) (Will, error)
	OnWillSent(cl *Client, pk packets.Packet)
	OnClientExpired(cl *Client)
	OnRetainedExpired(filter string)
	StoredClients() ([]storage.Client, error)
	StoredSubscriptions() ([]storage.Subscription, error)
	StoredInflightMessages() ([]storage.Message, error)
	StoredRetainedMessages() ([]storage.Message, error)
	StoredSysInfo() (storage.SystemInfo, error)
}

Hook provides an interface of handlers for different events which occur during the lifecycle of the broker.

type HookBase ¶

type HookBase struct {
	Hook
	Log  *slog.Logger
	Opts *HookOptions
}

HookBase provides a set of default methods for each hook. It should be embedded in all hooks.

func (*HookBase) ID ¶

func (h *HookBase) ID() string

ID returns the ID of the hook.

func (*HookBase) Init ¶

func (h *HookBase) Init(config any) error

Init performs any pre-start initializations for the hook, such as connecting to databases or opening files.

func (*HookBase) OnACLCheck ¶

func (h *HookBase) OnACLCheck(cl *Client, topic string, write bool) bool

OnACLCheck is called when a user attempts to subscribe or publish to a topic.

func (*HookBase) OnAuthPacket ¶

func (h *HookBase) OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error)

OnAuthPacket is called when an auth packet is received from the client.

func (*HookBase) OnClientExpired ¶

func (h *HookBase) OnClientExpired(cl *Client)

OnClientExpired is called when a client session has expired.

func (*HookBase) OnConnect ¶

func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) error

OnConnect is called when a new client connects.

func (*HookBase) OnConnectAuthenticate ¶

func (h *HookBase) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool

OnConnectAuthenticate is called when a user attempts to authenticate with the server.

func (*HookBase) OnDisconnect ¶

func (h *HookBase) OnDisconnect(cl *Client, err error, expire bool)

OnDisconnect is called when a client is disconnected for any reason.

func (*HookBase) OnPacketEncode ¶

func (h *HookBase) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet

OnPacketEncode is called before a packet is byte-encoded and written to the client.

func (*HookBase) OnPacketIDExhausted ¶

func (h *HookBase) OnPacketIDExhausted(cl *Client, pk packets.Packet)

OnPacketIDExhausted is called when the client runs out of unused packet ids to assign to a packet.

func (*HookBase) OnPacketProcessed ¶

func (h *HookBase) OnPacketProcessed(cl *Client, pk packets.Packet, err error)

OnPacketProcessed is called immediately after a packet from a client is processed.

func (*HookBase) OnPacketRead ¶

func (h *HookBase) OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error)

OnPacketRead is called when a packet is received.

func (*HookBase) OnPacketSent ¶

func (h *HookBase) OnPacketSent(cl *Client, pk packets.Packet, b []byte)

OnPacketSent is called immediately after a packet is written to a client.

func (*HookBase) OnPublish ¶

func (h *HookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error)

OnPublish is called when a client publishes a message.

func (*HookBase) OnPublishDropped ¶

func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet)

OnPublishDropped is called when a message to a client is dropped instead of being delivered.

func (*HookBase) OnPublished ¶

func (h *HookBase) OnPublished(cl *Client, pk packets.Packet)

OnPublished is called when a client has published a message to subscribers.

func (*HookBase) OnQosComplete ¶

func (h *HookBase) OnQosComplete(cl *Client, pk packets.Packet)

OnQosComplete is called when the Qos flow for a message has been completed.

func (*HookBase) OnQosDropped ¶

func (h *HookBase) OnQosDropped(cl *Client, pk packets.Packet)

OnQosDropped is called the Qos flow for a message expires.

func (*HookBase) OnQosPublish ¶

func (h *HookBase) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)

OnQosPublish is called when a publish packet with Qos > 1 is issued to a subscriber.

func (*HookBase) OnRetainMessage ¶

func (h *HookBase) OnRetainMessage(cl *Client, pk packets.Packet, r int64)

OnRetainMessage is called then a published message is retained.

func (*HookBase) OnRetainPublished ¶

func (h *HookBase) OnRetainPublished(cl *Client, pk packets.Packet)

OnRetainPublished is called when a retained message is published.

func (*HookBase) OnRetainedExpired ¶

func (h *HookBase) OnRetainedExpired(topic string)

OnRetainedExpired is called when a retained message for a topic has expired.

func (*HookBase) OnSelectSubscribers ¶

func (h *HookBase) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers

OnSelectSubscribers is called when selecting subscribers to receive a message.

func (*HookBase) OnSessionEstablish ¶

func (h *HookBase) OnSessionEstablish(cl *Client, pk packets.Packet)

OnSessionEstablish is called right after a new client connects and authenticates and right before the session is established and CONNACK is sent.

func (*HookBase) OnSessionEstablished ¶

func (h *HookBase) OnSessionEstablished(cl *Client, pk packets.Packet)

OnSessionEstablished is called when a new client establishes a session (after OnConnect).

func (*HookBase) OnStarted ¶

func (h *HookBase) OnStarted()

OnStarted is called when the server starts.

func (*HookBase) OnStopped ¶

func (h *HookBase) OnStopped()

OnStopped is called when the server stops.

func (*HookBase) OnSubscribe ¶

func (h *HookBase) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet

OnSubscribe is called when a client subscribes to one or more filters.

func (*HookBase) OnSubscribed ¶

func (h *HookBase) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte)

OnSubscribed is called when a client subscribes to one or more filters.

func (*HookBase) OnSysInfoTick ¶

func (h *HookBase) OnSysInfoTick(*system.Info)

OnSysInfoTick is called when the server publishes system info.

func (*HookBase) OnUnsubscribe ¶

func (h *HookBase) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet

OnUnsubscribe is called when a client unsubscribes from one or more filters.

func (*HookBase) OnUnsubscribed ¶

func (h *HookBase) OnUnsubscribed(cl *Client, pk packets.Packet)

OnUnsubscribed is called when a client unsubscribes from one or more filters.

func (*HookBase) OnWill ¶

func (h *HookBase) OnWill(cl *Client, will Will) (Will, error)

OnWill is called when a client disconnects and publishes an LWT message.

func (*HookBase) OnWillSent ¶

func (h *HookBase) OnWillSent(cl *Client, pk packets.Packet)

OnWillSent is called when an LWT message has been issued from a disconnecting client.

func (*HookBase) Provides ¶

func (h *HookBase) Provides(b byte) bool

Provides indicates which methods a hook provides. The default is none - this method should be overridden by the embedding hook.

func (*HookBase) SetOpts ¶

func (h *HookBase) SetOpts(l *slog.Logger, opts *HookOptions)

SetOpts is called by the server to propagate internal values and generally should not be called manually.

func (*HookBase) Stop ¶

func (h *HookBase) Stop() error

Stop is called to gracefully shut down the hook.

func (*HookBase) StoredClients ¶

func (h *HookBase) StoredClients() (v []storage.Client, err error)

StoredClients returns all clients from a store.

func (*HookBase) StoredInflightMessages ¶

func (h *HookBase) StoredInflightMessages() (v []storage.Message, err error)

StoredInflightMessages returns all inflight messages from a store.

func (*HookBase) StoredRetainedMessages ¶

func (h *HookBase) StoredRetainedMessages() (v []storage.Message, err error)

StoredRetainedMessages returns all retained messages from a store.

func (*HookBase) StoredSubscriptions ¶

func (h *HookBase) StoredSubscriptions() (v []storage.Subscription, err error)

StoredSubscriptions returns all subcriptions from a store.

func (*HookBase) StoredSysInfo ¶

func (h *HookBase) StoredSysInfo() (v storage.SystemInfo, err error)

StoredSysInfo returns a set of system info values.

type HookLoadConfig ¶

type HookLoadConfig struct {
	Hook   Hook
	Config any
}

HookLoadConfig contains the hook and configuration as loaded from a configuration (usually file).

type HookOptions ¶

type HookOptions struct {
	Capabilities *Capabilities
}

HookOptions contains values which are inherited from the server on initialisation.

type Hooks ¶

type Hooks struct {
	Log *slog.Logger // a logger for the hook (from the server)

	sync.Mutex // a mutex for locking when adding hooks
	// contains filtered or unexported fields
}

Hooks is a slice of Hook interfaces to be called in sequence.

func (*Hooks) Add ¶

func (h *Hooks) Add(hook Hook, config any) error

Add adds and initializes a new hook.

func (*Hooks) GetAll ¶

func (h *Hooks) GetAll() []Hook

GetAll returns a slice of all the hooks.

func (*Hooks) Len ¶

func (h *Hooks) Len() int64

Len returns the number of hooks added.

func (*Hooks) OnACLCheck ¶

func (h *Hooks) OnACLCheck(cl *Client, topic string, write bool) bool

OnACLCheck is called when a user attempts to publish or subscribe to a topic filter. An implementation of this method MUST be used to allow or deny access to the (see hooks/auth/allow_all or basic). It can be used in custom hooks to check publishing and subscribing users against an existing permissions or roles database.

func (*Hooks) OnAuthPacket ¶

func (h *Hooks) OnAuthPacket(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)

OnAuthPacket is called when an auth packet is received. It is intended to allow developers to create their own auth packet handling mechanisms.

func (*Hooks) OnClientExpired ¶

func (h *Hooks) OnClientExpired(cl *Client)

OnClientExpired is called when a client session has expired and should be deleted.

func (*Hooks) OnConnect ¶

func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) error

OnConnect is called when a new client connects, and may return a packets.Code as an error to halt the connection.

func (*Hooks) OnConnectAuthenticate ¶

func (h *Hooks) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool

OnConnectAuthenticate is called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database.

func (*Hooks) OnDisconnect ¶

func (h *Hooks) OnDisconnect(cl *Client, err error, expire bool)

OnDisconnect is called when a client is disconnected for any reason.

func (*Hooks) OnPacketEncode ¶

func (h *Hooks) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet

OnPacketEncode is called immediately before a packet is encoded to be sent to a client.

func (*Hooks) OnPacketIDExhausted ¶

func (h *Hooks) OnPacketIDExhausted(cl *Client, pk packets.Packet)

OnPacketIDExhausted is called when the client runs out of unused packet ids to assign to a packet.

func (*Hooks) OnPacketProcessed ¶

func (h *Hooks) OnPacketProcessed(cl *Client, pk packets.Packet, err error)

OnPacketProcessed is called when a packet has been received and successfully handled by the broker.

func (*Hooks) OnPacketRead ¶

func (h *Hooks) OnPacketRead(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)

OnPacketRead is called when a packet is received from a client.

func (*Hooks) OnPacketSent ¶

func (h *Hooks) OnPacketSent(cl *Client, pk packets.Packet, b []byte)

OnPacketSent is called when a packet has been sent to a client. It takes a bytes parameter containing the bytes sent.

func (*Hooks) OnPublish ¶

func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)

OnPublish is called when a client publishes a message. This method differs from OnPublished in that it allows you to modify you to modify the incoming packet before it is processed. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnPublishDropped ¶

func (h *Hooks) OnPublishDropped(cl *Client, pk packets.Packet)

OnPublishDropped is called when a message to a client was dropped instead of delivered such as when a client is too slow to respond.

func (*Hooks) OnPublished ¶

func (h *Hooks) OnPublished(cl *Client, pk packets.Packet)

OnPublished is called when a client has published a message to subscribers.

func (*Hooks) OnQosComplete ¶

func (h *Hooks) OnQosComplete(cl *Client, pk packets.Packet)

OnQosComplete is called when the Qos flow for a message has been completed. In other words, when an inflight message is resolved. It is typically used to delete an inflight message from a store.

func (*Hooks) OnQosDropped ¶

func (h *Hooks) OnQosDropped(cl *Client, pk packets.Packet)

OnQosDropped is called the Qos flow for a message expires. In other words, when an inflight message expires or is abandoned. It is typically used to delete an inflight message from a store.

func (*Hooks) OnQosPublish ¶

func (h *Hooks) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)

OnQosPublish is called when a publish packet with Qos >= 1 is issued to a subscriber. In other words, this method is called when a new inflight message is created or resent. It is typically used to store a new inflight message.

func (*Hooks) OnRetainMessage ¶

func (h *Hooks) OnRetainMessage(cl *Client, pk packets.Packet, r int64)

OnRetainMessage is called then a published message is retained.

func (*Hooks) OnRetainPublished ¶

func (h *Hooks) OnRetainPublished(cl *Client, pk packets.Packet)

OnRetainPublished is called when a retained message is published.

func (*Hooks) OnRetainedExpired ¶

func (h *Hooks) OnRetainedExpired(filter string)

OnRetainedExpired is called when a retained message has expired and should be deleted.

func (*Hooks) OnSelectSubscribers ¶

func (h *Hooks) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers

OnSelectSubscribers is called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. This hook can be used to programmatically remove or add clients to a publish to subscribers process, or to select the subscriber for a shared group in a custom manner (such as based on client id, ip, etc).

func (*Hooks) OnSessionEstablish ¶

func (h *Hooks) OnSessionEstablish(cl *Client, pk packets.Packet)

OnSessionEstablish is called right after a new client connects and authenticates and right before the session is established and CONNACK is sent.

func (*Hooks) OnSessionEstablished ¶

func (h *Hooks) OnSessionEstablished(cl *Client, pk packets.Packet)

OnSessionEstablished is called when a new client establishes a session (after OnConnect).

func (*Hooks) OnStarted ¶

func (h *Hooks) OnStarted()

OnStarted is called when the server has successfully started.

func (*Hooks) OnStopped ¶

func (h *Hooks) OnStopped()

OnStopped is called when the server has successfully stopped.

func (*Hooks) OnSubscribe ¶

func (h *Hooks) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet

OnSubscribe is called when a client subscribes to one or more filters. This method differs from OnSubscribed in that it allows you to modify the subscription values before the packet is processed. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnSubscribed ¶

func (h *Hooks) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte)

OnSubscribed is called when a client subscribes to one or more filters.

func (*Hooks) OnSysInfoTick ¶

func (h *Hooks) OnSysInfoTick(sys *system.Info)

OnSysInfoTick is called when the $SYS topic values are published out.

func (*Hooks) OnUnsubscribe ¶

func (h *Hooks) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet

OnUnsubscribe is called when a client unsubscribes from one or more filters. This method differs from OnUnsubscribed in that it allows you to modify the unsubscription values before the packet is processed. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnUnsubscribed ¶

func (h *Hooks) OnUnsubscribed(cl *Client, pk packets.Packet)

OnUnsubscribed is called when a client unsubscribes from one or more filters.

func (*Hooks) OnWill ¶

func (h *Hooks) OnWill(cl *Client, will Will) Will

OnWill is called when a client disconnects and publishes an LWT message. This method differs from OnWillSent in that it allows you to modify the LWT message before it is published. The return values of the hook methods are passed-through in the order the hooks were attached.

func (*Hooks) OnWillSent ¶

func (h *Hooks) OnWillSent(cl *Client, pk packets.Packet)

OnWillSent is called when an LWT message has been issued from a disconnecting client.

func (*Hooks) Provides ¶

func (h *Hooks) Provides(b ...byte) bool

Provides returns true if any one hook provides any of the requested hook methods.

func (*Hooks) Stop ¶

func (h *Hooks) Stop()

Stop indicates all attached hooks to gracefully end.

func (*Hooks) StoredClients ¶

func (h *Hooks) StoredClients() (v []storage.Client, err error)

StoredClients returns all clients, e.g. from a persistent store, is used to populate the server clients list before start.

func (*Hooks) StoredInflightMessages ¶

func (h *Hooks) StoredInflightMessages() (v []storage.Message, err error)

StoredInflightMessages returns all inflight messages, e.g. from a persistent store, and is used to populate the restored clients with inflight messages before start.

func (*Hooks) StoredRetainedMessages ¶

func (h *Hooks) StoredRetainedMessages() (v []storage.Message, err error)

StoredRetainedMessages returns all retained messages, e.g. from a persistent store, and is used to populate the server topics with retained messages before start.

func (*Hooks) StoredSubscriptions ¶

func (h *Hooks) StoredSubscriptions() (v []storage.Subscription, err error)

StoredSubscriptions returns all subcriptions, e.g. from a persistent store, and is used to populate the server subscriptions list before start.

func (*Hooks) StoredSysInfo ¶

func (h *Hooks) StoredSysInfo() (v storage.SystemInfo, err error)

StoredSysInfo returns a set of system info values.

type InboundTopicAliases ¶

type InboundTopicAliases struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InboundTopicAliases contains a map of topic aliases received from the client.

func NewInboundTopicAliases ¶

func NewInboundTopicAliases(topicAliasMaximum uint16) *InboundTopicAliases

NewInboundTopicAliases returns a pointer to InboundTopicAliases.

func (*InboundTopicAliases) Set ¶

func (a *InboundTopicAliases) Set(id uint16, topic string) string

Set sets a new alias for a specific topic.

type Inflight ¶

type Inflight struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Inflight is a map of InflightMessage keyed on packet id.

func NewInflights ¶

func NewInflights() *Inflight

NewInflights returns a new instance of an Inflight packets map.

func (*Inflight) Clone ¶

func (i *Inflight) Clone() *Inflight

Clone returns a new instance of Inflight with the same message data. This is used when transferring inflights from a taken-over session.

func (*Inflight) DecreaseReceiveQuota ¶

func (i *Inflight) DecreaseReceiveQuota()

TakeRecieveQuota reduces the receive quota by 1.

func (*Inflight) DecreaseSendQuota ¶

func (i *Inflight) DecreaseSendQuota()

DecreaseSendQuota reduces the send quota by 1.

func (*Inflight) Delete ¶

func (i *Inflight) Delete(id uint16) bool

Delete removes an in-flight message from the map. Returns true if the message existed.

func (*Inflight) Get ¶

func (i *Inflight) Get(id uint16) (packets.Packet, bool)

Get returns an inflight packet by packet id.

func (*Inflight) GetAll ¶

func (i *Inflight) GetAll(immediate bool) []packets.Packet

GetAll returns all the inflight messages.

func (*Inflight) IncreaseReceiveQuota ¶

func (i *Inflight) IncreaseReceiveQuota()

TakeRecieveQuota increases the receive quota by 1.

func (*Inflight) IncreaseSendQuota ¶

func (i *Inflight) IncreaseSendQuota()

IncreaseSendQuota increases the send quota by 1.

func (*Inflight) Len ¶

func (i *Inflight) Len() int

Len returns the size of the inflight messages map.

func (*Inflight) NextImmediate ¶

func (i *Inflight) NextImmediate() (packets.Packet, bool)

NextImmediate returns the next inflight packet which is indicated to be sent immediately. This typically occurs when the quota has been exhausted, and we need to wait until new quota is free to continue sending.

func (*Inflight) ResetReceiveQuota ¶

func (i *Inflight) ResetReceiveQuota(n int32)

ResetReceiveQuota resets the receive quota to the maximum allowed value.

func (*Inflight) ResetSendQuota ¶

func (i *Inflight) ResetSendQuota(n int32)

ResetSendQuota resets the send quota to the maximum allowed value.

func (*Inflight) Set ¶

func (i *Inflight) Set(m packets.Packet) bool

Set adds or updates an inflight packet by packet id.

type InlineSubFn ¶

type InlineSubFn func(cl *Client, sub packets.Subscription, pk packets.Packet)

InlineSubFn is the signature for a callback function which will be called when an inline client receives a message on a topic it is subscribed to. The sub argument contains information about the subscription that was matched for any filters.

type InlineSubscription ¶

type InlineSubscription struct {
	packets.Subscription
	Handler InlineSubFn
}

type InlineSubscriptions ¶

type InlineSubscriptions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InlineSubscriptions represents a map of internal subscriptions keyed on client.

func NewInlineSubscriptions ¶

func NewInlineSubscriptions() *InlineSubscriptions

NewInlineSubscriptions returns a new instance of InlineSubscriptions.

func (*InlineSubscriptions) Add ¶

Add adds a new internal subscription for a client id.

func (*InlineSubscriptions) Delete ¶

func (s *InlineSubscriptions) Delete(id int)

Delete removes an internal subscription by the client id.

func (*InlineSubscriptions) Get ¶

func (s *InlineSubscriptions) Get(id int) (val InlineSubscription, ok bool)

Get returns an internal subscription for a client id.

func (*InlineSubscriptions) GetAll ¶

func (s *InlineSubscriptions) GetAll() map[int]InlineSubscription

GetAll returns all internal subscriptions.

func (*InlineSubscriptions) Len ¶

func (s *InlineSubscriptions) Len() int

Len returns the number of internal subscriptions.

type Options ¶

type Options struct {
	// Listeners specifies any listeners which should be dynamically added on serve. Used when setting listeners by config.
	Listeners []listeners.Config `yaml:"listeners" json:"listeners"`

	// Hooks specifies any hooks which should be dynamically added on serve. Used when setting hooks by config.
	Hooks []HookLoadConfig `yaml:"hooks" json:"hooks"`

	// Capabilities defines the server features and behaviour. If you only wish to modify
	// several of these values, set them explicitly - e.g.
	// 	server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024
	Capabilities *Capabilities `yaml:"capabilities" json:"capabilities"`

	// ClientNetWriteBufferSize specifies the size of the client *bufio.Writer write buffer.
	ClientNetWriteBufferSize int `yaml:"client_net_write_buffer_size" json:"client_net_write_buffer_size"`

	// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
	ClientNetReadBufferSize int `yaml:"client_net_read_buffer_size" json:"client_net_read_buffer_size"`

	// Logger specifies a custom configured implementation of log/slog to override
	// the servers default logger configuration. If you wish to change the log level,
	// of the default logger, you can do so by setting:
	// server := mqtt.New(nil)
	// level := new(slog.LevelVar)
	// server.Slog = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
	// 	Level: level,
	// }))
	// level.Set(slog.LevelDebug)
	Logger *slog.Logger `yaml:"-" json:"-"`

	// SysTopicResendInterval specifies the interval between $SYS topic updates in seconds.
	SysTopicResendInterval int64 `yaml:"sys_topic_resend_interval" json:"sys_topic_resend_interval"`

	// Enable Inline client to allow direct subscribing and publishing from the parent codebase,
	// with negligible performance difference (disabled by default to prevent confusion in statistics).
	InlineClient bool `yaml:"inline_client" json:"inline_client"`
}

Options contains configurable options for the server.

type OutboundTopicAliases ¶

type OutboundTopicAliases struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

OutboundTopicAliases contains a map of topic aliases sent from the broker to the client.

func NewOutboundTopicAliases ¶

func NewOutboundTopicAliases(topicAliasMaximum uint16) *OutboundTopicAliases

NewOutboundTopicAliases returns a pointer to OutboundTopicAliases.

func (*OutboundTopicAliases) Set ¶

func (a *OutboundTopicAliases) Set(topic string) (uint16, bool)

Set sets a new topic alias for a topic and returns the alias value, and a boolean indicating if the alias already existed.

type ReadFn ¶

type ReadFn func(*Client, packets.Packet) error

ReadFn is the function signature for the function used for reading and processing new packets.

type Server ¶

type Server struct {
	Options   *Options             // configurable server options
	Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections
	Clients   *Clients             // clients known to the broker
	Topics    *TopicsIndex         // an index of topic filter subscriptions and retained messages
	Info      *system.Info         // values about the server commonly known as $SYS topics

	Log *slog.Logger // minimal no-alloc logger
	// contains filtered or unexported fields
}

Server is an MQTT broker server. It should be created with server.New() in order to ensure all the internal fields are correctly populated.

func New ¶

func New(opts *Options) *Server

New returns a new instance of mochi mqtt broker. Optional parameters can be specified to override some default settings (see Options).

func (*Server) AddHook ¶

func (s *Server) AddHook(hook Hook, config any) error

AddHook attaches a new Hook to the server. Ideally, this should be called before the server is started with s.Serve().

func (*Server) AddHooksFromConfig ¶

func (s *Server) AddHooksFromConfig(hooks []HookLoadConfig) error

AddHooksFromConfig adds hooks to the server which were specified in the hooks config (usually from a config file). New built-in hooks should be added to this list.

func (*Server) AddListener ¶

func (s *Server) AddListener(l listeners.Listener) error

AddListener adds a new network listener to the server, for receiving incoming client connections.

func (*Server) AddListenersFromConfig ¶

func (s *Server) AddListenersFromConfig(configs []listeners.Config) error

AddListenersFromConfig adds listeners to the server which were specified in the listeners config (usually from a config file). New built-in listeners should be added to this list.

func (*Server) Close ¶

func (s *Server) Close() error

Close attempts to gracefully shut down the server, all listeners, clients, and stores.

func (*Server) DisconnectClient ¶

func (s *Server) DisconnectClient(cl *Client, code packets.Code) error

DisconnectClient sends a Disconnect packet to a client and then closes the client connection.

func (*Server) EstablishConnection ¶

func (s *Server) EstablishConnection(listener string, c net.Conn) error

EstablishConnection establishes a new client when a listener accepts a new connection.

func (*Server) InjectPacket ¶

func (s *Server) InjectPacket(cl *Client, pk packets.Packet) error

InjectPacket injects a packet into the broker as if it were sent from the specified client. InlineClients using this method can publish packets to any topic (including $SYS) and bypass ACL checks.

func (*Server) NewClient ¶

func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool) *Client

NewClient returns a new Client instance, populated with all the required values and references to be used with the server. If you are using this client to directly publish messages from the embedding application, set the inline flag to true to bypass ACL and topic validation checks.

func (*Server) Publish ¶

func (s *Server) Publish(topic string, payload []byte, retain bool, qos byte) error

Publish publishes a publish packet into the broker as if it were sent from the specified client. This is a convenience function which wraps InjectPacket. As such, this method can publish packets to any topic (including $SYS) and bypass ACL checks. The qos byte is used for limiting the outbound qos (mqtt v5) rather than issuing to the broker (we assume qos 2 complete).

func (*Server) SendConnack ¶

func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, properties *packets.Properties) error

SendConnack returns a Connack packet to a client.

func (*Server) Serve ¶

func (s *Server) Serve() error

Serve starts the event loops responsible for establishing client connections on all attached listeners, publishing the system topics, and starting all hooks.

func (*Server) Subscribe ¶

func (s *Server) Subscribe(filter string, subscriptionId int, handler InlineSubFn) error

Subscribe adds an inline subscription for the specified topic filter and subscription identifier with the provided handler function.

func (*Server) Unsubscribe ¶

func (s *Server) Unsubscribe(filter string, subscriptionId int) error

Unsubscribe removes an inline subscription for the specified subscription and topic filter. It allows you to unsubscribe a specific subscription from the internal subscription associated with the given topic filter.

func (*Server) UnsubscribeClient ¶

func (s *Server) UnsubscribeClient(cl *Client)

UnsubscribeClient unsubscribes a client from all of their subscriptions.

type SharedSubscriptions ¶

type SharedSubscriptions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SharedSubscriptions contains a map of subscriptions to a shared filter, keyed on share group then client id.

func NewSharedSubscriptions ¶

func NewSharedSubscriptions() *SharedSubscriptions

NewSharedSubscriptions returns a new instance of Subscriptions.

func (*SharedSubscriptions) Add ¶

func (s *SharedSubscriptions) Add(group, id string, val packets.Subscription)

Add creates a new shared subscription for a group and client id pair.

func (*SharedSubscriptions) Delete ¶

func (s *SharedSubscriptions) Delete(group, id string)

Delete deletes a client id from a shared subscription group.

func (*SharedSubscriptions) Get ¶

func (s *SharedSubscriptions) Get(group, id string) (val packets.Subscription, ok bool)

Get returns the subscription properties for a client id in a share group, if one exists.

func (*SharedSubscriptions) GetAll ¶

GetAll returns all shared subscription groups and their subscriptions.

func (*SharedSubscriptions) GroupLen ¶

func (s *SharedSubscriptions) GroupLen() int

GroupLen returns the number of groups subscribed to the filter.

func (*SharedSubscriptions) Len ¶

func (s *SharedSubscriptions) Len() int

Len returns the total number of shared subscriptions to a filter across all groups.

type Subscribers ¶

type Subscribers struct {
	Shared              map[string]map[string]packets.Subscription
	SharedSelected      map[string]packets.Subscription
	Subscriptions       map[string]packets.Subscription
	InlineSubscriptions map[int]InlineSubscription
}

Subscribers contains the shared and non-shared subscribers matching a topic.

func (*Subscribers) MergeSharedSelected ¶

func (s *Subscribers) MergeSharedSelected()

MergeSharedSelected merges the selected subscribers for a shared subscription group and the non-shared subscribers, to ensure that no subscriber gets multiple messages due to have both types of subscription matching the same filter.

func (*Subscribers) SelectShared ¶

func (s *Subscribers) SelectShared()

SelectShared returns one subscriber for each shared subscription group.

type Subscriptions ¶

type Subscriptions struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriptions is a map of subscriptions keyed on client.

func NewSubscriptions ¶

func NewSubscriptions() *Subscriptions

NewSubscriptions returns a new instance of Subscriptions.

func (*Subscriptions) Add ¶

func (s *Subscriptions) Add(id string, val packets.Subscription)

Add adds a new subscription for a client. ID can be a filter in the case this map is client state, or a client id if particle state.

func (*Subscriptions) Delete ¶

func (s *Subscriptions) Delete(id string)

Delete removes a subscription by client or filter id.

func (*Subscriptions) Get ¶

func (s *Subscriptions) Get(id string) (val packets.Subscription, ok bool)

Get returns a subscriptions for a specific client or filter id.

func (*Subscriptions) GetAll ¶

func (s *Subscriptions) GetAll() map[string]packets.Subscription

GetAll returns all subscriptions.

func (*Subscriptions) Len ¶

func (s *Subscriptions) Len() int

Len returns the number of subscriptions.

type TopicAliases ¶

type TopicAliases struct {
	Inbound  *InboundTopicAliases
	Outbound *OutboundTopicAliases
}

TopicAliases contains inbound and outbound topic alias registrations.

func NewTopicAliases ¶

func NewTopicAliases(topicAliasMaximum uint16) TopicAliases

NewTopicAliases returns an instance of TopicAliases.

type TopicsIndex ¶

type TopicsIndex struct {
	Retained *packets.Packets
	// contains filtered or unexported fields
}

TopicsIndex is a prefix/trie tree containing topic subscribers and retained messages.

func NewTopicsIndex ¶

func NewTopicsIndex() *TopicsIndex

NewTopicsIndex returns a pointer to a new instance of Index.

func (*TopicsIndex) InlineSubscribe ¶

func (x *TopicsIndex) InlineSubscribe(subscription InlineSubscription) bool

InlineSubscribe adds a new internal subscription for a topic filter, returning true if the subscription was new.

func (*TopicsIndex) InlineUnsubscribe ¶

func (x *TopicsIndex) InlineUnsubscribe(id int, filter string) bool

InlineUnsubscribe removes an internal subscription for a topic filter associated with a specific client, returning true if the subscription existed.

func (*TopicsIndex) Messages ¶

func (x *TopicsIndex) Messages(filter string) []packets.Packet

Messages returns a slice of any retained messages which match a filter.

func (*TopicsIndex) RetainMessage ¶

func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64

RetainMessage saves a message payload to the end of a topic address. Returns 1 if a retained message was added, and -1 if the retained message was removed. 0 is returned if sequential empty payloads are received.

func (*TopicsIndex) Subscribe ¶

func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) bool

Subscribe adds a new subscription for a client to a topic filter, returning true if the subscription was new.

func (*TopicsIndex) Subscribers ¶

func (x *TopicsIndex) Subscribers(topic string) *Subscribers

Subscribers returns a map of clients who are subscribed to matching filters, their subscription ids and highest qos.

func (*TopicsIndex) Unsubscribe ¶

func (x *TopicsIndex) Unsubscribe(filter, client string) bool

Unsubscribe removes a subscription filter for a client, returning true if the subscription existed.

type Will ¶

type Will struct {
	Payload           []byte                 // -
	User              []packets.UserProperty // -
	TopicName         string                 // -
	Flag              uint32                 // 0,1
	WillDelayInterval uint32                 // -
	Qos               byte                   // -
	Retain            bool                   // -
}

Will contains the last will and testament details for a client connection.

Directories ¶

Path Synopsis
cmd
examples
tcp
tls
hooks
storage/bolt
Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.
Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL