yamux

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2023 License: MPL-2.0 Imports: 16 Imported by: 0

README

Yamux

Yamux (Yet another Multiplexer) is a multiplexing library for Golang. It relies on an underlying connection to provide reliability and ordering, such as TCP or Unix domain sockets, and provides stream-oriented multiplexing. It is inspired by SPDY but is not interoperable with it.

Yamux features include:

  • Bi-directional streams
    • Streams can be opened by either client or server
    • Useful for NAT traversal
    • Server-side push support
  • Flow control
    • Avoid starvation
    • Back-pressure to prevent overwhelming a receiver
  • Keep Alives
    • Enables persistent connections over a load balancer
  • Efficient
    • Enables thousands of logical streams with low overhead

Documentation

For complete documentation, see the associated Godoc.

Specification

The full specification for Yamux is provided in the spec.md file. It can be used as a guide to implementors of interoperable libraries.

Usage

Using Yamux is remarkably simple:


func client() {
    // Get a TCP connection
    conn, err := net.Dial(...)
    if err != nil {
        panic(err)
    }

    // Setup client side of yamux
    session, err := yamux.Client(conn, nil)
    if err != nil {
        panic(err)
    }

    // Open a new stream
    stream, err := session.Open()
    if err != nil {
        panic(err)
    }

    // Stream implements net.Conn
    stream.Write([]byte("ping"))
}

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

    // Setup server side of yamux
    session, err := yamux.Server(conn, nil)
    if err != nil {
        panic(err)
    }

    // Accept a stream
    stream, err := session.Accept()
    if err != nil {
        panic(err)
    }

    // Listen for a message
    buf := make([]byte, 4)
    stream.Read(buf)
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidVersion means we received a frame with an
	// invalid version
	ErrInvalidVersion = fmt.Errorf("invalid protocol version")

	// ErrInvalidMsgType means we received a frame with an
	// invalid message type
	ErrInvalidMsgType = fmt.Errorf("invalid msg type")

	// ErrSessionShutdown is used if there is a shutdown during
	// an operation
	ErrSessionShutdown = fmt.Errorf("session shutdown")

	// ErrStreamsExhausted is returned if we have no more
	// stream ids to issue
	ErrStreamsExhausted = fmt.Errorf("streams exhausted")

	// ErrDuplicateStream is used if a duplicate stream is
	// opened inbound
	ErrDuplicateStream = fmt.Errorf("duplicate stream initiated")

	// ErrReceiveWindowExceeded indicates the window was exceeded
	ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded")

	// ErrTimeout is used when we reach an IO deadline
	ErrTimeout = &NetError{
		err: fmt.Errorf("i/o deadline reached"),

		timeout: true,
	}

	// ErrStreamClosed is returned when using a closed stream
	ErrStreamClosed = fmt.Errorf("stream closed")

	// ErrUnexpectedFlag is set when we get an unexpected flag
	ErrUnexpectedFlag = fmt.Errorf("unexpected flag")

	// ErrRemoteGoAway is used when we get a go away from the other side
	ErrRemoteGoAway = fmt.Errorf("remote end is not accepting connections")

	// ErrConnectionReset is sent if a stream is reset. This can happen
	// if the backlog is exceeded, or if there was a remote GoAway.
	ErrConnectionReset = fmt.Errorf("connection reset")

	// ErrConnectionWriteTimeout indicates that we hit the "safety valve"
	// timeout writing to the underlying stream connection.
	ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout")

	// ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close
	ErrKeepAliveTimeout = fmt.Errorf("keepalive timeout")
)

Functions

func VerifyConfig

func VerifyConfig(config *Config) error

VerifyConfig is used to verify the sanity of configuration

Types

type Config

type Config struct {
	// AcceptBacklog is used to limit how many streams may be
	// waiting an accept.
	AcceptBacklog int

	// EnableKeepalive is used to do a period keep alive
	// messages using a ping.
	EnableKeepAlive bool

	// KeepAliveInterval is how often to perform the keep alive
	KeepAliveInterval time.Duration

	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
	// we which will suspect a problem with the underlying connection and
	// close it. This is only applied to writes, where's there's generally
	// an expectation that things will move along quickly.
	ConnectionWriteTimeout time.Duration

	// MaxStreamWindowSize is used to control the maximum
	// window size that we allow for a stream.
	MaxStreamWindowSize uint32

	// StreamOpenTimeout is the maximum amount of time that a stream will
	// be allowed to remain in pending state while waiting for an ack from the peer.
	// Once the timeout is reached the session will be gracefully closed.
	// A zero value disables the StreamOpenTimeout allowing unbounded
	// blocking on OpenStream calls.
	StreamOpenTimeout time.Duration

	// StreamCloseTimeout is the maximum time that a stream will allowed to
	// be in a half-closed state when `Close` is called before forcibly
	// closing the connection. Forcibly closed connections will empty the
	// receive buffer, drop any future packets received for that stream,
	// and send a RST to the remote side.
	StreamCloseTimeout time.Duration

	// LogOutput is used to control the log destination. Either Logger or
	// LogOutput can be set, not both.
	LogOutput io.Writer

	// Logger is used to pass in the logger to be used. Either Logger or
	// LogOutput can be set, not both.
	Logger Logger
}

Config is used to tune the Yamux session

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig is used to return a default configuration

type Logger

type Logger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

Logger is a abstract of *log.Logger

type NetError

type NetError struct {
	// contains filtered or unexported fields
}

NetError implements net.Error

func (*NetError) Error

func (e *NetError) Error() string

func (*NetError) Temporary

func (e *NetError) Temporary() bool

func (*NetError) Timeout

func (e *NetError) Timeout() bool

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is used to wrap a reliable ordered connection and to multiplex it into multiple streams.

func Client

func Client(conn io.ReadWriteCloser, config *Config) (*Session, error)

Client is used to initialize a new client-side connection. There must be at most one client-side connection.

func Server

func Server(conn io.ReadWriteCloser, config *Config) (*Session, error)

Server is used to initialize a new server-side connection. There must be at most one server-side connection. If a nil config is provided, the DefaultConfiguration will be used.

func (*Session) Accept

func (s *Session) Accept() (net.Conn, error)

Accept is used to block until the next available stream is ready to be accepted.

func (*Session) AcceptStream

func (s *Session) AcceptStream() (*Stream, error)

AcceptStream is used to block until the next available stream is ready to be accepted.

func (*Session) AcceptStreamWithContext

func (s *Session) AcceptStreamWithContext(ctx context.Context) (*Stream, error)

AcceptStream is used to block until the next available stream is ready to be accepted.

func (*Session) Addr

func (s *Session) Addr() net.Addr

Addr is used to get the address of the listener.

func (*Session) Close

func (s *Session) Close() error

Close is used to close the session and all streams. Attempts to send a GoAway before closing the connection.

func (*Session) CloseChan

func (s *Session) CloseChan() <-chan struct{}

CloseChan returns a read-only channel which is closed as soon as the session is closed.

func (*Session) GoAway

func (s *Session) GoAway() error

GoAway can be used to prevent accepting further connections. It does not close the underlying conn.

func (*Session) IsClosed

func (s *Session) IsClosed() bool

IsClosed does a safe check to see if we have shutdown

func (*Session) LocalAddr

func (s *Session) LocalAddr() net.Addr

LocalAddr is used to get the local address of the underlying connection.

func (*Session) NumStreams

func (s *Session) NumStreams() int

NumStreams returns the number of currently open streams

func (*Session) Open

func (s *Session) Open() (net.Conn, error)

Open is used to create a new stream as a net.Conn

func (*Session) OpenStream

func (s *Session) OpenStream() (*Stream, error)

OpenStream is used to create a new stream

func (*Session) Ping

func (s *Session) Ping() (time.Duration, error)

Ping is used to measure the RTT response time

func (*Session) RemoteAddr

func (s *Session) RemoteAddr() net.Addr

RemoteAddr is used to get the address of remote end of the underlying connection

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is used to represent a logical stream within a session.

func (*Stream) Close

func (s *Stream) Close() error

Close is used to close the stream

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr returns the local address

func (*Stream) Read

func (s *Stream) Read(b []byte) (n int, err error)

Read is used to read from the stream

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr returns the remote address

func (*Stream) Session

func (s *Stream) Session() *Session

Session returns the associated stream session

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for blocked and future Read calls.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for blocked and future Write calls

func (*Stream) Shrink

func (s *Stream) Shrink()

Shrink is used to compact the amount of buffers utilized This is useful when using Yamux in a connection pool to reduce the idle memory utilization.

func (*Stream) StreamID

func (s *Stream) StreamID() uint32

StreamID returns the ID of this stream

func (*Stream) Write

func (s *Stream) Write(b []byte) (n int, err error)

Write is used to write to the stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL