Documentation ¶
Overview ¶
Package store implements a shared store backed by a kvstore or similar with the following properties:
- A single type is used to represent all keys
- Any number of collaborators can join the store. Typically a collaborator is an individual Cilium agent running on each node.
- All collaborators can own and contribute keys to the store. Each key is owned by exactly one collaborator. It is the responsibility of each collaborator to pick a key name which is guaranteed to be unique.
- All collaborators want to see all keys within the scope of a store. The scope of the store is defined by a common key prefix. For this purpose, each collaborator maintains a local cache of all keys in the store by subscribing to change events.
Index ¶
- type Configuration
- type KVPair
- type Key
- type KeyCreator
- type LocalKey
- type NamedKey
- type Observer
- type RWSOpt
- type SharedStore
- func (s *SharedStore) Close(ctx context.Context)
- func (s *SharedStore) DeleteLocalKey(ctx context.Context, key NamedKey)
- func (s *SharedStore) NumEntries() int
- func (s *SharedStore) Release()
- func (s *SharedStore) SharedKeysMap() map[string]Key
- func (s *SharedStore) UpdateKeySync(ctx context.Context, key LocalKey, lease bool) error
- func (s *SharedStore) UpdateLocalKeySync(ctx context.Context, key LocalKey) error
- type SyncStore
- type SyncStoreBackend
- type WSMFunc
- type WSSOpt
- type WatchStore
- type WatchStoreBackend
- type WatchStoreManager
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Configuration ¶
type Configuration struct {
	// Prefix is the key prefix of the store shared by all keys. The prefix
	// is the unique identification of the store. Multiple collaborators
	// connected to the same kvstore cluster configuring stores with
	// matching prefixes will automatically form a shared store. This
	// parameter is required.
	Prefix string

	// SynchronizationInterval is the interval in which locally owned keys
	// are synchronized with the kvstore. This parameter is optional.
	SynchronizationInterval time.Duration

	// SharedKeyDeleteDelay is the delay before a shared key delete is
	// handled. This parameter is optional, and defaults to 0 if unset.
	SharedKeyDeleteDelay time.Duration

	// KeyCreator is called to allocate a Key instance when a new shared
	// key is discovered. This parameter is required.
	KeyCreator KeyCreator

	// Backend is the kvstore to use as a backend. If no backend is
	// specified, kvstore.Client() is used.
	Backend kvstore.BackendOperations

	// Observer is the observer that will receive events on key mutations.
	Observer Observer

	Context context.Context
}
Configuration is the set of configuration parameters of a shared store.
type KVPair ¶
type KVPair struct{ Key, Value string }
KVPair represents a basic implementation of the LocalKey interface
func (*KVPair) DeepKeyCopy ¶
func (*KVPair) GetKeyName ¶
type Key ¶
type Key interface {
	NamedKey

	// Marshal is called to retrieve the byte slice representation of the
	// data represented by the key to store it in the kvstore. The function
	// must ensure that the underlying datatype is properly locked. It is
	// typically a good idea to use json.Marshal to implement this
	// function.
	Marshal() ([]byte, error)

	// Unmarshal is called when an update from the kvstore is received. The
	// prefix configured for the store is removed from the key, and the
	// byte slice passed to the function is coming from the Marshal
	// function from another collaborator. The function must unmarshal and
	// update the underlying data type. It is typically a good idea to use
	// json.Unmarshal to implement this function.
	Unmarshal(key string, data []byte) error
}
Key is the interface that a data structure must implement in order to be stored and shared as a key in a SharedStore.
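As an illustration, a type could satisfy this interface roughly as follows. This is a minimal sketch, not code from the package: the `Node` type and its fields are hypothetical, the `Key` interface is redeclared locally (with `GetKeyName` inlined) so the example compiles on its own, and treating the `key` argument as the authoritative name is an assumption made here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Key mirrors the documented interface so that this sketch compiles
// without importing the store package.
type Key interface {
	GetKeyName() string
	Marshal() ([]byte, error)
	Unmarshal(key string, data []byte) error
}

// Node is a hypothetical type shared through the store.
type Node struct {
	mu   sync.RWMutex // guards the exported fields below
	Name string       `json:"name"`
	IP   string       `json:"ip"`
}

// GetKeyName returns a name that is unique and stable across restarts.
func (n *Node) GetKeyName() string {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.Name
}

// Marshal encodes the node as JSON while holding the read lock.
func (n *Node) Marshal() ([]byte, error) {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return json.Marshal(n)
}

// Unmarshal decodes data produced by another collaborator's Marshal.
// Using key (the name with the store prefix removed) as the authoritative
// name is an assumption of this sketch.
func (n *Node) Unmarshal(key string, data []byte) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	if err := json.Unmarshal(data, n); err != nil {
		return err
	}
	n.Name = key
	return nil
}

func main() {
	var src Key = &Node{Name: "node1", IP: "10.0.0.1"}
	data, err := src.Marshal()
	if err != nil {
		panic(err)
	}
	dst := &Node{}
	if err := dst.Unmarshal(src.GetKeyName(), data); err != nil {
		panic(err)
	}
	fmt.Println(dst.Name, dst.IP) // prints "node1 10.0.0.1"
}
```

The unexported mutex is skipped by encoding/json, so only the exported fields travel through the kvstore.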
func KVPairCreator ¶
func KVPairCreator() Key
type KeyCreator ¶
type KeyCreator func() Key
KeyCreator is the function used to create a new, empty Key instance. Store collaborators must implement this function and provide the implementation in the Configuration structure.
type LocalKey ¶
type LocalKey interface {
	Key

	// DeepKeyCopy must return a deep copy of the key
	DeepKeyCopy() LocalKey
}
LocalKey is a Key owned by the local store instance
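The deep-copy requirement matters most when a key holds reference types such as maps or slices. The following sketch shows one way to honour it; the `Service` type is hypothetical and the interfaces are redeclared locally (with `GetKeyName` inlined) so the example is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Key and LocalKey mirror the documented interfaces so that this sketch
// compiles without importing the store package.
type Key interface {
	GetKeyName() string
	Marshal() ([]byte, error)
	Unmarshal(key string, data []byte) error
}

type LocalKey interface {
	Key
	DeepKeyCopy() LocalKey
}

// Service is a hypothetical locally owned key with a map field.
type Service struct {
	Name   string            `json:"name"`
	Labels map[string]string `json:"labels"`
}

func (s *Service) GetKeyName() string       { return s.Name }
func (s *Service) Marshal() ([]byte, error) { return json.Marshal(s) }
func (s *Service) Unmarshal(_ string, data []byte) error {
	return json.Unmarshal(data, s)
}

// DeepKeyCopy clones the map so that the copy shares no mutable state
// with the original.
func (s *Service) DeepKeyCopy() LocalKey {
	labels := make(map[string]string, len(s.Labels))
	for k, v := range s.Labels {
		labels[k] = v
	}
	return &Service{Name: s.Name, Labels: labels}
}

func main() {
	orig := &Service{Name: "svc1", Labels: map[string]string{"env": "prod"}}
	cp := orig.DeepKeyCopy().(*Service)
	cp.Labels["env"] = "dev" // must not leak into orig
	fmt.Println(orig.Labels["env"], cp.Labels["env"]) // prints "prod dev"
}
```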
type NamedKey ¶
type NamedKey interface {
	// GetKeyName must return the name of the key. The name of the key must
	// be unique within the store and stable for a particular key. The name
	// of the key must be identical across agent restarts as the keys
	// remain in the kvstore.
	GetKeyName() string
}
NamedKey is an interface that a data structure must implement in order to be deleted from a SharedStore.
type Observer ¶
type Observer interface {
	// OnDelete is called when the key has been deleted from the shared store
	OnDelete(k NamedKey)

	// OnUpdate is called whenever a change has occurred in the data
	// structure represented by the key
	OnUpdate(k Key)
}
Observer receives events when objects in the store mutate
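A typical observer might mirror the events into its own index. The sketch below is illustrative only: `cacheObserver` and `pair` are hypothetical types, and the interfaces are redeclared locally so the example compiles without the store package.

```go
package main

import (
	"fmt"
	"sync"
)

// The interfaces below mirror the documented ones.
type NamedKey interface{ GetKeyName() string }

type Key interface {
	NamedKey
	Marshal() ([]byte, error)
	Unmarshal(key string, data []byte) error
}

type Observer interface {
	OnDelete(k NamedKey)
	OnUpdate(k Key)
}

// pair is a hypothetical minimal Key used to drive the observer.
type pair struct{ name, value string }

func (p *pair) GetKeyName() string       { return p.name }
func (p *pair) Marshal() ([]byte, error) { return []byte(p.value), nil }
func (p *pair) Unmarshal(key string, data []byte) error {
	p.name, p.value = key, string(data)
	return nil
}

// cacheObserver keeps a name-indexed view of the keys it has been told about.
type cacheObserver struct {
	mu      sync.Mutex
	entries map[string]Key
}

func newCacheObserver() *cacheObserver {
	return &cacheObserver{entries: make(map[string]Key)}
}

// OnUpdate records the latest version of the key.
func (c *cacheObserver) OnUpdate(k Key) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[k.GetKeyName()] = k
}

// OnDelete forgets the key; only its name is needed.
func (c *cacheObserver) OnDelete(k NamedKey) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.entries, k.GetKeyName())
}

// Len reports how many keys are currently cached.
func (c *cacheObserver) Len() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.entries)
}

func main() {
	cache := newCacheObserver()
	var obs Observer = cache
	obs.OnUpdate(&pair{name: "a", value: "1"})
	obs.OnUpdate(&pair{name: "b", value: "2"})
	obs.OnDelete(&pair{name: "a"})
	fmt.Println(cache.Len()) // prints 1
}
```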
type RWSOpt ¶
type RWSOpt func(*restartableWatchStore)
func RWSWithEntriesMetric ¶
func RWSWithEntriesMetric(gauge prometheus.Gauge) RWSOpt
RWSWithEntriesMetric registers a Prometheus gauge metric that is kept in sync with the number of entries synchronized from the kvstore.
func RWSWithOnSyncCallback ¶
RWSWithOnSyncCallback registers a function to be executed after listing all keys from the kvstore for the first time. Multiple callback functions can be registered.
type SharedStore ¶
type SharedStore struct {
// contains filtered or unexported fields
}
SharedStore is an instance of a shared store. It is created with JoinSharedStore() and released with the SharedStore.Close() function.
func JoinSharedStore ¶
func JoinSharedStore(c Configuration) (*SharedStore, error)
JoinSharedStore creates a new shared store based on the provided configuration. An error is returned if the configuration is invalid. The store is initialized with the contents of the kvstore. An error is returned if the contents cannot be retrieved synchronously from the kvstore. It also starts a controller to continuously synchronize the store with the kvstore.
func (*SharedStore) Close ¶
func (s *SharedStore) Close(ctx context.Context)
Close stops participation with a shared store and removes all keys owned by this node in the kvstore. This stops the controller started by JoinSharedStore().
func (*SharedStore) DeleteLocalKey ¶
func (s *SharedStore) DeleteLocalKey(ctx context.Context, key NamedKey)
DeleteLocalKey removes a key from being synchronized with the kvstore
func (*SharedStore) NumEntries ¶
func (s *SharedStore) NumEntries() int
NumEntries returns the number of entries in the store
func (*SharedStore) Release ¶
func (s *SharedStore) Release()
Release frees all resources owned by the store but leaves all keys in the kvstore intact.
func (*SharedStore) SharedKeysMap ¶
func (s *SharedStore) SharedKeysMap() map[string]Key
SharedKeysMap returns a copy of the internal SharedKeys map. The returned map can be safely modified, but its values are the actual Key objects stored in the SharedStore, not copies.
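The distinction between copying the map and copying its values can be sketched as follows. This models the described semantics with a hypothetical `entry` type and `copyMap` helper; it is not the store's implementation.

```go
package main

import "fmt"

// entry stands in for a Key value; it is a hypothetical type.
type entry struct{ Value string }

// copyMap returns a new map holding the same pointers as src.
func copyMap(src map[string]*entry) map[string]*entry {
	dst := make(map[string]*entry, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

func main() {
	internal := map[string]*entry{"a": {Value: "1"}, "b": {Value: "2"}}
	snapshot := copyMap(internal)

	delete(snapshot, "b")           // affects only the snapshot
	snapshot["a"].Value = "changed" // visible through the internal map too

	fmt.Println(len(internal), internal["a"].Value) // prints "2 changed"
}
```

Callers that need to mutate a value without affecting the store would have to copy that value first.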
func (*SharedStore) UpdateKeySync ¶
func (s *SharedStore) UpdateKeySync(ctx context.Context, key LocalKey, lease bool) error
UpdateKeySync synchronously synchronizes a key with the kvstore.
func (*SharedStore) UpdateLocalKeySync ¶
func (s *SharedStore) UpdateLocalKeySync(ctx context.Context, key LocalKey) error
UpdateLocalKeySync synchronously synchronizes a local key with the kvstore and adds it to the list of local keys to be synchronized if the initial synchronous synchronization was successful
type SyncStore ¶
type SyncStore interface {
	// Run starts the SyncStore logic, blocking until the context is closed.
	Run(ctx context.Context)

	// UpsertKey upserts a key/value pair into the kvstore.
	UpsertKey(ctx context.Context, key Key) error

	// DeleteKey removes a key from the kvstore.
	DeleteKey(ctx context.Context, key NamedKey) error

	// Synced triggers the insertion of the "synced" key associated with this
	// store into the kvstore once all upsertions already issued have completed
	// successfully, eventually executing all specified callbacks (if any).
	// Only the first invocation takes effect.
	Synced(ctx context.Context, callbacks ...func(ctx context.Context)) error
}
SyncStore abstracts the operations used to synchronize key/value pairs into a kvstore.
func NewWorkqueueSyncStore ¶ added in v1.14.0
func NewWorkqueueSyncStore(clusterName string, backend SyncStoreBackend, prefix string, opts ...WSSOpt) SyncStore
NewWorkqueueSyncStore returns a SyncStore instance which leverages a workqueue to coalesce update/delete requests and handle retries in case of errors.
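Coalescing here means that several requests for the same key issued before a worker picks it up collapse into a single unit of work. The toy queue below illustrates only that idea; `coalescingQueue` is a hypothetical, single-goroutine type and is not how the package or its workqueue is implemented.

```go
package main

import "fmt"

// coalescingQueue is a hypothetical, non-thread-safe queue that stores
// each pending key at most once.
type coalescingQueue struct {
	order   []string            // keys awaiting processing, in arrival order
	pending map[string]struct{} // keys currently present in order
}

func newCoalescingQueue() *coalescingQueue {
	return &coalescingQueue{pending: make(map[string]struct{})}
}

// Add enqueues key unless it is already waiting to be processed.
func (q *coalescingQueue) Add(key string) {
	if _, ok := q.pending[key]; ok {
		return
	}
	q.pending[key] = struct{}{}
	q.order = append(q.order, key)
}

// Get pops the oldest key; ok is false when the queue is empty.
func (q *coalescingQueue) Get() (key string, ok bool) {
	if len(q.order) == 0 {
		return "", false
	}
	key, q.order = q.order[0], q.order[1:]
	delete(q.pending, key)
	return key, true
}

// Len returns the number of distinct keys waiting.
func (q *coalescingQueue) Len() int { return len(q.order) }

func main() {
	q := newCoalescingQueue()
	for _, k := range []string{"a", "a", "b", "a"} {
		q.Add(k)
	}
	fmt.Println(q.Len()) // prints 2: the three requests for "a" were merged
}
```

A worker that pops a key would then write whatever the latest state for that key is, which is why merging intermediate requests loses nothing.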
type SyncStoreBackend ¶
type SyncStoreBackend interface {
	// Update creates or updates a key.
	Update(ctx context.Context, key string, value []byte, lease bool) error

	// Delete deletes a key.
	Delete(ctx context.Context, key string) error

	// RegisterLeaseExpiredObserver registers a function which is executed when
	// the lease associated with a key having the given prefix is detected as expired.
	RegisterLeaseExpiredObserver(prefix string, fn func(key string))
}
SyncStoreBackend represents the subset of kvstore.BackendOperations leveraged by SyncStore implementations.
type WSSOpt ¶
type WSSOpt func(*wqSyncStore)
func WSSWithRateLimiter ¶
func WSSWithRateLimiter(limiter workqueue.RateLimiter) WSSOpt
WSSWithRateLimiter sets the rate limiting algorithm to be used when requeueing failed events.
func WSSWithSyncedKeyOverride ¶
WSSWithSyncedKeyOverride overrides the "synced" key inserted into the kvstore when the initial synchronization has completed (by default it corresponds to the prefix).
func WSSWithWorkers ¶
WSSWithWorkers configures the number of workers spawned by Run() to handle update/delete operations.
func WSSWithoutLease ¶
func WSSWithoutLease() WSSOpt
WSSWithoutLease disables attaching the lease to upserted keys.
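The WSSOpt helpers follow Go's functional options pattern: each returns a function that mutates the store under construction. The sketch below shows the pattern in isolation. The `syncStore` struct, its fields, its default values and the option names are hypothetical stand-ins, since the real wqSyncStore is unexported.

```go
package main

import "fmt"

// syncStore is a hypothetical stand-in for the unexported wqSyncStore.
type syncStore struct {
	workers   uint
	withLease bool
}

// Opt has the same shape as WSSOpt: a function that tweaks the store.
type Opt func(*syncStore)

// WithWorkers sets the number of workers (hypothetical option).
func WithWorkers(n uint) Opt {
	return func(s *syncStore) { s.workers = n }
}

// WithoutLease disables leases on upserted keys (hypothetical option).
func WithoutLease() Opt {
	return func(s *syncStore) { s.withLease = false }
}

// newSyncStore applies the options on top of defaults assumed for this sketch.
func newSyncStore(opts ...Opt) *syncStore {
	s := &syncStore{workers: 1, withLease: true}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSyncStore(WithWorkers(4), WithoutLease())
	fmt.Println(s.workers, s.withLease) // prints "4 false"
}
```

The pattern keeps the constructor signature stable while new options are added over time.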
type WatchStore ¶
type WatchStore interface {
	// Watch starts watching the specified kvstore prefix, blocking until the context is closed.
	// Depending on the implementation, it might be executed multiple times.
	Watch(ctx context.Context, backend WatchStoreBackend, prefix string)

	// NumEntries returns the number of entries synchronized from the store.
	NumEntries() uint64

	// Drain emits a deletion event for each known key. It shall be called only
	// when no watch operation is in progress.
	Drain()
}
WatchStore abstracts the operations used to synchronize key/value pairs from a kvstore, emitting the corresponding events.
func NewRestartableWatchStore ¶ added in v1.14.0
func NewRestartableWatchStore(clusterName string, keyCreator KeyCreator, observer Observer, opts ...RWSOpt) WatchStore
NewRestartableWatchStore returns a WatchStore instance which supports restarting the watch operation multiple times, automatically handling the emission of deletion events for all stale entries (if enabled). It shall be restarted only once the previous Watch execution has terminated.
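One way to picture stale-entry handling: after a restart, any key that was known before but does not reappear in the fresh listing needs a deletion event. The helper below sketches that set difference. `staleKeys` is a hypothetical function written for illustration and does not reflect the package's internal bookkeeping.

```go
package main

import (
	"fmt"
	"sort"
)

// staleKeys returns the keys that were known before a restart but were not
// observed again in the new listing. The result is sorted so the output is
// deterministic.
func staleKeys(known map[string]struct{}, listed []string) []string {
	seen := make(map[string]struct{}, len(listed))
	for _, k := range listed {
		seen[k] = struct{}{}
	}
	var stale []string
	for k := range known {
		if _, ok := seen[k]; !ok {
			stale = append(stale, k)
		}
	}
	sort.Strings(stale)
	return stale
}

func main() {
	known := map[string]struct{}{"a": {}, "b": {}, "c": {}}
	listed := []string{"a", "c", "d"}
	for _, k := range staleKeys(known, listed) {
		fmt.Println("delete", k) // prints "delete b"
	}
}
```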
type WatchStoreBackend ¶
type WatchStoreBackend interface {
	// ListAndWatch creates a new watcher for the given prefix after listing the existing keys.
	ListAndWatch(ctx context.Context, name, prefix string, chanSize int) *kvstore.Watcher
}
WatchStoreBackend represents the subset of kvstore.BackendOperations leveraged by WatchStore implementations.
type WatchStoreManager ¶
type WatchStoreManager interface {
	// Register registers a function associated with a given kvstore prefix.
	// It cannot be called once Run() has started.
	Register(prefix string, function WSMFunc)

	// Run starts the manager, blocking until the context is closed and all
	// started functions terminated.
	Run(ctx context.Context)
}
WatchStoreManager allows registering a set of functions to be executed asynchronously once the corresponding kvstore prefixes are synchronized (depending on the implementation).
func NewWatchStoreManagerImmediate ¶
func NewWatchStoreManagerImmediate(clusterName string) WatchStoreManager
NewWatchStoreManagerImmediate implements the WatchStoreManager interface, immediately starting the registered functions once Run() is executed.
func NewWatchStoreManagerSync ¶ added in v1.14.0
func NewWatchStoreManagerSync(backend WatchStoreBackend, clusterName string) WatchStoreManager
NewWatchStoreManagerSync implements the WatchStoreManager interface, starting the registered functions only once the corresponding prefix sync canary has been received. This ensures that the keys hosted under the given prefix have been successfully synchronized from the external source, even when an ephemeral kvstore is used.