Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BackendConn ¶
type BackendWSConnPool ¶
type BackendWSConnPool struct {
// contains filtered or unexported fields
}
BackendWSConnPool This should give a new connection for client connection request Should keep track of available backends at any point of time In a way it feels like it is doing the job of load balancer, but this additionally has to ensure there is at most one client connection per backend/pod
func NewBackendConnPool ¶
func NewBackendConnPool(maxIdleConnCount, maxAllowedErrorCountPerConn int64, logger logger) *BackendWSConnPool
func (*BackendWSConnPool) AddToPool ¶
func (bp *BackendWSConnPool) AddToPool(url string) error
func (*BackendWSConnPool) GetConn ¶
func (bp *BackendWSConnPool) GetConn() *BackendConn
GetConn as soon as this is called, the connection will be immediately marked for use, defer calling this till the moment you need it
func (*BackendWSConnPool) MarkError ¶
func (bp *BackendWSConnPool) MarkError(conn *BackendConn)
type ConnectionProviderPool ¶
type ConnectionProviderPool interface { AddToPool(url string) error GetConn() *BackendConn MarkError(conn *BackendConn) }
type CopyDirection ¶
type CopyDirection int
const ( CopyToBackend CopyDirection = iota + 1 CopyFromBacked )
type HandlerConfig ¶
type HandlerConfig struct { MaxIdleConnCount int64 MaxAllowedErrorCountPerConn int64 InterruptMemoryLimitPerConnInBytes int ClientIdExtractFunc func(conn *websocket.Conn) (uuid.UUID, error) }
HandlerConfig Configuration for the proxy and websocket handler
type InterruptibleWebsocketProxyHandler ¶
type InterruptibleWebsocketProxyHandler struct { websocket.Server *WebsocketPipeManager }
InterruptibleWebsocketProxyHandler Wrapped [Golang websocket server](https://pkg.go.dev/golang.org/x/net/websocket#Server)
func NewInterruptibleWebsocketProxyHandler ¶
func NewInterruptibleWebsocketProxyHandler(wsConfig websocket.Config, handlerConfig HandlerConfig, logger logger) *InterruptibleWebsocketProxyHandler
NewInterruptibleWebsocketProxyHandler Returns a readily configured websocket handler. The returned handler can be directly registered to a http web server.
E.g:
mux := http.NewServeMux() mux.Handle("/", websocketServer) // Start webserver err = http.ListenAndServe(":8080", mux) if err != nil { lgr.Error("error starting websocket server", err) return }
Example ¶
logger := zap.NewExample() lgr := &exampleLogger{ logger: logger, } parsedURL, err := url.Parse("ws://localhost:8080") if err != nil { log.Printf("error parsing url for setting origin: %s", err) return } memoryLimitPerConn := 5 * 1024 * 1024 // Create proxy handler instance interruptibleWebsocketProxyHandler := NewInterruptibleWebsocketProxyHandler(websocket.Config{Origin: parsedURL}, HandlerConfig{ MaxIdleConnCount: 5, MaxAllowedErrorCountPerConn: 100, InterruptMemoryLimitPerConnInBytes: memoryLimitPerConn, ClientIdExtractFunc: nil, }, lgr) // Register proxy handler to http server mux := http.NewServeMux() mux.Handle("/", interruptibleWebsocketProxyHandler) // Start webserver err = http.ListenAndServe(":8080", mux) if err != nil { lgr.Error("error starting websocket server", err) return }
Output:
type PersistentPipe ¶
type PersistentPipe struct { // ID Unique identifier for this pipe ID uuid.UUID // ClientUUID Unique identifier for the client ClientID uuid.UUID ErrorListener PipeErrorListener ClientConn io.ReadWriteCloser BackendConn io.ReadWriteCloser ClientErr error BackendErr error // contains filtered or unexported fields }
func NewPersistentPipe ¶
func NewPersistentPipe(clientID uuid.UUID, clientConn, backendConn io.ReadWriteCloser, interruptMemoryLimitPerConnInBytes int) *PersistentPipe
NewPersistentPipe Creates a new preempt-able websocket pipe
func (*PersistentPipe) Stream ¶
func (pep *PersistentPipe) Stream() error
Stream runs back and forth stream copy between clientConn and backendConn Also helps to restart stream when pre-empted
type PipeErrorListener ¶
type WebsocketPipeManager ¶
type WebsocketPipeManager struct {
// contains filtered or unexported fields
}
func NewDefaultWebsocketPipeManager ¶
func NewDefaultWebsocketPipeManager(maxIdleConnCount, maxAllowedErrorCount int64, interruptMemoryLimitPerConnInBytes int, logger logger) *WebsocketPipeManager
NewDefaultWebsocketPipeManager Creates a default pipe manager with given pool configuration as arguments
func NewWebsocketPipeManager ¶
func NewWebsocketPipeManager(pool ConnectionProviderPool, interruptMemoryLimitPerConnInBytes int, logger logger) *WebsocketPipeManager
NewWebsocketPipeManager Creates a websocket pipe manager with provided connection pool
Example ¶
ExampleNewWebsocketPipeManager Example to use a PipeManager instance with a websocket server. The usage of PipeManager instance within websocket server handler instance effectively converts websocket server into a transparent interruptible websocket proxy
package main import ( "fmt" "github.com/google/uuid" "go.uber.org/zap" "golang.org/x/net/websocket" "net/http" "net/url" "strings" ) type exampleLogger struct { logger *zap.Logger } func (pl *exampleLogger) Warn(msg string, nestedErr error) { pl.logger.Warn(msg, zap.Error(nestedErr)) } func (pl *exampleLogger) Error(msg string, nestedErr error) { pl.logger.Error(msg, zap.Error(nestedErr)) } func (pl *exampleLogger) Debug(msg string) { pl.logger.Debug(msg) } // ExampleNewWebsocketPipeManager Example to use a PipeManager instance with a websocket server. The usage of PipeManager // instance within websocket server handler instance effectively converts websocket server into a transparent interruptible // websocket proxy func main() { logger := zap.NewExample() lgr := &exampleLogger{ logger: logger, } // Create pipe manager instance pool := NewBackendConnPool(5, 100, lgr) memoryLimitPerConn := 5 * 1024 * 1024 pipeManager := NewWebsocketPipeManager(pool, memoryLimitPerConn, lgr) // Websocket Handler proxyWSHandler := websocket.Handler(func(conn *websocket.Conn) { defer conn.Close() clientIdString := strings.TrimPrefix(conn.Request().URL.Path, "/") clientId, err := uuid.Parse(clientIdString) if err != nil { lgr.Error(fmt.Sprintf("error parsing clientId, invalid clientIdString in the path: %s", clientIdString), err) return } // Create persistent pipe, this is a blocking call err = pipeManager.CreatePipe(clientId, conn) if err != nil { lgr.Error("error creating persistent pipe", err) return } }) parsedURL, err := url.Parse("ws://localhost:8080") if err != nil { lgr.Error("error parsing url for setting origin", err) return } // Setup websocket server with basic config and handler websocketServer := websocket.Server{ Config: websocket.Config{Origin: parsedURL}, Handler: proxyWSHandler, } mux := http.NewServeMux() mux.Handle("/", websocketServer) // Start webserver err = http.ListenAndServe(":8080", mux) if err != nil { lgr.Error("error starting websocket server", err) return } }
Output:
func (*WebsocketPipeManager) AddConnectionToPool ¶
func (pm *WebsocketPipeManager) AddConnectionToPool(url string) error
AddConnectionToPool Can add connection to the pool Note: the format for the URL should be checked for validity beforehand before calling this If you pass invalid websocket url, it will unnecessarily add delays for fetching fresh connections
func (*WebsocketPipeManager) CreatePipe ¶
func (pm *WebsocketPipeManager) CreatePipe(clientId uuid.UUID, conn io.ReadWriteCloser) error
CreatePipe This function is a blocking call when the pipe runs till completion. Returns nil if client closed the connection for any reason, otherwise can return error during connection fetch, stream
func (*WebsocketPipeManager) SetBackOffStrategyFunc ¶
func (pm *WebsocketPipeManager) SetBackOffStrategyFunc(backOffFunc func(counter *int64))
SetBackOffStrategyFunc Can set a custom defined back off function when failed to get new connection for backend The argument for the back off function can be used to pass counter from outside which can help with strategies like exponential backoff etc