internal

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 7, 2025 License: BSD-2-Clause Imports: 8 Imported by: 0

Documentation

Constants

const (
	// An ExplicitSubscription means the client subscribed to a resource by explicitly providing its name.
	ExplicitSubscription = subscriptionType(iota)
	// A GlobSubscription means the client subscribed to a resource by specifying its parent glob
	// collection URL, implicitly subscribing it to all the resources that are part of the collection.
	GlobSubscription
	// A WildcardSubscription means the client subscribed to a resource by specifying the wildcard
	// (ads.WildcardSubscription), implicitly subscribing it to all resources in the cache.
	WildcardSubscription
)

These subscriptionType constants define the ways a client can subscribe to a resource. See RawCache.Subscribe for additional details.

Variables

This section is empty.

Functions

func SetTimeProvider

func SetTimeProvider(now func() time.Time)

SetTimeProvider can be used to provide an alternative time provider, which is important when benchmarking the cache.
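The package's internals are not shown here, so the following is only a minimal sketch of how a swappable time provider is commonly wired. The names now, epoch, fixedClock, setTimeProvider and stamp are hypothetical and are not part of this package. The sketch is not safe for concurrent reconfiguration.

```go
package main

import (
	"fmt"
	"time"
)

// now is a hypothetical package-level hook that defaults to the real clock.
// It is not synchronized, so it should be set before any concurrent use.
var now = time.Now

// epoch is a fixed instant used below as a deterministic fake time.
var epoch = time.Unix(0, 0)

// fixedClock returns a provider that always reports the given instant.
func fixedClock(t time.Time) func() time.Time {
	return func() time.Time { return t }
}

// setTimeProvider mirrors the documented signature of SetTimeProvider.
func setTimeProvider(provider func() time.Time) { now = provider }

// stamp is a hypothetical caller that reads the time through the hook.
func stamp() time.Time { return now() }

func main() {
	setTimeProvider(fixedClock(epoch))
	fmt.Println(stamp().Equal(epoch)) // prints "true"
}
```

With a fixed clock in place, a benchmark no longer pays for, or varies with, calls to the real clock.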

Types

type GlobCollectionsMap

type GlobCollectionsMap[T proto.Message] struct {
	// contains filtered or unexported fields
}

GlobCollectionsMap is used to map individual GlobCollectionURL values to their corresponding globCollection. It uses a ResourceMap under the hood because it has similar semantics to cache entries:

  1. A globCollection is created lazily, either when an entry for that collection is created, or a subscription to that collection is made.
  2. A globCollection is only deleted once all subscribers have unsubscribed and the collection is empty. Crucially, a collection can be empty but will remain in the cache as long as some subscribers remain subscribed.
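The two lifecycle rules above can be sketched as follows. This is a simplified, single-goroutine illustration that uses a plain map rather than a ResourceMap. The types collection and collections and all of their methods are hypothetical stand-ins, with strings in place of URLs, values and handlers.

```go
package main

import "fmt"

// collection is a hypothetical stand-in for the unexported globCollection.
type collection struct {
	values      map[string]struct{}
	subscribers map[string]struct{}
}

// collections is a simplified, non-concurrent stand-in for GlobCollectionsMap.
type collections map[string]*collection

// getOrCreate lazily creates the collection for url (rule 1).
func (c collections) getOrCreate(url string) *collection {
	col, ok := c[url]
	if !ok {
		col = &collection{
			values:      map[string]struct{}{},
			subscribers: map[string]struct{}{},
		}
		c[url] = col
	}
	return col
}

// deleteIfUnused drops the collection only once it holds no values and has
// no subscribers (rule 2).
func (c collections) deleteIfUnused(url string) {
	if col, ok := c[url]; ok && len(col.values) == 0 && len(col.subscribers) == 0 {
		delete(c, url)
	}
}

func (c collections) subscribe(url, handler string) {
	c.getOrCreate(url).subscribers[handler] = struct{}{}
}

func (c collections) unsubscribe(url, handler string) {
	if col, ok := c[url]; ok {
		delete(col.subscribers, handler)
		c.deleteIfUnused(url)
	}
}

func (c collections) putValue(url, name string) {
	c.getOrCreate(url).values[name] = struct{}{}
}

func (c collections) removeValue(url, name string) {
	if col, ok := c[url]; ok {
		delete(col.values, name)
		c.deleteIfUnused(url)
	}
}

func main() {
	c := collections{}
	c.subscribe("collection-url", "handler")
	c.putValue("collection-url", "resource")
	c.removeValue("collection-url", "resource")
	_, kept := c["collection-url"] // empty, but still subscribed to
	c.unsubscribe("collection-url", "handler")
	_, stillThere := c["collection-url"]
	fmt.Println(kept, stillThere) // prints "true false"
}
```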

func (*GlobCollectionsMap[T]) IsSubscribed

func (gcm *GlobCollectionsMap[T]) IsSubscribed(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (subscribed bool)

IsSubscribed checks if the given handler is subscribed to the collection.

func (*GlobCollectionsMap[T]) PutValueInCollection

func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])

PutValueInCollection creates the glob collection if it was not already created, and puts the given value in it.

func (*GlobCollectionsMap[T]) RemoveValueFromCollection

func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])

RemoveValueFromCollection removes the given value from the collection. If the collection becomes empty as a result, it is removed from the map.

func (*GlobCollectionsMap[T]) Subscribe

func (gcm *GlobCollectionsMap[T]) Subscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T])

Subscribe creates or gets the corresponding collection for the given URL using createOrModifyCollection, then invokes globCollection.subscribe with the given handler.

func (*GlobCollectionsMap[T]) Unsubscribe

func (gcm *GlobCollectionsMap[T]) Unsubscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T])

Unsubscribe invokes globCollection.unsubscribe on the collection for the given URL, if it exists. If, as a result, the collection becomes empty, it invokes deleteCollectionIfEmpty.

type Priority

type Priority int

type ResourceMap

type ResourceMap[K comparable, T any] struct {
	// contains filtered or unexported fields
}

ResourceMap is a typed extension of a sync.Map which allows for fine-grained control over the creations and deletions of entries. It is meant to imitate Java's compute/computeIfPresent/computeIfAbsent methods on ConcurrentHashMap as sync.Map does not natively provide these constructs. It deliberately does not expose bare Get or Put methods as its concurrency model is based on the assumption that access to the backing values must be strictly synchronized. Instead, all operations should be executed through the various compute methods.

func (*ResourceMap[K, T]) Compute

func (m *ResourceMap[K, T]) Compute(
	key K,
	newValue func(key K) T,
	compute func(key K, value T),
)

Compute ensures that an entry for the given key exists in the map before executing the given compute function. If the entry was already in the map, it has the same semantics as ComputeIfPresent. Otherwise, it uses the given newValue constructor to create a new value before executing the given compute function, and no ComputeIfPresent will run until both the newValue constructor and the compute function complete. This is to prevent subsequent ComputeIfPresent operations from reading a partially initialized value.

func (*ResourceMap[K, T]) ComputeIfPresent

func (m *ResourceMap[K, T]) ComputeIfPresent(key K, compute func(key K, value T)) bool

ComputeIfPresent executes the given compute function if the entry is present in the map. There can be multiple executions of ComputeIfPresent in flight at the same time for the same entry.

func (*ResourceMap[K, T]) DeleteIf

func (m *ResourceMap[K, T]) DeleteIf(key K, condition func(key K, value T) bool)

DeleteIf loads the entry from the map if it still exists, then executes the given condition function with the value. If the condition returns true, the entry is deleted from the map, otherwise nothing happens. It is guaranteed that the condition function will only be executed once any in-flight ComputeIfPresent operations for that entry complete. Conversely, once the in-flight operations complete, no new ComputeIfPresent operations will be started for that entry until the condition has been checked. If the entry was deleted, any ComputeIfPresent operations queued for that entry while the condition was being checked will be abandoned. No two executions of DeleteIf can execute in parallel for the same entry.
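The guarantees described for Compute, ComputeIfPresent and DeleteIf can be approximated with a sync.Map whose entries each carry a sync.RWMutex. The following is a sketch under that assumption, not the package's actual implementation. The names resourceMap, entry, counter and newCounter are hypothetical. Compute functions hold a read lock, so several may run at once, while entry creation and the DeleteIf condition hold the write lock.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a concurrency-safe value type used in the example below.
type counter = atomic.Int64

func newCounter(string) *counter { return new(counter) }

// entry pairs a value with a lock: compute functions share the read lock,
// while initialization and deletion checks take the write lock.
type entry[T any] struct {
	mu      sync.RWMutex
	value   T
	deleted bool
}

// resourceMap is a hypothetical, simplified stand-in for ResourceMap.
type resourceMap[K comparable, T any] struct {
	m sync.Map // K -> *entry[T]
}

func (r *resourceMap[K, T]) computeIfPresent(key K, compute func(K, T)) bool {
	v, ok := r.m.Load(key)
	if !ok {
		return false
	}
	e := v.(*entry[T])
	e.mu.RLock() // waits for any in-progress initialization or deleteIf
	defer e.mu.RUnlock()
	if e.deleted {
		return false // the entry was removed while this call was queued
	}
	compute(key, e.value)
	return true
}

func (r *resourceMap[K, T]) compute(key K, newValue func(K) T, fn func(K, T)) {
	for {
		if r.computeIfPresent(key, fn) {
			return
		}
		e := &entry[T]{}
		e.mu.Lock() // block readers until the value is fully initialized
		if _, loaded := r.m.LoadOrStore(key, e); loaded {
			e.mu.Unlock()
			continue // another goroutine created the entry first; retry
		}
		e.value = newValue(key)
		fn(key, e.value)
		e.mu.Unlock()
		return
	}
}

func (r *resourceMap[K, T]) deleteIf(key K, cond func(K, T) bool) {
	v, ok := r.m.Load(key)
	if !ok {
		return
	}
	e := v.(*entry[T])
	e.mu.Lock() // waits for in-flight compute functions to finish
	defer e.mu.Unlock()
	if !e.deleted && cond(key, e.value) {
		e.deleted = true
		r.m.Delete(key)
	}
}

func main() {
	var rm resourceMap[string, *counter]
	inc := func(_ string, c *counter) { c.Add(1) }
	rm.compute("a", newCounter, inc)         // creates the entry, then increments it
	present := rm.computeIfPresent("a", inc) // increments the existing entry
	rm.deleteIf("a", func(_ string, c *counter) bool { return c.Load() == 2 })
	gone := !rm.computeIfPresent("a", inc)
	fmt.Println(present, gone) // prints "true true"
}
```

Because compute functions only share a read lock in this sketch, the stored values must be safe for concurrent use themselves, which is why the example stores an atomic counter.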

func (*ResourceMap[K, T]) Keys

func (m *ResourceMap[K, T]) Keys(f func(key K) bool)

Keys iterates through all the keys in the map. It does not expose the actual value as all operations on the values should be executed through Compute, ComputeIfPresent and DeleteIf.

type SubscriberSet

type SubscriberSet[T proto.Message] struct {
	// contains filtered or unexported fields
}

SubscriberSet is a concurrency-safe data structure that stores a set of unique subscribers. It is specifically designed to support wildcard and glob subscriptions such that they can be shared by multiple WatchableValues instead of requiring each WatchableValue to store each subscriber.

After subscribing to a given value, the SubscriptionHandler is supposed to be notified of the current value immediately, which usually simply means reading WatchableValue.currentValue and notifying the handler. However, the notification loop for the WatchableValue may already be running, which could result in a double notification. To avoid this, this data structure introduces a notion of versioning: the notification loop records which version it is about to iterate over (in WatchableValue.lastSeenSubscriberSetVersions) so that subscribers can determine whether the loop will notify them. A subscriber does so by recording the version returned by SubscriberSet.Subscribe and checking whether it is less than or equal to the version in WatchableValue.lastSeenSubscriberSetVersions.

The implementation uses a sync.Map to store and iterate over the subscribers. In this case it's impossible to use a normal map since the subscriber set will be iterated over frequently. However, sync.Map provides no guarantees about what happens if the map is modified while another goroutine is iterating over the entries. Specifically, if an entry is added during the iteration, the iterator may or may not actually yield the new entry, which means the iterator may yield an entry that was added _after_ Iterator was invoked, violating the Iterator contract that it will only yield entries that were added before. To get around this, the returned iterator simply records the version at which it was initially created, and drops entries that have a greater version, making it always consistent.
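The version-filtering idea can be sketched as follows. This is an illustration under simplifying assumptions, not the package's implementation: handlers are plain strings, the iterator is a callback instead of an iter.Seq2, and the names subscriberSet, subscribe and iterator are hypothetical. The sketch stores the entry before publishing the new version, so an iterator that observes a version also observes every entry at or below it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// subscriberSet is a hypothetical, simplified stand-in for SubscriberSet.
type subscriberSet struct {
	mu          sync.Mutex // serializes subscribe calls
	subscribers sync.Map   // handler name -> version at which it subscribed
	version     atomic.Uint64
}

// subscribe registers the handler and returns the version assigned to it.
func (s *subscriberSet) subscribe(handler string) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	v := s.version.Load() + 1
	s.subscribers.Store(handler, v) // store the entry first...
	s.version.Store(v)              // ...then publish the new version
	return v
}

// iterator snapshots the current version and returns a function that visits
// only the handlers whose subscription version is not greater than it.
func (s *subscriberSet) iterator() (each func(visit func(handler string)), version uint64) {
	version = s.version.Load()
	each = func(visit func(handler string)) {
		s.subscribers.Range(func(k, v any) bool {
			if v.(uint64) <= version {
				visit(k.(string))
			}
			return true
		})
	}
	return each, version
}

func main() {
	var s subscriberSet
	s.subscribe("a")
	each, iterVersion := s.iterator()
	vb := s.subscribe("b") // added after the iterator was created

	var seen []string
	each(func(h string) { seen = append(seen, h) })

	// "b" is dropped by the iterator, and vb > iterVersion tells the
	// subscriber that the loop will not notify it.
	fmt.Println(seen, vb <= iterVersion) // prints "[a] false"
}
```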

func (*SubscriberSet[T]) IsSubscribed

func (m *SubscriberSet[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool

IsSubscribed checks whether the given handler is subscribed to this set.

func (*SubscriberSet[T]) Iterator

Iterator returns an iterator over the SubscriberSet. The returned associated version can be used by subscribers to check whether they are present in the iterator. For convenience, returns an empty iterator and invalid version if the receiver is nil.

func (*SubscriberSet[T]) Size

func (m *SubscriberSet[T]) Size() int

Size returns the number of subscribers in the set. For convenience, returns 0 if the receiver is nil.
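The nil-receiver convenience is useful when sets are kept in a fixed array indexed by subscription type, as WatchableValue.SubscriberSets is, since some slots may be unset. The sketch below illustrates the pattern. The subscriptionTypes sentinel, the lowercase names and totalSubscribers are assumptions made for illustration and are not taken from the package source.

```go
package main

import "fmt"

type subscriptionType int

const (
	explicitSubscription = subscriptionType(iota)
	globSubscription
	wildcardSubscription
	// subscriptionTypes is a hypothetical sentinel counting the types above.
	subscriptionTypes
)

// subscriberSet is a hypothetical stand-in for SubscriberSet.
type subscriberSet struct {
	handlers map[string]struct{}
}

// size is safe to call on a nil receiver and reports 0 in that case.
func (s *subscriberSet) size() int {
	if s == nil {
		return 0
	}
	return len(s.handlers)
}

// totalSubscribers sums the sizes of all slots without any nil checks.
func totalSubscribers(sets [subscriptionTypes]*subscriberSet) int {
	total := 0
	for _, s := range sets {
		total += s.size()
	}
	return total
}

func main() {
	var sets [subscriptionTypes]*subscriberSet // glob and wildcard slots stay nil
	sets[explicitSubscription] = &subscriberSet{
		handlers: map[string]struct{}{"h1": {}},
	}
	fmt.Println(totalSubscribers(sets)) // prints "1"
}
```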

func (*SubscriberSet[T]) Subscribe

func (m *SubscriberSet[T]) Subscribe(handler ads.SubscriptionHandler[T]) (subscribedAt time.Time, id SubscriberSetVersion)

Subscribe registers the given SubscriptionHandler as a subscriber and returns the time and version at which the subscription was processed. The returned version can be compared against the version returned by Iterator to check whether the given handler is present in the iterator.

func (*SubscriberSet[T]) Unsubscribe

func (m *SubscriberSet[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)

Unsubscribe removes the given handler from the set, and returns whether the set is now empty as a result of this unsubscription.

type SubscriberSetIterator

type SubscriberSetIterator[T proto.Message] iter.Seq2[ads.SubscriptionHandler[T], time.Time]

type SubscriberSetVersion

type SubscriberSetVersion uint64

SubscriberSetVersion is a monotonically increasing counter that tracks how many times subscribers have been added to a given SubscriberSet. This means a subscriber can check whether they are in a SubscriberSet by storing the version returned by SubscriberSet.Subscribe and comparing it against the version returned by SubscriberSet.Iterator.

type WatchableValue

type WatchableValue[T proto.Message] struct {

	// SubscriberSets holds all the async.SubscriberSet instances relevant to this WatchableValue.
	SubscriberSets [subscriptionTypes]*SubscriberSet[T]
	// contains filtered or unexported fields
}

func NewValue

func NewValue[T proto.Message](
	name string,
	prioritySlots int,
) *WatchableValue[T]

func (*WatchableValue[T]) Clear

func (v *WatchableValue[T]) Clear(p Priority, clearedAt time.Time) (isFullClear bool)

func (*WatchableValue[T]) IsSubscribed

func (v *WatchableValue[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool

func (*WatchableValue[T]) NotifyHandlerAfterSubscription

func (v *WatchableValue[T]) NotifyHandlerAfterSubscription(
	handler ads.SubscriptionHandler[T],
	subType subscriptionType,
	subscribedAt time.Time,
	version SubscriberSetVersion,
)

NotifyHandlerAfterSubscription should be invoked by subscribers after subscribing to the corresponding SubscriberSet. This function is guaranteed to only return once the handler has been notified of the current value, since the xDS protocol spec explicitly states that an explicit subscription to an entry must always be respected by sending the current value: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#subscribing-to-resources

A resource_names_subscribe field may contain resource names that the server believes the client is
already subscribed to, and furthermore has the most recent versions of. However, the server must
still provide those resources in the response; due to implementation details hidden from the
server, the client may have "forgotten" those resources despite apparently remaining subscribed.

func (*WatchableValue[T]) Read

func (v *WatchableValue[T]) Read() *ads.Resource[T]

func (*WatchableValue[T]) Set

func (v *WatchableValue[T]) Set(p Priority, r *ads.Resource[T], modifiedAt time.Time)

Set updates the resource to the given version and value and notifies all subscribers of the new value. It is invalid to invoke this method with a nil resource.

func (*WatchableValue[T]) Subscribe

func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T])

func (*WatchableValue[T]) Unsubscribe

func (v *WatchableValue[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)
