Documentation
¶
Index ¶
- func CopyProtocolUpsert(ctx context.Context, tx pgx.Tx, tableName string, schema string, ...) error
- func JobErrorsSchema() string
- func JobRunAssignmentSchema() string
- func JobRunErrorsSchema() string
- func JobsSchema() string
- func LeaderelectionSchema() string
- func NamesFromRecord(x interface{}) []string
- func NamesValuesFromRecord(x interface{}) ([]string, []interface{})
- func NodeInfoSchema() string
- func PulsarSchema() string
- func RunsSchema() string
- func Upsert(ctx context.Context, db *pgxpool.Pool, tableName string, schema string, ...) error
- func ValuesFromRecord(x interface{}) []interface{}
- func WriteDbOp(ctx context.Context, db *pgxpool.Pool, op DbOperation) error
- type DBTX
- type DbOperation
- type DbOperationsBatcher
- type DbOperationsWithMessageIds
- type DbOpsWriter
- type ErrLostLeadership
- type ExecutorApi
- func (srv *ExecutorApi) RenewLease(ctx context.Context, req *api.RenewLeaseRequest) (*api.IdList, error)
- func (srv *ExecutorApi) ReportDone(ctx context.Context, req *api.IdList) (*api.IdList, error)
- func (srv *ExecutorApi) ReportUsage(ctx context.Context, req *api.ClusterUsageReport) (*types.Empty, error)
- func (srv *ExecutorApi) ReturnLease(ctx context.Context, req *api.ReturnLeaseRequest) (*types.Empty, error)
- func (srv *ExecutorApi) StreamingLeaseJobs(stream api.AggregatedQueue_StreamingLeaseJobsServer) error
- type Ingester
- type InsertJobErrors
- type InsertJobRunErrors
- type InsertJobs
- type InsertRunAssignments
- type InsertRuns
- type Job
- type JobError
- type JobRunAssignment
- type JobRunError
- type JobRuns
- type JobSetOperation
- type LeaderElection
- type Leaderelection
- type MarkJobSetsCancelled
- type MarkJobsCancelled
- type MarkJobsFailed
- type MarkJobsSucceeded
- type MarkRunsAsSentByExecutorAndJobIdParams
- type MarkRunsFailed
- type MarkRunsRunning
- type MarkRunsSucceeded
- type Nodeinfo
- type Pulsar
- type Queries
- func (q *Queries) GetTopicMessageIds(ctx context.Context, topic string) ([]Pulsar, error)
- func (q *Queries) MarkJobCancelledById(ctx context.Context, jobID uuid.UUID) error
- func (q *Queries) MarkJobFailedById(ctx context.Context, jobID uuid.UUID) error
- func (q *Queries) MarkJobRunCancelledByJobId(ctx context.Context, jobID uuid.UUID) error
- func (q *Queries) MarkJobRunFailedById(ctx context.Context, runID uuid.UUID) error
- func (q *Queries) MarkJobRunRunningById(ctx context.Context, runID uuid.UUID) error
- func (q *Queries) MarkJobRunSucceededById(ctx context.Context, runID uuid.UUID) error
- func (q *Queries) MarkJobRunsCancelledByJobId(ctx context.Context, jobIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsCancelledBySet(ctx context.Context, jobSet string) error
- func (q *Queries) MarkJobRunsCancelledBySets(ctx context.Context, jobSets []string) error
- func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsRunningById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobSucceededById(ctx context.Context, jobID uuid.UUID) error
- func (q *Queries) MarkJobsCancelledById(ctx context.Context, jobIds []uuid.UUID) error
- func (q *Queries) MarkJobsCancelledBySet(ctx context.Context, jobSet string) error
- func (q *Queries) MarkJobsCancelledBySets(ctx context.Context, jobSets []string) error
- func (q *Queries) MarkJobsFailedById(ctx context.Context, jobIds []uuid.UUID) error
- func (q *Queries) MarkJobsFailedBySet(ctx context.Context, jobSet string) error
- func (q *Queries) MarkJobsFailedBySets(ctx context.Context, jobSets []string) error
- func (q *Queries) MarkJobsSucceededById(ctx context.Context, jobIds []uuid.UUID) error
- func (q *Queries) MarkJobsSucceededBySet(ctx context.Context, jobSet string) error
- func (q *Queries) MarkJobsSucceededBySets(ctx context.Context, jobSets []string) error
- func (q *Queries) MarkRunsAsSent(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkRunsAsSentByExecutorAndJobId(ctx context.Context, arg MarkRunsAsSentByExecutorAndJobIdParams) error
- func (q *Queries) SelectJobErrorsById(ctx context.Context, jobID uuid.UUID) ([]JobError, error)
- func (q *Queries) SelectJobsFromIds(ctx context.Context, jobIds []uuid.UUID) ([]Job, error)
- func (q *Queries) SelectLeader(ctx context.Context) (Leaderelection, error)
- func (q *Queries) SelectNewActiveJobs(ctx context.Context, serial int64) ([]Job, error)
- func (q *Queries) SelectNewJobErrors(ctx context.Context, serial int64) ([]JobError, error)
- func (q *Queries) SelectNewJobs(ctx context.Context, serial int64) ([]Job, error)
- func (q *Queries) SelectNewNodeInfo(ctx context.Context, serial int64) ([]Nodeinfo, error)
- func (q *Queries) SelectNewRunAssignments(ctx context.Context, serial int64) ([]JobRunAssignment, error)
- func (q *Queries) SelectNewRunErrors(ctx context.Context, serial int64) ([]JobRunError, error)
- func (q *Queries) SelectNewRunsForExecutorWithLimit(ctx context.Context, arg SelectNewRunsForExecutorWithLimitParams) ([]Run, error)
- func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error)
- func (q *Queries) SelectQueueJobSetFromId(ctx context.Context, jobID uuid.UUID) (SelectQueueJobSetFromIdRow, error)
- func (q *Queries) SelectQueueJobSetFromIds(ctx context.Context, jobIds []uuid.UUID) ([]SelectQueueJobSetFromIdsRow, error)
- func (q *Queries) SelectReplicaById(ctx context.Context, id uuid.UUID) (Leaderelection, error)
- func (q *Queries) SelectRunErrorsById(ctx context.Context, runID uuid.UUID) ([]JobRunError, error)
- func (q *Queries) SelectRunsFromExecutorAndJobs(ctx context.Context, arg SelectRunsFromExecutorAndJobsParams) ([]Run, error)
- func (q *Queries) SelectUnsentRunsForExecutor(ctx context.Context, executor string) ([]Run, error)
- func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error
- func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error
- func (q *Queries) UpsertMessageId(ctx context.Context, arg UpsertMessageIdParams) error
- func (q *Queries) WithTx(tx pgx.Tx) *Queries
- type Queue
- type Run
- type Scheduler
- type SelectNewRunsForExecutorWithLimitParams
- type SelectNewRunsForJobsParams
- type SelectQueueJobSetFromIdRow
- type SelectQueueJobSetFromIdsRow
- type SelectRunsFromExecutorAndJobsParams
- type UpdateJobPriorities
- type UpdateJobPriorityByIdParams
- type UpdateJobPriorityByJobSetParams
- type UpdateJobSetPriorities
- type UpsertMessageIdParams
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CopyProtocolUpsert ¶
func JobErrorsSchema ¶
func JobErrorsSchema() string
func JobRunAssignmentSchema ¶
func JobRunAssignmentSchema() string
func JobRunErrorsSchema ¶
func JobRunErrorsSchema() string
func JobsSchema ¶
func JobsSchema() string
func LeaderelectionSchema ¶
func LeaderelectionSchema() string
func NamesFromRecord ¶
func NamesFromRecord(x interface{}) []string
NamesFromRecord returns a slice composed of the names given by the "db" tags on the fields of a struct.
For example, if x is an instance of a struct with definition
type Rectangle struct { Width int `db:"width"`; Height int `db:"height"` },
it returns ["width", "height"].
func NamesValuesFromRecord ¶
func NamesValuesFromRecord(x interface{}) ([]string, []interface{})
NamesValuesFromRecord returns two slices for the fields of a struct marked with "db" tags: one composed of the names given by those tags, and another composed of the corresponding field values.
For example, if x is an instance of a struct with definition
type Rectangle struct { Width int `db:"width"`; Height int `db:"height"` },
where Width = 10 and Height = 5, it returns ["width", "height"], [10, 5].
This function does not handle pointers to structs, i.e., x must be Rectangle{} and not &Rectangle{}.
func NodeInfoSchema ¶
func NodeInfoSchema() string
func PulsarSchema ¶
func PulsarSchema() string
func RunsSchema ¶
func RunsSchema() string
func Upsert ¶
func Upsert(ctx context.Context, db *pgxpool.Pool, tableName string, schema string, records []interface{}) error
Upsert is an optimised SQL call for bulk upserts.
For efficiency, this function: 1. Creates an empty temporary SQL table. 2. Inserts all records into the temporary table using the postgres-specific COPY wire protocol. 3. Upserts all records from the temporary table into the target table (as specified by tableName).
The COPY protocol can be faster than repeated inserts for as few as 5 rows; see https://www.postgresql.org/docs/current/populate.html and https://pkg.go.dev/github.com/jackc/pgx/v4#hdr-Copy_Protocol
The records to write should be structs with fields marked with "db" tags. Field names and values are extracted using the NamesValuesFromRecord function; see its definition for details. The first field is used as the primary key in SQL.
The temporary table is created with the provided schema, which should be of the form (
id UUID PRIMARY KEY, width int NOT NULL, height int NOT NULL
) I.e., it should omit everything before and after the "(" and ")", respectively.
func ValuesFromRecord ¶
func ValuesFromRecord(x interface{}) []interface{}
ValuesFromRecord returns a slice composed of the values of the fields in a struct marked with "db" tags.
For example, if x is an instance of a struct with definition
type Rectangle struct { Name string; Width int `db:"width"`; Height int `db:"height"` },
where Width = 5 and Height = 10, it returns [5, 10]; Name is skipped because it has no "db" tag.
Types ¶
type DbOperation ¶
type DbOperation interface { // a.Merge(b) attempts to merge b into a, creating a single combined op. // Returns true if merging was successful. // If successful, modifies a in-place. // If not successful, neither op is mutated. Merge(DbOperation) bool // a.CanBeAppliedBefore(b) returns true if a can be placed before b // without changing the end result of the overall set of operations. CanBeAppliedBefore(DbOperation) bool }
DbOperation captures a generic batch database operation.
There are 5 types of operations: - Insert jobs (i.e., add new jobs to the db). - Insert runs (i.e., add new runs to the db). - Job set operations (i.e., modify all jobs and runs in the db that are part of a given job set). - Job operations (i.e., modify particular jobs). - Job run operations (i.e., modify particular runs).
To improve performance, several ops can be merged into a single op if they are of the same type. To increase the number of ops that can be merged, ops can sometimes be reordered.
Specifically, an op can be applied before another if: - Insert jobs: if prior op doesn't affect the job set. - Insert runs: if prior op doesn't affect the job set or defines the corresponding job. - Job set operations: if not affecting a job defined in prior op. - Job operations: if not affecting a job defined in a prior op. - Job run operations: if not affecting a run defined in a prior op.
In addition, UpdateJobPriorities can never be applied before UpdateJobSetPriorities and vice versa, since one may overwrite values set by the other.
func AppendDbOperation ¶
func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation
AppendDbOperation appends a sql operation, possibly merging it with a previous operation if that can be done in such a way that the end result of applying the entire sequence of operations is unchanged.
func DbOpsFromEventInSequence ¶
func DbOpsFromEventInSequence(sequence *armadaevents.EventSequence, i int) ([]DbOperation, error)
DbOpsFromEventInSequence returns the DbOperations produced from the i-th event in sequence, or nil if the i-th event doesn't correspond to any DbOperation.
type DbOperationsBatcher ¶
type DbOperationsBatcher struct { In chan *eventutil.EventSequenceWithMessageIds Out chan *DbOperationsWithMessageIds // Max number of Pulsar messages to include with each batch. MaxMessages int // Max time interval between batches. MaxInterval time.Duration }
DbOperationsBatcher is a service that consumes event sequences and produces optimised sequences of database operations.
TODO: Move into dbopsfromevents.go
func NewDbOperationsBatcher ¶
func NewDbOperationsBatcher(in chan *eventutil.EventSequenceWithMessageIds) *DbOperationsBatcher
type DbOperationsWithMessageIds ¶
type DbOperationsWithMessageIds struct { Ops []DbOperation MessageIds []pulsar.MessageID }
DbOperationsWithMessageIds bundles a sequence of db ops with the ids of all Pulsar messages that were consumed to produce it.
type DbOpsWriter ¶
type DbOpsWriter struct { In chan *DbOperationsWithMessageIds // Connection to the postgres database. Db *pgxpool.Pool // Pulsar consumer used to ack messages. Consumer pulsar.Consumer // Optional logger. // If not provided, the default logrus logger is used. Logger *logrus.Entry }
DbOpsWriter is a service that writes DbOperations into postgres.
type ErrLostLeadership ¶
func (*ErrLostLeadership) Error ¶
func (err *ErrLostLeadership) Error() string
type ExecutorApi ¶
type ExecutorApi struct { api.UnimplementedAggregatedQueueServer // Embed the Redis-backed event server. // Provides methods for dual-publishing events etc. *server.EventServer Producer pulsar.Producer Db *pgxpool.Pool MaxJobsPerCall int32 }
func (*ExecutorApi) RenewLease ¶
func (srv *ExecutorApi) RenewLease(ctx context.Context, req *api.RenewLeaseRequest) (*api.IdList, error)
func (*ExecutorApi) ReportDone ¶
func (*ExecutorApi) ReportUsage ¶
func (srv *ExecutorApi) ReportUsage(ctx context.Context, req *api.ClusterUsageReport) (*types.Empty, error)
TODO: Does nothing for now.
func (*ExecutorApi) ReturnLease ¶
func (srv *ExecutorApi) ReturnLease(ctx context.Context, req *api.ReturnLeaseRequest) (*types.Empty, error)
func (*ExecutorApi) StreamingLeaseJobs ¶
func (srv *ExecutorApi) StreamingLeaseJobs(stream api.AggregatedQueue_StreamingLeaseJobsServer) error
type Ingester ¶
type Ingester struct { // Used to setup a Pulsar consumer. PulsarClient pulsar.Client ConsumerOptions pulsar.ConsumerOptions // Connection to the postgres database. Db *pgxpool.Pool // Write to postgres at least this often (assuming there are records to write). MaxWriteInterval time.Duration // Max number of DbOperation to batch. MaxDbOps int // Optional logger. // If not provided, the default logrus logger is used. Logger *logrus.Entry // contains filtered or unexported fields }
Ingester is a service that updates the scheduler database.
At a high level, the ingester: 1. Reads messages from Pulsar and uses them to create DbOperations. 2. Collects db ops for up to some amount of time. 3. Applies the db ops to postgres in batch. 4. Acks the Pulsar messages that were read to produce the ops.
type InsertJobErrors ¶
func (InsertJobErrors) CanBeAppliedBefore ¶
func (a InsertJobErrors) CanBeAppliedBefore(b DbOperation) bool
func (InsertJobErrors) Merge ¶
func (a InsertJobErrors) Merge(b DbOperation) bool
type InsertJobRunErrors ¶
type InsertJobRunErrors map[int32]*JobRunError
func (InsertJobRunErrors) CanBeAppliedBefore ¶
func (a InsertJobRunErrors) CanBeAppliedBefore(b DbOperation) bool
func (InsertJobRunErrors) Merge ¶
func (a InsertJobRunErrors) Merge(b DbOperation) bool
type InsertJobs ¶
Db operations (implements DbOperation).
func (InsertJobs) CanBeAppliedBefore ¶
func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool
func (InsertJobs) Merge ¶
func (a InsertJobs) Merge(b DbOperation) bool
type InsertRunAssignments ¶
type InsertRunAssignments map[uuid.UUID]*JobRunAssignment
func (InsertRunAssignments) CanBeAppliedBefore ¶
func (a InsertRunAssignments) CanBeAppliedBefore(b DbOperation) bool
func (InsertRunAssignments) Merge ¶
func (a InsertRunAssignments) Merge(b DbOperation) bool
type InsertRuns ¶
func (InsertRuns) CanBeAppliedBefore ¶
func (a InsertRuns) CanBeAppliedBefore(b DbOperation) bool
func (InsertRuns) Merge ¶
func (a InsertRuns) Merge(b DbOperation) bool
type Job ¶
type Job struct { JobID uuid.UUID `db:"job_id"` JobSet string `db:"job_set"` Queue string `db:"queue"` UserID string `db:"user_id"` Groups []string `db:"groups"` Priority int64 `db:"priority"` Cancelled bool `db:"cancelled"` Succeeded bool `db:"succeeded"` Failed bool `db:"failed"` SubmitMessage []byte `db:"submit_message"` SchedulingInfo []byte `db:"scheduling_info"` Serial int64 `db:"serial"` LastModified time.Time `db:"last_modified"` }
type JobRunAssignment ¶
type JobRunError ¶
type JobRuns ¶
type JobRuns struct { // Any runs associated with this job that have not terminated. // Map from run id to run. ActiveRuns map[uuid.UUID]*Run // Any runs associated with this job for which the ingester has received // a terminal event (i.e., succeeded, failed, or cancelled). // Map from run id to run. InactiveRuns map[uuid.UUID]*Run }
JobRuns is a collection of all runs associated with a particular job.
func NewJobRuns ¶
func NewJobRuns() *JobRuns
type JobSetOperation ¶
type LeaderElection ¶
type LeaderElection struct { Db *pgxpool.Pool // Id uniquely identifying this leader in this epoch. Id uuid.UUID // Interval between database operations. // Used both while leader and follower. Interval time.Duration // Time after which this instance may try to become leader // if there has been no activity from the current leader. Timeout time.Duration // Optional logger. // If not provided, the default logrus logger is used. Logger *logrus.Entry }
LeaderElection is used by the scheduler to ensure only one instance is creating leases at a time.
The leader election process is based on timeouts. The leader continually writes its id into postgres. Non-leader instances monitor the last_modified value set by postgres for that row. If the time elapsed since last_modified exceeds some threshold, a non-leader tries to become leader. It does so by marking its own id as the leader within a transaction.
func NewLeaderElection ¶
func NewLeaderElection(db *pgxpool.Pool) *LeaderElection
func (*LeaderElection) BecomeLeader ¶
func (srv *LeaderElection) BecomeLeader(ctx context.Context) error
BecomeLeader returns once this instance has become the leader.
Checks once per interval if the current leader has gone missing and, if so, tries to become leader.
func (*LeaderElection) StayLeader ¶
func (srv *LeaderElection) StayLeader(ctx context.Context) error
StayLeader writes into postgres every interval to ensure no other replica attempts to become leader.
type Leaderelection ¶
type MarkJobSetsCancelled ¶
func (MarkJobSetsCancelled) AffectsJobSet ¶
func (a MarkJobSetsCancelled) AffectsJobSet(jobSet string) bool
func (MarkJobSetsCancelled) CanBeAppliedBefore ¶
func (a MarkJobSetsCancelled) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobSetsCancelled) Merge ¶
func (a MarkJobSetsCancelled) Merge(b DbOperation) bool
type MarkJobsCancelled ¶
func (MarkJobsCancelled) CanBeAppliedBefore ¶
func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsCancelled) Merge ¶
func (a MarkJobsCancelled) Merge(b DbOperation) bool
type MarkJobsFailed ¶
func (MarkJobsFailed) CanBeAppliedBefore ¶
func (a MarkJobsFailed) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsFailed) Merge ¶
func (a MarkJobsFailed) Merge(b DbOperation) bool
type MarkJobsSucceeded ¶
func (MarkJobsSucceeded) CanBeAppliedBefore ¶
func (a MarkJobsSucceeded) CanBeAppliedBefore(b DbOperation) bool
func (MarkJobsSucceeded) Merge ¶
func (a MarkJobsSucceeded) Merge(b DbOperation) bool
type MarkRunsFailed ¶
func (MarkRunsFailed) CanBeAppliedBefore ¶
func (a MarkRunsFailed) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsFailed) Merge ¶
func (a MarkRunsFailed) Merge(b DbOperation) bool
type MarkRunsRunning ¶
func (MarkRunsRunning) CanBeAppliedBefore ¶
func (a MarkRunsRunning) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsRunning) Merge ¶
func (a MarkRunsRunning) Merge(b DbOperation) bool
type MarkRunsSucceeded ¶
func (MarkRunsSucceeded) CanBeAppliedBefore ¶
func (a MarkRunsSucceeded) CanBeAppliedBefore(b DbOperation) bool
func (MarkRunsSucceeded) Merge ¶
func (a MarkRunsSucceeded) Merge(b DbOperation) bool
type Queries ¶
type Queries struct {
// contains filtered or unexported fields
}
func (*Queries) GetTopicMessageIds ¶
func (*Queries) MarkJobCancelledById ¶
Job cancellation
func (*Queries) MarkJobFailedById ¶
Job failed
func (*Queries) MarkJobRunCancelledByJobId ¶
Job run cancelled
func (*Queries) MarkJobRunFailedById ¶
Job run failed
func (*Queries) MarkJobRunRunningById ¶
Job run running
func (*Queries) MarkJobRunSucceededById ¶
Job run succeeded
func (*Queries) MarkJobRunsCancelledByJobId ¶
func (*Queries) MarkJobRunsCancelledBySet ¶
func (*Queries) MarkJobRunsCancelledBySets ¶
func (*Queries) MarkJobRunsFailedById ¶
func (*Queries) MarkJobRunsRunningById ¶
func (*Queries) MarkJobRunsSucceededById ¶
func (*Queries) MarkJobSucceededById ¶
Job succeeded
func (*Queries) MarkJobsCancelledById ¶
func (*Queries) MarkJobsCancelledBySet ¶
func (*Queries) MarkJobsCancelledBySets ¶
func (*Queries) MarkJobsFailedById ¶
func (*Queries) MarkJobsFailedBySet ¶
func (*Queries) MarkJobsFailedBySets ¶
func (*Queries) MarkJobsSucceededById ¶
func (*Queries) MarkJobsSucceededBySet ¶
func (*Queries) MarkJobsSucceededBySets ¶
func (*Queries) MarkRunsAsSent ¶
func (*Queries) MarkRunsAsSentByExecutorAndJobId ¶
func (q *Queries) MarkRunsAsSentByExecutorAndJobId(ctx context.Context, arg MarkRunsAsSentByExecutorAndJobIdParams) error
func (*Queries) SelectJobErrorsById ¶
Job errors
func (*Queries) SelectJobsFromIds ¶
Jobs
func (*Queries) SelectLeader ¶
func (q *Queries) SelectLeader(ctx context.Context) (Leaderelection, error)
Leader election: SelectLeader returns the row associated with the current leader. If, due to a bug, several rows are marked as leader, it returns the most recently modified one.
func (*Queries) SelectNewActiveJobs ¶
func (*Queries) SelectNewJobErrors ¶
func (*Queries) SelectNewJobs ¶
func (*Queries) SelectNewNodeInfo ¶
NodeInfo
func (*Queries) SelectNewRunAssignments ¶
func (q *Queries) SelectNewRunAssignments(ctx context.Context, serial int64) ([]JobRunAssignment, error)
Job run assignments
func (*Queries) SelectNewRunErrors ¶
func (*Queries) SelectNewRunsForExecutorWithLimit ¶
func (*Queries) SelectNewRunsForJobs ¶
func (*Queries) SelectQueueJobSetFromId ¶
func (*Queries) SelectQueueJobSetFromIds ¶
func (*Queries) SelectReplicaById ¶
func (*Queries) SelectRunErrorsById ¶
Run errors
func (*Queries) SelectRunsFromExecutorAndJobs ¶
func (*Queries) SelectUnsentRunsForExecutor ¶
Runs
func (*Queries) UpdateJobPriorityById ¶
func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error
Job priority
func (*Queries) UpdateJobPriorityByJobSet ¶
func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error
func (*Queries) UpsertMessageId ¶
func (q *Queries) UpsertMessageId(ctx context.Context, arg UpsertMessageIdParams) error
type Run ¶
type Run struct { RunID uuid.UUID `db:"run_id"` JobID uuid.UUID `db:"job_id"` JobSet string `db:"job_set"` Executor string `db:"executor"` SentToExecutor bool `db:"sent_to_executor"` Cancelled bool `db:"cancelled"` Running bool `db:"running"` Succeeded bool `db:"succeeded"` Failed bool `db:"failed"` Serial int64 `db:"serial"` LastModified time.Time `db:"last_modified"` }
type Scheduler ¶
type Scheduler struct { Producer pulsar.Producer Db *pgxpool.Pool // Map from job id to job struct. // Contains all jobs the scheduler is aware of that have not terminated, // i.e., succeeded, failed, or been cancelled. // Jobs are added when read from postgres. // Jobs are removed when they terminate. ActiveJobs map[uuid.UUID]*Job // Map from job id to a collection of all runs associated with that job. RunsByJobId map[uuid.UUID]*JobRuns // The queue consists of all active jobs that don't have at least one active run associated with it. // Ids of jobs that have not yet been scheduled. QueuedJobIds []uuid.UUID // Map of worker nodes available across all clusters. Nodes map[string]*Nodeinfo // Map from executor name to the last time we heard from that executor. Executors map[string]time.Time // Amount of time after which an executor is assumed to be unavailable // if no updates have been received. ExecutorAliveDuration time.Duration // Each write into postgres is marked with an increasing serial number. // For each table, we store the largest number seen so far. // When reading from postgres, we select all rows with a serial number larger than what we've seen so far. // // The first record written to postgres for each table will have serial 1, // so these should be initialised to 0. JobsSerial int64 RunsSerial int64 JobRunsAssignmentsSerial int64 JobErrorsSerial int64 JobRunErrorsSerial int64 NodesSerial int64 // Optional logger. // If not provided, the default logrus logger is used. Logger *logrus.Entry }
Scheduler implements a trivial scheduling algorithm. It's here just to test that the scheduling subsystem as a whole is working.
type UpdateJobPriorities ¶
func (UpdateJobPriorities) CanBeAppliedBefore ¶
func (a UpdateJobPriorities) CanBeAppliedBefore(b DbOperation) bool
func (UpdateJobPriorities) Merge ¶
func (a UpdateJobPriorities) Merge(b DbOperation) bool
type UpdateJobSetPriorities ¶
func (UpdateJobSetPriorities) AffectsJobSet ¶
func (a UpdateJobSetPriorities) AffectsJobSet(jobSet string) bool
func (UpdateJobSetPriorities) CanBeAppliedBefore ¶
func (a UpdateJobSetPriorities) CanBeAppliedBefore(b DbOperation) bool
func (UpdateJobSetPriorities) Merge ¶
func (a UpdateJobSetPriorities) Merge(b DbOperation) bool