Documentation ¶
Index ¶
- Constants
- func CreateCheckpointWriter(dir string, fileNo int) (io.WriteCloser, error)
- func CreateCheckpointWriterForFile(dir, filename string) (io.WriteCloser, error)
- func EncodeDelete(rootHash ledger.RootHash) []byte
- func EncodeUpdate(update *ledger.TrieUpdate) []byte
- func LoadCheckpoint(filepath string) ([]*trie.MTrie, error)
- func NumberToFilename(n int) string
- func NumberToFilenamePart(n int) string
- func StoreCheckpoint(writer io.Writer, tries ...*trie.MTrie) error
- type Checkpointer
- func (c *Checkpointer) Checkpoint(to int, targetWriter func() (io.WriteCloser, error)) (err error)
- func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error)
- func (c *Checkpointer) Checkpoints() ([]int, error)
- func (c *Checkpointer) HasRootCheckpoint() (bool, error)
- func (c *Checkpointer) LatestCheckpoint() (int, error)
- func (c *Checkpointer) LoadCheckpoint(checkpoint int) ([]*trie.MTrie, error)
- func (c *Checkpointer) LoadRootCheckpoint() ([]*trie.MTrie, error)
- func (c *Checkpointer) NotCheckpointedSegments() (from, to int, err error)
- func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error
- type Compactor
- type Crc32Reader
- type Crc32Writer
- type DiskWAL
- func (w *DiskWAL) DiskSize() (uint64, error)
- func (w *DiskWAL) Done() <-chan struct{}
- func (w *DiskWAL) NewCheckpointer() (*Checkpointer, error)
- func (w *DiskWAL) PauseRecord()
- func (w *DiskWAL) Ready() <-chan struct{}
- func (w *DiskWAL) RecordDelete(rootHash ledger.RootHash) error
- func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) error
- func (w *DiskWAL) Replay(checkpointFn func(tries []*trie.MTrie) error, ...) error
- func (w *DiskWAL) ReplayLogsOnly(checkpointFn func(tries []*trie.MTrie) error, ...) error
- func (w *DiskWAL) ReplayOnForest(forest *mtrie.Forest) error
- func (w *DiskWAL) Segments() (first, last int, err error)
- func (w *DiskWAL) UnpauseRecord()
- type LedgerWAL
- type SyncOnCloseRenameFile
- type WALOperation
- type WriterSeekerCloser
Constants ¶
const MagicBytes uint16 = 0x2137
const SegmentSize = 32 * 1024 * 1024
const VersionV1 uint16 = 0x01
const VersionV3 uint16 = 0x03
The version was reset while changing the trie format, so it is now bumped to 3 to avoid conflicts. Version 3 contains a file checksum for detecting corrupted checkpoint files.
const VersionV4 uint16 = 0x04
Version 4 contains a footer with node count and trie count (previously in the header). Version 4 also reduces checkpoint data size. See EncodeNode() and EncodeTrie() for more details.
Variables ¶
This section is empty.
Functions ¶
func CreateCheckpointWriter ¶
func CreateCheckpointWriter(dir string, fileNo int) (io.WriteCloser, error)
func CreateCheckpointWriterForFile ¶
func CreateCheckpointWriterForFile(dir, filename string) (io.WriteCloser, error)
CreateCheckpointWriterForFile returns a file writer that will write to a temporary file and then move it to the checkpoint folder by renaming it.
func EncodeDelete ¶
func EncodeUpdate ¶
func EncodeUpdate(update *ledger.TrieUpdate) []byte
func NumberToFilename ¶
func NumberToFilenamePart ¶
func StoreCheckpoint ¶
StoreCheckpoint writes the given tries to a checkpoint file, and also appends a CRC32 file checksum for integrity checking. The checkpoint file consists of a flattened forest. Specifically, it consists of:
- a list of encoded nodes, where references to other nodes are by list index.
- a list of encoded tries, each referencing their respective root node by index.
A reference to another node by index 0 is a special case, meaning nil.
As an important property, the nodes are listed in an order which satisfies the Descendents-First-Relationship. The Descendents-First-Relationship has the following important property: when rebuilding the trie from the sequence of nodes, the trie can be built on the fly because, for each node, its children have already been encountered. TODO: evaluate alternatives to CRC32 since checkpoint file is many GB in size. TODO: add concurrency if the performance gains are enough to offset complexity.
Types ¶
type Checkpointer ¶
type Checkpointer struct {
// contains filtered or unexported fields
}
func NewCheckpointer ¶
func NewCheckpointer(wal *DiskWAL, keyByteSize int, forestCapacity int) *Checkpointer
func (*Checkpointer) Checkpoint ¶
func (c *Checkpointer) Checkpoint(to int, targetWriter func() (io.WriteCloser, error)) (err error)
Checkpoint creates new checkpoint stopping at given segment
func (*Checkpointer) CheckpointWriter ¶
func (c *Checkpointer) CheckpointWriter(to int) (io.WriteCloser, error)
func (*Checkpointer) Checkpoints ¶ added in v0.14.0
func (c *Checkpointer) Checkpoints() ([]int, error)
Checkpoints returns all the checkpoint numbers in ascending order
func (*Checkpointer) HasRootCheckpoint ¶
func (c *Checkpointer) HasRootCheckpoint() (bool, error)
func (*Checkpointer) LatestCheckpoint ¶
func (c *Checkpointer) LatestCheckpoint() (int, error)
LatestCheckpoint returns number of latest checkpoint or -1 if there are no checkpoints
func (*Checkpointer) LoadCheckpoint ¶
func (c *Checkpointer) LoadCheckpoint(checkpoint int) ([]*trie.MTrie, error)
func (*Checkpointer) LoadRootCheckpoint ¶
func (c *Checkpointer) LoadRootCheckpoint() ([]*trie.MTrie, error)
func (*Checkpointer) NotCheckpointedSegments ¶
func (c *Checkpointer) NotCheckpointedSegments() (from, to int, err error)
NotCheckpointedSegments - returns numbers of segments which are not checkpointed yet, or -1, -1 if there are no segments
func (*Checkpointer) RemoveCheckpoint ¶ added in v0.14.0
func (c *Checkpointer) RemoveCheckpoint(checkpoint int) error
type Compactor ¶
func NewCompactor ¶
func (*Compactor) Ready ¶
func (c *Compactor) Ready() <-chan struct{}
Ready periodically fires Run function, every `interval`
func (*Compactor) Subscribe ¶ added in v0.21.0
func (c *Compactor) Subscribe(observer observable.Observer)
func (*Compactor) Unsubscribe ¶ added in v0.21.0
func (c *Compactor) Unsubscribe(observer observable.Observer)
type Crc32Reader ¶ added in v0.14.0
type Crc32Reader struct {
// contains filtered or unexported fields
}
func NewCRC32Reader ¶ added in v0.14.0
func NewCRC32Reader(reader io.Reader) *Crc32Reader
func (*Crc32Reader) Crc32 ¶ added in v0.14.0
func (c *Crc32Reader) Crc32() uint32
type Crc32Writer ¶ added in v0.14.0
func NewCRC32Writer ¶ added in v0.14.0
func NewCRC32Writer(writer io.Writer) *Crc32Writer
func (*Crc32Writer) Crc32 ¶ added in v0.14.0
func (c *Crc32Writer) Crc32() uint32
type DiskWAL ¶ added in v0.15.4
type DiskWAL struct {
// contains filtered or unexported fields
}
func NewDiskWAL ¶ added in v0.15.4
func NewDiskWAL(logger zerolog.Logger, reg prometheus.Registerer, metrics module.WALMetrics, dir string, forestCapacity int, pathByteSize int, segmentSize int) (*DiskWAL, error)
TODO use real logger and metrics, but that would require passing them to Trie storage
func (*DiskWAL) DiskSize ¶ added in v0.15.4
DiskSize returns the amount of disk space used by the storage (in bytes)
func (*DiskWAL) Done ¶ added in v0.15.4
func (w *DiskWAL) Done() <-chan struct{}
Done implements interface module.ReadyDoneAware. It closes all the open write-ahead log files.
func (*DiskWAL) NewCheckpointer ¶ added in v0.15.4
func (w *DiskWAL) NewCheckpointer() (*Checkpointer, error)
NewCheckpointer returns a Checkpointer for this WAL
func (*DiskWAL) PauseRecord ¶ added in v0.15.4
func (w *DiskWAL) PauseRecord()
func (*DiskWAL) RecordDelete ¶ added in v0.15.4
func (*DiskWAL) RecordUpdate ¶ added in v0.15.4
func (w *DiskWAL) RecordUpdate(update *ledger.TrieUpdate) error
func (*DiskWAL) ReplayLogsOnly ¶ added in v0.15.4
func (*DiskWAL) ReplayOnForest ¶ added in v0.15.4
func (*DiskWAL) UnpauseRecord ¶ added in v0.15.4
func (w *DiskWAL) UnpauseRecord()
type LedgerWAL ¶
type LedgerWAL interface { module.ReadyDoneAware NewCheckpointer() (*Checkpointer, error) PauseRecord() UnpauseRecord() RecordUpdate(update *ledger.TrieUpdate) error RecordDelete(rootHash ledger.RootHash) error ReplayOnForest(forest *mtrie.Forest) error Segments() (first, last int, err error) Replay( checkpointFn func(tries []*trie.MTrie) error, updateFn func(update *ledger.TrieUpdate) error, deleteFn func(ledger.RootHash) error, ) error ReplayLogsOnly( checkpointFn func(tries []*trie.MTrie) error, updateFn func(update *ledger.TrieUpdate) error, deleteFn func(rootHash ledger.RootHash) error, ) error }
type SyncOnCloseRenameFile ¶ added in v0.14.0
SyncOnCloseRenameFile is a composite of a buffered writer over a given file which flushes/syncs on closing and renames the file to `targetName` as a last step. A typical use case is to write data to a temporary file and only rename it to the target name as the last step. This helps avoid the situation where writing is interrupted and an unusable file with the target name exists.
func (*SyncOnCloseRenameFile) Close ¶ added in v0.14.0
func (s *SyncOnCloseRenameFile) Close() error
func (*SyncOnCloseRenameFile) Sync ¶ added in v0.14.0
func (s *SyncOnCloseRenameFile) Sync() error
type WALOperation ¶
type WALOperation uint8
const WALDelete WALOperation = 2
const WALUpdate WALOperation = 1
func Decode ¶
func Decode(data []byte) (operation WALOperation, rootHash ledger.RootHash, update *ledger.TrieUpdate, err error)