Documentation ¶
Index ¶
- Constants
- func NewCombinedEngineCursor(wc, ec tsdb.Cursor, ascending bool) tsdb.Cursor
- func NewDataFile(f *os.File) (*dataFile, error)
- func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
- func NewMultiFieldCursor(fields []string, cursors []tsdb.Cursor, ascending bool) tsdb.Cursor
- func SeriesFieldKey(seriesKey, field string) string
- func ZigZagDecode(v uint64) int64
- func ZigZagEncode(x int64) uint64
- type BoolDecoder
- type BoolEncoder
- type BoolValue
- type EmptyValue
- type Engine
- func (e *Engine) Begin(writable bool) (tsdb.Tx, error)
- func (e *Engine) Close() error
- func (e *Engine) Compact(fullCompaction bool) error
- func (e *Engine) DataFileCount() int
- func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error)
- func (e *Engine) DeleteMeasurement(name string, seriesKeys []string) error
- func (e *Engine) DeleteSeries(seriesKeys []string) error
- func (e *Engine) Format() tsdb.EngineFormat
- func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, ...) error
- func (e *Engine) MarkDeletes(keys []string)
- func (e *Engine) MarkMeasurementDelete(name string)
- func (e *Engine) Open() error
- func (e *Engine) Path() string
- func (e *Engine) PerformMaintenance()
- func (e *Engine) SeriesCount() (n int, err error)
- func (e *Engine) SetLogOutput(w io.Writer)
- func (e *Engine) Write(pointsByKey map[string]Values, ...) error
- func (e *Engine) WritePoints(points []models.Point, ...) error
- func (e *Engine) WriteTo(w io.Writer) (n int64, err error)
- type FloatDecoder
- type FloatEncoder
- type FloatValue
- type IndexWriter
- type Int64Decoder
- type Int64Encoder
- type Int64Value
- type Log
- func (l *Log) Close() error
- func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor
- func (l *Log) DeleteMeasurement(measurement string, keys []string) error
- func (l *Log) DeleteSeries(keys []string) error
- func (l *Log) Flush() error
- func (l *Log) LastWriteTime() time.Time
- func (l *Log) Open() error
- func (l *Log) Path() string
- func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, ...) error
- type StringDecoder
- type StringEncoder
- type StringValue
- type TimeDecoder
- type TimeEncoder
- type TimePrecision
- type Value
- type Values
- type WriteLock
Constants ¶
const ( // BlockFloat64 designates a block encodes float64 values BlockFloat64 = 0 // BlockInt64 designates a block encodes int64 values BlockInt64 = 1 // BlockBool designates a block encodes bool values BlockBool = 2 // BlockString designates a block encodes string values BlockString = 3 )
const ( // Format is the file format name of this engine. Format = "tsm1" //IDsFileExtension is the extension for the file that keeps the compressed map // of keys to uint64 IDs. IDsFileExtension = "ids" // FieldsFileExtension is the extension for the file that stores compressed field // encoding data for this db FieldsFileExtension = "fields" // SeriesFileExtension is the extension for the file that stores the compressed // series metadata for series in this db SeriesFileExtension = "series" // CollisionsFileExtension is the extension for the file that keeps a map of which // keys have hash collisions and what their actual IDs are CollisionsFileExtension = "collisions" //CheckpointExtension is the extension given to files that checkpoint. // The checkpoint files are created when a new file is first created. They // are removed after the file has been synced and is safe for use. If a file // has an associated checkpoint file, it wasn't safely written and both should be removed CheckpointExtension = "check" )
const ( MaxDataFileSize = 1024 * 1024 * 1024 // 1GB // DefaultRotateBlockSize is the default size to rotate to a new compressed block DefaultRotateBlockSize = 512 * 1024 // 512KB DefaultRotateFileSize = 5 * 1024 * 1024 // 5MB DefaultMaxPointsPerBlock = 1000 // MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall MAP_POPULATE = 0x8000 )
const ( // DefaultSegmentSize of 2MB is the size at which segment files will be rolled over DefaultSegmentSize = 2 * 1024 * 1024 // FileExtension is the file extension we expect for wal segments WALFileExtension = "wal" WALFilePrefix = "_" )
Variables ¶
This section is empty.
Functions ¶
func NewCombinedEngineCursor ¶
NewCombinedEngineCursor returns a Cursor that joins wc and ec. Values from wc take precedence over ec when identical timestamps are returned.
func NewDataFile ¶
func NewMultiFieldCursor ¶
NewMultiFieldCursor returns an instance of Cursor that joins the results of cursors.
func SeriesFieldKey ¶
SeriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
func ZigZagDecode ¶
ZigZagDecode converts a previously zigzag encoded uint64 back to a int64
func ZigZagEncode ¶
ZigZagEncode converts a int64 to a uint64 by zig zagging negative and positive values across even and odd numbers. Eg. [0,-1,1,-2] becomes [0, 1, 2, 3]
Types ¶
type BoolDecoder ¶
BoolDecoder decodes a series of bools from an in-memory buffer.
func NewBoolDecoder ¶
func NewBoolDecoder(b []byte) BoolDecoder
NewBoolDecoder returns a new instance of BoolDecoder.
type BoolEncoder ¶
BoolEncoder encodes a series of bools to an in-memory buffer.
func NewBoolEncoder ¶
func NewBoolEncoder() BoolEncoder
NewBoolEncoder returns a new instance of BoolEncoder.
type EmptyValue ¶
type EmptyValue struct { }
func (*EmptyValue) Size ¶
func (e *EmptyValue) Size() int
func (*EmptyValue) Time ¶
func (e *EmptyValue) Time() time.Time
func (*EmptyValue) UnixNano ¶
func (e *EmptyValue) UnixNano() int64
func (*EmptyValue) Value ¶
func (e *EmptyValue) Value() interface{}
type Engine ¶
type Engine struct { // HashSeriesField is a function that takes a series key and a field name // and returns a hash identifier. It's not guaranteed to be unique. HashSeriesField func(key string) uint64 WAL *Log RotateFileSize uint32 SkipCompaction bool CompactionAge time.Duration MinCompactionFileCount int IndexCompactionFullAge time.Duration IndexMinCompactionInterval time.Duration MaxPointsPerBlock int RotateBlockSize int // contains filtered or unexported fields }
Engine represents a storage engine with compressed blocks.
func (*Engine) DataFileCount ¶
DataFileCount returns the number of data files in the database
func (*Engine) DecodeAndCombine ¶
func (e *Engine) DecodeAndCombine(newValues Values, block, buf []byte, nextTime int64, hasFutureBlock bool) (Values, []byte, error)
DecodeAndCombine take an encoded block from a file, decodes it and interleaves the file values with the values passed in. nextTime and hasNext refer to if the file has future encoded blocks so that this method can know how much of its values can be combined and output in the resulting encoded block.
func (*Engine) DeleteMeasurement ¶
DeleteMeasurement deletes a measurement and all related series.
func (*Engine) DeleteSeries ¶
DeleteSeries deletes the series from the engine.
func (*Engine) Format ¶
func (e *Engine) Format() tsdb.EngineFormat
Format returns the format type of this engine
func (*Engine) LoadMetadataIndex ¶
func (e *Engine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
LoadMetadataIndex loads the shard metadata into memory.
func (*Engine) MarkDeletes ¶
MarkDeletes will mark the given keys for deletion in memory. They will be deleted from data files on the next flush. This mainly for the WAL to use on startup
func (*Engine) MarkMeasurementDelete ¶
func (*Engine) PerformMaintenance ¶
func (e *Engine) PerformMaintenance()
PerformMaintenance is for periodic maintenance of the store. A no-op for b1
func (*Engine) SeriesCount ¶
SeriesCount returns the number of series buckets on the shard.
func (*Engine) Write ¶
func (e *Engine) Write(pointsByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
func (*Engine) WritePoints ¶
func (e *Engine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
WritePoints writes metadata and point data into the engine. Returns an error if new points are added to an existing key.
type FloatDecoder ¶
type FloatDecoder struct {
// contains filtered or unexported fields
}
FloatDecoder decodes a byte slice into multipe float64 values
func NewFloatDecoder ¶
func NewFloatDecoder(b []byte) (*FloatDecoder, error)
func (*FloatDecoder) Error ¶
func (it *FloatDecoder) Error() error
func (*FloatDecoder) Next ¶
func (it *FloatDecoder) Next() bool
func (*FloatDecoder) Values ¶
func (it *FloatDecoder) Values() float64
type FloatEncoder ¶
type FloatEncoder struct {
// contains filtered or unexported fields
}
FloatEncoder encodes multiple float64s into a byte slice
func NewFloatEncoder ¶
func NewFloatEncoder() *FloatEncoder
func (*FloatEncoder) Bytes ¶
func (s *FloatEncoder) Bytes() []byte
func (*FloatEncoder) Finish ¶
func (s *FloatEncoder) Finish()
func (*FloatEncoder) Push ¶
func (s *FloatEncoder) Push(v float64)
type FloatValue ¶
type FloatValue struct {
// contains filtered or unexported fields
}
func (*FloatValue) Size ¶
func (f *FloatValue) Size() int
func (*FloatValue) Time ¶
func (f *FloatValue) Time() time.Time
func (*FloatValue) UnixNano ¶
func (f *FloatValue) UnixNano() int64
func (*FloatValue) Value ¶
func (f *FloatValue) Value() interface{}
type IndexWriter ¶
type IndexWriter interface { Write(valuesByKey map[string]Values, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error MarkDeletes(keys []string) MarkMeasurementDelete(name string) }
IndexWriter is an interface for the indexed database the WAL flushes data to
type Int64Decoder ¶
Int64Decoder decodes a byte slice into int64s
func NewInt64Decoder ¶
func NewInt64Decoder(b []byte) Int64Decoder
type Int64Encoder ¶
Int64Encoder encoders int64 into byte slices
func NewInt64Encoder ¶
func NewInt64Encoder() Int64Encoder
type Int64Value ¶
type Int64Value struct {
// contains filtered or unexported fields
}
func (*Int64Value) Size ¶
func (v *Int64Value) Size() int
func (*Int64Value) String ¶
func (v *Int64Value) String() string
func (*Int64Value) Time ¶
func (v *Int64Value) Time() time.Time
func (*Int64Value) UnixNano ¶
func (v *Int64Value) UnixNano() int64
func (*Int64Value) Value ¶
func (v *Int64Value) Value() interface{}
type Log ¶
type Log struct { // LogOutput is the writer used by the logger. LogOutput io.Writer // FlushColdInterval is the period of time after which a partition will do a // full flush and compaction if it has been cold for writes. FlushColdInterval time.Duration // SegmentSize is the file size at which a segment file will be rotated SegmentSize int // FlushMemorySizeThreshold specifies when the log should be forced to be flushed FlushMemorySizeThreshold int // MaxMemorySizeThreshold specifies the limit at which writes to the WAL should be rejected MaxMemorySizeThreshold int // IndexWriter is the database series will be flushed to IndexWriter IndexWriter // LoggingEnabled specifies if detailed logs should be output LoggingEnabled bool // SkipCache specifies if the wal should immediately write to the index instead of // caching data in memory. False by default so we buffer in memory before flushing to index. SkipCache bool // SkipDurability specifies if the wal should not write the wal entries to disk. // False by default which means all writes are durable even when cached before flushing to index. SkipDurability bool // contains filtered or unexported fields }
func (*Log) Cursor ¶
func (l *Log) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor
Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given. This should only ever be called by the engine cursor method, which will always give it exactly one field.
func (*Log) DeleteMeasurement ¶
func (*Log) DeleteSeries ¶
func (*Log) LastWriteTime ¶
func (*Log) Open ¶
Open opens and initializes the Log. Will recover from previous unclosed shutdowns
func (*Log) WritePoints ¶
func (l *Log) WritePoints(points []models.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error
type StringDecoder ¶
func NewStringDecoder ¶
func NewStringDecoder(b []byte) (StringDecoder, error)
type StringEncoder ¶
func NewStringEncoder ¶
func NewStringEncoder() StringEncoder
type StringValue ¶
type StringValue struct {
// contains filtered or unexported fields
}
func (*StringValue) Size ¶
func (v *StringValue) Size() int
func (*StringValue) String ¶
func (v *StringValue) String() string
func (*StringValue) Time ¶
func (v *StringValue) Time() time.Time
func (*StringValue) UnixNano ¶
func (v *StringValue) UnixNano() int64
func (*StringValue) Value ¶
func (v *StringValue) Value() interface{}
type TimeDecoder ¶
TimeDecoder decodes byte slices to time.Time values.
func NewTimeDecoder ¶
func NewTimeDecoder(b []byte) TimeDecoder
type TimeEncoder ¶
TimeEncoder encodes time.Time to byte slices.
type TimePrecision ¶
type TimePrecision uint8
const ( Seconds TimePrecision = iota Milliseconds Microseconds Nanoseconds )
type Values ¶
type Values []Value
Values represented a time ascending sorted collection of Value types. the underlying type should be the same across all values, but the interface makes the code cleaner.
func DecodeBlock ¶
DecodeBlock takes a byte array and will decode into values of the appropriate type based on the block
func (Values) Deduplicate ¶
Deduplicate returns a new Values slice with any values that have the same timestamp removed. The Value that appears last in the slice is the one that is kept. The returned slice is in ascending order
type WriteLock ¶
type WriteLock struct {
// contains filtered or unexported fields
}
writeLock is a lock that enables locking of ranges between a min and max value. We use this so that flushes from the WAL can occur concurrently along with compactions.
func (*WriteLock) LockRange ¶
LockRange will ensure an exclusive lock between the min and max values inclusive. Any subsequent calls that have an an overlapping range will have to wait until the previous lock is released. A corresponding call to UnlockRange should be deferred.
func (*WriteLock) UnlockRange ¶
UnlockRange will release a previously locked range.