Documentation ¶
Index ¶
- Constants
- func DeleteCheckpoints(dir string, maxIndex int) error
- func LastCheckpoint(dir string) (string, int, error)
- func NewSegmentBufReader(segs ...*Segment) *segmentBufReader
- func NewSegmentBufReaderWithOffset(offset int, segs ...*Segment) (sbr *segmentBufReader, err error)
- func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error)
- func NewSegmentsReader(dir string) (io.ReadCloser, error)
- func SegmentName(dir string, i int) string
- func Segments(wlDir string) (first, last int, err error)
- type CheckpointStats
- type CorruptionErr
- type LiveReader
- type LiveReaderMetrics
- type Reader
- type Segment
- type SegmentFile
- type SegmentRange
- type WL
- func (w *WL) Close() (err error)
- func (w *WL) CompressionEnabled() bool
- func (w *WL) Dir() string
- func (w *WL) LastSegmentAndOffset() (seg, offset int, err error)
- func (w *WL) Log(recs ...[]byte) error
- func (w *WL) NextSegment() (int, error)
- func (w *WL) NextSegmentSync() (int, error)
- func (w *WL) Repair(origErr error) error
- func (w *WL) Size() (int64, error)
- func (w *WL) Sync() error
- func (w *WL) Truncate(i int) (err error)
- type Watcher
- type WatcherMetrics
- type WriteTo
Constants ¶
const ( DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB WblDirName = "wbl" )
Variables ¶
This section is empty.
Functions ¶
func DeleteCheckpoints ¶
DeleteCheckpoints deletes all checkpoints in a directory below a given index.
func LastCheckpoint ¶
LastCheckpoint returns the directory name and index of the most recent checkpoint. If dir does not contain any checkpoints, ErrNotFound is returned.
func NewSegmentBufReader ¶
func NewSegmentBufReader(segs ...*Segment) *segmentBufReader
nolint:revive // TODO: Consider exporting segmentBufReader
func NewSegmentBufReaderWithOffset ¶
nolint:revive
func NewSegmentsRangeReader ¶
func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error)
NewSegmentsRangeReader returns a new reader over the given WAL segment ranges. If first or last are -1, the range is open on the respective end.
func NewSegmentsReader ¶
func NewSegmentsReader(dir string) (io.ReadCloser, error)
NewSegmentsReader returns a new reader over all segments in the directory.
func SegmentName ¶
SegmentName builds a segment name for the directory.
Types ¶
type CheckpointStats ¶
type CheckpointStats struct { DroppedSeries int DroppedSamples int // Includes histograms. DroppedTombstones int DroppedExemplars int DroppedMetadata int TotalSeries int // Processed series including dropped ones. TotalSamples int // Processed float and histogram samples including dropped ones. TotalTombstones int // Processed tombstones including dropped ones. TotalExemplars int // Processed exemplars including dropped ones. TotalMetadata int // Processed metadata including dropped ones. }
CheckpointStats returns stats about a created checkpoint.
func Checkpoint ¶
func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.HeadSeriesRef) bool, mint int64) (*CheckpointStats, error)
Checkpoint creates a compacted checkpoint of segments in range [from, to] in the given WAL. It includes the most recent checkpoint if it exists. All series not satisfying keep, samples/tombstones/exemplars below mint and metadata that are not the latest are dropped.
The checkpoint is stored in a directory named checkpoint.N in the same segmented format as the original WAL itself. This makes it easy to read it through the WAL package and concatenate it with the original WAL.
type CorruptionErr ¶
CorruptionErr is an error that's returned when corruption is encountered.
func (*CorruptionErr) Error ¶
func (e *CorruptionErr) Error() string
type LiveReader ¶
type LiveReader struct {
// contains filtered or unexported fields
}
LiveReader reads WAL records from an io.Reader. It allows reading of WALs that are still in the process of being written, and returns records as soon as they can be read.
func NewLiveReader ¶
func NewLiveReader(logger log.Logger, metrics *LiveReaderMetrics, r io.Reader) *LiveReader
NewLiveReader returns a new live reader.
func (*LiveReader) Err ¶
func (r *LiveReader) Err() error
Err returns any errors encountered reading the WAL. io.EOFs are not terminal and Next can be tried again. Non-EOFs are terminal, and the reader should not be used again. It is up to the user to decide when to stop trying should io.EOF be returned.
func (*LiveReader) Next ¶
func (r *LiveReader) Next() bool
Next returns true if Record() will contain a full record. If Next returns false, you should always checked the contents of Error(). Return false guarantees there are no more records if the segment is closed and not corrupt, otherwise if Err() == io.EOF you should try again when more data has been written.
func (*LiveReader) Offset ¶
func (r *LiveReader) Offset() int64
Offset returns the number of bytes consumed from this segment.
func (*LiveReader) Record ¶
func (r *LiveReader) Record() []byte
Record returns the current record. The returned byte slice is only valid until the next call to Next.
type LiveReaderMetrics ¶
type LiveReaderMetrics struct {
// contains filtered or unexported fields
}
LiveReaderMetrics holds all metrics exposed by the LiveReader.
func NewLiveReaderMetrics ¶
func NewLiveReaderMetrics(reg prometheus.Registerer) *LiveReaderMetrics
NewLiveReaderMetrics instantiates, registers and returns metrics to be injected at LiveReader instantiation.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader reads WAL records from an io.Reader.
func (*Reader) Err ¶
Err returns the last encountered error wrapped in a corruption error. If the reader does not allow to infer a segment index and offset, a total offset in the reader stream will be provided.
func (*Reader) Next ¶
Next advances the reader to the next records and returns true if it exists. It must not be called again after it returned false.
type Segment ¶
type Segment struct { SegmentFile // contains filtered or unexported fields }
Segment represents a segment file.
func CreateSegment ¶
CreateSegment creates a new segment k in dir.
func OpenReadSegment ¶
OpenReadSegment opens the segment with the given filename.
func OpenWriteSegment ¶
OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
type SegmentFile ¶
type SegmentFile interface { Stat() (os.FileInfo, error) Sync() error io.Writer io.Reader io.Closer }
SegmentFile represents the underlying file used to store a segment.
type SegmentRange ¶
SegmentRange groups segments by the directory and the first and last index it includes.
type WL ¶
type WL struct {
// contains filtered or unexported fields
}
WL is a write log that stores records in segment files. It must be read from start to end once before logging new data. If an error occurs during read, the repair procedure must be called before it's safe to do further writes.
Segments are written to in pages of 32KB, with records possibly split across page boundaries. Records are never split across segments to allow full segments to be safely truncated. It also ensures that torn writes never corrupt records beyond the most recent segment.
func New ¶
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WL, error)
New returns a new WAL over the given directory.
func NewSize ¶
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WL, error)
NewSize returns a new write log over the given directory. New segments are created with the specified size.
func (*WL) CompressionEnabled ¶
CompressionEnabled returns if compression is enabled on this WAL.
func (*WL) LastSegmentAndOffset ¶
LastSegmentAndOffset returns the last segment number of the WAL and the offset in that file upto which the segment has been filled.
func (*WL) Log ¶
Log writes the records into the log. Multiple records can be passed at once to reduce writes and increase throughput.
func (*WL) NextSegment ¶
NextSegment creates the next segment and closes the previous one asynchronously. It returns the file number of the new file.
func (*WL) NextSegmentSync ¶
NextSegmentSync creates the next segment and closes the previous one in sync. It returns the file number of the new file.
func (*WL) Repair ¶
Repair attempts to repair the WAL based on the error. It discards all data after the corruption.
func (*WL) Size ¶
Size computes the size of the write log. We do this by adding the sizes of all the files under the WAL dir.
type Watcher ¶
type Watcher struct { // For testing, stop when we hit this segment. MaxSegment int // contains filtered or unexported fields }
Watcher watches the TSDB WAL for a given WriteTo.
func NewWatcher ¶
func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher
NewWatcher creates a new WAL watcher for a given WriteTo.
func (*Watcher) Run ¶
Run the watcher, which will tail the WAL until the quit channel is closed or an error case is hit.
func (*Watcher) SetStartTime ¶
type WatcherMetrics ¶
type WatcherMetrics struct {
// contains filtered or unexported fields
}
func NewWatcherMetrics ¶
func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics
type WriteTo ¶
type WriteTo interface { // Append and AppendExemplar should block until the samples are fully accepted, // whether enqueued in memory or successfully written to it's final destination. // Once returned, the WAL Watcher will not attempt to pass that data again. Append([]record.RefSample) bool AppendExemplars([]record.RefExemplar) bool AppendHistograms([]record.RefHistogramSample) bool AppendFloatHistograms([]record.RefFloatHistogramSample) bool StoreSeries([]record.RefSeries, int) // Next two methods are intended for garbage-collection: first we call // UpdateSeriesSegment on all current series UpdateSeriesSegment([]record.RefSeries, int) // Then SeriesReset is called to allow the deletion // of all series created in a segment lower than the argument. SeriesReset(int) }
WriteTo is an interface used by the Watcher to send the samples it's read from the WAL on to somewhere else. Functions will be called concurrently and it is left to the implementer to make sure they are safe.