gmqtt

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2020 License: MIT Imports: 3 Imported by: 11

README

中文文档

Gmqtt Mentioned in Awesome Go Build Status codecov Go Report Card

News: MQTT V5 is now supported. But due to those new features in v5, there area lots of breaking changes. If you have any migration problems, feel free to raise an issue. Or you can use the latest v3 broker.

Installation

$ go get -u github.com/DrmagicE/gmqtt

Features

  • Provide hook method to customized the broker behaviours(Authentication, ACL, etc..). See server/hooks.go for details
  • Support tls/ssl and websocket
  • Provide flexible plugable mechanism. See server/plugin.go and /plugin for details.
  • Provide Go interface for extensions to interact with the server. For examples, the extensions or plugins can publish message or add/remove subscription through function call. See Server interface in server/server.go and admin for details.
  • Provide metrics (by using Prometheus). (plugin: prometheus)
  • Provide GRPC and REST APIs to interact with server. (plugin:admin)
  • Provide session persistence which means the broker can retrieve the session data after restart. Currently, only redis backend is supported.

Limitations

  • Cluster is not supported.

Get Started

The following command will start gmqtt broker with default configuration. The broker listens on 1883 for tcp server and 8883 for websocket server with admin and prometheus plugin loaded.

$ cd cmd/gmqttd
$ go run . start -c default_config.yml

configuration

Gmqtt use -c flag to define configuration path. If not set, gmqtt reads $HOME/gmqtt.yml as default. If default path not exist, Gmqtt will start with default configuration.

session persistence

Gmqtt uses memory to store session data by default and it is the recommended way because of the good performance. But the session data will be lose after the broker restart. You can use redis as backend storage to prevent data loss from restart:

persistence:
  type: redis  
  redis:
    # redis server address
    addr: "127.0.0.1:6379"
    # the maximum number of idle connections in the redis connection pool
    max_idle: 1000
    # the maximum number of connections allocated by the redis connection pool at a given time.
    # If zero, there is no limit on the number of connections in the pool.
    max_active: 0
    # the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed
    idle_timeout: 240s
    password: ""
    # the number of the redis database
    database: 0

Docker

$ docker build -t gmqtt .
$ docker run -p 1883:1883 -p 8883:8883 -p 8082:8082 -p 8083:8083  -p 8084:8084  gmqtt

Documentation

godoc

Hooks

Gmqtt implements the following hooks:

Name hooking point possible usages
OnAccept When accepts a TCP connection.(Not supported in websocket) Connection rate limit, IP allow/block list.
OnStop When the broker exists
OnSubscribe When received a subscribe packet Subscribe access control, modifies subscriptions.
OnSubscribed When subscribe succeed
OnUnsubscribe When received a unsubscribe packet Unsubscribe access controls, modifies the topics that is going to unsubscribe.
OnUnsubscribed When unsubscribe succeed
OnMsgArrived When received a publish packet Publish access control, modifies message before delivery.
OnBasicAuth When received a connect packet without AuthMethod property Authentication
OnEnhancedAuth When received a connect packet with AuthMethod property (Only for v5 clients) Authentication
OnReAuth When received a auth packet (Only for v5 clients) Authentication
OnConnected When the client connected succeed
OnSessionCreated When creates a new session
OnSessionResumed When resumes from old session
OnSessionTerminated When session terminated
OnDelivered When a message is delivered to the client
OnClosed When the client is closed
OnMsgDropped When a message is dropped for some reasons

See /examples/hook for details.

Test

Unit Test

$ go test -race ./...

Integration Test

paho.mqtt.testing.

TODO

  • Support bridge mode and cluster.

Breaking changes may occur when adding this new features.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MessageToPublish added in v0.2.0

func MessageToPublish(msg *Message, version packets.Version) *packets.Publish

MessageToPublish create the publish packet instance from *Message

Types

type Message added in v0.2.0

type Message struct {
	Dup      bool
	QoS      uint8
	Retained bool
	Topic    string
	Payload  []byte
	PacketID packets.PacketID
	// v5
	// The following fields are introduced in v5 specification.
	// These fields will not take effect when it represents a v3 publish packet.
	ContentType            string
	CorrelationData        []byte
	MessageExpiry          uint32
	PayloadFormat          packets.PayloadFormat
	ResponseTopic          string
	SubscriptionIdentifier []uint32
	UserProperties         []packets.UserProperty
}

func MessageFromPublish added in v0.2.0

func MessageFromPublish(p *packets.Publish) *Message

MessageFromPublish create the Message instance from publish packets

func (*Message) Copy added in v0.2.0

func (m *Message) Copy() *Message

Copy deep copies the Message and return the new one

func (*Message) TotalBytes added in v0.2.0

func (m *Message) TotalBytes(version packets.Version) uint32

TotalBytes return the publish packets total bytes.

type Session added in v0.2.0

type Session struct {
	// ClientID represents the client id.
	ClientID string
	// Will is the will message of the client, can be nil if there is no will message.
	Will *Message
	// WillDelayInterval represents the Will Delay Interval in seconds
	WillDelayInterval uint32
	// ConnectedAt is the session create time.
	ConnectedAt time.Time
	// ExpiryInterval represents the Session Expiry Interval in seconds
	ExpiryInterval uint32
}

Session represents a MQTT session.

func (*Session) IsExpired added in v0.2.0

func (s *Session) IsExpired(now time.Time) bool

IsExpired return whether the session is expired

type Subscription added in v0.2.0

type Subscription struct {
	// ShareName is the share name of a shared subscription.
	// set to "" if it is a non-shared subscription.
	ShareName string
	// TopicFilter is the topic filter which does not include the share name.
	TopicFilter string
	// ID is the subscription identifier
	ID uint32

	// QoS is the qos level of the Subscription.
	QoS packets.QoS
	// NoLocal is the No Local option.
	NoLocal bool
	// RetainAsPublished is the Retain As Published option.
	RetainAsPublished bool
	// RetainHandling the Retain Handling option.
	RetainHandling byte
}

Subscription represents a subscription in gmqtt.

func (*Subscription) Copy added in v0.2.0

func (s *Subscription) Copy() *Subscription

Copy makes a copy of subscription.

func (*Subscription) GetFullTopicName added in v0.2.0

func (s *Subscription) GetFullTopicName() string

GetFullTopicName returns the full topic name of the subscription.

func (*Subscription) Validate added in v0.2.0

func (s *Subscription) Validate() error

Validate returns whether the subscription is valid. If you can ensure the subscription is valid then just skip the validation.

Directories

Path Synopsis
cmd
Package config is a generated GoMock package.
Package config is a generated GoMock package.
examples
queue
Package queue is a generated GoMock package.
Package queue is a generated GoMock package.
session
Package session is a generated GoMock package.
Package session is a generated GoMock package.
subscription
Package subscription is a generated GoMock package.
Package subscription is a generated GoMock package.
unack
Package unack is a generated GoMock package.
Package unack is a generated GoMock package.
pkg
packets
Package packets is a generated GoMock package.
Package packets is a generated GoMock package.
pidfile
Package pidfile provides structure and helper functions to create and remove PID file.
Package pidfile provides structure and helper functions to create and remove PID file.
plugin
admin
Package admin is a reverse proxy.
Package admin is a reverse proxy.
Package retained is a generated GoMock package.
Package retained is a generated GoMock package.
Package gmqtt provides an MQTT v3.1.1 server library.
Package gmqtt provides an MQTT v3.1.1 server library.
topicalias

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL