Documentation ¶
Overview ¶
Package allocator provides a kvstore based ID allocator
Index ¶
- type Allocator
- func (a *Allocator) Allocate(ctx context.Context, key AllocatorKey) (idpool.ID, bool, bool, error)
- func (a *Allocator) Delete()
- func (a *Allocator) DeleteAllKeys()
- func (a *Allocator) ForeachCache(cb RangeFunc)
- func (a *Allocator) Get(ctx context.Context, key AllocatorKey) (idpool.ID, error)
- func (a *Allocator) GetByID(ctx context.Context, id idpool.ID) (AllocatorKey, error)
- func (a *Allocator) GetByIDIncludeRemoteCaches(ctx context.Context, id idpool.ID) (AllocatorKey, error)
- func (a *Allocator) GetEvents() AllocatorEventSendChan
- func (a *Allocator) GetIfLocked(ctx context.Context, key AllocatorKey, lock kvstore.KVLocker) (idpool.ID, error)
- func (a *Allocator) GetIncludeRemoteCaches(ctx context.Context, key AllocatorKey) (idpool.ID, error)
- func (a *Allocator) GetNoCache(ctx context.Context, key AllocatorKey) (idpool.ID, error)
- func (a *Allocator) NewRemoteCache(remoteName string, remoteAlloc *Allocator) *RemoteCache
- func (a *Allocator) Observe(ctx context.Context, next func(AllocatorChange), complete func(error))
- func (a *Allocator) Release(ctx context.Context, key AllocatorKey) (lastUse bool, err error)
- func (a *Allocator) RemoveRemoteKVStore(remoteName string)
- func (a *Allocator) RunGC(rateLimit *rate.Limiter, staleKeysPrevRound map[string]uint64) (map[string]uint64, *GCStats, error)
- func (a *Allocator) RunLocksGC(ctx context.Context, staleLocksPrevRound map[string]kvstore.Value) (map[string]kvstore.Value, error)
- func (a *Allocator) WaitForInitialSync(ctx context.Context) error
- func (a *Allocator) WatchRemoteKVStore(ctx context.Context, rc *RemoteCache, onSync func(context.Context))
- type AllocatorChange
- type AllocatorChangeKind
- type AllocatorEvent
- type AllocatorEventChan
- type AllocatorEventRecvChan
- type AllocatorEventSendChan
- type AllocatorKey
- type AllocatorOption
- func WithBackend(backend Backend) AllocatorOption
- func WithCacheValidator(validator CacheValidator) AllocatorOption
- func WithEvents(events AllocatorEventSendChan) AllocatorOption
- func WithMasterKeyProtection() AllocatorOption
- func WithMax(id idpool.ID) AllocatorOption
- func WithMin(id idpool.ID) AllocatorOption
- func WithPrefixMask(mask idpool.ID) AllocatorOption
- func WithoutAutostart() AllocatorOption
- func WithoutGC() AllocatorOption
- type Backend
- type CacheMutations
- type CacheValidator
- type GCStats
- type RangeFunc
- type RemoteCache
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Allocator ¶
type Allocator struct {
// contains filtered or unexported fields
}
Allocator is a distributed ID allocator backed by a KVstore. It maps arbitrary keys to identifiers. Multiple users on different cluster nodes can in parallel request the ID for keys and are guaranteed to retrieve the same ID for an identical key.
While the details of how keys are stored is delegated to Backend implementations, some expectations exist. See pkg/kvstore/allocator for details about the kvstore implementation.
A node takes a reference to an identity when it is in-use on that node, and the identity remains in-use if there is any node reference to it. When an identity no longer has any node references, it may be garbage collected. No guarantees are made at that point and the numeric identity may be reused. Note that the numeric IDs are selected locally and verified with the Backend.
Lookup ID by key:
- Return ID from local cache updated by watcher (no Backend interactions)
- Do ListPrefix() on slave key excluding node suffix, return the first result that matches the exact prefix.
Lookup key by ID:
- Return key from local cache updated by watcher (no Backend interactions)
- Do Get() on master key, return result
Allocate:
- Check local key cache, increment, and return if key is already in use locally (no Backend interactions)
- Check local cache updated by watcher, if...
... match found:
2.1 Create a new slave key. This operation is potentially racy as the master key can be removed in the meantime. - etcd: Create is made conditional on existence of master key - consul: locking
... match not found:
2.1 Select new unused id from local cache 2.2 Create a new master key with the condition that it may not exist 2.3 Create a new slave key
1.1. If found, increment and return (no Backend interactions) 2. Lookup ID by key in local cache or via first slave key found in Backend
Release:
- Reduce local reference count until last use (no Backend interactions)
- Delete slave key (basePath/value/key1/node1) This automatically guarantees that when the last node has released the key, the key is no longer found by Get()
- If the node goes down, all slave keys of that node are removed after the TTL expires (auto release).
func NewAllocator ¶
func NewAllocator(typ AllocatorKey, backend Backend, opts ...AllocatorOption) (*Allocator, error)
NewAllocator creates a new Allocator. Any type can be used as key as long as the type implements the AllocatorKey interface. A variable of the type has to be passed into NewAllocator() to make the type known. The specified base path is used to prefix all keys in the kvstore. The provided path must be unique.
The allocator can be configured by passing in additional options:
- WithEvents() - enable Events channel
- WithMin(id) - minimum ID to allocate (default: 1)
- WithMax(id) - maximum ID to allocate (default max(uint64))
After creation, IDs can be allocated with Allocate() and released with Release()
func NewAllocatorForGC ¶
func NewAllocatorForGC(backend Backend, opts ...AllocatorOption) *Allocator
NewAllocatorForGC returns an allocator that can be used to run RunGC()
The allocator can be configured by passing in additional options:
- WithMin(id) - minimum ID to allocate (default: 1)
- WithMax(id) - maximum ID to allocate (default max(uint64))
func (*Allocator) Allocate ¶
Allocate will retrieve the ID for the provided key. If no ID has been allocated for this key yet, a key will be allocated. If allocation fails, most likely due to a parallel allocation of the same ID by another user, allocation is re-attempted for maxAllocAttempts times.
Return values:
- allocated ID
- whether the ID is newly allocated from kvstore
- whether this is the first owner that holds a reference to the key in localkeys store
- error in case of failure
func (*Allocator) Delete ¶
func (a *Allocator) Delete()
Delete deletes an allocator and stops the garbage collector
func (*Allocator) DeleteAllKeys ¶
func (a *Allocator) DeleteAllKeys()
DeleteAllKeys will delete all keys. It is expected to be used in tests.
func (*Allocator) ForeachCache ¶
ForeachCache iterates over the allocator cache and calls RangeFunc on each cached entry
func (*Allocator) Get ¶
Get returns the ID which is allocated to a key. Returns an ID of NoID if no ID has been allocated to this key yet.
func (*Allocator) GetByID ¶
GetByID returns the key associated with an ID. Returns nil if no key is associated with the ID.
func (*Allocator) GetByIDIncludeRemoteCaches ¶
func (a *Allocator) GetByIDIncludeRemoteCaches(ctx context.Context, id idpool.ID) (AllocatorKey, error)
GetByIDIncludeRemoteCaches returns the key associated with an ID. Includes the caches of watched remote kvstores in the query. Returns nil if no key is associated with the ID.
func (*Allocator) GetEvents ¶
func (a *Allocator) GetEvents() AllocatorEventSendChan
GetEvents returns the events channel given to the allocator when constructed. Note: This channel is not owned by the allocator!
func (*Allocator) GetIfLocked ¶
func (a *Allocator) GetIfLocked(ctx context.Context, key AllocatorKey, lock kvstore.KVLocker) (idpool.ID, error)
GetIfLocked returns the ID which is allocated to a key. Returns an ID of NoID if no ID has been allocated to this key yet if the client is still holding the given lock.
func (*Allocator) GetIncludeRemoteCaches ¶
func (a *Allocator) GetIncludeRemoteCaches(ctx context.Context, key AllocatorKey) (idpool.ID, error)
GetIncludeRemoteCaches returns the ID which is allocated to a key. Includes the caches of watched remote kvstores in the query. Returns an ID of NoID if no ID has been allocated in any remote kvstore to this key yet.
func (*Allocator) GetNoCache ¶
GetNoCache returns the ID which is allocated to a key in the kvstore, bypassing the local copy of allocated keys.
func (*Allocator) NewRemoteCache ¶
func (a *Allocator) NewRemoteCache(remoteName string, remoteAlloc *Allocator) *RemoteCache
func (*Allocator) Observe ¶
func (a *Allocator) Observe(ctx context.Context, next func(AllocatorChange), complete func(error))
Observe the identity changes. Conforms to stream.Observable. Replays the current state of the cache when subscribing.
func (*Allocator) Release ¶
Release releases the use of an ID associated with the provided key. After the last user has released the ID, the key is removed in the KVstore and the returned lastUse value is true.
func (*Allocator) RemoveRemoteKVStore ¶
RemoveRemoteKVStore removes any reference to a remote allocator / kvstore, emitting a deletion event for all previously known identities.
func (*Allocator) RunGC ¶
func (a *Allocator) RunGC(rateLimit *rate.Limiter, staleKeysPrevRound map[string]uint64) (map[string]uint64, *GCStats, error)
RunGC scans the kvstore for unused master keys and removes them
func (*Allocator) RunLocksGC ¶
func (a *Allocator) RunLocksGC(ctx context.Context, staleLocksPrevRound map[string]kvstore.Value) (map[string]kvstore.Value, error)
RunLocksGC scans the kvstore for stale locks and removes them
func (*Allocator) WaitForInitialSync ¶
WaitForInitialSync waits until the initial sync is complete
func (*Allocator) WatchRemoteKVStore ¶
func (a *Allocator) WatchRemoteKVStore(ctx context.Context, rc *RemoteCache, onSync func(context.Context))
WatchRemoteKVStore starts watching an allocator base prefix the kvstore represents by the provided backend. A local cache of all identities of that kvstore will be maintained in the RemoteCache structure returned and will start being reported in the identities returned by the ForeachCache() function. RemoteName should be unique per logical "remote".
type AllocatorChange ¶
type AllocatorChange struct { Kind AllocatorChangeKind ID idpool.ID Key AllocatorKey }
type AllocatorChangeKind ¶
type AllocatorChangeKind string
const ( AllocatorChangeSync AllocatorChangeKind = "sync" AllocatorChangeUpsert AllocatorChangeKind = "upsert" AllocatorChangeDelete AllocatorChangeKind = "delete" )
type AllocatorEvent ¶
type AllocatorEvent struct { // Typ is the type of event (upsert / delete) Typ AllocatorChangeKind // ID is the allocated ID ID idpool.ID // Key is the key associated with the ID Key AllocatorKey }
AllocatorEvent is an event sent over AllocatorEventChan
type AllocatorEventChan ¶
type AllocatorEventChan chan AllocatorEvent
AllocatorEventChan is a channel to receive allocator events on
type AllocatorEventRecvChan ¶
type AllocatorEventRecvChan = <-chan AllocatorEvent
Send- and receive-only versions of the above.
type AllocatorEventSendChan ¶
type AllocatorEventSendChan = chan<- AllocatorEvent
type AllocatorKey ¶
type AllocatorKey interface { fmt.Stringer // GetKey returns the canonical string representation of the key GetKey() string // PutKey stores the information in v into the key. This is the inverse // operation to GetKey PutKey(v string) AllocatorKey // GetAsMap returns the key as a collection of "labels" with a key and value. // This is the inverse operation to PutKeyFromMap. GetAsMap() map[string]string // PutKeyFromMap stores the labels in v into the key to be used later. This // is the inverse operation to GetAsMap. PutKeyFromMap(v map[string]string) AllocatorKey // PutValue puts metadata inside the global identity for the given 'key' with // the given 'value'. PutValue(key any, value any) AllocatorKey // Value returns the value stored in the metadata map. Value(key any) any }
AllocatorKey is the interface to implement in order for a type to be used as key for the allocator. The key's data is assumed to be a collection of pkg/label.Label, and the functions reflect this somewhat.
type AllocatorOption ¶
type AllocatorOption func(*Allocator)
AllocatorOption is the base type for allocator options
func WithBackend ¶
func WithBackend(backend Backend) AllocatorOption
WithBackend sets this allocator to use backend. It is expected to be used at initialization.
func WithCacheValidator ¶ added in v1.16.0
func WithCacheValidator(validator CacheValidator) AllocatorOption
WithCacheValidator registers a validator triggered for each identity notification event to filter out invalid IDs and keys.
func WithEvents ¶
func WithEvents(events AllocatorEventSendChan) AllocatorOption
WithEvents enables receiving of events.
CAUTION: When using this function. The provided channel must be continuously read while NewAllocator() is being called to ensure that the channel does not block indefinitely while NewAllocator() emits events on it while populating the initial cache.
func WithMasterKeyProtection ¶
func WithMasterKeyProtection() AllocatorOption
WithMasterKeyProtection will watch for delete events on master keys and re-created them if local usage suggests that the key is still in use
func WithMax ¶
func WithMax(id idpool.ID) AllocatorOption
WithMax sets the maximum identifier to be allocated
func WithMin ¶
func WithMin(id idpool.ID) AllocatorOption
WithMin sets the minimum identifier to be allocated
func WithPrefixMask ¶
func WithPrefixMask(mask idpool.ID) AllocatorOption
WithPrefixMask sets the prefix used for all ID allocations. If set, the mask will be ORed to all selected IDs prior to allocation. It is the responsibility of the caller to ensure that the mask is not conflicting with min..max.
func WithoutAutostart ¶
func WithoutAutostart() AllocatorOption
WithoutAutostart prevents starting the allocator when it is initialized
func WithoutGC ¶
func WithoutGC() AllocatorOption
WithoutGC disables the use of the garbage collector
type Backend ¶
type Backend interface { // DeleteAllKeys will delete all keys. It is used in tests. DeleteAllKeys(ctx context.Context) // DeleteID deletes the identity with the given ID DeleteID(ctx context.Context, id idpool.ID) error // Encode encodes a key string as required to conform to the key // restrictions of the backend Encode(string) string // AllocateID creates a new key->ID association. This is expected to be a // create-only operation, and the ID may be allocated by another node. An // error in that case is not expected to be fatal. The actual ID is obtained // by Allocator from the local idPool, which is updated with used-IDs as the // Backend makes calls to the handler in ListAndWatch. // The implementation of the backend might return an AllocatorKey that is // a copy of 'key' with an internal reference of the backend key or, if it // doesn't use the internal reference of the backend key it simply returns // 'key'. In case of an error the returned 'AllocatorKey' should be nil. AllocateID(ctx context.Context, id idpool.ID, key AllocatorKey) (AllocatorKey, error) // AllocateIDIfLocked behaves like AllocateID but when lock is non-nil the // operation proceeds only if it is still valid. // The implementation of the backend might return an AllocatorKey that is // a copy of 'key' with an internal reference of the backend key or, if it // doesn't use the internal reference of the backend key it simply returns // 'key'. In case of an error the returned 'AllocatorKey' should be nil. AllocateIDIfLocked(ctx context.Context, id idpool.ID, key AllocatorKey, lock kvstore.KVLocker) (AllocatorKey, error) // AcquireReference records that this node is using this key->ID mapping. // This is distinct from any reference counting within this agent; only one // reference exists for this node for any number of managed endpoints using // it. // The semantics of cleaning up stale references is delegated to the Backend // implementation. RunGC may need to be invoked. // This can race, and so lock can be provided (via a Lock call, below). AcquireReference(ctx context.Context, id idpool.ID, key AllocatorKey, lock kvstore.KVLocker) error // Release releases the use of an ID associated with the provided key. It // does not guard against concurrent calls to // releases.Release(ctx context.Context, key AllocatorKey) (err error) Release(ctx context.Context, id idpool.ID, key AllocatorKey) (err error) // UpdateKey refreshes the record that this node is using this key -> id // mapping. When reliablyMissing is set it will also recreate missing master or // slave keys. UpdateKey(ctx context.Context, id idpool.ID, key AllocatorKey, reliablyMissing bool) error // UpdateKeyIfLocked behaves like UpdateKey but when lock is non-nil the operation proceeds only if it is still valid. UpdateKeyIfLocked(ctx context.Context, id idpool.ID, key AllocatorKey, reliablyMissing bool, lock kvstore.KVLocker) error // Get returns the allocated ID for this key as seen by the Backend. This may // have been created by other agents. Get(ctx context.Context, key AllocatorKey) (idpool.ID, error) // GetIfLocked behaves like Get, but but when lock is non-nil the // operation proceeds only if it is still valid. GetIfLocked(ctx context.Context, key AllocatorKey, lock kvstore.KVLocker) (idpool.ID, error) // GetByID returns the key associated with this ID, as seen by the Backend. // This may have been created by other agents. GetByID(ctx context.Context, id idpool.ID) (AllocatorKey, error) // Lock provides an opaque lock object that can be used, later, to ensure // that the key has not changed since the lock was created. This can be done // with GetIfLocked. Lock(ctx context.Context, key AllocatorKey) (kvstore.KVLocker, error) // ListIDs returns the IDs of all identities currently stored in the backend ListIDs(ctx context.Context) (identityIDs []idpool.ID, err error) // ListAndWatch begins synchronizing the local Backend instance with its // remote. ListAndWatch(ctx context.Context, handler CacheMutations, stopChan chan struct{}) // RunGC reaps stale or unused identities within the Backend and makes them // available for reuse. It is used by the cilium-operator and is not invoked // by cilium-agent. // Note: not all Backend implemenations rely on this, such as the kvstore // backends, and may use leases to expire keys. RunGC(ctx context.Context, rateLimit *rate.Limiter, staleKeysPrevRound map[string]uint64, minID idpool.ID, maxID idpool.ID) (map[string]uint64, *GCStats, error) // RunLocksGC reaps stale or unused locks within the Backend. It is used by // the cilium-operator and is not invoked by cilium-agent. Returns // a map of locks currently being held in the KVStore including the ones // that failed to be GCed. // Note: not all Backend implementations rely on this, such as the kvstore // backends, and may use leases to expire keys. RunLocksGC(ctx context.Context, staleKeysPrevRound map[string]kvstore.Value) (map[string]kvstore.Value, error) // Status returns a human-readable status of the Backend. Status() (string, error) }
Backend represents clients to remote ID allocation systems, such as KV Stores. These are used to coordinate key->ID allocation between cilium nodes.
type CacheMutations ¶
type CacheMutations interface { // OnListDone is called when the initial full-sync is complete. OnListDone() // OnUpsert is called when either a new key->ID mapping appears or an existing // one is modified. The latter case may occur e.g., when leases are updated, // and does not mean that the actual mapping had changed. OnUpsert(id idpool.ID, key AllocatorKey) // OnDelete is called when a key->ID mapping is removed. This may trigger // master-key protection, if enabled, where the local allocator will recreate // the key->ID association is recreated because the local node is still using // it. OnDelete(id idpool.ID, key AllocatorKey) }
CacheMutations are the operations given to a Backend's ListAndWatch command. They are called on changes to identities.
type CacheValidator ¶ added in v1.16.0
type CacheValidator func(kind AllocatorChangeKind, id idpool.ID, key AllocatorKey) error
CacheValidator is the type of the validation functions triggered to filter out invalid notification events.
type RangeFunc ¶
type RangeFunc func(idpool.ID, AllocatorKey)
RangeFunc is the function called by RangeCache
type RemoteCache ¶
type RemoteCache struct {
// contains filtered or unexported fields
}
RemoteCache represents the cache content of an additional kvstore managing identities. The contents are not directly accessible but will be merged into the ForeachCache() function.
func (*RemoteCache) NumEntries ¶
func (rc *RemoteCache) NumEntries() int
NumEntries returns the number of entries in the remote cache
func (*RemoteCache) Synced ¶
func (rc *RemoteCache) Synced() bool
Synced returns whether the initial list of entries has been retrieved from the kvstore, and new events are currently being watched.