Versions in this module Expand all Collapse all v0 v0.0.1 Sep 9, 2024 Changes in this version + const InlineClientId + const LocalListener + const OnACLCheck + const OnAuthPacket + const OnClientExpired + const OnConnect + const OnConnectAuthenticate + const OnDisconnect + const OnPacketEncode + const OnPacketIDExhausted + const OnPacketProcessed + const OnPacketRead + const OnPacketSent + const OnPublish + const OnPublishDropped + const OnPublished + const OnQosComplete + const OnQosDropped + const OnQosPublish + const OnRetainMessage + const OnRetainPublished + const OnRetainedExpired + const OnSelectSubscribers + const OnSessionEstablish + const OnSessionEstablished + const OnStarted + const OnStopped + const OnSubscribe + const OnSubscribed + const OnSysInfoTick + const OnUnsubscribe + const OnUnsubscribed + const OnWill + const OnWillSent + const SetOptions + const StoredClients + const StoredInflightMessages + const StoredRetainedMessages + const StoredSubscriptions + const StoredSysInfo + const Version + var DefaultServerCapabilities = NewDefaultServerCapabilities() + var ErrConnectionClosed = errors.New("connection not open") + var ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") + var ErrInvalidConfigType = errors.New("invalid config type provided") + var ErrListenerIDExists = errors.New("listener id already exists") + var ErrMinimumKeepalive = errors.New(...) + var ErrOptionsUnreadable = errors.New("unable to read options from bytes") + var SharePrefix = "$SHARE" + var SysPrefix = "$SYS" + func Int64toa(v int64) string + func IsSharedFilter(filter string) bool + func IsValidFilter(filter string, forPublish bool) bool + type Capabilities struct + Compatibilities Compatibilities + MaximumClientWritesPending int32 + MaximumClients int64 + MaximumInflight uint16 + MaximumMessageExpiryInterval int64 + MaximumPacketSize uint32 + MaximumQos byte + MaximumSessionExpiryInterval uint32 + MinimumProtocolVersion byte + ReceiveMaximum uint16 + RetainAvailable byte + SharedSubAvailable byte + SubIDAvailable byte + TopicAliasMaximum uint16 + WildcardSubAvailable byte + func NewDefaultServerCapabilities() *Capabilities + type Client struct + ID string + Net ClientConnection + Properties ClientProperties + State ClientState + func (cl *Client) ClearExpiredInflights(now, maximumExpiry int64) []uint16 + func (cl *Client) ClearInflights() + func (cl *Client) Closed() bool + func (cl *Client) NextPacketID() (i uint32, err error) + func (cl *Client) ParseConnect(lid string, pk packets.Packet) + func (cl *Client) Read(packetHandler ReadFn) error + func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error + func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err error) + func (cl *Client) ResendInflightMessages(force bool) error + func (cl *Client) Stop(err error) + func (cl *Client) StopCause() error + func (cl *Client) StopTime() int64 + func (cl *Client) WriteLoop() + func (cl *Client) WritePacket(pk packets.Packet) error + type ClientConnection struct + Conn net.Conn + Inline bool + Listener string + Remote string + type ClientProperties struct + Clean bool + Props packets.Properties + ProtocolVersion byte + Username []byte + Will Will + type ClientState struct + Inflight *Inflight + Keepalive uint16 + ServerKeepalive bool + Subscriptions *Subscriptions + TopicAliases TopicAliases + type ClientSubscriptions map[string]packets.Subscription + type Clients struct + func NewClients() *Clients + func (cl *Clients) Add(val *Client) + func (cl *Clients) Delete(id string) + func (cl *Clients) Get(id string) (*Client, bool) + func (cl *Clients) GetAll() map[string]*Client + func (cl *Clients) GetByListener(id string) []*Client + func (cl *Clients) Len() int + type Compatibilities struct + AlwaysReturnResponseInfo bool + NoInheritedPropertiesOnAck bool + ObscureNotAuthorized bool + PassiveClientDisconnect bool + RestoreSysInfoOnRestart bool + type Hook interface + ID func() string + Init func(config any) error + OnACLCheck func(cl *Client, topic string, write bool) bool + OnAuthPacket func(cl *Client, pk packets.Packet) (packets.Packet, error) + OnClientExpired func(cl *Client) + OnConnect func(cl *Client, pk packets.Packet) error + OnConnectAuthenticate func(cl *Client, pk packets.Packet) bool + OnDisconnect func(cl *Client, err error, expire bool) + OnPacketEncode func(cl *Client, pk packets.Packet) packets.Packet + OnPacketIDExhausted func(cl *Client, pk packets.Packet) + OnPacketProcessed func(cl *Client, pk packets.Packet, err error) + OnPacketRead func(cl *Client, pk packets.Packet) (packets.Packet, error) + OnPacketSent func(cl *Client, pk packets.Packet, b []byte) + OnPublish func(cl *Client, pk packets.Packet) (packets.Packet, error) + OnPublishDropped func(cl *Client, pk packets.Packet) + OnPublished func(cl *Client, pk packets.Packet) + OnQosComplete func(cl *Client, pk packets.Packet) + OnQosDropped func(cl *Client, pk packets.Packet) + OnQosPublish func(cl *Client, pk packets.Packet, sent int64, resends int) + OnRetainMessage func(cl *Client, pk packets.Packet, r int64) + OnRetainPublished func(cl *Client, pk packets.Packet) + OnRetainedExpired func(filter string) + OnSelectSubscribers func(subs *Subscribers, pk packets.Packet) *Subscribers + OnSessionEstablish func(cl *Client, pk packets.Packet) + OnSessionEstablished func(cl *Client, pk packets.Packet) + OnStarted func() + OnStopped func() + OnSubscribe func(cl *Client, pk packets.Packet) packets.Packet + OnSubscribed func(cl *Client, pk packets.Packet, reasonCodes []byte) + OnSysInfoTick func(*system.Info) + OnUnsubscribe func(cl *Client, pk packets.Packet) packets.Packet + OnUnsubscribed func(cl *Client, pk packets.Packet) + OnWill func(cl *Client, will Will) (Will, error) + OnWillSent func(cl *Client, pk packets.Packet) + Provides func(b byte) bool + SetOpts func(l *slog.Logger, o *HookOptions) + Stop func() error + StoredClients func() ([]storage.Client, error) + StoredInflightMessages func() ([]storage.Message, error) + StoredRetainedMessages func() ([]storage.Message, error) + StoredSubscriptions func() ([]storage.Subscription, error) + StoredSysInfo func() (storage.SystemInfo, error) + type HookBase struct + Log *slog.Logger + Opts *HookOptions + func (h *HookBase) ID() string + func (h *HookBase) Init(config any) error + func (h *HookBase) OnACLCheck(cl *Client, topic string, write bool) bool + func (h *HookBase) OnAuthPacket(cl *Client, pk packets.Packet) (packets.Packet, error) + func (h *HookBase) OnClientExpired(cl *Client) + func (h *HookBase) OnConnect(cl *Client, pk packets.Packet) error + func (h *HookBase) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool + func (h *HookBase) OnDisconnect(cl *Client, err error, expire bool) + func (h *HookBase) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet + func (h *HookBase) OnPacketIDExhausted(cl *Client, pk packets.Packet) + func (h *HookBase) OnPacketProcessed(cl *Client, pk packets.Packet, err error) + func (h *HookBase) OnPacketRead(cl *Client, pk packets.Packet) (packets.Packet, error) + func (h *HookBase) OnPacketSent(cl *Client, pk packets.Packet, b []byte) + func (h *HookBase) OnPublish(cl *Client, pk packets.Packet) (packets.Packet, error) + func (h *HookBase) OnPublishDropped(cl *Client, pk packets.Packet) + func (h *HookBase) OnPublished(cl *Client, pk packets.Packet) + func (h *HookBase) OnQosComplete(cl *Client, pk packets.Packet) + func (h *HookBase) OnQosDropped(cl *Client, pk packets.Packet) + func (h *HookBase) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int) + func (h *HookBase) OnRetainMessage(cl *Client, pk packets.Packet, r int64) + func (h *HookBase) OnRetainPublished(cl *Client, pk packets.Packet) + func (h *HookBase) OnRetainedExpired(topic string) + func (h *HookBase) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers + func (h *HookBase) OnSessionEstablish(cl *Client, pk packets.Packet) + func (h *HookBase) OnSessionEstablished(cl *Client, pk packets.Packet) + func (h *HookBase) OnStarted() + func (h *HookBase) OnStopped() + func (h *HookBase) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet + func (h *HookBase) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte) + func (h *HookBase) OnSysInfoTick(*system.Info) + func (h *HookBase) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet + func (h *HookBase) OnUnsubscribed(cl *Client, pk packets.Packet) + func (h *HookBase) OnWill(cl *Client, will Will) (Will, error) + func (h *HookBase) OnWillSent(cl *Client, pk packets.Packet) + func (h *HookBase) Provides(b byte) bool + func (h *HookBase) SetOpts(l *slog.Logger, opts *HookOptions) + func (h *HookBase) Stop() error + func (h *HookBase) StoredClients() (v []storage.Client, err error) + func (h *HookBase) StoredInflightMessages() (v []storage.Message, err error) + func (h *HookBase) StoredRetainedMessages() (v []storage.Message, err error) + func (h *HookBase) StoredSubscriptions() (v []storage.Subscription, err error) + func (h *HookBase) StoredSysInfo() (v storage.SystemInfo, err error) + type HookLoadConfig struct + Config any + Hook Hook + type HookOptions struct + Capabilities *Capabilities + type Hooks struct + Log *slog.Logger + func (h *Hooks) Add(hook Hook, config any) error + func (h *Hooks) GetAll() []Hook + func (h *Hooks) Len() int64 + func (h *Hooks) OnACLCheck(cl *Client, topic string, write bool) bool + func (h *Hooks) OnAuthPacket(cl *Client, pk packets.Packet) (pkx packets.Packet, err error) + func (h *Hooks) OnClientExpired(cl *Client) + func (h *Hooks) OnConnect(cl *Client, pk packets.Packet) error + func (h *Hooks) OnConnectAuthenticate(cl *Client, pk packets.Packet) bool + func (h *Hooks) OnDisconnect(cl *Client, err error, expire bool) + func (h *Hooks) OnPacketEncode(cl *Client, pk packets.Packet) packets.Packet + func (h *Hooks) OnPacketIDExhausted(cl *Client, pk packets.Packet) + func (h *Hooks) OnPacketProcessed(cl *Client, pk packets.Packet, err error) + func (h *Hooks) OnPacketRead(cl *Client, pk packets.Packet) (pkx packets.Packet, err error) + func (h *Hooks) OnPacketSent(cl *Client, pk packets.Packet, b []byte) + func (h *Hooks) OnPublish(cl *Client, pk packets.Packet) (pkx packets.Packet, err error) + func (h *Hooks) OnPublishDropped(cl *Client, pk packets.Packet) + func (h *Hooks) OnPublished(cl *Client, pk packets.Packet) + func (h *Hooks) OnQosComplete(cl *Client, pk packets.Packet) + func (h *Hooks) OnQosDropped(cl *Client, pk packets.Packet) + func (h *Hooks) OnQosPublish(cl *Client, pk packets.Packet, sent int64, resends int) + func (h *Hooks) OnRetainMessage(cl *Client, pk packets.Packet, r int64) + func (h *Hooks) OnRetainPublished(cl *Client, pk packets.Packet) + func (h *Hooks) OnRetainedExpired(filter string) + func (h *Hooks) OnSelectSubscribers(subs *Subscribers, pk packets.Packet) *Subscribers + func (h *Hooks) OnSessionEstablish(cl *Client, pk packets.Packet) + func (h *Hooks) OnSessionEstablished(cl *Client, pk packets.Packet) + func (h *Hooks) OnStarted() + func (h *Hooks) OnStopped() + func (h *Hooks) OnSubscribe(cl *Client, pk packets.Packet) packets.Packet + func (h *Hooks) OnSubscribed(cl *Client, pk packets.Packet, reasonCodes []byte) + func (h *Hooks) OnSysInfoTick(sys *system.Info) + func (h *Hooks) OnUnsubscribe(cl *Client, pk packets.Packet) packets.Packet + func (h *Hooks) OnUnsubscribed(cl *Client, pk packets.Packet) + func (h *Hooks) OnWill(cl *Client, will Will) Will + func (h *Hooks) OnWillSent(cl *Client, pk packets.Packet) + func (h *Hooks) Provides(b ...byte) bool + func (h *Hooks) Stop() + func (h *Hooks) StoredClients() (v []storage.Client, err error) + func (h *Hooks) StoredInflightMessages() (v []storage.Message, err error) + func (h *Hooks) StoredRetainedMessages() (v []storage.Message, err error) + func (h *Hooks) StoredSubscriptions() (v []storage.Subscription, err error) + func (h *Hooks) StoredSysInfo() (v storage.SystemInfo, err error) + type InboundTopicAliases struct + func NewInboundTopicAliases(topicAliasMaximum uint16) *InboundTopicAliases + func (a *InboundTopicAliases) Set(id uint16, topic string) string + type Inflight struct + func NewInflights() *Inflight + func (i *Inflight) Clone() *Inflight + func (i *Inflight) DecreaseReceiveQuota() + func (i *Inflight) DecreaseSendQuota() + func (i *Inflight) Delete(id uint16) bool + func (i *Inflight) Get(id uint16) (packets.Packet, bool) + func (i *Inflight) GetAll(immediate bool) []packets.Packet + func (i *Inflight) IncreaseReceiveQuota() + func (i *Inflight) IncreaseSendQuota() + func (i *Inflight) Len() int + func (i *Inflight) NextImmediate() (packets.Packet, bool) + func (i *Inflight) ResetReceiveQuota(n int32) + func (i *Inflight) ResetSendQuota(n int32) + func (i *Inflight) Set(m packets.Packet) bool + type InlineSubFn func(cl *Client, sub packets.Subscription, pk packets.Packet) + type InlineSubscription struct + Handler InlineSubFn + type InlineSubscriptions struct + func NewInlineSubscriptions() *InlineSubscriptions + func (s *InlineSubscriptions) Add(val InlineSubscription) + func (s *InlineSubscriptions) Delete(id int) + func (s *InlineSubscriptions) Get(id int) (val InlineSubscription, ok bool) + func (s *InlineSubscriptions) GetAll() map[int]InlineSubscription + func (s *InlineSubscriptions) Len() int + type Options struct + Capabilities *Capabilities + ClientNetReadBufferSize int + ClientNetWriteBufferSize int + Hooks []HookLoadConfig + InlineClient bool + Listeners []listeners.Config + Logger *slog.Logger + SysTopicResendInterval int64 + type OutboundTopicAliases struct + func NewOutboundTopicAliases(topicAliasMaximum uint16) *OutboundTopicAliases + func (a *OutboundTopicAliases) Set(topic string) (uint16, bool) + type ReadFn func(*Client, packets.Packet) error + type Server struct + Clients *Clients + Info *system.Info + Listeners *listeners.Listeners + Log *slog.Logger + Options *Options + Topics *TopicsIndex + func New(opts *Options) *Server + func (s *Server) AddHook(hook Hook, config any) error + func (s *Server) AddHooksFromConfig(hooks []HookLoadConfig) error + func (s *Server) AddListener(l listeners.Listener) error + func (s *Server) AddListenersFromConfig(configs []listeners.Config) error + func (s *Server) Close() error + func (s *Server) DisconnectClient(cl *Client, code packets.Code) error + func (s *Server) EstablishConnection(listener string, c net.Conn) error + func (s *Server) InjectPacket(cl *Client, pk packets.Packet) error + func (s *Server) NewClient(c net.Conn, listener string, id string, inline bool) *Client + func (s *Server) Publish(topic string, payload []byte, retain bool, qos byte) error + func (s *Server) SendConnack(cl *Client, reason packets.Code, present bool, properties *packets.Properties) error + func (s *Server) Serve() error + func (s *Server) Subscribe(filter string, subscriptionId int, handler InlineSubFn) error + func (s *Server) Unsubscribe(filter string, subscriptionId int) error + func (s *Server) UnsubscribeClient(cl *Client) + type SharedSubscriptions struct + func NewSharedSubscriptions() *SharedSubscriptions + func (s *SharedSubscriptions) Add(group, id string, val packets.Subscription) + func (s *SharedSubscriptions) Delete(group, id string) + func (s *SharedSubscriptions) Get(group, id string) (val packets.Subscription, ok bool) + func (s *SharedSubscriptions) GetAll() map[string]map[string]packets.Subscription + func (s *SharedSubscriptions) GroupLen() int + func (s *SharedSubscriptions) Len() int + type Subscribers struct + InlineSubscriptions map[int]InlineSubscription + Shared map[string]map[string]packets.Subscription + SharedSelected map[string]packets.Subscription + Subscriptions map[string]packets.Subscription + func (s *Subscribers) MergeSharedSelected() + func (s *Subscribers) SelectShared() + type Subscriptions struct + func NewSubscriptions() *Subscriptions + func (s *Subscriptions) Add(id string, val packets.Subscription) + func (s *Subscriptions) Delete(id string) + func (s *Subscriptions) Get(id string) (val packets.Subscription, ok bool) + func (s *Subscriptions) GetAll() map[string]packets.Subscription + func (s *Subscriptions) Len() int + type TopicAliases struct + Inbound *InboundTopicAliases + Outbound *OutboundTopicAliases + func NewTopicAliases(topicAliasMaximum uint16) TopicAliases + type TopicsIndex struct + Retained *packets.Packets + func NewTopicsIndex() *TopicsIndex + func (x *TopicsIndex) InlineSubscribe(subscription InlineSubscription) bool + func (x *TopicsIndex) InlineUnsubscribe(id int, filter string) bool + func (x *TopicsIndex) Messages(filter string) []packets.Packet + func (x *TopicsIndex) RetainMessage(pk packets.Packet) int64 + func (x *TopicsIndex) Subscribe(client string, subscription packets.Subscription) bool + func (x *TopicsIndex) Subscribers(topic string) *Subscribers + func (x *TopicsIndex) Unsubscribe(filter, client string) bool + type Will struct + Flag uint32 + Payload []byte + Qos byte + Retain bool + TopicName string + User []packets.UserProperty + WillDelayInterval uint32