pubsub

package
v0.0.1-beta
Published: Apr 29, 2024 License: MIT Imports: 22 Imported by: 0

Documentation


Constants

const (
	ActivityJoined = iota
	ActivityLeft
)
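To show how these values resolve, the sketch below redeclares the constants locally. The `activityName` helper is hypothetical and is not part of the package.

```go
package main

import "fmt"

// Local copy of the constant block above: iota starts at 0 and
// increments by one for each line in the block.
const (
	ActivityJoined = iota // 0
	ActivityLeft          // 1
)

// activityName is a hypothetical helper (not part of the package) that
// maps an activity value to a readable label.
func activityName(a int) string {
	switch a {
	case ActivityJoined:
		return "joined"
	case ActivityLeft:
		return "left"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(ActivityJoined, activityName(ActivityJoined))
	fmt.Println(ActivityLeft, activityName(ActivityLeft))
}
```

Because ActivityJoined is 0, an int field tagged `omitempty` (such as NodeData.Activity) is omitted from the JSON output when it holds that value, so a missing `activity` key decodes back to ActivityJoined.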

Variables

This section is empty.

Functions

func BroadcastEvent

func BroadcastEvent(ctx context.Context, ps *pubsub.PubSub, topicName string, event NodeLifecycleEvent) error

BroadcastEvent marshals a NodeLifecycleEvent into JSON, publishes it to the given PubSub topic, and logs the operation. Returns any error. This allows broadcasting node join/leave events to other nodes.

func GetSelfNodeDataJson

func GetSelfNodeDataJson(host host.Host, isStaked bool) []byte

GetSelfNodeDataJson converts the local node's data into a JSON byte array. It populates a NodeData struct with the node's ID, staking status, and Ethereum address. The NodeData struct is then marshalled into a JSON byte array. Returns nil if there is an error marshalling to JSON.

func PrettyDuration

func PrettyDuration(d time.Duration) string

func StreamConsoleTo

func StreamConsoleTo(ctx context.Context, topic *pubsub.Topic)

StreamConsoleTo streams data read from stdin to the given PubSub topic. It launches a goroutine that continuously reads from stdin using a bufio.Reader. Each line that is read is published to the topic. Any errors are logged. The goroutine runs until ctx is canceled.

Types

type ConnectBufferEntry

type ConnectBufferEntry struct {
	NodeData    *NodeData
	ConnectTime time.Time
}

type JSONMultiaddr

type JSONMultiaddr struct {
	multiaddr.Multiaddr
}

func (*JSONMultiaddr) UnmarshalJSON

func (m *JSONMultiaddr) UnmarshalJSON(b []byte) error

UnmarshalJSON implements json.Unmarshaler. It parses the JSON representation of a multiaddress and stores the result in m.

type Manager

type Manager struct {
	PublicKeyPublisher *PublicKeyPublisher // publishes public keys over pubsub
	// contains filtered or unexported fields
}

func NewPubSubManager

func NewPubSubManager(ctx context.Context, host host.Host) (*Manager, error)

NewPubSubManager creates a new Manager instance. It initializes a new GossipSub and associates it with the given host. It also initializes data structures to track topics, subscriptions and handlers. The PublicKeyPublisher is initialized to enable publishing public keys over pubsub. The manager instance is returned, along with any error from initializing GossipSub.

func (*Manager) AddSubscription

func (sm *Manager) AddSubscription(topicName string, handler SubscriptionHandler) error

AddSubscription subscribes to the PubSub topic with the given topicName. It creates the topic if needed, subscribes to it, and adds the subscription and handler to the manager's maps. It launches a goroutine to handle incoming messages, skipping messages from self, and calling the handler on each message.

func (*Manager) GetHandler

func (sm *Manager) GetHandler(topic string) (SubscriptionHandler, error)

GetHandler returns the SubscriptionHandler for the given topic name. It returns an error if no handler exists for the given topic.

func (*Manager) GetSubscription

func (sm *Manager) GetSubscription(topic string) (*pubsub.Subscription, error)

GetSubscription returns the Subscription for the given topic name. It returns an error if no Subscription exists for the given topic.

func (*Manager) GetTopicNames

func (sm *Manager) GetTopicNames() []string

GetTopicNames returns a slice of the names of all topics currently managed.

func (*Manager) Publish

func (sm *Manager) Publish(topic string, data []byte) error

Publish publishes a message to the PubSub topic with the given topic name. It returns an error if no topic with the given name exists.

func (*Manager) PublishMessage

func (sm *Manager) PublishMessage(topicName, message string) error

PublishMessage publishes a message to the PubSub topic with the given topicName. It converts the message to a byte slice, checks if the topic exists, optionally creates the topic if it doesn't exist, and publishes using the existing Publish method. Returns an error if the topic does not exist and cannot be created.
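The difference between Publish and PublishMessage can be sketched with an in-memory map. Everything here is hypothetical: the `manager` type records payloads instead of talking to GossipSub, and the `createIfMissing` flag is an assumption made for illustration, since the real signature has no such parameter and the documentation does not say what controls topic creation.

```go
package main

import "fmt"

// manager is a hypothetical stand-in that records published payloads
// per topic name instead of sending them over the network.
type manager struct {
	topics map[string][][]byte
}

// publish fails when the topic is unknown, as Publish is documented to do.
func (m *manager) publish(topic string, data []byte) error {
	if _, ok := m.topics[topic]; !ok {
		return fmt.Errorf("topic %q does not exist", topic)
	}
	m.topics[topic] = append(m.topics[topic], data)
	return nil
}

// publishMessage converts the message to bytes, optionally creates the
// topic, and then delegates to publish. createIfMissing is an assumption.
func (m *manager) publishMessage(topicName, message string, createIfMissing bool) error {
	if _, ok := m.topics[topicName]; !ok {
		if !createIfMissing {
			return fmt.Errorf("topic %q does not exist", topicName)
		}
		m.topics[topicName] = nil // register the topic with no messages yet
	}
	return m.publish(topicName, []byte(message))
}

func main() {
	m := &manager{topics: map[string][][]byte{}}
	fmt.Println(m.publishMessage("chat", "hi", false)) // error: topic is unknown
	fmt.Println(m.publishMessage("chat", "hi", true))  // <nil>
	fmt.Println(len(m.topics["chat"]))                 // 1
}
```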

func (*Manager) RemoveSubscription

func (sm *Manager) RemoveSubscription(topic string) error

RemoveSubscription unsubscribes from the PubSub topic with the given topic name. It closes the existing subscription, removes it from the manager's subscription map, and removes the associated handler. Returns an error if no subscription exists for the given topic.
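The bookkeeping behind GetSubscription and RemoveSubscription can be sketched as a pair of maps keyed by topic name. The `registry` and `subscription` types below are hypothetical stand-ins, and the mutex is an assumption; the documentation does not say how the real Manager synchronizes access.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in for *pubsub.Subscription.
type subscription struct{ cancelled bool }

func (s *subscription) Cancel() { s.cancelled = true }

// registry tracks one subscription and one handler per topic name.
type registry struct {
	mu       sync.Mutex // assumption: the real Manager may synchronize differently
	subs     map[string]*subscription
	handlers map[string]func([]byte)
}

func newRegistry() *registry {
	return &registry{
		subs:     map[string]*subscription{},
		handlers: map[string]func([]byte){},
	}
}

// add registers a subscription and its handler under topic.
func (r *registry) add(topic string, h func([]byte)) *subscription {
	r.mu.Lock()
	defer r.mu.Unlock()
	s := &subscription{}
	r.subs[topic] = s
	r.handlers[topic] = h
	return s
}

// getSubscription returns an error when the topic has no subscription.
func (r *registry) getSubscription(topic string) (*subscription, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.subs[topic]
	if !ok {
		return nil, fmt.Errorf("no subscription for topic %q", topic)
	}
	return s, nil
}

// remove cancels the subscription and drops it and its handler.
func (r *registry) remove(topic string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.subs[topic]
	if !ok {
		return fmt.Errorf("no subscription for topic %q", topic)
	}
	s.Cancel()
	delete(r.subs, topic)
	delete(r.handlers, topic)
	return nil
}

func main() {
	r := newRegistry()
	r.add("news", func([]byte) {})
	fmt.Println(r.remove("news")) // <nil>
	fmt.Println(r.remove("news")) // error: already removed
}
```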

func (*Manager) SetUpSubscriptions

func (sm *Manager) SetUpSubscriptions()

SetUpSubscriptions sets up default subscriptions for the PubSub manager based on predefined topics and handlers. This allows initializing subscriptions separately from creating the handlers.

func (*Manager) Subscribe

func (sm *Manager) Subscribe(topicName string, handler SubscriptionHandler) error

Subscribe registers a subscription handler to receive messages for the given topic name. It gets the existing subscription, saves it and the handler, and starts a goroutine to call the handler for each new message. Returns an error if unable to get the subscription.

type NodeData

type NodeData struct {
	Multiaddrs           []JSONMultiaddr `json:"multiaddrs,omitempty"`
	PeerId               peer.ID         `json:"peerId"`
	FirstJoined          time.Time       `json:"firstJoined,omitempty"`
	LastJoined           time.Time       `json:"lastJoined,omitempty"`
	LastLeft             time.Time       `json:"lastLeft,omitempty"`
	LastUpdated          time.Time       `json:"lastUpdated,omitempty"`
	CurrentUptime        time.Duration   `json:"currentUptime,omitempty"`
	CurrentUptimeStr     string          `json:"readableCurrentUptime,omitempty"`
	AccumulatedUptime    time.Duration   `json:"accumulatedUptime,omitempty"`
	AccumulatedUptimeStr string          `json:"readableAccumulatedUptime,omitempty"`
	EthAddress           string          `json:"ethAddress,omitempty"`
	Activity             int             `json:"activity,omitempty"`
	IsActive             bool            `json:"isActive"`
	IsStaked             bool            `json:"isStaked"`
	SelfIdentified       bool            `json:"-"`
	IsWriterNode         bool            `json:"isWriterNode"`
	IsTwitterScraper     bool            `json:"isTwitterScraper"`
	IsWebScraper         bool            `json:"isWebScraper"`
	BytesScraped         int             `json:"bytesScraped"`
}

func NewNodeData

func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData

NewNodeData creates a new NodeData struct initialized with the given parameters. It is used to represent data about a node in the network.

func (*NodeData) Address

func (n *NodeData) Address() string

Address returns a string representation of the NodeData's multiaddress and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC". This can be used by other nodes to connect to this node.
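The address format shown above is a transport multiaddress followed by a `/p2p/` component carrying the peer ID. A minimal sketch of that composition follows; the `address` helper is hypothetical, and which of the node's multiaddresses the real method uses is not stated in the documentation.

```go
package main

import "fmt"

// address (hypothetical) joins a transport multiaddress and a peer ID
// into the dialable form "<multiaddr>/p2p/<peer ID>".
func address(multiaddr, peerID string) string {
	return fmt.Sprintf("%s/p2p/%s", multiaddr, peerID)
}

func main() {
	fmt.Println(address("/ip4/127.0.0.1/tcp/4001", "QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC"))
}
```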

func (*NodeData) GetAccumulatedUptime

func (n *NodeData) GetAccumulatedUptime() time.Duration

GetAccumulatedUptime returns the total accumulated uptime for the node. It calculates this by adding the current uptime to the stored accumulated uptime.

func (*NodeData) GetCurrentUptime

func (n *NodeData) GetCurrentUptime() time.Duration

GetCurrentUptime returns the node's current uptime duration. If the node is active, it calculates the time elapsed since the last joined time. If the node is marked as left, it returns 0.

func (*NodeData) Joined

func (n *NodeData) Joined()

Joined updates the NodeData when the node joins the network. It sets the join times, activity, active status, and logs based on stake status.

func (*NodeData) Left

func (n *NodeData) Left()

Left updates the NodeData when the node leaves the network. It sets the leave time, stops uptime, sets activity to left, sets node as inactive, and logs based on stake status.

func (*NodeData) TwitterScraper

func (n *NodeData) TwitterScraper() bool

func (*NodeData) UpdateAccumulatedUptime

func (n *NodeData) UpdateAccumulatedUptime()

UpdateAccumulatedUptime updates the accumulated uptime of the node to account for any discrepancy between the last left and last joined times from gossipsub events. It calculates the duration between last left and joined if the node is marked as left. Otherwise, it uses the time since the last joined event.

func (*NodeData) WebScraper

func (n *NodeData) WebScraper() bool

type NodeEventTracker

type NodeEventTracker struct {
	NodeDataChan chan *NodeData

	ConnectBuffer map[string]ConnectBufferEntry
	// contains filtered or unexported fields
}

func NewNodeEventTracker

func NewNodeEventTracker(version, environment string) *NodeEventTracker

NewNodeEventTracker creates a new NodeEventTracker instance. It initializes the node data map, node data channel, node data file path, and connect buffer map. It then loads existing node data from file, starts a goroutine to clear expired buffer entries, and returns the initialized instance.

func (*NodeEventTracker) AddOrUpdateNodeData

func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error

func (*NodeEventTracker) ClearExpiredBufferEntries

func (net *NodeEventTracker) ClearExpiredBufferEntries()

ClearExpiredBufferEntries periodically clears expired entries from the connect buffer cache. It loops forever sleeping for a configured interval. On each loop it checks the current time against the connect time for each entry, and if expired, processes the connect and removes the entry.

func (*NodeEventTracker) Connected

func (net *NodeEventTracker) Connected(n network.Network, c network.Conn)

Connected handles when a remote peer connects to this node. It checks if the connecting node already exists in nodeData. If not, it returns without doing anything. If it exists but is not active, it buffers the connect event. If it exists and is active, it marks the node as joined and saves the updated nodeData.

func (*NodeEventTracker) Disconnected

func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn)

Disconnected handles when a remote peer disconnects from this node. It looks up the disconnected node's data. If it doesn't exist, it returns. If it exists but is not staked, it returns. Otherwise it handles disconnect logic like updating the node data, deleting buffered connect events, and sending updated node data through the channel.
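The branching described for Connected and Disconnected can be summarized as two small decision functions. This is only a sketch of the documented control flow: the `node` type, the `action` values, and both function names are hypothetical, and the side effects (buffering, saving, sending on the channel) are reduced to labels.

```go
package main

import "fmt"

// node is a hypothetical, trimmed stand-in for NodeData.
// A nil *node means the peer is unknown to the tracker.
type node struct {
	IsActive bool
	IsStaked bool
}

type action string

const (
	ignore           action = "ignore"
	bufferConnect    action = "buffer connect event"
	markJoined       action = "mark joined and save"
	handleDisconnect action = "update node data, clear buffered connects, send on channel"
)

// onConnected mirrors the documented branches of Connected.
func onConnected(n *node) action {
	switch {
	case n == nil:
		return ignore
	case !n.IsActive:
		return bufferConnect
	default:
		return markJoined
	}
}

// onDisconnected mirrors the documented branches of Disconnected.
func onDisconnected(n *node) action {
	if n == nil || !n.IsStaked {
		return ignore
	}
	return handleDisconnect
}

func main() {
	fmt.Println(onConnected(nil))
	fmt.Println(onConnected(&node{IsActive: false}))
	fmt.Println(onConnected(&node{IsActive: true}))
	fmt.Println(onDisconnected(&node{IsStaked: false}))
	fmt.Println(onDisconnected(&node{IsStaked: true}))
}
```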

func (*NodeEventTracker) DumpNodeData

func (net *NodeEventTracker) DumpNodeData()

DumpNodeData writes the NodeData map to a JSON file. It determines the file path based on the configured data directory, defaulting to nodeDataFile if not set. It logs any errors writing the file. This allows periodically persisting the node data.

func (*NodeEventTracker) GetAllNodeData

func (net *NodeEventTracker) GetAllNodeData() []NodeData

GetAllNodeData returns a slice containing the NodeData for all nodes currently tracked.

func (*NodeEventTracker) GetNodeData

func (net *NodeEventTracker) GetNodeData(peerID string) *NodeData

GetNodeData returns the NodeData for the node with the given peer ID, or nil if no NodeData exists for that peer ID.

func (*NodeEventTracker) GetUpdatedNodes

func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData

GetUpdatedNodes returns a slice of NodeData for nodes that have been updated since the given time. It filters the full node data set to only those updated after the passed in time, sorts the filtered results by update timestamp, and returns the sorted slice.

func (*NodeEventTracker) HandleMessage

func (net *NodeEventTracker) HandleMessage(msg *pubsub.Message)

HandleMessage unmarshals the received pubsub message into a NodeData struct, and passes it to HandleNodeData for further processing. This allows the NodeEventTracker to handle incoming node data messages from the pubsub layer.

func (*NodeEventTracker) HandleNodeData

func (net *NodeEventTracker) HandleNodeData(data NodeData)

HandleNodeData processes incoming NodeData from the pubsub layer. It adds new NodeData to the nodeData map, checks for replay attacks, and reconciles discrepancies between incoming and existing NodeData. This allows the tracker to maintain an up-to-date view of the node topology based on pubsub messages.

func (*NodeEventTracker) IsStaked

func (net *NodeEventTracker) IsStaked(peerID string) bool

IsStaked returns whether the node with the given peerID is marked as staked in the node data tracker. Returns false if no node data is found for the given peerID.

func (*NodeEventTracker) Listen

func (net *NodeEventTracker) Listen(n network.Network, a ma.Multiaddr)

Listen is called when the node starts listening on a new multiaddr. It logs the network and address that listening started on.

func (*NodeEventTracker) ListenClose

func (net *NodeEventTracker) ListenClose(n network.Network, a ma.Multiaddr)

ListenClose is called when the node stops listening on a multiaddr. It logs the network and the multiaddr on which listening stopped.

func (*NodeEventTracker) LoadNodeData

func (net *NodeEventTracker) LoadNodeData() error

LoadNodeData loads the node data from a JSON file. It determines the file path based on the configured data directory, defaulting to nodeDataFile if not set. It logs any errors reading or parsing the file. This allows initializing the node data tracker from persisted data.

func (*NodeEventTracker) RefreshFromBoot

func (net *NodeEventTracker) RefreshFromBoot(data NodeData)

RefreshFromBoot updates the node data map with the provided NodeData when the node boots up. It associates the NodeData with the peer ID string as the map key.

type NodeLifecycleEvent

type NodeLifecycleEvent struct {
	EventType string `json:"eventType"` // "join" or "leave"
	NodeID    string `json:"nodeID"`
	Nonce     int64  `json:"nonce"`
	Timestamp int64  `json:"timestamp"`
}

NodeLifecycleEvent represents a join or leave event

type PublicKeyMessage

type PublicKeyMessage struct {
	PublicKey string `json:"publicKey"`
	Signature string `json:"signature"`
	Data      string `json:"data"`
}

PublicKeyMessage represents the structure of the public key messages.

type PublicKeyPublisher

type PublicKeyPublisher struct {
	// contains filtered or unexported fields
}

func NewPublicKeyPublisher

func NewPublicKeyPublisher(manager *Manager, pubKey libp2pCrypto.PubKey) *PublicKeyPublisher

NewPublicKeyPublisher creates a new instance of PublicKeyPublisher.

func (*PublicKeyPublisher) GetPublishedMessages

func (p *PublicKeyPublisher) GetPublishedMessages() []PublicKeyMessage

GetPublishedMessages returns the stored public key messages.

func (*PublicKeyPublisher) PublishNodePublicKey

func (p *PublicKeyPublisher) PublishNodePublicKey(publicKey string, data, signature []byte) error

PublishNodePublicKey publishes the node's public key to the designated topic.

type PublicKeySubscriptionHandler

type PublicKeySubscriptionHandler struct {
	PublicKeys  []PublicKeyMessage
	PubKeyTopic *pubsub.Topic
}

PublicKeySubscriptionHandler handles incoming messages on public key topics.

func (*PublicKeySubscriptionHandler) GetPublicKeys

func (handler *PublicKeySubscriptionHandler) GetPublicKeys() []PublicKeyMessage

GetPublicKeys returns the list of PublicKeyMessages.

func (*PublicKeySubscriptionHandler) HandleMessage

func (handler *PublicKeySubscriptionHandler) HandleMessage(m *pubsub.Message)

HandleMessage handles incoming public key messages, with verification and update logic.

type SafeMap

type SafeMap struct {
	// contains filtered or unexported fields
}

func NewSafeMap

func NewSafeMap() *SafeMap

func (*SafeMap) Delete

func (sm *SafeMap) Delete(key string)

func (*SafeMap) DumpNodeData

func (sm *SafeMap) DumpNodeData(filePath string) error

DumpNodeData writes the entire nodeData map to a file in JSON format.

func (*SafeMap) Get

func (sm *SafeMap) Get(key string) (*NodeData, bool)

func (*SafeMap) GetStakedNodesSlice

func (sm *SafeMap) GetStakedNodesSlice() []NodeData

func (*SafeMap) Len

func (sm *SafeMap) Len() int

func (*SafeMap) LoadNodeData

func (sm *SafeMap) LoadNodeData(filePath string) error

LoadNodeData reads nodeData from a file in JSON format and loads it into the map.

func (*SafeMap) MarshalJSON

func (sm *SafeMap) MarshalJSON() ([]byte, error)

MarshalJSON overrides the default JSON marshalling so that only the underlying map is serialized.

func (*SafeMap) Set

func (sm *SafeMap) Set(key string, value *NodeData)

func (*SafeMap) UnmarshalJSON

func (sm *SafeMap) UnmarshalJSON(b []byte) error

UnmarshalJSON overrides the default JSON unmarshalling so that the decoded data is stored directly in the underlying map.

type SubscriptionHandler

type SubscriptionHandler interface {
	HandleMessage(msg *pubsub.Message)
}

SubscriptionHandler defines the interface for handling pubsub messages. Implementations should subscribe to topics and handle incoming messages.

type TopicHandler

type TopicHandler struct {
	Subscription *pubsub.Subscription
}

TopicHandler is responsible for handling messages from subscribed topics.

func NewTopicHandler

func NewTopicHandler() *TopicHandler

NewTopicHandler creates a new TopicHandler with necessary initializations.

func (*TopicHandler) HandleMessage

func (h *TopicHandler) HandleMessage(msg *pubsub.Message)

HandleMessage processes messages received on the subscribed topics.

func (*TopicHandler) StartListening

func (h *TopicHandler) StartListening()

StartListening starts listening to messages on the subscribed topic.

type WorkerEventTracker

type WorkerEventTracker struct {
	Workers     []Workers
	WorkerTopic *pubsub.Topic

	WorkerStatusCh chan []byte
	// contains filtered or unexported fields
}

WorkerEventTracker is a struct that handles subscriptions for worker status updates. It contains the following fields:
- Workers: A slice of Workers structs representing the status of workers.
- Data: A byte slice containing the raw data received from subscriptions.
- mu: A sync.Mutex used for synchronizing access to the handler's fields.
- WorkerStatusCh: A channel for sending worker status updates as byte slices.

func (*WorkerEventTracker) HandleMessage

func (h *WorkerEventTracker) HandleMessage(m *pubsub.Message)

HandleMessage implements the SubscriptionHandler interface for WorkerEventTracker.

type WorkerStatus

type WorkerStatus struct {
	PeerID string `json:"peerId"`
	Data   []byte
}

type WorkerStatusHandler

type WorkerStatusHandler struct {
	WorkerStatus       []WorkerStatus
	CompletedWorkTopic *pubsub.Topic

	WorkerStatusCh chan []byte
	// contains filtered or unexported fields
}

WorkerStatusHandler is a struct that handles subscriptions for worker status updates. It contains the following fields:
- WorkerStatus: A slice of WorkerStatus structs representing the status of workers.
- Data: A byte slice containing the raw data received from subscriptions.
- mu: A sync.Mutex used for synchronizing access to the handler's fields.
- WorkerStatusCh: A channel for sending worker status updates as byte slices.

func (*WorkerStatusHandler) HandleMessage

func (h *WorkerStatusHandler) HandleMessage(message *pubsub.Message)

HandleMessage implements the SubscriptionHandler interface for WorkerStatusHandler.

type Workers

type Workers struct {
}
