Documentation ¶
Index ¶
- Variables
- func ReportSegmentStats(s SegmentStats, m *SegmentMetrics)
- type AppendRequest
- type AppendResult
- type Config
- type Manager
- type ManagerMetrics
- type PendingSegment
- type SegmentMetrics
- type SegmentReader
- type SegmentStats
- type SegmentWriter
- func (b *SegmentWriter) Age(now time.Time) time.Duration
- func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels, entries []*logproto.Entry, ...)
- func (b *SegmentWriter) InputSize() int64
- func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta
- func (b *SegmentWriter) Reset()
- func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error)
- type SeriesIter
- type Sizes
Constants ¶
This section is empty.
Variables ¶
var ( // ErrClosed is returned when the WAL is closed. It is a permanent error // as once closed, a WAL cannot be re-opened. ErrClosed = errors.New("WAL is closed") // ErrFull is returned when the WAL is full. It is a transient error that // happens when all segments are either in the pending list waiting to be // flushed or in the process of being flushed. ErrFull = errors.New("WAL is full") )
var (
Dir = "loki-v2/wal/anon/"
)
LOKW is the magic number for the Loki WAL format.
Functions ¶
func ReportSegmentStats ¶
func ReportSegmentStats(s SegmentStats, m *SegmentMetrics)
ReportSegmentStats reports the stats as metrics.
Types ¶
type AppendRequest ¶
type AppendResult ¶
type AppendResult struct {
// contains filtered or unexported fields
}
AppendResult contains the result of an AppendRequest.
func (*AppendResult) Done ¶
func (p *AppendResult) Done() <-chan struct{}
Done returns a channel that is closed when the result for the AppendRequest is available. Use Err() to check if the request succeeded or failed.
func (*AppendResult) Err ¶
func (p *AppendResult) Err() error
Err returns a non-nil error if the append request failed, and nil if it succeeded. It should not be called until Done() is closed to avoid data races.
func (*AppendResult) SetDone ¶
func (p *AppendResult) SetDone(err error)
SetDone closes the channel and sets the (optional) error.
type Config ¶
type Config struct { // MaxAge is the maximum amount of time a segment can be buffered in memory // before it is moved to the pending list to be flushed. Increasing MaxAge // allows more time for a segment to grow to MaxSegmentSize, but may // increase latency if appends cannot fill segments quickly enough. MaxAge time.Duration // MaxSegments is the maximum number of segments that can be buffered in // memory. Increasing MaxSegments allows more data to be buffered, but may // increase latency if the incoming volume of data exceeds the rate at // which segments can be flushed. MaxSegments int64 // MaxSegmentSize is the maximum size of an uncompressed segment in bytes. // It is not a strict limit, and segments can exceed the maximum size when // individual appends are larger than the remaining capacity. MaxSegmentSize int64 }
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager is a pool of in-memory segments. It keeps track of which segments are accepting writes and which are waiting to be flushed using two doubly linked lists called the available and pending lists.
By buffering segments in memory, the WAL can tolerate bursts of writes that arrive faster than can be flushed. The amount of data that can be buffered is configured using MaxSegments and MaxSegmentSize. You must use caution when configuring these to avoid excessive latency.
The WAL is full when all segments are waiting to be flushed or in the process of being flushed. When the WAL is full, subsequent appends fail with the transient error ErrFull, and will not succeed until one or more other segments have been flushed and returned to the available list. Callers should back off and retry at a later time.
On shutdown, the WAL must be closed to avoid losing data. This prevents additional appends to the WAL and allows all remaining segments to be flushed.
func NewManager ¶
func NewManager(cfg Config, metrics *ManagerMetrics) (*Manager, error)
func (*Manager) Append ¶
func (m *Manager) Append(r AppendRequest) (*AppendResult, error)
func (*Manager) NextPending ¶
func (m *Manager) NextPending() (*PendingSegment, error)
NextPending returns the next segment to be flushed from the pending list. It returns nil if there are no segments waiting to be flushed. If the WAL is closed it returns all remaining segments from the pending list and then ErrClosed.
func (*Manager) Put ¶
func (m *Manager) Put(s *PendingSegment)
Put resets the segment and puts it back in the available list to accept writes. A PendingSegment should not be put back until it has been flushed.
type ManagerMetrics ¶
type ManagerMetrics struct { NumAvailable prometheus.Gauge NumFlushing prometheus.Gauge NumPending prometheus.Gauge }
func NewManagerMetrics ¶
func NewManagerMetrics(r prometheus.Registerer) *ManagerMetrics
type PendingSegment ¶
type PendingSegment struct { Result *AppendResult Writer *SegmentWriter }
PendingSegment contains a result and the segment to be flushed.
type SegmentMetrics ¶
type SegmentMetrics struct {
// contains filtered or unexported fields
}
func NewSegmentMetrics ¶
func NewSegmentMetrics(r prometheus.Registerer) *SegmentMetrics
type SegmentReader ¶
type SegmentReader struct {
// contains filtered or unexported fields
}
func NewReader ¶
func NewReader(b []byte) (*SegmentReader, error)
func (*SegmentReader) Series ¶
func (r *SegmentReader) Series(ctx context.Context) (*SeriesIter, error)
func (*SegmentReader) Sizes ¶
func (r *SegmentReader) Sizes() (Sizes, error)
type SegmentStats ¶
type SegmentStats struct { // Age is the time between the first append and the flush. Age time.Duration // Idle is the time between the last append and the flush. Idle time.Duration Streams int Tenants int Size int64 WriteSize int64 }
SegmentStats contains the stats for a SegmentWriter.
func GetSegmentStats ¶
func GetSegmentStats(w *SegmentWriter, t time.Time) SegmentStats
GetSegmentStats returns the stats for a SegmentWriter. The age of a segment is calculated from t. WriteSize is zero if GetSegmentStats is called before SegmentWriter.WriteTo.
type SegmentWriter ¶
type SegmentWriter struct {
// contains filtered or unexported fields
}
func NewWalSegmentWriter ¶
func NewWalSegmentWriter() (*SegmentWriter, error)
NewWalSegmentWriter creates a new WalSegmentWriter.
func (*SegmentWriter) Age ¶
func (b *SegmentWriter) Age(now time.Time) time.Duration
Age returns the age of the segment.
func (*SegmentWriter) Append ¶
func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels, entries []*logproto.Entry, now time.Time)
Labels are passed a string `{foo="bar",baz="qux"}` `{foo="foo",baz="foo"}`. labels.Labels => Symbols foo, baz , qux
func (*SegmentWriter) InputSize ¶
func (b *SegmentWriter) InputSize() int64
InputSize returns the total size of the input data written to the writer. It doesn't account for timestamps and labels.
func (*SegmentWriter) Meta ¶
func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta
func (*SegmentWriter) Reset ¶
func (b *SegmentWriter) Reset()
Reset clears the writer. After calling Reset, the writer can be reused.
type SeriesIter ¶
type SeriesIter struct {
// contains filtered or unexported fields
}
func NewSeriesIter ¶
func (*SeriesIter) At ¶
func (iter *SeriesIter) At() labels.Labels
func (*SeriesIter) ChunkReader ¶
func (iter *SeriesIter) ChunkReader(_ *chunks.ChunkReader) (*chunks.ChunkReader, error)
func (*SeriesIter) Err ¶
func (iter *SeriesIter) Err() error
func (*SeriesIter) Next ¶
func (iter *SeriesIter) Next() bool