db

package
v0.0.11
Warning

This package is not in the latest version of its module.

Published: Oct 13, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package db contains all communication between ppacer and the database.

Introduction

Supported databases

  • SQLite - the default database. It is also used in in-memory mode and as temporary files under /tmp for unit and integration tests.
  • Postgres
  • ... (More in the future)

Constants

const (
	DagRunTaskStatusScheduled = "SCHEDULED"
)

const PPACER_ENV_LOG_LEVEL = "PPACER_LOG_LEVEL"

PPACER_ENV_LOG_LEVEL is the name of the environment variable that sets the severity level of ppacer's default logger.
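As an illustration of how such a variable might be consumed, the sketch below maps the value of PPACER_LOG_LEVEL to a slog.Level. The helper name levelFromEnv and the fallback to INFO are assumptions made for this example; the documentation above does not state which values ppacer accepts or what its default is.

```go
package main

import (
	"log/slog"
	"os"
)

// envLogLevel has the same value as db.PPACER_ENV_LOG_LEVEL.
const envLogLevel = "PPACER_LOG_LEVEL"

// levelFromEnv is a hypothetical helper. It parses a textual level such as
// "DEBUG" or "warn" and falls back to INFO (an assumption) when the value is
// empty or cannot be parsed.
func levelFromEnv(value string) slog.Level {
	var lvl slog.Level
	if err := lvl.UnmarshalText([]byte(value)); err != nil {
		return slog.LevelInfo
	}
	return lvl
}

func main() {
	lvl := levelFromEnv(os.Getenv(envLogLevel))
	opts := &slog.HandlerOptions{Level: lvl}
	logger := slog.New(slog.NewTextHandler(os.Stderr, opts))
	logger.Info("logger configured", "level", lvl.String())
}
```

Running the program with PPACER_LOG_LEVEL=DEBUG set in the environment would make the logger emit debug records as well.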

Variables

var TableNames []string = []string{
	"dags",
	"dagtasks",
	"dagruns",
	"dagruntasks",
	"schedules",
	"dagrunrestarts",
}

TableNames is a list of ppacer database table names.

Functions

func CleanUpSqliteTmp

func CleanUpSqliteTmp(c DBClient, t *testing.T)

CleanUpSqliteTmp deletes the SQLite database source file if all tests in the scope passed. If at least one test failed, the database is not deleted, to enable further debugging. Even though this function takes a generic DBClient, it is mainly meant for SQLite-based database clients used in testing.

func SchemaStatements

func SchemaStatements(dbDriver string) ([]string, error)

SchemaStatements returns a list of SQL statements that set up a new instance of the scheduler's internal database. The statements can differ slightly between SQL databases, so the exact list is prepared based on the given database driver name. If the given database driver is not supported, a non-nil error is returned.
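The driver-dependent selection described above can be sketched as follows. This is not the package's implementation: the function name schemaFor, the driver names "sqlite" and "postgres", and the DDL are all illustrative assumptions, and the real ppacer schema is not reproduced here.

```go
package main

import "fmt"

// schemaFor is a hypothetical stand-in for a function shaped like
// SchemaStatements. Driver names and DDL are illustrative only.
func schemaFor(dbDriver string) ([]string, error) {
	switch dbDriver {
	case "sqlite":
		return []string{
			"CREATE TABLE IF NOT EXISTS demo (id INTEGER PRIMARY KEY, name TEXT)",
		}, nil
	case "postgres":
		return []string{
			"CREATE TABLE IF NOT EXISTS demo (id BIGSERIAL PRIMARY KEY, name TEXT)",
		}, nil
	default:
		// Unsupported drivers produce a non-nil error, as documented above.
		return nil, fmt.Errorf("unsupported database driver: %q", dbDriver)
	}
}

func main() {
	for _, driver := range []string{"sqlite", "postgres", "oracle"} {
		stmts, err := schemaFor(driver)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s: %d statement(s)\n", driver, len(stmts))
	}
}
```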

func SchemaStatementsForLogs

func SchemaStatementsForLogs(dbDriver string) ([]string, error)

SchemaStatementsForLogs is like SchemaStatements, but for the logs database.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents the main database client.

func NewInMemoryClient

func NewInMemoryClient(schemaScriptPath string) (*Client, error)

NewInMemoryClient produces a new Client using an in-memory SQLite database, with the schema created from the given script.

func NewPostgresClient

func NewPostgresClient(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)

Produces new Client based on given connection to PostgreSQL database. If given database doesn't contain required ppacer schema, it will be created on the client initialization.

func NewPostgresClientForLogs

func NewPostgresClientForLogs(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)

Produces new Client for logs based on given connection to PostgreSQL database. If given database doesn't contain required logs schema, it will be created on the client initialization.

func NewSqliteClient

func NewSqliteClient(dbFilePath string, logger *slog.Logger) (*Client, error)

NewSqliteClient produces a new Client based on the given connection string to a SQLite database. If the database file does not exist in the given location, an empty SQLite database with the schema set up will be created.

func NewSqliteInMemoryClient

func NewSqliteInMemoryClient(logger *slog.Logger) (*Client, error)

NewSqliteInMemoryClient produces a new Client based on an in-memory SQLite database. The data is not persisted anywhere else. Mostly useful for tests and small examples.

func NewSqliteTmpClient

func NewSqliteTmpClient(logger *slog.Logger) (*Client, error)

NewSqliteTmpClient produces a new Client using a SQLite database created as a temp file. It's mainly for testing and ad hoc use.

func (*Client) Count

func (c *Client) Count(table string) int

Count returns the number of rows in the given table. In case of an error, -1 is returned and the error is logged.

func (*Client) CountWhere

func (c *Client) CountWhere(table, where string) int

CountWhere returns the number of rows in the given table filtered by the given where condition. In case of an error, -1 is returned and the error is logged.
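Because Count and CountWhere report failure through the sentinel value -1 rather than an error value, callers need to check for it explicitly. The sketch below shows one way to do that. The rowCounter interface copies the Count signature shown above, while fakeCounter and describe are hypothetical names that stand in for a real client.

```go
package main

import "fmt"

// rowCounter captures the Count signature shown above.
type rowCounter interface {
	Count(table string) int
}

// fakeCounter is a hypothetical in-memory stand-in for a database client.
type fakeCounter map[string]int

// Count mimics the documented contract: -1 signals a failure.
func (f fakeCounter) Count(table string) int {
	n, ok := f[table]
	if !ok {
		return -1
	}
	return n
}

// describe turns the -1 sentinel into an explicit message.
func describe(c rowCounter, table string) string {
	n := c.Count(table)
	if n < 0 {
		return fmt.Sprintf("%s: count failed, check the logs", table)
	}
	return fmt.Sprintf("%s: %d rows", table, n)
}

func main() {
	c := fakeCounter{"dags": 3}
	fmt.Println(describe(c, "dags"))
	fmt.Println(describe(c, "missing"))
}
```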

func (*Client) DBConn

func (c *Client) DBConn() DB

Client is a DBClient.

func (*Client) DagRunAlreadyScheduled

func (c *Client) DagRunAlreadyScheduled(
	ctx context.Context, dagId, execTs string,
) (bool, error)

DagRunAlreadyScheduled checks whether a DAG run already exists for the given DAG ID and schedule timestamp.

func (*Client) InsertDagRun

func (c *Client) InsertDagRun(ctx context.Context, dagId, execTs string) (int64, error)

InsertDagRun inserts a new row into the dagruns table for the given DagId and execution timestamp. The initial status is set to DagRunStatusScheduled. The RunId of the newly inserted DAG run is returned, or -1 when the error is not nil.

func (*Client) InsertDagRunNextRestart added in v0.0.11

func (c *Client) InsertDagRunNextRestart(
	ctx context.Context, dagId, execTs string,
) error

InsertDagRunNextRestart inserts a new DAG run restart event. If the given DAG run hasn't been restarted yet, a record with Restart=1 is inserted. If this is the second restart, Restart=2 is inserted, and so on.
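The restart numbering described above can be sketched with an in-memory map in place of the dagrunrestarts table. The types restartKey and restartLog and the method insertNextRestart are hypothetical; the real method issues SQL and returns only an error.

```go
package main

import "fmt"

// restartKey identifies a DAG run by DAG ID and execution timestamp.
type restartKey struct {
	dagId  string
	execTs string
}

// restartLog is a hypothetical in-memory stand-in for the dagrunrestarts
// table. It keeps the latest restart number per DAG run.
type restartLog map[restartKey]int

// insertNextRestart records the next restart for a DAG run and returns its
// number: 1 for the first restart, 2 for the second, and so on.
func (r restartLog) insertNextRestart(dagId, execTs string) int {
	k := restartKey{dagId: dagId, execTs: execTs}
	r[k]++ // a missing key starts from zero
	return r[k]
}

func main() {
	log := restartLog{}
	fmt.Println(log.insertNextRestart("sample_dag", "2024-10-13T08:00:00"))
	fmt.Println(log.insertNextRestart("sample_dag", "2024-10-13T08:00:00"))
	fmt.Println(log.insertNextRestart("sample_dag", "2024-10-13T09:00:00"))
}
```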

func (*Client) InsertDagRunTask

func (c *Client) InsertDagRunTask(ctx context.Context, dagId, execTs, taskId string, retry int, status string) error

Inserts new DagRunTask with default status SCHEDULED.

func (*Client) InsertDagSchedule added in v0.0.5

func (c *Client) InsertDagSchedule(
	ctx context.Context, dagId, event, nextSchedule string, schedule *string,
) error

InsertDagSchedule inserts new event regarding DAG schedule.

func (*Client) InsertDagTasks

func (c *Client) InsertDagTasks(ctx context.Context, d dag.Dag) error

InsertDagTasks inserts the tasks of the given DAG into the dagtasks table and sets them as the current version. Previous versions remain in the dagtasks table, but with IsCurrent=0. If inserting any of the DAG's tasks fails, the whole insertion is rolled back (in terms of SQL transactions).

func (*Client) ReadDag

func (c *Client) ReadDag(ctx context.Context, dagId string) (Dag, error)

ReadDag reads metadata about DAG from dags table for given dagId.

func (*Client) ReadDagRun added in v0.0.7

func (c *Client) ReadDagRun(ctx context.Context, runId int) (DagRun, error)

Reads DAG run information for given run ID.

func (*Client) ReadDagRunByExecTs added in v0.0.11

func (c *Client) ReadDagRunByExecTs(ctx context.Context, dagId, execTs string) (DagRun, error)

ReadDagRunByExecTs reads DAG run information for given DAG ID and execTs.

func (*Client) ReadDagRunRestartLatest added in v0.0.11

func (c *Client) ReadDagRunRestartLatest(
	ctx context.Context, dagId, execTs string,
) (DagRunRestart, error)

ReadDagRunRestartLatest reads latest restart event for given DAG run.

func (*Client) ReadDagRunSingleTaskDetails added in v0.0.9

func (c *Client) ReadDagRunSingleTaskDetails(
	ctx context.Context, dagId, execTs, taskId string,
) (DagRunTaskDetails, error)

ReadDagRunSingleTaskDetails reads a single DAG run task with additional information, including node positions and task configuration.

func (*Client) ReadDagRunTask

func (c *Client) ReadDagRunTask(ctx context.Context, dagId, execTs, taskId string, retry int) (DagRunTask, error)

ReadDagRunTask reads information about given taskId in given dag run.

func (*Client) ReadDagRunTaskDetails added in v0.0.7

func (c *Client) ReadDagRunTaskDetails(
	ctx context.Context, dagId, execTs string,
) ([]DagRunTaskDetails, error)

Reads all DAG tasks with all relevant details including node positions in DAG and task configurations.

func (*Client) ReadDagRunTaskLatest

func (c *Client) ReadDagRunTaskLatest(ctx context.Context, dagId, execTs, taskId string) (DagRunTask, error)

ReadDagRunTaskLatest reads information about latest DAG run task try, out of possibly multiple retries of task execution, for given taskId and dag run.

func (*Client) ReadDagRunTaskLatestRetries added in v0.0.11

func (c *Client) ReadDagRunTaskLatestRetries(ctx context.Context, dagId, execTs string) ([]DagRunTask, error)

ReadDagRunTaskLatestRetries reads, for a given DAG run, latest retries of all tasks in that DAG run.
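To illustrate what "latest retries of all tasks" means, the sketch below reduces a list of task rows to one row per task, keeping the highest Retry value. The runTask type is a trimmed, hypothetical version of DagRunTask, and the real method presumably does this in SQL rather than in Go.

```go
package main

import (
	"fmt"
	"sort"
)

// runTask is a trimmed, hypothetical version of DagRunTask.
type runTask struct {
	TaskId string
	Retry  int
	Status string
}

// latestRetries keeps, for every task ID, only the row with the highest
// Retry value. The result is sorted by task ID to make it deterministic.
func latestRetries(rows []runTask) []runTask {
	latest := make(map[string]runTask)
	for _, r := range rows {
		cur, seen := latest[r.TaskId]
		if !seen || r.Retry > cur.Retry {
			latest[r.TaskId] = r
		}
	}
	out := make([]runTask, 0, len(latest))
	for _, r := range latest {
		out = append(out, r)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].TaskId < out[j].TaskId })
	return out
}

func main() {
	rows := []runTask{
		{TaskId: "extract", Retry: 0, Status: "FAILED"},
		{TaskId: "extract", Retry: 1, Status: "RUNNING"},
		{TaskId: "load", Retry: 0, Status: "SCHEDULED"},
	}
	for _, r := range latestRetries(rows) {
		fmt.Printf("%s retry=%d status=%s\n", r.TaskId, r.Retry, r.Status)
	}
}
```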

func (*Client) ReadDagRunTasks

func (c *Client) ReadDagRunTasks(ctx context.Context, dagId, execTs string) ([]DagRunTask, error)

Reads DAG run tasks information from dagruntasks table for given DAG run.

func (*Client) ReadDagRunTasksAggByStatus added in v0.0.3

func (c *Client) ReadDagRunTasksAggByStatus(ctx context.Context) (map[string]int, error)

ReadDagRunTasksAggByStatus reads an aggregation of all DAG run tasks by their status.

func (*Client) ReadDagRunTasksNotFinished

func (c *Client) ReadDagRunTasksNotFinished(ctx context.Context) ([]DagRunTask, error)

ReadDagRunTasksNotFinished reads tasks from dagruntasks table which are not in terminal state (success or failed). Tasks are sorted from oldest to newest based on execTs and insertTs.
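The filtering and ordering described above can be sketched in plain Go. Several things here are assumptions: the status strings "SUCCESS" and "FAILED", the trimmed task type, and the use of timestamps that sort correctly as plain strings. The real method most likely expresses the same logic as a SQL query.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a trimmed, hypothetical version of DagRunTask.
type task struct {
	TaskId   string
	ExecTs   string
	InsertTs string
	Status   string
}

// notFinished drops tasks in a terminal state and sorts the rest from oldest
// to newest, first by ExecTs and then by InsertTs. Timestamps are compared as
// strings, which assumes a format that sorts lexicographically.
func notFinished(tasks []task) []task {
	var out []task
	for _, t := range tasks {
		if t.Status == "SUCCESS" || t.Status == "FAILED" {
			continue
		}
		out = append(out, t)
	}
	sort.SliceStable(out, func(i, j int) bool {
		if out[i].ExecTs != out[j].ExecTs {
			return out[i].ExecTs < out[j].ExecTs
		}
		return out[i].InsertTs < out[j].InsertTs
	})
	return out
}

func main() {
	tasks := []task{
		{"t1", "2024-01-02T00:00:00", "2024-01-02T00:00:01", "RUNNING"},
		{"t2", "2024-01-01T00:00:00", "2024-01-01T00:00:05", "SCHEDULED"},
		{"t3", "2024-01-01T00:00:00", "2024-01-01T00:00:02", "SUCCESS"},
		{"t4", "2024-01-01T00:00:00", "2024-01-01T00:00:01", "RUNNING"},
	}
	for _, t := range notFinished(tasks) {
		fmt.Println(t.TaskId, t.ExecTs, t.InsertTs, t.Status)
	}
}
```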

func (*Client) ReadDagRuns

func (c *Client) ReadDagRuns(ctx context.Context, dagId string, topN int) ([]DagRun, error)

ReadDagRuns reads topN latest dag runs for given DAG ID.

func (*Client) ReadDagRunsAggByStatus added in v0.0.3

func (c *Client) ReadDagRunsAggByStatus(ctx context.Context) (map[string]int, error)

ReadDagRunsAggByStatus reads an aggregation of all DAG runs by their status.

func (*Client) ReadDagRunsNotFinished

func (c *Client) ReadDagRunsNotFinished(ctx context.Context) ([]DagRun, error)

ReadDagRunsNotFinished reads DAG runs from the dagruns table that are not in a terminal state.

func (*Client) ReadDagRunsWithTaskInfo added in v0.0.3

func (c *Client) ReadDagRunsWithTaskInfo(ctx context.Context, latest int) ([]DagRunWithTaskInfo, error)

Reads latest N DAG runs with information about number of tasks completed and tasks overall.

func (*Client) ReadDagSchedules added in v0.0.5

func (c *Client) ReadDagSchedules(ctx context.Context, dagId string) ([]Schedule, error)

ReadDagSchedules reads all schedule events for a given dag sorted from newest to oldest.

func (*Client) ReadDagTask

func (c *Client) ReadDagTask(ctx context.Context, dagId, taskId string) (DagTask, error)

ReadDagTask reads single row (current version) from dagtasks table for given DAG ID and task ID.

func (*Client) ReadDagTasks

func (c *Client) ReadDagTasks(ctx context.Context, dagId string) ([]DagTask, error)

ReadDagTasks reads all tasks for given dagId in the current version from dagtasks table.

func (*Client) ReadLatestDagRuns

func (c *Client) ReadLatestDagRuns(ctx context.Context) (map[string]DagRun, error)

ReadLatestDagRuns reads latest dag run for each Dag. Returns map from DagId to DagRun.

func (*Client) RunningTasksNum

func (c *Client) RunningTasksNum(ctx context.Context) (int, error)

RunningTasksNum returns number of currently running tasks. That means rows in dagruntasks table with status 'RUNNING'.

func (*Client) UpdateDagRunStatus

func (c *Client) UpdateDagRunStatus(
	ctx context.Context, runId int64, status string,
) error

Updates dagrun status for given runId.

func (*Client) UpdateDagRunStatusByExecTs

func (c *Client) UpdateDagRunStatusByExecTs(
	ctx context.Context, dagId, execTs, status string,
) error

UpdateDagRunStatusByExecTs updates the DAG run status for the given dagId and execTs (for when runId is not available). The pair (dagId, execTs) is unique in the dagruns table.

func (*Client) UpdateDagRunTaskStatus

func (c *Client) UpdateDagRunTaskStatus(ctx context.Context, dagId, execTs, taskId string, retry int, status string) error

Updates dagruntask status for given dag run task.

func (*Client) UpsertDag

func (c *Client) UpsertDag(ctx context.Context, d dag.Dag) error

UpsertDag inserts or updates DAG details in the dags table. TODO(dskrzypiec): Perhaps we should always insert a new DAG into dags and keep an IsCurrent flag, similar to what we do in dagtasks? Not really needed for now, but something to consider in the future.

type DB

type DB interface {
	Begin() (*sql.Tx, error)
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	Close() error
	DataSource() string
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

DB defines the set of operations required from a database. Most of the methods are identical to those of the standard `*sql.DB` type.
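Since the interface largely mirrors `*sql.DB`, one way to satisfy it is to embed `*sql.DB` and add only the missing DataSource method. The sketch below is a hypothetical implementation named wrappedDB, not one of the package's own types, and it deliberately opens no connection because the standard library ships no SQL driver.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// DB mirrors the interface shown above.
type DB interface {
	Begin() (*sql.Tx, error)
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	Close() error
	DataSource() string
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

// wrappedDB is a hypothetical implementation. Embedding *sql.DB promotes
// every method of the interface except DataSource.
type wrappedDB struct {
	*sql.DB
	dataSource string
}

// DataSource returns the data source name this wrapper was created with.
func (w *wrappedDB) DataSource() string { return w.dataSource }

// Compile-time check that *wrappedDB satisfies DB.
var _ DB = (*wrappedDB)(nil)

func main() {
	// The embedded *sql.DB is left nil on purpose, so only DataSource is
	// safe to call in this sketch.
	w := &wrappedDB{dataSource: "file:/tmp/example.db"}
	fmt.Println(w.DataSource())
}
```

In real code the embedded handle would come from sql.Open with a registered driver.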

type DBClient

type DBClient interface {
	DBConn() DB
}

DBClient defines an abstraction for database clients.

type Dag

type Dag struct {
	DagId               string
	StartTs             *string
	Schedule            *string
	CreateTs            string
	LatestUpdateTs      *string
	CreateVersion       string
	LatestUpdateVersion *string
	HashDagMeta         string
	HashTasks           string
	Attributes          string // serialized dag.Dag.Attr
}

func (Dag) Equals

func (d Dag) Equals(e Dag) bool

type DagRun

type DagRun struct {
	RunId          int64
	DagId          string
	ExecTs         string
	InsertTs       string
	Status         string
	StatusUpdateTs string
	Version        string
}

DagRun represents a row of data in the dagruns table.

type DagRunRestart added in v0.0.11

type DagRunRestart struct {
	DagId   string
	ExecTs  string
	Restart int
	InserTs string
}

DagRunRestart represents a row in dagrunrestarts table.

type DagRunTask

type DagRunTask struct {
	DagId          string
	ExecTs         string
	TaskId         string
	Retry          int
	InsertTs       string
	Status         string
	StatusUpdateTs string
	Version        string
}

type DagRunTaskDetails added in v0.0.7

type DagRunTaskDetails struct {
	DagId          string
	TaskId         string
	PosDepth       int
	PosWidth       int
	ConfigJson     string
	TaskNotStarted bool
	ExecTs         string
	Retry          int
	InsertTs       string
	Status         string
	StatusUpdateTs string
	Version        string
}

type DagRunWithTaskInfo added in v0.0.3

type DagRunWithTaskInfo struct {
	DagRun           DagRun
	TaskNum          int
	TaskCompletedNum int
}

DagRunWithTaskInfo contains information about a DAG run and, additionally, about that DAG run's tasks.
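A typical use of the two counters is to derive a completion ratio for a DAG run. The sketch below shows that arithmetic on a trimmed, hypothetical copy of the struct; the helper name progress is an assumption and is not part of the package.

```go
package main

import "fmt"

// runInfo is a trimmed, hypothetical copy of DagRunWithTaskInfo that keeps
// only the two counters.
type runInfo struct {
	TaskNum          int
	TaskCompletedNum int
}

// progress returns the fraction of completed tasks in the range [0, 1].
// A run without tasks is reported as 0 to avoid dividing by zero.
func progress(r runInfo) float64 {
	if r.TaskNum <= 0 {
		return 0
	}
	return float64(r.TaskCompletedNum) / float64(r.TaskNum)
}

func main() {
	r := runInfo{TaskNum: 4, TaskCompletedNum: 1}
	fmt.Printf("%.0f%% of tasks completed\n", progress(r)*100)
}
```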

type DagTask

type DagTask struct {
	DagId          string
	TaskId         string
	IsCurrent      bool
	InsertTs       string
	PosDepth       int
	PosWidth       int
	Version        string
	TaskTypeName   string
	TaskConfig     string
	TaskBodyHash   string
	TaskBodySource string
}

DagTask represents single row in dagtasks table in the database.

type LogsClient

type LogsClient struct {
	// contains filtered or unexported fields
}

LogsClient represents ppacer task logs database client.

func NewSqliteClientForLogs

func NewSqliteClientForLogs(dbFilePath string, logger *slog.Logger) (*LogsClient, error)

NewSqliteClientForLogs produces a new LogsClient based on the given connection string to a SQLite database. If the database file does not exist in the given location, an empty SQLite database with the schema set up will be created.

func NewSqliteTmpClientForLogs

func NewSqliteTmpClientForLogs(logger *slog.Logger) (*LogsClient, error)

NewSqliteTmpClientForLogs produces a new LogsClient using a SQLite database created as a temp file. It's mainly for testing and ad hoc use.

func (*LogsClient) Count

func (lc *LogsClient) Count(table string) int

Count returns the number of rows in the given table. In case of an error, -1 is returned and the error is logged.

func (*LogsClient) DBConn

func (lc *LogsClient) DBConn() DB

LogsClient is a DBClient.

func (*LogsClient) InsertTaskLog

func (c *LogsClient) InsertTaskLog(tlr TaskLogRecord) error

InsertTaskLog inserts single log record into tasklogs table.

func (*LogsClient) ReadDagRunLogs

func (c *LogsClient) ReadDagRunLogs(ctx context.Context, dagId, execTs string) ([]TaskLogRecord, error)

ReadDagRunLogs reads all task logs for given DAG run in chronological order.

func (*LogsClient) ReadDagRunTaskLogs

func (c *LogsClient) ReadDagRunTaskLogs(ctx context.Context, dagId, execTs, taskId string, retry int) ([]TaskLogRecord, error)

ReadDagRunTaskLogs reads all logs for given DAG run task in chronological order.

func (*LogsClient) ReadDagRunTaskLogsLatest

func (c *LogsClient) ReadDagRunTaskLogsLatest(ctx context.Context, dagId, execTs, taskId string, retry, latest int) ([]TaskLogRecord, error)

ReadDagRunTaskLogsLatest reads given number of latest DAG run task logs in chronological order.

type PostgresDB

type PostgresDB struct {
	// contains filtered or unexported fields
}

PostgresDB represents Client for PostgreSQL database.

func (*PostgresDB) Begin

func (s *PostgresDB) Begin() (*sql.Tx, error)

func (*PostgresDB) Close

func (s *PostgresDB) Close() error

func (*PostgresDB) DataSource

func (s *PostgresDB) DataSource() string

func (*PostgresDB) Exec

func (s *PostgresDB) Exec(query string, args ...any) (sql.Result, error)

func (*PostgresDB) ExecContext

func (s *PostgresDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

func (*PostgresDB) Query

func (s *PostgresDB) Query(query string, args ...any) (*sql.Rows, error)

func (*PostgresDB) QueryContext

func (s *PostgresDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

func (*PostgresDB) QueryRow

func (s *PostgresDB) QueryRow(query string, args ...any) *sql.Row

func (*PostgresDB) QueryRowContext

func (s *PostgresDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

type Scannable added in v0.0.7

type Scannable interface {
	Scan(...any) error
}

Scannable exists mainly so that parsers can be defined once for both *sql.Row and *sql.Rows.
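The sketch below shows why such an interface is convenient: a parser written against Scannable works with anything that has a matching Scan method, which includes both *sql.Row and *sql.Rows. The parser parseSchedule, the trimmed schedule type, and the fakeRow test double are hypothetical names introduced for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Scannable mirrors the interface shown above.
type Scannable interface {
	Scan(...any) error
}

// schedule is a trimmed, hypothetical version of the Schedule type.
type schedule struct {
	DagId string
	Event string
}

// parseSchedule reads one record from any Scannable, so the same function
// could serve a *sql.Row as well as a *sql.Rows cursor.
func parseSchedule(s Scannable) (schedule, error) {
	var out schedule
	err := s.Scan(&out.DagId, &out.Event)
	return out, err
}

// fakeRow is a hypothetical Scannable that copies its strings into *string
// destinations, standing in for a database row.
type fakeRow []string

func (f fakeRow) Scan(dest ...any) error {
	if len(dest) != len(f) {
		return errors.New("column count mismatch")
	}
	for i, d := range dest {
		p, ok := d.(*string)
		if !ok {
			return fmt.Errorf("unsupported destination type %T", d)
		}
		*p = f[i]
	}
	return nil
}

func main() {
	s, err := parseSchedule(fakeRow{"sample_dag", "REGULAR"})
	fmt.Println(s.DagId, s.Event, err)
}
```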

type Schedule added in v0.0.5

type Schedule struct {
	DagId          string
	InsertTs       string
	Event          string
	ScheduleTs     *string
	NextScheduleTs string
}

Schedule represents a row in schedules table.

type SqliteDB

type SqliteDB struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*SqliteDB) Begin

func (s *SqliteDB) Begin() (*sql.Tx, error)

func (*SqliteDB) Close

func (s *SqliteDB) Close() error

func (*SqliteDB) DataSource

func (s *SqliteDB) DataSource() string

func (*SqliteDB) Exec

func (s *SqliteDB) Exec(query string, args ...any) (sql.Result, error)

func (*SqliteDB) ExecContext

func (s *SqliteDB) ExecContext(
	ctx context.Context, query string, args ...any,
) (sql.Result, error)

func (*SqliteDB) Query

func (s *SqliteDB) Query(query string, args ...any) (*sql.Rows, error)

func (*SqliteDB) QueryContext

func (s *SqliteDB) QueryContext(
	ctx context.Context, query string, args ...any,
) (*sql.Rows, error)

func (*SqliteDB) QueryRow

func (s *SqliteDB) QueryRow(query string, args ...any) *sql.Row

func (*SqliteDB) QueryRowContext

func (s *SqliteDB) QueryRowContext(
	ctx context.Context, query string, args ...any,
) *sql.Row

type SqliteDBInMemory

type SqliteDBInMemory struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SqliteDBInMemory is a SQLite database whose data is stored in memory rather than in a file on disk. It needs an additional level of isolation for concurrent access.
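The embedded sync.Mutex suggests that operations are serialized through a lock. How the package uses it internally is not shown here, so the sketch below only demonstrates the general pattern on a hypothetical guardedStore type: every operation takes the lock, which makes concurrent inserts from many goroutines safe.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedStore is a hypothetical stand-in for an in-memory database handle.
// The embedded mutex serializes all access to rows.
type guardedStore struct {
	sync.Mutex
	rows []string
}

// insert appends a row while holding the lock.
func (g *guardedStore) insert(row string) {
	g.Lock()
	defer g.Unlock()
	g.rows = append(g.rows, row)
}

// count returns the number of rows while holding the lock.
func (g *guardedStore) count() int {
	g.Lock()
	defer g.Unlock()
	return len(g.rows)
}

// insertConcurrently inserts n rows from n goroutines and waits for all of
// them to finish.
func insertConcurrently(g *guardedStore, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			g.insert(fmt.Sprintf("row-%d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	g := &guardedStore{}
	insertConcurrently(g, 100)
	fmt.Println("rows inserted:", g.count())
}
```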

func (*SqliteDBInMemory) Begin

func (s *SqliteDBInMemory) Begin() (*sql.Tx, error)

func (*SqliteDBInMemory) Close

func (s *SqliteDBInMemory) Close() error

func (*SqliteDBInMemory) DataSource

func (s *SqliteDBInMemory) DataSource() string

func (*SqliteDBInMemory) Exec

func (s *SqliteDBInMemory) Exec(query string, args ...any) (sql.Result, error)

func (*SqliteDBInMemory) ExecContext

func (s *SqliteDBInMemory) ExecContext(
	ctx context.Context, query string, args ...any,
) (sql.Result, error)

func (*SqliteDBInMemory) Query

func (s *SqliteDBInMemory) Query(query string, args ...any) (*sql.Rows, error)

func (*SqliteDBInMemory) QueryContext

func (s *SqliteDBInMemory) QueryContext(
	ctx context.Context, query string, args ...any,
) (*sql.Rows, error)

func (*SqliteDBInMemory) QueryRow

func (s *SqliteDBInMemory) QueryRow(query string, args ...any) *sql.Row

func (*SqliteDBInMemory) QueryRowContext

func (s *SqliteDBInMemory) QueryRowContext(
	ctx context.Context, query string, args ...any,
) *sql.Row

type TaskLogRecord

type TaskLogRecord struct {
	DagId      string
	ExecTs     string
	TaskId     string
	Retry      int
	InsertTs   string
	Level      string
	Message    string
	Attributes string
}

TaskLogRecord represents a single row in the tasklogs table.
