Documentation ¶
Index ¶
- Constants
- Variables
- func BuildCloudEventsAgentOptions(config any, clusterName, clientId string) (*options.CloudEventsAgentOptions, error)
- func BuildCloudEventsSourceOptions(config any, clientId, sourceId string) (*options.CloudEventsSourceOptions, error)
- func NewRateLimiter(limit options.EventRateLimit) flowcontrol.RateLimiter
- type CloudEventAgentClient
- func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error
- func (c *CloudEventAgentClient[T]) ReconnectedChan() <-chan struct{}
- func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, source string) error
- func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T])
- type CloudEventSourceClient
- func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error
- func (c *CloudEventSourceClient[T]) ReconnectedChan() <-chan struct{}
- func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, clusterName string) error
- func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T])
- type CloudEventsClient
- type Codec
- type ConfigLoader
- type Lister
- type ResourceHandler
- type ResourceObject
- type StatusHashGetter
Constants ¶
const (
	// TODO we may adjust these after performance test
	DefaultQPS   float32 = 50.0
	DefaultBurst int     = 100
)
Variables ¶
var DelayFn = wait.Backoff{
	Duration: 5 * time.Second,
	Cap:      1 * time.Minute,
	Steps:    12,
	Factor:   5.0,
	Jitter:   1.0,
}.DelayWithReset(&clock.RealClock{}, 10*time.Minute)
DelayFn is the reconnect backoff. The backoff stops growing once the interval reaches [1, 5) minutes. If no backoff happens for 10 minutes, the backoff is reset.
Functions ¶
func BuildCloudEventsAgentOptions ¶ added in v0.14.0
func BuildCloudEventsAgentOptions(config any, clusterName, clientId string) (*options.CloudEventsAgentOptions, error)
BuildCloudEventsAgentOptions builds the cloudevents agent options based on the broker type
func BuildCloudEventsSourceOptions ¶ added in v0.14.0
func BuildCloudEventsSourceOptions(config any, clientId, sourceId string) (*options.CloudEventsSourceOptions, error)
BuildCloudEventsSourceOptions builds the cloudevents source options based on the broker type
func NewRateLimiter ¶
func NewRateLimiter(limit options.EventRateLimit) flowcontrol.RateLimiter
Types ¶
type CloudEventAgentClient ¶
type CloudEventAgentClient[T ResourceObject] struct {
	// contains filtered or unexported fields
}
CloudEventAgentClient is a client for an agent to resync/send/receive its resources with cloud events.
An agent is a component that deploys the requested resources on the managed cluster and reports their status to the source.
func NewCloudEventAgentClient ¶
func NewCloudEventAgentClient[T ResourceObject](
	ctx context.Context,
	agentOptions *options.CloudEventsAgentOptions,
	lister Lister[T],
	statusHashGetter StatusHashGetter[T],
	codecs ...Codec[T],
) (*CloudEventAgentClient[T], error)
NewCloudEventAgentClient returns an instance for CloudEventAgentClient. The following arguments are required to create a client.
- agentOptions provides the clusterName and agentID and the cloudevents clients that are based on different event protocols for sending/receiving the cloudevents.
- lister gets the resources from a cache/store of an agent.
- statusHashGetter calculates the resource status hash.
- codecs is a list of codecs for encoding/decoding a resource object/cloudevent to/from a cloudevent/resource object.
func (*CloudEventAgentClient[T]) Publish ¶
func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error
Publish a resource status from an agent to a source.
func (*CloudEventAgentClient[T]) ReconnectedChan ¶ added in v0.14.0
func (c *CloudEventAgentClient[T]) ReconnectedChan() <-chan struct{}
ReconnectedChan returns a chan which indicates the source/agent client is reconnected. The source/agent client callers should consider sending a resync request when receiving this signal.
func (*CloudEventAgentClient[T]) Resync ¶
func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, source string) error
Resync the resource specs by sending a spec resync request from the current agent to the given source.
func (*CloudEventAgentClient[T]) Subscribe ¶
func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T])
Subscribe to the events that come from a source status resync request or a source resource spec request. For a status resync request, the agent publishes the current resource status back as the response. For a resource spec request, the agent receives the resource spec and handles it with the resource handlers.
type CloudEventSourceClient ¶
type CloudEventSourceClient[T ResourceObject] struct {
	// contains filtered or unexported fields
}
CloudEventSourceClient is a client for a source to resync/send/receive its resources with cloud events.
A source is a component that runs on a server; it can be a controller on the hub cluster or a RESTful service handling resource requests.
func NewCloudEventSourceClient ¶
func NewCloudEventSourceClient[T ResourceObject](
	ctx context.Context,
	sourceOptions *options.CloudEventsSourceOptions,
	lister Lister[T],
	statusHashGetter StatusHashGetter[T],
	codecs ...Codec[T],
) (*CloudEventSourceClient[T], error)
NewCloudEventSourceClient returns an instance for CloudEventSourceClient. The following arguments are required to create a client.
- sourceOptions provides the sourceID and the cloudevents clients that are based on different event protocols for sending/receiving the cloudevents.
- lister gets the resources from a cache/store of a source.
- statusHashGetter calculates the resource status hash.
- codecs is a list of codecs for encoding/decoding a resource object/cloudevent to/from a cloudevent/resource object.
func (*CloudEventSourceClient[T]) Publish ¶
func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error
Publish a resource spec from a source to an agent.
func (*CloudEventSourceClient[T]) ReconnectedChan ¶ added in v0.14.0
func (c *CloudEventSourceClient[T]) ReconnectedChan() <-chan struct{}
func (*CloudEventSourceClient[T]) Resync ¶
func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, clusterName string) error
Resync the resources status by sending a status resync request from the current source to a specified cluster.
func (*CloudEventSourceClient[T]) Subscribe ¶
func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T])
Subscribe to the events that come from an agent spec resync request or an agent resource status request. For a spec resync request, the source publishes the current resource specs back as the response. For a resource status request, the source receives the resource status and handles it with the resource handlers.
type CloudEventsClient ¶
type CloudEventsClient[T ResourceObject] interface {
	// Resync the resources of one source/agent by sending resync request.
	// The second parameter is used to specify cluster name/source ID for a source/agent.
	//   - A source sends the resource status resync request to a cluster with the given cluster name.
	//     If setting this parameter to `types.ClusterAll`, the source will broadcast the resync request to all clusters.
	//   - An agent sends the resources spec resync request to a source with the given source ID.
	//     If setting this parameter to `types.SourceAll`, the agent will broadcast the resync request to all sources.
	Resync(context.Context, string) error

	// Publish the resources spec/status event to the broker.
	Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error

	// Subscribe the resources status/spec event to the broker to receive the resources status/spec and use
	// ResourceHandler to handle them.
	Subscribe(ctx context.Context, handlers ...ResourceHandler[T])

	// ReconnectedChan returns a chan which indicates the source/agent client is reconnected.
	// The source/agent client callers should consider sending a resync request when receiving this signal.
	ReconnectedChan() <-chan struct{}
}
type Codec ¶
type Codec[T ResourceObject] interface {
	// EventDataType indicates which type of the event data the codec is used for.
	EventDataType() types.CloudEventsDataType

	// Encode a resource object to cloudevents event.
	// Each event should have the following extensions: `resourceid`, `resourceversion` and `clustername`.
	// The source set the `deletiontimestamp` extension to indicate one resource object is deleting from a source.
	// The agent set the `originalsource` extension to indicate one resource belonged to which source.
	Encode(source string, eventType types.CloudEventsType, obj T) (*cloudevents.Event, error)

	// Decode a cloudevents event to a resource object.
	Decode(event *cloudevents.Event) (T, error)
}
type ConfigLoader ¶ added in v0.14.0
type ConfigLoader struct {
// contains filtered or unexported fields
}
ConfigLoader loads a configuration object with a configuration file.
func NewConfigLoader ¶ added in v0.14.0
func NewConfigLoader(configType, configPath string) *ConfigLoader
NewConfigLoader returns a ConfigLoader with the given configuration type and configuration file path.
Available configuration types:
- mqtt
- grpc
- kafka
func (*ConfigLoader) LoadConfig ¶ added in v0.14.0
func (l *ConfigLoader) LoadConfig() (string, any, error)
TODO using a specified config instead of any
type Lister ¶
type Lister[T ResourceObject] interface {
	// List returns the list of resource objects that are maintained by source/agent.
	List(options types.ListOptions) ([]T, error)
}
type ResourceHandler ¶
type ResourceHandler[T ResourceObject] func(action types.ResourceAction, obj T) error
ResourceHandler handles the received resource object.
type ResourceObject ¶
type ResourceObject interface {
	// GetUID returns the resource ID of this object. The resource ID represents the unique identifier for this object.
	// The source should ensure its uniqueness and consistency.
	GetUID() kubetypes.UID

	// GetResourceVersion returns the resource version of this object. The resource version is a required int64 sequence
	// number property that must be incremented by the source whenever this resource changes.
	// The source should guarantee its incremental nature.
	GetResourceVersion() string

	// GetDeletionTimestamp returns the deletion timestamp of this object. The deletiontimestamp is an optional
	// timestamp property representing the resource is deleting from the source, the agent needs to clean up the
	// resource from its cluster.
	GetDeletionTimestamp() *metav1.Time
}
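Because GetResourceVersion returns the int64 sequence number as a string, comparing two versions as plain strings would order "10" before "9". The sketch below parses both values before comparing them. `isNewerVersion` is a hypothetical helper written for this illustration; whether the package compares versions this way internally is not stated in this document.

```go
package main

import (
	"fmt"
	"strconv"
)

// isNewerVersion reports whether incoming is a later resource version than
// current. Both arguments are expected to hold base-10 int64 values, as the
// ResourceObject comments describe. This helper is illustrative only.
func isNewerVersion(current, incoming string) (bool, error) {
	cur, err := strconv.ParseInt(current, 10, 64)
	if err != nil {
		return false, fmt.Errorf("invalid current resource version %q: %w", current, err)
	}
	in, err := strconv.ParseInt(incoming, 10, 64)
	if err != nil {
		return false, fmt.Errorf("invalid incoming resource version %q: %w", incoming, err)
	}
	return in > cur, nil
}

func main() {
	current, incoming := "9", "10"

	newer, err := isNewerVersion(current, incoming)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("numeric comparison says newer:", newer)

	// A lexical comparison gets the same pair wrong.
	fmt.Println("string comparison says newer:", incoming > current)
}
```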
type StatusHashGetter ¶
type StatusHashGetter[T ResourceObject] func(obj T) (string, error)
StatusHashGetter gets the status hash of one resource object.
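One way to write such a getter is to hash a stable serialization of the status. The following is a minimal sketch, assuming a JSON-serializable status: the `workload` and `workloadStatus` types and the `statusHash` function are hypothetical stand-ins, and SHA-256 over JSON is an illustrative choice, not something this package prescribes. In real use the function would take the caller's own resource type so that it fits `StatusHashGetter[T]`.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// workloadStatus and workload are hypothetical types for this example only.
type workloadStatus struct {
	Phase    string `json:"phase"`
	Replicas int    `json:"replicas"`
}

type workload struct {
	Name   string
	Status workloadStatus
}

// statusHash returns a hex-encoded SHA-256 digest of the JSON form of the
// status. Only the Status field is hashed, so changes to other fields leave
// the result unchanged.
func statusHash(obj workload) (string, error) {
	data, err := json.Marshal(obj.Status)
	if err != nil {
		return "", fmt.Errorf("marshal status of %s: %w", obj.Name, err)
	}
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	before := workload{Name: "app", Status: workloadStatus{Phase: "Pending", Replicas: 0}}
	after := workload{Name: "app", Status: workloadStatus{Phase: "Running", Replicas: 3}}

	h1, _ := statusHash(before)
	h2, _ := statusHash(after)
	fmt.Println("status changed:", h1 != h2)
}
```

Hashing only the status lets a caller detect a status change by comparing two short strings instead of two full objects.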