Documentation ¶
Index ¶
- Constants
- Variables
- type Config
- type FpMapper
- type Ingester
- func (i *Ingester) CheckReady(ctx context.Context) error
- func (i *Ingester) Flush()
- func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)
- func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error)
- func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error)
- func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error)
- func (i *Ingester) InitFlushWorkers()
- func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
- func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) TransferOut(_ context.Context) error
- func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error
- type Interface
- type Limiter
- func (l *Limiter) DisableForWALReplay()
- func (l *Limiter) Enable()
- func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int)
- func (l *Limiter) RateLimit(tenant string) validation.RateLimit
- func (l *Limiter) UnorderedWrites(userID string) bool
- type Limits
- type RateLimiterStrategy
- type RingClient
- type RingCount
- type Storage
- type StreamRateCalculator
- type StreamRateLimiter
- type Tee
- type Wrapper
Constants ¶
const (
// RingKey is the key under which we store the ingesters ring in the KVStore.
RingKey = "ring"
)
Variables ¶
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
var (
ErrReadOnly = errors.New("Ingester is shutting down")
)
ErrReadOnly is returned when the ingester is shutting down and a push was attempted.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { Enabled bool `yaml:"enabled" doc:"description=Whether the ingester is enabled."` LifecyclerConfig ring.LifecyclerConfig `` /* 145-byte string literal not displayed */ MaxSegmentAge time.Duration `yaml:"max_segment_age"` MaxSegmentSize int `yaml:"max_segment_size"` MaxSegments int `yaml:"max_segments"` ConcurrentFlushes int `yaml:"concurrent_flushes"` FlushCheckPeriod time.Duration `yaml:"flush_check_period"` FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"` FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"` MaxReturnedErrors int `yaml:"max_returned_stream_errors"` // Optional wrapper that can be used to modify the behaviour of the ingester Wrapper Wrapper `yaml:"-"` IndexShards int `yaml:"index_shards"` MaxDroppedStreams int `yaml:"max_dropped_streams"` ShutdownMarkerPath string `yaml:"shutdown_marker_path"` OwnedStreamsCheckInterval time.Duration `` /* 164-byte string literal not displayed */ StreamRetainPeriod time.Duration `yaml:"stream_retain_period" doc:"description=How long stream metadata is retained in memory after it was last seen."` // Tee configs ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."` // contains filtered or unexported fields }
Config for an ingester.
func (*Config) RegisterFlags ¶
RegisterFlags registers the flags.
type FpMapper ¶
type FpMapper struct {
// contains filtered or unexported fields
}
FpMapper is used to map fingerprints in order to work around fingerprint collisions.
func NewFPMapper ¶
func NewFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *FpMapper
NewFPMapper returns an FpMapper ready to use.
func (*FpMapper) MapFP ¶
func (m *FpMapper) MapFP(fp model.Fingerprint, metric labels.Labels) model.Fingerprint
MapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and returns a truly unique fingerprint. The caller must have locked the raw fingerprint.
type Ingester ¶
Ingester builds chunks for incoming log streams.
func New ¶
func New(cfg Config, clientConfig client.Config, periodConfigs []config.PeriodConfig, storageConfig storage.Config, clientMetrics storage.ClientMetrics, limits Limits, configs *runtime.TenantConfigs, metastoreClient metastorepb.MetastoreServiceClient, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, ) (*Ingester, error)
New makes a new Ingester.
func (*Ingester) CheckReady ¶
CheckReady is used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester. Returns 204 when the ingester is ready, 500 otherwise.
func (*Ingester) Flush ¶
func (i *Ingester) Flush()
Flush implements ring.FlushTransferer. Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.
func (*Ingester) FlushHandler ¶
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)
FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.
func (*Ingester) GetDetectedFields ¶
func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error)
func (*Ingester) GetOrCreateInstance ¶
func (*Ingester) GetStreamRates ¶
func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error)
GetStreamRates returns a response containing all streams and their current rate. TODO: It might be nice for this to be human readable, eventually: Sort output and return labels, too?
func (*Ingester) InitFlushWorkers ¶
func (i *Ingester) InitFlushWorkers()
Note: this is called both during the WAL replay (zero or more times) and then after replay as well.
func (*Ingester) PrepareShutdown ¶
func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request)
PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
Internally, when triggered, this handler will configure the ingester service to release its resources whenever a SIGTERM is received. Releasing resources means flushing data, deleting tokens, and removing itself from the ring.
It also creates a file on disk which is used to re-apply the configuration if the ingester crashes and restarts before being permanently shut down.
* `GET` shows the status of this configuration
* `POST` enables this configuration
* `DELETE` disables this configuration
func (*Ingester) Push ¶
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
Push implements logproto.Pusher.
func (*Ingester) ServeHTTP ¶
func (i *Ingester) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements the pattern ring status page.
func (*Ingester) ShutdownHandler ¶
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
ShutdownHandler handles a graceful shutdown of the ingester service and termination of the Loki process.
func (*Ingester) TransferOut ¶
TransferOut implements ring.FlushTransferer. Noop implementation because ingesters have a WAL now that does not require transferring chunks any more. We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
func (*Ingester) Watch ¶
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error
Watch implements grpc_health_v1.HealthCheck.
type Interface ¶
type Interface interface { services.Service http.Handler logproto.PusherServer CheckReady(ctx context.Context) error FlushHandler(w http.ResponseWriter, _ *http.Request) GetOrCreateInstance(instanceID string) (*instance, error) ShutdownHandler(w http.ResponseWriter, r *http.Request) PrepareShutdown(w http.ResponseWriter, r *http.Request) }
Interface is an interface for the Ingester
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter implements primitives to get the maximum number of streams an ingester can handle for a specific tenant
func NewLimiter ¶
func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter
NewLimiter makes a new limiter
func (*Limiter) DisableForWALReplay ¶
func (l *Limiter) DisableForWALReplay()
func (*Limiter) GetStreamCountLimit ¶
func (*Limiter) UnorderedWrites ¶
type RateLimiterStrategy ¶
type RateLimiterStrategy interface {
RateLimit(tenant string) validation.RateLimit
}
type RingClient ¶
func NewRingClient ¶
func NewRingClient( cfg Config, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, ) (*RingClient, error)
type RingCount ¶
type RingCount interface {
HealthyInstancesCount() int
}
RingCount is the interface exposed by a ring implementation which allows counting its members.
type Storage ¶
type Storage interface { PutObject(ctx context.Context, objectKey string, object io.Reader) error Stop() }
Storage is the store interface we need on the ingester.
type StreamRateCalculator ¶
type StreamRateCalculator struct {
// contains filtered or unexported fields
}
func NewStreamRateCalculator ¶
func NewStreamRateCalculator() *StreamRateCalculator
func (*StreamRateCalculator) Rates ¶
func (c *StreamRateCalculator) Rates() []logproto.StreamRate
func (*StreamRateCalculator) Record ¶
func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoShard uint64, bytes int)
func (*StreamRateCalculator) Stop ¶
func (c *StreamRateCalculator) Stop()
type StreamRateLimiter ¶
type StreamRateLimiter struct {
// contains filtered or unexported fields
}
func NewStreamRateLimiter ¶
func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter
type Tee ¶
type Tee struct {
// contains filtered or unexported fields
}
func NewTee ¶
func NewTee( cfg Config, ringClient *RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, ) (*Tee, error)
func (*Tee) Duplicate ¶
func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream)
Duplicate implements distributor.Tee, which is used to tee distributor requests to pattern ingesters.