broker

package
v0.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2021 License: Apache-2.0 Imports: 10 Imported by: 10

Documentation

Overview

Package broker implements an extensible MQTT broker.

Example
server, err := transport.Launch("tcp://localhost:8080")
if err != nil {
	panic(err)
}

done := make(chan struct{})

backend := NewMemoryBackend()
backend.Logger = func(e LogEvent, c *Client, pkt packet.Generic, msg *packet.Message, err error) {
	if err != nil {
		fmt.Printf("B [%s] %s\n", e, err.Error())
	} else if msg != nil {
		fmt.Printf("B [%s] %s\n", e, msg.String())
	} else if pkt != nil {
		fmt.Printf("B [%s] %s\n", e, pkt.String())
	} else {
		fmt.Printf("B [%s]\n", e)
	}

	if e == LostConnection {
		close(done)
	}
}

engine := NewEngine(backend)
engine.Accept(server)

c := client.New()
wait := make(chan struct{})

c.Callback = func(msg *packet.Message, err error) error {
	if err != nil {
		panic(err)
	}

	fmt.Printf("C [message] %s\n", msg.String())
	close(wait)
	return nil
}

cf, err := c.Connect(client.NewConfig("tcp://localhost:8080"))
if err != nil {
	panic(err)
}

err = cf.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

sf, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}

err = sf.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

pf, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}

err = pf.Wait(10 * time.Second)
if err != nil {
	panic(err)
}

<-wait

err = c.Disconnect()
if err != nil {
	panic(err)
}

<-done

err = server.Close()
if err != nil {
	panic(err)
}

engine.Close()
Output:

B [new connection]
B [packet received] <Connect ClientID="" KeepAlive=30 Username="" Password="" CleanSession=true Will=nil Version=4>
B [packet sent] <Connack SessionPresent=false ReturnCode=0>
B [packet received] <Subscribe ID=1 Subscriptions=["test"=>0]>
B [packet sent] <Suback ID=1 ReturnCodes=[0]>
B [packet received] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=74657374> Dup=false>
B [message published] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
B [message dequeued] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
B [packet sent] <Publish ID=0 Message=<Message Topic="test" QOS=0 Retain=false Payload=74657374> Dup=false>
B [message forwarded] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
C [message] <Message Topic="test" QOS=0 Retain=false Payload=74657374>
B [packet received] <Disconnect>
B [client disconnected]
B [lost connection]

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClientClosed = errors.New("client closed")

ErrClientClosed is returned if a client is being closed by the broker.

View Source
var ErrClientDisconnected = errors.New("client disconnected")

ErrClientDisconnected is returned if a client disconnects cleanly.

View Source
var ErrClosing = errors.New("closing")

ErrClosing is returned to a client if the backend is closing.

View Source
var ErrKillTimeout = errors.New("kill timeout")

ErrKillTimeout is returned to a client if the existing client does not close in time.

View Source
var ErrMissingSession = errors.New("missing session")

ErrMissingSession is returned if the backend does not return a session.

View Source
var ErrNotAuthorized = errors.New("not authorized")

ErrNotAuthorized is returned when a client is not authorized.

View Source
var ErrQueueFull = errors.New("queue full")

ErrQueueFull is returned to a client that attempts to write to its own full queue, which would result in a deadlock.

View Source
var ErrTokenTimeout = errors.New("token timeout")

ErrTokenTimeout is returned if the client reaches the token timeout.

View Source
var ErrUnexpectedPacket = errors.New("unexpected packet")

ErrUnexpectedPacket is returned when an unexpected packet is received.

Functions

func Run

func Run(engine *Engine, protocol string) (string, chan struct{}, chan struct{})

Run runs the passed engine on a random available port and returns a channel that can be closed to shut down the engine. This function is intended for use in testing scenarios.

Types

type Ack added in v0.6.0

type Ack func()

Ack is executed by the Backend or Client to signal one of two things: that a message will be delivered under the selected QoS level and is therefore safe to be deleted from either queue, or that subscriptions have been handled successfully.

type Backend

type Backend interface {
	// Authenticate should authenticate the client using the user and password
	// values and return true if the client is eligible to continue or false
	// when the broker should terminate the connection.
	Authenticate(client *Client, user, password string) (ok bool, err error)

	// Setup is called when a new client comes online and is successfully
	// authenticated. Setup should return the already stored session for the
	// supplied id or create and return a new one if it is missing or a clean
	// session is requested. If the supplied id has a zero length, a new
	// temporary session should be returned that is not stored further. The
	// backend should also close any existing clients that use the same id.
	//
	// Note: In this call the Backend may also allocate other resources and
	// set up the client for further usage as the broker will acknowledge the
	// connection when the call returns. The Terminate function is called for
	// every client that Setup has been called for.
	Setup(client *Client, id string, clean bool) (a Session, resumed bool, err error)

	// Restore is called after the client has restored packets from the session.
	//
	// The Backend should resubscribe stored subscriptions and begin with queueing
	// missed offline messages. When all offline messages have been queued the
	// client may receive online messages. Depending on the implementation, this
	// may not be required as Dequeue will already pick up offline messages.
	Restore(client *Client) error

	// Subscribe should subscribe the passed client to the specified topics and
	// store the subscription in the session. If an Ack is provided, the
	// subscription will be acknowledged when called during or after the call to
	// Subscribe.
	//
	// Incoming messages that match the supplied subscription should be added to
	// a temporary or persistent queue that is drained when Dequeue is called.
	//
	// Retained messages that match the supplied subscription should be added to
	// a temporary queue that is also drained when Dequeue is called. The messages
	// must be delivered with the retained flag set to true.
	Subscribe(client *Client, subs []packet.Subscription, ack Ack) error

	// Unsubscribe should unsubscribe the passed client from the specified topics
	// and remove the subscriptions from the session. If an Ack is provided, the
	// unsubscription will be acknowledged when called during or after the call
	// to Unsubscribe.
	Unsubscribe(client *Client, topics []string, ack Ack) error

	// Publish should forward the passed message to all other clients that hold
	// a subscription that matches the message's topic. It should also add the
	// message to all sessions that have a matching offline subscription. The
	// latter may only apply to messages with a QOS greater than 0. If an Ack is
	// provided, the message will be acknowledged when called during or after
	// the call to Publish.
	//
	// If the retained flag is set, messages with a payload should replace the
	// currently retained message. Otherwise, the currently retained message
	// should be removed. The flag should be cleared before publishing the
	// message to other subscribed clients.
	Publish(client *Client, msg *packet.Message, ack Ack) error

	// Dequeue is called by the Client to obtain the next message from the queue
	// and must return either a message or an error. The backend may return
	// neither a message nor an error only if the client's Closing channel has
	// been closed.
	//
	// The Backend may return an Ack to receive a signal that the message is being
	// delivered under the selected qos level and is therefore safe to be deleted
	// from the queue. The Ack will be called before Dequeue is called again.
	//
	// The returned message must have a QOS set that respects the QOS set by
	// the matching subscription.
	Dequeue(client *Client) (*packet.Message, Ack, error)

	// Terminate is called when the client goes offline. Terminate should
	// unsubscribe the passed client from all previously subscribed topics. The
	// backend may also convert a client's subscriptions to offline subscriptions.
	//
	// Note: The Backend may also clean up previously allocated resources for
	// that client as the broker will close the connection when the call
	// returns.
	Terminate(client *Client) error

	// Log is called multiple times during the lifecycle of a client. See LogEvent
	// for a list of all events.
	Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)
}

A Backend provides the effective brokering functionality to its clients.

type Client

type Client struct {

	// MaximumKeepAlive may be set during Setup to enforce a maximum keep alive
	// for this client. Missing or higher intervals will be set to the specified
	// value.
	//
	// Will default to 5 minutes.
	MaximumKeepAlive time.Duration

	// ParallelPublishes may be set during Setup to control the number of
	// parallel calls to Publish a client can perform. This setting also has an
	// effect on how many incoming packets are stored in the client's session.
	//
	// Will default to 10.
	ParallelPublishes int

	// ParallelSubscribes may be set during Setup to control the number of
	// parallel calls to Subscribe and Unsubscribe a client can perform.
	//
	// Will default to 10.
	ParallelSubscribes int

	// InflightMessages may be set during Setup to control the number of
	// inflight messages from the broker to the client. This also defines how
	// many outgoing packets are stored in the client's session.
	//
	// Will default to 10.
	InflightMessages int

	// TokenTimeout sets the timeout after which the client should fail when
	// obtaining publish, subscribe and dequeue tokens in order to prevent
	// potential deadlocks.
	//
	// Will default to 30 seconds.
	TokenTimeout time.Duration

	// PacketCallback can be set to inspect packets before processing and
	// apply rate limits. To guarantee the connection lifecycle, Connect and
	// Disconnect packets are not provided to the callback.
	PacketCallback func(packet.Generic) error

	// Ref can be used by the backend to attach a custom object to the client.
	Ref interface{}
	// contains filtered or unexported fields
}

A Client represents a remote client that is connected to the broker.

func NewClient added in v0.6.0

func NewClient(backend Backend, conn transport.Conn) *Client

NewClient takes over a connection and returns a Client.

func (*Client) Close

func (c *Client) Close()

Close will immediately close the client.

func (*Client) Closed added in v0.6.0

func (c *Client) Closed() <-chan struct{}

Closed returns a channel that is closed when the client is closed.

func (*Client) Closing added in v0.6.0

func (c *Client) Closing() <-chan struct{}

Closing returns a channel that is closed when the client is closing.

func (*Client) Conn added in v0.6.0

func (c *Client) Conn() transport.Conn

Conn returns the client's underlying connection. Calls to SetReadLimit, LocalAddr and RemoteAddr are safe.

func (*Client) ID added in v0.6.0

func (c *Client) ID() string

ID returns the client's ID that was supplied during connect.

func (*Client) Session

func (c *Client) Session() Session

Session returns the current Session used by the client.

type Engine

type Engine struct {
	// The Backend that will be passed to accepted clients.
	Backend Backend

	// ReadLimit defines the initial read limit.
	ReadLimit int64

	// MaxWriteDelay defines the initial max write delay.
	MaxWriteDelay time.Duration

	// ConnectTimeout defines the timeout to receive the first packet.
	ConnectTimeout time.Duration

	// OnError can be used to receive errors from the engine. If an error is
	// received the server should be restarted.
	OnError func(error)
	// contains filtered or unexported fields
}

The Engine handles incoming connections and connects them to the backend.

func NewEngine

func NewEngine(backend Backend) *Engine

NewEngine returns a new Engine.

func (*Engine) Accept

func (e *Engine) Accept(server transport.Server)

Accept begins accepting connections from the passed server.

func (*Engine) Close

func (e *Engine) Close()

Close will stop handling incoming connections and close all acceptors. The call will block until all acceptors have returned.

Note: All servers passed to Accept must be closed before calling this method.

func (*Engine) Handle

func (e *Engine) Handle(conn transport.Conn) bool

Handle takes over responsibility and handles a transport.Conn. It returns false if the engine is closing and the connection has been closed.

type LogEvent

type LogEvent string

LogEvent denotes the class of an event passed to the logger.

const (
	// NewConnection is emitted when a client comes online.
	NewConnection LogEvent = "new connection"

	// PacketReceived is emitted when a packet has been received.
	PacketReceived LogEvent = "packet received"

	// MessagePublished is emitted after a message has been published.
	MessagePublished LogEvent = "message published"

	// MessageAcknowledged is emitted after a message has been acknowledged.
	MessageAcknowledged LogEvent = "message acknowledged"

	// MessageDequeued is emitted after a message has been dequeued.
	MessageDequeued LogEvent = "message dequeued"

	// MessageForwarded is emitted after a message has been forwarded.
	MessageForwarded LogEvent = "message forwarded"

	// PacketSent is emitted when a packet has been sent.
	PacketSent LogEvent = "packet sent"

	// ClientDisconnected is emitted when a client disconnects cleanly.
	ClientDisconnected LogEvent = "client disconnected"

	// TransportError is emitted when an underlying transport error occurs.
	TransportError LogEvent = "transport error"

	// SessionError is emitted when a call to the session fails.
	SessionError LogEvent = "session error"

	// BackendError is emitted when a call to the backend fails.
	BackendError LogEvent = "backend error"

	// ClientError is emitted when the client violates the protocol.
	ClientError LogEvent = "client error"

	// LostConnection is emitted when the connection has been terminated.
	LostConnection LogEvent = "lost connection"
)

type MemoryBackend

type MemoryBackend struct {
	// The size of the session queue.
	SessionQueueSize int

	// The time after which an error is returned while waiting for a killed
	// existing client to exit.
	KillTimeout time.Duration

	// Client configuration options. See broker.Client for details.
	ClientMaximumKeepAlive   time.Duration
	ClientParallelPublishes  int
	ClientParallelSubscribes int
	ClientInflightMessages   int
	ClientTokenTimeout       time.Duration

	// A map of usernames and passwords that grant read and write access.
	Credentials map[string]string

	// The Logger callback handles incoming log events.
	Logger func(LogEvent, *Client, packet.Generic, *packet.Message, error)
	// contains filtered or unexported fields
}

A MemoryBackend stores everything in memory.

func NewMemoryBackend

func NewMemoryBackend() *MemoryBackend

NewMemoryBackend returns a new MemoryBackend.

func (*MemoryBackend) Authenticate

func (m *MemoryBackend) Authenticate(_ *Client, user, password string) (bool, error)

Authenticate will authenticate a client's credentials.

func (*MemoryBackend) Close added in v0.6.0

func (m *MemoryBackend) Close(timeout time.Duration) bool

Close will close all active clients and close the backend. The return value denotes if the timeout has been reached.

func (*MemoryBackend) Dequeue added in v0.6.0

func (m *MemoryBackend) Dequeue(client *Client) (*packet.Message, Ack, error)

Dequeue will get the next message from the temporary or stored queue.

func (*MemoryBackend) Log added in v0.7.2

func (m *MemoryBackend) Log(event LogEvent, client *Client, pkt packet.Generic, msg *packet.Message, err error)

Log will call the associated logger.

func (*MemoryBackend) Publish

func (m *MemoryBackend) Publish(client *Client, msg *packet.Message, ack Ack) error

Publish will handle retained messages and add the message to the session queues.

func (*MemoryBackend) Restore added in v0.7.0

func (m *MemoryBackend) Restore(*Client) error

Restore is not needed at the moment.

func (*MemoryBackend) Setup

func (m *MemoryBackend) Setup(client *Client, id string, clean bool) (Session, bool, error)

Setup will close existing clients and return an appropriate session.

func (*MemoryBackend) Subscribe

func (m *MemoryBackend) Subscribe(client *Client, subs []packet.Subscription, ack Ack) error

Subscribe will store the subscription and queue retained messages.

func (*MemoryBackend) Terminate

func (m *MemoryBackend) Terminate(client *Client) error

Terminate will disassociate the session from the client.

func (*MemoryBackend) Unsubscribe

func (m *MemoryBackend) Unsubscribe(client *Client, topics []string, ack Ack) error

Unsubscribe will delete the subscription.

type Session

type Session interface {
	// NextID should return the next id for outgoing packets.
	NextID() packet.ID

	// SavePacket should store a packet in the session. Any existing packet
	// with the same id should be quietly overwritten.
	SavePacket(session.Direction, packet.Generic) error

	// LookupPacket should retrieve a packet from the session using the packet id.
	LookupPacket(session.Direction, packet.ID) (packet.Generic, error)

	// DeletePacket should remove a packet from the session. The method should
	// not return an error if no packet with the specified id exists.
	DeletePacket(session.Direction, packet.ID) error

	// AllPackets should return all packets currently saved in the session.
	AllPackets(session.Direction) ([]packet.Generic, error)
}

A Session is used to get packet ids and persist incoming/outgoing packets.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL