Documentation ¶
Index ¶
- Constants
- func InitMetrics(registry *prometheus.Registry)
- type AddTable
- type BurstBalance
- type Callback
- type Manager
- func (r *Manager) AdvanceCheckpoint(currentTables *TableRanges, currentPDTime time.Time, ...) (watermark schedulepb.Watermark)
- func (r *Manager) CleanMetrics()
- func (r *Manager) CollectMetrics(currentPDTime time.Time)
- func (r *Manager) GetReplicationSetForTests() *spanz.BtreeMap[*ReplicationSet]
- func (r *Manager) HandleCaptureChanges(init map[model.CaptureID][]tablepb.TableStatus, ...) ([]*schedulepb.Message, error)
- func (r *Manager) HandleMessage(msgs []*schedulepb.Message) ([]*schedulepb.Message, error)
- func (r *Manager) HandleTasks(tasks []*ScheduleTask) ([]*schedulepb.Message, error)
- func (r *Manager) ReplicationSets() *spanz.BtreeMap[*ReplicationSet]
- func (r *Manager) RunningTasks() *spanz.BtreeMap[*ScheduleTask]
- func (r *Manager) SetReplicationSetForTests(rs *ReplicationSet)
- type MoveTable
- type RemoveTable
- type ReplicationSet
- type ReplicationSetState
- type Role
- type ScheduleTask
- type SetHeap
- type TableRanges
Constants ¶
const ( // RolePrimary primary role. RolePrimary = 1 // RoleSecondary secondary role. RoleSecondary = 2 // RoleUndetermined means that we don't know its state, it may be // replicating, stopping or stopped. RoleUndetermined = 3 )
Variables ¶
This section is empty.
Functions ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics used in scheduler
Types ¶
type BurstBalance ¶
type BurstBalance struct { AddTables []AddTable RemoveTables []RemoveTable MoveTables []MoveTable }
BurstBalance for changefeed set up or unplanned TiCDC node failure. TiCDC needs to balance interrupted tables as soon as possible.
func (BurstBalance) String ¶
func (b BurstBalance) String() string
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages replications and running scheduling tasks.
func NewReplicationManager ¶
func NewReplicationManager( maxTaskConcurrency int, changefeedID model.ChangeFeedID, ) *Manager
NewReplicationManager returns a new replication manager.
func (*Manager) AdvanceCheckpoint ¶
func (r *Manager) AdvanceCheckpoint( currentTables *TableRanges, currentPDTime time.Time, barrier *schedulepb.BarrierWithMinTs, redoMetaManager redo.MetaManager, ) (watermark schedulepb.Watermark)
AdvanceCheckpoint tries to advance checkpoint and returns current checkpoint.
func (*Manager) CollectMetrics ¶
CollectMetrics collects metrics.
func (*Manager) GetReplicationSetForTests ¶
func (r *Manager) GetReplicationSetForTests() *spanz.BtreeMap[*ReplicationSet]
GetReplicationSetForTests is only used in tests.
func (*Manager) HandleCaptureChanges ¶
func (r *Manager) HandleCaptureChanges( init map[model.CaptureID][]tablepb.TableStatus, removed map[model.CaptureID][]tablepb.TableStatus, checkpointTs model.Ts, ) ([]*schedulepb.Message, error)
HandleCaptureChanges handles capture changes.
func (*Manager) HandleMessage ¶
func (r *Manager) HandleMessage( msgs []*schedulepb.Message, ) ([]*schedulepb.Message, error)
HandleMessage handles messages sent by other captures.
func (*Manager) HandleTasks ¶
func (r *Manager) HandleTasks( tasks []*ScheduleTask, ) ([]*schedulepb.Message, error)
HandleTasks handles schedule tasks.
func (*Manager) ReplicationSets ¶
func (r *Manager) ReplicationSets() *spanz.BtreeMap[*ReplicationSet]
ReplicationSets return all tracking replication set Caller must not modify the returned map.
func (*Manager) RunningTasks ¶
func (r *Manager) RunningTasks() *spanz.BtreeMap[*ScheduleTask]
RunningTasks return running tasks. Caller must not modify the returned map.
func (*Manager) SetReplicationSetForTests ¶
func (r *Manager) SetReplicationSetForTests(rs *ReplicationSet)
SetReplicationSetForTests is only used in tests.
type RemoveTable ¶
RemoveTable is a schedule task for removing a table.
func (RemoveTable) String ¶
func (t RemoveTable) String() string
type ReplicationSet ¶
type ReplicationSet struct { Changefeed model.ChangeFeedID Span tablepb.Span State ReplicationSetState // Primary is the capture ID that is currently replicating the table. Primary model.CaptureID // Captures is a map of captures that has the table replica. // NB: Invariant, 1) at most one primary, 2) primary capture must be in // CaptureRolePrimary. Captures map[model.CaptureID]Role Checkpoint tablepb.Checkpoint Stats tablepb.Stats }
ReplicationSet is a state machine that manages replication states.
func NewReplicationSet ¶
func NewReplicationSet( span tablepb.Span, checkpoint model.Ts, tableStatus map[model.CaptureID]*tablepb.TableStatus, changefeed model.ChangeFeedID, ) (*ReplicationSet, error)
NewReplicationSet returns a new replication set.
type ReplicationSetState ¶
type ReplicationSetState int
ReplicationSetState is the state of ReplicationSet in owner.
AddTable ┌────────┐ ┌─────────┐ │ Absent ├─> │ Prepare │ └────────┘ └──┬──────┘ ┌──────────┘ ^ v │ MoveTable ┌────────┐ ┌──────┴──────┐ RemoveTable ┌──────────┐ │ Commit ├──>│ Replicating │────────────>│ Removing │ └────────┘ └─────────────┘ └──────────┘
When a capture shutdown unexpectedly, we may need to transit the state to Absent or Replicating immediately.
const ( // ReplicationSetStateUnknown means the replication state is unknown, // it should not happen. ReplicationSetStateUnknown ReplicationSetState = 0 // ReplicationSetStateAbsent means there is no one replicates or prepares it. ReplicationSetStateAbsent ReplicationSetState = 1 // ReplicationSetStatePrepare means one capture is preparing it, // there might have another capture is replicating the table. ReplicationSetStatePrepare ReplicationSetState = 2 // ReplicationSetStateCommit means one capture is prepared, // it needs to promote secondary to primary. ReplicationSetStateCommit ReplicationSetState = 3 // ReplicationSetStateReplicating means there is exactly one capture // that is replicating the table. ReplicationSetStateReplicating ReplicationSetState = 4 // ReplicationSetStateRemoving means all captures need to // stop replication eventually. ReplicationSetStateRemoving ReplicationSetState = 5 )
func (ReplicationSetState) MarshalJSON ¶
func (r ReplicationSetState) MarshalJSON() ([]byte, error)
MarshalJSON returns r as the JSON encoding of ReplicationSetState. Only used for pretty print in zap log.
func (ReplicationSetState) String ¶
func (r ReplicationSetState) String() string
type Role ¶
type Role int
Role is the role of a capture.
func (Role) MarshalJSON ¶
MarshalJSON returns r as the JSON encoding of CaptureRole. Only used for pretty print in zap log.
type ScheduleTask ¶
type ScheduleTask struct { MoveTable *MoveTable AddTable *AddTable RemoveTable *RemoveTable BurstBalance *BurstBalance Accept Callback }
ScheduleTask is a schedule task that wraps add/move/remove table tasks.
func (*ScheduleTask) Name ¶
func (s *ScheduleTask) Name() string
Name returns the name of a schedule task.
func (*ScheduleTask) String ¶
func (s *ScheduleTask) String() string
type SetHeap ¶
type SetHeap []*ReplicationSet
SetHeap is a max-heap, it implements heap.Interface.
func NewReplicationSetHeap ¶
NewReplicationSetHeap creates a new SetHeap.
type TableRanges ¶
type TableRanges struct {
// contains filtered or unexported fields
}
TableRanges wraps current tables and their ranges.
func (*TableRanges) UpdateTables ¶
func (t *TableRanges) UpdateTables(currentTables []model.TableID)
UpdateTables current tables.