migrator

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: AGPL-3.0 Imports: 14 Imported by: 6

README

Migrator

A migrator manages migrating storage from a source to a destination. It does not know anything about networks or protocols. If you need a network and protocol you should also look at toProtocol fromProtocol and protocolRW.

Functionality

NewMigrator(source storage.TrackingStorageProvider,
  dest storage.StorageProvider,
  block_order storage.BlockOrder,
  config *MigratorConfig) (*Migrator, error)

First we have a storage.TrackingStorageProvider. This serves as a source for data. It also tracks dirty blocks with a single sync function which returns a bitfield of dirty blocks.

Next we have a destination storage.StorageProvider which is where the data will be written to. Typically you may want this to be a ToProtocol which will serialize calls through a Protocol.

Next we have a storage.BlockOrder which tells the migrator which order it should send blocks. A typical block orderer would be a PriorityOrderer chained to a VolatilityMonitor. In that way, we can send any priority blocks (Blocks the destination needs ASAP), and then we can send blocks from least volatile to most.

Let's look at the MigratorConfig.

type MigratorConfig struct {
  BlockSize       int
  LockerHandler   func()
  UnlockerHandler func()
  ErrorHandler    func(b *storage.BlockInfo, err error)
  ProgressHandler func(p *MigrationProgress)
  Concurrency     map[int]int
}

BlockSize This is the size of a block in migration. It can be anything you wish.

LockerHandler and UnlockerHandler These functions are called to lock and unlock the source. That is, to pause whatever is writing to the source. You can use a Lockable here to just stop the writes, or ideally you can pause/resume whatever is doing the writes. You should also flush writes.

ErrorHandler This function is called if there was an error migrating a block. That is, there was an error reading or writing the block. In this instance, you might want to check things over, and re-schedule the block by adding it back into the blockOrderer.

ProgressHandler This callback allows you to monitor progress of the migration. It's called each time a block is migrated.

Concurrency This map defines how many blocks of each type can be migrated concurrently.

Usage

Once you have a Migrator you can call .Migrate(numBlocks) to migrate some blocks. Note that this call may block depending on concurrency settings. Also it will return before migration is completed. To ensure migration is completed call .WaitForCompletion().

If you'd like to re-migrate some dirty blocks, you can call .GetLatestDirty() and then .MigrateDirty(). Once again, this call may block depending on concurrency, and you should again call .WaitForCompletion() to ensure the migration has completed.

Simple example

blockSize := 65536
numBlocks := 1024
source := sources.NewMemoryStorage(blockSize * numBlocks)
sourceDirty := modules.NewFilterReadDirtyTracker(source, blockSize)

orderer := blocks.NewAnyBlockOrder(numBlocks, nil)
orderer.AddAll()

dest := sources.NewMemoryStorage(blockSize * numBlocks)

conf := NewMigratorConfig().WithBlockSize(blockSize)

mig, err := NewMigrator(sourceDirty, dest, orderer, conf)

mig.Migrate(numBlocks)

err = mig.WaitForCompletion()

This simple example migrates storage from source to dest.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockStatus added in v0.1.0

type BlockStatus struct {
	Set         bool
	UpdatingID  uint64
	CurrentID   uint64
	CurrentHash [sha256.Size]byte
}

type Config added in v0.1.0

type Config struct {
	Logger            types.Logger
	BlockSize         int
	LockerHandler     func()
	UnlockerHandler   func()
	ErrorHandler      func(b *storage.BlockInfo, err error)
	ProgressHandler   func(p *MigrationProgress)
	ProgressRateLimit time.Duration
	BlockHandler      func(b *storage.BlockInfo, id uint64, block []byte)
	Concurrency       map[int]int
	Integrity         bool
	CancelWrites      bool
	DedupeWrites      bool
	RecentWriteAge    time.Duration
}

func NewConfig added in v0.1.0

func NewConfig() *Config

func (*Config) WithBlockSize added in v0.1.0

func (mc *Config) WithBlockSize(bs int) *Config

type MigrationProgress

type MigrationProgress struct {
	BlockSize             int
	TotalBlocks           int // Total blocks
	MigratedBlocks        int // Number of blocks that have been migrated
	MigratedBlocksPerc    float64
	ReadyBlocks           int // Number of blocks which are up to date (clean). May go down as well as up.
	ReadyBlocksPerc       float64
	ActiveBlocks          int // Number of blocks in progress now
	TotalCanceledBlocks   int // Total blocks that were cancelled
	TotalMigratedBlocks   int // Total blocks that were migrated
	TotalDuplicatedBlocks int
}

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

func NewMigrator

func NewMigrator(source storage.TrackingProvider,
	dest storage.Provider,
	blockOrder storage.BlockOrder,
	config *Config) (*Migrator, error)

func (*Migrator) GetHashes added in v0.0.4

func (m *Migrator) GetHashes() map[uint][sha256.Size]byte

func (*Migrator) GetLatestDirty

func (m *Migrator) GetLatestDirty() []uint

*

  • Get the latest dirty blocks.
  • If there a no more dirty blocks, we leave the src locked.

func (*Migrator) GetLatestDirtyFunc added in v0.0.5

func (m *Migrator) GetLatestDirtyFunc(getter func() []uint) []uint

*

  • Get the latest dirty blocks.
  • If there a no more dirty blocks, we leave the src locked.

func (*Migrator) GetMetrics added in v0.1.5

func (m *Migrator) GetMetrics() *MigrationProgress

*

  • Get overall status of the migration *

func (*Migrator) Migrate

func (m *Migrator) Migrate(numBlocks int) error

*

  • Migrate storage to dest.

func (*Migrator) MigrateDirty

func (m *Migrator) MigrateDirty(blocks []uint) error

*

  • MigrateDirty migrates a list of dirty blocks.
  • An attempt is made to cancel any existing writes for the blocks first.

func (*Migrator) MigrateDirtyWithID added in v0.1.0

func (m *Migrator) MigrateDirtyWithID(blocks []uint, tid uint64) error

*

*
* You can give a tracking ID which will turn up at block_fn on success

func (*Migrator) SetMigratedBlock added in v0.0.5

func (m *Migrator) SetMigratedBlock(block int)

*

  • Set a block to already migrated state

func (*Migrator) SetSourceMapped added in v0.0.6

func (m *Migrator) SetSourceMapped(ms *modules.MappedStorage, writer func([]byte, int64, map[uint64]uint64) (int, error))

*

  • Set a MappedStorage for the source. *

func (*Migrator) Unlock added in v0.0.2

func (m *Migrator) Unlock()

func (*Migrator) WaitForCompletion

func (m *Migrator) WaitForCompletion() error

type SyncConfig added in v0.0.10

type SyncConfig struct {
	Logger           types.Logger
	Name             string
	Tracker          *dirtytracker.Remote     // A dirty block tracker
	Lockable         storage.LockableProvider // Lockable
	LockerHandler    func()
	UnlockerHandler  func()
	Destination      storage.Provider
	Orderer          storage.BlockOrder
	DirtyCheckPeriod time.Duration
	DirtyBlockGetter func() []uint

	BlockSize int

	HashesHandler     func(map[uint][32]byte)
	ProgressHandler   func(p *MigrationProgress)
	ProgressRateLimit time.Duration
	ErrorHandler      func(b *storage.BlockInfo, err error)

	Concurrency  map[int]int
	Integrity    bool
	CancelWrites bool
	DedupeWrites bool
}

type Syncer added in v0.0.10

type Syncer struct {
	// contains filtered or unexported fields
}

func NewSyncer added in v0.0.10

func NewSyncer(ctx context.Context, sinfo *SyncConfig) *Syncer

func (*Syncer) GetMetrics added in v0.1.5

func (s *Syncer) GetMetrics() *MigrationProgress

func (*Syncer) GetSafeBlockMap added in v0.0.10

func (s *Syncer) GetSafeBlockMap() map[uint][sha256.Size]byte

*

  • Get a list of blocks that are safe (On the destination)
  • NB This does not include the very latest dirty list, but it's a good starting point.

func (*Syncer) Sync added in v0.0.10

func (s *Syncer) Sync(syncAllFirst bool, continuous bool) (*MigrationProgress, error)

*

*
*

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL