wal package
v3.3.1
Published: Dec 4, 2024 License: AGPL-3.0 Imports: 22 Imported by: 1

README

Loki New Object Storage WAL

Principles

  • The WAL can be streamed to a file or remote object storage.
  • When building WAL segments in the ingester, prioritize colocation first by tenant and then by series. This allows efficient reading during compaction and querying.
  • At compaction, chunks from the WAL should be reusable and writable to the new block format without decompression.

We aim for WAL segments of at least 8MB, preferably larger. At a cluster write rate of 32MB/s, 4 ingesters would suffice, halving the current ingester requirement.

Overview

Multitenancy is achieved by storing the tenant as a label __0_tenant_id__ in the index to ensure sorting by tenant first. This label is not exposed to users and is removed during compaction.

┌──────────────────────────────┐
│    Magic Header ("LOKW")     │
│           (4 bytes)          │
├──────────────────────────────┤
│ ┌──────────────────────────┐ │
│ │         Chunk 1          │ │
│ ├──────────────────────────┤ │
│ │          ...             │ │
│ ├──────────────────────────┤ │
│ │         Chunk N          │ │
│ └──────────────────────────┘ │
├──────────────────────────────┤
│           Index              │
├──────────────────────────────┤
│       Index Len (4b)         │
├──────────────────────────────┤
│    Version (1 byte)          │
├──────────────────────────────┤
│    Magic Footer ("LOKW")     │
│           (4 bytes)          │
└──────────────────────────────┘
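A minimal sketch of how a reader could locate the index by walking backwards from the footer, per the layout above. The helper names and the big-endian encoding of the index length are assumptions for illustration; the real package exposes this through NewReader:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const magic = "LOKW"

// buildSegment assembles a toy segment following the layout above:
// magic header, chunk bytes, index, index length, version, magic footer.
func buildSegment(chunks, index []byte) []byte {
	buf := []byte(magic)
	buf = append(buf, chunks...)
	buf = append(buf, index...)
	buf = binary.BigEndian.AppendUint32(buf, uint32(len(index)))
	buf = append(buf, 1) // version
	buf = append(buf, magic...)
	return buf
}

// parseIndex seeks backwards from the end of the segment: the last 4 bytes
// are the magic footer, preceded by the version byte and the index length.
func parseIndex(seg []byte) (version byte, index []byte, err error) {
	n := len(seg)
	if string(seg[n-4:]) != magic {
		return 0, nil, fmt.Errorf("bad magic footer")
	}
	version = seg[n-5]
	idxLen := binary.BigEndian.Uint32(seg[n-9 : n-5])
	index = seg[n-9-int(idxLen) : n-9]
	return version, index, nil
}

func main() {
	seg := buildSegment([]byte("chunk-bytes"), []byte("index-bytes"))
	v, idx, err := parseIndex(seg)
	fmt.Println(v, string(idx), err)
}
```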

Index

The index format is designed to enable efficient seeking to specific chunks required for recent queries. Inspired by the Prometheus tsdb index, it has some key differences, particularly in the chunk reference within the Series tables. This reference contains sufficient information to seek directly to the chunk in the WAL (Write-Ahead Log).

┌────────────────────────────────────────────────────────────────────────────┐
│ len <uvarint>                                                              │
├────────────────────────────────────────────────────────────────────────────┤
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │                     labels count <uvarint64>                           │ │
│ ├────────────────────────────────────────────────────────────────────────┤ │
│ │              ┌────────────────────────────────────────────────┐        │ │
│ │              │ ref(l_i.name) <uvarint32>                      │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ ref(l_i.value) <uvarint32>                     │        │ │
│ │              └────────────────────────────────────────────────┘        │ │
│ │                             ...                                        │ │
│ ├────────────────────────────────────────────────────────────────────────┤ │
│ │                     chunks count <uvarint64>                           │ │
│ ├────────────────────────────────────────────────────────────────────────┤ │
│ │              ┌────────────────────────────────────────────────┐        │ │
│ │              │ c_0.mint <varint64>                            │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ c_0.maxt - c_0.mint <uvarint64>                │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ ref(c_0.data) <uvarint64>                      │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ c_0.entries <uvarint32>                        │        │ │
│ │              └────────────────────────────────────────────────┘        │ │
│ │              ┌────────────────────────────────────────────────┐        │ │
│ │              │ c_i.mint - c_i-1.maxt <uvarint64>              │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ c_i.maxt - c_i.mint <uvarint64>                │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ ref(c_i.data) - ref(c_i-1.data) <varint64>     │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ c_i.entries <uvarint32>                        │        │ │
│ │              └────────────────────────────────────────────────┘        │ │
│ │                             ...                                        │ │
│ ├────────────────────────────────────────────────────────────────────────┤ │
│ │              ┌────────────────────────────────────────────────┐        │ │
│ │              │ last_chunk.mint - prev_chunk.maxt <uvarint64>  │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ last_chunk.maxt - last_chunk.mint <uvarint64>  │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ ref(last_chunk.data) - ref(prev_chunk.data)    │        │ │
│ │              │ <varint64>                                     │        │ │
│ │              ├────────────────────────────────────────────────┤        │ │
│ │              │ last_chunk.entries <uvarint32>                 │        │ │
│ │              └────────────────────────────────────────────────┘        │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
├────────────────────────────────────────────────────────────────────────────┤
│ CRC32 <4b>                                                                 │
└────────────────────────────────────────────────────────────────────────────┘

Note: data_len for all entries except the last one is inferred from the offset of the next entry.

Explanation
  • len : The length of the series entry.
  • labels count : The number of labels in the series.
  • ref(l_i.name) : Reference to the label name in the symbol table.
  • ref(l_i.value) : Reference to the label value in the symbol table.
  • chunks count : The number of chunks in the series.
  • c_0.mint : Minimum timestamp of the first chunk.
  • c_0.maxt - c_0.mint : Time delta between the minimum and maximum timestamp of the first chunk.
  • ref(c_0.data) : Reference to the chunk data.
  • c_0.entries : Number of entries in the chunk.
  • c_i.mint - c_i-1.maxt : Time delta between the minimum timestamp of the current chunk and the maximum timestamp of the previous chunk.
  • c_i.maxt - c_i.mint : Time delta between the minimum and maximum timestamp of the current chunk.
  • ref(c_i.data) - ref(c_i-1.data) : Delta between the current chunk reference and the previous chunk reference.
  • c_i.entries : Number of entries in the chunk.
  • CRC32 <4b>: CRC32 checksum of the series entry.
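The delta scheme above can be sketched end-to-end with Go's encoding/binary varint helpers. The chunkMeta type and function names are hypothetical, for illustration only; the real series entry also carries the label references and the trailing CRC32:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

type chunkMeta struct {
	mint, maxt, ref int64
	entries         uint32
}

// encodeChunks writes chunk metadata as deltas, as in the series entry above:
// the first chunk stores absolute values, later chunks store gaps and deltas.
func encodeChunks(cs []chunkMeta) []byte {
	var b []byte
	b = binary.AppendUvarint(b, uint64(len(cs))) // chunks count
	for i, c := range cs {
		if i == 0 {
			b = binary.AppendVarint(b, c.mint)                 // c_0.mint
			b = binary.AppendUvarint(b, uint64(c.maxt-c.mint)) // time span
			b = binary.AppendUvarint(b, uint64(c.ref))         // absolute ref
		} else {
			prev := cs[i-1]
			b = binary.AppendUvarint(b, uint64(c.mint-prev.maxt)) // gap to previous chunk
			b = binary.AppendUvarint(b, uint64(c.maxt-c.mint))    // time span
			b = binary.AppendVarint(b, c.ref-prev.ref)            // ref delta (signed)
		}
		b = binary.AppendUvarint(b, uint64(c.entries))
	}
	return b
}

// decodeChunks reverses the deltas back into absolute values.
func decodeChunks(b []byte) []chunkMeta {
	count, n := binary.Uvarint(b)
	b = b[n:]
	out := make([]chunkMeta, 0, count)
	var prev chunkMeta
	for i := 0; i < int(count); i++ {
		var c chunkMeta
		if i == 0 {
			v, n := binary.Varint(b)
			b = b[n:]
			c.mint = v
			span, n := binary.Uvarint(b)
			b = b[n:]
			c.maxt = c.mint + int64(span)
			ref, n := binary.Uvarint(b)
			b = b[n:]
			c.ref = int64(ref)
		} else {
			gap, n := binary.Uvarint(b)
			b = b[n:]
			c.mint = prev.maxt + int64(gap)
			span, n := binary.Uvarint(b)
			b = b[n:]
			c.maxt = c.mint + int64(span)
			d, n := binary.Varint(b)
			b = b[n:]
			c.ref = prev.ref + d
		}
		e, n := binary.Uvarint(b)
		b = b[n:]
		c.entries = uint32(e)
		out = append(out, c)
		prev = c
	}
	return out
}

func main() {
	in := []chunkMeta{{mint: 1000, maxt: 2000, ref: 9, entries: 3}, {mint: 2500, maxt: 4000, ref: 120, entries: 7}}
	fmt.Println(decodeChunks(encodeChunks(in)))
}
```

Storing gaps and spans as unsigned varints keeps entries small because chunks within a series are time-ordered, while the ref delta stays signed in case chunk data is not laid out monotonically.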

Chunks

Chunk Format Overview

The chunk format is structured to efficiently store and retrieve log data. It starts with a byte that indicates the encoding used for the raw logs, followed by a sequence of double-delta encoded timestamps and the lengths of each log line. Finally, it includes the raw compressed logs and a CRC32 checksum for the metadata.

Key Components of the Chunk Format
  1. Initial Byte:

    • Indicates the compression format used for the raw logs (we'll start with 1 for snappy).
  2. Timestamps and Lengths:

    • A sequence of double-delta encoded timestamps.
    • Lengths of each log line.
  3. Raw Compressed Logs:

    • The actual log data, compressed for efficiency.
  4. CRC32 Checksum:

    • Ensures the integrity of the metadata.

Unlike the current Loki chunk format, this format does not use smaller blocks because WAL (Write-Ahead Log) segments are typically created within seconds.

Structure of a Chunk
┌──────────────────────────────────────────────────────────────────────────┐
│ encoding (1 byte)                                                        │
├──────────────────────────────────────────────────────────────────────────┤
│ #entries <uvarint>                                                       │
├──────────────────────────────────────────────────────────────────────────┤
│ ts_0 <uvarint>                                                           │
├──────────────────────────────────────────────────────────────────────────┤
│ len_line_0 <uvarint>                                                     │
├──────────────────────────────────────────────────────────────────────────┤
│ ts_1_delta <uvarint>                                                     │
├──────────────────────────────────────────────────────────────────────────┤
│ len_line_1 <uvarint>                                                     │
├──────────────────────────────────────────────────────────────────────────┤
│ ts_2_dod <varint>                                                        │
├──────────────────────────────────────────────────────────────────────────┤
│ len_line_2 <uvarint>                                                     │
├──────────────────────────────────────────────────────────────────────────┤
│ ...                                                                      │
├──────────────────────────────────────────────────────────────────────────┤
│ compressed logs <bytes>                                                  │
├──────────────────────────────────────────────────────────────────────────┤
│ compressed logs offset <4b>                                              │
├──────────────────────────────────────────────────────────────────────────┤
│ crc32 <4 bytes>                                                          │
└──────────────────────────────────────────────────────────────────────────┘
Explanation
  • encoding (1 byte): Indicates the compression format used for the raw logs (e.g., 0 for no compression, 1 for snappy).
  • #entries : The number of log entries in the chunk.
  • ts_0 : The initial timestamp, with nanosecond precision.
  • len_line_0 : The length of the first log line.
  • ts_1_delta : The delta from the initial timestamp to the second timestamp.
  • len_line_1 : The length of the second log line.
  • ts_2_dod : The delta of deltas, representing the difference from the previous delta (i.e., double-delta encoding). Can be negative if the spacing between points is decreasing.
  • len_line_2 : The length of the third log line.
  • compressed logs : The actual log data, compressed according to the specified encoding.
  • compressed logs offset <4 bytes>: The offset of the compressed log data.
  • crc32 (4 bytes): CRC32 checksum for the metadata (excluding the compressed data), ensuring the integrity of the timestamp and length information.

The offset to the compressed logs is known from the index, allowing efficient access and decompression. The CRC32 checksum at the end verifies the integrity of the metadata, as the compressed data typically includes its own CRC for verification.

This structure ensures efficient storage and retrieval of log entries, utilizing double-delta encoding for timestamps and compressing the log data to save space. The timestamps are precise to nanoseconds, allowing for high-resolution time tracking.
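A self-contained sketch of the double-delta timestamp scheme described above (log-line lengths omitted for brevity; function names are illustrative): the first timestamp is stored whole, the first delta as an unsigned varint, and each later timestamp as a signed delta-of-deltas:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeTimestamps writes ts_0, the first delta, then deltas-of-deltas.
// Timestamps are assumed to be non-decreasing, so the first two values
// fit in unsigned varints; only the delta-of-deltas can be negative.
func encodeTimestamps(ts []int64) []byte {
	var b []byte
	b = binary.AppendUvarint(b, uint64(ts[0])) // ts_0
	if len(ts) > 1 {
		b = binary.AppendUvarint(b, uint64(ts[1]-ts[0])) // ts_1_delta
	}
	for i := 2; i < len(ts); i++ {
		delta := ts[i] - ts[i-1]
		prevDelta := ts[i-1] - ts[i-2]
		b = binary.AppendVarint(b, delta-prevDelta) // ts_i_dod, may be negative
	}
	return b
}

// decodeTimestamps reconstructs the original timestamps by accumulating
// the running delta.
func decodeTimestamps(b []byte, count int) []int64 {
	out := make([]int64, 0, count)
	t0, n := binary.Uvarint(b)
	b = b[n:]
	out = append(out, int64(t0))
	if count == 1 {
		return out
	}
	d, n := binary.Uvarint(b)
	b = b[n:]
	delta := int64(d)
	out = append(out, out[0]+delta)
	for i := 2; i < count; i++ {
		dod, n := binary.Varint(b)
		b = b[n:]
		delta += dod
		out = append(out, out[i-1]+delta)
	}
	return out
}

func main() {
	in := []int64{1000, 1010, 1018, 1030}
	fmt.Println(decodeTimestamps(encodeTimestamps(in), len(in)))
}
```

Because log entries within a chunk usually arrive at a roughly steady rate, the delta-of-deltas is near zero and typically encodes into a single byte.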

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrClosed is returned when the WAL is closed. It is a permanent error
	// as once closed, a WAL cannot be re-opened.
	ErrClosed = errors.New("WAL is closed")

	// ErrFull is returned when the WAL is full. It is a transient error that
	// happens when all segments are either in the pending list waiting to be
	// flushed or in the process of being flushed.
	ErrFull = errors.New("WAL is full")
)
var (
	Dir = "loki-v2/wal/anon/"
)

LOKW is the magic number for the Loki WAL format.

Functions

func ReportSegmentStats

func ReportSegmentStats(s SegmentStats, m *SegmentMetrics)

ReportSegmentStats reports the stats as metrics.

Types

type AppendRequest

type AppendRequest struct {
	TenantID  string
	Labels    labels.Labels
	LabelsStr string
	Entries   []*logproto.Entry
}

type AppendResult

type AppendResult struct {
	// contains filtered or unexported fields
}

AppendResult contains the result of an AppendRequest.

func (*AppendResult) Done

func (p *AppendResult) Done() <-chan struct{}

Done returns a channel that is closed when the result for the AppendRequest is available. Use Err() to check if the request succeeded or failed.

func (*AppendResult) Err

func (p *AppendResult) Err() error

Err returns a non-nil error if the append request failed, and nil if it succeeded. It should not be called until Done() is closed to avoid data races.

func (*AppendResult) SetDone

func (p *AppendResult) SetDone(err error)

SetDone closes the channel and sets the (optional) error.

type Config

type Config struct {
	// MaxAge is the maximum amount of time a segment can be buffered in memory
	// before it is moved to the pending list to be flushed. Increasing MaxAge
	// allows more time for a segment to grow to MaxSegmentSize, but may
	// increase latency if appends cannot fill segments quickly enough.
	MaxAge time.Duration

	// MaxSegments is the maximum number of segments that can be buffered in
	// memory. Increasing MaxSegments allows more data to be buffered, but may
	// increase latency if the incoming volume of data exceeds the rate at
	// which segments can be flushed.
	MaxSegments int64

	// MaxSegmentSize is the maximum size of an uncompressed segment in bytes.
	// It is not a strict limit, and segments can exceed the maximum size when
	// individual appends are larger than the remaining capacity.
	MaxSegmentSize int64
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager is a pool of in-memory segments. It keeps track of which segments are accepting writes and which are waiting to be flushed using two doubly linked lists called the available and pending lists.

By buffering segments in memory, the WAL can tolerate bursts of writes that arrive faster than can be flushed. The amount of data that can be buffered is configured using MaxSegments and MaxSegmentSize. You must use caution when configuring these to avoid excessive latency.

The WAL is full when all segments are waiting to be flushed or in the process of being flushed. When the WAL is full, subsequent appends fail with the transient error ErrFull, and will not succeed until one or more other segments have been flushed and returned to the available list. Callers should back off and retry at a later time.

On shutdown, the WAL must be closed to avoid losing data. This prevents additional appends to the WAL and allows all remaining segments to be flushed.

func NewManager

func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error)

func (*Manager) Append

func (m *Manager) Append(r AppendRequest) (*AppendResult, error)

func (*Manager) Close

func (m *Manager) Close()

func (*Manager) NextPending

func (m *Manager) NextPending() (*PendingSegment, error)

NextPending returns the next segment to be flushed from the pending list. It returns nil if there are no segments waiting to be flushed. If the WAL is closed it returns all remaining segments from the pending list and then ErrClosed.

func (*Manager) Put

func (m *Manager) Put(s *PendingSegment)

Put resets the segment and puts it back in the available list to accept writes. A PendingSegment should not be put back until it has been flushed.

type ManagerMetrics

type ManagerMetrics struct {
	NumAvailable prometheus.Gauge
	NumFlushing  prometheus.Gauge
	NumPending   prometheus.Gauge
}

func NewManagerMetrics

func NewManagerMetrics(r prometheus.Registerer) *ManagerMetrics

type PendingSegment

type PendingSegment struct {
	Result *AppendResult
	Writer *SegmentWriter
}

PendingSegment contains a result and the segment to be flushed.

type SegmentMetrics

type SegmentMetrics struct {
	// contains filtered or unexported fields
}

func NewSegmentMetrics

func NewSegmentMetrics(r prometheus.Registerer) *SegmentMetrics

type SegmentReader

type SegmentReader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(b []byte) (*SegmentReader, error)

func (*SegmentReader) Series

func (r *SegmentReader) Series(ctx context.Context) (*SeriesIter, error)

func (*SegmentReader) Sizes

func (r *SegmentReader) Sizes() (Sizes, error)

type SegmentStats

type SegmentStats struct {
	// Age is the time between the first append and the flush.
	Age time.Duration
	// Idle is the time between the last append and the flush.
	Idle      time.Duration
	Streams   int
	Tenants   int
	Size      int64
	WriteSize int64
}

SegmentStats contains the stats for a SegmentWriter.

func GetSegmentStats

func GetSegmentStats(w *SegmentWriter, t time.Time) SegmentStats

GetSegmentStats returns the stats for a SegmentWriter. The age of a segment is calculated from t. WriteSize is zero if GetSegmentStats is called before SegmentWriter.WriteTo.

type SegmentWriter

type SegmentWriter struct {
	// contains filtered or unexported fields
}

func NewWalSegmentWriter

func NewWalSegmentWriter() (*SegmentWriter, error)

NewWalSegmentWriter creates a new WalSegmentWriter.

func (*SegmentWriter) Age

func (b *SegmentWriter) Age(now time.Time) time.Duration

Age returns the age of the segment.

func (*SegmentWriter) Append

func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels, entries []*logproto.Entry, now time.Time)

Append adds entries to the segment. Labels are passed both as a pre-rendered string (e.g. `{foo="bar",baz="qux"}`) and as labels.Labels; their names and values (e.g. foo, baz, bar, qux) are stored as symbols in the segment's symbol table.

func (*SegmentWriter) InputSize

func (b *SegmentWriter) InputSize() int64

InputSize returns the total size of the input data written to the writer. It doesn't account for timestamps and labels.

func (*SegmentWriter) Meta

func (*SegmentWriter) Reset

func (b *SegmentWriter) Reset()

Reset clears the writer. After calling Reset, the writer can be reused.

func (*SegmentWriter) WriteTo

func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error)

type SeriesIter

type SeriesIter struct {
	// contains filtered or unexported fields
}

func NewSeriesIter

func NewSeriesIter(ir *index.Reader, ps tsdbindex.Postings, blocks []byte) *SeriesIter

func (*SeriesIter) At

func (iter *SeriesIter) At() labels.Labels

func (*SeriesIter) ChunkReader

func (iter *SeriesIter) ChunkReader(_ *chunks.ChunkReader) (*chunks.ChunkReader, error)

func (*SeriesIter) Err

func (iter *SeriesIter) Err() error

func (*SeriesIter) Next

func (iter *SeriesIter) Next() bool

type Sizes

type Sizes struct {
	Index  int64
	Series []int64
}

Directories

Path Synopsis
Package chunks provides functionality for efficient storage and retrieval of log data and metrics.
