Documentation ¶
Index ¶
- Constants
- type Bundle
- type BundleCache
- type BundleStream
- func (b *BundleStream) Changes() chan struct{}
- func (b *BundleStream) Clone() *BundleStream
- func (b *BundleStream) HasNext() bool
- func (b *BundleStream) Next() map[spiffeid.TrustDomain]*Bundle
- func (b *BundleStream) Value() map[spiffeid.TrustDomain]*Bundle
- func (b *BundleStream) WaitNext() map[spiffeid.TrustDomain]*Bundle
- type Cache
- func (c *Cache) CountSVIDs() int
- func (c *Cache) Entries() []*common.RegistrationEntry
- func (c *Cache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate
- func (c *Cache) GetStaleEntries() []*StaleEntry
- func (c *Cache) Identities() []Identity
- func (c *Cache) MatchingIdentities(selectors []*common.Selector) []Identity
- func (c *Cache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry
- func (c *Cache) SubscribeToWorkloadUpdates(ctx context.Context, selectors Selectors) (Subscriber, error)
- func (c *Cache) SyncSVIDsWithSubscribers()
- func (c *Cache) UpdateEntries(update *UpdateEntries, ...)
- func (c *Cache) UpdateSVIDs(update *UpdateSVIDs)
- type Identity
- type JWTSVIDCache
- type LRUCache
- func (c *LRUCache) CountSVIDs() int
- func (c *LRUCache) Entries() []*common.RegistrationEntry
- func (c *LRUCache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate
- func (c *LRUCache) GetStaleEntries() []*StaleEntry
- func (c *LRUCache) Identities() []Identity
- func (c *LRUCache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry
- func (c *LRUCache) NewSubscriber(selectors []*common.Selector) Subscriber
- func (c *LRUCache) Notify(selectors []*common.Selector) bool
- func (c *LRUCache) SubscribeToWorkloadUpdates(ctx context.Context, selectors Selectors) (Subscriber, error)
- func (c *LRUCache) SyncSVIDsWithSubscribers()
- func (c *LRUCache) UpdateEntries(update *UpdateEntries, ...)
- func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs)
- type Selectors
- type StaleEntry
- type Subscriber
- type UpdateEntries
- type UpdateSVIDs
- type WorkloadUpdate
- type X509SVID
Constants ¶
const ( DefaultSVIDCacheMaxSize = 1000 SVIDSyncInterval = 500 * time.Millisecond )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bundle ¶
type Bundle = bundleutil.Bundle
type BundleCache ¶
type BundleCache struct {
// contains filtered or unexported fields
}
func NewBundleCache ¶
func NewBundleCache(trustDomain spiffeid.TrustDomain, bundle *Bundle) *BundleCache
func (*BundleCache) Bundle ¶
func (c *BundleCache) Bundle() *Bundle
func (*BundleCache) Bundles ¶
func (c *BundleCache) Bundles() map[spiffeid.TrustDomain]*Bundle
func (*BundleCache) SubscribeToBundleChanges ¶
func (c *BundleCache) SubscribeToBundleChanges() *BundleStream
func (*BundleCache) Update ¶
func (c *BundleCache) Update(bundles map[spiffeid.TrustDomain]*Bundle)
type BundleStream ¶
type BundleStream struct {
// contains filtered or unexported fields
}
Wraps an observer stream to provide a type safe interface
func NewBundleStream ¶
func NewBundleStream(stream observer.Stream) *BundleStream
func (*BundleStream) Changes ¶
func (b *BundleStream) Changes() chan struct{}
Changes returns the channel that is closed when a new value is available.
func (*BundleStream) Clone ¶
func (b *BundleStream) Clone() *BundleStream
Clone creates a new independent stream from this one but sharing the same Property. Updates to the property will be reflected in both streams but they may have different values depending on when they advance the stream with Next.
func (*BundleStream) HasNext ¶
func (b *BundleStream) HasNext() bool
HasNext checks whether there is a new value available.
func (*BundleStream) Next ¶
func (b *BundleStream) Next() map[spiffeid.TrustDomain]*Bundle
Next advances this stream to the next state. You should never call this unless Changes channel is closed.
func (*BundleStream) Value ¶
func (b *BundleStream) Value() map[spiffeid.TrustDomain]*Bundle
Value returns the current value for this stream.
func (*BundleStream) WaitNext ¶
func (b *BundleStream) WaitNext() map[spiffeid.TrustDomain]*Bundle
WaitNext waits for Changes to be closed, advances the stream and returns the current value.
type Cache ¶
type Cache struct { *BundleCache *JWTSVIDCache // contains filtered or unexported fields }
Cache caches each registration entry, signed X509-SVIDs for those entries, bundles, and JWT SVIDs for the agent. It allows subscriptions by (workload) selector sets and notifies subscribers when:
1) a registration entry related to the selectors:
- is modified
- has a new X509-SVID signed for it
- federates with a federated bundle that is updated
2) the trust bundle for the agent trust domain is updated
When notified, the subscriber is given a WorkloadUpdate containing related identities and trust bundles.
The cache does this efficiently by building an index for each unique selector it encounters. Each selector index tracks the subscribers (i.e workloads) and registration entries that have that selector.
When registration entries are added/updated/removed, the set of relevant selectors are gathered and the indexes for those selectors are combed for all relevant subscribers.
For each relevant subscriber, the selector index for each selector of the subscriber is combed for registration whose selectors are a subset of the subscriber selector set. Identities for those entries are added to the workload update returned to the subscriber.
NOTE: The cache is intended to be able to handle thousands of workload subscriptions, which can involve thousands of certificates, keys, bundles, and registration entries, etc. The selector index itself is intended to be scalable, but the objects themselves can take a considerable amount of memory. For maximal safety, the objects should be cloned both coming in and leaving the cache. However, during global updates (e.g. trust bundle is updated for the agent trust domain) in particular, cloning all of the relevant objects for each subscriber causes HUGE amounts of memory pressure which adds non-trivial amounts of latency and causes a giant memory spike that could OOM the agent on smaller VMs. For this reason, the cache is presumed to own ALL data passing in and out of the cache. Producers and consumers MUST NOT mutate the data.
func New ¶
func New(log logrus.FieldLogger, trustDomain spiffeid.TrustDomain, bundle *Bundle, metrics telemetry.Metrics) *Cache
func (*Cache) CountSVIDs ¶
func (*Cache) Entries ¶
func (c *Cache) Entries() []*common.RegistrationEntry
func (*Cache) FetchWorkloadUpdate ¶
func (c *Cache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate
func (*Cache) GetStaleEntries ¶
func (c *Cache) GetStaleEntries() []*StaleEntry
GetStaleEntries obtains a list of stale entries
func (*Cache) Identities ¶
Identities is only used by manager tests TODO: We should remove this and find a better way
func (*Cache) MatchingIdentities ¶
func (*Cache) MatchingRegistrationEntries ¶
func (c *Cache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry
func (*Cache) SubscribeToWorkloadUpdates ¶
func (*Cache) SyncSVIDsWithSubscribers ¶
func (c *Cache) SyncSVIDsWithSubscribers()
func (*Cache) UpdateEntries ¶
func (c *Cache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool)
UpdateEntries updates the cache with the provided registration entries and bundles and notifies impacted subscribers. The checkSVID callback, if provided, is used to determine if the SVID for the entry is stale, or otherwise in need of rotation. Entries marked stale through the checkSVID callback are returned from GetStaleEntries() until the SVID is updated through a call to UpdateSVIDs.
func (*Cache) UpdateSVIDs ¶
func (c *Cache) UpdateSVIDs(update *UpdateSVIDs)
type Identity ¶
type Identity struct { Entry *common.RegistrationEntry SVID []*x509.Certificate PrivateKey crypto.Signer }
Identity holds the data for a single workload identity
type JWTSVIDCache ¶
type JWTSVIDCache struct {
// contains filtered or unexported fields
}
func NewJWTSVIDCache ¶
func NewJWTSVIDCache() *JWTSVIDCache
func (*JWTSVIDCache) GetJWTSVID ¶
func (*JWTSVIDCache) SetJWTSVID ¶
type LRUCache ¶
type LRUCache struct { *BundleCache *JWTSVIDCache // contains filtered or unexported fields }
Cache caches each registration entry, bundles, and JWT SVIDs for the agent. The signed X509-SVIDs for those entries are stored in LRU-like cache. It allows subscriptions by (workload) selector sets and notifies subscribers when:
1) a registration entry related to the selectors:
- is modified
- has a new X509-SVID signed for it
- federates with a federated bundle that is updated
2) the trust bundle for the agent trust domain is updated
When notified, the subscriber is given a WorkloadUpdate containing related identities and trust bundles.
The cache does this efficiently by building an index for each unique selector it encounters. Each selector index tracks the subscribers (i.e workloads) and registration entries that have that selector.
The LRU-like SVID cache has configurable size limit and expiry period.
- Size limit of SVID cache is a soft limit. If SVID has a subscriber present then that SVID is never removed from cache.
- Least recently used SVIDs are removed from cache only after the cache expiry period has passed. This is done to reduce the overall cache churn.
- Last access timestamp for SVID cache entry is updated when a new subscriber is created
- When a new subscriber is created and there is a cache miss then subscriber needs to wait for next SVID sync event to receive WorkloadUpdate with newly minted SVID
The advantage of above approach is that if agent has entry count less than cache size then all SVIDs are cached at all times. If agent has entry count greater than cache size then subscribers will continue to get SVID updates (potential delay for first WorkloadUpdate if cache miss) and least used SVIDs will be removed from cache which will save memory usage. This allows agent to support environments where the active simultaneous workload count is a small percentage of the large number of registrations assigned to the agent.
When registration entries are added/updated/removed, the set of relevant selectors are gathered and the indexes for those selectors are combed for all relevant subscribers.
For each relevant subscriber, the selector index for each selector of the subscriber is combed for registration whose selectors are a subset of the subscriber selector set. Identities for those entries are added to the workload update returned to the subscriber.
NOTE: The cache is intended to be able to handle thousands of workload subscriptions, which can involve thousands of certificates, keys, bundles, and registration entries, etc. The selector index itself is intended to be scalable, but the objects themselves can take a considerable amount of memory. For maximal safety, the objects should be cloned both coming in and leaving the cache. However, during global updates (e.g. trust bundle is updated for the agent trust domain) in particular, cloning all of the relevant objects for each subscriber causes HUGE amounts of memory pressure which adds non-trivial amounts of latency and causes a giant memory spike that could OOM the agent on smaller VMs. For this reason, the cache is presumed to own ALL data passing in and out of the cache. Producers and consumers MUST NOT mutate the data.
func NewLRUCache ¶
func (*LRUCache) CountSVIDs ¶
func (*LRUCache) Entries ¶
func (c *LRUCache) Entries() []*common.RegistrationEntry
func (*LRUCache) FetchWorkloadUpdate ¶
func (c *LRUCache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate
func (*LRUCache) GetStaleEntries ¶
func (c *LRUCache) GetStaleEntries() []*StaleEntry
GetStaleEntries obtains a list of stale entries
func (*LRUCache) Identities ¶
Identities is only used by manager tests TODO: We should remove this and find a better way
func (*LRUCache) MatchingRegistrationEntries ¶
func (c *LRUCache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry
func (*LRUCache) NewSubscriber ¶
func (c *LRUCache) NewSubscriber(selectors []*common.Selector) Subscriber
NewSubscriber creates a subscriber for given selector set. Separately call Notify for the first time after this method is invoked to receive latest updates.
func (*LRUCache) Notify ¶
Notify subscribers of selector set only if all SVIDs for corresponding selector set are cached It returns whether all SVIDs are cached or not. This method should be retried with backoff to avoid lock contention.
func (*LRUCache) SubscribeToWorkloadUpdates ¶
func (*LRUCache) SyncSVIDsWithSubscribers ¶
func (c *LRUCache) SyncSVIDsWithSubscribers()
SyncSVIDsWithSubscribers will sync svid cache: entries with active subscribers which are not cached will be put in staleEntries map records which are not cached for remainder of max cache size will also be put in staleEntries map
func (*LRUCache) UpdateEntries ¶
func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool)
UpdateEntries updates the cache with the provided registration entries and bundles and notifies impacted subscribers. The checkSVID callback, if provided, is used to determine if the SVID for the entry is stale, or otherwise in need of rotation. Entries marked stale through the checkSVID callback are returned from GetStaleEntries() until the SVID is updated through a call to UpdateSVIDs.
func (*LRUCache) UpdateSVIDs ¶
func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs)
type StaleEntry ¶
type StaleEntry struct { // Entry stale registration entry Entry *common.RegistrationEntry // SVIDs expiration time ExpiresAt time.Time }
StaleEntry holds stale entries with SVIDs expiration time
type Subscriber ¶
type Subscriber interface { Updates() <-chan *WorkloadUpdate Finish() }
type UpdateEntries ¶
type UpdateEntries struct { // Bundles is a set of ALL trust bundles available to the agent, keyed by trust domain Bundles map[spiffeid.TrustDomain]*bundleutil.Bundle // RegistrationEntries is a set of ALL registration entries available to the // agent, keyed by registration entry id. RegistrationEntries map[string]*common.RegistrationEntry }
Update holds information for an entries update to the cache.
type UpdateSVIDs ¶
type UpdateSVIDs struct { // X509SVIDs is a set of updated X509-SVIDs that should be merged into // the cache, keyed by registration entry id. X509SVIDs map[string]*X509SVID }
Update holds information for an SVIDs update to the cache.
type WorkloadUpdate ¶
type WorkloadUpdate struct { Identities []Identity Bundle *bundleutil.Bundle FederatedBundles map[spiffeid.TrustDomain]*bundleutil.Bundle }
WorkloadUpdate is used to convey workload information to cache subscribers
func (*WorkloadUpdate) HasIdentity ¶
func (u *WorkloadUpdate) HasIdentity() bool