adsc

package
v0.0.0-...-83b5d90 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: Apache-2.0 Imports: 52 Imported by: 10

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigInitialRequests

func ConfigInitialRequests() []*discovery.DiscoveryRequest

Types

type ADSC

type ADSC struct {

	// Updates includes the type of the last update received from the server.
	Updates chan string

	XDSUpdates  chan *discovery.DiscoveryResponse
	VersionInfo map[string]string

	// Last received message, by type
	Received map[string]*discovery.DiscoveryResponse

	Mesh *v1alpha1.MeshConfig

	// Retrieved configurations can be stored using the common istio model interface.
	Store model.ConfigStore

	// Retrieved endpoints can be stored in the memory registry. This is used for CDS and EDS responses.
	Registry *memory.ServiceDiscovery
	// contains filtered or unexported fields
}

ADSC implements a basic client for ADS, for use in stress tests and tools or libraries that need to connect to Istio pilot or other ADS servers.

func New

func New(discoveryAddr string, opts *ADSConfig) (*ADSC, error)

New creates a new ADSC, maintaining a connection to an XDS server. Will: - get certificate using the Secret provider, if CertRequired - connect to the XDS server specified in ProxyConfig - send initial request for watched resources - wait for response from XDS server - on success, start a background thread to maintain the connection, with exp. backoff.

func NewWithBackoffPolicy

func NewWithBackoffPolicy(discoveryAddr string, opts *ADSConfig, backoffPolicy backoff.BackOff) (*ADSC, error)

func (*ADSC) Close

func (a *ADSC) Close()

Close the stream.

func (*ADSC) Dial

func (a *ADSC) Dial() error

Dial connects to a ADS server, with optional MTLS authentication if a cert dir is specified.

func (*ADSC) EndpointsJSON

func (a *ADSC) EndpointsJSON() string

EndpointsJSON returns the endpoints, formatted as JSON, for debugging.

func (*ADSC) GetClusters

func (a *ADSC) GetClusters() map[string]*cluster.Cluster

GetClusters returns all the non-eds type clusters.

func (*ADSC) GetEdsClusters

func (a *ADSC) GetEdsClusters() map[string]*cluster.Cluster

GetEdsClusters returns all the eds type clusters.

func (*ADSC) GetEndpoints

func (a *ADSC) GetEndpoints() map[string]*endpoint.ClusterLoadAssignment

GetEndpoints returns all the routes.

func (*ADSC) GetHTTPListeners

func (a *ADSC) GetHTTPListeners() map[string]*listener.Listener

GetHTTPListeners returns all the http listeners.

func (*ADSC) GetRoutes

func (a *ADSC) GetRoutes() map[string]*route.RouteConfiguration

GetRoutes returns all the routes.

func (*ADSC) GetTCPListeners

func (a *ADSC) GetTCPListeners() map[string]*listener.Listener

GetTCPListeners returns all the tcp listeners.

func (*ADSC) HasSynced

func (a *ADSC) HasSynced() bool

HasSynced returns true if MCP configs have synced

func (*ADSC) Run

func (a *ADSC) Run() error

Run will create a new stream using the existing grpc client connection and send the initial xds requests. And then it will run a go routine receiving and handling xds response. Note: it is non blocking

func (*ADSC) Save

func (a *ADSC) Save(base string) error

Save will save the json configs to files, using the base directory

func (*ADSC) Send

func (a *ADSC) Send(req *discovery.DiscoveryRequest) error

Raw send of a request.

func (*ADSC) Wait

func (a *ADSC) Wait(to time.Duration, updates ...string) ([]string, error)

Wait for an updates for all the specified types If updates is empty, this will wait for any update

func (*ADSC) WaitClear

func (a *ADSC) WaitClear()

WaitClear will clear the waiting events, so next call to Wait will get the next push type.

func (*ADSC) WaitSingle

func (a *ADSC) WaitSingle(to time.Duration, want string, reject string) error

WaitSingle waits for a single resource, and fails if the rejected type is returned. We avoid rejecting all other types to avoid race conditions. For example, a test asserting an incremental update of EDS may fail if a previous push's RDS response comes in later. Instead, we can reject events coming before (ie CDS). The only real alternative is to wait which introduces its own issues.

func (*ADSC) WaitVersion

func (a *ADSC) WaitVersion(to time.Duration, typeURL, lastVersion string) (*discovery.DiscoveryResponse, error)

WaitVersion waits for a new or updated for a typeURL.

type ADSConfig

type ADSConfig struct {
	Config

	// InitialDiscoveryRequests is a list of resources to watch at first, represented as URLs (for new XDS resource naming)
	// or type URLs.
	InitialDiscoveryRequests []*discovery.DiscoveryRequest

	// ResponseHandler will be called on each DiscoveryResponse.
	// TODO: mirror Generator, allow adding handler per type
	ResponseHandler ResponseHandler
}

ADSConfig for the ADS connection.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a stateful ADS (Aggregated Discovery Service) client designed to handle delta updates from an xDS server. Central to this client is a dynamic 'tree' of resources, representing the relationships and states of resources in the service mesh. The client's operation unfolds in the following steps:

  1. Sending Initial Requests: The client initiates requests for resources it needs, as specified by the Watch function. This step sets the stage for receiving relevant DeltaDiscoveryResponse from the server.

  2. Processing DeltaDiscoveryResponses: Upon receiving a delta response, the client performs several key actions: - Event Handling: Triggers specific handlers for each resource, as register using Register function during client initialization. - Tree Update: Modifies its 'tree' to reflect changes in resources, such as adding new resources, updating relationships between parents and children, and removing or unlinking resources.

  3. State Synchronization: Post-processing the delta response, the client updates its internal state. This involves: - Acknowledgements and Errors: Communicating acknowledgements or errors back to the server based on the processing outcome. In cases of error or rejection, a Nack can be sent using HandlerContext.Reject. - Dependency Updates: Triggering requests for dependent resources. These dependencies are established via HandlerContext.RegisterDependency.

An example of a handler registration is as follows:

clusterHandler := Register(func(ctx HandlerContext, res *cluster.Cluster, event Event) {
  if event == EventDelete {
    return
  }
  ctx.RegisterDependency(v3.SecretType, ExtractClusterSecretResources(t, res)...)
  ctx.RegisterDependency(v3.EndpointType, ExtractEdsClusterNames([]*cluster.Cluster{res})...)
})

It means that when a cluster is added or updated, the client will trigger requests for the secrets and endpoints that the cluster depends on.

An example of register handlers:

handlers := []Option{
  clusterHandler,
  Watch[*cluster.Cluster]("*"),
  listenerHandler,
  Watch[*listener.Listener]("*"),
  endpointsHandler,
  routesHandler,
  secretsHandler,
}

client := NewDelta("localhost:8080", handlers...)

It means that the client will watch all clusters and listeners, and trigger resource events for clusters, listeners, endpoints, routes and secrets that the clusters and listeners depend on.

func NewDelta

func NewDelta(discoveryAddr string, config *DeltaADSConfig, opts ...Option) *Client

func NewDeltaWithBackoffPolicy

func NewDeltaWithBackoffPolicy(discoveryAddr string, config *DeltaADSConfig, backoffPolicy backoff.BackOff, opts ...Option) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) Run

func (c *Client) Run(ctx context.Context)

Run starts the client. If a backoffPolicy is configured, it will reconnect on error.

func (*Client) Synced

func (c *Client) Synced() <-chan struct{}

Synced returns a channel that will close once the client has received all initial watches

type Config

type Config struct {
	// Is the name of the client for user-facing logs. If not set, Address will be used
	ClientName string

	// Address of the xDS server
	Address string

	// XDSSAN is the expected SAN of the XDS server. If not set, the ProxyConfig.DiscoveryAddress is used.
	XDSSAN string

	// Namespace defaults to 'default'
	Namespace string

	// Workload defaults to 'test'
	Workload string

	// Revision for this control plane instance. We will only read configs that match this revision.
	Revision string

	// Meta includes additional metadata for the node
	Meta *pstruct.Struct

	Locality *core.Locality

	// NodeType defaults to sidecar. "ingress" and "router" are also supported.
	NodeType model.NodeType

	// IP is currently the primary key used to locate inbound configs. It is sent by client,
	// must match a known endpoint IP. Tests can use a ServiceEntry to register fake IPs.
	IP string

	// CertDir is the directory where mTLS certs are configured.
	// If CertDir and Secret are empty, an insecure connection will be used.
	// TODO: implement SecretManager for cert dir
	CertDir string

	// Secrets is the interface used for getting keys and rootCA.
	SecretManager security.SecretManager

	// XDSRootCAFile explicitly set the root CA to be used for the XDS connection.
	// Mirrors Envoy file.
	XDSRootCAFile string

	// InsecureSkipVerify skips client verification the server's certificate chain and host name.
	InsecureSkipVerify bool

	// BackoffPolicy determines the reconnect policy. Based on MCP client.
	BackoffPolicy backoff.BackOff

	GrpcOpts []grpc.DialOption
}

type DeltaADSConfig

type DeltaADSConfig struct {
	Config
}

DeltaADSConfig for delta ADS connection.

type DeltaAggregatedResourcesClient

type DeltaAggregatedResourcesClient interface {
	Send(*discovery.DeltaDiscoveryRequest) error
	Recv() (*discovery.DeltaDiscoveryResponse, error)
	CloseSend() error
}

type Event

type Event int

Event represents a registry update event

const (
	// EventAdd is sent when an object is added
	EventAdd Event = iota

	// EventDelete is sent when an object is deleted
	// Captures the object at the last known state
	EventDelete
)

func (Event) String

func (event Event) String() string

type HandlerContext

type HandlerContext interface {
	RegisterDependency(typeURL string, resourceName ...string)
	Reject(reason error)
}

type HandlerFunc

type HandlerFunc func(ctx HandlerContext, res *Resource, event Event)

type Option

type Option func(c *Client)

func Register

func Register[T proto.Message](f func(ctx HandlerContext, resourceName string, resourceVersion string, resourceEntity T, event Event)) Option

Register registers a handler for a type which is reflected by the proto message.

func Watch

func Watch[T proto.Message](resourceName string) Option

Watch registers an initial watch for a type based on the type reflected by the proto message.

type Resource

type Resource struct {
	Name    string
	Version string
	Entity  proto.Message
}

type ResponseHandler

type ResponseHandler interface {
	HandleResponse(con *ADSC, response *discovery.DiscoveryResponse)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL