sinkmanager

package
v0.0.0-...-99e00c0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// RedoEventCache is a gauge vector reporting the redo event memory usage
	// of a changefeed. Series are labeled by "namespace" and "changefeed".
	RedoEventCache = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "sinkmanager",
			Name:      "redo_event_cache",
			Help:      "redo event cache of the changefeed",
		},
		[]string{"namespace", "changefeed"})

	// RedoEventCacheAccess is a counter vector of redo event cache accesses
	// of a changefeed. It counts accesses rather than storing a ratio: the
	// "type" label distinguishes the kind of access (hits and misses, per the
	// Help text), so a hit ratio has to be derived from the per-type counters.
	RedoEventCacheAccess = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "ticdc",
			Subsystem: "sinkmanager",
			Name:      "redo_event_cache_access",
			Help:      "redo event cache access, including hit and miss",
		},

		[]string{"namespace", "changefeed", "type"})
)

Functions

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics in this file.

Types

type MockPD

type MockPD struct {
	pd.Client
	// contains filtered or unexported fields
}

MockPD is only for tests.

func (*MockPD) GetTS

func (p *MockPD) GetTS(_ context.Context) (int64, int64, error)

GetTS implements the PD interface.

type SinkManager

type SinkManager struct {
	// contains filtered or unexported fields
}

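SinkManager manages the table sinks (TableSink) of a changefeed: tables are added, started, stopped and removed through it, and it reports their state and stats.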

func CreateManagerWithMemEngine

func CreateManagerWithMemEngine(
	t *testing.T,
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	changefeedInfo *model.ChangeFeedInfo,
	errChan chan error,
) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine)

nolint:revive In tests it is OK to move the ctx to the second parameter.

func New

func New(
	changefeedID model.ChangeFeedID,
	sinkURI string,
	config *pconfig.ReplicaConfig,
	up *upstream.Upstream,
	schemaStorage entry.SchemaStorage,
	redoDMLMgr redo.DMLManager,
	sourceManager *sourcemanager.SourceManager,
	isMysqlBackend bool,
) *SinkManager

New creates a new sink manager.

func NewManagerWithMemEngine

func NewManagerWithMemEngine(
	t *testing.T,
	changefeedID model.ChangeFeedID,
	changefeedInfo *model.ChangeFeedInfo,
	redoMgr redo.DMLManager,
) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine)

nolint:revive In tests it is OK to move the ctx to the second parameter.

func (*SinkManager) AddTable

func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper

AddTable adds a table(TableSink) to the sink manager.

func (*SinkManager) AsyncStopTable

func (m *SinkManager) AsyncStopTable(span tablepb.Span) bool

AsyncStopTable sets the table(TableSink) state to stopped.

func (*SinkManager) Close

func (m *SinkManager) Close()

Close closes the manager. It must be called after `Run` has returned.

func (*SinkManager) GetAllCurrentTableSpans

func (m *SinkManager) GetAllCurrentTableSpans() []tablepb.Span

GetAllCurrentTableSpans returns all spans in the sinkManager.

func (*SinkManager) GetAllCurrentTableSpansCount

func (m *SinkManager) GetAllCurrentTableSpansCount() int

GetAllCurrentTableSpansCount returns the table spans count in the sinkManager.

func (*SinkManager) GetTableState

func (m *SinkManager) GetTableState(span tablepb.Span) (tablepb.TableState, bool)

GetTableState returns the table(TableSink) state.

func (*SinkManager) GetTableStats

func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats

GetTableStats returns the stats (TableStats) of the table.

func (*SinkManager) RemoveTable

func (m *SinkManager) RemoveTable(span tablepb.Span)

RemoveTable removes a table(TableSink) from the sink manager.

func (*SinkManager) Run

func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error)

Run implements util.Runnable. When it returns, all sub-goroutines should be closed.

func (*SinkManager) StartTable

func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error

StartTable sets the table(TableSink) state to replicating.

func (*SinkManager) UpdateBarrierTs

func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map[model.TableID]model.Ts)

UpdateBarrierTs updates the barrierTs of all table sinks in the SinkManager.

func (*SinkManager) UpdateReceivedSorterResolvedTs

func (m *SinkManager) UpdateReceivedSorterResolvedTs(span tablepb.Span, ts model.Ts)

UpdateReceivedSorterResolvedTs updates the received sorter resolved ts for the table. NOTE: it can still be called while m.Close is in progress, so Close must take care of this.

func (*SinkManager) WaitForReady

func (m *SinkManager) WaitForReady(ctx context.Context)

WaitForReady implements pkg/util.Runnable.

type TableStats

// TableStats groups the timestamps (model.Ts) reported for a table sink; it is
// the value returned by SinkManager.GetTableStats.
//
// NOTE(review): the individual field meanings are not visible here and are
// inferred from the field names only — confirm against the table sink code.
type TableStats struct {
	CheckpointTs model.Ts
	ResolvedTs   model.Ts
	LastSyncedTs model.Ts
	BarrierTs    model.Ts
}

TableStats of a table sink.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL