topology

package
v1.1.0-beta.116 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultDepth is a default depth value of 3.
// NOTE(review): presumably the topology depth applied when a caller does not
// specify one — confirm against its call sites, which are not visible here.
const DefaultDepth = 3

Variables

View Source
// CleanupCanaries is the job (schedule "@every 12h", Singleton, RunNow) that
// permanently deletes canaries that were marked as deleted long ago.
//
// A canary row is removed when no check references it and its deleted_at is
// older than the "canary.retention.age" property (DefaultRetention when the
// property is unset). Rows with a NULL deleted_at never satisfy the age
// comparison and are left untouched. The number of deleted rows is stored as
// the job history's success count.
var CleanupCanaries = &job.Job{
	Name:       "CleanupCanaries",
	Schedule:   "@every 12h",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionBalanced,
	RunNow:     true,
	Fn: func(ctx job.JobRuntime) error {
		retention := ctx.Properties().Duration("canary.retention.age", DefaultRetention)
		// NULLs are filtered out of the subquery: in SQL `x NOT IN (..., NULL)`
		// never evaluates to true, so a single NULL canary_id would silently
		// turn this cleanup into a no-op.
		tx := ctx.DB().Exec(`
		DELETE FROM canaries
		WHERE
				id NOT IN (SELECT canary_id FROM checks WHERE canary_id IS NOT NULL) AND
				(NOW() - deleted_at) > INTERVAL '1 second' * ?
		`, int64(retention.Seconds()))

		// RowsAffected is recorded before the error is returned, so a failed
		// statement reports whatever count the driver produced.
		ctx.History.SuccessCount = int(tx.RowsAffected)
		return tx.Error
	},
}
View Source
// CleanupChecks is the job (schedule "@every 12h", Singleton) that permanently
// deletes checks that were marked as deleted long ago.
//
// A check row is removed when no evidence references it and its deleted_at is
// older than the "check.retention.age" property (DefaultRetention when the
// property is unset). The number of deleted rows is stored as the job
// history's success count.
var CleanupChecks = &job.Job{
	Name:       "CleanupChecks",
	Schedule:   "@every 12h",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionBalanced,
	Fn: func(ctx job.JobRuntime) error {
		// The single placeholder is the retention period in whole seconds.
		const deleteExpiredChecks = `
					DELETE FROM checks
					WHERE
							id NOT IN (SELECT check_id FROM evidences WHERE check_id IS NOT NULL) AND
							(NOW() - deleted_at) > INTERVAL '1 second' * ?
					`

		maxAge := ctx.Properties().Duration("check.retention.age", DefaultRetention)
		result := ctx.DB().Exec(deleteExpiredChecks, int64(maxAge.Seconds()))

		// The affected-row count is recorded unconditionally, then any
		// execution error is handed back to the job runner.
		ctx.History.SuccessCount = int(result.RowsAffected)
		return result.Error
	},
}
View Source
// CleanupMetricsGauges is the job (schedule "@every 1h", Singleton, RunNow)
// that drops gauge series belonging to recently deleted checks.
//
// It collects the ids of checks whose deleted_at lies within the last seven
// days and, for each id, deletes every series of metrics.Gauge whose "key"
// label equals that id. Each check for which at least one series was removed
// increments the job history's success counter.
var CleanupMetricsGauges = &job.Job{
	Name:       "CleanupMetricsGauges",
	Schedule:   "@every 1h",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionBalanced,
	RunNow:     true,
	Fn: func(ctx job.JobRuntime) error {

		sevenDaysAgo := time.Now().Add(-time.Hour * 24 * 7)
		var deletedCheckIDs []string
		if err := ctx.DB().Model(&models.Check{}).Where("deleted_at > ?", sevenDaysAgo).Pluck("id", &deletedCheckIDs).Error; err != nil {
			// %w keeps the underlying database error reachable through
			// errors.Is / errors.As for callers of the job.
			return fmt.Errorf("error finding deleted checks: %w", err)
		}

		if ctx.IsDebug() {
			ctx.Debugf("Found %d deleted checks since %s", len(deletedCheckIDs), sevenDaysAgo.Format("2006-01-02 15:04:05"))
		}
		for _, id := range deletedCheckIDs {
			// DeletePartialMatch reports how many series it removed; only
			// checks that actually had a gauge are counted as a success.
			if metrics.Gauge.DeletePartialMatch(prometheus.Labels{"key": id}) > 0 {
				logger.Debugf("Deleted gauge for check: %s", id)
				ctx.History.IncrSuccess()
			}
		}
		return nil
	},
}

CleanupMetricsGauges removes the gauges of checks that were deleted within the last seven days.

View Source
// CleanupSoftDeletedComponents is the job (schedule "@every 24h", Singleton)
// that permanently removes components, and their relationships, whose
// deleted_at is older than the "component.retention.period" property
// (DefaultRetention when the property is unset).
//
// Components referenced by an evidence or a playbook run are never deleted.
// The job runs three steps in order:
//
//  1. delete expired component_relationships rows, plus those pointing at (or
//     from) an expired, unreferenced component;
//  2. clear parent_id on expired components that are kept because they are
//     referenced, so they no longer point at a parent that is about to be
//     deleted;
//  3. delete expired, unreferenced components in batches of 1000, longest
//     path first, until a batch deletes nothing.
//
// The job history's success count is the total number of components deleted.
var CleanupSoftDeletedComponents = &job.Job{
	Name:       "CleanupSoftDeletedComponents",
	Schedule:   "@every 24h",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionBalanced,
	Fn: func(ctx job.JobRuntime) error {
		ctx.History.ResourceType = job.ResourceTypeComponent
		retention := ctx.Properties().Duration("component.retention.period", DefaultRetention)

		seconds := int64(retention.Seconds())

		// Components that must be retained because other records reference them.
		linkedComponents := `
		SELECT component_id FROM evidences WHERE component_id IS NOT NULL
		UNION
		SELECT component_id FROM playbook_runs WHERE component_id IS NOT NULL
		`

		tx := ctx.Context.DB().Exec(
			fmt.Sprintf(`
				DELETE FROM component_relationships
					WHERE deleted_at < NOW() - interval '1 SECONDS' * ? OR
					component_id in (SELECT id FROM components WHERE id NOT IN (%s) AND deleted_at < NOW() - interval '1 SECONDS' * ?) OR
					relationship_id in (SELECT id FROM components WHERE id NOT IN (%s) AND deleted_at < NOW() - interval '1 SECONDS' * ?)
			`, linkedComponents, linkedComponents),
			seconds, seconds, seconds)
		if tx.Error != nil {
			return tx.Error
		}

		// Detach every retained component from its parent. This uses the same
		// retention period and the same "linked" set as the deletes, so the
		// rows detached here are exactly the expired rows that the final
		// delete will skip. A fixed 7-day window, or looking at evidences
		// only, leaves retained children pointing at parents that are about
		// to be purged.
		tx = ctx.Context.DB().Exec(
			fmt.Sprintf(`
				UPDATE components SET parent_id = null
					WHERE id IN (%s) AND parent_id is not null AND deleted_at < NOW() - interval '1 SECONDS' * ?
			`, linkedComponents),
			seconds)
		if tx.Error != nil {
			return tx.Error
		}

		// Delete in bounded batches; ordering by path length removes the
		// rows with the longest path first.
		for {
			tx = ctx.Context.DB().Exec(fmt.Sprintf(`
				DELETE FROM components WHERE id in (
					SELECT id FROM components WHERE id NOT IN (%s)
					AND deleted_at < NOW() - interval '1 SECONDS' * ?
					ORDER BY length(path) DESC LIMIT 1000
				)`, linkedComponents), seconds)
			if tx.Error != nil {
				return tx.Error
			}
			ctx.History.SuccessCount += int(tx.RowsAffected)

			if tx.RowsAffected == 0 {
				break
			}
		}

		return nil
	},
}
View Source
// ComponentCheckRun is the job (schedule "@every 2m", Singleton) that keeps
// check/component relationships in sync.
//
// It loads every component matching duty.LocalFilter whose component_checks
// column is not the JSON value 'null', resolves the checks for each one with
// GetChecksForComponent and persists the result through
// syncCheckComponentRelationships. A failure for one component is recorded on
// the job history and the run moves on to the next component; only a failure
// to load the component list aborts the run.
var ComponentCheckRun = &job.Job{
	Name:       "ComponentCheckRun",
	Schedule:   "@every 2m",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionFew,
	Fn: func(run job.JobRuntime) error {
		var components []pkg.Component
		if err := run.DB().Table("components").
			Where("component_checks != 'null'").
			Where(duty.LocalFilter).
			Find(&components).Error; err != nil {
			return fmt.Errorf("error getting components: %w", err)
		}

		for _, component := range components {
			relationships, err := GetChecksForComponent(run.Context, &component)
			if err != nil {
				// Record and continue, as the persistence step below does:
				// one component's lookup failure must not stop every other
				// component from being synced. The existing relationships of
				// the failing component are left untouched.
				run.History.AddError(fmt.Sprintf("error getting checks for component %s: %v", component.ID, err))
				continue
			}
			if err := syncCheckComponentRelationships(run.Context, component, relationships); err != nil {
				run.History.AddError(fmt.Sprintf("error persisting relationships: %v", err))
				continue
			}
			run.History.IncrSuccess()
		}
		return nil
	},
}
View Source
// ComponentConfigRun is the job (schedule "@every 2m", Singleton) that keeps
// config/component relationships in sync.
//
// It loads the id and configs of every component matching duty.LocalFilter
// whose configs column is not the JSON value 'null', loads the live
// (deleted_at IS NULL) config ids already related to each component, and
// calls SyncComponentConfigRelationship per component. Per-component failures
// are recorded on the job history without stopping the run. Finally, live
// relationships of components whose configs column is 'null' are marked as
// deleted.
var ComponentConfigRun = &job.Job{
	Name:       "ComponentConfigRun",
	Schedule:   "@every 2m",
	Singleton:  true,
	JobHistory: true,
	Retention:  job.RetentionFew,
	Fn: func(run job.JobRuntime) error {
		// A fresh session so conditions from one query do not leak into the next.
		db := run.DB().Session(&gorm.Session{NewDB: true})

		var components []pkg.Component
		if err := db.Where(duty.LocalFilter).
			Select("id", "configs").
			Where("configs != 'null'").
			Find(&components).Error; err != nil {
			return fmt.Errorf("error getting components: %w", err)
		}

		var existingRelationships []ComponentConfigsRelationship
		if err := db.Model(&models.ConfigComponentRelationship{}).
			Select("component_id, ARRAY_AGG(config_id) AS config_ids").
			Where("deleted_at IS NULL").
			Group("component_id").Find(&existingRelationships).Error; err != nil {
			return fmt.Errorf("error fetching existing config_component_relationships: %w", err)
		}

		// Index the existing config ids by component id for O(1) lookup below.
		existingConfigIDsByComponentID := make(map[string][]string, len(existingRelationships))
		for _, er := range existingRelationships {
			existingConfigIDsByComponentID[er.ComponentID] = er.ConfigIDs
		}

		for _, component := range components {
			if err := SyncComponentConfigRelationship(run.Context, component, existingConfigIDsByComponentID[component.ID.String()]); err != nil {
				run.History.AddError(fmt.Sprintf("error persisting config relationships: %v", err))
				continue
			}
			run.History.IncrSuccess()
		}

		// Only rows that are still live are stamped. Without the
		// deleted_at IS NULL guard every run would overwrite deleted_at on
		// rows that are already deleted, so their deletion time would keep
		// moving forward and never age.
		cleanupQuery := `
            UPDATE config_component_relationships
            SET deleted_at = NOW()
            WHERE deleted_at IS NULL AND component_id IN (
                SELECT id FROM components WHERE configs = 'null'
            )
        `
		if err := db.Exec(cleanupQuery).Error; err != nil {
			return fmt.Errorf("error cleaning up old config_component_relationships: %w", err)
		}

		return nil
	},
}
View Source
// ComponentCostRun is the job (named "ComponentCostSync", schedule
// "@every 1h", Singleton) that rolls config item costs up onto components
// with a single SQL statement:
//
//   - component_children: for each component, the ids returned by
//     lookup_component_children(id, -1) together with the relationship_id of
//     every component_relationships row whose component_id is one of those ids;
//   - component_configs: the config ids related (through
//     config_component_relationships) to any of those child ids;
//   - component_config_costs: the summed cost_per_minute, cost_total_1d,
//     cost_total_7d and cost_total_30d of those config items.
//
// The sums are then written to the matching components rows. Components that
// end up with no related config are not part of the UPDATE.
var ComponentCostRun = &job.Job{
	Name:       "ComponentCostSync",
	JobHistory: true,
	Singleton:  true,
	Retention:  job.RetentionBalanced,
	Schedule:   "@every 1h",
	Fn: func(ctx job.JobRuntime) error {
		const rollupCosts = `
				WITH
				component_children AS (
						SELECT components.id, ARRAY(
								SELECT child_id FROM lookup_component_children(components.id::text, -1)
								UNION
								SELECT relationship_id as child_id FROM component_relationships WHERE component_id IN (
										SELECT child_id FROM lookup_component_children(components.id::text, -1)
								)
						) AS child_ids
						FROM components
						GROUP BY components.id
				),
				component_configs AS (
						SELECT component_children.id, ARRAY_AGG(ccr.config_id) as config_ids
						FROM component_children
						INNER JOIN config_component_relationships ccr ON ccr.component_id = ANY(component_children.child_ids)
						GROUP BY component_children.id
				),
				component_config_costs AS (
						SELECT
								component_configs.id,
								SUM(cost_per_minute) AS cost_per_minute,
								SUM(cost_total_1d) AS cost_total_1d,
								SUM(cost_total_7d) AS cost_total_7d,
								SUM(cost_total_30d) AS cost_total_30d
						FROM config_items
						INNER JOIN component_configs ON config_items.id = ANY(component_configs.config_ids)
						GROUP BY component_configs.id
				)

				UPDATE components
				SET
						cost_per_minute = component_config_costs.cost_per_minute,
						cost_total_1d = component_config_costs.cost_total_1d,
						cost_total_7d = component_config_costs.cost_total_7d,
						cost_total_30d = component_config_costs.cost_total_30d
				FROM component_config_costs
				WHERE components.id = component_config_costs.id
				`

		// The statement takes no parameters; its error, if any, is the job's result.
		result := ctx.DB().Exec(rollupCosts)
		return result.Error
	},
}
View Source
// ComponentRelationshipSync is the job (schedule "@every 5m", Singleton) that
// rebuilds selector-based component relationships.
//
// For every component matching duty.LocalFilter whose selectors column is not
// the JSON value 'null' it templates the selectors, finds the components they
// match, and syncs one relationship per distinct match through
// syncComponentRelationships. Per-component failures are recorded on the job
// history without stopping the run. Finally, live relationships owned by
// components whose selectors column is 'null' are marked as deleted.
var ComponentRelationshipSync = &job.Job{
	Name:       "ComponentRelationshipSync",
	Schedule:   "@every 5m",
	JobHistory: true,
	Retention:  job.RetentionFew,
	Singleton:  true,
	Fn: func(ctx job.JobRuntime) error {
		var components []models.Component
		if err := ctx.DB().Where(duty.LocalFilter).
			Where("selectors != 'null'").
			Find(&components).Error; err != nil {
			return fmt.Errorf("error getting components: %w", err)
		}

		for _, component := range components {
			// The hash is taken before templateSelector runs, i.e. over the
			// selectors as stored, not over their templated form.
			hash := component.Selectors.Hash()
			if err := templateSelector(ctx.Context, &component); err != nil {
				ctx.History.AddError(err)
				continue
			}

			comps, err := query.FindComponents(ctx.Context, -1, component.Selectors...)
			if err != nil {
				ctx.History.AddError(fmt.Sprintf("error getting components with selectors: %v. err: %v", component.Selectors, err))
				continue
			}
			compIDs := collections.Dedup(lo.Map(comps, func(comp models.Component, _ int) uuid.UUID { return comp.ID }))
			relationships := make([]models.ComponentRelationship, 0, len(compIDs))
			for _, cID := range compIDs {
				relationships = append(relationships, models.ComponentRelationship{
					RelationshipID:   component.ID,
					ComponentID:      cID,
					SelectorID:       hash,
					RelationshipPath: component.Path + "." + component.ID.String(),
				})
			}

			err = syncComponentRelationships(ctx.Context, component.ID, relationships)
			if err != nil {
				ctx.History.AddError(fmt.Sprintf("error syncing relationships: %v", err))
				continue
			}
			ctx.History.IncrSuccess()
		}

		// Only rows that are still live are stamped. Without the
		// deleted_at IS NULL guard every run would overwrite deleted_at on
		// rows that are already deleted, so their deletion time would keep
		// moving forward and any age-based purge of those rows would never
		// trigger.
		cleanupQuery := `
            UPDATE component_relationships
            SET deleted_at = NOW()
            WHERE deleted_at IS NULL AND relationship_id IN (
                SELECT id FROM components WHERE selectors = 'null'
            )
        `
		if err := ctx.DB().Exec(cleanupQuery).Error; err != nil {
			return fmt.Errorf("error cleaning up dead component_relationships: %w", err)
		}

		return nil
	},
}
View Source
// ComponentStatusSummarySync is the job (schedule "@every 2m", Singleton)
// that writes the status and summary computed by the topology query back to
// the components table.
//
// It runs Query with a depth of DefaultDepth and, for each component in the
// returned topology's Components, updates the status and summary columns of
// the row with that id when either stored value differs. Update failures are
// recorded on the job history; every update that returns no error counts as
// a success.
var ComponentStatusSummarySync = &job.Job{
	Name:       "ComponentStatusSummarySync",
	Schedule:   "@every 2m",
	JobHistory: true,
	Retention:  job.RetentionFew,
	Singleton:  true,
	Fn: func(ctx job.JobRuntime) error {
		// Use the package constant rather than a second literal 3, so the
		// depth cannot drift from DefaultDepth.
		topology, err := Query(ctx.Context, duty.TopologyOptions{Depth: DefaultDepth})
		if err != nil {
			return fmt.Errorf("error getting components: %w", err)
		}

		for _, c := range topology.Components {
			// The WHERE clause skips rows that already hold the same values.
			tx := ctx.DB().Where("id = ? and (status != ? or summary != ?)", c.ID, c.Status, c.Summary).
				UpdateColumns(models.Component{Status: c.Status, Summary: c.Summary})
			if tx.Error != nil {
				ctx.History.AddError(tx.Error.Error())
				continue
			}
			ctx.History.IncrSuccess()
		}

		return nil
	},
}
View Source
// DefaultRetention is the seven-day retention period that the cleanup jobs
// fall back to when their retention property is not set.
var DefaultRetention = 7 * 24 * time.Hour

Functions

func GetChecksForComponent added in v1.0.152

func GetChecksForComponent(ctx context.Context, component *pkg.Component) ([]models.CheckComponentRelationship, error)

func NewTopologyParams

func NewTopologyParams(values url.Values) dutyQuery.TopologyOptions

func PersistConfigComponentRelationship added in v1.0.152

func PersistConfigComponentRelationship(db *gorm.DB, configID, componentID uuid.UUID, selectorID string) error

func PersistConfigComponentRelationships added in v1.0.193

func PersistConfigComponentRelationships(ctx context.Context, rels []models.ConfigComponentRelationship) error

func Run

func SyncComponentConfigRelationship added in v1.0.152

func SyncComponentConfigRelationship(ctx context.Context, component pkg.Component, existingConfigIDs []string) error

Types

type ComponentConfigsRelationship

// ComponentConfigsRelationship pairs a component id with the ids of the
// configs related to it. It is the scan target for queries that select
// component_id together with an aggregated array of config ids.
type ComponentConfigsRelationship struct {
	// ComponentID is the component's id in string form.
	ComponentID string
	// ConfigIDs holds the related config ids. PostgreSQL spells an array of
	// text as "text[]"; "[]text" is not a valid column type.
	ConfigIDs pq.StringArray `gorm:"type:text[]"`
}

ComponentConfigsRelationship pairs a component ID with the IDs of the configs related to it.

type ComponentContext added in v0.38.135

// ComponentContext bundles a topology, component state and a job history
// record with an embedded dutyContext.Context.
// NOTE(review): presumably the per-topology context handed to the templating
// and lookup helpers defined on this type — confirm against their bodies,
// which are not visible here.
type ComponentContext struct {
	// Topology is the v1.Topology associated with this context.
	Topology     v1.Topology
	// ComponentAPI is a v1.Component value carried alongside the topology.
	// NOTE(review): exact role not visible here — confirm.
	ComponentAPI v1.Component
	// Components keep track of the components that properties can apply to,
	// properties can return a map of component names to properties to facilitate
	// queries that are more efficient to perform for all components rather than a component at a time
	Components *pkg.Components
	// Properties can either be looked up on an individual component, or act as a summary across all components
	CurrentComponent *pkg.Component

	// JobHistory is the job history record associated with this context.
	JobHistory *models.JobHistory
	// Context is embedded, so its methods are available directly on ComponentContext.
	dutyContext.Context
	// contains filtered or unexported fields
}

func NewComponentContext added in v0.38.135

func NewComponentContext(ctx dutyContext.Context, system v1.Topology) *ComponentContext

func (*ComponentContext) Clone added in v0.38.135

func (c *ComponentContext) Clone() *ComponentContext

func (*ComponentContext) GetTemplater added in v0.38.193

func (c *ComponentContext) GetTemplater() gomplate.StructTemplater

func (*ComponentContext) SetCurrentComponent added in v0.38.193

func (c *ComponentContext) SetCurrentComponent(component *pkg.Component)

func (*ComponentContext) String added in v1.0.152

func (c *ComponentContext) String() string

func (*ComponentContext) TemplateComponent added in v0.38.193

func (c *ComponentContext) TemplateComponent(component *v1.ComponentSpec) error

func (*ComponentContext) TemplateConfig added in v0.38.193

func (c *ComponentContext) TemplateConfig(config *types.ConfigQuery) error

func (*ComponentContext) TemplateProperty added in v0.38.193

func (c *ComponentContext) TemplateProperty(property *v1.Property) error

func (*ComponentContext) TemplateStruct added in v0.38.193

func (c *ComponentContext) TemplateStruct(data interface{}) error

func (*ComponentContext) WithComponents added in v0.38.135

func (c *ComponentContext) WithComponents(components *pkg.Components, current *pkg.Component) *ComponentContext

type TopologyJob added in v1.0.197

// TopologyJob groups a topology, a namespace and a set of components.
// NOTE(review): presumably Output receives the components produced by the
// job's Run method — confirm against Run, whose body is not visible here.
type TopologyJob struct {
	// Topology is the v1.Topology this job operates on.
	Topology  v1.Topology
	// Namespace is a namespace name. NOTE(review): likely the namespace the
	// topology is evaluated in — confirm.
	Namespace string
	// Output holds a pkg.Components value. NOTE(review): likely the result of
	// the run — confirm.
	Output    pkg.Components
}

func (*TopologyJob) Run added in v1.0.197

func (tj *TopologyJob) Run(job job.JobRuntime) error

type TopologyRunOptions

// TopologyRunOptions carries a job runtime together with a depth and a
// namespace.
// NOTE(review): presumably the options consumed by the package-level Run
// function — confirm; its signature is not visible here.
type TopologyRunOptions struct {
	// JobRuntime is embedded, so its methods and fields are available directly.
	job.JobRuntime
	// Depth is an integer depth. NOTE(review): likely the topology depth to
	// resolve — confirm.
	Depth     int
	// Namespace is a namespace name. NOTE(review): usage not visible here — confirm.
	Namespace string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL