Versions in this module Expand all Collapse all v1 v1.6.3 Jan 15, 2024 Changes in this version + var ErrIntOverflowCheckpoint = fmt.Errorf("proto: integer overflow") + var ErrInvalidLengthCheckpoint = fmt.Errorf("proto: negative length found during unmarshaling") + var ErrReadOnly = errors.New("Ingester is shutting down") + var ErrStreamMissing = errors.New("Stream missing") + func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error + func RecoverWAL(reader WALReader, recoverer Recoverer) error + type CheckpointWriter interface + Advance func() (noop bool, err error) + Close func(abort bool) error + Write func(*Series) error + type Checkpointer struct + func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, ...) *Checkpointer + func (c *Checkpointer) PerformCheckpoint() (err error) + func (c *Checkpointer) Run() + type Chunk struct + Closed bool + Data []byte + FlushedAt time.Time + From time.Time + Head []byte + LastUpdated time.Time + Synced bool + To time.Time + func (*Chunk) Descriptor() ([]byte, []int) + func (*Chunk) ProtoMessage() + func (m *Chunk) GetClosed() bool + func (m *Chunk) GetData() []byte + func (m *Chunk) GetFlushedAt() time.Time + func (m *Chunk) GetFrom() time.Time + func (m *Chunk) GetHead() []byte + func (m *Chunk) GetLastUpdated() time.Time + func (m *Chunk) GetSynced() bool + func (m *Chunk) GetTo() time.Time + func (m *Chunk) Marshal() (dAtA []byte, err error) + func (m *Chunk) MarshalTo(dAtA []byte) (int, error) + func (m *Chunk) Reset() + func (m *Chunk) Size() (n int) + func (m *Chunk) Unmarshal(dAtA []byte) error + func (m *Chunk) XXX_DiscardUnknown() + func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) + func (m *Chunk) XXX_Merge(src proto.Message) + func (m *Chunk) XXX_Size() int + func (m *Chunk) XXX_Unmarshal(b []byte) error + func (this *Chunk) Equal(that interface{}) bool + func (this *Chunk) GoString() string + func (this *Chunk) String() string + type ChunkStore interface + GetChunkRefs func(ctx context.Context, userID string, from, through model.Time, ...) ([][]chunk.Chunk, []*chunk.Fetcher, error) + GetSchemaConfigs func() []chunk.PeriodConfig + Put func(ctx context.Context, chunks []chunk.Chunk) error + SelectLogs func(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) + SelectSamples func(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) + type Config struct + BlockSize int + ChunkEncoding string + ConcurrentFlushes int + FlushCheckPeriod time.Duration + FlushOpTimeout time.Duration + LifecyclerConfig ring.LifecyclerConfig + MaxChunkAge time.Duration + MaxChunkIdle time.Duration + MaxReturnedErrors int + MaxTransferRetries int + QueryStore bool + QueryStoreMaxLookBackPeriod time.Duration + RetainPeriod time.Duration + SyncMinUtilization float64 + SyncPeriod time.Duration + TargetChunkSize int + WAL WALConfig + func (cfg *Config) RegisterFlags(f *flag.FlagSet) + func (cfg *Config) Validate() error + type Decbuf struct + func DecWith(b []byte) (res Decbuf) + func (d *Decbuf) Bytes(n int) []byte + type Encbuf struct + func EncWith(b []byte) (res Encbuf) + func (e *Encbuf) PutString(s string) + type Flusher interface + Flush func() + type Ingester struct + func New(cfg Config, clientConfig client.Config, store ChunkStore, ...) (*Ingester, error) + func (*Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) + func (*Ingester) Watch(*grpc_health_v1.HealthCheckRequest, grpc_health_v1.Health_WatchServer) error + func (i *Ingester) CheckReady(ctx context.Context) error + func (i *Ingester) Flush() + func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) + func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) + func (i *Ingester) InitFlushQueues() + func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) + func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) + func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error + func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, ...) error + func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) + func (i *Ingester) SetChunkFilterer(chunkFilter storage.RequestChunkFilterer) + func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) + func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error + func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) + func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error + func (i *Ingester) TransferOut(ctx context.Context) error + type Limiter struct + func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter + func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error + func (l *Limiter) Disable() + func (l *Limiter) Enable() + type NoopWALReader struct + func (NoopWALReader) Close() error + func (NoopWALReader) Err() error + func (NoopWALReader) Next() bool + func (NoopWALReader) Record() []byte + type OnceSwitch struct + func (o *OnceSwitch) Get() bool + func (o *OnceSwitch) Trigger() + func (o *OnceSwitch) TriggerAnd(fn func()) + type QuerierQueryServer interface + Context func() context.Context + Send func(res *logproto.QueryResponse) error + type RecordType byte + const CheckpointRecord + const WALRecordEntries + const WALRecordSeries + type Recoverer interface + Done func() <-chan struct{} + NumWorkers func() int + Push func(userID string, entries RefEntries) error + Series func(series *Series) error + SetStream func(userID string, series record.RefSeries) error + type RefEntries struct + Entries []logproto.Entry + Ref uint64 + type RingCount interface + HealthyInstancesCount func() int + type Series struct + Chunks []Chunk + Fingerprint uint64 + Labels []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter + LastLine string + To time.Time + UserID string + func (*Series) Descriptor() ([]byte, []int) + func (*Series) ProtoMessage() + func (m *Series) GetChunks() []Chunk + func (m *Series) GetFingerprint() uint64 + func (m *Series) GetLastLine() string + func (m *Series) GetTo() time.Time + func (m *Series) GetUserID() string + func (m *Series) Marshal() (dAtA []byte, err error) + func (m *Series) MarshalTo(dAtA []byte) (int, error) + func (m *Series) Reset() + func (m *Series) Size() (n int) + func (m *Series) Unmarshal(dAtA []byte) error + func (m *Series) XXX_DiscardUnknown() + func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) + func (m *Series) XXX_Merge(src proto.Message) + func (m *Series) XXX_Size() int + func (m *Series) XXX_Unmarshal(b []byte) error + func (this *Series) Equal(that interface{}) bool + func (this *Series) GoString() string + func (this *Series) String() string + type SeriesIter interface + Count func() int + Iter func() *streamIterator + Stop func() + type SeriesWithErr struct + Err error + Series *Series + type TailServer interface + Context func() context.Context + Send func(*logproto.TailResponse) error + type WAL interface + Log func(*WALRecord) error + Start func() + Stop func() error + type WALCheckpointWriter struct + func (w *WALCheckpointWriter) Advance() (bool, error) + func (w *WALCheckpointWriter) Close(abort bool) error + func (w *WALCheckpointWriter) Write(s *Series) error + type WALConfig struct + CheckpointDuration time.Duration + Dir string + Enabled bool + FlushOnShutdown bool + ReplayMemoryCeiling flagext.ByteSize + func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) + func (cfg *WALConfig) Validate() error + type WALReader interface + Err func() error + Next func() bool + Record func() []byte + type WALRecord struct + RefEntries []RefEntries + Series []record.RefSeries + UserID string + func (r *WALRecord) AddEntries(fp uint64, entries ...logproto.Entry) + func (r *WALRecord) IsEmpty() bool + func (r *WALRecord) Reset()