tower

package
v0.4.2-alpha.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2023 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrorClose is the error reported when the gateway connection has already been closed.
	ErrorClose = errors.New("firetower is collapsed")
	// ErrorTopicEmpty is the error reported when the topic does not exist.
	ErrorTopicEmpty = errors.New("topic is empty")
	// ErrorServerSideMode is the error reported when a server-side tower tries to send to itself.
	ErrorServerSideMode = errors.New("server side tower can not send to self")
)

Functions

func NewFire

func NewFire(source protocol.FireSource, tower PusherInfo) *protocol.FireInfo

Types

type Bucket

// Bucket spreads the connections of one instance evenly across several
// buckets so that pushes can be performed concurrently.
type Bucket struct {
	BuffChan chan *protocol.FireInfo // message-processing queue of the bucket
	// contains filtered or unexported fields
}

Bucket 的作用是将一个实例的连接均匀的分布在多个bucket中来达到并发推送的目的

func (*Bucket) AddSubscribe

func (b *Bucket) AddSubscribe(topic string, bt *FireTower)

AddSubscribe 添加当前实例中的topic->conn的订阅关系

func (*Bucket) DelSubscribe

func (b *Bucket) DelSubscribe(topic string, bt *FireTower)

DelSubscribe 删除当前实例中的topic->conn的订阅关系

type FireTower

// FireTower is the client connection struct; it holds all the information
// about a single client connection.
type FireTower struct {
	Cookie []byte // lets business code store data related to the current connection
	// contains filtered or unexported fields
}

FireTower 客户端连接结构体 包含了客户端一个连接的所有信息

func BuildTower

func BuildTower(ws *websocket.Conn, clientId string) (tower *FireTower, err error)

BuildTower 实例化一个websocket客户端

func (*FireTower) ClientID

func (t *FireTower) ClientID() string

func (*FireTower) Close

func (t *FireTower) Close()

Close 关闭客户端连接并注销 调用该方法会完全注销掉由BuildTower生成的一切内容

func (*FireTower) Ext

func (t *FireTower) Ext() *sync.Map

func (*FireTower) GetConnectNum

func (t *FireTower) GetConnectNum(topic string) (uint64, error)

GetConnectNum 获取话题订阅数的grpc方法封装

func (*FireTower) Logger

func (t *FireTower) Logger() *zap.Logger

func (*FireTower) OnClose

func (t *FireTower) OnClose() chan struct{}

func (*FireTower) Publish

func (t *FireTower) Publish(fire *protocol.FireInfo) error

Publish 推送接口 通过BuildTower生成的实例都可以调用该方法来达到推送的目的

func (*FireTower) Run

func (t *FireTower) Run()

Run 启动websocket客户端

func (*FireTower) SetBeforeSubscribeHandler

func (t *FireTower) SetBeforeSubscribeHandler(fn func(context protocol.FireLife, topic []string) bool)

SetBeforeSubscribeHandler 订阅前回调事件 用户订阅topic前触发

func (*FireTower) SetOnConnectHandler

func (t *FireTower) SetOnConnectHandler(fn func() bool)

SetOnConnectHandler 建立连接事件

func (*FireTower) SetOnOfflineHandler

func (t *FireTower) SetOnOfflineHandler(fn func())

SetOnOfflineHandler 用户连接关闭时触发

func (*FireTower) SetOnSystemRemove

func (t *FireTower) SetOnSystemRemove(fn func(topic string))

SetOnSystemRemove 系统移除某个用户的topic订阅

func (*FireTower) SetReadHandler

func (t *FireTower) SetReadHandler(fn func(*protocol.FireInfo) bool)

SetReadHandler 客户端推送事件 接收到用户publish的消息时触发

func (*FireTower) SetReadTimeoutHandler

func (t *FireTower) SetReadTimeoutHandler(fn func(*protocol.FireInfo))

SetReadTimeoutHandler 超时回调 readIn channel写满了 生产 > 消费的情况下触发超时机制

func (*FireTower) SetReceivedHandler

func (t *FireTower) SetReceivedHandler(fn func(*protocol.FireInfo) bool)

func (*FireTower) SetSubscribeHandler

func (t *FireTower) SetSubscribeHandler(fn func(context protocol.FireLife, topic []string) bool)

SetSubscribeHandler 订阅事件 用户订阅topic后触发

func (*FireTower) SetUnSubscribeHandler

func (t *FireTower) SetUnSubscribeHandler(fn func(context protocol.FireLife, topic []string) bool)

SetUnSubscribeHandler 取消订阅事件 用户取消订阅topic后触发

func (*FireTower) SetUserID

func (t *FireTower) SetUserID(id string)

func (*FireTower) Subscribe

func (t *FireTower) Subscribe(context protocol.FireLife, topics []string) error

func (*FireTower) ToSelf

func (t *FireTower) ToSelf(b []byte) error

ToSelf 向自己推送消息 这里描述一下使用场景 只针对当前客户端进行的推送请调用该方法

func (*FireTower) TopicList

func (t *FireTower) TopicList() []string

func (*FireTower) UnSubscribe

func (t *FireTower) UnSubscribe(context protocol.FireLife, topics []string) error

func (*FireTower) UserID

func (t *FireTower) UserID() string

type Manager

// Manager is the interface returned by BuildFoundation and Setup. It embeds
// protocol.Pusher and adds accessors for topics, cluster ID, store and logger.
type Manager interface {
	protocol.Pusher
	// GetTopics returns a map from string to uint64, or an error.
	// NOTE(review): presumably topic name -> subscriber count; confirm.
	GetTopics() (map[string]uint64, error)
	// ClusterID returns an int64 identifier.
	// NOTE(review): presumably the value supplied via BuildWithClusterID; confirm.
	ClusterID() int64
	// Store returns a value of the unexported type stores.
	Store() stores
	// Logger returns a *zap.Logger.
	Logger() *zap.Logger
}

func BuildFoundation

func BuildFoundation(cfg config.FireTowerConfig, opts ...TowerOption) (Manager, error)

func Setup

func Setup(cfg config.FireTowerConfig, opts ...TowerOption) (Manager, error)

Setup 初始化firetower 在调用firetower前请一定要先调用Setup方法

type PusherInfo

// PusherInfo is the type of the tower argument accepted by NewFire. It exposes
// a client ID and a user ID; *FireTower declares both methods.
type PusherInfo interface {
	// ClientID returns the client ID as a string.
	ClientID() string
	// UserID returns the user ID as a string.
	UserID() string
}

type ServerSideTower

// ServerSideTower is the interface returned by BuildServerSideTower. Its
// methods match same-named methods declared on *FireTower; unlike *FireTower
// it declares no ToSelf method.
// NOTE(review): the missing ToSelf presumably relates to ErrorServerSideMode
// ("server side tower can not send to self") — confirm against the source.
type ServerSideTower interface {
	SetOnConnectHandler(fn func() bool)
	SetOnOfflineHandler(fn func())
	SetReceivedHandler(fn func(*protocol.FireInfo) bool)
	SetSubscribeHandler(fn func(context protocol.FireLife, topic []string) bool)
	SetUnSubscribeHandler(fn func(context protocol.FireLife, topic []string) bool)
	SetBeforeSubscribeHandler(fn func(context protocol.FireLife, topic []string) bool)
	SetOnSystemRemove(fn func(topic string))
	GetConnectNum(topic string) (uint64, error)
	Publish(fire *protocol.FireInfo) error
	Subscribe(context protocol.FireLife, topics []string) error
	UnSubscribe(context protocol.FireLife, topics []string) error
	Logger() *zap.Logger
	TopicList() []string
	Run()
	Close()
	OnClose() chan struct{}
}

func BuildServerSideTower

func BuildServerSideTower(clientId string) ServerSideTower

BuildServerSideTower 实例化一个服务端模式(server side)的tower

type TowerManager

// TowerManager contains the central processing queue and multiple buckets.
// The buckets spread one instance's connections evenly so that pushes can be
// performed concurrently.
type TowerManager struct {
	protocol.Pusher // embedded; its methods are promoted onto TowerManager
	// contains filtered or unexported fields
}

TowerManager 包含中心处理队列和多个bucket bucket的作用是将一个实例的连接均匀的分布在多个bucket中来达到并发推送的目的

func (*TowerManager) ClusterID

func (t *TowerManager) ClusterID() int64

func (*TowerManager) GetBucket

func (t *TowerManager) GetBucket(bt *FireTower) (bucket *Bucket)

GetBucket 获取一个可以分配当前连接的bucket

func (*TowerManager) GetTopics

func (t *TowerManager) GetTopics() (map[string]uint64, error)

func (*TowerManager) Logger

func (t *TowerManager) Logger() *zap.Logger

func (*TowerManager) SetConnCountChangedHandler

func (t *TowerManager) SetConnCountChangedHandler(f func())

func (*TowerManager) SetTopicCountChangedHandler

func (t *TowerManager) SetTopicCountChangedHandler(f func(string))

func (*TowerManager) Store

func (t *TowerManager) Store() stores

type TowerOption

// TowerOption is a function that receives a *TowerManager. BuildFoundation and
// Setup accept a variadic list of TowerOption values, and the BuildWith*
// functions each return one.
type TowerOption func(t *TowerManager)

func BuildWithClusterID

func BuildWithClusterID(id int64) TowerOption

func BuildWithCoder

func BuildWithCoder(coder protocol.Coder) TowerOption

func BuildWithLogger

func BuildWithLogger(logger *zap.Logger) TowerOption

func BuildWithPusher

func BuildWithPusher(pusher protocol.Pusher) TowerOption

func BuildWithStore

func BuildWithStore(store stores) TowerOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL