Documentation ¶
Index ¶
- Constants
- Variables
- func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing
- func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
- type BucketIndexMetadataFetcher
- type BucketStore
- func (s *BucketStore) InitialSync(ctx context.Context) error
- func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
- func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
- func (s *BucketStore) RemoveBlocksAndClose() error
- func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) (err error)
- func (s *BucketStore) Stats() BucketStoreStats
- func (s *BucketStore) SyncBlocks(ctx context.Context) error
- func (s *BucketStore) TimeRange() (mint, maxt int64)
- type BucketStoreMetrics
- type BucketStoreOption
- type BucketStoreStats
- type BucketStores
- func (u *BucketStores) Collect(metrics chan<- prometheus.Metric)
- func (u *BucketStores) Describe(descs chan<- *prometheus.Desc)
- func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
- func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
- func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error
- func (u *BucketStores) SyncBlocks(ctx context.Context) error
- type ChunksLimiter
- type ChunksLimiterFactory
- type Config
- type IgnoreDeletionMarkFilter
- func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*block.DeletionMark
- func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error
- func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, ...) error
- type Limiter
- type MetadataFetcherMetrics
- type MetadataFilterWithBucketIndex
- type Part
- type Partitioner
- type RingConfig
- type SeriesLimiter
- type SeriesLimiterFactory
- type ShardingLimits
- type ShardingStrategy
- type ShuffleShardingStrategy
- type StoreGateway
- func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
- func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
- func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
- func (g *StoreGateway) PrepareShutdownHandler(w http.ResponseWriter, req *http.Request)
- func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
- func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error
- func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)
Constants ¶
const (
	// RingKey is the key under which we store the store gateways ring in the KVStore.
	RingKey = "store-gateway"

	// RingNameForServer is the name of the ring used by the store gateway server.
	RingNameForServer = "store-gateway"

	// RingNameForClient is the name of the ring used by the store gateway client (we need
	// a different name to avoid clashing Prometheus metrics when running in single-binary).
	RingNameForClient = "store-gateway-client"
)
const GrpcContextMetadataTenantID = "__org_id__"
GrpcContextMetadataTenantID is a key for gRPC metadata used to pass the tenant ID to the store-gateway process. (This is now separate from DeprecatedTenantIDExternalLabel to signify a different use case.)
Variables ¶
var (
	// BlocksOwnerSync is the operation used to check the authoritative owners of a block
	// (replicas included).
	BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)

	// BlocksOwnerRead is the operation used to check the authoritative owners of a block
	// (replicas included) that are available for queries (a store-gateway is available for
	// queries only when ACTIVE).
	BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
	BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
		return s != ring.ACTIVE
	})
)
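The three operations differ in which instance states they accept and in whether a non-ACTIVE instance should cause the replica set to be extended. The following is a minimal sketch of that distinction. It uses hypothetical local types (`instanceState`, `op`, `newOp`) instead of the real ring package, ignores heartbeats, and assumes that the second argument to ring.NewOp is a predicate deciding whether to extend the replica set.

```go
package main

import "fmt"

// instanceState is a hypothetical stand-in for ring.InstanceState.
type instanceState int

const (
	active instanceState = iota
	joining
	leaving
)

// op is a hypothetical stand-in for a ring operation: the states it treats
// as healthy, plus an optional predicate that decides whether an instance in
// a given state should extend the replica set.
type op struct {
	healthy      map[instanceState]bool
	shouldExtend func(instanceState) bool
}

func newOp(healthy []instanceState, shouldExtend func(instanceState) bool) op {
	m := make(map[instanceState]bool, len(healthy))
	for _, s := range healthy {
		m[s] = true
	}
	return op{healthy: m, shouldExtend: shouldExtend}
}

// isHealthy reports whether an instance in state s is accepted by the operation.
func (o op) isHealthy(s instanceState) bool { return o.healthy[s] }

// extendsReplicaSet reports whether an instance in state s should cause
// one more instance to be added to the replica set.
func (o op) extendsReplicaSet(s instanceState) bool {
	return o.shouldExtend != nil && o.shouldExtend(s)
}

// Local mirrors of the three operations declared above.
var (
	blocksOwnerSync = newOp([]instanceState{joining, active, leaving}, nil)
	blocksOwnerRead = newOp([]instanceState{active}, nil)
	blocksRead      = newOp([]instanceState{active}, func(s instanceState) bool { return s != active })
)

func main() {
	fmt.Println("sync accepts JOINING:", blocksOwnerSync.isHealthy(joining))
	fmt.Println("owner-read accepts JOINING:", blocksOwnerRead.isHealthy(joining))
	fmt.Println("read extends on LEAVING:", blocksRead.extendsReplicaSet(leaving))
}
```

Under these assumptions, a JOINING store-gateway counts as an owner for syncing blocks but not for serving queries, and only BlocksRead asks for an extra replica when it meets a non-ACTIVE instance.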
Functions ¶
func GetShuffleShardingSubring ¶
GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.
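The reason both components must share this logic is that the subring has to be a deterministic function of the tenant: the querier needs to look for a tenant's blocks on the same instances that the store-gateways decided to load them on. The sketch below illustrates that property only. It is not the ring library's algorithm; `subring`, the FNV hash, and the seeded shuffle are all assumptions chosen for brevity, as is treating a shard size of 0 as "use every instance".

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"sort"
)

// subring returns a deterministic subset of instances for a tenant.
// Hypothetical helper: the selection depends only on the set of instances,
// the tenant ID, and the shard size, never on the order of the input.
func subring(instances []string, tenantID string, shardSize int) []string {
	sorted := append([]string(nil), instances...)
	sort.Strings(sorted)

	// Assumption: a non-positive or oversized shard size disables sharding.
	if shardSize <= 0 || shardSize >= len(sorted) {
		return sorted
	}

	// Seed a private random source from the tenant ID so every caller
	// derives the same permutation for the same tenant.
	h := fnv.New64a()
	h.Write([]byte(tenantID))
	rng := rand.New(rand.NewSource(int64(h.Sum64())))
	rng.Shuffle(len(sorted), func(i, j int) { sorted[i], sorted[j] = sorted[j], sorted[i] })

	shard := sorted[:shardSize]
	sort.Strings(shard)
	return shard
}

func main() {
	instances := []string{"sg-1", "sg-2", "sg-3", "sg-4", "sg-5"}
	fmt.Println(subring(instances, "tenant-a", 3))
	fmt.Println(subring(instances, "tenant-b", 3))
}
```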
func NewShardingMetadataFilterAdapter ¶
func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
Types ¶
type BucketIndexMetadataFetcher ¶
type BucketIndexMetadataFetcher struct {
// contains filtered or unexported fields
}
BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation that leverages the Mimir bucket index.
func NewBucketIndexMetadataFetcher ¶
func NewBucketIndexMetadataFetcher( userID string, bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer, filters []block.MetadataFilter, ) *BucketIndexMetadataFetcher
type BucketStore ¶
BucketStore implements the store API backed by a bucket. It loads all index files to local disk.
NOTE: Bucket store reencodes postings using diff+varint+snappy when storing to cache. This makes them smaller, but takes extra CPU and memory. When used with in-memory cache, memory usage should decrease overall, thanks to postings being smaller.
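To see why the re-encoding shrinks postings, note that a postings list is a sorted sequence of series references, so consecutive values are close together and their differences fit in one or two varint bytes. The sketch below shows only the diff+varint step using the standard library. The snappy pass is omitted, and `encodeDiffVarint` and `decodeDiffVarint` are hypothetical helpers, not the codec used by the bucket store.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeDiffVarint encodes a sorted list of references as unsigned varints
// of the difference between each value and its predecessor.
func encodeDiffVarint(refs []uint64) []byte {
	var (
		buf  []byte
		tmp  [binary.MaxVarintLen64]byte
		prev uint64
	)
	for _, r := range refs {
		n := binary.PutUvarint(tmp[:], r-prev)
		buf = append(buf, tmp[:n]...)
		prev = r
	}
	return buf
}

// decodeDiffVarint reverses encodeDiffVarint by accumulating the deltas.
func decodeDiffVarint(b []byte) ([]uint64, error) {
	var (
		refs []uint64
		prev uint64
	)
	for len(b) > 0 {
		delta, n := binary.Uvarint(b)
		if n <= 0 {
			return nil, errors.New("malformed varint")
		}
		prev += delta
		refs = append(refs, prev)
		b = b[n:]
	}
	return refs, nil
}

func main() {
	refs := []uint64{1000, 1001, 1005, 1300}
	enc := encodeDiffVarint(refs)
	dec, err := decodeDiffVarint(enc)
	fmt.Println(len(enc), "bytes encoded vs", 8*len(refs), "bytes raw;", dec, err)
}
```

The CPU cost mentioned in the note comes from running this kind of loop, plus compression, on every cache write and read.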
func NewBucketStore ¶
func NewBucketStore( userID string, bkt objstore.InstrumentedBucketReader, fetcher block.MetadataFetcher, dir string, bucketStoreConfig tsdb.BucketStoreConfig, postingsStrategy postingsSelectionStrategy, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, partitioners blockPartitioners, seriesHashCache *hashcache.SeriesHashCache, metrics *BucketStoreMetrics, options ...BucketStoreOption, ) (*BucketStore, error)
NewBucketStore creates a new bucket-backed store that implements the store API against an object store bucket. It is optimized to work against high-latency backends.
func (*BucketStore) InitialSync ¶
func (s *BucketStore) InitialSync(ctx context.Context) error
InitialSync performs a blocking sync with an extra step at the end to delete locally saved blocks that are no longer present in the bucket. Such a mismatch can only arise between restarts, so this cleanup is done only once per startup.
func (*BucketStore) LabelNames ¶
func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
LabelNames implements the storegatewaypb.StoreGatewayServer interface.
func (*BucketStore) LabelValues ¶
func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
LabelValues implements the storegatewaypb.StoreGatewayServer interface.
func (*BucketStore) RemoveBlocksAndClose ¶
func (s *BucketStore) RemoveBlocksAndClose() error
RemoveBlocksAndClose removes all blocks from local disk and releases all resources associated with the BucketStore.
func (*BucketStore) Series ¶
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) (err error)
Series implements the storegatewaypb.StoreGatewayServer interface.
func (*BucketStore) Stats ¶
func (s *BucketStore) Stats() BucketStoreStats
Stats returns statistics about the BucketStore instance.
func (*BucketStore) SyncBlocks ¶
func (s *BucketStore) SyncBlocks(ctx context.Context) error
SyncBlocks synchronizes the store's state with the bucket. It reuses disk space as a persistent cache, based on the s.dir param.
func (*BucketStore) TimeRange ¶
func (s *BucketStore) TimeRange() (mint, maxt int64)
TimeRange returns the minimum and maximum timestamp of data available in the store.
type BucketStoreMetrics ¶
type BucketStoreMetrics struct {
// contains filtered or unexported fields
}
BucketStoreMetrics holds all the metrics tracked by BucketStore. These metrics MUST be monotonic (counter, summary, histogram) because a single metrics instance can be passed to multiple BucketStore instances and metrics MUST be correct even after a BucketStore is offloaded.
func NewBucketStoreMetrics ¶
func NewBucketStoreMetrics(reg prometheus.Registerer) *BucketStoreMetrics
type BucketStoreOption ¶
type BucketStoreOption func(s *BucketStore)
BucketStoreOption are functions that configure BucketStore.
func WithIndexCache ¶
func WithIndexCache(cache indexcache.IndexCache) BucketStoreOption
WithIndexCache sets an indexCache to use instead of a noopCache.
func WithLazyLoadingGate ¶
func WithLazyLoadingGate(lazyLoadingGate gate.Gate) BucketStoreOption
WithLazyLoadingGate sets a lazyLoadingGate to use instead of a gate.NewNoop().
func WithLogger ¶
func WithLogger(logger log.Logger) BucketStoreOption
WithLogger sets the BucketStore logger to the one you pass.
func WithQueryGate ¶
func WithQueryGate(queryGate gate.Gate) BucketStoreOption
WithQueryGate sets a queryGate to use instead of a gate.NewNoop().
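The With* functions above follow the functional options pattern: the constructor sets no-op defaults, then applies each option in order. A minimal sketch of the pattern follows. The `store` type, its string fields, and the default values are hypothetical stand-ins and do not reflect BucketStore's real fields.

```go
package main

import "fmt"

// store is a hypothetical stand-in for BucketStore.
type store struct {
	logger    string
	queryGate string
}

// option mirrors the shape of BucketStoreOption: a function that mutates the store.
type option func(*store)

func withLogger(name string) option {
	return func(s *store) { s.logger = name }
}

func withQueryGate(name string) option {
	return func(s *store) { s.queryGate = name }
}

// newStore applies no-op defaults first, then each option in order,
// so a later option overrides an earlier one.
func newStore(opts ...option) *store {
	s := &store{logger: "nop", queryGate: "noop"}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", *newStore())
	fmt.Printf("%+v\n", *newStore(withLogger("debug"), withQueryGate("limited")))
}
```

The benefit of this design is that new optional dependencies can be added without changing the constructor's signature.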
type BucketStoreStats ¶
type BucketStoreStats struct {
	// BlocksLoadedTotal is the total number of blocks currently loaded in the bucket store.
	BlocksLoadedTotal int
}
type BucketStores ¶
BucketStores is a multi-tenant wrapper of Thanos BucketStore.
func NewBucketStores ¶
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, allowedTenants *util.AllowedTenants, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)
NewBucketStores makes a new BucketStores. After starting the returned BucketStores
func (*BucketStores) Collect ¶
func (u *BucketStores) Collect(metrics chan<- prometheus.Metric)
func (*BucketStores) Describe ¶
func (u *BucketStores) Describe(descs chan<- *prometheus.Desc)
func (*BucketStores) LabelNames ¶
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
LabelNames implements the storegatewaypb.StoreGatewayServer interface.
func (*BucketStores) LabelValues ¶
func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
LabelValues implements the storegatewaypb.StoreGatewayServer interface.
func (*BucketStores) Series ¶
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error
Series implements the storegatewaypb.StoreGatewayServer interface, making a series request to the underlying user bucket store.
func (*BucketStores) SyncBlocks ¶
func (u *BucketStores) SyncBlocks(ctx context.Context) error
SyncBlocks synchronizes the stores' state with the bucket store for every user.
type ChunksLimiter ¶
type ChunksLimiterFactory ¶
type ChunksLimiterFactory func(failedCounter prometheus.Counter) ChunksLimiter
ChunksLimiterFactory is used to create a new ChunksLimiter. The factory is useful for projects depending on Thanos which have dynamic limits.
func NewChunksLimiterFactory ¶
func NewChunksLimiterFactory(limitsExtractor func() uint64) ChunksLimiterFactory
NewChunksLimiterFactory makes a new ChunksLimiterFactory with a dynamic limit.
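A dynamic limit means the limit is read when a limiter is created rather than when the factory is built, so a change to a tenant's limit applies to limiters created afterwards. The sketch below illustrates this with hypothetical local types. It leaves out the Prometheus failure counter that the real factory receives, and it assumes that a limit of 0 disables the check, as documented for NewLimiter.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errLimitExceeded = errors.New("limit exceeded")

// limiter is a hypothetical stand-in for a chunks limiter.
type limiter struct {
	reserved uint64 // accessed atomically; kept first for 64-bit alignment
	limit    uint64
}

// Reserve adds n to the running total and fails once the total passes the limit.
func (l *limiter) Reserve(n uint64) error {
	if l.limit == 0 {
		return nil // assumption: 0 means unlimited
	}
	if atomic.AddUint64(&l.reserved, n) > l.limit {
		return errLimitExceeded
	}
	return nil
}

// limiterFactory creates a fresh limiter, for example one per request.
type limiterFactory func() *limiter

// newLimiterFactory defers reading the limit until a limiter is created.
func newLimiterFactory(limitsExtractor func() uint64) limiterFactory {
	return func() *limiter {
		return &limiter{limit: limitsExtractor()}
	}
}

func main() {
	current := uint64(2)
	factory := newLimiterFactory(func() uint64 { return current })

	first := factory()
	fmt.Println(first.Reserve(2), first.Reserve(1))

	current = 0 // the limit is changed at runtime
	second := factory()
	fmt.Println(second.Reserve(1000))
}
```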
type Config ¶
type Config struct {
	ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."`

	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants" category:"advanced"`
	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants" category:"advanced"`
}
Config holds the store gateway config.
func (*Config) RegisterFlags ¶
RegisterFlags registers the Config flags.
type IgnoreDeletionMarkFilter ¶
type IgnoreDeletionMarkFilter struct {
// contains filtered or unexported fields
}
IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements the MetadataFilterWithBucketIndex interface.
func NewIgnoreDeletionMarkFilter ¶
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter
NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
func (*IgnoreDeletionMarkFilter) DeletionMarkBlocks ¶
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*block.DeletionMark
DeletionMarkBlocks returns blocks that were marked for deletion.
func (*IgnoreDeletionMarkFilter) Filter ¶
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error
Filter implements block.MetadataFilter.
func (*IgnoreDeletionMarkFilter) FilterWithBucketIndex ¶
func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
FilterWithBucketIndex implements MetadataFilterWithBucketIndex.
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter is a simple mechanism for checking if something has passed a certain threshold.
func NewLimiter ¶
func NewLimiter(limit uint64, ctr prometheus.Counter, limitErrorFunc func(uint64) validation.LimitError) *Limiter
NewLimiter returns a new limiter with a specified limit. 0 disables the limit.
type MetadataFetcherMetrics ¶
type MetadataFetcherMetrics struct {
// contains filtered or unexported fields
}
MetadataFetcherMetrics aggregates metrics exported by Thanos MetaFetcher and re-exports those aggregates as Mimir metrics.
func NewMetadataFetcherMetrics ¶
func NewMetadataFetcherMetrics(logger log.Logger) *MetadataFetcherMetrics
func (*MetadataFetcherMetrics) AddUserRegistry ¶
func (m *MetadataFetcherMetrics) AddUserRegistry(user string, reg *prometheus.Registry)
func (*MetadataFetcherMetrics) Collect ¶
func (m *MetadataFetcherMetrics) Collect(out chan<- prometheus.Metric)
func (*MetadataFetcherMetrics) Describe ¶
func (m *MetadataFetcherMetrics) Describe(out chan<- *prometheus.Desc)
func (*MetadataFetcherMetrics) RemoveUserRegistry ¶
func (m *MetadataFetcherMetrics) RemoveUserRegistry(user string)
type Partitioner ¶
type RingConfig ¶
type RingConfig struct {
	KVStore              kv.Config     `` /* 213-byte string literal not displayed */
	HeartbeatPeriod      time.Duration `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
	ReplicationFactor    int           `yaml:"replication_factor" category:"advanced"`
	TokensFilePath       string        `yaml:"tokens_file_path"`
	NumTokens            int           `yaml:"num_tokens" category:"advanced"`
	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`
	AutoForgetEnabled    bool          `yaml:"auto_forget_enabled"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" category:"advanced"`
	InstanceAddr           string   `yaml:"instance_addr" category:"advanced"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" category:"advanced"`
	InstanceZone           string   `yaml:"instance_availability_zone"`

	UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"`

	// Injected internally
	ListenPort      int           `yaml:"-"`
	RingCheckPeriod time.Duration `yaml:"-"`
}
RingConfig masks the ring lifecycler config, which contains many options not really required by the store-gateway ring. This config is used to strip down the config to the minimum and to avoid confusing the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)
RegisterFlags adds the flags required to configure the ring to the given FlagSet.
func (*RingConfig) ToLifecyclerConfig ¶
func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type SeriesLimiter ¶
type SeriesLimiterFactory ¶
type SeriesLimiterFactory func(failedCounter prometheus.Counter) SeriesLimiter
SeriesLimiterFactory is used to create a new SeriesLimiter.
func NewSeriesLimiterFactory ¶
func NewSeriesLimiterFactory(limitsExtractor func() uint64) SeriesLimiterFactory
NewSeriesLimiterFactory makes a new SeriesLimiterFactory with a dynamic limit.
type ShardingLimits ¶
ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.
type ShardingStrategy ¶
type ShardingStrategy interface {
	// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
	// that should be synced by the store-gateway.
	FilterUsers(ctx context.Context, userIDs []string) ([]string, error)

	// FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway.
	// The provided loaded map contains blocks which have been previously returned by this function and
	// are now loaded or loading in the store-gateway.
	FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
}
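Filtering "in-place" means that FilterBlocks deletes entries from the metas map passed by the caller instead of returning a new map. The sketch below shows that contract in isolation. It uses string block IDs and a hypothetical `owns` predicate in place of the ring lookup, and it leaves out the `loaded` and `synced` parameters, whose handling is not described here.

```go
package main

import "fmt"

// meta is a hypothetical stand-in for block.Meta.
type meta struct {
	MinTime, MaxTime int64
}

// filterBlocks removes from metas every block this instance does not own.
// Deleting from a map while ranging over it is safe in Go.
func filterBlocks(metas map[string]*meta, owns func(blockID string) bool) {
	for id := range metas {
		if !owns(id) {
			delete(metas, id)
		}
	}
}

func main() {
	metas := map[string]*meta{
		"block-a": {MinTime: 0, MaxTime: 10},
		"block-b": {MinTime: 10, MaxTime: 20},
		"block-c": {MinTime: 20, MaxTime: 30},
	}

	// Assumption for the example: this instance owns everything except block-b.
	filterBlocks(metas, func(id string) bool { return id != "block-b" })

	// The caller's map now contains only the blocks to load.
	fmt.Println(len(metas))
}
```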
type ShuffleShardingStrategy ¶
type ShuffleShardingStrategy struct {
// contains filtered or unexported fields
}
ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.
func NewShuffleShardingStrategy ¶
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy
NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
func (*ShuffleShardingStrategy) FilterBlocks ¶
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*ShuffleShardingStrategy) FilterUsers ¶
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)
FilterUsers implements ShardingStrategy.
type StoreGateway ¶
StoreGateway is the Mimir service responsible for exposing an API over the bucket where blocks are stored, supporting blocks sharding and replication across a pool of store gateway instances (optional).
func NewStoreGateway ¶
func NewStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer, tracker *activitytracker.ActivityTracker) (*StoreGateway, error)
func (*StoreGateway) BlocksHandler ¶
func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) LabelNames ¶
func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
LabelNames implements the storegatewaypb.StoreGatewayServer interface.
func (*StoreGateway) LabelValues ¶
func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
LabelValues implements the storegatewaypb.StoreGatewayServer interface.
func (*StoreGateway) PrepareShutdownHandler ¶
func (g *StoreGateway) PrepareShutdownHandler(w http.ResponseWriter, req *http.Request)
PrepareShutdownHandler possibly changes the configuration of the store-gateway in such a way that when it is stopped, it gets unregistered from the ring.
Moreover, it creates a file on disk which is used to re-apply the desired configuration if the store-gateway crashes and restarts before being permanently shut down.
The following methods are possible:
- `GET` shows the status of this configuration
- `POST` enables this configuration
- `DELETE` disables this configuration
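The three methods map naturally onto a handler that switches on the request method and uses the presence of a marker file as its state. The sketch below is an illustration under stated assumptions, not the store-gateway's implementation: the `prepareShutdown` type, the marker file name, the request path, the "set"/"unset" response bodies, and the status codes are all hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strings"
)

// prepareShutdown is a hypothetical handler whose state is a marker file on disk.
type prepareShutdown struct {
	markerPath string
}

func (p *prepareShutdown) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet: // show the current status
		if _, err := os.Stat(p.markerPath); err == nil {
			fmt.Fprintln(w, "set")
		} else {
			fmt.Fprintln(w, "unset")
		}
	case http.MethodPost: // enable: persist the marker so it survives a crash
		if err := os.WriteFile(p.markerPath, nil, 0o600); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	case http.MethodDelete: // disable: remove the marker if it exists
		if err := os.Remove(p.markerPath); err != nil && !os.IsNotExist(err) {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

// newTempHandler builds a handler backed by a temporary directory and
// returns a cleanup function that removes the directory.
func newTempHandler() (*prepareShutdown, func()) {
	dir, err := os.MkdirTemp("", "prepare-shutdown")
	if err != nil {
		panic(err)
	}
	h := &prepareShutdown{markerPath: filepath.Join(dir, "marker")}
	return h, func() { os.RemoveAll(dir) }
}

// call sends one in-memory request and returns the status code and trimmed body.
func call(h http.Handler, method string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(method, "/prepare-shutdown", nil))
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	h, cleanup := newTempHandler()
	defer cleanup()

	for _, m := range []string{"GET", "POST", "GET", "DELETE", "GET"} {
		code, body := call(h, m)
		fmt.Println(m, code, body)
	}
}
```

Keeping the state in a file rather than in memory is what lets the configuration be re-applied after a crash and restart.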
func (*StoreGateway) RingHandler ¶
func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) Series ¶
func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error
Series implements the storegatewaypb.StoreGatewayServer interface.
func (*StoreGateway) TenantsHandler ¶
func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)
Source Files ¶
- bucket.go
- bucket_chunk_reader.go
- bucket_index_metadata_fetcher.go
- bucket_index_postings.go
- bucket_index_reader.go
- bucket_store_metrics.go
- bucket_stores.go
- error.go
- gateway.go
- gateway_blocks_http.go
- gateway_prepare_shutdown_http.go
- gateway_ring.go
- gateway_ring_http.go
- gateway_tenants_http.go
- io.go
- limiter.go
- metadata_fetcher_filters.go
- metadata_fetcher_metrics.go
- partitioner.go
- postings_codec.go
- series_chunks.go
- series_refs.go
- series_refs_streaming.go
- sharding_strategy.go
- snappy_gob_codec.go
- stats.go