canary

package
v1.1.0-beta.130 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2025 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultCanarySchedule is the cron expression "@every 5m".
// NOTE(review): presumably the schedule applied to canaries that do not
// define their own — confirm against the scheduling code.
const DefaultCanarySchedule = "@every 5m"
View Source
// ResourceTypeUpstream is the job-history resource type that the
// PullUpstreamCanaries job records for its runs.
const ResourceTypeUpstream = "upstream"

Variables

View Source
// Package-level scheduling state and runtime flags.
//
// CanaryScheduler and FuncScheduler are two separate cron instances, each
// created with cron.New() when the package is initialised.
// MinimumTimeBetweenCanaryRuns defaults to 10 seconds.
//
// NOTE(review): CanaryConfigFiles, DataFile, Executor, LogPass and LogFail
// are zero-valued here; presumably they are populated from command-line
// flags elsewhere — confirm.
var (
	CanaryScheduler              = cron.New()
	CanaryConfigFiles            []string
	DataFile                     string
	Executor                     bool
	LogPass, LogFail             bool
	MinimumTimeBetweenCanaryRuns = 10 * time.Second
	FuncScheduler                = cron.New()
)
View Source
// Settings consumed by the upstream pull and reconcile jobs.
var (
	// ReconcilePageSize is the page size the ReconcileCanaries job passes
	// to upstream.ReconcileSome.
	ReconcilePageSize int

	// Only sync data created/updated in the last ReconcileMaxAge duration
	ReconcileMaxAge time.Duration

	// UpstreamConf is the global configuration for upstream
	UpstreamConf upstream.UpstreamConfig
)
View Source
// CanaryLastRuntimes is a sync.Map shared at package level.
// NOTE(review): presumably maps a canary identifier to the time of its last
// run — confirm the key and value types at the call sites.
var CanaryLastRuntimes = sync.Map{}
View Source
// CanaryStatusChannel carries CanaryStatusPayload values. It is declared
// without an initialiser, so it is nil until something assigns it.
var CanaryStatusChannel chan CanaryStatusPayload
View Source
// CleanupCRDDeleteCanaries is a singleton job, scheduled "@every 24h", that
// removes canaries from the database once their Kubernetes Canary resource
// no longer exists.
//
// It selects every canary row that is not soft-deleted, has agent_id equal
// to the nil UUID (presumably "owned by the local agent" — confirm) and whose
// source starts with "kubernetes/". Each one is then looked up through the
// Kubernetes API:
//
//   - lookup succeeds: the canary is left untouched;
//   - lookup fails with reason NotFound: the canary is deleted via
//     db.DeleteCanary (the outcome is recorded on the job history) and then
//     unscheduled;
//   - lookup fails for any other reason: the run stops and the error is
//     returned, leaving the remaining canaries unchecked.
var CleanupCRDDeleteCanaries = &dutyjob.Job{
	Name:       "CleanupCRDDeletedCanaries",
	Schedule:   "@every 24h",
	RunNow:     true,
	Singleton:  true,
	JobHistory: true,
	Retention:  dutyjob.RetentionBalanced,
	Fn: func(ctx dutyjob.JobRuntime) error {
		var crdCanaries []models.Canary
		if err := ctx.DB().Select("id", "name", "namespace").
			Where("deleted_at IS NULL").
			Where("agent_id = ?", uuid.Nil.String()).
			Where("source LIKE 'kubernetes/%'").Find(&crdCanaries).Error; err != nil {
			return fmt.Errorf("failed to list all canaries with source=CRD: %w", err)
		}

		// Nothing to verify, so skip building a Kubernetes client.
		if len(crdCanaries) == 0 {
			return nil
		}

		canaryClient, err := ctx.KubernetesClient().
			GetClientByGroupVersionKind(v1.GroupVersion.Group, v1.GroupVersion.Version, "Canary")
		if err != nil {
			return fmt.Errorf("failed to get kubernetes client for canaries: %w", err)
		}

		for _, canary := range crdCanaries {
			_, err := canaryClient.Namespace(canary.Namespace).Get(ctx, canary.Name, metav1.GetOptions{})
			if err == nil {
				// The resource still exists in the cluster.
				continue
			}

			// Only a NotFound answer proves the resource is gone. Any other
			// failure is reported as a lookup failure: no delete has been
			// attempted at this point.
			var statusErr *apierrors.StatusError
			if !errors.As(err, &statusErr) || statusErr.ErrStatus.Reason != metav1.StatusReasonNotFound {
				return fmt.Errorf("failed to get canary %s/%s from kubernetes: %w", canary.Namespace, canary.Name, err)
			}

			if err := db.DeleteCanary(ctx.Context, canary.ID.String()); err != nil {
				ctx.History.AddErrorf("error deleting canary[%s]: %v", canary.ID, err)
			} else {
				ctx.History.IncrSuccess()
			}

			// Unscheduling happens whether or not the database delete succeeded.
			Unschedule(canary.ID.String())
		}

		return nil
	},
}
View Source
// CleanupDeletedCanaryChecks is a singleton job, scheduled "@every 1h", that
// finds soft-deleted canaries which still own at least one check that is not
// soft-deleted. For each such canary it calls db.DeleteCanary again, records
// the outcome on the job history, and unschedules the canary.
var CleanupDeletedCanaryChecks = &dutyjob.Job{
	Name:       "CleanupDeletedCanaryChecks",
	Schedule:   "@every 1h",
	Singleton:  true,
	JobHistory: true,
	Retention:  dutyjob.RetentionBalanced,
	Fn: func(ctx dutyjob.JobRuntime) error {
		// Deleted canaries that are still joined to a live check.
		const staleCanariesQuery = `
        SELECT DISTINCT(canaries.id) FROM canaries
        INNER JOIN checks ON canaries.id = checks.canary_id
        WHERE
            checks.deleted_at IS NULL AND
            canaries.deleted_at IS NOT NULL
        `

		var staleCanaries []struct {
			ID string
		}

		result := ctx.DB().Raw(staleCanariesQuery).Scan(&staleCanaries)
		if result.Error != nil {
			return result.Error
		}

		for _, stale := range staleCanaries {
			deleteErr := db.DeleteCanary(ctx.Context, stale.ID)
			if deleteErr == nil {
				ctx.History.IncrSuccess()
			} else {
				ctx.History.AddErrorf("error deleting canary[%s]: %v", stale.ID, deleteErr)
			}

			// Unscheduling happens whether or not the delete succeeded.
			Unschedule(stale.ID)
		}

		return nil
	},
}
View Source
// PullUpstreamCanaries is a singleton job, scheduled "@every 10m", that calls
// pull with the global UpstreamConf. The job history is tagged with
// ResourceTypeUpstream and the upstream host, and the count returned by pull
// is stored as the success count.
var PullUpstreamCanaries = &job.Job{
	Name:       "PullUpstreamCanaries",
	Schedule:   "@every 10m",
	Retention:  job.RetentionFew,
	RunNow:     true,
	Singleton:  true,
	JobHistory: true,
	Fn: func(ctx job.JobRuntime) error {
		ctx.History.ResourceID = UpstreamConf.Host
		ctx.History.ResourceType = ResourceTypeUpstream

		pulled, pullErr := pull(ctx.Context, UpstreamConf)

		// The count is recorded even when pull also reports an error.
		ctx.History.SuccessCount = pulled
		return pullErr
	},
}
View Source
// ReconcileCanaries is a singleton job, scheduled "@every 1m", that asks
// upstream.ReconcileSome to reconcile the canaries, checks, check_statuses
// and check_config_relationships tables using UpstreamConf and
// ReconcilePageSize.
//
// The function itself always returns nil: the summary, its success/failure
// counts and any summary error are stored on the job history instead.
var ReconcileCanaries = &job.Job{
	Name:       "ReconcileCanaries",
	Schedule:   "@every 1m",
	Retention:  job.RetentionBalanced,
	RunNow:     true,
	Singleton:  true,
	JobHistory: true,
	Fn: func(ctx job.JobRuntime) error {
		ctx.History.ResourceID = UpstreamConf.Host
		ctx.History.ResourceType = job.ResourceTypeUpstream

		summary := upstream.ReconcileSome(
			ctx.Context, UpstreamConf, ReconcilePageSize,
			"canaries", "checks", "check_statuses", "check_config_relationships",
		)
		ctx.History.AddDetails("summary", summary)

		succeeded, failed := summary.GetSuccessFailure()
		ctx.History.SuccessCount, ctx.History.ErrorCount = succeeded, failed

		if reconcileErr := summary.Error(); reconcileErr != nil {
			ctx.History.AddDetails("errors", reconcileErr)
		}

		return nil
	},
}
View Source
// SyncCanaryJobs is a singleton job, scheduled "@every 5m", that brings the
// cron schedule in line with the canaries returned by
// db.GetAllCanariesForSync for runner.WatchNamespace.
//
// Every fetched canary is passed to SyncCanaryJob. A failure is logged,
// persisted as its own "SyncCanary" job history and added to this job's
// history, and the loop moves on to the next canary. Afterwards, every canary
// ID that was in cron before the sync but is absent from the fetched set is
// unscheduled.
var SyncCanaryJobs = &job.Job{
	Name:       "SyncCanaryJobs",
	Schedule:   "@every 5m",
	Retention:  job.RetentionFew,
	RunNow:     true,
	Singleton:  true,
	JobHistory: true,
	Fn: func(ctx job.JobRuntime) error {
		canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace)
		if err != nil {
			return err
		}

		ctx.Logger.V(1).Infof("syncing canary jobs for %d canaries", len(canaries))

		// Snapshot of what cron holds before any canary is synced.
		scheduledIDs := getAllCanaryIDsInCron()
		fetchedIDs := make([]string, 0, len(canaries))

		for _, c := range canaries {
			canaryID := c.ID.String()
			fetchedIDs = append(fetchedIDs, canaryID)

			syncErr := SyncCanaryJob(ctx.Context, c)
			if syncErr == nil {
				ctx.History.IncrSuccess()
				continue
			}

			jobHistory := models.NewJobHistory(ctx.Logger, "SyncCanary", "canary", canaryID).Start()
			logger.Errorf("Error syncing canary[%s]: %v", c.ID, syncErr.Error())
			logIfError(jobHistory.AddError(syncErr.Error()).End().Persist(ctx.DB()), "failed to persist job history [CanarySync]")

			ctx.History.AddError(syncErr.Error())
		}

		// IDs that cron knew about but the fresh fetch did not return.
		for _, id := range utils.SetDifference(scheduledIDs, fetchedIDs) {
			Unschedule(id)
		}

		return nil
	},
}

Functions

func FormCheckRelationships added in v1.0.227

func FormCheckRelationships(ctx context.Context, result *pkg.CheckResult) error

FormCheckRelationships forms check relationships with components and configs based on the lookup expressions in the check spec.

func SaveResults

func SaveResults(ctx context.Context, results []*pkg.CheckResult) ([]string, map[string]string, error)

func ScanCanaryConfigs

func ScanCanaryConfigs(ctx context.Context)

func StartScanCanaryConfigs

func StartScanCanaryConfigs(ctx context.Context, dataFile string, configFiles []string)

func SyncCanaryJob

func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error

func TriggerAt added in v1.0.197

func TriggerAt(ctx context.Context, dbCanary pkg.Canary, runAt time.Time) error

func Unschedule added in v1.0.197

func Unschedule(id string)

func UpdateCanaryStatusAndEvent added in v1.0.227

func UpdateCanaryStatusAndEvent(ctx context.Context, canary v1.Canary, results []*pkg.CheckResult)

Types

type CanaryJob

// CanaryJob bundles two representations of one canary: a v1.Canary and a
// pkg.Canary.
// NOTE(review): presumably Canary is the CRD/spec form and DBCanary the
// database row — confirm where the job is constructed.
type CanaryJob struct {
	Canary   v1.Canary
	DBCanary pkg.Canary
}

func (CanaryJob) GetNamespacedName

func (j CanaryJob) GetNamespacedName() types.NamespacedName

func (CanaryJob) Run

func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error

type CanaryPullResponse added in v1.0.36

// CanaryPullResponse is a JSON payload with a "before" timestamp and a
// "canaries" list; the list is omitted from the encoded output when empty.
// NOTE(review): presumably the response body decoded when pulling canaries
// from upstream — confirm against the pull implementation.
type CanaryPullResponse struct {
	Before   time.Time       `json:"before"`
	Canaries []models.Canary `json:"canaries,omitempty"`
}

type CanaryStatusPayload added in v0.38.213

// CanaryStatusPayload is the element type of CanaryStatusChannel. It carries
// a canary's status condition, a map of check statuses, fail events,
// messages, and uptime/latency strings, together with the NamespacedName
// they refer to.
// NOTE(review): presumably CheckStatus is keyed by check identifier —
// confirm at the producer.
type CanaryStatusPayload struct {
	Status               v1.CanaryStatusCondition
	CheckStatus          map[string]*v1.CheckStatus
	FailEvents           []string
	LastTransitionedTime *metav1.Time
	Message              string
	ErrorMessage         string
	Uptime               string
	Latency              string
	NamespacedName       types.NamespacedName
}

type RelatableCheck added in v1.0.129

// RelatableCheck is implemented by any type that exposes a
// *v1.CheckRelationship through GetRelationship.
// NOTE(review): whether a nil return means "no relationship configured" is
// not visible here — confirm with the implementers.
type RelatableCheck interface {
	GetRelationship() *v1.CheckRelationship
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL