Documentation ¶
Index ¶
- Constants
- Variables
- func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing
- type Block
- type BlockCloser
- type BlockMetaFilter
- type BucketStore
- func (b *BucketStore) InitialSync(ctx context.Context) error
- func (store *BucketStore) MergeProfilesLabels(ctx context.Context, ...) error
- func (store *BucketStore) MergeProfilesPprof(ctx context.Context, ...) error
- func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, ...) error
- func (s *BucketStore) RemoveBlocksAndClose() error
- func (b *BucketStore) Stats() BucketStoreStats
- func (s *BucketStore) SyncBlocks(ctx context.Context) error
- type BucketStoreConfig
- type BucketStoreStats
- type BucketStores
- type Config
- type GaugeVec
- type Limits
- type Metrics
- type RingConfig
- type ShardingLimits
- type ShardingStrategy
- type ShuffleShardingStrategy
- type StoreGateway
- func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
- func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, ...) error
- func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, ...) error
- func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, ...) error
- func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
- func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)
Constants ¶
const ( // RingKey is the key under which we store the store gateways ring in the KVStore. RingKey = "store-gateway" // RingNameForServer is the name of the ring used by the store gateway server. RingNameForServer = "store-gateway" // RingNameForClient is the name of the ring used by the store gateway client (we need // a different name to avoid clashing Prometheus metrics when running in single-binary). RingNameForClient = "store-gateway-client" // We use a safe default instead of exposing a config option to the user // in order to simplify the config. RingNumTokens = 512 )
Variables ¶
var ( // BlocksOwnerSync is the operation used to check the authoritative owners of a block // (replicas included). BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) // BlocksOwnerRead is the operation used to check the authoritative owners of a block // (replicas included) that are available for queries (a store-gateway is available for // queries only when ACTIVE). BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) // BlocksRead is the operation run by the querier to query blocks via the store-gateway. BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool { return s != ring.ACTIVE }) )
Functions ¶
func GetShuffleShardingSubring ¶
GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.
Types ¶
type Block ¶
type Block struct { BlockCloser // contains filtered or unexported fields }
type BlockCloser ¶
type BlockMetaFilter ¶
type BlockMetaFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced GaugeVec) error
}
func NewShardingMetadataFilterAdapter ¶
func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) BlockMetaFilter
type BucketStore ¶
type BucketStore struct {
// contains filtered or unexported fields
}
func NewBucketStore ¶
func NewBucketStore(bucket phlareobj.Bucket, tenantID string, syncDir string, filters []BlockMetaFilter, logger log.Logger, Metrics *Metrics) (*BucketStore, error)
func (*BucketStore) InitialSync ¶
func (b *BucketStore) InitialSync(ctx context.Context) error
func (*BucketStore) MergeProfilesLabels ¶
func (store *BucketStore) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error
func (*BucketStore) MergeProfilesPprof ¶
func (store *BucketStore) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error
func (*BucketStore) MergeProfilesStacktraces ¶
func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error
func (*BucketStore) RemoveBlocksAndClose ¶
func (s *BucketStore) RemoveBlocksAndClose() error
RemoveBlocksAndClose removes all blocks from local disk and releases all resources associated with the BucketStore.
func (*BucketStore) Stats ¶
func (b *BucketStore) Stats() BucketStoreStats
func (*BucketStore) SyncBlocks ¶
func (s *BucketStore) SyncBlocks(ctx context.Context) error
type BucketStoreConfig ¶
type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` SyncInterval time.Duration `yaml:"sync_interval" category:"advanced"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency" category:"advanced"` IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within" category:"advanced"` }
func (*BucketStoreConfig) RegisterFlags ¶
func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)
RegisterFlags registers the BucketStore flags.
type BucketStoreStats ¶
type BucketStoreStats struct { // BlocksLoaded is the number of blocks currently loaded in the bucket store. BlocksLoaded int }
type BucketStores ¶
type BucketStores struct {
// contains filtered or unexported fields
}
func NewBucketStores ¶
func NewBucketStores(cfg BucketStoreConfig, shardingStrategy ShardingStrategy, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)
func (*BucketStores) InitialSync ¶
func (bs *BucketStores) InitialSync(ctx context.Context) error
func (*BucketStores) SyncBlocks ¶
func (bs *BucketStores) SyncBlocks(ctx context.Context) error
SyncBlocks synchronizes the stores' state with the Bucket store for every user.
type Config ¶
type Config struct { ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration."` BucketStoreConfig BucketStoreConfig `yaml:"bucket_store,omitempty"` }
func (*Config) RegisterFlags ¶
RegisterFlags registers the Config flags.
type GaugeVec ¶
type GaugeVec interface {
WithLabelValues(lvs ...string) prometheus.Gauge
}
GaugeVec hides something like a Prometheus GaugeVec or an extprom.TxGaugeVec.
type Limits ¶
type Limits interface { ShardingLimits }
type Metrics ¶
type Metrics struct { Synced *prometheus.GaugeVec // contains filtered or unexported fields }
func NewMetrics ¶
func NewMetrics(reg prometheus.Registerer) *Metrics
type RingConfig ¶
type RingConfig struct { KVStore kv.Config `` /* 213-byte string literal not displayed */ HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"` ReplicationFactor int `yaml:"replication_factor" category:"advanced"` TokensFilePath string `yaml:"tokens_file_path"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` // Wait ring stability. WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"` WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"` // Instance details InstanceID string `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"` InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"` InstancePort int `yaml:"instance_port" category:"advanced"` InstanceAddr string `yaml:"instance_addr" category:"advanced"` EnableIPv6 bool `yaml:"instance_enable_ipv6" category:"advanced"` InstanceZone string `yaml:"instance_availability_zone"` UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` // Injected internally ListenPort int `yaml:"-"` RingCheckPeriod time.Duration `yaml:"-"` }
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)
RegisterFlags adds the flags required to configure this to the given FlagSet.
func (*RingConfig) ToLifecyclerConfig ¶
func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardingLimits ¶
ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.
type ShardingStrategy ¶
type ShardingStrategy interface { // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs // that should be synced by the store-gateway. FilterUsers(ctx context.Context, userIDs []string) ([]string, error) // FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway. // The provided loaded map contains blocks which have been previously returned by this function and // are now loaded or loading in the store-gateway. FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced GaugeVec) error }
type ShuffleShardingStrategy ¶
type ShuffleShardingStrategy struct {
// contains filtered or unexported fields
}
ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant's blocks are sharded across a subset of store-gateway instances.
func NewShuffleShardingStrategy ¶
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy
NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
func (*ShuffleShardingStrategy) FilterBlocks ¶
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced GaugeVec) error
FilterBlocks implements ShardingStrategy.
func (*ShuffleShardingStrategy) FilterUsers ¶
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)
FilterUsers implements ShardingStrategy.
type StoreGateway ¶
func NewStoreGateway ¶
func NewStoreGateway(gatewayCfg Config, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)
func (*StoreGateway) BlocksHandler ¶
func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) MergeProfilesLabels ¶
func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error
func (*StoreGateway) MergeProfilesPprof ¶
func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error
func (*StoreGateway) MergeProfilesStacktraces ¶
func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error
func (*StoreGateway) RingHandler ¶
func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)
func (*StoreGateway) TenantsHandler ¶
func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)