Documentation ¶
Overview ¶
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Copyright 2014, Shuhei Tanuma. All rights reserved. Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Index ¶
- Constants
- Variables
- type Application
- type Authenticator
- type DisconnectError
- type DummyPlug
- func (self *DummyPlug) AppendSubscribedTopic(string, *SubscribeSet)
- func (self *DummyPlug) Close() error
- func (self *DummyPlug) DisableCleanSession()
- func (self *DummyPlug) GetId() string
- func (self *DummyPlug) GetOutGoingTable() *util.MessageTable
- func (self *DummyPlug) GetRealId() string
- func (self *DummyPlug) GetState() State
- func (self *DummyPlug) GetSubscribedTopics() map[string]*SubscribeSet
- func (self *DummyPlug) GetWillMessage() *mqtt.WillMessage
- func (self *DummyPlug) HasWillMessage() bool
- func (self *DummyPlug) IsAlived() bool
- func (self *DummyPlug) IsBridge() bool
- func (self *DummyPlug) ReadMessage() (mqtt.Message, error)
- func (self *DummyPlug) RemoveSubscribedTopic(string)
- func (self *DummyPlug) ResetState()
- func (self *DummyPlug) Run()
- func (self *DummyPlug) SetId(id string)
- func (self *DummyPlug) SetKeepaliveInterval(int)
- func (self *DummyPlug) SetState(State)
- func (self *DummyPlug) SetWillMessage(mqtt.WillMessage)
- func (self *DummyPlug) ShouldCleanSession() bool
- func (self *DummyPlug) Stop()
- func (self *DummyPlug) WriteMessageQueue(request mqtt.Message)
- func (self *DummyPlug) WriteMessageQueue2(msg []byte)
- type EmptyAuthenticator
- type Handler
- func (self *Handler) Close()
- func (self *Handler) Disconnect()
- func (self *Handler) HandshakeInternal(p *codec.ConnectMessage)
- func (self *Handler) Parsed()
- func (self *Handler) Pingreq()
- func (self *Handler) Puback(messageId uint16)
- func (self *Handler) Pubcomp(messageId uint16)
- func (self *Handler) Publish(p *codec.PublishMessage)
- func (self *Handler) Pubrec(messageId uint16)
- func (self *Handler) Pubrel(messageId uint16)
- func (self *Handler) Subscribe(p *codec.SubscribeMessage)
- func (self *Handler) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload)
- type HttpListener
- type HttpServer
- type Listener
- type MmuxConnection
- func (self *MmuxConnection) AppendSubscribedTopic(topic string, set *SubscribeSet)
- func (self *MmuxConnection) Attach(conn Connection)
- func (self *MmuxConnection) Close() error
- func (self *MmuxConnection) Detach(conn Connection, dummy *DummyPlug)
- func (self *MmuxConnection) DisableCleanSession()
- func (self *MmuxConnection) GetHash() uint32
- func (self *MmuxConnection) GetId() string
- func (self *MmuxConnection) GetOutGoingTable() *util.MessageTable
- func (self *MmuxConnection) GetRealId() string
- func (self *MmuxConnection) GetState() State
- func (self *MmuxConnection) GetSubscribedTopics() map[string]*SubscribeSet
- func (self *MmuxConnection) GetWillMessage() *mqtt.WillMessage
- func (self *MmuxConnection) HasWillMessage() bool
- func (self *MmuxConnection) IsAlived() bool
- func (self *MmuxConnection) IsBridge() bool
- func (self *MmuxConnection) IsSubscribed(topic string) bool
- func (self *MmuxConnection) ReadMessage() (mqtt.Message, error)
- func (self *MmuxConnection) RemoveSubscribedTopic(topic string)
- func (self *MmuxConnection) ResetState()
- func (self *MmuxConnection) SetId(id string)
- func (self *MmuxConnection) SetKeepaliveInterval(interval int)
- func (self *MmuxConnection) SetState(state State)
- func (self *MmuxConnection) SetWillMessage(msg mqtt.WillMessage)
- func (self *MmuxConnection) ShouldCleanSession() bool
- func (self *MmuxConnection) WriteMessageQueue(request mqtt.Message)
- type Momonga
- func (self *Momonga) Authenticate(user_id, password []byte) (bool, error)
- func (self *Momonga) CleanSubscription(conn Connection)
- func (self *Momonga) Config() *configuration.Config
- func (self *Momonga) DisableSys()
- func (self *Momonga) Doom()
- func (self *Momonga) GetConnectionByClientId(clientId string) (*MmuxConnection, error)
- func (self *Momonga) HandleConnection(conn Connection)
- func (self *Momonga) Handshake(p *codec.ConnectMessage, conn *MyConnection) *MmuxConnection
- func (self *Momonga) RemoveConnectionByClientId(clientId string)
- func (self *Momonga) RetainMatch(topic string) []*codec.PublishMessage
- func (self *Momonga) Run()
- func (self *Momonga) RunMaintenanceThread()
- func (self *Momonga) SendMessage(topic string, message []byte, qos int)
- func (self *Momonga) SendPublishMessage(msg *codec.PublishMessage, client_id string, is_bridged bool)
- func (self *Momonga) SendWillMessage(conn Connection)
- func (self *Momonga) SetConnectionByClientId(clientId string, conn *MmuxConnection)
- func (self *Momonga) Subscribe(p *codec.SubscribeMessage, conn Connection)
- func (self *Momonga) Terminate()
- func (self *Momonga) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload, ...)
- type MyBroker
- type MyClients
- type MyConn
- type MyHttpServer
- type MyListener
- type MyLoad
- type MyMessages
- type MyMetrics
- type MySystem
- type Retryable
- type Server
- type TcpServer
- type TlsServer
- type TopicMatcher
- type UnixServer
Constants ¶
const KILOBYTE = 1024
const MAX_REQUEST_SIZE = MEGABYTE * 2
const MEGABYTE = 1024 * KILOBYTE
Variables ¶
var ( V311_MAGIC = []byte("MQTT") V311_VERSION = uint8(4) V3_MAGIC = []byte("MQIsdp") V3_VERSION = uint8(3) )
Functions ¶
This section is empty.
Types ¶
type Application ¶
type Application struct { Engine *Momonga Servers []Server // contains filtered or unexported fields }
Application manages start / stop, singnal handling and listeners.
+-----------+ |APPLICATION| start / stop, signal handling +-----------+ | LISTENER | listen and accept +-----------+ | HANDLER | parse and execute commands +-----------+ | ENGINE | implements commands api +-----------+
func NewApplication ¶
func NewApplication(configPath string) *Application
func (*Application) Loop ¶
func (self *Application) Loop()
func (*Application) RegisterServer ¶
func (self *Application) RegisterServer(svr Server)
func (*Application) Start ¶
func (self *Application) Start()
func (*Application) Stop ¶
func (self *Application) Stop()
type Authenticator ¶
type Authenticator interface { Init(config *configuration.Config) Authenticate(user_id, password []byte) (bool, error) Shutdown() }
type DisconnectError ¶
type DisconnectError struct { }
func (*DisconnectError) Error ¶
func (e *DisconnectError) Error() string
type DummyPlug ¶
type DummyPlug struct { Identity string Switch chan bool Running bool // contains filtered or unexported fields }
func NewDummyPlug ¶
func (*DummyPlug) AppendSubscribedTopic ¶
func (*DummyPlug) DisableCleanSession ¶
func (self *DummyPlug) DisableCleanSession()
func (*DummyPlug) GetOutGoingTable ¶
func (self *DummyPlug) GetOutGoingTable() *util.MessageTable
func (*DummyPlug) GetSubscribedTopics ¶
func (*DummyPlug) GetWillMessage ¶
func (self *DummyPlug) GetWillMessage() *mqtt.WillMessage
func (*DummyPlug) HasWillMessage ¶
func (*DummyPlug) RemoveSubscribedTopic ¶
func (*DummyPlug) ResetState ¶
func (self *DummyPlug) ResetState()
func (*DummyPlug) SetKeepaliveInterval ¶
func (*DummyPlug) SetWillMessage ¶
func (self *DummyPlug) SetWillMessage(mqtt.WillMessage)
func (*DummyPlug) ShouldCleanSession ¶
func (*DummyPlug) WriteMessageQueue ¶
func (*DummyPlug) WriteMessageQueue2 ¶
type EmptyAuthenticator ¶
type EmptyAuthenticator struct { }
EmptyAuthenticator allows anything.
func (*EmptyAuthenticator) Authenticate ¶
func (self *EmptyAuthenticator) Authenticate(user_id, password []byte) (bool, error)
func (*EmptyAuthenticator) Init ¶
func (self *EmptyAuthenticator) Init(config *configuration.Config)
func (*EmptyAuthenticator) Shutdown ¶
func (self *EmptyAuthenticator) Shutdown()
type Handler ¶
type Handler struct { Engine *Momonga Connection Connection }
Handler dispatches messages which sent by client. this struct will be use client library soon.
とかいいつつ、ackとかはhandlerで返してねーとか立ち位置分かりづらい Engine側でMQTTの基本機能を全部やれればいいんだけど、そうすると client library別にしないと無理なんだよなー。 目指すところとしては、基本部分はデフォルトのHandlerで動くから それで動かないところだけうわがいてね!って所。 Handler自体は受け渡ししかやらんのでlockしなくて大丈夫なはず
func NewHandler ¶
func (*Handler) Disconnect ¶
func (self *Handler) Disconnect()
func (*Handler) HandshakeInternal ¶
func (self *Handler) HandshakeInternal(p *codec.ConnectMessage)
func (*Handler) Publish ¶
func (self *Handler) Publish(p *codec.PublishMessage)
func (*Handler) Subscribe ¶
func (self *Handler) Subscribe(p *codec.SubscribeMessage)
func (*Handler) Unsubscribe ¶
func (self *Handler) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload)
type HttpListener ¶
type HttpListener struct { net.Listener WebSocketMount string // contains filtered or unexported fields }
func NewHttpListener ¶
func NewHttpListener(listener net.Listener) *HttpListener
func (*HttpListener) Addr ¶
func (self *HttpListener) Addr() net.Addr
func (*HttpListener) Close ¶
func (self *HttpListener) Close() error
type HttpServer ¶
type HttpServer struct { http.Server Engine *Momonga Address string // contains filtered or unexported fields }
func NewHttpServer ¶
func NewHttpServer(engine *Momonga, config *configuration.Config, inherit bool) *HttpServer
func (*HttpServer) Graceful ¶
func (self *HttpServer) Graceful()
func (*HttpServer) ListenAndServe ¶
func (self *HttpServer) ListenAndServe() error
func (*HttpServer) Listener ¶
func (self *HttpServer) Listener() Listener
func (*HttpServer) Stop ¶
func (self *HttpServer) Stop()
type MmuxConnection ¶
type MmuxConnection struct { // Primary Connection *Connection OfflineQueue []mqtt.Message MaxOfflineQueue int Identifier string CleanSession bool OutGoingTable *util.MessageTable SubscribeMap map[string]bool Created time.Time Hash uint32 Mutex sync.RWMutex SubscribedTopics map[string]*SubscribeSet }
MQTT Multiplexer Connection
Multiplexer、というかなんだろ。EngineとConnectionとの仲介でおいとくやつ。 Sessionがあるのでこういうふうにしとくと楽かな、と
func NewMmuxConnection ¶
func NewMmuxConnection() *MmuxConnection
func (*MmuxConnection) AppendSubscribedTopic ¶
func (self *MmuxConnection) AppendSubscribedTopic(topic string, set *SubscribeSet)
func (*MmuxConnection) Attach ¶
func (self *MmuxConnection) Attach(conn Connection)
func (*MmuxConnection) Close ¶
func (self *MmuxConnection) Close() error
func (*MmuxConnection) Detach ¶
func (self *MmuxConnection) Detach(conn Connection, dummy *DummyPlug)
func (*MmuxConnection) DisableCleanSession ¶
func (self *MmuxConnection) DisableCleanSession()
func (*MmuxConnection) GetHash ¶
func (self *MmuxConnection) GetHash() uint32
func (*MmuxConnection) GetId ¶
func (self *MmuxConnection) GetId() string
func (*MmuxConnection) GetOutGoingTable ¶
func (self *MmuxConnection) GetOutGoingTable() *util.MessageTable
func (*MmuxConnection) GetRealId ¶
func (self *MmuxConnection) GetRealId() string
func (*MmuxConnection) GetState ¶
func (self *MmuxConnection) GetState() State
func (*MmuxConnection) GetSubscribedTopics ¶
func (self *MmuxConnection) GetSubscribedTopics() map[string]*SubscribeSet
func (*MmuxConnection) GetWillMessage ¶
func (self *MmuxConnection) GetWillMessage() *mqtt.WillMessage
func (*MmuxConnection) HasWillMessage ¶
func (self *MmuxConnection) HasWillMessage() bool
func (*MmuxConnection) IsAlived ¶
func (self *MmuxConnection) IsAlived() bool
func (*MmuxConnection) IsBridge ¶
func (self *MmuxConnection) IsBridge() bool
func (*MmuxConnection) IsSubscribed ¶
func (self *MmuxConnection) IsSubscribed(topic string) bool
func (*MmuxConnection) ReadMessage ¶
func (self *MmuxConnection) ReadMessage() (mqtt.Message, error)
func (*MmuxConnection) RemoveSubscribedTopic ¶
func (self *MmuxConnection) RemoveSubscribedTopic(topic string)
func (*MmuxConnection) ResetState ¶
func (self *MmuxConnection) ResetState()
func (*MmuxConnection) SetId ¶
func (self *MmuxConnection) SetId(id string)
func (*MmuxConnection) SetKeepaliveInterval ¶
func (self *MmuxConnection) SetKeepaliveInterval(interval int)
func (*MmuxConnection) SetState ¶
func (self *MmuxConnection) SetState(state State)
func (*MmuxConnection) SetWillMessage ¶
func (self *MmuxConnection) SetWillMessage(msg mqtt.WillMessage)
func (*MmuxConnection) ShouldCleanSession ¶
func (self *MmuxConnection) ShouldCleanSession() bool
func (*MmuxConnection) WriteMessageQueue ¶
func (self *MmuxConnection) WriteMessageQueue(request mqtt.Message)
type Momonga ¶
type Momonga struct { OutGoingTable *util.MessageTable InflightTable map[string]*util.MessageTable TopicMatcher TopicMatcher // TODO: improve this. Connections map[uint32]map[string]*MmuxConnection RetryMap map[string][]*Retryable EnableSys bool Started time.Time DataStore datastore.Datastore LockPool map[uint32]*sync.RWMutex Authenticators []Authenticator // contains filtered or unexported fields }
goroutine (2)
RunMaintenanceThread Run
func NewMomonga ¶
func NewMomonga(config *configuration.Config) *Momonga
QoS 1, 2 are available. but really suck implementation. reconsider qos design later.
func (*Momonga) Authenticate ¶
func (*Momonga) CleanSubscription ¶
func (self *Momonga) CleanSubscription(conn Connection)
func (*Momonga) Config ¶
func (self *Momonga) Config() *configuration.Config
func (*Momonga) DisableSys ¶
func (self *Momonga) DisableSys()
func (*Momonga) GetConnectionByClientId ¶
func (self *Momonga) GetConnectionByClientId(clientId string) (*MmuxConnection, error)
func (*Momonga) HandleConnection ¶
func (self *Momonga) HandleConnection(conn Connection)
func (*Momonga) Handshake ¶
func (self *Momonga) Handshake(p *codec.ConnectMessage, conn *MyConnection) *MmuxConnection
func (*Momonga) RemoveConnectionByClientId ¶
func (*Momonga) RetainMatch ¶
func (self *Momonga) RetainMatch(topic string) []*codec.PublishMessage
TODO: wanna implement trie. but regexp works well. retain should persist their data. though, how do we fetch it efficiently...
func (*Momonga) RunMaintenanceThread ¶
func (self *Momonga) RunMaintenanceThread()
below methods are intend to maintain engine itself (remove needless connection, dispatch queue).
func (*Momonga) SendMessage ¶
func (*Momonga) SendPublishMessage ¶
func (self *Momonga) SendPublishMessage(msg *codec.PublishMessage, client_id string, is_bridged bool)
func (*Momonga) SendWillMessage ¶
func (self *Momonga) SendWillMessage(conn Connection)
func (*Momonga) SetConnectionByClientId ¶
func (self *Momonga) SetConnectionByClientId(clientId string, conn *MmuxConnection)
func (*Momonga) Subscribe ¶
func (self *Momonga) Subscribe(p *codec.SubscribeMessage, conn Connection)
func (*Momonga) Unsubscribe ¶
func (self *Momonga) Unsubscribe(messageId uint16, granted int, payloads []codec.SubscribePayload, conn Connection)
type MyHttpServer ¶
func (*MyHttpServer) ServeHTTP ¶
func (self *MyHttpServer) ServeHTTP(w http.ResponseWriter, req *http.Request)
type MyListener ¶
type MyMessages ¶
type MyMetrics ¶
type MyMetrics struct { System MySystem NumGoroutine *expvar.Int NumCgoCall *expvar.Int Uptime *expvar.Int MemFree *expvar.Int MemUsed *expvar.Int MemActualFree *expvar.Int MemActualUsed *expvar.Int MemTotal *expvar.Int LoadOne *expvar.Float LoadFive *expvar.Float LoadFifteen *expvar.Float CpuUser *expvar.Float CpuNice *expvar.Float CpuSys *expvar.Float CpuIdle *expvar.Float CpuWait *expvar.Float CpuIrq *expvar.Float CpuSoftIrq *expvar.Float CpuStolen *expvar.Float CpuTotal *expvar.Float MessageSentPerSec *myexpvar.DiffInt ConnectPerSec *myexpvar.DiffInt GoroutinePerConn *expvar.Float }
var Metrics *MyMetrics = &MyMetrics{ System: MySystem{ Broker: MyBroker{ Clients: MyClients{ Connected: expvar.NewInt("sys.broker.clients.connected"), Total: expvar.NewInt("sys.broker.clients.total"), Maximum: expvar.NewInt("sys.broker.clients.maximum"), Disconnected: expvar.NewInt("sys.broker.clients.disconnected"), }, Uptime: expvar.NewInt("sys.broker.uptime"), Messages: MyMessages{ Received: expvar.NewInt("sys.broker.messages.received"), Sent: expvar.NewInt("sys.broker.messages.sent"), Stored: expvar.NewInt("sys.broker.messages.stored"), PublishDropped: expvar.NewInt("sys.broker.messages.publish.dropped"), RetainedCount: expvar.NewInt("sys.broker.messages.retained.count"), }, Load: MyLoad{ BytesSend: expvar.NewInt("sys.broker.load.bytes_send"), BytesReceived: expvar.NewInt("sys.broker.load.bytes_received"), }, SubscriptionsCount: expvar.NewInt("sys.broker.subscriptions.count"), }, }, NumGoroutine: expvar.NewInt("numgoroutine"), NumCgoCall: expvar.NewInt("numcgocall"), Uptime: expvar.NewInt("uptime"), MemFree: expvar.NewInt("memfree"), MemUsed: expvar.NewInt("memused"), MemActualFree: expvar.NewInt("memactualfree"), MemActualUsed: expvar.NewInt("memactualused"), MemTotal: expvar.NewInt("memtotal"), LoadOne: expvar.NewFloat("loadone"), LoadFive: expvar.NewFloat("loadfive"), LoadFifteen: expvar.NewFloat("loadfifteen"), CpuUser: expvar.NewFloat("cpuuser"), CpuNice: expvar.NewFloat("cpunice"), CpuSys: expvar.NewFloat("cpusys"), CpuIdle: expvar.NewFloat("cpuidle"), CpuWait: expvar.NewFloat("cpuwait"), CpuIrq: expvar.NewFloat("cpuirq"), CpuSoftIrq: expvar.NewFloat("cpusoftirq"), CpuStolen: expvar.NewFloat("cpustolen"), CpuTotal: expvar.NewFloat("cputotal"), MessageSentPerSec: myexpvar.NewDiffInt("msg_sent_per_sec"), ConnectPerSec: myexpvar.NewDiffInt("connect_per_sec"), GoroutinePerConn: expvar.NewFloat("goroutine_per_conn"), }
TODO: should not use expvar as we can't hold multiple MyMetrics metrics.
type Retryable ¶
type Retryable struct { Id string Payload interface{} }
TODO: haven't used this yet.
type TcpServer ¶
type TcpServer struct { ListenAddress string Engine *Momonga // contains filtered or unexported fields }
func NewTcpServer ¶
func NewTcpServer(engine *Momonga, config *configuration.Config, inherit bool) *TcpServer
func (*TcpServer) ListenAndServe ¶
type TlsServer ¶
type TlsServer struct { ListenAddress string Engine *Momonga // contains filtered or unexported fields }
func NewTlsServer ¶
func NewTlsServer(engine *Momonga, config *configuration.Config, inherit bool) *TlsServer
func (*TlsServer) ListenAndServe ¶
type TopicMatcher ¶
type UnixServer ¶
func NewUnixServer ¶
func NewUnixServer(engine *Momonga, config *configuration.Config, inherit bool) *UnixServer
func (*UnixServer) Graceful ¶
func (self *UnixServer) Graceful()
func (*UnixServer) ListenAndServe ¶
func (self *UnixServer) ListenAndServe() error
func (*UnixServer) Listener ¶
func (self *UnixServer) Listener() Listener
func (*UnixServer) Stop ¶
func (self *UnixServer) Stop()