wal

package
v0.39.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 9, 2022 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB

	WblDirName = "wbl"
)

Variables

This section is empty.

Functions

func DeleteCheckpoints

func DeleteCheckpoints(dir string, maxIndex int) error

DeleteCheckpoints deletes all checkpoints in a directory below a given index.

func LastCheckpoint

func LastCheckpoint(dir string) (string, int, error)

LastCheckpoint returns the directory name and index of the most recent checkpoint. If dir does not contain any checkpoints, ErrNotFound is returned.

func NewSegmentBufReader

func NewSegmentBufReader(segs ...*Segment) *segmentBufReader

nolint:revive // TODO: Consider exporting segmentBufReader

func NewSegmentBufReaderWithOffset

func NewSegmentBufReaderWithOffset(offset int, segs ...*Segment) (sbr *segmentBufReader, err error)

nolint:revive

func NewSegmentsRangeReader

func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error)

NewSegmentsRangeReader returns a new reader over the given WAL segment ranges. If first or last are -1, the range is open on the respective end.

func NewSegmentsReader

func NewSegmentsReader(dir string) (io.ReadCloser, error)

NewSegmentsReader returns a new reader over all segments in the directory.

func SegmentName

func SegmentName(dir string, i int) string

SegmentName builds a segment name for the directory.

func Segments

func Segments(walDir string) (first, last int, err error)

Segments returns the range [first, n] of currently existing segments. If no segments are found, first and n are -1.

Types

type CheckpointStats

type CheckpointStats struct {
	DroppedSeries     int
	DroppedSamples    int
	DroppedTombstones int
	DroppedExemplars  int
	DroppedMetadata   int
	TotalSeries       int // Processed series including dropped ones.
	TotalSamples      int // Processed samples including dropped ones.
	TotalTombstones   int // Processed tombstones including dropped ones.
	TotalExemplars    int // Processed exemplars including dropped ones.
	TotalMetadata     int // Processed metadata including dropped ones.
}

CheckpointStats returns stats about a created checkpoint.

func Checkpoint

func Checkpoint(logger log.Logger, w *WAL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error)

Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL. It includes the most recent checkpoint if it exists. All series not satisfying keep, samples/tombstones/exemplars below mint and metadata that are not the latest are dropped.

The checkpoint is stored in a directory named checkpoint.N in the same segmented format as the original WAL itself. This makes it easy to read it through the WAL package and concatenate it with the original WAL.

type CorruptionErr

type CorruptionErr struct {
	Dir     string
	Segment int
	Offset  int64
	Err     error
}

CorruptionErr is an error that's returned when corruption is encountered.

func (*CorruptionErr) Error

func (e *CorruptionErr) Error() string

type LiveReader

type LiveReader struct {
	// contains filtered or unexported fields
}

LiveReader reads WAL records from an io.Reader. It allows reading of WALs that are still in the process of being written, and returns records as soon as they can be read.

func NewLiveReader

func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader

NewLiveReader returns a new live reader.

func (*LiveReader) Err

func (r *LiveReader) Err() error

Err returns any errors encountered reading the WAL. io.EOFs are not terminal and Next can be tried again. Non-EOFs are terminal, and the reader should not be used again. It is up to the user to decide when to stop trying should io.EOF be returned.

func (*LiveReader) Next

func (r *LiveReader) Next() bool

Next returns true if Record() will contain a full record. If Next returns false, you should always checked the contents of Error(). Return false guarantees there are no more records if the segment is closed and not corrupt, otherwise if Err() == io.EOF you should try again when more data has been written.

func (*LiveReader) Offset

func (r *LiveReader) Offset() int64

Offset returns the number of bytes consumed from this segment.

func (*LiveReader) Record

func (r *LiveReader) Record() []byte

Record returns the current record. The returned byte slice is only valid until the next call to Next.

type LiveReaderMetrics

type LiveReaderMetrics struct {
	// contains filtered or unexported fields
}

LiveReaderMetrics holds all metrics exposed by the LiveReader.

func NewLiveReaderMetrics

func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics

NewLiveReaderMetrics instantiates, registers and returns metrics to be injected at LiveReader instantiation.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads WAL records from an io.Reader.

func NewReader

func NewReader(r io.Reader) *Reader

NewReader returns a new reader.

func (*Reader) Err

func (r *Reader) Err() error

Err returns the last encountered error wrapped in a corruption error. If the reader does not allow to infer a segment index and offset, a total offset in the reader stream will be provided.

func (*Reader) Next

func (r *Reader) Next() bool

Next advances the reader to the next records and returns true if it exists. It must not be called again after it returned false.

func (*Reader) Offset

func (r *Reader) Offset() int64

Offset returns the current position of the segment being read.

func (*Reader) Record

func (r *Reader) Record() []byte

Record returns the current record. The returned byte slice is only valid until the next call to Next.

func (*Reader) Segment

func (r *Reader) Segment() int

Segment returns the current segment being read.

type Segment

type Segment struct {
	SegmentFile
	// contains filtered or unexported fields
}

Segment represents a segment file.

func CreateSegment

func CreateSegment(dir string, k int) (*Segment, error)

CreateSegment creates a new segment k in dir.

func OpenReadSegment

func OpenReadSegment(fn string) (*Segment, error)

OpenReadSegment opens the segment with the given filename.

func OpenWriteSegment

func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error)

OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.

func (*Segment) Dir

func (s *Segment) Dir() string

Dir returns the directory of the segment.

func (*Segment) Index

func (s *Segment) Index() int

Index returns the index of the segment.

type SegmentFile

type SegmentFile interface {
	Stat() (os.FileInfo, error)
	Sync() error
	io.Writer
	io.Reader
	io.Closer
}

SegmentFile represents the underlying file used to store a segment.

type SegmentRange

type SegmentRange struct {
	Dir         string
	First, Last int
}

SegmentRange groups segments by the directory and the first and last index it includes.

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL is a write ahead log that stores records in segment files. It must be read from start to end once before logging new data. If an error occurs during read, the repair procedure must be called before it's safe to do further writes.

Segments are written to in pages of 32KB, with records possibly split across page boundaries. Records are never split across segments to allow full segments to be safely truncated. It also ensures that torn writes never corrupt records beyond the most recent segment.

func New

func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error)

New returns a new WAL over the given directory.

func NewSize

func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error)

NewSize returns a new WAL over the given directory. New segments are created with the specified size.

func Open

func Open(logger log.Logger, dir string) (*WAL, error)

Open an existing WAL.

func (*WAL) Close

func (w *WAL) Close() (err error)

Close flushes all writes and closes active segment.

func (*WAL) CompressionEnabled

func (w *WAL) CompressionEnabled() bool

CompressionEnabled returns if compression is enabled on this WAL.

func (*WAL) Dir

func (w *WAL) Dir() string

Dir returns the directory of the WAL.

func (*WAL) LastSegmentAndOffset

func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error)

LastSegmentAndOffset returns the last segment number of the WAL and the offset in that file upto which the segment has been filled.

func (*WAL) Log

func (w *WAL) Log(recs ...[]byte) error

Log writes the records into the log. Multiple records can be passed at once to reduce writes and increase throughput.

func (*WAL) NextSegment

func (w *WAL) NextSegment() (int, error)

NextSegment creates the next segment and closes the previous one asynchronously. It returns the file number of the new file.

func (*WAL) NextSegmentSync added in v0.39.0

func (w *WAL) NextSegmentSync() (int, error)

NextSegmentSync creates the next segment and closes the previous one in sync. It returns the file number of the new file.

func (*WAL) Repair

func (w *WAL) Repair(origErr error) error

Repair attempts to repair the WAL based on the error. It discards all data after the corruption.

func (*WAL) Size

func (w *WAL) Size() (int64, error)

Computing size of the WAL. We do this by adding the sizes of all the files under the WAL dir.

func (*WAL) Sync added in v0.39.0

func (w *WAL) Sync() error

Sync forces a file sync on the current wal segment. This function is meant to be used only on tests due to different behaviour on Operating Systems like windows and linux

func (*WAL) Truncate

func (w *WAL) Truncate(i int) (err error)

Truncate drops all segments before i.

type Watcher

type Watcher struct {

	// For testing, stop when we hit this segment.
	MaxSegment int
	// contains filtered or unexported fields
}

Watcher watches the TSDB WAL for a given WriteTo.

func NewWatcher

func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars bool) *Watcher

NewWatcher creates a new WAL watcher for a given WriteTo.

func (*Watcher) Run

func (w *Watcher) Run() error

Run the watcher, which will tail the WAL until the quit channel is closed or an error case is hit.

func (*Watcher) SetStartTime

func (w *Watcher) SetStartTime(t time.Time)

func (*Watcher) Start

func (w *Watcher) Start()

Start the Watcher.

func (*Watcher) Stop

func (w *Watcher) Stop()

Stop the Watcher.

type WatcherMetrics

type WatcherMetrics struct {
	// contains filtered or unexported fields
}

func NewWatcherMetrics

func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics

type WriteTo

type WriteTo interface {
	// Append and AppendExemplar should block until the samples are fully accepted,
	// whether enqueued in memory or successfully written to it's final destination.
	// Once returned, the WAL Watcher will not attempt to pass that data again.
	Append([]record.RefSample) bool
	AppendExemplars([]record.RefExemplar) bool
	StoreSeries([]record.RefSeries, int)

	// Next two methods are intended for garbage-collection: first we call
	// UpdateSeriesSegment on all current series
	UpdateSeriesSegment([]record.RefSeries, int)
	// Then SeriesReset is called to allow the deletion
	// of all series created in a segment lower than the argument.
	SeriesReset(int)
}

WriteTo is an interface used by the Watcher to send the samples it's read from the WAL on to somewhere else. Functions will be called concurrently and it is left to the implementer to make sure they are safe.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL