Documentation ¶
Index ¶
- Constants
- Variables
- func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing
- func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
- type Block
- type BlockCloser
- type BucketIndexConfig
- type BucketIndexMetadataFetcher
- type BucketStore
- func (s *BucketStore) BlockMetadata(ctx context.Context, req *ingestv1.BlockMetadataRequest) (*ingestv1.BlockMetadataResponse, error)
- func (b *BucketStore) InitialSync(ctx context.Context) error
- func (store *BucketStore) MergeProfilesLabels(ctx context.Context, ...) error
- func (store *BucketStore) MergeProfilesPprof(ctx context.Context, ...) error
- func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, ...) error
- func (store *BucketStore) MergeSpanProfile(ctx context.Context, ...) error
- func (s *BucketStore) RemoveBlocksAndClose() error
- func (b *BucketStore) Stats() BucketStoreStats
- func (s *BucketStore) SyncBlocks(ctx context.Context) error
- type BucketStoreConfig
- type BucketStoreStats
- type BucketStores
- type Config
- type IgnoreDeletionMarkFilter
- func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*block.DeletionMark
- func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error
- func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, ...) error
- type Limits
- type MetadataFilterWithBucketIndex
- type Metrics
- type RingConfig
- type ShardingLimits
- type ShardingStrategy
- type ShuffleShardingStrategy
- type StoreGateway
- func (s *StoreGateway) BlockMetadata(ctx context.Context, req *connect.Request[ingestv1.BlockMetadataRequest]) (*connect.Response[ingestv1.BlockMetadataResponse], error)
- func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
- func (s *StoreGateway) GetBlockStats(ctx context.Context, req *connect.Request[ingestv1.GetBlockStatsRequest]) (*connect.Response[ingestv1.GetBlockStatsResponse], error)
- func (s *StoreGateway) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)
- func (s *StoreGateway) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)
- func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, ...) error
- func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, ...) error
- func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, ...) error
- func (s *StoreGateway) MergeSpanProfile(ctx context.Context, ...) error
- func (s *StoreGateway) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)
- func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
- func (s *StoreGateway) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error)
- func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)
Constants ¶
const ( // RingKey is the key under which we store the store gateways ring in the KVStore. RingKey = "store-gateway" // RingNameForServer is the name of the ring used by the store gateway server. RingNameForServer = "store-gateway" // RingNameForClient is the name of the ring used by the store gateway client (we need // a different name to avoid clashing Prometheus metrics when running in single-binary). RingNameForClient = "store-gateway-client" // We use a safe default instead of exposing to config option to the user // in order to simplify the config. RingNumTokens = 512 )
Variables ¶
var ( // BlocksOwnerSync is the operation used to check the authoritative owners of a block // (replicas included). BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) // BlocksOwnerRead is the operation used to check the authoritative owners of a block // (replicas included) that are available for queries (a store-gateway is available for // queries only when ACTIVE). BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) // BlocksRead is the operation run by the querier to query blocks via the store-gateway. BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool { return s != ring.ACTIVE }) )
Functions ¶
func GetShuffleShardingSubring ¶
GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.
func NewShardingMetadataFilterAdapter ¶
func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter
Types ¶
type Block ¶
type Block struct { BlockCloser // contains filtered or unexported fields }
type BlockCloser ¶
type BucketIndexConfig ¶ added in v1.2.0
type BucketIndexConfig struct { UpdateOnErrorInterval time.Duration `yaml:"update_on_error_interval" category:"advanced"` IdleTimeout time.Duration `yaml:"idle_timeout" category:"advanced"` MaxStalePeriod time.Duration `yaml:"max_stale_period" category:"advanced"` }
func (*BucketIndexConfig) RegisterFlagsWithPrefix ¶ added in v1.2.0
func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)
type BucketIndexMetadataFetcher ¶ added in v1.2.0
type BucketIndexMetadataFetcher struct {
// contains filtered or unexported fields
}
BucketIndexMetadataFetcher is a Thanos MetadataFetcher implementation leveraging on the Mimir bucket index.
func NewBucketIndexMetadataFetcher ¶ added in v1.2.0
func NewBucketIndexMetadataFetcher( userID string, bkt objstore.Bucket, cfgProvider objstore.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer, filters []block.MetadataFilter, ) *BucketIndexMetadataFetcher
type BucketStore ¶
type BucketStore struct {
// contains filtered or unexported fields
}
func NewBucketStore ¶
func NewBucketStore(bucket phlareobj.Bucket, fetcher block.MetadataFetcher, tenantID string, syncDir string, logger log.Logger, reg prometheus.Registerer) (*BucketStore, error)
func (*BucketStore) BlockMetadata ¶ added in v1.2.0
func (s *BucketStore) BlockMetadata(ctx context.Context, req *ingestv1.BlockMetadataRequest) (*ingestv1.BlockMetadataResponse, error)
func (*BucketStore) InitialSync ¶
func (b *BucketStore) InitialSync(ctx context.Context) error
func (*BucketStore) MergeProfilesLabels ¶
func (store *BucketStore) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error
func (*BucketStore) MergeProfilesPprof ¶
func (store *BucketStore) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error
func (*BucketStore) MergeProfilesStacktraces ¶
func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error
func (*BucketStore) MergeSpanProfile ¶ added in v1.2.0
func (store *BucketStore) MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeSpanProfileRequest, ingestv1.MergeSpanProfileResponse]) error
func (*BucketStore) RemoveBlocksAndClose ¶
func (s *BucketStore) RemoveBlocksAndClose() error
RemoveBlocksAndClose remove all blocks from local disk and releases all resources associated with the BucketStore.
func (*BucketStore) Stats ¶
func (b *BucketStore) Stats() BucketStoreStats
func (*BucketStore) SyncBlocks ¶
func (s *BucketStore) SyncBlocks(ctx context.Context) error
type BucketStoreConfig ¶
type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"` IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency" category:"advanced"` IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay" category:"advanced"` }
func (*BucketStoreConfig) RegisterFlags ¶
func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)
RegisterFlags registers the BucketStore flags
type BucketStoreStats ¶
type BucketStoreStats struct { // BlocksLoaded is the number of blocks currently loaded in the bucket store. BlocksLoaded int }
type BucketStores ¶
type BucketStores struct {
// contains filtered or unexported fields
}
func NewBucketStores ¶
func NewBucketStores(cfg BucketStoreConfig, shardingStrategy ShardingStrategy, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)
func (*BucketStores) InitialSync ¶
func (bs *BucketStores) InitialSync(ctx context.Context) error
func (*BucketStores) SyncBlocks ¶
func (bs *BucketStores) SyncBlocks(ctx context.Context) error
SyncBlocks synchronizes the stores state with the Bucket store for every user.
type Config ¶
type Config struct { ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."` BucketStoreConfig BucketStoreConfig `yaml:"bucket_store,omitempty"` }
func (*Config) RegisterFlags ¶
RegisterFlags registers the Config flags.
type IgnoreDeletionMarkFilter ¶ added in v1.2.0
type IgnoreDeletionMarkFilter struct {
// contains filtered or unexported fields
}
IgnoreDeletionMarkFilter is like the Thanos IgnoreDeletionMarkFilter, but it also implements the MetadataFilterWithBucketIndex interface.
func NewIgnoreDeletionMarkFilter ¶ added in v1.2.0
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.BucketReader, delay time.Duration, concurrency int) *IgnoreDeletionMarkFilter
NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
func (*IgnoreDeletionMarkFilter) DeletionMarkBlocks ¶ added in v1.2.0
func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*block.DeletionMark
DeletionMarkBlocks returns blocks that were marked for deletion.
func (*IgnoreDeletionMarkFilter) Filter ¶ added in v1.2.0
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced block.GaugeVec) error
Filter implements block.MetadataFilter.
func (*IgnoreDeletionMarkFilter) FilterWithBucketIndex ¶ added in v1.2.0
func (f *IgnoreDeletionMarkFilter) FilterWithBucketIndex(_ context.Context, metas map[ulid.ULID]*block.Meta, idx *bucketindex.Index, synced block.GaugeVec) error
FilterWithBucketIndex implements MetadataFilterWithBucketIndex.
type Limits ¶
type Limits interface { ShardingLimits phlareobj.TenantConfigProvider }
type MetadataFilterWithBucketIndex ¶ added in v1.2.0
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewBucketStoreMetrics ¶ added in v1.3.0
func NewBucketStoreMetrics(reg prometheus.Registerer) *Metrics
func (*Metrics) Unregister ¶ added in v1.3.0
func (m *Metrics) Unregister()
type RingConfig ¶
type RingConfig struct { Ring util.CommonRingConfig `yaml:",inline"` ReplicationFactor int `yaml:"replication_factor" category:"advanced"` TokensFilePath string `yaml:"tokens_file_path"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` // Wait ring stability. WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"` WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"` // Instance details InstanceZone string `yaml:"instance_availability_zone"` UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` // Injected internally RingCheckPeriod time.Duration `yaml:"-"` }
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*RingConfig) ToLifecyclerConfig ¶
func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardingLimits ¶
ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.
type ShardingStrategy ¶
type ShardingStrategy interface { // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs // that should be synced by the store-gateway. FilterUsers(ctx context.Context, userIDs []string) ([]string, error) // FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway. // The provided loaded map contains blocks which have been previously returned by this function and // are now loaded or loading in the store-gateway. FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error }
type ShuffleShardingStrategy ¶
type ShuffleShardingStrategy struct {
// contains filtered or unexported fields
}
SardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant blocks are sharded across a subset of store-gateway instances.
func NewShuffleShardingStrategy ¶
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy
NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
func (*ShuffleShardingStrategy) FilterBlocks ¶
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*ShuffleShardingStrategy) FilterUsers ¶
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)
FilterUsers implements ShardingStrategy.
type StoreGateway ¶
func NewStoreGateway ¶
func NewStoreGateway(gatewayCfg Config, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)
func (*StoreGateway) BlockMetadata ¶ added in v1.2.0
func (s *StoreGateway) BlockMetadata(ctx context.Context, req *connect.Request[ingestv1.BlockMetadataRequest]) (*connect.Response[ingestv1.BlockMetadataResponse], error)
func (*StoreGateway) BlocksHandler ¶
func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) GetBlockStats ¶ added in v1.6.0
func (s *StoreGateway) GetBlockStats(ctx context.Context, req *connect.Request[ingestv1.GetBlockStatsRequest]) (*connect.Response[ingestv1.GetBlockStatsResponse], error)
func (*StoreGateway) LabelNames ¶ added in v1.2.0
func (s *StoreGateway) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error)
func (*StoreGateway) LabelValues ¶ added in v1.2.0
func (s *StoreGateway) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error)
func (*StoreGateway) MergeProfilesLabels ¶
func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error
func (*StoreGateway) MergeProfilesPprof ¶
func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error
func (*StoreGateway) MergeProfilesStacktraces ¶
func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error
func (*StoreGateway) MergeSpanProfile ¶ added in v1.2.0
func (s *StoreGateway) MergeSpanProfile(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeSpanProfileRequest, ingestv1.MergeSpanProfileResponse]) error
func (*StoreGateway) ProfileTypes ¶ added in v1.2.0
func (s *StoreGateway) ProfileTypes(ctx context.Context, req *connect.Request[ingestv1.ProfileTypesRequest]) (*connect.Response[ingestv1.ProfileTypesResponse], error)
func (*StoreGateway) RingHandler ¶
func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) Series ¶ added in v1.1.0
func (s *StoreGateway) Series(ctx context.Context, req *connect.Request[ingestv1.SeriesRequest]) (*connect.Response[ingestv1.SeriesResponse], error)
func (*StoreGateway) TenantsHandler ¶
func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)