Documentation
¶
Index ¶
- Variables
- func CancelHpfsAsyncTasks(ctx context.Context, client hpfs.HpfsServiceClient, pod *corev1.Pod) error
- func CreatePodsAndHeadlessServicesWithExtraFactory(extraPodFactory factory.ExtraPodFactory) control.BindFunc
- func DeleteHostPathVolume(ctx context.Context, hpfsClient hpfs.HpfsServiceClient, ...) error
- func IsDiskQuotaExceeds(xstore *polardbxv1.XStore) bool
- func IsHpfsAsyncTaskComplete(ctx context.Context, client hpfs.HpfsServiceClient, pod *corev1.Pod) (bool, error)
- func LoadExecutionContext(rc *xstorev1reconcile.Context) (*context.ExecutionContext, error)
- func NewExecutionContext(rc *xstorev1reconcile.Context, xstore *polardbxv1.XStore, selfHeal bool) (*context.ExecutionContext, error)
- func ReportRoleAndCurrentLeader(rc *xstorev1reconcile.Context, pod *corev1.Pod, logger logr.Logger) (string, string, error)
- func SyncBlkioCgroupValuesViaHpfs(ctx context.Context, hpfsClient hpfs.HpfsServiceClient, pod *corev1.Pod, ...) error
- func TrackAndLazyUpdateExecuteContext(current *context.ExecutionContext) control.BindFunc
- func TryDetectLeaderAndTryReconcileLabels(rc *xstorev1reconcile.Context, pods []corev1.Pod, logger logr.Logger) (string, bool)
- func TryDetectLeaderChange(rc *xstorev1reconcile.Context, pods []corev1.Pod, logger logr.Logger) (string, bool)
- func TryReconcileLabels(rc *xstorev1reconcile.Context, pods []corev1.Pod, leaderPod string, ...)
- func UpdateHostPathVolumeSizesTemplate(d time.Duration) control.BindFunc
- func UpdatePhaseTemplate(phase polardbxv1xstore.Phase, requeue ...bool) control.BindFunc
- func UpdateStageTemplate(stage polardbxv1xstore.Stage, requeue ...bool) control.BindFunc
- func WhenDiskQuotaExceeds(binders ...control.BindFunc) control.BindFunc
- func WhenDiskQuotaNotExceeds(binders ...control.BindFunc) control.BindFunc
- func WhenPodsDeletedFound(binders ...control.BindFunc) control.BindFunc
- func WhenTopologyChanged(binders ...control.BindFunc) control.BindFunc
Constants ¶
This section is empty.
Variables ¶
View Source
var AbortReconcileIfHintFound = xstorev1reconcile.NewStepBinder("AbortReconcileIfHintFound", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { if rc.ContainsControllerHint(xstoremeta.HintForbidden) { return flow.Wait("Found hint, abort reconcile.") } return flow.Pass() }, )
View Source
var AddLearnerNodesToClusterOnLeader = xstorev1reconcile.NewStepBinder("AddLearnerNodesToClusterOnLeader", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get xstore pods.") } learnerPods := k8shelper.FilterPodsBy(pods, xstoremeta.IsPodRoleLearner) if len(learnerPods) == 0 { return flow.Pass() } leaderPod, err := rc.TryGetXStoreLeaderPod() if err != nil { return flow.Error(err, "Unable to get leader pod.") } if leaderPod == nil { return flow.Wait("Leader not found, keep waiting...") } for _, learnerPod := range learnerPods { cmd := xstoreexec.NewCanonicalCommandBuilder(). Consensus(). AddNode(learnerPod.Name, xstoremeta.RoleLearner). Build() err := rc.ExecuteCommandOn(leaderPod, convention.ContainerEngine, cmd, control.ExecOptions{ Logger: flow.Logger(), Timeout: 2 * time.Second, }) if err != nil { return flow.Error(err, "Unable to add learner node.", "pod", learnerPod.Name) } } return flow.Continue("Learner nodes added.") }, )
View Source
var BindHostPathVolumesToHost = xstorev1reconcile.NewStepBinder("BindHostPathVolumesToHost", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } binding := xstore.Status.BoundVolumes for _, pod := range pods { if len(pod.Spec.NodeName) == 0 { return flow.Wait("Some pod is still not scheduled.", "pod", pod.Name) } binding[pod.Name].Host = pod.Spec.NodeName } return flow.Pass() }, )
View Source
var BindPodPorts = xstorev1reconcile.NewStepBinder("BindPodPorts", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } podPorts := make(map[string]polardbxv1xstore.PodPorts) for _, pod := range pods { ports := polardbxv1xstore.PodPorts{} for _, container := range pod.Spec.Containers { for _, port := range container.Ports { ports[port.Name] = port.ContainerPort } } podPorts[pod.Name] = ports } xstore.Status.PodPorts = podPorts return flow.Continue("Pod ports updated!") }, )
View Source
var CancelAsyncTasks = xstorev1reconcile.NewStepBinder("CancelAsyncTasks", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } hpfsClient, err := rc.GetHpfsClient() if err != nil { return flow.Error(err, "Unable to get hpfs client.") } for _, pod := range pods { err := CancelHpfsAsyncTasks(rc.Context(), hpfsClient, &pod) if err != nil { return flow.Error(err, "Unable to cancel async task", "pod", pod.Name) } } return flow.Pass() }, )
View Source
var CheckConnectivityAndSetEngineVersion = xstorev1reconcile.NewStepBinder("CheckConnectivityFromController", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { passwd, err := rc.GetXStoreAccountPassword(convention.SuperAccount) if err != nil { return flow.Error(err, "Unable to get password for super account.") } clusterAddr, err := rc.GetXStoreClusterAddr(convention.ServiceTypeReadWrite, convention.PortAccess) if err != nil { return flow.Error(err, "Unable to get cluster address.") } db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s)/information_schema?timeout=1s", convention.SuperAccount, passwd, clusterAddr)) if err != nil { return flow.Error(err, "Unable to open connection to cluster address.") } defer db.Close() if err := db.PingContext(rc.Context()); err != nil { flow.Logger().Error(err, "Ping failed.") return flow.RetryAfter(10*time.Second, "Failed to ping, wait for 10 seconds and retry...") } row := db.QueryRowContext(rc.Context(), "SELECT VERSION()") var version string err = row.Scan(&version) if err != nil { return flow.Error(err, "Unable to read version") } xstore := rc.MustGetXStore() xstore.Status.EngineVersion = version return flow.Continue("Succeed.") }, )
View Source
var CheckTopologySpec = xstorev1reconcile.NewStepBinder("CheckTopologySpec", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() err := checkTopologySpec(xstore) if err != nil { xstore.Status.Phase = polardbxv1xstore.PhaseFailed return flow.Error(err, "Check topology failed. Transfer phase into Failed.") } return flow.Pass() }, )
View Source
var CreateAccounts = xstorev1reconcile.NewStepBinder("CreateAccounts", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { leaderPod, err := rc.TryGetXStoreLeaderPod() if err != nil { return flow.Error(err, "Unable to get leader pod.") } if leaderPod == nil { return flow.RetryAfter(5*time.Second, "Leader not found, wait 5 seconds and retry...") } secret, err := rc.GetXStoreSecret() if err != nil { return flow.Error(err, "Unable to get secret.") } for user, passwd := range secret.Data { cmd := xstoreexec.NewCanonicalCommandBuilder(). Account().Create(user, string(passwd)). Build() err := rc.ExecuteCommandOn(leaderPod, "engine", cmd, control.ExecOptions{ Logger: flow.Logger(), }) if err != nil { if k8shelper.IsExitError(err) { flow.Logger().Error(err, "Failed to create account.") return flow.Wait("Failed to create account.", "leader-pod", leaderPod.Name) } return flow.Error(err, "Unable to create account.", "leader-pod", leaderPod.Name) } } return flow.Continue("All accounts are created.") }, )
View Source
var CreateSecret = xstorev1reconcile.NewStepBinder("CreateSecret", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() secret, err := rc.GetXStoreSecret() if client.IgnoreNotFound(err) != nil { return flow.Error(err, "Unable to get secret.") } if secret == nil { secret = factory.NewSecret(xstore) err := rc.SetControllerRefAndCreate(secret) if err != nil { return flow.Error(err, "Unable to create secret.") } } return flow.Continue("Secret ready.") }, )
View Source
var DeleteAllPods = xstorev1reconcile.NewStepBinder("DeleteAllPods", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } for _, pod := range pods { if pod.DeletionTimestamp.IsZero() { if err := rc.Client().Delete(rc.Context(), &pod); err != nil { if apierrors.IsNotFound(err) { continue } return flow.Error(err, "Unable to delete pod.", "pod", pod.Name) } } } return flow.Continue("All pods deleted.") }, )
View Source
var DeleteExecutionContext = xstorev1reconcile.NewStepBinder("DeleteExecutionContext", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { taskCm, err := rc.GetXStoreConfigMap(convention.ConfigMapTypeTask) if err != nil { return flow.Error(err, "Unable to get task config map.") } _, ok := taskCm.Data["exec"] if !ok { return flow.Pass() } delete(taskCm.Data, "exec") if err := rc.Client().Update(rc.Context(), taskCm); err != nil { return flow.Error(err, "Unable to update tass config map.") } return flow.Continue("Deleted!") }, )
View Source
var DeleteHostPathVolumes = xstorev1reconcile.NewStepBinder("DeleteHostPathVolumes", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() volumes := xstore.Status.BoundVolumes if volumes == nil { return flow.Pass() } nodes, err := rc.GetNodes() if err != nil { return flow.Error(err, "Unable to get nodes.") } observedNodes := k8shelper.ToObjectNameSet(nodes) pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } podMap := k8shelper.BuildPodMap(pods, func(pod *corev1.Pod) string { return pod.Name }) hpfsClient, err := rc.GetHpfsClient() if err != nil { return flow.Error(err, "Unable to get hpfs client.") } for podName, vol := range volumes { if len(vol.Host) == 0 { pod := podMap[podName] if pod != nil { vol.Host = pod.Spec.NodeName } } if _, ok := observedNodes[vol.Host]; !ok { continue } err := DeleteHostPathVolume(rc.Context(), hpfsClient, vol) if err != nil { return flow.Error(err, "Unable to remove host path volume.", "vol.pod", podName, "vol.host", vol.Host, "vol.type", vol.Type, "vol.path", vol.HostPath) } } xstore.Status.BoundVolumes = nil return flow.Continue("Volumes are deleted.") })
View Source
var FillServiceNameIfNotProvided = xstorev1reconcile.NewStepBinder("FillServiceNameIfNotProvided", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() if len(xstore.Spec.ServiceName) == 0 { xstore.Spec.ServiceName = xstore.Name rc.MarkXStoreChanged() } return flow.Pass() }, )
View Source
var GenerateRandInStatus = xstorev1reconcile.NewStepBinder("GenerateRandInStatus", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() if len(xstore.Status.Rand) == 0 { if val, ok := xstore.Annotations[xstoremeta.AnnotationGuideRand]; ok { xstore.Status.Rand = val } else { xstore.Status.Rand = rand.String(4) } } return flow.Pass() }, )
View Source
var InjectFinalizerOnXStore = xstorev1reconcile.NewStepBinder("InjectFinalizerOnXStore", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() if controllerutil.ContainsFinalizer(xstore, xstoremeta.Finalizer) { return flow.Pass() } controllerutil.AddFinalizer(xstore, xstoremeta.Finalizer) rc.MarkXStoreChanged() return flow.Continue("Inject finalizer.") }, )
View Source
var MoveToPhaseDeletingIfDeleted = xstorev1reconcile.NewStepBinder("MoveToPhaseDeletingIfDeleted", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore, err := rc.GetXStore() if err != nil { return flow.Error(err, "Unable to get xstore.") } if xstore.Status.Phase == polardbxv1xstore.PhaseDeleting { return flow.Pass() } if !xstore.DeletionTimestamp.IsZero() { if len(xstore.Finalizers) == 0 || (len(xstore.Finalizers) == 1 && controllerutil.ContainsFinalizer(xstore, xstoremeta.Finalizer)) { xstore.Status.Phase = polardbxv1xstore.PhaseDeleting xstore.Status.Stage = polardbxv1xstore.StageEmpty return flow.Retry("Move phase to deleting. Retry immediately!") } else { return flow.Wait("Other finalizers found, wait until removed...") } } return flow.Pass() })
View Source
var PersistentStatus = xstorev1reconcile.NewStepBinder("PersistentStatus", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { if rc.IsXStoreStatusChanged() { if err := rc.UpdateXStoreStatus(); err != nil { return flow.Error(err, "Unable to persistent status.") } return flow.Continue("Succeeds to persistent status.") } return flow.Continue("Status not changed.") })
View Source
var PersistentXStore = xstorev1reconcile.NewStepBinder("PersistentXStore", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { if rc.IsXStoreChanged() { if err := rc.UpdateXStore(); err != nil { return flow.Error(err, "Unable to persistent xstore.") } return flow.Continue("Succeeds to persistent xstore.") } return flow.Continue("Object not changed.") })
View Source
var PrepareHostPathVolumes = xstorev1reconcile.NewStepBinder("PrepareHostPathVolumes", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() podVolumes := preparePodVolumeBindings(xstore, rc.Config()) volumes := make(map[string]*polardbxv1xstore.HostPathVolume) for pod, vPath := range podVolumes { volumes[pod] = &polardbxv1xstore.HostPathVolume{ Pod: pod, HostPath: vPath, Type: corev1.HostPathDirectory, } } xstore.Status.BoundVolumes = volumes return flow.Continue("Host path volumes prepared.") }, )
View Source
var QueryAndUpdateEngineVersion = xstorev1reconcile.NewStepBinder("QueryAndUpdateEngineVersion", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { leaderPod, err := rc.TryGetXStoreLeaderPod() if err != nil { return flow.Error(err, "Unable to get leader pod.") } if leaderPod == nil { return flow.Wait("Leader pod not found, wait.") } cmd := command.NewCanonicalCommandBuilder().Engine().Version().Build() buf := &bytes.Buffer{} err = rc.ExecuteCommandOn(leaderPod, convention.ContainerEngine, cmd, control.ExecOptions{ Logger: flow.Logger(), Stdout: buf, Timeout: 2 * time.Second, }) if err != nil { return flow.Error(err, "Failed to query version on leader pod.", "pod", leaderPod.Name) } engineVersion := strings.TrimSpace(buf.String()) if engineVersion == "" { return flow.Error(errors.New("empty engine version"), "Engine version is empty.") } xstore := rc.MustGetXStore() xstore.Status.EngineVersion = engineVersion return flow.Pass() }, )
View Source
var ReconcileConsensusRoleLabels = xstorev1reconcile.NewStepBinder("ReconcileConsensusRoleLabels", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } flow.Logger().Info("Try detecting leader and reconciling the labels...") currentLeader, leaderSwitched := TryDetectLeaderAndTryReconcileLabels(rc, pods, flow.Logger()) if len(currentLeader) == 0 { xstore.Status.LeaderPod = "" rc.UpdateXStoreCondition(&polardbxv1xstore.Condition{ Type: polardbxv1xstore.LeaderReady, Status: corev1.ConditionFalse, Reason: "LeaderNotFound", Message: "Leader not found", }) return flow.Continue("Leader not found!") } else if leaderSwitched { xstore.Status.LeaderPod = currentLeader rc.UpdateXStoreCondition(&polardbxv1xstore.Condition{ Type: polardbxv1xstore.LeaderReady, Status: corev1.ConditionTrue, Reason: "LeaderFound", Message: "Leader found: " + currentLeader, }) return flow.Continue("Leader changed!", "leader-pod", currentLeader) } else { xstore.Status.LeaderPod = currentLeader rc.UpdateXStoreCondition(&polardbxv1xstore.Condition{ Type: polardbxv1xstore.LeaderReady, Status: corev1.ConditionTrue, Reason: "LeaderFound", Message: "Leader found: " + currentLeader, }) return flow.Continue("Leader not changed.", "leader-pod", currentLeader) } }, )
View Source
var RemoveFinalizerFromXStore = xstorev1reconcile.NewStepBinder("RemoveFinalizerFromXStore", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() if !controllerutil.ContainsFinalizer(xstore, xstoremeta.Finalizer) { return flow.Pass() } controllerutil.RemoveFinalizer(xstore, xstoremeta.Finalizer) rc.MarkXStoreChanged() return flow.Continue("Remove finalizer.") }, )
View Source
var SyncBlkioCgroupResourceLimits = xstorev1reconcile.NewStepBinder("SyncBlkioCgroupResourceLimits", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } hpfsClient, err := rc.GetHpfsClient() if err != nil { return flow.Error(err, "Unable to get hpfs client") } ctx, cancel := context.WithTimeout(rc.Context(), 10*time.Second) defer cancel() xstore := rc.MustGetXStore() topology := xstore.Spec.Topology nodeSets := make(map[string]*polardbxv1xstore.NodeSet) for i := range topology.NodeSets { ns := &topology.NodeSets[i] nodeSets[ns.Name] = ns } volumes := xstore.Status.BoundVolumes for _, pod := range pods { ns := nodeSets[pod.Labels[xstoremeta.LabelNodeSet]] if ns == nil { continue } resources := topology.Template.Spec.Resources if ns.Template != nil && ns.Template.Spec.Resources != nil { resources = ns.Template.Spec.Resources } if resources == nil || resources.LimitsIO == nil { continue } blkioVal, ok := polardbxv1common.ResourceBlkioValueStr(resources.LimitsIO) if !ok { continue } annotationVal := pod.ObjectMeta.Annotations[xstoremeta.AnnotationBlkioResourceLimit] if annotationVal == blkioVal { continue } err := SyncBlkioCgroupValuesViaHpfs(ctx, hpfsClient, &pod, volumes[pod.Name], resources) if err != nil { return flow.Error(err, "Failed to sync blkio cgroup values.", "host", pod.Spec.NodeName, "pod", pod.Name, "blkio", blkioVal) } metav1.SetMetaDataAnnotation(&pod.ObjectMeta, xstoremeta.AnnotationBlkioResourceLimit, blkioVal) if err := rc.Client().Update(ctx, &pod); err != nil { return flow.Error(err, "Unable to update pod.", "pod", pod.Name) } } return flow.Continue("Succeeds to sync cgroup blkio values.") }, )
View Source
var UpdateDisplayStatus = xstorev1reconcile.NewStepBinder("UpdateDisplayStatus", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() status := &xstore.Status topology := status.ObservedTopology if topology != nil { status.TotalPods = 0 for _, t := range topology.NodeSets { status.TotalPods += t.Replicas } } pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } status.ReadyPods = 0 for _, po := range pods { if k8shelper.IsPodReady(&po) { status.ReadyPods++ } } status.ReadyStatus = fmt.Sprintf("%d/%d", status.ReadyPods, status.TotalPods) if status.BoundVolumes != nil { totalDataDirSize := int64(0) for _, v := range status.BoundVolumes { if v != nil { totalDataDirSize += v.Size } } status.TotalDataDirSize = unit.ByteCountIEC(totalDataDirSize) } return flow.Continue("Display status updated!") })
View Source
var UpdateObservedGeneration = xstorev1reconcile.NewStepBinder("UpdateObservedGeneration", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() prevGen := xstore.Status.ObservedGeneration xstore.Status.ObservedGeneration = xstore.Generation return flow.Continue("Update observed generation.", "previous-generation", prevGen, "current-generation", xstore.Generation) }, )
View Source
var UpdateObservedTopologyAndConfig = xstorev1reconcile.NewStepBinder("UpdateObservedTopologyAndConfig", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { xstore := rc.MustGetXStore() xstore.Status.ObservedTopology = xstore.Spec.Topology.DeepCopy() xstore.Status.ObservedConfig = xstore.Spec.Config.DeepCopy() return flow.Continue("Update observed topology and config.", "current-generation", xstore.Generation) }, )
View Source
var WaitUntilAsyncTasksCanceled = xstorev1reconcile.NewStepBinder("WaitUntilAsyncTasksCanceled", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } hpfsClient, err := rc.GetHpfsClient() if err != nil { return flow.Error(err, "Unable to get hpfs client.") } for _, pod := range pods { completed, err := IsHpfsAsyncTaskComplete(rc.Context(), hpfsClient, &pod) if err != nil { return flow.Error(err, "Unable to determine the async task's status", "pod", pod.Name) } if !completed { return flow.Wait("Found async hpfs task that is still not completed or canceled.", "pod", pod.Name) } } return flow.Pass() }, )
View Source
var WaitUntilCandidatesAndVotersReady = xstorev1reconcile.NewStepBinder("WaitUntilCandidatesAndVotersReady", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } for _, pod := range pods { if xstoremeta.IsPodRoleLearner(&pod) { continue } if !k8shelper.IsPodReady(&pod) { return flow.Wait("Found candidate or voter pod not ready. Just wait.", "pod", pod.Name, "pod.phase", pod.Status.Phase) } err := xstoreexec.CheckConnectivityLocally(rc, &pod, "engine", flow.Logger()) if err != nil { return flow.Error(err, "Failed to check connectivity locally.", "pod", pod.Name) } } return flow.Continue("All candidates and voters are ready for connections.") }, )
View Source
var WaitUntilLeaderElected = xstorev1reconcile.NewStepBinder("WaitUntilLeaderElected", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { leaderPod, err := rc.TryGetXStoreLeaderPod() if err != nil { return flow.Error(err, "Unable to get leader pod.") } if leaderPod == nil { return flow.Wait("Leader not found, keep waiting...") } return flow.Continue("Leader found.", "leader-pod", leaderPod.Name) }, )
View Source
var WaitUntilLearnersReady = xstorev1reconcile.NewStepBinder("WaitUntilLearnersReady", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } for _, pod := range pods { if !xstoremeta.IsPodRoleLearner(&pod) { continue } if !k8shelper.IsPodReady(&pod) { return flow.Wait("Found learner pod not ready. Just wait.", "pod", pod.Name, "pod.phase", pod.Status.Phase) } err := xstoreexec.CheckConnectivityLocally(rc, &pod, "engine", flow.Logger()) if err != nil { return flow.Error(err, "Failed to check connectivity locally.", "pod", pod.Name) } } return flow.Continue("All learners are ready for connections.") }, )
View Source
var WaitUntilPodsReady = xstorev1reconcile.NewStepBinder("WaitUntilPodsReady", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } unready := k8shelper.FilterPodsBy(pods, func(pod *corev1.Pod) bool { return !k8shelper.IsPodReady(pod) }) if len(unready) > 0 { return flow.Wait("Found unready pods, keep waiting...", "unready-pods", strings.Join(k8shelper.ToObjectNames(unready), ",")) } return flow.Pass() }, )
View Source
var WaitUntilPodsScheduled = xstorev1reconcile.NewStepBinder("WaitUntilPodsScheduled", func(rc *xstorev1reconcile.Context, flow control.Flow) (reconcile.Result, error) { pods, err := rc.GetXStorePods() if err != nil { return flow.Error(err, "Unable to get pods.") } unscheduled := k8shelper.FilterPodsBy(pods, func(pod *corev1.Pod) bool { return !k8shelper.IsPodScheduled(pod) || pod.Status.PodIP == "" }) if len(unscheduled) > 0 { return flow.Wait("Found unscheduled pods, keep waiting...", "unscheduled-pods", strings.Join(k8shelper.ToObjectNames(unscheduled), ",")) } return flow.Pass() }, )
Functions ¶
func CancelHpfsAsyncTasks ¶
func CreatePodsAndHeadlessServicesWithExtraFactory ¶
func CreatePodsAndHeadlessServicesWithExtraFactory(extraPodFactory factory.ExtraPodFactory) control.BindFunc
func DeleteHostPathVolume ¶
func DeleteHostPathVolume(ctx context.Context, hpfsClient hpfs.HpfsServiceClient, vol *polardbxv1xstore.HostPathVolume) error
func IsDiskQuotaExceeds ¶
func IsDiskQuotaExceeds(xstore *polardbxv1.XStore) bool
func IsHpfsAsyncTaskComplete ¶
func LoadExecutionContext ¶
func LoadExecutionContext(rc *xstorev1reconcile.Context) (*context.ExecutionContext, error)
func NewExecutionContext ¶
func NewExecutionContext(rc *xstorev1reconcile.Context, xstore *polardbxv1.XStore, selfHeal bool) (*context.ExecutionContext, error)
func SyncBlkioCgroupValuesViaHpfs ¶
func SyncBlkioCgroupValuesViaHpfs(ctx context.Context, hpfsClient hpfs.HpfsServiceClient, pod *corev1.Pod, vol *polardbxv1xstore.HostPathVolume, resources *polardbxv1common.ExtendedResourceRequirements) error
func TrackAndLazyUpdateExecuteContext ¶
func TrackAndLazyUpdateExecuteContext(current *context.ExecutionContext) control.BindFunc
func TryDetectLeaderChange ¶
func TryReconcileLabels ¶
func UpdatePhaseTemplate ¶
func UpdatePhaseTemplate(phase polardbxv1xstore.Phase, requeue ...bool) control.BindFunc
func UpdateStageTemplate ¶
func UpdateStageTemplate(stage polardbxv1xstore.Stage, requeue ...bool) control.BindFunc
func WhenDiskQuotaNotExceeds ¶
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.