sensor

package
v0.1.66
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: AGPL-3.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// SensorCmd represents the sensor command. It starts a devp2p server that
// discovers peers, receives blocks and transactions over the eth protocol,
// and hands them to a datastore.
//
// PreRunE resolves everything derived from flags (node sets, bootnodes,
// private key, NAT) and validates them; RunE builds the server and then runs
// the event loop until SIGINT or SIGTERM is received.
var SensorCmd = &cobra.Command{
	Use:   "sensor [nodes file]",
	Short: "Start a devp2p sensor that discovers other peers and will receive blocks and transactions.",
	Long:  "If no nodes.json file exists, it will be created.",
	Args:  cobra.MinimumNArgs(1),
	PreRunE: func(cmd *cobra.Command, args []string) (err error) {
		// A failure to read the nodes file is not fatal: it is only logged,
		// and the file is written later by RunE when new peers show up.
		inputSensorParams.NodesFile = args[0]
		inputSensorParams.nodes, err = p2p.ReadNodeSet(inputSensorParams.NodesFile)
		if err != nil {
			log.Warn().Err(err).Msgf("Creating nodes file %v because it does not exist", inputSensorParams.NodesFile)
		}

		// Trusted nodes are optional; a read failure is only a warning.
		if len(inputSensorParams.TrustedNodesFile) > 0 {
			inputSensorParams.trustedNodes, err = p2p.ReadNodeSet(inputSensorParams.TrustedNodesFile)
			if err != nil {
				log.Warn().Err(err).Msgf("Trusted nodes file %v not found", inputSensorParams.TrustedNodesFile)
			}
		}

		if len(inputSensorParams.Bootnodes) > 0 {
			inputSensorParams.bootnodes, err = p2p.ParseBootnodes(inputSensorParams.Bootnodes)
			if err != nil {
				return fmt.Errorf("unable to parse bootnodes: %w", err)
			}
		}

		if inputSensorParams.NetworkID == 0 {
			return errors.New("network ID must be greater than zero")
		}

		// RunE converts ForkID to a [4]byte. That conversion panics when the
		// source slice holds fewer than 4 bytes, so reject short values here
		// with a regular error instead of crashing later.
		if len(inputSensorParams.ForkID) < 4 {
			return fmt.Errorf("fork ID must be at least 4 bytes, got %d", len(inputSensorParams.ForkID))
		}

		if inputSensorParams.ShouldRunPprof {
			go handlePprof()
		}

		if inputSensorParams.ShouldRunPrometheus {
			go handlePrometheus()
		}

		// Key precedence (lowest to highest): freshly generated key, key
		// loaded from KeyFile, key parsed from the PrivateKey hex string.
		inputSensorParams.privateKey, err = crypto.GenerateKey()
		if err != nil {
			return err
		}

		if len(inputSensorParams.KeyFile) > 0 {
			var privateKey *ecdsa.PrivateKey
			privateKey, err = crypto.LoadECDSA(inputSensorParams.KeyFile)

			if err != nil {
				// NOTE(review): any load error (not only "file not found")
				// takes this branch and overwrites KeyFile with the freshly
				// generated key — confirm this is intended.
				log.Warn().Err(err).Msg("Key file was not found, generating a new key file")

				err = crypto.SaveECDSA(inputSensorParams.KeyFile, inputSensorParams.privateKey)
				if err != nil {
					return err
				}
			} else {
				inputSensorParams.privateKey = privateKey
			}
		}

		if len(inputSensorParams.PrivateKey) > 0 {
			inputSensorParams.privateKey, err = crypto.HexToECDSA(inputSensorParams.PrivateKey)
			if err != nil {
				log.Error().Err(err).Msg("Failed to parse PrivateKey")
				return err
			}
		}

		inputSensorParams.nat, err = nat.Parse(inputSensorParams.NAT)
		if err != nil {
			log.Error().Err(err).Msg("Failed to parse NAT")
			return err
		}

		return nil
	},
	RunE: func(cmd *cobra.Command, args []string) error {
		db := database.NewDatastore(cmd.Context(), database.DatastoreOptions{
			ProjectID:                    inputSensorParams.ProjectID,
			DatabaseID:                   inputSensorParams.DatabaseID,
			SensorID:                     inputSensorParams.SensorID,
			MaxConcurrency:               inputSensorParams.MaxDatabaseConcurrency,
			ShouldWriteBlocks:            inputSensorParams.ShouldWriteBlocks,
			ShouldWriteBlockEvents:       inputSensorParams.ShouldWriteBlockEvents,
			ShouldWriteTransactions:      inputSensorParams.ShouldWriteTransactions,
			ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
			ShouldWritePeers:             inputSensorParams.ShouldWritePeers,
			TTL:                          inputSensorParams.TTL,
		})

		// Seed the protocol's head block from the latest block reported by
		// the RPC endpoint.
		block, err := getLatestBlock(inputSensorParams.RPC)
		if err != nil {
			return err
		}
		head := p2p.HeadBlock{
			Hash:            block.Hash.ToHash(),
			TotalDifficulty: block.TotalDifficulty.ToBigInt(),
			Number:          block.Number.ToUint64(),
		}

		peersGauge := promauto.NewGauge(prometheus.GaugeOpts{
			Namespace: "sensor",
			Name:      "peers",
			Help:      "The number of peers the sensor is connected to",
		})

		msgCounter := promauto.NewCounterVec(prometheus.CounterOpts{
			Namespace: "sensor",
			Name:      "messages",
			Help:      "The number and type of messages the sensor has received",
		}, []string{"message", "url", "name"})

		// The same options value is shared by every eth protocol version
		// registered below; opts.Peers is drained by the event loop.
		opts := p2p.EthProtocolOptions{
			Context:     cmd.Context(),
			Database:    db,
			GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
			RPC:         inputSensorParams.RPC,
			SensorID:    inputSensorParams.SensorID,
			NetworkID:   inputSensorParams.NetworkID,
			Peers:       make(chan *enode.Node),
			Head:        &head,
			HeadMutex:   &sync.RWMutex{},
			ForkID:      forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
			MsgCounter:  msgCounter,
		}

		config := ethp2p.Config{
			PrivateKey:     inputSensorParams.privateKey,
			BootstrapNodes: inputSensorParams.bootnodes,
			TrustedNodes:   inputSensorParams.trustedNodes,
			MaxPeers:       inputSensorParams.MaxPeers,
			ListenAddr:     fmt.Sprintf(":%d", inputSensorParams.Port),
			DiscAddr:       fmt.Sprintf(":%d", inputSensorParams.DiscoveryPort),
			DialRatio:      inputSensorParams.DialRatio,
			NAT:            inputSensorParams.nat,
			DiscoveryV4:    true,
			DiscoveryV5:    true,
			Protocols: []ethp2p.Protocol{
				p2p.NewEthProtocol(66, opts),
				p2p.NewEthProtocol(67, opts),
				p2p.NewEthProtocol(68, opts),
			},
		}

		// With quick start, the nodes read from the nodes file are also
		// configured as static nodes.
		if inputSensorParams.QuickStart {
			config.StaticNodes = inputSensorParams.nodes
		}

		server := ethp2p.Server{Config: config}

		log.Info().Str("enode", server.Self().URLv4()).Msg("Starting sensor")

		if err = server.Start(); err != nil {
			return err
		}
		defer server.Stop()

		events := make(chan *ethp2p.PeerEvent)
		sub := server.SubscribeEvents(events)
		defer sub.Unsubscribe()

		ticker := time.NewTicker(2 * time.Second)
		hourlyTicker := time.NewTicker(time.Hour)
		defer ticker.Stop()
		defer hourlyTicker.Stop()

		signals := make(chan os.Signal, 1)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
		// Unregister the channel on exit so it does not stay subscribed to
		// signals after this command has returned.
		defer signal.Stop(signals)

		// peers maps node ID to enode URL for every node written to the
		// nodes file; it starts out with the nodes loaded in PreRunE.
		peers := make(map[enode.ID]string)
		var peersMutex sync.Mutex

		for _, node := range inputSensorParams.nodes {

			peers[node.ID()] = node.URLv4()
		}

		go handleAPI(&server, msgCounter)

		go handleDNSDiscovery(&server)

		for {
			select {
			case <-ticker.C:
				// Every 2 seconds: refresh the peer gauge, clean up peer
				// message counters, and write the current peers to the
				// datastore.
				peersGauge.Set(float64(server.PeerCount()))
				if err := removePeerMessages(msgCounter, server.Peers()); err != nil {
					log.Error().Err(err).Msg("Failed to clean up peer messages")
				}
				db.WritePeers(context.Background(), server.Peers(), time.Now())
			case peer := <-opts.Peers:
				// Rewrite the nodes file only when a node that has not been
				// recorded yet is received.
				peersMutex.Lock()

				if _, ok := peers[peer.ID()]; !ok {
					peers[peer.ID()] = peer.URLv4()

					if err := p2p.WritePeers(inputSensorParams.NodesFile, peers); err != nil {
						log.Error().Err(err).Msg("Failed to write nodes to file")
					}
				}
				peersMutex.Unlock()
			case <-hourlyTicker.C:
				// Re-run DNS discovery once an hour.
				go handleDNSDiscovery(&server)
			case <-signals:
				// Returning runs the deferred cleanups (tickers, signal
				// registration, subscription, server).
				log.Info().Msg("Stopping sensor...")
				return nil
			case event := <-events:
				log.Debug().Any("event", event).Send()
			case err := <-sub.Err():
				log.Error().Err(err).Send()
			}
		}
	},
}

SensorCmd represents the sensor command. This is responsible for starting a sensor and transmitting blocks and transactions to a database.

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL