kvstore

package
v1.17.0-pre.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2024 License: Apache-2.0 Imports: 49 Imported by: 68

Documentation

Overview

Package kvstore abstracts KVstore access and provides a high level API to atomically manage cluster wide resources

SPDX-License-Identifier: Apache-2.0 Copyright Authors of Cilium

Index

Constants

View Source
const (
	// EtcdBackendName is the backend name for etcd
	EtcdBackendName = "etcd"

	// Option keys understood by the etcd backend. Judging by their names
	// they select the etcd address, the etcd configuration, and the
	// keepalive heartbeat and timeout.
	// NOTE(review): the code consuming these keys is not visible here;
	// confirm the exact semantics against it.
	EtcdAddrOption               = "etcd.address"
	EtcdOptionConfig             = "etcd.config"
	EtcdOptionKeepAliveHeartbeat = "etcd.keepaliveHeartbeat"
	EtcdOptionKeepAliveTimeout   = "etcd.keepaliveTimeout"

	// EtcdRateLimitOption specifies the maximum number of kv operations per
	// second.
	EtcdRateLimitOption = "etcd.qps"

	// EtcdBootstrapRateLimitOption specifies the maximum number of kv
	// operations per second during bootstrapping.
	EtcdBootstrapRateLimitOption = "etcd.bootstrapQps"

	// EtcdMaxInflightOption specifies the maximum number of concurrent
	// in-flight kvstore operations.
	EtcdMaxInflightOption = "etcd.maxInflight"

	// EtcdListLimitOption limits the number of results retrieved in one batch
	// by ListAndWatch operations. A value of 0 means no limit.
	EtcdListLimitOption = "etcd.limit"
)
View Source
const (
	// BaseKeyPrefix is the base prefix that should be used for all keys.
	BaseKeyPrefix = "cilium"

	// StatePrefix is the kvstore prefix used to store Cilium's state.
	StatePrefix = BaseKeyPrefix + "/state"

	// CachePrefix is the kvstore prefix used to store the information retrieved
	// from a remote cluster and cached locally by KVStoreMesh.
	CachePrefix = BaseKeyPrefix + "/cache"

	// InitLockPath is the path to the init lock used to test quorum.
	InitLockPath = BaseKeyPrefix + "/.initlock"

	// HeartbeatPath is the path to the key at which the operator updates
	// the heartbeat.
	HeartbeatPath = BaseKeyPrefix + "/.heartbeat"

	// ClusterConfigPrefix is the kvstore prefix for the cluster configuration.
	ClusterConfigPrefix = BaseKeyPrefix + "/cluster-config"

	// SyncedPrefix is the kvstore prefix used to convey whether
	// synchronization from an external source has completed for a given prefix.
	SyncedPrefix = BaseKeyPrefix + "/synced"

	// HeartbeatWriteInterval is the interval at which the heartbeat key at
	// HeartbeatPath is updated.
	HeartbeatWriteInterval = time.Minute
)
View Source
const (

	// FieldUser identifies a user in the kvstore. Its value is taken from
	// logfields.User.
	FieldUser = logfields.User

	// FieldRole identifies a role in the kvstore.
	FieldRole = "role"
)

Variables

View Source
// Cell is the cell.Module providing a promise for the kvstore client. The
// promise is rejected right away when no kvstore is configured; otherwise it
// is settled by a goroutine launched from the start hook.
var Cell = cell.Module(
	"kvstore-client",
	"KVStore Client",

	cell.Config(defaultConfig),

	cell.Provide(func(lc cell.Lifecycle, shutdowner hive.Shutdowner, cfg config, opts *ExtraOptions) promise.Promise[BackendOperations] {
		resolver, clientPromise := promise.New[BackendOperations]()

		// No backend selected: reject immediately and register no hooks.
		if cfg.KVStore == "" {
			log.Info("Skipping connection to kvstore, as not configured")
			resolver.Reject(errors.New("kvstore not configured"))
			return clientPromise
		}

		// Copy the cell configuration into the global option.Config.
		option.Config.KVStore = cfg.KVStore
		option.Config.KVStoreOpt = cfg.KVStoreOpt
		option.Config.KVstoreConnectivityTimeout = cfg.KVStoreConnectivityTimeout
		option.Config.KVstoreLeaseTTL = cfg.KVStoreLeaseTTL
		option.Config.KVstorePeriodicSync = cfg.KVStorePeriodicSync
		option.Config.KVstoreMaxConsecutiveQuorumErrors = cfg.KVstoreMaxConsecutiveQuorumErrors

		var wg sync.WaitGroup
		ctx, cancel := context.WithCancel(context.Background())

		// connect creates the client and settles the promise. It is meant to
		// run in its own goroutine, tracked by wg.
		connect := func() {
			defer wg.Done()

			scopedLog := log.WithField(logfields.BackendName, cfg.KVStore)
			scopedLog.Info("Establishing connection to kvstore")

			backend, errCh := NewClient(ctx, cfg.KVStore, cfg.KVStoreOpt, opts)

			// A value received from errCh is a connection error; the channel
			// being closed without a value means the connection succeeded.
			if err, failed := <-errCh; failed {
				scopedLog.WithError(err).Error("Failed to establish connection to kvstore")
				resolver.Reject(fmt.Errorf("failed connecting to kvstore: %w", err))
				shutdowner.Shutdown(hive.ShutdownWithError(err))
				return
			}

			scopedLog.Info("Connection to kvstore successfully established")
			resolver.Resolve(backend)
		}

		lc.Append(cell.Hook{
			OnStart: func(cell.HookContext) error {
				// Connect in the background so that the start hook itself
				// returns without waiting for the kvstore.
				wg.Add(1)
				go connect()
				return nil
			},
			OnStop: func(cell.HookContext) error {
				// Cancel the context handed to NewClient, then wait for the
				// connect goroutine to finish.
				cancel()
				wg.Wait()

				return nil
			},
		})

		return clientPromise
	}),
)

Cell returns a cell which provides a promise for the global kvstore client.

View Source
var ErrLockLeaseExpired = errors.New("transaction did not succeed: lock lease expired")

ErrLockLeaseExpired is the error returned whenever the lease of the lock does not exist or has expired.

View Source
// GlobalUserMgmtClientPromiseCell is the cell.Module providing a promise for
// the users management view (BackendOperationsUserMgmt) of the kvstore
// client. The promise is settled with the outcome of the BackendOperations
// promise it depends on.
var GlobalUserMgmtClientPromiseCell = cell.Module(
	"global-kvstore-users-client",
	"Global KVStore Users Management Client Promise",

	cell.Provide(func(lc cell.Lifecycle, backendPromise promise.Promise[BackendOperations]) promise.Promise[BackendOperationsUserMgmt] {
		resolver, userMgmtPromise := promise.New[BackendOperationsUserMgmt]()
		ctx, cancel := context.WithCancel(context.Background())
		var wg sync.WaitGroup

		lc.Append(cell.Hook{
			OnStart: func(cell.HookContext) error {
				wg.Add(1)
				go func() {
					// Deferred so that the WaitGroup is released on every
					// exit path, matching the goroutine started by Cell.
					defer wg.Done()

					// Await is bounded by ctx, which OnStop cancels.
					backend, err := backendPromise.Await(ctx)
					if err != nil {
						resolver.Reject(err)
						return
					}
					resolver.Resolve(backend)
				}()
				return nil
			},
			OnStop: func(cell.HookContext) error {
				// Cancel the pending Await (if any) and wait for the
				// goroutine to finish.
				cancel()
				wg.Wait()
				return nil
			},
		})

		return userMgmtPromise
	}),
)

GlobalUserMgmtClientPromiseCell provides a promise returning the global kvstore client to perform users management operations, once it has been initialized.

Functions

func Connected added in v0.15.7

func Connected() <-chan struct{}

Connected returns a channel which is closed when all of the following conditions are met at the same time:

  - The kvstore client is configured
  - Connectivity to the kvstore has been established
  - The kvstore has quorum

The channel will *not* be closed if the kvstore client is closed before connectivity or quorum has been achieved. It will wait until a new kvstore client is configured to again wait for connectivity and quorum.

func EnableTracing added in v0.15.7

func EnableTracing()

EnableTracing enables kvstore tracing

func EtcdDbg added in v1.13.17

func EtcdDbg(ctx context.Context, cfgfile string, dialer EtcdDbgDialer, w io.Writer)

EtcdDbg performs a set of sanity checks concerning the connection to the given etcd cluster, and outputs the result in a user-friendly format.

func EtcdDummyAddress added in v0.15.7

func EtcdDummyAddress() string

func GetScopeFromKey added in v0.15.7

func GetScopeFromKey(key string) string

func Hint added in v0.15.7

func Hint(err error) error

Hint tries to improve the error message displayed to the user.

func RunLockGC added in v0.15.7

func RunLockGC()

RunLockGC inspects all local kvstore locks to determine whether they have been held longer than the stale lock timeout, and if so, unlocks them forcibly.

func Setup added in v0.10.0

func Setup(ctx context.Context, selectedBackend string, opts map[string]string, goOpts *ExtraOptions) error

Setup sets up the key-value store specified in kvStore and configures it with the options provided in opts

func SetupDummy added in v0.10.0

func SetupDummy(tb testing.TB, dummyBackend string)

SetupDummy sets up the kvstore for tests. A lock mechanism is used to prevent the creation of two clients at the same time, to avoid interference in case different tests are run in parallel. A cleanup function is automatically registered to delete all keys and close the client when the test terminates.

func SetupDummyWithConfigOpts added in v0.15.7

func SetupDummyWithConfigOpts(tb testing.TB, dummyBackend string, opts map[string]string)

SetupDummyWithConfigOpts sets up the dummy kvstore for tests but also configures the module with the provided opts. A lock mechanism is used to prevent the creation of two clients at the same time, to avoid interference in case different tests are run in parallel. A cleanup function is automatically registered to delete all keys and close the client when the test terminates.

func StateToCachePrefix added in v0.15.7

func StateToCachePrefix(prefix string) string

StateToCachePrefix converts a kvstore prefix starting with "cilium/state" (holding the cilium state) to the corresponding one holding cached information from another kvstore (that is, "cilium/cache").

func Trace added in v0.15.7

func Trace(msg string, err error, fields logrus.Fields)

Trace is used to trace kvstore debug messages

Types

type BackendOperations added in v0.15.7

type BackendOperations interface {
	// Connected returns a channel which is closed whenever the kvstore client
	// is connected to the kvstore server.
	Connected(ctx context.Context) <-chan error

	// Disconnected returns a channel which is closed whenever the kvstore
	// client is not connected to the kvstore server. (Only implemented for etcd)
	Disconnected() <-chan struct{}

	// Status returns the status of the kvstore client
	Status() *models.Status

	// StatusCheckErrors returns a channel which receives status check
	// errors
	StatusCheckErrors() <-chan error

	// LockPath locks the provided path
	LockPath(ctx context.Context, path string) (KVLocker, error)

	// Get returns value of key
	Get(ctx context.Context, key string) ([]byte, error)

	// GetIfLocked returns value of key if the client is still holding the given lock.
	GetIfLocked(ctx context.Context, key string, lock KVLocker) ([]byte, error)

	// Delete deletes a key. It does not return an error if the key does not exist.
	Delete(ctx context.Context, key string) error

	// DeleteIfLocked deletes a key if the client is still holding the given lock. It does not return an error if the key does not exist.
	DeleteIfLocked(ctx context.Context, key string, lock KVLocker) error

	// DeletePrefix presumably deletes all keys sharing the given path
	// prefix.
	// NOTE(review): inferred from the name only; confirm against the backend
	// implementations.
	DeletePrefix(ctx context.Context, path string) error

	// Update creates or updates a key.
	Update(ctx context.Context, key string, value []byte, lease bool) error

	// UpdateIfLocked updates a key if the client is still holding the given lock.
	UpdateIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) error

	// UpdateIfDifferent updates a key if the value is different
	UpdateIfDifferent(ctx context.Context, key string, value []byte, lease bool) (bool, error)

	// UpdateIfDifferentIfLocked updates a key if the value is different and if the client is still holding the given lock.
	UpdateIfDifferentIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) (bool, error)

	// CreateOnly atomically creates a key or fails if it already exists
	CreateOnly(ctx context.Context, key string, value []byte, lease bool) (bool, error)

	// CreateOnlyIfLocked atomically creates a key if the client is still holding the given lock or fails if it already exists
	CreateOnlyIfLocked(ctx context.Context, key string, value []byte, lease bool, lock KVLocker) (bool, error)

	// ListPrefix returns a list of keys matching the prefix
	ListPrefix(ctx context.Context, prefix string) (KeyValuePairs, error)

	// ListPrefixIfLocked returns a list of keys matching the prefix only if the client is still holding the given lock.
	ListPrefixIfLocked(ctx context.Context, prefix string, lock KVLocker) (KeyValuePairs, error)

	// Close closes the kvstore client
	Close()

	// ListAndWatch creates a new watcher which will watch the specified
	// prefix for changes. Before doing this, it will list the current keys
	// matching the prefix and report them as new keys. The Events channel is
	// created with the specified sizes. Upon every change observed, a
	// KeyValueEvent will be sent to the Events channel
	ListAndWatch(ctx context.Context, prefix string, chanSize int) *Watcher

	// RegisterLeaseExpiredObserver registers a function which is executed when
	// the lease associated with a key having the given prefix is detected as expired.
	// If the function is nil, the previous observer (if any) is unregistered.
	RegisterLeaseExpiredObserver(prefix string, fn func(key string))

	// BackendOperationsUserMgmt embeds the users management operations, so
	// every backend must implement them as well.
	BackendOperationsUserMgmt
}

BackendOperations are the individual kvstore operations that each backend must implement. Direct use of this interface is possible but will bypass the tracing layer.

func Client added in v0.10.0

func Client() BackendOperations

Client returns the global kvstore, blocking until it has been configured

func NewClient added in v0.15.7

func NewClient(ctx context.Context, selectedBackend string, opts map[string]string, options *ExtraOptions) (BackendOperations, chan error)

NewClient returns a new kvstore client based on the configuration

type BackendOperationsUserMgmt added in v0.15.7

type BackendOperationsUserMgmt interface {
	// UserEnforcePresence creates a user in the kvstore if not already present, and grants the specified roles.
	UserEnforcePresence(ctx context.Context, name string, roles []string) error

	// UserEnforceAbsence deletes a user from the kvstore, if present.
	UserEnforceAbsence(ctx context.Context, name string) error
}

BackendOperationsUserMgmt are the kvstore operations for users management.

type ClusterSizeDependantIntervalFunc added in v0.15.7

type ClusterSizeDependantIntervalFunc func(baseInterval time.Duration) time.Duration

type DefaultEtcdDbgDialer added in v1.13.17

type DefaultEtcdDbgDialer struct{}

DefaultEtcdDbgDialer provides a default implementation of the EtcdDbgDialer interface.

func (DefaultEtcdDbgDialer) DialContext added in v1.13.17

func (DefaultEtcdDbgDialer) DialContext(ctx context.Context, addr string) (net.Conn, error)

func (DefaultEtcdDbgDialer) LookupIP added in v1.13.17

func (DefaultEtcdDbgDialer) LookupIP(ctx context.Context, hostname string) ([]net.IP, error)

type EtcdDbgDialer added in v1.13.17

type EtcdDbgDialer interface {
	LookupIP(ctx context.Context, hostname string) ([]net.IP, error)
	DialContext(ctx context.Context, addr string) (net.Conn, error)
}

EtcdDbgDialer allows overriding the LookupIP and DialContext functions, e.g., to support service name to IP address resolution when CoreDNS is not the configured DNS server (for pods running in the host network namespace).

type EventChan added in v0.15.7

type EventChan chan KeyValueEvent

EventChan is a channel to receive events on

type EventType added in v0.15.7

type EventType int

EventType defines the type of watch event that occurred

const (
	// EventTypeCreate represents a newly created key
	EventTypeCreate EventType = iota
	// EventTypeModify represents a modified key
	EventTypeModify
	// EventTypeDelete represents a deleted key
	EventTypeDelete
	// EventTypeListDone signals that the initial list operation has completed
	EventTypeListDone
)

func (EventType) String added in v0.15.7

func (t EventType) String() string

String() returns the human readable format of an event type

type ExtraOptions added in v0.15.7

type ExtraOptions struct {
	// DialOption is a list of gRPC dial options.
	// NOTE(review): how and when these are applied is not visible here;
	// presumably they are used when dialing the etcd server.
	DialOption []grpc.DialOption

	// ClusterSizeDependantInterval defines the function to calculate
	// intervals based on cluster size
	ClusterSizeDependantInterval ClusterSizeDependantIntervalFunc

	// NoLockQuorumCheck disables the lock acquisition quorum check
	NoLockQuorumCheck bool

	// ClusterName is the name of each etcd cluster
	ClusterName string

	// BootstrapComplete is an optional channel that can be provided to signal
	// to the client that bootstrap is complete. If provided, the client will
	// have an initial rate limit equal to etcd.bootstrapQps and be updated to
	// etcd.qps after this channel is closed.
	BootstrapComplete <-chan struct{}

	// NoEndpointStatusChecks disables the status checks for the endpoints
	NoEndpointStatusChecks bool
}

ExtraOptions represents any options that can not be represented in a textual format and need to be set programmatically.

func (*ExtraOptions) StatusCheckInterval added in v0.15.7

func (e *ExtraOptions) StatusCheckInterval(allConnected bool) time.Duration

StatusCheckInterval returns the interval of status checks depending on the cluster size and the current connectivity state

	nodes   OK      Failing
	1       20s     3s
	4       45s     7s
	8       1m05s   11s
	32      1m45s   18s
	128     2m25s   24s
	512     3m07s   32s
	2048    3m46s   38s
	8192    4m30s   45s

type KVLocker

// KVLocker is the lock handle returned by BackendOperations.LockPath and
// accepted by the *IfLocked operations.
type KVLocker interface {
	// Unlock unlocks the lock.
	Unlock(ctx context.Context) error
	// Comparator returns an object that should be used by the KVStore to make
	// sure if the lock is still valid for its client or nil if no such
	// verification exists.
	Comparator() interface{}
}

type KeyValueEvent added in v0.15.7

type KeyValueEvent struct {
	// Typ is the type of event { EventTypeCreate | EventTypeModify | EventTypeDelete | EventTypeListDone }
	Typ EventType

	// Key is the kvstore key that changed
	Key string

	// Value is the kvstore value associated with the key
	Value []byte
}

KeyValueEvent is a change event for a Key/Value pair

type KeyValuePairs added in v0.15.7

type KeyValuePairs map[string]Value

KeyValuePairs is a map of key=value pairs

type Lock added in v0.15.7

type Lock struct {
	// contains filtered or unexported fields
}

Lock is a lock returned by LockPath

func LockPath added in v0.15.7

func LockPath(ctx context.Context, backend BackendOperations, path string) (l *Lock, err error)

LockPath locks the specified path. The key for the lock is not the provided path itself but the path with a suffix of ".lock" appended. The lock returned also contains a path specific local Mutex which will be held.

It is required to call Unlock() on the returned Lock to unlock

func (*Lock) Comparator added in v0.15.7

func (l *Lock) Comparator() interface{}

func (*Lock) Unlock added in v0.15.7

func (l *Lock) Unlock(ctx context.Context) error

Unlock unlocks a lock

type Value added in v0.15.7

type Value struct {
	// Data is the data stored in the kvstore.
	Data []byte
	// ModRevision is the mod revision of that data.
	ModRevision uint64
	// LeaseID is presumably the ID of the lease attached to the key.
	// NOTE(review): inferred from the name; confirm against the backend.
	LeaseID int64
}

Value is an abstraction of the data stored in the kvstore as well as the mod revision of that data.

type Watcher added in v0.15.7

type Watcher struct {
	// Events is the channel to which change notifications will be sent to
	Events EventChan `json:"-"`

	// Prefix is presumably the kvstore prefix being watched.
	// NOTE(review): inferred from the name and ListAndWatch; confirm.
	Prefix string `json:"prefix"`
	// contains filtered or unexported fields
}

Watcher represents a KVstore watcher

func (*Watcher) Stop added in v0.15.7

func (w *Watcher) Stop()

Stop stops a watcher previously created and started with Watch()

Directories

Path Synopsis
Package allocator provides a kvstore based ID allocator
Package allocator provides a kvstore based ID allocator
doublewrite
SPDX-License-Identifier: Apache-2.0 Copyright Authors of Cilium
SPDX-License-Identifier: Apache-2.0 Copyright Authors of Cilium
Package store implements a shared store backed by a kvstore or similar with the following properties:
Package store implements a shared store backed by a kvstore or similar with the following properties:

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL