Documentation ¶
Index ¶
- Variables
- type AdaptivePlacement
- type Agent
- type BucketStore
- func (s *BucketStore) LoadRules(ctx context.Context) (*adaptive_placementpb.PlacementRules, error)
- func (s *BucketStore) LoadStats(ctx context.Context) (*adaptive_placementpb.DistributionStats, error)
- func (s *BucketStore) StoreRules(ctx context.Context, rules *adaptive_placementpb.PlacementRules) error
- func (s *BucketStore) StoreStats(ctx context.Context, stats *adaptive_placementpb.DistributionStats) error
- type Config
- type DistributionStats
- type EmptyStore
- func (e *EmptyStore) LoadRules(context.Context) (*adaptive_placementpb.PlacementRules, error)
- func (e *EmptyStore) LoadStats(context.Context) (*adaptive_placementpb.DistributionStats, error)
- func (e *EmptyStore) StoreRules(context.Context, *adaptive_placementpb.PlacementRules) error
- func (e *EmptyStore) StoreStats(context.Context, *adaptive_placementpb.DistributionStats) error
- type Limits
- type LoadBalancing
- type Manager
- type PlacementLimits
- type Ruler
- type Sample
- type Store
- type StoreReader
- type StoreWriter
Constants ¶
This section is empty.
Variables ¶
var ( ErrRulesNotFound = errors.New("placement rules not found") ErrStatsNotFound = errors.New("placement stats not found") )
var ErrLoadBalancing = errors.New("invalid load balancing option")
Functions ¶
This section is empty.
Types ¶
type AdaptivePlacement ¶
type AdaptivePlacement struct {
// contains filtered or unexported fields
}
AdaptivePlacement is a placement policy that adapts to the distribution of data.
It uses a set of rules to determine the number of shards to allocate to each tenant and dataset, and a load balancing function to distribute the dataset keys.
func NewAdaptivePlacement ¶
func NewAdaptivePlacement(limits Limits) *AdaptivePlacement
func (*AdaptivePlacement) Policy ¶
func (a *AdaptivePlacement) Policy(k placement.Key) placement.Policy
func (*AdaptivePlacement) Update ¶
func (a *AdaptivePlacement) Update(rules *adaptive_placementpb.PlacementRules)
type Agent ¶
type Agent struct {
// contains filtered or unexported fields
}
func NewAgent ¶
func NewAgent( logger log.Logger, reg prometheus.Registerer, config Config, limits Limits, store Store, ) *Agent
func (*Agent) Placement ¶
func (a *Agent) Placement() *AdaptivePlacement
type BucketStore ¶
type BucketStore struct {
// contains filtered or unexported fields
}
func NewStore ¶
func NewStore(bucket objstore.Bucket) *BucketStore
func (*BucketStore) LoadRules ¶
func (s *BucketStore) LoadRules(ctx context.Context) (*adaptive_placementpb.PlacementRules, error)
func (*BucketStore) LoadStats ¶
func (s *BucketStore) LoadStats(ctx context.Context) (*adaptive_placementpb.DistributionStats, error)
func (*BucketStore) StoreRules ¶
func (s *BucketStore) StoreRules(ctx context.Context, rules *adaptive_placementpb.PlacementRules) error
func (*BucketStore) StoreStats ¶
func (s *BucketStore) StoreStats(ctx context.Context, stats *adaptive_placementpb.DistributionStats) error
type Config ¶
type Config struct { PlacementUpdateInterval time.Duration `yaml:"placement_rules_update_interval" json:"placement_rules_update_interval"` PlacementRetentionPeriod time.Duration `yaml:"placement_rules_retention_period" json:"placement_rules_retention_period"` StatsConfidencePeriod time.Duration `yaml:"stats_confidence_period" json:"stats_confidence_period"` StatsAggregationWindow time.Duration `yaml:"stats_aggregation_window" json:"stats_aggregation_window"` StatsRetentionPeriod time.Duration `yaml:"stats_retention_period" json:"stats_retention_period"` ExportShardLimitMetrics bool `yaml:"export_shard_limit_metrics" json:"export_shard_limit_metrics"` ExportShardUsageMetrics bool `yaml:"export_shard_usage_metrics" json:"export_shard_usage_metrics"` ExportShardUsageBreakdownMetrics bool `yaml:"export_shard_usage_breakdown_metrics" json:"export_shard_usage_breakdown_metrics"` }
func DefaultConfig ¶
func DefaultConfig() Config
func (*Config) RegisterFlags ¶
type DistributionStats ¶
type DistributionStats struct {
// contains filtered or unexported fields
}
DistributionStats is a helper struct that tracks the data rate of each dataset within a certain time window. An EWMA aggregation function is used to calculate the instantaneous rate of the dataset; the time window is the half-life of the EWMA function.
DistributionStats is safe for concurrent use.
func NewDistributionStats ¶
func NewDistributionStats(window time.Duration) *DistributionStats
func (*DistributionStats) Build ¶
func (d *DistributionStats) Build() *adaptive_placementpb.DistributionStats
func (*DistributionStats) Expire ¶
func (d *DistributionStats) Expire(before time.Time)
func (*DistributionStats) RecordStats ¶
func (d *DistributionStats) RecordStats(samples iter.Iterator[Sample])
type EmptyStore ¶
type EmptyStore struct{}
EmptyStore is a Store implementation that always returns empty rules and stats, and doesn't store anything.
func NewEmptyStore ¶
func NewEmptyStore() *EmptyStore
func (*EmptyStore) LoadRules ¶
func (e *EmptyStore) LoadRules(context.Context) (*adaptive_placementpb.PlacementRules, error)
func (*EmptyStore) LoadStats ¶
func (e *EmptyStore) LoadStats(context.Context) (*adaptive_placementpb.DistributionStats, error)
func (*EmptyStore) StoreRules ¶
func (e *EmptyStore) StoreRules(context.Context, *adaptive_placementpb.PlacementRules) error
func (*EmptyStore) StoreStats ¶
func (e *EmptyStore) StoreStats(context.Context, *adaptive_placementpb.DistributionStats) error
type Limits ¶
type Limits interface {
PlacementLimits(tenant string) PlacementLimits
}
type LoadBalancing ¶
type LoadBalancing string
const ( FingerprintLoadBalancing LoadBalancing = "fingerprint" RoundRobinLoadBalancing LoadBalancing = "round-robin" DynamicLoadBalancing LoadBalancing = "dynamic" )
func (*LoadBalancing) Set ¶
func (lb *LoadBalancing) Set(text string) error
func (*LoadBalancing) String ¶
func (lb *LoadBalancing) String() string
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager maintains placement rules and distribution stats in the store.
Manager implements the services.Service interface for convenience, but it's meant to be started and stopped explicitly via Start and Stop calls.
If the manager is stopped while updating rules, the ongoing attempt is not aborted: we're interested in finishing the operation so that the rules reflect the most recent statistics. Another reason is that a different instance might already be running at the time Stop is called.
When just started, the manager may not have enough statistics to build the rules: StatsConfidencePeriod should expire before the first update. Note that the ruler won't downscale datasets for a certain period of time after it is created, regardless of the confidence period. Therefore, it's generally safe to publish rules even with incomplete statistics; however, this allows for delays in response to changes in the data flow.
func NewManager ¶
func NewManager( logger log.Logger, reg prometheus.Registerer, config Config, limits Limits, store Store, ) *Manager
type PlacementLimits ¶
type PlacementLimits struct { TenantShards uint64 `yaml:"adaptive_placement_tenant_shards" json:"adaptive_placement_tenant_shards" doc:"hidden"` DefaultDatasetShards uint64 `yaml:"adaptive_placement_default_dataset_shards" json:"adaptive_placement_default_dataset_shards" doc:"hidden"` LoadBalancing LoadBalancing `yaml:"adaptive_placement_load_balancing" json:"adaptive_placement_load_balancing" doc:"hidden"` MinDatasetShards uint64 `yaml:"adaptive_placement_min_dataset_shards" json:"adaptive_placement_min_dataset_shards" doc:"hidden"` MaxDatasetShards uint64 `yaml:"adaptive_placement_max_dataset_shards" json:"adaptive_placement_max_dataset_shards" doc:"hidden"` UnitSizeBytes uint64 `yaml:"adaptive_placement_unit_size_bytes" json:"adaptive_placement_unit_size_bytes" doc:"hidden"` BurstWindow time.Duration `yaml:"adaptive_placement_burst_window" json:"adaptive_placement_burst_window" doc:"hidden"` DecayWindow time.Duration `yaml:"adaptive_placement_decay_window" json:"adaptive_placement_decay_window" doc:"hidden"` }
PlacementLimits defines the limits for adaptive sharding. These parameters are tenant-specific.
func (*PlacementLimits) RegisterFlags ¶
func (o *PlacementLimits) RegisterFlags(f *flag.FlagSet)
func (*PlacementLimits) RegisterFlagsWithPrefix ¶
func (o *PlacementLimits) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
type Ruler ¶
type Ruler struct {
// contains filtered or unexported fields
}
Ruler builds placement rules based on distribution stats.
Ruler is not safe for concurrent use: the caller should ensure synchronization.
func (*Ruler) BuildRules ¶
func (r *Ruler) BuildRules(stats *adaptive_placementpb.DistributionStats) *adaptive_placementpb.PlacementRules
func (*Ruler) Load ¶
func (r *Ruler) Load(rules *adaptive_placementpb.PlacementRules)
type Store ¶
type Store interface { StoreReader StoreWriter }
type StoreReader ¶
type StoreReader interface { LoadRules(context.Context) (*adaptive_placementpb.PlacementRules, error) LoadStats(context.Context) (*adaptive_placementpb.DistributionStats, error) }
type StoreWriter ¶
type StoreWriter interface { StoreRules(context.Context, *adaptive_placementpb.PlacementRules) error StoreStats(context.Context, *adaptive_placementpb.DistributionStats) error }