distributor

package
v3.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: AGPL-3.0 Imports: 60 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRateStore

func NewRateStore(cfg RateStoreConfig, r ring.ReadRing, cf poolClientFactory, l Limits, registerer prometheus.Registerer) *rateStore

Types

type Config

type Config struct {
	// Distributors ring
	DistributorRing RingConfig `yaml:"ring,omitempty"`
	PushWorkerCount int        `yaml:"push_worker_count"`

	// RateStore customizes the rate storing used by stream sharding.
	RateStore RateStoreConfig `yaml:"rate_store"`

	// WriteFailuresLogging customizes write failures logging behavior.
	WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Customize the logging of write failures."`

	OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`

	KafkaEnabled    bool         `yaml:"kafka_writes_enabled"`
	IngesterEnabled bool         `yaml:"ingester_writes_enabled"`
	KafkaConfig     kafka.Config `yaml:"-"`
	// contains filtered or unexported fields
}

Config for a Distributor.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(fs *flag.FlagSet)

RegisterFlags registers distributor-related flags.

func (*Config) Validate added in v3.3.0

func (cfg *Config) Validate() error

type Distributor

type Distributor struct {
	services.Service

	RequestParserWrapper push.RequestParserWrapper
	// contains filtered or unexported fields
}

Distributor coordinates replication and distribution of log streams.

func New

func New(
	cfg Config,
	clientCfg client.Config,
	configs *runtime.TenantConfigs,
	ingestersRing ring.ReadRing,
	partitionRing ring.PartitionRingReader,
	overrides Limits,
	registerer prometheus.Registerer,
	metricsNamespace string,
	tee Tee,
	usageTracker push.UsageTracker,
	logger log.Logger,
) (*Distributor, error)

New creates a distributor.

func (*Distributor) HealthyInstancesCount

func (d *Distributor) HealthyInstancesCount() int

HealthyInstancesCount implements the ReadLifecycler interface.

We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES.

func (*Distributor) OTLPPushHandler

func (d *Distributor) OTLPPushHandler(w http.ResponseWriter, r *http.Request)

func (*Distributor) Push

Push a set of streams. The returned error is the last one seen.

func (*Distributor) PushHandler

func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request)

PushHandler reads a snappy-compressed proto from the HTTP body.

func (*Distributor) ServeHTTP

func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the distributor ring status page.

If the rate limiting strategy is local instead of global, no ring is used by the distributor and as such, no ring status is returned from this function.

type KafkaProducer added in v3.3.0

type KafkaProducer interface {
	ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults
	Close()
}

type KeyedStream

type KeyedStream struct {
	HashKey uint32
	Stream  logproto.Stream
}

type Limits

type Limits interface {
	retention.Limits
	MaxLineSize(userID string) int
	MaxLineSizeTruncate(userID string) bool
	MaxLabelNamesPerSeries(userID string) int
	MaxLabelNameLength(userID string) int
	MaxLabelValueLength(userID string) int

	CreationGracePeriod(userID string) time.Duration
	RejectOldSamples(userID string) bool
	RejectOldSamplesMaxAge(userID string) time.Duration

	IncrementDuplicateTimestamps(userID string) bool
	DiscoverServiceName(userID string) []string
	DiscoverLogLevels(userID string) bool

	ShardStreams(userID string) shardstreams.Config
	IngestionRateStrategy() string
	IngestionRateBytes(userID string) float64
	IngestionBurstSizeBytes(userID string) int
	AllowStructuredMetadata(userID string) bool
	MaxStructuredMetadataSize(userID string) int
	MaxStructuredMetadataCount(userID string) int
	OTLPConfig(userID string) push.OTLPConfig

	BlockIngestionUntil(userID string) time.Time
	BlockIngestionStatusCode(userID string) int

	IngestionPartitionsTenantShardSize(userID string) int
}

Limits is an interface for distributor limits and related configs.

type RateStore

type RateStore interface {
	RateFor(tenantID string, streamHash uint64) (int64, float64)
}

RateStore manages the ingestion rate of streams, populated by data fetched from ingesters.

type RateStoreConfig

type RateStoreConfig struct {
	MaxParallelism           int           `yaml:"max_request_parallelism"`
	StreamRateUpdateInterval time.Duration `yaml:"stream_rate_update_interval"`
	IngesterReqTimeout       time.Duration `yaml:"ingester_request_timeout"`
	Debug                    bool          `yaml:"debug"`
}

func (*RateStoreConfig) RegisterFlagsWithPrefix

func (cfg *RateStoreConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet)

type ReadLifecycler

type ReadLifecycler interface {
	HealthyInstancesCount() int
}

ReadLifecycler represents the read interface to the lifecycler.

type RingConfig

type RingConfig struct {
	KVStore          kv.Config     `yaml:"kvstore"`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"hidden"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" doc:"hidden"`
	InstanceAddr           string   `yaml:"instance_addr" doc:"hidden"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" doc:"hidden"`

	// Injected internally
	ListenPort int `yaml:"-"`
}

RingConfig masks the ring lifecycler config, which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum and avoid confusing the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*RingConfig) ToBasicLifecyclerConfig

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

ToBasicLifecyclerConfig returns a BasicLifecyclerConfig based on the distributor ring config.

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

type ShardTracker

type ShardTracker struct {
	// contains filtered or unexported fields
}

ShardTracker is a data structure to keep track of the last pushed shard number for a given stream hash. This allows the distributor to evenly shard streams across pushes even when any given push has fewer entries than the calculated number of shards.

func NewShardTracker

func NewShardTracker() *ShardTracker

func (*ShardTracker) LastShardNum

func (t *ShardTracker) LastShardNum(tenant string, streamHash uint64) int

func (*ShardTracker) SetLastShardNum

func (t *ShardTracker) SetLastShardNum(tenant string, streamHash uint64, shardNum int)

type Tee

type Tee interface {
	Duplicate(tenant string, streams []KeyedStream)
}

Tee implementations can duplicate the log streams to another endpoint.

func WrapTee

func WrapTee(existing, new Tee) Tee

WrapTee wraps a new Tee around an existing Tee.

type Validator

type Validator struct {
	Limits
	// contains filtered or unexported fields
}

func NewValidator

func NewValidator(l Limits, t push.UsageTracker) (*Validator, error)

func (Validator) ShouldBlockIngestion added in v3.2.0

func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (bool, time.Time, int)

ShouldBlockIngestion returns whether ingestion should be blocked, until when and the status code.

func (Validator) ValidateEntry

func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry) error

ValidateEntry returns an error if the entry is invalid and reports metrics for invalid entries accordingly.

func (Validator) ValidateLabels

func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error

ValidateLabels returns an error if the labels are invalid.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL