Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var CreateConfigMapsIfNotFound = polardbxv1reconcile.NewStepBinder("CreateConfigMapsIfNotFound", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { objectFactory := factory.NewObjectFactory(rc) for _, cmType := range []convention.ConfigMapType{ convention.ConfigMapTypeConfig, convention.ConfigMapTypeTask, } { cm, err := rc.GetPolarDBXConfigMap(cmType) if client.IgnoreNotFound(err) != nil { return flow.Error(err, "Unable to get config map.", "configmap-type", cmType) } if cm == nil { cm, err := objectFactory.NewConfigMap(cmType) if err != nil { return flow.Error(err, "Unable to new config map.", "configmap-type", cmType) } err = rc.SetControllerRefAndCreate(cm) if err != nil { return flow.Error(err, "Unable to create config map.", "configmap-type", cmType) } } } return flow.Pass() }, )
View Source
// CreateOrReconcileCDCs is a reconcile step that delegates to
// reconcileGroupedDeployments for the CDC role.
var CreateOrReconcileCDCs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileCDCs",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		const role = polardbxmeta.RoleCDC
		return reconcileGroupedDeployments(rc, flow, role)
	},
)
View Source
// CreateOrReconcileCNs is a reconcile step that delegates to
// reconcileGroupedDeployments for the CN role.
var CreateOrReconcileCNs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileCNs",
	func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) {
		const role = polardbxmeta.RoleCN
		return reconcileGroupedDeployments(rc, flow, role)
	},
)
View Source
var CreateOrReconcileDNs = polardbxv1reconcile.NewStepBinder("CreateOrReconcileDNs", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { polardbx := rc.MustGetPolarDBX() topology := polardbx.Status.SpecSnapshot.Topology observedGeneration := polardbx.Status.ObservedGeneration replicas := int(topology.Nodes.DN.Replicas) dnStores, err := rc.GetDNMap() if err != nil { return flow.Error(err, "Unable to get xstores of DN.") } if !helper.IsPhaseIn(polardbx, polardbxv1polardbx.PhaseCreating, polardbxv1polardbx.PhaseRestoring) { lastIndex := 0 for ; lastIndex < replicas; lastIndex++ { if _, ok := dnStores[lastIndex]; !ok { break } } if lastIndex != replicas && lastIndex != len(dnStores) { helper.TransferPhase(polardbx, polardbxv1polardbx.PhaseFailed) return flow.Retry("Found broken DN, transfer into failed.") } } objectFactory := factory.NewObjectFactory(rc) anyChanged := false wg := &sync.WaitGroup{} errs := make([]error, replicas) var errCnt uint32 for i := 0; i < replicas; i++ { observedDnStore, ok := dnStores[i] if !ok { newDnStore, err := objectFactory.NewXStoreDN(i) if err != nil { return flow.Error(err, "Unable to new xstore of DN.", "index", i) } wg.Add(1) logger, idx := flow.Logger(), i go func() { defer wg.Done() err = rc.SetControllerRefAndCreate(newDnStore) if err != nil { logger.Error(err, "Unable to create xstore of DN.", "index", i) errs[idx] = err atomic.AddUint32(&errCnt, 1) } }() anyChanged = true } else { generation, err := convention.GetGenerationLabelValue(observedDnStore) if err != nil { return flow.Error(err, "Unable to get generation from xstore of DN.", "index", i) } if generation < observedGeneration { if !featuregate.StoreUpgrade.Enabled() { flow.Logger().Info("Feature 'StoreUpgrade' not enabled, skip upgrade.", "xstore", observedDnStore.Name) continue } newDnStore, err := objectFactory.NewXStoreDN(i) if err != nil { return flow.Error(err, "Unable to new xstore of DN.", "index", i) } 
convention.CopyMetadataForUpdate(&newDnStore.ObjectMeta, &observedDnStore.ObjectMeta, observedGeneration) wg.Add(1) logger, idx := flow.Logger(), i go func() { defer wg.Done() err = rc.Client().Update(rc.Context(), newDnStore) if err != nil { logger.Error(err, "Unable to update xstore of DN.", "index", i) errs[idx] = err atomic.AddUint32(&errCnt, 1) } }() anyChanged = true } } } wg.Wait() if errCnt > 0 { var firstErr error for _, err := range errs { if err != nil { firstErr = err break } } return flow.Error(firstErr, "Unable to create or reconcile xstores of DN.", "error-cnt", errCnt) } toRemoveStores := make(map[int]*polardbxv1.XStore, 0) for index, xstore := range dnStores { if index >= replicas { toRemoveStores[index] = xstore } } if len(toRemoveStores) > 0 { mgr, err := rc.GetPolarDBXGMSManager() if err != nil { return flow.Error(err, "Unable to get manager for GMS.") } storageNodeIds := make(map[string]int) if initialized, err := mgr.IsMetaDBInitialized(); err != nil { return flow.Error(err, "Unable to determine if GMS is initialized.") } else if initialized { storageNodes, err := mgr.ListStorageNodes(gms.StorageKindMaster) if err != nil { return flow.Error(err, "Unable to list storage nodes in GMS.") } for _, s := range storageNodes { storageNodeIds[s.Id] = 1 } } else { } canRemoveStoreIndices := make([]int, 0) for index, xstore := range toRemoveStores { if _, found := storageNodeIds[xstore.Name]; !found { canRemoveStoreIndices = append(canRemoveStoreIndices, index) } } if len(canRemoveStoreIndices) > 0 { sort.Slice(canRemoveStoreIndices, func(i, j int) bool { return canRemoveStoreIndices[i] > canRemoveStoreIndices[j] }) flow.Logger().Info("Trying to remove trailing xstores (not in use)", "trailing-indices", canRemoveStoreIndices) for _, index := range canRemoveStoreIndices { xstore := dnStores[index] err := rc.Client().Delete(rc.Context(), xstore, client.PropagationPolicy(metav1.DeletePropagationBackground)) if err != nil { return flow.Error(err, "Unable to 
delete unused trailing xstore.", "xstore", xstore.Name) } } flow.Logger().Info("Unused trailing xstores are all removed.") } } if anyChanged { return flow.Retry("DNs created or updated!") } return flow.Pass() }, )
View Source
var CreateOrReconcileGMS = polardbxv1reconcile.NewStepBinder("CreateOrReconcileGMS", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { polardbx := rc.MustGetPolarDBX() if polardbx.Spec.ShareGMS { return flow.Continue("GMS shared, skip.") } gmsStore, err := rc.GetGMS() if client.IgnoreNotFound(err) != nil { return flow.Continue("Unable to get xstore of GMS.") } if gmsStore == nil { if !helper.IsPhaseIn(polardbx, polardbxv1polardbx.PhaseCreating, polardbxv1polardbx.PhaseRestoring) { helper.TransferPhase(polardbx, polardbxv1polardbx.PhaseFailed) return flow.Retry("GMS not found, transfer into failed.") } objectFactory := factory.NewObjectFactory(rc) gmsStore, err := objectFactory.NewXStoreGMS() if err != nil { return flow.Error(err, "Unable to new xstore of GMS.") } err = rc.SetControllerRefAndCreate(gmsStore) if err != nil { return flow.Error(err, "Unable to create xstore of GMS.") } return flow.Continue("GMS xstore created!") } else { gmsStoreGeneration, err := convention.GetGenerationLabelValue(gmsStore) if err != nil { return flow.Error(err, "Unable to get generation from xstore of GMS.") } if gmsStoreGeneration < polardbx.Status.ObservedGeneration { if !featuregate.StoreUpgrade.Enabled() { return flow.Continue("Feature 'StoreUpgrade' not enabled, skip upgrade.", "xstore", gmsStore.Name) } objectFactory := factory.NewObjectFactory(rc) newGmsStore, err := objectFactory.NewXStoreGMS() if err != nil { return flow.Error(err, "Unable to new xstore of GMS.") } convention.CopyMetadataForUpdate(&newGmsStore.ObjectMeta, &gmsStore.ObjectMeta, polardbx.Status.ObservedGeneration) err = rc.Client().Update(rc.Context(), newGmsStore) if err != nil { return flow.Error(err, "Unable to update xstore of GMS.", "generation", polardbx.Status.ObservedGeneration) } return flow.Continue("GMS xstore updated!") } return flow.Pass() } }, )
View Source
var CreateSecretsIfNotFound = polardbxv1reconcile.NewStepBinder("CreateSecretsIfNotFound", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { accountSecret, err := rc.GetPolarDBXSecret(convention.SecretTypeAccount) if client.IgnoreNotFound(err) != nil { return flow.Error(err, "Unable to get account secret.") } if accountSecret == nil { accountSecret, err = factory.NewObjectFactory(rc).NewSecret() if err != nil { return flow.Error(err, "Unable to new account secret.") } err = rc.SetControllerRefAndCreate(accountSecret) if err != nil { return flow.Error(err, "Unable to create account secret.") } } keySecret, err := rc.GetPolarDBXSecret(convention.SecretTypeSecurity) if client.IgnoreNotFound(err) != nil { return flow.Error(err, "Unable to get encode key secret.") } if keySecret == nil { keySecret, err = factory.NewObjectFactory(rc).NewSecuritySecret() if err != nil { return flow.Error(err, "Unable to new encode key secret.") } err = rc.SetControllerRefAndCreate(keySecret) if err != nil { return flow.Error(err, "Unable to create encode key secret.") } } return flow.Pass() }, )
View Source
var CreateServicesIfNotFound = polardbxv1reconcile.NewStepBinder("CreateServicesIfNotFound", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { objectFactory := factory.NewObjectFactory(rc) for _, serviceType := range []convention.ServiceType{ convention.ServiceTypeReadWrite, convention.ServiceTypeReadOnly, convention.ServiceTypeCDCMetrics, } { service, err := rc.GetPolarDBXService(serviceType) if client.IgnoreNotFound(err) != nil { return flow.Error(err, "Unable to get service", "service-type", serviceType) } if service == nil { switch serviceType { case convention.ServiceTypeReadWrite: service, err = objectFactory.NewService() case convention.ServiceTypeReadOnly: service, err = objectFactory.NewReadOnlyService() case convention.ServiceTypeCDCMetrics: service, err = objectFactory.NewCDCMetricsService() default: panic("unimplemented") } if err != nil { return flow.Error(err, "Unable new service.", "service-type", serviceType) } err = rc.SetControllerRefAndCreate(service) if err != nil { return flow.Error(err, "Unable to create service.", "service-type", serviceType) } } } return flow.Pass() }, )
View Source
var RemoveTrailingDNs = polardbxv1reconcile.NewStepBinder("RemoveTrailingDNs", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { polardbx := rc.MustGetPolarDBX() topology := polardbx.Status.SpecSnapshot.Topology replicas := int(topology.Nodes.DN.Replicas) dnStores, err := rc.GetOrderedDNList() if err != nil { return flow.Error(err, "Unable to get xstores of DN.") } for i := len(dnStores) - 1; i >= replicas; i-- { if dnStores[i].DeletionTimestamp.IsZero() { err := rc.Client().Delete(rc.Context(), dnStores[i], client.PropagationPolicy(metav1.DeletePropagationBackground)) if err != nil { return flow.Error(err, "Unable to remove trailing DN.", "xstore", dnStores[i].Name) } } } if len(dnStores) > replicas { return flow.Retry("Trailing DNs are deleted.") } return flow.Pass() }, )
View Source
var WaitUntilCDCDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilCDCDeploymentsRolledOut", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { cdcDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleCDC) if err != nil { return flow.Error(err, "Unable to get deployments of CDC.") } if areDeploymentsRolledOut(cdcDeployments) { return flow.Continue("Deployments of CDC are rolled out.") } return flow.Wait("Some deployment of CDC is rolling.") }, )
View Source
var WaitUntilCDCPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilCDCPodsStable", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { polardbx := rc.MustGetPolarDBX() cdcPods, err := rc.GetPods(polardbxmeta.RoleCDC) if err != nil { return flow.Error(err, "Unable to get pods of CN.") } unFinalizedPodsSize := k8shelper.FilterPodsBy(cdcPods, func(pod *corev1.Pod) bool { return len(pod.Finalizers) > 0 }) cdcTemplate := polardbx.Status.SpecSnapshot.Topology.Nodes.CDC cdcReplicas := 0 if cdcTemplate != nil { cdcReplicas = int(cdcTemplate.Replicas) } if len(unFinalizedPodsSize) == cdcReplicas { return flow.Pass() } return flow.Wait("Wait until some pod to be finalized.") }, )
View Source
var WaitUntilCNDeploymentsRolledOut = polardbxv1reconcile.NewStepBinder("WaitUntilCNDeploymentsRolledOut", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { cnDeployments, err := rc.GetDeploymentMap(polardbxmeta.RoleCN) if err != nil { return flow.Error(err, "Unable to get deployments of CN.") } if areDeploymentsRolledOut(cnDeployments) { return flow.Continue("Deployments of CN are rolled out.") } return flow.Wait("Some deployment of CN is rolling.") }, )
View Source
var WaitUntilCNPodsStable = polardbxv1reconcile.NewStepBinder("WaitUntilCNPodsStable", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { polardbx := rc.MustGetPolarDBX() cnPods, err := rc.GetPods(polardbxmeta.RoleCN) if err != nil { return flow.Error(err, "Unable to get pods of CN.") } unFinalizedPodsSize := k8shelper.FilterPodsBy(cnPods, func(pod *corev1.Pod) bool { return len(pod.Finalizers) > 0 }) cnTemplate := &polardbx.Status.SpecSnapshot.Topology.Nodes.CN if len(unFinalizedPodsSize) == int(cnTemplate.Replicas) { return flow.Pass() } return flow.Wait("Wait until some pod to be finalized.") }, )
View Source
var WaitUntilDNsReady = polardbxv1reconcile.NewStepBinder("WaitUntilDNsReady", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { polardbx := rc.MustGetPolarDBX() topology := &polardbx.Status.SpecSnapshot.Topology dnStores, err := rc.GetDNMap() if err != nil { return flow.Error(err, "Unable to list xstores of DNs.") } notReadyCnt, skipCnt := 0, 0 for i, dnStore := range dnStores { if i >= int(topology.Nodes.DN.Replicas) { skipCnt++ continue } if dnStore.Status.Phase == polardbxv1xstore.PhaseFailed { helper.TransferPhase(rc.MustGetPolarDBX(), polardbxv1polardbx.PhaseFailed) return flow.Retry("XStore of DN is failed, transfer phase into failed.", "xstore", dnStore.Name) } if !isXStoreReady(dnStore) { notReadyCnt++ } } if notReadyCnt > 0 { return flow.Wait("Some xstore of DN is not ready, wait.", "not-ready", notReadyCnt, "skip", skipCnt) } return flow.Continue("XStores of DNs are ready.", "skip", skipCnt) }, )
View Source
var WaitUntilGMSReady = polardbxv1reconcile.NewStepBinder("WaitUntilGMSReady", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { gms, err := rc.GetGMS() if client.IgnoreNotFound(err) != nil { return flow.Error(err, "Unable to get xstore of GMS.") } if gms == nil { return flow.Wait("XStore of GMS not created.") } if gms.Status.Phase == polardbxv1xstore.PhaseFailed { helper.TransferPhase(rc.MustGetPolarDBX(), polardbxv1polardbx.PhaseFailed) return flow.Retry("XStore of GMS is failed, transfer phase into failed.") } if isXStoreReady(gms) { return flow.Continue("XStore of GMS is ready.") } else { return flow.Wait("XStore of GMS isn't ready, wait.", "xstore", gms.Name, "xstore.phase", gms.Status.Phase) } }, )
Functions ¶
func RemoveAnnotation ¶
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.