distributor

package
v0.0.0-...-f8e07d0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2024 License: AGPL-3.0 Imports: 83 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SkipLabelNameValidationHeader  = "X-Mimir-SkipLabelNameValidation"
	SkipLabelCountValidationHeader = "X-Mimir-SkipLabelCountValidation"
)
View Source
const (
	// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters
	// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
	ExemplarMaxLabelSetLength = 128
)
View Source
const (
	// 529 is non-standard status code used by some services to signal that "The service is overloaded".
	StatusServiceOverloaded = 529
)

Variables

View Source
var (
	ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowHaTracker   = fmt.Errorf("proto: integer overflow")
)
View Source
var ErrResponseTooLarge = errors.New("response too large")

Functions

func GetReplicaDescCodec

func GetReplicaDescCodec() codec.Proto

func Handler

func Handler(
	maxRecvMsgSize int,
	requestBufferPool util.Pool,
	sourceIPs *middleware.SourceIPExtractor,
	allowSkipLabelNameValidation bool,
	allowSkipLabelCountValidation bool,
	limits *validation.Overrides,
	retryCfg RetryConfig,
	push PushFunc,
	pushMetrics *PushMetrics,
	logger log.Logger,
) http.Handler

Handler is an http.Handler which accepts WriteRequests.

func NewPool

func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger) *ring_client.Pool

func OTLPHandler

func OTLPHandler(
	maxRecvMsgSize int,
	requestBufferPool util.Pool,
	sourceIPs *middleware.SourceIPExtractor,
	limits OTLPHandlerLimits,
	resourceAttributePromotionConfig OTelResourceAttributePromotionConfig,
	retryCfg RetryConfig,
	push PushFunc,
	pushMetrics *PushMetrics,
	reg prometheus.Registerer,
	logger log.Logger,
) http.Handler

OTLPHandler is an http.Handler accepting OTLP write requests.

func ProtoReplicaDescFactory

func ProtoReplicaDescFactory() proto.Message

ProtoReplicaDescFactory makes new ReplicaDescs.

func SetDefaultInstanceLimitsForYAMLUnmarshalling

func SetDefaultInstanceLimitsForYAMLUnmarshalling(l InstanceLimits)

func TimeseriesToOTLPRequest

func TimeseriesToOTLPRequest(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) pmetricotlp.ExportRequest

TimeseriesToOTLPRequest is used in tests.

Types

type Config

type Config struct {
	PoolConfig PoolConfig `yaml:"pool"`

	RetryConfig     RetryConfig     `yaml:"retry_after_header"`
	HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

	MaxRecvMsgSize           int           `yaml:"max_recv_msg_size" category:"advanced"`
	MaxOTLPRequestSize       int           `yaml:"max_otlp_request_size" category:"experimental"`
	MaxRequestPoolBufferSize int           `yaml:"max_request_pool_buffer_size" category:"experimental"`
	RemoteTimeout            time.Duration `yaml:"remote_timeout" category:"advanced"`

	// Distributors ring
	DistributorRing RingConfig `yaml:"ring"`

	// for testing and for extending the ingester by adding calls to the client
	IngesterClientFactory ring_client.PoolFactory `yaml:"-"`

	// When SkipLabelValidation is true the distributor does not validate the label name and value, Mimir doesn't directly use
	// this (and should never use it) but this feature is used by other projects built on top of it.
	SkipLabelValidation bool `yaml:"-"`

	// When SkipLabelCountValidation is true the distributor does not validate the number of labels, Mimir doesn't directly use
	// this (and should never use it) but this feature is used by other projects built on top of it.
	SkipLabelCountValidation bool `yaml:"-"`

	// This config is dynamically injected because it is defined in the querier config.
	ShuffleShardingLookbackPeriod              time.Duration `yaml:"-"`
	StreamingChunksPerIngesterSeriesBufferSize uint64        `yaml:"-"`
	MinimizeIngesterRequests                   bool          `yaml:"-"`
	MinimiseIngesterRequestsHedgingDelay       time.Duration `yaml:"-"`
	PreferAvailabilityZone                     string        `yaml:"-"`

	// IngestStorageConfig is dynamically injected because defined outside of distributor config.
	IngestStorageConfig ingest.Config `yaml:"-"`

	// Limits for distributor
	DefaultLimits    InstanceLimits         `yaml:"instance_limits"`
	InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

	// This allows downstream projects to wrap the distributor push function
	// and access the deserialized write requests before/after they are pushed.
	// These functions will only receive samples that don't get dropped by HA deduplication.
	PushWrappers []PushWrapper `yaml:"-"`

	WriteRequestsBufferPoolingEnabled bool `yaml:"write_requests_buffer_pooling_enabled" category:"experimental"`
	ReusableIngesterPushWorkers       int  `yaml:"reusable_ingester_push_workers" category:"advanced"`

	// OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion.
	OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"`
}

Config contains the configuration required to create a Distributor

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*Config) Validate

func (cfg *Config) Validate(limits validation.Limits) error

Validate validates the config and returns an error on failure.

type Distributor

type Distributor struct {
	services.Service

	// For handling HA replicas.
	HATracker haTracker

	// Metrics to be passed to distributor push handlers
	PushMetrics *PushMetrics

	PushWithMiddlewares PushFunc

	RequestBufferPool util.Pool
	// contains filtered or unexported fields
}

Distributor forwards appends and queries to individual ingesters.

func New

func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error)

New constructs a new Distributor

func (*Distributor) ActiveNativeHistogramMetrics

func (d *Distributor) ActiveNativeHistogramMetrics(ctx context.Context, matchers []*labels.Matcher) (*cardinality.ActiveNativeHistogramMetricsResponse, error)

func (*Distributor) ActiveSeries

func (d *Distributor) ActiveSeries(ctx context.Context, matchers []*labels.Matcher) ([]labels.Labels, error)

ActiveSeries queries the ingester replication set for active series matching the given selector. It combines and deduplicates the results.

func (*Distributor) AllUserStats

func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)

AllUserStats returns statistics about all users. Note that, unlike UserStats(), it does not divide by the ReplicationFactor.

func (*Distributor) AllUserStatsHandler

func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)

AllUserStatsHandler shows stats for all users.

func (*Distributor) FinishPushRequest

func (d *Distributor) FinishPushRequest(ctx context.Context)

FinishPushRequest is the counterpart to StartPushRequest, and must be called exactly once while handling the push request, on the same goroutine as the push method itself.

func (*Distributor) HealthyInstancesCount

func (d *Distributor) HealthyInstancesCount() int

HealthyInstancesCount implements the ReadLifecycler interface

We use a ring lifecycler delegate to count the number of members of the ring. The count is then used to enforce rate limiting correctly for each distributor. $EFFECTIVE_RATE_LIMIT = $GLOBAL_RATE_LIMIT / $NUM_INSTANCES

func (*Distributor) LabelNames

func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]string, error)

LabelNames returns the names of all labels from series with sample timestamps between from and to, and matching the optional input series label matchers. The returned label names are sorted.

func (*Distributor) LabelNamesAndValues

func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (*ingester_client.LabelNamesAndValuesResponse, error)

LabelNamesAndValues returns the label name and value pairs for series matching the input label matchers.

The actual series considered eligible depend on countMethod:

  • inmemory: in-memory series in ingesters.
  • active: in-memory series in ingesters which are also tracked as active ones.

func (*Distributor) LabelValuesCardinality

func (d *Distributor) LabelValuesCardinality(ctx context.Context, labelNames []model.LabelName, matchers []*labels.Matcher, countMethod cardinality.CountMethod) (uint64, *ingester_client.LabelValuesCardinalityResponse, error)

LabelValuesCardinality performs the following two operations in parallel:

  • queries ingesters for label values cardinality of a set of labelNames
  • queries ingesters for user stats to get the ingester's series head count

func (*Distributor) LabelValuesForLabelName

func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error)

LabelValuesForLabelName returns the label values associated with the given labelName, among all series with sample timestamps between from and to, and series labels matching the optional matchers.

func (*Distributor) MetricsForLabelMatchers

func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error)

MetricsForLabelMatchers returns a list of series with sample timestamps between from and through, and series labels matching the optional label matchers. The returned series are not sorted.

func (*Distributor) MetricsMetadata

MetricsMetadata returns the metrics metadata based on the provided req.

func (*Distributor) Push

Push is a gRPC method registered as client.IngesterServer and distributor.DistributorServer.

func (*Distributor) QueryExemplars

func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error)

QueryExemplars returns exemplars with timestamp between from and to, for the series matching the input series label matchers. The exemplars in the response are sorted by series labels.

func (*Distributor) QueryStream

func (d *Distributor) QueryStream(ctx context.Context, queryMetrics *stats.QueryMetrics, from, to model.Time, matchers ...*labels.Matcher) (ingester_client.CombinedQueryStreamResponse, error)

QueryStream queries multiple ingesters via the streaming interface and returns a big ol' set of chunks.

func (*Distributor) RemoveGroupMetricsForUser

func (d *Distributor) RemoveGroupMetricsForUser(userID, group string)

func (*Distributor) ServeHTTP

func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Distributor) StartPushRequest

func (d *Distributor) StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error)

func (*Distributor) UserStats

func (d *Distributor) UserStats(ctx context.Context, countMethod cardinality.CountMethod) (*UserStats, error)

UserStats returns statistics about the current user.

func (*Distributor) UserStatsHandler

func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)

UserStatsHandler handles user stats requests to the Distributor.

type Error

type Error interface {
	// Cause returns the cause of the error.
	Cause() mimirpb.ErrorCause
}

Error is a marker interface for the errors returned by distributor.

type HATrackerConfig

type HATrackerConfig struct {
	EnableHATracker bool `yaml:"enable_ha_tracker"`
	// We should only update the timestamp if the difference
	// between the stored timestamp and the time we received a sample at
	// is more than this duration.
	UpdateTimeout          time.Duration `yaml:"ha_tracker_update_timeout" category:"advanced"`
	UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max" category:"advanced"`
	// We should only failover to accepting samples from a replica
	// other than the replica written in the KVStore if the difference
	// between the stored timestamp and the time we received a sample is
	// more than this duration
	FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout" category:"advanced"`

	KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Note that memberlist support is experimental."`
}

HATrackerConfig contains the configuration required to create an HA Tracker.

func (*HATrackerConfig) RegisterFlags

func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*HATrackerConfig) Validate

func (cfg *HATrackerConfig) Validate() error

Validate validates the config and returns an error on failure.

type InstanceLimits

type InstanceLimits struct {
	MaxIngestionRate             float64 `yaml:"max_ingestion_rate" category:"advanced"`
	MaxInflightPushRequests      int     `yaml:"max_inflight_push_requests" category:"advanced"`
	MaxInflightPushRequestsBytes int     `yaml:"max_inflight_push_requests_bytes" category:"advanced"`
}

func (*InstanceLimits) RegisterFlags

func (l *InstanceLimits) RegisterFlags(f *flag.FlagSet)

func (*InstanceLimits) UnmarshalYAML

func (l *InstanceLimits) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML implements the yaml.Unmarshaler interface.

type OTLPHandlerLimits

type OTLPHandlerLimits interface {
	OTelMetricSuffixesEnabled(id string) bool
	OTelCreatedTimestampZeroIngestionEnabled(id string) bool
	PromoteOTelResourceAttributes(id string) []string
	OTelKeepIdentifyingResourceAttributes(id string) bool
}

type OTelResourceAttributePromotionConfig

type OTelResourceAttributePromotionConfig interface {
	// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
	PromoteOTelResourceAttributes(id string) []string
}

OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.

type PoolConfig

type PoolConfig struct {
	ClientCleanupPeriod  time.Duration `yaml:"client_cleanup_period" category:"advanced"`
	HealthCheckIngesters bool          `yaml:"health_check_ingesters" category:"advanced"`
	RemoteTimeout        time.Duration `yaml:"-"`
}

PoolConfig is config for creating a Pool.

func (*PoolConfig) RegisterFlags

func (cfg *PoolConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

type PushFunc

type PushFunc func(ctx context.Context, req *Request) error

PushFunc defines the type of the push. It is similar to http.HandlerFunc.

func NextOrCleanup

func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup func())

NextOrCleanup returns a new PushFunc and a cleanup function that should be deferred by the caller. The cleanup function will only call Request.CleanUp() if next() wasn't called previously.

This function is used outside of this codebase.

type PushMetrics

type PushMetrics struct {
	// contains filtered or unexported fields
}

func (*PushMetrics) IncOTLPRequest

func (m *PushMetrics) IncOTLPRequest(user string)

func (*PushMetrics) ObserveUncompressedBodySize

func (m *PushMetrics) ObserveUncompressedBodySize(user string, size float64)

type PushWrapper

type PushWrapper func(next PushFunc) PushFunc

PushWrapper wraps around a push. It is similar to middleware.Interface.

type ReadLifecycler

type ReadLifecycler interface {
	HealthyInstancesCount() int
}

ReadLifecycler represents the read interface to the lifecycler.

type ReplicaDesc

type ReplicaDesc struct {
	Replica string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"`
	// Unix timestamp in milliseconds when we have last received request from this replica.
	ReceivedAt int64 `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"`
	// Unix timestamp in milliseconds when this entry was marked for deletion.
	// Reason for doing marking first, and delete later, is to make sure that distributors
	// watching the prefix will receive notification on "marking" -- at which point they can
	// already remove entry from memory. Actual deletion from KV store does *not* trigger
	// "watch" notification with a key for all KV stores.
	DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"`
	// This is the most recent timestamp when this replica was elected as the leader.
	ElectedAt int64 `protobuf:"varint,4,opt,name=elected_at,json=electedAt,proto3" json:"elected_at,omitempty"`
	// This is incremented every time a new replica is elected as the leader.
	ElectedChanges int64 `protobuf:"varint,5,opt,name=elected_changes,json=electedChanges,proto3" json:"elected_changes,omitempty"`
}

func NewReplicaDesc

func NewReplicaDesc() *ReplicaDesc

NewReplicaDesc returns an empty *distributor.ReplicaDesc.

func (*ReplicaDesc) Clone

func (r *ReplicaDesc) Clone() memberlist.Mergeable

Clone returns a deep copy of the ReplicaDesc.

func (*ReplicaDesc) Descriptor

func (*ReplicaDesc) Descriptor() ([]byte, []int)

func (*ReplicaDesc) Equal

func (this *ReplicaDesc) Equal(that interface{}) bool

func (*ReplicaDesc) GetDeletedAt

func (m *ReplicaDesc) GetDeletedAt() int64

func (*ReplicaDesc) GetElectedAt

func (m *ReplicaDesc) GetElectedAt() int64

func (*ReplicaDesc) GetElectedChanges

func (m *ReplicaDesc) GetElectedChanges() int64

func (*ReplicaDesc) GetReceivedAt

func (m *ReplicaDesc) GetReceivedAt() int64

func (*ReplicaDesc) GetReplica

func (m *ReplicaDesc) GetReplica() string

func (*ReplicaDesc) GoString

func (this *ReplicaDesc) GoString() string

func (*ReplicaDesc) Marshal

func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)

func (*ReplicaDesc) MarshalTo

func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error)

func (*ReplicaDesc) MarshalToSizedBuffer

func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ReplicaDesc) Merge

func (r *ReplicaDesc) Merge(other memberlist.Mergeable, _ bool) (change memberlist.Mergeable, error error)

Merge merges the other ReplicaDesc into this one. The decision is based on the ReceivedAt timestamp if the Replica name is the same, and on the ElectedAt timestamp if the Replica name is different.

func (*ReplicaDesc) MergeContent

func (r *ReplicaDesc) MergeContent() []string

MergeContent describes content of this Mergeable. Given that ReplicaDesc can have only one instance at a time, it returns the ReplicaDesc it contains. By doing this we choose to not make use of the subset invalidation feature of memberlist

func (*ReplicaDesc) ProtoMessage

func (*ReplicaDesc) ProtoMessage()

func (*ReplicaDesc) RemoveTombstones

func (r *ReplicaDesc) RemoveTombstones(_ time.Time) (total, removed int)

RemoveTombstones is a no-op because we will handle replica deletions outside the context of memberlist.

func (*ReplicaDesc) Reset

func (m *ReplicaDesc) Reset()

func (*ReplicaDesc) Size

func (m *ReplicaDesc) Size() (n int)

func (*ReplicaDesc) String

func (this *ReplicaDesc) String() string

func (*ReplicaDesc) Unmarshal

func (m *ReplicaDesc) Unmarshal(dAtA []byte) error

func (*ReplicaDesc) XXX_DiscardUnknown

func (m *ReplicaDesc) XXX_DiscardUnknown()

func (*ReplicaDesc) XXX_Marshal

func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ReplicaDesc) XXX_Merge

func (m *ReplicaDesc) XXX_Merge(src proto.Message)

func (*ReplicaDesc) XXX_Size

func (m *ReplicaDesc) XXX_Size() int

func (*ReplicaDesc) XXX_Unmarshal

func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error

type Request

type Request struct {
	// contains filtered or unexported fields
}

Request represents a push request. It allows lazy body reading from the underlying http request and adding cleanup functions that should be called after the request has been handled.

func NewParsedRequest

func NewParsedRequest(r *mimirpb.WriteRequest) *Request

func (*Request) AddCleanup

func (r *Request) AddCleanup(f func())

AddCleanup adds a function that will be called once CleanUp is called. If f is nil, it will not be invoked.

func (*Request) CleanUp

func (r *Request) CleanUp()

CleanUp calls all added cleanups in reverse order - the last added is the first invoked. CleanUp removes each called cleanup function from the list of cleanups. So subsequent calls to CleanUp will not invoke the same cleanup functions.

func (*Request) WriteRequest

func (r *Request) WriteRequest() (*mimirpb.WriteRequest, error)

WriteRequest returns the request from the supplier function. The supplier function is called only once, and subsequent calls to WriteRequest return the same value.

type RetryConfig

type RetryConfig struct {
	Enabled    bool          `yaml:"enabled" category:"advanced"`
	MinBackoff time.Duration `yaml:"min_backoff" category:"advanced"`
	MaxBackoff time.Duration `yaml:"max_backoff" category:"advanced"`
}

func (*RetryConfig) RegisterFlags

func (cfg *RetryConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*RetryConfig) Validate

func (cfg *RetryConfig) Validate() error

type RingConfig

type RingConfig struct {
	Common util.CommonRingConfig `yaml:",inline"`
}

RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusing the user.

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

func (*RingConfig) ToBasicLifecyclerConfig

func (cfg *RingConfig) ToBasicLifecyclerConfig(logger log.Logger) (ring.BasicLifecyclerConfig, error)

type UserIDStats

type UserIDStats struct {
	UserID string `json:"userID"`
	UserStats
}

UserIDStats models ingestion statistics for one user, including the user ID

type UserStats

type UserStats struct {
	IngestionRate     float64 `json:"ingestionRate"`
	NumSeries         uint64  `json:"numSeries"`
	APIIngestionRate  float64 `json:"APIIngestionRate"`
	RuleIngestionRate float64 `json:"RuleIngestionRate"`
}

UserStats models ingestion statistics for one user.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL