connector

package
v1.1.4-alpha.3
Published: Nov 1, 2023 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package connector contains a reusable abstraction for efficiently watching for changes in resources in a Kubernetes cluster.

Constants

const (
	// K8SQuietPeriod is the time to wait for no service changes before syncing.
	K8SQuietPeriod = 1 * time.Second

	// K8SMaxPeriod is the maximum time to wait before forcing a sync, even
	// if there are active changes going on.
	K8SMaxPeriod = 5 * time.Second
)
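The interplay between the two periods can be sketched as a small calculation. This is only one reading of the comments above, not the package's actual debouncing code, which is unexported and not shown here. The helper `syncDeadline` and the lowercase constants are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// These values mirror the documented K8SQuietPeriod and K8SMaxPeriod.
const (
	quietPeriod = 1 * time.Second
	maxPeriod   = 5 * time.Second
)

// syncDeadline is a hypothetical helper, not part of the package. Given the
// offsets of the first and the most recent pending change (measured from any
// common starting point), it returns the offset at which a sync should run:
// one quiet period after the last change, but never later than one max
// period after the first change.
func syncDeadline(firstChange, lastChange time.Duration) time.Duration {
	quiet := lastChange + quietPeriod
	forced := firstChange + maxPeriod
	if forced < quiet {
		return forced
	}
	return quiet
}

func main() {
	// A short burst of changes: sync one quiet period after the last one.
	fmt.Println(syncDeadline(0, 300*time.Millisecond))
	// Changes keep arriving: the max period forces a sync anyway.
	fmt.Println(syncDeadline(0, 4800*time.Millisecond))
}
```

Under this reading, a steady stream of changes delays a sync by at most K8SMaxPeriod, while an isolated change is synced after K8SQuietPeriod.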
const (
	// CloudSourcedServiceLabel defines cloud-sourced service label
	CloudSourcedServiceLabel = "cloud-sourced-service"
	// CloudServiceLabel defines cloud service label
	CloudServiceLabel = "cloud-service"
	// CloudServiceInheritedFromAnnotation defines cloud service inherited annotation
	CloudServiceInheritedFromAnnotation = "flomesh.io/cloud-service-inherited-from"

	// MeshServiceSyncAnnotation defines mesh service sync annotation
	MeshServiceSyncAnnotation = "flomesh.io/mesh-service-sync"
	// MeshEndpointAddrAnnotation defines mesh endpoint addr annotation
	MeshEndpointAddrAnnotation = "flomesh.io/cloud-endpoint-addr"

	// ConsulDiscoveryService defines consul discovery service name
	ConsulDiscoveryService = "consul"

	// EurekaDiscoveryService defines eureka discovery service name
	EurekaDiscoveryService = "eureka"
)

Variables

This section is empty.

Functions

func EnabledGatewayAPI added in v1.1.4

func EnabledGatewayAPI(enabled bool)

EnabledGatewayAPI sets gatewayAPIEnabled.

func IsSyncCloudNamespace

func IsSyncCloudNamespace(ns *corev1.Namespace) bool

IsSyncCloudNamespace reports whether ns is the sync namespace.

func SetSyncCloudNamespace

func SetSyncCloudNamespace(ns string)

SetSyncCloudNamespace sets the sync namespace.

func WatchMeshConfigUpdated added in v1.1.4

func WatchMeshConfigUpdated(msgBroker *messaging.Broker, stop <-chan struct{})

WatchMeshConfigUpdated watches for meshconfig updates.

Types

type Aggregator

type Aggregator interface {
	// Aggregate micro services
	Aggregate(svcName MicroSvcName, svcDomainName MicroSvcDomainName) (map[MicroSvcName]*MicroSvcMeta, string)
}

Aggregator aggregates micro services
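A minimal sketch of an Aggregator implementation, for example as a test double for the Sink's MicroAggregator field. The types are redeclared locally so the example compiles on its own; `staticAggregator` and `newDemoAggregator` are hypothetical names. The documentation does not say what the returned string or the int values in Addresses mean, so the sketch returns an empty string and uses a placeholder value.

```go
package main

import "fmt"

// Local mirrors of the documented types, so the sketch is self-contained.
type (
	MicroSvcName        string
	MicroSvcDomainName  string
	MicroSvcPort        int
	MicroSvcAppProtocol string
	MicroEndpointAddr   string
)

type MicroSvcMeta struct {
	Ports     map[MicroSvcPort]MicroSvcAppProtocol
	Addresses map[MicroEndpointAddr]int
}

type Aggregator interface {
	Aggregate(svcName MicroSvcName, svcDomainName MicroSvcDomainName) (map[MicroSvcName]*MicroSvcMeta, string)
}

// staticAggregator is a hypothetical Aggregator that serves fixed metadata.
type staticAggregator map[MicroSvcName]*MicroSvcMeta

// Aggregate returns the stored metadata for svcName, or nil if unknown.
// The second return value is left empty because its meaning is not documented.
func (a staticAggregator) Aggregate(svcName MicroSvcName, _ MicroSvcDomainName) (map[MicroSvcName]*MicroSvcMeta, string) {
	meta, ok := a[svcName]
	if !ok {
		return nil, ""
	}
	return map[MicroSvcName]*MicroSvcMeta{svcName: meta}, ""
}

// newDemoAggregator builds an aggregator with one made-up service.
func newDemoAggregator() Aggregator {
	return staticAggregator{
		"bookstore": {
			Ports: map[MicroSvcPort]MicroSvcAppProtocol{8080: "http"},
			// The int value is a placeholder; its meaning is an assumption.
			Addresses: map[MicroEndpointAddr]int{"10.0.0.12": 1},
		},
	}
}

func main() {
	svcs, _ := newDemoAggregator().Aggregate("bookstore", "bookstore.example")
	for name, meta := range svcs {
		fmt.Println(name, meta.Ports, meta.Addresses)
	}
}
```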

type Backgrounder

type Backgrounder interface {
	Run(<-chan struct{})
}

Backgrounder should be implemented by a Resource that requires additional background processing. If a Resource implements this, then the Controller will automatically Run the Backgrounder for the duration of the controller.

The channel will be closed when the Controller is quitting. The Controller will block until the Backgrounder completes.
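The contract described above can be sketched as follows. The interface is redeclared locally so the example compiles on its own; `heartbeat` and `runAndStop` are hypothetical names, and `runAndStop` only imitates what the text says the Controller does (start the Backgrounder, close the channel on shutdown, block until Run returns).

```go
package main

import (
	"fmt"
	"time"
)

// Backgrounder mirrors the documented interface.
type Backgrounder interface {
	Run(<-chan struct{})
}

// heartbeat is a hypothetical resource that does periodic background work.
type heartbeat struct {
	interval time.Duration
	beats    int
	stopped  bool
}

// Compile-time check that *heartbeat satisfies Backgrounder.
var _ Backgrounder = (*heartbeat)(nil)

// Run ticks until the stop channel is closed, then returns.
func (h *heartbeat) Run(stop <-chan struct{}) {
	defer func() { h.stopped = true }()
	ticker := time.NewTicker(h.interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			h.beats++
		}
	}
}

// runAndStop starts b in the background, closes the stop channel after the
// given delay, and blocks until Run has returned.
func runAndStop(b Backgrounder, after time.Duration) {
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		b.Run(stop)
	}()
	time.Sleep(after)
	close(stop)
	<-done
}

func main() {
	h := &heartbeat{interval: 10 * time.Millisecond}
	runAndStop(h, 50*time.Millisecond)
	fmt.Println("stopped:", h.stopped)
}
```

Because the caller blocks until Run returns, a Backgrounder that ignores the closed channel would prevent shutdown, so Run should always select on it.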

type Controller

type Controller struct {
	Resource Resource
	// contains filtered or unexported fields
}

Controller is a generic cache.Controller implementation that watches Kubernetes for changes to a specific set of resources and calls the configured callbacks as data changes.

func (*Controller) Run

func (c *Controller) Run(stopCh <-chan struct{})

Run starts the Controller and blocks until stopCh is closed.

Important: Callers must ensure that Run is only called once at a time.

type Event

type Event struct {
	// Key is in the form of <namespace>/<name>, e.g. default/pod-abc123,
	// and corresponds to the resource modified.
	Key string
	// Obj holds the resource that was modified at the time of the event
	// occurring. If possible, the resource should be retrieved from the informer
	// cache, instead of using this field because the cache will be more up to
	// date at the time the event is processed.
	// In some cases, such as a delete event, the resource will no longer exist
	// in the cache and then it is useful to have the resource here.
	Obj interface{}
}

Event is something that occurred to the resources we're watching.
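As an illustration of the Key format, the sketch below splits a `<namespace>/<name>` key into its parts using only the standard library. `splitKey` is a hypothetical helper; in code that already depends on client-go, `cache.SplitMetaNamespaceKey` serves the same purpose.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKey is a hypothetical helper that splits an Event key of the form
// <namespace>/<name>. A key without a slash is treated as a name with an
// empty namespace; anything with more than one slash is rejected.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	ns, name, err := splitKey("default/pod-abc123")
	fmt.Println(ns, name, err)
}
```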

type MicroEndpointAddr

type MicroEndpointAddr string

MicroEndpointAddr defines string as micro endpoint addr

func (MicroEndpointAddr) To16

func (addr MicroEndpointAddr) To16() net.IP

To16 converts the IP address addr to a 16-byte representation. If addr is not an IP address, To16 returns nil.

func (MicroEndpointAddr) To4

func (addr MicroEndpointAddr) To4() net.IP

To4 converts the IPv4 address addr to a 4-byte representation. If addr is not an IPv4 address, To4 returns nil.
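The two conversions could plausibly be built on the standard net package, as sketched below. The implementation of MicroEndpointAddr is not shown on this page, so the parsing step is an assumption; `endpointAddr` is a hypothetical stand-in type.

```go
package main

import (
	"fmt"
	"net"
)

// endpointAddr is a hypothetical stand-in for MicroEndpointAddr.
type endpointAddr string

// To4 parses the string and returns its 4-byte form, or nil if the string
// is not an IPv4 address. Assumes the address is stored in textual form.
func (a endpointAddr) To4() net.IP {
	return net.ParseIP(string(a)).To4()
}

// To16 parses the string and returns its 16-byte form, or nil if the string
// is not a valid IP address.
func (a endpointAddr) To16() net.IP {
	return net.ParseIP(string(a)).To16()
}

func main() {
	addr := endpointAddr("10.0.0.1")
	fmt.Println(len(addr.To4()), len(addr.To16()))
	fmt.Println(endpointAddr("not-an-ip").To4() == nil)
}
```

`net.ParseIP` returns nil for invalid input, and calling `To4` or `To16` on a nil `net.IP` also returns nil, so no explicit error check is needed in this sketch.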

type MicroSvcAppProtocol

type MicroSvcAppProtocol string

MicroSvcAppProtocol defines app protocol

type MicroSvcDomainName

type MicroSvcDomainName string

MicroSvcDomainName defines string as microservice domain name

type MicroSvcMeta

type MicroSvcMeta struct {
	Ports     map[MicroSvcPort]MicroSvcAppProtocol
	Addresses map[MicroEndpointAddr]int
}

MicroSvcMeta defines micro service meta

type MicroSvcName

type MicroSvcName string

MicroSvcName defines string as microservice name

type MicroSvcPort

type MicroSvcPort int

MicroSvcPort defines int as micro service port

type Resource

type Resource interface {
	// Ready waits until ready
	Ready()

	// ServiceInformer returns the SharedIndexInformer that the controller will
	// use to watch for changes. An Informer is the long-running task that
	// holds blocking queries to K8S and stores data in a local store.
	ServiceInformer() cache.SharedIndexInformer

	// EndpointsInformer returns the SharedIndexInformer that the controller will
	// use to watch for changes. An Informer is the long-running task that
	// holds blocking queries to K8S and stores data in a local store.
	EndpointsInformer() cache.SharedIndexInformer

	// GatewayInformer returns the SharedIndexInformer that the controller will
	// use to watch for changes. An Informer is the long-running task that
	// holds blocking queries to K8S and stores data in a local store.
	GatewayInformer() cache.SharedIndexInformer

	// UpsertService is the callback called when processing the queue
	// of changes from the Informer. If an error is returned, the given item
	// will be retried.
	UpsertService(key string, obj interface{}) error

	// DeleteService is called on object deletion.
	// obj is the last known state of the object before deletion. In some
	// cases, it may not be up to date with the latest state of the object.
	// If an error is returned, the given item will be retried.
	DeleteService(key string, obj interface{}) error

	// UpsertEndpoints is the callback called when processing the queue
	// of changes from the Informer. If an error is returned, the given item
	// will be retried.
	UpsertEndpoints(key string, obj interface{}) error

	// DeleteEndpoints is called on object deletion.
	// obj is the last known state of the object before deletion. In some
	// cases, it may not be up to date with the latest state of the object.
	// If an error is returned, the given item will be retried.
	DeleteEndpoints(key string, obj interface{}) error

	// UpsertGateway is the callback called when processing the queue
	// of changes from the Informer. If an error is returned, the given item
	// will be retried.
	UpsertGateway(key string, obj interface{}) error

	// DeleteGateway is called on object deletion.
	// obj is the last known state of the object before deletion. In some
	// cases, it may not be up to date with the latest state of the object.
	// If an error is returned, the given item will be retried.
	DeleteGateway(key string, obj interface{}) error
}

Resource should be implemented by anything that should be watchable by Controller. The Resource needs to be aware of how to create the Informer that is responsible for making API calls as well as what to do on Upsert and Delete.
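The retry-on-error behavior mentioned in the Upsert and Delete comments can be pictured with the sketch below. It is not the Controller's actual queue handling, which is unexported; a plain bounded loop stands in for whatever queue and backoff the real implementation uses. `upsertFunc`, `processWithRetry`, and `failNTimes` are hypothetical names.

```go
package main

import "fmt"

// upsertFunc has the same shape as the Upsert and Delete callbacks on Resource.
type upsertFunc func(key string, obj interface{}) error

// processWithRetry calls fn until it succeeds or maxAttempts is reached.
// It returns the number of attempts made and the last error, if any.
func processWithRetry(key string, obj interface{}, fn upsertFunc, maxAttempts int) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(key, obj); err == nil {
			return attempt, nil
		}
	}
	return maxAttempts, err
}

// failNTimes returns a callback that fails on its first n calls and then
// succeeds, imitating a transient error.
func failNTimes(n int) upsertFunc {
	calls := 0
	return func(key string, _ interface{}) error {
		calls++
		if calls <= n {
			return fmt.Errorf("transient failure %d for %s", calls, key)
		}
		return nil
	}
}

func main() {
	attempts, err := processWithRetry("default/web", nil, failNTimes(2), 5)
	fmt.Println(attempts, err)
}
```

Since a failed item may be handed to the callback again, Upsert and Delete implementations are safest when they are idempotent.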

type Sink

type Sink struct {
	KubeClient kubernetes.Interface

	MicroAggregator Aggregator

	// SyncPeriod is the duration to wait between registering or deregistering
	// services in Kubernetes. This can be fairly short since no work will be
	// done if there are no changes.
	SyncPeriod time.Duration

	// Ctx is used to cancel the Sink.
	Ctx context.Context
	// contains filtered or unexported fields
}

Sink is the destination where services are registered.

While in practice we only have one sink (K8S), the interface abstraction makes it possible to test the Source in isolation.

func NewSink

func NewSink(ctx context.Context, kubeClient kubernetes.Interface, gatewayClient gwapi.Interface, fsmNamespace string) *Sink

NewSink creates a new mesh sink

func (*Sink) DeleteEndpoints

func (s *Sink) DeleteEndpoints(key string, _ interface{}) error

DeleteEndpoints implements the Resource interface.

func (*Sink) DeleteGateway added in v1.1.4

func (s *Sink) DeleteGateway(_ string, _ interface{}) error

DeleteGateway implements the Resource interface.

func (*Sink) DeleteService

func (s *Sink) DeleteService(key string, _ interface{}) error

DeleteService implements the Resource interface.

func (*Sink) EndpointsInformer

func (s *Sink) EndpointsInformer() cache.SharedIndexInformer

EndpointsInformer implements the Resource interface. It tells Kubernetes that we want to watch for changes to Endpoints.

func (*Sink) GatewayInformer added in v1.1.4

func (s *Sink) GatewayInformer() cache.SharedIndexInformer

GatewayInformer implements the Resource interface. It tells Kubernetes that we want to watch for changes to Gateways.

func (*Sink) Ready

func (s *Sink) Ready()

Ready waits until the Sink is ready.

func (*Sink) Run

func (s *Sink) Run(ch <-chan struct{})

Run implements the Backgrounder interface.

func (*Sink) ServiceInformer

func (s *Sink) ServiceInformer() cache.SharedIndexInformer

ServiceInformer implements the Resource interface. It tells Kubernetes that we want to watch for changes to Services.

func (*Sink) SetServices

func (s *Sink) SetServices(svcs map[MicroSvcName]MicroSvcDomainName)

SetServices is called with the services that should be created. The key is the service name and the value is the external DNS entry to point to.

func (*Sink) UpsertEndpoints

func (s *Sink) UpsertEndpoints(key string, raw interface{}) error

UpsertEndpoints implements the Resource interface.

func (*Sink) UpsertGateway added in v1.1.4

func (s *Sink) UpsertGateway(key string, raw interface{}) error

UpsertGateway implements the Resource interface.

func (*Sink) UpsertService

func (s *Sink) UpsertService(key string, raw interface{}) error

UpsertService implements the Resource interface.

Directories

Package consul implements a syncer from consul to k8s.
Package eureka implements a syncer from eureka to k8s.
