Documentation ¶
Overview ¶
Package broker implements an extensible MQTT broker.
Example ¶
server, err := transport.Launch("tcp://localhost:8080")
if err != nil {
	panic(err)
}

engine := NewEngine()
engine.Accept(server)

c := client.New()
wait := make(chan struct{})

c.Callback = func(msg *packet.Message, err error) error {
	if err != nil {
		panic(err)
	}

	fmt.Println(msg.String())
	close(wait)
	return nil
}

cf, err := c.Connect(client.NewConfig("tcp://localhost:8080"))
if err != nil {
	panic(err)
}

cf.Wait(10 * time.Second)

sf, err := c.Subscribe("test", 0)
if err != nil {
	panic(err)
}

sf.Wait(10 * time.Second)

pf, err := c.Publish("test", []byte("test"), 0, false)
if err != nil {
	panic(err)
}

pf.Wait(10 * time.Second)

<-wait

err = c.Disconnect()
if err != nil {
	panic(err)
}

err = server.Close()
if err != nil {
	panic(err)
}

engine.Close()
Output: <Message Topic="test" QOS=0 Retain=false Payload=[116 101 115 116]>
Index ¶
- Variables
- func Run(engine *Engine, protocol string) (string, chan struct{}, chan struct{})
- type Backend
- type Client
- type Engine
- type LogEvent
- type Logger
- type MemoryBackend
- func (m *MemoryBackend) Authenticate(client *Client, user, password string) (bool, error)
- func (m *MemoryBackend) ClearRetained(client *Client, topic string) error
- func (m *MemoryBackend) Publish(client *Client, msg *packet.Message) error
- func (m *MemoryBackend) QueueOffline(client *Client) error
- func (m *MemoryBackend) QueueRetained(client *Client, topic string) error
- func (m *MemoryBackend) Setup(client *Client, id string) (Session, bool, error)
- func (m *MemoryBackend) StoreRetained(client *Client, msg *packet.Message) error
- func (m *MemoryBackend) Subscribe(client *Client, sub *packet.Subscription) error
- func (m *MemoryBackend) Terminate(client *Client) error
- func (m *MemoryBackend) Unsubscribe(client *Client, topic string) error
- type MessageQueue
- type Session
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrExpectedConnect = errors.New("expected a ConnectPacket as the first packet")
ErrExpectedConnect is returned when the first received packet is not a ConnectPacket.
var ErrMissingSession = errors.New("no session returned from Backend")
ErrMissingSession is returned if the backend does not return a session.
var ErrNotAuthorized = errors.New("client is not authorized")
ErrNotAuthorized is returned when a client is not authorized.
Functions ¶
Types ¶
type Backend ¶
type Backend interface {
	// Authenticate should authenticate the client using the user and password
	// values and return true if the client is eligible to continue or false
	// when the broker should terminate the connection.
	Authenticate(client *Client, user, password string) (bool, error)

	// Setup is called when a new client comes online and is successfully
	// authenticated. Setup should return the already stored session for the
	// supplied id or create and return a new one. If the supplied id has a zero
	// length, a new temporary session should be returned that is not stored
	// further. The backend may also close any existing clients that use the
	// same client id.
	//
	// Note: In this call the Backend may also allocate other resources and
	// set up the client for further usage as the broker will acknowledge the
	// connection when the call returns.
	Setup(client *Client, id string) (Session, bool, error)

	// QueueOffline is called after the client's stored subscriptions have been
	// resubscribed. It should be used to trigger a background process that
	// forwards all missed messages.
	QueueOffline(*Client) error

	// Subscribe should subscribe the passed client to the specified topic and
	// call Publish with any incoming messages.
	Subscribe(*Client, *packet.Subscription) error

	// Unsubscribe should unsubscribe the passed client from the specified topic.
	Unsubscribe(client *Client, topic string) error

	// StoreRetained should store the specified message.
	StoreRetained(*Client, *packet.Message) error

	// ClearRetained should remove the stored messages for the given topic.
	ClearRetained(client *Client, topic string) error

	// QueueRetained is called after acknowledging a subscription and should be
	// used to trigger a background process that forwards all retained messages.
	QueueRetained(client *Client, topic string) error

	// Publish should forward the passed message to all other clients that hold
	// a subscription that matches the message's topic. It should also add the
	// message to all sessions that have a matching offline subscription.
	Publish(*Client, *packet.Message) error

	// Terminate is called when the client goes offline. Terminate should
	// unsubscribe the passed client from all previously subscribed topics. The
	// backend may also convert a client's subscriptions to offline subscriptions.
	//
	// Note: The Backend may also clean up previously allocated resources for
	// that client as the broker will close the connection when the call
	// returns.
	Terminate(*Client) error
}
A Backend provides the actual brokering functionality to its clients.
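Publish is described as forwarding a message to clients whose subscription matches the message's topic. As a rough illustration of what such matching involves, the following sketch implements the standard MQTT wildcard rules (`+` for one level, `#` for all remaining levels). The function name `topicMatches` is hypothetical and not part of this package, and the sketch ignores the special handling of topics that begin with `$`.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatches reports whether an MQTT topic filter matches a concrete topic.
// "+" matches exactly one level; "#" matches all remaining levels (including
// none) and is only valid as the last level of the filter.
// Hypothetical helper for illustration, not an API of the broker package.
func topicMatches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")

	for i, level := range f {
		if level == "#" {
			// A multi-level wildcard anywhere but the end is invalid.
			return i == len(f)-1
		}
		if i >= len(t) {
			// The filter has more levels than the topic.
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}

	// Without a trailing "#", both must have the same number of levels.
	return len(f) == len(t)
}

func main() {
	cases := []struct{ filter, topic string }{
		{"test", "test"},
		{"sensors/+/temp", "sensors/kitchen/temp"},
		{"sensors/#", "sensors/kitchen/temp"},
		{"sensors/+", "sensors/kitchen/temp"},
	}
	for _, c := range cases {
		fmt.Printf("%q matches %q: %t\n", c.filter, c.topic, topicMatches(c.filter, c.topic))
	}
}
```

A custom Backend would typically run a check like this (or a trie-based equivalent) for every stored subscription when Publish is called.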
type Client ¶
type Client struct {
	// Ref can be used to store a custom reference to an object. This is usually
	// used to attach a state object, created in the Backend, to the client.
	Ref interface{}
	// contains filtered or unexported fields
}
A Client represents a remote client that is connected to the broker.
func (*Client) CleanSession ¶
CleanSession returns whether the client requested a clean session during connect.
func (*Client) Close ¶
Close will immediately close the connection. When clean=true the client will be marked as cleanly disconnected, and the will message will not get dispatched.
func (*Client) RemoteAddr ¶
RemoteAddr returns the client's remote net address from the underlying connection.
type Engine ¶
type Engine struct {
	Backend Backend
	Logger  Logger

	ConnectTimeout   time.Duration
	DefaultReadLimit int64
	// contains filtered or unexported fields
}
The Engine handles incoming connections and connects them to the backend.
func NewEngine ¶
func NewEngine() *Engine
NewEngine returns a new Engine with a basic MemoryBackend.
func NewEngineWithBackend ¶
NewEngineWithBackend returns a new Engine with a custom Backend.
func (*Engine) Close ¶
func (e *Engine) Close()
Close will stop handling incoming connections and close all current clients. The call will block until all clients are properly closed.
Note: All servers passed to Accept must be closed before calling this method.
type LogEvent ¶
type LogEvent int
LogEvents are received by a Logger.
const (
	// NewConnection is emitted when a client comes online.
	NewConnection LogEvent = iota

	// PacketReceived is emitted when a packet has been received.
	PacketReceived

	// MessagePublished is emitted after a message has been published.
	MessagePublished

	// MessageForwarded is emitted after a message has been forwarded.
	MessageForwarded

	// PacketSent is emitted when a packet has been sent.
	PacketSent

	// LostConnection is emitted when the connection has been terminated.
	LostConnection

	// TransportError is emitted when an underlying transport error occurs.
	TransportError

	// SessionError is emitted when a call to the session fails.
	SessionError

	// BackendError is emitted when a call to the backend fails.
	BackendError

	// ClientError is emitted when the client violates the protocol.
	ClientError
)
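Because LogEvent is a plain int enumerated with iota, log output is easier to read when events are mapped to names. The sketch below mirrors the constants in a standalone program and adds a `String` method and an `isError` helper. Both additions are assumptions made for illustration; this documentation does not show whether the package provides them, nor the signature of Logger.

```go
package main

import "fmt"

// LogEvent mirrors the documented type so the sketch compiles on its own.
type LogEvent int

const (
	NewConnection LogEvent = iota
	PacketReceived
	MessagePublished
	MessageForwarded
	PacketSent
	LostConnection
	TransportError
	SessionError
	BackendError
	ClientError
)

// eventNames lists the names in declaration order.
var eventNames = [...]string{
	"NewConnection", "PacketReceived", "MessagePublished", "MessageForwarded",
	"PacketSent", "LostConnection", "TransportError", "SessionError",
	"BackendError", "ClientError",
}

// String returns the constant name for known events (hypothetical helper).
func (e LogEvent) String() string {
	if e < 0 || int(e) >= len(eventNames) {
		return fmt.Sprintf("LogEvent(%d)", int(e))
	}
	return eventNames[e]
}

// isError reports whether e is one of the four error events. It relies on
// the error constants being declared last and contiguously, as listed above.
func isError(e LogEvent) bool {
	return e >= TransportError && e <= ClientError
}

func main() {
	for e := NewConnection; e <= ClientError; e++ {
		fmt.Printf("%d %s error=%t\n", int(e), e, isError(e))
	}
}
```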
type MemoryBackend ¶
type MemoryBackend struct {
	Credentials map[string]string
	// contains filtered or unexported fields
}
A MemoryBackend stores everything in memory.
func NewMemoryBackend ¶
func NewMemoryBackend() *MemoryBackend
NewMemoryBackend returns a new MemoryBackend.
func (*MemoryBackend) Authenticate ¶
func (m *MemoryBackend) Authenticate(client *Client, user, password string) (bool, error)
Authenticate authenticates a client's credentials by matching them against the saved Credentials map.
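A minimal sketch of matching against a credentials map, using a stand-in type named `memoryAuth` (hypothetical) instead of the real MemoryBackend. How the actual implementation treats a nil Credentials map is not stated here; the sketch assumes a nil map means that every client is allowed.

```go
package main

import "fmt"

// memoryAuth is a hypothetical stand-in that only carries the Credentials map.
type memoryAuth struct {
	Credentials map[string]string
}

// Authenticate reports whether the user/password pair is accepted.
// Assumption: a nil map disables authentication and accepts every client.
func (m *memoryAuth) Authenticate(user, password string) (bool, error) {
	if m.Credentials == nil {
		return true, nil
	}

	// Accept only if the user exists and the stored password matches.
	stored, ok := m.Credentials[user]
	return ok && stored == password, nil
}

func main() {
	auth := &memoryAuth{Credentials: map[string]string{"allow": "secret"}}

	for _, c := range [][2]string{{"allow", "secret"}, {"allow", "wrong"}, {"deny", "secret"}} {
		ok, err := auth.Authenticate(c[0], c[1])
		fmt.Println(c[0], c[1], ok, err)
	}
}
```

Returning `false` with a nil error, instead of an error, follows the Backend contract above: false tells the broker to terminate the connection, while an error signals a backend failure.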
func (*MemoryBackend) ClearRetained ¶
func (m *MemoryBackend) ClearRetained(client *Client, topic string) error
ClearRetained will remove the stored messages for the given topic.
func (*MemoryBackend) Publish ¶
func (m *MemoryBackend) Publish(client *Client, msg *packet.Message) error
Publish will forward the passed message to all other subscribed clients. It will also add the message to all sessions that have a matching offline subscription.
func (*MemoryBackend) QueueOffline ¶
func (m *MemoryBackend) QueueOffline(client *Client) error
QueueOffline will begin forwarding all missed messages in a separate goroutine.
func (*MemoryBackend) QueueRetained ¶
func (m *MemoryBackend) QueueRetained(client *Client, topic string) error
QueueRetained will queue all retained messages matching the given topic.
func (*MemoryBackend) Setup ¶
Setup returns the already stored session for the supplied id or creates and returns a new one. If the supplied id has a zero length, a new session is returned that is not stored further. Furthermore, it will disconnect any client connected with the same client id.
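The lookup-or-create behavior described for Setup can be sketched with a mutex-protected map. The types `session` and `sessionStore` are hypothetical stand-ins, and the meaning of the boolean result is an assumption: this documentation does not say what the bool returned by Setup signifies, so the sketch uses it to report whether a stored session was resumed. Disconnecting an existing client with the same id is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical stand-in for a stored Session.
type session struct {
	id string
}

// sessionStore keeps sessions by client id.
type sessionStore struct {
	mu       sync.Mutex
	sessions map[string]*session
}

func newSessionStore() *sessionStore {
	return &sessionStore{sessions: make(map[string]*session)}
}

// setup returns the stored session for id or creates one. The second result
// reports whether an existing session was resumed (assumed meaning).
func (s *sessionStore) setup(id string) (*session, bool) {
	// A zero-length id yields a temporary session that is never stored.
	if id == "" {
		return &session{}, false
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if sess, ok := s.sessions[id]; ok {
		return sess, true
	}

	sess := &session{id: id}
	s.sessions[id] = sess
	return sess, false
}

func main() {
	store := newSessionStore()

	_, resumed := store.setup("client-1")
	fmt.Println("first connect resumed:", resumed)

	_, resumed = store.setup("client-1")
	fmt.Println("second connect resumed:", resumed)

	_, resumed = store.setup("")
	fmt.Println("anonymous resumed:", resumed, "stored sessions:", len(store.sessions))
}
```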
func (*MemoryBackend) StoreRetained ¶
func (m *MemoryBackend) StoreRetained(client *Client, msg *packet.Message) error
StoreRetained will store the specified message.
func (*MemoryBackend) Subscribe ¶
func (m *MemoryBackend) Subscribe(client *Client, sub *packet.Subscription) error
Subscribe will subscribe the passed client to the specified topic and begin to forward messages by calling the client's Publish method.
func (*MemoryBackend) Terminate ¶
func (m *MemoryBackend) Terminate(client *Client) error
Terminate will unsubscribe the passed client from all previously subscribed topics. If the client connected with clean=true it will also clean the session. Otherwise it will create offline subscriptions for all QOS 1 and QOS 2 subscriptions.
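The selection rule in the last two sentences can be sketched as a small filter. The `subscription` type and the function `offlineSubscriptions` are hypothetical; the real backend works with `*packet.Subscription` and its own internal state.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for packet.Subscription.
type subscription struct {
	Topic string
	QOS   byte
}

// offlineSubscriptions returns the subscriptions that should keep collecting
// messages while the client is offline: none for a clean session, otherwise
// only those with QOS 1 or QOS 2.
func offlineSubscriptions(clean bool, subs []subscription) []subscription {
	if clean {
		return nil
	}

	var offline []subscription
	for _, s := range subs {
		if s.QOS >= 1 {
			offline = append(offline, s)
		}
	}
	return offline
}

func main() {
	subs := []subscription{{"status", 0}, {"alerts", 1}, {"orders", 2}}

	fmt.Println("persistent session:", offlineSubscriptions(false, subs))
	fmt.Println("clean session:", offlineSubscriptions(true, subs))
}
```

QOS 0 subscriptions are skipped because QOS 0 messages carry no delivery guarantee, so there is nothing to queue for a disconnected client.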
func (*MemoryBackend) Unsubscribe ¶
func (m *MemoryBackend) Unsubscribe(client *Client, topic string) error
Unsubscribe will unsubscribe the passed client from the specified topic.
type MessageQueue ¶ added in v0.5.0
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue is a basic FIFO queue for messages.
func NewMessageQueue ¶ added in v0.5.0
func NewMessageQueue(size int) *MessageQueue
NewMessageQueue returns a new MessageQueue. If size is greater than zero the queue will not grow beyond the defined size.
func (*MessageQueue) Len ¶ added in v0.5.0
func (q *MessageQueue) Len() int
Len returns the length of the queue.
func (*MessageQueue) Pop ¶ added in v0.5.0
func (q *MessageQueue) Pop() *packet.Message
Pop removes and returns the oldest message in the queue (first in, first out).
func (*MessageQueue) Push ¶ added in v0.5.0
func (q *MessageQueue) Push(msg *packet.Message)
Push adds a message to the queue.
func (*MessageQueue) Range ¶ added in v0.5.0
func (q *MessageQueue) Range(fn func(*packet.Message) bool)
Range will call fn with each message in the queue. If fn returns false the operation is stopped immediately.
func (*MessageQueue) Reset ¶ added in v0.5.0
func (q *MessageQueue) Reset()
Reset removes all messages from the queue.
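To make the queue semantics concrete, here is a slice-backed sketch of a bounded FIFO with the same method set. The types `message` and `fifo` are hypothetical stand-ins. Two behaviors are assumptions not confirmed by this documentation: a full queue drops its oldest message on Push, and Pop returns nil when the queue is empty. The sketch is also not safe for concurrent use.

```go
package main

import "fmt"

// message is a hypothetical stand-in for packet.Message.
type message struct {
	Topic   string
	Payload []byte
}

// fifo is a slice-backed FIFO queue; size <= 0 means unbounded.
type fifo struct {
	size  int
	items []*message
}

func newFIFO(size int) *fifo {
	return &fifo{size: size}
}

// Push appends msg. Assumption: a full bounded queue drops its oldest entry.
func (q *fifo) Push(msg *message) {
	if q.size > 0 && len(q.items) >= q.size {
		q.items = q.items[1:]
	}
	q.items = append(q.items, msg)
}

// Pop removes and returns the oldest message, or nil if the queue is empty.
func (q *fifo) Pop() *message {
	if len(q.items) == 0 {
		return nil
	}
	msg := q.items[0]
	q.items = q.items[1:]
	return msg
}

// Len returns the number of queued messages.
func (q *fifo) Len() int {
	return len(q.items)
}

// Range calls fn for each message, oldest first, until fn returns false.
func (q *fifo) Range(fn func(*message) bool) {
	for _, msg := range q.items {
		if !fn(msg) {
			return
		}
	}
}

// Reset removes all messages.
func (q *fifo) Reset() {
	q.items = nil
}

func main() {
	q := newFIFO(2)
	for _, topic := range []string{"a", "b", "c"} {
		q.Push(&message{Topic: topic})
	}

	fmt.Println("length after three pushes:", q.Len())
	q.Range(func(m *message) bool {
		fmt.Println("queued:", m.Topic)
		return true
	})
}
```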
type Session ¶
type Session interface {
	// NextID should return the next id for outgoing packets.
	NextID() packet.ID

	// SavePacket should store a packet in the session. Any existing packet
	// with the same id should be quietly overwritten.
	SavePacket(session.Direction, packet.GenericPacket) error

	// LookupPacket should retrieve a packet from the session using the packet id.
	LookupPacket(session.Direction, packet.ID) (packet.GenericPacket, error)

	// DeletePacket should remove a packet from the session. The method should
	// not return an error if no packet with the specified id exists.
	DeletePacket(session.Direction, packet.ID) error

	// AllPackets should return all packets currently saved in the session. This
	// method is used to resend stored packets when the session is resumed.
	AllPackets(session.Direction) ([]packet.GenericPacket, error)

	// SaveSubscription should store the subscription in the session. Any
	// existing subscription with the same topic should be quietly overwritten.
	SaveSubscription(*packet.Subscription) error

	// LookupSubscription should match a topic against the stored subscriptions
	// and return the first subscription found, if any.
	LookupSubscription(topic string) (*packet.Subscription, error)

	// DeleteSubscription should remove the subscription from the session. The
	// method should not return an error if no subscription with the specified
	// topic exists.
	DeleteSubscription(topic string) error

	// AllSubscriptions should return all subscriptions currently saved in the
	// session. This method is used to restore a client's subscriptions when the
	// session is resumed.
	AllSubscriptions() ([]*packet.Subscription, error)

	// SaveWill should store the will message.
	SaveWill(*packet.Message) error

	// LookupWill should retrieve the will message.
	LookupWill() (*packet.Message, error)

	// ClearWill should remove the will message from the store.
	ClearWill() error

	// Reset should completely reset the session.
	Reset() error
}
A Session is used to persist incoming/outgoing packets, subscriptions and the will.
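A custom Session has to produce packet ids through NextID. In MQTT a packet identifier is a non-zero 16-bit value, so a counter must skip zero when it wraps around. The sketch below shows one way to do that. The type `idCounter` is hypothetical, it uses `uint16` on the assumption that `packet.ID` is a 16-bit integer, and it does not check whether an id is still in use by an unacknowledged packet.

```go
package main

import (
	"fmt"
	"sync"
)

// idCounter hands out MQTT packet identifiers in the range 1..65535.
// Hypothetical helper; uint16 stands in for packet.ID (assumed 16-bit).
type idCounter struct {
	mu   sync.Mutex
	last uint16
}

// NextID returns the next identifier, wrapping from 65535 back to 1 because
// zero is not a valid packet identifier.
func (c *idCounter) NextID() uint16 {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.last++
	if c.last == 0 {
		c.last = 1
	}
	return c.last
}

func main() {
	c := &idCounter{}
	fmt.Println(c.NextID(), c.NextID(), c.NextID())

	// Jump close to the upper bound to show the wrap-around.
	c.last = 65534
	fmt.Println(c.NextID(), c.NextID())
}
```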