ingester

package
v2.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2022 License: AGPL-3.0 Imports: 73 Imported by: 0

Documentation ¶

Index ¶

Constants ¶

View Source
// RingKey names the KVStore entry that holds the ingesters ring.
const RingKey = "ring"

Variables ¶

View Source
var (
	// ErrInvalidLengthCheckpoint carries the message "proto: negative length
	// found during unmarshaling".
	// NOTE(review): presumably returned by the generated checkpoint proto
	// Unmarshal code; confirm against the generated file.
	ErrInvalidLengthCheckpoint = fmt.Errorf("proto: negative length found during unmarshaling")
	// ErrIntOverflowCheckpoint carries the message "proto: integer overflow".
	ErrIntOverflowCheckpoint   = fmt.Errorf("proto: integer overflow")
)
View Source
// ErrEntriesExist is the sentinel error for a duplicate push whose entries
// already exist.
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
View Source
// ErrReadOnly is the error handed back when a push is attempted while the
// ingester is shutting down.
var ErrReadOnly = errors.New("Ingester is shutting down")

ErrReadOnly is returned when the ingester is shutting down and a push was attempted.

Functions ¶

func RecoverCheckpoint ¶

func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error

func RecoverWAL ¶

func RecoverWAL(reader WALReader, recoverer Recoverer) error

Types ¶

type CheckpointWriter ¶

// CheckpointWriter is the sink a checkpoint is written to. NewCheckpointer
// accepts one, and *WALCheckpointWriter has methods with matching signatures.
type CheckpointWriter interface {
	// Advances current checkpoint, can also signal a no-op.
	// The no-op signal is the first (noop) result.
	Advance() (noop bool, err error)
	// Write writes a single Series.
	// NOTE(review): presumably into the current checkpoint; confirm.
	Write(*Series) error
	// Closes current checkpoint.
	// NOTE(review): abort presumably discards the checkpoint instead of
	// finalizing it; confirm against the implementation.
	Close(abort bool) error
}

type Checkpointer ¶

// Checkpointer is built by NewCheckpointer from a duration, a SeriesIter, a
// CheckpointWriter, ingester metrics and a quit channel. Its exported methods
// are Run and PerformCheckpoint.
type Checkpointer struct {
	// contains filtered or unexported fields
}

func NewCheckpointer ¶

func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, metrics *ingesterMetrics, quit <-chan struct{}) *Checkpointer

func (*Checkpointer) PerformCheckpoint ¶

func (c *Checkpointer) PerformCheckpoint() (err error)

func (*Checkpointer) Run ¶

func (c *Checkpointer) Run()

type Chunk ¶

// Chunk is a {de,}serializable intermediate type for chunkDesc which allows
// efficient loading/unloading to disk during WAL checkpoint recovery.
//
// The struct tags bind each field to a protobuf field number (1-8) and a JSON
// key, so editing a tag changes the serialized form.
type Chunk struct {
	// The four timestamps below are tagged stdtime, i.e. they are Go
	// time.Time values on the struct.
	From        time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"`
	To          time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"`
	FlushedAt   time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"`
	LastUpdated time.Time `protobuf:"bytes,4,opt,name=lastUpdated,proto3,stdtime" json:"lastUpdated"`
	Closed      bool      `protobuf:"varint,5,opt,name=closed,proto3" json:"closed,omitempty"`
	Synced      bool      `protobuf:"varint,6,opt,name=synced,proto3" json:"synced,omitempty"`
	// data to be unmarshaled into a MemChunk
	Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"`
	// data to be unmarshaled into a MemChunk's headBlock
	Head []byte `protobuf:"bytes,8,opt,name=head,proto3" json:"head,omitempty"`
}

Chunk is a {de,}serializable intermediate type for chunkDesc which allows efficient loading/unloading to disk during WAL checkpoint recovery.

func (*Chunk) Descriptor ¶

func (*Chunk) Descriptor() ([]byte, []int)

func (*Chunk) Equal ¶

func (this *Chunk) Equal(that interface{}) bool

func (*Chunk) GetClosed ¶

func (m *Chunk) GetClosed() bool

func (*Chunk) GetData ¶

func (m *Chunk) GetData() []byte

func (*Chunk) GetFlushedAt ¶

func (m *Chunk) GetFlushedAt() time.Time

func (*Chunk) GetFrom ¶

func (m *Chunk) GetFrom() time.Time

func (*Chunk) GetHead ¶

func (m *Chunk) GetHead() []byte

func (*Chunk) GetLastUpdated ¶

func (m *Chunk) GetLastUpdated() time.Time

func (*Chunk) GetSynced ¶

func (m *Chunk) GetSynced() bool

func (*Chunk) GetTo ¶

func (m *Chunk) GetTo() time.Time

func (*Chunk) GoString ¶

func (this *Chunk) GoString() string

func (*Chunk) Marshal ¶

func (m *Chunk) Marshal() (dAtA []byte, err error)

func (*Chunk) MarshalTo ¶

func (m *Chunk) MarshalTo(dAtA []byte) (int, error)

func (*Chunk) MarshalToSizedBuffer ¶

func (m *Chunk) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Chunk) ProtoMessage ¶

func (*Chunk) ProtoMessage()

func (*Chunk) Reset ¶

func (m *Chunk) Reset()

func (*Chunk) Size ¶

func (m *Chunk) Size() (n int)

func (*Chunk) String ¶

func (this *Chunk) String() string

func (*Chunk) Unmarshal ¶

func (m *Chunk) Unmarshal(dAtA []byte) error

func (*Chunk) XXX_DiscardUnknown ¶

func (m *Chunk) XXX_DiscardUnknown()

func (*Chunk) XXX_Marshal ¶

func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Chunk) XXX_Merge ¶

func (m *Chunk) XXX_Merge(src proto.Message)

func (*Chunk) XXX_Size ¶

func (m *Chunk) XXX_Size() int

func (*Chunk) XXX_Unmarshal ¶

func (m *Chunk) XXX_Unmarshal(b []byte) error

type ChunkStore ¶

// ChunkStore is the interface we need to store chunks. New takes a ChunkStore
// as its store argument.
type ChunkStore interface {
	// Put takes a batch of chunks.
	Put(ctx context.Context, chunks []chunk.Chunk) error
	// SelectLogs returns an entry iterator for the given log selection params.
	SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
	// SelectSamples returns a sample iterator for the given sample selection params.
	SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
	// GetChunkRefs returns groups of chunks and a parallel-looking slice of
	// fetchers for a user, a time range and label matchers.
	// NOTE(review): the pairing between the two returned slices is not
	// visible here; confirm against the store implementation.
	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*fetcher.Fetcher, error)
	// GetSchemaConfigs returns the period configs.
	GetSchemaConfigs() []config.PeriodConfig
}

ChunkStore is the interface we need to store chunks.

type Config ¶

// Config for an ingester.
//
// Each field's yaml tag is the key it is read from. Fields tagged `yaml:"-"`
// (QueryStore, ChunkFilterer, Wrapper) have no YAML key and must be set in
// code. RegisterFlags and Validate are defined on *Config.
type Config struct {
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

	// Config for transferring chunks.
	MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`

	// Flush and chunk settings.
	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
	FlushCheckPeriod  time.Duration `yaml:"flush_check_period"`
	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout"`
	RetainPeriod      time.Duration `yaml:"chunk_retain_period"`
	MaxChunkIdle      time.Duration `yaml:"chunk_idle_period"`
	BlockSize         int           `yaml:"chunk_block_size"`
	TargetChunkSize   int           `yaml:"chunk_target_size"`
	ChunkEncoding     string        `yaml:"chunk_encoding"`

	MaxChunkAge         time.Duration `yaml:"max_chunk_age"`
	AutoForgetUnhealthy bool          `yaml:"autoforget_unhealthy"`

	// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
	SyncPeriod         time.Duration `yaml:"sync_period"`
	SyncMinUtilization float64       `yaml:"sync_min_utilization"`

	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

	// QueryStore has no YAML key (`yaml:"-"`); only the look-back period is
	// configurable from YAML.
	QueryStore                  bool          `yaml:"-"`
	QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`

	// Write-ahead log settings; see WALConfig.
	WAL WALConfig `yaml:"wal,omitempty"`

	// ChunkFilterer has no YAML key (`yaml:"-"`).
	ChunkFilterer chunk.RequestChunkFilterer `yaml:"-"`
	// Optional wrapper that can be used to modify the behaviour of the ingester
	Wrapper Wrapper `yaml:"-"`

	IndexShards int `yaml:"index_shards"`

	MaxDroppedStreams int `yaml:"max_dropped_streams"`
	// contains filtered or unexported fields
}

Config for an ingester.

func (*Config) RegisterFlags ¶

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the flags.

func (*Config) Validate ¶

func (cfg *Config) Validate() error

type Flusher ¶

// Flusher is the single-method interface for triggering a flush. *Ingester
// has a Flush method with this signature.
type Flusher interface {
	// Flush takes no arguments and reports no error.
	Flush()
}

type Ingester ¶

// Ingester builds chunks for incoming log streams. Instances are created
// with New.
type Ingester struct {
	// Embedded, so the methods of services.Service are promoted onto Ingester.
	services.Service
	// contains filtered or unexported fields
}

Ingester builds chunks for incoming log streams.

func New ¶

func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error)

New makes a new Ingester.

func (*Ingester) Check ¶

Check implements grpc_health_v1.HealthCheck.

func (*Ingester) CheckReady ¶

func (i *Ingester) CheckReady(ctx context.Context) error

CheckReady is used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester. Returns 204 when the ingester is ready, 500 otherwise.

func (*Ingester) Flush ¶

func (i *Ingester) Flush()

Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.

func (*Ingester) FlushHandler ¶

func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)

FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.

func (*Ingester) GetChunkIDs ¶

GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.

func (*Ingester) GetOrCreateInstance ¶

func (i *Ingester) GetOrCreateInstance(instanceID string) *instance

func (*Ingester) InitFlushQueues ¶

func (i *Ingester) InitFlushQueues()

Note: this is called both during the WAL replay (zero or more times) and then after replay as well.

func (*Ingester) Label ¶

Label returns the set of labels for the stream this ingester knows about.

func (*Ingester) Push ¶

Push implements logproto.Pusher.

func (*Ingester) Query ¶

func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error

Query the ingesters for log streams matching a set of matchers.

func (*Ingester) QuerySample ¶

func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error

QuerySample the ingesters for series from logs matching a set of matchers.

func (*Ingester) Series ¶

Series queries the ingester for log stream identifiers (label sets) matching a set of matchers

func (*Ingester) SetChunkFilterer ¶

func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)

func (*Ingester) ShutdownHandler ¶

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)

ShutdownHandler triggers the following set of operations in order:

  • Change the state of ring to stop accepting writes.
  • Flush all the chunks.

func (*Ingester) Tail ¶

func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error

Tail logs matching the given query.

func (*Ingester) TailersCount ¶

TailersCount returns count of active tail requests from a user

func (*Ingester) TransferChunks ¶

func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error

TransferChunks receives all chunks from another ingester. The Ingester must be in PENDING state or else the call will fail.

func (*Ingester) TransferOut ¶

func (i *Ingester) TransferOut(ctx context.Context) error

TransferOut implements ring.Lifecycler.

func (*Ingester) Watch ¶

Watch implements grpc_health_v1.HealthCheck.

type Interface ¶

// Interface is an interface for the Ingester. It combines services.Service,
// three logproto server interfaces and the methods listed below. It is also
// the type that Wrapper.Wrap both accepts and returns.
type Interface interface {
	services.Service

	logproto.IngesterServer
	logproto.PusherServer
	logproto.QuerierServer
	// The four methods below match the signatures of the same-named
	// *Ingester methods.
	CheckReady(ctx context.Context) error
	FlushHandler(w http.ResponseWriter, _ *http.Request)
	ShutdownHandler(w http.ResponseWriter, r *http.Request)
	// GetOrCreateInstance returns the unexported *instance type, so it can
	// only be implemented inside this package.
	GetOrCreateInstance(instanceID string) *instance
}

Interface is an interface for the Ingester

type Limiter ¶

// Limiter implements primitives to get the maximum number of streams an
// ingester can handle for a specific tenant. Instances are created with
// NewLimiter.
type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of streams an ingester can handle for a specific tenant

func NewLimiter ¶

func NewLimiter(limits *validation.Overrides, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter

NewLimiter makes a new limiter

func (*Limiter) AssertMaxStreamsPerUser ¶

func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error

AssertMaxStreamsPerUser ensures limit has not been reached compared to the current number of streams in input and returns an error if so.

func (*Limiter) DisableForWALReplay ¶

func (l *Limiter) DisableForWALReplay()

func (*Limiter) Enable ¶

func (l *Limiter) Enable()

func (*Limiter) RateLimit ¶

func (l *Limiter) RateLimit(tenant string) validation.RateLimit

func (*Limiter) UnorderedWrites ¶

func (l *Limiter) UnorderedWrites(userID string) bool

type NoopWALReader ¶

// NoopWALReader is an empty struct with Next, Err, Record and Close methods,
// which covers the WALReader method set.
// NOTE(review): presumably a reader that yields no records; the method
// bodies are not shown here, confirm.
type NoopWALReader struct{}

func (NoopWALReader) Close ¶

func (NoopWALReader) Close() error

func (NoopWALReader) Err ¶

func (NoopWALReader) Err() error

func (NoopWALReader) Next ¶

func (NoopWALReader) Next() bool

func (NoopWALReader) Record ¶

func (NoopWALReader) Record() []byte

type OnceSwitch ¶

// OnceSwitch is an optimized switch that can only ever be switched "on" in a
// concurrent environment. Its methods are Get, Trigger and TriggerAnd.
type OnceSwitch struct {
	// contains filtered or unexported fields
}

OnceSwitch is an optimized switch that can only ever be switched "on" in a concurrent environment.

func (*OnceSwitch) Get ¶

func (o *OnceSwitch) Get() bool

func (*OnceSwitch) Trigger ¶

func (o *OnceSwitch) Trigger()

func (*OnceSwitch) TriggerAnd ¶

func (o *OnceSwitch) TriggerAnd(fn func())

TriggerAnd will ensure the switch is on and run the provided function if the switch was not already toggled on.

type QuerierQueryServer ¶

// QuerierQueryServer is the GRPC server stream we use to send batch of entries.
type QuerierQueryServer interface {
	// Context returns the context associated with the stream.
	Context() context.Context
	// Send sends one query response on the stream.
	Send(res *logproto.QueryResponse) error
}

QuerierQueryServer is the GRPC server stream we use to send batch of entries.

type RateLimiterStrategy ¶

// RateLimiterStrategy supplies the validation.RateLimit for a tenant.
// NewStreamRateLimiter takes one, and *Limiter has a RateLimit method with
// this signature.
type RateLimiterStrategy interface {
	// RateLimit returns the rate limit for the given tenant.
	RateLimit(tenant string) validation.RateLimit
}

type RecordType ¶

// RecordType represents the type of the WAL/Checkpoint record. It is stored
// as a single byte.
type RecordType byte

RecordType represents the type of the WAL/Checkpoint record.

// RecordType values, assigned by iota in declaration order:
// WALRecordSeries = 0, WALRecordEntriesV1 = 1, CheckpointRecord = 2,
// WALRecordEntriesV2 = 3.
// NOTE(review): these values presumably appear in persisted WAL/checkpoint
// records, so reordering or inserting entries would change their meaning;
// confirm before touching the order.
const (

	// WALRecordSeries is the type for the WAL record for series.
	WALRecordSeries RecordType = iota
	// WALRecordEntriesV1 is the type for the WAL record for samples.
	WALRecordEntriesV1
	// CheckpointRecord is the type for the Checkpoint record based on protos.
	CheckpointRecord
	// WALRecordEntriesV2 is the type for the WAL record for samples with an
	// additional counter value for use in replaying without the ordering constraint.
	WALRecordEntriesV2
)
// CurrentEntriesRec is the current type of Entries that this distribution
// writes. Loki can read in a backwards compatible manner, but will write the
// newest variant.
const CurrentEntriesRec RecordType = WALRecordEntriesV2

The current type of Entries that this distribution writes. Loki can read in a backwards compatible manner, but will write the newest variant.

type Recoverer ¶

// Recoverer is the consumer that RecoverWAL and RecoverCheckpoint feed; both
// take a WALReader and a Recoverer.
type Recoverer interface {
	// NumWorkers returns a worker count.
	// NOTE(review): presumably the recovery concurrency; confirm.
	NumWorkers() int
	// Series receives one checkpoint Series.
	Series(series *Series) error
	// SetStream receives a series record for the given user.
	SetStream(userID string, series record.RefSeries) error
	// Push receives a batch of entries for the given user.
	Push(userID string, entries RefEntries) error
	// Done returns a receive-only channel.
	// NOTE(review): presumably closed when recovery should stop; confirm.
	Done() <-chan struct{}
}

type RefEntries ¶

// RefEntries groups log entries with a series reference and a counter.
// WALRecord holds a slice of these and Recoverer.Push receives one.
type RefEntries struct {
	// NOTE(review): presumably the per-stream push counter passed to
	// WALRecord.AddEntries; confirm.
	Counter int64
	// Ref identifies the series the entries belong to.
	Ref     chunks.HeadSeriesRef
	// Entries are the log entries themselves.
	Entries []logproto.Entry
}

type RingCount ¶

// RingCount is the interface exposed by a ring implementation that allows
// counting its members. NewLimiter takes one as its ring argument.
type RingCount interface {
	// HealthyInstancesCount returns a count of healthy instances.
	HealthyInstancesCount() int
}

RingCount is the interface exposed by a ring implementation that allows counting its members.

type Series ¶

// Series is a {de,}serializable intermediate type for Series.
//
// The struct tags bind each field to a protobuf field number (1-8) and a JSON
// key, so editing a tag changes the serialized form. CheckpointWriter.Write
// and Recoverer.Series both take a *Series.
type Series struct {
	UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"`
	// post mapped fingerprint is necessary because subsequent wal writes will reference it.
	Fingerprint uint64                                              `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
	Labels      []github_com_grafana_loki_pkg_logproto.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/grafana/loki/pkg/logproto.LabelAdapter" json:"labels"`
	Chunks      []Chunk                                             `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"`
	// most recently pushed timestamp.
	To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"`
	// most recently pushed line.
	LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"`
	// highest counter value for pushes to this stream.
	// Used to skip already applied entries during WAL replay.
	EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"`
	// highest timestamp pushed to this stream.
	HighestTs time.Time `protobuf:"bytes,8,opt,name=highestTs,proto3,stdtime" json:"highestTs"`
}

Series is a {de,}serializable intermediate type for Series.

func (*Series) Descriptor ¶

func (*Series) Descriptor() ([]byte, []int)

func (*Series) Equal ¶

func (this *Series) Equal(that interface{}) bool

func (*Series) GetChunks ¶

func (m *Series) GetChunks() []Chunk

func (*Series) GetEntryCt ¶

func (m *Series) GetEntryCt() int64

func (*Series) GetFingerprint ¶

func (m *Series) GetFingerprint() uint64

func (*Series) GetHighestTs ¶

func (m *Series) GetHighestTs() time.Time

func (*Series) GetLastLine ¶

func (m *Series) GetLastLine() string

func (*Series) GetTo ¶

func (m *Series) GetTo() time.Time

func (*Series) GetUserID ¶

func (m *Series) GetUserID() string

func (*Series) GoString ¶

func (this *Series) GoString() string

func (*Series) Marshal ¶

func (m *Series) Marshal() (dAtA []byte, err error)

func (*Series) MarshalTo ¶

func (m *Series) MarshalTo(dAtA []byte) (int, error)

func (*Series) MarshalToSizedBuffer ¶

func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Series) ProtoMessage ¶

func (*Series) ProtoMessage()

func (*Series) Reset ¶

func (m *Series) Reset()

func (*Series) Size ¶

func (m *Series) Size() (n int)

func (*Series) String ¶

func (this *Series) String() string

func (*Series) Unmarshal ¶

func (m *Series) Unmarshal(dAtA []byte) error

func (*Series) XXX_DiscardUnknown ¶

func (m *Series) XXX_DiscardUnknown()

func (*Series) XXX_Marshal ¶

func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Series) XXX_Merge ¶

func (m *Series) XXX_Merge(src proto.Message)

func (*Series) XXX_Size ¶

func (m *Series) XXX_Size() int

func (*Series) XXX_Unmarshal ¶

func (m *Series) XXX_Unmarshal(b []byte) error

type SeriesIter ¶

// SeriesIter is the series source handed to NewCheckpointer.
type SeriesIter interface {
	// Count returns a number of series.
	// NOTE(review): presumably the total the iterator will yield; confirm.
	Count() int
	// Iter returns the unexported *streamIterator type, so SeriesIter can
	// only be implemented inside this package.
	Iter() *streamIterator
	// Stop takes no arguments and reports no error.
	Stop()
}

type SeriesWithErr ¶

// SeriesWithErr pairs a *Series with an error so both can travel as one value.
type SeriesWithErr struct {
	Err    error
	Series *Series
}

type StreamRateLimiter ¶

// StreamRateLimiter is built by NewStreamRateLimiter from a
// RateLimiterStrategy, a tenant and a recheck period. Its exported method is
// AllowN.
type StreamRateLimiter struct {
	// contains filtered or unexported fields
}

func NewStreamRateLimiter ¶

func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter

func (*StreamRateLimiter) AllowN ¶

func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool

type TailServer ¶

// TailServer is a stream-like interface that sends tail responses and
// exposes a context.
// NOTE(review): presumably the subset of logproto.Querier_TailServer used
// when tailing; confirm.
type TailServer interface {
	// Send sends one tail response.
	Send(*logproto.TailResponse) error
	// Context returns the context associated with the stream.
	Context() context.Context
}

type WAL ¶

// WAL interface allows us to have a no-op WAL when the WAL is disabled.
type WAL interface {
	// Start takes no arguments and reports no error.
	Start()
	// Log marshals the record and writes it into the WAL.
	Log(*WALRecord) error
	// Stop stops all the WAL operations.
	Stop() error
}

WAL interface allows us to have a no-op WAL when the WAL is disabled.

type WALCheckpointWriter ¶

// WALCheckpointWriter has Advance, Write and Close methods whose signatures
// match the CheckpointWriter interface.
type WALCheckpointWriter struct {
	// contains filtered or unexported fields
}

func (*WALCheckpointWriter) Advance ¶

func (w *WALCheckpointWriter) Advance() (bool, error)

func (*WALCheckpointWriter) Close ¶

func (w *WALCheckpointWriter) Close(abort bool) error

func (*WALCheckpointWriter) Write ¶

func (w *WALCheckpointWriter) Write(s *Series) error

type WALConfig ¶

// WALConfig holds the WAL settings; it is the type of Config.WAL. Each
// field's yaml tag is the key it is read from. RegisterFlags and Validate are
// defined on *WALConfig.
type WALConfig struct {
	Enabled             bool             `yaml:"enabled"`
	Dir                 string           `yaml:"dir"`
	CheckpointDuration  time.Duration    `yaml:"checkpoint_duration"`
	FlushOnShutdown     bool             `yaml:"flush_on_shutdown"`
	// ReplayMemoryCeiling is a byte size, not a plain integer.
	ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"`
}

func (*WALConfig) RegisterFlags ¶

func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to config this to the given FlagSet

func (*WALConfig) Validate ¶

func (cfg *WALConfig) Validate() error

type WALReader ¶

// WALReader is the record source passed to RecoverWAL and RecoverCheckpoint.
type WALReader interface {
	// Next reports a bool.
	// NOTE(review): presumably whether another record is available; confirm.
	Next() bool
	// Err returns an error value.
	// NOTE(review): presumably the error that stopped iteration; confirm.
	Err() error
	// Record should not be used across multiple calls to Next()
	Record() []byte
}

type WALRecord ¶

// WALRecord is a struct combining the series and samples record. WAL.Log
// takes a *WALRecord; AddEntries, IsEmpty and Reset are defined on it.
type WALRecord struct {
	UserID string
	// Series holds the series records.
	Series []record.RefSeries

	// RefEntries holds the entry batches, each carrying its own series
	// reference and counter.
	RefEntries []RefEntries
	// contains filtered or unexported fields
}

WALRecord is a struct combining the series and samples record.

func (*WALRecord) AddEntries ¶

func (r *WALRecord) AddEntries(fp uint64, counter int64, entries ...logproto.Entry)

func (*WALRecord) IsEmpty ¶

func (r *WALRecord) IsEmpty() bool

func (*WALRecord) Reset ¶

func (r *WALRecord) Reset()

type Wrapper ¶

// Wrapper takes an Interface and returns an Interface. Config.Wrapper is
// documented as an optional wrapper that can be used to modify the behaviour
// of the ingester.
type Wrapper interface {
	// Wrap returns the Interface to use in place of wrapped.
	Wrap(wrapped Interface) Interface
}

Directories ¶

Path Synopsis
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL