storegateway

package
v0.0.0-...-834961c
Warning

This package is not in the latest version of its module.

Published: Dec 18, 2024 License: AGPL-3.0 Imports: 92 Imported by: 2

Documentation

Index

Constants

const (
	// RingKey is the key under which we store the store gateways ring in the KVStore.
	RingKey = "store-gateway"

	// RingNameForServer is the name of the ring used by the store gateway server.
	RingNameForServer = "store-gateway"

	// RingNameForClient is the name of the ring used by the store gateway client (we need
	// a different name to avoid clashing Prometheus metrics when running in single-binary).
	RingNameForClient = "store-gateway-client"
)
const GrpcContextMetadataTenantID = "__org_id__"

GrpcContextMetadataTenantID is the gRPC metadata key used to pass the tenant ID to the store-gateway process. (It is kept separate from DeprecatedTenantIDExternalLabel to signal a different use case.)

Variables

var (
	// BlocksOwnerSync is the operation used to check the authoritative owners of a block
	// (replicas included).
	BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)

	// BlocksOwnerRead is the operation used to check the authoritative owners of a block
	// (replicas included) that are available for queries (a store-gateway is available for
	// queries only when ACTIVE).
	BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
	BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
		// Extend the replica set whenever an instance is not ACTIVE.
		return s != ring.ACTIVE
	})
)

Functions

func GetShuffleShardingSubring

func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing

GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.

func NewShardingMetadataFilterAdapter

func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter

Types

type BucketIndexMetadataFetcher

type BucketIndexMetadataFetcher struct {
	// contains filtered or unexported fields
}

BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation that leverages the Mimir bucket index.

func NewBucketIndexMetadataFetcher

func NewBucketIndexMetadataFetcher(
	userID string,
	bkt objstore.Bucket,
	cfgProvider bucket.TenantConfigProvider,
	logger log.Logger,
	reg prometheus.Registerer,
	filters []block.MetadataFilter,
) *BucketIndexMetadataFetcher

func (*BucketIndexMetadataFetcher) Fetch

func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*block.Meta, partial map[ulid.ULID]error, err error)

Fetch implements block.MetadataFetcher. Not goroutine-safe.

type BucketStore

type BucketStore struct {
	services.Service
	// contains filtered or unexported fields
}

BucketStore implements the store API backed by a bucket. It loads all index files to local disk.

NOTE: Bucket store reencodes postings using diff+varint+snappy when storing to cache. This makes them smaller, but takes extra CPU and memory. When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.

func NewBucketStore

func NewBucketStore(
	userID string,
	bkt objstore.InstrumentedBucketReader,
	fetcher block.MetadataFetcher,
	dir string,
	bucketStoreConfig tsdb.BucketStoreConfig,
	postingsStrategy postingsSelectionStrategy,
	chunksLimiterFactory ChunksLimiterFactory,
	seriesLimiterFactory SeriesLimiterFactory,
	partitioners blockPartitioners,
	seriesHashCache *hashcache.SeriesHashCache,
	metrics *BucketStoreMetrics,
	options ...BucketStoreOption,
) (*BucketStore, error)

NewBucketStore creates a new bucket backed store that implements the store API against an object store bucket. It is optimized to work against high latency backends.

func (*BucketStore) InitialSync

func (s *BucketStore) InitialSync(ctx context.Context) error

InitialSync performs a blocking sync, with an extra step at the end that deletes locally saved blocks that are no longer present in the bucket. Such a mismatch can only arise between restarts, so this cleanup runs only once per startup.

func (*BucketStore) LabelNames

LabelNames implements the storegatewaypb.StoreGatewayServer interface.

func (*BucketStore) LabelValues

LabelValues implements the storegatewaypb.StoreGatewayServer interface.

func (*BucketStore) RemoveBlocksAndClose

func (s *BucketStore) RemoveBlocksAndClose() error

RemoveBlocksAndClose removes all blocks from local disk and releases all resources associated with the BucketStore.

func (*BucketStore) Series

Series implements the storegatewaypb.StoreGatewayServer interface.

func (*BucketStore) Stats

func (s *BucketStore) Stats() BucketStoreStats

Stats returns statistics about the BucketStore instance.

func (*BucketStore) SyncBlocks

func (s *BucketStore) SyncBlocks(ctx context.Context) error

SyncBlocks synchronizes the store's state with the bucket. It reuses disk space as a persistent cache, based on the s.dir param.

func (*BucketStore) TimeRange

func (s *BucketStore) TimeRange() (mint, maxt int64)

TimeRange returns the minimum and maximum timestamp of data available in the store.

type BucketStoreMetrics

type BucketStoreMetrics struct {
	// contains filtered or unexported fields
}

BucketStoreMetrics holds all the metrics tracked by BucketStore. These metrics MUST be monotonic (counter, summary, histogram) because a single metrics instance can be passed to multiple BucketStore instances, and the metrics MUST be correct even after a BucketStore is offloaded.

func NewBucketStoreMetrics

func NewBucketStoreMetrics(reg prometheus.Registerer) *BucketStoreMetrics

type BucketStoreOption

type BucketStoreOption func(s *BucketStore)

BucketStoreOption are functions that configure BucketStore.

func WithIndexCache

func WithIndexCache(cache indexcache.IndexCache) BucketStoreOption

WithIndexCache sets an indexCache to use instead of a noopCache.

func WithLazyLoadingGate

func WithLazyLoadingGate(lazyLoadingGate gate.Gate) BucketStoreOption

WithLazyLoadingGate sets a lazyLoadingGate to use instead of a gate.NewNoop().

func WithLogger

func WithLogger(logger log.Logger) BucketStoreOption

WithLogger sets the BucketStore logger to the one you pass.

func WithQueryGate

func WithQueryGate(queryGate gate.Gate) BucketStoreOption

WithQueryGate sets a queryGate to use instead of a gate.NewNoop().
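
The `BucketStoreOption` functions above follow the functional options pattern. As a minimal sketch of how that pattern works, the example below uses a hypothetical `store` type with string fields in place of the real logger and gate. The default values shown are assumptions for illustration only.

```go
package main

import "fmt"

// store is a hypothetical stand-in for BucketStore; strings replace the
// real logger and gate types to keep the sketch self-contained.
type store struct {
	logger    string
	queryGate string
}

// option mirrors the shape of BucketStoreOption: a function that mutates
// the store being constructed.
type option func(s *store)

func withLogger(name string) option {
	return func(s *store) { s.logger = name }
}

func withQueryGate(name string) option {
	return func(s *store) { s.queryGate = name }
}

// newStore sets defaults first, then applies options in order, so a later
// option overrides an earlier one.
func newStore(opts ...option) *store {
	s := &store{logger: "nop", queryGate: "noop"} // assumed defaults
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	s := newStore(withQueryGate("bounded"))
	fmt.Printf("%+v\n", *s)
}
```

The benefit of this design is that the constructor signature stays stable as new optional settings are added, and callers only mention the settings they want to change.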

type BucketStoreStats

type BucketStoreStats struct {
	// BlocksLoadedTotal is the total number of blocks currently loaded in the bucket store.
	BlocksLoadedTotal int
}

type BucketStores

type BucketStores struct {
	services.Service
	// contains filtered or unexported fields
}

BucketStores is a multi-tenant wrapper of Thanos BucketStore.

func NewBucketStores

func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, allowedTenants *util.AllowedTenants, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)

NewBucketStores makes a new BucketStores. After starting the returned BucketStores

func (*BucketStores) Collect

func (u *BucketStores) Collect(metrics chan<- prometheus.Metric)

func (*BucketStores) Describe

func (u *BucketStores) Describe(descs chan<- *prometheus.Desc)

func (*BucketStores) LabelNames

LabelNames implements the storegatewaypb.StoreGatewayServer interface.

func (*BucketStores) LabelValues

LabelValues implements the storegatewaypb.StoreGatewayServer interface.

func (*BucketStores) Series

Series implements the storegatewaypb.StoreGatewayServer interface, making a series request to the underlying user bucket store.

func (*BucketStores) SyncBlocks

func (u *BucketStores) SyncBlocks(ctx context.Context) error

SyncBlocks synchronizes the stores' state with the bucket store for every user.

type ChunksLimiter

type ChunksLimiter interface {
	// Reserve num chunks out of the total number of chunks enforced by the limiter.
	// Returns an error if the limit has been exceeded. This function must be
	// goroutine safe.
	Reserve(num uint64) error
}

type ChunksLimiterFactory

type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter

ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for projects depending on Thanos which have dynamic limits.

func NewChunksLimiterFactory

func NewChunksLimiterFactory(limitsExtractor func() uint64) ChunksLimiterFactory

NewChunksLimiterFactory makes a new ChunksLimiterFactory with a dynamic limit.

type Config

type Config struct {
	ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."`

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`
}

Config holds the store gateway config.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the Config flags.

func (*Config) Validate

func (cfg *Config) Validate(limits validation.Limits) error

Validate the Config.

type IgnoreDeletionMarkFilter

type IgnoreDeletionMarkFilter struct {
	// contains filtered or unexported fields
}

IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements the MetadataFilterWithBucketIndex interface.

func NewIgnoreDeletionMarkFilter

func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter

NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.

func (*IgnoreDeletionMarkFilter) DeletionMarkBlocks

func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*block.DeletionMark

DeletionMarkBlocks returns blocks that were marked for deletion.

func (*IgnoreDeletionMarkFilter) Filter

func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error

Filter implements block.MetadataFilter.

func (*IgnoreDeletionMarkFilter) FilterWithBucketIndex

func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error

FilterWithBucketIndex implements MetadataFilterWithBucketIndex.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter is a simple mechanism for checking if something has passed a certain threshold.

func NewLimiter

func NewLimiter(limit uint64, ctr prometheus.Counter, limitErrorFunc func(uint64) validation.LimitError) *Limiter

NewLimiter returns a new limiter with a specified limit. 0 disables the limit.

func (*Limiter) Reserve

func (l *Limiter) Reserve(num uint64) error

Reserve implements ChunksLimiter.
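
As an illustration of the contract described for `Reserve` (goroutine safe, an error once the limit is exceeded, and a limit of 0 meaning disabled), here is a minimal sketch built on an atomic counter. The `limiter` type and `errLimitExceeded` value are hypothetical. The real Limiter also takes a Prometheus counter and a function that builds the limit error, both of which are left out here.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errLimitExceeded is a hypothetical error for this sketch.
var errLimitExceeded = errors.New("limit exceeded")

// limiter tracks how much has been reserved against a fixed limit.
type limiter struct {
	limit    uint64 // 0 disables the limit
	reserved atomic.Uint64
}

// Reserve adds num to the running total and fails once the total passes the
// limit. The atomic add makes it safe to call from multiple goroutines.
func (l *limiter) Reserve(num uint64) error {
	if l.limit == 0 {
		return nil
	}
	if l.reserved.Add(num) > l.limit {
		return errLimitExceeded
	}
	return nil
}

func main() {
	l := &limiter{limit: 3}
	fmt.Println(l.Reserve(2)) // within the limit
	fmt.Println(l.Reserve(2)) // total is now 4, which exceeds 3
}
```

Note that in this sketch a failed reservation is not rolled back, so every later call also fails. That suits a per-request limiter that is discarded once the request is rejected.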

type MetadataFetcherMetrics

type MetadataFetcherMetrics struct {
	// contains filtered or unexported fields
}

MetadataFetcherMetrics aggregates metrics exported by Thanos MetaFetcher and re-exports those aggregates as Mimir metrics.

func NewMetadataFetcherMetrics

func NewMetadataFetcherMetrics(logger log.Logger) *MetadataFetcherMetrics

func (*MetadataFetcherMetrics) AddUserRegistry

func (m *MetadataFetcherMetrics) AddUserRegistry(user string, reg *prometheus.Registry)

func (*MetadataFetcherMetrics) Collect

func (m *MetadataFetcherMetrics) Collect(out chan<- prometheus.Metric)

func (*MetadataFetcherMetrics) Describe

func (m *MetadataFetcherMetrics) Describe(out chan<- *prometheus.Desc)

func (*MetadataFetcherMetrics) RemoveUserRegistry

func (m *MetadataFetcherMetrics) RemoveUserRegistry(user string)

type MetadataFilterWithBucketIndex

type MetadataFilterWithBucketIndex interface {
	// FilterWithBucketIndex is like Thanos MetadataFilter.Filter(), but it also receives the bucket index as input.
	FilterWithBucketIndex(ctx context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
}

type Part

type Part struct {
	Start uint64
	End   uint64

	ElemRng [2]int
}

type Partitioner

type Partitioner interface {
	// Partition partitions length entries into n <= length ranges that cover all
	// input ranges.
	// It supports overlapping ranges.
	// NOTE: It expects the ranges to be sorted by start time.
	Partition(length int, rng func(int) (uint64, uint64)) []Part
}

type RingConfig

type RingConfig struct {
	KVStore              kv.Config     `` /* 213-byte string literal not displayed */
	HeartbeatPeriod      time.Duration `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
	ReplicationFactor    int           `yaml:"replication_factor" category:"advanced"`
	TokensFilePath       string        `yaml:"tokens_file_path"`
	NumTokens            int           `yaml:"num_tokens" category:"advanced"`
	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`
	AutoForgetEnabled    bool          `yaml:"auto_forget_enabled"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" category:"advanced"`
	InstanceAddr           string   `yaml:"instance_addr" category:"advanced"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" category:"advanced"`
	InstanceZone           string   `yaml:"instance_availability_zone"`

	UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"`

	// Injected internally
	ListenPort      int           `yaml:"-"`
	RingCheckPeriod time.Duration `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the store-gateway ring. This config strips the options down to the minimum, to avoid confusing the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to configure this RingConfig to the given FlagSet.

func (*RingConfig) ToLifecyclerConfig

func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

type SeriesLimiter

type SeriesLimiter interface {
	// Reserve num series out of the total number of series enforced by the limiter.
	// Returns an error if the limit has been exceeded. This function must be
	// goroutine safe.
	Reserve(num uint64) error
}

type SeriesLimiterFactory

type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter

SeriesLimiterFactory is used to create a new SeriesLimiter.

func NewSeriesLimiterFactory

func NewSeriesLimiterFactory(limitsExtractor func() uint64) SeriesLimiterFactory

NewSeriesLimiterFactory makes a new SeriesLimiterFactory with a dynamic limit.

type ShardingLimits

type ShardingLimits interface {
	StoreGatewayTenantShardSize(userID string) int
}

ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.

type ShardingStrategy

type ShardingStrategy interface {
	// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
	// that should be synced by the store-gateway.
	FilterUsers(ctx context.Context, userIDs []string) ([]string, error)

	// FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway.
	// The provided loaded map contains blocks which have been previously returned by this function and
	// are now loaded or loading in the store-gateway.
	FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
}

type ShuffleShardingStrategy

type ShuffleShardingStrategy struct {
	// contains filtered or unexported fields
}

ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.

func NewShuffleShardingStrategy

func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy

NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.

func (*ShuffleShardingStrategy) FilterBlocks

func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error

FilterBlocks implements ShardingStrategy.

func (*ShuffleShardingStrategy) FilterUsers

func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)

FilterUsers implements ShardingStrategy.

type StoreGateway

type StoreGateway struct {
	services.Service
	// contains filtered or unexported fields
}

StoreGateway is the Mimir service responsible for exposing an API over the bucket where blocks are stored. It optionally supports sharding and replicating blocks across a pool of store-gateway instances.

func NewStoreGateway

func NewStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer, tracker *activitytracker.ActivityTracker) (*StoreGateway, error)

func (*StoreGateway) BlocksHandler

func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) LabelNames

LabelNames implements the storegatewaypb.StoreGatewayServer interface.

func (*StoreGateway) LabelValues

LabelValues implements the storegatewaypb.StoreGatewayServer interface.

func (*StoreGateway) PrepareShutdownHandler

func (g *StoreGateway) PrepareShutdownHandler(w http.ResponseWriter, req *http.Request)

PrepareShutdownHandler possibly changes the configuration of the store-gateway in such a way that when it is stopped, it gets unregistered from the ring.

Moreover, it creates a file on disk which is used to re-apply the desired configuration if the store-gateway crashes and restarts before being permanently shut down.

The following methods are possible:

* `GET` shows the status of this configuration
* `POST` enables this configuration
* `DELETE` disables this configuration

func (*StoreGateway) RingHandler

func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) Series

Series implements the storegatewaypb.StoreGatewayServer interface.

func (*StoreGateway) TenantsHandler

func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)

Directories

Path Synopsis
SPDX-License-Identifier: AGPL-3.0-only Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/cache/inmemory.go Provenance-includes-license: Apache-2.0 Provenance-includes-copyright: The Thanos Authors.
