Documentation ¶
Overview ¶
Package server provides an MQTT 3.1.1 compliant MQTT server.
Index ¶
- Constants
- Variables
- type Auth
- type AuthAllow
- type AuthDisallow
- type Client
- func (c *Client) ForgetSubscription(filter string)
- func (c *Client) Identify(listenerID string, pk packets.Packet, ac Auth)
- func (c *Client) LocalAddr() net.Addr
- func (c *Client) NextPacketID() uint32
- func (c *Client) NoteSubscription(filter string, qos byte)
- func (c *Client) Read(h func(*Client, packets.Packet) error) error
- func (c *Client) ReadFixedHeader(fh *packets.FixedHeader) error
- func (c *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err error)
- func (c *Client) RemoteAddr() net.Addr
- func (c *Client) Start()
- func (c *Client) Stop()
- func (c *Client) WritePacket(pk packets.Packet) (n int, err error)
- type Clients
- type CloseFunc
- type EstablishFunc
- type Hook
- type Inflight
- type InflightMessage
- type LWT
- type Listener
- type Listeners
- func (l *Listeners) Add(val Listener)
- func (l *Listeners) Close(id string, closer CloseFunc)
- func (l *Listeners) CloseAll(closer CloseFunc)
- func (l *Listeners) Delete(id string)
- func (l *Listeners) Get(id string) (Listener, bool)
- func (l *Listeners) Len() int
- func (l *Listeners) Serve(id string, establisher EstablishFunc)
- func (l *Listeners) ServeAll(establisher EstablishFunc)
- type NilHook
- func (*NilHook) Connect(*Server, *Client) bool
- func (*NilHook) DisConnect(*Server, *Client, error)
- func (*NilHook) Emit(*Server, *Client, *packets.Packet) bool
- func (*NilHook) Push(*Server, *Client, *packets.Packet) bool
- func (*NilHook) Recv(*Server, *Client, *packets.Packet) bool
- func (*NilHook) Send(*Server, *Client, *packets.Packet) bool
- type Server
- func (s *Server) AddListener(listener Listener) error
- func (s *Server) Close() error
- func (s *Server) EstablishConnection(listenerID string, c net.Conn, auth Auth) error
- func (s *Server) Publish(topic string, payload []byte, qos byte, retain bool) error
- func (s *Server) PublishToClient(c *Client, out packets.Packet)
- func (s *Server) PublishToClientByID(id string, topic string, payload []byte, qos byte, retain bool) error
- func (s *Server) PublishToSubscribers(pk packets.Packet)
- func (s *Server) ResendClientInflight(c *Client, force bool) error
- func (s *Server) Serve() error
- func (s *Server) SetHook(h Hook)
- func (s *Server) SetStore(p persistence.Store) error
- type State
Constants ¶
const (
Version = "1.0.0" // the server version.
)
Variables ¶
var (
	ErrListenerIDExists     = errors.New("Listener id already exists")
	ErrReadConnectInvalid   = errors.New("Connect packet was not valid")
	ErrConnectNotAuthorized = errors.New("Connect packet was not authorized")
	ErrInvalidTopic         = errors.New("Cannot publish to $ and $SYS topics")

	// SysTopicInterval is the number of milliseconds between $SYS topic publishes.
	SysTopicInterval time.Duration = 30000
)
var (
ErrConnectionClosed = errors.New("Connection not open")
)
Functions ¶
This section is empty.
Types ¶
type Auth ¶
type Auth interface {
	// Auth authenticates a user on CONNECT and returns true if a user is
	// allowed to join the server.
	Auth(client *Client) bool

	// ACL returns true if a user has read or write access to a given topic.
	ACL(client *Client, topic string, write bool) bool
}
Auth is an interface for authentication controllers.
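As an illustration of the two methods, here is a minimal sketch of a custom auth controller. It is self-contained, so `Client` is a local stand-in that reproduces only the fields used here, and `userAuth`, its `passwords` map, and the `users/<username>/` topic layout are hypothetical names chosen for the example rather than anything the package provides.

```go
package main

import (
	"fmt"
	"strings"
)

// Client is a local stand-in for the package's Client type; only the
// fields this sketch needs are reproduced.
type Client struct {
	ID       string
	Username []byte
	Password []byte
}

// Auth mirrors the interface documented above.
type Auth interface {
	Auth(client *Client) bool
	ACL(client *Client, topic string, write bool) bool
}

// userAuth is a hypothetical controller: it accepts known
// username/password pairs and lets each user write only to topics
// under "users/<username>/".
type userAuth struct {
	passwords map[string]string
}

// Auth returns true only when the username is known and the password matches.
func (a *userAuth) Auth(client *Client) bool {
	want, ok := a.passwords[string(client.Username)]
	return ok && want == string(client.Password)
}

// ACL allows reads on every topic and restricts writes to the user's own prefix.
func (a *userAuth) ACL(client *Client, topic string, write bool) bool {
	if !write {
		return true
	}
	return strings.HasPrefix(topic, "users/"+string(client.Username)+"/")
}

func main() {
	var ac Auth = &userAuth{passwords: map[string]string{"alice": "s3cret"}}
	alice := &Client{ID: "c1", Username: []byte("alice"), Password: []byte("s3cret")}

	fmt.Println(ac.Auth(alice))
	fmt.Println(ac.ACL(alice, "users/alice/temp", true))
	fmt.Println(ac.ACL(alice, "users/bob/temp", true))
}
```

A production controller would likely compare credentials in constant time and store hashes rather than plain passwords; the sketch leaves that out to keep the shape of the interface visible.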
type AuthAllow ¶
type AuthAllow struct{}
AuthAllow is an auth controller which allows access to all connections and topics.
type AuthDisallow ¶
type AuthDisallow struct{}
AuthDisallow is an auth controller which disallows access to all connections and topics.
func (*AuthDisallow) ACL ¶
func (d *AuthDisallow) ACL(client *Client, topic string, write bool) bool
ACL returns true if a user has access permissions to read or write on a topic. AuthDisallow always returns false.
func (*AuthDisallow) Auth ¶
func (d *AuthDisallow) Auth(client *Client) bool
Auth returns true if a username and password are acceptable. AuthDisallow always returns false.
type Client ¶
type Client struct {
	sync.RWMutex
	ID            string               // the client id.
	AC            Auth                 // an auth controller inherited from the listener.
	Subscriptions topics.Subscriptions // a map of the subscription filters a client maintains.
	Listener      string               // the id of the listener the client is connected to.
	Inflight      Inflight             // a map of in-flight qos messages.
	Username      []byte               // the username the client authenticated with.
	Password      []byte               // the password the client authenticated with.
	LWT           LWT                  // the last will and testament for the client.
	State         State                // the operational state of the client.
	// contains filtered or unexported fields
}
Client contains information about a client known by the broker.
func NewClientStub ¶
NewClientStub returns an instance of Client with basic initializations. This method is typically called by the persistence restoration system.
func (*Client) ForgetSubscription ¶
ForgetSubscription forgets a subscription note for the client.
func (*Client) NextPacketID ¶
NextPacketID returns the next packet id for a client, looping back to 0 if the maximum ID has been reached.
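The wrap-around behaviour described here can be sketched as follows. This is not the package's implementation: `idCounter` and `next` are hypothetical names, the sketch is not safe for concurrent use, and `maxPacketID = 65535` is an assumption based on MQTT packet identifiers being 16-bit values.

```go
package main

import "fmt"

// maxPacketID is an assumption: MQTT packet identifiers are 16-bit values.
const maxPacketID = 65535

// idCounter is a hypothetical stand-in for the client's packet id state.
type idCounter struct {
	current uint32
}

// next advances the counter and returns the new value, wrapping to 0
// once maxPacketID has been reached.
func (c *idCounter) next() uint32 {
	if c.current >= maxPacketID {
		c.current = 0
		return c.current
	}
	c.current++
	return c.current
}

func main() {
	// Start just below the maximum to show the wrap.
	c := &idCounter{current: maxPacketID - 1}
	fmt.Println(c.next(), c.next(), c.next()) // prints "65535 0 1"
}
```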
func (*Client) NoteSubscription ¶
NoteSubscription makes a note of a subscription for the client.
func (*Client) ReadFixedHeader ¶
func (c *Client) ReadFixedHeader(fh *packets.FixedHeader) error
ReadFixedHeader reads in the values of the next packet's fixed header.
func (*Client) ReadPacket ¶
ReadPacket reads the remaining buffer into an MQTT packet.
func (*Client) RemoteAddr ¶
RemoteAddr returns the remote network address.
func (*Client) Start ¶
func (c *Client) Start()
Start begins the client goroutines reading and writing packets.
type Clients ¶
Clients contains a map of the clients known by the broker.
func (*Clients) GetByListener ¶
GetByListener returns clients matching a listener id.
type CloseFunc ¶
type CloseFunc func(id string)
CloseFunc is a callback function for closing all listener clients.
type EstablishFunc ¶
EstablishFunc is a callback function for establishing new clients.
type Hook ¶
type Hook interface {
	// When the client connects to the server
	// If the return is false, the client will be rejected.
	Connect(*Server, *Client) bool

	// When the client disconnects
	DisConnect(*Server, *Client, error)

	// When the server receives a packet.
	// If the return is false, it will cancel the operation.
	Recv(*Server, *Client, *packets.Packet) bool

	// When the server sends a packet.
	// If the return is false, it will cancel the operation.
	Send(*Server, *Client, *packets.Packet) bool

	// When the server receives a message from the client publish.
	// If the return is false, it will cancel the operation.
	Emit(*Server, *Client, *packets.Packet) bool

	// When the server pushes a message to the client
	// If the return is false, it will cancel the operation.
	Push(*Server, *Client, *packets.Packet) bool
}
Hook is the server hook interface.
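One way to implement only part of this interface is to embed NilHook and override the methods of interest. The sketch below is self-contained, so `Server`, `Client`, and `Packet` are local stand-ins, and the stand-in `NilHook` is assumed to allow every operation (its actual return values are not stated in this documentation). `denyHook` and its `banned` map are hypothetical names.

```go
package main

import "fmt"

// Server, Client and Packet are local stand-ins for the package's types.
type Server struct{}
type Client struct{ ID string }
type Packet struct{ TopicName string }

// Hook mirrors the interface documented above, using the stand-in types.
type Hook interface {
	Connect(*Server, *Client) bool
	DisConnect(*Server, *Client, error)
	Recv(*Server, *Client, *Packet) bool
	Send(*Server, *Client, *Packet) bool
	Emit(*Server, *Client, *Packet) bool
	Push(*Server, *Client, *Packet) bool
}

// NilHook is a stand-in that is assumed to allow every operation.
type NilHook struct{}

func (*NilHook) Connect(*Server, *Client) bool       { return true }
func (*NilHook) DisConnect(*Server, *Client, error)  {}
func (*NilHook) Recv(*Server, *Client, *Packet) bool { return true }
func (*NilHook) Send(*Server, *Client, *Packet) bool { return true }
func (*NilHook) Emit(*Server, *Client, *Packet) bool { return true }
func (*NilHook) Push(*Server, *Client, *Packet) bool { return true }

// denyHook embeds NilHook and overrides only Connect, rejecting
// clients whose id appears in the banned set.
type denyHook struct {
	NilHook
	banned map[string]bool
}

func (h *denyHook) Connect(_ *Server, c *Client) bool {
	return !h.banned[c.ID]
}

func main() {
	var h Hook = &denyHook{banned: map[string]bool{"rogue": true}}

	fmt.Println(h.Connect(nil, &Client{ID: "rogue"}))
	fmt.Println(h.Connect(nil, &Client{ID: "sensor-1"}))
	// Emit is not overridden, so the embedded NilHook method answers.
	fmt.Println(h.Emit(nil, &Client{ID: "sensor-1"}, &Packet{TopicName: "a/b"}))
}
```

Because the NilHook methods have pointer receivers, the hook is passed around as a pointer (`&denyHook{...}`), which gives it the full promoted method set.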
type Inflight ¶
Inflight is a map of InflightMessage keyed on packet id.
func (*Inflight) Delete ¶
Delete removes an in-flight message from the map. Returns true if the message existed.
func (*Inflight) Get ¶
func (i *Inflight) Get(key uint16) (InflightMessage, bool)
Get returns the value of an in-flight message if it exists.
func (*Inflight) GetAll ¶
func (i *Inflight) GetAll() map[uint16]InflightMessage
GetAll returns all the in-flight messages.
type InflightMessage ¶
type InflightMessage struct {
	Packet  packets.Packet // the packet currently in-flight.
	Sent    int64          // the last time the message was sent (for retries) in unixtime.
	Resends int            // the number of times the message was attempted to be sent.
}
InflightMessage contains data about a packet which is currently in-flight.
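The map-keyed-on-packet-id idea behind Inflight can be sketched as a mutex-guarded map. This is an assumption about the general shape, not the package's code: the `Topic` field stands in for the `packets.Packet` field, and `newInflight`, `Set`, and the `Delete` signature are hypothetical since they are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// InflightMessage is a simplified stand-in; Topic replaces the
// packets.Packet field so the sketch stays self-contained.
type InflightMessage struct {
	Topic   string
	Sent    int64
	Resends int
}

// Inflight is a sketch of a concurrency-safe map keyed on packet id.
type Inflight struct {
	mu       sync.RWMutex
	internal map[uint16]InflightMessage
}

// newInflight is a hypothetical constructor that allocates the map.
func newInflight() *Inflight {
	return &Inflight{internal: make(map[uint16]InflightMessage)}
}

// Set stores or replaces the message for a packet id (hypothetical method).
func (i *Inflight) Set(key uint16, m InflightMessage) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.internal[key] = m
}

// Get returns the message for a packet id and whether it exists.
func (i *Inflight) Get(key uint16) (InflightMessage, bool) {
	i.mu.RLock()
	defer i.mu.RUnlock()
	m, ok := i.internal[key]
	return m, ok
}

// GetAll returns a copy of the map so callers can iterate without the lock.
func (i *Inflight) GetAll() map[uint16]InflightMessage {
	i.mu.RLock()
	defer i.mu.RUnlock()
	out := make(map[uint16]InflightMessage, len(i.internal))
	for k, v := range i.internal {
		out[k] = v
	}
	return out
}

// Delete removes a message and reports whether it existed.
func (i *Inflight) Delete(key uint16) bool {
	i.mu.Lock()
	defer i.mu.Unlock()
	_, ok := i.internal[key]
	delete(i.internal, key)
	return ok
}

func main() {
	in := newInflight()
	in.Set(7, InflightMessage{Topic: "a/b", Sent: 1700000000})

	m, ok := in.Get(7)
	fmt.Println(m.Topic, ok)
	fmt.Println(in.Delete(7), in.Delete(7))
	fmt.Println(len(in.GetAll()))
}
```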
type LWT ¶
type LWT struct {
	Topic   string // the topic the will message shall be sent to.
	Message []byte // the message that shall be sent when the client disconnects.
	Qos     byte   // the quality of service desired.
	Retain  bool   // indicates whether the will message should be retained
}
LWT contains the last will and testament details for a client connection.
type Listener ¶
type Listener interface {
	Listen(s *system.Info) error // open the network address.
	Serve(EstablishFunc) error   // start actively listening for new connections.
	ID() string                  // return the id of the listener.
	Auth() Auth
	Close(CloseFunc)             // stop and close the listener.
}
Listener is an interface for network listeners. A network listener listens for incoming client connections and adds them to the server.
type Listeners ¶
Listeners contains the network listeners for the broker.
func NewListeners ¶
NewListeners returns a new instance of Listeners.
func (*Listeners) Serve ¶
func (l *Listeners) Serve(id string, establisher EstablishFunc)
Serve starts a listener serving from the internal map.
func (*Listeners) ServeAll ¶
func (l *Listeners) ServeAll(establisher EstablishFunc)
ServeAll starts all listeners serving from the internal map.
type Server ¶
type Server struct {
	Listeners *Listeners        // listeners are network interfaces which listen for new connections.
	Clients   *Clients          // clients which are known to the broker.
	Topics    *topics.Index     // an index of topic filter subscriptions and retained messages.
	System    *system.Info      // values about the server commonly found in $SYS topics.
	Store     persistence.Store // a persistent storage backend if desired.
	Hook      Hook
	// contains filtered or unexported fields
}
Server is an MQTT broker server. It should be created with server.New() in order to ensure all the internal fields are correctly populated.
func (*Server) AddListener ¶
AddListener adds a new network listener to the server.
func (*Server) Close ¶
Close attempts to gracefully shut down the server, all listeners, clients, and stores.
func (*Server) EstablishConnection ¶
EstablishConnection establishes a new client when a listener accepts a new connection.
func (*Server) PublishToClientByID ¶
func (*Server) PublishToSubscribers ¶
PublishToSubscribers publishes a publish packet to all subscribers with matching topic filters.
func (*Server) ResendClientInflight ¶
ResendClientInflight attempts to resend all undelivered inflight messages to a