Documentation ¶
Index ¶
- func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, ...) *rateStore
- type Config
- type Distributor
- func (d *Distributor) HealthyInstancesCount() int
- func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
- func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
- type Limits
- type RateStore
- type RateStoreConfig
- type ReadLifecycler
- type RingConfig
- type ShardTracker
- type Validator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRateStore ¶
func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore
Types ¶
type Config ¶
type Config struct { // Distributors ring DistributorRing RingConfig `yaml:"ring,omitempty"` // RateStore customizes the rate storing used by stream sharding. RateStore RateStoreConfig `yaml:"rate_store"` // WriteFailuresLoggingCfg customizes write failures logging behavior. WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Experimental. Customize the logging of write failures."` // contains filtered or unexported fields }
Config for a Distributor.
func (*Config) RegisterFlags ¶
RegisterFlags registers distributor-related flags.
type Distributor ¶
Distributor coordinates replicates and distribution of log streams.
func New ¶
func New( cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, overrides Limits, registerer prometheus.Registerer, ) (*Distributor, error)
New a distributor creates.
func (*Distributor) HealthyInstancesCount ¶
func (d *Distributor) HealthyInstancesCount() int
HealthyInstancesCount implements the ReadLifecycler interface.
We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES.
func (*Distributor) Push ¶
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
Push a set of streams. The returned error is the last one seen.
func (*Distributor) PushHandler ¶
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)
PushHandler reads a snappy-compressed proto from the HTTP body.
func (*Distributor) ServeHTTP ¶
func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements the distributor ring status page.
If the rate limiting strategy is local instead of global, no ring is used by the distributor and as such, no ring status is returned from this function.
type Limits ¶
type Limits interface { retention.Limits MaxLineSize(userID string) int MaxLineSizeTruncate(userID string) bool EnforceMetricName(userID string) bool MaxLabelNamesPerSeries(userID string) int MaxLabelNameLength(userID string) int MaxLabelValueLength(userID string) int CreationGracePeriod(userID string) time.Duration RejectOldSamples(userID string) bool RejectOldSamplesMaxAge(userID string) time.Duration IncrementDuplicateTimestamps(userID string) bool ShardStreams(userID string) *shardstreams.Config IngestionRateStrategy() string IngestionRateBytes(userID string) float64 IngestionBurstSizeBytes(userID string) int }
Limits is an interface for distributor limits/related configs
type RateStore ¶
RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.
type RateStoreConfig ¶
type RateStoreConfig struct { MaxParallelism int `yaml:"max_request_parallelism"` StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"` IngesterReqTimeout time.Duration `yaml:"ingester_request_timeout"` Debug bool `yaml:"debug"` }
func (*RateStoreConfig) RegisterFlagsWithPrefix ¶
func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet)
type ReadLifecycler ¶
type ReadLifecycler interface {
HealthyInstancesCount() int
}
ReadLifecycler represents the read interface to the lifecycler.
type RingConfig ¶
type RingConfig struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` EnableIPv6 bool `yaml:"instance_enable_ipv6" doc:"hidden"` // Injected internally ListenPort int `yaml:"-"` }
RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*RingConfig) ToBasicLifecyclerConfig ¶
func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)
ToBasicLifecyclerConfig returns a BasicLifecyclerConfig based on the distributor ring config.
func (*RingConfig) ToRingConfig ¶
func (cfg *RingConfig) ToRingConfig() ring.Config
type ShardTracker ¶
type ShardTracker struct {
// contains filtered or unexported fields
}
ShardTracker is a data structure to keep track of the last pushed shard number for a given stream hash. This allows the distributor to evenly shard streams across pushes even when any given push has fewer entries than the calculated number of shards
func NewShardTracker ¶
func NewShardTracker() *ShardTracker
func (*ShardTracker) LastShardNum ¶
func (t *ShardTracker) LastShardNum(tenant string, streamHash uint64) int
func (*ShardTracker) SetLastShardNum ¶
func (t *ShardTracker) SetLastShardNum(tenant string, streamHash uint64, shardNum int)
type Validator ¶
type Validator struct {
Limits
}
func NewValidator ¶
func (Validator) ValidateEntry ¶
ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly.