generic

package
v0.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2024 License: Apache-2.0 Imports: 21 Imported by: 11

Documentation

Index

Constants

View Source
const (
	// TODO we may adjust these after performance test
	DefaultQPS   float32 = 50.0
	DefaultBurst int     = 100
)

Variables

View Source
var DelayFn = wait.Backoff{
	Duration: 5 * time.Second,
	Cap:      1 * time.Minute,
	Steps:    12,
	Factor:   5.0,
	Jitter:   1.0,
}.DelayWithReset(&clock.RealClock{}, 10*time.Minute)

the reconnect backoff will stop at [1,5) min interval. If we don't backoff for 10min, we reset the backoff.

Functions

func BuildCloudEventsAgentOptions added in v0.14.0

func BuildCloudEventsAgentOptions(config any, clusterName, clientId string) (*options.CloudEventsAgentOptions, error)

BuildCloudEventsAgentOptions builds the cloudevents agent options based on the broker type

func BuildCloudEventsSourceOptions added in v0.14.0

func BuildCloudEventsSourceOptions(config any, clientId, sourceId string) (*options.CloudEventsSourceOptions, error)

BuildCloudEventsSourceOptions builds the cloudevents source options based on the broker type

func RegisterResourceResyncMetrics added in v0.15.0

func RegisterResourceResyncMetrics()

Register the metrics:

func ResetResourceResyncMetricsCollectors added in v0.15.0

func ResetResourceResyncMetricsCollectors()

ResetResourceResyncMetricsCollectors resets all collectors

func UnregisterResourceResyncMetrics added in v0.15.0

func UnregisterResourceResyncMetrics()

Unregister the metrics:

Types

type CloudEventAgentClient

type CloudEventAgentClient[T ResourceObject] struct {
	// contains filtered or unexported fields
}

CloudEventAgentClient is a client for an agent to resync/send/receive its resources with cloud events.

An agent is a component that handles the deployment of requested resources on the managed cluster and status report to the source.

func NewCloudEventAgentClient

func NewCloudEventAgentClient[T ResourceObject](
	ctx context.Context,
	agentOptions *options.CloudEventsAgentOptions,
	lister Lister[T],
	statusHashGetter StatusHashGetter[T],
	codecs ...Codec[T],
) (*CloudEventAgentClient[T], error)

NewCloudEventAgentClient returns an instance for CloudEventAgentClient. The following arguments are required to create a client.

  • agentOptions provides the clusterName and agentID and the cloudevents clients that are based on different event protocols for sending/receiving the cloudevents.
  • lister gets the resources from a cache/store of an agent.
  • statusHashGetter calculates the resource status hash.
  • codecs is list of codecs for encoding/decoding a resource objet/cloudevent to/from a cloudevent/resource objet.

func (*CloudEventAgentClient[T]) Publish

func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error

Publish a resource status from an agent to a source.

func (*CloudEventAgentClient[T]) ReconnectedChan added in v0.14.0

func (c *CloudEventAgentClient[T]) ReconnectedChan() <-chan struct{}

ReconnectedChan returns a chan which indicates the source/agent client is reconnected. The source/agent client callers should consider sending a resync request when receiving this signal.

func (*CloudEventAgentClient[T]) Resync

func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, source string) error

Resync the resources spec by sending a spec resync request from the current to the given source.

func (*CloudEventAgentClient[T]) Subscribe

func (c *CloudEventAgentClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T])

Subscribe the events that are from the source status resync request or source resource spec request. For status resync request, agent publish the current resources status back as response. For resource spec request, agent receives resource spec and handles the spec with resource handlers.

type CloudEventSourceClient

type CloudEventSourceClient[T ResourceObject] struct {
	// contains filtered or unexported fields
}

CloudEventSourceClient is a client for a source to resync/send/receive its resources with cloud events.

A source is a component that runs on a server, it can be a controller on the hub cluster or a RESTful service handling resource requests.

func NewCloudEventSourceClient

func NewCloudEventSourceClient[T ResourceObject](
	ctx context.Context,
	sourceOptions *options.CloudEventsSourceOptions,
	lister Lister[T],
	statusHashGetter StatusHashGetter[T],
	codecs ...Codec[T],
) (*CloudEventSourceClient[T], error)

NewCloudEventSourceClient returns an instance for CloudEventSourceClient. The following arguments are required to create a client

  • sourceOptions provides the sourceID and the cloudevents clients that are based on different event protocols for sending/receiving the cloudevents.
  • lister gets the resources from a cache/store of a source.
  • statusHashGetter calculates the resource status hash.
  • codecs is list of codecs for encoding/decoding a resource objet/cloudevent to/from a cloudevent/resource objet.

func (*CloudEventSourceClient[T]) Publish

func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error

Publish a resource spec from a source to an agent.

func (*CloudEventSourceClient[T]) ReconnectedChan added in v0.14.0

func (c *CloudEventSourceClient[T]) ReconnectedChan() <-chan struct{}

func (*CloudEventSourceClient[T]) Resync

func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, clusterName string) error

Resync the resources status by sending a status resync request from the current source to a specified cluster.

func (*CloudEventSourceClient[T]) Subscribe

func (c *CloudEventSourceClient[T]) Subscribe(ctx context.Context, handlers ...ResourceHandler[T])

Subscribe the events that are from the agent spec resync request or agent resource status request. For spec resync request, source publish the current resources spec back as response. For resource status request, source receives resource status and handles the status with resource handlers.

type CloudEventsClient

type CloudEventsClient[T ResourceObject] interface {
	// Resync the resources of one source/agent by sending resync request.
	// The second parameter is used to specify cluster name/source ID for a source/agent.
	//   - A source sends the resource status resync request to a cluster with the given cluster name.
	//     If setting this parameter to `types.ClusterAll`, the source will broadcast the resync request to all clusters.
	//   - An agent sends the resources spec resync request to a source with the given source ID.
	//     If setting this parameter to `types.SourceAll`, the agent will broadcast the resync request to all sources.
	Resync(context.Context, string) error

	// Publish the resources spec/status event to the broker.
	Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error

	// Subscribe the resources status/spec event to the broker to receive the resources status/spec and use
	// ResourceHandler to handle them.
	Subscribe(ctx context.Context, handlers ...ResourceHandler[T])

	// ReconnectedChan returns a chan which indicates the source/agent client is reconnected.
	// The source/agent client callers should consider sending a resync request when receiving this signal.
	ReconnectedChan() <-chan struct{}
}

type Codec

type Codec[T ResourceObject] interface {
	// EventDataType indicates which type of the event data the codec is used for.
	EventDataType() types.CloudEventsDataType

	// Encode a resource object to cloudevents event.
	// Each event should have the following extensions: `resourceid`, `resourceversion` and `clustername`.
	// The source set the `deletiontimestamp` extension to indicate one resource object is deleting from a source.
	// The agent set the `originalsource` extension to indicate one resource belonged to which source.
	Encode(source string, eventType types.CloudEventsType, obj T) (*cloudevents.Event, error)

	// Decode a cloudevents event to a resource object.
	Decode(event *cloudevents.Event) (T, error)
}

type ConfigLoader added in v0.14.0

type ConfigLoader struct {
	// contains filtered or unexported fields
}

ConfigLoader loads a configuration object with a configuration file.

func NewConfigLoader added in v0.14.0

func NewConfigLoader(configType, configPath string) *ConfigLoader

NewConfigLoader returns a ConfigLoader with the given configuration type and configuration file path.

Available configuration types:

  • mqtt
  • grpc
  • kafka

func (*ConfigLoader) LoadConfig added in v0.14.0

func (l *ConfigLoader) LoadConfig() (string, any, error)

TODO using a specified config instead of any

type Lister

type Lister[T ResourceObject] interface {
	// List returns the list of resource objects that are maintained by source/agent.
	List(options types.ListOptions) ([]T, error)
}

type ResourceHandler

type ResourceHandler[T ResourceObject] func(action types.ResourceAction, obj T) error

ResourceHandler handles the received resource object.

type ResourceObject

type ResourceObject interface {
	// GetUID returns the resource ID of this object. The resource ID represents the unique identifier for this object.
	// The source should ensure its uniqueness and consistency.
	GetUID() kubetypes.UID

	// GetResourceVersion returns the resource version of this object. The resource version is a required int64 sequence
	// number property that must be incremented by the source whenever this resource changes.
	// The source should guarantee its incremental nature.
	GetResourceVersion() string

	// GetDeletionTimestamp returns the deletion timestamp of this object. The deletiontimestamp is an optional
	// timestamp property representing the resource is deleting from the source, the agent needs to clean up the
	// resource from its cluster.
	GetDeletionTimestamp() *metav1.Time
}

type StatusHashGetter

type StatusHashGetter[T ResourceObject] func(obj T) (string, error)

StatusHashGetter gets the status hash of one resource object.

Directories

Path Synopsis
kafka
This is the dummy code to pass the compile when Kafka is not enabled.
This is the dummy code to pass the compile when Kafka is not enabled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL