scheduler

package
v0.3.32
Warning

This package is not in the latest version of its module.

Published: Sep 16, 2022 | License: Apache-2.0 | Imports: 32 | Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func CopyProtocolUpsert

func CopyProtocolUpsert(ctx context.Context, tx pgx.Tx, tableName string, schema string, records []interface{}) error

func JobErrorsSchema

func JobErrorsSchema() string

func JobRunAssignmentSchema

func JobRunAssignmentSchema() string

func JobRunErrorsSchema

func JobRunErrorsSchema() string

func JobsSchema

func JobsSchema() string

func LeaderelectionSchema

func LeaderelectionSchema() string

func NamesFromRecord

func NamesFromRecord(x interface{}) []string

NamesFromRecord returns a slice composed of the field names in a struct marked with "db" tags.

For example, if x is an instance of a struct with definition

type Rectangle struct {
	Width int  `db:"width"`
	Height int `db:"height"`
},

it returns ["width", "height"].
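The behaviour described above can be approximated with the standard reflect package. The following is a minimal sketch, not the package's actual implementation: namesFromRecord is a hypothetical stand-in, and skipping fields that carry no "db" tag is an assumption based on the description.

```go
package main

import (
	"fmt"
	"reflect"
)

type Rectangle struct {
	Width  int `db:"width"`
	Height int `db:"height"`
}

// namesFromRecord is a hypothetical stand-in for NamesFromRecord.
// It collects the "db" tag of every struct field that has one.
// x must be a struct value; reflect panics on NumField otherwise.
func namesFromRecord(x interface{}) []string {
	t := reflect.TypeOf(x)
	names := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		if name, ok := t.Field(i).Tag.Lookup("db"); ok {
			names = append(names, name)
		}
	}
	return names
}

func main() {
	fmt.Println(namesFromRecord(Rectangle{})) // [width height]
}
```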

func NamesValuesFromRecord

func NamesValuesFromRecord(x interface{}) ([]string, []interface{})

NamesValuesFromRecord returns a slice composed of the field names and another composed of the corresponding values for fields of a struct marked with "db" tags.

For example, if x is an instance of a struct with definition

type Rectangle struct {
	Width int  `db:"width"`
	Height int `db:"height"`
},

where Width = 10 and Height = 5, it returns ["width", "height"], [10, 5].

This function does not handle pointers to structs, i.e., x must be Rectangle{} and not &Rectangle{}.
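Because pointers are not handled, a caller holding a *Rectangle has to dereference it first. The sketch below shows one way to do that generically with reflect.Indirect. Both namesValues and deref are hypothetical helpers written for illustration, not part of this package, and deref does not guard against nil pointers.

```go
package main

import (
	"fmt"
	"reflect"
)

type Rectangle struct {
	Width  int `db:"width"`
	Height int `db:"height"`
}

// namesValues is a hypothetical stand-in for NamesValuesFromRecord.
// Like the documented function, it expects a struct value, not a pointer.
func namesValues(x interface{}) ([]string, []interface{}) {
	t := reflect.TypeOf(x)
	v := reflect.ValueOf(x)
	var names []string
	var values []interface{}
	for i := 0; i < t.NumField(); i++ {
		if name, ok := t.Field(i).Tag.Lookup("db"); ok {
			names = append(names, name)
			values = append(values, v.Field(i).Interface())
		}
	}
	return names, values
}

// deref returns the value x points to, or x itself if x is not a pointer.
// It panics on a nil pointer.
func deref(x interface{}) interface{} {
	return reflect.Indirect(reflect.ValueOf(x)).Interface()
}

func main() {
	r := &Rectangle{Width: 10, Height: 5}
	names, values := namesValues(deref(r))
	fmt.Println(names, values) // [width height] [10 5]
}
```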

func NodeInfoSchema

func NodeInfoSchema() string

func PulsarSchema

func PulsarSchema() string

func RunsSchema

func RunsSchema() string

func Upsert

func Upsert(ctx context.Context, db *pgxpool.Pool, tableName string, schema string, records []interface{}) error

Upsert is an optimised SQL call for bulk upserts.

For efficiency, this function:

1. Creates an empty temporary SQL table.
2. Inserts all records into the temporary table using the postgres-specific COPY wire protocol.
3. Upserts all records from the temporary table into the target table (as specified by tableName).

The COPY protocol can be faster than repeated inserts for as few as 5 rows; see
https://www.postgresql.org/docs/current/populate.html
https://pkg.go.dev/github.com/jackc/pgx/v4#hdr-Copy_Protocol

The records to write should be structs with fields marked with "db" tags. Field names and values are extracted using the NamesValuesFromRecord function; see its definition for details. The first field is used as the primary key in SQL.

The temporary table is created with the provided schema, which should be of the form

(
	id UUID PRIMARY KEY,
	width int NOT NULL,
	height int NOT NULL
)

I.e., it should omit everything before and after the "(" and ")", respectively.
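To make the temporary-table approach concrete, the sketch below builds SQL text for the create and upsert steps. The COPY step goes over the wire protocol and has no SQL string here. The function name upsertStatements, the temporary table name, and the exact SQL are assumptions made for illustration; the statements Upsert actually issues may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// upsertStatements returns SQL for creating the temporary table and for
// upserting from it into the target table. names must be non-empty and list
// the columns in order; the first column is treated as the primary key.
// Hypothetical helper: the real Upsert may build different SQL.
func upsertStatements(tableName, tmpTable, schema string, names []string) (create, upsert string) {
	create = fmt.Sprintf("CREATE TEMPORARY TABLE %s %s ON COMMIT DROP;", tmpTable, schema)

	// Every column except the primary key is overwritten on conflict.
	updates := make([]string, 0, len(names))
	for _, name := range names[1:] {
		updates = append(updates, fmt.Sprintf("%s = EXCLUDED.%s", name, name))
	}
	action := "DO NOTHING"
	if len(updates) > 0 {
		action = "DO UPDATE SET " + strings.Join(updates, ", ")
	}
	upsert = fmt.Sprintf(
		"INSERT INTO %s SELECT * FROM %s ON CONFLICT (%s) %s;",
		tableName, tmpTable, names[0], action,
	)
	return create, upsert
}

func main() {
	create, upsert := upsertStatements(
		"rectangles", "rectangles_tmp",
		"(id UUID PRIMARY KEY, width int NOT NULL, height int NOT NULL)",
		[]string{"id", "width", "height"},
	)
	fmt.Println(create)
	fmt.Println(upsert)
}
```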

func ValuesFromRecord

func ValuesFromRecord(x interface{}) []interface{}

ValuesFromRecord returns a slice composed of the values of the fields in a struct marked with "db" tags.

For example, if x is an instance of a struct with definition

type Rectangle struct {
	Name string
	Width int  `db:"width"`
	Height int `db:"height"`
},

where Width = 5 and Height = 10, it returns [5, 10].

func WriteDbOp

func WriteDbOp(ctx context.Context, db *pgxpool.Pool, op DbOperation) error

TODO: The caller of this function should keep retrying on transient failures.

Types

type DBTX

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
}

type DbOperation

type DbOperation interface {
	// a.Merge(b) attempts to merge b into a, creating a single combined op.
	// Returns true if merging was successful.
	// If successful, modifies a in-place.
	// If not successful, neither op is mutated.
	Merge(DbOperation) bool
	// a.CanBeAppliedBefore(b) returns true if a can be placed before b
	// without changing the end result of the overall set of operations.
	CanBeAppliedBefore(DbOperation) bool
}

DbOperation captures a generic batch database operation.

There are 5 types of operations:

- Insert jobs (i.e., add new jobs to the db).
- Insert runs (i.e., add new runs to the db).
- Job set operations (i.e., modify all jobs and runs in the db part of a given job set).
- Job operations (i.e., modify particular jobs).
- Job run operations (i.e., modify particular runs).

To improve performance, several ops can be merged into a single op if they are of the same type. To increase the number of ops that can be merged, ops can sometimes be reordered.

Specifically, an op can be applied before another if:

- Insert jobs: if prior op doesn't affect the job set.
- Insert runs: if prior op doesn't affect the job set or defines the corresponding job.
- Job set operations: if not affecting a job defined in prior op.
- Job operations: if not affecting a job defined in a prior op.
- Job run operations: if not affecting a run defined in a prior op.

In addition, UpdateJobPriorities can never be applied before UpdateJobSetPriorities and vice versa, since one may overwrite values set by the other.
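Since most op types in this package are maps, merging two ops of the same type amounts to copying one map into the other. The sketch below illustrates that idea with a generic helper. mergeInMap is a hypothetical name, job ids are plain strings rather than UUIDs, and the interface is reduced to Merge; none of this is claimed to match the package's internals.

```go
package main

import "fmt"

// DbOperation mirrors only the Merge half of the interface above.
type DbOperation interface {
	Merge(DbOperation) bool
}

// Simplified op types keyed by job id (a string here instead of a UUID).
type MarkJobsCancelled map[string]bool
type MarkJobsSucceeded map[string]bool

// mergeInMap is a hypothetical helper. It copies b into a if b has exactly
// the same type as a. On a type mismatch it returns false and mutates nothing.
func mergeInMap[M ~map[K]V, K comparable, V any](a M, b DbOperation) bool {
	other, ok := any(b).(M)
	if !ok {
		return false
	}
	for k, v := range other {
		a[k] = v
	}
	return true
}

func (a MarkJobsCancelled) Merge(b DbOperation) bool { return mergeInMap(a, b) }
func (a MarkJobsSucceeded) Merge(b DbOperation) bool { return mergeInMap(a, b) }

func main() {
	a := MarkJobsCancelled{"job-1": true}
	fmt.Println(a.Merge(MarkJobsCancelled{"job-2": true}), len(a)) // true 2
	fmt.Println(a.Merge(MarkJobsSucceeded{"job-3": true}), len(a)) // false 2
}
```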

func AppendDbOperation

func AppendDbOperation(ops []DbOperation, op DbOperation) []DbOperation

AppendDbOperation appends a sql operation, possibly merging it with a previous operation if that can be done in such a way that the end result of applying the entire sequence of operations is unchanged.
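One way to realise this is to walk backwards through the existing ops, merging into the first op that accepts the new one and giving up as soon as the new op cannot be moved any further forward. The sketch below shows that approach with two simplified op types. appendDbOperation and the reordering rules encoded here are assumptions for illustration, not the package's exact logic.

```go
package main

import "fmt"

type DbOperation interface {
	Merge(DbOperation) bool
	CanBeAppliedBefore(DbOperation) bool
}

// InsertJobs maps job id to job set; MarkJobsCancelled is a set of job ids.
type InsertJobs map[string]string
type MarkJobsCancelled map[string]bool

func (a InsertJobs) Merge(b DbOperation) bool {
	other, ok := b.(InsertJobs)
	if !ok {
		return false
	}
	for k, v := range other {
		a[k] = v
	}
	return true
}

// Assumption for this sketch: an insert never depends on a prior op.
func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool { return true }

func (a MarkJobsCancelled) Merge(b DbOperation) bool {
	other, ok := b.(MarkJobsCancelled)
	if !ok {
		return false
	}
	for k, v := range other {
		a[k] = v
	}
	return true
}

// A cancellation cannot move before the op that defines a job it affects.
func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool {
	if jobs, ok := b.(InsertJobs); ok {
		for id := range a {
			if _, defined := jobs[id]; defined {
				return false
			}
		}
	}
	return true
}

// appendDbOperation is a hypothetical stand-in for AppendDbOperation.
func appendDbOperation(ops []DbOperation, op DbOperation) []DbOperation {
	for i := len(ops) - 1; i >= 0; i-- {
		if ops[i].Merge(op) {
			return ops // op was folded into an earlier op
		}
		if !op.CanBeAppliedBefore(ops[i]) {
			break // op must stay after ops[i]
		}
	}
	return append(ops, op)
}

func main() {
	var ops []DbOperation
	ops = appendDbOperation(ops, InsertJobs{"j1": "set-a"})
	ops = appendDbOperation(ops, MarkJobsCancelled{"j0": true})
	ops = appendDbOperation(ops, InsertJobs{"j2": "set-a"}) // merged into ops[0]
	fmt.Println(len(ops))                                   // 2
}
```

In this sketch the second insert is moved past the cancellation because the cancellation does not touch the inserted job, which lets both inserts be written in one batch.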

func DbOpsFromEventInSequence

func DbOpsFromEventInSequence(sequence *armadaevents.EventSequence, i int) ([]DbOperation, error)

DbOpsFromEventInSequence returns the DbOperations produced from the i-th event in sequence, or nil if the i-th event doesn't correspond to any DbOperation.

type DbOperationsBatcher

type DbOperationsBatcher struct {
	In  chan *eventutil.EventSequenceWithMessageIds
	Out chan *DbOperationsWithMessageIds
	// Max number of Pulsar messages to include with each batch.
	MaxMessages int
	// Max time interval between batches.
	MaxInterval time.Duration
}

DbOperationsBatcher is a service that consumes event sequences and produces optimised sequences of database operations.

TODO: Move into dbopsfromevents.go
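The MaxMessages and MaxInterval fields suggest a batch is emitted when either a size limit or a time limit is reached. The sketch below illustrates that pattern on plain integers. The function names are hypothetical, termination is driven by closing the input channel rather than by a context, and the real batcher's behaviour may differ.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups values from in into slices of at most maxMessages items and
// flushes a partial batch whenever maxInterval elapses.
// It closes out once in has been closed and drained.
func batch(in <-chan int, out chan<- []int, maxMessages int, maxInterval time.Duration) {
	defer close(out)
	ticker := time.NewTicker(maxInterval)
	defer ticker.Stop()

	var current []int
	flush := func() {
		if len(current) > 0 {
			out <- current
			current = nil
		}
	}
	for {
		select {
		case v, ok := <-in:
			if !ok {
				flush()
				return
			}
			current = append(current, v)
			if len(current) >= maxMessages {
				flush()
				ticker.Reset(maxInterval)
			}
		case <-ticker.C:
			flush()
		}
	}
}

// runBatches feeds values through batch with an interval long enough that
// only the size limit triggers, and collects the resulting batches.
func runBatches(values []int, maxMessages int) [][]int {
	in := make(chan int)
	out := make(chan []int)
	go batch(in, out, maxMessages, time.Hour)
	go func() {
		for _, v := range values {
			in <- v
		}
		close(in)
	}()
	var batches [][]int
	for b := range out {
		batches = append(batches, b)
	}
	return batches
}

func main() {
	fmt.Println(runBatches([]int{1, 2, 3, 4, 5}, 2)) // [[1 2] [3 4] [5]]
}
```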

func (*DbOperationsBatcher) Run

func (srv *DbOperationsBatcher) Run(ctx context.Context) error

type DbOperationsWithMessageIds

type DbOperationsWithMessageIds struct {
	Ops        []DbOperation
	MessageIds []pulsar.MessageID
}

DbOperationsWithMessageIds bundles a sequence of db ops with the ids of all Pulsar messages that were consumed to produce it.

type DbOpsWriter

type DbOpsWriter struct {
	In chan *DbOperationsWithMessageIds
	// Connection to the postgres database.
	Db *pgxpool.Pool
	// Pulsar consumer used to ack messages.
	Consumer pulsar.Consumer
	// Optional logger.
	// If not provided, the default logrus logger is used.
	Logger *logrus.Entry
}

Service that writes DbOperations into postgres.

func (*DbOpsWriter) Run

func (srv *DbOpsWriter) Run(ctx context.Context) error

type ErrLostLeadership

type ErrLostLeadership struct {
	Id uuid.UUID
}

func (*ErrLostLeadership) Error

func (err *ErrLostLeadership) Error() string

type ExecutorApi

type ExecutorApi struct {
	api.UnimplementedAggregatedQueueServer
	// Embed the Redis-backed event server.
	// Provides methods for dual-publishing events etc.
	*server.EventServer
	Producer       pulsar.Producer
	Db             *pgxpool.Pool
	MaxJobsPerCall int32
}

func (*ExecutorApi) RenewLease

func (srv *ExecutorApi) RenewLease(ctx context.Context, req *api.RenewLeaseRequest) (*api.IdList, error)

func (*ExecutorApi) ReportDone

func (srv *ExecutorApi) ReportDone(ctx context.Context, req *api.IdList) (*api.IdList, error)

func (*ExecutorApi) ReportUsage

func (srv *ExecutorApi) ReportUsage(ctx context.Context, req *api.ClusterUsageReport) (*types.Empty, error)

TODO: Does nothing for now.

func (*ExecutorApi) ReturnLease

func (srv *ExecutorApi) ReturnLease(ctx context.Context, req *api.ReturnLeaseRequest) (*types.Empty, error)

func (*ExecutorApi) StreamingLeaseJobs

func (srv *ExecutorApi) StreamingLeaseJobs(stream api.AggregatedQueue_StreamingLeaseJobsServer) error

type Ingester

type Ingester struct {
	// Used to setup a Pulsar consumer.
	PulsarClient    pulsar.Client
	ConsumerOptions pulsar.ConsumerOptions

	// Connection to the postgres database.
	Db *pgxpool.Pool
	// Write to postgres at least this often (assuming there are records to write).
	MaxWriteInterval time.Duration
	// Max number of DbOperation to batch.
	MaxDbOps int
	// Optional logger.
	// If not provided, the default logrus logger is used.
	Logger *logrus.Entry
	// contains filtered or unexported fields
}

Service that updates the scheduler database.

At a high level, the ingester:

1. Reads messages from Pulsar and creates DbOperations from them.
2. Collects db ops for up to some amount of time.
3. Applies the db ops to postgres in batch.
4. Acks the Pulsar messages that were read to produce the ops.
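The ordering of the last two steps matters: messages are acked after the write. The sketch below shows that ordering with stand-in types. The batch type, process function, and the write and ack callbacks are hypothetical, and the claim that a failed write should leave messages unacked is an assumption about intent rather than documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// batch is a simplified stand-in for DbOperationsWithMessageIds.
type batch struct {
	ops        []string
	messageIds []int
}

// errWrite simulates a failed database write.
var errWrite = errors.New("write failed")

// process applies a batch and acks its messages only if the write succeeded.
// On a failed write nothing is acked, so the messages remain unacknowledged.
func process(b batch, write func([]string) error, ack func(int)) error {
	if err := write(b.ops); err != nil {
		return err
	}
	for _, id := range b.messageIds {
		ack(id)
	}
	return nil
}

func main() {
	var acked []int
	ack := func(id int) { acked = append(acked, id) }
	b := batch{ops: []string{"insert job"}, messageIds: []int{1, 2}}

	err := process(b, func([]string) error { return errWrite }, ack)
	fmt.Println(err, acked) // write failed []

	err = process(b, func([]string) error { return nil }, ack)
	fmt.Println(err, acked) // <nil> [1 2]
}
```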

func (*Ingester) Run

func (srv *Ingester) Run(ctx context.Context) error

Run the ingester until experiencing an unrecoverable error.

type InsertJobErrors

type InsertJobErrors map[int32]*JobError

func (InsertJobErrors) CanBeAppliedBefore

func (a InsertJobErrors) CanBeAppliedBefore(b DbOperation) bool

func (InsertJobErrors) Merge

func (a InsertJobErrors) Merge(b DbOperation) bool

type InsertJobRunErrors

type InsertJobRunErrors map[int32]*JobRunError

func (InsertJobRunErrors) CanBeAppliedBefore

func (a InsertJobRunErrors) CanBeAppliedBefore(b DbOperation) bool

func (InsertJobRunErrors) Merge

func (a InsertJobRunErrors) Merge(b DbOperation) bool

type InsertJobs

type InsertJobs map[uuid.UUID]*Job

Db operations (implements DbOperation).

func (InsertJobs) CanBeAppliedBefore

func (a InsertJobs) CanBeAppliedBefore(b DbOperation) bool

func (InsertJobs) Merge

func (a InsertJobs) Merge(b DbOperation) bool

type InsertRunAssignments

type InsertRunAssignments map[uuid.UUID]*JobRunAssignment

func (InsertRunAssignments) CanBeAppliedBefore

func (a InsertRunAssignments) CanBeAppliedBefore(b DbOperation) bool

func (InsertRunAssignments) Merge

func (a InsertRunAssignments) Merge(b DbOperation) bool

type InsertRuns

type InsertRuns map[uuid.UUID]*Run

func (InsertRuns) CanBeAppliedBefore

func (a InsertRuns) CanBeAppliedBefore(b DbOperation) bool

func (InsertRuns) Merge

func (a InsertRuns) Merge(b DbOperation) bool

type Job

type Job struct {
	JobID          uuid.UUID `db:"job_id"`
	JobSet         string    `db:"job_set"`
	Queue          string    `db:"queue"`
	UserID         string    `db:"user_id"`
	Groups         []string  `db:"groups"`
	Priority       int64     `db:"priority"`
	Cancelled      bool      `db:"cancelled"`
	Succeeded      bool      `db:"succeeded"`
	Failed         bool      `db:"failed"`
	SubmitMessage  []byte    `db:"submit_message"`
	SchedulingInfo []byte    `db:"scheduling_info"`
	Serial         int64     `db:"serial"`
	LastModified   time.Time `db:"last_modified"`
}

type JobError

type JobError struct {
	ID           int32     `db:"id"`
	JobID        uuid.UUID `db:"job_id"`
	Error        []byte    `db:"error"`
	Terminal     bool      `db:"terminal"`
	Serial       int64     `db:"serial"`
	LastModified time.Time `db:"last_modified"`
}

type JobRunAssignment

type JobRunAssignment struct {
	RunID        uuid.UUID `db:"run_id"`
	Assignment   []byte    `db:"assignment"`
	Serial       int64     `db:"serial"`
	LastModified time.Time `db:"last_modified"`
}

type JobRunError

type JobRunError struct {
	ID           int32     `db:"id"`
	RunID        uuid.UUID `db:"run_id"`
	Error        []byte    `db:"error"`
	Terminal     bool      `db:"terminal"`
	Serial       int64     `db:"serial"`
	LastModified time.Time `db:"last_modified"`
}

type JobRuns

type JobRuns struct {
	// Any runs associated with this job that have not terminated.
	// Map from run id to run.
	ActiveRuns map[uuid.UUID]*Run
	// Any runs associated with this job for which the ingester has received
	// a terminal event (i.e., succeeded, failed, or cancelled).
	// Map from run id to run.
	InactiveRuns map[uuid.UUID]*Run
}

JobRuns is a collection of all runs associated with a particular job.
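The two maps imply that a run moves from ActiveRuns to InactiveRuns once a terminal event arrives. The sketch below illustrates that bookkeeping with simplified types. markTerminal is a hypothetical method that does not exist in this package, and run ids are strings rather than UUIDs.

```go
package main

import "fmt"

// Run is reduced to the field needed for this sketch.
type Run struct {
	RunID string
}

type JobRuns struct {
	ActiveRuns   map[string]*Run
	InactiveRuns map[string]*Run
}

func NewJobRuns() *JobRuns {
	return &JobRuns{
		ActiveRuns:   make(map[string]*Run),
		InactiveRuns: make(map[string]*Run),
	}
}

// markTerminal is hypothetical. It moves a run from ActiveRuns to
// InactiveRuns and reports whether the run was active.
func (jr *JobRuns) markTerminal(runID string) bool {
	run, ok := jr.ActiveRuns[runID]
	if !ok {
		return false
	}
	delete(jr.ActiveRuns, runID)
	jr.InactiveRuns[runID] = run
	return true
}

func main() {
	runs := NewJobRuns()
	runs.ActiveRuns["run-1"] = &Run{RunID: "run-1"}
	fmt.Println(runs.markTerminal("run-1"), len(runs.ActiveRuns), len(runs.InactiveRuns)) // true 0 1
	fmt.Println(runs.markTerminal("run-1"))                                               // false
}
```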

func NewJobRuns

func NewJobRuns() *JobRuns

type JobSetOperation

type JobSetOperation interface {
	AffectsJobSet(string) bool
}

type LeaderElection

type LeaderElection struct {
	Db *pgxpool.Pool
	// Id uniquely identifying this leader in this epoch.
	Id uuid.UUID
	// Interval between database operations.
	// Used both while leader and follower.
	Interval time.Duration
	// Time after which this instance may try to become leader
	// if there has been no activity from the current leader.
	Timeout time.Duration
	// Optional logger.
	// If not provided, the default logrus logger is used.
	Logger *logrus.Entry
}

LeaderElection is used by the scheduler to ensure only one instance is creating leases at a time.

The leader election process is based on timeouts. The leader continually writes its id into postgres. Non-leader instances monitor the last_modified value set by postgres for that row. If the time since last_modified exceeds some threshold, a non-leader tries to become leader. It does so by marking its own id as the leader in a transaction.
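The timeout rule can be sketched in memory as follows. The leaderRow type and both functions are hypothetical; in the real system the check and the write would happen inside a single postgres transaction, which this sketch does not model.

```go
package main

import (
	"fmt"
	"time"
)

// leaderRow is a simplified stand-in for the row holding the current leader.
type leaderRow struct {
	ID           string
	LastModified time.Time
}

// leaderTimedOut reports whether the leader has been silent for longer than timeout.
func leaderTimedOut(row leaderRow, now time.Time, timeout time.Duration) bool {
	return now.Sub(row.LastModified) > timeout
}

// tryBecomeLeader writes id into the row if id is already the leader (a
// heartbeat) or if the current leader has timed out (a takeover).
// It reports whether id is the leader afterwards.
func tryBecomeLeader(row *leaderRow, id string, now time.Time, timeout time.Duration) bool {
	if row.ID != id && !leaderTimedOut(*row, now, timeout) {
		return false
	}
	row.ID = id
	row.LastModified = now
	return true
}

func main() {
	start := time.Date(2022, 9, 16, 0, 0, 0, 0, time.UTC)
	row := &leaderRow{ID: "a", LastModified: start}
	timeout := 10 * time.Second

	fmt.Println(tryBecomeLeader(row, "b", start.Add(5*time.Second), timeout))  // false
	fmt.Println(tryBecomeLeader(row, "b", start.Add(11*time.Second), timeout)) // true
	fmt.Println(row.ID)                                                        // b
}
```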

func NewLeaderElection

func NewLeaderElection(db *pgxpool.Pool) *LeaderElection

func (*LeaderElection) BecomeLeader

func (srv *LeaderElection) BecomeLeader(ctx context.Context) error

BecomeLeader returns once this instance has become the leader.

Checks once per interval if the current leader has gone missing and, if so, tries to become leader.

func (*LeaderElection) StayLeader

func (srv *LeaderElection) StayLeader(ctx context.Context) error

StayLeader writes into postgres every interval to ensure no other replica attempts to become leader.

type Leaderelection

type Leaderelection struct {
	ID           uuid.UUID `db:"id"`
	IsLeader     bool      `db:"is_leader"`
	Serial       int64     `db:"serial"`
	LastModified time.Time `db:"last_modified"`
}

type MarkJobSetsCancelled

type MarkJobSetsCancelled map[string]bool

func (MarkJobSetsCancelled) AffectsJobSet

func (a MarkJobSetsCancelled) AffectsJobSet(jobSet string) bool

func (MarkJobSetsCancelled) CanBeAppliedBefore

func (a MarkJobSetsCancelled) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobSetsCancelled) Merge

func (a MarkJobSetsCancelled) Merge(b DbOperation) bool

type MarkJobsCancelled

type MarkJobsCancelled map[uuid.UUID]bool

func (MarkJobsCancelled) CanBeAppliedBefore

func (a MarkJobsCancelled) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsCancelled) Merge

func (a MarkJobsCancelled) Merge(b DbOperation) bool

type MarkJobsFailed

type MarkJobsFailed map[uuid.UUID]bool

func (MarkJobsFailed) CanBeAppliedBefore

func (a MarkJobsFailed) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsFailed) Merge

func (a MarkJobsFailed) Merge(b DbOperation) bool

type MarkJobsSucceeded

type MarkJobsSucceeded map[uuid.UUID]bool

func (MarkJobsSucceeded) CanBeAppliedBefore

func (a MarkJobsSucceeded) CanBeAppliedBefore(b DbOperation) bool

func (MarkJobsSucceeded) Merge

func (a MarkJobsSucceeded) Merge(b DbOperation) bool

type MarkRunsAsSentByExecutorAndJobIdParams

type MarkRunsAsSentByExecutorAndJobIdParams struct {
	Executor string      `db:"executor"`
	JobIds   []uuid.UUID `db:"job_ids"`
}

type MarkRunsFailed

type MarkRunsFailed map[uuid.UUID]bool

func (MarkRunsFailed) CanBeAppliedBefore

func (a MarkRunsFailed) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsFailed) Merge

func (a MarkRunsFailed) Merge(b DbOperation) bool

type MarkRunsRunning

type MarkRunsRunning map[uuid.UUID]bool

func (MarkRunsRunning) CanBeAppliedBefore

func (a MarkRunsRunning) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsRunning) Merge

func (a MarkRunsRunning) Merge(b DbOperation) bool

type MarkRunsSucceeded

type MarkRunsSucceeded map[uuid.UUID]bool

func (MarkRunsSucceeded) CanBeAppliedBefore

func (a MarkRunsSucceeded) CanBeAppliedBefore(b DbOperation) bool

func (MarkRunsSucceeded) Merge

func (a MarkRunsSucceeded) Merge(b DbOperation) bool

type Nodeinfo

type Nodeinfo struct {
	ExecutorNodeName string    `db:"executor_node_name"`
	NodeName         string    `db:"node_name"`
	Executor         string    `db:"executor"`
	Message          []byte    `db:"message"`
	Serial           int64     `db:"serial"`
	LastModified     time.Time `db:"last_modified"`
}

type Pulsar

type Pulsar struct {
	Topic        string `db:"topic"`
	LedgerID     int64  `db:"ledger_id"`
	EntryID      int64  `db:"entry_id"`
	BatchIdx     int32  `db:"batch_idx"`
	PartitionIdx int32  `db:"partition_idx"`
}

type Queries

type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) GetTopicMessageIds

func (q *Queries) GetTopicMessageIds(ctx context.Context, topic string) ([]Pulsar, error)

func (*Queries) MarkJobCancelledById

func (q *Queries) MarkJobCancelledById(ctx context.Context, jobID uuid.UUID) error

Job cancellation

func (*Queries) MarkJobFailedById

func (q *Queries) MarkJobFailedById(ctx context.Context, jobID uuid.UUID) error

Job failed

func (*Queries) MarkJobRunCancelledByJobId

func (q *Queries) MarkJobRunCancelledByJobId(ctx context.Context, jobID uuid.UUID) error

Job run cancelled

func (*Queries) MarkJobRunFailedById

func (q *Queries) MarkJobRunFailedById(ctx context.Context, runID uuid.UUID) error

Job run failed

func (*Queries) MarkJobRunRunningById

func (q *Queries) MarkJobRunRunningById(ctx context.Context, runID uuid.UUID) error

Job run running

func (*Queries) MarkJobRunSucceededById

func (q *Queries) MarkJobRunSucceededById(ctx context.Context, runID uuid.UUID) error

Job run succeeded

func (*Queries) MarkJobRunsCancelledByJobId

func (q *Queries) MarkJobRunsCancelledByJobId(ctx context.Context, jobIds []uuid.UUID) error

func (*Queries) MarkJobRunsCancelledBySet

func (q *Queries) MarkJobRunsCancelledBySet(ctx context.Context, jobSet string) error

func (*Queries) MarkJobRunsCancelledBySets

func (q *Queries) MarkJobRunsCancelledBySets(ctx context.Context, jobSets []string) error

func (*Queries) MarkJobRunsFailedById

func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsRunningById

func (q *Queries) MarkJobRunsRunningById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsSucceededById

func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobSucceededById

func (q *Queries) MarkJobSucceededById(ctx context.Context, jobID uuid.UUID) error

Job succeeded

func (*Queries) MarkJobsCancelledById

func (q *Queries) MarkJobsCancelledById(ctx context.Context, jobIds []uuid.UUID) error

func (*Queries) MarkJobsCancelledBySet

func (q *Queries) MarkJobsCancelledBySet(ctx context.Context, jobSet string) error

func (*Queries) MarkJobsCancelledBySets

func (q *Queries) MarkJobsCancelledBySets(ctx context.Context, jobSets []string) error

func (*Queries) MarkJobsFailedById

func (q *Queries) MarkJobsFailedById(ctx context.Context, jobIds []uuid.UUID) error

func (*Queries) MarkJobsFailedBySet

func (q *Queries) MarkJobsFailedBySet(ctx context.Context, jobSet string) error

func (*Queries) MarkJobsFailedBySets

func (q *Queries) MarkJobsFailedBySets(ctx context.Context, jobSets []string) error

func (*Queries) MarkJobsSucceededById

func (q *Queries) MarkJobsSucceededById(ctx context.Context, jobIds []uuid.UUID) error

func (*Queries) MarkJobsSucceededBySet

func (q *Queries) MarkJobsSucceededBySet(ctx context.Context, jobSet string) error

func (*Queries) MarkJobsSucceededBySets

func (q *Queries) MarkJobsSucceededBySets(ctx context.Context, jobSets []string) error

func (*Queries) MarkRunsAsSent

func (q *Queries) MarkRunsAsSent(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkRunsAsSentByExecutorAndJobId

func (q *Queries) MarkRunsAsSentByExecutorAndJobId(ctx context.Context, arg MarkRunsAsSentByExecutorAndJobIdParams) error

func (*Queries) SelectJobErrorsById

func (q *Queries) SelectJobErrorsById(ctx context.Context, jobID uuid.UUID) ([]JobError, error)

Job errors

func (*Queries) SelectJobsFromIds

func (q *Queries) SelectJobsFromIds(ctx context.Context, jobIds []uuid.UUID) ([]Job, error)

Jobs

func (*Queries) SelectLeader

func (q *Queries) SelectLeader(ctx context.Context) (Leaderelection, error)

Leader election. Returns the row associated with the current leader. If, due to a bug, several rows are marked as leader, returns the most recently modified one.

func (*Queries) SelectNewActiveJobs

func (q *Queries) SelectNewActiveJobs(ctx context.Context, serial int64) ([]Job, error)

func (*Queries) SelectNewJobErrors

func (q *Queries) SelectNewJobErrors(ctx context.Context, serial int64) ([]JobError, error)

func (*Queries) SelectNewJobs

func (q *Queries) SelectNewJobs(ctx context.Context, serial int64) ([]Job, error)

func (*Queries) SelectNewNodeInfo

func (q *Queries) SelectNewNodeInfo(ctx context.Context, serial int64) ([]Nodeinfo, error)

NodeInfo

func (*Queries) SelectNewRunAssignments

func (q *Queries) SelectNewRunAssignments(ctx context.Context, serial int64) ([]JobRunAssignment, error)

Job run assignments

func (*Queries) SelectNewRunErrors

func (q *Queries) SelectNewRunErrors(ctx context.Context, serial int64) ([]JobRunError, error)

func (*Queries) SelectNewRunsForExecutorWithLimit

func (q *Queries) SelectNewRunsForExecutorWithLimit(ctx context.Context, arg SelectNewRunsForExecutorWithLimitParams) ([]Run, error)

func (*Queries) SelectNewRunsForJobs

func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error)

func (*Queries) SelectQueueJobSetFromId

func (q *Queries) SelectQueueJobSetFromId(ctx context.Context, jobID uuid.UUID) (SelectQueueJobSetFromIdRow, error)

func (*Queries) SelectQueueJobSetFromIds

func (q *Queries) SelectQueueJobSetFromIds(ctx context.Context, jobIds []uuid.UUID) ([]SelectQueueJobSetFromIdsRow, error)

func (*Queries) SelectReplicaById

func (q *Queries) SelectReplicaById(ctx context.Context, id uuid.UUID) (Leaderelection, error)

func (*Queries) SelectRunErrorsById

func (q *Queries) SelectRunErrorsById(ctx context.Context, runID uuid.UUID) ([]JobRunError, error)

Run errors

func (*Queries) SelectRunsFromExecutorAndJobs

func (q *Queries) SelectRunsFromExecutorAndJobs(ctx context.Context, arg SelectRunsFromExecutorAndJobsParams) ([]Run, error)

func (*Queries) SelectUnsentRunsForExecutor

func (q *Queries) SelectUnsentRunsForExecutor(ctx context.Context, executor string) ([]Run, error)

Runs

func (*Queries) UpdateJobPriorityById

func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error

Job priority

func (*Queries) UpdateJobPriorityByJobSet

func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error

func (*Queries) UpsertMessageId

func (q *Queries) UpsertMessageId(ctx context.Context, arg UpsertMessageIdParams) error

func (*Queries) WithTx

func (q *Queries) WithTx(tx pgx.Tx) *Queries

type Queue

type Queue struct {
	Name   string  `db:"name"`
	Weight float64 `db:"weight"`
}

type Run

type Run struct {
	RunID          uuid.UUID `db:"run_id"`
	JobID          uuid.UUID `db:"job_id"`
	JobSet         string    `db:"job_set"`
	Executor       string    `db:"executor"`
	SentToExecutor bool      `db:"sent_to_executor"`
	Cancelled      bool      `db:"cancelled"`
	Running        bool      `db:"running"`
	Succeeded      bool      `db:"succeeded"`
	Failed         bool      `db:"failed"`
	Serial         int64     `db:"serial"`
	LastModified   time.Time `db:"last_modified"`
}

type Scheduler

type Scheduler struct {
	Producer pulsar.Producer
	Db       *pgxpool.Pool
	// Map from job id to job struct.
	// Contains all jobs the scheduler is aware of that have not terminated,
	// i.e., succeeded, failed, or been cancelled.
	// Jobs are added when read from postgres.
	// Jobs are removed when they terminate.
	ActiveJobs map[uuid.UUID]*Job
	// Map from job id to a collection of all runs associated with that job.
	RunsByJobId map[uuid.UUID]*JobRuns
	// Ids of jobs that have not yet been scheduled.
	// The queue consists of all active jobs that don't have at least one active run associated with them.
	QueuedJobIds []uuid.UUID
	// List of worker nodes available across all clusters.
	Nodes map[string]*Nodeinfo
	// Map from executor name to the last time we heard from that executor.
	Executors map[string]time.Time
	// Amount of time after which an executor is assumed to be unavailable
	// if no updates have been received.
	ExecutorAliveDuration time.Duration
	// Each write into postgres is marked with an increasing serial number.
	// For each table, we store the largest number seen so far.
	// When reading from postgres, we select all rows with a serial number larger than what we've seen so far.
	//
	// The first record written to postgres for each table will have serial 1,
	// so these should be initialised to 0.
	JobsSerial               int64
	RunsSerial               int64
	JobRunsAssignmentsSerial int64
	JobErrorsSerial          int64
	JobRunErrorsSerial       int64
	NodesSerial              int64
	// Optional logger.
	// If not provided, the default logrus logger is used.
	Logger *logrus.Entry
}

Scheduler implements a trivial scheduling algorithm. It's here just to test that the scheduling subsystem as a whole is working.
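The serial fields of Scheduler support incremental reads: each read asks only for rows whose serial is larger than the largest one seen so far, then advances the stored serial. The sketch below shows that pattern against an in-memory slice. The row type and selectNew function are hypothetical stand-ins for a table and for the SelectNew* queries.

```go
package main

import "fmt"

// row is a simplified stand-in for a database row carrying a serial number.
type row struct {
	Serial int64
	Name   string
}

// selectNew returns all rows with a serial greater than last, together with
// the largest serial seen, which the caller stores for the next read.
func selectNew(table []row, last int64) ([]row, int64) {
	var fresh []row
	maxSerial := last
	for _, r := range table {
		if r.Serial > last {
			fresh = append(fresh, r)
			if r.Serial > maxSerial {
				maxSerial = r.Serial
			}
		}
	}
	return fresh, maxSerial
}

func main() {
	table := []row{{1, "job-a"}, {2, "job-b"}}
	var jobsSerial int64 // initialised to 0 since the first record has serial 1

	fresh, jobsSerial := selectNew(table, jobsSerial)
	fmt.Println(len(fresh), jobsSerial) // 2 2

	table = append(table, row{3, "job-c"})
	fresh, jobsSerial = selectNew(table, jobsSerial)
	fmt.Println(len(fresh), jobsSerial) // 1 3
}
```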

func NewScheduler

func NewScheduler(producer pulsar.Producer, db *pgxpool.Pool) *Scheduler

func (*Scheduler) Run

func (srv *Scheduler) Run(ctx context.Context) error

type SelectNewRunsForExecutorWithLimitParams

type SelectNewRunsForExecutorWithLimitParams struct {
	Executor string `db:"executor"`
	Limit    int32  `db:"limit"`
}

type SelectNewRunsForJobsParams

type SelectNewRunsForJobsParams struct {
	Serial int64       `db:"serial"`
	JobIds []uuid.UUID `db:"job_ids"`
}

type SelectQueueJobSetFromIdRow

type SelectQueueJobSetFromIdRow struct {
	JobID  uuid.UUID `db:"job_id"`
	Queue  string    `db:"queue"`
	JobSet string    `db:"job_set"`
}

type SelectQueueJobSetFromIdsRow

type SelectQueueJobSetFromIdsRow struct {
	JobID  uuid.UUID `db:"job_id"`
	Queue  string    `db:"queue"`
	JobSet string    `db:"job_set"`
}

type SelectRunsFromExecutorAndJobsParams

type SelectRunsFromExecutorAndJobsParams struct {
	Executor string      `db:"executor"`
	JobIds   []uuid.UUID `db:"job_ids"`
}

type UpdateJobPriorities

type UpdateJobPriorities map[uuid.UUID]int64

func (UpdateJobPriorities) CanBeAppliedBefore

func (a UpdateJobPriorities) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobPriorities) Merge

func (a UpdateJobPriorities) Merge(b DbOperation) bool

type UpdateJobPriorityByIdParams

type UpdateJobPriorityByIdParams struct {
	Priority int64     `db:"priority"`
	JobID    uuid.UUID `db:"job_id"`
}

type UpdateJobPriorityByJobSetParams

type UpdateJobPriorityByJobSetParams struct {
	Priority int64  `db:"priority"`
	JobSet   string `db:"job_set"`
}

type UpdateJobSetPriorities

type UpdateJobSetPriorities map[string]int64

func (UpdateJobSetPriorities) AffectsJobSet

func (a UpdateJobSetPriorities) AffectsJobSet(jobSet string) bool

func (UpdateJobSetPriorities) CanBeAppliedBefore

func (a UpdateJobSetPriorities) CanBeAppliedBefore(b DbOperation) bool

func (UpdateJobSetPriorities) Merge

func (a UpdateJobSetPriorities) Merge(b DbOperation) bool

type UpsertMessageIdParams

type UpsertMessageIdParams struct {
	Topic        string `db:"topic"`
	LedgerID     int64  `db:"ledger_id"`
	EntryID      int64  `db:"entry_id"`
	BatchIdx     int32  `db:"batch_idx"`
	PartitionIdx int32  `db:"partition_idx"`
}
