owner

package
v0.0.0-...-1dd87cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 4, 2025 License: Apache-2.0 Imports: 50 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateChangefeedEpoch

func GenerateChangefeedEpoch(ctx context.Context, pdClient pd.Client) uint64

GenerateChangefeedEpoch generates a unique changefeed epoch.

func InitMetrics

func InitMetrics(registry *prometheus.Registry)

InitMetrics registers all metrics used in owner

func NewChangefeed

func NewChangefeed(
	id model.ChangeFeedID,
	cfInfo *model.ChangeFeedInfo,
	cfStatus *model.ChangeFeedStatus,
	feedStateManager FeedStateManager,
	up *upstream.Upstream,
	cfg *config.SchedulerConfig,
	globalVars *vars.GlobalVars,
) *changefeed

NewChangefeed creates a new changefeed.

Types

type Changefeed

type Changefeed interface {
	// Tick is called periodically to drive the changefeed's internal logic.
	// The main logic of the changefeed lives in this function, including the
	// calculation of many kinds of ts, maintaining table components, error handling, etc.
	//
	// It can be called in etcd ticks, so it must never block.
	// Tick returns: checkpointTs, minTableBarrierTs.
	Tick(context.Context, *model.ChangeFeedInfo,
		*model.ChangeFeedStatus,
		map[model.CaptureID]*model.CaptureInfo) (model.Ts, model.Ts)

	// Close closes the changefeed.
	Close(ctx context.Context)

	// GetScheduler returns the scheduler of this changefeed.
	GetScheduler() scheduler.Scheduler
}

Changefeed is the tick logic of changefeed.

type ChangefeedState

type ChangefeedState interface {
	// GetID returns the changefeed ID.
	GetID() model.ChangeFeedID
	// GetChangefeedInfo returns the changefeed info.
	GetChangefeedInfo() *model.ChangeFeedInfo
	// GetChangefeedStatus returns the changefeed status.
	GetChangefeedStatus() *model.ChangeFeedStatus
	// RemoveChangefeed removes the changefeed and cleans up its information and status.
	RemoveChangefeed()
	// ResumeChangefeed resumes the changefeed and sets the checkpoint ts.
	ResumeChangefeed(uint64)
	// SetWarning sets the warning on the changefeed.
	SetWarning(*model.RunningError)
	// TakeProcessorWarnings returns the warnings of the changefeed and clears them.
	TakeProcessorWarnings() []*model.RunningError
	// SetError sets the error on the changefeed.
	SetError(*model.RunningError)
	// TakeProcessorErrors returns the errors of the changefeed and clears them.
	TakeProcessorErrors() []*model.RunningError
	// CleanUpTaskPositions removes the task positions of the changefeed.
	CleanUpTaskPositions()
	// UpdateChangefeedState updates the state of the changefeed; it has no return value.
	// NOTE(review): the role of the uint64 argument is not visible here — confirm
	// against the implementation.
	UpdateChangefeedState(model.FeedState, model.AdminJobType, uint64)
}

ChangefeedState is the interface for changefeed state in underlying storage.

type DDLSink

type DDLSink interface {
	// contains filtered or unexported methods
}

DDLSink is a wrapper of the `Sink` interface for the owner. DDLSink should send `DDLEvent` and `CheckpointTs` to downstream; if `SyncPointEnabled`, it also sends `syncPoint` to downstream.

type FeedStateManager

type FeedStateManager interface {
	// PushAdminJob pushes an admin job to the admin job queue.
	PushAdminJob(job *model.AdminJob)
	// Tick is the main logic of the FeedStateManager; it is called periodically.
	// resolvedTs is the resolvedTs of the changefeed.
	// It returns true if there is a pending admin job, in which case the
	// changefeed should not run its tick logic.
	Tick(resolvedTs model.Ts, status *model.ChangeFeedStatus, info *model.ChangeFeedInfo) (adminJobPending bool)
	// HandleError is called when an error occurs in Changefeed.Tick.
	HandleError(errs ...*model.RunningError)
	// HandleWarning is called when a warning occurs in Changefeed.Tick.
	HandleWarning(warnings ...*model.RunningError)
	// ShouldRunning returns whether the changefeed should be running.
	ShouldRunning() bool
	// ShouldRemoved returns whether the changefeed should be removed.
	ShouldRemoved() bool
	// MarkFinished is called when a changefeed is finished.
	MarkFinished()
}

FeedStateManager manages the life cycle of a changefeed. Currently it is responsible for: 1. handling admin jobs, 2. handling errors, 3. handling warnings, 4. controlling the status of a changefeed.

func NewFeedStateManager

func NewFeedStateManager(up *upstream.Upstream,
	state ChangefeedState,
) FeedStateManager

NewFeedStateManager creates a feedStateManager and initializes the exponential backoff.

type Owner

type Owner interface {
	// NOTE(review): the comments below are derived from the method names and
	// signatures only. Most methods take a send-only `done chan<- error`;
	// presumably the outcome of the request is delivered on it — confirm
	// against the implementation.

	// EnqueueJob submits adminJob to the owner.
	EnqueueJob(adminJob model.AdminJob, done chan<- error)
	// RebalanceTables requests a table rebalance for the changefeed cfID.
	RebalanceTables(cfID model.ChangeFeedID, done chan<- error)
	// ScheduleTable requests that table tableID of changefeed cfID be
	// scheduled to the capture toCapture.
	ScheduleTable(
		cfID model.ChangeFeedID, toCapture model.CaptureID,
		tableID model.TableID, done chan<- error,
	)
	// DrainCapture takes a scheduler query describing the drain request.
	// NOTE(review): which fields of query are inputs vs. outputs is not
	// visible here.
	DrainCapture(query *scheduler.Query, done chan<- error)
	// WriteDebugInfo writes debug information to w.
	WriteDebugInfo(w io.Writer, done chan<- error)
	// Query runs the given query; presumably the result is stored in
	// query.Data — TODO confirm.
	Query(query *Query, done chan<- error)
	// AsyncStop asks the owner to stop; judging by the name it does not wait
	// for the stop to complete — confirm.
	AsyncStop()
	// UpdateChangefeedAndUpstream updates the given changefeed info together
	// with its upstream info.
	UpdateChangefeedAndUpstream(ctx context.Context,
		upstreamInfo *model.UpstreamInfo,
		changeFeedInfo *model.ChangeFeedInfo,
	) error
	// UpdateChangefeed updates the given changefeed info.
	UpdateChangefeed(ctx context.Context,
		changeFeedInfo *model.ChangeFeedInfo) error
	// CreateChangefeed creates a changefeed from the given upstream info and
	// changefeed info.
	CreateChangefeed(context.Context,
		*model.UpstreamInfo,
		*model.ChangeFeedInfo,
	) error
}

Owner manages the TiCDC cluster.

The interface is thread-safe, except for Tick, which is only used by the etcd worker.

func NewOwner

func NewOwner(
	upstreamManager *upstream.Manager,
	cfg *config.SchedulerConfig,
	globalVars *vars.GlobalVars,
) Owner

NewOwner creates a new Owner

type Query

type Query struct {
	// Tp selects which kind of query this is (one of the QueryType constants).
	Tp           QueryType
	// ChangeFeedID identifies a changefeed; presumably it is only meaningful
	// for per-changefeed query types — confirm against the query handler.
	ChangeFeedID model.ChangeFeedID

	// Data presumably carries the query result — TODO confirm.
	Data interface{}
}

Query wraps a query command and returns its results.

type QueryType

// QueryType identifies the kind of a Query; its values are the Query* constants.
type QueryType int32

QueryType is the type of different queries.

// QueryType values; they are numbered sequentially from 0 via iota, so the
// order of the entries below determines their numeric values.
const (
	// QueryAllTaskStatuses is the type of query all task statuses.
	QueryAllTaskStatuses QueryType = iota
	// QueryProcessors is the type of query processors.
	QueryProcessors
	// QueryCaptures is the type of query captures info.
	QueryCaptures
	// QueryHealth is the type of query cluster health info.
	QueryHealth
	// QueryOwner is the type of query changefeed owner.
	QueryOwner
	// QueryChangefeedInfo is the type of query changefeed info.
	QueryChangefeedInfo
	// QueryChangeFeedStatuses is the type of query changefeed status.
	QueryChangeFeedStatuses
	// QueryChangeFeedSyncedStatus is the type of query changefeed synced status.
	QueryChangeFeedSyncedStatus
	// QueryAllChangeFeedInfo is the type of query all changefeed info.
	QueryAllChangeFeedInfo
	// QueryAllChangeFeedSCheckpointTs is the type of query all changefeed checkpoint ts.
	QueryAllChangeFeedSCheckpointTs
	// QueryExists is the type of query check if a changefeed exists.
	QueryExists
)

type StatusProvider

type StatusProvider interface {
	// GetAllChangeFeedInfo returns all changefeed infos.
	GetAllChangeFeedInfo(ctx context.Context) (
		map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
	)
	// GetAllChangeFeedCheckpointTs returns all changefeed checkpoints.
	GetAllChangeFeedCheckpointTs(ctx context.Context) (
		map[model.ChangeFeedID]uint64, error,
	)
	// GetChangeFeedStatus returns a changefeed's runtime status.
	GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatusForAPI, error)

	// GetChangeFeedSyncedStatus returns a changefeed's synced status.
	GetChangeFeedSyncedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedSyncedStatusForAPI, error)

	// GetChangeFeedInfo returns a changefeed's info.
	GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error)

	// GetAllTaskStatuses returns the task statuses for the specified changefeed.
	GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error)

	// GetProcessors returns the statuses of all processors.
	GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error)

	// GetCaptures returns the information about all captures.
	GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error)

	// IsHealthy returns true if the cluster is healthy.
	IsHealthy(ctx context.Context) (bool, error)
	// IsChangefeedExists returns true if the changefeed exists.
	IsChangefeedExists(ctx context.Context, id model.ChangeFeedID) (bool, error)
}

StatusProvider provides functions to get meta-information from the owner. The interface is thread-safe.

func NewStatusProvider

func NewStatusProvider(owner Owner) StatusProvider

NewStatusProvider returns a new StatusProvider for the owner.

Directories

Path Synopsis
Package mock_owner is a generated GoMock package.
Package mock_owner is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL