Documentation ¶
Index ¶
- Constants
- func BroadcastEvent(ctx context.Context, ps *pubsub.PubSub, topicName string, ...) error
- func GetEthAddress(remotePeer peer.ID, n network.Network) string
- func PrettyDuration(d time.Duration) string
- func SortNodesByTwitterReliability(nodes []NodeData)
- func StreamConsoleTo(ctx context.Context, topic *pubsub.Topic)
- type ConnectBufferEntry
- type JSONMultiaddr
- type Manager
- func (sm *Manager) AddSubscription(topicName string, handler types.SubscriptionHandler, includeSelf bool) error
- func (sm *Manager) GetHandler(topic string) (types.SubscriptionHandler, error)
- func (sm *Manager) GetSubscription(topic string) (*pubsub.Subscription, error)
- func (sm *Manager) GetTopicNames() []string
- func (sm *Manager) Publish(topic string, data []byte) error
- func (sm *Manager) PublishMessage(topicName, message string) error
- func (sm *Manager) RemoveSubscription(topic string) error
- func (sm *Manager) SetUpSubscriptions()
- func (sm *Manager) Subscribe(topicName string, handler types.SubscriptionHandler) error
- type NodeData
- func (n *NodeData) Address() string
- func (n *NodeData) CanDoWork(workerType WorkerCategory) bool
- func (n *NodeData) DiscordScraper() bool
- func (n *NodeData) GetAccumulatedUptime() time.Duration
- func (n *NodeData) GetCurrentUptime() time.Duration
- func (n *NodeData) Joined(nodeVersion string)
- func (n *NodeData) Left()
- func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr)
- func (n *NodeData) TelegramScraper() bool
- func (n *NodeData) TwitterScraper() bool
- func (n *NodeData) UpdateAccumulatedUptime()
- func (nd *NodeData) UpdateTwitterFields(fields NodeData)
- func (n *NodeData) WebScraper() bool
- type NodeEventTracker
- func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error
- func (net *NodeEventTracker) ClearExpiredBufferEntries()
- func (net *NodeEventTracker) ClearExpiredWorkerTimeouts()
- func (net *NodeEventTracker) Connected(n network.Network, c network.Conn)
- func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn)
- func (net *NodeEventTracker) GetAllNodeData() []NodeData
- func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []NodeData
- func (net *NodeEventTracker) GetNodeData(peerID string) *NodeData
- func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData
- func (net *NodeEventTracker) HandleMessage(msg *pubsub.Message)
- func (net *NodeEventTracker) HandleNodeData(data NodeData)
- func (net *NodeEventTracker) IsStaked(peerID string) bool
- func (net *NodeEventTracker) Listen(n network.Network, a ma.Multiaddr)
- func (net *NodeEventTracker) ListenClose(n network.Network, a ma.Multiaddr)
- func (net *NodeEventTracker) RefreshFromBoot(data NodeData)
- func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context, hostId string)
- func (net *NodeEventTracker) UpdateNodeDataTwitter(peerID string, updates NodeData) error
- type NodeLifecycleEvent
- type NodeSorter
- type PublicKeyMessage
- type PublicKeySubscriptionHandler
- type SafeMap
- func (sm *SafeMap) Delete(key string)
- func (sm *SafeMap) Get(key string) (*NodeData, bool)
- func (sm *SafeMap) GetStakedNodesSlice() []NodeData
- func (sm *SafeMap) Len() int
- func (sm *SafeMap) MarshalJSON() ([]byte, error)
- func (sm *SafeMap) Set(key string, value *NodeData)
- func (sm *SafeMap) UnmarshalJSON(b []byte) error
- type TopicHandler
- type WorkerCategory
- type WorkerEventTracker
- type Workers
Constants ¶
const (
	ActivityJoined = iota
	ActivityLeft
)
Variables ¶
This section is empty.
Functions ¶
func BroadcastEvent ¶
func BroadcastEvent(ctx context.Context, ps *pubsub.PubSub, topicName string, event NodeLifecycleEvent) error
BroadcastEvent marshals a NodeLifecycleEvent into JSON, publishes it to the given PubSub topic, and logs the operation. Returns any error. This allows broadcasting node join/leave events to other nodes.
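The marshal-then-publish flow described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the lower-case type mirrors the documented NodeLifecycleEvent fields and tags, and the `publish` callback is a hypothetical stand-in for publishing to a pubsub topic.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeLifecycleEvent mirrors the NodeLifecycleEvent fields and JSON tags
// listed in this package's documentation.
type nodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}

// broadcastEvent marshals the event to JSON and hands the payload to publish.
// publish is a hypothetical stand-in for a pubsub topic's publish call.
func broadcastEvent(event nodeLifecycleEvent, publish func([]byte) error) error {
	payload, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshal event: %w", err)
	}
	if err := publish(payload); err != nil {
		return fmt.Errorf("publish event: %w", err)
	}
	return nil
}
```

Passing the publish step as a function keeps the sketch free of networking dependencies; in real use it would wrap whatever topic handle the caller already holds.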
func GetEthAddress ¶
GetEthAddress returns the Ethereum address for the given remote peer. It gets the peer's public key from the network's peer store, converts it to a hex string, and converts that to an Ethereum address. Returns an empty string if there is no public key for the peer.
func PrettyDuration ¶
PrettyDuration takes a time.Duration and returns a string representation rounded to the nearest minute. It will include the number of days, hours, and minutes as applicable. For example:
- 1 day 2 hours 3 minutes
- 2 hours 3 minutes
- 3 minutes
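A formatter that produces the strings listed above could look like the sketch below. It is a hypothetical re-implementation for illustration only; the pluralization rule and the choice to omit leading zero units are assumptions inferred from the examples, not confirmed behavior of PrettyDuration.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// prettyDuration rounds d to the nearest minute and renders days, hours,
// and minutes, omitting leading units that are zero (assumed behavior).
func prettyDuration(d time.Duration) string {
	d = d.Round(time.Minute)
	days := int(d / (24 * time.Hour))
	d -= time.Duration(days) * 24 * time.Hour
	hours := int(d / time.Hour)
	d -= time.Duration(hours) * time.Hour
	minutes := int(d / time.Minute)

	// plural appends an "s" to the unit unless the count is exactly one.
	plural := func(n int, unit string) string {
		if n == 1 {
			return fmt.Sprintf("%d %s", n, unit)
		}
		return fmt.Sprintf("%d %ss", n, unit)
	}

	var parts []string
	if days > 0 {
		parts = append(parts, plural(days, "day"))
	}
	if days > 0 || hours > 0 {
		parts = append(parts, plural(hours, "hour"))
	}
	parts = append(parts, plural(minutes, "minute"))
	return strings.Join(parts, " ")
}
```

With this sketch, `prettyDuration(26*time.Hour + 3*time.Minute)` yields "1 day 2 hours 3 minutes" and `prettyDuration(3 * time.Minute)` yields "3 minutes".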
func SortNodesByTwitterReliability ¶ added in v0.8.3
func SortNodesByTwitterReliability(nodes []NodeData)
SortNodesByTwitterReliability sorts the given nodes based on their Twitter reliability. It uses multiple criteria to determine the reliability and performance of nodes:
- Prioritizes nodes with more recent last returned tweet
- Then by higher number of returned tweets
- Considers the time since last timeout (longer time is better)
- Then by lower number of timeouts
- Deprioritizes nodes with more recent last not found time
- Finally, sorts by PeerId for stability when no performance data is available
The function modifies the input slice in-place, sorting the nodes from most to least reliable.
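The criteria above amount to a chain of tie-breakers. The sketch below shows one way to express such a chain with `sort.SliceStable`; the trimmed-down `node` type and the exact comparison at each step are assumptions made for illustration and may differ from the package's own comparator.

```go
package main

import (
	"sort"
	"time"
)

// node carries only the NodeData fields the documented criteria mention.
type node struct {
	PeerId            string
	ReturnedTweets    int
	LastReturnedTweet time.Time
	TweetTimeouts     int
	LastTweetTimeout  time.Time
	LastNotFoundTime  time.Time
}

// sortByTwitterReliability sorts nodes in place, most reliable first.
// Each criterion is consulted only when all earlier ones are tied.
func sortByTwitterReliability(nodes []node) {
	sort.SliceStable(nodes, func(i, j int) bool {
		a, b := nodes[i], nodes[j]
		if !a.LastReturnedTweet.Equal(b.LastReturnedTweet) {
			return a.LastReturnedTweet.After(b.LastReturnedTweet) // more recent tweet first
		}
		if a.ReturnedTweets != b.ReturnedTweets {
			return a.ReturnedTweets > b.ReturnedTweets // more tweets first
		}
		if !a.LastTweetTimeout.Equal(b.LastTweetTimeout) {
			return a.LastTweetTimeout.Before(b.LastTweetTimeout) // older timeout first
		}
		if a.TweetTimeouts != b.TweetTimeouts {
			return a.TweetTimeouts < b.TweetTimeouts // fewer timeouts first
		}
		if !a.LastNotFoundTime.Equal(b.LastNotFoundTime) {
			return a.LastNotFoundTime.Before(b.LastNotFoundTime) // older not-found first
		}
		return a.PeerId < b.PeerId // stable fallback when no data differs
	})
}
```

Nodes with no performance data have zero-valued time fields, so they fall to the end and are ordered among themselves by PeerId.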
func StreamConsoleTo ¶
StreamConsoleTo streams data read from stdin to the given PubSub topic. It launches a goroutine that continuously reads from stdin using a bufio.Reader. Each line that is read is published to the topic. Any errors are logged. The goroutine runs until ctx is canceled.
Types ¶
type ConnectBufferEntry ¶
type JSONMultiaddr ¶
type JSONMultiaddr struct {
multiaddr.Multiaddr
}
func (*JSONMultiaddr) UnmarshalJSON ¶
func (m *JSONMultiaddr) UnmarshalJSON(b []byte) error
UnmarshalJSON implements json.Unmarshaler. It parses the JSON representation of a multiaddress and stores the result in m.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewPubSubManager ¶
NewPubSubManager creates a new Manager instance. It initializes a new GossipSub and associates it with the given host. It also initializes data structures to track topics, subscriptions and handlers. The PublicKeyPublisher is initialized to enable publishing public keys over pubsub. The manager instance is returned, along with any error from initializing GossipSub.
func (*Manager) AddSubscription ¶
func (sm *Manager) AddSubscription(topicName string, handler types.SubscriptionHandler, includeSelf bool) error
AddSubscription subscribes to the PubSub topic with the given topicName. It creates the topic if needed, subscribes to it, and adds the subscription and handler to the manager's maps. It launches a goroutine to handle incoming messages, skipping messages from self unless includeSelf is true, and calling the handler on each remaining message.
func (*Manager) GetHandler ¶
func (sm *Manager) GetHandler(topic string) (types.SubscriptionHandler, error)
GetHandler returns the types.SubscriptionHandler for the given topic name. It returns an error if no handler exists for the given topic.
func (*Manager) GetSubscription ¶
func (sm *Manager) GetSubscription(topic string) (*pubsub.Subscription, error)
GetSubscription returns the Subscription for the given topic name. It returns an error if no Subscription exists for the given topic.
func (*Manager) GetTopicNames ¶
GetTopicNames returns a slice of the names of all topics currently managed.
func (*Manager) Publish ¶
Publish publishes a message to the PubSub topic with the given topic name. It returns an error if no topic with the given name exists.
func (*Manager) PublishMessage ¶
PublishMessage publishes a message to the PubSub topic with the given topicName. It converts the message to a byte slice, checks if the topic exists, optionally creates the topic if it doesn't exist, and publishes using the existing Publish method. Returns an error if the topic does not exist and cannot be created.
func (*Manager) RemoveSubscription ¶
RemoveSubscription unsubscribes from the PubSub topic with the given topic name. It closes the existing subscription, removes it from the manager's subscription map, and removes the associated handler. Returns an error if no subscription exists for the given topic.
func (*Manager) SetUpSubscriptions ¶
func (sm *Manager) SetUpSubscriptions()
SetUpSubscriptions sets up default subscriptions for the PubSub manager based on predefined topics and handlers. This allows initializing subscriptions separately from creating the handlers.
func (*Manager) Subscribe ¶
func (sm *Manager) Subscribe(topicName string, handler types.SubscriptionHandler) error
Subscribe registers a subscription handler to receive messages for the given topic name. It gets the existing subscription, saves it and the handler, and starts a goroutine to call the handler for each new message. Returns an error if unable to get the subscription.
type NodeData ¶
type NodeData struct {
	Multiaddrs           []JSONMultiaddr `json:"multiaddrs,omitempty"`
	MultiaddrsString     string          `json:"multiaddrsString,omitempty"`
	PeerId               peer.ID         `json:"peerId"`
	FirstJoinedUnix      int64           `json:"firstJoined,omitempty"`
	LastJoinedUnix       int64           `json:"lastJoined,omitempty"`
	LastLeftUnix         int64           `json:"-"`
	LastUpdatedUnix      int64           `json:"lastUpdated,omitempty"`
	CurrentUptime        time.Duration   `json:"uptime,omitempty"`
	CurrentUptimeStr     string          `json:"uptimeStr,omitempty"`
	AccumulatedUptime    time.Duration   `json:"accumulatedUptime,omitempty"`
	AccumulatedUptimeStr string          `json:"accumulatedUptimeStr,omitempty"`
	EthAddress           string          `json:"ethAddress,omitempty"`
	Activity             int             `json:"activity,omitempty"`
	IsActive             bool            `json:"isActive"`
	IsStaked             bool            `json:"isStaked"`
	SelfIdentified       bool            `json:"-"`
	IsValidator          bool            `json:"isValidator"`
	IsTwitterScraper     bool            `json:"isTwitterScraper"`
	IsDiscordScraper     bool            `json:"isDiscordScraper"`
	IsTelegramScraper    bool            `json:"isTelegramScraper"`
	IsWebScraper         bool            `json:"isWebScraper"`
	Records              any             `json:"records,omitempty"`
	Version              string          `json:"version"`
	WorkerTimeout        time.Time       `json:"workerTimeout,omitempty"`
	ReturnedTweets       int             `json:"returnedTweets"` // a running count of the number of tweets returned
	LastReturnedTweet    time.Time       `json:"lastReturnedTweet"`
	TweetTimeout         bool            `json:"tweetTimeout"`
	TweetTimeouts        int             `json:"tweetTimeouts"` // a running count of the number of times a tweet request times out
	LastTweetTimeout     time.Time       `json:"lastTweetTimeout"`
	LastNotFoundTime     time.Time       `json:"lastNotFoundTime"`
	NotFoundCount        int             `json:"notFoundCount"` // a running count of the number of times a node is not found
}
func NewNodeData ¶
func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData
NewNodeData creates a new NodeData struct initialized with the given parameters. It is used to represent data about a node in the network.
func (*NodeData) Address ¶
Address returns a string representation of the NodeData's multiaddress and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC". This can be used by other nodes to connect to this node.
func (*NodeData) CanDoWork ¶ added in v0.5.1
func (n *NodeData) CanDoWork(workerType WorkerCategory) bool
CanDoWork checks if the node can perform work of the specified WorkerCategory. It returns true if the node is configured for the given worker category, false otherwise.
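A capability check of this kind can be sketched as a switch over the category. The mapping from each category to an `Is...Scraper` flag is an assumption based on the NodeData field names; the real method may consider additional state, and the lower-case names here are hypothetical.

```go
package main

// workerCategory mirrors the documented WorkerCategory constants.
type workerCategory int

const (
	categoryDiscord workerCategory = iota
	categoryTelegram
	categoryTwitter
	categoryWeb
)

// nodeData holds only the capability flags needed for this sketch.
type nodeData struct {
	IsDiscordScraper  bool
	IsTelegramScraper bool
	IsTwitterScraper  bool
	IsWebScraper      bool
}

// canDoWork reports whether the node is configured for the given category.
// Unknown categories are treated as unsupported (an assumption).
func (n *nodeData) canDoWork(c workerCategory) bool {
	switch c {
	case categoryDiscord:
		return n.IsDiscordScraper
	case categoryTelegram:
		return n.IsTelegramScraper
	case categoryTwitter:
		return n.IsTwitterScraper
	case categoryWeb:
		return n.IsWebScraper
	default:
		return false
	}
}
```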
func (*NodeData) DiscordScraper ¶
DiscordScraper checks if the current node is configured as a Discord scraper. It retrieves the configuration instance and returns the value of the DiscordScraper field.
func (*NodeData) GetAccumulatedUptime ¶
GetAccumulatedUptime returns the total accumulated uptime for the node. It calculates this by adding the current uptime to the stored accumulated uptime.
func (*NodeData) GetCurrentUptime ¶
GetCurrentUptime returns the node's current uptime duration. If the node is active, it calculates the time elapsed since the last joined time. If the node is marked as left, it returns 0.
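The relationship between the two uptime getters can be sketched as below. The struct keeps only the fields involved, and the current time is passed in as Unix seconds so the arithmetic is easy to follow; both choices are simplifications made for this example, not the package's actual signatures.

```go
package main

import "time"

// nodeUptime holds the NodeData fields that the uptime getters rely on.
type nodeUptime struct {
	IsActive          bool
	LastJoinedUnix    int64
	AccumulatedUptime time.Duration
}

// currentUptime returns the time elapsed since the last join while the node
// is active, and 0 once the node is marked as left.
func (n nodeUptime) currentUptime(nowUnix int64) time.Duration {
	if !n.IsActive {
		return 0
	}
	return time.Duration(nowUnix-n.LastJoinedUnix) * time.Second
}

// accumulatedUptime adds the current uptime to the stored accumulated total.
func (n nodeUptime) accumulatedUptime(nowUnix int64) time.Duration {
	return n.AccumulatedUptime + n.currentUptime(nowUnix)
}
```

For a node that joined 90 seconds ago with one minute already accumulated, the sketch reports a current uptime of 90 seconds and an accumulated uptime of two and a half minutes.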
func (*NodeData) Joined ¶
Joined updates the NodeData when the node joins the network. It sets the join times, activity, active status, and logs based on stake status.
func (*NodeData) Left ¶
func (n *NodeData) Left()
Left updates the NodeData when the node leaves the network. It sets the leave time, stops uptime, sets activity to left, sets node as inactive, and logs based on stake status.
func (*NodeData) MergeMultiaddresses ¶ added in v0.7.0
func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr)
func (*NodeData) TelegramScraper ¶ added in v0.5.1
TelegramScraper checks if the current node is configured as a Telegram scraper. It retrieves the configuration instance and returns the value of the TelegramScraper field.
func (*NodeData) TwitterScraper ¶
TwitterScraper checks if the current node is configured as a Twitter scraper. It retrieves the configuration instance and returns the value of the TwitterScraper field.
func (*NodeData) UpdateAccumulatedUptime ¶
func (n *NodeData) UpdateAccumulatedUptime()
UpdateAccumulatedUptime updates the accumulated uptime of the node to account for any discrepancy between the last left and last joined times from gossipsub events. It calculates the duration between last left and joined if the node is marked as left. Otherwise, it uses the time since the last joined event.
func (*NodeData) UpdateTwitterFields ¶ added in v0.8.3
func (*NodeData) WebScraper ¶
WebScraper checks if the current node is configured as a Web scraper. It retrieves the configuration instance and returns the value of the WebScraper field.
type NodeEventTracker ¶
type NodeEventTracker struct {
	NodeDataChan  chan *NodeData
	ConnectBuffer map[string]ConnectBufferEntry
	// contains filtered or unexported fields
}
func NewNodeEventTracker ¶
func NewNodeEventTracker(version, environment, hostId string) *NodeEventTracker
NewNodeEventTracker creates a new NodeEventTracker instance. It initializes the node data map, node data channel, node data file path, and connect buffer map. It loads existing node data from file, starts a goroutine to clear expired buffer entries, and returns the initialized instance.
func (*NodeEventTracker) AddOrUpdateNodeData ¶
func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error
AddOrUpdateNodeData adds or updates the node data in the node event tracker. If the node data does not exist, it is added and marked as self-identified. If the node data exists, it updates the staked status, Ethereum address, and multiaddress if needed. It also sends the updated node data to the NodeDataChan if the data changed or forceGossip is true.
func (*NodeEventTracker) ClearExpiredBufferEntries ¶
func (net *NodeEventTracker) ClearExpiredBufferEntries()
ClearExpiredBufferEntries periodically clears expired entries from the connect buffer cache. It loops forever sleeping for a configured interval. On each loop it checks the current time against the connect time for each entry, and if expired, processes the connect and removes the entry.
func (*NodeEventTracker) ClearExpiredWorkerTimeouts ¶ added in v0.5.1
func (net *NodeEventTracker) ClearExpiredWorkerTimeouts()
ClearExpiredWorkerTimeouts periodically checks and clears expired worker timeouts. It runs in an infinite loop, sleeping for 5 minutes between each iteration. For each node in the network, it checks if the worker timeout has expired (after 60 minutes). If a timeout has expired, it resets the WorkerTimeout to zero and updates the node data. This function helps manage the availability of workers in the network by clearing temporary timeout states.
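A single pass of the expiry check described above might look like the following sketch. The map of nodes, the `workerNode` type, and the explicit `now` and `ttl` parameters are hypothetical; in the documented routine a pass like this would run every 5 minutes with a 60-minute expiry, and each change would also be written back to the tracker.

```go
package main

import "time"

// workerNode keeps only the fields needed to illustrate timeout expiry.
type workerNode struct {
	PeerId        string
	WorkerTimeout time.Time
}

// clearExpiredWorkerTimeouts resets WorkerTimeout to the zero time for every
// node whose timeout was set at least ttl ago, and returns how many nodes
// were cleared. Nodes without a timeout are skipped.
func clearExpiredWorkerTimeouts(nodes map[string]*workerNode, now time.Time, ttl time.Duration) int {
	cleared := 0
	for _, n := range nodes {
		if n.WorkerTimeout.IsZero() {
			continue // no timeout recorded for this node
		}
		if now.Sub(n.WorkerTimeout) >= ttl {
			n.WorkerTimeout = time.Time{}
			cleared++
		}
	}
	return cleared
}
```

Taking `now` as a parameter instead of calling `time.Now()` inside the function keeps the pass deterministic and easy to test.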
func (*NodeEventTracker) Connected ¶
func (net *NodeEventTracker) Connected(n network.Network, c network.Conn)
Connected handles when a remote peer connects to this node. It checks if the connecting node already exists in nodeData. If not, it returns without doing anything. If it exists but is not active, it buffers the connect event. If it exists and is active, it marks the node as joined and saves the updated nodeData.
func (*NodeEventTracker) Disconnected ¶
func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn)
Disconnected handles when a remote peer disconnects from this node. It looks up the disconnected node's data. If it doesn't exist, it returns. If it exists but is not staked, it returns. Otherwise it handles disconnect logic like updating the node data, deleting buffered connect events, and sending updated node data through the channel.
func (*NodeEventTracker) GetAllNodeData ¶
func (net *NodeEventTracker) GetAllNodeData() []NodeData
GetAllNodeData returns a slice containing the NodeData for all nodes currently tracked.
func (*NodeEventTracker) GetEligibleWorkerNodes ¶ added in v0.6.0
func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []NodeData
GetEligibleWorkerNodes returns a slice of NodeData for nodes that are eligible to perform a specific category of work.
func (*NodeEventTracker) GetNodeData ¶
func (net *NodeEventTracker) GetNodeData(peerID string) *NodeData
GetNodeData returns the NodeData for the node with the given peer ID, or nil if no NodeData exists for that peer ID.
func (*NodeEventTracker) GetUpdatedNodes ¶
func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData
GetUpdatedNodes returns a slice of NodeData for nodes that have been updated since the given time. It filters the full node data set to only those updated after the passed in time, sorts the filtered results by update timestamp, and returns the sorted slice.
func (*NodeEventTracker) HandleMessage ¶
func (net *NodeEventTracker) HandleMessage(msg *pubsub.Message)
HandleMessage unmarshals the received pubsub message into a NodeData struct, and passes it to HandleNodeData for further processing. This allows the NodeEventTracker to handle incoming node data messages from the pubsub layer.
func (*NodeEventTracker) HandleNodeData ¶
func (net *NodeEventTracker) HandleNodeData(data NodeData)
HandleNodeData processes incoming NodeData from the pubsub layer. It adds new NodeData to the nodeData map, checks for replay attacks, and reconciles discrepancies between incoming and existing NodeData. This allows the tracker to maintain an up-to-date view of the node topology based on pubsub messages.
func (*NodeEventTracker) IsStaked ¶
func (net *NodeEventTracker) IsStaked(peerID string) bool
IsStaked returns whether the node with the given peerID is marked as staked in the node data tracker. Returns false if no node data is found for the given peerID.
func (*NodeEventTracker) Listen ¶
func (net *NodeEventTracker) Listen(n network.Network, a ma.Multiaddr)
Listen is called when the node starts listening on a new multiaddr. It logs the network and address that listening started on.
func (*NodeEventTracker) ListenClose ¶
func (net *NodeEventTracker) ListenClose(n network.Network, a ma.Multiaddr)
ListenClose logs when the node stops listening on a multiaddr. It logs the network and the multiaddr on which listening stopped.
func (*NodeEventTracker) RefreshFromBoot ¶
func (net *NodeEventTracker) RefreshFromBoot(data NodeData)
RefreshFromBoot updates the node data map with the provided NodeData when the node boots up. It associates the NodeData with the peer ID string as the map key.
func (*NodeEventTracker) StartCleanupRoutine ¶ added in v0.5.1
func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context, hostId string)
StartCleanupRoutine starts a goroutine that periodically checks for and removes stale peers.
func (*NodeEventTracker) UpdateNodeDataTwitter ¶ added in v0.8.3
func (net *NodeEventTracker) UpdateNodeDataTwitter(peerID string, updates NodeData) error
type NodeLifecycleEvent ¶
type NodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}
NodeLifecycleEvent represents a join or leave event.
type NodeSorter ¶ added in v0.8.3
type NodeSorter struct {
// contains filtered or unexported fields
}
NodeSorter provides methods for sorting NodeData slices
func (NodeSorter) Len ¶ added in v0.8.3
func (s NodeSorter) Len() int
Len returns the length of the nodes slice
func (NodeSorter) Less ¶ added in v0.8.3
func (s NodeSorter) Less(i, j int) bool
Less compares nodes at indices i and j using the provided less function
func (NodeSorter) Swap ¶ added in v0.8.3
func (s NodeSorter) Swap(i, j int)
Swap swaps the nodes at indices i and j
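The Len, Less, and Swap methods together satisfy the standard library's `sort.Interface`. The sketch below shows a sorter of this shape driven by a caller-supplied less function; the unexported field names and the reduced `node` type are assumptions, since NodeSorter's fields are not exported.

```go
package main

import "sort"

// node is a reduced stand-in for NodeData.
type node struct {
	PeerId         string
	ReturnedTweets int
}

// nodeSorter pairs a slice of nodes with the comparison used to order them.
type nodeSorter struct {
	nodes []node
	less  func(a, b node) bool
}

// Len returns the number of nodes being sorted.
func (s nodeSorter) Len() int { return len(s.nodes) }

// Less delegates to the supplied comparison function.
func (s nodeSorter) Less(i, j int) bool { return s.less(s.nodes[i], s.nodes[j]) }

// Swap exchanges two nodes; the slice header is copied but the backing
// array is shared, so the caller's slice is reordered.
func (s nodeSorter) Swap(i, j int) { s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i] }

// sortNodes sorts nodes in place using less.
func sortNodes(nodes []node, less func(a, b node) bool) {
	sort.Sort(nodeSorter{nodes: nodes, less: less})
}
```

Keeping the comparison in a function field lets one sorter type serve several orderings without defining a new type for each.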
type PublicKeyMessage ¶
type PublicKeyMessage struct {
	PublicKey string `json:"publicKey"`
	Signature string `json:"signature"`
	Data      string `json:"data"`
}
PublicKeyMessage represents the structure of the public key messages.
type PublicKeySubscriptionHandler ¶
type PublicKeySubscriptionHandler struct {
	PublicKeys  []PublicKeyMessage
	PubKeyTopic *pubsub.Topic
}
PublicKeySubscriptionHandler handles incoming messages on public key topics.
func (*PublicKeySubscriptionHandler) GetPublicKeys ¶
func (handler *PublicKeySubscriptionHandler) GetPublicKeys() []PublicKeyMessage
GetPublicKeys returns the list of PublicKeyMessages.
func (*PublicKeySubscriptionHandler) HandleMessage ¶
func (handler *PublicKeySubscriptionHandler) HandleMessage(m *pubsub.Message)
HandleMessage handles incoming public key messages, with verification and update logic.
type SafeMap ¶
type SafeMap struct {
// contains filtered or unexported fields
}
func NewSafeMap ¶
func NewSafeMap() *SafeMap
NewSafeMap creates a new instance of SafeMap. It initializes the underlying map that will store the key-value pairs.
func (*SafeMap) Delete ¶
Delete removes the item with the specified key from the SafeMap. It acquires a write lock to ensure thread-safety while deleting the item.
func (*SafeMap) Get ¶
Get retrieves the value associated with the specified key from the SafeMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the SafeMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.
func (*SafeMap) GetStakedNodesSlice ¶
GetStakedNodesSlice returns a slice of NodeData for all staked nodes in the SafeMap. It creates a new slice, copies the NodeData from the SafeMap, and populates additional fields such as CurrentUptime, AccumulatedUptime, and their string representations. The resulting slice is sorted based on the LastUpdated timestamp of each NodeData.
func (*SafeMap) Len ¶
Len returns the number of items in the SafeMap. It acquires a read lock to ensure thread-safety while reading the length.
func (*SafeMap) MarshalJSON ¶
MarshalJSON overrides the default JSON marshaling to return just the underlying map.
func (*SafeMap) Set ¶
Set associates the specified value with the specified key in the SafeMap. It acquires a write lock to ensure thread-safety while setting the value.
func (*SafeMap) UnmarshalJSON ¶
UnmarshalJSON overrides the default JSON unmarshaling to set just the underlying map.
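The locking and JSON behavior described for SafeMap can be sketched as a map guarded by a `sync.RWMutex`. This is an illustration under assumptions: the unexported field names are invented, and `nodeData` is a two-field stand-in for NodeData.

```go
package main

import (
	"encoding/json"
	"sync"
)

// nodeData is a reduced stand-in for NodeData.
type nodeData struct {
	PeerId   string `json:"peerId"`
	IsStaked bool   `json:"isStaked"`
}

// safeMap guards a map with a read-write mutex.
type safeMap struct {
	mu    sync.RWMutex
	items map[string]*nodeData
}

func newSafeMap() *safeMap { return &safeMap{items: make(map[string]*nodeData)} }

// Set stores value under key while holding the write lock.
func (sm *safeMap) Set(key string, value *nodeData) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.items[key] = value
}

// Get returns the value for key and whether it was present (read lock).
func (sm *safeMap) Get(key string) (*nodeData, bool) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	v, ok := sm.items[key]
	return v, ok
}

// Delete removes key while holding the write lock.
func (sm *safeMap) Delete(key string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	delete(sm.items, key)
}

// Len returns the number of entries (read lock).
func (sm *safeMap) Len() int {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return len(sm.items)
}

// MarshalJSON encodes only the underlying map, not the mutex.
func (sm *safeMap) MarshalJSON() ([]byte, error) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return json.Marshal(sm.items)
}

// UnmarshalJSON decodes into a fresh map and swaps it in under the write lock.
func (sm *safeMap) UnmarshalJSON(b []byte) error {
	items := make(map[string]*nodeData)
	if err := json.Unmarshal(b, &items); err != nil {
		return err
	}
	if items == nil { // a JSON null leaves the map nil; keep it usable
		items = make(map[string]*nodeData)
	}
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.items = items
	return nil
}
```

Readers share the lock in Get and Len, so concurrent lookups do not block each other; only Set, Delete, and UnmarshalJSON take the exclusive lock.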
type TopicHandler ¶
type TopicHandler struct {
Subscription *pubsub.Subscription
}
TopicHandler is responsible for handling messages from subscribed topics.
func NewTopicHandler ¶
func NewTopicHandler() *TopicHandler
NewTopicHandler creates a new TopicHandler with necessary initializations.
func (*TopicHandler) HandleMessage ¶
func (h *TopicHandler) HandleMessage(msg *pubsub.Message)
HandleMessage processes messages received on the subscribed topics.
func (*TopicHandler) StartListening ¶
func (h *TopicHandler) StartListening()
StartListening starts listening to messages on the subscribed topic.
type WorkerCategory ¶ added in v0.5.1
type WorkerCategory int
WorkerCategory represents the main categories of workers
const (
	CategoryDiscord WorkerCategory = iota
	CategoryTelegram
	CategoryTwitter
	CategoryWeb
)
func (WorkerCategory) String ¶ added in v0.5.1
func (wc WorkerCategory) String() string
String returns the string representation of the WorkerCategory
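An integer enum with a String method follows the `fmt.Stringer` pattern. The sketch below shows the pattern for the four documented categories; the returned names and the fallback format are hypothetical, because the documentation does not state what String actually returns.

```go
package main

import "fmt"

// workerCategory mirrors the documented WorkerCategory constants.
type workerCategory int

const (
	categoryDiscord workerCategory = iota
	categoryTelegram
	categoryTwitter
	categoryWeb
)

// String returns a human-readable name for the category.
// The names here are illustrative assumptions.
func (wc workerCategory) String() string {
	switch wc {
	case categoryDiscord:
		return "Discord"
	case categoryTelegram:
		return "Telegram"
	case categoryTwitter:
		return "Twitter"
	case categoryWeb:
		return "Web"
	default:
		return fmt.Sprintf("WorkerCategory(%d)", int(wc))
	}
}
```

Because the type implements `fmt.Stringer`, formatting a value with `%s` or `%v` prints the name instead of the underlying integer.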
type WorkerEventTracker ¶
type WorkerEventTracker struct {
	Workers        []Workers
	WorkerTopic    *pubsub.Topic
	WorkerStatusCh chan *pubsub.Message
	// contains filtered or unexported fields
}
WorkerEventTracker is a struct that handles subscriptions for worker status updates. Its exported fields are:
- Workers: a slice of Workers values representing the status of workers.
- WorkerTopic: the pubsub topic associated with the tracker.
- WorkerStatusCh: a channel that carries worker status updates as *pubsub.Message values.
Unexported fields, such as the mutex used for synchronizing access, are not shown.
func (*WorkerEventTracker) HandleMessage ¶
func (h *WorkerEventTracker) HandleMessage(m *pubsub.Message)
HandleMessage implements the subscription handler for WorkerEventTracker.