Documentation ¶
Index ¶
- func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64
- func InitMetrics(registry *prometheus.Registry)
- func NewChangefeed(id model.ChangeFeedID, cfInfo *model.ChangeFeedInfo, ...) *changefeed
- type Changefeed
- type ChangefeedState
- type DDLSink
- type FeedStateManager
- type Owner
- type Query
- type QueryType
- type StatusProvider
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenerateChangefeedEpoch ¶
func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64
GenerateChangefeedEpoch generates a unique changefeed epoch.
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers all metrics used in the owner.
func NewChangefeed ¶
func NewChangefeed(
	id model.ChangeFeedID,
	cfInfo *model.ChangeFeedInfo,
	cfStatus *model.ChangeFeedStatus,
	feedStateManager FeedStateManager,
	up *upstream.Upstream,
	cfg *config.SchedulerConfig,
	globalVars *vars.GlobalVars,
) *changefeed
NewChangefeed creates a new changefeed.
Types ¶
type Changefeed ¶
type Changefeed interface {
	// Tick is called periodically to drive the changefeed's internal logic.
	// The main logic of changefeed is in this function, including the calculation of many kinds of ts,
	// maintain table components, error handling, etc.
	//
	// It can be called in etcd ticks, so it should never be blocked.
	// Tick Returns: checkpointTs, minTableBarrierTs
	Tick(context.Context, *model.ChangeFeedInfo, *model.ChangeFeedStatus, map[model.CaptureID]*model.CaptureInfo) (model.Ts, model.Ts)
	// Close closes the changefeed.
	Close(ctx context.Context)
	// GetScheduler returns the scheduler of this changefeed.
	GetScheduler() scheduler.Scheduler
}
Changefeed is the tick logic of changefeed.
type ChangefeedState ¶
type ChangefeedState interface {
	// GetID returns the changefeed ID.
	GetID() model.ChangeFeedID
	// GetChangefeedInfo returns the changefeed info.
	GetChangefeedInfo() *model.ChangeFeedInfo
	// GetChangefeedStatus returns the changefeed status.
	GetChangefeedStatus() *model.ChangeFeedStatus
	// RemoveChangefeed removes the changefeed and cleans its information and status.
	RemoveChangefeed()
	// ResumeChangefeed resumes the changefeed and sets the checkpoint ts.
	ResumeChangefeed(uint64)
	// SetWarning sets the warning on the changefeed.
	SetWarning(*model.RunningError)
	// TakeProcessorWarnings returns the warnings of the changefeed and cleans them.
	TakeProcessorWarnings() []*model.RunningError
	// SetError sets the error on the changefeed.
	SetError(*model.RunningError)
	// TakeProcessorErrors returns the errors of the changefeed and cleans them.
	TakeProcessorErrors() []*model.RunningError
	// CleanUpTaskPositions removes the task positions of the changefeed.
	CleanUpTaskPositions()
	// UpdateChangefeedState updates the state of the changefeed.
	UpdateChangefeedState(model.FeedState, model.AdminJobType, uint64)
}
ChangefeedState is the interface for changefeed state in underlying storage.
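The TakeProcessorWarnings and TakeProcessorErrors methods return what has accumulated and clear it in the same call. The following is a minimal sketch of that take-and-clear pattern, assuming a plain in-memory slice. The types `memState` and `runningError` are hypothetical stand-ins and are not part of this package; the real implementation works against the underlying storage and may behave differently.

```go
package main

import "fmt"

// runningError is a hypothetical stand-in for model.RunningError.
type runningError struct {
	Code    string
	Message string
}

// String formats the error for display.
func (e *runningError) String() string {
	return fmt.Sprintf("[%s] %s", e.Code, e.Message)
}

// memState is a hypothetical in-memory state that only tracks errors.
type memState struct {
	processorErrors []*runningError
}

// SetError records an error on the state (an assumption: errors accumulate).
func (s *memState) SetError(err *runningError) {
	s.processorErrors = append(s.processorErrors, err)
}

// TakeProcessorErrors returns the recorded errors and clears them, so a
// second call with no new errors in between returns an empty slice.
func (s *memState) TakeProcessorErrors() []*runningError {
	taken := s.processorErrors
	s.processorErrors = nil
	return taken
}

func main() {
	s := &memState{}
	s.SetError(&runningError{Code: "E1", Message: "first"})
	s.SetError(&runningError{Code: "E2", Message: "second"})

	for _, e := range s.TakeProcessorErrors() {
		fmt.Println(e) // each recorded error is reported once
	}
	fmt.Println(len(s.TakeProcessorErrors())) // nothing is left after the first take
}
```

Clearing on read means each error is handled at most once by whoever takes it, which is why the sketch returns the old slice and resets the field rather than copying.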
type DDLSink ¶
type DDLSink interface {
// contains filtered or unexported methods
}
DDLSink is a wrapper of the `Sink` interface for the owner. DDLSink should send `DDLEvent` and `CheckpointTs` to downstream. If `SyncPointEnabled`, it also sends `syncPoint` to downstream.
type FeedStateManager ¶
type FeedStateManager interface {
	// PushAdminJob pushes an admin job to the admin job queue.
	PushAdminJob(job *model.AdminJob)
	// Tick is the main logic of the FeedStateManager; it is called periodically.
	// resolvedTs is the resolvedTs of the changefeed.
	// It returns true if there is a pending admin job; if so, the changefeed should not run the tick logic.
	Tick(resolvedTs model.Ts, status *model.ChangeFeedStatus, info *model.ChangeFeedInfo) (adminJobPending bool)
	// HandleError is called when an error occurs in Changefeed.Tick.
	HandleError(errs ...*model.RunningError)
	// HandleWarning is called when a warning occurs in Changefeed.Tick.
	HandleWarning(warnings ...*model.RunningError)
	// ShouldRunning returns whether the changefeed should be running.
	ShouldRunning() bool
	// ShouldRemoved returns whether the changefeed should be removed.
	ShouldRemoved() bool
	// MarkFinished is called when a changefeed is finished.
	MarkFinished()
}
FeedStateManager manages the life cycle of a changefeed. Currently it is responsible for:
1. Handle admin jobs
2. Handle errors
3. Handle warnings
4. Control the status of a changefeed
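The Tick comment in the interface says that a pending admin job means the changefeed should not run its tick logic. The following sketch illustrates that gating under simplified assumptions. The type `jobQueue`, the job kinds "pause" and "resume", and the helper `driveOnce` are all hypothetical and are not part of this package.

```go
package main

import "fmt"

// adminJob is a hypothetical stand-in for model.AdminJob.
type adminJob struct{ Kind string }

// jobQueue is a hypothetical manager that tracks queued admin jobs and
// a single running flag.
type jobQueue struct {
	pending []adminJob
	running bool
}

// PushAdminJob appends a job to the queue.
func (q *jobQueue) PushAdminJob(job adminJob) {
	q.pending = append(q.pending, job)
}

// Tick handles at most one queued job and reports whether it handled one.
func (q *jobQueue) Tick() (adminJobPending bool) {
	if len(q.pending) == 0 {
		return false
	}
	job := q.pending[0]
	q.pending = q.pending[1:]
	switch job.Kind {
	case "pause":
		q.running = false
	case "resume":
		q.running = true
	}
	return true
}

// ShouldRunning reports whether the changefeed logic may run.
func (q *jobQueue) ShouldRunning() bool { return q.running }

// driveOnce runs changefeedTick only when no admin job was pending in this
// round and the manager says the changefeed should be running.
func driveOnce(q *jobQueue, changefeedTick func()) bool {
	if q.Tick() || !q.ShouldRunning() {
		return false
	}
	changefeedTick()
	return true
}

func main() {
	q := &jobQueue{running: true}
	ran := 0
	tick := func() { ran++ }

	driveOnce(q, tick) // runs: no job pending, changefeed running
	q.PushAdminJob(adminJob{Kind: "pause"})
	driveOnce(q, tick) // skipped: the pause job is handled in this round
	driveOnce(q, tick) // skipped: the changefeed is paused
	q.PushAdminJob(adminJob{Kind: "resume"})
	driveOnce(q, tick) // skipped: the resume job is handled in this round
	driveOnce(q, tick) // runs again
	fmt.Println(ran)   // prints 2
}
```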
func NewFeedStateManager ¶
func NewFeedStateManager(
	up *upstream.Upstream,
	state ChangefeedState,
) FeedStateManager
NewFeedStateManager creates a feedStateManager and initializes the exponential backoff.
type Owner ¶
type Owner interface {
	EnqueueJob(adminJob model.AdminJob, done chan<- error)
	RebalanceTables(cfID model.ChangeFeedID, done chan<- error)
	ScheduleTable(
		cfID model.ChangeFeedID, toCapture model.CaptureID,
		tableID model.TableID, done chan<- error,
	)
	DrainCapture(query *scheduler.Query, done chan<- error)
	WriteDebugInfo(w io.Writer, done chan<- error)
	Query(query *Query, done chan<- error)
	AsyncStop()
	UpdateChangefeedAndUpstream(ctx context.Context,
		upstreamInfo *model.UpstreamInfo,
		changeFeedInfo *model.ChangeFeedInfo,
	) error
	UpdateChangefeed(ctx context.Context,
		changeFeedInfo *model.ChangeFeedInfo) error
	CreateChangefeed(context.Context,
		*model.UpstreamInfo,
		*model.ChangeFeedInfo,
	) error
}
Owner manages the TiCDC cluster.
The interface is thread-safe, except for Tick, which is only used by the etcd worker.
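Many Owner methods take a `done chan<- error` argument, which suggests that the call only queues the request and the result arrives later on the channel. The following sketch shows one way a caller could use such an API, assuming exactly that behavior. The types `miniOwner` and `request` and the helpers `waitDone` and `runOnce` are hypothetical and are not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// request is a hypothetical unit of work queued for the owner loop.
type request struct {
	name string
	done chan<- error
}

// miniOwner is a hypothetical owner that handles requests on one goroutine.
type miniOwner struct {
	jobs chan request
}

func newMiniOwner() *miniOwner {
	return &miniOwner{jobs: make(chan request, 16)}
}

// Enqueue queues a request and returns as soon as it is queued.
func (o *miniOwner) Enqueue(name string, done chan<- error) {
	o.jobs <- request{name: name, done: done}
}

// run handles queued requests until ctx is cancelled.
func (o *miniOwner) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-o.jobs:
			var err error
			if req.name == "" {
				err = fmt.Errorf("empty job name")
			}
			req.done <- err // does not block when done has capacity 1
		}
	}
}

// waitDone waits for the result or for ctx to end, whichever comes first.
func waitDone(ctx context.Context, done <-chan error) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-done:
		return err
	}
}

// runOnce starts an owner loop, submits one request, and waits for it.
func runOnce(name string) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the owner loop when runOnce returns

	o := newMiniOwner()
	go o.run(ctx)

	// A buffered channel lets the owner report the result even if the
	// caller has already given up waiting.
	done := make(chan error, 1)
	o.Enqueue(name, done)
	return waitDone(ctx, done)
}

func main() {
	fmt.Println(runOnce("rebalance"))
	fmt.Println(runOnce(""))
}
```

Handling every request on a single goroutine is one way an interface like this can be safe to call from many goroutines; whether the real owner works this way is not stated in this reference.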
func NewOwner ¶
func NewOwner(
	upstreamManager *upstream.Manager,
	cfg *config.SchedulerConfig,
	globalVars *vars.GlobalVars,
) Owner
NewOwner creates a new Owner.
type Query ¶
type Query struct {
	Tp           QueryType
	ChangeFeedID model.ChangeFeedID
	Data         interface{}
}
Query wraps a query command and returns its results.
type QueryType ¶
type QueryType int32
QueryType is the type of different queries.
const (
	// QueryAllTaskStatuses is the type of query all task statuses.
	QueryAllTaskStatuses QueryType = iota
	// QueryProcessors is the type of query processors.
	QueryProcessors
	// QueryCaptures is the type of query captures info.
	QueryCaptures
	// QueryHealth is the type of query cluster health info.
	QueryHealth
	// QueryOwner is the type of query changefeed owner
	QueryOwner
	// QueryChangefeedInfo is the type of query changefeed info
	QueryChangefeedInfo
	// QueryChangeFeedStatuses is the type of query changefeed status
	QueryChangeFeedStatuses
	// QueryChangeFeedSyncedStatus is the type of query changefeed synced status
	QueryChangeFeedSyncedStatus
	// QueryAllChangeFeedInfo is the type of query all changefeed info.
	QueryAllChangeFeedInfo
	// QueryAllChangeFeedSCheckpointTs query all changefeed checkpoint ts.
	QueryAllChangeFeedSCheckpointTs
	// QueryExists is the type of query check if a changefeed exists
	QueryExists
)
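A Query carries a type tag in Tp and an untyped result in Data. The following sketch shows how such a tagged query could be dispatched and its result read back, assuming the handler fills Data according to Tp. The names `query`, `queryType`, `handleQuery`, and `existsResult` are hypothetical, and the changefeed ID is simplified to a string; none of this is the package's own implementation.

```go
package main

import "fmt"

// queryType is a hypothetical stand-in for QueryType.
type queryType int32

const (
	queryHealth queryType = iota
	queryExists
)

// query mirrors the shape of Query: Tp selects the operation and Data
// carries the result back to the caller.
type query struct {
	Tp           queryType
	ChangeFeedID string // simplified; the real field is a model.ChangeFeedID
	Data         interface{}
}

// handleQuery fills q.Data according to q.Tp. The known map is a
// hypothetical set of existing changefeed IDs.
func handleQuery(q *query, known map[string]bool) error {
	switch q.Tp {
	case queryHealth:
		q.Data = true
	case queryExists:
		q.Data = known[q.ChangeFeedID]
	default:
		return fmt.Errorf("unknown query type %d", q.Tp)
	}
	return nil
}

// existsResult reads a boolean result out of q.Data, reporting an error if
// the handler stored a different type.
func existsResult(q *query) (bool, error) {
	exists, ok := q.Data.(bool)
	if !ok {
		return false, fmt.Errorf("unexpected result type %T", q.Data)
	}
	return exists, nil
}

func main() {
	known := map[string]bool{"cf-1": true}

	q := &query{Tp: queryExists, ChangeFeedID: "cf-1"}
	if err := handleQuery(q, known); err != nil {
		fmt.Println("query failed:", err)
		return
	}
	exists, err := existsResult(q)
	fmt.Println(exists, err)
}
```

Because Data is an empty interface, the caller has to know which concrete type each query type produces; the checked type assertion in `existsResult` keeps a mismatch from panicking.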
type StatusProvider ¶
type StatusProvider interface {
	// GetAllChangeFeedInfo returns all changefeed infos.
	GetAllChangeFeedInfo(ctx context.Context) (
		map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
	)
	// GetAllChangeFeedCheckpointTs returns all changefeed checkpoints.
	GetAllChangeFeedCheckpointTs(ctx context.Context) (
		map[model.ChangeFeedID]uint64, error,
	)
	// GetChangeFeedStatus returns a changefeed's runtime status.
	GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatusForAPI, error)
	// GetChangeFeedSyncedStatus returns a changefeed's synced status.
	GetChangeFeedSyncedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedSyncedStatusForAPI, error)
	// GetChangeFeedInfo returns a changefeed's info.
	GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error)
	// GetAllTaskStatuses returns the task statuses for the specified changefeed.
	GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error)
	// GetProcessors returns the statuses of all processors.
	GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)
	// GetCaptures returns the information about all captures.
	GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error)
	// IsHealthy returns true if the cluster is healthy.
	IsHealthy(ctx context.Context) (bool, error)
	// IsChangefeedExists returns true if a changefeed exists.
	IsChangefeedExists(ctx context.Context, id model.ChangeFeedID) (bool, error)
}
StatusProvider provides functions for getting meta-information from the owner. The interface is thread-safe.
func NewStatusProvider ¶
func NewStatusProvider(owner Owner) StatusProvider
NewStatusProvider returns a new StatusProvider for the owner.