broker

package
v0.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2018 License: Apache-2.0 Imports: 10 Imported by: 10

Documentation

Overview

Package broker implements an extensible MQTT broker.

Example
// Launch the transport server the broker will accept connections from.
server, err := transport.Launch("tcp://localhost:8080")
if err != nil {
	panic(err)
}

// done is closed by the logger below once a LostConnection event is seen.
done := make(chan struct{})

backend := NewMemoryBackend()
// Print every broker event with a "B" prefix, showing the error if set,
// otherwise the message, otherwise the packet, otherwise just the event.
backend.Logger = func(e LogEvent, c *Client, pkt packet.Generic, msg *packet.Message, err error) {
	if err != nil {
		fmt.Printf("B [%s] %s\n", e, err.Error())
	} else if msg != nil {
		fmt.Printf("B [%s] %s\n", e, msg.String())
	} else if pkt != nil {
		fmt.Printf("B [%s] %s\n", e, pkt.String())
	} else {
		fmt.Printf("B [%s]\n", e)
	}

	// Signal the main flow that the broker has seen the connection go away.
	// Closing here is only safe because this example uses a single client.
	if e == LostConnection {
		close(done)
	}
}

// Create the engine and start accepting connections from the server.
engine := NewEngine(backend)
engine.Accept(server)

// Create a client; wait is closed once the client receives its message.
c := client.New()
wait := make(chan struct{})

// Print the received message with a "C" prefix and unblock the main flow.
c.Callback = func(msg *packet.Message, err error) error {
	if err != nil {
		panic(err)
	}

	fmt.Printf("C [message] %s\n", msg.String())
	close(wait)
	return nil
}

// Connect to the broker and wait on the returned value (10 second argument,
// presumably a timeout) before continuing.
cf, err := c.Connect(client.NewConfig("tcp://localhost:8080"))
if err != nil {
	panic(err)
}

err = cf.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

// Subscribe to the "test" topic with QoS 0 and wait for it to complete.
sf, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}

err = sf.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

// Publish "test" to the "test" topic with QoS 0 and the retain flag unset.
pf, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}

err = pf.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

// Block until the callback has received the published message.
<-wait

err = c.Disconnect()
if err != nil {
	panic(err)
}

// Block until the broker has logged the lost connection.
<-done

// Close the server before the engine; Engine.Close requires that all servers
// passed to Accept have been closed first.
err = server.Close()
if err != nil {
	panic(err)
}

engine.Close()
Output:

B [new connection]
B [packet received] <Connect ClientID="" KeepAlive=30 Username="" Password="" CleanSession=true Will=nil Version=4>
B [packet sent] <Connack SessionPresent=false ReturnCode=0>
B [packet received] <Subscribe ID=1 Subscriptions=["test"=>0]>
B [packet sent] <Suback ID=1 ReturnCodes=[0]>
B [packet received] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]> Dup=false>
B [message published] <Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>
B [message dequeued] <Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>
B [packet sent] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]> Dup=false>
B [message forwarded] <Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>
C [message] <Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>
B [packet received] <Disconnect>
B [client disconnected]
B [lost connection]

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClientClosed = errors.New("client has been closed")

ErrClientClosed is returned if a client is being closed by the broker.

View Source
var ErrClientDisconnected = errors.New("client has disconnected")

ErrClientDisconnected is returned if a client disconnects cleanly.

View Source
var ErrClosing = errors.New("closing")

ErrClosing is returned to a client if the backend is closing.

View Source
var ErrExpectedConnect = errors.New("expected a Connect as the first packet")

ErrExpectedConnect is returned when the first received packet is not a Connect.

View Source
var ErrKillTimeout = errors.New("kill timeout")

ErrKillTimeout is returned to a client if the existing client does not close in time.

View Source
var ErrMissingSession = errors.New("no session returned from Backend")

ErrMissingSession is returned if the backend does not return a session.

View Source
var ErrNotAuthorized = errors.New("client is not authorized")

ErrNotAuthorized is returned when a client is not authorized.

View Source
var ErrQueueFull = errors.New("queue full")

ErrQueueFull is returned to a client that attempts to write to its own full queue, which would result in a deadlock.

Functions

func Run

func Run(engine *Engine, protocol string) (string, chan struct{}, chan struct{})

Run runs the passed engine on a random available port and returns a channel that can be closed to shut down the engine. This function is intended to be used in testing scenarios.

Types

type Ack added in v0.6.0

type Ack func()

Ack is executed by the Backend or Client to signal one of two things: that a message will be delivered under the selected QoS level and is therefore safe to be deleted from either queue, or that subscriptions have been handled successfully.

type Backend

type Backend interface {
	// Authenticate should authenticate the client using the user and password
	// values and return true if the client is eligible to continue or false
	// when the broker should terminate the connection.
	Authenticate(client *Client, user, password string) (ok bool, err error)

	// Setup is called when a new client comes online and is successfully
	// authenticated. Setup should return the already stored session for the
	// supplied id or create and return a new one if it is missing or a clean
	// session is requested. If the supplied id has a zero length, a new
	// temporary session should be returned that is not stored further. The
	// backend should also close any existing clients that use the same client
	// id.
	//
	// Note: In this call the Backend may also allocate other resources and
	// setup the client for further usage as the broker will acknowledge the
	// connection when the call returns. The Terminate function is called for
	// every client that Setup has been called for.
	Setup(client *Client, id string, clean bool) (a Session, resumed bool, err error)

	// Restore is called after the client has restored packets from the session.
	//
	// The Backend should resubscribe stored subscriptions. Retained messages
	// that are delivered as a result of resubscribing a stored subscription
	// must be queued with the retain flag set to false.
	Restore(client *Client) error

	// Subscribe should subscribe the passed client to the specified topics and
	// store the subscription in the session. If an Ack is provided, the
	// subscription will be acknowledged when called during or after the call to
	// Subscribe.
	//
	// Incoming messages that match the supplied subscription should be added to
	// a temporary or persistent queue that is drained when Dequeue is called.
	//
	// Retained messages that match the supplied subscription should be added to
	// a temporary queue that is also drained when Dequeue is called. Retained
	// messages are not part of the stored session state as they are anyway
	// redelivered using the stored subscription mechanism.
	Subscribe(client *Client, subs []packet.Subscription, ack Ack) error

	// Unsubscribe should unsubscribe the passed client from the specified topics
	// and remove the subscriptions from the session. If an Ack is provided, the
	// unsubscription will be acknowledged when called during or after the call
	// to Unsubscribe.
	Unsubscribe(client *Client, topics []string, ack Ack) error

	// Publish should forward the passed message to all other clients that hold
	// a subscription that matches the message's topic. It should also add the
	// message to all sessions that have a matching offline subscription. The
	// latter may only apply to messages with a QoS greater than 0. If an Ack is
	// provided, the message will be acknowledged when called during or after
	// the call to Publish.
	//
	// If the retained flag is set, messages with a payload should replace the
	// currently retained message. Otherwise, the currently retained message
	// should be removed. The flag should be cleared before publishing the
	// message to subscribed clients.
	Publish(client *Client, msg *packet.Message, ack Ack) error

	// Dequeue is called by the Client to obtain the next message from the queue
	// and must return either a message or an error. The backend must only return
	// no message and no error if the client's Closing channel has been closed.
	//
	// The Backend may return an Ack to receive a signal that the message is being
	// delivered under the selected qos level and is therefore safe to be deleted
	// from the queue. The Client might dequeue other messages before acknowledging
	// a message.
	//
	// The returned message must have a QoS set that respects the QoS set by
	// the matching subscription.
	Dequeue(client *Client) (*packet.Message, Ack, error)

	// Terminate is called when the client goes offline. Terminate should
	// unsubscribe the passed client from all previously subscribed topics. The
	// backend may also convert a client's subscriptions to offline
	// subscriptions.
	//
	// Note: The Backend may also clean up previously allocated resources for
	// that client as the broker will close the connection when the call
	// returns.
	Terminate(client *Client) error

	// Log is called multiple times during the lifecycle of a client; see
	// LogEvent for a list of all events.
	Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)
}

A Backend provides the effective brokering functionality to its clients.

type Client

type Client struct {
	// PacketPrefetch may be set during Setup to control the number of packets
	// that are read by the Client and made available for processing. Will
	// default to 10 if not set.
	PacketPrefetch int

	// ParallelPublishes may be set during Setup to control the number of
	// parallel calls to Publish a client can perform. Will default to 10 if
	// not set.
	ParallelPublishes int

	// ParallelSubscribes may be set during Setup to control the number of
	// parallel calls to Subscribe and Unsubscribe a client can perform.
	// Will default to 10 if not set.
	ParallelSubscribes int

	// ParallelDequeues may be set during Setup to control the number of
	// parallel calls to Dequeue a client can perform. Will default to 10 if
	// not set.
	ParallelDequeues int
	// contains filtered or unexported fields
}

A Client represents a remote client that is connected to the broker.

func NewClient added in v0.6.0

func NewClient(backend Backend, conn transport.Conn) *Client

NewClient takes over a connection and returns a Client.

func (*Client) Close

func (c *Client) Close()

Close will immediately close the client.

func (*Client) Closed added in v0.6.0

func (c *Client) Closed() <-chan struct{}

Closed returns a channel that is closed when the client is closed.

func (*Client) Closing added in v0.6.0

func (c *Client) Closing() <-chan struct{}

Closing returns a channel that is closed when the client is closing.

func (*Client) Conn added in v0.6.0

func (c *Client) Conn() transport.Conn

Conn returns the client's underlying connection. Calls to SetReadLimit, SetBuffers, LocalAddr and RemoteAddr are safe.

func (*Client) ID added in v0.6.0

func (c *Client) ID() string

ID returns the client's id that has been supplied during connect.

func (*Client) Session

func (c *Client) Session() Session

Session returns the current Session used by the client.

type Engine

type Engine struct {
	// The Backend that will be passed to accepted clients.
	Backend Backend

	// ConnectTimeout defines the timeout to receive the first packet.
	ConnectTimeout time.Duration

	// The Default* properties will be set on newly accepted connections.
	DefaultReadLimit   int64
	DefaultReadBuffer  int
	DefaultWriteBuffer int

	// OnError can be used to receive errors from the engine. If an error is
	// received the server should be restarted.
	OnError func(error)
	// contains filtered or unexported fields
}

The Engine handles incoming connections and connects them to the backend.

func NewEngine

func NewEngine(backend Backend) *Engine

NewEngine returns a new Engine.

func (*Engine) Accept

func (e *Engine) Accept(server transport.Server)

Accept begins accepting connections from the passed server.

func (*Engine) Close

func (e *Engine) Close()

Close will stop handling incoming connections and close all acceptors. The call will block until all acceptors have returned.

Note: All passed servers to Accept must be closed before calling this method.

func (*Engine) Handle

func (e *Engine) Handle(conn transport.Conn) bool

Handle takes over responsibility and handles a transport.Conn. It returns false if the engine is closing and the connection has been closed.

type LogEvent

type LogEvent string

LogEvent denotes the class of an event passed to the logger.

// The following LogEvent values are the events passed to Backend.Log during
// the lifecycle of a client.
const (
	// NewConnection is emitted when a client comes online.
	NewConnection LogEvent = "new connection"

	// PacketReceived is emitted when a packet has been received.
	PacketReceived LogEvent = "packet received"

	// MessagePublished is emitted after a message has been published.
	MessagePublished LogEvent = "message published"

	// MessageAcknowledged is emitted after a message has been acknowledged.
	MessageAcknowledged LogEvent = "message acknowledged"

	// MessageDequeued is emitted after a message has been dequeued.
	MessageDequeued LogEvent = "message dequeued"

	// MessageForwarded is emitted after a message has been forwarded.
	MessageForwarded LogEvent = "message forwarded"

	// PacketSent is emitted when a packet has been sent.
	PacketSent LogEvent = "packet sent"

	// ClientDisconnected is emitted when a client disconnects cleanly.
	ClientDisconnected LogEvent = "client disconnected"

	// TransportError is emitted when an underlying transport error occurs.
	TransportError LogEvent = "transport error"

	// SessionError is emitted when a call to the session fails.
	SessionError LogEvent = "session error"

	// BackendError is emitted when a call to the backend fails.
	BackendError LogEvent = "backend error"

	// ClientError is emitted when the client violates the protocol.
	ClientError LogEvent = "client error"

	// LostConnection is emitted when the connection has been terminated.
	LostConnection LogEvent = "lost connection"
)

type MemoryBackend

type MemoryBackend struct {
	// The maximal size of the session queue. Will default to 100 if not set.
	SessionQueueSize int

	// The time after which an error is returned while waiting for a killed
	// existing client to exit. Will default to 5 seconds if not set.
	KillTimeout time.Duration

	// A map of usernames and passwords that grant read and write access.
	Credentials map[string]string

	// The Logger callback handles incoming log events.
	Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error)
	// contains filtered or unexported fields
}

A MemoryBackend stores everything in memory.

func NewMemoryBackend

func NewMemoryBackend() *MemoryBackend

NewMemoryBackend returns a new MemoryBackend.

func (*MemoryBackend) Authenticate

func (m *MemoryBackend) Authenticate(client *Client, user, password string) (bool, error)

Authenticate will authenticate a client's credentials.

func (*MemoryBackend) Close added in v0.6.0

func (m *MemoryBackend) Close(timeout time.Duration) bool

Close will close all active clients and close the backend. The return value denotes if the timeout has been reached.

func (*MemoryBackend) Dequeue added in v0.6.0

func (m *MemoryBackend) Dequeue(client *Client) (*packet.Message, Ack, error)

Dequeue will get the next message from the queues.

func (*MemoryBackend) Log added in v0.7.2

func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)

Log will call the associated logger.

func (*MemoryBackend) Publish

func (m *MemoryBackend) Publish(client *Client, msg *packet.Message, ack Ack) error

Publish will handle retained messages and add the message to the session queues.

func (*MemoryBackend) Restore added in v0.7.0

func (m *MemoryBackend) Restore(client *Client) error

Restore is not needed at the moment.

func (*MemoryBackend) Setup

func (m *MemoryBackend) Setup(client *Client, id string, clean bool) (Session, bool, error)

Setup will close existing clients and return an appropriate session.

func (*MemoryBackend) Subscribe

func (m *MemoryBackend) Subscribe(client *Client, subs []packet.Subscription, ack Ack) error

Subscribe will queue retained messages for the passed subscription.

func (*MemoryBackend) Terminate

func (m *MemoryBackend) Terminate(client *Client) error

Terminate will disassociate the session from the client.

func (*MemoryBackend) Unsubscribe

func (m *MemoryBackend) Unsubscribe(client *Client, topics []string, ack Ack) error

Unsubscribe is not needed at the moment.

type Session

type Session interface {
	// NextID should return the next id for outgoing packets.
	NextID() packet.ID

	// SavePacket should store a packet in the session. Any existing packet
	// with the same id should be quietly overwritten.
	SavePacket(session.Direction, packet.Generic) error

	// LookupPacket should retrieve a packet from the session using the packet id.
	LookupPacket(session.Direction, packet.ID) (packet.Generic, error)

	// DeletePacket should remove a packet from the session. The method should
	// not return an error if no packet with the specified id exists.
	DeletePacket(session.Direction, packet.ID) error

	// AllPackets should return all packets currently saved in the session.
	AllPackets(session.Direction) ([]packet.Generic, error)
}

A Session is used to get packet ids and persist incoming/outgoing packets.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL