Documentation ¶
Index ¶
- Constants
- Variables
- func ByteSliceEqualBCE(a, b []byte) bool
- func CheckClientVersionCompatibility(v1, v2 string) bool
- func GenerateEd25519Key() (crypto.PrivKey, error)
- func HasKey(key string) bool
- func HasRecvMessage(s *Stream, hash uint32) bool
- func LoadNetworkKeyFromFile(path string) (crypto.PrivKey, error)
- func LoadNetworkKeyFromFileOrCreateNew(path string) (crypto.PrivKey, error)
- func MarshalNetworkKey(key crypto.PrivKey) (string, error)
- func ParseFromIPFSAddr(ipfsAddr ma.Multiaddr) (peer.ID, ma.Multiaddr, error)
- func RecordKey(key string)
- func RecordRecvMessage(s *Stream, hash uint32)
- func UnmarshalNetworkKey(data string) (crypto.PrivKey, error)
- type BaseMessage
- type ChainSyncPeersFilter
- type Config
- type Dispatcher
- type Message
- type MessageType
- type MessageWeight
- type NebMessage
- func (message *NebMessage) ChainID() uint32
- func (message *NebMessage) Content() []byte
- func (message *NebMessage) Data() ([]byte, error)
- func (message *NebMessage) DataCheckSum() uint32
- func (message *NebMessage) DataLength() uint32
- func (message *NebMessage) FlagSendMessageAt()
- func (message *NebMessage) FlagWriteMessageAt()
- func (message *NebMessage) HeaderCheckSum() uint32
- func (message *NebMessage) HeaderWithoutCheckSum() []byte
- func (message *NebMessage) LatencyFromSendToWrite() int64
- func (message *NebMessage) Length() uint64
- func (message *NebMessage) MagicNumber() []byte
- func (message *NebMessage) MessageName() string
- func (message *NebMessage) OriginalData() []byte
- func (message *NebMessage) ParseMessageData(data []byte) error
- func (message *NebMessage) Reserved() []byte
- func (message *NebMessage) VerifyData() error
- func (message *NebMessage) VerifyHeader() error
- func (message *NebMessage) Version() byte
- type NebService
- func (ns *NebService) Broadcast(name string, msg Serializable, priority int)
- func (ns *NebService) BroadcastNetworkID(msg []byte)
- func (ns *NebService) ClosePeer(peerID string, reason error)
- func (ns *NebService) Deregister(subscribers ...*Subscriber)
- func (ns *NebService) Node() *Node
- func (ns *NebService) PutMessage(msg Message)
- func (ns *NebService) Register(subscribers ...*Subscriber)
- func (ns *NebService) Relay(name string, msg Serializable, priority int)
- func (ns *NebService) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error
- func (ns *NebService) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
- func (ns *NebService) SendMsg(msgName string, msg []byte, target string, priority int) error
- func (ns *NebService) Start() error
- func (ns *NebService) Stop()
- type Neblet
- type Node
- func (node *Node) BroadcastMessage(messageName string, data Serializable, priority int)
- func (node *Node) Config() *Config
- func (node *Node) ID() string
- func (node *Node) IsSynchronizing() bool
- func (node *Node) PeersCount() int32
- func (node *Node) RelayMessage(messageName string, data Serializable, priority int)
- func (node *Node) RouteTable() *RouteTable
- func (node *Node) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error
- func (node *Node) SetNebService(ns *NebService)
- func (node *Node) SetSynchronizing(synchronizing bool)
- func (node *Node) Start() error
- func (node *Node) Stop()
- type PeerFilterAlgorithm
- type PeersSlice
- type RandomPeerFilter
- type RouteTable
- func (table *RouteTable) AddIPFSPeerAddr(addr ma.Multiaddr)
- func (table *RouteTable) AddPeer(pid peer.ID, addr ma.Multiaddr)
- func (table *RouteTable) AddPeerInfo(prettyID string, addrStr []string) error
- func (table *RouteTable) AddPeerStream(s *Stream)
- func (table *RouteTable) AddPeers(pid string, peers *netpb.Peers)
- func (table *RouteTable) GetRandomPeers(pid peer.ID) []peerstore.PeerInfo
- func (table *RouteTable) LoadInternalNodeList()
- func (table *RouteTable) LoadRouteTableFromFile()
- func (table *RouteTable) LoadSeedNodes()
- func (table *RouteTable) Peers() map[peer.ID][]ma.Multiaddr
- func (table *RouteTable) RemovePeerStream(s *Stream)
- func (table *RouteTable) SaveRouteTableToFile()
- func (table *RouteTable) Start()
- func (table *RouteTable) Stop()
- func (table *RouteTable) SyncRouteTable()
- func (table *RouteTable) SyncWithPeer(pid peer.ID)
- type Serializable
- type Service
- type Stream
- func (s *Stream) Bye()
- func (s *Stream) Connect() error
- func (s *Stream) Hello() error
- func (s *Stream) IsConnected() bool
- func (s *Stream) IsHandshakeSucceed() bool
- func (s *Stream) Ok() error
- func (s *Stream) RouteTable() error
- func (s *Stream) SendMessage(messageName string, data []byte, priority int) error
- func (s *Stream) SendProtoMessage(messageName string, pb proto.Message, priority int) error
- func (s *Stream) StartLoop()
- func (s *Stream) String() string
- func (s *Stream) SyncRoute() error
- func (s *Stream) Write(data []byte) error
- func (s *Stream) WriteMessage(messageName string, data []byte, reservedClientFlag byte) error
- func (s *Stream) WriteNebMessage(message *NebMessage) error
- func (s *Stream) WriteProtoMessage(messageName string, pb proto.Message, reservedClientFlag byte) error
- type StreamManager
- func (sm *StreamManager) Add(s libnet.Stream, node *Node)
- func (sm *StreamManager) AddStream(stream *Stream)
- func (sm *StreamManager) BroadcastMessage(messageName string, messageContent Serializable, priority int)
- func (sm *StreamManager) CloseStream(peerID string, reason error)
- func (sm *StreamManager) Count() int32
- func (sm *StreamManager) Find(pid peer.ID) *Stream
- func (sm *StreamManager) FindByPeerID(peerID string) *Stream
- func (sm *StreamManager) RelayMessage(messageName string, messageContent Serializable, priority int)
- func (sm *StreamManager) RemoveStream(s *Stream)
- func (sm *StreamManager) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
- func (sm *StreamManager) Start()
- func (sm *StreamManager) Stop()
- type StreamValue
- type StreamValueSlice
- type Subscriber
Constants ¶
const ( DefaultBucketCapacity = 64 DefaultRoutingTableMaxLatency = 10 DefaultPrivateKeyPath = "conf/network.key" DefaultMaxSyncNodes = 64 DefaultChainID = 1 DefaultRoutingTableDir = "" DefaultMaxStreamNum = 200 DefaultReservedStreamNum = 20 )
const
const (
	NebMessageMagicNumberEndIdx    = 4
	NebMessageChainIDEndIdx        = 8
	NebMessageReservedEndIdx       = 11
	NebMessageVersionIndex         = 11
	NebMessageVersionEndIdx        = 12
	NebMessageNameEndIdx           = 24
	NebMessageDataLengthEndIdx     = 28
	NebMessageDataCheckSumEndIdx   = 32
	NebMessageHeaderCheckSumEndIdx = 36
	NebMessageHeaderLength         = 36

	// Consider that a block is too large in sync.
	MaxNebMessageDataLength = 512 * 1024 * 1024 // 512m.
	MaxNebMessageNameLength = 24 - 12           // 12.

	DefaultReservedFlag           = 0x0
	ReservedCompressionEnableFlag = 0x80
	ReservedCompressionClientFlag = 0x1
)
NebMessage defines the message protocol used in Nebulas. We define our own wire protocol, as follows:
 0               1               2               3              (bytes)
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Magic Number                          |
+---------------------------------------------------------------+
|                           Chain ID                            |
+-----------------------------------------------+---------------+
|                   Reserved                    |    Version    |
+-----------------------------------------------+---------------+
|                                                               |
+                                                               +
|                         Message Name                          |
+                                                               +
|                                                               |
+---------------------------------------------------------------+
|                          Data Length                          |
+---------------------------------------------------------------+
|                         Data Checksum                         |
+---------------------------------------------------------------+
|                        Header Checksum                        |
+---------------------------------------------------------------+
|                                                               |
+                             Data                              +
.                                                               .
|                                                               |
+---------------------------------------------------------------+
const
const ( ClientVersion = "0.3.0" NebProtocolID = "/neb/1.0.0" HELLO = "hello" OK = "ok" BYE = "bye" SYNCROUTE = "syncroute" ROUTETABLE = "routetable" RECVEDMSG = "recvedmsg" CurrentVersion = 0x0 )
Stream Message Type
const ( MessagePriorityHigh = iota MessagePriorityNormal MessagePriorityLow )
Message Priority.
const (
	ChunkHeadersRequest  = "sync"      // ChainSync
	ChunkHeadersResponse = "chunks"    // ChainChunks
	ChunkDataRequest     = "getchunk"  // ChainGetChunk
	ChunkDataResponse    = "chunkdata" // ChainChunkData
)
Sync Message Type
const ( MessageWeightZero = MessageWeight(0) MessageWeightNewTx MessageWeightNewBlock = MessageWeight(0.5) MessageWeightRouteTable MessageWeightChainChunks MessageWeightChainChunkData )
const
const (
CleanupInterval = time.Second * 60
)
const
Variables ¶
var ( DefaultListen = []string{"0.0.0.0:8680"} RouteTableSyncLoopInterval = 30 * time.Second RouteTableSaveToDiskInterval = 3 * 60 * time.Second RouteTableCacheFileName = "routetable.cache" RouteTableInternalNodeFileName = "conf/internal_list.txt" MaxPeersCountForSyncResp = 32 )
Default Configuration in P2P network
var ( MagicNumber = []byte{0x4e, 0x45, 0x42, 0x31} DefaultReserved = []byte{DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag} CurrentReserved = []byte{DefaultReservedFlag | ReservedCompressionEnableFlag, DefaultReservedFlag, DefaultReservedFlag} ErrInsufficientMessageHeaderLength = errors.New("insufficient message header length") ErrInsufficientMessageDataLength = errors.New("insufficient message data length") ErrInvalidMagicNumber = errors.New("invalid magic number") ErrInvalidHeaderCheckSum = errors.New("invalid header checksum") ErrInvalidDataCheckSum = errors.New("invalid data checksum") ErrExceedMaxDataLength = errors.New("exceed max data length") ErrExceedMaxMessageNameLength = errors.New("exceed max message name length") ErrUncompressMessageFailed = errors.New("uncompress message failed") )
Error types
var ( ErrShouldCloseConnectionAndExitLoop = errors.New("should close connection and exit loop") ErrStreamIsNotConnected = errors.New("stream is not connected") )
Stream Errors
var ( ErrExceedMaxStreamNum = errors.New("too many streams connected") ErrElimination = errors.New("eliminated for low value") ErrDeprecatedStream = errors.New("deprecated stream") )
var
var ( ErrListenPortIsNotAvailable = errors.New("listen port is not available") ErrConfigLackNetWork = errors.New("config.conf should has network") )
Errors
var (
ErrExceedMaxSyncRouteResponse = errors.New("too many sync route table response")
)
Route Table Errors
var (
ErrPeerIsNotConnected = errors.New("peer is not connected")
)
Error types
var (
ErrPeersIsNotEnough = errors.New("peers is not enough")
)
Sync Errors
Functions ¶
func ByteSliceEqualBCE ¶ added in v1.0.2
ByteSliceEqualBCE determines whether two byte arrays are equal.
func CheckClientVersionCompatibility ¶ added in v1.0.0
CheckClientVersionCompatibility checks whether two clients are compatible. If the client version of node A is X.Y.Z, then the version of node B must be X.Y.{} (the same X and Y, with any third component) to be compatible with A.
func GenerateEd25519Key ¶ added in v1.0.0
GenerateEd25519Key returns a newly generated Ed25519 private key.
func HasRecvMessage ¶ added in v1.0.0
HasRecvMessage checks whether the received message has been seen before.
func LoadNetworkKeyFromFile ¶ added in v1.0.0
LoadNetworkKeyFromFile load network priv key from file.
func LoadNetworkKeyFromFileOrCreateNew ¶ added in v1.0.0
LoadNetworkKeyFromFileOrCreateNew load network priv key from file or create new one.
func MarshalNetworkKey ¶ added in v1.0.0
MarshalNetworkKey marshal network key.
func ParseFromIPFSAddr ¶ added in v1.0.0
ParseFromIPFSAddr return pid and address parsed from ipfs address
func RecordRecvMessage ¶ added in v1.0.0
RecordRecvMessage records received message
Types ¶
type BaseMessage ¶ added in v1.0.0
type BaseMessage struct {
// contains filtered or unexported fields
}
BaseMessage base message
func (*BaseMessage) Data ¶ added in v1.0.0
func (msg *BaseMessage) Data() []byte
Data get the message data
func (*BaseMessage) Hash ¶ added in v1.0.0
func (msg *BaseMessage) Hash() string
Hash return the message hash
func (*BaseMessage) MessageFrom ¶ added in v1.0.0
func (msg *BaseMessage) MessageFrom() string
MessageFrom returns the sender of the message.
func (*BaseMessage) MessageType ¶ added in v1.0.0
func (msg *BaseMessage) MessageType() string
MessageType get message type
func (*BaseMessage) String ¶ added in v1.0.0
func (msg *BaseMessage) String() string
String returns the message as a string.
type ChainSyncPeersFilter ¶ added in v1.0.0
type ChainSyncPeersFilter struct { }
ChainSyncPeersFilter will filter some peers randomly
func (*ChainSyncPeersFilter) Filter ¶ added in v1.0.0
func (filter *ChainSyncPeersFilter) Filter(peers PeersSlice) PeersSlice
Filter implements the PeerFilterAlgorithm interface.
type Config ¶ added in v1.0.0
type Config struct { Bucketsize int Latency time.Duration BootNodes []multiaddr.Multiaddr PrivateKeyPath string Listen []string MaxSyncNodes int ChainID uint32 RoutingTableDir string StreamLimits int32 ReservedStreamLimits int32 }
Config TODO: move to proto config.
func NewConfigFromDefaults ¶ added in v1.0.0
func NewConfigFromDefaults() *Config
NewConfigFromDefaults return new config from defaults.
func NewP2PConfig ¶ added in v1.0.0
NewP2PConfig return new config object.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher a message dispatcher service.
func (*Dispatcher) Deregister ¶
func (dp *Dispatcher) Deregister(subscribers ...*Subscriber)
Deregister deregister subscribers.
func (*Dispatcher) PutMessage ¶
func (dp *Dispatcher) PutMessage(msg Message)
PutMessage put new message to chan, then subscribers will be notified to process.
func (*Dispatcher) Register ¶
func (dp *Dispatcher) Register(subscribers ...*Subscriber)
Register register subscribers.
type NebMessage ¶ added in v1.0.0
type NebMessage struct {
// contains filtered or unexported fields
}
NebMessage struct
func NewNebMessage ¶ added in v1.0.0
func NewNebMessage(chainID uint32, reserved []byte, version byte, messageName string, data []byte) (*NebMessage, error)
NewNebMessage new neb message
func ParseNebMessage ¶ added in v1.0.0
func ParseNebMessage(data []byte) (*NebMessage, error)
ParseNebMessage parse neb message
func (*NebMessage) ChainID ¶ added in v1.0.0
func (message *NebMessage) ChainID() uint32
ChainID return chainID
func (*NebMessage) Content ¶ added in v1.0.0
func (message *NebMessage) Content() []byte
Content return message content
func (*NebMessage) Data ¶ added in v1.0.0
func (message *NebMessage) Data() ([]byte, error)
Data return data
func (*NebMessage) DataCheckSum ¶ added in v1.0.0
func (message *NebMessage) DataCheckSum() uint32
DataCheckSum return data checkSum
func (*NebMessage) DataLength ¶ added in v1.0.0
func (message *NebMessage) DataLength() uint32
DataLength return dataLength
func (*NebMessage) FlagSendMessageAt ¶ added in v1.0.0
func (message *NebMessage) FlagSendMessageAt()
FlagSendMessageAt flag of send message time
func (*NebMessage) FlagWriteMessageAt ¶ added in v1.0.0
func (message *NebMessage) FlagWriteMessageAt()
FlagWriteMessageAt flag of write message time
func (*NebMessage) HeaderCheckSum ¶ added in v1.0.0
func (message *NebMessage) HeaderCheckSum() uint32
HeaderCheckSum return header checkSum
func (*NebMessage) HeaderWithoutCheckSum ¶ added in v1.0.0
func (message *NebMessage) HeaderWithoutCheckSum() []byte
HeaderWithoutCheckSum return header without checkSum
func (*NebMessage) LatencyFromSendToWrite ¶ added in v1.0.0
func (message *NebMessage) LatencyFromSendToWrite() int64
LatencyFromSendToWrite latency from sendMessage to writeMessage
func (*NebMessage) Length ¶ added in v1.0.0
func (message *NebMessage) Length() uint64
Length return message Length
func (*NebMessage) MagicNumber ¶ added in v1.0.0
func (message *NebMessage) MagicNumber() []byte
MagicNumber return magicNumber
func (*NebMessage) MessageName ¶ added in v1.0.0
func (message *NebMessage) MessageName() string
MessageName return message name
func (*NebMessage) OriginalData ¶ added in v1.0.2
func (message *NebMessage) OriginalData() []byte
OriginalData return original data
func (*NebMessage) ParseMessageData ¶ added in v1.0.0
func (message *NebMessage) ParseMessageData(data []byte) error
ParseMessageData parse neb message data
func (*NebMessage) Reserved ¶ added in v1.0.0
func (message *NebMessage) Reserved() []byte
Reserved return reserved
func (*NebMessage) VerifyData ¶ added in v1.0.0
func (message *NebMessage) VerifyData() error
VerifyData verify message data
func (*NebMessage) VerifyHeader ¶ added in v1.0.0
func (message *NebMessage) VerifyHeader() error
VerifyHeader verify message header
func (*NebMessage) Version ¶ added in v1.0.0
func (message *NebMessage) Version() byte
Version return version
type NebService ¶ added in v1.0.0
type NebService struct {
// contains filtered or unexported fields
}
NebService service for nebulas p2p network
func NewNebService ¶ added in v1.0.0
func NewNebService(n Neblet) (*NebService, error)
NewNebService creates a NebService.
func (*NebService) Broadcast ¶ added in v1.0.0
func (ns *NebService) Broadcast(name string, msg Serializable, priority int)
Broadcast message.
func (*NebService) BroadcastNetworkID ¶ added in v1.0.0
func (ns *NebService) BroadcastNetworkID(msg []byte)
BroadcastNetworkID broadcast networkID when changed.
func (*NebService) ClosePeer ¶ added in v1.0.0
func (ns *NebService) ClosePeer(peerID string, reason error)
ClosePeer close the stream to a peer.
func (*NebService) Deregister ¶ added in v1.0.0
func (ns *NebService) Deregister(subscribers ...*Subscriber)
Deregister deregisters the subscribers.
func (*NebService) Node ¶ added in v1.0.0
func (ns *NebService) Node() *Node
Node return the peer node
func (*NebService) PutMessage ¶ added in v1.0.0
func (ns *NebService) PutMessage(msg Message)
PutMessage put message to dispatcher.
func (*NebService) Register ¶ added in v1.0.0
func (ns *NebService) Register(subscribers ...*Subscriber)
Register register the subscribers.
func (*NebService) Relay ¶ added in v1.0.0
func (ns *NebService) Relay(name string, msg Serializable, priority int)
Relay message.
func (*NebService) SendMessageToPeer ¶ added in v1.0.0
func (ns *NebService) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error
SendMessageToPeer send message to a peer.
func (*NebService) SendMessageToPeers ¶ added in v1.0.0
func (ns *NebService) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
SendMessageToPeers send message to peers.
func (*NebService) Start ¶ added in v1.0.0
func (ns *NebService) Start() error
Start start p2p manager.
type Neblet ¶ added in v1.0.0
type Neblet interface {
Config() *nebletpb.Config
}
Neblet interface breaks cycle import dependency.
type Node ¶ added in v1.0.0
type Node struct {
// contains filtered or unexported fields
}
Node the node can be used as both the client and the server
func (*Node) BroadcastMessage ¶ added in v1.0.0
func (node *Node) BroadcastMessage(messageName string, data Serializable, priority int)
BroadcastMessage broadcast message.
func (*Node) IsSynchronizing ¶ added in v1.0.0
IsSynchronizing return node synchronizing
func (*Node) PeersCount ¶ added in v1.0.0
PeersCount return stream count.
func (*Node) RelayMessage ¶ added in v1.0.0
func (node *Node) RelayMessage(messageName string, data Serializable, priority int)
RelayMessage relay message.
func (*Node) RouteTable ¶ added in v1.0.0
func (node *Node) RouteTable() *RouteTable
RouteTable return route table.
func (*Node) SendMessageToPeer ¶ added in v1.0.0
func (node *Node) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error
SendMessageToPeer send message to a peer.
func (*Node) SetNebService ¶ added in v1.0.0
func (node *Node) SetNebService(ns *NebService)
SetNebService sets the NebService.
func (*Node) SetSynchronizing ¶ added in v1.0.0
SetSynchronizing set node synchronizing.
type PeerFilterAlgorithm ¶ added in v1.0.0
type PeerFilterAlgorithm interface {
Filter(PeersSlice) PeersSlice
}
PeerFilterAlgorithm is the algorithm used to filter peers
type PeersSlice ¶ added in v1.0.0
type PeersSlice []interface{}
PeersSlice is a slice which contains peers
type RandomPeerFilter ¶ added in v1.0.0
type RandomPeerFilter struct { }
RandomPeerFilter will filter a peer randomly
func (*RandomPeerFilter) Filter ¶ added in v1.0.0
func (filter *RandomPeerFilter) Filter(peers PeersSlice) PeersSlice
Filter implements the PeerFilterAlgorithm interface.
type RouteTable ¶ added in v1.0.0
type RouteTable struct {
// contains filtered or unexported fields
}
RouteTable route table struct.
func NewRouteTable ¶ added in v1.0.0
func NewRouteTable(config *Config, node *Node) *RouteTable
NewRouteTable new route table.
func (*RouteTable) AddIPFSPeerAddr ¶ added in v1.0.0
func (table *RouteTable) AddIPFSPeerAddr(addr ma.Multiaddr)
AddIPFSPeerAddr add a peer to route table with ipfs address.
func (*RouteTable) AddPeer ¶ added in v1.0.0
func (table *RouteTable) AddPeer(pid peer.ID, addr ma.Multiaddr)
AddPeer add peer to route table.
func (*RouteTable) AddPeerInfo ¶ added in v1.0.0
func (table *RouteTable) AddPeerInfo(prettyID string, addrStr []string) error
AddPeerInfo add peer to route table.
func (*RouteTable) AddPeerStream ¶ added in v1.0.0
func (table *RouteTable) AddPeerStream(s *Stream)
AddPeerStream add peer stream to peerStore.
func (*RouteTable) AddPeers ¶ added in v1.0.0
func (table *RouteTable) AddPeers(pid string, peers *netpb.Peers)
AddPeers add peers to route table
func (*RouteTable) GetRandomPeers ¶ added in v1.0.0
func (table *RouteTable) GetRandomPeers(pid peer.ID) []peerstore.PeerInfo
GetRandomPeers get random peers
func (*RouteTable) LoadInternalNodeList ¶ added in v1.0.1
func (table *RouteTable) LoadInternalNodeList()
LoadInternalNodeList Load Internal Node list from file
func (*RouteTable) LoadRouteTableFromFile ¶ added in v1.0.0
func (table *RouteTable) LoadRouteTableFromFile()
LoadRouteTableFromFile load route table from file.
func (*RouteTable) LoadSeedNodes ¶ added in v1.0.0
func (table *RouteTable) LoadSeedNodes()
LoadSeedNodes load seed nodes.
func (*RouteTable) Peers ¶ added in v1.0.0
func (table *RouteTable) Peers() map[peer.ID][]ma.Multiaddr
Peers return peers in route table.
func (*RouteTable) RemovePeerStream ¶ added in v1.0.0
func (table *RouteTable) RemovePeerStream(s *Stream)
RemovePeerStream remove peerStream from peerStore.
func (*RouteTable) SaveRouteTableToFile ¶ added in v1.0.0
func (table *RouteTable) SaveRouteTableToFile()
SaveRouteTableToFile save route table to file.
func (*RouteTable) Start ¶ added in v1.0.0
func (table *RouteTable) Start()
Start start route table syncLoop.
func (*RouteTable) Stop ¶ added in v1.0.0
func (table *RouteTable) Stop()
Stop quit route table syncLoop.
func (*RouteTable) SyncRouteTable ¶ added in v1.0.0
func (table *RouteTable) SyncRouteTable()
SyncRouteTable sync route table.
func (*RouteTable) SyncWithPeer ¶ added in v1.0.0
func (table *RouteTable) SyncWithPeer(pid peer.ID)
SyncWithPeer sync route table with a peer.
type Serializable ¶
Serializable model
type Service ¶ added in v1.0.0
type Service interface { Start() error Stop() Node() *Node Register(...*Subscriber) Deregister(...*Subscriber) Broadcast(string, Serializable, int) Relay(string, Serializable, int) SendMsg(string, []byte, string, int) error SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error ClosePeer(peerID string, reason error) BroadcastNetworkID([]byte) }
Service net Service interface
type Stream ¶ added in v1.0.0
type Stream struct {
// contains filtered or unexported fields
}
Stream define the structure of a stream in p2p network
func NewStreamFromPID ¶ added in v1.0.0
NewStreamFromPID return a new Stream based on the pid
func (*Stream) IsConnected ¶ added in v1.0.0
IsConnected return if the stream is connected
func (*Stream) IsHandshakeSucceed ¶ added in v1.0.0
IsHandshakeSucceed return if the handshake in the stream succeed
func (*Stream) RouteTable ¶ added in v1.0.0
RouteTable send sync table request
func (*Stream) SendMessage ¶ added in v1.0.0
SendMessage send msg to buffer
func (*Stream) SendProtoMessage ¶ added in v1.0.0
SendProtoMessage send proto msg to buffer
func (*Stream) StartLoop ¶ added in v1.0.0
func (s *Stream) StartLoop()
StartLoop start stream handling loop.
func (*Stream) WriteMessage ¶ added in v1.0.0
WriteMessage write raw msg in the stream
func (*Stream) WriteNebMessage ¶ added in v1.0.0
func (s *Stream) WriteNebMessage(message *NebMessage) error
WriteNebMessage write neb msg in the stream
type StreamManager ¶ added in v1.0.0
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages all streams
func NewStreamManager ¶ added in v1.0.0
func NewStreamManager(config *Config) *StreamManager
NewStreamManager return a new stream manager
func (*StreamManager) Add ¶ added in v1.0.0
func (sm *StreamManager) Add(s libnet.Stream, node *Node)
Add a new stream into the stream manager
func (*StreamManager) AddStream ¶ added in v1.0.0
func (sm *StreamManager) AddStream(stream *Stream)
AddStream into the stream manager
func (*StreamManager) BroadcastMessage ¶ added in v1.0.0
func (sm *StreamManager) BroadcastMessage(messageName string, messageContent Serializable, priority int)
BroadcastMessage broadcast the message
func (*StreamManager) CloseStream ¶ added in v1.0.0
func (sm *StreamManager) CloseStream(peerID string, reason error)
CloseStream with the given pid and reason
func (*StreamManager) Count ¶ added in v1.0.0
func (sm *StreamManager) Count() int32
Count return active peers count in the stream manager
func (*StreamManager) Find ¶ added in v1.0.0
func (sm *StreamManager) Find(pid peer.ID) *Stream
Find the stream with the given pid
func (*StreamManager) FindByPeerID ¶ added in v1.0.0
func (sm *StreamManager) FindByPeerID(peerID string) *Stream
FindByPeerID find the stream with the given peerID
func (*StreamManager) RelayMessage ¶ added in v1.0.0
func (sm *StreamManager) RelayMessage(messageName string, messageContent Serializable, priority int)
RelayMessage relay the message
func (*StreamManager) RemoveStream ¶ added in v1.0.0
func (sm *StreamManager) RemoveStream(s *Stream)
RemoveStream from the stream manager
func (*StreamManager) SendMessageToPeers ¶ added in v1.0.0
func (sm *StreamManager) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
SendMessageToPeers send the message to the peers filtered by the filter algorithm
func (*StreamManager) Start ¶ added in v1.0.0
func (sm *StreamManager) Start()
Start stream manager service
func (*StreamManager) Stop ¶ added in v1.0.0
func (sm *StreamManager) Stop()
Stop stream manager service
type StreamValue ¶ added in v1.0.0
type StreamValue struct {
// contains filtered or unexported fields
}
StreamValue value of stream in the past CleanupInterval
func (*StreamValue) String ¶ added in v1.0.0
func (s *StreamValue) String() string
type StreamValueSlice ¶ added in v1.0.0
type StreamValueSlice []*StreamValue
StreamValueSlice StreamValue slice
func (StreamValueSlice) Len ¶ added in v1.0.0
func (s StreamValueSlice) Len() int
func (StreamValueSlice) Less ¶ added in v1.0.0
func (s StreamValueSlice) Less(i, j int) bool
func (StreamValueSlice) Swap ¶ added in v1.0.0
func (s StreamValueSlice) Swap(i, j int)
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber subscriber.
func NewSubscriber ¶
func NewSubscriber(id interface{}, msgChan chan Message, doFilter bool, msgType string, weight MessageWeight) *Subscriber
NewSubscriber return new Subscriber instance.
func (*Subscriber) DoFilter ¶ added in v1.0.0
func (s *Subscriber) DoFilter() bool
DoFilter return doFilter
func (*Subscriber) MessageChan ¶
func (s *Subscriber) MessageChan() chan Message
MessageChan return msgChan.
func (*Subscriber) MessageType ¶
func (s *Subscriber) MessageType() string
MessageType return msgTypes.
func (*Subscriber) MessageWeight ¶ added in v1.0.0
func (s *Subscriber) MessageWeight() MessageWeight
MessageWeight return weight of msgType