Documentation
Overview
Package statecache implements the state caching feature described by the Beam Fn API.
The Beam State API and the intended caching behavior are described here: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.7ghoih5aig5m
Index
- type CacheMetrics
- type SideInputCache
- func (c *SideInputCache) CacheMetrics() CacheMetrics
- func (c *SideInputCache) CompleteBundle(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)
- func (c *SideInputCache) Init(cap int) error
- func (c *SideInputCache) QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) exec.ReStream
- func (c *SideInputCache) SetCache(ctx context.Context, transformID, sideInputID string, win, key []byte, ...) exec.ReStream
- func (c *SideInputCache) SetValidTokens(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)
Types
type CacheMetrics
type CacheMetrics struct {
	Hits, Misses, Evictions, InUseEvictions, ReStreamErrors int64
}
CacheMetrics stores metrics for the cache across a pipeline run.
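The counters in CacheMetrics can be combined into summary figures. The minimal sketch below computes a hit ratio. It declares a local copy of the struct so that it compiles without the Beam module, and hitRatio is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// CacheMetrics is a local stand-in mirroring the exported fields of
// statecache.CacheMetrics, so this sketch builds without the Beam module.
type CacheMetrics struct {
	Hits, Misses, Evictions, InUseEvictions, ReStreamErrors int64
}

// hitRatio is a hypothetical helper (not part of statecache) that returns the
// fraction of lookups served from the cache, or 0 if there were no lookups.
func hitRatio(m CacheMetrics) float64 {
	total := m.Hits + m.Misses
	if total == 0 {
		return 0
	}
	return float64(m.Hits) / float64(total)
}

func main() {
	m := CacheMetrics{Hits: 30, Misses: 10, Evictions: 2}
	fmt.Printf("hit ratio: %.2f\n", hitRatio(m)) // hit ratio: 0.75
}
```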
type SideInputCache
type SideInputCache struct {
	// contains filtered or unexported fields
}
SideInputCache stores a cache of reusable inputs to eliminate redundant calls to the runner during execution of ParDos that use side inputs.
A SideInputCache should be initialized when the SDK harness is initialized, creating storage for side input caching. On each ProcessBundleRequest, the cache processes the list of tokens for cacheable side inputs and is queried when side inputs are requested during bundle execution. When a new bundle request arrives, the valid tokens are updated and the cache is reused. If the cache reaches capacity, a randomly chosen cached object whose token is not currently valid is evicted.
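The eviction rule can be illustrated in isolation. The sketch below is a simplified model, not Beam's implementation: the entry type, the string keys, and evictRandomInvalid are hypothetical, and it only considers entries whose token is absent from the current valid set. It does not model what the real cache does when every entry is still valid.

```go
package main

import (
	"fmt"
	"math/rand"
)

// entry is a hypothetical cached item tagged with the cache token it was stored under.
type entry struct {
	token string
	value string
}

// evictRandomInvalid removes one randomly chosen entry whose token is not in
// valid and reports whether anything was evicted.
func evictRandomInvalid(entries map[string]entry, valid map[string]bool) bool {
	var candidates []string
	for k, e := range entries {
		if !valid[e.token] {
			candidates = append(candidates, k)
		}
	}
	if len(candidates) == 0 {
		return false // nothing invalid to evict in this simplified model
	}
	delete(entries, candidates[rand.Intn(len(candidates))])
	return true
}

func main() {
	entries := map[string]entry{
		"a": {token: "tokOld", value: "x"},
		"b": {token: "tokOld", value: "y"},
		"c": {token: "tokNew", value: "z"},
	}
	valid := map[string]bool{"tokNew": true} // only the current bundle's token is valid
	evictRandomInvalid(entries, valid)
	_, cKept := entries["c"]
	fmt.Println(len(entries), cKept) // 2 true
}
```

Choosing uniformly among invalid entries keeps the sketch simple; which of "a" or "b" is evicted varies between runs, but the entry under the valid token always survives.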
func (*SideInputCache) CacheMetrics
func (c *SideInputCache) CacheMetrics() CacheMetrics
CacheMetrics returns the cache metrics for the current side input cache.
func (*SideInputCache) CompleteBundle
func (c *SideInputCache) CompleteBundle(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)
CompleteBundle takes the cache tokens that were passed to SetValidTokens and decrements their usage counts, so that the cache keeps an accurate count of whether each value is still in use. It should be called once ProcessBundle has completed.
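The usage counting can be pictured as reference counting per token. This is a minimal sketch under the assumption that SetValidTokens increments the counts that CompleteBundle later decrements; tokenRefs, acquire, release, and inUse are hypothetical names, not statecache API.

```go
package main

import "fmt"

// tokenRefs is a hypothetical model of per-token usage counts: the number of
// bundles that are still using each cache token.
type tokenRefs map[string]int

// acquire models the start of a bundle (SetValidTokens), assuming it
// increments the count for each token it is given.
func (r tokenRefs) acquire(tokens ...string) {
	for _, t := range tokens {
		r[t]++
	}
}

// release models CompleteBundle: it decrements the count for each token passed
// at bundle start and drops tokens whose count reaches zero.
func (r tokenRefs) release(tokens ...string) {
	for _, t := range tokens {
		if r[t] <= 1 {
			delete(r, t)
			continue
		}
		r[t]--
	}
}

// inUse reports whether any bundle is still using token t.
func (r tokenRefs) inUse(t string) bool { return r[t] > 0 }

func main() {
	refs := tokenRefs{}
	refs.acquire("tokA")                                // bundle 1 starts
	refs.acquire("tokA", "tokB")                        // bundle 2 starts before bundle 1 finishes
	refs.release("tokA")                                // bundle 1 completes
	fmt.Println(refs.inUse("tokA"), refs.inUse("tokB")) // true true
	refs.release("tokA", "tokB")                        // bundle 2 completes
	fmt.Println(refs.inUse("tokA"), refs.inUse("tokB")) // false false
}
```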
func (*SideInputCache) Init
func (c *SideInputCache) Init(cap int) error
Init creates the cache map and the map from IDs to cache tokens for the SideInputCache. It should only be called once. It returns an error if cap is not positive.
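The Init contract can be sketched with a hypothetical stand-in type. miniCache and idPair are illustrative names, and the error text is made up for the example; only the behavior (allocate two maps, reject non-positive capacities) follows the description above.

```go
package main

import "fmt"

// idPair identifies a side input by transform ID and side input ID.
type idPair struct{ transformID, sideInputID string }

// miniCache is a hypothetical stand-in that mirrors the Init contract;
// it is not the statecache implementation.
type miniCache struct {
	capacity   int
	cache      map[string]string
	idsToToken map[idPair]string
}

// Init allocates the cache map and the ID-to-token map, rejecting
// non-positive capacities. The error text is illustrative.
func (c *miniCache) Init(capacity int) error {
	if capacity <= 0 {
		return fmt.Errorf("cache capacity must be positive, got %d", capacity)
	}
	c.capacity = capacity
	c.cache = make(map[string]string, capacity)
	c.idsToToken = make(map[idPair]string)
	return nil
}

func main() {
	var c miniCache
	fmt.Println(c.Init(0))   // cache capacity must be positive, got 0
	fmt.Println(c.Init(100)) // <nil>
}
```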
func (*SideInputCache) QueryCache
func (c *SideInputCache) QueryCache(ctx context.Context, transformID, sideInputID string, win, key []byte) exec.ReStream
QueryCache takes a transform ID and a side input ID (along with a window and key) and checks whether a corresponding side input has been cached. A query with a bad token (e.g. one whose IDs do not map to a known token, or map to a known but currently invalid token) is treated the same as a cache miss.
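The miss rules can be modeled with a small lookup that counts hits and misses. This is a simplified sketch, not the statecache code: lookupCache and query are hypothetical, the window and key are omitted, values are keyed by token alone, and strings stand in for exec.ReStream.

```go
package main

import "fmt"

// idPair identifies a side input by transform ID and side input ID.
type idPair struct{ transformID, sideInputID string }

// lookupCache is a hypothetical, simplified model of QueryCache's miss rules.
type lookupCache struct {
	idsToToken   map[idPair]string
	valid        map[string]bool
	values       map[string]string
	hits, misses int64
}

// query returns the cached value for the IDs, counting an unknown or currently
// invalid token exactly like an absent value: as a miss.
func (c *lookupCache) query(transformID, sideInputID string) (string, bool) {
	tok, known := c.idsToToken[idPair{transformID, sideInputID}]
	v, cached := c.values[tok]
	if !known || !c.valid[tok] || !cached {
		c.misses++
		return "", false
	}
	c.hits++
	return v, true
}

func main() {
	c := &lookupCache{
		idsToToken: map[idPair]string{{"t1", "s1"}: "tokA", {"t2", "s2"}: "tokB"},
		valid:      map[string]bool{"tokA": true}, // tokB is known but not valid for this bundle
		values:     map[string]string{"tokA": "cached", "tokB": "stale"},
	}
	c.query("t1", "s1")           // valid token: hit
	c.query("t2", "s2")           // known but invalid token: miss
	c.query("t9", "s9")           // unknown IDs: miss
	fmt.Println(c.hits, c.misses) // 1 2
}
```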
func (*SideInputCache) SetCache
func (c *SideInputCache) SetCache(ctx context.Context, transformID, sideInputID string, win, key []byte, input exec.ReStream) exec.ReStream
SetCache allows a user to place an exec.ReStream materialized from the reader into the SideInputCache under its corresponding transform ID and side input ID. If the IDs do not map to a known, valid token, the input is silently not cached, since this indicates that the runner is treating that input as uncacheable.
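One plausible way a caller combines QueryCache and SetCache is a read-through lookup: serve from the cache on a hit, otherwise read from the runner and offer the result to the cache. The sketch below assumes that pattern; sideInputCache, tokenGatedCache, and fetchSideInput are hypothetical, strings stand in for exec.ReStream, and ctx, window, and key are omitted. It shows the effect of the silent no-op: an input without a valid token is read from the runner every time.

```go
package main

import "fmt"

// sideInputCache is a hypothetical interface with the rough shape of
// QueryCache/SetCache, simplified for the sketch.
type sideInputCache interface {
	Query(transformID, sideInputID string) (string, bool)
	Set(transformID, sideInputID, val string)
}

// tokenGatedCache only caches inputs whose IDs map to a currently valid token.
type tokenGatedCache struct {
	idsToToken map[[2]string]string
	valid      map[string]bool
	values     map[string]string
}

// token returns the token for the IDs and whether it is known and valid.
func (c *tokenGatedCache) token(transformID, sideInputID string) (string, bool) {
	tok, ok := c.idsToToken[[2]string{transformID, sideInputID}]
	return tok, ok && c.valid[tok]
}

func (c *tokenGatedCache) Query(transformID, sideInputID string) (string, bool) {
	tok, ok := c.token(transformID, sideInputID)
	if !ok {
		return "", false
	}
	v, ok := c.values[tok]
	return v, ok
}

func (c *tokenGatedCache) Set(transformID, sideInputID, val string) {
	if tok, ok := c.token(transformID, sideInputID); ok {
		c.values[tok] = val
	} // otherwise silently do not cache: the runner treats the input as uncacheable
}

// fetchSideInput serves from the cache when possible, otherwise reads from
// the runner and offers the result to the cache.
func fetchSideInput(c sideInputCache, transformID, sideInputID string, readFromRunner func() string) string {
	if v, ok := c.Query(transformID, sideInputID); ok {
		return v
	}
	v := readFromRunner()
	c.Set(transformID, sideInputID, v)
	return v
}

func main() {
	c := &tokenGatedCache{
		idsToToken: map[[2]string]string{{"t1", "s1"}: "tokA"},
		valid:      map[string]bool{"tokA": true},
		values:     map[string]string{},
	}
	reads := 0
	read := func() string { reads++; return "side-input-data" }

	fetchSideInput(c, "t1", "s1", read)
	fetchSideInput(c, "t1", "s1", read)
	fmt.Println("reads with a valid token:", reads) // reads with a valid token: 1

	reads = 0
	fetchSideInput(c, "t2", "s2", read) // no token: Set is a silent no-op
	fetchSideInput(c, "t2", "s2", read)
	fmt.Println("reads without a token:", reads) // reads without a token: 2
}
```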
func (*SideInputCache) SetValidTokens
func (c *SideInputCache) SetValidTokens(cacheTokens ...*fnpb.ProcessBundleRequest_CacheToken)
SetValidTokens clears the list of valid tokens and then sets new ones, updating the mapping from transform and side input IDs to cache tokens in the process. It should be called at the start of every new ProcessBundleRequest. If the runner does not support caching, the passed cache tokens should be empty, and all get and set requests will silently be no-ops.
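The token bookkeeping can be sketched as follows. tokenState, setValidTokens, and cacheable are hypothetical names, and (transform ID, side input ID) pairs are modeled as [2]string keys rather than the fnpb.ProcessBundleRequest_CacheToken messages the real method receives. The second bundle passes no tokens, which models a runner without caching support: afterwards every lookup falls through.

```go
package main

import "fmt"

// tokenState is a hypothetical model of the token bookkeeping behind
// SetValidTokens; IDs are (transform ID, side input ID) pairs.
type tokenState struct {
	idsToToken map[[2]string]string
	valid      map[string]bool
}

// setValidTokens drops every previously valid token, then records each new
// ID-to-token mapping and marks its token valid.
func (s *tokenState) setValidTokens(tokens map[[2]string]string) {
	s.valid = make(map[string]bool, len(tokens))
	for ids, tok := range tokens {
		s.idsToToken[ids] = tok
		s.valid[tok] = true
	}
}

// cacheable reports whether get/set requests for these IDs would use the cache.
func (s *tokenState) cacheable(transformID, sideInputID string) bool {
	tok, ok := s.idsToToken[[2]string{transformID, sideInputID}]
	return ok && s.valid[tok]
}

func main() {
	s := &tokenState{idsToToken: map[[2]string]string{}}
	s.setValidTokens(map[[2]string]string{{"t1", "s1"}: "tokA"}) // bundle 1
	fmt.Println(s.cacheable("t1", "s1"))                         // true
	s.setValidTokens(nil)                                        // bundle 2: runner sends no tokens
	fmt.Println(s.cacheable("t1", "s1"))                         // false
}
```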