Documentation ¶
Index ¶
- Constants
- Variables
- func DetectAndSegmentRangeByChunkSize(readConf postgres.Config, chunkSize uint64) ([][2]uint64, error)
- func NewDB(ctx context.Context, conf postgres.Config) (*sqlx.DB, error)
- func NewTableReadModels(tableName TableName) (interface{}, error)
- func NewTableTransformer(table TableName) interfaces.Transformer
- func NewTableTransformerSet(tables []TableName) map[TableName]interfaces.Transformer
- func SegmentRangeByChunkSize(chunkSize, start, stop uint64) [][2]uint64
- type Config
- type Migrator
- type MinAndMux
- type Reader
- type Service
- func (s *Service) Close() error
- func (s *Service) Migrate(wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error)
- func (s *Service) Transfer(wg *sync.WaitGroup, fdwTableName string, ...) (chan [2]uint64, chan struct{}, chan error, error)
- func (s *Service) TransformToCSV(csvWriter csv.Writer, wg *sync.WaitGroup, tableName TableName, ...) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error)
- type TableName
Constants ¶
const ( LOGRUS_FILE = "LOGRUS_FILE" LOGRUS_LEVEL = "LOGRUS_LEVEL" LOG_READ_GAPS_DIR = "LOG_READ_GAPS_DIR" LOG_WRITE_GAPS_DIR = "LOG_WRITE_GAPS_DIR" LOG_TRANSFER_GAPS_DIR = "LOG_TRANSFER_GAPS_DIR" MIGRATION_START = "MIGRATION_START" MIGRATION_STOP = "MIGRATION_STOP" MIGRATION_TABLE_NAMES = "MIGRATION_TABLE_NAMES" MIGRATION_WORKERS_PER_TABLE = "MIGRATION_WORKERS_PER_TABLE" MIGRATION_AUTO_RANGE = "MIGRATION_AUTO_RANGE" MIGRATION_AUTO_RANGE_SEGMENT_SIZE = "MIGRATION_AUTO_RANGE_SEGMENT_SIZE" TRANSFER_TABLE_NAME = "TRANSFER_TABLE_NAME" TRANSFER_SEGMENT_SIZE = "TRANSFER_SEGMENT_SIZE" TRANSFER_SEGMENT_OFFSET = "TRANSFER_SEGMENT_OFFSET" TRANSFER_MAX_PAGE = "TRANSFER_MAX_PAGE" OLD_DATABASE_NAME = "OLD_DATABASE_NAME" OLD_DATABASE_HOSTNAME = "OLD_DATABASE_HOSTNAME" OLD_DATABASE_PORT = "OLD_DATABASE_PORT" OLD_DATABASE_USER = "OLD_DATABASE_USER" OLD_DATABASE_PASSWORD = "OLD_DATABASE_PASSWORD" OLD_DATABASE_MAX_IDLE_CONNECTIONS = "OLD_DATABASE_MAX_IDLE_CONNECTIONS" OLD_DATABASE_MAX_OPEN_CONNECTIONS = "OLD_DATABASE_MAX_OPEN_CONNECTIONS" OLD_DATABASE_MAX_CONN_LIFETIME = "OLD_DATABASE_MAX_CONN_LIFETIME" NEW_DATABASE_NAME = "NEW_DATABASE_NAME" NEW_DATABASE_HOSTNAME = "NEW_DATABASE_HOSTNAME" NEW_DATABASE_PORT = "NEW_DATABASE_PORT" NEW_DATABASE_USER = "NEW_DATABASE_USER" NEW_DATABASE_PASSWORD = "NEW_DATABASE_PASSWORD" NEW_DATABASE_MAX_IDLE_CONNECTIONS = "NEW_DATABASE_MAX_IDLE_CONNECTIONS" NEW_DATABASE_MAX_OPEN_CONNECTIONS = "NEW_DATABASE_MAX_OPEN_CONNECTIONS" NEW_DATABASE_MAX_CONN_LIFETIME = "NEW_DATABASE_MAX_CONN_LIFETIME" )
ENV bindings
const ( TOML_LOGRUS_FILE = "log.file" TOML_LOGRUS_LEVEL = "log.level" TOML_LOG_READ_GAPS_DIR = "log.readGapsDir" TOML_LOG_WRITE_GAPS_DIR = "log.writeGapsDir" TOML_LOG_TRANSFER_GAPS_DIR = "log.transferGapDir" TOML_MIGRATION_RANGES = "migrator.ranges" TOML_MIGRATION_START = "migrator.start" TOML_MIGRATION_STOP = "migrator.stop" TOML_MIGRATION_TABLE_NAMES = "migrator.migrationTableNames" TOML_MIGRATION_WORKERS_PER_TABLE = "migrator.workersPerTable" TOML_MIGRATION_AUTO_RANGE = "migrator.autoRange" TOML_MIGRATION_AUTO_RANGE_SEGMENT_SIZE = "migrator.segmentSize" TOML_TRANSFER_TABLE_NAME = "migrator.transferTableName" TOML_TRANSFER_SEGMENT_SIZE = "migrator.pagesPerTx" TOML_TRANSFER_SEGMENT_OFFSET = "migrator.segmentOffset" TOML_TRANSFER_MAX_PAGE = "migrator.maxPage" TOML_OLD_DATABASE_NAME = "old.databaseName" TOML_OLD_DATABASE_HOSTNAME = "old.databaseHostName" TOML_OLD_DATABASE_PORT = "old.databasePort" TOML_OLD_DATABASE_USER = "old.databaseUser" TOML_OLD_DATABASE_PASSWORD = "old.databasePassword" TOML_OLD_DATABASE_MAX_IDLE_CONNECTIONS = "old.databaseMaxIdleConns" TOML_OLD_DATABASE_MAX_OPEN_CONNECTIONS = "old.databaseMaxOpenConns" TOML_OLD_DATABASE_MAX_CONN_LIFETIME = "old.databaseMaxConnLifetime" TOML_NEW_DATABASE_NAME = "new.databaseName" TOML_NEW_DATABASE_HOSTNAME = "new.databaseHostName" TOML_NEW_DATABASE_PORT = "new.databasePort" TOML_NEW_DATABASE_USER = "new.databaseUser" TOML_NEW_DATABASE_PASSWORD = "new.databasePassword" TOML_NEW_DATABASE_MAX_IDLE_CONNECTIONS = "new.databaseMaxIdleConns" TOML_NEW_DATABASE_MAX_OPEN_CONNECTIONS = "new.databaseMaxOpenConns" TOML_NEW_DATABASE_MAX_CONN_LIFETIME = "new.databaseMaxConnLifetime" )
TOML mappings
const ( CLI_LOGRUS_FILE = "log-file" CLI_LOGRUS_LEVEL = "log-level" CLI_LOG_READ_GAPS_DIR = "read-gaps-dir" CLI_LOG_WRITE_GAPS_DIR = "write-gaps-dir" CLI_LOG_TRANSFER_GAPS_DIR = "transfer-gap-dir" CLI_MIGRATION_START = "start-height" CLI_MIGRATION_STOP = "stop-height" CLI_MIGRATION_TABLE_NAMES = "migration-table-names" CLI_MIGRATION_WORKERS_PER_TABLE = "workers-per-table" CLI_MIGRATION_AUTO_RANGE = "auto-range" CLI_MIGRATION_AUTO_RANGE_SEGMENT_SIZE = "migration-segment-size" CLI_TRANSFER_TABLE_NAME = "transfer-table-name" CLI_TRANSFER_SEGMENT_SIZE = "transfer-segment-size" CLI_TRANSFER_SEGMENT_OFFSET = "transfer-segment-offset" CLI_TRANSFER_MAX_PAGE = "transfer-max-page" CLI_OLD_DATABASE_NAME = "old-db-name" CLI_OLD_DATABASE_HOSTNAME = "old-db-hostname" CLI_OLD_DATABASE_PORT = "old-db-port" CLI_OLD_DATABASE_USER = "old-db-username" CLI_OLD_DATABASE_PASSWORD = "old-db-password" CLI_OLD_DATABASE_MAX_IDLE_CONNECTIONS = "old-db-max-idle" CLI_OLD_DATABASE_MAX_OPEN_CONNECTIONS = "old-db-max-open" CLI_OLD_DATABASE_MAX_CONN_LIFETIME = "old-db-max-lifetime" CLI_NEW_DATABASE_NAME = "new-db-name" CLI_NEW_DATABASE_HOSTNAME = "new-db-hostname" CLI_NEW_DATABASE_PORT = "new-db-port" CLI_NEW_DATABASE_USER = "new-db-username" CLI_NEW_DATABASE_PASSWORD = "new-db-password" CLI_NEW_DATABASE_MAX_IDLE_CONNECTIONS = "new-db-max-idle" CLI_NEW_DATABASE_MAX_OPEN_CONNECTIONS = "new-db-max-open" CLI_NEW_DATABASE_MAX_CONN_LIFETIME = "new-db-max-lifetime" )
CLI flags
const PgReadMinAndMaxBlockNumbers = `SELECT MIN(block_number) min, MAX(block_number) max
FROM eth.header_cids`
PgReadMinAndMaxBlockNumbers for finding the min and max block height in the DB
Variables ¶
var ( // block data TestConfig = params.RopstenChainConfig BlockNumber = TestConfig.LondonBlock MockHeader = types.Header{ Time: 0, Number: new(big.Int).Set(BlockNumber), Root: common.HexToHash("0x0"), TxHash: common.HexToHash("0x0"), ReceiptHash: common.HexToHash("0x0"), Difficulty: big.NewInt(5000000), Extra: []byte{}, BaseFee: big.NewInt(params.InitialBaseFee), Coinbase: common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476777"), } MockTransactions, MockReceipts, SenderAddr = createTransactionsAndReceipts(TestConfig, BlockNumber) MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts, new(trie.Trie)) Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") AnotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") ContractAddress = crypto.CreateAddress(SenderAddr, MockTransactions[2].Nonce()) MockContractByteCode = []byte{0, 1, 2, 3, 4, 5} MockLog1 = &types.Log{ Address: Address, Topics: []common.Hash{mockTopic11, mockTopic12}, Data: []byte{}, } MockLog2 = &types.Log{ Address: AnotherAddress, Topics: []common.Hash{mockTopic21, mockTopic22}, Data: []byte{}, } MockLog3 = &types.Log{ Address: Address, Topics: []common.Hash{mockTopic11, mockTopic22}, Data: []byte{}, } MockLog4 = &types.Log{ Address: AnotherAddress, Topics: []common.Hash{mockTopic21, mockTopic12}, Data: []byte{}, } ShortLog1 = &types.Log{ Address: AnotherAddress, Topics: []common.Hash{}, Data: []byte{}, } ShortLog1RLP, _ = rlp.EncodeToBytes(ShortLog1) ShortLog1CID, _ = ipld.RawdataToCid(ipld.MEthLog, ShortLog1RLP, multihash.KECCAK_256) ShotLog1MhKey = blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(ShortLog1CID.Hash()).String() ShortLog2 = &types.Log{ Address: Address, Topics: []common.Hash{}, Data: []byte{}, } ShortLog2RLP, _ = rlp.EncodeToBytes(ShortLog2) ShortLog2CID, _ = ipld.RawdataToCid(ipld.MEthLog, ShortLog2RLP, multihash.KECCAK_256) ShotLog2MhKey = blockstore.BlockPrefix.String() + 
dshelp.MultihashToDsKey(ShortLog2CID.Hash()).String() // access list entries AccessListEntry1 = types.AccessTuple{ Address: Address, } AccessListEntry2 = types.AccessTuple{ Address: AnotherAddress, StorageKeys: []common.Hash{common.BytesToHash(StorageLeafKey), common.BytesToHash(MockStorageLeafKey)}, } StorageLeafKey = crypto.Keccak256Hash(storageLocation[:]).Bytes() MockStorageLeafKey = crypto.Keccak256Hash(mockStorageLocation[:]).Bytes() StorageValue = common.Hex2Bytes("01") StoragePartialPath = common.Hex2Bytes("20290decd9548b62a8d60345a988386fc84ba6bc95484008f6362f93160ef3e563") StorageLeafNode, _ = rlp.EncodeToBytes([]interface{}{ StoragePartialPath, StorageValue, }) ContractRoot = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0" ContractCodeHash = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea") ContractLeafKey = test_helpers.AddressToLeafKey(ContractAddress) ContractAccount, _ = rlp.EncodeToBytes(&types.StateAccount{ Nonce: nonce1, Balance: big.NewInt(0), CodeHash: ContractCodeHash.Bytes(), Root: common.HexToHash(ContractRoot), }) ContractPartialPath = common.Hex2Bytes("3114658a74d9cc9f7acf2c5cd696c3494d7c344d78bfec3add0d91ec4e8d1c45") ContractLeafNode, _ = rlp.EncodeToBytes([]interface{}{ ContractPartialPath, ContractAccount, }) AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421" AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") AccountLeafKey = test_helpers.Account2LeafKey RemovedLeafKey = test_helpers.Account1LeafKey Account, _ = rlp.EncodeToBytes(&types.StateAccount{ Nonce: nonce0, Balance: big.NewInt(1000), CodeHash: AccountCodeHash.Bytes(), Root: common.HexToHash(AccountRoot), }) AccountPartialPath = common.Hex2Bytes("3957f3e2f04a0764c3a0491b175f69926da61efbcc8f61fa1455fd2d2b4cdd45") AccountLeafNode, _ = rlp.EncodeToBytes([]interface{}{ AccountPartialPath, Account, }) StateDiffs = []sdtypes.StateNode{ { Path: 
[]byte{'\x06'}, NodeType: sdtypes.Leaf, LeafKey: ContractLeafKey, NodeValue: ContractLeafNode, StorageNodes: []sdtypes.StorageNode{ { Path: []byte{}, NodeType: sdtypes.Leaf, LeafKey: StorageLeafKey, NodeValue: StorageLeafNode, }, { Path: []byte{'\x03'}, NodeType: sdtypes.Removed, LeafKey: RemovedLeafKey, NodeValue: []byte{}, }, }, }, { Path: []byte{'\x0c'}, NodeType: sdtypes.Leaf, LeafKey: AccountLeafKey, NodeValue: AccountLeafNode, StorageNodes: []sdtypes.StorageNode{}, }, { Path: []byte{'\x02'}, NodeType: sdtypes.Removed, LeafKey: RemovedLeafKey, NodeValue: []byte{}, }, } )
Test variables
Functions ¶
func DetectAndSegmentRangeByChunkSize ¶
func DetectAndSegmentRangeByChunkSize(readConf postgres.Config, chunkSize uint64) ([][2]uint64, error)
DetectAndSegmentRangeByChunkSize finds the min and max block heights in the DB, and breaks the range up into segments based on the provided chunk size
func NewTableReadModels ¶
NewTableReadModels returns an allocation for the read DB models of the provided table
func NewTableTransformer ¶
func NewTableTransformer(table TableName) interfaces.Transformer
NewTableTransformer inits and returns a Transformer for the provided table
func NewTableTransformerSet ¶
func NewTableTransformerSet(tables []TableName) map[TableName]interfaces.Transformer
NewTableTransformerSet inits and returns a set of Transformers for the provided tables
func SegmentRangeByChunkSize ¶
SegmentRangeByChunkSize splits the provided range up into segments based on the desired size of the segments
Types ¶
type Migrator ¶
type Migrator interface { Migrate(wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error) Transfer(wg *sync.WaitGroup, fdwTableName string, segmentSize, segmentOffset, maxPage uint64) (chan [2]uint64, chan struct{}, chan error, error) TransformToCSV(csvWriter csv.Writer, wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error) io.Closer }
Migrator interface for migrating from v2 DB to v3 DB
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader struct for reading v2 DB eth.log_cids models
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service struct underpinning the Migrator interface
func (*Service) Close ¶
Close satisfies io.Closer. Close shuts down the Migrator; it quits all Migrate goroutines that are currently running, whereas closing the quit chan returned by Migrate only stops the goroutines spun up by that method call.
func (*Service) Migrate ¶
func (s *Service) Migrate(wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error)
Migrate satisfies Migrator. Migrate spins up a goroutine to process the block ranges provided through the blockRanges work chan for the specified table. Migrate returns channels for emitting read gaps and failed write ranges, a channel for signaling completion of the process, a quitChan for closing the single process, and a channel for writing out errors.
func (*Service) Transfer ¶ added in v1.1.0
func (s *Service) Transfer(wg *sync.WaitGroup, fdwTableName string, segmentSize, segmentOffset, maxPage uint64) (chan [2]uint64, chan struct{}, chan error, error)
Transfer is for transferring public.blocks to a new DB page-by-page. Transfer assumes the targeted postgres_fdw is already in the DB. It returns a chan for logging failed transfer page ranges, a chan for signalling success, a chan for the errors that caused the failures, and any error during initialization.
func (*Service) TransformToCSV ¶ added in v1.1.0
func (s *Service) TransformToCSV(csvWriter csv.Writer, wg *sync.WaitGroup, tableName TableName, blockRanges <-chan [2]uint64) (chan [2]uint64, chan [2]uint64, chan struct{}, chan struct{}, chan error)
TransformToCSV satisfies Migrator. TransformToCSV spins up a goroutine to process the block ranges provided through the blockRanges work chan for the specified table. TransformToCSV returns channels for emitting read gaps and failed write ranges, a channel for signaling completion of the process, a quitChan for closing the single process, and a channel for writing out errors.
type TableName ¶
type TableName string
TableName explicitly types table name strings
const ( PublicNodes TableName = "nodes" EthHeaders TableName = "header_cids" EthUncles TableName = "uncle_cids" EthTransactions TableName = "transaction_cids" EthAccessListElements TableName = "access_list_elements" EthReceipts TableName = "receipt_cids" EthLogs TableName = "log_cids" EthLogsRepair TableName = "log_cids_repair" EthState TableName = "state_cids" EthAccounts TableName = "state_accounts" EthStorage TableName = "storage_cids" Unknown TableName = "unknown" )
func NewTableNameFromString ¶
NewTableNameFromString returns the TableName from the provided string