pubsub

package
v0.0.3-beta
Published: May 15, 2024 License: MIT Imports: 22 Imported by: 0

Documentation

Constants

const (
	ActivityJoined = iota
	ActivityLeft
)

Variables

This section is empty.

Functions

func BroadcastEvent

func BroadcastEvent(ctx context.Context, ps *pubsub.PubSub, topicName string, event NodeLifecycleEvent) error

BroadcastEvent marshals a NodeLifecycleEvent into JSON, publishes it to the given PubSub topic, and logs the operation. It returns any error encountered along the way. This allows broadcasting node join/leave events to other nodes.

func GetEthAddress

func GetEthAddress(remotePeer peer.ID, n network.Network) string

GetEthAddress returns the Ethereum address for the given remote peer. It gets the peer's public key from the network's peerstore, converts it to a hex string, and converts that to an Ethereum address. Returns an empty string if there is no public key for the peer.

func GetSelfNodeDataJson

func GetSelfNodeDataJson(host host.Host, isStaked bool) []byte

GetSelfNodeDataJson converts the local node's data into a JSON byte array. It populates a NodeData struct with the node's ID, staking status, and Ethereum address. The NodeData struct is then marshalled into a JSON byte array. Returns nil if there is an error marshalling to JSON.

func PrettyDuration

func PrettyDuration(d time.Duration) string

PrettyDuration takes a time.Duration and returns a string representation rounded to the nearest minute. It will include the number of days, hours, and minutes as applicable. For example:

  • 1 day 2 hours 3 minutes
  • 2 hours 3 minutes
  • 3 minutes

func StreamConsoleTo

func StreamConsoleTo(ctx context.Context, topic *pubsub.Topic)

StreamConsoleTo streams data read from stdin to the given PubSub topic. It launches a goroutine that continuously reads from stdin using a bufio.Reader. Each line that is read is published to the topic. Any errors are logged. The goroutine runs until ctx is canceled.
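The read-and-publish loop described above might look roughly like the following sketch. To keep it free of libp2p dependencies, the topic is replaced by a hypothetical publish callback and stdin by any io.Reader; streamLines and collect are illustrative names, not part of this package.

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"strings"
)

// streamLines reads r line by line in a goroutine and hands each non-empty
// line to publish. The returned channel is closed when the goroutine exits.
func streamLines(ctx context.Context, r io.Reader, publish func(line string) error) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		reader := bufio.NewReader(r)
		for {
			// Stop between reads once the context is canceled.
			if ctx.Err() != nil {
				return
			}
			line, err := reader.ReadString('\n')
			if line = strings.TrimRight(line, "\r\n"); line != "" {
				if pubErr := publish(line); pubErr != nil {
					log.Printf("publish failed: %v", pubErr)
				}
			}
			if err != nil {
				if err != io.EOF {
					log.Printf("read failed: %v", err)
				}
				return
			}
		}
	}()
	return done
}

// collect runs streamLines over a string and returns the published lines.
func collect(input string) []string {
	var got []string
	done := streamLines(context.Background(), strings.NewReader(input), func(l string) error {
		got = append(got, l)
		return nil
	})
	<-done
	return got
}

func main() {
	fmt.Println(collect("hello\nworld\n"))
}
```

One design caveat of this sketch: a blocking read cannot be interrupted, so cancellation is only observed between lines.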

Types

type ConnectBufferEntry

type ConnectBufferEntry struct {
	NodeData    *NodeData
	ConnectTime time.Time
}

type JSONMultiaddr

type JSONMultiaddr struct {
	multiaddr.Multiaddr
}

func (*JSONMultiaddr) UnmarshalJSON

func (m *JSONMultiaddr) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler. It parses the JSON representation of a multiaddress and stores the result in m.

type Manager

type Manager struct {
	PublicKeyPublisher *PublicKeyPublisher // publishes public keys over pubsub
	// contains filtered or unexported fields
}

func NewPubSubManager

func NewPubSubManager(ctx context.Context, host host.Host) (*Manager, error)

NewPubSubManager creates a new Manager instance. It initializes a new GossipSub and associates it with the given host. It also initializes data structures to track topics, subscriptions and handlers. The PublicKeyPublisher is initialized to enable publishing public keys over pubsub. The manager instance is returned, along with any error from initializing GossipSub.

func (*Manager) AddSubscription

func (sm *Manager) AddSubscription(topicName string, handler SubscriptionHandler, includeSelf bool) error

AddSubscription subscribes to the PubSub topic with the given topicName. It creates the topic if needed, subscribes to it, and adds the subscription and handler to the manager's maps. It then launches a goroutine that handles incoming messages and calls the handler on each one; messages from self are skipped unless includeSelf is true.
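As a rough illustration of the message loop, the sketch below uses stand-in types instead of the libp2p ones: message replaces *pubsub.Message, a channel replaces the subscription, and dispatch, recorder and run are hypothetical names. It also assumes that includeSelf controls whether the node's own messages reach the handler, which the source does not state explicitly.

```go
package main

import (
	"context"
	"fmt"
)

// message is a stand-in for *pubsub.Message; only sender and payload are modeled.
type message struct {
	From string
	Data []byte
}

// handler mirrors the shape of SubscriptionHandler using the stand-in type.
type handler interface {
	HandleMessage(msg message)
}

// recorder is a trivial handler that remembers each payload it sees.
type recorder struct{ seen []string }

func (r *recorder) HandleMessage(msg message) { r.seen = append(r.seen, string(msg.Data)) }

// dispatch forwards messages from in to h until in is closed or ctx is canceled.
// Messages sent by self are skipped unless includeSelf is true (an assumption).
func dispatch(ctx context.Context, self string, includeSelf bool, in <-chan message, h handler) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-in:
				if !ok {
					return
				}
				if msg.From == self && !includeSelf {
					continue
				}
				h.HandleMessage(msg)
			}
		}
	}()
	return done
}

// run feeds one own message and one remote message through dispatch.
func run(includeSelf bool) []string {
	in := make(chan message, 2)
	in <- message{From: "me", Data: []byte("own")}
	in <- message{From: "peer", Data: []byte("remote")}
	close(in)
	rec := &recorder{}
	<-dispatch(context.Background(), "me", includeSelf, in, rec)
	return rec.seen
}

func main() {
	fmt.Println(run(false))
	fmt.Println(run(true))
}
```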

func (*Manager) GetHandler

func (sm *Manager) GetHandler(topic string) (SubscriptionHandler, error)

GetHandler returns the SubscriptionHandler for the given topic name. It returns an error if no handler exists for the given topic.

func (*Manager) GetSubscription

func (sm *Manager) GetSubscription(topic string) (*pubsub.Subscription, error)

GetSubscription returns the Subscription for the given topic name. It returns an error if no Subscription exists for the given topic.

func (*Manager) GetTopicNames

func (sm *Manager) GetTopicNames() []string

GetTopicNames returns a slice of the names of all topics currently managed.

func (*Manager) Publish

func (sm *Manager) Publish(topic string, data []byte) error

Publish publishes a message to the PubSub topic with the given topic name. It returns an error if no topic with the given name exists.

func (*Manager) PublishMessage

func (sm *Manager) PublishMessage(topicName, message string) error

PublishMessage publishes a message to the PubSub topic with the given topicName. It converts the message to a byte slice, checks if the topic exists, optionally creates the topic if it doesn't exist, and publishes using the existing Publish method. Returns an error if the topic does not exist and cannot be created.

func (*Manager) RemoveSubscription

func (sm *Manager) RemoveSubscription(topic string) error

RemoveSubscription unsubscribes from the PubSub topic with the given topic name. It closes the existing subscription, removes it from the manager's subscription map, and removes the associated handler. Returns an error if no subscription exists for the given topic.

func (*Manager) SetUpSubscriptions

func (sm *Manager) SetUpSubscriptions()

SetUpSubscriptions sets up default subscriptions for the PubSub manager based on predefined topics and handlers. This allows initializing subscriptions separately from creating the handlers.

func (*Manager) Subscribe

func (sm *Manager) Subscribe(topicName string, handler SubscriptionHandler) error

Subscribe registers a subscription handler to receive messages for the given topic name. It gets the existing subscription, saves it and the handler, and starts a goroutine to call the handler for each new message. Returns an error if unable to get the subscription.

type NodeData

type NodeData struct {
	Multiaddrs           []JSONMultiaddr `json:"multiaddrs,omitempty"`
	PeerId               peer.ID         `json:"peerId"`
	FirstJoined          time.Time       `json:"firstJoined,omitempty"`
	LastJoined           time.Time       `json:"lastJoined,omitempty"`
	LastLeft             time.Time       `json:"lastLeft,omitempty"`
	LastUpdated          time.Time       `json:"lastUpdated,omitempty"`
	CurrentUptime        time.Duration   `json:"currentUptime,omitempty"`
	CurrentUptimeStr     string          `json:"readableCurrentUptime,omitempty"`
	AccumulatedUptime    time.Duration   `json:"accumulatedUptime,omitempty"`
	AccumulatedUptimeStr string          `json:"readableAccumulatedUptime,omitempty"`
	EthAddress           string          `json:"ethAddress,omitempty"`
	Activity             int             `json:"activity,omitempty"`
	IsActive             bool            `json:"isActive"`
	IsStaked             bool            `json:"isStaked"`
	SelfIdentified       bool            `json:"-"`
	IsWriterNode         bool            `json:"isWriterNode"`
	IsTwitterScraper     bool            `json:"isTwitterScraper"`
	IsWebScraper         bool            `json:"isWebScraper"`
	BytesScraped         int             `json:"bytesScraped"`
	Records              any             `json:"records,omitempty"`
}
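The struct tags above determine the wire format. The sketch below copies a handful of fields into a hypothetical nodeDataSubset type to show two effects: a field tagged `json:"-"` is never serialized, and omitempty has no effect on struct-typed fields such as time.Time, so a zero timestamp is still written. PeerId is a plain string here, whereas the real field is a peer.ID.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// nodeDataSubset copies a few fields and tags from NodeData for illustration.
type nodeDataSubset struct {
	PeerId         string    `json:"peerId"`
	LastJoined     time.Time `json:"lastJoined,omitempty"`
	EthAddress     string    `json:"ethAddress,omitempty"`
	IsStaked       bool      `json:"isStaked"`
	SelfIdentified bool      `json:"-"`
}

// encode marshals the subset and returns the JSON text, or "" on error.
func encode(n nodeDataSubset) string {
	b, err := json.Marshal(n)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// EthAddress is empty and omitted; SelfIdentified is always excluded;
	// the zero LastJoined is still emitted.
	fmt.Println(encode(nodeDataSubset{PeerId: "peer-1", SelfIdentified: true}))
	// prints {"peerId":"peer-1","lastJoined":"0001-01-01T00:00:00Z","isStaked":false}
}
```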

func NewNodeData

func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData

NewNodeData creates a new NodeData struct initialized with the given parameters. It is used to represent data about a node in the network.

func (*NodeData) Address

func (n *NodeData) Address() string

Address returns a string representation of the NodeData's multiaddress and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC". This can be used by other nodes to connect to this node.

func (*NodeData) GetAccumulatedUptime

func (n *NodeData) GetAccumulatedUptime() time.Duration

GetAccumulatedUptime returns the total accumulated uptime for the node. It calculates this by adding the current uptime to the stored accumulated uptime.

func (*NodeData) GetCurrentUptime

func (n *NodeData) GetCurrentUptime() time.Duration

GetCurrentUptime returns the node's current uptime duration. If the node is active, it calculates the time elapsed since the last joined time. If the node is marked as left, it returns 0.

func (*NodeData) Joined

func (n *NodeData) Joined()

Joined updates the NodeData when the node joins the network. It sets the join times, activity, active status, and logs based on stake status.

func (*NodeData) Left

func (n *NodeData) Left()

Left updates the NodeData when the node leaves the network. It sets the leave time, stops uptime, sets activity to left, sets node as inactive, and logs based on stake status.

func (*NodeData) TwitterScraper

func (n *NodeData) TwitterScraper() bool

func (*NodeData) UpdateAccumulatedUptime

func (n *NodeData) UpdateAccumulatedUptime()

UpdateAccumulatedUptime updates the accumulated uptime of the node to account for any discrepancy between the last left and last joined times from gossipsub events. It calculates the duration between last left and joined if the node is marked as left. Otherwise, it uses the time since the last joined event.

func (*NodeData) WebScraper

func (n *NodeData) WebScraper() bool

type NodeEventTracker

type NodeEventTracker struct {
	NodeDataChan chan *NodeData

	ConnectBuffer map[string]ConnectBufferEntry
	// contains filtered or unexported fields
}

func NewNodeEventTracker

func NewNodeEventTracker(version, environment string) *NodeEventTracker

NewNodeEventTracker creates a new NodeEventTracker instance. It initializes the node data map, node data channel, node data file path, and connect buffer map. It loads existing node data from file, starts a goroutine to clear expired buffer entries, and returns the initialized instance.

func (*NodeEventTracker) AddOrUpdateNodeData

func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error

AddOrUpdateNodeData adds or updates the node data in the node event tracker. If the node data does not exist, it is added and marked as self-identified. If the node data exists, it updates the staked status, Ethereum address, and multiaddress if needed. It also sends the updated node data to the NodeDataChan if the data changed or forceGossip is true.

func (*NodeEventTracker) ClearExpiredBufferEntries

func (net *NodeEventTracker) ClearExpiredBufferEntries()

ClearExpiredBufferEntries periodically clears expired entries from the connect buffer cache. It loops forever sleeping for a configured interval. On each loop it checks the current time against the connect time for each entry, and if expired, processes the connect and removes the entry.
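A single pass of that sweep could be sketched as below. The names bufferEntry, sweepExpired and demo are hypothetical, the expiry threshold (ttl) is an assumed parameter because the source does not give the configured values, and the process callback stands in for the tracker's connect handling.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// bufferEntry is a simplified stand-in for ConnectBufferEntry.
type bufferEntry struct {
	PeerID      string
	ConnectTime time.Time
}

// sweepExpired processes and removes every entry older than ttl.
// It returns the removed keys in sorted order.
func sweepExpired(buf map[string]bufferEntry, now time.Time, ttl time.Duration, process func(bufferEntry)) []string {
	var expired []string
	for key, entry := range buf {
		if now.Sub(entry.ConnectTime) > ttl {
			process(entry)
			delete(buf, key) // deleting during range is safe in Go
			expired = append(expired, key)
		}
	}
	sort.Strings(expired)
	return expired
}

// demo sweeps a two-entry buffer and reports what expired and what remains.
func demo() ([]string, int) {
	now := time.Date(2024, 5, 15, 12, 0, 0, 0, time.UTC)
	buf := map[string]bufferEntry{
		"old":   {PeerID: "old", ConnectTime: now.Add(-5 * time.Minute)},
		"fresh": {PeerID: "fresh", ConnectTime: now.Add(-10 * time.Second)},
	}
	expired := sweepExpired(buf, now, time.Minute, func(bufferEntry) {})
	return expired, len(buf)
}

func main() {
	expired, remaining := demo()
	fmt.Println(expired, remaining)
}
```

A long-running version would call sweepExpired from a loop that sleeps between passes, and would need to guard the map with a lock if other goroutines write to it.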

func (*NodeEventTracker) Connected

func (net *NodeEventTracker) Connected(n network.Network, c network.Conn)

Connected handles when a remote peer connects to this node. It checks if the connecting node already exists in nodeData. If not, it returns without doing anything. If it exists but is not active, it buffers the connect event. If it exists and is active, it marks the node as joined and saves the updated nodeData.

func (*NodeEventTracker) Disconnected

func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn)

Disconnected handles when a remote peer disconnects from this node. It looks up the disconnected node's data. If it doesn't exist, it returns. If it exists but is not staked, it returns. Otherwise it handles disconnect logic like updating the node data, deleting buffered connect events, and sending updated node data through the channel.

func (*NodeEventTracker) DumpNodeData

func (net *NodeEventTracker) DumpNodeData()

DumpNodeData writes the NodeData map to a JSON file. It determines the file path based on the configured data directory, defaulting to nodeDataFile if not set. It logs any errors writing the file. This allows periodically persisting the node data.

func (*NodeEventTracker) GetAllNodeData

func (net *NodeEventTracker) GetAllNodeData() []NodeData

GetAllNodeData returns a slice containing the NodeData for all nodes currently tracked.

func (*NodeEventTracker) GetNodeData

func (net *NodeEventTracker) GetNodeData(peerID string) *NodeData

GetNodeData returns the NodeData for the node with the given peer ID, or nil if no NodeData exists for that peer ID.

func (*NodeEventTracker) GetUpdatedNodes

func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData

GetUpdatedNodes returns a slice of NodeData for nodes that have been updated since the given time. It filters the full node data set to only those updated after the passed in time, sorts the filtered results by update timestamp, and returns the sorted slice.

func (*NodeEventTracker) HandleMessage

func (net *NodeEventTracker) HandleMessage(msg *pubsub.Message)

HandleMessage unmarshals the received pubsub message into a NodeData struct, and passes it to HandleNodeData for further processing. This allows the NodeEventTracker to handle incoming node data messages from the pubsub layer.

func (*NodeEventTracker) HandleNodeData

func (net *NodeEventTracker) HandleNodeData(data NodeData)

HandleNodeData processes incoming NodeData from the pubsub layer. It adds new NodeData to the nodeData map, checks for replay attacks, and reconciles discrepancies between incoming and existing NodeData. This allows the tracker to maintain an up-to-date view of the node topology based on pubsub messages.

func (*NodeEventTracker) IsStaked

func (net *NodeEventTracker) IsStaked(peerID string) bool

IsStaked returns whether the node with the given peerID is marked as staked in the node data tracker. Returns false if no node data is found for the given peerID.

func (*NodeEventTracker) Listen

func (net *NodeEventTracker) Listen(n network.Network, a ma.Multiaddr)

Listen is called when the node starts listening on a new multiaddr. It logs the network and address that listening started on.

func (*NodeEventTracker) ListenClose

func (net *NodeEventTracker) ListenClose(n network.Network, a ma.Multiaddr)

ListenClose is called when the node stops listening on a multiaddr. It logs the network and the multiaddr that is no longer being listened on.

func (*NodeEventTracker) LoadNodeData

func (net *NodeEventTracker) LoadNodeData() error

LoadNodeData loads the node data from a JSON file. It determines the file path based on the configured data directory, defaulting to nodeDataFile if not set. It logs any errors reading or parsing the file. This allows initializing the node data tracker from persisted data.

func (*NodeEventTracker) RefreshFromBoot

func (net *NodeEventTracker) RefreshFromBoot(data NodeData)

RefreshFromBoot updates the node data map with the provided NodeData when the node boots up. It associates the NodeData with the peer ID string as the map key.

type NodeLifecycleEvent

type NodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}

NodeLifecycleEvent represents a join or leave event
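For reference, the JSON produced by these struct tags can be checked with a local copy of the type. The sketch below only exercises encoding/json; nodeLifecycleEvent, encodeEvent and decodeEvent are illustrative names, and the field values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nodeLifecycleEvent is a local copy of NodeLifecycleEvent with the same tags.
type nodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}

// encodeEvent returns the JSON text that would be published for e.
func encodeEvent(e nodeLifecycleEvent) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

// decodeEvent parses a received payload back into an event.
func decodeEvent(s string) (nodeLifecycleEvent, error) {
	var e nodeLifecycleEvent
	err := json.Unmarshal([]byte(s), &e)
	return e, err
}

func main() {
	payload, err := encodeEvent(nodeLifecycleEvent{
		EventType: "join",
		NodeID:    "node-1",
		Nonce:     1,
		Timestamp: 1715731200,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(payload)
	// prints {"eventType":"join","nodeID":"node-1","nonce":1,"timestamp":1715731200}
}
```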

type PublicKeyMessage

type PublicKeyMessage struct {
	PublicKey string `json:"publicKey"`
	Signature string `json:"signature"`
	Data      string `json:"data"`
}

PublicKeyMessage represents the structure of the public key messages.

type PublicKeyPublisher

type PublicKeyPublisher struct {
	// contains filtered or unexported fields
}

func NewPublicKeyPublisher

func NewPublicKeyPublisher(manager *Manager, pubKey libp2pCrypto.PubKey) *PublicKeyPublisher

NewPublicKeyPublisher creates a new instance of PublicKeyPublisher.

func (*PublicKeyPublisher) GetPublishedMessages

func (p *PublicKeyPublisher) GetPublishedMessages() []PublicKeyMessage

GetPublishedMessages returns the stored messages.

func (*PublicKeyPublisher) PublishNodePublicKey

func (p *PublicKeyPublisher) PublishNodePublicKey(publicKey string, data, signature []byte) error

PublishNodePublicKey publishes the node's public key to the designated topic.

type PublicKeySubscriptionHandler

type PublicKeySubscriptionHandler struct {
	PublicKeys  []PublicKeyMessage
	PubKeyTopic *pubsub.Topic
}

PublicKeySubscriptionHandler handles incoming messages on public key topics.

func (*PublicKeySubscriptionHandler) GetPublicKeys

func (handler *PublicKeySubscriptionHandler) GetPublicKeys() []PublicKeyMessage

GetPublicKeys returns the list of PublicKeyMessages.

func (*PublicKeySubscriptionHandler) HandleMessage

func (handler *PublicKeySubscriptionHandler) HandleMessage(m *pubsub.Message)

HandleMessage handles incoming public key messages, with verification and update logic.

type ResponseChannelMap

type ResponseChannelMap struct {
	// contains filtered or unexported fields
}

func GetResponseChannelMap

func GetResponseChannelMap() *ResponseChannelMap

GetResponseChannelMap returns the singleton instance of ResponseChannelMap.

func (*ResponseChannelMap) CreateChannel

func (drm *ResponseChannelMap) CreateChannel(key string) chan []byte

func (*ResponseChannelMap) Delete

func (drm *ResponseChannelMap) Delete(key string)

Delete removes the item with the specified key from the ResponseChannelMap. It acquires a write lock to ensure thread-safety while deleting the item.

func (*ResponseChannelMap) Get

func (drm *ResponseChannelMap) Get(key string) (chan []byte, bool)

Get retrieves the value associated with the specified key from the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the ResponseChannelMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.

func (*ResponseChannelMap) Len

func (drm *ResponseChannelMap) Len() int

Len returns the number of items in the ResponseChannelMap. It acquires a read lock to ensure thread-safety while reading the length.

func (*ResponseChannelMap) Set

func (drm *ResponseChannelMap) Set(key string, value chan []byte)

Set associates the specified value with the specified key in the ResponseChannelMap. It acquires a write lock to ensure thread-safety while setting the value.
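The locking pattern described for ResponseChannelMap might be sketched as follows. Everything here is an assumption about structure rather than the package's code: channelMap and getChannelMap are hypothetical names, sync.Once is one common way to build a singleton, and CreateChannel is assumed to create a buffered channel and store it under the key (the source does not document it).

```go
package main

import (
	"fmt"
	"sync"
)

// channelMap is a mutex-guarded map from request keys to response channels.
type channelMap struct {
	mu    sync.RWMutex
	items map[string]chan []byte
}

var (
	instance *channelMap
	once     sync.Once
)

// getChannelMap returns the process-wide instance, creating it on first use.
func getChannelMap() *channelMap {
	once.Do(func() {
		instance = &channelMap{items: make(map[string]chan []byte)}
	})
	return instance
}

func (m *channelMap) Set(key string, ch chan []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.items[key] = ch
}

func (m *channelMap) Get(key string) (chan []byte, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	ch, ok := m.items[key]
	return ch, ok
}

func (m *channelMap) Delete(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.items, key)
}

func (m *channelMap) Len() int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return len(m.items)
}

// CreateChannel makes a channel and registers it under key.
// The buffer size of 1 is an assumption of this sketch.
func (m *channelMap) CreateChannel(key string) chan []byte {
	ch := make(chan []byte, 1)
	m.Set(key, ch)
	return ch
}

// roundTrip registers a channel, delivers a payload through the map, and reads it back.
func roundTrip(key, payload string) string {
	m := getChannelMap()
	ch := m.CreateChannel(key)
	defer m.Delete(key)
	// A responder elsewhere would look the channel up by key and send on it.
	if respCh, ok := m.Get(key); ok {
		respCh <- []byte(payload)
	}
	return string(<-ch)
}

func main() {
	fmt.Println(roundTrip("req-1", "pong"))
	fmt.Println(getChannelMap().Len())
}
```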

type SafeMap

type SafeMap struct {
	// contains filtered or unexported fields
}

func NewSafeMap

func NewSafeMap() *SafeMap

NewSafeMap creates a new instance of SafeMap. It initializes the underlying map that will store the key-value pairs.

func (*SafeMap) Delete

func (sm *SafeMap) Delete(key string)

Delete removes the item with the specified key from the SafeMap. It acquires a write lock to ensure thread-safety while deleting the item.

func (*SafeMap) DumpNodeData

func (sm *SafeMap) DumpNodeData(filePath string) error

DumpNodeData writes the entire nodeData map to a file in JSON format.

func (*SafeMap) Get

func (sm *SafeMap) Get(key string) (*NodeData, bool)

Get retrieves the value associated with the specified key from the SafeMap. It acquires a read lock to ensure thread-safety while reading the value. If the key exists in the SafeMap, it returns the corresponding value and true. If the key does not exist, it returns nil and false.

func (*SafeMap) GetStakedNodesSlice

func (sm *SafeMap) GetStakedNodesSlice() []NodeData

GetStakedNodesSlice returns a slice of NodeData for all staked nodes in the SafeMap. It creates a new slice, copies the NodeData from the SafeMap, and populates additional fields such as CurrentUptime, AccumulatedUptime, and their string representations. The resulting slice is sorted based on the LastUpdated timestamp of each NodeData.

func (*SafeMap) Len

func (sm *SafeMap) Len() int

Len returns the number of items in the SafeMap. It acquires a read lock to ensure thread-safety while reading the length.

func (*SafeMap) LoadNodeData

func (sm *SafeMap) LoadNodeData(filePath string) error

LoadNodeData reads nodeData from a file in JSON format and loads it into the map.

func (*SafeMap) MarshalJSON

func (sm *SafeMap) MarshalJSON() ([]byte, error)

MarshalJSON overrides the default JSON marshalling so that only the underlying map is serialized.

func (*SafeMap) Set

func (sm *SafeMap) Set(key string, value *NodeData)

Set associates the specified value with the specified key in the SafeMap. It acquires a write lock to ensure thread-safety while setting the value.

func (*SafeMap) UnmarshalJSON

func (sm *SafeMap) UnmarshalJSON(b []byte) error

UnmarshalJSON overrides the default JSON unmarshalling so that the decoded data is stored directly in the underlying map.
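Custom marshalling on a lock-protected map could look like the sketch below. It is a minimal model under stated assumptions: safeMap, record and roundTripJSON are hypothetical names, record stands in for NodeData, and the internal layout (an RWMutex next to a plain map) is a guess at what the unexported fields contain.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// record is a reduced stand-in for NodeData.
type record struct {
	EthAddress string `json:"ethAddress,omitempty"`
	IsStaked   bool   `json:"isStaked"`
}

// safeMap guards a map with a read/write lock.
type safeMap struct {
	mu    sync.RWMutex
	items map[string]*record
}

func newSafeMap() *safeMap { return &safeMap{items: make(map[string]*record)} }

func (sm *safeMap) Set(key string, v *record) {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.items[key] = v
}

func (sm *safeMap) Get(key string) (*record, bool) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	v, ok := sm.items[key]
	return v, ok
}

// MarshalJSON serializes only the inner map, not the lock.
func (sm *safeMap) MarshalJSON() ([]byte, error) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	return json.Marshal(sm.items)
}

// UnmarshalJSON decodes into a fresh map and swaps it in under the write lock.
func (sm *safeMap) UnmarshalJSON(b []byte) error {
	items := make(map[string]*record)
	if err := json.Unmarshal(b, &items); err != nil {
		return err
	}
	sm.mu.Lock()
	defer sm.mu.Unlock()
	sm.items = items
	return nil
}

// roundTripJSON encodes one map, decodes it into another, and reports the result.
func roundTripJSON() (string, bool, error) {
	src := newSafeMap()
	src.Set("peer-1", &record{IsStaked: true})
	data, err := json.Marshal(src)
	if err != nil {
		return "", false, err
	}
	restored := newSafeMap()
	if err := json.Unmarshal(data, restored); err != nil {
		return "", false, err
	}
	rec, ok := restored.Get("peer-1")
	return string(data), ok && rec.IsStaked, nil
}

func main() {
	data, ok, err := roundTripJSON()
	fmt.Println(data, ok, err)
}
```

Because both methods use pointer receivers, the map must be passed to json.Marshal and json.Unmarshal as a pointer for them to be invoked.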

type SubscriptionHandler

type SubscriptionHandler interface {
	HandleMessage(msg *pubsub.Message)
}

SubscriptionHandler defines the interface for handling pubsub messages. Implementations should subscribe to topics and handle incoming messages.

type TopicHandler

type TopicHandler struct {
	Subscription *pubsub.Subscription
}

TopicHandler is responsible for handling messages from subscribed topics.

func NewTopicHandler

func NewTopicHandler() *TopicHandler

NewTopicHandler creates a new TopicHandler with necessary initializations.

func (*TopicHandler) HandleMessage

func (h *TopicHandler) HandleMessage(msg *pubsub.Message)

HandleMessage processes messages received on the subscribed topics.

func (*TopicHandler) StartListening

func (h *TopicHandler) StartListening()

StartListening starts listening to messages on the subscribed topic.

type WorkerEventTracker

type WorkerEventTracker struct {
	Workers     []Workers
	WorkerTopic *pubsub.Topic

	WorkerStatusCh chan *pubsub.Message
	// contains filtered or unexported fields
}

WorkerEventTracker is a struct that handles subscriptions for worker status updates. It contains the following fields:

  • Workers: A slice of Workers structs representing the status of workers.
  • WorkerTopic: The PubSub topic associated with worker events.
  • WorkerStatusCh: A channel for sending worker status updates as pubsub messages.
  • mu: A sync.Mutex used for synchronizing access to the handler's fields (unexported).

func (*WorkerEventTracker) HandleMessage

func (h *WorkerEventTracker) HandleMessage(m *pubsub.Message)

HandleMessage implements the SubscriptionHandler interface for WorkerEventTracker.

type WorkerStatus

type WorkerStatus struct {
	PeerID string `json:"peerId"`
	Data   []byte
}

type WorkerStatusHandler

type WorkerStatusHandler struct {
	WorkerStatus       []WorkerStatus
	CompletedWorkTopic *pubsub.Topic

	WorkerStatusCh chan []byte
	// contains filtered or unexported fields
}

WorkerStatusHandler is a struct that handles subscriptions for worker status updates. It contains the following fields:

  • WorkerStatus: A slice of WorkerStatus structs representing the status of workers.
  • CompletedWorkTopic: The PubSub topic associated with completed work.
  • WorkerStatusCh: A channel for sending worker status updates as byte slices.
  • mu: A sync.Mutex used for synchronizing access to the handler's fields (unexported).

func (*WorkerStatusHandler) HandleMessage

func (h *WorkerStatusHandler) HandleMessage(message *pubsub.Message)

HandleMessage implements the SubscriptionHandler interface for WorkerStatusHandler.

type Workers

type Workers struct {
}
