gev

package module
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2022 License: MIT Imports: 18 Imported by: 31

README

gev

Github Actions Go Report Card Codacy Badge GoDoc LICENSE Code Size Sourcegraph

中文 | English

gev is a lightweight, fast non-blocking TCP network library / websocket server based on Reactor mode.

Support custom protocols to quickly and easily build high-performance servers.

Features

  • High-performance event loop based on epoll and kqueue
  • Support multi-core and multi-threading
  • Dynamic expansion of read and write buffers implemented by Ring Buffer
  • Asynchronous read and write
  • SO_REUSEPORT port reuse support
  • Automatically clean up idle connections
  • Support WebSocket/Protobuf, custom protocols
  • Support for scheduled tasks, delayed tasks
  • High performance websocket server

Network model

gev uses only a few goroutines, one of them listens for connections and the others (work coroutines) handle read and write events of connected clients. The count of work coroutines is configurable, which is the core number of host CPUs by default.

Performance Test

📈 Test chart

Test environment: Ubuntu18.04 | 4 Virtual CPUs | 4.0 GiB

Throughput Test

limit GOMAXPROCS=1(Single thread),1 work goroutine

image

limit GOMAXPROCS=4,4 work goroutine

image

Other Test
Speed ​​Test

Compared with the simple performance of similar libraries, the pressure measurement method is the same as the evio project.

  • gnet
  • eviop
  • evio
  • net (StdLib)

limit GOMAXPROCS=1,1 work goroutine

image

limit GOMAXPROCS=1,4 work goroutine

image

limit GOMAXPROCS=4,4 work goroutine

image

Install

go get -u github.com/Allenxuxu/gev

Getting start

echo demo
package main

import (
	"flag"
	"net/http"
	_ "net/http/pprof"
	"strconv"
	"time"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/log"
	"github.com/Allenxuxu/toolkit/sync/atomic"
)

type example struct {
	Count atomic.Int64
}

func (s *example) OnConnect(c *gev.Connection) {
	s.Count.Add(1)
	//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
	//log.Println("OnMessage")
	out = data
	return
}

func (s *example) OnClose(c *gev.Connection) {
	s.Count.Add(-1)
	//log.Println("OnClose")
}

func main() {
	go func() {
		if err := http.ListenAndServe(":6060", nil); err != nil {
			panic(err)
		}
	}()

	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Network("tcp"),
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.MetricsServer("", ":9091"),
	)
	if err != nil {
		panic(err)
	}

	s.RunEvery(time.Second*2, func() {
		log.Info("connections :", handler.Count.Get())
	})

	s.Start()
}

Handler is an interface that programs must implement.

type CallBack interface {
	OnMessage(c *Connection, ctx interface{}, data []byte) interface{}
	OnClose(c *Connection)
}

type Handler interface {
	CallBack
	OnConnect(c *Connection)
}

OnMessage will be called back when a complete data frame arrives.Users can get the data, process the business logic, and return the data that needs to be sent.

When there is data coming, gev does not call back OnMessage immediately, but instead calls back an UnPacket function.Probably the execution logic is as follows:

ctx, receivedData := c.protocol.UnPacket(c, buffer)
for ctx != nil || len(receivedData) != 0 {
	sendData := c.callBack.OnMessage(c, ctx, receivedData)
	if sendData != nil {
		*tmpBuffer = append(*tmpBuffer, c.protocol.Packet(c, sendData)...)
	}

	ctx, receivedData = c.protocol.UnPacket(c, buffer)
}

protocol

The UnPacket function will check whether the data in the ringbuffer is a complete data frame. If it is, the data will be unpacked and return the payload data. If it is not a complete data frame, it will return directly.

The return value of UnPacket (interface{}, []byte) will be passed in as a call to OnMessage ctx interface{}, data []byte and callback.Ctx is designed to pass special information generated when parsing data frames in the UnPacket function (which is required for complex data frame protocols), and data is used to pass payload data.

type Protocol interface {
	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
	Packet(c *Connection, data interface{}) []byte
}

type DefaultProtocol struct{}

func (d *DefaultProtocol) UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
	s, e := buffer.PeekAll()
	if len(e) > 0 {
		size := len(s) + len(e)
		userBuffer := *c.UserBuffer()
		if size > cap(userBuffer) {
			userBuffer = make([]byte, size)
			*c.UserBuffer() = userBuffer
		}

		copy(userBuffer, s)
		copy(userBuffer[len(s):], e)

		return nil, userBuffer
	} else {
		buffer.RetrieveAll()

		return nil, s
	}
}

func (d *DefaultProtocol) Packet(c *Connection, data interface{}) []byte {
	return data.([]byte)
}

As above, gev provides a default Protocol implementation that will fetch all data in the receive buffer ( ringbuffer ).In actual use, there is usually a data frame protocol of its own, and gev can be set in the form of a plug-in: it is set by variable parameters when creating Server.

s, err := gev.NewServer(handler,gev.Protocol(&ExampleProtocol{}))

Check out the example Protocol for a detailed.

There is also a Send method that can be used for sending data. But Send puts the data to Event-Loop and invokes it to send the data rather than sending data by itself immediately.

Check out the example Server timing push for a detailed.

func (c *Connection) Send(data interface{}, opts ...ConnectionOption) error

ShutdownWrite works for reverting connected status to false and closing connection.

Check out the example Maximum connections for a detailed.

func (c *Connection) ShutdownWrite() error

RingBuffer is a dynamical expansion implementation of circular buffer.

WebSocket

The WebSocket protocol is built on top of the TCP protocol, so gev doesn't need to be built in, but instead provides support in the form of plugins, in the plugins/websocket directory.

code
type Protocol struct {
	upgrade *ws.Upgrader
}

func New(u *ws.Upgrader) *Protocol {
	return &Protocol{upgrade: u}
}

func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
	upgraded := c.Context()
	if upgraded == nil {
		var err error
		out, _, err = p.upgrade.Upgrade(buffer)
		if err != nil {
			log.Println("Websocket Upgrade :", err)
			return
		}
		c.SetContext(true)
	} else {
		header, err := ws.VirtualReadHeader(buffer)
		if err != nil {
			log.Println(err)
			return
		}
		if buffer.VirtualLength() >= int(header.Length) {
			buffer.VirtualFlush()

			payload := make([]byte, int(header.Length))
			_, _ = buffer.Read(payload)

			if header.Masked {
				ws.Cipher(payload, header.Mask, 0)
			}

			ctx = &header
			out = payload
		} else {
			buffer.VirtualRevert()
		}
	}
	return
}

func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
	return data
}

The detailed implementation can be viewed by the plugin. The source code can be viewed using the websocket example.

Example

Buy me a coffee

Paypal: Paypal/AllenXuxu

Thanks

Thanks JetBrains for the free open source license

References

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrConnectionClosed = errors.New("connection closed")

Functions

This section is empty.

Types

type CallBack added in v0.4.0

type CallBack interface {
	OnMessage(c *Connection, ctx interface{}, data []byte) interface{}
	OnClose(c *Connection)
}

type Connection added in v0.4.0

type Connection struct {
	KeyValueContext
	// contains filtered or unexported fields
}

Connection TCP 连接

func NewConnection added in v0.4.0

func NewConnection(fd int,
	loop *eventloop.EventLoop,
	sa unix.Sockaddr,
	protocol Protocol,
	tw *timingwheel.TimingWheel,
	idleTime time.Duration,
	callBack CallBack) *Connection

NewConnection 创建 Connection

func (*Connection) Close added in v0.4.0

func (c *Connection) Close() error

Close 关闭连接

func (*Connection) Connected added in v0.4.0

func (c *Connection) Connected() bool

Connected 是否已连接

func (*Connection) Context added in v0.4.0

func (c *Connection) Context() interface{}

Context 获取 Context

func (*Connection) HandleEvent added in v0.4.0

func (c *Connection) HandleEvent(fd int, events poller.Event)

HandleEvent 内部使用,event loop 回调

func (*Connection) PeerAddr added in v0.4.0

func (c *Connection) PeerAddr() string

PeerAddr 获取客户端地址信息

func (*Connection) ReadBufferLength added in v0.4.0

func (c *Connection) ReadBufferLength() int64

ReadBufferLength read buffer 当前积压的数据长度

func (*Connection) Send added in v0.4.0

func (c *Connection) Send(data interface{}, opts ...ConnectionOption) error

Send 用来在非 loop 协程发送

func (*Connection) SetContext added in v0.4.0

func (c *Connection) SetContext(ctx interface{})

SetContext 设置 Context

func (*Connection) ShutdownWrite added in v0.4.0

func (c *Connection) ShutdownWrite() error

ShutdownWrite 关闭可写端,等待读取完接收缓冲区所有数据

func (*Connection) UserBuffer added in v0.4.0

func (c *Connection) UserBuffer() *[]byte

func (*Connection) WriteBufferLength added in v0.4.0

func (c *Connection) WriteBufferLength() int64

WriteBufferLength write buffer 当前积压的数据长度

type ConnectionOption added in v0.4.0

type ConnectionOption func(*ConnectionOptions)

func SendInLoop added in v0.4.0

func SendInLoop(f SendInLoopFunc) ConnectionOption

type ConnectionOptions added in v0.4.0

type ConnectionOptions struct {
	// contains filtered or unexported fields
}

type DefaultProtocol added in v0.4.0

type DefaultProtocol struct{}

DefaultProtocol 默认 Protocol

func (*DefaultProtocol) Packet added in v0.4.0

func (d *DefaultProtocol) Packet(c *Connection, data interface{}) []byte

Packet 封包

func (*DefaultProtocol) UnPacket added in v0.4.0

func (d *DefaultProtocol) UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)

UnPacket 拆包

type Handler

type Handler interface {
	CallBack
	OnConnect(c *Connection)
}

Handler Server 注册接口

type KeyValueContext added in v0.4.0

type KeyValueContext struct {
	// contains filtered or unexported fields
}

func (*KeyValueContext) Delete added in v0.4.0

func (c *KeyValueContext) Delete(key string)

func (*KeyValueContext) Get added in v0.4.0

func (c *KeyValueContext) Get(key string) (value interface{}, exists bool)

func (*KeyValueContext) Set added in v0.4.0

func (c *KeyValueContext) Set(key string, value interface{})

type LoadBalanceStrategy added in v0.2.1

type LoadBalanceStrategy func([]*eventloop.EventLoop) *eventloop.EventLoop

func LeastConnection added in v0.2.1

func LeastConnection() LoadBalanceStrategy

func RoundRobin added in v0.2.1

func RoundRobin() LoadBalanceStrategy

type Option

type Option func(*Options)

Option ...

func Address

func Address(a string) Option

Address server 监听地址

func CustomProtocol added in v0.4.0

func CustomProtocol(p Protocol) Option

CustomProtocol 数据包处理

func IdleTime added in v0.1.9

func IdleTime(t time.Duration) Option

IdleTime 最大空闲时间(秒)

func LoadBalance added in v0.2.1

func LoadBalance(strategy LoadBalanceStrategy) Option

func MetricsServer added in v0.1.11

func MetricsServer(path, address string) Option

func Network

func Network(n string) Option

Network [tcp] 暂时只支持tcp

func NumLoops

func NumLoops(n int) Option

NumLoops work eventloop 的数量

func ReusePort

func ReusePort(reusePort bool) Option

ReusePort 设置 SO_REUSEPORT

type Options

type Options struct {
	Network   string
	Address   string
	NumLoops  int
	ReusePort bool
	IdleTime  time.Duration
	Protocol  Protocol
	Strategy  LoadBalanceStrategy
	// contains filtered or unexported fields
}

Options 服务配置

type Protocol added in v0.1.2

type Protocol interface {
	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
	Packet(c *Connection, data interface{}) []byte
}

Protocol 自定义协议编解码接口

type SendInLoopFunc added in v0.4.0

type SendInLoopFunc func(interface{})

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server gev Server

func NewServer

func NewServer(handler Handler, opts ...Option) (server *Server, err error)

NewServer 创建 Server

func (*Server) Options added in v0.1.2

func (s *Server) Options() Options

Options 返回 options

func (*Server) RunAfter added in v0.0.2

func (s *Server) RunAfter(d time.Duration, f func()) *timingwheel.Timer

RunAfter 延时任务

Example
handler := new(example)

s, err := NewServer(handler,
	Network("tcp"),
	Address(":1833"),
	NumLoops(8),
	ReusePort(true))
if err != nil {
	panic(err)
}

go s.Start()
defer s.Stop()

s.RunAfter(time.Second, func() {
	fmt.Println("RunAfter")
})

time.Sleep(2500 * time.Millisecond)
Output:

RunAfter

func (*Server) RunEvery added in v0.0.2

func (s *Server) RunEvery(d time.Duration, f func()) *timingwheel.Timer

RunEvery 定时任务

Example
handler := new(example)

s, err := NewServer(handler,
	Network("tcp"),
	Address(":1833"),
	NumLoops(8),
	ReusePort(true))
if err != nil {
	panic(err)
}

go s.Start()
defer s.Stop()

t := s.RunEvery(time.Second, func() {
	fmt.Println("EveryFunc")
})

time.Sleep(4500 * time.Millisecond)
t.Stop()
time.Sleep(4500 * time.Millisecond)
Output:

EveryFunc
EveryFunc
EveryFunc
EveryFunc

func (*Server) Start

func (s *Server) Start()

Start 启动 Server

func (*Server) Stop

func (s *Server) Stop()

Stop 关闭 Server

Directories

Path Synopsis
benchmarks
example
Package log is from https://github.com/micro/go-micro/blob/master/util/log/log.go
Package log is from https://github.com/micro/go-micro/blob/master/util/log/log.go
plugins

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL