Documentation ¶
Index ¶
- Constants
- Variables
- func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits, ...) ring.ReadRing
- func NewShardingBucketReaderAdapter(userID string, strategy ShardingStrategy, ...) objstore.InstrumentedBucketReader
- func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
- type BucketIndexMetadataFetcher
- type BucketStoreMetrics
- type BucketStores
- func (u *BucketStores) InitialSync(ctx context.Context) error
- func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
- func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
- func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error
- func (u *BucketStores) SyncBlocks(ctx context.Context) error
- type Config
- type DefaultShardingStrategy
- type IgnoreDeletionMarkFilter
- func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark
- func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, ...) error
- func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, ...) error
- type IgnoreNonQueryableBlocksFilter
- type MetadataFetcherMetrics
- type MetadataFilterWithBucketIndex
- type NoShardingStrategy
- type ReplicaLabelRemover
- type RingConfig
- type ShardingLimits
- type ShardingStrategy
- type ShuffleShardingStrategy
- type StoreGateway
- func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
- func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
- func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)
- func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, ...) (ring.InstanceState, ring.Tokens)
- func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler)
- func (g *StoreGateway) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
- func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
- func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error
Constants ¶
const ( // RingKey is the key under which we store the store gateways ring in the KVStore. RingKey = "store-gateway" // RingNameForServer is the name of the ring used by the store gateway server. RingNameForServer = "store-gateway" // RingNameForClient is the name of the ring used by the store gateway client (we need // a different name to avoid clashing Prometheus metrics when running in single-binary). RingNameForClient = "store-gateway-client" // We use a safe default instead of exposing a config option to the user // in order to simplify the config. RingNumTokens = 512 )
Variables ¶
var ( // BlocksOwnerSync is the operation used to check the authoritative owners of a block // (replicas included). BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, func(s ring.InstanceState) bool { return s == ring.LEAVING }) // BlocksOwnerRead is the operation used to check the authoritative owners of a block // (replicas included) that are available for queries (a store-gateway is available for // queries only when ACTIVE). BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) // BlocksRead is the operation run by the querier to query blocks via the store-gateway. BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool { return s != ring.ACTIVE }) )
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")
Functions ¶
func GetShuffleShardingSubring ¶ added in v1.4.0
func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits, zoneStableShuffleSharding bool) ring.ReadRing
GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.
func NewShardingBucketReaderAdapter ¶ added in v1.4.0
func NewShardingBucketReaderAdapter(userID string, strategy ShardingStrategy, wrapped objstore.InstrumentedBucketReader) objstore.InstrumentedBucketReader
func NewShardingMetadataFilterAdapter ¶ added in v1.4.0
func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
Types ¶
type BucketIndexMetadataFetcher ¶ added in v1.7.0
type BucketIndexMetadataFetcher struct {
// contains filtered or unexported fields
}
BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging the Cortex bucket index.
func NewBucketIndexMetadataFetcher ¶ added in v1.7.0
func NewBucketIndexMetadataFetcher( userID string, bkt objstore.Bucket, strategy ShardingStrategy, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer, filters []block.MetadataFilter, ) *BucketIndexMetadataFetcher
func (*BucketIndexMetadataFetcher) Fetch ¶ added in v1.7.0
func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
Fetch implements block.MetadataFetcher. Not goroutine-safe.
func (*BucketIndexMetadataFetcher) UpdateOnChange ¶ added in v1.7.0
func (f *BucketIndexMetadataFetcher) UpdateOnChange(callback func([]metadata.Meta, error))
type BucketStoreMetrics ¶ added in v1.1.0
type BucketStoreMetrics struct {
// contains filtered or unexported fields
}
BucketStoreMetrics aggregates metrics exported by Thanos Bucket Store and re-exports those aggregates as Cortex metrics.
func NewBucketStoreMetrics ¶ added in v1.1.0
func NewBucketStoreMetrics() *BucketStoreMetrics
func (*BucketStoreMetrics) AddUserRegistry ¶ added in v1.1.0
func (m *BucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry)
func (*BucketStoreMetrics) Collect ¶ added in v1.1.0
func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric)
func (*BucketStoreMetrics) Describe ¶ added in v1.1.0
func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc)
func (*BucketStoreMetrics) RemoveUserRegistry ¶ added in v1.8.0
func (m *BucketStoreMetrics) RemoveUserRegistry(user string)
type BucketStores ¶ added in v1.1.0
type BucketStores struct {
// contains filtered or unexported fields
}
BucketStores is a multi-tenant wrapper of Thanos BucketStore.
func NewBucketStores ¶ added in v1.1.0
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)
NewBucketStores makes a new BucketStores.
func (*BucketStores) InitialSync ¶ added in v1.1.0
func (u *BucketStores) InitialSync(ctx context.Context) error
InitialSync does an initial synchronization of blocks for all users.
func (*BucketStores) LabelNames ¶ added in v1.6.0
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
LabelNames implements the Storegateway proto service.
func (*BucketStores) LabelValues ¶ added in v1.6.0
func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
LabelValues implements the Storegateway proto service.
func (*BucketStores) Series ¶ added in v1.1.0
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error
Series makes a series request to the underlying user bucket store.
func (*BucketStores) SyncBlocks ¶ added in v1.1.0
func (u *BucketStores) SyncBlocks(ctx context.Context) error
SyncBlocks synchronizes the stores' state with the bucket store for every user.
type Config ¶
type Config struct { ShardingEnabled bool `yaml:"sharding_enabled"` ShardingRing RingConfig `` /* 127-byte string literal not displayed */ ShardingStrategy string `yaml:"sharding_strategy"` }
Config holds the store gateway config.
func (*Config) RegisterFlags ¶
RegisterFlags registers the Config flags.
type DefaultShardingStrategy ¶ added in v1.4.0
type DefaultShardingStrategy struct {
// contains filtered or unexported fields
}
DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways. Not goroutine-safe.
func NewDefaultShardingStrategy ¶ added in v1.4.0
func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy
NewDefaultShardingStrategy creates DefaultShardingStrategy.
func (*DefaultShardingStrategy) FilterBlocks ¶ added in v1.4.0
func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*DefaultShardingStrategy) FilterUsers ¶ added in v1.4.0
func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string
FilterUsers implements ShardingStrategy.
type IgnoreDeletionMarkFilter ¶ added in v1.7.0
type IgnoreDeletionMarkFilter struct {
// contains filtered or unexported fields
}
IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements the MetadataFilterWithBucketIndex interface.
func NewIgnoreDeletionMarkFilter ¶ added in v1.7.0
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter
NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
func (*IgnoreDeletionMarkFilter) DeletionMarkBlocks ¶ added in v1.7.0
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.DeletionMark
DeletionMarkBlocks returns blocks that were marked for deletion.
func (*IgnoreDeletionMarkFilter) Filter ¶ added in v1.7.0
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error
Filter implements block.MetadataFilter.
func (*IgnoreDeletionMarkFilter) FilterWithBucketIndex ¶ added in v1.7.0
func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*metadata.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
FilterWithBucketIndex implements MetadataFilterWithBucketIndex.
type IgnoreNonQueryableBlocksFilter ¶ added in v1.15.0
type IgnoreNonQueryableBlocksFilter struct {
// contains filtered or unexported fields
}
IgnoreNonQueryableBlocksFilter ignores blocks that are too new to be queried. This has to be used in conjunction with `-querier.query-store-after` with some buffer.
func NewIgnoreNonQueryableBlocksFilter ¶ added in v1.15.0
func NewIgnoreNonQueryableBlocksFilter(logger log.Logger, ignoreWithin time.Duration) *IgnoreNonQueryableBlocksFilter
type MetadataFetcherMetrics ¶ added in v1.1.0
type MetadataFetcherMetrics struct {
// contains filtered or unexported fields
}
MetadataFetcherMetrics aggregates metrics exported by Thanos MetaFetcher and re-exports those aggregates as Cortex metrics.
func NewMetadataFetcherMetrics ¶ added in v1.1.0
func NewMetadataFetcherMetrics() *MetadataFetcherMetrics
func (*MetadataFetcherMetrics) AddUserRegistry ¶ added in v1.1.0
func (m *MetadataFetcherMetrics) AddUserRegistry(user string, reg *prometheus.Registry)
func (*MetadataFetcherMetrics) Collect ¶ added in v1.1.0
func (m *MetadataFetcherMetrics) Collect(out chan<- prometheus.Metric)
func (*MetadataFetcherMetrics) Describe ¶ added in v1.1.0
func (m *MetadataFetcherMetrics) Describe(out chan<- *prometheus.Desc)
func (*MetadataFetcherMetrics) RemoveUserRegistry ¶ added in v1.8.0
func (m *MetadataFetcherMetrics) RemoveUserRegistry(user string)
type MetadataFilterWithBucketIndex ¶ added in v1.7.0
type NoShardingStrategy ¶ added in v1.4.0
type NoShardingStrategy struct{}
NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
func NewNoShardingStrategy ¶ added in v1.4.0
func NewNoShardingStrategy() *NoShardingStrategy
func (*NoShardingStrategy) FilterBlocks ¶ added in v1.4.0
func (*NoShardingStrategy) FilterUsers ¶ added in v1.4.0
func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string
type ReplicaLabelRemover ¶ added in v1.2.0
type ReplicaLabelRemover struct {
// contains filtered or unexported fields
}
ReplicaLabelRemover is a BaseFetcher modifier that modifies the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them.
func NewReplicaLabelRemover ¶ added in v1.2.0
func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaLabelRemover
NewReplicaLabelRemover creates a ReplicaLabelRemover.
type RingConfig ¶
type RingConfig struct { KVStore kv.Config `` /* 206-byte string literal not displayed */ HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` ReplicationFactor int `yaml:"replication_factor"` TokensFilePath string `yaml:"tokens_file_path"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` KeepInstanceInTheRingOnShutdown bool `yaml:"keep_instance_in_the_ring_on_shutdown"` ZoneStableShuffleSharding bool `yaml:"zone_stable_shuffle_sharding" doc:"hidden"` // Wait ring stability. WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"` WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration"` WaitInstanceStateTimeout time.Duration `yaml:"wait_instance_state_timeout"` FinalSleep time.Duration `yaml:"final_sleep"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` InstanceZone string `yaml:"instance_availability_zone"` // Injected internally ListenPort int `yaml:"-"` RingCheckPeriod time.Duration `yaml:"-"` }
RingConfig masks the ring lifecycler config which contains many options not really required by the store gateways ring. This config is used to strip down the config to the minimum, and avoid confusing the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to configure this to the given FlagSet.
func (*RingConfig) ToLifecyclerConfig ¶
func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
func (*RingConfig) ToRingConfig ¶ added in v1.1.0
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardingLimits ¶ added in v1.4.0
ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.
type ShardingStrategy ¶ added in v1.4.0
type ShardingStrategy interface { // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs // that should be synced by the store-gateway. FilterUsers(ctx context.Context, userIDs []string) []string // FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway. // The provided loaded map contains blocks which have been previously returned by this function and // are now loaded or loading in the store-gateway. FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error }
type ShuffleShardingStrategy ¶ added in v1.4.0
type ShuffleShardingStrategy struct {
// contains filtered or unexported fields
}
ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.
func NewShuffleShardingStrategy ¶ added in v1.4.0
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, zoneStableShuffleSharding bool) *ShuffleShardingStrategy
NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
func (*ShuffleShardingStrategy) FilterBlocks ¶ added in v1.4.0
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*ShuffleShardingStrategy) FilterUsers ¶ added in v1.4.0
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string
FilterUsers implements ShardingStrategy.
type StoreGateway ¶
StoreGateway is the Cortex service responsible for exposing an API over the bucket where blocks are stored, supporting blocks sharding and replication across a pool of store gateway instances (optional).
func NewStoreGateway ¶
func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)
func (*StoreGateway) LabelNames ¶ added in v1.6.0
func (g *StoreGateway) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
LabelNames implements the Storegateway proto service.
func (*StoreGateway) LabelValues ¶ added in v1.6.0
func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
LabelValues implements the Storegateway proto service.
func (*StoreGateway) OnRingInstanceHeartbeat ¶ added in v1.1.0
func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc)
func (*StoreGateway) OnRingInstanceRegister ¶ added in v1.1.0
func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens)
func (*StoreGateway) OnRingInstanceStopping ¶ added in v1.1.0
func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler)
func (*StoreGateway) OnRingInstanceTokens ¶ added in v1.1.0
func (g *StoreGateway) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
func (*StoreGateway) RingHandler ¶
func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) Series ¶ added in v1.1.0
func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error