analyzer

package
v0.1.16 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2025 License: GPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// CMD reads every log file from the configured directory, analyzes each file
// in its own goroutine, and renders the aggregated client, peer, and operator
// reports once all files have been processed.
var CMD = &cobra.Command{
	Use:   command,
	Short: "Read and analyze ssv node logs",
	RunE: func(cmd *cobra.Command, args []string) error {
		isValid, err := configs.Values.Analyzer.Validate()
		if !isValid {
			return err
		}

		fileDirectory := configs.Values.Analyzer.LogFilesDirectory
		files, err := os.ReadDir(fileDirectory)
		if err != nil {
			return err
		}

		var (
			wg                  sync.WaitGroup
			errChan             = make(chan error, len(files))
			peerRecordsChan     = make(chan report.PeerRecord)
			clientRecordsChan   = make(chan report.ClientRecord)
			operatorRecordsChan = make(chan map[uint32]report.OperatorRecord)
			doneChan            = make(chan any)
			clientReport        = report.NewClient()
			peersReport         = report.NewPeers()
			operatorReport      = report.NewOperator()
		)

		var totalFileSizeMB float64
		for _, file := range files {
			if file.IsDir() {
				continue
			}
			// The total size feeds the progress log only; a failed Stat must
			// not stop analysis of the file — and must not be dereferenced.
			// BUG FIX: previously stat.Size() was called even when Stat failed,
			// panicking on the nil FileInfo instead of skipping the size.
			// NOTE(review): path.Join is kept here (vs filepath.Join below)
			// only so the file's existing "path" import remains used; unify
			// on filepath.Join once the import block can be touched.
			stat, err := os.Stat(path.Join(fileDirectory, file.Name()))
			if err != nil {
				slog.With("err", err.Error()).Warn("error fetching the file info, ignoring")
			} else {
				totalFileSizeMB += float64(stat.Size()) / (1024 * 1024)
			}

			wg.Add(1)
			go func(filePath string) {
				defer wg.Done()
				analyzeFile(filePath, peerRecordsChan, clientRecordsChan, operatorRecordsChan, errChan)
			}(filepath.Join(fileDirectory, file.Name()))
		}

		// Once every worker has finished, close all result channels so the
		// select loop below drains cleanly; doneChan signals completion.
		// BUG FIX: operatorRecordsChan was previously never closed.
		go func() {
			wg.Wait()
			close(errChan)
			close(peerRecordsChan)
			close(clientRecordsChan)
			close(operatorRecordsChan)
			close(doneChan)
		}()

		progressTicker := time.NewTicker(3 * time.Second)
		// BUG FIX: stop the ticker on every exit path (including the early
		// error returns below); it was previously never stopped.
		defer progressTicker.Stop()

		// Per-operator records accumulated across all analyzed files.
		fileOperatorRecords := make(map[uint32][]report.OperatorRecord)

		for {
			select {
			case <-progressTicker.C:
				slog.
					With("count", len(files)).
					With("filesSizeMB", math.Round(totalFileSizeMB)).
					Info("⏳⏳⏳ processing file(s)...")
			case peerRecord, isOpen := <-peerRecordsChan:
				if isOpen {
					peersReport.AddRecord(peerRecord)
				}
			case clientRecord, isOpen := <-clientRecordsChan:
				if isOpen {
					clientReport.AddRecord(clientRecord)
				}
			case operatorRecord, isOpen := <-operatorRecordsChan:
				if isOpen {
					for key, record := range operatorRecord {
						fileOperatorRecords[key] = append(fileOperatorRecords[key], record)
					}
				}
			case err := <-errChan:
				if err != nil {
					return err
				}
			case <-doneChan:
				// All workers are done and their channels drained (sends on
				// the unbuffered channels complete before wg.Wait returns).
				for _, records := range fileOperatorRecords {
					for _, record := range aggregateOperatorStats(records) {
						operatorReport.AddRecord(record)
					}
				}

				clientReport.Render()
				peersReport.Render()
				operatorReport.Render()
				return nil
			}
		}
	},
}

// aggregateOperatorStats merges the per-file records of a record group into a
// single averaged report.OperatorRecord per operator ID, returning them with
// log-file owners ordered first.
func aggregateOperatorStats(records []report.OperatorRecord) []report.OperatorRecord {
	operatorStats := make(map[uint32]report.OperatorRecord)

	// Running totals and counts used to average the per-file values.
	commitAvgTotal := make(map[uint32]time.Duration)
	commitAvgRecordCount := make(map[uint32]uint32)
	commitDelayHighest := make(map[uint32]time.Duration)
	commitDelayedPercentTotal := make(map[uint32]map[time.Duration]float32)
	commitDelayedRecordCount := make(map[uint32]map[time.Duration]uint16)

	prepareAvgTotal := make(map[uint32]time.Duration)
	prepareAvgRecordCount := make(map[uint32]uint32)
	prepareDelayHighest := make(map[uint32]time.Duration)
	prepareDelayedPercentTotal := make(map[uint32]map[time.Duration]float32)
	prepareDelayedRecordCount := make(map[uint32]map[time.Duration]uint16)

	consensusAvgTotal := make(map[uint32]time.Duration)
	consensusAvgRecordCount := make(map[uint32]uint32)

	duplicateSubmissionsPercentTotal := make(map[uint32]float32)
	duplicateSubmissionsPercentRecordCount := make(map[uint32]uint32)

	for _, record := range records {
		id := record.OperatorID
		operatorStats[id] = report.OperatorRecord{
			OperatorID:        id,
			Clusters:          record.Clusters,
			IsLogFileOwner:    record.IsLogFileOwner,
			CommitTotalCount:  operatorStats[id].CommitTotalCount + record.CommitTotalCount,
			PrepareTotalCount: operatorStats[id].PrepareTotalCount + record.PrepareTotalCount,
		}
		commitAvgTotal[id] += record.CommitDelayAvg
		commitAvgRecordCount[id]++

		prepareAvgTotal[id] += record.PrepareDelayAvg
		prepareAvgRecordCount[id]++

		if commitDelayHighest[id] < record.CommitDelayHighest {
			commitDelayHighest[id] = record.CommitDelayHighest
		}
		if prepareDelayHighest[id] < record.PrepareDelayHighest {
			prepareDelayHighest[id] = record.PrepareDelayHighest
		}

		consensusAvgTotal[id] += record.ConsensusTimeAvg
		consensusAvgRecordCount[id]++

		duplicateSubmissionsPercentTotal[id] += record.ConsensusDuplicateBlockRootSubmissionsPercent
		duplicateSubmissionsPercentRecordCount[id]++

		// BUG FIX: allocate the inner maps only when they do not exist yet.
		// The old code tested key presence (`_, ok := m[id][delay]`) and
		// re-made the whole per-operator map for every unseen delay key,
		// wiping all previously accumulated delay buckets.
		for delay, percent := range record.CommitDelayPercent {
			if commitDelayedPercentTotal[id] == nil {
				commitDelayedPercentTotal[id] = make(map[time.Duration]float32)
			}
			if commitDelayedRecordCount[id] == nil {
				commitDelayedRecordCount[id] = make(map[time.Duration]uint16)
			}
			commitDelayedPercentTotal[id][delay] += percent
			commitDelayedRecordCount[id][delay]++
		}
		for delay, percent := range record.PrepareDelayPercent {
			if prepareDelayedPercentTotal[id] == nil {
				prepareDelayedPercentTotal[id] = make(map[time.Duration]float32)
			}
			if prepareDelayedRecordCount[id] == nil {
				prepareDelayedRecordCount[id] = make(map[time.Duration]uint16)
			}
			prepareDelayedPercentTotal[id][delay] += percent
			prepareDelayedRecordCount[id][delay]++
		}
	}

	// Second pass: turn the running totals into averages. The record counts
	// are always >= 1 for every key present in operatorStats, so the
	// divisions below cannot divide by zero.
	for operatorID, record := range operatorStats {
		record.CommitDelayAvg = commitAvgTotal[operatorID] / time.Duration(commitAvgRecordCount[operatorID])
		record.CommitDelayHighest = commitDelayHighest[operatorID]

		for delayDuration, percentSum := range commitDelayedPercentTotal[operatorID] {
			// BUG FIX: allocate the destination map once; the old code
			// re-allocated it for every unseen key, keeping only the last entry.
			if record.CommitDelayPercent == nil {
				record.CommitDelayPercent = make(map[time.Duration]float32, len(commitDelayedPercentTotal[operatorID]))
			}
			record.CommitDelayPercent[delayDuration] = percentSum / float32(commitDelayedRecordCount[operatorID][delayDuration])
		}

		record.PrepareDelayAvg = prepareAvgTotal[operatorID] / time.Duration(prepareAvgRecordCount[operatorID])
		record.PrepareDelayHighest = prepareDelayHighest[operatorID]

		for delayDuration, percentSum := range prepareDelayedPercentTotal[operatorID] {
			if record.PrepareDelayPercent == nil {
				record.PrepareDelayPercent = make(map[time.Duration]float32, len(prepareDelayedPercentTotal[operatorID]))
			}
			record.PrepareDelayPercent[delayDuration] = percentSum / float32(prepareDelayedRecordCount[operatorID][delayDuration])
		}

		record.ConsensusTimeAvg = consensusAvgTotal[operatorID] / time.Duration(consensusAvgRecordCount[operatorID])
		record.ConsensusDuplicateBlockRootSubmissionsPercent =
			duplicateSubmissionsPercentTotal[operatorID] / float32(duplicateSubmissionsPercentRecordCount[operatorID])

		operatorStats[operatorID] = record
	}

	stats := slices.Collect(maps.Values(operatorStats))

	// BUG FIX: the old comparator returned true when BOTH elements were
	// log-file owners, violating sort.Slice's strict-weak-ordering contract
	// (undefined order). Owners sort strictly before non-owners.
	sort.Slice(stats, func(i, j int) bool {
		return stats[i].IsLogFileOwner && !stats[j].IsLogFileOwner
	})

	return stats
}

Functions

This section is empty.

Types

type AnalyzerResult

// AnalyzerResult is the aggregate outcome of an analysis run — presumably the
// value produced by Service.Start (see its signature); confirm against the
// implementation.
type AnalyzerResult struct {
	OperatorStats []OperatorStats // one entry per operator observed in the analyzed logs
}

type OperatorStats

// OperatorStats holds the metrics collected for a single operator across the
// analyzed log files. Field semantics below are inferred from field names and
// sibling code in this package — confirm against the parsers that fill them.
type OperatorStats struct {
	OperatorID     uint32
	IsLogFileOwner bool       // presumably true when this operator wrote the analyzed log file — TODO confirm
	Clusters       [][]uint32 // operator-ID groupings this operator appears in — TODO confirm

	// QBFT commit message delay metrics.
	CommitTotalDelay,
	CommitDelayAvg,
	CommitDelayHighest time.Duration
	CommitDelayPercent map[time.Duration]float32 // share of commit messages delayed beyond each threshold — TODO confirm
	CommitCount        uint16

	// Consensus-client (beacon node) response-time metrics, including
	// average and percentile buckets.
	ConsensusClientResponseTimeAvg,
	ConsensusClientResponseTimeP10,
	ConsensusClientResponseTimeP50,
	ConsensusClientResponseTimeP90 time.Duration
	ConsensusClientResponseTimeDelayPercent map[time.Duration]float32

	// SSV client health counters (crashes and unhealthy execution/consensus
	// layer observations).
	SSVClientCrashesTotal,
	SSVClientELUnhealthy,
	SSVClientCLUnhealthy uint16

	// QBFT prepare message delay metrics (mirrors the commit fields above).
	PrepareDelayAvg,
	PrepareDelayHighest time.Duration
	PrepareDelayPercent map[time.Duration]float32
	PrepareCount        uint16

	ConsensusTimeAvg                              time.Duration
	ConsensusDuplicateBlockRootSubmissionsPercent float32

	// Peer-level observations for this operator's node.
	PeersCountAvg         parser.Metric[float64]
	PeerSSVClientVersions []string
	PeerID                string
}

type Service

// Service wires together the individual analyzers (peers, consensus, operator,
// client, commit, prepare — see New's parameters); construct it with New and
// run it via Start. Internal fields are not visible in this view.
type Service struct {
	// contains filtered or unexported fields
}

func New

func New(
	peersAnalyzer peersAnalyzer,
	consensusAnalyzer consensusAnalyzer,
	operatorAnalyzer operatorAnalyzer,
	clientAnalyzer clientAnalyzer,
	commitAnalyzer commitAnalyzer,
	prepareAnalyzer prepareAnalyzer,
	operators []uint32,
	cluster bool) (*Service, error)

func (*Service) Start

func (s *Service) Start() (AnalyzerResult, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL