multiplexing

package
v0.13.0-beta2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2021 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package multiplexing provides stream multiplexing.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrMultiplexerClosed is returned from operations that fail due to a
	// multiplexer being closed.
	ErrMultiplexerClosed = errors.New("multiplexer closed")
	// ErrStreamRejected is returned from open operations that fail due to the
	// remote endpoint rejecting the open request.
	ErrStreamRejected = errors.New("stream rejected")
)
View Source
var (
	// ErrWriteClosed is returned from operations that fail due to a stream
	// being closed for writing. It is analgous to net.ErrClosed, but indicates
	// that only the write portion of a stream is closed.
	ErrWriteClosed = errors.New("closed for writing")
)

Functions

This section is empty.

Types

type Carrier

type Carrier interface {
	io.Reader
	io.ByteReader
	// Discard attempts to discard the next n bytes from the stream, returning
	// the number of bytes discarded and any error that occurred. The returned
	// error must be non-nil if and only if discarded != n.
	Discard(n int) (discarded int, err error)
	io.Writer
	io.Closer
}

Carrier is the interface that the streams used for multiplexing must implement. It imposes the additional constraint that the Close method must unblock any pending read, discard, or write operations. This interface can be implemented by custom code, but it can also be implemented efficiently using the NewCarrierFromStream function.

func NewCarrierFromStream

func NewCarrierFromStream(stream io.ReadWriteCloser) Carrier

NewCarrierFromStream constructs a new Carrier by wrapping an underlying io.ReadWriteCloser. The underlying stream must have the property that its Close method unblocks any pending Read or Write calls.

type Configuration

type Configuration struct {
	// StreamReceiveWindow is the size (in bytes) of the stream receive window
	// (which also sets the size of the local per-stream receive buffer). If
	// less than or equal to 0, then no inbound data will be allowed and all
	// writes on the other end of the stream will block until write deadline
	// expiration or a call to CloseWrite or Close. The default value is 64 kB.
	StreamReceiveWindow int
	// WriteBufferCount is the number of write buffers to use within the
	// multiplexer. Each write buffer contains a fixed amount of internal
	// storage, currently 65548 bytes. If less than or equal to 0, then a single
	// write buffer will be created. The default is 5.
	WriteBufferCount int
	// AcceptBacklog is the maximum number of concurrent pending inbound open
	// requests that will be allowed. If less than or equal to 0, then it will
	// be set to 1. The default value is 10.
	AcceptBacklog int
	// HeartbeatTransmitInterval is the interval on which heartbeats will be
	// transmitted. If less than or equal to 0, then heartbeat transmission will
	// be disabled. The default interval is 5 seconds.
	HeartbeatTransmitInterval time.Duration
	// MaximumHeartbeatReceiveInterval is the maximum amount of time that the
	// multiplexer will be allowed to operate without receiving a heartbeat
	// message from the remote. If less than or equal to 0, remote heartbeats
	// will be processed but not required. The default interval is 10 seconds.
	MaximumHeartbeatReceiveInterval time.Duration
}

Configuration encodes multiplexer configuration.

func DefaultConfiguration

func DefaultConfiguration() *Configuration

DefaultConfiguration returns the default multiplexer configuration.

type Multiplexer

type Multiplexer struct {
	// contains filtered or unexported fields
}

Multiplexer provides bidirectional stream multiplexing.

func Multiplex

func Multiplex(carrier Carrier, even bool, configuration *Configuration) *Multiplexer

Multiplex creates a new multiplexer on top of an existing carrier stream. The multiplexer takes ownership of the carrier, so it should not be used directly after being passed to this function.

Multiplexers are symmetric, meaning that a multiplexer at either end of the carrier can both open and accept connections. However, a single asymmetric parameter is required to avoid the need for negotiating stream identifiers, so the even parameter must be set to true on one endpoint and false on the other (using some implicit or out-of-band coordination mechanism, such as false for client and true for server). The value of even has no observable effect on the multiplexer, other than determining the evenness of outbound stream identifiers.

If configuration is nil, the default configuration will be used.

func (*Multiplexer) Accept

func (m *Multiplexer) Accept() (net.Conn, error)

Accept implements net.Listener.Accept. It is implemented as a wrapper around AcceptStream and simply casts the resulting stream to a net.Conn.

func (*Multiplexer) AcceptStream

func (m *Multiplexer) AcceptStream(ctx context.Context) (*Stream, error)

AcceptContext accepts an incoming stream.

func (*Multiplexer) Addr

func (m *Multiplexer) Addr() net.Addr

Addr implements net.Listener.Addr.

func (*Multiplexer) Close

func (m *Multiplexer) Close() error

Close implements net.Listener.Close. Only the first call to Close will have any effect. Subsequent calls will behave as no-ops and return nil errors.

func (*Multiplexer) Closed

func (m *Multiplexer) Closed() <-chan struct{}

Closed returns a channel that is closed when the multiplexer is closed (due to either internal failure or a manual call to Close).

func (*Multiplexer) InternalError

func (m *Multiplexer) InternalError() error

InternalError returns any internal error that caused the multiplexer to close (as indicated by closure of the result of Closed). It returns nil if Close was manually invoked.

func (*Multiplexer) OpenStream

func (m *Multiplexer) OpenStream(ctx context.Context) (*Stream, error)

OpenStream opens a new stream, cancelling the open operation if the provided context is cancelled, an error occurs, or the multiplexer is closed. The context must not be nil. The context only regulates the lifetime of the open operation, not the stream itself.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a single multiplexed stream. It implements net.Conn but also provides a CloseWrite method for half-closures.

func (*Stream) Close

func (s *Stream) Close() error

Close implements net.Conn.Close. Subsequent calls to Close are no-ops and will return nil.

func (*Stream) CloseWrite

func (s *Stream) CloseWrite() error

CloseWrite performs half-closure (write-closure) of the stream. Any blocked Write or SetWriteDeadline calls will be unblocked. Subsequent calls to CloseWrite are no-ops and will return nil.

func (*Stream) LocalAddr

func (s *Stream) LocalAddr() net.Addr

LocalAddr implements net.Conn.LocalAddr.

func (*Stream) Read

func (s *Stream) Read(buffer []byte) (int, error)

Read implements net.Conn.Read.

func (*Stream) RemoteAddr

func (s *Stream) RemoteAddr() net.Addr

RemoteAddr implements net.Conn.RemoteAddr.

func (*Stream) SetDeadline

func (s *Stream) SetDeadline(deadline time.Time) error

SetDeadline implements net.Conn.SetDeadline.

func (*Stream) SetReadDeadline

func (s *Stream) SetReadDeadline(deadline time.Time) error

SetReadDeadline implements net.Conn.SetReadDeadline.

func (*Stream) SetWriteDeadline

func (s *Stream) SetWriteDeadline(deadline time.Time) error

SetWriteDeadline implements net.Conn.SetWriteDeadline.

func (*Stream) Write

func (s *Stream) Write(data []byte) (int, error)

Write implements net.Conn.Write.

Directories

Path Synopsis
Package ring provides an efficient ring buffer for bytes.
Package ring provides an efficient ring buffer for bytes.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL