Documentation ¶
Index ¶
- Constants
- func Run(config Configuration) error
- func SchedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time) (*schedulerobjects.JobSchedulingInfo, error)
- type Configuration
- type ControlPlaneEventsInstructionConverter
- type DbOperation
- type DbOperationsWithMessageIds
- type DeleteExecutorSettings
- type ExecutorSettingsDelete
- type ExecutorSettingsUpsert
- type InsertJobRunErrors
- type InsertJobs
- type InsertPartitionMarker
- type InsertRuns
- type JobQueuedStateUpdate
- type JobReprioritiseKey
- type JobRunDetails
- type JobRunFailed
- type JobSchedulingInfoUpdate
- type JobSetCancelAction
- type JobSetEventsInstructionConverter
- type JobSetKey
- type MarkJobSetsCancelRequested
- type MarkJobsCancelRequested
- type MarkJobsCancelled
- type MarkJobsFailed
- type MarkJobsSucceeded
- type MarkJobsValidated
- type MarkRunsFailed
- type MarkRunsForJobPreemptRequested
- type MarkRunsPending
- type MarkRunsPreempted
- type MarkRunsRunning
- type MarkRunsSucceeded
- type Operation
- type SchedulerDb
- type UpdateJobPriorities
- type UpdateJobQueuedState
- type UpdateJobSchedulingInfo
- type UpdateJobSetPriorities
- type UpsertExecutorSettings
Constants ¶
const ( InvalidLockKey = -1 JobSetEventsLockKey = 8741339439634283896 )
Variables ¶
This section is empty.
Functions ¶
func Run ¶
func Run(config Configuration) error
Run will create a pipeline that will take Armada event messages from Pulsar and update the schedulerDb. This pipeline will run until a SIGTERM is received.
func SchedulingInfoFromSubmitJob ¶ added in v0.3.90
func SchedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time) (*schedulerobjects.JobSchedulingInfo, error)
SchedulingInfoFromSubmitJob returns a minimal representation of a job containing only the info needed by the scheduler.
Types ¶
type Configuration ¶
type Configuration struct { // Database configuration Postgres configuration.PostgresConfig // Metrics Port MetricsPort uint16 // General Pulsar configuration Pulsar commonconfig.PulsarConfig // Pulsar subscription name SubscriptionName string // Number of event messages that will be batched together before being inserted into the database BatchSize int // Maximum time since the last batch before a batch will be inserted into the database BatchDuration time.Duration // If non-nil, configures pprof profiling Profiling *profilingconfig.ProfilingConfig }
func (Configuration) Validate ¶ added in v0.12.1
func (c Configuration) Validate() error
type ControlPlaneEventsInstructionConverter ¶ added in v0.15.0
type ControlPlaneEventsInstructionConverter struct {
// contains filtered or unexported fields
}
func NewControlPlaneEventsInstructionConverter ¶ added in v0.15.0
func NewControlPlaneEventsInstructionConverter( metrics *metrics.Metrics, ) (*ControlPlaneEventsInstructionConverter, error)
func (*ControlPlaneEventsInstructionConverter) Convert ¶ added in v0.15.0
func (c *ControlPlaneEventsInstructionConverter) Convert(ctx *armadacontext.Context, controlPlaneEvents *utils.EventsWithIds[*controlplaneevents.Event]) *DbOperationsWithMessageIds
type DbOperation ¶
type DbOperation interface { // a.Merge(b) attempts to merge b into a, creating a single combined op. // Returns true if merging was successful. // If successful, modifies a in-place. // If not successful, neither op is mutated. Merge(DbOperation) bool // a.CanBeAppliedBefore(b) returns true if a can be placed before b // without changing the end result of the overall set of operations. CanBeAppliedBefore(DbOperation) bool // GetOperation returns the Operation/grouping that this DbOperation belongs to. GetOperation() Operation }
DbOperation captures a generic batch database operation.
There are 5 types of operations: - Insert jobs (i.e., add new jobs to the schedulerdb). - Insert runs (i.e., add new runs to the schedulerdb). - Job set operations (i.e., modify all jobs and runs in the schedulerdb part of a given job set). - Job operations (i.e., modify particular jobs). - Job run operations (i.e., modify particular runs). - Control Plane Operations (i.e., upsert settings for a given executor)
To improve performance, several ops can be merged into a single op if of the same type. To increase the number of ops that can be merged, ops can sometimes be reordered.
Specifically, an op can be applied before another if: - Insert jobs: if prior op doesn't affect the job set. - Insert runs: if prior op doesn't affect the job set or defines the corresponding job. - Job set operations: if not affecting a job defined in prior op. - Job operations: if not affecting a job defined in a prior op. - Job run operations: if not affecting a run defined in a prior op. - Control Plane Operations: if not conflicting with prior op.
In addition, UpdateJobPriorities can never be applied beforee UpdateJobSetPriorities and vice versa, since one may overwrite values set by the other.
func AppendDbOperation ¶
func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation
AppendDbOperation appends a sql operation, possibly merging it with a previous operation if that can be done in such a way that the end result of applying the entire sequence of operations is unchanged.
type DbOperationsWithMessageIds ¶
type DbOperationsWithMessageIds struct { Ops []DbOperation MessageIds []pulsar.MessageID }
DbOperationsWithMessageIds bundles a sequence of schedulerdb ops with the ids of all Pulsar messages that were consumed to produce it.
func (*DbOperationsWithMessageIds) GetMessageIDs ¶
func (d *DbOperationsWithMessageIds) GetMessageIDs() []pulsar.MessageID
type DeleteExecutorSettings ¶ added in v0.15.0
type DeleteExecutorSettings map[string]*ExecutorSettingsDelete
func (DeleteExecutorSettings) CanBeAppliedBefore ¶ added in v0.15.0
func (a DeleteExecutorSettings) CanBeAppliedBefore(b DbOperation) bool
Can be applied before another operation only if it relates to a different executor
func (DeleteExecutorSettings) GetOperation ¶ added in v0.15.0
func (a DeleteExecutorSettings) GetOperation() Operation
func (DeleteExecutorSettings) Merge ¶ added in v0.15.0
func (a DeleteExecutorSettings) Merge(_ DbOperation) bool
type ExecutorSettingsDelete ¶ added in v0.15.0
type ExecutorSettingsDelete struct {
ExecutorID string
}
type ExecutorSettingsUpsert ¶ added in v0.15.0
type InsertJobRunErrors ¶ added in v0.3.47
type InsertJobRunErrors map[string]*schedulerdb.JobRunError
func (InsertJobRunErrors) CanBeAppliedBefore ¶ added in v0.3.47
func (a InsertJobRunErrors) CanBeAppliedBefore(_ DbOperation) bool
func (InsertJobRunErrors) GetOperation ¶ added in v0.15.0
func (a InsertJobRunErrors) GetOperation() Operation
func (InsertJobRunErrors) Merge ¶ added in v0.3.47
func (a InsertJobRunErrors) Merge(b DbOperation) bool
type InsertJobs ¶
type InsertJobs map[string]*schedulerdb.Job
func (InsertJobs) CanBeAppliedBefore ¶
func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool
func (InsertJobs) GetOperation ¶ added in v0.15.0
func (a InsertJobs) GetOperation() Operation
func (InsertJobs) Merge ¶
func (a InsertJobs) Merge(b DbOperation) bool
type InsertPartitionMarker ¶ added in v0.3.48
type InsertPartitionMarker struct {
// contains filtered or unexported fields
}
func (*InsertPartitionMarker) CanBeAppliedBefore ¶ added in v0.3.48
func (a *InsertPartitionMarker) CanBeAppliedBefore(b DbOperation) bool
func (*InsertPartitionMarker) GetOperation ¶ added in v0.15.0
func (a *InsertPartitionMarker) GetOperation() Operation
func (*InsertPartitionMarker) Merge ¶ added in v0.3.48
func (a *InsertPartitionMarker) Merge(b DbOperation) bool
type InsertRuns ¶
type InsertRuns map[string]*JobRunDetails
func (InsertRuns) CanBeAppliedBefore ¶
func (a InsertRuns) CanBeAppliedBefore(b DbOperation) bool
func (InsertRuns) GetOperation ¶ added in v0.15.0
func (a InsertRuns) GetOperation() Operation
func (InsertRuns) Merge ¶
func (a InsertRuns) Merge(b DbOperation) bool
type JobQueuedStateUpdate ¶ added in v0.3.63
type JobReprioritiseKey ¶ added in v0.4.52
type JobRunDetails ¶ added in v0.3.68
type JobRunDetails struct { Queue string DbRun *schedulerdb.Run }
type JobRunFailed ¶ added in v0.3.47
type JobSchedulingInfoUpdate ¶ added in v0.3.63
type JobSetCancelAction ¶ added in v0.3.68
type JobSetCancelAction struct {
// contains filtered or unexported fields
}
type JobSetEventsInstructionConverter ¶ added in v0.15.0
type JobSetEventsInstructionConverter struct {
// contains filtered or unexported fields
}
func NewJobSetEventsInstructionConverter ¶ added in v0.15.0
func NewJobSetEventsInstructionConverter( metrics *metrics.Metrics, ) (*JobSetEventsInstructionConverter, error)
func (*JobSetEventsInstructionConverter) Convert ¶ added in v0.15.0
func (c *JobSetEventsInstructionConverter) Convert(ctx *armadacontext.Context, eventsWithIds *utils.EventsWithIds[*armadaevents.EventSequence]) *DbOperationsWithMessageIds
type JobSetKey ¶ added in v0.3.68
type JobSetKey struct {
// contains filtered or unexported fields
}
type MarkJobSetsCancelRequested ¶ added in v0.3.47
type MarkJobSetsCancelRequested map[JobSetKey]*JobSetCancelAction
func (MarkJobSetsCancelRequested) AffectsJobSet ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) AffectsJobSet(queue string, jobSet string) bool
func (MarkJobSetsCancelRequested) CanBeAppliedBefore ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobSetsCancelRequested) GetOperation ¶ added in v0.15.0
func (a MarkJobSetsCancelRequested) GetOperation() Operation
func (MarkJobSetsCancelRequested) Merge ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) Merge(b DbOperation) bool
type MarkJobsCancelRequested ¶ added in v0.3.47
func (MarkJobsCancelRequested) CanBeAppliedBefore ¶ added in v0.3.47
func (a MarkJobsCancelRequested) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsCancelRequested) GetOperation ¶ added in v0.15.0
func (a MarkJobsCancelRequested) GetOperation() Operation
func (MarkJobsCancelRequested) Merge ¶ added in v0.3.47
func (a MarkJobsCancelRequested) Merge(b DbOperation) bool
type MarkJobsCancelled ¶
func (MarkJobsCancelled) CanBeAppliedBefore ¶
func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsCancelled) GetOperation ¶ added in v0.15.0
func (a MarkJobsCancelled) GetOperation() Operation
func (MarkJobsCancelled) Merge ¶
func (a MarkJobsCancelled) Merge(b DbOperation) bool
type MarkJobsFailed ¶
func (MarkJobsFailed) CanBeAppliedBefore ¶
func (a MarkJobsFailed) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsFailed) GetOperation ¶ added in v0.15.0
func (a MarkJobsFailed) GetOperation() Operation
func (MarkJobsFailed) Merge ¶
func (a MarkJobsFailed) Merge(b DbOperation) bool
type MarkJobsSucceeded ¶
func (MarkJobsSucceeded) CanBeAppliedBefore ¶
func (a MarkJobsSucceeded) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsSucceeded) GetOperation ¶ added in v0.15.0
func (a MarkJobsSucceeded) GetOperation() Operation
func (MarkJobsSucceeded) Merge ¶
func (a MarkJobsSucceeded) Merge(b DbOperation) bool
type MarkJobsValidated ¶ added in v0.4.50
func (MarkJobsValidated) CanBeAppliedBefore ¶ added in v0.4.50
func (a MarkJobsValidated) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsValidated) GetOperation ¶ added in v0.15.0
func (a MarkJobsValidated) GetOperation() Operation
func (MarkJobsValidated) Merge ¶ added in v0.4.50
func (a MarkJobsValidated) Merge(b DbOperation) bool
type MarkRunsFailed ¶
type MarkRunsFailed map[string]*JobRunFailed
func (MarkRunsFailed) CanBeAppliedBefore ¶
func (a MarkRunsFailed) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsFailed) GetOperation ¶ added in v0.15.0
func (a MarkRunsFailed) GetOperation() Operation
func (MarkRunsFailed) Merge ¶
func (a MarkRunsFailed) Merge(b DbOperation) bool
type MarkRunsForJobPreemptRequested ¶ added in v0.4.41
func (MarkRunsForJobPreemptRequested) CanBeAppliedBefore ¶ added in v0.4.41
func (a MarkRunsForJobPreemptRequested) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsForJobPreemptRequested) GetOperation ¶ added in v0.15.0
func (a MarkRunsForJobPreemptRequested) GetOperation() Operation
func (MarkRunsForJobPreemptRequested) Merge ¶ added in v0.4.41
func (a MarkRunsForJobPreemptRequested) Merge(b DbOperation) bool
type MarkRunsPending ¶ added in v0.4.11
func (MarkRunsPending) CanBeAppliedBefore ¶ added in v0.4.11
func (a MarkRunsPending) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsPending) GetOperation ¶ added in v0.15.0
func (a MarkRunsPending) GetOperation() Operation
func (MarkRunsPending) Merge ¶ added in v0.4.11
func (a MarkRunsPending) Merge(b DbOperation) bool
type MarkRunsPreempted ¶ added in v0.4.11
func (MarkRunsPreempted) CanBeAppliedBefore ¶ added in v0.4.11
func (a MarkRunsPreempted) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsPreempted) GetOperation ¶ added in v0.15.0
func (a MarkRunsPreempted) GetOperation() Operation
func (MarkRunsPreempted) Merge ¶ added in v0.4.11
func (a MarkRunsPreempted) Merge(b DbOperation) bool
type MarkRunsRunning ¶
func (MarkRunsRunning) CanBeAppliedBefore ¶
func (a MarkRunsRunning) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsRunning) GetOperation ¶ added in v0.15.0
func (a MarkRunsRunning) GetOperation() Operation
func (MarkRunsRunning) Merge ¶
func (a MarkRunsRunning) Merge(b DbOperation) bool
type MarkRunsSucceeded ¶
func (MarkRunsSucceeded) CanBeAppliedBefore ¶
func (a MarkRunsSucceeded) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsSucceeded) GetOperation ¶ added in v0.15.0
func (a MarkRunsSucceeded) GetOperation() Operation
func (MarkRunsSucceeded) Merge ¶
func (a MarkRunsSucceeded) Merge(b DbOperation) bool
type SchedulerDb ¶
type SchedulerDb struct {
// contains filtered or unexported fields
}
SchedulerDb writes DbOperations into postgres.
func NewSchedulerDb ¶
func (*SchedulerDb) Store ¶
func (s *SchedulerDb) Store(ctx *armadacontext.Context, instructions *DbOperationsWithMessageIds) error
Store persists all operations in the database. This function retires until it either succeeds or encounters a terminal error. This function locks the postgres table to avoid write conflicts; see acquireLock() for details.
func (*SchedulerDb) WriteDbOp ¶
func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOperation) error
type UpdateJobPriorities ¶
type UpdateJobPriorities struct {
// contains filtered or unexported fields
}
func (*UpdateJobPriorities) CanBeAppliedBefore ¶
func (a *UpdateJobPriorities) CanBeAppliedBefore(b DbOperation) bool
func (*UpdateJobPriorities) GetOperation ¶ added in v0.15.0
func (a *UpdateJobPriorities) GetOperation() Operation
func (*UpdateJobPriorities) Merge ¶
func (a *UpdateJobPriorities) Merge(b DbOperation) bool
type UpdateJobQueuedState ¶ added in v0.3.63
type UpdateJobQueuedState map[string]*JobQueuedStateUpdate
func (UpdateJobQueuedState) CanBeAppliedBefore ¶ added in v0.3.63
func (a UpdateJobQueuedState) CanBeAppliedBefore(b DbOperation) bool
func (UpdateJobQueuedState) GetOperation ¶ added in v0.15.0
func (a UpdateJobQueuedState) GetOperation() Operation
func (UpdateJobQueuedState) Merge ¶ added in v0.3.63
func (a UpdateJobQueuedState) Merge(b DbOperation) bool
type UpdateJobSchedulingInfo ¶ added in v0.3.63
type UpdateJobSchedulingInfo map[string]*JobSchedulingInfoUpdate
func (UpdateJobSchedulingInfo) CanBeAppliedBefore ¶ added in v0.3.63
func (a UpdateJobSchedulingInfo) CanBeAppliedBefore(b DbOperation) bool
func (UpdateJobSchedulingInfo) GetOperation ¶ added in v0.15.0
func (a UpdateJobSchedulingInfo) GetOperation() Operation
func (UpdateJobSchedulingInfo) Merge ¶ added in v0.3.63
func (a UpdateJobSchedulingInfo) Merge(b DbOperation) bool
type UpdateJobSetPriorities ¶
func (UpdateJobSetPriorities) AffectsJobSet ¶
func (a UpdateJobSetPriorities) AffectsJobSet(queue string, jobSet string) bool
func (UpdateJobSetPriorities) CanBeAppliedBefore ¶
func (a UpdateJobSetPriorities) CanBeAppliedBefore(b DbOperation) bool
func (UpdateJobSetPriorities) GetOperation ¶ added in v0.15.0
func (a UpdateJobSetPriorities) GetOperation() Operation
func (UpdateJobSetPriorities) Merge ¶
func (a UpdateJobSetPriorities) Merge(b DbOperation) bool
type UpsertExecutorSettings ¶ added in v0.15.0
type UpsertExecutorSettings map[string]*ExecutorSettingsUpsert
func (UpsertExecutorSettings) CanBeAppliedBefore ¶ added in v0.15.0
func (a UpsertExecutorSettings) CanBeAppliedBefore(b DbOperation) bool
Can be applied before another operation only if it relates to a different executor
func (UpsertExecutorSettings) GetOperation ¶ added in v0.15.0
func (a UpsertExecutorSettings) GetOperation() Operation
func (UpsertExecutorSettings) Merge ¶ added in v0.15.0
func (a UpsertExecutorSettings) Merge(_ DbOperation) bool