cellnet

package module
v1.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2022 License: MIT Imports: 11 Imported by: 0

README

cellnetlogo

Build Status MIT licensed GoDoc

cellnet是一个组件化、高扩展性、高性能的开源服务器网络库

使用领域

cellnet经过多个版本的迭代,无论是作为初学者学习的范例,还是作为私用、商用项目的基础构建乃至核心技术层已经在业内广受了解及使用。

主要使用领域:

  • 游戏服务器

    方便定制私有协议,快速构建逻辑服务器、网关服务器、服务器间互联互通、对接第三方SDK、转换编码协议等

  • ARM设备

    设备间网络通讯

  • 证券软件

    内部RPC

特性 (Features)

传输协议支持

  • TCP

    TCP连接器的重连,侦听器的优雅重启。

  • UDP

    纯UDP裸包收发

  • HTTP(测试中)

    侦听器的优雅重启, 支持json及form的收发及封装。编写游戏服务器时,不再需要使用第三方HTTP服务器对接SDK。

    注: 如需要对接大规模网页, 请使用第三方专业网络库,如(https://github.com/gin-gonic/gin), cellnet的http支持主要目的在用统一的地址及peer管理

  • WebSocket

    采用(github.com/gorilla/websocket)实现

编码(Codec)

  • cellnet内建支持以下数据编码:

    可以通过codec包自行添加新的编码格式

  • 支持混合编码收发

    无需改动代码,只需调整消息注册方式,即可达成运行期同时收发不同编码的封包

    • 与其他语言编写的服务器使用protobuf

    • 与web服务器使用json通信

    • 与Unity3D(C#)使用ProtoPlus(github.com/davyxu/protoplus)协议通信

    优点:

    • 享受不同通信协议格式的优点,避免缺点。如Protobuf适合服务器间的传输,封包小,适应面广。

    • 私有的二进制协议方便加密和客户端处理,低GC和实现复杂度。

队列实现各种单线程/多线程,异步/同步业务处理模型

使用队列消息处理结构,方便实现以下效果:

  • 单线程异步逻辑,适用于MMORPG复杂交互,免加锁处理共享数据。

  • 多线程同步逻辑,适用于机器人逻辑,每个机器人使用独立的goroutine收发处理,机器人间互不干扰。

  • 多线程并发逻辑,适用于网关,消息转发,HTTP服务器,每条消息处理都会在完全并发下。

远程过程调用(RPC)

  • 支持同步RPC,适用于后台服务器向其他服务器请求数据后再顺处理事务。

  • 支持异步RPC,适用于单线程服务器逻辑。

消息日志

  • 可以方便的通过日志查看收发消息的每一个字段消息

cellnetlogo

获取+编译

编译和使用cellnet,请下载Go1.10以上版本

  go get -u -v github.com/szdartou/cellnet

  go get -u -v github.com/davyxu/golog

  go get -u -v github.com/davyxu/goobjfmt

  go get -u -v github.com/davyxu/protoplus

第三方库

cellnet 使用Protobuf时,需要使用附带的pb插件生成一个绑定代码,下面的链接可以处理这个问题 Google Protobuf 编码安装

WebSocket实现

架构

cellnet架构层次由如下图所示:

architecture

cellnet对Processor和Peer进行高度抽象,用户可以从这两个层面横向扩展以满足不同的网络封装需求

红色框内的部分为cellnet实现部分

消息处理流程

cellnet的消息收发处理流程如下图所示:

procflow

本图对应的接口为cellnet/processor.go

样例


const peerAddress = "127.0.0.1:17701"

// 服务器逻辑
func server() {

    // 创建服务器的事件队列,所有的消息,事件都会被投入这个队列处理
	queue := cellnet.NewEventQueue()

    // 创建一个服务器的接受器(Acceptor),接受客户端的连接
	peerIns := peer.NewGenericPeer("tcp.Acceptor", "server", peerAddress, queue)

    // 将接受器Peer与tcp.ltv的处理器绑定,并设置事件处理回调
    // tcp.ltv处理器负责处理消息收发,使用私有的封包格式以及日志,RPC等处理
	proc.BindProcessorHandler(peerIns, "tcp.ltv", func(ev cellnet.Event) {

        // 处理Peer收到的各种事件
		switch msg := ev.Message().(type) {
		case *cellnet.SessionAccepted: // 接受一个连接
			fmt.Println("server accepted")
		case *TestEchoACK: // 收到连接发送的消息

			fmt.Printf("server recv %+v\n", msg)

            // 发送回应消息
			ev.Session().Send(&TestEchoACK{
				Msg:   msg.Msg,
				Value: msg.Value,
			})

		case *cellnet.SessionClosed: // 会话连接断开
			fmt.Println("session closed: ", ev.Session().ID())
		}

	})

    // 启动Peer,服务器开始侦听
	peerIns.Start()

    // 开启事件队列,开始处理事件,此函数不阻塞
	queue.StartLoop()
}

// 模拟客户端逻辑
func client() {

    // 例子专用的完成标记
	done := make(chan struct{})

    // 创建客户端的事件处理队列
	queue := cellnet.NewEventQueue()

    // 创建客户端的连接器
	peerIns := peer.NewGenericPeer("tcp.Connector", "client", peerAddress, queue)

    // 将客户端连接器Peer与tcp.ltv处理器绑定,并设置接收事件回调
	proc.BindProcessorHandler(peerIns, "tcp.ltv", func(ev cellnet.Event) {

		switch msg := ev.Message().(type) {
		case *cellnet.SessionConnected: // 已经连接上
			fmt.Println("client connected")
			ev.Session().Send(&TestEchoACK{
				Msg:   "hello",
				Value: 1234,
			})
		case *TestEchoACK: //收到服务器发送的消息

			fmt.Printf("client recv %+v\n", msg)

			// 完成操作
			done <- struct{}{}

		case *cellnet.SessionClosed:
			fmt.Println("client closed")
		}
	})

    // 开启客户端Peer
	peerIns.Start()

    // 开启客户端队列处理
	queue.StartLoop()

	// 等待客户端收到消息
	<-done
}

目录功能

目录及功能一览

运行聊天例子

运行 服务器

cd examples/chat/server

go run main.go

运行 客户端

cd examples/chat/client

go run main.go

随后, 在命令行中输入hello后打回车, 就可以看到服务器返回


sid1 say: hello

基本概念及使用说明

理解下面链接中的概念,可以迅速使用cellnet做基本的网络通讯及消息处理

扩展及定制

若cellnet内建的Peer, Codec及Processor流程不能满足你的需求,可以阅读下面链接内容,添加并扩展cellnet功能

FAQ

常见问题及回答

这里应该有你想知道的答案

贡献者

按贡献时间排序,越靠前表示越新的贡献

superikw(https://github.com/superikw), 在v3中测试出一个websocket接口并发发送问题,wss支持,修复会话管理。

bruce.hu(https://github.com/hxdhero), 在v3中测试出一个竞态冲突的bug

M4tou(https://github.com/mutousay), 在v3中协助解决RPC异步超时回调处理

chuan.li(https://github.com/blade-226), 在v3中提供一个没有在io线程编码的bug

Chris Lonng(https://github.com/lonnng), 在v3中提供一个最大封包约束造成服务器间连接断开的bug

IronsDu(https://github.com/IronsDu), 在v2中大幅度性能优化

viwii(viwii@sina.cn), 在v2中,提供一个可能造成死锁的bug

版本历史

2018.5 v4版本 详细请查看

2017.8 v3版本 详细请查看

2017.1 v2版本 详细请查看

2015.8 v1版本

备注

感觉不错请star, 谢谢!

知乎: http://www.zhihu.com/people/sunicdavy

提交bug及特性: https://github.com/szdartou/cellnet/issues

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	OnlyOriginalData = false
)

Functions

func MessageMetaVisit

func MessageMetaVisit(nameRule string, callback func(meta *MessageMeta) bool) error

func MessageSize

func MessageSize(msg interface{}) int

func MessageToID

func MessageToID(msg interface{}) int

func MessageToName

func MessageToName(msg interface{}) string

消息名(没有包,纯类型名)

func MessageToString

func MessageToString(msg interface{}) string

func NewError

func NewError(s string) error

func NewErrorContext

func NewErrorContext(s string, context interface{}) error

func QueuedCall

func QueuedCall(queue EventQueue, callback func())

有队列时队列调用,无队列时直接调用

func SessionQueuedCall

func SessionQueuedCall(ses Session, callback func())

在会话对应的Peer上的事件队列中执行callback,如果没有队列,则马上执行

Types

type CapturePanicNotifyFunc

type CapturePanicNotifyFunc func(interface{}, EventQueue)

type CloseReason

type CloseReason int32
const (
	CloseReason_IO     CloseReason = iota // 普通IO断开
	CloseReason_Manual                    // 关闭前,调用过Session.Close
)

func (CloseReason) String

func (self CloseReason) String() string

type Codec

type Codec interface {
	// 将数据转换为字节数组
	Encode(msgObj interface{}, ctx ContextSet) (data interface{}, err error)

	// 将字节数组转换为数据
	Decode(data interface{}, msgObj interface{}) error

	// 编码器的名字
	Name() string

	// 兼容http类型
	MimeType() string
}

编码包

type ContextSet

type ContextSet interface {
	// 为对象设置一个自定义属性
	SetContext(key interface{}, v interface{})

	// 从对象上根据key获取一个自定义属性
	GetContext(key interface{}) (interface{}, bool)

	// 给定一个值指针, 自动根据值的类型GetContext后设置到值
	FetchContext(key, valuePtr interface{}) bool
}

设置和获取自定义属性

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (*Error) Error

func (self *Error) Error() string

type Event

type Event interface {

	// 事件对应的会话
	Session() Session

	// 事件携带的消息
	Message() interface{}
}

事件

type EventCallback

type EventCallback func(ev Event)

用户端处理

type EventHooker

type EventHooker interface {

	// 入站(接收)的事件处理
	OnInboundEvent(input Event) (output Event)

	// 出站(发送)的事件处理
	OnOutboundEvent(input Event) (output Event)
}

处理钩子(参数输入, 返回输出, 不给MessageProccessor处理时,可以将Event设置为nil)

type EventQueue

type EventQueue interface {
	// 事件队列开始工作
	StartLoop() EventQueue

	// 停止事件队列
	StopLoop() EventQueue

	// 等待退出
	Wait()

	// 投递事件, 通过队列到达消费者端
	Post(callback func())

	// 是否捕获异常
	EnableCapturePanic(v bool)

	// 获取事件数量
	Count() int
}

事件队列

func NewEventQueue

func NewEventQueue() EventQueue

创建默认长度的队列

type GenericPeer

type GenericPeer interface {
	Peer
	PeerProperty
}

基本的通用Peer

type HTTPAcceptor

type HTTPAcceptor interface {
	GenericPeer

	// 设置http文件服务虚拟地址和文件系统根目录
	SetFileServe(dir string, root string)

	// 设置模板文件地址
	SetTemplateDir(dir string)

	// 设置http模板的分隔符,解决默认{{ }}冲突问题
	SetTemplateDelims(delimsLeft, delimsRight string)

	// 设置模板的扩展名,默认: .tpl .html
	SetTemplateExtensions(exts []string)

	// 设置模板函数入口
	SetTemplateFunc(f []template.FuncMap)
}

type HTTPConnector

type HTTPConnector interface {
	GenericPeer
	Request(method, path string, param *HTTPRequest) error
}

HTTP连接器接口

type HTTPRequest

type HTTPRequest struct {
	REQMsg       interface{} // 请求消息, 指针
	ACKMsg       interface{} // 回应消息, 指针
	REQCodecName string      // 可为空, 默认为json格式
	ACKCodecName string      // 可为空, 默认为json格式
}

type HTTPSession

type HTTPSession interface {
	Request() *http.Request
}

type MessageMeta

type MessageMeta struct {
	Codec Codec        // 消息用到的编码
	Type  reflect.Type // 消息类型, 注册时使用指针类型

	ID int // 消息ID (二进制协议中使用)
	// contains filtered or unexported fields
}

消息元信息

func MessageMetaByFullName

func MessageMetaByFullName(name string) *MessageMeta

根据名字查找消息元信息

func MessageMetaByID

func MessageMetaByID(id int) *MessageMeta

根据id查找消息元信息

func MessageMetaByMsg

func MessageMetaByMsg(msg interface{}) *MessageMeta

根据消息对象获得消息元信息

func MessageMetaByType

func MessageMetaByType(t reflect.Type) *MessageMeta

根据类型查找消息元信息

func RegisterMessageMeta

func RegisterMessageMeta(meta *MessageMeta) *MessageMeta

注册消息元信息

func (*MessageMeta) FullName

func (self *MessageMeta) FullName() string

func (*MessageMeta) GetContext

func (self *MessageMeta) GetContext(key string) (interface{}, bool)

获取meta对应的名字绑定上下文

func (*MessageMeta) GetContextAsInt

func (self *MessageMeta) GetContextAsInt(name string, defaultValue int) int

按字符串格式取context

func (*MessageMeta) GetContextAsString

func (self *MessageMeta) GetContextAsString(key, defaultValue string) string

按字符串格式取context

func (*MessageMeta) NewType

func (self *MessageMeta) NewType() interface{}

创建meta类型的实例

func (*MessageMeta) SetContext

func (self *MessageMeta) SetContext(name string, data interface{}) *MessageMeta

为meta对应的名字绑定上下文

func (*MessageMeta) TypeName

func (self *MessageMeta) TypeName() string

type MessageTransmitter

type MessageTransmitter interface {

	// 接收消息
	OnRecvMessage(ses Session) (msg interface{}, err error)

	// 发送消息
	OnSendMessage(ses Session, msg interface{}) error
}

消息收发器

type MySQLConnector

type MySQLConnector interface {
	GenericPeer

	// 设置密码
	SetPassword(v string)

	// 设置连接数
	SetConnectionCount(v int)

	// 设置自动重连间隔, 0为默认值,关闭自动重连
	SetReconnectDuration(v time.Duration)

	ReconnectDuration() time.Duration
}

type MySQLOperator

type MySQLOperator interface {
	Operate(callback func(rawClient interface{}) interface{}) interface{}
}

type Peer

type Peer interface {
	// 开启端,传入地址
	Start() Peer

	// 停止通讯端
	Stop()

	// Peer的类型(protocol.type),例如tcp.Connector/udp.Acceptor
	TypeName() string
}

端, 可通过接口查询获得更多接口支持,如PeerProperty,ContextSet, SessionAccessor

type PeerCaptureIOPanic

type PeerCaptureIOPanic interface {
	// 开启IO层异常捕获
	EnableCaptureIOPanic(v bool)

	// 获取当前异常捕获值
	CaptureIOPanic() bool
}

开启IO层异常捕获,在生产版本对外端口应该打开此设置

type PeerProperty

type PeerProperty interface {
	Name() string

	Address() string

	Queue() EventQueue

	// 设置名称(可选)
	SetName(v string)

	// 设置Peer地址
	SetAddress(v string)

	// 设置Peer挂接队列(可选)
	SetQueue(v EventQueue)
}

Peer基础属性

type PeerReadyChecker

type PeerReadyChecker interface {
	IsReady() bool
}

检查Peer是否正常工作

type Pipe

type Pipe struct {
	// contains filtered or unexported fields
}

不限制大小,添加不发生阻塞,接收阻塞等待

func NewPipe

func NewPipe() *Pipe

func (*Pipe) Add

func (self *Pipe) Add(msg interface{})

添加时不会发送阻塞

func (*Pipe) Count

func (self *Pipe) Count() int

func (*Pipe) Pick

func (self *Pipe) Pick(retList *[]interface{}) (exit bool)

如果没有数据,发生阻塞

func (*Pipe) Reset

func (self *Pipe) Reset()

type RawPacket

type RawPacket struct {
	MsgData []byte
	MsgID   int
}

直接发送数据时,将*RawPacket作为Send参数

func (*RawPacket) Message

func (self *RawPacket) Message() interface{}

type RecvMsgEvent

type RecvMsgEvent struct {
	Ses Session
	Msg interface{}
}

接收到消息

func (*RecvMsgEvent) Message

func (self *RecvMsgEvent) Message() interface{}

func (*RecvMsgEvent) Reply

func (self *RecvMsgEvent) Reply(msg interface{})

兼容relay和rpc的回消息接口

func (*RecvMsgEvent) Send

func (self *RecvMsgEvent) Send(msg interface{})

func (*RecvMsgEvent) Session

func (self *RecvMsgEvent) Session() Session

type RedisConnector

type RedisConnector interface {
	GenericPeer

	// 设置密码
	SetPassword(v string)

	// 设置连接数
	SetConnectionCount(v int)

	// 设置库索引
	SetDBIndex(v int)
}

type RedisPoolOperator

type RedisPoolOperator interface {
	// 获取
	Operate(callback func(rawClient interface{}) interface{}) interface{}
}

type ReplyEvent

type ReplyEvent interface {
	Reply(msg interface{})
}

rpc, relay, 普通消息

type SendMsgEvent

type SendMsgEvent struct {
	Ses Session
	Msg interface{} // 用户需要发送的消息
}

会话开始发送数据事件

func (*SendMsgEvent) Message

func (self *SendMsgEvent) Message() interface{}

func (*SendMsgEvent) Session

func (self *SendMsgEvent) Session() Session

type Session

type Session interface {

	// 获得原始的Socket连接
	Raw() interface{}

	// 获得Session归属的Peer
	Peer() Peer

	// 发送消息,消息需要以指针格式传入
	Send(msg interface{})

	// 断开
	Close()

	// 标示ID
	ID() int64
}

长连接

type SessionAccepted

type SessionAccepted struct {
}

func (*SessionAccepted) String

func (self *SessionAccepted) String() string

func (*SessionAccepted) SystemMessage

func (self *SessionAccepted) SystemMessage()

type SessionAccessor

type SessionAccessor interface {

	// 获取一个连接
	GetSession(int64) Session

	// 遍历连接
	VisitSession(func(Session) bool)

	// 连接数量
	SessionCount() int

	// 关闭所有连接
	CloseAllSession()
}

会话访问

type SessionCloseNotify

type SessionCloseNotify struct {
}

udp通知关闭,内部使用

func (*SessionCloseNotify) String

func (self *SessionCloseNotify) String() string

func (*SessionCloseNotify) SystemMessage

func (self *SessionCloseNotify) SystemMessage()

type SessionClosed

type SessionClosed struct {
	Reason CloseReason // 断开原因
}

func (*SessionClosed) String

func (self *SessionClosed) String() string

func (*SessionClosed) SystemMessage

func (self *SessionClosed) SystemMessage()

type SessionConnectError

type SessionConnectError struct {
}

func (*SessionConnectError) String

func (self *SessionConnectError) String() string

func (*SessionConnectError) SystemMessage

func (self *SessionConnectError) SystemMessage()

type SessionConnected

type SessionConnected struct {
}

func (*SessionConnected) String

func (self *SessionConnected) String() string

func (*SessionConnected) SystemMessage

func (self *SessionConnected) SystemMessage()

type SessionInit

type SessionInit struct {
}

func (*SessionInit) String

func (self *SessionInit) String() string

func (*SessionInit) SystemMessage

func (self *SessionInit) SystemMessage()

标记系统消息

type SystemMessageIdentifier

type SystemMessageIdentifier interface {
	SystemMessage()
}

使用类型断言判断是否为系统消息

type TCPAcceptor

type TCPAcceptor interface {
	GenericPeer

	// 访问会话
	SessionAccessor

	TCPSocketOption

	// 查看当前侦听端口,使用host:0 作为Address时,socket底层自动分配侦听端口
	Port() int
}

TCP接受器,具备会话访问

type TCPConnector

type TCPConnector interface {
	GenericPeer

	TCPSocketOption

	// 设置重连时间
	SetReconnectDuration(time.Duration)

	// 获取重连时间
	ReconnectDuration() time.Duration

	// 默认会话
	Session() Session

	// 设置会话管理器 实现peer.SessionManager接口
	SetSessionManager(raw interface{})

	// 查看当前连接使用的端口
	Port() int
}

TCP连接器

type TCPSocketOption

type TCPSocketOption interface {
	// 收发缓冲大小,默认-1
	SetSocketBuffer(readBufferSize, writeBufferSize int, noDelay bool)

	// 设置最大的封包大小
	SetMaxPacketSize(maxSize int)

	// 设置读写超时,默认0,不超时
	SetSocketDeadline(read, write time.Duration)
}

TCP

type UDPAcceptor

type UDPAcceptor interface {

	// 底层使用TTL做session生命期管理,超时时间越短,内存占用越低
	SetSessionTTL(dur time.Duration)
}

UDP接受器

type UDPConnector

type UDPConnector interface {
	GenericPeer

	// 默认会话
	Session() Session
}

UDP连接器

type WSAcceptor

type WSAcceptor interface {
	GenericPeer

	// 访问会话
	SessionAccessor

	SetHttps(certfile, keyfile string)

	// 设置升级器
	SetUpgrader(upgrader interface{})

	// 查看当前侦听端口,使用host:0 作为Address时,socket底层自动分配侦听端口
	Port() int
}

Websocket接受器,具备会话访问

type WSConnector

type WSConnector interface {
	GenericPeer

	// 设置重连时间
	SetReconnectDuration(time.Duration)

	// 获取重连时间
	ReconnectDuration() time.Duration

	// 默认会话
	Session() Session

	// 设置会话管理器 实现peer.SessionManager接口
	SetSessionManager(raw interface{})

	// 查看当前连接使用的端口
	Port() int
}

Websocket连接器

Directories

Path Synopsis
gogopb/test
Package test is a generated protocol buffer package.
Package test is a generated protocol buffer package.
examples
tcp
udp
tcp
udp
Generated by github.com/davyxu/protoplus DO NOT EDIT!
Generated by github.com/davyxu/protoplus DO NOT EDIT!
Generated by github.com/davyxu/protoplus DO NOT EDIT!
Generated by github.com/davyxu/protoplus DO NOT EDIT!

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL