Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var SensorCmd = &cobra.Command{ Use: "sensor [nodes file]", Short: "Start a devp2p sensor that discovers other peers and will receive blocks and transactions.", Long: "If no nodes.json file exists, it will be created.", Args: cobra.MinimumNArgs(1), PreRunE: func(cmd *cobra.Command, args []string) (err error) { inputSensorParams.NodesFile = args[0] inputSensorParams.nodes, err = p2p.ReadNodeSet(inputSensorParams.NodesFile) if err != nil { log.Warn().Err(err).Msgf("Creating nodes file %v because it does not exist", inputSensorParams.NodesFile) } if len(inputSensorParams.TrustedNodesFile) > 0 { inputSensorParams.trustedNodes, err = p2p.ReadNodeSet(inputSensorParams.TrustedNodesFile) if err != nil { log.Warn().Err(err).Msgf("Trusted nodes file %v not found", inputSensorParams.TrustedNodesFile) } } if len(inputSensorParams.Bootnodes) > 0 { inputSensorParams.bootnodes, err = p2p.ParseBootnodes(inputSensorParams.Bootnodes) if err != nil { return fmt.Errorf("unable to parse bootnodes: %w", err) } } if inputSensorParams.NetworkID == 0 { return errors.New("network ID must be greater than zero") } if inputSensorParams.ShouldRunPprof { go handlePprof() } if inputSensorParams.ShouldRunPrometheus { go handlePrometheus() } inputSensorParams.privateKey, err = crypto.GenerateKey() if err != nil { return err } if len(inputSensorParams.KeyFile) > 0 { var privateKey *ecdsa.PrivateKey privateKey, err = crypto.LoadECDSA(inputSensorParams.KeyFile) if err != nil { log.Warn().Err(err).Msg("Key file was not found, generating a new key file") err = crypto.SaveECDSA(inputSensorParams.KeyFile, inputSensorParams.privateKey) if err != nil { return err } } else { inputSensorParams.privateKey = privateKey } } if len(inputSensorParams.PrivateKey) > 0 { inputSensorParams.privateKey, err = crypto.HexToECDSA(inputSensorParams.PrivateKey) if err != nil { log.Error().Err(err).Msg("Failed to parse PrivateKey") return err } } inputSensorParams.nat, err = nat.Parse(inputSensorParams.NAT) if err != nil { log.Error().Err(err).Msg("Failed to parse NAT") return err } return nil }, RunE: func(cmd *cobra.Command, args []string) error { db := database.NewDatastore(cmd.Context(), database.DatastoreOptions{ ProjectID: inputSensorParams.ProjectID, DatabaseID: inputSensorParams.DatabaseID, SensorID: inputSensorParams.SensorID, MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, ShouldWritePeers: inputSensorParams.ShouldWritePeers, TTL: inputSensorParams.TTL, }) block, err := getLatestBlock(inputSensorParams.RPC) if err != nil { return err } head := p2p.HeadBlock{ Hash: block.Hash.ToHash(), TotalDifficulty: block.TotalDifficulty.ToBigInt(), Number: block.Number.ToUint64(), } peersGauge := promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "sensor", Name: "peers", Help: "The number of peers the sensor is connected to", }) msgCounter := promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "sensor", Name: "messages", Help: "The number and type of messages the sensor has received", }, []string{"message", "url", "name"}) opts := p2p.EthProtocolOptions{ Context: cmd.Context(), Database: db, GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), RPC: inputSensorParams.RPC, SensorID: inputSensorParams.SensorID, NetworkID: inputSensorParams.NetworkID, Peers: make(chan *enode.Node), Head: &head, HeadMutex: &sync.RWMutex{}, ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, MsgCounter: msgCounter, } config := ethp2p.Config{ PrivateKey: inputSensorParams.privateKey, BootstrapNodes: inputSensorParams.bootnodes, TrustedNodes: inputSensorParams.trustedNodes, MaxPeers: inputSensorParams.MaxPeers, ListenAddr: fmt.Sprintf(":%d", inputSensorParams.Port), DiscAddr: fmt.Sprintf(":%d", inputSensorParams.DiscoveryPort), DialRatio: inputSensorParams.DialRatio, NAT: inputSensorParams.nat, DiscoveryV4: true, DiscoveryV5: true, Protocols: []ethp2p.Protocol{ p2p.NewEthProtocol(66, opts), p2p.NewEthProtocol(67, opts), p2p.NewEthProtocol(68, opts), }, } if inputSensorParams.QuickStart { config.StaticNodes = inputSensorParams.nodes } server := ethp2p.Server{Config: config} log.Info().Str("enode", server.Self().URLv4()).Msg("Starting sensor") if err = server.Start(); err != nil { return err } defer server.Stop() events := make(chan *ethp2p.PeerEvent) sub := server.SubscribeEvents(events) defer sub.Unsubscribe() ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) peers := make(map[enode.ID]string) for _, node := range inputSensorParams.nodes { peers[node.ID()] = node.URLv4() } go handleAPI(&server, msgCounter) for { select { case <-ticker.C: peersGauge.Set(float64(server.PeerCount())) if err := removePeerMessages(msgCounter, server.Peers()); err != nil { log.Error().Err(err).Msg("Failed to clean up peer messages") } db.WritePeers(context.Background(), server.Peers()) case peer := <-opts.Peers: if _, ok := peers[peer.ID()]; !ok { peers[peer.ID()] = peer.URLv4() if err := p2p.WritePeers(inputSensorParams.NodesFile, peers); err != nil { log.Error().Err(err).Msg("Failed to write nodes to file") } } case <-signals: log.Info().Msg("Stopping sensor...") return nil case event := <-events: log.Debug().Any("event", event).Send() case err := <-sub.Err(): log.Error().Err(err).Send() } } }, }
SensorCmd represents the sensor command. This is responsible for starting a sensor and transmitting blocks and transactions to a database.
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.