Documentation ¶
Index ¶
- Variables
- func MsgIDFunction(pmsg *pubsub_pb.Message) string
- type GossipSub
- type MessageMetrics
- func (c *MessageMetrics) AddMessgeToTopic(topic string) int32
- func (c *MessageMetrics) GetTopicMsgs(topic string) int32
- func (c *MessageMetrics) GetTotalMessages() int64
- func (c *MessageMetrics) NewTopic(topic string) bool
- func (c *MessageMetrics) ResetAllTopics() error
- func (c *MessageMetrics) ResetTopic(topic string) int32
- type TopicSubscription
Constants ¶
This section is empty.
Variables ¶
var (
	ReceivedTotalMessages = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "crawler",
		Name:      "total_received_messages_psec",
		Help:      "The number of messages received in the last second",
	})
	ReceivedMessages = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "crawler",
		Name:      "received_messages_psec",
		Help:      "Number of messages received per second on each topic",
	},
		[]string{"topic"},
	)
)
var (
ModuleName = "GOSSIP-SUB"
)
Functions ¶
func MsgIDFunction ¶
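func MsgIDFunction(pmsg *pubsub_pb.Message) string
MsgIDFunction computes the message ID of a pubsub message. It has the function shape taken by WithMessageIdFn, the pubsub option that customizes how a message ID is computed.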
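To illustrate what a message-ID function can look like, the sketch below derives an ID by hashing the message payload with SHA-256 and base64-encoding the digest. The `message` struct is a hypothetical stand-in for `pubsub_pb.Message`, and the hashing scheme is an assumption for illustration, not necessarily what MsgIDFunction does.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// message is a hypothetical stand-in for pubsub_pb.Message;
// only the payload is modeled here.
type message struct {
	Data []byte
}

// msgID returns a content-derived ID: the URL-safe base64 encoding of the
// SHA-256 digest of the payload. Identical payloads yield identical IDs.
// The scheme is an assumption chosen for illustration.
func msgID(m *message) string {
	digest := sha256.Sum256(m.Data)
	return base64.URLEncoding.EncodeToString(digest[:])
}

func main() {
	a := msgID(&message{Data: []byte("hello")})
	b := msgID(&message{Data: []byte("hello")})
	c := msgID(&message{Data: []byte("world")})
	// Same payload gives the same ID; different payloads give different IDs.
	fmt.Println(a == b, a == c, len(a))
}
```

A content-derived ID lets peers recognize duplicates of the same payload regardless of which peer forwarded it.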
Types ¶
type GossipSub ¶
type GossipSub struct {
	BasicHost       *hosts.BasicLibp2pHost
	PeerStore       *db.PeerStore
	PubsubService   *pubsub.PubSub
	ExporterService *exporters.ExporterService
	// map whose keys are the topic names (strings) and whose values are the TopicSubscriptions
	TopicArray     map[string]*TopicSubscription
	MessageMetrics *MessageMetrics
	// contains filtered or unexported fields
}
GossipSub summarizes the control fields necessary to manage and govern the GossipSub internal service.
func NewEmptyGossipSub ¶
func NewEmptyGossipSub() *GossipSub
NewEmptyGossipSub: Creates an empty GossipSub, the struct that summarizes the control fields necessary to manage and govern the GossipSub service. @return: pointer to GossipSub struct.
func NewGossipSub ¶
func NewGossipSub(ctx context.Context, exporter *exporters.ExporterService, h *hosts.BasicLibp2pHost, peerstore *db.PeerStore) *GossipSub
NewGossipSub: Creates the GossipSub struct, which summarizes the control fields necessary to manage and govern the GossipSub service. @param ctx: parent context for the gossip service. @param exporter: the exporter service used by the gossip service. @param h: the basic libp2p host. @param peerstore: the peerstore where the data is stored. @return: pointer to GossipSub struct.
func (*GossipSub) JoinAndSubscribe ¶
JoinAndSubscribe: This method allows the GossipSub service to join and subscribe to a topic. @param topicName: name of the topic to subscribe to. @return: pointer to GossipSub struct.
func (*GossipSub) ServeMetrics ¶ added in v1.1.0
func (gs *GossipSub) ServeMetrics()
ServeMetrics: Generates the metrics from the GossipSub message metrics and serves the values to the local Prometheus instance.
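One plausible reading of "generate the metrics and serve the values" is a periodic flush: on every tick, the per-topic counts gathered since the previous tick are written to the gauges and the counters start again from zero. The sketch below shows a single flush step under that assumption. `gaugeSetter`, `flushTick`, and the topic names are hypothetical, and the gauge is modeled as a plain callback rather than a Prometheus collector.

```go
package main

import "fmt"

// gaugeSetter is a hypothetical stand-in for setting the value of a
// labelled gauge, such as one child of a per-topic gauge vector.
type gaugeSetter func(topic string, value float64)

// flushTick publishes the per-topic counts gathered since the previous tick,
// resets each counter to zero, and returns the total that was published.
// It is not safe for concurrent use; a real service would guard counts.
func flushTick(counts map[string]int32, set gaugeSetter) int64 {
	var total int64
	for topic, n := range counts {
		set(topic, float64(n))
		total += int64(n)
		counts[topic] = 0 // start a fresh window for the next tick
	}
	return total
}

func main() {
	counts := map[string]int32{"blocks": 3, "attestations": 5}
	published := make(map[string]float64)

	total := flushTick(counts, func(topic string, v float64) {
		published[topic] = v
	})
	fmt.Println(total, published["blocks"], published["attestations"], counts["blocks"])
}
```

In a running service this step would typically be driven by a `time.Ticker`; with a one-second interval the published values read as messages per second.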
type MessageMetrics ¶
type MessageMetrics struct {
// contains filtered or unexported fields
}
MessageMetrics summarizes all the metrics that can be obtained from the received messages. For now it is divided by topic and contains only the local counter of messages received between server ticks.
func NewMessageMetrics ¶
func NewMessageMetrics() MessageMetrics
NewMessageMetrics: @return initialized MessageMetrics struct.
func (*MessageMetrics) AddMessgeToTopic ¶
func (c *MessageMetrics) AddMessgeToTopic(topic string) int32
AddMessgeToTopic: @param topic: gossipsub topic name. @return current message counter, or -1 if there was an error (non-existing topic).
func (*MessageMetrics) GetTopicMsgs ¶
func (c *MessageMetrics) GetTopicMsgs(topic string) int32
GetTopicMsgs: Obtains the number of messages received on the given topic since the last tick. @return current message counter, or -1 if there was an error (non-existing topic).
func (*MessageMetrics) GetTotalMessages ¶
func (c *MessageMetrics) GetTotalMessages() int64
GetTotalMessages: Obtains the total number of messages received on all topics since the last tick. @return total message counter, or -1 if there was an error (non-existing topic).
func (*MessageMetrics) NewTopic ¶
func (c *MessageMetrics) NewTopic(topic string) bool
NewTopic: @param topic: name of the topic. @return true if the topic was already in Metrics, false otherwise.
func (*MessageMetrics) ResetAllTopics ¶
func (c *MessageMetrics) ResetAllTopics() error
ResetAllTopics: Resets all the topic counters to 0. @return an error if the counters could not be reset, nil otherwise.
func (*MessageMetrics) ResetTopic ¶
func (c *MessageMetrics) ResetTopic(topic string) int32
ResetTopic: @param topic: gossipsub topic name. @return current message counter, or -1 if there was an error (non-existing topic).
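The conventions described for MessageMetrics (a counter per topic, -1 for an unknown topic, and a registration call that reports whether the topic already existed) can be sketched with a mutex-guarded map. `topicCounters` and its constructor are hypothetical and not the package's implementation. In particular, returning the pre-reset value from `ResetTopic` is an assumption, since the documentation does not say which value is returned.

```go
package main

import (
	"fmt"
	"sync"
)

// topicCounters is a hypothetical per-topic message counter that mirrors the
// documented conventions; it is not the package's MessageMetrics type.
type topicCounters struct {
	mu     sync.Mutex
	counts map[string]int32
}

func newTopicCounters() *topicCounters {
	return &topicCounters{counts: make(map[string]int32)}
}

// NewTopic registers a topic and reports whether it was already registered.
func (c *topicCounters) NewTopic(topic string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.counts[topic]; ok {
		return true
	}
	c.counts[topic] = 0
	return false
}

// AddMessageToTopic increments the topic counter and returns the new value,
// or -1 if the topic was never registered.
func (c *topicCounters) AddMessageToTopic(topic string) int32 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.counts[topic]; !ok {
		return -1
	}
	c.counts[topic]++
	return c.counts[topic]
}

// GetTotalMessages sums the counters of all registered topics.
func (c *topicCounters) GetTotalMessages() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	var total int64
	for _, n := range c.counts {
		total += int64(n)
	}
	return total
}

// ResetTopic zeroes the topic counter and returns the value it held before
// the reset (an assumption), or -1 if the topic was never registered.
func (c *topicCounters) ResetTopic(topic string) int32 {
	c.mu.Lock()
	defer c.mu.Unlock()
	prev, ok := c.counts[topic]
	if !ok {
		return -1
	}
	c.counts[topic] = 0
	return prev
}

func main() {
	c := newTopicCounters()
	fmt.Println(c.NewTopic("blocks"))            // first registration
	fmt.Println(c.AddMessageToTopic("blocks"))   // counter after one message
	fmt.Println(c.AddMessageToTopic("unknown"))  // unregistered topic
	fmt.Println(c.GetTotalMessages())
	fmt.Println(c.ResetTopic("blocks"))
}
```

Because the reading loop and the metrics server touch the counters from different goroutines, the sketch serializes access with a mutex. Atomic counters would be an alternative.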
type TopicSubscription ¶
type TopicSubscription struct {
	// Messages is a channel of messages received from other peers in the chat room
	Messages       chan []byte
	Topic          *pubsub.Topic
	Sub            *pubsub.Subscription
	MessageMetrics *MessageMetrics
	// contains filtered or unexported fields
}
TopicSubscription summarizes the control fields necessary to manage and govern a joined and subscribed topic, such as message logging or recording. It serves as a server for a single topic subscription.
func NewTopicSubscription ¶
func NewTopicSubscription(ctx context.Context, topic *pubsub.Topic, sub pubsub.Subscription, msgMetrics *MessageMetrics) *TopicSubscription
NewTopicSubscription: Creates the TopicSubscription struct, which summarizes the control fields necessary to manage and govern a joined and subscribed topic. @param ctx: parent context of the topic subscription, generally the gossipsub context. @param topic: the libp2p.PubSub topic of the joined topic. @param sub: the libp2p.PubSub subscription of the subscribed topic. @param msgMetrics: underlying message metrics for each of the joined topics. @return: pointer to TopicSubscription.
func (*TopicSubscription) MessageReadingLoop ¶
func (c *TopicSubscription) MessageReadingLoop(h host.Host, peerstore *db.PeerStore)
MessageReadingLoop: Pulls messages from the pubsub topic and pushes them onto the Messages channel and into the underlying message metrics. @param h: libp2p host. @param peerstore: peerstore of the crawler app.
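The pull-count-forward pattern described for MessageReadingLoop can be sketched with the standard library alone. In this sketch `nextFunc` is a hypothetical stand-in for a blocking subscription read, and `readLoop` and `demo` are illustrative names. The host and peerstore parameters of the real method are not modeled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// nextFunc is a hypothetical stand-in for a blocking subscription read;
// it returns the next payload or an error once the subscription ends.
type nextFunc func(ctx context.Context) ([]byte, error)

// readLoop pulls payloads with next, counts each one, and forwards it on out.
// It stops at the first error from next or when ctx is cancelled, closes out,
// and returns the number of payloads forwarded.
func readLoop(ctx context.Context, next nextFunc, out chan<- []byte) int {
	defer close(out)
	count := 0
	for {
		data, err := next(ctx)
		if err != nil {
			return count
		}
		select {
		case out <- data:
			count++
		case <-ctx.Done():
			return count
		}
	}
}

// demo feeds n fake payloads through readLoop and returns how many were
// forwarded by the loop and how many were received by the consumer.
func demo(n int) (forwarded, received int) {
	i := 0
	next := func(ctx context.Context) ([]byte, error) {
		if i >= n {
			return nil, errors.New("subscription closed")
		}
		i++
		return []byte(fmt.Sprintf("msg-%d", i)), nil
	}

	out := make(chan []byte)
	done := make(chan int)
	go func() { done <- readLoop(context.Background(), next, out) }()

	for range out {
		received++
	}
	return <-done, received
}

func main() {
	forwarded, received := demo(3)
	fmt.Println(forwarded, received)
}
```

Closing the output channel when the loop exits lets the consumer finish with a plain `range`. The `select` on `ctx.Done()` keeps the loop from blocking forever if nobody is reading.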