Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var Task = inspection_task.NewInspectionProcessor(k8saudittask.ManifestGenerateTaskID, []string{ inspection_task.ReaderFactoryGeneratorTaskID, k8saudittask.TimelineGroupingTaskID, gcp_task.GCPDefaultK8sResourceMergeConfigTask.ID().ReferenceId().String(), }, func(ctx context.Context, taskMode int, v *task.VariableSet, tp *progress.TaskProgress) (any, error) { if taskMode == inspection_task.TaskModeDryRun { return struct{}{}, nil } groups, err := task.GetTypedVariableFromTaskVariable[[]*types.TimelineGrouperResult](v, k8saudittask.TimelineGroupingTaskID, nil) if err != nil { return nil, err } mergeConfigRegistry, err := task.GetTypedVariableFromTaskVariable[*model_k8s.MergeConfigRegistry](v, gcp_task.GCPDefaultK8sResourceMergeConfigTask.ID().ReferenceId().String(), nil) if err != nil { return nil, err } readerFactory, err := inspection_task.GetReaderFactoryFromTaskVariable(v) if err != nil { return nil, err } totalLogCount := 0 for _, group := range groups { totalLogCount += len(group.PreParsedLogs) } processedCount := atomic.Int32{} updator := progress.NewProgressUpdator(tp, time.Second, func(tp *progress.TaskProgress) { current := processedCount.Load() tp.Percentage = float32(current) / float32(totalLogCount) tp.Message = fmt.Sprintf("%d/%d", current, totalLogCount) }) err = updator.Start(ctx) if err != nil { return nil, err } defer updator.Done() workerPool := worker.NewPool(16) for _, group := range groups { currentGroup := group workerPool.Run(func() { prevRevisionBody := "" var prevRevisionReader *structure.Reader for _, log := range currentGroup.PreParsedLogs { var currentRevisionBodyType rtype.Type if log.Code != 0 || log.GeneratedFromDeleteCollectionOperation { log.ResourceBodyYaml = prevRevisionBody log.ResourceBodyReader = prevRevisionReader processedCount.Add(1) continue } currentRevisionReader := log.Response currentRevisionBodyType = log.ResponseType if currentRevisionReader == nil || log.ResponseType != rtype.RTypeUnknown { currentRevisionReader = log.Request currentRevisionBodyType = log.RequestType } if currentRevisionReader == nil { log.ResourceBodyYaml = bodyPlaceholderForMetadataLevelAuditLog processedCount.Add(1) continue } isPartial := currentRevisionBodyType == rtype.RTypePatch currentRevisionBody, err := currentRevisionReader.ToYaml("") if err != nil { slog.WarnContext(ctx, fmt.Sprintf("failed to serialize resource body to yaml\n%s", err.Error())) processedCount.Add(1) continue } currentRevisionBody = removeAtType(currentRevisionBody) if prevRevisionBody != "" && isPartial { mergeConfigResolver := mergeConfigRegistry.Get(log.Operation.APIVersion, log.Operation.GetSingularKindName()) mergedReader, err := readerFactory.NewReader(adapter.MergeYaml(prevRevisionBody, currentRevisionBody, mergeConfigResolver)) if err != nil { slog.WarnContext(ctx, fmt.Sprintf("failed to merge resource body\n%s", err.Error())) processedCount.Add(1) continue } mergedYaml, err := mergedReader.ToYaml("") if err != nil { slog.WarnContext(ctx, fmt.Sprintf("failed to read the merged resource body\n%s", err.Error())) processedCount.Add(1) continue } log.ResourceBodyYaml = mergedYaml log.ResourceBodyReader = mergedReader } else { if currentRevisionBodyType == rtype.RTypeDeleteOptions { log.ResourceBodyYaml = prevRevisionBody log.ResourceBodyReader = prevRevisionReader processedCount.Add(1) continue } log.ResourceBodyYaml = currentRevisionBody log.ResourceBodyReader = currentRevisionReader } prevRevisionBody = log.ResourceBodyYaml prevRevisionReader = log.ResourceBodyReader processedCount.Add(1) } }) } workerPool.Wait() return groups, nil })
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.