interruptible_websocket_proxy

package module
v0.0.0-...-5b97ad6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

README

Interruptible WebSocket Proxy

(a.k.a UnInterrupted WebSocket Proxy)

Websocket connections are meant to be persistent for longer durations. If a backend is supporting multiple client websocket connections, it makes the backend to be less dynamic in terms of scaling, load balancing, deployments, quick restarts, minor network interruptions etc.

The core idea of this library is to be used an interrupt resistant component for a websocket proxy. It gives the proxy an ability to restore a backend websocket connection for a given client websocket connection from minor interruptions.

Note that overall tolerance of this proxy is based on the availability of number of connections and free RAM. It's best to limit number of connections to this proxy and also set per connection memory limit. There is a default per connection memory limit of 5MB (if not set explicitly).

Working model

This library creates a persistent connection object called PersistentPipe for each unique client request using PipeManager instance.
When the backend connection is down, next available backend connection is chosen from the pool. Once a new backend connection is restored, all the data received in the meantime will be flushed to the backend connection first before connecting client stream.
The data received from client is temporarily stored in a byte array in memory. There is a max limit for each byte array, if reached before finding a backed connection from pool, the client connection is dropped to prevent memory hog of a particular connection over other ones. PipeManager also can register/de-register a backend connection to availability pool.

How to Use

The solution can be consumed in two ways

  1. Wire up the PipeManager to a regular websocket server thus making it a proxy. Here is an example usage

  2. Wire up InterruptibleWebsocketProxyHandler to a http server to make a proxy as well. Here is an example usage

A sample curl request for the examples

curl --location --request GET 'http://localhost:8080/098d8a97-3615-4eb8-b803-c57c01c7536c'

Additionally, in order register websocket urls to backend pool. We have to use the following method

// Refer to below pipemanager from example 1
pipeManager.AddConnectionToPool("ws://localhost:8081/listener")

// Refer to below interruptibleWebsocketProxyHandler from example 2
interruptibleWebsocketProxyHandler.AddConnectionToPool("ws://localhost:8081/listener")

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BackendConn

type BackendConn struct {
	net.Conn

	ErrorInfo
	// contains filtered or unexported fields
}

type BackendWSConnPool

type BackendWSConnPool struct {
	// contains filtered or unexported fields
}

BackendWSConnPool This should give a new connection for client connection request Should keep track of available backends at any point of time In a way it feels like it is doing the job of load balancer, but this additionally has to ensure there is at most one client connection per backend/pod

func NewBackendConnPool

func NewBackendConnPool(maxIdleConnCount, maxAllowedErrorCountPerConn int64, logger logger) *BackendWSConnPool

func (*BackendWSConnPool) AddToPool

func (bp *BackendWSConnPool) AddToPool(url string) error

func (*BackendWSConnPool) GetConn

func (bp *BackendWSConnPool) GetConn() *BackendConn

GetConn as soon as this is called, the connection will be immediately marked for use, defer calling this till the moment you need it

func (*BackendWSConnPool) MarkError

func (bp *BackendWSConnPool) MarkError(conn *BackendConn)

type ConnectionProviderPool

type ConnectionProviderPool interface {
	AddToPool(url string) error
	GetConn() *BackendConn
	MarkError(conn *BackendConn)
}

type CopyDirection

type CopyDirection int
const (
	CopyToBackend CopyDirection = iota + 1
	CopyFromBacked
)

type ErrorInfo

type ErrorInfo struct {
	// contains filtered or unexported fields
}

type HandlerConfig

type HandlerConfig struct {
	MaxIdleConnCount                   int64
	MaxAllowedErrorCountPerConn        int64
	InterruptMemoryLimitPerConnInBytes int
	ClientIdExtractFunc                func(conn *websocket.Conn) (uuid.UUID, error)
}

HandlerConfig Configuration for the proxy and websocket handler

type InterruptibleWebsocketProxyHandler

type InterruptibleWebsocketProxyHandler struct {
	websocket.Server
	*WebsocketPipeManager
}

InterruptibleWebsocketProxyHandler Wrapped [Golang websocket server](https://pkg.go.dev/golang.org/x/net/websocket#Server)

func NewInterruptibleWebsocketProxyHandler

func NewInterruptibleWebsocketProxyHandler(wsConfig websocket.Config,
	handlerConfig HandlerConfig, logger logger) *InterruptibleWebsocketProxyHandler

NewInterruptibleWebsocketProxyHandler Returns a readily configured websocket handler. The returned handler can be directly registered to a http web server.

E.g:

 mux := http.NewServeMux()
	mux.Handle("/", websocketServer)

	// Start webserver
	err = http.ListenAndServe(":8080", mux)
	if err != nil {
		lgr.Error("error starting websocket server", err)
		return
	}
Example
logger := zap.NewExample()

lgr := &exampleLogger{
	logger: logger,
}

parsedURL, err := url.Parse("ws://localhost:8080")
if err != nil {
	log.Printf("error parsing url for setting origin: %s", err)
	return
}

memoryLimitPerConn := 5 * 1024 * 1024

// Create proxy handler instance
interruptibleWebsocketProxyHandler := NewInterruptibleWebsocketProxyHandler(websocket.Config{Origin: parsedURL}, HandlerConfig{
	MaxIdleConnCount:                   5,
	MaxAllowedErrorCountPerConn:        100,
	InterruptMemoryLimitPerConnInBytes: memoryLimitPerConn,
	ClientIdExtractFunc:                nil,
}, lgr)

// Register proxy handler to http server
mux := http.NewServeMux()
mux.Handle("/", interruptibleWebsocketProxyHandler)

// Start webserver
err = http.ListenAndServe(":8080", mux)
if err != nil {
	lgr.Error("error starting websocket server", err)
	return
}
Output:

type PersistentPipe

type PersistentPipe struct {
	// ID Unique identifier for this pipe
	ID uuid.UUID
	// ClientUUID Unique identifier for the client
	ClientID      uuid.UUID
	ErrorListener PipeErrorListener
	ClientConn    io.ReadWriteCloser
	BackendConn   io.ReadWriteCloser
	ClientErr     error
	BackendErr    error
	// contains filtered or unexported fields
}

func NewPersistentPipe

func NewPersistentPipe(clientID uuid.UUID, clientConn, backendConn io.ReadWriteCloser, interruptMemoryLimitPerConnInBytes int) *PersistentPipe

NewPersistentPipe Creates a new preempt-able websocket pipe

func (*PersistentPipe) Stream

func (pep *PersistentPipe) Stream() error

Stream runs back and forth stream copy between clientConn and backendConn Also helps to restart stream when pre-empted

type PipeErrorListener

type PipeErrorListener func(pipeId uuid.UUID, err error)

type WebsocketPipeManager

type WebsocketPipeManager struct {
	// contains filtered or unexported fields
}

func NewDefaultWebsocketPipeManager

func NewDefaultWebsocketPipeManager(maxIdleConnCount, maxAllowedErrorCount int64, interruptMemoryLimitPerConnInBytes int, logger logger) *WebsocketPipeManager

NewDefaultWebsocketPipeManager Creates a default pipe manager with given pool configuration as arguments

func NewWebsocketPipeManager

func NewWebsocketPipeManager(pool ConnectionProviderPool, interruptMemoryLimitPerConnInBytes int, logger logger) *WebsocketPipeManager

NewWebsocketPipeManager Creates a websocket pipe manager with provided connection pool

Example

ExampleNewWebsocketPipeManager Example to use a PipeManager instance with a websocket server. The usage of PipeManager instance within websocket server handler instance effectively converts websocket server into a transparent interruptible websocket proxy

package main

import (
	"fmt"
	"github.com/google/uuid"
	"go.uber.org/zap"
	"golang.org/x/net/websocket"
	"net/http"
	"net/url"
	"strings"
)

type exampleLogger struct {
	logger *zap.Logger
}

func (pl *exampleLogger) Warn(msg string, nestedErr error) {
	pl.logger.Warn(msg, zap.Error(nestedErr))
}

func (pl *exampleLogger) Error(msg string, nestedErr error) {
	pl.logger.Error(msg, zap.Error(nestedErr))
}

func (pl *exampleLogger) Debug(msg string) {
	pl.logger.Debug(msg)
}

// ExampleNewWebsocketPipeManager Example to use a PipeManager instance with a websocket server. The usage of PipeManager
// instance within websocket server handler instance effectively converts websocket server into a transparent interruptible
// websocket proxy
func main() {
	logger := zap.NewExample()

	lgr := &exampleLogger{
		logger: logger,
	}

	// Create pipe manager instance
	pool := NewBackendConnPool(5, 100, lgr)
	memoryLimitPerConn := 5 * 1024 * 1024
	pipeManager := NewWebsocketPipeManager(pool, memoryLimitPerConn, lgr)

	// Websocket Handler
	proxyWSHandler := websocket.Handler(func(conn *websocket.Conn) {
		defer conn.Close()

		clientIdString := strings.TrimPrefix(conn.Request().URL.Path, "/")
		clientId, err := uuid.Parse(clientIdString)
		if err != nil {
			lgr.Error(fmt.Sprintf("error parsing clientId, invalid clientIdString in the path: %s", clientIdString), err)
			return
		}

		// Create persistent pipe, this is a blocking call
		err = pipeManager.CreatePipe(clientId, conn)
		if err != nil {
			lgr.Error("error creating persistent pipe", err)
			return
		}
	})

	parsedURL, err := url.Parse("ws://localhost:8080")
	if err != nil {
		lgr.Error("error parsing url for setting origin", err)
		return
	}

	// Setup websocket server with basic config and handler
	websocketServer := websocket.Server{
		Config:  websocket.Config{Origin: parsedURL},
		Handler: proxyWSHandler,
	}

	mux := http.NewServeMux()
	mux.Handle("/", websocketServer)

	// Start webserver
	err = http.ListenAndServe(":8080", mux)
	if err != nil {
		lgr.Error("error starting websocket server", err)
		return
	}
}
Output:

func (*WebsocketPipeManager) AddConnectionToPool

func (pm *WebsocketPipeManager) AddConnectionToPool(url string) error

AddConnectionToPool Can add connection to the pool Note: the format for the URL should be checked for validity beforehand before calling this If you pass invalid websocket url, it will unnecessarily add delays for fetching fresh connections

func (*WebsocketPipeManager) CreatePipe

func (pm *WebsocketPipeManager) CreatePipe(clientId uuid.UUID, conn io.ReadWriteCloser) error

CreatePipe This function is a blocking call when the pipe runs till completion. Returns nil if client closed the connection for any reason, otherwise can return error during connection fetch, stream

func (*WebsocketPipeManager) SetBackOffStrategyFunc

func (pm *WebsocketPipeManager) SetBackOffStrategyFunc(backOffFunc func(counter *int64))

SetBackOffStrategyFunc Can set a custom defined back off function when failed to get new connection for backend The argument for the back off function can be used to pass counter from outside which can help with strategies like exponential backoff etc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL