wal

package
v0.22.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2021 License: AGPL-3.0 Imports: 29 Imported by: 13

Documentation

Index

Constants

View Source
const MagicBytes uint16 = 0x2137
View Source
const SegmentSize = 32 * 1024 * 1024
View Source
const VersionV1 uint16 = 0x01
View Source
const VersionV3 uint16 = 0x03

The version was reset when the trie format changed, so it is now bumped to 3 to avoid conflicts. Version 3 contains a file checksum for detecting corrupted checkpoint files.

Variables

This section is empty.

Functions

func CreateCheckpointWriter

func CreateCheckpointWriter(dir string, fileNo int) (io.WriteCloser, error)

func CreateCheckpointWriterForFile

func CreateCheckpointWriterForFile(dir, filename string) (io.WriteCloser, error)

CreateCheckpointWriterForFile returns a file writer that will write to a temporary file and then move it to the checkpoint folder by renaming it.

func EncodeDelete

func EncodeDelete(rootHash ledger.RootHash) []byte

func EncodeUpdate

func EncodeUpdate(update *ledger.TrieUpdate) []byte

func LoadCheckpoint

func LoadCheckpoint(filepath string) (*flattener.FlattenedForest, error)

func NumberToFilename

func NumberToFilename(n int) string

func NumberToFilenamePart

func NumberToFilenamePart(n int) string

func ReadCheckpoint added in v0.14.0

func ReadCheckpoint(r io.Reader) (*flattener.FlattenedForest, error)

func StoreCheckpoint

func StoreCheckpoint(forestSequencing *flattener.FlattenedForest, writer io.Writer) error

StoreCheckpoint writes the given checkpoint to disk and appends a CRC32 file checksum for integrity checking.

Types

type Checkpointer

type Checkpointer struct {
	// contains filtered or unexported fields
}

func NewCheckpointer

func NewCheckpointer(wal *DiskWAL, keyByteSize int, forestCapacity int) *Checkpointer

func (*Checkpointer) Checkpoint

func (c *Checkpointer) Checkpoint(to int, targetWriter func() (io.WriteCloser, error)) error

Checkpoint creates a new checkpoint, stopping at the given segment.

func (*Checkpointer) CheckpointWriter

func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error)

func (*Checkpointer) Checkpoints added in v0.14.0

func (c *Checkpointer) Checkpoints() ([]int, error)

Checkpoints returns all the checkpoint numbers in ascending order.

func (*Checkpointer) HasRootCheckpoint

func (c *Checkpointer) HasRootCheckpoint() (bool, error)

func (*Checkpointer) LatestCheckpoint

func (c *Checkpointer) LatestCheckpoint() (int, error)

LatestCheckpoint returns the number of the latest checkpoint, or -1 if there are no checkpoints.

func (*Checkpointer) LoadCheckpoint

func (c *Checkpointer) LoadCheckpoint(checkpoint int) (*flattener.FlattenedForest, error)

func (*Checkpointer) LoadRootCheckpoint

func (c *Checkpointer) LoadRootCheckpoint() (*flattener.FlattenedForest, error)

func (*Checkpointer) NotCheckpointedSegments

func (c *Checkpointer) NotCheckpointedSegments() (from, to int, err error)

NotCheckpointedSegments returns the numbers of the segments which are not checkpointed yet, or -1, -1 if there are no segments.

func (*Checkpointer) RemoveCheckpoint added in v0.14.0

func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error

type Compactor

type Compactor struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor(checkpointer *Checkpointer, interval time.Duration, checkpointDistance uint, checkpointsToKeep uint) *Compactor

func (*Compactor) Done

func (c *Compactor) Done() <-chan struct{}

func (*Compactor) Ready

func (c *Compactor) Ready() <-chan struct{}

Ready periodically fires the Run function, every `interval`.

func (*Compactor) Run

func (c *Compactor) Run() error

func (*Compactor) Subscribe added in v0.21.0

func (c *Compactor) Subscribe(observer observable.Observer)

func (*Compactor) Unsubscribe added in v0.21.0

func (c *Compactor) Unsubscribe(observer observable.Observer)

type Crc32Reader added in v0.14.0

type Crc32Reader struct {
	// contains filtered or unexported fields
}

func NewCRC32Reader added in v0.14.0

func NewCRC32Reader(reader io.Reader) *Crc32Reader

func (*Crc32Reader) Crc32 added in v0.14.0

func (c *Crc32Reader) Crc32() uint32

func (*Crc32Reader) Read added in v0.14.0

func (c *Crc32Reader) Read(p []byte) (int, error)

type Crc32Writer added in v0.14.0

type Crc32Writer struct {
	Writer io.Writer
	// contains filtered or unexported fields
}

func NewCRC32Writer added in v0.14.0

func NewCRC32Writer(writer io.Writer) *Crc32Writer

func (*Crc32Writer) Crc32 added in v0.14.0

func (c *Crc32Writer) Crc32() uint32

func (*Crc32Writer) Write added in v0.14.0

func (c *Crc32Writer) Write(p []byte) (n int, err error)

type DiskWAL added in v0.15.4

type DiskWAL struct {
	// contains filtered or unexported fields
}

func NewDiskWAL added in v0.15.4

func NewDiskWAL(logger zerolog.Logger, reg prometheus.Registerer, metrics module.WALMetrics, dir string, forestCapacity int, pathByteSize int, segmentSize int) (*DiskWAL, error)

TODO use real logger and metrics, but that would require passing them to Trie storage

func (*DiskWAL) DiskSize added in v0.15.4

func (w *DiskWAL) DiskSize() (uint64, error)

DiskSize returns the amount of disk space used by the storage (in bytes)

func (*DiskWAL) Done added in v0.15.4

func (w *DiskWAL) Done() <-chan struct{}

Done implements the module.ReadyDoneAware interface. It closes all the open write-ahead log files.

func (*DiskWAL) NewCheckpointer added in v0.15.4

func (w *DiskWAL) NewCheckpointer() (*Checkpointer, error)

NewCheckpointer returns a Checkpointer for this WAL

func (*DiskWAL) PauseRecord added in v0.15.4

func (w *DiskWAL) PauseRecord()

func (*DiskWAL) Ready added in v0.15.4

func (w *DiskWAL) Ready() <-chan struct{}

func (*DiskWAL) RecordDelete added in v0.15.4

func (w *DiskWAL) RecordDelete(rootHash ledger.RootHash) error

func (*DiskWAL) RecordUpdate added in v0.15.4

func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) error

func (*DiskWAL) Replay added in v0.15.4

func (w *DiskWAL) Replay(
	checkpointFn func(forestSequencing *flattener.FlattenedForest) error,
	updateFn func(update *ledger.TrieUpdate) error,
	deleteFn func(ledger.RootHash) error,
) error

func (*DiskWAL) ReplayLogsOnly added in v0.15.4

func (w *DiskWAL) ReplayLogsOnly(
	checkpointFn func(forestSequencing *flattener.FlattenedForest) error,
	updateFn func(update *ledger.TrieUpdate) error,
	deleteFn func(rootHash ledger.RootHash) error,
) error

func (*DiskWAL) ReplayOnForest added in v0.15.4

func (w *DiskWAL) ReplayOnForest(forest *mtrie.Forest) error

func (*DiskWAL) Segments added in v0.15.4

func (w *DiskWAL) Segments() (first, last int, err error)

func (*DiskWAL) UnpauseRecord added in v0.15.4

func (w *DiskWAL) UnpauseRecord()

type LedgerWAL

type LedgerWAL interface {
	module.ReadyDoneAware

	NewCheckpointer() (*Checkpointer, error)
	PauseRecord()
	UnpauseRecord()
	RecordUpdate(update *ledger.TrieUpdate) error
	RecordDelete(rootHash ledger.RootHash) error
	ReplayOnForest(forest *mtrie.Forest) error
	Segments() (first, last int, err error)
	Replay(
		checkpointFn func(forestSequencing *flattener.FlattenedForest) error,
		updateFn func(update *ledger.TrieUpdate) error,
		deleteFn func(ledger.RootHash) error,
	) error
	ReplayLogsOnly(
		checkpointFn func(forestSequencing *flattener.FlattenedForest) error,
		updateFn func(update *ledger.TrieUpdate) error,
		deleteFn func(rootHash ledger.RootHash) error,
	) error
}

type SyncOnCloseRenameFile added in v0.14.0

type SyncOnCloseRenameFile struct {
	*bufio.Writer
	// contains filtered or unexported fields
}

SyncOnCloseRenameFile is a composite of a buffered writer over a given file, which flushes/syncs on closing and renames the file to `targetName` as the last step. The typical use case is to write data to a temporary file and only rename it to the target name as the last step. This helps avoid the situation where writing is interrupted and an unusable file with the target name exists.

func (*SyncOnCloseRenameFile) Close added in v0.14.0

func (s *SyncOnCloseRenameFile) Close() error

func (*SyncOnCloseRenameFile) Sync added in v0.14.0

func (s *SyncOnCloseRenameFile) Sync() error

type WALOperation

type WALOperation uint8
const WALDelete WALOperation = 2
const WALUpdate WALOperation = 1

func Decode

func Decode(data []byte) (operation WALOperation, rootHash ledger.RootHash, update *ledger.TrieUpdate, err error)

type WriterSeekerCloser added in v0.14.0

type WriterSeekerCloser interface {
	io.Writer
	io.Seeker
	io.Closer
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL