rio

package module
v1.2.0
Published: Mar 6, 2025 License: LGPL-3.0 Imports: 17 Imported by: 0

README

RIO

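An AIO networking library based on IOURING, implemented without CGO and following the usage and design patterns of the standard library.

Supported protocols: TCP, UDP, UNIX, UNIXGRAM (IP is proxied to the standard library).

Requires Linux kernel >= 5.14; >= 6.1 is recommended.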

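Performance

Benchmark

Test environment: Win11 (WSL2), kernel 6.6.36.6-microsoft-standard-WSL2, CPU 13600K.

Run with default parameters: 10 threads, 1000 connections per thread, 10000 connections in total.

RIO is about 13% faster than STD (the Go net standard library); see Benchmark for details.

Note: CurveWaitTransmission performs differently in different environments and should be tuned as needed to get the best performance.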

[Figures: echo benchmark, http benchmark]
------ Benchmark ------
Port: 9000
Workers: 10
Count: 1000
NBytes: 1024
ECHO-RIO benching complete(1.564700361s): 6391 conn/sec, 6.2M inbounds/sec, 6.2M outbounds/sec, 0 failures
ECHO-STD benching complete(1.821161901s): 5491 conn/sec, 5.4M inbounds/sec, 5.4M outbounds/sec, 0 failures
HTTP-RIO benching complete(1.722059583s): 5807 conn/sec, 5.8M inbounds/sec, 5.8M outbounds/sec, 0 failures
HTTP-STD benching complete(1.948937829s): 5131 conn/sec, 5M inbounds/sec, 5M outbounds/sec, 0 failures
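
As a quick check on the relative figures, the following sketch recomputes the speedup from the conn/sec values printed in the log above. The helper name speedupPercent is made up for this illustration and is not part of rio.

```go
package main

import "fmt"

// speedupPercent reports how much higher throughput a is than throughput b,
// in percent. Both values must be in the same unit (here: conn/sec).
func speedupPercent(a, b float64) float64 {
	return (a/b - 1) * 100
}

func main() {
	// conn/sec values copied from the benchmark output above.
	fmt.Printf("echo: %.1f%%\n", speedupPercent(6391, 5491)) // echo: 16.4%
	fmt.Printf("http: %.1f%%\n", speedupPercent(5807, 5131)) // http: 13.2%
}
```

On this particular run the HTTP case lands close to the quoted 13%, while the echo case comes out somewhat higher.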
TCPKALI

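Server environment: Win11 (Hyper-V), Ubuntu 24.10 (6.11.0-8-generic), CPU (4 cores).

Client environment: Win11 (WSL2), kernel 6.6.36.6-microsoft-standard-WSL2, CPU 13600K.

In the tcpkali load test, RIO is about 30% faster than STD (the Go net standard library).

Note: do not run the load generator on the same machine as the server.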

[Figure: http benchmark]
Type  Packet rate estimate
RIO   22330.7
NET   16918.8
tcpkali --workers 1 -c 50 -T 10s -m "PING" 192.168.100.120:9000
------ RIO ------
Destination: [192.168.100.120]:9000
Interface eth0 address [192.168.100.1]:0
Using interface eth0 to connect to [192.168.100.120]:9000
Ramped up to 50 connections.
Total data sent:     218.8 MiB (229455392 bytes)
Total data received: 217.3 MiB (227831216 bytes)
Bandwidth per channel: 7.316⇅ Mbps (914.5 kBps)
Aggregate bandwidth: 182.254↓, 183.554↑ Mbps
Packet rate estimate: 22330.7↓, 15969.6↑ (3↓, 35↑ TCP MSS/op)
Test duration: 10.0006 s.
------ NET ------
Destination: [192.168.100.120]:9000
Interface eth0 address [192.168.100.1]:0
Using interface eth0 to connect to [192.168.100.120]:9000
Ramped up to 50 connections.
Total data sent:     217.6 MiB (228130816 bytes)
Total data received: 215.8 MiB (226292180 bytes)
Bandwidth per channel: 7.122⇅ Mbps (890.2 kBps)
Aggregate bandwidth: 180.871↓, 182.341↑ Mbps
Packet rate estimate: 16918.8↓, 15884.2↑ (2↓, 45↑ TCP MSS/op)
Test duration: 10.009 s.

Usage

go get -u github.com/brickingsoft/rio

Basic usage: replace net with rio.


// Replace net.Listen() with rio.Listen()
ln, lnErr := rio.Listen("tcp", ":9000")
// Replace net.Dial() with rio.Dial()
conn, dialErr := rio.Dial("tcp", "127.0.0.1:9000")
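
Because rio.Listen and rio.Dial are listed with the same signatures as net.Listen and net.Dial, one way to make the swap explicit is to keep the transport behind function values. The sketch below is an illustration, not code from rio: the names listenFunc, dialFunc, serveEcho and echoOnce are hypothetical, and it passes the standard library functions so that it runs on any platform. Passing rio.Listen and rio.Dial instead is assumed to work on a supported Linux kernel.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// listenFunc has the signature of net.Listen (and of rio.Listen as listed
// in the API index of this page).
type listenFunc func(network, address string) (net.Listener, error)

// dialFunc has the signature of net.Dial (and of rio.Dial).
type dialFunc func(network, address string) (net.Conn, error)

// serveEcho accepts connections until the listener is closed and echoes
// every received byte back to the sender.
func serveEcho(ln net.Listener) {
	for {
		conn, err := ln.Accept()
		if err != nil {
			return // listener closed
		}
		go func(c net.Conn) {
			defer c.Close()
			_, _ = io.Copy(c, c)
		}(conn)
	}
}

// echoOnce starts an echo server on a free local port, sends msg through it
// and returns the reply.
func echoOnce(listen listenFunc, dial dialFunc, msg string) (string, error) {
	ln, err := listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()
	go serveEcho(ln)

	conn, err := dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()

	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(conn, buf); err != nil {
		return "", err
	}
	return string(buf), nil
}

// runWithStdlib performs the round trip over the standard library.
// Assumption: echoOnce(rio.Listen, rio.Dial, "PING") is the rio equivalent.
func runWithStdlib() (string, error) {
	return echoOnce(net.Listen, net.Dial, "PING")
}

func main() {
	reply, err := runWithStdlib()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(reply)
}
```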

TLS:

// server("github.com/brickingsoft/rio/tls")
ln, _ = tls.Listen("tcp", "127.0.0.1:9000", tls.ConfigFrom(config))
// server(use wrap)
ln, _ := rio.Listen("tcp", ":9000")
ln, _ = tls.NewListener(ln, config)

// client("github.com/brickingsoft/rio/tls")
conn, _ = tls.Dial("tcp", "127.0.0.1:9000", tls.ConfigFrom(config))

// client(use wrap)
rawConn, dialErr := rio.Dial("tcp", "127.0.0.1:9000")
conn := tls.Client(rawConn, config)
if err := conn.HandshakeContext(ctx); err != nil {
	rawConn.Close()
	return nil, err
}

Type conversion:

// tcp sendfile
reader, ok := conn.(io.ReaderFrom)
// convert to a TCP connection
tcpConn, ok := conn.(*rio.TCPConn)
// convert to a UDP connection
udpConn, ok := conn.(*rio.UDPConn)
// convert to a UNIX connection
unixConn, ok := conn.(*rio.UnixConn)
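
The assertions above use Go's comma-ok form, so a failed conversion can be handled instead of panicking. The sketch below shows that pattern for the io.ReaderFrom case with a fallback to io.Copy. It uses only the standard library so it runs anywhere; sendAll, demoPipe and demoTCP are hypothetical names, and whether a given rio connection takes an optimized path inside ReadFrom is not something this sketch demonstrates.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"strings"
)

// sendAll writes everything from src to conn. If conn implements
// io.ReaderFrom, its ReadFrom method is called directly so the connection
// can choose its own transfer strategy; otherwise io.Copy is used.
func sendAll(conn net.Conn, src io.Reader) (n int64, usedReaderFrom bool, err error) {
	if rf, ok := conn.(io.ReaderFrom); ok {
		n, err = rf.ReadFrom(src)
		return n, true, err
	}
	n, err = io.Copy(conn, src)
	return n, false, err
}

// demoPipe sends over net.Pipe, whose connections do not implement
// io.ReaderFrom, so the fallback branch runs.
func demoPipe() (int64, bool, string, error) {
	client, server := net.Pipe()
	received := make(chan string, 1)
	go func() {
		data, _ := io.ReadAll(server)
		received <- string(data)
	}()
	n, used, err := sendAll(client, strings.NewReader("hello"))
	client.Close() // lets the reader goroutine see EOF
	return n, used, <-received, err
}

// demoTCP sends over a loopback TCP connection; *net.TCPConn implements
// io.ReaderFrom, so the first branch runs.
func demoTCP() (bool, string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, "", err
	}
	defer ln.Close()
	received := make(chan string, 1)
	go func() {
		c, err := ln.Accept()
		if err != nil {
			received <- ""
			return
		}
		defer c.Close()
		data, _ := io.ReadAll(c)
		received <- string(data)
	}()
	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return false, "", err
	}
	_, used, err := sendAll(conn, strings.NewReader("hello"))
	conn.Close()
	return used, <-received, err
}

func main() {
	n, used, got, err := demoPipe()
	fmt.Println(n, used, got, err)
	usedTCP, gotTCP, err := demoTCP()
	fmt.Println(usedTCP, gotTCP, err)
}
```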

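Client-only scenarios:

It is recommended to PIN IOURING at startup and UNPIN only when the program exits.

IOURING is started when it is first in use and shut down when nothing is using it.

A Listen usually lives as long as the program, so IOURING stays resident on the server side.

A Dial is short-lived and Dials tend to be frequent, so PIN is needed to keep IOURING resident rather than starting and stopping it repeatedly.

// at program startup
rio.Pin()
// at program exit
rio.Unpin()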

HTTP:

Server: substitute the Listener.

Client: substitute the RoundTripper.

// http server
http.Serve(ln, handler)
// fasthttp server
fasthttp.Serve(ln, handler)
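
For the client side, one possible reading of "substitute the RoundTripper" is an http.Transport whose DialContext field points at a custom dialer. The sketch below takes that approach. It is an assumption, not rio's documented client API: rio.DialContext is listed with a compatible signature, but the demo plugs in the standard net.Dialer and net.Listen so it runs on any platform. The names newClient, fetch and demo are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
)

// dialContextFunc is the function type expected by http.Transport.DialContext.
type dialContextFunc func(ctx context.Context, network, address string) (net.Conn, error)

// newClient returns an http.Client whose RoundTripper opens connections
// through dial.
func newClient(dial dialContextFunc) *http.Client {
	return &http.Client{Transport: &http.Transport{DialContext: dial}}
}

// fetch serves a fixed response on ln and performs one GET through client.
func fetch(ln net.Listener, client *http.Client) (string, error) {
	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "pong")
	})}
	go srv.Serve(ln) // plays the role of http.Serve(ln, handler)
	defer srv.Close()

	resp, err := client.Get("http://" + ln.Addr().String() + "/")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

// demo wires the pieces together with standard library transports.
func demo() (string, error) {
	// Assumption: rio.Listen("tcp", ...) could be used here instead.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	var d net.Dialer
	// Assumption: rio.DialContext could be passed here instead.
	client := newClient(d.DialContext)
	defer client.CloseIdleConnections()
	return fetch(ln, client)
}

func main() {
	body, err := demo()
	fmt.Println(body, err)
}
```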

REUSE PORT (enabled automatically when listening on TCP):


lc := rio.ListenConfig{}
lc.SetReusePort(true)

ln, lnErr := lc.Listen(...)

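Advanced tuning

Tuning is done through environment variables; see IOURING for details.

Name                         Type     Description
IOURING_ENTRIES              Number   Ring size; defaults to the maximum, 16384.
IOURING_SETUP_FLAGS          Text     Setup flags, such as IORING_SETUP_SQPOLL, IORING_SETUP_SUBMIT_ALL, etc.
IOURING_SETUP_FLAGS_SCHEMA   Text     Flag scheme: DEFAULT or PERFORMANCE.
IOURING_SQ_THREAD_CPU        Number   ID of the CPU the ring is given affinity to.
IOURING_SQ_THREAD_IDLE       Number   Idle duration in milliseconds when IORING_SETUP_SQPOLL is set; defaults to 1 ms.
IOURING_PREPARE_BATCH_SIZE   Number   Buffer size for preparing SQEs; defaults to the size of the SQ.
IOURING_USE_CPU_AFFILIATE    Boolean  Whether to use CPU AFFILIATE.
IOURING_CURVE_TRANSMISSION   Text     Curve for the CQ wait strategy, e.g. 1:1us, 8:2us.

Notes:

  • IOURING_SETUP_FLAGS depends on the kernel version; be sure to check yours.
  • The effect of IORING_SETUP_SQPOLL depends on the runtime environment; test it yourself.
  • IOURING_SETUP_FLAGS_SCHEMA has lower priority than IOURING_SETUP_FLAGS.
  • PERFORMANCE is the combination of IORING_SETUP_SQPOLL, IORING_SETUP_SUBMIT_ALL and IORING_SETUP_SINGLE_ISSUER.
  • DEFAULT is IORING_SETUP_SUBMIT_ALL.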
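
The precedence between the two flag variables can be spelled out in a few lines. The sketch below is an illustration of the rules in the notes, not rio's actual parsing code. The helpers effectiveFlags and fromMap are hypothetical, and both the comma-separated format and the fallback to DEFAULT for an unset or unknown schema are assumptions.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// schemaFlags lists the flags behind each scheme, as given in the notes.
var schemaFlags = map[string][]string{
	"DEFAULT":     {"IORING_SETUP_SUBMIT_ALL"},
	"PERFORMANCE": {"IORING_SETUP_SQPOLL", "IORING_SETUP_SUBMIT_ALL", "IORING_SETUP_SINGLE_ISSUER"},
}

// effectiveFlags resolves the setup flags from the two variables.
// getenv is injected so the rules can be tried without touching the real
// environment; os.Getenv has the required signature.
func effectiveFlags(getenv func(string) string) []string {
	// IOURING_SETUP_FLAGS takes priority over the scheme.
	var flags []string
	for _, f := range strings.Split(getenv("IOURING_SETUP_FLAGS"), ",") {
		if f = strings.TrimSpace(f); f != "" {
			flags = append(flags, f)
		}
	}
	if len(flags) > 0 {
		return flags
	}
	schema := strings.ToUpper(strings.TrimSpace(getenv("IOURING_SETUP_FLAGS_SCHEMA")))
	if byScheme, ok := schemaFlags[schema]; ok {
		return byScheme
	}
	// Assumption: an unset or unknown scheme behaves like DEFAULT.
	return schemaFlags["DEFAULT"]
}

// fromMap adapts a map to the getenv signature for demonstration purposes.
func fromMap(m map[string]string) func(string) string {
	return func(key string) string { return m[key] }
}

func main() {
	fmt.Println(effectiveFlags(os.Getenv))
	fmt.Println(effectiveFlags(fromMap(map[string]string{
		"IOURING_SETUP_FLAGS_SCHEMA": "PERFORMANCE",
	})))
}
```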

Documentation

Constants

const (
	ReadFromFileUseMMapPolicy = int32(iota)
	ReadFromFileUseMixPolicy
)

Variables

var (
	DefaultDialer = Dialer{
		Timeout:         15 * time.Second,
		Deadline:        time.Time{},
		KeepAlive:       0,
		KeepAliveConfig: net.KeepAliveConfig{Enable: true},
		MultipathTCP:    false,
		FastOpen:        false,
		QuickAck:        false,
		UseSendZC:       false,
		Control:         nil,
		ControlContext:  nil,
	}
)

Functions

func Dial

func Dial(network string, address string) (net.Conn, error)

func DialContext

func DialContext(ctx context.Context, network string, address string) (net.Conn, error)

func DialTimeout

func DialTimeout(network string, address string, timeout time.Duration) (net.Conn, error)

func Listen

func Listen(network string, addr string) (ln net.Listener, err error)

func ListenPacket

func ListenPacket(network string, addr string) (c net.PacketConn, err error)

func Pin

func Pin() (err error)

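Pin pins IOURING. It is typically called at program startup. It is for managing the IOURING lifecycle manually, generally when only Dial is used. Note: Unpin must be called to shut IOURING down.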

func Unpin

func Unpin() (err error)

Unpin unpins IOURING.

func UseProcessPriority

func UseProcessPriority(level process.PriorityLevel)

func UseReadFromFilePolicy

func UseReadFromFilePolicy(policy int32)

Types

type Dialer

type Dialer struct {
	Timeout         time.Duration
	Deadline        time.Time
	KeepAlive       time.Duration
	KeepAliveConfig net.KeepAliveConfig
	MultipathTCP    bool
	FastOpen        bool
	QuickAck        bool
	UseSendZC       bool
	Control         func(network, address string, c syscall.RawConn) error
	ControlContext  func(ctx context.Context, network, address string, c syscall.RawConn) error
}

func (*Dialer) Dial

func (d *Dialer) Dial(network string, address string) (c net.Conn, err error)

func (*Dialer) DialContext

func (d *Dialer) DialContext(ctx context.Context, network, address string) (c net.Conn, err error)

func (*Dialer) DialIP

func (d *Dialer) DialIP(_ context.Context, network string, laddr, raddr *net.IPAddr) (*IPConn, error)

func (*Dialer) DialTCP

func (d *Dialer) DialTCP(ctx context.Context, network string, laddr, raddr *net.TCPAddr) (*TCPConn, error)

func (*Dialer) DialUDP

func (d *Dialer) DialUDP(ctx context.Context, network string, laddr, raddr *net.UDPAddr) (*UDPConn, error)

func (*Dialer) DialUnix

func (d *Dialer) DialUnix(ctx context.Context, network string, laddr, raddr *net.UnixAddr) (*UnixConn, error)

func (*Dialer) SetFastOpen

func (d *Dialer) SetFastOpen(use bool)

func (*Dialer) SetMultipathTCP

func (d *Dialer) SetMultipathTCP(use bool)

func (*Dialer) SetQuickAck

func (d *Dialer) SetQuickAck(use bool)

func (*Dialer) SetSendZC added in v1.2.0

func (d *Dialer) SetSendZC(use bool)

type IPConn

type IPConn struct {
	*net.IPConn
}

func DialIP

func DialIP(network string, laddr, raddr *net.IPAddr) (*IPConn, error)

func ListenIP

func ListenIP(network string, addr *net.IPAddr) (*IPConn, error)

type ListenConfig

type ListenConfig struct {
	Control         func(network, address string, c syscall.RawConn) error
	KeepAlive       time.Duration
	KeepAliveConfig net.KeepAliveConfig
	UseSendZC       bool
	MultipathTCP    bool
	FastOpen        bool
	QuickAck        bool
	ReusePort       bool
}

func (*ListenConfig) Listen

func (lc *ListenConfig) Listen(ctx context.Context, network string, address string) (ln net.Listener, err error)

func (*ListenConfig) ListenIP

func (lc *ListenConfig) ListenIP(_ context.Context, network string, addr *net.IPAddr) (*IPConn, error)

func (*ListenConfig) ListenMulticastUDP

func (lc *ListenConfig) ListenMulticastUDP(ctx context.Context, network string, ifi *net.Interface, addr *net.UDPAddr) (*UDPConn, error)

func (*ListenConfig) ListenPacket

func (lc *ListenConfig) ListenPacket(ctx context.Context, network, address string) (c net.PacketConn, err error)

func (*ListenConfig) ListenTCP

func (lc *ListenConfig) ListenTCP(ctx context.Context, network string, addr *net.TCPAddr) (*TCPListener, error)

func (*ListenConfig) ListenUDP

func (lc *ListenConfig) ListenUDP(ctx context.Context, network string, addr *net.UDPAddr) (*UDPConn, error)

func (*ListenConfig) ListenUnix

func (lc *ListenConfig) ListenUnix(ctx context.Context, network string, addr *net.UnixAddr) (*UnixListener, error)

func (*ListenConfig) ListenUnixgram

func (lc *ListenConfig) ListenUnixgram(ctx context.Context, network string, addr *net.UnixAddr) (*UnixConn, error)

func (*ListenConfig) SetFastOpen

func (lc *ListenConfig) SetFastOpen(use bool)

func (*ListenConfig) SetMultipathTCP

func (lc *ListenConfig) SetMultipathTCP(use bool)

func (*ListenConfig) SetQuickAck

func (lc *ListenConfig) SetQuickAck(use bool)

func (*ListenConfig) SetReusePort

func (lc *ListenConfig) SetReusePort(use bool)

func (*ListenConfig) SetSendZC

func (lc *ListenConfig) SetSendZC(use bool)

type TCPConn

type TCPConn struct {
	// contains filtered or unexported fields
}

func DialTCP

func DialTCP(network string, laddr, raddr *net.TCPAddr) (*TCPConn, error)

func (*TCPConn) Close

func (c *TCPConn) Close() error

func (*TCPConn) CloseRead

func (c *TCPConn) CloseRead() error

func (*TCPConn) CloseWrite

func (c *TCPConn) CloseWrite() error

func (*TCPConn) Context

func (c *TCPConn) Context() context.Context

func (*TCPConn) File

func (c *TCPConn) File() (f *os.File, err error)

func (*TCPConn) LocalAddr

func (c *TCPConn) LocalAddr() net.Addr

func (*TCPConn) MultipathTCP

func (c *TCPConn) MultipathTCP() (bool, error)

func (*TCPConn) Read

func (c *TCPConn) Read(b []byte) (n int, err error)

func (*TCPConn) ReadBuffer

func (c *TCPConn) ReadBuffer() (int, error)

func (*TCPConn) ReadFrom

func (c *TCPConn) ReadFrom(r io.Reader) (int64, error)

func (*TCPConn) RemoteAddr

func (c *TCPConn) RemoteAddr() net.Addr

func (*TCPConn) SetDeadline

func (c *TCPConn) SetDeadline(t time.Time) error

func (*TCPConn) SetKeepAlive

func (c *TCPConn) SetKeepAlive(keepalive bool) error

func (*TCPConn) SetKeepAliveConfig

func (c *TCPConn) SetKeepAliveConfig(config net.KeepAliveConfig) error

func (*TCPConn) SetKeepAlivePeriod

func (c *TCPConn) SetKeepAlivePeriod(period time.Duration) error

func (*TCPConn) SetLinger

func (c *TCPConn) SetLinger(sec int) error

func (*TCPConn) SetNoDelay

func (c *TCPConn) SetNoDelay(noDelay bool) error

func (*TCPConn) SetReadBuffer

func (c *TCPConn) SetReadBuffer(bytes int) error

func (*TCPConn) SetReadDeadline

func (c *TCPConn) SetReadDeadline(t time.Time) error

func (*TCPConn) SetWriteBuffer

func (c *TCPConn) SetWriteBuffer(bytes int) error

func (*TCPConn) SetWriteDeadline

func (c *TCPConn) SetWriteDeadline(t time.Time) error

func (*TCPConn) SyscallConn

func (c *TCPConn) SyscallConn() (syscall.RawConn, error)

func (*TCPConn) Write

func (c *TCPConn) Write(b []byte) (n int, err error)

func (*TCPConn) WriteBuffer

func (c *TCPConn) WriteBuffer() (int, error)

func (*TCPConn) WriteTo

func (c *TCPConn) WriteTo(w io.Writer) (int64, error)

type TCPListener

type TCPListener struct {
	// contains filtered or unexported fields
}

func ListenTCP

func ListenTCP(network string, addr *net.TCPAddr) (*TCPListener, error)

func (*TCPListener) Accept

func (ln *TCPListener) Accept() (net.Conn, error)

func (*TCPListener) AcceptTCP

func (ln *TCPListener) AcceptTCP() (tc *TCPConn, err error)

func (*TCPListener) Addr

func (ln *TCPListener) Addr() net.Addr

func (*TCPListener) Close

func (ln *TCPListener) Close() error

func (*TCPListener) File

func (ln *TCPListener) File() (f *os.File, err error)

func (*TCPListener) SetDeadline

func (ln *TCPListener) SetDeadline(t time.Time) error

func (*TCPListener) SyscallConn

func (ln *TCPListener) SyscallConn() (syscall.RawConn, error)

type UDPConn

type UDPConn struct {
	// contains filtered or unexported fields
}

func DialUDP

func DialUDP(network string, laddr, raddr *net.UDPAddr) (*UDPConn, error)

func ListenMulticastUDP

func ListenMulticastUDP(network string, ifi *net.Interface, addr *net.UDPAddr) (*UDPConn, error)

func ListenUDP

func ListenUDP(network string, addr *net.UDPAddr) (*UDPConn, error)

func (*UDPConn) Close

func (c *UDPConn) Close() error

func (*UDPConn) Context

func (c *UDPConn) Context() context.Context

func (*UDPConn) File

func (c *UDPConn) File() (f *os.File, err error)

func (*UDPConn) LocalAddr

func (c *UDPConn) LocalAddr() net.Addr

func (*UDPConn) Read

func (c *UDPConn) Read(b []byte) (n int, err error)

func (*UDPConn) ReadBuffer

func (c *UDPConn) ReadBuffer() (int, error)

func (*UDPConn) ReadFrom

func (c *UDPConn) ReadFrom(b []byte) (n int, addr net.Addr, err error)

func (*UDPConn) ReadFromUDP

func (c *UDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)

func (*UDPConn) ReadFromUDPAddrPort

func (c *UDPConn) ReadFromUDPAddrPort(b []byte) (n int, addr netip.AddrPort, err error)

func (*UDPConn) ReadMsgUDP

func (c *UDPConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error)

func (*UDPConn) ReadMsgUDPAddrPort

func (c *UDPConn) ReadMsgUDPAddrPort(b, oob []byte) (n, oobn, flags int, addr netip.AddrPort, err error)

func (*UDPConn) RemoteAddr

func (c *UDPConn) RemoteAddr() net.Addr

func (*UDPConn) SetDeadline

func (c *UDPConn) SetDeadline(t time.Time) error

func (*UDPConn) SetReadBuffer

func (c *UDPConn) SetReadBuffer(bytes int) error

func (*UDPConn) SetReadDeadline

func (c *UDPConn) SetReadDeadline(t time.Time) error

func (*UDPConn) SetWriteBuffer

func (c *UDPConn) SetWriteBuffer(bytes int) error

func (*UDPConn) SetWriteDeadline

func (c *UDPConn) SetWriteDeadline(t time.Time) error

func (*UDPConn) SyscallConn

func (c *UDPConn) SyscallConn() (syscall.RawConn, error)

func (*UDPConn) Write

func (c *UDPConn) Write(b []byte) (n int, err error)

func (*UDPConn) WriteBuffer

func (c *UDPConn) WriteBuffer() (int, error)

func (*UDPConn) WriteMsgUDP

func (c *UDPConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error)

func (*UDPConn) WriteMsgUDPAddrPort

func (c *UDPConn) WriteMsgUDPAddrPort(b, oob []byte, addr netip.AddrPort) (n, oobn int, err error)

func (*UDPConn) WriteTo

func (c *UDPConn) WriteTo(b []byte, addr net.Addr) (n int, err error)

func (*UDPConn) WriteToUDP

func (c *UDPConn) WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)

func (*UDPConn) WriteToUDPAddrPort

func (c *UDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (n int, err error)

type UnixConn

type UnixConn struct {
	// contains filtered or unexported fields
}

func DialUnix

func DialUnix(network string, laddr, raddr *net.UnixAddr) (*UnixConn, error)

func ListenUnixgram

func ListenUnixgram(network string, addr *net.UnixAddr) (*UnixConn, error)

func (*UnixConn) Close

func (c *UnixConn) Close() error

func (*UnixConn) Context

func (c *UnixConn) Context() context.Context

func (*UnixConn) File

func (c *UnixConn) File() (f *os.File, err error)

func (*UnixConn) LocalAddr

func (c *UnixConn) LocalAddr() net.Addr

func (*UnixConn) Read

func (c *UnixConn) Read(b []byte) (n int, err error)

func (*UnixConn) ReadBuffer

func (c *UnixConn) ReadBuffer() (int, error)

func (*UnixConn) ReadFrom

func (c *UnixConn) ReadFrom(b []byte) (int, net.Addr, error)

func (*UnixConn) ReadFromUnix

func (c *UnixConn) ReadFromUnix(b []byte) (n int, addr *net.UnixAddr, err error)

func (*UnixConn) ReadMsgUnix

func (c *UnixConn) ReadMsgUnix(b []byte, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error)

func (*UnixConn) RemoteAddr

func (c *UnixConn) RemoteAddr() net.Addr

func (*UnixConn) SetDeadline

func (c *UnixConn) SetDeadline(t time.Time) error

func (*UnixConn) SetReadBuffer

func (c *UnixConn) SetReadBuffer(bytes int) error

func (*UnixConn) SetReadDeadline

func (c *UnixConn) SetReadDeadline(t time.Time) error

func (*UnixConn) SetWriteBuffer

func (c *UnixConn) SetWriteBuffer(bytes int) error

func (*UnixConn) SetWriteDeadline

func (c *UnixConn) SetWriteDeadline(t time.Time) error

func (*UnixConn) SyscallConn

func (c *UnixConn) SyscallConn() (syscall.RawConn, error)

func (*UnixConn) Write

func (c *UnixConn) Write(b []byte) (n int, err error)

func (*UnixConn) WriteBuffer

func (c *UnixConn) WriteBuffer() (int, error)

func (*UnixConn) WriteMsgUnix

func (c *UnixConn) WriteMsgUnix(b []byte, oob []byte, addr *net.UnixAddr) (n int, oobn int, err error)

func (*UnixConn) WriteTo

func (c *UnixConn) WriteTo(b []byte, addr net.Addr) (int, error)

func (*UnixConn) WriteToUnix

func (c *UnixConn) WriteToUnix(b []byte, addr *net.UnixAddr) (int, error)

type UnixListener

type UnixListener struct {
	// contains filtered or unexported fields
}

func ListenUnix

func ListenUnix(network string, addr *net.UnixAddr) (*UnixListener, error)

func (*UnixListener) Accept

func (ln *UnixListener) Accept() (net.Conn, error)

func (*UnixListener) AcceptUnix

func (ln *UnixListener) AcceptUnix() (c *UnixConn, err error)

func (*UnixListener) Addr

func (ln *UnixListener) Addr() net.Addr

func (*UnixListener) Close

func (ln *UnixListener) Close() error

func (*UnixListener) File

func (ln *UnixListener) File() (f *os.File, err error)

func (*UnixListener) SetDeadline

func (ln *UnixListener) SetDeadline(t time.Time) error

func (*UnixListener) SetUnlinkOnClose

func (ln *UnixListener) SetUnlinkOnClose(unlink bool)

func (*UnixListener) SyscallConn

func (ln *UnixListener) SyscallConn() (syscall.RawConn, error)

Directories

Path Synopsis
pkg
sys
