netman

module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 8, 2022 License: MIT

README

目录

介绍

  • 轻量的高性能TCP网络框架,基于epoll/kqueue,reactor模型实现
  • 简单的API,细节在框架内部实现,几行代码即可构建高性能的Server
  • 支持路由配置,更专注业务需求的处理,无需关心封包解包
  • 支持自定义封包格式,更灵活
  • 支持linux/macos,windows请在docker中运行
  • 支持TLS
  • 支持websocket
  • 中间件

优势

  • 非阻塞IO
  • 底层基于事件循环,在net包中,一个连接需要一个goroutine去维持,但netman基于事件循环则不需要,大大减少了内存的占用,在大量连接的场景下更为明显
  • 基于路由配置,业务层不关心封包解包的实现
  • 全局中间件、分组中间件
  • 经过测试在阿里云服务器(单机)上建立100万个连接(C1000K)的内存消耗在3.8GB左右

安装

  • 下载

    go get -u github.com/ikilobyte/netman
    
  • 导入

    import "github.com/ikilobyte/netman/server"
    

开始

server
  • 基本使用
    package main
    
    import "github.com/ikilobyte/netman/server"
    
    type Hello struct{}
    
    func (h *Hello) Do(request iface.IRequest) {
        message := request.GetMessage()
        connect := request.GetConnect()
        n, err := connect.Send(message.ID(), message.Bytes())
        fmt.Println("conn.send.n", n, "send err", err, "recv len()", message.Len())
    
        // 以下方式都可以获取到所有连接
        // 1、request.GetConnects()
        // 2、connect.GetConnectMgr().GetConnects()
    
        // 主动关闭连接
        // connect.Close()
    }
    
    func main() {
        s := server.New(
            "0.0.0.0",
            6565,
    
            // options 更多配置请看 #配置 文档
            server.WithMaxBodyLength(1024*1024*100), // 包体最大长度限制,0表示不限制
        )
    
        // add router 
    	s.AddRouter(0, new(Hello))  // 设置消息ID为0的处理方法
        //s.AddRouter(1, new(xxx))  // ...
    
        s.Start()
    }
    
client
  • 示例

    package main
    
    import (
        "fmt"
        "io"
        "net"
        "os"
        "strings"
        "time"
    
        "github.com/ikilobyte/netman/util"
    )
    
    func main() {
        conn, err := net.Dial("tcp", "127.0.0.1:6565")
        if err != nil {
            panic(err)
        }
    
        // 用于消息的封包和解包,也可以自行实现封包解包规则
        packer := util.NewDataPacker()
    
        // 100MB
        c := strings.Repeat("a", 1024*1024*100)
        bs, err := packer.Pack(0, []byte(c))
    
        if err != nil {
            panic(err)
        }
    
        // 发送消息
        for {
            fmt.Println(conn.Write(bs))
            time.Sleep(time.Second * 1)
        }
    }
    
    
  • 接收消息

    // 备注:以下规则是框架默认实现的规则,你也可以自行实现,使用自己的 Packer 即可
    for {
        header := make([]byte, 8)
        n, err := io.ReadFull(conn, header)
        if n == 0 && err == io.EOF {
        	fmt.Println("连接已断开")
        	os.Exit(0)
        }
    
        if err != nil {
        	fmt.Println("read head bytes err", err)
        	os.Exit(1)
        }
    
        // 解包头部,会返回一个IMessage
        message, err := packer.UnPack(header)
        if err != nil {
        	fmt.Println("unpack err", err)
        	os.Exit(1)
        }
    
        // 创建一个和数据大小一样的bytes并读取
        dataBuff := make([]byte, message.Len())
        n, err = io.ReadFull(conn, dataBuff)
    
        if n == 0 && err == io.EOF {
        	fmt.Println("连接已断开")
        	os.Exit(0)
        }
    
        if err != nil {
        	fmt.Println("read dataBuff err", err, len(dataBuff[:n]))
        	os.Exit(1)
        }
    
        message.SetData(dataBuff)
        fmt.Printf(
            "recv msgID[%d] len[%d] %s\n",
            message.ID(),
            message.Len(),
            time.Now().Format("2006-01-02 15:04:05.000"),
        )
    }
    
    

Websocket

  • server
type Handler struct{}

// 连接建立
func (h *Handler) Open(connect iface.IConnect) {

    // 获取query参数
    query := connect.GetQueryStringParam()

	// 客户端连接的url应该设置为:ws://ip:port/path?key=value&token=xxx
	// 支持任意path,如:ws://ip:port/x/y/z/a/b/c?key=value&token=xxx
	if query.Get("token") != "xxx" {
        // 关闭连接
        connect.Close()
        return
    }
	
    fmt.Println("onopen", connect.GetID())
}

// 消息到来时
func (h *Handler) Message(request iface.IRequest) {

	// 消息
	message := request.GetMessage()

	// 来自那个连接的
	connect := request.GetConnect()

	fmt.Printf("recv %s\n", message.String())

	// 普通文本格式
	fmt.Println(connect.Text([]byte(fmt.Sprintf("hi %s", message.Bytes()))))

	// 二进制格式
	//fmt.Println(connect.Binary([]byte("hi")))
}

// 连接关闭
func (h *Handler) Close(connect iface.IConnect) {
	fmt.Println("onclose", connect.GetID())
}

s := server.Websocket(
    "0.0.0.0",
    6565,
    new(Handler),   // websocket事件回调处理
)
  • client
  • 各语言的Websocket Client库即可,如Javascript的 new Websocket
  • client.html

中间件

  • 可被定义为全局中间件,和分组中间件,目前websocket只支持全局中间件
  • 配置中间件后,接收到的每条消息都会先经过中间件,再到达对应的消息回调函数
  • 中间件可提前终止执行
  • 定义中间件
    func demo1() iface.MiddlewareFunc {
        return func(ctx iface.IContext, next iface.Next) interface{} {
            fmt.Println("Front middleware")
            fmt.Println("ctx data", ctx.GetConnect(), ctx.GetRequest(), ctx.GetMessage())
    
            ctx.Set("key", "value")
            ctx.Set("now", time.Now().UnixNano())
            // 继续往下执行
            resp := next(ctx)
            fmt.Println("Rear middleware")
            return resp
        }
    }
    
    func demo2() iface.MiddlewareFunc {
        return func(ctx iface.IContext, next iface.Next) interface{} {
            fmt.Println(ctx.Get("key"),ctx.Get("now"))
            return next(ctx)
        }
    }   
    
    //authentication 这个用来做分组中间件
    var loginStore map[int]time.Time
    func authentication() iface.MiddlewareFunc {
        return func(ctx iface.IContext, next iface.Next) interface{} {
            conn := ctx.GetConnect()
            // 判断是否登录过,
            if _, ok := loginStore[conn.GetID()]; !ok {
                // 提前结束执行,不会到对应的Router
                _, _ = conn.Send(1, []byte("Authentication failed"))
                _ = conn.Close()
                return nil
            }
            // 继续执行
            return next(ctx)
        }
    }
    
    
  • 使用
    // 全局中间件
    s.Use(demo1())
    s.Use(demo2())
    
    // 分组,只有对应的路由才会执行
    g := s.Group(authentication())
    {
        g.AddRouter(1, new(xxx))
        //g.AddRouter(2,new(xxx))
        //g.AddRouter(3,new(xxx))
        //g.AddRouter(4,new(xxx))
    }
    

配置

  • 所有配置对 TcpServer(TLS)Websocket Server 都是生效的
  • 更多配置请查看 options.go
Hooks
  • Websocket也可以生效
type Hooks struct{}

// OnOpen 连接建立后回调
func (h *Hooks) OnOpen(connect iface.IConnect) {
    fmt.Printf("connId[%d] onOpen\n", connect.GetID())
}

// OnClose 连接成功关闭时回调
func (h *Hooks) OnClose(connect iface.IConnect) {
    fmt.Printf("connId[%d] onClose\n", connect.GetID())
}

s := server.New(
    "0.0.0.0",
    6565,
    
    // 配置 Hooks
    server.WithHooks(new(Hooks)),
)
心跳检测
  • 二者需要同时配置才会生效
s := server.New(
    "0.0.0.0",
    6565,
    
    // 表示60秒检测一次
    server.WithHeartbeatCheckInterval(time.Second*60), 
    
    // 表示一个连接如果180秒内未向服务器发送任何数据,此连接将被强制关闭
    server.WithHeartbeatIdleTime(time.Second*180),     
)
包体最大长度
s := server.New(
    "0.0.0.0",
    6565,
    
    // 0表示不限制长度
    // 这里配置的是100MB,当某条消息超过100MB时,会被拒绝处理
    server.WithMaxBodyLength(1024*1024*100),
)
TCP Keepalive
s := server.New(
    "0.0.0.0",
    6565,
    
    server.WithTCPKeepAlive(time.Second*30),
)
TLS
tlsConfig := &tls.Config{
    Certificates: []tls.Certificate{...},
}

s := server.New(
    "0.0.0.0",
    6565,
    
    // 传入相关配置后,即可开启TLS
    server.WithTLSConfig(tlsConfig),
)
自定义封包解包
  • 为了更灵活的需求,可自定义封包解包规则,只需要使用IPacker接口即可
  • 配置
// IPacker 定义
type IPacker interface {
    Pack(msgID uint32, data []byte) ([]byte, error) // 封包
    UnPack([]byte) (IMessage, error)                // 解包
    SetMaxBodyLength(uint32)                        // 设置包体最大长度限制
    GetHeaderLength() uint32                        // 获取头部长度
}

type YouPacker struct {
    // implements IPacker
    // ... 
}
    
s := server.New(
    "0.0.0.0",
    6565,
    
    // 自定义Packer
    server.server.WithPacker(new(YouPacker)),
)
组合使用
s := server.New(
    "0.0.0.0",
    6565,
    server.WithNumEventLoop(runtime.NumCPU()*3),
    server.WithHooks(new(Hooks)),            // hook
    server.WithMaxBodyLength(0),             // 配置包体最大长度,默认为0(不限制大小)
    server.WithTCPKeepAlive(time.Second*30), // 设置TCPKeepAlive
    server.WithLogOutput(os.Stdout),         // 框架运行日志保存的地方
    server.WithPacker(new(YouPacker)),       // 可自行实现数据封包解包
    
    // 心跳检测机制,二者需要同时配置才会生效
    server.WithHeartbeatCheckInterval(time.Second*60), // 表示60秒检测一次
    server.WithHeartbeatIdleTime(time.Second*180),     // 表示一个连接如果180秒内未向服务器发送任何数据,此连接将被强制关闭
    
    // 开启TLS
    server.WithTLSConfig(tlsConfig),
)

s.Start()

架构

on

百万连接

  • 如看不到图片可以在examples目录中查看c1000k.png这张图片 c1000k

鸣谢

感谢 JetBrains 为此开源项目提供 GoLand 开发工具支持:

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL