replication

package
v0.0.0-...-80f49c6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RolePrimary primary role.
	RolePrimary = 1
	// RoleSecondary secondary role.
	RoleSecondary = 2
	// RoleUndetermined means that we don't know its state, it may be
	// replicating, stopping or stopped.
	RoleUndetermined = 3
)

Variables

This section is empty.

Functions

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics used in scheduler

Types

type AddTable

type AddTable struct {
	Span         tablepb.Span
	CaptureID    model.CaptureID
	CheckpointTs model.Ts
}

AddTable is a schedule task for adding a table.

func (AddTable) String

func (t AddTable) String() string

type BurstBalance

type BurstBalance struct {
	AddTables    []AddTable
	RemoveTables []RemoveTable
	MoveTables   []MoveTable
}

BurstBalance for changefeed set up or unplanned TiCDC node failure. TiCDC needs to balance interrupted tables as soon as possible.

func (BurstBalance) String

func (b BurstBalance) String() string

type Callback

type Callback func()

Callback is invoked when something is done.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages replications and running scheduling tasks.

func NewReplicationManager

func NewReplicationManager(
	maxTaskConcurrency int, changefeedID model.ChangeFeedID,
) *Manager

NewReplicationManager returns a new replication manager.

func (*Manager) AdvanceCheckpoint

func (r *Manager) AdvanceCheckpoint(
	currentTables *TableRanges,
	currentPDTime time.Time,
	barrier *schedulepb.BarrierWithMinTs,
	redoMetaManager redo.MetaManager,
) (watermark schedulepb.Watermark)

AdvanceCheckpoint tries to advance checkpoint and returns current checkpoint.

func (*Manager) CleanMetrics

func (r *Manager) CleanMetrics()

CleanMetrics cleans metrics.

func (*Manager) CollectMetrics

func (r *Manager) CollectMetrics(currentPDTime time.Time)

CollectMetrics collects metrics.

func (*Manager) GetReplicationSetForTests

func (r *Manager) GetReplicationSetForTests() *spanz.BtreeMap[*ReplicationSet]

GetReplicationSetForTests is only used in tests.

func (*Manager) HandleCaptureChanges

func (r *Manager) HandleCaptureChanges(
	init map[model.CaptureID][]tablepb.TableStatus,
	removed map[model.CaptureID][]tablepb.TableStatus,
	checkpointTs model.Ts,
) ([]*schedulepb.Message, error)

HandleCaptureChanges handles capture changes.

func (*Manager) HandleMessage

func (r *Manager) HandleMessage(
	msgs []*schedulepb.Message,
) ([]*schedulepb.Message, error)

HandleMessage handles messages sent by other captures.

func (*Manager) HandleTasks

func (r *Manager) HandleTasks(
	tasks []*ScheduleTask,
) ([]*schedulepb.Message, error)

HandleTasks handles schedule tasks.

func (*Manager) ReplicationSets

func (r *Manager) ReplicationSets() *spanz.BtreeMap[*ReplicationSet]

ReplicationSets return all tracking replication set Caller must not modify the returned map.

func (*Manager) RunningTasks

func (r *Manager) RunningTasks() *spanz.BtreeMap[*ScheduleTask]

RunningTasks return running tasks. Caller must not modify the returned map.

func (*Manager) SetReplicationSetForTests

func (r *Manager) SetReplicationSetForTests(rs *ReplicationSet)

SetReplicationSetForTests is only used in tests.

type MoveTable

type MoveTable struct {
	Span        tablepb.Span
	DestCapture model.CaptureID
}

MoveTable is a schedule task for moving a table.

func (MoveTable) String

func (t MoveTable) String() string

type RemoveTable

type RemoveTable struct {
	Span      tablepb.Span
	CaptureID model.CaptureID
}

RemoveTable is a schedule task for removing a table.

func (RemoveTable) String

func (t RemoveTable) String() string

type ReplicationSet

type ReplicationSet struct {
	Changefeed model.ChangeFeedID
	Span       tablepb.Span
	State      ReplicationSetState
	// Primary is the capture ID that is currently replicating the table.
	Primary model.CaptureID
	// Captures is a map of captures that has the table replica.
	// NB: Invariant, 1) at most one primary, 2) primary capture must be in
	//     CaptureRolePrimary.
	Captures   map[model.CaptureID]Role
	Checkpoint tablepb.Checkpoint
	Stats      tablepb.Stats
}

ReplicationSet is a state machine that manages replication states.

func NewReplicationSet

func NewReplicationSet(
	span tablepb.Span,
	checkpoint model.Ts,
	tableStatus map[model.CaptureID]*tablepb.TableStatus,
	changefeed model.ChangeFeedID,
) (*ReplicationSet, error)

NewReplicationSet returns a new replication set.

type ReplicationSetState

type ReplicationSetState int

ReplicationSetState is the state of ReplicationSet in owner.

 AddTable
┌────────┐   ┌─────────┐
│ Absent ├─> │ Prepare │
└────────┘   └──┬──────┘
     ┌──────────┘   ^
     v              │ MoveTable
┌────────┐   ┌──────┴──────┐ RemoveTable ┌──────────┐
│ Commit ├──>│ Replicating │────────────>│ Removing │
└────────┘   └─────────────┘             └──────────┘

When a capture shutdown unexpectedly, we may need to transit the state to Absent or Replicating immediately.

const (
	// ReplicationSetStateUnknown means the replication state is unknown,
	// it should not happen.
	ReplicationSetStateUnknown ReplicationSetState = 0

	// ReplicationSetStateAbsent means there is no one replicates or prepares it.
	ReplicationSetStateAbsent ReplicationSetState = 1

	// ReplicationSetStatePrepare means one capture is preparing it,
	// there might have another capture is replicating the table.
	ReplicationSetStatePrepare ReplicationSetState = 2

	// ReplicationSetStateCommit means one capture is prepared,
	// it needs to promote secondary to primary.
	ReplicationSetStateCommit ReplicationSetState = 3

	// ReplicationSetStateReplicating means there is exactly one capture
	// that is replicating the table.
	ReplicationSetStateReplicating ReplicationSetState = 4

	// ReplicationSetStateRemoving means all captures need to
	// stop replication eventually.
	ReplicationSetStateRemoving ReplicationSetState = 5
)

func (ReplicationSetState) MarshalJSON

func (r ReplicationSetState) MarshalJSON() ([]byte, error)

MarshalJSON returns r as the JSON encoding of ReplicationSetState. Only used for pretty print in zap log.

func (ReplicationSetState) String

func (r ReplicationSetState) String() string

type Role

type Role int

Role is the role of a capture.

func (Role) MarshalJSON

func (r Role) MarshalJSON() ([]byte, error)

MarshalJSON returns r as the JSON encoding of CaptureRole. Only used for pretty print in zap log.

func (Role) String

func (r Role) String() string

type ScheduleTask

type ScheduleTask struct {
	MoveTable    *MoveTable
	AddTable     *AddTable
	RemoveTable  *RemoveTable
	BurstBalance *BurstBalance

	Accept Callback
}

ScheduleTask is a schedule task that wraps add/move/remove table tasks.

func (*ScheduleTask) Name

func (s *ScheduleTask) Name() string

Name returns the name of a schedule task.

func (*ScheduleTask) String

func (s *ScheduleTask) String() string

type SetHeap

type SetHeap []*ReplicationSet

SetHeap is a max-heap, it implements heap.Interface.

func NewReplicationSetHeap

func NewReplicationSetHeap(capacity int) SetHeap

NewReplicationSetHeap creates a new SetHeap.

func (SetHeap) Len

func (h SetHeap) Len() int

Len returns the length of the heap.

func (SetHeap) Less

func (h SetHeap) Less(i, j int) bool

Less returns true if the element at i is less than the element at j.

func (*SetHeap) Pop

func (h *SetHeap) Pop() interface{}

Pop pops an element from the heap.

func (*SetHeap) Push

func (h *SetHeap) Push(x interface{})

Push pushes an element to the heap.

func (SetHeap) Swap

func (h SetHeap) Swap(i, j int)

Swap swaps the elements with indexes i and j.

type TableRanges

type TableRanges struct {
	// contains filtered or unexported fields
}

TableRanges wraps current tables and their ranges.

func (*TableRanges) Iter

func (t *TableRanges) Iter(fn func(tableID model.TableID, tableStart, tableEnd tablepb.Span) bool)

Iter iterate current tables.

func (*TableRanges) Len

func (t *TableRanges) Len() int

Len returns the length of current tables.

func (*TableRanges) UpdateTables

func (t *TableRanges) UpdateTables(currentTables []model.TableID)

UpdateTables current tables.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL