cache

package
v0.7.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2024 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Overview

Package cache implements lightweight Kubernetes cluster caching that stores only resource references and ownership references. In addition to references cache might be configured to store custom metadata and whole body of selected resources.

The library uses Kubernetes watch API to maintain cache up to date. This approach reduces number of Kubernetes API requests and provides instant access to the required Kubernetes resources.

Index

Examples

Constants

View Source
const (
	// RespectRbacDisabled default value for respectRbac
	RespectRbacDisabled = iota
	// RespectRbacNormal checks only api response for forbidden/unauthorized errors
	RespectRbacNormal
	// RespectRbacStrict checks both api response for forbidden/unauthorized errors and SelfSubjectAccessReview
	RespectRbacStrict
)
View Source
const (
	ClusterRetryTimeout = 10 * time.Second
)

Variables

This section is empty.

Functions

func ListRetryFuncAlways

func ListRetryFuncAlways(err error) bool

ListRetryFuncAlways always retries on errors

func ListRetryFuncNever

func ListRetryFuncNever(err error) bool

ListRetryFuncNever never retries on errors

func NewClusterCache

func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache

NewClusterCache creates new instance of cluster cache

Example (InspectNamespaceResources)
// kubernetes cluster config here
config := &rest.Config{}

clusterCache := NewClusterCache(config,
	// cache default namespace only
	SetNamespaces([]string{"default", "kube-system"}),
	// configure custom logic to cache resources manifest and additional metadata
	SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
		// if resource belongs to 'extensions' group then mark if with 'deprecated' label
		if un.GroupVersionKind().Group == "extensions" {
			info = []string{"deprecated"}
		}
		_, ok := un.GetLabels()["acme.io/my-label"]
		// cache whole manifest if resource has label
		cacheManifest = ok
		return
	}),
)
// Ensure cluster is synced before using it
if err := clusterCache.EnsureSynced(); err != nil {
	panic(err)
}
// Iterate default namespace resources tree
for _, root := range clusterCache.FindResources("default", TopLevelResource) {
	clusterCache.IterateHierarchy(root.ResourceKey(), func(resource *Resource, _ map[kube.ResourceKey]*Resource) bool {
		fmt.Printf("resource: %s, info: %v\n", resource.Ref.String(), resource.Info)
		return true
	})
}
Output:

Example (ResourceUpdatedEvents)
// kubernetes cluster config here
config := &rest.Config{}

clusterCache := NewClusterCache(config)
// Ensure cluster is synced before using it
if err := clusterCache.EnsureSynced(); err != nil {
	panic(err)
}
unsubscribe := clusterCache.OnResourceUpdated(func(newRes *Resource, oldRes *Resource, _ map[kube.ResourceKey]*Resource) {
	if newRes == nil {
		fmt.Printf("%s deleted\n", oldRes.Ref.String())
	} else if oldRes == nil {
		fmt.Printf("%s created\n", newRes.Ref.String())
	} else {
		fmt.Printf("%s updated\n", newRes.Ref.String())
	}
})
defer unsubscribe()
// observe resource modifications for 1 minute
time.Sleep(time.Minute)
Output:

func NewNoopSettings

func NewNoopSettings() *noopSettings

NewNoopSettings returns cache settings that has not health customizations and don't filter any resources

func ResourceOfGroupKind

func ResourceOfGroupKind(group string, kind string) func(r *Resource) bool

ResourceOfGroupKind returns predicate that matches resource by specified group and kind

func TopLevelResource

func TopLevelResource(r *Resource) bool

TopLevelResource returns true if resource has no parents

Types

type ClusterCache

type ClusterCache interface {
	// EnsureSynced checks cache state and synchronizes it if necessary
	EnsureSynced() error
	// GetServerVersion returns observed cluster version
	GetServerVersion() string
	// GetAPIResources returns information about observed API resources
	GetAPIResources() []kube.APIResourceInfo
	// GetOpenAPISchema returns open API schema of supported API resources
	GetOpenAPISchema() openapi.Resources
	// GetGVKParser returns a parser able to build a TypedValue used in
	// structured merge diffs.
	GetGVKParser() *managedfields.GvkParser
	// Invalidate cache and executes callback that optionally might update cache settings
	Invalidate(opts ...UpdateSettingsFunc)
	// FindResources returns resources that matches given list of predicates from specified namespace or everywhere if specified namespace is empty
	FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource
	// IterateHierarchy iterates resource tree starting from the specified top level resource and executes callback for each resource in the tree.
	// The action callback returns true if iteration should continue and false otherwise.
	IterateHierarchy(key kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
	// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree.
	// The action callback returns true if iteration should continue and false otherwise.
	IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
	// IsNamespaced answers if specified group/kind is a namespaced resource API or not
	IsNamespaced(gk schema.GroupKind) (bool, error)
	// GetManagedLiveObjs helps finding matching live K8S resources for a given resources list.
	// The function returns all resources from cache for those `isManaged` function returns true and resources
	// specified in targetObjs list.
	GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(r *Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error)
	// GetClusterInfo returns cluster cache statistics
	GetClusterInfo() ClusterInfo
	// OnResourceUpdated register event handler that is executed every time when resource get's updated in the cache
	OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
	// OnEvent register event handler that is executed every time when new K8S event received
	OnEvent(handler OnEventHandler) Unsubscribe
}

type ClusterInfo

type ClusterInfo struct {
	// Server holds cluster API server URL
	Server string
	// K8SVersion holds Kubernetes version
	K8SVersion string
	// ResourcesCount holds number of observed Kubernetes resources
	ResourcesCount int
	// APIsCount holds number of observed Kubernetes API count
	APIsCount int
	// LastCacheSyncTime holds time of most recent cache synchronization
	LastCacheSyncTime *time.Time
	// SyncError holds most recent cache synchronization error
	SyncError error
	// APIResources holds list of API resources supported by the cluster
	APIResources []kube.APIResourceInfo
}

ClusterInfo holds cluster cache stats

type ListRetryFunc

type ListRetryFunc func(err error) bool

type OnEventHandler

type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)

OnEventHandler is a function that handles Kubernetes event

type OnPopulateResourceInfoHandler

type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool)

OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache

type OnResourceUpdatedHandler

type OnResourceUpdatedHandler func(newRes *Resource, oldRes *Resource, namespaceResources map[kube.ResourceKey]*Resource)

OnResourceUpdatedHandler handlers resource update event

type Resource

type Resource struct {
	// ResourceVersion holds most recent observed resource version
	ResourceVersion string
	// Resource reference
	Ref v1.ObjectReference
	// References to resource owners
	OwnerRefs []metav1.OwnerReference
	// Optional creation timestamp of the resource
	CreationTimestamp *metav1.Time
	// Optional additional information about the resource
	Info interface{}
	// Optional whole resource manifest
	Resource *unstructured.Unstructured
	// contains filtered or unexported fields
}

Resource holds the information about Kubernetes resource, ownership references and optional information

func (*Resource) ResourceKey

func (r *Resource) ResourceKey() kube.ResourceKey

type Settings

type Settings struct {
	// ResourceHealthOverride contains health assessment overrides
	ResourceHealthOverride health.HealthOverride
	// ResourcesFilter holds filter that excludes resources
	ResourcesFilter kube.ResourceFilter
}

Settings caching customizations

type Unsubscribe

type Unsubscribe func()

type UpdateSettingsFunc

type UpdateSettingsFunc func(cache *clusterCache)

func SetClusterResources

func SetClusterResources(val bool) UpdateSettingsFunc

SetClusterResources specifies if cluster level resource included or not. Flag is used only if cluster is changed to namespaced mode using SetNamespaces setting

func SetClusterSyncRetryTimeout

func SetClusterSyncRetryTimeout(timeout time.Duration) UpdateSettingsFunc

SetClusterSyncRetryTimeout updates cluster sync retry timeout when sync error happens

func SetConfig

func SetConfig(config *rest.Config) UpdateSettingsFunc

SetConfig updates cluster rest config

func SetKubectl

func SetKubectl(kubectl kube.Kubectl) UpdateSettingsFunc

SetKubectl allows to override kubectl wrapper implementation

func SetListPageBufferSize

func SetListPageBufferSize(listPageBufferSize int32) UpdateSettingsFunc

SetListPageBufferSize sets the number of pages to prefetch for list pager.

func SetListPageSize

func SetListPageSize(listPageSize int64) UpdateSettingsFunc

SetListPageSize sets the page size for list pager.

func SetListSemaphore

func SetListSemaphore(listSemaphore WeightedSemaphore) UpdateSettingsFunc

SetListSemaphore sets the semaphore for list operations. Taking an object rather than a number allows to share a semaphore among multiple caches if necessary.

func SetLogr

func SetLogr(log logr.Logger) UpdateSettingsFunc

SetLogr sets the logger to use.

func SetNamespaces

func SetNamespaces(namespaces []string) UpdateSettingsFunc

SetNamespaces updates list of monitored namespaces

func SetPopulateResourceInfoHandler

func SetPopulateResourceInfoHandler(handler OnPopulateResourceInfoHandler) UpdateSettingsFunc

SetPopulateResourceInfoHandler updates handler that populates resource info

func SetRespectRBAC

func SetRespectRBAC(respectRBAC int) UpdateSettingsFunc

SetRespectRBAC allows to set whether to respect the controller rbac in list/watches

func SetResyncTimeout

func SetResyncTimeout(timeout time.Duration) UpdateSettingsFunc

SetResyncTimeout updates cluster re-sync timeout

func SetRetryOptions

func SetRetryOptions(maxRetries int32, useBackoff bool, retryFunc ListRetryFunc) UpdateSettingsFunc

SetRetryOptions sets cluster list retry options

func SetSettings

func SetSettings(settings Settings) UpdateSettingsFunc

SetSettings updates caching settings

func SetTracer

func SetTracer(tracer tracing.Tracer) UpdateSettingsFunc

SetTracer sets the tracer to use.

func SetWatchResyncTimeout

func SetWatchResyncTimeout(timeout time.Duration) UpdateSettingsFunc

SetWatchResyncTimeout updates cluster re-sync timeout

type WeightedSemaphore

type WeightedSemaphore interface {
	Acquire(ctx context.Context, n int64) error
	TryAcquire(n int64) bool
	Release(n int64)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL