wal

package module
v1.3.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2024 License: Apache-2.0 Imports: 11 Imported by: 15

README

wal

Write Ahead Log for LSM or bitcask storage.

Key Features

  • Disk based, support large data volume
  • Append only write, high performance
  • Fast read, one disk seek to retrieve any value
  • Support batch write, all data in a batch will be written in a single disk seek
  • Iterate all data in wal with NewReader function
  • Extremely fast read
  • Support concurrent write and read, all functions are thread safe

Design Overview

wal-logo.png

Format

Format of a single segment file:

       +-----+-------------+--+----+----------+------+-- ... ----+
 File  | r0  |      r1     |P | r2 |    r3    |  r4  |           |
       +-----+-------------+--+----+----------+------+-- ... ----+
       |<---- BlockSize ----->|<---- BlockSize ----->|

  rn = variable size records
  P = Padding
  BlockSize = 32KB

Format of a single record:

+----------+-------------+-----------+--- ... ---+
| CRC (4B) | Length (2B) | Type (1B) |  Payload  |
+----------+-------------+-----------+--- ... ---+

CRC = 32-bit hash computed over the payload using CRC
Length = Length of the payload data
Type = Type of record
       (FullType, FirstType, MiddleType, LastType)
       The type is used to group a bunch of records together to represent
       blocks that are larger than BlockSize
Payload = Byte stream as long as specified by the payload size

Getting Started

func main() {
	wal, _ := wal.Open(wal.DefaultOptions)
	// write some data
	chunkPosition, _ := wal.Write([]byte("some data 1"))
	// read by the position
	val, _ := wal.Read(chunkPosition)
	fmt.Println(string(val))

	wal.Write([]byte("some data 2"))
	wal.Write([]byte("some data 3"))

	// iterate all data in wal
	reader := wal.NewReader()
	for {
		val, pos, err := reader.Next()
		if err == io.EOF {
			break
		}
		fmt.Println(string(val))
		fmt.Println(pos) // get position of the data for next read
	}
}

Documentation

Index

Constants

View Source
const (
	B  = 1
	KB = 1024 * B
	MB = 1024 * KB
	GB = 1024 * MB
)

Variables

View Source
var (
	ErrClosed     = errors.New("the segment file is closed")
	ErrInvalidCRC = errors.New("invalid crc, the data may be corrupted")
)
View Source
var (
	ErrValueTooLarge       = errors.New("the data size can't larger than segment size")
	ErrPendingSizeTooLarge = errors.New("the upper bound of pendingWrites can't larger than segment size")
)
View Source
var DefaultOptions = Options{
	DirPath:        os.TempDir(),
	SegmentSize:    GB,
	SegmentFileExt: ".SEG",
	Sync:           false,
	BytesPerSync:   0,
}

Functions

func SegmentFileName added in v1.1.0

func SegmentFileName(dirPath string, extName string, id SegmentID) string

SegmentFileName returns the file name of a segment file.

Types

type ChunkPosition

type ChunkPosition struct {
	SegmentId SegmentID
	// BlockNumber The block number of the chunk in the segment file.
	BlockNumber uint32
	// ChunkOffset The start offset of the chunk in the segment file.
	ChunkOffset int64
	// ChunkSize How many bytes the chunk data takes up in the segment file.
	ChunkSize uint32
}

ChunkPosition represents the position of a chunk in a segment file. Used to read the data from the segment file.

func DecodeChunkPosition added in v1.3.0

func DecodeChunkPosition(buf []byte) *ChunkPosition

DecodeChunkPosition decodes the chunk position from a byte slice. You can encode it by calling wal.ChunkPosition.Encode().

func (*ChunkPosition) Encode added in v1.3.0

func (cp *ChunkPosition) Encode() []byte

Encode encodes the chunk position to a byte slice. Return the slice with the actual occupied elements. You can decode it by calling wal.DecodeChunkPosition().

func (*ChunkPosition) EncodeFixedSize added in v1.3.4

func (cp *ChunkPosition) EncodeFixedSize() []byte

EncodeFixedSize encodes the chunk position to a byte slice. Return a slice of size "maxLen". You can decode it by calling wal.DecodeChunkPosition().

type ChunkType

type ChunkType = byte
const (
	ChunkTypeFull ChunkType = iota
	ChunkTypeFirst
	ChunkTypeMiddle
	ChunkTypeLast
)

type Options

type Options struct {
	// DirPath specifies the directory path where the WAL segment files will be stored.
	DirPath string

	// SegmentSize specifies the maximum size of each segment file in bytes.
	SegmentSize int64

	// SegmentFileExt specifies the file extension of the segment files.
	// The file extension must start with a dot ".", default value is ".SEG".
	// It is used to identify the different types of files in the directory.
	// Now it is used by rosedb to identify the segment files and hint files.
	// Not a common usage for most users.
	SegmentFileExt string

	// Sync is whether to synchronize writes through os buffer cache and down onto the actual disk.
	// Setting sync is required for durability of a single write operation, but also results in slower writes.
	//
	// If false, and the machine crashes, then some recent writes may be lost.
	// Note that if it is just the process that crashes (machine does not) then no writes will be lost.
	//
	// In other words, Sync being false has the same semantics as a write
	// system call. Sync being true means write followed by fsync.
	Sync bool

	// BytesPerSync specifies the number of bytes to write before calling fsync.
	BytesPerSync uint32
}

Options represents the configuration options for a Write-Ahead Log (WAL).

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader represents a reader for the WAL. It consists of segmentReaders, which is a slice of segmentReader structures sorted by segment id, and currentReader, which is the index of the current segmentReader in the slice.

The currentReader field is used to iterate over the segmentReaders slice.

func (*Reader) CurrentChunkPosition added in v1.2.0

func (r *Reader) CurrentChunkPosition() *ChunkPosition

CurrentChunkPosition returns the position of the current chunk data

func (*Reader) CurrentSegmentId added in v1.1.0

func (r *Reader) CurrentSegmentId() SegmentID

CurrentSegmentId returns the id of the current segment file when reading the WAL.

func (*Reader) Next

func (r *Reader) Next() ([]byte, *ChunkPosition, error)

Next returns the next chunk data and its position in the WAL. If there is no data, io.EOF will be returned.

The position can be used to read the data from the segment file.

func (*Reader) SkipCurrentSegment added in v1.1.0

func (r *Reader) SkipCurrentSegment()

SkipCurrentSegment skips the current segment file when reading the WAL.

It is now used by the Merge operation of rosedb, not a common usage for most users.

type SegmentID

type SegmentID = uint32

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL represents a Write-Ahead Log structure that provides durability and fault-tolerance for incoming writes. It consists of an activeSegment, which is the current segment file used for new incoming writes, and olderSegments, which is a map of segment files used for read operations.

The options field stores various configuration options for the WAL.

The mu sync.RWMutex is used for concurrent access to the WAL data structure, ensuring safe access and modification.

func Open

func Open(options Options) (*WAL, error)

Open opens a WAL with the given options. It will create the directory if not exists, and open all segment files in the directory. If there is no segment file in the directory, it will create a new one.

func (*WAL) ActiveSegmentID added in v1.1.0

func (wal *WAL) ActiveSegmentID() SegmentID

ActiveSegmentID returns the id of the active segment file.

func (*WAL) ClearPendingWrites added in v1.3.4

func (wal *WAL) ClearPendingWrites()

ClearPendingWrites clear pendingWrite and reset pendingSize

func (*WAL) Close

func (wal *WAL) Close() error

Close closes the WAL.

func (*WAL) Delete added in v1.2.0

func (wal *WAL) Delete() error

Delete deletes all segment files of the WAL.

func (*WAL) IsEmpty added in v1.1.0

func (wal *WAL) IsEmpty() bool

IsEmpty returns whether the WAL is empty. Only there is only one empty active segment file, which means the WAL is empty.

func (*WAL) NewReader

func (wal *WAL) NewReader() *Reader

NewReader returns a new reader for the WAL. It will iterate all segment files and read all data from them.

func (*WAL) NewReaderWithMax added in v1.1.0

func (wal *WAL) NewReaderWithMax(segId SegmentID) *Reader

NewReaderWithMax returns a new reader for the WAL, and the reader will only read the data from the segment file whose id is less than or equal to the given segId.

It is now used by the Merge operation of rosedb, not a common usage for most users.

func (*WAL) NewReaderWithStart added in v1.2.0

func (wal *WAL) NewReaderWithStart(startPos *ChunkPosition) (*Reader, error)

NewReaderWithStart returns a new reader for the WAL, and the reader will only read the data from the segment file whose position is greater than or equal to the given position.

func (*WAL) OpenNewActiveSegment added in v1.1.0

func (wal *WAL) OpenNewActiveSegment() error

OpenNewActiveSegment opens a new segment file and sets it as the active segment file. It is used when even the active segment file is not full, but the user wants to create a new segment file.

It is now used by Merge operation of rosedb, not a common usage for most users.

func (*WAL) PendingWrites added in v1.3.4

func (wal *WAL) PendingWrites(data []byte)

PendingWrites add data to wal.pendingWrites and wait for batch write. If the data in pendingWrites exceeds the size of one segment, it will return a 'ErrPendingSizeTooLarge' error and clear the pendingWrites.

func (*WAL) Read

func (wal *WAL) Read(pos *ChunkPosition) ([]byte, error)

Read reads the data from the WAL according to the given position.

func (*WAL) RenameFileExt added in v1.3.4

func (wal *WAL) RenameFileExt(ext string) error

RenameFileExt renames all segment files' extension name. It is now used by the Merge operation of loutsdb, not a common usage for most users.

func (*WAL) SetIsStartupTraversal added in v1.3.8

func (wal *WAL) SetIsStartupTraversal(v bool)

SetIsStartupTraversal This is only used if the WAL is during startup traversal. Such as rosedb/lotusdb startup, so it's not a common usage for most users. And notice that if you set it to true, only one reader can read the data from the WAL (Single Thread).

func (*WAL) Sync

func (wal *WAL) Sync() error

Sync syncs the active segment file to stable storage like disk.

func (*WAL) Write

func (wal *WAL) Write(data []byte) (*ChunkPosition, error)

Write writes the data to the WAL. Actually, it writes the data to the active segment file. It returns the position of the data in the WAL, and an error if any.

func (*WAL) WriteAll added in v1.3.4

func (wal *WAL) WriteAll() ([]*ChunkPosition, error)

WriteAll write wal.pendingWrites to WAL and then clear pendingWrites, it will not sync the segment file based on wal.options, you should call Sync() manually.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL