broker

package
v0.0.0-...-231ab7b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2020 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidTopicName = errors.New("invalid topic name/filter")

ErrInvalidTopicName is returned whenever a topic name or filter is invalid

Functions

func ParseTopicName

func ParseTopicName(b []byte) ([]string, error)

ParseTopicName parses a given bytes slice into a slice of strings each representing a topic level. For use mainly with Publish packets which should not contain wildcards.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker encapsulates all the functionality of a MQTT broker plus rules. It also holds shared resources such as topics or client IDs that've been issued

func NewBroker

func NewBroker() *Broker

NewBroker returns a fresh instance of a Broker

func (*Broker) Close

func (b *Broker) Close()

Close is an implementation of the server's ConnHandler.Close. It's expected that the server instance will invoke Close when it too is closed however, Close is safe to call multiple times. Once closed, the broker will not accept any more connections. It's expected that each underlying client session will also gracefully shut down

func (*Broker) OnConn

func (b *Broker) OnConn(conn net.Conn)

OnConn is an implementation of the server's ConnHandler.OnConn Should be called whenever there's a new connection. If the connection is valid, it is elevated into a ClientSession. If it's invalid or an error occurs such as a protocol violation, the connection is closed

type Feed

type Feed struct {
	// contains filtered or unexported fields
}

Feed ...

func NewFeed

func NewFeed(topic string) *Feed

NewFeed ...

func (*Feed) Publish

func (f *Feed) Publish(ctx context.Context, rawPkt *p.PublishPacket) (nSent int)

Publish ...

func (*Feed) Subscribe

func (f *Feed) Subscribe(ch chan<- PublishEvent) *Subscription

Subscribe ...

type MatchType

type MatchType byte

MatchType indicates whether a match is an exact match(string), single-level or multi-level

const (
	ExactMatch MatchType = iota
	SingleLevelMatch
	MultiLevelMatch
)

ExactMatch etc, see MatchType documentation entry

type PublishEvent

type PublishEvent struct {
	Topic  string
	RawPkt *p.PublishPacket
}

PublishEvent holds a publish event. Note that the topic might not the topic in the raw packet particularly in the case where the client subscribed using wildcard(s).

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription ...

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()

Unsubscribe ...

type TopicMap

type TopicMap struct {
	// contains filtered or unexported fields
}

TopicMap holds a specialized map of topics to feeds through which subscribers can receive publish events. Given a topic T, eg during a publish event, it efficiently finds all topics (both exact and thos with wildcards) that match the given topic and returns the subsequent Feeds. If the topic T has l levels, then finding matching topics is O(l) regardless of the entire length of the topic name or number of matching topics This is under the assumption that hashing the string of a single level of constant time. TopicMap is concurrency safe, ie can be accessed safely from multiple concurrent goroutines.

func NewTopicMap

func NewTopicMap() TopicMap

NewTopicMap returns an instance of a topic map for holding topics with wildcards

func (TopicMap) GetFeedsThatMatchTopic

func (m TopicMap) GetFeedsThatMatchTopic(topicTokens []TopicToken) []*Feed

GetFeedsThatMatchTopic The given topic should be an exact topic match, ie, it should not have any wildcards. For use mainly when a publish packet arrives and one needs to check whether a subscriber qualifies to receive it on the basis of a topic they supplied earlier which contained wildcards. topicTokens should consist of valid tokens of a parsed topic and should not contain any wildcards since this function is meant to be used when one is publishing a publish packet. It is expected that the caller uses the helper function ParseTopicName to check for possible errors and retrieve valid tokens

func (TopicMap) InitFeedByTopic

func (m TopicMap) InitFeedByTopic(topic string, tokens []TopicToken) (*Feed, bool)

InitFeedByTopic ensures that the feed for a given topic is instantiated. Note that a given topic should have at least 1 token as per the MQTT requirements. The bool returned indicates whether the feed was already present (true) or it's just been instantiated (false). Mainly there for debugging/testing.For simplicity, one should use the ParseTopic helper to parse a given topic name, check for errors then retrieve the appropriate arguments to pass to the function

func (TopicMap) RemoveFeedByTopic

func (m TopicMap) RemoveFeedByTopic(topic string, tokens []TopicToken) *Feed

RemoveFeedByTopic removes a given feed. It's better to keep feeds in place rather than remove them frequently. However, the function is provided for situations whereby there are multiple feeds but each is sparsely and infrequently used. A Nil feed is returned if the feed was not present to begin with. For simplicity, one should use the ParseTopic helper to parse a given topic name, check for errors then retrieve the appropriate arguments to pass to the function

func (TopicMap) TraverseAll

func (m TopicMap) TraverseAll(fn func(int, *node))

TraverseAll .For debugging mostly

type TopicToken

type TopicToken struct {
	Value     string
	MatchType MatchType
}

TopicToken holds a match for a topic filter plust whether it's a plain string or a wildcard

func ParseTopic

func ParseTopic(b []byte) (tokens []TopicToken, hasWildcard bool, err error)

ParseTopic parses a given bytes slice into a slice of topic filters each representing a topic level. For use mainly with Subscribe/Unsubscribe packets which might contain wildcards.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL