query_cache

package
v5.6.0-dev.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2023 License: Apache-2.0 Imports: 21 Imported by: 4

README

Caching

Overview

The plugin query cache maintains a bidirectional GRPC stream connection to the cache server (hosted by the plugin manager process)

Cache data is streamed row by row, in both directions.

Cache Set
  • call CacheCommand_SET_RESULT_START command
  • for each row:
    • stream to FDW
    • stream to cache with CacheCommand_SET_RESULT_ITERATE command
  • mark completion with CacheCommand_SET_RESULT_END
  • fetch index bucket for this table (if it exists): CacheCommand_GET_INDEX
  • update index bucket and write back CacheCommand_SET_INDEX
Cache Get
  • fetch index bucket for this table (if it exists): CacheCommand_GET_INDEX
  • if this is a cache hit (i.e. an index item satisfying columns and quals exists), call CacheCommand_GET_RESULT
  • cache will stream results to us a row at a time - for each row
    • stream to fdw

Concurrency

The plugin may make more than one request to the cache at a time (for example, if a query consists of multiple scans with the same connection, i.e. multiple concurrent plugin Execution calls).

Therefore results from the cache may be interleaved, e.g. the results from 2 different cache gets may be streaming at the same time

To handle this, each cache command is made using a CallId (unique time-based identifier). When making a cache command, the query cache registers a callback which is stored in a map keyed by call id.

When the cache stream receiver processes a response from the cache, it uses the response callId to find the appropriate callback function and invokes it.

TTL

The TTL is controlled by the client, as multiple clients might be using the same connections and each may have a different TTL.

This is achieved as follows. The cache saves data with a default TTL of 5 minutes (this is the TTL passed to the underlying cache). When fetching data, the age of the retrieved data is determined using the insertion time. If the age of the data is older than the client TTL, it is considered a cache miss

NOTE: If the client TTL is greater than the default TTL of the cache, the default TTL is increased as appropriate to ensure the data remains in the cache at lkeast as long as is needed for the client TTL

Documentation

Overview

Package query_cache is a cache used to store query results.

Each plugin process has a single QueryCache instance which stores the data for all connections

Index

Constants

View Source
const (
	// Key column CacheMatch values
	CacheMatchSubset = "subset"
	CacheMatchExact  = "exact"
)
View Source
const (
	// cache has a default hard TTL limit of 24 hours
	DefaultMaxTtl = 24 * time.Hour
)

Variables

This section is empty.

Functions

func IsCacheMiss

func IsCacheMiss(err error) bool

Types

type CacheData

type CacheData interface {
	proto.Message
	*sdkproto.QueryResult | *sdkproto.IndexBucket
}

type CacheMissError

type CacheMissError struct{}

func (CacheMissError) Error

func (CacheMissError) Error() string

type CacheRequest

type CacheRequest struct {
	CallId         string
	Table          string
	QualMap        map[string]*sdkproto.Quals
	Columns        []string
	Limit          int64
	ConnectionName string
	TtlSeconds     int64

	StreamContext context.Context
	// contains filtered or unexported fields
}

type CacheStats

type CacheStats struct {
	// keep count of hits and misses
	Hits   int
	Misses int
}

type IndexBucket

type IndexBucket struct {
	Items []*IndexItem
}

IndexBucket contains index items for all cache results for a given table and connection

func IndexBucketfromProto

func IndexBucketfromProto(b *proto.IndexBucket) *IndexBucket

func (*IndexBucket) Append

func (b *IndexBucket) Append(item *IndexItem) *IndexBucket

func (*IndexBucket) AsProto

func (b *IndexBucket) AsProto() *proto.IndexBucket

func (*IndexBucket) Get

func (b *IndexBucket) Get(req *CacheRequest, keyColumns map[string]*proto.KeyColumn) *IndexItem

Get finds an index item which satisfies all columns

type IndexItem

type IndexItem struct {
	Columns       []string
	Key           string
	Limit         int64
	Quals         map[string]*proto.Quals
	InsertionTime time.Time
	PageCount     int64
}

IndexItem stores the columns and cached index for a single cached query result note - this index item it tied to a specific table and set of quals

func NewIndexItem

func NewIndexItem(req *CacheRequest) *IndexItem

type NoSubscribersError added in v5.6.0

type NoSubscribersError struct{}

func (NoSubscribersError) Error added in v5.6.0

func (NoSubscribersError) Error() string

type QueryCache

type QueryCache struct {
	Stats *CacheStats

	// map of connection name to plugin schema
	PluginSchemaMap map[string]*grpc.PluginSchema

	Enabled bool
	// contains filtered or unexported fields
}

func NewQueryCache

func NewQueryCache(pluginName string, pluginSchemaMap map[string]*grpc.PluginSchema, opts *QueryCacheOptions) (*QueryCache, error)

func (*QueryCache) AbortSet

func (c *QueryCache) AbortSet(ctx context.Context, callId string, err error)

func (*QueryCache) ClearForConnection

func (c *QueryCache) ClearForConnection(ctx context.Context, connectionName string)

ClearForConnection removes all cache entries for the given connection

func (*QueryCache) EndSet

func (c *QueryCache) EndSet(ctx context.Context, callId string) (err error)

func (*QueryCache) Get

func (c *QueryCache) Get(ctx context.Context, req *CacheRequest, streamUncachedRowFunc, streamCachedRowFunc func(row *sdkproto.Row)) error

func (*QueryCache) IterateSet

func (c *QueryCache) IterateSet(ctx context.Context, row *sdkproto.Row, callId string) error

type QueryCacheOptions added in v5.4.0

type QueryCacheOptions struct {
	Enabled   bool
	Ttl       time.Duration
	MaxSizeMb int
}

func NewQueryCacheOptions added in v5.4.0

func NewQueryCacheOptions(req *proto.SetCacheOptionsRequest) *QueryCacheOptions

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL