Documentation ¶
Overview ¶
Package wal implements a write-ahead log (WAL) optimized for write throughput that can be put in front of the database index.
The WAL is broken into different partitions; the default number of partitions is 5. Each partition consists of a number of segment files. By default, a segment file grows to 2MB before a new one is opened. Segment files are numbered starting at 1, and the number gives the order in which the files must be read on startup so that data is recovered in the same order it was written.
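The startup ordering described above can be sketched as a numeric sort over segment file names. This is only an illustration: the `<number>.wal` naming scheme and the helpers `segmentID` and `sortSegments` are assumptions made for this sketch, not part of the package API.

```go
package main

import (
	"path/filepath"
	"sort"
	"strconv"
	"strings"
)

// segmentID extracts the numeric ID from a segment file name such as
// "000012.wal". The naming scheme is an assumption made for this sketch.
func segmentID(name string) (uint64, error) {
	base := filepath.Base(name)
	return strconv.ParseUint(strings.TrimSuffix(base, filepath.Ext(base)), 10, 64)
}

// sortSegments returns the names ordered by ascending segment ID, which is
// the order in which the files would have to be replayed on startup.
func sortSegments(names []string) ([]string, error) {
	type seg struct {
		id   uint64
		name string
	}
	segs := make([]seg, 0, len(names))
	for _, n := range names {
		id, err := segmentID(n)
		if err != nil {
			return nil, err
		}
		segs = append(segs, seg{id: id, name: n})
	}
	sort.Slice(segs, func(i, j int) bool { return segs[i].id < segs[j].id })

	out := make([]string, len(segs))
	for i, s := range segs {
		out[i] = s.name
	}
	return out, nil
}
```

Sorting by the parsed number rather than by the raw string matters when names are not zero-padded: a plain string sort would place `10.wal` before `2.wal`.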
Partitions are flushed and compacted individually. One of the goals with having multiple partitions was to be able to flush only a portion of the WAL at a time.
The WAL does not flush everything in a partition when a flush is triggered. It flushes only the series that are over a given size threshold (32KB by default). The remaining series are written into a new segment file so they can be flushed later. This is similar to a compaction in an LSM tree.
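A minimal sketch of that selective flush, assuming a hypothetical in-memory cache keyed by series. The names `readySeriesSize` and `splitForFlush` are illustrative and do not come from the package.

```go
package main

// readySeriesSize mirrors the 32KB default threshold described above.
const readySeriesSize = 32 * 1024

// splitForFlush is a hypothetical helper. It partitions the cached entries
// into series that are over the threshold (to be flushed to the index) and
// series that stay in the WAL (to be rewritten into a new segment file).
func splitForFlush(cache map[string][][]byte, threshold int) (flush, keep map[string][][]byte) {
	flush = make(map[string][][]byte)
	keep = make(map[string][][]byte)
	for key, entries := range cache {
		size := 0
		for _, e := range entries {
			size += len(e)
		}
		if size > threshold {
			flush[key] = entries
		} else {
			keep[key] = entries
		}
	}
	return flush, keep
}
```

The `keep` map stands in for the data that would be carried over into the new segment file, which is what makes the operation resemble an LSM compaction.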
Index ¶
- Constants
- Variables
- func DecodeKeyValue(fields []string, dec *tsdb.FieldCodec, k, v []byte) (key int64, value interface{})
- func MarshalEntry(timestamp int64, data []byte) []byte
- func UnmarshalEntry(buf []byte) (timestamp int64, data []byte)
- type IndexWriter
- type Log
- func (l *Log) Close() error
- func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor
- func (l *Log) DeleteSeries(keys []string) error
- func (l *Log) DiskSize() (int64, error)
- func (l *Log) Flush() error
- func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, ...) error
- func (l *Log) Open() error
- func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, ...) error
- type Partition
Constants ¶
const (
	// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
	DefaultSegmentSize = 2 * 1024 * 1024

	// FileExtension is the file extension we expect for wal segments
	FileExtension = "wal"

	// MetaFileExtension is the file extension for the log files of new fields and measurements that get created
	MetaFileExtension = "meta"

	// CompactionExtension is the file extension we expect for compaction files
	CompactionExtension = "CPT"

	// MetaFlushInterval is the period after which any compressed meta data in the .meta file will get
	// flushed to the index
	MetaFlushInterval = 10 * time.Minute

	// FailWriteMemoryThreshold will start returning errors on writes if the memory gets more
	// than this multiple above the maximum threshold. This is set to 5 because previously
	// the memory threshold was for 5 partitions, but when this was introduced the partition
	// count was reduced to 1 so we know that it can handle at least this much extra memory
	FailWriteMemoryThreshold = 5
)
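One way to read the FailWriteMemoryThreshold comment is as a guard that rejects writes once memory use passes the configured maximum multiplied by this constant. The sketch below assumes that reading. The function `checkWriteMemory`, the error value, and both parameters are hypothetical and do not appear in the package.

```go
package main

import "errors"

// failWriteMemoryThreshold mirrors the FailWriteMemoryThreshold constant.
const failWriteMemoryThreshold = 5

// errMemoryExceeded is a hypothetical error used only in this sketch.
var errMemoryExceeded = errors.New("wal: memory limit exceeded, write rejected")

// checkWriteMemory returns an error when memoryInUse is more than
// failWriteMemoryThreshold times maxMemory. Both values are in bytes.
func checkWriteMemory(memoryInUse, maxMemory uint64) error {
	if memoryInUse > maxMemory*failWriteMemoryThreshold {
		return errMemoryExceeded
	}
	return nil
}
```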
Variables ¶
var (
	// ErrCompactionRunning to return if we attempt to run a compaction on a partition that is currently running one
	ErrCompactionRunning = errors.New("compaction running")

	// ErrMemoryCompactionDone gets returned if we called to flushAndCompact to free up memory
	// but a compaction has already been done to do so
	ErrMemoryCompactionDone = errors.New("compaction already run to free up memory")

	// CompactSequence is the byte sequence within a segment file that has been compacted
	// that indicates the start of a compaction marker
	CompactSequence = []byte{0xFF, 0xFF}
)
Functions ¶
func DecodeKeyValue ¶
func DecodeKeyValue(fields []string, dec *tsdb.FieldCodec, k, v []byte) (key int64, value interface{})
DecodeKeyValue decodes the key and value from bytes.
func MarshalEntry ¶
func MarshalEntry(timestamp int64, data []byte) []byte
MarshalEntry encodes the timestamp and data to a single byte slice.
The format of the byte slice is:
uint64 timestamp
[]byte data
func UnmarshalEntry ¶
func UnmarshalEntry(buf []byte) (timestamp int64, data []byte)
UnmarshalEntry returns the timestamp and data from an encoded byte slice.
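The entry layout described for MarshalEntry (an 8-byte timestamp followed by the raw data) can be illustrated with a round-trip sketch. The big-endian byte order is an assumption, since the documentation does not state it, and the lowercase `marshalEntry` and `unmarshalEntry` are stand-ins written for this example rather than the package's own functions. Unlike UnmarshalEntry, the stand-in decoder returns an error for short input.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// marshalEntry writes the timestamp as 8 bytes followed by the data.
// Big-endian byte order is an assumption made for this sketch.
func marshalEntry(timestamp int64, data []byte) []byte {
	buf := make([]byte, 8+len(data))
	binary.BigEndian.PutUint64(buf[:8], uint64(timestamp))
	copy(buf[8:], data)
	return buf
}

// unmarshalEntry reverses marshalEntry. The returned data slice aliases buf.
func unmarshalEntry(buf []byte) (int64, []byte, error) {
	if len(buf) < 8 {
		return 0, nil, errors.New("entry shorter than the 8-byte timestamp")
	}
	return int64(binary.BigEndian.Uint64(buf[:8])), buf[8:], nil
}
```

Converting through `uint64` preserves the bit pattern, so negative timestamps survive the round trip as well.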
Types ¶
type IndexWriter ¶
type IndexWriter interface {
	// time ascending points where each byte array is:
	//   int64 time
	//   data
	WriteIndex(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
}
IndexWriter is an interface for the indexed database the WAL flushes data to.
type Log ¶
type Log struct {
	// LogOutput is the writer used by the logger.
	LogOutput io.Writer

	// FlushColdInterval is the period of time after which a partition will do a
	// full flush and compaction if it has been cold for writes.
	FlushColdInterval time.Duration

	// SegmentSize is the file size at which a segment file will be rotated in a partition.
	SegmentSize int64

	// MaxSeriesSize controls when a partition should get flushed to index and compacted
	// if any series in the partition has exceeded this size threshold
	MaxSeriesSize int

	// ReadySeriesSize is the minimum size a series of points must get to before getting flushed.
	ReadySeriesSize int

	// CompactionThreshold controls when a partition will be flushed. Once this
	// percentage of series in a partition are ready, a flush and compaction will be triggered.
	CompactionThreshold float64

	// PartitionSizeThreshold specifies when a partition should be forced to be flushed.
	PartitionSizeThreshold uint64

	// Index is the database that series data gets flushed to once it gets compacted
	// out of the WAL.
	Index IndexWriter

	// LoggingEnabled specifies if detailed logs should be output
	LoggingEnabled bool
	// contains filtered or unexported fields
}
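The field comments on MaxSeriesSize, ReadySeriesSize, CompactionThreshold, and PartitionSizeThreshold describe three separate flush triggers. The sketch below shows one way they could combine. It is an interpretation, not the package's implementation: `flushConfig` and `shouldFlush` are hypothetical, and CompactionThreshold is assumed to be a fraction between 0 and 1.

```go
package main

// flushConfig copies the flush-related fields of Log for this sketch.
type flushConfig struct {
	MaxSeriesSize          int
	ReadySeriesSize        int
	CompactionThreshold    float64 // assumed to be a fraction in [0, 1]
	PartitionSizeThreshold uint64
}

// shouldFlush reports whether a partition would be flushed, given the
// in-memory size in bytes of each series it holds.
func shouldFlush(cfg flushConfig, seriesSizes map[string]int) bool {
	if len(seriesSizes) == 0 {
		return false
	}
	var total uint64
	ready := 0
	for _, size := range seriesSizes {
		if size > cfg.MaxSeriesSize {
			return true // a single oversized series triggers a flush
		}
		if size >= cfg.ReadySeriesSize {
			ready++
		}
		total += uint64(size)
	}
	if total > cfg.PartitionSizeThreshold {
		return true // the partition as a whole is too large
	}
	// Enough of the series are ready to make a flush worthwhile.
	return float64(ready)/float64(len(seriesSizes)) >= cfg.CompactionThreshold
}
```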
func (*Log) Cursor ¶
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor
Cursor returns a cursor over the WAL cache for the given series. The cursor supports Seek and iteration with Next.
func (*Log) DeleteSeries ¶
func (l *Log) DeleteSeries(keys []string) error
DeleteSeries will flush the metadata that is in the WAL to the index and remove all series specified from the cache and the segment files in each partition. This will block all writes while a compaction is done against all partitions. This function is meant to be called by bz1 BEFORE it updates its own index, since the metadata is flushed here first.
func (*Log) LoadMetadataIndex ¶
func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
LoadMetadataIndex loads the new series and fields files into memory and flushes them to the BoltDB index. This function should be called before calling Open().
func (*Log) Open ¶
func (l *Log) Open() error
Open opens and initializes the Log. It will recover from previous unclosed shutdowns.
func (*Log) WritePoints ¶
func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error
type Partition ¶
type Partition struct {
// contains filtered or unexported fields
}
Partition is a set of files for a partition of the WAL. Multiple partitions are used so that when a compaction occurs, only a portion of the WAL must be flushed and compacted.