Documentation ¶
Overview ¶
Package kvstore abstracts KVstore access and provides a high-level API to atomically manage cluster-wide resources.
SPDX-License-Identifier: Apache-2.0 Copyright Authors of Cilium
Index ¶
- Constants
- Variables
- func Connected() <-chan struct{}
- func ConsulDummyAddress() string
- func ConsulDummyConfigFile() string
- func EnableTracing()
- func EtcdDummyAddress() string
- func GetScopeFromKey(key string) string
- func Hint(err error) error
- func IsEtcdOperator(selectedBackend string, opts map[string]string, k8sNamespace string) (string, bool)
- func RunLockGC()
- func Setup(ctx context.Context, selectedBackend string, opts map[string]string, ...) error
- func SetupDummy(tb testing.TB, dummyBackend string)
- func SetupDummyWithConfigOpts(tb testing.TB, dummyBackend string, opts map[string]string)
- func SplitK8sServiceURL(address string) (string, string, error)
- func StateToCachePrefix(prefix string) string
- func Trace(format string, err error, fields logrus.Fields, a ...interface{})
- type BackendOperations
- type BackendOperationsUserMgmt
- type Capabilities
- type ClusterSizeDependantIntervalFunc
- type ConsulLocker
- type EventChan
- type EventType
- type ExtraOptions
- type KVLocker
- type KeyValueEvent
- type KeyValuePairs
- type Lock
- type Value
- type Watcher
Constants ¶
const (
	// ConsulAddrOption is the string representing the key mapping to the value of the
	// address for Consul.
	ConsulAddrOption   = "consul.address"
	ConsulOptionConfig = "consul.tlsconfig"
)
const (
	// EtcdBackendName is the backend name for etcd
	EtcdBackendName = "etcd"

	EtcdAddrOption               = "etcd.address"
	EtcdOptionConfig             = "etcd.config"
	EtcdOptionKeepAliveHeartbeat = "etcd.keepaliveHeartbeat"
	EtcdOptionKeepAliveTimeout   = "etcd.keepaliveTimeout"

	// EtcdRateLimitOption specifies maximum kv operations per second
	EtcdRateLimitOption = "etcd.qps"

	// EtcdMaxInflightOption specifies maximum inflight concurrent kv store operations
	EtcdMaxInflightOption = "etcd.maxInflight"

	// EtcdListLimitOption limits the number of results retrieved in one batch
	// by ListAndWatch operations. A 0 value equals to no limit.
	EtcdListLimitOption = "etcd.limit"
)
const (
	// CapabilityCreateIfExists is true if CreateIfExists is functional
	CapabilityCreateIfExists Capabilities = 1 << 0

	// CapabilityDeleteOnZeroCount is true if DeleteOnZeroCount is functional
	CapabilityDeleteOnZeroCount Capabilities = 1 << 1

	// BaseKeyPrefix is the base prefix that should be used for all keys
	BaseKeyPrefix = "cilium"

	// InitLockPath is the path to the init lock to test quorum
	InitLockPath = BaseKeyPrefix + "/.initlock"

	// HeartbeatPath is the path to the key at which the operator updates
	// the heartbeat
	HeartbeatPath = BaseKeyPrefix + "/.heartbeat"

	// HasClusterConfigPath is the path to the key used to convey that the cluster
	// configuration will be eventually created, and remote cilium agents shall
	// wait until it is present. If this key is not set, the cilium configuration
	// might, or might not, be configured, but the agents will continue regardless,
	// falling back to the backward compatible behavior. It must be set before
	// the agents have the possibility to connect to the kvstore (that is, when
	// it is not yet exposed). The corresponding value is ignored.
	HasClusterConfigPath = BaseKeyPrefix + "/.has-cluster-config"

	// ClusterConfigPrefix is the kvstore prefix to cluster configuration
	ClusterConfigPrefix = BaseKeyPrefix + "/cluster-config"

	// SyncedPrefix is the kvstore prefix used to convey whether
	// synchronization from an external source has completed for a given prefix
	SyncedPrefix = BaseKeyPrefix + "/synced"

	// HeartbeatWriteInterval is the interval in which the heartbeat key at
	// HeartbeatPath is updated
	HeartbeatWriteInterval = time.Minute
)
const (
	// FieldUser identifies a user in the kvstore
	FieldUser = logfields.User

	// FieldRole identifies a role in the kvstore
	FieldRole = "role"
)
const (
// OperationalPath is the base path to store the operational details in the kvstore.
OperationalPath = "cilium-net/operational"
)
Variables ¶
var Cell = func(defaultBackend string) cell.Cell {
	return cell.Module(
		"kvstore-client",
		"KVStore Client",

		cell.Config(config{
			KVStore:                    defaultBackend,
			KVStoreConnectivityTimeout: defaults.KVstoreConnectivityTimeout,
			KVStoreLeaseTTL:            defaults.KVstoreLeaseTTL,
			KVStorePeriodicSync:        defaults.KVstorePeriodicSync,
		}),

		cell.Provide(func(lc cell.Lifecycle, shutdowner hive.Shutdowner, cfg config, opts *ExtraOptions) promise.Promise[BackendOperations] {
			resolver, promise := promise.New[BackendOperations]()
			if cfg.KVStore == "" {
				log.Info("Skipping connection to kvstore, as not configured")
				resolver.Reject(errors.New("kvstore not configured"))
				return promise
			}

			option.Config.KVStore = cfg.KVStore
			option.Config.KVStoreOpt = cfg.KVStoreOpt
			option.Config.KVstoreConnectivityTimeout = cfg.KVStoreConnectivityTimeout
			option.Config.KVstoreLeaseTTL = cfg.KVStoreLeaseTTL
			option.Config.KVstorePeriodicSync = cfg.KVStorePeriodicSync

			ctx, cancel := context.WithCancel(context.Background())
			var wg sync.WaitGroup

			lc.Append(cell.Hook{
				OnStart: func(cell.HookContext) error {
					wg.Add(1)
					go func() {
						defer wg.Done()

						log := log.WithField(logfields.BackendName, cfg.KVStore)
						log.Info("Establishing connection to kvstore")
						backend, errCh := NewClient(ctx, cfg.KVStore, cfg.KVStoreOpt, opts)

						if err, isErr := <-errCh; isErr {
							log.WithError(err).Error("Failed to establish connection to kvstore")
							resolver.Reject(fmt.Errorf("failed connecting to kvstore: %w", err))
							shutdowner.Shutdown(hive.ShutdownWithError(err))
							return
						}

						log.Info("Connection to kvstore successfully established")
						resolver.Resolve(backend)
					}()
					return nil
				},
				OnStop: func(cell.HookContext) error {
					cancel()
					wg.Wait()
					return nil
				},
			})

			return promise
		}),
	)
}
Cell returns a cell which provides a promise for the global kvstore client. The parameter sets the default backend, which can either be a specific value (e.g., in the case of clustermesh-apiserver) or be left unset.
var (
	// ErrLockLeaseExpired is the error returned whenever the lease of the lock
	// does not exist or has expired.
	ErrLockLeaseExpired = errors.New("transaction did not succeed: lock lease expired")
)
var (
	// ErrNotImplemented is the error which is returned when a functionality is not implemented.
	ErrNotImplemented = errors.New("not implemented")
)
var GlobalUserMgmtClientPromiseCell = cell.Module(
	"global-kvstore-users-client",
	"Global KVStore Users Management Client Promise",

	cell.Provide(func(lc cell.Lifecycle, backendPromise promise.Promise[BackendOperations]) promise.Promise[BackendOperationsUserMgmt] {
		resolver, promise := promise.New[BackendOperationsUserMgmt]()
		ctx, cancel := context.WithCancel(context.Background())
		var wg sync.WaitGroup

		lc.Append(cell.Hook{
			OnStart: func(cell.HookContext) error {
				wg.Add(1)
				go func() {
					backend, err := backendPromise.Await(ctx)
					if err != nil {
						resolver.Reject(err)
					} else {
						resolver.Resolve(backend)
					}
					wg.Done()
				}()
				return nil
			},
			OnStop: func(cell.HookContext) error {
				cancel()
				wg.Wait()
				return nil
			},
		})

		return promise
	}),
)
GlobalUserMgmtClientPromiseCell provides a promise that, once the global kvstore client has been initialized, returns it for performing user management operations.
Functions ¶
func Connected ¶ added in v0.15.7
func Connected() <-chan struct{}
Connected returns a channel which is closed when all of the following conditions are met at the same time:
* The kvstore client is configured
* Connectivity to the kvstore has been established
* The kvstore has quorum
The channel will *not* be closed if the kvstore client is closed before connectivity or quorum has been achieved. It will wait until a new kvstore client is configured to again wait for connectivity and quorum.
func ConsulDummyAddress ¶ added in v0.15.7
func ConsulDummyAddress() string
func ConsulDummyConfigFile ¶ added in v0.15.7
func ConsulDummyConfigFile() string
func EtcdDummyAddress ¶ added in v0.15.7
func EtcdDummyAddress() string
func GetScopeFromKey ¶ added in v0.15.7
func GetScopeFromKey(key string) string
func IsEtcdOperator ¶ added in v0.15.7
func IsEtcdOperator(selectedBackend string, opts map[string]string, k8sNamespace string) (string, bool)
IsEtcdOperator returns the service name if the configuration is setting up an etcd-operator. If the configuration explicitly states it is configured to connect to an etcd operator, e.g. with etcd.operator=true, the returned service name is the first found within the configuration specified.
func RunLockGC ¶ added in v0.15.7
func RunLockGC()
RunLockGC inspects all local kvstore locks to determine whether they have been held longer than the stale lock timeout, and if so, forcibly unlocks them.
func Setup ¶ added in v0.10.0
func Setup(ctx context.Context, selectedBackend string, opts map[string]string, goOpts *ExtraOptions) error
Setup sets up the key-value store specified in selectedBackend and configures it with the options provided in opts.
func SetupDummy ¶ added in v0.10.0
func SetupDummy(tb testing.TB, dummyBackend string)
SetupDummy sets up the kvstore for tests. A lock mechanism is used to prevent the creation of two clients at the same time, to avoid interference when different tests are run in parallel. A cleanup function is automatically registered to delete all keys and close the client when the test terminates.
func SetupDummyWithConfigOpts ¶ added in v0.15.7
func SetupDummyWithConfigOpts(tb testing.TB, dummyBackend string, opts map[string]string)
SetupDummyWithConfigOpts sets up the dummy kvstore for tests and also configures the module with the provided opts. A lock mechanism is used to prevent the creation of two clients at the same time, to avoid interference when different tests are run in parallel. A cleanup function is automatically registered to delete all keys and close the client when the test terminates.
func SplitK8sServiceURL ¶ added in v0.15.7
func SplitK8sServiceURL(address string) (string, string, error)
SplitK8sServiceURL returns the service name and namespace for the given address. If the given address cannot be parsed or is not in the format '<protocol>://<name>.<namespace>[optional]', it returns an error.
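To make the expected address shape concrete, here is an illustrative parser written against the standard library only. It is a sketch under assumptions, not the package's code: `splitServiceURL` is a hypothetical name, and treating everything after the namespace label (such as `.svc` or a port) as the optional part is this example's interpretation of the format.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitServiceURL is an illustrative stand-in (not the package's
// implementation). It extracts <name> and <namespace> from an address of
// the form <protocol>://<name>.<namespace>[optional].
func splitServiceURL(address string) (name, namespace string, err error) {
	u, err := url.Parse(address)
	if err != nil {
		return "", "", fmt.Errorf("unparseable address %q: %w", address, err)
	}
	parts := strings.Split(u.Hostname(), ".")
	if u.Scheme == "" || len(parts) < 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("address %q is not in the form <protocol>://<name>.<namespace>", address)
	}
	return parts[0], parts[1], nil
}

func main() {
	name, ns, err := splitServiceURL("https://cilium-etcd-client.kube-system.svc:2379")
	fmt.Println(name, ns, err)

	_, _, err = splitServiceURL("https://localhost:2379")
	fmt.Println(err != nil)
}
```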
func StateToCachePrefix ¶ added in v0.15.7
func StateToCachePrefix(prefix string) string
StateToCachePrefix converts a kvstore prefix starting with "cilium/state" (holding the cilium state) to the corresponding one holding cached information from another kvstore (that is, "cilium/cache").
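The conversion described above amounts to swapping the leading "cilium/state" for "cilium/cache". A minimal sketch, assuming that prefixes which do not start with "cilium/state" are returned unchanged (the documentation does not say what happens in that case); `stateToCachePrefix` and the two constants are local, hypothetical names:

```go
package main

import (
	"fmt"
	"strings"
)

const (
	statePrefix = "cilium/state" // prefix holding the cilium state
	cachePrefix = "cilium/cache" // prefix holding cached data from another kvstore
)

// stateToCachePrefix is an illustrative stand-in. Returning non-matching
// prefixes unchanged is an assumption of this sketch.
func stateToCachePrefix(prefix string) string {
	if strings.HasPrefix(prefix, statePrefix) {
		return cachePrefix + strings.TrimPrefix(prefix, statePrefix)
	}
	return prefix
}

func main() {
	fmt.Println(stateToCachePrefix("cilium/state/nodes/v1"))
}
```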
Types ¶
type BackendOperations ¶ added in v0.15.7
type BackendOperations interface {
	// Connected returns a channel which is closed whenever the kvstore client
	// is connected to the kvstore server.
	Connected(ctx context.Context) <-chan error

	// Disconnected returns a channel which is closed whenever the kvstore
	// client is not connected to the kvstore server. (Only implemented for etcd)
	Disconnected() <-chan struct{}

	// Status returns the status of the kvstore client including an
	// eventual error
	Status() (string, error)

	// StatusCheckErrors returns a channel which receives status check
	// errors
	StatusCheckErrors() <-chan error

	// LockPath locks the provided path
	LockPath(ctx context.Context, path string) (KVLocker, error)

	// Get returns value of key
	Get(ctx context.Context, key string) ([]byte, error)

	// GetIfLocked returns value of key if the client is still holding the given lock.
	GetIfLocked(ctx context.Context, key string, lock KVLocker) ([]byte, error)

	// GetPrefix returns the first key which matches the prefix and its value
	GetPrefix(ctx context.Context, prefix string) (string, []byte, error)

	// GetPrefixIfLocked returns the first key which matches the prefix and its value if the client is still holding the given lock.
	GetPrefixIfLocked(ctx context.Context, prefix string, lock KVLocker) (string, []byte, error)

	// Set sets value of key
	Set(ctx context.Context, key string, value []byte) error

	// Delete deletes a key. It does not return an error if the key does not exist.
	Delete(ctx context.Context, key string) error

	// DeleteIfLocked deletes a key if the client is still holding the given lock. It does not return an error if the key does not exist.
	DeleteIfLocked(ctx context.Context, key string, lock KVLocker) error

	DeletePrefix(ctx context.Context, path string) error

	// Update creates or updates a key.
	Update(ctx context.Context, key string, value []byte, lease bool) error

	// UpdateIfLocked updates a key if the client is still holding the given lock.
	UpdateIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) error

	// UpdateIfDifferent updates a key if the value is different
	UpdateIfDifferent(ctx context.Context, key string, value []byte, lease bool) (bool, error)

	// UpdateIfDifferentIfLocked updates a key if the value is different and if the client is still holding the given lock.
	UpdateIfDifferentIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) (bool, error)

	// CreateOnly atomically creates a key or fails if it already exists
	CreateOnly(ctx context.Context, key string, value []byte, lease bool) (bool, error)

	// CreateOnlyIfLocked atomically creates a key if the client is still holding the given lock or fails if it already exists
	CreateOnlyIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) (bool, error)

	// CreateIfExists creates a key with the value only if key condKey exists
	CreateIfExists(ctx context.Context, condKey, key string, value []byte, lease bool) error

	// ListPrefix returns a list of keys matching the prefix
	ListPrefix(ctx context.Context, prefix string) (KeyValuePairs, error)

	// ListPrefixIfLocked returns a list of keys matching the prefix only if the client is still holding the given lock.
	ListPrefixIfLocked(ctx context.Context, prefix string, lock KVLocker) (KeyValuePairs, error)

	// Watch starts watching for changes in a prefix. If list is true, the
	// current keys matching the prefix will be listed and reported as new
	// keys first.
	Watch(ctx context.Context, w *Watcher)

	// Close closes the kvstore client
	Close(ctx context.Context)

	// GetCapabilities returns the capabilities of the backend
	GetCapabilities() Capabilities

	// Encodes a binary slice into a character set that the backend
	// supports
	Encode(in []byte) string

	// Decodes a key previously encoded back into the original binary slice
	Decode(in string) ([]byte, error)

	// ListAndWatch creates a new watcher which will watch the specified
	// prefix for changes. Before doing this, it will list the current keys
	// matching the prefix and report them as new keys. The Events channel is
	// created with the specified sizes. Upon every change observed, a
	// KeyValueEvent will be sent to the Events channel
	ListAndWatch(ctx context.Context, prefix string, chanSize int) *Watcher

	// RegisterLeaseExpiredObserver registers a function which is executed when
	// the lease associated with a key having the given prefix is detected as expired.
	// If the function is nil, the previous observer (if any) is unregistered.
	RegisterLeaseExpiredObserver(prefix string, fn func(key string))

	BackendOperationsUserMgmt
}
BackendOperations are the individual kvstore operations that each backend must implement. Direct use of this interface is possible but will bypass the tracing layer.
func Client ¶ added in v0.10.0
func Client() BackendOperations
Client returns the global kvstore, blocking until it has been configured
func NewClient ¶ added in v0.15.7
func NewClient(ctx context.Context, selectedBackend string, opts map[string]string, options *ExtraOptions) (BackendOperations, chan error)
NewClient returns a new kvstore client based on the configuration
type BackendOperationsUserMgmt ¶ added in v0.15.7
type BackendOperationsUserMgmt interface {
	// UserEnforcePresence creates a user in the kvstore if not already present, and grants the specified roles.
	UserEnforcePresence(ctx context.Context, name string, roles []string) error

	// UserEnforceAbsence deletes a user from the kvstore, if present.
	UserEnforceAbsence(ctx context.Context, name string) error
}
BackendOperationsUserMgmt are the kvstore operations for user management.
type Capabilities ¶ added in v0.15.7
type Capabilities uint32
Capabilities is a bitmask to indicate the capabilities of a backend
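Since Capabilities is a bitmask, individual capabilities are tested with a bitwise AND. The sketch below redeclares the type and the two constants locally, with the same values as in the Constants section, so that it runs without importing the package; the `has` helper is hypothetical.

```go
package main

import "fmt"

// Local copies of the declarations from the Constants section, repeated
// here only to keep the example self-contained.
type Capabilities uint32

const (
	CapabilityCreateIfExists    Capabilities = 1 << 0
	CapabilityDeleteOnZeroCount Capabilities = 1 << 1
)

// has reports whether every bit in want is set in caps (hypothetical helper).
func has(caps, want Capabilities) bool {
	return caps&want == want
}

func main() {
	// A backend that only supports CreateIfExists.
	caps := CapabilityCreateIfExists

	fmt.Println(has(caps, CapabilityCreateIfExists))    // true
	fmt.Println(has(caps, CapabilityDeleteOnZeroCount)) // false
}
```

With the real package, the mask would come from GetCapabilities() on a BackendOperations value.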
type ClusterSizeDependantIntervalFunc ¶ added in v0.15.7
type ConsulLocker ¶ added in v0.15.7
func (*ConsulLocker) Comparator ¶ added in v0.15.7
func (cl *ConsulLocker) Comparator() interface{}
type EventChan ¶ added in v0.15.7
type EventChan chan KeyValueEvent
EventChan is a channel to receive events on
type EventType ¶ added in v0.15.7
type EventType int
EventType defines the type of watch event that occurred
const (
	// EventTypeCreate represents a newly created key
	EventTypeCreate EventType = iota
	// EventTypeModify represents a modified key
	EventTypeModify
	// EventTypeDelete represents a deleted key
	EventTypeDelete
	// EventTypeListDone signals that the initial list operation has completed
	EventTypeListDone
)
type ExtraOptions ¶ added in v0.15.7
type ExtraOptions struct {
	DialOption []grpc.DialOption

	// ClusterSizeDependantInterval defines the function to calculate
	// intervals based on cluster size
	ClusterSizeDependantInterval ClusterSizeDependantIntervalFunc

	// NoLockQuorumCheck disables the lock acquisition quorum check
	NoLockQuorumCheck bool

	// ClusterName is the name of each etcd cluster
	ClusterName string
}
ExtraOptions represents any options that cannot be represented in a textual format and need to be set programmatically.
func (*ExtraOptions) StatusCheckInterval ¶ added in v0.15.7
func (e *ExtraOptions) StatusCheckInterval(allConnected bool) time.Duration
StatusCheckInterval returns the interval of status checks depending on the cluster size and the current connectivity state
nodes | OK | Failing |
---|---|---|
1 | 20s | 3s |
4 | 45s | 7s |
8 | 1m05s | 11s |
32 | 1m45s | 18s |
128 | 2m25s | 24s |
512 | 3m07s | 32s |
2048 | 3m46s | 38s |
8192 | 4m30s | 45s |
type KeyValueEvent ¶ added in v0.15.7
type KeyValueEvent struct {
	// Typ is the type of event { EventTypeCreate | EventTypeModify | EventTypeDelete | EventTypeListDone }
	Typ EventType

	// Key is the kvstore key that changed
	Key string

	// Value is the kvstore value associated with the key
	Value []byte
}
KeyValueEvent is a change event for a Key/Value pair
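A consumer of such events usually keeps a local view of the watched prefix and uses EventTypeListDone to learn when the initial listing is complete. The sketch below redeclares EventType, KeyValueEvent and EventChan locally so that it runs on its own; `replay` is a hypothetical helper and the events are hand-written rather than coming from a real watcher.

```go
package main

import "fmt"

// Local copies of the package's event types, repeated here only to keep
// the example self-contained.
type EventType int

const (
	EventTypeCreate EventType = iota
	EventTypeModify
	EventTypeDelete
	EventTypeListDone
)

type KeyValueEvent struct {
	Typ   EventType
	Key   string
	Value []byte
}

type EventChan chan KeyValueEvent

// replay applies events to a local cache until the channel is closed and
// reports whether the initial list was seen to complete.
func replay(events EventChan) (map[string]string, bool) {
	cache := map[string]string{}
	listDone := false
	for ev := range events {
		switch ev.Typ {
		case EventTypeCreate, EventTypeModify:
			cache[ev.Key] = string(ev.Value)
		case EventTypeDelete:
			delete(cache, ev.Key)
		case EventTypeListDone:
			listDone = true
		}
	}
	return cache, listDone
}

func main() {
	events := make(EventChan, 5)
	events <- KeyValueEvent{Typ: EventTypeCreate, Key: "cilium/state/a", Value: []byte("1")}
	events <- KeyValueEvent{Typ: EventTypeCreate, Key: "cilium/state/b", Value: []byte("2")}
	events <- KeyValueEvent{Typ: EventTypeListDone}
	events <- KeyValueEvent{Typ: EventTypeModify, Key: "cilium/state/a", Value: []byte("3")}
	events <- KeyValueEvent{Typ: EventTypeDelete, Key: "cilium/state/b"}
	close(events)

	cache, synced := replay(events)
	fmt.Println(cache["cilium/state/a"], len(cache), synced) // 3 1 true
}
```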
type KeyValuePairs ¶ added in v0.15.7
KeyValuePairs is a map of key=value pairs
type Lock ¶ added in v0.15.7
type Lock struct {
// contains filtered or unexported fields
}
Lock is a lock returned by LockPath
func LockPath ¶ added in v0.15.7
LockPath locks the specified path. The key for the lock is not the provided path itself but the path with the suffix ".lock" appended. The returned lock also contains a path-specific local Mutex, which will be held.
It is required to call Unlock() on the returned Lock to unlock
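The two details above, the ".lock" key suffix and the per-path local mutex, can be sketched as follows. `lockKey` and `pathMutexes` are hypothetical names for this illustration, and only the local half of the locking is shown; the kvstore round trip is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// lockKey derives the kvstore key used for locking a path, following the
// ".lock" suffix convention described above.
func lockKey(path string) string {
	return path + ".lock"
}

// pathMutexes hands out one local mutex per path (hypothetical type), so
// that goroutines in the same process serialize per path before any
// kvstore lock would be attempted.
type pathMutexes struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// get returns the mutex for path, creating it on first use.
func (p *pathMutexes) get(path string) *sync.Mutex {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.locks == nil {
		p.locks = map[string]*sync.Mutex{}
	}
	m, ok := p.locks[path]
	if !ok {
		m = &sync.Mutex{}
		p.locks[path] = m
	}
	return m
}

func main() {
	var local pathMutexes
	path := "cilium/state/identities"

	m := local.get(path)
	m.Lock()
	fmt.Println("holding", lockKey(path))
	m.Unlock() // mirrors the requirement to call Unlock() on the returned Lock
}
```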
func (*Lock) Comparator ¶ added in v0.15.7
func (l *Lock) Comparator() interface{}
type Value ¶ added in v0.15.7
Value is an abstraction of the data stored in the kvstore as well as the mod revision of that data.
Directories ¶
Path | Synopsis |
---|---|
| Package allocator provides a kvstore based ID allocator |
| Package store implements a shared store backed by a kvstore or similar with the following properties: |