instance

package
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2022 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// CreateConfigMapsIfNotFound is the "CreateConfigMapsIfNotFound" step. For the
// config and task config map types it looks up the existing config map and,
// when the lookup yields nothing (not-found errors are ignored), builds a new
// one through the object factory and creates it via
// rc.SetControllerRefAndCreate. Any other failure is reported through
// flow.Error; otherwise the step ends with flow.Pass.
var CreateConfigMapsIfNotFound = polardbxv1reconcile.NewStepBinder("CreateConfigMapsIfNotFound",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		objectFactory := factory.NewObjectFactory(rc)
		configMapTypes := []convention.ConfigMapType{
			convention.ConfigMapTypeConfig,
			convention.ConfigMapTypeTask,
		}

		for _, cmType := range configMapTypes {
			existing, getErr := rc.GetPolarDBXConfigMap(cmType)
			if client.IgnoreNotFound(getErr) != nil {
				return flow.Error(getErr, "Unable to get config map.", "configmap-type", cmType)
			}

			// Already present: nothing to do for this type.
			if existing != nil {
				continue
			}

			fresh, newErr := objectFactory.NewConfigMap(cmType)
			if newErr != nil {
				return flow.Error(newErr, "Unable to new config map.", "configmap-type", cmType)
			}

			if createErr := rc.SetControllerRefAndCreate(fresh); createErr != nil {
				return flow.Error(createErr, "Unable to create config map.", "configmap-type", cmType)
			}
		}

		return flow.Pass()
	},
)
View Source
// CreateOrReconcileCDCs is the "CreateOrReconcileCDCs" step. It delegates to
// reconcileGroupedDeployments for the CDC role and returns its result as is.
var CreateOrReconcileCDCs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileCDCs",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		result, err := reconcileGroupedDeployments(rc, flow, polardbxmeta.RoleCDC)
		return result, err
	},
)
View Source
// CreateOrReconcileCNs is the "CreateOrReconcileCNs" step. It delegates to
// reconcileGroupedDeployments for the CN role and returns its result as is.
var CreateOrReconcileCNs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileCNs",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		result, err := reconcileGroupedDeployments(rc, flow, polardbxmeta.RoleCN)
		return result, err
	},
)
View Source
// CreateOrReconcileDNs is the "CreateOrReconcileDNs" step. It works on the DN
// xstores indexed 0..replicas-1, where replicas comes from the topology in the
// PolarDBX status spec snapshot:
//
//  1. Outside the creating/restoring phases, a gap in the existing DN indices
//     transfers the PolarDBX into the failed phase and retries.
//  2. Missing DNs are created and DNs whose generation label is older than the
//     observed generation are updated (only when the StoreUpgrade feature gate
//     is enabled). Creates and updates run concurrently, one goroutine each.
//  3. DNs with an index >= replicas that are not listed as master storage
//     nodes in GMS are deleted, highest index first.
//
// The step returns flow.Retry when any DN was created or updated, flow.Error
// on failures, and flow.Pass otherwise.
var CreateOrReconcileDNs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileDNs",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		polardbx := rc.MustGetPolarDBX()
		topology := polardbx.Status.SpecSnapshot.Topology
		observedGeneration := polardbx.Status.ObservedGeneration
		replicas := int(topology.Nodes.DN.Replicas)

		dnStores, err := rc.GetDNMap()
		if err != nil {
			return flow.Error(err, "Unable to get xstores of DN.")
		}

		if !helper.IsPhaseIn(polardbx, polardbxv1polardbx.PhaseCreating, polardbxv1polardbx.PhaseRestoring) {
			// lastIndex ends up as the first index in [0, replicas) that has no
			// xstore, or replicas when all of them exist.
			lastIndex := 0
			for ; lastIndex < replicas; lastIndex++ {
				if _, ok := dnStores[lastIndex]; !ok {
					break
				}
			}

			// A missing index is only acceptable when the existing xstores are
			// exactly 0..lastIndex-1 (i.e. lastIndex == len(dnStores)); anything
			// else means there is a hole in the sequence.
			if lastIndex != replicas && lastIndex != len(dnStores) {
				helper.TransferPhase(polardbx, polardbxv1polardbx.PhaseFailed)
				return flow.Retry("Found broken DN, transfer into failed.")
			}
		}

		objectFactory := factory.NewObjectFactory(rc)
		anyChanged := false
		wg := &sync.WaitGroup{}
		// One slot per index so goroutines never write to the same element.
		errs := make([]error, replicas)
		var errCnt uint32
		// NOTE(review): the early flow.Error returns inside this loop do not
		// wait for goroutines started by earlier iterations — confirm that is
		// acceptable for rc's lifetime.
		for i := 0; i < replicas; i++ {
			observedDnStore, ok := dnStores[i]

			if !ok {
				newDnStore, err := objectFactory.NewXStoreDN(i)
				if err != nil {
					return flow.Error(err, "Unable to new xstore of DN.", "index", i)
				}

				wg.Add(1)
				// Copy the loop variable: the goroutine must only use idx, never i.
				logger, idx := flow.Logger(), i
				go func() {
					defer wg.Done()
					if err := rc.SetControllerRefAndCreate(newDnStore); err != nil {
						logger.Error(err, "Unable to create xstore of DN.", "index", idx)
						errs[idx] = err
						atomic.AddUint32(&errCnt, 1)
					}
				}()

				anyChanged = true
			} else {
				generation, err := convention.GetGenerationLabelValue(observedDnStore)
				if err != nil {
					return flow.Error(err, "Unable to get generation from xstore of DN.", "index", i)
				}

				if generation < observedGeneration {

					if !featuregate.StoreUpgrade.Enabled() {
						flow.Logger().Info("Feature 'StoreUpgrade' not enabled, skip upgrade.", "xstore", observedDnStore.Name)
						continue
					}

					newDnStore, err := objectFactory.NewXStoreDN(i)
					if err != nil {
						return flow.Error(err, "Unable to new xstore of DN.", "index", i)
					}

					convention.CopyMetadataForUpdate(&newDnStore.ObjectMeta, &observedDnStore.ObjectMeta, observedGeneration)

					wg.Add(1)
					// Copy the loop variable: the goroutine must only use idx, never i.
					logger, idx := flow.Logger(), i
					go func() {
						defer wg.Done()
						if err := rc.Client().Update(rc.Context(), newDnStore); err != nil {
							logger.Error(err, "Unable to update xstore of DN.", "index", idx)
							errs[idx] = err
							atomic.AddUint32(&errCnt, 1)
						}
					}()

					anyChanged = true
				}
			}
		}

		// wg.Wait orders all goroutine writes before the reads below, so errCnt
		// and errs can be read without further synchronization.
		wg.Wait()
		if errCnt > 0 {
			var firstErr error
			for _, err := range errs {
				if err != nil {
					firstErr = err
					break
				}
			}
			return flow.Error(firstErr, "Unable to create or reconcile xstores of DN.", "error-cnt", errCnt)
		}

		// Collect xstores whose index lies beyond the desired replica count.
		toRemoveStores := make(map[int]*polardbxv1.XStore)
		for index, xstore := range dnStores {
			if index >= replicas {
				toRemoveStores[index] = xstore
			}
		}
		if len(toRemoveStores) > 0 {
			mgr, err := rc.GetPolarDBXGMSManager()
			if err != nil {
				return flow.Error(err, "Unable to get manager for GMS.")
			}

			// Ids of the master storage nodes listed in GMS. Stays empty when
			// the metadb is not initialized, so every trailing xstore is then
			// treated as removable.
			storageNodeIds := make(map[string]int)
			if initialized, err := mgr.IsMetaDBInitialized(); err != nil {
				return flow.Error(err, "Unable to determine if GMS is initialized.")
			} else if initialized {
				storageNodes, err := mgr.ListStorageNodes(gms.StorageKindMaster)
				if err != nil {
					return flow.Error(err, "Unable to list storage nodes in GMS.")
				}
				for _, s := range storageNodes {
					storageNodeIds[s.Id] = 1
				}
			}

			canRemoveStoreIndices := make([]int, 0)
			for index, xstore := range toRemoveStores {
				if _, found := storageNodeIds[xstore.Name]; !found {
					canRemoveStoreIndices = append(canRemoveStoreIndices, index)
				}
			}
			if len(canRemoveStoreIndices) > 0 {
				// Delete in descending index order.
				sort.Slice(canRemoveStoreIndices, func(i, j int) bool {
					return canRemoveStoreIndices[i] > canRemoveStoreIndices[j]
				})
				flow.Logger().Info("Trying to remove trailing xstores (not in use)",
					"trailing-indices", canRemoveStoreIndices)
				for _, index := range canRemoveStoreIndices {
					xstore := dnStores[index]
					err := rc.Client().Delete(rc.Context(), xstore, client.PropagationPolicy(metav1.DeletePropagationBackground))
					if err != nil {
						return flow.Error(err, "Unable to delete unused trailing xstore.", "xstore", xstore.Name)
					}
				}
				flow.Logger().Info("Unused trailing xstores are all removed.")
			}
		}

		if anyChanged {
			return flow.Retry("DNs created or updated!")
		}

		return flow.Pass()
	},
)
View Source
// CreateOrReconcileGMS is the "CreateOrReconcileGMS" step.
//
//   - When the PolarDBX spec shares GMS, the step is skipped via flow.Continue.
//   - When no GMS xstore exists, one is created, but only in the creating or
//     restoring phase; in any other phase the PolarDBX is transferred into the
//     failed phase and the step retries.
//   - When the GMS xstore exists with a generation label older than the
//     observed generation, it is updated, provided the StoreUpgrade feature
//     gate is enabled.
//
// Errors, including a failed lookup of the GMS xstore other than not-found,
// are reported through flow.Error.
var CreateOrReconcileGMS = polardbxv1reconcile.NewStepBinder("CreateOrReconcileGMS",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		polardbx := rc.MustGetPolarDBX()

		if polardbx.Spec.ShareGMS {
			return flow.Continue("GMS shared, skip.")
		}

		gmsStore, err := rc.GetGMS()
		if client.IgnoreNotFound(err) != nil {
			// Report the failure instead of silently continuing with the flow.
			return flow.Error(err, "Unable to get xstore of GMS.")
		}

		if gmsStore == nil {
			// A missing GMS is only expected while creating or restoring.
			if !helper.IsPhaseIn(polardbx, polardbxv1polardbx.PhaseCreating, polardbxv1polardbx.PhaseRestoring) {
				helper.TransferPhase(polardbx, polardbxv1polardbx.PhaseFailed)
				return flow.Retry("GMS not found, transfer into failed.")
			}

			objectFactory := factory.NewObjectFactory(rc)
			newGmsStore, err := objectFactory.NewXStoreGMS()
			if err != nil {
				return flow.Error(err, "Unable to new xstore of GMS.")
			}

			if err := rc.SetControllerRefAndCreate(newGmsStore); err != nil {
				return flow.Error(err, "Unable to create xstore of GMS.")
			}

			return flow.Continue("GMS xstore created!")
		}

		gmsStoreGeneration, err := convention.GetGenerationLabelValue(gmsStore)
		if err != nil {
			return flow.Error(err, "Unable to get generation from xstore of GMS.")
		}

		// Up to date: nothing to reconcile.
		if gmsStoreGeneration >= polardbx.Status.ObservedGeneration {
			return flow.Pass()
		}

		if !featuregate.StoreUpgrade.Enabled() {
			return flow.Continue("Feature 'StoreUpgrade' not enabled, skip upgrade.", "xstore", gmsStore.Name)
		}

		objectFactory := factory.NewObjectFactory(rc)
		newGmsStore, err := objectFactory.NewXStoreGMS()
		if err != nil {
			return flow.Error(err, "Unable to new xstore of GMS.")
		}

		convention.CopyMetadataForUpdate(&newGmsStore.ObjectMeta, &gmsStore.ObjectMeta,
			polardbx.Status.ObservedGeneration)

		if err := rc.Client().Update(rc.Context(), newGmsStore); err != nil {
			return flow.Error(err, "Unable to update xstore of GMS.", "generation", polardbx.Status.ObservedGeneration)
		}

		return flow.Continue("GMS xstore updated!")
	},
)
View Source
// CreateSecretsIfNotFound is the "CreateSecretsIfNotFound" step. It handles
// the account secret first and the security (encode key) secret second: each
// one is looked up and, when the lookup yields nothing (not-found errors are
// ignored), built through the object factory and created via
// rc.SetControllerRefAndCreate. Failures are reported through flow.Error;
// otherwise the step ends with flow.Pass.
var CreateSecretsIfNotFound = polardbxv1reconcile.NewStepBinder("CreateSecretsIfNotFound",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		// Account secret.
		existingAccount, err := rc.GetPolarDBXSecret(convention.SecretTypeAccount)
		if client.IgnoreNotFound(err) != nil {
			return flow.Error(err, "Unable to get account secret.")
		}
		if existingAccount == nil {
			newAccount, newErr := factory.NewObjectFactory(rc).NewSecret()
			if newErr != nil {
				return flow.Error(newErr, "Unable to new account secret.")
			}
			if createErr := rc.SetControllerRefAndCreate(newAccount); createErr != nil {
				return flow.Error(createErr, "Unable to create account secret.")
			}
		}

		// Security (encode key) secret.
		existingKey, err := rc.GetPolarDBXSecret(convention.SecretTypeSecurity)
		if client.IgnoreNotFound(err) != nil {
			return flow.Error(err, "Unable to get encode key secret.")
		}
		if existingKey == nil {
			newKey, newErr := factory.NewObjectFactory(rc).NewSecuritySecret()
			if newErr != nil {
				return flow.Error(newErr, "Unable to new encode key secret.")
			}
			if createErr := rc.SetControllerRefAndCreate(newKey); createErr != nil {
				return flow.Error(createErr, "Unable to create encode key secret.")
			}
		}

		return flow.Pass()
	},
)
View Source
// CreateServicesIfNotFound is the "CreateServicesIfNotFound" step. For the
// read-write, read-only and CDC metrics service types it looks up the
// existing service and, when the lookup yields nothing (not-found errors are
// ignored), builds the matching service through the object factory and
// creates it via rc.SetControllerRefAndCreate. Failures are reported through
// flow.Error; otherwise the step ends with flow.Pass.
var CreateServicesIfNotFound = polardbxv1reconcile.NewStepBinder("CreateServicesIfNotFound",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		objectFactory := factory.NewObjectFactory(rc)
		serviceTypes := []convention.ServiceType{
			convention.ServiceTypeReadWrite,
			convention.ServiceTypeReadOnly,
			convention.ServiceTypeCDCMetrics,
		}

		for _, serviceType := range serviceTypes {
			svc, err := rc.GetPolarDBXService(serviceType)
			if client.IgnoreNotFound(err) != nil {
				return flow.Error(err, "Unable to get service", "service-type", serviceType)
			}

			// Already present: nothing to do for this type.
			if svc != nil {
				continue
			}

			// Pick the factory method matching the service type.
			switch serviceType {
			case convention.ServiceTypeReadWrite:
				svc, err = objectFactory.NewService()
			case convention.ServiceTypeReadOnly:
				svc, err = objectFactory.NewReadOnlyService()
			case convention.ServiceTypeCDCMetrics:
				svc, err = objectFactory.NewCDCMetricsService()
			default:
				// Only reachable if a type is added to the list above without
				// a matching case here.
				panic("unimplemented")
			}
			if err != nil {
				return flow.Error(err, "Unable new service.", "service-type", serviceType)
			}

			if createErr := rc.SetControllerRefAndCreate(svc); createErr != nil {
				return flow.Error(createErr, "Unable to create service.", "service-type", serviceType)
			}
		}

		return flow.Pass()
	},
)
View Source
// RemoveTrailingDNs is the "RemoveTrailingDNs" step. It walks the ordered DN
// list from the last element down to position replicas (taken from the
// topology in the PolarDBX status spec snapshot) and deletes, with background
// propagation, every xstore whose deletion timestamp is still zero. When the
// list is longer than replicas the step ends with flow.Retry, otherwise with
// flow.Pass.
var RemoveTrailingDNs = polardbxv1reconcile.NewStepBinder("RemoveTrailingDNs",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		polardbx := rc.MustGetPolarDBX()
		replicas := int(polardbx.Status.SpecSnapshot.Topology.Nodes.DN.Replicas)

		dnStores, err := rc.GetOrderedDNList()
		if err != nil {
			return flow.Error(err, "Unable to get xstores of DN.")
		}

		for pos := len(dnStores) - 1; pos >= replicas; pos-- {
			trailing := dnStores[pos]

			// Skip xstores that already carry a deletion timestamp.
			if !trailing.DeletionTimestamp.IsZero() {
				continue
			}

			deleteErr := rc.Client().Delete(rc.Context(), trailing,
				client.PropagationPolicy(metav1.DeletePropagationBackground))
			if deleteErr != nil {
				return flow.Error(deleteErr, "Unable to remove trailing DN.", "xstore", trailing.Name)
			}
		}

		if len(dnStores) <= replicas {
			return flow.Pass()
		}
		return flow.Retry("Trailing DNs are deleted.")
	},
)
View Source
// WaitUntilCDCDeploymentsRolledOut is the "WaitUntilCDCDeploymentsRolledOut"
// step. It fetches the deployment map of the CDC role and ends with
// flow.Continue when areDeploymentsRolledOut reports true for it, and with
// flow.Wait otherwise.
var WaitUntilCDCDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilCDCDeploymentsRolledOut",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		deployments, err := rc.GetDeploymentMap(polardbxmeta.RoleCDC)
		if err != nil {
			return flow.Error(err, "Unable to get deployments of CDC.")
		}

		if !areDeploymentsRolledOut(deployments) {
			return flow.Wait("Some deployment of CDC is rolling.")
		}
		return flow.Continue("Deployments of CDC are rolled out.")
	},
)
View Source
// WaitUntilCDCPodsStable is the "WaitUntilCDCPodsStable" step. It counts the
// CDC pods that carry at least one finalizer and compares that count with the
// CDC replicas from the topology in the PolarDBX status spec snapshot (zero
// when no CDC template is set). The step ends with flow.Pass when the two
// match, and with flow.Wait otherwise.
var WaitUntilCDCPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilCDCPodsStable",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		polardbx := rc.MustGetPolarDBX()

		cdcPods, err := rc.GetPods(polardbxmeta.RoleCDC)
		if err != nil {
			// The message names CDC, the role actually being queried.
			return flow.Error(err, "Unable to get pods of CDC.")
		}

		// Keep only the pods that have at least one finalizer.
		podsWithFinalizers := k8shelper.FilterPodsBy(cdcPods, func(pod *corev1.Pod) bool {
			return len(pod.Finalizers) > 0
		})

		// The CDC template is optional; a nil template means zero replicas.
		cdcTemplate := polardbx.Status.SpecSnapshot.Topology.Nodes.CDC
		cdcReplicas := 0
		if cdcTemplate != nil {
			cdcReplicas = int(cdcTemplate.Replicas)
		}
		if len(podsWithFinalizers) == cdcReplicas {
			return flow.Pass()
		}
		return flow.Wait("Wait until some pod to be finalized.")
	},
)
View Source
// WaitUntilCNDeploymentsRolledOut is the "WaitUntilCNDeploymentsRolledOut"
// step. It fetches the deployment map of the CN role and ends with
// flow.Continue when areDeploymentsRolledOut reports true for it, and with
// flow.Wait otherwise.
var WaitUntilCNDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilCNDeploymentsRolledOut",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		deployments, err := rc.GetDeploymentMap(polardbxmeta.RoleCN)
		if err != nil {
			return flow.Error(err, "Unable to get deployments of CN.")
		}

		if !areDeploymentsRolledOut(deployments) {
			return flow.Wait("Some deployment of CN is rolling.")
		}
		return flow.Continue("Deployments of CN are rolled out.")
	},
)
View Source
// WaitUntilCNPodsStable is the "WaitUntilCNPodsStable" step. It counts the CN
// pods that carry at least one finalizer and compares that count with the CN
// replicas from the topology in the PolarDBX status spec snapshot. The step
// ends with flow.Pass when the two match, and with flow.Wait otherwise.
var WaitUntilCNPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilCNPodsStable",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		polardbx := rc.MustGetPolarDBX()

		cnPods, err := rc.GetPods(polardbxmeta.RoleCN)
		if err != nil {
			return flow.Error(err, "Unable to get pods of CN.")
		}

		// Keep only the pods that have at least one finalizer.
		hasFinalizers := func(pod *corev1.Pod) bool {
			return len(pod.Finalizers) > 0
		}
		podsWithFinalizers := k8shelper.FilterPodsBy(cnPods, hasFinalizers)

		expected := int(polardbx.Status.SpecSnapshot.Topology.Nodes.CN.Replicas)
		if len(podsWithFinalizers) != expected {
			return flow.Wait("Wait until some pod to be finalized.")
		}
		return flow.Pass()
	},
)
View Source
// WaitUntilDNsReady is the "WaitUntilDNsReady" step. It iterates over the DN
// xstore map: entries with an index >= the DN replicas from the topology in
// the PolarDBX status spec snapshot are skipped; an xstore in the failed
// phase transfers the PolarDBX into the failed phase and ends the step with
// flow.Retry; the rest are checked with isXStoreReady. The step ends with
// flow.Wait when any checked xstore is not ready and with flow.Continue
// otherwise.
var WaitUntilDNsReady = polardbxv1reconcile.NewStepBinder("WaitUntilDNsReady",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		polardbx := rc.MustGetPolarDBX()
		replicas := int(polardbx.Status.SpecSnapshot.Topology.Nodes.DN.Replicas)

		dnStores, err := rc.GetDNMap()
		if err != nil {
			return flow.Error(err, "Unable to list xstores of DNs.")
		}

		var notReadyCnt, skipCnt int
		for index, xstore := range dnStores {
			switch {
			case index >= replicas:
				// Beyond the desired replicas: not considered for readiness.
				skipCnt++
			case xstore.Status.Phase == polardbxv1xstore.PhaseFailed:
				helper.TransferPhase(rc.MustGetPolarDBX(), polardbxv1polardbx.PhaseFailed)
				return flow.Retry("XStore of DN is failed, transfer phase into failed.", "xstore", xstore.Name)
			case !isXStoreReady(xstore):
				notReadyCnt++
			}
		}

		if notReadyCnt == 0 {
			return flow.Continue("XStores of DNs are ready.", "skip", skipCnt)
		}
		return flow.Wait("Some xstore of DN is not ready, wait.", "not-ready", notReadyCnt, "skip", skipCnt)
	},
)
View Source
// WaitUntilGMSReady is the "WaitUntilGMSReady" step. It looks up the GMS
// xstore (not-found errors are ignored) and ends with flow.Wait while the
// xstore does not exist or isXStoreReady reports false. An xstore in the
// failed phase transfers the PolarDBX into the failed phase and ends the step
// with flow.Retry. A ready xstore ends it with flow.Continue.
var WaitUntilGMSReady = polardbxv1reconcile.NewStepBinder("WaitUntilGMSReady",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		gmsStore, err := rc.GetGMS()
		switch {
		case client.IgnoreNotFound(err) != nil:
			return flow.Error(err, "Unable to get xstore of GMS.")
		case gmsStore == nil:
			return flow.Wait("XStore of GMS not created.")
		case gmsStore.Status.Phase == polardbxv1xstore.PhaseFailed:
			helper.TransferPhase(rc.MustGetPolarDBX(), polardbxv1polardbx.PhaseFailed)
			return flow.Retry("XStore of GMS is failed, transfer phase into failed.")
		}

		if !isXStoreReady(gmsStore) {
			return flow.Wait("XStore of GMS isn't ready, wait.", "xstore", gmsStore.Name, "xstore.phase", gmsStore.Status.Phase)
		}
		return flow.Continue("XStore of GMS is ready.")
	},
)

Functions

func RemoveAnnotation

func RemoveAnnotation(key string) control.BindFunc

Types

This section is empty.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL