Documentation ¶
Index ¶
- type Bundle
- type BundleCache
- type BundleStream
- type Cache
- func (c *Cache) CountSVIDs() int
- func (c *Cache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate
- func (c *Cache) GetStaleEntries() []*StaleEntry
- func (c *Cache) Identities() []Identity
- func (c *Cache) MatchingIdentities(selectors []*common.Selector) []Identity
- func (c *Cache) SubscribeToWorkloadUpdates(selectors []*common.Selector) Subscriber
- func (c *Cache) UpdateEntries(update *UpdateEntries, ...)
- func (c *Cache) UpdateSVIDs(update *UpdateSVIDs)
- type Identity
- type JWTSVIDCache
- type Selectors
- type StaleEntry
- type Subscriber
- type UpdateEntries
- type UpdateSVIDs
- type WorkloadUpdate
- type X509SVID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bundle ¶
type Bundle = bundleutil.Bundle
type BundleCache ¶
type BundleCache struct {
// contains filtered or unexported fields
}
func NewBundleCache ¶
func NewBundleCache(trustDomainID string, bundle *Bundle) *BundleCache
func (*BundleCache) Bundle ¶
func (c *BundleCache) Bundle() *Bundle
func (*BundleCache) Bundles ¶
func (c *BundleCache) Bundles() map[string]*Bundle
func (*BundleCache) SubscribeToBundleChanges ¶
func (c *BundleCache) SubscribeToBundleChanges() *BundleStream
func (*BundleCache) Update ¶
func (c *BundleCache) Update(bundles map[string]*Bundle)
type BundleStream ¶
type BundleStream struct {
// contains filtered or unexported fields
}
Wraps an observer stream to provide a type safe interface
func NewBundleStream ¶
func NewBundleStream(stream observer.Stream) *BundleStream
func (*BundleStream) Changes ¶
func (b *BundleStream) Changes() chan struct{}
Changes returns the channel that is closed when a new value is available.
func (*BundleStream) Clone ¶
func (b *BundleStream) Clone() *BundleStream
Clone creates a new independent stream from this one but sharing the same Property. Updates to the property will be reflected in both streams but they may have different values depending on when they advance the stream with Next.
func (*BundleStream) HasNext ¶
func (b *BundleStream) HasNext() bool
HasNext checks whether there is a new value available.
func (*BundleStream) Next ¶
func (b *BundleStream) Next() map[string]*Bundle
Next advances this stream to the next state. You should never call this unless Changes channel is closed.
func (*BundleStream) Value ¶
func (b *BundleStream) Value() map[string]*Bundle
Value returns the current value for this stream.
func (*BundleStream) WaitNext ¶
func (b *BundleStream) WaitNext() map[string]*Bundle
WaitNext waits for Changes to be closed, advances the stream and returns the current value.
type Cache ¶
type Cache struct { *BundleCache *JWTSVIDCache // contains filtered or unexported fields }
Cache caches each registration entry, signed X509-SVIDs for those entries, bundles, and JWT SVIDs for the agent. It allows subscriptions by (workload) selector sets and notifies subscribers when:
1) a registration entry related to the selectors:
- is modified
- has a new X509-SVID signed for it
- federates with a federated bundle that is updated
2) the trust bundle for the agent trust domain is updated
When notified, the subscriber is given a WorkloadUpdate containing related identities and trust bundles.
The cache does this efficiently by building an index for each unique selector it encounters. Each selector index tracks the subscribers (i.e workloads) and registration entries that have that selector.
When registration entries are added/updated/removed, the set of relevant selectors are gathered and the indexes for those selectors are combed for all relevant subscribers.
For each relevant subscriber, the selector index for each selector of the subscriber is combed for registration whose selectors are a subset of the subscriber selector set. Identities for those entries are added to the workload update returned to the subscriber.
NOTE: The cache is intended to be able to handle thousands of workload subscriptions, which can involve thousands of certificates, keys, bundles, and registration entries, etc. The selector index itself is intended to be scalable, but the objects themselves can take a considerable amount of memory. For maximal safety, the objects should be cloned both coming in and leaving the cache. However, during global updates (e.g. trust bundle is updated for the agent trust domain) in particular, cloning all of the relevant objects for each subscriber causes HUGE amounts of memory pressure which adds non-trivial amounts of latency and causes a giant memory spike that could OOM the agent on smaller VMs. For this reason, the cache is presumed to own ALL data passing in and out of the cache. Producers and consumers MUST NOT mutate the data.
func (*Cache) CountSVIDs ¶ added in v0.12.0
func (*Cache) FetchWorkloadUpdate ¶
func (c *Cache) FetchWorkloadUpdate(selectors []*common.Selector) *WorkloadUpdate
func (*Cache) GetStaleEntries ¶ added in v0.10.0
func (c *Cache) GetStaleEntries() []*StaleEntry
GetStaleEntries obtains a list of stale entries
func (*Cache) Identities ¶
Identities is only used by manager tests TODO: We should remove this and find a better way
func (*Cache) MatchingIdentities ¶
func (*Cache) SubscribeToWorkloadUpdates ¶
func (c *Cache) SubscribeToWorkloadUpdates(selectors []*common.Selector) Subscriber
func (*Cache) UpdateEntries ¶ added in v0.10.0
func (c *Cache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool)
UpdateEntries updates the cache with the provided registration entries and bundles and notifies impacted subscribers. The checkSVID callback, if provided, is used to determine if the SVID for the entry is stale, or otherwise in need of rotation. Entries marked stale through the checkSVID callback are returned from GetStaleEntries() until the SVID is updated through a call to UpdateSVIDs.
func (*Cache) UpdateSVIDs ¶ added in v0.10.0
func (c *Cache) UpdateSVIDs(update *UpdateSVIDs)
type Identity ¶
type Identity struct { Entry *common.RegistrationEntry SVID []*x509.Certificate PrivateKey crypto.Signer }
Identity holds the data for a single workload identity
type JWTSVIDCache ¶
type JWTSVIDCache struct {
// contains filtered or unexported fields
}
func NewJWTSVIDCache ¶
func NewJWTSVIDCache() *JWTSVIDCache
func (*JWTSVIDCache) GetJWTSVID ¶
func (*JWTSVIDCache) SetJWTSVID ¶
func (c *JWTSVIDCache) SetJWTSVID(spiffeID string, audience []string, svid *client.JWTSVID)
type StaleEntry ¶ added in v0.10.0
type StaleEntry struct { // Entry stale registration entry Entry *common.RegistrationEntry // SVIDs expiration time ExpiresAt time.Time }
StaleEntry holds stale entries with SVIDs expiration time
type Subscriber ¶
type Subscriber interface { Updates() <-chan *WorkloadUpdate Finish() }
type UpdateEntries ¶ added in v0.10.0
type UpdateEntries struct { // Bundles is a set of ALL trust bundles available to the agent, keyed by // trust domain id. Bundles map[string]*bundleutil.Bundle // RegistrationEntries is a set of ALL registration entries available to the // agent, keyed by registration entry id. RegistrationEntries map[string]*common.RegistrationEntry }
Update holds information for an entries update to the cache.
type UpdateSVIDs ¶ added in v0.10.0
type UpdateSVIDs struct { // X509SVIDs is a set of updated X509-SVIDs that should be merged into // the cache, keyed by registration entry id. X509SVIDs map[string]*X509SVID }
Update holds information for an SVIDs update to the cache.
type WorkloadUpdate ¶
type WorkloadUpdate struct { Identities []Identity Bundle *bundleutil.Bundle FederatedBundles map[string]*bundleutil.Bundle }
WorkloadUpdate is used to convey workload information to cache subscribers