Documentation ¶
Index ¶
- Variables
- func ExpectTables(ctx context.Context, client TableClient, expected []config.TableDesc) error
- func ParseChunkTimeRangeValue(rangeValue []byte, value []byte) (chunkID string, labelValue model.LabelValue, err error)
- func QueryKey(q Query) string
- type Bucket
- type BucketClient
- type Bytes
- type CacheEntry
- func (*CacheEntry) Descriptor() ([]byte, []int)
- func (this *CacheEntry) Equal(that interface{}) bool
- func (this *CacheEntry) GoString() string
- func (m *CacheEntry) Marshal() (dAtA []byte, err error)
- func (m *CacheEntry) MarshalTo(dAtA []byte) (int, error)
- func (m *CacheEntry) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*CacheEntry) ProtoMessage()
- func (m *CacheEntry) Reset()
- func (m *CacheEntry) Size() (n int)
- func (this *CacheEntry) String() string
- func (m *CacheEntry) Unmarshal(dAtA []byte) error
- func (m *CacheEntry) XXX_DiscardUnknown()
- func (m *CacheEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *CacheEntry) XXX_Merge(src proto.Message)
- func (m *CacheEntry) XXX_Size() int
- func (m *CacheEntry) XXX_Unmarshal(b []byte) error
- type CardinalityExceededError
- type Client
- type Entry
- type EntryProcessor
- type ExtraTables
- type Query
- type QueryPagesCallback
- type ReadBatch
- func (*ReadBatch) Descriptor() ([]byte, []int)
- func (this *ReadBatch) Equal(that interface{}) bool
- func (m *ReadBatch) GetCardinality() int32
- func (m *ReadBatch) GetEntries() []CacheEntry
- func (m *ReadBatch) GetExpiry() int64
- func (m *ReadBatch) GetKey() string
- func (this *ReadBatch) GoString() string
- func (b ReadBatch) Iterator() ReadBatchIterator
- func (m *ReadBatch) Marshal() (dAtA []byte, err error)
- func (m *ReadBatch) MarshalTo(dAtA []byte) (int, error)
- func (m *ReadBatch) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ReadBatch) ProtoMessage()
- func (m *ReadBatch) Reset()
- func (m *ReadBatch) Size() (n int)
- func (this *ReadBatch) String() string
- func (m *ReadBatch) Unmarshal(dAtA []byte) error
- func (m *ReadBatch) XXX_DiscardUnknown()
- func (m *ReadBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ReadBatch) XXX_Merge(src proto.Message)
- func (m *ReadBatch) XXX_Size() int
- func (m *ReadBatch) XXX_Unmarshal(b []byte) error
- type ReadBatchIterator
- type ReadBatchResult
- type ReadClient
- type Reader
- type SeriesStoreSchema
- type StoreLimits
- type TableClient
- type TableManager
- type TableManagerConfig
- type WriteBatch
- type WriteClient
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthCachingIndexClient = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowCachingIndexClient = fmt.Errorf("proto: integer overflow") )
var ( // ErrNotSupported when a schema doesn't support that particular lookup. ErrNotSupported = errors.New("not supported") ErrMetricNameLabelMissing = errors.New("metric name label missing") )
Functions ¶
func ExpectTables ¶
ExpectTables compares existing tables to an expected set of tables. Exposed for testing,
func ParseChunkTimeRangeValue ¶
func ParseChunkTimeRangeValue(rangeValue []byte, value []byte) ( chunkID string, labelValue model.LabelValue, err error, )
ParseChunkTimeRangeValue returns the chunkID (seriesID since v9) and labelValue for chunk time range values.
Types ¶
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket describes a range of time with a tableName and hashKey
type BucketClient ¶
BucketClient is used to enforce retention on chunk buckets.
type Bytes ¶
type Bytes []byte
Bytes exists to stop proto copying the byte array
type CacheEntry ¶
type CacheEntry struct { Column Bytes `protobuf:"bytes,1,opt,name=Column,proto3,customtype=Bytes" json:"Column"` Value Bytes `protobuf:"bytes,2,opt,name=Value,proto3,customtype=Bytes" json:"Value"` }
func (*CacheEntry) Descriptor ¶
func (*CacheEntry) Descriptor() ([]byte, []int)
func (*CacheEntry) Equal ¶
func (this *CacheEntry) Equal(that interface{}) bool
func (*CacheEntry) GoString ¶
func (this *CacheEntry) GoString() string
func (*CacheEntry) Marshal ¶
func (m *CacheEntry) Marshal() (dAtA []byte, err error)
func (*CacheEntry) MarshalToSizedBuffer ¶
func (m *CacheEntry) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*CacheEntry) ProtoMessage ¶
func (*CacheEntry) ProtoMessage()
func (*CacheEntry) Reset ¶
func (m *CacheEntry) Reset()
func (*CacheEntry) Size ¶
func (m *CacheEntry) Size() (n int)
func (*CacheEntry) String ¶
func (this *CacheEntry) String() string
func (*CacheEntry) Unmarshal ¶
func (m *CacheEntry) Unmarshal(dAtA []byte) error
func (*CacheEntry) XXX_DiscardUnknown ¶
func (m *CacheEntry) XXX_DiscardUnknown()
func (*CacheEntry) XXX_Marshal ¶
func (m *CacheEntry) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*CacheEntry) XXX_Merge ¶
func (m *CacheEntry) XXX_Merge(src proto.Message)
func (*CacheEntry) XXX_Size ¶
func (m *CacheEntry) XXX_Size() int
func (*CacheEntry) XXX_Unmarshal ¶
func (m *CacheEntry) XXX_Unmarshal(b []byte) error
type CardinalityExceededError ¶
CardinalityExceededError is returned when the user reads a row that is too large.
func (CardinalityExceededError) Error ¶
func (e CardinalityExceededError) Error() string
type Client ¶
type Client interface { ReadClient WriteClient Stop() }
Client is a client for the storage of the index (e.g. DynamoDB or Bigtable).
type Entry ¶
type Entry struct { TableName string HashValue string // For writes, RangeValue will always be set. RangeValue []byte // New for v6 schema, label value is not written as part of the range key. Value []byte }
Entry describes an entry in the chunk index
type EntryProcessor ¶
type EntryProcessor interface { ProcessIndexEntry(indexEntry Entry) error // Will this user be accepted by the processor? AcceptUser(user string) bool // Called at the end of reading of index entries. Flush() error }
EntryProcessor receives index entries from a table.
type ExtraTables ¶
type ExtraTables struct { TableClient TableClient Tables []config.TableDesc }
ExtraTables holds the list of tables that TableManager has to manage using a TableClient. This is useful for managing tables other than Chunk and Index tables.
type Query ¶
type Query struct { TableName string HashValue string // One of RangeValuePrefix or RangeValueStart might be set: // - If RangeValuePrefix is not nil, must read all keys with that prefix. // - If RangeValueStart is not nil, must read all keys from there onwards. // - If neither is set, must read all keys for that row. // RangeValueStart should only be used for querying Chunk IDs. // If this is going to change then please take care of func isChunksQuery in pkg/chunk/storage/caching_index_client.go which relies on it. RangeValuePrefix []byte RangeValueStart []byte // Filters for querying ValueEqual []byte // If the result of this lookup is immutable or not (for caching). Immutable bool }
Query describes a query for entries
type QueryPagesCallback ¶
type QueryPagesCallback func(Query, ReadBatchResult) bool
QueryPagesCallback from an IndexQuery.
func QueryFilter ¶
func QueryFilter(callback QueryPagesCallback) QueryPagesCallback
QueryFilter wraps a callback to ensure the results are filtered correctly; useful for the cache and Bigtable backend, which only ever fetches the whole row.
type ReadBatch ¶
type ReadBatch struct { Entries []CacheEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries"` Key string `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` // The time at which the key expires. Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"` // The number of entries; used for cardinality limiting. // entries will be empty when this is set. Cardinality int32 `protobuf:"varint,4,opt,name=cardinality,proto3" json:"cardinality,omitempty"` }
func (*ReadBatch) Descriptor ¶
func (*ReadBatch) GetCardinality ¶
func (*ReadBatch) GetEntries ¶
func (m *ReadBatch) GetEntries() []CacheEntry
func (ReadBatch) Iterator ¶
func (b ReadBatch) Iterator() ReadBatchIterator
Iterator implements chunk.ReadBatch.
func (*ReadBatch) MarshalToSizedBuffer ¶
func (*ReadBatch) ProtoMessage ¶
func (*ReadBatch) ProtoMessage()
func (*ReadBatch) XXX_DiscardUnknown ¶
func (m *ReadBatch) XXX_DiscardUnknown()
func (*ReadBatch) XXX_Marshal ¶
func (*ReadBatch) XXX_Unmarshal ¶
type ReadBatchIterator ¶
ReadBatchIterator is an iterator over a ReadBatch.
type ReadBatchResult ¶
type ReadBatchResult interface {
Iterator() ReadBatchIterator
}
ReadBatchResult represents the results of a QueryPages.
type ReadClient ¶
type ReadClient interface {
QueryPages(ctx context.Context, queries []Query, callback QueryPagesCallback) error
}
Client for the read path.
type Reader ¶
type Reader interface { IndexTableNames(ctx context.Context) ([]string, error) // Reads a single table from index, and passes individual index entries to the processors. // // All entries with the same TableName, HashValue and RangeValue are passed to the same processor, // and all such entries (with different Values) are passed before index entries with different // values of HashValue and RangeValue are passed to the same processor. // // This allows IndexEntryProcessor to find when values for given Hash and Range finish: // as soon as new Hash and Range differ from last IndexEntry. // // Index entries passed to the same processor arrive sorted by HashValue and RangeValue. ReadIndexEntries(ctx context.Context, table string, processors []EntryProcessor) error }
Reader parses index entries and passes them to the IndexEntryProcessor.
type SeriesStoreSchema ¶
type SeriesStoreSchema interface { // When doing a read, use these methods to return the list of entries you should query GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]Query, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]Query, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]Query, error) FilterReadQueries(queries []Query, shard *astmapper.ShardAnnotation) []Query // returns cache key string and []IndexEntry per bucket, matched in order GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]Entry, error) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]Entry, error) // If the query resulted in series IDs, use this method to find chunks. GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error) // Returns queries to retrieve all label names of multiple series by id. GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]Query, error) }
SeriesStoreSchema is a schema used by seriesStore
func CreateSchema ¶
func CreateSchema(cfg config.PeriodConfig) (SeriesStoreSchema, error)
CreateSchema returns the schema defined by the PeriodConfig
func NewSchemaCaching ¶
func NewSchemaCaching(schema SeriesStoreSchema, cacheOlderThan time.Duration) SeriesStoreSchema
type StoreLimits ¶
StoreLimits helps get Limits specific to Queries for Stores
type TableClient ¶
type TableClient interface { ListTables(ctx context.Context) ([]string, error) CreateTable(ctx context.Context, desc config.TableDesc) error DeleteTable(ctx context.Context, name string) error DescribeTable(ctx context.Context, name string) (desc config.TableDesc, isActive bool, err error) UpdateTable(ctx context.Context, current, expected config.TableDesc) error Stop() }
TableClient is a client for telling Dynamo what to do with tables.
type TableManager ¶
TableManager creates and manages the provisioned throughput on DynamoDB tables
func NewTableManager ¶
func NewTableManager(cfg TableManagerConfig, schemaCfg config.SchemaConfig, maxChunkAge time.Duration, tableClient TableClient, objectClient BucketClient, extraTables []ExtraTables, registerer prometheus.Registerer, logger log.Logger, ) (*TableManager, error)
NewTableManager makes a new TableManager
func (*TableManager) SyncTables ¶
func (m *TableManager) SyncTables(ctx context.Context) error
SyncTables will calculate the tables expected to exist, create those that do not and update those that need it. It is exposed for testing.
type TableManagerConfig ¶
type TableManagerConfig struct { // Master 'off-switch' for table capacity updates, e.g. when troubleshooting ThroughputUpdatesDisabled bool `yaml:"throughput_updates_disabled"` // Master 'on-switch' for table retention deletions RetentionDeletesEnabled bool `yaml:"retention_deletes_enabled"` // How far back tables will be kept before they are deleted RetentionPeriod time.Duration `yaml:"-"` // This is so that we can accept 1w, 1y in the YAML. RetentionPeriodModel model.Duration `yaml:"retention_period"` // Period with which the table manager will poll for tables. PollInterval time.Duration `yaml:"poll_interval"` // duration a table will be created before it is needed. CreationGracePeriod time.Duration `yaml:"creation_grace_period"` IndexTables config.ProvisionConfig `yaml:"index_tables_provisioning"` ChunkTables config.ProvisionConfig `yaml:"chunk_tables_provisioning"` }
TableManagerConfig holds config for a TableManager
func (*TableManagerConfig) MarshalYAML ¶
func (cfg *TableManagerConfig) MarshalYAML() (interface{}, error)
MarshalYAML implements the yaml.Marshaler interface. To support RetentionPeriod.
func (*TableManagerConfig) RegisterFlags ¶
func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
func (*TableManagerConfig) UnmarshalYAML ¶
func (cfg *TableManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface. To support RetentionPeriod.
func (*TableManagerConfig) Validate ¶
func (cfg *TableManagerConfig) Validate() error
Validate validates the config.
type WriteBatch ¶
type WriteBatch interface { Add(tableName, hashValue string, rangeValue []byte, value []byte) Delete(tableName, hashValue string, rangeValue []byte) }
WriteBatch represents a batch of writes.
type WriteClient ¶
type WriteClient interface { NewWriteBatch() WriteBatch BatchWrite(context.Context, WriteBatch) error }
Client for the write path.