logstorage

package
v0.8.0-victorialogs

Published: May 20, 2024 License: Apache-2.0 Imports: 42 Imported by: 1

Documentation

Constants

const MaxFieldNameSize = 128

MaxFieldNameSize is the maximum size in bytes for a field name.

Longer field names are truncated to MaxFieldNameSize bytes during data ingestion.

Variables

This section is empty.

Functions

func PutJSONParser

func PutJSONParser(p *JSONParser)

PutJSONParser returns the parser to the pool.

The parser cannot be used after returning to the pool.

func PutLogRows

func PutLogRows(lr *LogRows)

PutLogRows returns lr to the pool.

func PutStreamTags

func PutStreamTags(st *StreamTags)

PutStreamTags returns st to the pool.

Types

type BlockColumn

type BlockColumn struct {
	// Name is the column name
	Name string

	// Values is column values
	Values []string
}

BlockColumn is a single column of a data block.

type DatadbStats

type DatadbStats struct {
	// InmemoryMergesTotal is the number of inmemory merges performed in the given datadb.
	InmemoryMergesTotal uint64

	// InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb.
	InmemoryActiveMerges uint64

	// SmallPartMergesTotal is the number of small file merges performed in the given datadb.
	SmallPartMergesTotal uint64

	// SmallPartActiveMerges is the number of currently active small file merges performed by the given datadb.
	SmallPartActiveMerges uint64

	// BigPartMergesTotal is the number of big file merges performed in the given datadb.
	BigPartMergesTotal uint64

	// BigPartActiveMerges is the number of currently active big file merges performed by the given datadb.
	BigPartActiveMerges uint64

	// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
	InmemoryRowsCount uint64

	// SmallPartRowsCount is the number of rows stored on disk in small parts.
	SmallPartRowsCount uint64

	// BigPartRowsCount is the number of rows stored on disk in big parts.
	BigPartRowsCount uint64

	// InmemoryParts is the number of in-memory parts, which weren't flushed to disk yet.
	InmemoryParts uint64

	// SmallParts is the number of file-based small parts stored on disk.
	SmallParts uint64

	// BigParts is the number of file-based big parts stored on disk.
	BigParts uint64

	// InmemoryBlocks is the number of in-memory blocks, which weren't flushed to disk yet.
	InmemoryBlocks uint64

	// SmallPartBlocks is the number of file-based small blocks stored on disk.
	SmallPartBlocks uint64

	// BigPartBlocks is the number of file-based big blocks stored on disk.
	BigPartBlocks uint64

	// CompressedInmemorySize is the size of compressed data stored in memory.
	CompressedInmemorySize uint64

	// CompressedSmallPartSize is the size of compressed small parts data stored on disk.
	CompressedSmallPartSize uint64

	// CompressedBigPartSize is the size of compressed big parts data stored on disk.
	CompressedBigPartSize uint64

	// UncompressedInmemorySize is the size of uncompressed data stored in memory.
	UncompressedInmemorySize uint64

	// UncompressedSmallPartSize is the size of uncompressed small parts data stored on disk.
	UncompressedSmallPartSize uint64

	// UncompressedBigPartSize is the size of uncompressed big parts data stored on disk.
	UncompressedBigPartSize uint64
}

DatadbStats contains various stats for datadb.

func (*DatadbStats) RowsCount

func (s *DatadbStats) RowsCount() uint64

RowsCount returns the number of rows stored in datadb.

type Field

type Field struct {
	// Name is the name of the field
	Name string

	// Value is the value of the field
	Value string
}

Field is a single field for the log entry.

func (*Field) Reset

func (f *Field) Reset()

Reset resets f for future re-use.

func (*Field) String

func (f *Field) String() string

String returns string representation of f.

type IndexdbStats

type IndexdbStats struct {
	// StreamsCreatedTotal is the number of log streams created since the indexdb initialization.
	StreamsCreatedTotal uint64

	// IndexdbSizeBytes is the size of data in indexdb.
	IndexdbSizeBytes uint64

	// IndexdbItemsCount is the number of items in indexdb.
	IndexdbItemsCount uint64

	// IndexdbBlocksCount is the number of blocks in indexdb.
	IndexdbBlocksCount uint64

	// IndexdbPartsCount is the number of parts in indexdb.
	IndexdbPartsCount uint64
}

IndexdbStats contains indexdb stats.

type JSONParser

type JSONParser struct {
	// Fields contains the parsed JSON line after a ParseLogMessage() call.
	//
	// The Fields are valid until the next call to ParseLogMessage()
	// or until the parser is returned to the pool with a PutJSONParser() call.
	Fields []Field
	// contains filtered or unexported fields
}

JSONParser parses a single JSON log message into Fields.

See https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model

Use GetJSONParser() for obtaining the parser.

func GetJSONParser

func GetJSONParser() *JSONParser

GetJSONParser returns JSONParser ready to parse JSON lines.

Return the parser to the pool when it is no longer needed by calling PutJSONParser().

func (*JSONParser) ParseLogMessage

func (p *JSONParser) ParseLogMessage(msg []byte, prefix string) error

ParseLogMessage parses the given JSON log message msg into p.Fields.

It adds the given prefix to all the parsed field names.

The p.Fields remains valid until the next call to ParseLogMessage() or PutJSONParser().
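
A minimal end-to-end sketch of the get/parse/put cycle. The import path below is the usual location of this package inside the VictoriaMetrics module, and the JSON line is illustrative:

	package main

	import (
		"fmt"

		"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
	)

	func main() {
		p := logstorage.GetJSONParser()
		defer logstorage.PutJSONParser(p)

		// An empty prefix keeps the parsed field names as-is.
		msg := []byte(`{"level":"info","msg":"request served"}`)
		if err := p.ParseLogMessage(msg, ""); err != nil {
			fmt.Println("cannot parse log message:", err)
			return
		}
		for _, f := range p.Fields {
			fmt.Printf("%s=%q\n", f.Name, f.Value)
		}
	}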

func (*JSONParser) ParseLogMessageNoResetBuf

func (p *JSONParser) ParseLogMessageNoResetBuf(msg, prefix string) error

ParseLogMessageNoResetBuf parses the given JSON log message msg into p.Fields.

It adds the given prefix to all the parsed field names.

The p.Fields remains valid until the next call to PutJSONParser().

func (*JSONParser) RenameField

func (p *JSONParser) RenameField(oldName, newName string)

RenameField renames the field with the given oldName to newName in p.Fields.

type LogRows

type LogRows struct {
	// contains filtered or unexported fields
}

LogRows holds a set of rows needed for Storage.MustAddRows.

LogRows must be obtained via GetLogRows().

func GetLogRows

func GetLogRows(streamFields, ignoreFields []string) *LogRows

GetLogRows returns LogRows from the pool for the given streamFields.

streamFields is a set of field names, which must be associated with the stream. ignoreFields is a set of field names, which must be ignored during data ingestion.

Return it to the pool with PutLogRows() when it is no longer needed.

func (*LogRows) GetRowString

func (lr *LogRows) GetRowString(idx int) string

GetRowString returns string representation of the row with the given idx.

func (*LogRows) Len

func (lr *LogRows) Len() int

Len returns the number of items in lr.

func (*LogRows) Less

func (lr *LogRows) Less(i, j int) bool

Less returns true if (streamID, timestamp) for row i is smaller than (streamID, timestamp) for row j.

func (*LogRows) MustAdd

func (lr *LogRows) MustAdd(tenantID TenantID, timestamp int64, fields []Field)

MustAdd adds a log entry with the given args to lr.

It is OK to modify the args after returning from the function, since lr copies all the args to internal data.

Field names longer than MaxFieldNameSize are automatically truncated to MaxFieldNameSize bytes.
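
A sketch of adding one row; it assumes a Storage s opened with MustOpenStorage, the imports from the JSONParser sketch plus "time", and treats the timestamp as Unix nanoseconds (an assumption consistent with TimeFormatter below):

	lr := logstorage.GetLogRows([]string{"host", "app"}, nil)
	defer logstorage.PutLogRows(lr)

	tenantID := logstorage.TenantID{AccountID: 0, ProjectID: 0}
	fields := []logstorage.Field{
		{Name: "_msg", Value: "user logged in"},
		{Name: "host", Value: "web-1"},
		{Name: "app", Value: "auth"},
	}
	lr.MustAdd(tenantID, time.Now().UnixNano(), fields)

	// fields may be modified or reused here, since lr copies all the data.
	s.MustAddRows(lr)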

func (*LogRows) NeedFlush

func (lr *LogRows) NeedFlush() bool

NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.

func (*LogRows) Reset

func (lr *LogRows) Reset()

Reset resets lr with all its settings.

Call ResetKeepSettings() for resetting lr without resetting its settings.

func (*LogRows) ResetKeepSettings

func (lr *LogRows) ResetKeepSettings()

ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
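
A sketch of a batched ingestion loop built on NeedFlush and ResetKeepSettings; entries (with hypothetical Timestamp and Fields members), tenantID, lr and the Storage s are assumed from the previous sketch:

	for _, e := range entries {
		lr.MustAdd(tenantID, e.Timestamp, e.Fields)
		if lr.NeedFlush() {
			s.MustAddRows(lr)
			lr.ResetKeepSettings()
		}
	}
	// Flush whatever remains in the final partial batch.
	if lr.Len() > 0 {
		s.MustAddRows(lr)
		lr.ResetKeepSettings()
	}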

func (*LogRows) Swap

func (lr *LogRows) Swap(i, j int)

Swap swaps rows i and j in lr.

type PartitionStats

type PartitionStats struct {
	DatadbStats
	IndexdbStats
}

PartitionStats contains stats for the partition.

type Query

type Query struct {
	// contains filtered or unexported fields
}

Query represents a LogsQL query.

func ParseQuery

func ParseQuery(s string) (*Query, error)

ParseQuery parses s.

func (*Query) AddCountByTimePipe

func (q *Query) AddCountByTimePipe(step, off int64, fields []string)

AddCountByTimePipe adds '| stats by (_time:step offset off, field1, ..., fieldN) count() hits' to the end of q.

func (*Query) AddPipeLimit

func (q *Query) AddPipeLimit(n uint64)

AddPipeLimit adds `| limit n` pipe to q.

See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe

func (*Query) AddTimeFilter

func (q *Query) AddTimeFilter(start, end int64)

AddTimeFilter adds global filter _time:[start ... end] to q.

func (*Query) Optimize

func (q *Query) Optimize()

Optimize tries optimizing the query.

func (*Query) String

func (q *Query) String() string

String returns string representation for q.
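
A sketch that combines the Query methods above. The LogsQL string is only illustrative, passing Unix nanoseconds to AddTimeFilter is an assumption, and "log" and "time" imports are assumed:

	q, err := logstorage.ParseQuery(`_stream:{app="auth"} error`)
	if err != nil {
		log.Fatalf("cannot parse query: %s", err)
	}
	end := time.Now().UnixNano()
	start := end - time.Hour.Nanoseconds()
	q.AddTimeFilter(start, end)
	q.AddPipeLimit(1000)
	q.Optimize()
	fmt.Println(q.String())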

type RowFormatter

type RowFormatter []Field

RowFormatter implements fmt.Stringer for []Field, i.e. a single log row.

func (*RowFormatter) String

func (rf *RowFormatter) String() string

String returns a user-readable representation of rf.
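
A small sketch; the exact output layout of String() is not documented here:

	row := logstorage.RowFormatter{
		{Name: "_msg", Value: "disk is almost full"},
		{Name: "host", Value: "web-1"},
	}
	fmt.Println(row.String())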

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage is the storage for log entries.

func MustOpenStorage

func MustOpenStorage(path string, cfg *StorageConfig) *Storage

MustOpenStorage opens Storage at the given path.

MustClose must be called on the returned Storage when it is no longer needed.

func (*Storage) GetFieldNames

func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error)

GetFieldNames returns field names from q results for the given tenantIDs.

func (*Storage) GetFieldValues

func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error)

GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs.

If limit > 0, then up to limit unique values are returned. The values are returned in arbitrary order for performance reasons. The caller may sort the returned values if needed.
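
A sketch of fetching up to 100 unique values of a hypothetical level field; ctx, tenantIDs, q and the Storage s are assumed from the surrounding sketches, plus a "sort" import:

	values, err := s.GetFieldValues(ctx, tenantIDs, q, "level", 100)
	if err != nil {
		log.Fatalf("cannot get field values: %s", err)
	}
	// The values come back in arbitrary order; sort them if a stable order is needed.
	sort.Strings(values)
	fmt.Println(values)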

func (*Storage) IsReadOnly

func (s *Storage) IsReadOnly() bool

IsReadOnly returns true if s is in read-only mode.

func (*Storage) MustAddRows

func (s *Storage) MustAddRows(lr *LogRows)

MustAddRows adds lr to s.

It is recommended to check whether s is in read-only mode by calling IsReadOnly() before calling MustAddRows.

func (*Storage) MustClose

func (s *Storage) MustClose()

MustClose closes s.

It is expected that nobody uses the storage at close time.

func (*Storage) RunQuery

func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error

RunQuery runs the given q and calls writeBlock for results.
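
A sketch of running a query; ctx, q and the Storage s are assumed. The workerID argument suggests the callback may be invoked from multiple workers, and per the WriteBlockFunc contract it must not keep references to timestamps or columns after returning:

	writeBlock := func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn) {
		for i := range timestamps {
			for _, c := range columns {
				fmt.Printf("%s=%q ", c.Name, c.Values[i])
			}
			fmt.Println()
		}
	}
	tenantIDs := []logstorage.TenantID{{AccountID: 0, ProjectID: 0}}
	if err := s.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
		log.Fatalf("query failed: %s", err)
	}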

func (*Storage) UpdateStats

func (s *Storage) UpdateStats(ss *StorageStats)

UpdateStats updates ss for the given s.

type StorageConfig

type StorageConfig struct {
	// Retention is the retention for the ingested data.
	//
	// Older data is automatically deleted.
	Retention time.Duration

	// FlushInterval is the interval for flushing in-memory data to disk in the Storage.
	FlushInterval time.Duration

	// FutureRetention is the allowed retention from the current time to future for the ingested data.
	//
	// Log entries with timestamps bigger than now+FutureRetention are ignored.
	FutureRetention time.Duration

	// MinFreeDiskSpaceBytes is the minimum free disk space at storage path after which the storage stops accepting new data.
	MinFreeDiskSpaceBytes int64

	// LogNewStreams indicates whether to log newly created log streams.
	//
	// This can be useful for debugging of high cardinality issues.
	// https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality
	LogNewStreams bool

	// LogIngestedRows indicates whether to log the ingested log entries.
	//
	// This can be useful for debugging of data ingestion.
	LogIngestedRows bool
}

StorageConfig is the config for the Storage.
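
A sketch of opening a Storage; the retention values and the data path are illustrative, not recommendations:

	cfg := &logstorage.StorageConfig{
		Retention:       31 * 24 * time.Hour,
		FlushInterval:   time.Second,
		FutureRetention: 2 * 24 * time.Hour,
		LogNewStreams:   false,
		LogIngestedRows: false,
	}
	s := logstorage.MustOpenStorage("victoria-logs-data", cfg)
	defer s.MustClose()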

type StorageStats

type StorageStats struct {
	// RowsDroppedTooBigTimestamp is the number of rows dropped during data ingestion because their timestamp is bigger than the maximum allowed
	RowsDroppedTooBigTimestamp uint64

	// RowsDroppedTooSmallTimestamp is the number of rows dropped during data ingestion because their timestamp is smaller than the minimum allowed
	RowsDroppedTooSmallTimestamp uint64

	// PartitionsCount is the number of partitions in the storage
	PartitionsCount uint64

	// IsReadOnly indicates whether the storage is read-only.
	IsReadOnly bool

	// PartitionStats contains partition stats.
	PartitionStats
}

StorageStats represents stats for the storage. It may be obtained by calling Storage.UpdateStats().
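
A sketch of reading the stats; RowsCount() is promoted from the embedded DatadbStats:

	var ss logstorage.StorageStats
	s.UpdateStats(&ss)
	fmt.Printf("rows=%d partitions=%d read-only=%v\n",
		ss.RowsCount(), ss.PartitionsCount, ss.IsReadOnly)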

func (*StorageStats) Reset

func (s *StorageStats) Reset()

Reset resets s.

type StreamFilter

type StreamFilter struct {
	// contains filtered or unexported fields
}

StreamFilter is a filter for streams, e.g. `_stream:{...}`

func (*StreamFilter) String

func (sf *StreamFilter) String() string

type StreamTags

type StreamTags struct {
	// contains filtered or unexported fields
}

StreamTags contains stream tags.

func GetStreamTags

func GetStreamTags() *StreamTags

GetStreamTags returns a StreamTags from the pool.

func (*StreamTags) Add

func (st *StreamTags) Add(name, value string)

Add adds (name:value) tag to st.

func (*StreamTags) Len

func (st *StreamTags) Len() int

Len returns the number of tags in st.

func (*StreamTags) Less

func (st *StreamTags) Less(i, j int) bool

Less returns true if tag i is smaller than tag j.

func (*StreamTags) MarshalCanonical

func (st *StreamTags) MarshalCanonical(dst []byte) []byte

MarshalCanonical marshals st in a canonical way.

func (*StreamTags) Reset

func (st *StreamTags) Reset()

Reset resets st for re-use.

func (*StreamTags) String

func (st *StreamTags) String() string

String returns string representation of st.

func (*StreamTags) Swap

func (st *StreamTags) Swap(i, j int)

Swap swaps tags i and j.

func (*StreamTags) UnmarshalCanonical

func (st *StreamTags) UnmarshalCanonical(src []byte) ([]byte, error)

UnmarshalCanonical unmarshals st from src marshaled with MarshalCanonical.
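
A sketch of a marshal/unmarshal round trip through the pool; the tag names are illustrative:

	st := logstorage.GetStreamTags()
	st.Add("app", "auth")
	st.Add("host", "web-1")
	buf := st.MarshalCanonical(nil)
	logstorage.PutStreamTags(st)

	st2 := logstorage.GetStreamTags()
	defer logstorage.PutStreamTags(st2)
	if _, err := st2.UnmarshalCanonical(buf); err != nil {
		log.Fatalf("cannot unmarshal stream tags: %s", err)
	}
	fmt.Println(st2.String())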

type TenantID

type TenantID struct {
	// AccountID is the id of the account for the log stream.
	AccountID uint32

	// ProjectID is the id of the project for the log stream.
	ProjectID uint32
}

TenantID is an id of a tenant for log streams.

Each log stream is associated with a single TenantID.

func GetTenantIDFromRequest

func GetTenantIDFromRequest(r *http.Request) (TenantID, error)

GetTenantIDFromRequest returns tenantID from r.

func GetTenantIDFromString

func GetTenantIDFromString(s string) (TenantID, error)

GetTenantIDFromString returns tenantID from s. The string is expected in the form accountID:projectID.
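
A sketch; the "1:42" value is illustrative:

	tid, err := logstorage.GetTenantIDFromString("1:42")
	if err != nil {
		log.Fatalf("cannot parse tenant id: %s", err)
	}
	// AccountID=1, ProjectID=42 per the accountID:projectID form.
	fmt.Println(tid.String())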

func (*TenantID) Reset

func (tid *TenantID) Reset()

Reset resets tid.

func (*TenantID) String

func (tid *TenantID) String() string

String returns a human-readable representation of tid.

type TimeFormatter

type TimeFormatter int64

TimeFormatter implements fmt.Stringer for a timestamp in nanoseconds.

func (*TimeFormatter) String

func (tf *TimeFormatter) String() string

String returns human-readable representation for tf.
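
A sketch; the timestamp is interpreted as Unix nanoseconds, and the exact output layout is not documented here:

	tf := logstorage.TimeFormatter(time.Now().UnixNano())
	fmt.Println(tf.String())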

type WriteBlockFunc

type WriteBlockFunc func(workerID uint, timestamps []int64, columns []BlockColumn)

WriteBlockFunc must write a block with the given timestamps and columns.

WriteBlockFunc cannot hold references to timestamps and columns after returning.
