connector

package
v1.5.0-alpha.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Overview

Package connector contains a reusable abstraction for efficiently watching for changes in resources in a Kubernetes cluster.

Index

Constants

View Source
const (
	// AnnotationMeshServiceSync defines mesh service sync annotation
	AnnotationMeshServiceSync = "flomesh.io/mesh-service-sync"

	// AnnotationMeshServiceInternalSync defines mesh service internal sync annotation
	AnnotationMeshServiceInternalSync = "flomesh.io/mesh-service-internal-sync"

	// AnnotationCloudServiceInheritedFrom defines cloud service inherited annotation
	AnnotationCloudServiceInheritedFrom = "flomesh.io/cloud-service-inherited-from"

	// AnnotationCloudServiceAttachedTo defines cloud service attached to namespace
	AnnotationCloudServiceAttachedTo = "flomesh.io/cloud-service-attached-to"

	// AnnotationCloudServiceInheritedClusterID defines cloud service cluster id annotation
	AnnotationCloudServiceInheritedClusterID = "flomesh.io/cloud-service-inherited-cluster-id"

	// AnnotationMeshEndpointAddr defines mesh endpoint addr annotation
	AnnotationMeshEndpointAddr = "flomesh.io/cloud-endpoint-addr"
)
View Source
const (
	// AnnotationServiceSyncK8sToCloud is the key of the annotation that determines
	// whether to sync the k8s Service to Consul/Eureka.
	AnnotationServiceSyncK8sToCloud = "flomesh.io/service-sync-k8s-to-cloud"

	// AnnotationServiceSyncK8sToFgw is the key of the annotation that determines
	// whether to sync the k8s Service to fsm gateway.
	AnnotationServiceSyncK8sToFgw = "flomesh.io/service-sync-k8s-to-fgw"

	// AnnotationCloudHealthCheckService defines health check service annotation
	AnnotationCloudHealthCheckService = "flomesh.io/cloud-health-check-service"

	// AnnotationServiceName is set to override the name of the service
	// registered. By default this will be the name of the CatalogService resource.
	AnnotationServiceName = "flomesh.io/service-name"

	// AnnotationServicePort specifies the port to use as the service instance
	// port when registering a service. This can be a named port in the
	// service or an integer value.
	AnnotationServicePort = "flomesh.io/service-port"

	// AnnotationServiceTags specifies the tags for the registered service
	// instance. Multiple tags should be comma separated. Whitespace around
	// the tags is automatically trimmed.
	AnnotationServiceTags = "flomesh.io/service-tags"

	// AnnotationServiceMetaPrefix is the prefix for setting meta key/value
	// for a service. The remainder of the key is the meta key.
	AnnotationServiceMetaPrefix = "flomesh.io/service-meta-"

	// AnnotationServiceWeight is the key of the annotation that determines
	// the traffic weight of the service which is spanned over multiple k8s cluster.
	// e.g. CatalogService `backend` in k8s cluster `A` receives 25% of the traffic
	// compared to same `backend` service in k8s cluster `B`.
	AnnotationServiceWeight = "flomesh.io/service-weight"
)
View Source
const (
	// HealthAny is special, and is used as a wild card,
	// not as a specific state.
	HealthAny      = "any"
	HealthPassing  = "passing"
	HealthWarning  = "warning"
	HealthCritical = "critical"
	HealthMaint    = "maintenance"
)
View Source
const (
	NACOS_DEFAULT_CLUSTER = "DEFAULT"
)

Variables

View Source
var (
	// ClusterSetKey is the key used in the meta to track the "k8s" source.
	ClusterSetKey = "fsm.connector.service.cluster.set"

	// ConnectUIDKey is the key used in the meta to track the "k8s" source.
	ConnectUIDKey = "fsm.connector.service.connector.uid"

	// CloudK8SNS is the key used in the meta to record the namespace
	// of the service/node registration.
	CloudK8SNS          = "fsm.connector.service.k8s.ns"
	CloudK8SRefKind     = "fsm.connector.service.k8s.ref.kind"
	CloudK8SRefValue    = "fsm.connector.service.k8s.ref.name"
	CloudK8SNodeName    = "fsm.connector.service.k8s.node.name"
	CloudK8SPort        = "fsm.connector.service.k8s.port"
	CloudHTTPViaGateway = "fsm.connector.service.http.via.gateway"
	CloudGRPCViaGateway = "fsm.connector.service.grpc.via.gateway"
	CloudViaGatewayMode = "fsm.connector.service.via.gateway.mode"
)
View Source
var (
	GatewayAPIEnabled = false
)
View Source
var SHARD_COUNT = 32

Functions

func Encode added in v1.3.8

func Encode(m *MicroSvcMeta) (string, uint64)

func WatchMeshConfigUpdated added in v1.1.4

func WatchMeshConfigUpdated(
	connectController ConnectController,
	msgBroker *messaging.Broker,
	stop <-chan struct{})

WatchMeshConfigUpdated watches update of meshconfig

Types

type AgentCheck added in v1.2.1

type AgentCheck struct {
	CheckID   string
	ServiceID string
	Name      string
	Namespace string
	Type      string
	Status    string
	Output    string
}

AgentCheck represents a check known to the agent

func (*AgentCheck) ToConsul added in v1.2.1

func (ac *AgentCheck) ToConsul() *consul.AgentCheck

type AgentService added in v1.2.1

type AgentService struct {
	ID           string
	InstanceId   string
	ClusterId    string
	MicroService MicroService
	Weights      AgentWeights
	Tags         []string
	Meta         map[string]interface{}

	GRPCInterface    string
	GRPCMethods      []string
	GRPCInstanceMeta map[string]interface{}

	HealthCheck bool
}

AgentService represents a service known to the agent

func (*AgentService) FromConsul added in v1.2.1

func (as *AgentService) FromConsul(ins *consul.AgentService)

func (*AgentService) FromEureka added in v1.2.1

func (as *AgentService) FromEureka(ins *eureka.Instance)

func (*AgentService) FromNacos added in v1.2.1

func (as *AgentService) FromNacos(ins *nacos.Instance)

func (*AgentService) FromVM added in v1.2.1

func (*AgentService) FromZookeeper

func (as *AgentService) FromZookeeper(ins discovery.ServiceInstance)

func (*AgentService) ToConsul added in v1.2.1

func (as *AgentService) ToConsul() *consul.AgentService

type AgentWeights added in v1.2.1

type AgentWeights struct {
	Passing int
	Warning int
}

func (*AgentWeights) FromConsul added in v1.2.1

func (aw *AgentWeights) FromConsul(w consul.AgentWeights)

func (*AgentWeights) ToConsul added in v1.2.1

func (aw *AgentWeights) ToConsul() consul.AgentWeights

type Backgrounder

type Backgrounder interface {
	Run(<-chan struct{})
}

Backgrounder should be implemented by a Resource that requires additional background processing. If a Resource implements this, then the CacheController will automatically Run the Backgrounder for the duration of the controller.

The channel will be closed when the CacheController is quitting. The CacheController will block until the Backgrounder completes.

type C2KContext added in v1.2.1

type C2KContext struct {

	// EndpointsKeyToName maps from Kube controller keys to Kube endpoints names.
	// Controller keys are in the form <kube namespace>/<kube endpoints name>
	// e.g. default/foo, and are the keys Kube uses to inform that something
	// changed.
	EndpointsKeyToName map[string]string

	// SourceServices holds cloud services that should be synced to Kube.
	// It maps from cloud service names to cloud DNS entry, e.g.
	// We lowercase the cloud service names and DNS entries
	// because Kube names must be lowercase.
	SourceServices map[string]string
	RawServices    map[string]string

	// ServiceKeyToName maps from Kube controller keys to Kube service names.
	// Controller keys are in the form <kube namespace>/<kube svc name>
	// e.g. default/foo, and are the keys Kube uses to inform that something
	// changed.
	ServiceKeyToName map[string]string

	// ServiceMapCache is a subset of serviceMap. It holds all Kube services
	// that were created by this sync process. Keys are Kube service names.
	// It's populated from Kubernetes data.
	ServiceMapCache map[string]*corev1.Service

	ServiceHashMap map[string]uint64
}

C2KContext is the c2k context for connector controller

func NewC2KContext added in v1.2.1

func NewC2KContext() *C2KContext

type CacheController added in v1.2.1

type CacheController struct {
	Resource Resource
	// contains filtered or unexported fields
}

CacheController is a generic cache.Controller implementation that watches Kubernetes for changes to specific set of resources and calls the configured callbacks as data changes.

func (*CacheController) HasSynced added in v1.2.1

func (c *CacheController) HasSynced() bool

HasSynced implements cache.Controller.

func (*CacheController) LastSyncResourceVersion added in v1.2.1

func (c *CacheController) LastSyncResourceVersion() string

LastSyncResourceVersion implements cache.Controller.

func (*CacheController) Run added in v1.2.1

func (c *CacheController) Run(stopCh <-chan struct{})

Run starts the CacheController and blocks until stopCh is closed.

Important: Callers must ensure that Run is only called once at a time.

type CatalogDeregistration added in v1.2.1

type CatalogDeregistration struct {
	NamespacedService

	Node       string
	ServiceID  string
	ServiceRef string
}

func (*CatalogDeregistration) ToConsul added in v1.2.1

func (*CatalogDeregistration) ToEureka added in v1.2.1

func (cdr *CatalogDeregistration) ToEureka() *eureka.Instance

func (*CatalogDeregistration) ToNacos added in v1.2.1

func (*CatalogDeregistration) ToZookeeper

type CatalogRegistration added in v1.2.1

type CatalogRegistration struct {
	Node           string
	Address        string
	NodeMeta       map[string]string
	Service        *AgentService
	Check          *AgentCheck
	SkipNodeUpdate bool
}

func (*CatalogRegistration) ToConsul added in v1.2.1

func (*CatalogRegistration) ToEureka added in v1.2.1

func (cr *CatalogRegistration) ToEureka() *eureka.Instance

func (*CatalogRegistration) ToNacos added in v1.2.1

func (cr *CatalogRegistration) ToNacos(cluster, group string, weight float64) *vo.RegisterInstanceParam

func (*CatalogRegistration) ToZookeeper

type CatalogService added in v1.2.1

type CatalogService struct {
	Node        string
	ServiceID   string
	ServiceName string
	ServiceRef  string
}

func (*CatalogService) FromConsul added in v1.2.1

func (cs *CatalogService) FromConsul(svc *consul.CatalogService)

func (*CatalogService) FromEureka added in v1.2.1

func (cs *CatalogService) FromEureka(svc *eureka.Instance)

func (*CatalogService) FromNacos added in v1.2.1

func (cs *CatalogService) FromNacos(svc *nacos.Instance)

func (*CatalogService) FromZookeeper

func (cs *CatalogService) FromZookeeper(svc discovery.ServiceInstance)

type ConcurrentMap added in v1.2.1

type ConcurrentMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

ConcurrentMap a "thread" safe map of type string:Anything. To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.

func NewConcurrentMap added in v1.2.1

func NewConcurrentMap[V any]() ConcurrentMap[string, V]

NewConcurrentMap creates a new concurrent map.

func NewStringerConcurrentMap added in v1.2.1

func NewStringerConcurrentMap[K Stringer, V any]() ConcurrentMap[K, V]

NewStringerConcurrentMap creates a new concurrent map.

func (ConcurrentMap[K, V]) Clear added in v1.2.1

func (m ConcurrentMap[K, V]) Clear()

Clear removes all items from map.

func (ConcurrentMap[K, V]) Count added in v1.2.1

func (m ConcurrentMap[K, V]) Count() int

Count returns the number of elements within the map.

func (ConcurrentMap[K, V]) Get added in v1.2.1

func (m ConcurrentMap[K, V]) Get(key K) (V, bool)

Get retrieves an element from map under given key.

func (ConcurrentMap[K, V]) GetShard added in v1.2.1

func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V]

GetShard returns shard under given key

func (ConcurrentMap[K, V]) Has added in v1.2.1

func (m ConcurrentMap[K, V]) Has(key K) bool

Has Looks up an item under specified key

func (ConcurrentMap[K, V]) IsEmpty added in v1.2.1

func (m ConcurrentMap[K, V]) IsEmpty() bool

IsEmpty checks if map is empty.

func (ConcurrentMap[K, V]) Items added in v1.2.1

func (m ConcurrentMap[K, V]) Items() map[K]V

Items returns all items as map[string]V

func (ConcurrentMap[K, V]) Iter added in v1.2.1

func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V]

Iter returns an iterator which could be used in a for range loop. Deprecated: using IterBuffered() will get a better performance

func (ConcurrentMap[K, V]) IterBuffered added in v1.2.1

func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V]

IterBuffered returns a buffered iterator which could be used in a for range loop.

func (ConcurrentMap[K, V]) IterCb added in v1.2.1

func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V])

Callback based iterator, cheapest way to read all elements in a map.

func (ConcurrentMap[K, V]) Keys added in v1.2.1

func (m ConcurrentMap[K, V]) Keys() []K

Keys returns all keys as []string

func (ConcurrentMap[K, V]) MSet added in v1.2.1

func (m ConcurrentMap[K, V]) MSet(data map[K]V)

func (ConcurrentMap[K, V]) Pop added in v1.2.1

func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool)

Pop removes an element from the map and returns it

func (ConcurrentMap[K, V]) Remove added in v1.2.1

func (m ConcurrentMap[K, V]) Remove(key K)

Remove removes an element from the map.

func (ConcurrentMap[K, V]) RemoveCb added in v1.2.1

func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool

RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params If callback returns true and element exists, it will remove it from the map Returns the value returned by the callback (even if element was not present in the map)

func (ConcurrentMap[K, V]) Set added in v1.2.1

func (m ConcurrentMap[K, V]) Set(key K, value V)

Set the given value under the specified key.

func (ConcurrentMap[K, V]) SetIfAbsent added in v1.2.1

func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool

SetIfAbsent Sets the given value under the specified key if no value was associated with it.

func (ConcurrentMap[K, V]) Upsert added in v1.2.1

func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V)

Upsert Insert or Update - updates existing element or inserts a new one using UpsertCb

type ConcurrentMapShared added in v1.2.1

type ConcurrentMapShared[K comparable, V any] struct {
	sync.RWMutex // Read Write mutex, guards access to internal map.
	// contains filtered or unexported fields
}

ConcurrentMapShared A "thread" safe string to anything map.

type ConnectController added in v1.2.1

type ConnectController interface {
	BroadcastListener(stopCh <-chan struct{})

	GetConnectorProvider() ctv1.DiscoveryServiceProvider
	GetConnectorName() string
	GetConnectorUID() string

	GetConsulConnector(connector string) *ctv1.ConsulConnector
	GetEurekaConnector(connector string) *ctv1.EurekaConnector
	GetNacosConnector(connector string) *ctv1.NacosConnector
	GetZookeeperConnector(connector string) *ctv1.ZookeeperConnector
	GetMachineConnector(connector string) *ctv1.MachineConnector
	GetGatewayConnector(connector string) *ctv1.GatewayConnector
	GetConnector() (connector, spec interface{}, uid string, ok bool)

	Refresh()

	WaitLimiter()

	GetC2KContext() *C2KContext
	GetK2CContext() *K2CContext
	GetK2GContext() *K2GContext

	GetClusterSet() string
	SetClusterSet(name, group, zone, region string)

	SetServiceInstanceIDFunc(f ServiceInstanceIDFunc)
	GetServiceInstanceID(name, addr string, port MicroServicePort, protocol MicroServiceProtocol) string

	GetClusterId() string
	GetPassingOnly() bool
	GetC2KFilterTag() string
	GetC2KFilterMetadatas() []ctv1.Metadata
	GetC2KExcludeMetadatas() []ctv1.Metadata
	GetC2KFilterIPRanges() []*cidr.CIDR
	GetC2KExcludeIPRanges() []*cidr.CIDR
	GetK2CFilterIPRanges() []*cidr.CIDR
	GetK2CExcludeIPRanges() []*cidr.CIDR
	GetPrefix() string
	GetPrefixTag() string
	GetSuffixTag() string
	GetPrefixMetadata() string
	GetSuffixMetadata() string

	GetFixedHTTPServicePort() *uint32

	GetC2KWithGateway() bool
	GetC2KMultiGateways() bool

	GetNacos2KClusterSet() []string
	GetNacos2KGroupSet() []string

	GetSyncPeriod() time.Duration
	GetDefaultSync() bool
	GetSyncClusterIPServices() bool
	GetSyncLoadBalancerEndpoints() bool
	GetNodePortSyncType() ctv1.NodePortSyncType

	GetSyncIngress() bool
	GetSyncIngressLoadBalancerIPs() bool

	GetAddServicePrefix() string
	GetAddK8SNamespaceAsServiceSuffix() bool

	GetAppendTagSet() mapset.Set
	GetAppendMetadataSet() mapset.Set
	GetAllowK8SNamespaceSet() mapset.Set
	GetDenyK8SNamespaceSet() mapset.Set

	GetK2CWithGateway() bool
	GetK2CWithGatewayMode() ctv1.WithGatewayMode

	GetConsulNodeName() string
	GetConsulEnableNamespaces() bool
	GetConsulDestinationNamespace() string
	GetConsulEnableK8SNSMirroring() bool
	GetConsulK8SNSMirroringPrefix() string
	GetConsulCrossNamespaceACLPolicy() string
	GetConsulGenerateInternalServiceHealthCheck() bool

	GetEurekaHeartBeatInstance() bool
	GetEurekaHeartBeatPeriod() time.Duration
	GetEurekaCheckServiceInstanceID() bool

	GetNacosGroupId() string
	GetNacosClusterId() string

	GetZookeeperBasePath() string
	GetZookeeperCategory() string
	GetZookeeperAdaptor() string

	GetK2GDefaultSync() bool
	GetK2GAllowK8SNamespaceSet() mapset.Set
	GetK2GDenyK8SNamespaceSet() mapset.Set

	GetViaIngressIPSelector() ctv1.AddrSelector
	GetViaEgressIPSelector() ctv1.AddrSelector

	GetViaIngressAddr() string
	SetViaIngressAddr(ingressAddr string)

	GetViaEgressAddr() string
	SetViaEgressAddr(egressAddr string)

	GetViaIngressHTTPPort() uint
	SetViaIngressHTTPPort(httpPort uint)

	GetViaIngressGRPCPort() uint
	SetViaIngressGRPCPort(grpcPort uint)

	GetViaEgressHTTPPort() uint
	SetViaEgressHTTPPort(httpPort uint)

	GetViaEgressGRPCPort() uint
	SetViaEgressGRPCPort(grpcPort uint)

	GetAuthConsulUsername() string
	GetAuthConsulPassword() string

	GetAuthNacosUsername() string
	GetAuthNacosPassword() string
	GetAuthNacosAccessKey() string
	GetAuthNacosSecretKey() string
	GetAuthNacosNamespaceId() string

	SyncCloudToK8s() bool
	SyncK8sToCloud() bool
	SyncK8sToGateway() bool

	GetHTTPAddr() string
	GetDeriveNamespace() string
	Purge() bool
	AsInternalServices() bool

	CacheCatalogInstances(key string, catalogFunc func() (interface{}, error)) (interface{}, error)
	CacheRegisterInstance(key string, instance interface{}, registerFunc func() error) error
	CacheDeregisterInstance(key string, deregisterFunc func() error) error
	CacheCleaner(stopCh <-chan struct{})
}

ConnectController is the controller interface for K8s connectors

type Event

type Event struct {
	// Key is in the form of <namespace>/<name>, e.g. default/pod-abc123,
	// and corresponds to the resource modified.
	Key string
	// Obj holds the resource that was modified at the time of the event
	// occurring. If possible, the resource should be retrieved from the informer
	// cache, instead of using this field because the cache will be more up to
	// date at the time the event is processed.
	// In some cases, such as a delete event, the resource will no longer exist
	// in the cache and then it is useful to have the resource here.
	Obj interface{}
}

Event is something that occurred to the resources we're watching.

type GRPCMeta

type GRPCMeta struct {
	Interface string              `json:"interface,omitempty"`
	Methods   map[string][]string `json:"methods,omitempty"`
}

type IterCb added in v1.2.1

type IterCb[K comparable, V any] func(key K, v V)

IterCb Iterator callbacalled for every key,value found in maps. RLock is held for all calls for a given shard therefore callback sess consistent view of a shard, but not across the shards

type K2CContext added in v1.2.1

type K2CContext struct {

	// ServiceMap holds services we should sync to cloud. Keys are the
	// in the form <kube namespace>/<kube svc name>.
	ServiceMap ConcurrentMap[string, *corev1.Service]

	// EndpointsMap uses the same keys as serviceMap but maps to the endpoints
	// of each service.
	EndpointsMap ConcurrentMap[string, *corev1.Endpoints]

	// IngressServiceMap uses the same keys as serviceMap but maps to the ingress
	// of each service if it exists.
	IngressServiceMap ConcurrentMap[string, ConcurrentMap[string, string]]

	// ServiceHostnameMap maps the name of a service to the hostName and port that
	// is provided by the Ingress resource for the service.
	ServiceHostnameMap ConcurrentMap[string, ServiceAddress]

	// registeredServiceMap holds the services in cloud that we've registered from kube.
	// It's populated via cloud's API and lets us diff what is actually in
	// cloud vs. what we expect to be there.
	RegisteredServiceMap ConcurrentMap[string, []*CatalogRegistration]

	// ServiceNames is all namespaces mapped to a set of valid cloud service names
	ServiceNames ConcurrentMap[string, mapset.Set]

	// Namespaces is all namespaces mapped to a map of cloud service ids mapped to their CatalogRegistrations
	Namespaces ConcurrentMap[string, ConcurrentMap[string, *CatalogRegistration]]

	//deregistrations
	Deregs ConcurrentMap[string, *CatalogDeregistration]

	// Watchers is all namespaces mapped to a map of cloud service
	// names mapped to a cancel function for watcher routines
	Watchers ConcurrentMap[string, ConcurrentMap[string, context.CancelFunc]]
}

K2CContext is the k2c context for connector controller

func NewK2CContext added in v1.2.1

func NewK2CContext() *K2CContext

type K2GContext added in v1.2.1

type K2GContext struct {

	// ServiceMap holds services we should sync to gateway. Keys are the
	// in the form <kube namespace>/<kube svc name>.
	ServiceMap map[string]*corev1.Service

	//
	// Syncer Context
	//
	Services map[string]*corev1.Service
	Deregs   map[string]*corev1.Service
}

K2GContext is the k2g context for connector controller

func NewK2GContext added in v1.2.1

func NewK2GContext() *K2GContext

type MicroEndpointMeta added in v1.3.8

type MicroEndpointMeta struct {
	Ports   map[MicroServicePort]MicroServiceProtocol `json:"ports,omitempty"`
	Address MicroServiceAddress                       `json:"address,omitempty"`

	GRPCMeta map[string]interface{} `json:"grpcMeta,omitempty"`

	Native struct {
		ClusterSet     string               `json:"clusterSet,omitempty"`
		ClusterId      string               `json:"clusterId,omitempty"`
		ViaGatewayHTTP string               `json:"viaGatewayHttp,omitempty"`
		ViaGatewayGRPC string               `json:"viaGatewayGrpc,omitempty"`
		ViaGatewayMode ctv1.WithGatewayMode `json:"viaGatewayMode,omitempty"`
	} `json:"native"`
	Local struct {
		InternalService   bool                                      `json:"internalService,omitempty"`
		WithGateway       bool                                      `json:"withGateway,omitempty"`
		WithMultiGateways bool                                      `json:"withMultiGateways,omitempty"`
		BindFgwPorts      map[MicroServicePort]MicroServiceProtocol `json:"bindFgwPorts,omitempty"`
	} `json:"local"`
}

MicroEndpointMeta defines micro endpoint meta

func (*MicroEndpointMeta) Init added in v1.3.8

func (m *MicroEndpointMeta) Init(controller ConnectController, discClient ServiceDiscoveryClient)

type MicroService added in v1.2.2

type MicroService struct {
	NamespacedService
	// contains filtered or unexported fields
}

func (*MicroService) Endpoint

func (s *MicroService) Endpoint() *MicroServiceEndpoint

func (*MicroService) EndpointAddress

func (s *MicroService) EndpointAddress() *MicroServiceAddress

func (*MicroService) EndpointPort

func (s *MicroService) EndpointPort() *MicroServicePort

func (*MicroService) Protocol

func (s *MicroService) Protocol() *MicroServiceProtocol

func (*MicroService) SetGRPCPort

func (s *MicroService) SetGRPCPort(port int32)

func (*MicroService) SetHTTPPort

func (s *MicroService) SetHTTPPort(port int32)

func (*MicroService) Via

func (s *MicroService) Via() *MicroServiceVia

type MicroServiceAddress

type MicroServiceAddress string

MicroServiceAddress defines string as microservice address

func (*MicroServiceAddress) Get

func (a *MicroServiceAddress) Get() string

func (*MicroServiceAddress) Set

func (a *MicroServiceAddress) Set(addr string)

func (*MicroServiceAddress) To16

func (a *MicroServiceAddress) To16() net.IP

To16 converts the IP address ip to a 16-byte representation. If ip is not an IP address (it is the wrong length), To16 returns nil.

func (*MicroServiceAddress) To4

func (a *MicroServiceAddress) To4() net.IP

To4 converts the IPv4 address ip to a 4-byte representation. If ip is not an IPv4 address, To4 returns nil.

type MicroServiceEndpoint

type MicroServiceEndpoint struct {
	// contains filtered or unexported fields
}

func (*MicroServiceEndpoint) Get

func (*MicroServiceEndpoint) Set

type MicroServicePort

type MicroServicePort int32

MicroServicePort defines int as microservice port

func (*MicroServicePort) Get

func (p *MicroServicePort) Get() int32

func (*MicroServicePort) Set

func (p *MicroServicePort) Set(port int32)

type MicroServiceProtocol

type MicroServiceProtocol string

MicroServiceProtocol defines string as microservice protocol

func (*MicroServiceProtocol) Empty

func (p *MicroServiceProtocol) Empty() bool

func (*MicroServiceProtocol) Get

func (p *MicroServiceProtocol) Get() string

func (*MicroServiceProtocol) Set

func (p *MicroServiceProtocol) Set(protocol string)

func (*MicroServiceProtocol) SetVar

func (p *MicroServiceProtocol) SetVar(protocol MicroServiceProtocol)

type MicroServiceVia

type MicroServiceVia struct {
	// contains filtered or unexported fields
}

func (*MicroServiceVia) Get

func (*MicroServiceVia) Set

func (v *MicroServiceVia) Set(viaAddress MicroServiceAddress, viaPort MicroServicePort)

type MicroSvcDomainName

type MicroSvcDomainName string

MicroSvcDomainName defines string as microservice domain name

type MicroSvcMeta

type MicroSvcMeta struct {
	Ports     map[MicroServicePort]MicroServiceProtocol  `json:"ports,omitempty"`
	Endpoints map[MicroServiceAddress]*MicroEndpointMeta `json:"endpoints,omitempty"`

	GRPCMeta *GRPCMeta `json:"grpcMeta,omitempty"`

	HealthCheck bool `json:"healthcheck,omitempty"`
}

MicroSvcMeta defines micro service meta

func Decode added in v1.3.8

func Decode(svc *corev1.Service, enc string) *MicroSvcMeta

func (*MicroSvcMeta) Marshal added in v1.3.8

func (m *MicroSvcMeta) Marshal() string

func (*MicroSvcMeta) Unmarshal added in v1.3.8

func (m *MicroSvcMeta) Unmarshal(str string)

type MicroSvcName

type MicroSvcName string

MicroSvcName defines string as microservice name

type NamespacedService

type NamespacedService struct {
	Namespace string
	Service   string
}

type QueryOptions added in v1.2.1

type QueryOptions struct {
	// AllowStale allows any Consul server (non-leader) to service
	// a read. This allows for lower latency and higher throughput
	AllowStale bool

	// Namespace overrides the `default` namespace
	// Note: Namespaces are available only in Consul Enterprise
	Namespace string

	// WaitIndex is used to enable a blocking query. Waits
	// until the timeout or the next index is reached
	WaitIndex uint64

	// WaitTime is used to bound the duration of a wait.
	// Defaults to that of the Config, but can be overridden.
	WaitTime time.Duration

	// Providing a peer name in the query option
	Peer string

	// Filter requests filtering data prior to it being returned. The string
	// is a go-bexpr compatible expression.
	Filter string
	// contains filtered or unexported fields
}

QueryOptions are used to parameterize a query

func (*QueryOptions) Context added in v1.2.1

func (o *QueryOptions) Context() context.Context

func (*QueryOptions) ToConsul added in v1.2.1

func (o *QueryOptions) ToConsul() *consul.QueryOptions

func (*QueryOptions) WithContext added in v1.2.1

func (o *QueryOptions) WithContext(ctx context.Context) *QueryOptions

type RemoveCb added in v1.2.1

type RemoveCb[K any, V any] func(key K, v V, exists bool) bool

RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held If returns true, the element will be removed from the map

type Resource

type Resource interface {
	// Informer returns the SharedIndexInformer that the controller will
	// use to watch for changes. An Informer is the long-running task that
	// holds blocking queries to K8S and stores data in a local store.
	Informer() cache.SharedIndexInformer

	// Upsert is the callback called when processing the queue
	// of changes from the Informer. If an error is returned, the given item
	// will be retried.
	Upsert(key string, obj interface{}) error
	// Delete is called on object deletion.
	// obj is the last known state of the object before deletion. In some
	// cases, it may not be up to date with the latest state of the object.
	// If an error is returned, the given item will be retried.
	Delete(key string, obj interface{}) error
}

Resource should be implemented by anything that should be watchable by CacheController. The Resource needs to be aware of how to create the Informer that is responsible for making API calls as well as what to do on Upsert and Delete.

type ResourceDeleteFunc added in v1.2.1

type ResourceDeleteFunc func(string, interface{}) error

type ResourceUpsertFunc added in v1.2.1

type ResourceUpsertFunc func(string, interface{}) error

ResourceUpsertFunc and ResourceDeleteFunc are the callback types for when a resource is inserted, updated, or deleted.

type ServiceAddress added in v1.2.1

type ServiceAddress struct {
	HostName string
	Port     int32
}

type ServiceDiscoveryClient added in v1.2.1

type ServiceDiscoveryClient interface {
	CatalogServices(q *QueryOptions) ([]NamespacedService, error)
	CatalogInstances(service string, q *QueryOptions) ([]*AgentService, error)
	RegisteredInstances(service string, q *QueryOptions) ([]*CatalogService, error)
	RegisteredServices(q *QueryOptions) ([]NamespacedService, error)
	Register(reg *CatalogRegistration) error
	Deregister(dereg *CatalogDeregistration) error
	EnableNamespaces() bool
	EnsureNamespaceExists(ns string) (bool, error)
	RegisteredNamespace(kubeNS string) string
	MicroServiceProvider() ctv1.DiscoveryServiceProvider
	IsInternalServices() bool
	Close()
}

type ServiceInstanceIDFunc added in v1.2.1

type ServiceInstanceIDFunc func(name, addr string, port MicroServicePort, protocol MicroServiceProtocol) string

type Stringer added in v1.2.1

type Stringer interface {
	fmt.Stringer
	comparable
}

type Tuple added in v1.2.1

type Tuple[K comparable, V any] struct {
	Key K
	Val V
}

Tuple used by the Iter & IterBuffered functions to wrap two variables together over a channel,

type UpsertCb added in v1.2.1

type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

UpsertCb Callback to return new element to be inserted into the map It is called while lock is held, therefore it MUST NOT try to access other keys in same map, as it can lead to deadlock since Go sync.RWLock is not reentrant

Directories

Path Synopsis
Package ctok implements a syncer from cloud to k8s.
Package ctok implements a syncer from cloud to k8s.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL