Documentation ¶
Index ¶
- Constants
- Variables
- func NewFacade(c Chunk, blockSize, targetSize int) encoding.Chunk
- func SupportedEncoding() string
- func UncompressedSize(c encoding.Chunk) (int, bool)
- type Block
- type BufioReaderPool
- type Chunk
- type Encoding
- type Facade
- func (Facade) Encoding() encoding.Encoding
- func (f Facade) LokiChunk() Chunk
- func (f Facade) Marshal(w io.Writer) error
- func (f Facade) Rebound(start, end model.Time) (encoding.Chunk, error)
- func (f Facade) Size() int
- func (f *Facade) UnmarshalFromBuf(buf []byte) error
- func (f Facade) Utilization() float64
- type FlatePool
- type GzipPool
- type HeadBlock
- type HeadBlockFmt
- type LZ4Pool
- type MemChunk
- func (c *MemChunk) Append(entry *logproto.Entry) error
- func (c *MemChunk) BlockCount() int
- func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block
- func (c *MemChunk) Bounds() (fromT, toT time.Time)
- func (c *MemChunk) Bytes() ([]byte, error)
- func (c *MemChunk) BytesSize() int
- func (c *MemChunk) BytesWith(b []byte) ([]byte, error)
- func (c *MemChunk) CheckpointSize() (chunk, head int)
- func (c *MemChunk) Close() error
- func (c *MemChunk) CompressedSize() int
- func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
- func (c *MemChunk) Encoding() Encoding
- func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, ...) (iter.EntryIterator, error)
- func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error)
- func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, ...) iter.SampleIterator
- func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error
- func (c *MemChunk) Size() int
- func (c *MemChunk) SpaceFor(e *logproto.Entry) bool
- func (c *MemChunk) UncompressedSize() int
- func (c *MemChunk) Utilization() float64
- func (c *MemChunk) WriteTo(w io.Writer) (int64, error)
- type NoopPool
- type ReaderPool
- type SnappyPool
- type WriterPool
- type ZstdPool
Constants ¶
const (
DefaultChunkFormat = chunkFormatV3 // the currently used chunk format
)
const GzipLogChunk = encoding.Encoding(128)
GzipLogChunk is a cortex encoding type for our chunks. Deprecated: the chunk encoding/compression format is inside the chunk data.
const LogChunk = encoding.Encoding(129)
LogChunk is a cortex encoding type for our chunks.
Variables ¶
var (
	ErrChunkFull       = errors.New("chunk full")
	ErrOutOfOrder      = errors.New("entry out of order")
	ErrInvalidSize     = errors.New("invalid size")
	ErrInvalidFlag     = errors.New("invalid flag")
	ErrInvalidChecksum = errors.New("invalid chunk checksum")
)
Errors returned by the chunk interface.
var (
	// Gzip is the gnu zip compression pool
	Gzip     = GzipPool{/* contains filtered or unexported fields */}
	Lz4_64k  = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_64k is the LZ4 compression pool, with a 64k buffer size
	Lz4_256k = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_256k uses a 256k buffer
	Lz4_1M   = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_1M uses a 1M buffer
	Lz4_4M   = LZ4Pool{/* contains filtered or unexported fields */} // Lz4_4M uses a 4M buffer
	Flate    = FlatePool{}
	Zstd     = ZstdPool{}

	// Snappy is the snappy compression pool
	Snappy SnappyPool
	// Noop is the no-compression pool
	Noop NoopPool

	// BufReaderPool is a bufio.Reader pool
	BufReaderPool = &BufioReaderPool{
		pool: sync.Pool{
			New: func() interface{} { return bufio.NewReader(nil) },
		},
	}

	// BytesBufferPool is a pool of byte buffers used for decompressed lines.
	// Buckets: [0.5KB, 1KB, 2KB, 4KB, 8KB]
	BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })

	// SamplesPool pools arrays of samples [512, 1024, ..., 16k]
	SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) })

	// EncodeBufferPool is a pool used for binary encoding.
	EncodeBufferPool = sync.Pool{
		New: func() interface{} {
			return &encbuf{
				b: make([]byte, 0, 256),
			}
		},
	}
)
var (
HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt}
)
Functions ¶
func SupportedEncoding ¶
func SupportedEncoding() string
SupportedEncoding returns the list of supported Encodings.
Types ¶
type Block ¶
type Block interface {
	// MinTime is the minimum time of entries in the block.
	MinTime() int64
	// MaxTime is the maximum time of entries in the block.
	MaxTime() int64
	// Offset is the offset/position of the block in the chunk. Offset is unique for a given block per chunk.
	Offset() int
	// Entries is the number of entries in the block.
	Entries() int
	// Iterator returns an entry iterator for the block.
	Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
	// SampleIterator returns a sample iterator for the block.
	SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
}
Block is a chunk block.
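For illustration, a minimal sketch that walks block metadata through this interface. It is not one of the package's own examples: it assumes Loki's import path (github.com/grafana/loki/pkg/chunkenc) plus illustrative block/target sizes, and it elides error handling.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1536*1024)
	_ = c.Append(&logproto.Entry{Timestamp: time.Now(), Line: "hello"})
	_ = c.Close() // cut the head block so its entries appear in Blocks()

	// Blocks returns only the cut blocks overlapping the given time range.
	for _, b := range c.Blocks(time.Unix(0, 0), time.Now().Add(time.Minute)) {
		fmt.Printf("offset=%d entries=%d bounds=[%d, %d]\n",
			b.Offset(), b.Entries(), b.MinTime(), b.MaxTime())
	}
}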
type BufioReaderPool ¶
type BufioReaderPool struct {
// contains filtered or unexported fields
}
BufioReaderPool is a bufio reader that uses sync.Pool.
func (*BufioReaderPool) Get ¶
func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader
Get returns a bufio.Reader which reads from r. The buffer size is that of the pool.
func (*BufioReaderPool) Put ¶
func (bufPool *BufioReaderPool) Put(b *bufio.Reader)
Put puts the bufio.Reader back into the pool.
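A short usage sketch (hedged, with the same import-path assumption as above; the reader content is arbitrary): borrow a pooled bufio.Reader, use it, and return it.

package main

import (
	"fmt"
	"strings"

	"github.com/grafana/loki/pkg/chunkenc"
)

func main() {
	// Get rebinds a pooled bufio.Reader to the supplied reader.
	r := chunkenc.BufReaderPool.Get(strings.NewReader("compressed bytes\n"))
	line, _ := r.ReadString('\n')
	chunkenc.BufReaderPool.Put(r) // return the reader for reuse once done
	fmt.Print(line)
}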
type Chunk ¶
type Chunk interface {
	Bounds() (time.Time, time.Time)
	SpaceFor(*logproto.Entry) bool
	Append(*logproto.Entry) error
	Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
	SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
	// Blocks returns the list of blocks in the chunk.
	Blocks(mintT, maxtT time.Time) []Block
	// Size returns the number of entries in the chunk.
	Size() int
	Bytes() ([]byte, error)
	BytesWith([]byte) ([]byte, error) // uses the provided []byte for buffer instantiation
	io.WriterTo
	BlockCount() int
	Utilization() float64
	UncompressedSize() int
	CompressedSize() int
	Close() error
	Encoding() Encoding
	Rebound(start, end time.Time) (Chunk, error)
}
Chunk is the interface for the compressed logs chunk format.
func NewDumbChunk ¶
func NewDumbChunk() Chunk
NewDumbChunk returns a new chunk that isn't very good.
type Encoding ¶
type Encoding byte
Encoding is the identifier for a chunk encoding.
const (
	EncNone Encoding = iota
	EncGZIP
	EncDumb
	EncLZ4_64k
	EncSnappy
	EncLZ4_256k
	EncLZ4_1M
	EncLZ4_4M
	EncFlate
	EncZstd
)
The different available encodings. Make sure to preserve the order, as these numeric values are written to the chunks!
func ParseEncoding ¶
func ParseEncoding(enc string) (Encoding, error)
ParseEncoding parses a chunk encoding (compression algorithm) by its name.
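A hedged sketch of the two helpers together; the "gzip" name is an assumption based on the encoding list above (names follow Encoding.String()).

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
)

func main() {
	fmt.Println(chunkenc.SupportedEncoding()) // names accepted by ParseEncoding

	enc, err := chunkenc.ParseEncoding("gzip")
	if err != nil {
		panic(err)
	}
	fmt.Println(enc == chunkenc.EncGZIP) // true
}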
type Facade ¶
type Facade struct {
	encoding.Chunk
	// contains filtered or unexported fields
}
Facade for compatibility with the cortex chunk type, so we can use its chunk store.
func (*Facade) UnmarshalFromBuf ¶
func (f *Facade) UnmarshalFromBuf(buf []byte) error
UnmarshalFromBuf implements encoding.Chunk.
func (Facade) Utilization ¶
func (f Facade) Utilization() float64
Utilization implements encoding.Chunk.
type FlatePool ¶
type FlatePool struct {
// contains filtered or unexported fields
}
FlatePool is a flate compression pool.
func (*FlatePool) GetReader ¶
func (pool *FlatePool) GetReader(src io.Reader) io.Reader
GetReader gets or creates a new CompressionReader and resets it to read from src.
func (*FlatePool) GetWriter ¶
func (pool *FlatePool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst.
func (*FlatePool) PutWriter ¶
func (pool *FlatePool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool.
type GzipPool ¶
type GzipPool struct {
// contains filtered or unexported fields
}
GzipPool is a gzip compression pool.
func (*GzipPool) GetReader ¶
func (pool *GzipPool) GetReader(src io.Reader) io.Reader
GetReader gets or creates a new CompressionReader and resets it to read from src.
func (*GzipPool) GetWriter ¶
func (pool *GzipPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst.
func (*GzipPool) PutWriter ¶
func (pool *GzipPool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool.
type HeadBlock ¶
type HeadBlock interface {
	IsEmpty() bool
	CheckpointTo(w io.Writer) error
	CheckpointBytes(b []byte) ([]byte, error)
	CheckpointSize() int
	LoadBytes(b []byte) error
	Serialise(pool WriterPool) ([]byte, error)
	Reset()
	Bounds() (mint, maxt int64)
	Entries() int
	UncompressedSize() int
	Convert(HeadBlockFmt) (HeadBlock, error)
	Append(int64, string) error
	Iterator(
		ctx context.Context,
		direction logproto.Direction,
		mint, maxt int64,
		pipeline log.StreamPipeline,
	) iter.EntryIterator
	SampleIterator(
		ctx context.Context,
		mint, maxt int64,
		extractor log.StreamSampleExtractor,
	) iter.SampleIterator
	Format() HeadBlockFmt
}
func HeadFromCheckpoint ¶
func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error)
HeadFromCheckpoint handles reading any head block format and returning the desired form. This is particularly helpful when replaying WALs from different configurations, such as after enabling unordered writes.
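A hedged sketch of the replay path. The package name, helper name, and source of the bytes are illustrative; b would hold head-block bytes captured at checkpoint time (see HeadBlock.CheckpointBytes or SerializeForCheckpointTo).

package walreplay

import "github.com/grafana/loki/pkg/chunkenc"

// RestoreHead rebuilds a head block from checkpointed bytes. Whatever format
// the bytes were written in, the result is converted to the unordered format,
// e.g. after unordered writes have been enabled.
func RestoreHead(b []byte) (chunkenc.HeadBlock, error) {
	return chunkenc.HeadFromCheckpoint(b, chunkenc.UnorderedHeadBlockFmt)
}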
type HeadBlockFmt ¶
type HeadBlockFmt byte
const (
	OrderedHeadBlockFmt HeadBlockFmt
	UnorderedHeadBlockFmt
)
func (HeadBlockFmt) Byte ¶
func (f HeadBlockFmt) Byte() byte
func (HeadBlockFmt) NewBlock ¶
func (f HeadBlockFmt) NewBlock() HeadBlock
func (HeadBlockFmt) String ¶
func (f HeadBlockFmt) String() string
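A quick, hedged sketch of the format helpers; the printed values depend on the package's String and Byte implementations.

package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
)

func main() {
	f := chunkenc.UnorderedHeadBlockFmt
	h := f.NewBlock() // a fresh, empty head block in this format
	fmt.Println(f.String(), f.Byte(), h.IsEmpty())
}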
type LZ4Pool ¶
type LZ4Pool struct {
// contains filtered or unexported fields
}
func (*LZ4Pool) GetReader ¶
func (pool *LZ4Pool) GetReader(src io.Reader) io.Reader
GetReader gets or creates a new CompressionReader and resets it to read from src.
func (*LZ4Pool) GetWriter ¶
func (pool *LZ4Pool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst.
func (*LZ4Pool) PutWriter ¶
func (pool *LZ4Pool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool.
type MemChunk ¶
type MemChunk struct {
// contains filtered or unexported fields
}
MemChunk implements compressed log chunks.
func MemchunkFromCheckpoint ¶
func NewByteChunk ¶
func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error)
NewByteChunk returns a MemChunk on the passed bytes.
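A hedged round-trip sketch: serialize with Bytes, rehydrate with NewByteChunk. The size arguments are illustrative (mirroring NewMemChunk below), and Close is called first because Bytes does not include head block data.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	src := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1536*1024)
	_ = src.Append(&logproto.Entry{Timestamp: time.Now(), Line: "round trip"})
	_ = src.Close() // cut the head block; Bytes() would not include its data otherwise

	b, err := src.Bytes()
	if err != nil {
		panic(err)
	}

	restored, err := chunkenc.NewByteChunk(b, 256*1024, 1536*1024)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.Size(), restored.Encoding())
}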
func NewMemChunk ¶
func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk
NewMemChunk returns a new in-mem chunk.
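A minimal append loop, as a hedged sketch: the block size (256 KiB) and target size (~1.5 MiB) are illustrative values, not documented defaults. SpaceFor guards the target size, and Append reports ErrOutOfOrder for ordered head blocks.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1536*1024)

	for i := 0; i < 3; i++ {
		e := &logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}
		if !c.SpaceFor(e) {
			break // chunk reached its target size: flush it and start a new one
		}
		if err := c.Append(e); err != nil {
			if errors.Is(err, chunkenc.ErrOutOfOrder) {
				continue // possible with OrderedHeadBlockFmt and regressing timestamps
			}
			panic(err)
		}
	}
	fmt.Println(c.Size(), c.UncompressedSize())
}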
func (*MemChunk) Bytes ¶
func (c *MemChunk) Bytes() ([]byte, error)
Bytes implements Chunk. NOTE: Does not cut the head block or include any head block data.
func (*MemChunk) BytesSize ¶
func (c *MemChunk) BytesSize() int
BytesSize returns the raw size of the chunk. NOTE: This does not account for the head block nor include any head block data.
func (*MemChunk) BytesWith ¶
func (c *MemChunk) BytesWith(b []byte) ([]byte, error)
BytesWith uses a provided []byte for buffer instantiation. NOTE: This does not cut the head block nor include any head block data.
func (*MemChunk) CheckpointSize ¶
func (c *MemChunk) CheckpointSize() (chunk, head int)
func (*MemChunk) CompressedSize ¶
func (c *MemChunk) CompressedSize() int
CompressedSize implements Chunk.
func (*MemChunk) ConvertHead ¶
func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
func (*MemChunk) Iterator ¶
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
Iterator implements Chunk.
func (*MemChunk) Rebound ¶
func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error)
Rebound builds a smaller chunk containing only entries with timestamps between start and end (both inclusive).
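A hedged sketch of trimming a chunk to a sub-range, for example to apply a deletion window; the timestamps are illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1536*1024)
	now := time.Now()
	for i := 0; i < 10; i++ {
		_ = c.Append(&logproto.Entry{Timestamp: now.Add(time.Duration(i) * time.Second), Line: "entry"})
	}

	// Keep only entries in [now+2s, now+5s]; both bounds are inclusive.
	smaller, err := c.Rebound(now.Add(2*time.Second), now.Add(5*time.Second))
	if err != nil {
		panic(err)
	}
	from, to := smaller.Bounds()
	fmt.Println(smaller.Size(), from, to)
}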
func (*MemChunk) SampleIterator ¶
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
SampleIterator implements Chunk.
func (*MemChunk) SerializeForCheckpointTo ¶
func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error
SerializeForCheckpointTo serializes the chunk and head into different io.Writers for checkpointing use. This is to ensure that eventually flushed chunks don't have different substructures depending on when they were checkpointed. In turn, this allows us to maintain a more effective dedupe ratio in storage.
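A hedged checkpointing sketch: the flushed-chunk portion and the mutable head are written to two separate destinations, here in-memory buffers.

package main

import (
	"bytes"
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1536*1024)
	_ = c.Append(&logproto.Entry{Timestamp: time.Now(), Line: "checkpoint me"})

	var chk, head bytes.Buffer
	if err := c.SerializeForCheckpointTo(&chk, &head); err != nil {
		panic(err)
	}
	// CheckpointSize reports the expected size of both parts up front,
	// which is useful for pre-sizing buffers.
	chkSize, headSize := c.CheckpointSize()
	fmt.Println(chk.Len(), head.Len(), chkSize, headSize)
}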
func (*MemChunk) UncompressedSize ¶
func (c *MemChunk) UncompressedSize() int
UncompressedSize implements Chunk.
func (*MemChunk) Utilization ¶
func (c *MemChunk) Utilization() float64
Utilization implements Chunk.
func (*MemChunk) WriteTo ¶
func (c *MemChunk) WriteTo(w io.Writer) (int64, error)
WriteTo implements io.WriterTo. NOTE: Does not cut the head block or include any head block data; for the head data to be included, you must call Close() first. This decision notably enables WAL checkpointing, which would otherwise result in different content-addressable chunks in storage based on the timing of when they were checkpointed (which would cause new blocks to be cut early).
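A hedged sketch of flushing a chunk: Close first so head data is cut into a block and included in the write.

package main

import (
	"bytes"
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1536*1024)
	_ = c.Append(&logproto.Entry{Timestamp: time.Now(), Line: "flush me"})
	if err := c.Close(); err != nil { // cut the head block so its data is written
		panic(err)
	}

	var buf bytes.Buffer
	n, err := c.WriteTo(&buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(n, buf.Len()) // bytes written
}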
type NoopPool ¶
type NoopPool struct{}
func (*NoopPool) GetReader ¶
func (pool *NoopPool) GetReader(src io.Reader) io.Reader
GetReader gets or creates a new CompressionReader and resets it to read from src.
func (*NoopPool) GetWriter ¶
func (pool *NoopPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst.
func (*NoopPool) PutWriter ¶
func (pool *NoopPool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool.
type ReaderPool ¶
type ReaderPool interface {
	GetReader(io.Reader) io.Reader
	PutReader(io.Reader)
}
ReaderPool is similar to WriterPool, but for reading chunks.
type SnappyPool ¶
type SnappyPool struct {
// contains filtered or unexported fields
}
func (*SnappyPool) GetReader ¶
func (pool *SnappyPool) GetReader(src io.Reader) io.Reader
GetReader gets or creates a new CompressionReader and resets it to read from src.
func (*SnappyPool) GetWriter ¶
func (pool *SnappyPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst.
func (*SnappyPool) PutReader ¶
func (pool *SnappyPool) PutReader(reader io.Reader)
PutReader places a CompressionReader back in the pool.
func (*SnappyPool) PutWriter ¶
func (pool *SnappyPool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool.
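A hedged round trip through the pooled snappy writer and reader; the payload is arbitrary.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/grafana/loki/pkg/chunkenc"
)

func main() {
	var buf bytes.Buffer

	w := chunkenc.Snappy.GetWriter(&buf)
	_, _ = w.Write([]byte("pooled snappy round trip"))
	_ = w.Close() // flush the compressed frame before reading it back
	chunkenc.Snappy.PutWriter(w)

	r := chunkenc.Snappy.GetReader(&buf)
	out, _ := io.ReadAll(r)
	chunkenc.Snappy.PutReader(r)

	fmt.Println(string(out))
}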
type WriterPool ¶
type WriterPool interface {
	GetWriter(io.Writer) io.WriteCloser
	PutWriter(io.WriteCloser)
}
WriterPool is a pool of io.Writer. This is used by every chunk to avoid unnecessary allocations.
type ZstdPool ¶
type ZstdPool struct {
// contains filtered or unexported fields
}
ZstdPool is a zstd compression pool.
func (*ZstdPool) GetReader ¶
func (pool *ZstdPool) GetReader(src io.Reader) io.Reader
GetReader gets or creates a new CompressionReader and resets it to read from src.
func (*ZstdPool) GetWriter ¶
func (pool *ZstdPool) GetWriter(dst io.Writer) io.WriteCloser
GetWriter gets or creates a new CompressionWriter and resets it to write to dst.
func (*ZstdPool) PutWriter ¶
func (pool *ZstdPool) PutWriter(writer io.WriteCloser)
PutWriter places a CompressionWriter back in the pool.