broker

package
v0.0.0-...-2b98a61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2014 License: EPL-1.0 Imports: 15 Imported by: 1

Documentation

Index

Constants

View Source
const (
	CONNECT     = byte(1)
	CONNACK     = byte(2)
	PUBLISH     = byte(3)
	PUBACK      = byte(4)
	PUBREC      = byte(5)
	PUBREL      = byte(6)
	PUBCOMP     = byte(7)
	SUBSCRIBE   = byte(8)
	SUBACK      = byte(9)
	UNSUBSCRIBE = byte(10)
	UNSUBACK    = byte(11)
	PINGREQ     = byte(12)
	PINGRESP    = byte(13)
	DISCONNECT  = byte(14)
)
View Source
const (
	CONN_ACCEPTED           = 0x00
	CONN_REF_BAD_PROTO_VER  = 0x01
	CONN_REF_ID_REJ         = 0x02
	CONN_REF_SERV_UNAVAIL   = 0x03
	CONN_REF_BAD_USER_PASS  = 0x04
	CONN_REF_NOT_AUTH       = 0x05
	CONN_PROTOCOL_VIOLATION = 0xFF
)

Variables

View Source
var (
	INFO     *log.Logger
	PROTOCOL *log.Logger
	ERROR    *log.Logger
	DEBUG    *log.Logger
)

loggers

Functions

func NewMsgQueue

func NewMsgQueue(limit int) *msgQueue

Types

type Client

type Client struct {
	sync.WaitGroup
	MessageIds
	// contains filtered or unexported fields
}

func NewClient

func NewClient(conn net.Conn, bufferedConn *bufio.ReadWriter, clientId string, maxQDepth int) *Client

func (*Client) AddSubscription

func (c *Client) AddSubscription(topics []string, qoss []byte) []byte

Add a subscription for a client, taking an array of topics to subscribe to and an associated slice of QoS values for the topics, return a slice of byte values indicating the granted QoS values in topics order.

func (*Client) Connected

func (c *Client) Connected() bool

func (*Client) HandleFlow

func (c *Client) HandleFlow(msg ControlPacket, hrotti *Hrotti)

func (*Client) KeepAliveTimer

func (c *Client) KeepAliveTimer(hrotti *Hrotti)

func (*Client) Receive

func (c *Client) Receive(hrotti *Hrotti)

func (*Client) RemoveSubscription

func (c *Client) RemoveSubscription(topic string) bool

func (*Client) ResetTimer

func (c *Client) ResetTimer()

func (*Client) Send

func (c *Client) Send(hrotti *Hrotti)

func (*Client) SetRootNode

func (c *Client) SetRootNode(node *Node)

func (*Client) Start

func (c *Client) Start(cp *connectPacket, hrotti *Hrotti)

func (*Client) Stop

func (c *Client) Stop(sendWill bool, hrotti *Hrotti)

func (*Client) StopForTakeover

func (c *Client) StopForTakeover()

type Clients

type Clients struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

A map of clientid to Client pointer and a RW Mutex to protect access.

func NewClients

func NewClients() Clients

Return empty Clients (value type)

type ControlPacket

type ControlPacket interface {
	Pack() []byte
	Unpack([]byte)
	Type() uint8
	SetMsgId(msgId)
	MsgId() msgId
	String() string
	QoS() byte
}

func New

func New(packetType byte) ControlPacket

type Entry

type Entry struct {
	Client *Client
	Qos    byte
}

type FixedHeader

type FixedHeader struct {
	MessageType byte
	Dup         byte
	Qos         byte
	Retain      byte
	// contains filtered or unexported fields
}

func (FixedHeader) String

func (fh FixedHeader) String() string

type Hrotti

type Hrotti struct {
	// contains filtered or unexported fields
}

func NewHrotti

func NewHrotti(maxQueueDepth int) *Hrotti

func (*Hrotti) AddListener

func (h *Hrotti) AddListener(name string, config *ListenerConfig) error

func (*Hrotti) InitClient

func (h *Hrotti) InitClient(conn net.Conn)

func (*Hrotti) Stop

func (h *Hrotti) Stop()

type ListenerConfig

type ListenerConfig struct {
	URL *url.URL `json:"url"`
	// contains filtered or unexported fields
}

func NewListenerConfig

func NewListenerConfig(rawURL string) *ListenerConfig

type MemoryPersistence

type MemoryPersistence struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

the MemoryPersistence struct is a map of Client pointers to pointers to a Persistence Entry. So each Client has its own map of msgIds/packets.

func NewMemoryPersistence

func NewMemoryPersistence() *MemoryPersistence

func (*MemoryPersistence) Add

func (p *MemoryPersistence) Add(client *Client, message ControlPacket) bool

func (*MemoryPersistence) AddBatch

func (p *MemoryPersistence) AddBatch(batch map[*Client]*publishPacket)

func (*MemoryPersistence) Close

func (p *MemoryPersistence) Close(client *Client)

func (*MemoryPersistence) Delete

func (p *MemoryPersistence) Delete(client *Client, id msgId) bool

func (*MemoryPersistence) Exists

func (p *MemoryPersistence) Exists(client *Client) bool

func (*MemoryPersistence) GetAll

func (p *MemoryPersistence) GetAll(client *Client) (messages []ControlPacket)

func (*MemoryPersistence) Open

func (p *MemoryPersistence) Open(client *Client)

func (*MemoryPersistence) Replace

func (p *MemoryPersistence) Replace(client *Client, message ControlPacket) bool

type MemoryPersistenceEntry

type MemoryPersistenceEntry struct {
	sync.Mutex
	// contains filtered or unexported fields
}

a persistence entry is a map of msgIds and ControlPackets

type MessageIds

type MessageIds struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type Node

type Node struct {
	sync.RWMutex
	Name     string
	HashSub  map[*Client]byte
	Sub      map[*Client]byte
	Nodes    map[string]*Node
	Retained *publishPacket
}

func NewNode

func NewNode(name string) *Node

function to create a new Node with the name; name.

func (*Node) AddSub

func (n *Node) AddSub(client *Client, subscription []string, qos byte, complete chan byte)

func (*Node) DeleteSub

func (n *Node) DeleteSub(client *Client, subscription []string, complete chan bool)

func (*Node) DeleteSubAll

func (n *Node) DeleteSubAll(client *Client)

func (*Node) DeliverMessage

func (n *Node) DeliverMessage(topic []string, message *publishPacket, hrotti *Hrotti)

func (*Node) FindRecipients

func (n *Node) FindRecipients(topic []string, recipients chan *Entry, wg *sync.WaitGroup)

func (*Node) FindRetainedForPlus

func (n *Node) FindRetainedForPlus(client *Client, subscription []string)

func (*Node) Print

func (n *Node) Print(prefix string) string

func (*Node) SendRetainedRecursive

func (n *Node) SendRetainedRecursive(client *Client)

func (*Node) SetRetained

func (n *Node) SetRetained(topic []string, message *publishPacket)

type Persistence

type Persistence interface {
	Open(*Client)
	Close(*Client)
	Add(*Client, ControlPacket) bool
	Replace(*Client, ControlPacket) bool
	AddBatch(map[*Client]*publishPacket)
	Delete(*Client, msgId) bool
	GetAll(*Client) []ControlPacket
	Exists(*Client) bool
}

type PersistenceBatchEntry

type PersistenceBatchEntry struct {
	// contains filtered or unexported fields
}

type State

type State struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*State) SetValue

func (s *State) SetValue(value StateVal)

func (*State) Value

func (s *State) Value() StateVal

type StateVal

type StateVal uint8
const (
	DISCONNECTED  StateVal = 0x00
	CONNECTING    StateVal = 0x01
	CONNECTED     StateVal = 0x02
	DISCONNECTING StateVal = 0x03
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL