Documentation ¶
Index ¶
- Constants
- Variables
- func DecodeBlock(block []byte, vals *[]Value) error
- func NewCombinedEngineCursor(wc, ec tsdb.Cursor, ascending bool) tsdb.Cursor
- func NewDataFile(f *os.File) (*dataFile, error)
- func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
- func NewMultiFieldCursor(fields []string, cursors []tsdb.Cursor, ascending bool) tsdb.Cursor
- func NewTSMReader(r io.ReadSeeker) (*tsmReader, error)
- func SeriesFieldKey(seriesKey, field string) string
- func ZigZagDecode(v uint64) int64
- func ZigZagEncode(x int64) uint64
- type BoolDecoder
- type BoolEncoder
- type BoolValue
- type EmptyValue
- type Engine
- func (e *Engine) Begin(writable bool) (tsdb.Tx, error)
- func (e *Engine) Close() error
- func (e *Engine) Compact(fullCompaction bool) error
- func (e *Engine) DataFileCount() int
- func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error)
- func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error
- func (e *Engine) DeleteSeries(seriesKeys []string) error
- func (e *Engine) Format() tsdb.EngineFormat
- func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, ...) error
- func (e *Engine) MarkDeletes(keys []string)
- func (e *Engine) MarkMeasurementDelete(name string)
- func (e *Engine) Open() error
- func (e *Engine) Path() string
- func (e *Engine) PerformMaintenance()
- func (e *Engine) SeriesCount() (n int, err error)
- func (e *Engine) SetLogOutput(w io.Writer)
- func (e *Engine) Write(pointsByKey map[string]Values, ...) error
- func (e *Engine) WritePoints(points []models.Point, ...) error
- func (e *Engine) WriteTo(w io.Writer) (n int64, err error)
- type FloatDecoder
- type FloatEncoder
- type FloatValue
- type IndexEntry
- type IndexWriter
- type Int64Decoder
- type Int64Encoder
- type Int64Value
- type Log
- func (l *Log) Close() error
- func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor
- func (l *Log) DeleteMeasurement(measurement string, keys []string) error
- func (l *Log) DeleteSeries(keys []string) error
- func (l *Log) Flush() error
- func (l *Log) LastWriteTime() time.Time
- func (l *Log) Open() error
- func (l *Log) Path() string
- func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, ...) error
- type StringDecoder
- type StringEncoder
- type StringValue
- type TSMIndex
- type TSMWriter
- type TimeDecoder
- type TimeEncoder
- type TimePrecision
- type Value
- type Values
- type WriteLock
Constants ¶
const ( // MagicNumber is written as the first 4 bytes of a data file to // identify the file as a tsm1 formatted file MagicNumber uint32 = 0x16D116D1 Version byte = 1 )
const ( // BlockFloat64 designates a block encodes float64 values BlockFloat64 = 0 // BlockInt64 designates a block encodes int64 values BlockInt64 = 1 // BlockBool designates a block encodes bool values BlockBool = 2 // BlockString designates a block encodes string values BlockString = 3 )
const ( // Format is the file format name of this engine. Format = "tsm1" //IDsFileExtension is the extension for the file that keeps the compressed map // of keys to uint64 IDs. IDsFileExtension = "ids" // FieldsFileExtension is the extension for the file that stores compressed field // encoding data for this db FieldsFileExtension = "fields" // SeriesFileExtension is the extension for the file that stores the compressed // series metadata for series in this db SeriesFileExtension = "series" // CollisionsFileExtension is the extension for the file that keeps a map of which // keys have hash collisions and what their actual IDs are CollisionsFileExtension = "collisions" // CheckpointExtension is the extension given to files that checkpoint a rewrite or compaction. // The checkpoint files are created when a new file is first created. They // are removed after the file has been synced and is safe for use. If a file // has an associated checkpoint file, it wasn't safely written and both should be removed CheckpointExtension = "check" // CompactionExtension is the extension given to the file that marks when a compaction has been // fully written, but the compacted files have not yet been deleted. It is used for cleanup // if the server was not cleanly shutdown before the compacted files could be deleted. CompactionExtension = "compact" )
const ( MaxDataFileSize = 1024 * 1024 * 1024 * 2 // 2GB DefaultRotateFileSize = 5 * 1024 * 1024 // 5MB DefaultMaxPointsPerBlock = 1000 // MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall MAP_POPULATE = 0x8000 )
const ( // DefaultSegmentSize of 2MB is the size at which segment files will be rolled over DefaultSegmentSize = 2 * 1024 * 1024 // WALFileExtension is the file extension we expect for wal segments WALFileExtension = "wal" WALFilePrefix = "_" )
Variables ¶
var ErrWALClosed = fmt.Errorf("WAL closed")
Functions ¶
func DecodeBlock ¶
DecodeBlock takes a byte array and will decode into values of the appropriate type based on the block
func NewCombinedEngineCursor ¶
NewCombinedEngineCursor returns a Cursor that joins wc and ec. Values from wc take precedence over ec when identical timestamps are returned.
func NewDataFile ¶
func NewMultiFieldCursor ¶
NewMultiFieldCursor returns an instance of Cursor that joins the results of cursors.
func NewTSMReader ¶
func NewTSMReader(r io.ReadSeeker) (*tsmReader, error)
func SeriesFieldKey ¶
SeriesFieldKey combines a series key and field name into a unique string to be hashed to a numeric ID.
func ZigZagDecode ¶
ZigZagDecode converts a previously zigzag encoded uint64 back to an int64
func ZigZagEncode ¶
ZigZagEncode converts an int64 to a uint64 by zig zagging negative and positive values across even and odd numbers. Eg. [0,-1,1,-2] becomes [0, 1, 2, 3]
Types ¶
type BoolDecoder ¶
BoolDecoder decodes a series of bools from an in-memory buffer.
func NewBoolDecoder ¶
func NewBoolDecoder(b []byte) BoolDecoder
NewBoolDecoder returns a new instance of BoolDecoder.
type BoolEncoder ¶
BoolEncoder encodes a series of bools to an in-memory buffer.
func NewBoolEncoder ¶
func NewBoolEncoder() BoolEncoder
NewBoolEncoder returns a new instance of BoolEncoder.
type EmptyValue ¶
type EmptyValue struct { }
func (*EmptyValue) Size ¶
func (e *EmptyValue) Size() int
func (*EmptyValue) Time ¶
func (e *EmptyValue) Time() time.Time
func (*EmptyValue) UnixNano ¶
func (e *EmptyValue) UnixNano() int64
func (*EmptyValue) Value ¶
func (e *EmptyValue) Value() interface{}
type Engine ¶
type Engine struct { // HashSeriesField is a function that takes a series key and a field name // and returns a hash identifier. It's not guaranteed to be unique. HashSeriesField func(key string) uint64 WAL *Log RotateFileSize uint32 MaxFileSize uint32 SkipCompaction bool CompactionAge time.Duration MinCompactionFileCount int IndexCompactionFullAge time.Duration IndexMinCompactionInterval time.Duration MaxPointsPerBlock int // contains filtered or unexported fields }
Engine represents a storage engine with compressed blocks.
func (*Engine) Compact ¶
Compact will compact data files in the directory into the fewest possible data files they can be combined into
func (*Engine) DataFileCount ¶
DataFileCount returns the number of data files in the database
func (*Engine) DecodeAndCombine ¶
func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error)
DecodeAndCombine takes an encoded block from a file, decodes it, and interleaves the file values with the values passed in. nextTime and hasFutureBlock describe whether the file has future encoded blocks, so that this method can know how many of its values can be combined and output in the resulting encoded block.
func (*Engine) DeleteMeasurement ¶
DeleteMeasurement deletes a measurement and all related series.
func (*Engine) DeleteSeries ¶
DeleteSeries deletes the series from the engine.
func (*Engine) Format ¶
func (e *Engine) Format() tsdb.EngineFormat
Format returns the format type of this engine
func (*Engine) LoadMetadataIndex ¶
func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
LoadMetadataIndex loads the shard metadata into memory.
func (*Engine) MarkDeletes ¶
MarkDeletes will mark the given keys for deletion in memory. They will be deleted from data files on the next flush. This is mainly for the WAL to use on startup.
func (*Engine) MarkMeasurementDelete ¶
func (*Engine) PerformMaintenance ¶
func (e *Engine) PerformMaintenance()
PerformMaintenance is for periodic maintenance of the store. A no-op for b1
func (*Engine) SeriesCount ¶
SeriesCount returns the number of series buckets on the shard.
func (*Engine) Write ¶
func (e *Engine) Write(pointsByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
func (*Engine) WritePoints ¶
func (e *Engine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
WritePoints writes metadata and point data into the engine. Returns an error if new points are added to an existing key.
type FloatDecoder ¶
type FloatDecoder struct {
// contains filtered or unexported fields
}
FloatDecoder decodes a byte slice into multiple float64 values
func NewFloatDecoder ¶
func NewFloatDecoder(b []byte) (*FloatDecoder, error)
func (*FloatDecoder) Error ¶
func (it *FloatDecoder) Error() error
func (*FloatDecoder) Next ¶
func (it *FloatDecoder) Next() bool
func (*FloatDecoder) Values ¶
func (it *FloatDecoder) Values() float64
type FloatEncoder ¶
type FloatEncoder struct {
// contains filtered or unexported fields
}
FloatEncoder encodes multiple float64s into a byte slice
func NewFloatEncoder ¶
func NewFloatEncoder() *FloatEncoder
func (*FloatEncoder) Bytes ¶
func (s *FloatEncoder) Bytes() ([]byte, error)
func (*FloatEncoder) Finish ¶
func (s *FloatEncoder) Finish()
func (*FloatEncoder) Push ¶
func (s *FloatEncoder) Push(v float64)
type FloatValue ¶
type FloatValue struct {
// contains filtered or unexported fields
}
func (*FloatValue) Size ¶
func (f *FloatValue) Size() int
func (*FloatValue) Time ¶
func (f *FloatValue) Time() time.Time
func (*FloatValue) UnixNano ¶
func (f *FloatValue) UnixNano() int64
func (*FloatValue) Value ¶
func (f *FloatValue) Value() interface{}
type IndexEntry ¶
type IndexEntry struct {
// The min and max time of all points stored in the block.
MinTime, MaxTime time.Time
// The absolute position in the file where this block is located.
Offset int64
// The size in bytes of the block in the file.
Size uint32
}
IndexEntry is the index information for a given block in a TSM file.
func (*IndexEntry) Contains ¶
func (e *IndexEntry) Contains(t time.Time) bool
Contains returns true if this IndexEntry may contain values for the given time. The min and max times are inclusive.
func (*IndexEntry) UnmarshalBinary ¶
func (e *IndexEntry) UnmarshalBinary(b []byte) error
type IndexWriter ¶
type IndexWriter interface { Write(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error MarkDeletes(keys []string) MarkMeasurementDelete(name string) }
IndexWriter is an interface for the indexed database the WAL flushes data to
type Int64Decoder ¶
Int64Decoder decodes a byte slice into int64s
func NewInt64Decoder ¶
func NewInt64Decoder(b []byte) Int64Decoder
type Int64Encoder ¶
Int64Encoder encodes int64s into byte slices
func NewInt64Encoder ¶
func NewInt64Encoder() Int64Encoder
type Int64Value ¶
type Int64Value struct {
// contains filtered or unexported fields
}
func (*Int64Value) Size ¶
func (v *Int64Value) Size() int
func (*Int64Value) String ¶
func (v *Int64Value) String() string
func (*Int64Value) Time ¶
func (v *Int64Value) Time() time.Time
func (*Int64Value) UnixNano ¶
func (v *Int64Value) UnixNano() int64
func (*Int64Value) Value ¶
func (v *Int64Value) Value() interface{}
type Log ¶
type Log struct { // LogOutput is the writer used by the logger. LogOutput io.Writer // FlushColdInterval is the period of time after which a partition will do a // full flush and compaction if it has been cold for writes. FlushColdInterval time.Duration // SegmentSize is the file size at which a segment file will be rotated SegmentSize int // FlushMemorySizeThreshold specifies when the log should be forced to be flushed FlushMemorySizeThreshold int // MaxMemorySizeThreshold specifies the limit at which writes to the WAL should be rejected MaxMemorySizeThreshold int // IndexWriter is the database series will be flushed to IndexWriter IndexWriter // LoggingEnabled specifies if detailed logs should be output LoggingEnabled bool // SkipCache specifies if the wal should immediately write to the index instead of // caching data in memory. False by default so we buffer in memory before flushing to index. SkipCache bool // SkipDurability specifies if the wal should not write the wal entries to disk. // False by default which means all writes are durable even when cached before flushing to index. SkipDurability bool // contains filtered or unexported fields }
func (*Log) Cursor ¶
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor
Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given series and fields. This should only ever be called by the engine cursor method, which will always give it exactly one field.
func (*Log) DeleteMeasurement ¶
func (*Log) DeleteSeries ¶
func (*Log) LastWriteTime ¶
func (*Log) Open ¶
Open opens and initializes the Log. It will recover from previous unclean shutdowns.
func (*Log) WritePoints ¶
func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error
type StringDecoder ¶
func NewStringDecoder ¶
func NewStringDecoder(b []byte) (StringDecoder, error)
type StringEncoder ¶
func NewStringEncoder ¶
func NewStringEncoder() StringEncoder
type StringValue ¶
type StringValue struct {
// contains filtered or unexported fields
}
func (*StringValue) Size ¶
func (v *StringValue) Size() int
func (*StringValue) String ¶
func (v *StringValue) String() string
func (*StringValue) Time ¶
func (v *StringValue) Time() time.Time
func (*StringValue) UnixNano ¶
func (v *StringValue) UnixNano() int64
func (*StringValue) Value ¶
func (v *StringValue) Value() interface{}
type TSMIndex ¶
type TSMIndex interface { // Add records a new block entry for a key in the index. Add(key string, minTime, maxTime time.Time, offset int64, size uint32) // Entries returns all index entries for a key. Entries(key string) []*IndexEntry // Entry returns the index entry for the specified key and timestamp. If no entry // matches the key and timestamp, nil is returned. Entry(key string, timestamp time.Time) *IndexEntry // MarshalBinary returns a byte slice encoded version of the index. MarshalBinary() ([]byte, error) // UnmarshalBinary populates an index from an encoded byte slice // representation of an index. UnmarshalBinary(b []byte) error }
TSMIndex represents the index section of a TSM file. The index records all blocks, their locations, sizes, min and max times.
func NewDirectIndex ¶
func NewDirectIndex() TSMIndex
func NewIndirectIndex ¶
func NewIndirectIndex() TSMIndex
type TSMWriter ¶
type TSMWriter interface { // Write writes a new block for key containing values. Writes append // blocks in the order that the Write function is called. The caller is // responsible for ensuring keys and blocks are sorted appropriately. // Values are encoded as a full block. The caller is responsible for // ensuring a fixed number of values are encoded in each block as well as // ensuring the Values are sorted. The first and last timestamp values are // used as the minimum and maximum values for the index entry. Write(key string, values Values) error // Close finishes the TSM write streams and writes the index. Close() error }
TSMWriter writes TSM formatted key and values.
type TimeDecoder ¶
TimeDecoder decodes byte slices to time.Time values.
func NewTimeDecoder ¶
func NewTimeDecoder(b []byte) TimeDecoder
type TimeEncoder ¶
TimeEncoder encodes time.Time to byte slices.
type TimePrecision ¶
type TimePrecision uint8
const ( Seconds TimePrecision = iota Milliseconds Microseconds Nanoseconds )
type Values ¶
type Values []Value
Values represents a time-ascending sorted collection of Value types. The underlying type should be the same across all values, but the interface makes the code cleaner.
func (Values) Deduplicate ¶
Deduplicate returns a new Values slice with any values that have the same timestamp removed. The Value that appears last in the slice is the one that is kept. The returned slice is in ascending order
type WriteLock ¶
type WriteLock struct {
// contains filtered or unexported fields
}
WriteLock is a lock that enables locking of ranges between a min and max value. We use this so that flushes from the WAL can occur concurrently along with compactions.
func (*WriteLock) LockRange ¶
LockRange will ensure an exclusive lock between the min and max values inclusive. Any subsequent calls that have an overlapping range will have to wait until the previous lock is released. A corresponding call to UnlockRange should be deferred.
func (*WriteLock) UnlockRange ¶
UnlockRange will release a previously locked range.