Documentation ¶
Index ¶
- Constants
- Variables
- func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error
- func RecoverWAL(reader WALReader, recoverer Recoverer) error
- type CheckpointWriter
- type Checkpointer
- type Chunk
- func (*Chunk) Descriptor() ([]byte, []int)
- func (this *Chunk) Equal(that interface{}) bool
- func (m *Chunk) GetClosed() bool
- func (m *Chunk) GetData() []byte
- func (m *Chunk) GetFlushedAt() time.Time
- func (m *Chunk) GetFrom() time.Time
- func (m *Chunk) GetHead() []byte
- func (m *Chunk) GetLastUpdated() time.Time
- func (m *Chunk) GetSynced() bool
- func (m *Chunk) GetTo() time.Time
- func (this *Chunk) GoString() string
- func (m *Chunk) Marshal() (dAtA []byte, err error)
- func (m *Chunk) MarshalTo(dAtA []byte) (int, error)
- func (m *Chunk) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Chunk) ProtoMessage()
- func (m *Chunk) Reset()
- func (m *Chunk) Size() (n int)
- func (this *Chunk) String() string
- func (m *Chunk) Unmarshal(dAtA []byte) error
- func (m *Chunk) XXX_DiscardUnknown()
- func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Chunk) XXX_Merge(src proto.Message)
- func (m *Chunk) XXX_Size() int
- func (m *Chunk) XXX_Unmarshal(b []byte) error
- type ChunkStore
- type Config
- type Flusher
- type Ingester
- func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error)
- func (i *Ingester) CheckReady(ctx context.Context) error
- func (i *Ingester) Flush()
- func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)
- func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error)
- func (i *Ingester) GetOrCreateInstance(instanceID string) *instance
- func (i *Ingester) InitFlushQueues()
- func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
- func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
- func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error
- func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, ...) error
- func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
- func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer)
- func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error
- func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error)
- func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error
- func (i *Ingester) TransferOut(ctx context.Context) error
- func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error
- type Interface
- type Limiter
- type NoopWALReader
- type OnceSwitch
- type QuerierQueryServer
- type RateLimiterStrategy
- type RecordType
- type Recoverer
- type RefEntries
- type RingCount
- type Series
- func (*Series) Descriptor() ([]byte, []int)
- func (this *Series) Equal(that interface{}) bool
- func (m *Series) GetChunks() []Chunk
- func (m *Series) GetEntryCt() int64
- func (m *Series) GetFingerprint() uint64
- func (m *Series) GetHighestTs() time.Time
- func (m *Series) GetLastLine() string
- func (m *Series) GetTo() time.Time
- func (m *Series) GetUserID() string
- func (this *Series) GoString() string
- func (m *Series) Marshal() (dAtA []byte, err error)
- func (m *Series) MarshalTo(dAtA []byte) (int, error)
- func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Series) ProtoMessage()
- func (m *Series) Reset()
- func (m *Series) Size() (n int)
- func (this *Series) String() string
- func (m *Series) Unmarshal(dAtA []byte) error
- func (m *Series) XXX_DiscardUnknown()
- func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Series) XXX_Merge(src proto.Message)
- func (m *Series) XXX_Size() int
- func (m *Series) XXX_Unmarshal(b []byte) error
- type SeriesIter
- type SeriesWithErr
- type StreamRateLimiter
- type TailServer
- type WAL
- type WALCheckpointWriter
- type WALConfig
- type WALReader
- type WALRecord
- type Wrapper
Constants ¶
const (
// RingKey is the key under which we store the ingesters ring in the KVStore.
RingKey = "ring"
)
Variables ¶
var ( ErrInvalidLengthCheckpoint = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowCheckpoint = fmt.Errorf("proto: integer overflow") )
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
var (
ErrReadOnly = errors.New("Ingester is shutting down")
)
ErrReadOnly is returned when the ingester is shutting down and a push was attempted.
Functions ¶
func RecoverCheckpoint ¶
func RecoverWAL ¶
Types ¶
type CheckpointWriter ¶
type Checkpointer ¶
type Checkpointer struct {
// contains filtered or unexported fields
}
func NewCheckpointer ¶
func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, metrics *ingesterMetrics, quit <-chan struct{}) *Checkpointer
func (*Checkpointer) PerformCheckpoint ¶
func (c *Checkpointer) PerformCheckpoint() (err error)
func (*Checkpointer) Run ¶
func (c *Checkpointer) Run()
type Chunk ¶
type Chunk struct { From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"` To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"` FlushedAt time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"` LastUpdated time.Time `protobuf:"bytes,4,opt,name=lastUpdated,proto3,stdtime" json:"lastUpdated"` Closed bool `protobuf:"varint,5,opt,name=closed,proto3" json:"closed,omitempty"` Synced bool `protobuf:"varint,6,opt,name=synced,proto3" json:"synced,omitempty"` // data to be unmarshaled into a MemChunk Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"` // data to be unmarshaled into a MemChunk's headBlock Head []byte `protobuf:"bytes,8,opt,name=head,proto3" json:"head,omitempty"` }
Chunk is a {de,}serializable intermediate type for chunkDesc which allows efficient loading/unloading to disk during WAL checkpoint recovery.
func (*Chunk) Descriptor ¶
func (*Chunk) GetFlushedAt ¶
func (*Chunk) GetLastUpdated ¶
func (*Chunk) ProtoMessage ¶
func (*Chunk) ProtoMessage()
func (*Chunk) XXX_DiscardUnknown ¶
func (m *Chunk) XXX_DiscardUnknown()
func (*Chunk) XXX_Marshal ¶
func (*Chunk) XXX_Unmarshal ¶
type ChunkStore ¶
type ChunkStore interface { Put(ctx context.Context, chunks []chunk.Chunk) error SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) GetSchemaConfigs() []chunk.PeriodConfig }
ChunkStore is the interface we need to store chunks.
type Config ¶
type Config struct { LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` // Config for transferring chunks. MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"` ConcurrentFlushes int `yaml:"concurrent_flushes"` FlushCheckPeriod time.Duration `yaml:"flush_check_period"` FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` RetainPeriod time.Duration `yaml:"chunk_retain_period"` MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` BlockSize int `yaml:"chunk_block_size"` TargetChunkSize int `yaml:"chunk_target_size"` ChunkEncoding string `yaml:"chunk_encoding"` MaxChunkAge time.Duration `yaml:"max_chunk_age"` AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"` // Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments. SyncPeriod time.Duration `yaml:"sync_period"` SyncMinUtilization float64 `yaml:"sync_min_utilization"` MaxReturnedErrors int `yaml:"max_returned_stream_errors"` QueryStore bool `yaml:"-"` QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"` WAL WALConfig `yaml:"wal,omitempty"` ChunkFilterer storage.RequestChunkFilterer `yaml:"-"` // Optional wrapper that can be used to modify the behaviour of the ingester Wrapper Wrapper `yaml:"-"` IndexShards int `yaml:"index_shards"` MaxDroppedStreams int `yaml:"max_dropped_streams"` // contains filtered or unexported fields }
Config for an ingester.
func (*Config) RegisterFlags ¶
RegisterFlags registers the flags.
type Ingester ¶
Ingester builds chunks for incoming log streams.
func New ¶
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error)
New makes a new Ingester.
func (*Ingester) Check ¶
func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error)
Check implements grpc_health_v1.HealthCheck.
func (*Ingester) CheckReady ¶
CheckReady is used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester. The readiness endpoint returns 204 when the ingester is ready, 500 otherwise.
func (*Ingester) Flush ¶
func (i *Ingester) Flush()
Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.
func (*Ingester) FlushHandler ¶
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)
FlushHandler triggers a flush of all in-memory chunks. Mainly used for local testing.
func (*Ingester) GetChunkIDs ¶
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error)
GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.
func (*Ingester) GetOrCreateInstance ¶
func (*Ingester) InitFlushQueues ¶
func (i *Ingester) InitFlushQueues()
Note: this is called both during the WAL replay (zero or more times) and then after replay as well.
func (*Ingester) Label ¶
func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
Label returns the set of labels for the stream this ingester knows about.
func (*Ingester) Push ¶
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
Push implements logproto.Pusher.
func (*Ingester) Query ¶
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error
Query the ingesters for log streams matching a set of matchers.
func (*Ingester) QuerySample ¶
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error
QuerySample the ingesters for series from logs matching a set of matchers.
func (*Ingester) Series ¶
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
Series queries the ingester for log stream identifiers (label sets) matching a set of matchers.
func (*Ingester) SetChunkFilterer ¶
func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer)
func (*Ingester) ShutdownHandler ¶
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
ShutdownHandler triggers the following set of operations in order:
- Change the state of ring to stop accepting writes.
- Flush all the chunks.
func (*Ingester) Tail ¶
func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error
Tail logs matching the given query.
func (*Ingester) TailersCount ¶
func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error)
TailersCount returns the count of active tail requests from a user.
func (*Ingester) TransferChunks ¶
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error
TransferChunks receives all chunks from another ingester. The Ingester must be in PENDING state or else the call will fail.
func (*Ingester) TransferOut ¶
TransferOut implements ring.Lifecycler.
func (*Ingester) Watch ¶
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error
Watch implements grpc_health_v1.HealthCheck.
type Interface ¶
type Interface interface { services.Service logproto.IngesterServer logproto.PusherServer logproto.QuerierServer CheckReady(ctx context.Context) error FlushHandler(w http.ResponseWriter, _ *http.Request) ShutdownHandler(w http.ResponseWriter, r *http.Request) GetOrCreateInstance(instanceID string) *instance }
Interface is an interface for the Ingester
type Limiter ¶
type Limiter struct {
// contains filtered or unexported fields
}
Limiter implements primitives to get the maximum number of streams an ingester can handle for a specific tenant.
func NewLimiter ¶
func NewLimiter(limits *validation.Overrides, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter
NewLimiter makes a new limiter
func (*Limiter) AssertMaxStreamsPerUser ¶
AssertMaxStreamsPerUser checks the per-user stream limit against the current number of streams given as input and returns an error if the limit has been reached.
func (*Limiter) DisableForWALReplay ¶
func (l *Limiter) DisableForWALReplay()
func (*Limiter) UnorderedWrites ¶
type NoopWALReader ¶
type NoopWALReader struct{}
func (NoopWALReader) Close ¶
func (NoopWALReader) Close() error
func (NoopWALReader) Err ¶
func (NoopWALReader) Err() error
func (NoopWALReader) Next ¶
func (NoopWALReader) Next() bool
func (NoopWALReader) Record ¶
func (NoopWALReader) Record() []byte
type OnceSwitch ¶
type OnceSwitch struct {
// contains filtered or unexported fields
}
OnceSwitch is a switch optimized for use in a concurrent environment that can only ever be switched "on".
func (*OnceSwitch) Get ¶
func (o *OnceSwitch) Get() bool
func (*OnceSwitch) Trigger ¶
func (o *OnceSwitch) Trigger()
func (*OnceSwitch) TriggerAnd ¶
func (o *OnceSwitch) TriggerAnd(fn func())
TriggerAnd will ensure the switch is on and run the provided function if the switch was not already toggled on.
type QuerierQueryServer ¶
type QuerierQueryServer interface { Context() context.Context Send(res *logproto.QueryResponse) error }
QuerierQueryServer is the gRPC server stream we use to send batches of entries.
type RateLimiterStrategy ¶
type RateLimiterStrategy interface {
RateLimit(tenant string) validation.RateLimit
}
type RecordType ¶
type RecordType byte
RecordType represents the type of the WAL/Checkpoint record.
const ( // WALRecordSeries is the type for the WAL record for series. WALRecordSeries RecordType = iota // WALRecordEntriesV1 is the type for the WAL record for samples. WALRecordEntriesV1 // CheckpointRecord is the type for the Checkpoint record based on protos. CheckpointRecord // WALRecordEntriesV2 is the type for the WAL record for samples with an // additional counter value for use in replaying without the ordering constraint. WALRecordEntriesV2 )
const CurrentEntriesRec RecordType = WALRecordEntriesV2
CurrentEntriesRec is the current type of Entries record that this distribution writes. Loki can read older record types in a backwards-compatible manner, but will always write the newest variant.
type RefEntries ¶
type RefEntries struct { Counter int64 Ref chunks.HeadSeriesRef Entries []logproto.Entry }
type RingCount ¶
type RingCount interface {
HealthyInstancesCount() int
}
RingCount is the interface exposed by a ring implementation that allows counting its members.
type Series ¶
type Series struct { UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` // post mapped fingerprint is necessary because subsequent wal writes will reference it. Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Labels []github_com_grafana_loki_pkg_logproto.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/pao214/loki/v2/pkg/logproto.LabelAdapter" json:"labels"` Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"` // most recently pushed timestamp. To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"` // most recently pushed line. LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"` // highest counter value for pushes to this stream. // Used to skip already applied entries during WAL replay. EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"` // highest timestamp pushed to this stream. HighestTs time.Time `protobuf:"bytes,8,opt,name=highestTs,proto3,stdtime" json:"highestTs"` }
Series is a {de,}serializable intermediate type for Series.
func (*Series) Descriptor ¶
func (*Series) GetEntryCt ¶
func (*Series) GetFingerprint ¶
func (*Series) GetHighestTs ¶
func (*Series) GetLastLine ¶
func (*Series) MarshalToSizedBuffer ¶
func (*Series) ProtoMessage ¶
func (*Series) ProtoMessage()
func (*Series) XXX_DiscardUnknown ¶
func (m *Series) XXX_DiscardUnknown()
func (*Series) XXX_Marshal ¶
func (*Series) XXX_Unmarshal ¶
type SeriesIter ¶
type SeriesIter interface { Count() int Iter() *streamIterator Stop() }
type SeriesWithErr ¶
type StreamRateLimiter ¶
type StreamRateLimiter struct {
// contains filtered or unexported fields
}
func NewStreamRateLimiter ¶
func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter
type TailServer ¶
type TailServer interface { Send(*logproto.TailResponse) error Context() context.Context }
type WAL ¶
type WAL interface { Start() // Log marshals the record and writes it into the WAL. Log(*WALRecord) error // Stop stops all the WAL operations. Stop() error }
WAL interface allows us to have a no-op WAL when the WAL is disabled.
type WALCheckpointWriter ¶
type WALCheckpointWriter struct {
// contains filtered or unexported fields
}
func (*WALCheckpointWriter) Advance ¶
func (w *WALCheckpointWriter) Advance() (bool, error)
func (*WALCheckpointWriter) Close ¶
func (w *WALCheckpointWriter) Close(abort bool) error
func (*WALCheckpointWriter) Write ¶
func (w *WALCheckpointWriter) Write(s *Series) error
type WALConfig ¶
type WALConfig struct { Enabled bool `yaml:"enabled"` Dir string `yaml:"dir"` CheckpointDuration time.Duration `yaml:"checkpoint_duration"` FlushOnShutdown bool `yaml:"flush_on_shutdown"` ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"` }
func (*WALConfig) RegisterFlags ¶
RegisterFlags adds the flags required to configure this to the given FlagSet.
type WALRecord ¶
type WALRecord struct { UserID string Series []record.RefSeries RefEntries []RefEntries // contains filtered or unexported fields }
WALRecord is a struct combining the series and samples record.
func (*WALRecord) AddEntries ¶
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
index | originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries. |