resultscache

package
v3.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2024 License: AGPL-3.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors whose names carry the TestTypes suffix. Both messages
// describe protobuf decoding failures.
// NOTE(review): presumably returned by the generated unmarshaling code of the
// Mock* message types — confirm against the generated source.
var (
	// ErrInvalidLengthTestTypes reports a negative length found during unmarshaling.
	ErrInvalidLengthTestTypes = fmt.Errorf("proto: negative length found during unmarshaling")
	// ErrIntOverflowTestTypes reports an integer overflow.
	ErrIntOverflowTestTypes   = fmt.Errorf("proto: integer overflow")
)
View Source
// Sentinel errors whose names carry the Types suffix. Both messages describe
// protobuf decoding failures.
// NOTE(review): presumably returned by the generated unmarshaling code of the
// non-mock message types (CachedResponse, Extent, CachingOptions) — confirm
// against the generated source.
var (
	// ErrInvalidLengthTypes reports a negative length found during unmarshaling.
	ErrInvalidLengthTypes = fmt.Errorf("proto: negative length found during unmarshaling")
	// ErrIntOverflowTypes reports an integer overflow.
	ErrIntOverflowTypes   = fmt.Errorf("proto: integer overflow")
)

Functions

This section is empty.

Types

type CacheGenNumberLoader

// CacheGenNumberLoader provides a results-cache generation number for a set
// of tenants and can be stopped.
// NOTE(review): how the generation number is consumed (presumably to
// invalidate stale cache entries) is not visible here — confirm.
type CacheGenNumberLoader interface {
	// GetResultsCacheGenNumber returns the generation number, as a string,
	// for the given tenant IDs.
	GetResultsCacheGenNumber(tenantIDs []string) string
	// Stop presumably shuts the loader down and releases its resources —
	// confirm against the implementation.
	Stop()
}

type CachedResponse

// CachedResponse is a protobuf message pairing a key with the extents stored
// under it.
type CachedResponse struct {
	// Key is protobuf field 1.
	// NOTE(review): presumably the cache key this entry was stored under — confirm.
	Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key"`
	// List of cached responses; non-overlapping and in order.
	Extents []Extent `protobuf:"bytes,2,rep,name=extents,proto3" json:"extents"`
}

func (*CachedResponse) Descriptor

func (*CachedResponse) Descriptor() ([]byte, []int)

func (*CachedResponse) Equal

func (this *CachedResponse) Equal(that interface{}) bool

func (*CachedResponse) GetExtents

func (m *CachedResponse) GetExtents() []Extent

func (*CachedResponse) GetKey

func (m *CachedResponse) GetKey() string

func (*CachedResponse) GoString

func (this *CachedResponse) GoString() string

func (*CachedResponse) Marshal

func (m *CachedResponse) Marshal() (dAtA []byte, err error)

func (*CachedResponse) MarshalTo

func (m *CachedResponse) MarshalTo(dAtA []byte) (int, error)

func (*CachedResponse) MarshalToSizedBuffer

func (m *CachedResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*CachedResponse) ProtoMessage

func (*CachedResponse) ProtoMessage()

func (*CachedResponse) Reset

func (m *CachedResponse) Reset()

func (*CachedResponse) Size

func (m *CachedResponse) Size() (n int)

func (*CachedResponse) String

func (this *CachedResponse) String() string

func (*CachedResponse) Unmarshal

func (m *CachedResponse) Unmarshal(dAtA []byte) error

func (*CachedResponse) XXX_DiscardUnknown

func (m *CachedResponse) XXX_DiscardUnknown()

func (*CachedResponse) XXX_Marshal

func (m *CachedResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*CachedResponse) XXX_Merge

func (m *CachedResponse) XXX_Merge(src proto.Message)

func (*CachedResponse) XXX_Size

func (m *CachedResponse) XXX_Size() int

func (*CachedResponse) XXX_Unmarshal

func (m *CachedResponse) XXX_Unmarshal(b []byte) error

type CachingOptions

// CachingOptions is a protobuf message holding caching settings; it is the
// type returned by Request.GetCachingOptions.
type CachingOptions struct {
	// Disabled is protobuf field 1 and is omitted from JSON output when false.
	// NOTE(review): presumably true means caching is turned off for the
	// request — confirm.
	Disabled bool `protobuf:"varint,1,opt,name=disabled,proto3" json:"disabled,omitempty"`
}

CachingOptions is defined here to prevent circular imports between logproto and queryrangebase.

func (*CachingOptions) Descriptor

func (*CachingOptions) Descriptor() ([]byte, []int)

func (*CachingOptions) Equal

func (this *CachingOptions) Equal(that interface{}) bool

func (*CachingOptions) GetDisabled

func (m *CachingOptions) GetDisabled() bool

func (*CachingOptions) GoString

func (this *CachingOptions) GoString() string

func (*CachingOptions) Marshal

func (m *CachingOptions) Marshal() (dAtA []byte, err error)

func (*CachingOptions) MarshalTo

func (m *CachingOptions) MarshalTo(dAtA []byte) (int, error)

func (*CachingOptions) MarshalToSizedBuffer

func (m *CachingOptions) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*CachingOptions) ProtoMessage

func (*CachingOptions) ProtoMessage()

func (*CachingOptions) Reset

func (m *CachingOptions) Reset()

func (*CachingOptions) Size

func (m *CachingOptions) Size() (n int)

func (*CachingOptions) String

func (this *CachingOptions) String() string

func (*CachingOptions) Unmarshal

func (m *CachingOptions) Unmarshal(dAtA []byte) error

func (*CachingOptions) XXX_DiscardUnknown

func (m *CachingOptions) XXX_DiscardUnknown()

func (*CachingOptions) XXX_Marshal

func (m *CachingOptions) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*CachingOptions) XXX_Merge

func (m *CachingOptions) XXX_Merge(src proto.Message)

func (*CachingOptions) XXX_Size

func (m *CachingOptions) XXX_Size() int

func (*CachingOptions) XXX_Unmarshal

func (m *CachingOptions) XXX_Unmarshal(b []byte) error

type Config

// Config is the config for the results cache.
type Config struct {
	// CacheConfig is the cache.Config read from the YAML key "cache".
	CacheConfig cache.Config `yaml:"cache"`
	// Compression is read from the YAML key "compression".
	// NOTE(review): the accepted values are not visible here; presumably
	// they are checked by Validate — confirm.
	Compression string       `yaml:"compression"`
}

Config is the config for the results cache.

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string)

func (*Config) Validate

func (cfg *Config) Validate() error

type ConstSplitter

// ConstSplitter is a utility for using a constant split interval when
// determining cache keys. Its underlying type is time.Duration, and its
// GenerateCacheKey method has the signature required by KeyGenerator.
type ConstSplitter time.Duration

ConstSplitter is a utility for using a constant split interval when determining cache keys

func (ConstSplitter) GenerateCacheKey

func (t ConstSplitter) GenerateCacheKey(_ context.Context, userID string, r Request) string

GenerateCacheKey generates a cache key based on the userID, Request and interval.

type Extent

// Extent is a protobuf message describing a single cached response together
// with the Start and End values that bound it. Protobuf field number 3 is not
// used by any field declared here.
type Extent struct {
	// Start is the lower bound of the extent.
	// NOTE(review): presumably a timestamp in milliseconds, matching
	// Extractor.Extract — confirm.
	Start    int64      `protobuf:"varint,1,opt,name=start,proto3" json:"start"`
	// End is the upper bound of the extent (same unit as Start).
	End      int64      `protobuf:"varint,2,opt,name=end,proto3" json:"end"`
	// TraceId is serialized in protobuf (field 4) but excluded from JSON
	// by the `json:"-"` tag.
	TraceId  string     `protobuf:"bytes,4,opt,name=trace_id,json=traceId,proto3" json:"-"`
	// Response holds the cached payload as a *types.Any.
	Response *types.Any `protobuf:"bytes,5,opt,name=response,proto3" json:"response"`
}

func (*Extent) Descriptor

func (*Extent) Descriptor() ([]byte, []int)

func (*Extent) Equal

func (this *Extent) Equal(that interface{}) bool

func (*Extent) GetEnd

func (m *Extent) GetEnd() int64

func (*Extent) GetResponse

func (m *Extent) GetResponse() *types.Any

func (*Extent) GetStart

func (m *Extent) GetStart() int64

func (*Extent) GetTraceId

func (m *Extent) GetTraceId() string

func (*Extent) GoString

func (this *Extent) GoString() string

func (*Extent) Marshal

func (m *Extent) Marshal() (dAtA []byte, err error)

func (*Extent) MarshalTo

func (m *Extent) MarshalTo(dAtA []byte) (int, error)

func (*Extent) MarshalToSizedBuffer

func (m *Extent) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Extent) ProtoMessage

func (*Extent) ProtoMessage()

func (*Extent) Reset

func (m *Extent) Reset()

func (*Extent) Size

func (m *Extent) Size() (n int)

func (*Extent) String

func (this *Extent) String() string

func (*Extent) Unmarshal

func (m *Extent) Unmarshal(dAtA []byte) error

func (*Extent) XXX_DiscardUnknown

func (m *Extent) XXX_DiscardUnknown()

func (*Extent) XXX_Marshal

func (m *Extent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Extent) XXX_Merge

func (m *Extent) XXX_Merge(src proto.Message)

func (*Extent) XXX_Size

func (m *Extent) XXX_Size() int

func (*Extent) XXX_Unmarshal

func (m *Extent) XXX_Unmarshal(b []byte) error

type Extractor

// Extractor is used by the cache to extract a subset of a response from a
// cache entry.
type Extractor interface {
	// Extract returns the subset of `res` that lies between the `start` and
	// `end` timestamps (in milliseconds); `res` itself spans from `resStart`
	// to `resEnd`.
	Extract(start, end int64, res Response, resStart, resEnd int64) Response
}

Extractor is used by the cache to extract a subset of a response from a cache entry.

type Handler

// Handler executes a Request and returns the resulting Response or an error.
type Handler interface {
	// Do handles req under ctx.
	Do(ctx context.Context, req Request) (Response, error)
}

type HandlerFunc

// HandlerFunc is a function type with the same parameters and results as
// Handler.Do; its Do method implements Handler, so a plain function can be
// used wherever a Handler is expected.
type HandlerFunc func(context.Context, Request) (Response, error)

func (HandlerFunc) Do

func (q HandlerFunc) Do(ctx context.Context, req Request) (Response, error)

Do implements Handler.

type KeyGenerator

// KeyGenerator generates cache keys. This is a useful interface for
// downstream consumers who wish to implement their own strategies.
type KeyGenerator interface {
	// GenerateCacheKey returns the cache key string for request r issued
	// on behalf of userID.
	GenerateCacheKey(ctx context.Context, userID string, r Request) string
}

KeyGenerator generates cache keys. This is a useful interface for downstream consumers who wish to implement their own strategies.

func NewPipelineWrapperKeygen added in v3.1.0

func NewPipelineWrapperKeygen(inner KeyGenerator) KeyGenerator

type Limits

// Limits exposes the per-tenant limit read by this package.
type Limits interface {
	// MaxCacheFreshness returns a duration for the given tenant.
	// NOTE(review): presumably results more recent than this duration are
	// not cached — confirm against the implementation.
	MaxCacheFreshness(ctx context.Context, tenantID string) time.Duration
}

type MockLabelsPair

// MockLabelsPair is a protobuf message holding a name/value pair of strings.
// NOTE(review): the Mock prefix suggests it exists for tests — confirm.
type MockLabelsPair struct {
	// Name is protobuf field 1; omitted from JSON output when empty.
	Name  string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	// Value is protobuf field 2; omitted from JSON output when empty.
	Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}

func (*MockLabelsPair) Descriptor

func (*MockLabelsPair) Descriptor() ([]byte, []int)

func (*MockLabelsPair) Equal

func (this *MockLabelsPair) Equal(that interface{}) bool

func (*MockLabelsPair) GetName

func (m *MockLabelsPair) GetName() string

func (*MockLabelsPair) GetValue

func (m *MockLabelsPair) GetValue() string

func (*MockLabelsPair) GoString

func (this *MockLabelsPair) GoString() string

func (*MockLabelsPair) Marshal

func (m *MockLabelsPair) Marshal() (dAtA []byte, err error)

func (*MockLabelsPair) MarshalTo

func (m *MockLabelsPair) MarshalTo(dAtA []byte) (int, error)

func (*MockLabelsPair) MarshalToSizedBuffer

func (m *MockLabelsPair) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*MockLabelsPair) ProtoMessage

func (*MockLabelsPair) ProtoMessage()

func (*MockLabelsPair) Reset

func (m *MockLabelsPair) Reset()

func (*MockLabelsPair) Size

func (m *MockLabelsPair) Size() (n int)

func (*MockLabelsPair) String

func (this *MockLabelsPair) String() string

func (*MockLabelsPair) Unmarshal

func (m *MockLabelsPair) Unmarshal(dAtA []byte) error

func (*MockLabelsPair) XXX_DiscardUnknown

func (m *MockLabelsPair) XXX_DiscardUnknown()

func (*MockLabelsPair) XXX_Marshal

func (m *MockLabelsPair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MockLabelsPair) XXX_Merge

func (m *MockLabelsPair) XXX_Merge(src proto.Message)

func (*MockLabelsPair) XXX_Size

func (m *MockLabelsPair) XXX_Size() int

func (*MockLabelsPair) XXX_Unmarshal

func (m *MockLabelsPair) XXX_Unmarshal(b []byte) error

type MockRequest

// MockRequest is a protobuf message with the path, time range, step, query
// and caching options of a request. Protobuf field number 5 is not used by
// any field declared here.
// NOTE(review): the Mock prefix suggests it exists for tests — confirm.
type MockRequest struct {
	// Path is protobuf field 1; omitted from JSON output when empty.
	Path           string         `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
	// Start is a time.Value (protobuf field 2, tagged stdtime).
	Start          time.Time      `protobuf:"bytes,2,opt,name=start,proto3,stdtime" json:"start"`
	// End is a time.Time value (protobuf field 3, tagged stdtime).
	End            time.Time      `protobuf:"bytes,3,opt,name=end,proto3,stdtime" json:"end"`
	// Step is protobuf field 4; omitted from JSON output when zero.
	// NOTE(review): presumably milliseconds, as documented on Request.GetStep — confirm.
	Step           int64          `protobuf:"varint,4,opt,name=step,proto3" json:"step,omitempty"`
	// Query is protobuf field 6; omitted from JSON output when empty.
	Query          string         `protobuf:"bytes,6,opt,name=query,proto3" json:"query,omitempty"`
	// CachingOptions is embedded by value (protobuf field 7).
	CachingOptions CachingOptions `protobuf:"bytes,7,opt,name=cachingOptions,proto3" json:"cachingOptions"`
}

func (*MockRequest) Descriptor

func (*MockRequest) Descriptor() ([]byte, []int)

func (*MockRequest) Equal

func (this *MockRequest) Equal(that interface{}) bool

func (*MockRequest) GetCachingOptions

func (m *MockRequest) GetCachingOptions() CachingOptions

func (*MockRequest) GetEnd

func (m *MockRequest) GetEnd() time.Time

func (*MockRequest) GetPath

func (m *MockRequest) GetPath() string

func (*MockRequest) GetQuery

func (m *MockRequest) GetQuery() string

func (*MockRequest) GetStart

func (m *MockRequest) GetStart() time.Time

func (*MockRequest) GetStep

func (m *MockRequest) GetStep() int64

func (*MockRequest) GoString

func (this *MockRequest) GoString() string

func (*MockRequest) Marshal

func (m *MockRequest) Marshal() (dAtA []byte, err error)

func (*MockRequest) MarshalTo

func (m *MockRequest) MarshalTo(dAtA []byte) (int, error)

func (*MockRequest) MarshalToSizedBuffer

func (m *MockRequest) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*MockRequest) ProtoMessage

func (*MockRequest) ProtoMessage()

func (*MockRequest) Reset

func (m *MockRequest) Reset()

func (*MockRequest) Size

func (m *MockRequest) Size() (n int)

func (*MockRequest) String

func (this *MockRequest) String() string

func (*MockRequest) Unmarshal

func (m *MockRequest) Unmarshal(dAtA []byte) error

func (*MockRequest) XXX_DiscardUnknown

func (m *MockRequest) XXX_DiscardUnknown()

func (*MockRequest) XXX_Marshal

func (m *MockRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MockRequest) XXX_Merge

func (m *MockRequest) XXX_Merge(src proto.Message)

func (*MockRequest) XXX_Size

func (m *MockRequest) XXX_Size() int

func (*MockRequest) XXX_Unmarshal

func (m *MockRequest) XXX_Unmarshal(b []byte) error

type MockResponse

// MockResponse is a protobuf message holding a list of label pairs and a
// list of samples.
// NOTE(review): the Mock prefix suggests it exists for tests — confirm.
type MockResponse struct {
	// Labels is the repeated protobuf field 1; omitted from JSON output when empty.
	Labels  []*MockLabelsPair `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels,omitempty"`
	// Samples is the repeated protobuf field 2; omitted from JSON output when empty.
	Samples []*MockSample     `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples,omitempty"`
}

func (*MockResponse) Descriptor

func (*MockResponse) Descriptor() ([]byte, []int)

func (*MockResponse) Equal

func (this *MockResponse) Equal(that interface{}) bool

func (*MockResponse) GetLabels

func (m *MockResponse) GetLabels() []*MockLabelsPair

func (*MockResponse) GetSamples

func (m *MockResponse) GetSamples() []*MockSample

func (*MockResponse) GoString

func (this *MockResponse) GoString() string

func (*MockResponse) Marshal

func (m *MockResponse) Marshal() (dAtA []byte, err error)

func (*MockResponse) MarshalTo

func (m *MockResponse) MarshalTo(dAtA []byte) (int, error)

func (*MockResponse) MarshalToSizedBuffer

func (m *MockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*MockResponse) ProtoMessage

func (*MockResponse) ProtoMessage()

func (*MockResponse) Reset

func (m *MockResponse) Reset()

func (*MockResponse) Size

func (m *MockResponse) Size() (n int)

func (*MockResponse) String

func (this *MockResponse) String() string

func (*MockResponse) Unmarshal

func (m *MockResponse) Unmarshal(dAtA []byte) error

func (*MockResponse) XXX_DiscardUnknown

func (m *MockResponse) XXX_DiscardUnknown()

func (*MockResponse) XXX_Marshal

func (m *MockResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MockResponse) XXX_Merge

func (m *MockResponse) XXX_Merge(src proto.Message)

func (*MockResponse) XXX_Size

func (m *MockResponse) XXX_Size() int

func (*MockResponse) XXX_Unmarshal

func (m *MockResponse) XXX_Unmarshal(b []byte) error

type MockSample

// MockSample is a protobuf message pairing a float64 value with a timestamp.
// NOTE(review): the Mock prefix suggests it exists for tests — confirm.
type MockSample struct {
	// Value is protobuf field 1, encoded as fixed64; omitted from JSON
	// output when zero.
	Value       float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
	// TimestampMs is protobuf field 2; omitted from JSON output when zero.
	// NOTE(review): the Ms suffix suggests milliseconds — confirm.
	TimestampMs int64   `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"`
}

func (*MockSample) Descriptor

func (*MockSample) Descriptor() ([]byte, []int)

func (*MockSample) Equal

func (this *MockSample) Equal(that interface{}) bool

func (*MockSample) GetTimestampMs

func (m *MockSample) GetTimestampMs() int64

func (*MockSample) GetValue

func (m *MockSample) GetValue() float64

func (*MockSample) GoString

func (this *MockSample) GoString() string

func (*MockSample) Marshal

func (m *MockSample) Marshal() (dAtA []byte, err error)

func (*MockSample) MarshalTo

func (m *MockSample) MarshalTo(dAtA []byte) (int, error)

func (*MockSample) MarshalToSizedBuffer

func (m *MockSample) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*MockSample) ProtoMessage

func (*MockSample) ProtoMessage()

func (*MockSample) Reset

func (m *MockSample) Reset()

func (*MockSample) Size

func (m *MockSample) Size() (n int)

func (*MockSample) String

func (this *MockSample) String() string

func (*MockSample) Unmarshal

func (m *MockSample) Unmarshal(dAtA []byte) error

func (*MockSample) XXX_DiscardUnknown

func (m *MockSample) XXX_DiscardUnknown()

func (*MockSample) XXX_Marshal

func (m *MockSample) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*MockSample) XXX_Merge

func (m *MockSample) XXX_Merge(src proto.Message)

func (*MockSample) XXX_Size

func (m *MockSample) XXX_Size() int

func (*MockSample) XXX_Unmarshal

func (m *MockSample) XXX_Unmarshal(b []byte) error

type ParallelismForReqFn

// ParallelismForReqFn returns the parallelism for a given request. It
// receives the context, the tenant IDs and the request, and returns an int.
type ParallelismForReqFn func(ctx context.Context, tenantIDs []string, r Request) int

ParallelismForReqFn returns the parallelism for a given request.

type PipelineWrapperKeyGenerator added in v3.1.0

// PipelineWrapperKeyGenerator is an opaque type; its pointer-receiver
// GenerateCacheKey method has the signature required by KeyGenerator.
// NOTE(review): presumably the concrete type returned by
// NewPipelineWrapperKeygen, wrapping the inner KeyGenerator — confirm.
type PipelineWrapperKeyGenerator struct {
	// contains filtered or unexported fields
}

func (*PipelineWrapperKeyGenerator) GenerateCacheKey added in v3.1.0

func (kg *PipelineWrapperKeyGenerator) GenerateCacheKey(ctx context.Context, userID string, r Request) string

type Request

// Request is the request type handled by the results cache. It is a
// proto.Message that also exposes its time range, step, query and caching
// options, and can be cloned with a different time range.
type Request interface {
	proto.Message
	// GetStart returns the start timestamp of the request.
	GetStart() time.Time
	// GetEnd returns the end timestamp of the request.
	GetEnd() time.Time
	// GetStep returns the step of the request in milliseconds.
	GetStep() int64
	// GetQuery returns the query of the request.
	GetQuery() string
	// GetCachingOptions returns the caching options.
	GetCachingOptions() CachingOptions
	// WithStartEndForCache clones the current request with different start and end timestamps.
	WithStartEndForCache(start time.Time, end time.Time) Request
}

type RequestResponse

// RequestResponse contains a response and the respective request that was
// used to obtain it.
type RequestResponse struct {
	// Request is the request that was used.
	Request  Request
	// Response is the response for Request.
	Response Response
}

RequestResponse contains a response and the respective request that was used to obtain it.

func DoRequests

func DoRequests(ctx context.Context, downstream Handler, reqs []Request, parallelism int) ([]RequestResponse, error)

DoRequests executes a list of requests in parallel.

type Response

// Response is the response type handled by the results cache. It embeds only
// proto.Message, so any proto.Message satisfies it.
type Response interface {
	proto.Message
}

type ResponseMerger

// ResponseMerger is used by middlewares making multiple requests to merge
// back all responses into a single one.
type ResponseMerger interface {
	// MergeResponse merges responses from multiple requests into a single Response
	MergeResponse(...Response) (Response, error)
}

ResponseMerger is used by middlewares making multiple requests to merge back all responses into a single one.

type ResultsCache

// ResultsCache is the type returned (as a pointer) by NewResultsCache; all of
// its fields are unexported.
type ResultsCache struct {
	// contains filtered or unexported fields
}

func NewResultsCache

func NewResultsCache(
	logger log.Logger,
	c cache.Cache,
	next Handler,
	keyGen KeyGenerator,
	limits Limits,
	merger ResponseMerger,
	extractor Extractor,
	shouldCacheReq ShouldCacheReqFn,
	shouldCacheRes ShouldCacheResFn,
	parallelismForReq func(ctx context.Context, tenantIDs []string, r Request) int,
	cacheGenNumberLoader CacheGenNumberLoader,
	retentionEnabled, onlyUseEntireExtent bool,
) *ResultsCache

NewResultsCache creates a results cache from config. The middleware caches results using a unique cache key for a given request (step, query, user) and interval. The cache assumes that each request's length (end - start) is less than or equal to the interval. Each request starting within the same interval will hit the same cache entry. If the cache doesn't have the entire duration of the request cached, it will query the uncached parts and append them to the cache entries. See `generateKey`.

func (ResultsCache) Do

type ShouldCacheReqFn

// ShouldCacheReqFn checks whether the current request should go to cache or
// not. If not, the request is just sent to the next handler.
type ShouldCacheReqFn func(ctx context.Context, r Request) bool

ShouldCacheReqFn checks whether the current request should go to cache or not. If not, the request is just sent to the next handler.

type ShouldCacheResFn

// ShouldCacheResFn checks whether the current response should go to cache or
// not. It receives the request, its response and a maxCacheTime value.
// NOTE(review): the unit and meaning of maxCacheTime (presumably a timestamp
// in milliseconds past which results are not cached) are not visible here —
// confirm.
type ShouldCacheResFn func(ctx context.Context, r Request, res Response, maxCacheTime int64) bool

ShouldCacheResFn checks whether the current response should go to cache or not.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL