chunkenc

package chunkenc (v3.2.0)

Published: Sep 18, 2024 · License: AGPL-3.0

README

Chunk format

  ---------------------------------
  |                 |             |
  | MagicNumber(4b) | version(1b) |
  |                 |             |
  --------------------------------------------------
  |         block-1 bytes         |  checksum (4b) |
  --------------------------------------------------
  |         block-2 bytes         |  checksum (4b) |
  --------------------------------------------------
  |         block-n bytes         |  checksum (4b) |
  --------------------------------------------------
  |         #blocks (uvarint)                      |
  --------------------------------------------------
  | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
  ------------------------------------------------------------------------------------------------
  | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
  ------------------------------------------------------------------------------------------------
  | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
  ------------------------------------------------------------------------------------------------
  | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) | uncompressedSize (uvarint) |
  ------------------------------------------------------------------------------------------------
  |                      checksum(from #blocks)                     |
  -------------------------------------------------------------------
  | metasOffset - offset to the point with #blocks |
  --------------------------------------------------
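
To make the layout concrete, here is a minimal sketch of decoding just the MagicNumber(4b) and version(1b) header fields, assuming big-endian byte order; the example bytes are illustrative, not the package's actual magic constant.

package main

import (
	"encoding/binary"
	"fmt"
)

// parseHeader decodes the MagicNumber(4b) and version(1b) fields from the
// front of a serialized chunk. Big-endian order is an assumption here.
func parseHeader(b []byte) (magic uint32, version byte, err error) {
	if len(b) < 5 {
		return 0, 0, fmt.Errorf("chunk too short: %d bytes", len(b))
	}
	return binary.BigEndian.Uint32(b[:4]), b[4], nil
}

func main() {
	hdr := []byte{0x01, 0x2e, 0xe5, 0x6a, 0x03} // illustrative bytes only
	magic, version, err := parseHeader(hdr)
	if err != nil {
		panic(err)
	}
	fmt.Printf("magic=%#x version=%d\n", magic, version)
}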

Documentation

Constants

const (
	ChunkFormatV1 byte
	ChunkFormatV2
	ChunkFormatV3
	ChunkFormatV4
)

const GzipLogChunk = chunk.Encoding(128)

GzipLogChunk is a Cortex encoding type for our chunks. Deprecated: the chunk encoding/compression format is stored inside the chunk data.

const LogChunk = chunk.Encoding(129)

LogChunk is a Cortex encoding type for our chunks.

Variables

var (
	ErrChunkFull       = errors.New("chunk full")
	ErrOutOfOrder      = errors.New("entry out of order")
	ErrInvalidSize     = errors.New("invalid size")
	ErrInvalidFlag     = errors.New("invalid flag")
	ErrInvalidChecksum = errors.New("invalid chunk checksum")
)

Errors returned by the chunk interface.

var (
	// Gzip is the gnu zip compression pool
	Gzip     = GzipPool{/* contains filtered or unexported fields */}
	Lz4_64k  = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_64k is the lz4 compression pool, with 64k buffer size
	Lz4_256k = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_256k uses 256k buffer
	Lz4_1M   = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_1M uses 1M buffer
	Lz4_4M   = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_4M uses 4M buffer
	Flate    = FlatePool{}
	Zstd     = ZstdPool{}
	// Snappy is the snappy compression pool
	Snappy SnappyPool
	// Noop is the no compression pool
	Noop NoopPool

	// BytesBufferPool is a pool of byte buffers used for decompressed lines.
	// Buckets [0.5KB,1KB,2KB,4KB,8KB]
	BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })

	// LabelsPool is a matrix of byte buffers used to store label names and values.
	// Buckets [8, 16, 32, 64, 128, 256].
	// Since we store both label names and values, the number of labels we can
	// store is half the bucket size, i.e. from 0 to 128 labels.
	LabelsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) })

	SymbolsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([]symbol, 0, size) })

	// SamplesPool pools arrays of samples, with bucket sizes [512,1024,...,16k]
	SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })

	// EncodeBufferPool is a pool of buffers used for binary encoding.
	EncodeBufferPool = sync.Pool{
		New: func() interface{} {
			return &encbuf{
				b: make([]byte, 0, 256),
			}
		},
	}
)

Functions

func ErrTooFarBehind

func ErrTooFarBehind(entryTs, cutoff time.Time) error

func IsErrTooFarBehind

func IsErrTooFarBehind(err error) bool

func IsOutOfOrderErr

func IsOutOfOrderErr(err error) bool

func NewFacade

func NewFacade(c Chunk, blockSize, targetSize int) chunk.Data

NewFacade makes a new Facade.

func SupportedEncoding

func SupportedEncoding() string

SupportedEncoding returns the list of supported Encodings.

func UncompressedSize

func UncompressedSize(c chunk.Data) (int, bool)

UncompressedSize is a helper function that hides the type-assertion kludge needed to get the uncompressed size of the Cortex encoding.Chunk interface.
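
A minimal sketch combining NewFacade and UncompressedSize, assuming the Loki v3 import path; the block and target sizes are illustrative.

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/chunkenc"
)

func main() {
	// Wrap a Chunk in a Facade so it satisfies the Cortex chunk.Data interface.
	c := chunkenc.NewDumbChunk()
	data := chunkenc.NewFacade(c, 256*1024, 1536*1024) // illustrative sizes

	// UncompressedSize hides the type assertion back to the Loki chunk.
	if size, ok := chunkenc.UncompressedSize(data); ok {
		fmt.Printf("uncompressed size: %d bytes\n", size)
	}
}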

Types

type Block

type Block interface {
	// MinTime is the minimum time of entries in the block
	MinTime() int64
	// MaxTime is the maximum time of entries in the block
	MaxTime() int64
	// Offset is the offset/position of the block in the chunk. Offset is unique for a given block per chunk.
	Offset() int
	// Entries is the number of entries in the block.
	Entries() int
	// Iterator returns an entry iterator for the block.
	Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
	// SampleIterator returns a sample iterator for the block.
	SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}

Block is a chunk block.

type Chunk

type Chunk interface {
	Bounds() (time.Time, time.Time)
	SpaceFor(*logproto.Entry) bool
	// Append returns true if the entry appended was a duplicate
	Append(*logproto.Entry) (bool, error)
	Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
	SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
	// Blocks returns the list of blocks in the chunk.
	Blocks(mintT, maxtT time.Time) []Block
	// Size returns the number of entries in the chunk.
	Size() int
	Bytes() ([]byte, error)
	BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation
	io.WriterTo
	BlockCount() int
	Utilization() float64
	UncompressedSize() int
	CompressedSize() int
	Close() error
	Encoding() Encoding
	Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}

Chunk is the interface for the compressed logs chunk format.

func NewDumbChunk

func NewDumbChunk() Chunk

NewDumbChunk returns a new chunk that isn't very good.
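
A short sketch of the append flow against the Chunk interface, using NewDumbChunk for brevity and assuming the Loki v3 import paths.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/v3/pkg/chunkenc"
	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	var c chunkenc.Chunk = chunkenc.NewDumbChunk()

	entry := &logproto.Entry{Timestamp: time.Now(), Line: "hello world"}

	// Check capacity before appending; Append reports whether the
	// appended entry was a duplicate.
	if c.SpaceFor(entry) {
		dup, err := c.Append(entry)
		if err != nil {
			panic(err)
		}
		fmt.Printf("appended, duplicate=%v, entries=%d\n", dup, c.Size())
	}
}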

type Encoding

type Encoding byte

Encoding is the identifier for a chunk encoding.

const (
	EncNone Encoding = iota
	EncGZIP
	EncDumb
	EncLZ4_64k
	EncSnappy
	EncLZ4_256k
	EncLZ4_1M
	EncLZ4_4M
	EncFlate
	EncZstd
)

The different available encodings. Make sure to preserve the order, as these numeric values are written to the chunks!

func ParseEncoding

func ParseEncoding(enc string) (Encoding, error)

ParseEncoding parses a chunk encoding (compression algorithm) by its name.
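
A quick sketch, assuming encoding names match their String() forms (e.g. "gzip", "snappy"):

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/chunkenc"
)

func main() {
	enc, err := chunkenc.ParseEncoding("snappy")
	if err != nil {
		panic(err)
	}
	fmt.Println(enc == chunkenc.EncSnappy, enc.String()) // true snappy
}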

func (Encoding) String

func (e Encoding) String() string

type Facade

type Facade struct {
	chunk.Data
	// contains filtered or unexported fields
}

Facade wraps a Chunk for compatibility with the Cortex chunk type, so we can use its chunk store.

func (Facade) Bounds

func (f Facade) Bounds() (time.Time, time.Time)

func (Facade) Encoding

func (Facade) Encoding() chunk.Encoding

Encoding implements chunk.Chunk.

func (Facade) Entries

func (f Facade) Entries() int

func (Facade) LokiChunk

func (f Facade) LokiChunk() Chunk

LokiChunk returns the chunkenc.Chunk.

func (Facade) Marshal

func (f Facade) Marshal(w io.Writer) error

Marshal implements chunk.Chunk.

func (Facade) Rebound

func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error)

func (Facade) Size

func (f Facade) Size() int

Size implements encoding.Chunk, which unfortunately uses the Size method to refer to the byte size and not the entry count like chunkenc.Chunk does.
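
To make the distinction concrete, a sketch (assuming NewFacade returns a *Facade; the ok-check keeps it safe either way):

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/chunkenc"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy,
		chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 1536*1024)
	data := chunkenc.NewFacade(c, 256*1024, 1536*1024)

	if f, ok := data.(*chunkenc.Facade); ok {
		fmt.Println(f.Size())             // byte size, per encoding.Chunk
		fmt.Println(f.LokiChunk().Size()) // entry count, per chunkenc.Chunk
	}
}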

func (Facade) UncompressedSize

func (f Facade) UncompressedSize() int

func (*Facade) UnmarshalFromBuf

func (f *Facade) UnmarshalFromBuf(buf []byte) error

UnmarshalFromBuf implements chunk.Chunk.

func (Facade) Utilization

func (f Facade) Utilization() float64

Utilization implements encoding.Chunk.

type FlatePool

type FlatePool struct {
	// contains filtered or unexported fields
}

FlatePool is a flate compression pool

func (*FlatePool) GetReader

func (pool *FlatePool) GetReader(src io.Reader) (io.Reader, error)

GetReader gets or creates a new CompressionReader and resets it to read from src.

func (*FlatePool) GetWriter

func (pool *FlatePool) GetWriter(dst io.Writer) io.WriteCloser

GetWriter gets or creates a new CompressionWriter and resets it to write to dst.

func (*FlatePool) PutReader

func (pool *FlatePool) PutReader(reader io.Reader)

PutReader returns a CompressionReader to the pool.

func (*FlatePool) PutWriter

func (pool *FlatePool) PutWriter(writer io.WriteCloser)

PutWriter returns a CompressionWriter to the pool.

type GzipPool

type GzipPool struct {
	// contains filtered or unexported fields
}

GzipPool is a gzip compression pool.

func (*GzipPool) GetReader

func (pool *GzipPool) GetReader(src io.Reader) (io.Reader, error)

GetReader gets or creates a new CompressionReader and resets it to read from src.

func (*GzipPool) GetWriter

func (pool *GzipPool) GetWriter(dst io.Writer) io.WriteCloser

GetWriter gets or creates a new CompressionWriter and resets it to write to dst.

func (*GzipPool) PutReader

func (pool *GzipPool) PutReader(reader io.Reader)

PutReader returns a CompressionReader to the pool.

func (*GzipPool) PutWriter

func (pool *GzipPool) PutWriter(writer io.WriteCloser)

PutWriter returns a CompressionWriter to the pool.

type HeadBlock

type HeadBlock interface {
	IsEmpty() bool
	CheckpointTo(w io.Writer) error
	CheckpointBytes(b []byte) ([]byte, error)
	CheckpointSize() int
	LoadBytes(b []byte) error
	Serialise(pool WriterPool) ([]byte, error)
	Reset()
	Bounds() (mint, maxt int64)
	Entries() int
	UncompressedSize() int
	Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error)
	Append(int64, string, labels.Labels) (bool, error)
	Iterator(
		ctx context.Context,
		direction logproto.Direction,
		mint,
		maxt int64,
		pipeline log.StreamPipeline,
	) iter.EntryIterator
	SampleIterator(
		ctx context.Context,
		mint,
		maxt int64,
		extractor log.StreamSampleExtractor,
	) iter.SampleIterator
	Format() HeadBlockFmt
}

func HeadFromCheckpoint

func HeadFromCheckpoint(b []byte, desiredIfNotUnordered HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error)

HeadFromCheckpoint handles reading any head block format and returning the desired form. This is particularly helpful when replaying WALs from different configurations, such as after enabling unordered writes.

type HeadBlockFmt

type HeadBlockFmt byte
const (
	OrderedHeadBlockFmt HeadBlockFmt
	UnorderedHeadBlockFmt
	UnorderedWithStructuredMetadataHeadBlockFmt
)

func ChunkHeadFormatFor

func ChunkHeadFormatFor(chunkfmt byte) HeadBlockFmt

ChunkHeadFormatFor returns the corresponding head block format for the given `chunkfmt`.

func (HeadBlockFmt) Byte

func (f HeadBlockFmt) Byte() byte

func (HeadBlockFmt) NewBlock

func (f HeadBlockFmt) NewBlock(symbolizer *symbolizer) HeadBlock

func (HeadBlockFmt) String

func (f HeadBlockFmt) String() string

type LZ4Pool

type LZ4Pool struct {
	// contains filtered or unexported fields
}

func (*LZ4Pool) GetReader

func (pool *LZ4Pool) GetReader(src io.Reader) (io.Reader, error)

GetReader gets or creates a new CompressionReader and resets it to read from src.

func (*LZ4Pool) GetWriter

func (pool *LZ4Pool) GetWriter(dst io.Writer) io.WriteCloser

GetWriter gets or creates a new CompressionWriter and resets it to write to dst.

func (*LZ4Pool) PutReader

func (pool *LZ4Pool) PutReader(reader io.Reader)

PutReader returns a CompressionReader to the pool.

func (*LZ4Pool) PutWriter

func (pool *LZ4Pool) PutWriter(writer io.WriteCloser)

PutWriter returns a CompressionWriter to the pool.

type MemChunk

type MemChunk struct {
	// contains filtered or unexported fields
}

MemChunk implements compressed log chunks.

func MemchunkFromCheckpoint

func MemchunkFromCheckpoint(chk, head []byte, desiredIfNotUnordered HeadBlockFmt, blockSize int, targetSize int) (*MemChunk, error)

func NewByteChunk

func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error)

NewByteChunk returns a MemChunk backed by the passed bytes.

func NewMemChunk

func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk

NewMemChunk returns a new in-mem chunk.
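
A minimal round trip through NewMemChunk, Bytes, and NewByteChunk, assuming the Loki v3 import paths; the format, encoding, and sizes are illustrative choices.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/v3/pkg/chunkenc"
	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	blockSize, targetSize := 256*1024, 1536*1024 // illustrative sizes
	c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy,
		chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, blockSize, targetSize)

	if _, err := c.Append(&logproto.Entry{Timestamp: time.Now(), Line: "hello"}); err != nil {
		panic(err)
	}

	// Close cuts the remaining head block so Bytes includes all data.
	if err := c.Close(); err != nil {
		panic(err)
	}
	b, err := c.Bytes()
	if err != nil {
		panic(err)
	}

	// Rehydrate the serialized chunk.
	back, err := chunkenc.NewByteChunk(b, blockSize, targetSize)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.Size()) // 1 entry
}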

func (*MemChunk) Append

func (c *MemChunk) Append(entry *logproto.Entry) (bool, error)

Append implements Chunk. The MemChunk may return true or false, depending on what the head block returns.

func (*MemChunk) BlockCount

func (c *MemChunk) BlockCount() int

BlockCount implements Chunk.

func (*MemChunk) Blocks

func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block

Blocks implements Chunk.

func (*MemChunk) Bounds

func (c *MemChunk) Bounds() (fromT, toT time.Time)

Bounds implements Chunk.

func (*MemChunk) Bytes

func (c *MemChunk) Bytes() ([]byte, error)

Bytes implements Chunk. NOTE: Does not cut head block or include any head block data.

func (*MemChunk) BytesSize

func (c *MemChunk) BytesSize() int

BytesSize returns the raw size of the chunk. NOTE: This does not account for the head block nor include any head block data.

func (*MemChunk) BytesWith

func (c *MemChunk) BytesWith(b []byte) ([]byte, error)

BytesWith uses a provided []byte for buffer instantiation. NOTE: This does not cut the head block nor include any head block data.

func (*MemChunk) CheckpointSize

func (c *MemChunk) CheckpointSize() (chunk, head int)

func (*MemChunk) Close

func (c *MemChunk) Close() error

Close implements Chunk. TODO: Fix this to check edge cases.

func (*MemChunk) CompressedSize

func (c *MemChunk) CompressedSize() int

CompressedSize implements Chunk.

func (*MemChunk) ConvertHead

func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error

func (*MemChunk) Encoding

func (c *MemChunk) Encoding() Encoding

Encoding implements Chunk.

func (*MemChunk) Iterator

func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)

Iterator implements Chunk.

func (*MemChunk) Rebound

func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error)

Rebound builds a smaller chunk containing only the log entries with timestamps between start and end (both inclusive).
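
A sketch of narrowing a chunk to a time window, assuming a nil filter keeps every entry inside the range:

package main

import (
	"time"

	"github.com/grafana/loki/v3/pkg/chunkenc"
	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy,
		chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 1536*1024)
	if _, err := c.Append(&logproto.Entry{Timestamp: time.Now(), Line: "keep me"}); err != nil {
		panic(err)
	}

	// Keep only the last hour of entries.
	end := time.Now()
	smaller, err := c.Rebound(end.Add(-time.Hour), end, nil)
	if err != nil {
		panic(err)
	}
	_ = smaller // a new, smaller Chunk
}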

func (*MemChunk) SampleIterator

func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator

SampleIterator implements Chunk.

func (*MemChunk) SerializeForCheckpointTo

func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error

SerializeForCheckpointTo serializes the chunk and head into different `io.Writer`s for checkpointing use. This ensures eventually-flushed chunks don't have different substructures depending on when they were checkpointed, which in turn allows us to maintain a more effective dedupe ratio in storage.
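
A sketch of checkpointing into two separate buffers; MemchunkFromCheckpoint (above) performs the reverse. Format, encoding, and sizes are illustrative.

package main

import (
	"bytes"
	"fmt"

	"github.com/grafana/loki/v3/pkg/chunkenc"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy,
		chunkenc.UnorderedWithStructuredMetadataHeadBlockFmt, 256*1024, 1536*1024)

	// Serialize the sealed blocks and the live head block separately,
	// as a WAL checkpoint would.
	var chk, head bytes.Buffer
	if err := c.SerializeForCheckpointTo(&chk, &head); err != nil {
		panic(err)
	}
	fmt.Println(chk.Len(), head.Len())
}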

func (*MemChunk) Size

func (c *MemChunk) Size() int

Size implements Chunk.

func (*MemChunk) SpaceFor

func (c *MemChunk) SpaceFor(e *logproto.Entry) bool

SpaceFor implements Chunk.

func (*MemChunk) UncompressedSize

func (c *MemChunk) UncompressedSize() int

UncompressedSize implements Chunk.

func (*MemChunk) Utilization

func (c *MemChunk) Utilization() float64

Utilization implements Chunk.

func (*MemChunk) WriteTo

func (c *MemChunk) WriteTo(w io.Writer) (int64, error)

type NoopPool

type NoopPool struct{}

func (*NoopPool) GetReader

func (pool *NoopPool) GetReader(src io.Reader) (io.Reader, error)

GetReader gets or creates a new CompressionReader and resets it to read from src.

func (*NoopPool) GetWriter

func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser

GetWriter gets or creates a new CompressionWriter and resets it to write to dst.

func (*NoopPool) PutReader

func (pool *NoopPool) PutReader(_ io.Reader)

PutReader returns a CompressionReader to the pool.

func (*NoopPool) PutWriter

func (pool *NoopPool) PutWriter(_ io.WriteCloser)

PutWriter returns a CompressionWriter to the pool.

type ReaderPool

type ReaderPool interface {
	GetReader(io.Reader) (io.Reader, error)
	PutReader(io.Reader)
}

ReaderPool is similar to WriterPool, but for reading chunks; see the round-trip sketch after GetWriterPool below.

func GetReaderPool

func GetReaderPool(enc Encoding) ReaderPool

type SnappyPool

type SnappyPool struct {
	// contains filtered or unexported fields
}

func (*SnappyPool) GetReader

func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error)

GetReader gets or creates a new CompressionReader and resets it to read from src.

func (*SnappyPool) GetWriter

func (pool *SnappyPool) GetWriter(dst io.Writer) io.WriteCloser

GetWriter gets or creates a new CompressionWriter and resets it to write to dst.

func (*SnappyPool) PutReader

func (pool *SnappyPool) PutReader(reader io.Reader)

PutReader returns a CompressionReader to the pool.

func (*SnappyPool) PutWriter

func (pool *SnappyPool) PutWriter(writer io.WriteCloser)

PutWriter returns a CompressionWriter to the pool.

type WriterPool

type WriterPool interface {
	GetWriter(io.Writer) io.WriteCloser
	PutWriter(io.WriteCloser)
}

WriterPool is a pool of io.Writers, used by every chunk to avoid unnecessary allocations.

func GetWriterPool

func GetWriterPool(enc Encoding) WriterPool
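
A compress/decompress round trip through the pooled writers and readers; the choice of Snappy is illustrative, and the import path assumes the Loki v3 module layout.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/grafana/loki/v3/pkg/chunkenc"
)

func main() {
	var buf bytes.Buffer

	// Compress through the pooled writer, then return it to the pool.
	wpool := chunkenc.GetWriterPool(chunkenc.EncSnappy)
	w := wpool.GetWriter(&buf)
	if _, err := w.Write([]byte("hello world")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	wpool.PutWriter(w)

	// Decompress through the matching pooled reader.
	rpool := chunkenc.GetReaderPool(chunkenc.EncSnappy)
	r, err := rpool.GetReader(&buf)
	if err != nil {
		panic(err)
	}
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	rpool.PutReader(r)

	fmt.Println(string(out)) // hello world
}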

type ZstdPool

type ZstdPool struct {
	// contains filtered or unexported fields
}

ZstdPool is a zstd compression pool.

func (*ZstdPool) GetReader

func (pool *ZstdPool) GetReader(src io.Reader) (io.Reader, error)

GetReader gets or creates a new CompressionReader and resets it to read from src.

func (*ZstdPool) GetWriter

func (pool *ZstdPool) GetWriter(dst io.Writer) io.WriteCloser

GetWriter gets or creates a new CompressionWriter and resets it to write to dst.

func (*ZstdPool) PutReader

func (pool *ZstdPool) PutReader(reader io.Reader)

PutReader returns a CompressionReader to the pool.

func (*ZstdPool) PutWriter

func (pool *ZstdPool) PutWriter(writer io.WriteCloser)

PutWriter returns a CompressionWriter to the pool.
