migrations

package
v0.0.0-...-1f8a15b Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2023 License: GPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the migrations package.
var (
	// ErrMigrationNonUniqueName asks the caller to provide a unique migration name.
	ErrMigrationNonUniqueName = fmt.Errorf("please provide unique migration name")

	// ErrMigrationCommitNotCalled states that a migration's before-commit
	// function was not called.
	ErrMigrationCommitNotCalled = fmt.Errorf("migration before-commit function was not called")

	// ErrMigrationETLFilesDeleted states that a migration was interrupted after
	// its extraction step and that the ETL files are gone.
	ErrMigrationETLFilesDeleted = fmt.Errorf(
		"db migration progress was interrupted after extraction step and ETL files was deleted, " +
			"please contact development team for help or re-sync from scratch")
)
View Source
// ErrTxsBeginEndNoMigration tells the operator that the DB format changed
// (first/last system-txs were added to blocks), that no migration exists for
// the change, and that a re-sync or an earlier version is required.
var ErrTxsBeginEndNoMigration = fmt.Errorf(
	"in this Erigon version DB format was changed: added additional first/last system-txs to blocks. " +
		"There is no DB migration for this change. " +
		"Please re-sync or switch to earlier version")
View Source
// TxsBeginEnd is the "txs_begin_end" migration. It walks canonical blocks from
// the highest block down to block 0 and, for each stored body, shifts
// BaseTxId by 2*blockNum and grows TxAmount by 2, rewriting the block's
// transactions at BaseTxId+1 and clearing the first and last ids of the new
// range. Non-canonical bodies at the same height are removed together with
// their headers, total-difficulty and header-number records.
//
// progress, when non-nil, is read as a big-endian block number to resume from;
// otherwise the walk starts at Bodies stage progress + 1. BeforeCommit is
// invoked after every block with the current block number, and once more with
// (nil, true) after the sequences have been adjusted.
var TxsBeginEnd = Migration{
	Name: "txs_begin_end",
	Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) {
		logEvery := time.NewTicker(10 * time.Second)
		defer logEvery.Stop()

		// Decide where to start: saved progress wins over the Bodies stage progress.
		var latestBlock uint64
		if err := db.View(context.Background(), func(tx kv.Tx) error {
			bodiesProgress, err := stages.GetStageProgress(tx, stages.Bodies)
			if err != nil {
				return err
			}
			if progress != nil {
				latestBlock = binary.BigEndian.Uint64(progress)
				logger.Info("[database version migration] Continue migration", "from_block", latestBlock)
			} else {
				latestBlock = bodiesProgress + 1
			}
			return nil
		}); err != nil {
			return err
		}

		tx, err := db.BeginRw(context.Background())
		if err != nil {
			return err
		}
		// tx is re-assigned after each periodic commit below. Deferring through a
		// closure rolls back whichever transaction is current at return time,
		// rather than only the first one.
		defer func() { tx.Rollback() }()

		numBuf := make([]byte, 8)
		numHashBuf := make([]byte, 8+32)
		for i := int(latestBlock); i >= 0; i-- {
			blockNum := uint64(i)

			select {
			case <-logEvery.C:
				var m runtime.MemStats
				dbg.ReadMemStats(&m)
				logger.Info("[database version migration] Adding system-txs",
					"progress", fmt.Sprintf("%.2f%%", 100-100*float64(blockNum)/float64(latestBlock)), "block_num", blockNum,
					"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
			default:
			}

			canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
			if err != nil {
				return err
			}

			// Snapshot the pre-migration body only when assertions are on; it is
			// compared against the rewritten body further down.
			var oldBlock *types.Body
			if assert.Enable {
				oldBlock = readCanonicalBodyWithTransactionsDeprecated(tx, canonicalHash, blockNum)
			}

			// BlockBody key = 8-byte big-endian block number followed by the hash.
			binary.BigEndian.PutUint64(numHashBuf[:8], blockNum)
			copy(numHashBuf[8:], canonicalHash[:])
			b, err := rawdb.ReadBodyForStorageByKey(tx, numHashBuf)
			if err != nil {
				return err
			}
			if b == nil {
				continue
			}

			// Read the transactions at their old ids before the body is rewritten.
			txs, err := canonicalTransactions(tx, b.BaseTxId, b.TxAmount)
			if err != nil {
				return err
			}

			// Every block gains two extra tx ids, so this block's base moves up by
			// two for each block number below it.
			b.BaseTxId += (blockNum) * 2
			b.TxAmount += 2
			if err := rawdb.WriteBodyForStorage(tx, canonicalHash, blockNum, b); err != nil {
				return fmt.Errorf("failed to write body: %w", err)
			}

			// Clear the first id of the new range, write the real transactions
			// right after it, then clear the last id of the range.
			if err = tx.Delete(kv.EthTx, hexutility.EncodeTs(b.BaseTxId)); err != nil {
				return err
			}
			if err := writeTransactionsNewDeprecated(tx, txs, b.BaseTxId+1); err != nil {
				return fmt.Errorf("failed to write body txs: %w", err)
			}

			if err = tx.Delete(kv.EthTx, hexutility.EncodeTs(b.BaseTxId+uint64(b.TxAmount)-1)); err != nil {
				return err
			}

			if assert.Enable {
				newBlock, baseTxId, txAmount := rawdb.ReadBody(tx, canonicalHash, blockNum)
				newBlock.Transactions, err = canonicalTransactions(tx, baseTxId, txAmount)
				if err != nil {
					return err
				}
				for i, oldTx := range oldBlock.Transactions {
					newTx := newBlock.Transactions[i]
					if oldTx.GetNonce() != newTx.GetNonce() {
						panic(blockNum)
					}
				}
			}

			// Drop every other (non-canonical) body stored at this height along
			// with its transactions and header records.
			if err = tx.ForPrefix(kv.BlockBody, numHashBuf[:8], func(k, v []byte) error {
				if bytes.Equal(k, numHashBuf) { // don't delete canonical blocks
					return nil
				}
				bodyForStorage := new(types.BodyForStorage)
				if err := rlp.DecodeBytes(v, bodyForStorage); err != nil {
					return err
				}

				for i := bodyForStorage.BaseTxId; i < bodyForStorage.BaseTxId+uint64(bodyForStorage.TxAmount); i++ {
					binary.BigEndian.PutUint64(numBuf, i)
					if err = tx.Delete(kv.NonCanonicalTxs, numBuf); err != nil {
						return err
					}
				}

				if err = tx.Delete(kv.BlockBody, k); err != nil {
					return err
				}
				if err = tx.Delete(kv.Headers, k); err != nil {
					return err
				}
				if err = tx.Delete(kv.HeaderTD, k); err != nil {
					return err
				}
				// HeaderNumber is keyed by the hash alone (the key minus its
				// 8-byte block-number prefix).
				if err = tx.Delete(kv.HeaderNumber, k[8:]); err != nil {
					return err
				}

				return nil
			}); err != nil {
				return err
			}

			// Report the block just processed as progress, then commit
			// periodically and continue in a fresh transaction.
			binary.BigEndian.PutUint64(numBuf, blockNum)
			if err := BeforeCommit(tx, numBuf, false); err != nil {
				return err
			}
			if blockNum%10_000 == 0 {
				if err := tx.Commit(); err != nil {
					return err
				}
				// Only replace tx once the new transaction is known to be valid,
				// so the deferred rollback never sees a failed BeginRw result.
				nextTx, err := db.BeginRw(context.Background())
				if err != nil {
					return err
				}
				tx = nextTx
			}
		}

		if err := tx.Commit(); err != nil {
			return err
		}

		return db.Update(context.Background(), func(tx kv.RwTx) error {
			// Reset the NonCanonicalTxs sequence: adding -v (unsigned wrap-around)
			// to the current value v brings it back to zero.
			v, err := tx.ReadSequence(kv.NonCanonicalTxs)
			if err != nil {
				return err
			}
			if _, err := tx.IncrementSequence(kv.NonCanonicalTxs, -v); err != nil {
				return err
			}

			{
				// Advance the EthTx sequence to the end of the tx-id range of the
				// last canonical block that has a stored body.
				c, err := tx.Cursor(kv.HeaderCanonical)
				if err != nil {
					return err
				}
				k, v, err := c.Last()
				if err != nil {
					return err
				}
				data, err := tx.GetOne(kv.BlockBody, append(k, v...))
				if err != nil {
					return err
				}
				var newSeqValue uint64
				if len(data) > 0 {
					bodyForStorage := new(types.BodyForStorage)
					if err := rlp.DecodeBytes(data, bodyForStorage); err != nil {
						return fmt.Errorf("rlp.DecodeBytes(bodyForStorage): %w", err)
					}
					currentSeq, err := tx.ReadSequence(kv.EthTx)
					if err != nil {
						return err
					}
					newSeqValue = bodyForStorage.BaseTxId + uint64(bodyForStorage.TxAmount) - currentSeq
				}

				if _, err := tx.IncrementSequence(kv.EthTx, newSeqValue); err != nil {
					return err
				}
			}

			// This migration is done
			return BeforeCommit(tx, nil, true)
		})
	},
}
View Source
// TxsV3 is the "txs_v3" migration. In a single read-write transaction it walks
// the BlockBody table starting at block 1 and, for every entry whose hash is
// not the canonical hash for its height, deletes the body's transactions from
// NonCanonicalTxs and removes the entry from Senders, BlockBody and Headers.
// Canonical entries are left untouched. BeforeCommit is called once, with
// (nil, true), after the walk finishes.
var TxsV3 = Migration{
	Name: "txs_v3",
	Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) (err error) {
		logEvery := time.NewTicker(10 * time.Second)
		defer logEvery.Stop()

		// Scratch buffer reused for every big-endian tx id key.
		txIDBytes := make([]byte, 8)

		return db.Update(context.Background(), func(tx kv.RwTx) error {
			from := hexutility.EncodeTs(1)
			if err := tx.ForEach(kv.BlockBody, from, func(k, _ []byte) error {
				// The key starts with the 8-byte big-endian block number; the
				// remainder is compared against the canonical hash below.
				blockNum := binary.BigEndian.Uint64(k)
				select {
				case <-logEvery.C:
					var m runtime.MemStats
					dbg.ReadMemStats(&m)
					logger.Info("[txs_v3] Migration progress", "block_num", blockNum,
						"alloc", common2.ByteCount(m.Alloc), "sys", common2.ByteCount(m.Sys))
				default:
				}

				canonicalHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
				if err != nil {
					return err
				}
				if bytes.Equal(k[8:], canonicalHash[:]) {
					// Canonical block: nothing to clean up.
					return nil
				}

				b, err := rawdb.ReadBodyForStorageByKey(tx, k)
				if err != nil {
					return err
				}
				if b == nil {
					// Use the injected logger so this message goes to the same
					// sink as the progress lines above.
					logger.Debug("[txs_v3] block body not found", "height", blockNum)
				} else {
					for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ {
						binary.BigEndian.PutUint64(txIDBytes, txID)
						if err = tx.Delete(kv.NonCanonicalTxs, txIDBytes); err != nil {
							return err
						}
					}
				}

				// Work on a private copy of the key for the deletes below.
				kCopy := common2.CopyBytes(k)
				if err = tx.Delete(kv.Senders, kCopy); err != nil {
					return err
				}
				if err = tx.Delete(kv.BlockBody, kCopy); err != nil {
					return err
				}
				if err = tx.Delete(kv.Headers, kCopy); err != nil {
					return err
				}
				return nil
			}); err != nil {
				return err
			}

			// This migration is done
			return BeforeCommit(tx, nil, true)
		})
	},
}

Functions

func AppliedMigrations

func AppliedMigrations(tx kv.Tx, withPayload bool) (map[string][]byte, error)

func MakeBodiesNonCanonicalDeprecated

func MakeBodiesNonCanonicalDeprecated(tx kv.RwTx, from uint64, ctx context.Context, logPrefix string, logEvery *time.Ticker) error

func MarshalMigrationPayload

func MarshalMigrationPayload(db kv.Getter) ([]byte, error)

func UnmarshalMigrationPayload

func UnmarshalMigrationPayload(data []byte) (map[string][]byte, error)

Types

type Callback

// Callback is the hook a migration invokes from inside its write transaction.
// The migrations shown in this package call it with a progress marker and
// isDone=false after a unit of work, and with nil progress and isDone=true
// once the migration has finished.
// NOTE(review): presumably the implementation persists progress/completion in
// the same transaction — confirm against Migrator.Apply.
type Callback func(tx kv.RwTx, progress []byte, isDone bool) error

type Migration

// Migration describes a single named database migration.
type Migration struct {
	// Name identifies the migration (e.g. "txs_begin_end", "txs_v3").
	Name string
	// Up performs the migration. progress is the marker previously passed to
	// BeforeCommit (nil when there is none); BeforeCommit is the Callback the
	// migration calls to report progress and completion.
	Up   func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) error
}

type Migrator

// Migrator holds the list of migrations it operates on.
type Migrator struct {
	// Migrations is the set of migrations known to this Migrator.
	// NOTE(review): presumably applied in slice order — confirm in Apply.
	Migrations []Migration
}

func NewMigrator

func NewMigrator(label kv.Label) *Migrator

func (*Migrator) Apply

func (m *Migrator) Apply(db kv.RwDB, dataDir string, logger log.Logger) error

func (*Migrator) HasPendingMigrations

func (m *Migrator) HasPendingMigrations(db kv.RwDB) (bool, error)

func (*Migrator) PendingMigrations

func (m *Migrator) PendingMigrations(tx kv.Tx) ([]Migration, error)

func (*Migrator) VerifyVersion

func (m *Migrator) VerifyVersion(db kv.RwDB) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL