Documentation ¶
Index ¶
- Variables
- func InitMetrics(registry *prometheus.Registry)
- type MockPD
- type SinkManager
- func CreateManagerWithMemEngine(t *testing.T, ctx context.Context, changefeedID model.ChangeFeedID, ...) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine)
- func New(changefeedID model.ChangeFeedID, sinkURI string, config *pconfig.ReplicaConfig, ...) *SinkManager
- func NewManagerWithMemEngine(t *testing.T, changefeedID model.ChangeFeedID, ...) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine)
- func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper
- func (m *SinkManager) AsyncStopTable(span tablepb.Span) bool
- func (m *SinkManager) Close()
- func (m *SinkManager) GetAllCurrentTableSpans() []tablepb.Span
- func (m *SinkManager) GetAllCurrentTableSpansCount() int
- func (m *SinkManager) GetTableState(span tablepb.Span) (tablepb.TableState, bool)
- func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats
- func (m *SinkManager) RemoveTable(span tablepb.Span)
- func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error)
- func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error
- func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map[model.TableID]model.Ts)
- func (m *SinkManager) UpdateReceivedSorterResolvedTs(span tablepb.Span, ts model.Ts)
- func (m *SinkManager) WaitForReady(ctx context.Context)
- type TableStats
Constants ¶
This section is empty.
Variables ¶
var ( // RedoEventCache indicates redo event memory usage of a changefeed. RedoEventCache = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sinkmanager", Name: "redo_event_cache", Help: "redo event cache of the changefeed", }, []string{"namespace", "changefeed"}) // RedoEventCacheAccess indicates redo event cache hit ratio of a changefeed. RedoEventCacheAccess = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "sinkmanager", Name: "redo_event_cache_access", Help: "redo event cache access, including hit and miss", }, []string{"namespace", "changefeed", "type"}) )
Functions ¶
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics in this file.
Types ¶
type SinkManager ¶
type SinkManager struct {
// contains filtered or unexported fields
}
SinkManager is the implementation of SinkManager.
func CreateManagerWithMemEngine ¶
func CreateManagerWithMemEngine( t *testing.T, ctx context.Context, changefeedID model.ChangeFeedID, changefeedInfo *model.ChangeFeedInfo, errChan chan error, ) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine)
nolint:revive In test it is ok move the ctx to the second parameter.
func New ¶
func New( changefeedID model.ChangeFeedID, sinkURI string, config *pconfig.ReplicaConfig, up *upstream.Upstream, schemaStorage entry.SchemaStorage, redoDMLMgr redo.DMLManager, sourceManager *sourcemanager.SourceManager, isMysqlBackend bool, ) *SinkManager
New creates a new sink manager.
func NewManagerWithMemEngine ¶
func NewManagerWithMemEngine( t *testing.T, changefeedID model.ChangeFeedID, changefeedInfo *model.ChangeFeedInfo, redoMgr redo.DMLManager, ) (*SinkManager, *sourcemanager.SourceManager, sorter.SortEngine)
nolint:revive In test it is ok move the ctx to the second parameter.
func (*SinkManager) AddTable ¶
func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper
AddTable adds a table(TableSink) to the sink manager.
func (*SinkManager) AsyncStopTable ¶
func (m *SinkManager) AsyncStopTable(span tablepb.Span) bool
AsyncStopTable sets the table(TableSink) state to stopped.
func (*SinkManager) Close ¶
func (m *SinkManager) Close()
Close closes the manager. Must be called after `Run` returned.
func (*SinkManager) GetAllCurrentTableSpans ¶
func (m *SinkManager) GetAllCurrentTableSpans() []tablepb.Span
GetAllCurrentTableSpans returns all spans in the sinkManager.
func (*SinkManager) GetAllCurrentTableSpansCount ¶
func (m *SinkManager) GetAllCurrentTableSpansCount() int
GetAllCurrentTableSpansCount returns the table spans count in the sinkManager.
func (*SinkManager) GetTableState ¶
func (m *SinkManager) GetTableState(span tablepb.Span) (tablepb.TableState, bool)
GetTableState returns the table(TableSink) state.
func (*SinkManager) GetTableStats ¶
func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats
GetTableStats returns the state of the table.
func (*SinkManager) RemoveTable ¶
func (m *SinkManager) RemoveTable(span tablepb.Span)
RemoveTable removes a table(TableSink) from the sink manager.
func (*SinkManager) Run ¶
func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error)
Run implements util.Runnable. When it returns, all sub-goroutines should be closed.
func (*SinkManager) StartTable ¶
StartTable sets the table(TableSink) state to replicating.
func (*SinkManager) UpdateBarrierTs ¶
func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map[model.TableID]model.Ts)
UpdateBarrierTs update all tableSink's barrierTs in the SinkManager
func (*SinkManager) UpdateReceivedSorterResolvedTs ¶
func (m *SinkManager) UpdateReceivedSorterResolvedTs(span tablepb.Span, ts model.Ts)
UpdateReceivedSorterResolvedTs updates the received sorter resolved ts for the table. NOTE: it's still possible to be called during m.Close is in calling, so Close should take care of this.
func (*SinkManager) WaitForReady ¶
func (m *SinkManager) WaitForReady(ctx context.Context)
WaitForReady implements pkg/util.Runnable.