Documentation ¶
Overview ¶
Package dispatcher is a network packet send/receive handler for peer-to-peer connections between relays.
Messages between peers are usually fairly large, multi-layered onion messages that contain forwarding instructions. To send one, the dispatcher breaks it down into uniformly sized segments and randomises their order.
On the receiving side, there is a buffer for incoming message segments, and when sufficient segments are received to enable reconstruction, reconstruction is attempted.
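The split, shuffle and reassemble cycle described above can be sketched as follows. This is only an illustration under stated assumptions: the `segment` type and the `split` and `join` helpers are hypothetical names, not part of this package, and the real packet format carries more header data than a bare index.

```go
package main

import (
	"bytes"
	"fmt"
	"math/rand"
	"sort"
)

// segment is a hypothetical fixed-size piece of a message, tagged with its
// position so the receiver can restore the original order.
type segment struct {
	Index int
	Data  []byte
}

// split cuts msg into segments of exactly size bytes, zero-padding the last.
func split(msg []byte, size int) []segment {
	var segs []segment
	for i := 0; i < len(msg); i += size {
		piece := make([]byte, size)
		copy(piece, msg[i:])
		segs = append(segs, segment{Index: len(segs), Data: piece})
	}
	return segs
}

// join sorts a copy of segs by index, concatenates the data and trims the
// padding back to msgLen bytes.
func join(segs []segment, msgLen int) []byte {
	sorted := append([]segment(nil), segs...)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a].Index < sorted[b].Index })
	var out []byte
	for _, s := range sorted {
		out = append(out, s.Data...)
	}
	return out[:msgLen]
}

func main() {
	msg := []byte("a multi-layered onion message")
	segs := split(msg, 8)
	// Randomise the order in which segments would be sent.
	rand.Shuffle(len(segs), func(i, j int) { segs[i], segs[j] = segs[j], segs[i] })
	fmt.Println(bytes.Equal(join(segs, len(msg)), msg))
}
```

The sketch passes the original message length to `join` directly; in a real protocol that length would travel in the packet headers.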
Messages are broken up into pieces using Reed-Solomon encoding (accelerated by AVX), with additional parity segments added so that the receiver gets enough pieces to decode the message without requesting a retransmit.
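To illustrate the idea of redundant segments without pulling in a Reed-Solomon library, the sketch below uses a single XOR parity shard instead. This is a deliberately simpler technique than the one the package uses: it can rebuild only one lost shard, whereas Reed-Solomon tolerates as many losses as there are parity shards. The function names are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
)

// xorParity returns one parity shard: the byte-wise XOR of all data shards.
// All shards must have the same length.
func xorParity(shards [][]byte) []byte {
	parity := make([]byte, len(shards[0]))
	for _, s := range shards {
		for i, b := range s {
			parity[i] ^= b
		}
	}
	return parity
}

// recoverShard rebuilds the single missing shard (marked nil) by XORing the
// parity shard with every shard that did arrive.
func recoverShard(shards [][]byte, parity []byte) []byte {
	missing := append([]byte(nil), parity...)
	for _, s := range shards {
		if s == nil {
			continue
		}
		for i, b := range s {
			missing[i] ^= b
		}
	}
	return missing
}

func main() {
	data := [][]byte{[]byte("onio"), []byte("n ms"), []byte("g..!")}
	parity := xorParity(data)
	lost := data[1]
	data[1] = nil // simulate a segment that never arrived
	fmt.Println(bytes.Equal(recoverShard(data, parity), lost))
}
```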
The dispatcher operates over reliable TCP connections and does not directly influence retransmission. Instead, it monitors message latency, identifies when a retransmit has occurred, and increases the redundant data added to the stream of message packet segments.
In this way, the dispatcher aims to always have sufficient data arrive in one message cycle, so as to minimise the latency of connections between peers and, in turn, the latency of clients' routed packets.
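A minimal sketch of this feedback loop, assuming a transit time more than 1.5 times the ping RTT signals a retransmit. The threshold, the step size, the bounds and the `adjustParity` name are all assumptions made for illustration; the package's actual adjustment rule is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// adjustParity is a hypothetical rule: raise parity when a message took
// noticeably longer than a ping round trip (suggesting a TCP retransmit),
// otherwise lower it gradually.
func adjustParity(parity uint32, transit, rtt time.Duration) uint32 {
	const (
		step      = 8   // assumed adjustment step
		minParity = 8   // assumed lower bound
		maxParity = 255 // assumed upper bound
	)
	if transit > rtt+rtt/2 {
		if parity+step > maxParity {
			return maxParity
		}
		return parity + step
	}
	if parity < minParity+step {
		return minParity
	}
	return parity - step
}

func main() {
	rtt := 100 * time.Millisecond
	fmt.Println(adjustParity(64, 300*time.Millisecond, rtt)) // slow transit: parity goes up
	fmt.Println(adjustParity(64, 100*time.Millisecond, rtt)) // on-time transit: parity goes down
}
```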
Another feature of the dispatcher is key change processing. This is implemented as a concurrent update from the receiver specifying a new public key to use. The receiver keeps the old key for a time to handle in-transit messages that were encrypted with it, and the peer should use the new key for all messages it sends after it has received and applied the update.
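The idea of keeping old keys around for late messages can be sketched as a small bounded key ring. Everything here is hypothetical: keys are plain strings standing in for private keys, `try` stands in for a decryption attempt, and locking (which the Dispatcher handles with KeyLock) is omitted.

```go
package main

import "fmt"

// keyRing is a hypothetical bounded list of keys, newest first.
type keyRing struct {
	keys []string
	max  int
}

// rotate adds a new key at the front and drops the oldest beyond max.
func (k *keyRing) rotate(newKey string) {
	k.keys = append([]string{newKey}, k.keys...)
	if len(k.keys) > k.max {
		k.keys = k.keys[:k.max]
	}
}

// open tries each retained key, newest first, and reports which one worked.
func (k *keyRing) open(try func(key string) bool) (string, bool) {
	for _, key := range k.keys {
		if try(key) {
			return key, true
		}
	}
	return "", false
}

func main() {
	kr := &keyRing{max: 2}
	kr.rotate("key-1")
	kr.rotate("key-2")
	// A message encrypted with the previous key can still be opened.
	key, ok := kr.open(func(k string) bool { return k == "key-1" })
	fmt.Println(key, ok)
}
```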
Index ¶
- Constants
- func AcknowledgeGen() codec.Codec
- func InitRekeyGen() codec.Codec
- func OnionGen() codec.Codec
- type Acknowledge
- type Completion
- type Dispatcher
- func (d *Dispatcher) GetRxRecordAndPartials(id nonce.ID) (rxr *RxRecord, packets packet.Packets)
- func (d *Dispatcher) Handle(m slice.Bytes, rxr *RxRecord)
- func (d *Dispatcher) HandlePing(p ping.Result)
- func (d *Dispatcher) Mx(fn func() bool) bool
- func (d *Dispatcher) ReKey()
- func (d *Dispatcher) RecvFromConn(m slice.Bytes)
- func (d *Dispatcher) RunGC()
- func (d *Dispatcher) SendAck(rxr *RxRecord)
- func (d *Dispatcher) SendToConn(m slice.Bytes) (pieces int)
- type NewKey
- type Onion
- type RxRecord
- type TxRecord
Constants ¶
const (
	// DefaultStartingParity is set to 64, or 25%.
	DefaultStartingParity = 64
	// DefaultDispatcherRekey is the amount of data that triggers a rekey:
	// 1 << 20, which is 1 MiB if the unit is bytes.
	DefaultDispatcherRekey = 1 << 20
	// TimeoutPingCount defines the number of pings that must fail to be sure
	// the peer is offline.
	TimeoutPingCount = 10
)
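The "64, or 25%" note suggests the parity value is read as a fraction of 256. Under that assumption, the number of parity shards for a given number of data shards could be worked out as below. The `parityShards` helper and the rounding-up choice are hypothetical, not taken from the package.

```go
package main

import "fmt"

// parityShards assumes parity is a fraction of 256 and rounds up, so any
// non-zero parity on a non-empty message yields at least one parity shard.
func parityShards(dataShards int, parity uint32) int {
	return (dataShards*int(parity) + 255) / 256
}

func main() {
	// With the default starting parity of 64, 16 data shards get 4 parity shards.
	fmt.Println(parityShards(16, 64))
}
```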
const (
	NewKeyMagic      = "newK"
	AcknowledgeMagic = "ackn"
	OnionMagic       = "onio"
)
Magic bytes message prefixes for Dispatcher messages.
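As an illustration of how a 4 byte magic prefix can identify a message type, the sketch below inspects the prefix and returns the remaining bytes, which is where a decoder's cursor would then sit. The `classify` function is hypothetical; the package itself resolves magics through a codec registry.

```go
package main

import "fmt"

const (
	NewKeyMagic      = "newK"
	AcknowledgeMagic = "ackn"
	OnionMagic       = "onio"
)

// classify is a hypothetical helper that names the message type from its
// magic prefix and returns the payload that follows the magic.
func classify(b []byte) (kind string, rest []byte, ok bool) {
	if len(b) < 4 {
		return "", nil, false
	}
	switch string(b[:4]) {
	case NewKeyMagic:
		kind = "NewKey"
	case AcknowledgeMagic:
		kind = "Acknowledge"
	case OnionMagic:
		kind = "Onion"
	default:
		return "", nil, false
	}
	return kind, b[4:], true
}

func main() {
	kind, rest, ok := classify([]byte("onio\x01\x02\x03"))
	fmt.Println(kind, len(rest), ok)
}
```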
Variables ¶
This section is empty.
Functions ¶
func AcknowledgeGen ¶
AcknowledgeGen is a factory function that will be added to the registry for recognition and generation.
func InitRekeyGen ¶
InitRekeyGen is a factory function to generate a NewKey.
Types ¶
type Acknowledge ¶
type Acknowledge struct {
*RxRecord
}
Acknowledge wraps up an RxRecord to tell the other side how a message transmission went.
func (*Acknowledge) Decode ¶
func (a *Acknowledge) Decode(s *splice.Splice) (e error)
Decode a splice with the cursor at the first byte after the magic.
func (*Acknowledge) Encode ¶
func (a *Acknowledge) Encode(s *splice.Splice) (e error)
Encode an Acknowledge message to a splice.
func (*Acknowledge) Len ¶
func (a *Acknowledge) Len() int
Len returns the length of an Acknowledge message in bytes.
func (*Acknowledge) Magic ¶
func (a *Acknowledge) Magic() string
Magic is the identifying 4 byte prefix of an Acknowledge in binary form.
func (*Acknowledge) Unwrap ¶ added in v0.1.19
func (a *Acknowledge) Unwrap() interface{}
Unwrap returns nil because there is no onion inside an Acknowledge.
type Completion ¶
Completion is a record of a completed transmission, identified by its nonce.ID.
type Dispatcher ¶
type Dispatcher struct {
	// Parity is the parity parameter to use in packet segments, this value
	// should be adjusted up and down in proportion with collected data on how
	// many packets led to receipt against the ratio of DataSent / TotalSent *
	// PingDivergence.
	Parity atomic.Uint32
	// DataSent is the amount of actual data sent in messages.
	DataSent *big.Int
	// DataReceived is the amount of actual data received in messages.
	DataReceived *big.Int
	// TotalSent is the amount of bytes that were needed to successfully
	// transmit, including packet overhead. This is the raw size of the
	// segmented packets that were sent.
	TotalSent *big.Int
	// TotalReceived is the amount of bytes that were needed to successfully
	// transmit, including packet overhead. This is the raw size of the
	// segmented packets that were received.
	TotalReceived *big.Int
	// ErrorEWMA is the exponential weighted moving average of the error rate in
	// transmissions to a peer.
	ErrorEWMA ewma.MovingAverage
	// Ping records the exponential weighted moving average of the round trip
	// time to a peer.
	Ping ewma.MovingAverage
	// PingDivergence represents the proportion of time between start of send
	// and receiving acknowledgement, versus the ping RTT being actively
	// measured concurrently. Shorter/equal time means it can reduce redundancy,
	// longer time needs to increase it.
	//
	// Combined with DataSent / TotalSent this guides the error correction
	// parameter for a given transmission that minimises latency. Onion routing
	// necessarily amplifies any latency so making a transmission get across
	// before/without retransmits is as good as the path can provide.
	PingDivergence ewma.MovingAverage
	// Duplex is the in-memory atomic FIFO that is used in process to read and
	// write to the network data processing pipeline.
	Duplex *transport.DuplexByteChan
	// Done stores the completed transmissions and their completion time. Used to
	// evaluate the quality of the connection via relative time delays.
	Done []Completion
	// PendingInbound stores records for transmissions that are in process and
	// have neither failed nor had all pieces received yet.
	PendingInbound []*RxRecord
	// PendingOutbound keeps track of all the messages dispatched to the peer but
	// not yet acknowledged or timed out.
	PendingOutbound []*TxRecord
	// Partials stores the received message segments identified by the
	// transmission nonce.ID.
	Partials map[nonce.ID]packet.Packets
	// Prv is the list of keys used in this connection, so that a transmission
	// delayed for an extraordinarily long time can still be decrypted. GC should
	// be done on these to keep no more than a dozen or so past keys.
	Prv []*crypto.Prv
	// KeyLock is a mutex specifically for accessing the Prv field above.
	KeyLock sync.Mutex
	// Conn is the transport connection that messages to this peer are sent to
	// and received from.
	Conn *transport.Conn
	// Mutex lock for (todo: maybe we don't need so many of these?)
	Mutex sync.Mutex
	// Ready is a signal channel that is closed when the dispatcher is
	// operational.
	Ready qu.C
	// contains filtered or unexported fields
}
Dispatcher is a message splitter/joiner and error correction adjustment system that aims to minimise message latency by spending extra bandwidth on redundancy, especially to cope with radio connections.
Each connection has a dispatcher for handling messages; all relevant values in the structure relate to the connection to a single peer.
In its initial implementation, reliable network transports are used by necessity. Packet retransmits therefore increase the message transit time, so a transit time longer than the ping indicates packet transmit failures.
PingDivergence is updated on each acknowledgement by comparing the message transit time with the current ping. A transit time within range of the ping RTT does not affect the adjustment.
DataSent / TotalSent provides the ratio of redundancy the channel is using. TotalSent is not taken from the parameters at send time but from acknowledgements reporting how much data was received before a message was reconstructed. Thus, it is used in combination with PingDivergence to recompute the Parity parameter used for adjusting error correction redundancy as each message is decoded.
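Because DataSent and TotalSent are `*big.Int` counters, their ratio has to be computed with `math/big` rather than plain division. A minimal sketch, assuming a zero TotalSent should yield a ratio of 0; the `payloadRatio` and `ratioOf` helper names are hypothetical.

```go
package main

import (
	"fmt"
	"math/big"
)

// payloadRatio returns dataSent / totalSent as a float64, or 0 when nothing
// has been sent yet. A value near 1 means little redundancy was needed.
func payloadRatio(dataSent, totalSent *big.Int) float64 {
	if totalSent.Sign() == 0 {
		return 0
	}
	f, _ := new(big.Rat).SetFrac(dataSent, totalSent).Float64()
	return f
}

// ratioOf is a convenience wrapper for plain integer counters.
func ratioOf(data, total int64) float64 {
	return payloadRatio(big.NewInt(data), big.NewInt(total))
}

func main() {
	// 750 payload bytes delivered using 1000 bytes on the wire.
	fmt.Println(ratioOf(750, 1000))
}
```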
func NewDispatcher ¶
NewDispatcher initialises and starts up a Dispatcher with the provided connection, acquired by dialing or by accepting an inbound connection from a peer.
func (*Dispatcher) GetRxRecordAndPartials ¶
GetRxRecordAndPartials returns the receive record and packets received so far for a message with a given nonce.ID.
func (*Dispatcher) Handle ¶
func (d *Dispatcher) Handle(m slice.Bytes, rxr *RxRecord)
Handle the message. This is expected to be called with the mutex locked, so nothing in it should be trying to lock it.
func (*Dispatcher) HandlePing ¶
func (d *Dispatcher) HandlePing(p ping.Result)
HandlePing adds a current ping result and combines it into the running exponential weighted moving average.
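For readers unfamiliar with the technique, an exponential weighted moving average can be sketched in a few lines. This is a generic illustration with a hypothetical `movingAverage` type and an assumed smoothing factor, not the `ewma.MovingAverage` implementation the Dispatcher uses.

```go
package main

import "fmt"

// movingAverage is a hypothetical EWMA: each new sample is blended with the
// running value, weighted by alpha (0 < alpha <= 1).
type movingAverage struct {
	alpha  float64
	value  float64
	primed bool
}

// Add folds a sample into the average; the first sample sets it directly.
func (m *movingAverage) Add(sample float64) {
	if !m.primed {
		m.value, m.primed = sample, true
		return
	}
	m.value = m.alpha*sample + (1-m.alpha)*m.value
}

func main() {
	ping := &movingAverage{alpha: 0.5}
	for _, rttMillis := range []float64{100, 200, 100} {
		ping.Add(rttMillis)
	}
	fmt.Println(ping.value)
}
```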
func (*Dispatcher) Mx ¶
func (d *Dispatcher) Mx(fn func() bool) bool
Mx runs a closure with the dispatcher mutex locked and passes the closure's bool result through as its own return value. Don't call anything that touches the dispatcher's Mutex in this closure.
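The pattern can be sketched on a stand-in type as follows. The `guarded` struct is hypothetical; only the shape of the `Mx` method mirrors the signature above. Since `sync.Mutex` is not reentrant, locking it again inside the closure would deadlock, which is the reason for the warning.

```go
package main

import (
	"fmt"
	"sync"
)

// guarded is a hypothetical stand-in for a struct protected by one mutex.
type guarded struct {
	mu      sync.Mutex
	pending int
}

// Mx runs fn while holding the mutex and returns fn's result.
func (g *guarded) Mx(fn func() bool) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return fn()
}

func main() {
	g := &guarded{}
	// Mutate and inspect protected state in a single critical section.
	wasEmpty := g.Mx(func() bool {
		empty := g.pending == 0
		g.pending++
		return empty
	})
	fmt.Println(wasEmpty, g.pending)
}
```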
func (*Dispatcher) ReKey ¶
func (d *Dispatcher) ReKey()
ReKey sends a new key for the other party to use for sending future messages.
func (*Dispatcher) RecvFromConn ¶
func (d *Dispatcher) RecvFromConn(m slice.Bytes)
RecvFromConn receives a new message from the connection, checks if it can be reassembled and if it can, dispatches it to the receiver channel.
func (*Dispatcher) RunGC ¶
func (d *Dispatcher) RunGC()
RunGC runs the garbage collection for the dispatcher. Stale data and completed transmissions are purged from memory.
func (*Dispatcher) SendAck ¶
func (d *Dispatcher) SendAck(rxr *RxRecord)
SendAck sends an acknowledgement record for a successful transmission of a message.
func (*Dispatcher) SendToConn ¶
func (d *Dispatcher) SendToConn(m slice.Bytes) (pieces int)
SendToConn delivers a buffer to be sent over the connection, and returns the number of packets that were sent.
type NewKey ¶
NewKey delivers a new public key for the other side to use to encrypt messages.
func (*NewKey) Decode ¶
Decode a NewKey out of a splice with cursor pointing to the first byte after the magic.
type Onion ¶
Onion is an onion message intended for the recipient, which decodes its layer and then receives and processes the enclosed message appropriately.
func (*Onion) Decode ¶
Decode an Onion out of a splice with cursor pointing to the first byte after the magic.
type RxRecord ¶
type RxRecord struct {
	ID nonce.ID
	// Hash is the hash of the reconstructed message received.
	Hash sha256.Hash
	// First is when the first packet was received.
	First time.Time
	// Last is when the last packet was received. A longer time than the current
	// ping RTT after First indicates retransmits.
	Last time.Time
	// Size of the message as found in the packet headers.
	Size uint64
	// Received is the number of bytes received upon reconstruction, including
	// packet overhead.
	Received uint64
	// Ping is the average ping RTT on the connection calculated at each packet
	// receive, used with the total message transmit time to estimate an
	// adjustment in the parity shards to be used in sending on this connection.
	Ping time.Duration
}
RxRecord holds the details of a message reception and forms most of the data sent in an acknowledgement message.
type TxRecord ¶
type TxRecord struct {
	ID nonce.ID
	// Hash is the record of the hash of the original message.
	sha256.Hash
	// First is the time the first piece was sent.
	First time.Time
	// Last is the time the last piece was sent.
	Last time.Time
	// Size is the number of bytes in the message payload.
	Size int
	// Ping is the recorded average current round trip time at send.
	Ping time.Duration
}
TxRecord holds the details of a send operation in progress. It is used together with the data received in the acknowledgement, which is a completed RxRecord.