Documentation ¶
Index ¶
- Variables
- func NewSessionContext(mctx *MqttSrvContext, conn *poll.Conn) (*SessionContext, CancelFunc)
- func WithCancel(ctx *Context) (*Context, CancelFunc)
- type BaseContext
- type CancelFunc
- type Context
- type GrpcSrvContext
- type MqttSrvContext
- type PullInfo
- type SessionContext
- func (ctx *SessionContext) CloneWriteInfo() []string
- func (s *SessionContext) CompareAndSwapPullState(olds, news WorkState) bool
- func (ctx *SessionContext) CompareAndSwapWriteState(olds, news WorkState) bool
- func (s *SessionContext) ComparePullState(expect WorkState) bool
- func (ctx *SessionContext) CompareState(state SessionState) bool
- func (ctx *SessionContext) CompareWriteState(state WorkState) bool
- func (ctx *SessionContext) GetAndClearWriteBuffer() []packets.ControlPacket
- func (ctx *SessionContext) GetNeedPullInfo() map[string]PullInfo
- func (ctx *SessionContext) GetPacketID() uint16
- func (ctx *SessionContext) GetWriteInfo(topic string) bool
- func (ctx *SessionContext) IsWrite(noWriteHandler func(), writeHandler func()) bool
- func (ctx *SessionContext) LoadState() SessionState
- func (ctx *SessionContext) LoadWriteState() WorkState
- func (ctx *SessionContext) LogFields(fs ...zapcore.Field) []zap.Field
- func (ctx *SessionContext) RecoveryUpdateInfo(r *pb.Record) bool
- func (ctx *SessionContext) SetPacketID(i int64)
- func (s *SessionContext) StorePullState(state WorkState)
- func (ctx *SessionContext) StoreState(state SessionState)
- func (ctx *SessionContext) StoreWriteState(state WorkState)
- func (ctx *SessionContext) SubUpdateInfo(topics []string, indexs [][]byte, qoss []int32)
- func (ctx *SessionContext) UnsubUpdateInfo(topics []string)
- func (ctx *SessionContext) UpdatePullInfo(topic string, last []byte) bool
- func (ctx *SessionContext) UpdatePullInfoCursor(cursors map[string][]byte)
- func (ctx *SessionContext) UpdateResumePullInfo(cursors map[string][]byte, completes map[string]bool)
- func (ctx *SessionContext) WithContext() *Context
- func (ctx *SessionContext) WritePacket(pkg packets.ControlPacket, noWriteHandler func() bool)
- func (ctx *SessionContext) WritePublishPackets(pkgs []*packets.PublishPacket, noWriteHandler func() bool)
- type SessionState
- type SubState
- type WorkState
Constants ¶
This section is empty.
Variables ¶
var Canceled = context.Canceled
Canceled is the error returned by Context.Err when the context is canceled.
Functions ¶
func NewSessionContext ¶
func NewSessionContext(mctx *MqttSrvContext, conn *poll.Conn) (*SessionContext, CancelFunc)
NewSessionContext returns a new SessionContext.
func WithCancel ¶
func WithCancel(ctx *Context) (*Context, CancelFunc)
WithCancel returns a copy of parent with a new Done channel. The returned context's Done channel is closed when the returned cancel function is called or when the parent context's Done channel is closed, whichever happens first.
Types ¶
type BaseContext ¶
type BaseContext struct { *Context PushCli *pushcli.PushCli Clients *clients.Clients Pubsub *pubsub.Pubsub }
BaseContext is the runtime context of the base
func (*BaseContext) WithCancel ¶
func (ctx *BaseContext) WithCancel() (*BaseContext, CancelFunc)
WithCancel returns a copy of parent with a new Done channel
type CancelFunc ¶
type CancelFunc context.CancelFunc
CancelFunc tells an operation to abandon its work
type Context ¶
Context alias of the system library
func Background ¶
func Background() *Context
Background returns a non-nil, empty Context. It is never canceled, has no values, and has no deadline. It is typically used by the main function, initialization, and tests, and as the top-level Context for incoming requests.
type GrpcSrvContext ¶
type GrpcSrvContext struct {
*BaseContext
}
GrpcSrvContext is the runtime context of the grpc server
type MqttSrvContext ¶
type MqttSrvContext struct { *BaseContext GrpcAddr string Conf *conf.Session Poll *poll.Poll Clock *sync.RWMutex // Must be a pointer so that it is shared between subclasses and parent classes Connections map[int]interface{} }
MqttSrvContext is the runtime context of the mqtt server
func (*MqttSrvContext) AddSession ¶
func (ctx *MqttSrvContext) AddSession(fd int, s interface{})
AddSession adds a session to ctx.
func (*MqttSrvContext) GetSession ¶
func (ctx *MqttSrvContext) GetSession(fd int) interface{}
GetSession returns the session for the given fd.
func (*MqttSrvContext) RemoveSession ¶
func (ctx *MqttSrvContext) RemoveSession(fd int)
RemoveSession deletes a session.
func (*MqttSrvContext) WithCancel ¶
func (ctx *MqttSrvContext) WithCancel() (*MqttSrvContext, CancelFunc)
WithCancel returns a copy of parent with a new Done channel
type PullInfo ¶
PullInfo records the half-open interval [Cursor, Latest) in which the client pulls messages. Cursor is the location of the message currently being fetched. Latest is the location the message queue assigns to the next message. State: 0 = init, 1 = notify.
type SessionContext ¶
type SessionContext struct { *MqttSrvContext ClientID string Service string StatLabel string CleanSession bool Kick bool PullMutex sync.RWMutex // mutex should guard fields below PullInfo map[string]*PullInfo Conn *poll.Conn Keepalive time.Duration // contains filtered or unexported fields }
SessionContext combines the client and server context
func (*SessionContext) CloneWriteInfo ¶
func (ctx *SessionContext) CloneWriteInfo() []string
CloneWriteInfo returns a slice of topics cloned from the write info.
func (*SessionContext) CompareAndSwapPullState ¶
func (s *SessionContext) CompareAndSwapPullState(olds, news WorkState) bool
func (*SessionContext) CompareAndSwapWriteState ¶
func (ctx *SessionContext) CompareAndSwapWriteState(olds, news WorkState) bool
func (*SessionContext) ComparePullState ¶
func (s *SessionContext) ComparePullState(expect WorkState) bool
func (*SessionContext) CompareState ¶
func (ctx *SessionContext) CompareState(state SessionState) bool
func (*SessionContext) CompareWriteState ¶
func (ctx *SessionContext) CompareWriteState(state WorkState) bool
func (*SessionContext) GetAndClearWriteBuffer ¶
func (ctx *SessionContext) GetAndClearWriteBuffer() []packets.ControlPacket
GetAndClearWriteBuffer returns the write buffer and clears it.
func (*SessionContext) GetNeedPullInfo ¶
func (ctx *SessionContext) GetNeedPullInfo() map[string]PullInfo
GetNeedPullInfo returns the entries of the pullinfo map whose cursor is greater than the latest.
func (*SessionContext) GetPacketID ¶
func (ctx *SessionContext) GetPacketID() uint16
GetPacketID returns a packet ID.
func (*SessionContext) GetWriteInfo ¶
func (ctx *SessionContext) GetWriteInfo(topic string) bool
func (*SessionContext) IsWrite ¶
func (ctx *SessionContext) IsWrite(noWriteHandler func(), writeHandler func()) bool
IsWrite returns true when the write buffer is not empty, and false otherwise.
func (*SessionContext) LoadState ¶
func (ctx *SessionContext) LoadState() SessionState
func (*SessionContext) LoadWriteState ¶
func (ctx *SessionContext) LoadWriteState() WorkState
func (*SessionContext) LogFields ¶
func (ctx *SessionContext) LogFields(fs ...zapcore.Field) []zap.Field
LogFields returns a slice of fields in printing order: the ctx fields first, then fs.
func (*SessionContext) RecoveryUpdateInfo ¶
func (ctx *SessionContext) RecoveryUpdateInfo(r *pb.Record) bool
RecoveryUpdateInfo TODO(bug): the subscription can only be made once the recovery is complete.
func (*SessionContext) SetPacketID ¶
func (ctx *SessionContext) SetPacketID(i int64)
SetPacketID sets the starting point for assigning packet IDs.
func (*SessionContext) StorePullState ¶
func (s *SessionContext) StorePullState(state WorkState)
func (*SessionContext) StoreState ¶
func (ctx *SessionContext) StoreState(state SessionState)
func (*SessionContext) StoreWriteState ¶
func (ctx *SessionContext) StoreWriteState(state WorkState)
func (*SessionContext) SubUpdateInfo ¶
func (ctx *SessionContext) SubUpdateInfo(topics []string, indexs [][]byte, qoss []int32)
SubUpdateInfo updates the subscription list information. A qoss value of 0x80 marks an invalid subscription. TODO: if the topic is already subscribed, update all of its information.
func (*SessionContext) UnsubUpdateInfo ¶
func (ctx *SessionContext) UnsubUpdateInfo(topics []string)
UnsubUpdateInfo cleans up the subscription list information for the topics given in the topics parameter.
func (*SessionContext) UpdatePullInfo ¶
func (ctx *SessionContext) UpdatePullInfo(topic string, last []byte) bool
UpdatePullInfo updates the pulling list when last is greater than latest. It returns true when the update succeeds, and false otherwise.
func (*SessionContext) UpdatePullInfoCursor ¶
func (ctx *SessionContext) UpdatePullInfoCursor(cursors map[string][]byte)
UpdatePullInfoCursor updates the cursors in the pullinfo map.
func (*SessionContext) UpdateResumePullInfo ¶
func (ctx *SessionContext) UpdateResumePullInfo(cursors map[string][]byte, completes map[string]bool)
func (*SessionContext) WithContext ¶
func (ctx *SessionContext) WithContext() *Context
WithContext returns the base context.
func (*SessionContext) WritePacket ¶
func (ctx *SessionContext) WritePacket(pkg packets.ControlPacket, noWriteHandler func() bool)
WritePacket sends a packet to the client.
func (*SessionContext) WritePublishPackets ¶
func (ctx *SessionContext) WritePublishPackets(pkgs []*packets.PublishPacket, noWriteHandler func() bool)
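WritePublishPackets sends data to the write chan. Original note: "return true in the position of 0 otherwise false" (the method itself returns no value; this presumably describes noWriteHandler — TODO confirm).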
type SessionState ¶
type SessionState int32
const ( ConnectState SessionState = iota RunningState ClosedState DestroyState )