Documentation ¶
Index ¶
- Constants
- Variables
- func CompressionIsSupported() bool
- func IsRPCStreamResponse(stream *MuxedStream) bool
- type AtomicCounter
- type BooleanFuse
- type CompressionPreset
- type CompressionSetting
- type Header
- type IdleTimer
- type MuxReader
- type MuxWriter
- type MuxedStream
- func (s *MuxedStream) Close() error
- func (s *MuxedStream) CloseRead() error
- func (s *MuxedStream) CloseWrite() error
- func (s *MuxedStream) IsRPCStream() bool
- func (s *MuxedStream) Read(p []byte) (n int, err error)
- func (s *MuxedStream) Write(p []byte) (int, error)
- func (s *MuxedStream) WriteClosed() bool
- func (s *MuxedStream) WriteHeaders(headers []Header) error
- type MuxedStreamDataSignaller
- type MuxedStreamFunc
- type MuxedStreamHandler
- type MuxedStreamReader
- type MuxedStreamRequest
- type Muxer
- func (m *Muxer) AwaitResponseHeaders(ctx context.Context, stream *MuxedStream) error
- func (m *Muxer) CloseStreamRead(stream *MuxedStream)
- func (m *Muxer) MakeMuxedStreamRequest(ctx context.Context, request MuxedStreamRequest) error
- func (m *Muxer) Metrics() *MuxerMetrics
- func (m *Muxer) NewStream(headers []Header) *MuxedStream
- func (m *Muxer) OpenRPCStream(ctx context.Context) (*MuxedStream, error)
- func (m *Muxer) OpenStream(ctx context.Context, headers []Header, body io.Reader) (*MuxedStream, error)
- func (m *Muxer) Serve(ctx context.Context) error
- func (m *Muxer) Shutdown() <-chan struct{}
- func (m *Muxer) TimerRetries() uint64
- type MuxerApplicationError
- type MuxerConfig
- type MuxerHandshakeError
- type MuxerMetrics
- type MuxerProtocolError
- type MuxerStreamError
- type PingTimestamp
- type ReadWriteClosedCloser
- type ReadWriteLengther
- type ReadyList
- type SharedBuffer
- type Signal
- type StreamErrorMap
- type TunnelHostname
Constants ¶
const ( FrameSetCompressionContext http2.FrameType = 0xf0 FrameUseDictionary http2.FrameType = 0xf1 FrameSetDictionary http2.FrameType = 0xf2 )
const ( FlagSetDictionaryAppend http2.Flags = 0x1 FlagSetDictionaryOffset http2.Flags = 0x2 )
const ( SettingMuxerMagic http2.SettingID = 0x42db MuxerMagicOrigin uint32 = 0xa2e43c8b MuxerMagicEdge uint32 = 0x1088ebf9 )
const SettingCompression http2.SettingID = 0xff20
Assign temporary values
Variables ¶
var ( // HTTP2 error codes: https://http2.github.io/http2-spec/#ErrorCodes ErrHandshakeTimeout = MuxerHandshakeError{"1000 handshake timeout"} ErrBadHandshakeNotSettings = MuxerHandshakeError{"1001 unexpected response"} ErrBadHandshakeUnexpectedAck = MuxerHandshakeError{"1002 unexpected response"} ErrBadHandshakeNoMagic = MuxerHandshakeError{"1003 unexpected response"} ErrBadHandshakeWrongMagic = MuxerHandshakeError{"1004 connected to endpoint of wrong type"} ErrBadHandshakeNotSettingsAck = MuxerHandshakeError{"1005 unexpected response"} ErrBadHandshakeUnexpectedSettings = MuxerHandshakeError{"1006 unexpected response"} ErrUnexpectedFrameType = MuxerProtocolError{"2001 unexpected frame type", http2.ErrCodeProtocol} ErrUnknownStream = MuxerProtocolError{"2002 unknown stream", http2.ErrCodeProtocol} ErrInvalidStream = MuxerProtocolError{"2003 invalid stream", http2.ErrCodeProtocol} ErrNotRPCStream = MuxerProtocolError{"2004 not RPC stream", http2.ErrCodeProtocol} ErrStreamHeadersSent = MuxerApplicationError{"3000 headers already sent"} ErrStreamRequestConnectionClosed = MuxerApplicationError{"3001 connection closed while opening stream"} ErrConnectionDropped = MuxerApplicationError{"3002 connection dropped"} ErrStreamRequestTimeout = MuxerApplicationError{"3003 open stream timeout"} ErrResponseHeadersTimeout = MuxerApplicationError{"3004 timeout waiting for initial response headers"} ErrResponseHeadersConnectionClosed = MuxerApplicationError{"3005 connection closed while waiting for initial response headers"} ErrClosedStream = MuxerStreamError{"4000 stream closed", http2.ErrCodeStreamClosed} )
var (
ActiveStreams = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "netscale",
Subsystem: "tunnel",
Name: "active_streams",
Help: "Number of active streams created by all muxers.",
})
)
Functions ¶
func CompressionIsSupported ¶
func CompressionIsSupported() bool
func IsRPCStreamResponse ¶
func IsRPCStreamResponse(stream *MuxedStream) bool
Types ¶
type AtomicCounter ¶
type AtomicCounter struct {
// contains filtered or unexported fields
}
func NewAtomicCounter ¶
func NewAtomicCounter(initCount uint64) *AtomicCounter
func (*AtomicCounter) Count ¶
func (c *AtomicCounter) Count() uint64
Count returns the current value of the counter and resets it to 0.
func (*AtomicCounter) IncrementBy ¶
func (c *AtomicCounter) IncrementBy(number uint64)
func (*AtomicCounter) Value ¶
func (c *AtomicCounter) Value() uint64
Value returns the current value of the counter.
type BooleanFuse ¶
type BooleanFuse struct {
// contains filtered or unexported fields
}
BooleanFuse is a data structure that can be set once to a particular value using Fuse(value). Subsequent calls to Fuse() will have no effect.
func NewBooleanFuse ¶
func NewBooleanFuse() *BooleanFuse
func (*BooleanFuse) Await ¶
func (f *BooleanFuse) Await() bool
Await blocks until Fuse has been called at least once.
func (*BooleanFuse) Fuse ¶
func (f *BooleanFuse) Fuse(result bool)
type CompressionPreset ¶
type CompressionPreset struct {
// contains filtered or unexported fields
}
type CompressionSetting ¶
type CompressionSetting uint
const ( CompressionNone CompressionSetting = iota CompressionLow CompressionMedium CompressionMax )
type Header ¶
type Header struct {
Name, Value string
}
func RPCHeaders ¶
func RPCHeaders() []Header
type IdleTimer ¶
type IdleTimer struct { // The channel on which ticks are delivered. C <-chan time.Time // contains filtered or unexported fields }
IdleTimer is a type of Timer designed for managing heartbeats on an idle connection. The timer ticks on an interval with added jitter to avoid accidental synchronisation between two endpoints. It tracks the number of retries/ticks since the connection was last marked active.
The methods of IdleTimer must not be called while a goroutine is reading from C.
func (*IdleTimer) MarkActive ¶
func (t *IdleTimer) MarkActive()
MarkActive resets the idle connection timer and suppresses any outstanding idle events.
func (*IdleTimer) ResetTimer ¶
func (t *IdleTimer) ResetTimer()
ResetTimer resets the idle timer according to the configured duration, with some added jitter.
func (*IdleTimer) Retry ¶
Retry should be called when retrying the idle timeout. If the maximum number of retries has been met, it returns false. After calling this function and sending a heartbeat, call ResetTimer. Since sending the heartbeat could be a blocking operation, the timer is reset after the write completes to avoid it expiring during the write.
func (*IdleTimer) RetryCount ¶
type MuxedStream ¶
type MuxedStream struct { // The headers that were most recently received. // Particularly: // * for an eyeball-initiated stream (as passed to TunnelHandler::ServeStream), // these are the request headers // * for a netscale-initiated stream (as created by Register/UnregisterTunnel), // these are the response headers. // They are useful in both of these contexts; hence `Headers` is public. Headers []Header // contains filtered or unexported fields }
MuxedStream is logically an HTTP/2 stream, with an additional buffer for outgoing data.
func NewStream ¶
func NewStream(config MuxerConfig, writeHeaders []Header, readyList MuxedStreamDataSignaller, dictionaries h2Dictionaries) *MuxedStream
func (*MuxedStream) Close ¶
func (s *MuxedStream) Close() error
func (*MuxedStream) CloseRead ¶
func (s *MuxedStream) CloseRead() error
func (*MuxedStream) CloseWrite ¶
func (s *MuxedStream) CloseWrite() error
func (*MuxedStream) IsRPCStream ¶
func (s *MuxedStream) IsRPCStream() bool
IsRPCStream reports whether the stream is used to transport RPC.
func (*MuxedStream) Write ¶
func (s *MuxedStream) Write(p []byte) (int, error)
Write blocks until len(p) bytes have been written to the buffer.
func (*MuxedStream) WriteClosed ¶
func (s *MuxedStream) WriteClosed() bool
func (*MuxedStream) WriteHeaders ¶
func (s *MuxedStream) WriteHeaders(headers []Header) error
type MuxedStreamDataSignaller ¶
type MuxedStreamDataSignaller interface { // Non-blocking: call this when data is ready to be sent for the given stream ID. Signal(ID uint32) }
MuxedStreamDataSignaller is a write-only *ReadyList
type MuxedStreamFunc ¶
type MuxedStreamFunc func(stream *MuxedStream) error
func (MuxedStreamFunc) ServeStream ¶
func (f MuxedStreamFunc) ServeStream(stream *MuxedStream) error
type MuxedStreamHandler ¶
type MuxedStreamHandler interface {
ServeStream(*MuxedStream) error
}
type MuxedStreamReader ¶
type MuxedStreamReader struct {
*MuxedStream
}
MuxedStreamReader implements io.ReadCloser for the read end of the stream. This is useful for passing to functions that close the object after they are done reading, when you still want to be able to write data afterwards (e.g. http.Client).
func (MuxedStreamReader) Close ¶
func (s MuxedStreamReader) Close() error
type MuxedStreamRequest ¶
type MuxedStreamRequest struct {
// contains filtered or unexported fields
}
func NewMuxedStreamRequest ¶
func NewMuxedStreamRequest(stream *MuxedStream, body io.Reader) MuxedStreamRequest
type Muxer ¶
type Muxer struct {
// contains filtered or unexported fields
}
func Handshake ¶
func Handshake( w io.WriteCloser, r io.ReadCloser, config MuxerConfig, activeStreamsMetrics prometheus.Gauge, ) (*Muxer, error)
Handshake establishes a muxed connection with the peer. After the handshake completes, it is possible to open and accept streams.
func (*Muxer) AwaitResponseHeaders ¶
func (m *Muxer) AwaitResponseHeaders(ctx context.Context, stream *MuxedStream) error
func (*Muxer) CloseStreamRead ¶
func (m *Muxer) CloseStreamRead(stream *MuxedStream)
func (*Muxer) MakeMuxedStreamRequest ¶
func (m *Muxer) MakeMuxedStreamRequest(ctx context.Context, request MuxedStreamRequest) error
func (*Muxer) Metrics ¶
func (m *Muxer) Metrics() *MuxerMetrics
func (*Muxer) NewStream ¶
func (m *Muxer) NewStream(headers []Header) *MuxedStream
func (*Muxer) OpenRPCStream ¶
func (m *Muxer) OpenRPCStream(ctx context.Context) (*MuxedStream, error)
func (*Muxer) OpenStream ¶
func (m *Muxer) OpenStream(ctx context.Context, headers []Header, body io.Reader) (*MuxedStream, error)
OpenStream opens a new data stream with the given headers. It is called by the proxy server and the tunnel.
func (*Muxer) Serve ¶
Serve runs the event loops that comprise h2mux:
- MuxReader.run()
- MuxWriter.run()
- muxMetricsUpdater.run()
In the normal case, Shutdown() is called concurrently with Serve() to stop these loops.
func (*Muxer) Shutdown ¶
func (m *Muxer) Shutdown() <-chan struct{}
Shutdown is called to initiate the "happy path" of muxer termination. It blocks new streams from being created. It returns a channel that is closed when the last stream has been closed.
func (*Muxer) TimerRetries ¶
TimerRetries returns how many retries/ticks have occurred since the connection was last marked active.
type MuxerApplicationError ¶
type MuxerApplicationError struct {
// contains filtered or unexported fields
}
func (MuxerApplicationError) Error ¶
func (e MuxerApplicationError) Error() string
type MuxerConfig ¶
type MuxerConfig struct { Timeout time.Duration Handler MuxedStreamHandler IsClient bool // Name is used to identify this muxer instance when logging. Name string // The minimum time this connection can be idle before sending a heartbeat. HeartbeatInterval time.Duration // The minimum number of heartbeats to send before terminating the connection. MaxHeartbeats uint64 // Logger to use Log *zerolog.Logger CompressionQuality CompressionSetting // Initial size for HTTP2 flow control windows DefaultWindowSize uint32 // Largest allowable size for HTTP2 flow control windows MaxWindowSize uint32 // Largest allowable capacity for the buffer of data to be sent StreamWriteBufferMaxLen int }
type MuxerHandshakeError ¶
type MuxerHandshakeError struct {
// contains filtered or unexported fields
}
func (MuxerHandshakeError) Error ¶
func (e MuxerHandshakeError) Error() string
type MuxerMetrics ¶
type MuxerMetrics struct {
RTT, RTTMin, RTTMax time.Duration
ReceiveWindowAve, SendWindowAve float64
ReceiveWindowMin, ReceiveWindowMax, SendWindowMin, SendWindowMax uint32
InBoundRateCurr, InBoundRateMin, InBoundRateMax uint64
OutBoundRateCurr, OutBoundRateMin, OutBoundRateMax uint64
CompBytesBefore, CompBytesAfter *AtomicCounter
}
func (*MuxerMetrics) CompRateAve ¶
func (m *MuxerMetrics) CompRateAve() float64
type MuxerProtocolError ¶
type MuxerProtocolError struct {
// contains filtered or unexported fields
}
func (MuxerProtocolError) Error ¶
func (e MuxerProtocolError) Error() string
type MuxerStreamError ¶
type MuxerStreamError struct {
// contains filtered or unexported fields
}
func (MuxerStreamError) Error ¶
func (e MuxerStreamError) Error() string
type PingTimestamp ¶
type PingTimestamp struct {
// contains filtered or unexported fields
}
PingTimestamp is an atomic interface around ping timestamping and signalling.
func NewPingTimestamp ¶
func NewPingTimestamp() *PingTimestamp
func (*PingTimestamp) Get ¶
func (pt *PingTimestamp) Get() int64
func (*PingTimestamp) GetUpdateChan ¶
func (pt *PingTimestamp) GetUpdateChan() <-chan struct{}
func (*PingTimestamp) Set ¶
func (pt *PingTimestamp) Set(v int64)
type ReadWriteClosedCloser ¶
type ReadWriteClosedCloser interface { io.ReadWriteCloser Closed() bool }
type ReadWriteLengther ¶
type ReadWriteLengther interface { io.ReadWriter Reset() Len() int }
type ReadyList ¶
type ReadyList struct {
// contains filtered or unexported fields
}
ReadyList multiplexes several event signals onto a single channel.
func NewReadyList ¶
func NewReadyList() *ReadyList
func (*ReadyList) ReadyChannel ¶
type SharedBuffer ¶
type SharedBuffer struct {
// contains filtered or unexported fields
}
func NewSharedBuffer ¶
func NewSharedBuffer() *SharedBuffer
func (*SharedBuffer) Close ¶
func (s *SharedBuffer) Close() error
func (*SharedBuffer) Closed ¶
func (s *SharedBuffer) Closed() bool
type Signal ¶
type Signal struct {
// contains filtered or unexported fields
}
Signal describes an event that can be waited on for at least one signal. Signalling the event while it is in the signalled state is a noop. When the waiter wakes up, the signal is set to unsignalled. It is a way for any number of writers to inform a reader (without blocking) that an event has happened.
func (Signal) WaitChannel ¶
func (s Signal) WaitChannel() <-chan struct{}
WaitChannel returns a channel that is readable after Signal is called.
type StreamErrorMap ¶
StreamErrorMap is used to track stream errors. This is a separate structure from ActiveStreamMap because errors can be raised against non-existent or closed streams.
func NewStreamErrorMap ¶
func NewStreamErrorMap() *StreamErrorMap
NewStreamErrorMap creates a new StreamErrorMap.
func (*StreamErrorMap) GetErrors ¶
func (s *StreamErrorMap) GetErrors() map[uint32]http2.ErrCode
GetErrors retrieves all errors currently raised. This resets the currently-tracked errors.
func (*StreamErrorMap) GetSignalChan ¶
func (s *StreamErrorMap) GetSignalChan() <-chan struct{}
GetSignalChan returns a channel that is signalled when an error is raised.
func (*StreamErrorMap) RaiseError ¶
func (s *StreamErrorMap) RaiseError(streamID uint32, err http2.ErrCode)
RaiseError raises a stream error.
type TunnelHostname ¶
type TunnelHostname string
func (TunnelHostname) IsSet ¶
func (th TunnelHostname) IsSet() bool
func (TunnelHostname) String ¶
func (th TunnelHostname) String() string