Documentation ¶
Index ¶
- Variables
- func IsTrailingDNsDrained(rc *polardbxv1reconcile.Context, rebalanceTask *DataRebalanceTask) (bool, error)
- func WatchRebalanceTaskAntUpdateProgress(interval time.Duration) control.BindFunc
- type DataRebalanceTask
- func (t *DataRebalanceTask) IsReady(rc *polardbxv1reconcile.Context) (bool, error)
- func (t *DataRebalanceTask) Progress(rc *polardbxv1reconcile.Context) (int, error)
- func (t *DataRebalanceTask) Skip() bool
- func (t *DataRebalanceTask) Start(rc *polardbxv1reconcile.Context) (string, error)
- func (t *DataRebalanceTask) Started() bool
Constants ¶
This section is empty.
Variables ¶
View Source
var EnsureTrailingDNsAreDrainedOrBlock = polardbxv1reconcile.NewStepBinder( "EnsureTrailingDNsAreDrainedOrRestartRebalance", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask) if err != nil { return flow.Error(err, "Unable to get config map for task.") } contextAccess := task.NewContextAccess(taskCm, "rebalance") rebalanceTask := &DataRebalanceTask{} ok, err := contextAccess.Read(rebalanceTask) if err != nil { return flow.Error(err, "Unable to read rebalance task context.") } if !ok { return flow.Error(errors.New("no rebalance task context found"), "Unable to find rebalance task context.") } drained, err := IsTrailingDNsDrained(rc, rebalanceTask) if err != nil { return flow.Error(err, "Unable to determine if trailing DNs are drained.") } polardbx := rc.MustGetPolarDBX() cdcNodeSpec := polardbx.Status.SpecSnapshot.Topology.Nodes.CDC if featuregate.WaitDrainedNodeToBeOffline.Enabled() || (cdcNodeSpec != nil && cdcNodeSpec.Replicas > 0) { offline, err := rebalanceTask.areScaleInDrainedNodesOffline(rc) if err != nil { return flow.Error(err, "Unable to determine offline status from GMS.") } if !offline { return flow.RetryAfter(20*time.Second, "Block until trailing DNs are marked offline.") } } if drained { return flow.Pass() } else { polardbx := rc.MustGetPolarDBX() polardbx.Status.StatusForPrint.RebalanceProcess = "stuck" return flow.Wait("Trailing DNs are not drained, must be verified manually.") } }, )
View Source
var PrepareRebalanceTaskContext = polardbxv1reconcile.NewStepBinder("PrepareRebalanceTaskContext", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask) if err != nil { return flow.Error(err, "Unable to get config map for task.") } contextAccess := task.NewContextAccess(taskCm, "rebalance") rebalanceTask := &DataRebalanceTask{} found, err := contextAccess.Read(rebalanceTask) if err != nil { return flow.Error(err, "Unable to read rebalance task context.") } if found { return flow.Pass() } polardbx := rc.MustGetPolarDBX() toReplicas := int(polardbx.Status.SpecSnapshot.Topology.Nodes.DN.Replicas) gmsMgr, err := rc.GetPolarDBXGMSManager() if err != nil { return flow.Error(err, "Unable to get manager of GMS.") } storageNodes, err := gmsMgr.ListStorageNodes(gms.StorageKindMaster) if err != nil { return flow.Error(err, "Unable to list storages of DNs.") } fromReplicas := len(storageNodes) rebalanceTask = &DataRebalanceTask{ From: fromReplicas, To: toReplicas, } err = contextAccess.Write(rebalanceTask) if err != nil { return flow.Error(err, "Unable to write rebalance task into config map.") } err = rc.Client().Update(rc.Context(), taskCm) if err != nil { return flow.Error(err, "Unable to update task config map.") } return flow.Continue("Rebalance task context prepared.") }, )
View Source
var ResetRebalanceTask = polardbxv1reconcile.NewStepBinder("ResetRebalanceTask", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { polardbx := rc.MustGetPolarDBX() polardbx.Status.StatusForPrint.RebalanceProcess = "" taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask) if err != nil { return flow.Error(err, "Unable to get config map for task.") } contextAccess := task.NewContextAccess(taskCm, "rebalance") ok := contextAccess.Clear() if ok { err = rc.Client().Update(rc.Context(), taskCm) if err != nil { return flow.Error(err, "Unable to update task config map.") } } return flow.Pass() }, )
View Source
var StartRebalanceTask = polardbxv1reconcile.NewStepBinder("StartRebalanceTask", func(rc *polardbxv1reconcile.Context, flow control.Flow) (reconcile.Result, error) { taskCm, err := rc.GetPolarDBXConfigMap(convention.ConfigMapTypeTask) if err != nil { return flow.Error(err, "Unable to get config map for task.") } contextAccess := task.NewContextAccess(taskCm, "rebalance") rebalanceTask := &DataRebalanceTask{} ok, err := contextAccess.Read(rebalanceTask) if err != nil { return flow.Error(err, "Unable to read rebalance task context.") } if !ok { return flow.Error(errors.New("no rebalance task context found"), "Unable to find rebalance task context.") } if rebalanceTask.Skip() || rebalanceTask.Started() { return flow.Pass() } planId, err := rebalanceTask.Start(rc) if err != nil { if err == group.ErrAlreadyInRebalance { flow.Logger().Info("Already in rebalance.") } else { return flow.Error(err, "Unable to start rebalance task.") } } flow.Logger().Info("Rebalance actions started.", "rebalance-plan", planId) rebalanceTask.PlanId = &planId err = contextAccess.Write(rebalanceTask) if err != nil { return flow.Error(err, "Unable to write rebalance task into config map.") } err = rc.Client().Update(rc.Context(), taskCm) if err != nil { return flow.Error(err, "Unable to update task config map.") } return flow.Pass() }, )
Functions ¶
func IsTrailingDNsDrained ¶
func IsTrailingDNsDrained(rc *polardbxv1reconcile.Context, rebalanceTask *DataRebalanceTask) (bool, error)
Types ¶
type DataRebalanceTask ¶
// DataRebalanceTask is the rebalance task context that the reconcile steps
// store as JSON under the "rebalance" key of the task config map.
type DataRebalanceTask struct {
	// From is the number of master storage nodes listed by GMS when the task
	// context was prepared.
	From int `json:"from,omitempty"`

	// To is the DN replica count taken from the spec snapshot when the task
	// context was prepared.
	To int `json:"to,omitempty"`

	// PlanId is the rebalance plan id recorded by StartRebalanceTask; it is nil
	// in a freshly prepared task context.
	PlanId *string `json:"plan_id,omitempty"`
}
func (*DataRebalanceTask) IsReady ¶
func (t *DataRebalanceTask) IsReady(rc *polardbxv1reconcile.Context) (bool, error)
func (*DataRebalanceTask) Progress ¶
func (t *DataRebalanceTask) Progress(rc *polardbxv1reconcile.Context) (int, error)
func (*DataRebalanceTask) Skip ¶
func (t *DataRebalanceTask) Skip() bool
func (*DataRebalanceTask) Start ¶
func (t *DataRebalanceTask) Start(rc *polardbxv1reconcile.Context) (string, error)
func (*DataRebalanceTask) Started ¶
func (t *DataRebalanceTask) Started() bool
Click to show internal directories.
Click to hide internal directories.