Documentation ¶
Index ¶
- Constants
- Variables
- func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error)
- func ExpectTables(ctx context.Context, client TableClient, expected []TableDesc) error
- type AutoScalingConfig
- type Bucket
- type BucketClient
- type CardinalityExceededError
- type Chunk
- type CompositeStore
- func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks ObjectClient, ...) error
- func (c CompositeStore) Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
- func (c CompositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
- func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error)
- func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, ...) ([]string, error)
- func (c CompositeStore) Put(ctx context.Context, chunks []Chunk) error
- func (c CompositeStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
- func (c CompositeStore) Stop()
- type DayTime
- type DecodeContext
- type Fetcher
- type IndexClient
- type IndexEntry
- type IndexQuery
- type LegacySchemaConfig
- type MockStorage
- func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error
- func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error
- func (m *MockStorage) DeleteTable(_ context.Context, name string) error
- func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error)
- func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, error)
- func (m *MockStorage) ListTables(_ context.Context) ([]string, error)
- func (m *MockStorage) NewWriteBatch() WriteBatch
- func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error
- func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, ...) error
- func (*MockStorage) Stop()
- func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error
- type ObjectAndIndexClient
- type ObjectClient
- type PeriodConfig
- type PeriodicTableConfig
- type ProvisionConfig
- type ReadBatch
- type ReadBatchIterator
- type Schema
- type SchemaConfig
- type Store
- type StoreConfig
- type TableClient
- type TableDesc
- type TableManager
- type TableManagerConfig
- type Tags
- type WriteBatch
Constants ¶
const ( ErrInvalidChecksum = errs.Error("invalid chunk checksum") ErrWrongMetadata = errs.Error("wrong chunk metadata") ErrMetadataLength = errs.Error("chunk metadata wrong length") ErrDataLength = errs.Error("chunk data wrong length") )
Errors that decode can return
Variables ¶
var BenchmarkLabels = labels.Labels{ {Name: model.MetricNameLabel, Value: "container_cpu_usage_seconds_total"}, {Name: "beta_kubernetes_io_arch", Value: "amd64"}, {Name: "beta_kubernetes_io_instance_type", Value: "c3.somesize"}, {Name: "beta_kubernetes_io_os", Value: "linux"}, {Name: "container_name", Value: "some-name"}, {Name: "cpu", Value: "cpu01"}, {Name: "failure_domain_beta_kubernetes_io_region", Value: "somewhere-1"}, {Name: "failure_domain_beta_kubernetes_io_zone", Value: "somewhere-1b"}, {Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"}, {Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"}, {Name: "instance", Value: "ip-111-11-1-11.ec2.internal"}, {Name: "job", Value: "kubernetes-cadvisor"}, {Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"}, {Name: "monitor", Value: "prod"}, {Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"}, {Name: "namespace", Value: "kube-system"}, {Name: "pod_name", Value: "some-other-name-5j8s8"}, }
BenchmarkLabels is a real example from Kubernetes' embedded cAdvisor metrics, lightly obfuscated.
var ( // ErrNotSupported when a schema doesn't support that particular lookup. ErrNotSupported = errors.New("not supported") )
Functions ¶
func ChunksToMatrix ¶
func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error)
ChunksToMatrix converts a set of chunks to a model.Matrix.
func ExpectTables ¶
func ExpectTables(ctx context.Context, client TableClient, expected []TableDesc) error
ExpectTables compares existing tables to an expected set of tables. Exposed for testing.
Types ¶
type AutoScalingConfig ¶
type AutoScalingConfig struct { Enabled bool `yaml:"enabled,omitempty"` RoleARN string `yaml:"role_arn,omitempty"` MinCapacity int64 `yaml:"min_capacity,omitempty"` MaxCapacity int64 `yaml:"max_capacity,omitempty"` OutCooldown int64 `yaml:"out_cooldown,omitempty"` InCooldown int64 `yaml:"in_cooldown,omitempty"` TargetValue float64 `yaml:"target,omitempty"` }
AutoScalingConfig for DynamoDB tables.
func (*AutoScalingConfig) RegisterFlags ¶
func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
Bucket describes a range of time with a tableName and hashKey
type BucketClient ¶
BucketClient is used to enforce retention on chunk buckets.
type CardinalityExceededError ¶
CardinalityExceededError is returned when the user reads a row that is too large.
func (CardinalityExceededError) Error ¶
func (e CardinalityExceededError) Error() string
type Chunk ¶
type Chunk struct { // These two fields will be missing from older chunks (as will the hash). // On fetch we will initialise these fields from the DynamoDB key. Fingerprint model.Fingerprint `json:"fingerprint"` UserID string `json:"userID"` // These fields will be in all chunks, including old ones. From model.Time `json:"from"` Through model.Time `json:"through"` Metric labels.Labels `json:"metric"` // The hash is not written to the external storage either. We use // crc32, Castagnoli table. See http://www.evanjones.ca/crc32c.html. // For old chunks, ChecksumSet will be false. ChecksumSet bool `json:"-"` Checksum uint32 `json:"-"` // We never use Delta encoding (the zero value), so if this entry is // missing, we default to DoubleDelta. Encoding prom_chunk.Encoding `json:"encoding"` Data prom_chunk.Chunk `json:"-"` // contains filtered or unexported fields }
Chunk contains encoded timeseries data
func NewChunk ¶
func NewChunk(userID string, fp model.Fingerprint, metric labels.Labels, c prom_chunk.Chunk, from, through model.Time) Chunk
NewChunk creates a new chunk
func ParseExternalKey ¶
ParseExternalKey is used to construct a partially-populated chunk from the key in DynamoDB. This chunk can then be used to calculate the key needed to fetch the Chunk data from Memcache/S3, and then fully populate the chunk with decode().
Pre-checksums, the keys written to DynamoDB looked like `<fingerprint>:<start time>:<end time>` (aka the ID), and the key for memcache and S3 was `<user id>/<fingerprint>:<start time>:<end time>`. Fingerprints and times were written in base-10.
Post-checksums, external keys become the same across DynamoDB, Memcache and S3. Numbers become hex encoded. Keys look like: `<user id>/<fingerprint>:<start time>:<end time>:<checksum>`.
func (*Chunk) Decode ¶
func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error
Decode the chunk from the given buffer, and confirm the chunk is the one we expected.
func (*Chunk) ExternalKey ¶
ExternalKey returns the key you can use to fetch this chunk from external storage. For newer chunks, this key includes a checksum.
type CompositeStore ¶
type CompositeStore struct {
// contains filtered or unexported fields
}
CompositeStore is a Store which delegates to various stores depending on when they were activated.
func NewCompositeStore ¶
func NewCompositeStore() CompositeStore
NewCompositeStore creates a new Store which delegates to different stores depending on time.
func (*CompositeStore) AddPeriod ¶
func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks ObjectClient, limits *validation.Overrides) error
AddPeriod adds the configuration for a period of time to the CompositeStore
func (CompositeStore) GetChunkRefs ¶
func (CompositeStore) LabelNamesForMetricName ¶
func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error)
LabelNamesForMetricName retrieves all label names for a metric name.
func (CompositeStore) LabelValuesForMetricName ¶
func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error)
LabelValuesForMetricName retrieves all label values for a single label name and metric name.
type DayTime ¶
DayTime is a model.Time that holds day-aligned values, and marshals to/from YAML in YYYY-MM-DD format.
func (DayTime) MarshalYAML ¶
MarshalYAML implements yaml.Marshaler.
func (*DayTime) UnmarshalYAML ¶
UnmarshalYAML implements yaml.Unmarshaler.
type DecodeContext ¶
type DecodeContext struct {
// contains filtered or unexported fields
}
DecodeContext holds data that can be re-used between decodes of different chunks
func NewDecodeContext ¶
func NewDecodeContext() *DecodeContext
NewDecodeContext creates a new, blank, DecodeContext
type Fetcher ¶
type Fetcher struct {
// contains filtered or unexported fields
}
Fetcher deals with fetching chunk contents from the cache/store, and writing back any misses to the cache. Also responsible for decoding chunks from the cache, in parallel.
func NewChunkFetcher ¶
NewChunkFetcher makes a new Fetcher.
func (*Fetcher) FetchChunks ¶
FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
type IndexClient ¶
type IndexClient interface { Stop() // For the write path. NewWriteBatch() WriteBatch BatchWrite(context.Context, WriteBatch) error // For the read path. QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error }
IndexClient is a client for the storage of the index (e.g. DynamoDB or Bigtable).
type IndexEntry ¶
type IndexEntry struct { TableName string HashValue string // For writes, RangeValue will always be set. RangeValue []byte // New for v6 schema, label value is not written as part of the range key. Value []byte }
IndexEntry describes an entry in the chunk index
type IndexQuery ¶
type IndexQuery struct { TableName string HashValue string // One of RangeValuePrefix or RangeValueStart might be set: // - If RangeValuePrefix is not nil, must read all keys with that prefix. // - If RangeValueStart is not nil, must read all keys from there onwards. // - If neither is set, must read all keys for that row. RangeValuePrefix []byte RangeValueStart []byte // Filters for querying ValueEqual []byte // If the result of this lookup is immutable or not (for caching). Immutable bool }
IndexQuery describes a query for entries
type LegacySchemaConfig ¶
type LegacySchemaConfig struct { StorageClient string // aws, gcp, etc. // After midnight on this day, we start bucketing indexes by day instead of by // hour. Only the day matters, not the time within the day. DailyBucketsFrom flagext.DayValue Base64ValuesFrom flagext.DayValue V4SchemaFrom flagext.DayValue V5SchemaFrom flagext.DayValue V6SchemaFrom flagext.DayValue V9SchemaFrom flagext.DayValue BigtableColumnKeyFrom flagext.DayValue // Config for the index & chunk tables. OriginalTableName string UsePeriodicTables bool IndexTablesFrom flagext.DayValue IndexTables PeriodicTableConfig ChunkTablesFrom flagext.DayValue ChunkTables PeriodicTableConfig }
LegacySchemaConfig lets you configure schema via command-line flags
func (*LegacySchemaConfig) RegisterFlags ¶
func (cfg *LegacySchemaConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type MockStorage ¶
type MockStorage struct {
// contains filtered or unexported fields
}
MockStorage is a fake in-memory StorageClient.
func (*MockStorage) BatchWrite ¶
func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error
BatchWrite implements StorageClient.
func (*MockStorage) CreateTable ¶
func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error
CreateTable implements StorageClient.
func (*MockStorage) DeleteTable ¶
func (m *MockStorage) DeleteTable(_ context.Context, name string) error
DeleteTable implements StorageClient.
func (*MockStorage) DescribeTable ¶
func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error)
DescribeTable implements StorageClient.
func (*MockStorage) ListTables ¶
func (m *MockStorage) ListTables(_ context.Context) ([]string, error)
ListTables implements StorageClient.
func (*MockStorage) NewWriteBatch ¶
func (m *MockStorage) NewWriteBatch() WriteBatch
NewWriteBatch implements StorageClient.
func (*MockStorage) PutChunks ¶
func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error
PutChunks implements StorageClient.
func (*MockStorage) QueryPages ¶
func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error
QueryPages implements StorageClient.
func (*MockStorage) UpdateTable ¶
func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error
UpdateTable implements StorageClient.
type ObjectAndIndexClient ¶
type ObjectAndIndexClient interface {
PutChunkAndIndex(ctx context.Context, c Chunk, index WriteBatch) error
}
ObjectAndIndexClient allows optimisations where the same client handles both chunk and index writes.
type ObjectClient ¶
type ObjectClient interface { Stop() PutChunks(ctx context.Context, chunks []Chunk) error GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error) }
ObjectClient is for storing and retrieving chunks.
type PeriodConfig ¶
type PeriodConfig struct { From DayTime `yaml:"from"` // used when working with config IndexType string `yaml:"store"` // type of index client to use. ObjectType string `yaml:"object_store"` // type of object client to use; if omitted, defaults to store. Schema string `yaml:"schema"` IndexTables PeriodicTableConfig `yaml:"index"` ChunkTables PeriodicTableConfig `yaml:"chunks,omitempty"` RowShards uint32 `yaml:"row_shards"` }
PeriodConfig defines the schema and tables to use for a period of time
func (PeriodConfig) CreateSchema ¶
func (cfg PeriodConfig) CreateSchema() Schema
CreateSchema returns the schema defined by the PeriodConfig
type PeriodicTableConfig ¶
type PeriodicTableConfig struct { Prefix string `yaml:"prefix"` Period time.Duration `yaml:"period,omitempty"` Tags Tags `yaml:"tags,omitempty"` }
PeriodicTableConfig is configuration for a set of time-sharded tables.
func (*PeriodicTableConfig) RegisterFlags ¶
func (cfg *PeriodicTableConfig) RegisterFlags(argPrefix, tablePrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type ProvisionConfig ¶
type ProvisionConfig struct { ProvisionedThroughputOnDemandMode bool `yaml:"provisioned_throughput_on_demand_mode"` ProvisionedWriteThroughput int64 `yaml:"provisioned_write_throughput"` ProvisionedReadThroughput int64 `yaml:"provisioned_read_throughput"` InactiveThroughputOnDemandMode bool `yaml:"inactive_throughput_on_demand_mode"` InactiveWriteThroughput int64 `yaml:"inactive_write_throughput"` InactiveReadThroughput int64 `yaml:"inactive_read_throughput"` WriteScale AutoScalingConfig `yaml:"write_scale"` InactiveWriteScale AutoScalingConfig `yaml:"inactive_write_scale"` InactiveWriteScaleLastN int64 `yaml:"inactive_write_scale_lastn"` ReadScale AutoScalingConfig `yaml:"read_scale"` InactiveReadScale AutoScalingConfig `yaml:"inactive_read_scale"` InactiveReadScaleLastN int64 `yaml:"inactive_read_scale_lastn"` }
ProvisionConfig holds config for provisioning capacity (on DynamoDB)
func (*ProvisionConfig) RegisterFlags ¶
func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type ReadBatch ¶
type ReadBatch interface {
Iterator() ReadBatchIterator
}
ReadBatch represents the results of a QueryPages.
type ReadBatchIterator ¶
ReadBatchIterator is an iterator over a ReadBatch.
type Schema ¶
type Schema interface { // When doing a write, use this method to return the list of entries you should write to. GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) // Should only be used with the seriesStore. TODO: Make seriesStore implement a different interface altogether. // returns cache key string and []IndexEntry per bucket, matched in order GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) // When doing a read, use these methods to return the list of entries you should query GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) // If the query resulted in series IDs, use this method to find chunks. GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) }
Schema interface defines methods to calculate the hash and range keys needed to write or read chunks from the external index.
type SchemaConfig ¶
type SchemaConfig struct { Configs []PeriodConfig `yaml:"configs"` // contains filtered or unexported fields }
SchemaConfig contains the config for our chunk index schemas
func DefaultSchemaConfig ¶
func DefaultSchemaConfig(store, schema string, from model.Time) SchemaConfig
DefaultSchemaConfig creates a simple schema config for testing
func (SchemaConfig) ChunkTableFor ¶
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error)
ChunkTableFor calculates the chunk table shard for a given point in time.
func (*SchemaConfig) ForEachAfter ¶
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig))
ForEachAfter will call f() on every entry after t, splitting entries if necessary so there is an entry starting at t.
func (*SchemaConfig) Load ¶
func (cfg *SchemaConfig) Load() error
Load the yaml file, or build the config from legacy command-line flags
func (SchemaConfig) PrintYaml ¶
func (cfg SchemaConfig) PrintYaml()
PrintYaml dumps the yaml to stdout, to aid in migration
func (*SchemaConfig) RegisterFlags ¶
func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type Store ¶
type Store interface { Put(ctx context.Context, chunks []Chunk) error PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) // GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk), // using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) LabelValuesForMetricName(ctx context.Context, from, through model.Time, metricName string, labelName string) ([]string, error) LabelNamesForMetricName(ctx context.Context, from, through model.Time, metricName string) ([]string, error) Stop() }
Store for chunks.
type StoreConfig ¶
type StoreConfig struct { ChunkCacheConfig cache.Config `yaml:"chunk_cache_config,omitempty"` WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config,omitempty"` MinChunkAge time.Duration `yaml:"min_chunk_age,omitempty"` CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than,omitempty"` // Limits query start time to be greater than now() - MaxLookBackPeriod, if set. MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"` // contains filtered or unexported fields }
StoreConfig specifies config for a ChunkStore
func (*StoreConfig) RegisterFlags ¶
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type TableClient ¶
type TableClient interface { ListTables(ctx context.Context) ([]string, error) CreateTable(ctx context.Context, desc TableDesc) error DeleteTable(ctx context.Context, name string) error DescribeTable(ctx context.Context, name string) (desc TableDesc, isActive bool, err error) UpdateTable(ctx context.Context, current, expected TableDesc) error }
TableClient is a client for telling Dynamo what to do with tables.
type TableDesc ¶
type TableDesc struct { Name string UseOnDemandIOMode bool ProvisionedRead int64 ProvisionedWrite int64 Tags Tags WriteScale AutoScalingConfig ReadScale AutoScalingConfig }
TableDesc describes a table.
type TableManager ¶
type TableManager struct {
// contains filtered or unexported fields
}
TableManager creates and manages the provisioned throughput on DynamoDB tables
func NewTableManager ¶
func NewTableManager(cfg TableManagerConfig, schemaCfg SchemaConfig, maxChunkAge time.Duration, tableClient TableClient, objectClient BucketClient) (*TableManager, error)
NewTableManager makes a new TableManager
func (*TableManager) SyncTables ¶
func (m *TableManager) SyncTables(ctx context.Context) error
SyncTables will calculate the tables expected to exist, create those that do not and update those that need it. It is exposed for testing.
type TableManagerConfig ¶
type TableManagerConfig struct { // Master 'off-switch' for table capacity updates, e.g. when troubleshooting ThroughputUpdatesDisabled bool `yaml:"throughput_updates_disabled"` // Master 'on-switch' for table retention deletions RetentionDeletesEnabled bool `yaml:"retention_deletes_enabled"` // How far back tables will be kept before they are deleted RetentionPeriod time.Duration `yaml:"retention_period"` // Period with which the table manager will poll for tables. DynamoDBPollInterval time.Duration `yaml:"dynamodb_poll_interval"` // duration a table will be created before it is needed. CreationGracePeriod time.Duration `yaml:"creation_grace_period"` IndexTables ProvisionConfig `yaml:"index_tables_provisioning"` ChunkTables ProvisionConfig `yaml:"chunk_tables_provisioning"` }
TableManagerConfig holds config for a TableManager
func (*TableManagerConfig) RegisterFlags ¶
func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type Tags ¶
Tags is a string-string map that implements flag.Value.
func (*Tags) UnmarshalYAML ¶
UnmarshalYAML implements yaml.Unmarshaler.
type WriteBatch ¶
WriteBatch represents a batch of writes.