controller

package
v1.0.0-rc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: Apache-2.0 Imports: 74 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// NodeRegionLabel is the well-known label for kubernetes node region in beta
	NodeRegionLabel = v1.LabelFailureDomainBetaRegion
	// NodeZoneLabel is the well-known label for kubernetes node zone in beta
	NodeZoneLabel = v1.LabelFailureDomainBetaZone
	// NodeRegionLabelGA is the well-known label for kubernetes node region in ga
	NodeRegionLabelGA = v1.LabelTopologyRegion
	// NodeZoneLabelGA is the well-known label for kubernetes node zone in ga
	NodeZoneLabelGA = v1.LabelTopologyZone

	// DefaultNetworkGatewayPort is the port used by default for cross-network traffic if not otherwise specified
	// by meshNetworks or "networking.istio.io/gatewayPort"
	DefaultNetworkGatewayPort = 15443
)
View Source
const (
	// CACertNamespaceConfigMap is the name of the ConfigMap in each namespace storing the root cert of non-Kube CA.
	CACertNamespaceConfigMap = "istio-ca-root-cert"
)

Variables

View Source
var EndpointModeNames = map[EndpointMode]string{
	EndpointsOnly:     "EndpointsOnly",
	EndpointSliceOnly: "EndpointSliceOnly",
}
View Source
var SyncAllKinds = map[string]struct{}{
	"Services":      {},
	"Nodes":         {},
	"Pods":          {},
	"Endpoints":     {},
	"EndpointSlice": {},
}

Functions

func FindPort

func FindPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error)

Forked from Kubernetes k8s.io/kubernetes/pkg/api/v1/pod FindPort locates the container port for the given pod and portName. If the targetPort is a number, use that. If the targetPort is a string, look that string up in all named ports in all containers in the target pod. If no match is found, fail.

func GetPodCondition

func GetPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition)

func GetPodConditionFromList

func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition)

GetPodConditionFromList extracts the provided condition from the given list of condition and returns the index of the condition and the condition. Returns -1 and nil if the condition is not present.

func GetPodReadyCondition

func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition

func GetServiceImportIPs

func GetServiceImportIPs(si *unstructured.Unstructured) []string

GetServiceImportIPs returns the list of ClusterSet IPs for the ServiceImport. Exported for testing only.

func IsPodReady

func IsPodReady(pod *v1.Pod) bool

IsPodReady is copied from kubernetes/pkg/api/v1/pod/utils.go

func IsPodReadyConditionTrue

func IsPodReadyConditionTrue(status v1.PodStatus) bool

IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.

func NewFakeControllerWithOptions

func NewFakeControllerWithOptions(opts FakeControllerOptions) (*FakeController, *FakeXdsUpdater)

Types

type Controller

type Controller struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Controller is a collection of synchronized resource watchers Caches are thread-safe

func NewController

func NewController(kubeClient kubelib.Client, options Options) *Controller

NewController creates a new Kubernetes controller Created by bootstrap and multicluster (see multicluster.Controller).

func (*Controller) AppendServiceHandler

func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event))

AppendServiceHandler implements a service catalog operation

func (*Controller) AppendWorkloadHandler

func (c *Controller) AppendWorkloadHandler(f func(*model.WorkloadInstance, model.Event))

AppendWorkloadHandler implements a service catalog operation

func (*Controller) Cleanup

func (c *Controller) Cleanup() error

func (*Controller) Cluster

func (c *Controller) Cluster() cluster.ID

func (*Controller) GetIstioServiceAccounts

func (c *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string

GetIstioServiceAccounts returns the Istio service accounts running a service hostname. Each service account is encoded according to the SPIFFE VSID spec. For example, a service account named "bar" in namespace "foo" is encoded as "spiffe://cluster.local/ns/foo/sa/bar".

func (*Controller) GetProxyServiceInstances

func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) []*model.ServiceInstance

GetProxyServiceInstances returns service instances co-located with a given proxy TODO: this code does not return k8s service instances when the proxy's IP is a workload entry To tackle this, we need a ip2instance map like what we have in service entry.

func (*Controller) GetProxyWorkloadLabels

func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) labels.Instance

func (*Controller) GetService

func (c *Controller) GetService(hostname host.Name) *model.Service

GetService implements a service catalog operation by hostname specified.

func (*Controller) HasSynced

func (c *Controller) HasSynced() bool

HasSynced returns true after the initial state synchronization

func (*Controller) InstancesByPort

func (c *Controller) InstancesByPort(svc *model.Service, reqSvcPort int, labels labels.Instance) []*model.ServiceInstance

InstancesByPort implements a service catalog operation

func (*Controller) MCSServices

func (c *Controller) MCSServices() []model.MCSServiceInfo

func (*Controller) Network

func (c *Controller) Network(endpointIP string, labels labels.Instance) network.ID

func (*Controller) NetworkGateways

func (c *Controller) NetworkGateways() []model.NetworkGateway

func (*Controller) Provider

func (c *Controller) Provider() provider.ID

func (*Controller) Run

func (c *Controller) Run(stop <-chan struct{})

Run all controllers until a signal is received

func (*Controller) Services

func (c *Controller) Services() []*model.Service

Services implements a service catalog operation

func (*Controller) Stop

func (c *Controller) Stop()

Stop the controller. Only for tests, to simplify the code (defer c.Stop())

func (*Controller) SyncAll

func (c *Controller) SyncAll() error

SyncAll syncs all the objects node->service->pod->endpoint in order TODO: sync same kind of objects in parallel This can cause great performance cost in multi clusters scenario. Maybe just sync the cache and trigger one push at last.

func (*Controller) WorkloadInstanceHandler

func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event)

WorkloadInstanceHandler defines the handler for service instances generated by other registries

type EndpointBuilder

type EndpointBuilder struct {
	// contains filtered or unexported fields
}

EndpointBuilder is a stateful IstioEndpoint builder with metadata used to build IstioEndpoint

func NewEndpointBuilder

func NewEndpointBuilder(c controllerInterface, pod *v1.Pod) *EndpointBuilder

func NewEndpointBuilderFromMetadata

func NewEndpointBuilderFromMetadata(c controllerInterface, proxy *model.Proxy) *EndpointBuilder

type EndpointMode

type EndpointMode int

EndpointMode decides what source to use to get endpoint information

const (
	// EndpointsOnly type will use only Kubernetes Endpoints
	EndpointsOnly EndpointMode = iota

	// EndpointSliceOnly type will use only Kubernetes EndpointSlices
	EndpointSliceOnly
)

func DetectEndpointMode

func DetectEndpointMode(kubeClient kubelib.Client) EndpointMode

DetectEndpointMode determines whether to use Endpoints or EndpointSlice based on the feature flag and/or Kubernetes version

func (EndpointMode) String

func (m EndpointMode) String() string

type FakeController

type FakeController struct {
	*Controller
}

type FakeControllerOptions

type FakeControllerOptions struct {
	Client                    kubelib.Client
	NetworksWatcher           mesh.NetworksWatcher
	MeshWatcher               mesh.Watcher
	ServiceHandler            func(service *model.Service, event model.Event)
	Mode                      EndpointMode
	ClusterID                 cluster.ID
	WatchedNamespaces         string
	DomainSuffix              string
	XDSUpdater                model.XDSUpdater
	DiscoveryNamespacesFilter filter.DiscoveryNamespacesFilter
	Stop                      chan struct{}
}

type FakeXdsEvent

type FakeXdsEvent struct {
	// Type of the event
	Type string

	// The id of the event
	ID string

	// The endpoints associated with an EDS push if any
	Endpoints []*model.IstioEndpoint
}

FakeXdsEvent is used to watch XdsEvents

type FakeXdsUpdater

type FakeXdsUpdater struct {
	// Events tracks notifications received by the updater
	Events chan FakeXdsEvent
}

FakeXdsUpdater is used to test the registry.

func NewFakeXDS

func NewFakeXDS() *FakeXdsUpdater

NewFakeXDS creates a XdsUpdater reporting events via a channel.

func (*FakeXdsUpdater) Clear

func (fx *FakeXdsUpdater) Clear()

Clear any pending event

func (*FakeXdsUpdater) ConfigUpdate

func (fx *FakeXdsUpdater) ConfigUpdate(req *model.PushRequest)

func (*FakeXdsUpdater) EDSCacheUpdate

func (fx *FakeXdsUpdater) EDSCacheUpdate(_ model.ShardKey, hostname, _ string, entry []*model.IstioEndpoint)

func (*FakeXdsUpdater) EDSUpdate

func (fx *FakeXdsUpdater) EDSUpdate(_ model.ShardKey, hostname string, _ string, entry []*model.IstioEndpoint)

func (*FakeXdsUpdater) ProxyUpdate

func (fx *FakeXdsUpdater) ProxyUpdate(_ cluster.ID, _ string)

func (*FakeXdsUpdater) RemoveShard

func (fx *FakeXdsUpdater) RemoveShard(shardKey model.ShardKey)

func (*FakeXdsUpdater) SvcUpdate

func (fx *FakeXdsUpdater) SvcUpdate(_ model.ShardKey, hostname string, _ string, _ model.Event)

SvcUpdate is called when a service port mapping definition is updated. This interface is WIP - labels, annotations and other changes to service may be updated to force a EDS and CDS recomputation and incremental push, as it doesn't affect LDS/RDS.

func (*FakeXdsUpdater) Wait

func (fx *FakeXdsUpdater) Wait(et string) *FakeXdsEvent

func (*FakeXdsUpdater) WaitForDuration

func (fx *FakeXdsUpdater) WaitForDuration(et string, d time.Duration) *FakeXdsEvent

func (*FakeXdsUpdater) WaitForDurationOrFail

func (fx *FakeXdsUpdater) WaitForDurationOrFail(t test.Failer, et string, d time.Duration) *FakeXdsEvent

func (*FakeXdsUpdater) WaitOrFail

func (fx *FakeXdsUpdater) WaitOrFail(t test.Failer, et string) *FakeXdsEvent

type FilterOutFunc

type FilterOutFunc func(old, cur interface{}) bool

FilterOutFunc func for filtering out objects during update callback

type Multicluster

type Multicluster struct {
	XDSUpdater model.XDSUpdater
	// contains filtered or unexported fields
}

Multicluster structure holds the remote kube Controllers and multicluster specific attributes.

func NewMulticluster

func NewMulticluster(
	serverID string,
	kc kubernetes.Interface,
	secretNamespace string,
	opts Options,
	serviceEntryController *serviceentry.Controller,
	caBundleWatcher *keycertbundle.Watcher,
	revision string,
	startNsController bool,
	clusterLocal model.ClusterLocalProvider,
	s server.Instance) *Multicluster

NewMulticluster initializes data structure to store multicluster information

func (*Multicluster) ClusterAdded

func (m *Multicluster) ClusterAdded(cluster *multicluster.Cluster, clusterStopCh <-chan struct{}) error

ClusterAdded is passed to the secret controller as a callback to be called when a remote cluster is added. This function needs to set up all the handlers to watch for resources being added, deleted or changed on remote clusters.

func (*Multicluster) ClusterDeleted

func (m *Multicluster) ClusterDeleted(clusterID cluster.ID) error

ClusterDeleted is passed to the secret controller as a callback to be called when a remote cluster is deleted. Also must clear the cache so remote resources are removed.

func (*Multicluster) ClusterUpdated

func (m *Multicluster) ClusterUpdated(cluster *multicluster.Cluster, stop <-chan struct{}) error

ClusterUpdated is passed to the secret controller as a callback to be called when a remote cluster is updated.

func (*Multicluster) Run

func (m *Multicluster) Run(stopCh <-chan struct{}) error

type NamespaceController

type NamespaceController struct {
	// contains filtered or unexported fields
}

NamespaceController manages reconciles a configmap in each namespace with a desired set of data.

func NewNamespaceController

func NewNamespaceController(kubeClient kube.Client, caBundleWatcher *keycertbundle.Watcher) *NamespaceController

NewNamespaceController returns a pointer to a newly constructed NamespaceController instance.

func (*NamespaceController) Run

func (nc *NamespaceController) Run(stopCh <-chan struct{})

Run starts the NamespaceController until a value is sent to stopCh.

type Options

type Options struct {
	SystemNamespace string

	// MeshServiceController is a mesh-wide service Controller.
	MeshServiceController *aggregate.Controller

	DomainSuffix string

	// ClusterID identifies the remote cluster in a multicluster env.
	ClusterID cluster.ID

	// ClusterAliases are aliase names for cluster. When a proxy connects with a cluster ID
	// and if it has a different alias we should use that a cluster ID for proxy.
	ClusterAliases map[string]string

	// Metrics for capturing node-based metrics.
	Metrics model.Metrics

	// XDSUpdater will push changes to the xDS server.
	XDSUpdater model.XDSUpdater

	// NetworksWatcher observes changes to the mesh networks config.
	NetworksWatcher mesh.NetworksWatcher

	// MeshWatcher observes changes to the mesh config
	MeshWatcher mesh.Watcher

	// EndpointMode decides what source to use to get endpoint information
	EndpointMode EndpointMode

	// Maximum QPS when communicating with kubernetes API
	KubernetesAPIQPS float32

	// Maximum burst for throttle when communicating with the kubernetes API
	KubernetesAPIBurst int

	// SyncTimeout, if set, causes HasSynced to be returned when marked true.
	SyncTimeout *atomic.Bool

	// If meshConfig.DiscoverySelectors are specified, the DiscoveryNamespacesFilter tracks the namespaces this controller watches.
	DiscoveryNamespacesFilter filter.DiscoveryNamespacesFilter
}

Options stores the configurable attributes of a Controller.

type PodCache

type PodCache struct {
	sync.RWMutex

	// IPByPods is a reverse map of podsByIP. This exists to allow us to prune stale entries in the
	// pod cache if a pod changes IP.
	IPByPods map[string]string
	// contains filtered or unexported fields
}

PodCache is an eventually consistent pod cache

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL