ingester

package
v0.0.0-...-fbfa8e9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: AGPL-3.0 Imports: 83 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// IngesterRingKey is the key under which we store the ingesters ring in the KVStore.
	IngesterRingKey = "ring"

	// PartitionRingKey is the key under which we store the partitions ring used by the "ingest storage".
	PartitionRingKey  = "ingester-partitions"
	PartitionRingName = "ingester-partitions"
)

Variables

View Source
var ErrSpreadMinimizingValidation = fmt.Errorf("%q token generation strategy is misconfigured", tokenGenerationSpreadMinimizing)

ErrSpreadMinimizingValidation is a sentinel error that indicates a failure in the validation of spread minimizing token generation config.

Functions

func SetDefaultInstanceLimitsForYAMLUnmarshalling

func SetDefaultInstanceLimitsForYAMLUnmarshalling(l InstanceLimits)

Types

type ActivityTrackerWrapper

type ActivityTrackerWrapper struct {
	// contains filtered or unexported fields
}

ActivityTrackerWrapper is a wrapper around Ingester that adds queries to activity tracker.

func NewIngesterActivityTracker

func NewIngesterActivityTracker(ing *Ingester, tracker *activitytracker.ActivityTracker) *ActivityTrackerWrapper

func (*ActivityTrackerWrapper) ActiveSeries

func (*ActivityTrackerWrapper) AllUserStats

func (*ActivityTrackerWrapper) FlushHandler

func (i *ActivityTrackerWrapper) FlushHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) LabelNames

func (*ActivityTrackerWrapper) LabelNamesAndValues

func (*ActivityTrackerWrapper) LabelValues

func (*ActivityTrackerWrapper) LabelValuesCardinality

func (*ActivityTrackerWrapper) MetricsForLabelMatchers

func (*ActivityTrackerWrapper) MetricsMetadata

func (*ActivityTrackerWrapper) PrepareInstanceRingDownscaleHandler

func (i *ActivityTrackerWrapper) PrepareInstanceRingDownscaleHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) PreparePartitionDownscaleHandler

func (i *ActivityTrackerWrapper) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) PrepareShutdownHandler

func (i *ActivityTrackerWrapper) PrepareShutdownHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) PrepareUnregisterHandler

func (i *ActivityTrackerWrapper) PrepareUnregisterHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) Push

func (*ActivityTrackerWrapper) QueryExemplars

func (*ActivityTrackerWrapper) QueryStream

func (*ActivityTrackerWrapper) ShutdownHandler

func (i *ActivityTrackerWrapper) ShutdownHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) TenantTSDBHandler

func (i *ActivityTrackerWrapper) TenantTSDBHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) TenantsHandler

func (i *ActivityTrackerWrapper) TenantsHandler(w http.ResponseWriter, r *http.Request)

func (*ActivityTrackerWrapper) UserRegistryHandler

func (i *ActivityTrackerWrapper) UserRegistryHandler(writer http.ResponseWriter, request *http.Request)

func (*ActivityTrackerWrapper) UserStats

type BlocksUploader

type BlocksUploader interface {
	Sync(ctx context.Context) (uploaded int, err error)
}

BlocksUploader interface is used to have an easy way to mock it in tests.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	Enabled                    bool          `yaml:"enabled" category:"experimental"`
	FailureThresholdPercentage uint          `yaml:"failure_threshold_percentage" category:"experimental"`
	FailureExecutionThreshold  uint          `yaml:"failure_execution_threshold" category:"experimental"`
	ThresholdingPeriod         time.Duration `yaml:"thresholding_period" category:"experimental"`
	CooldownPeriod             time.Duration `yaml:"cooldown_period" category:"experimental"`
	InitialDelay               time.Duration `yaml:"initial_delay" category:"experimental"`
	RequestTimeout             time.Duration `yaml:"request_timeout" category:"experimental"`
	// contains filtered or unexported fields
}

func (*CircuitBreakerConfig) RegisterFlagsWithPrefix

func (cfg *CircuitBreakerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet, defaultRequestDuration time.Duration)

type Config

type Config struct {
	IngesterRing          RingConfig          `yaml:"ring"`
	IngesterPartitionRing PartitionRingConfig `yaml:"partition_ring" category:"experimental"`

	// Config for metadata purging.
	MetadataRetainPeriod time.Duration `yaml:"metadata_retain_period" category:"advanced"`

	RateUpdatePeriod time.Duration `yaml:"rate_update_period" category:"advanced"`

	ActiveSeriesMetrics activeseries.Config `yaml:",inline"`

	TSDBConfigUpdatePeriod time.Duration `yaml:"tsdb_config_update_period" category:"experimental"`

	BlocksStorageConfig         mimir_tsdb.BlocksStorageConfig `yaml:"-"`
	StreamChunksWhenUsingBlocks bool                           `yaml:"-" category:"advanced"`
	// Runtime-override for type of streaming query to use (chunks or samples).
	StreamTypeFn func() QueryStreamType `yaml:"-"`

	DefaultLimits    InstanceLimits         `yaml:"instance_limits"`
	InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

	IgnoreSeriesLimitForMetricNames string `yaml:"ignore_series_limit_for_metric_names" category:"advanced"`

	ReadPathCPUUtilizationLimit          float64 `yaml:"read_path_cpu_utilization_limit" category:"experimental"`
	ReadPathMemoryUtilizationLimit       uint64  `yaml:"read_path_memory_utilization_limit" category:"experimental"`
	LogUtilizationBasedLimiterCPUSamples bool    `yaml:"log_utilization_based_limiter_cpu_samples" category:"experimental"`

	ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"advanced"`

	// UseIngesterOwnedSeriesForLimits was added in 2.12, but we keep it experimental until we decide, what is the correct behaviour
	// when the replication factor and the number of zones don't match. Refer to notes in https://github.com/grafana/mimir/pull/8695 and https://github.com/grafana/mimir/pull/9496
	UseIngesterOwnedSeriesForLimits bool          `yaml:"use_ingester_owned_series_for_limits" category:"experimental"`
	UpdateIngesterOwnedSeries       bool          `yaml:"track_ingester_owned_series" category:"experimental"`
	OwnedSeriesUpdateInterval       time.Duration `yaml:"owned_series_update_interval" category:"experimental"`

	PushCircuitBreaker CircuitBreakerConfig `yaml:"push_circuit_breaker"`
	ReadCircuitBreaker CircuitBreakerConfig `yaml:"read_circuit_breaker"`

	PushGrpcMethodEnabled bool `yaml:"push_grpc_method_enabled" category:"experimental" doc:"hidden"`

	// This config is dynamically injected because defined outside the ingester config.
	IngestStorageConfig ingest.Config `yaml:"-"`
	// contains filtered or unexported fields
}

Config for an Ingester.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*Config) Validate

func (cfg *Config) Validate(log.Logger) error

type Ingester

type Ingester struct {
	*services.BasicService
	// contains filtered or unexported fields
}

Ingester deals with "in flight" chunks. Based on Prometheus 1.x MemorySeriesStorage.

func New

func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error)

New returns an Ingester that uses Mimir block storage.

func NewForFlusher

func NewForFlusher(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error)

NewForFlusher is a special version of ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react on Flush method and flush all openened TSDBs when called.

func (*Ingester) ActiveSeries

func (i *Ingester) ActiveSeries(request *client.ActiveSeriesRequest, stream client.Ingester_ActiveSeriesServer) (err error)

ActiveSeries implements the ActiveSeries RPC. It returns a stream of active series that match the given matchers.

func (*Ingester) AllUserStats

func (i *Ingester) AllUserStats(_ context.Context, req *client.UserStatsRequest) (resp *client.UsersStatsResponse, err error)

AllUserStats returns some per-tenant statistics about the data ingested in this ingester.

When using the experimental ingest storage, this function doesn't support the read consistency setting because the purpose of this function is to show a snapshot of the live ingester's state.

func (*Ingester) CheckReady

func (i *Ingester) CheckReady(ctx context.Context) error

CheckReady is the readiness handler used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester.

func (*Ingester) FinishPushRequest

func (i *Ingester) FinishPushRequest(ctx context.Context)

func (*Ingester) Flush

func (i *Ingester) Flush()

Flush will flush all data. It is called as part of Lifecycler's shutdown (if flush on shutdown is configured), or from the flusher.

When called as during Lifecycler shutdown, this happens as part of normal Ingester shutdown (see stopping method). Samples are not received at this stage. Compaction and Shipping loops have already been stopped as well.

When used from flusher, ingester is constructed in a way that compaction, shipping and receiving of samples is never started.

func (*Ingester) FlushHandler

func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request)

Blocks version of Flush handler. It force-compacts blocks, and triggers shipping.

func (*Ingester) LabelNames

func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (resp *client.LabelNamesResponse, err error)

func (*Ingester) LabelNamesAndValues

func (i *Ingester) LabelNamesAndValues(request *client.LabelNamesAndValuesRequest, stream client.Ingester_LabelNamesAndValuesServer) (err error)

func (*Ingester) LabelValues

func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (resp *client.LabelValuesResponse, err error)

func (*Ingester) MetricsForLabelMatchers

func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (resp *client.MetricsForLabelMatchersResponse, err error)

MetricsForLabelMatchers implements IngesterServer.

func (*Ingester) MetricsMetadata

func (i *Ingester) MetricsMetadata(ctx context.Context, req *client.MetricsMetadataRequest) (resp *client.MetricsMetadataResponse, err error)

MetricsMetadata returns all the metrics metadata of a user.

func (*Ingester) PrepareInstanceRingDownscaleHandler

func (i *Ingester) PrepareInstanceRingDownscaleHandler(w http.ResponseWriter, r *http.Request)

PrepareInstanceRingDownscaleHandler prepares the ingester ring entry for downscaling. It can mark ingester as read-only or set it back to read-write mode.

Following methods are supported:

  • GET Returns timestamp when ingester ring entry was switched to read-only mode, or 0, if ring entry is not in read-only mode.

  • POST Switches the ingester ring entry to read-only mode (if it isn't yet), and returns the timestamp when the switch to read-only mode happened.

  • DELETE Sets ingester ring entry back to read-write mode.

func (*Ingester) PreparePartitionDownscaleHandler

func (i *Ingester) PreparePartitionDownscaleHandler(w http.ResponseWriter, r *http.Request)

PreparePartitionDownscaleHandler prepares the ingester's partition downscaling. The partition owned by the ingester will switch to INACTIVE state (read-only).

Following methods are supported:

  • GET Returns timestamp when partition was switched to INACTIVE state, or 0, if partition is not in INACTIVE state.

  • POST Switches the partition to INACTIVE state (if not yet), and returns the timestamp when the switch to INACTIVE state happened.

  • DELETE Sets partition back from INACTIVE to ACTIVE state.

func (*Ingester) PrepareShutdownHandler

func (i *Ingester) PrepareShutdownHandler(w http.ResponseWriter, r *http.Request)

PrepareShutdownHandler inspects or changes the configuration of the ingester such that when it is stopped, it will:

  • Change the state of ring to stop accepting writes.
  • Flush all the chunks to long-term storage.

It also creates a file on disk which is used to re-apply the configuration if the ingester crashes and restarts before being permanently shutdown.

* `GET` shows the status of this configuration * `POST` enables this configuration * `DELETE` disables this configuration

func (*Ingester) PrepareUnregisterHandler

func (i *Ingester) PrepareUnregisterHandler(w http.ResponseWriter, r *http.Request)

PrepareUnregisterHandler manipulates whether an ingester will unregister from the ring on its next termination.

The following methods are supported:

  • GET Returns the ingester's current unregister state.
  • PUT Sets the ingester's unregister state.
  • DELETE Resets the ingester's unregister state to the value passed via the RingConfig.UnregisterOnShutdown ring configuration option.

All methods are idempotent.

func (*Ingester) Push

Push implements client.IngesterServer, which is registered into gRPC server.

func (*Ingester) PushToStorage

func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error

PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.

func (*Ingester) PushWithCleanup

func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteRequest, cleanUp func()) (returnErr error)

PushWithCleanup is the Push() implementation for blocks storage and takes a WriteRequest and adds it to the TSDB head.

func (*Ingester) QueryExemplars

func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQueryRequest) (resp *client.ExemplarQueryResponse, err error)

func (*Ingester) QueryStream

func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) (err error)

QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface

func (*Ingester) RemoveGroupMetricsForUser

func (i *Ingester) RemoveGroupMetricsForUser(userID, group string)

func (*Ingester) RingHandler

func (i *Ingester) RingHandler() http.Handler

func (*Ingester) ShutdownHandler

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request)

ShutdownHandler triggers the following set of operations in order:

  • Change the state of ring to stop accepting writes.
  • Flush all the chunks.

func (*Ingester) StartPushRequest

func (i *Ingester) StartPushRequest(ctx context.Context, reqSize int64) (context.Context, error)

StartPushRequest checks if ingester can start push request, and increments relevant counters. If new push request cannot be started, errors convertible to gRPC status code are returned, and metrics are updated.

func (*Ingester) TenantTSDBHandler

func (i *Ingester) TenantTSDBHandler(w http.ResponseWriter, req *http.Request)

func (*Ingester) TenantsHandler

func (i *Ingester) TenantsHandler(w http.ResponseWriter, req *http.Request)

func (*Ingester) TransferOut

func (i *Ingester) TransferOut(_ context.Context) error

TransferOut implements ring.FlushTransferer.

func (*Ingester) UserRegistryHandler

func (i *Ingester) UserRegistryHandler(w http.ResponseWriter, r *http.Request)

func (*Ingester) UserStats

func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (resp *client.UserStatsResponse, err error)

type InstanceLimits

type InstanceLimits struct {
	MaxIngestionRate             float64 `yaml:"max_ingestion_rate" category:"advanced"`
	MaxInMemoryTenants           int64   `yaml:"max_tenants" category:"advanced"`
	MaxInMemorySeries            int64   `yaml:"max_series" category:"advanced"`
	MaxInflightPushRequests      int64   `yaml:"max_inflight_push_requests" category:"advanced"`
	MaxInflightPushRequestsBytes int64   `yaml:"max_inflight_push_requests_bytes" category:"advanced"`
}

InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return (internal) error.

func (*InstanceLimits) RegisterFlags

func (l *InstanceLimits) RegisterFlags(f *flag.FlagSet)

func (*InstanceLimits) UnmarshalYAML

func (l *InstanceLimits) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML implements the yaml.Unmarshaler interface.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of series, exemplars, metadata, etc. that an ingester can handle for a specific tenant

func NewLimiter

func NewLimiter(limits limiterTenantLimits, limiterRingSupport limiterRingStrategy) *Limiter

NewLimiter makes a new in-memory series limiter

func (*Limiter) IsWithinMaxMetadataPerMetric

func (l *Limiter) IsWithinMaxMetadataPerMetric(userID string, metadata int) bool

IsWithinMaxMetadataPerMetric returns true if limit has not been reached compared to the current number of metadata per metric in input; otherwise returns false.

func (*Limiter) IsWithinMaxMetricsWithMetadataPerUser

func (l *Limiter) IsWithinMaxMetricsWithMetadataPerUser(userID string, metrics int) bool

IsWithinMaxMetricsWithMetadataPerUser returns true if limit has not been reached compared to the current number of metrics with metadata in input; otherwise returns false.

func (*Limiter) IsWithinMaxSeriesPerMetric

func (l *Limiter) IsWithinMaxSeriesPerMetric(userID string, series int) bool

IsWithinMaxSeriesPerMetric returns true if limit has not been reached compared to the current number of series in input; otherwise returns false.

func (*Limiter) IsWithinMaxSeriesPerUser

func (l *Limiter) IsWithinMaxSeriesPerUser(userID string, series int, minLocalLimit int) bool

IsWithinMaxSeriesPerUser returns true if limit has not been reached compared to the current number of series in input; otherwise returns false.

type PartitionRingConfig

type PartitionRingConfig struct {
	KVStore kv.Config `` /* 217-byte string literal not displayed */

	// MinOwnersCount maps to ring.PartitionInstanceLifecyclerConfig's WaitOwnersCountOnPending.
	MinOwnersCount int `yaml:"min_partition_owners_count"`

	// MinOwnersDuration maps to ring.PartitionInstanceLifecyclerConfig's WaitOwnersDurationOnPending.
	MinOwnersDuration time.Duration `yaml:"min_partition_owners_duration"`

	// DeleteInactivePartitionAfter maps to ring.PartitionInstanceLifecyclerConfig's DeleteInactivePartitionAfterDuration.
	DeleteInactivePartitionAfter time.Duration `yaml:"delete_inactive_partition_after"`
	// contains filtered or unexported fields
}

func (*PartitionRingConfig) RegisterFlags

func (cfg *PartitionRingConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*PartitionRingConfig) ToLifecyclerConfig

func (cfg *PartitionRingConfig) ToLifecyclerConfig(partitionID int32, instanceID string) ring.PartitionInstanceLifecyclerConfig

type QueryStreamType

type QueryStreamType int

QueryStreamType defines type of function to use when doing query-stream operation.

const (
	QueryStreamDefault QueryStreamType = iota // Use default configured value.
	QueryStreamSamples                        // Stream individual samples.
	QueryStreamChunks                         // Stream entire chunks.
)

type RingConfig

type RingConfig struct {
	KVStore              kv.Config              `` /* 217-byte string literal not displayed */
	HeartbeatPeriod      time.Duration          `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout     time.Duration          `yaml:"heartbeat_timeout" category:"advanced"`
	ReplicationFactor    int                    `yaml:"replication_factor"`
	ZoneAwarenessEnabled bool                   `yaml:"zone_awareness_enabled"`
	ExcludedZones        flagext.StringSliceCSV `yaml:"excluded_zones" category:"advanced"`

	// Tokens
	TokensFilePath string `yaml:"tokens_file_path"`
	NumTokens      int    `yaml:"num_tokens" category:"advanced"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" category:"advanced" doc:"default=<hostname>"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" category:"advanced" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" category:"advanced"`
	InstanceAddr           string   `yaml:"instance_addr" category:"advanced"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" category:"advanced"`
	InstanceZone           string   `yaml:"instance_availability_zone" category:"advanced"`

	UnregisterOnShutdown bool `yaml:"unregister_on_shutdown" category:"advanced"`

	// Config for the ingester lifecycle control
	ObservePeriod    time.Duration `yaml:"observe_period" category:"advanced"`
	MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"`
	FinalSleep       time.Duration `yaml:"final_sleep" category:"advanced"`

	TokenGenerationStrategy         string                 `yaml:"token_generation_strategy" category:"advanced"`
	SpreadMinimizingJoinRingInOrder bool                   `yaml:"spread_minimizing_join_ring_in_order" category:"advanced"`
	SpreadMinimizingZones           flagext.StringSliceCSV `yaml:"spread_minimizing_zones" category:"advanced"`

	// Injected internally
	ListenPort int `yaml:"-"`

	// Used only for testing.
	JoinAfter time.Duration `yaml:"-"`
}

func (*RingConfig) RegisterFlags

func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*RingConfig) ToLifecyclerConfig

func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig

ToLifecyclerConfig returns a ring.LifecyclerConfig based on the ingester ring config.

func (*RingConfig) ToRingConfig

func (cfg *RingConfig) ToRingConfig() ring.Config

ToRingConfig returns a ring.Config based on the ingester ring config.

func (*RingConfig) Validate

func (cfg *RingConfig) Validate() error

type Series

type Series struct {
	// contains filtered or unexported fields
}

Series is a wrapper around index.Postings and tsdb.IndexReader. It implements the generic iterator interface to list all series in the index that are contained in the given postings.

func NewSeries

func NewSeries(postings index.Postings, index tsdb.IndexReader) *Series

func (*Series) At

func (s *Series) At() labels.Labels

func (*Series) Err

func (s *Series) Err() error

func (*Series) Next

func (s *Series) Next() bool

type ShipperConfigProvider

type ShipperConfigProvider interface {
	OutOfOrderBlocksExternalLabelEnabled(userID string) bool
}

type ZeroBucketCountPostings

type ZeroBucketCountPostings struct {
	activeseries.Postings
}

func (*ZeroBucketCountPostings) AtBucketCount

func (z *ZeroBucketCountPostings) AtBucketCount() (storage.SeriesRef, int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL