wal

package
v0.25.9
Published: Apr 8, 2022 License: AGPL-3.0 Imports: 32 Imported by: 13

Documentation

Constants

const MagicBytes uint16 = 0x2137
const SegmentSize = 32 * 1024 * 1024
const VersionV1 uint16 = 0x01
const VersionV3 uint16 = 0x03

The version number was reset when the trie format changed, so it is now bumped to 3 to avoid conflicts. Version 3 adds a file checksum for detecting corrupted checkpoint files.

const VersionV4 uint16 = 0x04

Version 4 contains a footer with node count and trie count (previously in the header). Version 4 also reduces checkpoint data size. See EncodeNode() and EncodeTrie() for more details.

const VersionV5 uint16 = 0x05

Version 5 includes these changes:

  • remove regCount and maxDepth from serialized nodes
  • add allocated register count and size to serialized tries
  • reduce the number of bytes used to encode payload value size from 8 bytes to 4 bytes

See EncodeNode() and EncodeTrie() for more details.
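To illustrate how the magic bytes and version constants might be used when opening a checkpoint file, here is a minimal sketch. The header layout is an assumption, not something this page documents: it supposes a 4-byte header holding the magic bytes followed by the version, both as big-endian uint16 values. The lowercase constants mirror the exported ones above, and parseHeader is a hypothetical helper.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Local copies of the documented constants, so the sketch is self-contained.
const (
	magicBytes uint16 = 0x2137
	versionV1  uint16 = 0x01
	versionV3  uint16 = 0x03
	versionV4  uint16 = 0x04
	versionV5  uint16 = 0x05
)

// parseHeader is a hypothetical helper. It ASSUMES the header is
// [magic uint16][version uint16] in big-endian order; the real
// on-disk layout may differ.
func parseHeader(b []byte) (uint16, error) {
	if len(b) < 4 {
		return 0, errors.New("header too short")
	}
	if m := binary.BigEndian.Uint16(b[0:2]); m != magicBytes {
		return 0, fmt.Errorf("unexpected magic bytes 0x%04x", m)
	}
	v := binary.BigEndian.Uint16(b[2:4])
	switch v {
	case versionV1, versionV3, versionV4, versionV5:
		return v, nil
	default:
		return 0, fmt.Errorf("unsupported checkpoint version %d", v)
	}
}

func main() {
	v, err := parseHeader([]byte{0x21, 0x37, 0x00, 0x05})
	fmt.Println(v, err)
}
```

Rejecting unknown versions up front lets a reader pick the matching decoding path before touching the rest of the file.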

Variables

This section is empty.

Functions

func CreateCheckpointWriterForFile

func CreateCheckpointWriterForFile(dir, filename string, logger *zerolog.Logger) (io.WriteCloser, error)

CreateCheckpointWriterForFile returns a file writer that will write to a temporary file and then move it to the checkpoint folder by renaming it.

func EncodeDelete

func EncodeDelete(rootHash ledger.RootHash) []byte

func EncodeUpdate

func EncodeUpdate(update *ledger.TrieUpdate) []byte

func LoadCheckpoint

func LoadCheckpoint(filepath string, logger *zerolog.Logger) ([]*trie.MTrie, error)

func NumberToFilename

func NumberToFilename(n int) string

func NumberToFilenamePart

func NumberToFilenamePart(n int) string

func StoreCheckpoint

func StoreCheckpoint(writer io.Writer, tries ...*trie.MTrie) error

StoreCheckpoint writes the given tries to a checkpoint file and appends a CRC32 file checksum for integrity checking. A checkpoint file consists of a flattened forest. Specifically, it consists of:

  • a list of encoded nodes, where references to other nodes are by list index.
  • a list of encoded tries, each referencing their respective root node by index.

A reference to another node by index 0 is a special case, meaning nil.

As an important property, the nodes are listed in an order that satisfies the Descendents-First-Relationship: every node appears after all of its descendants. Consequently, the trie can be rebuilt on the fly from the sequence of nodes, because the children of each node have already been encountered by the time the node is read.

TODO: evaluate alternatives to CRC32, since a checkpoint file is many GB in size.

TODO: add concurrency if the performance gains are enough to offset the complexity.
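The flattened layout described for StoreCheckpoint can be sketched with a simplified binary tree. This is an illustration under stated assumptions, not the package's encoder: node, flatNode, flatten and rebuild are hypothetical names, and real trie nodes carry hashes and payloads rather than a string value. The sketch keeps the two documented properties: children are referenced by list index with 0 meaning nil, and nodes are emitted descendants first, so rebuilding is a single forward pass.

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for a trie node.
type node struct {
	left, right *node
	value       string
}

// flatNode is the flattened form: children are referenced by 1-based
// list index, and index 0 is reserved to mean nil.
type flatNode struct {
	left, right uint64
	value       string
}

// flatten appends n and its descendants to out in descendants-first
// (post-order) order and returns n's 1-based index, or 0 for nil.
// The index map ensures a sub-trie shared by several tries is emitted once.
func flatten(n *node, index map[*node]uint64, out *[]flatNode) uint64 {
	if n == nil {
		return 0
	}
	if i, ok := index[n]; ok {
		return i
	}
	l := flatten(n.left, index, out)
	r := flatten(n.right, index, out)
	*out = append(*out, flatNode{left: l, right: r, value: n.value})
	i := uint64(len(*out))
	index[n] = i
	return i
}

// rebuild reconstructs all nodes in one forward pass. Because children
// always precede their parent, every reference is already resolved.
func rebuild(flat []flatNode) ([]*node, error) {
	nodes := make([]*node, len(flat)+1) // nodes[0] stays nil
	for i, f := range flat {
		pos := uint64(i + 1)
		if f.left >= pos || f.right >= pos {
			return nil, fmt.Errorf("node %d references a node not yet seen", pos)
		}
		nodes[pos] = &node{left: nodes[f.left], right: nodes[f.right], value: f.value}
	}
	return nodes, nil
}

func main() {
	shared := &node{value: "shared"}
	rootA := &node{left: shared, right: &node{value: "a"}, value: "rootA"}
	rootB := &node{left: shared, value: "rootB"}

	var flat []flatNode
	index := map[*node]uint64{}
	// The "list of encoded tries" reduces here to a list of root indices.
	roots := []uint64{flatten(rootA, index, &flat), flatten(rootB, index, &flat)}

	nodes, err := rebuild(flat)
	if err != nil {
		panic(err)
	}
	for _, r := range roots {
		fmt.Println(r, nodes[r].value)
	}
}
```

The bounds check in rebuild doubles as a cheap validity test: a reference to a later index can only come from a corrupted or mis-ordered node list.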

Types

type Checkpointer

type Checkpointer struct {
	// contains filtered or unexported fields
}

func NewCheckpointer

func NewCheckpointer(wal *DiskWAL, keyByteSize int, forestCapacity int) *Checkpointer

func (*Checkpointer) Checkpoint

func (c *Checkpointer) Checkpoint(to int, targetWriter func() (io.WriteCloser, error)) (err error)

Checkpoint creates a new checkpoint, stopping at the given segment.

func (*Checkpointer) CheckpointWriter

func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error)

func (*Checkpointer) Checkpoints added in v0.14.0

func (c *Checkpointer) Checkpoints() ([]int, error)

Checkpoints returns all checkpoint numbers in ascending order.

func (*Checkpointer) HasRootCheckpoint

func (c *Checkpointer) HasRootCheckpoint() (bool, error)

func (*Checkpointer) LatestCheckpoint

func (c *Checkpointer) LatestCheckpoint() (int, error)

LatestCheckpoint returns the number of the latest checkpoint, or -1 if there are no checkpoints.

func (*Checkpointer) LoadCheckpoint

func (c *Checkpointer) LoadCheckpoint(checkpoint int) ([]*trie.MTrie, error)

func (*Checkpointer) LoadRootCheckpoint

func (c *Checkpointer) LoadRootCheckpoint() ([]*trie.MTrie, error)

func (*Checkpointer) NotCheckpointedSegments

func (c *Checkpointer) NotCheckpointedSegments() (from, to int, err error)

NotCheckpointedSegments returns the numbers of the segments that are not yet checkpointed, or -1, -1 if there are no segments.

func (*Checkpointer) RemoveCheckpoint added in v0.14.0

func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error

type Compactor

type Compactor struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor(checkpointer *Checkpointer, interval time.Duration, checkpointDistance uint, checkpointsToKeep uint, logger zerolog.Logger) *Compactor

func (*Compactor) Done

func (c *Compactor) Done() <-chan struct{}

func (*Compactor) Ready

func (c *Compactor) Ready() <-chan struct{}

Ready periodically fires the Run function, once every `interval`.

func (*Compactor) Run

func (c *Compactor) Run() error

func (*Compactor) Subscribe added in v0.21.0

func (c *Compactor) Subscribe(observer observable.Observer)

func (*Compactor) Unsubscribe added in v0.21.0

func (c *Compactor) Unsubscribe(observer observable.Observer)

type Crc32Reader added in v0.14.0

type Crc32Reader struct {
	// contains filtered or unexported fields
}

func NewCRC32Reader added in v0.14.0

func NewCRC32Reader(reader io.Reader) *Crc32Reader

func (*Crc32Reader) Crc32 added in v0.14.0

func (c *Crc32Reader) Crc32() uint32

func (*Crc32Reader) Read added in v0.14.0

func (c *Crc32Reader) Read(p []byte) (int, error)

type Crc32Writer added in v0.14.0

type Crc32Writer struct {
	Writer io.Writer
	// contains filtered or unexported fields
}

func NewCRC32Writer added in v0.14.0

func NewCRC32Writer(writer io.Writer) *Crc32Writer

func (*Crc32Writer) Crc32 added in v0.14.0

func (c *Crc32Writer) Crc32() uint32

func (*Crc32Writer) Write added in v0.14.0

func (c *Crc32Writer) Write(p []byte) (n int, err error)
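The Crc32Writer methods above are undocumented, so the following is only a sketch of the general technique they suggest: a writer that forwards bytes to an underlying io.Writer while keeping a running CRC32. checksumWriter, newChecksumWriter and checksums are hypothetical names, and the IEEE polynomial is an assumption; the package may use a different CRC32 table.

```go
package main

import (
	"bytes"
	"fmt"
	"hash"
	"hash/crc32"
	"io"
)

// checksumWriter is a hypothetical stand-in for a CRC32-tracking writer.
type checksumWriter struct {
	w io.Writer
	h hash.Hash32
}

func newChecksumWriter(w io.Writer) *checksumWriter {
	// Assumption: IEEE polynomial. The real package may choose another table.
	return &checksumWriter{w: w, h: crc32.NewIEEE()}
}

// Write forwards p to the underlying writer and hashes only the bytes
// that were actually written.
func (c *checksumWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	c.h.Write(p[:n]) // hash.Hash.Write is documented never to return an error
	return n, err
}

// Sum32 returns the checksum of everything written so far.
func (c *checksumWriter) Sum32() uint32 { return c.h.Sum32() }

// checksums streams the chunks through a checksumWriter and returns the
// streamed checksum alongside a one-shot checksum of the same bytes.
func checksums(chunks ...string) (streamed, oneShot uint32, err error) {
	var buf bytes.Buffer
	cw := newChecksumWriter(&buf)
	for _, c := range chunks {
		if _, err = io.WriteString(cw, c); err != nil {
			return 0, 0, err
		}
	}
	return cw.Sum32(), crc32.ChecksumIEEE(buf.Bytes()), nil
}

func main() {
	streamed, oneShot, err := checksums("hello, ", "checkpoint")
	if err != nil {
		panic(err)
	}
	fmt.Println(streamed == oneShot)
}
```

Computing the checksum while streaming avoids a second pass over the data, which matters when a file is many gigabytes in size.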

type DiskWAL added in v0.15.4

type DiskWAL struct {
	// contains filtered or unexported fields
}

func NewDiskWAL added in v0.15.4

func NewDiskWAL(logger zerolog.Logger, reg prometheus.Registerer, metrics module.WALMetrics, dir string, forestCapacity int, pathByteSize int, segmentSize int) (*DiskWAL, error)

TODO: use a real logger and metrics, but that would require passing them to Trie storage.

func (*DiskWAL) DiskSize added in v0.15.4

func (w *DiskWAL) DiskSize() (uint64, error)

DiskSize returns the amount of disk space used by the storage (in bytes)

func (*DiskWAL) Done added in v0.15.4

func (w *DiskWAL) Done() <-chan struct{}

Done implements the module.ReadyDoneAware interface. It closes all open write-ahead log files.

func (*DiskWAL) NewCheckpointer added in v0.15.4

func (w *DiskWAL) NewCheckpointer() (*Checkpointer, error)

NewCheckpointer returns a Checkpointer for this WAL

func (*DiskWAL) PauseRecord added in v0.15.4

func (w *DiskWAL) PauseRecord()

func (*DiskWAL) Ready added in v0.15.4

func (w *DiskWAL) Ready() <-chan struct{}

func (*DiskWAL) RecordDelete added in v0.15.4

func (w *DiskWAL) RecordDelete(rootHash ledger.RootHash) error

func (*DiskWAL) RecordUpdate added in v0.15.4

func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) error

func (*DiskWAL) Replay added in v0.15.4

func (w *DiskWAL) Replay(
	checkpointFn func(tries []*trie.MTrie) error,
	updateFn func(update *ledger.TrieUpdate) error,
	deleteFn func(ledger.RootHash) error,
) error

func (*DiskWAL) ReplayLogsOnly added in v0.15.4

func (w *DiskWAL) ReplayLogsOnly(
	checkpointFn func(tries []*trie.MTrie) error,
	updateFn func(update *ledger.TrieUpdate) error,
	deleteFn func(rootHash ledger.RootHash) error,
) error

func (*DiskWAL) ReplayOnForest added in v0.15.4

func (w *DiskWAL) ReplayOnForest(forest *mtrie.Forest) error

func (*DiskWAL) Segments added in v0.15.4

func (w *DiskWAL) Segments() (first, last int, err error)

func (*DiskWAL) UnpauseRecord added in v0.15.4

func (w *DiskWAL) UnpauseRecord()

type LedgerWAL

type LedgerWAL interface {
	module.ReadyDoneAware

	NewCheckpointer() (*Checkpointer, error)
	PauseRecord()
	UnpauseRecord()
	RecordUpdate(update *ledger.TrieUpdate) error
	RecordDelete(rootHash ledger.RootHash) error
	ReplayOnForest(forest *mtrie.Forest) error
	Segments() (first, last int, err error)
	Replay(
		checkpointFn func(tries []*trie.MTrie) error,
		updateFn func(update *ledger.TrieUpdate) error,
		deleteFn func(ledger.RootHash) error,
	) error
	ReplayLogsOnly(
		checkpointFn func(tries []*trie.MTrie) error,
		updateFn func(update *ledger.TrieUpdate) error,
		deleteFn func(rootHash ledger.RootHash) error,
	) error
}

type SyncOnCloseRenameFile added in v0.14.0

type SyncOnCloseRenameFile struct {
	*bufio.Writer
	// contains filtered or unexported fields
}

SyncOnCloseRenameFile is a composite of a buffered writer over a given file, which flushes and syncs on closing and renames the file to `targetName` as the last step. A typical use case is to write data to a temporary file and rename it to the target name only as the last step. This helps avoid the situation where writing is interrupted and an unusable file with the target name exists.
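The write-to-temporary-file-then-rename pattern described here can be sketched with the standard library alone. This is not the type's actual implementation: writeAtomically and writeAndReadBack are hypothetical helpers, and the temporary file naming is an assumption made for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomically is a hypothetical helper: it writes data to a temporary
// file in dir, flushes and syncs it, and only then renames it to name.
// A reader therefore never observes a partially written file under name.
func writeAtomically(dir, name string, data []byte) (err error) {
	tmp, err := os.CreateTemp(dir, "writing-"+name+"-*")
	if err != nil {
		return fmt.Errorf("create temp file: %w", err)
	}
	// On any failure, close and remove the temporary file.
	defer func() {
		if err != nil {
			tmp.Close()
			os.Remove(tmp.Name())
		}
	}()

	buf := bufio.NewWriter(tmp)
	if _, err = buf.Write(data); err != nil {
		return err
	}
	if err = buf.Flush(); err != nil { // drain the buffer into the file
		return err
	}
	if err = tmp.Sync(); err != nil { // ask the OS to persist the contents
		return err
	}
	if err = tmp.Close(); err != nil {
		return err
	}
	// Renaming is the last step; the target name appears only now.
	return os.Rename(tmp.Name(), filepath.Join(dir, name))
}

// writeAndReadBack exercises writeAtomically in a throwaway directory.
func writeAndReadBack(content string) (string, error) {
	dir, err := os.MkdirTemp("", "atomic-demo-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	if err := writeAtomically(dir, "checkpoint.demo", []byte(content)); err != nil {
		return "", err
	}
	got, err := os.ReadFile(filepath.Join(dir, "checkpoint.demo"))
	return string(got), err
}

func main() {
	got, err := writeAndReadBack("flattened forest bytes")
	fmt.Println(got, err)
}
```

Creating the temporary file in the same directory as the target keeps the rename on one filesystem, which is what allows the final step to replace the name in a single operation on typical POSIX systems.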

func (*SyncOnCloseRenameFile) Close added in v0.14.0

func (s *SyncOnCloseRenameFile) Close() error

func (*SyncOnCloseRenameFile) Sync added in v0.14.0

func (s *SyncOnCloseRenameFile) Sync() error

func (*SyncOnCloseRenameFile) Write added in v0.23.9

func (s *SyncOnCloseRenameFile) Write(b []byte) (int, error)

type WALOperation

type WALOperation uint8
const WALDelete WALOperation = 2
const WALUpdate WALOperation = 1

func Decode

func Decode(data []byte) (operation WALOperation, rootHash ledger.RootHash, update *ledger.TrieUpdate, err error)

type WriterSeekerCloser added in v0.14.0

type WriterSeekerCloser interface {
	io.Writer
	io.Seeker
	io.Closer
}
