net

package
v2.0.2+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 20, 2019 License: LGPL-3.0 Imports: 39 Imported by: 8

README

net

Documentation

Index

Constants

View Source
const (
	DefaultBucketCapacity         = 64
	DefaultRoutingTableMaxLatency = 10
	DefaultPrivateKeyPath         = "conf/network.key"
	DefaultMaxSyncNodes           = 64
	DefaultChainID                = 1
	DefaultRoutingTableDir        = ""
	DefaultMaxStreamNum           = 200
	DefaultReservedStreamNum      = 20
)

const

View Source
const (
	NebMessageMagicNumberEndIdx    = 4
	NebMessageChainIDEndIdx        = 8
	NebMessageReservedEndIdx       = 11
	NebMessageVersionIndex         = 11
	NebMessageVersionEndIdx        = 12
	NebMessageNameEndIdx           = 24
	NebMessageDataLengthEndIdx     = 28
	NebMessageDataCheckSumEndIdx   = 32
	NebMessageHeaderCheckSumEndIdx = 36
	NebMessageHeaderLength         = 36

	// Consider that a block is too large in sync.
	MaxNebMessageDataLength = 512 * 1024 * 1024 // 512m.
	MaxNebMessageNameLength = 24 - 12           // 12.

	DefaultReservedFlag           = 0x0
	ReservedCompressionEnableFlag = 0x80
	ReservedCompressionClientFlag = 0x1
)

NebMessage defines the message protocol used in Nebulas. It is a custom wire protocol, laid out as follows:

 0               1               2               3              (bytes)
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Magic Number                          |
+---------------------------------------------------------------+
|                         Chain ID                              |
+-----------------------------------------------+---------------+
|                         Reserved              |   Version     |
+-----------------------------------------------+---------------+
|                                                               |
+                                                               +
|                         Message Name                          |
+                                                               +
|                                                               |
+---------------------------------------------------------------+
|                         Data Length                           |
+---------------------------------------------------------------+
|                         Data Checksum                         |
+---------------------------------------------------------------+
|                         Header Checksum                       |
+---------------------------------------------------------------+
|                                                               |
+                         Data                                  +
.                                                               .
|                                                               |
+---------------------------------------------------------------+

const

View Source
const (
	ClientVersion  = "0.3.0"
	NebProtocolID  = "/neb/1.0.0"
	HELLO          = "hello"
	OK             = "ok"
	BYE            = "bye"
	SYNCROUTE      = "syncroute"
	ROUTETABLE     = "routetable"
	RECVEDMSG      = "recvedmsg"
	CurrentVersion = 0x0
)

Stream Message Type

View Source
const (
	MessagePriorityHigh = iota
	MessagePriorityNormal
	MessagePriorityLow
)

Message Priority.

View Source
const (
	ChunkHeadersRequest  = "sync"      // ChainSync
	ChunkHeadersResponse = "chunks"    // ChainChunks
	ChunkDataRequest     = "getchunk"  // ChainGetChunk
	ChunkDataResponse    = "chunkdata" // ChainChunkData
)

Sync Message Type

View Source
const (
	MessageWeightZero = MessageWeight(0)
	MessageWeightNewTx
	MessageWeightNewBlock = MessageWeight(0.5)
	MessageWeightRouteTable
	MessageWeightChainChunks
	MessageWeightChainChunkData
)

const

View Source
const (
	CleanupInterval = time.Second * 60
)

const

Variables

View Source
var (
	DefaultListen = []string{"0.0.0.0:8680"}

	RouteTableSyncLoopInterval     = 30 * time.Second
	RouteTableSaveToDiskInterval   = 3 * 60 * time.Second
	RouteTableCacheFileName        = "routetable.cache"
	RouteTableInternalNodeFileName = "conf/internal_list.txt"

	MaxPeersCountForSyncResp = 32
)

Default Configuration in P2P network

View Source
var (
	MagicNumber     = []byte{0x4e, 0x45, 0x42, 0x31}
	DefaultReserved = []byte{DefaultReservedFlag, DefaultReservedFlag, DefaultReservedFlag}
	CurrentReserved = []byte{DefaultReservedFlag | ReservedCompressionEnableFlag, DefaultReservedFlag, DefaultReservedFlag}

	ErrInsufficientMessageHeaderLength = errors.New("insufficient message header length")
	ErrInsufficientMessageDataLength   = errors.New("insufficient message data length")
	ErrInvalidMagicNumber              = errors.New("invalid magic number")
	ErrInvalidHeaderCheckSum           = errors.New("invalid header checksum")
	ErrInvalidDataCheckSum             = errors.New("invalid data checksum")
	ErrExceedMaxDataLength             = errors.New("exceed max data length")
	ErrExceedMaxMessageNameLength      = errors.New("exceed max message name length")
	ErrUncompressMessageFailed         = errors.New("uncompress message failed")
)

Error types

View Source
var (
	ErrShouldCloseConnectionAndExitLoop = errors.New("should close connection and exit loop")
	ErrStreamIsNotConnected             = errors.New("stream is not connected")
)

Stream Errors

View Source
var (
	ErrExceedMaxStreamNum = errors.New("too many streams connected")
	ErrElimination        = errors.New("eliminated for low value")
	ErrDeprecatedStream   = errors.New("deprecated stream")
)

var

View Source
var (
	ErrListenPortIsNotAvailable = errors.New("listen port is not available")
	ErrConfigLackNetWork        = errors.New("config.conf should has network")
)

Errors

View Source
var (
	ErrExceedMaxSyncRouteResponse = errors.New("too many sync route table response")
)

Route Table Errors

View Source
var (
	ErrPeerIsNotConnected = errors.New("peer is not connected")
)

Error types

View Source
var (
	ErrPeersIsNotEnough = errors.New("peers is not enough")
)

Sync Errors

Functions

func ByteSliceEqualBCE added in v1.0.2

func ByteSliceEqualBCE(a, b []byte) bool

ByteSliceEqualBCE determines whether two byte slices are equal.

func CheckClientVersionCompatibility added in v1.0.0

func CheckClientVersionCompatibility(v1, v2 string) bool

CheckClientVersionCompatibility checks whether two clients are compatible. If the clientVersion of node A is X.Y.Z, then node B must be X.Y.{} to be compatible with A.

func GenerateEd25519Key added in v1.0.0

func GenerateEd25519Key() (crypto.PrivKey, error)

GenerateEd25519Key returns a newly generated Ed25519 private key.

func HasKey added in v1.0.0

func HasKey(key string) bool

HasKey use bloom filter to check if the key exists quickly

func HasRecvMessage added in v1.0.0

func HasRecvMessage(s *Stream, hash uint32) bool

HasRecvMessage checks whether the message has been received before.

func LoadNetworkKeyFromFile added in v1.0.0

func LoadNetworkKeyFromFile(path string) (crypto.PrivKey, error)

LoadNetworkKeyFromFile load network priv key from file.

func LoadNetworkKeyFromFileOrCreateNew added in v1.0.0

func LoadNetworkKeyFromFileOrCreateNew(path string) (crypto.PrivKey, error)

LoadNetworkKeyFromFileOrCreateNew load network priv key from file or create new one.

func MarshalNetworkKey added in v1.0.0

func MarshalNetworkKey(key crypto.PrivKey) (string, error)

MarshalNetworkKey marshal network key.

func ParseFromIPFSAddr added in v1.0.0

func ParseFromIPFSAddr(ipfsAddr ma.Multiaddr) (peer.ID, ma.Multiaddr, error)

ParseFromIPFSAddr return pid and address parsed from ipfs address

func RecordKey added in v1.0.0

func RecordKey(key string)

RecordKey add key to bloom filter.

func RecordRecvMessage added in v1.0.0

func RecordRecvMessage(s *Stream, hash uint32)

RecordRecvMessage records received message

func UnmarshalNetworkKey added in v1.0.0

func UnmarshalNetworkKey(data string) (crypto.PrivKey, error)

UnmarshalNetworkKey unmarshal network key.

Types

type BaseMessage added in v1.0.0

type BaseMessage struct {
	// contains filtered or unexported fields
}

BaseMessage base message

func (*BaseMessage) Data added in v1.0.0

func (msg *BaseMessage) Data() []byte

Data get the message data

func (*BaseMessage) Hash added in v1.0.0

func (msg *BaseMessage) Hash() string

Hash return the message hash

func (*BaseMessage) MessageFrom added in v1.0.0

func (msg *BaseMessage) MessageFrom() string

MessageFrom returns the sender of the message.

func (*BaseMessage) MessageType added in v1.0.0

func (msg *BaseMessage) MessageType() string

MessageType get message type

func (*BaseMessage) String added in v1.0.0

func (msg *BaseMessage) String() string

String get the message to string

type ChainSyncPeersFilter added in v1.0.0

type ChainSyncPeersFilter struct {
}

ChainSyncPeersFilter will filter some peers randomly

func (*ChainSyncPeersFilter) Filter added in v1.0.0

func (filter *ChainSyncPeersFilter) Filter(peers PeersSlice) PeersSlice

Filter implements the PeerFilterAlgorithm interface.

type Config added in v1.0.0

type Config struct {
	Bucketsize           int
	Latency              time.Duration
	BootNodes            []multiaddr.Multiaddr
	PrivateKeyPath       string
	Listen               []string
	MaxSyncNodes         int
	ChainID              uint32
	RoutingTableDir      string
	StreamLimits         int32
	ReservedStreamLimits int32
}

Config TODO: move to proto config.

func NewConfigFromDefaults added in v1.0.0

func NewConfigFromDefaults() *Config

NewConfigFromDefaults return new config from defaults.

func NewP2PConfig added in v1.0.0

func NewP2PConfig(n Neblet) *Config

NewP2PConfig return new config object.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher a message dispatcher service.

func NewDispatcher

func NewDispatcher() *Dispatcher

NewDispatcher create Dispatcher instance.

func (*Dispatcher) Deregister

func (dp *Dispatcher) Deregister(subscribers ...*Subscriber)

Deregister deregister subscribers.

func (*Dispatcher) PutMessage

func (dp *Dispatcher) PutMessage(msg Message)

PutMessage put new message to chan, then subscribers will be notified to process.

func (*Dispatcher) Register

func (dp *Dispatcher) Register(subscribers ...*Subscriber)

Register register subscribers.

func (*Dispatcher) Start

func (dp *Dispatcher) Start()

Start start message dispatch goroutine.

func (*Dispatcher) Stop

func (dp *Dispatcher) Stop()

Stop stop goroutine.

type Message

type Message interface {
	MessageType() string
	MessageFrom() string
	Data() []byte
	Hash() string
}

Message interface for message.

func NewBaseMessage added in v1.0.0

func NewBaseMessage(t string, from string, data []byte) Message

NewBaseMessage new base message

type MessageType

type MessageType string

MessageType a string for message type.

type MessageWeight added in v1.0.0

type MessageWeight float64

MessageWeight float64

type NebMessage added in v1.0.0

type NebMessage struct {
	// contains filtered or unexported fields
}

NebMessage struct

func NewNebMessage added in v1.0.0

func NewNebMessage(chainID uint32, reserved []byte, version byte, messageName string, data []byte) (*NebMessage, error)

NewNebMessage new neb message

func ParseNebMessage added in v1.0.0

func ParseNebMessage(data []byte) (*NebMessage, error)

ParseNebMessage parse neb message

func (*NebMessage) ChainID added in v1.0.0

func (message *NebMessage) ChainID() uint32

ChainID return chainID

func (*NebMessage) Content added in v1.0.0

func (message *NebMessage) Content() []byte

Content return message content

func (*NebMessage) Data added in v1.0.0

func (message *NebMessage) Data() ([]byte, error)

Data return data

func (*NebMessage) DataCheckSum added in v1.0.0

func (message *NebMessage) DataCheckSum() uint32

DataCheckSum return data checkSum

func (*NebMessage) DataLength added in v1.0.0

func (message *NebMessage) DataLength() uint32

DataLength return dataLength

func (*NebMessage) FlagSendMessageAt added in v1.0.0

func (message *NebMessage) FlagSendMessageAt()

FlagSendMessageAt flag of send message time

func (*NebMessage) FlagWriteMessageAt added in v1.0.0

func (message *NebMessage) FlagWriteMessageAt()

FlagWriteMessageAt flag of write message time

func (*NebMessage) HeaderCheckSum added in v1.0.0

func (message *NebMessage) HeaderCheckSum() uint32

HeaderCheckSum return header checkSum

func (*NebMessage) HeaderWithoutCheckSum added in v1.0.0

func (message *NebMessage) HeaderWithoutCheckSum() []byte

HeaderWithoutCheckSum return header without checkSum

func (*NebMessage) LatencyFromSendToWrite added in v1.0.0

func (message *NebMessage) LatencyFromSendToWrite() int64

LatencyFromSendToWrite latency from sendMessage to writeMessage

func (*NebMessage) Length added in v1.0.0

func (message *NebMessage) Length() uint64

Length return message Length

func (*NebMessage) MagicNumber added in v1.0.0

func (message *NebMessage) MagicNumber() []byte

MagicNumber return magicNumber

func (*NebMessage) MessageName added in v1.0.0

func (message *NebMessage) MessageName() string

MessageName return message name

func (*NebMessage) OriginalData added in v1.0.2

func (message *NebMessage) OriginalData() []byte

OriginalData return original data

func (*NebMessage) ParseMessageData added in v1.0.0

func (message *NebMessage) ParseMessageData(data []byte) error

ParseMessageData parse neb message data

func (*NebMessage) Reserved added in v1.0.0

func (message *NebMessage) Reserved() []byte

Reserved return reserved

func (*NebMessage) VerifyData added in v1.0.0

func (message *NebMessage) VerifyData() error

VerifyData verify message data

func (*NebMessage) VerifyHeader added in v1.0.0

func (message *NebMessage) VerifyHeader() error

VerifyHeader verify message header

func (*NebMessage) Version added in v1.0.0

func (message *NebMessage) Version() byte

Version return version

type NebService added in v1.0.0

type NebService struct {
	// contains filtered or unexported fields
}

NebService service for nebulas p2p network

func NewNebService added in v1.0.0

func NewNebService(n Neblet) (*NebService, error)

NewNebService create netService

func (*NebService) Broadcast added in v1.0.0

func (ns *NebService) Broadcast(name string, msg Serializable, priority int)

Broadcast message.

func (*NebService) BroadcastNetworkID added in v1.0.0

func (ns *NebService) BroadcastNetworkID(msg []byte)

BroadcastNetworkID broadcast networkID when changed.

func (*NebService) ClosePeer added in v1.0.0

func (ns *NebService) ClosePeer(peerID string, reason error)

ClosePeer close the stream to a peer.

func (*NebService) Deregister added in v1.0.0

func (ns *NebService) Deregister(subscribers ...*Subscriber)

Deregister Deregister the subscribers.

func (*NebService) Node added in v1.0.0

func (ns *NebService) Node() *Node

Node return the peer node

func (*NebService) PutMessage added in v1.0.0

func (ns *NebService) PutMessage(msg Message)

PutMessage put message to dispatcher.

func (*NebService) Register added in v1.0.0

func (ns *NebService) Register(subscribers ...*Subscriber)

Register register the subscribers.

func (*NebService) Relay added in v1.0.0

func (ns *NebService) Relay(name string, msg Serializable, priority int)

Relay message.

func (*NebService) SendMessageToPeer added in v1.0.0

func (ns *NebService) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error

SendMessageToPeer send message to a peer.

func (*NebService) SendMessageToPeers added in v1.0.0

func (ns *NebService) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string

SendMessageToPeers send message to peers.

func (*NebService) SendMsg added in v1.0.0

func (ns *NebService) SendMsg(msgName string, msg []byte, target string, priority int) error

SendMsg send message to a peer.

func (*NebService) Start added in v1.0.0

func (ns *NebService) Start() error

Start start p2p manager.

func (*NebService) Stop added in v1.0.0

func (ns *NebService) Stop()

Stop stop p2p manager.

type Neblet added in v1.0.0

type Neblet interface {
	Config() *nebletpb.Config
}

Neblet interface breaks cycle import dependency.

type Node added in v1.0.0

type Node struct {
	// contains filtered or unexported fields
}

Node the node can be used as both the client and the server

func NewNode added in v1.0.0

func NewNode(config *Config) (*Node, error)

NewNode return new Node according to the config.

func (*Node) BroadcastMessage added in v1.0.0

func (node *Node) BroadcastMessage(messageName string, data Serializable, priority int)

BroadcastMessage broadcast message.

func (*Node) Config added in v1.0.0

func (node *Node) Config() *Config

Config return node config.

func (*Node) ID added in v1.0.0

func (node *Node) ID() string

ID return node ID.

func (*Node) IsSynchronizing added in v1.0.0

func (node *Node) IsSynchronizing() bool

IsSynchronizing return node synchronizing

func (*Node) PeersCount added in v1.0.0

func (node *Node) PeersCount() int32

PeersCount return stream count.

func (*Node) RelayMessage added in v1.0.0

func (node *Node) RelayMessage(messageName string, data Serializable, priority int)

RelayMessage relay message.

func (*Node) RouteTable added in v1.0.0

func (node *Node) RouteTable() *RouteTable

RouteTable return route table.

func (*Node) SendMessageToPeer added in v1.0.0

func (node *Node) SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error

SendMessageToPeer send message to a peer.

func (*Node) SetNebService added in v1.0.0

func (node *Node) SetNebService(ns *NebService)

SetNebService set netService

func (*Node) SetSynchronizing added in v1.0.0

func (node *Node) SetSynchronizing(synchronizing bool)

SetSynchronizing set node synchronizing.

func (*Node) Start added in v1.0.0

func (node *Node) Start() error

Start host & route table discovery

func (*Node) Stop added in v1.0.0

func (node *Node) Stop()

Stop stop a node.

type PeerFilterAlgorithm added in v1.0.0

type PeerFilterAlgorithm interface {
	Filter(PeersSlice) PeersSlice
}

PeerFilterAlgorithm is the algorithm used to filter peers

type PeersSlice added in v1.0.0

type PeersSlice []interface{}

PeersSlice is a slice which contains peers

type RandomPeerFilter added in v1.0.0

type RandomPeerFilter struct {
}

RandomPeerFilter will filter a peer randomly

func (*RandomPeerFilter) Filter added in v1.0.0

func (filter *RandomPeerFilter) Filter(peers PeersSlice) PeersSlice

Filter implements the PeerFilterAlgorithm interface.

type RouteTable added in v1.0.0

type RouteTable struct {
	// contains filtered or unexported fields
}

RouteTable route table struct.

func NewRouteTable added in v1.0.0

func NewRouteTable(config *Config, node *Node) *RouteTable

NewRouteTable new route table.

func (*RouteTable) AddIPFSPeerAddr added in v1.0.0

func (table *RouteTable) AddIPFSPeerAddr(addr ma.Multiaddr)

AddIPFSPeerAddr add a peer to route table with ipfs address.

func (*RouteTable) AddPeer added in v1.0.0

func (table *RouteTable) AddPeer(pid peer.ID, addr ma.Multiaddr)

AddPeer add peer to route table.

func (*RouteTable) AddPeerInfo added in v1.0.0

func (table *RouteTable) AddPeerInfo(prettyID string, addrStr []string) error

AddPeerInfo add peer to route table.

func (*RouteTable) AddPeerStream added in v1.0.0

func (table *RouteTable) AddPeerStream(s *Stream)

AddPeerStream add peer stream to peerStore.

func (*RouteTable) AddPeers added in v1.0.0

func (table *RouteTable) AddPeers(pid string, peers *netpb.Peers)

AddPeers add peers to route table

func (*RouteTable) GetRandomPeers added in v1.0.0

func (table *RouteTable) GetRandomPeers(pid peer.ID) []peerstore.PeerInfo

GetRandomPeers get random peers

func (*RouteTable) LoadInternalNodeList added in v1.0.1

func (table *RouteTable) LoadInternalNodeList()

LoadInternalNodeList Load Internal Node list from file

func (*RouteTable) LoadRouteTableFromFile added in v1.0.0

func (table *RouteTable) LoadRouteTableFromFile()

LoadRouteTableFromFile load route table from file.

func (*RouteTable) LoadSeedNodes added in v1.0.0

func (table *RouteTable) LoadSeedNodes()

LoadSeedNodes load seed nodes.

func (*RouteTable) Peers added in v1.0.0

func (table *RouteTable) Peers() map[peer.ID][]ma.Multiaddr

Peers return peers in route table.

func (*RouteTable) RemovePeerStream added in v1.0.0

func (table *RouteTable) RemovePeerStream(s *Stream)

RemovePeerStream remove peerStream from peerStore.

func (*RouteTable) SaveRouteTableToFile added in v1.0.0

func (table *RouteTable) SaveRouteTableToFile()

SaveRouteTableToFile save route table to file.

func (*RouteTable) Start added in v1.0.0

func (table *RouteTable) Start()

Start start route table syncLoop.

func (*RouteTable) Stop added in v1.0.0

func (table *RouteTable) Stop()

Stop quit route table syncLoop.

func (*RouteTable) SyncRouteTable added in v1.0.0

func (table *RouteTable) SyncRouteTable()

SyncRouteTable sync route table.

func (*RouteTable) SyncWithPeer added in v1.0.0

func (table *RouteTable) SyncWithPeer(pid peer.ID)

SyncWithPeer sync route table with a peer.

type Serializable

type Serializable interface {
	ToProto() (proto.Message, error)
	FromProto(proto.Message) error
}

Serializable model

type Service added in v1.0.0

type Service interface {
	Start() error
	Stop()

	Node() *Node

	Register(...*Subscriber)
	Deregister(...*Subscriber)

	Broadcast(string, Serializable, int)
	Relay(string, Serializable, int)
	SendMsg(string, []byte, string, int) error

	SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string
	SendMessageToPeer(messageName string, data []byte, priority int, peerID string) error

	ClosePeer(peerID string, reason error)

	BroadcastNetworkID([]byte)
}

Service net Service interface

type Stream added in v1.0.0

type Stream struct {
	// contains filtered or unexported fields
}

Stream define the structure of a stream in p2p network

func NewStream added in v1.0.0

func NewStream(stream libnet.Stream, node *Node) *Stream

NewStream return a new Stream

func NewStreamFromPID added in v1.0.0

func NewStreamFromPID(pid peer.ID, node *Node) *Stream

NewStreamFromPID return a new Stream based on the pid

func (*Stream) Bye added in v1.0.0

func (s *Stream) Bye()

Bye say bye in the stream

func (*Stream) Connect added in v1.0.0

func (s *Stream) Connect() error

Connect to the stream

func (*Stream) Hello added in v1.0.0

func (s *Stream) Hello() error

Hello say hello in the stream

func (*Stream) IsConnected added in v1.0.0

func (s *Stream) IsConnected() bool

IsConnected return if the stream is connected

func (*Stream) IsHandshakeSucceed added in v1.0.0

func (s *Stream) IsHandshakeSucceed() bool

IsHandshakeSucceed return if the handshake in the stream succeed

func (*Stream) Ok added in v1.0.0

func (s *Stream) Ok() error

Ok say ok in the stream

func (*Stream) RouteTable added in v1.0.0

func (s *Stream) RouteTable() error

RouteTable send sync table request

func (*Stream) SendMessage added in v1.0.0

func (s *Stream) SendMessage(messageName string, data []byte, priority int) error

SendMessage send msg to buffer

func (*Stream) SendProtoMessage added in v1.0.0

func (s *Stream) SendProtoMessage(messageName string, pb proto.Message, priority int) error

SendProtoMessage send proto msg to buffer

func (*Stream) StartLoop added in v1.0.0

func (s *Stream) StartLoop()

StartLoop start stream handling loop.

func (*Stream) String added in v1.0.0

func (s *Stream) String() string

func (*Stream) SyncRoute added in v1.0.0

func (s *Stream) SyncRoute() error

SyncRoute send sync route request

func (*Stream) Write added in v1.0.0

func (s *Stream) Write(data []byte) error

func (*Stream) WriteMessage added in v1.0.0

func (s *Stream) WriteMessage(messageName string, data []byte, reservedClientFlag byte) error

WriteMessage write raw msg in the stream

func (*Stream) WriteNebMessage added in v1.0.0

func (s *Stream) WriteNebMessage(message *NebMessage) error

WriteNebMessage write neb msg in the stream

func (*Stream) WriteProtoMessage added in v1.0.0

func (s *Stream) WriteProtoMessage(messageName string, pb proto.Message, reservedClientFlag byte) error

WriteProtoMessage write proto msg in the stream

type StreamManager added in v1.0.0

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages all streams

func NewStreamManager added in v1.0.0

func NewStreamManager(config *Config) *StreamManager

NewStreamManager return a new stream manager

func (*StreamManager) Add added in v1.0.0

func (sm *StreamManager) Add(s libnet.Stream, node *Node)

Add a new stream into the stream manager

func (*StreamManager) AddStream added in v1.0.0

func (sm *StreamManager) AddStream(stream *Stream)

AddStream into the stream manager

func (*StreamManager) BroadcastMessage added in v1.0.0

func (sm *StreamManager) BroadcastMessage(messageName string, messageContent Serializable, priority int)

BroadcastMessage broadcast the message

func (*StreamManager) CloseStream added in v1.0.0

func (sm *StreamManager) CloseStream(peerID string, reason error)

CloseStream with the given pid and reason

func (*StreamManager) Count added in v1.0.0

func (sm *StreamManager) Count() int32

Count return active peers count in the stream manager

func (*StreamManager) Find added in v1.0.0

func (sm *StreamManager) Find(pid peer.ID) *Stream

Find the stream with the given pid

func (*StreamManager) FindByPeerID added in v1.0.0

func (sm *StreamManager) FindByPeerID(peerID string) *Stream

FindByPeerID find the stream with the given peerID

func (*StreamManager) RelayMessage added in v1.0.0

func (sm *StreamManager) RelayMessage(messageName string, messageContent Serializable, priority int)

RelayMessage relay the message

func (*StreamManager) RemoveStream added in v1.0.0

func (sm *StreamManager) RemoveStream(s *Stream)

RemoveStream from the stream manager

func (*StreamManager) SendMessageToPeers added in v1.0.0

func (sm *StreamManager) SendMessageToPeers(messageName string, data []byte, priority int, filter PeerFilterAlgorithm) []string

SendMessageToPeers send the message to the peers filtered by the filter algorithm

func (*StreamManager) Start added in v1.0.0

func (sm *StreamManager) Start()

Start stream manager service

func (*StreamManager) Stop added in v1.0.0

func (sm *StreamManager) Stop()

Stop stream manager service

type StreamValue added in v1.0.0

type StreamValue struct {
	// contains filtered or unexported fields
}

StreamValue value of stream in the past CleanupInterval

func (*StreamValue) String added in v1.0.0

func (s *StreamValue) String() string

type StreamValueSlice added in v1.0.0

type StreamValueSlice []*StreamValue

StreamValueSlice StreamValue slice

func (StreamValueSlice) Len added in v1.0.0

func (s StreamValueSlice) Len() int

func (StreamValueSlice) Less added in v1.0.0

func (s StreamValueSlice) Less(i, j int) bool

func (StreamValueSlice) Swap added in v1.0.0

func (s StreamValueSlice) Swap(i, j int)

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber subscriber.

func NewSubscriber

func NewSubscriber(id interface{}, msgChan chan Message, doFilter bool, msgType string, weight MessageWeight) *Subscriber

NewSubscriber return new Subscriber instance.

func (*Subscriber) DoFilter added in v1.0.0

func (s *Subscriber) DoFilter() bool

DoFilter return doFilter

func (*Subscriber) ID

func (s *Subscriber) ID() interface{}

ID return id.

func (*Subscriber) MessageChan

func (s *Subscriber) MessageChan() chan Message

MessageChan return msgChan.

func (*Subscriber) MessageType

func (s *Subscriber) MessageType() string

MessageType return msgTypes.

func (*Subscriber) MessageWeight added in v1.0.0

func (s *Subscriber) MessageWeight() MessageWeight

MessageWeight return weight of msgType

Directories

Path Synopsis
Package netpb is a generated protocol buffer package.
Package netpb is a generated protocol buffer package.
testing

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL