Documentation ¶
Index ¶
- Constants
- func BroadcastEvent(ctx context.Context, ps *pubsub.PubSub, topicName string, ...) error
- func GetEthAddress(remotePeer peer.ID, n network.Network) string
- func GetSelfNodeDataJson(host host.Host, isStaked bool) []byte
- func PrettyDuration(d time.Duration) string
- func StreamConsoleTo(ctx context.Context, topic *pubsub.Topic)
- type BlockEventTracker
- type Blocks
- type ConnectBufferEntry
- type JSONMultiaddr
- type Manager
- func (sm *Manager) AddSubscription(topicName string, handler SubscriptionHandler, includeSelf bool) error
- func (sm *Manager) GetHandler(topic string) (SubscriptionHandler, error)
- func (sm *Manager) GetSubscription(topic string) (*pubsub.Subscription, error)
- func (sm *Manager) GetTopicNames() []string
- func (sm *Manager) Publish(topic string, data []byte) error
- func (sm *Manager) PublishMessage(topicName, message string) error
- func (sm *Manager) RemoveSubscription(topic string) error
- func (sm *Manager) SetUpSubscriptions()
- func (sm *Manager) Subscribe(topicName string, handler SubscriptionHandler) error
- type NodeData
- func (n *NodeData) Address() string
- func (n *NodeData) CalculateAccumulatedUptime()
- func (n *NodeData) CalculateCurrentUptime()
- func (n *NodeData) DiscordScraper() bool
- func (n *NodeData) GetAccumulatedUptime() time.Duration
- func (n *NodeData) GetCurrentUptime() time.Duration
- func (n *NodeData) Joined()
- func (n *NodeData) Left()
- func (n *NodeData) TwitterScraper() bool
- func (n *NodeData) UpdateAccumulatedUptime()
- func (n *NodeData) WebScraper() bool
- type NodeEventTracker
- func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error
- func (net *NodeEventTracker) ClearExpiredBufferEntries()
- func (net *NodeEventTracker) Connected(n network.Network, c network.Conn)
- func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn)
- func (net *NodeEventTracker) DumpNodeData()
- func (net *NodeEventTracker) GetAllNodeData() []NodeData
- func (net *NodeEventTracker) GetNodeData(peerID string) *NodeData
- func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData
- func (net *NodeEventTracker) HandleMessage(msg *pubsub.Message)
- func (net *NodeEventTracker) HandleNodeData(data NodeData)
- func (net *NodeEventTracker) IsStaked(peerID string) bool
- func (net *NodeEventTracker) Listen(n network.Network, a ma.Multiaddr)
- func (net *NodeEventTracker) ListenClose(n network.Network, a ma.Multiaddr)
- func (net *NodeEventTracker) LoadNodeData() error
- func (net *NodeEventTracker) RefreshFromBoot(data NodeData)
- type NodeLifecycleEvent
- type PublicKeyMessage
- type PublicKeyPublisher
- type PublicKeySubscriptionHandler
- type ResponseChannelMap
- type SafeMap
- func (sm *SafeMap) Delete(key string)
- func (sm *SafeMap) DumpNodeData(filePath string) error
- func (sm *SafeMap) Get(key string) (*NodeData, bool)
- func (sm *SafeMap) GetStakedNodesSlice() []NodeData
- func (sm *SafeMap) Len() int
- func (sm *SafeMap) LoadNodeData(filePath string) error
- func (sm *SafeMap) MarshalJSON() ([]byte, error)
- func (sm *SafeMap) Set(key string, value *NodeData)
- func (sm *SafeMap) UnmarshalJSON(b []byte) error
- type SubscriptionHandler
- type TopicHandler
- type WorkerEventTracker
- type Workers
Constants ¶
const (
	ActivityJoined = iota
	ActivityLeft
)
Variables ¶
This section is empty.
Functions ¶
func BroadcastEvent ¶
func BroadcastEvent(ctx context.Context, ps *pubsub.PubSub, topicName string, event NodeLifecycleEvent) error
BroadcastEvent marshals a NodeLifecycleEvent into JSON, publishes it to the given PubSub topic, and logs the operation. It returns any error encountered along the way. This allows broadcasting node join/leave events to other nodes.
func GetEthAddress ¶
GetEthAddress returns the Ethereum address for the given remote peer. It gets the peer's public key from the network's peerstore, converts it to a hex string, and converts that to an Ethereum address. Returns an empty string if there is no public key for the peer.
func GetSelfNodeDataJson ¶
GetSelfNodeDataJson converts the local node's data into a JSON byte array. It populates a NodeData struct with the node's ID, staking status, and Ethereum address. The NodeData struct is then marshalled into a JSON byte array. Returns nil if there is an error marshalling to JSON.
func PrettyDuration ¶
PrettyDuration takes a time.Duration and returns a string representation rounded to the nearest minute. It will include the number of days, hours, and minutes as applicable. For example:
- 1 day 2 hours 3 minutes
- 2 hours 3 minutes
- 3 minutes
func StreamConsoleTo ¶
StreamConsoleTo streams data read from stdin to the given PubSub topic. It launches a goroutine that continuously reads from stdin using a bufio.Reader. Each line that is read is published to the topic. Any errors are logged. The goroutine runs until ctx is canceled.
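A rough sketch of the read-and-publish loop, under stated assumptions: streamLines and collectLines are hypothetical names, the publish callback stands in for publishing to a *pubsub.Topic, and an io.Reader replaces stdin so the example can run without a terminal. Note that in this sketch cancellation is only checked between reads, so a blocked read is not interrupted.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"strings"
)

// streamLines reads newline-terminated lines from r and passes each
// non-empty line to publish. It stops at EOF, on a read error, or when
// ctx has been canceled before the next read.
func streamLines(ctx context.Context, r io.Reader, publish func(line string) error) {
	reader := bufio.NewReader(r)
	for {
		if ctx.Err() != nil {
			return
		}
		line, err := reader.ReadString('\n')
		if line = strings.TrimRight(line, "\r\n"); line != "" {
			if perr := publish(line); perr != nil {
				fmt.Println("publish error:", perr)
			}
		}
		if err != nil {
			if err != io.EOF {
				fmt.Println("read error:", err)
			}
			return
		}
	}
}

// collectLines runs streamLines in a goroutine, as the documented function
// does, and waits for it to finish so the collected lines can be returned.
func collectLines(input string) []string {
	var got []string
	done := make(chan struct{})
	go func() {
		defer close(done)
		streamLines(context.Background(), strings.NewReader(input), func(line string) error {
			got = append(got, line)
			return nil
		})
	}()
	<-done
	return got
}

func main() {
	fmt.Println(collectLines("hello\nworld\n"))
}
```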
Types ¶
type BlockEventTracker ¶
type BlockEventTracker struct {
	Blocks     []Blocks
	BlockTopic *pubsub.Topic
	BlocksCh   chan *pubsub.Message
	// contains filtered or unexported fields
}
BlockEventTracker is a struct that handles subscriptions for block updates.
func (*BlockEventTracker) HandleMessage ¶
func (b *BlockEventTracker) HandleMessage(m *pubsub.Message)
HandleMessage implements the subscription handler for BlockEventTracker.
type ConnectBufferEntry ¶
type JSONMultiaddr ¶
type JSONMultiaddr struct {
multiaddr.Multiaddr
}
func (*JSONMultiaddr) UnmarshalJSON ¶
func (m *JSONMultiaddr) UnmarshalJSON(b []byte) error
UnmarshalJSON implements json.Unmarshaler. It parses the JSON representation of a multiaddress and stores the result in m.
type Manager ¶
type Manager struct {
	PublicKeyPublisher *PublicKeyPublisher // Add this line
	// contains filtered or unexported fields
}
func NewPubSubManager ¶
NewPubSubManager creates a new Manager instance. It initializes a new GossipSub and associates it with the given host. It also initializes data structures to track topics, subscriptions, and handlers. The PublicKeyPublisher is initialized to enable publishing public keys over pubsub. The manager instance is returned, along with any error from initializing GossipSub.
func (*Manager) AddSubscription ¶
func (sm *Manager) AddSubscription(topicName string, handler SubscriptionHandler, includeSelf bool) error
AddSubscription subscribes to the PubSub topic with the given topicName. It creates the topic if needed, subscribes to it, and adds the subscription and handler to the manager's maps. It launches a goroutine to handle incoming messages, skipping messages from self, and calling the handler on each message.
func (*Manager) GetHandler ¶
func (sm *Manager) GetHandler(topic string) (SubscriptionHandler, error)
GetHandler returns the SubscriptionHandler for the given topic name. It returns an error if no handler exists for the given topic.
func (*Manager) GetSubscription ¶
func (sm *Manager) GetSubscription(topic string) (*pubsub.Subscription, error)
GetSubscription returns the Subscription for the given topic name. It returns an error if no Subscription exists for the given topic.
func (*Manager) GetTopicNames ¶
GetTopicNames returns a slice of the names of all topics currently managed.
func (*Manager) Publish ¶
Publish publishes a message to the PubSub topic with the given topic name. It returns an error if no topic with the given name exists.
func (*Manager) PublishMessage ¶
PublishMessage publishes a message to the PubSub topic with the given topicName. It converts the message to a byte slice, checks if the topic exists, optionally creates the topic if it doesn't exist, and publishes using the existing Publish method. Returns an error if the topic does not exist and cannot be created.
func (*Manager) RemoveSubscription ¶
RemoveSubscription unsubscribes from the PubSub topic with the given topic name. It closes the existing subscription, removes it from the manager's subscription map, and removes the associated handler. Returns an error if no subscription exists for the given topic.
func (*Manager) SetUpSubscriptions ¶
func (sm *Manager) SetUpSubscriptions()
SetUpSubscriptions sets up default subscriptions for the PubSub manager based on predefined topics and handlers. This allows initializing subscriptions separately from creating the handlers.
func (*Manager) Subscribe ¶
func (sm *Manager) Subscribe(topicName string, handler SubscriptionHandler) error
Subscribe registers a subscription handler to receive messages for the given topic name. It gets the existing subscription, saves it and the handler, and starts a goroutine to call the handler for each new message. Returns an error if unable to get the subscription.
type NodeData ¶
type NodeData struct {
	Multiaddrs           []JSONMultiaddr `json:"multiaddrs,omitempty"`
	PeerId               peer.ID         `json:"peerId"`
	FirstJoinedUnix      int64           `json:"firstJoined,omitempty"`
	LastJoinedUnix       int64           `json:"lastJoined,omitempty"`
	LastLeftUnix         int64           `json:"-"`
	LastUpdatedUnix      int64           `json:"lastUpdated,omitempty"`
	CurrentUptime        time.Duration   `json:"uptime,omitempty"`
	CurrentUptimeStr     string          `json:"uptimeStr,omitempty"`
	AccumulatedUptime    time.Duration   `json:"accumulatedUptime,omitempty"`
	AccumulatedUptimeStr string          `json:"accumulatedUptimeStr,omitempty"`
	EthAddress           string          `json:"ethAddress,omitempty"`
	Activity             int             `json:"activity,omitempty"`
	IsActive             bool            `json:"isActive"`
	IsStaked             bool            `json:"isStaked"`
	SelfIdentified       bool            `json:"-"`
	IsValidator          bool            `json:"isValidator"`
	IsTwitterScraper     bool            `json:"isTwitterScraper"`
	IsDiscordScraper     bool            `json:"isDiscordScraper"`
	IsWebScraper         bool            `json:"isWebScraper"`
	BytesScraped         int             `json:"bytesScraped"`
	Records              any             `json:"records,omitempty"`
	Version              string          `json:"version"`
}
func NewNodeData ¶
func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData
NewNodeData creates a new NodeData struct initialized with the given parameters. It is used to represent data about a node in the network.
func (*NodeData) Address ¶
Address returns a string representation of the NodeData's multiaddress and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC". This can be used by other nodes to connect to this node.
func (*NodeData) CalculateAccumulatedUptime ¶
func (n *NodeData) CalculateAccumulatedUptime()
CalculateAccumulatedUptime calculates the accumulated uptime based on Unix timestamps.
func (*NodeData) CalculateCurrentUptime ¶
func (n *NodeData) CalculateCurrentUptime()
CalculateCurrentUptime calculates the current uptime based on Unix timestamps.
func (*NodeData) DiscordScraper ¶
DiscordScraper checks if the current node is configured as a Discord scraper. It retrieves the configuration instance and returns the value of the DiscordScraper field.
func (*NodeData) GetAccumulatedUptime ¶
GetAccumulatedUptime returns the total accumulated uptime for the node. It calculates this by adding the current uptime to the stored accumulated uptime.
func (*NodeData) GetCurrentUptime ¶
GetCurrentUptime returns the node's current uptime duration. If the node is active, it calculates the time elapsed since the last joined time. If the node is marked as left, it returns 0.
func (*NodeData) Joined ¶
func (n *NodeData) Joined()
Joined updates the NodeData when the node joins the network. It sets the join times, activity, active status, and logs based on stake status.
func (*NodeData) Left ¶
func (n *NodeData) Left()
Left updates the NodeData when the node leaves the network. It sets the leave time, stops uptime, sets activity to left, sets node as inactive, and logs based on stake status.
func (*NodeData) TwitterScraper ¶
TwitterScraper checks if the current node is configured as a Twitter scraper. It retrieves the configuration instance and returns the value of the TwitterScraper field.
func (*NodeData) UpdateAccumulatedUptime ¶
func (n *NodeData) UpdateAccumulatedUptime()
UpdateAccumulatedUptime updates the accumulated uptime of the node to account for any discrepancy between the last left and last joined times from gossipsub events. It calculates the duration between last left and joined if the node is marked as left. Otherwise, it uses the time since the last joined event.
func (*NodeData) WebScraper ¶
WebScraper checks if the current node is configured as a Web scraper. It retrieves the configuration instance and returns the value of the WebScraper field.
type NodeEventTracker ¶
type NodeEventTracker struct {
	NodeDataChan  chan *NodeData
	ConnectBuffer map[string]ConnectBufferEntry
	// contains filtered or unexported fields
}
func NewNodeEventTracker ¶
func NewNodeEventTracker(version, environment string) *NodeEventTracker
NewNodeEventTracker creates a new NodeEventTracker instance. It initializes the node data map, node data channel, node data file path, and connect buffer map. It loads existing node data from file, starts a goroutine to clear expired buffer entries, and returns the initialized instance.
func (*NodeEventTracker) AddOrUpdateNodeData ¶
func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error
AddOrUpdateNodeData adds or updates the node data in the node event tracker. If the node data does not exist, it is added and marked as self-identified. If the node data exists, it updates the staked status, Ethereum address, and multiaddress if needed. It also sends the updated node data to the NodeDataChan if the data changed or forceGossip is true.
func (*NodeEventTracker) ClearExpiredBufferEntries ¶
func (net *NodeEventTracker) ClearExpiredBufferEntries()
ClearExpiredBufferEntries periodically clears expired entries from the connect buffer cache. It loops forever sleeping for a configured interval. On each loop it checks the current time against the connect time for each entry, and if expired, processes the connect and removes the entry.
func (*NodeEventTracker) Connected ¶
func (net *NodeEventTracker) Connected(n network.Network, c network.Conn)
Connected handles when a remote peer connects to this node. It checks if the connecting node already exists in nodeData. If not, it returns without doing anything. If it exists but is not active, it buffers the connect event. If it exists and is active, it marks the node as joined and saves the updated nodeData.
func (*NodeEventTracker) Disconnected ¶
func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn)
Disconnected handles when a remote peer disconnects from this node. It looks up the disconnected node's data. If it doesn't exist, it returns. If it exists but is not staked, it returns. Otherwise it handles disconnect logic like updating the node data, deleting buffered connect events, and sending updated node data through the channel.
func (*NodeEventTracker) DumpNodeData ¶
func (net *NodeEventTracker) DumpNodeData()
DumpNodeData writes the NodeData map to a JSON file. It determines the file path based on the configured data directory, defaulting to nodeDataFile if not set. It logs any errors writing the file. This allows periodically persisting the node data. Note: this method is obsolete.
func (*NodeEventTracker) GetAllNodeData ¶
func (net *NodeEventTracker) GetAllNodeData() []NodeData
GetAllNodeData returns a slice containing the NodeData for all nodes currently tracked.
func (*NodeEventTracker) GetNodeData ¶
func (net *NodeEventTracker) GetNodeData(peerID string) *NodeData
GetNodeData returns the NodeData for the node with the given peer ID, or nil if no NodeData exists for that peer ID.
func (*NodeEventTracker) GetUpdatedNodes ¶
func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData
GetUpdatedNodes returns a slice of NodeData for nodes that have been updated since the given time. It filters the full node data set to only those updated after the passed in time, sorts the filtered results by update timestamp, and returns the sorted slice.
func (*NodeEventTracker) HandleMessage ¶
func (net *NodeEventTracker) HandleMessage(msg *pubsub.Message)
HandleMessage unmarshals the received pubsub message into a NodeData struct, and passes it to HandleNodeData for further processing. This allows the NodeEventTracker to handle incoming node data messages from the pubsub layer.
func (*NodeEventTracker) HandleNodeData ¶
func (net *NodeEventTracker) HandleNodeData(data NodeData)
HandleNodeData processes incoming NodeData from the pubsub layer. It adds new NodeData to the nodeData map, checks for replay attacks, and reconciles discrepancies between incoming and existing NodeData. This allows the tracker to maintain an up-to-date view of the node topology based on pubsub messages.
func (*NodeEventTracker) IsStaked ¶
func (net *NodeEventTracker) IsStaked(peerID string) bool
IsStaked returns whether the node with the given peerID is marked as staked in the node data tracker. Returns false if no node data is found for the given peerID.
func (*NodeEventTracker) Listen ¶
func (net *NodeEventTracker) Listen(n network.Network, a ma.Multiaddr)
Listen is called when the node starts listening on a new multiaddr. It logs the network and the address on which listening started.
func (*NodeEventTracker) ListenClose ¶
func (net *NodeEventTracker) ListenClose(n network.Network, a ma.Multiaddr)
ListenClose is called when the node stops listening on a multiaddr. It logs the network and the multiaddr on which listening stopped.
func (*NodeEventTracker) LoadNodeData ¶
func (net *NodeEventTracker) LoadNodeData() error
LoadNodeData loads the node data from a JSON file. It determines the file path based on the configured data directory, defaulting to nodeDataFile if not set. It logs any errors reading or parsing the file. This allows initializing the node data tracker from persisted data. Note: this method is obsolete.
func (*NodeEventTracker) RefreshFromBoot ¶
func (net *NodeEventTracker) RefreshFromBoot(data NodeData)
RefreshFromBoot updates the node data map with the provided NodeData when the node boots up. It associates the NodeData with the peer ID string as the map key.
type NodeLifecycleEvent ¶
type NodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}
NodeLifecycleEvent represents a join or leave event
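For reference, the JSON wire form implied by the struct tags can be checked with a short sketch. The struct is copied from the definition above; encodeEvent is a hypothetical helper, and the field values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NodeLifecycleEvent mirrors the documented struct and its JSON tags.
type NodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}

// encodeEvent is a hypothetical helper that returns the JSON form of ev.
func encodeEvent(ev NodeLifecycleEvent) (string, error) {
	b, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeEvent(NodeLifecycleEvent{
		EventType: "join",
		NodeID:    "example-node", // illustrative value, not a real peer ID
		Nonce:     1,
		Timestamp: 1700000000,
	})
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(s)
	// {"eventType":"join","nodeID":"example-node","nonce":1,"timestamp":1700000000}
}
```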
type PublicKeyMessage ¶
type PublicKeyMessage struct {
	PublicKey string `json:"publicKey"`
	Signature string `json:"signature"`
	Data      string `json:"data"`
}
PublicKeyMessage represents the structure of the public key messages.
type PublicKeyPublisher ¶
type PublicKeyPublisher struct {
// contains filtered or unexported fields
}
func NewPublicKeyPublisher ¶
func NewPublicKeyPublisher(manager *Manager, pubKey libp2pCrypto.PubKey) *PublicKeyPublisher
NewPublicKeyPublisher creates a new instance of PublicKeyPublisher.
func (*PublicKeyPublisher) GetPublishedMessages ¶
func (p *PublicKeyPublisher) GetPublishedMessages() []PublicKeyMessage
GetPublishedMessages returns the stored published messages.
func (*PublicKeyPublisher) PublishNodePublicKey ¶
func (p *PublicKeyPublisher) PublishNodePublicKey(publicKey string, data, signature []byte) error
PublishNodePublicKey publishes the node's public key to the designated topic.
type PublicKeySubscriptionHandler ¶
type PublicKeySubscriptionHandler struct {
	PublicKeys  []PublicKeyMessage
	PubKeyTopic *pubsub.Topic
}
PublicKeySubscriptionHandler handles incoming messages on public key topics.
func (*PublicKeySubscriptionHandler) GetPublicKeys ¶
func (handler *PublicKeySubscriptionHandler) GetPublicKeys() []PublicKeyMessage
GetPublicKeys returns the list of PublicKeyMessages.
func (*PublicKeySubscriptionHandler) HandleMessage ¶
func (handler *PublicKeySubscriptionHandler) HandleMessage(m *pubsub.Message)
HandleMessage handles incoming public key messages, with verification and update logic.
type ResponseChannelMap ¶
type ResponseChannelMap struct {
// contains filtered or unexported fields
}
func GetResponseChannelMap ¶
func GetResponseChannelMap() *ResponseChannelMap
GetResponseChannelMap returns the singleton instance of ResponseChannelMap.
func (*ResponseChannelMap) CreateChannel ¶
func (drm *ResponseChannelMap) CreateChannel(key string) chan []byte
func (*ResponseChannelMap) Delete ¶
func (drm *ResponseChannelMap) Delete(key string)
Delete removes the item with the specified key from the ResponseChannelMap. It acquires a write lock to ensure thread-safety while deleting the item.
func (*ResponseChannelMap) Get ¶
func (drm *ResponseChannelMap) Get(key string) (chan []byte, bool)
Get retrieves the value associated with the specified key from the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the ResponseChannelMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.
func (*ResponseChannelMap) Len ¶
func (drm *ResponseChannelMap) Len() int
Len returns the number of items in the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the length.
func (*ResponseChannelMap) Set ¶
func (drm *ResponseChannelMap) Set(key string, value chan []byte)
Set associates the specified value with the specified key in the ResponseChannelMap. It acquires a write lock to ensure thread-safety while setting the value.
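The singleton-plus-lock pattern described for ResponseChannelMap can be sketched as follows. All names here (responseChannels, getResponseChannels) are hypothetical, and the use of sync.Once for the singleton and a buffered channel in the usage example are assumptions; the package's actual internals are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// responseChannels is a hypothetical stand-in for ResponseChannelMap.
type responseChannels struct {
	mu    sync.RWMutex
	items map[string]chan []byte
}

var (
	instance *responseChannels
	once     sync.Once
)

// getResponseChannels returns the process-wide instance, creating it once.
func getResponseChannels() *responseChannels {
	once.Do(func() {
		instance = &responseChannels{items: make(map[string]chan []byte)}
	})
	return instance
}

// Set stores ch under key while holding the write lock.
func (m *responseChannels) Set(key string, ch chan []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items[key] = ch
}

// Get looks up key while holding the read lock.
func (m *responseChannels) Get(key string) (chan []byte, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	ch, ok := m.items[key]
	return ch, ok
}

// Delete removes key while holding the write lock.
func (m *responseChannels) Delete(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.items, key)
}

// Len reports the number of entries while holding the read lock.
func (m *responseChannels) Len() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.items)
}

func main() {
	m := getResponseChannels()
	m.Set("req-1", make(chan []byte, 1))
	if ch, ok := m.Get("req-1"); ok {
		ch <- []byte("response")
		fmt.Println(string(<-ch))
	}
	m.Delete("req-1")
	fmt.Println(m.Len())
}
```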
type SafeMap ¶
type SafeMap struct {
// contains filtered or unexported fields
}
func NewSafeMap ¶
func NewSafeMap() *SafeMap
NewSafeMap creates a new instance of SafeMap. It initializes the underlying map that will store the key-value pairs.
func (*SafeMap) Delete ¶
Delete removes the item with the specified key from the SafeMap. It acquires a write lock to ensure thread-safety while deleting the item.
func (*SafeMap) DumpNodeData ¶
DumpNodeData writes the entire nodeData map to a file in JSON format. Note: this method is obsolete.
func (*SafeMap) Get ¶
Get retrieves the value associated with the specified key from the SafeMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the SafeMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.
func (*SafeMap) GetStakedNodesSlice ¶
GetStakedNodesSlice returns a slice of NodeData for all staked nodes in the SafeMap. It creates a new slice, copies the NodeData from the SafeMap, and populates additional fields such as CurrentUptime, AccumulatedUptime, and their string representations. The resulting slice is sorted based on the LastUpdated timestamp of each NodeData.
func (*SafeMap) Len ¶
Len returns the number of items in the SafeMap. It acquires a read lock to ensure thread-safety while reading the length.
func (*SafeMap) LoadNodeData ¶
LoadNodeData reads nodeData from a file in JSON format and loads it into the map. Note: this method is obsolete.
func (*SafeMap) MarshalJSON ¶
MarshalJSON overrides the default JSON marshalling so that only the underlying map is serialized.
func (*SafeMap) Set ¶
Set associates the specified value with the specified key in the SafeMap. It acquires a write lock to ensure thread-safety while setting the value.
func (*SafeMap) UnmarshalJSON ¶
UnmarshalJSON overrides the default JSON unmarshalling so that the decoded data is stored directly in the underlying map.
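A lock-guarded map with custom JSON methods could be sketched as shown here. The types safeMapSketch and nodeInfo are hypothetical stand-ins for SafeMap and NodeData, and the internal layout (a sync.RWMutex next to a plain map) is an assumption based on the locking behavior described above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// nodeInfo is a hypothetical, reduced stand-in for NodeData.
type nodeInfo struct {
	PeerID   string `json:"peerId"`
	IsStaked bool   `json:"isStaked"`
}

// safeMapSketch is a hypothetical stand-in for SafeMap.
type safeMapSketch struct {
	mu    sync.RWMutex
	items map[string]*nodeInfo
}

func newSafeMapSketch() *safeMapSketch {
	return &safeMapSketch{items: make(map[string]*nodeInfo)}
}

// Set stores value under key while holding the write lock.
func (sm *safeMapSketch) Set(key string, value *nodeInfo) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.items[key] = value
}

// Get looks up key while holding the read lock.
func (sm *safeMapSketch) Get(key string) (*nodeInfo, bool) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	v, ok := sm.items[key]
	return v, ok
}

// MarshalJSON serializes only the underlying map, not the lock.
func (sm *safeMapSketch) MarshalJSON() ([]byte, error) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return json.Marshal(sm.items)
}

// UnmarshalJSON decodes into a fresh map and swaps it in under the write lock.
func (sm *safeMapSketch) UnmarshalJSON(b []byte) error {
	items := make(map[string]*nodeInfo)
	if err := json.Unmarshal(b, &items); err != nil {
		return err
	}
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.items = items
	return nil
}

func main() {
	sm := newSafeMapSketch()
	sm.Set("a", &nodeInfo{PeerID: "a", IsStaked: true})
	b, err := json.Marshal(sm)
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(b)) // {"a":{"peerId":"a","isStaked":true}}
}
```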
type SubscriptionHandler ¶
SubscriptionHandler defines the interface for handling pubsub messages. Implementations should subscribe to topics and handle incoming messages.
type TopicHandler ¶
type TopicHandler struct {
Subscription *pubsub.Subscription
}
TopicHandler is responsible for handling messages from subscribed topics.
func NewTopicHandler ¶
func NewTopicHandler() *TopicHandler
NewTopicHandler creates a new TopicHandler with necessary initializations.
func (*TopicHandler) HandleMessage ¶
func (h *TopicHandler) HandleMessage(msg *pubsub.Message)
HandleMessage processes messages received on the subscribed topics.
func (*TopicHandler) StartListening ¶
func (h *TopicHandler) StartListening()
StartListening starts listening to messages on the subscribed topic.
type WorkerEventTracker ¶
type WorkerEventTracker struct {
	Workers        []Workers
	WorkerTopic    *pubsub.Topic
	WorkerStatusCh chan *pubsub.Message
	// contains filtered or unexported fields
}
WorkerEventTracker is a struct that handles subscriptions for worker status updates. It contains the following fields:
- WorkerTracker: A slice of WorkerTracker structs representing the status of workers.
- Data: A byte slice containing the raw data received from subscriptions.
- mu: A sync.Mutex used for synchronizing access to the handler's fields.
- WorkerCh: A channel for sending worker status updates as byte slices.
func (*WorkerEventTracker) HandleMessage ¶
func (h *WorkerEventTracker) HandleMessage(m *pubsub.Message)
HandleMessage implements the subscription handler for WorkerEventTracker.