gmux

package
v0.0.0-...-2bcc11b
Published: May 8, 2024 License: GPL-3.0, MIT Imports: 11 Imported by: 0

README

smux

Introduction

gmux wraps an underlying net.Conn in an upper-level multiplexing connection, from which the user can open thousands of logical streams.

gmux (Named MUltipleXing) is a multiplexing library for Golang. It relies on an underlying connection, such as TCP or KCP, to provide reliability and ordering, and provides stream-oriented multiplexing on top of it. The original intention of this library is to power the connection management for kcp-go.

Features

  1. Token-bucket-controlled receiving, which produces a smoother bandwidth graph (see picture below).
  2. Session-wide receive buffer shared among streams, giving full control over total memory usage.
  3. Minimized header (8 bytes), maximized payload.
  4. Well-tested on millions of devices in kcptun.
  5. Built-in fair-queue traffic shaping.
  6. Per-stream sliding window for congestion control (protocol version 2+).

[Figure: smooth bandwidth curve]

Documentation

For complete documentation, see the associated Godoc.

Benchmark

$ go test -v -run=^$ -bench .
goos: darwin
goarch: amd64
pkg: github.com/xtaci/smux
BenchmarkMSB-4           	30000000	        51.8 ns/op
BenchmarkAcceptClose-4   	   50000	     36783 ns/op
BenchmarkConnSmux-4      	   30000	     58335 ns/op	2246.88 MB/s	    1208 B/op	      19 allocs/op
BenchmarkConnTCP-4       	   50000	     25579 ns/op	5124.04 MB/s	       0 B/op	       0 allocs/op
PASS
ok  	github.com/xtaci/smux	7.811s

Specification

VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)  

VALUES FOR LATEST VERSION:
VERSION:
    1/2
    
CMD:
    cmdSYN(0)
    cmdFIN(1)
    cmdPSH(2)
    cmdNOP(3)
    cmdUPD(4)	// only supported on version 2
    
STREAMID:
    client uses odd numbers, starting from 1
    server uses even numbers, starting from 0
    
cmdUPD:
    | CONSUMED(4B) | WINDOW(4B) |
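
The layout above can be sketched as a pair of encode and decode helpers. This is an illustration under stated assumptions rather than the package's code: the names header, encodeHeader, decodeHeader, and encodeUPD are hypothetical, and the byte order of the multi-byte fields is assumed to be little-endian, which the specification above does not state.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const (
	cmdSYN byte = iota // stream open
	cmdFIN             // stream close
	cmdPSH             // data push
	cmdNOP             // no operation (keepalive)
	cmdUPD             // window update, protocol version 2 only
)

const headerSize = 8 // VERSION(1) + CMD(1) + LENGTH(2) + STREAMID(4)

// header is a hypothetical decoded view of the 8-byte frame header.
type header struct {
	Version  byte
	Cmd      byte
	Length   uint16
	StreamID uint32
}

// encodeHeader serializes h.
// ASSUMPTION: multi-byte fields are little-endian.
func encodeHeader(h header) [headerSize]byte {
	var b [headerSize]byte
	b[0] = h.Version
	b[1] = h.Cmd
	binary.LittleEndian.PutUint16(b[2:4], h.Length)
	binary.LittleEndian.PutUint32(b[4:8], h.StreamID)
	return b
}

// decodeHeader parses and validates the first 8 bytes of b.
func decodeHeader(b []byte) (header, error) {
	if len(b) < headerSize {
		return header{}, errors.New("short header")
	}
	h := header{
		Version:  b[0],
		Cmd:      b[1],
		Length:   binary.LittleEndian.Uint16(b[2:4]),
		StreamID: binary.LittleEndian.Uint32(b[4:8]),
	}
	if h.Version != 1 && h.Version != 2 {
		return header{}, fmt.Errorf("invalid version %d", h.Version)
	}
	if h.Cmd > cmdUPD || (h.Cmd == cmdUPD && h.Version != 2) {
		return header{}, fmt.Errorf("invalid command 0x%x", h.Cmd)
	}
	return h, nil
}

// encodeUPD builds the cmdUPD body: CONSUMED(4B) | WINDOW(4B).
func encodeUPD(consumed, window uint32) [8]byte {
	var b [8]byte
	binary.LittleEndian.PutUint32(b[0:4], consumed)
	binary.LittleEndian.PutUint32(b[4:8], window)
	return b
}

func main() {
	raw := encodeHeader(header{Version: 2, Cmd: cmdPSH, Length: 4, StreamID: 1})
	h, err := decodeHeader(raw[:])
	fmt.Println(h, err)
	fmt.Println(encodeUPD(1024, 65536))
}
```

Since LENGTH occupies two bytes, a single frame can carry at most 65535 bytes of payload, so larger writes have to be split across frames.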

Usage


func client() {
    // Get a TCP connection
    conn, err := net.Dial(...)
    if err != nil {
        panic(err)
    }

    // Set up the client side of gmux
    session, err := gmux.NewClient(conn, gmux.DefaultConfig())
    if err != nil {
        panic(err)
    }
    defer session.Close()

    // Open a new named stream ("ping" is an arbitrary example name)
    stream, err := session.OpenStream("ping")
    if err != nil {
        panic(err)
    }
    defer stream.Close()

    // Stream implements io.ReadWriteCloser
    if _, err := stream.Write([]byte("ping")); err != nil {
        panic(err)
    }
}

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

    // Set up the server side of gmux
    session, err := gmux.NewServer(conn, gmux.DefaultConfig())
    if err != nil {
        panic(err)
    }
    defer session.Close()

    // Accept a stream
    stream, err := session.AcceptStream()
    if err != nil {
        panic(err)
    }
    defer stream.Close()

    // Listen for a message
    buf := make([]byte, 4)
    if _, err := stream.Read(buf); err != nil {
        panic(err)
    }
}

Thanks

It is based on https://github.com/xtaci/smux/tree/09e2c01560df5aaaed50e48e77547858e2623498

Thank you!

Documentation

Overview

Package gmux is a multiplexing library for Golang.

It relies on an underlying connection to provide reliability and ordering, such as TCP or KCP, and provides stream-oriented multiplexing over a single channel.

Variables

var (
	ErrInvalidProtocol = errors.New("invalid protocol")
	ErrConsumed        = errors.New("peer consumed more than sent")
	ErrGoAway          = errors.New("stream id overflows, should start a new connection")
	ErrTimeout         = errors.New("timeout")
	ErrWouldBlock      = errors.New("operation would block on IO")
	ErrInvalidCommand  = "invalid command 0x%x"
	ErrInvalidVersion  = "invalid version %d from %s"
)

Functions

func VerifyConfig

func VerifyConfig(config *Config) error

VerifyConfig verifies the sanity of the configuration.

Types

type Allocator

type Allocator struct {
	// contains filtered or unexported fields
}

Allocator for incoming frames, optimized to prevent overwriting after zeroing

func NewAllocator

func NewAllocator() *Allocator

NewAllocator creates a []byte allocator for frames smaller than 65536 bytes. The space wasted per allocation (memory fragmentation) is guaranteed to be no more than 50%.

func (*Allocator) Get

func (alloc *Allocator) Get(size int) []byte

Get returns a []byte from the pool with the most appropriate cap.

func (*Allocator) Put

func (alloc *Allocator) Put(buf []byte) error

Put returns a []byte to the pool for future use; its cap must be exactly 2^n.
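
The 50% bound and the 2^n requirement both follow from keeping one pool per power-of-two size class. The sketch below illustrates that scheme with sync.Pool. It is an assumption about how such an allocator can be built, not the package's source; bucketAllocator, get, and put are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"math/bits"
	"sync"
)

const maxSize = 65536

// bucketAllocator is a hypothetical stand-in for the package's Allocator.
type bucketAllocator struct {
	pools [17]sync.Pool // pools[i] holds buffers whose cap is exactly 1<<i
}

func newBucketAllocator() *bucketAllocator {
	a := &bucketAllocator{}
	for i := range a.pools {
		size := 1 << uint(i)
		a.pools[i].New = func() interface{} {
			b := make([]byte, size)
			return &b
		}
	}
	return a
}

// get returns a slice of length size whose cap is the smallest power of two
// that is >= size, so less than half of the backing array is ever unused.
func (a *bucketAllocator) get(size int) []byte {
	if size <= 0 || size > maxSize {
		return nil
	}
	idx := bits.Len(uint(size - 1)) // ceil(log2(size))
	b := a.pools[idx].Get().(*[]byte)
	return (*b)[:size]
}

// put hands buf back to the pool of its size class.
// It rejects buffers whose cap is not a power of two.
func (a *bucketAllocator) put(buf []byte) error {
	c := cap(buf)
	if c == 0 || c > maxSize || c&(c-1) != 0 {
		return errors.New("allocator: cap must be a power of two no larger than 65536")
	}
	buf = buf[:c]
	a.pools[bits.Len(uint(c))-1].Put(&buf)
	return nil
}

func main() {
	a := newBucketAllocator()
	b := a.get(1000)
	fmt.Println(len(b), cap(b))              // 1000 1024
	fmt.Println(a.put(b))                    // <nil>
	fmt.Println(a.put(make([]byte, 1000)))   // rejected: cap 1000 is not 2^n
}
```

Rejecting odd-sized buffers in put keeps every pool homogeneous, which is what lets get return a buffer without checking its capacity.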

type CloseNotifier

type CloseNotifier func(stream StreamIF, ctx interface{})

type Config

type Config struct {
	// Version is the SMUX protocol version; 1 and 2 are supported
	Version int

	// KeepAliveDisabled turns keepalive off
	KeepAliveDisabled bool

	// KeepAliveInterval is how often to send a NOP command to the remote
	KeepAliveInterval time.Duration

	// KeepAliveTimeout is how long to wait without receiving
	// any data before the session is closed
	KeepAliveTimeout time.Duration

	// MaxFrameSize is used to control the maximum
	// frame size to send to the remote
	MaxFrameSize int

	// MaxReceiveBuffer is used to control the maximum
	// amount of data in the buffer pool
	MaxReceiveBuffer int

	// MaxStreamBuffer is used to control the maximum
	// amount of data buffered per stream
	MaxStreamBuffer int
}

Config is used to tune the Smux session

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig is used to return a default configuration

type Frame

type Frame struct {
	// contains filtered or unexported fields
}

Frame defines a packet that is received from, or is to be multiplexed into, a single connection. Frame is friendly to upper-layer users, but when data is sent over the network it is organized by rawHeader rather than by Frame.

type MuxConnIF

type MuxConnIF interface {
	Open(streamName string) (io.ReadWriteCloser, error)
	Accept() (io.ReadWriteCloser, error)
	IsClosed() bool
	NumStreams() int
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	Close() error
}

MuxConnIF is an interface for an upper-level multiplexing connection that is based on an underlying net.Conn.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session defines a multiplexed connection for streams.

TODO: rename to MuxConn

func NewClient

func NewClient(conn io.ReadWriteCloser, config *Config) (*Session, error)

NewClient is used to initialize a new client-side connection. It wraps the client side of an underlying connection, such as a net.Conn, in an upper-level multiplexing connection.

func NewServer

func NewServer(conn io.ReadWriteCloser, config *Config) (*Session, error)

NewServer is used to initialize a new server-side connection.

func (*Session) Accept

func (s *Session) Accept() (io.ReadWriteCloser, error)

Accept returns a generic io.ReadWriteCloser instead of a *Stream.

func (*Session) AcceptStream

func (s *Session) AcceptStream() (*Stream, error)

AcceptStream is used to block until the next available stream is ready to be accepted.

func (*Session) Close

func (s *Session) Close() error

Close is used to close the session and all streams.

func (*Session) GetNoDataTimeout

func (s *Session) GetNoDataTimeout() time.Duration

GetNoDataTimeout returns current noDataTimeout.

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed does a safe check to see if the session has shut down.

func (*Session) LocalAddr

func (s *Session) LocalAddr() net.Addr

LocalAddr satisfies net.Conn interface

func (*Session) NumStreams

func (s *Session) NumStreams() int

NumStreams returns the number of currently open streams

func (*Session) Open

func (s *Session) Open(streamName string) (io.ReadWriteCloser, error)

Open returns a generic ReadWriteCloser

func (*Session) OpenStream

func (s *Session) OpenStream(streamName string) (*Stream, error)

OpenStream is used to create a new stream

func (*Session) RemoteAddr

func (s *Session) RemoteAddr() net.Addr

RemoteAddr satisfies net.Conn interface

func (*Session) SetDeadline

func (s *Session) SetDeadline(t time.Time) error

SetDeadline sets a deadline used by Accept* calls. A zero time value disables the deadline.

func (*Session) SetNoDataTimeout

func (s *Session) SetNoDataTimeout(noDataTimeout time.Duration) error

SetNoDataTimeout can be called multiple times, including while a Read, Write, or WriteTo call is in progress or after it has returned.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream implements net.Conn, it is a logical stream.

func (*Stream) Close

func (s *Stream) Close() error

Close implements net.Conn

func (*Stream) GetDieCh

func (s *Stream) GetDieCh() <-chan struct{}

GetDieCh returns a receive-only channel that becomes readable when the stream is about to be closed.

func (*Stream) GetNoDataTimeout

func (s *Stream) GetNoDataTimeout() time.Duration

GetNoDataTimeout returns current noDataTimeout.

func (*Stream) ID

func (s *Stream) ID() uint32

ID returns the unique stream ID.

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr satisfies net.Conn interface

func (*Stream) Name

func (s *Stream) Name() string

Name returns the stream name.

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

Read implements net.Conn

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr satisfies net.Conn interface

func (*Stream) SetCloseNotifier

func (s *Stream) SetCloseNotifier(notifier CloseNotifier, ctx interface{})

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets both read and write deadlines as defined by net.Conn.SetDeadline. A zero time value disables the deadlines.

func (*Stream) SetNoDataTimeout

func (s *Stream) SetNoDataTimeout(noDataTimeout time.Duration) error

SetNoDataTimeout can be called multiple times, including while a Read, Write, or WriteTo call is in progress or after it has returned.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the read deadline as defined by net.Conn.SetReadDeadline. A zero time value disables the deadline.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the write deadline as defined by net.Conn.SetWriteDeadline. A zero time value disables the deadline.

func (*Stream) Write

func (s *Stream) Write(b []byte) (n int, err error)

Write implements net.Conn

Note that the behavior is not deterministic when multiple goroutines write to the same stream concurrently: their frames may interleave in arbitrary order.

func (*Stream) WriteTo

func (s *Stream) WriteTo(w io.Writer) (n int64, err error)

WriteTo implements io.WriterTo

type StreamIF

type StreamIF interface {
	ID() uint32
	Name() string
	GetNoDataTimeout() time.Duration
	SetNoDataTimeout(noDataTimeout time.Duration) error
	SetCloseNotifier(notifier CloseNotifier, ctx interface{})
	net.Conn
}

StreamIF is the interface for a logical stream; it embeds net.Conn.
