storegateway

package
v1.0.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2023 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RingKey is the key under which we store the store gateways ring in the KVStore.
	RingKey = "store-gateway"

	// RingNameForServer is the name of the ring used by the store gateway server.
	RingNameForServer = "store-gateway"

	// RingNameForClient is the name of the ring used by the store gateway client (we need
	// a different name to avoid clashing Prometheus metrics when running in single-binary).
	RingNameForClient = "store-gateway-client"

	// We use a safe default instead of exposing to config option to the user
	// in order to simplify the config.
	RingNumTokens = 512
)

Variables

View Source
var (
	// BlocksOwnerSync is the operation used to check the authoritative owners of a block
	// (replicas included).
	BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)

	// BlocksOwnerRead is the operation used to check the authoritative owners of a block
	// (replicas included) that are available for queries (a store-gateway is available for
	// queries only when ACTIVE).
	BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

	// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
	BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {

		return s != ring.ACTIVE
	})
)

Functions

func GetShuffleShardingSubring

func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing

GetShuffleShardingSubring returns the subring to be used for a given user. This function should be used both by store-gateway and querier in order to guarantee the same logic is used.

Types

type Block

type Block struct {
	BlockCloser
	// contains filtered or unexported fields
}

type BlockCloser

type BlockCloser interface {
	phlaredb.Querier
	Close() error
}

type BlockMetaFilter

type BlockMetaFilter interface {
	Filter(ctx context.Context, metas map[ulid.ULID]*block.Meta, synced GaugeVec) error
}

func NewShardingMetadataFilterAdapter

func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) BlockMetaFilter

type BucketStore

type BucketStore struct {
	// contains filtered or unexported fields
}

func NewBucketStore

func NewBucketStore(bucket phlareobj.Bucket, tenantID string, syncDir string, filters []BlockMetaFilter, logger log.Logger, Metrics *Metrics) (*BucketStore, error)

func (*BucketStore) InitialSync

func (b *BucketStore) InitialSync(ctx context.Context) error

func (*BucketStore) MergeProfilesLabels

func (store *BucketStore) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error

func (*BucketStore) MergeProfilesPprof

func (store *BucketStore) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error

func (*BucketStore) MergeProfilesStacktraces

func (store *BucketStore) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error

func (*BucketStore) RemoveBlocksAndClose

func (s *BucketStore) RemoveBlocksAndClose() error

RemoveBlocksAndClose remove all blocks from local disk and releases all resources associated with the BucketStore.

func (*BucketStore) Stats

func (b *BucketStore) Stats() BucketStoreStats

func (*BucketStore) SyncBlocks

func (s *BucketStore) SyncBlocks(ctx context.Context) error

type BucketStoreConfig

type BucketStoreConfig struct {
	SyncDir               string        `yaml:"sync_dir"`
	SyncInterval          time.Duration `yaml:"sync_interval" category:"advanced"`
	TenantSyncConcurrency int           `yaml:"tenant_sync_concurrency" category:"advanced"`
	IgnoreBlocksWithin    time.Duration `yaml:"ignore_blocks_within" category:"advanced"`
}

func (*BucketStoreConfig) RegisterFlags

func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the BucketStore flags

func (*BucketStoreConfig) Validate

func (cfg *BucketStoreConfig) Validate(logger log.Logger) error

Validate the config.

type BucketStoreStats

type BucketStoreStats struct {
	// BlocksLoaded is the number of blocks currently loaded in the bucket store.
	BlocksLoaded int
}

type BucketStores

type BucketStores struct {
	// contains filtered or unexported fields
}

func NewBucketStores

func NewBucketStores(cfg BucketStoreConfig, shardingStrategy ShardingStrategy, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error)

func (*BucketStores) InitialSync

func (bs *BucketStores) InitialSync(ctx context.Context) error

func (*BucketStores) SyncBlocks

func (bs *BucketStores) SyncBlocks(ctx context.Context) error

SyncBlocks synchronizes the stores state with the Bucket store for every user.

type Config

type Config struct {
	ShardingRing      RingConfig        `yaml:"sharding_ring" doc:"description=The hash ring configuration."`
	BucketStoreConfig BucketStoreConfig `yaml:"bucket_store,omitempty"`
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags registers the Config flags.

func (*Config) Validate

func (c *Config) Validate(limits validation.Limits) error

type GaugeVec

type GaugeVec interface {
	WithLabelValues(lvs ...string) prometheus.Gauge
}

GaugeVec hides something like a Prometheus GaugeVec or an extprom.TxGaugeVec.

type Limits

type Limits interface {
	ShardingLimits
}

type Metrics

type Metrics struct {
	Synced *prometheus.GaugeVec
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(reg prometheus.Registerer) *Metrics

type RingConfig

type RingConfig struct {
	KVStore              kv.Config     `` /* 213-byte string literal not displayed */
	HeartbeatPeriod      time.Duration `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout     time.Duration `yaml:"heartbeat_timeout" category:"advanced"`
	ReplicationFactor    int           `yaml:"replication_factor" category:"advanced"`
	TokensFilePath       string        `yaml:"tokens_file_path"`
	ZoneAwarenessEnabled bool          `yaml:"zone_awareness_enabled"`

	// Wait ring stability.
	WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration" category:"advanced"`
	WaitStabilityMaxDuration time.Duration `yaml:"wait_stability_max_duration" category:"advanced"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" category:"advanced"`
	InstanceAddr           string   `yaml:"instance_addr" category:"advanced"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" category:"advanced"`
	InstanceZone           string   `yaml:"instance_availability_zone"`

	UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"`

	// Injected internally
	ListenPort      int           `yaml:"-"`
	RingCheckPeriod time.Duration `yaml:"-"`
}

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*RingConfig) ToLifecyclerConfig

func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

type ShardingLimits

type ShardingLimits interface {
	StoreGatewayTenantShardSize(tenantID string) int
}

ShardingLimits is the interface that should be implemented by the limits provider, limiting the scope of the limits to the ones required by sharding strategies.

type ShardingStrategy

type ShardingStrategy interface {
	// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
	// that should be synced by the store-gateway.
	FilterUsers(ctx context.Context, userIDs []string) ([]string, error)

	// FilterBlocks filters metas in-place keeping only blocks that should be loaded by the store-gateway.
	// The provided loaded map contains blocks which have been previously returned by this function and
	// are now loaded or loading in the store-gateway.
	FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced GaugeVec) error
}

type ShuffleShardingStrategy

type ShuffleShardingStrategy struct {
	// contains filtered or unexported fields
}

SardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, where each tenant blocks are sharded across a subset of store-gateway instances.

func NewShuffleShardingStrategy

func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy

NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.

func (*ShuffleShardingStrategy) FilterBlocks

func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*block.Meta, loaded map[ulid.ULID]struct{}, synced GaugeVec) error

FilterBlocks implements ShardingStrategy.

func (*ShuffleShardingStrategy) FilterUsers

func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) ([]string, error)

FilterUsers implements ShardingStrategy.

type StoreGateway

type StoreGateway struct {
	services.Service
	// contains filtered or unexported fields
}

func NewStoreGateway

func NewStoreGateway(gatewayCfg Config, storageBucket phlareobj.Bucket, limits Limits, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error)

func (*StoreGateway) BlocksHandler

func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) MergeProfilesLabels

func (s *StoreGateway) MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse]) error

func (*StoreGateway) MergeProfilesPprof

func (s *StoreGateway) MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse]) error

func (*StoreGateway) MergeProfilesStacktraces

func (s *StoreGateway) MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse]) error

func (*StoreGateway) RingHandler

func (c *StoreGateway) RingHandler(w http.ResponseWriter, req *http.Request)

func (*StoreGateway) TenantsHandler

func (s *StoreGateway) TenantsHandler(w http.ResponseWriter, req *http.Request)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL