Documentation ¶
Index ¶
- Constants
- Variables
- func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error
- func RecoverWAL(ctx context.Context, reader WALReader, recoverer Recoverer) error
- type CheckpointWriter
- type Checkpointer
- type Chunk
- func (*Chunk) Descriptor() ([]byte, []int)
- func (this *Chunk) Equal(that interface{}) bool
- func (m *Chunk) GetClosed() bool
- func (m *Chunk) GetData() []byte
- func (m *Chunk) GetFlushedAt() time.Time
- func (m *Chunk) GetFrom() time.Time
- func (m *Chunk) GetHead() []byte
- func (m *Chunk) GetLastUpdated() time.Time
- func (m *Chunk) GetSynced() bool
- func (m *Chunk) GetTo() time.Time
- func (this *Chunk) GoString() string
- func (m *Chunk) Marshal() (dAtA []byte, err error)
- func (m *Chunk) MarshalTo(dAtA []byte) (int, error)
- func (m *Chunk) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Chunk) ProtoMessage()
- func (m *Chunk) Reset()
- func (m *Chunk) Size() (n int)
- func (this *Chunk) String() string
- func (m *Chunk) Unmarshal(dAtA []byte) error
- func (m *Chunk) XXX_DiscardUnknown()
- func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Chunk) XXX_Merge(src proto.Message)
- func (m *Chunk) XXX_Size() int
- func (m *Chunk) XXX_Unmarshal(b []byte) error
- type Config
- type Flusher
- type Ingester
- func (i *Ingester) CheckReady(ctx context.Context) error
- func (i *Ingester) Flush()
- func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)
- func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error)
- func (i *Ingester) GetDetectedFields(_ context.Context, _ *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error)
- func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error)
- func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error)
- func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error)
- func (i *Ingester) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
- func (i *Ingester) InitFlushQueues()
- func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
- func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
- func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error
- func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, ...) error
- func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
- func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)
- func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper)
- func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper)
- func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
- func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error
- func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error)
- func (i *Ingester) TransferOut(_ context.Context) error
- func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error
- type Interface
- type Limiter
- type Limits
- type NoopWALReader
- type OnceSwitch
- type QuerierQueryServer
- type RateLimiterStrategy
- type Recoverer
- type RingCount
- type Series
- func (*Series) Descriptor() ([]byte, []int)
- func (this *Series) Equal(that interface{}) bool
- func (m *Series) GetChunks() []Chunk
- func (m *Series) GetEntryCt() int64
- func (m *Series) GetFingerprint() uint64
- func (m *Series) GetHighestTs() time.Time
- func (m *Series) GetLastLine() string
- func (m *Series) GetTo() time.Time
- func (m *Series) GetUserID() string
- func (this *Series) GoString() string
- func (m *Series) Marshal() (dAtA []byte, err error)
- func (m *Series) MarshalTo(dAtA []byte) (int, error)
- func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Series) ProtoMessage()
- func (m *Series) Reset()
- func (m *Series) Size() (n int)
- func (this *Series) String() string
- func (m *Series) Unmarshal(dAtA []byte) error
- func (m *Series) XXX_DiscardUnknown()
- func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Series) XXX_Merge(src proto.Message)
- func (m *Series) XXX_Size() int
- func (m *Series) XXX_Unmarshal(b []byte) error
- type SeriesIter
- type SeriesWithErr
- type Store
- type StreamRateCalculator
- type StreamRateLimiter
- type TailServer
- type WAL
- type WALCheckpointWriter
- type WALConfig
- type WALReader
- type Wrapper
Constants ¶
const ( // ShardLbName is the internal label to be used by Loki when dividing a stream into smaller pieces. // Possible values are only increasing integers starting from 0. ShardLbName = "__stream_shard__" ShardLbPlaceholder = "__placeholder__" )
const (
// RingKey is the key under which we store the ingesters ring in the KVStore.
RingKey = "ring"
)
Variables ¶
var ( ErrInvalidLengthCheckpoint = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowCheckpoint = fmt.Errorf("proto: integer overflow") )
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
var (
ErrReadOnly = errors.New("Ingester is shutting down")
)
ErrReadOnly is returned when the ingester is shutting down and a push was attempted.
Functions ¶
func RecoverCheckpoint ¶
Types ¶
type CheckpointWriter ¶
type Checkpointer ¶
type Checkpointer struct {
// contains filtered or unexported fields
}
func NewCheckpointer ¶
func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, metrics *ingesterMetrics, quit <-chan struct{}) *Checkpointer
func (*Checkpointer) PerformCheckpoint ¶
func (c *Checkpointer) PerformCheckpoint() (err error)
func (*Checkpointer) Run ¶
func (c *Checkpointer) Run()
type Chunk ¶
type Chunk struct { From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"` To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"` FlushedAt time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"` LastUpdated time.Time `protobuf:"bytes,4,opt,name=lastUpdated,proto3,stdtime" json:"lastUpdated"` Closed bool `protobuf:"varint,5,opt,name=closed,proto3" json:"closed,omitempty"` Synced bool `protobuf:"varint,6,opt,name=synced,proto3" json:"synced,omitempty"` // data to be unmarshaled into a MemChunk Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"` // data to be unmarshaled into a MemChunk's headBlock Head []byte `protobuf:"bytes,8,opt,name=head,proto3" json:"head,omitempty"` }
Chunk is a {de,}serializable intermediate type for chunkDesc which allows efficient loading/unloading to disk during WAL checkpoint recovery.
func (*Chunk) Descriptor ¶
func (*Chunk) GetFlushedAt ¶
func (*Chunk) GetLastUpdated ¶
func (*Chunk) ProtoMessage ¶
func (*Chunk) ProtoMessage()
func (*Chunk) XXX_DiscardUnknown ¶
func (m *Chunk) XXX_DiscardUnknown()
func (*Chunk) XXX_Marshal ¶
func (*Chunk) XXX_Unmarshal ¶
type Config ¶
type Config struct { LifecyclerConfig ring.LifecyclerConfig `` /* 145-byte string literal not displayed */ ConcurrentFlushes int `yaml:"concurrent_flushes"` FlushCheckPeriod time.Duration `yaml:"flush_check_period"` FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` RetainPeriod time.Duration `yaml:"chunk_retain_period"` MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` BlockSize int `yaml:"chunk_block_size"` TargetChunkSize int `yaml:"chunk_target_size"` ChunkEncoding string `yaml:"chunk_encoding"` MaxChunkAge time.Duration `yaml:"max_chunk_age"` AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"` // Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments. SyncPeriod time.Duration `yaml:"sync_period"` SyncMinUtilization float64 `yaml:"sync_min_utilization"` MaxReturnedErrors int `yaml:"max_returned_stream_errors"` QueryStore bool `yaml:"-"` QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"` WAL WALConfig `` /* 225-byte string literal not displayed */ ChunkFilterer chunk.RequestChunkFilterer `yaml:"-"` PipelineWrapper lokilog.PipelineWrapper `yaml:"-"` SampleExtractorWrapper lokilog.SampleExtractorWrapper `yaml:"-"` // Optional wrapper that can be used to modify the behaviour of the ingester Wrapper Wrapper `yaml:"-"` IndexShards int `yaml:"index_shards"` MaxDroppedStreams int `yaml:"max_dropped_streams"` ShutdownMarkerPath string `yaml:"shutdown_marker_path"` // contains filtered or unexported fields }
Config for an ingester.
func (*Config) RegisterFlags ¶
RegisterFlags registers the flags.
type Ingester ¶
Ingester builds chunks for incoming log streams.
func New ¶
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger) (*Ingester, error)
New makes a new Ingester.
func (*Ingester) CheckReady ¶ added in v1.5.0
ReadinessHandler is used to indicate to k8s when the ingesters are ready for the addition or removal of another ingester. Returns 204 when the ingester is ready, 500 otherwise.
func (*Ingester) Flush ¶
func (i *Ingester) Flush()
Flush implements ring.FlushTransferer. Flush triggers a flush of all the chunks and closes the flush queues. Called from the Lifecycler as part of the ingester shutdown.
func (*Ingester) FlushHandler ¶
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request)
FlushHandler triggers a flush of all in memory chunks. Mainly used for local testing.
func (*Ingester) GetChunkIDs ¶
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error)
GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (*Ingester) GetDetectedFields ¶
func (i *Ingester) GetDetectedFields(_ context.Context, _ *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error)
func (*Ingester) GetOrCreateInstance ¶
func (*Ingester) GetStats ¶
func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error)
func (*Ingester) GetStreamRates ¶
func (i *Ingester) GetStreamRates(ctx context.Context, _ *logproto.StreamRatesRequest) (*logproto.StreamRatesResponse, error)
GetStreamRates returns a response containing all streams and their current rate. TODO: It might be nice for this to be human readable, eventually: Sort output and return labels, too?
func (*Ingester) GetVolume ¶
func (i *Ingester) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error)
func (*Ingester) InitFlushQueues ¶
func (i *Ingester) InitFlushQueues()
Note: this is called both during the WAL replay (zero or more times) and then after replay as well.
func (*Ingester) Label ¶
func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
Label returns the set of labels for the stream this ingester knows about.
func (*Ingester) PrepareShutdown ¶
func (i *Ingester) PrepareShutdown(w http.ResponseWriter, r *http.Request)
PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
Internally, when triggered, this handler will configure the ingester service to release its resources whenever a SIGTERM is received. Releasing resources means flushing data, deleting tokens, and removing itself from the ring.
It also creates a file on disk which is used to re-apply the configuration if the ingester crashes and restarts before being permanently shut down.
* `GET` shows the status of this configuration
* `POST` enables this configuration
* `DELETE` disables this configuration
func (*Ingester) Push ¶
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error)
Push implements logproto.Pusher.
func (*Ingester) Query ¶
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error
Query the ingesters for log streams matching a set of matchers.
func (*Ingester) QuerySample ¶ added in v1.6.0
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error
QuerySample the ingesters for series from logs matching a set of matchers.
func (*Ingester) Series ¶ added in v1.3.0
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
func (*Ingester) SetChunkFilterer ¶
func (i *Ingester) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer)
func (*Ingester) SetExtractorWrapper ¶
func (i *Ingester) SetExtractorWrapper(wrapper lokilog.SampleExtractorWrapper)
func (*Ingester) SetPipelineWrapper ¶
func (i *Ingester) SetPipelineWrapper(wrapper lokilog.PipelineWrapper)
func (*Ingester) ShutdownHandler ¶
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request)
ShutdownHandler handles a graceful shutdown of the ingester service and termination of the Loki process.
func (*Ingester) Tail ¶ added in v0.2.0
func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error
Tail logs matching given query
func (*Ingester) TailersCount ¶ added in v1.4.0
func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error)
TailersCount returns count of active tail requests from a user
func (*Ingester) TransferOut ¶
TransferOut implements ring.FlushTransferer. Noop implementation because ingesters have a WAL now that does not require transferring chunks any more. We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
func (*Ingester) Watch ¶
func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error
Watch implements grpc_health_v1.HealthCheck.
type Interface ¶
type Interface interface { services.Service logproto.PusherServer logproto.QuerierServer logproto.StreamDataServer CheckReady(ctx context.Context) error FlushHandler(w http.ResponseWriter, _ *http.Request) GetOrCreateInstance(instanceID string) (*instance, error) ShutdownHandler(w http.ResponseWriter, r *http.Request) PrepareShutdown(w http.ResponseWriter, r *http.Request) }
Interface is an interface for the Ingester
type Limiter ¶ added in v1.3.0
type Limiter struct {
// contains filtered or unexported fields
}
Limiter implements primitives to get the maximum number of streams an ingester can handle for a specific tenant
func NewLimiter ¶ added in v1.3.0
func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter
NewLimiter makes a new limiter
func (*Limiter) AssertMaxStreamsPerUser ¶ added in v1.3.0
AssertMaxStreamsPerUser ensures limit has not been reached compared to the current number of streams in input and returns an error if so.
func (*Limiter) DisableForWALReplay ¶
func (l *Limiter) DisableForWALReplay()
func (*Limiter) UnorderedWrites ¶
type NoopWALReader ¶
type NoopWALReader struct{}
func (NoopWALReader) Close ¶
func (NoopWALReader) Close() error
func (NoopWALReader) Err ¶
func (NoopWALReader) Err() error
func (NoopWALReader) Next ¶
func (NoopWALReader) Next() bool
func (NoopWALReader) Record ¶
func (NoopWALReader) Record() []byte
type OnceSwitch ¶
type OnceSwitch struct {
// contains filtered or unexported fields
}
OnceSwitch is an optimized switch that can only ever be switched "on" in a concurrent environment.
func (*OnceSwitch) Get ¶
func (o *OnceSwitch) Get() bool
func (*OnceSwitch) Trigger ¶
func (o *OnceSwitch) Trigger()
func (*OnceSwitch) TriggerAnd ¶
func (o *OnceSwitch) TriggerAnd(fn func())
TriggerAnd will ensure the switch is on and run the provided function if the switch was not already toggled on.
type QuerierQueryServer ¶
type QuerierQueryServer interface { Context() context.Context Send(res *logproto.QueryResponse) error }
QuerierQueryServer is the GRPC server stream we use to send batches of entries.
type RateLimiterStrategy ¶
type RateLimiterStrategy interface {
RateLimit(tenant string) validation.RateLimit
}
type RingCount ¶ added in v1.3.0
type RingCount interface {
HealthyInstancesCount() int
}
RingCount is the interface exposed by a ring implementation which allows counting members
type Series ¶
type Series struct { UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` // post mapped fingerprint is necessary because subsequent wal writes will reference it. Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Labels []github_com_grafana_loki_pkg_logproto.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/grafana/loki/pkg/logproto.LabelAdapter" json:"labels"` Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"` // most recently pushed timestamp. To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"` // most recently pushed line. LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"` // highest counter value for pushes to this stream. // Used to skip already applied entries during WAL replay. EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"` // highest timestamp pushed to this stream. HighestTs time.Time `protobuf:"bytes,8,opt,name=highestTs,proto3,stdtime" json:"highestTs"` }
Series is a {de,}serializable intermediate type for Series.
func (*Series) Descriptor ¶
func (*Series) GetEntryCt ¶
func (*Series) GetFingerprint ¶
func (*Series) GetHighestTs ¶
func (*Series) GetLastLine ¶
func (*Series) MarshalToSizedBuffer ¶
func (*Series) ProtoMessage ¶
func (*Series) ProtoMessage()
func (*Series) XXX_DiscardUnknown ¶
func (m *Series) XXX_DiscardUnknown()
func (*Series) XXX_Marshal ¶
func (*Series) XXX_Unmarshal ¶
type SeriesIter ¶
type SeriesIter interface { Count() int Iter() *streamIterator Stop() }
type SeriesWithErr ¶
type Store ¶
type Store interface { stores.ChunkWriter stores.ChunkFetcher storage.SelectStore storage.SchemaConfigProvider indexstore.StatsReader }
Store is the store interface we need on the ingester.
type StreamRateCalculator ¶
type StreamRateCalculator struct {
// contains filtered or unexported fields
}
func NewStreamRateCalculator ¶
func NewStreamRateCalculator() *StreamRateCalculator
func (*StreamRateCalculator) Rates ¶
func (c *StreamRateCalculator) Rates() []logproto.StreamRate
func (*StreamRateCalculator) Record ¶
func (c *StreamRateCalculator) Record(tenant string, streamHash, streamHashNoShard uint64, bytes int)
func (*StreamRateCalculator) Stop ¶
func (c *StreamRateCalculator) Stop()
type StreamRateLimiter ¶
type StreamRateLimiter struct {
// contains filtered or unexported fields
}
func NewStreamRateLimiter ¶
func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter
type TailServer ¶
type TailServer interface { Send(*logproto.TailResponse) error Context() context.Context }
type WAL ¶
type WAL interface { Start() // Log marshalls the records and writes it into the WAL. Log(*wal.Record) error // Stop stops all the WAL operations. Stop() error }
WAL interface allows us to have a no-op WAL when the WAL is disabled.
type WALCheckpointWriter ¶
type WALCheckpointWriter struct {
// contains filtered or unexported fields
}
func (*WALCheckpointWriter) Advance ¶
func (w *WALCheckpointWriter) Advance() (bool, error)
func (*WALCheckpointWriter) Close ¶
func (w *WALCheckpointWriter) Close(abort bool) error
func (*WALCheckpointWriter) Write ¶
func (w *WALCheckpointWriter) Write(s *Series) error
type WALConfig ¶
type WALConfig struct { Enabled bool `yaml:"enabled"` Dir string `yaml:"dir"` CheckpointDuration time.Duration `yaml:"checkpoint_duration"` FlushOnShutdown bool `yaml:"flush_on_shutdown"` ReplayMemoryCeiling flagext.ByteSize `yaml:"replay_memory_ceiling"` }
func (*WALConfig) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries.
|
originally from https://github.com/cortexproject/cortex/blob/868898a2921c662dcd4f90683e8b95c927a8edd8/pkg/ingester/index/index.go but modified to support sharding queries. |