Documentation ¶
Overview ¶
Package xds is an implementation of Envoy's xDS (Discovery Service) protocol.
Server is the base implementation of any gRPC server which supports the xDS protocol. All xDS bi-directional gRPC streams from Stream* calls must be handled by calling Server.HandleRequestStream. For example, to implement the ADS protocol:
func (s *myGRPCServer) StreamAggregatedResources(stream api.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { return s.xdsServer.HandleRequestStream(stream.Context(), stream, xds.AnyTypeURL) }
Server is parameterized by a map of supported resource type URLs to resource sets, e.g. to support the LDS and RDS protocols:
ldsCache := xds.NewCache() lds := xds.NewAckingResourceMutatorWrapper(ldsCache) rdsCache := xds.NewCache() rds := xds.NewAckingResourceMutatorWrapper(rdsCache) resTypes := map[string]xds.ResourceTypeConfiguration{ "type.googleapis.com/envoy.api.v2.Listener": {ldsCache, lds}, "type.googleapis.com/envoy.api.v2.RouteConfiguration": {rdsCache, rds}, } server := xds.NewServer(resTypes, 5*time.Seconds)
It is recommended to use a distinct resource set for each resource type to minimize the volume of messages sent and received by xDS clients.
Resource sets must implement the ResourceSource interface to provide read access to resources of one or multiple resource types:
type ResourceSource interface { GetResources(ctx context.Context, typeURL string, lastVersion *uint64, nodeIP string, resourceNames []string) (*VersionedResources, error) }
Resource sets should implement the ResourceSet interface to provide read-write access. It provides an API to atomically update the resources in the set: Upsert inserts or updates a single resource in the set, and Delete deletes a single resource from the set.
Cache is an efficient, ready-to-use implementation of ResourceSet:
typeURL := "type.googleapis.com/envoy.api.v2.Listener" ldsCache := xds.NewCache() ldsCache.Upsert(typeURL, "listener123", listenerA, false) ldsCache.Delete(typeURL, "listener456", false)
In order to wait for acknowledgements of updates by Envoy nodes, each resource set should be wrapped into an AckingResourceMutatorWrapper, which should then be passed to NewServer(). AckingResourceMutatorWrapper provides an extended API which accepts Completions to notify of ACKs.
typeURL := "type.googleapis.com/envoy.api.v2.Listener" ldsCache := xds.NewCache() lds := xds.NewAckingResourceMutatorWrapper(ldsCache) ctx, cancel := context.WithTimeout(..., 5*time.Second) wg := completion.NewWaitGroup(ctx) nodes := []string{"10.0.0.1"} // Nodes to wait an ACK from. lds.Upsert(typeURL, "listener123", listenerA, nodes, wg.AddCompletion()) lds.Delete(typeURL, "listener456", nodes, wg.AddCompletion()) wg.Wait() cancel()
Index ¶
- Constants
- Variables
- func IstioNodeToIP(nodeId string) (string, error)
- type AckingResourceMutator
- type AckingResourceMutatorRevertFunc
- type AckingResourceMutatorWrapper
- func (m *AckingResourceMutatorWrapper) Delete(typeURL string, resourceName string, nodeIDs []string, ...) AckingResourceMutatorRevertFunc
- func (m *AckingResourceMutatorWrapper) DeleteNode(nodeID string)
- func (m *AckingResourceMutatorWrapper) HandleResourceVersionAck(ackVersion uint64, nackVersion uint64, nodeIP string, resourceNames []string, ...)
- func (m *AckingResourceMutatorWrapper) Upsert(typeURL string, resourceName string, resource proto.Message, nodeIDs []string, ...) AckingResourceMutatorRevertFunc
- func (m *AckingResourceMutatorWrapper) UseCurrent(typeURL string, nodeIDs []string, wg *completion.WaitGroup)
- type BaseObservableResourceSource
- func (s *BaseObservableResourceSource) AddResourceVersionObserver(observer ResourceVersionObserver)
- func (s *BaseObservableResourceSource) NotifyNewResourceVersionRLocked(typeURL string, version uint64)
- func (s *BaseObservableResourceSource) RemoveResourceVersionObserver(observer ResourceVersionObserver)
- type Cache
- func (c *Cache) Clear(typeURL string) (version uint64, updated bool)
- func (c *Cache) Delete(typeURL string, resourceName string) (version uint64, updated bool, revert ResourceMutatorRevertFunc)
- func (c *Cache) EnsureVersion(typeURL string, version uint64)
- func (c *Cache) GetResources(ctx context.Context, typeURL string, lastVersion uint64, nodeIP string, ...) (*VersionedResources, error)
- func (c *Cache) Lookup(typeURL string, resourceName string) (proto.Message, error)
- func (c *Cache) Upsert(typeURL string, resourceName string, resource proto.Message) (version uint64, updated bool, revert ResourceMutatorRevertFunc)
- type MockStream
- func (s *MockStream) Close()
- func (s *MockStream) Recv() (*envoy_api_v2.DiscoveryRequest, error)
- func (s *MockStream) RecvResponse() (*envoy_api_v2.DiscoveryResponse, error)
- func (s *MockStream) Send(resp *envoy_api_v2.DiscoveryResponse) error
- func (s *MockStream) SendRequest(req *envoy_api_v2.DiscoveryRequest) error
- type NodeToIDFunc
- type ObservableResourceSet
- type ObservableResourceSource
- type ProxyError
- type ResourceMutator
- type ResourceMutatorRevertFunc
- type ResourceSet
- type ResourceSource
- type ResourceTypeConfiguration
- type ResourceVersionAckObserver
- type ResourceVersionObserver
- type ResourceWatcher
- type Server
- type Stream
- type VersionedResources
Constants ¶
const (
// AnyTypeURL is the default type URL to use for ADS resource sets.
AnyTypeURL = ""
)
Variables ¶
var ( // ErrNoADSTypeURL is the error returned when receiving a request without // a type URL from an ADS stream. ErrNoADSTypeURL = errors.New("type URL is required for ADS") // ErrUnknownTypeURL is the error returned when receiving a request with // an unknown type URL. ErrUnknownTypeURL = errors.New("unknown type URL") // ErrInvalidVersionInfo is the error returned when receiving a request // with a version info that is not a positive integer. ErrInvalidVersionInfo = errors.New("invalid version info") // ErrInvalidNonce is the error returned when receiving a request // with a response nonce that is not a positive integer. ErrInvalidResponseNonce = errors.New("invalid response nonce info") // ErrInvalidNodeFormat is the error returned when receiving a request // with a node that is not a formatted correctly. ErrInvalidNodeFormat = errors.New("invalid node format") // ErrResourceWatch is the error returned whenever an internal error // occurs while waiting for new versions of resources. ErrResourceWatch = errors.New("resource watch failed") )
var (
ErrNackReceived error = errors.New("NACK received")
)
Functions ¶
func IstioNodeToIP ¶
IstioNodeToIP extract the IP address from an Envoy node identifier configured by Istio's pilot-agent.
Istio's pilot-agent structures the node.id as the concatenation of the following parts separated by ~:
- node type: one of "sidecar", "ingress", or "router" - node IP address - node ID: the unique platform-specific sidecar proxy ID - node domain: the DNS domain suffix for short hostnames, e.g. "default.svc.cluster.local"
For instance:
"sidecar~10.1.1.0~v0.default~default.svc.cluster.local"
Types ¶
type AckingResourceMutator ¶
type AckingResourceMutator interface { // Upsert inserts or updates a resource from this set by name and increases // the set's version number atomically if the resource is actually inserted // or updated. // The completion is called back when the new upserted resources' version is // ACKed by the Envoy nodes which IDs are given in nodeIDs. // A call to the returned revert function reverts the effects of this // method call. Upsert(typeURL string, resourceName string, resource proto.Message, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc // UseCurrent inserts a completion that allows the caller to wait for the current // version of the given typeURL to be ACKed. UseCurrent(typeURL string, nodeIDs []string, wg *completion.WaitGroup) // DeleteNode frees resources held for the named node DeleteNode(nodeID string) // Delete deletes a resource from this set by name and increases the cache's // version number atomically if the resource is actually deleted. // The completion is called back when the new deleted resources' version is // ACKed by the Envoy nodes which IDs are given in nodeIDs. // A call to the returned revert function reverts the effects of this // method call. Delete(typeURL string, resourceName string, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc }
AckingResourceMutator is a variant of ResourceMutator which calls back a Completion when a resource update is ACKed by a set of Envoy nodes.
type AckingResourceMutatorRevertFunc ¶
type AckingResourceMutatorRevertFunc func(completion *completion.Completion)
AckingResourceMutatorRevertFunc is a function which reverts the effects of an update on a AckingResourceMutator. The completion is called back when the new resource update is ACKed by the Envoy nodes.
type AckingResourceMutatorWrapper ¶
type AckingResourceMutatorWrapper struct {
// contains filtered or unexported fields
}
AckingResourceMutatorWrapper is an AckingResourceMutator which wraps a ResourceMutator to notifies callers when resource updates are ACKed by nodes. AckingResourceMutatorWrapper also implements ResourceVersionAckObserver in order to be notified of ACKs from nodes.
func NewAckingResourceMutatorWrapper ¶
func NewAckingResourceMutatorWrapper(mutator ResourceMutator) *AckingResourceMutatorWrapper
NewAckingResourceMutatorWrapper creates a new AckingResourceMutatorWrapper to wrap the given ResourceMutator.
func (*AckingResourceMutatorWrapper) Delete ¶
func (m *AckingResourceMutatorWrapper) Delete(typeURL string, resourceName string, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc
func (*AckingResourceMutatorWrapper) DeleteNode ¶ added in v1.6.4
func (m *AckingResourceMutatorWrapper) DeleteNode(nodeID string)
DeleteNode frees resources held for the named nodes
func (*AckingResourceMutatorWrapper) HandleResourceVersionAck ¶
func (m *AckingResourceMutatorWrapper) HandleResourceVersionAck(ackVersion uint64, nackVersion uint64, nodeIP string, resourceNames []string, typeURL string, detail string)
'ackVersion' is the last version that was acked. 'nackVersion', if greater than 'nackVersion', is the last version that was NACKed.
func (*AckingResourceMutatorWrapper) Upsert ¶
func (m *AckingResourceMutatorWrapper) Upsert(typeURL string, resourceName string, resource proto.Message, nodeIDs []string, wg *completion.WaitGroup, callback func(error)) AckingResourceMutatorRevertFunc
func (*AckingResourceMutatorWrapper) UseCurrent ¶ added in v1.6.4
func (m *AckingResourceMutatorWrapper) UseCurrent(typeURL string, nodeIDs []string, wg *completion.WaitGroup)
UseCurrent adds a completion to the WaitGroup if the current version of the cached resource has not been acked yet, allowing the caller to wait for the ACK.
type BaseObservableResourceSource ¶
type BaseObservableResourceSource struct {
// contains filtered or unexported fields
}
BaseObservableResourceSource implements the AddResourceVersionObserver and RemoveResourceVersionObserver methods to handle the notification of new resource versions. This is meant to be used as a base to implement ObservableResourceSource.
func NewBaseObservableResourceSource ¶
func NewBaseObservableResourceSource() *BaseObservableResourceSource
NewBaseObservableResourceSource initializes the given set.
func (*BaseObservableResourceSource) AddResourceVersionObserver ¶
func (s *BaseObservableResourceSource) AddResourceVersionObserver(observer ResourceVersionObserver)
AddResourceVersionObserver registers an observer to be notified of new resource version.
func (*BaseObservableResourceSource) NotifyNewResourceVersionRLocked ¶
func (s *BaseObservableResourceSource) NotifyNewResourceVersionRLocked(typeURL string, version uint64)
NotifyNewResourceVersionRLocked notifies registered observers that a new version of the resources of the given type is available. This function MUST be called with locker's lock acquired.
func (*BaseObservableResourceSource) RemoveResourceVersionObserver ¶
func (s *BaseObservableResourceSource) RemoveResourceVersionObserver(observer ResourceVersionObserver)
RemoveResourceVersionObserver unregisters an observer that was previously registered by calling AddResourceVersionObserver.
type Cache ¶
type Cache struct { *BaseObservableResourceSource // contains filtered or unexported fields }
Cache is a key-value container which allows atomically updating entries and incrementing a version number and notifying observers if the cache is actually modified. Cache implements the ObservableResourceSet interface. This cache implementation ignores the proxy node identifiers, i.e. the same resources are available under the same names to all nodes.
func NewCache ¶
func NewCache() *Cache
NewCache creates a new, empty cache with 0 as its current version.
func (*Cache) EnsureVersion ¶
func (*Cache) GetResources ¶
type MockStream ¶
type MockStream struct {
// contains filtered or unexported fields
}
MockStream is a mock implementation of Stream used for testing.
func NewMockStream ¶
func NewMockStream(ctx context.Context, recvSize, sentSize int, recvTimeout, sentTimeout time.Duration) *MockStream
NewMockStream creates a new mock Stream for testing.
func (*MockStream) Close ¶
func (s *MockStream) Close()
Close closes the resources used by this MockStream.
func (*MockStream) Recv ¶
func (s *MockStream) Recv() (*envoy_api_v2.DiscoveryRequest, error)
func (*MockStream) RecvResponse ¶
func (s *MockStream) RecvResponse() (*envoy_api_v2.DiscoveryResponse, error)
RecvResponse receives a response that was queued by calling Send.
func (*MockStream) Send ¶
func (s *MockStream) Send(resp *envoy_api_v2.DiscoveryResponse) error
func (*MockStream) SendRequest ¶
func (s *MockStream) SendRequest(req *envoy_api_v2.DiscoveryRequest) error
SendRequest queues a request to be received by calling Recv.
type NodeToIDFunc ¶
type NodeToIDFunc func(node *envoy_api_v2_core.Node) (string, error)
NodeToIDFunc extracts a string identifier from an Envoy Node identifier.
type ObservableResourceSet ¶
type ObservableResourceSet interface { ObservableResourceSource ResourceMutator }
ObservableResourceSet is a ResourceSet that allows registering observers of new resource versions from this source.
type ObservableResourceSource ¶
type ObservableResourceSource interface { ResourceSource // AddResourceVersionObserver registers an observer of new versions of // resources from this source. AddResourceVersionObserver(listener ResourceVersionObserver) // RemoveResourceVersionObserver unregisters an observer of new versions of // resources from this source. RemoveResourceVersionObserver(listener ResourceVersionObserver) }
ObservableResourceSource is a ResourceSource that allows registering observers of new resource versions from this source.
type ProxyError ¶
ProxyError wraps the error and the detail received from the proxy in to a new type that implements the error interface.
func (*ProxyError) Error ¶
func (pe *ProxyError) Error() string
type ResourceMutator ¶
type ResourceMutator interface { // Upsert inserts or updates a resource from this set by name. // If the set is modified (the resource is actually inserted or updated), // the set's version number is incremented atomically and the returned // updated value is true. // Otherwise, the version number is not modified and the returned updated // value is false. // The returned version value is the set's version after update. // A call to the returned revert function reverts the effects of this // method call. Upsert(typeURL string, resourceName string, resource proto.Message) (version uint64, updated bool, revert ResourceMutatorRevertFunc) // Delete deletes a resource from this set by name. // If the set is modified (the resource is actually deleted), the set's // version number is incremented atomically and the returned updated value // is true. // Otherwise, the version number is not modified and the returned updated // value is false. // The returned version value is the set's version after update. // A call to the returned revert function reverts the effects of this // method call. Delete(typeURL string, resourceName string) (version uint64, updated bool, revert ResourceMutatorRevertFunc) // Clear deletes all the resources of the given type from this set. // If the set is modified (at least one resource is actually deleted), // the set's version number is incremented atomically and the returned // updated value is true. // Otherwise, the version number is not modified and the returned updated // value is false. // The returned version value is the set's version after update. // This method call cannot be reverted. Clear(typeURL string) (version uint64, updated bool) }
ResourceMutator provides write access to a versioned set of resources. A single version is associated to all the contained resources. The version is monotonically increased for any change to the set.
type ResourceMutatorRevertFunc ¶
ResourceMutatorRevertFunc is a function which reverts the effects of an update on a ResourceMutator. The returned version value is the set's version after update.
type ResourceSet ¶
type ResourceSet interface { ResourceSource ResourceMutator }
ResourceSet provides read-write access to a versioned set of resources. A single version is associated to all the contained resources. The version is monotonically increased for any change to the set.
type ResourceSource ¶
type ResourceSource interface { // GetResources returns the current version of the resources with the given // names. // If lastVersion is not nil and the resources with the given names haven't // changed since lastVersion, nil is returned. // If resourceNames is empty, all resources are returned. // Should not be blocking. GetResources(ctx context.Context, typeURL string, lastVersion uint64, nodeIP string, resourceNames []string) (*VersionedResources, error) // EnsureVersion increases this resource set's version to be at least the // given version. If the current version is already higher than the // given version, this has no effect. EnsureVersion(typeURL string, version uint64) }
ResourceSource provides read access to a versioned set of resources. A single version is associated to all the contained resources. The version is monotonically increased for any change to the set.
type ResourceTypeConfiguration ¶
type ResourceTypeConfiguration struct { // Source contains the resources of this type. Source ObservableResourceSource // AckObserver is called back whenever a node acknowledges having applied a // version of the resources of this type. AckObserver ResourceVersionAckObserver }
ResourceTypeConfiguration is the configuration of the XDS server for a resource type.
type ResourceVersionAckObserver ¶
type ResourceVersionAckObserver interface { // HandleResourceVersionAck notifies that the node with the given NodeIP // has acknowledged having applied the resources. // Calls to this function must not block. HandleResourceVersionAck(ackVersion uint64, nackVersion uint64, nodeIP string, resourceNames []string, typeURL string, detail string) }
ResourceVersionAckObserver defines the HandleResourceVersionAck method which is called whenever a node acknowledges having applied a version of the resources of a given type.
type ResourceVersionObserver ¶
type ResourceVersionObserver interface { // HandleNewResourceVersion notifies of a new version of the resources of // the given type. HandleNewResourceVersion(typeURL string, version uint64) }
ResourceVersionObserver defines the HandleNewResourceVersion method which is called whenever the version of the resources of a given type has changed.
type ResourceWatcher ¶
type ResourceWatcher struct {
// contains filtered or unexported fields
}
ResourceWatcher watches and retrieves new versions of resources from a resource set. ResourceWatcher implements ResourceVersionObserver to get notified when new resource versions are available in the set.
func NewResourceWatcher ¶
func NewResourceWatcher(typeURL string, resourceSet ResourceSource, resourceAccessTimeout time.Duration) *ResourceWatcher
NewResourceWatcher creates a new ResourceWatcher backed by the given resource set.
func (*ResourceWatcher) HandleNewResourceVersion ¶
func (w *ResourceWatcher) HandleNewResourceVersion(typeURL string, version uint64)
func (*ResourceWatcher) WatchResources ¶
func (w *ResourceWatcher) WatchResources(ctx context.Context, typeURL string, lastVersion uint64, nodeIP string, resourceNames []string, out chan<- *VersionedResources)
WatchResources watches for new versions of specific resources and sends them into the given out channel.
A call to this method blocks until a version greater than lastVersion is available. Therefore, every call must be done in a separate goroutine. A watch can be canceled by canceling the given context.
lastVersion is the last version successfully applied by the client; nil if this is the first request for resources. This method call must always close the out channel.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server implements the handling of xDS streams.
func NewServer ¶
func NewServer(resourceTypes map[string]*ResourceTypeConfiguration, resourceAccessTimeout time.Duration) *Server
NewServer creates an xDS gRPC stream handler using the given resource sources. types maps each supported resource type URL to its corresponding resource source and ACK observer.
type Stream ¶
type Stream interface { // Send sends a xDS response back to the client. Send(*envoy_api_v2.DiscoveryResponse) error // Recv receives a xDS request from the client. Recv() (*envoy_api_v2.DiscoveryRequest, error) }
Stream is the subset of the gRPC bi-directional stream types which is used by Server.
type VersionedResources ¶
type VersionedResources struct { // Version is the version of the resources. Version uint64 // ResourceNames is the list of names of resources. // May be empty. ResourceNames []string // Resources is the list of protobuf-encoded resources. // May be empty. Must be of the same length as ResourceNames. Resources []proto.Message // Canary indicates whether the client should only do a dry run of // using the resources. Canary bool }
VersionedResources is a set of protobuf-encoded resources along with their version.