Documentation ¶
Index ¶
- func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits storegateway.ShardingLimits) ring.ReadRing
- type Block
- type BlockCloser
- type BlockMetaFilter
- type BucketStore
- func (b *BucketStore) InitialSync(ctx context.Context) error
- func (store *BucketStore) MergeProfilesLabels(ctx context.Context, ...) error
- func (store *BucketStore) MergeProfilesPprof(ctx context.Context, ...) error
- func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, ...) error
- func (s *BucketStore) RemoveBlocksAndClose() error
- func (b *BucketStore) Stats() storegateway.BucketStoreStats
- func (s *BucketStore) SyncBlocks(ctx context.Context) error
- type BucketStoreConfig
- type BucketStores
- type Config
- type Limits
- type Metrics
- type ShardingStrategy
- type ShuffleShardingStrategy
- type StoreGateway
- func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
- func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, ...) error
- func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, ...) error
- func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, ...) error
- func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
- func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetShuffleShardingSubring ¶
func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits storegateway.ShardingLimits) ring.ReadRing
GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.
Types ¶
type Block ¶
type Block struct { BlockCloser // contains filtered or unexported fields }
type BlockCloser ¶
type BlockMetaFilter ¶
type BlockMetaFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced tsdb_block.GaugeVec) error
}
func NewShardingMetadataFilterAdapter ¶
func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) BlockMetaFilter
type BucketStore ¶
type BucketStore struct {
// contains filtered or unexported fields
}
func NewBucketStore ¶
func NewBucketStore(bucket phlareobj.Bucket, tenantID string, syncDir string, filters []BlockMetaFilter, logger log.Logger, Metrics *Metrics) (*BucketStore, error)
func (*BucketStore) InitialSync ¶
func (b *BucketStore) InitialSync(ctx context.Context) error
func (*BucketStore) MergeProfilesLabels ¶
func (store *BucketStore) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error
func (*BucketStore) MergeProfilesPprof ¶
func (store *BucketStore) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error
func (*BucketStore) MergeProfilesStacktraces ¶
func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error
func (*BucketStore) RemoveBlocksAndClose ¶
func (s *BucketStore) RemoveBlocksAndClose() error
RemoveBlocksAndClose removes all blocks from local disk and releases all resources associated with the BucketStore.
func (*BucketStore) Stats ¶
func (b *BucketStore) Stats() storegateway.BucketStoreStats
func (*BucketStore) SyncBlocks ¶
func (s *BucketStore) SyncBlocks(ctx context.Context) error
type BucketStoreConfig ¶
type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"` IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"` }
func (*BucketStoreConfig) RegisterFlags ¶
func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)
RegisterFlags registers the BucketStore flags.
type BucketStores ¶
type BucketStores struct {
// contains filtered or unexported fields
}
func NewBucketStores ¶
func NewBucketStores(cfg BucketStoreConfig, shardingStrategy ShardingStrategy, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)
func (*BucketStores) InitialSync ¶
func (bs *BucketStores) InitialSync(ctx context.Context) error
func (*BucketStores) SyncBlocks ¶
func (bs *BucketStores) SyncBlocks(ctx context.Context) error
SyncBlocks synchronizes the stores' state with the Bucket store for every user.
type Config ¶
type Config struct { storegateway.Config `yaml:",inline"` BucketStoreConfig BucketStoreConfig `yaml:"bucket_store,omitempty"` }
func (*Config) RegisterFlags ¶
RegisterFlags registers the Config flags.
type Limits ¶
type Limits interface { storegateway.ShardingLimits }
type Metrics ¶
type Metrics struct { Synced *prometheus.GaugeVec // contains filtered or unexported fields }
func NewMetrics ¶
func NewMetrics(reg prometheus.Registerer) *Metrics
type ShardingStrategy ¶
type ShardingStrategy interface { // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs // that should be synced by the store-gateway. FilterUsers(ctx context.Context, userIDs []string) ([]string, error) // FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway. // The provided loaded map contains blocks which have been previously returned by this function and // are now loaded or loading in the store-gateway. FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced tsdb_block.GaugeVec) error }
type ShuffleShardingStrategy ¶
type ShuffleShardingStrategy struct {
// contains filtered or unexported fields
}
ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.
func NewShuffleShardingStrategy ¶
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits storegateway.ShardingLimits, logger log.Logger) *ShuffleShardingStrategy
NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
func (*ShuffleShardingStrategy) FilterBlocks ¶
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced tsdb_block.GaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*ShuffleShardingStrategy) FilterUsers ¶
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)
FilterUsers implements ShardingStrategy.
type StoreGateway ¶
func NewStoreGateway ¶
func NewStoreGateway(gatewayCfg Config, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)
func (*StoreGateway) BlocksHandler ¶
func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) MergeProfilesLabels ¶
func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error
func (*StoreGateway) MergeProfilesPprof ¶
func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error
func (*StoreGateway) MergeProfilesStacktraces ¶
func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error
func (*StoreGateway) RingHandler ¶
func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) TenantsHandler ¶
func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)