package ingester

v2.4.3 · Published: Mar 21, 2022 · License: AGPL-3.0 · Imports: 71 · Imported by: 0

Documentation

Index

Constants

const (
	// RingKey is the key under which we store the ingesters ring in the KVStore.
	RingKey = "ring"
)

Variables

var (
	ErrInvalidLengthCheckpoint = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowCheckpoint   = fmt.Errorf("proto: integer overflow")
)

var ErrEntriesExist = errors.New("duplicate push - entries already exist")

var (
	ErrReadOnly = errors.New("Ingester is shutting down")
)

ErrReadOnly is returned when the ingester is shutting down and a push was attempted.
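A minimal sketch of handling these sentinel errors on the write path, assuming this package is imported as ingester and err came from a push to it (handlePushError is a hypothetical helper):

// handlePushError reacts to the sentinel push errors defined above.
func handlePushError(err error) {
	switch {
	case err == nil:
		// push succeeded
	case errors.Is(err, ingester.ErrReadOnly):
		// the ingester is shutting down; the caller may retry against another replica
	case errors.Is(err, ingester.ErrEntriesExist):
		// duplicate push; the entries already exist, so this is usually safe to ignore
	default:
		// any other failure
	}
}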

Functions

func RecoverCheckpoint

func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error

func RecoverWAL

func RecoverWAL(reader WALReader, recoverer Recoverer) error
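A hedged sketch of how these two helpers fit together during replay: recover from the last checkpoint first, then apply the WAL segments written after it (the readers and recoverer are assumed to be provided by the caller):

// replay is a hypothetical helper wiring the two recovery functions together.
func replay(checkpointReader, segmentReader ingester.WALReader, rec ingester.Recoverer) error {
	// Rebuild in-memory series state from the most recent checkpoint.
	if err := ingester.RecoverCheckpoint(checkpointReader, rec); err != nil {
		return err
	}
	// Apply the WAL records written since that checkpoint.
	return ingester.RecoverWAL(segmentReader, rec)
}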

Types

type CheckpointWriter

type CheckpointWriter interface {
	// Advances current checkpoint, can also signal a no-op.
	Advance() (noop bool, err error)
	Write(*Series) error
	// Closes current checkpoint.
	Close(abort bool) error
}
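A minimal sketch of driving a CheckpointWriter, based only on the interface above (writeCheckpoint is a hypothetical helper):

// writeCheckpoint advances to a new checkpoint, writes every series into it,
// and commits it, aborting on the first write error.
func writeCheckpoint(w ingester.CheckpointWriter, series []*ingester.Series) error {
	noop, err := w.Advance()
	if err != nil {
		return err
	}
	if noop {
		return nil // nothing to checkpoint this round
	}
	for _, s := range series {
		if err := w.Write(s); err != nil {
			_ = w.Close(true) // abort the partial checkpoint
			return err
		}
	}
	return w.Close(false) // commit the checkpoint
}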

type Checkpointer

type Checkpointer struct {
	// contains filtered or unexported fields
}

func NewCheckpointer

func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, metrics *ingesterMetrics, quit <-chan struct{}) *Checkpointer

func (*Checkpointer) PerformCheckpoint

func (c *Checkpointer) PerformCheckpoint() (err error)

func (*Checkpointer) Run

func (c *Checkpointer) Run()

type Chunk

type Chunk struct {
	From        time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"`
	To          time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"`
	FlushedAt   time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"`
	LastUpdated time.Time `protobuf:"bytes,4,opt,name=lastUpdated,proto3,stdtime" json:"lastUpdated"`
	Closed      bool      `protobuf:"varint,5,opt,name=closed,proto3" json:"closed,omitempty"`
	Synced      bool      `protobuf:"varint,6,opt,name=synced,proto3" json:"synced,omitempty"`
	// data to be unmarshaled into a MemChunk
	Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"`
	// data to be unmarshaled into a MemChunk's headBlock
	Head []byte `protobuf:"bytes,8,opt,name=head,proto3" json:"head,omitempty"`
}

Chunk is a {de,}serializable intermediate type for chunkDesc which allows efficient loading/unloading to disk during WAL checkpoint recovery.

func (*Chunk) Descriptor

func (*Chunk) Descriptor() ([]byte, []int)

func (*Chunk) Equal

func (this *Chunk) Equal(that interface{}) bool

func (*Chunk) GetClosed

func (m *Chunk) GetClosed() bool

func (*Chunk) GetData

func (m *Chunk) GetData() []byte

func (*Chunk) GetFlushedAt

func (m *Chunk) GetFlushedAt() time.Time

func (*Chunk) GetFrom

func (m *Chunk) GetFrom() time.Time

func (*Chunk) GetHead

func (m *Chunk) GetHead() []byte

func (*Chunk) GetLastUpdated

func (m *Chunk) GetLastUpdated() time.Time

func (*Chunk) GetSynced

func (m *Chunk) GetSynced() bool

func (*Chunk) GetTo

func (m *Chunk) GetTo() time.Time

func (*Chunk) GoString

func (this *Chunk) GoString() string

func (*Chunk) Marshal

func (m *Chunk) Marshal() (dAtA []byte, err error)

func (*Chunk) MarshalTo

func (m *Chunk) MarshalTo(dAtA []byte) (int, error)

func (*Chunk) MarshalToSizedBuffer

func (m *Chunk) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Chunk) ProtoMessage

func (*Chunk) ProtoMessage()

func (*Chunk) Reset

func (m *Chunk) Reset()

func (*Chunk) Size

func (m *Chunk) Size() (n int)

func (*Chunk) String

func (this *Chunk) String() string

func (*Chunk) Unmarshal

func (m *Chunk) Unmarshal(dAtA []byte) error

func (*Chunk) XXX_DiscardUnknown

func (m *Chunk) XXX_DiscardUnknown()

func (*Chunk) XXX_Marshal

func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Chunk) XXX_Merge

func (m *Chunk) XXX_Merge(src proto.Message)

func (*Chunk) XXX_Size

func (m *Chunk) XXX_Size() int

func (*Chunk) XXX_Unmarshal

func (m *Chunk) XXX_Unmarshal(b []byte) error

type ChunkStore

type ChunkStore interface {
	Put(ctx context.Context, chunks []chunk.Chunk) error
	SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
	SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error)
	GetSchemaConfigs() []chunk.PeriodConfig
}

ChunkStore is the interface we need to store chunks.

type Config

type Config struct {
	LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

	// Config for transferring chunks.
	MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`

	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
	FlushCheckPeriod  time.Duration `yaml:"flush_check_period"`
	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout"`
	RetainPeriod      time.Duration `yaml:"chunk_retain_period"`
	MaxChunkIdle      time.Duration `yaml:"chunk_idle_period"`
	BlockSize         int           `yaml:"chunk_block_size"`
	TargetChunkSize   int           `yaml:"chunk_target_size"`
	ChunkEncoding     string        `yaml:"chunk_encoding"`

	MaxChunkAge         time.Duration `yaml:"max_chunk_age"`
	AutoForgetUnhealthy bool          `yaml:"autoforget_unhealthy"`

	// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
	SyncPeriod         time.Duration `yaml:"sync_period"`
	SyncMinUtilization float64       `yaml:"sync_min_utilization"`

	MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

	QueryStore                  bool          `yaml:"-"`
	QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`

	WAL WALConfig `yaml:"wal,omitempty"`

	ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`
	// Optional wrapper that can be used to modify the behaviour of the ingester
	Wrapper Wrapper `yaml:"-"`

	IndexShards int `yaml:"index_shards"`

	MaxDroppedStreams int `yaml:"max_dropped_streams"`
	// contains filtered or unexported fields
}

Config for an ingester.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

RegisterFlags registers the flags.

func (*Config) Validate

func (cfg *Config) Validate() error
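A minimal sketch of wiring the config into a flag.FlagSet and validating it (loadIngesterConfig is a hypothetical helper):

// loadIngesterConfig registers the ingester flags, parses the given
// arguments, and validates the resulting configuration.
func loadIngesterConfig(args []string) (*ingester.Config, error) {
	var cfg ingester.Config
	fs := flag.NewFlagSet("ingester", flag.ContinueOnError)
	cfg.RegisterFlags(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return &cfg, nil
}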

type Flusher

type Flusher interface {
	Flush()
}

type Ingester

type Ingester struct {
	services.Service
	// contains filtered or unexported fields
}

Ingester builds chunks for incoming log streams.

func New

func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error)

New makes a new Ingester.

func (*Ingester) Check

Check implements grpc_health_v1.HealthCheck.

func (*Ingester) CheckReady

func (i *Ingester) CheckReady(ctx context.Context) error

CheckReady is used to indicate to Kubernetes whether the ingester is ready for the addition or removal of another ingester; the readiness handler built on it returns 204 when the ingester is ready, 500 otherwise.
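A hedged sketch of exposing CheckReady as an HTTP readiness endpoint with the status codes described above (registerReadiness is a hypothetical helper):

// registerReadiness serves 204 when the ingester is ready and 500 otherwise.
func registerReadiness(mux *http.ServeMux, ing *ingester.Ingester) {
	mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		if err := ing.CheckReady(r.Context()); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
}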

func (*Ingester) Flush

func (i *Ingester) Flush()

Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.

func (*Ingester) FlushHandler

func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)

FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.

func (*Ingester) GetChunkIDs

GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.

func (*Ingester) GetOrCreateInstance

func (i *Ingester) GetOrCreateInstance(instanceID string) *instance

func (*Ingester) InitFlushQueues

func (i *Ingester) InitFlushQueues()

Note: InitFlushQueues is called both during WAL replay (zero or more times) and again after replay.

func (*Ingester) Label

Label returns the set of labels for the streams this ingester knows about.

func (*Ingester) Push

Push implements logproto.Pusher.

func (*Ingester) Query

func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error

Query the ingesters for log streams matching a set of matchers.

func (*Ingester) QuerySample

func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error

QuerySample queries the ingesters for samples from logs matching a set of matchers.

func (*Ingester) Series

Series queries the ingester for log stream identifiers (label sets) matching a set of matchers.

func (*Ingester) SetChunkFilterer

func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer)

func (*Ingester) ShutdownHandler

func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)

ShutdownHandler triggers the following set of operations in order:

  • Change the state of ring to stop accepting writes.
  • Flush all the chunks.

func (*Ingester) Tail

func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error

Tail streams the logs matching the given query.

func (*Ingester) TailersCount

TailersCount returns the number of active tail requests from a user.

func (*Ingester) TransferChunks

func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error

TransferChunks receives all chunks from another ingester. The Ingester must be in PENDING state or else the call will fail.

func (*Ingester) TransferOut

func (i *Ingester) TransferOut(ctx context.Context) error

TransferOut implements ring.Lifecycler.

func (*Ingester) Watch

Watch implements grpc_health_v1.HealthCheck.

type Interface

type Interface interface {
	services.Service

	logproto.IngesterServer
	logproto.PusherServer
	logproto.QuerierServer
	CheckReady(ctx context.Context) error
	FlushHandler(w http.ResponseWriter, _ *http.Request)
	ShutdownHandler(w http.ResponseWriter, r *http.Request)
	GetOrCreateInstance(instanceID string) *instance
}

Interface is an interface for the Ingester.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter implements primitives to get the maximum number of streams an ingester can handle for a specific tenant.

func NewLimiter

func NewLimiter(limits *validation.Overrides, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter

NewLimiter makes a new limiter

func (*Limiter) AssertMaxStreamsPerUser

func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error

AssertMaxStreamsPerUser checks that the per-tenant stream limit has not been reached given the current number of streams, and returns an error if it has.

func (*Limiter) DisableForWALReplay

func (l *Limiter) DisableForWALReplay()

func (*Limiter) Enable

func (l *Limiter) Enable()

func (*Limiter) RateLimit

func (l *Limiter) RateLimit(tenant string) validation.RateLimit

func (*Limiter) UnorderedWrites

func (l *Limiter) UnorderedWrites(userID string) bool
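A hedged sketch of the limiter life cycle these methods suggest: disable enforcement while replaying the WAL, re-enable it afterwards, then assert the per-tenant stream limit when new streams are created (replayThenEnforce is a hypothetical helper):

// replayThenEnforce relaxes the limiter during WAL replay and enforces the
// per-tenant stream limit again afterwards.
func replayThenEnforce(l *ingester.Limiter, userID string, currentStreams int) error {
	l.DisableForWALReplay() // do not enforce limits while rebuilding state
	// ... replay WAL records into memory ...
	l.Enable()

	// Enforce the per-tenant stream limit for newly created streams.
	return l.AssertMaxStreamsPerUser(userID, currentStreams)
}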

type NoopWALReader

type NoopWALReader struct{}

func (NoopWALReader) Close

func (NoopWALReader) Close() error

func (NoopWALReader) Err

func (NoopWALReader) Err() error

func (NoopWALReader) Next

func (NoopWALReader) Next() bool

func (NoopWALReader) Record

func (NoopWALReader) Record() []byte

type OnceSwitch

type OnceSwitch struct {
	// contains filtered or unexported fields
}

OnceSwitch is an optimized switch that can only ever be switched "on" in a concurrent environment.

func (*OnceSwitch) Get

func (o *OnceSwitch) Get() bool

func (*OnceSwitch) Trigger

func (o *OnceSwitch) Trigger()

func (*OnceSwitch) TriggerAnd

func (o *OnceSwitch) TriggerAnd(fn func())

TriggerAnd will ensure the switch is on and run the provided function if the switch was not already toggled on.
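A small sketch of OnceSwitch usage, assuming its zero value is ready to use (demoOnceSwitch is a hypothetical helper):

// demoOnceSwitch shows the trigger-once behaviour.
func demoOnceSwitch() {
	var flushed ingester.OnceSwitch

	flushed.TriggerAnd(func() {
		// runs only because the switch was not already on
	})
	flushed.Trigger() // later triggers simply keep the switch on

	if flushed.Get() {
		// the switch has been turned on at least once
	}
}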

type QuerierQueryServer

type QuerierQueryServer interface {
	Context() context.Context
	Send(res *logproto.QueryResponse) error
}

QuerierQueryServer is the gRPC server stream we use to send batches of entries.

type RateLimiterStrategy

type RateLimiterStrategy interface {
	RateLimit(tenant string) validation.RateLimit
}

type RecordType

type RecordType byte

RecordType represents the type of the WAL/Checkpoint record.

const (

	// WALRecordSeries is the type for the WAL record for series.
	WALRecordSeries RecordType = iota
	// WALRecordEntriesV1 is the type for the WAL record for samples.
	WALRecordEntriesV1
	// CheckpointRecord is the type for the Checkpoint record based on protos.
	CheckpointRecord
	// WALRecordEntriesV2 is the type for the WAL record for samples with an
	// additional counter value for use in replaying without the ordering constraint.
	WALRecordEntriesV2
)
const CurrentEntriesRec RecordType = WALRecordEntriesV2

CurrentEntriesRec is the current entries record type that this distribution writes. Loki can read older variants for backwards compatibility, but always writes the newest one.
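A hedged sketch of dispatching on the record type during replay, assuming, as is common for this kind of WAL format, that the first byte of an encoded record carries its RecordType (decodeRecord is a hypothetical helper):

// decodeRecord branches on the record types defined above.
func decodeRecord(rec []byte) error {
	if len(rec) == 0 {
		return nil
	}
	switch ingester.RecordType(rec[0]) {
	case ingester.WALRecordSeries:
		// decode series references
	case ingester.WALRecordEntriesV1, ingester.WALRecordEntriesV2:
		// decode entries; V2 additionally carries a counter used during replay
	case ingester.CheckpointRecord:
		// decode a proto-encoded checkpoint Series
	}
	return nil
}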

type Recoverer

type Recoverer interface {
	NumWorkers() int
	Series(series *Series) error
	SetStream(userID string, series record.RefSeries) error
	Push(userID string, entries RefEntries) error
	Done() <-chan struct{}
}

type RefEntries

type RefEntries struct {
	Counter int64
	Ref     chunks.HeadSeriesRef
	Entries []logproto.Entry
}

type RingCount

type RingCount interface {
	HealthyInstancesCount() int
}

RingCount is the interface exposed by a ring implementation which allows counting its healthy members.

type Series

type Series struct {
	UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"`
	// The post-mapped fingerprint is necessary because subsequent WAL writes will reference it.
	Fingerprint uint64                                              `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
	Labels      []github_com_grafana_loki_pkg_logproto.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/pao214/loki/v2/pkg/logproto.LabelAdapter" json:"labels"`
	Chunks      []Chunk                                             `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"`
	// most recently pushed timestamp.
	To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"`
	// most recently pushed line.
	LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"`
	// highest counter value for pushes to this stream.
	// Used to skip already applied entries during WAL replay.
	EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"`
	// highest timestamp pushed to this stream.
	HighestTs time.Time `protobuf:"bytes,8,opt,name=highestTs,proto3,stdtime" json:"highestTs"`
}

Series is a {de,}serializable intermediate type for a stream's series state, used during WAL checkpointing and recovery.

func (*Series) Descriptor

func (*Series) Descriptor() ([]byte, []int)

func (*Series) Equal

func (this *Series) Equal(that interface{}) bool

func (*Series) GetChunks

func (m *Series) GetChunks() []Chunk

func (*Series) GetEntryCt

func (m *Series) GetEntryCt() int64

func (*Series) GetFingerprint

func (m *Series) GetFingerprint() uint64

func (*Series) GetHighestTs

func (m *Series) GetHighestTs() time.Time

func (*Series) GetLastLine

func (m *Series) GetLastLine() string

func (*Series) GetTo

func (m *Series) GetTo() time.Time

func (*Series) GetUserID

func (m *Series) GetUserID() string

func (*Series) GoString

func (this *Series) GoString() string

func (*Series) Marshal

func (m *Series) Marshal() (dAtA []byte, err error)

func (*Series) MarshalTo

func (m *Series) MarshalTo(dAtA []byte) (int, error)

func (*Series) MarshalToSizedBuffer

func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Series) ProtoMessage

func (*Series) ProtoMessage()

func (*Series) Reset

func (m *Series) Reset()

func (*Series) Size

func (m *Series) Size() (n int)

func (*Series) String

func (this *Series) String() string

func (*Series) Unmarshal

func (m *Series) Unmarshal(dAtA []byte) error

func (*Series) XXX_DiscardUnknown

func (m *Series) XXX_DiscardUnknown()

func (*Series) XXX_Marshal

func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Series) XXX_Merge

func (m *Series) XXX_Merge(src proto.Message)

func (*Series) XXX_Size

func (m *Series) XXX_Size() int

func (*Series) XXX_Unmarshal

func (m *Series) XXX_Unmarshal(b []byte) error

type SeriesIter

type SeriesIter interface {
	Count() int
	Iter() *streamIterator
	Stop()
}

type SeriesWithErr

type SeriesWithErr struct {
	Err    error
	Series *Series
}

type StreamRateLimiter

type StreamRateLimiter struct {
	// contains filtered or unexported fields
}

func NewStreamRateLimiter

func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter

func (*StreamRateLimiter) AllowN

func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool
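Limiter's RateLimit method matches the RateLimiterStrategy interface, so a Limiter can presumably back a per-stream rate limiter; a hedged sketch (the helper names and recheck period are arbitrary):

// newTenantStreamLimiter builds a per-stream rate limiter backed by the
// tenant limits held in the Limiter.
func newTenantStreamLimiter(l *ingester.Limiter, tenant string) *ingester.StreamRateLimiter {
	return ingester.NewStreamRateLimiter(l, tenant, 10*time.Second)
}

// allowEntries reports whether n more entries may be pushed right now.
func allowEntries(rl *ingester.StreamRateLimiter, n int) bool {
	return rl.AllowN(time.Now(), n)
}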

type TailServer

type TailServer interface {
	Send(*logproto.TailResponse) error
	Context() context.Context
}

type WAL

type WAL interface {
	Start()
	// Log marshals the record and writes it to the WAL.
	Log(*WALRecord) error
	// Stop stops all the WAL operations.
	Stop() error
}

WAL interface allows us to have a no-op WAL when the WAL is disabled.
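A minimal sketch of a WAL that satisfies the interface by doing nothing, roughly what a disabled WAL could look like (noopWAL is an illustration, not the package's own type):

// noopWAL is a hypothetical stand-in used when the WAL is disabled.
type noopWAL struct{}

func (noopWAL) Start()                        {}
func (noopWAL) Log(*ingester.WALRecord) error { return nil }
func (noopWAL) Stop() error                   { return nil }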

type WALCheckpointWriter

type WALCheckpointWriter struct {
	// contains filtered or unexported fields
}

func (*WALCheckpointWriter) Advance

func (w *WALCheckpointWriter) Advance() (bool, error)

func (*WALCheckpointWriter) Close

func (w *WALCheckpointWriter) Close(abort bool) error

func (*WALCheckpointWriter) Write

func (w *WALCheckpointWriter) Write(s *Series) error

type WALConfig

type WALConfig struct {
	Enabled             bool             `yaml:"enabled"`
	Dir                 string           `yaml:"dir"`
	CheckpointDuration  time.Duration    `yaml:"checkpoint_duration"`
	FlushOnShutdown     bool             `yaml:"flush_on_shutdown"`
	ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"`
}

func (*WALConfig) RegisterFlags

func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet)

RegisterFlags adds the flags required to configure the WAL to the given FlagSet.

func (*WALConfig) Validate

func (cfg *WALConfig) Validate() error
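A minimal sketch of building and validating a WAL configuration in code; the values are illustrative, not the flag defaults (buildWALConfig is a hypothetical helper):

// buildWALConfig returns a hypothetical WAL configuration and any validation error.
func buildWALConfig() (ingester.WALConfig, error) {
	cfg := ingester.WALConfig{
		Enabled:            true,
		Dir:                "/loki/wal",
		CheckpointDuration: 5 * time.Minute,
		FlushOnShutdown:    true,
	}
	return cfg, cfg.Validate()
}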

type WALReader

type WALReader interface {
	Next() bool
	Err() error
	// Record should not be used across multiple calls to Next()
	Record() []byte
}
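A small sketch of consuming a WALReader, honoring the note that a Record slice must be handled (or copied) before the next call to Next (drainWAL is a hypothetical helper):

// drainWAL passes each record to handle and returns the reader's final error.
func drainWAL(r ingester.WALReader, handle func([]byte) error) error {
	for r.Next() {
		if err := handle(r.Record()); err != nil {
			return err
		}
	}
	return r.Err()
}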

type WALRecord

type WALRecord struct {
	UserID string
	Series []record.RefSeries

	RefEntries []RefEntries
	// contains filtered or unexported fields
}

WALRecord is a struct combining the series and entries records.

func (*WALRecord) AddEntries

func (r *WALRecord) AddEntries(fp uint64, counter int64, entries ...logproto.Entry)

func (*WALRecord) IsEmpty

func (r *WALRecord) IsEmpty() bool

func (*WALRecord) Reset

func (r *WALRecord) Reset()

type Wrapper

type Wrapper interface {
	Wrap(wrapped Interface) Interface
}
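A minimal sketch of a Wrapper; a real wrapper would return a decorated Interface, while this hypothetical one passes the ingester through unchanged:

// passthroughWrapper satisfies Wrapper without modifying the ingester.
type passthroughWrapper struct{}

func (passthroughWrapper) Wrap(wrapped ingester.Interface) ingester.Interface {
	return wrapped
}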

Directories

Path	Synopsis
index	originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.
