Documentation ¶
Index ¶
- Constants
- Variables
- func ErrTooFarBehind(entryTs, cutoff time.Time) error
- func IsErrTooFarBehind(err error) bool
- func IsOutOfOrderErr(err error) bool
- func NewFacade(c Chunk, blockSize, targetSize int) chunk.Data
- func UncompressedSize(c chunk.Data) (int, bool)
- type Block
- type Chunk
- type Facade
- func (f Facade) Bounds() (time.Time, time.Time)
- func (Facade) Encoding() chunk.Encoding
- func (f Facade) Entries() int
- func (f Facade) LokiChunk() Chunk
- func (f Facade) Marshal(w io.Writer) error
- func (f Facade) Rebound(start, end model.Time, filter filter.Func) (chunk.Data, error)
- func (f Facade) Size() int
- func (f Facade) UncompressedSize() int
- func (f *Facade) UnmarshalFromBuf(buf []byte) error
- func (f Facade) Utilization() float64
- type HeadBlock
- type HeadBlockFmt
- type MemChunk
- func (c *MemChunk) Append(entry *logproto.Entry) (bool, error)
- func (c *MemChunk) BlockCount() int
- func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block
- func (c *MemChunk) Bounds() (fromT, toT time.Time)
- func (c *MemChunk) Bytes() ([]byte, error)
- func (c *MemChunk) BytesSize() int
- func (c *MemChunk) BytesWith(b []byte) ([]byte, error)
- func (c *MemChunk) CheckpointSize() (chunk, head int)
- func (c *MemChunk) Close() error
- func (c *MemChunk) CompressedSize() int
- func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
- func (c *MemChunk) Encoding() compression.Codec
- func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, ...) (iter.EntryIterator, error)
- func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error)
- func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, ...) iter.SampleIterator
- func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error
- func (c *MemChunk) Size() int
- func (c *MemChunk) SpaceFor(e *logproto.Entry) bool
- func (c *MemChunk) UncompressedSize() int
- func (c *MemChunk) Utilization() float64
- func (c *MemChunk) WriteTo(w io.Writer) (int64, error)
Constants ¶
const ( ChunkFormatV1 byte ChunkFormatV2 ChunkFormatV3 ChunkFormatV4 )
const GzipLogChunk = chunk.Encoding(128)
GzipLogChunk is a cortex encoding type for our chunks. Deprecated: the chunk encoding/compression format is inside the chunk data.
const LogChunk = chunk.Encoding(129)
LogChunk is a cortex encoding type for our chunks.
Variables ¶
var ( ErrChunkFull = errors.New("chunk full") ErrOutOfOrder = errors.New("entry out of order") ErrInvalidSize = errors.New("invalid size") ErrInvalidFlag = errors.New("invalid flag") ErrInvalidChecksum = errors.New("invalid chunk checksum") )
Errors returned by the chunk interface.
var ( // BytesBufferPool is a bytes buffer used for lines decompressed. // Buckets [0.5KB,1KB,2KB,4KB,8KB] BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) // LabelsPool is a matrix of bytes buffers used to store label names and values. // Buckets [8, 16, 32, 64, 128, 256]. // Since we store label names and values, the number of labels we can store is the half the bucket size. // So we will be able to store from 0 to 128 labels. LabelsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([][]byte, 0, size) }) SymbolsPool = pool.New(1<<3, 1<<8, 2, func(size int) interface{} { return make([]symbol, 0, size) }) // SamplesPool pooling array of samples [512,1024,...,16k] SamplesPool = pool.New(1<<9, 1<<14, 2, func(size int) interface{} { return make([]logproto.Sample, 0, size) }) // EncodeBufferPool is a pool used to binary encode. EncodeBufferPool = sync.Pool{ New: func() interface{} { return &encbuf{ b: make([]byte, 0, 256), } }, } )
var HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt, UnorderedWithStructuredMetadataHeadBlockFmt}
Functions ¶
func ErrTooFarBehind ¶
func IsErrTooFarBehind ¶
func IsOutOfOrderErr ¶
Types ¶
type Block ¶
type Block interface { // MinTime is the minimum time of entries in the block MinTime() int64 // MaxTime is the maximum time of entries in the block MaxTime() int64 // Offset is the offset/position of the block in the chunk. Offset is unique for a given block per chunk. Offset() int // Entries is the amount of entries in the block. Entries() int // Iterator returns an entry iterator for the block. Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator // SampleIterator returns a sample iterator for the block. SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator }
Block is a chunk block.
type Chunk ¶
type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool // Append returns true if the entry appended was a duplicate Append(*logproto.Entry) (bool, error) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator // Returns the list of blocks in the chunks. Blocks(mintT, maxtT time.Time) []Block // Size returns the number of entries in a chunk Size() int Bytes() ([]byte, error) BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation io.WriterTo BlockCount() int Utilization() float64 UncompressedSize() int CompressedSize() int Close() error Encoding() compression.Codec Rebound(start, end time.Time, filter filter.Func) (Chunk, error) }
Chunk is the interface for the compressed logs chunk format.
func NewDumbChunk ¶
func NewDumbChunk() Chunk
NewDumbChunk returns a new chunk that isn't very good.
type Facade ¶
Facade for compatibility with cortex chunk type, so we can use its chunk store.
func (Facade) Size ¶
Size implements encoding.Chunk, which unfortunately uses the Size method to refer to the byte size and not the entry count like chunkenc.Chunk does.
func (Facade) UncompressedSize ¶
func (*Facade) UnmarshalFromBuf ¶
UnmarshalFromBuf implements chunk.Chunk.
func (Facade) Utilization ¶
Utilization implements encoding.Chunk.
type HeadBlock ¶
type HeadBlock interface { IsEmpty() bool CheckpointTo(w io.Writer) error CheckpointBytes(b []byte) ([]byte, error) CheckpointSize() int LoadBytes(b []byte) error Serialise(pool compression.WriterPool) ([]byte, error) Reset() Bounds() (mint, maxt int64) Entries() int UncompressedSize() int Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error) Append(int64, string, labels.Labels) (bool, error) Iterator( ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline, ) iter.EntryIterator SampleIterator( ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor, ) iter.SampleIterator Format() HeadBlockFmt }
func HeadFromCheckpoint ¶
func HeadFromCheckpoint(b []byte, desiredIfNotUnordered HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error)
HeadFromCheckpoint handles reading any head block format and returning the desired form. This is particularly helpful replaying WALs from different configurations such as after enabling unordered writes.
type HeadBlockFmt ¶
type HeadBlockFmt byte
const ( OrderedHeadBlockFmt HeadBlockFmt UnorderedHeadBlockFmt UnorderedWithStructuredMetadataHeadBlockFmt )
func ChunkHeadFormatFor ¶
func ChunkHeadFormatFor(chunkfmt byte) HeadBlockFmt
ChunkHeadFormatFor returns corresponding head block format for the given `chunkfmt`.
func (HeadBlockFmt) Byte ¶
func (f HeadBlockFmt) Byte() byte
func (HeadBlockFmt) NewBlock ¶
func (f HeadBlockFmt) NewBlock(symbolizer *symbolizer) HeadBlock
func (HeadBlockFmt) String ¶
func (f HeadBlockFmt) String() string
type MemChunk ¶
type MemChunk struct {
// contains filtered or unexported fields
}
MemChunk implements compressed log chunks.
func MemchunkFromCheckpoint ¶
func NewByteChunk ¶
NewByteChunk returns a MemChunk on the passed bytes.
func NewMemChunk ¶
func NewMemChunk(chunkFormat byte, enc compression.Codec, head HeadBlockFmt, blockSize, targetSize int) *MemChunk
NewMemChunk returns a new in-mem chunk.
func (*MemChunk) Append ¶
Append implements Chunk. The MemChunk may return true or false, depending on what the head block returns.
func (*MemChunk) Bytes ¶
Bytes implements Chunk. NOTE: Does not cut head block or include any head block data.
func (*MemChunk) BytesSize ¶
BytesSize returns the raw size of the chunk. NOTE: This does not account for the head block nor include any head block data.
func (*MemChunk) BytesWith ¶
BytesWith uses a provided []byte for buffer instantiation NOTE: This does not cut the head block nor include any head block data.
func (*MemChunk) CheckpointSize ¶
func (*MemChunk) CompressedSize ¶
CompressedSize implements Chunk.
func (*MemChunk) ConvertHead ¶
func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error
func (*MemChunk) Encoding ¶
func (c *MemChunk) Encoding() compression.Codec
Encoding implements Chunk.
func (*MemChunk) Iterator ¶
func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
Iterator implements Chunk.
func (*MemChunk) Rebound ¶
Rebound builds a smaller chunk with logs having timestamp from start and end(both inclusive)
func (*MemChunk) SampleIterator ¶
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
Iterator implements Chunk.
func (*MemChunk) SerializeForCheckpointTo ¶
SerializeForCheckpointTo serialize the chunk & head into different `io.Writer` for checkpointing use. This is to ensure eventually flushed chunks don't have different substructures depending on when they were checkpointed. In turn this allows us to maintain a more effective dedupe ratio in storage.
func (*MemChunk) UncompressedSize ¶
UncompressedSize implements Chunk.
func (*MemChunk) Utilization ¶
Utilization implements Chunk.