Documentation ¶
Index ¶
- type BytesReceived
- type ConnectionAccepted
- type LocalNode
- func (ln *LocalNode) AddToRxCache(msgID []byte) (bool, error)
- func (ln *LocalNode) AllocReplyChan(msgID []byte, expiration time.Duration) (chan *RemoteMessage, error)
- func (store LocalNode) ApplyMiddleware(mw interface{}) error
- func (ln *LocalNode) Connect(n *pbnode.Node) (*RemoteNode, bool, error)
- func (ln *LocalNode) GetNeighbors(filter func(*RemoteNode) bool) ([]*RemoteNode, error)
- func (ln *LocalNode) GetReplyChan(msgID []byte) (chan *RemoteMessage, bool)
- func (ln *LocalNode) GetRxMsgChan(routingType pbmsg.RoutingType) (chan *RemoteMessage, error)
- func (ln *LocalNode) HandleRemoteMessage(remoteMsg *RemoteMessage) error
- func (ln *LocalNode) NewExchangeNodeMessage() (*pbmsg.Message, error)
- func (ln *LocalNode) NewExchangeNodeReply(replyToID []byte) (*pbmsg.Message, error)
- func (ln *LocalNode) NewPingMessage() (*pbmsg.Message, error)
- func (ln *LocalNode) NewPingReply(replyToID []byte) (*pbmsg.Message, error)
- func (ln *LocalNode) NewStopMessage() (*pbmsg.Message, error)
- func (ln *LocalNode) RegisterRoutingType(routingType pbmsg.RoutingType)
- func (ln *LocalNode) SetInternalPort(port uint16)
- func (ln *LocalNode) Start() error
- func (ln *LocalNode) StartRemoteNode(conn net.Conn, isOutbound bool, n *pbnode.Node) (*RemoteNode, error)
- func (ln *LocalNode) Stop(err error)
- type LocalNodeStarted
- type LocalNodeStopped
- type LocalNodeWillStart
- type LocalNodeWillStop
- type MessageEncoded
- type MessageWillDecode
- type Node
- type RemoteMessage
- type RemoteNode
- func (rn *RemoteNode) ExchangeNode() (*pbnode.Node, error)
- func (rn *RemoteNode) GetConn() net.Conn
- func (rn *RemoteNode) GetLastRxTime() time.Time
- func (rn *RemoteNode) GetRoundTripTime() time.Duration
- func (rn *RemoteNode) NotifyStop() error
- func (rn *RemoteNode) Ping() error
- func (rn *RemoteNode) SendMessage(msg *pbmsg.Message, hasReply bool, replyTimeout time.Duration) (<-chan *RemoteMessage, error)
- func (rn *RemoteNode) SendMessageAsync(msg *pbmsg.Message) error
- func (rn *RemoteNode) SendMessageSync(msg *pbmsg.Message, replyTimeout time.Duration) (*RemoteMessage, error)
- func (rn *RemoteNode) Start() error
- func (rn *RemoteNode) Stop(err error)
- func (rn *RemoteNode) String() string
- type RemoteNodeConnected
- type RemoteNodeDisconnected
- type RemoteNodeReady
- type WillConnectToNode
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BytesReceived ¶
type BytesReceived struct { Func func(data, msgID, srcID []byte, remoteNode *RemoteNode) ([]byte, bool) Priority int32 }
BytesReceived is called when the local node receives a user-defined BYTES message. Messages with the same message ID trigger this middleware only once. Its arguments are the bytes data, the message ID (which can be used to reply to the message), the sender ID, and the neighbor that passed the message to the local node (which may differ from the message sender). It returns the bytes data to pass to the next middleware and whether to proceed to the next middleware.
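The chaining behaviour described above can be sketched without the package itself. The following is a minimal model, not the library's implementation: `bytesMiddleware`, `runChain`, and `demoChain` are hypothetical names, the `*RemoteNode` argument is dropped to keep the example self-contained, and running higher `Priority` values first is an assumption that the text above does not state.

```go
package main

import (
	"fmt"
	"sort"
)

// bytesMiddleware mirrors the shape of BytesReceived, minus the *RemoteNode
// argument. It is a hypothetical stand-in, not the package's type.
type bytesMiddleware struct {
	Func     func(data, msgID, srcID []byte) ([]byte, bool)
	Priority int32
}

// runChain calls each middleware in descending Priority order (an assumption),
// feeds the returned bytes into the next one, and stops as soon as a
// middleware returns false.
func runChain(mws []bytesMiddleware, data, msgID, srcID []byte) []byte {
	sorted := append([]bytesMiddleware(nil), mws...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Priority > sorted[j].Priority
	})
	for _, mw := range sorted {
		var next bool
		data, next = mw.Func(data, msgID, srcID)
		if !next {
			break
		}
	}
	return data
}

// demoChain builds four middlewares; the third one stops the chain, so the
// fourth never runs.
func demoChain() string {
	mws := []bytesMiddleware{
		{Func: func(d, _, _ []byte) ([]byte, bool) { return append(d, '!'), true }, Priority: 0},
		{Func: func(d, _, _ []byte) ([]byte, bool) { return append([]byte("got:"), d...), true }, Priority: 1},
		{Func: func(d, _, _ []byte) ([]byte, bool) { return append(d, '?'), false }, Priority: -1},
		{Func: func(d, _, _ []byte) ([]byte, bool) { return []byte("unreachable"), true }, Priority: -2},
	}
	return string(runChain(mws, []byte("hi"), []byte{1}, []byte{2}))
}

func main() {
	fmt.Println(demoChain()) // prints "got:hi!?"
}
```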
type ConnectionAccepted ¶
ConnectionAccepted is called when a network connection is accepted. Returns whether we should accept the connection and whether we should proceed to the next middleware.
type LocalNode ¶
LocalNode is a local node
func NewLocalNode ¶
NewLocalNode creates a local node
func (*LocalNode) AddToRxCache ¶
AddToRxCache adds the RemoteMessage ID to rxMsgCache if it does not already exist. Returns whether the message ID was added (rather than loaded), and any error encountered while adding.
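The "added rather than loaded" result follows the usual load-or-store pattern. The sketch below illustrates that pattern with `sync.Map`. The `rxCache` type and its `add` method are hypothetical, and the real cache may also expire entries, which is omitted here.

```go
package main

import (
	"fmt"
	"sync"
)

// rxCache is a hypothetical stand-in for the node's rxMsgCache.
type rxCache struct {
	seen sync.Map // key: string(msgID), value: struct{}
}

// add reports true when msgID was newly stored and false when it was
// already present, which is how duplicate messages can be detected.
func (c *rxCache) add(msgID []byte) bool {
	_, loaded := c.seen.LoadOrStore(string(msgID), struct{}{})
	return !loaded
}

func main() {
	c := &rxCache{}
	fmt.Println(c.add([]byte{0x01})) // true: first time this ID is seen
	fmt.Println(c.add([]byte{0x01})) // false: duplicate
}
```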
func (*LocalNode) AllocReplyChan ¶
func (ln *LocalNode) AllocReplyChan(msgID []byte, expiration time.Duration) (chan *RemoteMessage, error)
AllocReplyChan creates a reply chan for msg with id msgID
func (LocalNode) ApplyMiddleware ¶
func (store LocalNode) ApplyMiddleware(mw interface{}) error
ApplyMiddleware adds a middleware to the store
func (*LocalNode) Connect ¶
Connect tries to establish a connection with the remote node n, and returns the remote node, whether the remote node is ready, and an error. The remote node can be nil if another goroutine is connecting to the same address concurrently. The remote node is ready if an active connection to its address exists and node info has been exchanged.
func (*LocalNode) GetNeighbors ¶
func (ln *LocalNode) GetNeighbors(filter func(*RemoteNode) bool) ([]*RemoteNode, error)
GetNeighbors returns a list of remote nodes that are connected to the local node and for which the filter function returns true. Pass a nil filter to return all neighbors.
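The filter semantics, including the nil case, can be sketched as follows. The `neighbor` type and `filterNeighbors` function are hypothetical stand-ins for `RemoteNode` and the method itself. Only the `IsOutbound` field name is taken from the `RemoteNode` struct documented on this page.

```go
package main

import "fmt"

// neighbor is a hypothetical, trimmed-down stand-in for RemoteNode.
type neighbor struct {
	ID         string
	IsOutbound bool
}

// filterNeighbors keeps every neighbor for which filter returns true.
// A nil filter keeps all neighbors.
func filterNeighbors(all []*neighbor, filter func(*neighbor) bool) []*neighbor {
	out := make([]*neighbor, 0, len(all))
	for _, n := range all {
		if filter == nil || filter(n) {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	all := []*neighbor{{"a", true}, {"b", false}, {"c", true}}
	outbound := filterNeighbors(all, func(n *neighbor) bool { return n.IsOutbound })
	fmt.Println(len(outbound), len(filterNeighbors(all, nil))) // prints "2 3"
}
```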
func (*LocalNode) GetReplyChan ¶
func (ln *LocalNode) GetReplyChan(msgID []byte) (chan *RemoteMessage, bool)
GetReplyChan gets the message reply channel for message id msgID
func (*LocalNode) GetRxMsgChan ¶
func (ln *LocalNode) GetRxMsgChan(routingType pbmsg.RoutingType) (chan *RemoteMessage, error)
GetRxMsgChan gets the message channel of a routing type, or returns an error if no channel exists for that routing type
func (*LocalNode) HandleRemoteMessage ¶
func (ln *LocalNode) HandleRemoteMessage(remoteMsg *RemoteMessage) error
HandleRemoteMessage adds remoteMsg to handleMsgChan for further processing
func (*LocalNode) NewExchangeNodeMessage ¶
NewExchangeNodeMessage creates an EXCHANGE_NODE message to get node info
func (*LocalNode) NewExchangeNodeReply ¶
NewExchangeNodeReply creates an EXCHANGE_NODE reply to send node info
func (*LocalNode) NewPingMessage ¶
NewPingMessage creates a PING message for heartbeat
func (*LocalNode) NewPingReply ¶
NewPingReply creates a PING reply for heartbeat
func (*LocalNode) NewStopMessage ¶
NewStopMessage creates a STOP message to notify local node to close connection with remote node
func (*LocalNode) RegisterRoutingType ¶
func (ln *LocalNode) RegisterRoutingType(routingType pbmsg.RoutingType)
RegisterRoutingType registers a routing type and creates the rxMsgChan for it
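The relationship between RegisterRoutingType and GetRxMsgChan can be sketched as a registry keyed by routing type. Everything named here is hypothetical: `rxRegistry`, the `routingType` alias (standing in for `pbmsg.RoutingType`), the string payload, and the buffer size of 16. Making a repeated registration a no-op is also an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// routingType is a hypothetical stand-in for pbmsg.RoutingType.
type routingType int32

// rxRegistry maps each registered routing type to its receive channel.
type rxRegistry struct {
	mu    sync.RWMutex
	chans map[routingType]chan string
}

func newRxRegistry() *rxRegistry {
	return &rxRegistry{chans: make(map[routingType]chan string)}
}

// register creates the receive channel for rt if it does not exist yet.
func (r *rxRegistry) register(rt routingType) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.chans[rt]; !ok {
		r.chans[rt] = make(chan string, 16) // buffer size chosen arbitrarily
	}
}

// get returns the channel for rt, or an error if rt was never registered.
func (r *rxRegistry) get(rt routingType) (chan string, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	ch, ok := r.chans[rt]
	if !ok {
		return nil, fmt.Errorf("no rx channel for routing type %d", rt)
	}
	return ch, nil
}

func main() {
	reg := newRxRegistry()
	reg.register(1)
	_, errRegistered := reg.get(1)
	_, errMissing := reg.get(2)
	fmt.Println(errRegistered == nil, errMissing != nil) // prints "true true"
}
```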
func (*LocalNode) SetInternalPort ¶
SetInternalPort changes the internal port that local node will listen on. It should not be called once the node starts. Note that it does not change the external port that other nodes will connect to.
func (*LocalNode) StartRemoteNode ¶
func (ln *LocalNode) StartRemoteNode(conn net.Conn, isOutbound bool, n *pbnode.Node) (*RemoteNode, error)
StartRemoteNode creates and starts a remote node using conn. If n is not nil, its id and address will be used to validate ExchangeNode info.
type LocalNodeStarted ¶
LocalNodeStarted is called right after the local node starts listening and handling messages. Returns whether we should proceed to the next middleware.
type LocalNodeStopped ¶
LocalNodeStopped is called right after the local node stops listening and handling messages. Returns whether we should proceed to the next middleware.
type LocalNodeWillStart ¶
LocalNodeWillStart is called right before the local node starts listening and handling messages. It can be used, for example, to add additional data to the local node. Returns whether we should proceed to the next middleware.
type LocalNodeWillStop ¶
LocalNodeWillStop is called right before the local node stops listening and handling messages. Returns whether we should proceed to the next middleware.
type MessageEncoded ¶
type MessageEncoded struct { Func func(*RemoteNode, []byte) ([]byte, bool) Priority int32 }
MessageEncoded is called when a protobuf.Message is encoded into bytes and is about to be sent to RemoteNode. This is a good place for transcoding. Returns the bytes to send and whether we should proceed to the next middleware. If returned bytes is nil, msg will be dropped.
type MessageWillDecode ¶
type MessageWillDecode struct { Func func(*RemoteNode, []byte) ([]byte, bool) Priority int32 }
MessageWillDecode is called when bytes are received from a RemoteNode and are about to be decoded into a protobuf.Message. This is a good place for transcoding. Returns the bytes to decode and whether we should proceed to the next middleware. If the returned bytes are nil, the msg will be dropped.
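As an illustration of the transcoding that MessageEncoded and MessageWillDecode allow, the sketch below pairs a gzip compressor with its decompressor. The functions `encodeGzip` and `decodeGzip` are hypothetical and omit the `*RemoteNode` argument so the example runs on its own. Choosing gzip, and returning nil to drop a message that fails to decode, are assumptions made for the example.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// encodeGzip has the shape of a MessageEncoded Func without the *RemoteNode
// argument: it returns the bytes to send and whether to continue the chain.
func encodeGzip(b []byte) ([]byte, bool) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(b); err != nil {
		return nil, false
	}
	if err := w.Close(); err != nil {
		return nil, false
	}
	return buf.Bytes(), true
}

// decodeGzip has the shape of a MessageWillDecode Func without the
// *RemoteNode argument. Returning nil bytes means the message is dropped.
func decodeGzip(b []byte) ([]byte, bool) {
	r, err := gzip.NewReader(bytes.NewReader(b))
	if err != nil {
		return nil, false
	}
	defer r.Close()
	out, err := io.ReadAll(r)
	if err != nil {
		return nil, false
	}
	return out, true
}

func main() {
	wire, _ := encodeGzip([]byte("hello hello hello"))
	plain, ok := decodeGzip(wire)
	fmt.Println(string(plain), ok) // prints "hello hello hello true"
}
```

The two middlewares must be applied on both ends of a connection, since a peer that does not decompress would receive bytes it cannot decode.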
type RemoteMessage ¶
type RemoteMessage struct { RemoteNode *RemoteNode Msg *pbmsg.Message }
RemoteMessage is the received msg from remote node. RemoteNode is nil if message is sent by local node.
func NewRemoteMessage ¶
func NewRemoteMessage(rn *RemoteNode, msg *pbmsg.Message) (*RemoteMessage, error)
NewRemoteMessage creates a RemoteMessage with remote node rn and msg
type RemoteNode ¶
type RemoteNode struct { *Node LocalNode *LocalNode IsOutbound bool sync.RWMutex // contains filtered or unexported fields }
RemoteNode is a remote node
func NewRemoteNode ¶
func NewRemoteNode(localNode *LocalNode, conn net.Conn, isOutbound bool, n *pbnode.Node) (*RemoteNode, error)
NewRemoteNode creates a remote node
func (*RemoteNode) ExchangeNode ¶
func (rn *RemoteNode) ExchangeNode() (*pbnode.Node, error)
ExchangeNode sends an ExchangeNode message to the remote node and waits for the reply
func (*RemoteNode) GetConn ¶
func (rn *RemoteNode) GetConn() net.Conn
GetConn returns the connection with remote node
func (*RemoteNode) GetLastRxTime ¶
func (rn *RemoteNode) GetLastRxTime() time.Time
GetLastRxTime returns the time at which the last ping response was received
func (*RemoteNode) GetRoundTripTime ¶
func (rn *RemoteNode) GetRoundTripTime() time.Duration
GetRoundTripTime returns the measured round trip time between local node and remote node. Will return 0 if no result available yet.
func (*RemoteNode) NotifyStop ¶
func (rn *RemoteNode) NotifyStop() error
NotifyStop sends a Stop message to remote node to notify it that we will close connection with it
func (*RemoteNode) Ping ¶
func (rn *RemoteNode) Ping() error
Ping sends a Ping message to the remote node and waits for the reply
func (*RemoteNode) SendMessage ¶
func (rn *RemoteNode) SendMessage(msg *pbmsg.Message, hasReply bool, replyTimeout time.Duration) (<-chan *RemoteMessage, error)
SendMessage marshals and sends msg. If hasReply is true, it returns a RemoteMessage chan on which the reply is delivered, provided the reply is received within replyTimeout.
func (*RemoteNode) SendMessageAsync ¶
func (rn *RemoteNode) SendMessageAsync(msg *pbmsg.Message) error
SendMessageAsync sends msg and returns an error, if any
func (*RemoteNode) SendMessageSync ¶
func (rn *RemoteNode) SendMessageSync(msg *pbmsg.Message, replyTimeout time.Duration) (*RemoteMessage, error)
SendMessageSync sends msg and returns the reply message, or an error if the reply is not received within replyTimeout. If replyTimeout is 0, the default reply timeout from the config is used.
func (*RemoteNode) Start ¶
func (rn *RemoteNode) Start() error
Start starts the runtime loop of the remote node
func (*RemoteNode) Stop ¶
func (rn *RemoteNode) Stop(err error)
Stop stops the runtime loop of the remote node
func (*RemoteNode) String ¶
func (rn *RemoteNode) String() string
type RemoteNodeConnected ¶
type RemoteNodeConnected struct { Func func(*RemoteNode) bool Priority int32 }
RemoteNodeConnected is called when a connection is established with a remote node. At this point the remote node id is typically nil, so it is not yet a good time to use the node, but the middleware can be used to stop the connection to the remote node. Returns whether we should proceed to the next middleware.
type RemoteNodeDisconnected ¶
type RemoteNodeDisconnected struct { Func func(*RemoteNode) bool Priority int32 }
RemoteNodeDisconnected is called when the connection to a remote node is closed. The close can be caused by either the local node or the remote node. Returns whether we should proceed to the next middleware.
type RemoteNodeReady ¶
type RemoteNodeReady struct { Func func(*RemoteNode) bool Priority int32 }
RemoteNodeReady is called when the local node has received the node info from the remote node and the remote node is ready to use. Returns whether we should proceed to the next middleware.