Documentation ¶
Index ¶
- Variables
- func NewTiSession(txn kv.Transaction) *tiSession
- type ByMessageID
- type ClientInfo
- type Option
- type RdsSession
- func (s *RdsSession) Ack(ns, client string, mid int64) ([]byte, error)
- func (s *RdsSession) ConndAddress(ns, client string) (*ClientInfo, error)
- func (s *RdsSession) Cursor(ns, client, topic string) ([]byte, error)
- func (s *RdsSession) DeleteConndAddress(ns, client, address string, conneciontID int64) error
- func (s *RdsSession) DeleteCursor(ns, client, topic string) error
- func (s *RdsSession) DeleteMessageID(ns, client string) error
- func (s *RdsSession) Drop(ns, client string) error
- func (s *RdsSession) DropCursor(ns, client string) error
- func (s *RdsSession) DropSub(ns, client string) error
- func (s *RdsSession) DropUnack(ns, client string) error
- func (s *RdsSession) MessageID(ns, client string) (int64, error)
- func (s *RdsSession) SetConndAddress(ns, client, address string, conneciontID int64) error
- func (s *RdsSession) SetCursor(ns, client, topic string, cursor []byte) error
- func (s *RdsSession) SetMessageID(ns, client string, messageID int64) error
- func (s *RdsSession) Subscribe(ns, client string, topic string, qos byte) error
- func (s *RdsSession) Subscriptions(ns, client string) (map[string]byte, error)
- func (s *RdsSession) Unack(ns, client string, messages []*pbMessage.Message) error
- func (s *RdsSession) Unacks(ns, client string, pos []byte, count int64) ([]byte, []*pbMessage.Message, bool, error)
- func (s *RdsSession) Unsubscribe(ns, client string, topic string) error
- func (s *RdsSession) UpdateConndAddress(ns, client, address string, conneciontID int64) (*ClientInfo, error)
- type SessionOps
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func NewTiSession ¶
func NewTiSession(txn kv.Transaction) *tiSession
Types ¶
type ByMessageID ¶
func (ByMessageID) Len ¶
func (b ByMessageID) Len() int
func (ByMessageID) Swap ¶
func (b ByMessageID) Swap(i int, j int)
type ClientInfo ¶
type ClientInfo struct { TimeStamp int64 `json:"time_stamp"` Address string `json:"address"` ConnectionID int64 `json:"connection_id"` }
ClientInfo holds basic information about a client.
type Option ¶
type Option func(r *sessionOps)
func SetRedisStore ¶
func SetTikvStore ¶
func SetTikvStore(txn kv.Transaction) Option
type RdsSession ¶
type RdsSession struct {
// contains filtered or unexported fields
}
func NewRdsSession ¶
func NewRdsSession(rdc util.Redic) *RdsSession
func (*RdsSession) ConndAddress ¶
func (s *RdsSession) ConndAddress(ns, client string) (*ClientInfo, error)
TODO
func (*RdsSession) DeleteConndAddress ¶
func (s *RdsSession) DeleteConndAddress(ns, client, address string, conneciontID int64) error
func (*RdsSession) DeleteCursor ¶
func (s *RdsSession) DeleteCursor(ns, client, topic string) error
func (*RdsSession) DeleteMessageID ¶
func (s *RdsSession) DeleteMessageID(ns, client string) error
func (*RdsSession) Drop ¶
func (s *RdsSession) Drop(ns, client string) error
func (*RdsSession) DropCursor ¶
func (s *RdsSession) DropCursor(ns, client string) error
func (*RdsSession) DropSub ¶
func (s *RdsSession) DropSub(ns, client string) error
func (*RdsSession) DropUnack ¶
func (s *RdsSession) DropUnack(ns, client string) error
func (*RdsSession) MessageID ¶
func (s *RdsSession) MessageID(ns, client string) (int64, error)
MessageID queries the message ID of the specified client. If the key is not found (ErrNotFound), it returns 0 and a nil error.
func (*RdsSession) SetConndAddress ¶
func (s *RdsSession) SetConndAddress(ns, client, address string, conneciontID int64) error
TODO
func (*RdsSession) SetCursor ¶
func (s *RdsSession) SetCursor(ns, client, topic string, cursor []byte) error
func (*RdsSession) SetMessageID ¶
func (s *RdsSession) SetMessageID(ns, client string, messageID int64) error
SetMessageID is used to update message ID in storage
func (*RdsSession) Subscribe ¶
func (s *RdsSession) Subscribe(ns, client string, topic string, qos byte) error
func (*RdsSession) Subscriptions ¶
func (s *RdsSession) Subscriptions(ns, client string) (map[string]byte, error)
func (*RdsSession) Unack ¶
func (s *RdsSession) Unack(ns, client string, messages []*pbMessage.Message) error
func (*RdsSession) Unacks ¶
func (s *RdsSession) Unacks(ns, client string, pos []byte, count int64) ([]byte, []*pbMessage.Message, bool, error)
TODO hscan
func (*RdsSession) Unsubscribe ¶
func (s *RdsSession) Unsubscribe(ns, client string, topic string) error
func (*RdsSession) UpdateConndAddress ¶
func (s *RdsSession) UpdateConndAddress(ns, client, address string, conneciontID int64) (*ClientInfo, error)
UpdateConndAddress updates the client info and returns the client's old info.
type SessionOps ¶
type SessionOps interface { //Create(name string) (int64, error) //ID(name string) (int64, error) Subscribe(ns, client string, topic string, qos byte) error // 订阅Topic Unsubscribe(ns, client string, topic string) error // 取消订阅 Subscriptions(ns, client string) (map[string]byte, error) // 会话恢复 // 游标信息 topic->cursor SetCursor(ns, client, topic string, cursor []byte) error // 更新cursor,为了保证幂等,不提供move方式(类似INCRBY的方式) Cursor(ns, client, topic string) ([]byte, error) // 查询cursor DeleteCursor(ns, client, topic string) error ConndAddress(ns, client string) (*ClientInfo, error) SetConndAddress(ns, client, address string, conneciontID int64) error UpdateConndAddress(ns, client, address string, conneciontID int64) (*ClientInfo, error) DeleteConndAddress(ns, client, address string, conneciontID int64) error // 未ACK队列接口 DropUnack(ns, client string) error Unack(ns, client string, messages []*pbMessage.Message) error Ack(ns, client string, mid int64) ([]byte, error) Unacks(ns, client string, pos []byte, count int64) (first []byte, messages []*pbMessage.Message, end bool, err error) // 消息ID messageID SetMessageID(ns, client string, messageID int64) error MessageID(ns, client string) (int64, error) DeleteMessageID(ns, client string) error // 删除Session信息 Drop(ns, client string) error }
func NewSessionOps ¶
func NewSessionOps(opts ...Option) SessionOps
Click to show internal directories.
Click to hide internal directories.