query_cache

package
v4.0.1-rc.0
Warning

This package is not in the latest version of its module.

Published: Aug 8, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

README

# Caching

## Overview

The plugin query cache maintains a bidirectional gRPC stream connection to the cache server (hosted by the plugin manager process).

Cache data is streamed row by row, in both directions.

### Cache Set

  • call CacheCommand_SET_RESULT_START command
  • for each row:
    • stream to FDW
    • stream to cache with CacheCommand_SET_RESULT_ITERATE command
  • mark completion with CacheCommand_SET_RESULT_END
  • fetch index bucket for this table (if it exists): CacheCommand_GET_INDEX
  • update index bucket and write back CacheCommand_SET_INDEX
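
The Cache Set steps above can be sketched as a short, self-contained program. This is only an illustration of the command ordering: the `command` constants borrow their names from the list above, while `recorder` and `cacheSet` are hypothetical stand-ins and not part of this package.

```go
package main

import "fmt"

// command is a hypothetical stand-in for the cache command enum; only the
// names are taken from the README.
type command string

const (
	setResultStart   command = "SET_RESULT_START"
	setResultIterate command = "SET_RESULT_ITERATE"
	setResultEnd     command = "SET_RESULT_END"
	getIndex         command = "GET_INDEX"
	setIndex         command = "SET_INDEX"
)

// recorder stands in for the stream to the cache server and simply records
// every command sent to it.
type recorder struct{ sent []command }

func (r *recorder) send(c command) { r.sent = append(r.sent, c) }

// cacheSet walks through the Cache Set steps for a single scan.
func cacheSet(stream *recorder, rows []string, toFDW func(row string)) {
	stream.send(setResultStart)
	for _, row := range rows {
		toFDW(row)                    // stream the row to the FDW
		stream.send(setResultIterate) // and stream it to the cache
	}
	stream.send(setResultEnd)
	stream.send(getIndex) // fetch the index bucket for this table, if any
	stream.send(setIndex) // write the updated index bucket back
}

func main() {
	stream := &recorder{}
	cacheSet(stream, []string{"row-1", "row-2"}, func(row string) {
		fmt.Println("fdw:", row)
	})
	fmt.Println(stream.sent)
}
```

Judging by their names, the exported `StartSet`, `IterateSet`, `EndSet` and `AbortSet` methods listed further down presumably drive these steps, but that mapping is an inference from the signatures rather than something the README states.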

### Cache Get

  • fetch index bucket for this table (if it exists): CacheCommand_GET_INDEX
  • if this is a cache hit (i.e. an index item satisfying columns and quals exists), call CacheCommand_GET_RESULT
  • the cache streams the results back a row at a time; for each row:
    • stream to FDW
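
A minimal sketch of the Cache Get flow, assuming a toy in-memory cache keyed by table name. `fakeCache`, `cacheMissError` and `isCacheMiss` are local, hypothetical stand-ins that only mirror the shape of `QueryCache.Get`, `CacheMissError` and `IsCacheMiss`; they do not reproduce how the package implements them.

```go
package main

import (
	"errors"
	"fmt"
)

// cacheMissError is a local stand-in for the package's CacheMissError.
type cacheMissError struct{}

func (cacheMissError) Error() string { return "cache miss" }

// fakeCache maps an index key (here just a table name) to its cached rows.
type fakeCache map[string][]string

// get streams the cached rows to streamRow one at a time, or returns a
// cacheMissError if there is no entry for key.
func (c fakeCache) get(key string, streamRow func(row string)) error {
	rows, ok := c[key]
	if !ok {
		return cacheMissError{}
	}
	for _, row := range rows {
		streamRow(row)
	}
	return nil
}

// isCacheMiss reports whether err is (or wraps) a cacheMissError.
func isCacheMiss(err error) bool {
	var miss cacheMissError
	return errors.As(err, &miss)
}

func main() {
	// "table_a" and "table_b" are made-up table names.
	cache := fakeCache{"table_a": {"row-1", "row-2"}}
	for _, table := range []string{"table_a", "table_b"} {
		err := cache.get(table, func(row string) { fmt.Println("fdw:", row) })
		if isCacheMiss(err) {
			fmt.Println(table, "is a cache miss, so the scan must run")
		}
	}
}
```

Returning a dedicated error type for a miss lets the caller distinguish "not cached" from a genuine failure and fall back to executing the scan.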

## Concurrency

The plugin may make more than one request to the cache at a time (for example, if a query consists of multiple scans with the same connection, i.e. multiple concurrent plugin Execution calls).

Therefore results from the cache may be interleaved, e.g. the results of two different cache gets may be streaming at the same time.

To handle this, each cache command is made using a CallId (unique time-based identifier). When making a cache command, the query cache registers a callback which is stored in a map keyed by call id.

When the cache stream receiver processes a response from the cache, it uses the response callId to find the appropriate callback function and invokes it.
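
The callback routing described above can be sketched as follows. The `response` and `dispatcher` types are hypothetical and exist only for this example; the package's own callback map is unexported and may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for a message read from the cache stream.
type response struct {
	CallId string
	Row    string
}

// dispatcher routes each response to the callback registered for its call id.
type dispatcher struct {
	mu        sync.Mutex
	callbacks map[string]func(response)
}

func newDispatcher() *dispatcher {
	return &dispatcher{callbacks: make(map[string]func(response))}
}

// register stores the callback for a call id before the command is sent.
func (d *dispatcher) register(callId string, cb func(response)) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.callbacks[callId] = cb
}

// dispatch invokes the callback registered for the response's call id and
// reports whether one was found. The callback runs outside the lock.
func (d *dispatcher) dispatch(r response) bool {
	d.mu.Lock()
	cb, ok := d.callbacks[r.CallId]
	d.mu.Unlock()
	if ok {
		cb(r)
	}
	return ok
}

func main() {
	d := newDispatcher()
	d.register("call-1", func(r response) { fmt.Println("scan 1 got", r.Row) })
	d.register("call-2", func(r response) { fmt.Println("scan 2 got", r.Row) })

	// Responses for the two calls arrive interleaved on the single stream.
	interleaved := []response{
		{CallId: "call-1", Row: "a"},
		{CallId: "call-2", Row: "x"},
		{CallId: "call-1", Row: "b"},
	}
	for _, r := range interleaved {
		d.dispatch(r)
	}
}
```

The mutex matters because callbacks are registered from the goroutines issuing cache commands, while lookups happen on the stream receiver.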

## TTL

The TTL is controlled by the client, as multiple clients might be using the same connections and each may have a different TTL.

This is achieved as follows. The cache saves data with a default TTL of 5 minutes (this is the TTL passed to the underlying cache). When fetching data, the age of the retrieved data is determined using its insertion time. If the data is older than the client TTL, it is considered a cache miss.

NOTE: If the client TTL is greater than the default TTL of the cache, the default TTL is increased as appropriate to ensure the data remains in the cache at least as long as is needed for the client TTL.

Documentation

Constants

const (
	// Key column CacheMatch values
	CacheMatchSubset = "subset"
	CacheMatchExact  = "exact"
)

Variables

This section is empty.

Functions

func IsCacheMiss

func IsCacheMiss(err error) bool

func NewPendingIndexItem

func NewPendingIndexItem(req *CacheRequest) *pendingIndexItem

Types

type CacheData

type CacheData interface {
	proto.Message
	*sdkproto.QueryResult | *sdkproto.IndexBucket
}

type CacheMissError

type CacheMissError struct{}

func (CacheMissError) Error

func (CacheMissError) Error() string

type CacheRequest

type CacheRequest struct {
	CallId         string
	Table          string
	QualMap        map[string]*sdkproto.Quals
	Columns        []string
	Limit          int64
	ConnectionName string
	TtlSeconds     int64
	// contains filtered or unexported fields
}

type CacheStats

type CacheStats struct {
	// keep count of hits and misses
	Hits   int
	Misses int
}

type IndexBucket

type IndexBucket struct {
	Items []*IndexItem
}

IndexBucket contains index items for all cache results for a given table and qual set

func IndexBucketfromProto

func IndexBucketfromProto(b *proto.IndexBucket) *IndexBucket

func (*IndexBucket) Append

func (b *IndexBucket) Append(item *IndexItem) *IndexBucket

func (*IndexBucket) AsProto

func (b *IndexBucket) AsProto() *proto.IndexBucket

func (*IndexBucket) Get

func (b *IndexBucket) Get(req *CacheRequest, keyColumns map[string]*proto.KeyColumn) *IndexItem

Get finds an index item which satisfies all columns

type IndexItem

type IndexItem struct {
	Columns       []string
	Key           string
	Limit         int64
	Quals         map[string]*proto.Quals
	InsertionTime time.Time
	PageCount     int64
}

IndexItem stores the columns and cached index for a single cached query result. Note: this index item is tied to a specific table and set of quals.

func NewIndexItem

func NewIndexItem(req *CacheRequest) *IndexItem

func (IndexItem) SatisfiesColumns

func (i IndexItem) SatisfiesColumns(columns []string) bool

SatisfiesColumns returns whether this index item satisfies the given columns. It is used when determining whether this IndexItem satisfies a cache request.
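
One plausible reading of "satisfies the given columns" is that every requested column is present in the cached result. The sketch below assumes exactly that; `satisfiesColumns` is a hypothetical free function and not the package's implementation.

```go
package main

import "fmt"

// satisfiesColumns reports whether every requested column is among the
// columns stored with the cached result.
func satisfiesColumns(cached, requested []string) bool {
	have := make(map[string]struct{}, len(cached))
	for _, c := range cached {
		have[c] = struct{}{}
	}
	for _, c := range requested {
		if _, ok := have[c]; !ok {
			return false
		}
	}
	return true
}

func main() {
	// The column names are made up for illustration.
	cached := []string{"id", "name", "region"}
	fmt.Println(satisfiesColumns(cached, []string{"id", "name"})) // true
	fmt.Println(satisfiesColumns(cached, []string{"id", "tags"})) // false
}
```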

func (IndexItem) SatisfiesLimit

func (i IndexItem) SatisfiesLimit(limit int64) bool

SatisfiesLimit returns whether this index item satisfies the given limit. It is used when determining whether this IndexItem satisfies a cache request.
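
The documentation does not spell out the limit rule. A natural interpretation, sketched here under the assumption that `-1` means "no limit", is that a cached result can serve a request only if it holds at least as many rows as the request may need. Both `noLimit` and `satisfiesLimit` are hypothetical names.

```go
package main

import "fmt"

// noLimit is an assumed sentinel meaning "no limit was applied".
const noLimit int64 = -1

// satisfiesLimit reports whether a result cached with cachedLimit can serve
// a request with requestedLimit.
func satisfiesLimit(cachedLimit, requestedLimit int64) bool {
	if cachedLimit == noLimit {
		return true // the full result was cached, so any limit is satisfied
	}
	if requestedLimit == noLimit {
		return false // the request wants everything but the cache was truncated
	}
	return requestedLimit <= cachedLimit
}

func main() {
	fmt.Println(satisfiesLimit(noLimit, 10)) // true
	fmt.Println(satisfiesLimit(100, 10))     // true
	fmt.Println(satisfiesLimit(10, 100))     // false
	fmt.Println(satisfiesLimit(10, noLimit)) // false
}
```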

func (IndexItem) SatisfiesQuals

func (i IndexItem) SatisfiesQuals(checkQualMap map[string]*proto.Quals, keyColumns map[string]*proto.KeyColumn) bool

SatisfiesQuals reports whether this index item satisfies the check quals, i.e. whether all data returned by the check quals is also returned by the index quals.

In other words, the data selected by the check quals must be a 'subset' of the data selected by the index quals, e.g.

   our quals [],                check quals [id="1"]        -> SATISFIED
   our quals [id="1"],          check quals [id="1"]        -> SATISFIED
   our quals [id="1"],          check quals [id="1", foo=2] -> SATISFIED
   our quals [id="1", foo=2],   check quals [id="1"]        -> NOT SATISFIED

NOTE: some columns cannot use this subset logic. Generally this applies to columns which represent a filter which is executed server side to filter the data returned. In this case, we only identify a cache hit if the cached data has the _same_ value for the given column.

NOTE: if the IndexItem has a limit, the quals must be IDENTICAL (ignoring ordering)
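
The subset rule and its two exceptions can be modelled with a deliberately simplified sketch. It assumes equality-only quals represented as a column-to-value map, whereas the real `proto.Quals` also carry operators; `quals`, `satisfiesQuals`, `exactColumns` and `itemHasLimit` are all hypothetical names introduced for this example.

```go
package main

import "fmt"

// quals is a simplified model: column name -> required value.
type quals map[string]string

// satisfiesQuals reports whether a result cached under `ours` can serve a
// request filtered by `check`.
func satisfiesQuals(ours, check quals, exactColumns map[string]bool, itemHasLimit bool) bool {
	// Every qual that narrowed the cached result must also appear, with the
	// same value, in the request; otherwise the cache may lack requested rows.
	for col, val := range ours {
		if checkVal, ok := check[col]; !ok || checkVal != val {
			return false
		}
	}
	// Extra request quals only narrow the result further, which is fine unless
	// the column needs an exact match or the cached result had a limit.
	for col := range check {
		if _, cached := ours[col]; cached {
			continue
		}
		if itemHasLimit || exactColumns[col] {
			return false
		}
	}
	return true
}

func main() {
	id1 := quals{"id": "1"}
	id1foo2 := quals{"id": "1", "foo": "2"}

	fmt.Println(satisfiesQuals(quals{}, id1, nil, false)) // true
	fmt.Println(satisfiesQuals(id1, id1, nil, false))     // true
	fmt.Println(satisfiesQuals(id1, id1foo2, nil, false)) // true
	fmt.Println(satisfiesQuals(id1foo2, id1, nil, false)) // false
	fmt.Println(satisfiesQuals(id1, id1foo2, nil, true))  // false: the item has a limit
}
```

The first four calls reproduce the four cases in the table above; the last shows that once the cached item has a limit, extra check quals are no longer acceptable.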

func (IndexItem) SatisfiesTtl

func (i IndexItem) SatisfiesTtl(ttlSeconds int64) bool

SatisfiesTtl reports whether this index item satisfies the TTL requirement.

type QueryCache

type QueryCache struct {
	Stats *CacheStats

	// map of connection name to plugin schema
	PluginSchemaMap map[string]*grpc.PluginSchema
	// contains filtered or unexported fields
}

func NewQueryCache

func NewQueryCache(pluginName string, pluginSchemaMap map[string]*grpc.PluginSchema, maxCacheStorageMb int) (*QueryCache, error)

func (*QueryCache) AbortSet

func (c *QueryCache) AbortSet(ctx context.Context, callId string)

func (*QueryCache) EndSet

func (c *QueryCache) EndSet(ctx context.Context, callId string) (err error)

func (*QueryCache) Get

func (c *QueryCache) Get(ctx context.Context, req *CacheRequest, streamRowFunc func(row *sdkproto.Row)) error

func (*QueryCache) IterateSet

func (c *QueryCache) IterateSet(ctx context.Context, row *sdkproto.Row, callId string) error

func (*QueryCache) StartSet

func (c *QueryCache) StartSet(_ context.Context, req *CacheRequest)
