Documentation
Index
- type MessageHub
- func (h *MessageHub) OnOwnProposal(proposal *flow.Header, targetPublicationTime time.Time)
- func (h *MessageHub) OnOwnTimeout(timeout *model.TimeoutObject)
- func (h *MessageHub) OnOwnVote(blockID flow.Identifier, view uint64, sigData []byte, ...)
- func (h *MessageHub) Process(channel channels.Channel, originID flow.Identifier, message interface{}) error
Types
type MessageHub
type MessageHub struct {
	*component.ComponentManager
	notifications.NoopConsumer
	// contains filtered or unexported fields
}
MessageHub is the central module for handling incoming and outgoing messages on the cluster consensus channel. It routes each incoming message by type to the engine responsible for it. Incoming messages are processed as follows:
       +-------------------+      +------------+
    -->|  Cluster-Channel  |----->| MessageHub |
       +-------------------+      +------+-----+
                             ------------|------------
       +------+---------+    |    +------+-----+     |    +------+------------+
       | VoteAggregator |----+    | Compliance |     +----| TimeoutAggregator |
       +----------------+         +------------+          +------+------------+
              vote                     block                  timeout object
MessageHub also acts as the communicator: it handles hotstuff.Consumer communication events to send votes and to broadcast timeouts and proposals, and it is responsible for communication between cluster consensus participants. It implements the hotstuff.Consumer interface and must be subscribed to notifications via pub/sub. All communicator events are handled on a worker thread so that the sender does not block. Outgoing messages are processed as follows:
    +-------------------+      +------------+      +----------+      +------------------------+
    |  Cluster-Channel  |<-----| MessageHub |<-----| Consumer |<-----|        Hotstuff        |
    +-------------------+      +------+-----+      +----------+      +------------------------+
                                                     pub/sub          vote, timeout, proposal
MessageHub is safe for concurrent use.
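The non-blocking handoff described above (events queued by the caller, then handled on a worker) can be illustrated with a small sketch. This is not the MessageHub implementation: the `hub` type, its `enqueue` and `stop` methods, the string payloads, and the drop-when-full policy are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical stand-in for a component that accepts outgoing
// events without blocking the caller and sends them from one worker.
type hub struct {
	queue chan string
	wg    sync.WaitGroup
	sent  []string // written only by the worker goroutine
}

func newHub(capacity int) *hub {
	h := &hub{queue: make(chan string, capacity)}
	h.wg.Add(1)
	go h.worker()
	return h
}

// worker drains the queue in FIFO order until the queue is closed.
func (h *hub) worker() {
	defer h.wg.Done()
	for msg := range h.queue {
		h.sent = append(h.sent, msg) // a real hub would publish to the network here
	}
}

// enqueue never blocks: it reports false if the queue is full.
func (h *hub) enqueue(msg string) bool {
	select {
	case h.queue <- msg:
		return true
	default:
		return false
	}
}

// stop closes the queue, waits for the worker, and returns what was sent.
func (h *hub) stop() []string {
	close(h.queue)
	h.wg.Wait()
	return h.sent
}

func main() {
	h := newHub(8)
	for _, m := range []string{"vote", "timeout", "proposal"} {
		fmt.Println(m, "queued:", h.enqueue(m))
	}
	fmt.Println(h.stop())
}
```

Because a single worker reads from a single channel, events leave in the order they were queued. Whether a full queue should drop the event or apply back-pressure is a design decision this sketch does not settle.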
func NewMessageHub
func NewMessageHub(
	log zerolog.Logger,
	engineMetrics module.EngineMetrics,
	net network.Network,
	me module.Local,
	compliance collection.Compliance,
	hotstuff module.HotStuff,
	voteAggregator hotstuff.VoteAggregator,
	timeoutAggregator hotstuff.TimeoutAggregator,
	state protocol.State,
	clusterState clusterkv.State,
	payloads storage.ClusterPayloads,
) (*MessageHub, error)
NewMessageHub constructs a new instance of MessageHub. No errors are expected during normal operations.
func (*MessageHub) OnOwnProposal
func (h *MessageHub) OnOwnProposal(proposal *flow.Header, targetPublicationTime time.Time)
OnOwnProposal directly forwards the proposal to the HotStuff core logic (skipping the compliance engine, as we assume our own proposals to be correct) and queues the proposal for subsequent propagation to all consensus participants (including this node). The proposal is placed in the queue only after the specified delay has elapsed; if a shutdown signal arrives first, it is dropped.
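The "wait for the delay, or drop on shutdown" behaviour can be sketched with a timer and a `select`. This is an illustrative sketch, not the code in MessageHub: `enqueueAt`, `demo`, the `done` channel, and the string payload are hypothetical names, and deriving the delay from the target time with `time.Until` is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// enqueueAt waits until target and then places proposal on queue.
// It reports false if done is closed first, in which case the
// proposal is dropped.
func enqueueAt(target time.Time, done <-chan struct{}, queue chan<- string, proposal string) bool {
	timer := time.NewTimer(time.Until(target))
	defer timer.Stop()

	select {
	case <-timer.C: // publication time reached
	case <-done: // shutdown: drop the proposal
		return false
	}

	select {
	case queue <- proposal:
		return true
	case <-done:
		return false
	}
}

// demo runs enqueueAt once. With shutdownFirst set, done is closed
// before the (deliberately long) delay can elapse.
func demo(shutdownFirst bool) bool {
	done := make(chan struct{})
	queue := make(chan string, 1)
	delay := 10 * time.Millisecond
	if shutdownFirst {
		close(done)
		delay = time.Hour
	}
	return enqueueAt(time.Now().Add(delay), done, queue, "proposal")
}

func main() {
	fmt.Println(demo(false)) // true: queued after the delay
	fmt.Println(demo(true))  // false: dropped on shutdown
}
```

Running the wait in its own goroutine would keep the caller of the notification from blocking for the length of the delay; the sketch leaves that choice to the caller.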
func (*MessageHub) OnOwnTimeout
func (h *MessageHub) OnOwnTimeout(timeout *model.TimeoutObject)
OnOwnTimeout forwards the timeout to the node's internal `timeoutAggregator` and queues it for subsequent propagation to all consensus participants (excluding this node).
func (*MessageHub) OnOwnVote
func (h *MessageHub) OnOwnVote(blockID flow.Identifier, view uint64, sigData []byte, recipientID flow.Identifier)
OnOwnVote propagates the vote to the relevant recipient(s):
- [common case] another node is the next leader: the vote is queued, and a worker sends it to that node via unicast
- [special case] this node is the next leader: the vote is forwarded directly to the node's internal `VoteAggregator`
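The two cases above amount to a single comparison between the recipient and the local node's identity. A minimal sketch, assuming string identifiers and in-memory slices in place of the real aggregator and network queue (`voteRouter`, `outboundVote`, and `onOwnVote` are hypothetical names):

```go
package main

import "fmt"

// outboundVote is a hypothetical record of a vote awaiting unicast.
type outboundVote struct {
	view      uint64
	recipient string
}

// voteRouter is a hypothetical stand-in for the vote-routing decision.
type voteRouter struct {
	self     string         // identifier of the local node
	local    []uint64       // views of votes handed to the local aggregator
	outbound []outboundVote // votes queued for unicast by a worker
}

// onOwnVote routes a vote either to the local aggregator (this node is
// the next leader) or to the outbound queue (another node is).
func (r *voteRouter) onOwnVote(view uint64, recipientID string) {
	if recipientID == r.self {
		r.local = append(r.local, view)
		return
	}
	r.outbound = append(r.outbound, outboundVote{view: view, recipient: recipientID})
}

func main() {
	r := &voteRouter{self: "node-A"}
	r.onOwnVote(1, "node-B") // common case: another node leads
	r.onOwnVote(2, "node-A") // special case: this node leads
	fmt.Println(len(r.local), len(r.outbound))
}
```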
func (*MessageHub) Process
func (h *MessageHub) Process(channel channels.Channel, originID flow.Identifier, message interface{}) error
Process handles incoming messages from the cluster consensus channel. After matching a message by type, it sends the message to the correct component for handling. No errors are expected during normal operations.
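Routing by message type is commonly written as a Go type switch. The sketch below shows the idea under stated assumptions: `proposalMsg`, `voteMsg`, `timeoutMsg`, and `route` are hypothetical, and returning an error for an unknown type is a choice made for this example only; how the real Process treats unexpected messages is not stated here.

```go
package main

import "fmt"

// Hypothetical stand-ins for the real message types.
type proposalMsg struct{ view uint64 }
type voteMsg struct{ view uint64 }
type timeoutMsg struct{ view uint64 }

// route returns the name of the component a message would be handed to,
// following the incoming-message diagram: blocks go to compliance, votes
// to the vote aggregator, timeout objects to the timeout aggregator.
func route(message interface{}) (string, error) {
	switch message.(type) {
	case *proposalMsg:
		return "compliance", nil
	case *voteMsg:
		return "voteAggregator", nil
	case *timeoutMsg:
		return "timeoutAggregator", nil
	default:
		return "", fmt.Errorf("unsupported message type %T", message)
	}
}

func main() {
	msgs := []interface{}{&proposalMsg{view: 1}, &voteMsg{view: 1}, &timeoutMsg{view: 1}, "junk"}
	for _, m := range msgs {
		target, err := route(m)
		fmt.Println(target, err)
	}
}
```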