peasocket

package module
v0.0.0-...-4fdb282 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2023 License: MIT Imports: 21 Imported by: 0

README

Go Report Card GoDoc codecov

peasocket

Little websocket implementation

Highlights

  • Zero allocations when fragmenting messages with WriteFragmentedMessage

  • Empty go.mod file.

  • Lockless base implementation

  • Dead-simple Client implementation

    • No channels
    • No goroutines
    • 1 Mutex on state and another on Tx to auto-reply pings.
  • Ready for use in embedded systems

  • Tinygo compatible.

  • Base implementation is very simple, 1000 lines of code excluding Client. This makes debugging and testing easy.

  • Easy to extend. Exposes all websocket functionality.

cmd/peacho Command

A echoing client. Connects to a server and echoes back any messages received.

Documentation

Index

Examples

Constants

View Source
const (
	// MaxControlPayload is the maximum length of a control frame payload as per [section 5.5].
	//
	// [section 5.5]: https://tools.ietf.org/html/rfc6455#section-5.5.
	MaxControlPayload = 125

	MaxHeaderSize = 14
)

Variables

This section is empty.

Functions

func ListenAndServe

func ListenAndServe(mainCtx context.Context, address string, handler func(ctx context.Context, sv *Server)) error

ListenAndServe is the websocket counterpart to http.ListenAndServe. It sets up a tcp.Listener on the argument address and begins listening for incoming websocket client connections. On a succesful webosocket connection the handler is called with a newly allocated Server instance.

TODO(soypat): Add a Listener type for vetting HTTP headers and whatnot. ListenAndServe should then use a default Listener.

Types

type Client

type Client struct {
	ServerURL string
	// contains filtered or unexported fields
}

Client is a client websocket implementation.

Users should use Client in no more than two goroutines unless calling the explicitly stated as concurrency safe methods i.e: Err, IsConnected. These two goroutines should be separated by responsability:

  • Read goroutine should call methods that read received messages
  • Write goroutine should call methods that write to the connection.

Client automatically takes care of incoming Ping and Close frames during ReadNextFrame

Example (Echo)
package main

import (
	"context"
	"errors"
	"io"
	"log"
	"net"
	"time"

	"github.com/soypat/peasocket"
)

func main() {
	const (
		message = "Hello!"
	)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	client := peasocket.NewClient("ws://localhost:8080", nil, nil)
	err := client.DialViaHTTPClient(ctx, nil)
	if err != nil {
		log.Fatal("while dialing:", err)
	}
	defer client.CloseWebsocket(&peasocket.CloseError{
		Status: peasocket.StatusGoingAway,
		Reason: []byte("bye bye"),
	})
	log.Printf("protocol switch success. prepare msg=%q", message)
	go func() {
		// This goroutine reads frames from network.
		for {
			err := client.HandleNextFrame()
			if errors.Is(err, net.ErrClosed) {
				log.Println("connection closed, ending loop")
				return
			}
			if err != nil {
				log.Println("read next frame failed:", err)
				time.Sleep(500 * time.Millisecond)
			}
		}
	}()

	for {
		// This goroutine gets messages that have been read
		// from the client's buffer and prints them.
		msg, _, err := client.BufferedMessageReader()
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				log.Fatal("websocket closed")
			}
			log.Println("no next message err:", err)
			time.Sleep(500 * time.Millisecond)
			continue
		}

		b, err := io.ReadAll(msg)
		log.Println("got message:", string(b))
	}
}
Output:

func NewClient

func NewClient(serverURL string, userBuffer []byte, entropy func() uint32) *Client

NewClient creates a new Client ready to connect to a websocket enabled server. serverURL can start with wss:// or ws:// to enable TLS (secure websocket protocol). entropy is a function that returns randomized 32 bit integers from a high-entropy source. userBuffer will be used as scratch space to copy messages.

If entropy is nil then a default will be used. If userBuffer is zero-lengthed one will be automatically allocated.

func (*Client) Buffered

func (c *Client) Buffered() int

Buffered returns the amount of bytes ready to be read in the received message buffer.

Buffered is safe for concurrent use.

func (*Client) BufferedMessageReader

func (c *Client) BufferedMessageReader() (io.Reader, FrameType, error)

BufferedMessageReader returns a reader to a complete websocket message. It copies the whole contents of the received message to a new buffer which is returned to the caller. The returned message may have been fragmented over the wire.

func (*Client) BufferedMessages

func (c *Client) BufferedMessages() int

BufferedMessages returns the amount of received messages N that are ready to be read such that the user is guaranteed to not get an error calling NextMessageReader N times.

BufferedMessages is safe for concurrent use.

func (*Client) CloseConn

func (c *Client) CloseConn(err error) error

CloseConn closes the underlying transport without sending websocket frames.

If err is nil CloseConn will panic.

func (*Client) CloseWebsocket

func (c *Client) CloseWebsocket(err error) error

CloseWebsocket sends a close control frame over the websocket connection. Does not attempt to close the underlying transport. If CloseWebsocket received a [*CloseErr] type no memory allocation is performed and it's status code and reason is used in the websocket close frame.

If err is nil CloseWebsocket panics.

func (*Client) Dial

func (c *Client) Dial(ctx context.Context, overwriteHeaders http.Header) error

Dial not yet tested. Performs websocket handshake over net.Conn.

func (*Client) DialViaHTTPClient

func (c *Client) DialViaHTTPClient(ctx context.Context, overwriteHeaders http.Header) error

DialViaHTTPClient completes a websocket handshake and returns an error if the handshake failed. After a successful handshake the Client is ready to begin communicating with the server via websocket protocol.

func (*Client) Err

func (c *Client) Err() error

Err returns the error that closed the connection. It only returns an error while the connection is closed. One can test if the server gracefully closed the connection with

errors.As(err, &peasocket.CloseErr{})

Err is safe for concurrent use.

func (*Client) HandleNextFrame

func (c *Client) HandleNextFrame() error

HandleNextFrame reads next frame in connection. Should be called in a loop

func (*Client) IsConnected

func (c *Client) IsConnected() bool

IsConnected returns true if the underlying transport of the websocket is ready to send/receive messages. It is shorthand for c.Err() == nil.

IsConnected is safe for concurrent use.

func (*Client) WriteBufferedMessageTo

func (c *Client) WriteBufferedMessageTo(w io.Writer) (FrameType, int64, error)

WriteBufferedMessageTo writes the next received message in the queue to the argument writer.

func (*Client) WriteFragmentedMessage

func (c *Client) WriteFragmentedMessage(r io.Reader, userBuffer []byte) (int, error)

WriteFragmentedMessage writes contents of r over the wire using the userBuffer as scratch memory. This function does not allocate memory unless userBuffer is nil.

func (*Client) WriteMessage

func (c *Client) WriteMessage(payload []byte) error

WriteMessage writes a binary message over the websocket connection.

func (*Client) WriteTextMessage

func (c *Client) WriteTextMessage(payload []byte) error

WriteTextMessage writes a utf-8 message over the websocket connection.

type CloseError

type CloseError struct {
	Status StatusCode
	Reason []byte
}

CloseError implements the error interface. It represents the websocket Close Frame metadata that elaborates the reason for closure and a status code.

func (*CloseError) Error

func (c *CloseError) Error() string

Error implements the error interface for CloseError. Example of output:

"going away: a user defined reason"

type ExponentialBackoff

type ExponentialBackoff struct {
	// Wait defines the amount of time that Miss will wait on next call.
	Wait time.Duration
	// Maximum allowable value for Wait.
	MaxWait time.Duration
	// StartWait is the value that Wait takes after a call to Hit.
	StartWait time.Duration
	// ExpMinusOne is the shift performed on Wait minus one, so the zero value performs a shift of 1.
	ExpMinusOne uint32
}

ExponentialBackoff implements a Exponential Backoff delay algorithm to prevent saturation network or processor with failing tasks. An ExponentialBackoff with a non-zero MaxWait is ready for use.

func (*ExponentialBackoff) Hit

func (eb *ExponentialBackoff) Hit()

Hit sets eb.Wait to the StartWait value.

func (*ExponentialBackoff) Miss

func (eb *ExponentialBackoff) Miss()

Miss sleeps for eb.Wait and increases eb.Wait exponentially.

type FrameType

type FrameType uint8

Defines the interpretation of the "Payload data". If an unknown opcode is received, the receiving endpoint MUST Fail the WebSocket Connection.

const (
	FrameContinuation FrameType = iota
	FrameText
	FrameBinary
)

Non-control frames.

const (
	FrameClose FrameType = iota + _numNCFrames
	// A Ping control frame may serve either as a keepalive or as a means to verify that
	// the remote endpoint is still responsive.
	// An endpoint MAY send a Ping frame any time after the connection is
	// established and before the connection is closed.
	FramePing
	// A Pong control frame sent in response to a Ping frame must have identical
	// "Application data" as found in the message body of the Ping frame
	// being replied to.
	FramePong
)

Control frames.

func (FrameType) IsControl

func (op FrameType) IsControl() bool

IsControl returns true if the frame is valid and is a control frame (ping, pong or close).

func (FrameType) IsUTF8

func (ft FrameType) IsUTF8() bool

IsUTF8 returns true if the frame type is a websocket text frame.

func (FrameType) String

func (ft FrameType) String() (s string)

String returns the lower-case human readable representation of the websocket op code.

type Header struct {

	// All frames sent from the client to the server are masked by a
	// 32-bit value that is contained within the frame.  This field is
	// present if the mask bit is set to 1 and is absent if the mask bit
	// is set to 0.  See Section 5.3 for further information on client-
	// to-server masking.
	Mask uint32 // A mask key of zero has no effect in XOR operations, thus zero value is no mask.

	// Defines whether the "Payload data" is masked.  If set to 1, a
	// masking key is present in masking-key, and this is used to unmask
	// the "Payload data" as per Section 5.3.  All frames sent from
	// client to server have this bit set to 1.
	// Masked bool
	// The length of the "Payload data", in bytes: if 0-125, that is the
	// payload length.  If 126, the following 2 bytes interpreted as a
	// 16-bit unsigned integer are the payload length.  If 127, the
	// following 8 bytes interpreted as a 64-bit unsigned integer (the
	// most significant bit MUST be 0) are the payload length.
	PayloadLength uint64
	// contains filtered or unexported fields
}

Header represents a WebSocket frame header. A header can occupy 2 to 10 bytes on the wire depending on payload size and if content is masked.

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+

func DecodeHeader

func DecodeHeader(r io.Reader) (Header, int, error)

DecodeHeader reads a header from the io.Reader. It assumes the first byte corresponds to the header's first byte.

func NewHeader

func NewHeader(op FrameType, payload int, mask uint, fin bool) (Header, error)

NewHeader creates a new websocket frame header. Set mask to 0 to disable masking on the payload.

func (*Header) Encode

func (h *Header) Encode(w io.Writer) (int, error)

Encode writes the websocket header to w as it would be sent over the wire. It does not encode a payload.

func (Header) Fin

func (h Header) Fin() bool

Fin returns true if the FIN bit in the frame header is set. Always should be set for control frames.

func (Header) FrameType

func (h Header) FrameType() FrameType

FrameType returns the header's operation code as per section 11.

func (Header) IsMasked

func (h Header) IsMasked() bool

IsMasked returns true if payload is masked.

func (*Header) Put

func (h *Header) Put(b []byte) (int, error)

Put encodes the header into the start of the byte slice.

func (Header) Rsv1

func (h Header) Rsv1() bool

Rsv1 returns true if the first frame header reserved bit is set. Used for extensions.

func (Header) Rsv2

func (h Header) Rsv2() bool

Rsv2 returns true if the second frame header reserved bit is set. Used for extensions.

func (Header) Rsv3

func (h Header) Rsv3() bool

Rsv3 returns true if the third frame header reserved bit is set. Used for extensions.

func (Header) String

func (h Header) String() string

String returns a human readable representation of the Frame header in the following format:

Frame:<frame type> (payload=<payload length decimal>) FIN:<true|false> <RSV if set>

type Rx

type Rx struct {
	LastReceivedHeader Header

	RxCallbacks RxCallbacks
	// contains filtered or unexported fields
}

Rx provides a way to handle a Websocket transport.

func (*Rx) ReadNextFrame

func (rx *Rx) ReadNextFrame() (int, error)

ReadNextFrame reads a frame from the underlying transport of rx. This method is meant to be used after setting the rx's callbacks. If the callback is not set then the payload data will be read and discarded.

func (*Rx) SetRxTransport

func (rx *Rx) SetRxTransport(rc io.ReadCloser)

SetRxTransport sets the underlying transport of rx.

type RxCallbacks

type RxCallbacks struct {
	// OnError executed when a decoding error is encountered after
	// consuming a non-zero amoung of bytes from the underlying transport.
	// If this callback is set then it becomes the responsability of the callback
	// to close the underlying transport.
	OnError func(rx *Rx, err error)

	// If OnCtl is not nil then this is executed on every control frame received.
	// These may or may not have a payload that can be readily read from r.
	OnCtl func(rx *Rx, payload io.Reader) error
	// If OnMessage is not nil it is called on the Rx for every application
	// message received (non-control frames).
	// The Application Message is contained in message which will return EOF
	// when the whole payload has been read.
	// The payload bytes are unmasked unless unmasking explicitly disabled on the Rx.
	OnMessage func(rx *Rx, message io.Reader) error
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a stateful websocket single-connection server implementation. It is only partly safe for concurrent use.

Users should use Server in no more than two goroutines unless calling the explicitly stated as concurrency safe methods i.e: Err, IsConnected. These two goroutines should be separated by responsability: The Read goroutine should call methods that read received messages and the Write goroutine should call methods that write to the connection.

Server automatically takes care of incoming Ping and Close frames during ReadNextFrame

func NewServer

func NewServer(rxCopyBuf []byte) *Server

NewClient creates a new Client ready to connect to a websocket enabled server. serverURL can start with wss:// or ws:// to enable TLS (secure websocket protocol). entropy is a function that returns randomized 32 bit integers from a high-entropy source. If entropy is nil then a default will be used.

func (*Server) CloseConn

func (sv *Server) CloseConn(err error) error

CloseConn closes the underlying transport without sending websocket frames.

If err is nil CloseConn will panic.

func (*Server) CloseWebsocket

func (sv *Server) CloseWebsocket(err error) error

CloseWebsocket sends a close control frame over the websocket connection. Does not attempt to close the underlying transport. If CloseWebsocket receives a [*CloseErr] type no memory allocation is performed and it's status code and reason is used in the websocket close frame.

If err is nil CloseWebsocket panics.

func (*Server) ENABLE_DEBUG_LOG deprecated

func (sv *Server) ENABLE_DEBUG_LOG(enable bool)

ENABLE_DEBUG_LOG will NOT stay in this package. It is a temporary helper that is why the name is the way it is. Calling this function with enable set to true without a enable=false call in between will cause duplicated log output.

Deprecated: ENABLE_DEBUG_LOG is deprecated. Will not stay in package peasocket.

func (*Server) Err

func (sv *Server) Err() error

Err returns the error that closed the connection. It only returns an error while the connection is closed. One can test if the client gracefully closed the connection with

errors.As(err, &peasocket.CloseErr{}) // true on graceful websocket closure

Err is safe for concurrent use.

func (*Server) HandleNextFrame

func (sv *Server) HandleNextFrame() error

HandleNextFrame reads next frame in connection and takes care of control frame actions. Should be called in a loop separate from write goroutines.

func (*Server) IsConnected

func (sv *Server) IsConnected() bool

IsConnected returns true if the underlying transport of the websocket is ready to send/receive messages. It is shorthand for s.Err() == nil.

IsConnected is safe for concurrent use.

func (*Server) Ping

func (sv *Server) Ping(ctx context.Context, message []byte) error

Ping blocks until a call to ReadNextFrame receives the corresponding Pong message. Should be called in a goroutine separate to the Server Read goroutine.

func (*Server) WriteFragmentedMessage

func (c *Server) WriteFragmentedMessage(r io.Reader, userBuffer []byte) (int, error)

WriteFragmentedMessage writes contents of r over the wire using the userBuffer as scratch memory. This function does not allocate memory unless userBuffer is nil.

func (*Server) WriteNextMessageTo

func (sv *Server) WriteNextMessageTo(w io.Writer) (FrameType, int64, error)

WriteNextMessageTo writes entire contents the next message in queue to the passed in writer.

type StatusCode

type StatusCode uint16

StatusCode represents a WebSocket status code.

const (
	StatusNormalClosure   StatusCode = 1000
	StatusGoingAway       StatusCode = 1001
	StatusProtocolError   StatusCode = 1002
	StatusUnsupportedData StatusCode = 1003

	// StatusNoStatusRcvd cannot be sent in a close message.
	// It is reserved for when a close message is received without
	// a status code.
	StatusNoStatusRcvd StatusCode = 1005

	// StatusAbnormalClosure is exported for use only with Wasm.
	// In non Wasm Go, the returned error will indicate whether the
	// connection was closed abnormally.
	StatusAbnormalClosure StatusCode = 1006

	StatusInvalidFramePayloadData StatusCode = 1007
	StatusPolicyViolation         StatusCode = 1008
	StatusMessageTooBig           StatusCode = 1009
	StatusMandatoryExtension      StatusCode = 1010
	StatusInternalError           StatusCode = 1011
	StatusServiceRestart          StatusCode = 1012
	StatusTryAgainLater           StatusCode = 1013
	StatusBadGateway              StatusCode = 1014

	// StatusTLSHandshake is only exported for use with Wasm.
	// In non Wasm Go, the returned error will indicate whether there was
	// a TLS handshake failure.
	StatusTLSHandshake StatusCode = 1015
)

These are only the status codes defined by the protocol.

You can define custom codes in the 3000-4999 range. The 3000-3999 range is reserved for use by libraries, frameworks and applications. The 4000-4999 range is reserved for private use.

func (StatusCode) String

func (sc StatusCode) String() (s string)

String returns the lower-cased human readable representation of the websocket closure status code.

type TxBuffered

type TxBuffered struct {
	TxCallbacks TxCallbacks
	// contains filtered or unexported fields
}

TxBuffered handles the marshalling of frames over a underlying transport.

func NewTxBuffered

func NewTxBuffered(wc io.WriteCloser) *TxBuffered

NewTxBuffered creates a new TxBuffered ready for use.

func (*TxBuffered) SetTxTransport

func (tx *TxBuffered) SetTxTransport(wc io.WriteCloser)

SetTxTransport sets the underlying TxBuffered transport writer.

func (*TxBuffered) WriteClose

func (tx *TxBuffered) WriteClose(mask uint32, status StatusCode, reason []byte) (int, error)

WriteClose writes a Close frame over the websocket connection. reason will be mutated if mask is non-zero.

func (*TxBuffered) WriteFragmentedMessage

func (tx *TxBuffered) WriteFragmentedMessage(mask uint32, payload io.Reader, userBuffer []byte) (int, error)

WriteFragmentedMessage writes a fragmented message over websocket without allocating memory. It reads the application message from payload and masks it with mask.

func (*TxBuffered) WriteMessage

func (tx *TxBuffered) WriteMessage(mask uint32, payload []byte) (int, error)

WriteTextMessage writes an arbitrary message over the transport. payload's bytes will be masked and thus mutated if mask is non-zero.

func (*TxBuffered) WritePing

func (tx *TxBuffered) WritePing(mask uint32, message []byte) (int, error)

WritePing writes a ping message over the Tx.

func (*TxBuffered) WritePong

func (tx *TxBuffered) WritePong(mask uint32, message []byte) (int, error)

WritePong writes a pong message over the Tx.

func (*TxBuffered) WriteTextMessage

func (tx *TxBuffered) WriteTextMessage(mask uint32, payload []byte) (int, error)

WriteTextMessage writes an UTF-8 encoded message over the transport. This implementation does not check if payload is valid UTF-8. This can be done with the unicode/utf8 Valid function. payload's bytes will be masked and thus mutated if mask is non-zero.

type TxCallbacks

type TxCallbacks struct {
	OnError func(tx *TxBuffered, err error)
}

TxCallbacks stores functions to be called on events during marshalling of websocket frames.

Directories

Path Synopsis
cmd
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL