merger

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const ParallelOneBlockDownload = 2

Variables

View Source
var DefaultFilesDeleteBatchSize = 10000
View Source
var DeleteObjectTimeout = 5 * time.Minute
View Source
var ErrFirstBlockAfterInitialStreamableBlock = errors.New("received first block after inital streamable block")
View Source
var ErrHoleFound = errors.New("hole found in merged files")
View Source
var ErrStopBlockReached = errors.New("stop block reached")
View Source
var GetObjectTimeout = 5 * time.Minute
View Source
var ListFilesTimeout = 10 * time.Minute
View Source
var WriteObjectTimeout = 5 * time.Minute

Functions

func Retry

func Retry(logger *zap.Logger, attempts int, sleep time.Duration, function func() error) (err error)

Types

type BundleReader

type BundleReader struct {
	// contains filtered or unexported fields
}

func NewBundleReader

func NewBundleReader(ctx context.Context, logger *zap.Logger, tracer logging.Tracer, oneBlockFiles []*bstream.OneBlockFile, anyOneBlockFile *bstream.OneBlockFile, oneBlockDownloader bstream.OneBlockDownloaderFunc) (*BundleReader, error)

func (*BundleReader) Read

func (r *BundleReader) Read(p []byte) (bytesRead int, err error)

type Bundler

type Bundler struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewBundler

func NewBundler(startBlock, stopBlock, firstStreamableBlock, bundleSize uint64, io IOInterface) *Bundler

func (*Bundler) BaseBlockNum

func (b *Bundler) BaseBlockNum() uint64

BaseBlockNum can be called from a different thread

func (*Bundler) HandleBlockFile

func (b *Bundler) HandleBlockFile(obf *bstream.OneBlockFile) error

func (*Bundler) ProcessBlock

func (b *Bundler) ProcessBlock(_ *pbbstream.Block, obj interface{}) error

func (*Bundler) Reset

func (b *Bundler) Reset(nextBase uint64, lib bstream.BlockRef)

func (*Bundler) String

func (b *Bundler) String() string

String can be called from a different thread

type DStoreIO

type DStoreIO struct {
	// contains filtered or unexported fields
}

func (*DStoreIO) DeleteAsync

func (s *DStoreIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error

func (*DStoreIO) DownloadOneBlockFile

func (s *DStoreIO) DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)

func (*DStoreIO) MergeAndStore

func (s *DStoreIO) MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)

func (*DStoreIO) NextBundle

func (s *DStoreIO) NextBundle(ctx context.Context, lowestBaseBlock uint64) (outBaseBlock uint64, lib bstream.BlockRef, err error)

func (*DStoreIO) WalkOneBlockFiles

func (s *DStoreIO) WalkOneBlockFiles(ctx context.Context, lowestBlock uint64, callback func(*bstream.OneBlockFile) error) error

type ForkAwareDStoreIO

type ForkAwareDStoreIO struct {
	*DStoreIO
	// contains filtered or unexported fields
}

func (*ForkAwareDStoreIO) DeleteForkedBlocksAsync

func (s *ForkAwareDStoreIO) DeleteForkedBlocksAsync(inclusiveLowBoundary, inclusiveHighBoundary uint64)

func (*ForkAwareDStoreIO) MoveForkedBlocks

func (s *ForkAwareDStoreIO) MoveForkedBlocks(ctx context.Context, oneBlockFiles []*bstream.OneBlockFile)

type ForkAwareIOInterface

type ForkAwareIOInterface interface {
	// DeleteForkedBlocksAsync will delete forked blocks between lowBoundary and highBoundary (both inclusive)
	DeleteForkedBlocksAsync(inclusiveLowBoundary, inclusiveHighBoundary uint64)

	// MoveForkedBlocks will copy an array of oneBlockFiles to the forkedBlocksStore, then delete them (dstore does not have MOVE primitive)
	MoveForkedBlocks(ctx context.Context, oneBlockFiles []*bstream.OneBlockFile)
}

type IOInterface

type IOInterface interface {

	// NextBundle will read through consecutive merged blocks, starting at `lowestBaseBlock`, and return the next bundle that needs to be created
	// If it finds an existing merged file at `lowestBaseBlock`, it will read the last one and include the lastIrreversibleBlock so you can bootstrap your forkdb from there
	NextBundle(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)

	// WalkOneBlockFiles calls your function for each oneBlockFile it reads, starting at the inclusiveLowerBlock. Useful to feed a block source
	WalkOneBlockFiles(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error

	// MergeAndStore writes a merged file from a list of oneBlockFiles
	MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)

	// DownloadOneBlockFile will get you the data from the file
	DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)

	// DeleteAsync should be able to delete large quantities of oneBlockFiles from storage without ever blocking
	DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error
}

func NewDStoreIO

func NewDStoreIO(
	logger *zap.Logger,
	tracer logging.Tracer,
	oneBlocksStore dstore.Store,
	mergedBlocksStore dstore.Store,
	forkedBlocksStore dstore.Store,
	retryAttempts int,
	retryCooldown time.Duration,
	bundleSize uint64,
	numDeleteThreads int,
) IOInterface

type Merger

type Merger struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewMerger

func NewMerger(
	logger *zap.Logger,
	grpcListenAddr string,
	io IOInterface,

	firstStreamableBlock uint64,
	bundleSize uint64,
	pruningDistanceToLIB uint64,
	timeBetweenPruning time.Duration,
	timeBetweenPolling time.Duration,
	stopBlock uint64,
) *Merger

func (*Merger) Check

Check is basic GRPC Healthcheck

func (*Merger) Run

func (m *Merger) Run()

func (*Merger) Watch

Watch is basic GRPC Healthcheck as a stream

type TestMergerIO

type TestMergerIO struct {
	NextBundleFunc           func(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)
	WalkOneBlockFilesFunc    func(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error
	MergeAndStoreFunc        func(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)
	DownloadOneBlockFileFunc func(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)
	DeleteAsyncFunc          func(oneBlockFiles []*bstream.OneBlockFile) error
}

func (*TestMergerIO) DeleteAsync

func (io *TestMergerIO) DeleteAsync(oneBlockFiles []*bstream.OneBlockFile) error

func (*TestMergerIO) DownloadOneBlockFile

func (io *TestMergerIO) DownloadOneBlockFile(ctx context.Context, oneBlockFile *bstream.OneBlockFile) (data []byte, err error)

func (*TestMergerIO) MergeAndStore

func (io *TestMergerIO) MergeAndStore(ctx context.Context, inclusiveLowerBlock uint64, oneBlockFiles []*bstream.OneBlockFile) (err error)

func (*TestMergerIO) NextBundle

func (io *TestMergerIO) NextBundle(ctx context.Context, lowestBaseBlock uint64) (baseBlock uint64, lastIrreversibleBlock bstream.BlockRef, err error)

func (*TestMergerIO) WalkOneBlockFiles

func (io *TestMergerIO) WalkOneBlockFiles(ctx context.Context, inclusiveLowerBlock uint64, callback func(*bstream.OneBlockFile) error) error

Directories

Path Synopsis
app

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL