Documentation ¶
Overview ¶
package mqtt provides a high performance, fully compliant MQTT v5 broker server with v3.1.1 backward compatibility.
Index ¶
- Constants
- Variables
- func AtomicItoa(ptr *int64) string
- func IsSharedFilter(filter string) bool
- func IsValidFilter(filter string, forPublish bool) bool
- type Capabilities
- type Client
- func (cl *Client) ClearInflights(now, maximumExpiry int64) []uint16
- func (cl *Client) Closed() bool
- func (cl *Client) NextPacketID() (i uint32, err error)
- func (cl *Client) ParseConnect(lid string, pk packets.Packet)
- func (cl *Client) Read(packetHandler ReadFn) error
- func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error
- func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err error)
- func (cl *Client) ResendInflightMessages(force bool) error
- func (cl *Client) Stop(err error)
- func (cl *Client) StopCause() error
- func (cl *Client) WriteLoop()
- func (cl *Client) WritePacket(pk packets.Packet) error
- type ClientConnection
- type ClientProperties
- type ClientState
- type ClientSubscriptions
- type Clients
- type Compatibilities
- type Hook
- type HookBase
- func (h *HookBase) ID() string
- func (h *HookBase) Init(config any) error
- func (h *HookBase) OnACLCheck(cl *Client, topic string, write bool) bool
- func (h *HookBase) OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error)
- func (h *HookBase) OnClientExpired(cl *Client)
- func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) error
- func (h *HookBase) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool
- func (h *HookBase) OnDisconnect(cl *Client, err error, expire bool)
- func (h *HookBase) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet
- func (h *HookBase) OnPacketIDExhausted(cl *Client, pk packets.Packet)
- func (h *HookBase) OnPacketProcessed(cl *Client, pk packets.Packet, err error)
- func (h *HookBase) OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error)
- func (h *HookBase) OnPacketSent(cl *Client, pk packets.Packet, b []byte)
- func (h *HookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error)
- func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet)
- func (h *HookBase) OnPublished(cl *Client, pk packets.Packet)
- func (h *HookBase) OnPublishedWithSharedFilters(pk packets.Packet, sharedFilters map[string]bool)
- func (h *HookBase) OnQosComplete(cl *Client, pk packets.Packet)
- func (h *HookBase) OnQosDropped(cl *Client, pk packets.Packet)
- func (h *HookBase) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)
- func (h *HookBase) OnRetainMessage(cl *Client, pk packets.Packet, r int64)
- func (h *HookBase) OnRetainPublished(cl *Client, pk packets.Packet)
- func (h *HookBase) OnRetainedExpired(topic string)
- func (h *HookBase) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers
- func (h *HookBase) OnSessionEstablish(cl *Client, pk packets.Packet)
- func (h *HookBase) OnSessionEstablished(cl *Client, pk packets.Packet)
- func (h *HookBase) OnStarted()
- func (h *HookBase) OnStopped()
- func (h *HookBase) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet
- func (h *HookBase) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)
- func (h *HookBase) OnSysInfoTick(*system.Info)
- func (h *HookBase) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet
- func (h *HookBase) OnUnsubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)
- func (h *HookBase) OnWill(cl *Client, will Will) (Will, error)
- func (h *HookBase) OnWillSent(cl *Client, pk packets.Packet)
- func (h *HookBase) Provides(b byte) bool
- func (h *HookBase) SetOpts(l *slog.Logger, opts *HookOptions)
- func (h *HookBase) Stop() error
- func (h *HookBase) StoredClientByCid(cid string) (v storage.Client, err error)
- func (h *HookBase) StoredClients() (v []storage.Client, err error)
- func (h *HookBase) StoredInflightMessages() (v []storage.Message, err error)
- func (h *HookBase) StoredInflightMessagesByCid(cid string) (v []storage.Message, err error)
- func (h *HookBase) StoredRetainedMessageByTopic(topic string) (v storage.Message, err error)
- func (h *HookBase) StoredRetainedMessages() (v []storage.Message, err error)
- func (h *HookBase) StoredSubscriptions() (v []storage.Subscription, err error)
- func (h *HookBase) StoredSubscriptionsByCid(cid string) (v []storage.Subscription, err error)
- func (h *HookBase) StoredSysInfo() (v storage.SystemInfo, err error)
- type HookOptions
- type Hooks
- func (h *Hooks) Add(hook Hook, config any) error
- func (h *Hooks) GetAll() []Hook
- func (h *Hooks) Len() int64
- func (h *Hooks) OnACLCheck(cl *Client, topic string, write bool) bool
- func (h *Hooks) OnAuthPacket(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)
- func (h *Hooks) OnClientExpired(cl *Client)
- func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) error
- func (h *Hooks) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool
- func (h *Hooks) OnDisconnect(cl *Client, err error, expire bool)
- func (h *Hooks) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet
- func (h *Hooks) OnPacketIDExhausted(cl *Client, pk packets.Packet)
- func (h *Hooks) OnPacketProcessed(cl *Client, pk packets.Packet, err error)
- func (h *Hooks) OnPacketRead(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)
- func (h *Hooks) OnPacketSent(cl *Client, pk packets.Packet, b []byte)
- func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, err error)
- func (h *Hooks) OnPublishDropped(cl *Client, pk packets.Packet)
- func (h *Hooks) OnPublished(cl *Client, pk packets.Packet)
- func (h *Hooks) OnPublishedWithSharedFilters(pk packets.Packet, sharedFilters map[string]bool)
- func (h *Hooks) OnQosComplete(cl *Client, pk packets.Packet)
- func (h *Hooks) OnQosDropped(cl *Client, pk packets.Packet)
- func (h *Hooks) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int)
- func (h *Hooks) OnRetainMessage(cl *Client, pk packets.Packet, r int64)
- func (h *Hooks) OnRetainPublished(cl *Client, pk packets.Packet)
- func (h *Hooks) OnRetainedExpired(filter string)
- func (h *Hooks) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers
- func (h *Hooks) OnSessionEstablish(cl *Client, pk packets.Packet)
- func (h *Hooks) OnSessionEstablished(cl *Client, pk packets.Packet)
- func (h *Hooks) OnStarted()
- func (h *Hooks) OnStopped()
- func (h *Hooks) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet
- func (h *Hooks) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)
- func (h *Hooks) OnSysInfoTick(sys *system.Info)
- func (h *Hooks) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet
- func (h *Hooks) OnUnsubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int)
- func (h *Hooks) OnWill(cl *Client, will Will) Will
- func (h *Hooks) OnWillSent(cl *Client, pk packets.Packet)
- func (h *Hooks) Provides(b ...byte) bool
- func (h *Hooks) Stop()
- func (h *Hooks) StoredClientByCid(cid string) (v storage.Client, err error)
- func (h *Hooks) StoredClients() (v []storage.Client, err error)
- func (h *Hooks) StoredInflightMessages() (v []storage.Message, err error)
- func (h *Hooks) StoredInflightMessagesByCid(cid string) (v []storage.Message, err error)
- func (h *Hooks) StoredRetainedMessageByTopic(topic string) (v storage.Message, err error)
- func (h *Hooks) StoredRetainedMessages() (v []storage.Message, err error)
- func (h *Hooks) StoredSubscriptions() (v []storage.Subscription, err error)
- func (h *Hooks) StoredSubscriptionsByCid(cid string) (v []storage.Subscription, err error)
- func (h *Hooks) StoredSysInfo() (v storage.SystemInfo, err error)
- type InboundTopicAliases
- type Inflight
- func (i *Inflight) Clone() *Inflight
- func (i *Inflight) DecreaseReceiveQuota()
- func (i *Inflight) DecreaseSendQuota()
- func (i *Inflight) Delete(id uint16) bool
- func (i *Inflight) Get(id uint16) (packets.Packet, bool)
- func (i *Inflight) GetAll(immediate bool) []packets.Packet
- func (i *Inflight) IncreaseReceiveQuota()
- func (i *Inflight) IncreaseSendQuota()
- func (i *Inflight) Len() int
- func (i *Inflight) NextImmediate() (packets.Packet, bool)
- func (i *Inflight) ResetReceiveQuota(n int32)
- func (i *Inflight) ResetSendQuota(n int32)
- func (i *Inflight) Set(m packets.Packet) bool
- type InlineSubFn
- type InlineSubscription
- type InlineSubscriptions
- type Options
- type OutboundTopicAliases
- type ReadFn
- type Server
- func (s *Server) AddHook(hook Hook, config any) error
- func (s *Server) AddListener(l listeners.Listener) error
- func (s *Server) Close() error
- func (s *Server) DisconnectClient(cl *Client, code packets.Code) error
- func (s *Server) EstablishConnection(listener string, c net.Conn) error
- func (s *Server) InjectPacket(cl *Client, pk packets.Packet) error
- func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool) *Client
- func (s *Server) Publish(topic string, payload []byte, retain bool, qos byte) error
- func (s *Server) PublishToSubscribers(pk packets.Packet, local bool)
- func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, properties *packets.Properties) error
- func (s *Server) Serve() error
- func (s *Server) Subscribe(filter string, subscriptionId int, handler InlineSubFn) error
- func (s *Server) Unsubscribe(filter string, subscriptionId int) error
- func (s *Server) UnsubscribeClient(cl *Client)
- type SharedSubscriptions
- func (s *SharedSubscriptions) Add(group, id string, val packets.Subscription)
- func (s *SharedSubscriptions) Delete(group, id string)
- func (s *SharedSubscriptions) Get(group, id string) (val packets.Subscription, ok bool)
- func (s *SharedSubscriptions) GetAll() map[string]map[string]packets.Subscription
- func (s *SharedSubscriptions) GroupLen() int
- func (s *SharedSubscriptions) Len() int
- func (s *SharedSubscriptions) SubsInGroupLen(group string) int
- type Subscribers
- type Subscriptions
- type TopicAliases
- type TopicsIndex
- func (x *TopicsIndex) InlineSubscribe(subscription InlineSubscription) (bool, int)
- func (x *TopicsIndex) InlineUnsubscribe(id int, filter string) (bool, int)
- func (x *TopicsIndex) Messages(filter string) []packets.Packet
- func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64
- func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) (bool, int)
- func (x *TopicsIndex) Subscribers(topic string) *Subscribers
- func (x *TopicsIndex) Unsubscribe(filter, client string) (bool, int)
- type Will
Constants ¶
const ( InheritWayNew = iota InheritWayLocal InheritWayRemote )
Client session inheritance way
const ( SetOptions byte = iota OnSysInfoTick OnStarted OnStopped OnConnectAuthenticate OnACLCheck OnConnect OnSessionEstablish OnSessionEstablished OnDisconnect OnAuthPacket OnPacketRead OnPacketEncode OnPacketSent OnPacketProcessed OnSubscribe OnSubscribed OnSelectSubscribers OnUnsubscribe OnUnsubscribed OnPublish OnPublished OnPublishDropped OnRetainMessage OnRetainPublished OnQosPublish OnQosComplete OnQosDropped OnPacketIDExhausted OnWill OnWillSent OnClientExpired OnRetainedExpired StoredClients StoredSubscriptions StoredInflightMessages StoredRetainedMessages StoredSysInfo StoredClientByCid StoredSubscriptionsByCid StoredInflightMessagesByCid StoredRetainedMessageByTopic )
const ( Version = "2.4.0" // the current server version. LocalListener = "local" InlineClientId = "inline" )
Variables ¶
var ( // DefaultServerCapabilities defines the default features and capabilities provided by the server. DefaultServerCapabilities = &Capabilities{ MaximumSessionExpiryInterval: math.MaxUint32, MaximumMessageExpiryInterval: 60 * 60 * 24, ReceiveMaximum: 1024, MaximumQos: 2, RetainAvailable: 1, MaximumPacketSize: 0, TopicAliasMaximum: math.MaxUint16, WildcardSubAvailable: 1, SubIDAvailable: 1, SharedSubAvailable: 1, MinimumProtocolVersion: 3, MaximumClientWritesPending: 1024 * 8, } ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists ErrConnectionClosed = errors.New("connection not open") // connection is closed ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") // inline client is not enabled by default )
var ( SysPrefix = "$SYS" // the prefix indicating a system info topic )
var ( // ErrInvalidConfigType indicates a different Type of config value was expected to what was received. ErrInvalidConfigType = errors.New("invalid config type provided") )
Functions ¶
func AtomicItoa ¶
AtomicItoa converts an int64 pointer to a string.
func IsSharedFilter ¶
IsSharedFilter returns true if the filter uses the share prefix.
func IsValidFilter ¶
IsValidFilter returns true if the filter is valid.
Types ¶
type Capabilities ¶
type Capabilities struct { MaximumMessageExpiryInterval int64 `yaml:"maximum-message-expiry-interval"` MaximumClientWritesPending int32 `yaml:"maximum-client-writes-pending"` MaximumSessionExpiryInterval uint32 `yaml:"maximum-session-expiry-interval"` MaximumPacketSize uint32 `yaml:"maximum-packet-size"` ReceiveMaximum uint16 `yaml:"receive-maximum"` TopicAliasMaximum uint16 `yaml:"topic-alias-maximum"` MinimumProtocolVersion byte `yaml:"minimum-protocol-version"` Compatibilities Compatibilities MaximumQos byte `yaml:"maximum-qos"` RetainAvailable byte `yaml:"retain-available"` WildcardSubAvailable byte `yaml:"wildcard-sub-available"` SubIDAvailable byte `yaml:"sub-id-available"` // contains filtered or unexported fields }
Capabilities indicates the capabilities and features provided by the server.
type Client ¶
type Client struct { Properties ClientProperties // client properties State ClientState // the operational state of the client. Net ClientConnection // network connection state of the client ID string // the client id. sync.RWMutex // mutex InheritWay int // session inheritance way Ext map[string]interface{} // client extension. // contains filtered or unexported fields }
Client contains information about a client known by the broker.
func (*Client) ClearInflights ¶
ClearInflights deletes all inflight messages for the client, e.g. for a disconnected user with a clean session.
func (*Client) NextPacketID ¶
NextPacketID returns the next available (unused) packet id for the client. If no unused packet ids are available, an error is returned and the client should be disconnected.
func (*Client) ParseConnect ¶
ParseConnect parses the connect parameters and properties for a client.
func (*Client) Read ¶
Read reads incoming packets from the connected client and transforms them into packets to be handled by the packetHandler.
func (*Client) ReadFixedHeader ¶
func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error
ReadFixedHeader reads in the values of the next packet's fixed header.
func (*Client) ReadPacket ¶
ReadPacket reads the remaining buffer into an MQTT packet.
func (*Client) ResendInflightMessages ¶
ResendInflightMessages attempts to resend any pending inflight messages to connected clients.
func (*Client) Stop ¶
Stop instructs the client to shut down all processing goroutines and disconnect.
type ClientConnection ¶
type ClientConnection struct { Conn net.Conn // the net.Conn used to establish the connection Remote string // the remote address of the client Listener string // listener id of the client Inline bool // if true, the client is the built-in 'inline' embedded client // contains filtered or unexported fields }
ClientConnection contains the connection transport and metadata for the client.
type ClientProperties ¶
type ClientProperties struct { Props packets.Properties Will Will Username []byte ProtocolVersion byte Clean bool }
ClientProperties contains the properties which define the client behaviour.
type ClientState ¶
type ClientState struct { TopicAliases TopicAliases // a map of topic aliases Inflight *Inflight // a map of in-flight qos messages Subscriptions *Subscriptions // a map of the subscription filters a client maintains Keepalive uint16 // the number of seconds the connection can wait ServerKeepalive bool // keepalive was set by the server // contains filtered or unexported fields }
ClientState tracks the state of the client.
type ClientSubscriptions ¶
type ClientSubscriptions map[string]packets.Subscription
ClientSubscriptions is a map of aggregated subscriptions for a client.
type Clients ¶
Clients contains a map of the clients known by the broker.
func (*Clients) GetByListener ¶
GetByListener returns clients matching a listener id.
type Compatibilities ¶
type Compatibilities struct { ObscureNotAuthorized bool `yaml:"obscure-not-authorized"` // return unspecified errors instead of not authorized PassiveClientDisconnect bool `yaml:"passive-client-disconnect"` // don't disconnect the client forcefully after sending disconnect packet (paho) AlwaysReturnResponseInfo bool `yaml:"always-return-response"` // always return response info (useful for testing) RestoreSysInfoOnRestart bool `yaml:"restore-sys-info-restart"` // restore system info from store as if server never stopped NoInheritedPropertiesOnAck bool // don't allow inherited user properties on ack (paho - spec violation) }
Compatibilities provides flags for using compatibility modes.
type Hook ¶
type Hook interface { ID() string Provides(b byte) bool Init(config any) error Stop() error SetOpts(l *slog.Logger, o *HookOptions) OnStarted() OnStopped() OnConnectAuthenticate(cl *Client, pk packets.Packet) bool OnACLCheck(cl *Client, topic string, write bool) bool OnSysInfoTick(*system.Info) OnConnect(cl *Client, pk packets.Packet) error OnSessionEstablish(cl *Client, pk packets.Packet) OnSessionEstablished(cl *Client, pk packets.Packet) OnDisconnect(cl *Client, err error, expire bool) OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error) OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error) // triggers when a new packet is received by a client, but before packet validation OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet // modify a packet before it is byte-encoded and written to the client OnPacketSent(cl *Client, pk packets.Packet, b []byte) // triggers when packet bytes have been written to the client OnPacketProcessed(cl *Client, pk packets.Packet, err error) // triggers after a packet from the client been processed (handled) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int) // counts is an array of the number of subscribers for the same filter OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet OnUnsubscribed(cl *Client, pk packets.Packet, reasonCodes []byte, counts []int) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error) OnPublished(cl *Client, pk packets.Packet) OnPublishDropped(cl *Client, pk packets.Packet) OnRetainMessage(cl *Client, pk packets.Packet, r int64) OnRetainPublished(cl *Client, pk packets.Packet) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int) OnQosComplete(cl *Client, pk packets.Packet) OnQosDropped(cl *Client, pk packets.Packet) OnPacketIDExhausted(cl *Client, pk packets.Packet) OnWill(cl *Client, 
will Will) (Will, error) OnWillSent(cl *Client, pk packets.Packet) OnClientExpired(cl *Client) OnRetainedExpired(filter string) StoredClients() ([]storage.Client, error) StoredSubscriptions() ([]storage.Subscription, error) StoredInflightMessages() ([]storage.Message, error) StoredRetainedMessages() ([]storage.Message, error) StoredSysInfo() (storage.SystemInfo, error) StoredClientByCid(cid string) (storage.Client, error) StoredSubscriptionsByCid(cid string) ([]storage.Subscription, error) StoredInflightMessagesByCid(cid string) ([]storage.Message, error) StoredRetainedMessageByTopic(topic string) (storage.Message, error) }
Hook provides an interface of handlers for different events which occur during the lifecycle of the broker.
type HookBase ¶
type HookBase struct { Hook Log *slog.Logger Opts *HookOptions }
HookBase provides a set of default methods for each hook. It should be embedded in all hooks.
func (*HookBase) Init ¶
Init performs any pre-start initializations for the hook, such as connecting to databases or opening files.
func (*HookBase) OnACLCheck ¶
OnACLCheck is called when a user attempts to subscribe or publish to a topic.
func (*HookBase) OnAuthPacket ¶
OnAuthPacket is called when an auth packet is received from the client.
func (*HookBase) OnClientExpired ¶
OnClientExpired is called when a client session has expired.
func (*HookBase) OnConnectAuthenticate ¶
OnConnectAuthenticate is called when a user attempts to authenticate with the server.
func (*HookBase) OnDisconnect ¶
OnDisconnect is called when a client is disconnected for any reason.
func (*HookBase) OnPacketEncode ¶
OnPacketEncode is called before a packet is byte-encoded and written to the client.
func (*HookBase) OnPacketIDExhausted ¶
OnPacketIDExhausted is called when the client runs out of unused packet ids to assign to a packet.
func (*HookBase) OnPacketProcessed ¶
OnPacketProcessed is called immediately after a packet from a client is processed.
func (*HookBase) OnPacketRead ¶
OnPacketRead is called when a packet is received.
func (*HookBase) OnPacketSent ¶
OnPacketSent is called immediately after a packet is written to a client.
func (*HookBase) OnPublishDropped ¶
OnPublishDropped is called when a message to a client is dropped instead of being delivered.
func (*HookBase) OnPublished ¶
OnPublished is called when a client has published a message to subscribers.
func (*HookBase) OnPublishedWithSharedFilters ¶ added in v2.5.4
OnPublishedWithSharedFilters is called when a client has published a message to cluster.
func (*HookBase) OnQosComplete ¶
OnQosComplete is called when the Qos flow for a message has been completed.
func (*HookBase) OnQosDropped ¶
OnQosDropped is called when the Qos flow for a message expires.
func (*HookBase) OnQosPublish ¶
OnQosPublish is called when a publish packet with Qos >= 1 is issued to a subscriber.
func (*HookBase) OnRetainMessage ¶
OnRetainMessage is called when a published message is retained.
func (*HookBase) OnRetainPublished ¶ added in v2.3.7
OnRetainPublished is called when a retained message is published.
func (*HookBase) OnRetainedExpired ¶
OnRetainedExpired is called when a retained message for a topic has expired.
func (*HookBase) OnSelectSubscribers ¶
func (h *HookBase) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers
OnSelectSubscribers is called when selecting subscribers to receive a message.
func (*HookBase) OnSessionEstablish ¶ added in v2.3.7
OnSessionEstablish is called right after a new client connects and authenticates and right before the session is established and CONNACK is sent.
func (*HookBase) OnSessionEstablished ¶
OnSessionEstablished is called when a new client establishes a session (after OnConnect).
func (*HookBase) OnStarted ¶
func (h *HookBase) OnStarted()
OnStarted is called when the server starts.
func (*HookBase) OnStopped ¶
func (h *HookBase) OnStopped()
OnStopped is called when the server stops.
func (*HookBase) OnSubscribe ¶
OnSubscribe is called when a client subscribes to one or more filters.
func (*HookBase) OnSubscribed ¶
OnSubscribed is called when a client subscribes to one or more filters.
func (*HookBase) OnSysInfoTick ¶
OnSysInfoTick is called when the server publishes system info.
func (*HookBase) OnUnsubscribe ¶
OnUnsubscribe is called when a client unsubscribes from one or more filters.
func (*HookBase) OnUnsubscribed ¶
OnUnsubscribed is called when a client unsubscribes from one or more filters.
func (*HookBase) OnWillSent ¶
OnWillSent is called when an LWT message has been issued from a disconnecting client.
func (*HookBase) Provides ¶
Provides indicates which methods a hook provides. The default is none - this method should be overridden by the embedding hook.
func (*HookBase) SetOpts ¶
func (h *HookBase) SetOpts(l *slog.Logger, opts *HookOptions)
SetOpts is called by the server to propagate internal values and generally should not be called manually.
func (*HookBase) StoredClientByCid ¶
StoredClientByCid returns a client from a store.
func (*HookBase) StoredClients ¶
StoredClients returns all clients from a store.
func (*HookBase) StoredInflightMessages ¶
StoredInflightMessages returns all inflight messages from a store.
func (*HookBase) StoredInflightMessagesByCid ¶
StoredInflightMessagesByCid returns all inflight messages of client from a store.
func (*HookBase) StoredRetainedMessageByTopic ¶
StoredRetainedMessageByTopic returns a retained message of topic from a store.
func (*HookBase) StoredRetainedMessages ¶
StoredRetainedMessages returns all retained messages from a store.
func (*HookBase) StoredSubscriptions ¶
func (h *HookBase) StoredSubscriptions() (v []storage.Subscription, err error)
StoredSubscriptions returns all subscriptions from a store.
func (*HookBase) StoredSubscriptionsByCid ¶
func (h *HookBase) StoredSubscriptionsByCid(cid string) (v []storage.Subscription, err error)
StoredSubscriptionsByCid returns all subscriptions of client from a store.
func (*HookBase) StoredSysInfo ¶
func (h *HookBase) StoredSysInfo() (v storage.SystemInfo, err error)
StoredSysInfo returns a set of system info values.
type HookOptions ¶
type HookOptions struct {
Capabilities *Capabilities
}
HookOptions contains values which are inherited from the server on initialisation.
type Hooks ¶
type Hooks struct { Log *slog.Logger // a logger for the hook (from the server) sync.Mutex // a mutex for locking when adding hooks // contains filtered or unexported fields }
Hooks is a slice of Hook interfaces to be called in sequence.
func (*Hooks) OnACLCheck ¶
OnACLCheck is called when a user attempts to publish or subscribe to a topic filter. An implementation of this method MUST be used to allow or deny access to the topic filter (see hooks/auth/allow_all or basic). It can be used in custom hooks to check publishing and subscribing users against an existing permissions or roles database.
func (*Hooks) OnAuthPacket ¶
OnAuthPacket is called when an auth packet is received. It is intended to allow developers to create their own auth packet handling mechanisms.
func (*Hooks) OnClientExpired ¶
OnClientExpired is called when a client session has expired and should be deleted.
func (*Hooks) OnConnect ¶
OnConnect is called when a new client connects, and may return a packets.Code as an error to halt the connection.
func (*Hooks) OnConnectAuthenticate ¶
OnConnectAuthenticate is called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database.
func (*Hooks) OnDisconnect ¶
OnDisconnect is called when a client is disconnected for any reason.
func (*Hooks) OnPacketEncode ¶
OnPacketEncode is called immediately before a packet is encoded to be sent to a client.
func (*Hooks) OnPacketIDExhausted ¶
OnPacketIDExhausted is called when the client runs out of unused packet ids to assign to a packet.
func (*Hooks) OnPacketProcessed ¶
OnPacketProcessed is called when a packet has been received and successfully handled by the broker.
func (*Hooks) OnPacketRead ¶
OnPacketRead is called when a packet is received from a client.
func (*Hooks) OnPacketSent ¶
OnPacketSent is called when a packet has been sent to a client. It takes a bytes parameter containing the bytes sent.
func (*Hooks) OnPublish ¶
OnPublish is called when a client publishes a message. This method differs from OnPublished in that it allows you to modify the incoming packet before it is processed. The return values of the hook methods are passed-through in the order the hooks were attached.
func (*Hooks) OnPublishDropped ¶
OnPublishDropped is called when a message to a client was dropped instead of delivered such as when a client is too slow to respond.
func (*Hooks) OnPublished ¶
OnPublished is called when a client has published a message to subscribers.
func (*Hooks) OnPublishedWithSharedFilters ¶ added in v2.5.1
OnPublishedWithSharedFilters is called when a client has published a message to cluster.
func (*Hooks) OnQosComplete ¶
OnQosComplete is called when the Qos flow for a message has been completed. In other words, when an inflight message is resolved. It is typically used to delete an inflight message from a store.
func (*Hooks) OnQosDropped ¶
OnQosDropped is called when the Qos flow for a message expires. In other words, when an inflight message expires or is abandoned. It is typically used to delete an inflight message from a store.
func (*Hooks) OnQosPublish ¶
OnQosPublish is called when a publish packet with Qos >= 1 is issued to a subscriber. In other words, this method is called when a new inflight message is created or resent. It is typically used to store a new inflight message.
func (*Hooks) OnRetainMessage ¶
OnRetainMessage is called when a published message is retained.
func (*Hooks) OnRetainPublished ¶ added in v2.3.7
OnRetainPublished is called when a retained message is published.
func (*Hooks) OnRetainedExpired ¶
OnRetainedExpired is called when a retained message has expired and should be deleted.
func (*Hooks) OnSelectSubscribers ¶
func (h *Hooks) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers
OnSelectSubscribers is called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. This hook can be used to programmatically remove or add clients to a publish to subscribers process, or to select the subscriber for a shared group in a custom manner (such as based on client id, ip, etc).
func (*Hooks) OnSessionEstablish ¶ added in v2.3.7
OnSessionEstablish is called right after a new client connects and authenticates and right before the session is established and CONNACK is sent.
func (*Hooks) OnSessionEstablished ¶
OnSessionEstablished is called when a new client establishes a session (after OnConnect).
func (*Hooks) OnStarted ¶
func (h *Hooks) OnStarted()
OnStarted is called when the server has successfully started.
func (*Hooks) OnStopped ¶
func (h *Hooks) OnStopped()
OnStopped is called when the server has successfully stopped.
func (*Hooks) OnSubscribe ¶
OnSubscribe is called when a client subscribes to one or more filters. This method differs from OnSubscribed in that it allows you to modify the subscription values before the packet is processed. The return values of the hook methods are passed-through in the order the hooks were attached.
func (*Hooks) OnSubscribed ¶
OnSubscribed is called when a client subscribes to one or more filters.
func (*Hooks) OnSysInfoTick ¶
OnSysInfoTick is called when the $SYS topic values are published out.
func (*Hooks) OnUnsubscribe ¶
OnUnsubscribe is called when a client unsubscribes from one or more filters. This method differs from OnUnsubscribed in that it allows you to modify the unsubscription values before the packet is processed. The return values of the hook methods are passed-through in the order the hooks were attached.
func (*Hooks) OnUnsubscribed ¶
OnUnsubscribed is called when a client unsubscribes from one or more filters.
func (*Hooks) OnWill ¶
OnWill is called when a client disconnects and publishes an LWT message. This method differs from OnWillSent in that it allows you to modify the LWT message before it is published. The return values of the hook methods are passed-through in the order the hooks were attached.
func (*Hooks) OnWillSent ¶
OnWillSent is called when an LWT message has been issued from a disconnecting client.
func (*Hooks) Provides ¶
Provides returns true if any one hook provides any of the requested hook methods.
func (*Hooks) StoredClientByCid ¶
StoredClientByCid returns a client, e.g. from a persistent store.
func (*Hooks) StoredClients ¶
StoredClients returns all clients, e.g. from a persistent store, and is used to populate the server clients list before start.
func (*Hooks) StoredInflightMessages ¶
StoredInflightMessages returns all inflight messages, e.g. from a persistent store, and is used to populate the restored clients with inflight messages before start.
func (*Hooks) StoredInflightMessagesByCid ¶
StoredInflightMessagesByCid returns all inflight messages, e.g. from a persistent store.
func (*Hooks) StoredRetainedMessageByTopic ¶
StoredRetainedMessageByTopic returns a retained message, e.g. from a persistent store.
func (*Hooks) StoredRetainedMessages ¶
StoredRetainedMessages returns all retained messages, e.g. from a persistent store, and is used to populate the server topics with retained messages before start.
func (*Hooks) StoredSubscriptions ¶
func (h *Hooks) StoredSubscriptions() (v []storage.Subscription, err error)
StoredSubscriptions returns all subscriptions, e.g. from a persistent store, and is used to populate the server subscriptions list before start.
func (*Hooks) StoredSubscriptionsByCid ¶
func (h *Hooks) StoredSubscriptionsByCid(cid string) (v []storage.Subscription, err error)
StoredSubscriptionsByCid returns all subscriptions, e.g. from a persistent store.
func (*Hooks) StoredSysInfo ¶
func (h *Hooks) StoredSysInfo() (v storage.SystemInfo, err error)
StoredSysInfo returns a set of system info values.
type InboundTopicAliases ¶
InboundTopicAliases contains a map of topic aliases received from the client.
func NewInboundTopicAliases ¶
func NewInboundTopicAliases(topicAliasMaximum uint16) *InboundTopicAliases
NewInboundTopicAliases returns a pointer to InboundTopicAliases.
type Inflight ¶
Inflight is a map of InflightMessage keyed on packet id.
func NewInflights ¶
func NewInflights() *Inflight
NewInflights returns a new instance of an Inflight packets map.
func (*Inflight) Clone ¶
Clone returns a new instance of Inflight with the same message data. This is used when transferring inflights from a taken-over session.
func (*Inflight) DecreaseReceiveQuota ¶
func (i *Inflight) DecreaseReceiveQuota()
DecreaseReceiveQuota reduces the receive quota by 1.
func (*Inflight) DecreaseSendQuota ¶
func (i *Inflight) DecreaseSendQuota()
DecreaseSendQuota reduces the send quota by 1.
func (*Inflight) Delete ¶
Delete removes an in-flight message from the map. Returns true if the message existed.
func (*Inflight) IncreaseReceiveQuota ¶
func (i *Inflight) IncreaseReceiveQuota()
IncreaseReceiveQuota increases the receive quota by 1.
func (*Inflight) IncreaseSendQuota ¶
func (i *Inflight) IncreaseSendQuota()
IncreaseSendQuota increases the send quota by 1.
func (*Inflight) NextImmediate ¶
NextImmediate returns the next inflight packet which is indicated to be sent immediately. This typically occurs when the quota has been exhausted, and we need to wait until new quota is free to continue sending.
func (*Inflight) ResetReceiveQuota ¶
ResetReceiveQuota resets the receive quota to the maximum allowed value.
func (*Inflight) ResetSendQuota ¶
ResetSendQuota resets the send quota to the maximum allowed value.
type InlineSubFn ¶ added in v2.5.0
type InlineSubFn func(cl *Client, sub packets.Subscription, pk packets.Packet)
InlineSubFn is the signature for a callback function which will be called when an inline client receives a message on a topic it is subscribed to. The sub argument contains information about the subscription that was matched for any filters.
type InlineSubscription ¶ added in v2.5.0
type InlineSubscription struct { packets.Subscription Handler InlineSubFn }
type InlineSubscriptions ¶ added in v2.5.0
InlineSubscriptions represents a map of internal subscriptions keyed on client.
func NewInlineSubscriptions ¶ added in v2.5.0
func NewInlineSubscriptions() *InlineSubscriptions
NewInlineSubscriptions returns a new instance of InlineSubscriptions.
func (*InlineSubscriptions) Add ¶ added in v2.5.0
func (s *InlineSubscriptions) Add(val InlineSubscription)
Add adds a new internal subscription for a client id.
func (*InlineSubscriptions) Delete ¶ added in v2.5.0
func (s *InlineSubscriptions) Delete(id int)
Delete removes an internal subscription by the client id.
func (*InlineSubscriptions) Get ¶ added in v2.5.0
func (s *InlineSubscriptions) Get(id int) (val InlineSubscription, ok bool)
Get returns an internal subscription for a client id.
func (*InlineSubscriptions) GetAll ¶ added in v2.5.0
func (s *InlineSubscriptions) GetAll() map[int]InlineSubscription
GetAll returns all internal subscriptions.
func (*InlineSubscriptions) Len ¶ added in v2.5.0
func (s *InlineSubscriptions) Len() int
Len returns the number of internal subscriptions.
type Options ¶
type Options struct { // Capabilities defines the server features and behaviour. If you only wish to modify // several of these values, set them explicitly - e.g. // server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024 Capabilities *Capabilities // ClientNetWriteBufferSize specifies the size of the client *bufio.Writer write buffer. ClientNetWriteBufferSize int `yaml:"client-write-buffer-size"` // ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer. ClientNetReadBufferSize int `yaml:"client-read-buffer-size"` // Logger specifies a custom configured implementation of slog.Logger to override // the server's default logger configuration. If you wish to change the log level, // of the default logger, you can do so by setting // server := mqtt.New(nil) // level := new(slog.LevelVar) // server.Log = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ // Level: level, // })) // level.Set(slog.LevelDebug) Logger *slog.Logger // SysTopicResendInterval specifies the interval between $SYS topic updates in seconds. SysTopicResendInterval int64 `yaml:"sys-topic-resend-interval"` // Enable Inline client to allow direct subscribing and publishing from the parent codebase, // with negligible performance difference (disabled by default to prevent confusion in statistics). InlineClient bool `yaml:"inline-client"` }
Options contains configurable options for the server.
type OutboundTopicAliases ¶
OutboundTopicAliases contains a map of topic aliases sent from the broker to the client.
func NewOutboundTopicAliases ¶
func NewOutboundTopicAliases(topicAliasMaximum uint16) *OutboundTopicAliases
NewOutboundTopicAliases returns a pointer to OutboundTopicAliases.
type ReadFn ¶
ReadFn is the function signature for the function used for reading and processing new packets.
type Server ¶
type Server struct { Options *Options // configurable server options Listeners *listeners.Listeners // listeners are network interfaces which listen for new connections Clients *Clients // clients known to the broker Topics *TopicsIndex // an index of topic filter subscriptions and retained messages Info *system.Info // values about the server commonly known as $SYS topics Log *slog.Logger // minimal no-alloc logger // contains filtered or unexported fields }
Server is an MQTT broker server. It should be created with mqtt.New() in order to ensure all the internal fields are correctly populated.
func New ¶
New returns a new instance of comqtt broker. Optional parameters can be specified to override some default settings (see Options).
func (*Server) AddHook ¶
AddHook attaches a new Hook to the server. Ideally, this should be called before the server is started with s.Serve().
func (*Server) AddListener ¶
AddListener adds a new network listener to the server, for receiving incoming client connections.
func (*Server) Close ¶
Close attempts to gracefully shut down the server, all listeners, clients, and stores.
func (*Server) DisconnectClient ¶
DisconnectClient sends a Disconnect packet to a client and then closes the client connection.
func (*Server) EstablishConnection ¶
EstablishConnection establishes a new client when a listener accepts a new connection.
func (*Server) InjectPacket ¶
InjectPacket injects a packet into the broker as if it were sent from the specified client. InlineClients using this method can publish packets to any topic (including $SYS) and bypass ACL checks.
func (*Server) NewClient ¶
NewClient returns a new Client instance, populated with all the required values and references to be used with the server. If you are using this client to directly publish messages from the embedding application, set the inline flag to true to bypass ACL and topic validation checks.
func (*Server) Publish ¶
Publish publishes a publish packet into the broker as if it were sent from the specified client. This is a convenience function which wraps InjectPacket. As such, this method can publish packets to any topic (including $SYS) and bypass ACL checks. The qos byte is used for limiting the outbound qos (mqtt v5) rather than issuing to the broker (we assume qos 2 complete).
func (*Server) PublishToSubscribers ¶
PublishToSubscribers publishes a publish packet to all subscribers with matching topic filters. The local argument is true when the call originates from the current process, and false when the packet is being forwarded from an external source.
func (*Server) SendConnack ¶ added in v2.3.7
func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, properties *packets.Properties) error
SendConnack returns a Connack packet to a client.
func (*Server) Serve ¶
Serve starts the event loops responsible for establishing client connections on all attached listeners, publishing the system topics, and starting all hooks.
func (*Server) Subscribe ¶ added in v2.5.0
func (s *Server) Subscribe(filter string, subscriptionId int, handler InlineSubFn) error
Subscribe adds an inline subscription for the specified topic filter and subscription identifier with the provided handler function.
func (*Server) Unsubscribe ¶ added in v2.5.0
Unsubscribe removes an inline subscription for the specified subscription and topic filter. It allows you to unsubscribe a specific subscription from the internal subscription associated with the given topic filter.
func (*Server) UnsubscribeClient ¶
UnsubscribeClient unsubscribes a client from all of their subscriptions.
type SharedSubscriptions ¶
type SharedSubscriptions struct { // contains filtered or unexported fields }
SharedSubscriptions contains a map of subscriptions to a shared filter, keyed on share group then client id.
func NewSharedSubscriptions ¶
func NewSharedSubscriptions() *SharedSubscriptions
NewSharedSubscriptions returns a new instance of SharedSubscriptions.
func (*SharedSubscriptions) Add ¶
func (s *SharedSubscriptions) Add(group, id string, val packets.Subscription)
Add creates a new shared subscription for a group and client id pair.
func (*SharedSubscriptions) Delete ¶
func (s *SharedSubscriptions) Delete(group, id string)
Delete deletes a client id from a shared subscription group.
func (*SharedSubscriptions) Get ¶
func (s *SharedSubscriptions) Get(group, id string) (val packets.Subscription, ok bool)
Get returns the subscription properties for a client id in a share group, if one exists.
func (*SharedSubscriptions) GetAll ¶
func (s *SharedSubscriptions) GetAll() map[string]map[string]packets.Subscription
GetAll returns all shared subscription groups and their subscriptions.
func (*SharedSubscriptions) GroupLen ¶
func (s *SharedSubscriptions) GroupLen() int
GroupLen returns the number of groups subscribed to the filter.
func (*SharedSubscriptions) Len ¶
func (s *SharedSubscriptions) Len() int
Len returns the total number of shared subscriptions to a filter across all groups.
func (*SharedSubscriptions) SubsInGroupLen ¶ added in v2.5.1
func (s *SharedSubscriptions) SubsInGroupLen(group string) int
SubsInGroupLen returns the number of subscriptions in a shared subscription group.
type Subscribers ¶
type Subscribers struct { Subscriptions map[string]packets.Subscription InlineSubscriptions map[int]InlineSubscription }
Subscribers contains the shared and non-shared subscribers matching a topic.
func (*Subscribers) MergeSharedSelected ¶
func (s *Subscribers) MergeSharedSelected()
MergeSharedSelected merges the selected subscribers for a shared subscription group and the non-shared subscribers, to ensure that no subscriber gets multiple messages due to having both types of subscription matching the same filter.
func (*Subscribers) SelectShared ¶
func (s *Subscribers) SelectShared()
SelectShared returns one subscriber for each shared subscription group.
type Subscriptions ¶
Subscriptions is a map of subscriptions keyed on client.
func NewSubscriptions ¶
func NewSubscriptions() *Subscriptions
NewSubscriptions returns a new instance of Subscriptions.
func (*Subscriptions) Add ¶
func (s *Subscriptions) Add(id string, val packets.Subscription)
Add adds a new subscription for a client. ID can be a filter in the case this map is client state, or a client id if particle state.
func (*Subscriptions) Delete ¶
func (s *Subscriptions) Delete(id string)
Delete removes a subscription by client or filter id.
func (*Subscriptions) Get ¶
func (s *Subscriptions) Get(id string) (val packets.Subscription, ok bool)
Get returns a subscription for a specific client or filter id.
func (*Subscriptions) GetAll ¶
func (s *Subscriptions) GetAll() map[string]packets.Subscription
GetAll returns all subscriptions.
func (*Subscriptions) Len ¶
func (s *Subscriptions) Len() int
Len returns the number of subscriptions.
type TopicAliases ¶
type TopicAliases struct { Inbound *InboundTopicAliases Outbound *OutboundTopicAliases }
TopicAliases contains inbound and outbound topic alias registrations.
func NewTopicAliases ¶
func NewTopicAliases(topicAliasMaximum uint16) TopicAliases
NewTopicAliases returns an instance of TopicAliases.
type TopicsIndex ¶
TopicsIndex is a prefix/trie tree containing topic subscribers and retained messages.
func NewTopicsIndex ¶
func NewTopicsIndex() *TopicsIndex
NewTopicsIndex returns a pointer to a new instance of TopicsIndex.
func (*TopicsIndex) InlineSubscribe ¶ added in v2.5.0
func (x *TopicsIndex) InlineSubscribe(subscription InlineSubscription) (bool, int)
InlineSubscribe adds a new internal subscription for a topic filter, returning true if the subscription was new.
func (*TopicsIndex) InlineUnsubscribe ¶ added in v2.5.0
func (x *TopicsIndex) InlineUnsubscribe(id int, filter string) (bool, int)
InlineUnsubscribe removes an internal subscription for a topic filter associated with a specific client, returning true if the subscription existed.
func (*TopicsIndex) Messages ¶
func (x *TopicsIndex) Messages(filter string) []packets.Packet
Messages returns a slice of any retained messages which match a filter.
func (*TopicsIndex) RetainMessage ¶
func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64
RetainMessage saves a message payload to the end of a topic address. Returns 1 if a retained message was added, and -1 if the retained message was removed. 0 is returned if sequential empty payloads are received.
func (*TopicsIndex) Subscribe ¶
func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) (bool, int)
Subscribe adds a new subscription for a client to a topic filter, returning true if the subscription was new.
func (*TopicsIndex) Subscribers ¶
func (x *TopicsIndex) Subscribers(topic string) *Subscribers
Subscribers returns a map of clients who are subscribed to matching filters, their subscription ids and highest qos.
func (*TopicsIndex) Unsubscribe ¶
func (x *TopicsIndex) Unsubscribe(filter, client string) (bool, int)
Unsubscribe removes a subscription filter for a client, returning true if the subscription existed.
Directories ¶
| Path | Synopsis |
|---|---|
| examples | |
| hooks | |
| storage/bolt | Package bolt is provided for historical compatibility and may not be actively updated, you should use the badger hook instead. |