Documentation ¶
Index ¶
- Constants
- Variables
- func ScanCanaryConfigs(ctx context.Context)
- func StartScanCanaryConfigs(ctx context.Context, dataFile string, configFiles []string)
- func StartUpstreamEventQueueConsumer(ctx context.Context) error
- func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error
- func TriggerAt(ctx context.Context, dbCanary pkg.Canary, runAt time.Time) error
- func Unschedule(id string)
- type CanaryJob
- type CanaryPullResponse
- type CanaryStatusPayload
- type RelatableCheck
Constants ¶
View Source
const ( EventPushQueueCreate = "push_queue.create" ResourceTypeUpstream = "upstream" )
Variables ¶
View Source
var ( ReconcilePageSize int // Only sync data created/updated in the last ReconcileMaxAge duration ReconcileMaxAge time.Duration // UpstreamConf is the global configuration for upstream UpstreamConf upstream.UpstreamConfig )
View Source
var CanaryConfigFiles []string
View Source
var CanaryScheduler = cron.New()
View Source
var CanaryStatusChannel chan CanaryStatusPayload
View Source
var CleanupDeletedCanaryChecks = &dutyjob.Job{ Name: "CleanupChecks", Schedule: "@every 1h", Singleton: true, JobHistory: true, Retention: dutyjob.RetentionDay, Fn: func(ctx dutyjob.JobRuntime) error { var rows []struct { ID string } if err := ctx.DB().Raw(` SELECT DISTINCT(canaries.id) FROM canaries INNER JOIN checks ON canaries.id = checks.canary_id WHERE checks.deleted_at IS NULL AND canaries.deleted_at IS NOT NULL `).Scan(&rows).Error; err != nil { return err } for _, r := range rows { if err := db.DeleteCanary(ctx.Context, r.ID); err != nil { ctx.History.AddError(fmt.Sprintf("Error deleting components for topology[%s]: %v", r.ID, err)) } else { ctx.History.IncrSuccess() } Unschedule(r.ID) } return nil }, }
View Source
var DataFile string
View Source
var Executor bool
View Source
var FuncScheduler = cron.New()
View Source
var LogPass, LogFail bool
View Source
var MinimumTimeBetweenCanaryRuns = 10 * time.Second
View Source
var PullUpstreamCanaries = job.Job{ Name: "PullUpstreamCanaries", JobHistory: true, Singleton: true, Schedule: "@every 10m", Retention: job.RetentionHour, Fn: func(ctx job.JobRuntime) error { ctx.History.ResourceType = ResourceTypeUpstream ctx.History.ResourceID = UpstreamConf.Host count, err := pull(ctx.Context, UpstreamConf) ctx.History.SuccessCount = count return err }, }
View Source
var ReconcileChecks = job.Job{ Name: "PushChecksToUpstream", JobHistory: true, Singleton: true, Retention: job.RetentionDay, RunNow: true, Schedule: "@every 30m", Fn: func(ctx job.JobRuntime) error { ctx.History.ResourceType = ResourceTypeUpstream ctx.History.ResourceID = UpstreamConf.Host if count, err := upstream.NewUpstreamReconciler(UpstreamConf, ReconcilePageSize). Sync(ctx.Context, "canaries"); err != nil { ctx.History.AddError(err.Error()) } else { ctx.History.SuccessCount += count } if count, err := upstream.NewUpstreamReconciler(UpstreamConf, ReconcilePageSize). Sync(ctx.Context, "checks"); err != nil { ctx.History.AddError(err.Error()) } else { ctx.History.SuccessCount += count } return nil }, }
View Source
var SyncCanaryJobs = &job.Job{ Name: "SyncCanaryJobs", JobHistory: true, Singleton: true, RunNow: true, Schedule: "@every 5m", Retention: job.RetentionHour, Fn: func(ctx job.JobRuntime) error { canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace) if err != nil { return err } existingIDsInCron := getAllCanaryIDsInCron() idsInNewFetch := make([]string, 0, len(canaries)) for _, c := range canaries { idsInNewFetch = append(idsInNewFetch, c.ID.String()) if err := SyncCanaryJob(ctx.Context, c); err != nil { jobHistory := models.NewJobHistory(ctx.Logger, "SyncCanary", "canary", c.ID.String()).Start() logger.Errorf("Error syncing canary[%s]: %v", c.ID, err.Error()) logIfError(jobHistory.AddError(err.Error()).End().Persist(ctx.DB()), "failed to persist job history [CanarySync]") ctx.History.AddError(err.Error()) continue } else { ctx.History.IncrSuccess() } } idsToRemoveFromCron := utils.SetDifference(existingIDsInCron, idsInNewFetch) for _, id := range idsToRemoveFromCron { Unschedule(id) } return nil }, }
View Source
var SyncCheckStatuses = job.Job{ Name: "SyncCheckStatusesWithUpstream", JobHistory: true, Singleton: true, Retention: job.RetentionHour, RunNow: true, Schedule: "@every 30s", Fn: func(ctx job.JobRuntime) error { ctx.History.ResourceType = ResourceTypeUpstream ctx.History.ResourceID = UpstreamConf.Host count, err := upstream.SyncCheckStatuses(ctx.Context, UpstreamConf, ReconcilePageSize) ctx.History.SuccessCount = count return err }, }
View Source
var UpstreamJobs = []job.Job{ SyncCheckStatuses, PullUpstreamCanaries, ReconcileChecks, }
Functions ¶
func ScanCanaryConfigs ¶
func StartScanCanaryConfigs ¶
func StartUpstreamEventQueueConsumer ¶ added in v1.0.69
func SyncCanaryJob ¶
TODO: Refactor to use database object instead of kubernetes
func Unschedule ¶ added in v1.0.197
func Unschedule(id string)
Types ¶
type CanaryJob ¶
func (CanaryJob) GetNamespacedName ¶
func (j CanaryJob) GetNamespacedName() types.NamespacedName
type CanaryPullResponse ¶ added in v1.0.36
type CanaryStatusPayload ¶ added in v0.38.213
type RelatableCheck ¶ added in v1.0.129
type RelatableCheck interface {
GetRelationship() *v1.CheckRelationship
}
Click to show internal directories.
Click to hide internal directories.