Versions in this module Expand all Collapse all v1 v1.0.0 Dec 6, 2020 Changes in this version + var ErrCanceled = errors.New("canceled") + var ErrClosed = errors.New("isClosed") + func AckFunc(ctx context.Context, cl ConnectionLayer, packet *packets.PublishPacket) func() + func IsNetClosedErr(err error) bool + func IsNetTimeout(err error) bool + func NewConnectMsg(cid string, cleanSession bool, keepalive uint16) *packets.ConnectPacket + func NewRouter() *router + func SetLogger(out io.Writer) + type ApplicationLayer interface + AddRoute func(topic string, callback MessageHandler) + Close func() error + DeleteRoute func(topic string) + Open func(ctx context.Context) error + Publish func(ctx context.Context, topic string, data interface{}) error + PublishTimeout func(ctx context.Context, topic string, data interface{}, timeout time.Duration) error + PublishWithID func(ctx context.Context, sid string, topic string, data interface{}) Token + Remove func(id string) + Sessions func() []string + SetDefaultHandler func(handler MessageHandler) + type ConnConfig struct + CID string + CleanSession bool + CloseCB OnCloseCallback + Keepalive uint16 + ReadTimeout time.Duration + Version byte + WriteTimeout time.Duration + type ConnectToken struct + func (b *ConnectToken) Error() error + func (b *ConnectToken) FlowComplete() + func (b *ConnectToken) SetError(e error) + func (b *ConnectToken) Wait() bool + func (b *ConnectToken) WaitTimeout(d time.Duration) bool + func (c *ConnectToken) ReturnCode() byte + func (c *ConnectToken) SessionPresent() bool + func (c *ConnectToken) SetReturnCode(code byte) + func (c *ConnectToken) SetSessionPresent(sp bool) + type ConnectionLayer interface + Close func() + ID func() string + Read func() chan packets.ControlPacket + Write func(ctx context.Context, pkt *PacketAndToken) error + WriteP func(ctx context.Context, msg packets.ControlPacket) error + func NewConnectionKeeper(ctx context.Context, connFactory func(context.Context) MConn, sendPing bool) ConnectionLayer + type Customer interface + GetAckFunc func(msg *packets.PublishPacket) func() + type DisconnectToken struct + func (b *DisconnectToken) Error() error + func (b *DisconnectToken) FlowComplete() + func (b *DisconnectToken) SetError(e error) + func (b *DisconnectToken) Wait() bool + func (b *DisconnectToken) WaitTimeout(d time.Duration) bool + type DummyToken struct + func (d *DummyToken) Error() error + func (d *DummyToken) FlowComplete() + func (d *DummyToken) SetError(error) + func (d *DummyToken) Wait() bool + func (d *DummyToken) WaitTimeout(time.Duration) bool + type ErrorToken struct + func (e *ErrorToken) Error() error + func (e *ErrorToken) Wait() bool + func (e *ErrorToken) WaitTimeout(time.Duration) bool + type Logger interface + Printf func(format string, v ...interface{}) + Println func(v ...interface{}) + var CRITICAL Logger = NOOPLogger{} + var DEBUG Logger = NOOPLogger{} + var ERROR Logger = NOOPLogger{} + var INFO Logger = NOOPLogger{} + var WARN Logger = NOOPLogger{} + type MConn interface + CleanSession func() bool + Heartbeat func() time.Duration + ID func() string + func Accept(conn net.Conn, cf *ConnConfig) (MConn, error) + func Connect(conn net.Conn, cf *ConnConfig) (MConn, error) + func NewMConn(conn net.Conn, options *ConnConfig) MConn + type MId uint16 + type Message interface + Ack func() + ClientID func() string + Duplicate func() bool + MessageID func() MId + Payload func() []byte + Qos func() byte + Retained func() bool + Topic func() string + func MessageFromPublish(p *packets.PublishPacket, cid string, ack func()) Message + type MessageHandler func(Message) bool + type MessageIds struct + func NewMessageIds() *MessageIds + func (m *MessageIds) ClaimID(token TokenCompleter, id MId) + func (m *MessageIds) CleanUp() + func (m *MessageIds) FreeID(id MId) + func (m *MessageIds) GetID(t TokenCompleter) MId + func (m *MessageIds) GetToken(id MId) TokenCompleter + type NOOPLogger struct + func (NOOPLogger) Printf(format string, v ...interface{}) + func (NOOPLogger) Println(v ...interface{}) + type OnCloseCallback func(id string) + type PacketAndToken struct + P packets.ControlPacket + T TokenCompleter + type PresentationLayer interface + Decode func(src []byte, dst interface{}) (err error) + Encode func(src interface{}) (dst []byte, err error) + func NewJsonPresentationLayer() PresentationLayer + type PublishToken struct + func (b *PublishToken) Error() error + func (b *PublishToken) FlowComplete() + func (b *PublishToken) SetError(e error) + func (b *PublishToken) Wait() bool + func (b *PublishToken) WaitTimeout(d time.Duration) bool + func (p *PublishToken) MessageID() MId + func (p *PublishToken) SetMessageID(id MId) + type Router interface + AddRoute func(topic string, callback MessageHandler) + DeleteRoute func(topic string) + MatchAndDispatch func(messages <-chan Message, order bool) + SetDefaultHandler func(handler MessageHandler) + type SessionLayer interface + Close func() + In func() <-chan Message + Send func(ctx context.Context, topic string, qos int, msg []byte) Token + UpdateConnectLayer func(ctx context.Context, cl ConnectionLayer) error + func NewSession(ctx context.Context, cl ConnectionLayer) (SessionLayer, error) + type Token interface + Error func() error + Wait func() bool + WaitTimeout func(time.Duration) bool + func NewErrToken(err error) Token + type TokenCompleter interface + FlowComplete func() + func NewToken(tType byte) TokenCompleter + type TokenErrorSetter interface + SetError func(error)