sql

package
v1.1.1-alpha.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ConsensusInfoID = 1
)

Variables

View Source
var (
	MainMigrations = func(ctx context.Context, m MainMigrator, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
		dbIdentifier := "main"
		return []Migration{
			{
				ID:      "00001_init",
				Migrate: func(tx Tx) error { return ErrRunV072 },
			},
			{
				ID: "00001_object_metadata",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00001_object_metadata", log)
				},
			},
			{
				ID: "00002_prune_slabs_trigger",
				Migrate: func(tx Tx) error {
					err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00002_prune_slabs_trigger", log)
					if utils.IsErr(err, ErrMySQLNoSuperPrivilege) {
						log.Warn("migration 00002_prune_slabs_trigger requires the user to have the SUPER privilege to register triggers")
					}
					return err
				},
			},
			{
				ID: "00003_idx_objects_size",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00003_idx_objects_size", log)
				},
			},
			{
				ID: "00004_prune_slabs_cascade",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00004_prune_slabs_cascade", log)
				},
			},
			{
				ID: "00005_zero_size_object_health",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00005_zero_size_object_health", log)
				},
			},
			{
				ID: "00006_idx_objects_created_at",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00006_idx_objects_created_at", log)
				},
			},
			{
				ID: "00007_host_checks",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00007_host_checks", log)
				},
			},
			{
				ID: "00008_directories",
				Migrate: func(tx Tx) error {
					if err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00008_directories_1", log); err != nil {
						return fmt.Errorf("failed to migrate: %v", err)
					}
					// helper type
					type obj struct {
						ID       uint
						ObjectID string
					}

					log.Info("beginning post-migration directory creation, this might take a while")
					batchSize := 10000
					processedDirs := make(map[string]struct{})
					for offset := 0; ; offset += batchSize {
						if offset > 0 && offset%batchSize == 0 {
							log.Infof("processed %v objects", offset)
						}
						var objBatch []obj
						rows, err := tx.Query(ctx, "SELECT id, object_id FROM objects ORDER BY id LIMIT ? OFFSET ?", batchSize, offset)
						if err != nil {
							return fmt.Errorf("failed to fetch objects: %v", err)
						}
						for rows.Next() {
							var o obj
							if err := rows.Scan(&o.ID, &o.ObjectID); err != nil {
								_ = rows.Close()
								return fmt.Errorf("failed to scan object: %v", err)
							}
							objBatch = append(objBatch, o)
						}
						if err := rows.Close(); err != nil {
							return fmt.Errorf("failed to close rows: %v", err)
						}
						if len(objBatch) == 0 {
							break
						}
						for _, obj := range objBatch {

							dir := ""
							if i := strings.LastIndex(obj.ObjectID, "/"); i > -1 {
								dir = obj.ObjectID[:i+1]
							}
							_, exists := processedDirs[dir]
							if exists {
								continue
							}
							processedDirs[dir] = struct{}{}

							dirID, err := m.MakeDirsForPath(ctx, tx, obj.ObjectID)
							if err != nil {
								return fmt.Errorf("failed to create directory %s: %w", obj.ObjectID, err)
							}

							if _, err := tx.Exec(ctx, `
							UPDATE objects
							SET db_directory_id = ?
							WHERE object_id LIKE ? AND
							SUBSTR(object_id, 1, ?) = ? AND
							INSTR(SUBSTR(object_id, ?), '/') = 0
						`,
								dirID,
								dir+"%",
								utf8.RuneCountInString(dir), dir,
								utf8.RuneCountInString(dir)+1); err != nil {
								return fmt.Errorf("failed to update object %s: %w", obj.ObjectID, err)
							}
						}
					}
					log.Info("post-migration directory creation complete")
					if err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00008_directories_2", log); err != nil {
						return fmt.Errorf("failed to migrate: %v", err)
					}
					return nil
				},
			},
			{
				ID: "00009_json_settings",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00009_json_settings", log)
				},
			},
			{
				ID: "00010_webhook_headers",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00010_webhook_headers", log)
				},
			},
			{
				ID: "00011_host_subnets",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00011_host_subnets", log)
				},
			},
			{
				ID: "00012_peer_store",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00012_peer_store", log)
				},
			},
			{
				ID: "00013_coreutils_wallet",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00013_coreutils_wallet", log)
				},
			},
			{
				ID: "00014_hosts_resolvedaddresses",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00014_hosts_resolvedaddresses", log)
				},
			},
			{
				ID: "00015_reset_drift",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00015_reset_drift", log)
				},
			},
			{
				ID: "00016_account_owner",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00016_account_owner", log)
				},
			},
			{
				ID: "00017_unix_ms",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00017_unix_ms", log)
				},
			},
			{
				ID: "00018_directory_buckets",
				Migrate: func(tx Tx) error {

					if err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00018_directory_buckets_1", log); err != nil {
						return fmt.Errorf("failed to migrate: %v", err)
					}

					// fetch all objects
					type obj struct {
						ID     int64
						Path   string
						Bucket string
					}

					rows, err := tx.Query(ctx, "SELECT o.id, o.object_id, b.name FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id")
					if err != nil {
						return fmt.Errorf("failed to fetch objects: %w", err)
					}
					defer rows.Close()

					var objects []obj
					for rows.Next() {
						var o obj
						if err := rows.Scan(&o.ID, &o.Path, &o.Bucket); err != nil {
							return fmt.Errorf("failed to scan object: %w", err)
						}
						objects = append(objects, o)
					}

					memo := make(map[string]int64)
					updates := make(map[int64]int64)
					for _, o := range objects {

						dirs := object.Directories(o.Path)
						last := dirs[len(dirs)-1]
						if _, ok := memo[last]; ok {
							updates[o.ID] = memo[last]
							continue
						}

						dirID, err := m.InsertDirectories(ctx, tx, o.Bucket, o.Path)
						if err != nil {
							return fmt.Errorf("failed to create directory %s in bucket %s: %w", o.Path, o.Bucket, err)
						}
						updates[o.ID] = dirID
						memo[last] = dirID
					}

					stmt, err := tx.Prepare(ctx, "UPDATE objects SET db_directory_id = ? WHERE id = ?")
					if err != nil {
						return fmt.Errorf("failed to prepare update statement: %w", err)
					}
					defer stmt.Close()

					for id, dirID := range updates {
						if _, err := stmt.Exec(ctx, dirID, id); err != nil {
							return fmt.Errorf("failed to update object %d: %w", id, err)
						}
					}

					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00018_directory_buckets_2", log)
				},
			},
			{
				ID: "00019_scan_reset",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00019_scan_reset", log)
				},
			},
			{
				ID: "00020_remove_directories",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00020_remove_directories", log)
				},
			},
		}
	}
	MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration {
		dbIdentifier := "metrics"
		return []Migration{
			{
				ID:      "00001_init",
				Migrate: func(tx Tx) error { return ErrRunV072 },
			},
			{
				ID: "00001_idx_contracts_fcid_timestamp",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00001_idx_contracts_fcid_timestamp", log)
				},
			},
			{
				ID: "00002_idx_wallet_metrics_immature",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00002_idx_wallet_metrics_immature", log)
				},
			},
			{
				ID: "00003_unix_ms",
				Migrate: func(tx Tx) error {
					return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00003_unix_ms", log)
				},
			},
		}
	}
)
View Source
var (
	ErrInvalidNumberOfShards = errors.New("slab has invalid number of shards")
	ErrShardRootChanged      = errors.New("shard root changed")

	ErrRunV072               = errors.New("can't upgrade to >=v1.0.0 from your current version - please upgrade to v0.7.2 first (https://github.com/SiaFoundation/renterd/releases/tag/v0.7.2)")
	ErrMySQLNoSuperPrivilege = errors.New("You do not have the SUPER privilege and binary logging is enabled")
)

Functions

func PerformMigrations

func PerformMigrations(ctx context.Context, m Migrator, fs embed.FS, identifier string, migrations []Migration) error

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

A DB is a wrapper around a *sql.DB that provides additional utility

func NewDB

func NewDB(db *sql.DB, log *zap.Logger, dbLockedMsgs []string, longQueryDuration, longTxDuration time.Duration) (*DB, error)

func (*DB) Close

func (s *DB) Close() error

Close closes the underlying database.

func (*DB) Exec

func (s *DB) Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

exec executes a query without returning any rows. The args are for any placeholder parameters in the query.

func (*DB) Prepare

func (s *DB) Prepare(ctx context.Context, query string) (*LoggedStmt, error)

prepare creates a prepared statement for later queries or executions. Multiple queries or executions may be run concurrently from the returned statement. The caller must call the statement's Close method when the statement is no longer needed.

func (*DB) Query

func (s *DB) Query(ctx context.Context, query string, args ...any) (*LoggedRows, error)

query executes a query that returns rows, typically a SELECT. The args are for any placeholder parameters in the query.

func (*DB) QueryRow

func (s *DB) QueryRow(ctx context.Context, query string, args ...any) *LoggedRow

queryRow executes a query that is expected to return at most one row. QueryRow always returns a non-nil value. Errors are deferred until Row's Scan method is called. If the query selects no rows, the *Row's Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the first selected row and discards the rest.

func (*DB) Transaction

func (s *DB) Transaction(ctx context.Context, fn func(Tx) error) error

transaction executes a function within a database transaction. If the function returns an error, the transaction is rolled back. Otherwise, the transaction is committed. If the transaction fails due to a busy error, it is retried up to 'maxRetryAttempts' times before returning.

type LoggedRow

type LoggedRow struct {
	*sql.Row
	// contains filtered or unexported fields
}

The following types are wrappers for the sql package types, adding logging capabilities.

func (*LoggedRow) Scan

func (lr *LoggedRow) Scan(dest ...any) error

type LoggedRows

type LoggedRows struct {
	*sql.Rows
	// contains filtered or unexported fields
}

The following types are wrappers for the sql package types, adding logging capabilities.

func (*LoggedRows) Next

func (lr *LoggedRows) Next() bool

func (*LoggedRows) Scan

func (lr *LoggedRows) Scan(dest ...any) error

type LoggedStmt

type LoggedStmt struct {
	*sql.Stmt
	// contains filtered or unexported fields
}

The following types are wrappers for the sql package types, adding logging capabilities.

func (*LoggedStmt) Exec

func (ls *LoggedStmt) Exec(ctx context.Context, args ...any) (sql.Result, error)

func (*LoggedStmt) Query

func (ls *LoggedStmt) Query(ctx context.Context, args ...any) (*LoggedRows, error)

func (*LoggedStmt) QueryRow

func (ls *LoggedStmt) QueryRow(ctx context.Context, args ...any) *LoggedRow

type MainMigrator

type MainMigrator interface {
	Migrator
	InsertDirectories(ctx context.Context, tx Tx, bucket, path string) (int64, error)
	MakeDirsForPath(ctx context.Context, tx Tx, path string) (int64, error)
}

type Migration

type Migration struct {
	ID      string
	Migrate func(tx Tx) error
}

type Migrator

type Migrator interface {
	ApplyMigration(ctx context.Context, fn func(tx Tx) (bool, error)) error
	CreateMigrationTable(ctx context.Context) error
	DB() *DB
}

Migrator is an interface for defining database-specific helper methods required during migrations

type Tx

type Tx interface {
	// Exec executes a query without returning any rows. The args are for
	// any placeholder parameters in the query.
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
	// Prepare creates a prepared statement for later queries or executions.
	// Multiple queries or executions may be run concurrently from the
	// returned statement. The caller must call the statement's Close method
	// when the statement is no longer needed.
	Prepare(ctx context.Context, query string) (*LoggedStmt, error)
	// Query executes a query that returns rows, typically a SELECT. The
	// args are for any placeholder parameters in the query.
	Query(ctx context.Context, query string, args ...any) (*LoggedRows, error)
	// QueryRow executes a query that is expected to return at most one row.
	// QueryRow always returns a non-nil value. Errors are deferred until
	// Row's Scan method is called. If the query selects no rows, the *Row's
	// Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the
	// first selected row and discards the rest.
	QueryRow(ctx context.Context, query string, args ...any) *LoggedRow
}

A txn is an interface for executing queries within a transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL