index

package
v1.6.2-0...-ae1fb5a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2024 License: AGPL-3.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthCachingIndexClient = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowCachingIndexClient   = fmt.Errorf("proto: integer overflow")
)
View Source
var (
	// ErrNotSupported when a schema doesn't support that particular lookup.
	ErrNotSupported           = errors.New("not supported")
	ErrMetricNameLabelMissing = errors.New("metric name label missing")
)

Functions

func ExpectTables

func ExpectTables(ctx context.Context, client TableClient, expected []config.TableDesc) error

ExpectTables compares existing tables to an expected set of tables. Exposed for testing,

func ParseChunkTimeRangeValue

func ParseChunkTimeRangeValue(rangeValue []byte, value []byte) (
	chunkID string, labelValue model.LabelValue, err error,
)

ParseChunkTimeRangeValue returns the chunkID (seriesID since v9) and labelValue for chunk time range values.

Types

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket describes a range of time with a tableName and hashKey

type BucketClient

type BucketClient interface {
	DeleteChunksBefore(ctx context.Context, ts time.Time) error
}

BucketClient is used to enforce retention on chunk buckets.

type Bytes

type Bytes []byte

Bytes exists to stop proto copying the byte array

func (*Bytes) Compare

func (bs *Bytes) Compare(other Bytes) int

Compare Bytes to other

func (*Bytes) Equal

func (bs *Bytes) Equal(other Bytes) bool

Equal returns true if other equals Bytes

func (*Bytes) Marshal

func (bs *Bytes) Marshal() ([]byte, error)

Marshal just returns bs

func (*Bytes) MarshalTo

func (bs *Bytes) MarshalTo(data []byte) (n int, err error)

MarshalTo copies Bytes to data

func (*Bytes) Size

func (bs *Bytes) Size() int

Size returns the length of Bytes

func (*Bytes) Unmarshal

func (bs *Bytes) Unmarshal(data []byte) error

Unmarshal updates Bytes to be data, without a copy

type CacheEntry

type CacheEntry struct {
	Column Bytes `protobuf:"bytes,1,opt,name=Column,proto3,customtype=Bytes" json:"Column"`
	Value  Bytes `protobuf:"bytes,2,opt,name=Value,proto3,customtype=Bytes" json:"Value"`
}

func (*CacheEntry) Descriptor

func (*CacheEntry) Descriptor() ([]byte, []int)

func (*CacheEntry) Equal

func (this *CacheEntry) Equal(that interface{}) bool

func (*CacheEntry) GoString

func (this *CacheEntry) GoString() string

func (*CacheEntry) Marshal

func (m *CacheEntry) Marshal() (dAtA []byte, err error)

func (*CacheEntry) MarshalTo

func (m *CacheEntry) MarshalTo(dAtA []byte) (int, error)

func (*CacheEntry) MarshalToSizedBuffer

func (m *CacheEntry) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*CacheEntry) ProtoMessage

func (*CacheEntry) ProtoMessage()

func (*CacheEntry) Reset

func (m *CacheEntry) Reset()

func (*CacheEntry) Size

func (m *CacheEntry) Size() (n int)

func (*CacheEntry) String

func (this *CacheEntry) String() string

func (*CacheEntry) Unmarshal

func (m *CacheEntry) Unmarshal(dAtA []byte) error

func (*CacheEntry) XXX_DiscardUnknown

func (m *CacheEntry) XXX_DiscardUnknown()

func (*CacheEntry) XXX_Marshal

func (m *CacheEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*CacheEntry) XXX_Merge

func (m *CacheEntry) XXX_Merge(src proto.Message)

func (*CacheEntry) XXX_Size

func (m *CacheEntry) XXX_Size() int

func (*CacheEntry) XXX_Unmarshal

func (m *CacheEntry) XXX_Unmarshal(b []byte) error

type CardinalityExceededError

type CardinalityExceededError struct {
	MetricName, LabelName string
	Size, Limit           int32
}

CardinalityExceededError is returned when the user reads a row that is too large.

func (CardinalityExceededError) Error

func (e CardinalityExceededError) Error() string

type Client

type Client interface {
	ReadClient
	WriteClient
	Stop()
}

Client is a client for the storage of the index (e.g. DynamoDB or Bigtable).

func NewCachingIndexClient

func NewCachingIndexClient(client Client, c cache.Cache, validity time.Duration, limits StoreLimits, logger log.Logger, disableBroadQueries bool) Client

type Entry

type Entry struct {
	TableName string
	HashValue string

	// For writes, RangeValue will always be set.
	RangeValue []byte

	// New for v6 schema, label value is not written as part of the range key.
	Value []byte
}

Entry describes an entry in the chunk index

type EntryProcessor

type EntryProcessor interface {
	ProcessIndexEntry(indexEntry Entry) error

	// Will this user be accepted by the processor?
	AcceptUser(user string) bool

	// Called at the end of reading of index entries.
	Flush() error
}

EntryProcessor receives index entries from a table.

type ExtraTables

type ExtraTables struct {
	TableClient TableClient
	Tables      []config.TableDesc
}

ExtraTables holds the list of tables that TableManager has to manage using a TableClient. This is useful for managing tables other than Chunk and Index tables.

type Query

type Query struct {
	TableName string
	HashValue string

	// One of RangeValuePrefix or RangeValueStart might be set:
	// - If RangeValuePrefix is not nil, must read all keys with that prefix.
	// - If RangeValueStart is not nil, must read all keys from there onwards.
	// - If neither is set, must read all keys for that row.
	// RangeValueStart should only be used for querying Chunk IDs.
	// If this is going to change then please take care of func isChunksQuery in pkg/chunk/storage/caching_index_client.go which relies on it.
	RangeValuePrefix []byte
	RangeValueStart  []byte

	// Filters for querying
	ValueEqual []byte

	// If the result of this lookup is immutable or not (for caching).
	Immutable bool
}

Query describes a query for entries

type QueryPagesCallback

type QueryPagesCallback func(Query, ReadBatchResult) bool

QueryPagesCallback from an IndexQuery.

func QueryFilter

func QueryFilter(callback QueryPagesCallback) QueryPagesCallback

QueryFilter wraps a callback to ensure the results are filtered correctly; useful for the cache and Bigtable backend, which only ever fetches the whole row.

type ReadBatch

type ReadBatch struct {
	Entries []CacheEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries"`
	Key     string       `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
	// The time at which the key expires.
	Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"`
	// The number of entries; used for cardinality limiting.
	// entries will be empty when this is set.
	Cardinality int32 `protobuf:"varint,4,opt,name=cardinality,proto3" json:"cardinality,omitempty"`
}

func (*ReadBatch) Descriptor

func (*ReadBatch) Descriptor() ([]byte, []int)

func (*ReadBatch) Equal

func (this *ReadBatch) Equal(that interface{}) bool

func (*ReadBatch) GetCardinality

func (m *ReadBatch) GetCardinality() int32

func (*ReadBatch) GetEntries

func (m *ReadBatch) GetEntries() []CacheEntry

func (*ReadBatch) GetExpiry

func (m *ReadBatch) GetExpiry() int64

func (*ReadBatch) GetKey

func (m *ReadBatch) GetKey() string

func (*ReadBatch) GoString

func (this *ReadBatch) GoString() string

func (ReadBatch) Iterator

func (b ReadBatch) Iterator() ReadBatchIterator

Iterator implements chunk.ReadBatch.

func (*ReadBatch) Marshal

func (m *ReadBatch) Marshal() (dAtA []byte, err error)

func (*ReadBatch) MarshalTo

func (m *ReadBatch) MarshalTo(dAtA []byte) (int, error)

func (*ReadBatch) MarshalToSizedBuffer

func (m *ReadBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ReadBatch) ProtoMessage

func (*ReadBatch) ProtoMessage()

func (*ReadBatch) Reset

func (m *ReadBatch) Reset()

func (*ReadBatch) Size

func (m *ReadBatch) Size() (n int)

func (*ReadBatch) String

func (this *ReadBatch) String() string

func (*ReadBatch) Unmarshal

func (m *ReadBatch) Unmarshal(dAtA []byte) error

func (*ReadBatch) XXX_DiscardUnknown

func (m *ReadBatch) XXX_DiscardUnknown()

func (*ReadBatch) XXX_Marshal

func (m *ReadBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReadBatch) XXX_Merge

func (m *ReadBatch) XXX_Merge(src proto.Message)

func (*ReadBatch) XXX_Size

func (m *ReadBatch) XXX_Size() int

func (*ReadBatch) XXX_Unmarshal

func (m *ReadBatch) XXX_Unmarshal(b []byte) error

type ReadBatchIterator

type ReadBatchIterator interface {
	Next() bool
	RangeValue() []byte
	Value() []byte
}

ReadBatchIterator is an iterator over a ReadBatch.

type ReadBatchResult

type ReadBatchResult interface {
	Iterator() ReadBatchIterator
}

ReadBatchResult represents the results of a QueryPages.

type ReadClient

type ReadClient interface {
	QueryPages(ctx context.Context, queries []Query, callback QueryPagesCallback) error
}

Client for the read path.

type Reader

type Reader interface {
	IndexTableNames(ctx context.Context) ([]string, error)

	// Reads a single table from index, and passes individual index entries to the processors.
	//
	// All entries with the same TableName, HashValue and RangeValue are passed to the same processor,
	// and all such entries (with different Values) are passed before index entries with different
	// values of HashValue and RangeValue are passed to the same processor.
	//
	// This allows IndexEntryProcessor to find when values for given Hash and Range finish:
	// as soon as new Hash and Range differ from last IndexEntry.
	//
	// Index entries passed to the same processor arrive sorted by HashValue and RangeValue.
	ReadIndexEntries(ctx context.Context, table string, processors []EntryProcessor) error
}

Reader parses index entries and passes them to the IndexEntryProcessor.

type SeriesStoreSchema

type SeriesStoreSchema interface {
	// When doing a read, use these methods to return the list of entries you should query
	GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]Query, error)
	GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]Query, error)
	GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]Query, error)
	FilterReadQueries(queries []Query, shard *astmapper.ShardAnnotation) []Query

	// returns cache key string and []IndexEntry per bucket, matched in order
	GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]Entry, error)
	GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]Entry, error)

	// If the query resulted in series IDs, use this method to find chunks.
	GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error)
	// Returns queries to retrieve all label names of multiple series by id.
	GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error)
}

SeriesStoreSchema is a schema used by seriesStore

func CreateSchema

func CreateSchema(cfg config.PeriodConfig) (SeriesStoreSchema, error)

CreateSchema returns the schema defined by the PeriodConfig

func NewSchemaCaching

func NewSchemaCaching(schema SeriesStoreSchema, cacheOlderThan time.Duration) SeriesStoreSchema

type StoreLimits

type StoreLimits interface {
	CardinalityLimit(string) int
}

StoreLimits helps get Limits specific to Queries for Stores

type TableClient

type TableClient interface {
	ListTables(ctx context.Context) ([]string, error)
	CreateTable(ctx context.Context, desc config.TableDesc) error
	DeleteTable(ctx context.Context, name string) error
	DescribeTable(ctx context.Context, name string) (desc config.TableDesc, isActive bool, err error)
	UpdateTable(ctx context.Context, current, expected config.TableDesc) error
	Stop()
}

TableClient is a client for telling Dynamo what to do with tables.

type TableManager

type TableManager struct {
	services.Service
	// contains filtered or unexported fields
}

TableManager creates and manages the provisioned throughput on DynamoDB tables

func NewTableManager

func NewTableManager(cfg TableManagerConfig, schemaCfg config.SchemaConfig, maxChunkAge time.Duration, tableClient TableClient,
	objectClient BucketClient, extraTables []ExtraTables, registerer prometheus.Registerer,
) (*TableManager, error)

NewTableManager makes a new TableManager

func (*TableManager) SyncTables

func (m *TableManager) SyncTables(ctx context.Context) error

SyncTables will calculate the tables expected to exist, create those that do not and update those that need it. It is exposed for testing.

type TableManagerConfig

type TableManagerConfig struct {
	// Master 'off-switch' for table capacity updates, e.g. when troubleshooting
	ThroughputUpdatesDisabled bool `yaml:"throughput_updates_disabled"`

	// Master 'on-switch' for table retention deletions
	RetentionDeletesEnabled bool `yaml:"retention_deletes_enabled"`

	// How far back tables will be kept before they are deleted
	RetentionPeriod time.Duration `yaml:"-"`
	// This is so that we can accept 1w, 1y in the YAML.
	RetentionPeriodModel model.Duration `yaml:"retention_period"`

	// Period with which the table manager will poll for tables.
	PollInterval time.Duration `yaml:"poll_interval"`

	// duration a table will be created before it is needed.
	CreationGracePeriod time.Duration `yaml:"creation_grace_period"`

	IndexTables config.ProvisionConfig `yaml:"index_tables_provisioning"`
	ChunkTables config.ProvisionConfig `yaml:"chunk_tables_provisioning"`
}

TableManagerConfig holds config for a TableManager

func (*TableManagerConfig) MarshalYAML

func (cfg *TableManagerConfig) MarshalYAML() (interface{}, error)

MarshalYAML implements the yaml.Marshaler interface. To support RetentionPeriod.

func (*TableManagerConfig) RegisterFlags

func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*TableManagerConfig) UnmarshalYAML

func (cfg *TableManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements the yaml.Unmarshaler interface. To support RetentionPeriod.

func (*TableManagerConfig) Validate

func (cfg *TableManagerConfig) Validate() error

Validate validates the config.

type WriteBatch

type WriteBatch interface {
	Add(tableName, hashValue string, rangeValue []byte, value []byte)
	Delete(tableName, hashValue string, rangeValue []byte)
}

WriteBatch represents a batch of writes.

type WriteClient

type WriteClient interface {
	NewWriteBatch() WriteBatch
	BatchWrite(context.Context, WriteBatch) error
}

Client for the write path.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL