xserver

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2023 License: GPL-2.0 Imports: 35 Imported by: 0

Documentation

Overview

提供了服务注册发现、服务互联互通、线路负载均衡、业务逻辑承载等功能.

Index

Constants

View Source
const (
	LAN_CTRL_ROOT     = "/"         // 运维帮助
	LAN_CTRL_STATUS   = "/status"   // 线路状态
	LAN_CTRL_HEALTH   = "/health"   // 线路检测
	LAN_CTRL_BACKUP   = "/backup"   // 线路备份
	LAN_CTRL_RESUME   = "/resume"   // 线路恢复
	LAN_CTRL_PAUSE    = "/pause"    // 线路暂停
	LAN_CTRL_CLOSE    = "/close"    // 线路关闭
	LAN_CTRL_DUMP     = "/dump"     // 清空内存
	LAN_CTRL_PRINT    = "/print"    // 内存打印
	LAN_CTRL_CONSOLE  = "/console"  // 线路日志
	LAN_CTRL_FLUSHLOG = "/flushlog" // 清空日志
)
View Source
const (
	LAN_CIN_MAX_FRAME  = 50000 // 最大输入网络帧数
	LAN_COUT_MAX_FRAME = 50000 // 最大输出网络帧数
)
View Source
const (
	EVT_SERVER_STARTED = -1 // 服务就绪(配置就绪 & 日志就绪 & DB就绪 & Redis就绪 & Lan就绪)
	EVT_SERVER_CHANGED = -2 // 服务变更(参数类型:[]interface{}{added map[string][]string, removed map[string][]string})
	EVT_SERVER_PREQUIT = -3 // 服务即将退出
)
View Source
const (
	ENV_DEV  = "dev"  // 开发
	ENV_TEST = "test" // 内测
	ENV_BETA = "beta" // 公测
	ENV_LIVE = "live" // 线上
)
View Source
const (
	SERVER_SLEEP time.Duration = 10 * time.Millisecond // 帧刷新间隔
)
View Source
const (
	UPDATE_SLEEP time.Duration = 10 * time.Millisecond // 帧刷新间隔
)

Variables

View Source
var (
	GMsg = xevt.NewEvtMgr(true)  // Msg消息中心
	GRpc = xevt.NewEvtMgr(false) // Rpc消息中心
	GCgi = xevt.NewEvtMgr(false) // Cgi消息中心
	GEvt = xevt.NewEvtMgr(true)  // Evt消息中心
)
View Source
var (
	ERR_SEND_CHAN_FULL  = errors.New("send chan is full")
	ERR_NO_ROUTE_FOUND  = errors.New("no route found")
	ERR_RPC_TIMEOUT     = errors.New("rpc call timeout")
	ERR_CGI_TIMEOUT     = errors.New("cgi call timeout")
	ERR_RPC_INTERRUPTED = errors.New("rpc call has been interrupted, see log context for more details.")
	ERR_CGI_INTERRUPTED = errors.New("cgi call has been interrupted, see log context for more details.")
)
View Source
var (
	GWrap   *Wrap   // 服务封装器
	GServer IServer // 全局服务
)
View Source
var CGIROUTEMAP map[int]*CgiRoute // cgi路由
View Source
var (
	CslClt *consulapi.Client // Consul连接

)
View Source
var MSGROUTEMAP map[int]*MsgRoute // msg路由
View Source
var (
	MainTID int64 = -1 // 主线程ID
)
View Source
var RPCROUTEMAP map[int]*RpcRoute // rpc路由

Functions

func BackupLan

func BackupLan() int

线路备份

func ClearInterval

func ClearInterval(id int, tid ...int64)

取消间歇调用(务必在逻辑线程中调用或指定线程ID)

id: 定时器ID
tid: 线程ID

func ClearTimeout

func ClearTimeout(id int, tid ...int64)

取消超时调用(务必在逻辑线程中调用或指定线程ID)

id: 定时器ID
tid: 线程ID

func CloseLan

func CloseLan()

关闭线路

func MonitorLan

func MonitorLan()

监控线路

func NotifyCgi

func NotifyCgi(id int, creq *xproto.CgiReq, cresp *xproto.CgiResp) bool

广播Cgi消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
creq: 消息请求
cresp: 消息响应

func NotifyEvt

func NotifyEvt(id int, param interface{}) bool

广播Evt消息(用于服务器内部)(全局)

id: 消息ID
param: 透传参数

func NotifyMsg

func NotifyMsg(id int, mreq *xproto.MsgReq) bool

广播Msg消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
mreq: 消息对象

func NotifyRpc

func NotifyRpc(id int, rreq *xproto.RpcReq, rresp *xproto.RpcResp) bool

广播Rpc消息(用于服务器之间交互)(全局)

id: 消息ID
rreq: 消息请求
rresp: 消息响应

func PauseLan

func PauseLan()

暂停线路

func PostKV

func PostKV(key string, value string, version string, block ...bool) bool

推送KV(键值对)至Consul Storage

key: 键
value: 值
version: 版本号
block-是否阻塞

func PullKV

func PullKV(key string) []byte

从Consul Storage中拉取KV(键值对)(阻塞)

key: 键

func RecvLan

func RecvLan()

线路接收

func RegCgi

func RegCgi(id int, fun func(creq *xproto.CgiReq, cresp *xproto.CgiResp)) int

注册Cgi消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
fun: 消息回调

func RegCgiRoute

func RegCgiRoute(_map map[int]*CgiRoute)

注册Cgi路由

func RegEvt

func RegEvt(id int, fun func(param interface{})) int

注册Evt消息(用于服务器内部)(全局)

id: 消息ID
fun: 消息回调

func RegMsg

func RegMsg(id int, fun func(*xproto.MsgReq)) int

注册Msg消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
fun: 消息回调

func RegMsgRoute

func RegMsgRoute(_map map[int]*MsgRoute)

注册Msg路由

func RegRpc

func RegRpc(id int, fun func(rreq *xproto.RpcReq, rresp *xproto.RpcResp)) int

注册Rpc消息(用于服务器之间交互)(全局)

id: 消息ID
fun: 消息回调

func RegRpcRoute

func RegRpcRoute(_map map[int]*RpcRoute)

注册Rpc路由

func RestoreLan

func RestoreLan()

线路恢复

func ResumeLan

func ResumeLan()

恢复线路

func RunIn

func RunIn(tid int64, fun func(), tag ...string) chan bool

在指定逻辑线程中调用(返回的chan可用于阻塞当前线程)

tid: 线程ID
fun: 回调函数
tag: 日志标签

func RunInMain

func RunInMain(fun func(), tag ...string) chan bool

在逻辑主线程中调用(返回的chan可用于阻塞当前线程)

fun: 回调函数
tag: 日志标签

func SendAsync

func SendAsync(id int, uid int, req proto.Message, addr string, callback func(frame *xproto.RpcResp, err error), offsetAndTimeout ...int)

发送Rpc消息(异步)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
addr: 目标服务器
callback: 回调函数
offset: 目标协程ID偏移(基于protocol中定义)
timeout: 超时时长

func SendCgi

func SendCgi(id int, uid int, req *http.Request, addr string, timeout ...int) (cresp *xproto.CgiResp, err error)

发送Cgi消息(同步,否则ResponseWriter无法输出)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
addr: 目标服务器
timeout: 超时时长

func SendFrame

func SendFrame(frame xproto.IFrame) bool

发送网络帧(根据UID负载均衡)

frame: 网络帧

func SendMsg

func SendMsg(id int, msg proto.Message, mreq *xproto.MsgReq) bool

发送Msg消息

id: 消息ID
msg: 结构体
mreq: msg帧

func SendSync

func SendSync(id int, uid int, req proto.Message, resp proto.Message, addr string, offsetAndTimeout ...int) error

发送Rpc消息(同步)

id: 消息ID
uid: 用户ID(负载均衡)
req: 请求结构体
resp: 返回结构体
addr: 目标服务器
offset: 目标协程偏移(基于protocol中定义)
timeout: 超时时长

func Start

func Start(server IServer)

启动

server: 服务对象

func StartLan

func StartLan(lanCfg *LanCfg, handleMsg func(*xproto.MsgReq),
	handleRpc func(*xproto.RpcReq, *xproto.RpcResp),
	handleCgi func(*xproto.CgiReq, *xproto.CgiResp))

启动线路

lancfg: 线路配置
handleMsg: 消息处理函数

func Stop

func Stop()

停止

func SubKV

func SubKV(key string, interval int, onUpdate func(data []byte))

订阅Consul Storage中的KV(键值对)(阻塞)

key: 键(注意订阅的Key需要设置版本)
interval: 间歇时间

func UnregCgi

func UnregCgi(id int, hid int) bool

注销Cgi消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func UnregEvt

func UnregEvt(id int, hid int) bool

注销Evt消息(用于服务器内部)(全局)

id: 消息ID
hid: 句柄ID

func UnregMsg

func UnregMsg(id int, hid int) bool

注销Msg消息(用于客户端和服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func UnregRpc

func UnregRpc(id int, hid int) bool

注销Rpc消息(用于服务器之间交互)(全局)

id: 消息ID
hid: 句柄ID

func WatchSignal

func WatchSignal() <-chan string

Types

type CgiFunc

type CgiFunc func(creq *xproto.CgiReq, cresp *xproto.CgiResp)

Cgi函数类型

func (CgiFunc) Handle

func (this CgiFunc) Handle(reply *xevt.EvtReply, req interface{}, resp interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type CgiRoute

type CgiRoute struct {
	Route
	Method  []string // 请求方式
	Timeout int      // 超时时间
}

Cgi路由

type CslCfg

type CslCfg struct {
	Ns       string `json:"ns"`                   // 命名空间(必要)
	Addr     string `json:"addr"`                 // 注册地址(必要)
	Logout   int    `json:"logout" default:"60"`  // 超时注销(可选,默认:60,开发阶段应设置较大值避免注销)
	Timeout  int    `json:"timeout" default:"5"`  // 检测超时(可选,默认:5,开发阶段应设置较大值避免注销)
	Interval int    `json:"interval" default:"5"` // 检测间隔(可选,默认:5,开发阶段应设置较大值避免注销)
	Fetch    int    `json:"fetch" default:"5"`    // 拉取间隔(可选,默认:5)
}

中心配置

type EvtFunc

type EvtFunc func(param interface{})

Evt函数类型

func (EvtFunc) Handle

func (this EvtFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type IServer

type IServer interface {
	Init(cfg *SvrCfg) bool                              // 初始化
	Start()                                             // 服务启动
	Update(delta float32)                               // 服务循环
	Destroy()                                           // 服务结束
	PreQuit()                                           // 服务即将退出
	Name() string                                       // 服务名称
	GetEnv() string                                     // 服务环境
	GetCfg() *SvrCfg                                    // 获取配置
	GetFPS() int                                        // 获取帧率
	SetTitle() string                                   // 更新标题
	GetTitle() string                                   // 获取标题
	RecvMsg(mreq *xproto.MsgReq)                        // 接收Msg消息
	RecvRpc(rreq *xproto.RpcReq, rresp *xproto.RpcResp) // 接收Rpc消息
	RecvCgi(creq *xproto.CgiReq, cresp *xproto.CgiResp) // 接收Cgi消息
}

服务接口

type LanCfg

type LanCfg struct {
	Name  string   `json:"name"`                 // 线路名称(必要)
	Addr  string   `json:"addr"`                 // 线路地址(可选:固定地址/动态分配,开发阶段应固定端口)
	Ctrl  string   `json:"ctrl"`                 // 运维地址(可选:固定地址/动态分配)
	GO    int      `json:"go" default:"1"`       // 逻辑线程数(可选,默认:1)
	MaxRx int      `json:"maxrx" default:"4096"` // 最大接收字节数(可选,单位:KB,默认:4096)
	Msg   string   `json:"msg" default:"pb"`     // msg消息协议类型(可选:pb/json,默认:pb)
	Cgi   string   `json:"cgi" default:"json"`   // cgi消息协议类型(可选:pb/json,默认:json)
	Link  []string `json:"link"`                 // 关联线路(可选)
}

线路配置

func (*LanCfg) ID added in v1.0.1

func (this *LanCfg) ID() string

线路ID(name@ip:port)

func (*LanCfg) IP

func (this *LanCfg) IP() string

线路IP

func (*LanCfg) Port

func (this *LanCfg) Port() int

线路端口

func (*LanCfg) Resolve added in v1.0.1

func (this *LanCfg) Resolve() (id string, ip string, port int)

线路解析

id: 线路ID(name@ip:port)
ip: 线路IP
port: 线路端口

type LanClt

type LanClt struct {
	Sockets []mangos.Socket // Socket连接池
	ID      string          // 线路ID
	Addr    string          // 线路地址
}

线路连接

func NewLanClt

func NewLanClt(id string, addr string) *LanClt

新建线路连接

id: 线路ID
addr: 线路地址

func (*LanClt) Close

func (this *LanClt) Close()

关闭连接

func (*LanClt) Send

func (this *LanClt) Send(bytes []byte, idx int) error

发送数据

bytes: 数据
idx: 连接索引

type LanSvr

type LanSvr struct {
	*LanCfg
	mangos.Socket
	Clients  sync.Map // 连接池(map[string][]*LanClt)
	ClientID sync.Map // 连接映射(map[string]*LanClt)
	SClosed  bool     // 是否关闭
}

线路服务

var (
	GLan  *LanSvr // 全局线路服务
	GProc []*Proc // 全局业务处理器
)

func NewLanSvr

func NewLanSvr(cfg *LanCfg) *LanSvr

新建线路服务

cfg: 线路配置

func (*LanSvr) Close

func (this *LanSvr) Close()

线路关闭

func (*LanSvr) Recv

func (this *LanSvr) Recv() ([]byte, error)

线路接收

func (*LanSvr) SelectAll

func (this *LanSvr) SelectAll(svr string) []*LanClt

选择所有指定类型的线路

svr: 服务类型

func (*LanSvr) SelectRand

func (this *LanSvr) SelectRand(svr string) *LanClt

随机选择指定类型的线路

svr: 服务类型

func (*LanSvr) SendData

func (this *LanSvr) SendData(svr string, bytes []byte, idx int) error

发送数据

svr: 服务类型
bytes: 数据
idx: 连接索引

func (*LanSvr) Update

func (this *LanSvr) Update(smap map[string][]string)

路由更新

smap: 路由表

type MsgFunc

type MsgFunc func(mreq *xproto.MsgReq)

Msg函数类型

func (MsgFunc) Handle

func (this MsgFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type MsgRoute

type MsgRoute struct {
	Route
}

Msg路由

type Proc

type Proc struct {
	TID   int64              // 线路的GoID
	Num   int                // 线路线程总数
	CIN   chan xproto.IFrame // 输入队列
	COUT  chan xproto.IFrame // 输出队列
	Loop  bool               // 循环标识
	Pause bool               // 暂停标识
	Resp  sync.Map           // map[int64]chan *xproto.RpcReq/*xproto.CgiFrame

}

业务处理器

func NewProc

func NewProc() *Proc

新建业务处理器

func (*Proc) MaxID

func (this *Proc) MaxID() int64

自增ID

func (*Proc) PopCIN

func (this *Proc) PopCIN() (xproto.IFrame, bool)

弹出第一个输入网络帧

func (*Proc) PushCIN

func (this *Proc) PushCIN(frame xproto.IFrame) bool

压入一个输入网络帧

frame: 网络帧

type Route

type Route struct {
	ID   int      // 路由ID
	Name string   // 路由名称
	GoL  int      // 协程ID(左)
	GoR  int      //协程ID(右)
	RW   bool     // 可读可写(默认true)
	Log  int      // 日志层级(参考xlog的LogLevel)
	Dst  []string // 目标
}

路由信息

func (*Route) GetLog

func (this *Route) GetLog() int

获取日志层级,若未指定则使用全局日志层级

type RpcFunc

type RpcFunc func(rreq *xproto.RpcReq, rresp *xproto.RpcResp)

Rpc函数类型

func (RpcFunc) Handle

func (this RpcFunc) Handle(reply *xevt.EvtReply, param1 interface{}, param2 interface{})

处理回调

reply: 响应对象
param1: 参数1
param2: 参数2

type RpcRoute

type RpcRoute struct {
	Route
}

Rpc路由

type Server

type Server struct {
	xobj.OBJECT
	REAL  IServer
	Cfg   *SvrCfg // 配置信息
	FPS   int     // 应用帧率
	Title string  // 应用标题
}

服务对象

func (*Server) CTOR

func (this *Server) CTOR(CHILD interface{})

构造函数

func (*Server) Destroy

func (this *Server) Destroy()

服务结束

func (*Server) GetCfg added in v1.0.1

func (this *Server) GetCfg() *SvrCfg

获取配置

func (*Server) GetEnv added in v1.0.1

func (this *Server) GetEnv() string

服务环境

func (*Server) GetFPS

func (this *Server) GetFPS() int

获取帧率

func (*Server) GetTitle

func (this *Server) GetTitle() string

获取标题

func (*Server) Init

func (_this *Server) Init(cfg *SvrCfg) bool

初始化

func (*Server) Name

func (this *Server) Name() string

服务名称

func (*Server) PreQuit

func (_this *Server) PreQuit()

func (*Server) RecvCgi

func (this *Server) RecvCgi(rreq *xproto.CgiReq, rresp *xproto.CgiResp)

接收Cgi消息

func (*Server) RecvMsg

func (this *Server) RecvMsg(mreq *xproto.MsgReq)

func (*Server) RecvRpc

func (this *Server) RecvRpc(rreq *xproto.RpcReq, rresp *xproto.RpcResp)

接收Rpc消息

func (*Server) SetTitle added in v1.0.1

func (this *Server) SetTitle() string

func (*Server) Start

func (this *Server) Start()

服务启动

func (*Server) Update

func (this *Server) Update(delta float32)

type SvrCfg

type SvrCfg struct {
	Raw   []byte                    `json:"-"`               // 原始数据
	Env   string                    `json:"env"`             // 环境标识(必要:dev[开发]/test[内测]/beta[公测]/live[线上])
	Csl   *CslCfg                   `json:"csl"`             // 中心配置(必要)
	Lan   *LanCfg                   `json:"lan"`             // 线路配置(必要)
	Log   map[string]*xlog.LogCfg   `json:"log"`             // 日志配置(必要)
	Redis *xorm.RedisCfg            `json:"redis,omitempty"` // Redis配置(可选)
	Mysql map[string]*xorm.MysqlCfg `json:"mysql,omitempty"` // Mysql配置(可选)
}

服务配置

var (
	GCfg *SvrCfg // 全局配置
)

func (*SvrCfg) Init

func (this *SvrCfg) Init() bool

初始化

type TimerEntity

type TimerEntity struct {
	ID      int         // 定时器ID
	Func    func()      // 定时器回调
	Time    int         // 定时时间
	RawTime int         // 初始时间
	Repeat  bool        // 循环调用
	Crash   bool        // 是否崩溃
	RW      bool        // 是否读写
	Tag     interface{} // 日志标签
	Log     int         // 日志层级
}

定时器对象

func RunInNext

func RunInNext(fun func()) *TimerEntity

在当前逻辑线程中的下一帧调用

fun: 回调函数

func SetInterval

func SetInterval(fun func(), interval float32, tid ...int64) *TimerEntity

设置间歇调用(务必在逻辑线程中调用或指定线程ID)

fun: 回调函数
interval: 间歇时间(秒)
tid: 线程ID

func SetTimeout

func SetTimeout(fun func(), timeout float32, tid ...int64) *TimerEntity

设置超时调用(务必在逻辑线程中调用或指定线程ID)

fun: 回调函数
timeout: 超时时间(秒)
tid: 线程ID

func (*TimerEntity) SetLog

func (this *TimerEntity) SetLog(log int) *TimerEntity

设置会话的日志层级

func (*TimerEntity) SetRW

func (this *TimerEntity) SetRW(sig bool) *TimerEntity

设置会话的可读性(默认为可读可写)

func (*TimerEntity) SetTag

func (this *TimerEntity) SetTag(tag interface{}) *TimerEntity

设置会话的标签

type TimerRecord

type TimerRecord struct {
	Timers   sync.Map // 定时器映射
	TimerID  int64    // 定时器ID
	LastTime int      // 上次时间
}

定时器句柄

func (*TimerRecord) MaxID

func (this *TimerRecord) MaxID() int64

自增ID

type Wrap

type Wrap struct {
	Svr    IServer
	ChQuit chan bool // 阻塞chan
}

服务封装器

func NewWrap

func NewWrap(server IServer) *Wrap

新建服务封装器

server: 服务对象

func (*Wrap) Destroy

func (this *Wrap) Destroy()

销毁

func (*Wrap) Init

func (this *Wrap) Init() bool

初始化

func (*Wrap) Run

func (this *Wrap) Run()

运行

func (*Wrap) Stop

func (this *Wrap) Stop()

停止

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL