topology

package
v1.1.0-beta.98 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var CleanupDeletedTopologyComponents = &job.Job{
	Name:       "CleanupComponents",
	Schedule:   "@every 1h",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionBalanced,
	Fn: func(ctx job.JobRuntime) error {
		var rows []struct {
			ID string
		}

		if err := ctx.DB().Raw(`
        SELECT DISTINCT(topologies.id) FROM topologies
        INNER JOIN components ON topologies.id = components.topology_id
        WHERE
            components.deleted_at IS NULL AND
            topologies.deleted_at IS NOT NULL
        `).Scan(&rows).Error; err != nil {
			return err
		}

		for _, r := range rows {
			if err := db.DeleteComponentsOfTopology(ctx.DB(), r.ID); err != nil {
				ctx.History.AddError(fmt.Sprintf("Error deleting components for topology[%s]: %v", r.ID, err))
			} else {
				ctx.History.IncrSuccess()
			}
			DeleteTopologyJob(r.ID)
		}
		return nil
	},
}
View Source
var SyncTopology = &job.Job{
	Name:       "SyncTopology",
	Schedule:   "@every 5m",
	JobHistory: true,
	Singleton:  true,
	RunNow:     true,
	Fn: func(ctx job.JobRuntime) error {
		var topologies []pkg.Topology
		if err := ctx.DB().Table("topologies").Where(duty.LocalFilter).Where("source != ?", models.SourcePush).
			Find(&topologies).Error; err != nil {
			return err
		}

		for _, topology := range topologies {
			if err := SyncTopologyJob(ctx.Context, topology); err != nil {
				ctx.History.AddError(err.Error())
			} else {
				ctx.History.IncrSuccess()
			}
		}
		return nil
	},
}
View Source
var TopologyCRDReconcile = &job.Job{
	Name:       "TopologyCRDReconcile",
	Schedule:   "@every 8h",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionBalanced,
	Fn: func(ctx job.JobRuntime) error {
		var topologies []models.Topology

		if err := ctx.DB().
			Select("id", "name", "namespace").
			Where("source IN (?, ?)", models.SourceCRD, models.SourceTopology).
			Where(duty.LocalFilter).
			Find(&topologies).Error; err != nil {
			return err
		}

		client, err := ctx.KubernetesClient().GetClientByKind("Topology")
		if err != nil {
			return fmt.Errorf("error creating dynamic client for Topology: %w", err)
		}

		objs, err := client.List(ctx, metav1.ListOptions{})
		if err != nil {
			return fmt.Errorf("error listing Topology kind: %w", err)
		}
		k8sIDs := lo.Map(objs.Items, func(obj unstructured.Unstructured, _ int) string { return string(obj.GetUID()) })
		if len(k8sIDs) == 0 {

			ctx.Warnf("Skipping topology CRD cleanup due to zero topologies returned")
			return nil
		}
		for _, t := range topologies {
			if !slices.Contains(k8sIDs, t.ID.String()) {

				if err := db.DeleteTopology(ctx.DB(), t.ID.String()); err != nil {
					ctx.History.AddErrorf("error deleting topology[%s]: %v", t.ID, err)
					continue
				}
				DeleteTopologyJob(t.ID.String())
			}

			ctx.History.IncrSuccess()
		}
		return nil
	},
}
View Source
var TopologyScheduler = cron.New()

Functions

func DeleteTopologyJob

func DeleteTopologyJob(id string)

func SyncTopologyJob

func SyncTopologyJob(ctx context.Context, t pkg.Topology) error

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL