yamux

v3.0.3
Published: Mar 19, 2022 License: MPL-2.0 Imports: 16 Imported by: 0

README

Yamux

Yamux (Yet another Multiplexer) is a multiplexing library for Golang. It relies on an underlying connection to provide reliability and ordering, such as TCP or Unix domain sockets, and provides stream-oriented multiplexing. It is inspired by SPDY but is not interoperable with it.

Yamux features include:

  • Bi-directional streams
    • Streams can be opened by either client or server
    • Server-side push support
  • Flow control
    • Avoid starvation
    • Back-pressure to prevent overwhelming a receiver
  • Keep Alives
    • Enables persistent connections over a load balancer
  • Efficient
    • Enables thousands of logical streams with low overhead

Documentation

For complete documentation, see the associated Godoc.

Specification

The full specification for Yamux is provided in the spec.md file. It can be used as a guide to implementors of interoperable libraries.

Usage

Using Yamux is remarkably simple:


func client() {
    // Get a TCP connection
    conn, err := net.Dial(...)
    if err != nil {
        panic(err)
    }

    // Set up the client side of yamux (nil config, nil MemoryManager)
    session, err := yamux.Client(conn, nil, nil)
    if err != nil {
        panic(err)
    }

    // Open a new stream
    stream, err := session.Open(context.Background())
    if err != nil {
        panic(err)
    }

    // Stream implements net.Conn
    if _, err := stream.Write([]byte("ping")); err != nil {
        panic(err)
    }
}

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

    // Set up the server side of yamux (nil config, nil MemoryManager)
    session, err := yamux.Server(conn, nil, nil)
    if err != nil {
        panic(err)
    }

    // Accept a stream
    stream, err := session.Accept()
    if err != nil {
        panic(err)
    }

    // Listen for a message; a single Read may return fewer than 4 bytes
    buf := make([]byte, 4)
    if _, err := io.ReadFull(stream, buf); err != nil {
        panic(err)
    }
}


The last gx published version of this module was: 1.1.5: QmUNMbRUsVYHi1D14annF7Rr7pQAX7TNLwpRCa975ojKnw

Documentation

Variables

var (
	// ErrInvalidVersion means we received a frame with an
	// invalid version
	ErrInvalidVersion = &Error{msg: "invalid protocol version"}

	// ErrInvalidMsgType means we received a frame with an
	// invalid message type
	ErrInvalidMsgType = &Error{msg: "invalid msg type"}

	// ErrSessionShutdown is used if there is a shutdown during
	// an operation
	ErrSessionShutdown = &Error{msg: "session shutdown"}

	// ErrStreamsExhausted is returned if we have no more
	// stream ids to issue
	ErrStreamsExhausted = &Error{msg: "streams exhausted"}

	// ErrDuplicateStream is used if a duplicate stream is
	// opened inbound
	ErrDuplicateStream = &Error{msg: "duplicate stream initiated"}

	// ErrRecvWindowExceeded indicates the receive window was exceeded
	ErrRecvWindowExceeded = &Error{msg: "recv window exceeded"}

	// ErrTimeout is used when we reach an IO deadline
	ErrTimeout = &Error{msg: "i/o deadline reached", timeout: true, temporary: true}

	// ErrStreamClosed is returned when using a closed stream
	ErrStreamClosed = &Error{msg: "stream closed"}

	// ErrUnexpectedFlag is set when we get an unexpected flag
	ErrUnexpectedFlag = &Error{msg: "unexpected flag"}

	// ErrRemoteGoAway is used when we get a go away from the other side
	ErrRemoteGoAway = &Error{msg: "remote end is not accepting connections"}

	// ErrStreamReset is sent if a stream is reset. This can happen
	// if the backlog is exceeded, or if there was a remote GoAway.
	ErrStreamReset = &Error{msg: "stream reset"}

	// ErrConnectionWriteTimeout indicates that we hit the "safety valve"
	// timeout writing to the underlying stream connection.
	ErrConnectionWriteTimeout = &Error{msg: "connection write timeout", timeout: true}

	// ErrKeepAliveTimeout is sent if a missed keepalive caused the stream to close
	ErrKeepAliveTimeout = &Error{msg: "keepalive timeout", timeout: true}
)
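
Because Error exposes Timeout() and Temporary() alongside Error(), these values can be inspected through the standard net.Error interface. The following is a minimal sketch of that pattern, not the library's own code: streamError, errTimeout, errStreamReset, and classify are hypothetical stand-ins that mirror the documented shape so the example runs without importing yamux.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// streamError is a hypothetical stand-in for yamux's *Error type. It mirrors
// the three documented methods (Error, Timeout, Temporary) and nothing else.
type streamError struct {
	msg                string
	timeout, temporary bool
}

func (e *streamError) Error() string   { return e.msg }
func (e *streamError) Timeout() bool   { return e.timeout }
func (e *streamError) Temporary() bool { return e.temporary }

// Stand-ins shaped like ErrTimeout and ErrStreamReset from the list above.
var (
	errTimeout     = &streamError{msg: "i/o deadline reached", timeout: true, temporary: true}
	errStreamReset = &streamError{msg: "stream reset"}
)

// classify reports how a caller might react to an error from a stream
// operation: retry later on a timeout, give up on anything else.
func classify(err error) string {
	var ne net.Error
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &ne) && ne.Timeout():
		return "timeout"
	default:
		return "fatal"
	}
}

func main() {
	fmt.Println(classify(nil))                                // ok
	fmt.Println(classify(fmt.Errorf("read: %w", errTimeout))) // timeout
	fmt.Println(classify(errStreamReset))                     // fatal
}
```

Using errors.As rather than a direct comparison keeps the check working when the error has been wrapped on its way up the call stack.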

Functions

func VerifyConfig

func VerifyConfig(config *Config) error

VerifyConfig is used to verify the sanity of the configuration.

Types

type Config

type Config struct {
	// AcceptBacklog is used to limit how many streams may be
	// waiting to be accepted.
	AcceptBacklog int

	// PingBacklog is used to limit how many ping acks we can queue.
	PingBacklog int

	// EnableKeepAlive is used to send periodic keep-alive
	// messages using a ping.
	EnableKeepAlive bool

	// KeepAliveInterval is how often to perform the keep alive
	KeepAliveInterval time.Duration

	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
	// which we will suspect a problem with the underlying connection and
	// close it. This is only applied to writes, where there's generally
	// an expectation that things will move along quickly.
	ConnectionWriteTimeout time.Duration

	// MaxIncomingStreams is the maximum number of concurrent incoming streams
	// that we accept. If the peer tries to open more streams, those will be
	// reset immediately.
	MaxIncomingStreams uint32

	// InitialStreamWindowSize is used to control the initial
	// window size that we allow for a stream.
	InitialStreamWindowSize uint32

	// MaxStreamWindowSize is used to control the maximum
	// window size that we allow for a stream.
	MaxStreamWindowSize uint32

	// LogOutput is used to control the log destination
	LogOutput io.Writer

	// ReadBufSize controls the size of the read buffer.
	//
	// Set to 0 to disable it.
	ReadBufSize int

	// WriteCoalesceDelay is the maximum amount of time we'll delay
	// coalescing a packet before sending it. This should be on the order of
	// microseconds to milliseconds.
	WriteCoalesceDelay time.Duration

	// MaxMessageSize is the maximum size of a message that we'll send on a
	// stream. This ensures that a single stream doesn't hog a connection.
	MaxMessageSize uint32
}

Config is used to tune the Yamux session

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig is used to return a default configuration

type Error

type Error struct {
	// contains filtered or unexported fields
}

func (*Error) Error

func (ye *Error) Error() string

func (*Error) Temporary

func (ye *Error) Temporary() bool

func (*Error) Timeout

func (ye *Error) Timeout() bool

type MemoryManager

type MemoryManager interface {
	// ReserveMemory reserves memory / buffer.
	ReserveMemory(size int, prio uint8) error
	// ReleaseMemory explicitly releases memory previously reserved with ReserveMemory
	ReleaseMemory(size int)
}

The MemoryManager allows management of memory allocations. Memory is allocated:

  1. When opening / accepting a new stream. This uses the highest priority.
  2. When trying to increase the stream receive window. This uses a lower priority.
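
As an illustration of the interface, here is a minimal sketch of a manager that enforces one fixed byte budget. It is an assumption-laden example rather than anything shipped with the library: budgetManager and errBudgetExceeded are hypothetical names, the priority values passed in main are illustrative only, and the interface is repeated locally so the code compiles without importing yamux.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// MemoryManager repeats the interface documented above so that this sketch
// compiles on its own.
type MemoryManager interface {
	ReserveMemory(size int, prio uint8) error
	ReleaseMemory(size int)
}

// errBudgetExceeded is a hypothetical error for this sketch.
var errBudgetExceeded = errors.New("memory budget exceeded")

// budgetManager is a hypothetical implementation with one fixed byte budget.
// It ignores prio; a real policy could keep headroom for high priorities.
type budgetManager struct {
	mu    sync.Mutex
	limit int
	used  int
}

// ReserveMemory admits the reservation only if it fits in the remaining budget.
func (m *budgetManager) ReserveMemory(size int, prio uint8) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if size < 0 || m.used+size > m.limit {
		return errBudgetExceeded
	}
	m.used += size
	return nil
}

// ReleaseMemory returns bytes to the budget, never dropping below zero.
func (m *budgetManager) ReleaseMemory(size int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.used -= size
	if m.used < 0 {
		m.used = 0
	}
}

// Compile-time check that budgetManager satisfies the interface.
var _ MemoryManager = (*budgetManager)(nil)

func main() {
	mm := &budgetManager{limit: 1 << 20}        // 1 MiB budget
	fmt.Println(mm.ReserveMemory(768<<10, 255)) // <nil>
	fmt.Println(mm.ReserveMemory(512<<10, 128)) // memory budget exceeded
	mm.ReleaseMemory(768 << 10)
	fmt.Println(mm.ReserveMemory(512<<10, 128)) // <nil>
}
```

A value like this would be passed as the mm argument of Client or Server. The mutex is there on the assumption that reservations may arrive from several goroutines at once.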

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is used to wrap a reliable ordered connection and to multiplex it into multiple streams.

func Client

func Client(conn net.Conn, config *Config, mm MemoryManager) (*Session, error)

Client is used to initialize a new client-side connection. There must be at most one client-side connection.

func Server

func Server(conn net.Conn, config *Config, mm MemoryManager) (*Session, error)

Server is used to initialize a new server-side connection. There must be at most one server-side connection. If a nil config is provided, the configuration returned by DefaultConfig will be used.

func (*Session) Accept

func (s *Session) Accept() (net.Conn, error)

Accept is used to block until the next available stream is ready to be accepted.

func (*Session) AcceptStream

func (s *Session) AcceptStream() (*Stream, error)

AcceptStream is used to block until the next available stream is ready to be accepted.

func (*Session) Addr

func (s *Session) Addr() net.Addr

Addr is used to get the address of the listener.

func (*Session) Close

func (s *Session) Close() error

Close is used to close the session and all streams. Attempts to send a GoAway before closing the connection.

func (*Session) CloseChan

func (s *Session) CloseChan() <-chan struct{}

CloseChan returns a read-only channel which is closed as soon as the session is closed.

func (*Session) GoAway

func (s *Session) GoAway() error

GoAway can be used to prevent accepting further connections. It does not close the underlying conn.

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed does a safe check to see if we have shutdown

func (*Session) LocalAddr

func (s *Session) LocalAddr() net.Addr

LocalAddr is used to get the local address of the underlying connection.

func (*Session) NumStreams

func (s *Session) NumStreams() int

NumStreams returns the number of currently open streams

func (*Session) Open

func (s *Session) Open(ctx context.Context) (net.Conn, error)

Open is used to create a new stream as a net.Conn

func (*Session) OpenStream

func (s *Session) OpenStream(ctx context.Context) (*Stream, error)

OpenStream is used to create a new stream

func (*Session) Ping

func (s *Session) Ping() (dur time.Duration, err error)

Ping is used to measure the round-trip time (RTT)

func (*Session) RemoteAddr

func (s *Session) RemoteAddr() net.Addr

RemoteAddr is used to get the address of remote end of the underlying connection

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is used to represent a logical stream within a session.

func (*Stream) Close

func (s *Stream) Close() error

Close is used to close the stream.

func (*Stream) CloseRead

func (s *Stream) CloseRead() error

CloseRead is used to close the stream for reading.

func (*Stream) CloseWrite

func (s *Stream) CloseWrite() error

CloseWrite is used to close the stream for writing.
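
A common use of a write-side half-close is a request/response exchange in which the sender signals "request complete" by closing its write side and then keeps reading the reply. The sketch below shows that pattern under stated assumptions: it assumes the peer observes end-of-stream once the writer half-closes (as with TCP), and it uses a hypothetical halfDuplex type built from two io.Pipe pairs as a stand-in for a stream, so it runs without the library.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// halfDuplex is a hypothetical stand-in for a stream: two one-way pipes,
// with CloseWrite closing only the outgoing direction.
type halfDuplex struct {
	r *io.PipeReader
	w *io.PipeWriter
}

func (h *halfDuplex) Read(p []byte) (int, error)  { return h.r.Read(p) }
func (h *halfDuplex) Write(p []byte) (int, error) { return h.w.Write(p) }
func (h *halfDuplex) CloseWrite() error           { return h.w.Close() }

// newPair returns two connected endpoints.
func newPair() (*halfDuplex, *halfDuplex) {
	ar, bw := io.Pipe() // b writes, a reads
	br, aw := io.Pipe() // a writes, b reads
	return &halfDuplex{r: ar, w: aw}, &halfDuplex{r: br, w: bw}
}

// roundTrip sends request, half-closes, and reads the full reply.
func roundTrip(request string) (string, error) {
	client, server := newPair()

	go func() {
		// Server side: read until the client's half-close yields EOF,
		// then reply with the upper-cased request and finish writing.
		req, err := io.ReadAll(server)
		if err == nil {
			_, err = server.Write([]byte(strings.ToUpper(string(req))))
		}
		server.w.CloseWithError(err) // a nil err behaves like a plain Close
	}()

	if _, err := client.Write([]byte(request)); err != nil {
		return "", err
	}
	// Signal "no more data" while keeping the read side open for the reply.
	if err := client.CloseWrite(); err != nil {
		return "", err
	}
	resp, err := io.ReadAll(client)
	return string(resp), err
}

func main() {
	resp, err := roundTrip("ping")
	fmt.Println(resp, err) // PING <nil>
}
```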

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr returns the local address

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

Read is used to read from the stream

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr returns the remote address

func (*Stream) Reset

func (s *Stream) Reset() error

Reset resets the stream (forcibly closes the stream)

func (*Stream) Session

func (s *Stream) Session() *Session

Session returns the associated stream session

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls
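
Since a stream implements net.Conn, the deadline methods can be used in the usual way: arm a deadline, perform the I/O, and check whether the resulting error reports a timeout. The following sketch uses net.Pipe as a stand-in for a stream so that it runs without the library; readWithTimeout and demoDeadline are hypothetical helper names. With yamux itself, the error listed above for this case is ErrTimeout, whose Timeout() is true.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// readWithTimeout arms a read deadline on conn, performs one Read, and
// reports whether that Read failed because the deadline passed.
func readWithTimeout(conn net.Conn, d time.Duration) (int, bool, error) {
	if err := conn.SetReadDeadline(time.Now().Add(d)); err != nil {
		return 0, false, err
	}
	buf := make([]byte, 4)
	n, err := conn.Read(buf)
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		return n, true, nil
	}
	return n, false, err
}

// demoDeadline runs readWithTimeout against an in-memory pipe whose peer
// never writes, so the read is expected to hit its deadline.
func demoDeadline() (bool, error) {
	a, b := net.Pipe() // stand-in for a yamux stream
	defer a.Close()
	defer b.Close()
	_, timedOut, err := readWithTimeout(a, 20*time.Millisecond)
	return timedOut, err
}

func main() {
	timedOut, err := demoDeadline()
	fmt.Println(timedOut, err) // true <nil>
}
```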

func (*Stream) StreamID

func (s *Stream) StreamID() uint32

StreamID returns the ID of this stream

func (*Stream) Write

func (s *Stream) Write(b []byte) (int, error)

Write is used to write to the stream
