crossbar

package
v0.6.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2023 License: AGPL-3.0 Imports: 19 Imported by: 0

README

#Crossbar alt text alt text alt text

Crossbar relays websocket streams

Why?

Remote laboratories require a custom video and data communications eco-system in order to support their wider adoption. Key factors include:

  • Remote laboratory participants (whether human or machine) are often located behind institutional firewalls and NAT
  • Most instituational networks support particpants sending and receiving video and data to external relays, but not acting as a server
  • Those data streams are typically embedded in websockets, whereas UDP and some TCP protocols are sometimes explicitly blocked
  • Almost all video and data messaging vendors with relays are focused solely on human-human communications
    • often missing apparently-minor features from the API that are key for remote lab experiments (e.g. being able to change camera programmatically)
    • often require workarounds for remote lab adminstration tasks which are prevented by privacy features in browsers (e.g. identifying cameras)
    • typically require x10 more expensive computer for the experiment because of the overhead of running graphical operating system and browser
    • most vendors can - quite rightly - only guarantee long-term support for users that conform to their core use-case

Features

  • binary and text websockets
  • multiple, independent streams
    • organised by topics
    • topics set by routing
  • multiple streaming schemas
    • bidirectional N:N streaming
    • unidirectional 1:N streaming
  • streaming schemas are
    • set by routing
    • individually selectable for each stream
  • statistics are recorded, and available via
    • websocket API in JSON format
    • human-readable webpage with 5sec update at <host>/stats

What's with the name?

I once had an old IBM p690 compute cluster whose processor cores had crossbar switches (Core Interface Units) that connect any core with any cache, and do so more efficiently than a standard compute cluster. It seemed apt, because this relay is about connecting any experiment with any human, more efficiently than existing systems, (in a holistic sense, including total cost and effort of ownership, administration, maintenance etc).

Performance

Benchmarking of a mixed-communications relay can be relatively complex, although simple results are enough to give a picture of the performance available. Note that simple calculations on mixed loads, based on the single-producer, single-consumer benchmarks cannot be done accurately because of compiler efficiencies e.g. in duplicating outgoing messages (an order of magnitude less CPU + memory load to duplicate outgoing videos, than it is to receive an incoming video)

Single-producer, single-consumer benchmarks

On Intel(R) Xeon(R) Silver 4110 CPU @ 2.10GHz

Small message

small timeouts (1ms)

3-byte payload: 1000 messages per benchmark (single producer, single consumer)

BenchmarkSmallMessage-32    	       8	 134451695 ns/op

134.5ms for 1000 messages equates to in excess of 7,000 messages per second

1MByte message

Random data:

Each test is 100 message of 1MB, taking 934ms. The total data throughput is 8bits/Byte * 100MB = 800Mbits

BenchmarkLargeMessage: crossbar_test.go:488: Message size: 1048576 bytes
BenchmarkLargeMessage-32    	       2	 933766152 ns/op

for a bandwidth of 800Mbits/0.934sec = 858 Mbps (Mega bits per sec)

This bandwidth of ~850 Mbps is

  • more than twice my fibre broadband download speed (350 Mbps * 2 = 700 Mbps)
  • close to the maximum theoretical limit of a GiGE network connection (1000 Mbps)

The test used the same data in each packet to avoid dragging the results down with the overhead of generating random data. But we can explore whether the compilier is doing some fancy caching optimisation by running a separate benchmark on the random data generation.

BenchmarkLargeMessage
BenchmarkLargeMessage-32                   	       2	 958564581 ns/op
BenchmarkLargeRandomPacketGeneration
BenchmarkLargeRandomPacketGeneration-32    	       2	 618112575 ns/op
BenchmarkLargeRandomMessage
BenchmarkLargeRandomMessage-32             	       1	2421145176 ns/op

The adjusted duration for sending and receiving is now 2.42 - 0.62 = 1.8 sec

This gives an adjusted bandwidth, ignoring the data generation time, of 444 Mbps.

This bandwidth of ~450 Mbps is

  • still in excess of my fibre broadband download speed (350 Mbps)
  • close to the half the maximum theoretical limit of a GiGE network connection (1000 Mbps)

This is still a fairly benign test, in that all the network traffic is internal to my machine.

Real world test (>=375Mbps)

  • Videos (20) source - Edinburgh, UK
  • Crossbar - London, UK
  • Receivers (>300) - Edinburgh, UK

Running on EC2 c5.(x)large in EU-london I ran a 'real use' test and managted to sustain 20 videos in and >300 videos out, and could not push further because of my home fibre bandwidth limits. Each video captured live from raspberry pi on my home network (20x) Individual bandwidth was 1.1Mbps per video, ~30fps 640x480 MPEGTS from ffmpeg for use jsmpeg decoder in browser Total bandwidth in from video cameras was 22-24Mbps (within the published limit of my home fibre of 30Mbps) Total bandwith out ~375Mbps (just over the published limit of my home fibre sysm of 350Mbps).

The peak bandwith for the testing was done with 500 video clients, but this hit the rate limit of my fibre to my house, and it is therefore possible that crossbar could do more ....

Longer term testing

The following result comes from earlier in the development, when the core performance-affecting code became relatively stable. Following four months running continuously (March 2020 - July 2020)) with multiple streams and re-connections of broadcasters and receivers, including a fortnight where a stream was connecting to a new routing every second (in a sequence of four) using a bash script and timdrysdale/vw, there as no appreciable leakage of memory. Here's a text-grab from top on an Amazon EC2 c5.Large :

top - 12:57:15 up 232 days, 22:45, 14 users,  load average: 0.04, 0.04, 0.00
Tasks: 123 total,   1 running,  83 sleeping,   0 stopped,   0 zombie
%Cpu(s):  2.0 us,  0.5 sy,  0.0 ni, 97.5 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  3794284 total,   486436 free,   691080 used,  2616768 buff/cache
KiB Swap:        0 total,        0 free,        0 used.  2831916 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND             
24363 root      20   0  252616  52060   4720 S   4.0  1.4   1019:36 iftop               
31302 ubuntu    20   0  930664  26504   8856 S   1.7  0.7   3852:37 crossbar            
 5917 www-data  20   0  145712  11396   7852 S   1.3  0.3 123:49.74 nginx               
23179 root      20   0  447328  42808  23328 S   0.7  1.1 292:00.12 containerd          
  816 jvb       20   0 5770988 135244   8936 S   0.3  3.6   1002:19 java                
23051 root      20   0  471992  76156  40564 S   0.3  2.0 181:01.84 dockerd             
    1 root      20   0  225484   9556   7068 S   0.0  0.3  12:45.70 systemd

Getting started

Binaries will be available in the future, but for now it is straightforward to compile.

It's quick and easy to create a working go system, following the advice here.

Get the code, test, and install

$ go get github.com/timdrysdale/crossbar
$ cd `go list -f '{{.Dir}}' github.com/timdrysdale/crossbar`
$ go test ./cmd
$ go install

To run the relay

$export CROSSBAR_LISTEN=127.0.0.1:9999
$ crossbar

Navigate to the stats page, e.g. http://localhost:9999/stats

You won't see any connections, yet.

You can connect using any of the other tools in the practable ecosystem, e.g. timdrysdale/vw or if you already have the useful websocat installed, then

websocat --text ws://127.0.0.1:8089/expt - 

If you type some messages, you will see the contents of the row change to reflect that you have sent messages.

Multiple clients are supported

If you connect a second or even third or more times from other terminals, you will see the hub relaying your messages to all other clients. Try typing in each of the terminals, and see that your message makes it to each of the others.

Streams are independent

Try setting up a pair of terminals that are using a different topic, and notice that messages do not pass from one topic to another.

e.g. connect from two terminals using

websocat --text ws://127.0.0.1:8089/sometopic - 

Messages sent in a terminal connected to <>/sometopic will only go to terminals connected to the same route, and not to any other terminal.

Here's a screenshot (note that websocat does a local echo so the sender can see their message; the echo is not from crossbar)

alt text

Usage examples

The default is bidirectional messaging, as you have seen in the example above.

Unidirectional messaging

If you only want to broadcast messages, such as a video stream, then it is nice to have some certainty that one of your clients won't inadvertently mess up the video for others by transmitting some sort of reply. To take advantage of uni-directional messaging, start the server's route with /in/ and the clients' routes with /out/. Note that the rest of the route has to match.

You can try it out yourself.

websocat --text ws://127.0.0.1:8089/in/demo - 

and

websocat --text ws://127.0.0.1:8089/out/demo - 

You can see from the local echo that messages attempted to be sent from the clients connected to /out/ are not sent to any other clients - this is enforced by the hub and does not need any special behaviour from the clients (beyond connecting to the right route). Protecting unauthorised users from connecting to the /in/ route is outside the scope of the crossbar codebase, in line with conventional practice on separating concerns.

alt text

Applications

Relaying video and data

Crossbar has been successfully relaying MPEG video and JSON data (on separate topics) for penduino-ui experiments using ffmpeg and timdrysdale/vw.

Relaying shell access

Shell relay is in a separate project because these have different goals, development schedules, and performance targets, even though some of the underlying code and approach is similar. See timdrysdale/shellbar.

Support / Contributions

Get in touch! My efforts are going into system development at present so support for alpha-stage users is by personal arrangement/consultancy at present. If you wish to contribute, or have development resource available to do so, I'd be more than delighted to discuss with you the possibilities.

Developer stuff

The swagger2.0 spec for the relay-access API can be used to auto-generate the boilerplate for the server code with the command

swagger generate server -t pkg/access -f ./api/openapi-spec/access.yml --exclude-main -A access

Some links to articles on swagger and authentication: jwt auth; context.

Documentation

Overview

Package crossbar provides a message broker with topics

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Crossbar

func Crossbar(config Config, closed <-chan struct{}, denied chan string, parentwg *sync.WaitGroup)

Crossbar creates and runs a new crossbar instance

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a middleperson between the websocket connection and the hub.

type ClientReport

type ClientReport struct {
	CanRead bool `json:"canRead"`

	CanWrite bool `json:"canWrite"`

	Connected string `json:"connected"`

	ExpiresAt string `json:"expiresAt"`

	RemoteAddr string `json:"remoteAddr"`

	Scopes []string `json:"scopes"`

	Stats RxTx `json:"stats"`

	Topic string `json:"topic"`

	UserAgent string `json:"userAgent"`
}

ClientReport represents information about a client's connection, permissions, and statistics

type Config

type Config struct {

	// Audience must match the host in token
	Audience string

	//BufferSize sets the buffer size for client communications channels
	BufferSize int64

	// ExchangeCode swaps a code for the associated Token
	CodeStore *ttlcode.CodeStore

	//DenyStore holds deny-listed bookingIDs
	DenyStore *deny.Store

	// Listen is the listening port
	Listen int

	// Secret is used to validating statsTokens
	Secret string

	//StatsEvery sets how often stats are reported
	StatsEvery time.Duration
}

Config represents configuration options for a crossbar instance Use this struct to pass configuration as argument during testing

func NewDefaultConfig

func NewDefaultConfig() *Config

NewDefaultConfig returns a pointer to a Config struct with default parameters

func (*Config) WithAudience

func (c *Config) WithAudience(audience string) *Config

WithAudience specificies the audience for the tokens

func (*Config) WithBufferSize added in v0.4.8

func (c *Config) WithBufferSize(n int64) *Config

WithBufferSize specifies the client channel buffer size

func (*Config) WithCodeStoreTTL

func (c *Config) WithCodeStoreTTL(ttl int64) *Config

WithCodeStoreTTL specifies the lifetime for the codestore

func (*Config) WithListen

func (c *Config) WithListen(listen int) *Config

WithListen specified which (int) port to listen on

func (*Config) WithStatsEvery added in v0.5.0

func (c *Config) WithStatsEvery(statsEvery time.Duration) *Config

WithStatsEvery specifies how often to send stats

type ConnectionType

type ConnectionType int

ConnectionType represents whether the connection is session, shell, or unsupported

const (
	Session ConnectionType = iota
	Shell
	Unsupported
)

Session, Shell and Unsupported are enumerated values of ConnectionType

type Frames

type Frames struct {
	// contains filtered or unexported fields
}

Frames represents statistics on (video) frames sent over a connection

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub maintains the set of active clients and broadcasts messages to the clients.

func (*Hub) SetDenyChannelStore added in v0.3.0

func (h *Hub) SetDenyChannelStore(dcs *chanmap.Store)

SetDenyChannelStore adds a pointer to the channel map store to the hub

type ReportStats

type ReportStats struct {
	Last string `json:"last"` //how many seconds ago...

	Size float64 `json:"size"`

	Fps float64 `json:"fps"`
}

ReportStats represents statistics about what has been sent/received

type RxTx

type RxTx struct {
	Tx ReportStats `json:"tx"`
	Rx ReportStats `json:"rx"`
}

RxTx represents statistics for both receive and transmit

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

Stats represents statistics for a connection

type StatsCommand

type StatsCommand struct {
	Command string `json:"cmd"`
}

StatsCommand represents a command in string form

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL