dispatcher

package
v0.1.18 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 2, 2023 License: Unlicense Imports: 22 Imported by: 0

Documentation

Overview

Package dispatcher is a network packet send/receive handler for peer-to-peer connections between relays.

Messages between peers are usually fairly large, multi-layered onion messages that carry forwarding instructions. On sending, the dispatcher breaks them into uniformly sized segments and randomises their order.

On the receiving side, incoming message segments are held in a buffer, and reconstruction is attempted once enough segments have arrived to make it possible.

Messages are split into pieces using Reed-Solomon encoding (accelerated by AVX), with extra parity segments added so that the receiver gets enough pieces to decode the message without requesting a retransmit.
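To show why redundant segments remove the need for a retransmit, here is a deliberately simpler stand-in for Reed-Solomon: a single XOR parity shard, which can rebuild exactly one lost shard. Reed-Solomon generalises this to many parity shards. The helper names are hypothetical and this is not the encoder the package uses.

```go
package main

import "fmt"

// xorParity returns one parity shard: the byte-wise XOR of all data shards.
// All shards must have the same length.
func xorParity(shards [][]byte) []byte {
	p := make([]byte, len(shards[0]))
	for _, s := range shards {
		for i, b := range s {
			p[i] ^= b
		}
	}
	return p
}

// recoverMissing rebuilds the single nil shard from the surviving shards
// and the parity shard.
func recoverMissing(shards [][]byte, parity []byte) []byte {
	out := append([]byte(nil), parity...)
	for _, s := range shards {
		if s == nil {
			continue
		}
		for i, b := range s {
			out[i] ^= b
		}
	}
	return out
}

func main() {
	shards := [][]byte{[]byte("onio"), []byte("n me"), []byte("ssag")}
	parity := xorParity(shards)
	shards[1] = nil // simulate a segment that never arrived
	fmt.Printf("%q\n", recoverMissing(shards, parity))
}
```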

The dispatcher operates over reliable TCP connections and does not directly influence retransmission. Instead, it monitors message latency, identifies when a retransmit has occurred, and increases the redundant data added to the stream of message packet segments.

In this way, the dispatcher aims to have sufficient data arrive in a single message cycle, so as to minimise the latency of connections between peers and, in turn, the latency of clients' routed packets.

Another feature of the dispatcher is key change processing. This is implemented as a concurrent update from the receiver that specifies a new public key to use. The receiver keeps the old key for a time to handle in-transit messages that were encrypted with it, and the peer should use the new key for all messages once it has received and stored it.

Constants

const (
	// DefaultStartingParity is 64 out of 256, or 25%.
	DefaultStartingParity = 64
	// DefaultDispatcherRekey is the byte count that triggers a rekey: 1 << 20, or 1 MiB.
	DefaultDispatcherRekey = 1 << 20
	TimeoutPingCount       = 10
)
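The arithmetic behind these constants can be checked directly. The sketch below reads the parity value as a fraction of 256, which is what "64, or 25%" implies; `parityShards` is a hypothetical helper and the rounding rule is an assumption.

```go
package main

import "fmt"

const (
	DefaultStartingParity  = 64      // 64/256 = 25%
	DefaultDispatcherRekey = 1 << 20 // 1048576, i.e. 1 MiB
)

// parityShards is a hypothetical helper: it treats parity as a fraction of
// 256 and rounds the number of redundant shards up.
func parityShards(dataShards, parity int) int {
	return (dataShards*parity + 255) / 256
}

func main() {
	fmt.Println(parityShards(16, DefaultStartingParity)) // 16 * 64/256 = 4
	fmt.Println(DefaultDispatcherRekey)                  // 1048576
}
```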
const (
	NewKeyMagic      = "newK"
	AcknowledgeMagic = "ackn"
	OnionMagic       = "onio"
)
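Four-character magic strings like these are typically used to tell message types apart. As a rough sketch, assuming the magic is the first four bytes of an encoded message (the page does not state this), a receiver could classify messages as follows; `classify` is a hypothetical helper.

```go
package main

import "fmt"

const (
	NewKeyMagic      = "newK"
	AcknowledgeMagic = "ackn"
	OnionMagic       = "onio"
)

// classify inspects an assumed 4-byte prefix and names the message type.
func classify(msg []byte) string {
	if len(msg) < 4 {
		return "too short"
	}
	switch string(msg[:4]) {
	case NewKeyMagic:
		return "key change"
	case AcknowledgeMagic:
		return "acknowledgement"
	case OnionMagic:
		return "onion"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify([]byte("ackn...payload...")))
	fmt.Println(classify([]byte("xyz")))
}
```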

Variables

This section is empty.

Functions

func AcknowledgeGen

func AcknowledgeGen() coding.Codec

func InitRekeyGen

func InitRekeyGen() coding.Codec

func OnionGen

func OnionGen() coding.Codec

Types

type Acknowledge

type Acknowledge struct {
	*RxRecord
}

Acknowledge wraps up an RxRecord to tell the other side how a message transmission went.

func (*Acknowledge) Decode

func (a *Acknowledge) Decode(s *splice.Splice) (e error)

func (*Acknowledge) Encode

func (a *Acknowledge) Encode(s *splice.Splice) (e error)

func (*Acknowledge) GetOnion

func (a *Acknowledge) GetOnion() interface{}

func (*Acknowledge) Len

func (a *Acknowledge) Len() int

func (*Acknowledge) Magic

func (a *Acknowledge) Magic() string

type Completion

type Completion struct {
	ID   nonce.ID
	Time time.Time
}

type Dispatcher

type Dispatcher struct {
	// Parity is the parity parameter to use in packet segments, this value
	// should be adjusted up and down in proportion with collected data on how
	// many packets led to receipt against the ratio of DataSent / TotalSent *
	// PingDivergence.
	Parity atomic.Uint32
	// DataSent is the amount of actual data sent in messages.
	DataSent *big.Int
	// DataReceived is the amount of actual data received in messages.
	DataReceived *big.Int
	// TotalSent is the amount of bytes that were needed to successfully
	// transmit, including packet overhead. This is the raw size of the
	// segmented packets that were sent.
	TotalSent *big.Int
	// TotalReceived is the amount of bytes that were needed to successfully
	// transmit, including packet overhead. This is the raw size of the
	// segmented packets that were received.
	TotalReceived *big.Int
	ErrorEWMA     ewma.MovingAverage
	Ping          ewma.MovingAverage
	// PingDivergence represents the proportion of time between start of send
	// and receiving acknowledgement, versus the ping RTT being actively
	// measured concurrently. Shorter/equal time means it can reduce redundancy,
	// longer time needs to increase it.
	//
	// Combined with DataSent / TotalSent this guides the error correction
	// parameter for a given transmission that minimises latency. Onion routing
	// necessarily amplifies any latency so making a transmission get across
	// before/without retransmits is as good as the path can provide.
	PingDivergence  ewma.MovingAverage
	Duplex          *transport.DuplexByteChan
	Done            []Completion
	PendingInbound  []*RxRecord
	PendingOutbound []*TxRecord
	Partials        map[nonce.ID]packet.Packets
	Prv             []*crypto.Prv
	KeyLock         sync.Mutex

	Conn  *transport.Conn
	Mutex sync.Mutex
	Ready qu.C
	// contains filtered or unexported fields
}

Dispatcher is a message splitter/joiner and error correction adjustment system that aims to minimise message latency by spending extra bandwidth, especially to cope with radio connections.

The initial implementation by necessity uses reliable network transports, so packet retransmits increase message transit time. A transit time longer than the ping therefore indicates packet transmit failures.

PingDivergence is adjusted with each acknowledgement by comparing the message transit time to the current ping. A transit time within range of the ping RTT does not affect the adjustment.

DataSent / TotalSent provides the ratio of redundancy the channel is using. TotalSent is not from the parameters at send but from acknowledgements of how much data was received before a message was reconstructed. Thus, it is used in combination with the PingDivergence to recompute the Parity parameter used for adjusting error correction redundancy as each message is decoded.

func NewDispatcher

func NewDispatcher(l *transport.Conn, ctx context.Context,
	ks *crypto.KeySet) (d *Dispatcher)

NewDispatcher initialises and starts up a Dispatcher with the provided connection, acquired by dialing a peer or accepting an inbound connection from one.

func (*Dispatcher) GetRxRecordAndPartials

func (d *Dispatcher) GetRxRecordAndPartials(id nonce.ID) (rxr *RxRecord,
	packets packet.Packets)

GetRxRecordAndPartials returns the receive record and the packets received so far for a message with a given ID.

func (*Dispatcher) Handle

func (d *Dispatcher) Handle(m slice.Bytes, rxr *RxRecord)

Handle the message. This is expected to be called with the mutex locked, so nothing in it should be trying to lock it.

func (*Dispatcher) HandlePing

func (d *Dispatcher) HandlePing(p ping.Result)

HandlePing adds a current ping result and combines it into the running exponential weighted moving average.

func (*Dispatcher) Mx

func (d *Dispatcher) Mx(fn func() bool) bool

Mx runs a closure with the dispatcher mutex locked. The closure returns a bool, which Mx passes through as its own result.
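The pattern behind Mx is a common one: take the lock, run the caller's closure, release the lock, and hand back the closure's result. A minimal sketch on a hypothetical `guarded` type with the same method shape:

```go
package main

import (
	"fmt"
	"sync"
)

// guarded is a hypothetical type standing in for the Dispatcher.
type guarded struct {
	mu      sync.Mutex
	pending []int
}

// Mx runs fn with the mutex held and returns whatever fn returns.
func (g *guarded) Mx(fn func() bool) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return fn()
}

func main() {
	g := &guarded{}
	wasEmpty := g.Mx(func() bool {
		empty := len(g.pending) == 0
		g.pending = append(g.pending, 42)
		return empty
	})
	fmt.Println(wasEmpty, len(g.pending))
}
```

Since `sync.Mutex` is not reentrant, code that already runs under the lock (such as Handle, per its documentation) must not call Mx again, or it will deadlock.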

func (*Dispatcher) ReKey

func (d *Dispatcher) ReKey()

ReKey sends a new key for the other party to use for sending future messages.

func (*Dispatcher) RecvFromConn

func (d *Dispatcher) RecvFromConn(m slice.Bytes)

RecvFromConn receives a new message from the connection, checks if it can be reassembled and if it can, dispatches it to the receiver channel.

func (*Dispatcher) RunGC

func (d *Dispatcher) RunGC()

RunGC runs the garbage collection for the dispatcher. Stale data and completed transmissions are purged from memory.

func (*Dispatcher) SendAck

func (d *Dispatcher) SendAck(rxr *RxRecord)

SendAck sends an acknowledgement record for a successful transmission of a message.

func (*Dispatcher) SendToConn

func (d *Dispatcher) SendToConn(m slice.Bytes) (pieces int)

SendToConn delivers a buffer to be sent over the connection, and returns the number of packets that were sent.

type NewKey

type NewKey struct {
	NewPubkey *crypto.Pub
}

NewKey delivers a new public key for the other side to use to encrypt messages.

func (*NewKey) Decode

func (k *NewKey) Decode(s *splice.Splice) (e error)

func (*NewKey) Encode

func (k *NewKey) Encode(s *splice.Splice) (e error)

func (*NewKey) GetOnion

func (k *NewKey) GetOnion() interface{}

func (*NewKey) Len

func (k *NewKey) Len() int

func (*NewKey) Magic

func (k *NewKey) Magic() string

type Onion

type Onion struct {
	slice.Bytes // contains an encoded Onion.
}

func (*Onion) Decode

func (m *Onion) Decode(s *splice.Splice) (e error)

func (*Onion) Encode

func (m *Onion) Encode(s *splice.Splice) (e error)

func (*Onion) GetOnion

func (m *Onion) GetOnion() interface{}

func (*Onion) Len

func (m *Onion) Len() int

func (*Onion) Magic

func (m *Onion) Magic() string

func (Onion) Unpack

func (m Onion) Unpack() (mu ont.Onion)

type RxRecord

type RxRecord struct {
	ID nonce.ID
	// Hash is the hash of the reconstructed message received.
	Hash sha256.Hash
	// First is when the first packet was received.
	First time.Time
	// Last is when the last packet was received. A longer time than the current
	// ping RTT after First indicates retransmits.
	Last time.Time
	// Size of the message as found in the packet headers.
	Size uint64
	// Received is the number of bytes received upon reconstruction, including
	// packet overhead.
	Received uint64
	// Ping is the average ping RTT on the connection calculated at each packet
	// receive, used with the total message transmit time to estimate an
	// adjustment in the parity shards to be used in sending on this connection.
	Ping time.Duration
}

RxRecord holds the details of a message reception and forms most of the data sent in an acknowledgement message.

type TxRecord

type TxRecord struct {
	ID nonce.ID
	// Hash is the record of the hash of the original message.
	sha256.Hash
	// First is the time the first piece was sent.
	First time.Time
	// Last is the time the last piece was sent.
	Last time.Time
	// Size is the number of bytes in the message payload.
	Size int
	// Ping is the recorded average current round trip time at send.
	Ping time.Duration
}

TxRecord holds the details of a send operation in progress. It is used together with the data received in the acknowledgement, which is a completed RxRecord.
