Documentation ¶
Index ¶
- Constants
- Variables
- func ByteSliceEqualBCE(a, b []byte) bool
- func CheckClientVersionCompatibility(v1, v2 string) bool
- func CreateEd25519Key() (crypto.PrivKey, error)
- func GenUpgrader(n *swarm.Swarm) *tptu.Upgrader
- func HasKey(key string) bool
- func HasRecvMessage(s *Stream, hash uint32) bool
- func LoadNetworkKeyFromFile(path string) (crypto.PrivKey, error)
- func LoadNetworkKeyFromFileOrCreateNew(path string) (crypto.PrivKey, error)
- func MarshalNetworkKey(key crypto.PrivKey) (string, error)
- func NewcloudcardMessage(networkID uint32, chainID uint32, reserved []byte, version byte, ...) (*cloudcardMessage, error)
- func NewcloudcardService(conf *config.Config) (*cloudcardService, error)
- func ParseFromIPFSAddr(ipfsAddr ma.Multiaddr) (peer.ID, ma.Multiaddr, error)
- func ParsecloudcardMessage(networkID uint32, data []byte) (*cloudcardMessage, error)
- func RecordKey(key string)
- func RecordRecvMessage(s *Stream, hash uint32)
- func SetNetConfig(conf *config.Config, netCfg *NetConfig)
- func UnmarshalNetworkKey(data string) (crypto.PrivKey, error)
- type BaseMessage
- type ChainSyncPeersFilter
- type Config
- type Dispatcher
- type Message
- type MessageWeight
- type NetConfig
- type Node
- func (node *Node) AddPeer(id peer.ID)
- func (node *Node) AllPeerIds() []string
- func (node *Node) BroadcastMessage(messageName string, data Serializable, priority int)
- func (node *Node) BucketSize() int
- func (node *Node) ID() string
- func (node *Node) Members() []peer.ID
- func (node *Node) RelayMessage(messageName string, data Serializable, priority int)
- func (node *Node) RemovePeer(id peer.ID)
- func (node *Node) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error
- func (node *Node) SendMessageToPeerOrBroadcast(messageName string, messageContent Serializable, priority int, peerID string) error
- func (node *Node) SetcloudcardService(ns *cloudcardService)
- func (node *Node) Start() error
- func (node *Node) Stop()
- func (node *Node) StreamManager() *StreamManager
- func (node *Node) Synchronized() bool
- type PeerFilterAlgorithm
- type PeersSlice
- type RandomPeerFilter
- type RouteTable
- func (table *RouteTable) AddIPFSPeerAddr(addr ma.Multiaddr)
- func (table *RouteTable) AddPeer(pid peer.ID, addr ma.Multiaddr)
- func (table *RouteTable) AddPeerInfo(prettyID string, addrStr []string) error
- func (table *RouteTable) AddPeerStream(s *Stream)
- func (table *RouteTable) AddPeers(pid string, peers *netpb.Peers)
- func (table *RouteTable) GetRandomPeers(pid peer.ID) []peer.AddrInfo
- func (table *RouteTable) LoadInternalNodeList()
- func (table *RouteTable) LoadRouteTableFromFile()
- func (table *RouteTable) LoadSeedNodes()
- func (table *RouteTable) RemovePeerStream(s *Stream)
- func (table *RouteTable) SaveRouteTableToFile()
- func (table *RouteTable) Start()
- func (table *RouteTable) Stop()
- func (table *RouteTable) SyncRouteTable()
- func (table *RouteTable) SyncWithPeer(pid peer.ID)
- type Serializable
- type Service
- type Stream
- func (s *Stream) Bye()
- func (s *Stream) Connect() error
- func (s *Stream) Hello() error
- func (s *Stream) IsConnected() bool
- func (s *Stream) IsHandshakeSucceed() bool
- func (s *Stream) Ok() error
- func (s *Stream) RouteTable() error
- func (s *Stream) SendMessage(messageName string, data []byte, priority int) error
- func (s *Stream) SendProtoMessage(messageName string, pb proto.Message, priority int) error
- func (s *Stream) StartLoop()
- func (s *Stream) String() string
- func (s *Stream) SyncRoute() error
- func (s *Stream) Write(data []byte) error
- func (s *Stream) WriteMessage(messageName string, data []byte, reservedClientFlag byte) error
- func (s *Stream) WriteProtoMessage(messageName string, pb proto.Message, reservedClientFlag byte) error
- func (s *Stream) WritecloudcardMessage(message *cloudcardMessage) error
- type StreamManager
- func (sm *StreamManager) ActivePeersCount() uint32
- func (sm *StreamManager) Add(s network.Stream, node *Node)
- func (sm *StreamManager) AddStream(stream *Stream)
- func (sm *StreamManager) BroadcastMessage(messageName string, messageContent Serializable, priority int)
- func (sm *StreamManager) CloseStream(peerID string, reason error)
- func (sm *StreamManager) Find(pid peer.ID) *Stream
- func (sm *StreamManager) GetStreamByPeerID(peerID string) *Stream
- func (sm *StreamManager) RelayMessage(messageName string, messageContent Serializable, priority int)
- func (sm *StreamManager) RemoveStream(s *Stream)
- func (sm *StreamManager) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
- func (sm *StreamManager) Start()
- func (sm *StreamManager) Stop()
- type StreamValue
- type StreamValueSlice
- type Subscriber
Constants ¶
const ( // Consider that a block is too large in sync. MaxcloudcardMessageDataLength = 512 * 1024 * 1024 // 512m. MaxcloudcardMessageNameLength = 24 - 12 // 12. DefaultReservedFlag = 0x0 ReservedCompressionEnableFlag = 0x80 ReservedCompressionClientFlag = 0x40 )
const ( InitBucketCapacity = 64 InitRoutingTableMaxLatency = 10 InitPrivateKeyPath = "conf/network/key" InitMaxSyncNodes = 64 InitChainID = 1 InitMaxStreamNum = 210 InitReservedStreamNum = 20 MajorNetworkID = 1 RTNTNetworkID = 2 Thirty = 30 * time.Second ThreeMinutes = 3 * 60 * time.Second Network = "network" )
const
const ( ClientVersion = "0.1.0" HELLO = "hello" OK = "ok" BYE = "bye" SYNCROUTE = "syncroute" ROUTETABLE = "routetable" CurrentVersion = 0x0 )
Stream Message Type
const ( MessagePriorityHigh = iota MessagePriorityNormal MessagePriorityLow )
Message Priority.
const ( ChunkHeadersRequest = "sync" // ChainSync ChunkHeadersResponse = "chunks" // ChainChunks ChunkDataRequest = "getchunk" // ChainGetChunk ChunkDataResponse = "chunkdata" // ChainChunkData ProposeLearnRequest = "getpropose" //ProposeLearnRequest ProposeLearnResponse = "propose" //ProposeLearnResponse StandBy = "standby" // paxos ProposalID = "proposalID" Promise = "promise" Accept = "accept" Accepted = "accepted" LatestPropose = "newPropose" // pbft PbftPreprepare = "preprepare" PbftPrepare = "prepare" PbftCommit = "commit" )
Sync Message Type
const ( MessageWeightZero = MessageWeight(0) MessageWeightNewTx MessageWeightNewBlock = MessageWeight(0.5) MessageWeightRouteTable MessageWeightChainChunks MessageWeightChainChunkData MessageWeightPropose MessageWeightStandby )
const
const (
CleanupInterval = time.Second * 60
)
const MemberSize int = 3
Variables ¶
var ( MagicNumber = []byte{0x43, 0x43, 0x4D, 0x4E} RTNTMagicNumber = []byte{0x43, 0x43, 0x54, 0x4E} DefaultReserved = []byte{DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag} CompressionReserved = []byte{DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag | ReservedCompressionEnableFlag} ErrInsufficientMessageHeaderLength = errors.New("insufficient message header length") ErrInsufficientMessageDataLength = errors.New("insufficient message data length") ErrInvalidMagicNumber = errors.New("invalid magic number") ErrInvalidHeaderCheckSum = errors.New("invalid header checksum") ErrInvalidDataCheckSum = errors.New("invalid data checksum") ErrExceedMaxDataLength = errors.New("exceed max data length") ErrExceedMaxMessageNameLength = errors.New("exceed max message name length") ErrUncompressMessageFailed = errors.New("uncompress message failed") ErrInvalidNetworkID = errors.New("invalid network id") )
var ( DefaultListen = []string{"0.0.0.0:8888"} RouteTableSyncLoopInterval = Thirty RouteTableSaveToDiskInterval = ThreeMinutes RouteTableCacheFileName = "routetable.cache" RouteTableInternalNodeFileName = "conf/internal.txt" MaxPeersCountForSyncResp = 32 )
Default Configuration in P2P network
var ( ErrShouldCloseConnectionAndExitLoop = errors.New("should close connection and exit loop") ErrStreamIsNotConnected = errors.New("stream is not connected") )
Stream Errors
var ( ErrExceedMaxStreamNum = errors.New("too many streams connected") ErrElimination = errors.New("eliminated for low value") ErrDeprecatedStream = errors.New("deprecated stream") )
var ( ErrListenPortIsNotAvailable = errors.New("listen port is not available") ErrConfigLackNetWork = errors.New("config.conf should has network") )
Errors
var (
ErrExceedMaxSyncRouteResponse = errors.New("too many sync route table response")
)
Route Table Errors
var (
ErrPeerIsNotConnected = errors.New("peer is not connected")
)
Error types
Functions ¶
func ByteSliceEqualBCE ¶
ByteSliceEqualBCE determines whether two byte arrays are equal.
func CheckClientVersionCompatibility ¶
CheckClientVersionCompatibility if two clients are compatible If the clientVersion of node A is X.Y.Z, then node B must be X.Y.{} to be compatible with A.
func CreateEd25519Key ¶
GenerateEd25519Key return a new generated Ed22519 Private key.
func GenUpgrader ¶
GenUpgrader creates a new connection upgrader for use with this swarm.
func HasRecvMessage ¶
HasRecvMessage check if the received message exists before
func LoadNetworkKeyFromFile ¶
LoadNetworkKeyFromFile load network priv key from file.
func LoadNetworkKeyFromFileOrCreateNew ¶
LoadNetworkKeyFromFileOrCreateNew load network priv key from file or create new one.
func MarshalNetworkKey ¶
MarshalNetworkKey marshal network key.
func NewcloudcardMessage ¶
func NewcloudcardMessage(networkID uint32, chainID uint32, reserved []byte, version byte, messageName string, data []byte) (*cloudcardMessage, error)
NewcloudcardMessage new cloudcard message
func NewcloudcardService ¶
NewcloudcardService create netService
func ParseFromIPFSAddr ¶
ParseFromIPFSAddr return pid and address parsed from ipfs address
func ParsecloudcardMessage ¶
ParsecloudcardMessage parse cloudcard message
func RecordRecvMessage ¶
RecordRecvMessage records received message
func SetNetConfig ¶
Types ¶
type BaseMessage ¶
type BaseMessage struct {
// contains filtered or unexported fields
}
BaseMessage base message
func (*BaseMessage) MessageFrom ¶
func (msg *BaseMessage) MessageFrom() string
MessageFrom get message who send
func (*BaseMessage) MessageType ¶
func (msg *BaseMessage) MessageType() string
MessageType get message type
type ChainSyncPeersFilter ¶
type ChainSyncPeersFilter struct { }
ChainSyncPeersFilter will filter some peers randomly
func (*ChainSyncPeersFilter) Filter ¶
func (filter *ChainSyncPeersFilter) Filter(peers PeersSlice) PeersSlice
Filter implemets PeerFilterAlgorithm interface
type Config ¶
type Config struct { NetworkID uint32 BucketSize int Latency time.Duration BootNodes []multiaddr.Multiaddr PrivateKeyPath string Listen []string MaxSyncNodes int ChainID uint32 RoutingTableDir string StreamLimits uint32 ReservedStreamLimits uint32 }
func NewConfigFromDefaults ¶
func NewConfigFromDefaults() *Config
NewConfigFromDefaults return new config from defaults.
func NewP2PConfig ¶
NewP2PConfig return new config object.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher a message dispatcher service.
func (*Dispatcher) Deregister ¶
func (dp *Dispatcher) Deregister(subscribers ...*Subscriber)
Deregister deregister subscribers.
func (*Dispatcher) PutMessage ¶
func (dp *Dispatcher) PutMessage(msg Message)
PutMessage put new message to chan, then subscribers will be notified to process.
func (*Dispatcher) Register ¶
func (dp *Dispatcher) Register(subscribers ...*Subscriber)
Register register subscribers.
type NetConfig ¶
type NetConfig struct { Seed []string `yaml:"seed"` Listen []string `yaml:"listen"` NetworkId uint32 `yaml:"network_id"` PrivateKey string `yaml:"private_key"` StreamLimits uint32 `yaml:"stream_limits"` ReservedStreamLimits uint32 `yaml:"reserved_stream_limits"` RouteTableCacheFileName string `yaml:"route_table_cache_filename"` }
func GetNetConfig ¶
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node the node can be used as both the client and the server
func (*Node) AllPeerIds ¶
func (*Node) BroadcastMessage ¶
func (node *Node) BroadcastMessage(messageName string, data Serializable, priority int)
BroadcastMessage broadcast message.
func (*Node) BucketSize ¶
BucketSize return node routeTable's bucket size
func (*Node) RelayMessage ¶
func (node *Node) RelayMessage(messageName string, data Serializable, priority int)
RelayMessage relay message.
func (*Node) RemovePeer ¶
func (*Node) SendMessageToPeer ¶
func (node *Node) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error
SendMessageToPeer send message to a peer.
func (*Node) SendMessageToPeerOrBroadcast ¶
func (node *Node) SendMessageToPeerOrBroadcast(messageName string, messageContent Serializable, priority int, peerID string) error
SendMessageToPeer send message to a peer by peerId
func (*Node) SetcloudcardService ¶
func (node *Node) SetcloudcardService(ns *cloudcardService)
SetcloudcardService set netService
func (*Node) StreamManager ¶
func (node *Node) StreamManager() *StreamManager
StreamManager return node streamManager
func (*Node) Synchronized ¶
Synchronized return node synchronized status
type PeerFilterAlgorithm ¶
type PeerFilterAlgorithm interface {
Filter(PeersSlice) PeersSlice
}
PeerFilterAlgorithm is the algorithm used to filter peers
type RandomPeerFilter ¶
type RandomPeerFilter struct { }
RandomPeerFilter will filter a peer randomly
func (*RandomPeerFilter) Filter ¶
func (filter *RandomPeerFilter) Filter(peers PeersSlice) PeersSlice
Filter implemets PeerFilterAlgorithm interface
type RouteTable ¶
type RouteTable struct {
// contains filtered or unexported fields
}
RouteTable route table struct.
func NewRouteTable ¶
func NewRouteTable(config *Config, node *Node) *RouteTable
NewRouteTable new route table.
func (*RouteTable) AddIPFSPeerAddr ¶
func (table *RouteTable) AddIPFSPeerAddr(addr ma.Multiaddr)
AddIPFSPeerAddr add a peer to route table with ipfs address.
func (*RouteTable) AddPeer ¶
func (table *RouteTable) AddPeer(pid peer.ID, addr ma.Multiaddr)
AddPeer add peer to route table.
func (*RouteTable) AddPeerInfo ¶
func (table *RouteTable) AddPeerInfo(prettyID string, addrStr []string) error
AddPeerInfo add peer to route table.
func (*RouteTable) AddPeerStream ¶
func (table *RouteTable) AddPeerStream(s *Stream)
AddPeerStream add peer stream to peerStore.
func (*RouteTable) AddPeers ¶
func (table *RouteTable) AddPeers(pid string, peers *netpb.Peers)
AddPeers add peers to route table
func (*RouteTable) GetRandomPeers ¶
func (table *RouteTable) GetRandomPeers(pid peer.ID) []peer.AddrInfo
GetRandomPeers get random peers
func (*RouteTable) LoadInternalNodeList ¶
func (table *RouteTable) LoadInternalNodeList()
LoadInternalNodeList Load Internal Node list from file
func (*RouteTable) LoadRouteTableFromFile ¶
func (table *RouteTable) LoadRouteTableFromFile()
LoadRouteTableFromFile load route table from file.
func (*RouteTable) LoadSeedNodes ¶
func (table *RouteTable) LoadSeedNodes()
LoadSeedNodes load seed nodes.
func (*RouteTable) RemovePeerStream ¶
func (table *RouteTable) RemovePeerStream(s *Stream)
RemovePeerStream remove peerStream from peerStore.
func (*RouteTable) SaveRouteTableToFile ¶
func (table *RouteTable) SaveRouteTableToFile()
SaveRouteTableToFile save route table to file.
func (*RouteTable) SyncRouteTable ¶
func (table *RouteTable) SyncRouteTable()
SyncRouteTable sync route table.
func (*RouteTable) SyncWithPeer ¶
func (table *RouteTable) SyncWithPeer(pid peer.ID)
SyncWithPeer sync route table with a peer.
type Serializable ¶
Serializable model
type Service ¶
type Service interface { Start() error Stop() Node() *Node Register(...*Subscriber) Deregister(...*Subscriber) Broadcast(string, Serializable, int) Relay(string, Serializable, int) SendMessage(string, []byte, string, int) error SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error SendMessageToPeerOrBroadcast(messageName string, messageContent Serializable, priority int, peerID string) error ClosePeer(peerID string, reason error) }
Service net Service interface
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream define the structure of a stream in p2p network
func NewStreamFromPID ¶
NewStreamFromPID return a new Stream based on the pid
func (*Stream) IsConnected ¶
IsConnected return if the stream is connected
func (*Stream) IsHandshakeSucceed ¶
IsHandshakeSucceed return if the handshake in the stream succeed
func (*Stream) SendMessage ¶
SendMessage send msg to buffer
func (*Stream) SendProtoMessage ¶
SendProtoMessage send proto msg to buffer
func (*Stream) WriteMessage ¶
WriteMessage write raw msg in the stream
func (*Stream) WriteProtoMessage ¶
func (s *Stream) WriteProtoMessage(messageName string, pb proto.Message, reservedClientFlag byte) error
WriteProtoMessage write proto msg in the stream
func (*Stream) WritecloudcardMessage ¶
WritecloudcardMessage write cloudcard msg in the stream
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages all streams
func NewStreamManager ¶
func NewStreamManager(config *Config) *StreamManager
NewStreamManager return a new stream manager
func (*StreamManager) ActivePeersCount ¶
func (sm *StreamManager) ActivePeersCount() uint32
func (*StreamManager) Add ¶
func (sm *StreamManager) Add(s network.Stream, node *Node)
Add a new stream into the stream manager
func (*StreamManager) AddStream ¶
func (sm *StreamManager) AddStream(stream *Stream)
func (*StreamManager) BroadcastMessage ¶
func (sm *StreamManager) BroadcastMessage(messageName string, messageContent Serializable, priority int)
BroadcastMessage broadcast the message
func (*StreamManager) CloseStream ¶
func (sm *StreamManager) CloseStream(peerID string, reason error)
CloseStream with the given pid and reason
func (*StreamManager) Find ¶
func (sm *StreamManager) Find(pid peer.ID) *Stream
Find the stream with the given pid
func (*StreamManager) GetStreamByPeerID ¶
func (sm *StreamManager) GetStreamByPeerID(peerID string) *Stream
FindByPeerID find the stream with the given peerID
func (*StreamManager) RelayMessage ¶
func (sm *StreamManager) RelayMessage(messageName string, messageContent Serializable, priority int)
RelayMessage relay the message
func (*StreamManager) RemoveStream ¶
func (sm *StreamManager) RemoveStream(s *Stream)
RemoveStream from the stream manager
func (*StreamManager) SendMessageToPeers ¶
func (sm *StreamManager) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
SendMessageToPeers send the message to the peers filtered by the filter algorithm
type StreamValue ¶
type StreamValue struct {
// contains filtered or unexported fields
}
StreamValue value of stream in the past CleanupInterval
func (*StreamValue) String ¶
func (s *StreamValue) String() string
type StreamValueSlice ¶
type StreamValueSlice []*StreamValue
StreamValueSlice StreamValue slice
func (StreamValueSlice) Len ¶
func (s StreamValueSlice) Len() int
func (StreamValueSlice) Less ¶
func (s StreamValueSlice) Less(i, j int) bool
func (StreamValueSlice) Swap ¶
func (s StreamValueSlice) Swap(i, j int)
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber subscriber.
func NewSubscriber ¶
func NewSubscriber(id interface{}, msgChan chan Message, doFilter bool, msgType string, weight MessageWeight) *Subscriber
NewSubscriber return new Subscriber instance.
func (*Subscriber) MessageChan ¶
func (s *Subscriber) MessageChan() chan Message
MessageChan return msgChan.
func (*Subscriber) MessageType ¶
func (s *Subscriber) MessageType() string
MessageType return msgTypes.
func (*Subscriber) MessageWeight ¶
func (s *Subscriber) MessageWeight() MessageWeight
MessageWeight return weight of msgType