ingesterrf1

package
v3.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2024 License: AGPL-3.0 Imports: 61 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RingKey is the key under which we store the ingesters ring in the KVStore.
	RingKey = "ring"
)

Variables

View Source
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
View Source
var (
	ErrReadOnly = errors.New("Ingester is shutting down")
)

ErrReadOnly is returned when the ingester is shutting down and a push was attempted.

Functions

This section is empty.

Types

type Config

type Config struct {
	Enabled bool `yaml:"enabled" doc:"description=Whether the ingester is enabled."`

	LifecyclerConfig    ring.LifecyclerConfig `` /* 145-byte string literal not displayed */
	MaxSegmentAge       time.Duration         `yaml:"max_segment_age"`
	MaxSegmentSize      int                   `yaml:"max_segment_size"`
	MaxSegments         int                   `yaml:"max_segments"`
	ConcurrentFlushes   int                   `yaml:"concurrent_flushes"`
	FlushCheckPeriod    time.Duration         `yaml:"flush_check_period"`
	FlushOpBackoff      backoff.Config        `yaml:"flush_op_backoff"`
	FlushOpTimeout      time.Duration         `yaml:"flush_op_timeout"`
	AutoForgetUnhealthy bool                  `yaml:"autoforget_unhealthy"`

	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

	// Optional wrapper that can be used to modify the behaviour of the ingester
	Wrapper Wrapper `yaml:"-"`

	IndexShards int `yaml:"index_shards"`

	MaxDroppedStreams int `yaml:"max_dropped_streams"`

	ShutdownMarkerPath string `yaml:"shutdown_marker_path"`

	OwnedStreamsCheckInterval time.Duration `` /* 164-byte string literal not displayed */
	StreamRetainPeriod        time.Duration `yaml:"stream_retain_period" doc:"description=How long stream metadata is retained in memory after it was last seen."`

	// Tee configs
	ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
	// contains filtered or unexported fields
}

Config for an ingester.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the flags.

func (*Config) Validate

func (cfg *Config) Validate() error

type FpMapper

type FpMapper struct {
	// contains filtered or unexported fields
}

FpMapper is used to map fingerprints in order to work around fingerprint collisions.

func NewFPMapper

func NewFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *FpMapper

NewFPMapper returns an fpMapper ready to use.

func (*FpMapper) MapFP

func (m *FpMapper) MapFP(fp model.Fingerprint, metric labels.Labels) model.Fingerprint

MapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and returns a truly unique fingerprint. The caller must have locked the raw fingerprint.

type Ingester

type Ingester struct {
	services.Service
	// contains filtered or unexported fields
}

Ingester builds chunks for incoming log streams.

func New

func New(cfg Config, clientConfig client.Config,
	periodConfigs []config.PeriodConfig,
	storageConfig storage.Config,
	clientMetrics storage.ClientMetrics,
	limits Limits, configs *runtime.TenantConfigs,
	metastoreClient metastorepb.MetastoreServiceClient,
	registerer prometheus.Registerer,
	writeFailuresCfg writefailures.Cfg,
	metricsNamespace string,
	logger log.Logger,
	customStreamsTracker push.UsageTracker, readRing ring.ReadRing,
) (*Ingester, error)

New makes a new Ingester.

func (*Ingester) CheckReady

func (i *Ingester) CheckReady(ctx context.Context) error

ReadinessHandler is used to indicate to k8s when the ingesters are ready for the addition removal of another ingester. Returns 204 when the ingester is ready, 500 otherwise.

func (*Ingester) Flush

func (i *Ingester) Flush()

Flush implements ring.FlushTransferer Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.

func (*Ingester) FlushHandler

func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)

FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.

func (*Ingester) GetOrCreateInstance

func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error)

func (*Ingester) GetStreamRates

GetStreamRates returns a response containing all streams and their current rate TODO: It might be nice for this to be human readable, eventually: Sort output and return labels, too?

func (*Ingester) InitFlushWorkers

func (i *Ingester) InitFlushWorkers()

Note: this is called both during the WAL replay (zero or more times) and then after replay as well.

func (*Ingester) PrepareShutdown

func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request)

PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.

Internally, when triggered, this handler will configure the ingester service to release their resources whenever a SIGTERM is received. Releasing resources meaning flushing data, deleting tokens, and removing itself from the ring.

It also creates a file on disk which is used to re-apply the configuration if the ingester crashes and restarts before being permanently shutdown.

* `GET` shows the status of this configuration * `POST` enables this configuration * `DELETE` disables this configuration

func (*Ingester) Push

Push implements logproto.Pusher.

func (*Ingester) ServeHTTP

func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements the pattern ring status page.

func (*Ingester) ShutdownHandler

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)

ShutdownHandler handles a graceful shutdown of the ingester service and termination of the Loki process.

func (*Ingester) TransferOut

func (i *Ingester) TransferOut(_ context.Context) error

TransferOut implements ring.FlushTransferer Noop implemenetation because ingesters have a WAL now that does not require transferring chunks any more. We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.

func (*Ingester) Watch

Watch implements grpc_health_v1.HealthCheck.

type Interface

type Interface interface {
	services.Service
	http.Handler

	logproto.PusherServer

	CheckReady(ctx context.Context) error
	FlushHandler(w http.ResponseWriter, _ *http.Request)
	GetOrCreateInstance(instanceID string) (*instance, error)
	ShutdownHandler(w http.ResponseWriter, r *http.Request)
	PrepareShutdown(w http.ResponseWriter, r *http.Request)
}

Interface is an interface for the Ingester

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of streams an ingester can handle for a specific tenant

func NewLimiter

func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter

NewLimiter makes a new limiter

func (*Limiter) DisableForWALReplay

func (l *Limiter) DisableForWALReplay()

func (*Limiter) Enable

func (l *Limiter) Enable()

func (*Limiter) GetStreamCountLimit

func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int)

func (*Limiter) RateLimit

func (l *Limiter) RateLimit(tenant string) validation.RateLimit

func (*Limiter) UnorderedWrites

func (l *Limiter) UnorderedWrites(userID string) bool

type Limits

type Limits interface {
	UnorderedWrites(userID string) bool
	UseOwnedStreamCount(userID string) bool
	MaxLocalStreamsPerUser(userID string) int
	MaxGlobalStreamsPerUser(userID string) int
	PerStreamRateLimit(userID string) validation.RateLimit
	ShardStreams(userID string) shardstreams.Config
}

type RateLimiterStrategy

type RateLimiterStrategy interface {
	RateLimit(tenant string) validation.RateLimit
}

type RingClient

type RingClient struct {
	services.Service
	// contains filtered or unexported fields
}

func NewRingClient

func NewRingClient(
	cfg Config,
	metricsNamespace string,
	registerer prometheus.Registerer,
	logger log.Logger,
) (*RingClient, error)

type RingCount

type RingCount interface {
	HealthyInstancesCount() int
}

RingCount is the interface exposed by a ring implementation which allows to count members

type Storage

type Storage interface {
	PutObject(ctx context.Context, objectKey string, object io.Reader) error
	Stop()
}

Storage is the store interface we need on the ingester.

type StreamRateCalculator

type StreamRateCalculator struct {
	// contains filtered or unexported fields
}

func NewStreamRateCalculator

func NewStreamRateCalculator() *StreamRateCalculator

func (*StreamRateCalculator) Rates

func (*StreamRateCalculator) Record

func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoShard uint64, bytes int)

func (*StreamRateCalculator) Stop

func (c *StreamRateCalculator) Stop()

type StreamRateLimiter

type StreamRateLimiter struct {
	// contains filtered or unexported fields
}

func NewStreamRateLimiter

func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter

func (*StreamRateLimiter) AllowN

func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool

type Tee

type Tee struct {
	// contains filtered or unexported fields
}

func NewTee

func NewTee(
	cfg Config,
	ringClient *RingClient,
	metricsNamespace string,
	registerer prometheus.Registerer,
	logger log.Logger,
) (*Tee, error)

func (*Tee) Duplicate

func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream)

Duplicate Implements distributor.Tee which is used to tee distributor requests to pattern ingesters.

type Wrapper

type Wrapper interface {
	Wrap(wrapped Interface) Interface
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL