scheduleringester

package
v0.16.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2025 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	InvalidLockKey      = -1
	JobSetEventsLockKey = 8741339439634283896
)

Variables

This section is empty.

Functions

func Run

func Run(config Configuration) error

Run will create a pipeline that will take Armada event messages from Pulsar and update the schedulerDb. This pipeline will run until a SIGTERM is received.

func SchedulingInfoFromSubmitJob added in v0.3.90

func SchedulingInfoFromSubmitJob(submitJob *armadaevents.SubmitJob, submitTime time.Time) (*schedulerobjects.JobSchedulingInfo, error)

SchedulingInfoFromSubmitJob returns a minimal representation of a job containing only the info needed by the scheduler.

Types

type CancelExecutor added in v0.15.4

type CancelExecutor map[string]*CancelOnExecutor

func (CancelExecutor) CanBeAppliedBefore added in v0.15.4

func (ce CancelExecutor) CanBeAppliedBefore(b DbOperation) bool

func (CancelExecutor) GetOperation added in v0.15.4

func (ce CancelExecutor) GetOperation() Operation

func (CancelExecutor) Merge added in v0.15.4

func (ce CancelExecutor) Merge(_ DbOperation) bool

type CancelOnExecutor added in v0.15.4

type CancelOnExecutor struct {
	Name            string
	Queues          []string
	PriorityClasses []string
}

type CancelOnQueue added in v0.15.4

type CancelOnQueue struct {
	Name            string
	PriorityClasses []string
	JobStates       []controlplaneevents.ActiveJobState
}

type CancelQueue added in v0.15.4

type CancelQueue map[string]*CancelOnQueue

func (CancelQueue) CanBeAppliedBefore added in v0.15.4

func (cq CancelQueue) CanBeAppliedBefore(b DbOperation) bool

func (CancelQueue) GetOperation added in v0.15.4

func (cq CancelQueue) GetOperation() Operation

func (CancelQueue) Merge added in v0.15.4

func (cq CancelQueue) Merge(_ DbOperation) bool

type Configuration

type Configuration struct {
	// Database configuration
	Postgres configuration.PostgresConfig
	// Metrics Port
	MetricsPort uint16
	// General Pulsar configuration
	Pulsar commonconfig.PulsarConfig
	// Pulsar subscription name
	SubscriptionName string
	// Number of event messages that will be batched together before being inserted into the database
	BatchSize int
	// Maximum time since the last batch before a batch will be inserted into the database
	BatchDuration time.Duration
	// If non-nil, configures pprof profiling
	Profiling *profilingconfig.ProfilingConfig
}

func (Configuration) Validate added in v0.12.1

func (c Configuration) Validate() error

type ControlPlaneEventsInstructionConverter added in v0.15.0

type ControlPlaneEventsInstructionConverter struct {
	// contains filtered or unexported fields
}

func NewControlPlaneEventsInstructionConverter added in v0.15.0

func NewControlPlaneEventsInstructionConverter(
	metrics *metrics.Metrics,
) (*ControlPlaneEventsInstructionConverter, error)

func (*ControlPlaneEventsInstructionConverter) Convert added in v0.15.0

type DbOperation

type DbOperation interface {
	// a.Merge(b) attempts to merge b into a, creating a single combined op.
	// Returns true if merging was successful.
	// If successful, modifies a in-place.
	// If not successful, neither op is mutated.
	Merge(DbOperation) bool
	// a.CanBeAppliedBefore(b) returns true if a can be placed before b
	// without changing the end result of the overall set of operations.
	CanBeAppliedBefore(DbOperation) bool
	// GetOperation returns the Operation/grouping that this DbOperation belongs to.
	GetOperation() Operation
}

DbOperation captures a generic batch database operation.

There are 5 types of operations: - Insert jobs (i.e., add new jobs to the schedulerdb). - Insert runs (i.e., add new runs to the schedulerdb). - Job set operations (i.e., modify all jobs and runs in the schedulerdb part of a given job set). - Job operations (i.e., modify particular jobs). - Job run operations (i.e., modify particular runs). - Control Plane Operations (i.e., upsert settings for a given executor)

To improve performance, several ops can be merged into a single op if of the same type. To increase the number of ops that can be merged, ops can sometimes be reordered.

Specifically, an op can be applied before another if: - Insert jobs: if prior op doesn't affect the job set. - Insert runs: if prior op doesn't affect the job set or defines the corresponding job. - Job set operations: if not affecting a job defined in prior op. - Job operations: if not affecting a job defined in a prior op. - Job run operations: if not affecting a run defined in a prior op. - Control Plane Operations: if not conflicting with prior op.

In addition, UpdateJobPriorities can never be applied beforee UpdateJobSetPriorities and vice versa, since one may overwrite values set by the other.

func AppendDbOperation

func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation

AppendDbOperation appends a sql operation, possibly merging it with a previous operation if that can be done in such a way that the end result of applying the entire sequence of operations is unchanged.

type DbOperationsWithMessageIds

type DbOperationsWithMessageIds struct {
	Ops        []DbOperation
	MessageIds []pulsar.MessageID
}

DbOperationsWithMessageIds bundles a sequence of schedulerdb ops with the ids of all Pulsar messages that were consumed to produce it.

func (*DbOperationsWithMessageIds) GetMessageIDs

func (d *DbOperationsWithMessageIds) GetMessageIDs() []pulsar.MessageID

type DeleteExecutorSettings added in v0.15.0

type DeleteExecutorSettings map[string]*ExecutorSettingsDelete

func (DeleteExecutorSettings) CanBeAppliedBefore added in v0.15.0

func (a DeleteExecutorSettings) CanBeAppliedBefore(b DbOperation) bool

Can be applied before another operation only if it relates to a different executor

func (DeleteExecutorSettings) GetOperation added in v0.15.0

func (a DeleteExecutorSettings) GetOperation() Operation

func (DeleteExecutorSettings) Merge added in v0.15.0

type ExecutorSettingsDelete added in v0.15.0

type ExecutorSettingsDelete struct {
	ExecutorID string
}

type ExecutorSettingsUpsert added in v0.15.0

type ExecutorSettingsUpsert struct {
	ExecutorID   string
	Cordoned     bool
	CordonReason string
	SetByUser    string
	SetAtTime    time.Time
}

type InsertJobRunErrors added in v0.3.47

type InsertJobRunErrors map[string]*schedulerdb.JobRunError

func (InsertJobRunErrors) CanBeAppliedBefore added in v0.3.47

func (a InsertJobRunErrors) CanBeAppliedBefore(_ DbOperation) bool

func (InsertJobRunErrors) GetOperation added in v0.15.0

func (a InsertJobRunErrors) GetOperation() Operation

func (InsertJobRunErrors) Merge added in v0.3.47

func (a InsertJobRunErrors) Merge(b DbOperation) bool

type InsertJobs

type InsertJobs map[string]*schedulerdb.Job

func (InsertJobs) CanBeAppliedBefore

func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool

func (InsertJobs) GetOperation added in v0.15.0

func (a InsertJobs) GetOperation() Operation

func (InsertJobs) Merge

func (a InsertJobs) Merge(b DbOperation) bool

type InsertPartitionMarker added in v0.3.48

type InsertPartitionMarker struct {
	// contains filtered or unexported fields
}

func (*InsertPartitionMarker) CanBeAppliedBefore added in v0.3.48

func (a *InsertPartitionMarker) CanBeAppliedBefore(b DbOperation) bool

func (*InsertPartitionMarker) GetOperation added in v0.15.0

func (a *InsertPartitionMarker) GetOperation() Operation

func (*InsertPartitionMarker) Merge added in v0.3.48

type InsertRuns

type InsertRuns map[string]*JobRunDetails

func (InsertRuns) CanBeAppliedBefore

func (a InsertRuns) CanBeAppliedBefore(b DbOperation) bool

func (InsertRuns) GetOperation added in v0.15.0

func (a InsertRuns) GetOperation() Operation

func (InsertRuns) Merge

func (a InsertRuns) Merge(b DbOperation) bool

type JobQueuedStateUpdate added in v0.3.63

type JobQueuedStateUpdate struct {
	Queued             bool
	QueuedStateVersion int32
}

type JobReprioritiseKey added in v0.4.52

type JobReprioritiseKey struct {
	JobSetKey
	Priority int64
}

type JobRunDetails added in v0.3.68

type JobRunDetails struct {
	Queue string
	DbRun *schedulerdb.Run
}

type JobRunFailed added in v0.3.47

type JobRunFailed struct {
	LeaseReturned bool
	RunAttempted  bool
	FailureTime   time.Time
}

type JobSchedulingInfoUpdate added in v0.3.63

type JobSchedulingInfoUpdate struct {
	JobSchedulingInfo        []byte
	JobSchedulingInfoVersion int32
}

type JobSetCancelAction added in v0.3.68

type JobSetCancelAction struct {
	// contains filtered or unexported fields
}

type JobSetEventsInstructionConverter added in v0.15.0

type JobSetEventsInstructionConverter struct {
	// contains filtered or unexported fields
}

func NewJobSetEventsInstructionConverter added in v0.15.0

func NewJobSetEventsInstructionConverter(
	metrics *metrics.Metrics,
) (*JobSetEventsInstructionConverter, error)

func (*JobSetEventsInstructionConverter) Convert added in v0.15.0

type JobSetKey added in v0.3.68

type JobSetKey struct {
	// contains filtered or unexported fields
}

type MarkJobSetsCancelRequested added in v0.3.47

type MarkJobSetsCancelRequested map[JobSetKey]*JobSetCancelAction

func (MarkJobSetsCancelRequested) AffectsJobSet added in v0.3.47

func (a MarkJobSetsCancelRequested) AffectsJobSet(queue string, jobSet string) bool

func (MarkJobSetsCancelRequested) CanBeAppliedBefore added in v0.3.47

func (a MarkJobSetsCancelRequested) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobSetsCancelRequested) GetOperation added in v0.15.0

func (a MarkJobSetsCancelRequested) GetOperation() Operation

func (MarkJobSetsCancelRequested) Merge added in v0.3.47

type MarkJobsCancelRequested added in v0.3.47

type MarkJobsCancelRequested map[JobSetKey][]string

func (MarkJobsCancelRequested) CanBeAppliedBefore added in v0.3.47

func (a MarkJobsCancelRequested) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsCancelRequested) GetOperation added in v0.15.0

func (a MarkJobsCancelRequested) GetOperation() Operation

func (MarkJobsCancelRequested) Merge added in v0.3.47

type MarkJobsCancelled

type MarkJobsCancelled map[string]time.Time

func (MarkJobsCancelled) CanBeAppliedBefore

func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsCancelled) GetOperation added in v0.15.0

func (a MarkJobsCancelled) GetOperation() Operation

func (MarkJobsCancelled) Merge

func (a MarkJobsCancelled) Merge(b DbOperation) bool

type MarkJobsFailed

type MarkJobsFailed map[string]bool

func (MarkJobsFailed) CanBeAppliedBefore

func (a MarkJobsFailed) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsFailed) GetOperation added in v0.15.0

func (a MarkJobsFailed) GetOperation() Operation

func (MarkJobsFailed) Merge

func (a MarkJobsFailed) Merge(b DbOperation) bool

type MarkJobsSucceeded

type MarkJobsSucceeded map[string]bool

func (MarkJobsSucceeded) CanBeAppliedBefore

func (a MarkJobsSucceeded) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsSucceeded) GetOperation added in v0.15.0

func (a MarkJobsSucceeded) GetOperation() Operation

func (MarkJobsSucceeded) Merge

func (a MarkJobsSucceeded) Merge(b DbOperation) bool

type MarkJobsValidated added in v0.4.50

type MarkJobsValidated map[string][]string

func (MarkJobsValidated) CanBeAppliedBefore added in v0.4.50

func (a MarkJobsValidated) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsValidated) GetOperation added in v0.15.0

func (a MarkJobsValidated) GetOperation() Operation

func (MarkJobsValidated) Merge added in v0.4.50

func (a MarkJobsValidated) Merge(b DbOperation) bool

type MarkRunsFailed

type MarkRunsFailed map[string]*JobRunFailed

func (MarkRunsFailed) CanBeAppliedBefore

func (a MarkRunsFailed) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsFailed) GetOperation added in v0.15.0

func (a MarkRunsFailed) GetOperation() Operation

func (MarkRunsFailed) Merge

func (a MarkRunsFailed) Merge(b DbOperation) bool

type MarkRunsForJobPreemptRequested added in v0.4.41

type MarkRunsForJobPreemptRequested map[JobSetKey][]string

func (MarkRunsForJobPreemptRequested) CanBeAppliedBefore added in v0.4.41

func (a MarkRunsForJobPreemptRequested) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsForJobPreemptRequested) GetOperation added in v0.15.0

func (a MarkRunsForJobPreemptRequested) GetOperation() Operation

func (MarkRunsForJobPreemptRequested) Merge added in v0.4.41

type MarkRunsPending added in v0.4.11

type MarkRunsPending map[string]time.Time

func (MarkRunsPending) CanBeAppliedBefore added in v0.4.11

func (a MarkRunsPending) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsPending) GetOperation added in v0.15.0

func (a MarkRunsPending) GetOperation() Operation

func (MarkRunsPending) Merge added in v0.4.11

func (a MarkRunsPending) Merge(b DbOperation) bool

type MarkRunsPreempted added in v0.4.11

type MarkRunsPreempted map[string]time.Time

func (MarkRunsPreempted) CanBeAppliedBefore added in v0.4.11

func (a MarkRunsPreempted) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsPreempted) GetOperation added in v0.15.0

func (a MarkRunsPreempted) GetOperation() Operation

func (MarkRunsPreempted) Merge added in v0.4.11

func (a MarkRunsPreempted) Merge(b DbOperation) bool

type MarkRunsRunning

type MarkRunsRunning map[string]time.Time

func (MarkRunsRunning) CanBeAppliedBefore

func (a MarkRunsRunning) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsRunning) GetOperation added in v0.15.0

func (a MarkRunsRunning) GetOperation() Operation

func (MarkRunsRunning) Merge

func (a MarkRunsRunning) Merge(b DbOperation) bool

type MarkRunsSucceeded

type MarkRunsSucceeded map[string]time.Time

func (MarkRunsSucceeded) CanBeAppliedBefore

func (a MarkRunsSucceeded) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsSucceeded) GetOperation added in v0.15.0

func (a MarkRunsSucceeded) GetOperation() Operation

func (MarkRunsSucceeded) Merge

func (a MarkRunsSucceeded) Merge(b DbOperation) bool

type Operation added in v0.15.0

type Operation int
const (
	JobSetOperation       Operation = iota
	ControlPlaneOperation Operation = iota
)

type PreemptExecutor added in v0.15.4

type PreemptExecutor map[string]*PreemptOnExecutor

func (PreemptExecutor) CanBeAppliedBefore added in v0.15.4

func (pe PreemptExecutor) CanBeAppliedBefore(b DbOperation) bool

func (PreemptExecutor) GetOperation added in v0.15.4

func (pe PreemptExecutor) GetOperation() Operation

func (PreemptExecutor) Merge added in v0.15.4

func (pe PreemptExecutor) Merge(_ DbOperation) bool

type PreemptOnExecutor added in v0.15.4

type PreemptOnExecutor struct {
	Name            string
	Queues          []string
	PriorityClasses []string
}

type PreemptOnQueue added in v0.15.4

type PreemptOnQueue struct {
	Name            string
	PriorityClasses []string
}

type PreemptQueue added in v0.15.4

type PreemptQueue map[string]*PreemptOnQueue

func (PreemptQueue) CanBeAppliedBefore added in v0.15.4

func (pq PreemptQueue) CanBeAppliedBefore(b DbOperation) bool

func (PreemptQueue) GetOperation added in v0.15.4

func (pq PreemptQueue) GetOperation() Operation

func (PreemptQueue) Merge added in v0.15.4

func (pq PreemptQueue) Merge(_ DbOperation) bool

type SchedulerDb

type SchedulerDb struct {
	// contains filtered or unexported fields
}

SchedulerDb writes DbOperations into postgres.

func NewSchedulerDb

func NewSchedulerDb(
	db *pgxpool.Pool,
	metrics *metrics.Metrics,
	initialBackOff time.Duration,
	maxBackOff time.Duration,
	lockTimeout time.Duration,
) *SchedulerDb

func (*SchedulerDb) Store

func (s *SchedulerDb) Store(ctx *armadacontext.Context, instructions *DbOperationsWithMessageIds) error

Store persists all operations in the database. This function retires until it either succeeds or encounters a terminal error. This function locks the postgres table to avoid write conflicts; see acquireLock() for details.

func (*SchedulerDb) WriteDbOp

func (s *SchedulerDb) WriteDbOp(ctx *armadacontext.Context, tx pgx.Tx, op DbOperation) error

type UpdateJobPriorities

type UpdateJobPriorities struct {
	// contains filtered or unexported fields
}

func (*UpdateJobPriorities) CanBeAppliedBefore

func (a *UpdateJobPriorities) CanBeAppliedBefore(b DbOperation) bool

func (*UpdateJobPriorities) GetOperation added in v0.15.0

func (a *UpdateJobPriorities) GetOperation() Operation

func (*UpdateJobPriorities) Merge

func (a *UpdateJobPriorities) Merge(b DbOperation) bool

type UpdateJobQueuedState added in v0.3.63

type UpdateJobQueuedState map[string]*JobQueuedStateUpdate

func (UpdateJobQueuedState) CanBeAppliedBefore added in v0.3.63

func (a UpdateJobQueuedState) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobQueuedState) GetOperation added in v0.15.0

func (a UpdateJobQueuedState) GetOperation() Operation

func (UpdateJobQueuedState) Merge added in v0.3.63

type UpdateJobSchedulingInfo added in v0.3.63

type UpdateJobSchedulingInfo map[string]*JobSchedulingInfoUpdate

func (UpdateJobSchedulingInfo) CanBeAppliedBefore added in v0.3.63

func (a UpdateJobSchedulingInfo) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobSchedulingInfo) GetOperation added in v0.15.0

func (a UpdateJobSchedulingInfo) GetOperation() Operation

func (UpdateJobSchedulingInfo) Merge added in v0.3.63

type UpdateJobSetPriorities

type UpdateJobSetPriorities map[JobSetKey]int64

func (UpdateJobSetPriorities) AffectsJobSet

func (a UpdateJobSetPriorities) AffectsJobSet(queue string, jobSet string) bool

func (UpdateJobSetPriorities) CanBeAppliedBefore

func (a UpdateJobSetPriorities) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobSetPriorities) GetOperation added in v0.15.0

func (a UpdateJobSetPriorities) GetOperation() Operation

func (UpdateJobSetPriorities) Merge

type UpsertExecutorSettings added in v0.15.0

type UpsertExecutorSettings map[string]*ExecutorSettingsUpsert

func (UpsertExecutorSettings) CanBeAppliedBefore added in v0.15.0

func (a UpsertExecutorSettings) CanBeAppliedBefore(b DbOperation) bool

Can be applied before another operation only if it relates to a different executor

func (UpsertExecutorSettings) GetOperation added in v0.15.0

func (a UpsertExecutorSettings) GetOperation() Operation

func (UpsertExecutorSettings) Merge added in v0.15.0

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL