node

package
v0.0.0-...-ecfc0d7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2024 License: Apache-2.0 Imports: 20 Imported by: 8

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BytesReceived

type BytesReceived struct {
	Func     func(data, msgID, srcID []byte, remoteNode *RemoteNode) ([]byte, bool)
	Priority int32
}

BytesReceived is called when local node receive user-defined BYTES message. Message with the same message id will only trigger this middleware once. The argument it accepts are bytes data, message ID (can be used to reply message), sender ID, and the neighbor that passes you the message (may be different from the message sneder). Returns the bytes data to be passed in the next middleware and if we should proceed to the next middleware.

type ConnectionAccepted

type ConnectionAccepted struct {
	Func     func(net.Conn) (bool, bool)
	Priority int32
}

ConnectionAccepted is called when a network connection is accepted. Returns if we should accept the connection and if we should proceed to the next middleware.

type LocalNode

type LocalNode struct {
	*Node
	*config.Config
	// contains filtered or unexported fields
}

LocalNode is a local node

func NewLocalNode

func NewLocalNode(id []byte, conf *config.Config) (*LocalNode, error)

NewLocalNode creates a local node

func (*LocalNode) AddToRxCache

func (ln *LocalNode) AddToRxCache(msgID []byte) (bool, error)

AddToRxCache add RemoteMessage id to rxMsgCache if not exists. Returns if msg id is added (instead of loaded) and error when adding

func (*LocalNode) AllocReplyChan

func (ln *LocalNode) AllocReplyChan(msgID []byte, expiration time.Duration) (chan *RemoteMessage, error)

AllocReplyChan creates a reply chan for msg with id msgID

func (LocalNode) ApplyMiddleware

func (store LocalNode) ApplyMiddleware(mw interface{}) error

ApplyMiddleware add a middleware to the store

func (*LocalNode) Connect

func (ln *LocalNode) Connect(n *pbnode.Node) (*RemoteNode, bool, error)

Connect try to establish connection with address remoteNodeAddr, returns the remote node, if the remote node is ready, and error. The remote rode can be nil if another goroutine is connecting to the same address concurrently. The remote node is ready if an active connection to the remoteNodeAddr exists and node info has been exchanged.

func (*LocalNode) GetNeighbors

func (ln *LocalNode) GetNeighbors(filter func(*RemoteNode) bool) ([]*RemoteNode, error)

GetNeighbors returns a list of remote nodes that are connected to local nodes where the filter function returns true. Pass nil filter to return all neighbors.

func (*LocalNode) GetReplyChan

func (ln *LocalNode) GetReplyChan(msgID []byte) (chan *RemoteMessage, bool)

GetReplyChan gets the message reply channel for message id msgID

func (*LocalNode) GetRxMsgChan

func (ln *LocalNode) GetRxMsgChan(routingType pbmsg.RoutingType) (chan *RemoteMessage, error)

GetRxMsgChan gets the message channel of a routing type, or return error if channel for routing type does not exist

func (*LocalNode) HandleRemoteMessage

func (ln *LocalNode) HandleRemoteMessage(remoteMsg *RemoteMessage) error

HandleRemoteMessage add remoteMsg to handleMsgChan for further processing

func (*LocalNode) NewExchangeNodeMessage

func (ln *LocalNode) NewExchangeNodeMessage() (*pbmsg.Message, error)

NewExchangeNodeMessage creates a EXCHANGE_NODE message to get node info

func (*LocalNode) NewExchangeNodeReply

func (ln *LocalNode) NewExchangeNodeReply(replyToID []byte) (*pbmsg.Message, error)

NewExchangeNodeReply creates a EXCHANGE_NODE reply to send node info

func (*LocalNode) NewPingMessage

func (ln *LocalNode) NewPingMessage() (*pbmsg.Message, error)

NewPingMessage creates a PING message for heartbeat

func (*LocalNode) NewPingReply

func (ln *LocalNode) NewPingReply(replyToID []byte) (*pbmsg.Message, error)

NewPingReply creates a PING reply for heartbeat

func (*LocalNode) NewStopMessage

func (ln *LocalNode) NewStopMessage() (*pbmsg.Message, error)

NewStopMessage creates a STOP message to notify local node to close connection with remote node

func (*LocalNode) RegisterRoutingType

func (ln *LocalNode) RegisterRoutingType(routingType pbmsg.RoutingType)

RegisterRoutingType register a routing type and creates the rxMsgChan for it

func (*LocalNode) SetInternalPort

func (ln *LocalNode) SetInternalPort(port uint16)

SetInternalPort changes the internal port that local node will listen on. It should not be called once the node starts. Note that it does not change the external port that other nodes will connect to.

func (*LocalNode) Start

func (ln *LocalNode) Start() error

Start starts the runtime loop of the local node

func (*LocalNode) StartRemoteNode

func (ln *LocalNode) StartRemoteNode(conn net.Conn, isOutbound bool, n *pbnode.Node) (*RemoteNode, error)

StartRemoteNode creates and starts a remote node using conn. If n is not nil, its id and address will be used to validate ExchangeNode info.

func (*LocalNode) Stop

func (ln *LocalNode) Stop(err error)

Stop stops the local node

type LocalNodeStarted

type LocalNodeStarted struct {
	Func     func(*LocalNode) bool
	Priority int32
}

LocalNodeStarted is called right after local node starts listening and handling messages. Returns if we should proceed to the next middleware.

type LocalNodeStopped

type LocalNodeStopped struct {
	Func     func(*LocalNode) bool
	Priority int32
}

LocalNodeStopped is called right after local node stops listening and handling messages. Returns if we should proceed to the next middleware.

type LocalNodeWillStart

type LocalNodeWillStart struct {
	Func     func(*LocalNode) bool
	Priority int32
}

LocalNodeWillStart is called right before local node starts listening and handling messages. It can be used to add additional data to local node, etc. Returns if we should proceed to the next middleware.

type LocalNodeWillStop

type LocalNodeWillStop struct {
	Func     func(*LocalNode) bool
	Priority int32
}

LocalNodeWillStop is called right before local node stops listening and handling messages. Returns if we should proceed to the next middleware.

type MessageEncoded

type MessageEncoded struct {
	Func     func(*RemoteNode, []byte) ([]byte, bool)
	Priority int32
}

MessageEncoded is called when a protobuf.Message is encoded into bytes and is about to be sent to RemoteNode. This is a good place for transcoding. Returns the bytes to send and whether we should proceed to the next middleware. If returned bytes is nil, msg will be dropped.

type MessageWillDecode

type MessageWillDecode struct {
	Func     func(*RemoteNode, []byte) ([]byte, bool)
	Priority int32
}

MessageWillDecode is called when bytes is received from a RemoteNode and is about to be decoded into protobuf.Message. This is a good place for transcoding. Returns the bytes to send and whether we should proceed to the next middleware. If returned bytes is nil, msg will be dropped.

type Node

type Node struct {
	sync.RWMutex
	*pbnode.Node
	common.LifeCycle
}

Node is a remote or local node

func NewNode

func NewNode(id []byte, addr string) (*Node, error)

NewNode creates a node

func (*Node) String

func (n *Node) String() string

type RemoteMessage

type RemoteMessage struct {
	RemoteNode *RemoteNode
	Msg        *pbmsg.Message
}

RemoteMessage is the received msg from remote node. RemoteNode is nil if message is sent by local node.

func NewRemoteMessage

func NewRemoteMessage(rn *RemoteNode, msg *pbmsg.Message) (*RemoteMessage, error)

NewRemoteMessage creates a RemoteMessage with remote node rn and msg

type RemoteNode

type RemoteNode struct {
	*Node
	LocalNode  *LocalNode
	IsOutbound bool

	sync.RWMutex
	// contains filtered or unexported fields
}

RemoteNode is a remote node

func NewRemoteNode

func NewRemoteNode(localNode *LocalNode, conn net.Conn, isOutbound bool, n *pbnode.Node) (*RemoteNode, error)

NewRemoteNode creates a remote node

func (*RemoteNode) ExchangeNode

func (rn *RemoteNode) ExchangeNode() (*pbnode.Node, error)

ExchangeNode sends a ExchangeNode message to remote node and wait for reply

func (*RemoteNode) GetConn

func (rn *RemoteNode) GetConn() net.Conn

GetConn returns the connection with remote node

func (*RemoteNode) GetLastRxTime

func (rn *RemoteNode) GetLastRxTime() time.Time

GetLastRxTime returns the time received last ping response

func (*RemoteNode) GetRoundTripTime

func (rn *RemoteNode) GetRoundTripTime() time.Duration

GetRoundTripTime returns the measured round trip time between local node and remote node. Will return 0 if no result available yet.

func (*RemoteNode) NotifyStop

func (rn *RemoteNode) NotifyStop() error

NotifyStop sends a Stop message to remote node to notify it that we will close connection with it

func (*RemoteNode) Ping

func (rn *RemoteNode) Ping() error

Ping sends a Ping message to remote node and wait for reply

func (*RemoteNode) SendMessage

func (rn *RemoteNode) SendMessage(msg *pbmsg.Message, hasReply bool, replyTimeout time.Duration) (<-chan *RemoteMessage, error)

SendMessage marshals and sends msg, will returns a RemoteMessage chan if hasReply is true and reply is received within replyTimeout.

func (*RemoteNode) SendMessageAsync

func (rn *RemoteNode) SendMessageAsync(msg *pbmsg.Message) error

SendMessageAsync sends msg and returns if there is an error

func (*RemoteNode) SendMessageSync

func (rn *RemoteNode) SendMessageSync(msg *pbmsg.Message, replyTimeout time.Duration) (*RemoteMessage, error)

SendMessageSync sends msg, returns reply message or error if reply is not received within replyTimeout. Will use default reply timeout in config if replyTimeout = 0.

func (*RemoteNode) Start

func (rn *RemoteNode) Start() error

Start starts the runtime loop of the remote node

func (*RemoteNode) Stop

func (rn *RemoteNode) Stop(err error)

Stop stops the runtime loop of the remote node

func (*RemoteNode) String

func (rn *RemoteNode) String() string

type RemoteNodeConnected

type RemoteNodeConnected struct {
	Func     func(*RemoteNode) bool
	Priority int32
}

RemoteNodeConnected is called when a connection is established with a remote node, but the remote node id is typically nil, so it's not a good time to use the node yet, but can be used to stop the connection to remote node. Returns if we should proceed to the next middleware.

type RemoteNodeDisconnected

type RemoteNodeDisconnected struct {
	Func     func(*RemoteNode) bool
	Priority int32
}

RemoteNodeDisconnected is called when connection to remote node is closed. The cause of the connection close can be on either local node or remote node. Returns if we should proceed to the next middleware.

type RemoteNodeReady

type RemoteNodeReady struct {
	Func     func(*RemoteNode) bool
	Priority int32
}

RemoteNodeReady is called when local node has received the node info from remote node and the remote node is ready to use. Returns if we should proceed to the next middleware.

type WillConnectToNode

type WillConnectToNode struct {
	Func     func(*pbnode.Node) (bool, bool)
	Priority int32
}

WillConnectToNode is called before local node connect to a new remote node. Returns if local node should continue connecting and if we should proceed to the next middleware.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL