Documentation
¶
Overview ¶
Package db contains all communication between ppacer and the database.
Introduction ¶
Supported databases ¶
- SQLite - used as the default database. It's also used as an in-memory database and as a database on /tmp files for unit and integration tests.
- Postgres
- ... (More in the future)
Index ¶
- Constants
- Variables
- func CleanUpSqliteTmp(c DBClient, t *testing.T)
- func SchemaStatements(dbDriver string) ([]string, error)
- func SchemaStatementsForLogs(dbDriver string) ([]string, error)
- type Client
- func NewInMemoryClient(schemaScriptPath string) (*Client, error)
- func NewPostgresClient(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)
- func NewPostgresClientForLogs(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)
- func NewSqliteClient(dbFilePath string, logger *slog.Logger) (*Client, error)
- func NewSqliteInMemoryClient(logger *slog.Logger) (*Client, error)
- func NewSqliteTmpClient(logger *slog.Logger) (*Client, error)
- func (c *Client) Count(table string) int
- func (c *Client) CountWhere(table, where string) int
- func (c *Client) DBConn() DB
- func (c *Client) DagRunAlreadyScheduled(ctx context.Context, dagId, execTs string) (bool, error)
- func (c *Client) InsertDagRun(ctx context.Context, dagId, execTs string) (int64, error)
- func (c *Client) InsertDagRunNextRestart(ctx context.Context, dagId, execTs string) error
- func (c *Client) InsertDagRunTask(ctx context.Context, dagId, execTs, taskId string, retry int, status string) error
- func (c *Client) InsertDagSchedule(ctx context.Context, dagId, event, nextSchedule string, schedule *string) error
- func (c *Client) InsertDagTasks(ctx context.Context, d dag.Dag) error
- func (c *Client) ReadDag(ctx context.Context, dagId string) (Dag, error)
- func (c *Client) ReadDagRun(ctx context.Context, runId int) (DagRun, error)
- func (c *Client) ReadDagRunByExecTs(ctx context.Context, dagId, execTs string) (DagRun, error)
- func (c *Client) ReadDagRunRestartLatest(ctx context.Context, dagId, execTs string) (DagRunRestart, error)
- func (c *Client) ReadDagRunSingleTaskDetails(ctx context.Context, dagId, execTs, taskId string) (DagRunTaskDetails, error)
- func (c *Client) ReadDagRunTask(ctx context.Context, dagId, execTs, taskId string, retry int) (DagRunTask, error)
- func (c *Client) ReadDagRunTaskDetails(ctx context.Context, dagId, execTs string) ([]DagRunTaskDetails, error)
- func (c *Client) ReadDagRunTaskLatest(ctx context.Context, dagId, execTs, taskId string) (DagRunTask, error)
- func (c *Client) ReadDagRunTaskLatestRetries(ctx context.Context, dagId, execTs string) ([]DagRunTask, error)
- func (c *Client) ReadDagRunTasks(ctx context.Context, dagId, execTs string) ([]DagRunTask, error)
- func (c *Client) ReadDagRunTasksAggByStatus(ctx context.Context) (map[string]int, error)
- func (c *Client) ReadDagRunTasksNotFinished(ctx context.Context) ([]DagRunTask, error)
- func (c *Client) ReadDagRuns(ctx context.Context, dagId string, topN int) ([]DagRun, error)
- func (c *Client) ReadDagRunsAggByStatus(ctx context.Context) (map[string]int, error)
- func (c *Client) ReadDagRunsNotFinished(ctx context.Context) ([]DagRun, error)
- func (c *Client) ReadDagRunsWithTaskInfo(ctx context.Context, latest int) ([]DagRunWithTaskInfo, error)
- func (c *Client) ReadDagSchedules(ctx context.Context, dagId string) ([]Schedule, error)
- func (c *Client) ReadDagTask(ctx context.Context, dagId, taskId string) (DagTask, error)
- func (c *Client) ReadDagTasks(ctx context.Context, dagId string) ([]DagTask, error)
- func (c *Client) ReadLatestDagRuns(ctx context.Context) (map[string]DagRun, error)
- func (c *Client) RunningTasksNum(ctx context.Context) (int, error)
- func (c *Client) UpdateDagRunStatus(ctx context.Context, runId int64, status string) error
- func (c *Client) UpdateDagRunStatusByExecTs(ctx context.Context, dagId, execTs, status string) error
- func (c *Client) UpdateDagRunTaskStatus(ctx context.Context, dagId, execTs, taskId string, retry int, status string) error
- func (c *Client) UpsertDag(ctx context.Context, d dag.Dag) error
- type DB
- type DBClient
- type Dag
- type DagRun
- type DagRunRestart
- type DagRunTask
- type DagRunTaskDetails
- type DagRunWithTaskInfo
- type DagTask
- type LogsClient
- func (lc *LogsClient) Count(table string) int
- func (lc *LogsClient) DBConn() DB
- func (c *LogsClient) InsertTaskLog(tlr TaskLogRecord) error
- func (c *LogsClient) ReadDagRunLogs(ctx context.Context, dagId, execTs string) ([]TaskLogRecord, error)
- func (c *LogsClient) ReadDagRunTaskLogs(ctx context.Context, dagId, execTs, taskId string, retry int) ([]TaskLogRecord, error)
- func (c *LogsClient) ReadDagRunTaskLogsLatest(ctx context.Context, dagId, execTs, taskId string, retry, latest int) ([]TaskLogRecord, error)
- type PostgresDB
- func (s *PostgresDB) Begin() (*sql.Tx, error)
- func (s *PostgresDB) Close() error
- func (s *PostgresDB) DataSource() string
- func (s *PostgresDB) Exec(query string, args ...any) (sql.Result, error)
- func (s *PostgresDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (s *PostgresDB) Query(query string, args ...any) (*sql.Rows, error)
- func (s *PostgresDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (s *PostgresDB) QueryRow(query string, args ...any) *sql.Row
- func (s *PostgresDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type Scannable
- type Schedule
- type SqliteDB
- func (s *SqliteDB) Begin() (*sql.Tx, error)
- func (s *SqliteDB) Close() error
- func (s *SqliteDB) DataSource() string
- func (s *SqliteDB) Exec(query string, args ...any) (sql.Result, error)
- func (s *SqliteDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (s *SqliteDB) Query(query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDB) QueryRow(query string, args ...any) *sql.Row
- func (s *SqliteDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type SqliteDBInMemory
- func (s *SqliteDBInMemory) Begin() (*sql.Tx, error)
- func (s *SqliteDBInMemory) Close() error
- func (s *SqliteDBInMemory) DataSource() string
- func (s *SqliteDBInMemory) Exec(query string, args ...any) (sql.Result, error)
- func (s *SqliteDBInMemory) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (s *SqliteDBInMemory) Query(query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDBInMemory) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDBInMemory) QueryRow(query string, args ...any) *sql.Row
- func (s *SqliteDBInMemory) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type TaskLogRecord
Constants ¶
const (
DagRunTaskStatusScheduled = "SCHEDULED"
)
const PPACER_ENV_LOG_LEVEL = "PPACER_LOG_LEVEL"
Name for environment variable for setting ppacer default logger severity level.
Variables ¶
var TableNames []string = []string{
"dags",
"dagtasks",
"dagruns",
"dagruntasks",
"schedules",
"dagrunrestarts",
}
TableNames is a list of ppacer database table names.
Functions ¶
func CleanUpSqliteTmp ¶
CleanUpSqliteTmp deletes SQLite database source file if all tests in the scope passed. If at least one test failed, the database will not be deleted, to enable further debugging. Even though this function takes a generic DBClient, it's mainly meant for SQLite-based database clients which are used in testing.
func SchemaStatements ¶
SchemaStatements returns a list of SQL statements that set up a new instance of the scheduler's internal database. It can differ a little bit between SQL databases, so the exact list of statements is prepared based on the given database driver name. If the given database driver is not supported, then a non-nil error is returned.
func SchemaStatementsForLogs ¶
Like SchemaStatements but for database for logs.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents the main database client.
func NewInMemoryClient ¶
Produces new Client using in-memory SQLite database with schema created based on given script.
func NewPostgresClient ¶
Produces new Client based on given connection to PostgreSQL database. If given database doesn't contain required ppacer schema, it will be created on the client initialization.
func NewPostgresClientForLogs ¶
Produces new Client for logs based on given connection to PostgreSQL database. If given database doesn't contain required logs schema, it will be created on the client initialization.
func NewSqliteClient ¶
Produces new Client based on given connection string to SQLite database. If database file does not exist in given location, then empty SQLite database with setup schema will be created.
func NewSqliteInMemoryClient ¶
Produces new Client based on SQLite in-memory database. It's not persisted anywhere else. Useful usually for tests and small examples.
func NewSqliteTmpClient ¶
Produces new Client using SQLite database created as temp file. It's mainly for testing and ad-hocs.
func (*Client) Count ¶
Count returns the count of rows for the given table. In case of errors, -1 is returned and the error is logged.
func (*Client) CountWhere ¶
CountWhere returns the count of rows for the given table filtered by the given where condition. In case of errors, -1 is returned and the error is logged.
func (*Client) DagRunAlreadyScheduled ¶
DagRunAlreadyScheduled checks whether a dagrun already exists for the given DAG ID and schedule timestamp.
func (*Client) InsertDagRun ¶
InsertDagRun inserts a new row into the dagruns table for the given DagId and execution timestamp. Initial status is set to DagRunStatusScheduled. The RunId of the just inserted dag run is returned, or -1 when the error is not nil.
func (*Client) InsertDagRunNextRestart ¶ added in v0.0.11
InsertDagRunNextRestart inserts a new DAG run restart event. If the given DAG run hasn't been restarted yet, then a record with Restart=1 will be inserted. If this is the second restart, then Restart=2 will be inserted, and so on.
func (*Client) InsertDagRunTask ¶
func (c *Client) InsertDagRunTask(ctx context.Context, dagId, execTs, taskId string, retry int, status string) error
Inserts new DagRunTask with default status SCHEDULED.
func (*Client) InsertDagSchedule ¶ added in v0.0.5
func (c *Client) InsertDagSchedule( ctx context.Context, dagId, event, nextSchedule string, schedule *string, ) error
InsertDagSchedule inserts new event regarding DAG schedule.
func (*Client) InsertDagTasks ¶
InsertDagTasks inserts the tasks of the given DAG into the dagtasks table and sets them as the current version. Previous versions remain in the dagtasks table, but with IsCurrent=0. If inserting any of the DAG's tasks fails, the whole insertion is rolled back (in terms of SQL transactions).
func (*Client) ReadDagRun ¶ added in v0.0.7
Reads DAG run information for given run ID.
func (*Client) ReadDagRunByExecTs ¶ added in v0.0.11
ReadDagRunByExecTs reads DAG run information for given DAG ID and execTs.
func (*Client) ReadDagRunRestartLatest ¶ added in v0.0.11
func (c *Client) ReadDagRunRestartLatest( ctx context.Context, dagId, execTs string, ) (DagRunRestart, error)
ReadDagRunRestartLatest reads latest restart event for given DAG run.
func (*Client) ReadDagRunSingleTaskDetails ¶ added in v0.0.9
func (c *Client) ReadDagRunSingleTaskDetails( ctx context.Context, dagId, execTs, taskId string, ) (DagRunTaskDetails, error)
Read DAG run task with additional information including node positions and task configuration.
func (*Client) ReadDagRunTask ¶
func (c *Client) ReadDagRunTask(ctx context.Context, dagId, execTs, taskId string, retry int) (DagRunTask, error)
ReadDagRunTask reads information about given taskId in given dag run.
func (*Client) ReadDagRunTaskDetails ¶ added in v0.0.7
func (c *Client) ReadDagRunTaskDetails( ctx context.Context, dagId, execTs string, ) ([]DagRunTaskDetails, error)
Reads all DAG tasks with all relevant details including node positions in DAG and task configurations.
func (*Client) ReadDagRunTaskLatest ¶
func (c *Client) ReadDagRunTaskLatest(ctx context.Context, dagId, execTs, taskId string) (DagRunTask, error)
ReadDagRunTaskLatest reads information about latest DAG run task try, out of possibly multiple retries of task execution, for given taskId and dag run.
func (*Client) ReadDagRunTaskLatestRetries ¶ added in v0.0.11
func (c *Client) ReadDagRunTaskLatestRetries(ctx context.Context, dagId, execTs string) ([]DagRunTask, error)
ReadDagRunTaskLatestRetries reads, for a given DAG run, latest retries of all tasks in that DAG run.
func (*Client) ReadDagRunTasks ¶
Reads DAG run tasks information from dagruntasks table for given DAG run.
func (*Client) ReadDagRunTasksAggByStatus ¶ added in v0.0.3
Reads aggregation of all DAG run tasks by their status.
func (*Client) ReadDagRunTasksNotFinished ¶
func (c *Client) ReadDagRunTasksNotFinished(ctx context.Context) ([]DagRunTask, error)
ReadDagRunTasksNotFinished reads tasks from dagruntasks table which are not in terminal state (success or failed). Tasks are sorted from oldest to newest based on execTs and insertTs.
func (*Client) ReadDagRuns ¶
ReadDagRuns reads topN latest dag runs for given DAG ID.
func (*Client) ReadDagRunsAggByStatus ¶ added in v0.0.3
Reads aggregation of all DAG runs by their status.
func (*Client) ReadDagRunsNotFinished ¶
Reads DAG runs from the dagruns table which are not in terminal states.
func (*Client) ReadDagRunsWithTaskInfo ¶ added in v0.0.3
func (c *Client) ReadDagRunsWithTaskInfo(ctx context.Context, latest int) ([]DagRunWithTaskInfo, error)
Reads latest N DAG runs with information about number of tasks completed and tasks overall.
func (*Client) ReadDagSchedules ¶ added in v0.0.5
ReadDagSchedules reads all schedule events for a given dag sorted from newest to oldest.
func (*Client) ReadDagTask ¶
ReadDagTask reads single row (current version) from dagtasks table for given DAG ID and task ID.
func (*Client) ReadDagTasks ¶
ReadDagTasks reads all tasks for given dagId in the current version from dagtasks table.
func (*Client) ReadLatestDagRuns ¶
ReadLatestDagRuns reads latest dag run for each Dag. Returns map from DagId to DagRun.
func (*Client) RunningTasksNum ¶
RunningTasksNum returns number of currently running tasks. That means rows in dagruntasks table with status 'RUNNING'.
func (*Client) UpdateDagRunStatus ¶
Updates dagrun status for given runId.
func (*Client) UpdateDagRunStatusByExecTs ¶
func (c *Client) UpdateDagRunStatusByExecTs( ctx context.Context, dagId, execTs, status string, ) error
Updates dagrun status for given dagId and execTs (when runId is not available). Pair (dagId, execTs) is unique in dagrun table.
func (*Client) UpdateDagRunTaskStatus ¶
func (c *Client) UpdateDagRunTaskStatus(ctx context.Context, dagId, execTs, taskId string, retry int, status string) error
Updates dagruntask status for given dag run task.
type DB ¶
type DB interface { Begin() (*sql.Tx, error) Exec(query string, args ...any) (sql.Result, error) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) Close() error DataSource() string Query(query string, args ...any) (*sql.Rows, error) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) QueryRow(query string, args ...any) *sql.Row QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row }
DB defines a set of operations required from a database. Most of the methods are identical to those of the standard `*sql.DB` type.
type DBClient ¶
type DBClient interface {
DBConn() DB
}
DBClient defines abstraction class for database clients.
type Dag ¶
type DagRun ¶
type DagRun struct { RunId int64 DagId string ExecTs string InsertTs string Status string StatusUpdateTs string Version string }
DagRun represents a row of data in dagruns table.
type DagRunRestart ¶ added in v0.0.11
DagRunRestart represents a row in dagrunrestarts table.
type DagRunTask ¶
type DagRunTaskDetails ¶ added in v0.0.7
type DagRunWithTaskInfo ¶ added in v0.0.3
DagRunWithTaskInfo contains information about a DAG run and additionally information about that DAG tasks.
type DagTask ¶
type DagTask struct { DagId string TaskId string IsCurrent bool InsertTs string PosDepth int PosWidth int Version string TaskTypeName string TaskConfig string TaskBodyHash string TaskBodySource string }
DagTask represents single row in dagtasks table in the database.
type LogsClient ¶
type LogsClient struct {
// contains filtered or unexported fields
}
LogsClient represents ppacer task logs database client.
func NewSqliteClientForLogs ¶
func NewSqliteClientForLogs(dbFilePath string, logger *slog.Logger) (*LogsClient, error)
Produces new Client for logs based on given connection string to SQLite database. If database file does not exist in given location, then empty SQLite database with setup schema will be created.
func NewSqliteTmpClientForLogs ¶
func NewSqliteTmpClientForLogs(logger *slog.Logger) (*LogsClient, error)
Produces new Client for logs using SQLite database created as temp file. It's mainly for testing and ad-hocs.
func (*LogsClient) Count ¶
func (lc *LogsClient) Count(table string) int
Count returns the count of rows for the given table. In case of errors, -1 is returned and the error is logged.
func (*LogsClient) InsertTaskLog ¶
func (c *LogsClient) InsertTaskLog(tlr TaskLogRecord) error
InsertTaskLog inserts single log record into tasklogs table.
func (*LogsClient) ReadDagRunLogs ¶
func (c *LogsClient) ReadDagRunLogs(ctx context.Context, dagId, execTs string) ([]TaskLogRecord, error)
ReadDagRunLogs reads all task logs for given DAG run in chronological order.
func (*LogsClient) ReadDagRunTaskLogs ¶
func (c *LogsClient) ReadDagRunTaskLogs(ctx context.Context, dagId, execTs, taskId string, retry int) ([]TaskLogRecord, error)
ReadDagRunTaskLogs reads all logs for given DAG run task in chronological order.
func (*LogsClient) ReadDagRunTaskLogsLatest ¶
func (c *LogsClient) ReadDagRunTaskLogsLatest(ctx context.Context, dagId, execTs, taskId string, retry, latest int) ([]TaskLogRecord, error)
ReadDagRunTaskLogsLatest reads given number of latest DAG run task logs in chronological order.
type PostgresDB ¶
type PostgresDB struct {
// contains filtered or unexported fields
}
PostgresDB represents Client for PostgreSQL database.
func (*PostgresDB) Close ¶
func (s *PostgresDB) Close() error
func (*PostgresDB) DataSource ¶
func (s *PostgresDB) DataSource() string
func (*PostgresDB) ExecContext ¶
func (*PostgresDB) QueryContext ¶
func (*PostgresDB) QueryRowContext ¶
type Scannable ¶ added in v0.0.7
Scannable interface is mainly to define parsers both for *sql.Row and *sql.Rows types.
type Schedule ¶ added in v0.0.5
type Schedule struct { DagId string InsertTs string Event string ScheduleTs *string NextScheduleTs string }
Schedule represents a row in schedules table.
type SqliteDB ¶
func (*SqliteDB) DataSource ¶
func (*SqliteDB) ExecContext ¶
func (*SqliteDB) QueryContext ¶
type SqliteDBInMemory ¶
SQLite database where data is stored in the memory rather than in a file on a disk. It needs additional level of isolation for concurrent access.
func (*SqliteDBInMemory) Close ¶
func (s *SqliteDBInMemory) Close() error
func (*SqliteDBInMemory) DataSource ¶
func (s *SqliteDBInMemory) DataSource() string
func (*SqliteDBInMemory) ExecContext ¶
func (*SqliteDBInMemory) QueryContext ¶
func (*SqliteDBInMemory) QueryRow ¶
func (s *SqliteDBInMemory) QueryRow(query string, args ...any) *sql.Row