chunk

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2020 License: Apache-2.0 Imports: 47 Imported by: 98

Documentation

Index

Constants

View Source
const (
	ErrInvalidChecksum    = errs.Error("invalid chunk checksum")
	ErrWrongMetadata      = errs.Error("wrong chunk metadata")
	ErrMetadataLength     = errs.Error("chunk metadata wrong length")
	ErrDataLength         = errs.Error("chunk data wrong length")
	ErrSliceOutOfRange    = errs.Error("chunk can't be sliced out of its data range")
	ErrSliceNoDataInRange = errs.Error("chunk has no data for given range to slice")
	ErrSliceChunkOverflow = errs.Error("slicing should not overflow a chunk")
)

Errors that decode can return

View Source
const DirDelim = "/"

DirDelim is the delimiter used to model a directory structure in an object store.

Variables

View Source
var (
	ErrMetricNameLabelMissing     = errors.New("metric name label missing")
	ErrParialDeleteChunkNoOverlap = errors.New("interval for partial deletion has not overlap with chunk interval")
)
View Source
var (
	// ErrStorageObjectNotFound when object storage does not have requested object
	ErrStorageObjectNotFound = errors.New("object not found in storage")
	// ErrMethodNotImplemented when any of the storage clients do not implement a method
	ErrMethodNotImplemented = errors.New("method is not implemented")
)
View Source
var BenchmarkLabels = labels.Labels{
	{Name: model.MetricNameLabel, Value: "container_cpu_usage_seconds_total"},
	{Name: "beta_kubernetes_io_arch", Value: "amd64"},
	{Name: "beta_kubernetes_io_instance_type", Value: "c3.somesize"},
	{Name: "beta_kubernetes_io_os", Value: "linux"},
	{Name: "container_name", Value: "some-name"},
	{Name: "cpu", Value: "cpu01"},
	{Name: "failure_domain_beta_kubernetes_io_region", Value: "somewhere-1"},
	{Name: "failure_domain_beta_kubernetes_io_zone", Value: "somewhere-1b"},
	{Name: "id", Value: "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28"},
	{Name: "image", Value: "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506"},
	{Name: "instance", Value: "ip-111-11-1-11.ec2.internal"},
	{Name: "job", Value: "kubernetes-cadvisor"},
	{Name: "kubernetes_io_hostname", Value: "ip-111-11-1-11"},
	{Name: "monitor", Value: "prod"},
	{Name: "name", Value: "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0"},
	{Name: "namespace", Value: "kube-system"},
	{Name: "pod_name", Value: "some-other-name-5j8s8"},
}

BenchmarkLabels is a real example from Kubernetes' embedded cAdvisor metrics, lightly obfuscated

View Source
var (
	// ErrNotSupported when a schema doesn't support that particular lookup.
	ErrNotSupported = errors.New("not supported")
)

Functions

func ChunksToMatrix

func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error)

ChunksToMatrix converts a set of chunks to a model.Matrix.

func ExpectTables

func ExpectTables(ctx context.Context, client TableClient, expected []TableDesc) error

ExpectTables compares existing tables to an expected set of tables. Exposed for testing,

Types

type AutoScalingConfig

type AutoScalingConfig struct {
	Enabled     bool    `yaml:"enabled"`
	RoleARN     string  `yaml:"role_arn"`
	MinCapacity int64   `yaml:"min_capacity"`
	MaxCapacity int64   `yaml:"max_capacity"`
	OutCooldown int64   `yaml:"out_cooldown"`
	InCooldown  int64   `yaml:"in_cooldown"`
	TargetValue float64 `yaml:"target"`
}

AutoScalingConfig for DynamoDB tables.

func (*AutoScalingConfig) RegisterFlags

func (cfg *AutoScalingConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket describes a range of time with a tableName and hashKey

type BucketClient

type BucketClient interface {
	DeleteChunksBefore(ctx context.Context, ts time.Time) error
}

BucketClient is used to enforce retention on chunk buckets.

type CardinalityExceededError

type CardinalityExceededError struct {
	MetricName, LabelName string
	Size, Limit           int32
}

CardinalityExceededError is returned when the user reads a row that is too large.

func (CardinalityExceededError) Error

func (e CardinalityExceededError) Error() string

type Chunk

type Chunk struct {
	// These two fields will be missing from older chunks (as will the hash).
	// On fetch we will initialise these fields from the DynamoDB key.
	Fingerprint model.Fingerprint `json:"fingerprint"`
	UserID      string            `json:"userID"`

	// These fields will be in all chunks, including old ones.
	From    model.Time    `json:"from"`
	Through model.Time    `json:"through"`
	Metric  labels.Labels `json:"metric"`

	// The hash is not written to the external storage either.  We use
	// crc32, Castagnoli table.  See http://www.evanjones.ca/crc32c.html.
	// For old chunks, ChecksumSet will be false.
	ChecksumSet bool   `json:"-"`
	Checksum    uint32 `json:"-"`

	// We never use Delta encoding (the zero value), so if this entry is
	// missing, we default to DoubleDelta.
	Encoding prom_chunk.Encoding `json:"encoding"`
	Data     prom_chunk.Chunk    `json:"-"`
	// contains filtered or unexported fields
}

Chunk contains encoded timeseries data

func NewChunk

func NewChunk(userID string, fp model.Fingerprint, metric labels.Labels, c prom_chunk.Chunk, from, through model.Time) Chunk

NewChunk creates a new chunk

func ParseExternalKey

func ParseExternalKey(userID, externalKey string) (Chunk, error)

ParseExternalKey is used to construct a partially-populated chunk from the key in DynamoDB. This chunk can then be used to calculate the key needed to fetch the Chunk data from Memcache/S3, and then fully populate the chunk with decode().

Pre-checksums, the keys written to DynamoDB looked like `<fingerprint>:<start time>:<end time>` (aka the ID), and the key for memcache and S3 was `<user id>/<fingerprint>:<start time>:<end time>. Finger prints and times were written in base-10.

Post-checksums, externals keys become the same across DynamoDB, Memcache and S3. Numbers become hex encoded. Keys look like: `<user id>/<fingerprint>:<start time>:<end time>:<checksum>`.

func (*Chunk) Decode

func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error

Decode the chunk from the given buffer, and confirm the chunk is the one we expected.

func (*Chunk) Encode

func (c *Chunk) Encode() error

Encode writes the chunk into a buffer, and calculates the checksum.

func (*Chunk) Encoded

func (c *Chunk) Encoded() ([]byte, error)

Encoded returns the buffer created by Encoded()

func (*Chunk) ExternalKey

func (c *Chunk) ExternalKey() string

ExternalKey returns the key you can use to fetch this chunk from external storage. For newer chunks, this key includes a checksum.

func (*Chunk) Samples

func (c *Chunk) Samples(from, through model.Time) ([]model.SamplePair, error)

Samples returns all SamplePairs for the chunk.

func (*Chunk) Slice added in v0.7.0

func (c *Chunk) Slice(from, through model.Time) (*Chunk, error)

Slice builds a new smaller chunk with data only from given time range (inclusive)

type Client added in v0.7.0

type Client interface {
	Stop()

	PutChunks(ctx context.Context, chunks []Chunk) error
	GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
	DeleteChunk(ctx context.Context, chunkID string) error
}

Client is for storing and retrieving chunks.

type CompositeStore

type CompositeStore struct {
	// contains filtered or unexported fields
}

CompositeStore is a Store which delegates to various stores depending on when they were activated.

func NewCompositeStore

func NewCompositeStore() CompositeStore

NewCompositeStore creates a new Store which delegates to different stores depending on time.

func (*CompositeStore) AddPeriod

func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error

AddPeriod adds the configuration for a period of time to the CompositeStore

func (CompositeStore) DeleteChunk added in v0.7.0

func (c CompositeStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error

DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage. It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk

func (CompositeStore) DeleteSeriesIDs added in v0.7.0

func (c CompositeStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error

DeleteSeriesIDs deletes series IDs from index in series store

func (CompositeStore) Get

func (c CompositeStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)

func (CompositeStore) GetChunkRefs

func (c CompositeStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)

func (CompositeStore) LabelNamesForMetricName

func (c CompositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)

LabelNamesForMetricName retrieves all label names for a metric name.

func (CompositeStore) LabelValuesForMetricName

func (c CompositeStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error)

LabelValuesForMetricName retrieves all label values for a single label name and metric name.

func (CompositeStore) Put

func (c CompositeStore) Put(ctx context.Context, chunks []Chunk) error

func (CompositeStore) PutOne

func (c CompositeStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error

func (CompositeStore) Stop

func (c CompositeStore) Stop()

type DayTime

type DayTime struct {
	model.Time
}

DayTime is a model.Time what holds day-aligned values, and marshals to/from YAML in YYYY-MM-DD format.

func (DayTime) MarshalYAML

func (d DayTime) MarshalYAML() (interface{}, error)

MarshalYAML implements yaml.Marshaller.

func (*DayTime) UnmarshalYAML

func (d *DayTime) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements yaml.Unmarshaller.

type DecodeContext

type DecodeContext struct {
	// contains filtered or unexported fields
}

DecodeContext holds data that can be re-used between decodes of different chunks

func NewDecodeContext

func NewDecodeContext() *DecodeContext

NewDecodeContext creates a new, blank, DecodeContext

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

Fetcher deals with fetching chunk contents from the cache/store, and writing back any misses to the cache. Also responsible for decoding chunks from the cache, in parallel.

func NewChunkFetcher

func NewChunkFetcher(cacher cache.Cache, cacheStubs bool, storage Client) (*Fetcher, error)

NewChunkFetcher makes a new ChunkFetcher.

func (*Fetcher) FetchChunks

func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error)

FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.

func (*Fetcher) Stop

func (c *Fetcher) Stop()

Stop the ChunkFetcher.

type IndexClient

type IndexClient interface {
	Stop()

	// For the write path.
	NewWriteBatch() WriteBatch
	BatchWrite(context.Context, WriteBatch) error

	// For the read path.
	QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error
}

IndexClient is a client for the storage of the index (e.g. DynamoDB or Bigtable).

type IndexEntry

type IndexEntry struct {
	TableName string
	HashValue string

	// For writes, RangeValue will always be set.
	RangeValue []byte

	// New for v6 schema, label value is not written as part of the range key.
	Value []byte
}

IndexEntry describes an entry in the chunk index

type IndexQuery

type IndexQuery struct {
	TableName string
	HashValue string

	// One of RangeValuePrefix or RangeValueStart might be set:
	// - If RangeValuePrefix is not nil, must read all keys with that prefix.
	// - If RangeValueStart is not nil, must read all keys from there onwards.
	// - If neither is set, must read all keys for that row.
	RangeValuePrefix []byte
	RangeValueStart  []byte

	// Filters for querying
	ValueEqual []byte

	// If the result of this lookup is immutable or not (for caching).
	Immutable bool
}

IndexQuery describes a query for entries

type MockStorage

type MockStorage struct {
	// contains filtered or unexported fields
}

MockStorage is a fake in-memory StorageClient.

func NewMockStorage

func NewMockStorage() *MockStorage

NewMockStorage creates a new MockStorage.

func (*MockStorage) BatchWrite

func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error

BatchWrite implements StorageClient.

func (*MockStorage) CreateTable

func (m *MockStorage) CreateTable(_ context.Context, desc TableDesc) error

CreateTable implements StorageClient.

func (*MockStorage) DeleteChunk added in v0.7.0

func (m *MockStorage) DeleteChunk(ctx context.Context, chunkID string) error

DeleteChunk implements StorageClient.

func (*MockStorage) DeleteObject added in v0.7.0

func (m *MockStorage) DeleteObject(ctx context.Context, objectKey string) error

func (*MockStorage) DeleteTable

func (m *MockStorage) DeleteTable(_ context.Context, name string) error

DeleteTable implements StorageClient.

func (*MockStorage) DescribeTable

func (m *MockStorage) DescribeTable(_ context.Context, name string) (desc TableDesc, isActive bool, err error)

DescribeTable implements StorageClient.

func (*MockStorage) GetChunks

func (m *MockStorage) GetChunks(ctx context.Context, chunkSet []Chunk) ([]Chunk, error)

GetChunks implements StorageClient.

func (*MockStorage) GetObject added in v0.7.0

func (m *MockStorage) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)

func (*MockStorage) List added in v0.7.0

func (m *MockStorage) List(ctx context.Context, prefix string) ([]StorageObject, error)

func (*MockStorage) ListTables

func (m *MockStorage) ListTables(_ context.Context) ([]string, error)

ListTables implements StorageClient.

func (*MockStorage) NewWriteBatch

func (m *MockStorage) NewWriteBatch() WriteBatch

NewWriteBatch implements StorageClient.

func (*MockStorage) PutChunks

func (m *MockStorage) PutChunks(_ context.Context, chunks []Chunk) error

PutChunks implements StorageClient.

func (*MockStorage) PutObject added in v0.7.0

func (m *MockStorage) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error

func (*MockStorage) QueryPages

func (m *MockStorage) QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error

QueryPages implements StorageClient.

func (*MockStorage) Stop

func (*MockStorage) Stop()

Stop doesn't do anything.

func (*MockStorage) UpdateTable

func (m *MockStorage) UpdateTable(_ context.Context, _, desc TableDesc) error

UpdateTable implements StorageClient.

type ObjectAndIndexClient

type ObjectAndIndexClient interface {
	PutChunkAndIndex(ctx context.Context, c Chunk, index WriteBatch) error
}

ObjectAndIndexClient allows optimisations where the same client handles both

type ObjectClient

type ObjectClient interface {
	PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
	GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
	List(ctx context.Context, prefix string) ([]StorageObject, error)
	DeleteObject(ctx context.Context, objectKey string) error
	Stop()
}

ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/Etc)

type PeriodConfig

type PeriodConfig struct {
	From        DayTime             `yaml:"from"`         // used when working with config
	IndexType   string              `yaml:"store"`        // type of index client to use.
	ObjectType  string              `yaml:"object_store"` // type of object client to use; if omitted, defaults to store.
	Schema      string              `yaml:"schema"`
	IndexTables PeriodicTableConfig `yaml:"index"`
	ChunkTables PeriodicTableConfig `yaml:"chunks"`
	RowShards   uint32              `yaml:"row_shards"`
}

PeriodConfig defines the schema and tables to use for a period of time

func (PeriodConfig) CreateSchema added in v0.2.0

func (cfg PeriodConfig) CreateSchema() Schema

CreateSchema returns the schema defined by the PeriodConfig

type PeriodicTableConfig

type PeriodicTableConfig struct {
	Prefix string
	Period time.Duration
	Tags   Tags
}

PeriodicTableConfig is configuration for a set of time-sharded tables.

func (PeriodicTableConfig) MarshalYAML added in v1.0.0

func (cfg PeriodicTableConfig) MarshalYAML() (interface{}, error)

MarshalYAML implements the yaml.Marshaler interface.

func (*PeriodicTableConfig) TableFor

func (cfg *PeriodicTableConfig) TableFor(t model.Time) string

TableFor calculates the table shard for a given point in time.

func (*PeriodicTableConfig) UnmarshalYAML added in v1.0.0

func (cfg *PeriodicTableConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements the yaml.Unmarshaler interface.

type ProvisionConfig

type ProvisionConfig struct {
	ProvisionedThroughputOnDemandMode bool  `yaml:"enable_ondemand_throughput_mode"`
	ProvisionedWriteThroughput        int64 `yaml:"provisioned_write_throughput"`
	ProvisionedReadThroughput         int64 `yaml:"provisioned_read_throughput"`
	InactiveThroughputOnDemandMode    bool  `yaml:"enable_inactive_throughput_on_demand_mode"`
	InactiveWriteThroughput           int64 `yaml:"inactive_write_throughput"`
	InactiveReadThroughput            int64 `yaml:"inactive_read_throughput"`

	WriteScale              AutoScalingConfig `yaml:"write_scale"`
	InactiveWriteScale      AutoScalingConfig `yaml:"inactive_write_scale"`
	InactiveWriteScaleLastN int64             `yaml:"inactive_write_scale_lastn"`
	ReadScale               AutoScalingConfig `yaml:"read_scale"`
	InactiveReadScale       AutoScalingConfig `yaml:"inactive_read_scale"`
	InactiveReadScaleLastN  int64             `yaml:"inactive_read_scale_lastn"`
}

ProvisionConfig holds config for provisioning capacity (on DynamoDB for now)

func (*ProvisionConfig) RegisterFlags

func (cfg *ProvisionConfig) RegisterFlags(argPrefix string, f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

type ReadBatch

type ReadBatch interface {
	Iterator() ReadBatchIterator
}

ReadBatch represents the results of a QueryPages.

type ReadBatchIterator

type ReadBatchIterator interface {
	Next() bool
	RangeValue() []byte
	Value() []byte
}

ReadBatchIterator is an iterator over a ReadBatch.

type Schema

type Schema interface {
	// When doing a write, use this method to return the list of entries you should write to.
	GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)

	// Should only be used with the seriesStore. TODO: Make seriesStore implement a different interface altogether.
	// returns cache key string and []IndexEntry per bucket, matched in order
	GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error)
	GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)

	// When doing a read, use these methods to return the list of entries you should query
	GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
	GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error)
	GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
	FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery

	// If the query resulted in series IDs, use this method to find chunks.
	GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
	// Returns queries to retrieve all label names of multiple series by id.
	GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)

	// GetSeriesDeleteEntries returns IndexEntry's for deleting SeriesIDs from SeriesStore.
	// Since SeriesIDs are created per bucket, it makes sure that we don't include series entries which are in use by verifying using hasChunksForIntervalFunc i.e
	// It checks first and last buckets covered by the time interval to see if a SeriesID still has chunks in the store,
	// if yes then it doesn't include IndexEntry's for that bucket for deletion.
	GetSeriesDeleteEntries(from, through model.Time, userID string, metric labels.Labels, hasChunksForIntervalFunc hasChunksForIntervalFunc) ([]IndexEntry, error)
}

Schema interface defines methods to calculate the hash and range keys needed to write or read chunks from the external index.

type SchemaConfig

type SchemaConfig struct {
	Configs []PeriodConfig `yaml:"configs"`
	// contains filtered or unexported fields
}

SchemaConfig contains the config for our chunk index schemas

func DefaultSchemaConfig

func DefaultSchemaConfig(store, schema string, from model.Time) SchemaConfig

DefaultSchemaConfig creates a simple schema config for testing

func (SchemaConfig) ChunkTableFor

func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error)

ChunkTableFor calculates the chunk table shard for a given point in time.

func (*SchemaConfig) ForEachAfter

func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig))

ForEachAfter will call f() on every entry after t, splitting entries if necessary so there is an entry starting at t

func (*SchemaConfig) Load

func (cfg *SchemaConfig) Load() error

Load the yaml file, or build the config from legacy command-line flags

func (*SchemaConfig) RegisterFlags

func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*SchemaConfig) Validate added in v0.4.0

func (cfg *SchemaConfig) Validate() error

Validate the schema config and returns an error if the validation doesn't pass

type StorageObject added in v0.7.0

type StorageObject struct {
	Key        string
	ModifiedAt time.Time
}

StorageObject represents an object being stored in an Object Store

type Store

type Store interface {
	Put(ctx context.Context, chunks []Chunk) error
	PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
	Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
	// GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk),
	// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)
	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
	LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error)
	LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)

	// DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage.
	// It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk
	DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error
	// DeleteSeriesIDs is only relevant for SeriesStore.
	DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error
	Stop()
}

Store for chunks.

type StoreConfig

type StoreConfig struct {
	ChunkCacheConfig       cache.Config `yaml:"chunk_cache_config"`
	WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config"`

	CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than"`

	// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
	MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"`
	// contains filtered or unexported fields
}

StoreConfig specifies config for a ChunkStore

func (*StoreConfig) RegisterFlags

func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

type StoreLimits added in v0.2.0

type StoreLimits interface {
	MaxChunksPerQuery(userID string) int
	MaxQueryLength(userID string) time.Duration
}

StoreLimits helps get Limits specific to Queries for Stores

type TableClient

type TableClient interface {
	ListTables(ctx context.Context) ([]string, error)
	CreateTable(ctx context.Context, desc TableDesc) error
	DeleteTable(ctx context.Context, name string) error
	DescribeTable(ctx context.Context, name string) (desc TableDesc, isActive bool, err error)
	UpdateTable(ctx context.Context, current, expected TableDesc) error
}

TableClient is a client for telling Dynamo what to do with tables.

type TableDesc

type TableDesc struct {
	Name              string
	UseOnDemandIOMode bool
	ProvisionedRead   int64
	ProvisionedWrite  int64
	Tags              Tags
	WriteScale        AutoScalingConfig
	ReadScale         AutoScalingConfig
}

TableDesc describes a table.

func (TableDesc) Equals

func (desc TableDesc) Equals(other TableDesc) bool

Equals returns true if other matches desc.

type TableManager

type TableManager struct {
	services.Service
	// contains filtered or unexported fields
}

TableManager creates and manages the provisioned throughput on DynamoDB tables

func NewTableManager

func NewTableManager(cfg TableManagerConfig, schemaCfg SchemaConfig, maxChunkAge time.Duration, tableClient TableClient,
	objectClient BucketClient, registerer prometheus.Registerer) (*TableManager, error)

NewTableManager makes a new TableManager

func (*TableManager) SyncTables

func (m *TableManager) SyncTables(ctx context.Context) error

SyncTables will calculate the tables expected to exist, create those that do not and update those that need it. It is exposed for testing.

type TableManagerConfig

type TableManagerConfig struct {
	// Master 'off-switch' for table capacity updates, e.g. when troubleshooting
	ThroughputUpdatesDisabled bool `yaml:"throughput_updates_disabled"`

	// Master 'on-switch' for table retention deletions
	RetentionDeletesEnabled bool `yaml:"retention_deletes_enabled"`

	// How far back tables will be kept before they are deleted
	RetentionPeriod time.Duration `yaml:"-"`
	// This is so that we can accept 1w, 1y in the YAML.
	RetentionPeriodModel model.Duration `yaml:"retention_period"`

	// Period with which the table manager will poll for tables.
	PollInterval time.Duration `yaml:"poll_interval"`

	// duration a table will be created before it is needed.
	CreationGracePeriod time.Duration `yaml:"creation_grace_period"`

	IndexTables ProvisionConfig `yaml:"index_tables_provisioning"`
	ChunkTables ProvisionConfig `yaml:"chunk_tables_provisioning"`
}

TableManagerConfig holds config for a TableManager

func (*TableManagerConfig) MarshalYAML added in v1.0.0

func (cfg *TableManagerConfig) MarshalYAML() (interface{}, error)

MarshalYAML implements the yaml.Marshaler interface. To support RetentionPeriod.

func (*TableManagerConfig) RegisterFlags

func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet.

func (*TableManagerConfig) UnmarshalYAML added in v1.0.0

func (cfg *TableManagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements the yaml.Unmarshaler interface. To support RetentionPeriod.

func (*TableManagerConfig) Validate added in v1.0.0

func (cfg *TableManagerConfig) Validate() error

Validate validates the config.

type Tags

type Tags map[string]string

Tags is a string-string map that implements flag.Value.

func (Tags) Equals

func (ts Tags) Equals(other Tags) bool

Equals returns true is other matches ts.

func (*Tags) Set

func (ts *Tags) Set(s string) error

Set implements flag.Value

func (Tags) String

func (ts Tags) String() string

String implements flag.Value

func (*Tags) UnmarshalYAML

func (ts *Tags) UnmarshalYAML(unmarshal func(interface{}) error) error

UnmarshalYAML implements yaml.Unmarshaler.

type UniqueStrings added in v0.4.0

type UniqueStrings struct {
	// contains filtered or unexported fields
}

UniqueStrings keeps a slice of unique strings.

func (*UniqueStrings) Add added in v0.4.0

func (us *UniqueStrings) Add(strings ...string)

Add adds a new string, dropping duplicates.

func (UniqueStrings) Strings added in v0.4.0

func (us UniqueStrings) Strings() []string

Strings returns the sorted sliced of unique strings.

type WriteBatch

type WriteBatch interface {
	Add(tableName, hashValue string, rangeValue []byte, value []byte)
	Delete(tableName, hashValue string, rangeValue []byte)
}

WriteBatch represents a batch of writes.

Directories

Path Synopsis
This file was taken from Prometheus (https://github.com/prometheus/prometheus).
This file was taken from Prometheus (https://github.com/prometheus/prometheus).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL