origin

module
v2.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: Apache-2.0

README

origin 游戏服务器引擎简介

origin 是一个由 Go 语言(golang)编写的分布式开源游戏服务器引擎。origin适用于各类游戏服务器的开发,包括 H5(HTML5)游戏服务器。

origin 解决的问题:

  • origin总体设计如go语言设计一样,总是尽可能的提供简洁和易用的模式,快速开发。
  • 能够根据业务需求快速并灵活的制定服务器架构。
  • 利用多核优势,将不同的service配置到不同的node,并能高效的协同工作。
  • 将整个引擎抽象三大对象,node,service,module。通过统一的组合模型管理游戏中各功能模块的关系。
  • 有丰富并健壮的工具库。

Hello world!

下面我们来一步步的建立origin服务器,先下载origin引擎,或者使用如下命令:

go get -v -u  github.com/duanhf2012/origin/v2

建议使用origin v2版本(import引用中都加入v2)

README.md 于是下载到GOPATH环境目录中,在src中加入main.go,内容如下:

package main

import (
    "github.com/duanhf2012/origin/v2/node"
)

func main() {
    node.Start()
}

以上只是基础代码,具体运行参数和配置请参照第一章节。

一个origin进程需要创建一个node对象,Start开始运行。您也可以直接下载origin引擎示例:

go get -v -u github.com/duanhf2012/originserver_v2

本文所有的说明都是基于该示例为主。

origin引擎三大对象关系

  • Node: 可以认为每一个Node代表着一个origin进程
  • Service:一个独立的服务可以认为是一个大的功能模块,他是Node的子集,创建完成并安装Node对象中。服务可以支持对外部RPC等功能。
  • Module: 这是origin最小对象单元,强烈建议所有的业务模块都划分成各个小的Module组合,origin引擎将监控所有服务与Module运行状态,例如可以监控它们的慢处理和死循环函数。Module可以建立树状关系。Service本身也是Module的类型。

origin配置说明

origin的配置文件以json格式,主要包含Discovery、RpcMode、NodeList、Service部分,具体格式如下:

{
    "Discovery":{},
    "RpcMode":{},
    "NodeList":[],
    "Service":{},
    "Global": {}
}

Discovery部分

origin目前支持etcd与origin自带的服务发现类型。

Etcd方式示例:

  "Discovery": {
    "Etcd": {
      "TTLSecond": 10,
      "DialTimeoutMillisecond": 3000,
      "EtcdList": [
        {
          "NetworkName": ["network1"],
          "Endpoints": ["http://192.168.13.24:12379"]
        }
      ]
    }
  }

TTLSecond:表示健康检查TTL失效时间10秒

DialTimeoutMillisecond: 与etcd连接超时时间

EtcdList:Etcd列表,可以多个Etcd服务器连接

NetworkName:所在的网络名称,可以配置多个。node会往对应的网络名称中注册、监听发现Service。NetworkName也起到发现隔离的作用。

Endpoints:Etcd服务器地址

Origin方式示例:

{
  "Discovery": {
    "Origin":{
      "TTLSecond": 10,
      "MasterNodeList": [
        {
          "NodeId": "test_1",
          "ListenAddr": "127.0.0.1:8801"
        }
      ]
    }
  }
}

TTLSecond:表示健康检查TTL失效时间10秒

MasterNodeList:指定哪些Node为服务发现Master结点,需要配置NodeId与ListenAddr,注意它们要与实际的Node配置一致。

RpcMode部分

默认模式

{
  "RpcMode":{
      "Type": "Default"
  }
}

默认模式下,origin的node之前通过tcp连接组网。

Nats模式

{
  "RpcMode":{
      "Type": "Nats",
      "remark": "support Default or Nats",
      "Nats": {
          "NatsUrl":"127.0.0.1:4222",
          "NoRandomize": true
      }
  }
}

NatsUrl:Nats连接url串

NoRandomize:在多连接集群模式下,连接nats节点是否顺序策略。false表示随机连接,true表示顺序连接。

NodeList部分

{
    "NodeList":[
        {
          "NodeId": "node_1",
          "Private": false,
          "ListenAddr":"127.0.0.1:8001",
          "MaxRpcParamLen": 409600,
          "CompressBytesLen": 20480,
          "remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
          "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","_TcpService","HttpService","WSService"]
        },
        {
          "NodeId": "node_2",
          "Private": false,
          "ListenAddr":"127.0.0.1:8002",
          "MaxRpcParamLen": 409600,
          "CompressBytesLen": 20480,
          "remark":"//以_打头的,表示只在本机进程,不对整个子网公开",
          "ServiceList": ["TestService1","TestService2","TestServiceCall","GateService","TcpService","HttpService","WSService"]
        }
    ]

以上配置了两个结点服务器程序:

  • NodeId: 表示origin程序的结点Id标识,同一个服务发现网络中不允许重复。
  • Private: 是否私有结点,如果为true,表示其他结点不会发现它,但可以自我运行。
  • ListenAddr:Rpc通信服务的监听地址
  • MaxRpcParamLen:Rpc参数数据包最大长度,该参数可以缺省,默认一次Rpc调用支持最大4294967295byte长度数据。
  • CompressBytesLen:Rpc网络数据压缩,当数据>=20480byte时将被压缩。该参数可以缺省或者填0时不进行压缩。
  • remark:备注,可选项
  • ServiceList:该Node拥有的服务列表,注意:origin按配置的顺序进行安装初始化。但停止服务的顺序是相反。

在启动程序命令originserver -start nodeid="node_1"中nodeid就是根据该配置装载服务。 更多参数使用,请使用originserver -help查看。

Service 部分

service.json如下:

{
  "Service":{
      "HttpService":{
        "ListenAddr":"0.0.0.0:9402",
        "ReadTimeout":10000,
        "WriteTimeout":10000,
        "ProcessTimeout":10000,
        "ManualStart": false,
        "CAFile":[
        {
          "Certfile":"",
            "Keyfile":""
        }]
    
      },
      "TcpService":{
        "ListenAddr":"0.0.0.0:9030",
        "MaxConnNum":3000,
        "PendingWriteNum":10000,
        "LittleEndian":false,
        "MinMsgLen":4,
        "MaxMsgLen":65535
      },
      "WSService":{
        "ListenAddr":"0.0.0.0:9031",
        "MaxConnNum":3000,
        "PendingWriteNum":10000,
        "MaxMsgLen":65535
      }  
  },
  "NodeService":[
   {
      "NodeId":1,
      "TcpService":{
        "ListenAddr":"0.0.0.0:9830",
        "MaxConnNum":3000,
        "PendingWriteNum":10000,
        "LittleEndian":false,
        "MinMsgLen":4,
        "MaxMsgLen":65535
      },
      "WSService":{
        "ListenAddr":"0.0.0.0:9031",
        "MaxConnNum":3000,
        "PendingWriteNum":10000,
        "MaxMsgLen":65535
      }  
   },
   {
      "NodeId":2,
      "TcpService":{
        "ListenAddr":"0.0.0.0:9030",
        "MaxConnNum":3000,
        "PendingWriteNum":10000,
        "LittleEndian":false,
        "MinMsgLen":4,
        "MaxMsgLen":65535
      },
      "WSService":{
        "ListenAddr":"0.0.0.0:9031",
        "MaxConnNum":3000,
        "PendingWriteNum":10000,
        "MaxMsgLen":65535
      }  
   }
  ]
 
}

以上配置分为两个部分:Global,Service与NodeService。Global是全局配置,在任何服务中都可以通过cluster.GetCluster().GetGlobalCfg()获取,NodeService中配置的对应结点中服务的配置,如果启动程序中根据nodeid查找该域的对应的服务,如果找不到时,从Service公共部分查找。

HttpService配置

  • ListenAddr:Http监听地址
  • ReadTimeout:读网络超时毫秒
  • WriteTimeout:写网络超时毫秒
  • ProcessTimeout: 处理超时毫秒
  • ManualStart: 是否手动控制开始监听,如果true,需要手动调用StartListen()函数
  • CAFile: 证书文件,如果您的服务器通过web服务器代理配置https可以忽略该配置

TcpService配置

  • ListenAddr: 监听地址
  • MaxConnNum: 允许最大连接数
  • PendingWriteNum:发送网络队列最大数量
  • LittleEndian:是否小端
  • MinMsgLen:包最小长度
  • MaxMsgLen:包最大长度

WSService配置

  • ListenAddr: 监听地址
  • MaxConnNum: 允许最大连接数
  • PendingWriteNum:发送网络队列最大数量
  • MaxMsgLen:包最大长度

Global部分

{
  "Global": {
    "AreaId": 1
  }
}

这部分,在所有的服务中都可以类似以下代码获取:

globalCfg := cluster.GetCluster().GetGlobalCfg()
mapGlobal, ok := globalCfg.(map[string]interface{})
if ok == false {
    return fmt.Errorf("Canot find Global from config.")
}

areaId, ok := mapGlobal["AreaId"]

第一章:origin基础:

查看github.com/duanhf2012/originserver_v2中的simple_service中新建两个服务,分别是TestService1.go与CTestService2.go。

simple_service/TestService1.go如下:

package simple_service

import (
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
)

//模块加载时自动安装TestService1服务
func init(){
    node.Setup(&TestService1{})
}

//新建自定义服务TestService1
type TestService1 struct {
    //所有的自定义服务必需加入service.Service基服务
    //那么该自定义服务将有各种功能特性
    //例如: Rpc,事件驱动,定时器等
    service.Service
}

//服务初始化函数,在安装服务时,服务将自动调用OnInit函数
func (slf *TestService1) OnInit() error {
    return nil
}


simple_service/TestService2.go如下:

import (
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
)

func init(){
    node.Setup(&TestService2{})
}

type TestService2 struct {
    service.Service
}

func (slf *TestService2) OnInit() error {
    return nil
}


  • main.go运行代码
package main

import (
    "github.com/duanhf2012/origin/v2/node"
    //导入simple_service模块
    _"orginserver/simple_service"
)

func main(){
    node.Start()
}

  • config/cluster/cluster.json如下:
{
    "NodeList":[
        {
          "NodeId": "nodeid_1",
          "Private": false,
          "ListenAddr":"127.0.0.1:8001",
          "remark":"//以_打头的,表示只在本机进程,不对整个子网开发",
          "ServiceList": ["TestService1","TestService2"]
        }
    ]
}

编译后运行结果如下:

#originserver -start nodeid="nodeid_1"
TestService1 OnInit.
TestService2 OnInit.

第二章:Service中常用功能:

定时器:

在开发中最常用的功能有定时任务,origin提供两种定时方式:

一种AfterFunc函数,可以间隔一定时间触发回调,参照simple_service/TestService2.go,实现如下:

func (slf *TestService2) OnInit() error {
    fmt.Printf("TestService2 OnInit.\n")
    slf.AfterFunc(time.Second*1,slf.OnSecondTick)
    return nil
}

func (slf *TestService2) OnSecondTick(){
    fmt.Printf("tick.\n")
    slf.AfterFunc(time.Second*1,slf.OnSecondTick)
}

此时日志可以看到每隔1秒钟会print一次"tick.",如果下次还需要触发,需要重新设置定时器

另一种方式是类似Linux系统的crontab命令,使用如下:


func (slf *TestService2) OnInit() error {
    fmt.Printf("TestService2 OnInit.\n")

    //crontab模式定时触发
    //NewCronExpr的参数分别代表:Seconds Minutes Hours DayOfMonth Month DayOfWeek
    //以下为每换分钟时触发
    cron,_:=timer.NewCronExpr("0 * * * * *")
    slf.CronFunc(cron,slf.OnCron)
    return nil
}


func (slf *TestService2) OnCron(cron *timer.Cron){
    fmt.Printf(":A minute passed!\n")
}

以上运行结果每换分钟时打印:A minute passed!

打开多协程模式:

在origin引擎设计中,所有的服务是单协程模式,这样在编写逻辑代码时,不用考虑线程安全问题。极大的减少开发难度,但某些开发场景下不用考虑这个问题,而且需要并发执行的情况,比如,某服务只处理数据库操作控制,而数据库处理中发生阻塞等待的问题,因为一个协程,该服务接受的数据库操作只能是一个 一个的排队处理,效率过低。于是可以打开此模式指定处理协程数,代码如下:

func (slf *TestService1) OnInit() error {
    fmt.Printf("TestService1 OnInit.\n")

    //打开多线程处理模式,10个协程并发处理
    slf.SetGoRoutineNum(10)
    return nil
}

性能监控功能:

我们在开发一个大型的系统时,经常由于一些代码质量的原因,产生处理过慢或者死循环的产生,该功能可以被监测到。使用方法如下:

func (slf *TestService1) OnInit() error {
    fmt.Printf("TestService1 OnInit.\n")
    //打开性能分析工具
    slf.OpenProfiler()
    //监控超过1秒的慢处理
    slf.GetProfiler().SetOverTime(time.Second*1)
    //监控超过10秒的超慢处理,您可以用它来定位是否存在死循环
    //比如以下设置10秒,我的应用中是不会发生超过10秒的一次函数调用
    //所以设置为10秒。
    slf.GetProfiler().SetMaxOverTime(time.Second*10)

    slf.AfterFunc(time.Second*2,slf.Loop)
    //打开多线程处理模式,10个协程并发处理
    //slf.SetGoRoutineNum(10)
    return nil
}

func (slf *TestService1) Loop(){
    for {
        time.Sleep(time.Second*1)
    }
}


func main(){
    //打开性能分析报告功能,并设置10秒汇报一次
    node.OpenProfilerReport(time.Second*10)
    node.Start()
}

上面通过GetProfiler().SetOverTime与slf.GetProfiler().SetMaxOverTimer设置监控时间 并在main.go中,打开了性能报告器,以每10秒汇报一次,因为上面的例子中,定时器是有死循环,所以可以得到以下报告:

2020/04/22 17:53:30 profiler.go:179: [release] Profiler report tag TestService1: process count 0,take time 0 Milliseconds,average 0 Milliseconds/per. too slow process:Timer_orginserver/simple_service.(*TestService1).Loop-fm is take 38003 Milliseconds 直接帮助找到TestService1服务中的Loop函数

结点连接和断开事件监听:

在有些业务中需要关注某结点是否断开连接,可以注册回调如下:

func (ts *TestService) OnInit() error{
    ts.RegRpcListener(ts)

    return nil
}

func (ts *TestService) OnNodeConnected(nodeId int){
}

func (ts *TestService) OnNodeDisconnect(nodeId int){
}

第三章:Module使用:

Module创建与销毁:

可以认为Service就是一种Module,它有Module所有的功能。在示例代码中可以参考originserver_v2/simple_module/TestService3.go。

package simple_module

import (
    "fmt"
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
)

func init(){
    node.Setup(&TestService3{})
}

type TestService3 struct {
    service.Service
}

type Module1 struct {
    service.Module
}

type Module2 struct {
    service.Module
}

func (slf *Module1) OnInit()error{
    fmt.Printf("Module1 OnInit.\n")
    return nil
}

func (slf *Module1) OnRelease(){
    fmt.Printf("Module1 Release.\n")
}

func (slf *Module2) OnInit()error{
    fmt.Printf("Module2 OnInit.\n")
    return nil
}

func (slf *Module2) OnRelease(){
    fmt.Printf("Module2 Release.\n")
}


func (slf *TestService3) OnInit() error {
    //新建两个Module对象
    module1 := &Module1{}
    module2 := &Module2{}
    //将module1添加到服务中
    module1Id,_ := slf.AddModule(module1)
    //在module1中添加module2模块
    module1.AddModule(module2)
    fmt.Printf("module1 id is %d, module2 id is %d",module1Id,module2.GetModuleId())

    //释放模块module1
    slf.ReleaseModule(module1Id)
    fmt.Printf("xxxxxxxxxxx")
    return nil
}

在OnInit中创建了一条线型的模块关系TestService3->module1->module2,调用AddModule后会返回Module的Id,自动生成的Id从10e17开始,内部的id,您可以自己设置Id。当调用ReleaseModule释放时module1时,同样会将module2释放。会自动调用OnRelease函数,日志顺序如下:

Module1 OnInit.
Module2 OnInit.
module1 id is 100000000000000001, module2 id is 100000000000000002
Module2 Release.
Module1 Release.

在Module中同样可以使用定时器功能,请参照第二章节的定时器部分。

第四章:事件使用

事件是origin中一个重要的组成部分,可以在同一个node中的service与service或者与module之间进行事件通知。系统内置的几个服务,如:TcpService/HttpService等都是通过事件功能实现。他也是一个典型的观察者设计模型。在event中有两个类型的interface,一个是event.IEventProcessor它提供注册与卸载功能,另一个是event.IEventHandler提供消息广播等功能。

在目录simple_event/TestService4.go中

package simple_event

import (
    "github.com/duanhf2012/origin/v2/event"
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
    "time"
)

const (
    //自定义事件类型,必需从event.Sys_Event_User_Define开始
    //event.Sys_Event_User_Define以内给系统预留
    EVENT1 event.EventType =event.Sys_Event_User_Define+1
)

func init(){
    node.Setup(&TestService4{})
}

type TestService4 struct {
    service.Service
}

func (slf *TestService4) OnInit() error {
    //10秒后触发广播事件
    slf.AfterFunc(time.Second*10,slf.TriggerEvent)
    return nil
}

func (slf *TestService4) TriggerEvent(){
    //广播事件,传入event.Event对象,类型为EVENT1,Data可以自定义任何数据
    //这样,所有监听者都可以收到该事件
    slf.GetEventHandler().NotifyEvent(&event.Event{
        Type: EVENT1,
        Data: "event data.",
    })
}


在目录simple_event/TestService5.go中

package simple_event

import (
    "fmt"
    "github.com/duanhf2012/origin/v2/event"
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
)

func init(){
    node.Setup(&TestService5{})
}

type TestService5 struct {
    service.Service
}

type TestModule struct {
    service.Module
}

func (slf *TestModule) OnInit() error{
    //在当前node中查找TestService4
    pService := node.GetService("TestService4")

    //在TestModule中,往TestService4中注册EVENT1类型事件监听
    pService.(*TestService4).GetEventProcessor().RegEventReciverFunc(EVENT1,slf.GetEventHandler(),slf.OnModuleEvent)
    return nil
}

func (slf *TestModule) OnModuleEvent(ev event.IEvent){
    event := ev.(*event.Event)
    fmt.Printf("OnModuleEvent type :%d data:%+v\n",event.GetEventType(),event.Data)
}


//服务初始化函数,在安装服务时,服务将自动调用OnInit函数
func (slf *TestService5) OnInit() error {
    //通过服务名获取服务对象
    pService := node.GetService("TestService4")

    ////在TestModule中,往TestService4中注册EVENT1类型事件监听
    pService.(*TestService4).GetEventProcessor().RegEventReciverFunc(EVENT1,slf.GetEventHandler(),slf.OnServiceEvent)
    slf.AddModule(&TestModule{})
    return nil
}

func (slf *TestService5) OnServiceEvent(ev event.IEvent){
    event := ev.(*event.Event)
    fmt.Printf("OnServiceEvent type :%d data:%+v\n",event.Type,event.Data)
}


程序运行10秒后,调用slf.TriggerEvent函数广播事件,于是在TestService5中会收到

OnServiceEvent type :1001 data:event data.
OnModuleEvent type :1001 data:event data.

在上面的TestModule中监听的事情,当这个Module被Release时监听会自动卸载。

第五章:RPC使用

RPC是service与service间通信的重要方式,它允许跨进程node互相访问,当然也可以指定nodeid进行调用。如下示例:

simple_rpc/TestService6.go文件如下:

package simple_rpc

import (
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
)

func init(){
    node.Setup(&TestService6{})
}

type TestService6 struct {
    service.Service
}

func (slf *TestService6) OnInit() error {
    return nil
}

type InputData struct {
    A int
    B int
}

// 注意RPC函数名的格式必需为RPC_FunctionName或者是RPCFunctionName,如下的RPC_Sum也可以写成RPCSum
func (slf *TestService6) RPC_Sum(input *InputData,output *int) error{
    *output = input.A+input.B
    return nil
}

simple_rpc/TestService7.go文件如下:

package simple_rpc

import (
    "fmt"
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
    "time"
)

func init(){
    node.Setup(&TestService7{})
}

type TestService7 struct {
    service.Service
}

func (slf *TestService7) OnInit() error {
    slf.AfterFunc(time.Second*2,slf.CallTest)
    slf.AfterFunc(time.Second*2,slf.AsyncCallTest)
    slf.AfterFunc(time.Second*2,slf.GoTest)
    return nil
}

func (slf *TestService7) CallTest(){
    var input InputData
    input.A = 300
    input.B = 600
    var output int

    //同步调用其他服务的rpc,input为传入的rpc,output为输出参数
    err := slf.Call("TestService6.RPC_Sum",&input,&output)
    if err != nil {
        fmt.Printf("Call error :%+v\n",err)
    }else{
        fmt.Printf("Call output %d\n",output)
    }
    
    
    //自定义超时,默认rpc超时时间为15s,以下设置1秒钟超过
    err = slf.CallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, &output)
    if err != nil {
        fmt.Printf("Call error :%+v\n", err)
    } else {
        fmt.Printf("Call output %d\n", output)
    }
}


func (slf *TestService7) AsyncCallTest(){
    var input InputData
    input.A = 300
    input.B = 600
    /*slf.AsyncCallNode(1,"TestService6.RPC_Sum",&input,func(output *int,err error){
    })*/
    //异步调用,在数据返回时,会回调传入函数
    //注意函数的第一个参数一定是RPC_Sum函数的第二个参数,err error为RPC_Sum返回值
    err := slf.AsyncCall("TestService6.RPC_Sum", &input, func(output *int, err error) {
        if err != nil {
            fmt.Printf("AsyncCall error :%+v\n", err)
        } else {
            fmt.Printf("AsyncCall output %d\n", *output)
        }
    })
    fmt.Println(err)

    //自定义超时,返回一个cancel函数,可以在业务需要时取消rpc调用
    rpcCancel, err := slf.AsyncCallWithTimeout(time.Second*1, "TestService6.RPC_Sum", &input, func(output *int, err error) {
        //如果下面注释的rpcCancel()函数被调用,这里可能将不再返回
        if err != nil {
            fmt.Printf("AsyncCall error :%+v\n", err)
        } else {
            fmt.Printf("AsyncCall output %d\n", *output)
        }
    })
    //rpcCancel()
    fmt.Println(err, rpcCancel)
    
}

func (slf *TestService7) GoTest(){
    var input InputData
    input.A = 300
    input.B = 600

    //在某些应用场景下不需要数据返回可以使用Go,它是不阻塞的,只需要填入输入参数
    err := slf.Go("TestService6.RPC_Sum",&input)
    if err != nil {
        fmt.Printf("Go error :%+v\n",err)
    }

    //以下是广播方式,如果在同一个子网中有多个同名的服务名,CastGo将会广播给所有的node
    //slf.CastGo("TestService6.RPC_Sum",&input)
}

您可以把TestService6配置到其他的Node中,比如NodeId为2中。只要在一个子网,origin引擎可以无差别调用。开发者只需要关注Service关系。同样它也是您服务器架构设计的核心需要思考的部分。

第六章:并发函数调用

在开发中经常会有将某些任务放到其他协程中并发执行,执行完成后,将服务的工作线程去回调。使用方式很简单,先打开该功能如下代码:

    //以下通过cpu数量来定开启协程并发数量,建议:(1)cpu密集型计算使用1.0  (2)i/o密集型使用2.0或者更高
    slf.OpenConcurrentByNumCPU(1.0)
    
    //以下通过函数打开并发协程数,以下协程数最小5,最大10,任务管道的cap数量1000000
    //origin会根据任务的数量在最小与最大协程数间动态伸缩
    //slf.OpenConcurrent(5, 10, 1000000)

使用示例如下:


func (slf *TestService13) testAsyncDo() {
    var context struct {
        data int64
    }

    //1.示例普通使用
    //参数一的函数在其他协程池中执行完成,将执行完成事件放入服务工作协程,
    //参数二的函数在服务协程中执行,是协程安全的。
    slf.AsyncDo(func() bool {
        //该函数回调在协程池中执行
        context.data = 100
        return true
    }, func(err error) {
        //函数将在服务协程中执行
        fmt.Print(context.data) //显示100
    })

    //2.示例按队列顺序
    //参数一传入队列Id,同一个队列Id将在协程池中被排队执行
    //以下进行两次调用,因为两次都传入参数queueId都为1,所以它们会都进入queueId为1的排队执行
    queueId := int64(1)
    for i := 0; i < 2; i++ {
        slf.AsyncDoByQueue(queueId, func() bool {
            //该函数会被2次调用,但是会排队执行
            return true
        }, func(err error) {
            //函数将在服务协程中执行
        })
    }

    //3.函数参数可以某中一个为空
    //参数二函数将被延迟执行
    slf.AsyncDo(nil, func(err error) {
        //将在下
    })

    //参数一函数在协程池中执行,但没有在服务协程中回调
    slf.AsyncDo(func() bool {
        return true
    }, nil)

    //4.函数返回值控制不进行回调
    slf.AsyncDo(func() bool {
        //返回false时,参数二函数将不会被执行; 为true时,则会被执行
        return false
    }, func(err error) {
        //该函数将不会被执行
    })
}

第七章:服务发现

origin引擎默认使用读取所有结点配置的进行确认结点有哪些Service。引擎也支持动态服务发现的方式,支持etcd与origin类型,具体请参照【配置说明】部分,Node结点可以配置只发现某些服务,如下示例:

{
    "NodeList": [{
        "NodeId": "nodeid_test",
        "ListenAddr": "127.0.0.1:8801",
        "MaxRpcParamLen": 409600,
        "Private": false,
        "remark": "//以_打头的,表示只在本机进程,不对整个子网开发",
        "ServiceList": ["_TestService1", "TestService9", "TestService10"],
        "DiscoveryService": [
            {
                "MasterNodeId": "nodeid_1",
                "NetworkName":"networkname1"
                "DiscoveryService": ["TestService8"]
            }
        ]
    }]
}

DiscoveryService:在当前nodeid为nodeid_test的结点中,只发现 MasterNodeId为nodeid_1或NetworkName为networkname1网络中的TestService8服务。

注意:MasterNodeId与NetworkName只配置一个,分别在模式为origin或者etcd服务发现类型时。

第八章:HttpService使用

HttpService是origin引擎中系统实现的http服务,http接口中常用的GET,POST以及url路由处理。

simple_http/TestHttpService.go文件如下:

package simple_http

import (
    "fmt"
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
    "github.com/duanhf2012/origin/v2/sysservice"
    "net/http"
)

func init(){
    node.Setup(&sysservice.HttpService{})
    node.Setup(&TestHttpService{})
}

//新建自定义服务TestService1
type TestHttpService struct {
    service.Service
}

func (slf *TestHttpService) OnInit() error {
    //获取系统httpservice服务
    httpservice := node.GetService("HttpService").(*sysservice.HttpService)

    //新建并设置路由对象
    httpRouter := sysservice.NewHttpHttpRouter()
    httpservice.SetHttpRouter(httpRouter,slf.GetEventHandler())

    //GET方法,请求url:http://127.0.0.1:9402/get/query?nickname=boyce
    //并header中新增key为uid,value为1000的头,则用postman测试返回结果为:
    //head uid:1000, nickname:boyce
    httpRouter.GET("/get/query", slf.HttpGet)

    //POST方法 请求url:http://127.0.0.1:9402/post/query
    //返回结果为:{"msg":"hello world"}
    httpRouter.POST("/post/query", slf.HttpPost)

    //GET方式获取目录下的资源,http://127.0.0.1:port/img/head/a.jpg
    httpRouter.SetServeFile(sysservice.METHOD_GET,"/img/head/","d:/img")

    //如果配置"ManualStart": true配置为true,则使用以下方法进行开启http监听
    //httpservice.StartListen()
    return nil
}

func (slf *TestHttpService) HttpGet(session *sysservice.HttpSession){
    //从头中获取key为uid对应的值
    uid := session.GetHeader("uid")
    //从url参数中获取key为nickname对应的值
    nickname,_ := session.Query("nickname")
    //向body部分写入数据
    session.Write([]byte(fmt.Sprintf("head uid:%s, nickname:%s",uid,nickname)))
    //写入http状态
    session.WriteStatusCode(http.StatusOK)
    //完成返回
    session.Done()
}

type HttpRespone struct {
    Msg string `json:"msg"`
}

func (slf *TestHttpService) HttpPost(session *sysservice.HttpSession){
    //也可以采用直接返回数据对象方式,如下:
    session.WriteJsonDone(http.StatusOK,&HttpRespone{Msg: "hello world"})
}

注意,要在main.go中加入import _ "orginserver/simple_service",并且在config/cluster/cluster.json中的ServiceList加入服务。

第九章:TcpService服务使用

TcpService是origin引擎中系统实现的Tcp服务,可以支持自定义消息格式处理器。只要重新实现network.Processor接口。目前内置已经实现最常用的protobuf处理器。

simple_tcp/TestTcpService.go文件如下:

package simple_tcp

import (
    "fmt"
    "github.com/duanhf2012/origin/v2/network/processor"
    "github.com/duanhf2012/origin/v2/node"
    "github.com/duanhf2012/origin/v2/service"
    "github.com/duanhf2012/origin/v2/sysservice"
    "github.com/golang/protobuf/proto"
    "orginserver/simple_tcp/msgpb"
)

func init(){
    node.Setup(&sysservice.TcpService{})
    node.Setup(&TestTcpService{})
}

//新建自定义服务TestService1
type TestTcpService struct {
    service.Service
    processor *processor.PBProcessor
    tcpService *sysservice.TcpService
}

func (slf *TestTcpService) OnInit() error {
    //获取安装好了的TcpService对象
    slf.tcpService =  node.GetService("TcpService").(*sysservice.TcpService)

    //新建内置的protobuf处理器,您也可以自定义路由器,比如json,后续会补充
    slf.processor = processor.NewPBProcessor()

    //注册监听客户连接断开事件
    slf.processor.RegisterDisConnected(slf.OnDisconnected)
    //注册监听客户连接事件
    slf.processor.RegisterConnected(slf.OnConnected)
    //注册监听消息类型MsgType_MsgReq,并注册回调
    slf.processor.Register(uint16(msgpb.MsgType_MsgReq),&msgpb.Req{},slf.OnRequest)
    //将protobuf消息处理器设置到TcpService服务中
    slf.tcpService.SetProcessor(slf.processor,slf.GetEventHandler())

    return nil
}


func (slf *TestTcpService) OnConnected(clientid string){
    fmt.Printf("client id %s connected\n",clientid)
}


func (slf *TestTcpService) OnDisconnected(clientid string){
    fmt.Printf("client id %s disconnected\n",clientid)
}

func (slf *TestTcpService) OnRequest (clientid string,msg proto.Message){
    //解析客户端发过来的数据
    pReq := msg.(*msgpb.Req)
    //发送数据给客户端
    err := slf.tcpService.SendMsg(clientid,&msgpb.Req{
        Msg: proto.String(pReq.GetMsg()),
    })
    if err != nil {
        fmt.Printf("send msg is fail %+v!",err)
    }
}

第十章:其他系统模块介绍

  • sysservice/wsservice/:支持了WebSocket协议,使用方法与TcpService类似
  • sysservice/messagequeueservice/:自定义的消息队列
  • sysservice/rankservice/:排行榜服务,采用跳表数据结构实现
  • sysmodule/mysqlmodule/:对mysql数据库操作
  • sysmodule/redismodule/:对Redis数据进行操作
  • sysmodule/httpclientmodule/:Http客户端请求封装
  • sysmodule/ginmodule/:对gin模块的封装,支持服务协程处理
  • sysmodule/kafkamodule/:对kafka的封装
  • log/log.go:日志的封装,可以使用它构建对象记录业务文件日志
  • util:在该目录下,有常用的uuid,hash,md5,协程封装等工具库
  • https://github.com/duanhf2012/originservice: 其他扩展支持的服务可以在该工程上看到,目前支持firebase推送的封装。

备注:

感觉不错请star, 谢谢!

欢迎加入origin服务器开发QQ交流群:168306674,有任何疑问我都会及时解答

提交bug及特性: https://github.com/duanhf2012/origin/issues

因服务器是由个人维护,如果这个项目对您有帮助,您可以点我进行捐赠,感谢! 特别感谢以下赞助网友:

咕咕兽
_
死磕代码
bp-li
阿正
大头
Now'C

Directories

Path Synopsis
sysmodule
sysservice
util
algorithms/skip
Package skip defines a skiplist datastructure.
Package skip defines a skiplist datastructure.
md5
queue
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.
Package queue provides a fast, ring-buffer queue based on the version suggested by Dariusz Górecki.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL