Cloudevents Clients
We have implemented cloudevents-based clients in this package to help developers implement the
Event Based Manifestwork proposal.
Generic Clients
The generic client (generic.CloudEventsClient) is used to resync/publish/subscribe resource objects between sources
and agents with cloudevents.
A resource object can be any object that implements the generic.ResourceObject interface.
Building a generic client on a source
Developers can use the generic.NewCloudEventSourceClient method to build a generic client on the source. To build
this client, developers need to provide:
- Cloudevents source options (options.CloudEventsSourceOptions), which have two parts:
  - sourceID: a unique identifier for a source. For example, a hub controller can generate a source ID by hashing
    the hub cluster URL and appending the controller name. Similarly, a RESTful service can select a unique name or
    generate a unique ID in the associated database for its source identification.
  - CloudEventsOptions: provides cloudevents clients to send and receive cloudevents over different event
    protocols. The MQTT protocol (mqtt.NewSourceOptions) and the gRPC protocol (grpc.NewSourceOptions) are
    supported, and developers can use them directly.
- A resource lister (generic.Lister), which is used to list the resource objects on the source when resyncing the
  resources between sources and agents. For example, a hub controller can list the resources from its resource
  informers, and a RESTful service can list its resources from a database.
- A resource status hash getter method (generic.StatusHashGetter), which is used to calculate the resource status
  hash when resyncing the resource status between sources and agents.
- Codecs (generic.Codec), which are used to encode a resource object into a cloudevent and decode a cloudevent into
  a resource object with a given cloudevent data type. Two data types are provided for ManifestWork:
  io.open-cluster-management.works.v1alpha1.manifests, which contains a single resource object in the cloudevent
  payload, and io.open-cluster-management.works.v1alpha1.manifestbundles, which contains a list of resource objects
  in the cloudevent payload. Both can be found in the work/payload package.
- Resource handler methods (generic.ResourceHandler), which are used to handle the resource status after the client
  receives it from agents.
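As a rough illustration of the sourceID guidance above, the following sketch hashes a hub cluster URL and appends a controller name. The helper name sourceID, the choice of SHA-256, the truncation to 8 bytes, and the example URL are all assumptions made for this sketch; none of them come from this package.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sourceID is a hypothetical helper (not part of this package). It hashes the
// hub cluster URL and appends the controller name to form a source ID.
func sourceID(hubURL, controllerName string) string {
	sum := sha256.Sum256([]byte(hubURL))
	// Keep only the first 8 bytes (16 hex characters) so the ID stays short.
	// The truncation length is an arbitrary choice for this sketch.
	return fmt.Sprintf("%s-%s", hex.EncodeToString(sum[:8]), controllerName)
}

func main() {
	// The URL and controller name below are placeholders.
	fmt.Println(sourceID("https://hub.example.com:6443", "work-controller"))
}
```

The same inputs always produce the same ID, so a restarted controller keeps its identity as long as the hub URL and controller name do not change.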
For example, build a generic client on the source using the MQTT protocol with the following code:
// build a client for the source1
client, err := generic.NewCloudEventSourceClient[*CustomerResource](
	ctx,
	mqtt.NewSourceOptions(mqtt.NewMQTTOptions(), "source1"),
	customerResourceLister,
	customerResourceStatusHashGetter,
	customerResourceCodec,
)
if err != nil {
	return err
}

// start a goroutine to receive the resource status from agents
go func() {
	if err := client.Subscribe(ctx, customerResourceHandler); err != nil {
		// TODO: handle the error when subscribing to the cloudevents fails
	}
}()
You may refer to the cloudevents client integration test as an example.
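The status hash getter passed to the client is not spelled out above. A minimal sketch, assuming the getter receives a resource and returns a hash string plus an error, could marshal the status to JSON and hash it. The CustomerResource and CustomerResourceStatus types, their fields, and the use of SHA-256 are hypothetical and chosen only for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// CustomerResourceStatus and CustomerResource are hypothetical types used
// only for this sketch.
type CustomerResourceStatus struct {
	Phase   string `json:"phase"`
	Message string `json:"message,omitempty"`
}

type CustomerResource struct {
	Name   string
	Status CustomerResourceStatus
}

// statusHash marshals the resource status to JSON and returns the hex-encoded
// SHA-256 digest. Only the status is hashed, so two resources with the same
// status yield the same hash regardless of their names.
func statusHash(res *CustomerResource) (string, error) {
	data, err := json.Marshal(res.Status)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	res := &CustomerResource{Name: "r1", Status: CustomerResourceStatus{Phase: "Applied"}}
	hash, err := statusHash(res)
	if err != nil {
		panic(err)
	}
	fmt.Println(hash)
}
```

Comparing such hashes lets two sides tell whether a status has changed without exchanging the full status.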
Building a generic client on a managed cluster
Developers can use the generic.NewCloudEventAgentClient method to build a generic client on a managed cluster. To
build this client, developers need to provide:
- Cloudevents agent options (options.CloudEventsAgentOptions), which have three parts:
  - agentID: a unique identifier for an agent. For example, it can consist of a managed cluster name and an agent
    name.
  - clusterName: the name of the managed cluster on which the agent runs.
  - CloudEventsOptions: provides cloudevents clients to send and receive cloudevents over different event
    protocols. The MQTT protocol (mqtt.NewAgentOptions) and the gRPC protocol (grpc.NewAgentOptions) are supported,
    and developers can use them directly.
- A resource lister (generic.Lister), which is used to list the resource objects on a managed cluster when resyncing
  the resources between sources and agents. For example, a work agent can list its works from its work informers.
- A resource status hash getter method (generic.StatusHashGetter), which is used to calculate the resource status
  hash when resyncing the resource status between sources and agents.
- Codecs (generic.Codec), which are used to encode a resource object into a cloudevent and decode a cloudevent into
  a resource object with a given cloudevent data type. Two data types are provided for ManifestWork:
  io.open-cluster-management.works.v1alpha1.manifests, which contains a single resource object in the cloudevent
  payload, and io.open-cluster-management.works.v1alpha1.manifestbundles, which contains a list of resource objects
  in the cloudevent payload. Both can be found in the work/payload package.
- Resource handler methods (generic.ResourceHandler), which are used to handle the resources after the client
  receives them from sources.
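To make the codec role more concrete, here is a self-contained sketch of encoding a resource into an event and decoding it back, keyed by a data type. The Event struct is a simplified stand-in for a real cloudevent, and the customerCodec type, its method signatures, and the data type string are assumptions for illustration; they are not the generic.Codec interface itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a simplified, hypothetical stand-in for a cloudevent.
type Event struct {
	Type string // the cloudevent data type
	Data []byte // the JSON-encoded payload
}

// CustomerResource is a hypothetical resource type.
type CustomerResource struct {
	Name string `json:"name"`
	Spec string `json:"spec"`
}

// customerDataType is a made-up data type name for this sketch.
const customerDataType = "com.example.v1.customerresources"

// customerCodec encodes and decodes CustomerResource for one data type.
type customerCodec struct{}

// Encode serializes the resource into the event payload and tags the event
// with the codec's data type.
func (customerCodec) Encode(res *CustomerResource) (*Event, error) {
	data, err := json.Marshal(res)
	if err != nil {
		return nil, err
	}
	return &Event{Type: customerDataType, Data: data}, nil
}

// Decode rejects events of any other data type, then restores the resource
// from the payload.
func (customerCodec) Decode(evt *Event) (*CustomerResource, error) {
	if evt.Type != customerDataType {
		return nil, fmt.Errorf("unsupported data type %q", evt.Type)
	}
	res := &CustomerResource{}
	if err := json.Unmarshal(evt.Data, res); err != nil {
		return nil, err
	}
	return res, nil
}

func main() {
	codec := customerCodec{}
	evt, err := codec.Encode(&CustomerResource{Name: "r1", Spec: "replicas=2"})
	if err != nil {
		panic(err)
	}
	res, err := codec.Decode(evt)
	if err != nil {
		panic(err)
	}
	fmt.Println(evt.Type, res.Name, res.Spec)
}
```

Tying each codec to a single data type is what allows a client to hold several codecs and pick one by the type of the incoming event.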
For example, build a generic client on a managed cluster using the MQTT protocol with the following code:
// build a client for a work agent on the cluster1
client, err := generic.NewCloudEventAgentClient[*CustomerResource](
	ctx,
	mqtt.NewAgentOptions(mqtt.NewMQTTOptions(), "cluster1", "cluster1-work-agent"),
	&ManifestWorkLister{},
	ManifestWorkStatusHash,
	&ManifestBundleCodec{},
)
if err != nil {
	return err
}

// start a goroutine to receive the resources from sources
go func() {
	if err := client.Subscribe(ctx, NewManifestWorkAgentHandler()); err != nil {
		// TODO: handle the error when subscribing to the cloudevents fails
	}
}()
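The resource lister used in the examples above is left undefined. As a loose sketch, a lister could be backed by an in-memory store standing in for a database or an informer cache. The memoryLister type, the ListOptions struct, and the List signature here are hypothetical simplifications and may not match the generic.Lister interface.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// CustomerResource is a hypothetical resource type.
type CustomerResource struct {
	Name        string
	ClusterName string
}

// ListOptions is a hypothetical, simplified filter. An empty ClusterName
// matches resources on every cluster.
type ListOptions struct {
	ClusterName string
}

// memoryLister keeps resources in a map, standing in for a database or cache.
type memoryLister struct {
	mu        sync.RWMutex
	resources map[string]*CustomerResource
}

func newMemoryLister(resources ...*CustomerResource) *memoryLister {
	l := &memoryLister{resources: map[string]*CustomerResource{}}
	for _, r := range resources {
		l.resources[r.ClusterName+"/"+r.Name] = r
	}
	return l
}

// List returns the resources matching the options, sorted by name so that the
// result order does not depend on map iteration order.
func (l *memoryLister) List(opts ListOptions) ([]*CustomerResource, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	var out []*CustomerResource
	for _, r := range l.resources {
		if opts.ClusterName == "" || r.ClusterName == opts.ClusterName {
			out = append(out, r)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out, nil
}

func main() {
	lister := newMemoryLister(
		&CustomerResource{Name: "b", ClusterName: "cluster1"},
		&CustomerResource{Name: "a", ClusterName: "cluster1"},
		&CustomerResource{Name: "c", ClusterName: "cluster2"},
	)
	items, err := lister.List(ListOptions{ClusterName: "cluster1"})
	if err != nil {
		panic(err)
	}
	for _, r := range items {
		fmt.Println(r.ClusterName, r.Name)
	}
}
```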
Work Clients
We have provided a builder to build the ManifestWork client (ManifestWorkInterface) and informer
(ManifestWorkInformer) based on the generic client.
Building work client for work controllers on the hub cluster
TODO
Building work client for work agent on the managed cluster
Developers can use the builder to build the ManifestWork client and informer with the cluster name.
clusterName := "cluster1"

// Building the clients based on cloudevents with MQTT
config := mqtt.NewMQTTOptions()
clientHolder, err := work.NewClientHolderBuilder(fmt.Sprintf("%s-work-agent", clusterName), config).
	WithClusterName(clusterName).
	// Supports two event data types for ManifestWork
	WithCodecs(codec.NewManifestBundleCodec(), codec.NewManifestCodec(restMapper)).
	NewClientHolder(ctx)
if err != nil {
	return err
}

manifestWorkClient := clientHolder.ManifestWorks(clusterName)
manifestWorkInformer := clientHolder.ManifestWorkInformer()

// Building controllers with ManifestWork client and informer ...

// Start the ManifestWork informer
go manifestWorkInformer.Informer().Run(ctx.Done())