Documentation ¶
Index ¶
- Constants
- Variables
- func PerformMigrations(ctx context.Context, m Migrator, fs embed.FS, identifier string, ...) error
- type DB
- func (s *DB) Close() error
- func (s *DB) Exec(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (s *DB) Prepare(ctx context.Context, query string) (*LoggedStmt, error)
- func (s *DB) Query(ctx context.Context, query string, args ...any) (*LoggedRows, error)
- func (s *DB) QueryRow(ctx context.Context, query string, args ...any) *LoggedRow
- func (s *DB) Transaction(ctx context.Context, fn func(Tx) error) error
- type LoggedRow
- type LoggedRows
- type LoggedStmt
- type MainMigrator
- type Migration
- type Migrator
- type Tx
Constants ¶
const (
ConsensusInfoID = 1
)
Variables ¶
var ( MainMigrations = func(ctx context.Context, m MainMigrator, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration { dbIdentifier := "main" return []Migration{ { ID: "00001_init", Migrate: func(tx Tx) error { return ErrRunV072 }, }, { ID: "00001_object_metadata", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00001_object_metadata", log) }, }, { ID: "00002_prune_slabs_trigger", Migrate: func(tx Tx) error { err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00002_prune_slabs_trigger", log) if utils.IsErr(err, ErrMySQLNoSuperPrivilege) { log.Warn("migration 00002_prune_slabs_trigger requires the user to have the SUPER privilege to register triggers") } return err }, }, { ID: "00003_idx_objects_size", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00003_idx_objects_size", log) }, }, { ID: "00004_prune_slabs_cascade", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00004_prune_slabs_cascade", log) }, }, { ID: "00005_zero_size_object_health", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00005_zero_size_object_health", log) }, }, { ID: "00006_idx_objects_created_at", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00006_idx_objects_created_at", log) }, }, { ID: "00007_host_checks", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00007_host_checks", log) }, }, { ID: "00008_directories", Migrate: func(tx Tx) error { if err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00008_directories_1", log); err != nil { return fmt.Errorf("failed to migrate: %v", err) } // helper type type obj struct { ID uint ObjectID string } log.Info("beginning post-migration directory creation, this might take a while") batchSize := 10000 processedDirs := make(map[string]struct{}) for offset := 0; ; offset += 
batchSize { if offset > 0 && offset%batchSize == 0 { log.Infof("processed %v objects", offset) } var objBatch []obj rows, err := tx.Query(ctx, "SELECT id, object_id FROM objects ORDER BY id LIMIT ? OFFSET ?", batchSize, offset) if err != nil { return fmt.Errorf("failed to fetch objects: %v", err) } for rows.Next() { var o obj if err := rows.Scan(&o.ID, &o.ObjectID); err != nil { _ = rows.Close() return fmt.Errorf("failed to scan object: %v", err) } objBatch = append(objBatch, o) } if err := rows.Close(); err != nil { return fmt.Errorf("failed to close rows: %v", err) } if len(objBatch) == 0 { break } for _, obj := range objBatch { dir := "" if i := strings.LastIndex(obj.ObjectID, "/"); i > -1 { dir = obj.ObjectID[:i+1] } _, exists := processedDirs[dir] if exists { continue } processedDirs[dir] = struct{}{} dirID, err := m.MakeDirsForPath(ctx, tx, obj.ObjectID) if err != nil { return fmt.Errorf("failed to create directory %s: %w", obj.ObjectID, err) } if _, err := tx.Exec(ctx, ` UPDATE objects SET db_directory_id = ? WHERE object_id LIKE ? AND SUBSTR(object_id, 1, ?) = ? 
AND INSTR(SUBSTR(object_id, ?), '/') = 0 `, dirID, dir+"%", utf8.RuneCountInString(dir), dir, utf8.RuneCountInString(dir)+1); err != nil { return fmt.Errorf("failed to update object %s: %w", obj.ObjectID, err) } } } log.Info("post-migration directory creation complete") if err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00008_directories_2", log); err != nil { return fmt.Errorf("failed to migrate: %v", err) } return nil }, }, { ID: "00009_json_settings", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00009_json_settings", log) }, }, { ID: "00010_webhook_headers", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00010_webhook_headers", log) }, }, { ID: "00011_host_subnets", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00011_host_subnets", log) }, }, { ID: "00012_peer_store", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00012_peer_store", log) }, }, { ID: "00013_coreutils_wallet", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00013_coreutils_wallet", log) }, }, { ID: "00014_hosts_resolvedaddresses", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00014_hosts_resolvedaddresses", log) }, }, { ID: "00015_reset_drift", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00015_reset_drift", log) }, }, { ID: "00016_account_owner", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00016_account_owner", log) }, }, { ID: "00017_unix_ms", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00017_unix_ms", log) }, }, { ID: "00018_directory_buckets", Migrate: func(tx Tx) error { if err := performMigration(ctx, tx, migrationsFs, dbIdentifier, "00018_directory_buckets_1", log); err != nil 
{ return fmt.Errorf("failed to migrate: %v", err) } // fetch all objects type obj struct { ID int64 Path string Bucket string } rows, err := tx.Query(ctx, "SELECT o.id, o.object_id, b.name FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id") if err != nil { return fmt.Errorf("failed to fetch objects: %w", err) } defer rows.Close() var objects []obj for rows.Next() { var o obj if err := rows.Scan(&o.ID, &o.Path, &o.Bucket); err != nil { return fmt.Errorf("failed to scan object: %w", err) } objects = append(objects, o) } memo := make(map[string]int64) updates := make(map[int64]int64) for _, o := range objects { dirs := object.Directories(o.Path) last := dirs[len(dirs)-1] if _, ok := memo[last]; ok { updates[o.ID] = memo[last] continue } dirID, err := m.InsertDirectories(ctx, tx, o.Bucket, o.Path) if err != nil { return fmt.Errorf("failed to create directory %s in bucket %s: %w", o.Path, o.Bucket, err) } updates[o.ID] = dirID memo[last] = dirID } stmt, err := tx.Prepare(ctx, "UPDATE objects SET db_directory_id = ? 
WHERE id = ?") if err != nil { return fmt.Errorf("failed to prepare update statement: %w", err) } defer stmt.Close() for id, dirID := range updates { if _, err := stmt.Exec(ctx, dirID, id); err != nil { return fmt.Errorf("failed to update object %d: %w", id, err) } } return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00018_directory_buckets_2", log) }, }, { ID: "00019_scan_reset", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00019_scan_reset", log) }, }, { ID: "00020_remove_directories", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00020_remove_directories", log) }, }, } } MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration { dbIdentifier := "metrics" return []Migration{ { ID: "00001_init", Migrate: func(tx Tx) error { return ErrRunV072 }, }, { ID: "00001_idx_contracts_fcid_timestamp", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00001_idx_contracts_fcid_timestamp", log) }, }, { ID: "00002_idx_wallet_metrics_immature", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00002_idx_wallet_metrics_immature", log) }, }, { ID: "00003_unix_ms", Migrate: func(tx Tx) error { return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00003_unix_ms", log) }, }, } } )
var ( ErrInvalidNumberOfShards = errors.New("slab has invalid number of shards") ErrShardRootChanged = errors.New("shard root changed") ErrRunV072 = errors.New("can't upgrade to >=v1.0.0 from your current version - please upgrade to v0.7.2 first (https://github.com/SiaFoundation/renterd/releases/tag/v0.7.2)") ErrMySQLNoSuperPrivilege = errors.New("You do not have the SUPER privilege and binary logging is enabled") )
Functions ¶
Types ¶
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
A DB is a wrapper around a *sql.DB that provides additional utility.
func (*DB) Exec ¶
Exec executes a query without returning any rows. The args are for any placeholder parameters in the query.
func (*DB) Prepare ¶
Prepare creates a prepared statement for later queries or executions. Multiple queries or executions may be run concurrently from the returned statement. The caller must call the statement's Close method when the statement is no longer needed.
func (*DB) Query ¶
Query executes a query that returns rows, typically a SELECT. The args are for any placeholder parameters in the query.
func (*DB) QueryRow ¶
QueryRow executes a query that is expected to return at most one row. QueryRow always returns a non-nil value. Errors are deferred until Row's Scan method is called. If the query selects no rows, the *Row's Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the first selected row and discards the rest.
func (*DB) Transaction ¶
Transaction executes a function within a database transaction. If the function returns an error, the transaction is rolled back. Otherwise, the transaction is committed. If the transaction fails due to a busy error, it is retried up to 'maxRetryAttempts' times before returning.
type LoggedRow ¶
The following types are wrappers for the sql package types, adding logging capabilities.
type LoggedRows ¶
The following types are wrappers for the sql package types, adding logging capabilities.
func (*LoggedRows) Next ¶
func (lr *LoggedRows) Next() bool
func (*LoggedRows) Scan ¶
func (lr *LoggedRows) Scan(dest ...any) error
type LoggedStmt ¶
The following types are wrappers for the sql package types, adding logging capabilities.
func (*LoggedStmt) Query ¶
func (ls *LoggedStmt) Query(ctx context.Context, args ...any) (*LoggedRows, error)
type MainMigrator ¶
type Migrator ¶
type Migrator interface { ApplyMigration(ctx context.Context, fn func(tx Tx) (bool, error)) error CreateMigrationTable(ctx context.Context) error DB() *DB }
Migrator is an interface for defining database-specific helper methods required during migrations.
type Tx ¶
type Tx interface { // Exec executes a query without returning any rows. The args are for // any placeholder parameters in the query. Exec(ctx context.Context, query string, args ...any) (sql.Result, error) // Prepare creates a prepared statement for later queries or executions. // Multiple queries or executions may be run concurrently from the // returned statement. The caller must call the statement's Close method // when the statement is no longer needed. Prepare(ctx context.Context, query string) (*LoggedStmt, error) // Query executes a query that returns rows, typically a SELECT. The // args are for any placeholder parameters in the query. Query(ctx context.Context, query string, args ...any) (*LoggedRows, error) // QueryRow executes a query that is expected to return at most one row. // QueryRow always returns a non-nil value. Errors are deferred until // Row's Scan method is called. If the query selects no rows, the *Row's // Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the // first selected row and discards the rest. QueryRow(ctx context.Context, query string, args ...any) *LoggedRow }
A Tx is an interface for executing queries within a transaction.