ktoc

package
v1.2.0-beta.1
Warning

This package is not in the latest version of its module.

Published: Dec 15, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package ktoc contains a reusable abstraction for efficiently watching for changes in resources in a Kubernetes cluster.

Constants

const (
	// CloudK8SNS is the key used in the meta to record the namespace
	// of the service/node registration.
	CloudK8SNS       = "fsm-connector-external-k8s-ns"
	CloudK8SRefKind  = "fsm-connector-external-k8s-ref-kind"
	CloudK8SRefValue = "fsm-connector-external-k8s-ref-name"
	CloudK8SNodeName = "fsm-connector-external-k8s-node-name"
)
const (
	// SyncPeriod is how often the syncer will attempt to
	// reconcile the expected service states with the remote cloud server.
	SyncPeriod = 5 * time.Second

	// ServicePollPeriod is how often a service is checked for
	// whether it has instances to reap.
	ServicePollPeriod = 10 * time.Second
)

Variables

This section is empty.

Functions

func SetSyncCloudNamespace

func SetSyncCloudNamespace(ns string)

SetSyncCloudNamespace sets the cloud namespace used for syncing to ns.

func WithGateway

func WithGateway(enable bool)

WithGateway enables or disables gateway support.

Types

type Backgrounder

type Backgrounder interface {
	Run(<-chan struct{})
}

Backgrounder should be implemented by a Resource that requires additional background processing. If a Resource implements this, then the Controller will automatically Run the Backgrounder for the duration of the controller.

The channel will be closed when the Controller is quitting. The Controller will block until the Backgrounder completes.
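The contract above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the `Backgrounder` interface is re-declared locally so the example compiles on its own, and `heartbeat` and `runUntil` are hypothetical names. `runUntil` imitates the behavior the text describes for the Controller: close the channel, then block until `Run` returns.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Backgrounder mirrors the interface documented above so that this
// sketch is self-contained.
type Backgrounder interface {
	Run(<-chan struct{})
}

// heartbeat is a hypothetical background task that counts ticks until
// its stop channel is closed.
type heartbeat struct {
	interval time.Duration
	ticks    atomic.Int64
}

// Run does periodic work and returns once stopCh is closed.
func (h *heartbeat) Run(stopCh <-chan struct{}) {
	ticker := time.NewTicker(h.interval)
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			h.ticks.Add(1)
		}
	}
}

// runUntil is an assumed stand-in for the Controller: it starts b,
// closes the stop channel after d, and blocks until b has finished.
func runUntil(b Backgrounder, d time.Duration) {
	stopCh := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		b.Run(stopCh)
	}()
	time.Sleep(d)
	close(stopCh)
	<-done
}

func main() {
	h := &heartbeat{interval: 5 * time.Millisecond}
	runUntil(h, 50*time.Millisecond)
	fmt.Println("saw at least one tick:", h.ticks.Load() > 0)
}
```

Because the caller waits on `Run`, an implementation should return promptly once the channel is closed.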

type CloudSyncer

type CloudSyncer struct {
	// EnableNamespaces indicates that a user is running Consul Enterprise
	// with version 1.7+ which is namespace aware. It enables Consul namespaces,
	// with syncing into either a single Consul namespace or mirrored from
	// k8s namespaces.
	EnableNamespaces bool

	// CrossNamespaceACLPolicy is the name of the ACL policy to attach to
	// any created Consul namespaces to allow cross namespace service discovery.
	// Only necessary if ACLs are enabled.
	CrossNamespaceACLPolicy string

	// SyncPeriod is the interval between full catalog syncs. These will
	// re-register all services to prevent overwrites of data. This should
	// happen relatively infrequently; the default is 5 seconds.
	//
	// ServicePollPeriod is the interval to look for invalid services to
	// deregister. One request will be made for each synced service in
	// Kubernetes.
	//
	// For both syncs, smaller, more frequent, and more focused syncs may be
	// triggered by known drift or changes.
	SyncPeriod        time.Duration
	ServicePollPeriod time.Duration

	// ConsulK8STag is the tag value for services registered.
	ConsulK8STag string

	// The Consul node name to register services with.
	ConsulNodeName string

	DiscClient provider.ServiceDiscoveryClient
	// contains filtered or unexported fields
}

CloudSyncer is a Syncer that takes the set of registrations and registers them with cloud. It also watches cloud for changes to the services and ensures the local set of registrations represents the source of truth, overwriting any external changes to the services.

func (*CloudSyncer) Run

func (s *CloudSyncer) Run(ctx context.Context)

Run is the long-running runloop for reconciling the local set of services to register with the remote state.
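A periodic reconcile loop of this kind is commonly built from a ticker and a context. The following is a sketch under that assumption, not the actual body of `Run`; `reconcileLoop` and `countReconciles` are hypothetical names, and the short periods are chosen only to keep the demo quick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// reconcileLoop calls reconcile once immediately and then once per
// period until ctx is cancelled.
func reconcileLoop(ctx context.Context, period time.Duration, reconcile func()) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		reconcile()
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
	}
}

// countReconciles runs the loop for roughly total and reports how many
// reconcile passes were executed.
func countReconciles(period, total time.Duration) int {
	ctx, cancel := context.WithTimeout(context.Background(), total)
	defer cancel()
	n := 0
	reconcileLoop(ctx, period, func() { n++ })
	return n
}

func main() {
	// The documented SyncPeriod constant is 5 * time.Second; a much
	// shorter period is used here for illustration.
	n := countReconciles(10*time.Millisecond, 100*time.Millisecond)
	fmt.Println("reconcile passes:", n)
}
```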

func (*CloudSyncer) Sync

func (s *CloudSyncer) Sync(rs []*provider.CatalogRegistration)

Sync implements Syncer.

type Controller

type Controller struct {
	Resource Resource
	// contains filtered or unexported fields
}

Controller is a generic cache.Controller implementation that watches Kubernetes for changes to a specific set of resources and calls the configured callbacks as data changes.

func (*Controller) HasSynced

func (c *Controller) HasSynced() bool

HasSynced implements cache.Controller.

func (*Controller) LastSyncResourceVersion

func (c *Controller) LastSyncResourceVersion() string

LastSyncResourceVersion implements cache.Controller.

func (*Controller) Run

func (c *Controller) Run(stopCh <-chan struct{})

Run starts the Controller and blocks until stopCh is closed.

Important: Callers must ensure that Run is only called once at a time.

type Event

type Event struct {
	// Key is in the form of <namespace>/<name>, e.g. default/pod-abc123,
	// and corresponds to the resource modified.
	Key string
	// Obj holds the resource that was modified at the time of the event
	// occurring. If possible, the resource should be retrieved from the informer
	// cache, instead of using this field because the cache will be more up to
	// date at the time the event is processed.
	// In some cases, such as a delete event, the resource will no longer exist
	// in the cache and then it is useful to have the resource here.
	Obj interface{}
}

Event is something that occurred to the resources we're watching.
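Since Key has the form `<namespace>/<name>`, a handler usually splits it before looking the object up. The sketch below uses only the standard library; `splitKey` is a hypothetical helper, and treating a key without a slash as a cluster-scoped name is an assumption. Code that already imports client-go would more likely call `cache.SplitMetaNamespaceKey`.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKey breaks a key of the form <namespace>/<name> into its parts.
// A key without a slash is treated as a name with an empty namespace.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	ns, name, err := splitKey("default/pod-abc123")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("namespace:", ns, "name:", name)
}
```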

type NodePortSyncType

type NodePortSyncType string
const (
	// ExternalOnly syncs NodePort services only with a node's ExternalIP
	// address. The service is not synced if no ExternalIP exists.
	ExternalOnly NodePortSyncType = "ExternalOnly"

	// ExternalFirst syncs with a node's ExternalIP address if one exists and
	// falls back to the node's InternalIP address otherwise.
	ExternalFirst NodePortSyncType = "ExternalFirst"

	// InternalOnly syncs NodePort services using only the node's InternalIP
	// address.
	InternalOnly NodePortSyncType = "InternalOnly"
)
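
The three modes amount to a small address-selection rule. The sketch below is an illustration only: the type and constants are re-declared locally, `pickNodeAddress` is a hypothetical helper that is not part of this package, and the behavior of `InternalOnly` is inferred from its name.

```go
package main

import "fmt"

// NodePortSyncType and its values mirror the declarations above.
type NodePortSyncType string

const (
	ExternalOnly  NodePortSyncType = "ExternalOnly"
	ExternalFirst NodePortSyncType = "ExternalFirst"
	InternalOnly  NodePortSyncType = "InternalOnly"
)

// pickNodeAddress returns the address to register for a node and
// whether the node should be synced at all. Empty strings stand for
// addresses the node does not have.
func pickNodeAddress(mode NodePortSyncType, externalIP, internalIP string) (string, bool) {
	switch mode {
	case ExternalOnly:
		return externalIP, externalIP != ""
	case ExternalFirst:
		if externalIP != "" {
			return externalIP, true
		}
		return internalIP, internalIP != ""
	case InternalOnly:
		return internalIP, internalIP != ""
	}
	// Unknown modes are not synced in this sketch.
	return "", false
}

func main() {
	addr, ok := pickNodeAddress(ExternalFirst, "", "10.0.0.7")
	fmt.Println(addr, ok)
}
```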

type Resource

type Resource interface {
	// Informer returns the SharedIndexInformer that the controller will
	// use to watch for changes. An Informer is the long-running task that
	// holds blocking queries to K8S and stores data in a local store.
	Informer() cache.SharedIndexInformer

	// Upsert is the callback called when processing the queue
	// of changes from the Informer. If an error is returned, the given item
	// will be retried.
	Upsert(key string, obj interface{}) error
	// Delete is called on object deletion.
	// obj is the last known state of the object before deletion. In some
	// cases, it may not be up to date with the latest state of the object.
	// If an error is returned, the given item will be retried.
	Delete(key string, obj interface{}) error
}

Resource should be implemented by anything that should be watchable by Controller. The Resource needs to be aware of how to create the Informer that is responsible for making API calls as well as what to do on Upsert and Delete.

func NewResource

func NewResource(
	informer cache.SharedIndexInformer,
	upsert ResourceUpsertFunc,
	delete ResourceDeleteFunc,
) Resource

NewResource returns a Resource implementation for the given informer, upsert handler, and delete handler.

type ResourceDeleteFunc

type ResourceDeleteFunc func(string, interface{}) error

type ResourceUpsertFunc

type ResourceUpsertFunc func(string, interface{}) error

ResourceUpsertFunc and ResourceDeleteFunc are the callback types for when a resource is inserted, updated, or deleted.
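
To illustrate how two callbacks can be adapted into a single value with Upsert and Delete methods, here is a self-contained sketch. It is not the code behind NewResource: `handler` is a trimmed, hypothetical stand-in for Resource that omits Informer so that no client-go import is needed, and `retry` is a simplified bounded loop standing in for the documented "will be retried" behavior, whose real mechanism is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// The callback types mirror the declarations above.
type ResourceUpsertFunc func(string, interface{}) error
type ResourceDeleteFunc func(string, interface{}) error

// handler is a hypothetical, trimmed version of Resource.
type handler interface {
	Upsert(key string, obj interface{}) error
	Delete(key string, obj interface{}) error
}

// funcHandler adapts two plain functions to the handler interface.
type funcHandler struct {
	upsert ResourceUpsertFunc
	del    ResourceDeleteFunc
}

func (f funcHandler) Upsert(key string, obj interface{}) error { return f.upsert(key, obj) }
func (f funcHandler) Delete(key string, obj interface{}) error { return f.del(key, obj) }

func newHandler(upsert ResourceUpsertFunc, del ResourceDeleteFunc) handler {
	return funcHandler{upsert: upsert, del: del}
}

// retry calls fn until it succeeds or attempts are exhausted and
// reports how many calls were made.
func retry(attempts int, fn func() error) (int, error) {
	var err error
	for i := 1; i <= attempts; i++ {
		if err = fn(); err == nil {
			return i, nil
		}
	}
	return attempts, err
}

// demo upserts a key through a handler that fails twice before
// succeeding, then deletes the key again.
func demo() (afterUpsert, calls, afterDelete int) {
	store := map[string]interface{}{}
	failures := 2
	h := newHandler(
		func(key string, obj interface{}) error {
			if failures > 0 {
				failures--
				return errors.New("transient failure")
			}
			store[key] = obj
			return nil
		},
		func(key string, _ interface{}) error {
			delete(store, key)
			return nil
		},
	)
	calls, _ = retry(5, func() error { return h.Upsert("default/web", "v1") })
	afterUpsert = len(store)
	_ = h.Delete("default/web", nil)
	return afterUpsert, calls, len(store)
}

func main() {
	afterUpsert, calls, afterDelete := demo()
	fmt.Println(afterUpsert, calls, afterDelete)
}
```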

type ServiceResource

type ServiceResource struct {
	Client kubernetes.Interface
	Syncer Syncer

	// Ctx is used to cancel processes kicked off by ServiceResource.
	Ctx context.Context

	// AllowK8sNamespacesSet is a set of k8s namespaces to explicitly allow for
	// syncing. It supports the special character `*` which indicates that
	// all k8s namespaces are eligible unless explicitly denied. This filter
	// is applied before checking pod annotations.
	AllowK8sNamespacesSet mapset.Set

	// DenyK8sNamespacesSet is a set of k8s namespaces to explicitly deny
	// syncing and thus service registration with Consul. An empty set
	// means that no namespaces are removed from consideration. This filter
	// takes precedence over AllowK8sNamespacesSet.
	DenyK8sNamespacesSet mapset.Set

	// ConsulK8STag is the tag value for services registered.
	ConsulK8STag string

	// AddServicePrefix is a prefix prepended to the names of K8s services
	// when they are registered in cloud.
	AddServicePrefix string

	// ExplicitEnable should be set to true to require explicit enabling
	// using annotations. If this is false, then services are implicitly
	// enabled (aka default enabled).
	ExplicitEnable bool

	// ClusterIPSync set to true (the default) syncs ClusterIP-type services.
	// Setting this to false will ignore ClusterIP services during the sync.
	ClusterIPSync bool

	// LoadBalancerEndpointsSync set to true (default false) will sync ServiceTypeLoadBalancer endpoints.
	LoadBalancerEndpointsSync bool

	// NodePortSync selects which node IP address is used when syncing
	// NodePort services: external only, external with a fallback to
	// internal, or internal only. See NodePortSyncType.
	NodePortSync NodePortSyncType

	// AddK8SNamespaceAsServiceSuffix set to true appends Kubernetes namespace
	// to the service name being synced to Cloud separated by a dash.
	// For example, service 'foo' in the 'default' namespace will be synced
	// as 'foo-default'.
	AddK8SNamespaceAsServiceSuffix bool

	// EnableNamespaces indicates that a user is running Consul Enterprise
	// with version 1.7+ which is namespace aware. It enables Consul namespaces,
	// with syncing into either a single Consul namespace or mirrored from
	// k8s namespaces.
	EnableNamespaces bool

	// ConsulDestinationNamespace is the name of the Consul namespace to register all
	// synced services into if Consul namespaces are enabled and mirroring
	// is disabled. This will not be used if mirroring is enabled.
	ConsulDestinationNamespace string

	// EnableK8SNSMirroring causes Consul namespaces to be created to match the
	// organization within k8s. Services are registered into the Consul
	// namespace that mirrors their k8s namespace.
	EnableK8SNSMirroring bool

	// K8SNSMirroringPrefix is an optional prefix that can be added to the Consul
	// namespaces created while mirroring. For example, if it is set to "k8s-",
	// then the k8s `default` namespace will be mirrored in Consul's
	// `k8s-default` namespace.
	K8SNSMirroringPrefix string

	// The Consul node name to register service with.
	ConsulNodeName string

	// SyncIngress enables syncing of the hostname from an Ingress resource
	// to the service registration if an Ingress rule matches the service.
	SyncIngress bool

	// SyncIngressLoadBalancerIPs enables syncing the IP of the Ingress LoadBalancer
	// if we do not want to sync the hostname from the Ingress resource.
	SyncIngressLoadBalancerIPs bool
	// contains filtered or unexported fields
}

ServiceResource implements controller.Resource to sync CatalogService resource types from K8S.
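
The namespace filters and naming options documented on the struct fields can be summarized as three small rules. The sketch below is an interpretation of those field comments, not the package's code: `nsSet` replaces `mapset.Set` to avoid the third-party import, and `shouldSync`, `syncedName`, and `destinationNamespace` are hypothetical helpers. The order in which the prefix and the namespace suffix are applied is an assumption.

```go
package main

import "fmt"

// nsSet is a minimal string set standing in for mapset.Set.
type nsSet map[string]struct{}

func newNSSet(names ...string) nsSet {
	s := nsSet{}
	for _, n := range names {
		s[n] = struct{}{}
	}
	return s
}

// shouldSync applies the deny list first, since it takes precedence,
// then honors "*" or an explicit entry in the allow list.
func shouldSync(ns string, allow, deny nsSet) bool {
	if _, denied := deny[ns]; denied {
		return false
	}
	if _, all := allow["*"]; all {
		return true
	}
	_, ok := allow[ns]
	return ok
}

// syncedName prepends the prefix and, if requested, appends the
// namespace separated by a dash.
func syncedName(prefix, name, namespace string, addNamespaceSuffix bool) string {
	n := prefix + name
	if addNamespaceSuffix {
		n += "-" + namespace
	}
	return n
}

// destinationNamespace returns the mirrored namespace when mirroring is
// enabled and the fixed destination otherwise.
func destinationNamespace(k8sNS, destination, mirrorPrefix string, mirroring bool) string {
	if mirroring {
		return mirrorPrefix + k8sNS
	}
	return destination
}

func main() {
	allow, deny := newNSSet("*"), newNSSet("kube-system")
	fmt.Println(shouldSync("default", allow, deny))
	fmt.Println(shouldSync("kube-system", allow, deny))
	fmt.Println(syncedName("", "foo", "default", true))
	fmt.Println(destinationNamespace("default", "shared", "k8s-", true))
}
```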

func (*ServiceResource) Delete

func (t *ServiceResource) Delete(key string, _ interface{}) error

Delete implements the controller.Resource interface.

func (*ServiceResource) Informer

func (t *ServiceResource) Informer() cache.SharedIndexInformer

Informer implements the controller.Resource interface.

func (*ServiceResource) Run

func (t *ServiceResource) Run(ch <-chan struct{})

Run implements the controller.Backgrounder interface.

func (*ServiceResource) Upsert

func (t *ServiceResource) Upsert(key string, raw interface{}) error

Upsert implements the controller.Resource interface.

type Syncer

type Syncer interface {
	// Sync is called to sync the full set of registrations.
	Sync([]*provider.CatalogRegistration)
}

Syncer is responsible for syncing a set of cloud catalog registrations. An external system manages the set of registrations and periodically updates the Syncer. The Syncer should keep the remote system in sync with the given set of registrations.
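
A minimal in-memory Syncer shows the "full set" semantics: each call to Sync replaces whatever was known before. This is a sketch for illustration and for tests, not CloudSyncer. `registration` is a hypothetical stand-in for `provider.CatalogRegistration`, whose fields are not shown on this page, and `memorySyncer` is a hypothetical type.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registration is a simplified stand-in for provider.CatalogRegistration.
type registration struct {
	ServiceName string
}

// Syncer mirrors the interface above, using the stand-in type.
type Syncer interface {
	Sync([]*registration)
}

// memorySyncer keeps the most recent full set of registrations.
type memorySyncer struct {
	mu       sync.Mutex
	services map[string]*registration
}

// Sync replaces the stored set with rs; services absent from rs are
// dropped, matching the "full set" contract.
func (m *memorySyncer) Sync(rs []*registration) {
	next := make(map[string]*registration, len(rs))
	for _, r := range rs {
		next[r.ServiceName] = r
	}
	m.mu.Lock()
	m.services = next
	m.mu.Unlock()
}

// names returns the currently registered service names in sorted order.
func (m *memorySyncer) names() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make([]string, 0, len(m.services))
	for n := range m.services {
		out = append(out, n)
	}
	sort.Strings(out)
	return out
}

func main() {
	m := &memorySyncer{}
	var s Syncer = m
	s.Sync([]*registration{{ServiceName: "web"}, {ServiceName: "db"}})
	s.Sync([]*registration{{ServiceName: "web"}})
	fmt.Println(m.names())
}
```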
