mux

Version: v0.1.0
Published: Aug 18, 2023 | License: MIT | Imports: 10 | Imported by: 0

README

simple-mux

A simple connection multiplexing library for Go.

  • KISS (keep it simple, stupid).
  • Almost zero-alloc. Buffers still have to be allocated, but they are reused.
  • Small overhead. A data frame has a 7-byte header and can carry up to 65535 bytes of payload.
  • Streams can be opened by either the client or the server.
  • Built-in flow control and an rx buffer for each stream. A slow stream won't affect other streams in the same session.
  • Built-in session ping/pong for health checks and keepalive.
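
To put the framing overhead in perspective, the sketch below computes the share of wire bytes spent on headers for a few payload sizes. It uses only the two figures stated above (7-byte header, at most 65535 payload bytes per data frame) and assumes every frame except possibly the last is full. Control frames such as pings or window updates are not counted. The names headerSize, maxPayload, framesNeeded and overheadRatio are illustrative and not part of the library.

```go
package main

import "fmt"

const (
	headerSize = 7     // header bytes per data frame, as stated in the README
	maxPayload = 65535 // largest payload a single data frame can carry
)

// framesNeeded returns how many data frames are required to carry n payload
// bytes, assuming every frame except possibly the last is filled to maxPayload.
func framesNeeded(n int) int {
	if n <= 0 {
		return 0
	}
	return (n + maxPayload - 1) / maxPayload
}

// overheadRatio returns header bytes divided by total bytes on the wire
// (headers plus payload) for n payload bytes.
func overheadRatio(n int) float64 {
	if n <= 0 {
		return 0
	}
	h := framesNeeded(n) * headerSize
	return float64(h) / float64(h+n)
}

func main() {
	for _, n := range []int{64, 1024, maxPayload, 1 << 20} {
		fmt.Printf("payload=%8d B  frames=%2d  overhead=%.4f%%\n",
			n, framesNeeded(n), overheadRatio(n)*100)
	}
}
```

Small writes pay proportionally more for the header, so batching writes into larger frames keeps the overhead low.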

Example

package main

import (
	"fmt"
	"io"
	"net"

	smux "github.com/IrineSistiana/simple-mux"
)

func main() {
	panicIfErr := func(err error) {
		if err != nil {
			panic(err.Error())
		}
	}

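	// net.Pipe returns two connected in-memory endpoints.
	clientConn, serverConn := net.Pipe()
	clientSession := smux.NewSession(clientConn, smux.Opts{})
	// Only the server side is allowed to accept streams opened by the peer.
	serverSession := smux.NewSession(serverConn, smux.Opts{AllowAccept: true})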

	go func() {
		stream, err := clientSession.OpenStream()
		panicIfErr(err)
		defer stream.Close()

		_, err = stream.Write([]byte("hello world"))
		panicIfErr(err)
	}()

	clientStream, err := serverSession.Accept()
	panicIfErr(err)

	b, err := io.ReadAll(clientStream)
	panicIfErr(err)
	
	fmt.Printf("received msg from client: %s\n", b)
}

Benchmark

# Sending data through 8 streams concurrently via a single TCP loopback connection.
Benchmark_Mux_Concurrent_IO_Through_Single_TCP-8     89846    11995 ns/op    1302 Mb/s      2 B/op    0 allocs/op

# Sending data directly through a TCP loopback connection.
Benchmark_IO_Through_Single_TCP-8                   125737     8676 ns/op    1799 Mb/s      0 B/op    0 allocs/op

# Single CPU
Benchmark_Mux_Concurrent_IO_Through_Single_TCP       47194    25205 ns/op     619.6 Mb/s    1 B/op    0 allocs/op
Benchmark_IO_Through_Single_TCP                      56523    18854 ns/op     828.6 Mb/s    0 B/op    0 allocs/op
PASS
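
As a rough reading aid, the sketch below expresses the multiplexed throughput as a fraction of the direct TCP throughput, using the figures copied from the output above. The helper name relative is illustrative only.

```go
package main

import "fmt"

// relative returns the multiplexed throughput as a fraction of the direct one.
func relative(mux, direct float64) float64 {
	return mux / direct
}

func main() {
	// Throughput figures copied from the benchmark output above.
	fmt.Printf("8 procs:    %.1f%% of direct throughput\n", relative(1302, 1799)*100)
	fmt.Printf("single CPU: %.1f%% of direct throughput\n", relative(619.6, 828.6)*100)
}
```

On these numbers the multiplexed path reaches roughly 72% to 75% of the direct throughput.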

Documentation

Constants

const (
	MinWindow = 64*1024 - 1
	MaxWindow = 1<<31 - 1
)
const (
	MaxStreamNum = 1<<31 - 1
)

Variables

var (
	ErrClosedSession      = errors.New("closed session")
	ErrStreamIdOverFlowed = errors.New("stream id is overflowed")
	ErrInvalidSynFrame    = errors.New("invalid syn frame")
	ErrInvalidSID         = errors.New("invalid stream id")
	ErrPingTimeout        = errors.New("ping timed out")
	ErrIdleTimeout        = errors.New("idle timed out")
	ErrAcceptNotAllowed   = errors.New("accept is not allowed")
	ErrFlowWindowOverflow = errors.New("flow control window overflowed")
)
var (
	ErrClosedStream = errors.New("closed stream")
)

Functions

This section is empty.

Types

type Opts

type Opts struct {
	// AllowAccept indicates this Session can accept streams
	// from peer. If AllowAccept is false and peer sends a SYN
	// frame, the Session will be closed with ErrInvalidSynFrame.
	AllowAccept bool

	// StreamReceiveWindow sets the default size of the receive window
	// when a stream is opened or accepted.
	// The minimum rx window size is 64k (see MinWindow) and the
	// maximum is MaxWindow.
	// If StreamReceiveWindow is out of range, the closest limit will
	// be used, which means a zero value is 64k.
	StreamReceiveWindow uint32

	// PingInterval is the interval at which this Session sends a
	// ping request to the peer. Zero value means no ping will be sent.
	PingInterval time.Duration

	// PingTimeout is how long this Session waits after sending a ping
	// before it is closed with ErrPingTimeout, if no further data
	// (any data, not just a pong) was received in the meantime.
	// Default is 10s.
	// If PingTimeout > PingInterval, PingInterval will be used.
	PingTimeout time.Duration

	// IdleTimeout is how long this Session may go without transmitting
	// data (excluding ping and pong) before it is closed with
	// ErrIdleTimeout. Zero value means no idle timeout.
	IdleTimeout time.Duration
}
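
The PingTimeout rules in the comments above (default of 10s, capped at PingInterval) can be sketched as follows. This is not the library's code: effectivePingTimeout and defaultPingTimeout are hypothetical names, and the sketch assumes that a zero PingTimeout selects the default and that the default is applied before the cap.

```go
package main

import (
	"fmt"
	"time"
)

// defaultPingTimeout mirrors the documented default of 10s.
const defaultPingTimeout = 10 * time.Second

// effectivePingTimeout is a hypothetical helper showing how the documented
// rules combine. It returns 0 when pings are disabled (interval <= 0).
func effectivePingTimeout(interval, timeout time.Duration) time.Duration {
	if interval <= 0 {
		return 0 // no pings are sent, so the timeout never applies
	}
	if timeout <= 0 {
		timeout = defaultPingTimeout // assumption: zero means "use the default"
	}
	if timeout > interval {
		timeout = interval // documented: PingInterval is used instead
	}
	return timeout
}

func main() {
	fmt.Println(effectivePingTimeout(30*time.Second, 0))
	fmt.Println(effectivePingTimeout(5*time.Second, 0))
	fmt.Println(effectivePingTimeout(30*time.Second, 3*time.Second))
}
```

With a 5s interval and no explicit timeout, the sketch yields 5s rather than the 10s default, because the timeout cannot exceed the interval.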

type Session

type Session struct {
	// contains filtered or unexported fields
}

func NewSession

func NewSession(c io.ReadWriteCloser, opts Opts) *Session

func (*Session) Accept

func (s *Session) Accept() (*Stream, error)

Accept accepts a Stream from the peer. The Session must be created with Opts.AllowAccept set; otherwise, Accept returns ErrAcceptNotAllowed. Streams must be accepted promptly: while a Stream is waiting to be accepted, all read operations of this Session (on all of its streams) are blocked.
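
One way to keep Accept from stalling the Session is to hand every accepted stream to its own goroutine and return to Accept immediately. The sketch below is written against a generic accept function so it runs without the library. acceptLoop, fakeStream, errClosed and runDemo are hypothetical names. With this package the call would presumably look like acceptLoop(session.Accept, handler), since Accept has the signature func() (*Stream, error) and *Stream has a Close method.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// acceptLoop calls accept repeatedly and runs handle for each accepted stream
// in its own goroutine, so a slow handler never delays the next accept call.
// It waits for all handlers to finish and returns the error that ended the loop.
func acceptLoop[S io.Closer](accept func() (S, error), handle func(S)) error {
	var wg sync.WaitGroup
	defer wg.Wait()
	for {
		s, err := accept()
		if err != nil {
			return err
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer s.Close()
			handle(s)
		}()
	}
}

// fakeStream and errClosed are stand-ins so the sketch runs on its own.
type fakeStream struct{ id int }

func (fakeStream) Close() error { return nil }

var errClosed = errors.New("closed session")

// runDemo feeds n fake streams through acceptLoop and reports how many
// were handled and which error ended the loop.
func runDemo(n int) (handled int, err error) {
	var mu sync.Mutex
	next := 0
	accept := func() (fakeStream, error) {
		if next == n {
			return fakeStream{}, errClosed
		}
		next++
		return fakeStream{id: next}, nil
	}
	err = acceptLoop(accept, func(fakeStream) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	return handled, err
}

func main() {
	handled, err := runDemo(3)
	fmt.Println(handled, err)
}
```

The loop ends when accept returns an error, which for a real Session would typically be the error that closed it.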

func (*Session) Close

func (s *Session) Close() error

Close closes the Session and all of its Streams.

func (*Session) CloseErr

func (s *Session) CloseErr() error

CloseErr returns the error that closed the Session. If the Session has not been closed, it returns nil.

func (*Session) Closed

func (s *Session) Closed() bool

Closed reports whether this Session was closed. This is faster than checking CloseErr.

func (*Session) OngoingStreams

func (s *Session) OngoingStreams() int

OngoingStreams reports how many streams are currently in this Session.

func (*Session) OpenStream

func (s *Session) OpenStream() (*Stream, error)

OpenStream opens a stream. It returns ErrClosedSession if the Session was closed, ErrStreamIdOverFlowed if the Session has opened too many streams (see MaxStreamNum), or any error that the inner connection returns while sending the SYN frame.

func (*Session) ReserveStream

func (s *Session) ReserveStream() bool

ReserveStream reserves a stream id for the next OpenStream call. It returns false if the stream id overflowed (exceeded MaxStreamNum).

func (*Session) SubConn

func (s *Session) SubConn() io.ReadWriteCloser

SubConn returns the io.ReadWriteCloser that this Session was created with. It is for accessing information only. Do not read from, write to, or close this connection directly.
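
A typical "information only" use is looking up the peer address for logging. The sketch below type-asserts an io.ReadWriteCloser to net.Conn and reads its remote address without touching the data path. remoteAddrOf and demo are hypothetical helpers. With this package the argument would be the value returned by SubConn, assuming the Session was created from a net.Conn.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// remoteAddrOf reports the remote address of sub if it is a net.Conn.
// It only inspects the connection and never reads, writes, or closes it.
func remoteAddrOf(sub io.ReadWriteCloser) (string, bool) {
	c, ok := sub.(net.Conn)
	if !ok {
		return "", false
	}
	return c.RemoteAddr().String(), true
}

// demo uses one end of an in-memory pipe as a stand-in for a session's
// underlying connection.
func demo() (string, bool) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()
	return remoteAddrOf(a)
}

func main() {
	addr, ok := demo()
	fmt.Println(addr, ok)
}
```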

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func (*Stream) Close

func (s *Stream) Close() error

Close implements io.Closer. Close interrupts Read and Write.

func (*Stream) ID

func (s *Stream) ID() int32

ID returns the stream's id. A negative id means the stream was opened by the peer.

func (*Stream) Read

func (s *Stream) Read(p []byte) (n int, err error)

Read implements io.Reader.

func (*Stream) ReadBufferSize

func (s *Stream) ReadBufferSize() int

ReadBufferSize returns the size of the data that is currently buffered and waiting to be read. This is useful for choosing the buffer size for the next Read.
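
A minimal sketch of sizing the next Read from ReadBufferSize follows. It assumes ReadBufferSize reports the number of pending bytes and falls back to a fixed size when nothing is buffered. bufferedReader, readPending and fakeStream are hypothetical names. The interface lists only methods that *Stream is documented to have, so a *Stream should satisfy it.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// bufferedReader is the subset of the documented Stream methods used here.
type bufferedReader interface {
	io.Reader
	ReadBufferSize() int
}

// readPending performs one Read with a buffer sized to the pending data.
// If nothing is pending, it uses fallback as the buffer size instead.
func readPending(r bufferedReader, fallback int) ([]byte, error) {
	size := r.ReadBufferSize()
	if size <= 0 {
		size = fallback
	}
	buf := make([]byte, size)
	n, err := r.Read(buf)
	return buf[:n], err
}

// fakeStream is a stand-in whose pending size is the unread part of a string.
type fakeStream struct{ *strings.Reader }

func (f fakeStream) ReadBufferSize() int { return f.Len() }

// demo reads everything that is pending in a fake stream holding s.
func demo(s string) string {
	b, _ := readPending(fakeStream{strings.NewReader(s)}, 4096)
	return string(b)
}

func main() {
	fmt.Println(demo("hello world"))
}
```

Allocating per call is only for brevity here; a real caller would more likely pick a pooled buffer of a suitable size.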

func (*Stream) Session

func (s *Stream) Session() *Session

Session returns the Session that this Stream belongs to.

func (*Stream) SetRxWindowSize

func (s *Stream) SetRxWindowSize(n uint32)

SetRxWindowSize sets the stream's rx window size. If n is invalid, the closest limit will be used.
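
"The closest limit will be used" describes a clamp. The sketch below shows that behaviour under the assumption that the limits are the MinWindow and MaxWindow constants listed above. The library's actual bounds check is not shown in this document, and clampWindow is a hypothetical name.

```go
package main

import "fmt"

const (
	minWindow = 64*1024 - 1 // mirrors MinWindow from the Constants section
	maxWindow = 1<<31 - 1   // mirrors MaxWindow from the Constants section
)

// clampWindow returns n limited to the range [minWindow, maxWindow].
func clampWindow(n uint32) uint32 {
	if n < minWindow {
		return minWindow
	}
	if n > maxWindow {
		return maxWindow
	}
	return n
}

func main() {
	for _, n := range []uint32{0, 1 << 20, 1<<32 - 1} {
		fmt.Printf("%d -> %d\n", n, clampWindow(n))
	}
}
```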

func (*Stream) Write

func (s *Stream) Write(p []byte) (n int, err error)

Write implements io.Writer.

func (*Stream) WriteTo

func (s *Stream) WriteTo(w io.Writer) (int64, error)

WriteTo implements io.WriterTo.
