Documentation ¶
Index ¶
- func NewInstructionConverter(metrics *metrics.Metrics, ...) ingest.InstructionConverter[*DbOperationsWithMessageIds]
- func NewSchedulerDb(db *pgxpool.Pool, metrics *metrics.Metrics, initialBackOff time.Duration, ...) ingest.Sink[*DbOperationsWithMessageIds]
- func Run(config Configuration)
- type Configuration
- type DbOperation
- type DbOperationsWithMessageIds
- type InsertJobRunErrors
- type InsertJobs
- type InsertPartitionMarker
- type InsertRuns
- type InstructionConverter
- type JobRunFailed
- type JobSetOperation
- type MarkJobSetsCancelRequested
- type MarkJobsCancelRequested
- type MarkJobsCancelled
- type MarkJobsFailed
- type MarkJobsSucceeded
- type MarkRunsFailed
- type MarkRunsRunning
- type MarkRunsSucceeded
- type SchedulerDb
- type UpdateJobPriorities
- type UpdateJobSetPriorities
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewInstructionConverter ¶
func NewInstructionConverter( metrics *metrics.Metrics, priorityClasses map[string]configuration.PriorityClass, compressor compress.Compressor, ) ingest.InstructionConverter[*DbOperationsWithMessageIds]
func NewSchedulerDb ¶
func Run ¶
func Run(config Configuration)
Run will create a pipeline that will take Armada event messages from Pulsar and update the Scheduler database accordingly. This pipeline will run until a SIGTERM is received
Types ¶
type Configuration ¶
type Configuration struct { // Database configuration Postgres configuration.PostgresConfig // Metrics configuration Metrics configuration.MetricsConfig // General Pulsar configuration Pulsar configuration.PulsarConfig // Map of allowed priority classes by name PriorityClasses map[string]configuration.PriorityClass // Pulsar subscription name SubscriptionName string // Number of messages that will be batched together before being inserted into the database BatchSize int // Maximum time since the last batch before a batch will be inserted into the database BatchDuration time.Duration // Time for which the pulsar consumer will wait for a new message before retrying PulsarReceiveTimeout time.Duration // Time for which the pulsar consumer will back off after receiving an error on trying to receive a message PulsarBackoffTime time.Duration }
type DbOperation ¶
type DbOperation interface { // a.Merge(b) attempts to merge b into a, creating a single combined op. // Returns true if merging was successful. // If successful, modifies a in-place. // If not successful, neither op is mutated. Merge(DbOperation) bool // a.CanBeAppliedBefore(b) returns true if a can be placed before b // without changing the end result of the overall set of operations. CanBeAppliedBefore(DbOperation) bool }
DbOperation captures a generic batch database operation.
There are 5 types of operations: - Insert jobs (i.e., add new jobs to the schedulerdb). - Insert runs (i.e., add new runs to the schedulerdb). - Job set operations (i.e., modify all jobs and runs in the schedulerdb part of a given job set). - Job operations (i.e., modify particular jobs). - Job run operations (i.e., modify particular runs).
To improve performance, several ops can be merged into a single op if of the same type. To increase the number of ops that can be merged, ops can sometimes be reordered.
Specifically, an op can be applied before another if: - Insert jobs: if prior op doesn't affect the job set. - Insert runs: if prior op doesn't affect the job set or defines the corresponding job. - Job set operations: if not affecting a job defined in prior op. - Job operations: if not affecting a job defined in a prior op. - Job run operations: if not affecting a run defined in a prior op.
In addition, UpdateJobPriorities can never be applied beforee UpdateJobSetPriorities and vice versa, since one may overwrite values set by the other.
func AppendDbOperation ¶
func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation
AppendDbOperation appends a sql operation, possibly merging it with a previous operation if that can be done in such a way that the end result of applying the entire sequence of operations is unchanged.
type DbOperationsWithMessageIds ¶
type DbOperationsWithMessageIds struct { Ops []DbOperation MessageIds []pulsar.MessageID }
DbOperationsWithMessageIds bundles a sequence of schedulerdb ops with the ids of all Pulsar messages that were consumed to produce it.
func (*DbOperationsWithMessageIds) GetMessageIDs ¶
func (d *DbOperationsWithMessageIds) GetMessageIDs() []pulsar.MessageID
type InsertJobRunErrors ¶ added in v0.3.47
type InsertJobRunErrors map[uuid.UUID]*schedulerdb.JobRunError
func (InsertJobRunErrors) CanBeAppliedBefore ¶ added in v0.3.47
func (a InsertJobRunErrors) CanBeAppliedBefore(_ DbOperation) bool
func (InsertJobRunErrors) Merge ¶ added in v0.3.47
func (a InsertJobRunErrors) Merge(b DbOperation) bool
type InsertJobs ¶
type InsertJobs map[string]*schedulerdb.Job
func (InsertJobs) CanBeAppliedBefore ¶
func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool
func (InsertJobs) Merge ¶
func (a InsertJobs) Merge(b DbOperation) bool
type InsertPartitionMarker ¶ added in v0.3.48
type InsertPartitionMarker struct {
// contains filtered or unexported fields
}
func (*InsertPartitionMarker) CanBeAppliedBefore ¶ added in v0.3.48
func (a *InsertPartitionMarker) CanBeAppliedBefore(b DbOperation) bool
func (*InsertPartitionMarker) Merge ¶ added in v0.3.48
func (a *InsertPartitionMarker) Merge(b DbOperation) bool
type InsertRuns ¶
type InsertRuns map[uuid.UUID]*schedulerdb.Run
func (InsertRuns) CanBeAppliedBefore ¶
func (a InsertRuns) CanBeAppliedBefore(b DbOperation) bool
func (InsertRuns) Merge ¶
func (a InsertRuns) Merge(b DbOperation) bool
type InstructionConverter ¶
type InstructionConverter struct {
// contains filtered or unexported fields
}
func (*InstructionConverter) Convert ¶
func (c *InstructionConverter) Convert(_ context.Context, sequencesWithIds *ingest.EventSequencesWithIds) *DbOperationsWithMessageIds
type JobRunFailed ¶ added in v0.3.47
type JobRunFailed struct {
LeaseReturned bool
}
type JobSetOperation ¶
type MarkJobSetsCancelRequested ¶ added in v0.3.47
func (MarkJobSetsCancelRequested) AffectsJobSet ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) AffectsJobSet(jobSet string) bool
func (MarkJobSetsCancelRequested) CanBeAppliedBefore ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobSetsCancelRequested) Merge ¶ added in v0.3.47
func (a MarkJobSetsCancelRequested) Merge(b DbOperation) bool
type MarkJobsCancelRequested ¶ added in v0.3.47
func (MarkJobsCancelRequested) CanBeAppliedBefore ¶ added in v0.3.47
func (a MarkJobsCancelRequested) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsCancelRequested) Merge ¶ added in v0.3.47
func (a MarkJobsCancelRequested) Merge(b DbOperation) bool
type MarkJobsCancelled ¶
func (MarkJobsCancelled) CanBeAppliedBefore ¶
func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsCancelled) Merge ¶
func (a MarkJobsCancelled) Merge(b DbOperation) bool
type MarkJobsFailed ¶
func (MarkJobsFailed) CanBeAppliedBefore ¶
func (a MarkJobsFailed) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsFailed) Merge ¶
func (a MarkJobsFailed) Merge(b DbOperation) bool
type MarkJobsSucceeded ¶
func (MarkJobsSucceeded) CanBeAppliedBefore ¶
func (a MarkJobsSucceeded) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsSucceeded) Merge ¶
func (a MarkJobsSucceeded) Merge(b DbOperation) bool
type MarkRunsFailed ¶
type MarkRunsFailed map[uuid.UUID]*JobRunFailed
func (MarkRunsFailed) CanBeAppliedBefore ¶
func (a MarkRunsFailed) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsFailed) Merge ¶
func (a MarkRunsFailed) Merge(b DbOperation) bool
type MarkRunsRunning ¶
func (MarkRunsRunning) CanBeAppliedBefore ¶
func (a MarkRunsRunning) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsRunning) Merge ¶
func (a MarkRunsRunning) Merge(b DbOperation) bool
type MarkRunsSucceeded ¶
func (MarkRunsSucceeded) CanBeAppliedBefore ¶
func (a MarkRunsSucceeded) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsSucceeded) Merge ¶
func (a MarkRunsSucceeded) Merge(b DbOperation) bool
type SchedulerDb ¶
type SchedulerDb struct {
// contains filtered or unexported fields
}
SchedulerDb writes DbOperations into postgres.
func (*SchedulerDb) Store ¶
func (s *SchedulerDb) Store(ctx context.Context, instructions *DbOperationsWithMessageIds) error
func (*SchedulerDb) WriteDbOp ¶
func (s *SchedulerDb) WriteDbOp(ctx context.Context, op DbOperation) error
type UpdateJobPriorities ¶
func (UpdateJobPriorities) CanBeAppliedBefore ¶
func (a UpdateJobPriorities) CanBeAppliedBefore(b DbOperation) bool
func (UpdateJobPriorities) Merge ¶
func (a UpdateJobPriorities) Merge(b DbOperation) bool
type UpdateJobSetPriorities ¶
func (UpdateJobSetPriorities) AffectsJobSet ¶
func (a UpdateJobSetPriorities) AffectsJobSet(jobSet string) bool
func (UpdateJobSetPriorities) CanBeAppliedBefore ¶
func (a UpdateJobSetPriorities) CanBeAppliedBefore(b DbOperation) bool
func (UpdateJobSetPriorities) Merge ¶
func (a UpdateJobSetPriorities) Merge(b DbOperation) bool