database

package
v0.3.79-rc-9a1ab98 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 5, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(ctx context.Context, db pgxtype.Querier) error

func PruneDb added in v0.3.47

func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion time.Duration, clock clock.Clock) error

PruneDb removes completed jobs (and related runs and errors) from the database if their `lastUpdateTime` is more than `keepAfterCompletion` in the past. Jobs are deleted in batches across transactions, so if PruneDb fails midway through, it may already have deleted some jobs. The function will run until the supplied context is cancelled.

func WithTestDb

func WithTestDb(action func(queries *Queries, db *pgxpool.Pool) error) error

Types

type DBTX

// DBTX is the query surface that New requires of its db argument: the Exec,
// Query and QueryRow methods declared below.
// NOTE(review): presumably satisfied by both pooled connections and
// transactions (Queries.WithTx accepts a pgx.Tx) — confirm.
type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
}

type Executor

// Executor is the element type returned by Queries.SelectAllExecutors.
// Each field carries a `db` struct tag (executor_id, last_request,
// last_updated).
// NOTE(review): LastRequest is raw bytes; its encoding is not visible here —
// confirm before decoding.
type Executor struct {
	ExecutorID  string    `db:"executor_id"`
	LastRequest []byte    `db:"last_request"`
	LastUpdated time.Time `db:"last_updated"`
}

type ExecutorRepository

// ExecutorRepository is an interface to be implemented by structs which provide executor information.
type ExecutorRepository interface {
	// GetExecutors returns all known executors, regardless of their last heartbeat time.
	GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error)
	// GetLastUpdateTimes returns a map of executor name -> last heartbeat time.
	GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)
	// StoreExecutor persists the latest executor state.
	StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error
}

ExecutorRepository is an interface to be implemented by structs which provide executor information

type InsertMarkerParams added in v0.3.48

// InsertMarkerParams is the argument struct accepted by Queries.InsertMarker.
// It has the same fields and `db` tags as Marker.
type InsertMarkerParams struct {
	GroupID     uuid.UUID `db:"group_id"`
	PartitionID int32     `db:"partition_id"`
	Created     time.Time `db:"created"`
}

type Job

// Job is the element type returned by Queries.SelectNewJobs and by
// JobRepository.FetchJobUpdates. Each field carries a `db` struct tag.
// Serial is exposed through GetSerial (needed for the HasSerial interface).
// NOTE(review): InTerminalState presumably derives from the Cancelled,
// Succeeded and Failed flags — confirm against its implementation.
// NOTE(review): Groups, SubmitMessage and SchedulingInfo are raw bytes whose
// encoding is not visible here; the unit of Submitted is likewise not shown.
type Job struct {
	JobID                   string    `db:"job_id"`
	JobSet                  string    `db:"job_set"`
	Queue                   string    `db:"queue"`
	UserID                  string    `db:"user_id"`
	Submitted               int64     `db:"submitted"`
	Groups                  []byte    `db:"groups"`
	Priority                int64     `db:"priority"`
	Queued                  bool      `db:"queued"`
	QueuedVersion           int32     `db:"queued_version"`
	CancelRequested         bool      `db:"cancel_requested"`
	Cancelled               bool      `db:"cancelled"`
	CancelByJobsetRequested bool      `db:"cancel_by_jobset_requested"`
	Succeeded               bool      `db:"succeeded"`
	Failed                  bool      `db:"failed"`
	SubmitMessage           []byte    `db:"submit_message"`
	SchedulingInfo          []byte    `db:"scheduling_info"`
	SchedulingInfoVersion   int32     `db:"scheduling_info_version"`
	Serial                  int64     `db:"serial"`
	LastModified            time.Time `db:"last_modified"`
}

func (Job) GetSerial

func (job Job) GetSerial() int64

GetSerial is needed for the HasSerial interface

func (Job) InTerminalState

func (job Job) InTerminalState() bool

InTerminalState returns true if Job is in a terminal state

type JobRepository

// JobRepository is an interface to be implemented by structs which provide job and run information.
type JobRepository interface {
	// FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively.
	// These updates are guaranteed to be consistent with each other.
	FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)

	// FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids.  The returned map is
	// keyed by job run id.  Any dbRuns which don't have errors will be absent from the map.
	FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error)

	// CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding
	// to the provided groupId.  This is used by the scheduler to determine if the database represents the state of
	// pulsar after a given point in time.
	CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)

	// FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active.
	// Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled.
	FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)

	// FetchJobRunLeases fetches new job runs for a given executor.  A maximum of maxResults rows will be returned, while runs
	// in excludedRunIds will be excluded.
	FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error)
}

JobRepository is an interface to be implemented by structs which provide job and run information

type JobRunError

// JobRunError is the element type returned by Queries.SelectAllRunErrors and
// Queries.SelectRunErrorsById. Each field carries a `db` struct tag.
// NOTE(review): Error is raw bytes; presumably a serialised
// armadaevents.Error (the type returned by JobRepository.FetchJobRunErrors)
// — confirm.
type JobRunError struct {
	RunID uuid.UUID `db:"run_id"`
	JobID string    `db:"job_id"`
	Error []byte    `db:"error"`
}

type JobRunLease

// JobRunLease is the element type returned by JobRepository.FetchJobRunLeases.
// Unlike the query row types in this package, its fields carry no `db` tags.
// It has the same fields as SelectJobsForExecutorRow plus Node.
// NOTE(review): Groups and SubmitMessage are raw bytes whose encoding is not
// visible here.
type JobRunLease struct {
	RunID         uuid.UUID
	Queue         string
	JobSet        string
	UserID        string
	Node          string
	Groups        []byte
	SubmitMessage []byte
}

type LegacyQueueRepository added in v0.3.47

// LegacyQueueRepository is a QueueRepository which is backed by Armada's redis store.
// Instances are created with NewLegacyQueueRepository, which takes a
// redis.UniversalClient.
type LegacyQueueRepository struct {
	// contains filtered or unexported fields
}

LegacyQueueRepository is a QueueRepository which is backed by Armada's redis store

func NewLegacyQueueRepository added in v0.3.47

func NewLegacyQueueRepository(db redis.UniversalClient) *LegacyQueueRepository

func (*LegacyQueueRepository) GetAllQueues added in v0.3.47

func (r *LegacyQueueRepository) GetAllQueues() ([]*Queue, error)

type MarkJobsCancelRequestedBySetAndQueuedStateParams added in v0.3.68

// MarkJobsCancelRequestedBySetAndQueuedStateParams is the argument struct
// accepted by Queries.MarkJobsCancelRequestedBySetAndQueuedState.
// NOTE(review): QueuedStates presumably lists the values of the jobs' queued
// flag that should be matched — confirm against the underlying query.
type MarkJobsCancelRequestedBySetAndQueuedStateParams struct {
	JobSet       string `db:"job_set"`
	Queue        string `db:"queue"`
	QueuedStates []bool `db:"queued_states"`
}

type Marker

// Marker is the element type returned by Queries.SelectAllMarkers.
// It has the same fields and `db` tags as InsertMarkerParams.
// NOTE(review): presumably one Marker is stored per (GroupID, PartitionID)
// and these are what JobRepository.CountReceivedPartitions counts — confirm.
type Marker struct {
	GroupID     uuid.UUID `db:"group_id"`
	PartitionID int32     `db:"partition_id"`
	Created     time.Time `db:"created"`
}

type PostgresExecutorRepository

// PostgresExecutorRepository is an implementation of ExecutorRepository that stores its state in postgres.
// Instances are created with NewPostgresExecutorRepository, which takes a
// *pgxpool.Pool.
type PostgresExecutorRepository struct {
	// contains filtered or unexported fields
}

PostgresExecutorRepository is an implementation of ExecutorRepository that stores its state in postgres

func NewPostgresExecutorRepository

func NewPostgresExecutorRepository(db *pgxpool.Pool) *PostgresExecutorRepository

func (*PostgresExecutorRepository) GetExecutors

GetExecutors returns all known executors, regardless of their last heartbeat time

func (*PostgresExecutorRepository) GetLastUpdateTimes

func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)

GetLastUpdateTimes returns a map of executor name -> last heartbeat time

func (*PostgresExecutorRepository) StoreExecutor

func (r *PostgresExecutorRepository) StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error

StoreExecutor persists the latest executor state

type PostgresJobRepository

// PostgresJobRepository is an implementation of JobRepository that stores its state in postgres.
// Instances are created with NewPostgresJobRepository, which takes a
// *pgxpool.Pool and a batchSize.
// NOTE(review): how batchSize is applied is not visible here — confirm.
type PostgresJobRepository struct {
	// contains filtered or unexported fields
}

PostgresJobRepository is an implementation of JobRepository that stores its state in postgres

func NewPostgresJobRepository

func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32) *PostgresJobRepository

func (*PostgresJobRepository) CountReceivedPartitions

func (r *PostgresJobRepository) CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)

CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding to the provided groupId. This is used by the scheduler to determine if the database represents the state of pulsar after a given point in time.

func (*PostgresJobRepository) FetchJobRunErrors

func (r *PostgresJobRepository) FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.Error, error)

FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is keyed by job run id. Any dbRuns which don't have errors will be absent from the map.

func (*PostgresJobRepository) FetchJobRunLeases

func (r *PostgresJobRepository) FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error)

FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while runs in excludedRunIds will be excluded.

func (*PostgresJobRepository) FetchJobUpdates

func (r *PostgresJobRepository) FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)

FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively. These updates are guaranteed to be consistent with each other.

func (*PostgresJobRepository) FindInactiveRuns

func (r *PostgresJobRepository) FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)

FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active. Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled.

type Queries

// Queries is the receiver for this package's query methods (CountGroup,
// SelectNewJobs, UpsertExecutor, ...). A value is obtained from New, which
// takes a DBTX, or from WithTx, which takes a pgx.Tx.
type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) CountGroup

func (q *Queries) CountGroup(ctx context.Context, groupID uuid.UUID) (int64, error)

func (*Queries) DeleteOldMarkers added in v0.3.47

func (q *Queries) DeleteOldMarkers(ctx context.Context, cutoff time.Time) error

func (*Queries) FindActiveRuns

func (q *Queries) FindActiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)

func (*Queries) InsertMarker added in v0.3.48

func (q *Queries) InsertMarker(ctx context.Context, arg InsertMarkerParams) error

func (*Queries) MarkJobRunsAttemptedById added in v0.3.63

func (q *Queries) MarkJobRunsAttemptedById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsFailedById

func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsReturnedById added in v0.3.47

func (q *Queries) MarkJobRunsReturnedById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsRunningById

func (q *Queries) MarkJobRunsRunningById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsSucceededById

func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobsCancelRequestedById added in v0.3.47

func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, jobIds []string) error

func (*Queries) MarkJobsCancelRequestedBySetAndQueuedState added in v0.3.68

func (q *Queries) MarkJobsCancelRequestedBySetAndQueuedState(ctx context.Context, arg MarkJobsCancelRequestedBySetAndQueuedStateParams) error

func (*Queries) MarkJobsCancelledById

func (q *Queries) MarkJobsCancelledById(ctx context.Context, jobIds []string) error

func (*Queries) MarkJobsFailedById

func (q *Queries) MarkJobsFailedById(ctx context.Context, jobIds []string) error

func (*Queries) MarkJobsSucceededById

func (q *Queries) MarkJobsSucceededById(ctx context.Context, jobIds []string) error

func (*Queries) MarkRunsCancelledByJobId added in v0.3.63

func (q *Queries) MarkRunsCancelledByJobId(ctx context.Context, jobIds []string) error

func (*Queries) SelectAllExecutors

func (q *Queries) SelectAllExecutors(ctx context.Context) ([]Executor, error)

func (*Queries) SelectAllJobIds added in v0.3.47

func (q *Queries) SelectAllJobIds(ctx context.Context) ([]string, error)

func (*Queries) SelectAllMarkers added in v0.3.47

func (q *Queries) SelectAllMarkers(ctx context.Context) ([]Marker, error)

func (*Queries) SelectAllRunErrors added in v0.3.47

func (q *Queries) SelectAllRunErrors(ctx context.Context) ([]JobRunError, error)

func (*Queries) SelectAllRunIds added in v0.3.47

func (q *Queries) SelectAllRunIds(ctx context.Context) ([]uuid.UUID, error)

func (*Queries) SelectExecutorUpdateTimes

func (q *Queries) SelectExecutorUpdateTimes(ctx context.Context) ([]SelectExecutorUpdateTimesRow, error)

func (*Queries) SelectJobsForExecutor

func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error)

func (*Queries) SelectNewJobs

func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([]Job, error)

func (*Queries) SelectNewRuns

func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([]Run, error)

func (*Queries) SelectNewRunsForJobs

func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error)

func (*Queries) SelectRunErrorsById

func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []uuid.UUID) ([]JobRunError, error)

Run errors

func (*Queries) SelectUpdatedJobs

func (q *Queries) SelectUpdatedJobs(ctx context.Context, arg SelectUpdatedJobsParams) ([]SelectUpdatedJobsRow, error)

func (*Queries) UpdateJobPriorityById

func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error

func (*Queries) UpdateJobPriorityByJobSet

func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error

func (*Queries) UpsertExecutor

func (q *Queries) UpsertExecutor(ctx context.Context, arg UpsertExecutorParams) error

func (*Queries) WithTx

func (q *Queries) WithTx(tx pgx.Tx) *Queries

type Queue

// Queue is the element type returned by QueueRepository.GetAllQueues. It
// pairs a queue Name with a float64 Weight.
// NOTE(review): the meaning and valid range of Weight are not visible here.
type Queue struct {
	Name   string  `db:"name"`
	Weight float64 `db:"weight"`
}

type QueueRepository

// QueueRepository is an interface to be implemented by structs which provide queue information.
type QueueRepository interface {
	// GetAllQueues returns a slice of *Queue, or an error.
	GetAllQueues() ([]*Queue, error)
}

QueueRepository is an interface to be implemented by structs which provide queue information

type RedisExecutorRepository added in v0.3.49

// RedisExecutorRepository is created with NewRedisExecutorRepository, which
// takes a redis.UniversalClient and a schedulerName. It declares
// GetExecutors, GetLastUpdateTimes and StoreExecutor methods.
// NOTE(review): presumably an ExecutorRepository implementation backed by
// redis — confirm; how schedulerName is used is not visible here.
type RedisExecutorRepository struct {
	// contains filtered or unexported fields
}

func NewRedisExecutorRepository added in v0.3.49

func NewRedisExecutorRepository(db redis.UniversalClient, schedulerName string) *RedisExecutorRepository

func (*RedisExecutorRepository) GetExecutors added in v0.3.49

func (*RedisExecutorRepository) GetLastUpdateTimes added in v0.3.49

func (r *RedisExecutorRepository) GetLastUpdateTimes(_ context.Context) (map[string]time.Time, error)

func (*RedisExecutorRepository) StoreExecutor added in v0.3.49

func (r *RedisExecutorRepository) StoreExecutor(_ context.Context, executor *schedulerobjects.Executor) error

type Run

// Run is the element type returned by Queries.SelectNewRuns,
// Queries.SelectNewRunsForJobs and JobRepository.FetchJobUpdates. Each field
// carries a `db` struct tag.
// Serial is exposed through GetSerial (needed for the HasSerial interface).
// NOTE(review): the unit of Created (an int64) is not visible here — confirm.
type Run struct {
	RunID        uuid.UUID `db:"run_id"`
	JobID        string    `db:"job_id"`
	Created      int64     `db:"created"`
	JobSet       string    `db:"job_set"`
	Executor     string    `db:"executor"`
	Node         string    `db:"node"`
	Cancelled    bool      `db:"cancelled"`
	Running      bool      `db:"running"`
	Succeeded    bool      `db:"succeeded"`
	Failed       bool      `db:"failed"`
	Returned     bool      `db:"returned"`
	RunAttempted bool      `db:"run_attempted"`
	Serial       int64     `db:"serial"`
	LastModified time.Time `db:"last_modified"`
}

func (Run) GetSerial

func (run Run) GetSerial() int64

GetSerial is needed for the HasSerial interface

type SelectExecutorUpdateTimesRow

// SelectExecutorUpdateTimesRow is the element type returned by
// Queries.SelectExecutorUpdateTimes. It carries the ExecutorID and
// LastUpdated fields of Executor, without LastRequest.
type SelectExecutorUpdateTimesRow struct {
	ExecutorID  string    `db:"executor_id"`
	LastUpdated time.Time `db:"last_updated"`
}

type SelectJobsForExecutorParams

// SelectJobsForExecutorParams is the argument struct accepted by
// Queries.SelectJobsForExecutor.
// NOTE(review): RunIds is presumably the set of run ids to exclude (compare
// excludedRunIds on JobRepository.FetchJobRunLeases) — confirm against the
// underlying query.
type SelectJobsForExecutorParams struct {
	Executor string      `db:"executor"`
	RunIds   []uuid.UUID `db:"run_ids"`
}

type SelectJobsForExecutorRow

// SelectJobsForExecutorRow is the element type returned by
// Queries.SelectJobsForExecutor. It has the same fields as JobRunLease
// except Node.
// NOTE(review): Groups and SubmitMessage are raw bytes whose encoding is not
// visible here.
type SelectJobsForExecutorRow struct {
	RunID         uuid.UUID `db:"run_id"`
	Queue         string    `db:"queue"`
	JobSet        string    `db:"job_set"`
	UserID        string    `db:"user_id"`
	Groups        []byte    `db:"groups"`
	SubmitMessage []byte    `db:"submit_message"`
}

type SelectNewJobsParams

// SelectNewJobsParams is the argument struct accepted by
// Queries.SelectNewJobs.
// NOTE(review): presumably selects jobs with a serial greater than Serial,
// capped at Limit rows — confirm against the underlying query.
type SelectNewJobsParams struct {
	Serial int64 `db:"serial"`
	Limit  int32 `db:"limit"`
}

type SelectNewRunsForJobsParams

// SelectNewRunsForJobsParams is the argument struct accepted by
// Queries.SelectNewRunsForJobs.
// NOTE(review): presumably selects runs with a serial greater than Serial
// that belong to one of JobIds — confirm against the underlying query.
type SelectNewRunsForJobsParams struct {
	Serial int64    `db:"serial"`
	JobIds []string `db:"job_ids"`
}

type SelectNewRunsParams

// SelectNewRunsParams is the argument struct accepted by
// Queries.SelectNewRuns.
// NOTE(review): presumably selects runs with a serial greater than Serial,
// capped at Limit rows — confirm against the underlying query.
type SelectNewRunsParams struct {
	Serial int64 `db:"serial"`
	Limit  int32 `db:"limit"`
}

type SelectUpdatedJobsParams

// SelectUpdatedJobsParams is the argument struct accepted by
// Queries.SelectUpdatedJobs.
// NOTE(review): presumably selects jobs with a serial greater than Serial,
// capped at Limit rows — confirm against the underlying query.
type SelectUpdatedJobsParams struct {
	Serial int64 `db:"serial"`
	Limit  int32 `db:"limit"`
}

type SelectUpdatedJobsRow

// SelectUpdatedJobsRow is the element type returned by
// Queries.SelectUpdatedJobs. It carries the same fields as Job except
// UserID, Groups, SubmitMessage and LastModified.
// Serial is exposed through GetSerial (needed for the HasSerial interface).
type SelectUpdatedJobsRow struct {
	JobID                   string `db:"job_id"`
	JobSet                  string `db:"job_set"`
	Queue                   string `db:"queue"`
	Priority                int64  `db:"priority"`
	Submitted               int64  `db:"submitted"`
	Queued                  bool   `db:"queued"`
	QueuedVersion           int32  `db:"queued_version"`
	CancelRequested         bool   `db:"cancel_requested"`
	CancelByJobsetRequested bool   `db:"cancel_by_jobset_requested"`
	Cancelled               bool   `db:"cancelled"`
	Succeeded               bool   `db:"succeeded"`
	Failed                  bool   `db:"failed"`
	SchedulingInfo          []byte `db:"scheduling_info"`
	SchedulingInfoVersion   int32  `db:"scheduling_info_version"`
	Serial                  int64  `db:"serial"`
}

func (SelectUpdatedJobsRow) GetSerial

func (row SelectUpdatedJobsRow) GetSerial() int64

GetSerial is needed for the HasSerial interface

type UpdateJobPriorityByIdParams

// UpdateJobPriorityByIdParams is the argument struct accepted by
// Queries.UpdateJobPriorityById.
// NOTE(review): presumably Priority is the new value applied to the job
// identified by JobID — confirm against the underlying query.
type UpdateJobPriorityByIdParams struct {
	Priority int64  `db:"priority"`
	JobID    string `db:"job_id"`
}

type UpdateJobPriorityByJobSetParams

// UpdateJobPriorityByJobSetParams is the argument struct accepted by
// Queries.UpdateJobPriorityByJobSet.
// NOTE(review): presumably Priority is the new value applied to every job in
// the given JobSet and Queue — confirm against the underlying query.
type UpdateJobPriorityByJobSetParams struct {
	Priority int64  `db:"priority"`
	JobSet   string `db:"job_set"`
	Queue    string `db:"queue"`
}
}

type UpsertExecutorParams

// UpsertExecutorParams is the argument struct accepted by
// Queries.UpsertExecutor.
// NOTE(review): UpdateTime is tagged update_time, whereas Executor exposes
// LastUpdated tagged last_updated — presumably the same value; confirm
// against the underlying query.
type UpsertExecutorParams struct {
	ExecutorID  string    `db:"executor_id"`
	LastRequest []byte    `db:"last_request"`
	UpdateTime  time.Time `db:"update_time"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL