database

package
v0.3.47 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(ctx context.Context, db pgxtype.Querier) error

func PruneDb added in v0.3.47

func PruneDb(ctx ctx.Context, db *pgx.Conn, batchLimit int, keepAfterCompletion time.Duration, clock clock.Clock) error

PruneDb removes completed jobs (and related runs and errors) from the database if their `lastUpdateTime` is more than `keepAfterCompletion` in the past. Jobs are deleted in batches across transactions. This means that if the pruning fails midway through, it may already have deleted some jobs. The function will run until the supplied context is cancelled.

func WithTestDb

func WithTestDb(action func(queries *Queries, db *pgxpool.Pool) error) error

Types

type DBTX

type DBTX interface {
	Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
	Query(context.Context, string, ...interface{}) (pgx.Rows, error)
	QueryRow(context.Context, string, ...interface{}) pgx.Row
}

type Executor

type Executor struct {
	ExecutorID  string    `db:"executor_id"`
	LastRequest []byte    `db:"last_request"`
	LastUpdated time.Time `db:"last_updated"`
}

type ExecutorRepository

type ExecutorRepository interface {
	// GetExecutors returns all known executors, regardless of their last heartbeat time
	GetExecutors(ctx context.Context) ([]*schedulerobjects.Executor, error)
	// GetLastUpdateTimes returns a map of executor name -> last heartbeat time
	GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)
	// StoreExecutor persists the latest executor state
	StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error
}

ExecutorRepository is an interface to be implemented by structs which provide executor information

type Job

type Job struct {
	JobID           string    `db:"job_id"`
	JobSet          string    `db:"job_set"`
	Queue           string    `db:"queue"`
	UserID          string    `db:"user_id"`
	Submitted       int64     `db:"submitted"`
	Groups          []byte    `db:"groups"`
	Priority        int64     `db:"priority"`
	CancelRequested bool      `db:"cancel_requested"`
	Cancelled       bool      `db:"cancelled"`
	Succeeded       bool      `db:"succeeded"`
	Failed          bool      `db:"failed"`
	SubmitMessage   []byte    `db:"submit_message"`
	SchedulingInfo  []byte    `db:"scheduling_info"`
	Serial          int64     `db:"serial"`
	LastModified    time.Time `db:"last_modified"`
}

func (Job) GetSerial

func (job Job) GetSerial() int64

GetSerial is needed for the HasSerial interface

func (Job) InTerminalState

func (job Job) InTerminalState() bool

InTerminalState returns true if Job is in a terminal state

type JobRepository

type JobRepository interface {
	// FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively.
	// These updates are guaranteed to be consistent with each other
	FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)

	// FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids.  The returned map is
	// keyed by job run id.  Any dbRuns which don't have errors will be absent from the map.
	FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.JobRunErrors, error)

	// CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding
	// to the provided groupId.  This is used by the scheduler to determine if the database represents the state of
	// pulsar after a given point in time.
	CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)

	// FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active.
	// Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled
	FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)

	// FetchJobRunLeases fetches new job runs for a given executor.  A maximum of maxResults rows will be returned, while runs
	// in excludedRunIds will be excluded
	FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error)
}

JobRepository is an interface to be implemented by structs which provide job and run information

type JobRunError

type JobRunError struct {
	RunID uuid.UUID `db:"run_id"`
	JobID string    `db:"job_id"`
	Error []byte    `db:"error"`
}

type JobRunLease

type JobRunLease struct {
	RunID         uuid.UUID
	Queue         string
	JobSet        string
	UserID        string
	Groups        []byte
	SubmitMessage []byte
}

type LegacyQueueRepository added in v0.3.47

type LegacyQueueRepository struct {
	// contains filtered or unexported fields
}

LegacyQueueRepository is a QueueRepository which is backed by Armada's redis store

func NewLegacyQueueRepository added in v0.3.47

func NewLegacyQueueRepository(db redis.UniversalClient) *LegacyQueueRepository

func (*LegacyQueueRepository) GetAllQueues added in v0.3.47

func (r *LegacyQueueRepository) GetAllQueues() ([]*Queue, error)

type Marker

type Marker struct {
	GroupID     uuid.UUID `db:"group_id"`
	PartitionID int32     `db:"partition_id"`
	Created     time.Time `db:"created"`
}

type PostgresExecutorRepository

type PostgresExecutorRepository struct {
	// contains filtered or unexported fields
}

PostgresExecutorRepository is an implementation of ExecutorRepository that stores its state in postgres

func NewPostgresExecutorRepository

func NewPostgresExecutorRepository(db *pgxpool.Pool) *PostgresExecutorRepository

func (*PostgresExecutorRepository) GetExecutors

GetExecutors returns all known executors, regardless of their last heartbeat time

func (*PostgresExecutorRepository) GetLastUpdateTimes

func (r *PostgresExecutorRepository) GetLastUpdateTimes(ctx context.Context) (map[string]time.Time, error)

GetLastUpdateTimes returns a map of executor name -> last heartbeat time

func (*PostgresExecutorRepository) StoreExecutor

func (r *PostgresExecutorRepository) StoreExecutor(ctx context.Context, executor *schedulerobjects.Executor) error

StoreExecutor persists the latest executor state

type PostgresJobRepository

type PostgresJobRepository struct {
	// contains filtered or unexported fields
}

PostgresJobRepository is an implementation of JobRepository that stores its state in postgres

func NewPostgresJobRepository

func NewPostgresJobRepository(db *pgxpool.Pool, batchSize int32) *PostgresJobRepository

func (*PostgresJobRepository) CountReceivedPartitions

func (r *PostgresJobRepository) CountReceivedPartitions(ctx context.Context, groupId uuid.UUID) (uint32, error)

CountReceivedPartitions returns a count of the number of partition messages present in the database corresponding to the provided groupId. This is used by the scheduler to determine if the database represents the state of pulsar after a given point in time.

func (*PostgresJobRepository) FetchJobRunErrors

func (r *PostgresJobRepository) FetchJobRunErrors(ctx context.Context, runIds []uuid.UUID) (map[uuid.UUID]*armadaevents.JobRunErrors, error)

FetchJobRunErrors returns all armadaevents.JobRunErrors for the provided job run ids. The returned map is keyed by job run id. Any dbRuns which don't have errors will be absent from the map.

func (*PostgresJobRepository) FetchJobRunLeases

func (r *PostgresJobRepository) FetchJobRunLeases(ctx context.Context, executor string, maxResults uint, excludedRunIds []uuid.UUID) ([]*JobRunLease, error)

FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while runs in excludedRunIds will be excluded.

func (*PostgresJobRepository) FetchJobUpdates

func (r *PostgresJobRepository) FetchJobUpdates(ctx context.Context, jobSerial int64, jobRunSerial int64) ([]Job, []Run, error)

FetchJobUpdates returns all jobs and job dbRuns that have been updated after jobSerial and jobRunSerial respectively. These updates are guaranteed to be consistent with each other.

func (*PostgresJobRepository) FindInactiveRuns

func (r *PostgresJobRepository) FindInactiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)

FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active. Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled.

type Queries

type Queries struct {
	// contains filtered or unexported fields
}

func New

func New(db DBTX) *Queries

func (*Queries) CountGroup

func (q *Queries) CountGroup(ctx context.Context, groupID uuid.UUID) (int64, error)

func (*Queries) DeleteOldMarkers added in v0.3.47

func (q *Queries) DeleteOldMarkers(ctx context.Context, cutoff time.Time) error

func (*Queries) FindActiveRuns

func (q *Queries) FindActiveRuns(ctx context.Context, runIds []uuid.UUID) ([]uuid.UUID, error)

func (*Queries) MarkJobRunsFailedById

func (q *Queries) MarkJobRunsFailedById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsReturnedById added in v0.3.47

func (q *Queries) MarkJobRunsReturnedById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsRunningById

func (q *Queries) MarkJobRunsRunningById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobRunsSucceededById

func (q *Queries) MarkJobRunsSucceededById(ctx context.Context, runIds []uuid.UUID) error

func (*Queries) MarkJobsCancelRequestedById added in v0.3.47

func (q *Queries) MarkJobsCancelRequestedById(ctx context.Context, jobIds []string) error

func (*Queries) MarkJobsCancelRequestedBySets added in v0.3.47

func (q *Queries) MarkJobsCancelRequestedBySets(ctx context.Context, jobSets []string) error

func (*Queries) MarkJobsCancelledById

func (q *Queries) MarkJobsCancelledById(ctx context.Context, jobIds []string) error

func (*Queries) MarkJobsFailedById

func (q *Queries) MarkJobsFailedById(ctx context.Context, jobIds []string) error

func (*Queries) MarkJobsSucceededById

func (q *Queries) MarkJobsSucceededById(ctx context.Context, jobIds []string) error

func (*Queries) SelectAllExecutors

func (q *Queries) SelectAllExecutors(ctx context.Context) ([]Executor, error)

func (*Queries) SelectAllJobIds added in v0.3.47

func (q *Queries) SelectAllJobIds(ctx context.Context) ([]string, error)

func (*Queries) SelectAllMarkers added in v0.3.47

func (q *Queries) SelectAllMarkers(ctx context.Context) ([]Marker, error)

func (*Queries) SelectAllRunErrors added in v0.3.47

func (q *Queries) SelectAllRunErrors(ctx context.Context) ([]JobRunError, error)

func (*Queries) SelectAllRunIds added in v0.3.47

func (q *Queries) SelectAllRunIds(ctx context.Context) ([]uuid.UUID, error)

func (*Queries) SelectExecutorUpdateTimes

func (q *Queries) SelectExecutorUpdateTimes(ctx context.Context) ([]SelectExecutorUpdateTimesRow, error)

func (*Queries) SelectJobsForExecutor

func (q *Queries) SelectJobsForExecutor(ctx context.Context, arg SelectJobsForExecutorParams) ([]SelectJobsForExecutorRow, error)

func (*Queries) SelectNewJobs

func (q *Queries) SelectNewJobs(ctx context.Context, arg SelectNewJobsParams) ([]Job, error)

func (*Queries) SelectNewRuns

func (q *Queries) SelectNewRuns(ctx context.Context, arg SelectNewRunsParams) ([]Run, error)

func (*Queries) SelectNewRunsForJobs

func (q *Queries) SelectNewRunsForJobs(ctx context.Context, arg SelectNewRunsForJobsParams) ([]Run, error)

func (*Queries) SelectRunErrorsById

func (q *Queries) SelectRunErrorsById(ctx context.Context, runIds []uuid.UUID) ([]JobRunError, error)

Run errors

func (*Queries) SelectUpdatedJobs

func (q *Queries) SelectUpdatedJobs(ctx context.Context, arg SelectUpdatedJobsParams) ([]SelectUpdatedJobsRow, error)

func (*Queries) UpdateJobPriorityById

func (q *Queries) UpdateJobPriorityById(ctx context.Context, arg UpdateJobPriorityByIdParams) error

func (*Queries) UpdateJobPriorityByJobSet

func (q *Queries) UpdateJobPriorityByJobSet(ctx context.Context, arg UpdateJobPriorityByJobSetParams) error

func (*Queries) UpsertExecutor

func (q *Queries) UpsertExecutor(ctx context.Context, arg UpsertExecutorParams) error

func (*Queries) WithTx

func (q *Queries) WithTx(tx pgx.Tx) *Queries

type Queue

type Queue struct {
	Name   string  `db:"name"`
	Weight float64 `db:"weight"`
}

type QueueRepository

type QueueRepository interface {
	GetAllQueues() ([]*Queue, error)
}

QueueRepository is an interface to be implemented by structs which provide queue information

type Run

type Run struct {
	RunID        uuid.UUID `db:"run_id"`
	JobID        string    `db:"job_id"`
	JobSet       string    `db:"job_set"`
	Executor     string    `db:"executor"`
	Cancelled    bool      `db:"cancelled"`
	Running      bool      `db:"running"`
	Succeeded    bool      `db:"succeeded"`
	Failed       bool      `db:"failed"`
	Returned     bool      `db:"returned"`
	Serial       int64     `db:"serial"`
	LastModified time.Time `db:"last_modified"`
}

func (Run) GetSerial

func (run Run) GetSerial() int64

GetSerial is needed for the HasSerial interface

type SelectExecutorUpdateTimesRow

type SelectExecutorUpdateTimesRow struct {
	ExecutorID  string    `db:"executor_id"`
	LastUpdated time.Time `db:"last_updated"`
}

type SelectJobsForExecutorParams

type SelectJobsForExecutorParams struct {
	Executor string      `db:"executor"`
	RunIds   []uuid.UUID `db:"run_ids"`
}

type SelectJobsForExecutorRow

type SelectJobsForExecutorRow struct {
	RunID         uuid.UUID `db:"run_id"`
	Queue         string    `db:"queue"`
	JobSet        string    `db:"job_set"`
	UserID        string    `db:"user_id"`
	Groups        []byte    `db:"groups"`
	SubmitMessage []byte    `db:"submit_message"`
}

type SelectNewJobsParams

type SelectNewJobsParams struct {
	Serial int64 `db:"serial"`
	Limit  int32 `db:"limit"`
}

type SelectNewRunsForJobsParams

type SelectNewRunsForJobsParams struct {
	Serial int64    `db:"serial"`
	JobIds []string `db:"job_ids"`
}

type SelectNewRunsParams

type SelectNewRunsParams struct {
	Serial int64 `db:"serial"`
	Limit  int32 `db:"limit"`
}

type SelectUpdatedJobsParams

type SelectUpdatedJobsParams struct {
	Serial int64 `db:"serial"`
	Limit  int32 `db:"limit"`
}

type SelectUpdatedJobsRow

type SelectUpdatedJobsRow struct {
	JobID           string `db:"job_id"`
	JobSet          string `db:"job_set"`
	Queue           string `db:"queue"`
	Priority        int64  `db:"priority"`
	Submitted       int64  `db:"submitted"`
	CancelRequested bool   `db:"cancel_requested"`
	Cancelled       bool   `db:"cancelled"`
	Succeeded       bool   `db:"succeeded"`
	Failed          bool   `db:"failed"`
	SchedulingInfo  []byte `db:"scheduling_info"`
	Serial          int64  `db:"serial"`
}

func (SelectUpdatedJobsRow) GetSerial

func (row SelectUpdatedJobsRow) GetSerial() int64

GetSerial is needed for the HasSerial interface

type UpdateJobPriorityByIdParams

type UpdateJobPriorityByIdParams struct {
	Priority int64  `db:"priority"`
	JobID    string `db:"job_id"`
}

type UpdateJobPriorityByJobSetParams

type UpdateJobPriorityByJobSetParams struct {
	Priority int64  `db:"priority"`
	JobSet   string `db:"job_set"`
}

type UpsertExecutorParams

type UpsertExecutorParams struct {
	ExecutorID  string    `db:"executor_id"`
	LastRequest []byte    `db:"last_request"`
	UpdateTime  time.Time `db:"update_time"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL