Documentation ¶
Index ¶
- Constants
- Variables
- func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error)
- func ExpectTables(ctx context.Context, client TableClient, expected []TableDesc) error
- func FindSetMatches(pattern string) []string
- type ActiveTableProvisionConfig
- type AutoScalingConfig
- type BaseSchema
- type Bucket
- type BucketClient
- type CacheGenNumLoader
- type CardinalityExceededError
- type Chunk
- func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error
- func (c *Chunk) Encode() error
- func (c *Chunk) EncodeTo(buf *bytes.Buffer) error
- func (c *Chunk) Encoded() ([]byte, error)
- func (c *Chunk) Samples(from, through model.Time) ([]model.SamplePair, error)
- func (c *Chunk) Slice(from, through model.Time) (*Chunk, error)
- type Client
- type CompositeStore
- func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, ...) error
- func (c CompositeStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, ...) error
- func (c CompositeStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, ...) error
- func (c CompositeStore) Get(ctx context.Context, userID string, from, through model.Time, ...) ([]Chunk, error)
- func (c CompositeStore) GetChunkFetcher(tm model.Time) *Fetcher
- func (c CompositeStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, ...) ([][]Chunk, []*Fetcher, error)
- func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, ...) ([]string, error)
- func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, ...) ([]string, error)
- func (c CompositeStore) Put(ctx context.Context, chunks []Chunk) error
- func (c CompositeStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
- func (c CompositeStore) Stop()
- type DayTime
- type DecodeContext
- type ExtraTables
- type Fetcher
- type InactiveTableProvisionConfig
- type IndexClient
- type IndexEntry
- type IndexEntryProcessor
- type IndexQuery
- type IndexReader
- type MockStorage
- func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error
- func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error
- func (m *MockStorage) DeleteChunk(ctx context.Context, userID, chunkID string) error
- func (m *MockStorage) DeleteObject(ctx context.Context, objectKey string) error
- func (m *MockStorage) DeleteTable(_ context.Context, name string) error
- func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error)
- func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, error)
- func (m *MockStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
- func (m *MockStorage) GetObjectCount() int
- func (m *MockStorage) GetSortedObjectKeys() []string
- func (m *MockStorage) IsChunkNotFoundErr(err error) bool
- func (m *MockStorage) IsObjectNotFoundErr(err error) bool
- func (m *MockStorage) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error)
- func (m *MockStorage) ListTables(_ context.Context) ([]string, error)
- func (m *MockStorage) NewWriteBatch() WriteBatch
- func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error
- func (m *MockStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
- func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, ...) error
- func (m *MockStorage) SetMode(mode MockStorageMode)
- func (*MockStorage) Stop()
- func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error
- type MockStorageMode
- type ObjectAndIndexClient
- type ObjectClient
- type PeriodConfig
- type PeriodicTableConfig
- type ProvisionConfig
- type QueryError
- type ReadBatch
- type ReadBatchIterator
- type SchemaConfig
- func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error)
- func (cfg SchemaConfig) ExternalKey(chunk Chunk) string
- func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig))
- func (cfg *SchemaConfig) Load() error
- func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet)
- func (cfg SchemaConfig) SchemaForTime(t model.Time) (PeriodConfig, error)
- func (cfg *SchemaConfig) Validate() error
- func (cfg SchemaConfig) VersionForChunk(c Chunk) int
- type SeriesStoreSchema
- type StorageCommonPrefix
- type StorageObject
- type Store
- type StoreConfig
- type StoreLimits
- type StoreSchema
- type TableClient
- type TableDesc
- type TableManager
- type TableManagerConfig
- type Tags
- type UniqueStrings
- type WriteBatch
Constants ¶
const ( ErrInvalidChecksum = errs.Error("invalid chunk checksum") ErrWrongMetadata = errs.Error("wrong chunk metadata") ErrMetadataLength = errs.Error("chunk metadata wrong length") ErrDataLength = errs.Error("chunk data wrong length") ErrSliceOutOfRange = errs.Error("chunk can't be sliced out of its data range") )
const ( MockStorageModeReadWrite = 0 MockStorageModeReadOnly = 1 MockStorageModeWriteOnly = 2 )
Variables ¶
var ( ErrQueryMustContainMetricName = QueryError("query must contain metric name") ErrMetricNameLabelMissing = errors.New("metric name label missing") ErrParialDeleteChunkNoOverlap = errors.New("interval for partial deletion has not overlap with chunk interval") )
var ( // ErrMethodNotImplemented when any of the storage clients do not implement a method ErrMethodNotImplemented = errors.New("method is not implemented") // ErrStorageObjectNotFound when object storage does not have requested object ErrStorageObjectNotFound = errors.New("object not found in storage") )
var BenchmarkLabels = labels.Labels{ {Name: model.MetricNameLabel, Value: "container_cpu_usage_seconds_total"}, {Name: "beta_kubernetes_io_arch", Value: "amd64"}, {Name: "beta_kubernetes_io_instance_type", Value: "c3.somesize"}, {Name: "beta_kubernetes_io_os", Value: "linux"}, {Name: "container_name", Value: "some-name"}, {Name: "cpu", Value: "cpu01"}, {Name: "failure_domain_beta_kubernetes_io_region", Value: "somewhere-1"}, {Name: "failure_domain_beta_kubernetes_io_zone", Value: "somewhere-1b"}, {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, {Name: "job", Value: "kubernetes-cadvisor"}, {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, {Name: "monitor", Value: "prod"}, {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, {Name: "namespace", Value: "kube-system"}, {Name: "pod_name", Value: "some-other-name-5j8s8"}, }
BenchmarkLabels is a real example from Kubernetes' embedded cAdvisor metrics, lightly obfuscated
var ( // ErrNotSupported when a schema doesn't support that particular lookup. ErrNotSupported = errors.New("not supported") )
Functions ¶
func ChunksToMatrix ¶
func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error)
ChunksToMatrix converts a set of chunks to a model.Matrix.
func ExpectTables ¶
func ExpectTables(ctx context.Context, client TableClient, expected []TableDesc) error
ExpectTables compares existing tables to an expected set of tables. Exposed for testing,
func FindSetMatches ¶
FindSetMatches returns list of values that can be equality matched on. copied from Prometheus querier.go, removed check for Prometheus wrapper.
Types ¶
type ActiveTableProvisionConfig ¶
type ActiveTableProvisionConfig struct { ProvisionedThroughputOnDemandMode bool `yaml:"enable_ondemand_throughput_mode"` ProvisionedWriteThroughput int64 `yaml:"provisioned_write_throughput"` ProvisionedReadThroughput int64 `yaml:"provisioned_read_throughput"` WriteScale AutoScalingConfig `yaml:"write_scale"` ReadScale AutoScalingConfig `yaml:"read_scale"` }
func (ActiveTableProvisionConfig) BuildTableDesc ¶
func (cfg ActiveTableProvisionConfig) BuildTableDesc(tableName string, tags Tags) TableDesc
func (*ActiveTableProvisionConfig) RegisterFlags ¶
func (cfg *ActiveTableProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet. Make sure defaults are set in the respective fields before calling RegisterFlags.
type AutoScalingConfig ¶
type AutoScalingConfig struct { Enabled bool `yaml:"enabled"` RoleARN string `yaml:"role_arn"` MinCapacity int64 `yaml:"min_capacity"` MaxCapacity int64 `yaml:"max_capacity"` OutCooldown int64 `yaml:"out_cooldown"` InCooldown int64 `yaml:"in_cooldown"` TargetValue float64 `yaml:"target"` }
AutoScalingConfig for DynamoDB tables.
func (*AutoScalingConfig) RegisterFlags ¶
func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type BaseSchema ¶
type BaseSchema interface { // When doing a read, use these methods to return the list of entries you should query GetReadQueriesForMetric(from, through model.Time, userID, metricName string) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID, metricName, labelName string) ([]IndexQuery, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID, metricName, labelName, labelValue string) ([]IndexQuery, error) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery }
BasicSchema has operation shared between StoreSchema and SeriesStoreSchema
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket describes a range of time with a tableName and hashKey
type BucketClient ¶
BucketClient is used to enforce retention on chunk buckets.
type CacheGenNumLoader ¶
type CardinalityExceededError ¶
CardinalityExceededError is returned when the user reads a row that is too large.
func (CardinalityExceededError) Error ¶
func (e CardinalityExceededError) Error() string
type Chunk ¶
type Chunk struct { // These two fields will be missing from older chunks (as will the hash). // On fetch we will initialise these fields from the DynamoDB key. Fingerprint model.Fingerprint `json:"fingerprint"` UserID string `json:"userID"` // These fields will be in all chunks, including old ones. From model.Time `json:"from"` Through model.Time `json:"through"` Metric labels.Labels `json:"metric"` // The hash is not written to the external storage either. We use // crc32, Castagnoli table. See http://www.evanjones.ca/crc32c.html. // For old chunks, ChecksumSet will be false. ChecksumSet bool `json:"-"` Checksum uint32 `json:"-"` // We never use Delta encoding (the zero value), so if this entry is // missing, we default to DoubleDelta. Encoding prom_chunk.Encoding `json:"encoding"` Data prom_chunk.Chunk `json:"-"` // contains filtered or unexported fields }
Chunk contains encoded timeseries data
func NewChunk ¶
func NewChunk(userID string, fp model.Fingerprint, metric labels.Labels, c prom_chunk.Chunk, from, through model.Time) Chunk
NewChunk creates a new chunk
func ParseExternalKey ¶
ParseExternalKey is used to construct a partially-populated chunk from the key in DynamoDB. This chunk can then be used to calculate the key needed to fetch the Chunk data from Memcache/S3, and then fully populate the chunk with decode().
Pre-checksums, the keys written to DynamoDB looked like `<fingerprint>:<start time>:<end time>` (aka the ID), and the key for memcache and S3 was `<user id>/<fingerprint>:<start time>:<end time>. Finger prints and times were written in base-10.
Post-checksums, externals keys become the same across DynamoDB, Memcache and S3. Numbers become hex encoded. Keys look like: `<user id>/<fingerprint>:<start time>:<end time>:<checksum>`.
v12+, fingerprint is now a prefix to support better read and write request parallelization: `<user>/<fprint>/<start>:<end>:<checksum>`
func (*Chunk) Decode ¶
func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error
Decode the chunk from the given buffer, and confirm the chunk is the one we expected.
type Client ¶
type Client interface { Stop() PutChunks(ctx context.Context, chunks []Chunk) error GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) DeleteChunk(ctx context.Context, userID, chunkID string) error IsChunkNotFoundErr(err error) bool }
Client is for storing and retrieving chunks.
type CompositeStore ¶
type CompositeStore struct {
// contains filtered or unexported fields
}
CompositeStore is a Store which delegates to various stores depending on when they were activated.
func NewCompositeStore ¶
func NewCompositeStore(cacheGenNumLoader CacheGenNumLoader) CompositeStore
NewCompositeStore creates a new Store which delegates to different stores depending on time.
func (*CompositeStore) AddPeriod ¶
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error
AddPeriod adds the configuration for a period of time to the CompositeStore
func (CompositeStore) DeleteChunk ¶
func (c CompositeStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error
DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage. It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk
func (CompositeStore) DeleteSeriesIDs ¶
func (c CompositeStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error
DeleteSeriesIDs deletes series IDs from index in series store
func (CompositeStore) GetChunkFetcher ¶
func (CompositeStore) GetChunkRefs ¶
func (CompositeStore) LabelNamesForMetricName ¶
func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
LabelNamesForMetricName retrieves all label names for a metric name.
func (CompositeStore) LabelValuesForMetricName ¶
func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName, labelName string, matchers ...*labels.Matcher) ([]string, error)
LabelValuesForMetricName retrieves all label values for a single label name and metric name.
type DayTime ¶
DayTime is a model.Time what holds day-aligned values, and marshals to/from YAML in YYYY-MM-DD format.
func (DayTime) MarshalYAML ¶
MarshalYAML implements yaml.Marshaller.
func (*DayTime) UnmarshalYAML ¶
UnmarshalYAML implements yaml.Unmarshaller.
type DecodeContext ¶
type DecodeContext struct {
// contains filtered or unexported fields
}
DecodeContext holds data that can be re-used between decodes of different chunks
func NewDecodeContext ¶
func NewDecodeContext() *DecodeContext
NewDecodeContext creates a new, blank, DecodeContext
type ExtraTables ¶
type ExtraTables struct { TableClient TableClient Tables []TableDesc }
ExtraTables holds the list of tables that TableManager has to manage using a TableClient. This is useful for managing tables other than Chunk and Index tables.
type Fetcher ¶
type Fetcher struct {
// contains filtered or unexported fields
}
Fetcher deals with fetching chunk contents from the cache/store, and writing back any misses to the cache. Also responsible for decoding chunks from the cache, in parallel.
func NewChunkFetcher ¶
func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, schema SchemaConfig, storage Client, maxAsyncConcurrency, maxAsyncBufferSize int) (*Fetcher, error)
NewChunkFetcher makes a new ChunkFetcher.
func (*Fetcher) FetchChunks ¶
FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
func (*Fetcher) IsChunkNotFoundErr ¶
type InactiveTableProvisionConfig ¶
type InactiveTableProvisionConfig struct { InactiveThroughputOnDemandMode bool `yaml:"enable_inactive_throughput_on_demand_mode"` InactiveWriteThroughput int64 `yaml:"inactive_write_throughput"` InactiveReadThroughput int64 `yaml:"inactive_read_throughput"` InactiveWriteScale AutoScalingConfig `yaml:"inactive_write_scale"` InactiveReadScale AutoScalingConfig `yaml:"inactive_read_scale"` InactiveWriteScaleLastN int64 `yaml:"inactive_write_scale_lastn"` InactiveReadScaleLastN int64 `yaml:"inactive_read_scale_lastn"` }
func (InactiveTableProvisionConfig) BuildTableDesc ¶
func (cfg InactiveTableProvisionConfig) BuildTableDesc(tableName string, tags Tags, disableAutoscale bool) TableDesc
func (*InactiveTableProvisionConfig) RegisterFlags ¶
func (cfg *InactiveTableProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type IndexClient ¶
type IndexClient interface { Stop() // For the write path. NewWriteBatch() WriteBatch BatchWrite(context.Context, WriteBatch) error // For the read path. QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error }
IndexClient is a client for the storage of the index (e.g. DynamoDB or Bigtable).
type IndexEntry ¶
type IndexEntry struct { TableName string HashValue string // For writes, RangeValue will always be set. RangeValue []byte // New for v6 schema, label value is not written as part of the range key. Value []byte }
IndexEntry describes an entry in the chunk index
type IndexEntryProcessor ¶
type IndexEntryProcessor interface { ProcessIndexEntry(indexEntry IndexEntry) error // Will this user be accepted by the processor? AcceptUser(user string) bool // Called at the end of reading of index entries. Flush() error }
IndexEntryProcessor receives index entries from a table.
type IndexQuery ¶
type IndexQuery struct { TableName string HashValue string // One of RangeValuePrefix or RangeValueStart might be set: // - If RangeValuePrefix is not nil, must read all keys with that prefix. // - If RangeValueStart is not nil, must read all keys from there onwards. // - If neither is set, must read all keys for that row. // RangeValueStart should only be used for querying Chunk IDs. // If this is going to change then please take care of func isChunksQuery in pkg/chunk/storage/caching_index_client.go which relies on it. RangeValuePrefix []byte RangeValueStart []byte // Filters for querying ValueEqual []byte // If the result of this lookup is immutable or not (for caching). Immutable bool }
IndexQuery describes a query for entries
type IndexReader ¶
type IndexReader interface { IndexTableNames(ctx context.Context) ([]string, error) // Reads a single table from index, and passes individual index entries to the processors. // // All entries with the same TableName, HashValue and RangeValue are passed to the same processor, // and all such entries (with different Values) are passed before index entries with different // values of HashValue and RangeValue are passed to the same processor. // // This allows IndexEntryProcessor to find when values for given Hash and Range finish: // as soon as new Hash and Range differ from last IndexEntry. // // Index entries passed to the same processor arrive sorted by HashValue and RangeValue. ReadIndexEntries(ctx context.Context, table string, processors []IndexEntryProcessor) error }
IndexReader parses index entries and passes them to the IndexEntryProcessor.
type MockStorage ¶
type MockStorage struct {
// contains filtered or unexported fields
}
MockStorage is a fake in-memory StorageClient.
func (*MockStorage) BatchWrite ¶
func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error
BatchWrite implements StorageClient.
func (*MockStorage) CreateTable ¶
func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error
CreateTable implements StorageClient.
func (*MockStorage) DeleteChunk ¶
func (m *MockStorage) DeleteChunk(ctx context.Context, userID, chunkID string) error
DeleteChunk implements StorageClient.
func (*MockStorage) DeleteObject ¶
func (m *MockStorage) DeleteObject(ctx context.Context, objectKey string) error
func (*MockStorage) DeleteTable ¶
func (m *MockStorage) DeleteTable(_ context.Context, name string) error
DeleteTable implements StorageClient.
func (*MockStorage) DescribeTable ¶
func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error)
DescribeTable implements StorageClient.
func (*MockStorage) GetObject ¶
func (m *MockStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
func (*MockStorage) GetObjectCount ¶
func (m *MockStorage) GetObjectCount() int
func (*MockStorage) GetSortedObjectKeys ¶
func (m *MockStorage) GetSortedObjectKeys() []string
func (*MockStorage) IsChunkNotFoundErr ¶
func (m *MockStorage) IsChunkNotFoundErr(err error) bool
func (*MockStorage) IsObjectNotFoundErr ¶
func (m *MockStorage) IsObjectNotFoundErr(err error) bool
func (*MockStorage) List ¶
func (m *MockStorage) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error)
List implements chunk.ObjectClient.
func (*MockStorage) ListTables ¶
func (m *MockStorage) ListTables(_ context.Context) ([]string, error)
ListTables implements StorageClient.
func (*MockStorage) NewWriteBatch ¶
func (m *MockStorage) NewWriteBatch() WriteBatch
NewWriteBatch implements StorageClient.
func (*MockStorage) PutChunks ¶
func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error
PutChunks implements StorageClient.
func (*MockStorage) PutObject ¶
func (m *MockStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
func (*MockStorage) QueryPages ¶
func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error
QueryPages implements StorageClient.
func (*MockStorage) SetMode ¶
func (m *MockStorage) SetMode(mode MockStorageMode)
func (*MockStorage) UpdateTable ¶
func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error
UpdateTable implements StorageClient.
type MockStorageMode ¶
type MockStorageMode int
type ObjectAndIndexClient ¶
type ObjectAndIndexClient interface {
PutChunksAndIndex(ctx context.Context, chunks []Chunk, index WriteBatch) error
}
ObjectAndIndexClient allows optimisations where the same client handles both
type ObjectClient ¶
type ObjectClient interface { PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error // NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak. GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) // List objects with given prefix. // // If delimiter is empty, all objects are returned, even if they are in nested in "subdirectories". // If delimiter is not empty, it is used to compute common prefixes ("subdirectories"), // and objects containing delimiter in the name will not be returned in the result. // // For example, if the prefix is "notes/" and the delimiter is a slash (/) as in "notes/summer/july", the common prefix is "notes/summer/". // Common prefixes will always end with passed delimiter. // // Keys of returned storage objects have given prefix. List(ctx context.Context, prefix string, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) DeleteObject(ctx context.Context, objectKey string) error IsObjectNotFoundErr(err error) bool Stop() }
ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/...)
type PeriodConfig ¶
type PeriodConfig struct { From DayTime `yaml:"from"` // used when working with config IndexType string `yaml:"store"` // type of index client to use. ObjectType string `yaml:"object_store"` // type of object client to use; if omitted, defaults to store. Schema string `yaml:"schema"` IndexTables PeriodicTableConfig `yaml:"index"` ChunkTables PeriodicTableConfig `yaml:"chunks"` RowShards uint32 `yaml:"row_shards"` // contains filtered or unexported fields }
PeriodConfig defines the schema and tables to use for a period of time
func (PeriodConfig) CreateSchema ¶
func (cfg PeriodConfig) CreateSchema() (BaseSchema, error)
CreateSchema returns the schema defined by the PeriodConfig
func (*PeriodConfig) UnmarshalYAML ¶
func (cfg *PeriodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML implements yaml.Unmarshaller.
func (*PeriodConfig) VersionAsInt ¶
func (cfg *PeriodConfig) VersionAsInt() (int, error)
type PeriodicTableConfig ¶
PeriodicTableConfig is configuration for a set of time-sharded tables.
func (PeriodicTableConfig) MarshalYAML ¶
func (cfg PeriodicTableConfig) MarshalYAML() (interface{}, error)
MarshalYAML implements the yaml.Marshaler interface.
func (*PeriodicTableConfig) TableFor ¶
func (cfg *PeriodicTableConfig) TableFor(t model.Time) string
TableFor calculates the table shard for a given point in time.
func (*PeriodicTableConfig) UnmarshalYAML ¶
func (cfg *PeriodicTableConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface.
type ProvisionConfig ¶
type ProvisionConfig struct { ActiveTableProvisionConfig `yaml:",inline"` InactiveTableProvisionConfig `yaml:",inline"` }
ProvisionConfig holds config for provisioning capacity for index and chunk tables (on DynamoDB for now)
func (*ProvisionConfig) RegisterFlags ¶
func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type QueryError ¶
type QueryError string
Query errors are to be treated as user errors, rather than storage errors.
func (QueryError) Error ¶
func (e QueryError) Error() string
type ReadBatch ¶
type ReadBatch interface {
Iterator() ReadBatchIterator
}
ReadBatch represents the results of a QueryPages.
type ReadBatchIterator ¶
ReadBatchIterator is an iterator over a ReadBatch.
type SchemaConfig ¶
type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"` // contains filtered or unexported fields }
SchemaConfig contains the config for our chunk index schemas
func DefaultSchemaConfig ¶
func DefaultSchemaConfig(store, schema string, from model.Time) SchemaConfig
DefaultSchemaConfig creates a simple schema config for testing
func (SchemaConfig) ChunkTableFor ¶
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error)
ChunkTableFor calculates the chunk table shard for a given point in time.
func (SchemaConfig) ExternalKey ¶
func (cfg SchemaConfig) ExternalKey(chunk Chunk) string
Generate the appropriate external key based on cfg.Schema, chunk.Checksum, and chunk.From
func (*SchemaConfig) ForEachAfter ¶
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig))
ForEachAfter will call f() on every entry after t, splitting entries if necessary so there is an entry starting at t
func (*SchemaConfig) Load ¶
func (cfg *SchemaConfig) Load() error
Load the yaml file, or build the config from legacy command-line flags
func (*SchemaConfig) RegisterFlags ¶
func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
func (SchemaConfig) SchemaForTime ¶
func (cfg SchemaConfig) SchemaForTime(t model.Time) (PeriodConfig, error)
SchemaForTime returns the Schema PeriodConfig to use for a given point in time.
func (*SchemaConfig) Validate ¶
func (cfg *SchemaConfig) Validate() error
Validate the schema config and returns an error if the validation doesn't pass
func (SchemaConfig) VersionForChunk ¶
func (cfg SchemaConfig) VersionForChunk(c Chunk) int
VersionForChunk will return the schema version associated with the `From` timestamp of a chunk. The schema and chunk must be valid+compatible as the errors are not checked.
type SeriesStoreSchema ¶
type SeriesStoreSchema interface { BaseSchema // returns cache key string and []IndexEntry per bucket, matched in order GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) GetChunkWriteEntries(from, through model.Time, userID, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) // If the query resulted in series IDs, use this method to find chunks. GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) // Returns queries to retrieve all label names of multiple series by id. GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) // GetSeriesDeleteEntries returns IndexEntry's for deleting SeriesIDs from SeriesStore. // Since SeriesIDs are created per bucket, it makes sure that we don't include series entries which are in use by verifying using hasChunksForIntervalFunc i.e // It checks first and last buckets covered by the time interval to see if a SeriesID still has chunks in the store, // if yes then it doesn't include IndexEntry's for that bucket for deletion. GetSeriesDeleteEntries(from, through model.Time, userID string, metric labels.Labels, hasChunksForIntervalFunc hasChunksForIntervalFunc) ([]IndexEntry, error) }
SeriesStoreSchema is a schema used by seriesStore
type StorageCommonPrefix ¶
type StorageCommonPrefix string
StorageCommonPrefix represents a common prefix aka a synthetic directory in Object Store. It is guaranteed to always end with delimiter passed to List method.
type StorageObject ¶
StorageObject represents an object being stored in an Object Store
type Store ¶
type Store interface { Put(ctx context.Context, chunks []Chunk) error PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) // GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk), // using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName, labelName string, matchers ...*labels.Matcher) ([]string, error) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) GetChunkFetcher(tm model.Time) *Fetcher // DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage. // It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error // DeleteSeriesIDs is only relevant for SeriesStore. DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error Stop() }
Store for chunks.
type StoreConfig ¶
type StoreConfig struct { ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"` WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config"` CacheLookupsOlderThan model.Duration `yaml:"cache_lookups_older_than"` // When DisableIndexDeduplication is true and chunk is already there in cache, only index would be written to the store and not chunk. DisableIndexDeduplication bool `yaml:"-"` // contains filtered or unexported fields }
StoreConfig specifies config for a ChunkStore
func (*StoreConfig) RegisterFlags ¶
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
type StoreLimits ¶
type StoreLimits interface { MaxChunksPerQueryFromStore(userID string) int MaxQueryLength(userID string) time.Duration }
StoreLimits helps get Limits specific to Queries for Stores
type StoreSchema ¶
type StoreSchema interface { BaseSchema // When doing a write, use this method to return the list of entries you should write to. GetWriteEntries(from, through model.Time, userID, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) }
StoreSchema is a schema used by store
type TableClient ¶
type TableClient interface { ListTables(ctx context.Context) ([]string, error) CreateTable(ctx context.Context, desc TableDesc) error DeleteTable(ctx context.Context, name string) error DescribeTable(ctx context.Context, name string) (desc TableDesc, isActive bool, err error) UpdateTable(ctx context.Context, current, expected TableDesc) error Stop() }
TableClient is a client for telling Dynamo what to do with tables.
type TableDesc ¶
type TableDesc struct { Name string UseOnDemandIOMode bool ProvisionedRead int64 ProvisionedWrite int64 Tags Tags WriteScale AutoScalingConfig ReadScale AutoScalingConfig }
TableDesc describes a table.
type TableManager ¶
TableManager creates and manages the provisioned throughput on DynamoDB tables
func NewTableManager ¶
func NewTableManager(cfg TableManagerConfig, schemaCfg SchemaConfig, maxChunkAge time.Duration, tableClient TableClient, objectClient BucketClient, extraTables []ExtraTables, registerer prometheus.Registerer) (*TableManager, error)
NewTableManager makes a new TableManager
func (*TableManager) SyncTables ¶
func (m *TableManager) SyncTables(ctx context.Context) error
SyncTables will calculate the tables expected to exist, create those that do not and update those that need it. It is exposed for testing.
type TableManagerConfig ¶
type TableManagerConfig struct { // Master 'off-switch' for table capacity updates, e.g. when troubleshooting ThroughputUpdatesDisabled bool `yaml:"throughput_updates_disabled"` // Master 'on-switch' for table retention deletions RetentionDeletesEnabled bool `yaml:"retention_deletes_enabled"` // How far back tables will be kept before they are deleted RetentionPeriod time.Duration `yaml:"-"` // This is so that we can accept 1w, 1y in the YAML. RetentionPeriodModel model.Duration `yaml:"retention_period"` // Period with which the table manager will poll for tables. PollInterval time.Duration `yaml:"poll_interval"` // duration a table will be created before it is needed. CreationGracePeriod time.Duration `yaml:"creation_grace_period"` IndexTables ProvisionConfig `yaml:"index_tables_provisioning"` ChunkTables ProvisionConfig `yaml:"chunk_tables_provisioning"` }
TableManagerConfig holds config for a TableManager
func (*TableManagerConfig) MarshalYAML ¶
func (cfg *TableManagerConfig) MarshalYAML() (interface{}, error)
MarshalYAML implements the yaml.Marshaler interface. To support RetentionPeriod.
func (*TableManagerConfig) RegisterFlags ¶
func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
func (*TableManagerConfig) UnmarshalYAML ¶
func (cfg *TableManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error
UnmarshalYAML implements the yaml.Unmarshaler interface. To support RetentionPeriod.
func (*TableManagerConfig) Validate ¶
func (cfg *TableManagerConfig) Validate() error
Validate validates the config.
type Tags ¶
Tags is a string-string map that implements flag.Value.
func (*Tags) UnmarshalYAML ¶
UnmarshalYAML implements yaml.Unmarshaler.
type UniqueStrings ¶
type UniqueStrings struct {
// contains filtered or unexported fields
}
UniqueStrings keeps a slice of unique strings.
func NewUniqueStrings ¶
func NewUniqueStrings(sizeHint int) UniqueStrings
NewUniqueStrings returns a UniqueStrings instance with a pre-allocated result buffer.
func (*UniqueStrings) Add ¶
func (us *UniqueStrings) Add(strings ...string)
Add adds a new string, dropping duplicates.
func (UniqueStrings) Strings ¶
func (us UniqueStrings) Strings() []string
Strings returns the sorted sliced of unique strings.
Source Files ¶
- bucket_client.go
- chunk.go
- chunk_store.go
- chunk_store_utils.go
- composite_store.go
- fixtures.go
- index_reader.go
- inmemory_storage_client.go
- json_helpers.go
- opts.go
- schema.go
- schema_caching.go
- schema_config.go
- schema_util.go
- series_store.go
- storage_client.go
- strings.go
- table_client.go
- table_manager.go
- table_provisioning.go
- tags.go