reconciler

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package reconciler defines implementations of the Reconciler interface defined at sigs.k8s.io/controller-runtime/pkg/reconcile.Reconciler. They implement the basic workhorse functionality of controllers, which include an InterStepBufferService controller, a Pipeline controller, and a Vertex controller.

Despite the implementation of the controllers, this package also implements a Start() function to watch corresponding Kubernetes resources for those controllers, it is supposed to be called at the time the controller manager service starts.

Index

Constants

This section is empty.

Variables

View Source
var (
	// BuildInfo provides the controller binary build information including version and platform, etc.
	BuildInfo = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "build_info",
		Help:      "A metric with a constant value '1', labeled with controller version and platform from which Numaflow was built",
	}, []string{metrics.LabelVersion, metrics.LabelPlatform})

	// ISBSvcHealth indicates whether the ISB Service is healthy (from k8s resource perspective).
	ISBSvcHealth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "isbsvc_health",
		Help:      "A metric to indicate whether the ISB Service is healthy. '1' means healthy, '0' means unhealthy",
	}, []string{metrics.LabelNamespace, metrics.LabelISBService})

	// PipelineHealth indicates whether the pipeline is healthy (from k8s resource perspective).
	PipelineHealth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "pipeline_health",
		Help:      "A metric to indicate whether the Pipeline is healthy. '1' means healthy, '0' means unhealthy",
	}, []string{metrics.LabelNamespace, metrics.LabelISBService})

	MonoVertexHealth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "monovtx_health",
		Help:      "A metric to indicate whether the MonoVertex is healthy. '1' means healthy, '0' means unhealthy",
	}, []string{metrics.LabelNamespace, metrics.LabelMonoVertexName})

	// JetStreamISBSvcReplicas indicates the replicas of a JetStream ISB Service.
	JetStreamISBSvcReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "isbsvc_jetstream_replicas",
		Help:      "A metric indicates the replicas of a JetStream ISB Service",
	}, []string{metrics.LabelNamespace, metrics.LabelISBService})

	// RedisISBSvcReplicas indicates the replicas of a Redis ISB Service.
	RedisISBSvcReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "isbsvc_redis_replicas",
		Help:      "A metric indicates the replicas of a Redis ISB Service",
	}, []string{metrics.LabelNamespace, metrics.LabelISBService})

	// VertexDesiredReplicas indicates the desired replicas of a Vertex.
	VertexDesiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "vertex_desired_replicas",
		Help:      "A metric indicates the desired replicas of a Vertex",
	}, []string{metrics.LabelNamespace, metrics.LabelPipeline, metrics.LabelVertex})

	// VertexCurrentReplicas indicates the current replicas of a Vertex.
	VertexCurrentReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "vertex_current_replicas",
		Help:      "A metric indicates the current replicas of a Vertex",
	}, []string{metrics.LabelNamespace, metrics.LabelPipeline, metrics.LabelVertex})

	// MonoVertexDesiredReplicas indicates the desired replicas of a MonoVertex.
	MonoVertexDesiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "monovtx_desired_replicas",
		Help:      "A metric indicates the desired replicas of a MonoVertex",
	}, []string{metrics.LabelNamespace, metrics.LabelMonoVertexName})

	// MonoVertexCurrentReplicas indicates the current replicas of a MonoVertex.
	MonoVertexCurrentReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: "controller",
		Name:      "monovtx_current_replicas",
		Help:      "A metric indicates the current replicas of a MonoVertex",
	}, []string{metrics.LabelNamespace, metrics.LabelMonoVertexName})
)

Functions

func CheckDeploymentStatus added in v1.3.0

func CheckDeploymentStatus(deployment *appv1.Deployment) (done bool, reason string, message string)

CheckDeploymentStatus returns a message describing deployment status, and message with reason where bool value indicating if the status is considered done. Borrowed at kubernetes/kubectl/rollout_status.go https://github.com/kubernetes/kubernetes/blob/cea1d4e20b4a7886d8ff65f34c6d4f95efcb4742/staging/src/k8s.io/kubectl/pkg/polymorphichelpers/rollout_status.go#L59

func CheckPodsStatus added in v1.3.1

func CheckPodsStatus(pods *corev1.PodList) (healthy bool, reason string, message string)

CheckPodsStatus checks the status by iterating over pods objects

func CheckStatefulSetStatus added in v1.3.0

func CheckStatefulSetStatus(sts *appv1.StatefulSet) (done bool, reason string, message string)

CheckStatefulSetStatus returns a message describing statefulset status, and a bool value indicating if the status is considered done. Borrowed at kubernetes/kubectl/rollout_status.go https://github.com/kubernetes/kubernetes/blob/cea1d4e20b4a7886d8ff65f34c6d4f95efcb4742/staging/src/k8s.io/kubectl/pkg/polymorphichelpers/rollout_status.go#L130

func CheckVertexStatus added in v1.3.0

func CheckVertexStatus(vertices *dfv1.VertexList) (healthy bool, reason string, message string)

CheckVertexStatus will calculate the status of the vertices and return the status and reason

func NumOfReadyPods added in v1.3.1

func NumOfReadyPods(pods corev1.PodList) int

Types

type DefaultConfig added in v1.1.6

type DefaultConfig struct {
	ContainerResources string `json:"containerResources"`
}

func (DefaultConfig) GetDefaultContainerResources added in v1.1.6

func (dc DefaultConfig) GetDefaultContainerResources() corev1.ResourceRequirements

type GlobalConfig

type GlobalConfig struct {
	// contains filtered or unexported fields
}

GlobalConfig is the configuration for the controllers, it is supposed to be populated from the configmap attached to the controller manager.

func FakeGlobalConfig added in v1.1.6

func FakeGlobalConfig(t *testing.T, isbSvcConfig *ISBSvcConfig) *GlobalConfig

func LoadConfig

func LoadConfig(onErrorReloading func(error)) (*GlobalConfig, error)

func (*GlobalConfig) GetDefaults added in v1.1.6

func (g *GlobalConfig) GetDefaults() DefaultConfig

Get controller scope default config

func (*GlobalConfig) GetISBSvcConfig added in v1.1.6

func (g *GlobalConfig) GetISBSvcConfig() ISBSvcConfig

Get controller scope ISB Service config

type ISBSvcConfig

type ISBSvcConfig struct {
	Redis     *RedisConfig     `json:"redis"`
	JetStream *JetStreamConfig `json:"jetstream"`
}

func (ISBSvcConfig) GetJetStreamVersion added in v1.1.6

func (isc ISBSvcConfig) GetJetStreamVersion(version string) (*JetStreamVersion, error)

func (ISBSvcConfig) GetRedisVersion added in v1.1.6

func (isc ISBSvcConfig) GetRedisVersion(version string) (*RedisVersion, error)

type JetStreamConfig

type JetStreamConfig struct {
	Settings     string             `json:"settings"`
	BufferConfig string             `json:"bufferConfig"`
	Versions     []JetStreamVersion `json:"versions"`
}

type JetStreamVersion

type JetStreamVersion struct {
	Version              string `json:"version"`
	NatsImage            string `json:"natsImage"`
	MetricsExporterImage string `json:"metricsExporterImage"`
	ConfigReloaderImage  string `json:"configReloaderImage"`
	StartCommand         string `json:"startCommand"`
}

type RedisConfig

type RedisConfig struct {
	Settings *RedisSettings `json:"settings"`
	Versions []RedisVersion `json:"versions"`
}

type RedisSettings

type RedisSettings struct {
	Redis    string `json:"redis"`
	Master   string `json:"master"`
	Replica  string `json:"replica"`
	Sentinel string `json:"sentinel"`
}

type RedisVersion

type RedisVersion struct {
	Version            string `json:"version"`
	RedisImage         string `json:"redisImage"`
	SentinelImage      string `json:"sentinelImage"`
	InitContainerImage string `json:"initContainerImage"`
	RedisExporterImage string `json:"redisExporterImage"`
}

Directories

Path Synopsis
scaling
Package scaling provides the autoscaling capability for MonoVertex objects.
Package scaling provides the autoscaling capability for MonoVertex objects.
scaling
Package scaling provides the autoscaling capability for Vertex objects.
Package scaling provides the autoscaling capability for Vertex objects.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL