scheduleringester

package
v0.3.75-rc-00397cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewInstructionConverter

func NewInstructionConverter(
	metrics *metrics.Metrics,
	priorityClasses map[string]configuration.PriorityClass,
	compressor compress.Compressor,
) ingest.InstructionConverter[*DbOperationsWithMessageIds]

func NewSchedulerDb

func NewSchedulerDb(
	db *pgxpool.Pool,
	metrics *metrics.Metrics,
	initialBackOff time.Duration,
	maxBackOff time.Duration,
	lockTimeout time.Duration,
) ingest.Sink[*DbOperationsWithMessageIds]

func Run

func Run(config Configuration)

Run will create a pipeline that will take Armada event messages from Pulsar and update the Scheduler database accordingly. This pipeline will run until a SIGTERM is received

Types

type Configuration

type Configuration struct {
	// Database configuration
	Postgres configuration.PostgresConfig
	// Metrics configuration
	Metrics configuration.MetricsConfig
	// General Pulsar configuration
	Pulsar configuration.PulsarConfig
	// Map of allowed priority classes by name
	PriorityClasses map[string]configuration.PriorityClass
	// Pulsar subscription name
	SubscriptionName string
	// Number of messages that will be batched together before being inserted into the database
	BatchSize int
	// Maximum time since the last batch before a batch will be inserted into the database
	BatchDuration time.Duration
	// Time for which the pulsar consumer will wait for a new message before retrying
	PulsarReceiveTimeout time.Duration
	// Time for which the pulsar consumer will back off after receiving an error on trying to receive a message
	PulsarBackoffTime time.Duration
}

type DbOperation

type DbOperation interface {
	// a.Merge(b) attempts to merge b into a, creating a single combined op.
	// Returns true if merging was successful.
	// If successful, modifies a in-place.
	// If not successful, neither op is mutated.
	Merge(DbOperation) bool
	// a.CanBeAppliedBefore(b) returns true if a can be placed before b
	// without changing the end result of the overall set of operations.
	CanBeAppliedBefore(DbOperation) bool
}

DbOperation captures a generic batch database operation.

There are 5 types of operations: - Insert jobs (i.e., add new jobs to the schedulerdb). - Insert runs (i.e., add new runs to the schedulerdb). - Job set operations (i.e., modify all jobs and runs in the schedulerdb part of a given job set). - Job operations (i.e., modify particular jobs). - Job run operations (i.e., modify particular runs).

To improve performance, several ops can be merged into a single op if of the same type. To increase the number of ops that can be merged, ops can sometimes be reordered.

Specifically, an op can be applied before another if: - Insert jobs: if prior op doesn't affect the job set. - Insert runs: if prior op doesn't affect the job set or defines the corresponding job. - Job set operations: if not affecting a job defined in prior op. - Job operations: if not affecting a job defined in a prior op. - Job run operations: if not affecting a run defined in a prior op.

In addition, UpdateJobPriorities can never be applied beforee UpdateJobSetPriorities and vice versa, since one may overwrite values set by the other.

func AppendDbOperation

func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation

AppendDbOperation appends a sql operation, possibly merging it with a previous operation if that can be done in such a way that the end result of applying the entire sequence of operations is unchanged.

type DbOperationsWithMessageIds

type DbOperationsWithMessageIds struct {
	Ops        []DbOperation
	MessageIds []pulsar.MessageID
}

DbOperationsWithMessageIds bundles a sequence of schedulerdb ops with the ids of all Pulsar messages that were consumed to produce it.

func (*DbOperationsWithMessageIds) GetMessageIDs

func (d *DbOperationsWithMessageIds) GetMessageIDs() []pulsar.MessageID

type InsertJobRunErrors added in v0.3.47

type InsertJobRunErrors map[uuid.UUID]*schedulerdb.JobRunError

func (InsertJobRunErrors) CanBeAppliedBefore added in v0.3.47

func (a InsertJobRunErrors) CanBeAppliedBefore(_ DbOperation) bool

func (InsertJobRunErrors) Merge added in v0.3.47

func (a InsertJobRunErrors) Merge(b DbOperation) bool

type InsertJobs

type InsertJobs map[string]*schedulerdb.Job

func (InsertJobs) CanBeAppliedBefore

func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool

func (InsertJobs) Merge

func (a InsertJobs) Merge(b DbOperation) bool

type InsertPartitionMarker added in v0.3.48

type InsertPartitionMarker struct {
	// contains filtered or unexported fields
}

func (*InsertPartitionMarker) CanBeAppliedBefore added in v0.3.48

func (a *InsertPartitionMarker) CanBeAppliedBefore(b DbOperation) bool

func (*InsertPartitionMarker) Merge added in v0.3.48

type InsertRuns

type InsertRuns map[uuid.UUID]*JobRunDetails

func (InsertRuns) CanBeAppliedBefore

func (a InsertRuns) CanBeAppliedBefore(b DbOperation) bool

func (InsertRuns) Merge

func (a InsertRuns) Merge(b DbOperation) bool

type InstructionConverter

type InstructionConverter struct {
	// contains filtered or unexported fields
}

func (*InstructionConverter) Convert

type JobQueuedStateUpdate added in v0.3.63

type JobQueuedStateUpdate struct {
	Queued             bool
	QueuedStateVersion int32
}

type JobRunDetails added in v0.3.68

type JobRunDetails struct {
	// contains filtered or unexported fields
}

type JobRunFailed added in v0.3.47

type JobRunFailed struct {
	LeaseReturned bool
	RunAttempted  bool
}

type JobSchedulingInfoUpdate added in v0.3.63

type JobSchedulingInfoUpdate struct {
	JobSchedulingInfo        []byte
	JobSchedulingInfoVersion int32
}

type JobSetCancelAction added in v0.3.68

type JobSetCancelAction struct {
	// contains filtered or unexported fields
}

type JobSetKey added in v0.3.68

type JobSetKey struct {
	// contains filtered or unexported fields
}

type JobSetOperation

type JobSetOperation interface {
	AffectsJobSet(queue string, jobSet string) bool
}

type MarkJobSetsCancelRequested added in v0.3.47

type MarkJobSetsCancelRequested map[JobSetKey]*JobSetCancelAction

func (MarkJobSetsCancelRequested) AffectsJobSet added in v0.3.47

func (a MarkJobSetsCancelRequested) AffectsJobSet(queue string, jobSet string) bool

func (MarkJobSetsCancelRequested) CanBeAppliedBefore added in v0.3.47

func (a MarkJobSetsCancelRequested) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobSetsCancelRequested) Merge added in v0.3.47

type MarkJobsCancelRequested added in v0.3.47

type MarkJobsCancelRequested map[string]bool

func (MarkJobsCancelRequested) CanBeAppliedBefore added in v0.3.47

func (a MarkJobsCancelRequested) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsCancelRequested) Merge added in v0.3.47

type MarkJobsCancelled

type MarkJobsCancelled map[string]bool

func (MarkJobsCancelled) CanBeAppliedBefore

func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsCancelled) Merge

func (a MarkJobsCancelled) Merge(b DbOperation) bool

type MarkJobsFailed

type MarkJobsFailed map[string]bool

func (MarkJobsFailed) CanBeAppliedBefore

func (a MarkJobsFailed) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsFailed) Merge

func (a MarkJobsFailed) Merge(b DbOperation) bool

type MarkJobsSucceeded

type MarkJobsSucceeded map[string]bool

func (MarkJobsSucceeded) CanBeAppliedBefore

func (a MarkJobsSucceeded) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsSucceeded) Merge

func (a MarkJobsSucceeded) Merge(b DbOperation) bool

type MarkRunsFailed

type MarkRunsFailed map[uuid.UUID]*JobRunFailed

func (MarkRunsFailed) CanBeAppliedBefore

func (a MarkRunsFailed) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsFailed) Merge

func (a MarkRunsFailed) Merge(b DbOperation) bool

type MarkRunsRunning

type MarkRunsRunning map[uuid.UUID]bool

func (MarkRunsRunning) CanBeAppliedBefore

func (a MarkRunsRunning) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsRunning) Merge

func (a MarkRunsRunning) Merge(b DbOperation) bool

type MarkRunsSucceeded

type MarkRunsSucceeded map[uuid.UUID]bool

func (MarkRunsSucceeded) CanBeAppliedBefore

func (a MarkRunsSucceeded) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsSucceeded) Merge

func (a MarkRunsSucceeded) Merge(b DbOperation) bool

type SchedulerDb

type SchedulerDb struct {
	// contains filtered or unexported fields
}

SchedulerDb writes DbOperations into postgres.

func (*SchedulerDb) Store

func (s *SchedulerDb) Store(ctx context.Context, instructions *DbOperationsWithMessageIds) error

Store persists all operations in the database. Note that:

  • this function will retry until it either succeeds or a terminal error is encountered
  • this function will take out a postgres lock to ensure that other ingesters are not writing to the database at the same time (for details, see acquireLock())

func (*SchedulerDb) WriteDbOp

func (s *SchedulerDb) WriteDbOp(ctx context.Context, tx pgx.Tx, op DbOperation) error

type UpdateJobPriorities

type UpdateJobPriorities map[string]int64

func (UpdateJobPriorities) CanBeAppliedBefore

func (a UpdateJobPriorities) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobPriorities) Merge

type UpdateJobQueuedState added in v0.3.63

type UpdateJobQueuedState map[string]*JobQueuedStateUpdate

func (UpdateJobQueuedState) CanBeAppliedBefore added in v0.3.63

func (a UpdateJobQueuedState) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobQueuedState) Merge added in v0.3.63

type UpdateJobSchedulingInfo added in v0.3.63

type UpdateJobSchedulingInfo map[string]*JobSchedulingInfoUpdate

func (UpdateJobSchedulingInfo) CanBeAppliedBefore added in v0.3.63

func (a UpdateJobSchedulingInfo) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobSchedulingInfo) Merge added in v0.3.63

type UpdateJobSetPriorities

type UpdateJobSetPriorities map[JobSetKey]int64

func (UpdateJobSetPriorities) AffectsJobSet

func (a UpdateJobSetPriorities) AffectsJobSet(queue string, jobSet string) bool

func (UpdateJobSetPriorities) CanBeAppliedBefore

func (a UpdateJobSetPriorities) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobSetPriorities) Merge

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL