Documentation ¶
Index ¶
- Constants
- Variables
- func ErrTooFarBehind(entryTs, cutoff time.Time) error
- func IsErrTooFarBehind(err error) bool
- func IsOutOfOrderErr(err error) bool
- func NewFacade(c Chunk, blockSize, targetSize int) chunk.Data
- func SupportedEncoding() string
- func UncompressedSize(c chunk.Data) (int, bool)
- type Block
- type Chunk
- type Encoding
- type Facade
- func (f Facade) Bounds() (time.Time, time.Time)
- func (Facade) Encoding() chunk.Encoding
- func (f Facade) Entries() int
- func (f Facade) LokiChunk() Chunk
- func (f Facade) Marshal(w io.Writer) error
- func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error)
- func (f Facade) Size() int
- func (f Facade) UncompressedSize() int
- func (f *Facade) UnmarshalFromBuf(buf []byte) error
- func (f Facade) Utilization() float64
- type FlatePool
- type GzipPool
- type HeadBlock
- type HeadBlockFmt
- type LZ4Pool
- type MemChunk
- func (c *MemChunk) Append(entry *logproto.Entry) (bool, error)
- func (c *MemChunk) BlockCount() int
- func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block
- func (c *MemChunk) Bounds() (fromT, toT time.Time)
- func (c *MemChunk) Bytes() ([]byte, error)
- func (c *MemChunk) BytesSize() int
- func (c *MemChunk) BytesWith(b []byte) ([]byte, error)
- func (c *MemChunk) CheckpointSize() (chunk, head int)
- func (c *MemChunk) Close() error
- func (c *MemChunk) CompressedSize() int
- func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
- func (c *MemChunk) Encoding() Encoding
- func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, ...) (iter.EntryIterator, error)
- func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
- func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, ...) iter.SampleIterator
- func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error
- func (c *MemChunk) Size() int
- func (c *MemChunk) SpaceFor(e *logproto.Entry) bool
- func (c *MemChunk) UncompressedSize() int
- func (c *MemChunk) Utilization() float64
- func (c *MemChunk) WriteTo(w io.Writer) (int64, error)
- type NoopPool
- type ReaderPool
- type SnappyPool
- type WriterPool
- type ZstdPool
Constants ¶
const (
	ChunkFormatV1 byte
	ChunkFormatV2
	ChunkFormatV3
	ChunkFormatV4
)
const GzipLogChunk = chunk.Encoding(128)
GzipLogChunk is a Cortex encoding type for our chunks. Deprecated: the chunk encoding/compression format is inside the chunk data.
const LogChunk = chunk.Encoding(129)
LogChunk is a Cortex encoding type for our chunks.
Variables ¶
var (
	ErrChunkFull       = errors.New("chunk full")
	ErrOutOfOrder      = errors.New("entry out of order")
	ErrInvalidSize     = errors.New("invalid size")
	ErrInvalidFlag     = errors.New("invalid flag")
	ErrInvalidChecksum = errors.New("invalid chunk checksum")
)
Errors returned by the chunk interface.
var (
	// Gzip is the GNU zip compression pool
	Gzip = GzipPool{/* contains filtered or unexported fields */}
	// Lz4_64k is the LZ4 compression pool, with 64k buffer size
	Lz4_64k = LZ4Pool{/* contains filtered or unexported fields */}
	// Lz4_256k uses a 256k buffer
	Lz4_256k = LZ4Pool{/* contains filtered or unexported fields */}
	// Lz4_1M uses a 1M buffer
	Lz4_1M = LZ4Pool{/* contains filtered or unexported fields */}
	// Lz4_4M uses a 4M buffer
	Lz4_4M = LZ4Pool{/* contains filtered or unexported fields */}
	Flate  = FlatePool{}
	Zstd   = ZstdPool{}
	// Snappy is the snappy compression pool
	Snappy SnappyPool
	// Noop is the no-compression pool
	Noop NoopPool

	// BytesBufferPool is a bytes buffer pool used for decompressed lines.
	// Buckets: [0.5KB, 1KB, 2KB, 4KB, 8KB]
	BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
	// LabelsPool is a matrix of bytes buffers used to store label names and values.
	// Buckets: [8, 16, 32, 64, 128, 256].
	// Since we store label names and values, the number of labels we can store is half the bucket size,
	// so we can store from 0 to 128 labels.
	LabelsPool  = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) })
	SymbolsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([]symbol, 0, size) })
	// SamplesPool pools arrays of samples. Buckets: [512, 1024, ..., 16k]
	SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })
	// EncodeBufferPool is a pool used for binary encoding.
	EncodeBufferPool = sync.Pool{
		New: func() interface{} {
			return &encbuf{
				b: make([]byte, 0, 256),
			}
		},
	}
)
var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithStructuredMetadataHeadBlockFmt}
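As a minimal sketch of the get/put discipline the pools above expect (the import path and the 1 KiB size hint are assumptions for illustration; newer Loki releases use a /v3 module path):

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
)

func main() {
	// Borrow a buffer with a ~1 KiB size hint, reuse its backing array,
	// then return it so later readers can avoid a fresh allocation.
	b := chunkenc.BytesBufferPool.Get(1024).([]byte)
	b = append(b[:0], "a decompressed line"...)
	fmt.Println(len(b))
	chunkenc.BytesBufferPool.Put(b)
}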
Functions ¶
func ErrTooFarBehind ¶
func ErrTooFarBehind(entryTs, cutoff time.Time) error
func IsErrTooFarBehind ¶
func IsErrTooFarBehind(err error) bool
func IsOutOfOrderErr ¶
func IsOutOfOrderErr(err error) bool
func SupportedEncoding ¶
func SupportedEncoding() string
SupportedEncoding returns the list of supported Encodings.
Types ¶
type Block ¶
type Block interface {
	// MinTime is the minimum time of entries in the block
	MinTime() int64
	// MaxTime is the maximum time of entries in the block
	MaxTime() int64
	// Offset is the offset/position of the block in the chunk. Offset is unique for a given block per chunk.
	Offset() int
	// Entries is the number of entries in the block.
	Entries() int
	// Iterator returns an entry iterator for the block.
	Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
	// SampleIterator returns a sample iterator for the block.
	SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}
Block is a chunk block.
type Chunk ¶
type Chunk interface {
	Bounds() (time.Time, time.Time)
	SpaceFor(*logproto.Entry) bool
	// Append returns true if the appended entry was a duplicate
	Append(*logproto.Entry) (bool, error)
	Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
	SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
	// Blocks returns the list of blocks in the chunk.
	Blocks(mintT, maxtT time.Time) []Block
	// Size returns the number of entries in a chunk
	Size() int
	Bytes() ([]byte, error)
	BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation
	io.WriterTo
	BlockCount() int
	Utilization() float64
	UncompressedSize() int
	CompressedSize() int
	Close() error
	Encoding() Encoding
	Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
}
Chunk is the interface for the compressed logs chunk format.
func NewDumbChunk ¶
func NewDumbChunk() Chunk
NewDumbChunk returns a new chunk that isn't very good.
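To make the interface concrete, here is a hedged sketch that drives a chunk through its append path using NewDumbChunk; the import paths and error handling are illustrative, not prescribed by this package:

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewDumbChunk()
	e := &logproto.Entry{Timestamp: time.Now(), Line: "hello world"}
	if c.SpaceFor(e) {
		dup, err := c.Append(e)
		if err != nil {
			fmt.Println("append failed:", err) // e.g. ErrOutOfOrder
			return
		}
		fmt.Println("duplicate:", dup)
	}
	from, to := c.Bounds()
	fmt.Println(from, to, c.Size()) // Size is the entry count here
}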
type Encoding ¶
type Encoding byte
Encoding is the identifier for a chunk encoding.
const (
	EncNone Encoding = iota
	EncGZIP
	EncDumb
	EncLZ4_64k
	EncSnappy
	EncLZ4_256k
	EncLZ4_1M
	EncLZ4_4M
	EncFlate
	EncZstd
)
The different available encodings. Make sure to preserve the order, as these numeric values are written to the chunks!
func ParseEncoding ¶
func ParseEncoding(enc string) (Encoding, error)
ParseEncoding parses a chunk encoding (compression algorithm) by its name.
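A short usage sketch, assuming encoding names match their String form (e.g. "snappy"); the fallback choice is illustrative:

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
)

func main() {
	enc, err := chunkenc.ParseEncoding("snappy")
	if err != nil {
		// Unknown name: report the valid set and fall back.
		fmt.Println("supported encodings:", chunkenc.SupportedEncoding())
		enc = chunkenc.EncSnappy
	}
	fmt.Println("using", enc)
}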
type Facade ¶
Facade for compatibility with the Cortex chunk type, so we can use its chunk store.
func (Facade) Size ¶
func (f Facade) Size() int
Size implements encoding.Chunk, which unfortunately uses the Size method to refer to the byte size and not the entry count like chunkenc.Chunk does.
func (Facade) UncompressedSize ¶
func (f Facade) UncompressedSize() int
func (*Facade) UnmarshalFromBuf ¶
func (f *Facade) UnmarshalFromBuf(buf []byte) error
UnmarshalFromBuf implements chunk.Chunk.
func (Facade) Utilization ¶
func (f Facade) Utilization() float64
Utilization implements encoding.Chunk.
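A hedged sketch of the marshal/unmarshal round trip through the Facade; the assertion to *Facade is an assumption about NewFacade's concrete return type, and roundTrip is a hypothetical helper:

package chunkutil

import (
	"bytes"
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
)

// roundTrip marshals a chunk through the Facade and decodes it back.
func roundTrip(c chunkenc.Chunk, blockSize, targetSize int) (*chunkenc.Facade, error) {
	data := chunkenc.NewFacade(c, blockSize, targetSize)
	f, ok := data.(*chunkenc.Facade) // assumption about the concrete type
	if !ok {
		return nil, fmt.Errorf("unexpected facade type %T", data)
	}
	var buf bytes.Buffer
	if err := f.Marshal(&buf); err != nil {
		return nil, err
	}
	out := &chunkenc.Facade{}
	if err := out.UnmarshalFromBuf(buf.Bytes()); err != nil {
		return nil, err
	}
	return out, nil
}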
type FlatePool ¶
type FlatePool struct {
// contains filtered or unexported fields
}
FlatePool is a flate compression pool
func (*FlatePool) GetReader ¶
GetReader gets or creates a new CompressionReader and resets it to read from src
func (*FlatePool) GetWriter ¶
func (pool *FlatePool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst
func (*FlatePool) PutWriter ¶
func (pool *FlatePool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool
type GzipPool ¶
type GzipPool struct {
// contains filtered or unexported fields
}
GzipPool is a gzip compression pool
func (*GzipPool) GetReader ¶
GetReader gets or creates a new CompressionReader and resets it to read from src
func (*GzipPool) GetWriter ¶
func (pool *GzipPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst
func (*GzipPool) PutWriter ¶
func (pool *GzipPool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool
type HeadBlock ¶
type HeadBlock interface {
	IsEmpty() bool
	CheckpointTo(w io.Writer) error
	CheckpointBytes(b []byte) ([]byte, error)
	CheckpointSize() int
	LoadBytes(b []byte) error
	Serialise(pool WriterPool) ([]byte, error)
	Reset()
	Bounds() (mint, maxt int64)
	Entries() int
	UncompressedSize() int
	Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error)
	Append(int64, string, labels.Labels) (bool, error)
	Iterator(
		ctx context.Context,
		direction logproto.Direction,
		mint, maxt int64,
		pipeline log.StreamPipeline,
	) iter.EntryIterator
	SampleIterator(
		ctx context.Context,
		mint, maxt int64,
		extractor log.StreamSampleExtractor,
	) iter.SampleIterator
	Format() HeadBlockFmt
}
func HeadFromCheckpoint ¶
func HeadFromCheckpoint(b []byte, desiredIfNotUnordered HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error)
HeadFromCheckpoint handles reading any head block format and returning the desired form. This is particularly helpful when replaying WALs from different configurations, such as after enabling unordered writes.
type HeadBlockFmt ¶
type HeadBlockFmt byte
const (
	OrderedHeadBlockFmt HeadBlockFmt
	UnorderedHeadBlockFmt
	UnorderedWithStructuredMetadataHeadBlockFmt
)
func ChunkHeadFormatFor ¶
func ChunkHeadFormatFor(chunkfmt byte) HeadBlockFmt
ChunkHeadFormatFor returns the corresponding head block format for the given `chunkfmt`.
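For example, picking the head format that matches a chunk format byte (output shape illustrative):

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
)

func main() {
	head := chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4)
	fmt.Println(head.String(), head.Byte())
}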
func (HeadBlockFmt) Byte ¶
func (f HeadBlockFmt) Byte() byte
func (HeadBlockFmt) NewBlock ¶
func (f HeadBlockFmt) NewBlock(symbolizer *symbolizer) HeadBlock
func (HeadBlockFmt) String ¶
func (f HeadBlockFmt) String() string
type LZ4Pool ¶
type LZ4Pool struct {
// contains filtered or unexported fields
}
func (*LZ4Pool) GetReader ¶
GetReader gets or creates a new CompressionReader and resets it to read from src
func (*LZ4Pool) GetWriter ¶
func (pool *LZ4Pool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst
func (*LZ4Pool) PutWriter ¶
func (pool *LZ4Pool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool
type MemChunk ¶
type MemChunk struct {
// contains filtered or unexported fields
}
MemChunk implements compressed log chunks.
func MemchunkFromCheckpoint ¶
func NewByteChunk ¶
NewByteChunk returns a MemChunk on the passed bytes.
func NewMemChunk ¶
func NewMemChunk(chunkFormat byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk
NewMemChunk returns a new in-mem chunk.
func (*MemChunk) Append ¶
func (c *MemChunk) Append(entry *logproto.Entry) (bool, error)
Append implements Chunk. Whether the appended entry is reported as a duplicate depends on what the head block returns.
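A sketch tying NewMemChunk and the append path together; the parameters (V3 format, snappy, 256 KiB blocks, 1.5 MiB target) are illustrative choices, not defaults:

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(
		chunkenc.ChunkFormatV3,
		chunkenc.EncSnappy,
		chunkenc.UnorderedHeadBlockFmt,
		256*1024,  // block size
		1536*1024, // target size
	)

	for i := 0; i < 3; i++ {
		e := &logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}
		if !c.SpaceFor(e) {
			break // chunk is full: flush it and start a new one
		}
		if _, err := c.Append(e); err != nil {
			if chunkenc.IsOutOfOrderErr(err) {
				continue // ordered head blocks reject out-of-order entries
			}
			fmt.Println("append:", err)
			return
		}
	}
	fmt.Println("entries:", c.Size(), "uncompressed:", c.UncompressedSize())
}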
func (*MemChunk) Bytes ¶
func (c *MemChunk) Bytes() ([]byte, error)
Bytes implements Chunk. NOTE: Does not cut head block or include any head block data.
func (*MemChunk) BytesSize ¶
func (c *MemChunk) BytesSize() int
BytesSize returns the raw size of the chunk. NOTE: This does not account for the head block nor include any head block data.
func (*MemChunk) BytesWith ¶
func (c *MemChunk) BytesWith(b []byte) ([]byte, error)
BytesWith uses a provided []byte for buffer instantiation. NOTE: This does not cut the head block nor include any head block data.
func (*MemChunk) CheckpointSize ¶
func (c *MemChunk) CheckpointSize() (chunk, head int)
func (*MemChunk) CompressedSize ¶
func (c *MemChunk) CompressedSize() int
CompressedSize implements Chunk.
func (*MemChunk) ConvertHead ¶
func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
func (*MemChunk) Iterator ¶
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
Iterator implements Chunk.
func (*MemChunk) Rebound ¶
func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
Rebound builds a smaller chunk containing only the log entries with timestamps between start and end (both inclusive).
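A hedged sketch of trimming a chunk to a time window; passing a nil filter is assumed to keep every entry in range, and trimLastHour is a hypothetical helper:

package chunkutil

import (
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
)

// trimLastHour keeps only the newest hour of entries in c.
func trimLastHour(c chunkenc.Chunk) (chunkenc.Chunk, error) {
	_, through := c.Bounds()
	return c.Rebound(through.Add(-time.Hour), through, nil)
}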
func (*MemChunk) SampleIterator ¶
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
SampleIterator implements Chunk.
func (*MemChunk) SerializeForCheckpointTo ¶
func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error
SerializeForCheckpointTo serializes the chunk and head into different io.Writers for checkpointing use. This ensures that eventually flushed chunks don't have different substructures depending on when they were checkpointed, which in turn lets us maintain a more effective dedupe ratio in storage.
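A sketch of checkpointing into two separate buffers, mirroring the two-writer signature above (checkpoint is a hypothetical helper):

package chunkutil

import (
	"bytes"

	"github.com/grafana/loki/pkg/chunkenc"
)

// checkpoint captures the flushed-chunk bytes and the head-block bytes
// separately, so the eventually flushed chunk is byte-identical no matter
// when the checkpoint was taken.
func checkpoint(c *chunkenc.MemChunk) ([]byte, []byte, error) {
	var chkBuf, headBuf bytes.Buffer
	if err := c.SerializeForCheckpointTo(&chkBuf, &headBuf); err != nil {
		return nil, nil, err
	}
	return chkBuf.Bytes(), headBuf.Bytes(), nil
}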
func (*MemChunk) UncompressedSize ¶
func (c *MemChunk) UncompressedSize() int
UncompressedSize implements Chunk.
func (*MemChunk) Utilization ¶
func (c *MemChunk) Utilization() float64
Utilization implements Chunk.
type NoopPool ¶
type NoopPool struct{}
func (*NoopPool) GetReader ¶
GetReader gets or creates a new CompressionReader and resets it to read from src
func (*NoopPool) GetWriter ¶
func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst
func (*NoopPool) PutWriter ¶
func (pool *NoopPool) PutWriter(_ io.WriteCloser)
PutWriter places a CompressionWriter back in the pool
type ReaderPool ¶
ReaderPool is similar to WriterPool, but for reading chunks.
func GetReaderPool ¶
func GetReaderPool(enc Encoding) ReaderPool
type SnappyPool ¶
type SnappyPool struct {
// contains filtered or unexported fields
}
func (*SnappyPool) GetReader ¶
GetReader gets or creates a new CompressionReader and resets it to read from src
func (*SnappyPool) GetWriter ¶
func (pool *SnappyPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst
func (*SnappyPool) PutReader ¶
func (pool *SnappyPool) PutReader(reader io.Reader)
PutReader places a CompressionReader back in the pool
func (*SnappyPool) PutWriter ¶
func (pool *SnappyPool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool
type WriterPool ¶
type WriterPool interface {
	GetWriter(io.Writer) io.WriteCloser
	PutWriter(io.WriteCloser)
}
WriterPool is a pool of io.Writer. This is used by every chunk to avoid unnecessary allocations.
func GetWriterPool ¶
func GetWriterPool(enc Encoding) WriterPool
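Putting GetWriterPool and the WriterPool interface together, a hedged compress-to-buffer sketch (compress is a hypothetical helper):

package chunkutil

import (
	"bytes"

	"github.com/grafana/loki/pkg/chunkenc"
)

// compress runs payload through the pooled writer for enc. Close flushes
// the compressor before the buffer is read; the writer then goes back to
// the pool for reuse.
func compress(enc chunkenc.Encoding, payload []byte) ([]byte, error) {
	p := chunkenc.GetWriterPool(enc)
	var buf bytes.Buffer
	w := p.GetWriter(&buf)
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	p.PutWriter(w)
	return buf.Bytes(), nil
}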
type ZstdPool ¶
type ZstdPool struct {
// contains filtered or unexported fields
}
ZstdPool is a zstd compression pool
func (*ZstdPool) GetReader ¶
GetReader gets or creates a new CompressionReader and resets it to read from src
func (*ZstdPool) GetWriter ¶
func (pool *ZstdPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst
func (*ZstdPool) PutWriter ¶
func (pool *ZstdPool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool