cloudevents/

directory
v0.14.0
Published: Jun 18, 2024 License: Apache-2.0

README

Cloudevents Clients

This package provides cloudevents-based clients to help developers implement the Event Based ManifestWork proposal.

Generic Clients

The generic client (generic.CloudEventsClient) is used to resync/publish/subscribe resource objects between sources and agents with cloudevents.

A resource object can be any object that implements the generic.ResourceObject interface.

Building a generic client on a source

Developers can use the generic.NewCloudEventSourceClient method to build a generic client on the source. To build this client, developers need to provide:

  1. Cloudevents source options (options.CloudEventsSourceOptions), which have two parts:

    • sourceID, a unique identifier for a source. For example, a hub controller can generate a source ID by hashing the hub cluster URL and appending the controller name. Similarly, a RESTful service can select a unique name or generate a unique ID in its associated database.
    • CloudEventsOptions, which provides the cloudevents clients that send/receive cloudevents over different event protocol or driver implementations. See Supported Protocols and Drivers for more details.
  2. A resource lister (generic.Lister), which lists the resource objects on the source when resyncing resources between sources and agents. For example, a hub controller can list the resources from its resource informers, and a RESTful service can list its resources from a database.

  3. A resource status hash getter method (generic.StatusHashGetter), which calculates the resource status hash when resyncing the resource status between sources and agents.

  4. Codecs (generic.Codec), which encode a resource object into a cloudevent and decode a cloudevent into a resource object for a given cloudevent data type. Two data types are provided for ManifestWork: io.open-cluster-management.works.v1alpha1.manifests, which contains a single resource object in the cloudevent payload, and io.open-cluster-management.works.v1alpha1.manifestbundles, which contains a list of resource objects in the cloudevent payload. Both can be found in the work/payload package.

  5. Resource handler methods (generic.ResourceHandler), which handle the resource status after the client receives it from agents.

For example, the following code builds a generic client on the source using the MQTT protocol:

// build a client for source1
client, err := generic.NewCloudEventSourceClient[*CustomerResource](
	ctx,
	mqtt.NewSourceOptions(mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml"), "client1", "source1"),
	customerResourceLister,
	customerResourceStatusHashGetter,
	customerResourceCodec,
)
if err != nil {
	// TODO handle the error when building the client failed
}

// start a goroutine to receive the resource status from agents
go func() {
	if err := client.Subscribe(ctx, customerResourceHandler); err != nil {
		//TODO handle this error when subscribing the cloudevents failed
	}
}()

You may refer to the cloudevents client integration test as an example.
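
The source ID scheme described above (hashing the hub cluster URL and appending the controller name) could be sketched as follows. This is only an illustration: buildSourceID is a hypothetical helper, not part of this package, and the choice of SHA-256 and of truncating the digest to 16 hex characters are assumptions made for the example.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// buildSourceID is a hypothetical helper (not part of the sdk-go API).
// It hashes the hub cluster URL and appends the controller name, so that
// the same controller on the same hub always gets the same source ID.
func buildSourceID(hubURL, controllerName string) string {
	sum := sha256.Sum256([]byte(hubURL))
	// Keep only the first 8 bytes (16 hex characters) so the ID stays short.
	return fmt.Sprintf("%s-%s", hex.EncodeToString(sum[:8]), controllerName)
}

func main() {
	// The URL and controller name are placeholder values.
	fmt.Println(buildSourceID("https://hub.example.com:6443", "example-controller"))
}
```

The resulting string can then be passed wherever a source ID is expected, for example as the last argument of mqtt.NewSourceOptions in the snippet above.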

Building a generic client on a managed cluster

Developers can use the generic.NewCloudEventAgentClient method to build a generic client on a managed cluster. To build this client, developers need to provide:

  1. Cloudevents agent options (options.CloudEventsAgentOptions), which have three parts:

    • agentID, a unique identifier for an agent. For example, it can consist of a managed cluster name and an agent name.
    • clusterName, the name of the managed cluster on which the agent runs.
    • CloudEventsOptions, which provides the cloudevents clients that send/receive cloudevents over different event protocol or driver implementations. See Supported Protocols and Drivers for more details.
  2. A resource lister (generic.Lister), which lists the resource objects on a managed cluster when resyncing resources between sources and agents. For example, a work agent can list its works from its work informers.

  3. A resource status hash getter method (generic.StatusHashGetter), which calculates the resource status hash when resyncing the resource status between sources and agents.

  4. Codecs (generic.Codec), which encode a resource object into a cloudevent and decode a cloudevent into a resource object for a given cloudevent data type. Two data types are provided for ManifestWork: io.open-cluster-management.works.v1alpha1.manifests, which contains a single resource object in the cloudevent payload, and io.open-cluster-management.works.v1alpha1.manifestbundles, which contains a list of resource objects in the cloudevent payload. Both can be found in the work/payload package.

  5. Resource handler methods (generic.ResourceHandler), which handle the resources after the client receives them from sources.

For example, the following code builds a generic client on a managed cluster using the MQTT protocol:

// build a client for a work agent on cluster1
client, err := generic.NewCloudEventAgentClient[*CustomerResource](
	ctx,
	mqtt.NewAgentOptions(mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml"), "cluster1", "cluster1-work-agent"),
	&ManifestWorkLister{},
	ManifestWorkStatusHash,
	&ManifestBundleCodec{},
)
if err != nil {
	// TODO handle the error when building the client failed
}

// start a goroutine to receive the resources from sources
go func() {
	if err := client.Subscribe(ctx, NewManifestWorkAgentHandler()); err != nil {
		//TODO handle this error when subscribing the cloudevents failed
	}
}()
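
The idea behind a resource status hash getter can be sketched with the standard library alone. The snippet below is a minimal illustration, not the signature of generic.StatusHashGetter: resourceStatus and statusHash are hypothetical names, and hashing the JSON encoding with SHA-256 is an assumption chosen for the example.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// resourceStatus is a hypothetical status type used only for illustration.
type resourceStatus struct {
	Phase   string `json:"phase"`
	Message string `json:"message,omitempty"`
}

// statusHash returns a hex-encoded SHA-256 digest of the JSON-encoded status.
// Two statuses with equal field values yield the same hash, so comparing
// hashes is enough to tell whether a status changed between resyncs.
func statusHash(status resourceStatus) (string, error) {
	data, err := json.Marshal(status)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	hash, err := statusHash(resourceStatus{Phase: "Applied"})
	if err != nil {
		panic(err)
	}
	fmt.Println(hash)
}
```

Comparing hashes rather than full status objects keeps the resync exchange small, which is presumably why the client asks for a hash getter instead of the status itself.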

Supported Protocols and Drivers

Currently, the CloudEvents options support the following protocols/drivers:

  • MQTT Protocol/Driver
  • gRPC Protocol/Driver
  • Kafka Protocol/Driver

To create CloudEvents source/agent options for these supported protocols/drivers, developers need to provide configuration specific to the protocol/driver. The configuration format resembles the kubeconfig for the Kubernetes client-go but has a different schema.

MQTT Protocol/Driver

Below is an example of a YAML configuration for the MQTT protocol:

broker: broker.example.com:1883
username: maestro
password: password
topics:
  sourceEvents: sources/maestro/consumers/+/sourceevents
  agentEvents: $share/statussubscribers/sources/maestro/consumers/+/agentevents

For detailed configuration options for the MQTT driver, refer to the MQTT driver options package.
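
The topics in the example above use two standard MQTT features: the single-level wildcard `+` and the shared subscription prefix `$share/<group>/`. The sketch below shows how such a filter matches a concrete topic. It is a simplified illustration of MQTT matching rules, not code from the MQTT driver; the function names and the consumer name cluster1 are assumptions, and the multi-level wildcard `#` is deliberately not handled.

```go
package main

import (
	"fmt"
	"strings"
)

// stripSharePrefix removes a leading "$share/<group>/" from a shared
// subscription filter, leaving the plain topic filter.
func stripSharePrefix(filter string) string {
	if !strings.HasPrefix(filter, "$share/") {
		return filter
	}
	parts := strings.SplitN(filter, "/", 3)
	if len(parts) < 3 {
		return filter
	}
	return parts[2]
}

// matches reports whether a concrete topic matches a topic filter.
// Only the single-level wildcard "+" is supported in this sketch.
func matches(filter, topic string) bool {
	filterLevels := strings.Split(stripSharePrefix(filter), "/")
	topicLevels := strings.Split(topic, "/")
	if len(filterLevels) != len(topicLevels) {
		return false
	}
	for i, level := range filterLevels {
		if level != "+" && level != topicLevels[i] {
			return false
		}
	}
	return true
}

func main() {
	sourceEvents := "sources/maestro/consumers/+/sourceevents"
	agentEvents := "$share/statussubscribers/sources/maestro/consumers/+/agentevents"

	fmt.Println(matches(sourceEvents, "sources/maestro/consumers/cluster1/sourceevents"))
	fmt.Println(matches(agentEvents, "sources/maestro/consumers/cluster1/agentevents"))
	fmt.Println(matches(sourceEvents, "sources/maestro/consumers/cluster1/agentevents"))
}
```

With a shared subscription, the broker distributes matching messages across the subscribers in the same group (statussubscribers here) instead of delivering a copy to each of them.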

gRPC Protocol/Driver

Here's an example of a YAML configuration for the gRPC protocol:

url: grpc.example.com:8443
caFile: /certs/ca.crt
clientCertFile: /certs/client.crt
clientKeyFile: /certs/client.key

For detailed configuration options for the gRPC driver, refer to the gRPC driver options package.
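
The caFile, clientCertFile and clientKeyFile fields suggest mutual TLS. As a rough sketch of what such settings usually translate to in Go, the snippet below builds a tls.Config from the three files using only the standard library. It is not the gRPC driver's own code: loadClientTLSConfig is a hypothetical helper, and the minimum TLS version is an assumption.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// loadClientTLSConfig is a hypothetical helper that builds a client-side
// TLS configuration for mutual TLS from PEM files on disk.
func loadClientTLSConfig(caFile, certFile, keyFile string) (*tls.Config, error) {
	// The CA bundle is used to verify the server certificate.
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA file: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid certificates found in CA file")
	}

	// The client certificate and key are presented to the server.
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}

	return &tls.Config{
		RootCAs:      pool,
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12, // assumption for this sketch
	}, nil
}

func main() {
	// The paths mirror the YAML example above.
	if _, err := loadClientTLSConfig("/certs/ca.crt", "/certs/client.crt", "/certs/client.key"); err != nil {
		fmt.Println("could not load TLS material:", err)
	}
}
```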

Kafka Protocol/Driver

The Kafka protocol/driver is not enabled by default. To use Kafka, build with the -tags=kafka flag.

Here's an example of a YAML configuration for the Kafka protocol:

bootstrapServer: kafka.example.com:9092
caFile: /certs/ca.crt
clientCertFile: /certs/client.crt
clientKeyFile: /certs/client.key

For detailed configuration options for the Kafka driver, refer to the Kafka driver options package.

Work Clients

We have provided a builder to build the ManifestWork client (ManifestWorkInterface) and informer (ManifestWorkInformer) based on the generic client.

Building work client for work controllers on the hub cluster

Developers can use the builder to build the ManifestWork client and informer with the source ID on the hub cluster.

sourceID := "example-controller"
// Building the clients based on cloudevents with MQTT
config := mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml")

clientHolder, err := work.NewClientHolderBuilder(config).
	WithClientID(fmt.Sprintf("%s-client", sourceID)).
	WithSourceID(sourceID).
	WithCodecs(codec.NewManifestBundleCodec()).
	NewSourceClientHolder(ctx)
if err != nil {
	return err
}

// clusterName is the name of the managed cluster whose ManifestWorks are accessed
manifestWorkClient := clientHolder.ManifestWorks(clusterName)
manifestWorkInformer := clientHolder.ManifestWorkInformer()

// Building controllers with ManifestWork client and informer ...

// Start the ManifestWork informer
go manifestWorkInformer.Informer().Run(ctx.Done())

Building work client for work agent on the managed cluster

Developers can use the builder to build the ManifestWork client and informer with the cluster name on the managed cluster.

clusterName := "cluster1"
// Building the clients based on cloudevents with MQTT
config := mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml")

clientHolder, err := work.NewClientHolderBuilder(config).
	WithClientID(fmt.Sprintf("%s-work-agent", clusterName)).
	WithClusterName(clusterName).
	// Supports two event data types for ManifestWork
	WithCodecs(codec.NewManifestBundleCodec(), codec.NewManifestCodec(restMapper)).
	NewAgentClientHolder(ctx)
if err != nil {
	return err
}

manifestWorkClient := clientHolder.ManifestWorks(clusterName)
manifestWorkInformer := clientHolder.ManifestWorkInformer()

// Building controllers with ManifestWork client and informer ...

// Start the ManifestWork informer
go manifestWorkInformer.Informer().Run(ctx.Done())

Building garbage collector for work controllers on the hub cluster

The garbage collector is an optional component running alongside the ManifestWork source client. It is used to clean up ManifestWork resources owned by other resources. For example, in the addon-framework, ManifestWork resources, created by addon controllers, have owner references pointing to ManagedClusterAddon resources, and when the owner resources are deleted, the ManifestWork resources should be deleted as well.

Developers need to provide the following to build and run the garbage collector:

  1. Client Holder (work.ClientHolder): This contains the ManifestWork client and informer, which can be built using the builder described in the previous two sections.
  2. Metadata Client (metadata.Interface): This is used to retrieve the owner resources of the ManifestWork resources.
  3. Owner resource filters map (map[schema.GroupVersionResource]*metav1.ListOptions): This is used to filter the owner resources of the ManifestWork resources.

listOptions := &metav1.ListOptions{
    FieldSelector: fmt.Sprintf("metadata.name=%s", addonName),
}
ownerGVRFilters := map[schema.GroupVersionResource]*metav1.ListOptions{
    // ManagedClusterAddon is the owner of the ManifestWork resources filtered by the addon name
    addonv1alpha1.SchemeGroupVersion.WithResource("managedclusteraddons"): listOptions,
}
// Initialize the garbage collector
garbageCollector := garbagecollector.NewGarbageCollector(clientHolder, metadataClient, ownerGVRFilters)
// Run the garbage collector with 1 worker to handle the garbage collection
go garbageCollector.Run(ctx, 1)
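
The owner-based cleanup described above can be pictured with a small standalone sketch. It is not the implementation in the garbagecollector package: the types and the orphanedWorks function are hypothetical, and the rule that a work is collected only when none of its owners still exists is an assumption borrowed from the usual Kubernetes owner reference semantics.

```go
package main

import "fmt"

// ownerRef and work are hypothetical, simplified stand-ins for a Kubernetes
// owner reference and a ManifestWork.
type ownerRef struct {
	Kind string
	Name string
	UID  string
}

type work struct {
	Name   string
	Owners []ownerRef
}

// orphanedWorks returns the names of works whose owners have all been
// deleted. Works without any owner reference are never collected.
func orphanedWorks(works []work, liveOwnerUIDs map[string]bool) []string {
	var orphaned []string
	for _, w := range works {
		if len(w.Owners) == 0 {
			continue
		}
		ownerAlive := false
		for _, owner := range w.Owners {
			if liveOwnerUIDs[owner.UID] {
				ownerAlive = true
				break
			}
		}
		if !ownerAlive {
			orphaned = append(orphaned, w.Name)
		}
	}
	return orphaned
}

func main() {
	works := []work{
		{Name: "addon-a-deploy", Owners: []ownerRef{{Kind: "ManagedClusterAddon", Name: "addon-a", UID: "uid-a"}}},
		{Name: "addon-b-deploy", Owners: []ownerRef{{Kind: "ManagedClusterAddon", Name: "addon-b", UID: "uid-b"}}},
	}
	// Only addon-a still exists, so only the work owned by addon-b is orphaned.
	fmt.Println(orphanedWorks(works, map[string]bool{"uid-a": true}))
}
```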

Directories

Path Synopsis
options/kafka
This is the dummy code to pass the compile when Kafka is not enabled.