ingester

package
v1.6.2-0...-b66c343 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: AGPL-3.0 Imports: 86 Imported by: 14

Documentation

Index

Constants

View Source
const (
	// ShardLbName is the internal label to be used by Loki when dividing a stream into smaller pieces.
	// Possible values are only increasing integers starting from 0.
	ShardLbName        = "__stream_shard__"
	ShardLbPlaceholder = "__placeholder__"
)
View Source
const (
	// RingKey is the key under which we store the ingesters ring in the KVStore.
	RingKey = "ring"
)

Variables

View Source
var (
	ErrInvalidLengthCheckpoint = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowCheckpoint   = fmt.Errorf("proto: integer overflow")
)
View Source
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
View Source
var (
	ErrReadOnly = errors.New("Ingester is shutting down")
)

ErrReadOnly is returned when the ingester is shutting down and a push was attempted.

Functions

func RecoverCheckpoint

func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error

func RecoverWAL

func RecoverWAL(ctx context.Context, reader WALReader, recoverer Recoverer) error

Types

type CheckpointWriter

type CheckpointWriter interface {
	// Advances current checkpoint, can also signal a no-op.
	Advance() (noop bool, err error)
	Write(*Series) error
	// Closes current checkpoint.
	Close(abort bool) error
}

type Checkpointer

type Checkpointer struct {
	// contains filtered or unexported fields
}

func NewCheckpointer

func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, metrics *ingesterMetrics, quit <-chan struct{}) *Checkpointer

func (*Checkpointer) PerformCheckpoint

func (c *Checkpointer) PerformCheckpoint() (err error)

func (*Checkpointer) Run

func (c *Checkpointer) Run()

type Chunk

type Chunk struct {
	From        time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"`
	To          time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"`
	FlushedAt   time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"`
	LastUpdated time.Time `protobuf:"bytes,4,opt,name=lastUpdated,proto3,stdtime" json:"lastUpdated"`
	Closed      bool      `protobuf:"varint,5,opt,name=closed,proto3" json:"closed,omitempty"`
	Synced      bool      `protobuf:"varint,6,opt,name=synced,proto3" json:"synced,omitempty"`
	// data to be unmarshaled into a MemChunk
	Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"`
	// data to be unmarshaled into a MemChunk's headBlock
	Head []byte `protobuf:"bytes,8,opt,name=head,proto3" json:"head,omitempty"`
}

Chunk is a {de,}serializable intermediate type for chunkDesc which allows efficient loading/unloading to disk during WAL checkpoint recovery.

func (*Chunk) Descriptor

func (*Chunk) Descriptor() ([]byte, []int)

func (*Chunk) Equal

func (this *Chunk) Equal(that interface{}) bool

func (*Chunk) GetClosed

func (m *Chunk) GetClosed() bool

func (*Chunk) GetData

func (m *Chunk) GetData() []byte

func (*Chunk) GetFlushedAt

func (m *Chunk) GetFlushedAt() time.Time

func (*Chunk) GetFrom

func (m *Chunk) GetFrom() time.Time

func (*Chunk) GetHead

func (m *Chunk) GetHead() []byte

func (*Chunk) GetLastUpdated

func (m *Chunk) GetLastUpdated() time.Time

func (*Chunk) GetSynced

func (m *Chunk) GetSynced() bool

func (*Chunk) GetTo

func (m *Chunk) GetTo() time.Time

func (*Chunk) GoString

func (this *Chunk) GoString() string

func (*Chunk) Marshal

func (m *Chunk) Marshal() (dAtA []byte, err error)

func (*Chunk) MarshalTo

func (m *Chunk) MarshalTo(dAtA []byte) (int, error)

func (*Chunk) MarshalToSizedBuffer

func (m *Chunk) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Chunk) ProtoMessage

func (*Chunk) ProtoMessage()

func (*Chunk) Reset

func (m *Chunk) Reset()

func (*Chunk) Size

func (m *Chunk) Size() (n int)

func (*Chunk) String

func (this *Chunk) String() string

func (*Chunk) Unmarshal

func (m *Chunk) Unmarshal(dAtA []byte) error

func (*Chunk) XXX_DiscardUnknown

func (m *Chunk) XXX_DiscardUnknown()

func (*Chunk) XXX_Marshal

func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Chunk) XXX_Merge

func (m *Chunk) XXX_Merge(src proto.Message)

func (*Chunk) XXX_Size

func (m *Chunk) XXX_Size() int

func (*Chunk) XXX_Unmarshal

func (m *Chunk) XXX_Unmarshal(b []byte) error

type Config

type Config struct {
	LifecyclerConfig ring.LifecyclerConfig `` /* 145-byte string literal not displayed */

	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
	FlushCheckPeriod  time.Duration `yaml:"flush_check_period"`
	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout"`
	RetainPeriod      time.Duration `yaml:"chunk_retain_period"`
	MaxChunkIdle      time.Duration `yaml:"chunk_idle_period"`
	BlockSize         int           `yaml:"chunk_block_size"`
	TargetChunkSize   int           `yaml:"chunk_target_size"`
	ChunkEncoding     string        `yaml:"chunk_encoding"`

	MaxChunkAge         time.Duration `yaml:"max_chunk_age"`
	AutoForgetUnhealthy bool          `yaml:"autoforget_unhealthy"`

	// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
	SyncPeriod         time.Duration `yaml:"sync_period"`
	SyncMinUtilization float64       `yaml:"sync_min_utilization"`

	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

	QueryStore                  bool          `yaml:"-"`
	QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`

	WAL WALConfig `` /* 225-byte string literal not displayed */

	ChunkFilterer          chunk.RequestChunkFilterer     `yaml:"-"`
	PipelineWrapper        lokilog.PipelineWrapper        `yaml:"-"`
	SampleExtractorWrapper lokilog.SampleExtractorWrapper `yaml:"-"`

	// Optional wrapper that can be used to modify the behaviour of the ingester
	Wrapper Wrapper `yaml:"-"`

	IndexShards int `yaml:"index_shards"`

	MaxDroppedStreams int `yaml:"max_dropped_streams"`

	ShutdownMarkerPath string `yaml:"shutdown_marker_path"`
	// contains filtered or unexported fields
}

Config for an ingester.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the flags.

func (*Config) Validate

func (cfg *Config) Validate() error

type Flusher

type Flusher interface {
	Flush()
}

type Ingester

type Ingester struct {
	services.Service
	// contains filtered or unexported fields
}

Ingester builds chunks for incoming log streams.

func New

func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger) (*Ingester, error)

New makes a new Ingester.

func (*Ingester) CheckReady added in v1.5.0

func (i *Ingester) CheckReady(ctx context.Context) error

CheckReady is used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester. It returns nil when the ingester is ready and a non-nil error otherwise; the HTTP readiness endpoint reports these as 204 and 500, respectively.

func (*Ingester) Flush

func (i *Ingester) Flush()

Flush implements ring.FlushTransferer. It triggers a flush of all the chunks and closes the flush queues. It is called from the Lifecycler as part of the ingester shutdown.

func (*Ingester) FlushHandler

func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)

FlushHandler triggers a flush of all in-memory chunks. It is mainly used for local testing.

func (*Ingester) GetChunkIDs

GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.

func (*Ingester) GetOrCreateInstance

func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error)

func (*Ingester) GetStats

func (*Ingester) GetStreamRates

GetStreamRates returns a response containing all streams and their current rate. TODO: It might be nice for this to be human-readable eventually: sort the output and return labels, too?

func (*Ingester) GetVolume

func (*Ingester) InitFlushQueues

func (i *Ingester) InitFlushQueues()

Note: this is called both during the WAL replay (zero or more times) and then after replay as well.

func (*Ingester) Label

Label returns the set of labels for the streams this ingester knows about.

func (*Ingester) PrepareShutdown

func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request)

PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.

Internally, when triggered, this handler configures the ingester service to release its resources whenever a SIGTERM is received. Releasing resources means flushing data, deleting tokens, and removing itself from the ring.

It also creates a file on disk which is used to re-apply the configuration if the ingester crashes and restarts before being permanently shutdown.

* `GET` shows the status of this configuration.
* `POST` enables this configuration.
* `DELETE` disables this configuration.

func (*Ingester) Push

Push implements logproto.Pusher.

func (*Ingester) Query

func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error

Query queries the ingester for log streams matching a set of matchers.

func (*Ingester) QuerySample added in v1.6.0

func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error

QuerySample queries the ingester for series derived from logs matching a set of matchers.

func (*Ingester) Series added in v1.3.0

Series queries the ingester for log stream identifiers (label sets) matching a set of matchers.

func (*Ingester) SetChunkFilterer

func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

func (*Ingester) SetExtractorWrapper

func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper)

func (*Ingester) SetPipelineWrapper

func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper)

func (*Ingester) ShutdownHandler

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)

ShutdownHandler handles a graceful shutdown of the ingester service and termination of the Loki process.

func (*Ingester) Tail added in v0.2.0

func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error

Tail streams logs matching the given query.

func (*Ingester) TailersCount added in v1.4.0

TailersCount returns the count of active tail requests from a user.

func (*Ingester) TransferOut

func (i *Ingester) TransferOut(_ context.Context) error

TransferOut implements ring.FlushTransferer. It is a no-op implementation because ingesters now have a WAL, which removes the need to transfer chunks. It returns ErrTransferDisabled to indicate that transfers are not supported, so the ingester may instead flush on shutdown if configured to do so.

func (*Ingester) Watch

Watch implements grpc_health_v1.HealthCheck.

type Interface

type Interface interface {
	services.Service

	logproto.PusherServer
	logproto.QuerierServer
	logproto.StreamDataServer

	CheckReady(ctx context.Context) error
	FlushHandler(w http.ResponseWriter, _ *http.Request)
	GetOrCreateInstance(instanceID string) (*instance, error)
	ShutdownHandler(w http.ResponseWriter, r *http.Request)
	PrepareShutdown(w http.ResponseWriter, r *http.Request)
}

Interface is an interface for the Ingester.

type Limiter added in v1.3.0

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of streams an ingester can handle for a specific tenant.

func NewLimiter added in v1.3.0

func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter

NewLimiter makes a new limiter.

func (*Limiter) AssertMaxStreamsPerUser added in v1.3.0

func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error

AssertMaxStreamsPerUser ensures that the limit has not been reached, given the current number of streams passed in, and returns an error if it has.

func (*Limiter) DisableForWALReplay

func (l *Limiter) DisableForWALReplay()

func (*Limiter) Enable

func (l *Limiter) Enable()

func (*Limiter) RateLimit

func (l *Limiter) RateLimit(tenant string) validation.RateLimit

func (*Limiter) UnorderedWrites

func (l *Limiter) UnorderedWrites(userID string) bool

type Limits

type Limits interface {
	UnorderedWrites(userID string) bool
	MaxLocalStreamsPerUser(userID string) int
	MaxGlobalStreamsPerUser(userID string) int
	PerStreamRateLimit(userID string) validation.RateLimit
	ShardStreams(userID string) *shardstreams.Config
}

type NoopWALReader

type NoopWALReader struct{}

func (NoopWALReader) Close

func (NoopWALReader) Close() error

func (NoopWALReader) Err

func (NoopWALReader) Err() error

func (NoopWALReader) Next

func (NoopWALReader) Next() bool

func (NoopWALReader) Record

func (NoopWALReader) Record() []byte

type OnceSwitch

type OnceSwitch struct {
	// contains filtered or unexported fields
}

OnceSwitch is an optimized switch, safe for use in a concurrent environment, that can only ever be switched "on".

func (*OnceSwitch) Get

func (o *OnceSwitch) Get() bool

func (*OnceSwitch) Trigger

func (o *OnceSwitch) Trigger()

func (*OnceSwitch) TriggerAnd

func (o *OnceSwitch) TriggerAnd(fn func())

TriggerAnd will ensure the switch is on and run the provided function if the switch was not already toggled on.

type QuerierQueryServer

type QuerierQueryServer interface {
	Context() context.Context
	Send(res *logproto.QueryResponse) error
}

QuerierQueryServer is the gRPC server stream we use to send batches of entries.

type RateLimiterStrategy

type RateLimiterStrategy interface {
	RateLimit(tenant string) validation.RateLimit
}

type Recoverer

type Recoverer interface {
	NumWorkers() int
	Series(series *Series) error
	SetStream(ctx context.Context, userID string, series record.RefSeries) error
	Push(userID string, entries wal.RefEntries) error
	Done() <-chan struct{}
}

type RingCount added in v1.3.0

type RingCount interface {
	HealthyInstancesCount() int
}

RingCount is the interface exposed by a ring implementation that allows counting its members.

type Series

type Series struct {
	UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"`
	// post mapped fingerprint is necessary because subsequent wal writes will reference it.
	Fingerprint uint64                                              `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
	Labels      []github_com_grafana_loki_pkg_logproto.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/grafana/loki/pkg/logproto.LabelAdapter" json:"labels"`
	Chunks      []Chunk                                             `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"`
	// most recently pushed timestamp.
	To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"`
	// most recently pushed line.
	LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"`
	// highest counter value for pushes to this stream.
	// Used to skip already applied entries during WAL replay.
	EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"`
	// highest timestamp pushed to this stream.
	HighestTs time.Time `protobuf:"bytes,8,opt,name=highestTs,proto3,stdtime" json:"highestTs"`
}

Series is a {de,}serializable intermediate type for Series.

func (*Series) Descriptor

func (*Series) Descriptor() ([]byte, []int)

func (*Series) Equal

func (this *Series) Equal(that interface{}) bool

func (*Series) GetChunks

func (m *Series) GetChunks() []Chunk

func (*Series) GetEntryCt

func (m *Series) GetEntryCt() int64

func (*Series) GetFingerprint

func (m *Series) GetFingerprint() uint64

func (*Series) GetHighestTs

func (m *Series) GetHighestTs() time.Time

func (*Series) GetLastLine

func (m *Series) GetLastLine() string

func (*Series) GetTo

func (m *Series) GetTo() time.Time

func (*Series) GetUserID

func (m *Series) GetUserID() string

func (*Series) GoString

func (this *Series) GoString() string

func (*Series) Marshal

func (m *Series) Marshal() (dAtA []byte, err error)

func (*Series) MarshalTo

func (m *Series) MarshalTo(dAtA []byte) (int, error)

func (*Series) MarshalToSizedBuffer

func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Series) ProtoMessage

func (*Series) ProtoMessage()

func (*Series) Reset

func (m *Series) Reset()

func (*Series) Size

func (m *Series) Size() (n int)

func (*Series) String

func (this *Series) String() string

func (*Series) Unmarshal

func (m *Series) Unmarshal(dAtA []byte) error

func (*Series) XXX_DiscardUnknown

func (m *Series) XXX_DiscardUnknown()

func (*Series) XXX_Marshal

func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Series) XXX_Merge

func (m *Series) XXX_Merge(src proto.Message)

func (*Series) XXX_Size

func (m *Series) XXX_Size() int

func (*Series) XXX_Unmarshal

func (m *Series) XXX_Unmarshal(b []byte) error

type SeriesIter

type SeriesIter interface {
	Count() int
	Iter() *streamIterator
	Stop()
}

type SeriesWithErr

type SeriesWithErr struct {
	Err    error
	Series *Series
}

type Store

Store is the store interface we need on the ingester.

type StreamRateCalculator

type StreamRateCalculator struct {
	// contains filtered or unexported fields
}

func NewStreamRateCalculator

func NewStreamRateCalculator() *StreamRateCalculator

func (*StreamRateCalculator) Rates

func (*StreamRateCalculator) Record

func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoShard uint64, bytes int)

func (*StreamRateCalculator) Stop

func (c *StreamRateCalculator) Stop()

type StreamRateLimiter

type StreamRateLimiter struct {
	// contains filtered or unexported fields
}

func NewStreamRateLimiter

func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter

func (*StreamRateLimiter) AllowN

func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool

type TailServer

type TailServer interface {
	Send(*logproto.TailResponse) error
	Context() context.Context
}

type WAL

type WAL interface {
	Start()
	// Log marshalls the records and writes it into the WAL.
	Log(*wal.Record) error
	// Stop stops all the WAL operations.
	Stop() error
}

WAL interface allows us to have a no-op WAL when the WAL is disabled.

type WALCheckpointWriter

type WALCheckpointWriter struct {
	// contains filtered or unexported fields
}

func (*WALCheckpointWriter) Advance

func (w *WALCheckpointWriter) Advance() (bool, error)

func (*WALCheckpointWriter) Close

func (w *WALCheckpointWriter) Close(abort bool) error

func (*WALCheckpointWriter) Write

func (w *WALCheckpointWriter) Write(s *Series) error

type WALConfig

type WALConfig struct {
	Enabled             bool             `yaml:"enabled"`
	Dir                 string           `yaml:"dir"`
	CheckpointDuration  time.Duration    `yaml:"checkpoint_duration"`
	FlushOnShutdown     bool             `yaml:"flush_on_shutdown"`
	ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"`
}

func (*WALConfig) RegisterFlags

func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*WALConfig) Validate

func (cfg *WALConfig) Validate() error

type WALReader

type WALReader interface {
	Next() bool
	Err() error
	// Record should not be used across multiple calls to Next()
	Record() []byte
}

type Wrapper

type Wrapper interface {
	Wrap(wrapped Interface) Interface
}

Directories

Path Synopsis
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL