broker

package
v0.5.1
Published: Apr 17, 2018 License: Apache-2.0 Imports: 10 Imported by: 10

Documentation

Overview

Package broker implements an extensible MQTT broker.

Example
server, err := transport.Launch("tcp://localhost:8080")
if err != nil {
	panic(err)
}

engine := NewEngine()
engine.Accept(server)

c := client.New()
wait := make(chan struct{})

c.Callback = func(msg *packet.Message, err error) error {
	if err != nil {
		panic(err)
	}

	fmt.Println(msg.String())
	close(wait)
	return nil
}

cf, err := c.Connect(client.NewConfig("tcp://localhost:8080"))
if err != nil {
	panic(err)
}

cf.Wait(10 * time.Second)

sf, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}

sf.Wait(10 * time.Second)

pf, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}

pf.Wait(10 * time.Second)

<-wait

err = c.Disconnect()
if err != nil {
	panic(err)
}

err = server.Close()
if err != nil {
	panic(err)
}

engine.Close()
Output:

<Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>

Constants

This section is empty.

Variables

var ErrExpectedConnect = errors.New("expected a ConnectPacket as the first packet")

ErrExpectedConnect is returned when the first received packet is not a ConnectPacket.

var ErrMissingSession = errors.New("no session returned from Backend")

ErrMissingSession is returned if the backend does not return a session.

Functions

func Run

func Run(engine *Engine, protocol string) (string, chan struct{}, chan struct{})

Run runs the passed engine on a random available port and returns a channel that can be closed to shut down the engine. This function is intended to be used in testing scenarios.

Types

type Backend

type Backend interface {
	// Authenticate should authenticate the client using the user and password
	// values and return true if the client is eligible to continue or false
	// when the broker should terminate the connection.
	Authenticate(client *Client, user, password string) (bool, error)

	// Setup is called when a new client comes online and is successfully
	// authenticated. Setup should return the already stored session for the
	// supplied id or create and return a new one. If the supplied id has a zero
	// length, a new temporary session should be returned that is not stored
	// further. The backend may also close any existing clients that use the
	// same client id.
	//
	// Note: In this call the Backend may also allocate other resources and
	// setup the client for further usage as the broker will acknowledge the
	// connection when the call returns.
	Setup(client *Client, id string) (Session, bool, error)

	// QueueOffline is called after the client's stored subscriptions have been
	// resubscribed. It should be used to trigger a background process that
	// forwards all missed messages.
	QueueOffline(*Client) error

	// Subscribe should subscribe the passed client to the specified topic and
	// call Publish with any incoming messages.
	Subscribe(*Client, *packet.Subscription) error

	// Unsubscribe should unsubscribe the passed client from the specified topic.
	Unsubscribe(client *Client, topic string) error

	// StoreRetained should store the specified message.
	StoreRetained(*Client, *packet.Message) error

	// ClearRetained should remove the stored messages for the given topic.
	ClearRetained(client *Client, topic string) error

	// QueueRetained is called after acknowledging a subscription and should be
	// used to trigger a background process that forwards all retained messages.
	QueueRetained(client *Client, topic string) error

	// Publish should forward the passed message to all other clients that hold
	// a subscription that matches the message's topic. It should also add the
	// message to all sessions that have a matching offline subscription.
	Publish(*Client, *packet.Message) error

	// Terminate is called when the client goes offline. Terminate should
	// unsubscribe the passed client from all previously subscribed topics. The
	// backend may also convert a client's subscriptions to offline subscriptions.
	//
	// Note: The Backend may also clean up previously allocated resources for
	// that client as the broker will close the connection when the call
	// returns.
	Terminate(*Client) error
}

A Backend provides the effective brokering functionality to its clients.

type Client

type Client struct {

	// Ref can be used to store a custom reference to an object. This is usually
	// used to attach a state object created in the Backend to a client.
	Ref interface{}
	// contains filtered or unexported fields
}

A Client represents a remote client that is connected to the broker.

func (*Client) CleanSession

func (c *Client) CleanSession() bool

CleanSession returns whether the client requested a clean session during connect.

func (*Client) ClientID

func (c *Client) ClientID() string

ClientID returns the client id supplied during connect.

func (*Client) Close

func (c *Client) Close(clean bool)

Close will immediately close the connection. When clean=true the client will be marked as cleanly disconnected, and the will message will not get dispatched.

func (*Client) Publish

func (c *Client) Publish(msg *packet.Message) bool

Publish will send a Message to the client and initiate QOS flows.

func (*Client) RemoteAddr

func (c *Client) RemoteAddr() net.Addr

RemoteAddr returns the client's remote net address from the underlying connection.

func (*Client) Session

func (c *Client) Session() Session

Session returns the current Session used by the client.

type Engine

type Engine struct {
	Backend Backend
	Logger  Logger

	ConnectTimeout   time.Duration
	DefaultReadLimit int64
	// contains filtered or unexported fields
}

The Engine handles incoming connections and connects them to the backend.

func NewEngine

func NewEngine() *Engine

NewEngine returns a new Engine with a basic MemoryBackend.

func NewEngineWithBackend

func NewEngineWithBackend(backend Backend) *Engine

NewEngineWithBackend returns a new Engine with a custom Backend.

func (*Engine) Accept

func (e *Engine) Accept(server transport.Server)

Accept begins accepting connections from the passed server.

func (*Engine) Clients

func (e *Engine) Clients() []*Client

Clients returns a list of the currently connected clients.

func (*Engine) Close

func (e *Engine) Close()

Close will stop handling incoming connections and close all current clients. The call will block until all clients are properly closed.

Note: All servers passed to Accept must be closed before calling this method.

func (*Engine) Handle

func (e *Engine) Handle(conn transport.Conn) bool

Handle takes over responsibility and handles a transport.Conn. It returns false if the engine is closing and the connection has been closed.

func (*Engine) Wait

func (e *Engine) Wait(timeout time.Duration) bool

Wait can be called after Close to wait until all clients have been closed. The method returns true if all clients have been closed and false if the timeout has been reached.
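
The wait-or-timeout contract described for Wait can be illustrated with the standard library alone. The following is only a sketch of the general pattern, not the Engine's actual implementation; waitTimeout and simulate are hypothetical names, and a sync.WaitGroup stands in for whatever the Engine uses to track its clients.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout blocks until wg is done or timeout elapses. It returns true if
// wg finished in time and false if the timeout was reached first.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// simulate starts one hypothetical client that needs workMs milliseconds to
// shut down and waits for it for at most timeoutMs milliseconds.
func simulate(workMs, timeoutMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	return waitTimeout(&wg, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println(simulate(10, 1000)) // the client finishes within the timeout
	fmt.Println(simulate(500, 20))  // the timeout is reached first
}
```

Note that the helper goroutine in waitTimeout keeps running until the WaitGroup is done, even when the timeout fires first.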

type LogEvent

type LogEvent int

A LogEvent is received by a Logger.

const (
	// NewConnection is emitted when a client comes online.
	NewConnection LogEvent = iota

	// PacketReceived is emitted when a packet has been received.
	PacketReceived

	// MessagePublished is emitted after a message has been published.
	MessagePublished

	// MessageForwarded is emitted after a message has been forwarded.
	MessageForwarded

	// PacketSent is emitted when a packet has been sent.
	PacketSent

	// LostConnection is emitted when the connection has been terminated.
	LostConnection

	// TransportError is emitted when an underlying transport error occurs.
	TransportError

	// SessionError is emitted when a call to the session fails.
	SessionError

	// BackendError is emitted when a call to the backend fails.
	BackendError

	// ClientError is emitted when the client violates the protocol.
	ClientError
)
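
Because LogEvent is a plain int, a logger usually wants readable names. The sketch below repeats the constant block so that it compiles on its own and adds a String method and an isFailure helper. Both additions are hypothetical and not part of the package as documented here; grouping the four *Error events as failures is likewise an assumption made for illustration.

```go
package main

import "fmt"

// LogEvent and its constants mirror the declaration above.
type LogEvent int

const (
	NewConnection LogEvent = iota
	PacketReceived
	MessagePublished
	MessageForwarded
	PacketSent
	LostConnection
	TransportError
	SessionError
	BackendError
	ClientError
)

// eventNames lists the names in the same order as the constants.
var eventNames = [...]string{
	"NewConnection", "PacketReceived", "MessagePublished", "MessageForwarded",
	"PacketSent", "LostConnection", "TransportError", "SessionError",
	"BackendError", "ClientError",
}

// String returns the constant's name, or a numeric fallback for unknown values.
func (e LogEvent) String() string {
	if e < 0 || int(e) >= len(eventNames) {
		return fmt.Sprintf("LogEvent(%d)", int(e))
	}
	return eventNames[e]
}

// isFailure reports whether the event is one of the four *Error events
// (a hypothetical grouping, not defined by the package).
func isFailure(e LogEvent) bool {
	switch e {
	case TransportError, SessionError, BackendError, ClientError:
		return true
	}
	return false
}

func main() {
	for _, e := range []LogEvent{NewConnection, BackendError, LogEvent(42)} {
		fmt.Println(e, isFailure(e))
	}
}
```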

type Logger

The Logger callback handles incoming log messages.

type MemoryBackend

type MemoryBackend struct {
	Credentials map[string]string
	// contains filtered or unexported fields
}

A MemoryBackend stores everything in memory.

func NewMemoryBackend

func NewMemoryBackend() *MemoryBackend

NewMemoryBackend returns a new MemoryBackend.

func (*MemoryBackend) Authenticate

func (m *MemoryBackend) Authenticate(client *Client, user, password string) (bool, error)

Authenticate authenticates a client's credentials by matching them against the saved Credentials map.
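
Matching against a credentials map could look roughly like the following. This is a sketch with a hypothetical free function, authenticate, rather than the MemoryBackend method itself. It assumes that a nil map means "accept everyone", which the documentation above does not state.

```go
package main

import "fmt"

// authenticate reports whether user and password match an entry in
// credentials. Assumption: a nil map disables the check and accepts everyone.
func authenticate(credentials map[string]string, user, password string) bool {
	if credentials == nil {
		return true
	}
	saved, ok := credentials[user]
	return ok && saved == password
}

func main() {
	creds := map[string]string{"allow": "allow"}

	fmt.Println(authenticate(creds, "allow", "allow")) // known user, right password
	fmt.Println(authenticate(creds, "allow", "wrong")) // known user, wrong password
	fmt.Println(authenticate(creds, "deny", "deny"))   // unknown user
	fmt.Println(authenticate(nil, "anyone", ""))       // no credentials configured
}
```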

func (*MemoryBackend) ClearRetained

func (m *MemoryBackend) ClearRetained(client *Client, topic string) error

ClearRetained will remove the stored messages for the given topic.

func (*MemoryBackend) Publish

func (m *MemoryBackend) Publish(client *Client, msg *packet.Message) error

Publish will forward the passed message to all other subscribed clients. It will also add the message to all sessions that have a matching offline subscription.
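
Forwarding to "subscribed clients" depends on matching a message topic against subscription topic filters. How MemoryBackend does this is not shown here; the sketch below only follows the general MQTT 3.1.1 wildcard rules ("+" for one level, "#" for all remaining levels). matchTopic is a hypothetical helper, and the special handling of topics beginning with "$" is deliberately omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether topic matches the subscription filter.
// "+" matches exactly one level and "#" matches any number of remaining
// levels, including none. "#" is only valid as the last level of a filter.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")

	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}

	// without a trailing "#", both must have the same number of levels
	return len(f) == len(t)
}

func main() {
	fmt.Println(matchTopic("sensors/+/temp", "sensors/kitchen/temp"))
	fmt.Println(matchTopic("sensors/#", "sensors/kitchen/temp"))
	fmt.Println(matchTopic("sensors/+", "sensors/kitchen/temp"))
}
```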

func (*MemoryBackend) QueueOffline

func (m *MemoryBackend) QueueOffline(client *Client) error

QueueOffline will begin forwarding all missed messages in a separate goroutine.

func (*MemoryBackend) QueueRetained

func (m *MemoryBackend) QueueRetained(client *Client, topic string) error

QueueRetained will queue all retained messages matching the given topic.

func (*MemoryBackend) Setup

func (m *MemoryBackend) Setup(client *Client, id string) (Session, bool, error)

Setup returns the already stored session for the supplied id or creates and returns a new one. If the supplied id has a zero length, a new session is returned that is not stored further. Furthermore, it will disconnect any client connected with the same client id.
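
The lookup-or-create behaviour can be sketched with a plain map. The types and names below (session, store, setup) are hypothetical stand-ins, and the sketch assumes that the bool result signals whether a stored session was resumed, which the text above does not spell out. Disconnecting an existing client with the same id is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for a broker Session.
type session struct {
	id string
}

// store keeps sessions by client id.
type store struct {
	mu       sync.Mutex
	sessions map[string]*session
}

func newStore() *store {
	return &store{sessions: make(map[string]*session)}
}

// setup returns the stored session for id, or creates one. An empty id yields
// a temporary session that is never stored. The bool reports whether an
// already stored session was found (assumed meaning of the second result).
func (s *store) setup(id string) (*session, bool) {
	if id == "" {
		return &session{}, false
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if existing, ok := s.sessions[id]; ok {
		return existing, true
	}

	created := &session{id: id}
	s.sessions[id] = created
	return created, false
}

func main() {
	s := newStore()

	_, resumed := s.setup("client-1")
	fmt.Println(resumed) // first connect: nothing to resume

	_, resumed = s.setup("client-1")
	fmt.Println(resumed) // second connect: stored session found

	_, resumed = s.setup("")
	fmt.Println(resumed, len(s.sessions)) // temporary session is not stored
}
```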

func (*MemoryBackend) StoreRetained

func (m *MemoryBackend) StoreRetained(client *Client, msg *packet.Message) error

StoreRetained will store the specified message.

func (*MemoryBackend) Subscribe

func (m *MemoryBackend) Subscribe(client *Client, sub *packet.Subscription) error

Subscribe will subscribe the passed client to the specified topic and begin to forward messages by calling the client's Publish method.

func (*MemoryBackend) Terminate

func (m *MemoryBackend) Terminate(client *Client) error

Terminate will unsubscribe the passed client from all previously subscribed topics. If the client connected with clean=true, it will also clean the session. Otherwise, it will create offline subscriptions for all QOS 1 and QOS 2 subscriptions.

func (*MemoryBackend) Unsubscribe

func (m *MemoryBackend) Unsubscribe(client *Client, topic string) error

Unsubscribe will unsubscribe the passed client from the specified topic.

type MessageQueue added in v0.5.0

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue is a basic FIFO queue for messages.

func NewMessageQueue added in v0.5.0

func NewMessageQueue(size int) *MessageQueue

NewMessageQueue returns a new MessageQueue. If size is greater than zero, the queue will not grow beyond the defined size.

func (*MessageQueue) Len added in v0.5.0

func (q *MessageQueue) Len() int

Len returns the length of the queue.

func (*MessageQueue) Pop added in v0.5.0

func (q *MessageQueue) Pop() *packet.Message

Pop removes and returns a message from the queue in first to last order.

func (*MessageQueue) Push added in v0.5.0

func (q *MessageQueue) Push(msg *packet.Message)

Push adds a message to the queue.

func (*MessageQueue) Range added in v0.5.0

func (q *MessageQueue) Range(fn func(*packet.Message) bool)

Range will call fn with the contents of the queue. If fn returns false the operation is stopped immediately.

func (*MessageQueue) Reset added in v0.5.0

func (q *MessageQueue) Reset()

Reset removes all messages from the queue.
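
To make the queue semantics concrete, here is a minimal bounded FIFO sketch. It is not the package's MessageQueue: the queue type is hypothetical, it stores strings instead of *packet.Message, it is not safe for concurrent use, and it assumes that a full queue drops its oldest entry on Push. The text above only says the queue does not grow beyond its size.

```go
package main

import "fmt"

// queue is a hypothetical FIFO of strings with an optional maximum size.
type queue struct {
	size  int
	items []string
}

// newQueue returns a queue. A size of zero or less means unbounded.
func newQueue(size int) *queue {
	return &queue{size: size}
}

// push appends msg. Assumption: when the queue is full, the oldest entry is
// dropped to make room.
func (q *queue) push(msg string) {
	if q.size > 0 && len(q.items) >= q.size {
		q.items = q.items[1:]
	}
	q.items = append(q.items, msg)
}

// pop removes and returns the oldest entry, or false if the queue is empty.
func (q *queue) pop() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	msg := q.items[0]
	q.items = q.items[1:]
	return msg, true
}

// rangeOver calls fn for each entry from oldest to newest and stops as soon
// as fn returns false.
func (q *queue) rangeOver(fn func(string) bool) {
	for _, msg := range q.items {
		if !fn(msg) {
			return
		}
	}
}

// length returns the number of queued entries.
func (q *queue) length() int { return len(q.items) }

// reset removes all entries.
func (q *queue) reset() { q.items = nil }

func main() {
	q := newQueue(2)
	q.push("a")
	q.push("b")
	q.push("c") // the queue is full, so "a" is dropped

	q.rangeOver(func(msg string) bool {
		fmt.Println(msg)
		return true
	})
}
```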

type Session

type Session interface {
	// NextID should return the next id for outgoing packets.
	NextID() packet.ID

	// SavePacket should store a packet in the session. Any existing packet
	// with the same id should be quietly overwritten.
	SavePacket(session.Direction, packet.GenericPacket) error

	// LookupPacket should retrieve a packet from the session using the packet id.
	LookupPacket(session.Direction, packet.ID) (packet.GenericPacket, error)

	// DeletePacket should remove a packet from the session. The method should
	// not return an error if no packet with the specified id exists.
	DeletePacket(session.Direction, packet.ID) error

	// AllPackets should return all packets currently saved in the session. This
	// method is used to resend stored packets when the session is resumed.
	AllPackets(session.Direction) ([]packet.GenericPacket, error)

	// SaveSubscription should store the subscription in the session. Any existing
	// subscription with the same topic should be quietly overwritten.
	SaveSubscription(*packet.Subscription) error

	// LookupSubscription should match a topic against the stored subscriptions
	// and return the first matching subscription, if any.
	LookupSubscription(topic string) (*packet.Subscription, error)

	// DeleteSubscription should remove the subscription from the session. The
	// method should not return an error if no subscription with the specified
	// topic exists.
	DeleteSubscription(topic string) error

	// AllSubscriptions should return all subscriptions currently saved in the
	// session. This method is used to restore a client's subscriptions when the
	// session is resumed.
	AllSubscriptions() ([]*packet.Subscription, error)

	// SaveWill should store the will message.
	SaveWill(*packet.Message) error

	// LookupWill should retrieve the will message.
	LookupWill() (*packet.Message, error)

	// ClearWill should remove the will message from the store.
	ClearWill() error

	// Reset should completely reset the session.
	Reset() error
}

A Session is used to persist incoming/outgoing packets, subscriptions and the will.
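
The packet-related part of this contract (overwrite on save, no error on deleting a missing packet) can be sketched in memory as follows. All types are hypothetical stand-ins: direction for session.Direction, packetID for packet.ID and storedPacket for packet.GenericPacket. Subscriptions and the will message are omitted, and the error results are dropped because an in-memory map cannot fail.

```go
package main

import (
	"fmt"
	"sync"
)

// direction, packetID and storedPacket are hypothetical stand-ins.
type direction int

const (
	incoming direction = iota
	outgoing
)

type packetID uint16

type storedPacket struct {
	id   packetID
	kind string
}

// key identifies a packet by direction and id.
type key struct {
	dir direction
	id  packetID
}

// memorySession keeps all packets in a single map guarded by a mutex.
type memorySession struct {
	mu      sync.Mutex
	counter packetID
	packets map[key]storedPacket
}

func newMemorySession() *memorySession {
	return &memorySession{packets: make(map[key]storedPacket)}
}

// nextID returns the next id for outgoing packets. Zero is skipped because
// MQTT packet identifiers must be non-zero.
func (s *memorySession) nextID() packetID {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counter++
	if s.counter == 0 {
		s.counter = 1
	}
	return s.counter
}

// savePacket stores pkt and quietly overwrites a packet with the same id.
func (s *memorySession) savePacket(dir direction, pkt storedPacket) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.packets[key{dir, pkt.id}] = pkt
}

// lookupPacket returns the packet with the given id, if it is stored.
func (s *memorySession) lookupPacket(dir direction, id packetID) (storedPacket, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	pkt, ok := s.packets[key{dir, id}]
	return pkt, ok
}

// deletePacket removes a packet. Deleting a missing packet is a no-op.
func (s *memorySession) deletePacket(dir direction, id packetID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.packets, key{dir, id})
}

// allPackets returns all packets stored for dir, in no particular order.
func (s *memorySession) allPackets(dir direction) []storedPacket {
	s.mu.Lock()
	defer s.mu.Unlock()
	var all []storedPacket
	for k, pkt := range s.packets {
		if k.dir == dir {
			all = append(all, pkt)
		}
	}
	return all
}

func main() {
	s := newMemorySession()
	id := s.nextID()
	s.savePacket(outgoing, storedPacket{id: id, kind: "publish"})
	s.savePacket(outgoing, storedPacket{id: id, kind: "pubrel"}) // overwrites
	pkt, ok := s.lookupPacket(outgoing, id)
	fmt.Println(pkt.kind, ok, len(s.allPackets(outgoing)))
}
```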
