session

package
v0.0.0-...-2aa8555 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2021 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level sentinel errors and tunables for the session package.
var (
	// ErrMinimalIntervalLimit is the error whose message says a request was
	// rejected for being below the minimal kick interval.
	ErrMinimalIntervalLimit = errors.New("lower than minimal kick interval, request rejected")
	// ErrInvalidRecord is the "invalid record" sentinel error.
	ErrInvalidRecord        = errors.New("invalid record")
	// ErrNotMatch is the "not match" sentinel error.
	ErrNotMatch             = errors.New("not match")
	// ErrEmpty is the "empty" sentinel error.
	ErrEmpty                = errors.New("empty")

	// MinimalKickInterval is the minimal kick interval as a time.Duration.
	// NOTE(review): a bare 500 is 500 nanoseconds, because time.Duration
	// counts nanoseconds. If milliseconds were intended this should be
	// 500 * time.Millisecond — confirm against the code that reads it.
	MinimalKickInterval time.Duration = 500
)

Functions

func NewTiSession

func NewTiSession(txn kv.Transaction) *tiSession

Types

type ByMessageID

// ByMessageID is a slice of *pbMessage.Message with Len, Less and Swap
// methods, i.e. the method set required by sort.Interface.
// NOTE(review): the name suggests ordering by message ID — confirm against
// the Less implementation.
type ByMessageID []*pbMessage.Message

func (ByMessageID) Len

func (b ByMessageID) Len() int

func (ByMessageID) Less

func (b ByMessageID) Less(i int, j int) bool

func (ByMessageID) Swap

func (b ByMessageID) Swap(i int, j int)

type ClientInfo

// ClientInfo holds basic information about a client. Each field is
// serialized to JSON under the key given in its struct tag.
type ClientInfo struct {
	// TimeStamp is a timestamp stored as an int64.
	// NOTE(review): unit and reference point are not visible here — confirm.
	TimeStamp    int64  `json:"time_stamp"`
	// Address is the address recorded for the client.
	Address      string `json:"address"`
	// ConnectionID is the identifier of the client's connection.
	ConnectionID int64  `json:"connection_id"`
}

ClientInfo holds basic information about a client.

type Option

// Option is a function that receives a *sessionOps. Options are accepted by
// NewSessionOps and are produced by SetRedisStore and SetTikvStore.
type Option func(r *sessionOps)

func SetRedisStore

func SetRedisStore(rdc util.Redic) Option

func SetTikvStore

func SetTikvStore(txn kv.Transaction) Option

type RdsSession

// RdsSession is created by NewRdsSession from a util.Redic. The methods
// listed for *RdsSession include every method declared by SessionOps.
// NOTE(review): presumably the Redis-backed session store — confirm against
// the implementation.
type RdsSession struct {
	// contains filtered or unexported fields
}

func NewRdsSession

func NewRdsSession(rdc util.Redic) *RdsSession

func (*RdsSession) Ack

func (s *RdsSession) Ack(ns, client string, mid int64) ([]byte, error)

func (*RdsSession) ConndAddress

func (s *RdsSession) ConndAddress(ns, client string) (*ClientInfo, error)

TODO

func (*RdsSession) Cursor

func (s *RdsSession) Cursor(ns, client, topic string) ([]byte, error)

func (*RdsSession) DeleteConndAddress

func (s *RdsSession) DeleteConndAddress(ns, client, address string, conneciontID int64) error

func (*RdsSession) DeleteCursor

func (s *RdsSession) DeleteCursor(ns, client, topic string) error

func (*RdsSession) DeleteMessageID

func (s *RdsSession) DeleteMessageID(ns, client string) error

func (*RdsSession) Drop

func (s *RdsSession) Drop(ns, client string) error

func (*RdsSession) DropCursor

func (s *RdsSession) DropCursor(ns, client string) error

func (*RdsSession) DropSub

func (s *RdsSession) DropSub(ns, client string) error

func (*RdsSession) DropUnack

func (s *RdsSession) DropUnack(ns, client string) error

func (*RdsSession) MessageID

func (s *RdsSession) MessageID(ns, client string) (int64, error)

MessageID queries the message ID of the specified client. If the key is not found (ErrNotFound), it returns 0 and a nil error.

func (*RdsSession) SetConndAddress

func (s *RdsSession) SetConndAddress(ns, client, address string, conneciontID int64) error

TODO

func (*RdsSession) SetCursor

func (s *RdsSession) SetCursor(ns, client, topic string, cursor []byte) error

func (*RdsSession) SetMessageID

func (s *RdsSession) SetMessageID(ns, client string, messageID int64) error

SetMessageID is used to update message ID in storage

func (*RdsSession) Subscribe

func (s *RdsSession) Subscribe(ns, client string, topic string, qos byte) error

func (*RdsSession) Subscriptions

func (s *RdsSession) Subscriptions(ns, client string) (map[string]byte, error)

func (*RdsSession) Unack

func (s *RdsSession) Unack(ns, client string, messages []*pbMessage.Message) error

func (*RdsSession) Unacks

func (s *RdsSession) Unacks(ns, client string, pos []byte, count int64) ([]byte, []*pbMessage.Message, bool, error)

TODO hscan

func (*RdsSession) Unsubscribe

func (s *RdsSession) Unsubscribe(ns, client string, topic string) error

func (*RdsSession) UpdateConndAddress

func (s *RdsSession) UpdateConndAddress(ns, client, address string, conneciontID int64) (*ClientInfo, error)

UpdateConndAddress updates the client info and returns the client's previous info.

type SessionOps

// SessionOps groups the per-client session operations, each addressed by a
// namespace (ns) and a client name: topic subscriptions, per-topic cursors,
// the client's connection address, the queue of unacknowledged messages, the
// message ID, and removal of the whole session.
type SessionOps interface {

	//Create(name string) (int64, error)
	//ID(name string) (int64, error)
	Subscribe(ns, client string, topic string, qos byte) error // subscribe to a topic
	Unsubscribe(ns, client string, topic string) error         // cancel a subscription
	Subscriptions(ns, client string) (map[string]byte, error)  // session recovery

	// Cursor information: topic -> cursor.
	SetCursor(ns, client, topic string, cursor []byte) error // update the cursor; to stay idempotent, no relative "move" (INCRBY-style) variant is provided
	Cursor(ns, client, topic string) ([]byte, error)         // query the cursor
	DeleteCursor(ns, client, topic string) error

	// Connection address of the client, represented as a ClientInfo.
	ConndAddress(ns, client string) (*ClientInfo, error)
	SetConndAddress(ns, client, address string, conneciontID int64) error
	UpdateConndAddress(ns, client, address string, conneciontID int64) (*ClientInfo, error)
	DeleteConndAddress(ns, client, address string, conneciontID int64) error

	// Unacknowledged (not yet ACKed) message queue interface.
	DropUnack(ns, client string) error
	Unack(ns, client string, messages []*pbMessage.Message) error
	Ack(ns, client string, mid int64) ([]byte, error)
	Unacks(ns, client string, pos []byte, count int64) (first []byte, messages []*pbMessage.Message, end bool, err error)

	// Message ID.
	SetMessageID(ns, client string, messageID int64) error
	MessageID(ns, client string) (int64, error)
	DeleteMessageID(ns, client string) error

	// Delete the session information.
	Drop(ns, client string) error
}

func NewSessionOps

func NewSessionOps(opts ...Option) SessionOps

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL