Documentation ¶
Overview ¶
Package broker implements an extensible MQTT broker.
Example ¶
server, err := transport.Launch("tcp://localhost:8080") if err != nil { panic(err) } done := make(chan struct{}) backend := NewMemoryBackend() backend.Logger = func(e LogEvent, c *Client, pkt packet.Generic, msg *packet.Message, err error) { if err != nil { fmt.Printf("B [%s] %s\n", e, err.Error()) } else if msg != nil { fmt.Printf("B [%s] %s\n", e, msg.String()) } else if pkt != nil { fmt.Printf("B [%s] %s\n", e, pkt.String()) } else { fmt.Printf("B [%s]\n", e) } if e == LostConnection { close(done) } } engine := NewEngine(backend) engine.Accept(server) c := client.New() wait := make(chan struct{}) c.Callback = func(msg *packet.Message, err error) error { if err != nil { panic(err) } fmt.Printf("C [message] %s\n", msg.String()) close(wait) return nil } cf, err := c.Connect(client.NewConfig("tcp://localhost:8080")) if err != nil { panic(err) } err = cf.Wait(10 * time.Second) if err != nil { panic(err) } sf, err := c.Subscribe("test", 0) if err != nil { panic(err) } err = sf.Wait(10 * time.Second) if err != nil { panic(err) } pf, err := c.Publish("test", []byte("test"), 0, false) if err != nil { panic(err) } err = pf.Wait(10 * time.Second) if err != nil { panic(err) } <-wait err = c.Disconnect() if err != nil { panic(err) } <-done err = server.Close() if err != nil { panic(err) } engine.Close()
Output: B [new connection] B [packet received] <Connect ClientID="" KeepAlive=30 Username="" Password="" CleanSession=true Will=nil Version=4> B [packet sent] <Connack SessionPresent=false ReturnCode=0> B [packet received] <Subscribe ID=1 Subscriptions=["test"=>0]> B [packet sent] <Suback ID=1 ReturnCodes=[0]> B [packet received] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=74657374> Dup=false> B [message published] <Message Topic="test" QOS=0 Retain=false Payload=74657374> B [message dequeued] <Message Topic="test" QOS=0 Retain=false Payload=74657374> B [packet sent] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=74657374> Dup=false> B [message forwarded] <Message Topic="test" QOS=0 Retain=false Payload=74657374> C [message] <Message Topic="test" QOS=0 Retain=false Payload=74657374> B [packet received] <Disconnect> B [client disconnected] B [lost connection]
Index ¶
- Variables
- func Run(engine *Engine, protocol string) (string, chan struct{}, chan struct{})
- type Ack
- type Backend
- type Client
- type Engine
- type LogEvent
- type MemoryBackend
- func (m *MemoryBackend) Authenticate(_ *Client, user, password string) (bool, error)
- func (m *MemoryBackend) Close(timeout time.Duration) bool
- func (m *MemoryBackend) Dequeue(client *Client) (*packet.Message, Ack, error)
- func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, ...)
- func (m *MemoryBackend) Publish(client *Client, msg *packet.Message, ack Ack) error
- func (m *MemoryBackend) Restore(*Client) error
- func (m *MemoryBackend) Setup(client *Client, id string, clean bool) (Session, bool, error)
- func (m *MemoryBackend) Subscribe(client *Client, subs []packet.Subscription, ack Ack) error
- func (m *MemoryBackend) Terminate(client *Client) error
- func (m *MemoryBackend) Unsubscribe(client *Client, topics []string, ack Ack) error
- type Session
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClientClosed = errors.New("client closed")
ErrClientClosed is returned if a client is being closed by the broker.
var ErrClientDisconnected = errors.New("client disconnected")
ErrClientDisconnected is returned if a client disconnects cleanly.
var ErrClosing = errors.New("closing")
ErrClosing is returned to a client if the backend is closing.
var ErrKillTimeout = errors.New("kill timeout")
ErrKillTimeout is returned to a client if the existing client does not close in time.
var ErrMissingSession = errors.New("missing session")
ErrMissingSession is returned if the backend does not return a session.
var ErrNotAuthorized = errors.New("not authorized")
ErrNotAuthorized is returned when a client is not authorized.
var ErrQueueFull = errors.New("queue full")
ErrQueueFull is returned to a client that attempts to write to its own full queue, which would result in a deadlock.
var ErrTokenTimeout = errors.New("token timeout")
ErrTokenTimeout is returned if the client reaches the token timeout.
var ErrUnexpectedPacket = errors.New("unexpected packet")
ErrUnexpectedPacket is returned when an unexpected packet is received.
Functions ¶
Types ¶
type Ack ¶ added in v0.6.0
type Ack func()
Ack is executed by the Backend or Client to signal either that a message will be delivered under the selected qos level and is therefore safe to be deleted from either queue, or that subscriptions have been handled successfully.
type Backend ¶
type Backend interface { // Authenticate should authenticate the client using the user and password // values and return true if the client is eligible to continue or false // when the broker should terminate the connection. Authenticate(client *Client, user, password string) (ok bool, err error) // Setup is called when a new client comes online and is successfully // authenticated. Setup should return the already stored session for the // supplied id or create and return a new one if it is missing or a clean // session is requested. If the supplied id has a zero length, a new // temporary session should be returned that is not stored further. The // backend should also close any existing clients that use the same id. // // Note: In this call the Backend may also allocate other resources and // set up the client for further usage as the broker will acknowledge the // connection when the call returns. The Terminate function is called for // every client that Setup has been called for. Setup(client *Client, id string, clean bool) (a Session, resumed bool, err error) // Restore is called after the client has restored packets from the session. // // The Backend should resubscribe stored subscriptions and begin with queueing // missed offline messages. When all offline messages have been queued the // client may receive online messages. Depending on the implementation, this // may not be required as Dequeue will already pick up offline messages. Restore(client *Client) error // Subscribe should subscribe the passed client to the specified topics and // store the subscription in the session. If an Ack is provided, the // subscription will be acknowledged when called during or after the call to // Subscribe. // // Incoming messages that match the supplied subscription should be added to // a temporary or persistent queue that is drained when Dequeue is called. 
// // Retained messages that match the supplied subscription should be added to // a temporary queue that is also drained when Dequeue is called. The messages // must be delivered with the retained flag set to true. Subscribe(client *Client, subs []packet.Subscription, ack Ack) error // Unsubscribe should unsubscribe the passed client from the specified topics // and remove the subscriptions from the session. If an Ack is provided, the // unsubscription will be acknowledged when called during or after the call // to Unsubscribe. Unsubscribe(client *Client, topics []string, ack Ack) error // Publish should forward the passed message to all other clients that hold // a subscription that matches the message's topic. It should also add the // message to all sessions that have a matching offline subscription. The // latter may only apply to messages with a QOS greater than 0. If an Ack is // provided, the message will be acknowledged when called during or after // the call to Publish. // // If the retained flag is set, messages with a payload should replace the // currently retained message. Otherwise, the currently retained message // should be removed. The flag should be cleared before publishing the // message to other subscribed clients. Publish(client *Client, msg *packet.Message, ack Ack) error // Dequeue is called by the Client to obtain the next message from the queue // and must return either a message or an error. The backend must only return // no message and no error if the client's Closing channel has been closed. // // The Backend may return an Ack to receive a signal that the message is being // delivered under the selected qos level and is therefore safe to be deleted // from the queue. The Ack will be called before Dequeue is called again. // // The returned message must have a QOS set that respects the QOS set by // the matching subscription. Dequeue(client *Client) (*packet.Message, Ack, error) // Terminate is called when the client goes offline. 
Terminate should // unsubscribe the passed client from all previously subscribed topics. The // backend may also convert a client's subscriptions to offline subscriptions. // // Note: The Backend may also clean up previously allocated resources for // that client as the broker will close the connection when the call // returns. Terminate(client *Client) error // Log is called multiple times during the lifecycle of a client; see LogEvent // for a list of all events. Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error) }
A Backend provides the effective brokering functionality to its clients.
type Client ¶
type Client struct { // MaximumKeepAlive may be set during Setup to enforce a maximum keep alive // for this client. Missing or higher intervals will be set to the specified // value. // // Will default to 5 minutes. MaximumKeepAlive time.Duration // ParallelPublishes may be set during Setup to control the number of // parallel calls to Publish a client can perform. This setting also has an // effect on how many incoming packets are stored in the client's session. // // Will default to 10. ParallelPublishes int // ParallelSubscribes may be set during Setup to control the number of // parallel calls to Subscribe and Unsubscribe a client can perform. // // Will default to 10. ParallelSubscribes int // InflightMessages may be set during Setup to control the number of // inflight messages from the broker to the client. This also defines how // many outgoing packets are stored in the client's session. // // Will default to 10. InflightMessages int // TokenTimeout sets the timeout after which the client should fail when // obtaining publish, subscribe and dequeue tokens in order to prevent // potential deadlocks. // // Will default to 30 seconds. TokenTimeout time.Duration // PacketCallback can be set to inspect packets before processing and // apply rate limits. To guarantee the connection lifecycle, Connect and // Disconnect packets are not provided to the callback. PacketCallback func(packet.Generic) error // Ref can be used by the backend to attach a custom object to the client. Ref interface{} // contains filtered or unexported fields }
A Client represents a remote client that is connected to the broker.
func (*Client) Closed ¶ added in v0.6.0
func (c *Client) Closed() <-chan struct{}
Closed returns a channel that is closed when the client is closed.
func (*Client) Closing ¶ added in v0.6.0
func (c *Client) Closing() <-chan struct{}
Closing returns a channel that is closed when the client is closing.
func (*Client) Conn ¶ added in v0.6.0
Conn returns the client's underlying connection. Calls to SetReadLimit, LocalAddr and RemoteAddr are safe.
type Engine ¶
type Engine struct { // The Backend that will be passed to accepted clients. Backend Backend // ReadLimit defines the initial read limit. ReadLimit int64 // MaxWriteDelay defines the initial max write delay. MaxWriteDelay time.Duration // ConnectTimeout defines the timeout to receive the first packet. ConnectTimeout time.Duration // OnError can be used to receive errors from the engine. If an error is // received the server should be restarted. OnError func(error) // contains filtered or unexported fields }
The Engine handles incoming connections and connects them to the backend.
type LogEvent ¶
type LogEvent string
LogEvent denotes the class of an event passed to the logger.
const ( // NewConnection is emitted when a client comes online. NewConnection LogEvent = "new connection" // PacketReceived is emitted when a packet has been received. PacketReceived LogEvent = "packet received" // MessagePublished is emitted after a message has been published. MessagePublished LogEvent = "message published" // MessageAcknowledged is emitted after a message has been acknowledged. MessageAcknowledged LogEvent = "message acknowledged" // MessageDequeued is emitted after a message has been dequeued. MessageDequeued LogEvent = "message dequeued" // MessageForwarded is emitted after a message has been forwarded. MessageForwarded LogEvent = "message forwarded" // PacketSent is emitted when a packet has been sent. PacketSent LogEvent = "packet sent" // ClientDisconnected is emitted when a client disconnects cleanly. ClientDisconnected LogEvent = "client disconnected" // TransportError is emitted when an underlying transport error occurs. TransportError LogEvent = "transport error" // SessionError is emitted when a call to the session fails. SessionError LogEvent = "session error" // BackendError is emitted when a call to the backend fails. BackendError LogEvent = "backend error" // ClientError is emitted when the client violates the protocol. ClientError LogEvent = "client error" // LostConnection is emitted when the connection has been terminated. LostConnection LogEvent = "lost connection" )
type MemoryBackend ¶
type MemoryBackend struct { // The size of the session queue. SessionQueueSize int // The time after which an error is returned while waiting on a killed existing // client to exit. KillTimeout time.Duration // Client configuration options. See broker.Client for details. ClientMaximumKeepAlive time.Duration ClientParallelPublishes int ClientParallelSubscribes int ClientInflightMessages int ClientTokenTimeout time.Duration // A map of usernames to passwords that grant read and write access. Credentials map[string]string // The Logger callback handles incoming log events. Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error) // contains filtered or unexported fields }
A MemoryBackend stores everything in memory.
func NewMemoryBackend ¶
func NewMemoryBackend() *MemoryBackend
NewMemoryBackend returns a new MemoryBackend.
func (*MemoryBackend) Authenticate ¶
func (m *MemoryBackend) Authenticate(_ *Client, user, password string) (bool, error)
Authenticate will authenticate a client's credentials.
func (*MemoryBackend) Close ¶ added in v0.6.0
func (m *MemoryBackend) Close(timeout time.Duration) bool
Close will close all active clients and close the backend. The return value denotes if the timeout has been reached.
func (*MemoryBackend) Dequeue ¶ added in v0.6.0
Dequeue will get the next message from the temporary or stored queue.
func (*MemoryBackend) Log ¶ added in v0.7.2
func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)
Log will call the associated logger.
func (*MemoryBackend) Publish ¶
Publish will handle retained messages and add the message to the session queues.
func (*MemoryBackend) Restore ¶ added in v0.7.0
func (m *MemoryBackend) Restore(*Client) error
Restore is not needed at the moment.
func (*MemoryBackend) Subscribe ¶
func (m *MemoryBackend) Subscribe(client *Client, subs []packet.Subscription, ack Ack) error
Subscribe will store the subscription and queue retained messages.
func (*MemoryBackend) Terminate ¶
func (m *MemoryBackend) Terminate(client *Client) error
Terminate will disassociate the session from the client.
func (*MemoryBackend) Unsubscribe ¶
func (m *MemoryBackend) Unsubscribe(client *Client, topics []string, ack Ack) error
Unsubscribe will delete the subscription.
type Session ¶
type Session interface { // NextID should return the next id for outgoing packets. NextID() packet.ID // SavePacket should store a packet in the session. Any existing // packet with the same id should be quietly overwritten. SavePacket(session.Direction, packet.Generic) error // LookupPacket should retrieve a packet from the session using the packet id. LookupPacket(session.Direction, packet.ID) (packet.Generic, error) // DeletePacket should remove a packet from the session. The method should // not return an error if no packet with the specified id exists. DeletePacket(session.Direction, packet.ID) error // AllPackets should return all packets currently saved in the session. AllPackets(session.Direction) ([]packet.Generic, error) }
A Session is used to get packet ids and persist incoming/outgoing packets.