Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var CleanupDeletedTopologyComponents = &job.Job{ Name: "CleanupComponents", Schedule: "@every 1h", Singleton: true, JobHistory: true, Retention: job.RetentionBalanced, Fn: func(ctx job.JobRuntime) error { var rows []struct { ID string } if err := ctx.DB().Raw(` SELECT DISTINCT(topologies.id) FROM topologies INNER JOIN components ON topologies.id = components.topology_id WHERE components.deleted_at IS NULL AND topologies.deleted_at IS NOT NULL `).Scan(&rows).Error; err != nil { return err } for _, r := range rows { if err := db.DeleteComponentsOfTopology(ctx.DB(), r.ID); err != nil { ctx.History.AddError(fmt.Sprintf("Error deleting components for topology[%s]: %v", r.ID, err)) } else { ctx.History.IncrSuccess() } DeleteTopologyJob(r.ID) } return nil }, }
View Source
var SyncTopology = &job.Job{ Name: "SyncTopology", Schedule: "@every 5m", JobHistory: true, Singleton: true, RunNow: true, Fn: func(ctx job.JobRuntime) error { var topologies []pkg.Topology if err := ctx.DB().Table("topologies").Where(duty.LocalFilter).Where("source != ?", models.SourcePush). Find(&topologies).Error; err != nil { return err } for _, topology := range topologies { if err := SyncTopologyJob(ctx.Context, topology); err != nil { ctx.History.AddError(err.Error()) } else { ctx.History.IncrSuccess() } } return nil }, }
View Source
var TopologyCRDReconcile = &job.Job{ Name: "TopologyCRDReconcile", Schedule: "@every 8h", Singleton: true, JobHistory: true, Retention: job.RetentionBalanced, Fn: func(ctx job.JobRuntime) error { var topologies []models.Topology if err := ctx.DB(). Select("id", "name", "namespace"). Where("source IN (?, ?)", models.SourceCRD, models.SourceTopology). Where(duty.LocalFilter). Find(&topologies).Error; err != nil { return err } client, err := ctx.KubernetesDynamicClient().GetClientByKind("Topology") if err != nil { return fmt.Errorf("error creating dynamic client for Topology: %w", err) } objs, err := client.List(ctx, metav1.ListOptions{}) if err != nil { return fmt.Errorf("error listing Topology kind: %w", err) } k8sIDs := lo.Map(objs.Items, func(obj unstructured.Unstructured, _ int) string { return string(obj.GetUID()) }) if len(k8sIDs) == 0 { ctx.Warnf("Skipping topology CRD cleanup due to zero topologies returned") return nil } for _, t := range topologies { if !slices.Contains(k8sIDs, t.ID.String()) { if err := db.DeleteTopology(ctx.DB(), t.ID.String()); err != nil { ctx.History.AddErrorf("error deleting topology[%s]: %v", t.ID, err) continue } DeleteTopologyJob(t.ID.String()) } ctx.History.IncrSuccess() } return nil }, }
View Source
// TopologyScheduler is the package-level scheduler instance created with cron.New().
// NOTE(review): presumably the per-topology sync jobs are registered on it;
// confirm against the callers, which are not visible here.
var TopologyScheduler = cron.New()
Functions ¶
func DeleteTopologyJob ¶
func DeleteTopologyJob(id string)
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.