# CloudEvents Clients
We have implemented the CloudEvents-based clients in this package to help developers easily implement the Event Based ManifestWork proposal.
## Generic Clients
The generic client (`generic.CloudEventsClient`) is used to resync, publish, and subscribe to resource objects between sources and agents with cloudevents. A resource object can be any object that implements the `generic.ResourceObject` interface.
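For example, a resource object might look like the following. This is a minimal sketch: the `CustomerResource` type used throughout the examples below is hypothetical, and the method set shown here (UID, resource version, and deletion timestamp accessors) should be verified against the `generic.ResourceObject` interface definition in this package.

```go
import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    kubetypes "k8s.io/apimachinery/pkg/types"
)

// CustomerResource is a hypothetical resource object used in the examples below.
type CustomerResource struct {
    UID               kubetypes.UID
    ResourceVersion   string
    DeletionTimestamp *metav1.Time
    Spec              string
    Status            string
}

// GetUID returns the unique identifier of this resource.
func (r *CustomerResource) GetUID() kubetypes.UID { return r.UID }

// GetResourceVersion returns the resource version, which the client uses to
// detect spec changes when resyncing.
func (r *CustomerResource) GetResourceVersion() string { return r.ResourceVersion }

// GetDeletionTimestamp marks this resource as deleting when it is non-nil.
func (r *CustomerResource) GetDeletionTimestamp() *metav1.Time { return r.DeletionTimestamp }
```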
### Building a generic client on a source
Developers can use the `generic.NewCloudEventSourceClient` method to build a generic client on the source. To build this client, developers need to provide:
- The cloudevents source options (`options.CloudEventsSourceOptions`). These options have two parts:
    - `sourceID`, a unique identifier for a source. For example, it can be generated by hashing the hub cluster URL and appending the controller name. Similarly, a RESTful service can select a unique name or generate a unique ID in its associated database for source identification.
    - `CloudEventsOptions`, which provides cloudevents clients to send/receive cloudevents based on different event protocol or driver implementations. Check the Supported Protocols and Drivers section for more details.
- A resource lister (`generic.Lister`). It is used to list the resource objects on the source when resyncing the resources between sources and agents. For example, a hub controller can list the resources from its resource informers, and a RESTful service can list its resources from a database. (A sketch of a lister and status hash getter follows this list.)
- A resource status hash getter method (`generic.StatusHashGetter`). This method is used to calculate the resource status hash when resyncing the resource status between sources and agents.
- Codecs (`generic.Codec`). They are used to encode a resource object into a cloudevent and decode a cloudevent into a resource object with a given cloudevent data type. We have provided two data types for `ManifestWork`: `io.open-cluster-management.works.v1alpha1.manifests`, which contains a single resource object in the cloudevent payload, and `io.open-cluster-management.works.v1alpha1.manifestbundles`, which contains a list of resource objects in the cloudevent payload. They can be found in the `work/payload` package. Refer to this doc to find the event schema and examples.
- Resource handler methods (`generic.ResourceHandler`). They are used to handle the resource status after the client receives it from agents.
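Before wiring these pieces together, here is a hedged sketch of the lister and status hash getter that the example below refers to. The `CustomerResource` type is the hypothetical one from above; verify the exact `generic.Lister` and `generic.StatusHashGetter` signatures against the generic package.

```go
import (
    "crypto/sha256"
    "fmt"

    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

// customerResourceLister is a hypothetical generic.Lister implementation that
// serves resources from an in-memory map; a real source would list from its
// informers or database.
type customerResourceLister struct {
    resources map[string]*CustomerResource
}

// List returns all resources matching the list options (ignored in this sketch).
func (l *customerResourceLister) List(opts types.ListOptions) ([]*CustomerResource, error) {
    results := []*CustomerResource{}
    for _, r := range l.resources {
        results = append(results, r)
    }
    return results, nil
}

// customerResourceStatusHashGetter hashes the resource status so the client
// can detect status changes during a status resync.
func customerResourceStatusHashGetter(r *CustomerResource) (string, error) {
    return fmt.Sprintf("%x", sha256.Sum256([]byte(r.Status))), nil
}
```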
For example, build a generic client on the source using the MQTT protocol with the following code:
```go
// build a client for the source1
client, err := generic.NewCloudEventSourceClient[*CustomerResource](
    ctx,
    mqtt.NewSourceOptions(mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml"), "client1", "source1"),
    customerResourceLister,
    customerResourceStatusHashGetter,
    customerResourceCodec,
)

// subscribe to a broker to receive the resources status from agents
client.Subscribe(ctx, customerResourceHandler)

// start a goroutine to receive the client reconnect signal
go func() {
    for {
        select {
        case <-ctx.Done():
            return
        case <-client.ReconnectedChan():
            // handle the cloudevents reconnect
        }
    }
}()
```
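Once subscribed, the source can push resource specs to agents with `Publish`. A minimal sketch, assuming a custom cloudevent data type for `CustomerResource` and an illustrative action name; check the generic types package for the exact `CloudEventsType` fields and action conventions:

```go
// a hypothetical cloudevent data type for CustomerResource
customerResourceDataType := types.CloudEventsDataType{
    Group:    "example.io",
    Version:  "v1alpha1",
    Resource: "customerresources",
}

// publish the resource spec from the source to the agents; "create_request"
// is an illustrative action name, not a constant defined by this package.
if err := client.Publish(ctx, types.CloudEventsType{
    CloudEventsDataType: customerResourceDataType,
    SubResource:         types.SubResourceSpec,
    Action:              "create_request",
}, resource); err != nil {
    return err
}
```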
### Building a generic client on a managed cluster
Developers can use the `generic.NewCloudEventAgentClient` method to build a generic client on a managed cluster. To build this client, developers need to provide:
- The cloudevents agent options (`options.CloudEventsAgentOptions`). These options have three parts:
    - `agentID`, a unique identifier for an agent. For example, it can consist of a managed cluster name and an agent name.
    - `clusterName`, the name of the managed cluster on which the agent runs.
    - `CloudEventsOptions`, which provides cloudevents clients to send/receive cloudevents based on different event protocol or driver implementations. Check the Supported Protocols and Drivers section for more details.
- A resource lister (`generic.Lister`). It is used to list the resource objects on a managed cluster when resyncing the resources between sources and agents. For example, a work agent can list its works from its work informers.
- A resource status hash getter method (`generic.StatusHashGetter`). This method is used to calculate the resource status hash when resyncing the resource status between sources and agents.
- Codecs (`generic.Codec`). They are used to encode a resource object into a cloudevent and decode a cloudevent into a resource object with a given cloudevent data type. We have provided two data types for `ManifestWork`: `io.open-cluster-management.works.v1alpha1.manifests`, which contains a single resource object in the cloudevent payload, and `io.open-cluster-management.works.v1alpha1.manifestbundles`, which contains a list of resource objects in the cloudevent payload. They can be found in the `work/payload` package. Refer to this doc to find the event schema and examples.
- Resource handler methods (`generic.ResourceHandler`). They are used to handle the resources after the client receives them from sources.
For example, build a generic client on a managed cluster using the MQTT protocol with the following code:
```go
// build a client for a work agent on the cluster1
client, err := generic.NewCloudEventAgentClient[*CustomerResource](
    ctx,
    mqtt.NewAgentOptions(mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml"), "cluster1", "cluster1-work-agent"),
    &ManifestWorkLister{},
    ManifestWorkStatusHash,
    &ManifestBundleCodec{},
)

// subscribe to a broker to receive the resources from sources
client.Subscribe(ctx, NewManifestWorkAgentHandler())

// start a goroutine to receive the client reconnect signal
go func() {
    for {
        select {
        case <-ctx.Done():
            return
        case <-client.ReconnectedChan():
            // handle the cloudevents reconnect
        }
    }
}()
```
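On the agent side, status updates flow back to the source through the same `Publish` method. A minimal sketch, assuming the manifest-bundle data type from the `work/payload` package and an illustrative action name:

```go
// publish the resource status from the agent back to the source;
// "update_request" is an illustrative action name. The data type must match
// one of the codecs the client was built with.
if err := client.Publish(ctx, types.CloudEventsType{
    CloudEventsDataType: payload.ManifestBundleEventDataType,
    SubResource:         types.SubResourceStatus,
    Action:              "update_request",
}, resource); err != nil {
    return err
}
```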
## Supported Protocols and Drivers
Currently, the CloudEvents options support the following protocols/drivers: MQTT, gRPC, and Kafka. To create CloudEvents source/agent options for these supported protocols/drivers, developers need to provide configuration specific to the protocol/driver. The configuration format resembles the kubeconfig for the Kubernetes client-go but has a different schema.
### MQTT Protocol/Driver
Below is an example of a YAML configuration for the MQTT protocol:
```yaml
broker: broker.example.com:1883
username: maestro
password: password
topics:
  sourceEvents: sources/maestro/consumers/+/sourceevents
  agentEvents: $share/statussubscribers/sources/maestro/consumers/+/agentevents
```
For detailed configuration options for the MQTT driver, refer to the MQTT driver options package.
### gRPC Protocol/Driver
Here's an example of a YAML configuration for the gRPC protocol:
```yaml
url: grpc.example.com:8443
caFile: /certs/ca.crt
clientCertFile: /certs/client.crt
clientKeyFile: /certs/client.key
```
For detailed configuration options for the gRPC driver, refer to the gRPC driver options package.
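A hedged sketch of building client options from this configuration, assuming the gRPC options package exposes helpers analogous to the MQTT ones shown earlier; the function names here are assumptions, so check the gRPC driver options package for the actual names and signatures:

```go
// load the gRPC driver configuration and build cloudevents agent/source
// options; the helper names mirror the MQTT ones above and are assumptions,
// with error handling elided as in the other examples.
grpcConfig := grpc.BuildGRPCOptionsFromFlags("path/to/grpc-config.yaml")
agentOptions := grpc.NewAgentOptions(grpcConfig, "cluster1", "cluster1-work-agent")
sourceOptions := grpc.NewSourceOptions(grpcConfig, "source1")
```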
### Kafka Protocol/Driver
The Kafka protocol/driver is not enabled by default. To enable it, add the `-tags=kafka` flag to the build, for example `go build -tags=kafka ./...`.
Here’s a sample configuration for Kafka in YAML:
```yaml
bootstrapServer: kafka.example.com:9092
caFile: /certs/ca.crt
clientCertFile: /certs/client.crt
clientKeyFile: /certs/client.key
```
For detailed configuration options for the Kafka driver, refer to the Kafka driver options package. You can also add advanced configurations supported by librdkafka. For instance, to set the maximum Kafka protocol request message size to 200,000 bytes, use the following configuration in YAML:
```yaml
bootstrapServer: kafka.example.com:9092
caFile: /certs/ca.crt
clientCertFile: /certs/client.crt
clientKeyFile: /certs/client.key
message.copy.max.bytes: 200000
```
For the complete list of supported configurations, refer to the librdkafka documentation.
## Work Clients
### Building a ManifestWorkSourceClient on the hub cluster with the SourceLocalWatcherStore
```go
sourceID := "example-controller"

// Build the clients based on cloudevents with MQTT
config := mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml")

// Define a function to list works for initializing the store
listLocalWorksFunc := func(ctx context.Context) (works []*workv1.ManifestWork, err error) {
    // list the works ...
    return works, err
}

// Create a SourceLocalWatcherStore
watcherStore, err := workstore.NewSourceLocalWatcherStore(ctx, listLocalWorksFunc)
if err != nil {
    return err
}

clientHolder, err := work.NewClientHolderBuilder(config).
    WithClientID(fmt.Sprintf("%s-client", sourceID)).
    WithSourceID(sourceID).
    WithCodecs(codec.NewManifestBundleCodec()).
    WithWorkClientWatcherStore(watcherStore).
    NewSourceClientHolder(ctx)
if err != nil {
    return err
}

manifestWorkClient := clientHolder.ManifestWorks(metav1.NamespaceAll)

// Use the manifestWorkClient to create/update/delete/watch manifestworks ...
```
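The returned client behaves like a typed `ManifestWork` client. A minimal usage sketch, assuming a prepared `*workv1.ManifestWork` named `work` destined for the cluster1 namespace:

```go
// create a work in the cluster1 namespace; agents consuming source events
// for cluster1 will receive it. The work object is assumed prepared above.
_, err = clientHolder.ManifestWorks("cluster1").Create(ctx, work, metav1.CreateOptions{})
if err != nil {
    return err
}

// watch across all namespaces to observe status updates fed back from agents
watcher, err := manifestWorkClient.Watch(ctx, metav1.ListOptions{})
if err != nil {
    return err
}
go func() {
    for event := range watcher.ResultChan() {
        // handle added/modified/deleted events ...
        _ = event
    }
}()
```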
### Building a ManifestWorkSourceClient on the hub cluster with the SourceInformerWatcherStore

```go
sourceID := "example-controller"

// Build the clients based on cloudevents with MQTT
config := mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml")

// Create a SourceInformerWatcherStore
watcherStore := workstore.NewSourceInformerWatcherStore(ctx)

clientHolder, err := work.NewClientHolderBuilder(config).
    WithClientID(fmt.Sprintf("%s-%s", sourceID, rand.String(5))).
    WithSourceID(sourceID).
    WithCodecs(codec.NewManifestBundleCodec()).
    WithWorkClientWatcherStore(watcherStore).
    NewSourceClientHolder(ctx)
if err != nil {
    return nil, nil, err
}

factory := workinformers.NewSharedInformerFactoryWithOptions(clientHolder.WorkInterface(), 5*time.Minute)
informer := factory.Work().V1().ManifestWorks()

// Use the informer's store as the SourceInformerWatcherStore's store
watcherStore.SetStore(informer.Informer().GetStore())

// Build controllers with the ManifestWork informer ...

// Start the ManifestWork informer
go informer.Informer().Run(ctx.Done())

// Use clientHolder.ManifestWorks(namespace) to create/update/delete manifestworks ...
```
### Building a ManifestWorkAgentClient on the managed cluster

```go
clusterName := "cluster1"

// Build the clients based on cloudevents with MQTT
config := mqtt.BuildMQTTOptionsFromFlags("path/to/mqtt-config.yaml")

// Create an AgentInformerWatcherStore
watcherStore := store.NewAgentInformerWatcherStore()

clientHolder, err := work.NewClientHolderBuilder(config).
    WithClientID(fmt.Sprintf("%s-work-agent", clusterName)).
    WithClusterName(clusterName).
    // Supports two event data types for ManifestWork
    WithCodecs(codec.NewManifestBundleCodec(), codec.NewManifestCodec(restMapper)).
    WithWorkClientWatcherStore(watcherStore).
    NewAgentClientHolder(ctx)
if err != nil {
    return err
}

manifestWorkClient := clientHolder.ManifestWorks(clusterName)

factory := workinformers.NewSharedInformerFactoryWithOptions(
    clientHolder.WorkInterface(),
    5*time.Minute,
    workinformers.WithNamespace(clusterName),
)
informer := factory.Work().V1().ManifestWorks()

// Use the informer's store as the AgentInformerWatcherStore's store
watcherStore.SetStore(informer.Informer().GetStore())

// Build controllers with the ManifestWork informer ...

// Start the ManifestWork informer
go informer.Informer().Run(ctx.Done())

// Use the manifestWorkClient to update the work status ...
```
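A hedged sketch of a status update through this client, assuming the agent client supports patching the status subresource and that `patchBytes` is a pre-computed JSON merge patch of the modified status (`kubetypes` is `k8s.io/apimachinery/pkg/types`):

```go
// patch the status subresource of an existing work from the agent;
// workName and patchBytes are assumed prepared above.
_, err = manifestWorkClient.Patch(ctx, workName, kubetypes.MergePatchType,
    patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
    return err
}
```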
### Building a garbage collector for work controllers on the hub cluster
The garbage collector is an optional component that runs alongside the `ManifestWork` source client. It is used to clean up `ManifestWork` resources owned by other resources. For example, in the addon-framework, `ManifestWork` resources created by addon controllers have owner references pointing to `ManagedClusterAddOn` resources; when the owner resources are deleted, the `ManifestWork` resources should be deleted as well.
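For example, a `ManifestWork` created by an addon controller might carry an owner reference like the following (the field values here are illustrative):

```go
work := &workv1.ManifestWork{
    ObjectMeta: metav1.ObjectMeta{
        Name:      "example-work",
        Namespace: "cluster1",
        // the garbage collector deletes this work once its owner is gone
        OwnerReferences: []metav1.OwnerReference{{
            APIVersion: "addon.open-cluster-management.io/v1alpha1",
            Kind:       "ManagedClusterAddOn",
            Name:       addonName, // assumed variable holding the addon name
            UID:        addonUID,  // assumed variable holding the addon UID
        }},
    },
}
```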
Developers need to provide the following to build and run the garbage collector:
- Client Holder (`work.ClientHolder`): contains the `ManifestWork` client and informer, which can be built using the builders mentioned in the last two sections.
- Metadata Client (`metadata.Interface`): used to retrieve the owner resources of the `ManifestWork` resources.
- Owner resource filters map (`map[schema.GroupVersionResource]*metav1.ListOptions`): used to filter the owner resources of the `ManifestWork` resources.
```go
listOptions := &metav1.ListOptions{
    FieldSelector: fmt.Sprintf("metadata.name=%s", addonName),
}
ownerGVRFilters := map[schema.GroupVersionResource]*metav1.ListOptions{
    // ManagedClusterAddOn is the owner of the ManifestWork resources filtered by the addon name
    addonv1alpha1.SchemeGroupVersion.WithResource("managedclusteraddons"): listOptions,
}

// Initialize the garbage collector
garbageCollector := garbagecollector.NewGarbageCollector(clientHolder, workInformer, metadataClient, ownerGVRFilters)

// Run the garbage collector with 1 worker to handle the garbage collection
go garbageCollector.Run(ctx, 1)
```