Documentation ¶
Overview ¶
Package socket implements a signed, unencrypted TCP socket.
Index ¶
- Constants
- Variables
- func AssembleCommittee[T TokenComparer](ctx context.Context, peers []TokenAddr, connected []T, ...) chan []T
- func ConnectToAll(ctx context.Context, peers []TokenAddr, connected []*SignedConnection, ...) chan []*SignedConnection
- func CreateConnectionPair(node string, port int) (*SignedConnection, *SignedConnection)
- func Listen(address string) (net.Listener, error)
- type AcceptValidConnections
- type Aggregator
- func (b *Aggregator) AddNewOne(peers []TokenAddr) (*SignedConnection, error)
- func (b *Aggregator) AddOne(peers []TokenAddr) (*SignedConnection, error)
- func (b *Aggregator) AddProvider(provider TokenAddr) (*SignedConnection, error)
- func (b *Aggregator) CloseProvider(provider crypto.Token)
- func (b *Aggregator) Has(peer TokenAddr) bool
- func (b *Aggregator) HasAny(peers []TokenAddr) bool
- func (b *Aggregator) Read() ([]byte, error)
- func (b *Aggregator) Shutdown()
- type BufferedChannel
- func (b *BufferedChannel) Is(token crypto.Token) bool
- func (b *BufferedChannel) Len() int
- func (b *BufferedChannel) Read() []byte
- func (b *BufferedChannel) ReadSide() []byte
- func (b *BufferedChannel) Send(data []byte)
- func (b *BufferedChannel) SendSide(data []byte)
- func (b *BufferedChannel) Shutdown()
- type BufferedMultiChannel
- func (b *BufferedMultiChannel) Is(token crypto.Token) bool
- func (b *BufferedMultiChannel) Read(epoch uint64) []byte
- func (b *BufferedMultiChannel) ReadSide() []byte
- func (b *BufferedMultiChannel) Release(epoch uint64)
- func (b *BufferedMultiChannel) Send(epoch uint64, data []byte)
- func (b *BufferedMultiChannel) SendSide(data []byte)
- func (b *BufferedMultiChannel) Shutdown()
- type CachedConnection
- type ChannelConnection
- func (c *ChannelConnection) Activate()
- func (c *ChannelConnection) Is(token crypto.Token) bool
- func (c *ChannelConnection) Read(epoch uint64) []byte
- func (c *ChannelConnection) Register(epoch uint64, signal chan []byte) bool
- func (c *ChannelConnection) Release(epoch uint64)
- func (c *ChannelConnection) Send(msg []byte)
- func (c *ChannelConnection) Shutdown()
- func (c *ChannelConnection) Sleep()
- type ConnectionPool
- type Gossip
- type GossipMessage
- type Message
- type PercolationPool
- type PercolationRule
- type SignedConnection
- func Dial(hostname, address string, credentials crypto.PrivateKey, token crypto.Token) (*SignedConnection, error)
- func DialCtx(ctx context.Context, hostname, address string, credentials crypto.PrivateKey, ...) (*SignedConnection, error)
- func DialTCP(laddr, raddr *net.TCPAddr, credentials crypto.PrivateKey, token crypto.Token) (*SignedConnection, error)
- func PromoteConnection(conn net.Conn, prvKey crypto.PrivateKey, validator ValidateConnection) (*SignedConnection, error)
- type TokenAddr
- type TokenComparer
- type TrustedAggregator
- type ValidateConnection
- type ValidateSingleConnection
Constants ¶
const CommitteeRetries = 5 // number of retries to connect to a peer before giving up
const (
PingPongInterval = time.Second
)
Variables ¶
var AcceptAllConnections = acceptAll{}
AcceptAllConnections is an implementation of the ValidateConnection interface that accepts all requested connections.
var CommitteeRetryDelay = time.Second // should wait for this period before retrying
var ErrInvalidSignature = errors.New("signature is invalid")
var ErrMessageTooLarge = errors.New("message size cannot be larger than 65.536 bytes")
var TCPNetworkTest = &testNetwork{
	hosts:     make(map[string]*testHost),
	ctx:       context.Background(),
	listeners: make(map[string]*fakePort),
	live:      make([]net.Conn, 0),
}
TCPNetworkTest is the global test network
Functions ¶
func AssembleCommittee ¶
func AssembleCommittee[T TokenComparer](ctx context.Context, peers []TokenAddr, connected []T, NewT func(*SignedConnection) T, credentials crypto.PrivateKey, port int, hostname string) chan []T
AssembleCommittee assembles a committee of nodes. It returns a channel that delivers the slice of connections that could be established. The caller is responsible for deciding whether the resulting pool is acceptable. peers is the list of peers expected in the committee. connected is the list of live connections. NewT is a function that creates a new T object from a signed connection. credentials is the private key of the node. port is the port to listen on for new connections (other nodes will try to assemble the pool at the same time). hostname is "localhost" or "" for internet connections, and anything else for testing.
func ConnectToAll ¶
func ConnectToAll(ctx context.Context, peers []TokenAddr, connected []*SignedConnection, credentials crypto.PrivateKey, port int, hostname string) chan []*SignedConnection
func CreateConnectionPair ¶
func CreateConnectionPair(node string, port int) (*SignedConnection, *SignedConnection)
Types ¶
type AcceptValidConnections ¶
type AcceptValidConnections struct {
// contains filtered or unexported fields
}
AcceptValidConnections is an implementation of the ValidateConnection interface that accepts only connections from a list of tokens.
func NewValidConnections ¶
func NewValidConnections(conn []crypto.Token, open bool) *AcceptValidConnections
NewValidConnections returns a new AcceptValidConnections with the given list of tokens.
func (*AcceptValidConnections) Add ¶
func (a *AcceptValidConnections) Add(token crypto.Token)
Add adds a token to the list of valid tokens.
func (*AcceptValidConnections) Remove ¶
func (a *AcceptValidConnections) Remove(token crypto.Token)
Remove removes a token from the list of valid tokens.
func (*AcceptValidConnections) String ¶
func (a *AcceptValidConnections) String() string
func (*AcceptValidConnections) ValidateConnection ¶
func (a *AcceptValidConnections) ValidateConnection(token crypto.Token) chan bool
ValidateConnection returns a channel that yields true if the given token is in the list of valid tokens and false otherwise.
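The validator contract described above can be illustrated with a minimal, self-contained sketch. It does not use this package: Token, Validator, allowList and newAllowList are hypothetical stand-ins that only mirror the documented shape (a method returning chan bool plus String), and the meaning of the open argument of NewValidConnections is not modelled.

```go
package main

import (
	"fmt"
	"sync"
)

// Token is a stand-in for crypto.Token; its real size and layout are assumptions.
type Token [32]byte

// Validator mirrors the documented shape of the ValidateConnection interface.
type Validator interface {
	ValidateConnection(token Token) chan bool
	String() string
}

// allowList is a hypothetical validator backed by a set of accepted tokens.
type allowList struct {
	mu     sync.Mutex
	tokens map[Token]struct{}
}

func newAllowList(tokens ...Token) *allowList {
	a := &allowList{tokens: make(map[Token]struct{})}
	for _, t := range tokens {
		a.tokens[t] = struct{}{}
	}
	return a
}

// Add puts a token on the list of accepted tokens.
func (a *allowList) Add(t Token) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.tokens[t] = struct{}{}
}

// Remove takes a token off the list of accepted tokens.
func (a *allowList) Remove(t Token) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.tokens, t)
}

// ValidateConnection answers on a buffered channel, so the validator never
// blocks even if the caller reads the answer later.
func (a *allowList) ValidateConnection(t Token) chan bool {
	resp := make(chan bool, 1)
	a.mu.Lock()
	_, ok := a.tokens[t]
	a.mu.Unlock()
	resp <- ok
	return resp
}

func (a *allowList) String() string {
	a.mu.Lock()
	defer a.mu.Unlock()
	return fmt.Sprintf("allow list with %d tokens", len(a.tokens))
}

func main() {
	known, unknown := Token{1}, Token{2}
	var v Validator = newAllowList(known)
	fmt.Println(<-v.ValidateConnection(known), <-v.ValidateConnection(unknown))
}
```

Returning a channel rather than a bool lets a validator defer its decision (for example, to consult another component) without changing the interface; this sketch simply answers immediately.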
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Aggregator consolidates data from multiple providers into a buffered channel. Redundant data is discarded. The aggregator is live until the context is done. All connections for Aggregator are initiated by the aggregator itself.
func NewAgregator ¶
func NewAgregator(ctx context.Context, hostname string, credentials crypto.PrivateKey, connections ...*SignedConnection) *Aggregator
NewAgregator creates a new aggregator. The aggregator is live until the context is done. hostname should be empty or localhost for internet connections. credentials are used to establish connections to providers.
func (*Aggregator) AddNewOne ¶
func (b *Aggregator) AddNewOne(peers []TokenAddr) (*SignedConnection, error)
AddNewOne will try to establish a new connection with one of the given peers. It will select a random peer to connect to, and if not successful it will try the next one (in a circular fashion) until it can connect to one. If there is no new peer provided or it cannot connect to any of the given new peers, an error is returned.
func (*Aggregator) AddOne ¶
func (b *Aggregator) AddOne(peers []TokenAddr) (*SignedConnection, error)
AddOne will return nil if the aggregator has a connection to any of the given peers or it could establish a connection with one of the given peers. It will select a random peer to connect to, and if not successful it will try the next (in a circular fashion) one until it can connect to one. If it cannot connect to any of the given peers, an error is returned.
func (*Aggregator) AddProvider ¶
func (b *Aggregator) AddProvider(provider TokenAddr) (*SignedConnection, error)
AddProvider tries to connect to the given provider and add it to the list of providers. If the connection fails, an error is returned.
func (*Aggregator) CloseProvider ¶
func (b *Aggregator) CloseProvider(provider crypto.Token)
CloseProvider closes the connection to the given provider and excludes it from the provider list.
func (*Aggregator) Has ¶
func (b *Aggregator) Has(peer TokenAddr) bool
Has returns true if the aggregator has a connection to the given provider (same address and same token) or false otherwise.
func (*Aggregator) HasAny ¶
func (b *Aggregator) HasAny(peers []TokenAddr) bool
HasAny returns true if the aggregator has a connection to any of the given providers or false otherwise.
func (*Aggregator) Read ¶
func (b *Aggregator) Read() ([]byte, error)
Read returns the next data from the aggregator. It blocks if there is no data available.
func (*Aggregator) Shutdown ¶
func (b *Aggregator) Shutdown()
Shutdown closes all connections to providers and clears the provider list.
type BufferedChannel ¶
type BufferedChannel struct {
	Conn *SignedConnection
	Live bool
	// contains filtered or unexported fields
}
BufferedChannel channels data from a signed connection to a byte-array channel. Data is read from the connection and stored in a buffer until it is read from the channel. The channel will block when no data is available. BufferedChannel offers two channels per connection. Data in transit over the main channel is prepended with a zero byte; data on the side channel is prepended with a one byte. The structure also offers a Len() method to check the number of buffered messages in each channel.
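The one-byte marker scheme described above can be sketched as follows. This is a minimal illustration, not the package's implementation: frame, demux and route are hypothetical names, and the blocking reads that BufferedChannel provides are left out so that only the framing and demultiplexing are shown.

```go
package main

import "fmt"

const (
	mainChannel byte = 0 // marker prepended to main-channel messages
	sideChannel byte = 1 // marker prepended to side-channel messages
)

// frame prepends the channel marker to the payload.
func frame(marker byte, data []byte) []byte {
	return append([]byte{marker}, data...)
}

// demux keeps one buffer per logical channel.
type demux struct {
	main [][]byte
	side [][]byte
}

// route strips the marker and stores the payload in the matching buffer.
// It reports false for empty messages or unknown markers.
func (d *demux) route(msg []byte) bool {
	if len(msg) == 0 {
		return false
	}
	switch msg[0] {
	case mainChannel:
		d.main = append(d.main, msg[1:])
	case sideChannel:
		d.side = append(d.side, msg[1:])
	default:
		return false
	}
	return true
}

func main() {
	d := &demux{}
	d.route(frame(mainChannel, []byte("block")))
	d.route(frame(sideChannel, []byte("vote")))
	fmt.Println(len(d.main), len(d.side), string(d.side[0]))
}
```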
func NewBufferredChannel ¶
func NewBufferredChannel(conn *SignedConnection) *BufferedChannel
NewBufferredChannel returns a new BufferedChannel for the given connection.
func (*BufferedChannel) Is ¶
func (b *BufferedChannel) Is(token crypto.Token) bool
Is returns true if the token of the connection is equal to the given token.
func (*BufferedChannel) Len ¶
func (b *BufferedChannel) Len() int
Len returns the number of messages buffered in the main channel.
func (*BufferedChannel) Read ¶
func (b *BufferedChannel) Read() []byte
Read reads data from the main channel buffer. If the buffer is empty, it blocks until data is available.
func (*BufferedChannel) ReadSide ¶
func (b *BufferedChannel) ReadSide() []byte
ReadSide reads data from the side channel buffer. If the buffer is empty, it blocks until data is available.
func (*BufferedChannel) Send ¶
func (b *BufferedChannel) Send(data []byte)
Send sends a message through the main channel. The message is prepended with a zero byte.
func (*BufferedChannel) SendSide ¶
func (b *BufferedChannel) SendSide(data []byte)
SendSide sends a message through the side channel. The message is prepended with a one byte.
func (*BufferedChannel) Shutdown ¶
func (b *BufferedChannel) Shutdown()
type BufferedMultiChannel ¶
type BufferedMultiChannel struct {
	Conn *SignedConnection
	Live bool
	// contains filtered or unexported fields
}
BufferedMultiChannel channels data from a signed connection to byte-array channels. Data is read from the connection and stored in a buffer until it is read from the channel. The channel will block when no data is available. BufferedMultiChannel offers a main channel per epoch and a side channel. Data in transit over the main channel is prepended with the epoch; data on the side channel is prepended with eight zero bytes.
func NewBufferredMultiChannel ¶
func NewBufferredMultiChannel(conn *SignedConnection) *BufferedMultiChannel
NewBufferredMultiChannel returns a new BufferedMultiChannel for the given connection.
func (*BufferedMultiChannel) Is ¶
func (b *BufferedMultiChannel) Is(token crypto.Token) bool
Is returns true if the token of the connection is equal to the given token.
func (*BufferedMultiChannel) Read ¶
func (b *BufferedMultiChannel) Read(epoch uint64) []byte
Read reads data from the main channel buffer. If the buffer is empty, it blocks until data is available.
func (*BufferedMultiChannel) ReadSide ¶
func (b *BufferedMultiChannel) ReadSide() []byte
ReadSide reads data from the side channel buffer. If the buffer is empty, it blocks until data is available.
func (*BufferedMultiChannel) Release ¶
func (b *BufferedMultiChannel) Release(epoch uint64)
Release releases the channel and buffer (if any) of the provided epoch.
func (*BufferedMultiChannel) Send ¶
func (b *BufferedMultiChannel) Send(epoch uint64, data []byte)
Send messages through the main channel. The message is prepended with the given epoch.
func (*BufferedMultiChannel) SendSide ¶
func (b *BufferedMultiChannel) SendSide(data []byte)
SendSide sends a message through the side channel. The message is prepended with eight zero bytes.
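The epoch prefix used by Send and the eight zero bytes used by SendSide can be illustrated with a short sketch. It is written under assumptions: withEpoch, sideMessage and split are hypothetical helpers, and the little-endian byte order is a guess, since the documentation does not state how the epoch is encoded.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// withEpoch prepends the epoch as eight bytes.
// Little-endian encoding is an assumption made for this sketch.
func withEpoch(epoch uint64, data []byte) []byte {
	msg := make([]byte, 8, 8+len(data))
	binary.LittleEndian.PutUint64(msg, epoch)
	return append(msg, data...)
}

// sideMessage prepends eight zero bytes, which is the same as epoch 0.
func sideMessage(data []byte) []byte {
	return withEpoch(0, data)
}

// split recovers the epoch and the payload. ok is false when the message
// is too short to carry the eight-byte prefix.
func split(msg []byte) (epoch uint64, payload []byte, ok bool) {
	if len(msg) < 8 {
		return 0, nil, false
	}
	return binary.LittleEndian.Uint64(msg[:8]), msg[8:], true
}

func main() {
	epoch, payload, _ := split(withEpoch(42, []byte("data")))
	fmt.Println(epoch, string(payload))
	epoch, payload, _ = split(sideMessage([]byte("side")))
	fmt.Println(epoch, string(payload))
}
```

Under this reading, a side-channel message is indistinguishable from a main-channel message for epoch 0, which suggests epoch 0 is reserved for the side channel.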
func (*BufferedMultiChannel) Shutdown ¶
func (b *BufferedMultiChannel) Shutdown()
type CachedConnection ¶
type CachedConnection struct {
	Live bool
	// contains filtered or unexported fields
}
CachedConnection is a wrapper around a SignedConnection that buffers sent data until the connection is declared ready. Data can be sent without buffering by calling SendDirect. The connection is declared ready by calling Ready(). This is used for syncing the blockchain: new information is sent through the buffered channel while past information is sent directly.
func NewCachedConnection ¶
func NewCachedConnection(conn *SignedConnection) *CachedConnection
NewCachedConnection creates a new CachedConnection over a signed connection.
func (*CachedConnection) Close ¶
func (c *CachedConnection) Close()
Close gracefully closes the connection.
func (*CachedConnection) Ready ¶
func (c *CachedConnection) Ready()
Ready declares the connection ready to send data. This will trigger the buffered data to be sent.
func (*CachedConnection) Send ¶
func (c *CachedConnection) Send(data []byte)
Send sends data to the remote node. If the connection is not ready, the data is buffered. If the connection is ready, the data is sent directly.
func (*CachedConnection) SendDirect ¶
func (c *CachedConnection) SendDirect(data []byte) error
SendDirect sends data to the remote node without buffering. If the connection is ready, Send should be used instead.
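The interplay of Send, SendDirect and Ready during a sync can be sketched as below. This is not the package's code: cached and its write callback are hypothetical stand-ins for CachedConnection and the underlying SignedConnection, and error handling is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// cached is a hypothetical stand-in for CachedConnection.
type cached struct {
	mu      sync.Mutex
	ready   bool
	pending [][]byte
	write   func([]byte) // stand-in for sending on the underlying connection
}

// Send buffers data until Ready has been called, then writes through.
func (c *cached) Send(data []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.ready {
		c.pending = append(c.pending, data)
		return
	}
	c.write(data)
}

// SendDirect bypasses the buffer; intended for past (sync) data.
func (c *cached) SendDirect(data []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.write(data)
}

// Ready flushes the buffered data in order and switches to write-through.
func (c *cached) Ready() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, data := range c.pending {
		c.write(data)
	}
	c.pending = nil
	c.ready = true
}

func main() {
	var wire []string
	c := &cached{write: func(b []byte) { wire = append(wire, string(b)) }}
	c.Send([]byte("new block 11"))       // arrives during sync, buffered
	c.SendDirect([]byte("old block 9"))  // past data, sent immediately
	c.SendDirect([]byte("old block 10")) // past data, sent immediately
	c.Ready()                            // sync finished, flush the buffer
	c.Send([]byte("new block 12"))       // sent directly from now on
	fmt.Println(wire)
}
```

The remote node therefore sees past data first and new data afterwards, even though the new data was produced while the sync was still running.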
func (*CachedConnection) Token ¶
func (c *CachedConnection) Token() crypto.Token
Token returns the remote token of the underlying signed connection.
type ChannelConnection ¶
type ChannelConnection struct {
	Conn   *SignedConnection
	Signal map[uint64]chan []byte
	Iddle  bool
	Live   bool
	// contains filtered or unexported fields
}
ChannelConnection is a wrapper around a SignedConnection that separates messages by epoch and routes them to dedicated byte-array channels. The epoch is stored in bytes 1 through 8 of the message. If there is an open channel for that epoch, the message is sent to the channel. Otherwise it is discarded. The connection can be put into idle mode, in which case all received messages are simply ignored.
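The routing rule can be sketched as follows. This is an illustration under assumptions: router, withHeader and route are hypothetical names, byte 0 is treated as an opaque message-kind byte, and the epoch is assumed to be little-endian, which the documentation does not specify.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// router is a hypothetical stand-in for the routing part of ChannelConnection.
type router struct {
	idle   bool
	signal map[uint64]chan []byte
}

// withHeader builds a message with a kind byte at position 0 and the epoch
// in bytes 1 through 8 (little-endian is an assumption).
func withHeader(kind byte, epoch uint64, payload []byte) []byte {
	msg := make([]byte, 9, 9+len(payload))
	msg[0] = kind
	binary.LittleEndian.PutUint64(msg[1:9], epoch)
	return append(msg, payload...)
}

// route delivers msg to the channel registered for its epoch.
// It returns false when the message is discarded: the router is idle, the
// message is too short, or no channel is registered for the epoch.
func (r *router) route(msg []byte) bool {
	if r.idle || len(msg) < 9 {
		return false
	}
	ch, ok := r.signal[binary.LittleEndian.Uint64(msg[1:9])]
	if !ok {
		return false
	}
	ch <- msg
	return true
}

func main() {
	r := &router{signal: map[uint64]chan []byte{7: make(chan []byte, 1)}}
	fmt.Println(r.route(withHeader(1, 7, []byte("vote")))) // channel registered
	fmt.Println(r.route(withHeader(1, 8, []byte("vote")))) // no channel, discarded
}
```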
func AssembleChannelNetwork ¶
func AssembleChannelNetwork(ctx context.Context, peers []TokenAddr, credentials crypto.PrivateKey, port int, hostname string, existing []*ChannelConnection) []*ChannelConnection
AssembleChannelNetwork assembles a committee of ChannelConnections. It returns a slice with all the connections that were possible to establish.
func NewChannelConnection ¶
func NewChannelConnection(conn *SignedConnection) *ChannelConnection
NewChannelConnection returns a new ChannelConnection for the given signed connection.
func (*ChannelConnection) Activate ¶
func (c *ChannelConnection) Activate()
Activate sets the connection to active mode, in which case messages are routed to the corresponding channels.
func (*ChannelConnection) Is ¶
func (c *ChannelConnection) Is(token crypto.Token) bool
Is returns true if the token of the connection is equal to the given token.
func (*ChannelConnection) Read ¶
func (c *ChannelConnection) Read(epoch uint64) []byte
Read reads a message from the channel corresponding to the given epoch. If there is no channel for that epoch, it returns nil. Otherwise it will block until a message is received.
func (*ChannelConnection) Register ¶
func (c *ChannelConnection) Register(epoch uint64, signal chan []byte) bool
Register registers a new channel for a given epoch.
func (*ChannelConnection) Release ¶
func (c *ChannelConnection) Release(epoch uint64)
Release releases a channel for a given epoch. If epoch = 0, it releases all channels and sets the connection to idle. Released channels are closed and removed from the connection.
func (*ChannelConnection) Send ¶
func (c *ChannelConnection) Send(msg []byte)
Send sends a message to the remote node if the connection is live.
func (*ChannelConnection) Shutdown ¶
func (c *ChannelConnection) Shutdown()
func (*ChannelConnection) Sleep ¶
func (c *ChannelConnection) Sleep()
Sleep sets the connection to idle mode, in which case messages are discarded.
type ConnectionPool ¶
type ConnectionPool map[crypto.Token]*CachedConnection
ConnectionPool is a map of cached connections to other nodes in the peer group.
func (ConnectionPool) Add ¶
func (p ConnectionPool) Add(c *CachedConnection)
Add adds a new cached connection to the connection pool.
func (ConnectionPool) Broadcast ¶
func (p ConnectionPool) Broadcast(data []byte)
Broadcast sends data to all nodes in the connection pool.
func (ConnectionPool) Drop ¶
func (p ConnectionPool) Drop(token crypto.Token)
Drop closes a connection in the connection pool.
func (ConnectionPool) DropAll ¶
func (p ConnectionPool) DropAll()
DropAll closes all connections in the connection pool.
func (ConnectionPool) DropDead ¶
func (p ConnectionPool) DropDead()
DropDead closes all dead connections in the connection pool.
type Gossip ¶
type Gossip struct {
	Signal chan GossipMessage
	// contains filtered or unexported fields
}
Gossip is a group of connections where every node broadcasts to every other node new messages received. It is used as a communication primitive of the consensus committee. It should only be used for lightweight messages.
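A gossip relay of this kind can be sketched as below. This is only an illustration of the idea: node, newNode and receive are hypothetical names, peers are modelled as send callbacks keyed by a string token, and deduplicating by SHA-256 hash is an assumption about how "new" messages could be recognised, not a description of this package.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// node is a hypothetical gossip participant.
type node struct {
	seen  map[[32]byte]bool      // hashes of messages already relayed
	peers map[string]func([]byte) // token -> send function
}

func newNode() *node {
	return &node{seen: make(map[[32]byte]bool), peers: make(map[string]func([]byte))}
}

// receive relays msg to every peer except the one it came from, but only
// the first time the message is seen. It reports whether msg was new.
func (n *node) receive(from string, msg []byte) bool {
	key := sha256.Sum256(msg)
	if n.seen[key] {
		return false
	}
	n.seen[key] = true
	for token, send := range n.peers {
		if token != from {
			send(msg)
		}
	}
	return true
}

func main() {
	relayed := 0
	n := newNode()
	for _, token := range []string{"a", "b", "c"} {
		n.peers[token] = func([]byte) { relayed++ }
	}
	fmt.Println(n.receive("a", []byte("vote")), relayed) // new: relayed to b and c
	fmt.Println(n.receive("b", []byte("vote")), relayed) // duplicate: not relayed
}
```

Because every new message is sent to all other peers, traffic grows with the square of the group size, which is consistent with the advice to use gossip only for lightweight messages.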
func GroupGossip ¶
func GroupGossip(epoch uint64, connections []*ChannelConnection) *Gossip
GroupGossip creates a new gossip network for a given epoch from a slice of ChannelConnections.
func (*Gossip) BroadcastExcept ¶
BroadcastExcept sends a message to all nodes in the gossip network except the one with the given token.
func (*Gossip) Messages ¶
func (g *Gossip) Messages() chan GossipMessage
Messages returns the GossipMessage channel of the network.
func (*Gossip) Release ¶
func (g *Gossip) Release()
Release releases all channel connections of the associated epoch in the gossip network.
func (*Gossip) ReleaseToken ¶
ReleaseToken releases a single channel connection of the associated epoch in the gossip network.
type GossipMessage ¶
GossipMessage is a message received from the gossip network together with its author.
type PercolationPool ¶
type PercolationPool struct {
// contains filtered or unexported fields
}
PercolationPool is a pool of BufferedChannel connections to other nodes in the peer group and a percolation rule that governs how any message is transmitted between nodes until every node is reached.
func AssembleOwnPercolationPool ¶
func AssembleOwnPercolationPool() *PercolationPool
AssembleOwnPercolationPool creates an empty pool of connections. This is used for the case where the network is composed of a single node.
func AssemblePercolationPool ¶
func AssemblePercolationPool(ctx context.Context, peers []TokenAddr, credentials crypto.PrivateKey, port int, hostname string, rule PercolationRule, existing *PercolationPool) *PercolationPool
AssemblePercolationPool creates a pool of connections to other nodes in the peer group. It reuses live connections from an existing pool, if one is provided.
func (*PercolationPool) GetLeader ¶
func (p *PercolationPool) GetLeader(token crypto.Token) (*BufferedMultiChannel, []*BufferedMultiChannel)
func (*PercolationPool) Send ¶
func (b *PercolationPool) Send(epoch uint64, data []byte)
Send sends a message to all nodes designated in the percolation rule.
type PercolationRule ¶
PercolationRule defines a rule for diffusion of block data among validators. At a given epoch a node will be required to broadcast data to a subset of other nodes.
func MergeRules ¶
func MergeRules(r ...PercolationRule) PercolationRule
MergeRules combines several rules into a single one. If a node is designated by any of the rules for a given epoch, it will be included in the merged rule.
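The union semantics of MergeRules can be illustrated with a small sketch. The definition of PercolationRule is not shown in this documentation, so the rule type below (a function from epoch to node indices) is purely hypothetical, as are merge, ring and fixed.

```go
package main

import (
	"fmt"
	"sort"
)

// rule is a hypothetical stand-in for PercolationRule: for a given epoch it
// returns the indices of the nodes that must receive the data.
type rule func(epoch uint64) []int

// merge returns a rule that designates a node whenever any of the given
// rules designates it. The result is deduplicated and sorted.
func merge(rules ...rule) rule {
	return func(epoch uint64) []int {
		seen := make(map[int]bool)
		var out []int
		for _, r := range rules {
			for _, n := range r(epoch) {
				if !seen[n] {
					seen[n] = true
					out = append(out, n)
				}
			}
		}
		sort.Ints(out)
		return out
	}
}

func main() {
	ring := func(epoch uint64) []int { return []int{int(epoch % 4)} } // rotates with the epoch
	fixed := func(uint64) []int { return []int{0, 3} }               // same nodes every epoch
	merged := merge(ring, fixed)
	fmt.Println(merged(0), merged(1)) // prints "[0 3] [0 1 3]"
}
```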
type SignedConnection ¶
type SignedConnection struct {
	Token   crypto.Token
	Address string
	Live    bool
	// contains filtered or unexported fields
}
SignedConnection is the key type of this package. It implements the Reader, Sender, Closer interface providing a simple interface to send and receive signed messages.
func Dial ¶
func Dial(hostname, address string, credentials crypto.PrivateKey, token crypto.Token) (*SignedConnection, error)
Dial tries to establish a signed connection to the given address. hostname should be "localhost" or "" for internet connections, and anything else for local-machine connections used in testing. address must have the form "address:port". credentials is the private key of the party dialing. token is the token of the party being dialed. It returns the signed connection, or nil and an error.
func DialCtx ¶
func DialCtx(ctx context.Context, hostname, address string, credentials crypto.PrivateKey, token crypto.Token) (*SignedConnection, error)
func DialTCP ¶
func DialTCP(laddr, raddr *net.TCPAddr, credentials crypto.PrivateKey, token crypto.Token) (*SignedConnection, error)
func PromoteConnection ¶
func PromoteConnection(conn net.Conn, prvKey crypto.PrivateKey, validator ValidateConnection) (*SignedConnection, error)
PromoteConnection promotes a connection to a signed connection. It performs the handshake and returns a SignedConnection if the handshake is successful.
func (*SignedConnection) Listen ¶
func (s *SignedConnection) Listen(newMessages chan Message, shutdown chan crypto.Token)
Listen is a helper that reads messages from the underlying connection and sends them to the given channel, identified by the token of the connection.
func (*SignedConnection) Read ¶
func (s *SignedConnection) Read() ([]byte, error)
Read reads a message from the underlying connection. It first reads the size of the message, then it reads the entire message and checks the signature. It returns an ErrInvalidSignature error if it could read the message but the signature does not match.
func (*SignedConnection) Send ¶
func (s *SignedConnection) Send(msg []byte) error
Send up to 1<<32 - 1 bytes of data. It returns an error if the message is larger than 1<<32 - 1 bytes or if the underlying connection cannot send data.
func (*SignedConnection) Shutdown ¶
func (s *SignedConnection) Shutdown()
Shutdown gracefully closes the connection.
type TokenComparer ¶
TokenComparer is an interface for comparing a token to a given token. The pool assemblage will use this to check if a given token is already connected.
type TrustedAggregator ¶
type TrustedAggregator struct {
	Activate chan *SignedConnection
	// contains filtered or unexported fields
}
TrustedAggregator maintains a sample of connections, preferably to trusted tokens.
func NewTrustedAgregator ¶
func NewTrustedAgregator(ctx context.Context, hostname string, credentials crypto.PrivateKey, size int, trusted, available []TokenAddr, connections ...*SignedConnection) *TrustedAggregator
func (*TrustedAggregator) Read ¶
func (t *TrustedAggregator) Read() ([]byte, error)
func (*TrustedAggregator) SendAll ¶
func (t *TrustedAggregator) SendAll(msg []byte)
func (*TrustedAggregator) Shutdown ¶
func (t *TrustedAggregator) Shutdown()
func (*TrustedAggregator) TryAddProvider ¶
func (t *TrustedAggregator) TryAddProvider() *SignedConnection
TryAddProvider will try to add a new provider to the aggregator. It will try to add a trusted provider first, then an untrusted provider, and finally a closed provider. If it cannot add any provider, it will return nil.
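The tiered fallback described for TryAddProvider can be sketched as follows. It is an illustration only: tryAdd, the string addresses and the dial callback are hypothetical, and how the real aggregator orders candidates within a tier is not documented here.

```go
package main

import "fmt"

// tryAdd walks the tiers in priority order (for example trusted, untrusted,
// previously closed) and returns the first address for which dial succeeds.
// ok is false when no candidate in any tier could be reached.
func tryAdd(tiers [][]string, dial func(addr string) bool) (addr string, ok bool) {
	for _, tier := range tiers {
		for _, candidate := range tier {
			if dial(candidate) {
				return candidate, true
			}
		}
	}
	return "", false
}

func main() {
	trusted := []string{"t1:5400", "t2:5400"}
	untrusted := []string{"u1:5400"}
	closed := []string{"c1:5400"}
	// Hypothetical dialer: no trusted provider is reachable.
	dial := func(addr string) bool { return addr == "u1:5400" || addr == "c1:5400" }
	fmt.Println(tryAdd([][]string{trusted, untrusted, closed}, dial))
}
```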
type ValidateConnection ¶
type ValidateConnection interface {
	ValidateConnection(token crypto.Token) chan bool
	String() string
}
ValidateConnection is an interface used by the handshake protocol to confirm whether a given token is accredited to establish the connection.
type ValidateSingleConnection ¶
ValidateSingleConnection is an implementation of the ValidateConnection interface that accepts only connections from a single token.
func (ValidateSingleConnection) String ¶
func (v ValidateSingleConnection) String() string
func (ValidateSingleConnection) ValidateConnection ¶
func (v ValidateSingleConnection) ValidateConnection(token crypto.Token) chan bool
ValidateConnection returns a channel that yields true if the given token is equal to the token associated with ValidateSingleConnection and false otherwise.