pubsub

package
v0.8.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ActivityJoined = iota
	ActivityLeft
)

Variables

This section is empty.

Functions

func BroadcastEvent

func BroadcastEvent(ctx context.Context, ps *pubsub.PubSub, topicName string, event NodeLifecycleEvent) error

BroadcastEvent marshals a NodeLifecycleEvent into JSON, publishes it to the given PubSub topic, and logs the operation. Returns any error. This allows broadcasting node join/leave events to other nodes.

func GetEthAddress

func GetEthAddress(remotePeer peer.ID, n network.Network) string

GetEthAddress returns the Ethereum address for the given remote peer. It gets the peer's public key from the network's peer store, converts it to a hex string, and converts that to an Ethereum address. Returns an empty string if there is no public key for the peer.

func PrettyDuration

func PrettyDuration(d time.Duration) string

PrettyDuration takes a time.Duration and returns a string representation rounded to the nearest minute. It will include the number of days, hours, and minutes as applicable. For example:

  • 1 day 2 hours 3 minutes
  • 2 hours 3 minutes
  • 3 minutes

func SortNodesByTwitterReliability added in v0.8.3

func SortNodesByTwitterReliability(nodes []NodeData)

SortNodesByTwitterReliability sorts the given nodes based on their Twitter reliability. It uses multiple criteria to determine the reliability and performance of nodes:

  1. Prioritizes nodes with more recent last returned tweet
  2. Then by higher number of returned tweets
  3. Considers the time since last timeout (longer time is better)
  4. Then by lower number of timeouts
  5. Deprioritizes nodes with more recent last not found time
  6. Finally, sorts by PeerId for stability when no performance data is available

The function modifies the input slice in-place, sorting the nodes from most to least reliable.

func StreamConsoleTo

func StreamConsoleTo(ctx context.Context, topic *pubsub.Topic)

StreamConsoleTo streams data read from stdin to the given PubSub topic. It launches a goroutine that continuously reads from stdin using a bufio.Reader. Each line that is read is published to the topic. Any errors are logged. The goroutine runs until ctx is canceled.

Types

type ConnectBufferEntry

type ConnectBufferEntry struct {
	NodeData    *NodeData
	ConnectTime time.Time
}

type JSONMultiaddr

type JSONMultiaddr struct {
	multiaddr.Multiaddr
}

func (*JSONMultiaddr) UnmarshalJSON

func (m *JSONMultiaddr) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler. It parses the JSON representation of a multiaddress and stores the result in m.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewPubSubManager

func NewPubSubManager(ctx context.Context, host host.Host) (*Manager, error)

NewPubSubManager creates a new PubSubManager instance. It initializes a new GossipSub and associates it with the given host. It also initializes data structures to track topics, subscriptions and handlers. The PublicKeyPublisher is initialized to enable publishing public keys over pubsub. The manager instance is returned, along with any error from initializing GossipSub.

func (*Manager) AddSubscription

func (sm *Manager) AddSubscription(topicName string, handler types.SubscriptionHandler, includeSelf bool) error

AddSubscription subscribes to the PubSub topic with the given topicName. It creates the topic if needed, subscribes to it, and adds the subscription and handler to the manager's maps. It launches a goroutine to handle incoming messages, skipping messages from self, and calling the handler on each message.

func (*Manager) GetHandler

func (sm *Manager) GetHandler(topic string) (types.SubscriptionHandler, error)

GetHandler returns the types.SubscriptionHandler for the given topic name. It returns an error if no handler exists for the given topic.

func (*Manager) GetSubscription

func (sm *Manager) GetSubscription(topic string) (*pubsub.Subscription, error)

GetSubscription returns the Subscription for the given topic name. It returns an error if no Subscription exists for the given topic.

func (*Manager) GetTopicNames

func (sm *Manager) GetTopicNames() []string

GetTopicNames returns a slice of the names of all topics currently managed.

func (*Manager) Publish

func (sm *Manager) Publish(topic string, data []byte) error

Publish publishes a message to the PubSub topic with the given topic name. It returns an error if no topic with the given name exists.

func (*Manager) PublishMessage

func (sm *Manager) PublishMessage(topicName, message string) error

PublishMessage publishes a message to the PubSub topic with the given topicName. It converts the message to a byte slice, checks if the topic exists, optionally creates the topic if it doesn't exist, and publishes using the existing Publish method. Returns an error if the topic does not exist and cannot be created.

func (*Manager) RemoveSubscription

func (sm *Manager) RemoveSubscription(topic string) error

RemoveSubscription unsubscribes from the PubSub topic with the given topic name. It closes the existing subscription, removes it from the manager's subscription map, and removes the associated handler. Returns an error if no subscription exists for the given topic.

func (*Manager) SetUpSubscriptions

func (sm *Manager) SetUpSubscriptions()

SetUpSubscriptions sets up default subscriptions for the PubSub manager based on predefined topics and handlers. This allows initializing subscriptions separately from creating the handlers.

func (*Manager) Subscribe

func (sm *Manager) Subscribe(topicName string, handler types.SubscriptionHandler) error

Subscribe registers a subscription handler to receive messages for the given topic name. It gets the existing subscription, saves it and the handler, and starts a goroutine to call the handler for each new message. Returns an error if unable to get the subscription.

type NodeData

type NodeData struct {
	Multiaddrs           []JSONMultiaddr `json:"multiaddrs,omitempty"`
	MultiaddrsString     string          `json:"multiaddrsString,omitempty"`
	PeerId               peer.ID         `json:"peerId"`
	FirstJoinedUnix      int64           `json:"firstJoined,omitempty"`
	LastJoinedUnix       int64           `json:"lastJoined,omitempty"`
	LastLeftUnix         int64           `json:"-"`
	LastUpdatedUnix      int64           `json:"lastUpdated,omitempty"`
	CurrentUptime        time.Duration   `json:"uptime,omitempty"`
	CurrentUptimeStr     string          `json:"uptimeStr,omitempty"`
	AccumulatedUptime    time.Duration   `json:"accumulatedUptime,omitempty"`
	AccumulatedUptimeStr string          `json:"accumulatedUptimeStr,omitempty"`
	EthAddress           string          `json:"ethAddress,omitempty"`
	Activity             int             `json:"activity,omitempty"`
	IsActive             bool            `json:"isActive"`
	IsStaked             bool            `json:"isStaked"`
	SelfIdentified       bool            `json:"-"`
	IsValidator          bool            `json:"isValidator"`
	IsTwitterScraper     bool            `json:"isTwitterScraper"`
	IsDiscordScraper     bool            `json:"isDiscordScraper"`
	IsTelegramScraper    bool            `json:"isTelegramScraper"`
	IsWebScraper         bool            `json:"isWebScraper"`
	Records              any             `json:"records,omitempty"`
	Version              string          `json:"version"`
	WorkerTimeout        time.Time       `json:"workerTimeout,omitempty"`
	ReturnedTweets       int             `json:"returnedTweets"` // a running count of the number of tweets returned
	LastReturnedTweet    time.Time       `json:"lastReturnedTweet"`
	TweetTimeout         bool            `json:"tweetTimeout"`
	TweetTimeouts        int             `json:"tweetTimeouts"` // a running countthe number of times a tweet request times out
	LastTweetTimeout     time.Time       `json:"lastTweetTimeout"`
	LastNotFoundTime     time.Time       `json:"lastNotFoundTime"`
	NotFoundCount        int             `json:"notFoundCount"` // a running count of the number of times a node is not found
}

func NewNodeData

func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData

NewNodeData creates a new NodeData struct initialized with the given parameters. It is used to represent data about a node in the network.

func (*NodeData) Address

func (n *NodeData) Address() string

Address returns a string representation of the NodeData's multiaddress and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC". This can be used by other nodes to connect to this node.

func (*NodeData) CanDoWork added in v0.5.1

func (n *NodeData) CanDoWork(workerType WorkerCategory) bool

CanDoWork checks if the node can perform work of the specified WorkerType. It returns true if the node is configured for the given worker type, false otherwise.

func (*NodeData) DiscordScraper

func (n *NodeData) DiscordScraper() bool

DiscordScraper checks if the current node is configured as a Discord scraper. It retrieves the configuration instance and returns the value of the DiscordScraper field.

func (*NodeData) GetAccumulatedUptime

func (n *NodeData) GetAccumulatedUptime() time.Duration

GetAccumulatedUptime returns the total accumulated uptime for the node. It calculates this by adding the current uptime to the stored accumulated uptime.

func (*NodeData) GetCurrentUptime

func (n *NodeData) GetCurrentUptime() time.Duration

GetCurrentUptime returns the node's current uptime duration. If the node is active, it calculates the time elapsed since the last joined time. If the node is marked as left, it returns 0.

func (*NodeData) Joined

func (n *NodeData) Joined(nodeVersion string)

Joined updates the NodeData when the node joins the network. It sets the join times, activity, active status, and logs based on stake status.

func (*NodeData) Left

func (n *NodeData) Left()

Left updates the NodeData when the node leaves the network. It sets the leave time, stops uptime, sets activity to left, sets node as inactive, and logs based on stake status.

func (*NodeData) MergeMultiaddresses added in v0.7.0

func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr)

func (*NodeData) TelegramScraper added in v0.5.1

func (n *NodeData) TelegramScraper() bool

TelegramScraper checks if the current node is configured as a Discord scraper. It retrieves the configuration instance and returns the value of the TelegramScraper field.

func (*NodeData) TwitterScraper

func (n *NodeData) TwitterScraper() bool

TwitterScraper checks if the current node is configured as a Twitter scraper. It retrieves the configuration instance and returns the value of the TwitterScraper field.

func (*NodeData) UpdateAccumulatedUptime

func (n *NodeData) UpdateAccumulatedUptime()

UpdateAccumulatedUptime updates the accumulated uptime of the node to account for any discrepancy between the last left and last joined times from gossipsub events. It calculates the duration between last left and joined if the node is marked as left. Otherwise, it uses the time since the last joined event.

func (*NodeData) UpdateTwitterFields added in v0.8.3

func (nd *NodeData) UpdateTwitterFields(fields NodeData)

func (*NodeData) WebScraper

func (n *NodeData) WebScraper() bool

WebScraper checks if the current node is configured as a Web scraper. It retrieves the configuration instance and returns the value of the WebScraper field.

type NodeEventTracker

type NodeEventTracker struct {
	NodeDataChan chan *NodeData

	ConnectBuffer map[string]ConnectBufferEntry
	// contains filtered or unexported fields
}

func NewNodeEventTracker

func NewNodeEventTracker(version, environment, hostId string) *NodeEventTracker

NewNodeEventTracker creates a new NodeEventTracker instance. It initializes the node data map, node data channel, node data file path, connect buffer map. It loads existing node data from file, starts a goroutine to clear expired buffer entries, and returns the initialized instance.

func (*NodeEventTracker) AddOrUpdateNodeData

func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error

AddOrUpdateNodeData adds or updates the node data in the node event tracker. If the node data does not exist, it is added and marked as self-identified. If the node data exists, it updates the staked status, Ethereum address, and multiaddress if needed. It also sends the updated node data to the NodeDataChan if the data changed or forceGossip is true.

func (*NodeEventTracker) ClearExpiredBufferEntries

func (net *NodeEventTracker) ClearExpiredBufferEntries()

ClearExpiredBufferEntries periodically clears expired entries from the connect buffer cache. It loops forever sleeping for a configured interval. On each loop it checks the current time against the connect time for each entry, and if expired, processes the connect and removes the entry.

func (*NodeEventTracker) ClearExpiredWorkerTimeouts added in v0.5.1

func (net *NodeEventTracker) ClearExpiredWorkerTimeouts()

ClearExpiredWorkerTimeouts periodically checks and clears expired worker timeouts. It runs in an infinite loop, sleeping for 5 minutes between each iteration. For each node in the network, it checks if the worker timeout has expired (after 60 minutes). If a timeout has expired, it resets the WorkerTimeout to zero and updates the node data. This function helps manage the availability of workers in the network by clearing temporary timeout states.

func (*NodeEventTracker) Connected

func (net *NodeEventTracker) Connected(n network.Network, c network.Conn)

Connected handles when a remote peer connects to this node. It checks if the connecting node already exists in nodeData. If not, it returns without doing anything. If it exists but is not active, it buffers the connect event. If it exists and is active, it marks the node as joined and saves the updated nodeData.

func (*NodeEventTracker) Disconnected

func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn)

Disconnected handles when a remote peer disconnects from this node. It looks up the disconnected node's data. If it doesn't exist, it returns. If it exists but is not staked, it returns. Otherwise it handles disconnect logic like updating the node data, deleting buffered connect events, and sending updated node data through the channel.

func (*NodeEventTracker) GetAllNodeData

func (net *NodeEventTracker) GetAllNodeData() []NodeData

GetAllNodeData returns a slice containing the NodeData for all nodes currently tracked.

func (*NodeEventTracker) GetEligibleWorkerNodes added in v0.6.0

func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []NodeData

GetEligibleWorkerNodes returns a slice of NodeData for nodes that are eligible to perform a specific category of work.

func (*NodeEventTracker) GetNodeData

func (net *NodeEventTracker) GetNodeData(peerID string) *NodeData

GetNodeData returns the NodeData for the node with the given peer ID, or nil if no NodeData exists for that peer ID.

func (*NodeEventTracker) GetUpdatedNodes

func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData

GetUpdatedNodes returns a slice of NodeData for nodes that have been updated since the given time. It filters the full node data set to only those updated after the passed in time, sorts the filtered results by update timestamp, and returns the sorted slice.

func (*NodeEventTracker) HandleMessage

func (net *NodeEventTracker) HandleMessage(msg *pubsub.Message)

HandleMessage unmarshals the received pubsub message into a NodeData struct, and passes it to HandleNodeData for further processing. This allows the NodeEventTracker to handle incoming node data messages from the pubsub layer.

func (*NodeEventTracker) HandleNodeData

func (net *NodeEventTracker) HandleNodeData(data NodeData)

HandleNodeData processes incoming NodeData from the pubsub layer. It adds new NodeData to the nodeData map, checks for replay attacks, and reconciles discrepancies between incoming and existing NodeData. This allows the tracker to maintain an up-to-date view of the node topology based on pubsub messages.

func (*NodeEventTracker) IsStaked

func (net *NodeEventTracker) IsStaked(peerID string) bool

IsStaked returns whether the node with the given peerID is marked as staked in the node data tracker. Returns false if no node data is found for the given peerID.

func (*NodeEventTracker) Listen

func (net *NodeEventTracker) Listen(n network.Network, a ma.Multiaddr)

Listen is called when the node starts listening on a new multiaddr. It logs the network and address that listening started on.

func (*NodeEventTracker) ListenClose

func (net *NodeEventTracker) ListenClose(n network.Network, a ma.Multiaddr)

ListenClose logs when the node stops listening on a multiaddr. It logs the network and multiaddr that was stopped listening on.

func (*NodeEventTracker) RefreshFromBoot

func (net *NodeEventTracker) RefreshFromBoot(data NodeData)

RefreshFromBoot updates the node data map with the provided NodeData when the node boots up. It associates the NodeData with the peer ID string as the map key.

func (*NodeEventTracker) StartCleanupRoutine added in v0.5.1

func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context, hostId string)

StartCleanupRoutine starts a goroutine that periodically checks for and removes stale peers

func (*NodeEventTracker) UpdateNodeDataTwitter added in v0.8.3

func (net *NodeEventTracker) UpdateNodeDataTwitter(peerID string, updates NodeData) error

type NodeLifecycleEvent

type NodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}

NodeLifecycleEvent represents a join or leave event

type NodeSorter added in v0.8.3

type NodeSorter struct {
	// contains filtered or unexported fields
}

NodeSorter provides methods for sorting NodeData slices

func (NodeSorter) Len added in v0.8.3

func (s NodeSorter) Len() int

Len returns the length of the nodes slice

func (NodeSorter) Less added in v0.8.3

func (s NodeSorter) Less(i, j int) bool

Less compares nodes at indices i and j using the provided less function

func (NodeSorter) Swap added in v0.8.3

func (s NodeSorter) Swap(i, j int)

Swap swaps the nodes at indices i and j

type PublicKeyMessage

type PublicKeyMessage struct {
	PublicKey string `json:"publicKey"`
	Signature string `json:"signature"`
	Data      string `json:"data"`
}

PublicKeyMessage represents the structure of the public key messages.

type PublicKeySubscriptionHandler

type PublicKeySubscriptionHandler struct {
	PublicKeys  []PublicKeyMessage
	PubKeyTopic *pubsub.Topic
}

PublicKeySubscriptionHandler handles incoming messages on public key topics.

func (*PublicKeySubscriptionHandler) GetPublicKeys

func (handler *PublicKeySubscriptionHandler) GetPublicKeys() []PublicKeyMessage

GetPublicKeys returns the list of PublicKeyMessages.

func (*PublicKeySubscriptionHandler) HandleMessage

func (handler *PublicKeySubscriptionHandler) HandleMessage(m *pubsub.Message)

HandleMessage handles incoming public key messages, with verification and update logic.

type SafeMap

type SafeMap struct {
	// contains filtered or unexported fields
}

func NewSafeMap

func NewSafeMap() *SafeMap

NewSafeMap creates a new instance of SafeMap. It initializes the underlying map that will store the key-value pairs.

func (*SafeMap) Delete

func (sm *SafeMap) Delete(key string)

Delete removes the item with the specified key from the SafeMap. It acquires a write lock to ensure thread-safety while deleting the item.

func (*SafeMap) Get

func (sm *SafeMap) Get(key string) (*NodeData, bool)

Get retrieves the value associated with the specified key from the SafeMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the SafeMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.

func (*SafeMap) GetStakedNodesSlice

func (sm *SafeMap) GetStakedNodesSlice() []NodeData

GetStakedNodesSlice returns a slice of NodeData for all staked nodes in the SafeMap. It creates a new slice, copies the NodeData from the SafeMap, and populates additional fields such as CurrentUptime, AccumulatedUptime, and their string representations. The resulting slice is sorted based on the LastUpdated timestamp of each NodeData.

func (*SafeMap) Len

func (sm *SafeMap) Len() int

Len returns the number of items in the SafeMap. It acquires a read lock to ensure thread-safety while reading the length.

func (*SafeMap) MarshalJSON

func (sm *SafeMap) MarshalJSON() ([]byte, error)

MarshalJSON override json MarshalJSON to just return the map

func (*SafeMap) Set

func (sm *SafeMap) Set(key string, value *NodeData)

Set associates the specified value with the specified key in the SafeMap. It acquires a write lock to ensure thread-safety while setting the value.

func (*SafeMap) UnmarshalJSON

func (sm *SafeMap) UnmarshalJSON(b []byte) error

UnmarshalJSON override json UnmarshalJSON to just set the map

type TopicHandler

type TopicHandler struct {
	Subscription *pubsub.Subscription
}

TopicHandler is responsible for handling messages from subscribed topics.

func NewTopicHandler

func NewTopicHandler() *TopicHandler

NewTopicHandler creates a new TopicHandler with necessary initializations.

func (*TopicHandler) HandleMessage

func (h *TopicHandler) HandleMessage(msg *pubsub.Message)

HandleMessage processes messages received on the subscribed topics.

func (*TopicHandler) StartListening

func (h *TopicHandler) StartListening()

StartListening starts listening to messages on the subscribed topic.

type WorkerCategory added in v0.5.1

type WorkerCategory int

WorkerCategory represents the main categories of workers

const (
	CategoryDiscord WorkerCategory = iota
	CategoryTelegram
	CategoryTwitter
	CategoryWeb
)

func (WorkerCategory) String added in v0.5.1

func (wc WorkerCategory) String() string

String returns the string representation of the WorkerCategory

type WorkerEventTracker

type WorkerEventTracker struct {
	Workers     []Workers
	WorkerTopic *pubsub.Topic

	WorkerStatusCh chan *pubsub.Message
	// contains filtered or unexported fields
}

WorkerEventTracker is a struct that handles subscriptions for worker status updates. It contains the following fields: - WorkerTracker: A slice of WorkerTracker structs representing the status of workers. - Data: A byte slice containing the raw data received from subscriptions. - mu: A sync.Mutex used for synchronizing access to the handler's fields. - WorkerCh: A channel for sending worker status updates as byte slices.

func (*WorkerEventTracker) HandleMessage

func (h *WorkerEventTracker) HandleMessage(m *pubsub.Message)

HandleMessage implements subscription WorkerEventTracker handler

type Workers

type Workers struct {
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL