wal

package
v2.0.2
Warning

This package is not in the latest version of its module.

Published: Jan 22, 2025 License: BSD-3-Clause Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseLogFilename

func ParseLogFilename(name string) (NumWAL, LogNameIndex, bool)

ParseLogFilename takes a base filename and parses it into its constituent NumWAL and LogNameIndex. If the filename is not a log file, it returns false for the final return value.
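
As a rough illustration of what such parsing involves, here is a minimal sketch. The naming scheme it assumes ("000012.log" for index 0, "000012-003.log" for index 3) and the helper parseLogName are assumptions made for this example; they are not taken from the package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLogName is a hypothetical stand-in for ParseLogFilename. It assumes
// names of the form "000012.log" (index 0) or "000012-003.log" (index 3).
func parseLogName(name string) (num uint64, index uint32, ok bool) {
	if !strings.HasSuffix(name, ".log") {
		return 0, 0, false
	}
	stem := strings.TrimSuffix(name, ".log")
	numStr, idxStr, hasIdx := strings.Cut(stem, "-")
	n, err := strconv.ParseUint(numStr, 10, 64)
	if err != nil {
		return 0, 0, false
	}
	if !hasIdx {
		return n, 0, true
	}
	i, err := strconv.ParseUint(idxStr, 10, 32)
	if err != nil {
		return 0, 0, false
	}
	return n, uint32(i), true
}

func main() {
	for _, name := range []string{"000012.log", "000012-003.log", "MANIFEST-000001"} {
		num, idx, ok := parseLogName(name)
		fmt.Println(name, num, idx, ok)
	}
}
```

Callers would typically check the boolean first and ignore the other two values when it is false.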

Types

type CreateInfo

type CreateInfo struct {
	// JobID is the ID of the job that caused the WAL to be created.
	//
	// TODO(sumeer): for a file created later due to the need to failover, we
	// need to provide a JobID generator func in Options.
	JobID int
	// Path to the file. This includes the NumWAL, and implicitly or explicitly
	// includes the logNameIndex.
	Path string
	// IsSecondary is true if the file was created on the secondary.
	IsSecondary bool
	// Num is the WAL number.
	Num NumWAL
	// RecycledFileNum is the file number of a previous log file which was
	// recycled to create this one. Zero if recycling did not take place.
	RecycledFileNum base.DiskFileNum
	// Err contains any error.
	Err error
}

CreateInfo contains info about a log file creation event.

type DeletableLog

type DeletableLog struct {
	vfs.FS
	// Path to the file.
	Path string
	NumWAL
	ApproxFileSize uint64
}

DeletableLog contains information about a log file that can be deleted.

type Dir

type Dir struct {
	FS      vfs.FS
	Dirname string
}

Dir is used for storing log files.

type EventListener

type EventListener interface {
	// LogCreated informs the listener of a log file creation.
	LogCreated(CreateInfo)
}

EventListener is called on events, like log file creation.

type FailoverOptions

type FailoverOptions struct {
	// PrimaryDirProbeInterval is the interval for probing the primary dir, when
	// the WAL is being written to the secondary, to decide when to fail back.
	PrimaryDirProbeInterval time.Duration
	// HealthyProbeLatencyThreshold is the latency threshold to declare that the
	// primary is healthy again.
	HealthyProbeLatencyThreshold time.Duration
	// HealthyInterval is the time interval over which the probes have to be
	// healthy. That is, we look at probe history of length
	// HealthyInterval/PrimaryDirProbeInterval.
	HealthyInterval time.Duration

	// UnhealthySamplingInterval is the interval for sampling ongoing calls and
	// errors in the latest LogWriter.
	UnhealthySamplingInterval time.Duration
	// UnhealthyOperationLatencyThreshold is the latency threshold that is
	// considered unhealthy, for operations done by a LogWriter. The second return
	// value indicates whether we should consider failover at all. If the second
	// return value is false, failover is disabled.
	UnhealthyOperationLatencyThreshold func() (time.Duration, bool)

	// ElevatedWriteStallThresholdLag is the duration for which an elevated
	// threshold should continue after a switch back to the primary dir. This is
	// because we may have accumulated many unflushed memtables and flushing
	// them can take some time. Maybe set to 60s.
	ElevatedWriteStallThresholdLag time.Duration
	// contains filtered or unexported fields
}

FailoverOptions are options that are specific to failover mode.
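
The relationship between HealthyInterval, PrimaryDirProbeInterval and HealthyProbeLatencyThreshold can be sketched as follows. This is only an illustration of the arithmetic described in the field comments: the helpers probeHistoryLen and primaryHealthy, the sample latencies, and the rule that every probe in the window must be below the threshold are assumptions, not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// probeHistoryLen returns how many probes fit in healthyInterval when one
// probe is issued every probeInterval.
func probeHistoryLen(healthyInterval, probeInterval time.Duration) int {
	if probeInterval <= 0 {
		return 0
	}
	return int(healthyInterval / probeInterval)
}

// primaryHealthy reports whether the n most recent probe latencies are all
// below threshold. Treating "healthy" this way is an assumption of the sketch.
func primaryHealthy(latencies []time.Duration, n int, threshold time.Duration) bool {
	if n <= 0 || len(latencies) < n {
		return false
	}
	for _, l := range latencies[len(latencies)-n:] {
		if l >= threshold {
			return false
		}
	}
	return true
}

// sampleProbes holds made-up probe latencies, oldest first.
var sampleProbes = []time.Duration{
	80 * time.Millisecond,
	20 * time.Millisecond,
	15 * time.Millisecond,
	30 * time.Millisecond,
}

func main() {
	n := probeHistoryLen(3*time.Second, time.Second)
	fmt.Println("history length:", n)
	fmt.Println("fail back:", primaryHealthy(sampleProbes, n, 50*time.Millisecond))
}
```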

func (*FailoverOptions) EnsureDefaults

func (o *FailoverOptions) EnsureDefaults()

EnsureDefaults ensures that the default values for all options are set if a valid value was not already specified.

type FailoverStats

type FailoverStats struct {
	// DirSwitchCount is the number of times WAL writing has switched to a
	// different directory, either due to failover, when the current dir is
	// unhealthy, or to failback to the primary, when the primary is healthy
	// again.
	DirSwitchCount int64

	// PrimaryWriteDuration is the cumulative duration for which WAL writes are
	// using the primary directory.
	PrimaryWriteDuration time.Duration
	// SecondaryWriteDuration is the cumulative duration for which WAL writes
	// are using the secondary directory.
	SecondaryWriteDuration time.Duration

	// FailoverWriteAndSyncLatency measures the latency of writing and syncing a
	// set of writes that were synced together. Each sample represents the
	// highest latency observed across the writes in the set of writes. It gives
	// us a sense of the user-observed latency, which can be much lower than the
	// underlying fsync latency, when WAL failover is working effectively.
	FailoverWriteAndSyncLatency prometheus.Histogram
}

FailoverStats contains stats about WAL failover. These are empty if failover is not configured.
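
One way a caller might summarize these counters is the share of write time spent on the secondary. The helper secondaryFraction below is hypothetical and works on plain durations rather than on the FailoverStats type itself.

```go
package main

import (
	"fmt"
	"time"
)

// secondaryFraction returns the fraction of cumulative WAL write time that
// was spent on the secondary directory, or 0 if nothing was recorded.
func secondaryFraction(primary, secondary time.Duration) float64 {
	total := primary + secondary
	if total <= 0 {
		return 0
	}
	return float64(secondary) / float64(total)
}

func main() {
	frac := secondaryFraction(3*time.Hour, time.Hour)
	fmt.Printf("%.2f of write time on the secondary\n", frac)
}
```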

type FileAccumulator

type FileAccumulator struct {
	// contains filtered or unexported fields
}

FileAccumulator parses and accumulates log files.

func (*FileAccumulator) Finish

func (a *FileAccumulator) Finish() Logs

Finish returns a Logs constructed from the physical files observed through MaybeAccumulate.

func (*FileAccumulator) MaybeAccumulate

func (a *FileAccumulator) MaybeAccumulate(fs vfs.FS, path string) (isLogFile bool, err error)

MaybeAccumulate parses the provided path's filename. If the filename indicates the file is a write-ahead log, MaybeAccumulate updates its internal state to remember the file and returns isLogFile=true. An error is returned if the file is a duplicate.
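
The accumulate-then-finish pattern can be sketched with a small in-memory stand-in. The types segment and accumulator are hypothetical, filename parsing is left out, and treating a repeated (WAL number, index) pair as the duplicate condition is an assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// segment identifies one physical file: the WAL number and the file's index
// within that WAL.
type segment struct {
	num   uint64
	index uint32
}

// accumulator is a simplified, hypothetical stand-in for FileAccumulator.
type accumulator struct {
	seen map[segment]string // segment -> path
}

var errDuplicate = errors.New("duplicate log file")

// add remembers a segment and rejects a second file with the same identity.
func (a *accumulator) add(s segment, path string) error {
	if a.seen == nil {
		a.seen = make(map[segment]string)
	}
	if prev, ok := a.seen[s]; ok {
		return fmt.Errorf("%w: %s and %s", errDuplicate, prev, path)
	}
	a.seen[s] = path
	return nil
}

// finish groups paths by WAL number. WAL numbers are returned in increasing
// order, and each WAL's paths are ordered by segment index.
func (a *accumulator) finish() (nums []uint64, paths map[uint64][]string) {
	segs := make([]segment, 0, len(a.seen))
	for s := range a.seen {
		segs = append(segs, s)
	}
	sort.Slice(segs, func(i, j int) bool {
		if segs[i].num != segs[j].num {
			return segs[i].num < segs[j].num
		}
		return segs[i].index < segs[j].index
	})
	paths = make(map[uint64][]string)
	for _, s := range segs {
		if _, ok := paths[s.num]; !ok {
			nums = append(nums, s.num)
		}
		paths[s.num] = append(paths[s.num], a.seen[s])
	}
	return nums, paths
}

func main() {
	var a accumulator
	_ = a.add(segment{num: 7, index: 1}, "secondary/000007-001.log")
	_ = a.add(segment{num: 7, index: 0}, "primary/000007.log")
	fmt.Println(a.add(segment{num: 7, index: 0}, "secondary/000007.log"))
	nums, paths := a.finish()
	fmt.Println(nums, paths)
}
```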

type LogNameIndex

type LogNameIndex uint32

LogNameIndex numbers log files within a WAL.

func (LogNameIndex) String

func (li LogNameIndex) String() string

String implements fmt.Stringer.

type LogRecycler

type LogRecycler struct {
	// contains filtered or unexported fields
}

LogRecycler recycles WAL log files. It holds a set of log file numbers that are available for reuse. Writing to a recycled log file is faster than to a new log file on some common filesystems (xfs, and ext3/4) due to avoiding metadata updates.

func (*LogRecycler) Add

func (r *LogRecycler) Add(logInfo base.FileInfo) bool

Add attempts to recycle the log file specified by logInfo. Returns true if the log file should not be deleted (i.e. the log is being recycled), and false otherwise.

func (*LogRecycler) Init

func (r *LogRecycler) Init(maxNumLogFiles int)

Init initializes the LogRecycler.

func (*LogRecycler) LogNumsForTesting

func (r *LogRecycler) LogNumsForTesting() []base.DiskFileNum

LogNumsForTesting returns the current set of recyclable logs.

func (*LogRecycler) MinRecycleLogNum

func (r *LogRecycler) MinRecycleLogNum() NumWAL

MinRecycleLogNum returns the current minimum log number that is allowed to be recycled.

func (*LogRecycler) Peek

func (r *LogRecycler) Peek() (base.FileInfo, bool)

Peek returns the log at the head of the recycling queue, or the zero value fileInfo and false if the queue is empty.

func (*LogRecycler) Pop

func (r *LogRecycler) Pop(logNum base.DiskFileNum) error

Pop removes the log number at the head of the recycling queue, enforcing that it matches the specified logNum. An error is returned if the recycling queue is empty or the head log number does not match the specified one.

func (*LogRecycler) SetMinRecycleLogNum

func (r *LogRecycler) SetMinRecycleLogNum(n NumWAL)

SetMinRecycleLogNum sets the minimum log number that is allowed to be recycled.

func (*LogRecycler) Stats

func (r *LogRecycler) Stats() (count int, size uint64)

Stats returns the current stats.
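
The Add, Peek and Pop contract described above can be illustrated with a small FIFO queue. The recycler type below is a hypothetical simplification: it has no locking, and its rejection rules (queue full, or file number below the minimum) are assumptions rather than the package's actual policy.

```go
package main

import (
	"errors"
	"fmt"
)

// fileInfo is a local stand-in for base.FileInfo.
type fileInfo struct {
	num  uint64
	size uint64
}

// recycler is a hypothetical FIFO stand-in for LogRecycler.
type recycler struct {
	limit  int
	minNum uint64
	logs   []fileInfo
}

// add returns true if the file was queued for reuse and must not be deleted.
func (r *recycler) add(fi fileInfo) bool {
	if fi.num < r.minNum || len(r.logs) >= r.limit {
		return false
	}
	r.logs = append(r.logs, fi)
	return true
}

// peek returns the head of the queue without removing it.
func (r *recycler) peek() (fileInfo, bool) {
	if len(r.logs) == 0 {
		return fileInfo{}, false
	}
	return r.logs[0], true
}

// pop removes the head of the queue, but only if it is the expected file.
func (r *recycler) pop(num uint64) error {
	if len(r.logs) == 0 {
		return errors.New("recycler: queue is empty")
	}
	if r.logs[0].num != num {
		return fmt.Errorf("recycler: head is %d, not %d", r.logs[0].num, num)
	}
	r.logs = r.logs[1:]
	return nil
}

func main() {
	r := recycler{limit: 2, minNum: 10}
	fmt.Println(r.add(fileInfo{num: 11, size: 4096}))
	if fi, ok := r.peek(); ok {
		// Reuse the file here, then remove it from the queue.
		fmt.Println(r.pop(fi.num))
	}
}
```

Pairing peek with pop lets a caller attempt the reuse first and only dequeue the file once that has succeeded.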

type LogicalLog

type LogicalLog struct {
	Num NumWAL
	// contains filtered or unexported fields
}

A LogicalLog identifies a logical WAL and its constituent segment files.

func (LogicalLog) NumSegments

func (ll LogicalLog) NumSegments() int

NumSegments returns the number of constituent physical log files that make up the log.

func (LogicalLog) OpenForRead

func (ll LogicalLog) OpenForRead() Reader

OpenForRead opens a logical WAL for reading.

func (LogicalLog) PhysicalSize

func (ll LogicalLog) PhysicalSize() (uint64, error)

PhysicalSize stats each of the log's physical files, summing their sizes.

func (LogicalLog) SegmentLocation

func (ll LogicalLog) SegmentLocation(i int) (vfs.FS, string)

SegmentLocation returns the FS and path for the i-th physical segment file.

func (LogicalLog) String

func (ll LogicalLog) String() string

String implements fmt.Stringer.

type Logs

type Logs []LogicalLog

Logs holds a collection of WAL files, in increasing order of NumWAL.

func Scan

func Scan(dirs ...Dir) (Logs, error)

Scan finds all log files in the provided directories. It returns an ordered list of WALs in increasing NumWAL order.

func (Logs) Get

func (l Logs) Get(num NumWAL) (LogicalLog, bool)

Get retrieves the WAL with the given number if present. The second return value indicates whether or not the WAL was found.
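
Because Logs is ordered by NumWAL, a lookup of this shape can be done with a binary search. The sketch below uses local stand-in types (logicalLog, logs); whether the package itself searches this way is not stated here and should be treated as an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// logicalLog and logs are local stand-ins for LogicalLog and Logs.
type logicalLog struct {
	Num uint64
}

type logs []logicalLog

// get finds the entry with the given number, relying on the slice being
// sorted in increasing order of Num.
func (l logs) get(num uint64) (logicalLog, bool) {
	i := sort.Search(len(l), func(i int) bool { return l[i].Num >= num })
	if i < len(l) && l[i].Num == num {
		return l[i], true
	}
	return logicalLog{}, false
}

func main() {
	l := logs{{Num: 3}, {Num: 5}, {Num: 8}}
	fmt.Println(l.get(5))
	fmt.Println(l.get(6))
}
```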

type Manager

type Manager interface {

	// List returns the virtual WALs in ascending order.
	List() (Logs, error)
	// Obsolete informs the manager that all virtual WALs less than
	// minUnflushedNum are obsolete. The callee can choose to recycle some
	// underlying log files, if !noRecycle. The log files that are not recycled,
	// and therefore can be deleted, are returned. The deletable files are no
	// longer tracked by the manager.
	Obsolete(minUnflushedNum NumWAL, noRecycle bool) (toDelete []DeletableLog, err error)
	// Create creates a new virtual WAL.
	//
	// NumWALs passed to successive Create calls must be monotonically
	// increasing, and be greater than any NumWAL seen earlier. The caller must
	// close the previous Writer before calling Create.
	//
	// jobID is used for the WALEventListener.
	Create(wn NumWAL, jobID int) (Writer, error)
	// ElevateWriteStallThresholdForFailover returns true if the caller should
	// use a high write stall threshold because the WALs are being written to
	// the secondary dir.
	ElevateWriteStallThresholdForFailover() bool
	// Stats returns the latest Stats.
	Stats() Stats
	// Close the manager.
	// REQUIRES: Writers and Readers have already been closed.
	Close() error

	// RecyclerForTesting exposes the internal LogRecycler.
	RecyclerForTesting() *LogRecycler
	// contains filtered or unexported methods
}

Manager handles all WAL work.

  • Obsolete can be called concurrently with WAL writing.
  • WAL writing: Is done via Create, and the various Writer methods. These are required to be serialized via external synchronization (specifically, the caller does it via commitPipeline.mu).
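
The contract of Obsolete (everything below minUnflushedNum is either recycled or handed back for deletion) can be sketched as a simple partition. The walFile type, the obsolete function and the recycle callback are hypothetical names for this example, and real implementations also have to handle locking and multi-file WALs.

```go
package main

import "fmt"

// walFile is a local stand-in for a tracked log file.
type walFile struct {
	Num  uint64
	Path string
}

// obsolete splits files into those still needed and those safe to delete.
// recycle is a hypothetical callback that returns true when it keeps a file
// for reuse; such files are neither live nor deletable.
func obsolete(
	files []walFile, minUnflushed uint64, noRecycle bool, recycle func(walFile) bool,
) (live, toDelete []walFile) {
	for _, f := range files {
		switch {
		case f.Num >= minUnflushed:
			live = append(live, f)
		case !noRecycle && recycle(f):
			// Retained by the recycler, so the caller must not delete it.
		default:
			toDelete = append(toDelete, f)
		}
	}
	return live, toDelete
}

func main() {
	files := []walFile{{3, "000003.log"}, {4, "000004.log"}, {5, "000005.log"}}
	keepFirst := func(f walFile) bool { return f.Num == 3 }
	live, del := obsolete(files, 5, false, keepFirst)
	fmt.Println("live:", live)
	fmt.Println("delete:", del)
}
```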

func Init

func Init(o Options, initial Logs) (Manager, error)

Init constructs and initializes a WAL manager from the provided options and the set of initial logs.

type NumWAL

type NumWAL base.DiskFileNum

NumWAL is the number of the virtual WAL. It can map to one or more physical log files. In standalone mode, it will map to exactly one log file. In failover mode, it can map to many log files, which are totally ordered (using a dense logNameIndex).

In general, WAL refers to the virtual WAL, and file refers to a log file. The Pebble MANIFEST only knows about virtual WALs and assigns numbers to them. Additional mapping to one or more files happens in this package. If a WAL maps to multiple files, the source of truth regarding that mapping is the contents of the directories.

func (NumWAL) String

func (s NumWAL) String() string

String implements fmt.Stringer.

type Offset

type Offset struct {
	// PhysicalFile is the path to the physical file containing a particular
	// record.
	PhysicalFile string
	// Physical indicates the file offset at which a record begins within
	// the physical file named by PhysicalFile.
	Physical int64
	// PreviousFilesBytes is the bytes read from all the previous physical
	// segment files that have been read up to the current log segment. If WAL
	// failover is not in use, PreviousFilesBytes will always be zero. Otherwise,
	// it may be non-zero when replaying records from multiple segment files
	// that make up a single logical WAL.
	PreviousFilesBytes int64
}

Offset indicates the offset or position of a record within a WAL.

func (Offset) String

func (o Offset) String() string

String implements fmt.Stringer, returning a string representation of the offset.

type Options

type Options struct {
	// Primary dir for storing WAL files. It must already be created and synced
	// up to the root.
	Primary Dir
	// Secondary is used for failover. Optional. It must already be created and
	// synced up to the root.
	Secondary Dir

	// MinUnflushedWALNum is the smallest WAL number corresponding to
	// mutations that have not been flushed to a sstable.
	MinUnflushedWALNum NumWAL

	// MaxNumRecyclableLogs is the maximum number of log files to maintain for
	// recycling.
	MaxNumRecyclableLogs int

	// NoSyncOnClose is documented in SyncingFileOptions.
	NoSyncOnClose bool
	// BytesPerSync is documented in SyncingFileOptions.
	BytesPerSync int
	// PreallocateSize is documented in SyncingFileOptions.
	PreallocateSize func() int

	// MinSyncInterval is documented in Options.WALMinSyncInterval.
	MinSyncInterval func() time.Duration
	// FsyncLatency records fsync latency. This doesn't differentiate between
	// fsyncs on the primary and secondary dir.
	//
	// TODO(sumeer): consider separating out into two histograms.
	FsyncLatency prometheus.Histogram
	// QueueSemChan is the channel to pop from when popping from queued records
	// that have requested a sync. Its original purpose was to function as a
	// semaphore that prevents the record.LogWriter.flusher.syncQueue from
	// overflowing (which will cause a panic). It is still useful in that role
	// when the WALManager is configured in standalone mode. In failover mode
	// there is no syncQueue, so the pushback into the commit pipeline is
	// unnecessary, but possibly harmless.
	QueueSemChan chan struct{}

	// Logger for logging.
	Logger base.Logger

	// EventListener is called on events, like log file creation.
	EventListener EventListener

	FailoverOptions
	// FailoverWriteAndSyncLatency is only populated when WAL failover is
	// configured.
	FailoverWriteAndSyncLatency prometheus.Histogram
}

Options provides configuration for the Manager.

func (*Options) Dirs

func (o *Options) Dirs() []Dir

Dirs returns the primary Dir and the secondary if provided.

type Reader

type Reader interface {
	// NextRecord returns a reader for the next record. It returns io.EOF if there
	// are no more records. The reader returned becomes stale after the next NextRecord
	// call, and should no longer be used.
	NextRecord() (io.Reader, Offset, error)
	// Close the reader.
	Close() error
}

Reader reads a virtual WAL.
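
A typical consumer loops over NextRecord until io.EOF and fully consumes each record before asking for the next one, since the returned io.Reader becomes stale. The sketch below shows that loop against local stand-ins: the offset and reader types mirror the shapes above, and sliceReader is a hypothetical in-memory implementation used only to make the example runnable.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// offset and reader are local stand-ins mirroring Offset and Reader.
type offset struct {
	PhysicalFile string
	Physical     int64
}

type reader interface {
	NextRecord() (io.Reader, offset, error)
	Close() error
}

// sliceReader is a hypothetical in-memory reader. Its offsets are fake: it
// reports the record's position in the slice, not a byte offset.
type sliceReader struct {
	records []string
	pos     int
}

func (s *sliceReader) NextRecord() (io.Reader, offset, error) {
	if s.pos >= len(s.records) {
		return nil, offset{}, io.EOF
	}
	rec := s.records[s.pos]
	off := offset{PhysicalFile: "000001.log", Physical: int64(s.pos)}
	s.pos++
	return strings.NewReader(rec), off, nil
}

func (s *sliceReader) Close() error { return nil }

// readAll drains r, copying each record before requesting the next one.
func readAll(r reader) ([][]byte, error) {
	defer r.Close()
	var out [][]byte
	for {
		rec, _, err := r.NextRecord()
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		b, err := io.ReadAll(rec)
		if err != nil {
			return out, err
		}
		out = append(out, b)
	}
}

func main() {
	recs, err := readAll(&sliceReader{records: []string{"put a=1", "del b"}})
	fmt.Println(len(recs), err)
}
```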

type RefCount

type RefCount interface {
	// Ref increments the reference count.
	Ref()
	// Unref decrements the reference count.
	Unref()
}

RefCount is a reference count associated with a record passed to [Writer.WriteRecord]. See the comment on WriteRecord.
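
A caller that wants to reuse a record buffer could satisfy this interface with an atomic counter that runs a callback when the last reference is dropped. The countedBuf type below is a hypothetical sketch; the convention that the caller starts with one reference of its own is an assumption of this example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// countedBuf is a hypothetical RefCount implementation. It starts with one
// reference owned by the caller and calls release when the count hits zero.
type countedBuf struct {
	refs    int32
	release func()
}

func newCountedBuf(release func()) *countedBuf {
	return &countedBuf{refs: 1, release: release}
}

// Ref increments the reference count.
func (b *countedBuf) Ref() { atomic.AddInt32(&b.refs, 1) }

// Unref decrements the reference count and releases the buffer at zero.
func (b *countedBuf) Unref() {
	n := atomic.AddInt32(&b.refs, -1)
	if n < 0 {
		panic("countedBuf: Unref without matching Ref")
	}
	if n == 0 {
		b.release()
	}
}

func main() {
	b := newCountedBuf(func() { fmt.Println("buffer can be reused") })
	b.Ref()   // the writer retains the record
	b.Unref() // the caller is done; the writer still holds a reference
	b.Unref() // the writer is done; release runs here
}
```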

type StandaloneManager

type StandaloneManager struct {
	// contains filtered or unexported fields
}

StandaloneManager implements Manager with a single log file per WAL (no failover capability).

func (*StandaloneManager) Close

func (m *StandaloneManager) Close() error

Close implements Manager.

func (*StandaloneManager) Create

func (m *StandaloneManager) Create(wn NumWAL, jobID int) (Writer, error)

Create implements Manager.

func (*StandaloneManager) ElevateWriteStallThresholdForFailover

func (m *StandaloneManager) ElevateWriteStallThresholdForFailover() bool

ElevateWriteStallThresholdForFailover implements Manager.

func (*StandaloneManager) List

func (m *StandaloneManager) List() (Logs, error)

List implements Manager.

func (*StandaloneManager) Obsolete

func (m *StandaloneManager) Obsolete(
	minUnflushedNum NumWAL, noRecycle bool,
) (toDelete []DeletableLog, err error)

Obsolete implements Manager.

func (*StandaloneManager) RecyclerForTesting

func (m *StandaloneManager) RecyclerForTesting() *LogRecycler

RecyclerForTesting implements Manager.

func (*StandaloneManager) Stats

func (m *StandaloneManager) Stats() Stats

Stats implements Manager.

type Stats

type Stats struct {
	// ObsoleteFileCount is the number of obsolete log files.
	ObsoleteFileCount int
	// ObsoleteFileSize is the total size of obsolete log files.
	ObsoleteFileSize uint64
	// LiveFileCount is the number of live log files.
	LiveFileCount int
	// LiveFileSize is the total size of live log files. This can be higher than
	// LiveSize due to log recycling (a live log file may be larger than the
	// size used in its latest incarnation), or failover (resulting in multiple
	// log files containing the same records).
	//
	// This is updated only when log files are closed, to minimize
	// synchronization.
	LiveFileSize uint64
	// Failover contains failover stats.
	Failover FailoverStats
}

Stats exposes stats used in Pebble metrics.

NB: Metrics.WAL.{Size,BytesIn,BytesWritten} are not maintained by the wal package.

TODO(sumeer): with failover, Metrics.WAL.BytesWritten needs to be maintained here.

type SyncOptions

type SyncOptions struct {
	Done *sync.WaitGroup
	Err  *error
}

SyncOptions has non-nil Done and Err when fsync is requested, else both are nil.
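
The wait-for-durability handshake implied by these two fields can be sketched as follows. The syncOptions type mirrors the struct above, while fakeWrite and writeAndWait are hypothetical helpers; in particular, the assumption that the caller calls Add on the wait group before issuing the write is made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// syncOptions is a local stand-in mirroring SyncOptions.
type syncOptions struct {
	Done *sync.WaitGroup
	Err  *error
}

// fakeWrite is a hypothetical writer. When a sync is requested it reports
// the outcome asynchronously through opts.Err and then signals opts.Done.
func fakeWrite(p []byte, opts syncOptions, fail bool) {
	if opts.Done == nil {
		return // no sync requested, nothing to signal
	}
	go func() {
		if fail {
			*opts.Err = errors.New("simulated fsync failure")
		}
		opts.Done.Done()
	}()
}

// writeAndWait requests a sync and blocks until the writer signals it.
func writeAndWait(p []byte, fail bool) error {
	var wg sync.WaitGroup
	var syncErr error
	wg.Add(1) // assumed to be the caller's responsibility
	fakeWrite(p, syncOptions{Done: &wg, Err: &syncErr}, fail)
	wg.Wait()
	// Reading syncErr is safe here: Done happens before Wait returns.
	return syncErr
}

func main() {
	fmt.Println(writeAndWait([]byte("record"), false))
	fmt.Println(writeAndWait([]byte("record"), true))
}
```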

type Writer

type Writer interface {
	// WriteRecord writes a complete record. The record is asynchronously
	// persisted to the underlying writer. If SyncOptions.Done != nil, the wait
	// group will be notified when durability is guaranteed or an error has
	// occurred (set in SyncOptions.Err). External synchronisation provided by
	// commitPipeline.mu guarantees that WriteRecord calls are serialized.
	//
	// The logicalOffset is the logical size of the WAL after this record is
	// written. If the WAL corresponds to a single log file, this is the offset
	// in that log file.
	//
	// Some Writer implementations may continue to read p after WriteRecord
	// returns. This is an obstacle to reusing p's memory. If the caller would
	// like to reuse p's memory, the caller may pass a non-nil [RefCount]. If
	// the Writer will retain p, it will invoke [RefCount.Ref] before returning.
	// When it's finished, it will invoke [RefCount.Unref] to release its
	// reference.
	WriteRecord(p []byte, opts SyncOptions, ref RefCount) (logicalOffset int64, err error)
	// Close the writer.
	Close() (logicalOffset int64, err error)
	// Metrics must be called after Close. The callee will no longer modify the
	// returned LogWriterMetrics.
	Metrics() record.LogWriterMetrics
}

Writer writes to a virtual WAL. A Writer in standalone mode maps to a single record.LogWriter. In failover mode, it can fail over across multiple physical log files.
