Documentation
¶
Index ¶
- Variables
- func AppliedMigrations(tx kv.Tx, withPayload bool) (map[string][]byte, error)
- func MakeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, ...) error
- func MarshalMigrationPayload(db kv.Getter) ([]byte, error)
- func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error)
- type Callback
- type Migration
- type Migrator
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	// ErrMigrationNonUniqueName is returned when two registered migrations share the same Name.
	ErrMigrationNonUniqueName = fmt.Errorf("please provide unique migration name")
	// ErrMigrationCommitNotCalled is returned when a migration's Up function
	// finished without invoking its BeforeCommit callback.
	ErrMigrationCommitNotCalled = fmt.Errorf("migration before-commit function was not called")
	// ErrMigrationETLFilesDeleted signals that a previously interrupted migration
	// cannot resume because its intermediate ETL files are gone.
	ErrMigrationETLFilesDeleted = fmt.Errorf(
		"db migration progress was interrupted after extraction step and ETL files was deleted, please contact development team for help or re-sync from scratch",
	)
)
View Source
// ErrTxsBeginEndNoMigration is returned when the DB predates the first/last
// system-tx format change, for which no automatic migration exists; the user
// must re-sync or downgrade.
var ErrTxsBeginEndNoMigration = fmt.Errorf("in this Erigon version DB format was changed: added additional first/last system-txs to blocks. There is no DB migration for this change. Please re-sync or switch to earlier version")
View Source
var ProhibitNewDownloadsLock = Migration{ Name: "prohibit_new_downloads_lock", Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { tx, err := db.BeginRw(context.Background()) if err != nil { return err } defer tx.Rollback() snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) if err != nil { return err } if snapshotsStageProgress > 0 { fPath := filepath.Join(dirs.Snap, downloader.ProhibitNewDownloadsFileName) if !dir.FileExist(fPath) { f, err := os.Create(fPath) if err != nil { return err } defer f.Close() if err := f.Sync(); err != nil { return err } } } if err := BeforeCommit(tx, nil, true); err != nil { return err } return tx.Commit() }, }
View Source
var ProhibitNewDownloadsLock2 = Migration{ Name: "prohibit_new_downloads_lock2", Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { tx, err := db.BeginRw(context.Background()) if err != nil { return err } defer tx.Rollback() fPath := filepath.Join(dirs.Snap, downloader.ProhibitNewDownloadsFileName) if !dir.FileExist(fPath) { if err := BeforeCommit(tx, nil, true); err != nil { return err } return tx.Commit() } content, err := os.ReadFile(fPath) if err != nil { return err } if len(content) == 0 { locked := []string{} for _, t := range coresnaptype.BlockSnapshotTypes { locked = append(locked, t.Name()) } for _, t := range borsnaptype.BorSnapshotTypes() { locked = append(locked, t.Name()) } for _, t := range snaptype.CaplinSnapshotTypes { if t.Name() != snaptype.BlobSidecars.Name() { locked = append(locked, t.Name()) } } newContent, err := json.Marshal(locked) if err != nil { return err } if err := os.WriteFile(fPath, newContent, fs.FileMode(os.O_TRUNC|os.O_WRONLY)); err != nil { return err } } if err := BeforeCommit(tx, nil, true); err != nil { return err } return tx.Commit() }, }
ProhibitNewDownloadsLock2 switches the download lock to its second format (a JSON list of locked snapshot types inside download.lock).
View Source
var TxsBeginEnd = Migration{ Name: "txs_begin_end", Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { logEvery := time.NewTicker(10 * time.Second) defer logEvery.Stop() var latestBlock uint64 if err := db.View(context.Background(), func(tx kv.Tx) error { bodiesProgress, err := stages.GetStageProgress(tx, stages.Bodies) if err != nil { return err } if progress != nil { latestBlock = binary.BigEndian.Uint64(progress) logger.Info("[database version migration] Continue migration", "from_block", latestBlock) } else { latestBlock = bodiesProgress + 1 } return nil }); err != nil { return err } tx, err := db.BeginRw(context.Background()) if err != nil { return err } defer tx.Rollback() numBuf := make([]byte, 8) numHashBuf := make([]byte, 8+32) for i := int(latestBlock); i >= 0; i-- { blockNum := uint64(i) select { case <-logEvery.C: var m runtime.MemStats dbg.ReadMemStats(&m) logger.Info("[database version migration] Adding system-txs", "progress", fmt.Sprintf("%.2f%%", 100-100*float64(blockNum)/float64(latestBlock)), "block_num", blockNum, "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) default: } canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum) if err != nil { return err } var oldBlock *types.Body if assert.Enable { oldBlock = readCanonicalBodyWithTransactionsDeprecated(tx, canonicalHash, blockNum) } binary.BigEndian.PutUint64(numHashBuf[:8], blockNum) copy(numHashBuf[8:], canonicalHash[:]) b, err := rawdb.ReadBodyForStorageByKey(tx, numHashBuf) if err != nil { return err } if b == nil { continue } txs, err := canonicalTransactions(tx, b.BaseTxId, b.TxAmount) if err != nil { return err } b.BaseTxId += (blockNum) * 2 b.TxAmount += 2 if err := rawdb.WriteBodyForStorage(tx, canonicalHash, blockNum, b); err != nil { return fmt.Errorf("failed to write body: %w", err) } if err = tx.Delete(kv.EthTx, hexutility.EncodeTs(b.BaseTxId)); err != nil { return err } if err := 
writeTransactionsNewDeprecated(tx, txs, b.BaseTxId+1); err != nil { return fmt.Errorf("failed to write body txs: %w", err) } if err = tx.Delete(kv.EthTx, hexutility.EncodeTs(b.BaseTxId+uint64(b.TxAmount)-1)); err != nil { return err } if assert.Enable { newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum) newBlock.Transactions, err = canonicalTransactions(tx, baseTxId, txAmount) for i, oldTx := range oldBlock.Transactions { newTx := newBlock.Transactions[i] if oldTx.GetNonce() != newTx.GetNonce() { panic(blockNum) } } } if err = tx.ForPrefix(kv.BlockBody, numHashBuf[:8], func(k, v []byte) error { if bytes.Equal(k, numHashBuf) { return nil } bodyForStorage := new(types.BodyForStorage) if err := rlp2.DecodeBytes(v, bodyForStorage); err != nil { return err } for i := bodyForStorage.BaseTxId; i < bodyForStorage.BaseTxId+uint64(bodyForStorage.TxAmount); i++ { binary.BigEndian.PutUint64(numBuf, i) if err = tx.Delete(kv.NonCanonicalTxs, numBuf); err != nil { return err } } if err = tx.Delete(kv.BlockBody, k); err != nil { return err } if err = tx.Delete(kv.Headers, k); err != nil { return err } if err = tx.Delete(kv.HeaderTD, k); err != nil { return err } if err = tx.Delete(kv.HeaderNumber, k[8:]); err != nil { return err } if err = tx.Delete(kv.HeaderNumber, k[8:]); err != nil { return err } return nil }); err != nil { return err } binary.BigEndian.PutUint64(numBuf, blockNum) if err := BeforeCommit(tx, numBuf, false); err != nil { return err } if blockNum%10_000 == 0 { if err := tx.Commit(); err != nil { return err } tx, err = db.BeginRw(context.Background()) if err != nil { return err } } } if err := tx.Commit(); err != nil { return err } return db.Update(context.Background(), func(tx kv.RwTx) error { v, err := tx.ReadSequence(kv.NonCanonicalTxs) if err != nil { return err } if _, err := tx.IncrementSequence(kv.NonCanonicalTxs, -v); err != nil { return err } { c, err := tx.Cursor(kv.HeaderCanonical) if err != nil { return err } k, v, err := 
c.Last() if err != nil { return err } data, err := tx.GetOne(kv.BlockBody, append(k, v...)) if err != nil { return err } var newSeqValue uint64 if len(data) > 0 { bodyForStorage := new(types.BodyForStorage) if err := rlp2.DecodeBytes(data, bodyForStorage); err != nil { return fmt.Errorf("rlp.DecodeBytes(bodyForStorage): %w", err) } currentSeq, err := tx.ReadSequence(kv.EthTx) if err != nil { return err } newSeqValue = bodyForStorage.BaseTxId + uint64(bodyForStorage.TxAmount) - currentSeq } if _, err := tx.IncrementSequence(kv.EthTx, newSeqValue); err != nil { return err } } return BeforeCommit(tx, nil, true) }) }, }
View Source
var TxsV3 = Migration{ Name: "txs_v3", Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) { logEvery := time.NewTicker(10 * time.Second) defer logEvery.Stop() txIDBytes := make([]byte, 8) if err := db.Update(context.Background(), func(tx kv.RwTx) error { from := hexutility.EncodeTs(1) if err := tx.ForEach(kv.BlockBody, from, func(k, _ []byte) error { blockNum := binary.BigEndian.Uint64(k) select { case <-logEvery.C: var m runtime.MemStats dbg.ReadMemStats(&m) logger.Info("[txs_v3] Migration progress", "block_num", blockNum, "alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys)) default: } canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum) if err != nil { return err } isCanonical := bytes.Equal(k[8:], canonicalHash[:]) if isCanonical { return nil } b, err := rawdb.ReadBodyForStorageByKey(tx, k) if err != nil { return err } if b == nil { log.Debug("PruneBlocks: block body not found", "height", blockNum) } else { for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ { binary.BigEndian.PutUint64(txIDBytes, txID) if err = tx.Delete(kv.NonCanonicalTxs, txIDBytes); err != nil { return err } } } kCopy := common2.CopyBytes(k) if err = tx.Delete(kv.Senders, kCopy); err != nil { return err } if err = tx.Delete(kv.BlockBody, kCopy); err != nil { return err } if err = tx.Delete(kv.Headers, kCopy); err != nil { return err } return nil }); err != nil { return err } return BeforeCommit(tx, nil, true) }); err != nil { return err } return nil }, }
Functions ¶
func AppliedMigrations ¶
Types ¶
type Migrator ¶
// Migrator holds the ordered list of Migrations to apply to a database.
type Migrator struct {
	Migrations []Migration
}
func NewMigrator ¶
func (*Migrator) HasPendingMigrations ¶
func (*Migrator) PendingMigrations ¶
Click to show internal directories.
Click to hide internal directories.