Documentation ¶
Index ¶
- func Migrate(ctx context.Context, db pgxtype.Querier) error
- func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, ...) error
- func WithTestDb(action func(queries *Queries, db *pgxpool.Pool) error) error
- type DBTX
- type Executor
- type ExecutorRepository
- type InsertMarkerParams
- type Job
- type JobRepository
- type JobRunError
- type JobRunLease
- type LegacyQueueRepository
- type Marker
- type PostgresExecutorRepository
- func (r *PostgresExecutorRepository) GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error)
- func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)
- func (r *PostgresExecutorRepository) StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error
- type PostgresJobRepository
- func (r *PostgresJobRepository) CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)
- func (r *PostgresJobRepository) FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error)
- func (r *PostgresJobRepository) FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, ...) ([]*JobRunLease, error)
- func (r *PostgresJobRepository) FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)
- func (r *PostgresJobRepository) FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
- type Queries
- func (q *Queries) CountGroup(ctx context.Context, groupID uuid.UUID) (int64, error)
- func (q *Queries) DeleteOldMarkers(ctx context.Context, cutoff time.Time) error
- func (q *Queries) FindActiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
- func (q *Queries) InsertMarker(ctx context.Context, arg InsertMarkerParams) error
- func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsReturnedById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsRunningById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []uuid.UUID) error
- func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, jobIds []string) error
- func (q *Queries) MarkJobsCancelRequestedBySets(ctx context.Context, jobSets []string) error
- func (q *Queries) MarkJobsCancelledById(ctx context.Context, jobIds []string) error
- func (q *Queries) MarkJobsFailedById(ctx context.Context, jobIds []string) error
- func (q *Queries) MarkJobsSucceededById(ctx context.Context, jobIds []string) error
- func (q *Queries) SelectAllExecutors(ctx context.Context) ([]Executor, error)
- func (q *Queries) SelectAllJobIds(ctx context.Context) ([]string, error)
- func (q *Queries) SelectAllMarkers(ctx context.Context) ([]Marker, error)
- func (q *Queries) SelectAllRunErrors(ctx context.Context) ([]JobRunError, error)
- func (q *Queries) SelectAllRunIds(ctx context.Context) ([]uuid.UUID, error)
- func (q *Queries) SelectExecutorUpdateTimes(ctx context.Context) ([]SelectExecutorUpdateTimesRow, error)
- func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error)
- func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([]Job, error)
- func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([]Run, error)
- func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error)
- func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []uuid.UUID) ([]JobRunError, error)
- func (q *Queries) SelectUpdatedJobs(ctx context.Context, arg SelectUpdatedJobsParams) ([]SelectUpdatedJobsRow, error)
- func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error
- func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error
- func (q *Queries) UpsertExecutor(ctx context.Context, arg UpsertExecutorParams) error
- func (q *Queries) WithTx(tx pgx.Tx) *Queries
- type Queue
- type QueueRepository
- type RedisExecutorRepository
- func (r *RedisExecutorRepository) GetExecutors(_ context.Context) ([]*schedulerobjects.Executor, error)
- func (r *RedisExecutorRepository) GetLastUpdateTimes(_ context.Context) (map[string]time.Time, error)
- func (r *RedisExecutorRepository) StoreExecutor(_ context.Context, executor *schedulerobjects.Executor) error
- type Run
- type SelectExecutorUpdateTimesRow
- type SelectJobsForExecutorParams
- type SelectJobsForExecutorRow
- type SelectNewJobsParams
- type SelectNewRunsForJobsParams
- type SelectNewRunsParams
- type SelectUpdatedJobsParams
- type SelectUpdatedJobsRow
- type UpdateJobPriorityByIdParams
- type UpdateJobPriorityByJobSetParams
- type UpsertExecutorParams
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PruneDb ¶ added in v0.3.47
func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion time.Duration, clock clock.Clock) error
PruneDb removes completed jobs (and related runs and errors) from the database if their `lastUpdateTime` is more than `keepAfterCompletion` in the past. Jobs are deleted in batches across transactions. This means that if this job fails midway through, it still may have deleted some jobs. The function will run until the supplied context is cancelled.
Types ¶
type ExecutorRepository ¶
type ExecutorRepository interface { // GetExecutors returns all known executors, regardless of their last heartbeat time GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error) // GetLastUpdateTimes returns a map of executor name -> last heartbeat time GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error) // StoreExecutor persists the latest executor state StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error }
ExecutorRepository is an interface to be implemented by structs which provide executor information
type InsertMarkerParams ¶ added in v0.3.48
type Job ¶
type Job struct { JobID string `db:"job_id"` JobSet string `db:"job_set"` Queue string `db:"queue"` UserID string `db:"user_id"` Submitted int64 `db:"submitted"` Groups []byte `db:"groups"` Priority int64 `db:"priority"` CancelRequested bool `db:"cancel_requested"` Cancelled bool `db:"cancelled"` CancelByJobsetRequested bool `db:"cancel_by_jobset_requested"` Succeeded bool `db:"succeeded"` Failed bool `db:"failed"` SubmitMessage []byte `db:"submit_message"` SchedulingInfo []byte `db:"scheduling_info"` Serial int64 `db:"serial"` LastModified time.Time `db:"last_modified"` }
func (Job) InTerminalState ¶
InTerminalState returns true if Job is in a terminal state
type JobRepository ¶
type JobRepository interface { // FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively // These updates are guaranteed to be consistent with each other FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error) // FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is // keyed by job run id. Any dbRuns which don't have errors wil be absent from the map. FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error) // CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding // to the provided groupId. This is used by the scheduler to determine if the database represents the state of // pulsar after a given point in time. CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error) // FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active // Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error) // FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while run // in excludedRunIds will be excluded FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error) }
JobRepository is an interface to be implemented by structs which provide job and run information
type JobRunError ¶
type JobRunLease ¶
type LegacyQueueRepository ¶ added in v0.3.47
type LegacyQueueRepository struct {
// contains filtered or unexported fields
}
LegacyQueueRepository is a QueueRepository which is backed by Armada's redis store
func NewLegacyQueueRepository ¶ added in v0.3.47
func NewLegacyQueueRepository(db redis.UniversalClient) *LegacyQueueRepository
func (*LegacyQueueRepository) GetAllQueues ¶ added in v0.3.47
func (r *LegacyQueueRepository) GetAllQueues() ([]*Queue, error)
type PostgresExecutorRepository ¶
type PostgresExecutorRepository struct {
// contains filtered or unexported fields
}
PostgresExecutorRepository is an implementation of ExecutorRepository that stores its state in postgres
func NewPostgresExecutorRepository ¶
func NewPostgresExecutorRepository(db *pgxpool.Pool) *PostgresExecutorRepository
func (*PostgresExecutorRepository) GetExecutors ¶
func (r *PostgresExecutorRepository) GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error)
GetExecutors returns all known executors, regardless of their last heartbeat time
func (*PostgresExecutorRepository) GetLastUpdateTimes ¶
func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)
GetLastUpdateTimes returns a map of executor name -> last heartbeat time
func (*PostgresExecutorRepository) StoreExecutor ¶
func (r *PostgresExecutorRepository) StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error
StoreExecutor persists the latest executor state
type PostgresJobRepository ¶
type PostgresJobRepository struct {
// contains filtered or unexported fields
}
PostgresJobRepository is an implementation of JobRepository that stores its state in postgres
func NewPostgresJobRepository ¶
func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32) *PostgresJobRepository
func (*PostgresJobRepository) CountReceivedPartitions ¶
func (r *PostgresJobRepository) CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)
CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding to the provided groupId. This is used by the scheduler to determine if the database represents the state of pulsar after a given point in time.
func (*PostgresJobRepository) FetchJobRunErrors ¶
func (r *PostgresJobRepository) FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error)
FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is keyed by job run id. Any dbRuns which don't have errors wil be absent from the map.
func (*PostgresJobRepository) FetchJobRunLeases ¶
func (r *PostgresJobRepository) FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error)
FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while run in excludedRunIds will be excluded
func (*PostgresJobRepository) FetchJobUpdates ¶
func (r *PostgresJobRepository) FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)
FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively These updates are guaranteed to be consistent with each other
func (*PostgresJobRepository) FindInactiveRuns ¶
func (r *PostgresJobRepository) FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)
FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled
type Queries ¶
type Queries struct {
// contains filtered or unexported fields
}
func (*Queries) CountGroup ¶
func (*Queries) DeleteOldMarkers ¶ added in v0.3.47
func (*Queries) FindActiveRuns ¶
func (*Queries) InsertMarker ¶ added in v0.3.48
func (q *Queries) InsertMarker(ctx context.Context, arg InsertMarkerParams) error
func (*Queries) MarkJobRunsFailedById ¶
func (*Queries) MarkJobRunsReturnedById ¶ added in v0.3.47
func (*Queries) MarkJobRunsRunningById ¶
func (*Queries) MarkJobRunsSucceededById ¶
func (*Queries) MarkJobsCancelRequestedById ¶ added in v0.3.47
func (*Queries) MarkJobsCancelRequestedBySets ¶ added in v0.3.47
func (*Queries) MarkJobsCancelledById ¶
func (*Queries) MarkJobsFailedById ¶
func (*Queries) MarkJobsSucceededById ¶
func (*Queries) SelectAllExecutors ¶
func (*Queries) SelectAllJobIds ¶ added in v0.3.47
func (*Queries) SelectAllMarkers ¶ added in v0.3.47
func (*Queries) SelectAllRunErrors ¶ added in v0.3.47
func (q *Queries) SelectAllRunErrors(ctx context.Context) ([]JobRunError, error)
func (*Queries) SelectAllRunIds ¶ added in v0.3.47
func (*Queries) SelectExecutorUpdateTimes ¶
func (q *Queries) SelectExecutorUpdateTimes(ctx context.Context) ([]SelectExecutorUpdateTimesRow, error)
func (*Queries) SelectJobsForExecutor ¶
func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error)
func (*Queries) SelectNewJobs ¶
func (*Queries) SelectNewRuns ¶
func (*Queries) SelectNewRunsForJobs ¶
func (*Queries) SelectRunErrorsById ¶
func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []uuid.UUID) ([]JobRunError, error)
Run errors
func (*Queries) SelectUpdatedJobs ¶
func (q *Queries) SelectUpdatedJobs(ctx context.Context, arg SelectUpdatedJobsParams) ([]SelectUpdatedJobsRow, error)
func (*Queries) UpdateJobPriorityById ¶
func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error
func (*Queries) UpdateJobPriorityByJobSet ¶
func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error
func (*Queries) UpsertExecutor ¶
func (q *Queries) UpsertExecutor(ctx context.Context, arg UpsertExecutorParams) error
type QueueRepository ¶
QueueRepository is an interface to be implemented by structs which provide queue information
type RedisExecutorRepository ¶ added in v0.3.49
type RedisExecutorRepository struct {
// contains filtered or unexported fields
}
func NewRedisExecutorRepository ¶ added in v0.3.49
func NewRedisExecutorRepository(db redis.UniversalClient, schedulerName string) *RedisExecutorRepository
func (*RedisExecutorRepository) GetExecutors ¶ added in v0.3.49
func (r *RedisExecutorRepository) GetExecutors(_ context.Context) ([]*schedulerobjects.Executor, error)
func (*RedisExecutorRepository) GetLastUpdateTimes ¶ added in v0.3.49
func (*RedisExecutorRepository) StoreExecutor ¶ added in v0.3.49
func (r *RedisExecutorRepository) StoreExecutor(_ context.Context, executor *schedulerobjects.Executor) error
type Run ¶
type Run struct { RunID uuid.UUID `db:"run_id"` JobID string `db:"job_id"` Created int64 `db:"created"` JobSet string `db:"job_set"` Executor string `db:"executor"` Node string `db:"node"` Cancelled bool `db:"cancelled"` Running bool `db:"running"` Succeeded bool `db:"succeeded"` Failed bool `db:"failed"` Returned bool `db:"returned"` Serial int64 `db:"serial"` LastModified time.Time `db:"last_modified"` }
type SelectNewJobsParams ¶
type SelectNewRunsParams ¶
type SelectUpdatedJobsParams ¶
type SelectUpdatedJobsRow ¶
type SelectUpdatedJobsRow struct { JobID string `db:"job_id"` JobSet string `db:"job_set"` Queue string `db:"queue"` Priority int64 `db:"priority"` Submitted int64 `db:"submitted"` CancelRequested bool `db:"cancel_requested"` CancelByJobsetRequested bool `db:"cancel_by_jobset_requested"` Cancelled bool `db:"cancelled"` Succeeded bool `db:"succeeded"` Failed bool `db:"failed"` SchedulingInfo []byte `db:"scheduling_info"` Serial int64 `db:"serial"` }
func (SelectUpdatedJobsRow) GetSerial ¶
func (row SelectUpdatedJobsRow) GetSerial() int64
GetSerial is needed for the HasSerial interface