Documentation ¶
Index ¶
- Constants
- func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing
- func NewShardingBucketReaderAdapter(userID string, strategy ShardingStrategy, ...) objstore.InstrumentedBucketReader
- func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
- type BlocksReplicationStrategy
- type BucketStoreMetrics
- type BucketStores
- type Config
- type DefaultShardingStrategy
- type MetadataFetcherMetrics
- type NoShardingStrategy
- type ReplicaLabelRemover
- type RingConfig
- type ShardingLimits
- type ShardingStrategy
- type ShuffleShardingStrategy
- type StoreGateway
- func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.IngesterDesc)
- func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, ...) (ring.IngesterState, ring.Tokens)
- func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler)
- func (g *StoreGateway) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
- func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
- func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error
Constants ¶
const ( // RingKey is the key under which we store the store gateways ring in the KVStore. RingKey = "store-gateway" // RingNameForServer is the name of the ring used by the store gateway server. RingNameForServer = "store-gateway" // RingNameForClient is the name of the ring used by the store gateway client (we need // a different name to avoid clashing Prometheus metrics when running in single-binary). RingNameForClient = "store-gateway-client" // We use a safe default instead of exposing a config option to the user // in order to simplify the config. RingNumTokens = 512 )
Variables ¶
This section is empty.
Functions ¶
func GetShuffleShardingSubring ¶
GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.
func NewShardingBucketReaderAdapter ¶
func NewShardingBucketReaderAdapter(userID string, strategy ShardingStrategy, wrapped objstore.InstrumentedBucketReader) objstore.InstrumentedBucketReader
func NewShardingMetadataFilterAdapter ¶
func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
Types ¶
type BlocksReplicationStrategy ¶
type BlocksReplicationStrategy struct{}
func (*BlocksReplicationStrategy) Filter ¶
func (s *BlocksReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, _ int, heartbeatTimeout time.Duration, _ bool) ([]ring.IngesterDesc, int, error)
func (*BlocksReplicationStrategy) ShouldExtendReplicaSet ¶
func (s *BlocksReplicationStrategy) ShouldExtendReplicaSet(instance ring.IngesterDesc, op ring.Operation) bool
type BucketStoreMetrics ¶
type BucketStoreMetrics struct {
// contains filtered or unexported fields
}
BucketStoreMetrics aggregates metrics exported by Thanos Bucket Store and re-exports those aggregates as Cortex metrics.
func NewBucketStoreMetrics ¶
func NewBucketStoreMetrics() *BucketStoreMetrics
func (*BucketStoreMetrics) AddUserRegistry ¶
func (m *BucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry)
func (*BucketStoreMetrics) Collect ¶
func (m *BucketStoreMetrics) Collect(out chan<- prometheus.Metric)
func (*BucketStoreMetrics) Describe ¶
func (m *BucketStoreMetrics) Describe(out chan<- *prometheus.Desc)
type BucketStores ¶
type BucketStores struct {
// contains filtered or unexported fields
}
BucketStores is a multi-tenant wrapper of Thanos BucketStore.
func NewBucketStores ¶
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)
NewBucketStores makes a new BucketStores.
func (*BucketStores) InitialSync ¶
func (u *BucketStores) InitialSync(ctx context.Context) error
InitialSync does an initial synchronization of blocks for all users.
func (*BucketStores) Series ¶
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error
Series makes a series request to the underlying user bucket store.
func (*BucketStores) SyncBlocks ¶
func (u *BucketStores) SyncBlocks(ctx context.Context) error
SyncBlocks synchronizes the stores' state with the Bucket store for every user.
type Config ¶
type Config struct { ShardingEnabled bool `yaml:"sharding_enabled"` ShardingRing RingConfig `` /* 127-byte string literal not displayed */ ShardingStrategy string `yaml:"sharding_strategy"` }
Config holds the store gateway config.
func (*Config) RegisterFlags ¶
RegisterFlags registers the Config flags.
type DefaultShardingStrategy ¶
type DefaultShardingStrategy struct {
// contains filtered or unexported fields
}
DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways. Not goroutine-safe.
func NewDefaultShardingStrategy ¶
func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy
NewDefaultShardingStrategy creates DefaultShardingStrategy.
func (*DefaultShardingStrategy) FilterBlocks ¶
func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*DefaultShardingStrategy) FilterUsers ¶
func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string
FilterUsers implements ShardingStrategy.
type MetadataFetcherMetrics ¶
type MetadataFetcherMetrics struct {
// contains filtered or unexported fields
}
MetadataFetcherMetrics aggregates metrics exported by Thanos MetaFetcher and re-exports those aggregates as Cortex metrics.
func NewMetadataFetcherMetrics ¶
func NewMetadataFetcherMetrics() *MetadataFetcherMetrics
func (*MetadataFetcherMetrics) AddUserRegistry ¶
func (m *MetadataFetcherMetrics) AddUserRegistry(user string, reg *prometheus.Registry)
func (*MetadataFetcherMetrics) Collect ¶
func (m *MetadataFetcherMetrics) Collect(out chan<- prometheus.Metric)
func (*MetadataFetcherMetrics) Describe ¶
func (m *MetadataFetcherMetrics) Describe(out chan<- *prometheus.Desc)
type NoShardingStrategy ¶
type NoShardingStrategy struct{}
NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
func NewNoShardingStrategy ¶
func NewNoShardingStrategy() *NoShardingStrategy
func (*NoShardingStrategy) FilterBlocks ¶
func (*NoShardingStrategy) FilterUsers ¶
func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string
type ReplicaLabelRemover ¶
type ReplicaLabelRemover struct {
// contains filtered or unexported fields
}
ReplicaLabelRemover is a BaseFetcher modifier that modifies the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them.
func NewReplicaLabelRemover ¶
func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaLabelRemover
NewReplicaLabelRemover creates a ReplicaLabelRemover.
func (*ReplicaLabelRemover) Modify ¶
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*thanos_metadata.Meta, modified *extprom.TxGaugeVec) error
Modify modifies the external labels of existing blocks: it removes the given replica labels from the metadata of blocks that have them.
type RingConfig ¶
type RingConfig struct { KVStore kv.Config `` /* 206-byte string literal not displayed */ HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` ReplicationFactor int `yaml:"replication_factor"` TokensFilePath string `yaml:"tokens_file_path"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` InstanceZone string `yaml:"instance_availability_zone"` // Injected internally ListenPort int `yaml:"-"` RingCheckPeriod time.Duration `yaml:"-"` }
RingConfig masks the ring lifecycler config, which contains many options not really required by the store gateways ring. This config is used to strip down the config to the minimum and avoid confusing the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to configure this to the given FlagSet.
func (*RingConfig) ToLifecyclerConfig ¶
func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error)
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardingLimits ¶
ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.
type ShardingStrategy ¶
type ShardingStrategy interface { // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs // that should be synced by the store-gateway. FilterUsers(ctx context.Context, userIDs []string) []string // FilterBlocks that should be loaded by the store-gateway. FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error }
type ShuffleShardingStrategy ¶
type ShuffleShardingStrategy struct {
// contains filtered or unexported fields
}
ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.
func NewShuffleShardingStrategy ¶
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy
NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
func (*ShuffleShardingStrategy) FilterBlocks ¶
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*ShuffleShardingStrategy) FilterUsers ¶
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string
FilterUsers implements ShardingStrategy.
type StoreGateway ¶
StoreGateway is the Cortex service responsible for exposing an API over the bucket where blocks are stored, supporting blocks sharding and replication across a pool of store gateway instances (optional).
func NewStoreGateway ¶
func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)
func (*StoreGateway) OnRingInstanceHeartbeat ¶
func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.IngesterDesc)
func (*StoreGateway) OnRingInstanceRegister ¶
func (g *StoreGateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.IngesterDesc) (ring.IngesterState, ring.Tokens)
func (*StoreGateway) OnRingInstanceStopping ¶
func (g *StoreGateway) OnRingInstanceStopping(_ *ring.BasicLifecycler)
func (*StoreGateway) OnRingInstanceTokens ¶
func (g *StoreGateway) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens)
func (*StoreGateway) RingHandler ¶
func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) Series ¶
func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error