rio

package module
v1.1.0 Latest
Published: Mar 4, 2025 License: LGPL-3.0 Imports: 17 Imported by: 0

README

RIO

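An AIO network library based on IOURING, built without CGO and following the usage patterns of the standard library.

Supported protocols: TCP, UDP, UNIX, UNIXGRAM (IP is proxied to the standard library).

The Linux kernel version must be >= 5.14; >= 6.1 is recommended.

Performance

Test environment: Win11 (WSL2), kernel 6.6.36.6-microsoft-standard-WSL2, CPU 13600K.

Tests use the default parameters: 10 threads with 1000 connections each, 10000 connections in total.

RIO is about 13% faster than STD (the Go net standard library); see Benchmark for details.

Note: CurveWaitTransmission performs differently in different environments and should be tuned as needed to get the best performance.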

[Figures: echo benchmark, http benchmark]

tcpkali load test.

Type  Packet rate estimate
RIO   8967.3
NET   6866.4
  tcpkali --workers 1 -c 50 -T 10s -m "PING" 127.0.0.1:9000
------ RIO ------
Destination: [127.0.0.1]:9000
Interface lo address [127.0.0.1]:0
Interface lo address [10.255.255.254]:0
Using interface lo to connect to [127.0.0.1]:9000
Ramped up to 50 connections.
Total data sent:     125.1 MiB (131137536 bytes)
Total data received: 114.8 MiB (120352691 bytes)
Bandwidth per channel: 4.021⇅ Mbps (502.6 kBps)
Aggregate bandwidth: 96.206↓, 104.827↑ Mbps
Packet rate estimate: 8967.3↓, 9086.6↑ (2↓, 41↑ TCP MSS/op)
Test duration: 10.0079 s.
------ NET ------
Destination: [127.0.0.1]:9000
Interface lo address [127.0.0.1]:0
Interface lo address [10.255.255.254]:0
Using interface lo to connect to [127.0.0.1]:9000
Ramped up to 50 connections.
Total data sent:     93.3 MiB (97845248 bytes)
Total data received: 86.5 MiB (90741081 bytes)
Bandwidth per channel: 3.016⇅ Mbps (377.0 kBps)
Aggregate bandwidth: 72.565↓, 78.246↑ Mbps
Packet rate estimate: 6866.4↓, 6826.6↑ (2↓, 42↑ TCP MSS/op)
Test duration: 10.0038 s.
------ Benchmark ------
Port: 9000
Workers: 10
Count: 1000
NBytes: 1024
ECHO-RIO benching complete(1.564700361s): 6391 conn/sec, 6.2M inbounds/sec, 6.2M outbounds/sec, 0 failures
ECHO-STD benching complete(1.821161901s): 5491 conn/sec, 5.4M inbounds/sec, 5.4M outbounds/sec, 0 failures
HTTP-RIO benching complete(1.722059583s): 5807 conn/sec, 5.8M inbounds/sec, 5.8M outbounds/sec, 0 failures
HTTP-STD benching complete(1.948937829s): 5131 conn/sec, 5M inbounds/sec, 5M outbounds/sec, 0 failures
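
The relative gains can be recomputed from the figures reported above. The following is a minimal sketch of that arithmetic; the helper name `speedupPercent` is illustrative and not part of rio.

```go
package main

import "fmt"

// speedupPercent returns how much faster a is than b, in percent.
func speedupPercent(a, b float64) float64 {
	return (a/b - 1) * 100
}

func main() {
	// Figures copied from the benchmark output above.
	fmt.Printf("ECHO conn/sec:       %.1f%%\n", speedupPercent(6391, 5491))
	fmt.Printf("HTTP conn/sec:       %.1f%%\n", speedupPercent(5807, 5131))
	fmt.Printf("tcpkali packet rate: %.1f%%\n", speedupPercent(8967.3, 6866.4))
}
```

With these inputs the HTTP figure works out to roughly 13%, the echo figure to roughly 16%, and the tcpkali packet rate to roughly 31%.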

Usage

go get -u github.com/brickingsoft/rio

Basic usage: replace net with rio.

// Replace net.Listen() with rio.Listen()
ln, lnErr := rio.Listen("tcp", ":9000")
// Replace net.Dial() with rio.Dial()
conn, dialErr := rio.Dial("tcp", "127.0.0.1:9000")

TLS:

// server("github.com/brickingsoft/rio/tls")
ln, _ = tls.Listen("tcp", "127.0.0.1:9000", tls.ConfigFrom(config))
// server(use wrap)
ln, _ := rio.Listen("tcp", ":9000")
ln, _ = tls.NewListener(ln, config)

// client("github.com/brickingsoft/rio/tls")
conn, _ = tls.Dial("tcp", "127.0.0.1:9000", tls.ConfigFrom(config))

// client(use wrap)
rawConn, dialErr := rio.Dial("tcp", "127.0.0.1:9000")
conn := tls.Client(rawConn, config)
if err := conn.HandshakeContext(ctx); err != nil {
	rawConn.Close()
	return nil, err
}

Type conversion:

// tcp sendfile
reader, ok := conn.(io.ReaderFrom)
// Convert to a TCP connection
tcpConn, ok := conn.(*rio.TCPConn)
// Convert to a UDP connection
udpConn, ok := conn.(*rio.UDPConn)
// Convert to a UNIX connection
unixConn, ok := conn.(*rio.UnixConn)

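Client-only scenario:

It is recommended to PIN IOURING and to UNPIN only when the program exits.

This is because IOURING is started when it is in use and shut down when it is no longer in use.

A Listen usually lives as long as the program, so IOURING stays resident in that case.

A Dial is short-lived and usually happens frequently, so PIN is needed to keep IOURING resident instead of starting and stopping it repeatedly.

// At program startup
rio.Pin()
// At program exit
rio.Unpin()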
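
The effect of pinning can be illustrated with a toy model. The sketch below is not rio's implementation: the `ring` type, its methods and `dialBurst` are hypothetical stand-ins that only mimic the lifecycle described above (start on first use, stop when unused), to show why a long-lived holder avoids repeated restarts.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a toy stand-in for a shared resource that is started on first
// use and stopped when the last user releases it. It is NOT rio's code.
type ring struct {
	mu     sync.Mutex
	users  int
	starts int // how many times the resource had to be started
}

// acquire registers a user, starting the resource if it was idle.
func (r *ring) acquire() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.users == 0 {
		r.starts++
	}
	r.users++
}

// release unregisters a user; at zero users the resource counts as stopped.
func (r *ring) release() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.users > 0 {
		r.users--
	}
}

// dialBurst simulates n short-lived connections opened one after another.
func dialBurst(r *ring, n int) {
	for i := 0; i < n; i++ {
		r.acquire() // connection opened
		r.release() // connection closed
	}
}

func main() {
	unpinned := &ring{}
	dialBurst(unpinned, 100)

	pinned := &ring{}
	pinned.acquire() // plays the role of rio.Pin()
	dialBurst(pinned, 100)
	pinned.release() // plays the role of rio.Unpin()

	fmt.Println("starts without pin:", unpinned.starts)
	fmt.Println("starts with pin:", pinned.starts)
}
```

In this model the unpinned run starts the resource 100 times and the pinned run starts it once. In a real program, the error values returned by `rio.Pin()` and `rio.Unpin()` should also be checked.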

HTTP:

Server: pass a rio Listener in place of the standard one.

Client: supply a replacement RoundTripper.

// http server
http.Serve(ln, handler)
// fasthttp server
fasthttp.Serve(ln, handler)

REUSE PORT (enabled automatically when listening on TCP):

lc := rio.ListenConfig{}
lc.SetReusePort(true)

ln, lnErr := lc.Listen(...)

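Advanced tuning

Tuning is done through environment variables; see IOURING for details.

Name                         Type     Description
IOURING_ENTRIES              number   Ring size; defaults to the maximum, 16384.
IOURING_FLAGS                text     Flags, e.g. 1 | 2
IOURING_FEATURES             text     Features, e.g. 1 | 2
IOURING_PREPARE_BATCH_SIZE   number   Buffer size for preparing SQEs; defaults to the SQ size.
IOURING_USE_CPU_AFFILIATE    boolean  Whether to use CPU AFFILIATE.
IOURING_CURVE_TRANSMISSION   text     Curve for the CQ wait strategy, e.g. 1:1us, 8:1us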

Documentation

Constants

const (
	ReadFromFileUseMMapPolicy = int32(iota)
	ReadFromFileUseMixPolicy
)

Variables

var (
	DefaultDialer = Dialer{
		Timeout:         15 * time.Second,
		Deadline:        time.Time{},
		KeepAlive:       0,
		KeepAliveConfig: net.KeepAliveConfig{Enable: true},
		MultipathTCP:    false,
		FastOpen:        false,
		QuickAck:        false,
		UseSendZC:       false,
		Control:         nil,
		ControlContext:  nil,
	}
)

Functions

func Dial

func Dial(network string, address string) (net.Conn, error)

func DialContext

func DialContext(ctx context.Context, network string, address string) (net.Conn, error)

func DialTimeout

func DialTimeout(network string, address string, timeout time.Duration) (net.Conn, error)

func Listen

func Listen(network string, addr string) (ln net.Listener, err error)

func ListenPacket

func ListenPacket(network string, addr string) (c net.PacketConn, err error)

func Pin

func Pin() (err error)

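Pin pins IOURING. It is typically called at program startup. It lets you manage the IOURING lifecycle manually and is mainly intended for programs that only use Dial. Note: Unpin must be called to shut down IOURING.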

func Unpin

func Unpin() (err error)

Unpin releases the pin.

func UseProcessPriority

func UseProcessPriority(level process.PriorityLevel)

func UseReadFromFilePolicy

func UseReadFromFilePolicy(policy int32)

Types

type Dialer

type Dialer struct {
	Timeout         time.Duration
	Deadline        time.Time
	KeepAlive       time.Duration
	KeepAliveConfig net.KeepAliveConfig
	MultipathTCP    bool
	FastOpen        bool
	QuickAck        bool
	UseSendZC       bool
	Control         func(network, address string, c syscall.RawConn) error
	ControlContext  func(ctx context.Context, network, address string, c syscall.RawConn) error
}

func (*Dialer) Dial

func (d *Dialer) Dial(network string, address string) (c net.Conn, err error)

func (*Dialer) DialContext

func (d *Dialer) DialContext(ctx context.Context, network, address string) (c net.Conn, err error)

func (*Dialer) DialIP

func (d *Dialer) DialIP(_ context.Context, network string, laddr, raddr *net.IPAddr) (*IPConn, error)

func (*Dialer) DialTCP

func (d *Dialer) DialTCP(ctx context.Context, network string, laddr, raddr *net.TCPAddr) (*TCPConn, error)

func (*Dialer) DialUDP

func (d *Dialer) DialUDP(ctx context.Context, network string, laddr, raddr *net.UDPAddr) (*UDPConn, error)

func (*Dialer) DialUnix

func (d *Dialer) DialUnix(ctx context.Context, network string, laddr, raddr *net.UnixAddr) (*UnixConn, error)

func (*Dialer) SetFastOpen

func (d *Dialer) SetFastOpen(use bool)

func (*Dialer) SetMultipathTCP

func (d *Dialer) SetMultipathTCP(use bool)

func (*Dialer) SetQuickAck

func (d *Dialer) SetQuickAck(use bool)

type IPConn

type IPConn struct {
	*net.IPConn
}

func DialIP

func DialIP(network string, laddr, raddr *net.IPAddr) (*IPConn, error)

func ListenIP

func ListenIP(network string, addr *net.IPAddr) (*IPConn, error)

type ListenConfig

type ListenConfig struct {
	Control         func(network, address string, c syscall.RawConn) error
	KeepAlive       time.Duration
	KeepAliveConfig net.KeepAliveConfig
	UseSendZC       bool
	MultipathTCP    bool
	FastOpen        bool
	QuickAck        bool
	ReusePort       bool
}

func (*ListenConfig) Listen

func (lc *ListenConfig) Listen(ctx context.Context, network string, address string) (ln net.Listener, err error)

func (*ListenConfig) ListenIP

func (lc *ListenConfig) ListenIP(_ context.Context, network string, addr *net.IPAddr) (*IPConn, error)

func (*ListenConfig) ListenMulticastUDP

func (lc *ListenConfig) ListenMulticastUDP(ctx context.Context, network string, ifi *net.Interface, addr *net.UDPAddr) (*UDPConn, error)

func (*ListenConfig) ListenPacket

func (lc *ListenConfig) ListenPacket(ctx context.Context, network, address string) (c net.PacketConn, err error)

func (*ListenConfig) ListenTCP

func (lc *ListenConfig) ListenTCP(ctx context.Context, network string, addr *net.TCPAddr) (*TCPListener, error)

func (*ListenConfig) ListenUDP

func (lc *ListenConfig) ListenUDP(ctx context.Context, network string, addr *net.UDPAddr) (*UDPConn, error)

func (*ListenConfig) ListenUnix

func (lc *ListenConfig) ListenUnix(ctx context.Context, network string, addr *net.UnixAddr) (*UnixListener, error)

func (*ListenConfig) ListenUnixgram

func (lc *ListenConfig) ListenUnixgram(ctx context.Context, network string, addr *net.UnixAddr) (*UnixConn, error)

func (*ListenConfig) SetFastOpen

func (lc *ListenConfig) SetFastOpen(use bool)

func (*ListenConfig) SetMultipathTCP

func (lc *ListenConfig) SetMultipathTCP(use bool)

func (*ListenConfig) SetQuickAck

func (lc *ListenConfig) SetQuickAck(use bool)

func (*ListenConfig) SetReusePort

func (lc *ListenConfig) SetReusePort(use bool)

func (*ListenConfig) SetSendZC

func (lc *ListenConfig) SetSendZC(use bool)

type TCPConn

type TCPConn struct {
	// contains filtered or unexported fields
}

func DialTCP

func DialTCP(network string, laddr, raddr *net.TCPAddr) (*TCPConn, error)

func (*TCPConn) Close

func (c *TCPConn) Close() error

func (*TCPConn) CloseRead

func (c *TCPConn) CloseRead() error

func (*TCPConn) CloseWrite

func (c *TCPConn) CloseWrite() error

func (*TCPConn) Context

func (c *TCPConn) Context() context.Context

func (*TCPConn) File

func (c *TCPConn) File() (f *os.File, err error)

func (*TCPConn) LocalAddr

func (c *TCPConn) LocalAddr() net.Addr

func (*TCPConn) MultipathTCP

func (c *TCPConn) MultipathTCP() (bool, error)

func (*TCPConn) Read

func (c *TCPConn) Read(b []byte) (n int, err error)

func (*TCPConn) ReadBuffer

func (c *TCPConn) ReadBuffer() (int, error)

func (*TCPConn) ReadFrom

func (c *TCPConn) ReadFrom(r io.Reader) (int64, error)

func (*TCPConn) RemoteAddr

func (c *TCPConn) RemoteAddr() net.Addr

func (*TCPConn) SetDeadline

func (c *TCPConn) SetDeadline(t time.Time) error

func (*TCPConn) SetKeepAlive

func (c *TCPConn) SetKeepAlive(keepalive bool) error

func (*TCPConn) SetKeepAliveConfig

func (c *TCPConn) SetKeepAliveConfig(config net.KeepAliveConfig) error

func (*TCPConn) SetKeepAlivePeriod

func (c *TCPConn) SetKeepAlivePeriod(period time.Duration) error

func (*TCPConn) SetLinger

func (c *TCPConn) SetLinger(sec int) error

func (*TCPConn) SetNoDelay

func (c *TCPConn) SetNoDelay(noDelay bool) error

func (*TCPConn) SetReadBuffer

func (c *TCPConn) SetReadBuffer(bytes int) error

func (*TCPConn) SetReadDeadline

func (c *TCPConn) SetReadDeadline(t time.Time) error

func (*TCPConn) SetWriteBuffer

func (c *TCPConn) SetWriteBuffer(bytes int) error

func (*TCPConn) SetWriteDeadline

func (c *TCPConn) SetWriteDeadline(t time.Time) error

func (*TCPConn) SyscallConn

func (c *TCPConn) SyscallConn() (syscall.RawConn, error)

func (*TCPConn) Write

func (c *TCPConn) Write(b []byte) (n int, err error)

func (*TCPConn) WriteBuffer

func (c *TCPConn) WriteBuffer() (int, error)

func (*TCPConn) WriteTo

func (c *TCPConn) WriteTo(w io.Writer) (int64, error)

type TCPListener

type TCPListener struct {
	// contains filtered or unexported fields
}

func ListenTCP

func ListenTCP(network string, addr *net.TCPAddr) (*TCPListener, error)

func (*TCPListener) Accept

func (ln *TCPListener) Accept() (net.Conn, error)

func (*TCPListener) AcceptTCP

func (ln *TCPListener) AcceptTCP() (tc *TCPConn, err error)

func (*TCPListener) Addr

func (ln *TCPListener) Addr() net.Addr

func (*TCPListener) Close

func (ln *TCPListener) Close() error

func (*TCPListener) File

func (ln *TCPListener) File() (f *os.File, err error)

func (*TCPListener) SetDeadline

func (ln *TCPListener) SetDeadline(t time.Time) error

func (*TCPListener) SyscallConn

func (ln *TCPListener) SyscallConn() (syscall.RawConn, error)

type UDPConn

type UDPConn struct {
	// contains filtered or unexported fields
}

func DialUDP

func DialUDP(network string, laddr, raddr *net.UDPAddr) (*UDPConn, error)

func ListenMulticastUDP

func ListenMulticastUDP(network string, ifi *net.Interface, addr *net.UDPAddr) (*UDPConn, error)

func ListenUDP

func ListenUDP(network string, addr *net.UDPAddr) (*UDPConn, error)

func (*UDPConn) Close

func (c *UDPConn) Close() error

func (*UDPConn) Context

func (c *UDPConn) Context() context.Context

func (*UDPConn) File

func (c *UDPConn) File() (f *os.File, err error)

func (*UDPConn) LocalAddr

func (c *UDPConn) LocalAddr() net.Addr

func (*UDPConn) Read

func (c *UDPConn) Read(b []byte) (n int, err error)

func (*UDPConn) ReadBuffer

func (c *UDPConn) ReadBuffer() (int, error)

func (*UDPConn) ReadFrom

func (c *UDPConn) ReadFrom(b []byte) (n int, addr net.Addr, err error)

func (*UDPConn) ReadFromUDP

func (c *UDPConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)

func (*UDPConn) ReadFromUDPAddrPort

func (c *UDPConn) ReadFromUDPAddrPort(b []byte) (n int, addr netip.AddrPort, err error)

func (*UDPConn) ReadMsgUDP

func (c *UDPConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error)

func (*UDPConn) ReadMsgUDPAddrPort

func (c *UDPConn) ReadMsgUDPAddrPort(b, oob []byte) (n, oobn, flags int, addr netip.AddrPort, err error)

func (*UDPConn) RemoteAddr

func (c *UDPConn) RemoteAddr() net.Addr

func (*UDPConn) SetDeadline

func (c *UDPConn) SetDeadline(t time.Time) error

func (*UDPConn) SetReadBuffer

func (c *UDPConn) SetReadBuffer(bytes int) error

func (*UDPConn) SetReadDeadline

func (c *UDPConn) SetReadDeadline(t time.Time) error

func (*UDPConn) SetWriteBuffer

func (c *UDPConn) SetWriteBuffer(bytes int) error

func (*UDPConn) SetWriteDeadline

func (c *UDPConn) SetWriteDeadline(t time.Time) error

func (*UDPConn) SyscallConn

func (c *UDPConn) SyscallConn() (syscall.RawConn, error)

func (*UDPConn) Write

func (c *UDPConn) Write(b []byte) (n int, err error)

func (*UDPConn) WriteBuffer

func (c *UDPConn) WriteBuffer() (int, error)

func (*UDPConn) WriteMsgUDP

func (c *UDPConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error)

func (*UDPConn) WriteMsgUDPAddrPort

func (c *UDPConn) WriteMsgUDPAddrPort(b, oob []byte, addr netip.AddrPort) (n, oobn int, err error)

func (*UDPConn) WriteTo

func (c *UDPConn) WriteTo(b []byte, addr net.Addr) (n int, err error)

func (*UDPConn) WriteToUDP

func (c *UDPConn) WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)

func (*UDPConn) WriteToUDPAddrPort

func (c *UDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (n int, err error)

type UnixConn

type UnixConn struct {
	// contains filtered or unexported fields
}

func DialUnix

func DialUnix(network string, laddr, raddr *net.UnixAddr) (*UnixConn, error)

func ListenUnixgram

func ListenUnixgram(network string, addr *net.UnixAddr) (*UnixConn, error)

func (*UnixConn) Close

func (c *UnixConn) Close() error

func (*UnixConn) Context

func (c *UnixConn) Context() context.Context

func (*UnixConn) File

func (c *UnixConn) File() (f *os.File, err error)

func (*UnixConn) LocalAddr

func (c *UnixConn) LocalAddr() net.Addr

func (*UnixConn) Read

func (c *UnixConn) Read(b []byte) (n int, err error)

func (*UnixConn) ReadBuffer

func (c *UnixConn) ReadBuffer() (int, error)

func (*UnixConn) ReadFrom

func (c *UnixConn) ReadFrom(b []byte) (int, net.Addr, error)

func (*UnixConn) ReadFromUnix

func (c *UnixConn) ReadFromUnix(b []byte) (n int, addr *net.UnixAddr, err error)

func (*UnixConn) ReadMsgUnix

func (c *UnixConn) ReadMsgUnix(b []byte, oob []byte) (n, oobn, flags int, addr *net.UnixAddr, err error)

func (*UnixConn) RemoteAddr

func (c *UnixConn) RemoteAddr() net.Addr

func (*UnixConn) SetDeadline

func (c *UnixConn) SetDeadline(t time.Time) error

func (*UnixConn) SetReadBuffer

func (c *UnixConn) SetReadBuffer(bytes int) error

func (*UnixConn) SetReadDeadline

func (c *UnixConn) SetReadDeadline(t time.Time) error

func (*UnixConn) SetWriteBuffer

func (c *UnixConn) SetWriteBuffer(bytes int) error

func (*UnixConn) SetWriteDeadline

func (c *UnixConn) SetWriteDeadline(t time.Time) error

func (*UnixConn) SyscallConn

func (c *UnixConn) SyscallConn() (syscall.RawConn, error)

func (*UnixConn) Write

func (c *UnixConn) Write(b []byte) (n int, err error)

func (*UnixConn) WriteBuffer

func (c *UnixConn) WriteBuffer() (int, error)

func (*UnixConn) WriteMsgUnix

func (c *UnixConn) WriteMsgUnix(b []byte, oob []byte, addr *net.UnixAddr) (n int, oobn int, err error)

func (*UnixConn) WriteTo

func (c *UnixConn) WriteTo(b []byte, addr net.Addr) (int, error)

func (*UnixConn) WriteToUnix

func (c *UnixConn) WriteToUnix(b []byte, addr *net.UnixAddr) (int, error)

type UnixListener

type UnixListener struct {
	// contains filtered or unexported fields
}

func ListenUnix

func ListenUnix(network string, addr *net.UnixAddr) (*UnixListener, error)

func (*UnixListener) Accept

func (ln *UnixListener) Accept() (net.Conn, error)

func (*UnixListener) AcceptUnix

func (ln *UnixListener) AcceptUnix() (c *UnixConn, err error)

func (*UnixListener) Addr

func (ln *UnixListener) Addr() net.Addr

func (*UnixListener) Close

func (ln *UnixListener) Close() error

func (*UnixListener) File

func (ln *UnixListener) File() (f *os.File, err error)

func (*UnixListener) SetDeadline

func (ln *UnixListener) SetDeadline(t time.Time) error

func (*UnixListener) SetUnlinkOnClose

func (ln *UnixListener) SetUnlinkOnClose(unlink bool)

func (*UnixListener) SyscallConn

func (ln *UnixListener) SyscallConn() (syscall.RawConn, error)

Directories

Path Synopsis
pkg
sys
