network

package
v0.0.0-...-03c5838 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2020 License: LGPL-3.0 Imports: 48 Imported by: 0

Documentation

Index

Constants

View Source
// Message size limits and reserved-byte flag values.
const (

	// Consider that a block is too large in sync.
	// MaxcloudcardMessageDataLength caps message data at 512 * 1024 * 1024
	// (presumably a byte count — confirm against the message parser).
	MaxcloudcardMessageDataLength = 512 * 1024 * 1024 // 512m.
	// MaxcloudcardMessageNameLength caps the message name at 12
	// (presumably a byte count — confirm).
	MaxcloudcardMessageNameLength = 24 - 12           // 12.

	// DefaultReservedFlag is the zero flag value; every byte of
	// DefaultReserved is initialised from it.
	DefaultReservedFlag           = 0x0
	// ReservedCompressionEnableFlag (0x80) is OR-ed into the last byte of
	// CompressionReserved.
	ReservedCompressionEnableFlag = 0x80
	// ReservedCompressionClientFlag (0x40).
	// NOTE(review): its consumer is not visible here; presumably a
	// client-side compression bit in the reserved bytes — confirm.
	ReservedCompressionClientFlag = 0x40
)
View Source
// Initial values for the P2P network configuration, plus shared durations.
// NOTE(review): the Init* values are presumably the defaults applied by
// NewConfigFromDefaults — confirm.
const (
	InitBucketCapacity         = 64
	// NOTE(review): the unit of this latency value is not visible here — confirm.
	InitRoutingTableMaxLatency = 10
	InitPrivateKeyPath         = "conf/network/key"
	InitMaxSyncNodes           = 64
	InitChainID                = 1
	InitMaxStreamNum           = 210
	InitReservedStreamNum      = 20
	// Network identifiers. NOTE(review): presumably paired with MagicNumber
	// and RTNTMagicNumber respectively — confirm.
	MajorNetworkID             = 1
	RTNTNetworkID              = 2
	// Thirty (30s) and ThreeMinutes (3m) are time.Duration constants;
	// RouteTableSyncLoopInterval and RouteTableSaveToDiskInterval are
	// initialised from them.
	Thirty                     = 30 * time.Second
	ThreeMinutes               = 3 * 60 * time.Second
	// Network is the string "network".
	// NOTE(review): presumably a configuration section key — confirm.
	Network                    = "network"
)

Initial network configuration values and shared time intervals.

View Source
// Client version and the names of stream-level messages.
const (
	// ClientVersion is this client's version string.
	// NOTE(review): presumably the value compared by
	// CheckClientVersionCompatibility — confirm.
	ClientVersion = "0.1.0"

	// Stream message names.
	HELLO          = "hello"
	OK             = "ok"
	BYE            = "bye"
	SYNCROUTE      = "syncroute"
	ROUTETABLE     = "routetable"
	// CurrentVersion is 0x0.
	// NOTE(review): presumably the message format version byte — confirm.
	CurrentVersion = 0x0
)

Stream Message Type

View Source
// Message priorities. iota numbers them in declaration order:
// MessagePriorityHigh = 0, MessagePriorityNormal = 1, MessagePriorityLow = 2.
// Reordering the entries changes their values.
const (
	MessagePriorityHigh = iota
	MessagePriorityNormal
	MessagePriorityLow
)

Message Priority.

View Source
// Message type names for chain sync, propose learning, and consensus traffic.
const (
	// Chain sync requests and responses.
	ChunkHeadersRequest  = "sync"       // ChainSync
	ChunkHeadersResponse = "chunks"     // ChainChunks
	ChunkDataRequest     = "getchunk"   // ChainGetChunk
	ChunkDataResponse    = "chunkdata"  // ChainChunkData
	ProposeLearnRequest  = "getpropose" //ProposeLearnRequest
	ProposeLearnResponse = "propose"    //ProposeLearnResponse

	// StandBy is the "standby" message name.
	StandBy = "standby"

	// paxos
	ProposalID    = "proposalID"
	Promise       = "promise"
	Accept        = "accept"
	Accepted      = "accepted"
	LatestPropose = "newPropose"

	// pbft
	PbftPreprepare = "preprepare"
	PbftPrepare    = "prepare"
	PbftCommit     = "commit"
)

Sync Message Type

View Source
// Message weights.
//
// Entries without an explicit value repeat the preceding expression, so
// MessageWeightNewTx equals MessageWeight(0) and every entry after
// MessageWeightNewBlock equals MessageWeight(0.5). No iota is involved, so
// the values do not increase down the list.
// NOTE(review): confirm that these duplicated weights are intended.
const (
	MessageWeightZero = MessageWeight(0)
	// Repeats MessageWeight(0).
	MessageWeightNewTx
	MessageWeightNewBlock = MessageWeight(0.5)
	// The remaining entries all repeat MessageWeight(0.5).
	MessageWeightRouteTable
	MessageWeightChainChunks
	MessageWeightChainChunkData
	MessageWeightPropose

	MessageWeightStandby
)

Message weights.

View Source
const (
	// CleanupInterval is a 60-second time.Duration. The StreamValue
	// documentation describes a stream's value over the past CleanupInterval.
	// NOTE(review): presumably also the period of the stream cleanup pass — confirm.
	CleanupInterval = time.Second * 60
)
View Source
// MemberSize is a typed int constant equal to 3.
// NOTE(review): its use is not visible here; presumably the number of
// members returned by Node.Members — confirm.
const MemberSize int = 3

Variables

View Source
// Wire-format byte patterns and the errors named for message validation failures.
var (
	// MagicNumber is the ASCII bytes "CCMN".
	MagicNumber         = []byte{0x43, 0x43, 0x4D, 0x4E}
	// RTNTMagicNumber is the ASCII bytes "CCTN".
	RTNTMagicNumber     = []byte{0x43, 0x43, 0x54, 0x4E}
	// DefaultReserved is four bytes, each set to DefaultReservedFlag.
	DefaultReserved     = []byte{DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag}
	// CompressionReserved matches DefaultReserved except that its last byte
	// also has ReservedCompressionEnableFlag set.
	CompressionReserved = []byte{DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag | ReservedCompressionEnableFlag}

	// Sentinel errors.
	// NOTE(review): presumably returned by NewcloudcardMessage and
	// ParsecloudcardMessage — confirm against their implementations.
	ErrInsufficientMessageHeaderLength = errors.New("insufficient message header length")
	ErrInsufficientMessageDataLength   = errors.New("insufficient message data length")
	ErrInvalidMagicNumber              = errors.New("invalid magic number")
	ErrInvalidHeaderCheckSum           = errors.New("invalid header checksum")
	ErrInvalidDataCheckSum             = errors.New("invalid data checksum")
	ErrExceedMaxDataLength             = errors.New("exceed max data length")
	ErrExceedMaxMessageNameLength      = errors.New("exceed max message name length")
	ErrUncompressMessageFailed         = errors.New("uncompress message failed")
	ErrInvalidNetworkID                = errors.New("invalid network id")
)
View Source
// Default P2P settings. These are variables, not constants, so they can be
// reassigned at runtime.
var (
	// DefaultListen holds a single listen address, "0.0.0.0:8888".
	DefaultListen = []string{"0.0.0.0:8888"}

	// Route table intervals, initialised from Thirty and ThreeMinutes, and
	// the route table file names.
	RouteTableSyncLoopInterval     = Thirty
	RouteTableSaveToDiskInterval   = ThreeMinutes
	RouteTableCacheFileName        = "routetable.cache"
	RouteTableInternalNodeFileName = "conf/internal.txt"

	// MaxPeersCountForSyncResp is 32.
	// NOTE(review): presumably the cap on peers in one route-sync response — confirm.
	MaxPeersCountForSyncResp = 32
)

Default Configuration in P2P network

View Source
// Sentinel errors for stream handling.
var (
	// NOTE(review): presumably signals the stream loop to close and exit — confirm.
	ErrShouldCloseConnectionAndExitLoop = errors.New("should close connection and exit loop")
	ErrStreamIsNotConnected             = errors.New("stream is not connected")
)

Stream Errors

View Source
// Sentinel errors named for stream-count limits and stream removal.
// NOTE(review): presumably passed as the reason when the stream manager
// closes a stream — confirm.
var (
	ErrExceedMaxStreamNum = errors.New("too many streams connected")
	ErrElimination        = errors.New("eliminated for low value")
	ErrDeprecatedStream   = errors.New("deprecated stream")
)
View Source
// Sentinel errors named for listen-port and configuration problems.
var (
	ErrListenPortIsNotAvailable = errors.New("listen port is not available")
	// NOTE(review): presumably reported when the configuration has no
	// network section — confirm.
	ErrConfigLackNetWork        = errors.New("config.conf should has network")
)

Errors

View Source
var (
	// ErrExceedMaxSyncRouteResponse is the sentinel error with message
	// "too many sync route table response".
	ErrExceedMaxSyncRouteResponse = errors.New("too many sync route table response")
)

Route Table Errors

View Source
var (
	// ErrPeerIsNotConnected is the sentinel error with message
	// "peer is not connected".
	ErrPeerIsNotConnected = errors.New("peer is not connected")
)

Error types

Functions

func ByteSliceEqualBCE

func ByteSliceEqualBCE(a, b []byte) bool

ByteSliceEqualBCE reports whether two byte slices are equal.

func CheckClientVersionCompatibility

func CheckClientVersionCompatibility(v1, v2 string) bool

CheckClientVersionCompatibility reports whether two clients are compatible. If the clientVersion of node A is X.Y.Z, then node B must be X.Y.{} to be compatible with A.

func CreateEd25519Key

func CreateEd25519Key() (crypto.PrivKey, error)

CreateEd25519Key returns a newly generated Ed25519 private key.

func GenUpgrader

func GenUpgrader(n *swarm.Swarm) *tptu.Upgrader

GenUpgrader creates a new connection upgrader for use with this swarm.

func HasKey

func HasKey(key string) bool

HasKey use bloom filter to check if the key exists quickly

func HasRecvMessage

func HasRecvMessage(s *Stream, hash uint32) bool

HasRecvMessage check if the received message exists before

func LoadNetworkKeyFromFile

func LoadNetworkKeyFromFile(path string) (crypto.PrivKey, error)

LoadNetworkKeyFromFile load network priv key from file.

func LoadNetworkKeyFromFileOrCreateNew

func LoadNetworkKeyFromFileOrCreateNew(path string) (crypto.PrivKey, error)

LoadNetworkKeyFromFileOrCreateNew load network priv key from file or create new one.

func MarshalNetworkKey

func MarshalNetworkKey(key crypto.PrivKey) (string, error)

MarshalNetworkKey marshal network key.

func NewcloudcardMessage

func NewcloudcardMessage(networkID uint32, chainID uint32, reserved []byte, version byte, messageName string, data []byte) (*cloudcardMessage, error)

NewcloudcardMessage new cloudcard message

func NewcloudcardService

func NewcloudcardService(conf *config.Config) (*cloudcardService, error)

NewcloudcardService create netService

func ParseFromIPFSAddr

func ParseFromIPFSAddr(ipfsAddr ma.Multiaddr) (peer.ID, ma.Multiaddr, error)

ParseFromIPFSAddr return pid and address parsed from ipfs address

func ParsecloudcardMessage

func ParsecloudcardMessage(networkID uint32, data []byte) (*cloudcardMessage, error)

ParsecloudcardMessage parse cloudcard message

func RecordKey

func RecordKey(key string)

RecordKey add key to bloom filter.

func RecordRecvMessage

func RecordRecvMessage(s *Stream, hash uint32)

RecordRecvMessage records received message

func SetNetConfig

func SetNetConfig(conf *config.Config, netCfg *NetConfig)

func UnmarshalNetworkKey

func UnmarshalNetworkKey(data string) (crypto.PrivKey, error)

UnmarshalNetworkKey unmarshal network key.

Types

type BaseMessage

type BaseMessage struct {
	// contains filtered or unexported fields
}

BaseMessage base message

func (*BaseMessage) Data

func (msg *BaseMessage) Data() []byte

Data get the message data

func (*BaseMessage) Hash

func (msg *BaseMessage) Hash() string

Hash return the message hash

func (*BaseMessage) MessageFrom

func (msg *BaseMessage) MessageFrom() string

MessageFrom returns the sender of the message.

func (*BaseMessage) MessageType

func (msg *BaseMessage) MessageType() string

MessageType get message type

type ChainSyncPeersFilter

// ChainSyncPeersFilter is a field-less type whose Filter method has the
// signature required by PeerFilterAlgorithm.
type ChainSyncPeersFilter struct {
}

ChainSyncPeersFilter will filter some peers randomly

func (*ChainSyncPeersFilter) Filter

func (filter *ChainSyncPeersFilter) Filter(peers PeersSlice) PeersSlice

Filter implements the PeerFilterAlgorithm interface.

type Config

// Config holds the settings for a P2P node; NewNode, NewRouteTable and
// NewStreamManager each take a *Config.
// NOTE(review): the field notes below are inferred from names and from the
// Init* constants — confirm against NewConfigFromDefaults and NewP2PConfig.
type Config struct {
	NetworkID            uint32
	// presumably the route table bucket size (see Node.BucketSize) — confirm
	BucketSize           int
	Latency              time.Duration
	BootNodes            []multiaddr.Multiaddr
	PrivateKeyPath       string
	Listen               []string
	MaxSyncNodes         int
	ChainID              uint32
	RoutingTableDir      string
	// presumably the maximum number of streams and how many are reserved — confirm
	StreamLimits         uint32
	ReservedStreamLimits uint32
}

func NewConfigFromDefaults

func NewConfigFromDefaults() *Config

NewConfigFromDefaults return new config from defaults.

func NewP2PConfig

func NewP2PConfig(cfg *config.Config) *Config

NewP2PConfig return new config object.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher a message dispatcher service.

func NewDispatcher

func NewDispatcher() *Dispatcher

NewDispatcher create Dispatcher instance.

func (*Dispatcher) Deregister

func (dp *Dispatcher) Deregister(subscribers ...*Subscriber)

Deregister deregister subscribers.

func (*Dispatcher) PutMessage

func (dp *Dispatcher) PutMessage(msg Message)

PutMessage put new message to chan, then subscribers will be notified to process.

func (*Dispatcher) Register

func (dp *Dispatcher) Register(subscribers ...*Subscriber)

Register register subscribers.

func (*Dispatcher) Start

func (dp *Dispatcher) Start()

Start start message dispatch goroutine.

func (*Dispatcher) Stop

func (dp *Dispatcher) Stop()

Stop stop goroutine.

type Message

// Message is the interface for messages handed to the Dispatcher and
// delivered on subscriber channels. NewBaseMessage returns one.
type Message interface {
	// MessageType returns the message type.
	MessageType() string
	// MessageFrom returns who sent the message.
	MessageFrom() string
	// Data returns the message data.
	Data() []byte
	// Hash returns the message hash.
	Hash() string
}

Message interface for message.

func NewBaseMessage

func NewBaseMessage(t string, from string, data []byte) Message

NewBaseMessage new base message

type MessageWeight

// MessageWeight is a float64 weight; the MessageWeight* constants are of this
// type, and NewSubscriber takes one.
type MessageWeight float64

MessageWeight float64

type NetConfig

// NetConfig carries network settings; each field has a yaml struct tag naming
// its key. GetNetConfig and SetNetConfig read and write it against a
// *config.Config.
type NetConfig struct {
	// NOTE(review): presumably seed node addresses — confirm.
	Seed                    []string `yaml:"seed"`
	Listen                  []string `yaml:"listen"`
	NetworkId               uint32   `yaml:"network_id"`
	PrivateKey              string   `yaml:"private_key"`
	StreamLimits            uint32   `yaml:"stream_limits"`
	ReservedStreamLimits    uint32   `yaml:"reserved_stream_limits"`
	RouteTableCacheFileName string   `yaml:"route_table_cache_filename"`
}

func GetNetConfig

func GetNetConfig(conf *config.Config) *NetConfig

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node the node can be used as both the client and the server

func NewNode

func NewNode(config *Config) (*Node, error)

NewNode return new Node according to the config.

func (*Node) AddPeer

func (node *Node) AddPeer(id peer.ID)

func (*Node) AllPeerIds

func (node *Node) AllPeerIds() []string

func (*Node) BroadcastMessage

func (node *Node) BroadcastMessage(messageName string, data Serializable, priority int)

BroadcastMessage broadcast message.

func (*Node) BucketSize

func (node *Node) BucketSize() int

BucketSize return node routeTable's bucket size

func (*Node) ID

func (node *Node) ID() string

ID return node ID.

func (*Node) Members

func (node *Node) Members() []peer.ID

func (*Node) RelayMessage

func (node *Node) RelayMessage(messageName string, data Serializable, priority int)

RelayMessage relay message.

func (*Node) RemovePeer

func (node *Node) RemovePeer(id peer.ID)

func (*Node) SendMessageToPeer

func (node *Node) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error

SendMessageToPeer send message to a peer.

func (*Node) SendMessageToPeerOrBroadcast

func (node *Node) SendMessageToPeerOrBroadcast(messageName string, messageContent Serializable, priority int, peerID string) error

SendMessageToPeerOrBroadcast sends a message to the peer identified by peerID.

func (*Node) SetcloudcardService

func (node *Node) SetcloudcardService(ns *cloudcardService)

SetcloudcardService set netService

func (*Node) Start

func (node *Node) Start() error

Start host & route table discovery

func (*Node) Stop

func (node *Node) Stop()

Stop stop a node.

func (*Node) StreamManager

func (node *Node) StreamManager() *StreamManager

StreamManager return node streamManager

func (*Node) Synchronized

func (node *Node) Synchronized() bool

Synchronized return node synchronized status

type PeerFilterAlgorithm

// PeerFilterAlgorithm selects peers from a PeersSlice. The method sets of
// *ChainSyncPeersFilter and *RandomPeerFilter include a matching Filter.
type PeerFilterAlgorithm interface {
	// Filter takes a PeersSlice and returns a PeersSlice.
	Filter(PeersSlice) PeersSlice
}

PeerFilterAlgorithm is the algorithm used to filter peers

type PeersSlice

// PeersSlice is a slice of untyped (interface{}) peer values.
// NOTE(review): the concrete element type is not visible here — confirm.
type PeersSlice []interface{}

PeersSlice is a slice which contains peers

type RandomPeerFilter

// RandomPeerFilter is a field-less type whose Filter method has the signature
// required by PeerFilterAlgorithm.
type RandomPeerFilter struct {
}

RandomPeerFilter will filter a peer randomly

func (*RandomPeerFilter) Filter

func (filter *RandomPeerFilter) Filter(peers PeersSlice) PeersSlice

Filter implements the PeerFilterAlgorithm interface.

type RouteTable

type RouteTable struct {
	// contains filtered or unexported fields
}

RouteTable route table struct.

func NewRouteTable

func NewRouteTable(config *Config, node *Node) *RouteTable

NewRouteTable new route table.

func (*RouteTable) AddIPFSPeerAddr

func (table *RouteTable) AddIPFSPeerAddr(addr ma.Multiaddr)

AddIPFSPeerAddr add a peer to route table with ipfs address.

func (*RouteTable) AddPeer

func (table *RouteTable) AddPeer(pid peer.ID, addr ma.Multiaddr)

AddPeer add peer to route table.

func (*RouteTable) AddPeerInfo

func (table *RouteTable) AddPeerInfo(prettyID string, addrStr []string) error

AddPeerInfo add peer to route table.

func (*RouteTable) AddPeerStream

func (table *RouteTable) AddPeerStream(s *Stream)

AddPeerStream add peer stream to peerStore.

func (*RouteTable) AddPeers

func (table *RouteTable) AddPeers(pid string, peers *netpb.Peers)

AddPeers add peers to route table

func (*RouteTable) GetRandomPeers

func (table *RouteTable) GetRandomPeers(pid peer.ID) []peer.AddrInfo

GetRandomPeers get random peers

func (*RouteTable) LoadInternalNodeList

func (table *RouteTable) LoadInternalNodeList()

LoadInternalNodeList Load Internal Node list from file

func (*RouteTable) LoadRouteTableFromFile

func (table *RouteTable) LoadRouteTableFromFile()

LoadRouteTableFromFile load route table from file.

func (*RouteTable) LoadSeedNodes

func (table *RouteTable) LoadSeedNodes()

LoadSeedNodes load seed nodes.

func (*RouteTable) RemovePeerStream

func (table *RouteTable) RemovePeerStream(s *Stream)

RemovePeerStream remove peerStream from peerStore.

func (*RouteTable) SaveRouteTableToFile

func (table *RouteTable) SaveRouteTableToFile()

SaveRouteTableToFile save route table to file.

func (*RouteTable) Start

func (table *RouteTable) Start()

Start start route table syncLoop.

func (*RouteTable) Stop

func (table *RouteTable) Stop()

Stop quit route table syncLoop.

func (*RouteTable) SyncRouteTable

func (table *RouteTable) SyncRouteTable()

SyncRouteTable sync route table.

func (*RouteTable) SyncWithPeer

func (table *RouteTable) SyncWithPeer(pid peer.ID)

SyncWithPeer sync route table with a peer.

type Serializable

// Serializable is implemented by models that convert to and from a
// proto.Message. The broadcast and relay methods take one as their payload.
type Serializable interface {
	// ToProto returns a proto.Message and an error.
	ToProto() (proto.Message, error)
	// FromProto takes a proto.Message and returns an error.
	FromProto(proto.Message) error
}

Serializable model

type Service

// Service is the network service interface: lifecycle, subscriber
// registration, and message sending.
type Service interface {
	// Lifecycle and access to the underlying Node.
	Start() error
	Stop()
	Node() *Node
	// Subscriber management.
	Register(...*Subscriber)
	Deregister(...*Subscriber)
	// Sending. For Broadcast and Relay the unnamed parameters presumably
	// follow Node.BroadcastMessage: message name, payload, priority — confirm.
	Broadcast(string, Serializable, int)
	Relay(string, Serializable, int)
	// NOTE(review): the order and meaning of SendMessage's unnamed parameters
	// are not visible here — confirm.
	SendMessage(string, []byte, string, int) error
	SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
	SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error
	SendMessageToPeerOrBroadcast(messageName string, messageContent Serializable, priority int, peerID string) error
	// ClosePeer takes a peer ID and a reason error.
	ClosePeer(peerID string, reason error)
}

Service net Service interface

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream define the structure of a stream in p2p network

func NewStream

func NewStream(stream network.Stream, node *Node) *Stream

NewStream return a new Stream

func NewStreamFromPID

func NewStreamFromPID(pid peer.ID, node *Node) *Stream

NewStreamFromPID return a new Stream based on the pid

func (*Stream) Bye

func (s *Stream) Bye()

Bye say bye in the stream

func (*Stream) Connect

func (s *Stream) Connect() error

Connect to the stream

func (*Stream) Hello

func (s *Stream) Hello() error

func (*Stream) IsConnected

func (s *Stream) IsConnected() bool

IsConnected return if the stream is connected

func (*Stream) IsHandshakeSucceed

func (s *Stream) IsHandshakeSucceed() bool

IsHandshakeSucceed return if the handshake in the stream succeed

func (*Stream) Ok

func (s *Stream) Ok() error

Ok say ok in the stream

func (*Stream) RouteTable

func (s *Stream) RouteTable() error

RouteTable send sync table request

func (*Stream) SendMessage

func (s *Stream) SendMessage(messageName string, data []byte, priority int) error

SendMessage send msg to buffer

func (*Stream) SendProtoMessage

func (s *Stream) SendProtoMessage(messageName string, pb proto.Message, priority int) error

SendProtoMessage send proto msg to buffer

func (*Stream) StartLoop

func (s *Stream) StartLoop()

StartLoop start stream handling loop.

func (*Stream) String

func (s *Stream) String() string

func (*Stream) SyncRoute

func (s *Stream) SyncRoute() error

SyncRoute send sync route request

func (*Stream) Write

func (s *Stream) Write(data []byte) error

func (*Stream) WriteMessage

func (s *Stream) WriteMessage(messageName string, data []byte, reservedClientFlag byte) error

WriteMessage write raw msg in the stream

func (*Stream) WriteProtoMessage

func (s *Stream) WriteProtoMessage(messageName string, pb proto.Message, reservedClientFlag byte) error

WriteProtoMessage write proto msg in the stream

func (*Stream) WritecloudcardMessage

func (s *Stream) WritecloudcardMessage(message *cloudcardMessage) error

WritecloudcardMessage write cloudcard msg in the stream

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages all streams

func NewStreamManager

func NewStreamManager(config *Config) *StreamManager

NewStreamManager return a new stream manager

func (*StreamManager) ActivePeersCount

func (sm *StreamManager) ActivePeersCount() uint32

func (*StreamManager) Add

func (sm *StreamManager) Add(s network.Stream, node *Node)

Add a new stream into the stream manager

func (*StreamManager) AddStream

func (sm *StreamManager) AddStream(stream *Stream)

func (*StreamManager) BroadcastMessage

func (sm *StreamManager) BroadcastMessage(messageName string, messageContent Serializable, priority int)

BroadcastMessage broadcast the message

func (*StreamManager) CloseStream

func (sm *StreamManager) CloseStream(peerID string, reason error)

CloseStream with the given pid and reason

func (*StreamManager) Find

func (sm *StreamManager) Find(pid peer.ID) *Stream

Find the stream with the given pid

func (*StreamManager) GetStreamByPeerID

func (sm *StreamManager) GetStreamByPeerID(peerID string) *Stream

GetStreamByPeerID finds the stream with the given peerID.

func (*StreamManager) RelayMessage

func (sm *StreamManager) RelayMessage(messageName string, messageContent Serializable, priority int)

RelayMessage relay the message

func (*StreamManager) RemoveStream

func (sm *StreamManager) RemoveStream(s *Stream)

RemoveStream from the stream manager

func (*StreamManager) SendMessageToPeers

func (sm *StreamManager) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string

SendMessageToPeers send the message to the peers filtered by the filter algorithm

func (*StreamManager) Start

func (sm *StreamManager) Start()

Start stream manager service

func (*StreamManager) Stop

func (sm *StreamManager) Stop()

Stop stream manager service

type StreamValue

type StreamValue struct {
	// contains filtered or unexported fields
}

StreamValue value of stream in the past CleanupInterval

func (*StreamValue) String

func (s *StreamValue) String() string

type StreamValueSlice

// StreamValueSlice is a slice of *StreamValue. Its Len, Less and Swap methods
// match the method set of sort.Interface.
type StreamValueSlice []*StreamValue

StreamValueSlice StreamValue slice

func (StreamValueSlice) Len

func (s StreamValueSlice) Len() int

func (StreamValueSlice) Less

func (s StreamValueSlice) Less(i, j int) bool

func (StreamValueSlice) Swap

func (s StreamValueSlice) Swap(i, j int)

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber subscriber.

func NewSubscriber

func NewSubscriber(id interface{}, msgChan chan Message, doFilter bool, msgType string, weight MessageWeight) *Subscriber

NewSubscriber return new Subscriber instance.

func (*Subscriber) DoFilter

func (s *Subscriber) DoFilter() bool

DoFilter return doFilter

func (*Subscriber) ID

func (s *Subscriber) ID() interface{}

ID return id.

func (*Subscriber) MessageChan

func (s *Subscriber) MessageChan() chan Message

MessageChan return msgChan.

func (*Subscriber) MessageType

func (s *Subscriber) MessageType() string

MessageType return msgTypes.

func (*Subscriber) MessageWeight

func (s *Subscriber) MessageWeight() MessageWeight

MessageWeight return weight of msgType

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL